wiki · home


ZooKeeper

What is it?

ZooKeeper can be viewed as service for coordinating processes of distributed applications. When used, you could think of ZooKeeper as the coordination kernel of the system. This kernel enables new primitives and multiple forms of coordination that are adapted to the requirements of the applications using ZooKeeper.

Service overview

ZooKeeper provides the abstraction of a set of data nodes called znodes which are organized as a hierarchical name space.

                               ┌─────┐
                             / │     │
                               └─────┘
                                  │
                ┌─────────────────┴─────────────────┐
                ▼                                   ▼
             ┌─────┐                             ┌─────┐
       /app1 │     │                             │     │ /app2
             └─────┘                             └─────┘
                │                                   │
                │
    ┌───────────┼───────────┐                       │
    │           │           │
    │           │           │                       │
    ▼           ▼           ▼
 ┌─────┐     ┌─────┐     ┌─────┐                    │
 │     │     │     │     │     │
 └─────┘     └─────┘     └─────┘                    │
/app1/p_1   /app1/p_2   /app1/p_3

The znodes can be of two types:

Clients can also pass a sequential flag that will have a monotonically increasing counter appended to the created znode.

ZooKeeper also implements watches to allow clients to receive notifications when changes happen to a znode.

Data model

The data model of ZooKeeper is essentially a file system with a simplified API and only full data reads and writes, or a key/value table with hierarchical keys. Znodes can store information to be used as meta-data or configuration in a distributed computation. They also have associated meta-data with time stamps and version counters that allow clients to track changes to znodes and execute conditional updates based on the version of the znode.

Sessions

When clients connect to ZooKeeper they initiate a session. Sessions have an associated timeout. ZooKeeper considers a client faulty if it doesn’t receive anything from the client’s session for more than a timeout. The session ends if ZooKeeper considers the client faulty or if the client explicitly closes the session.

Sessions are useful because they allow clients to move transparently between servers inside the ZooKeeper cluster.

Client API

The list without the description is as follows:

ZooKeeper guarantees

ZooKeeper has two basic ordering guarantees:

Reads can return stale data for different clients. If a client wants to ensure that it is reading the most up-to-date data it can use the sync function that causes a server to apply all pending write requests before reading the data requested by the client.

Examples of primitives

ZooKeeper Implementation

ZooKeeper provides high availability by replicating the ZooKeeper data on each server that composes the service. It is assumed that servers fail by crashing, and such servers may later recover.

When receiving a request, the server prepares it for execution (request processor). If the request requires coordination among the servers (write requests), then they use an agreement protocol (an implementation of atomic broadcast), and finally commit changes to the ZooKeeper database fully replicated across all servers of the cluster. For read requests the server just read the state from its local database and return the information to the client.

The replicated database is an in-memory database containing the entire data tree. Each znode can store a maximum of 1MB of data by default which can be configured in special circumstances. Updates are logged to disk for recovery purposes and writes are forced to the disk before being applied to the in-memory database. A replay log of committed operations also exists and snapshots are generated to reduce the memory usage of the database.

Requests that need coordination are routed to the leader of the cluster. The other servers, the followers, receive message proposals with state changes from the leader and agree on the state changes.

Request Processor

Because the messaging layer is atomic, it is guaranteed that local replicas never diverge but at any time some servers might have applied more transactions than others. The transactions are idempotent. This comes from the fact that when the leader receives a request it calculates the state when the write request is applied and captures it into a transaction with the new state.

Atomic Broadcast

The leader executes the request and broadcasts the change using Zab1, an atomic broadcast protocol.

Zab will use a majority quorum to decide on a proposal which means Zab and ZooKeeper only work when a majority of servers are correct. A cluster of 2 * F + 1 servers tolerates F failures.

Zab guarantees that changes broadcast by a leader are delivered in the order they were sent and all changes made by previous leaders are delivered to an established leader before it broadcasts its own changes.

The leader chosen by Zab is also used as the ZooKeeper leader which means the leader that creates transactions is the same that proposes them.

Under normal operation Zab delivers all messages exactly once but it does store id’s of delivered messages so it might resend messages during recovery.

Replicated Database

As mentioned before, each replica stores a copy of the ZooKeeper state in memory. Periodic snapshots are used which only require redelivery of messages since the start of the snapshot. The snapshots are called fuzzy snapshots since the ZooKeeper state is not locked when creating the snapshot. A depth-first scan of the tree is done atomically reading each znode’s data and meta-data and writing them to disk.

Client-Server Interactions

Servers process writes in order and do not process other writes or reads concurrently. This ensures strict succession of notifications. Notifications are handled locally so only the server the client is connect to tracks and notifies the client.

Each read request is processed and tagged with a zxid that marks the last transaction seen by the server. The server checks the zxid the client has last seen and if the client has a more recent view than the server, the server does not reestablish the session with the client until the server has caught up. The client is guaranteed to find another server that has a recent view of the system.

Performance

The paper can be consulted for exact numbers and ratios but one thing to remember is that since reads do not use atomic broadcast the more servers are added the throughput for reads will increase.

Another thing to keep in mind is that when the number of servers go up, write requests take longer than read requests. The reason is that write requests use atomic broadcast and the servers have to also ensure that transactions are written to non-volatile storage before sending acks to the leader.

References

Notes


  1. A simple totally ordered broadcast protocol. http://diyhpl.us/~bryan/papers2/distributed/distributed-systems/zab.totally-ordered-broadcast-protocol.2008.pdf↩︎