Defining unreliable spouts
FsStorm spouts can be implemented as "reliable" or "unreliable". Unreliable spouts implementation can be as simple as a single function:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: |
|
Defining bolts
Example of a FsStorm bolt that reads a tuple and emits another one:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: |
|
And a terminating bolt that reads a tuple, but doesn't emit anything:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: |
|
Using F# DSL to define the topology
Topologies are defined using a strong-typed DSL:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: |
|
Unit-testing the components
Several runners are implemented to facilitate unit-testing in StormTest module:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: |
|
val rnd : System.Random
Full name: Sample.rnd
Full name: Sample.rnd
namespace System
Multiple items
type Random =
new : unit -> Random + 1 overload
member Next : unit -> int + 2 overloads
member NextBytes : buffer:byte[] -> unit
member NextDouble : unit -> float
Full name: System.Random
--------------------
System.Random() : unit
System.Random(Seed: int) : unit
type Random =
new : unit -> Random + 1 overload
member Next : unit -> int + 2 overloads
member NextBytes : buffer:byte[] -> unit
member NextDouble : unit -> float
Full name: System.Random
--------------------
System.Random() : unit
System.Random(Seed: int) : unit
val spout : runner:((('a -> unit) -> unit -> Async<unit>) -> 'b) -> cfg:'c -> 'b
Full name: Sample.spout
cfg: the configution passed in by storm
runner: a spout runner function (passed in from topology)
Full name: Sample.spout
cfg: the configution passed in by storm
runner: a spout runner function (passed in from topology)
val runner : ((('a -> unit) -> unit -> Async<unit>) -> 'b)
val cfg : 'c
val createEmitter : (('d -> unit) -> unit -> Async<unit>)
val emit : ('d -> unit)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
System.Random.Next() : int
System.Random.Next(maxValue: int) : int
System.Random.Next(minValue: int, maxValue: int) : int
System.Random.Next(maxValue: int) : int
System.Random.Next(minValue: int, maxValue: int) : int
val addOneBolt : runner:(('a -> Async<unit>) -> 'b) -> log:(string -> string -> unit) -> emit:('c -> unit) -> cfg:'d -> 'b
Full name: Sample.addOneBolt
cfg: the configuration passed in from Storm
runner: passed in from topology
log: passed in from topology
emit: passed in from topology
Full name: Sample.addOneBolt
cfg: the configuration passed in from Storm
runner: passed in from topology
log: passed in from topology
emit: passed in from topology
val runner : (('a -> Async<unit>) -> 'b)
val log : (string -> string -> unit)
val emit : ('c -> unit)
val cfg : 'd
val createAdder : ('e -> Async<unit>)
val msg : 'e
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val resultBolt : runner:((obj -> Async<unit>) -> 'a) -> log:(string -> string -> unit) -> cfg:'b -> 'a
Full name: Sample.resultBolt
cfg: the configution passed in by storm
runner: passed in from topology
log: passed in from topology
Full name: Sample.resultBolt
cfg: the configution passed in by storm
runner: passed in from topology
log: passed in from topology
val runner : ((obj -> Async<unit>) -> 'a)
val cfg : 'b
val createReader : (obj -> Async<unit>)
val msg : obj
val topology : obj
Full name: Sample.topology
Full name: Sample.topology
val log : value:'T -> 'T (requires member Log)
Full name: Microsoft.FSharp.Core.Operators.log
Full name: Microsoft.FSharp.Core.Operators.log
val conf : obj
Full name: Sample.conf
Full name: Sample.conf
val ( spout emits ) : unit -> unit
Full name: Sample.( spout emits )
Full name: Sample.( spout emits )
val results : obj list ref
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val s : Async<unit>
property Ref.Value: obj list
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
val x : obj
val y : obj
val ( bolt adds ) : unit -> 'a
Full name: Sample.( bolt adds )
Full name: Sample.( bolt adds )
val b : Async<unit>
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
Console.WriteLine() : unit
(+0 other overloads)
Console.WriteLine(value: string) : unit
(+0 other overloads)
Console.WriteLine(value: obj) : unit
(+0 other overloads)
Console.WriteLine(value: uint64) : unit
(+0 other overloads)
Console.WriteLine(value: int64) : unit
(+0 other overloads)
Console.WriteLine(value: uint32) : unit
(+0 other overloads)
Console.WriteLine(value: int) : unit
(+0 other overloads)
Console.WriteLine(value: float32) : unit
(+0 other overloads)
Console.WriteLine(value: float) : unit
(+0 other overloads)
Console.WriteLine(value: decimal) : unit
(+0 other overloads)
(+0 other overloads)
Console.WriteLine(value: string) : unit
(+0 other overloads)
Console.WriteLine(value: obj) : unit
(+0 other overloads)
Console.WriteLine(value: uint64) : unit
(+0 other overloads)
Console.WriteLine(value: int64) : unit
(+0 other overloads)
Console.WriteLine(value: uint32) : unit
(+0 other overloads)
Console.WriteLine(value: int) : unit
(+0 other overloads)
Console.WriteLine(value: float32) : unit
(+0 other overloads)
Console.WriteLine(value: float) : unit
(+0 other overloads)
Console.WriteLine(value: decimal) : unit
(+0 other overloads)
val tag : string
val desc : string
val t : obj
val ( bolt consumes ) : unit -> 'a
Full name: Sample.( bolt consumes )
Full name: Sample.( bolt consumes )
val results : (string * string) list ref
property Ref.Value: (string * string) list
val x : string * string