How to explode multivalued attributes with Kiba ETL?

June 25, 2015

When processing data, it’s common to want to explode this:

po_number,buyers
00001,John:Mary:Sally

(where buyers is known as a multivalued attribute) into this:

po_number,buyer
00001,John
00001,Mary
00001,Sally

In this article you will see how to achieve this with Kiba ETL, ending up with a set of reusable ETL components.

A first way to normalize data from a source

In a first attempt, you could write a Kiba source that reads the data, splits the buyers field then yields one row per buyer, like this:

require 'csv'
require 'facets/kernel/deep_copy'

class NormalizingCsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each do |row|
      row = row.to_hash
      row.delete(:buyers).split(':').each do |buyer|
        yield(row.deep_copy.merge(buyer: buyer))
      end
    end
    @csv.close
  end
end

deep_copy (from the facets gem) ensures that the sub-rows do not share any value with one another. It relies on Marshal, so keep an eye on performance if you process large volumes of data.
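To illustrate why a deep copy can matter, here is a minimal sketch using only the standard library. The marshal_copy helper is a hypothetical name standing in for deep_copy, and the nested tags array is invented for the demonstration (the buyers CSV above only contains strings).

```ruby
# Hypothetical helper: a Marshal round-trip, the mechanism deep_copy relies on.
def marshal_copy(object)
  Marshal.load(Marshal.dump(object))
end

# The nested :tags array is an assumption made for this demonstration.
row = { po_number: '00001', tags: ['urgent'] }

shallow = row.merge(buyer: 'John')               # new hash, but same value objects
deep    = marshal_copy(row).merge(buyer: 'Mary') # fully independent copy

shallow[:tags] << 'late'                         # mutates the array shared with row

puts row[:tags].inspect   # ["urgent", "late"] (changed through the shared array)
puts deep[:tags].inspect  # ["urgent"]         (untouched)
```

If your rows only hold values that are never mutated downstream, a plain merge may be enough and avoids the Marshal overhead.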

You can use such a source this way:

source NormalizingCsvSource, 'file.csv'

This works, yet you now have a single class dealing with two concerns at once: CSV parsing and normalization. This makes it harder to reuse.

What if we could separate the parsing and the normalization completely?

Splitting the parsing and the normalization

A Kiba source is simply a class that responds to each and accepts arguments at instantiation time (see documentation). As such, you can create a RowNormalizer source that instantiates any source you provide and applies a given normalization logic to each of its rows, like this:

class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each do |row|
      @normalizer.process(row) do |subrow|
        yield subrow
      end
    end
  end
end

You can also transform your CSV source into a more general purpose one, like:

require 'csv'

class CsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each do |row|
      yield(row.to_hash)
    end
    @csv.close
  end
end

This new source has no knowledge whatsoever about the normalization process.

Now you can rewrite your ETL script this way:

require 'facets/kernel/deep_copy'

class MyNormalizationLogic
  def process(row)
    row.delete(:buyers).split(':').each do |buyer|
      yield(row.deep_copy.merge(buyer: buyer))
    end
  end
end

# your reusable normalizer
source RowNormalizer,
  # your script-specific normalization logic
  MyNormalizationLogic,
  # your reusable CSV (or something else) source
  CsvSource, 'input.csv'
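A side benefit of this split is that the normalization logic becomes a plain Ruby class you can exercise without Kiba or a CSV file. The sketch below is one way to do that, under a few assumptions: BuyersNormalizationLogic is a hypothetical stand-in for MyNormalizationLogic, and it uses a stdlib Marshal round-trip instead of deep_copy so that it runs without the facets gem.

```ruby
# Hypothetical stand-in for MyNormalizationLogic, using stdlib Marshal
# instead of facets' deep_copy so the sketch runs on its own.
class BuyersNormalizationLogic
  def process(row)
    # Note: like the original, this removes :buyers from the row it is given.
    row.delete(:buyers).split(':').each do |buyer|
      yield(Marshal.load(Marshal.dump(row)).merge(buyer: buyer))
    end
  end
end

logic = BuyersNormalizationLogic.new
input = { po_number: '00001', buyers: 'John:Mary:Sally' }

# enum_for turns the block-yielding method into an Enumerator,
# which makes the sub-rows easy to collect and inspect.
rows = logic.enum_for(:process, input).to_a

# Prints one row per buyer (three here).
rows.each { |row| p row }
```

The same approach works for any class exposing a process method that yields sub-rows, which keeps the script-specific part easy to cover with unit tests.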

Reusing your reusable ETL components

Now equipped with reusable components, you can recycle them to transform data for other scenarios, such as transforming this:

customer_name,sales_q1,sales_q2,sales_q3
Acme Corp,18440,22200,21769

into this:

customer_name,period,sales
Acme Corp,Q1,18440
Acme Corp,Q2,22200
Acme Corp,Q3,21769

At this point you just need the following code:

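require 'facets/kernel/deep_copy'

class SalesNormalizationLogic
  SALES_KEYS = %i(sales_q1 sales_q2 sales_q3)

  def process(row)
    # keep only the columns shared by every sub-row (here: customer_name)
    base = row.reject { |key, _| SALES_KEYS.include?(key) }
    SALES_KEYS.each do |key|
      period = key.to_s.split('_').last.upcase
      yield base.deep_copy.merge(period: period, sales: row[key])
    end
  end
end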

source RowNormalizer, SalesNormalizationLogic, CsvSource, 'file.csv'

Note that you can also switch the source at this stage, for instance to read the data from some database instead of CSV!
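As a rough sketch of what switching the source can look like, the example below swaps the CSV source for an in-memory one. EnumerableSource and QuarterlySalesLogic are hypothetical names introduced for illustration; the array of hashes stands in for whatever a database query would return, and RowNormalizer is repeated only so the snippet runs on its own. It also drives the source by hand rather than through a Kiba script.

```ruby
# Hypothetical source: wraps any object responding to each (an Array here;
# a database result set yielding hashes would fit the same shape).
class EnumerableSource
  def initialize(rows)
    @rows = rows
  end

  def each
    @rows.each { |row| yield(row) }
  end
end

# Repeated from the article so the sketch is self-contained.
class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each do |row|
      @normalizer.process(row) { |subrow| yield subrow }
    end
  end
end

# Hypothetical normalization logic: one sub-row per quarter.
class QuarterlySalesLogic
  KEYS = %i(sales_q1 sales_q2 sales_q3)

  def process(row)
    base = row.reject { |key, _| KEYS.include?(key) }
    KEYS.each do |key|
      yield base.merge(period: key.to_s.split('_').last.upcase, sales: row[key])
    end
  end
end

records = [
  { customer_name: 'Acme Corp', sales_q1: 18440, sales_q2: 22200, sales_q3: 21769 }
]

# Roughly what happens when a script declares:
#   source RowNormalizer, QuarterlySalesLogic, EnumerableSource, records
source = RowNormalizer.new(QuarterlySalesLogic, EnumerableSource, records)
source.each { |row| p row }
```

Only the last two arguments change compared to the CSV version; the normalizer and the normalization logic stay untouched.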

Credits

Many thanks to Lucas Di Cioccio, Lukas Fittl, Jordon Bedwell and Franck Verrot for the inspiration!