Kiba: ETL Done Right
I am a “consultant” or a “contractor”, two of the various words beginning with “con” that I’ve been called. In essence, my clients have an itch they want to scratch with technology, and they hire my company to do the work.
I am not wild about the term “consultant”, but this is a great joke, if you haven’t heard it.
As a digital gun-for-hire, I am often involved in moving applications and systems from one platform to another. From ASP to Ruby or from SQL Server to PostgreSQL. In many of these cases, there is data that lives in the “old” system that needs to be moved/upgraded/transformed into the “new” system. The canonical term is “data migration”, and it strikes fear into the hearts of those who have had a project go months or years overdue because of its complications. This was me the last time a Scope of Work (SOW) called for a data migration:
Even seemingly simple migrations can be awful. Those are the kind I am going to look at in today’s post. Simple migrations, pulling data from one source, doing a bit of transformation, and then storing that data in a new destination. I have a need to migrate user information from one system to a new system that we are building. The very idea of the migration had my teeth chattering, until I found Kiba.
Kiba to the Rescue
Kiba is “a lightweight Ruby ETL framework”. ETL, for those that don’t know, stands for Extract, Transform, and Load. ETL is another way to say “data migration”, but it’s a bit closer to the front lines. ETL usually means there is a known source or sources, along with known destination(s). When you’re ready to talk ETL, you’re talking about how to pull (extract) data from the source(s), change (transform) it to match the requirements, then write (load) it into the new destination(s). That’s it. Three easy steps that can eat your project timeline like me eating pancakes (I like pancakes.)
Kiba aims to solve this issue, and I have to say I was very excited to find it. Having looked at various ETL solutions in my many, many years doing this work, I was skeptical. I am ecstatic to say, Kiba is simple and flexible.
The Kiba GitHub repository has a great overview of how Kiba works, covering much more detail than I will here. After you read this post, you can get more Kiba information there.
Kiba was created by Thibaut Barrere, former maintainer of the activewarehouse-etl gem.
Data Processing as a Pipeline
In an excellent blog post, Thibaut runs through how to write data-processing code. The takeaway is that it should be like a pipeline, where the source feeds data into one end, the data is processed by various transformations, and then comes out the other end of the pipeline into the destination.
The image above should drive the point home. The source (MySQL) feeds the first pipe. You can string together as many transform pipes as you need to make up your pipeline. Above, the middle pipe could add something from above or filter things out to below. These pipes can add/remove/change data, obviously, but they can also be conditional. Your transforms can react to the incoming data, doing what you need, when you need it.
Furthermore, Kiba allows the creation of reusable transforms, meaning pipes you can take anywhere. These transforms can be tested on their own, bringing a fully mature development cycle to your ETL.
Besides sources, transforms, and destinations, Kiba offers some other goodies. An ETL project can include pre- and post-processors. A pre-processor runs before the sources are accessed, and a post-processor runs after all the rows have been written to the destination.
Here is an example transform from Kiba’s docs:
# declare a ruby method here, for quick reusable logic
def parse_french_date(date)
  Date.strptime(date, '%d/%m/%Y')
end

# or better, include a ruby file which loads reusable assets
# eg: commonly used sources / destinations / transforms, under unit-test
require_relative 'common'

# declare a pre-processor: a block called before the first row is read
pre_process do
  # do something
end

# declare a source where to take data from (you implement it - see notes below)
source MyCsvSource, 'input.csv'

# declare a row transform to process a given field
transform do |row|
  row[:birth_date] = parse_french_date(row[:birth_date])
  # return to keep in the pipeline
  row
end

# declare another row transform, dismissing rows conditionally by returning nil
transform do |row|
  row[:birth_date].year < 2000 ? row : nil
end

# declare a row transform as a class, which can be tested properly
transform ComplianceCheckTransform, eula: 2015

# before declaring a definition, maybe you'll want to retrieve credentials
config = YAML.load(IO.read('config.yml'))

# declare a destination - like source, you implement it (see below)
destination MyDatabaseDestination, config['my_database']

# declare a post-processor: a block called after all rows are successfully processed
post_process do
  # do something
end
That ETL project is very easy to follow and should give you a taste of how Kiba operates.
Done Extracting Setup, Time to Transform This Post and Load the Demo
At this point, your understanding of Kiba should serve you well as I walk through my ETL project. As I mentioned previously, I have the need to migrate user accounts from one system to another. Here’s the breakdown:
Kiba Role | Fulfilled By
---|---
Source | MySQL
Transform | Ignore accounts with bad crypto
Transform | Get email, password, and salt
Destination | PostgreSQL
This is pretty simple. The ETL will pull users out of a MySQL table, filter them, then write them to PostgreSQL.
Source
A Kiba Source is a class that:
- Has a constructor that sets up any needs specific to the source. For a CSV source, it could be the file name.
- Implements `each`, yielding rows to the provided block.
This design makes sources easy to isolate and incredibly testable. In fact, they’re so simple that you may not need much testing.
My source is a MySQL database and looks like:
lib/user_source.rb
require 'mysql2'
require 'uri'

class UserSource
  # connect_url should look like:
  # mysql://user:password@localhost/dbname
  def initialize(connect_url)
    @mysql = Mysql2::Client.new(connect_hash(connect_url))
  end

  def each
    results = @mysql.query('select * from accounts', as: :hash, symbolize_keys: true)
    results.each do |row|
      yield(row)
    end
  end

  private

  def connect_hash(url)
    u = URI.parse(url)
    {
      host: u.host,
      username: u.user,
      password: u.password,
      port: u.port,
      database: u.path[1..-1]
    }
  end
end
I initialize the source with a MySQL URI. The `each` method makes a simple query to get the accounts, then yields each row to the block. Seriously, how easy is that? The worst part is parsing the URI.
The above source is kept simple to make it (I hope) more teachable. You could make it much more generic (think `MySQLSource`) by passing the query into the constructor. Also, for larger loads, you might want to use MySQL streaming. The point is, it’s easy to do and it’s up to you.
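Here is a rough sketch of what that more generic shape could look like. The `MySQLSource` name is mine, not part of this project, and `stream: true` is the mysql2 option that fetches rows lazily instead of loading the whole result set at once:

require 'mysql2'
require 'uri'

class MySQLSource
  # the query is passed in instead of being hard-coded
  def initialize(connect_url, query)
    @mysql = Mysql2::Client.new(connect_hash(connect_url))
    @query = query
  end

  def each
    # stream: true keeps memory use down on large tables
    @mysql.query(@query, as: :hash, symbolize_keys: true, stream: true).each do |row|
      yield(row)
    end
  end

  private

  def connect_hash(url)
    u = URI.parse(url)
    { host: u.host, username: u.user, password: u.password,
      port: u.port, database: u.path[1..-1] }
  end
end

# usage in a Kiba script:
# source MySQLSource, ENV["MYSQL_URL"], 'select * from accounts'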
Destination
A Kiba Destination is a class that:
- Has a constructor for the same reason that a source has one. Any initialization needs of the destination can be passed here.
- Implements a `write(row)` method to write the row to the destination.
- Implements a `close` method to clean up after itself.
The destination in this project is a PostgreSQL database:
lib/user_destination.rb
require 'pg'

class UserDestination
  # connect_url should look like:
  # postgres://user:pass@localhost/dbname
  def initialize(connect_url)
    @conn = PG.connect(connect_url)
    @conn.prepare('insert_user_stmt',
      'insert into users (email, password_digest, created_at, updated_at) values ($1, $2, $3, $4)')
  end

  def write(row)
    time = Time.now
    @conn.exec_prepared('insert_user_stmt',
      [ row[:email], row[:password], row[:date_created], time ])
  rescue PG::Error => ex
    puts "ERROR for #{row[:email]}"
    puts ex.message
    # Maybe, write to db table or file
  end

  def close
    @conn.close
    @conn = nil
  end
end
As you can see, when initializing the destination, I create a prepared statement. This is a PostgreSQL optimization, of sorts, and shows how the constructor can be used to do destination-specific setup.
The `write` method simply writes the row by executing the prepared statement. `row` is a hash with keys for the attributes we want. The destination table also has timestamp columns (`created_at`, `updated_at`), so I account for those.
It’s a good idea to have a `rescue` clause in `write` to capture errors and track what records died. Also, if you don’t catch errors, the migration will stop on the first error, which is often undesirable.
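As a sketch of that idea, the rescue in `write` could append failures to a file instead of just printing them. The errors.csv path and the captured fields here are my own assumptions, not part of the project:

require 'csv'

def write(row)
  time = Time.now
  @conn.exec_prepared('insert_user_stmt',
    [ row[:email], row[:password], row[:date_created], time ])
rescue PG::Error => ex
  # keep the migration moving, but record the failure for later review or replay
  CSV.open('errors.csv', 'a') do |csv|
    csv << [row[:email], ex.message]
  end
end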
As I mentioned about `UserSource`, this class could be made more generic in many ways.
Now, it’s time to make our pipes.
Pipeline
This is an insultingly simple (but very real) example of an ETL. The `transform` just takes attributes from the source and maps them to attributes of the destination, then returns the new row. yawwwwns
transform do |row|
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]
  newrow
end
Kiba offers many ways to implement transforms and you can chain them. Chained transforms will be executed in the order they exist in the script (which we’ll look at soon). For example, the above transform could be changed to this:
transform do |row|
  row[:crypt] == "bcrypt" ? row : nil
end

transform do |row|
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]
  newrow
end
In this case, we are removing source rows from the pipeline based on an attribute. Neat.
It’s also possible to create a Ruby class that performs the transformation. Like so:
class FilterMD5
  def initialize(value_to_filter)
    @value = value_to_filter
  end

  def process(row)
    # drop rows that use the filtered crypt scheme, keep everything else
    row[:crypt] == @value ? nil : row
  end
end
Then, the transform changes to:
transform FilterMD5, "md5"
transform do |row|
newrow = {}
newrow[:email] = row[:Email]
newrow[:password] = row[:Password]
newrow
end
Again, `FilterMD5` could be made more generic, perhaps accepting a block that would allow different filters. Consider that homework.
The ability to put a transform into a class makes writing specs/tests for that transform trivial. Kiba enables best practice. holds up high-five hand
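For instance, here is a quick sketch of what a spec for `FilterMD5` could look like, assuming RSpec and a spec file sitting next to the lib directory shown later (run it with the rspec command):

require_relative 'lib/filter_md5'

RSpec.describe FilterMD5 do
  let(:filter) { FilterMD5.new('md5') }

  it 'drops rows that use the filtered crypt scheme' do
    expect(filter.process({ crypt: 'md5' })).to be_nil
  end

  it 'keeps rows that use any other crypt scheme' do
    row = { crypt: 'bcrypt' }
    expect(filter.process(row)).to eq(row)
  end
end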
Pre- and Post-Processors
The last pieces of Kiba I’d like to cover are pre- and post-processors. As you might imagine, these run before any records are retrieved from the source and after all records are written to the destination, respectively. Any overall setup or cleanup is handled by these processors. In the example, I use them to communicate progress:
start_time = Time.now

pre_process do
  puts "*** Start ACCOUNT MIGRATION #{start_time}***"
end

... source, transforms, destination ...

post_process do
  end_time = Time.now
  duration_in_minutes = (end_time - start_time)/60
  puts "*** End ACCOUNT MIGRATION #{end_time}***"
  puts "*** Duration (min): #{duration_in_minutes.round(2)}"
end
There. Now I have a nice experience for the script runner, which is me. Way to go, me.
Obviously, my examples are trivial, but you can imagine use cases where real work goes into these blocks. Right?
The Full Monty
Here is my Kiba script:
migrate_users.rb
#!/usr/bin/env ruby

$:.unshift "#{File.dirname(__FILE__)}/lib"

require 'user_source'
require 'user_destination'
require 'filter_md5'

mysql_url = ENV["MYSQL_URL"]

source UserSource, mysql_url

start_time = Time.now

pre_process do
  puts "*** START ACCOUNT MIGRATION #{start_time}***"
end

transform FilterMD5, "md5"

transform do |row|
  puts "OK"
  newrow = {}
  newrow[:email] = row[:Email]
  newrow[:password] = row[:Password]
  newrow[:salt] = row[:Salt]
  newrow[:old_id] = row[:AccountId]
  newrow
end

post_process do
  end_time = Time.now
  duration_in_minutes = (end_time - start_time)/60
  puts "*** End ACCOUNT MIGRATION #{end_time}***"
  puts "*** Duration (min): #{duration_in_minutes.round(2)}"
end

destination UserDestination, ENV["PG_URL"]
The source, destination, and transform files are found in a lib directory in my project.
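Roughly, the layout looks like this (the exact file names for the lib pieces are implied by the requires above):

migrate_users.rb
lib/
  user_source.rb
  user_destination.rb
  filter_md5.rb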
Run It
Running a Kiba script consists of calling the `kiba` executable and passing your script name:
kiba migrate_users.rb
And off it goes!
*** START ACCOUNT MIGRATION 2015-06-25 12:53:10 -0400***
....SOME ERRORS...
*** End ACCOUNT MIGRATION 2015-06-25 12:54:11 -0400***
*** 11433 records moved in 1.02 minutes
LOL for ETL DSL
Kiba has given me Lots of Love for doing ETL in Ruby. It is a very well-designed, powerful DSL, filling a void in the Ruby ecosystem, in my opinion. I encourage you to swing by the Kiba site and read more on the library. The docs are good, and there are blog posts related to ETL and Kiba. If you end up using it, let Thibaut know. I did, and he used me as a testimonial on the site. :)
goes into kitchen, makes pancakes