How to sort rows in Kiba ETL?

May 14, 2021

It is a common need to be able to sort rows in any ETL. Here are two patterns to achieve this with Kiba ETL.

Using a buffering transform

Kiba v2.5.0 introduced the notion of “aggregating / buffering transforms”. This concept is explained in detail in this blog post, with the following animated figure:

You can design a quick in-memory sort by storing the rows in an array, and leveraging the close method to stream the sorted rows.

class SortingTransform
  def initialize(config...)
    @rows = []
  end

  def process(row)
    @rows << row
    nil # do not emit rows right away
  end

  def close
    # Here: sort the rows, optionally using external
    # configuration passed at init time
    @rows.sort_by { ... }.each do |row|
      yield row
    end
  end
end

For situations where in-memory sort cannot be used, you can use the same pattern to move the data to some datastore as the rows are seen, and then using the datastore sorting ability. If you do this, make sure to properly isolate the data in some temporary table or set, to avoid mixing the data with other operations.

Using a multi-step job

Another solution is to write:

  • A first ETL job, responsible for generating the unsorted output (e.g. a bunch of JSON files).
  • A second ETL job, dedicated to sorting, taking the outcome of the previous one as its input.

The connection between the two jobs can be a simple file storage, an online S3 storage of some sort, or even in-memory stream (inside the same process).

The simple file storage works very well for every day use, but maybe I’ll blog about in-memory streaming later, so stay tuned!


Thank you for sharing this article!