Stream processing in Golang with a modular notification behavior based on filters.
go get github.com/konimarti/flow
- Define a data stream source:
yourSource := flow.Func{
func() interface{} {
return rand.Float64()
},
500 * time.Millisecond,
}
- Define the stream data processing (i.e. your filters):
yourFilters := filters.AboveFloat64{0.5}
- Create your flow and subscribe to the results:
yourFlow := flow.New(
yourFilters,
yourSource,
)
results := yourFlow.Subscribe()
for {
<-results.C()
fmt.Println(results.Value())
}
- Apply a low-pass filter (exponential smoothing) to a sequence of random numbers between 0 and 1:
flow := flow.New(
&filters.LowPass{A: 0.1},
&flow.Func{
func(){ return rand.Float64() },
500 * time.Millisecond,
},
)
- Generate a stream of random numbers from a standard normal, calculate moving average, print average and check if average is above or below 0.5 and -0.5, respectively. If so, then notify the subscribers.
// define stream processor (flow) that returns an observer
yourFlow := flow.New(
filters.NewChain(
&filters.MovingAverage{Window: 10},
&filters.Print{Writer: os.Stdout, Prefix: "Moving average:"},
filters.NewSwitch(
&filters.AboveFloat64{0.5},
&filters.BelowFloat64{-0.5},
),
),
&flow.Func{
func() interface{} { return rand.NormFloat64() },
500*time.Millisecond,
},
)
// subscribe to flow and listen to events
results := yourFlow.Subscribe()
for {
<-results.C()
fmt.Println("Notified:", results.Value())
}
Two types of flows are available that are suitable for different use cases:
- Channel-based flows accept new values through a
chan interface{}
channel, and - Function-based flows collect new values in regular intervals from a
func() interface{}
function.
Channel-based flows are useful in cases where we receive specific events. Function-based flows can be used to monitor any object or state of resources (i.e. reading data from OPC, HTTP requests, etc.).
- To get a channel-based flow:
// define channel
ch := make(chan interface{})
// define your filter(s) for data processing
filter := filters.OnChange{}
// create your flow
yourChannelFlow := flow.New(
&filter,
&flow.Chan{ch},
)
// publish new data to channel ch
// ch <- ..
- To get a function-based flow:
// define a function that returns the values
fn := func() interface{} {
return rand.Float64()
}
// define your filter(s)
filter := filters.OnChange{}
// create your flow
yourFunctionFlow := flow.New(
&filter,
&flow.Func{
fn,
1 * time.Second,
},
)
- You can subscribe to a flow in order to receive the results from the filter(s):
results := yourFlow.Subscribe()
for {
// wait for a result
<-results.C()
// get value from the filters
results.Value()
}
The filters control the behavior of the observer, i.e. they determine when and what values should be sent to the subscribers.
The following filters are currently implemented in this package:
-
Notification filters:
None{}
: No filter is applied. All values are sent to the observers unfilitered and unprocessed.Sink{}
: Blocks the flow of data. No values are sent to the observers.Mute{Duration time.Duration}
: Mute shuts down all notifications after an event for a specific duration.OnChange{}
: Notifies when the value changes.OnRisingFlank{}
: Notifies when the value increases.OnValue{value float64}
: Notifies when the new value matches the defined value at initialization.AboveFloat64{threshold float64}
: Notifies when a new float64 is above the pre-defined float64 threshold.BelowFloat64{threshold float64}
: Notifies when a new float64 is below the pre-defined float64 threshold.Sigma{Window int, Factor float64}
: Sigma checks if the incoming value is a certain multiple (=factor) of standard deviations away from the mean.
-
Stream-processing filters:
MovingAverage{Window int}
: Calculates the moving average over a certain sample size and sends the current mean to all subscribers.StdDev{Window int}
: Calculates the standard deviation over a certain sample size and sends the current standard deviation to all subscribers.LowPass{A float64}
: Performs low-pass filtering on the input data (exponential smoothing) with the smoothing factor A.
User-defined filters can easily be created: Define your struct and embed the filters.Model
. You can then customize one or both of the interface functions.
The filters.None
is implemented by creating an empty struct and just embedding filters.Model
, for example.
The following user-defined filter expects a float64 value and multiplies it with a pre-defined factor:
type Multiply struct {
filters.Model
Factor float64
}
func (m *Multiply) Update(v interface{}) interface{} {
return v.(float64) * m.Factor
}
Filters can be chained together using filters.NewChain(Filter1, Filter2, ...)
.
To adjust the notification behavior, the filters.NewSwitch
function can be useful, especially in cases when you want
to monitor a value that needs to remain within a certain range ("deadband").
See this example for more information on logical structures
An anomaly detection example for streams with an user-defined filter based on Lytics' Anomalyzer can be found here.
Check out the examples here.
This software package has been developed for and is in production at Kalkfabrik Netstal. The design of the observer implementation was inspired by go-observer.
This package is still work-in-progress. Interfaces might still change substantially. It is also not recommended to use it in a production environment at this point.