Word Count
This tutorial walks through a complete fire-and-forget word count topology. We'll define a schema, implement components, wire them together, and visualize the result. For background on the concepts used here, see Core Concepts.
Defining the schema
FsShelter uses F# discriminated unions to statically type streams:
// data schema for the topology, every case is a unique stream
type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64
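To make the "one case per stream" idea concrete, the sketch below pattern-matches on the schema to recover which stream a tuple belongs to. The `streamName` helper is hypothetical and not part of FsShelter; it only illustrates that the compiler can check that every stream is handled.

```fsharp
// Same schema as above, repeated so the snippet is self-contained.
type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64

// Hypothetical helper (not an FsShelter API): name the stream a tuple travels on.
// The match is exhaustive, so adding a case to Schema makes the compiler flag this function.
let streamName tuple =
    match tuple with
    | Sentence _ -> "Sentence"
    | Word _ -> "Word"
    | WordCount _ -> "WordCount"

printfn "%s" (streamName (WordCount ("moon", 1L)))
```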
Defining unreliable spouts
FsShelter spouts can be implemented as "reliable" or "unreliable".
Either implementation is a single function that returns an option, where None indicates there is nothing to emit from this spout at the moment:
// sentences spout - feeds messages into the topology
let sentences source = source() |> Sentence |> Some
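The spout above always has something to emit. As a minimal sketch of the None case, the variant below drains an in-memory queue and returns None once it is empty. The queue-backed source and the name `sentencesFromQueue` are assumptions made for illustration, not FsShelter APIs.

```fsharp
open System.Collections.Generic

type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64

// Hypothetical source: an in-memory queue holding a single sentence.
let pending = Queue<string>([ "the cow jumped over the moon" ])

// Spout function: Some tuple while sentences are available, None otherwise.
let sentencesFromQueue (queue: Queue<string>) =
    if queue.Count > 0 then queue.Dequeue() |> Sentence |> Some
    else None

let first = sentencesFromQueue pending   // a Sentence wrapped in Some
let second = sentencesFromQueue pending  // None, the queue is now empty
```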
Defining bolts
A couple of examples of FsShelter bolts that read a tuple and emit another one:
open System // for StringSplitOptions

// split bolt - consumes sentences and emits words
let splitIntoWords (input, emit) =
    match input with
    | Sentence s -> s.Split([|' '|], StringSplitOptions.RemoveEmptyEntries)
                    |> Seq.map Word
                    |> Seq.iter emit
    | _ -> failwithf "unexpected input: %A" input

// count words bolt
let countWords (input, increment, emit) =
    match input with
    | Word word -> WordCount (word, increment word) |> emit
    | _ -> failwithf "unexpected input: %A" input
And a terminating bolt that reads a tuple, but doesn't emit anything:
// log word count - terminating bolt
let logResult (log, input) =
    match input with
    | WordCount (word, count) -> log (sprintf "%s: %d" word count)
    | _ -> failwithf "unexpected input: %A" input
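Because the bolts are plain functions over tuples, they can be exercised without Storm. The sketch below chains the three bolts by hand. The collecting `emit`, the dictionary-backed `increment` stub, and the list-backed `log` are test doubles assumed for illustration only.

```fsharp
open System
open System.Collections.Generic

type Schema =
    | Sentence of string
    | Word of string
    | WordCount of string * int64

// The three bolts from the text, repeated so the snippet runs on its own.
let splitIntoWords (input, emit) =
    match input with
    | Sentence s -> s.Split([|' '|], StringSplitOptions.RemoveEmptyEntries)
                    |> Seq.map Word
                    |> Seq.iter emit
    | _ -> failwithf "unexpected input: %A" input

let countWords (input, increment, emit) =
    match input with
    | Word word -> WordCount (word, increment word) |> emit
    | _ -> failwithf "unexpected input: %A" input

let logResult (log, input) =
    match input with
    | WordCount (word, count) -> log (sprintf "%s: %d" word count)
    | _ -> failwithf "unexpected input: %A" input

// Test doubles (assumptions, not FsShelter APIs).
let emitted = List<Schema>()
let emit tuple = emitted.Add tuple

let counts = Dictionary<string, int64>()
let increment word =
    let next = (match counts.TryGetValue word with | true, c -> c | _ -> 0L) + 1L
    counts.[word] <- next
    next

let logged = List<string>()
let log (text: string) = logged.Add text

// Drive one sentence through split -> count -> log by hand.
splitIntoWords (Sentence "the cow  the", emit)
let words = List.ofSeq emitted
emitted.Clear()
for w in words do countWords (w, increment, emit)
for c in emitted do logResult (log, c)

logged |> Seq.iter (printfn "%s")
```

The doubled space in the input is there to show that RemoveEmptyEntries drops empty words before they reach the counting bolt.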
We will pass these implementations into the component functions when we put things together in a topology definition.
// sentence source - returns a randomly chosen sentence on each call
let source =
    let rnd = new System.Random()
    let sentences = [ "the cow jumped over the moon"
                      "an apple a day keeps the doctor away"
                      "four score and seven years ago"
                      "snow white and the seven dwarfs"
                      "i am at two with nature" ]
    fun () -> sentences.[ rnd.Next(0, sentences.Length) ]

// increment word count and return new value
let increment =
    let cache = Collections.Concurrent.ConcurrentDictionary<string,int64 ref>()
    fun word ->
        let c = cache.GetOrAdd(word, ref 0L)
        Threading.Interlocked.Increment &c.contents
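The counter keeps its cache private inside a closure and relies on an atomic increment, so it can be shared by concurrent callers. The sketch below repeats the function and checks that behaviour. The parallel loop and the sample words are assumptions added for illustration.

```fsharp
open System
open System.Threading.Tasks

// Same counter as above, repeated so the snippet is self-contained.
let increment =
    let cache = Collections.Concurrent.ConcurrentDictionary<string,int64 ref>()
    fun word ->
        let c = cache.GetOrAdd(word, ref 0L)
        Threading.Interlocked.Increment &c.contents

let first = increment "moon"    // first sighting of "moon"
let second = increment "moon"   // running count for "moon" grows
let other = increment "cow"     // each word has its own counter

// 1000 concurrent increments of a single key; Interlocked keeps the count exact.
Parallel.For(0, 1000, fun (_: int) -> increment "the" |> ignore) |> ignore
let afterParallel = increment "the"

printfn "%d %d %d %d" first second other afterParallel
```

Note that the cache lives in the process that runs the bolt, so each task instance counts only the words routed to it.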
Using F# DSL to define a topology
A typical (event-streaming) Storm topology is a graph of spouts and bolts connected via streams that can be defined via a topology computation expression:
open FsShelter.DSL
#nowarn "25" // for stream grouping expressions
//define the storm topology
let sampleTopology =
    topology "WordCount" {
        let sentencesSpout =
            sentences
            |> Spout.runUnreliable (fun log cfg -> source) // make arguments: ignoring Storm logging and cfg, passing our source function
                                   ignore // no deactivation
            |> withParallelism 4
        let splitBolt =
            splitIntoWords
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit)) // make arguments: pass incoming tuple and emit function
            |> withParallelism 4
        let countBolt =
            countWords
            |> Bolt.run (fun log cfg tuple emit -> (tuple, increment, emit))
            |> withParallelism 4
        let logBolt =
            logResult
            |> Bolt.run (fun log cfg -> // make arguments: pass PID-log and incoming tuple
                            let mylog text = log LogLevel.Info text
                            fun tuple emit -> (mylog, tuple))
            |> withParallelism 2
        yield sentencesSpout --> splitBolt |> Shuffle.on Sentence // emit from sentencesSpout to splitBolt on Sentence stream, shuffle among target task instances
        yield splitBolt --> countBolt |> Group.by (function Word w -> w) // emit from splitBolt into countBolt on Word stream, group by word (into the same task instance)
        yield countBolt --> logBolt |> Group.by (function WordCount (w,_) -> w) // emit from countBolt into logBolt on WordCount stream, group by word value
    }
Here we define the graph by declaring the components and connecting them with arrows. The lambda arguments for the "run" methods provide the opportunity to carry out construction of the arguments that will be passed into the component functions, where:
- log is the Storm log factory
- cfg is the runtime configuration passed in from Storm
- tuple is the instance of the schema DU coming in
- emit is the function to emit another tuple
log and cfg are fixed once (curried) and, as demonstrated in logBolt's mkArgs lambda, one-time initialization can be carried out by inserting arbitrary code before the tuple and emit arguments.
This initialization will not be triggered unless the task execution is actually requested by Storm for this specific instance of the process.
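The effect of currying can be seen in isolation with a small stand-in for the "run" functions. The sketch below is not FsShelter's implementation: `runBolt`, the string-typed tuples, the simplified `log` signature, and the "prefix" configuration key are all hypothetical. It only shows that code placed before `fun tuple emit` executes once, while the inner lambda executes per tuple.

```fsharp
// Hypothetical stand-in for a "run" function (not FsShelter's implementation):
// applies mkArgs to log and cfg once, then reuses the result for every tuple.
let runBolt mkArgs consume (log: string -> unit) (cfg: Map<string, string>) =
    let mkArgsForTuple = mkArgs log cfg   // the one-time part runs here
    fun tuple emit -> consume (mkArgsForTuple tuple emit)

// Counts how many times the one-time initialization executes.
let initCount = ref 0

// Same shape as logBolt's mkArgs: initialization first, per-tuple lambda last.
let mkArgs (log: string -> unit) (cfg: Map<string, string>) =
    initCount.Value <- initCount.Value + 1
    let prefix = defaultArg (cfg.TryFind "prefix") ""   // hypothetical config key
    let mylog text = log (prefix + text)
    fun (tuple: string) (emit: string -> unit) -> (mylog, tuple)

// Component function: receives whatever mkArgs built for this tuple.
let consume (log: string -> unit, input: string) = log input

let logged = ResizeArray<string>()
let bolt = runBolt mkArgs consume logged.Add (Map.ofList [ "prefix", "[task] " ])

bolt "a" ignore
bolt "b" ignore

printfn "initialized %d time(s), logged %A" initCount.Value (List.ofSeq logged)
```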
Deploying to Storm
To deploy to an Apache Storm cluster, the FsShelter.Multilang package provides functions to package assemblies, upload to Nimbus, and run the multilang handshake:
sampleTopology
|> Task.ofTopology
|> Task.run ProtoIO.start
Both JsonIO and ProtoIO serializers are provided. For faster IO, consider ProtoShell. Note that STDIN/STDOUT are reserved for Storm communications when running as a multilang component.
Exporting the topology graph
FsShelter includes fully customizable GraphViz (dot) export. Here's what the word count topology looks like with default renderers:
The dotted lines represent "unanchored" streams and the number inside the [] shows the parallelism hint.
This was achieved by a simple export to console:
sampleTopology |> DotGraph.writeToConsole
Followed by further conversion into a desired format, piping the markup into GraphViz:
|
It is also possible to generate graphs with colour-coded streams:
sampleTopology |> DotGraph.writeColourizedToConsole
Alternatively, you can provide your own X11 colour scheme:
let customColours = [| "purple"; "dodgerblue"; "springgreen"; "olivedrab"; "orange"; "orangered"; "maroon"; "black" |]
sampleTopology
|> DotGraph.exportToDot (DotGraph.writeHeader, DotGraph.writeFooter, DotGraph.writeSpout, DotGraph.writeBolt, DotGraph.writeColourfulStream <| DotGraph.getColour customColours) System.Console.Out