Array based semi-mutable sets – implementation

A typical example where many items are added within a single transaction is a database that collects time-stamped events, also called time series. One can easily imagine a system where many thousands of events are added to a database every second. Obviously there is no point in creating a single transaction per event; in general one can batch them together. The delay between consecutive batches will then depend on the acceptable delay before events are made available to the database readers. One can imagine batching periods ranging from the order of milliseconds to many seconds or more. Overall the important parameters of such a system are:

  • event production rate
  • batching period
  • events retention period

Together these parameters determine the liveness and memory usage of the database. As an example, consider NASDAQ trades. About 20 billion shares are traded every day on the NASDAQ; assuming an average of 300 shares per trade (approximate NYSE numbers) and a trading day of 7 hours, there are, on average, about 3000 trades per second. Assuming a retention period of 5 days, we are talking about keeping about 378 million trading transactions in memory. With a batching period of 1 second, we will be creating about 126,000 arrays, each containing on average 3000 events. While events are being added to the database, transactions beyond the retention date will be removed by the system. To avoid contention, I will be doing this garbage collection in the same transaction as the one that writes the new events. Essentially I will have a system with a single-threaded writer and multi-threaded readers.
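The back-of-envelope arithmetic behind these figures can be written down directly (all numbers are the ones quoted above; 32 bytes is the event size given below):

```csharp
using System;

// Back-of-envelope arithmetic for the NASDAQ example (numbers from the text).
class RetentionMath {
    static void Main() {
        const long sharesPerDay = 20_000_000_000; // NASDAQ daily share volume
        const int sharesPerTrade = 300;           // approximate NYSE average
        const int tradingSeconds = 7 * 3600;      // one 7-hour trading day
        const int retentionDays = 5;

        long tradesPerSecond = sharesPerDay / sharesPerTrade / tradingSeconds;
        long retainedEvents = 3000L * tradingSeconds * retentionDays;
        long retainedBytes = retainedEvents * 32;            // 32 bytes per event
        long batches = (long)tradingSeconds * retentionDays; // 1-second batching

        Console.WriteLine($"~{tradesPerSecond} trades/s, {retainedEvents:N0} events, " +
                          $"{retainedBytes / (1L << 30)} GiB, {batches:N0} batches");
    }
}
```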

Implementation

The trading transactions will be represented using the following data structure:

public struct TradingEvent : IComparable {
    public long Id;
    public long Time; // unix milliseconds
    public short Price;
    public short Volume;
    public short Stock;
    public short BuyerId;
    public short SellerId;
    public short TraderId;
    // Orders events by time; "other" is the boxed long timestamp
    // passed in by Array.BinarySearch when searching on a time stamp.
    public int CompareTo(object other) {
        return Time.CompareTo((long)other);
    }
}

which takes up 32 bytes. The following data structure holds the array of trades ordered by time:

public struct TradeBatch : IComparable {
    public Guid Id;
    public long First;
    public long Last;
    public TradingEvent[] Trades;
    public int CompareTo(object other) {
        var that = (TradeBatch) other;
        var result = First.CompareTo(that.First);
        if (result == 0) result = Id.CompareTo(that.Id);
        return result;
    }
}

which are then stored in the following semi-mutable set:

smxSet<TradeBatch> allTradeBatches;

The code first populates 5 days' worth of events (about 378 million events, i.e. 11 GB) and then generates 3000 events every second. 500 events are generated with time stamps within the batching period of one second, another 500 within 2 seconds, then another 500 within 5 seconds, 500 within 10 seconds, 500 within 30 seconds and the last 500 within 60 seconds. This models the case where events are received after some delay. Hence on average about 60 arrays overlap each other in time; in other words, for any given time, events can be found on average in 60 arrays.
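A hypothetical sketch of one second's worth of generated events: six groups of 500, each time-stamped within a progressively longer window before "now". The delay windows (1, 2, 5, 10, 30, 60 s) are the ones quoted above; the generator itself (names, seeding) is my invention for illustration.

```csharp
using System;
using System.Linq;

class BatchGenerator {
    static readonly int[] DelayWindowsSeconds = { 1, 2, 5, 10, 30, 60 };
    static readonly Random Rng = new Random(42);

    // Produces 6 x 500 = 3000 timestamps, sorted, as one batch's worth of events.
    public static long[] GenerateTimestamps(long nowMs) {
        return DelayWindowsSeconds
            .SelectMany(w => Enumerable.Range(0, 500)
                .Select(_ => nowMs - (long)(Rng.NextDouble() * w * 1000)))
            .OrderBy(t => t)   // each batch is stored sorted by time
            .ToArray();
    }

    static void Main() {
        var ts = GenerateTimestamps(1_000_000_000L);
        Console.WriteLine($"{ts.Length} events spanning {ts[^1] - ts[0]} ms");
    }
}
```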

Since the events within an array are sorted, enumerating all events within a time range is just a matter of filtering the batches that overlap with the desired range and then binary chopping the array to get the required indices that match the time interval:

foreach (var batch in allTradeBatches) {
    var firstIndex = 0;
    var lastIndex = batch.Trades.Length;
    if (start >= batch.First && start <= batch.Last) {
        firstIndex = Array.BinarySearch(batch.Trades, start);
        if (firstIndex < 0) firstIndex = -(firstIndex + 1);
    }
    if (end >= batch.First && end <= batch.Last) {
        lastIndex = Array.BinarySearch(batch.Trades, end);
        if (lastIndex < 0) lastIndex = -(lastIndex + 1);
    }
    for (int i = firstIndex; i < lastIndex; i++) {
        totalVolume += batch.Trades[i].Volume;
    }
}

On average, the binary search should only have to be carried out twice on each of the sixty overlapping arrays. Once the indices have been determined, all accesses to memory are sequential, i.e. rates of 10 GB/s can be expected, which should translate to 320 Mev/s. Assuming a maximum bandwidth of 50 GB/s when multi-threaded, we could expect to read events at a rate of 1.6 Gev/s.

Obviously the binary search adds an overhead. The standard array implementation takes about 4 μs per array of 3000 events on my laptop. A basic custom implementation, one which doesn't have to go through the IComparable interface and the unboxing of the long, takes 1.4 μs. Assuming 120 binary searches, the standard implementation presents an overhead of up to 0.5 ms or, with a custom implementation, 170 μs. To put things in context, this corresponds to the time it takes to sequentially access respectively 160 Kev or 54 Kev.
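Such a custom search can be sketched as follows (my code, not the post's): it compares the Time field directly, so there is no IComparable call and no boxing of the search key, and it returns the index of the first element with Time >= key, which is exactly what the range enumeration needs.

```csharp
using System;

static class TimeSearch {
    public struct Ev { public long Time; }

    // Lower bound: index of the first element with Time >= key.
    public static int LowerBound(Ev[] a, long key) {
        int lo = 0, hi = a.Length;          // search window [lo, hi)
        while (lo < hi) {
            int mid = lo + (hi - lo) / 2;   // avoids int overflow
            if (a[mid].Time < key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    static void Main() {
        var a = new[] { new Ev { Time = 1 }, new Ev { Time = 3 }, new Ev { Time = 5 } };
        Console.WriteLine(LowerBound(a, 3)); // prints 1
    }
}
```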

Interestingly, the binary search time of 4 μs corresponds to the time needed to read 1280 events; double that if you have to perform two binary searches, and we could already have retrieved 2560 events! Reading the whole array takes almost the same time as performing the binary search! This is no longer the case with the custom implementation, but it is still a remarkable fact that highlights the incredible speed of sequential reads. In that case the code to be executed would have to be something like:

for (int i = 0; i < batch.Trades.Length; i++) { 
    if (batch.Trades[i].Time > start && batch.Trades[i].Time < end) {
        totalVolume += batch.Trades[i].Volume;
    }
}

So let’s see how this compares to the same loop but without the conditions on the time. On an E5-2666 Xeon machine, it took 36 ms with the conditions versus 29 ms without them to enumerate 3000×3600 events. In other words, 300 Mev/s versus 372 Mev/s, a slowdown of about 20%.

The main advantage of this last version is that the arrays do not have to be sorted! Whether this is really an advantage will depend on the actual use case; in general more time is spent reading than writing, so it is often good practice to do the extra work only once, at write time.

I have executed some more complete tests (still focused on enumerating large numbers of events rather than just a few, so the impact of the binary searches should be negligible). The code is executed in up to 8 threads, each thread enumerating a disjoint set of events; for example, in the case of two threads, the first one enumerates the first half of all events while the second enumerates the second half. This way all the data from the event batches is read from main memory rather than from the cache. The events set was populated with 5×7×3600 batches of 3000 events.

Reads-overlaping

Single-threaded performance stands at 315 Mev/s (almost 10 GB/s); maximum total bandwidth reaches about 1.3 Gev/s. These results include garbage collection of versions, generation of 3000 events every second and removal of old batches, but no persistence to disk.

Using this approach, the database is populated with 126’000 arrays, which is a lot, and on average 60 arrays overlap in time. Although this approach is most efficient at enumerating all the events within a given time range, it is important to note that overall the events are only partially ordered.

In the next post I shall illustrate how these small arrays can be consolidated into fully ordered larger non-overlapping ones.

Array based semi-mutable sets

Persistent sets, on which my semi-mutable sets are based, are usually implemented as trees and are relatively inefficient in both insertion costs and memory usage. They are particularly inefficient when inserting many items in a single transaction, as each insert requires the creation of a new path to the newly inserted item. This comes with all the associated memory allocation costs and subsequent garbage collection overhead. This problem can be severe when deserializing large sets, unless of course the data structure supports a merge function.

To illustrate this I have performed some quick and dirty tests on a few .Net data structures:

  • array
  • HashSet
  • FSharpSet

where the FSharpSet is the only persistent data structure and the one I use in my semi-mutable set. The first test consists of populating the data structures with 1’000’000 random longs:

    long[]    12 ms
    long[]   112 ms    and sorted
    HashSet  180 ms    added one at a time
    FSharp  4990 ms    added one at a time

Nothing surprising here. Note that creating the FSharpSet from an array of longs isn’t faster than adding one item at a time: a quick look at the source code indicates that the constructor actually adds the values one at a time.
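For reference, a quick-and-dirty version of the population benchmark (my sketch of the tests above; the FSharpSet case needs FSharp.Core and is omitted here):

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

class PopulateBench {
    // Fills an array with n pseudo-random longs.
    public static long[] Fill(int n) {
        var rng = new Random(1);
        var a = new long[n];
        for (int i = 0; i < n; i++) a[i] = rng.Next();
        return a;
    }

    static void Main() {
        const int n = 1_000_000;
        var sw = Stopwatch.StartNew();
        var array = Fill(n);
        Console.WriteLine($"array:   {sw.ElapsedMilliseconds} ms");

        sw.Restart();
        Array.Sort(array);
        Console.WriteLine($"sorted:  {sw.ElapsedMilliseconds} ms");

        sw.Restart();
        var set = new HashSet<long>();
        foreach (var v in array) set.Add(v);  // added one at a time
        Console.WriteLine($"HashSet: {sw.ElapsedMilliseconds} ms ({set.Count} items)");
    }
}
```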

The next test consists of measuring the enumeration speed of the above structures, over 100 enumerations of 1’000’000 elements:

    long[]    820 ms
    HashSet  1400 ms    
    FSharp  14000 ms

Again there is no real surprise, except maybe that the HashSet is doing rather well compared to the array. Note that as the enumeration is executed 100 times in a loop, the results might well be different when executed only once, due to the caching of the data.

Finally memory usage is again unsurprising:

    long[]    7 MB
    HashSet  37 MB
    FSharp   34 MB

Arrays obviously win, by a factor of four or more. For small objects such as longs, the difference is massive: 28 MB of overhead for 1’000’000 elements is about 28 bytes per element, several times the size of the data itself, though it becomes almost negligible when dealing with large objects.

So the idea is to combine the advantages of arrays and persistent sets in a single structure that minimizes batch insertion time and memory consumption while maximizing enumeration speed: we create a persistent set of arrays of values. At its simplest, every new batch of values is inserted in the semi-mutable set as an ordered array of values.
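A minimal sketch of the idea, using ImmutableSortedSet as a stand-in for the semi-mutable set (smxSet itself is not shown in this post): each insertion adds one whole sorted array, so a batch of n values costs one sort plus a single set insertion instead of n tree inserts.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;

class ArrayOfBatches {
    // One batch: a sorted array of values, ordered in the set by its first value.
    sealed class Batch : IComparable<Batch> {
        public readonly long First;
        public readonly long[] Values;
        public Batch(long[] values) { Array.Sort(values); Values = values; First = values[0]; }
        public int CompareTo(Batch other) => First.CompareTo(other.First);
    }

    public static string Demo() {
        var set = ImmutableSortedSet<Batch>.Empty;
        set = set.Add(new Batch(new long[] { 5, 1, 9 })); // one insert per batch,
        set = set.Add(new Batch(new long[] { 4, 2 }));    // however large the batch
        // Concatenating the batches shows the elements are only partially ordered:
        return string.Join(",", set.SelectMany(b => b.Values));
    }

    static void Main() => Console.WriteLine(Demo()); // prints 1,5,9,2,4
}
```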

Here is a short list of some of the disadvantages of this approach:

  • it is expensive to enumerate all the elements in order, as one has to continuously hop between the arrays, unless all the arrays are disjoint
  • adding or removing a few elements requires copying a complete array
  • there is nothing to stop multiple copies of the same element appearing in a given set

However, since the arrays are ordered, it is relatively cheap to enumerate (not in order) all elements in a given range, though it will still be slower than with a fully ordered set, as all the arrays have to be searched (unless, again, they don’t overlap).

In the next post I shall illustrate the implementation and use of this data structure with a fairly realistic example. At that point we will have a system with a very large number of relatively small arrays, and we will see how these arrays can be consolidated into fewer, larger ones.

Model comparison

A long time overdue post…

One of my preferred slides when presenting my transactional model compares different locking models. I start with the most restrictive model, a single global lock:

model1

where I consider four threads named A, B, C and D. Writes are amber coloured, reads are in blue. Dashed sections represent blocked threads waiting to either write or read. Since there is a single global lock, execution is serialized: there is no concurrent execution, and there is always one active thread and three blocked ones. Long-running “transactions” have a direct impact on the system’s throughput.

Things improve “dramatically” when we swap our single global lock for a reader-writer lock:

model2

Hey, it’s concurrent now! Well, as long as we only read we have concurrency, but as soon as we want to write, all the reader threads are blocked. Still, this is a huge improvement over a single global lock. The writer thread has to wait for all reader threads to terminate before it can acquire the writer lock, so the reader threads cannot be allowed to last too long if we want to give the writer thread a chance to update the system. There are therefore still some serious constraints. On the positive side there is one important property: since only one writer is allowed, with no concurrent readers, there is no need for any transactional mechanism; all writes can only be read after the writer lock has been released, so no partial writes can be observed by any reader. Long-running “transactions” still have a direct impact on the system’s throughput: a long-running read transaction prevents writes and vice versa.
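This model maps directly onto the BCL’s ReaderWriterLockSlim, as in the minimal sketch below: any number of readers run concurrently, but a writer excludes everyone and must wait for in-flight readers to drain first.

```csharp
using System;
using System.Threading;

class RwDemo {
    static readonly ReaderWriterLockSlim Lock = new ReaderWriterLockSlim();
    static int state;

    public static int Read() {
        Lock.EnterReadLock();            // many threads may hold this at once
        try { return state; }
        finally { Lock.ExitReadLock(); }
    }

    public static void Write(int value) {
        Lock.EnterWriteLock();           // blocks until all readers have left
        try { state = value; }
        finally { Lock.ExitWriteLock(); }
    }

    static void Main() {
        Write(42);
        Console.WriteLine(Read()); // prints 42
    }
}
```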

Next comes the versioned system:

model3

which uses a single, global writer lock. As before we can have any number of concurrent readers; the novelty is that since existing versions are strictly immutable, the reader threads can be executed concurrently with a writer thread. So in practice a reader thread can last as long as needed (at the cost of having to keep some versions in memory). We still have a single writer lock, so a thread that wants to perform writes has to wait for the lock to become available; we are still single-threaded in terms of writes. As a consequence there is still no need for any transactional mechanism, as partial writes cannot be observed. Long-running read-only transactions are possible; we can also have long-running write transactions, but at the cost of blocking other would-be writers. The cost of moving to this higher level of concurrency is an increase in memory usage, as all accessible versions must be preserved. Note that write transactions cannot fail when we are dealing with an essentially single-threaded writer system. Additionally the thread can directly modify the versioned variables; there is no need to delay the writes to the commit phase. This model can also be run fully single-threaded, in which case we get:

model4
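A versioned variable can be sketched as a history of (version, value) pairs; the class below is my own minimal illustration, not the model’s actual implementation. A reader pinned to a version simply picks the newest value at or below it, so it never blocks a writer appending a newer version.

```csharp
using System;
using System.Collections.Generic;

class Versioned<T> {
    // History of committed versions, oldest first; entries are never mutated.
    readonly List<(long Version, T Value)> history = new List<(long, T)>();

    public void Write(long version, T value) => history.Add((version, value));

    // A reader at version v sees the newest value with Version <= v.
    public T ReadAt(long version) {
        for (int i = history.Count - 1; i >= 0; i--)
            if (history[i].Version <= version) return history[i].Value;
        throw new InvalidOperationException("no value at this version");
    }
}

class Demo {
    static void Main() {
        var x = new Versioned<string>();
        x.Write(1, "a");
        x.Write(3, "b");                // a writer commits version 3...
        Console.WriteLine(x.ReadAt(2)); // ...while a reader at version 2 still sees "a"
    }
}
```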

The last model is the one I have described in this blog:

model5b

Here we are adding support for long-running concurrent write transactions. In terms of locking we have the single global version lock, which is held very briefly (increment a number and copy it), and the longer per-box/variable lock that must be held for the duration of the validation and update phase. In the above diagram we have thread A waiting on thread D to acquire a box lock, and later thread C waiting on thread D. We have seen before that this can sometimes be mitigated by partitioning the data.

This last model comes at a cost: registration of reads and writes during the execution of the body of the transaction, validation stage and commit stage. Concurrent writer transactions also introduce the risk of conflicts, but we have seen that leveraging the semantics of data structures can greatly reduce this risk.

Any system with a decent level of concurrency will most likely have to use at least versioned variables. Remember that server processors can have up to 18 Intel cores (expensive, $4115) or 16 AMD cores (cheaper, $719), and you can use dual-socket, 8-cores-per-processor systems in the cloud; that’s 32 hyper-threaded cores at a relatively low cost.

I expect the difference between the single-threaded writer model and the concurrent writer model, both in terms of ease of implementation and in terms of performance, to be absolutely massive. However, when dealing with concurrent, long-running write transactions (I shall come back to discussing when a transaction can be considered a long-running one), there is no substitute for the full model.

Traced execution

I will present in this post a proof of concept of an idea I got nearly a year ago while trying to remote the execution of some code, having grown tired of the limitations of C# in general and of Linq.Expressions in particular. Run-time reflection in C# 4.0 is limited to types and Linq.Expressions; here I demonstrate a form of reflection that can be performed on a small subset of C# statements.

Read more of this post

Piecewise immutable programming

Copyright © 2011 Gabriel Zs. K. Horvath

Although not a programming language expert at all, I will try to muster all my limited knowledge to briefly discuss software transactions in terms of programming model. Be aware that this is all very tentative…

On the one hand the level of isolation and immutability is very much reminiscent of functional programming, on the other hand the mutability of the versioned variables is essentially imperative programming. To better understand this relationship let’s see, very tentatively, how my model can be used to code in a “pure” imperative way or in a more functional one.

Read more of this post

Model specification

Copyright © 2011 Gabriel Zs. K. Horvath

This is my first attempt at giving a more formal definition of my software transactions model. Undoubtedly, a post which will be frequently updated.

Consider a system composed of the following:

  • A single transaction manager holding a unique global version number
  • Transactions
  • Threads
  • Semi-mutable versioned variables managed by the transaction manager
  • Non-versioned transaction-local immutable variables
  • External resources, these can be either read from or written to
  • A garbage collector

The program consists in the execution of transactions in one or more concurrent threads.

Read more of this post

Joining transactions

Copyright 2011 © Gabriel Zs. K. Horvath

Up to now all the transactions have been running in independent threads. There hasn’t been any mention of sub-transactions or the possibility of joining threads.

Sub-transactions

Since all variables within a transaction are immutable, it is meaningless to think in terms of sub-transactions which commit changes. Instead we can have multiple threads running concurrently within a single transaction and merge the recorded write operations of the various threads when joining them. Since the threads within the same transaction must still register all their reads and writes independently of the main transaction, they are equivalent to normal transactions which happen to share the same version number as the main transaction. So far my model has no concept of sub-transactions.

Read more of this post

Garbage collection


Copyright 2010-2011 © Gabriel Zs. K. Horvath

So far all the versions of the versioned variables have been kept in the variables’ history. This is obviously not a viable option and an unnecessary one as most of the versions are likely to become quickly unreachable. Let me first define what a reachable version is. Consider the following versioned variables:

Read more of this post

Contiguous transactions and notifications

Copyright 2009-2011 © Gabriel Zs. K. Horvath

Contiguous transactions

Up to now atomicity has consisted in ensuring that writes to a number of versioned variables were “performed instantly”. The idea is that there is no observable gap between the times when the variables involved in the transaction are modified. So we have no gap between writes, but what about gaps between transactions? After a transaction has committed, one might need to start a new transaction in the state the previous transaction left the system in. One could start a new transaction straightaway, hoping to catch the system in that state, but of course there is always a risk that another transaction commits in the meantime and modifies the state of the system before our follow-on transaction starts. A contiguous transaction is one which is guaranteed to see the system in the state its parent transaction put it in.

Read more of this post

Composition and silent reads

Copyright 2010-2011 © Gabriel Zs. K. Horvath

So far all the read operations performed in the atomic block were being recorded, so as to be re-executed at commit time. We will see in this post that there are circumstances where one does not want the reads to be recorded. I will call these silent reads.

Composition

One of the most important and powerful concepts in software engineering is composition. We want to be able to compose existing data structures together to build new ones, or to add new methods to existing ones without having to perform open-heart surgery on that component. So let’s look at the concrete example of trying to implement the Last method on top of the set data structure:

public static class SetExtensions {
    public static IEnumerable<T> Last<T>(this IEnumerable<T> that) {
        var enumerator = that.GetEnumerator();
        if (enumerator.MoveNext()) {
            T t = enumerator.Current;
            while (enumerator.MoveNext()) {
                t = enumerator.Current;
            }
            yield return t;
        }
    }
}

Read more of this post
