Word Count

This tutorial walks through a complete fire-and-forget word count topology. We'll define a schema, implement components, wire them together, and visualize the result. For background on the concepts used here, see Core Concepts.

Defining the schema

FsShelter uses F# discriminated unions to statically type streams:

open System
open FsShelter

// data schema for the topology, every case is a unique stream
type Schema = 
    | Sentence of string
    | Word of string
    | WordCount of string * int64
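Because every stream is an ordinary union case, tuples are built with the case constructors and taken apart with pattern matching. The sketch below repeats the schema so that it runs on its own; `streamName` is a hypothetical helper used only for illustration and is not part of FsShelter.

```fsharp
// Schema repeated from above so the snippet is self-contained.
type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64

// Hypothetical helper (not an FsShelter function): names the stream a tuple belongs to.
let streamName tuple =
    match tuple with
    | Sentence _ -> "Sentence"
    | Word _ -> "Word"
    | WordCount _ -> "WordCount"

// One tuple per stream, built with the union case constructors.
let tuples = [ Sentence "the cow jumped"; Word "cow"; WordCount ("cow", 1L) ]
tuples |> List.iter (fun t -> printfn "%s: %A" (streamName t) t)
```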

Defining unreliable spouts

FsShelter spouts can be implemented as "reliable" or "unreliable". Either implementation is a single function returning an option, where None indicates that the spout has nothing to emit at the moment:

// sentences spout - feeds messages into the topology
let sentences source = source() |> Sentence |> Some
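The `sentences` spout above always has something to emit. A source that can run dry would return `None` instead. The following sketch assumes a hypothetical in-memory queue as the source; `queuedSentences` is not taken from the FsShelter samples.

```fsharp
open System.Collections.Generic

// Schema repeated from above so the snippet is self-contained.
type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64

// Hypothetical spout over an in-memory queue:
// Some tuple while items remain, None once the queue is drained.
let queuedSentences (queue: Queue<string>) =
    if queue.Count > 0 then queue.Dequeue() |> Sentence |> Some
    else None

let queue = Queue<string>([ "the cow jumped over the moon" ])
let first = queuedSentences queue   // Some (Sentence "the cow jumped over the moon")
let second = queuedSentences queue  // None: nothing to emit right now
printfn "%A / %A" first second
```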

Defining bolts

A couple of examples of FsShelter bolts that read a tuple and emit another one:

// split bolt - consumes sentences and emits words
let splitIntoWords (input, emit) = 
    match input with
    | Sentence s -> s.Split([|' '|],StringSplitOptions.RemoveEmptyEntries) 
                    |> Seq.map Word 
                    |> Seq.iter emit
    | _ -> failwithf "unexpected input: %A" input

// count words bolt 
let countWords (input, increment, emit) = 
    match input with
    | Word word -> WordCount (word, increment word) |> emit
    | _ -> failwithf "unexpected input: %A" input

And a terminating bolt that reads a tuple, but doesn't emit anything:

// log word count - terminating bolt 
let logResult (log, input) = 
    match input with
    | WordCount (word,count) -> log (sprintf "%s: %d" word count)
    | _ -> failwithf "unexpected input: %A" input
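Since the bolts are plain functions over their arguments, they can be exercised without Storm by passing in an `emit` that collects tuples. A minimal sketch, repeating `splitIntoWords` from above; `collect` is a hypothetical test helper, not an FsShelter function.

```fsharp
open System

// Schema repeated from above so the snippet is self-contained.
type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64

// split bolt, repeated from above
let splitIntoWords (input, emit) =
    match input with
    | Sentence s -> s.Split([|' '|], StringSplitOptions.RemoveEmptyEntries)
                    |> Seq.map Word
                    |> Seq.iter emit
    | _ -> failwithf "unexpected input: %A" input

// Hypothetical helper: runs a bolt against one input and
// returns everything it emitted, in emission order.
let collect (bolt: Schema * (Schema -> unit) -> unit) input =
    let emitted = ResizeArray<Schema>()
    bolt (input, (fun t -> emitted.Add t))
    List.ofSeq emitted

// The double space is dropped by RemoveEmptyEntries.
let words = collect splitIntoWords (Sentence "the  cow jumped")
printfn "%A" words // [Word "the"; Word "cow"; Word "jumped"]
```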

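We will pass these implementations into the component functions when we put things together in a topology definition. The spout and the count bolt also depend on a sentence source and a counter, defined as follows:

// sentence source - returns a random sample sentence on each call
let source = 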
    let rnd = new System.Random()
    let sentences = [ "the cow jumped over the moon"
                      "an apple a day keeps the doctor away"
                      "four score and seven years ago"
                      "snow white and the seven dwarfs"
                      "i am at two with nature" ]

    fun () -> sentences.[ rnd.Next(0, sentences.Length) ]

// increment word count and return new value
let increment =
    let cache = Collections.Concurrent.ConcurrentDictionary<string,int64 ref>()
    fun word -> 
        let c = cache.GetOrAdd(word, ref 0L) 
        Threading.Interlocked.Increment &c.contents
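The counter keeps one shared `ref` cell per word and bumps it atomically, so repeated calls for the same word return 1, 2, 3 and so on. A quick check, repeating `increment` from above so that the snippet runs on its own:

```fsharp
open System

// increment word count and return new value (repeated from above)
let increment =
    let cache = Collections.Concurrent.ConcurrentDictionary<string,int64 ref>()
    fun word ->
        let c = cache.GetOrAdd(word, ref 0L)
        Threading.Interlocked.Increment &c.contents

// "the" is counted twice, "cow" once.
let counts = [ "the"; "cow"; "the" ] |> List.map increment
printfn "%A" counts // [1L; 1L; 2L]
```

Note that the cache lives in process memory, so each process keeps its own counts.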

Using F# DSL to define a topology

A typical (event-streaming) Storm topology is a graph of spouts and bolts connected via streams that can be defined via a topology computation expression:

open FsShelter.DSL
#nowarn "25" // for stream grouping expressions

// define the Storm topology
let sampleTopology = 
    topology "WordCount" { 
        let sentencesSpout = 
            sentences
            |> Spout.runUnreliable (fun log cfg -> source)  // make arguments: ignoring Storm logging and cfg, passing our source function
                                   ignore                   // no deactivation
            |> withParallelism 4
        
        let splitBolt = 
            splitIntoWords
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit)) // make arguments: pass incoming tuple and emit function
            |> withParallelism 4
        
        let countBolt = 
            countWords
            |> Bolt.run (fun log cfg tuple emit -> (tuple, increment, emit))
            |> withParallelism 4
        
        let logBolt = 
            logResult
            |> Bolt.run (fun log cfg ->                           // make arguments: pass PID-log and incoming tuple 
                            let mylog text = log LogLevel.Info text
                            fun tuple emit -> (mylog, tuple))
            |> withParallelism 2
        
        yield sentencesSpout --> splitBolt |> Shuffle.on Sentence               // emit from sentencesSpout to splitBolt on Sentence stream, shuffle among target task instances
        yield splitBolt --> countBolt |> Group.by (function Word w -> w)        // emit from splitBolt into countBolt on Word stream, group by word (into the same task instance)
        yield countBolt --> logBolt |> Group.by (function WordCount (w,_) -> w) // emit from countBolt into logBolt on WordCount stream, group by word value
    }

Here we define the graph by declaring the components and connecting them with arrows. The lambda passed to each "run" function (mkArgs) constructs the arguments that will be passed into the component function.

log and cfg are applied only once (curried), so, as the logBolt's mkArgs lambda demonstrates, one-time initialization can be carried out by inserting arbitrary code before the tuple and emit arguments. This initialization will not be triggered unless Storm actually requests execution of the task for this specific instance of the process.
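The effect of this currying can be seen without Storm. In the sketch below, `mkArgs` has the same shape as the lambda passed to `Bolt.run` for `logBolt`. Everything else is a stand-in assumed for illustration: `fakeLog`, the string-typed `cfg` and the manual invocation loop are not FsShelter APIs (the real `log` takes a `LogLevel` and the real `cfg` is a `Conf`).

```fsharp
// Counts how many times the initialization section runs.
let initCount = ref 0

// Same shape as the logBolt lambda: code placed before `fun tuple emit`
// runs once, when log and cfg are applied.
let mkArgs (log: string -> unit) (cfg: string) =
    initCount.Value <- initCount.Value + 1   // one-time initialization goes here
    let mylog text = log (sprintf "[%s] %s" cfg text)
    fun (tuple: string) (emit: string -> unit) -> (mylog, tuple)

// Stand-in for the runtime: apply log and cfg once,
// then reuse the resulting function for every incoming tuple.
let fakeLog (text: string) = printfn "%s" text
let perTuple = mkArgs fakeLog "task-1"

for t in [ "a"; "b"; "c" ] do
    let (log, tuple) = perTuple t ignore
    log tuple

printfn "initialized %d time(s)" initCount.Value // initialized 1 time(s)
```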

Deploying to Storm

To deploy to an Apache Storm cluster, the FsShelter.Multilang package provides functions to package assemblies, upload to Nimbus, and run the multilang handshake:

sampleTopology
|> Task.ofTopology
|> Task.run ProtoIO.start

Both JsonIO and ProtoIO serializers are provided. For faster IO, consider ProtoShell. Note that STDIN/STDOUT are reserved for Storm communications when running as a multilang component.

Exporting the topology graph

FsShelter includes completely customizable GraphViz (dot) export functionality. Here's what the word count topology looks like with the default renderers:

[Figure: WordCount topology graph rendered with the default renderers (SVG)]

The dotted lines represent "unanchored" streams, and the number inside the [] shows the parallelism hint. This was achieved by a simple export to the console:

sampleTopology |> DotGraph.writeToConsole

The markup can then be converted into the desired format by piping it into GraphViz:

dotnet samples/WordCount/bin/Release/net6.0/WordCount.dll graph | dot -Tsvg -o build/WordCount.svg
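The command above passes a `graph` argument to the sample executable. The sample's entry point is not shown here. One plausible shape, assuming the argument simply selects between exporting the graph and running as a Storm task, is sketched below with the two actions passed in as plain functions so that the snippet runs without FsShelter. The name `dispatch` is hypothetical.

```fsharp
// Hypothetical dispatcher: `exportGraph` and `runTask` stand in for
// `sampleTopology |> DotGraph.writeToConsole` and the Task.run pipeline shown earlier.
let dispatch (exportGraph: unit -> unit) (runTask: unit -> unit) (argv: string[]) =
    match List.ofArray argv with
    | "graph" :: _ ->
        exportGraph ()
        0
    | _ ->
        runTask ()
        0

// Stubbed usage; a real program would call dispatch from its [<EntryPoint>] function.
let calls = ResizeArray<string>()
dispatch (fun () -> calls.Add "graph") (fun () -> calls.Add "task") [| "graph" |] |> ignore
dispatch (fun () -> calls.Add "graph") (fun () -> calls.Add "task") [||] |> ignore
printfn "%A" (List.ofSeq calls) // ["graph"; "task"]
```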

It is also possible to generate graphs with colour-coded streams:

sampleTopology |> DotGraph.writeColourizedToConsole

[Figure: WordCount topology graph with colour-coded streams (SVG)]

Alternatively, you can provide your own X11 colour scheme:

let customColours = [| "purple"; "dodgerblue"; "springgreen"; "olivedrab"; "orange"; "orangered"; "maroon"; "black" |]
sampleTopology
|> DotGraph.exportToDot (DotGraph.writeHeader,
                         DotGraph.writeFooter,
                         DotGraph.writeSpout,
                         DotGraph.writeBolt,
                         DotGraph.writeColourfulStream <| DotGraph.getColour customColours)
                        System.Console.Out

[Figure: WordCount topology graph with the custom colour scheme (SVG)]
