Implementing custom reliability semantics
The reliable spout implementation for a custom source, like a queue (RabbitMQ, Kafka, etc) needs to obtain the event id from the source and forward Storm's acks and nacks to the source, which could be accomplished in FsStorm with:
a housekeeper
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: |
|
and custom runner
1: 2: 3: 4: 5: |
|
then given an event type
1:
|
|
we can declare a spout factory like this:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: |
|
and a queue/consumer interface
1: 2: 3: 4: 5: 6: |
|
we can implement the actual spout like this:
1: 2: 3: 4: 5: 6: |
|
Passing the createQueueRunner from the topology will tie all the pieces together and the ids, acks and nacks should start flowing from the source and back.
val createQueueHousekeeper : ack:(int64 -> unit) -> nack:(int64 -> unit) -> msg:obj -> unit
Full name: Custom.createQueueHousekeeper
Maintains reliability semantics of a queueSpout.
ack: thread-safe method to ack a message by id.
nack: thread-safe method to nack a message by id.
Full name: Custom.createQueueHousekeeper
Maintains reliability semantics of a queueSpout.
ack: thread-safe method to ack a message by id.
nack: thread-safe method to nack a message by id.
val ack : (int64 -> unit)
val nack : (int64 -> unit)
val msg : obj
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = System.Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = System.Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
val createQueueRunner : ack:'a -> nack:'b -> 'c
Full name: Custom.createQueueRunner
Partially configures a reliable runner for the queues.
ack: thread-safe method to ack a message by id.
nack: thread-safe method to nack a message by id.
Full name: Custom.createQueueRunner
Partially configures a reliable runner for the queues.
ack: thread-safe method to ack a message by id.
nack: thread-safe method to nack a message by id.
val ack : 'a
val nack : 'b
Multiple items
val uint64 : value:'T -> uint64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.uint64
--------------------
type uint64 = System.UInt64
Full name: Microsoft.FSharp.Core.uint64
val uint64 : value:'T -> uint64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.uint64
--------------------
type uint64 = System.UInt64
Full name: Microsoft.FSharp.Core.uint64
Multiple items
module Event
from Microsoft.FSharp.Control
--------------------
type Event<'m> =
{msg: 'm;
id: uint64;}
Full name: Custom.Event<_>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
new : unit -> Event<'Delegate,'Args>
module Event
from Microsoft.FSharp.Control
--------------------
type Event<'m> =
{msg: 'm;
id: uint64;}
Full name: Custom.Event<_>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
new : unit -> Event<'Delegate,'Args>
Event.msg: 'm
Event.id: uint64
val createQueueSpout : get:(unit -> Event<'a> option) -> toTuple:('a -> 'b) -> emit:(int64 -> 'b -> unit) -> unit -> Async<unit>
Full name: Custom.createQueueSpout
Reads message from a queue into a tuple stream.
get: thread-safe event consumer.
toTuple: message object to Json tuple converter.
Full name: Custom.createQueueSpout
Reads message from a queue into a tuple stream.
get: thread-safe event consumer.
toTuple: message object to Json tuple converter.
val get : (unit -> Event<'a> option)
val toTuple : ('a -> 'b)
val emit : (int64 -> 'b -> unit)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
union case Option.Some: Value: 'T -> Option<'T>
val evt : Event<'a>
Event.msg: 'a
union case Option.None: Option<'T>
val getConsumer : cfg:'a -> (unit -> Event<int>) * ('b -> unit) * ('c -> unit)
Full name: Custom.getConsumer
this is the consumer interface into the queue
that returns three functions: get:unit->Event, ack:uint64->unit and nack:unit64->unit
Full name: Custom.getConsumer
this is the consumer interface into the queue
that returns three functions: get:unit->Event, ack:uint64->unit and nack:unit64->unit
val cfg : 'a
val id : x:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.id
Full name: Microsoft.FSharp.Core.Operators.id
val id : 'b
val id : 'c
val statusEvents : runner:('a -> 'b -> ((int64 -> 'c -> unit) -> unit -> Async<unit>) -> 'd) -> getConsumer:('e -> (unit -> Event<string * byte> option) * 'a * 'b) -> cfg:'e -> 'd
Full name: Custom.statusEvents
Full name: Custom.statusEvents
val runner : ('a -> 'b -> ((int64 -> 'c -> unit) -> unit -> Async<unit>) -> 'd)
val getConsumer : ('e -> (unit -> Event<string * byte> option) * 'a * 'b)
val cfg : 'e
val toTuple : (string * byte -> 'f)
val id : string
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = System.String
Full name: Microsoft.FSharp.Core.string
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = System.String
Full name: Microsoft.FSharp.Core.string
val status : byte
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = System.Byte
Full name: Microsoft.FSharp.Core.byte
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = System.Byte
Full name: Microsoft.FSharp.Core.byte
Multiple items
val char : value:'T -> char (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.char
--------------------
type char = System.Char
Full name: Microsoft.FSharp.Core.char
val char : value:'T -> char (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.char
--------------------
type char = System.Char
Full name: Microsoft.FSharp.Core.char
val get : (unit -> Event<string * byte> option)