Child Management

Introduction

This document defines the child management protocol used by Marlin nodes to maintain the set of child nodes to whom they send chunks or chunk IDs.

For the counterpart protocol that manages parents, see the parent management protocol.

Concepts

Chunks

Messages are broken into erasure-coded chunks for faster and more reliable transmission. Each chunk has a chunk ID associated with it.

Tiers

Peers are categorized into tiers based on proximity:

  • Tier 0: <5 ms
  • Tier 1: 5-50 ms
  • Tier 2: 50-100 ms
  • Tier 3: 100-150 ms
  • Tier 4: 150+ ms

Each tier has a limited number of vacancies, which are adjusted so that children are forced to pick diverse parents.
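
The tier boundaries above can be sketched as a small lookup. This is only an illustration: the protocol itself derives the tier from location IDs via helpers::getTier, whereas this sketch assumes a latency measurement in milliseconds is already available. The function name, the constant name and the handling of values that fall exactly on a boundary are assumptions.

```python
# Exclusive upper bounds in milliseconds for tiers 0..3 (taken from the list above).
# Anything at or above the last bound falls into tier 4.
TIER_UPPER_BOUNDS_MS = [5.0, 50.0, 100.0, 150.0]


def tier_for_latency(latency_ms: float) -> int:
    """Return the tier (0-4) for a latency in milliseconds (hypothetical helper)."""
    if latency_ms < 0:
        raise ValueError("latency must be non-negative")
    for tier, upper_bound in enumerate(TIER_UPPER_BOUNDS_MS):
        if latency_ms < upper_bound:
            return tier
    # Slower than every bound: the last, open-ended tier.
    return len(TIER_UPPER_BOUNDS_MS)
```

With these assumptions, a peer measured at 2 ms lands in tier 0 and a peer measured at 120 ms lands in tier 3.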

Active, passive and tracked peers

Each peer is further categorized as an active, passive or tracked peer.

Active peers are transmitted entire chunks and are the hot path used for fast propagation through the Marlin network.

Passive peers are transmitted only the chunk IDs of the chunks the node receives. These IDs act as an advertisement: a child can fetch chunks on request if it is missing any chunks needed to reconstruct the message.

Tracked peers are not transmitted anything; their metrics are simply tracked for the purposes of the reinforcement learning algorithm.

Peer optimization routines periodically adjust nodes in these sets to maximize profits.
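
As a rough illustration of the three categories, the sketch below picks what would be sent to a peer of each kind. The enum, the function name and the tuple layout are hypothetical. Only the CHUNKS and CHUNK_IDS labels come from the subscription types used later in this document.

```python
from enum import Enum
from typing import Optional, Tuple


class PeerCategory(Enum):
    ACTIVE = "active"
    PASSIVE = "passive"
    TRACKED = "tracked"


def payload_for(category: PeerCategory, chunk_id: bytes,
                chunk: bytes) -> Optional[Tuple[str, bytes]]:
    """Return (label, data) to send to a peer, or None if nothing is sent."""
    if category is PeerCategory.ACTIVE:
        return ("CHUNKS", chunk)        # hot path: the entire chunk
    if category is PeerCategory.PASSIVE:
        return ("CHUNK_IDS", chunk_id)  # advertisement only
    return None                         # tracked peers receive nothing
```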

Constants

Name                           Type                     Value
INITIAL_SCORE                  double                   0.3
ACTIVE_PEERCOUNT_SOFTTARGET    mapping(Tier => uint8)   {0: 1, 1: 3, 2: 3, 3: 1, 4: 1}
PASSIVE_PEERCOUNT_SOFTTARGET   mapping(Tier => uint8)   {0: 5, 1: 15, 2: 15, 3: 5, 4: 5}
TRACKED_PEERCOUNT_SOFTTARGET   mapping(Tier => uint8)   {0: 5, 1: 15, 2: 15, 3: 5, 4: 5}

Data structures

Type aliases

Name         Type      Description                                        Example
Tier         uint8     Tier number                                        0, 1, 2, ..., 4
LocationId   bytes32   Location ID
PeerId       bytes32   Peer ID
Score        double    Score value indicating fitness for being a child

ChildPeer

struct ChildPeer {
    PeerId peerId
    Score score
}

Storage

activeChildPeers

mapping((Tier, LocationId) => ChildPeer[]) activeChildPeers

passiveChildPeers

mapping((Tier, LocationId) => ChildPeer[]) passiveChildPeers

trackedChildPeers

mapping((Tier, LocationId) => ChildPeer[]) trackedChildPeers
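
One possible in-memory layout for these mappings is sketched below, assuming Python dictionaries keyed by a (tier, location ID) tuple. The has_vacancy helper is hypothetical and only shows how a soft target from the Constants section could be compared against the current size of a list.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, Dict, List, Tuple


@dataclass
class ChildPeer:
    peer_id: bytes  # PeerId (bytes32 in the spec)
    score: float    # Score


Key = Tuple[int, bytes]  # (Tier, LocationId)
PeerSets = DefaultDict[Key, List[ChildPeer]]

ACTIVE_PEERCOUNT_SOFTTARGET: Dict[int, int] = {0: 1, 1: 3, 2: 3, 3: 1, 4: 1}

# Missing keys behave like empty lists, as in the pseudocode.
active_child_peers: PeerSets = defaultdict(list)


def has_vacancy(peers: PeerSets, soft_target: Dict[int, int],
                tier: int, location_id: bytes) -> bool:
    """True if the list for (tier, location_id) is below its soft target."""
    return len(peers[(tier, location_id)]) < soft_target[tier]
```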

Operations

Joining the network

On joining the network, the node starts peer discovery through the beacon and sets a recurring timer that triggers peer optimization.

func findChildren() {
    DiscoveryProtocol::startDiscovery(beaconAddr)
    helpers::startPeerOptimizationTimer()
}

Leaving the network

The node disconnects from all its children before leaving the network.

func disconnectChildren() {
    for tier in tiers {
        for location in locations {
            // Close every connection first and empty the list afterwards,
            // so that the list is never modified while it is being iterated.
            for peer in activeChildPeers[(tier, location)] {
                PubSubProtocol::close(peer)
            }
            activeChildPeers[(tier, location)].clear()

            for peer in passiveChildPeers[(tier, location)] {
                PubSubProtocol::close(peer)
            }
            passiveChildPeers[(tier, location)].clear()
        }
    }
}
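
The same teardown can be sketched in Python. The peer sets are assumed to be plain dictionaries mapping a (tier, location) key to a list of peer IDs, and close stands in for PubSubProtocol::close. Both are assumptions made for illustration. Each list is copied before iteration and emptied only after every peer in it has been closed.

```python
from typing import Callable, Dict, Hashable, Iterable, List


def disconnect_children(peer_sets: Iterable[Dict[Hashable, List[str]]],
                        close: Callable[[str], None]) -> List[str]:
    """Close every peer in every set, empty the sets, and return the closed peers."""
    closed: List[str] = []
    for peers in peer_sets:
        for key in list(peers):
            # Iterate over a copy so the underlying list can be emptied safely.
            for peer in list(peers[key]):
                close(peer)
                closed.append(peer)
            peers[key].clear()
    return closed
```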

New subscription

On receiving a new subscription request, the active, passive and tracked lists are checked in order to see if there are vacancies. In case there are vacancies, the peer is added to the list and an appropriate subscribeSuccess message is sent to the peer. If all lists are full, a subscribeFailure message is sent instead.

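func onSubscribe(Peer peer) {
    peerLocationId = LocationProtocol::getLocation(peer)
    selfLocationId = LocationProtocol::getLocation(self)
    tier = helpers::getTier(selfLocationId, peerLocationId)

    for locationId in LocationProtocol::allLocations() {
        if activeChildPeers[(tier, locationId)].size() < ACTIVE_PEERCOUNT_SOFTTARGET[tier] {
            // Vacancy in the active list: the peer will be sent entire chunks
            PubSubProtocol::subscribeSuccess(peer, locationId, CHUNKS)
        } else if passiveChildPeers[(tier, locationId)].size() < PASSIVE_PEERCOUNT_SOFTTARGET[tier] {
            // Vacancy in the passive list: the peer will be sent chunk IDs only
            PubSubProtocol::subscribeSuccess(peer, locationId, CHUNK_IDS)
        } else if trackedChildPeers[(tier, locationId)].size() < TRACKED_PEERCOUNT_SOFTTARGET[tier] {
            // Vacancy in the tracked list: nothing is sent, only metrics are kept.
            // No subscription type is defined for tracked peers, so the peer is
            // recorded directly (assumption).
            trackedChildPeers[(tier, locationId)].insert(peer, INITIAL_SCORE)
        } else {
            // All lists are full (the argument list of subscribeFailure is assumed)
            PubSubProtocol::subscribeFailure(peer, locationId)
        }
    }
}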

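func onSubscribeSuccess(Peer peer, LocationId locationId, SubscriptionType subscriptionType) {
    // The tier is derived from this node's location and the peer's location
    peerLocationId = LocationProtocol::getLocation(peer)
    selfLocationId = LocationProtocol::getLocation(self)
    tier = helpers::getTier(selfLocationId, peerLocationId)

    if subscriptionType == CHUNKS {
        activeChildPeers[(tier, locationId)].insert(peer, INITIAL_SCORE)
    } else if subscriptionType == CHUNK_IDS {
        passiveChildPeers[(tier, locationId)].insert(peer, INITIAL_SCORE)
    }
}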

func onSubscribeFailure(Peer peer, LocationId locationId, SubscriptionType subscriptionType) {
    // Nothing
}
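
The order in which the lists are checked can be sketched as a pure function over the current list sizes. The soft targets are copied from the Constants section. The function name and the string labels it returns are hypothetical, and None stands for the subscribeFailure case.

```python
from typing import Optional

ACTIVE_PEERCOUNT_SOFTTARGET = {0: 1, 1: 3, 2: 3, 3: 1, 4: 1}
PASSIVE_PEERCOUNT_SOFTTARGET = {0: 5, 1: 15, 2: 15, 3: 5, 4: 5}
TRACKED_PEERCOUNT_SOFTTARGET = {0: 5, 1: 15, 2: 15, 3: 5, 4: 5}


def choose_list(tier: int, n_active: int, n_passive: int,
                n_tracked: int) -> Optional[str]:
    """Pick the first list with a vacancy, checking active, passive, tracked in order."""
    if n_active < ACTIVE_PEERCOUNT_SOFTTARGET[tier]:
        return "active"
    if n_passive < PASSIVE_PEERCOUNT_SOFTTARGET[tier]:
        return "passive"
    if n_tracked < TRACKED_PEERCOUNT_SOFTTARGET[tier]:
        return "tracked"
    return None  # every list is full: reject the subscription
```

For example, a tier 0 bucket that already holds one active peer has no active vacancy left, so the next subscriber would be placed in the passive list.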

Peer close

When a peer closes the connection, it is removed from all the active and passive sets.

func onClose(Peer peer) {
    for tier in tiers {
        for location in locations {
            activeChildPeers[(tier, location)].remove(peer)
            passiveChildPeers[(tier, location)].remove(peer)
        }
    }
}

New chunk

On receiving a new chunk, the sending peer's score is updated using a scoring function.

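func onNewChunk() {
    // `from` is the peer that sent the chunk; contentType and contentId
    // identify the message the chunk belongs to.
    updatePeerRanking(
        from,
        // Number of chunks and chunk IDs seen so far for this message
        contentCache[contentType+contentId].size() +
            idCache[contentType+contentId].size(),
        MIN_CHUNKS[contentType+contentId]
    )
}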

Invalid chunk

On receiving an invalid chunk, the peer is removed from the active set and a passive peer is promoted to active status.

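func onInvalidChunk(Peer peer) {
    peerLocationId = LocationProtocol::getLocation(peer)
    selfLocationId = LocationProtocol::getLocation(self)
    tier = helpers::getTier(selfLocationId, peerLocationId)

    // Drop the offending peer from the active set
    activeChildPeers[(tier, peerLocationId)].remove(peer)

    // Promote the best passive peer from the same tier and location
    bestPassivePeer, passiveMetric = passiveChildPeers[(tier, peerLocationId)].getBestPeer()

    activeChildPeers[(tier, peerLocationId)].insert(bestPassivePeer, passiveMetric)
    passiveChildPeers[(tier, peerLocationId)].remove(bestPassivePeer)
}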
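
The remove-and-promote step can be sketched for a single (tier, location) bucket, assuming each set is a dictionary from peer ID to score. The function name is hypothetical. Unlike the pseudocode, the sketch also handles an empty lower set by promoting nobody, which is an assumption about the intended behaviour. The same shape would apply to replacing a passive peer with the best tracked peer.

```python
from typing import Dict, Optional


def replace_with_best(upper: Dict[str, float], lower: Dict[str, float],
                      bad_peer: str) -> Optional[str]:
    """Drop bad_peer from `upper` and promote the best-scoring peer of `lower`."""
    upper.pop(bad_peer, None)
    if not lower:
        return None  # nothing to promote
    best = max(lower, key=lower.get)
    # Move the peer, carrying its score over to the upper set.
    upper[best] = lower.pop(best)
    return best
```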

New chunk ID

On receiving a new chunk ID, the sending peer's score is updated based on a reinforcement learning algorithm.

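func onNewChunkId() {
    // `from` is the peer that sent the chunk ID; contentType and contentId
    // identify the message the chunk belongs to.
    updatePeerRanking(
        from,
        // Number of chunks and chunk IDs seen so far for this message
        contentCache[contentType+contentId].size() +
            idCache[contentType+contentId].size(),
        MIN_CHUNKS[contentType+contentId]
    )
}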

Invalid chunk ID

On receiving an invalid chunk ID, the peer is removed from the passive set and a tracked peer is promoted to passive status.

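func onInvalidChunkId(Peer peer) {
    peerLocationId = LocationProtocol::getLocation(peer)
    selfLocationId = LocationProtocol::getLocation(self)
    tier = helpers::getTier(selfLocationId, peerLocationId)

    // Drop the offending peer from the passive set
    passiveChildPeers[(tier, peerLocationId)].remove(peer)

    // Promote the best tracked peer from the same tier and location
    bestTrackedPeer, trackedMetric = trackedChildPeers[(tier, peerLocationId)].getBestPeer()

    passiveChildPeers[(tier, peerLocationId)].insert(bestTrackedPeer, trackedMetric)
    trackedChildPeers[(tier, peerLocationId)].remove(bestTrackedPeer)
}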

Optimizing active peers

On a timer event, the node optimizes its active peers by exchanging a peer in the active and passive lists if the passive list has a significantly better peer.

func onTimerEvent() {
    for tier in tiers {
        for location in locations {
            bestPassivePeer, passiveMetric = passiveChildPeers[(tier, location)].getBestPeer()
            worstActivePeer, activeMetric = activeChildPeers[(tier, location)].getWorstPeer()
            if passiveMetric * 0.9 > activeMetric {  // Damping factor to prevent excessive swaps
                passiveChildPeers[(tier, location)].remove(bestPassivePeer)
                activeChildPeers[(tier, location)].remove(worstActivePeer)
                // Carry each peer's score over to its new list
                passiveChildPeers[(tier, location)].insert(worstActivePeer, activeMetric)
                activeChildPeers[(tier, location)].insert(bestPassivePeer, passiveMetric)

                unsubscribe(worstActivePeer)
                subscribe(bestPassivePeer)
            }
        }
    }
}
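
The damped swap can be sketched for one (tier, location) bucket, again assuming each set is a dictionary from peer ID to score. The 0.9 factor is taken from the pseudocode above. The function name, the DAMPING constant name and the early return on empty sets are assumptions.

```python
from typing import Dict, Optional, Tuple

DAMPING = 0.9  # factor from the pseudocode; prevents swaps on marginal gains


def optimize_bucket(active: Dict[str, float], passive: Dict[str, float],
                    damping: float = DAMPING) -> Optional[Tuple[str, str]]:
    """Swap the worst active peer with the best passive peer if clearly better.

    Returns (demoted_peer, promoted_peer), or None if no swap happened.
    """
    if not active or not passive:
        return None
    best_passive = max(passive, key=passive.get)
    worst_active = min(active, key=active.get)
    if passive[best_passive] * damping <= active[worst_active]:
        return None  # not significantly better: keep the current sets
    active[best_passive] = passive.pop(best_passive)
    passive[worst_active] = active.pop(worst_active)
    return (worst_active, best_passive)
```

With a damping factor of 0.9, a passive peer scoring 0.6 displaces an active peer scoring 0.5, while a passive peer scoring 0.55 does not.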

Optimizing passive peers

On a timer event, the node optimizes its passive peers by exchanging a peer in the passive and tracked lists based on a reinforcement learning algorithm.