# Architecture
This page describes the internal structure of the FsShelter self-hosting runtime — how topologies are compiled into tasks, how messages flow through Disruptor ring buffers, and how the lifecycle is managed.
For a higher-level introduction, see Core Concepts. For configuration and entry points, see Running Topologies.
## RuntimeTopology structure
When a topology is hosted, it is compiled into a RuntimeTopology<'t> record:
```fsharp
type RuntimeTopology<'t> =
    { systemTask : TaskId * Channel<TaskMsg<'t, unit>>
      ackerTasks : Map<TaskId, Channel<TaskMsg<'t, AckerMsg>>>
      spoutTasks : Map<TaskId, ComponentId * Channel<TaskMsg<'t, InCommand<'t>>>>
      boltTasks  : Map<TaskId, ComponentId * Channel<TaskMsg<'t, InCommand<'t>>>> }
```
Each task has a unique TaskId (sequential integer) and a Channel<'msg> which is a (Send<'msg> * Shutdown) pair — a publish function and a halt function for the underlying Disruptor.
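The channel shape described above can be sketched as follows; this is a minimal illustration and the actual definitions in FsShelter may differ:

```fsharp
// Hypothetical sketch of the channel abstraction described above.
type TaskId = int                   // sequential task identifier
type Send<'msg> = 'msg -> unit      // publish a message to the ring buffer
type Shutdown = unit -> unit        // halt the underlying Disruptor
type Channel<'msg> = Send<'msg> * Shutdown
```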
## Tasks and executors
A task is a logical processing unit with independent state. An executor is a Disruptor thread that serves one or more tasks. Multiple tasks sharing an executor have their messages dispatched by TaskId embedded in each Envelope.
| Task | Count | Executors | Channel Type | Purpose |
|---|---|---|---|---|
| System | 1 | 1 | `Channel.start` | Manages timer callbacks for tick tuples |
| Acker | | | `Channel.startExecutor` | XOR-tree tracking for guaranteed delivery |
| Spout | Per-component | | `Channel.startExecutorWithTimeout` | Message generation with backpressure |
| Bolt | Per-component | | `Channel.startExecutor` | Message processing and downstream emission |
Tasks are distributed across executors round-robin via groupIntoExecutors. Each task within an executor maintains its own independent handler, dispatcher, and state (e.g. pending counter for spouts, acker buckets for ackers).
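The round-robin distribution can be illustrated with a small stand-alone sketch; the names and signature here are illustrative, not FsShelter's actual `groupIntoExecutors`:

```fsharp
// Round-robin grouping sketch: task i goes to executor (i % executorCount).
let groupIntoExecutors (executorCount: int) (tasks: 'a list) : 'a list list =
    tasks
    |> List.mapi (fun i task -> i % executorCount, task)
    |> List.groupBy fst
    |> List.map (fun (_, group) -> group |> List.map snd)

// Five tasks over two executors: [[1; 3; 5]; [2; 4]]
let executors = groupIntoExecutors 2 [1; 2; 3; 4; 5]
```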
## Message wrapper
All task messages are wrapped in TaskMsg<'t, 'msg> (a struct DU to avoid allocation on the hot path):
```fsharp
[<Struct>]
type TaskMsg<'t, 'msg> =
    | Start of rtt : RuntimeTopology<'t>  // Receive topology reference
    | Stop                                // Graceful shutdown
    | Tick                                // Timer-driven heartbeat
    | Other of msg : 'msg                 // Domain-specific message
```
## Channel infrastructure
Each executor runs on a Disruptor ring buffer. Messages carry a TaskId in their Envelope, and the consumer dispatches to the correct per-task handler via an array lookup table.
### Channel.startExecutor (ackers, bolts)
Multiple tasks share a single Disruptor ring buffer. Each published message includes a TaskId via publishWithTaskId. The consumer looks up the handler by envelope.TaskId and invokes it.
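The per-task dispatch can be sketched in isolation; the handler and envelope shapes below are assumptions for illustration, not the library's actual types:

```fsharp
// Sketch: build an O(1) lookup from TaskId to per-task handler.
// FsShelter dispatches on the Envelope's TaskId; types here are simplified.
let mkDispatcher (handlers: (int * ('msg -> unit)) array) : int -> 'msg -> unit =
    let index = dict handlers              // TaskId -> handler
    fun taskId msg -> index.[taskId] msg   // invoked per published message
```

A published message is routed by pairing it with its target `TaskId`, so one consumer thread can serve several logical tasks without any per-message allocation for the lookup.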
### Channel.startExecutorWithTimeout (spouts)
Same as startExecutor but with TimeoutBlockingWaitStrategy (100ms timeout). When no message arrives within the timeout window, a combined onTimeout function fires, calling each spout task's onTimeout to issue additional Next requests.
```fsharp
// Executor channel — multiple tasks sharing one ring buffer
let startExecutor ringSize onException (tasks: (TaskId * handler) array) =
    // Builds a taskIndex dictionary for O(1) dispatch
    // Returns: (send: TaskId -> Send<'msg>), halt

// Executor channel with timeout — for spouts
let startExecutorWithTimeout ringSize onException (tasks: (TaskId * handler) array) onTimeout =
    // Same dispatch; additionally the timeout fires onTimeout()
    // Returns: (send: TaskId -> Send<'msg>), halt
```
The system task uses the basic Channel.start (single handler, no executor dispatch).
The ring buffer size per executor scales with the number of tasks it serves: TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE * tasksPerExecutor (base default 256).
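For example, with the default base size of 256, an executor serving four tasks gets a 1024-slot ring. A trivial sketch of the sizing rule stated above:

```fsharp
// Ring buffer sizing per the rule above: base size scaled by tasks served.
// (Option name TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE is from the text.)
let ringSize baseSize tasksPerExecutor = baseSize * tasksPerExecutor

let size = ringSize 256 4   // 1024
```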
Spouts and bolts send directly to downstream Disruptor ring buffers via typed send functions — no intermediate buffering or boxing.
## Topology construction

RuntimeTopology.ofTopology builds the runtime from a topology definition.
## Task lifecycle

After construction, the topology goes through activation, processing, and shutdown phases.
### Activation order
RuntimeTopology.activate sends Start rtt to each task, giving it a reference to the full runtime so it can route messages to other tasks:
- Ackers first — must be ready before spouts emit tracked tuples
- Bolts second — must be ready before spouts emit routed tuples
- Spouts third — begin generating messages
- System last — starts timer callbacks that drive tick tuples
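The ordering above can be sketched with a simplified, self-contained model of the runtime record; the real record is generic over the tuple type `'t` and carries `ComponentId`s, trimmed here to show only the ordering:

```fsharp
type TaskId = int
type Send<'m> = 'm -> unit
type Shutdown = unit -> unit
type Channel<'m> = Send<'m> * Shutdown
type Msg = Start | Stop

// Simplified stand-in for RuntimeTopology<'t> (shapes reduced for the sketch).
type Runtime =
    { systemTask : TaskId * Channel<Msg>
      ackerTasks : Map<TaskId, Channel<Msg>>
      spoutTasks : Map<TaskId, Channel<Msg>>
      boltTasks  : Map<TaskId, Channel<Msg>> }

// Send Start in dependency order: ackers, bolts, spouts, then system.
let activate (rt: Runtime) =
    rt.ackerTasks |> Map.iter (fun _ (send, _) -> send Start)   // ackers first
    rt.boltTasks  |> Map.iter (fun _ (send, _) -> send Start)   // then bolts
    rt.spoutTasks |> Map.iter (fun _ (send, _) -> send Start)   // then spouts
    (fst (snd rt.systemTask)) Start                             // system last
```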
### Shutdown order
RuntimeTopology.stop sends Stop with a drain window between spouts and bolts:
- System — stop timers
- Spouts — stop generating messages (deactivate)
- Sleep(timeout) — allow in-flight tuples to drain through the bolt DAG
- Bolts — stop processing (deactivate)
- Ackers — stop tracking (last, so late acks can still be processed)
Then RuntimeTopology.shutdown halts all Disruptor instances. When multiple tasks share an executor, the shared Disruptor is halted only once (tracked via a HashSet<Shutdown>).
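Halting each shared Disruptor exactly once can be sketched with reference-identity deduplication; this is an illustration of the mechanism, not FsShelter's actual shutdown code:

```fsharp
// Sketch: collect every task's Shutdown function, dedupe by reference,
// and invoke each distinct one exactly once.
let shutdownOnce (shutdowns: (unit -> unit) seq) =
    let halted = System.Collections.Generic.HashSet<_>(HashIdentity.Reference)
    for halt in shutdowns do
        if halted.Add halt then halt ()   // Add returns false for duplicates
```

Tasks sharing an executor hold the same `Shutdown` function, so reference equality is enough to recognize an already-halted Disruptor.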
## Component wrappers
### Spout wrapper (mkSpout)
Wraps a Runnable<'t> into a (handler, onTimeout) pair:
- `handler` processes `TaskMsg` values:
  - On `Start rtt`: creates the output function (routing + acker tracking), calls `Activate`, issues initial `Next` requests up to `maxPending`
  - On `Other (Ack _)` / `Other (Nack _)`: decrements `pending`, issues `Next`, forwards to the dispatcher
  - On `Stop`: sends `Deactivate`, clears the dispatcher
- `onTimeout` is called when the Disruptor's 100 ms timeout fires: issues another `Next` if under the pending limit
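The pending-count backpressure described above can be sketched in isolation; the state shape and names are assumptions for illustration:

```fsharp
// Sketch of spout backpressure: track in-flight tuples and request more
// work (Next) only while under the maxPending cap.
type SpoutState = { mutable pending: int; maxPending: int }

// On Start: fill the window with initial Next requests.
let onStart (state: SpoutState) (requestNext: unit -> unit) =
    while state.pending < state.maxPending do
        requestNext ()
        state.pending <- state.pending + 1

// On timeout: top up only if still under the cap.
let onTimeout (state: SpoutState) (requestNext: unit -> unit) =
    if state.pending < state.maxPending then
        requestNext ()
        state.pending <- state.pending + 1
```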
### Bolt wrapper (mkBolt)
Wraps a Runnable<'t> into a handler function:
- On `Start rtt`: creates the output function (routing + anchoring + ack/nack), calls `Activate`
- On `Other (Tuple ...)`: forwards to the dispatcher for processing
- On `Stop`: sends `Deactivate`, clears the dispatcher
### System task (mkSystem)
Manages System.Threading.Timer instances:
- Spout/Acker tick: every 30 seconds, sends `Tick` to all spouts and ackers
- Bolt tick tuples: for bolts with `TOPOLOGY_TICK_TUPLE_FREQ_SECS` configured, sends a `__tick` tuple at the configured interval
- On `Stop`: disposes all timers
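The 30-second heartbeat can be sketched with `System.Threading.Timer`; the period parameter and the `sendTickToAll` callback are illustrative stand-ins for iterating the spout and acker channels:

```fsharp
open System.Threading

// Sketch: fire a tick broadcast on a fixed period (the real spout/acker
// heartbeat uses 30 000 ms). Caller disposes the timer on Stop.
let startHeartbeat (periodMs: int) (sendTickToAll: unit -> unit) =
    new Timer((fun _ -> sendTickToAll ()), null, periodMs, periodMs)
```

Disposing the returned timer on `Stop` cancels all further callbacks, which is exactly what the system task does for each timer it manages.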
## Component interaction overview

Data tuples flow from the spouts through the bolt DAG, while acker protocol messages flow between the spouts/bolts and the acker tasks.
## See also
- Message flow — End-to-end processing scenarios with sequence diagrams
- Acker algorithm — XOR-tree tuple tracking for guaranteed delivery
- Routing — Grouping strategies and stream-based fan-out