
Implementing ETL transforms

Armin edited this page Apr 21, 2020 · 7 revisions

A Kiba ETL transform can be implemented either as a Ruby class or as a Ruby block.

Row transform as a Ruby class

A Kiba transform is a Ruby class with:

  • a constructor (used for configuration)
  • a process(row) method (responsible for preparing output rows based on an input row)
  • optional: a close method (useful for "yielding transforms" in particular, see next section)

Here is an example of a transform. It expects rows to be Hash instances with an index key, and drops every row whose index value is not a multiple of the configured modulo value:

class SamplingTransform
  def initialize(modulo_value)
    @modulo_value = modulo_value
  end

  def process(row)
    row.fetch(:index) % @modulo_value == 0 ? row : nil
  end
end

One can then use the transform this way:

job = Kiba.parse do
  # SNIP
  transform SamplingTransform, 10
  # SNIP
end

Kiba will call the process method for each input row.

The process method must return the modified row, or return nil to indicate that the row should be dropped from the pipeline.
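Because a class transform is plain Ruby, the contract described above can be checked without running a Kiba job. The sketch below repeats SamplingTransform so that it is self-contained, then calls process by hand. The input rows are made up for illustration, and the map/compact step only approximates what a pipeline does with the return values.

```ruby
# Same transform as above, repeated so this snippet runs on its own.
class SamplingTransform
  def initialize(modulo_value)
    @modulo_value = modulo_value
  end

  def process(row)
    row.fetch(:index) % @modulo_value == 0 ? row : nil
  end
end

transform = SamplingTransform.new(10)

# Hypothetical input rows, for illustration only.
rows = [{ index: 10 }, { index: 11 }, { index: 20 }]

# Keep the rows returned by process, drop those for which it returned nil.
kept = rows.map { |row| transform.process(row) }.compact

p kept
```

Only the rows with index 10 and 20 remain in kept; the row with index 11 is dropped because process returned nil for it.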

Generating more than one output row per input row (a.k.a. yielding transforms)

Since Kiba v3 (or Kiba v2 with StreamingRunner enabled), you can also yield as many rows as you want for a given input row, using the yield keyword.

For technical reasons, this will only work in class transforms, not in block transforms.

While simple in appearance, this is a powerful feature which you can leverage to build more reusable components (see Kiba v2.0.0 release notes for more information).

class ExplodingTransform
  def process(row)
    2.times do |i|
      yield({ value: row, value_index: i })
    end
    # avoid returning a row (as a normal "process" call)
    # but you could return one if needed
    nil
  end
end

⚠️ Be careful not to share mutable objects (for instance the same Hash or Array instance) between emitted rows, or any change made to one row will also affect the others. If needed, use some form of deep copy, or instantiate a new object for each emitted row.
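One possible way to follow this advice is sketched below. SafeExplodingTransform is a hypothetical name, and the Marshal round-trip is only one deep-copy technique among others; it works for plain data (hashes, arrays, strings, numbers) but not for objects such as procs or IO handles. The transform is driven by hand here, passing a block to process to collect what it yields.

```ruby
# Hypothetical variant of ExplodingTransform which deep-copies the
# input row before each yield, so emitted rows share no mutable state.
class SafeExplodingTransform
  def process(row)
    2.times do |i|
      # Marshal round-trip: a simple deep copy for plain data structures.
      copy = Marshal.load(Marshal.dump(row))
      yield({ value: copy, value_index: i })
    end
    nil
  end
end

emitted = []
input = { tags: ["a"] }
SafeExplodingTransform.new.process(input) { |row| emitted << row }

# Mutating the first emitted row leaves the second one (and the input) intact.
emitted[0][:value][:tags] << "b"
p emitted[1][:value][:tags]
p input[:tags]
```

Without the copy, all emitted rows would point at the same tags array, and the mutation would be visible in each of them.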

You can then use this transform this way:

Kiba.parse do
  source Kiba::Common::Sources::Enumerable, (1..4)
  transform ExplodingTransform
end

This will generate 2 rows for each of the 4 input rows:

{ value: 1, value_index: 0 }
{ value: 1, value_index: 1 }
{ value: 2, value_index: 0 }
{ value: 2, value_index: 1 }
{ value: 3, value_index: 0 }
{ value: 3, value_index: 1 }
{ value: 4, value_index: 0 }
{ value: 4, value_index: 1 }

Ability to yield from close

Since Kiba v2.5.0, it is possible to call yield from the optional close method.

This feature is very useful for anything that will batch process groups of rows, or work with aggregates of rows in general.

See PR #57 and the kiba-common transforms for more information.
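To illustrate the idea, here is a minimal sketch of a batching transform. BatchTransform is a hypothetical class written for this page, not the kiba-common implementation. It buffers rows in process, yields a full batch as a single row, and uses close to yield whatever is left in the buffer. It is driven by hand below, which only approximates how a Kiba run would call it.

```ruby
# Hypothetical batching transform: groups incoming rows into arrays
# of at most batch_size elements.
class BatchTransform
  def initialize(batch_size)
    @batch_size = batch_size
    @buffer = []
  end

  def process(row)
    @buffer << row
    if @buffer.size >= @batch_size
      yield @buffer
      # Start a fresh array so the batch just yielded is not mutated later.
      @buffer = []
    end
    # Nothing is returned directly: batches are only emitted via yield.
    nil
  end

  def close
    # Emit the last, possibly incomplete, batch.
    yield @buffer unless @buffer.empty?
  end
end

transform = BatchTransform.new(2)
batches = []
(1..5).each { |row| transform.process(row) { |batch| batches << batch } }
transform.close { |batch| batches << batch }

p batches
```

With 5 input rows and a batch size of 2, the first two batches are emitted from process and the final single-row batch is emitted from close. Without the ability to yield from close, that last batch would be lost.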

Row transform as a block

An alternate syntax is available for simple transforms to be written as blocks:

transform do |row|
  row[:this_field] = row.fetch(:that_field) * 10
  # make sure to return the row to keep it in the pipeline
  row
end

⚠️ One cannot use yield from block transforms.

Tip: use Ruby's next to exit early from a block transform

While return does not behave as you might expect inside a Ruby block (it attempts to return from the enclosing method rather than from the block itself), you can exit a block early by using next:

transform do |row|
  # remove a row with `next`
  if row.fetch(:index) % 2 == 0
    next # the row will be removed from the pipeline
  end
    
  # return a modified row
  if row.fetch(:index) % 3 == 0
    next {great_index: row.fetch(:index) * 10}
  end
  
  # otherwise return the row as is
  row
end

This is very useful to avoid nested if statements inside a single block.
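The behaviour of next can be observed in plain Ruby, outside of Kiba. In the sketch below, the proc named step is a stand-in for a block transform (this name and the input rows are assumptions made for illustration). The value given to next becomes the return value of the block, and a bare next returns nil.

```ruby
# Stand-in for a block transform: a plain proc receiving a row.
step = proc do |row|
  # Bare next: the block returns nil, so the row would be dropped.
  next if row.fetch(:index) % 2 == 0

  # next with a value: the block returns that value instead of the row.
  next({ great_index: row.fetch(:index) * 10 }) if row.fetch(:index) % 3 == 0

  # Otherwise the row is returned as is.
  row
end

# Hypothetical input rows with index values 1 to 4.
results = (1..4).map { |i| step.call({ index: i }) }

p results
```

Rows 2 and 4 produce nil, row 3 is replaced by a new hash, and row 1 is returned unchanged.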

Like the class form, the block form can return nil to drop the row. The class form, however, is easier to test and to reuse across your ETL scripts.

Next: Implementing ETL destinations