FsStorm



Overview

FsStorm is a library for implementing Apache Storm components, defining topologies in an F# DSL, and submitting them for execution via F# scripts. The topology and its components can be implemented in a single EXE project; Storm executes them via its multilang protocol as separate processes, one for each task/instance. The accompanying FsJson library is used for handling the JSON structures passed in and out of Storm.
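Under the multilang protocol each task process exchanges JSON messages with Storm over stdin/stdout, and every message is terminated by a line containing only `end`. FsStorm handles this framing for you; the following is only a sketch of the idea, and `readMessage` is a hypothetical name, not part of FsStorm's API.

```fsharp
open System.IO

/// Hypothetical sketch (not FsStorm's API): reads one multilang message,
/// i.e. the JSON text lines up to a line containing only "end".
/// Returns None when the input ends before a complete message.
let readMessage (reader: TextReader) : string option =
    let rec loop (acc: string list) =
        match reader.ReadLine() with
        | null -> None                                         // end of input
        | "end" -> Some(acc |> List.rev |> String.concat "\n") // message complete
        | line -> loop (line :: acc)                           // keep collecting JSON lines
    loop []
```

For example, a spout process asked for its next tuple would read `{"command": "next"}` followed by `end`.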

FsStorm components

FsStorm components are defined as functions that take at least one argument, the last one: the configuration passed in from Storm. In practice you'll want to pass all your dependencies in, which means at least one more argument: a runner, passed in from your topology. You can pass as many additional arguments from the topology as needed. Think of the component function as the "main" of your program. Storm starts a copy of the same EXE for every component in the topology and tells each instance which task it is supposed to execute. FsStorm calls the "main" function once per instance of every component; its purpose is to construct either a "next" function (for spouts) or a "consume" function (for bolts) and pass it to a runner. FsStorm implements several runners: some talk to Storm, while others let you unit-test your components by recording outputs or playing back inputs.
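To make the "main"/runner split concrete, here is a self-contained sketch of a spout "main" driven by a test runner that records outputs instead of talking to Storm. Tuples are plain int lists here, and `recordingSpoutRunner`, `countingSpout` and the "start" setting are hypothetical names for illustration, not FsStorm's runners or configuration keys.

```fsharp
// Hypothetical stand-in: a tuple is a plain int list here, not an FsJson value.
type Values = int list

/// Hypothetical recording runner (not FsStorm's API): builds "next" with an emit
/// that records into a list, then calls "next" n times instead of waiting for Storm.
let recordingSpoutRunner (n: int) (recorded: ResizeArray<Values>) =
    fun (mkNext: (Values -> unit) -> unit -> Async<unit>) ->
        let next = mkNext (fun t -> recorded.Add t)
        for _ in 1 .. n do
            next () |> Async.RunSynchronously

/// Component "main": dependencies (the runner) first, Storm configuration last.
let countingSpout runner (cfg: Map<string, int>) =
    // Read the hypothetical "start" setting, defaulting to 0.
    let counter = ref (defaultArg (cfg.TryFind "start") 0)
    let next (emit: Values -> unit) =
        fun () ->
            async {
                emit [ counter.Value ]
                counter.Value <- counter.Value + 1
            }
    next |> runner
```

Calling `countingSpout (recordingSpoutRunner 3 recorded) (Map.ofList [ "start", 10 ])` leaves `[10]`, `[11]` and `[12]` in `recorded`; in production the same "main" would receive a runner that talks to Storm instead.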

FsStorm tuples

Storm components communicate by passing tuples to each other over streams. Tuples are emitted into streams, and each stream has a schema defined by the Outputs element of the emitting component in the topology. Storm multilang messages are wrapped and accessible via the included FsJson library. In addition to raw JSON access, FsStorm defines several helpers, such as tuple, namedStream and anchor, that abstract away the specifics of the underlying multilang protocol.
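To give a feel for what those helpers hide, here is a sketch that renders a bolt-style multilang "emit" command by hand. Per the Storm multilang documentation, an emit carries the field values in "tuple", an optional "stream" (omitted for the default stream) and, for bolts, optional "anchors" holding the ids of the input tuples. `emitCommand` is a hypothetical helper, not part of FsStorm or FsJson, and it only handles integer fields.

```fsharp
/// Hypothetical helper (not FsStorm/FsJson API): renders a multilang "emit"
/// command for integer field values, with optional stream and anchors.
let emitCommand (stream: string option) (anchors: string list) (values: int list) =
    let quote (s: string) = "\"" + s + "\""
    [ Some "\"command\":\"emit\""
      // no "stream" key means the default stream
      stream |> Option.map (fun s -> "\"stream\":" + quote s)
      // anchors tie the new tuple to the input tuples it was derived from
      (if anchors.IsEmpty then None
       else Some("\"anchors\":[" + (anchors |> List.map quote |> String.concat ",") + "]"))
      Some("\"tuple\":[" + (values |> List.map string |> String.concat ",") + "]") ]
    |> List.choose id
    |> String.concat ","
    |> fun body -> "{" + body + "}"
```

For instance, `emitCommand None [] [ 42 ]` yields `{"command":"emit","tuple":[42]}`.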

Example of a spout

open FsJson
open Storm

let rnd = new System.Random() // used for generating random messages

///spout - produces messages
///cfg: the configuration passed in by Storm
///runner: a spout runner function (passed in from topology)
let spout runner cfg = 
    //define the function that will produce the tuples
    //emit: a function that emits message to storm (passed in by the runner)
    let next emit = fun () -> async { tuple [ rnd.Next(0, 100) ] |> emit } //the "next" function
    //run the spout
    next |> runner

Topology DSL in F#

//define the storm topology
open StormDSL
open FsJson

//example of using FsStorm DSL for defining topologies
let topology = 
    { TopologyName = "FstSample"
      Spouts = 
          [ { Id = "SimpleSpout" // unique Id
              Outputs = [ Default [ "number" ] ] // default stream schema
              Spout = Local { Func = spout Storm.simpleSpoutRunner }
              // configuration Storm will use with each instance of the component 
              Config = JsonNull 
              Parallelism = 1 } ] // one instance
      Bolts = 
          [ { Id = "AddOneBolt"
              Outputs = [ Default [ "number" ] ]
              // default stream of SimpleSpout, shuffle grouping (random distribution, no affinity)
              Inputs = [ DefaultStream "SimpleSpout", Shuffle ]
              Bolt = Local { Func = addOneBolt Storm.autoAckBoltRunner Logging.log Storm.emit}
              Config = JsonNull
              Parallelism = 2 } // two instances of the process/component
            { Id = "ResultBolt"
              Outputs = [] // no output
              Inputs = [ DefaultStream "AddOneBolt", Shuffle ]
              Bolt = Local { Func = resultBolt Storm.autoAckBoltRunner Logging.log }
              Config = JsonNull
              Parallelism = 2 } ] }
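The topology refers to addOneBolt and resultBolt, which are not shown on this page. As a rough sketch of their shape, assuming tuples are plain int lists rather than FsJson values and using a hypothetical `playbackRunner` in place of `Storm.autoAckBoltRunner`, a bolt "main" builds a "consume" function and hands it to its runner:

```fsharp
// Hypothetical stand-in: a tuple is a plain int list here, not an FsJson value.
type Values = int list

/// Bolt "main" in the argument order the topology uses: runner, log, emit,
/// then the Storm configuration last. The body only illustrates the idea.
let addOneBolt runner (log: string -> unit) (emit: Values -> unit) (cfg: obj) =
    // "consume" handles one incoming tuple.
    let consume (input: Values) =
        async {
            match input with
            | [ x ] -> emit [ x + 1 ]
            | other -> log (sprintf "unexpected tuple %A" other)
        }
    consume |> runner

/// Hypothetical playback runner (not FsStorm's API): feeds prerecorded inputs to "consume".
let playbackRunner (inputs: Values list) (consume: Values -> Async<unit>) =
    for t in inputs do
        consume t |> Async.RunSynchronously
```

Playing back `[1]` and `[41]` through `addOneBolt (playbackRunner [ [ 1 ]; [ 41 ] ]) ignore (fun t -> out.Add t) null` collects `[2]` and `[42]` in `out`, which is the kind of unit test the playback-style runners are meant for.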

Submitting the topology using F# scripts

#I "../../Refs"
#I "../../packages/Thrift/lib/net35"
#load "StormSubmit.fsx"

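// build output directory
let binDir = "build"

// submit to the Nimbus host and port (here: localhost and the default Nimbus port)
StormSubmit.runTopology binDir "localhost" StormSubmit.default_nimbus_port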

Exporting the topology graph in DOT format (GraphViz) using F# scripts

#r "../../src/FstSample/bin/Release/FstSample.exe"

open StormDotGraph
open System

writeToConsole SampleTopology.topology
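For orientation, the following sketch shows the kind of GraphViz DOT text such an export produces for the sample topology's edges. It is a hypothetical `toDot` function over a simplified (source, target, grouping) edge list, not the StormDotGraph implementation, and its output format may differ from writeToConsole.

```fsharp
/// Hypothetical sketch (not StormDotGraph): renders a directed graph in DOT
/// format from (source, target, grouping) edges.
let toDot (name: string) (edges: (string * string * string) list) =
    let lines =
        [ yield sprintf "digraph \"%s\" {" name
          for (src, dst, grouping) in edges do
              yield sprintf "  \"%s\" -> \"%s\" [label=\"%s\"];" src dst grouping
          yield "}" ]
    String.concat "\n" lines

// The sample topology: SimpleSpout -> AddOneBolt -> ResultBolt, both shuffle-grouped.
toDot "FstSample" [ "SimpleSpout", "AddOneBolt", "Shuffle"; "AddOneBolt", "ResultBolt", "Shuffle" ]
|> printfn "%s"
```

The printed text can be piped to GraphViz, e.g. `dot -Tpng`, to render the graph.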

Samples & documentation

  • FstSample contains an "unreliable" spout example: emitted tuples do not require an ack and could be lost in case of failure.

  • FstGuaranteedSample contains a "reliable" spout example: emitted tuples have a unique ID and require an ack.

  • API Reference contains automatically generated documentation for public types, modules and functions in the library.

  • WordCount contains a simple example showing a spout with two bolts.
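The difference between the two spout styles comes down to bookkeeping. A minimal sketch, assuming a hypothetical `ReliableSource` type (not FsStorm's API): each emitted tuple gets a unique id and stays pending until Storm acks it; on fail, this sketch replays the same values under the same id, which is one common policy rather than the only one.

```fsharp
open System.Collections.Generic

/// Hypothetical reliable-spout bookkeeping (not FsStorm's API).
/// emit receives the tuple id and the field values.
type ReliableSource(emit: string -> int list -> unit) =
    let pending = Dictionary<string, int list>()
    let mutable nextId = 0

    /// Emits values under a fresh unique id and remembers them until acked.
    member this.Emit(values: int list) =
        nextId <- nextId + 1
        let id = string nextId
        pending.[id] <- values
        emit id values

    /// Ack: the tuple was fully processed, so it no longer needs tracking.
    member this.Ack(id: string) = pending.Remove(id) |> ignore

    /// Fail: replay the remembered values under the same id.
    member this.Fail(id: string) =
        match pending.TryGetValue(id) with
        | true, values -> emit id values
        | _ -> ()

    member this.PendingCount = pending.Count
```

An unreliable spout needs none of this: it emits without an id, so Storm never reports back and a lost tuple is simply gone.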

Getting FsStorm

The FsStorm library can be installed from NuGet or MyGet:
PM> Install-Package FsStorm

Contributing and copyright

The project is hosted on GitHub, where you can report issues, fork the project and submit pull requests. If you're adding a new public API, please also consider adding samples that can be turned into documentation. You might also want to read the library design notes to understand how it works.

The library is available under MIT license, which allows modification and redistribution for both commercial and non-commercial purposes. For more information see the License file in the GitHub repository.
