Running Topologies
We call running an FsShelter topology entirely inside a .NET process "self-hosting". The feature simulates a Storm cluster and can be used for testing, or for production deployments that do not need Storm's scale-out capability. The 2.0 release of FsShelter reimplements this feature on Disruptor queues, the same technology Storm uses, greatly improving performance and further reducing overhead.
Limitations:
- only FsShelter components can be self-hosted, no shell or Java components;
- message passing is in-process only, so a self-hosted topology can't scale out.
Benefits:
- self-hosted F# components perform on par with native JVM components under Storm, while taking as little as 30MB and ~20 threads;
- no Storm, ZooKeeper or even JRE is required, and the same processing guarantees are delivered, including tuple timeouts.
Start small then scale out to run on a cluster, with no changes to your code! Use the same building blocks that run in the cloud... anywhere!
For details refer to the Guaranteed sample, which demonstrates how to run the topology either way.
See also:
- Architecture — Internal runtime structure: tasks, executors, channels, topology construction, and lifecycle.
- Message flow — End-to-end processing scenarios with diagrams: reliable/unreliable delivery, backpressure, timeouts, failures, and restarts.
- Acker algorithm — XOR-tree tuple tracking: how anchoring, acking, and nacking work to provide guaranteed message processing.
- Routing — Tuple routing between components: grouping strategies (Shuffle, Fields, All, Direct) and stream-based fan-out.
Entry points
Hosting.run
Simplest entry point. No logging, with auto-restart (delegates to runWith). Returns a shutdown function:
let run topology = runWith (fun _ _ -> ignore) topology
Hosting.runWith (auto-restart with backoff)
Production entry point. Accepts a logging factory and auto-restarts on unhandled exceptions with exponential backoff and a maximum restart count:
let runWith (startLog: int -> Log) (topology: Topology<'t>) =
    let maxRestarts = 5
    let mutable restartCount = 0
    let rec restart (ex: exn) =
        if restartCount >= maxRestarts then
            // give up after maxRestarts
        else
            restartCount <- restartCount + 1
            let backoffMs = min (1000 * (1 <<< restartCount)) 30000
            Thread.Sleep backoffMs
            stopAndShutdown(); start()
    and start () =
        let r = topology |> RuntimeTopology.ofTopology startLog restart
        restartCount <- 0 // reset on successful start
        // activate...
    and stopAndShutdown () = ...
    start()
    stopAndShutdown // returns the shutdown function
The restart function is passed as the onException handler to every Disruptor channel. When any component throws, the topology is torn down and rebuilt with increasing delay between attempts.
Key details:
- Monitor.TryEnter prevents concurrent restart attempts: if a restart is already in progress, additional exceptions are logged as warnings and ignored
- Exponential backoff: min(1000 * 2^attempt, 30000) milliseconds between attempts
- restartCount resets to 0 after each successful start
- After maxRestarts (5) consecutive failures, the topology stays down
- runNoRestart uses Environment.Exit(1) instead, delegating restart to the process supervisor
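The backoff formula and the restart guard can be illustrated in isolation. This is a minimal sketch, not the FsShelter source: `backoffMs`, `schedule`, `restartLock` and `tryRestart` are hypothetical names, and the attempt number is assumed to be 1-based, as in the listing where `restartCount` is incremented before the shift.

```fsharp
open System.Threading

// Values quoted in the text above.
let maxRestarts = 5
let ceilingMs = 30000

/// Delay in milliseconds before restart attempt `attempt` (1-based).
let backoffMs (attempt: int) =
    min (1000 * (1 <<< attempt)) ceilingMs

// Delays for attempts 1..maxRestarts.
let schedule = [ for attempt in 1 .. maxRestarts -> backoffMs attempt ]
printfn "%A" schedule // prints [2000; 4000; 8000; 16000; 30000]

// Hypothetical guard: run a restart only if no other thread holds the lock.
let restartLock = obj ()

let tryRestart (doRestart: unit -> unit) =
    if Monitor.TryEnter(restartLock) then
        try
            doRestart ()
            true
        finally
            Monitor.Exit(restartLock)
    else
        false // a restart is already in progress; the caller can log and move on
```

Under these assumptions the five delays add up to 60 seconds before the topology stays down.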
Diagnostics
Self-hosted topologies emit diagnostics via standard .NET APIs:
OpenTelemetry integration
The FsShelter.Hosting namespace exposes an ActivitySource and a Meter for distributed tracing and metrics collection.
Tracing (ActivitySource: "FsShelter.Hosting"):
| Activity | Kind | Tags | Description |
|---|---|---|---|
| | Internal | — | Wraps each Disruptor event; propagates parent context via |
| | Producer | | Wraps spout dispatch (Next, Ack/Nack handling) |
| | Consumer | | Wraps bolt dispatch (Tuple processing) |
Metrics (Meter: "FsShelter.Hosting"):
| Instrument | Type | Unit | Description |
|---|---|---|---|
| | Counter | — | Tuples emitted by spouts |
| | Counter | — | Tuples acked (XOR tree completed) |
| | Counter | — | Tuples nacked (explicit failure) |
| | Counter | — | Tuples expired by acker bucket rotation |
| | Histogram | ms | Per-message processing time (tagged by |
| | UpDownCounter | — | Current number of unacked spout tuples |
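One way to collect this telemetry is the OpenTelemetry .NET SDK. The F# script below is a sketch under assumptions: it relies on the OpenTelemetry and OpenTelemetry.Exporter.Console NuGet packages, which this page does not mention, and the only name taken from the text is "FsShelter.Hosting". The console exporter is chosen purely for illustration.

```fsharp
#r "nuget: OpenTelemetry"
#r "nuget: OpenTelemetry.Exporter.Console"

open OpenTelemetry
open OpenTelemetry.Metrics
open OpenTelemetry.Trace

// ActivitySource and Meter name documented above.
let sourceName = "FsShelter.Hosting"

// Subscribe to the hosting runtime's activities and print them to the console.
let tracerProvider =
    Sdk.CreateTracerProviderBuilder()
        .AddSource(sourceName)
        .AddConsoleExporter()
        .Build()

// Subscribe to the hosting runtime's instruments and print them to the console.
let meterProvider =
    Sdk.CreateMeterProviderBuilder()
        .AddMeter(sourceName)
        .AddConsoleExporter()
        .Build()

// Start the topology here; the providers must stay alive while it runs.

// Dispose the providers on shutdown so pending telemetry is flushed.
meterProvider.Dispose()
tracerProvider.Dispose()
```

Swapping the console exporter for another exporter changes only the two `AddConsoleExporter` calls; the source and meter subscriptions stay the same.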
Debug logging
- The TOPOLOGY_DEBUG flag can be set on individual components to selectively trace the traffic.
- Setting the debug flag globally will also trace the system and acker components.
- Setting the flag will also trace the time spent in the body of your function.
Configuration
Topology-level options
| Option | Default | Effect |
|---|---|---|
| | 128 | Maximum unacked tuples per spout task before backpressure kicks in |
| | 4 | Number of acker task instances (logical units with independent state) |
| | 2 | Number of acker executor threads (tasks are distributed round-robin) |
| | 256 | Base Disruptor ring buffer size (scaled by tasks per executor) |
| | 30 | Timeout for tuple completion; controls acker bucket rotation interval and drain window during shutdown |
| | 100 | Spout executor timeout: how often the spout wakes to poll for new tuples when idle |
| | false | Enable trace-level logging with timing |
| | (none) | Per-bolt tick tuple interval in seconds |
Component-level DSL
| Function | Applies to | Default | Effect |
|---|---|---|---|
| | Spout, Bolt | 1 | Number of tasks (independent processing units with their own state) |
| | Spout, Bolt | = Parallelism | Number of executor threads; tasks are distributed round-robin across executors |
| | Spout, Bolt, Topology | — | Configuration overrides (component-level merged with topology-level) |
| | Bolt | None | Send this tuple to the bolt on activation |
| | Bolt | None | Send this tuple to the bolt on deactivation |
Example:
// 4 bolt tasks served by 2 executor threads (2 tasks per thread)
let counter =
    countWords
    |> Bolt.run (fun _ _ t emit -> (t, emit))
    |> withParallelism 4
    |> withExecutors 2
Parameter relationships
The configuration parameters interact in important ways. Understanding these relationships helps with tuning.
Parallelism vs Executors:
- Parallelism sets the number of tasks: independent logical units with their own state, pending counters, and handlers.
- Executors sets the number of threads serving those tasks. When Executors < Parallelism, multiple tasks share a Disruptor thread and their messages are dispatched by TaskId.
- The runtime enforces Executors = min(Executors, Parallelism): you can't have more threads than tasks.
- Default is 1:1 (each task gets its own thread).
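The clamp and the round-robin distribution can be sketched as plain functions. This is an illustration, not the runtime's code: `effectiveExecutors` and `assignTasks` are hypothetical names, and numbering tasks from 0 is an assumption.

```fsharp
/// Clamp the executor count so there are never more threads than tasks.
let effectiveExecutors (parallelism: int) (executors: int) =
    min executors parallelism

/// Distribute task ids 0..parallelism-1 across executors round-robin.
/// Returns one list of task ids per executor thread.
let assignTasks (parallelism: int) (executors: int) =
    let n = effectiveExecutors parallelism executors
    [ 0 .. parallelism - 1 ]
    |> List.groupBy (fun taskId -> taskId % n)
    |> List.map snd

printfn "%A" (assignTasks 4 2) // prints [[0; 2]; [1; 3]]
printfn "%A" (assignTasks 2 8) // prints [[0]; [1]]: 8 executors are clamped to 2
```

With 4 tasks and 2 executors each thread serves two tasks, which is the case where messages have to be dispatched by TaskId.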
Ring buffer sizing:
The ring buffer per executor is: TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE * tasks_per_executor.
When sharing executors (e.g., 4 tasks on 2 executors), each executor's buffer is 256 * 2 = 512 slots. This automatic scaling prevents starvation when multiple tasks share a thread.
The ring buffer must also be large enough to hold TOPOLOGY_MAX_SPOUT_PENDING in-flight tuples. If maxPending exceeds the ring buffer capacity, spout publishes can block and deadlock the drain on shutdown.
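The sizing rule and the pending-tuple check can be written down as a small calculation. A sketch under assumptions: `ringBufferSize` and `holdsPending` are hypothetical helpers, and rounding tasks per executor up when tasks do not divide evenly is a guess that the text does not confirm.

```fsharp
/// Ring buffer slots per executor: base size scaled by tasks per executor.
let ringBufferSize (baseSize: int) (tasks: int) (executors: int) =
    let threads = min executors tasks
    // Assumption: round up when tasks do not divide evenly across threads.
    let tasksPerExecutor = (tasks + threads - 1) / threads
    baseSize * tasksPerExecutor

/// True when the buffer can hold every in-flight tuple allowed by maxPending.
let holdsPending (bufferSize: int) (maxPending: int) =
    bufferSize >= maxPending

let size = ringBufferSize 256 4 2
printfn "%d" size                      // prints 512
printfn "%b" (holdsPending size 128)   // prints true
printfn "%b" (holdsPending size 1024)  // prints false: publishes could block
```

A check like this is cheap to run whenever the pending limit is raised, since the failure it guards against only shows up as a blocked drain on shutdown.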
Spout wait strategy:
TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS controls how often an idle spout wakes to call next. Lower values mean faster response to available work but higher CPU utilisation when idle. Values below ~10ms cause measurable overhead from frequent wake-ups with no throughput benefit.
Message timeout and shutdown:
TOPOLOGY_MESSAGE_TIMEOUT_SECS serves three roles:
- Acker bucket rotation — tuples older than this are expired
- Shutdown drain window — the system sleeps this long between stopping spouts and stopping bolts
- Restart backoff ceiling — indirectly affects how long restarts take
Lower values speed up shutdown and failure detection, but risk false-expiring tuples that are merely slow to process.
Acker capacity:
TOPOLOGY_ACKER_TASKS controls the number of independent acker state machines.
TOPOLOGY_ACKER_EXECUTORS controls the threads serving them.
Acker tasks share no state with each other (each tracks a disjoint set of tuples via hash partitioning), so more tasks reduce lock contention. More executors reduce thread contention when acker throughput is the bottleneck.
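Hash partitioning of this kind can be illustrated with a modulo over the tuple tree's root id. This is only a sketch of the idea: the actual hash function used by the runtime is not described here, and `ackerTaskFor` and the `int64` root id are assumptions.

```fsharp
/// Pick the acker task that owns a tuple tree, given its root id.
/// The double modulo keeps the result non-negative for negative ids.
let ackerTaskFor (ackerTasks: int) (rootId: int64) =
    let n = int64 ackerTasks
    int (((rootId % n) + n) % n)

// Every root id maps to exactly one of the 4 acker tasks,
// so no two tasks ever track the same tuple tree.
let owners = [ -7L; 0L; 5L; 42L ] |> List.map (ackerTaskFor 4)
printfn "%A" owners // prints [1; 0; 1; 2]
```

Because ownership is a pure function of the root id, adding acker tasks spreads the tracking state without any coordination between them.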
Tuning summary
| Goal | Adjust |
|---|---|
| Increase throughput (CPU-bound bolts) | Increase bolt |
| Reduce thread count | Set |
| Handle slow consumers | Increase |
| Faster failure detection | Lower |
| Reduce idle CPU | Increase |
| Handle high acker load | Increase |