Update: check out kiba-common’s EnumerableExploder (available with Kiba v2) for a better way to achieve this today.
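Usage is roughly along these lines (a sketch based on the kiba-common README rather than on code from this article; double-check the README for the exact setup, which relies on Kiba v2’s StreamingRunner):

require 'kiba-common/transforms/enumerable_exploder'

# first turn each row into an enumerable of sub-rows...
transform do |row|
  row[:buyers].split(':').map { |buyer| { po_number: row[:po_number], buyer: buyer } }
end

# ...then let the exploder yield each element as its own row
transform Kiba::Common::Transforms::EnumerableExploder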
When processing data, it’s common to need to explode this:
po_number,buyers
00001,John:Mary:Sally
(where buyers is known as a multivalued attribute) into this:
po_number,buyer
00001,John
00001,Mary
00001,Sally
In this article you are going to see how to achieve this with Kiba ETL, ultimately ending up with some very reusable ETL components.
A first way to normalize data from a source
In a first attempt, you could write a Kiba source that reads the data, splits the buyers field, then yields one row per buyer, like this:
require 'csv'
require 'facets/kernel/deep_copy'

class NormalizingCsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each do |row|
      row = row.to_hash
      # pop the multivalued attribute, then yield one sub-row per buyer
      row.delete(:buyers).split(':').each do |buyer|
        yield(row.deep_copy.merge(buyer: buyer))
      end
    end
    @csv.close
  end
end
deep_copy (coming from the facets gem) is used to make sure each sub-row does not share any values with the others. It relies on Marshal, so keep an eye on performance if that is relevant in your case.
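To see why a shallow copy would not be enough when rows contain mutable values, here is a small made-up illustration of the idea behind deep_copy (a Marshal round-trip, not the actual facets implementation):

row = { po_number: '00001', tags: ['rush'] }

shallow = row.merge(buyer: 'John')                             # new hash, but :tags is still shared
deep    = Marshal.load(Marshal.dump(row)).merge(buyer: 'John') # fully independent copy

shallow[:tags] << 'export'
row[:tags]  # => ["rush", "export"]  (mutated through the shallow copy)
deep[:tags] # => ["rush"]            (unaffected)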
You can use such a source this way:
source NormalizingCsvSource, 'file.csv'
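For context, here is what a complete (if minimal) script around this source could look like, whether you paste the DSL part into an ETL file run with the kiba command or, as below, run it programmatically; the CsvDestination class is purely illustrative and simply writes each row back out as CSV:

require 'csv'
require 'kiba'

class CsvDestination
  def initialize(output_file)
    @csv = CSV.open(output_file, 'w')
    @headers_written = false
  end

  def write(row)
    unless @headers_written
      @csv << row.keys
      @headers_written = true
    end
    @csv << row.values
  end

  def close
    @csv.close
  end
end

job = Kiba.parse do
  source NormalizingCsvSource, 'file.csv'
  destination CsvDestination, 'normalized.csv'
end

Kiba.run(job)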
This works, yet you now have a single class dealing with two concerns at once (1/ CSV parsing, 2/ normalization), which makes it less likely to be reusable.
What if we could separate the parsing and the normalization completely?
Splitting the parsing and the normalization
A Kiba source is simply a class that responds to each and takes a couple of parameters at instantiation time (see the documentation). As such, you can create a RowNormalizer source that instantiates whatever source you provide and applies a given normalization logic to its rows, like this:
class RowNormalizer
  def initialize(normalizer_klass, source_klass, *args)
    @normalizer = normalizer_klass.new
    @source = source_klass.new(*args)
  end

  def each
    @source.each do |row|
      # let the normalizer turn one row into zero, one or more sub-rows
      @normalizer.process(row) do |subrow|
        yield subrow
      end
    end
  end
end
You can also transform your CSV source into a more general-purpose one, like:
require 'csv'

class CsvSource
  def initialize(input_file)
    @csv = CSV.open(input_file, headers: true, header_converters: :symbol)
  end

  def each
    @csv.each do |row|
      yield(row.to_hash)
    end
    @csv.close
  end
end
This new source has no knowledge whatsoever about the normalization process.
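As a side benefit, this general-purpose source can now be used on its own in pipelines that do not need any normalization at all:

source CsvSource, 'input.csv'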
Now you can rewrite your ETL script this way:
require 'facets/kernel/deep_copy'

class MyNormalizationLogic
  def process(row)
    row.delete(:buyers).split(':').each do |buyer|
      yield(row.deep_copy.merge(buyer: buyer))
    end
  end
end

# your reusable normalizer
source RowNormalizer,
  # your script-specific normalization logic
  MyNormalizationLogic,
  # your reusable CSV (or something else) source
  CsvSource, 'input.csv'
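Since RowNormalizer is a plain Ruby object, you can also exercise the composition outside of a full ETL run, for instance in an IRB session, to check it behaves as expected on the sample data from the beginning of the article:

rows = RowNormalizer.new(MyNormalizationLogic, CsvSource, 'input.csv')
rows.each { |row| p row }
# {:po_number=>"00001", :buyer=>"John"}
# {:po_number=>"00001", :buyer=>"Mary"}
# {:po_number=>"00001", :buyer=>"Sally"}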
Reusing your reusable ETL components
Now equipped with reusable components, you can recycle them to transform data for other scenarios, such as transforming this:
customer_name,sales_q1,sales_q2,sales_q3
Acme Corp,18440,22200,21769
into this:
customer_name,period,sales
Acme Corp,Q1,18440
Acme Corp,Q2,22200
Acme Corp,Q3,21769
At this point you just need the following code:
require 'facets/kernel/deep_copy'

class SalesNormalizationLogic
  SALES_KEYS = %i(sales_q1 sales_q2 sales_q3)

  def process(row)
    SALES_KEYS.each do |key|
      period = key.to_s.split('_').last.upcase
      subrow = row.deep_copy
      # drop the quarterly columns so only customer_name, period and sales remain
      SALES_KEYS.each { |k| subrow.delete(k) }
      yield subrow.merge(sales: row[key], period: period)
    end
  end
end
source RowNormalizer, SalesNormalizationLogic, CsvSource, 'file.csv'
Note that you can also switch the source at this stage, for instance to read the data from some database instead of CSV!
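For instance, a source reading its rows from a SQL database could look roughly like this; this is only a sketch using the Sequel gem, and the SQLSource class, connection string and query are purely illustrative:

require 'sequel'

class SQLSource
  def initialize(connection_string, query)
    @connection_string = connection_string
    @query = query
  end

  def each
    db = Sequel.connect(@connection_string)
    # Sequel yields each result row as a Hash with symbol keys,
    # which is exactly what the rest of the pipeline expects
    db[@query].each { |row| yield(row) }
  ensure
    db.disconnect if db
  end
end

source RowNormalizer, SalesNormalizationLogic,
  SQLSource, 'postgres://localhost/sales', 'SELECT * FROM quarterly_sales'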
Credits
Many thanks to Lucas Di Cioccio, Lukas Fittl, Jordon Bedwell and Franck Verrot for the inspiration!