Omnibus is an HTTP-friendly reactive message bus which means :
- Topic hierachies and subscriptions are managed via a rest API.
- Updates are streamed by Server-Sent-Event which can be easily consumed by javascript frontends.
- With reactive modes it is possible to replay specific parts of the events.
- Subscriptions can be composed via the url keyword
+
. - It can be easily integrated in an existing Akka application.
This is still a work in progress, any API is likely to change
Let's demonstrate some basic commands using Curl.
Topics are trees, you can create them simply with a POST request.
The root url of every topic is "/topics", this keyword is reserved.
Use POST to create the nested topic "/topics/animals/furry".
curl -X POST http://localhost:8080/topics/animals/furry
With PUT it is only possible to push data to an existing topic.
curl -X PUT http://localhost:8080/topics/animals -d "dolphins are the best"
If you publish a message at the "/animals" level, all subtopics will receive it as well.
And finally you can subscribe to the notifications on a topic.
curl -X GET http://localhost:8080/topics/animals
~~> Streaming subscription for topics /animals
When you subscribe to a topic, you will of course receive all the notifications targetting its sub topics.
It will be soon possible to DELETE a topic and all its subtopic through the protected administration API.
Omnibus supports reactive modes in order to replay specific sequence of events from topics.
The supported modes are:
-
simple
: classic subscription (default one if not specified) -
last
: get last message on a topic and the following events -
replay
: get all past messages on topic and the following events -
since-id
: all the past events since a given event-id and the following events -
since-ts
: all the past events since a given unix timestamp and the following events -
between-id
: all the events between two given event-id -
between-ts
: all the events between two given unix timestamp
Modes are specified by url parameter
curl -X GET "http://localhost:8080/topics/results/basketball?mode=between-id&since=1&to=2"
~~> Streaming subscription for topics /results/basketball with mode replay
id: 1 event: result/basketball data: A basket ball game result timestamp: 1388250283
id: 2 event: result/basketball data: Another basket ball game result timestamp: 1388250552
You can compose subscriptions with the char '+' in order to merge notifications from multiple topics.
curl -X GET http://localhost:8080/topics/customer/order/+/logistic/export
~~> Streaming subscription for topics /customer/order + /logistic/export with mode simple
Of course you are free to use reactive modes on composed subscriptions. Just be ready to handle the flow of data if you target a root topic with the replay mode :D
Omnibus exposes usage statistics concerning all topics and the system itself following two modes.
live
: get the current statistics.history
: get all statistics history available.streaming
: continous data stream of statistics in realtime
There are two ways of running Omnibus
Get the latest omnibus-standalone.zip distribution, extract and run it.
java -jar omnibus-standalone.jar &
This starts Omnibus on default port 8080.
It is possible to integrate Omnibus in an existing Akka application.
Add the latest omnibus.jar to your application by building from source with sbt publishLocal
libraryDependencies += "com.agourlay" % "omnibus" % "0.1-SNAPSHOT"
and then call :
val receptionist : OmnibusReceptionist = omnibus.service.OmnibusBuilder.start()
This will start the Omnibus system on the given port and return an OmnibusReceptionist object that offers all the useful methods to interact with the system :
def createTopic(topicName : String)
def deleteTopic(topicName : String)
def checkTopic(topicName : String) : Future[Boolean]
def publishToTopic(topicName : String, message : String)
def subToTopic(topicName : String, subscriber : ActorRef, mode : ReactiveCmd) : Future[Boolean]
def unsubFromTopic(topicName : String, subscriber : ActorRef)
def shutDownOmnibus()
The subscriber's actorRef will receive this kind of updates
omnibus.domain.Message(id : Long, topicName : String, payload : String, timestamp : Long)
-
provide minimal administration interface with statistics per topics
-
support properly Server-Sent-Event specification (Last-Event-Id header...)
-
deploy bus in cluster (akka-cluster)
-
support websockets