Skip to content

Latest commit

 

History

History
888 lines (644 loc) · 19 KB

OPERATORS.md

File metadata and controls

888 lines (644 loc) · 19 KB

Operators

Currently 104 operators have been implemented:

A … C D … L M … P R … S T … W
All DebounceTime Map Range Take
AsObservable Defer MapTo Reduce TakeLast
AsyncSubject Delay Max RefCount TakeUntil
AuditTime Distinct Merge Repeat TakeWhile
AutoConnect DistinctUntilChanged MergeAll ReplaySubject ThrottleTime
Average Do MergeDelayError Retry Throw
BehaviorSubject DoOnComplete MergeDelayErrorWith SampleTime Ticker
Buffer DoOnError MergeMap Scan TimeInterval
BufferTime ElementAt MergeMapTo Serialize Timeout
Catch Empty MergeWith Single Timer
CatchError Filter Min Skip Timestamp
CombineLatest Finally Never SkipLast ToChan
CombineLatestAll First ObserveOn Start ToSingle
CombineLatestMap From Of StartWith ToSlice
CombineLatestMapTo FromChan Only Subject Wait
CombineLatestWith IgnoreCompletion Passthrough Subscribe WithLatestFrom
Concat IgnoreElements Println SubscribeOn WithLatestFromAll
ConcatAll Interval Publish Sum
ConcatMap Just PublishBehavior SwitchAll
ConcatMapTo Last PublishLast SwitchMap
ConcatWith PublishReplay
Connect
Count
Create
CreateFutureRecursive
CreateRecursive

⭐ - commonly used

All

All determines whether all items emitted by an Observable meet some criteria.

Pass a predicate function to the All operator that accepts an item emitted by the source Observable and returns a boolean value based on an evaluation of that item. All returns an ObservableBool that emits a single boolean value: true if and only if the source Observable terminates normally and every item emitted by the source Observable evaluated as true according to the predicate; false if any item emitted by the source Observable evaluates as false according to the predicate.

All

Code:

// Setup All to produce true only when all source values are less than 5
lessthan5 := func(i interface{}) bool {
	return i.(int) < 5
}

result, err := rx.From(1, 2, 5, 2, 1).All(lessthan5).ToSingle()

fmt.Println("All values less than 5?", result, err)

result, err = rx.From(4, 1, 0, -1, 2, 3, 4).All(lessthan5).ToSingle()

fmt.Println("All values less than 5?", result, err)

Output:

All values less than 5? false <nil>
All values less than 5? true <nil>

AsObservable

AsObservableInt or AsObservableBool type asserts an Observable to an observable of type int or bool. Also AsObservable can be called on an ObservableInt and ObservableBool to convert to an observable of type interface{}.

AuditTime

AuditTime waits until the source emits and then starts a timer. When the timer expires, AuditTime will emit the last value received from the source during the time period when the timer was active.

AsyncSubject

TBD

AutoConnect

TBD

Average

TBD

BehaviorSubject

TBD

Buffer

Buffer buffers the source Observable values until closingNotifier emits.

Buffer

Example

import _ "github.com/reactivego/rx/generic"

Code:

source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
    switch i.(int) {
    case 0:
        return rx.From("a", "b")
    case 1:
        return rx.From("c", "d", "e")
    case 3:
        return rx.From("f", "g")
    }
    return rx.Empty()
})
closingNotifier := rx.Interval(100 * ms)
source.Buffer(closingNotifier).Println()

Output:

[a b]
[c d e]
[]
[f g]

BufferTime

BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.

BufferTime

Code:

const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
    switch i.(int) {
    case 0:
        return rx.From("a", "b")
    case 1:
        return rx.From("c", "d", "e")
    case 3:
        return rx.From("f", "g")
    }
    return rx.Empty()
})
source.BufferTime(100 * ms).Println()

Output:

[a b]
[c d e]
[]
[f g]

Catch

Catch recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch ObservableInt to provide items.

Catch

Code:

const problem = rx.RxError("problem")

rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).Catch(rx.From(4, 5)).Println()

Output:

1
2
3
4
5

CatchError

CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error. It is passed a selector function that takes as arguments err, which is the error, and caught, which is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable is returned by the selector will be used to continue the observable chain.

CatchError

Code:

const problem = rx.RxError("problem")

catcher := func(err error, caught rx.Observable) rx.Observable {
    if err == problem {
        return rx.From(4, 5)
    } else {
        return caught
    }
}

rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).CatchError(catcher).Println()

Output:

1
2
3
4
5

CombineLatest

TBD

CombineLatestAll

TBD

CombineLatestMap

TBD

CombineLatestMapTo

TBD

CombineLatestWith

TBD

Concat

TBD

ConcatAll

TBD

ConcatMap

TBD

ConcatMapTo

TBD

ConcatWith

ConcatWith

TBD

Connect

TBD

Count

TBD

Create

TBD

CreateFutureRecursive

TBD

CreateRecursive

TBD

DebounceTime

TBD

Defer

TBD

Delay

TBD

Distinct

Distinct suppress duplicate items emitted by an Observable.

The operator filters an Observable by only allowing items through that have not already been emitted. In some implementations there are variants that allow you to adjust the criteria by which two items are considered “distinct.” In some, there is a variant of the operator that only compares an item against its immediate predecessor for distinctness, thereby filtering only consecutive duplicate items from the sequence.

Distinct

Code:

rx.From(1, 2, 2, 1, 3).Distinct().Println()

Output:

1
2
3

DistinctUntilChanged

DistinctUntilChanged only emits when the current value is different from the last.

The operator only compares emitted items from the source Observable against their immediate predecessors in order to determine whether or not they are distinct.

DistinctUntilChanged

Code:

rx.From(1, 2, 2, 1, 3).DistinctUntilChanged().Println()

Output:

1
2
1
3

Do

TBD

DoOnComplete

TBD

DoOnError

TBD

ElementAt

TBD

Empty

TBD

Filter

TBD

Finally

TBD

First

TBD

From

TBD

FromChan

TBD

IgnoreCompletion

TBD

IgnoreElements

TBD

Interval

TBD

Just

TBD

Last

TBD

Map

TBD

MapTo

TBD

Max

TBD

Merge

Merge

TBD

MergeAll

TBD

MergeDelayError

MergeDelayError

TBD

MergeDelayErrorWith

MergeDelayErrorWith

TBD

MergeMap

TBD

MergeMapTo

TBD

MergeWith

MergeWith

TBD

Min

TBD

Never

TBD

ObserveOn

TBD

Of

TBD

Only

TBD

Passthrough

TBD

Println

TBD

Publish

TBD

PublishBehavior

TBD

PublishLast

TBD

PublishReplay

TBD

Range

TBD

Reduce

TBD

RefCount

TBD

Repeat

TBD

ReplaySubject

TBD

Retry

TBD

SampleTime

TBD

Scan

TBD

Serialize

TBD

Single

TBD

Skip

TBD

SkipLast

TBD

Start

TBD

StartWith

StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.

StartWith

Code:

rx.From(2, 3).StartWith(1).Println()

Output:

1
2
3

Subject

TBD

Subscribe

TBD

SubscribeOn

SubscribeOn specifies which scheduler an Observable should use when it is subscribed to.

Example (SubscribeOn Trampoline)

Code:

trampoline := rx.MakeTrampolineScheduler()
observer := func(next interface{}, err error, done bool) {
	switch {
	case !done:
		fmt.Println(trampoline, "print", next)
	case err != nil:
		fmt.Println(trampoline, "print", err)
	default:
		fmt.Println(trampoline, "print", "complete")
	}
}
fmt.Println(trampoline, "SUBSCRIBING...")
subscription := rx.From(1, 2, 3).SubscribeOn(trampoline).Subscribe(observer)
fmt.Println(trampoline, "WAITING...")
subscription.Wait()
fmt.Println(trampoline, "DONE")

Output:

Trampoline{ tasks = 0 } SUBSCRIBING...
Trampoline{ tasks = 1 } WAITING...
Trampoline{ tasks = 1 } print 1
Trampoline{ tasks = 1 } print 2
Trampoline{ tasks = 1 } print 3
Trampoline{ tasks = 1 } print complete
Trampoline{ tasks = 0 } DONE

Example (SubscribeOn Goroutine)

Code:

const ms = time.Millisecond
goroutine := rx.GoroutineScheduler()
observer := func(next interface{}, err error, done bool) {
	switch {
	case !done:
		fmt.Println(goroutine, "print", next)
	case err != nil:
		fmt.Println(goroutine, "print", err)
	default:
		fmt.Println(goroutine, "print", "complete")
	}
}
fmt.Println(goroutine, "SUBSCRIBING...")
subscription := rx.From(1, 2, 3).Delay(10 * ms).SubscribeOn(goroutine).Subscribe(observer)
// Note that without a Delay the next Println lands at a random spot in the output.
fmt.Println("WAITING...")
subscription.Wait()
fmt.Println(goroutine, "DONE")

Output:

Output:
Goroutine{ tasks = 0 } SUBSCRIBING...
WAITING...
Goroutine{ tasks = 1 } print 1
Goroutine{ tasks = 1 } print 2
Goroutine{ tasks = 1 } print 3
Goroutine{ tasks = 1 } print complete
Goroutine{ tasks = 0 } DONE

Sum

TBD

SwitchAll

TBD

SwitchMap

TBD

Take

TBD

TakeLast

TBD

TakeUntil

TBD

TakeWhile

TBD

ThrottleTime

TBD

Throw

TBD

Ticker

TBD

TimeInterval

TBD

Timeout

TBD

Timer

TBD

Timestamp

TBD

ToChan

TBD

ToSingle

TBD

ToSlice

TBD

Wait

TBD

WithLatestFrom

WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice. The source observable determines the rate at which the values are emitted. The idea is that observables that are faster than the source, don't determine the rate at which the resulting observable emits. The observables that are combined with the source will be allowed to continue emitting but only will have their last emitted value emitted whenever the source emits.

Note that any values emitted by the source before all other observables have emitted will effectively be lost. The first emit will occur the first time the source emits after all other observables have emitted.

WithLatestFrom

Code:

a := rx.From(1,2,3,4,5)
b := rx.From("A","B","C","D","E")
a.WithLatestFrom(b).Println()

Output:

[2 A]
[3 B]
[4 C]
[5 D]

WithLatestFromAll

WithLatestFromAll flattens a higher order observable (e.g. ObservableObservable) by subscribing to all emitted observables (ie. Observable entries) until the source completes. It will then wait for all of the subscribed Observables to emit before emitting the first slice. The first observable that was emitted by the source will be used as the trigger observable. Whenever the trigger observable emits, a new slice will be emitted containing all the latest values.

Note that any values emitted by the source before all other observables have emitted will effectively be lost. The first emit will occur the first time the source emits after all other observables have emitted.

WithLatestFromAll

Code:

a := rx.From(1, 2, 3, 4, 5)
b := rx.From("A", "B", "C", "D", "E")
c := rx.FromObservable(a, b)
c.WithLatestFromAll().Println()

Output:

[2 A]
[3 B]
[4 C]
[5 D]