Routing
The routing layer determines how tuples emitted by a component reach the appropriate downstream bolt instances. It supports four grouping strategies and handles stream-based fan-out.
Grouping strategies
When defining a topology, each stream connection specifies a grouping that controls how tuples are distributed across bolt instances:
type Grouping<'t> =
    | Shuffle // Hash-based distribution
    | Fields of ('t -> obj) * string list // Key-based affinity
    | All // Broadcast to all instances
    | Direct // Caller-specified target
Shuffle
Distributes tuples across bolt instances by hashing the tuple ID:
| Shuffle when instances.Length = 1 ->
    fun _ _ -> instances :> _ seq // Optimization: skip hash for single instance
| Shuffle ->
    let ix tupleId = abs(tupleId.GetHashCode() % instances.Length)
    fun tupleId _ ->
        instances.[ix tupleId] |> Seq.singleton
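The index computation above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper `indexFor` that is not part of the library; it only illustrates why `abs` is applied to the remainder: in .NET, `%` keeps the sign of the left operand, so a negative hash code would otherwise produce a negative index.

```fsharp
// Hypothetical helper mirroring the index computation used by the Shuffle case.
let indexFor (instanceCount: int) (hash: int) : int =
    abs (hash % instanceCount)

let fromPositive = indexFor 3 7        // 7 % 3 = 1
let fromNegative = indexFor 3 (-7)     // -7 % 3 = -1, abs gives 1
printfn "%d %d" fromPositive fromNegative
```

Because the remainder is always smaller in magnitude than the instance count, the result is a valid array index for any hash code.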
Fields
Routes tuples to a specific instance based on a key extracted from the tuple. Guarantees that tuples with the same key always go to the same bolt instance:
| Fields (map, _) ->
    fun _ tuple ->
        let ix = (map tuple).GetHashCode() % instances.Length |> abs
        instances.[ix] |> Seq.singleton
DSL usage:
yield b1 --> b2 |> Group.by (function Odd(n, _) -> n.x | _ -> failwith "unexpected")
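The affinity guarantee follows from equal keys producing equal hash codes. A small sketch of that property, assuming instances are modelled as plain strings (the names `sketchInstances` and `pick` are illustrative, not library API):

```fsharp
// Hypothetical stand-ins: real instances are send functions, not strings.
let sketchInstances = [| "bolt-0"; "bolt-1"; "bolt-2" |]

// Same computation as the Fields case: hash the extracted key,
// wrap it to the instance count, and drop the sign.
let pick (key: obj) : string =
    let ix = key.GetHashCode() % sketchInstances.Length |> abs
    sketchInstances.[ix]

// Two tuples carrying the same key resolve to the same instance.
let target1 = pick (box "user-42")
let target2 = pick (box "user-42")
printfn "%s %s" target1 target2
```

Which instance a given key lands on depends on the key type's `GetHashCode`, so the sketch only checks that repeated lookups agree within one process.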
All
Broadcasts every tuple to all instances of the downstream bolt:
| All ->
    fun _ _ -> instances :> _ seq
Direct
Allows the emitter to specify the target task ID explicitly. The group function returns an empty sequence for this case, because routing is handled separately via a direct lookup:
| Direct ->
    fun _ _ -> Seq.empty
// Direct routing bypasses the distributor:
let direct =
    memoize (fun dstId ->
        let (_, (send, _)) = boltTasks |> Map.find dstId
        in Other >> send)
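The `memoize` helper is not shown in this section. A minimal sketch of what such a helper might look like, assuming a simple dictionary-backed cache (the library's own implementation may differ, and this version is not thread-safe):

```fsharp
open System.Collections.Generic

// Hypothetical memoize: caches the result of f per argument.
let memoize (f: 'a -> 'b) : 'a -> 'b =
    let cache = Dictionary<'a, 'b>()
    fun x ->
        match cache.TryGetValue x with
        | true, v -> v
        | _ ->
            let v = f x
            cache.[x] <- v
            v

// Counts how often the underlying lookup actually runs.
let lookups = ref 0
let resolve =
    memoize (fun (dstId: int) ->
        lookups.Value <- lookups.Value + 1
        sprintf "task-%d" dstId)

let firstCall = resolve 7
let secondCall = resolve 7   // served from the cache
```

With a cache like this, the lookup for a given destination task runs once, and later direct emits to the same task reuse the resolved sender.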
Tuple router construction
Routing.mkTupleRouter builds the complete routing function for a task. It combines:
- Stream definitions: which components subscribe to which streams
- Grouping functions: how to select target instances
- ID generation: mkIds for tuple tracking
Distributor construction
For each (streamId, componentId) pair in the topology's stream definitions, a distributor function is created at startup:
let mkDistributors taskId map (KeyValue((streamId, dstId), streamDef)) =
    let instances = sinksOfComp dstId
    let group = mkGroup instances streamDef.Grouping
    map |> Map.add streamId (fun mkIds tuple ->
        let ids = mkIds ()
        ids |> Seq.iter (fun tupleId ->
            let msg = Tuple(tuple, tupleId, fst streamId, snd streamId, taskId) |> Other
            group tupleId tuple
            |> Seq.apply msg))
Instance resolution
Bolt instances are grouped by component ID using memoization:
let sinksOfComp =
    let bolts = boltTasks |> Map.groupBy
                    (fun (_, (compId, _)) -> compId)
                    (fun (_, (_, (send, _))) -> send)
    memoize (fun compId -> bolts.[compId] |> Array.ofSeq)
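`Map.groupBy` here appears to be a library helper rather than part of FSharp.Core. The same grouping can be sketched with standard functions only, assuming a simplified task table in which senders are represented by strings (`sketchTasks` and `byComponent` are illustrative names):

```fsharp
// Hypothetical task table: taskId -> (componentId, sender).
let sketchTasks =
    Map.ofList
        [ 1, ("addOne", "send-1")
          2, ("addOne", "send-2")
          3, ("logOdd", "send-3") ]

// Group senders by the component they belong to.
let byComponent =
    sketchTasks
    |> Map.toSeq
    |> Seq.groupBy (fun (_, (compId, _)) -> compId)
    |> Seq.map (fun (compId, tasks) ->
        compId, tasks |> Seq.map (fun (_, (_, send)) -> send) |> Array.ofSeq)
    |> Map.ofSeq

printfn "%A" byComponent.["addOne"]
```

Each component ID then maps to the array of instances that the grouping functions index into.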
Stream-based fan-out
A single component can emit to multiple streams, each routed to different downstream bolts.
DSL definition:
yield b1 --> b2 |> Shuffle.on Odd // Odd stream from b1 to b2
yield b1 --> b3 |> Shuffle.on Even // Even stream from b1 to b3
Each stream has its own distributor entry in the routing map. When a bolt emits, the getStream function (derived from the DU case) determines which distributor handles it.
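The lookup from DU case to distributor can be pictured with a small sketch. It assumes a hypothetical `Msg` union and models each distributor as a function that returns the name of its target bolt; the real distributors send messages instead.

```fsharp
// Hypothetical tuple schema with one DU case per stream.
type Msg =
    | Odd of int
    | Even of int

// Derives the stream name from the DU case, as getStream is described to do.
let getStream (msg: Msg) : string =
    match msg with
    | Odd _ -> "Odd"
    | Even _ -> "Even"

// One distributor entry per stream; here each just reports its target.
let distributors : Map<string, Msg -> string> =
    Map.ofList
        [ "Odd", (fun (_: Msg) -> "b2")
          "Even", (fun (_: Msg) -> "b3") ]

// Pick the distributor for the tuple's stream and apply it.
let route (msg: Msg) : string = distributors.[getStream msg] msg
```

With this shape, `route (Odd 1)` resolves to the `b2` entry and `route (Even 2)` to the `b3` entry, matching the two stream definitions above.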
Anchored vs unanchored routing
The routing function receives a mkIds parameter that differs based on context:
| Context | mkIds Source | Behavior |
|---|---|---|
| Spout emit | | Generates |
| Bolt emit (anchored) | | Sends |
| Bolt emit (unanchored) | Identity | Returns plain tuple IDs (no acker interaction) |
The ==> operator in the DSL creates anchored streams where emitted tuples are tracked in the anchor tree.
The --> operator creates unanchored streams where emitted tuples are not tracked:
yield s1 ==> b1 |> Shuffle.on Original // Anchored: b1's emits are tracked
yield b1 ==> b2 |> Shuffle.on Odd // Anchored: b2's emits are tracked
yield b1 --> b3 |> Shuffle.on Even // Unanchored: b3's emits are not tracked
Complete routing example
Routing decisions for the Guaranteed sample topology:
- Spout → addOne: Shuffle across 2 instances → hash(tupleId) % 2
- addOne → logOdd: Shuffle across 1 instance → always instance 0
- addOne → logEven: Shuffle across 1 instance → always instance 0
- Acker assignment: anchorId % 2 determines which acker handles the tree