FsStorm


Defining reliable spouts

Processing guarantees are the biggest selling point of Storm; please see the official docs for the details. FsStorm implements reliability semantics with "housekeeper" functions: defaultHousekeeper can be used as-is for transient sources, or as a starting point for implementing reliability over external/persistent sources. The spout implementation is fairly similar to the "unreliable" version, with the addition of a unique (int64) tuple ID:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
/// Shared random-number source used for generating random messages.
let rnd = System.Random()

///cfg: the configuration passed in by Storm
///runner: a spout runner function (passed in from topology)
let spout runner (cfg:Configuration) = 
    // last tuple id handed out; bumped atomically for every emitted tuple
    let lastId = ref 0L
    // builds the "next" function from the emit function supplied by the runner;
    // each call emits one random number in [0, 100) tagged with a fresh int64 id
    let makeNext emit () =
        async {
            let payload = tuple [ rnd.Next(0, 100) ]
            let tupleId = Threading.Interlocked.Increment(&lastId.contents)
            emit tupleId payload
        }
    // hand the "next" factory to the runner, which drives the spout
    runner makeNext
	

Anchoring and named streams

FsStorm has helper functions to emit to a named stream or to anchor a tuple:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
///cfg: the configuration passed in from Storm
///runner: passed in from topology
///emit: passed in from topology
let addOneBolt runner emit cfg = 
    // picks the named output stream by parity of the value
    let streamFor n = if n % 2 = 0 then "even" else "odd"
    // consumer: adds one to the first tuple field and emits the result
    // to the matching named stream, anchored to the incoming message so the
    // whole tuple tree must be processed before the spout is ack'ed
    let add (msg : Json) =
        async {
            let incremented = msg?tuple.[0].ValI + 1
            tuple [ incremented ]
            |> namedStream (streamFor incremented)
            |> anchor msg
            |> emit
        }
    // run the bolt
    runner add

Example of parameterization and of inspecting the tuple's origin (the component that emitted it and the stream it arrived on):

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
///cfg: the configuration passed in by Storm
///runner: passed in from topology
///log: log write
let resultBolt runner log (cfg:Configuration) = 
    // "desc" is the custom property passed in with the submitted topology Config
    let desc = cfg.Json?conf?desc.Val
    // consumer: logs the tuple's origin (emitting component and stream) and its first field
    let logResult (msg : Json) =
        async {
            let origin = msg?comp.Val
            let stream = msg?stream.Val
            let value = msg?tuple.[0].ValI
            sprintf "origin: %A(%A), data: %A" origin stream value
            |> log desc
        }
    // run the bolt
    runner logResult

Topology with named streams and config overrides

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
open StormDSL

/// Guaranteed-processing topology:
/// ReliableSpout -> AddOneBolt -> EvenResultBolt / OddResultBolt.
/// AddOneBolt splits its output over the "even" and "odd" named streams, each
/// consumed by its own resultBolt instance configured through a custom "desc" property.
let topology = 
    { TopologyName = "FstGuaranteed"
      Spouts = 
          [ { Id = "ReliableSpout"
              Outputs = [ Default [ "number" ] ]
              Spout = Local { Func = spout (Storm.reliableSpoutRunner Storm.defaultHousekeeper) }
              Config = jval [ "topology.max.spout.pending", jval 123 ] // override "backpressure"
              Parallelism = 1 } ]
      Bolts = 
          [ { Id = "AddOneBolt"
              Outputs = [ Named("even", [ "number" ]) // named stream "even"
                          Named("odd", [ "number" ]) ]
              Inputs = [ DefaultStream "ReliableSpout", Shuffle ]
              Bolt = Local { Func = addOneBolt Storm.autoAckBoltRunner Storm.emit }
              Config = JsonNull
              Parallelism = 2 }
            { Id = "EvenResultBolt"
              Outputs = []
              Inputs = [ Stream("AddOneBolt","even"), Shuffle ]
              // resultBolt takes (runner, log, cfg): the log function must be supplied here,
              // otherwise Func would still be waiting for `log` instead of the configuration
              Bolt = Local { Func = resultBolt Storm.autoAckBoltRunner Logging.log }
              Config = jval ["desc", "even"] // pass custom config property to the component
              Parallelism = 1 }
            { Id = "OddResultBolt"
              Outputs = []
              Inputs = [ Stream("AddOneBolt","odd"), Shuffle ]
              // logs to custom (FsLogging) pid-based log file
              Bolt = Local { Func = resultBolt Storm.autoAckBoltRunner Logging.log }
              Config = jval ["desc", "odd"] // pass custom config property to the component
              Parallelism = 1 } ] }
val rnd : System.Random

Full name: Guaranteed.rnd
namespace System
Multiple items
type Random =
  new : unit -> Random + 1 overload
  member Next : unit -> int + 2 overloads
  member NextBytes : buffer:byte[] -> unit
  member NextDouble : unit -> float

Full name: System.Random

--------------------
System.Random() : unit
System.Random(Seed: int) : unit
val spout : runner:((('a -> 'b -> unit) -> unit -> Async<unit>) -> 'c) -> cfg:'d -> 'c

Full name: Guaranteed.spout


cfg: the configuration passed in by storm
runner: a spout runner function (passed in from topology)
val runner : ((('a -> 'b -> unit) -> unit -> Async<unit>) -> 'c)
val cfg : 'a
val count : int64 ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val next : (('a -> 'b -> unit) -> unit -> Async<unit>)
val emit : ('a -> 'b -> unit)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
System.Random.Next() : int
System.Random.Next(maxValue: int) : int
System.Random.Next(minValue: int, maxValue: int) : int
Ref.contents: int64
val addOneBolt : runner:((obj -> Async<unit>) -> 'a) -> emit:('b -> unit) -> cfg:'c -> 'a

Full name: Guaranteed.addOneBolt


cfg: the configuration passed in from Storm
runner: passed in from topology
emit: passed in from topology
val runner : ((obj -> Async<unit>) -> 'a)
val emit : ('a -> unit)
val add : (obj -> Async<unit>)
val msg : obj
val x : int
val resultBolt : runner:((obj -> Async<unit>) -> 'a) -> log:('b -> string -> unit) -> cfg:'c -> 'a

Full name: Guaranteed.resultBolt


cfg: the configuration passed in by Storm
runner: passed in from topology
log: log write
val log : ('a -> string -> unit)
val desc : 'a
val logResult : (obj -> Async<unit>)
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val topology : 'a

Full name: Guaranteed.topology
val log : value:'T -> 'T (requires member Log)

Full name: Microsoft.FSharp.Core.Operators.log
Fork me on GitHub