What's new in Kiba ETL v3 (visually explained)

March 05, 2020

Kiba ETL is an open-source data processing framework for Ruby, first published in 2015, and now available in v3.

Version 3 introduces 2 important changes, driven by what we’ve learned on the field implementing customers projects.

This post provides an in-depth explanation of those changes (with animated SVG to make things a bit more explicit!).

But first, what can Kiba ETL help you achieve?

In case you’re not familiar with it, Kiba ETL is being used in a wide variety of scenarios, including:

  • Data integration (e.g. batch synchronization between 2 systems)
  • Data aggregation pipelines (e.g. take multiple sources and build a well structured database, ready for querying)
  • Construction/update of datawarehouses & OLAP-structured databases
  • Business Intelligence reports
  • Data consolidation reports (e.g. VAT reports based on bank accounts)
  • Large data migrations (e.g. migrating from MySQL to Postgres).

You can learn more on those use-cases in my 40-minute Ruby Kaigi 2018 talk

Kiba OSS is supported/funded by LoGeek, and can be enhanced with Kiba Pro, which will be officially launched in the coming weeks (the first large iteration focuses on batch insert/upsert/lookups for SQL workloads, such as datawarehouses and data migrations).

Change #1 - “StreamingRunner” becomes the default

Situation in Kiba v1

The “runner” is the part of Kiba responsible for processing your pipeline declaration.

In Kiba v1, the Kiba runner takes each “data row” generated by your ETL source component, and passes it along to each transform, then finally to the destination (usually responsible for storing your data somewhere). It visually goes like this:

This works well for most cases, but has a couple of consequences.

Imagine you have created a Kiba source able to extract XML elements from a group of files on disk (with each file containing N elements that we’d want to extract as individual rows).

It would typically look like this:

class MySource
  attr_reader :dir_pattern

  def initialize(dir_pattern:)
    @dir_pattern = dir_pattern
  end

  def each
    Dir[dir_pattern].sort.each do |file|
      doc = Nokogiri::XML(IO.binread(file))
      doc.search('/invoices/invoice').each do |item|
        yield(item)
      end
    end
  end
end

Such a class has 4 responsabilities:

  • The directory iteration
  • The XML parsing
  • The XML node search
  • The act of “exploding” each detected sub-node as an independent row

This situation reduces the ability to make the source more reusable and modular, and can lead to situations where you would have to chain 2 Kiba pipelines (multi-pass processing) rather than 1.

Situation in Kiba v2

Kiba v2 introduced an opt-in “StreamingRunner”, which allows each transform in the pipeline to “yield” an arbitrary amount of output rows for a given input row. Visually, this gives:

With this new “runner”, you can rewrite the previous code as 4 independent components.

First is the actual source, this time only responsible for directory iteration:

class DirectoryLister
  def initialize(dir_pattern:)

  def each
    Dir[dir_pattern].sort.each do |filename|
      yield(filename)
    end
  end
end

Then a component responsible for XML reading:

class XMLReader
  def process(filename)
    Nokogiri::XML(IO.binread(filename))
  end
end

Another one able to select items in the XML document:

class XMLSearcher
  def initialize(selector:)

  def process(doc)
    doc.search(selector)
  end
end

And finally, leveraging the new ability to yield an arbitrary amount of rows at a transform level:

class EnumerableExploder
  def process(row)
    row.each do |item|
      yield(item)
    end
    nil # tell the pipeline to ignore the final value
  end
end

If we wrap those components together, this gives:

job = Kiba.parse do
  source DirectoryLister, dir_pattern: '*.csv'
  transform XMLReader
  transform XMLSearcher, selector: '/invoices/invoice'
  transform EnumerableExploder
  # SNIP
end

Each of these 4 components can now be mix-and-matched with other components, in completely unrelated scenarios. For instance:

  • The DirectoryLister could be used to list anything (JSON files etc).
  • The EnumerableExploder, similarly, could be used for pretty much anything.

You can even (more advanced use) leverage kiba-common’s (Kiba sister project) SourceTransformAdapter to dynamically create sources in the middle of the pipeline, using each row as the configuration data for the source creation.

Ultimately, this change allows components to be more reusable, more independent, and also easier to test, which makes for a more manageable & scalable ETL codebase.

Situation in Kiba v2.5

In Kiba v2.5, the runner has been improved to support an optional close method on transforms, called when all the source rows have been read, and able to yield an arbitrary number of rows too.

Together with v2 improvement, this means a transform can temporarily “buffer” a batch of rows, then when the buffer has reached the correct size, work on the group of rows as a whole, and finally release them (modified) to the next part of the pipeline. Visually again:

For instance, you can now write code such as this “aggregating transform”:

class AggregateTransform
  def initialize(aggregate_size:)
    @aggregate_size = aggregate_size
  end
  
  def process(row)
    @buffer ||= []
    @buffer << row
    if @buffer.size == @aggregate_size
      yield @buffer
      @buffer = []
    end
    nil
  end
  
  def close
    yield @buffer unless @buffer.empty?
  end
end

The process(row) method is called for each row. Because you are sure that close will ultimately be called (and can yield) after your last row is processed (unless an error is raised), you can safely work on groups of rows, and the last (potentially incomplete) batch of rows will still be processed as expected.

This can be used to write grouping, sorting, batching components.

Situation in Kiba v3

In Kiba v3, the new “StreamingRunner” becomes the default (as we’ve been using it in production on a large number of cases), rather than opt-in, to encourage increased components reuse and also more powerful features.

Change #2 - Deprecation of the “kiba” command line interface

Kiba started (in v1) as a tool you would run using the bundle exec kiba my_script.etl command. Back then the goal was to write cron jobs and one-off CLI commands, just like activewarehouse-etl (Kiba’s spiritual parent) did.

As more projects were implemented, we also gradually wanted to trigger jobs in response to events such as an HTTP request, and ideally from a background job (Sidekiq etc), without having to shell out to an external process (which is slow and usually more complicated).

Kiba v2 officialized the support for a “programmatic API” (previously this was only considered an internal testing tool), that you can use from “live Ruby code” instead of using the kiba command, like this:

job = Kiba.parse do
  source ...
  transform ...
  transform ...
  destination ...
end
Kiba.run(job)

This changes brings a lot of benefits:

  • You can easily trigger Kiba jobs from wherever you need (Sidekiq jobs, IRB, or a Rake task)
  • Integration testing of full pipelines is faster (no shelling out) via RSpec or minitest
  • Testing of such pipelines can easily rely on in-process mocking (without resorting to complicated out-of-process mocking techniques)
  • You can pass Ruby variables to the job (rather than using only ENV variables or on-disk config files) to drive the pipeline behaviour
  • You can easily wrap resources open/closes with a block (to ensure e.g. a connection is closed after your job)
  • Also, code in general tends to be cleaner (no top-level namespace pollution)

Ultimately, in Kiba v3, using Kiba.parse/Kiba.run API is the way to go. The previous CLI is removed (but you can reimplement it easily via Rake tasks).

Documentation rewrite

To reflect those important changes, the documentation has been rewritten almost completely.

You are invited to check it out (and please report any troubles you may have when reading it!)

What’s next?

In the coming weeks, we will integrate an extra SQL transform (batch SQL lookup) to Kiba Pro, and will issue an official “launch” post.

We will also finalize support for Ruby 2.7 in Kiba OSS, and share more articles on explaining how to use Ruby for data processing.

Thanks for reading, and subscribe to the newsletter below to get updates!


Thank you for sharing this article!