GNU General Public License v3.0 licensed. Source available on github.com/zifeo/EPFL.
Spring 2015: Reactive Programming & Parallelism
[TOC]
- sequential programming : one part at once
- parallel programming : multiple parts at once
- parallel is about performance
- infrastructure : libraries, hardware, frameworks for parallelism
- application : design parallel software
- concurrent programming is a general concept : structure program into concurrent tasks that communicate both among each other and with the external environment
- must ensure safe accesses to shared resources (locks)
- can be used to ensure responsiveness in interactive apps (even on a single CPU core)
- parallel programming : speed up computation through parallel hardware resources without solving all the difficulties of concurrency (use frameworks instead)
- get some part of the problem (isolation)
- return result
- avoid side effects (except for local performance improvements), so general rule : if one parallel task writes to a variable (or array entry), no other task may read or write this variable at the same time
- def parallel[A, B](taskA: => A, taskB: => B): (A, B) = { ... }
- running time can be maximum of time (instead of sum)
- if no parallel resources are available, executes tasks sequentially
- invoking this carries a significant constant overhead (do not do it if not needed)
- call by name
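A minimal sketch of how parallel composes into a divide-and-conquer sum; sumSegment reappears in the cost estimates below, and the threshold constant (with the arbitrary value 1000) anticipates the threshold note further down.

```scala
// Hypothetical cutoff below which splitting stops (see the threshold note below).
val threshold = 1000

// Sequential sum of a(s) until a(t - 1), as in the cost estimates below.
def sumSegment(a: Array[Int], s: Int, t: Int): Int = {
  var i = s; var sum = 0
  while (i < t) { sum += a(i); i += 1 }
  sum
}

// Split the segment in half, sum both halves in parallel, combine the results.
def parSum(a: Array[Int], s: Int, t: Int): Int =
  if (t - s < threshold) sumSegment(a, s, t) // small segment: sequential
  else {
    val mid = s + (t - s) / 2
    val (l, r) = parallel(parSum(a, s, mid), parSum(a, mid, t))
    l + r
  }
```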
- costs : $S(e)$ cost of evaluating expression $e$
- $S(sumSegment(a,p,s,t))=c_1(t-s)+c_2$
- $S(parallel(t_1,t_2))=c_P+max(S(t_1),S(t_2))$
- span : $S(e)$ number of steps if we had unbounded parallelism
- $S(parallel(e_1,e_2))=c_P+max(S(e_1),S(e_2))$
- work : $W(e)$ number of steps $e$ would take if there was no parallelism (sequential)
- $W(parallel(e_1,e_2))=c_Q+W(e_1)+W(e_2)$
- estimated running time on $K$ threads : $max(S(e), \frac{W(e)}{K})$
- if $K$ is constant but inputs grow, parallel programs have the same asymptotic time complexity as sequential ones
- even if we have infinite resources ($K\to\infty$), we have non-zero complexity given by $S(e)$
- Amdahl's law : speedup $=\frac{1}{1-f+\frac{f}{K}}$ for $K$ threads and $f$ the fraction of the program that is parallelized
- ScalaMeter
val time = measure { (0 until 1000000).toArray }
- need warmup : withWarmer(new Warmer.Default) measure { ... } (see the sketch below)
- first the program is interpreted
- then, parts of the program are compiled into machine code
- JVM may apply additional dynamic optimizations during the run
- eventually, the program reaches steady-state performance
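Putting the fragments above together, a minimal ScalaMeter sketch with a default warmer (the benchRuns count is an arbitrary choice):

```scala
import org.scalameter._

// Measure steady-state performance: the warmer re-runs the body
// until the JIT has stabilized, then benchRuns measurements are taken.
val time = config(
  Key.exec.benchRuns -> 20
) withWarmer {
  new Warmer.Default
} measure {
  (0 until 1000000).toArray
}
println(s"array initialization time: $time")
```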
- running config
- Default (plain running time)
- IgnoringGC
- OutlierElimination
- MemoryFootprint
- GarbageCollectionCycles
- memory bandwidth : memory bound if the computation is simple (the cache does not help for a parallel sum)
- def task(c: => A): ForkJoinTask[A]
with trait ForkJoinTask[A] { def join: A }
- list traversal (map) is not efficiently parallelizable (a list cannot be split in constant time)
- alternative
- array : be careful on accesses, good memory locality, hard to construct from pieces
- immutable trees : functional, high memory allocation overhead, bad locality
- mid computation : avoid overflow with val mid = left + (right - left)/2
- need a threshold to avoid too much splitting at small scale
- inlining the higher-order function helps, but only marginally
- similar performance with trees
- parallel reduce
- reduction order is nondeterministic, so an associative operation f : (A, A) => A is required (neither a left nor a right fold)
- associativity iff f(x, f(y, z)) = f(f(x, y), z) for all x, y and z (prove by induction)
- important associative operations that happen to be also commutative
- addition, multiplication modulo a positive integer
- addition, multiplication of BigInt's
- union, intersection, symmetric difference of sets
- boolean operations
- all complements of associative operations
- floating point is not associative
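A quick demonstration of the non-associativity:

```scala
val x = 1e20
val y = -1e20
val z = 1.0
println((x + y) + z) // 1.0 : x + y cancels exactly, then + 1.0
println(x + (y + z)) // 0.0 : z is absorbed by y due to limited precision
```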
- parallel scan
- upsweep : compute a tree of partial reductions in parallel
- downsweep : compute the scan result in parallel (recursively, passing down the prefix computed so far; see the sketch after this block)
- combining and fusing operations
- combine two maps into one
- fusing basic operations in the sequential part (under the threshold)
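A minimal sketch of both phases for an Int array, reusing the hypothetical parallel and threshold from the sum sketch above; out needs one extra slot, and the caller is expected to write the initial element into out(0).

```scala
// Intermediate tree of partial reductions produced by the upsweep.
sealed abstract class TreeRes { val res: Int }
case class LeafRes(from: Int, to: Int, override val res: Int) extends TreeRes
case class NodeRes(l: TreeRes, r: TreeRes, override val res: Int) extends TreeRes

// Upsweep: reduce segments in parallel, keeping every intermediate result.
def upsweep(in: Array[Int], from: Int, to: Int, f: (Int, Int) => Int): TreeRes =
  if (to - from < threshold) LeafRes(from, to, in.slice(from, to).reduce(f))
  else {
    val mid = from + (to - from) / 2
    val (l, r) = parallel(upsweep(in, from, mid, f), upsweep(in, mid, to, f))
    NodeRes(l, r, f(l.res, r.res))
  }

// Downsweep: push the prefix computed so far (a0) down the tree, in parallel.
def downsweep(in: Array[Int], a0: Int, f: (Int, Int) => Int,
              t: TreeRes, out: Array[Int]): Unit = t match {
  case LeafRes(from, to, _) =>
    var a = a0; var i = from
    while (i < to) { a = f(a, in(i)); out(i + 1) = a; i += 1 }
  case NodeRes(l, r, _) =>
    parallel(downsweep(in, a0, f, l, out),
             downsweep(in, f(a0, l.res), f, r, out))
}
```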
- task-parallel programming : a form of parallelization that distributes execution processes across computing nodes
- data-parallel programming : a form of parallelization that distributes data across computing nodes
- must also follow the rule : check the usage of shared memory
- parallel collection : for (idx <- (0 until image.length).par) { ... }
- about 2x faster
- workload : function that maps each input element to the amount of work required to process it
- data-parallel scheduler: efficiently balance the workload across processors without any knowledge about the w(i)
- deterministic as long as used functionally
- not all operations execute in parallel (foldRight, reduceLeft, scanRight, etc.)
- need for associativity : we say that the neutral element z and the binary operator f must form a monoid
f(a, f(b, c)) == f(f(a, b), c)
f(z, a) == f(a, z) == a
- commutativity is not needed for fold
- fold can only produce values of the same type as the collection's elements
- aggregate : def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B (can change the return type; see the sketch below)
- these were accessor combinators : return a single value
- transformer combinators : map, filter, flatMap, groupBy return new collections
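For instance, counting vowels in parallel with aggregate (a sketch; isVowel is a local helper defined here, not library API):

```scala
def isVowel(c: Char): Boolean = "aeiouAEIOU".contains(c)

val chars: Array[Char] = "parallel programming".toArray
val vowels = chars.par.aggregate(0)(
  (count, c) => if (isVowel(c)) count + 1 else count, // sequential fold inside a chunk
  _ + _                                               // combine per-chunk counts
)
// vowels == 6 (a, a, e, o, a, i)
```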
- Scala collections hierarchy
- Traversable[T] – collection of elements with type T, with operations implemented using foreach
- Iterable[T] – collection of elements with type T, with operations implemented using iterator
- Seq[T] – an ordered sequence of elements with type T
- Set[T] – a set of elements with type T (no duplicates)
- Map[K, V] – a map of keys with type K associated with values of type V (no duplicate keys)
- parallel equivalents : ParIterable[T], etc.
- agnostic collections regroup both : GenIterable[T], etc.
- parallelizable collections :
- ParArray[T] – parallel array of objects, counterpart of Array and ArrayBuffer
- ParRange – parallel range of integers, counterpart of Range
- ParVector[T] – parallel vector, counterpart of Vector
- mutable.ParHashSet[T] – counterpart of mutable.HashSet
- mutable.ParHashMap[K, V] – counterpart of mutable.HashMap
- immutable.ParHashSet[T] – counterpart of immutable.HashSet
- immutable.ParHashMap[K, V] – counterpart of immutable.HashMap
- ParTrieMap[K, V] – thread-safe parallel map with atomic snapshots, counterpart of TrieMap
- for other collections, par creates the most similar parallel collection – e.g. a List is converted to a ParVector
- 2 rules
- avoid side effects without proper synchronization
- never modify a parallel collection on which a data-parallel operation is in progress
- exceptions : TrieMap allows traversing and modifying at the same time (by snapshot)
- 4 abstractions :
- iterators : trait Iterator[A] { def next(): A; def hasNext: Boolean }
- splitters : trait Splitter[A] extends Iterator[A] { def split: Seq[Splitter[A]]; def remaining: Int }
- after call to split, the original splitter is in an undefined state
- disjoint
- remaining is an estimate
- split is efficient O(log n) or better
- builders : trait Builder[A, Repr] { def +=(elem: A): Builder[A, Repr]; def result: Repr }
- calling result leaves the Builder in an undefined state
- combiners : trait Combiner[A, Repr] extends Builder[A, Repr] { def combine(that: Combiner[A, Repr]): Combiner[A, Repr] }
- combine returns a new combiner that contains elements of input combiners
- combine leaves both original Combiners in an undefined state
- combine is efficient O(P log (n)) or better (P number of processors)
- set data structures
- lookup, insertion, deletion : hash tables O(1)
- combine represents union
- sequence data structures
- mutable linked lists : prepend, append O(1), insertion O(n)
- functional lists : prepend O(1), everything else O(n)
- array lists : append, random access O(1), everything else O(n)
- mutable linked list can have O(1) concatenation
- combine represents concatenation
- two-phase construction : when there is no efficient concatenation, use an intermediate data structure defined with
- an efficient combine method
- an efficient += method
- a result method at most O(n) but parallelizable
- Array combiner :
class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
  private var numElems = 0
  private val buffers = new ArrayBuffer[ArrayBuffer[T]]
  buffers += new ArrayBuffer[T]
  // += appends to the last nested buffer; combine concatenates the buffer lists
}
- compute the final multilevel array at the end (the copying can itself be done in parallel)
- can be used as : xs.par.aggregate(newCombiner)(_ += _, _ combine _).result
- conc-trees
- lists are built for sequential computations (left to right)
- trees allow parallel computation, but nothing guarantees they stay balanced...
- let's use conc-trees :
sealed trait Conc[+T] { def level: Int; def size: Int; def left: Conc[T]; def right: Conc[T] }
- invariants
- a <> node can never contain Empty as a subtree
- the level difference between the left and the right subtree of a <> node is always 1 or less (only an indication)
- concatenation in O(|h1 - h2|) (h is the height)
- the two trees have a height difference of one or less : link both directly
- the left tree is higher than the right one and left-leaning : recursively concatenate its right subtree with the right tree
- the left tree is higher and right-leaning : recursively concatenate its right subtree's right child with the right tree, then relink the four resulting parts according to their levels (see the sketch below)
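A hedged sketch of that concatenation, with minimal stand-ins for the node types (following the conc-tree algorithm from the course material; only the left-higher case is commented, the other branch is its mirror image):

```scala
sealed trait Conc[+T] {
  def level: Int; def size: Int
  def left: Conc[T]; def right: Conc[T]
}
case object Empty extends Conc[Nothing] {
  def level = 0; def size = 0
  def left = sys.error("Empty has no subtrees")
  def right = sys.error("Empty has no subtrees")
}
case class Single[T](x: T) extends Conc[T] {
  def level = 0; def size = 1
  def left = sys.error("Single has no subtrees")
  def right = sys.error("Single has no subtrees")
}
case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}

// Rebalancing concatenation in O(|xs.level - ys.level|).
def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
  val diff = ys.level - xs.level
  if (diff >= -1 && diff <= 1) <>(xs, ys)  // heights close enough: link directly
  else if (diff < -1) {                    // left tree is higher
    if (xs.left.level >= xs.right.level) { // left-leaning: push ys into the right subtree
      val nr = concat(xs.right, ys)
      <>(xs.left, nr)
    } else {                               // right-leaning: relink the four parts by level
      val nrr = concat(xs.right.right, ys)
      if (nrr.level == xs.level - 3) <>(xs.left, <>(xs.right.left, nrr))
      else <>(<>(xs.left, xs.right.left), nrr)
    }
  } else {                                 // mirror image: right tree is higher
    if (ys.right.level >= ys.left.level) {
      val nl = concat(xs, ys.left)
      <>(nl, ys.right)
    } else {
      val nll = concat(xs, ys.left.left)
      if (nll.level == ys.level - 3) <>(<>(nll, ys.left.right), ys.right)
      else <>(nll, <>(ys.left.right, ys.right))
    }
  }
}
```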
- data-parallelism in distributed setting
- spark abstraction (Apache)
- distribution introduces important concerns
- partial failure in a distributed computation
- latency due to network communication
- latency numbers
L1 cache reference ......................... 0.5 ns
Branch mispredict ............................ 5 ns
L2 cache reference ........................... 7 ns
Mutex lock/unlock ........................... 25 ns
Main memory reference ...................... 100 ns
Compress 1K bytes with Zippy ............. 3,000 ns = 3 µs
Send 2K bytes over 1 Gbps network ....... 20,000 ns = 20 µs
SSD random read ........................ 150,000 ns = 150 µs
Read 1 MB sequentially from memory ..... 250,000 ns = 250 µs
Round trip within same datacenter ...... 500,000 ns = 0.5 ms
Read 1 MB sequentially from SSD* ..... 1,000,000 ns = 1 ms
Disk seek ........................... 10,000,000 ns = 10 ms
Read 1 MB sequentially from disk .... 20,000,000 ns = 20 ms
Send packet CA->Netherlands->CA .... 150,000,000 ns = 150 ms
- humanized : multiply all these durations by a billion
L1 cache reference 0.5 s One heart beat (0.5 s)
Branch mispredict 5 s Yawn
L2 cache reference 7 s Long yawn
Mutex lock/unlock 25 s Making a coffee
Main memory reference 100 s Brushing your teeth
Compress 1K bytes with Zippy 50 min One episode of a TV show (including ad breaks)
Send 2K bytes over 1 Gbps network 5.5 hr From lunch to end of work day
SSD random read 1.7 days A normal weekend
Read 1 MB sequentially from memory 2.9 days A long weekend
Round trip within same datacenter 5.8 days A medium vacation
Read 1 MB sequentially from SSD 11.6 days Waiting for almost 2 weeks for a delivery
Disk seek 16.5 weeks A semester in university
Read 1 MB sequentially from disk 7.8 months Almost producing a new human being
The above 2 together 1 year
Send packet CA->Netherlands->CA 4.8 years Average time it takes to complete a bachelor's degree
- distributed data-parallel
- Shared memory case : Data partitioned in memory and operated upon in parallel
- Distributed case : Data partitioned between machines, network in between, operated upon in parallel
- RDDs : Resilient Distributed Datasets
- like an immutable sequential or parallel scala collection
- combinators : map, flatMap, filter, reduce, fold, aggregate but return RDD instead
- creation :
spark.textFile("hdfs://...") or sc.parallelize(largeList)
- transformers (new collection as result) are transformations : lazy RDD not immediately computed
- accessors (value as result) are actions : eager result immediately computed
- important RDD transformations
- sample : sample a fraction of the data
- union (duplicates remain)
- intersection (duplicates removed)
- distinct
- coalesce : decrease the number of partitions in the RDD to the given number (typically after filtering)
- repartition : reshuffle the data randomly
- important RDD actions
- collect : return all elements as an array
- count
- foreach : for side effects
- saveAsTextFile : write elements of the dataset as a text file
- in the cluster
- transformations are deferred until we add an action : no intermediate RDDs (the chain of operations is optimized)
- coordination done by the driver program of Spark
- recomputed each time an action is called : retain the intermediate collection by calling .persist()
- cached in memory or in disk
- memory_only vs memory_only_ser (serialized data) vs memory_and_disk vs memory_and_disk_ser vs disk_only
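A hedged word-count sketch tying these pieces together (assumes a SparkContext named sc; the paths are placeholders):

```scala
val lines  = sc.textFile("hdfs://...")  // creation: nothing is read yet
val counts = lines
  .flatMap(_.split(" "))                // transformations: lazy, nothing materialized
  .map(word => (word, 1))
  .reduceByKey(_ + _)                   // reduces locally per partition before shuffling
counts.persist()                        // keep the result around for several actions
counts.saveAsTextFile("hdfs://...")     // action: triggers the whole computation
```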
- pair RDD
- groupByKey
- reduceByKey
- join (inner) : combine pairs whose keys are present in both inputs
- mapValues
- leftOuterJoin/rightOuterJoin : combine pairs whose key is present in either input
- countByKey (action)
- careful with the shuffle (latency)
- creation : simply map to key-value pairs
- no foldLeft : a sequential left-to-right fold makes no sense when distributed
- shuffling : reduceByKey is better than groupByKey as it reduces locally first
- when it happens (rule of thumb) : can occur when the resulting RDD depends on other elements from the same RDD or another RDD
- can be reduced by join or smart partitioning
- use the method toDebugString to check whether it was shuffled (ShuffledRDD)
- partitioning
- partitions never span multiple machines
- one machine can hold multiple partitions (by default the total number of cores on executor nodes)
- hash partitioning (spread evenly)
- range partitioning (better for specific data locality)
val partitioned = pairs.partitionBy(new RangePartitioner(8, pairs)).persist()
- arguments : the desired number of partitions and a pair RDD with ordered keys, used to sample a suitable set of sorted ranges
- inherited from the parent RDD
- automatically-set partitioners (e.g. sortByKey gives a RangePartitioner and groupByKey a HashPartitioner)
- partitioner propagation : cogroup, groupWith, joins, byKeys, sort, mapValues (if the parent has one) and filter (if the parent has one)
- closures and capturing
- serialization can be bad or impossible (requires all captured variables to be serializable)
- closures can be too large (always pass local variables, not class-wide ones)
- think of how the JVM rewrites a field access into this.myvar, capturing the whole object
- shared variables
- broadcast variables : allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
- creation : sc.broadcast(team)
- usage : broadcastTeam.value.keys
- accumulators : variables that are only "added" to through an associative operation and can therefore be efficiently supported across nodes in parallel
- use for counters
- only numeric (by default)
- creation : val badRecords = sc.accumulator(0)
- usage : badRecords += 1
- result : badRecords.value
- in actions : each task's update is applied to each accumulator only once
- in transformations : an update can be applied more than once (should only be used for debugging)
- reactive applications : readily responsive to a stimulus
- event-driven : react to events
- scalable : react to load
- resilient : react to failures (overcome software, hardware and network failures; be loosely coupled, state encapsulated)
- responsive : react to users (rich, real-time interaction, careful on system design)
- traditionally : multiple threads which communicate via synchronized shared state (hard to compose)
- nowadays : loosely coupled event handlers (asynchronously, without blocking)
- scalability : ability to be expanded according to its usage (important to minimize shared mutable state)
- scale up : make use of parallelism
- scale out : make use of multiple servers (must be location transparent and resilient)
- call-backs : problem with shared mutable state, no composition
- composition : fundamental constructions of functional programming
- events are first class
- events are often represented as messages
- handlers of events are also first-class
- complex handler can be composed from primitive ones
- partial matches : val f: String => String = { case "ping" => "pong" }
- MatchError on f("pong")
- trait :
trait PartialFunction[-A, +R] extends Function1[A, R] {
def apply(x: A): R
def isDefinedAt(x: A): Boolean
}
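The block syntax above is sugar for an anonymous instance of this trait, roughly:

```scala
// What the compiler generates for { case "ping" => "pong" }, approximately.
val f = new PartialFunction[String, String] {
  def apply(x: String): String = x match { case "ping" => "pong" }
  def isDefinedAt(x: String): Boolean = x match {
    case "ping" => true
    case _      => false
  }
}
f.isDefinedAt("pong") // false, but f("pong") still throws a MatchError
```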
- for-expression translations
- for (x <- e1) yield e2 : e1.map(x => e2)
- for (x <- e1 if f; s) yield e2 : for (x <- e1.withFilter(x => f); s) yield e2 (potentially empty)
- for (x <- e1; y <- e2; s) yield e3 : e1.flatMap(x => for (y <- e2; s) yield e3)
- pat <- expr : x <- expr withFilter { case pat => true; case _ => false } map { case pat => x }
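A small check of the first and third rules:

```scala
val pairs = for (x <- List(1, 2); y <- List("a", "b")) yield (x, y)
val same  = List(1, 2).flatMap(x => List("a", "b").map(y => (x, y)))
assert(pairs == same) // List((1,a), (1,b), (2,a), (2,b))
```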
- random generator
trait Generator[+T] {
self => // an alias for "this"
def generate: T
def map[S](f: T => S): Generator[S] = new Generator[S] {
def generate = f(self.generate)
}
def flatMap[S](f: T => Generator[S]): Generator[S] = new Generator[S] {
def generate = f(self.generate).generate
}
}
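Concrete generators then compose via map and flatMap (so for-expressions work), e.g.:

```scala
// Basic integer generator backed by java.util.Random.
val integers = new Generator[Int] {
  val rand = new java.util.Random
  def generate = rand.nextInt()
}
val booleans = integers.map(_ >= 0)
// Pairs via flatMap, written as a for-expression.
def pairs[T, U](t: Generator[T], u: Generator[U]) =
  for (x <- t; y <- u) yield (x, y)
```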
- scalacheck : forAll { (l1: List[Int], l2: List[Int]) => l1.size + l2.size == (l1 ++ l2).size }
- monads (bind) : a parametric type M[T] with the following rules
- unit operation : def unit[T](x: T): M[T]
- flatMap : trait M[T] { def flatMap[U](f: T => M[U]): M[U] }
- List, Set, Option, Generator
- map is flatMap with unit : m map f == m flatMap (x => unit(f(x))) == m flatMap (f andThen unit)
- associativity : m flatMap f flatMap g == m flatMap (x => f(x) flatMap g)
- left unit : unit(x) flatMap f == f(x)
- right unit : m flatMap unit == m
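The laws can be checked by hand on a concrete monad such as Option (where unit is Some):

```scala
def f(x: Int): Option[Int] = if (x > 0) Some(x) else None
def g(x: Int): Option[Int] = Some(x * 2)
val m: Option[Int] = Some(3)

assert(m.flatMap(f).flatMap(g) == m.flatMap(x => f(x).flatMap(g))) // associativity
assert(Some(3).flatMap(f) == f(3))                                 // left unit
assert(m.flatMap(Some(_)) == m)                                    // right unit
```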
- Try[+T] : pass computations that can fail between threads or computers
- Success[T](x: T) extends Try[T]
- Failure(ex: Exception) extends Try[Nothing]
- the left unit law fails but ...
- bullet-proof principle : an expression composed from Try, map, flatMap will never throw a non-fatal exception
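For example, a failing step is captured as a Failure value instead of an exception:

```scala
import scala.util.{Try, Success, Failure}

val result = Try("123".toInt).map(_ * 2).flatMap(n => Try(n / 0))
result match {
  case Success(v) => println(v)
  case Failure(e) => println(s"recovered: $e") // ArithmeticException, not thrown
}
```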
- statefulness : has a state because it depends on history
- referential transparency : whether two vars are the same by operational equivalence (no possible test can distinguish between them)
- observer pattern : publish/subscribe
trait Publisher {
private var subscribers: Set[Subscriber] = Set()
def subscribe(subscriber: Subscriber): Unit =
subscribers += subscriber
def unsubscribe(subscriber: Subscriber): Unit =
subscribers -= subscriber
def publish(): Unit =
subscribers.foreach(_.handler(this))
}
trait Subscriber {
def handler(pub: Publisher)
}
- forces an imperative style since handlers return Unit
- moving parts need to be coordinated
- concurrency makes things more complicated
- 1/3 of the code in Adobe applications is devoted to event handling and it contains 1/2 of the bugs
- reacting to sequences of events that happen in time
- Signal[T] : changes value over time (instead of propagating updates, define new signals in terms of existing ones)
- obtain value : call with ()
- constructor : Signal { val pos = mousePosition(); LL <= pos && pos <= UR } (immutable)
- variable signal : Var(3)
- update : sig.update(5) or sig() = 5
- avoid cyclic definitions by pulling out the value to be updated
- stackable variables
class StackableVariable[T](init: T) {
private var values: List[T] = List(init)
def value: T = values.head
def withValue[R](newValue: T)(op: => R): R = {
values = newValue :: values
try op finally values = values.tail
}
}
- signal
class Signal[T](expr: => T) {
import Signal._
private var myExpr: () => T = _
private var myValue: T = _
private var observers: Set[Signal[_]] = Set()
update(expr)
protected def update(expr: => T): Unit = {
myExpr = () => expr
computeValue()
}
protected def computeValue(): Unit = {
val newValue = caller.withValue(this)(myExpr())
if (myValue != newValue) {
myValue = newValue
val obs = observers
observers = Set()
obs.foreach(_.computeValue())
}
}
def apply() = {
observers += caller.value
assert(!caller.value.observers.contains(this), "cyclic signal definition")
myValue
}
}
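A hedged usage sketch, assuming the Signal companion object (holding the caller StackableVariable) and a Var subclass exposing update, both from the course and not shown here:

```scala
val a = Var(1)
val b = Var(2)
val sum = Signal(a() + b()) // sum registers itself as an observer of a and b

println(sum()) // 3
a() = 10       // a's observers (here sum) recompute
println(sum()) // 12
```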
- thread-local state (each thread having its own copy) : DynamicVariable (but can lead to performance problems and messy situations)
- best would be having an implicit parameters to pass its current value
- continuous signals : done by sampling instead of propagation
- synchronous
- one : T/Try[T]
- many : Iterable[T]
- asynchronous
- one : Future[T]
- many : Observable[T]
- synchronous <=> asynchronous : duality
- one <=> many : cardinality
- honest return type : "monad" Try
def map[S](f: T => S): Try[S] = this match {
  case Success(value) => Try(f(value))
  case failure @ Failure(t) => failure
}
- Duration
import scala.language.postfixOps
object Duration {
def apply(length: Long, unit: TimeUnit): Duration
}
val fiveYears = 1826 days
- type that handle latency : "monad" Future
- need context :
import scala.concurrent.ExecutionContext.Implicits.global
- callback :
def onComplete(callback: Try[T] => Unit)(implicit executor: ExecutionContext): Unit
- need to use pattern matching in the callback : success or failure
- recover :
def recover(f: PartialFunction[Throwable,T]): Future[T]
- recoverWith :
def recoverWith(f: PartialFunction[Throwable,Future[T]]): Future[T]
- fallbackTo :
def fallbackTo(that: =>Future[T]): Future[T]
- retry :
def retry(noTimes: Int)(block: =>Future[T]): Future[T]
- await : val c = Await.result(confirmation, 2 seconds); println(c.toText)
- need context :
trait Awaitable[T] extends AnyRef {
abstract def ready(atMost: Duration): Unit
abstract def result(atMost: Duration): T
}
trait Future[T] extends Awaitable[T] {
def filter(p: T=>Boolean): Future[T]
def flatMap[S](f: T=>Future[S]): Future[S]
def map[S](f: T=>S): Future[S]
def recoverWith(f: PartialFunction[Throwable, Future[T]]): Future[T]
}
object Future {
def apply[T](body : =>T): Future[T]
}
def sequence[T](fts: List[Future[T]]): Future[List[T]] = {
fts match {
case Nil => Future(Nil)
case (ft::fts) => ft.flatMap(t => sequence(fts).flatMap(ts => Future(t::ts)))
}
}
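A usage sketch of sequence (blocking with Await only for demonstration):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val all: Future[List[Int]] = sequence(List(Future(1), Future(2), Future(3)))
println(Await.result(all, 2.seconds)) // List(1, 2, 3)
```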
- async await magic
- async :
def async[T](body: =>T)(implicit context: ExecutionContext) :Future[T]
- await :
def await[T](future: Future[T]): T
- always : async { ... await { ... } ... }
- illegal use
- await needs a directly-enclosing async : no await inside a closure nested within an async block
- no await inside a by-name parameter
- no await inside Boolean short-circuit argument
- no return expression inside async
- no await under try/catch
- implementations using async :
def flatMap[S](f: T ⇒ Future[S]): Future[S] = async {
val x: T = await { this }
await { f(x) }
}
def retry(noTimes: Int)(block: ⇒Future[T]): Future[T] = async {
var i = 0
var result: Try[T] = Failure(new Exception("..."))
while (result.isFailure && i < noTimes) {
result = await { Try(block) }
i += 1
}
result.get
}
def filter(p: T ⇒ Boolean): Future[T] = async {
val x = await { this }
if (!p(x)) {
throw new NoSuchElementException()
} else {
x
}
}
- promise
trait Promise[T] {
def future: Future[T]
def complete(result: Try[T]): Unit
def tryComplete(result: Try[T]): Boolean
def success(value: T): Unit = this.complete(Success(value))
def failure(t: Throwable): Unit = this.complete(Failure(t))
}
def filter(pred: T ⇒ Boolean): Future[T] = {
val p = Promise[T]()
this onComplete {
case Failure(e) ⇒
p.failure(e)
case Success(x) ⇒
if (!pred(x)) p.failure(new NoSuchElementException)
else p.success(x)
}
p.future
}
- duality : Try <=> Future by simplifying types ((Try[T] => Unit) => Unit to () => (() => Try[T]))
- Iterable : "monad"
trait Iterable[T] {
def iterator(): Iterator[T]
}
trait Iterator[T] {
def hasNext: Boolean
def next(): T
}
- Observables : "monad" obtained by reversing the duality (() => (() => Try[Option[T]]) to (Try[Option[T]] => Unit) => Unit)
trait Observable[T] {
def subscribe(observer: Observer[T]): Subscription
}
trait Observer[T] {
def onNext(value: T): Unit
def onError(error: Throwable): Unit
def onCompleted(): Unit
}
trait Subscription {
def unsubscribe(): Unit
def isUnsubscribed: Boolean
}
- examples
val ticks: Observable[Long] = Observable.interval(1 seconds)
val evens: Observable[Long] = ticks.filter(_%2==0)
val bufs: Observable[Seq[Long]] = evens.slidingBuffer(2,1)
val s = bufs.subscribe(println(_))
s.unsubscribe()
- RX operators : marble diagrams
- map
- flatMap : the inner streams produced for different elements can overlap in time
- flatten : respects time
- merge : if one stream fails, everything stops
- concat : flatten but respecting order
- groupBy : splits into separate streams
def flatMap(f: T=>Observable[S]): Observable[S] = { map(f).merge }
- cold observable : emits only when subscribed (at its own convenience)
- each subscriber has its own private source
- subscription causes side effect
- hot observable : emits from the moment it is created
- same source shared by all subscribers
- no side effect
- subscriptions
- CompositeSubscription : collection of subscriptions
- MultipleAssignmentSubscription : swaps the underlying subscription
- SerialSubscription : the old subscription is unsubscribed when swapped
- idempotent : has same effect if called once or multiple times
trait Subscription {
def unsubscribe(): Unit
def isUnsubscribed: Boolean
}
object Subscription {
def apply(unsubscribe: =>Unit): Subscription
}
- RX streams
- error
- startWith
- filter
- map
- reduce
- create { onNext(); onNext(); onCompleted() } : def create[T](s: Observer[T] => Subscription): Observable[T]
object Observable {
def create[T](subscribe: Observer[T] ⇒ Subscription): Observable[T]
def apply[T](f: Subscriber[T] => Unit): Observable[T]
}
def never(): Observable[Nothing] =
Observable[Nothing](subscriber=> {})
def empty(): Observable[Nothing] =
Observable[Nothing](subscriber=> {
subscriber.onCompleted()
})
def +:[U >: T](elem: U): Observable[U] = {
Observable.create[U](observer ⇒ {
observer.onNext(elem); this.subscribe(observer)
})
}
def filter(p: T ⇒ Boolean): Observable[T] = {
Observable.create[T](observer ⇒ {
this.subscribe(
(t: T) => if(p(t)) observer.onNext(t),
(e: Throwable) => observer.onError(e),
() => observer.onCompleted()
)
})
}
def map[S](f: T=>S): Observable[S] = {
Observable.create[S](observer ⇒ {
subscribe (
(t: T) => observer.onNext(f(t)),
(e: Throwable) => observer.onError(e),
() => observer.onCompleted()
)
})
}
- Subject[T] : the Promise equivalent for many values (stops if error)
- can be subscribed/unsubscribed at will
- PublishSubject : emits current values only (no caching)
- ReplaySubject : repeats and caches all values
- AsyncSubject : caches the final value
- BehaviorSubject : caches the latest value
object Observable {
def apply[T](f: Future[T]): Observable[T] = {
val subject = AsyncSubject[T]()
f.onComplete {
case Failure(e) ⇒ { subject.onError(e) }
case Success(t) ⇒ {
subject.onNext(t)
subject.onCompleted()
}
}
subject
}
}
- observable notification : case class enclosing message (materialize method)
- Scheduler
trait ExecutionContext {
def execute(runnable: Runnable): Unit
}
trait Scheduler {
def now: Long
def createWorker: Worker
}
trait Worker extends Subscription {
def schedule (action: => Unit): Subscription
def schedule(delay: Duration)(action: => Unit): Subscription
def schedulePeriodically(initialDelay: Duration, period: Duration)(action: => Unit): Subscription
def scheduleRec(action: => Unit): Subscription
def now: Long
}
- observable contract
- auto-unsubscribe : when onCompleted or onError
- scala lock : obj.synchronized { ... }
- blocks
- introduces deadlock risk
- actor model : represents objects and their interactions, built upon the laws of physics
- an actor
- is an object with identity
- has a behavior
- only interacts using asynchronous message passing
- send message
- create actors
- designate the behavior for the next message
type Receive = PartialFunction[Any, Unit]
trait Actor {
implicit val self: ActorRef
implicit val context: ActorContext
def receive: Receive
}
trait ActorContext {
def become(behavior: Receive, discardOld: Boolean = true): Unit
def unbecome(): Unit
def stop(a: ActorRef): Unit
def actorOf(p: Props, name: String): ActorRef
}
abstract class ActorRef {
def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit
def tell ...
}
- new actor : context.actorOf(Props[Counter], "counter")
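A hedged sketch of the classic Counter actor from the course, using become to designate the behavior for the next message:

```scala
import akka.actor.Actor

class Counter extends Actor {
  // Each behavior closes over the current count; become swaps in the next one.
  def counter(n: Int): Receive = {
    case "incr" => context.become(counter(n + 1)) // next message sees n + 1
    case "get"  => sender() ! n                   // reply to whoever asked
  }
  def receive = counter(0) // initial behavior
}
```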
- message processing semantics
- local execution, no notion of global synchronization
- fully concurrent
- actor itself is single-threaded
- processing one message is the atomic unit
- enqueues message
- delivery guarantees : unreliable
- at-most-once : sending once delivers 0 or 1 time
- at-least-once : resending until acknowledged delivers 1 or more times
- exactly-once : processing only the first reception delivers exactly 1 time
- message ordering between a given sender-receiver pair is preserved (no reordering)
- reliable messaging
- all messages can be persisted
- can include unique ID
- retry delivery
- an actor's messages (and its Props) should be defined in its companion object
- logging : extend ActorLogging and use log.debug()
- data structures should be immutable, kept in var fields
- receive timeout : context.setReceiveTimeout(10 seconds)
- scheduler : used to send messages to itself (anything else would break encapsulation)
- scheduleOnce : delay, target
- testing actors : expectMsg("happy")
- failure : containment and automatic response needed
- terminating or restarting : Restart, Stop, Escalate
- decision taken by supervisor (the creator)
- supervisorStrategy
- OneForOneStrategy : partial function mapping the error to a reaction, applied to the failing child only
- AllForOneStrategy : applies the decision to all children; allows a finite number of restarts (within a time window)
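A hedged Akka sketch of declaring such a strategy in a supervising actor:

```scala
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Escalate}
import scala.concurrent.duration._

class Supervisor extends Actor {
  // At most 10 restarts per minute for the failing child; anything else escalates.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException => Restart
      case _: Exception           => Escalate
    }
  def receive = {
    case p: Props => sender() ! context.actorOf(p) // create supervised children
  }
}
```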
- events
- preStart
- preRestart
- postRestart
- postStop
- DeathWatch : once an actor has stopped, no message will come from it anymore
- lifecycle monitoring : def watch(target: ActorRef): ActorRef and may receive Terminated(target) (existenceConfirmed = false if not sure whether it ever existed) until unwatch
- eventstream
- subscribe(sub, topic)
- unsubscribe(sub)
- publish(event)
- persistence : ability to recover the state at (re)start
- in-place updates : recovery in constant time, storage depends on the volume of records
- persist changes in an append-only fashion : history can be replayed, immutable
- can be async or sync (no confirmation until the change is persisted) : tradeoff between throughput and consistency
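A hedged sketch of the append-only style using akka-persistence (PersistentActor API; the persistenceId, event type and actor are illustrative assumptions, not from the notes):

```scala
import akka.persistence.PersistentActor

case class Deposited(amount: Int) // persisted event, assumed immutable

class Account extends PersistentActor {
  override def persistenceId = "account-1" // hypothetical stable id
  var balance = 0

  // Replayed events rebuild the state at (re)start.
  def receiveRecover: Receive = {
    case Deposited(a) => balance += a
  }

  // Append the event first; the handler updates state once it is persisted.
  def receiveCommand: Receive = {
    case Deposited(a) => persist(Deposited(a)) { e => balance += e.amount }
  }
}
```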