FsStorm


Implementing custom reliability semantics

A reliable spout for a custom source such as a queue (RabbitMQ, Kafka, etc.) needs to obtain the event id from the source and forward Storm's acks and nacks back to it. In FsStorm this could be accomplished with:

a housekeeper

/// Maintains reliability semantics of a queueSpout.
/// ack: thread-safe method to ack a message by id.
/// nack: thread-safe method to nack a message by id.
let createQueueHousekeeper ack nack = 
	fun (msg:Json) ->
		match msg?command.Val with
		| "ack" -> ack (int64 (msgId msg))
		| "fail" -> nack (int64 (msgId msg))
		| _ -> ()
	

and a custom runner

/// Partially configures a reliable runner for the queues.
/// ack: thread-safe method to ack a message by id.
/// nack: thread-safe method to nack a message by id.
let createQueueRunner ack nack = Storm.reliableSpoutRunner 
                                        (createQueueHousekeeper (uint64 >> ack) (uint64 >> nack))
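
The housekeeper works with `int64` ids while the queue side uses `uint64`, so ids are converted in both directions. The following is a minimal sketch in plain F# (no FsStorm dependency) that checks the conversions round-trip, even for ids above `Int64.MaxValue`. It assumes the default unchecked conversion operators, i.e. that `Checked` has not been opened; the sample ids are made up for illustration.

```fsharp
open System

// Ids as the queue sees them (uint64), including values above Int64.MaxValue.
// These sample values are illustrative only.
let queueIds = [ 0UL; 42UL; uint64 Int64.MaxValue + 1UL; UInt64.MaxValue ]

// int64 is applied when a tuple is emitted, uint64 when the ack/nack is forwarded.
// Assumption: default (unchecked) conversions, so large values wrap instead of throwing.
let roundTrip (id: uint64) = id |> int64 |> uint64

let allPreserved = queueIds |> List.forall (fun id -> roundTrip id = id)
printfn "all ids preserved: %b" allPreserved
```

Large ids appear as negative numbers on the `int64` side, but the original `uint64` value is recovered when the ack or nack is forwarded.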

then given an event type

type Event<'m> = { msg:'m; id:uint64 }

we can declare a spout factory like this:

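/// Reads messages from a queue into a tuple stream.
/// get: thread-safe event consumer, returns None when no event is available.
/// toTuple: message object to Json tuple converter.
/// emit: emits the converted tuple under the given message id.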
let createQueueSpout get toTuple emit = 
	fun () -> 
		async { 
			match get() with
			| Some evt -> toTuple evt.msg |> emit (int64 evt.id)
			| None -> do ()
		}

and a queue/consumer interface

/// This is the consumer interface into the queue.
/// It returns three functions: get:unit->Event option, ack:uint64->unit and nack:uint64->unit.
let getConsumer cfg = 
	((fun () -> Some { msg = ("device-1", '1'B); id = 0UL }), // get the next message and its id (placeholder values)
	 (fun (id:uint64) -> ()), // forward the ack to the queue
	 (fun (id:uint64) -> ())) // forward the nack to the queue

we can implement the actual spout like this:

let statusEvents runner getConsumer (cfg : Configuration) =
    let toTuple (id:string,status:byte) = 
        tuple [ id ] 
        |> namedStream (match char status with | '1' -> "online" | _ -> "offline")
    let get, ack, nack = getConsumer cfg
    createQueueSpout get toTuple |> runner ack nack
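
To see how the pieces compose without a running Storm cluster, here is a rough sketch that swaps the real runner for a hypothetical `fakeRunner` with the same call shape. Everything except `Event` and `createQueueSpout` is an assumption made for illustration: `fakeRunner`, `trace`, the `"sensor-7"` event, and a `toTuple` that builds a plain string instead of an FsStorm tuple. It is not how `Storm.reliableSpoutRunner` is implemented.

```fsharp
type Event<'m> = { msg: 'm; id: uint64 }

// Same shape as createQueueSpout above, repeated so the sketch is self-contained.
let createQueueSpout get toTuple emit =
    fun () ->
        async {
            match get () with
            | Some evt -> toTuple evt.msg |> emit (int64 evt.id)
            | None -> ()
        }

let trace = ResizeArray<string>()

// Hypothetical stand-in for the real runner: supplies `emit`, polls the
// spout once, then acks every id that was emitted. nack is never called here.
let fakeRunner ack (nack: uint64 -> unit) spout =
    let emitted = ResizeArray<int64>()
    let emit (id: int64) (tuple: string) =
        trace.Add(sprintf "emit %d %s" id tuple)
        emitted.Add id
    let next: unit -> Async<unit> = spout emit
    next () |> Async.RunSynchronously
    for id in emitted do
        ack (uint64 id)

// Hypothetical consumer that always yields the same event and records acks/nacks.
let getConsumer () =
    (fun () -> Some { msg = ("sensor-7", '1'B); id = 7UL }),
    (fun (id: uint64) -> trace.Add(sprintf "ack %d" id)),
    (fun (id: uint64) -> trace.Add(sprintf "nack %d" id))

// Same wiring as statusEvents above, with a string in place of an FsStorm tuple.
let statusEvents runner getConsumer cfg =
    let toTuple (id: string, status: byte) =
        sprintf "%s:%s" id (match char status with '1' -> "online" | _ -> "offline")
    let get, ack, nack = getConsumer cfg
    createQueueSpout get toTuple |> runner ack nack

statusEvents fakeRunner getConsumer ()
printfn "%A" (List.ofSeq trace)
```

After the run, `trace` holds the emit entry for id 7 followed by the ack for the same id, which is the flow the real runner is expected to provide.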

Passing createQueueRunner from the topology ties the pieces together: ids should then flow from the source into Storm, and acks and nacks back to the source.
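
The `getConsumer` stub above leaves the queue side open. As a minimal sketch of what a consumer could look like, the hypothetical `inMemoryConsumer` below keeps delivered events in a pending set until they are acked, and puts them back on the queue when they are nacked. It uses only .NET concurrent collections; a real RabbitMQ or Kafka client would replace the in-memory queue, and the names here are illustrative rather than part of FsStorm.

```fsharp
open System.Collections.Concurrent

type Event<'m> = { msg: 'm; id: uint64 }

/// Hypothetical in-memory stand-in for a real queue client.
/// Returns the same triple as getConsumer: get, ack, nack.
let inMemoryConsumer (messages: seq<'m>) =
    let ready = ConcurrentQueue<Event<'m>>()
    let pending = ConcurrentDictionary<uint64, Event<'m>>()
    // Ids are assigned from the position in the input sequence.
    messages |> Seq.iteri (fun i m -> ready.Enqueue({ msg = m; id = uint64 i }))
    let get () =
        match ready.TryDequeue() with
        | true, evt ->
            pending.[evt.id] <- evt // remember until acked or nacked
            Some evt
        | _ -> None
    let ack (id: uint64) = pending.TryRemove(id) |> ignore
    let nack (id: uint64) =
        match pending.TryRemove(id) with
        | true, evt -> ready.Enqueue(evt) // redeliver at the back of the queue
        | _ -> ()
    get, ack, nack

// Usage: nack the first message and observe that it is delivered again.
let get, ack, nack = inMemoryConsumer [ "a"; "b" ]
let first = get ()
first |> Option.iter (fun e -> nack e.id)
let second = get ()
let third = get ()
let order =
    [ first; second; third ]
    |> List.choose (fun e -> e)
    |> List.map (fun e -> e.msg)
printfn "%A" order
```

The nacked message `"a"` comes back after `"b"`, so the delivery order is a, b, a. Redelivering at the back of the queue is a design choice of this sketch; a real broker may redeliver in a different order.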
