Core Concepts

FsShelter is a library for building stream-processing pipelines in F#. A pipeline is a directed graph where data flows from sources through processors, each transforming or routing messages along the way.

You don't need to know anything about Apache Storm to use FsShelter — this page covers the core ideas from scratch.

What is stream processing?

Imagine a data pipeline: events arrive continuously (sensor readings, log entries, user clicks), get transformed, filtered, aggregated, then stored or forwarded. Instead of collecting events into a batch and processing them later, stream processing handles each event as it arrives — giving you low latency and the ability to react in real time.

A stream-processing topology is a directed acyclic graph (DAG) of processing steps. FsShelter lets you define these graphs with F# types and functions, then run them in-process with high-performance message passing.
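To make "each event as it arrives" concrete without any library, here is a minimal plain-F# sketch, not FsShelter code. A hypothetical list of events stands in for a continuous source, and each event passes through a transform step and a filter step immediately, with no batch collected first.

```fsharp
// Plain F#, no FsShelter: each event is handled as soon as it is produced.
// The event values and both steps are made up for illustration.
let events = Seq.ofList [ 3; 8; 1; 12; 5 ]   // stand-in for a live source

let scale (x: int) = x * 10                  // transform step
let isLarge (x: int) = x > 40                // filter step

// Sink: a real pipeline would store or forward results instead.
let results = ResizeArray<int>()

for e in events do
    let y = scale e
    if isLarge y then results.Add y
```

Each step is an ordinary function, which is the same idea FsShelter builds on: the topology only decides how events move between such functions.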

Key terminology

Spout: A data source. It produces messages and feeds them into the pipeline. Examples: reading from a message queue, generating random data, tailing a log file.

Bolt: A processor. It receives messages, transforms them, and optionally emits new messages downstream. Examples: splitting sentences into words, counting occurrences, filtering by threshold.

Tuple: A single message flowing through the pipeline. In FsShelter, tuples are instances of your F# discriminated union, so they are fully typed.

Stream: A named, typed channel connecting components. Each case of your schema DU defines a distinct stream.

Topology: The complete graph of spouts, bolts, and the streams connecting them.

Schema: An F# discriminated union that defines all the message types in your topology, providing static type safety across the entire pipeline.

Topology schema

While most stream-processing systems use dynamically typed messages, FsShelter uses F# discriminated unions to statically type every stream. Mistakes and inconsistencies between producers and consumers are caught at compile time, not at runtime.

Every DU case becomes a distinct stream. The fields of each case become the tuple's payload:

type BasicSchema = 
    | Original of int
    | Incremented of int
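One way to see the case-to-stream correspondence is to list the union cases with standard F# reflection (FSharp.Reflection). This is only an illustration; it does not show how FsShelter itself names or identifies streams internally, and the helper names are hypothetical.

```fsharp
open Microsoft.FSharp.Reflection

type BasicSchema =
    | Original of int
    | Incremented of int

// One entry per union case, i.e. one per stream in a topology using this schema.
let streamNames =
    FSharpType.GetUnionCases(typeof<BasicSchema>)
    |> Array.map (fun case -> case.Name)

// The case (and therefore the stream) that a given tuple travels on.
let streamOf (t: BasicSchema) =
    let case, _fields = FSharpValue.GetUnionFields(t, typeof<BasicSchema>)
    case.Name
```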

It is often handy to define a record type shared across streams:

type Number = { X:int; Desc:string }

type RecordSchema = 
    | Original of int
    | Described of Number
    | Translated of Number
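A shared record keeps the payload shape consistent across streams. The sketch below is hypothetical bolt logic, written as a plain function in the (input, emit) shape used in the Components section: it turns an Original into a Described tuple and forwards a Described as a Translated one, reusing the same Number record. The descriptions themselves are made up.

```fsharp
type Number = { X: int; Desc: string }

type RecordSchema =
    | Original of int
    | Described of Number
    | Translated of Number

// Hypothetical bolt logic: the Number record flows on two different streams.
let describeOrTranslate (input: RecordSchema, emit: RecordSchema -> unit) =
    match input with
    | Original x ->
        emit (Described { X = x; Desc = if x % 2 = 0 then "even" else "odd" })
    | Described n ->
        emit (Translated { n with Desc = n.Desc.ToUpperInvariant() })
    | Translated _ -> () // end of this chain: nothing to emit
```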

Joining tuples from multiple streams is also supported:

type RecordsSchema = 
    | Original of Number
    | Doubled of Number * Number
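The Doubled case carries two Number values in one tuple. How the pairs get formed is up to your bolt; as one hypothetical example, the sketch below buffers an Original and emits a Doubled when the next Original arrives. It is plain F# and ignores parallelism: with several bolt instances, which tuples meet would depend on the grouping.

```fsharp
type Number = { X: int; Desc: string }

type RecordsSchema =
    | Original of Number
    | Doubled of Number * Number

// Hypothetical pairing bolt: holds one Original until a second one arrives,
// then emits both together as a single Doubled tuple.
let mkPairer () =
    let pending = ref None
    fun (input: RecordsSchema, emit: RecordsSchema -> unit) ->
        match input, pending.Value with
        | Original n, None -> pending.Value <- Some n
        | Original n, Some first ->
            pending.Value <- None
            emit (Doubled(first, n))
        | Doubled _, _ -> ()
```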

Generic or nested schemas work too. You can define a base schema and extend it:

type SimpleSchema = 
    | Value of int
    | Transformed of int

type NestedSchema<'a> = 
    | Named of string
    | [<NestedStream>] Nested of 'a

A topology using Topology<NestedSchema<SimpleSchema>> combines both sets of streams. For more about schema structure, grouping expressions, and serializer details, see Schema reference.
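Because the nested schema is an ordinary generic union, component code can pattern-match through both levels at once. In this plain-F# sketch the FsShelter-specific [<NestedStream>] attribute is left out so it runs without the package; the handler and its strings are hypothetical.

```fsharp
type SimpleSchema =
    | Value of int
    | Transformed of int

// Same shape as above, minus the [<NestedStream>] attribute (FsShelter-only).
type NestedSchema<'a> =
    | Named of string
    | Nested of 'a

// Hypothetical handler: one match covers the outer and the inner cases.
let describe (input: NestedSchema<SimpleSchema>) =
    match input with
    | Named name -> sprintf "named %s" name
    | Nested (Value x) -> sprintf "value %d" x
    | Nested (Transformed x) -> sprintf "transformed %d" x
```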

Components

FsShelter components are plain F# functions — no base classes, no interfaces, no framework inheritance.

A spout returns an option: Some tuple to emit, None when there's nothing to emit right now:

// numbers spout - produces a message
let numbers source = Some(BasicSchema.Original(source()))
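Returning None is how a spout signals "nothing right now". As a hypothetical variant of numbers, this sketch drains an in-memory ConcurrentQueue that some other part of the program would fill. It uses only the standard library, and the queue is an assumption standing in for a real message source.

```fsharp
open System.Collections.Concurrent

type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical spout: Some tuple when a value is waiting, None when the queue is empty.
let queued (queue: ConcurrentQueue<int>) : BasicSchema option =
    match queue.TryDequeue() with
    | true, x -> Some(BasicSchema.Original x)
    | false, _ -> None
```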

A bolt receives a tuple and optionally emits downstream. Pattern matching on the schema DU gives you exhaustive, type-safe dispatch:

// add one bolt - transforms and emits
let addOne (input, emit) = 
    match input with
    | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// terminating bolt - consumes without emitting
let logResult (info, input) = 
    match input with
    | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input

The arguments to your component functions are wired up when you define the topology — you choose exactly what each function receives. There's no global state or magic injection.
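Since components are plain functions, you can exercise them without a topology by supplying the arguments yourself. This sketch copies addOne and logResult from above and passes list-collecting test doubles for emit and info; the doubles are hypothetical helpers, not FsShelter APIs.

```fsharp
type BasicSchema =
    | Original of int
    | Incremented of int

// addOne and logResult, copied from above so the sketch runs on its own.
let addOne (input, emit) =
    match input with
    | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

let logResult (info, input) =
    match input with
    | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input

// Test doubles: collect emitted tuples and log lines instead of sending them anywhere.
let emitted = ResizeArray<BasicSchema>()
let logged = ResizeArray<string>()

addOne (BasicSchema.Original 41, fun t -> emitted.Add t)
logResult ((fun line -> logged.Add line), emitted.[0])
```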

Async components

When your spout or bolt needs to perform I/O (database queries, HTTP calls, file access), you can use the async variants. Instead of returning a value directly, async components return a Task:

open System.Threading.Tasks

// async spout - produces a message via async I/O
let asyncNumbers source : Task<BasicSchema option> =
    task {
        let value = source()
        return Some(BasicSchema.Original value)
    }

// async bolt - transforms via async I/O
let asyncAddOne (input, emit) : Task<unit> =
    task {
        match input with
        | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1) |> emit
        | _ -> failwithf "unexpected input: %A" input
    }

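The signatures differ in the return type: an async spout returns Task<'t option> rather than 't option (where 't is your schema type), and an async bolt returns Task<unit>.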

Async components run on async executors — the Disruptor thread awaits each Task, allowing non-blocking I/O without dedicating extra threads. All configuration (withParallelism, withExecutors, etc.) and delivery guarantees (reliable/unreliable) work identically.
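To see the async shapes in isolation, the sketch below pairs a hypothetical async spout with asyncAddOne from above. fetchReading is a made-up stand-in for a database or HTTP call (it only waits briefly with Task.Delay). Outside a topology the sketch blocks on .Result and .Wait() purely to check the results; inside a topology the runtime awaits the Task instead, as described above.

```fsharp
open System.Threading.Tasks

type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical I/O call standing in for a database or HTTP request.
let fetchReading () : Task<int> =
    task {
        do! Task.Delay 10
        return 41
    }

// Async spout: awaits the I/O instead of blocking a thread on it.
let asyncReadings () : Task<BasicSchema option> =
    task {
        let! value = fetchReading ()
        return Some(BasicSchema.Original value)
    }

// asyncAddOne, copied from above.
let asyncAddOne (input, emit) : Task<unit> =
    task {
        match input with
        | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1) |> emit
        | _ -> failwithf "unexpected input: %A" input
    }

// Standalone check only: block on the tasks to inspect what was produced.
let emitted = ResizeArray<BasicSchema>()
let produced = asyncReadings().Result
match produced with
| Some t -> (asyncAddOne (t, fun o -> emitted.Add o)).Wait()
| None -> ()
```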

Topology DSL

FsShelter provides a computation expression for defining topologies: you declare components inside it and connect them with arrow operators such as -->. Here's a minimal topology:

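open System
open FsShelter.DSL
open FsShelter.Multilang

// random-number source; the closure keeps a single Random instance
let source = 
    let rnd = Random()
    fun () -> rnd.Next(0, 100)

let sampleTopology = 
    topology "Sample" { 
        // unreliable spout: source is built once and passed to numbers on every call;
        // ignore is the deactivation callback (nothing to clean up)
        let s1 = 
            numbers
            |> Spout.runUnreliable (fun log cfg -> source) ignore
        // addOne receives (tuple, emit) per message; two instances run in parallel
        let b1 = 
            addOne
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit))
            |> withParallelism 2
        // logResult receives an Info-level logger and the incoming tuple
        let b2 = 
            logResult
            |> Bolt.run (fun log cfg tuple emit -> ((log LogLevel.Info), tuple))
            |> withParallelism 2
        
        // connect each stream using shuffle grouping
        yield s1 --> b1 |> Shuffle.on BasicSchema.Original
        yield b1 --> b2 |> Shuffle.on BasicSchema.Incremented
    }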

The lambda arguments for the run methods build the arguments passed to your component functions: log and cfg are applied once at startup, while tuple and emit arrive with each message.
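The split between one-time and per-message arguments is ordinary currying. This plain-F# sketch imitates it: mkArgs has the same curried shape as a Bolt.run lambda, the loop plays the role of the runtime, and Conf here is a hypothetical Map-based stand-in rather than FsShelter's configuration type. The one-time part runs once no matter how many messages follow.

```fsharp
// Hypothetical stand-in for the runtime configuration.
type Conf = Map<string, int>

// Counts how many times the one-time setup runs.
let setupCalls = ref 0

// Same curried shape as a Bolt.run lambda: log and cfg first, then tuple and emit.
let mkArgs (log: string -> unit) (cfg: Conf) =
    setupCalls.Value <- setupCalls.Value + 1
    let factor = cfg |> Map.tryFind "factor" |> Option.defaultValue 1
    log (sprintf "setup: factor = %d" factor)
    fun (tuple: int) (emit: int -> unit) -> (factor, tuple, emit)

// The component only sees the argument tuple built above.
let scale (factor: int, x: int, emit: int -> unit) = emit (factor * x)

// Simulated runtime: log and cfg are applied once, tuple and emit per message.
let perMessage = mkArgs (printfn "%s") (Map.ofList [ "factor", 10 ])
let out = ResizeArray<int>()
for x in [ 1; 2; 3 ] do
    scale (perMessage x (fun y -> out.Add y))
```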

The async variants have the same structure — just swap Spout.run* for Spout.run*Async and Bolt.run for Bolt.runAsync:

// Async spout (reliable); ack and nack stand for your own handlers (not shown)
let s1 = asyncNumbers
         |> Spout.runReliableAsync (fun log cfg -> source) (fun s -> ack, nack) ignore

// Async spout (unreliable)
let s1 = asyncNumbers
         |> Spout.runUnreliableAsync (fun log cfg -> source) ignore

// Async bolt (auto-ack on success)
let b1 = asyncAddOne
         |> Bolt.runAsync (fun log cfg tuple emit -> (tuple, emit))

// Async bolt (always nack, terminator); asyncLog stands for an async
// counterpart of logResult (not shown)
let b2 = asyncLog
         |> Bolt.runTerminatorAsync (fun log cfg tuple _ -> (log, tuple))

Sync and async components can be mixed freely in the same topology. The hosting runtime detects the component type and uses the appropriate executor.

Reliable vs unreliable delivery

FsShelter supports two delivery modes:

Unreliable (fire and forget): Tuples are emitted and processed but not tracked. If a bolt fails, the message is lost. This is the simplest mode and has the lowest overhead. Use it when losing an occasional message is acceptable (metrics, logging, non-critical analytics).

Reliable (guaranteed delivery): Every tuple emitted by a spout is tracked through the entire DAG. When all downstream bolts finish processing, the spout receives an ack. If any bolt fails or processing takes too long, the spout receives a nack and can retry. This is implemented via an XOR-tree tracking algorithm (see Acker algorithm for details).
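The XOR idea fits in a few lines. This is a simplified, hypothetical sketch of the general technique used by Storm-style ackers, not FsShelter's implementation: the spout's emit XORs in the root id, and every ack XORs in the acked id plus the ids of any children emitted anchored to it. Each id is XORed in exactly twice, so the checksum returns to zero once the whole tree is done. Real trackers use random 64-bit ids (which makes a premature zero improbable) and add timeouts; the fixed ids here just keep the example readable.

```fsharp
// Simplified XOR ack tracking for one spout tuple (illustration only).
type Tracker() =
    let mutable checksum = 0L

    // The spout emits a root tuple: its id enters the checksum.
    member _.Start(rootId: int64) = checksum <- rootId

    // A bolt finished a tuple: XOR in its id and the ids of the children
    // it emitted anchored to it.
    member _.Ack(ackedId: int64, childIds: int64 list) =
        checksum <- List.fold (^^^) (checksum ^^^ ackedId) childIds

    // Zero means every tuple in the tree has been acked.
    member _.Complete = checksum = 0L

let tracker = Tracker()
tracker.Start 0x1111L
tracker.Ack(0x1111L, [ 0x2222L; 0x4444L ]) // first bolt emits two children
let midway = tracker.Complete               // children are still pending
tracker.Ack(0x2222L, [])
tracker.Ack(0x4444L, [])
let finished = tracker.Complete
```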

In the DSL, the arrow operator used to connect components determines the mode.

See Word Count for an unreliable example and Guaranteed for a reliable example.

Running a topology

FsShelter topologies run entirely in-process — no external infrastructure required. The runtime uses Disruptor ring buffers for high-performance, low-latency message passing between components.

let shutdown = Hosting.run sampleTopology

This starts all components, wires up the message routing, and returns a function you call to shut down cleanly. For production use with auto-restart and logging, see Running Topologies.

If you need to deploy to an Apache Storm cluster for horizontal scale-out, the FsShelter.Multilang package provides Storm integration. See Word Count: Deploying to Storm for details.

Visualizing topologies

FsShelter can export any topology as a GraphViz DOT graph:

sampleTopology |> DotGraph.writeToConsole

Pipe the output through dot to generate SVG or PNG:

dotnet run -- graph | dot -Tsvg -o topology.svg
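For a sense of what the DOT format looks like, here is a hand-rolled plain-F# sketch that renders a digraph for the sample topology's edges. It is hypothetical and not DotGraph's implementation; the real output will differ in naming, layout, and attributes.

```fsharp
// Hypothetical edge list mirroring the sample topology: (from, to, stream).
let edges = [ "s1", "b1", "Original"; "b1", "b2", "Incremented" ]

// Render a minimal GraphViz digraph with one labelled edge per stream.
let toDot (name: string) (edges: (string * string * string) list) =
    let body =
        edges
        |> List.map (fun (src, dst, stream) -> sprintf "  %s -> %s [label=\"%s\"];" src dst stream)
    String.concat "\n" ([ sprintf "digraph %s {" name ] @ body @ [ "}" ])

printfn "%s" (toDot "Sample" edges)
```

The printed text is valid DOT, so it can be piped through dot in the same way as the command above.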
