Core Concepts

FsShelter is a library for building stream-processing pipelines in F#. A pipeline is a directed graph where data flows from sources through processors, each transforming or routing messages along the way.

You don't need to know anything about Apache Storm to use FsShelter — this page covers the core ideas from scratch.

What is stream processing?

Imagine a data pipeline: events arrive continuously (sensor readings, log entries, user clicks), get transformed, filtered, aggregated, then stored or forwarded. Instead of collecting events into a batch and processing them later, stream processing handles each event as it arrives — giving you low latency and the ability to react in real time.

A stream-processing topology is a directed acyclic graph (DAG) of processing steps. FsShelter lets you define these graphs with F# types and functions, then run them in-process with high-performance message passing.

Key terminology

| Term | What it means |
| --- | --- |
| Spout | A data source. Produces messages and feeds them into the pipeline. Example: reading from a message queue, generating random data, tailing a log file. |
| Bolt | A processor. Receives messages, transforms them, and optionally emits new messages downstream. Example: splitting sentences into words, counting occurrences, filtering by threshold. |
| Tuple | A single message flowing through the pipeline. In FsShelter, tuples are instances of your F# discriminated union, so they are fully typed. |
| Stream | A named, typed channel connecting components. Each case of your schema DU defines a distinct stream. |
| Topology | The complete graph: spouts, bolts, and the streams connecting them. |
| Schema | An F# discriminated union that defines all the message types in your topology. It provides static type safety across the entire pipeline. |

Topology schema

While most stream-processing systems use dynamically typed messages, FsShelter uses F# discriminated unions to statically type every stream. Mistakes and inconsistencies between producers and consumers are caught at compile time, not at runtime.

Every DU case becomes a distinct stream. The fields of each case become the tuple's payload:

type BasicSchema = 
    | Original of int
    | Incremented of int
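
To illustrate what the static typing buys, here is a minimal sketch in plain F# with no FsShelter references. The `describe` function is a hypothetical consumer, not part of the library. Because the match lists every case, adding a new case to the schema makes the compiler flag any consumer that has not been updated (incomplete-match warning).

```fsharp
// The schema from above: each case is one stream, its field is the payload.
type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical consumer, for illustration only.
let describe (tuple: BasicSchema) : string =
    match tuple with
    | Original x -> sprintf "original %d" x
    | Incremented x -> sprintf "incremented %d" x

printfn "%s" (describe (Original 41))      // prints: original 41
printfn "%s" (describe (Incremented 42))   // prints: incremented 42
```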

It is often handy to define a record type shared across streams:

type Number = { X:int; Desc:string }

type RecordSchema = 
    | Original of int
    | Described of Number
    | Translated of Number

Joining tuples from multiple streams is also supported:

type RecordsSchema = 
    | Original of Number
    | Doubled of Number * Number
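
As a plain-F# illustration of this schema, the sketch below builds a `Doubled` tuple that carries two `Number` values at once. The `double` function and the sample values are assumptions made for the example, not FsShelter code.

```fsharp
type Number = { X: int; Desc: string }

type RecordsSchema =
    | Original of Number
    | Doubled of Number * Number

// Hypothetical transformation: pair the incoming number with its double.
let double (tuple: RecordsSchema) : RecordsSchema option =
    match tuple with
    | Original n -> Some (Doubled (n, { X = n.X * 2; Desc = "doubled " + n.Desc }))
    | Doubled _ -> None   // nothing to do for tuples that are already paired

let result = double (Original { X = 21; Desc = "answer" })
```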

Generic or nested schemas work too. You can define a base schema and extend it:

type SimpleSchema = 
    | Value of int
    | Transformed of int

type NestedSchema<'a> = 
    | Named of string
    | [<NestedStream>] Nested of 'a

A topology using Topology<NestedSchema<SimpleSchema>> combines both sets of streams. For more about schema structure, grouping expressions, and serializer details, see Schema reference.
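
The following sketch shows how a nested schema is pattern matched in plain F#. It omits the `[<NestedStream>]` attribute so that it compiles without FsShelter, and the `streamOf` helper with its string labels is purely illustrative: the labels are not FsShelter's actual stream identifiers.

```fsharp
type SimpleSchema =
    | Value of int
    | Transformed of int

// [<NestedStream>] is left out here so the snippet has no FsShelter dependency.
type NestedSchema<'a> =
    | Named of string
    | Nested of 'a

// Hypothetical helper: label the case a tuple belongs to.
let streamOf (tuple: NestedSchema<SimpleSchema>) : string =
    match tuple with
    | Named _ -> "Named"
    | Nested (Value _) -> "Nested/Value"
    | Nested (Transformed _) -> "Nested/Transformed"

let labels =
    [ Named "n"; Nested (Value 1); Nested (Transformed 2) ]
    |> List.map streamOf
```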

Components

FsShelter components are plain F# functions — no base classes, no interfaces, no framework inheritance.

A spout returns an option: Some tuple to emit, None when there's nothing to emit right now:

// numbers spout - produces a message
let numbers source = Some(BasicSchema.Original(source()))
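
The `numbers` spout always has something to emit. A spout that can run dry might look like the sketch below, which assumes a hypothetical in-memory `Queue<int>` as its source and returns `None` once the queue is empty.

```fsharp
open System.Collections.Generic

type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical spout: emit the next queued number, or None when the queue is empty.
let fromQueue (queue: Queue<int>) : BasicSchema option =
    if queue.Count > 0 then Some (Original (queue.Dequeue()))
    else None

let queue = Queue<int>([ 7 ])
let first = fromQueue queue    // the queued value, wrapped in Original
let second = fromQueue queue   // the queue is now empty
```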

A bolt receives a tuple and optionally emits downstream. Pattern matching on the schema DU gives you exhaustive, type-safe dispatch:

// add one bolt - transforms and emits
let addOne (input, emit) = 
    match input with
    | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// terminating bolt - consumes without emitting
let logResult (info, input) = 
    match input with
    | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input

The arguments to your component functions are wired up when you define the topology — you choose exactly what each function receives. There's no global state or magic injection.
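
Because components are ordinary functions, they can be exercised without any runtime. The sketch below repeats the two bolts so that it stands alone, then calls them with hand-made stand-ins for `emit` and `info`. The collecting lists are assumptions of the example.

```fsharp
type BasicSchema =
    | Original of int
    | Incremented of int

// The bolts from above, repeated so the snippet is self-contained.
let addOne (input, emit) =
    match input with
    | BasicSchema.Original x -> BasicSchema.Incremented (x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

let logResult (info, input) =
    match input with
    | BasicSchema.Incremented x -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input

// Stand-ins for what the runtime would supply: collect into mutable lists.
let emitted = ResizeArray<BasicSchema>()
let logged = ResizeArray<string>()

addOne (Original 1, (fun t -> emitted.Add t))       // emits Incremented 2
logResult ((fun msg -> logged.Add msg), emitted.[0]) // logs "2"
```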

Topology DSL

FsShelter provides a computation expression for defining topologies: you declare components and connect them with arrows. Here's a minimal topology:

open System
open FsShelter
open FsShelter.DSL
open FsShelter.Multilang

// random-number source handed to the numbers spout
let source = 
    let rnd = Random()
    fun () -> rnd.Next(0, 100)

let sampleTopology = 
    topology "Sample" { 
        let s1 = 
            numbers
            |> Spout.runUnreliable (fun log cfg -> source) ignore
        let b1 = 
            addOne
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit))
            |> withParallelism 2
        let b2 = 
            logResult
            |> Bolt.run (fun log cfg tuple emit -> ((log LogLevel.Info), tuple))
            |> withParallelism 2
        
        yield s1 --> b1 |> Shuffle.on BasicSchema.Original
        yield b1 --> b2 |> Shuffle.on BasicSchema.Incremented
    }

The lambdas passed to the run functions construct the arguments for your component functions: log and cfg are applied once at startup, while tuple and emit arrive with every message.
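
The split between startup-time and per-message arguments can be sketched in plain F#. The `runBolt` function below is a hypothetical stand-in written for this example, not FsShelter's `Bolt.run`. It only demonstrates the currying: the setup part runs once, the inner function runs for each message.

```fsharp
// Hypothetical stand-in for Bolt.run, NOT FsShelter's implementation.
let runBolt mkArgs consume (log: string -> unit) (cfg: Map<string, string>) =
    let perMessage = mkArgs log cfg                      // runs once, at startup
    fun tuple emit -> consume (perMessage tuple emit)    // runs for every message

let startups = ref 0                                     // counts how often setup runs

let mkArgs (log: string -> unit) (cfg: Map<string, string>) =
    startups.Value <- startups.Value + 1
    fun (tuple: int) (emit: int -> unit) -> (tuple, emit)

// Component function: receives exactly what mkArgs built.
let addTen (input: int, emit: int -> unit) = emit (input + 10)

let results = ResizeArray<int>()
let bolt = runBolt mkArgs addTen ignore Map.empty
bolt 1 (fun x -> results.Add x)
bolt 2 (fun x -> results.Add x)
```

After two messages, `results` holds both outputs while the setup counter is still 1.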

Reliable vs unreliable delivery

FsShelter supports two delivery modes:

Unreliable (fire and forget): Tuples are emitted and processed but not tracked. If a bolt fails, the message is lost. This is the simplest mode and has the lowest overhead. Use it when losing an occasional message is acceptable (metrics, logging, non-critical analytics).

Reliable (guaranteed delivery): Every tuple emitted by a spout is tracked through the entire DAG. When all downstream bolts finish processing, the spout receives an ack. If any bolt fails or processing takes too long, the spout receives a nack and can retry. This is implemented via an XOR-tree tracking algorithm (see Acker algorithm for details).
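
The XOR idea can be sketched in a few lines. This shows the general technique known from Apache Storm's acker, under the assumption that every tuple gets a 64-bit id. FsShelter's actual implementation is the one described on the Acker algorithm page and may differ. The `Tracker` type and the ids are made up for the example.

```fsharp
// Tracks one spout tuple. 'pending' is the XOR of every id that has been
// emitted but not yet acked; it returns to 0 once each id was seen twice.
type Tracker() =
    let mutable pending = 0L
    member this.Emitted (id: int64) = pending <- pending ^^^ id
    member this.Acked (id: int64) = pending <- pending ^^^ id
    member this.IsComplete = (pending = 0L)

let tracker = Tracker()
// Arbitrary ids for illustration; a real tracker would use random ones.
let rootId, childA, childB = 0x5A17L, 0x0BEEFL, 0x7C0DEL

tracker.Emitted rootId            // spout emits the root tuple
tracker.Emitted childA            // a bolt emits two anchored children...
tracker.Emitted childB
tracker.Acked rootId              // ...and acks the root
let midway = tracker.IsComplete   // children are still in flight
tracker.Acked childA
tracker.Acked childB
let finished = tracker.IsComplete // every id has been emitted and acked
```

The appeal of this scheme is constant memory per spout tuple: a single 64-bit value is enough no matter how many descendants the tuple has.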

In the DSL, the arrow operator determines the mode. The sample topology above pairs Spout.runUnreliable with the --> arrow.

See Word Count for an unreliable example and Guaranteed for a reliable example.

Running a topology

FsShelter topologies run entirely in-process — no external infrastructure required. The runtime uses Disruptor ring buffers for high-performance, low-latency message passing between components.

let shutdown = Hosting.run myTopology

This starts all components, wires up the message routing, and returns a function you call to shut down cleanly. For production use with auto-restart and logging, see Running Topologies.

If you need to deploy to an Apache Storm cluster for horizontal scale-out, the FsShelter.Multilang package provides Storm integration. See Word Count: Deploying to Storm for details.

Visualizing topologies

FsShelter can export any topology as a GraphViz DOT graph:

sampleTopology |> DotGraph.writeToConsole

Pipe the output through dot to generate SVG or PNG:

dotnet run -- graph | dot -Tsvg -o topology.svg
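
For readers unfamiliar with DOT, the sketch below renders a small edge list as a DOT digraph in plain F#. It is not the output of `DotGraph.writeToConsole`, whose exact format is not shown here. The `toDot` helper and the edge list (modelled on the sample topology) are assumptions of the example.

```fsharp
// Hypothetical edge list mirroring the sample topology: (from, to, stream).
let edges = [ "s1", "b1", "Original"; "b1", "b2", "Incremented" ]

// Render the edges as GraphViz DOT text, one labelled edge per line.
let toDot (name: string) (edges: (string * string * string) list) : string =
    let lines =
        edges
        |> List.map (fun (src, dst, stream) ->
            sprintf "  %s -> %s [label=\"%s\"];" src dst stream)
    String.concat "\n" ([ sprintf "digraph %s {" name ] @ lines @ [ "}" ])

printfn "%s" (toDot "Sample" edges)
```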
