FsShelter

Routing

The routing layer determines how tuples emitted by a component reach the appropriate downstream bolt instances. It supports four grouping strategies and handles stream-based fan-out.

Grouping strategies

When defining a topology, each stream connection specifies a grouping that controls how tuples are distributed across bolt instances:

type Grouping<'t> =
    | Shuffle                               // Hash-based distribution
    | Fields of ('t -> obj) * string list   // Key-based affinity
    | All                                   // Broadcast to all instances
    | Direct                                // Caller-specified target
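All four cases can be compared side by side in a minimal, self-contained sketch of a grouping-to-selector function. It mirrors the Grouping type above, with a few assumptions: instances are plain values (the real router passes send functions), tuple IDs are strings, and mkGroupSketch is a hypothetical name that is not part of FsShelter.

```fsharp
// Sketch only: a simplified mirror of the Grouping type and a selector builder.
// Assumptions: instances are plain values of type 'inst (the real router passes
// send functions), tuple IDs are strings, and mkGroupSketch is a hypothetical name.
type Grouping<'t> =
    | Shuffle
    | Fields of ('t -> obj) * string list
    | All
    | Direct

/// Builds a selector that maps (tupleId, tuple) to the target instances.
let mkGroupSketch (instances: 'inst array) (grouping: Grouping<'t>) : string -> 't -> 'inst seq =
    match grouping with
    | Shuffle when instances.Length = 1 ->
        fun _ _ -> instances :> 'inst seq            // single instance: no hashing needed
    | Shuffle ->
        fun (tupleId: string) _ ->
            let ix = abs (tupleId.GetHashCode() % instances.Length)
            Seq.singleton instances.[ix]             // one instance chosen by tuple ID
    | Fields (key, _) ->
        fun _ tuple ->
            let ix = abs ((key tuple).GetHashCode() % instances.Length)
            Seq.singleton instances.[ix]             // one instance chosen by the key
    | All ->
        fun _ _ -> instances :> 'inst seq            // every instance
    | Direct ->
        fun _ _ -> Seq.empty                         // resolved elsewhere by task ID
```

Shuffle and Fields run the same hash-and-modulo step. They differ only in what gets hashed: the tuple ID or a key taken from the tuple.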

Shuffle

Distributes tuples across bolt instances by hashing the tuple ID:

| Shuffle when instances.Length = 1 ->
    fun _ _ -> instances :> _ seq          // Optimization: skip hash for single instance
| Shuffle ->
    // abs is applied after %, so the index always falls in [0, instances.Length)
    let ix tupleId = abs(tupleId.GetHashCode() % instances.Length)
    fun tupleId _ ->
        instances.[ix tupleId] |> Seq.singleton

graph LR
    SP[Spout] -->|"hash mod 3 = 0"| B0[Bolt 0]
    SP -->|"hash mod 3 = 1"| B1[Bolt 1]
    SP -->|"hash mod 3 = 2"| B2[Bolt 2]
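The order of abs and % in the Shuffle code matters. This sketch isolates the index computation in a hypothetical shuffleIndex helper, spreads a batch of made-up tuple IDs across three instances, and checks the worst-case hash value.

```fsharp
// Sketch: Shuffle-style index selection (shuffleIndex is a hypothetical helper).
// hash % n always lies in (-n, n), so abs of it is a valid index in [0, n).
// Applying abs to the raw hash instead fails for Int32.MinValue, which has no
// positive int counterpart.
let shuffleIndex (instanceCount: int) (hashCode: int) =
    abs (hashCode % instanceCount)

// Spread 9,000 hypothetical tuple IDs over 3 instances and count the hits per index.
let buckets =
    [ for i in 1 .. 9000 -> shuffleIndex 3 ((sprintf "tuple-%d" i).GetHashCode()) ]
    |> List.countBy id
    |> List.sortBy fst

// Worst-case hash: -2147483648 % 3 = -2, so the index is 2.
let edge = shuffleIndex 3 System.Int32.MinValue
```

String.GetHashCode is randomized per process on .NET Core, so the bucket counts change between runs. Only the range of the index is guaranteed.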

Fields

Routes each tuple to a specific instance based on a key extracted from the tuple. For a fixed number of instances, tuples with the same key always go to the same bolt instance:

| Fields (map, _) ->
    fun _ tuple ->
        let ix = (map tuple).GetHashCode() % instances.Length |> abs
        instances.[ix] |> Seq.singleton

DSL usage:

yield b1 --> b2 |> Group.by (function Odd(n, _) -> n.x | _ -> failwith "unexpected")

graph LR
    B1[Bolt 1] -->|"key=apple, hash mod 2 = 0"| B2a[Bolt 2a]
    B1 -->|"key=banana, hash mod 2 = 1"| B2b[Bolt 2b]

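Key affinity comes from hashing the extracted key instead of the tuple ID. This sketch uses hypothetical Schema and Number types, not the sample's real definitions. It shows two tuples that differ in their other fields but share a key, and both land on the same instance.

```fsharp
// Hypothetical schema for illustration; the real sample defines its own types.
type Number = { x: int }

type Schema =
    | Odd of Number * string
    | Even of Number

// Key extractor with the ('t -> obj) shape required by the Fields case.
let keyOf (tuple: Schema) : obj =
    match tuple with
    | Odd (n, _) -> box n.x
    | _ -> failwith "unexpected"

// Same computation as the Fields branch: hash the key, not the tuple ID.
let fieldsIndex (instanceCount: int) (tuple: Schema) =
    abs ((keyOf tuple).GetHashCode() % instanceCount)

let first = fieldsIndex 2 (Odd ({ x = 7 }, "first"))
let second = fieldsIndex 2 (Odd ({ x = 7 }, "second"))
```

A boxed int hashes to its own value, so the result here is deterministic (7 % 2 = 1). String keys are stable within one process but not across runs.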
All

Broadcasts every tuple to all instances of the downstream bolt:

| All ->
    fun _ _ -> instances :> _ seq

graph LR
    SP[Spout] --> B0[Bolt 0]
    SP --> B1[Bolt 1]
    SP --> B2[Bolt 2]

Direct

Allows the emitter to specify the target task ID explicitly. The group function returns an empty sequence, and routing is handled separately through a direct lookup by task ID:

| Direct ->
    fun _ _ -> Seq.empty

// Direct routing bypasses the distributor:
let direct =
    memoize (fun dstId ->
        let (_, (send, _)) = boltTasks |> Map.find dstId
        Other >> send)   // wrap the tuple in Other, then pass it to the target task's send
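The direct path can be exercised in isolation. In this sketch, memoize, Msg, and boltTasks are simplified stand-ins (assumptions, not FsShelter's definitions). The point is that the task lookup runs once per destination, and later emits reuse the composed `Other >> send` function.

```fsharp
open System.Collections.Generic

// Hypothetical stand-ins: FsShelter has its own memoize, message type, and
// boltTasks map; these are simplified only to make the sketch runnable.
let memoize (f: 'a -> 'b) : 'a -> 'b =
    let cache = Dictionary<'a, 'b>()
    fun key ->
        match cache.TryGetValue key with
        | true, value -> value
        | _ ->
            let value = f key
            cache.[key] <- value
            value

type Msg = Other of string

let delivered = ResizeArray<int * Msg>()
let lookups = ref 0

// taskId -> (componentId, send); each send records what it received.
let boltTasks : Map<int, string * (Msg -> unit)> =
    Map.ofList [ for taskId in 1 .. 3 -> taskId, ("logOdd", fun msg -> delivered.Add((taskId, msg))) ]

// Resolve the destination task once, then reuse the composed sender.
let direct =
    memoize (fun dstId ->
        lookups.Value <- lookups.Value + 1
        let (_, send) = boltTasks |> Map.find dstId
        Other >> send)

direct 2 "hello"
direct 2 "again"
```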

Tuple router construction

Routing.mkTupleRouter builds the complete routing function for a task. It combines:

  1. Stream definitions: which components subscribe to which streams
  2. Grouping functions: how to select target instances
  3. ID generation: mkIds for tuple tracking

graph TB
    subgraph mkTupleRouter
        EM["Emit(tuple, anchors, srcId, compId, stream, dstId)"]

        EM -->|"dstId = Some id"| DIRECT["Direct Send"]
        EM -->|"dstId = None"| DIST["Distributor Lookup"]

        DIST --> GROUP["Grouping Function"]
        GROUP --> INST["Target Instance(s)"]
        INST --> SEND["Send to Disruptor ring buffer"]

        DIRECT --> SEND
    end
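The dispatch in the diagram comes down to a match on dstId. This hypothetical mkRouterSketch works on string tuples and a map of per-stream distributors. Dropping the tuple when a stream has no distributor is an assumption made for this sketch, not documented behavior.

```fsharp
// Hypothetical, simplified dispatch mirroring the diagram: an explicit dstId
// goes straight to the direct sender; otherwise the stream's distributor runs.
let mkRouterSketch (direct: int -> string -> unit)
                   (distributors: Map<string, string -> unit>) =
    fun (stream: string) (dstId: int option) (tuple: string) ->
        match dstId with
        | Some taskId -> direct taskId tuple
        | None ->
            match Map.tryFind stream distributors with
            | Some distribute -> distribute tuple
            | None -> ()   // assumption: a stream with no subscribers drops the tuple

let events = ResizeArray<string>()
let route =
    mkRouterSketch
        (fun taskId t -> events.Add(sprintf "direct %d: %s" taskId t))
        (Map.ofList [ "Odd", (fun (t: string) -> events.Add("Odd distributor: " + t)) ])

route "Odd" None "3"       // distributor lookup
route "Odd" (Some 5) "7"   // direct send, distributor skipped
route "Even" None "4"      // no distributor registered in this sketch
```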

Distributor construction

For each (streamId, componentId) pair in the topology's stream definitions, a distributor function is created at startup:

let mkDistributors taskId map (KeyValue((streamId, dstId), streamDef)) =
    let instances = sinksOfComp dstId
    let group = mkGroup instances streamDef.Grouping
    map |> Map.add streamId (fun mkIds tuple ->
        let ids = mkIds ()
        ids |> Seq.iter (fun tupleId ->
            let msg = Tuple(tuple, tupleId, fst streamId, snd streamId, taskId) |> Other
            group tupleId tuple
            |> Seq.apply msg))
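Plain functions are enough to reproduce the distributor's shape. In this hypothetical sketch, group plays the role of mkGroup's result and mkMsg stands in for building the Tuple message. The usage shows that an mkIds returning two IDs (for example, assuming a tuple anchored to two trees) produces two messages, each delivered to every selected instance.

```fsharp
// Simplified sketch of a distributor: one message per tuple ID from mkIds,
// delivered to every instance the grouping selects. Names are hypothetical.
let mkDistributorSketch (group: string -> 't -> ('m -> unit) seq)
                        (mkMsg: 't -> string -> 'm) =
    fun (mkIds: unit -> string seq) (tuple: 't) ->
        for tupleId in mkIds () do
            let msg = mkMsg tuple tupleId
            for send in group tupleId tuple do
                send msg

let received = ResizeArray<string>()
let sends : (string -> unit) array =
    [| (fun m -> received.Add("b0 <- " + m)); (fun m -> received.Add("b1 <- " + m)) |]

// An All-style grouping and an mkIds that yields two IDs.
let distribute =
    mkDistributorSketch (fun _ _ -> sends :> seq<_>) (fun (t: int) tupleId -> sprintf "%d@%s" t tupleId)

distribute (fun () -> seq [ "a:1"; "b:1" ]) 42
```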

Instance resolution

The send functions of bolt tasks are grouped by component ID once, and the per-component array lookup is memoized:

let sinksOfComp =
    let bolts = boltTasks |> Map.groupBy
        (fun (_, (compId, _)) -> compId)
        (fun (_, (_, (send, _))) -> send)
    memoize (fun compId -> bolts.[compId] |> Array.ofSeq)
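The same grouping can be written with FSharp.Core alone. Map.groupBy in the code above looks like a project helper, so this sketch substitutes Seq.groupBy, uses strings in place of send functions, and builds the arrays eagerly instead of through memoize. All names are illustrative.

```fsharp
// Sketch of instance resolution using FSharp.Core only. Assumption: Seq.groupBy
// stands in for the Map.groupBy helper above, and arrays are built eagerly.
// taskId -> (componentId, sendHandle); strings stand in for send functions.
let boltTasks : Map<int, string * string> =
    Map.ofList [ 1, ("addOne", "send-1"); 2, ("addOne", "send-2"); 3, ("logOdd", "send-3") ]

// Keep only the send handle of each task, in task order.
let sendsOf tasks = tasks |> Seq.map (fun (_, (_, send)) -> send) |> Array.ofSeq

let sinksByComp : Map<string, string array> =
    boltTasks
    |> Map.toSeq
    |> Seq.groupBy (fun (_, (compId, _)) -> compId)
    |> Seq.map (fun (compId, tasks) -> compId, sendsOf tasks)
    |> Map.ofSeq

let sinksOfComp (compId: string) = sinksByComp.[compId]
```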

Stream-based fan-out

A single component can emit to multiple streams, each routed to different downstream bolts:

graph LR
    A[Bolt A] -->|"Odd stream Shuffle"| B1[Bolt B logOdd]
    A -->|"Even stream Shuffle"| B2[Bolt C logEven]

DSL definition:

yield b1 --> b2 |> Shuffle.on Odd    // Odd stream from b1 to b2
yield b1 --> b3 |> Shuffle.on Even   // Even stream from b1 to b3

Each stream has its own distributor entry in the routing map. When a bolt emits, the getStream function (derived from the DU case) determines which distributor handles it.
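F# reflection offers one plausible way to derive a stream name from a DU case. This is an assumption for illustration, and FsShelter's own getStream may work differently. The sketch maps each case name to a distributor and routes Odd and Even tuples accordingly.

```fsharp
open Microsoft.FSharp.Reflection

// Hypothetical schema; each union case plays the role of a named stream.
type Schema =
    | Original of int
    | Odd of int
    | Even of int

// Assumption: the union case name serves as the stream ID.
let getStream (tuple: Schema) =
    let unionCase, _ = FSharpValue.GetUnionFields(tuple, typeof<Schema>)
    unionCase.Name

// One distributor per stream name; emitting looks up the entry for the tuple's case.
let routed = ResizeArray<string>()
let distributors : Map<string, Schema -> unit> =
    Map.ofList
        [ "Odd", (fun (_: Schema) -> routed.Add("logOdd"))
          "Even", (fun (_: Schema) -> routed.Add("logEven")) ]

let emit (tuple: Schema) = distributors.[getStream tuple] tuple

emit (Odd 3)
emit (Even 4)
```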

Anchored vs unanchored routing

The routing function receives a mkIds parameter that differs based on context:

| Context | mkIds source | Behavior |
|---|---|---|
| Spout emit | TupleTree.track | Generates anchorId, sends Track + Anchor to acker |
| Bolt emit (anchored) | TupleTree.anchor | Sends Anchor to existing tree, returns "anchorId:tupleId" |
| Bolt emit (unanchored) | Identity | Returns plain tuple IDs (no acker interaction) |

The ==> operator in the DSL creates anchored streams where emitted tuples are tracked in the anchor tree. The --> operator creates unanchored streams where emitted tuples are not tracked:

yield s1 ==> b1 |> Shuffle.on Original   // Anchored: b1's emits are tracked
yield b1 ==> b2 |> Shuffle.on Odd        // Anchored: b2's emits are tracked
yield b1 --> b3 |> Shuffle.on Even       // Unanchored: b3's emits are not tracked
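Plain functions can mimic the three mkIds flavors in the table. This is a hypothetical sketch, not the TupleTree API: the acker is a message log, and the ID counter and "idN" format are invented for the example. Only the "anchorId:tupleId" shape and the Track/Anchor messages come from the table.

```fsharp
// Hypothetical sketch of the three mkIds flavors; the acker is a message log.
type AckerMsg =
    | Track of anchorId: string
    | Anchor of anchorId: string * tupleId: string

let ackerLog = ResizeArray<AckerMsg>()

// Invented ID source: "id1", "id2", ...
let nextId =
    let counter = ref 0
    fun () ->
        counter.Value <- counter.Value + 1
        sprintf "id%d" counter.Value

// Bolt emit, unanchored: plain tuple IDs, no acker interaction.
let unanchoredIds () = [ nextId () ]

// Bolt emit, anchored: join each existing tree and return "anchorId:tupleId".
let anchoredIds (anchors: string list) () =
    let tupleId = nextId ()
    [ for anchorId in anchors do
        ackerLog.Add(Anchor(anchorId, tupleId))
        yield sprintf "%s:%s" anchorId tupleId ]

// Spout emit: start a new tree with Track, then anchor the tuple to it.
let trackedIds () =
    let anchorId = nextId ()
    ackerLog.Add(Track anchorId)
    anchoredIds [ anchorId ] ()

let plain = unanchoredIds ()            // no acker messages
let fromSpout = trackedIds ()           // Track, then Anchor
let fromBolt = anchoredIds [ "id2" ] () // Anchor into the spout's tree
```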

Complete routing example

For the Guaranteed sample topology:

graph TB
    S1["numbers (1 instance)"]
    B1a["addOne (instance 0)"]
    B1b["addOne (instance 1)"]
    B2["logOdd (1 instance)"]
    B3["logEven (1 instance)"]

    S1 -->|"Original Shuffle"| B1a
    S1 -->|"Original Shuffle"| B1b
    B1a -->|"Odd Shuffle"| B2
    B1b -->|"Odd Shuffle"| B2
    B1a -->|"Even Shuffle"| B3
    B1b -->|"Even Shuffle"| B3

    A1["Acker 1"]
    A2["Acker 2"]

    S1 -. "Track" .-> A1
    S1 -. "Track" .-> A2
    B1a -. "Anchor + Ok" .-> A1
    B1b -. "Anchor + Ok" .-> A2
    B2 -. "Ok" .-> A1
    B3 -. "Ok" .-> A2

Routing decisions:
