Also published the same day: What's ahead for Kiba ETL? (announcing Kiba Pro and an upcoming ebook).
Today’s scenario: you’re working for a thriving US company that just bought a French company to expand into the EU market.
You are responsible for building an aggregated sales report, and you’d like to integrate the French company’s sales data, which comes in its own CSV flavor:
date_facture;montant_eur;numero_commande
7/3/2015;10,96;FA1986
7/3/2015;85,11;FA1987
8/3/2015;6,41;FA1988
8/3/2015;12,08;FA1989
9/3/2015;99,99;FA1990
As a US company, you would like to have this kind of format instead:
invoice_number,invoice_date,amount_eur
FA1986,2015-03-07,10.96
FA1987,2015-03-07,85.11
FA1988,2015-03-08,6.41
FA1989,2015-03-08,12.08
FA1990,2015-03-09,99.99
The key differences are:
- the columns are separated by a semi-colon (not a comma)
- the prices are using a comma for the decimal separator (instead of the dot)
- the column names are in French (rather than in English)
- the dates are formatted as 8/3/2015, i.e. day/month/year (we’d like ISO 8601 instead)
You’d usually have to transcode the data from ISO-8859-1 to UTF-8 too. I’ll leave that for a future post.
The aim of this post is to show you how to convert the data to your target format, using Kiba ETL.
There are plenty of other tools to manipulate CSV! I'm just using CSV as an example of how Kiba works as a general-purpose ETL tool.
How do plain Ruby and Kiba ETL compare?
A quick, one-off conversion script using plain Ruby could look like this:
require 'csv'
require 'date'
require 'facets/kernel/blank'

output_fields = [:invoice_number, :invoice_date, :amount_eur]

CSV.open('orders-classic.csv', 'w') do |output|
  output << output_fields
  CSV.foreach('commandes.csv', col_sep: ';', headers: true, header_converters: :symbol) do |row|
    row = row.to_hash
    [:date_facture, :montant_eur, :numero_commande].each do |field|
      raise "Missing value for field #{field}" if row[field].blank?
    end
    row[:invoice_date] = Date.strptime(row[:date_facture], '%d/%m/%Y')
    row[:amount_eur] = Float(row[:montant_eur].gsub(',', '.'))
    row[:invoice_number] = row[:numero_commande]
    output << row.values_at(*output_fields)
  end
end
As this kind of script grows though, it gets harder and harder to maintain. It’s easy to introduce an error in there! And the various parts are not easily reusable or testable.
To fix this, I’m going to take you step by step, incrementally, as we rewrite and refactor the ETL script with Kiba, to end up with this:
require_relative 'common'
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol
transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]
transform ParseFrenchDate, from: :date_facture, to: :invoice_date
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur
transform RenameField, from: :numero_commande, to: :invoice_number
destination CsvDestination, 'orders.csv', [:invoice_number, :invoice_date, :amount_eur]
Don't be fooled! This version requires a lot more (but reusable, testable) code, which will sit in common.rb.
Set up your Kiba ETL project
First create a Gemfile to specify our dependencies:
source 'https://rubygems.org'
gem 'kiba', '~> 0.6.0'
# for awesome row printing!
gem 'awesome_print'
Install the dependencies:
$ bundle install
Create our script:
$ echo "puts 'Hello from Kiba!'" > convert-csv.etl
Verify things work as expected:
$ bundle exec kiba convert-csv.etl
Hello from Kiba!
You’re good to go!
You can also use the .rb extension, since this is regular Ruby. I often change the extension to make sure the file won't get loaded by mistake inside a given Rails app, for instance.
Create a reusable CSV source
Kiba ETL focuses on creating reusable, maintainable ETL components. We’re going to do just that for the source of rows.
Create a repository of reusable bits:
$ touch common.rb
Edit it and add this simple CSV source declaration:
You can read more about the syntax used below in the Kiba documentation.
require 'csv'

class CsvSource
  def initialize(file, options)
    @file = file
    @options = options
  end

  def each
    # splat the options so this also works on Ruby 3+ keyword-argument rules
    CSV.foreach(@file, **@options) do |row|
      yield row.to_hash
    end
  end
end
If your project becomes much larger, you could put the CsvSource in its own file, with proper unit tests etc.
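As an illustration, here is a minimal sketch of what such a unit test could look like with minitest (my own example, not from the post; the fixture file name is hypothetical):
require 'minitest/autorun'
require_relative 'common'

class TestCsvSource < Minitest::Test
  def test_yields_rows_as_hashes
    # write a tiny semicolon-separated fixture, read it back through the source
    File.write('fixture.csv', "a;b\n1;2\n")
    source = CsvSource.new('fixture.csv', col_sep: ';', headers: true, header_converters: :symbol)
    rows = []
    source.each { |row| rows << row }
    assert_equal [{ a: '1', b: '2' }], rows
  ensure
    File.delete('fixture.csv')
  end
end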
Now modify your convert-csv.etl file to use exactly this instead:
require_relative 'common'
# read from a French-formatted CSV file
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol
The way we declare the source here makes it easier to quickly modify the script to use an alternate implementation (e.g. a JRuby-specific CSV reader). We could even declare the source conditionally based on what is currently running.
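For instance (a sketch of my own; JrubyCsvSource is a hypothetical class you would have to write yourself):
# pick a source implementation based on the running Ruby engine
if RUBY_ENGINE == 'jruby'
  source JrubyCsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol
else
  source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol
end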
Try out your brand new CSV source
Create a commandes.csv file with the data provided at the beginning of the post, then run Kiba:
$ bundle exec kiba convert-csv.etl
At this stage, nothing should show up. How do we know we have what we need?
Let’s use awesome_print right after the source declaration to debug our pipeline:
require 'awesome_print'

transform do |row|
  ap row
  row # always return the row to keep it in the pipeline
end
Run the ETL script again:
$ bundle exec kiba convert-csv.etl
You should get:
{
  :date_facture => "7/3/2015",
  :montant_eur => "10,96",
  :numero_commande => "FA1986"
}
# SNIP
{
  :date_facture => "9/3/2015",
  :montant_eur => "99,99",
  :numero_commande => "FA1990"
}
This means we’ve been able to parse our semicolon-separated data, yay!
Since using ap here seems handy, let’s make a reusable helper in common.rb:
# in common.rb
require 'awesome_print'

def show_me!
  transform do |row|
    ap row
    row # always return the row to keep it in the pipeline
  end
end
Now you can quickly enjoy calling show_me! whenever you want it.
Remember, we’ve been able to parse the semi-colon data, but we still have a number of differences in the format that we want to smooth out. Let’s tackle this!
Parse the numbers
The montant_eur field (amount in euros) uses the comma as the decimal separator.
Let’s fix this and also rename the field at the same time:
# bottom of convert-csv.etl
transform do |row|
  # adapt from French conventions here
  amount_eur = row[:montant_eur].gsub(',', '.')
  # validate the input: Float() raises if the value is not a valid float
  row[:amount_eur] = Float(amount_eur)
  # return the row to keep it in the pipeline
  row
end

# show the row at this step of the pipeline
show_me!
Make sure you understand floating-point precision (for monetary amounts, BigDecimal can be a safer choice) and validate the input depending on your needs.
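As an aside, here is why the transform above relies on Float() rather than String#to_f (a small illustration of my own, not from the original post):
puts Float('10.96')   # => 10.96
puts '10,96'.to_f     # => 10.0 (to_f silently stops at the comma, no error)
begin
  Float('10,96')      # Float() validates and raises instead
rescue ArgumentError => e
  puts e.message      # => invalid value for Float(): "10,96"
end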
Now verify that things work as expected:
$ bundle exec kiba convert-csv.etl
{
  # SNIP
  :montant_eur => "10,96",
  :amount_eur => 10.96
}
# SNIP
{
  # SNIP
  :montant_eur => "99,99",
  :amount_eur => 99.99
}
Looks good again! Now that we have this working, maybe we can extract another reusable component?
Add this to common.rb:
class ParseFrenchFloat
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    value = row[@from].gsub(',', '.')
    row[@to] = Float(value)
    row
  end
end
I'm using Ruby 2.1+ required keyword arguments here. This allows flexible yet fail-fast code.
You may want to make this transform more robust or configurable depending on your needs.
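For instance, following up on the BigDecimal note from earlier, a decimal-based variant could look like this (my own sketch; ParseFrenchDecimal is not part of the post, and you'd need the bigdecimal standard library):
require 'bigdecimal'

class ParseFrenchDecimal
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    # BigDecimal() also raises on malformed input, keeping the fail-fast behaviour,
    # while avoiding binary floating-point rounding on monetary amounts
    row[@to] = BigDecimal(row[@from].gsub(',', '.'))
    row
  end
end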
We can now update the ETL script:
# replace bottom of convert-csv.etl
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur
Much better!
Another way to tackle this is to separate the parsing and the renaming into two distinct steps. This is very common in ETL scripts. I'm keeping a single combined step here so we can see the old and new values at once, for learning purposes.
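For reference, the two-step version could look like this (a sketch of mine, reusing the RenameField transform we'll build later in this post):
# first rename the field, then parse it in place
transform RenameField, from: :montant_eur, to: :amount_eur

transform do |row|
  row[:amount_eur] = Float(row[:amount_eur].gsub(',', '.'))
  row
end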
Reformat your dates
Let’s parse our invoice date in convert-csv.etl now:
require 'date'

transform do |row|
  row[:invoice_date] = Date.strptime(row[:date_facture], '%d/%m/%Y')
  row
end

show_me!
Run kiba again and check the output:
{
  :date_facture => "7/3/2015",
  # SNIP
  :invoice_date => #<Date: 2015-03-07 ((2457089j,0s,0n),+0s,2299161j)>
}
# SNIP
Looks like things work as expected! We could keep the date as a Ruby Date and convert it later (if we need to apply more transforms), or just convert it back to a string:
row[:invoice_date] = Date.strptime(row[:date_facture], '%d/%m/%Y')
row[:invoice_date] = row[:invoice_date].to_s
Run again and we’ll have what we want:
{
  :date_facture => "7/3/2015",
  :invoice_date => "2015-03-07"
}
Let’s now extract a reusable component. Put this in common.rb:
require 'date'

class ParseFrenchDate
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    row[@to] = Date.strptime(row[@from], '%d/%m/%Y').to_s
    row
  end
end
Then update the ETL script:
transform ParseFrenchDate, from: :date_facture, to: :invoice_date
Run again, and you’ll see we’re good to move forward!
Rename the remaining column
"Rename the remaining". It's a bit difficult to pronounce. Say it loud 10 times.
The previous transforms already took care to assign a target name to the fields they were processing. We still have to rename one column though. Let’s do this:
transform do |row|
  row[:invoice_number] = row.delete(:numero_commande)
  row
end
Although that’s a bit overkill here (it may not be, if you have many fields to rename), we can refactor the code this way:
transform RenameField, from: :numero_commande, to: :invoice_number
Add this to common.rb:
class RenameField
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    row[@to] = row.delete(@from)
    row
  end
end
Protect your script from input changes
It’s not unusual for a data source to suddenly change format. It’s good practice to put guards in place to detect such changes and fail fast.
For instance, we could verify that the source columns are present and hold non-blank values, and raise an error otherwise.
Edit the Gemfile (we’re going to add a useful gem here):
gem "facets", require: false
Install the new gem:
$ bundle install
Then modify your ETL script:
source CsvSource, ...
# just right after the source
transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]
The check can be implemented this way:
require 'facets/kernel/blank'

class VerifyFieldsPresence
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end

  def process(row)
    @expected_fields.each do |field|
      if row[field].blank?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end
Voilà! If the French subsidiary starts sending a modified format, you’ll get an error.
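For instance, if the montant_eur column went missing one day, the run would abort with an error along these lines (an illustration of mine; the exact output and backtrace will vary):
$ bundle exec kiba convert-csv.etl
RuntimeError: Row lacks value for field montant_eur - {:date_facture=>"7/3/2015", :montant_eur=>nil, :numero_commande=>"FA1986"}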
You have a more robust ETL script now!
Prepare your CSV destination
Finally, it’s time to write our target CSV destination. Add this to common.rb:
class CsvDestination
  def initialize(file, output_fields)
    @csv = CSV.open(file, 'w')
    @output_fields = output_fields
    @csv << @output_fields
  end

  def write(row)
    verify_row!(row)
    @csv << row.values_at(*@output_fields)
  end

  def verify_row!(row)
    missing_fields = @output_fields - row.keys
    unless missing_fields.empty?
      raise "Row lacks required field(s) #{missing_fields}\n#{row}"
    end
  end

  def close
    @csv.close
  end
end
This destination writes the rows, flushes the file on close, and also makes sure the expected columns are present in each row.
Let’s use it now:
output_fields = [:invoice_number, :invoice_date, :amount_eur]
destination CsvDestination, 'orders.csv', output_fields
Run kiba on the script and you’ll get the expected CSV format!
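With the sample data from the beginning of the post, orders.csv should now contain:
invoice_number,invoice_date,amount_eur
FA1986,2015-03-07,10.96
FA1987,2015-03-07,85.11
FA1988,2015-03-08,6.41
FA1989,2015-03-08,12.08
FA1990,2015-03-09,99.99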
Where do you go from here?
Obviously, you had to write more lines of code compared to the original Ruby version!
But you also now have a bunch of reusable components, and your ETL scripts are easier to maintain and to evolve.
For instance, if you write a database destination, modifying the ETL script to target it will only involve a one-line change, without much risk to break anything.
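As a hedged illustration (my own sketch, not part of the post; SqlDestination, the sequel gem usage and the connection string are all assumptions on my side), such a destination could look like:
# Gemfile would need: gem 'sequel' (and e.g. gem 'sqlite3' for this connection string)
require 'sequel'

class SqlDestination
  def initialize(connection_string, table)
    @db = Sequel.connect(connection_string)
    @table = table
  end

  def write(row)
    # row is a Hash of column => value, which Sequel inserts directly
    # (you may want to whitelist fields, like CsvDestination does)
    @db[@table].insert(row)
  end

  def close
    @db.disconnect
  end
end
The one-line change in the ETL script would then be something like: destination SqlDestination, 'sqlite://orders.db', :orders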
I hope you have a better understanding of the process of writing and refactoring ETL jobs with Kiba now. Drop me a line or a comment otherwise!
Stay tuned for more ETL articles!