Kiba ETL is an open-source data processing framework for Ruby, first published in 2015, and now available in v3.
Version 3 introduces 2 important changes, driven by what we’ve learned in the field implementing customer projects.
This post provides an in-depth explanation of those changes (with animated SVG to make things a bit more explicit!).
But first, what can Kiba ETL help you achieve?
In case you’re not familiar with it, Kiba ETL is being used in a wide variety of scenarios, including:
- Data integration (e.g. batch synchronization between 2 systems)
- Data aggregation pipelines (e.g. take multiple sources and build a well structured database, ready for querying)
- Construction/update of datawarehouses & OLAP-structured databases
- Business Intelligence reports
- Data consolidation reports (e.g. VAT reports based on bank accounts)
- Large data migrations (e.g. migrating from MySQL to Postgres).
You can learn more about those use-cases in my 40-minute RubyKaigi 2018 talk.
Kiba OSS is supported/funded by LoGeek, and can be enhanced with Kiba Pro, which will be officially launched in the coming weeks (the first large iteration focuses on batch insert/upsert/lookups for SQL workloads, such as datawarehouses and data migrations).
Change #1 - “StreamingRunner” becomes the default
Situation in Kiba v1
The “runner” is the part of Kiba responsible for processing your pipeline declaration.
In Kiba v1, the Kiba runner takes each “data row” generated by your ETL source component, and passes it along to each transform, then finally to the destination (usually responsible for storing your data somewhere). It visually goes like this:
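In rough Ruby pseudocode (a simplification for illustration only, not Kiba’s actual implementation), that flow can be sketched as:

sources.each do |source|
  source.each do |row|
    transforms.each do |transform|
      row = transform.process(row)
      break if row.nil? # a transform returning nil dismisses the row
    end
    destinations.each { |destination| destination.write(row) } if row
  end
end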
This works well for most cases, but has a couple of consequences.
Imagine you have created a Kiba source able to extract XML elements from a group of files on disk (with each file containing N elements that we’d want to extract as individual rows).
It would typically look like this:
require 'nokogiri'

class MySource
  attr_reader :dir_pattern

  def initialize(dir_pattern:)
    @dir_pattern = dir_pattern
  end

  def each
    Dir[dir_pattern].sort.each do |file|
      doc = Nokogiri::XML(IO.binread(file))
      doc.search('/invoices/invoice').each do |item|
        yield(item)
      end
    end
  end
end
Such a class has 4 responsibilities:
- The directory iteration
- The XML parsing
- The XML node search
- The act of “exploding” each detected sub-node as an independent row
This coupling makes the source harder to reuse and modularize, and can lead to situations where you would have to chain 2 Kiba pipelines (multi-pass processing) rather than 1.
Situation in Kiba v2
Kiba v2 introduced an opt-in “StreamingRunner”, which allows each transform in the pipeline to “yield” an arbitrary amount of output rows for a given input row. Visually, this gives:
With this new “runner”, you can rewrite the previous code as 4 independent components.
First is the actual source, this time only responsible for directory iteration:
class DirectoryLister
  attr_reader :dir_pattern

  def initialize(dir_pattern:)
    @dir_pattern = dir_pattern
  end

  def each
    Dir[dir_pattern].sort.each do |filename|
      yield(filename)
    end
  end
end
Then a component responsible for XML reading:
class XMLReader
  def process(filename)
    Nokogiri::XML(IO.binread(filename))
  end
end
Another one able to select items in the XML document:
class XMLSearcher
  attr_reader :selector

  def initialize(selector:)
    @selector = selector
  end

  def process(doc)
    doc.search(selector)
  end
end
And finally, leveraging the new ability to yield an arbitrary amount of rows at a transform level:
class EnumerableExploder
  def process(row)
    row.each do |item|
      yield(item)
    end
    nil # tell the pipeline to ignore the final value
  end
end
If we wrap those components together, this gives:
job = Kiba.parse do
  source DirectoryLister, dir_pattern: '*.xml'
  transform XMLReader
  transform XMLSearcher, selector: '/invoices/invoice'
  transform EnumerableExploder
  # SNIP
end
Each of these 4 components can now be mixed and matched with other components, in completely unrelated scenarios. For instance:
- The DirectoryLister could be used to list anything (JSON files etc).
- The EnumerableExploder, similarly, could be used for pretty much anything (see the sketch right after this list).
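For instance, here is a hedged sketch of such reuse with JSON input (JSONReader and the 'invoices' key are hypothetical, written just for this example; the inline transform uses Kiba’s regular block syntax):

require 'json'

# Hypothetical component, shown only to illustrate reusing DirectoryLister
# and EnumerableExploder with a different file format.
class JSONReader
  def process(filename)
    JSON.parse(IO.read(filename))
  end
end

job = Kiba.parse do
  source DirectoryLister, dir_pattern: '*.json'
  transform JSONReader
  # pick the array of items out of each parsed document
  transform { |doc| doc.fetch('invoices') }
  transform EnumerableExploder
  # SNIP
end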
You can even (more advanced use) leverage kiba-common’s (Kiba’s sister project) SourceTransformAdapter to dynamically create sources in the middle of the pipeline, using each row as the configuration data for the source creation.
Ultimately, this change allows components to be more reusable, more independent, and also easier to test, which makes for a more manageable & scalable ETL codebase.
Situation in Kiba v2.5
In Kiba v2.5, the runner has been improved to support an optional close method on transforms, called when all the source rows have been read, and able to yield an arbitrary number of rows too.
Together with the v2 improvement, this means a transform can temporarily “buffer” a batch of rows, then, when the buffer has reached the correct size, work on the group of rows as a whole, and finally release them (modified) to the next part of the pipeline. Visually again:
For instance, you can now write code such as this “aggregating transform”:
class AggregateTransform
  def initialize(aggregate_size:)
    @aggregate_size = aggregate_size
    @buffer = []
  end

  def process(row)
    @buffer << row
    if @buffer.size == @aggregate_size
      yield @buffer
      @buffer = []
    end
    nil # the yielded buffers are the only rows passed downstream
  end

  def close
    # flush the last (potentially incomplete) batch
    yield @buffer unless @buffer.empty?
  end
end
The process(row) method is called for each row. Because you are sure that close will ultimately be called (and can yield) after your last row is processed (unless an error is raised), you can safely work on groups of rows, and the last (potentially incomplete) batch of rows will still be processed as expected.
This can be used to write grouping, sorting, or batching components.
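For instance, here is a hedged sketch of plugging such a transform into a pipeline (BulkInsertDestination is a hypothetical destination written for this example; it receives arrays of rows, as yielded by AggregateTransform, rather than individual rows):

# Hypothetical destination, shown only to illustrate working on batches of rows.
class BulkInsertDestination
  def write(batch)
    # e.g. a single multi-row INSERT statement would go here
    puts "inserting #{batch.size} rows in one statement"
  end
end

job = Kiba.parse do
  source DirectoryLister, dir_pattern: '*.xml'
  transform XMLReader
  transform XMLSearcher, selector: '/invoices/invoice'
  transform EnumerableExploder
  transform AggregateTransform, aggregate_size: 25
  destination BulkInsertDestination
end

Kiba.run(job)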
Situation in Kiba v3
In Kiba v3, the new “StreamingRunner” becomes the default (as we’ve been using it in production on a large number of cases), rather than opt-in, to encourage increased component reuse and enable more powerful features.
Change #2 - Deprecation of the “kiba” command line interface
Kiba started (in v1) as a tool you would run using the bundle exec kiba my_script.etl command. Back then the goal was to write cron jobs and one-off CLI commands, just like activewarehouse-etl (Kiba’s spiritual parent) did.
As more projects were implemented, we also gradually wanted to trigger jobs in response to events such as an HTTP request, and ideally from a background job (Sidekiq etc), without having to shell out to an external process (which is slow and usually more complicated).
Kiba v2 made the support for a “programmatic API” official (previously this was only considered an internal testing tool), which you can use from “live Ruby code” instead of the kiba command, like this:
job = Kiba.parse do
  source ...
  transform ...
  transform ...
  destination ...
end

Kiba.run(job)
This change brings a lot of benefits:
- You can easily trigger Kiba jobs from wherever you need (Sidekiq jobs, IRB, or a Rake task)
- Integration testing of full pipelines is faster (no shelling out) via RSpec or minitest
- Testing of such pipelines can easily rely on in-process mocking (without resorting to complicated out-of-process mocking techniques)
- You can pass Ruby variables to the job (rather than using only ENV variables or on-disk config files) to drive the pipeline behaviour (see the sketch right after this list)
- You can easily wrap resource opening/closing with a block (to ensure e.g. a connection is closed after your job)
- Also, code in general tends to be cleaner (no top-level namespace pollution)
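As an illustration of the first and fourth points, here is a hedged sketch of triggering a pipeline from a background job while passing a plain Ruby variable to drive it (SyncInvoicesWorker is a hypothetical class written just for this example):

# Hypothetical Sidekiq worker, shown only to illustrate running a Kiba job
# in-process and passing Ruby variables into the pipeline declaration.
class SyncInvoicesWorker
  include Sidekiq::Worker

  def perform(dir_pattern)
    job = Kiba.parse do
      # dir_pattern is captured by the block - no ENV variable needed
      source DirectoryLister, dir_pattern: dir_pattern
      transform XMLReader
      transform XMLSearcher, selector: '/invoices/invoice'
      transform EnumerableExploder
      # destination ... (SNIP)
    end
    Kiba.run(job)
  end
end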
Ultimately, in Kiba v3, using the Kiba.parse/Kiba.run API is the way to go. The previous CLI is removed (but you can reimplement it easily via Rake tasks, as sketched below).
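For instance, here is a minimal sketch of such a Rake task (the :sync task name and the job declaration are hypothetical, reusing the components defined earlier in this post and only the standard Kiba.parse/Kiba.run calls):

# Rakefile - a minimal replacement for the former kiba CLI
require 'kiba'
# require_relative the files defining the components used below

desc 'Run the invoices sync pipeline'
task :sync do
  job = Kiba.parse do
    source DirectoryLister, dir_pattern: '*.xml'
    transform XMLReader
    transform XMLSearcher, selector: '/invoices/invoice'
    transform EnumerableExploder
    # destination ... (SNIP)
  end
  Kiba.run(job)
end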
Documentation rewrite
To reflect those important changes, the documentation has been rewritten almost completely.
You are invited to check it out (and please report any trouble you run into while reading it!)
What’s next?
In the coming weeks, we will integrate an extra SQL transform (batch SQL lookup) to Kiba Pro, and will issue an official “launch” post.
We will also finalize support for Ruby 2.7 in Kiba OSS, and share more articles explaining how to use Ruby for data processing.
Thanks for reading, and subscribe to the newsletter below to get updates!