FsStorm


Defining unreliable spouts

FsStorm spouts can be implemented as "reliable" or "unreliable". Unreliable spouts implementation can be as simple as a single function:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
let rnd = new System.Random() // used for generating random messages

///cfg: the configution passed in by storm
///runner: a spout runner function (passed in from topology)
let spout runner cfg = 
    //define the function that will return the emitter function
    //emit: a function that emits message to storm
    let createEmitter emit = fun () -> async { tuple [ rnd.Next(0, 100) ] |> emit } //the "next" function
    //run the spout
    createEmitter |> runner

Defining bolts

Example of a FsStorm bolt that reads a tuple and emits another one:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
///cfg: the configuration passed in from Storm
///runner: passed in from topology
///log: passed in from topology
///emit: passed in from topology
let addOneBolt runner log emit cfg = 
    //define the function that will return the consumer function
    let createAdder = 
        //accept/consume tuple function
        fun (msg : Json) -> 
            async { 
                log "msg" (sprintf "%A" msg)
                // note the tuple fields are accessible as Jval array items
                tuple [ msg?tuple.[0].ValI + 1 ] |> emit
            }
    //run the bolt
    createAdder |> runner

And a terminating bolt that reads a tuple, but doesn't emit anything:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
///cfg: the configution passed in by storm
///runner: passed in from topology
///log: passed in from topology
let resultBolt runner log cfg = 
    //define the function that will return the consumer function
    let createReader = 
        //accept messages function
        fun (msg : Json) -> async { log "x" (sprintf "%A" msg?tuple.[0].ValI) }
    //run the bolt
    createReader |> runner

Using F# DSL to define the topology

Topologies are defined using a strong-typed DSL:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
open StormDSL

let topology = 
    { TopologyName = "FstSample"
      Spouts = 
          [ { Id = "SimpleSpout"
              Outputs = [ Default [ "number" ] ]
              Spout = Local { Func = spout Storm.simpleSpoutRunner }
              Config = JsonNull
              Parallelism = 1 } ]
      Bolts = 
          [ { Id = "AddOneBolt"
              Outputs = [ Default [ "number" ] ]
              Inputs = [ DefaultStream "SimpleSpout", Shuffle ]
              Bolt = Local { Func = addOneBolt Storm.autoAckBoltRunner Logging.log Storm.emit}
              Config = JsonNull
              Parallelism = 2 }
            { Id = "ResultBolt"
              Outputs = []
              Inputs = [ DefaultStream "AddOneBolt", Shuffle ]
              Bolt = Local { Func = resultBolt Storm.autoAckBoltRunner Logging.log }
              Config = JsonNull
              Parallelism = 2 } ] }

Unit-testing the components

Several runners are implemented to facilitate unit-testing in StormTest module:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
open NUnit.Framework
open System
open StormTest

let conf = 
    { PidDir = ""
      TaskId = ""
      Json = jval "" }

[<Test>]
let ``spout emits``() = 
    let results = ref []
    let s = spout (simpleSpoutRunner (fun x -> results := x :: results.Value) [ next; next ]) conf
    Async.RunSynchronously s
    match results.Value with
    | x :: y :: [] -> ()
    | _ -> Assert.Fail "Must have emitted exactly two!"

[<Test>]
let ``bolt adds``() = 
    let results = ref []
    let b = 
        addOneBolt (autoAckBoltRunner [ tuple [ 123 ] ] Console.WriteLine) 
            (fun tag desc -> Console.WriteLine("{0}: {1}", tag, desc)) (fun t -> results := t :: results.Value) conf
    Async.RunSynchronously(b, 2000)
    match results.Value with
    | x :: [] -> Assert.AreEqual(124, x?tuple.[0].ValI)
    | _ -> Assert.Fail "Head, no tail, that's the deal!"

[<Test>]
let ``bolt consumes``() = 
    let results = ref []
    let b = 
        resultBolt (autoAckBoltRunner [ tuple [ 123 ] ] Console.WriteLine) 
            (fun tag desc -> results := (tag, desc) :: results.Value) conf
    Async.RunSynchronously(b, 2000)
    match results.Value with
    | x :: [] -> Assert.AreEqual(("x", sprintf "%A" 123), x)
    | _ -> Assert.Fail "Head, no tail, that's the deal!"
val rnd : System.Random

Full name: Sample.rnd
namespace System
Multiple items
type Random =
  new : unit -> Random + 1 overload
  member Next : unit -> int + 2 overloads
  member NextBytes : buffer:byte[] -> unit
  member NextDouble : unit -> float

Full name: System.Random

--------------------
System.Random() : unit
System.Random(Seed: int) : unit
val spout : runner:((('a -> unit) -> unit -> Async<unit>) -> 'b) -> cfg:'c -> 'b

Full name: Sample.spout


cfg: the configution passed in by storm
runner: a spout runner function (passed in from topology)
val runner : ((('a -> unit) -> unit -> Async<unit>) -> 'b)
val cfg : 'c
val createEmitter : (('d -> unit) -> unit -> Async<unit>)
val emit : ('d -> unit)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
System.Random.Next() : int
System.Random.Next(maxValue: int) : int
System.Random.Next(minValue: int, maxValue: int) : int
val addOneBolt : runner:(('a -> Async<unit>) -> 'b) -> log:(string -> string -> unit) -> emit:('c -> unit) -> cfg:'d -> 'b

Full name: Sample.addOneBolt


cfg: the configuration passed in from Storm
runner: passed in from topology
log: passed in from topology
emit: passed in from topology
val runner : (('a -> Async<unit>) -> 'b)
val log : (string -> string -> unit)
val emit : ('c -> unit)
val cfg : 'd
val createAdder : ('e -> Async<unit>)
val msg : 'e
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val resultBolt : runner:((obj -> Async<unit>) -> 'a) -> log:(string -> string -> unit) -> cfg:'b -> 'a

Full name: Sample.resultBolt


cfg: the configution passed in by storm
runner: passed in from topology
log: passed in from topology
val runner : ((obj -> Async<unit>) -> 'a)
val cfg : 'b
val createReader : (obj -> Async<unit>)
val msg : obj
val topology : obj

Full name: Sample.topology
val log : value:'T -> 'T (requires member Log)

Full name: Microsoft.FSharp.Core.Operators.log
val conf : obj

Full name: Sample.conf
val ( spout emits ) : unit -> unit

Full name: Sample.( spout emits )
val results : obj list ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val s : Async<unit>
property Ref.Value: obj list
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
val x : obj
val y : obj
val ( bolt adds ) : unit -> 'a

Full name: Sample.( bolt adds )
val b : Async<unit>
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.WriteLine() : unit
   (+0 other overloads)
Console.WriteLine(value: string) : unit
   (+0 other overloads)
Console.WriteLine(value: obj) : unit
   (+0 other overloads)
Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
Console.WriteLine(value: int64) : unit
   (+0 other overloads)
Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
Console.WriteLine(value: int) : unit
   (+0 other overloads)
Console.WriteLine(value: float32) : unit
   (+0 other overloads)
Console.WriteLine(value: float) : unit
   (+0 other overloads)
Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
val tag : string
val desc : string
val t : obj
val ( bolt consumes ) : unit -> 'a

Full name: Sample.( bolt consumes )
val results : (string * string) list ref
property Ref.Value: (string * string) list
val x : string * string
Fork me on GitHub