Watching Metadata Changes in a Distributed Application Using ZooKeeper

We created a distributed ETL system we affectionately call Mandoline. It is configurable, distributed, scalable, and easy to manage – here’s how we did it.

One of the hardest parts of building a distributed system is ensuring proper coordination between nodes across a cluster, and we decided to do it using Apache ZooKeeper. ZooKeeper can be imagined as a remote file system where every file is also a folder (these are referred to as “znodes”). For example, let’s say we have the znode /mandoline where we store the system version, "1". Under /mandoline we may also store items like the configuration, so /mandoline/load_config stores our load configuration (in our case, a JSON dictionary for every source). The magic sauce of ZooKeeper is that it guarantees “strictly ordered access” to znodes, allowing synchronization and coordination to be built on top of it.
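
To make that layout concrete, here is a minimal sketch using kazoo, a Python ZooKeeper client whose API matches the zk_client calls in the snippets below; the connection string and the config payload are placeholders:

from kazoo.client import KazooClient

# Connect to a (hypothetical) local ZooKeeper ensemble.
zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# Every znode is both a "file" and a "folder": it holds a value
# and can have children.
zk.ensure_path("/mandoline")
zk.set("/mandoline", b"1")  # the system version

# The load configuration lives in a child znode.
zk.create("/mandoline/load_config", b'{"orders": {}}')

data, stat = zk.get("/mandoline/load_config")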

Mandoline coordinates its tasks using watchers, a neat ZooKeeper feature. You specify a znode and whether you’d like to watch changes to its data or to its children, and ZooKeeper notifies you when those changes happen. In the Mandoline architecture, processes choose specific znodes and use them to communicate changes in the data they are responsible for.
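
Both flavors are easy to set up with kazoo’s decorator-style watchers. A minimal sketch, reusing the zk client from above (the callbacks and paths are hypothetical):

@zk.DataWatch("/mandoline/load_config")
def on_config_change(data, stat):
    # Called once immediately, then again whenever the znode's data changes.
    if stat is not None:
        print("config is now %r (version %d)" % (data, stat.version))

@zk.ChildrenWatch("/mandoline")
def on_children_change(children):
    # Called whenever a child znode is created or deleted under /mandoline.
    print("children of /mandoline: %s" % children)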

For example, we have a component that loads orders from our Orders table in the database, and two other components that need to track: 1. the purchase history of a given user, and 2. the total sales for a given event. This is how the data-loading component does it:

# Walk the queried rows, remembering the newest source timestamp seen.
latest_timestamp = 0
for datum in query_data:
    key = datum.pop(primary_key)

    timestamp = datum.pop(MANDOLINE_TIME_CHECKPOINT, 0)
    if timestamp > latest_timestamp:
        latest_timestamp = timestamp

    # The row itself goes into a Cassandra batch mutation, not ZooKeeper.
    main_batch.insert(key, datum)

# Publish only the high-water mark; retry() re-runs the call if the
# ZooKeeper connection drops mid-operation.
self.zk_client.retry(
    self.zk_client.set,
    self.load_notification_node,
    str(latest_timestamp),
)

Notice that although many operations are performed for a given query, only a small value (a timestamp, in this case) is written to ZooKeeper. Znodes are restricted in that they cannot hold large values (the default limit is 1 MB), so the queue of items to actually perform work on is stored in Cassandra, while ZooKeeper handles the notification part.

In the data-consuming processes, watchers are placed on the load notification znodes, and data is processed any time there is an update:

def start(self):
    ...
    # load_notification is the upstream component's notification znode;
    # kazoo's DataWatch calls self.transform now and on every data change.
    self.zk_client.DataWatch(load_notification)(self.transform)

def transform(self, data, stat):
    # Stop watching if we are being deleted or the znode has been removed;
    # returning False unregisters a kazoo DataWatch.
    if self.delete_event.is_set() or (data, stat) == (None, None):
        self.delete_event.set()
        self.zk_client.retry(self.zk_client.delete, self.checkpoint_node)

        return False

    # Resume from our own checkpoint znode (Cassandra-based time; see below).
    last_checkin = int(self.zk_client.retry(
        self.zk_client.get,
        self.checkpoint_node,
    )[0])
    latest_update = last_checkin

    ...

    # Fetch the updated rows from Cassandra, along with their Cassandra
    # write timestamps (our single source of truth for time).
    rows = self.following_column_family.multiget(
        updated_keys,
        include_timestamp=True,
        buffer_size=self.buffer_size,
    ).iteritems()

    output = defaultdict(dict)
    for key, data in rows:
        update_time = get_update_time(data)
        if update_time > latest_update:
            latest_update = update_time
        for column_name, column in self.columns.iteritems():
            column_output = column.transform(key, data)
            for out_key, out_value in column_output.iteritems():
                output[out_key][column_name] = out_value

    # Write the transformed rows to Cassandra in a batch...
    for out_key, out_data in output.iteritems():
        batch_object.insert(out_key, out_data)

    # ...then notify our own downstream watchers with the new high-water mark.
    self.zk_client.retry(
        self.zk_client.set,
        self.load_notification_node,
        str(latest_update),
    )

Some things to note:

  1. Each process uses its own checkpoint, even though it is handed a timestamp as a load notification. This is because the input timestamps come from MySQL, whose clock may differ from Cassandra’s; our single source of truth for time is Cassandra (see the sketch after this list).
  2. The transformer has its own load notification node, in case any further downstream components need to look at the data. This way, processes are agnostic about how the data itself is generated, caring only about when it gets updated.
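
The checkpoint bookkeeping from note 1 happens in the elided portion of transform above, so the exact shape is an assumption; a hypothetical sketch of advancing it, using the same names:

# Hypothetical sketch: after a batch is processed, advance our private
# checkpoint znode. latest_update came from Cassandra write timestamps
# (include_timestamp=True), never from the MySQL rows.
if latest_update > last_checkin:
    self.zk_client.retry(
        self.zk_client.set,
        self.checkpoint_node,
        str(latest_update),
    )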

Using znodes in this manner allowed Mandoline to be structured like a distributed relay race: once a task is done, the process that finished it knows where to pass off the baton.

Another interesting side effect of coordinating via ZooKeeper is the ease of managing and monitoring a distributed system. Because coordination runs through a central place, processes can add watchers to znodes that specify commands to carry out, like changing the schema or repairing corrupt data. And since ZooKeeper records when each process last updated its znode, we have built several Ganglia charts and Nagios alerts on top of that data to track the health of the system.
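
For example, a staleness check along these lines can back a Nagios alert; a sketch, assuming kazoo (the path and the ten-minute threshold are made up):

import time

def seconds_since_update(zk, path):
    # ZnodeStat.mtime is the znode's last-modified time in milliseconds.
    _, stat = zk.get(path)
    return time.time() - stat.mtime / 1000.0

# Hypothetical check: the orders loader should notify at least every 10 minutes.
if seconds_since_update(zk, "/mandoline/orders/load_notification") > 600:
    raise RuntimeError("orders loader has not checked in for over 10 minutes")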