Implementation Guidelines

Thibaut Barrère edited this page Feb 10, 2020 · 1 revision

A few recommendations when working with Kiba:

Do not call require inside Kiba.parse

My understanding is that require is generally not thread-safe, so calling require inside Kiba.parse is not recommended in multi-threaded environments.

Do not do this:

job = Kiba.parse do
  require 'dsl_extensions/progress_bar'
  # SNIP
end

You are advised to eager-load all your dependencies instead (e.g. from a Sidekiq initializer, or by calling require at the top of your files).

You can pass variables to Kiba.parse

It is very common, and definitely allowed, to reference parameters (such as filenames) or live instances (such as Sequel connections) from within Kiba.parse, to control how your job will run.

In the job below, the name of a source file, a live Sequel connection, and a Logger instance are passed as parameters, then used in the definition:

require 'kiba-pro/destinations/sql_upsert'

module ETL
  module SyncPartners
    module_function

    def setup(source_file, sequel_connection, logger)
      Kiba.parse do
        pre_process do
          logger.info "Starting processing for file #{source_file}"
        end

        source CSVSource, 
          filename: source_file,
          csv_options: { headers: true, col_sep: ',' }

        # SNIP
        
        destination Kiba::Pro::Destinations::SQLUpsert,
          table: :partners,
          unique_key: :crm_partner_id,
          database: sequel_connection
      end
    end
  end
end

You can then call your job programmatically:

job = ETL::SyncPartners.setup(my_source_file, my_sequel_connection, logger)
Kiba.run(job)

You can use instance variables inside Kiba.parse

It can be useful at times to use instance variables. This can be done safely, as long as you do not reuse job instances (to avoid keeping state around).

For instance, one could build a bit of statistics like this:

job = Kiba.parse do
  pre_process do
    @row_read_from_source_count = 0
  end

  source SomeSource
  transform do |row|
    @row_read_from_source_count += 1
    row
  end

  # SNIP

  post_process do
    puts "#{@row_read_from_source_count} rows have been read from source"
  end
end

If you are careful to choose well-namespaced variable names, this can be used together with Kiba DSL extensions.

Avoid re-using the job instance

It is not recommended to re-use the output of Kiba.parse (the job variable above) for multiple calls to Kiba.run.

If you do so, you may unknowingly end up sharing some form of state between runs (such as instance variables or parameters as described above, or state held by the way you write your ETL components), leading to unexpected results.

Close resources explicitly

At the time of writing (Kiba v3), if an error is raised while Kiba.run is executing, Kiba does nothing to close resources that you may have opened during processing (such as files, database connections, etc.).

For now, it is your responsibility to rescue any error that may happen and to close resources that your components may have opened, or to use a construct that will automatically close resources on error (such as the block form of CSV.open).
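As a minimal sketch of the second option, here is a source that relies on the block form of CSV.open. The class name SafeCSVSource and its keyword arguments are hypothetical, chosen for illustration; the only assumption about Kiba is that a source is a plain Ruby class that yields rows from each.

```ruby
require 'csv'

# Hypothetical source, for illustration only: a plain Ruby class
# that yields one row at a time from `each`.
class SafeCSVSource
  def initialize(filename:, csv_options: { headers: true })
    @filename = filename
    @csv_options = csv_options
  end

  def each
    # Block form of CSV.open: the file is closed when the block exits,
    # whether it finishes normally or an error is raised further down
    # the pipeline while a row is being processed.
    CSV.open(@filename, 'r', **@csv_options) do |csv|
      csv.each { |row| yield row.to_h }
    end
  end
end
```

With this shape, no explicit rescue is needed in the source itself: the error still propagates to the caller of Kiba.run, but the file handle does not stay open.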

When applicable, you can also wrap the call to Kiba.run in a block construct that automatically closes the resources you need, e.g.:

allocate_connection_from_pool do |connection|
  job = Kiba.parse do
    source SQL, connection: connection
    # SNIP
  end
  Kiba.run(job)
end

(Here, the connection will be returned to the pool automatically.)
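For readers who do not already have such a helper, here is one possible sketch of how a block construct like this could be written with ensure. TinyPool is a hypothetical stand-in for a real connection pool, and this version of allocate_connection_from_pool takes the pool as an explicit argument, which differs slightly from the call shown above.

```ruby
# Hypothetical minimal pool, for illustration only. A real application
# would use the pooling provided by its database library.
class TinyPool
  attr_reader :available

  def initialize(connections)
    @available = connections.dup
    @mutex = Mutex.new
  end

  def checkout
    connection = @mutex.synchronize { @available.pop }
    raise 'pool exhausted' if connection.nil?
    connection
  end

  def checkin(connection)
    @mutex.synchronize { @available.push(connection) }
  end
end

# Yields a connection and always hands it back to the pool,
# including when the block (e.g. a call to Kiba.run) raises.
def allocate_connection_from_pool(pool)
  connection = pool.checkout
  yield connection
ensure
  pool.checkin(connection) if connection
end
```

The ensure clause runs on both normal exit and error, so the caller only has to deal with the error itself, not with the cleanup.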

Next: How to extend the Kiba DSL