Acker algorithm
The acker implements Storm's XOR-tree algorithm for tracking tuple completion across a DAG of bolts. It provides guaranteed message processing: every tuple emitted by a spout will eventually be either fully acknowledged (all downstream bolts completed) or nacked (timeout or failure).
Core idea
Each tuple tree is identified by an anchorId (random int64). The acker maintains a single XOR accumulator per anchor. As tuples flow through bolts, each tuple ID is XOR'd into the accumulator twice — once when anchored (created) and once when completed (Ok). Since x ⊕ x = 0, when all tuples complete, the accumulator returns to zero.
Data structures
AckerMsg
Messages sent to acker tasks:
type AckerMsg =
| Track of taskId:TaskId * tupleId:TupleId * anchorId:int64
| Anchor of anchoredId:(int64 * int64) // (anchorId, tupleId)
| Ok of okId:(int64 * int64) // (anchorId, tupleId)
| Fail of failId:(int64 * int64) // (anchorId, tupleId)
TreeState
Internal acker state per anchor:
type TreeState =
| Pending of TupleId * TaskId * int64 * int64
// srcId srcTask xorVal (unused, 0L)
| Complete of srcId:TupleId * src:TaskId
| Done
The fourth field of Pending is unused (always 0L) — timeout expiry is handled by bucket rotation rather than per-entry timestamps.
AnchoredTupleId
Tuple IDs flowing through bolts are encoded as "anchorId:tupleId" strings:
let (|AnchoredTuple|_|) (tupleId:string) =
match tupleId.Split ':' with
| [|anchor;tid|] -> Some (int64 anchor, int64 tid)
| _ -> None
State machine
|
TupleTree module
The TupleTree module provides three functions that produce the acker protocol messages at the spout and bolt layers.
Acker assignment
Each anchor is assigned to an acker instance by hashing:
let inline ackerOfAnchor (ackers: _ array) (anchorId: int64) =
let i = abs (anchorId % (int64 ackers.Length))
ackers.[int i]
This ensures all messages for a given anchor always go to the same acker instance.
TupleTree.track (spout side)
Called when a spout emits a tuple. Generates a fresh anchorId and sends Track to the acker:
let track nextId ackers taskId _ sourceTupleId =
let anchorId = nextId()
match sourceTupleId with
| Some sid -> Track(taskId, sid, anchorId) |> toAcker
| _ -> ()
fun () ->
let tupleId = nextId()
Anchor(anchorId, tupleId) |> toAcker
[sprintf "%d:%d" anchorId tupleId]
Returns a function mkIds: unit -> string list that, when called by the router, generates a new tupleId, sends Anchor to the acker, and returns ["anchorId:tupleId"] for downstream bolts.
|
TupleTree.anchor (bolt side)
Called when a bolt receives a tuple and prepares to emit downstream. Parses the incoming anchored IDs, and returns a function that generates new tuple IDs anchored to the same tree:
let anchor nextId ackers anchors _ =
let anchors =
anchors
|> List.choose ((|AnchoredTuple|_|) >> Option.map (fun (a, _) -> a, toAcker ackers a))
fun () ->
let tupleId = nextId()
anchors |> List.iter (fun ((aid, enqueue), _) -> Anchor(aid, tupleId) |> enqueue)
match anchors with
| [] -> [string tupleId]
| _ -> anchors |> List.map (fun ((aid, _), _) -> sprintf "%d:%d" aid tupleId)
|
TupleTree.mkAck (bolt completion)
Creates ack/nack functions that bolts use to signal tuple completion:
let mkAck toResult ackers =
function
| AnchoredTuple (a, id) ->
toResult (a, id) |> toAcker ackers a
| _ -> ()
Called with Ok for successful processing, Fail for exceptions.
|
XOR accumulation example
Consider a simple DAG: Spout → Bolt A → Bolt B:
|
Step |
Event |
XOR Value |
State |
|---|---|---|---|
1 |
|
|
Pending |
2 |
|
|
Pending |
3 |
|
|
Pending |
4 |
|
|
Pending |
5 |
|
|
Complete → Ack |
Fan-out example
Spout → Bolt A → (Bolt B, Bolt C):
Step |
Event |
XOR Value |
|---|---|---|
1 |
Track |
|
2 |
Anchor(aid, 100) — spout→A |
|
3 |
Anchor(aid, 200) — A→B |
|
4 |
Anchor(aid, 300) — A→C |
|
5 |
Ok(aid, 100) — A done |
|
6 |
Ok(aid, 200) — B done |
|
7 |
Ok(aid, 300) — C done |
|
Acker capacity and bucket rotation
The acker uses a rotating array of numBuckets = 3 dictionaries instead of a single inFlight dictionary. New entries go into the current bucket. On each Tick (every 30 seconds), the oldest bucket is expired — all remaining Pending entries are nacked and the bucket is cleared. This provides coarse-grained timeout without per-entry timestamps.
let numBuckets = 3
let buckets = Array.init numBuckets (fun _ -> Dictionary<int64, TreeState>(...))
let mutable currentBucket = 0
Lookup across buckets
tryFind scans all buckets to locate an anchor, returning ValueSome(bucketIndex, state). xor uses the bucket index to update the entry in place:
let tryFind anchor =
for i = 0 to numBuckets - 1 do
match buckets.[i].TryGetValue anchor with ...
let xor tupleId anchor =
match tryFind anchor with
| ValueSome (i, Pending(sourceId, taskId, v, ts)) ->
let v = v ^^^ tupleId
ValueSome (i, if v = 0L then Complete(...) else Pending(...))
Bucket rotation (timeout expiry)
The system timer sends Tick to ackers every 30 seconds. On tick:
- Identify the oldest bucket:
(currentBucket + 1) % numBuckets - For each
Pendingentry in that bucket: sendNackto the source spout - Clear the bucket
- Advance
currentBucketto the now-empty bucket
With 3 buckets and a 30-second tick, entries survive between 30 and 90 seconds before expiry.
|
High-water mark
A capacity guard prevents unbounded memory growth. When totalCount() (sum of all buckets) exceeds highWater * 2, new tuples are nacked immediately:
let totalCount () = buckets |> Array.sumBy (fun b -> b.Count)
if totalCount() > highWater * 2 then
Nack sid |> sendToSpout taskId
ID generation
Each task has its own Random instance for generating tuple and anchor IDs:
let seed = ref (int DateTime.Now.Ticks)
let mkIdGenerator() =
let rnd = Random(Threading.Interlocked.Increment &seed.contents)
let rec nextId () =
let v = rnd.NextInt64()
if v = 0L then nextId() // 0 would break XOR tracking
else v
nextId
Zero values are excluded because x ⊕ 0 = x — a zero tuple ID would be invisible to the XOR accumulator.
val int64: value: 'T -> int64 (requires member op_Explicit)
--------------------
type int64 = System.Int64
--------------------
type int64<'Measure> = int64
val string: value: 'T -> string
--------------------
type string = System.String
(+0 other overloads)
System.String.Split( separator: char array) : string array
(+0 other overloads)
System.String.Split(separator: string array, options: System.StringSplitOptions) : string array
(+0 other overloads)
System.String.Split(separator: string, ?options: System.StringSplitOptions) : string array
(+0 other overloads)
System.String.Split(separator: char array, options: System.StringSplitOptions) : string array
(+0 other overloads)
System.String.Split(separator: char array, count: int) : string array
(+0 other overloads)
System.String.Split(separator: char, ?options: System.StringSplitOptions) : string array
(+0 other overloads)
System.String.Split(separator: string array, count: int, options: System.StringSplitOptions) : string array
(+0 other overloads)
System.String.Split(separator: string, count: int, ?options: System.StringSplitOptions) : string array
(+0 other overloads)
System.String.Split(separator: char array, count: int, options: System.StringSplitOptions) : string array
(+0 other overloads)
val int: value: 'T -> int (requires member op_Explicit)
--------------------
type int = int32
--------------------
type int<'Measure> = int
module List from Microsoft.FSharp.Collections
--------------------
type List<'T> = | op_Nil | op_ColonColon of Head: 'T * Tail: 'T list interface IReadOnlyList<'T> interface IReadOnlyCollection<'T> interface IEnumerable interface IEnumerable<'T> member GetReverseIndex: rank: int * offset: int -> int member GetSlice: startIndex: int option * endIndex: int option -> 'T list static member Cons: head: 'T * tail: 'T list -> 'T list member Head: 'T member IsEmpty: bool member Item: index: int -> 'T with get ...
val ref: value: 'T -> 'T ref
--------------------
type 'T ref = Ref<'T>
FsShelter