
Kiba: ETL Done Right

By Glenn Goodrich


I am a “consultant” or a “contractor”, two of the many words beginning with “con” that I’ve been called. In essence, my clients have an itch they want to scratch with technology, and they hire my company to do the work.

I am not wild about the term “consultant”, but this is a great joke, if you haven’t heard it.

As a digital gun-for-hire, I am often involved in moving applications and systems from one platform to another, from ASP to Ruby or from SQL Server to PostgreSQL. In many of these cases, data that lives in the “old” system needs to be moved/upgraded/transformed into the “new” system. The canonical term is “data migration”, and it strikes fear in the hearts of those who have seen a project run months or years over schedule because of its complications. This was me the last time a Scope of Work (SOW) called for a data migration:

[Image]

Even seemingly simple migrations can be awful. Those are the kind I am going to look at in today’s post: simple migrations that pull data from one source, do a bit of transformation, and then store that data in a new destination. I need to migrate user information from one system to a new system that we are building. The very idea of the migration had my teeth chattering, until I found Kiba.

Kiba to the Rescue

Kiba is “a lightweight Ruby ETL framework”. ETL, for those who don’t know, stands for Extract, Transform, and Load. ETL is another way to say “data migration”, but it’s a bit closer to the front lines. ETL usually means there are known sources and known destinations. When you’re ready to talk ETL, you’re talking about how to pull (extract) data from the source(s), change (transform) it to match the requirements, then write (load) it into the new destination(s). That’s it. Three easy steps that can eat your project timeline the way I eat pancakes (I like pancakes).


Kiba aims to solve this problem, and I was very excited to find it. Having looked at various ETL solutions in my many, many years doing this work, I was skeptical at first. I am ecstatic to say that Kiba is simple and flexible.

The Kiba GitHub repository has a great overview of how Kiba works, covering much more detail than I will here. After you read this post, you can find more Kiba information there.

Kiba was created by Thibaut Barrere, former maintainer of the activewarehouse-etl gem.

Data Processing as a Pipeline

In an excellent blog post, Thibaut runs through how to write data-processing code. The takeaway is that it should be like a pipeline, where the source feeds data into one end, the data is processed by various transformations, and then comes out the other end of the pipeline into the destination.

[Image: the Kiba pipeline]

The image above should drive the point home. The source (MySQL) feeds the first pipe. You can string together as many transform pipes as you need to make up your pipeline. In the image, the middle pipe could add data coming in from above or filter rows out below. These pipes can add, remove, or change data, obviously, but they can also be conditional. Your transforms can react to the incoming data, doing what you need, when you need it.
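To make the pipe metaphor concrete, here is a plain-Ruby toy model of the idea. It is not Kiba's implementation, and run_pipeline is a hypothetical name: each transform receives a row and returns either a (possibly changed) row or nil, and a nil stops that row before it reaches the destination.

```ruby
# Hypothetical toy model of the pipeline idea: NOT Kiba's implementation.
#   rows        - anything that responds to #each (plays the source)
#   transforms  - callables taking a row and returning a row or nil
#   destination - an array collecting the rows that survive
def run_pipeline(rows, transforms, destination)
  rows.each do |row|
    transforms.each do |transform|
      row = transform.call(row)
      break if row.nil? # a nil result drops the row from the pipeline
    end
    destination << row unless row.nil?
  end
  destination
end

source = [
  { name: "ada", year: 1990 },
  { name: "bob", year: 2005 }
]
transforms = [
  ->(row) { row.merge(name: row[:name].capitalize) }, # change a field
  ->(row) { row[:year] < 2000 ? row : nil }           # conditional filter
]
result = run_pipeline(source, transforms, [])
puts result.size # prints 1 (only Ada's row, with its name capitalized, survives)
```

The order of the transforms array matters: each pipe only sees what the previous one let through.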

Furthermore, Kiba allows the creation of reusable transforms, meaning pipes you can take anywhere. These transforms can be tested on their own, bringing a fully mature development cycle to your ETL.

Besides sources, transforms, and destinations, Kiba offers some other goodies. An ETL project can include pre- and post-processors. A pre-processor runs before the sources are accessed, and a post-processor runs after all the rows have been written to the destination.

Here is an example ETL script from Kiba’s docs:

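# standard-library dependencies used below (Date.strptime, YAML.load)
require 'date'
require 'yaml'

# declare a ruby method here, for quick reusable logic
def parse_french_date(date)
  Date.strptime(date, '%d/%m/%Y')
end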

# or better, include a ruby file which loads reusable assets
# eg: commonly used sources / destinations / transforms, under unit-test
require_relative 'common'

# declare a pre-processor: a block called before the first row is read
pre_process do
  # do something
end

# declare a source where to take data from (you implement it - see notes below)
source MyCsvSource, 'input.csv'

# declare a row transform to process a given field
transform do |row|
  row[:birth_date] = parse_french_date(row[:birth_date])
  # return to keep in the pipeline
  row
end

# declare another row transform, dismissing rows conditionally by returning nil
transform do |row|
  row[:birth_date].year < 2000 ? row : nil
end

# declare a row transform as a class, which can be tested properly
transform ComplianceCheckTransform, eula: 2015

# before declaring a definition, maybe you'll want to retrieve credentials
config = YAML.load(IO.read('config.yml'))

# declare a destination - like source, you implement it (see below)
destination MyDatabaseDestination, config['my_database']

# declare a post-processor: a block called after all rows are successfully processed
post_process do
  # do something
end

That ETL project is very easy to follow and should give you a taste of how Kiba operates.

Done Extracting Setup, Time to Transform This Post and Load the Demo

At this point, you should understand enough of Kiba to follow along as I walk through my ETL project. As I mentioned previously, I need to migrate user accounts from one system to another. Here’s the breakdown:

Kiba Role     Fulfilled By
Source        MySQL
Transform     Ignore accounts with bad crypto
Transform     Get email, password, and salt
Destination   PostgreSQL

This is pretty simple. The ETL will pull users out of a MySQL table, filter them, then write them to PostgreSQL.

Source

A Kiba Source is a class that:

  • Has a constructor that sets up any needs specific to the source. For a CSV source, it could be the file name.
  • Implements each, yielding rows to the provided block.

This design makes sources easy to isolate and incredibly testable. In fact, they’re so simple that you may not need much testing.
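Because a source only needs a constructor and an each method, a throwaway in-memory source takes just a few lines. The sketch below is a hypothetical ArraySource (not part of Kiba) that could stand in for a real database while you test transforms; assuming Kiba's `source SomeClass, args` form from the docs example above, a script would declare it as `source ArraySource, rows`.

```ruby
# Hypothetical in-memory source following the source contract above:
# a constructor plus an #each that yields one row hash at a time.
class ArraySource
  def initialize(rows)
    @rows = rows
  end

  def each
    @rows.each do |row|
      # Yield a copy so downstream transforms cannot mutate the fixture data.
      yield(row.dup)
    end
  end
end

source = ArraySource.new([{ Email: "a@example.com" }, { Email: "b@example.com" }])
emails = []
source.each { |row| emails << row[:Email] }
puts emails.inspect # prints ["a@example.com", "b@example.com"]
```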

My source reads from a MySQL database:

lib/user_source.rb

require 'mysql2'
require 'uri'

class UserSource
  # connect_url should look like:
  #   mysql://user:password@localhost/dbname
  def initialize(connect_url)
    @mysql = Mysql2::Client.new(connect_hash(connect_url))
  end

  def each
    results = @mysql.query('select * from accounts', as: :hash, symbolize_keys: true)
    results.each do |row|
      yield(row)
    end
  end

  private

  def connect_hash(url)
    u = URI.parse(url)
    {
      host: u.host,
      username: u.user,
      password: u.password,
      port: u.port,
      database: u.path[1..-1]
    }
  end
end

I initialize the source with a MySQL URI. The each method makes a simple query to get the accounts, then yields each row to the block. Seriously, how easy is that? The worst part is parsing the URI.
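To see what that URI parsing produces, here is the same connect_hash logic as a standalone method, run against the sample URL from the comment. One tweak is my own assumption, not part of UserSource: URI.parse returns nil for the port when the URL omits it (Ruby's URI library has no default port for a mysql:// scheme), so the sketch falls back to MySQL's conventional port, 3306.

```ruby
require 'uri'

# Standalone copy of UserSource#connect_hash, with a hypothetical default port.
def connect_hash(url)
  u = URI.parse(url)
  {
    host: u.host,
    username: u.user,
    password: u.password,
    port: u.port || 3306,   # URI has no default port for mysql://
    database: u.path[1..-1] # drop the leading "/" from "/dbname"
  }
end

settings = connect_hash("mysql://user:password@localhost/dbname")
puts settings[:port]     # prints 3306
puts settings[:database] # prints dbname
```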

I kept the source above simple to make it (I hope) more teachable. You could make it much more generic (think MySQLSource) by passing the query into the constructor. Also, for larger loads, you might want to use MySQL streaming. The point is, it’s easy to do, and the design is up to you.
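Here is one way that more generic source might look: a hypothetical MySQLSource that takes a client and a query. It passes the mysql2 options `stream: true, cache_rows: false`, which, as I understand the mysql2 docs, fetch rows as you iterate instead of buffering the whole result set. Taking an already-built client rather than a URL is a choice made for this sketch: it keeps the class testable with a fake client, and a Kiba script could declare it as `source MySQLSource, Mysql2::Client.new(host: "localhost", username: "user", database: "dbname"), 'select * from accounts'`.

```ruby
# Hypothetical generic source. `client` is expected to behave like a
# Mysql2::Client; anything with a compatible #query works (e.g. a test fake).
class MySQLSource
  def initialize(client, query)
    @client = client
    @query = query
  end

  def each
    # stream: true + cache_rows: false asks mysql2 to stream rows from the
    # server rather than loading the entire result set into memory.
    results = @client.query(@query, stream: true, cache_rows: false,
                                    as: :hash, symbolize_keys: true)
    results.each { |row| yield(row) }
  end
end
```

Note that a streamed result has to be fully iterated before the same client runs another query, which fits Kiba's one-pass style.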

Destination

A Kiba Destination is a class that:

  • Has a constructor for the same reason that a source has one. Any initialization needs of the destination can be passed here.
  • Implements a write(row) method to write the row to the destination.
  • Implements a close method to clean up after itself.
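As with sources, a destination that only collects rows in memory is a handy stand-in for tests. This hypothetical ArrayDestination is a sketch of the write/close contract above, not a Kiba class. Passing in your own array (in a script, something like `destination ArrayDestination, out`) lets the test keep a reference to whatever was written.

```ruby
# Hypothetical in-memory destination following the contract above:
# a constructor, #write(row) for each row, and #close at the end.
class ArrayDestination
  attr_reader :rows

  def initialize(rows = [])
    @rows = rows
    @closed = false
  end

  def write(row)
    raise "destination already closed" if @closed
    @rows << row
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

dest = ArrayDestination.new
dest.write({ email: "a@example.com" })
dest.close
puts dest.rows.size # prints 1
```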

The destination in this project is a PostgreSQL database:

lib/user_destination.rb

require 'pg'

class UserDestination
  # connect_url should look like:
  #   postgres://user:pass@localhost/dbname
  def initialize(connect_url)
    @conn = PG.connect(connect_url)
    @conn.prepare('insert_user_stmt', 'insert into users (email, password_digest, created_at, updated_at) values ($1, $2, $3, $4)')
  end

  def write(row)
    time = Time.now
    # fall back to "now" when the row carries no original creation date
    @conn.exec_prepared('insert_user_stmt',
                        [ row[:email], row[:password], row[:date_created] || time, time ])
  rescue PG::Error => ex
    puts "ERROR for #{row[:email]}"
    puts ex.message
    # Maybe, write to db table or file
  end

  def close
    @conn.close 
    @conn = nil
  end
end

As you can see, when initializing the destination, I create a prepared statement. This is a PostgreSQL optimization, of sorts, and shows how the constructor can be used to do destination-specific setup.

The write method simply writes the row by executing the prepared statement. row is a hash with keys for the attributes we want. The destination table also has timestamp columns (created_at, updated_at), so I account for those.

It’s a good idea to have a rescue clause in write to capture errors and track what records died. Also, if you don’t catch errors, the migration will stop on the first error, which is often undesirable.
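One way to track which records died without repeating the rescue logic in every destination is a small wrapper. ErrorCapturingDestination below is a hypothetical sketch: it wraps any object with write and close, records each failing row together with its error message, and keeps going. Rescuing StandardError is a deliberate simplification; real code would probably narrow it, the way UserDestination rescues PG::Error.

```ruby
# Hypothetical wrapper: delegates to an inner destination and remembers
# rows whose write raised, instead of letting one bad row stop the run.
class ErrorCapturingDestination
  attr_reader :failures

  def initialize(inner)
    @inner = inner
    @failures = []
  end

  def write(row)
    @inner.write(row)
  rescue StandardError => e
    @failures << { row: row, error: e.message }
  end

  def close
    @inner.close
    # Report all failures once, at the end of the run (to stderr).
    @failures.each { |f| warn "FAILED #{f[:row].inspect}: #{f[:error]}" }
  end
end

# Hypothetical inner destination that rejects rows without an email.
class StrictDestination
  def write(row)
    raise ArgumentError, "missing email" if row[:email].nil?
  end

  def close; end
end

dest = ErrorCapturingDestination.new(StrictDestination.new)
dest.write({ email: "a@example.com" })
dest.write({ email: nil })
dest.close                # warns about the second row
puts dest.failures.size   # prints 1
```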

As I mentioned about UserSource, this class could be made more generic in many ways.

Now, it’s time to make our pipes.

Pipeline

This is an insultingly simple (but very real) example of ETL: the transform just takes attributes from the source, maps them to attributes of the destination, and returns the new row. yawwwwns

transform do |row|
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]
  newrow
end

Kiba offers many ways to implement transforms, and you can chain them. Chained transforms are executed in the order they appear in the script (which we’ll look at soon). For example, the above transform could be changed to this:

transform do |row|
  row[:crypt] == "bcrypt" ? row : nil
end

transform do |row|
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]
  newrow
end

In this case, we are removing source rows from the pipeline based on an attribute. Neat.
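The mapping itself could also be driven by data instead of code. RenameFields below is a hypothetical class (not part of Kiba) that performs the same copy as the hand-written block, using the process(row) method Kiba calls on class-based transforms. Like the block, it builds a fresh row containing only the mapped keys. In a script it might be declared as `transform RenameFields, { Email: :email, Password: :password }`.

```ruby
# Hypothetical data-driven version of the mapping transform above.
# mapping: { source_key => destination_key }
class RenameFields
  def initialize(mapping)
    @mapping = mapping
  end

  # Build a new row containing only the mapped keys.
  def process(row)
    @mapping.each_with_object({}) do |(from, to), new_row|
      new_row[to] = row[from]
    end
  end
end

step = RenameFields.new({ Email: :email, Password: :password })
new_row = step.process({ Email: "a@example.com", Password: "x", Crypt: "bcrypt" })
puts new_row.keys.inspect # prints [:email, :password]
```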

It’s also possible to create a Ruby class that performs the transformation by implementing a process(row) method, like so:

class FilterMD5
  def initialize(value_to_filter)
    @value = value_to_filter
  end

  # drop (return nil for) rows whose :crypt matches the filtered value
  def process(row)
    row[:crypt] == @value ? nil : row
  end
end

Then, the transform changes to:

transform FilterMD5, "md5"

transform do |row|
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]

  newrow
end

Again, FilterMD5 could be made more generic, perhaps accepting a block that would allow different filters. Consider that homework.
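One possible answer to that homework is sketched below. FilterRows is hypothetical, and instead of a block it takes the predicate as a lambda argument. That is a design choice: I am not sure Kiba forwards a block to a transform class's constructor, whereas extra arguments are passed through, as `transform FilterMD5, "md5"` shows. A script could then say `transform FilterRows, ->(row) { row[:crypt] != "md5" }`.

```ruby
# Hypothetical generic filter: keeps rows for which the predicate returns
# a truthy value and drops (returns nil for) the rest.
class FilterRows
  # predicate: any callable (e.g. a lambda) that takes a row
  def initialize(predicate)
    @predicate = predicate
  end

  def process(row)
    @predicate.call(row) ? row : nil
  end
end

drop_md5 = FilterRows.new(->(row) { row[:crypt] != "md5" })
puts drop_md5.process({ crypt: "bcrypt" }).inspect == { crypt: "bcrypt" }.inspect # prints true
puts drop_md5.process({ crypt: "md5" }).nil?                                     # prints true
```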

Putting a transform into a class makes writing specs/tests for that transform trivial. Kiba enables best practice. holds up high-five hand
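For example, here is a minimal Minitest sketch for FilterMD5. The file repeats the class, with the behavior its name suggests (md5 rows are dropped, everything else passes through), so it runs on its own; in a real project you would require it from lib instead. The test class and method names are mine. Run it with `ruby filter_md5_test.rb`.

```ruby
require 'minitest/autorun'

# Copy of the transform under test; normally loaded from lib/filter_md5.rb.
class FilterMD5
  def initialize(value_to_filter)
    @value = value_to_filter
  end

  # Drop rows whose :crypt matches the filtered value; keep the rest.
  def process(row)
    row[:crypt] == @value ? nil : row
  end
end

# Hypothetical test case for the transform.
class FilterMD5Test < Minitest::Test
  def setup
    @filter = FilterMD5.new("md5")
  end

  def test_drops_md5_rows
    assert_nil @filter.process({ crypt: "md5" })
  end

  def test_keeps_other_rows
    row = { crypt: "bcrypt", Email: "a@example.com" }
    assert_equal row, @filter.process(row)
  end
end
```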

Pre- and Post- Processors

The last pieces of Kiba I’d like to cover are pre- and post-processors. As you might imagine, these run before any records are retrieved from the source and after all records are written to the destination, respectively. Any overall setup or cleanup is handled by these processors. In my example, I use them to communicate progress:

start_time = Time.now
pre_process do
  puts "*** Start ACCOUNT MIGRATION #{start_time}***"
end

# ... source, transforms, destination ...

post_process do
  end_time = Time.now
  duration_in_minutes = (end_time - start_time)/60
  puts "*** End ACCOUNT MIGRATION #{end_time}***"
  puts "*** Duration (min): #{duration_in_minutes.round(2)}"
end

There. Now the person running the script, which is me, gets a nice experience. Way to go, me.

Obviously, my examples are trivial, but you can imagine use cases where real work goes into these blocks. Right?

The Full Monty

Here is my Kiba script:

migrate_users.rb

#!/usr/bin/env ruby

$:.unshift "#{File.dirname(__FILE__)}/lib"

require 'user_source'
require 'user_destination'
require 'filter_md5'

mysql_url = ENV["MYSQL_URL"]

source UserSource, mysql_url

start_time = Time.now
pre_process do
  puts "*** START ACCOUNT MIGRATION #{start_time}***"
end

transform FilterMD5, "md5"

transform do |row|
  puts "OK"
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]
  newrow[:salt] = row[:Salt]
  newrow[:old_id] = row[:AccountId]

  newrow
end

post_process do
  end_time = Time.now
  duration_in_minutes = (end_time - start_time)/60
  puts "*** End ACCOUNT MIGRATION #{end_time}***"
  puts "*** Duration (min): #{duration_in_minutes.round(2)}"
end

destination UserDestination, ENV["PG_URL"]

The source, destination, and transform files are found in a lib directory in my project.

Run It

Running a Kiba script consists of calling the kiba executable and passing your script name:

kiba migrate_users.rb

And off it goes!

*** START ACCOUNT MIGRATION 2015-06-25 12:53:10 -0400***
....SOME ERRORS...
*** End ACCOUNT MIGRATION 2015-06-25 12:54:11 -0400***
*** 11433 records moved in 1.02 minutes

LOL for ETL DSL

Kiba has given me Lots of Love for doing ETL in Ruby. It is a very well-designed, powerful DSL that, in my opinion, fills a void in the Ruby ecosystem. I encourage you to swing by the Kiba site and read more on the library. The docs are good, and there are blog posts related to ETL and Kiba. If you end up using it, let Thibaut know. I did, and he used me as a testimonial on the site. :)

goes into kitchen, makes pancakes


