GO’CIRCUIT
A project by Petar Maymounkov.

Sumr: Scalable low-latency persistent counter database

 

This is an advanced tutorial that is best read after the Hello, world! tutorial. It assumes the Go Circuit is installed. The application in this tutorial is a distributed persistent key-value store, built from the ground up. One might describe it as a “distributed Redis” that supports only counter values.

While this application does not have the polishings of a final product, it is performant, stable and quite useful. We have successfully experimented with using it for site analytics, whereby our deployment was exposed to the web-page impressions event stream at Tumblr.

While discussing Sumr, we go over some design patterns that we have converged to for organizing sharded services.

Quick start

Go to subdirectory tutorials/sumr of your circuit clone. Within it, find a sample app configuration file called app.config. This configuration is geared towards a quick and simple build and deploy on the local machine. The cross-build tool is configured to perform the build on the local machine, and all necessary repos (Go, circuit and app) are rsync-ed locally into the build jail. You must modify the hardcoded paths with ones applicable to your system.

You can then build and deploy the Sumr app. From subdirectory tutorials/sumr:

% CIR=app.config 4crossbuild
This will result in a binary distribution in local directory ~/.sumr/ship. Deploy your distribution to localhost:
% echo localhost | CIR=app.config 4deploy
This will install the deployment in ~/.sumr/deploy. Lauch the shard workers:
% CIR=app.config ~/.sumr/ship/sumr-spawn-shard -sumr shard.config -durable /tutorial/sumr
You will notice we need a shard configuration file. Its syntax is explained in the next section. The flag -durable specifies the name of a file in the durable file system where the service will record its configuration for clients to discover it. Finally, deploy the API workers:
% CIR=app.config ~/.sumr/ship/sumr-spawn-api -api api.config -durable /tutorial/sumr
The API workers also require a configuration, described below. The flag -durbale points to the same durable file that was created during sumr-spawn-shard.

Configuration

Two configuration files specify the shard and API workers independently.

Shards configuration. In JSON format, it describes the set of shard workers to be spawned.

{
	"Anchor": "/tutorial/sumr/shard",
	"Workers": [
		{
			"Host": "localhost",
			"DiskPath": "/tmp/sumr1",
			"Forget": 3600000000000
		},
		{
			"Host": "localhost",
			"DiskPath": "/tmp/sumr2",
			"Forget": 3600000000000
		}
	]
}

Field Anchor specifies the directory prefix for the anchors of the shard workers. Following, field Workers is a list of worker configs. Each worker config specifies a Host where the worker is to be executed, a DiskPath on that same host, where the database directory is to be located (created if not present), and a time interval Forget in nanoseconds, which ensures that counters that have not been updated for more than Forget nanoseconds will be evicted from memory.

API configuration. In JSON format, as well, describes the set of API workers:

{
	"Anchor": "/tutorial/sumr/api",
	"ReadOnly": false,
	"Workers": [
		{
			"Host": "localhost",
			"Port": 43000
		},
		{
			"Host": "localhost",
			"Port": 43001
		}
	]
}

Field Anchor is the directory prefix for the API worker anchors. Within this directory, workers will be registered under subdirectories named after the index into the configuration of the respective worker. If ReadOnly is true, only SUM operations will be accepted. Workers is a list of worker configuration. Each specifies a Host where the worker is to be spawned and an HTTP Port where the API interface is to be installed.

Using Sumr

Summer supports two operations ADD and SUM.

Operation ADD adds a persistent floating-point value to a counter—which is simply a persistent floating-point variable named by a key. The operation is explicitly timestamped by the caller. Keys are in the form of depth-1 JSON structs which can contain arbitrary string fields. Here is the CURL invocation sample

curl localhost:43000/add --data '{"t": 0, "k":{"p":"q","r":"s"}, "v":0.23} {"t": 10, "k":{"r":"s","a":"b"}, "v":0.11}'
Multiple operations can be requested within the body of a single request, separated by zero or more whitespace characters (including newline and TAB). Each request is a JSON object with three fields. Field "t" specifies an integral timestamp in nanoseconds since epoch. Field "k" specifies a JSON object to be used a counter key. Field "v" specifies an integral floating-point number to be added to the current value of the counter, or to create this counter with that value if one does not exist.

Operation SUM returns the sum of all values added to a given key. In other words, it returns the current value of that counter. The CURL sample:

curl localhost:43000/sum --data '{"k":{"p":"q","r":"s"}}'
The syntax here is identical to that for ADD except the fileds "t" and "v" are omitted.

Design overview

We build Sumr—a distributed low-latency persistent counter database. A “counter database” is a key-value store where the values are numbers and support an “addition” operation, as well as the generic “set” and “get” operations. One application of a counter database is in keeping site analytics over time. In this context, for example, the purpose of the database might be to fulfill two types of commands: “increase the impression count of page P for the hour 5pm–6pm on March 2, 2013” and “tell me how many impressions of page P occured during the hour 2am–3am on July 2, 2011.”

To meet this requirement, Sumr supports two commands. An addition command ADD takes three parameters: (i) a 64-bit timestamp, (ii) a 64-bit key and a (iii) 64-bit floating-point additive difference. If the key has not been seen before, a new key is created with a corresponding value of zero. The additive difference is added into the current value and the prior value of the key is returned. The user-supplied timestamp is remembered as the latest time when this key was updated. Keys that have not been updated for a pre-configured interval of time are evicted from the database in-memory structures and are not seen by subsequent queries (unless updated again). All ADD operations are persisted to disk.

A “summation” command SUM takes a 64-bit key parameter and returns its current value or zero if the key is not present in-memory.

A database of this kind is typically used by upstream front-end logic, from a language—like PHP—that is text-based, with poor concurrency and inefficient CPU utilization. To enable this use case, we also build an API proxy which translates richer REST HTTP API requests—incoming from external technologies—to database calls. The API supports batched ADD and SUM requests, which are parallelized transparently.

The ADD and SUM HTTP API calls differ from the naked database API calls in two ways. First, they are presented in the form of JSON objects. And, second, keys can be nearly-any JSON objects. This allows upstream logic to encode semantically meaningful information in the keys themselves, and in a format that is more naturally accessible to common interpreted languages, like JavaScript and PHP.

The presence of an HTTP API proxy resolves another problem too. It turns out that parsing JSON-formatted keys and converting them to 64-bit binary keys is more CPU-intensive than the database workload itself. By decoupling the API layer from the database, one is able to add as many API workers as are necessary to hit the right balance.

Illustration of the Hello, world! application logic.

Architecture

The worker-level anatomy of a Sumr circuit is illustrated above. We have two kinds of workers: shard and API.

Discovery and smart clients

The API workers are clients to the shard workers, naturally. The circuit provides a few different ways to engineer how the API workers “find” the shard workers. Which technique is best depands on the occasion. In the context of Sumr, we assume that the individual workers can suffer independent failures and that it is desirable ro repair individual failures separately while affecting the remaining circuit in a minimal way.

We therefore design Sumr so that:

Anchored discovery

When shards are spawned (with the command from package /sumr/cmd/sumr-spawn-shard), they are registered under descriptive anchor paths. We use the following format

/sumr/server/1234ab009cc
The prefix directory, /sumr/server in this example, is specified by the user so as to enable co-existence of multiple instances of the service. The prefix directory houses one or more subdirectories, each corresponding to one worker, and automatically named after the XOR-metric key, 1234ab009cc in our example, of the shard logic running on the worker.

Of course, the actual worker file is automatically generated within the latter directory by the circuit spawn mechanism. It would look something like this:

/sumr/server/1234ab009cc/R0de1473b3c192b4c

Upon spawning, an API worker would list the contents of the directory /sumr/server to discover all the shards living in the circuit at the moment, as well as their XOR-metric keys.

Discovering the anchor file of a worker amounts to obtaining a circuit.Addr address to the worker process. However, the client's end goal is to access the Server object living within a shard worker and make cross-calls to its Add and Sum methods. For this, the client needs to obtain a cross-interface to the Server object.

To accomplish this, we utilize the circuit's listen and dial mechanism, accessed via circuit.Listen and circuit.Dial. (This mechanism is described in detail in the programming guide under the name “cross-services”.) In particular, every shard worker listens on the service name "sumr" and when requests for this service are received from clients, the worker returns a cross-interface to the Server object.

This anchor-based technique is very clean and simple and it additionally provides visibility into the service to administrators, who could manually list the content of the shard directory with 4ls as well as use patterns like

% 4kill /sumr/...
To kill all workers pertaining to a deployment.

Durable discovery

An additional discovery mechanism is given, mostly for demonstrating a different approach. The durable file system is a global file system for small files, backed by Zookeeper, and its interface is found in circuit/use/durablefs. This file system has two distibguishing characteristics.

First, file contents is saved and retrieved in the form of unconstrained native Go values which can, in addition, also have permanent cross-interface sub-values. Second, directory listing semantics are similar to those of Zookeeper, in order to enable implementation of app synchronization logic.

The main idea of this mechanism is that during the launch of the shard workers, the launching sequence—implemented in package circuit/app/sumr/server and file spawn.go—saves shard XOR-metric keys and respective permanent cross-interfaces, for all shards, in a durable file. Consequently, clients can read this file and immediately access the shards, using the cross-interfaces.

Smart clients

Smart clients are the norm, not the exception, within the circuit. Each individual API worker is a client to the entire set of shard workers. Other circuit workers and application could be as well.

As we showed in our discussion of discovery, there a few steps of work involved in being a client. First, one needs to discover the set of shards and their respective XOR-metric keys. And second, for very database operations, the client needs to determine which shard to use and only after this it can proceed with a vanilla cross-call to the shard.

Furthermore, since we would like clients to parallelize batched operations, one also needs to take care of parallelization in a manner that is stable against varying load and even against DoS attacks.

Since this logic would be replicated by any entity that acts as a client to the Sumr database, it is customary to write a client library that exposes simple Go routines that do all the work.

For Sumr, the client implementation is found in package circuit/app/sumr/client. It performs preceisely the steps described above. Ensuring resilience against DoS attacks in a parallel setting is more challanging than the synchronous one, where simple backpressure suffices. A simple but effective method, based on concurrency caps at various locations, is demonstrated within the Sumr client.

In the client implementation, one will find a useful primitive that we find ourselves using over and over again. It is given by package circuit/kit/sched/limiter and it constitutes a lock that can be held by a maximum limited number of users at any given time. It is a natural abstraction for throttling various concurrent patterns appearing throughout the circuit code base.