Currently 104 operators have been implemented:
⭐ - commonly used
All determines whether all items emitted by an Observable meet some criteria.
Pass a predicate function to the All operator that accepts an item emitted by the source Observable and returns a boolean value based on an evaluation of that item. All returns an ObservableBool that emits a single boolean value: true if and only if the source Observable terminates normally and every item emitted by the source Observable evaluates as true according to the predicate; false if any item emitted by the source Observable evaluates as false according to the predicate.
Code:
// Setup All to produce true only when all source values are less than 5
lessthan5 := func(i interface{}) bool {
	return i.(int) < 5
}
result, err := rx.From(1, 2, 5, 2, 1).All(lessthan5).ToSingle()
fmt.Println("All values less than 5?", result, err)
result, err = rx.From(4, 1, 0, -1, 2, 3, 4).All(lessthan5).ToSingle()
fmt.Println("All values less than 5?", result, err)
Output:
All values less than 5? false <nil>
All values less than 5? true <nil>
AsObservableInt and AsObservableBool type assert an Observable to an observable of type int or bool, respectively.
AsObservable can also be called on an ObservableInt or ObservableBool to convert it to an observable of type interface{}.
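To make the type assertion concrete, here is a minimal stdlib-only sketch of what asserting a sequence of interface{} values to int amounts to. The helper asInts and the error errTypeAssert are hypothetical names used for illustration and are not part of the library. The sketch also assumes that a failed assertion surfaces as an error that ends the sequence; the library may report it differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errTypeAssert is a hypothetical error value for this sketch only.
var errTypeAssert = errors.New("type assertion to int failed")

// asInts models the assertion on a finite sequence: it returns the items
// converted so far and an error at the first item that is not an int.
func asInts(items []interface{}) ([]int, error) {
	out := make([]int, 0, len(items))
	for _, item := range items {
		v, ok := item.(int)
		if !ok {
			// Assumption: the sequence stops at the first failed assertion.
			return out, errTypeAssert
		}
		out = append(out, v)
	}
	return out, nil
}

func main() {
	fmt.Println(asInts([]interface{}{1, 2, 3}))
	fmt.Println(asInts([]interface{}{1, "two", 3}))
}
```

The first call converts every item; the second stops at the string and returns the error together with the items converted before it.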
AuditTime waits until the source emits and then starts a timer. When the timer expires, AuditTime will emit the last value received from the source during the time period when the timer was active.
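The timer behaviour described above can be sketched as a deterministic model over a recorded timeline, using only the standard library. The event type and the auditTime function are hypothetical names for this illustration and are not the library's implementation. The sketch assumes that a value still pending when the source finishes is emitted once the timer expires; the library may handle completion differently.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical record of a value and the time at which it
// occurs, measured from the moment of subscription.
type event struct {
	at    time.Duration
	value interface{}
}

// auditTime models the operator over a source timeline sorted by time.
// The first value seen while no timer runs starts a timer of length d.
// When the timer expires, the last value seen so far is emitted.
func auditTime(source []event, d time.Duration) []event {
	var out []event
	var last interface{}
	var deadline time.Duration
	active := false
	for _, e := range source {
		if active && e.at >= deadline {
			// The timer expired before this value arrived.
			out = append(out, event{deadline, last})
			active = false
		}
		if !active {
			// This value starts a new timer.
			active = true
			deadline = e.at + d
		}
		last = e.value
	}
	if active {
		// Assumption: the pending value is still emitted at the deadline.
		out = append(out, event{deadline, last})
	}
	return out
}

func main() {
	const ms = time.Millisecond
	source := []event{{0 * ms, 1}, {30 * ms, 2}, {60 * ms, 3}, {150 * ms, 4}, {160 * ms, 5}}
	for _, e := range auditTime(source, 100*ms) {
		fmt.Println(e.at, e.value)
	}
}
```

In this timeline the values 1 and 2 are superseded by 3 before the first timer expires at 100ms, and 4 is superseded by 5 before the second timer expires at 250ms.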
Buffer buffers the source Observable values until closingNotifier emits.
import _ "github.com/reactivego/rx/generic"
Code:
const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
	switch i.(int) {
	case 0:
		return rx.From("a", "b")
	case 1:
		return rx.From("c", "d", "e")
	case 3:
		return rx.From("f", "g")
	}
	return rx.Empty()
})
closingNotifier := rx.Interval(100 * ms)
source.Buffer(closingNotifier).Println()
Output:
[a b]
[c d e]
[]
[f g]
BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.
Code:
const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
	switch i.(int) {
	case 0:
		return rx.From("a", "b")
	case 1:
		return rx.From("c", "d", "e")
	case 3:
		return rx.From("f", "g")
	}
	return rx.Empty()
})
source.BufferTime(100 * ms).Println()
Output:
[a b]
[c d e]
[]
[f g]
Catch recovers from an error notification by continuing the sequence without emitting the error, switching to the catch Observable to provide items instead.
Code:
const problem = rx.RxError("problem")
rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).Catch(rx.From(4, 5)).Println()
Output:
1
2
3
4
5
CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error. It is passed a selector function that takes as arguments err, which is the error, and caught, which is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable is returned by the selector will be used to continue the observable chain.
Code:
const problem = rx.RxError("problem")
catcher := func(err error, caught rx.Observable) rx.Observable {
	if err == problem {
		return rx.From(4, 5)
	}
	return caught
}
rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).CatchError(catcher).Println()
Output:
1
2
3
4
5
Distinct suppresses duplicate items emitted by an Observable.
The operator filters an Observable by only allowing items through that have not already been emitted. In some implementations there are variants that allow you to adjust the criteria by which two items are considered “distinct.” In some, there is a variant of the operator that only compares an item against its immediate predecessor for distinctness, thereby filtering only consecutive duplicate items from the sequence.
Code:
rx.From(1, 2, 2, 1, 3).Distinct().Println()
Output:
1
2
3
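The description above mentions variants that adjust the criteria by which two items count as distinct. A minimal stdlib-only sketch of that idea follows. The function distinctBy and its keyOf parameter are hypothetical and are not an API of this library; with an identity key the sketch behaves like the Distinct example above.

```go
package main

import (
	"fmt"
	"strings"
)

// distinctBy is a hypothetical helper. It keeps the first item for every
// key produced by keyOf and drops later items whose key was already seen.
// Keys must be comparable, because they are used as map keys.
func distinctBy(items []interface{}, keyOf func(interface{}) interface{}) []interface{} {
	seen := make(map[interface{}]struct{})
	var out []interface{}
	for _, item := range items {
		k := keyOf(item)
		if _, dup := seen[k]; dup {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, item)
	}
	return out
}

func main() {
	// Identity key: plain duplicate suppression.
	identity := func(i interface{}) interface{} { return i }
	fmt.Println(distinctBy([]interface{}{1, 2, 2, 1, 3}, identity))

	// Case-insensitive key: "A" counts as a duplicate of "a".
	lower := func(i interface{}) interface{} { return strings.ToLower(i.(string)) }
	fmt.Println(distinctBy([]interface{}{"a", "A", "b", "B", "a"}, lower))
}
```

Note that the set of seen keys grows with the number of distinct keys, which matters for long-running sequences.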
DistinctUntilChanged only emits when the current value is different from the last.
The operator only compares emitted items from the source Observable against their immediate predecessors in order to determine whether or not they are distinct.
Code:
rx.From(1, 2, 2, 1, 3).DistinctUntilChanged().Println()
Output:
1
2
1
3
StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
Code:
rx.From(2, 3).StartWith(1).Println()
Output:
1
2
3
SubscribeOn specifies which scheduler an Observable should use when it is subscribed to.
Code:
trampoline := rx.MakeTrampolineScheduler()
observer := func(next interface{}, err error, done bool) {
	switch {
	case !done:
		fmt.Println(trampoline, "print", next)
	case err != nil:
		fmt.Println(trampoline, "print", err)
	default:
		fmt.Println(trampoline, "print", "complete")
	}
}
fmt.Println(trampoline, "SUBSCRIBING...")
subscription := rx.From(1, 2, 3).SubscribeOn(trampoline).Subscribe(observer)
fmt.Println(trampoline, "WAITING...")
subscription.Wait()
fmt.Println(trampoline, "DONE")
Output:
Trampoline{ tasks = 0 } SUBSCRIBING...
Trampoline{ tasks = 1 } WAITING...
Trampoline{ tasks = 1 } print 1
Trampoline{ tasks = 1 } print 2
Trampoline{ tasks = 1 } print 3
Trampoline{ tasks = 1 } print complete
Trampoline{ tasks = 0 } DONE
Code:
const ms = time.Millisecond
goroutine := rx.GoroutineScheduler()
observer := func(next interface{}, err error, done bool) {
	switch {
	case !done:
		fmt.Println(goroutine, "print", next)
	case err != nil:
		fmt.Println(goroutine, "print", err)
	default:
		fmt.Println(goroutine, "print", "complete")
	}
}
fmt.Println(goroutine, "SUBSCRIBING...")
subscription := rx.From(1, 2, 3).Delay(10 * ms).SubscribeOn(goroutine).Subscribe(observer)
// Note that without a Delay the next Println lands at a random spot in the output.
fmt.Println("WAITING...")
subscription.Wait()
fmt.Println(goroutine, "DONE")
Output:
Goroutine{ tasks = 0 } SUBSCRIBING...
WAITING...
Goroutine{ tasks = 1 } print 1
Goroutine{ tasks = 1 } print 2
Goroutine{ tasks = 1 } print 3
Goroutine{ tasks = 1 } print complete
Goroutine{ tasks = 0 } DONE
WithLatestFrom subscribes to all Observables and waits for all of them to emit before emitting the first slice. The source observable determines the rate at which values are emitted: observables that are faster than the source do not affect the rate at which the resulting observable emits. The observables combined with the source are allowed to continue emitting, but only their most recently emitted value is included whenever the source emits.
Note that any values emitted by the source before all other observables have emitted will effectively be lost. The first emit will occur the first time the source emits after all other observables have emitted.
Code:
a := rx.From(1, 2, 3, 4, 5)
b := rx.From("A", "B", "C", "D", "E")
a.WithLatestFrom(b).Println()
Output:
[2 A]
[3 B]
[4 C]
[5 D]
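The output above depends on how the two observables interleave at run time. To show the rule itself, here is a stdlib-only sketch over explicit timelines. The event type, the atMs field and the withLatestFrom function are hypothetical names for this illustration, not the library's implementation. The sketch assumes that when both observables emit at the same instant, the other observable's value is applied first.

```go
package main

import "fmt"

// event is a hypothetical record of a value and the time, in
// milliseconds since subscription, at which it is emitted.
type event struct {
	atMs  int
	value interface{}
}

// withLatestFrom models the operator for one other observable. Both
// timelines must be sorted by time. A pair is produced for every source
// value that arrives after the other observable has emitted at least once.
func withLatestFrom(source, other []event) [][]interface{} {
	var out [][]interface{}
	var latest interface{}
	hasLatest := false
	j := 0
	for _, s := range source {
		// Apply every emission of other at or before this source value.
		for j < len(other) && other[j].atMs <= s.atMs {
			latest, hasLatest = other[j].value, true
			j++
		}
		if hasLatest {
			out = append(out, []interface{}{s.value, latest})
		}
		// Otherwise the source value is lost, as the note above describes.
	}
	return out
}

func main() {
	source := []event{{0, 1}, {100, 2}, {200, 3}, {300, 4}}
	other := []event{{50, "A"}, {120, "B"}, {130, "C"}}
	for _, pair := range withLatestFrom(source, other) {
		fmt.Println(pair)
	}
}
```

Here the source value 1 is dropped because the other observable has not emitted yet, and "B" never appears because "C" replaces it before the source emits again.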
WithLatestFromAll flattens a higher order observable (e.g. ObservableObservable) by subscribing to all emitted observables (i.e. Observable entries) until the source completes. It then waits for all of the subscribed Observables to emit before emitting the first slice. The first observable that was emitted by the source is used as the trigger observable. Whenever the trigger observable emits, a new slice is emitted containing all the latest values.
Note that any values emitted by the source before all other observables have emitted will effectively be lost. The first emit will occur the first time the source emits after all other observables have emitted.
Code:
a := rx.From(1, 2, 3, 4, 5)
b := rx.From("A", "B", "C", "D", "E")
c := rx.FromObservable(a, b)
c.WithLatestFromAll().Println()
Output:
[2 A]
[3 B]
[4 C]
[5 D]