Version 3 introduces 2 important changes, driven by what we’ve learned in the field while implementing customer projects.
This post provides an in-depth explanation of those changes (with animated SVGs to make things a bit more explicit!).
But first, what can Kiba ETL help you achieve?
In case you’re not familiar with it, Kiba ETL is used in a wide variety of scenarios, including:
- Data integration (e.g. batch synchronization between 2 systems)
- Data aggregation pipelines (e.g. take multiple sources and build a well structured database, ready for querying)
- Construction/update of data warehouses & OLAP-structured databases
- Business Intelligence reports
- Data consolidation reports (e.g. VAT reports based on bank accounts)
- Large data migrations (e.g. migrating from MySQL to Postgres)
You can learn more about those use cases in my 40-minute RubyKaigi 2018 talk.
Kiba OSS is supported/funded by LoGeek, and can be enhanced with Kiba Pro, which will be officially launched in the coming weeks (the first large iteration focuses on batch insert/upsert/lookups for SQL workloads, such as data warehouses and data migrations).
Change #1 - “StreamingRunner” becomes the default
Situation in Kiba v1
The “runner” is the part of Kiba responsible for processing your pipeline declaration.
In Kiba v1, the Kiba runner takes each “data row” generated by your ETL source component, and passes it along to each transform, then finally to the destination (usually responsible for storing your data somewhere). It visually goes like this:
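For readers who prefer code to diagrams, the v1 flow can be approximated with the hypothetical sketch below. It is not Kiba's actual runner code: run_v1_style, DropBlank, Upcase and CollectingDestination are illustrative names, and the sketch only assumes Kiba's usual component contract (a source responds to each, a transform's process(row) returns a row or nil to drop it, and a destination responds to write(row)).

```ruby
# Hypothetical approximation of the v1 flow (not Kiba's real runner):
# each row visits every transform in turn, and a transform can only
# return one row (or nil to drop it).
def run_v1_style(source, transforms, destination)
  source.each do |row|
    transforms.each do |transform|
      row = transform.process(row)
      break if row.nil? # a nil return removes the row from the pipeline
    end
    destination.write(row) unless row.nil?
  end
  destination.close if destination.respond_to?(:close)
end

class DropBlank
  def process(row)
    row.strip.empty? ? nil : row
  end
end

class Upcase
  def process(row)
    row.upcase
  end
end

class CollectingDestination
  attr_reader :rows

  def initialize
    @rows = []
  end

  def write(row)
    @rows << row
  end
end

destination = CollectingDestination.new
# Any object responding to each can act as a source, e.g. an Array
run_v1_style(['a', ' ', 'b'], [DropBlank.new, Upcase.new], destination)
destination.rows # => ["A", "B"]
```

With this contract, a transform has no way to output several rows for a single input row, which is the limitation discussed next.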
This works well for most cases, but has a couple of consequences.
Imagine you have created a Kiba source able to extract XML elements from a group of files on disk (with each file containing N elements that we’d want to extract as individual rows).
It would typically look like this:
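```ruby
require 'nokogiri'

class MySource
  attr_reader :dir_pattern

  def initialize(dir_pattern:)
    @dir_pattern = dir_pattern
  end

  def each
    # 1. iterate over the files
    Dir[dir_pattern].sort.each do |file|
      # 2. parse the XML
      doc = Nokogiri::XML(IO.binread(file))
      # 3. search the nodes, then 4. "explode" each one as its own row
      doc.search('/invoices/invoice').each do |item|
        yield(item)
      end
    end
  end
end
```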
Such a class has 4 responsibilities:
- The directory iteration
- The XML parsing
- The XML node search
- The act of “exploding” each detected sub-node as an independent row
Bundling these responsibilities in a single class makes the source harder to reuse and less modular, and can force you to chain 2 Kiba pipelines (multi-pass processing) rather than 1.
Situation in Kiba v2
Kiba v2 introduced an opt-in “StreamingRunner”, which allows each transform in the pipeline to “yield” an arbitrary number of output rows for a given input row. Visually, this gives:
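In code, the difference can be approximated with the hypothetical sketch below (again, not Kiba's actual StreamingRunner source; push_through, run_streaming_style, SplitWords, Upcase and CollectingDestination are illustrative names). It assumes the streaming contract: everything a transform yields is passed downstream, and so is its return value unless it is nil.

```ruby
# Hypothetical approximation of streaming-style processing (not Kiba's
# real StreamingRunner): a transform may yield any number of rows, and
# its return value is also passed downstream unless it is nil.
def push_through(row, transforms, destination)
  if transforms.empty?
    destination.write(row)
    return
  end

  head, *tail = transforms
  returned = head.process(row) { |yielded| push_through(yielded, tail, destination) }
  push_through(returned, tail, destination) unless returned.nil?
end

def run_streaming_style(source, transforms, destination)
  source.each { |row| push_through(row, transforms, destination) }
  destination.close if destination.respond_to?(:close)
end

# One input row (a line of text) becomes several output rows (words)
class SplitWords
  def process(line)
    line.split.each { |word| yield(word) }
    nil # only the yielded rows move on
  end
end

class Upcase
  def process(row)
    row.upcase
  end
end

class CollectingDestination
  attr_reader :rows

  def initialize
    @rows = []
  end

  def write(row)
    @rows << row
  end
end

destination = CollectingDestination.new
run_streaming_style(['hello world', 'kiba etl'], [SplitWords.new, Upcase.new], destination)
destination.rows # => ["HELLO", "WORLD", "KIBA", "ETL"]
```

Transforms that never yield (like Upcase here) keep working unchanged, since Ruby silently ignores a block the method does not use.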
With this new “runner”, you can rewrite the previous code as 4 independent components.
First is the actual source, this time only responsible for directory iteration:
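```ruby
class DirectoryLister
  def initialize(dir_pattern:)
    @dir_pattern = dir_pattern
  end

  # Yield each matching filename as a row, in a stable order
  def each
    Dir[@dir_pattern].sort.each do |filename|
      yield(filename)
    end
  end
end
```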
Then a component responsible for XML reading:
```ruby
require 'nokogiri'

class XMLReader
  # Turn a filename into a parsed XML document
  def process(filename)
    Nokogiri::XML(IO.binread(filename))
  end
end
```
Another one able to select items in the XML document:
```ruby
class XMLSearcher
  def initialize(selector:)
    @selector = selector
  end

  # Return the set of nodes matching the selector
  def process(doc)
    doc.search(@selector)
  end
end
```
And finally, a component leveraging the new ability to yield an arbitrary number of rows from a transform:
```ruby
class EnumerableExploder
  def process(row)
    row.each do |item|
      yield(item)
    end
    nil # tell the pipeline to ignore the final value
  end
end
```
Wiring those components together gives:
```ruby
job = Kiba.parse do
  source DirectoryLister, dir_pattern: '*.xml'
  transform XMLReader
  transform XMLSearcher, selector: '/invoices/invoice'
  transform EnumerableExploder
  # SNIP
end
```
Each of these 4 components can now be mixed and matched with other components, in completely unrelated scenarios. For instance:
- The DirectoryLister could be used to list anything (JSON files, etc.).
- The EnumerableExploder, similarly, could be used for pretty much anything.
For more advanced use cases, you can even leverage the SourceTransformAdapter from kiba-common (Kiba’s sister project) to dynamically create sources in the middle of the pipeline, using each row as the configuration data for the source creation.
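The underlying idea can be sketched in plain Ruby. SourceSpawner and RangeSource below are hypothetical and do not reproduce kiba-common's actual SourceTransformAdapter code; in particular, the row shape used here (a source class followed by a hash of keyword arguments) is an assumption made for the illustration.

```ruby
# Hypothetical sketch: turn each incoming row into a brand-new source,
# then stream that source's rows downstream.
class SourceSpawner
  # Assumed row shape: [SourceClass, { keyword: arguments }]
  def process(row)
    source_class, source_args = row
    source_class.new(**source_args).each do |produced|
      yield(produced)
    end
    nil # the configuration row itself is not passed on
  end
end

# Hypothetical source, similar in shape to DirectoryLister
class RangeSource
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def each
    (@from..@to).each { |n| yield(n) }
  end
end

spawner = SourceSpawner.new
rows = []
spawner.process([RangeSource, { from: 1, to: 3 }]) { |r| rows << r }
spawner.process([RangeSource, { from: 10, to: 11 }]) { |r| rows << r }
rows # => [1, 2, 3, 10, 11]
```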
Ultimately, this change allows components to be more reusable, more independent, and also easier to test, which makes for a more manageable & scalable ETL codebase.
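As an illustration of the testing benefit, a component can be exercised directly, without building a pipeline. The run_transform helper below is hypothetical (plain Ruby, no test framework assumed): it captures both the rows a transform yields and the value it returns, which is what the streaming runner forwards downstream.

```ruby
# Hypothetical test helper: call a transform's process method and capture
# both the rows it yields and the value it returns
def run_transform(transform, row)
  yielded = []
  returned = transform.process(row) { |out| yielded << out }
  [yielded, returned]
end

# Same EnumerableExploder as above, repeated so this snippet runs standalone
class EnumerableExploder
  def process(row)
    row.each do |item|
      yield(item)
    end
    nil
  end
end

yielded, returned = run_transform(EnumerableExploder.new, [:a, :b, :c])
raise 'unexpected rows' unless yielded == [:a, :b, :c]
raise 'exploder should return nil' unless returned.nil?
```

The same two assertions translate directly into RSpec or minitest expectations.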
Situation in Kiba v2.5
In Kiba v2.5, the runner was improved to support an optional close method on transforms, which is called once all the source rows have been read, and which can also yield an arbitrary number of rows.
Combined with the v2 improvement, this means a transform can temporarily “buffer” a batch of rows and, once the buffer has reached the desired size, work on the group of rows as a whole before releasing them (possibly modified) to the next part of the pipeline. Visually again:
For instance, you can now write code such as this “aggregating transform”:
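```ruby
class AggregateTransform
  def initialize(aggregate_size:)
    @aggregate_size = aggregate_size
    @buffer = [] # initialized here so close works even if no row was seen
  end

  # Called for each row: emit the whole buffer once it is full
  def process(row)
    @buffer << row
    if @buffer.size == @aggregate_size
      yield @buffer
      @buffer = []
    end
    nil
  end

  # Called after the last row: flush the final, possibly incomplete, batch
  def close
    yield @buffer unless @buffer.empty?
  end
end
```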
The process(row) method is called for each row. Because close is guaranteed to be called (and can yield) after your last row is processed (unless an error is raised), you can safely work on groups of rows, and the last (potentially incomplete) batch of rows will still be processed as expected.
This pattern can be used to write grouping, sorting, or batching components.
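For example, sorting can be sketched with the same process/close pair. SortTransform below is a hypothetical component, not part of Kiba: it holds back every row in process, then yields them all in order from close. Since it buffers the entire dataset in memory, it only suits inputs that fit in RAM.

```ruby
# Hypothetical sorting transform: buffer every row, then emit them
# sorted once the source is exhausted
class SortTransform
  def initialize(by:)
    @by = by
    @rows = []
  end

  def process(row)
    @rows << row
    nil # hold the row back until close
  end

  def close
    @rows.sort_by { |row| row.fetch(@by) }.each { |row| yield(row) }
  end
end

# Mimic what a runner does: process each row, then call close
sorter = SortTransform.new(by: :amount)
output = []
[{ amount: 30 }, { amount: 10 }, { amount: 20 }].each do |row|
  sorter.process(row) { |out| output << out }
end
sorter.close { |out| output << out }
output.map { |row| row[:amount] } # => [10, 20, 30]
```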
Situation in Kiba v3
In Kiba v3, the “StreamingRunner” becomes the default rather than opt-in (we’ve been using it in production in a large number of cases), to encourage more component reuse and enable more powerful features.
Change #2 - Deprecation of the “kiba” command line interface
Kiba started (in v1) as a tool you would run using the bundle exec kiba my_script.etl command. Back then, the goal was to write cron jobs and one-off CLI commands, just like activewarehouse-etl (Kiba’s spiritual parent) did.
As more projects were implemented, we increasingly wanted to trigger jobs in response to events such as an HTTP request, ideally from a background job (Sidekiq, etc.), without having to shell out to an external process (which is slow and usually more complicated).
Kiba v2 made the “programmatic API” officially supported (previously, it was only considered an internal testing tool). You can use it from “live Ruby code” instead of the kiba command, like this:
```ruby
job = Kiba.parse do
  source ...
  transform ...
  transform ...
  destination ...
end

Kiba.run(job)
```
This change brings a lot of benefits:
- You can easily trigger Kiba jobs from wherever you need (Sidekiq jobs, IRB, or a Rake task)
- Integration testing of full pipelines is faster (no shelling out) via RSpec or minitest
- Testing of such pipelines can easily rely on in-process mocking (without resorting to complicated out-of-process mocking techniques)
- You can pass Ruby variables to the job (rather than using only ENV variables or on-disk config files) to drive the pipeline behaviour
- You can easily wrap the opening/closing of resources in a block (to ensure e.g. a connection is closed after your job)
- Also, code in general tends to be cleaner (no top-level namespace pollution)
Ultimately, in Kiba v3, using the Kiba.parse / Kiba.run API is the way to go. The previous CLI is deprecated (but you can easily reimplement it via Rake tasks).
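As a sketch of that Rake-based replacement, assuming rake and kiba are in your Gemfile, a task wrapping the invoice pipeline from earlier could look like the following. The task name, the lib/invoice_components.rb file and the DIR_PATTERN variable are all hypothetical.

```ruby
require 'rake' # not needed inside an actual Rakefile

# Hypothetical task replacing `bundle exec kiba my_script.etl`
desc 'Import invoices from XML files'
task :import_invoices do
  # Loaded lazily, so other tasks do not pay for it
  require 'kiba'
  require_relative 'lib/invoice_components' # hypothetical: DirectoryLister, XMLReader, etc.

  # A plain Ruby variable (here read from ENV) drives the job
  dir_pattern = ENV.fetch('DIR_PATTERN', 'data/*.xml')

  job = Kiba.parse do
    source DirectoryLister, dir_pattern: dir_pattern
    transform XMLReader
    transform XMLSearcher, selector: '/invoices/invoice'
    transform EnumerableExploder
    # destination left out, as in the earlier example
  end

  Kiba.run(job)
end
```

It would then run with rake import_invoices DIR_PATTERN='invoices/*.xml', since Rake copies NAME=value command-line arguments into ENV. The same job-building code can be called from a Sidekiq job or a test without going through Rake at all.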
To reflect those important changes, the documentation has been rewritten almost completely.
Please check it out (and report any trouble you run into while reading it!)
In the coming weeks, we will add an extra SQL transform (batch SQL lookup) to Kiba Pro, and will publish an official “launch” post.
We will also finalize support for Ruby 2.7 in Kiba OSS, and share more articles explaining how to use Ruby for data processing.
Thanks for reading, and subscribe to the newsletter below to get updates!