Data

Introduction

This document defines the protocol used by the Marlin network to transmit messages.

Concepts

Types

Data that flows through Marlin can be either transactions or blocks of any blockchain protocol. The protocol can also be extended to other types of data, provided relevant spam detection mechanisms are in place.

Constants

Name                Type     Value
TX_MASK             bytes32  2^255 = b1000...000
BLOCK_MASK          bytes32  2^255 - 1 = b0111...111
CHUNKS_PER_PEER     uint8    TBD
CHUNK_REPORT_LIMIT  bytes32  TBD

Data Structures

Type Aliases

Name         Type     Description
Time         int      Timestamp in Unix format
PeerId       bytes32  Public identifier of a peer
MessageId    bytes32  Unique id for the message
MessageType  uint8    Type of the protocol for which the message is being sent
LocationId   bytes32  Identifier of a location
Tier         uint8    Tier number

Chunk

struct Chunk {
    byte[] data
    Time timestamp
    PeerId sender
    MessageId messageId
    bytes32 id
    MessageType messageType
    Witness witness
}

ChunkId

struct ChunkId {
    bytes[] partialWitness // only signature and witness from the previous hop
    bytes32 id
}

ParentPeer

struct ParentPeer {
    PeerId peerId
    Score score
}

Witness

struct Witness {
    bytes[] data
    bytes32 signature
}

Storage

chunkCache

mapping((MessageId, ChunkId.id) => Chunk[]) chunkCache

messageCache

mapping(MessageId => Message) messageCache

activeParentPeers

mapping((Tier, LocationId) => ParentPeer[]) activeParentPeers

passiveParentPeers

mapping((Tier, LocationId) => ParentPeer[]) passiveParentPeers

NUM_CHUNKS

mapping(MessageType => uint32) NUM_CHUNKS

MIN_CHUNKS

mapping(MessageType => uint32) MIN_CHUNKS

Operations

Setup

Marlin supports plugins that can be installed at each subscriber and publisher node for a specific protocol to preprocess the data and perform protocol-specific optimizations. The plugin also includes a postprocessor that translates the optimized data sent through the Marlin network back to its original form.

Different pre- and post-processing plugins are used for transactions and blocks. Once the data is processed, relay nodes need not differentiate between the types of data that flow through them. Transactions use plugins that validate the message according to the protocol and apply compression techniques. Blocks use plugins that validate the block, convert it into a serialization format such as RLP or an efficient custom format, and compress it using techniques like Xthinner or Graphene.

Plugins will also have data regarding beacon nodes that are used to bootstrap connections to the network.

Plugins can also act as a playground for testing improvements to data serialization and compression, allowing new algorithms to be analyzed under real-world conditions. Custom compression techniques can also be used, with the corresponding plugins made available to the relevant users.

Data Entry

A miner creates a block to be broadcast to the network. The miner then generates a unique message id by taking the hash of the message, the sender's public key, and a random salt. The first bit of the message id is 1 for a transaction and 0 for a block.

func bytes32 generateMessageId(bytes[] message, bool isTx):
    messageId = hash(message, self.pubKey, salt)
    if isTx:
        messageId = messageId | TX_MASK
    else:
        messageId = messageId & BLOCK_MASK
    return messageId
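A minimal Python sketch of this id generation, assuming SHA-256 as the hash function (the spec does not fix one) and big-endian 256-bit integer ids:

```python
import hashlib
import secrets

# Bit masks from the Constants table: the top bit of the 256-bit id
# encodes the message type (1 = transaction, 0 = block).
TX_MASK = 1 << 255           # b1000...000
BLOCK_MASK = (1 << 255) - 1  # b0111...111

def generate_message_id(message: bytes, pub_key: bytes, is_tx: bool) -> int:
    """Hash the message, the sender's public key and a random salt,
    then force the top bit to reflect the message type."""
    salt = secrets.token_bytes(32)
    digest = hashlib.sha256(message + pub_key + salt).digest()
    message_id = int.from_bytes(digest, "big")
    if is_tx:
        return message_id | TX_MASK   # set the top bit
    return message_id & BLOCK_MASK    # clear the top bit
```

Because the salt is random, the same message yields a fresh id on every call, while the type bit stays deterministic.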

The message is broken into chunks using erasure coding and sent to connected Marlin peers, ensuring that even if a few chunks are dropped by malicious users the message can still be reconstructed at the receiver. The message is encoded into NUM_CHUNKS[message_type] chunks using Rabin's algorithm as shown below; MIN_CHUNKS[message_type] chunks are required to reconstruct it. All encoding and decoding operations are performed in the finite field mod P.

func bytes[][] getChunksFromMessage(bytes[] message, MessageType message_type):
    chunk_length = MIN_CHUNKS[message_type]
    num_of_chunks = ceiling(message.length / chunk_length)
    padded_message = pad_left(message, chunk_length * num_of_chunks)
    for chunk_number in range(0, num_of_chunks):
        original_chunked_data.push(padded_message[(chunk_number * chunk_length):((chunk_number + 1) * chunk_length) - 1])
    for encoded_chunk_number in range(0, NUM_CHUNKS[message_type]):
        encoded_chunked_message = []
        for chunk_number in range(0, num_of_chunks):
            encoded_chunked_character = 0
            for character_number in range(0, MIN_CHUNKS[message_type]):
                encoded_chunked_character += original_chunked_data[chunk_number][character_number] * ENCODER_DATA[encoded_chunk_number][character_number]
            encoded_chunked_message.push(encoded_chunked_character mod P)
        encoded_message.push(encoded_chunked_message)
    return encoded_message
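As an illustration, the encoding step can be sketched in Python using a Vandermonde matrix over the small prime field GF(257) as ENCODER_DATA. The prime P, the field size, and the concrete chunk counts are assumptions here; the spec leaves them unspecified (NUM_CHUNKS and MIN_CHUNKS are per-message-type parameters):

```python
P = 257          # illustrative field prime; a real deployment would choose P per spec
MIN_CHUNKS = 3   # chunks needed to reconstruct
NUM_CHUNKS = 5   # chunks produced

# Vandermonde-style encoder: row i evaluates the data polynomial at x = i + 1,
# so any MIN_CHUNKS rows form an invertible matrix.
ENCODER_DATA = [[pow(i + 1, j, P) for j in range(MIN_CHUNKS)] for i in range(NUM_CHUNKS)]

def get_chunks_from_message(message: bytes) -> list[list[int]]:
    """Split `message` into MIN_CHUNKS-byte groups and produce NUM_CHUNKS
    encoded chunks, any MIN_CHUNKS of which suffice to recover the message."""
    # pad on the left so the length is a multiple of MIN_CHUNKS (pad_left in the spec)
    total = -(-len(message) // MIN_CHUNKS) * MIN_CHUNKS
    padded = message.rjust(total, b"\0")
    groups = [padded[i:i + MIN_CHUNKS] for i in range(0, total, MIN_CHUNKS)]
    encoded = []
    for row in ENCODER_DATA:
        # one field element per group: dot product of group bytes with the encoder row
        encoded.append([sum(b * e for b, e in zip(group, row)) % P for group in groups])
    return encoded
```

Each encoded chunk is one evaluation point of the per-group data polynomials, which is what makes any MIN_CHUNKS of them sufficient for reconstruction.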

Each chunk is attested by the source or by a party delegated by the source. Attested chunks are then randomly selected and sent to peers along with a witness.

func send(message):
    chunks = getChunksFromMessage(message)
    for chunk in chunks:
        if should_self_attest:
            attestedChunks += attest(chunk)
        else:
            attestedChunks += delegateAttestation(chunk, delegate)

    for peer in active_child_peers:
        witness = createWitness(null, self.privKey, peer.pubKey, self.relayerFee)
        for idx in range(0, CHUNKS_PER_PEER):
            randomChunk = attestedChunks.popRandomChunk()
            peer.send(randomChunk, witness)

func attest(message):
    sig = sign(message)
    return (message || sig)

func delegateAttestation(message, delegate):
    sig = delegateSign(message, delegate)
    return (message || sig)

func onDelegateAttestation(message):
    return sign(message)
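A small Python sketch of the attest/check round trip, using HMAC-SHA256 as a stand-in for the real digital signature scheme (the spec does not name one), with a hypothetical node key:

```python
import hashlib
import hmac

SECRET = b"node-secret-key"  # hypothetical stand-in for the node's private key

def sign(message: bytes, key: bytes = SECRET) -> bytes:
    # HMAC-SHA256 stands in for a public-key signature here
    return hmac.new(key, message, hashlib.sha256).digest()

def attest(message: bytes, key: bytes = SECRET) -> bytes:
    # append the attestation to the chunk, as in (message || sig)
    return message + sign(message, key)

def check_attestation(attested: bytes, key: bytes = SECRET) -> bool:
    # split off the fixed-size signature and verify it against the payload
    message, sig = attested[:-32], attested[-32:]
    return hmac.compare_digest(sig, sign(message, key))
```

In the real protocol verification would use the attester's public key, so relayers can check attestations without any shared secret.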

Data Propagation

Once the chunks are sent to the relayer peers by the source, the relayer checks that the message is attested and that the witness is valid.

func onChunk(PeerId from, bytes[] bytes):
    isValid = checkAttestation(bytes)
    if !isValid:
        return
    chunk = parseMessage(bytes)
    if !verifyWitness(chunk.witness) :
        return
    source_locationId = getLocation(getOrigin(bytes))
    sendToPeers(source_locationId, chunk)
    reportUnverifiedMessage(chunk)
func bool verifyWitness(Witness witness):
    isSigCorrect = verifySignature(witness.data, witness.signature)
    if !isSigCorrect :
        return false
    innerWitness = witness.getInnerWitness() // witness of the previous relayer
    if !innerWitness : // if innerwitness is null
        return true
    return verifyWitness(innerWitness)
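The recursive witness check above can be sketched in Python as follows; the nested `inner` field and the HMAC-based `sign` are stand-ins for the spec's layered witness bytes and real signatures:

```python
import hashlib
import hmac
from dataclasses import dataclass
from typing import Optional

KEY = b"relayer-key"  # hypothetical shared key standing in for per-hop keypairs

@dataclass
class Witness:
    data: bytes
    signature: bytes
    inner: Optional["Witness"] = None  # witness of the previous relayer, if any

def sign(data: bytes) -> bytes:
    return hmac.new(KEY, data, hashlib.sha256).digest()

def make_witness(data: bytes, inner: Optional[Witness] = None) -> Witness:
    # each hop signs its own data and nests the previous hop's witness
    return Witness(data=data, signature=sign(data), inner=inner)

def verify_witness(w: Witness) -> bool:
    # verify this hop's signature, then recurse toward the source
    if not hmac.compare_digest(w.signature, sign(w.data)):
        return False
    if w.inner is None:  # reached the origin of the chain
        return True
    return verify_witness(w.inner)
```

A single invalid signature anywhere in the chain invalidates the whole witness, which is what lets a relayer reject chunks whose provenance cannot be traced back to the source.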

Based on the Child Selection Algorithm, the peers to communicate data with are classified as active or passive. The relayer decides what to send based on the peer type: the complete chunk is sent to active peers, while only the chunkId is sent to passive peers.

func sendToPeers(LocationId source_locationId, Chunk chunk):
    for tier in tiers:
        for peer in active_child_peers[tier][source_locationId]:
            sendChunkNotificationMessage(peer, chunk)

        for peer in passive_child_peers[tier][source_locationId]:
            sendChunkIdNotificationMessage(peer, chunk)

Before chunks are sent to active peers, relayers must check the hash of the chunk: if the hash falls below CHUNK_REPORT_LIMIT, the chunk should be reported even if it is otherwise valid.

func sendChunkNotificationMessage(Peer peer, Chunk chunk):
    witness = createWitness(chunk.witness, self.privKey, peer.pubKey, self.relayerFee)
    if verifyMessage(keccak256(chunk.toString())):
        chunk.witness = witness
        chunkCache[chunk.messageId][chunk.id].push(chunk)
        send(peer, chunk)
        return

func sendChunkIdNotificationMessage(Peer peer, Chunk chunk):
    witness = createWitness(chunk.witness, self.privKey, peer.pubKey)
    partialWitness = getShortWitness(witness) // only witness with signature of the present hop
    if verifyMessage(keccak256(chunk.toString())):
        chunk.witness = witness
        chunkCache[chunk.messageId][chunk.id].push(chunk)
        send(peer, ChunkId(chunk.id, partialWitness))
        return
func bool verifyMessage(chunkHash):
    if chunkHash <= CHUNK_REPORT_LIMIT:
        return false
    return true
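The threshold check is a simple integer comparison on the chunk hash. A Python sketch, using SHA-256 as a stand-in for keccak256 and an illustrative limit (the real CHUNK_REPORT_LIMIT is TBD in the Constants table):

```python
import hashlib

# Illustrative threshold: roughly 1 in 2^16 chunks falls below it.
# The real CHUNK_REPORT_LIMIT is left TBD by the spec.
CHUNK_REPORT_LIMIT = 1 << (256 - 16)

def verify_message(chunk_bytes: bytes) -> bool:
    """Return True if the chunk may be forwarded, False if it must be reported."""
    # SHA-256 stands in for keccak256 here; interpret the digest as a 256-bit int
    chunk_hash = int.from_bytes(hashlib.sha256(chunk_bytes).digest(), "big")
    return chunk_hash > CHUNK_REPORT_LIMIT
```

Because the hash is uniformly distributed, the limit directly sets the fraction of otherwise-valid chunks that relayers are forced to report, which keeps them honest at a predictable sampling rate.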

The witness is also verified to detect instances where a relayer propagated messages that should have been reported. If such an instance is found, it is reported to the ReportUnverifiedMessages contract.

func reportUnverifiedMessage(chunk):
    relayerSignatures = getWitnessSignatures(chunk.witness)
    for signature in relayerSignatures
        if isMessageToBeReported(signature):
            relayer = getRelayer(signature, chunk.witness)
            reportUnverifiedMessageToContract(chunk.data, relayer, signature, chunk.witness)
func reportUnverifiedMessageToContract(data, relayer, signature, witness):
    ReportUnverifiedMessages.report(data, relayer, signature)
contract ReportUnverifiedMessages {
    function report(bytes[] chunkData, bytes32 relayer, bytes32 signature) {
        if(verifySignature(chunkData, signature)) {
            if(keccak256(chunkData) <= CHUNK_REPORT_LIMIT) {
                slash(relayer)
            }
        }
    }
}
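The contract's slashing condition (valid signature over a chunk whose hash is below the limit) can be mimicked off-chain. A Python sketch with HMAC as a signature stand-in; the `limit` parameter is an addition here so the slash path can be exercised deterministically:

```python
import hashlib
import hmac

CHUNK_REPORT_LIMIT = 1 << (256 - 16)  # illustrative; the real limit is TBD
RELAYER_KEY = b"relayer-secret"       # hypothetical stand-in for the relayer's key

slashed = set()  # stand-in for on-chain slashing state

def verify_signature(data: bytes, signature: bytes) -> bool:
    # HMAC-SHA256 stands in for real signature verification
    expected = hmac.new(RELAYER_KEY, data, hashlib.sha256).digest()
    return hmac.compare_digest(signature, expected)

def report(chunk_data: bytes, relayer: str, signature: bytes,
           limit: int = CHUNK_REPORT_LIMIT) -> None:
    """Slash only if the relayer really signed the chunk AND its hash is reportable."""
    if verify_signature(chunk_data, signature):
        if int.from_bytes(hashlib.sha256(chunk_data).digest(), "big") <= limit:
            slashed.add(relayer)
```

The two nested checks mirror the contract: a forged signature can never slash an innocent relayer, and a genuine signature only slashes when the chunk was genuinely reportable.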

Data Exit

A consumer connected to the Marlin network can receive messages based on messageId or sender. Received chunks are checked for attestation. Once MIN_CHUNKS[message_type] chunks have been received, the consumer can reconstruct the message.

func receive(bytes[] data):
    isValid = checkAttestation(data)
    if !isValid:
        return
    chunk = parseChunk(data)
    chunks = chunkCache[chunk.messageId]
    if chunk.id in chunks:
        return
    chunks[chunk.id] = chunk
    if chunks.length >= MIN_CHUNKS[chunk.messageType]:
        message = getMessageFromChunks(chunks)
        messageCache.push(message)
        messageReceived(message)

func messageReceived(message):
    // send notification to user
func bytes[] getMessageFromChunks(bytes[][] chunks, int[] chunk_indices):
    for chunk_index in chunk_indices:
        relevant_encoder_data.push(ENCODER_DATA[chunk_index])
    decoder_data = matrixInverse(relevant_encoder_data)
    decoded_chunks = matrixMultiplication(decoder_data, chunks)
    for chunk in decoded_chunks:
        decodedMessage = decodedMessage.append(chunk)
    return decodedMessage