Core Concepts
FsShelter is a library for building stream-processing pipelines in F#. A pipeline is a directed graph where data flows from sources through processors, each transforming or routing messages along the way.
You don't need to know anything about Apache Storm to use FsShelter — this page covers the core ideas from scratch.
What is stream processing?
Imagine a data pipeline: events arrive continuously (sensor readings, log entries, user clicks), get transformed, filtered, aggregated, then stored or forwarded. Instead of collecting events into a batch and processing them later, stream processing handles each event as it arrives — giving you low latency and the ability to react in real time.
A stream-processing topology is a directed acyclic graph (DAG) of processing steps. FsShelter lets you define these graphs with F# types and functions, then run them in-process with high-performance message passing.
Key terminology
| Term | What it means |
|---|---|
| Spout | A data source. Produces messages and feeds them into the pipeline. Example: reading from a message queue, generating random data, tailing a log file. |
| Bolt | A processor. Receives messages, transforms them, and optionally emits new messages downstream. Example: splitting sentences into words, counting occurrences, filtering by threshold. |
| Tuple | A single message flowing through the pipeline. In FsShelter, tuples are fully typed instances of your F# discriminated union. |
| Stream | A named, typed channel connecting components. Each case of your schema DU defines a distinct stream. |
| Topology | The complete graph: spouts, bolts, and the streams connecting them. |
| Schema | An F# discriminated union that defines all the message types in your topology. It provides static type safety across the entire pipeline. |
Topology schema
While most stream-processing systems use dynamically typed messages, FsShelter uses F# discriminated unions to statically type every stream. Mistakes and inconsistencies between producers and consumers are caught at compile time, not at runtime.
Every DU case becomes a distinct stream. The fields of each case become the tuple's payload:
type BasicSchema =
    | Original of int
    | Incremented of int
It is often handy to define a record type shared across streams:
type Number = { X:int; Desc:string }

type RecordSchema =
    | Original of int
    | Described of Number
    | Translated of Number
Joining tuples from multiple streams is also supported:
type RecordsSchema =
    | Original of Number
    | Doubled of Number * Number
Generic or nested schemas work too. You can define a base schema and extend it:
type SimpleSchema =
    | Value of int
    | Transformed of int

type NestedSchema<'a> =
    | Named of string
    | [<NestedStream>] Nested of 'a
A topology using Topology<NestedSchema<SimpleSchema>> combines both sets of streams.
For more about schema structure, grouping expressions, and serializer details, see Schema reference.
Components
FsShelter components are plain F# functions — no base classes, no interfaces, no framework inheritance.
A spout returns an option: Some tuple to emit, None when there's nothing to emit right now:
// numbers spout - produces a message
let numbers source = Some(BasicSchema.Original(source()))
A bolt receives a tuple and optionally emits downstream. Pattern matching on the schema DU gives you exhaustive, type-safe dispatch:
// add one bolt - transforms and emits
let addOne (input, emit) =
    match input with
    | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// terminating bolt - consumes without emitting
let logResult (info, input) =
    match input with
    | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input
The arguments to your component functions are wired up when you define the topology — you choose exactly what each function receives. There's no global state or magic injection.
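Because components are ordinary functions, they can be exercised without a running topology. The sketch below repeats the schema and components from this page and drives them by hand. The `emitted` and `logged` collections and the constant source are stand-ins invented for illustration; they are not part of FsShelter.

```fsharp
// Schema and components repeated from above so the snippet runs on its own.
type BasicSchema =
    | Original of int
    | Incremented of int

let numbers source = Some(BasicSchema.Original(source()))

let addOne (input, emit) =
    match input with
    | BasicSchema.Original(x) -> BasicSchema.Incremented(x + 1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

let logResult (info, input) =
    match input with
    | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
    | _ -> failwithf "unexpected input: %A" input

// Hypothetical stand-ins for what the runtime would normally supply.
let emitted = ResizeArray<BasicSchema>()
let logged = ResizeArray<string>()

// Drive the spout once with a constant source, then pass the result along.
match numbers (fun () -> 41) with
| Some tuple -> addOne (tuple, (fun t -> emitted.Add t))
| None -> ()

for t in emitted do
    logResult ((fun msg -> logged.Add msg), t)

// emitted now holds Incremented 42 and logged holds "42".
```

Since each function only sees the arguments it is handed, swapping the stand-ins for real runtime values is a matter of how the topology wires them up.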
Topology DSL
FsShelter provides a computation expression for defining topologies. You declare components and connect them with arrows:
- `-->` connects two components on a stream (unanchored: fire and forget)
- `==>` connects with anchoring (for reliable delivery, see below)
- `Shuffle.on` distributes tuples across instances by hashing
- `Group.by` routes tuples with the same key to the same instance (affinity)
- `withParallelism n` runs `n` instances of a component
Here's a minimal topology:
open System // Random lives in the System namespace

let source =
    let rnd = Random()
    fun () -> rnd.Next(0, 100)

open FsShelter.DSL
open FsShelter.Multilang

let sampleTopology =
    topology "Sample" {
        let s1 =
            numbers
            |> Spout.runUnreliable (fun log cfg -> source) ignore
        let b1 =
            addOne
            |> Bolt.run (fun log cfg tuple emit -> (tuple, emit))
            |> withParallelism 2
        let b2 =
            logResult
            |> Bolt.run (fun log cfg tuple emit -> ((log LogLevel.Info), tuple))
            |> withParallelism 2
        yield s1 --> b1 |> Shuffle.on BasicSchema.Original
        yield b1 --> b2 |> Shuffle.on BasicSchema.Incremented
    }
The lambda arguments for the run methods construct the arguments passed to your component functions:
- `log` is a logging factory
- `cfg` is the runtime configuration
- `tuple` is the incoming message (schema DU instance)
- `emit` is a function to emit a new tuple downstream
log and cfg are curried once at startup. The tuple and emit arguments arrive per-message.
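The affinity that `Group.by` provides can be pictured as hashing a key and taking the result modulo the number of instances. The following is a minimal sketch of that general idea, not FsShelter's routing code: `instanceFor` is a hypothetical helper, and the parallelism of 2 simply mirrors the sample topology.

```fsharp
// Hypothetical helper: picks an instance index for a key.
let instanceFor parallelism key =
    // Mask off the sign bit so the index is never negative.
    (hash key &&& 0x7FFFFFFF) % parallelism

let words = [ "apple"; "pear"; "apple"; "fig"; "pear" ]

// Pair every word with the instance index it would be routed to.
let assignments = words |> List.map (fun w -> w, instanceFor 2 w)

// Equal keys always land on the same index within one process.
// (.NET string hashes can differ between runs, so the actual
// indices are not stable across processes.)
let sameInstance =
    assignments
    |> List.groupBy fst
    |> List.forall (fun (_, hits) -> hits |> List.map snd |> List.distinct |> List.length = 1)
```

This is why key-based grouping suits stateful bolts such as counters: every occurrence of a key reaches the instance that holds its state.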
Reliable vs unreliable delivery
FsShelter supports two delivery modes:
Unreliable (fire and forget): Tuples are emitted and processed but not tracked. If a bolt fails, the message is lost. This is the simplest mode and has the lowest overhead. Use it when losing an occasional message is acceptable (metrics, logging, non-critical analytics).
Reliable (guaranteed delivery): Every tuple emitted by a spout is tracked through the entire DAG. When all downstream bolts finish processing, the spout receives an ack. If any bolt fails or processing takes too long, the spout receives a nack and can retry. This is implemented via an XOR-tree tracking algorithm (see Acker algorithm for details).
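The idea behind XOR-based tracking can be shown in a few lines. This is a simplified sketch of the general technique, as used by Apache Storm's acker, and not FsShelter's implementation; the `Tracker` type and the fixed ids are hypothetical.

```fsharp
// Illustrative only. Each tracked tuple has a 64-bit id (random in practice).
// The id is XORed into a running value once when the tuple is anchored and
// once when it is acked. Because x ^^^ x = 0, the value is back at 0 once
// every anchored id has also been acked.
type Tracker() =
    let mutable pending = 0L
    member _.Anchor(id: int64) = pending <- pending ^^^ id
    member _.Ack(id: int64) = pending <- pending ^^^ id
    member _.IsComplete = (pending = 0L)

let tracker = Tracker()

// Fixed ids keep the demo reproducible.
let rootId, childA, childB = 0x1F3AL, 0x77C2L, 0x0B59L

tracker.Anchor rootId                   // spout emits the root tuple
tracker.Anchor childA                   // a bolt emits two anchored children...
tracker.Anchor childB
tracker.Ack rootId                      // ...and acks the root
let beforeLeaves = tracker.IsComplete   // false: the children are still in flight

tracker.Ack childA
tracker.Ack childB
let afterLeaves = tracker.IsComplete    // true: everything anchored has been acked
```

The appeal of the scheme is constant memory per tracked spout tuple: one 64-bit value regardless of how many descendants the tuple produces.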
In the DSL, the arrow operator determines the mode:
- `==>` creates an anchored stream (starts or continues tracking)
- `-->` creates an unanchored stream (fire and forget)
See Word Count for an unreliable example and Guaranteed for a reliable example.
Running a topology
FsShelter topologies run entirely in-process — no external infrastructure required. The runtime uses Disruptor ring buffers for high-performance, low-latency message passing between components.
let shutdown = Hosting.run myTopology
This starts all components, wires up the message routing, and returns a function you call to shut down cleanly. For production use with auto-restart and logging, see Running Topologies.
If you need to deploy to an Apache Storm cluster for horizontal scale-out, the FsShelter.Multilang package provides Storm integration. See Word Count: Deploying to Storm for details.
Visualizing topologies
FsShelter can export any topology as a GraphViz DOT graph:
sampleTopology |> DotGraph.writeToConsole
Pipe the output through GraphViz's dot tool to generate SVG or PNG (dot selects the output format with -Tsvg or -Tpng).
Next steps
- *Word Count* — Complete tutorial: schema, components, topology, graph export
- *Guaranteed delivery* — Reliable spouts with ack/nack and anchoring
- *Schema reference* — Grouping expressions, flattening, serializer details
- *Running Topologies* — Entry points, configuration, diagnostics
- *Routing* — How tuples are distributed across component instances
- *Architecture* — Internal runtime structure: tasks, executors, channels
- *Message Flow* — End-to-end processing scenarios with sequence diagrams
- *Acker Algorithm* — XOR-tree tracking for guaranteed delivery