How to reformat CSV files with Kiba ETL?

June 04, 2015

Also published the same day: What's ahead for Kiba ETL? (announcing Kiba Pro and an upcoming ebook).

Today’s scenario: you’re working for a thriving US company that just bought a French company to expand into the EU market.

You are responsible for building aggregated sales reports, and you’d like to integrate the French company’s sales data, which comes in its own CSV flavor:

date_facture;montant_eur;numero_commande
7/3/2015;10,96;FA1986
7/3/2015;85,11;FA1987
8/3/2015;6,41;FA1988
8/3/2015;12,08;FA1989
9/3/2015;99,99;FA1990

As a US company, you would like to have this kind of format instead:

invoice_number,invoice_date,amount_eur
FA1986,2015-03-07,10.96
FA1987,2015-03-07,85.11
FA1988,2015-03-08,6.41
FA1989,2015-03-08,12.08
FA1990,2015-03-09,99.99

The key differences are:

  • the columns are separated by a semi-colon (not a comma)
  • the prices are using a comma for the decimal separator (instead of the dot)
  • the column names are in French (rather than in English)
  • the dates are formatted as 8/3/2015 (we’d like ISO instead)

You would usually have to transcode the data from ISO-8859-1 to UTF-8 too. I'll leave that for a future post.

The aim of this post is to show you how to convert the data to your target format, using Kiba ETL.

There are plenty of other tools to manipulate CSV! I'm just using CSV as an example of how Kiba works as a general-purpose ETL tool.

How do plain Ruby and Kiba ETL compare?

A quick, one-off conversion script using plain Ruby could look like this:

require 'csv'
require 'date'
require 'facets/kernel/blank'

output_fields = [:invoice_number, :invoice_date, :amount_eur]

CSV.open('orders-classic.csv', 'w') do |output|
  # write the header line first
  output << output_fields
  CSV.foreach('commandes.csv', col_sep: ';', headers: true, header_converters: :symbol) do |row|
    row = row.to_hash
    # fail fast if a source column is missing or blank
    [:date_facture, :montant_eur, :numero_commande].each do |field|
      raise "Missing value for field #{field}" if row[field].blank?
    end
    # 7/3/2015 means March 7th: the day comes first
    row[:invoice_date] = Date.strptime(row[:date_facture], '%d/%m/%Y')
    # swap the decimal comma for a dot before parsing
    row[:amount_eur] = Float(row[:montant_eur].gsub(',', '.'))
    row[:invoice_number] = row[:numero_commande]
    output << row.values_at(*output_fields)
  end
end

As this kind of script grows, though, it gets harder and harder to maintain. It’s easy to introduce an error in there! And the various parts are neither easily reusable nor easily testable.

To fix this, I’m going to take you through an incremental, step-by-step rewrite and refactoring of this ETL script with Kiba, so that we end up with this:

require_relative 'common'

source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol

transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]
transform ParseFrenchDate,      from: :date_facture, to: :invoice_date
transform ParseFrenchFloat,     from: :montant_eur, to: :amount_eur
transform RenameField,          from: :numero_commande, to: :invoice_number

destination CsvDestination, 'orders.csv', [:invoice_number, :invoice_date, :amount_eur]

Don't be fooled! This version requires a lot more code (albeit reusable and testable), which will sit in common.rb.

Set up your Kiba ETL project

First create a Gemfile to specify our dependencies:

source 'https://rubygems.org'

gem 'kiba', '~> 0.6.0'
# for awesome row printing!
gem 'awesome_print'

Install the dependencies:

$ bundle install

Create our script:

$ echo "puts 'Hello from Kiba!'" > convert-csv.etl

Verify things work as expected:

$ bundle exec kiba convert-csv.etl
Hello from Kiba!

You’re good to go!

You can also use the .rb extension, since this is regular Ruby. I often change the extension to make sure the file won't get loaded by mistake inside a given Rails app, for instance.

Create a reusable CSV source

Kiba ETL focuses on creating reusable, maintainable ETL components. We’re going to do just that for the source of rows.

Create a repository of reusable bits:

$ touch common.rb

Edit it and add this simple CSV source declaration:

You can read more about the syntax used below in the Kiba documentation.

require 'csv'

class CsvSource
  def initialize(file, options)
    @file = file
    @options = options
  end
  
  def each
    # ** passes the options hash as keyword arguments (needed on Ruby 3+)
    CSV.foreach(@file, **@options) do |row|
      yield row.to_hash
    end
  end
end

If your project becomes much larger, you could put the CsvSource in its own file, with proper unit tests etc.
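Since the source is a plain Ruby class, you can already try it outside of Kiba. Here is a minimal sketch of such a check, assuming a throwaway temporary file rather than the real commandes.csv; the class is repeated so that the snippet runs on its own:

```ruby
require 'csv'
require 'tempfile'

# Same shape as the CsvSource above, repeated so this snippet is standalone.
class CsvSource
  def initialize(file, options)
    @file = file
    @options = options
  end

  def each
    # ** passes the options hash as keyword arguments (needed on Ruby 3+)
    CSV.foreach(@file, **@options) do |row|
      yield row.to_hash
    end
  end
end

# Write a header and one sample line to a temporary file (illustrative data).
sample = Tempfile.new(['commandes', '.csv'])
sample.write("date_facture;montant_eur;numero_commande\n7/3/2015;10,96;FA1986\n")
sample.close

# Collect the rows the source yields, the way a pipeline would consume them.
rows = []
source = CsvSource.new(sample.path, col_sep: ';', headers: true, header_converters: :symbol)
source.each { |row| rows << row }
sample.unlink

p rows
```

Each yielded row is a regular Hash keyed by symbols, which is what the transforms in the rest of this post work with.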

Now modify your convert-csv.etl file to use this source instead:

require_relative 'common'

# read from a French-formatted CSV file
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol

The way we declare the source here makes it easier to quickly modify the script to use an alternate implementation (e.g. a JRuby CSV reader). We could even declare the source conditionally based on what is currently running.

Try out your brand new CSV source

Create a commandes.csv file with the data provided at the beginning of the post, then run Kiba:

$ bundle exec kiba convert-csv.etl

At this stage, nothing should show up. How do we know we have what we need?

Let’s use awesome_print right after the declaration to debug our pipeline:

require 'awesome_print'

transform do |row|
  ap row
  row # always return the row to keep it in the pipeline
end

Run the ETL script again:

$ bundle exec kiba convert-csv.etl

You should get:

{
       :date_facture => "7/3/2015",
        :montant_eur => "10,96",
    :numero_commande => "FA1986"
}
# SNIP
{
       :date_facture => "9/3/2015",
        :montant_eur => "99,99",
    :numero_commande => "FA1990"
}

This means we’ve been able to parse our data made of semi-colon separated values, yay!

Since using ap here seems handy, let’s make a reusable helper in common.rb:

# in common.rb
require 'awesome_print'

def show_me!
  transform do |row|
    ap row
    row # always return the row to keep it in the pipeline
  end
end

Now you can call show_me! whenever you need it.
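Why insist on returning the row? A block transform that returns nil removes the row from the pipeline. The sketch below illustrates that rule with a hypothetical mini-runner (run_pipeline is my own illustration, not Kiba's actual code) and two lambdas standing in for block transforms:

```ruby
# Hypothetical mini-runner, for illustration only (not Kiba's implementation).
def run_pipeline(rows, transforms)
  output = []
  rows.each do |row|
    transforms.each do |transform|
      row = transform.call(row)
      break if row.nil? # a nil result drops the row
    end
    output << row unless row.nil?
  end
  output
end

rows = [{ numero_commande: 'FA1986' }, { numero_commande: 'FA1987' }]

# puts returns nil, so this debugging step silently loses every row
forgetful = ->(row) { puts row.inspect }
# same debugging step, but the row is handed back to the pipeline
careful = ->(row) { puts row.inspect; row }

puts run_pipeline(rows, [forgetful]).size # 0
puts run_pipeline(rows, [careful]).size   # 2
```

This is easy to get wrong with debugging helpers, since puts and ap are usually the last call in the block.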

Remember, we’ve been able to parse the semi-colon data, but we still have a number of differences in the format that we want to smooth out. Let’s tackle this!

Parse the numbers

The montant_eur field (amount in euros) uses the comma as the decimal separator.

Let’s fix this and also rename the field at the same time:

# bottom of convert-csv.etl

transform do |row|
  # adapt from French culture here
  amount_eur = row[:montant_eur].gsub(',', '.')
  # sort of validate the input, raise if not a float
  row[:amount_eur] = Float(amount_eur)
  # return the row to keep it
  row
end

# show the row at this step of the pipeline
show_me!

Make sure you understand float precision, and validate the input according to your needs.
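To make the float precision caveat concrete, here is a small sketch comparing Float with BigDecimal from Ruby's standard library. Whether you need BigDecimal depends on what you do with the amounts afterwards (summing many rows, for instance); this post sticks with Float:

```ruby
require 'bigdecimal'

# Floats are binary approximations, so decimal sums can drift.
float_sum = 0.1 + 0.2
puts float_sum == 0.3 # false

# BigDecimal keeps the decimal digits exactly as written.
exact_sum = BigDecimal('0.1') + BigDecimal('0.2')
puts exact_sum == BigDecimal('0.3') # true

# Same comma-to-dot adaptation as above, parsed as an exact decimal.
amount = BigDecimal('10,96'.tr(',', '.'))
puts amount.to_s('F') # 10.96
```

Like Float(), BigDecimal() raises an ArgumentError on input it cannot parse, so the "sort of validate" behaviour is preserved.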

Now verify that things work as expected:

$ bundle exec kiba convert-csv.etl
{
# SNIP
        :montant_eur => "10,96",
         :amount_eur => 10.96
}
# SNIP
{
# SNIP
        :montant_eur => "99,99",
         :amount_eur => 99.99
}

Looks good again! Now that we have this working, maybe we can extract a reusable component again?

Add this to common.rb:

class ParseFrenchFloat
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    value = row[@from].gsub(',', '.')
    row[@to] = Float(value)
    row
  end
end

I'm using Ruby 2.1+ required keyword arguments here. This allows flexible yet fail-fast code.

You may want to make this transform more robust or configurable depending on your needs.
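As one possible direction, here is a sketch of a stricter variant. The class name StrictParseFrenchFloat and the error message are my own assumptions for illustration; the idea is simply to tolerate surrounding whitespace and to report which field and value failed:

```ruby
# Hypothetical stricter variant of ParseFrenchFloat (illustrative name).
class StrictParseFrenchFloat
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    raw = row.fetch(@from) # raises KeyError if the column is absent
    begin
      row[@to] = Float(raw.to_s.strip.tr(',', '.'))
    rescue ArgumentError
      # re-raise with the field name and offending value for easier debugging
      raise ArgumentError, "Invalid French float #{raw.inspect} in field #{@from}"
    end
    row
  end
end

transform = StrictParseFrenchFloat.new(from: :montant_eur, to: :amount_eur)
p transform.process({ montant_eur: ' 10,96 ' })
```

A value such as "1.234,56" (with a thousands separator) is rejected by this sketch rather than guessed at, which keeps the fail-fast spirit.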

We can now update the ETL script:

# replace bottom of convert-csv.etl
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur

Much better!

Another way to tackle this is to separate the parsing and the renaming into 2 separate steps. This is very common in ETL scripts. I'm keeping a common step here to make sure we see the old and new values at once, for learning purposes.

Reformat your dates

Let’s parse our invoicing date in convert-csv.etl now:

transform do |row|
  row[:invoice_date] = Date.strptime(row[:date_facture], '%d/%m/%Y')
  row
end

show_me!

Again run kiba and check the output:

{
       :date_facture => "7/3/2015",
# SNIP
       :invoice_date => #<Date: 2015-03-07 ((2457089j,0s,0n),+0s,2299161j)>
}
# SNIP

Looks like things work as expected! We could keep the date as a Ruby date and convert down the road (if we need to apply more transforms) or just convert it back to a string:

row[:invoice_date] = Date.strptime(row[:date_facture], '%d/%m/%Y')
row[:invoice_date] = row[:invoice_date].to_s

Run again and we’ll have what we want:

{
       :date_facture => "7/3/2015",
       :invoice_date => "2015-03-07"
}

Let’s now extract a reusable component. Put this in common.rb:

class ParseFrenchDate
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = Date.strptime(row[@from], '%d/%m/%Y').to_s
    row
  end
end

Then update the ETL script:

transform ParseFrenchDate, from: :date_facture, to: :invoice_date

Run again, and you’ll see we’re good to go forward!
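Because ParseFrenchDate is a plain Ruby class, you can also check it without running Kiba at all. A quick sketch, with the class repeated so the snippet is standalone, and a deliberately wrong US-style date as the hypothetical bad input:

```ruby
require 'date'

# Same class as above, repeated so this snippet runs on its own.
class ParseFrenchDate
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    row[@to] = Date.strptime(row[@from], '%d/%m/%Y').to_s
    row
  end
end

parser = ParseFrenchDate.new(from: :date_facture, to: :invoice_date)
row = parser.process({ date_facture: '7/3/2015' })
puts row[:invoice_date] # 2015-03-07

# A month-first date slipped in by mistake: there is no month 31.
begin
  parser.process({ date_facture: '3/31/2015' })
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Note that an ambiguous value like 3/7/2015 would still parse (as July 3rd), so this only catches dates that are impossible in the day-first format.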

Rename the remaining column

"Rename the remaining". It's a bit difficult to pronounce. Say it loud 10 times.

The previous transforms already took care of assigning a target name to the fields they were processing. We still have to rename one column though. Let’s do this:

transform do |row|
  row[:invoice_number] = row.delete(:numero_commande)
  row
end

Although that’s a bit overkill (but may not be, if you have many columns to process), we can refactor the code this way:

transform RenameField, from: :numero_commande, to: :invoice_number

Add this to common.rb:

class RenameField
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = row.delete(@from)
    row
  end
end
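If you do have many columns to rename, one option is a variant that takes a whole mapping at once. This is only a sketch: RenameFields and its mapping argument are hypothetical names of mine, not something that ships with Kiba.

```ruby
# Hypothetical multi-column variant of RenameField (illustrative name).
class RenameFields
  # mapping is a Hash of { old_name => new_name }
  def initialize(mapping)
    @mapping = mapping
  end

  def process(row)
    @mapping.each do |from, to|
      # skip columns that are absent instead of creating nil entries
      row[to] = row.delete(from) if row.key?(from)
    end
    row
  end
end

renamer = RenameFields.new({ numero_commande: :invoice_number, date_facture: :invoice_date })
p renamer.process({ numero_commande: 'FA1986', montant_eur: '10,96' })
```

Columns that are not in the mapping (montant_eur here) are left untouched.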

Protect your script from input changes

It’s not unusual for a data source to suddenly change format. It’s good practice to put guards in place to detect such changes and fail fast.

For instance, we could just verify the source columns are there and provide a non-blank value, and raise an error otherwise.

Edit the Gemfile (we’re going to add a useful gem here):

gem "facets", require: false

Install the new gem:

$ bundle install

Then modify your ETL script:

source CsvSource, ...
# just right after the source
transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]

The check can be implemented this way:

require 'facets/kernel/blank'

class VerifyFieldsPresence
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end
  
  def process(row)
    @expected_fields.each do |field|
      if row[field].blank?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end

Voilà! If the French subsidiary starts sending a modified format, you’ll get an error.

You have a more robust ETL script now!
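If you would rather not add a gem just for blank?, the same guard can be sketched in plain Ruby. The class name below is hypothetical, and the blank check is my own approximation (nil, empty, or whitespace-only), which may differ from what facets does for other types:

```ruby
# Hypothetical dependency-free variant of VerifyFieldsPresence.
class VerifyFieldsPresenceNoDeps
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end

  def process(row)
    @expected_fields.each do |field|
      value = row[field]
      # treat nil, empty and whitespace-only values as blank
      if value.nil? || value.to_s.strip.empty?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end

guard = VerifyFieldsPresenceNoDeps.new([:date_facture, :montant_eur])
p guard.process({ date_facture: '7/3/2015', montant_eur: '10,96' })

begin
  guard.process({ date_facture: '7/3/2015', montant_eur: '  ' })
rescue RuntimeError => e
  puts "rejected: #{e.message}"
end
```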

Prepare your CSV destination

Finally, it’s time to write our target CSV destination. Add this to common.rb:

class CsvDestination
  def initialize(file, output_fields)
    @csv = CSV.open(file, 'w')
    @output_fields = output_fields

    @csv << @output_fields
  end
  
  def write(row)
    verify_row!(row)
    @csv << row.values_at(*@output_fields)
  end

  def verify_row!(row)
    missing_fields = @output_fields - row.keys

    if missing_fields.size > 0
      raise "Row lacks required field(s) #{missing_fields}\n#{row}"
    end
  end

  def close
    @csv.close
  end
end

This destination writes the rows, flushes the file on close, and also makes sure that the expected columns are present in each row.
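Here is a quick way to see the destination at work outside of Kiba. It is a sketch that writes to a temporary directory (my assumption, to avoid touching your real files); the class is repeated in a slightly condensed form so the snippet is standalone:

```ruby
require 'csv'
require 'tmpdir'

# Condensed copy of the CsvDestination above, so this snippet runs on its own.
class CsvDestination
  def initialize(file, output_fields)
    @csv = CSV.open(file, 'w')
    @output_fields = output_fields
    @csv << @output_fields # header line
  end

  def write(row)
    verify_row!(row)
    @csv << row.values_at(*@output_fields)
  end

  def verify_row!(row)
    missing_fields = @output_fields - row.keys
    raise "Row lacks required field(s) #{missing_fields}\n#{row}" if missing_fields.any?
  end

  def close
    @csv.close
  end
end

written = Dir.mktmpdir do |dir|
  path = File.join(dir, 'orders.csv')
  destination = CsvDestination.new(path, [:invoice_number, :invoice_date, :amount_eur])
  # extra keys such as :date_facture are simply not written
  destination.write({ invoice_number: 'FA1986', invoice_date: '2015-03-07',
                      amount_eur: 10.96, date_facture: '7/3/2015' })
  destination.close
  File.read(path)
end

puts written
```

Only the fields listed in output_fields end up in the file, in that order, which is why the leftover French columns do not need to be deleted from the row.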

Let’s use it now:

output_fields = [:invoice_number, :invoice_date, :amount_eur]

destination CsvDestination, 'orders.csv', output_fields

Run kiba on the script and you’ll get the expected CSV format!

Where do you go from here?

Obviously, you had to write more lines of code compared to the original Ruby version!

But you also now have a bunch of reusable components, and your ETL scripts are easier to maintain and to evolve.

For instance, if you write a database destination, modifying the ETL script to target it will only involve a one-line change, without much risk of breaking anything.
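To give an idea of how small an alternate destination can be, here is a sketch that uses JSON lines as a stand-in for a database (so it only needs the standard library). JsonLinesDestination is a hypothetical name; it just follows the same write/close shape as CsvDestination:

```ruby
require 'json'
require 'stringio'

# Hypothetical destination writing one JSON object per line to any IO.
class JsonLinesDestination
  def initialize(io)
    @io = io
  end

  def write(row)
    @io.puts(JSON.generate(row))
  end

  def close
    @io.flush
  end
end

buffer = StringIO.new
destination = JsonLinesDestination.new(buffer)
destination.write({ invoice_number: 'FA1986', invoice_date: '2015-03-07', amount_eur: 10.96 })
destination.close

puts buffer.string
```

In the ETL script, switching would then presumably come down to replacing the destination line, with the source and transforms left as they are.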

I hope you have a better understanding of the process of writing and refactoring ETL jobs with Kiba now. Drop me a line or a comment otherwise!

Stay tuned for more ETL articles!

