Guaranteed Delivery
Reliable message processing ensures every tuple emitted by a spout is either fully processed through the entire DAG or retried on failure. This tutorial shows how to implement a spout backed by an external queue with ack/nack callbacks.
For background on how this works internally, see Message Flow and Acker Algorithm.
Defining reliable spouts
Reliable spouts need to obtain an event ID from the source and forward acks and nacks back to it, so the source knows whether to commit or re-enqueue each event. The obtained ID has to be returned along with the tuple from the spout function:
// data schema for the topology, every case is a unique stream
type Schema =
    | Original of int
    | Even of int
    | Odd of int

// numbers spout - produces messages
let numbers source =
    let (tupleId,number) = source()
    Some(tupleId, Original (number))

// add 1 bolt - consumes and emits messages to either Even or Odd stream
let addOne (input,emit) =
    match input with
    | Original x ->
        match x % 2 with
        | 1 -> Even (x+1)
        | _ -> Odd (x+1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// terminating bolt - consumes messages
let logResult (info,input) =
    match input with
    | Even x
    | Odd x -> info (sprintf "Got: %A" input)
    | _ -> failwithf "unexpected input: %A" input
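Because the spout and bolts above are plain F# functions, they can be exercised without Storm. The following is a minimal sketch under stated assumptions: `fakeSource` is a hypothetical stand-in for the external queue, string ids replace FsShelter's `TupleId`, and lambdas that append to `ResizeArray`s replace Storm's emit and logger.

```fsharp
// The schema and functions from above, repeated (lightly restructured) so this block runs on its own
type Schema =
    | Original of int
    | Even of int
    | Odd of int

// numbers spout: pulls an (id, number) pair from the source it is given
let numbers source =
    let (tupleId, number) = source ()
    Some(tupleId, Original number)

// add 1 bolt: odd x gives an even x+1 (Even stream), even x gives an odd x+1 (Odd stream)
let addOne (input, emit) =
    let result =
        match input with
        | Original x -> if x % 2 = 1 then Even(x + 1) else Odd(x + 1)
        | _ -> failwithf "unexpected input: %A" input
    emit result

// terminating bolt: hands a description of the tuple to the logger
let logResult (info, input) =
    match input with
    | Even _
    | Odd _ -> info (sprintf "Got: %A" input)
    | _ -> failwithf "unexpected input: %A" input

// Hypothetical stand-ins for the external queue, Storm's emit and the logger
let fakeSource () = ("id-1", 41)
let emitted = ResizeArray<Schema>()
let logged = ResizeArray<string>()

match numbers fakeSource with
| Some(id, tuple) ->
    addOne (tuple, (fun t -> emitted.Add t))
    for t in emitted do
        logResult ((fun line -> logged.Add line), t)
    printfn "%s: %A -> %A" id tuple (List.ofSeq logged)
| None -> ()
```

Since no FsShelter types are involved in this sketch, the same approach can be used to unit-test component logic before wiring it into a topology.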
Here we mimic an external source and implement all three possible cases: producing a new message, retrying a failed one (indefinitely), and acking a successfully processed one.
open FsShelter.Topology
open System
open System.Collections.Generic

type QueueCmd =
    | Get of AsyncReplyChannel<TupleId*int>
    | Ack of TupleId
    | Nack of TupleId

// faking an external source here
let source =
    let rnd = Random()
    let count = ref 0L
    let pending = Dictionary<TupleId,int>()
    let nextId() = Threading.Interlocked.Increment &count.contents
    MailboxProcessor.Start (fun inbox ->
        let rec loop nacked =
            async {
                let! cmd = inbox.Receive()
                return! loop <|
                    match cmd, nacked with
                    | Get rc, [] ->
                        // nothing to retry: produce a new message under a fresh id
                        let tupleId,number = Named(string(nextId())), rnd.Next(0, 100)
                        pending.Add(tupleId,number)
                        rc.Reply(tupleId,number)
                        []
                    | Get rc,(tupleId,number)::xs ->
                        // retry the most recently failed message under its original id
                        pending.Add(tupleId,number)
                        rc.Reply(tupleId,number)
                        xs
                    | Ack id, _ ->
                        // fully processed: forget it
                        pending.Remove id |> ignore
                        nacked
                    | Nack id, _ ->
                        // failed: move it from pending to the retry list, so that the
                        // pending.Add on redelivery does not hit a duplicate key
                        let number = pending.[id]
                        pending.Remove id |> ignore
                        (id,number)::nacked
            }
        loop [])
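The mailbox serializes three transitions over two pieces of state: the set of pending (delivered but unacknowledged) messages and the retry list. A minimal synchronous model of those transitions as a pure function, assuming string ids and a caller-supplied number generator in place of `TupleId` and `Random` (both simplifications for illustration), makes the retry behaviour easy to check:

```fsharp
// Hypothetical commands and state; string ids stand in for TupleId
type Cmd =
    | Get
    | Ack of string
    | Nack of string

type QueueState =
    { NextId: int
      Pending: Map<string, int>       // delivered, awaiting ack or nack
      Nacked: (string * int) list }   // failed, waiting to be redelivered

let empty = { NextId = 0; Pending = Map.empty; Nacked = [] }

// One transition: returns the new state and, for Get, the delivered (id, number)
let step (newNumber: unit -> int) (state: QueueState) cmd =
    match cmd, state.Nacked with
    | Get, [] ->
        // nothing to retry: deliver a new message under a fresh id
        let id = string (state.NextId + 1)
        let n = newNumber ()
        { state with NextId = state.NextId + 1; Pending = state.Pending.Add(id, n) }, Some(id, n)
    | Get, (id, n) :: rest ->
        // redeliver the most recently failed message under its original id
        { state with Pending = state.Pending.Add(id, n); Nacked = rest }, Some(id, n)
    | Ack id, _ ->
        // fully processed: forget it
        { state with Pending = state.Pending.Remove id }, None
    | Nack id, _ ->
        // failed: move it from Pending onto the retry list
        match state.Pending.TryFind id with
        | Some n -> { state with Pending = state.Pending.Remove id; Nacked = (id, n) :: state.Nacked }, None
        | None -> state, None

// Feed a list of commands through step, collecting every delivery in order
let run newNumber cmds =
    let folder (state, delivered) cmd =
        let next, d = step newNumber state cmd
        let delivered' =
            match d with
            | Some x -> delivered @ [ x ]
            | None -> delivered
        next, delivered'
    List.fold folder (empty, []) cmds

let finalState, deliveries = run (fun () -> 7) [ Get; Nack "1"; Get; Ack "1"; Get ]
printfn "deliveries = %A, pending = %A" deliveries finalState.Pending
```

Redelivery reuses the original id, which is what lets a later ack or nack for the retried message be matched to the right entry; only a `Get` with an empty retry list mints a new id.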
Anchoring
In order to provide processing guarantees, Storm needs to construct and track the state of the entire "tuple tree", which is built out by emitting "anchored" tuples. FsShelter implements anchoring statically: rather than being decided ad hoc by each component, it is a property of the stream leading to an emit. Consequently, whether an emit is anchored or unanchored is determined by the topology graph and is completely transparent to the bolt processing the tuple that serves as the anchor.
// define the storm topology
open FsShelter.DSL
#nowarn "25" // for stream matching expressions

let sampleTopology = topology "Guaranteed" {
    let s1 = numbers
             |> Spout.runReliable (fun log cfg () -> source.PostAndReply Get) // log and cfg are available but unused here
                                  (fun _ -> Ack >> source.Post, Nack >> source.Post)
                                  ignore // no deactivation
    let b1 = addOne
             |> Bolt.run (fun log cfg tuple emit -> (tuple,emit)) // pass incoming tuple and emit function
             |> withParallelism 2
    let b2 = logResult
             |> Bolt.run (fun log cfg ->
                              let mylog text = log LogLevel.Info text
                              fun tuple emit -> (mylog,tuple))
             |> withParallelism 1
    let b3 = logResult
             |> Bolt.run (fun log cfg ->
                              let mylog text = log LogLevel.Info text
                              fun tuple emit -> (mylog,tuple))
             |> withParallelism 1
    yield s1 ==> b1 |> Shuffle.on Original // emit from s1 to b1 on Original stream and anchor immediately following emits to this tuple
    yield b1 --> b2 |> Shuffle.on Odd // anchored emit from b1 to b2 on Odd stream
    yield b1 --> b3 |> Shuffle.on Even // anchored emit from b1 to b3 on Even stream
}
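The acker argument `(fun _ -> Ack >> source.Post, Nack >> source.Post)` composes each `QueueCmd` case constructor with `MailboxProcessor.Post`, producing fire-and-forget handlers of type `TupleId -> unit`. The same pattern in isolation, assuming a hypothetical agent that only counts acks and nacks, with string ids in place of `TupleId`:

```fsharp
// Hypothetical message type for a counting agent
type Msg =
    | Ack of string
    | Nack of string
    | Report of AsyncReplyChannel<int * int>

// Agent that tallies acknowledgements and replies with the totals on request
let agent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop (acks, nacks) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Ack _ -> return! loop (acks + 1, nacks)
                | Nack _ -> return! loop (acks, nacks + 1)
                | Report rc ->
                    rc.Reply(acks, nacks)
                    return! loop (acks, nacks)
            }
        loop (0, 0))

// Composing a union case with Post gives string -> unit handlers
let onAck, onNack = Ack >> agent.Post, Nack >> agent.Post

onAck "1"
onNack "2"
onAck "3"

// PostAndReply waits for the reply; the agent processes messages in order
let acks, nacks = agent.PostAndReply Report
printfn "acks = %d, nacks = %d" acks nacks
```

`Post` only enqueues the message and returns, so the handler does not wait for the agent; `PostAndReply` is used here just to read the tallies, and because the agent handles its mailbox in order, the reply reflects all three earlier posts.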
Resulting topology graph:
The solid lines represent "anchoring" streams and the dotted lines indicate the outer limits of the processing guarantees: a tuple emitted along a dotted line is only anchored if the line leading to it is solid.
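The rule above can be expressed as a small predicate over the graph. A minimal sketch, assuming a hypothetical `Edge` record where `Anchoring = true` stands for a solid `==>` line and `false` for a dotted `-->` one; it models the rule only, not FsShelter's internal representation:

```fsharp
// Hypothetical model: Anchoring = true is a solid line (==>), false a dotted one (-->)
type Edge =
    { From: string
      To: string
      Stream: string
      Anchoring: bool }

// The sample topology's three streams
let edges =
    [ { From = "s1"; To = "b1"; Stream = "Original"; Anchoring = true }  // s1 ==> b1
      { From = "b1"; To = "b2"; Stream = "Odd"; Anchoring = false }      // b1 --> b2
      { From = "b1"; To = "b3"; Stream = "Even"; Anchoring = false } ]   // b1 --> b3

// An emit along `edge` is anchored when the emitting component receives its input
// over a solid (anchoring) stream. Simplification: assumes one input stream per bolt;
// spouts have no input, so their emits are tree roots rather than anchored tuples.
let isAnchored (graph: Edge list) (edge: Edge) =
    graph |> List.exists (fun e -> e.To = edge.From && e.Anchoring)

for e in edges do
    printfn "%s -> %s (%s): anchored = %b" e.From e.To e.Stream (isAnchored edges e)
```

For the sample topology this marks `b1 -> b2` and `b1 -> b3` as anchored, because `b1` receives its input over the solid `s1 ==> b1` stream. With several input streams, the decision would be made per incoming tuple rather than per component.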
For a detailed walkthrough of the XOR-tree tracking that powers this, see Acker Algorithm. For end-to-end message flow scenarios including backpressure and timeouts, see Message Flow.
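The idea behind XOR-based tracking can be sketched in a few lines. As described in Storm's documentation for its acker, one 64-bit "ack val" is kept per spout tuple; every tuple id is XORed in once when the tuple is created and once more when it is acked, and since `x ^^^ x = 0` the tree is complete when the value returns to zero. This is a minimal sketch of that general technique; the `Dictionary`, the helper names and the seeded `Random` are illustrative assumptions, not FsShelter's implementation:

```fsharp
open System
open System.Collections.Generic

let rnd = Random(42)

// Hypothetical 64-bit tuple id generator (Storm also uses random 64-bit ids)
let newId () =
    let buf = Array.zeroCreate<byte> 8
    rnd.NextBytes buf
    BitConverter.ToInt64(buf, 0)

// ack val per spout tuple (keyed by root id): XOR of every id created or acked in its tree
let ackVals = Dictionary<int64, int64>()

let xorInto (root: int64) (id: int64) =
    let current =
        match ackVals.TryGetValue root with
        | true, v -> v
        | _ -> 0L
    ackVals.[root] <- current ^^^ id

// A spout emit creates the root tuple: its id enters the ack val once
let emitRoot () =
    let root = newId ()
    xorInto root root
    root

// A bolt acks `input` after emitting `children` anchored to it: each child id is
// XORed in (created) and the input id is XORed in a second time (acked)
let ackWithChildren root (input: int64) (children: int64 list) =
    xorInto root (List.fold (^^^) input children)

// The tree is complete once every id has been XORed in twice
let isComplete root = ackVals.[root] = 0L

let root = emitRoot ()
let child = newId ()
ackWithChildren root root [ child ]   // b1 acks the spout tuple after emitting one anchored child
let afterFirstAck = isComplete root   // the child is still outstanding
ackWithChildren root child []         // b2 acks the child and emits nothing
printfn "complete after first ack: %b, after second ack: %b" afterFirstAck (isComplete root)
```

Because XOR is commutative and associative, the order in which acks reach the acker does not matter; only the final value does.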