Skip to content
JeanHuguesRobert edited this page Feb 15, 2013 · 2 revisions

Simple example, explained

Two steps. Hypothetical synchronous version if functions could block:

  function fetch( a ){
    meth1( a)
    return meth2( a)
  }

Idem but actual javascript using callback style:

  function fetch( a, cb ){
    meth1( a, function( error, result ){
      if( error ) return cb( error);
      meth2( a, function( error, result ){
        cb( error, result);
      }
    }
  }

Idem but using l8, extra long version:

  function fetch_this_and_that( a, callback ){
  return l8.begin
    .step( function(){
      meth1( a, this.next ) })
    .step( function( err, result ){
      if( err ) throw err else meth2( a, this.next }) })
    .step( function( err, result ){
      if( err ) throw err else return result })
    .final( function( err, result ){ callback( err, result) })
  .end}

CoffeeScript, much shorter, also thanks to Task() functor:

  fetch = l8.Task (a,cb) ->
    @step        -> meth1 a, @walk
    @step  (e,r) -> if e then throw e else meth2 a, @walk
    @step  (e,r) -> if e then throw e else r
    @final (e,r) -> cb e, r

Idem but returning a promise instead of using a callback:

  fetch = l8.Task (a) ->
    @step       -> meth1 a, @walk
    @step (e,r) -> if e then throw e else meth2 a, @walk
    @step (e,r) -> if e then throw e else r

Idem but assuming meth1 and meth2 make tasks returning promises too:

  fetch = l8.Task (a) ->
    @step -> meth1 a
    @step -> meth2 a

Back to Javascript:

  fetch = l8.Task( function( a ){
    l8.step( function(){ meth1( a) })
    l8.step( function(){ meth2( a) })
  })

Using the "transpiler":

  fetch = l8.compile( function( a ){
    step; meth1( a);
    step; meth2( a);
  })

The conclusion is that using tasks, steps and promises, the code's structure is similar to the hypothetical javascript blocking function.

Multiple steps, run sequentially:

  fetch_all_seq = l8.Task (urls) ->
    results = []
    for url in urls then do (url) ->
      @step -> scrap url, @proceed (err, content) -> result.push {url, err, content}
    @success -> results

Multiple steps, each run in parallel:

  fetch_all = l8.Task (urls) ->
    results = []
    for url in urls then do (url) ->
      @fork ->
        scrap url, @proceed (err, content) -> results.push {url, err, content}
    @success -> results

Repeated steps, externally terminated, gently:

  spider = l8.Task (urls, queue) ->
    @repeat ->
      url = null
      @step -> url = queue.shift
      @step -> @delay 10000 if @parent.tasks.length > 10
      @step ->
        @break if @stopping
        scrap url, @walk
      @step (err,urls) ->
        return if err or @stopping
        for url in urls
          queue.unshift url unless url in queue

  spider_task = l8.spawn -> spider( "http://xxx.com")
  ...
  stop_spider = -> spider_task.stop

Small loop, on one step, using "continue":

  fire_all = l8.Task (targets) ->
    ii = 0
    @step ->
      return if ii > targets.length
      targets[ii++].fire()
      @continue

StratifiedJs example, see http://onilabs.com/stratifiedjs

var news;
waitfor {
  news = http.get("http://news.bbc.co.uk");
}
or {
  hold(1000);
  news = http.get("http://news.cnn.com");
}
or {
  hold(1000*60);
  throw "sorry, no news. timeout";
}
show(news);

The equivalent code with l8 is:


// JavaScript
var show_news = l8.Task( function(){
  var news = 
  l8.fork( function(){
    http.get( "http://news.bbc.co.uk",
    l8.proceed( function( item ){ news.return( item) }) )
    
  }).fork( function(){
  
    l8.step( function(){
      this.sleep( 1000)
      
    }).step( function(){
      http.get( "http://news.cnn.com",
      l8.proceed( function( item ){ news.return( item) }) )
    })
    
  }).fork( function(){
  
    l8.step( function(){ this.sleep( 1000 * 60)
    
    }).step( function(){ throw "sorry, no news. timeout" })
  })
  .success( function( news ){ show( news) });
})

// CoffeeScript
show_news = l8.Task ->
  news = @current
  @fork ->
    @step -> http.get "http://news.bbc.co.uk"
    @step -> @news.return()
  @fork ->
    @step -> @sleep 1000
    @step -> http.get "http://news.cnn.com"
    @step -> @news.return()
  @fork ->
    @step -> @sleep 1000 * 60
    @step -> throw "sorry, no news. timeout"
  @success( news ) -> show news

// l8 transpiler
var show_new = l8.compile( function(){
  var news = this
  fork; begin
    step; http.get( "http://news.bbc.co.uk");
    step; news.return();
  end
  fork; begin
    step; this.sleep( 1000);
    step; http.get( "http://news.cnn.com");
    step; news.return();
  end
  fork; begin
    step; this.sleep( 1000 * 60);
    step; throw "sorry, no news. timeout";
  end
  success( news ); show( news);
})

Node.js google group "pipe" example

See https://groups.google.com/forum/?fromgroups=#!topic/nodejs/5hv6uIBpDl8

function pipe( inStream, outStream, callback ){
  var loop = function( err ){
    if (err) callback( err);
    else inStream.read( function( err, data ){
      if (err) callback(err);
      else data != null ? outStream.write( data, loop) : callback();
    });
  }
  loop();
}

pipe = l8.Task( function ( inStream, outStream ){
  l8.repeat( function(){
    l8.step( function(){
      inStream.read()
    }).step( function( data ){
      if( !data) l8.break;
      outStream.write( data);
    })
  })
})

pipe = l8.Task (in,out) ->
  @repeat ->
    @step -> in.read()
    @step (data) ->
      @break if !data
      out.write data

pipe = l8.compile( function( in, out ){
  repeat; begin
    step; in.read();
    step( data ); if( !data ) this.break;
    out.write( data);
  end
})

Note: for this example to work, node.js streams need to be "taskified". This is currently left as an exercize (however, work is in progres about that).

The "recursive dir walk" nodejs challenge:

Var fs = require('fs');
var path = require('path');

var recurseDir = function(dir) {
  fs.readdirSync(dir).forEach(function(child) {
    if (child[0] != '.') {
      var childPath = path.join(dir, child);
      if (fs.statSync(childPath).isDirectory()) {
        recurseDir(childPath);
      } else {
        console.log(childPath);
      }
    }
  });
};
recurseDir(process.argv[2]);

// Async version:
var recurseDir = l8.Task( function( dir ){
  l8.step( function(   ){ fs.readdir( dir, l8.flow) })
  l8.step( function( l ){ l.forEach( function( child ){
    if( child[0] != "." ){
      var childPath = path.join( dir, child);
      l8.step( function(   ){ fs.stat( childPath, l8.flow) })
      l8.step( function( r ){
        if( r.isDirectory() ){
          recurseDir( childPath)
        }else{
          console.log(dchildPath)
        }
      })
    }
  }) })
})