We built a distributed ETL system that we affectionately call Mandoline. It is configurable, scalable, and easy to manage. Here’s how we did it.
One of the hardest parts of building a distributed system is coordinating the nodes across a cluster, and we decided to do it with Apache ZooKeeper. ZooKeeper can be imagined as a remote file system in which every file is also a folder; these entries are called “znodes”. For example, say we have the znode /mandoline, where we store the system version, "1". Under /mandoline we may also store other items, such as configuration: /mandoline/load_config stores our load configuration (in our case, a JSON dictionary for every source). The secret sauce of ZooKeeper is that it guarantees “strictly ordered access” to znodes, which allows synchronization and coordination to be built on top of it.
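To make the file-system analogy concrete, here is a toy, in-memory model of the namespace described above. It is only a sketch: the ToyNamespace and ZNode classes are hypothetical and are not the ZooKeeper or kazoo API, and the contents of load_config are made up. What it shows is that every path can hold a value (like the version "1" at /mandoline) and also be the parent of other paths.

```python
import json


class ZNode:
    """Toy stand-in for a znode: it holds data *and* can have children."""

    def __init__(self, data=b""):
        self.data = data
        self.children = {}


class ToyNamespace:
    """Hypothetical in-memory model of ZooKeeper's hierarchical namespace.

    Not the ZooKeeper/kazoo API: it only illustrates that every path can
    store a value and also act as the parent of other paths.
    """

    def __init__(self):
        self.root = ZNode()

    def _lookup(self, path):
        node = self.root
        for part in path.strip("/").split("/"):
            if part:
                node = node.children[part]  # KeyError if the path does not exist
        return node

    def create(self, path, data=b""):
        parent_path, _, name = path.rstrip("/").rpartition("/")
        parent = self._lookup(parent_path)  # as in ZooKeeper, the parent must exist
        if name in parent.children:
            raise ValueError(f"{path} already exists")
        parent.children[name] = ZNode(data)

    def get(self, path):
        return self._lookup(path).data

    def get_children(self, path):
        return sorted(self._lookup(path).children)


ns = ToyNamespace()
ns.create("/mandoline", b"1")  # the system version
# Hypothetical per-source configuration, stored as JSON bytes.
ns.create("/mandoline/load_config",
          json.dumps({"orders": {"table": "Orders"}}).encode())

print(ns.get("/mandoline"))           # b'1'
print(ns.get_children("/mandoline"))  # ['load_config']
print(json.loads(ns.get("/mandoline/load_config")))  # {'orders': {'table': 'Orders'}}
```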
Mandoline coordinates its tasks using watchers, a neat ZooKeeper feature. You specify a znode and whether you want to watch changes to its data or to its children, and ZooKeeper notifies you once such a change has happened. In the Mandoline architecture, processes choose specific znodes and use them to announce changes in the data they are responsible for.
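The sketch below models watches in memory so the semantics are easy to see. WatchableStore and its method names are hypothetical (loosely echoing ZooKeeper's "read with a watch" calls), and the event labels are invented. One detail it does reproduce: a standard ZooKeeper watch is a one-time trigger, so after it fires the client must set it again to keep listening. kazoo's DataWatch and ChildrenWatch recipes handle that re-registration for you.

```python
from collections import defaultdict


class WatchableStore:
    """Hypothetical in-memory sketch of ZooKeeper-style one-shot watches.

    Not the kazoo API: a data watch fires on the next change to a node's
    data, a child watch fires on the next change to its children, and each
    watch is removed once it has fired.
    """

    def __init__(self):
        self.data = {}
        self.data_watches = defaultdict(list)
        self.child_watches = defaultdict(list)

    def _parent(self, path):
        return path.rsplit("/", 1)[0] or "/"

    def _fire(self, watches, path, event):
        # One-shot semantics: detach the callbacks before invoking them.
        callbacks, watches[path] = watches[path], []
        for callback in callbacks:
            callback(event, path)

    def create(self, path, value):
        self.data[path] = value  # no existence checks in this sketch
        self._fire(self.child_watches, self._parent(path), "CHILD")

    def set(self, path, value):
        self.data[path] = value
        self._fire(self.data_watches, path, "CHANGED")

    def get(self, path, watch=None):
        if watch is not None:
            self.data_watches[path].append(watch)
        return self.data[path]

    def get_children(self, path, watch=None):
        if watch is not None:
            self.child_watches[path].append(watch)
        prefix = path.rstrip("/") + "/"
        return sorted(p[len(prefix):] for p in self.data
                      if p.startswith(prefix) and "/" not in p[len(prefix):])


events = []
store = WatchableStore()
store.create("/mandoline", b"1")
store.get("/mandoline", watch=lambda event, path: events.append((event, path)))
store.set("/mandoline", b"2")  # fires the data watch
store.set("/mandoline", b"3")  # the watch was consumed, so nothing fires
store.get_children("/mandoline", watch=lambda event, path: events.append((event, path)))
store.create("/mandoline/load_config", b"{}")  # fires the child watch
print(events)  # [('CHANGED', '/mandoline'), ('CHILD', '/mandoline')]
```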
For example, we have a component that loads orders from the Orders table in our database, and two other components that need to track (1) the purchase history of a given user and (2) the total sales for a given event. This is how the loading component notifies them:
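# query_data, primary_key, main_batch and MANDOLINE_TIME_CHECKPOINT
# are defined elsewhere in the loader.
latest_timestamp = 0
for datum in query_data:
    # Split each row into its key, its checkpoint timestamp and the payload.
    key = datum.pop(primary_key)
    timestamp = datum.pop(MANDOLINE_TIME_CHECKPOINT, 0)
    if timestamp > latest_timestamp:
        latest_timestamp = timestamp
    # The row itself is queued for Cassandra through the batch.
    main_batch.insert(key, datum)

# Only the newest timestamp is written to ZooKeeper, which notifies
# everyone watching this znode. kazoo expects znode data as bytes.
self.zk_client.retry(
    self.zk_client.set,
    self.load_notification_node,
    str(latest_timestamp).encode(),
)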
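To see what ends up where, the loop can be run against in-memory stand-ins. FakeBatch, FakeZkClient, load_and_notify, the updated_at checkpoint column and the notification path are all hypothetical; the loop body follows the snippet, with max() in place of the if statement.

```python
class FakeBatch:
    """Hypothetical stand-in for the Cassandra batch used as main_batch."""

    def __init__(self):
        self.rows = {}

    def insert(self, key, columns):
        self.rows[key] = columns


class FakeZkClient:
    """Hypothetical stand-in for a kazoo client; retry() simply calls through."""

    def __init__(self):
        self.nodes = {}

    def retry(self, func, *args, **kwargs):
        return func(*args, **kwargs)

    def set(self, path, value):
        self.nodes[path] = value


MANDOLINE_TIME_CHECKPOINT = "updated_at"  # assumed checkpoint column name


def load_and_notify(query_data, primary_key, main_batch, zk_client, notification_node):
    latest_timestamp = 0
    for datum in query_data:
        key = datum.pop(primary_key)
        timestamp = datum.pop(MANDOLINE_TIME_CHECKPOINT, 0)
        latest_timestamp = max(latest_timestamp, timestamp)
        main_batch.insert(key, datum)  # bulk data goes to the "Cassandra" batch
    # A single small write to the notification znode, as bytes.
    zk_client.retry(zk_client.set, notification_node, str(latest_timestamp).encode())
    return latest_timestamp


rows = [
    {"order_id": 1, "user_id": 7, "event_id": 42, "updated_at": 1000},
    {"order_id": 2, "user_id": 8, "event_id": 42, "updated_at": 1005},
]
batch, zk = FakeBatch(), FakeZkClient()
load_and_notify(rows, "order_id", batch, zk, "/mandoline/notifications/orders")
print(zk.nodes)       # {'/mandoline/notifications/orders': b'1005'}
print(batch.rows[2])  # {'user_id': 8, 'event_id': 42}
```

However many rows the query returns, ZooKeeper receives one short value per load.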
Notice that a single query can produce many writes, yet only a small value (here, a timestamp) is written to ZooKeeper. Znodes are not meant to hold large values (by default ZooKeeper caps a znode's data at roughly 1 MB), so the queue of items that actually need work is stored in Cassandra, while ZooKeeper handles only the notification.
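On the receiving side, a timestamp is enough for a consumer to find its work. The following is a minimal sketch, assuming each consumer remembers the last timestamp it processed and that the queue can be read by timestamp range; OrderQueue and EventSalesTracker are hypothetical names, and a plain list stands in for Cassandra.

```python
class OrderQueue:
    """Hypothetical stand-in for the Cassandra-backed queue of loaded orders."""

    def __init__(self):
        self.items = []  # (timestamp, order) pairs

    def append(self, timestamp, order):
        self.items.append((timestamp, order))

    def between(self, after, up_to):
        """Return orders with after < timestamp <= up_to, oldest first."""
        ordered = sorted(self.items, key=lambda item: item[0])
        return [order for ts, order in ordered if after < ts <= up_to]


class EventSalesTracker:
    """Hypothetical consumer that keeps running sales totals per event."""

    def __init__(self, queue):
        self.queue = queue
        self.checkpoint = 0  # last timestamp this consumer has processed
        self.totals = {}

    def on_notification(self, value):
        notified = int(value)  # the znode carries only a timestamp (as bytes)
        for order in self.queue.between(self.checkpoint, notified):
            event = order["event_id"]
            self.totals[event] = self.totals.get(event, 0) + order["amount"]
        self.checkpoint = max(self.checkpoint, notified)


queue = OrderQueue()
queue.append(1000, {"event_id": 42, "amount": 50})
queue.append(1005, {"event_id": 42, "amount": 25})
tracker = EventSalesTracker(queue)
tracker.on_notification(b"1005")
tracker.on_notification(b"1005")  # a repeated notification adds nothing new
print(tracker.totals)  # {42: 75}
```

A purchase-history component would keep its own, independent checkpoint in the same way. A production consumer would also need to handle rows that arrive late with a timestamp equal to its checkpoint, which this sketch would skip.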
Continue reading “Watching Metadata Changes in a Distributed Application Using ZooKeeper”