When processing data, it’s common to want to explode this:

```
po_number,buyers
00001,John:Mary:Sally
```

(where buyers is known as a multivalued attribute) into this:

```
po_number,buyer
00001,John
00001,Mary
00001,Sally
```
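Independently of Kiba, the transformation itself maps each input row to several output rows. Here is a minimal plain-Ruby sketch. It assumes rows are hashes with symbol keys and that values are joined with ':', which is the separator the code in this article splits on. The explode helper is hypothetical and only illustrates the idea:

```ruby
# Explode a multivalued attribute: one output row per value.
# Hypothetical helper; assumes symbol keys and ':'-separated values.
def explode(rows, field:, into:, separator: ':')
  rows.flat_map do |row|
    # every column except the multivalued one is copied to each output row
    rest = row.reject { |key, _| key == field }
    row.fetch(field).split(separator).map { |value| rest.merge(into => value) }
  end
end

rows = [{ po_number: '00001', buyers: 'John:Mary:Sally' }]
exploded = explode(rows, field: :buyers, into: :buyer)
exploded.each { |r| puts r.values.join(',') }
```

Note that Hash#merge only makes a shallow copy, so the output rows still share their string values. The next sections deal with that.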
In this article, you will see how to achieve this with Kiba ETL, ending up with a set of highly reusable ETL components.
A first way to normalize data from a source
As a first attempt, you could write a Kiba source that reads the data, splits the buyers field, then yields one row per buyer, like this:
```ruby
require 'csv'
require 'facets/kernel/deep_copy'

class NormalizingCsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each do |row|
      row = row.to_hash
      # remove the multivalued field, then emit one row per buyer
      row.delete(:buyers).split(':').each do |buyer|
        yield(row.deep_copy.merge(buyer: buyer))
      end
    end
    @csv.close
  end
end
```
deep_copy (from the facets gem) ensures that no sub-row shares any value with the others. It relies on Marshal, so keep an eye on performance if you process large volumes of data.
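To see why sharing matters, consider a later transform that mutates a value in place. This sketch uses only the standard library. The marshal_deep_copy helper is a hypothetical stand-in built on the Marshal round-trip mentioned above, not the facets implementation itself:

```ruby
# Hash#merge copies the hash itself, but its values (here, strings)
# are still the very same objects in every sub-row.
row = { po_number: +'00001' }
shallow_a = row.merge(buyer: 'John')
shallow_b = row.merge(buyer: 'Mary')
shallow_a[:po_number] << '-X' # mutates the string shared by both sub-rows
puts shallow_b[:po_number]     # the change leaks into the other sub-row

# Hypothetical helper: a Marshal round-trip builds fully independent objects.
def marshal_deep_copy(obj)
  Marshal.load(Marshal.dump(obj))
end

source_row = { po_number: +'00001' }
deep_a = marshal_deep_copy(source_row).merge(buyer: 'John')
deep_b = marshal_deep_copy(source_row).merge(buyer: 'Mary')
deep_a[:po_number] << '-X' # only affects deep_a's own copy
puts deep_b[:po_number]     # still "00001"
```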
You can use such a source this way:
```ruby
source NormalizingCsvSource, 'file.csv'
```
This works, but you now have a single class handling two concerns at once: CSV parsing and normalization. That makes it harder to reuse.
What if we could separate the parsing and the normalization completely?
Splitting the parsing and the normalization
A Kiba source is simply a class that responds to each and receives its parameters at instantiation time (see documentation). This means you can create a RowNormalizer source that instantiates any source you provide and applies a given normalization logic to each of its rows, like this:
```ruby
class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each do |row|
      @normalizer.process(row) do |subrow|
        yield subrow
      end
    end
  end
end
```
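Because RowNormalizer only relies on each and process, it can wrap any object that honours those two methods. The following sketch runs it outside Kiba. ArraySource and DuplicateNormalizer are hypothetical classes invented for this illustration:

```ruby
# RowNormalizer, same logic as above.
class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each do |row|
      @normalizer.process(row) { |subrow| yield subrow }
    end
  end
end

# Hypothetical in-memory source: anything responding to #each will do.
class ArraySource
  def initialize(rows)
    @rows = rows
  end

  def each(&block)
    @rows.each(&block)
  end
end

# Hypothetical normalizer: emits each row twice, tagged with a copy index.
class DuplicateNormalizer
  def process(row)
    2.times { |i| yield row.merge(copy: i) }
  end
end

rows = []
RowNormalizer.new(DuplicateNormalizer, ArraySource, [{ id: 1 }, { id: 2 }]).each { |r| rows << r }
p rows
```

A normalizer may also yield zero times for a given row, which turns the same mechanism into a filter.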
You can also turn your CSV source into a more general-purpose one:
```ruby
require 'csv'

class CsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each do |row|
      yield(row.to_hash)
    end
    @csv.close
  end
end
```
This new source has no knowledge whatsoever about the normalization process.
Now you can rewrite your ETL script this way:
```ruby
require 'facets/kernel/deep_copy'

class MyNormalizationLogic
  def process(row)
    row.delete(:buyers).split(':').each do |buyer|
      yield(row.deep_copy.merge(buyer: buyer))
    end
  end
end

# your reusable normalizer
source RowNormalizer,
  # your script-specific normalization logic
  MyNormalizationLogic,
  # your reusable CSV (or something else) source
  CsvSource, 'input.csv'
```
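To check the composition end to end without running Kiba, you can instantiate the outer source by hand and iterate over it, which is roughly what Kiba does with a source declaration. This sketch assumes a temporary CSV file. It also replaces facets' deep_copy with a hypothetical Marshal-based deep_copy helper so it runs with the standard library only:

```ruby
require 'csv'
require 'tempfile'

# Hypothetical stdlib stand-in for facets' deep_copy (Marshal round-trip).
def deep_copy(obj)
  Marshal.load(Marshal.dump(obj))
end

class CsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each { |row| yield(row.to_hash) }
    @csv.close
  end
end

class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each { |row| @normalizer.process(row) { |subrow| yield subrow } }
  end
end

class MyNormalizationLogic
  def process(row)
    row.delete(:buyers).split(':').each do |buyer|
      yield(deep_copy(row).merge(buyer: buyer))
    end
  end
end

file = Tempfile.new(['orders', '.csv'])
file.write("po_number,buyers\n00001,John:Mary:Sally\n")
file.close

# Instantiate the outermost source with its arguments, then iterate.
output = []
RowNormalizer.new(MyNormalizationLogic, CsvSource, file.path).each { |row| output << row }
output.each { |row| puts row.values.join(',') }
```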
Reusing your reusable ETL components
Now equipped with reusable components, you can recycle them to transform data for other scenarios, such as transforming this:
```
customer_name,sales_q1,sales_q2,sales_q3
Acme Corp,18440,22200,21769
```

into this:

```
customer_name,period,sales
Acme Corp,Q1,18440
Acme Corp,Q2,22200
Acme Corp,Q3,21769
```
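At this point, all you need is the following code. The sales_q1 to sales_q3 columns are dropped from each sub-row, so that only customer_name, period and sales remain, as in the target layout:

```ruby
require 'facets/kernel/deep_copy'

class SalesNormalizationLogic
  SALES_KEYS = %i(sales_q1 sales_q2 sales_q3)

  def process(row)
    # keep only the columns shared by all sub-rows (here, customer_name)
    base = row.reject { |key, _| SALES_KEYS.include?(key) }
    SALES_KEYS.each do |key|
      period = key.to_s.split('_').last.upcase # :sales_q1 => "Q1"
      yield base.deep_copy.merge(period: period, sales: row[key])
    end
  end
end

source RowNormalizer, SalesNormalizationLogic, CsvSource, 'file.csv'
```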
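Because the normalization logic is now a plain object, you can exercise it on a single hash without Kiba or a CSV file. This sketch uses a hypothetical Marshal-based deep_copy helper instead of facets. It also drops the sales_qN columns so that each sub-row only keeps customer_name, period and sales. Values are strings here because CSV parsing without converters yields strings:

```ruby
# Hypothetical stdlib stand-in for facets' deep_copy (Marshal round-trip).
def deep_copy(obj)
  Marshal.load(Marshal.dump(obj))
end

class SalesNormalizationLogic
  SALES_KEYS = %i(sales_q1 sales_q2 sales_q3)

  def process(row)
    # columns shared by every sub-row (everything except the sales_qN ones)
    base = row.reject { |key, _| SALES_KEYS.include?(key) }
    SALES_KEYS.each do |key|
      period = key.to_s.split('_').last.upcase # :sales_q1 => "Q1"
      yield deep_copy(base).merge(period: period, sales: row[key])
    end
  end
end

input = { customer_name: 'Acme Corp', sales_q1: '18440', sales_q2: '22200', sales_q3: '21769' }
rows = []
SalesNormalizationLogic.new.process(input) { |r| rows << r }
rows.each { |r| puts r.values.join(',') }
```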
Note that you can also switch the source at this stage, for instance to read the data from some database instead of CSV!
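As an illustration of swapping the source while keeping the normalization logic, here is a sketch with a hypothetical JsonLinesSource that reads one JSON object per line. It stands in for a database or any other non-CSV input and is not part of Kiba. Only RowNormalizer's source argument changes compared with the CSV version:

```ruby
require 'json'
require 'tempfile'

class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each { |row| @normalizer.process(row) { |subrow| yield subrow } }
  end
end

# Hypothetical source: one JSON object per line, keys symbolized like CsvSource.
class JsonLinesSource
  def initialize(input_file)
    @input_file = input_file
  end

  def each
    File.foreach(@input_file) do |line|
      next if line.strip.empty?
      yield JSON.parse(line, symbolize_names: true)
    end
  end
end

class SalesNormalizationLogic
  SALES_KEYS = %i(sales_q1 sales_q2 sales_q3)

  def process(row)
    base = row.reject { |key, _| SALES_KEYS.include?(key) }
    SALES_KEYS.each do |key|
      period = key.to_s.split('_').last.upcase
      # Marshal round-trip as a stdlib stand-in for facets' deep_copy
      yield Marshal.load(Marshal.dump(base)).merge(period: period, sales: row[key])
    end
  end
end

file = Tempfile.new(['sales', '.jsonl'])
file.write(%({"customer_name":"Acme Corp","sales_q1":18440,"sales_q2":22200,"sales_q3":21769}\n))
file.close

rows = []
RowNormalizer.new(SalesNormalizationLogic, JsonLinesSource, file.path).each { |r| rows << r }
rows.each { |r| puts r.values.join(',') }
```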