Wormhole: Reliable Pub-Sub to Support Geo-replicated Internet Services

Wormhole is a pub-sub system developed by Facebook for use across its geo-replicated datacenters. It is used to replicate changes among several Facebook services, including TAO, Graph Search, and Memcache.

Wormhole publishers directly read the transaction logs of the datastores to learn about new writes. When a new write is identified, they map it to a Wormhole update that is delivered to subscribers. The update is always encoded as a set of key-value pairs, independent of how the data is represented in the datastore it came from.
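
As a rough illustration, a hypothetical mapping from a log entry to such a key-value update might look like the sketch below. The log-entry format and field names are assumptions for the example, not Wormhole's actual API.

    # Hypothetical sketch: turning a datastore transaction-log entry into a
    # Wormhole update. Field names are illustrative, not Wormhole's real API.

    def log_entry_to_update(entry):
        """Encode a log entry as a flat set of key-value pairs, independent
        of how the originating datastore represents the data."""
        return {
            "shard": str(entry["shard_id"]),      # every update belongs to one shard
            "key": entry["row_key"],              # which object changed
            "operation": entry["op"],             # e.g. "put" or "delete"
            "position": str(entry["log_offset"])  # position in the transaction log
        }

    update = log_entry_to_update(
        {"shard_id": 17, "row_key": "user:4211", "op": "put", "log_offset": 982134})
    print(update)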

Wormhole Architecture

The architecture can be represented as follows:

┌─────────Write──────────────┐
│                            │
│                            │
│   ┌──────────┐       ┌──────────┐
│   │   Web    │       │ Backend  │
│   │ Service  │       │(Producer)│
│   │(Producer)│       │          │                   Application 1
│   └──────────┘       └──────────┘                         ▲
│         │                                         ────────┴───────
│      Write                                         ┌────────────┐
│         │                                    ┌────▶│Subscriber 1│──┐   ┌──────────┐
│         ▼                                    │     └────────────┘  └──▶│Invalidate│
│   ┌──────────┐                               │     ┌────────────┐  ┌──▶│  Cache   │
│   │          │                               │ ┌──▶│Subscriber 2│──┘   └──────────┘
│   │ Database │                               │ │   └────────────┘
│   │       ┌──┴─┐            ┌──────────┐     │ │  ────────┬───────
│   │       │Txn │     ┌─────▶│Publisher │─────┤ │          ▼
│   └───────┤Log │     │      └──────────┘     │ │
│         │ │    │     │                       │ │          ▲
│         │ └────┘     │                       │ │  ────────┴───────
│         └───────Reader                       │ │   ┌────────────┐
│                                           ┌──┼─┼──▶│Subscriber 1│───┐
│                                           │  │ │   └────────────┘   │   ┌──────────┐
│  ┌──────────┐                             │  │ │   ┌────────────┐   └──▶│  Update  │
│  │          │               ┌──────────┐  │  └─┼──▶│Subscriber 2│──────▶│  Index   │
│  │ Database │       ┌──────▶│Publisher │──┘    │   └────────────┘   ┌──▶│          │
└─▶│       ┌──┴─┐     │       └──────────┘───────┘   ┌────────────┐   │   └──────────┘
   │       │Txn │     │                              │Subscriber 3│───┘
   └───────┤Log │     │                              └────────────┘
         │ │    │     │                             ────────┬───────
         │ └────┘     │                                     ▼
         └───────Reader                               Application 2

Data Model and System Architecture

A dataset is a collection of related data, for example, user-generated data in Facebook. It is partitioned into a number of shards, and each update is assigned to exactly one shard. A datastore stores the data for a collection of shards of the dataset. Each datastore runs a publisher that sends its updates to subscribers. Publishers are typically co-located on the database machines to have fast local access to the transaction logs.

An application links itself with the subscriber library and arranges for multiple instances of itself to run; these instances are the subscribers. The publishers find all interested applications and their subscribers via a configuration system stored in ZooKeeper. A publisher then divides the shards destined for an application among that application's subscribers, and uses the onShardNotice callback to notify each subscriber of the shards it is responsible for.
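
A minimal sketch of what the subscriber side could look like: only onShardNotice is named in the text; the class name and the onUpdate callback are illustrative assumptions standing in for the rest of the (unspecified) library interface.

    # Sketch of an application linking against the subscriber library.
    # onShardNotice comes from the text; Subscriber and onUpdate are
    # assumed names for illustration.

    class Subscriber:
        def onShardNotice(self, shard):
            """Called by the publisher to announce that this subscriber
            instance is now responsible for the given shard's flow."""
            print(f"now responsible for shard {shard}")

        def onUpdate(self, shard, update):
            """Called for every update of a shard owned by this subscriber;
            updates within one shard arrive in order."""
            print(f"shard {shard}: {update}")

    sub = Subscriber()
    sub.onShardNotice(17)
    sub.onUpdate(17, {"key": "user:4211", "operation": "put"})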

Shards belonging to the same producer might be processed by different subscribers, and shards belonging to different producers might be processed by the same subscriber.

All updates of a single shard are sent to the same subscriber and they are delivered in order.

The stream of updates for a shard is called a flow. For each flow, the publisher periodically records a datamarker once the subscriber acknowledges that it has received the updates up to that marker. A datamarker is essentially a pointer to the position in the log of the last update of the flow that the subscriber has received and acknowledged.
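
A hedged sketch of this per-flow bookkeeping on the publisher side; the names, and keying a flow by (application, shard), are assumptions for the example.

    # Illustrative datamarker bookkeeping per flow.

    class FlowState:
        def __init__(self):
            self.sent_position = 0  # log position of the last update sent
            self.datamarker = 0     # last position acknowledged by the subscriber

    datamarkers = {}  # (application, shard) -> FlowState

    def on_ack(app, shard, position):
        """Advance the flow's datamarker once the subscriber acknowledges
        receiving the updates up to `position`."""
        flow = datamarkers.setdefault((app, shard), FlowState())
        flow.datamarker = max(flow.datamarker, position)

    on_ack("app1", 17, 982134)
    print(datamarkers[("app1", 17)].datamarker)  # 982134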

Update Delivery

A publisher constructs a flow for each (application, shard) pair. When a new application is added to the configuration, it typically specifies the point in the past from which it wants to start receiving updates, and the publishers make sure it receives updates from that point onward.

To avoid high I/O load when multiple subscribers are recovering and need updates from different points in time, Wormhole clusters such flows by their read positions and creates one reader per cluster; a cluster of flows together with its reader is called a caravan. The single steady-state caravan, which has the farthest read position, is called the lead caravan.
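
The grouping could be sketched as follows, assuming a simple gap-based clustering of flow positions; the actual clustering policy and threshold are not specified on this page.

    # Rough sketch: group flows into caravans by log position.

    def group_into_caravans(flow_positions, gap=1000):
        """Sort flows by their datamarker position and start a new caravan
        whenever the gap to the previous flow exceeds a threshold. Each
        caravan gets one reader, positioned at its slowest flow."""
        caravans = []
        for flow, pos in sorted(flow_positions.items(), key=lambda kv: kv[1]):
            if caravans and pos - caravans[-1][-1][1] <= gap:
                caravans[-1].append((flow, pos))
            else:
                caravans.append([(flow, pos)])
        return caravans

    caravans = group_into_caravans({"f1": 100, "f2": 150, "f3": 9000, "f4": 9100})
    # -> [[('f1', 100), ('f2', 150)], [('f3', 9000), ('f4', 9100)]]
    # The caravan with the farthest read position acts as the lead caravan.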

Load balancing is achieved in two ways: first, when a publisher divides an application's shards among its subscribers, it spreads the resulting flows across them instead of concentrating them on a few instances; second, a flow can later be reassigned from one subscriber to another, for example when a subscriber falls behind or when subscribers join or leave.

Subscribers can also devise their own load-balancing strategy on top of these primitives.
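
A rough sketch of the first mechanism, assuming a simple round-robin assignment; the real policy (weights, subscriber capacities) is not detailed on this page.

    # Hedged sketch: spread an application's shards across its subscribers.

    def assign_shards(shards, subscribers):
        """Round-robin the shards over the live subscriber instances."""
        assignment = {s: [] for s in subscribers}
        for i, shard in enumerate(sorted(shards)):
            assignment[subscribers[i % len(subscribers)]].append(shard)
        return assignment

    print(assign_shards(range(6), ["sub1", "sub2"]))
    # {'sub1': [0, 2, 4], 'sub2': [1, 3, 5]}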

After a flow is associated with a caravan and a subscriber, its updates are sent over a TCP connection. To save on connection overhead, the publisher multiplexes all flows associated with a subscriber over the same connection.
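
As an illustration, tagging each length-prefixed frame with a flow id would let the subscriber library demultiplex the shared connection; this wire format is invented for the example, not Wormhole's actual protocol.

    # Invented framing for multiplexing many flows over one TCP connection.

    import json, struct

    def frame(flow_id, update):
        """Length-prefix a message tagged with its flow id."""
        payload = json.dumps({"flow": flow_id, "update": update}).encode()
        return struct.pack("!I", len(payload)) + payload

    msg = frame("app1/shard17", {"key": "user:4211", "operation": "put"})
    length = struct.unpack("!I", msg[:4])[0]
    print(json.loads(msg[4:4 + length])["flow"])  # app1/shard17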

Caravan Allocation

Caravans are periodically split and merged based on the datamarkers of their flows. A caravan can be split if its flows form more than one tight cluster of positions, and two caravans can be merged if they are "close" to each other, reading from almost the same position.

Flows can also be moved between caravans dynamically. For example, if a subscriber cannot keep up with the rate of incoming updates, the publisher may move the flow to a different caravan so that a different subscriber takes over processing its updates.
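
A minimal sketch of the two decisions, with invented thresholds; only the idea (split when the flows cluster apart, merge when readers are nearly co-located) comes from the text.

    # Illustrative split/merge heuristics over flow datamarker positions.

    def should_split(positions, gap=1000):
        """Split if some consecutive pair of sorted flow positions is far
        apart, i.e. the flows form more than one tight cluster."""
        ordered = sorted(positions)
        return any(b - a > gap for a, b in zip(ordered, ordered[1:]))

    def should_merge(pos_a, pos_b, closeness=100):
        """Merge two caravans whose readers are at almost the same position."""
        return abs(pos_a - pos_b) <= closeness

    print(should_split([100, 150, 9000, 9100]))  # True: two clusters
    print(should_merge(5000, 5040))              # True: nearly co-located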

Filtering Mechanism

Wormhole exposes a publisher-side filtering feature: the application tells the publishers which filters it needs, and the publishers send only the updates that pass those filters.
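
For illustration, a filter can be thought of as a predicate over an update's key-value pairs; the representation below is an assumption for the sketch.

    # Sketch of publisher-side filtering over key-value updates.

    def make_filter(required):
        """Build a filter that passes updates whose fields match `required`."""
        return lambda update: all(update.get(k) == v for k, v in required.items())

    only_deletes = make_filter({"operation": "delete"})
    print(only_deletes({"key": "user:4211", "operation": "delete"}))  # True
    print(only_deletes({"key": "user:4211", "operation": "put"}))     # False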

Reliable Delivery

Wormhole supports two modes for update delivery: single-copy reliable delivery (SCRD), and multiple-copy reliable delivery (MCRD).

Single-Copy Reliable Delivery (SCRD)

In this mode, Wormhole leverages the reliability of TCP: as long as a subscriber is responsive, TCP guarantees delivery. Wormhole does not require an acknowledgment of every update in order to advance the datamarker. Instead, for every flow, the publisher sends datamarkers interspersed with the updates, and the subscriber acknowledges a datamarker once it has processed all updates that precede it. The publisher stores acknowledged datamarkers on persistent storage; when a subscriber recovers from a failure, the publisher resumes sending updates from the stored datamarker.
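
A toy sketch of one flow's SCRD exchange; the message shapes and the marker frequency are assumptions for the example.

    # Illustrative SCRD exchange: datamarkers interspersed with updates.

    def publisher_stream(updates, marker_every=3):
        """Yield updates, inserting a datamarker after every few of them."""
        for i, u in enumerate(updates, start=1):
            yield ("update", u)
            if i % marker_every == 0:
                yield ("datamarker", i)

    acked = 0
    for kind, payload in publisher_stream(["u1", "u2", "u3", "u4", "u5", "u6"]):
        if kind == "update":
            pass  # apply the update
        else:
            acked = payload  # ack: everything before this marker is processed
    print(acked)  # 6 -> persisted by the publisher; recovery resumes from here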

Multiple-Copy Reliable Delivery (MCRD)

In this mode, an application can subscribe to more than one copy of a dataset at once, and when it does, Wormhole guarantees that its subscribers receive, at least once, all updates contained in any copy of the dataset. The advantage is that if one copy of the dataset becomes unavailable, a publisher running on another copy can take over and continue sending updates from where the previous publisher left off.

In this mode, the publishers store the datamarkers in ZooKeeper. Because the physical position of an update can vary from one datastore replica to another, Wormhole uses logical positions that identify the same update in every replica. Logical position mappers translate the logical positions in the datamarkers to datastore-specific physical positions.
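
A sketch of the mapper idea, with invented mapping data; the interface is an assumption, the point is only that one logical position resolves to different physical offsets on different replicas.

    # Illustrative logical position mapper for MCRD.

    class PositionMapper:
        def __init__(self, logical_to_physical):
            self.table = logical_to_physical  # this replica's mapping

        def to_physical(self, logical_pos):
            """Translate a logical datamarker position to this replica's
            physical log offset."""
            return self.table[logical_pos]

    # The same logical datamarker resolves to different physical offsets
    # on different replicas of the dataset.
    replica_a = PositionMapper({1001: 52_300, 1002: 52_410})
    replica_b = PositionMapper({1001: 90_115, 1002: 90_240})
    print(replica_a.to_physical(1002), replica_b.to_physical(1002))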
