Array-based semi-mutable sets – implementation

A typical example where many items are added within a single transaction is a database that collects time-stamped events, also called time series. One can easily imagine a system where many thousands of events are added to a database every second. Obviously there is no point in creating a single transaction per event; in general one can batch them together. The delay between consecutive batches will then depend on the acceptable delay before events become visible to the database readers. One can imagine batching periods ranging from the order of milliseconds to many seconds or more. Overall the important parameters of such a system are:

  • event production rate
  • batching period
  • events retention period

Together these parameters will determine the liveness and memory usage of the database. As an example, consider NASDAQ trades: about 20 billion shares are traded every day on the NASDAQ; assuming an average of 300 shares per trade (approximate NYSE numbers) and a trading day of 7 hours, there are, on average, about 3000 trades per second. Assuming a retention period of 5 days, we are talking about keeping about 350 million trading transactions in memory. With a batching period of 1 second, we will be creating about 117,000 arrays, each containing on average 3000 events. While events are being added to the database, transactions beyond the retention date will be removed by the system. To avoid contention, I will be doing this garbage collection in the same transaction as the one that writes the new events. Essentially I will have a system with a single-threaded writer and multi-threaded readers.
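As a back-of-the-envelope check of these figures, here is a quick sketch (the constants are the approximations quoted above; integer rounding accounts for the small differences from the rounded numbers in the text):

long sharesPerDay   = 20_000_000_000;       // NASDAQ daily share volume
int  sharesPerTrade = 300;                  // approximate NYSE average
int  tradingSeconds = 7 * 3600;             // one 7-hour trading day
int  retentionDays  = 5;

long tradesPerSecond = sharesPerDay / sharesPerTrade / tradingSeconds;   // ~2,650, rounded to 3,000
long retainedTrades  = tradesPerSecond * tradingSeconds * retentionDays; // ~333 million, quoted as ~350 million
long batchCount      = retainedTrades / 3000;                            // ~111,000, quoted as ~117,000
double gigabytes     = retainedTrades * 32 / 1e9;                        // ~10.7 GB at 32 bytes per trade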

Implementation

The trading transactions will be represented using the following data structure:

public struct TradingEvent : IComparable {
    public long Id;
    public long Time; // unix milliseconds
    public short Price;
    public short Volume;
    public short Stock;
    public short BuyerId;
    public short SellerId;
    public short TraderId;
    // Compares against a boxed timestamp (a long), which is what
    // Array.BinarySearch passes in when searching by time. Note that
    // this means the default Array.Sort cannot be used on these structs.
    public int CompareTo(object other) {
        return Time.CompareTo((long)other);
    }
}

which takes up 32 bytes. The following data structure holds the array of trades ordered by time:

public struct TradeBatch : IComparable {
    public Guid Id;
    public long First; // time stamp of the earliest trade in the batch
    public long Last;  // time stamp of the latest trade in the batch
    public TradingEvent[] Trades;
    public int CompareTo(object other) {
        var that = (TradeBatch) other;
        var result = First.CompareTo(that.First);
        if (result == 0) result = Id.CompareTo(that.Id);
        return result;
    }
}

which are then stored in the following semi-mutable set:

smxSet<TradeBatch> allTradeBatches;

The code first populates 5 days' worth of events (about 378 million events, i.e. 11 GB) and then generates 3000 events every second. 500 events are generated with time stamps within the batching period of one second, another 500 within 2 seconds, another 500 within 5 seconds, 500 within 10 seconds, 500 within 30 seconds and the last 500 within 60 seconds. This models the case where events are received after some delay. Hence, on average, about 60 arrays overlap each other in time; in other words, for any given time, events can on average be found in 60 arrays.
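A sketch of what each one-second writer step could look like (smxAtomic is a hypothetical stand-in for the transactional wrapper used in this blog, and I am assuming smxSet exposes Add and Remove; the rest of the names are illustrative):

const long RetentionMillis = 5L * 7 * 3600 * 1000; // 5 trading days of 7 hours

void WriterTick(long nowMillis, TradingEvent[] newEvents)
{
    // the default Array.Sort would box each element through IComparable, and
    // TradingEvent.CompareTo expects a long, so sort with an explicit comparison
    Array.Sort(newEvents, (a, b) => a.Time.CompareTo(b.Time));
    var batch = new TradeBatch {
        Id     = Guid.NewGuid(),
        First  = newEvents[0].Time,
        Last   = newEvents[newEvents.Length - 1].Time,
        Trades = newEvents
    };
    long cutoff = nowMillis - RetentionMillis;
    smxAtomic(() => {                       // hypothetical transaction wrapper
        allTradeBatches.Add(batch);
        // garbage collect expired batches in the same transaction
        foreach (var old in allTradeBatches.Where(b => b.Last < cutoff).ToList())
            allTradeBatches.Remove(old);
    });
}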

Since the events within an array are sorted, enumerating all events within a time range is just a matter of filtering out the batches that do not overlap with the desired range and then binary chopping the array to get the indices that bound the time interval:

foreach (var batch in allTradeBatches) {
    // skip batches that do not overlap the requested range at all
    if (batch.Last < start || batch.First > end) continue;
    var firstIndex = 0;
    var lastIndex = batch.Trades.Length;
    if (start >= batch.First && start <= batch.Last) {
        firstIndex = Array.BinarySearch(batch.Trades, start);
        if (firstIndex < 0) firstIndex = -(firstIndex + 1);
    }
    if (end >= batch.First && end <= batch.Last) {
        lastIndex = Array.BinarySearch(batch.Trades, end);
        if (lastIndex < 0) lastIndex = -(lastIndex + 1);
    }
    for (int i = firstIndex; i < lastIndex; i++) {
        totalVolume += batch.Trades[i].Volume;
    }
}

On average, the binary search should only have to be carried out twice on each of the roughly sixty overlapping arrays. Once the indices have been determined, all accesses to memory are sequential, i.e. rates of 10 GB/s can be expected, which should translate to about 320 Mev/s (million events per second). Assuming a maximum bandwidth of 50 GB/s when multi-threaded, we could expect to read events at a rate of 1.6 Gev/s.

Obviously the binary search adds an overhead. The standard array implementation takes about 4 μs per array of 3000 events on my laptop. A basic custom implementation, one which doesn't have to go through the IComparable interface and the unboxing of the long, takes 1.4 μs. Assuming 120 binary searches, the current implementation presents an overhead of up to 0.5 ms, or 170 μs with a custom implementation. To put things in context, this corresponds to the time it takes to sequentially access 160 Kev and 54 Kev respectively.
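Such a custom implementation could look something like this (a sketch: it compares the Time field directly, avoiding both the interface call and the boxing of the long, and returns the index of the first event with Time >= key, i.e. the same index the -(index + 1) decoding above produces when the timestamp is absent):

static int LowerBound(TradingEvent[] trades, long key)
{
    // plain lower-bound binary search on the Time field; no boxing, no interface call
    int lo = 0, hi = trades.Length;
    while (lo < hi) {
        int mid = lo + ((hi - lo) >> 1);
        if (trades[mid].Time < key) lo = mid + 1;
        else hi = mid;
    }
    return lo; // first index with Time >= key, or trades.Length if none
}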

Interestingly, the binary search time of 4 μs corresponds to reading 1280 events; double that if you have to perform two binary searches and we could already have retrieved 2560 events! Reading the whole array takes almost the same time as performing the binary searches! This is no longer true with the custom implementation, but it is still a remarkable fact that highlights the incredible speed of sequential reads. To exploit it, the code to be executed would have to be something like:

for (int i = 0; i < batch.Trades.Length; i++) {
    // scan the whole array, keeping only events within [start, end)
    if (batch.Trades[i].Time >= start && batch.Trades[i].Time < end) {
        totalVolume += batch.Trades[i].Volume;
    }
}

So let's see how this compares to the same loop but without the conditions on the time. On an E5-2666 Xeon machine, it took 36 ms with the conditions versus 29 ms without them to enumerate 3000 × 3600 events; in other words, 300 Mev/s versus 372 Mev/s, a slowdown of about 20%.
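For reference, the figures were obtained with a loop of roughly this shape (a sketch of the measurement using Stopwatch; the actual harness is not shown here):

var sw = System.Diagnostics.Stopwatch.StartNew();
long totalVolume = 0;
foreach (var batch in allTradeBatches) {      // 3600 batches of 3000 events
    var trades = batch.Trades;
    for (int i = 0; i < trades.Length; i++) {
        if (trades[i].Time >= start && trades[i].Time < end)
            totalVolume += trades[i].Volume;
    }
}
sw.Stop();
double mevPerSecond = 3000.0 * 3600 / sw.Elapsed.TotalSeconds / 1e6; // ~300 Mev/s with the conditions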

The main advantage of this scan-based version is that the arrays do not have to be sorted! Whether this is really an advantage will depend on the actual use case; in general more time is spent reading than writing, so it is often good practice to do the extra work once, at write time.

I have executed some more complete tests (still focused on enumerating large numbers of events rather than just a few, so the impact of the binary searches should be negligible). The code is executed in up to 8 threads, each thread enumerating a disjoint set of events; for example, in the case of two threads, the first one enumerates the first half of all events while the second thread enumerates the second half. This way all the data from the event batches is read from main memory rather than from the cache. The event set was populated with 5 × 7 × 3600 batches of 3000 events.
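The partitioning across threads can be sketched as follows (SumVolume stands for the range enumeration shown earlier; the names are illustrative):

// split the full time range into disjoint slices, one per thread,
// so each thread streams its own events from main memory
var tasks = new Task<long>[threadCount];
long slice = (endTime - startTime) / threadCount;
for (int t = 0; t < threadCount; t++) {
    long s = startTime + t * slice;
    long e = (t == threadCount - 1) ? endTime : s + slice;
    tasks[t] = Task.Run(() => SumVolume(s, e));
}
Task.WaitAll(tasks);
long totalVolume = tasks.Sum(t => t.Result);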

[Figure: read throughput versus number of threads, overlapping batches]

Single-threaded performance stands at 315 Mev/s (almost 10 GB/s); maximum total bandwidth reaches about 1.3 Gev/s. These results include garbage collection of versions, generation of 3000 events every second and removal of old batches, but no persistence to disk.

Using this approach, the database is populated with 126,000 arrays, which is a lot, and on average 60 arrays overlap in time. Although this approach is most efficient at enumerating all the events within a given time range, it is important to note that overall the events are only partially ordered.

In the next post I shall illustrate how these small arrays can be consolidated into fully ordered larger non-overlapping ones.

Model comparison

A long time overdue post…

One of my preferred slides when presenting my transactional model compares different locking models. I start with the most restrictive model, a single global lock:

[Figure: model 1, a single global lock serializes all four threads]

where I consider four threads named A, B, C and D. Writes are amber coloured, reads are in blue. Dashed sections represent blocked threads waiting to either write or read. Since there is a single global lock, the execution is serialized: there is no concurrent execution and there is always one active thread and three blocked ones. Long-running “transactions” have a direct impact on the system's throughput.

Things improve “dramatically” when we swap our single global lock for a reader-writer lock:

[Figure: model 2, reader-writer lock: concurrent readers, exclusive writer]

Hey, it's concurrent now! Well, as long as we read we have concurrency, but as soon as we want to write all the reader threads are blocked. Still, this is a huge improvement over a single global lock. The writer thread has to wait for all reader threads to terminate before it can acquire the writer lock, so the reader threads cannot be allowed to last too long if we want to give the writer thread a chance to update the system. There are, therefore, still some serious constraints. On the positive side there is one important property: since only one writer is allowed, with no concurrent readers, there is no need for any transactional mechanism; all writes can only be read after the writer lock has been released, so no partial writes can be observed by any reader. Long-running “transactions” still have a direct impact on the system's throughput: a long-running read transaction prevents writes and vice versa.
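In .NET this model maps directly onto ReaderWriterLockSlim; a minimal sketch (SumVolume again stands for the read-side work):

static readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();

long ReadTotalVolume(long start, long end)
{
    rwLock.EnterReadLock();              // any number of readers may hold this concurrently
    try { return SumVolume(start, end); }
    finally { rwLock.ExitReadLock(); }
}

void WriteBatch(TradeBatch batch)
{
    rwLock.EnterWriteLock();             // blocks until all readers have drained
    try { allTradeBatches.Add(batch); }
    finally { rwLock.ExitWriteLock(); }
}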

Next comes the versioned system:

[Figure: model 3, versioned system: readers run concurrently with the single writer]

which uses a single, global writer lock. As before we can have any number of concurrent readers; the novelty is that, since existing versions are strictly immutable, the reader threads can execute concurrently with a writer thread. In practice a reader thread can last as long as needed (at the cost of having to keep some versions in memory). We still have a single writer lock, so a thread that wants to perform writes has to wait for the lock to become available: we are still single-threaded in terms of writes. As a consequence there is still no need for any transactional mechanism, as partial writes cannot be observed. Long-running read-only transactions are possible; we can also have long-running write transactions, but at the cost of blocking other would-be writers. The cost of moving to this higher level of concurrency is an increase in memory usage, as all accessible versions must be preserved. Note that write transactions cannot fail in such an essentially single-threaded writer system. Additionally the writer thread can directly modify the versioned variables; there is no need to delay the writes to a commit phase. This model can also be made fully single-threaded, in which case we get:

[Figure: model 4, fully single-threaded versioned system]
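To make the versioned model concrete, here is a minimal sketch of a versioned variable (illustrative only, not the implementation used in this blog): readers resolve against the version stamp that was current when their transaction started, while the single writer prepends new immutable versions.

class VersionedBox<T>
{
    sealed class Version { public long Stamp; public T Value; public Version Next; }
    volatile Version head;

    // readers: walk to the newest version no newer than the reader's start stamp
    public T Read(long readerStamp)
    {
        for (var v = head; v != null; v = v.Next)
            if (v.Stamp <= readerStamp) return v.Value;
        throw new InvalidOperationException("version no longer retained");
    }

    // single writer: publish a new immutable version; old ones stay readable
    public void Write(T value, long newStamp)
    {
        head = new Version { Stamp = newStamp, Value = value, Next = head };
    }
}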

The last model is the one I have described in this blog:

[Figure: model 5, concurrent write transactions with per-box locks]

Here we are adding support for long-running concurrent write transactions. In terms of locking we have the single global version lock, which is held very briefly (increment a number and copy it), and the longer per-box/variable lock that must be held for the duration of the validation and update phases. In the diagram above, thread A waits on thread D to acquire a box lock, and later thread C waits on thread D. We have seen before that this can sometimes be mitigated by partitioning the data.

This last model comes at a cost: registration of reads and writes during the execution of the body of the transaction, a validation stage and a commit stage. Concurrent writer transactions also introduce the risk of conflicts, but we have seen that leveraging the semantics of data structures can greatly reduce this risk.
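The commit step of this full model can be sketched as follows (all the type and member names here are illustrative, not the actual API from this blog):

bool TryCommit(Transaction tx)
{
    var locked = new List<IBox>();
    try {
        // take the per-box locks for every box written by the transaction
        foreach (var w in tx.Writes) { w.Box.AcquireLock(); locked.Add(w.Box); }

        // validation: fail if any box we read was updated after we started
        foreach (var r in tx.Reads)
            if (r.Box.CurrentStamp > tx.StartStamp) return false;

        // the very short global version lock: take the next version number
        long stamp = GlobalVersion.Next();

        // publish a new immutable version of every written box
        foreach (var w in tx.Writes) w.Box.Install(w.Value, stamp);
        return true;
    }
    finally {
        foreach (var box in locked) box.ReleaseLock();
    }
}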

Any system with a decent level of concurrency will, most likely, have to use at least versioned variables. Remember that server processors can have up to 18 Intel cores (expensive, $4,115) or 16 AMD cores (cheaper, $719), and you can rent dual-socket systems with 8 cores per processor in the cloud; that's 32 hyper-threaded cores at a relatively low cost.

I expect the difference between the single-threaded writer model and the concurrent writer model, both in terms of ease of implementation and in terms of performance, to be absolutely massive. However, when dealing with concurrent, long-running write transactions (I shall come back to discussing when a transaction can be considered long-running), there is no substitute for the full model.
