FsShelter


Defining the schema

FsShelter uses F# discriminated unions to statically type streams:

// data schema for the topology, every case is a unique stream
type Schema = 
    | Sentence of string
    | Word of string
    | WordCount of string * int64

Defining unreliable spouts

FsShelter spouts can be implemented as "reliable" or "unreliable". Either implementation is a single function, returning an async option, where None indicates there's nothing to emit from this spout at this moment:

// sentences spout - feeds messages into the topology
let sentences source = source() |> Sentence |> Some
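
The option return is what lets a spout throttle itself. As a minimal sketch (hypothetical, not part of the sample), a spout draining an in-memory queue would return None once the queue is empty:

// hypothetical variant: drain an in-memory queue, None when there's nothing to emit right now
let sentencesFromQueue (queue : Collections.Concurrent.ConcurrentQueue<string>) = 
    match queue.TryDequeue() with
    | true, s -> Some (Sentence s)
    | _ -> None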

Defining bolts

A couple of examples of FsShelter bolts that read a tuple and emit another one:

// split bolt - consumes sentences and emits words
let splitIntoWords (input, emit) = 
    match input with
    | Sentence s -> s.Split([|' '|],StringSplitOptions.RemoveEmptyEntries) 
                    |> Seq.map Word 
                    |> Seq.iter emit
    | _ -> failwithf "unexpected input: %A" input

// count words bolt 
let countWords (input, increment, emit) = 
    match input with
    | Word word -> WordCount (word, increment word) |> emit
    | _ -> failwithf "unexpected input: %A" input

And a terminating bolt that reads a tuple, but doesn't emit anything:

// log word count - terminating bolt 
let logResult (log, input) = 
    match input with
    | WordCount (word,count) -> log (sprintf "%s: %d" word count)
    | _ -> failwithf "unexpected input: %A" input
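
Because these are plain functions over the schema DU, they can be exercised without Storm. A quick offline check (illustrative only), collecting the split bolt's output into a list:

// feed one sentence through the split bolt and capture what it emits
let collected = ResizeArray<Schema>()
splitIntoWords (Sentence "the cow jumped over the moon", fun t -> collected.Add t)
// collected now holds Word "the"; Word "cow"; Word "jumped"; ...
// the terminating bolt can be checked the same way, with a console logger
logResult (printfn "%s", WordCount ("cow", 1L))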

We will pass these implementations into the component functions when we put things together in a topology definition. But first, two small helpers: a random sentence source for the spout and a word counter for the count bolt:

let source = 
    let rnd = new System.Random()
    let sentences = [ "the cow jumped over the moon"
                      "an apple a day keeps the doctor away"
                      "four score and seven years ago"
                      "snow white and the seven dwarfs"
                      "i am at two with nature" ]

    fun () -> sentences.[ rnd.Next(0, sentences.Length) ]

// increment word count and return new value
let increment =
    let cache = Collections.Concurrent.ConcurrentDictionary<string,int64 ref>()
    fun word -> 
        let c = cache.GetOrAdd(word, ref 0L) 
        Threading.Interlocked.Increment &c.contents
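
Both helpers can be sanity-checked in F# Interactive before wiring them into the topology (purely illustrative):

source () |> printfn "random sentence: %s"
increment "storm" |> printfn "'storm' seen %d times"   // 1
increment "storm" |> printfn "'storm' seen %d times"   // 2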

Using the F# DSL to define a topology

A typical (event-streaming) Storm topology is a graph of spouts and bolts connected via streams that can be defined via a topology computation expression:

open FsShelter.DSL
#nowarn "25" // for stream grouping expressions

//define the storm topology
let sampleTopology = 
    topology "WordCount" { 
        let sentencesSpout = 
            sentences
            |> Spout.runUnreliable (fun log cfg -> source)  // make arguments: ignoring Storm logging and cfg, passing our source function
                                   ignore                   // no deactivation
            |> withParallelism 4
        
        let splitBolt = 
            splitIntoWords
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit)) // make arguments: pass incoming tuple and emit function
            |> withParallelism 4
        
        let countBolt = 
            countWords
            |> Bolt.run (fun log cfg tuple emit -> (tuple, increment, emit))
            |> withParallelism 4
        
        let logBolt = 
            logResult
            |> Bolt.run (fun log cfg ->                           // make arguments: pass PID-log and incoming tuple 
                            let mylog = Common.Logging.asyncLog (Diagnostics.Process.GetCurrentProcess().Id.ToString()+"_count")
                            fun tuple emit -> (mylog, tuple))
            |> withParallelism 2
        
        yield sentencesSpout --> splitBolt |> Shuffle.on Sentence               // emit from sentencesSpout to splitBolt on Sentence stream, shuffle among target task instances
        yield splitBolt --> countBolt |> Group.by (function Word w -> w)        // emit from splitBolt into countBolt on Word stream, group by word (into the same task instance)
        yield countBolt --> logBolt |> Group.by (function WordCount (w,_) -> w) // emit from countBolt into logBolt on WordCount stream, group by word value
    }

Here we define the graph by declaring the components and connecting them with arrows. The lambda arguments for the "run" functions provide the opportunity to construct the arguments that will be passed into the component functions, where:

  • log is the Storm log factory
  • cfg is the runtime configuration passed in from Storm
  • tuple is the instance of the schema DU coming in
  • emit is the function to emit another tuple

log and cfg are fixed (curried) once and, as demonstrated in the logBolt's mkArgs lambda, one-time initialization can be carried out by inserting arbitrary code before the tuple and emit arguments are taken. This initialization is only triggered if Storm actually requests task execution from this specific instance of the process.
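
For example, the same pattern could give each count-bolt task instance its own cache instead of the module-level one (a hypothetical variant, not part of the sample):

let countBoltWithLocalState = 
    countWords
    |> Bolt.run (fun log cfg -> 
                    // one-time setup: runs only if Storm assigns this component to this process,
                    // giving each task instance its own word-count cache
                    let cache = Collections.Concurrent.ConcurrentDictionary<string,int64 ref>()
                    let localIncrement word = 
                        let c = cache.GetOrAdd(word, ref 0L)
                        Threading.Interlocked.Increment &c.contents
                    // per-tuple: runs for every incoming Word
                    fun tuple emit -> (tuple, localIncrement, emit))
    |> withParallelism 4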

Submitting the topology for execution

Storm accepts JARs for code distribution, and FsShelter provides functions to package our assemblies and upload them to Nimbus. Once the code is uploaded, Storm needs to be told how to run it, and FsShelter has functions that convert the above representation into that of the Nimbus API. Storm then starts the supervising processes across the cluster and spins up a copy of our executable for each task instance in our topology. FsShelter's Task will perform the handshake that determines which component a given process instance has been assigned to execute:

sampleTopology
|> Task.ofTopology
|> Task.run ProtoIO.start // here we specify which serializer to use when talking to Storm

Then the execution will be handed over to one of the corresponding "dispatchers", which will handle the subsequent interaction between Storm and the component function.
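
One convenient way to wire this up (an assumption about the sample's entry point, consistent with the "graph" command used further below) is to dispatch on the command line, so the same executable can either run under Storm or export its graph:

[<EntryPoint>]
let main argv = 
    match argv with
    | [| "graph" |] -> 
        sampleTopology |> DotGraph.writeToConsole   // see "Exporting the topology graph" below
    | _ -> 
        sampleTopology
        |> Task.ofTopology
        |> Task.run ProtoIO.start                   // handshake with Storm and dispatch
    0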

Keep in mind that:

  • STDIN/STDOUT are reserved for communications with Storm and any IO the component is going to do has to go through some other channel (no console logging!).
  • Out of the box, Storm only supports JSON multilang. For faster IO, consider ProtoShell. The serializer JAR can be bundled along with the submitted topology or deployed into Storm's classpath beforehand.

Exporting the topology graph

FsShelter includes a completely customizable GraphViz (dot) export functionality. Here's what the word count topology looks like with default renderers:

(SVG: the WordCount topology rendered with the default renderers)

The dotted lines represent "unanchored" streams and the number inside the [] shows the parallelism hint. This was achieved by a simple export to the console:

sampleTopology |> DotGraph.writeToConsole

Followed by further conversion into a desired format, piping the markup into GraphViz:

dotnet samples/WordCount/bin/Release/netcoreapp2.1/WordCount.dll graph | dot -Tsvg -o build/WordCount.svg

It is also possible to generate graphs with colour-coded streams:

sampleTopology |> DotGraph.writeColourizedToConsole

(SVG: the WordCount topology with colour-coded streams)

Alternatively, you can provide your own X11 colour scheme:

let customColours = [| "purple"; "dodgerblue"; "springgreen"; "olivedrab"; "orange"; "orangered"; "maroon"; "black" |]
sampleTopology
|> DotGraph.exportToDot (DotGraph.writeHeader, DotGraph.writeFooter, DotGraph.writeSpout, DotGraph.writeBolt, DotGraph.writeColourfulStream <| DotGraph.getColour customColours) System.Console.Out

(SVG: the WordCount topology with the custom colour scheme)
