FsShelter.Extras


Bootstrapping the components

The component constructors are defined in a way that delays instantiation (and, consequently, any environmental requirements) until the moment it is actually needed. This means that, when implemented correctly, configuration, runtime dependencies, database availability, etc. are not demanded while we are merely inspecting the topology, for example to document it or to submit it for execution. Only when Storm brings the topology up for execution do we need to have everything in place and available, hence the bootstrap parametrization.

Example

Messaging

Let's bootstrap EventStreams consumers and publishers with JSON assembly:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
open FsBunny
open FsShelter.Extras.Assemblers

/// RabbitMQ data-access helpers.
module RabbitMq =
    open System
    open RabbitMQ.Client

    /// Builds a `RabbitMqEventStreams` for the broker addressed by `connectionString`.
    /// "amq.topic" is passed as the default exchange, and 3us / 1000us as the
    /// `retries` / `limit` constructor arguments.
    let mkEventStreams (connectionString:string) =
        let brokerUri = Uri(connectionString)
        let factory = ConnectionFactory(Uri = brokerUri)
        RabbitMqEventStreams(factory, "amq.topic", 3us, 1000us)

/// AMQP connection string handed to `RabbitMq.mkEventStreams` by the messaging
/// bootstrap functions in this sample (`withQueueConsumer`, `withPublisher`).
let rabbit = "amqp://localhost"

/// Bootstraps a JSON consumer reading from the persistent `queue`, bound to the
/// exchange obtained by routing `topic` on the event streams.
/// `log` and `cfg` are accepted to fit the bootstrap signature but are not used here.
let withQueueConsumer topic queue log cfg = 
    let streams = RabbitMq.mkEventStreams rabbit :> EventStreams
    let source = Persistent queue
    let exchange = streams.Routed topic
    streams.GetJsonConsumer source exchange

/// Bootstraps the arguments for a publishing component.
/// The event streams are created once, when `log` and `cfg` are supplied; the returned
/// function pairs each incoming tuple with `publish` applied to those streams.
/// `log`, `cfg` and `emit` are not used here.
let withPublisher publish log cfg = 
    let streams : EventStreams = upcast (RabbitMq.mkEventStreams rabbit)
    fun tuple emit -> (tuple, publish streams)

/// Publishes `msg` through a JSON publisher targeting the routed exchange
/// identified by `exchange` and `topic`.
let publishJson exchange (eventStreams:EventStreams) (topic,msg) = 
    let destination = Routed (exchange, topic)
    msg |-> eventStreams.UsingJsonPublisher destination

Persistence

Now let's bootstrap some Cassandra reader and writer components:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
// DAL helpers: thin wrappers over the FsCassy Cassandra API
module Cassandra =
  open FsCassy
  open Cassandra.Mapping

  /// Builds an `Interpreter` on top of a session.
  /// `mkSession` is called once with `name`, when the interpreter is created.
  /// Each `Interpret` call then runs the statement through `Cassandra.Interpreter.execute`,
  /// giving it a table factory that uses a new `MappingConfiguration` and that session.
  let mkInterpreter mkSession (name:string) = 
    let session = mkSession name
    { new Interpreter with 
        member x.Interpret<'t,'r> (statement:Statement<'t,'r>) : 'r= 
          Cassandra.Interpreter.execute (fun _ -> Cassandra.Api.mkTable (MappingConfiguration()) session) statement }

  /// Creates a cluster from `connectionString` and makes a session on it.
  /// NOTE(review): `id` is passed as the first argument of `Cassandra.Api.mkSession`;
  /// presumably a customisation hook left as identity - confirm against FsCassy.
  let mkSession (connectionString:string) =
    Cassandra.Api.mkCluster connectionString
    |> Cassandra.Api.mkSession id

/// Connection string passed (via `Cassandra.mkInterpreter` / `Cassandra.mkSession`) to
/// `Cassandra.Api.mkCluster` by the persistence bootstrap functions in this sample.
let cassandra = "contact points=localhost;default keyspace=sample"

/// Bootstraps the arguments for a persisting component.
/// The Cassandra interpreter is created once, when `log` and `cfg` are supplied; the
/// returned function pairs a "run this with the interpreter" callback with the input.
/// `log`, `cfg` and `emit` are not used here.
let withPersisterArgs log cfg =
    let interpreter = Cassandra.mkInterpreter Cassandra.mkSession cassandra
    fun input emit ->
        let using run = run interpreter
        using, input

/// Bootstraps the arguments for a reading/transforming component.
/// The Cassandra interpreter is created once, when `log` and `cfg` are supplied; the
/// returned function yields a "run this with the interpreter" callback together with
/// the input and the `emit` function. `log` and `cfg` are not used here.
let withReaderArgs log cfg =
    let interpreter = Cassandra.mkInterpreter Cassandra.mkSession cassandra
    fun input emit ->
        let using run = run interpreter
        using, input, emit

Implementing component logic

FsShelter provides the runtime, the Extras components outline the role of the component, and the bootstrap parameters wire the configured providers, but the user provides the logic. Let's implement some of the components. We'll model some sensors, take the readings as messages coming in and raise an alarm by sending a message if the temperature readings exceed a threshold.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
open System
open FsCassy

/// Events and Notifications
[<AutoOpen>]
module Messaging =
  /// Incoming message carrying a sensor's identifier and its temperature reading.
  type SensorReading = 
    { Id : Guid
      Temp : int }

  /// Outgoing alert message for a sensor, with a human-readable text.
  type SensorAlert = 
    { Id : Guid
      Text : string }

/// Cassandra-mapped types
[<AutoOpen>]
module Persistence =
  /// Row type for a sensor: its identifier and the stored value.
  type Sensor = 
    { Id : Guid
      Value : int }

/// Topology streams
type Stream =
    /// A new temperature value for a sensor.
    | Update of sensorId : Guid * temp : int
    /// A temperature value that was judged to warrant an alarm.
    | Alarm of sensorId : Guid * temp : int
    /// A sensor identifier that could not be found.
    | NotFound of sensorId : Guid

/// Persists a sensor update.
/// For `Update(id, temp)` it builds a statement over `table<Sensor>` that selects the
/// row whose `Id` equals `id`, sets `Value` to `temp`, and runs it through
/// `persister.Interpret`.
/// Any other `Stream` case is not a write request. It fails with a descriptive
/// exception instead of the opaque `MatchFailureException` an incomplete match raises.
let sensorWriter (persister:Interpreter) = 
    function 
    | Update(id, temp) -> 
        table<Sensor> 
        >>= where (Quote.X(fun s -> s.Id = id))
        >>= select (Quote.X(fun x -> { x with Value = temp }))
        >>= update 
        >>= execute
        |> persister.Interpret
    | unexpected -> 
        // The topology is expected to route only Update tuples here; make a wiring mistake obvious.
        failwithf "sensorWriter only handles Update tuples, but received: %A" unexpected

/// Turns an `Update` whose temperature exceeds 100 into a single `Alarm`;
/// every other tuple produces no output. The result is wrapped with `async.Return`.
let thresholdAnalyser (persister:Interpreter) = 
    // A realistic implementation would read the threshold from the database through
    // `persister`; it is hard-coded here to keep the sample simple.
    let alarmsFor =
        function 
        | Update(id, temp) when temp > 100 -> [Alarm(id,temp)]
        | _ -> []
    alarmsFor >> async.Return

Topology

Finally, we put things together in a topology:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
open FsShelter.DSL
open FsShelter.Extras

/// The sample topology: one spout fed from a queue, and three bolts that persist
/// updates, analyse them against a threshold and publish alerts.
/// NOTE(review): the local binding names inside the builder shadow the outer
/// `sensorWriter` / `thresholdAnalyser` functions and may be significant to the DSL
/// (e.g. used as component names) - confirm before renaming any of them.
let sampleTopology = topology "sample" {
    // Spout: each SensorReading taken from the queue becomes an Update tuple.
    // The consumer is bootstrapped for topic "Topics.Sensor.Started" and queue
    // "sensor_started"; the acker is derived from that same consumer.
    let sensorEvents = 
        createQueueSpout (fun (m:SensorReading) -> Update (m.Id, m.Temp))
        |> runReliableSpout (withQueueConsumer "Topics.Sensor.Started" "sensor_started") ackerOfConsumer

    // Bolt: persists updates using the outer `sensorWriter` function,
    // with its arguments bootstrapped by `withPersisterArgs`.
    let sensorWriter = 
        createPersistBolt sensorWriter
        |> runBolt withPersisterArgs 

    // Bolt: transforms updates into alarms using the outer `thresholdAnalyser` function,
    // with its arguments bootstrapped by `withReaderArgs`.
    let thresholdAnalyser = 
        createTransformBolt thresholdAnalyser
        |> runBolt withReaderArgs 

    // Bolt: maps an Alarm to a ("Topics.Alarm", SensorAlert) pair and publishes it as
    // JSON on the "amq.topic" exchange. The pattern covers only Alarm, which is the only
    // case routed to this bolt below; `temp` is bound but not used in the alert text.
    let alertNotifier = 
        createNotifierBolt (function Alarm (sensorId,temp) -> "Topics.Alarm", 
                                                              { SensorAlert.Id = sensorId
                                                                Text = sprintf "At %A the reactor went critical" DateTime.Now})
        |> runBolt (withPublisher <| publishJson "amq.topic")

    // Wiring: Update tuples from the spout go to both the writer and the analyser.
    yield sensorEvents ==> sensorWriter |> shuffle.on Update
    yield sensorEvents ==> thresholdAnalyser |> shuffle.on Update

    // Alarm tuples from the analyser go to the notifier.
    // NOTE(review): this edge uses `-->` where the ones above use `==>`;
    // see the FsShelter DSL documentation for the difference between the two.
    yield thresholdAnalyser --> alertNotifier |> shuffle.on Alarm
}

Summary

This example outlined how to wire the persistence and messaging dependencies into the core logic of the application using the FsShelter.Extras components. The topology is available for inspection and instantiation without imposing runtime requirements such as configuration and connectivity. At the same time, the core logic is concise, focused on the problem and testable in isolation.

namespace FsBunny
namespace FsShelter
namespace FsShelter.Extras
namespace FsShelter.Extras.Assemblers
namespace System
namespace RabbitMQ
namespace RabbitMQ.Client
val mkEventStreams : connectionString:string -> RabbitMqEventStreams

Full name: Bootstrap.RabbitMq.mkEventStreams
val connectionString : string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
val uri : Uri
Multiple items
type Uri =
  new : uriString:string -> Uri + 5 overloads
  member AbsolutePath : string
  member AbsoluteUri : string
  member Authority : string
  member DnsSafeHost : string
  member Equals : comparand:obj -> bool
  member Fragment : string
  member GetComponents : components:UriComponents * format:UriFormat -> string
  member GetHashCode : unit -> int
  member GetLeftPart : part:UriPartial -> string
  ...

Full name: System.Uri

--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
val cf : ConnectionFactory
Multiple items
type ConnectionFactory =
  new : unit -> ConnectionFactory
  val UserName : string
  val Password : string
  val VirtualHost : string
  val RequestedChannelMax : uint16
  val RequestedFrameMax : uint32
  val RequestedHeartbeat : uint16
  val ClientProperties : IDictionary
  val Ssl : SslOption
  val HostName : string
  ...

Full name: RabbitMQ.Client.ConnectionFactory

--------------------
ConnectionFactory() : unit
Multiple items
type RabbitMqEventStreams =
  interface EventStreams
  new : factory:ConnectionFactory * defaultExchange:string -> RabbitMqEventStreams
  new : factory:ConnectionFactory * defaultExchange:string * retries:uint16 * limit:uint16 -> RabbitMqEventStreams

Full name: FsBunny.RabbitMqEventStreams

--------------------
new : factory:ConnectionFactory * defaultExchange:string -> RabbitMqEventStreams
new : factory:ConnectionFactory * defaultExchange:string * retries:uint16 * limit:uint16 -> RabbitMqEventStreams
val rabbit : string

Full name: Bootstrap.rabbit
val withQueueConsumer : topic:string -> queue:string -> log:'a -> cfg:'b -> Consumer<'c>

Full name: Bootstrap.withQueueConsumer


 Create a consumer based on the arguments.
val topic : string
val queue : string
val log : 'a
val cfg : 'b
val eventStreams : EventStreams
module RabbitMq

from Bootstrap


 DAL helpers
type EventStreams =
  interface
    abstract member Default : unit -> Exchange
    abstract member GetConsumer : Queue -> Exchange -> Assembler<'T> -> Consumer<'T>
    abstract member GetPublisher : Disassembler<'T> -> Publisher<'T>
    abstract member Routed : string -> Exchange
    abstract member UsingPublisher : Disassembler<'T> -> (Publisher<'T> -> unit) -> unit
  end

Full name: FsBunny.EventStreams
union case Queue.Persistent: string -> Queue
val withPublisher : publish:(EventStreams -> 'a) -> log:'b -> cfg:'c -> ('d -> 'e -> 'd * 'a)

Full name: Bootstrap.withPublisher


 Create a routed publisher based on the arguments.
val publish : (EventStreams -> 'a)
val log : 'b
val cfg : 'c
val tuple : 'd
val emit : 'e
val publishJson : exchange:string -> eventStreams:EventStreams -> topic:string * msg:Messaging.SensorAlert -> unit

Full name: Bootstrap.publishJson


 Publish Json
val exchange : string
val msg : Messaging.SensorAlert
union case Exchange.Routed: name: string * topic: string -> Exchange
val mkInterpreter : mkSession:(string -> 'a) -> name:string -> 'b

Full name: Bootstrap.Cassandra.mkInterpreter
val mkSession : (string -> 'a)
val name : string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
val session : 'a
val mkSession : connectionString:string -> 'a

Full name: Bootstrap.Cassandra.mkSession
val id : x:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.id
val cassandra : string

Full name: Bootstrap.cassandra
val withPersisterArgs : log:'a -> cfg:'b -> ('c -> 'd -> (('e -> 'f) -> 'f) * 'c)

Full name: Bootstrap.withPersisterArgs
val persister : 'e
module Cassandra

from Bootstrap
val input : 'c
val emit : 'd
val f : ('e -> 'f)
val withReaderArgs : log:'a -> cfg:'b -> ('c -> 'd -> (('e -> 'f) -> 'f) * 'c * 'd)

Full name: Bootstrap.withReaderArgs
Multiple items
type AutoOpenAttribute =
  inherit Attribute
  new : unit -> AutoOpenAttribute
  new : path:string -> AutoOpenAttribute
  member Path : string

Full name: Microsoft.FSharp.Core.AutoOpenAttribute

--------------------
new : unit -> AutoOpenAttribute
new : path:string -> AutoOpenAttribute
type SensorReading =
  {Id: Guid;
   Temp: int;}

Full name: Bootstrap.Messaging.SensorReading
SensorReading.Id: Guid
Multiple items
type Guid =
  struct
    new : b:byte[] -> Guid + 4 overloads
    member CompareTo : value:obj -> int + 1 overload
    member Equals : o:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member ToByteArray : unit -> byte[]
    member ToString : unit -> string + 2 overloads
    static val Empty : Guid
    static member NewGuid : unit -> Guid
    static member Parse : input:string -> Guid
    static member ParseExact : input:string * format:string -> Guid
    ...
  end

Full name: System.Guid

--------------------
Guid()
Guid(b: byte []) : unit
Guid(g: string) : unit
Guid(a: int, b: int16, c: int16, d: byte []) : unit
Guid(a: uint32, b: uint16, c: uint16, d: byte, e: byte, f: byte, g: byte, h: byte, i: byte, j: byte, k: byte) : unit
Guid(a: int, b: int16, c: int16, d: byte, e: byte, f: byte, g: byte, h: byte, i: byte, j: byte, k: byte) : unit
SensorReading.Temp: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type SensorAlert =
  {Id: Guid;
   Text: string;}

Full name: Bootstrap.Messaging.SensorAlert
SensorAlert.Id: Guid
Multiple items
SensorAlert.Text: string

--------------------
namespace System.Text
type Sensor =
  {Id: Guid;
   Value: int;}

Full name: Bootstrap.Persistence.Sensor
Sensor.Id: Guid
Sensor.Value: int
type Stream =
  | Update of sensorId: Guid * temp: int
  | Alarm of sensorId: Guid * temp: int
  | NotFound of sensorId: Guid

Full name: Bootstrap.Stream


 Topology streams
union case Stream.Update: sensorId: Guid * temp: int -> Stream
union case Stream.Alarm: sensorId: Guid * temp: int -> Stream
union case Stream.NotFound: sensorId: Guid -> Stream
val sensorWriter : persister:'a -> _arg1:Stream -> 'b

Full name: Bootstrap.sensorWriter
val persister : 'a
val id : Guid
val temp : int
val thresholdAnalyser : persister:'a -> (Stream -> Async<Stream list>)

Full name: Bootstrap.thresholdAnalyser
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
member AsyncBuilder.Return : value:'T -> Async<'T>
module DSL

from FsShelter
val sampleTopology : FsShelter.Topology.Topology<obj>

Full name: Bootstrap.sampleTopology
val topology : name:string -> TopologyBuilder

Full name: FsShelter.DSL.topology
val sensorEvents : FsShelter.Topology.Spout<Stream>
val createQueueSpout : toTuple:('a -> 'b) -> consumer:Consumer<'a> -> Async<(string * 'b) option>

Full name: FsShelter.Extras.Components.createQueueSpout
val m : SensorReading
val runReliableSpout : mkArgs:((FsShelter.Multilang.LogLevel -> string -> unit) -> FsShelter.Conf -> 'a) -> mkAcker:('a -> Acker) -> next:Next<'a,(string * 't)> -> FsShelter.Topology.Spout<'t>

Full name: FsShelter.DSL.runReliableSpout
val ackerOfConsumer : consumer:Consumer<'a> -> Acker

Full name: FsShelter.Extras.Components.ackerOfConsumer
val sensorWriter : FsShelter.Topology.Bolt<Stream>
val createPersistBolt : persist:('a -> 'b -> Async<'c>) -> using:(('a -> Async<'c>) -> 'd) * input:'b -> 'd

Full name: FsShelter.Extras.Components.createPersistBolt
val runBolt : mkArgs:((FsShelter.Multilang.LogLevel -> string -> unit) -> FsShelter.Conf -> 't -> ('t -> unit) -> 'a) -> consume:Consume<'a> -> FsShelter.Topology.Bolt<'t>

Full name: FsShelter.DSL.runBolt
val thresholdAnalyser : FsShelter.Topology.Bolt<Stream>
val createTransformBolt : transform:('a -> 'b -> Async<#seq<'d>>) -> using:(('a -> Async<unit>) -> 'e) * input:'b * out:('d -> unit) -> 'e

Full name: FsShelter.Extras.Components.createTransformBolt
val alertNotifier : FsShelter.Topology.Bolt<Stream>
val createNotifierBolt : ofTuple:('a -> 'b) -> input:'a * publish:Publisher<'b> -> Async<unit>

Full name: FsShelter.Extras.Components.createNotifierBolt
val sensorId : Guid
namespace System.Text
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
Multiple items
type DateTime =
  struct
    new : ticks:int64 -> DateTime + 10 overloads
    member Add : value:TimeSpan -> DateTime
    member AddDays : value:float -> DateTime
    member AddHours : value:float -> DateTime
    member AddMilliseconds : value:float -> DateTime
    member AddMinutes : value:float -> DateTime
    member AddMonths : months:int -> DateTime
    member AddSeconds : value:float -> DateTime
    member AddTicks : value:int64 -> DateTime
    member AddYears : value:int -> DateTime
    ...
  end

Full name: System.DateTime

--------------------
DateTime()
   (+0 other overloads)
DateTime(ticks: int64) : unit
   (+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : unit
   (+0 other overloads)
property DateTime.Now: DateTime
val publishJson : exchange:string -> eventStreams:EventStreams -> topic:string * msg:SensorAlert -> unit

Full name: Bootstrap.publishJson


 Publish Json