Getting programmatic feedback from Kiba ETL

October 11, 2018

When writing a Kiba ETL job, you may find it desirable to get some form of feedback from the job, such as:

  • Some metrics telling what the job did (e.g. count of inserted/updated records).
  • A list of values which it didn’t manage to map correctly.
  • A list of primary keys.
  • Etc…

If that’s the case, I recommend that you use the new Kiba programmatic API. This API provides more control over the job, compared to the original way of writing Kiba jobs as .etl files.

A concrete example: counting source records

Let’s imagine we have a CSV-reading source job.

Legacy mode (Kiba v1)

With Kiba v1 you would have written your job as an .etl file:

source MyCSVSource, filename: "input.csv"
# SNIP

which you would then have run with bundle exec kiba my_job.etl.

This offers only limited ways to get output from the job: logging, writing to a file, or writing to STDOUT.

Kiba API (Kiba v2)

To provide a richer & more flexible integration, you can use the Kiba API and write your Kiba job as a “regular” Ruby class:

# lib/etl/import.rb

module ETL
  module Import
    module_function
    
    def setup(filename:)
      Kiba.parse do
        source MyCSVSource, filename: filename
        # snip
      end
    end
  end
end

which you can run programmatically (e.g. in a Rake task, but it could also be inside a Sidekiq job), with:

require 'etl/import'

job = ETL::Import.setup(filename: "input.csv")
Kiba.run(job)

At this point you still do not get any feedback, but we can pass a Hash variable to let the job aggregate statistics:

def setup(filename:, stats:)
  Kiba.parse do
    source MyCSVSource, filename: filename
    
    transform do |row|
      stats[:records_read_count] += 1
      row
    end
    
    # SNIP
  end
end

which you can invoke with:

stats = Hash.new(0)
job = ETL::Import.setup(filename: "input.csv", stats: stats)
Kiba.run(job)
puts stats.fetch(:records_read_count)
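One detail worth keeping in mind with the snippet above: `Hash.new(0)` only supplies a default for `[]` lookups, and it does not store the key until something is assigned. The sketch below is plain Ruby with no Kiba involved, and the variable names are only illustrative. It shows what that means if the source happens to yield no rows at all.

```ruby
stats = Hash.new(0)

# `+=` reads the default (0), adds 1, then stores the result under the key.
stats[:records_read_count] += 1
stats[:records_read_count] += 1

# A counter that was never incremented is not stored in the Hash:
# `[]` falls back to the default, while `fetch` ignores the default.
empty_stats = Hash.new(0)
via_brackets = empty_stats[:records_read_count]
via_fetch = empty_stats.fetch(:records_read_count, 0)

raised = begin
  empty_stats.fetch(:records_read_count)
  false
rescue KeyError
  true
end
```

So `fetch` without a fallback is a reasonable choice when a missing counter should be treated as an error, while `stats[:records_read_count]` or `fetch(:records_read_count, 0)` may be preferable if an empty input file is a normal case.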

This will work nicely when you need to provide visual feedback to a user, for instance, or trigger emails as a result of a specific outcome.
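The same idea should extend beyond counters, for example to the "values which it didn’t manage to map" case mentioned at the start. Here is a minimal sketch, assuming a hypothetical `MapCountry` transform and a made-up mapping table (neither comes from a real job). It follows the Kiba convention of a transform class exposing `process(row)`, and is exercised here directly in plain Ruby.

```ruby
# Hypothetical transform: maps a country name to a code and
# records every value it could not map in a caller-provided Array.
class MapCountry
  # Made-up mapping table, for illustration only.
  MAPPING = { "France" => "FR", "Germany" => "DE" }.freeze

  def initialize(unmapped:)
    @unmapped = unmapped
  end

  def process(row)
    code = MAPPING[row[:country]]
    @unmapped << row[:country] if code.nil?
    row.merge(country_code: code)
  end
end

unmapped = []
transform = MapCountry.new(unmapped: unmapped)

rows = [{ country: "France" }, { country: "Atlantis" }].map do |row|
  transform.process(row)
end
```

Inside a job, such a class would presumably be declared as `transform MapCountry, unmapped: unmapped`, after which the caller can inspect `unmapped` once `Kiba.run` returns.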

Going further with Ruby lambda hooks

Let’s imagine you need to gather statistics in a Kiba destination. You can leverage Ruby lambdas to do so, like this:

class MyDestination
  attr_reader :on_write

  def initialize(on_write: nil)
    @on_write = on_write
  end
  
  def write(row)
    # do your writing, then call the hook if passed
    on_write&.call(row)
  end
end

This provides a generic hook to instrument the destination:

destination MyDestination, on_write: -> (row) { stats[:written_count] += 1 }
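Because the hook is just a lambda, the destination can also be exercised on its own, without running a full job, which is handy in tests. The sketch below restates the class so that it is self-contained; storing rows in an in-memory array is an assumption standing in for the real writing logic.

```ruby
# Same shape as the destination above. The actual "writing" is replaced
# by an in-memory Array (an assumption made for this sketch).
class MyDestination
  attr_reader :on_write, :rows

  def initialize(on_write: nil)
    @on_write = on_write
    @rows = []
  end

  def write(row)
    rows << row
    # Call the hook only if one was passed.
    on_write&.call(row)
  end
end

stats = Hash.new(0)
destination = MyDestination.new(on_write: ->(row) { stats[:written_count] += 1 })

[{ id: 1 }, { id: 2 }, { id: 3 }].each { |row| destination.write(row) }

# The hook is optional: without it, `write` still works.
plain_destination = MyDestination.new
plain_destination.write({ id: 4 })
```

Keeping the hook optional (`on_write: nil` plus the safe navigation operator `&.`) means existing jobs that do not need instrumentation can keep using the destination unchanged.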

As you can see, passing a simple Hash around and writing components that update it provides many ways to get feedback from Kiba jobs, be it in production (e.g. providing users with feedback) or when you write tests.

