FsShelter


Overview

FsShelter is a library that lets you implement Apache Storm components and topologies in F#. FsShelter is a major rewrite of FsStorm. It departs from FsStorm in significant ways and therefore has been split into its own project.

Overall, the library provides a "batteries included" experience with wrappers for the Nimbus API as well as support for packaging and exporting:

  • Bundle and submit a topology for execution without needing JDK or Storm CLI
  • Includes Storm-side serializer
  • Kill a running topology
  • Generate a topology graph as part of your build

The topology and the components can be implemented in a single EXE project and are executed by Storm via its multilang protocol as separate processes, one for each task/instance. The corresponding ProtoShell Storm-side library facilitates Protobuf serialization, which improves the throughput of FsShelter topologies compared to the default JSON serialization. See the samples to learn how to bundle the assemblies and a serializer for upload to Storm.

Bring your own, if you need it:

  • command line parser
  • logging
  • custom serializer

Migrating to FsShelter 2.0

The largest change in this release is the switch to synchronous signatures for spout and bolt functions. Primarily, this was driven by the need to reduce the footprint of self-hosting, but also by a realization that things like back-pressure are much easier to implement correctly using synchronous primitives. Asynchrony can be easily added where needed at the topology level. Other breaking changes are:

  • Statically-typed configuration (for a small subset of properties we use all the time)
  • Modularized Bolt and Spout DSL
  • Activate/Deactivate implementation for spouts

Activation is now handled implicitly - the dispatcher will wait for an activation command before constructing the arguments for the spout function. Deactivation is now an explicit argument into the spout DSL - use ignore if you don't have any deactivation semantics to implement.

FsShelter topology schema

While Storm tuples are dynamically typed, and to a large extent the types are transparent to Storm itself, they are not type-less. Mistakes and inconsistencies between declared outputs and tuple consumers can easily lead to errors that are detectable only at run time and are frustrating to test for, detect and fix. FsShelter introduces the concept of a topology schema, defined as an F# discriminated union:

type BasicSchema = 
    | Original of int
    | Incremented of int

Every DU case becomes a distinct stream in the topology. The fields of each DU case will become tuple fields in Storm streams.

It is often handy to define a type that's shared across streams, and FsShelter supports defining cases with records:

type Number = { X:int; Desc:string }

type RecordSchema = 
    | Original of int
    | Described of Number
    | Translated of Number

It is also common to join/zip tuples from multiple streams, and FsShelter supports defining cases with records adjoined:

type RecordsSchema = 
    | Original of Number
    | Doubled of Number * Number

Other than the safety of working with a statically-verified schema, the reason we care about the structure of the tuples is that we reference them in Storm grouping definitions. FsShelter "flattens" the first immediate "layer" of the DU case so that all the fields, whether they come from the embedded record or the DU case itself, are available for grouping expressions.
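To make the idea of flattening more concrete, here is a minimal sketch in plain F# reflection. It is not FsShelter's implementation: the `flattenedFields` helper and the dotted naming of the fields are assumptions made purely for illustration. It lists the fields of each DU case and expands record-typed fields by one level only.

```fsharp
open Microsoft.FSharp.Reflection

type Number = { X: int; Desc: string }

type RecordsSchema =
    | Original of Number
    | Doubled of Number * Number

// Hypothetical helper (not part of FsShelter): lists the field names of every
// DU case, expanding record-typed fields by one level only.
let flattenedFields (schema: System.Type) =
    FSharpType.GetUnionCases(schema)
    |> Array.map (fun unionCase ->
        let names =
            unionCase.GetFields()
            |> Array.collect (fun field ->
                if FSharpType.IsRecord(field.PropertyType) then
                    // record field: expose its members instead of the record itself
                    FSharpType.GetRecordFields(field.PropertyType)
                    |> Array.map (fun r -> sprintf "%s.%s" field.Name r.Name)
                else
                    // primitive field: keep as-is
                    [| field.Name |])
        unionCase.Name, names)

// Print one line per stream with the fields that would be available for grouping
for (name, fields) in flattenedFields typeof<RecordsSchema> do
    printfn "%s: %s" name (String.concat ", " fields)
```

In this sketch the `Doubled` case ends up with four fields (two per adjoined record), which is the kind of flat view a grouping expression could select from.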

Generic or nested schemas are also supported. For example:

type BasicSchema = 
    | Original of int
    | Incremented of int

type NestedSchema<'a> = 
    | Named of string
    | Nested of 'a
    

where a topology can be defined with the signature: Topology<NestedSchema<BasicSchema>>. This can be useful for implementing a base topology and extending it using a nested set of streams. To group on nested streams, add the NestedStreamAttribute to the Nested case. Without this attribute, nested streams will be treated as blobs.

type NestedSchema<'a> = 
    | Named of string
    | [<NestedStream>] Nested of 'a
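As a plain-F# illustration of how a component might consume such a schema, the sketch below pattern matches on both the outer and the nested cases. The `describe` function is hypothetical, and the NestedStream attribute is left out so that the snippet compiles without a reference to FsShelter.

```fsharp
type BasicSchema =
    | Original of int
    | Incremented of int

type NestedSchema<'a> =
    | Named of string
    | Nested of 'a

// Hypothetical consumer: handles the outer stream and both nested streams
let describe (tuple: NestedSchema<BasicSchema>) =
    match tuple with
    | Named name -> sprintf "named: %s" name
    | Nested (Original x) -> sprintf "nested original: %d" x
    | Nested (Incremented x) -> sprintf "nested incremented: %d" x

printfn "%s" (describe (Nested (Original 1)))
printfn "%s" (describe (Named "base"))
```

Because the match is exhaustive, adding a case to either schema produces a compiler warning in every consumer that has not been updated.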

FsShelter components

Some of the flexibility of Storm has been hidden to provide a simple developer experience for authoring event-driven solutions. For example, FsShelter components are implemented as simple functions:

// numbers spout - produces messages
let numbers source = Some(Original(source()))

The spout function is expected to return Some tuple if there's a tuple to emit, or None if there's nothing to emit at this time.
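The None case can be sketched with a source that may run dry. This is a minimal, self-contained example in plain F#; the in-memory queue and the `tryNext` and `queuedNumbers` names are assumptions for illustration and are not part of FsShelter.

```fsharp
open System.Collections.Generic

type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical source: dequeues a number if one is available
let tryNext (queue: Queue<int>) () =
    if queue.Count > 0 then Some(queue.Dequeue()) else None

// Spout function: Some tuple when there is something to emit, None otherwise
let queuedNumbers source =
    source () |> Option.map Original

let queue = Queue<int>([ 1; 2 ])
let next () = queuedNumbers (tryNext queue)

// The first two calls yield tuples; the third finds the queue empty
printfn "%A" (next ())
printfn "%A" (next ())
printfn "%A" (next ())
```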

Bolts can get a tuple on any number of streams, and so we pattern match:

// add 1 bolt - consumes and emits messages to Incremented stream
let addOne (input, emit) = 
    match input with
    | BasicSchema.Original(x) -> Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

The bolt can also emit at any time, and we can hold on to the passed-in emit function (with caveats). Also, there can be as many arguments for the component functions as needed; the specifics will be determined when the components are put together in a topology.

// terminating bolt - consumes messages
let logResult (info, input) = 
    match input with
    | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input
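Since the components are ordinary functions, they can be exercised without Storm by supplying stand-ins for the arguments the topology would normally provide. The sketch below repeats the two bolts and calls them directly; the `emitted` and `logged` collections are hypothetical test doubles, not FsShelter types.

```fsharp
type BasicSchema =
    | Original of int
    | Incremented of int

let addOne (input, emit) =
    match input with
    | Original x -> Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

let logResult (info, input) =
    match input with
    | Incremented x -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input

// Stand-ins for the emit and logging functions supplied at the topology level
let emitted = ResizeArray<BasicSchema>()
let logged = ResizeArray<string>()

// Feed a tuple through addOne, then pass what it emitted on to logResult
addOne (Original 41, (fun t -> emitted.Add t))
logResult ((fun msg -> logged.Add msg), emitted.[0])

printfn "emitted: %A, logged: %A" (List.ofSeq emitted) (List.ofSeq logged)
```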

Using F# DSL to define the topology

Storm topology is a graph of spouts and bolts connected via streams. FsShelter provides an embedded DSL for defining the topologies, which allows for mixing and matching of native Java, external shell, and FsShelter components:

open System

// define our source dependency
let source = 
    let rnd = Random()
    fun () -> rnd.Next(0, 100)

open FsShelter.DSL
open FsShelter.Multilang

// define the Storm topology
let sampleTopology = 
    topology "Sample" { 
        let s1 = 
            numbers
            |> Spout.runUnreliable (fun log cfg -> source) // ignoring available Storm logging and cfg and passing our source function
                                   ignore                  // no deactivation
        let b1 = 
            addOne
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit)) // pass incoming tuple and emit function
            |> withParallelism 2 // override default parallelism of 1
        
        let b2 = 
            logResult
            |> Bolt.run (fun log cfg tuple emit -> ((log LogLevel.Info), tuple)) // example of passing Info-level Storm logger into the bolt
            |> withParallelism 2
        
        yield s1 --> b1 |> Shuffle.on BasicSchema.Original // emit from s1 to b1 on Original stream
        yield b1 --> b2 |> Shuffle.on Incremented // emit from b1 to b2 on Incremented stream
    }

Storm will start (a copy of) the same EXE for every component instance in the topology and will assign each instance the task it is supposed to execute.

The topology can be packaged with all its dependencies and submitted using the embedded Nimbus client; see the examples for details.

Exporting the topology graph in DOT format (GraphViz) using F# scripts

Once the number of components grows beyond a trivial number, it is often handy to be able to visualize them. FsShelter includes a simple way to export the topology into a graph:

sampleTopology |> DotGraph.writeToConsole

See the samples included for further details.

Samples & documentation

  • WordCount contains an "unreliable" spout example - emitted tuples do not require ack, and could be lost in case of failure.

  • Guaranteed contains a "reliable" spout example - emitted tuples have a unique ID and require ack.

  • API Reference contains automatically generated documentation for public types, modules and functions in the library.

Getting FsShelter

The FsShelter library can be installed from NuGet or MyGet:

PM> Install-Package FsShelter

The library can also be tried out quickly as a Docker container, downloaded from Docker Hub:

$ docker run --name fsshelter-samples -d -p 8080:8080 FsStorm/fsshelter-samples

Contributing and copyright

The project is hosted on GitHub where you can report issues, fork the project and submit pull requests. If you're adding a new public API, please also consider adding samples that can be turned into documentation. You might also want to read the library design notes to understand how it works.

The library is available under the Apache 2.0 license, which allows modification and redistribution for both commercial and non-commercial purposes. For more information see the License file in the GitHub repository.

Commercial support

Commercial training and support are available from the project sponsor: FsStorm
