Gone in 5 milliseconds: How we set up fast data delivery in a complex trading system using Tarantool

Tarantool
May 16, 2022

Hi everyone! This is Oleg Utkin from Tarantool’s storage architecture team. I want to tell you how, in the course of a project for the Moscow Exchange, we consolidated data flows from different trading systems and set up their delivery to clients. Along the way, we ran into a few pitfalls and drew some interesting conclusions, which I’ll share below.

A brief overview of our task

Last year, colleagues from the Moscow Exchange approached us with the task of separating trading cores. The goal is to conduct transactions on different trading cores according to certain criteria and for specific instruments. For example, some currency pairs can be processed on one trading core and others on another. Or you can allocate a separate core for a particular type of security and all transactions involving it.

What it’s for. Such major architectural changes obviously have to pay off. Why would a stock exchange need this? First of all, to scale writes in order to process more transactions per second.

It also allows you to minimize the risks. Imagine a very large company — like a major bank — going public. Since there are many different processes going on at the stock exchange, the listing of securities can somehow be affected by other processes. To minimize the risks of a deal not being pulled off, exchanges run these things on separate trading cores.

It’s important that customers don’t actually see the switch to separate cores. Traders and brokers already receive the data they need over existing protocols, so they shouldn’t notice any changes or added latency. It is also important that the system scales for both reads and writes without creating a bottleneck.

How trading at the exchange works

To understand exactly what task we faced, let’s consider how an exchange works, in very simple terms.

First, we will define what an exchange bid, or trade order, is. It is someone’s intent to buy or sell something under certain limiting conditions. The bid goes into the market depth (aka depth of market, DOM), and when it meets a matching bid there, we get a deal.

As a result, both the bid and the one it met change their status. Or, if the bid sits in the DOM for a long time without a deal, the initiator can withdraw it.

This is how the master of the trading core works. It’s a single-threaded application that takes up all the memory on the server and scales reads horizontally by means of replicas. Replicas are copies of the trading core that receive exactly the same bids in the same order. When all is well (and it almost always is), the replicas nearly keep up with the master. Sometimes they can even get ahead of it. They are periodically verified with checksums and serve as access servers. The replicas are what traders and brokers connect to.

Structure of the replica system

We were primarily engaged with trading orders and their delivery to the customers. Let’s identify some of the things that go along with them:

Global counters. The exchange has a counter in the trading core that increases monotonically with each new entry. It’s very similar to an auto-incrementing primary key. It is called rec, short for record number.

There is another counter, seq (sequence number), which is more interesting: it increments with each update that occurs on the trading core. That is, whenever any bid is updated, this counter is incremented by 1.

All orders have these two fields so that customers can get the most up-to-date data possible.

Delivery system. The customer holds a pair of rec and seq, so they know the number of the last update and the number of the last record they have seen on the exchange. The customer asks for everything more recent than that, whether changed or new. They receive the data, take the rec and seq pair from the latest order, and use it as a continuation token. Thus, customers receive data by constant polling, because they want it to stay up to date at all times.
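
The polling loop described above can be sketched in a few lines of Python. This is only an illustration: the `Order` type, `fetch_after`, and `poll_once` are hypothetical names, and the sketch assumes the cursor is compared as a `(seq, rec)` pair.

```python
from typing import NamedTuple

class Order(NamedTuple):
    seq: int       # change number assigned by the trading core
    rec: int       # record number assigned by the trading core
    payload: str

def fetch_after(orders, cursor, limit=100):
    """Hypothetical server side: orders whose (seq, rec) is greater than the cursor."""
    newer = sorted(o for o in orders if (o.seq, o.rec) > cursor)
    return newer[:limit]

def poll_once(orders, cursor):
    """One client polling step: fetch, then advance the cursor to the last order seen."""
    batch = fetch_after(orders, cursor)
    if batch:
        cursor = (batch[-1].seq, batch[-1].rec)
    return batch, cursor

# Current state on the server: order 2 was created, then order 1 was updated.
orders = [Order(2, 2, "order B"), Order(3, 1, "order A, updated")]
```

A client that starts from `(0, 0)` receives both orders and keeps the pair from the last one as its continuation token. Polling again with that token returns nothing until the trading core produces something newer.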

There are also read requests by secondary keys, since not all users want to see all the data. Some only need the data for a specific trading firm or a particular broker.

Requirements for the data delivery system and choice of solution

Knowing all the conditions, we are ready to set out the requirements for this black box. The system must be able to accept up to 200,000 transactions per second on a single node. At the same time, the delay between a transaction on the exchange and the client being informed about it should be 1 ms on average, and no more than 5 ms in 99% of cases.

It seems to resemble a queue: some events are happening, and customers are getting those events. But in fact it’s not quite like that. The exchange only provides data that has changed, while customers want to see the whole picture. Therefore, it is necessary to provide the current state. If the client is a few microseconds or a millisecond behind, more recent data might have already appeared. And the customer doesn’t want to see old data; they want the most recent version.

That’s why we need that kind of performance and latency. The customer should not know about the distribution; that is, we need to deliver the data without any omissions and guarantee its order.

In fact, we are dealing simultaneously with a database, a queue with indexes, and a cache with secondary indexes. It appears that you either have to write a whole system from scratch, or take a tool that already has all these properties: it can act as a database, as a queue with indexes, and as a cache, and it can guarantee latency.

We suggested that Tarantool might be suitable for this task, since it:

  • Is single-threaded and in-memory, like the core of the exchange.
  • Is written in C.
  • Has an efficient asynchronous protocol with multiplexing.
  • Is, in effect, a framework for building databases.
  • Already has a storage and indexing system, with primary and secondary keys.
  • Can store data on disk and restore it, and supports replication and sharding.

Everything seems to add up, but there are some details. From here on, whenever we discuss a technology, we will keep a running tally of how well it fits the task.

Working with indexes

We have two crucial fields: rec and seq. They are the record number and the change number. These fields allow customers to paginate over the data. With each insert we increment rec, and with each update we increment seq. When a customer requests new data, we can make sure they get the most recent versions of the order data in the right sequence and without any omissions. To understand how to implement this, let’s start with how composite indexes work.

Let’s say we have a composite index consisting of two fields: field1 and field2. Suppose we want to select a record that matches the keys 2 and 3. What does that take? We go through the values of the first field, looking for the first key, 2 in this case. From there we move on to look for the second key, 3. This takes logarithmic time.

Having found this point in the index, we can continue to iterate through the data as a linked list, since we are using B+ trees, in which all leaves are linked into a list. That is, once we have reached some place in the index, we can move to the next element in constant time.
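
As an illustration of that access pattern, here is a minimal Python sketch in which a sorted list of tuples stands in for the leaf level of the index. It is not a B+ tree, only a model of the same behavior: a logarithmic search for the starting position, followed by a sequential walk. The name `scan_from` and the sample data are made up for the example.

```python
import bisect

# A sorted list of (field1, field2) tuples stands in for the leaf level of the index.
index = [(1, 5), (2, 1), (2, 3), (2, 7), (3, 2), (4, 4)]

def scan_from(index, key):
    """Find the first entry >= key in O(log n), then walk forward one entry at a time."""
    pos = bisect.bisect_left(index, key)
    while pos < len(index):
        yield index[pos]
        pos += 1

found = list(scan_from(index, (2, 3)))
```

Here `found` starts at `(2, 3)` and continues with every later entry in index order, which is exactly what a client needs when it asks for everything after its cursor.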

What would it look like in our case? We have the rec and seq fields. We put seq first, since this field reflects the freshness of the data. Suppose there is an order in the index that we want to update. To do this, the seq value is incremented, and the order jumps further along the index. Then the rec value is incremented. Thus, we are sure that the order is at the very end of the index.

The situation with insertion is similar: the data that comes from the trading core will always have the maximum seq.

We just need to set the maximum rec and insert the data. This way we guarantee that the most recent data is at the end of the index. When a customer’s request comes in, it can iterate from the customer’s position and pick up the data they don’t have yet. Tarantool suits us very well here, since its indexes support exactly this kind of iteration.

Storage implementation and problems with read operations

We implemented the storage on Tarantool. Besides storing data, it also contains the logic for processing that data.

We had no access to the client’s actual test bench, so we couldn’t test on real data and their workload. So we needed to design some emulators for local debugging. These emulators allowed us to collect numerous metrics to draw some conclusions about performance and ways to optimize it.

We started with the simplest architecture possible. We had two procedures: ApplyEvent, which simply takes data and puts it into the storage, and GetOrders, which is responsible for giving data to the client.

It works as follows. The client comes with a key — rec and seq, indicating at what point in time the client has started to read the data. The next time they come in with this key, they can be sure that the data will be fresher than the last time.

The data that comes from the trading core are events: of an order being created, deleted, or updated. The client receives the data in its current state. That is, they receive the last update that was in the trading core.
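
To make the two procedures concrete, here is a rough Python model of such a storage. Only the names ApplyEvent and GetOrders come from the project; the data layout, the signatures, and the use of rec as the primary key are assumptions made for this sketch.

```python
import bisect

class OrderStorage:
    def __init__(self):
        self.by_rec = {}   # primary key: rec -> (seq, data)
        self.by_seq = []   # secondary index: sorted list of (seq, rec)

    def apply_event(self, rec, seq, data):
        """Store the latest state of an order and move it to its new index position."""
        old = self.by_rec.get(rec)
        if old is not None:
            # Drop the previous index entry so only the current state stays visible.
            old_pos = bisect.bisect_left(self.by_seq, (old[0], rec))
            del self.by_seq[old_pos]
        self.by_rec[rec] = (seq, data)
        bisect.insort(self.by_seq, (seq, rec))

    def get_orders(self, cursor, limit=100):
        """Return orders strictly after cursor=(seq, rec), plus the next cursor."""
        pos = bisect.bisect_right(self.by_seq, cursor)
        keys = self.by_seq[pos:pos + limit]
        orders = [(seq, rec, self.by_rec[rec][1]) for seq, rec in keys]
        next_cursor = keys[-1] if keys else cursor
        return orders, next_cursor
```

Because an update replaces the old index entry instead of adding a second one, a client that comes back with its cursor sees each order once, in its latest state.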

We started our testing with a write load. We needed to write 200,000 transactions per second, but we easily got more than 250,000 and even went as high as 340,000.

Then we decided to test reads and discovered that we had a problem. At some point, as the number of customers grows, the lag between the customer and the trading core increases dramatically. This growth is unbounded, and the system cannot recover from it. Our storage is no longer able to keep up with the data from the trading core, so the lag gets bigger and bigger.

Analyzing and solving the problem of lagging in reading

To solve this problem, we needed to measure exactly how far behind our clients were. When an order is generated, the time of its creation is written into it. When the client receives it, they can compare the time of creation with the time of delivery and feed the difference into the monitoring system. That’s how we found a dramatic increase in latency.
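
A minimal sketch of that measurement in Python, checked against the targets stated earlier (1 ms on average, 5 ms for 99% of deliveries). The function name, the nanosecond timestamps, and the sample numbers are invented for the example.

```python
def lag_stats(created_ns, delivered_ns):
    """Return (mean, p99) delivery lag in milliseconds from paired timestamps."""
    lags = sorted((d - c) / 1e6 for c, d in zip(created_ns, delivered_ns))
    mean = sum(lags) / len(lags)
    # Nearest-rank percentile: smallest sample with at least 99% of samples at or below it.
    rank = (99 * len(lags) + 99) // 100
    p99 = lags[rank - 1]
    return mean, p99

created = [0] * 100
delivered = [500_000] * 98 + [2_000_000, 6_000_000]   # 98 fast deliveries, 2 slow ones
mean_ms, p99_ms = lag_stats(created, delivered)
within_limits = mean_ms <= 1.0 and p99_ms <= 5.0
```

With this sample, one delivery out of a hundred takes 6 ms, yet both targets are still met, which is why the mean and the percentile have to be tracked separately.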

We started looking for the source of this problem. When we analyzed the CPU usage, we found that the Tarantool network thread was overloaded. It could not properly handle either the requests from customers or the data from the trading core.

We also found another odd thing: the RPS from clients increased sharply as well, meaning that customers started to access our system very frequently. The latency of such requests was very low. In other words, customers came for updates frequently, but their requests were very short, so they weren’t getting the data they needed.

We got a whole set of symptoms: the network thread was overloaded, the RPS of clients increased, while their requests became very short. Why would this happen?

Normally, customers would pick up data from the queue with some lag from the trading core. But there can be situations when the clients start to get ahead of the trading core, and the pieces of data they’re picking up become very small. As a result, the clients make more and more requests, loading the system harder and slowing down further receiving of data from the trading core, thus aggravating the situation.

To confirm this, we decided to monitor how many empty responses we were giving to clients. It turned out that more than 80% of responses were empty. And the moment when the number of empty responses increased exactly matched the moment when customer lag behind the trading core rose sharply. So we had found the problem, and now it had to be solved.

We decided to do the following. When new data comes into the system, we check to see how recent the data is, and save information about the most recent order that came into our system.

When a customer comes in with some key, we can tell that the data they are asking for may not yet be in the system. In this case, we put the client’s request to sleep until new data arrives or until the timeout expires. It’s very convenient to implement this in Tarantool, as it has coroutines called fibers, and there are also condition variables that can put a fiber to sleep and then wake it up. That is, when new data comes in, we can wake up the client right from the procedure that writes the data.

When the client wakes up, there’s no need to recreate the iterator, as the old one can be reused. That is, to continue iterating, we don’t need to look up the position in the index again, which would take logarithmic time.

As a result, we almost got rid of empty responses and dramatically reduced the number of packets per second, reducing the load on the Tarantool thread by more than 15%.
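
The same wait-and-wake pattern can be shown with Python’s standard `threading.Condition`. This is an analogy rather than the project’s code: Tarantool fibers are cooperative and use their own condition variables, while the sketch below uses OS threads, and the `Notifier` class with its methods is a hypothetical name.

```python
import threading

class Notifier:
    def __init__(self):
        self.cond = threading.Condition()
        self.latest = (0, 0)   # (seq, rec) of the newest order stored so far

    def publish(self, key):
        """Writer side: record the newest key and wake every waiting reader."""
        with self.cond:
            self.latest = max(self.latest, key)
            self.cond.notify_all()

    def wait_for_newer(self, cursor, timeout):
        """Reader side: block until data newer than cursor exists or the timeout expires."""
        with self.cond:
            return self.cond.wait_for(lambda: self.latest > cursor, timeout)
```

A reader calls `wait_for_newer` with its cursor and gets `True` as soon as the writer publishes something fresher, or `False` if the timeout expires first. Either way it answers the client only once, instead of returning an empty response immediately.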

Working with the size of responses

We realized that the amount of data returned is a very important metric that needs to be monitored. So we started collecting how much data each client receives. It was convenient to present this as a histogram over time:

You can see clearly that there are no more empty responses. But the packages are still very small: the response size peaks at one entry. It would be good to shift this peak to the right, i.e., to increase the size of the package that customers receive.

We ended up simplifying the system. The function that receives the data returned to its original state: it simply puts the data into the storage. The procedure that collects the data performs a select query. If it sees that there is not enough data and the client would get too small a package, it goes to sleep and, after a while, performs the select query again. If we are still within the latency budget of the request, we can repeat this; otherwise we have to return the data as it is. This optimization increased the package size substantially, from 1 to 20.
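
The retry loop can be sketched as follows. The function name, the parameters, and the default pause are assumptions for illustration; `select` stands for whatever query returns the orders after the client’s cursor.

```python
import time

def collect_batch(select, cursor, min_items, deadline_s, pause_s=0.0001):
    """Re-run select until the batch is big enough or the latency budget is spent."""
    deadline = time.monotonic() + deadline_s
    while True:
        batch = select(cursor)
        if len(batch) >= min_items or time.monotonic() >= deadline:
            return batch          # big enough, or out of time: return what we have
        time.sleep(pause_s)       # give the writer a moment to add more data
```

The trade-off sits in the two parameters: a larger `min_items` means fewer, fuller responses, while `deadline_s` caps how much extra latency any single request can pick up while waiting.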

The waiting time depends on how big a package we want to send. It is better to choose the size experimentally. The metrics we collect on customer lag behind the trading core, as well as on package size, help us a lot with this. We decided to try it out, and this is what we got:

With this approach, we can accumulate packages of up to 70 items, which relieves the network thread considerably and allows us to keep latency very low. You have to take into account that this is long polling, but with a wait of only 0.5 ms. Generally, the data was returned within hundreds of microseconds. So, in a way, we can call it short polling.

Sharding the trading core

We needed to split the trading core into pieces (shards), but in such a way that the client wouldn’t be aware of the complexity of sharding and would continue to receive the data in the right order.

Initially we have one trading core. The client needs a key pair of seq and rec to get the data. They can easily use this key to retrieve data from a single trading core. But a problem arises with two and more cores: the client needs to know where the data is stored and should have a key for each core.

A distributed surrogate key could be introduced to solve this problem, and it could run on the same protocol as before. But then there is either a single point of failure, or the data has to be reconciled, and that would slow things down a lot.

We decided not to use a surrogate key and to use a vector clock instead. It hides the complexity of sharding, since the client only needs to hold a key that our system gives them. The next time, they come with this key and provide it to the router, and the router itself determines which shards the data should be taken from and how it should be merged.
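
One way to picture such a key is as a vector with one `(seq, rec)` position per shard. The Python sketch below is an assumption about the general idea, not the project’s router: the shard names, the tuple layout, and the merge rule (results simply concatenated shard by shard, preserving each shard’s own order) are all invented for illustration.

```python
def route(shards, cursor, limit=100):
    """Query each shard from its own position and return (orders, new_cursor)."""
    merged = []
    new_cursor = dict(cursor)
    for name, orders in shards.items():
        position = cursor.get(name, (0, 0))
        # Each order is (seq, rec, payload); compare only the (seq, rec) part.
        fresh = [o for o in sorted(orders) if o[:2] > position][:limit]
        if fresh:
            new_cursor[name] = fresh[-1][:2]
        merged.extend((name, *o) for o in fresh)
    return merged, new_cursor

shards = {
    "core-1": [(1, 1, "EUR/USD"), (3, 2, "EUR/USD")],
    "core-2": [(1, 1, "USD/RUB")],
}
```

The client treats the returned cursor as an opaque token. When only one shard has new data, only that shard’s component of the vector moves forward, so nothing is skipped or delivered twice on the others.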

As a result, our sharding started to look like this:

We have several sources of data — pieces of a divided trading core. We can divide these pieces into shards however we want: duplicate them, make replicas, whatever we need for flexible scaling on reads.

It’s worth recalling that we have two kinds of customers:

  • The first type is customers who want to receive a subset of the data by filtering it. Low latency is important to them. These can be brokers and traders who only need certain data, such as a particular instrument or their own trading orders.
  • The second type is customers who don’t need filtering. They want to receive the entire data stream coming from the trading core, and they don’t require minimal latency.

Since we know that the minimum latency in our system is on the masters, that’s where we can put clients with filtering requests. The rest can be handled in replicas:

Customers who don’t filter the data also don’t need indexes. So we can make replicas with no indexes to save on replication and memory. This approach gives us the following picture:

We manage to keep latency very low for the customers who care about it by moving a large number of other customers to replicas. And although replicas have some lag of their own, the clients’ lag behind the sources stays low, with the average at about one millisecond.

Metadata for diagnostics and optimization

Besides the response body, the client also receives metadata: information about which operations were performed in the stored procedure and how long they took. As a result, during system diagnostics, we can see what is running slowly and where.

We put the most recent cursor in the package into the metadata. That is, the client can take this cursor without reading the data further and immediately make the next request.

We also record when the oldest entry in the package was created. All of this sits at the very beginning of the header. So, to make the next request, the client only needs to read the header. They read the next cursor, send a new request immediately, and only then decode the body of the response.

This is significant, since the body of one trading order occupies about 600 bytes, and a package holds 20–70 of them. Previously, the client had to read through all of that data to reach the cursor. Placing the cursor at the beginning of the response removes this step.
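
To illustrate the idea, here is a Python sketch with a made-up fixed-size binary header. The layout (three 64-bit fields and a 32-bit item count) is purely hypothetical and is not the format used in the project; the point is only that the client can obtain the next cursor without touching the order bodies.

```python
import struct

# Hypothetical header layout: next cursor (seq, rec), creation time of the
# oldest entry in nanoseconds, and the number of items in the body.
HEADER = struct.Struct("<QQQI")

def encode_response(next_cursor, oldest_created_ns, bodies):
    """Prefix the concatenated order bodies with the fixed-size header."""
    header = HEADER.pack(next_cursor[0], next_cursor[1], oldest_created_ns, len(bodies))
    return header + b"".join(bodies)

def read_header(response):
    """Decode only the fixed-size prefix; the order bodies are left untouched."""
    seq, rec, oldest_ns, count = HEADER.unpack_from(response)
    return (seq, rec), oldest_ns, count

bodies = [bytes(600)] * 20   # 20 orders of about 600 bytes each
response = encode_response((42, 17), 1_650_000_000_000_000_000, bodies)
```

With 20 orders of 600 bytes, the body is 12,000 bytes, and with 70 orders it is 42,000 bytes, while the header in this sketch is only 28 bytes. The client can fire the next request after reading those 28 bytes and decode the rest afterwards.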

More about our emulators

We mentioned above that we needed some emulators for testing purposes. We have two types of them: for the trading core and for the customer.

The trading core emulator generates trading orders, which then go further into the system. It writes the time of creation into each order. Thus, when the order reaches the client, we can calculate how long it took to get there from the trading core. The emulator can also simulate bursts, such as situations in which some instruments start to trade much more intensely. It can maintain the data generation rate we need and calculate the rate at which the data is actually delivered. It then feeds all of this to Prometheus.

The customer emulator collects a large number of metrics, including traces that come from the storage itself, and also sends them to Prometheus, counting the size of the package. This way, we can trace how the system works.

There are also a number of metrics that are in the storage itself — Tarantool supports this. It can collect data on how much the threads are loaded on CPU, metrics on the storage (e.g., the rate of inserts, updates and selects), and measure the table growth rate. It also passes it all further.

First, we tested this on our own test bench, using emulators in the form of command-line utilities. Then we had to test it on our customer’s test bench, and without direct access to it: we had a person we could talk to on Zoom and give instructions to.

We decided to streamline this by combining the emulators into a single daemon that launches the right data sources on the right trading core emulators and emulates clients automatically, according to a configuration and load profile.

We have complete descriptions of what the test and the cluster configuration should look like, and how the emulators, clients, and trading cores should be configured. The person on the other end of the line only needs to enter a single command, which deploys everything and then runs the test. All that’s left is to pass us the data collected in Prometheus, and we analyze it in our dashboards. Basically, we have a complete picture of everything that happens in the cluster during the test.

Project results

  • The system can accept 200,000 write transactions per second on a single node.
  • The latency is within the limits.
  • Customers are unaware of the separation, the complexity of sharding, and how our system is organized internally.
  • The data is given to the client in the right order without any omissions.
  • The client always gets the most up-to-date data.
  • The system can quickly return not only new data that has just streamed in, but also older data from the last minute or the last day.

We successfully experimented with the architecture of a distributed trading information delivery system using Tarantool. The customer considered the pilot successful, and it will be developed further.

We drew several conclusions:

If a problem occurs, you have to learn how to measure it before jumping at it. Find the metric that correlates with when it occurs. You can struggle for weeks trying to figure out why something isn’t working. But until you have a metric, you cannot tell whether a fix has made any difference.

A complex system must be built iteratively, starting from the most basic version. Errors, including the most complex and unusual ones, tend to occur at the simplest level.

It is important to measure all the business and technical metrics: transit time through the system, network traffic (for example, with iptraf-ng), utilization of various threads, and so on.

Keeping it all in one dashboard is important in order to observe the correlation of what’s going on. For example, you see an increase in packets per second, network thread usage, and number of requests. You can make a guess, but rather than rushing out to fix it, you can find a metric that will confirm or disprove your assumption.

It’s important to understand how the algorithms work. Because we understood how a B+ tree works, we were able to arrange the data so that all fresh or updated data was at the end of the tree. This allowed us to use an iterator that traverses the tree as a linked list.

Sometimes it’s good to slow down in order to speed up. If you don’t, you might fall into a curious trap. For example, customers can get errors and return for the data again. They need to be slowed down in some way. You can use a circuit breaker for this, or you can insert artificial delays into the application itself.

Even though we need a latency of fractions of a millisecond at high performance, we still slow down clients at high load. This works effectively together with controlling how many entries we are handing out to clients.

And it’s crucial not to be afraid to test different tools. In the course of this pilot, we took completely different tools and versions to test and see how everything worked. As a result, we chose Tarantool because it suited us best, and we didn’t regret it.

You can download Tarantool on the official website and get help in the Telegram chat.
