FsShelter

Guaranteed Delivery

Reliable message processing ensures every tuple emitted by a spout is either fully processed through the entire DAG or retried on failure. This tutorial shows how to implement a spout backed by an external queue with ack/nack callbacks.

For background on how this works internally, see Message Flow and Acker Algorithm.

Defining reliable spouts

Reliable spouts need to obtain an event ID from the source and forward acks/nacks back to it, so the source knows whether to commit or re-enqueue the event. The obtained ID has to be passed along with the tuple returned from the spout function:

// data schema for the topology, every case is a unique stream
type Schema = 
    | Original of int
    | Even of int
    | Odd of int

// numbers spout - produces messages
let numbers source =
    let (tupleId,number) = source()
    Some(tupleId, Original (number)) 

// add 1 bolt - consumes and emits messages to either Even or Odd stream
let addOne (input,emit) =
    match input with
    | Original x -> 
        match x % 2 with
        | 1 -> Even (x+1)
        | _ -> Odd (x+1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// terminating bolt - consumes messages
let logResult (info,input) =
    match input with
    | Even x
    | Odd x -> info (sprintf "Got: %A" input)
    | _ -> failwithf "unexpected input: %A" input
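`numbers` and `addOne` are plain functions, so you can exercise the ID-pairing contract without a running topology. The sketch below repeats the definitions so it runs on its own. `fakeSource` and its string ID are hypothetical stand-ins for the external queue and FsShelter's `TupleId`.

```fsharp
// Definitions repeated from above so this block is self-contained.
type Schema =
    | Original of int
    | Even of int
    | Odd of int

let numbers source =
    let (tupleId, number) = source ()
    Some(tupleId, Original number)

let addOne (input, emit) =
    match input with
    | Original x ->
        (match x % 2 with
         | 1 -> Even (x + 1)
         | _ -> Odd (x + 1))
        |> emit
    | _ -> failwithf "unexpected input: %A" input

// Hypothetical source: always hands out the same event ID and number.
let fakeSource () = ("evt-1", 3)

// The spout passes the source's ID through untouched, paired with the tuple.
let spouted = numbers fakeSource

// Drive the bolt by hand, collecting whatever it emits.
let emitted = ResizeArray<Schema>()
match spouted with
| Some (_, tuple) -> addOne (tuple, fun t -> emitted.Add t)
| None -> ()
```

Since 3 is odd, the bolt emits `Even 4`. In the real topology FsShelter supplies `emit`, and the ID is what later comes back through the ack/nack handlers.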

Here we mimic an external source and implement all three possible cases: produce a new message, retry a failed one (indefinitely), and ack a successfully processed one.

open System
open System.Collections.Generic
open FsShelter.Topology

type QueueCmd =
    | Get of AsyncReplyChannel<TupleId*int>
    | Ack of TupleId
    | Nack of TupleId

// faking an external source here
let source = 
    let rnd = Random()
    let count = ref 0L
    let pending = Dictionary<TupleId,int>()
    let nextId() = Threading.Interlocked.Increment &count.contents

    MailboxProcessor.Start (fun inbox -> 
        let rec loop nacked = 
            async { 
                let! cmd = inbox.Receive()
                return! loop <|
                       match cmd, nacked with
                       | Get rc, [] ->
                            let tupleId,number = Named(string(nextId())), rnd.Next(0, 100)
                            pending.Add(tupleId,number)
                            rc.Reply(tupleId,number)
                            []
                       | Get rc,(tupleId,number)::xs ->
                            pending.Add(tupleId,number)
                            rc.Reply (tupleId,number)
                            xs
                       | Ack id, _ -> 
                            pending.Remove id |> ignore
                            nacked
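                       | Nack id, _ -> 
                            // move the message from in-flight to the retry list, so the
                            // redelivering Get can re-add it; ignore unknown or settled ids
                            match pending.TryGetValue id with
                            | true, number ->
                                pending.Remove id |> ignore
                                (id,number)::nacked
                            | _ -> nacked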
            }
        loop [])
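You can also write the queue's three cases as pure state transitions, which makes the retry behaviour easy to check without a mailbox. This is a sketch and not part of FsShelter. `TupleId` here is a hypothetical one-case stand-in for `FsShelter.Topology.TupleId`, and an immutable `Map` replaces the `Dictionary`.

```fsharp
// Hypothetical stand-in for FsShelter.Topology.TupleId so the block runs on its own.
type TupleId = Named of string

// In-flight messages, nacked messages awaiting redelivery, and the last issued id.
type QueueState =
    { Pending: Map<TupleId, int>
      Nacked: (TupleId * int) list
      LastId: int64 }

// Get: redeliver a nacked message if there is one, otherwise produce a new one.
let get (produce: unit -> int) (state: QueueState) =
    match state.Nacked with
    | (id, n) :: rest ->
        (id, n), { state with Pending = state.Pending.Add(id, n); Nacked = rest }
    | [] ->
        let next = state.LastId + 1L
        let id, n = Named (string next), produce ()
        (id, n), { state with Pending = state.Pending.Add(id, n); LastId = next }

// Ack: the message is done, forget it.
let ack id (state: QueueState) = { state with Pending = state.Pending.Remove id }

// Nack: move the message from in-flight to the retry list (unknown ids are ignored).
let nack id (state: QueueState) =
    match state.Pending.TryFind id with
    | Some n -> { state with Pending = state.Pending.Remove id; Nacked = (id, n) :: state.Nacked }
    | None -> state

let q0 = { Pending = Map.empty; Nacked = []; LastId = 0L }
let m1, q1 = get (fun () -> 42) q0     // new message
let q2 = nack (fst m1) q1              // processing failed
let m2, q3 = get (fun () -> 7) q2      // the failed message is redelivered
let q4 = ack (fst m2) q3               // this time it succeeds
```

Here the caller threads the state through each call. In the original, the `MailboxProcessor` owns the state and processes one command at a time.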

Anchoring

To provide processing guarantees, Storm needs to construct and track the state of the entire "tuple tree", which is built by emitting "anchored" tuples. FsShelter implements anchoring statically. Rather than each component deciding ad hoc, anchoring is a property of the stream leading into the component that emits. Consequently, whether an emit is anchored or unanchored is determined by the topology graph and is completely transparent to the bolt whose input tuple serves as the anchor.
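To make "transparent to the bolt" concrete, here is a simplified model; it does not reflect FsShelter's internals. The runtime builds the `emit` function from the graph, based on whether the input arrived on an anchoring stream, and hands it to an unchanged bolt body. `Emitted`, `mkEmit` and `timesTwo` are hypothetical names.

```fsharp
// What the runtime records for each emitted tuple in this simplified model.
type Emitted<'t> = { Tuple: 't; Anchor: int64 option }

// Chosen from the topology graph, not by the bolt: if the input arrived on an
// anchoring stream, every emit is anchored to the input tuple's id.
let mkEmit (inputIsAnchoring: bool) (inputId: int64) (sink: Emitted<'t> -> unit) : 't -> unit =
    if inputIsAnchoring then
        fun t -> sink { Tuple = t; Anchor = Some inputId }
    else
        fun t -> sink { Tuple = t; Anchor = None }

// A bolt body: it only calls emit and never mentions anchoring.
let timesTwo (x: int, emit: int -> unit) = emit (x * 2)

let out = ResizeArray<Emitted<int>>()
timesTwo (5, mkEmit true 101L (fun e -> out.Add e))   // input came over an anchoring stream (like s1 ==> b1)
timesTwo (5, mkEmit false 102L (fun e -> out.Add e))  // input came over a plain stream (like b1 --> b2)
```

The bolt code is identical in both calls. Only the `emit` it receives differs, and that choice comes from the stream, not the component.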

//define the storm topology
open FsShelter.DSL

#nowarn "25" // for stream matching expressions
let sampleTopology = topology "Guaranteed" {
    let s1 = numbers
             |> Spout.runReliable (fun log cfg () -> source.PostAndReply Get)  // args: fetch the next (id, number); log and cfg are unused
                                  (fun _ -> Ack >> source.Post, Nack >> source.Post) // forward acks/nacks back to the source
                                  ignore                                       // no deactivation
    let b1 = addOne
             |> Bolt.run (fun log cfg tuple emit -> (tuple,emit)) // pass incoming tuple and emit function
             |> withParallelism 2
    
    let b2 = logResult
             |> Bolt.run (fun log cfg ->
                            let mylog text = log LogLevel.Info text
                            fun tuple emit -> (mylog,tuple))
             |> withParallelism 1

    let b3 = logResult
             |> Bolt.run (fun log cfg -> 
                            let mylog text = log LogLevel.Info text
                            fun tuple emit -> (mylog,tuple))
             |> withParallelism 1

    yield s1 ==> b1 |> Shuffle.on Original  // emit from s1 to b1 on the Original stream; b1's emits are anchored to this tuple
    yield b1 --> b2 |> Shuffle.on Odd       // anchored emit from b1 to b2 on the Odd stream
    yield b1 --> b3 |> Shuffle.on Even      // anchored emit from b1 to b3 on the Even stream
}

Resulting topology graph:

[Figure: topology graph of sampleTopology]

The solid lines represent "anchoring" streams, and the dotted lines mark the outer limits of the processing guarantees. A tuple emitted along a dotted line is anchored only if the stream leading into the emitting component is solid.
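You can apply the solid/dotted rule mechanically to the sample topology. This is a rough sketch that makes two assumptions. First, each component has a single input stream. Second, tuples emitted by a reliable spout are always tracked as roots. The `Edge` type is a hypothetical description of the graph, not an FsShelter type.

```fsharp
// Hypothetical description of sampleTopology's streams.
type Edge = { From: string; To: string; Anchoring: bool }

let edges =
    [ { From = "s1"; To = "b1"; Anchoring = true }    // s1 ==> b1 (solid)
      { From = "b1"; To = "b2"; Anchoring = false }   // b1 --> b2 (dotted)
      { From = "b1"; To = "b3"; Anchoring = false } ] // b1 --> b3 (dotted)

let spouts = set [ "s1" ]

// Components whose emits are anchored: their input arrives on a solid edge.
let anchoredEmitters =
    edges |> List.filter (fun e -> e.Anchoring) |> List.map (fun e -> e.To) |> set

// A tuple sent along an edge joins the tracked tuple tree if it comes from
// a reliable spout or from a component whose emits are anchored.
let tracked (e: Edge) = spouts.Contains e.From || anchoredEmitters.Contains e.From

let trackedEdges = edges |> List.filter tracked |> List.map (fun e -> e.From, e.To)
```

So b2 and b3 still receive tracked tuples, but anything they emitted would fall outside the guarantee. That boundary is where the dotted lines end.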

For a detailed walkthrough of the XOR-tree tracking that powers this, see Acker Algorithm. For end-to-end message flow scenarios including backpressure and timeouts, see Message Flow.
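As a rough intuition for the XOR tracking, the sketch below follows one spout tuple through the sample topology using a single ledger value. It is a simplified model of the Storm-style scheme, not FsShelter's acker. The small fixed ids are hypothetical and chosen for readability. Storm draws random 64-bit ids so that accidental cancellation is vanishingly unlikely.

```fsharp
// Each report XORs into the ledger kept for one spout tuple.
let report (ledger: int64) (value: int64) = ledger ^^^ value

// Hypothetical ids: the root tuple from s1, and the single tuple b1 emits
// (on either the Even or the Odd stream) anchored to it.
let rootId, childId = 1L, 2L

let trace =
    [ rootId               // s1 emits the root tuple
      rootId ^^^ childId   // b1 acks the root and reports its one anchored child
      childId ]            // b2 or b3 acks the child
    |> List.scan report 0L

// The tree is fully processed when the ledger returns to zero.
let complete = List.last trace = 0L
```

If the child were never acked, the ledger would stay non-zero. In Storm's model the tuple would then eventually time out and be failed back to the spout.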