Defining reliable spouts
Processing guarantees are the biggest selling point of Storm; see the official Storm documentation for the details. FsStorm implements reliability semantics with "housekeeper" functions: defaultHousekeeper can be used as is for transient sources, or as a starting point for implementing reliability over external or persistent sources. The spout implementation is fairly similar to the "unreliable" version, with the addition of a unique (int64) tuple ID.
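To make the role of the tuple ID concrete, here is a minimal sketch of the bookkeeping a housekeeper for a transient source might do: remember each emitted payload under its int64 ID, forget it on ack, and hand it back for re-emission on fail. This is not FsStorm's housekeeper API; `PendingTuples`, `Track`, `Ack` and `Fail` are hypothetical names used only for illustration.

```fsharp
open System.Collections.Generic

// Hypothetical sketch: none of these names come from FsStorm.
/// Remembers emitted payloads by tuple ID until Storm acks them.
type PendingTuples<'T>() =
    let inFlight = Dictionary<int64, 'T>()
    let mutable lastId = 0L

    /// Register a payload and return the unique ID to emit it with.
    member this.Track(payload: 'T) : int64 =
        lastId <- lastId + 1L
        inFlight.[lastId] <- payload
        lastId

    /// The tuple was fully processed, so stop tracking it.
    member this.Ack(id: int64) : unit =
        inFlight.Remove(id) |> ignore

    /// The tuple failed: return its payload for re-emission, if still tracked.
    member this.Fail(id: int64) : 'T option =
        match inFlight.TryGetValue(id) with
        | true, payload -> Some payload
        | _ -> None

    /// Number of tuples emitted but not yet acked.
    member this.Count = inFlight.Count

// Usage: two tuples go out, one is acked, the other fails and can be replayed.
let pending = PendingTuples<string>()
let id1 = pending.Track("first")
let id2 = pending.Track("second")
pending.Ack(id1)
let replay = pending.Fail(id2)
```

In this sketch a failed payload stays tracked until a later ack arrives. For an external or persistent source, the in-memory dictionary would presumably be replaced by whatever acknowledgement mechanism that source provides.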
Anchoring and named streams
FsStorm has helper functions to emit to a named stream or to anchor a tuple.
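As background, a Storm multilang emit message carries an optional "stream" key (absent means the default stream) and, for bolts, an "anchors" list with the IDs of the input tuples the output derives from. If an anchored output later fails, Storm replays the originating spout tuple. The sketch below models only those fields. The `Emit` record and the `ofValues`, `toStream` and `anchoredTo` helpers are hypothetical and are not FsStorm's actual helper functions.

```fsharp
// Hypothetical model: these types and helpers are not FsStorm's API.
// The fields correspond to the "stream", "anchors" and "tuple" keys
// of a Storm multilang emit message.
type Emit =
    { Stream : string option   // None means the default stream
      Anchors : string list    // IDs of the input tuples this output derives from
      Values : obj list }      // the tuple's field values

/// Start an unanchored emit to the default stream.
let ofValues (values: obj list) : Emit =
    { Stream = None; Anchors = []; Values = values }

/// Direct the emit to a named stream.
let toStream (name: string) (e: Emit) : Emit =
    { e with Stream = Some name }

/// Anchor the emit to an input tuple, tying its fate to that tuple's tree.
let anchoredTo (inputId: string) (e: Emit) : Emit =
    { e with Anchors = inputId :: e.Anchors }

// Usage: emit one value to the "even" stream, anchored to the incoming tuple.
let out =
    ofValues [ box 8 ]
    |> toStream "even"
    |> anchoredTo "1231231"
```

Composing small functions with `|>` keeps the default case (unanchored, default stream) free of extra arguments.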
A bolt can also be parametrized, and it can inspect a tuple's origin: the component that emitted the tuple and the stream it arrived on.
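A minimal sketch of both ideas, assuming an incoming tuple exposes the emitting component and the stream, as the "comp" and "stream" keys of a Storm multilang tuple message do. The `Incoming` record, the `describe` function and the component and stream names ("addOne", "even", "odd") are hypothetical. The parameter is supplied by partial application before any tuple arrives.

```fsharp
// Hypothetical model of an incoming tuple; not FsStorm's own type.
type Incoming =
    { Id : string         // tuple ID
      Comp : string       // component that emitted the tuple
      Stream : string     // stream the tuple arrived on
      Values : obj list } // the tuple's field values

/// Describe a tuple based on where it came from.
/// `desc` is the parameter fixed when the bolt is set up.
let describe (desc: string) (t: Incoming) : string =
    match t.Comp, t.Stream with
    | "addOne", "even" -> sprintf "%s: even result (tuple %s)" desc t.Id
    | "addOne", "odd" -> sprintf "%s: odd result (tuple %s)" desc t.Id
    | comp, stream -> sprintf "%s: unexpected tuple %s from %s/%s" desc t.Id comp stream

// Usage: fix the parameter once, then apply the result to each tuple.
let describeResult = describe "x+1"

let line =
    describeResult { Id = "42"; Comp = "addOne"; Stream = "even"; Values = [ box 8 ] }
```

Matching on the (component, stream) pair lets one bolt subscribe to several streams and still handle each source explicitly.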
Topology with named streams and config overrides