Atomicity of tuple assignment

Tuples have been present in C# since version 4.0, but only as a library type without any language support. C# 7.0 finally introduces proper language integration of tuples. For example, the syntax to create a tuple is simply:

var pair = (1,2);

Deconstructing is also supported:

var (x,y) = pair;

Combining these two features lets us use tuple assignment to swap variables:

(a,b) = (b,a);

Which is almost equivalent to, but more compact and more explicit than, the following:

var temp = a;
a = b;
b = temp;

This behaviour of tuples is akin to atomicity: the left-hand side variables are only assigned to once all the right-hand side expressions have been evaluated. In the pseudo-syntax of my transactional model we would have:

transaction {
  a = b;
  b = a;
}

where a and b are both semi-mutable variables, hence changes would only be visible after the atomic block. Or in standard C# syntax:

var tempA = a;
var tempB = b;
a = tempB;
b = tempA;

The tuple assignment syntax for swapping two variables simplifies some standard algorithms, such as the partitioning step of the quicksort algorithm:

    public int Partition(int[] v, int low, int high) {
        var p = v[high];
        var i = low-1;
        for (int k = low; k < high; k++) {
            if (v[k] < p) {                
                i++;
                if (i != k) {
                    (v[i], v[k]) = (v[k], v[i]);
                }
            }
        }
        i++;
        (v[i], v[high]) = (v[high], v[i]);
        return i;
    }

where the advantage is that the swaps need neither a temporary variable nor any care about the order of the assignments.

Other algorithms which manipulate linked lists can also benefit from this syntax; for example, the partitioning of a linked list can be written as:

private ListNode partition(ListNode node, int value) {
    if (node == null) return null;
    ListNode head = node;
    while (node.next != null) {
        if (node.next.value < value) {
            (node.next, node.next.next, head) = (node.next.next, head, node.next);
        } else {
            node = node.next;
        }
    }
    return head;
}

where the big advantage is that there is no need to use a temporary value or to worry about the order in which the various node references are assigned. The downside is readability, which decreases as the number of variables increases.
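For reference, the snippet assumes a minimal mutable node type along these lines (a sketch, the actual class is not shown here):

public class ListNode {
    public int value;      // payload used by the partitioning
    public ListNode next;  // mutable link to the next node
    public ListNode(int value) { this.value = value; }
}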

The atomicity of this tuple assignment in C# is much stronger than, for example, in Python. You might have noticed that in the tuple assignment we first assign a new value to node.next and then to node.next.next. This would give a wrong result in Python, where the left-hand side targets are assigned one after the other, each assignment seeing the effects of the previous one. In C#, the storage locations of the destination variables are evaluated and stored before any assignment is performed.

What about the atomicity of this assignment operation in the presence of exceptions? Exceptions thrown while evaluating the right-hand side result in none of the left-hand side values being assigned to. However, if any setter on the left-hand side throws, we can end up with partially assigned values:

(c.r,c.i) = (1,2);

if the setter of c.i throws an exception, only c.r will have been assigned to, and c.i will be left unchanged.
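A contrived sketch makes this concrete (the Complex class and its throwing setter are purely illustrative): the setters run left to right, so the first one has already executed when the second one throws.

using System;

var c = new Complex();
try {
    (c.r, c.i) = (1, 2);   // the right-hand side is evaluated first, then setters run left to right
} catch (InvalidOperationException) {
    // here c.r == 1, while c.i was never updated
}

class Complex {
    public int r { get; set; }                                        // ordinary auto-property
    public int i { set { throw new InvalidOperationException(); } }   // setter that always throws
}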

Although this assignment is only partially atomic, I find it useful in algorithms where various variables or properties are swapped around.


Array based semi-mutable sets – consolidation

The approach described so far minimized write activity by creating arrays of events that overlap in time. Storing events in arrays results in extremely fast reads, while only ever adding new data and never modifying existing data limits the cost of writes. We even saw that in some cases it might not even be necessary to sort the events by time.

Although it was easy to find all events within a time range using simple binary searches, the fact that the events are not fully ordered makes some calculations impossible. For example, any calculation that relies on the time interval between consecutive events requires the events to be fully ordered. Similarly, aggregation, compression and decimation become much easier, or simply feasible, when the events are ordered.

The other problem I can see arising is the multiplication of small arrays: there is a point where too many small arrays render the approach counterproductive. The solution is to consolidate several arrays into larger ones. This can be done at any time, even while new events are being added. Alternatively it can be limited to older events, for example when one knows that no event older than some time limit will be added to the system.

The following diagrams illustrate the process:

before consolidation

Each line represents an array of events, each dot represents an event. In this example only five arrays overlap. The state of the system after consolidation is then:

after consolidation

where the consolidated array contains all the events within a predetermined time range. Whether one will want to sort the events within the arrays is again a question of “it depends”. In particular it will depend on the size of the array and the expected queries.

My code consolidates all events older than 30 minutes over one-minute periods, so that the new event batches contain on average 180’000 events. The consolidation process consists of copying the events to a new array and replacing the truncated batch instances with new ones. To optimize this process and avoid unnecessarily copying the events that are not consolidated, I added a new field to the batch class that specifies the first valid event in the event array. This way the (immutable) array of events is reused by the new batch instance.

public struct TradeBatch : IComparable {
    public Guid Id;
    public long First;
    public long Last;
    public long FirstEvent; // index of the first valid event in Trades after partial consolidation
    public TradingEvent[] Trades;
    public int CompareTo(object other) {
        var that = (TradeBatch) other;
        var result = First.CompareTo(that.First);
        if (result == 0) result = Id.CompareTo(that.Id);
        return result;
    }
}

If the small arrays are not sorted, each one has to be enumerated in full to extract the events to be moved to the consolidated array, while the remaining events have to be copied to another array. Potentially one could avoid this second copy by adding a filter field to the batch class.
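A sketch of that two-way split for a single unsorted batch (batch and cutoff are assumed to be in scope; this is not the actual consolidation code):

// requires System.Collections.Generic
var moved = new List<TradingEvent>();      // events older than the consolidation cut-off
var remaining = new List<TradingEvent>();  // events that stay in a small batch
foreach (var e in batch.Trades) {
    if (e.Time < cutoff) moved.Add(e);
    else remaining.Add(e);
}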

Since I am consolidating over a fixed time period rather than to a fixed array size, the size of the consolidated arrays varies around an average of 5.5 MB. The CLR stores these relatively large arrays in the Large Object Heap, which by default the garbage collector does not compact because of the potentially high cost. The downside is memory fragmentation. There are two ways to avoid this problem: one can ask the garbage collector to compact the Large Object Heap (see here), or one can use fixed-size arrays.
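For reference, the first option can be requested like this (the setting exists since .NET Framework 4.5.1; a minimal sketch):

using System;
using System.Runtime;

// Request a one-off compaction of the Large Object Heap during the next
// blocking, full garbage collection; the setting resets itself afterwards.
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect();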

The consolidation process takes place once every minute, and its cost is dominated by the sorting of the consolidated array. Currently I am using the Array.Sort method. Since the array is already partially ordered, a different sort algorithm might improve performance, unless it turns out to be more efficient to merge the arrays directly rather than to copy them and then sort.
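For reference, the consolidated array can be sorted on time with the Comparison overload of Array.Sort, which bypasses the struct’s object-based CompareTo (a sketch; consolidated is just an illustrative variable name):

// Sort the consolidated events by their timestamp.
Array.Sort(consolidated, (x, y) => x.Time.CompareTo(y.Time));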

On my laptop the sorting takes on average 27 ms; executed once a minute, this represents a minuscule proportion of time.

Now there are many other ways the events can be stored: they could be grouped by some property such as the ticker, so that for any time interval there would be as many arrays as tickers. This is just one form of indexing; again, any indexing is possible, it all depends on the use case. This can be taken even further if we have another continuous variable: events can be grouped by range in two or more dimensions, thus providing double indexing of events.

All this is made easy by the use of transactions as all writes are performed atomically. So at any time it is extremely easy to modify or extend the way the data is stored while maintaining consistency and allowing many readers to run unobstructed.

Note that if time ordering of events is not required within a set, the events can be sorted on any other property. This becomes particularly interesting if that other property is also a continuous one.

 

Array based semi-mutable sets – implementation

Now, a typical example of many items being added within a single transaction is a database collecting time-stamped events, also called time series. One can easily imagine a system where many thousands of events are added to a database every second. Obviously there is no point in creating a single transaction per event; in general one can batch them together. The delay between consecutive batches will then depend on the acceptable delay for events to be made available to the database readers. One can imagine batching periods ranging from the order of milliseconds to many seconds or more. Overall the important parameters of such a system are:

  • event production rate
  • batching period
  • events retention period

Together these parameters determine the liveliness and memory usage of the database. As an example consider NASDAQ trades. About 20 billion shares are traded every day on the NASDAQ; assuming an average of 300 shares per trade (approximate NYSE numbers) and a trading day of 7 hours, there are, on average, about 3000 transactions per second. Assuming a retention period of 5 days, we are talking about keeping about 350 million trading transactions in memory. With a batching period of 1 second, we will be creating about 117’000 arrays, each containing on average 3000 events. While events are being added to the database, transactions beyond the retention date will be removed by the system. To avoid contention, I will be doing this garbage collection in the same transaction as the one that writes the new events. Essentially I will have a system with a single-threaded writer and multi-threaded readers.

Implementation

The trading transactions will be represented using the following data structure:

public struct TradingEvent : IComparable {
    public long Id;
    public long Time; // unix milliseconds
    public short Price;
    public short Volume;
    public short Stock;
    public short BuyerId;
    public short SellerId;
    public short TraderId;
    // compares the event's time against a boxed long timestamp,
    // as used below by Array.BinarySearch with a time key
    public int CompareTo(object other) {
        return Time.CompareTo((long)other);
    }
}

which takes up 32 bytes. The following data structure holds the array of trades ordered by time:

public struct TradeBatch : IComparable {
    public Guid Id;
    public long First;
    public long Last;
    public TradingEvent[] Trades;
    public int CompareTo(object other) {
        var that = (TradeBatch) other;
        var result = First.CompareTo(that.First);
        if (result == 0) result = Id.CompareTo(that.Id);
        return result;
    }
}

which are then stored in the following semi-mutable set:

smxSet<TradeBatch> allTradeBatches;

The code first populates 5 days’ worth of events (about 378 million events, i.e. 11 GB) and then generates 3000 events every second. 500 events are generated with time stamps within the batching period of one second, another 500 events within 2 seconds, then another 500 within 5 seconds, 500 within 10 seconds, 500 within 30 seconds and the last 500 within 60 seconds. This models the case where events are received after some delay. Hence on average about 60 arrays overlap each other in time; in other words, for any given time, events can be found on average in 60 arrays.

Since the events within an array are sorted, enumerating all events within a time range is just a matter of filtering the batches that overlap with the desired range and then binary chopping the array to get the required indices that match the time interval:

foreach (var batch in allTradeBatches) {
    var firstIndex = 0;
    var lastIndex = batch.Trades.Length;
    if (start >= batch.First && start <= batch.Last) {
        firstIndex = Array.BinarySearch(batch.Trades, start);
        if (firstIndex < 0) firstIndex = -(firstIndex + 1);
    }
    if (end >= batch.First && end <= batch.Last) {
        lastIndex = Array.BinarySearch(batch.Trades, end);
        if (lastIndex < 0) lastIndex = -(lastIndex + 1);
    }
    for (int i = firstIndex; i < lastIndex; i++) {
        totalVolume += batch.Trades[i].Volume;
    }
}

On average, the binary search only has to be carried out twice on each of the sixty arrays. Once the indices have been determined, all memory accesses are sequential, i.e. rates of 10 GB/s can be expected, which should translate to 320 Mev/s. Assuming a maximum bandwidth of 50 GB/s when multithreaded, we could expect to read events at a rate of 1.6 Gev/s.

Obviously the binary search adds an overhead. The standard array implementation takes about 4 μs per array of 3000 events on my laptop. A basic custom implementation, one which doesn’t have to go through the IComparable interface and the unboxing of the long, takes 1.4 μs. Assuming 120 binary searches, the current implementation represents an overhead of up to 0.5 ms or, with a custom implementation, 170 μs. To put things in context, this corresponds to the time it takes to sequentially access respectively 160 Kev or 54 Kev.
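The custom implementation is not shown here; a minimal lower-bound search along the following lines, comparing the Time field directly and avoiding both the interface call and the boxing of the key, gives an idea of what it can look like:

// Returns the index of the first event whose Time is >= time, assuming trades is sorted by Time.
static int LowerBound(TradingEvent[] trades, long time) {
    int lo = 0, hi = trades.Length;        // search interval is [lo, hi)
    while (lo < hi) {
        int mid = lo + ((hi - lo) >> 1);
        if (trades[mid].Time < time) lo = mid + 1;
        else hi = mid;
    }
    return lo;
}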

Interestingly, the binary search time of 4 μs corresponds to reading 1280 events; double that, if you have to perform two binary searches, and we could already have retrieved 2560 events! Reading the whole array takes almost the same time as performing the two binary searches! This is no longer the case with the custom implementation, but it is still a remarkable fact that highlights the incredible speed of sequential reads. Actually, the code to be executed would have to be something like:

for (int i = 0; i < batch.Trades.Length; i++) { 
    if (batch.Trades[i].Time > start && batch.Trades[i].Time < end) {
        totalVolume += batch.Trades[i].Volume;
    }
}

So let’s see how this compares to the same loop but without the conditions on the time. On a Xeon E5-2666 machine, it took 36 ms with the conditions versus 29 ms without them to enumerate 3000*3600 events. In other words, 300 Mev/s versus 372 Mev/s, so a slowdown of about 20%.

The main advantage of this last version is that the arrays do not have to be sorted! Whether this is really an advantage will depend on the actual use case; in general more time is spent reading than writing, so it is often good practice to do the extra work once, at write time.

I have executed some more complete tests (still focused on enumerating large numbers of events rather than just a few, so the impact of the binary searches should be negligible). The code is executed in up to 8 threads, each thread enumerating a disjoint set of events; for example, in the case of two threads, the first one enumerates the first half of all events while the second thread enumerates the second half. This way all the data from the event batches is read from main memory rather than from the cache. The events set was populated with 5*7*3600 batches of 3000 events.

read throughput – overlapping arrays

Single threaded performance stands at 315 Mev/s (almost 10 GB/s), maximum total bandwidth reaches about 1.3 Gev/s. These results include garbage collection of versions, generation of 3000 events every second, removal of old batches, but no persistence to disk.

Using this approach, the database is populated with 126’000 arrays, which is a lot, and on average 60 arrays overlap in time. Although this approach is most efficient at enumerating all the events within a given time range, it is important to note that overall the events are only partially ordered.

In the next post I shall illustrate how these small arrays can be consolidated into fully ordered larger non-overlapping ones.

Array based semi-mutable sets

Persistent sets, on which my semi-mutable sets are based, are usually implemented as trees and are relatively inefficient in terms of both insertion cost and memory usage. They are particularly inefficient when inserting many items in a single transaction, as each insert requires the creation of a new path to the newly inserted item, with all the associated memory allocation costs and subsequent garbage collection overhead. This problem can be severe when deserializing large sets, unless of course the data structure supports a merge function.

To illustrate this I have performed some quick and dirty tests on a few .Net data structures:

  • array
  • HashSet
  • FSharpSet

FSharpSet is the only persistent data structure of the three and is the one I use in my semi-mutable set. The first test consists of populating the data structures with 1’000’000 random longs:

    long[]    12 ms
    long[]   112 ms    and sorted
    HashSet  180 ms    added one at a time
    FSharp  4990 ms    added one at a time

Nothing surprising here. Note that creating the FSharpSet from an array of longs isn’t faster than adding one item at a time: a quick look at the source code indicates that the constructor actually adds the values one at a time.
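For reference, the population test has roughly the following shape (a sketch rather than the exact benchmark code; FSharpSet<T> lives in FSharp.Core and every Add returns a new set):

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.FSharp.Collections;

var rng = new Random(42);
var values = Enumerable.Range(0, 1_000_000).Select(_ => (long)rng.Next()).ToArray();

var sorted = (long[])values.Clone();
Array.Sort(sorted);                                        // the "and sorted" row

var hashSet = new HashSet<long>();
foreach (var v in values) hashSet.Add(v);                  // added one at a time

var fsharpSet = new FSharpSet<long>(Enumerable.Empty<long>());
foreach (var v in values) fsharpSet = fsharpSet.Add(v);    // persistent: each Add allocates a new path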

The next tests consist of measuring the enumeration speed of the above structures. 100 enumerations of 1’000’000 elements:

    long[]    820 ms
    HashSet  1400 ms    
    FSharp  14000 ms

Again, there is no real surprise here, except maybe that the HashSet is doing rather well compared to the array. Note that as the enumeration is executed 100 times in a loop, the results might well be different when executed only once, due to the caching of the data.

Finally memory usage is again unsurprising:

    long[]    7 MB
    HashSet  37 MB
    FSharp   34 MB

Arrays obviously win, by a factor of four to five. For a small element type such as long the difference is massive, with an overhead over the actual data of roughly a factor of 3. 28 MB of overhead for 1’000’000 elements is about 28 bytes per element, which becomes almost negligible when dealing with large objects.

So the idea is to combine the advantages of arrays and persistent sets in a single structure that minimizes batch insertion time and memory consumption while maximizing enumeration speed: we create a persistent set of arrays of values. At its simplest, every new batch of values is inserted into the semi-mutable set as an ordered array of values.

Here is a short list of some of the disadvantages of this approach:

  • it is expensive to enumerate all the elements in order, as one has to continuously hop between the arrays, unless all the arrays are disjoint
  • adding or removing a few elements requires the copying of a complete array
  • there is nothing to prevent multiple copies of the same element in a given set

However, since the arrays are ordered, it is relatively cheap to enumerate (not in order) all elements in a given range. Though it will still be slower than when working with a fully ordered set as all the arrays have to be searched (unless again they don’t overlap).

In the next post I shall illustrate the implementation and use of this data structure with a fairly realistic example. At that point we will have a system with a very large number of relatively small arrays, and we will see how these arrays can be consolidated into fewer, larger ones.

Model comparison

A long time overdue post…

One of my preferred slides when presenting my transactional model compares different locking models. I start with the most restrictive model, a single global lock:

model1

where I consider four threads named A, B, C, D. Writes are amber coloured, reads are in blue. Dashed sections represent blocked threads waiting to either write or read. Since there is a single global lock, execution is serialized: there is no concurrency, and at any time there is one active thread and three blocked ones. Long running “transactions” have a direct impact on the system’s throughput.

Things improve “dramatically” when we swap our single global lock for a reader-writer lock:

model2

Hey, it’s concurrent now! Well, as long as we only read we have concurrency, but as soon as we want to write all the reader threads are blocked. Still, this is a huge improvement over a single global lock. The writer thread has to wait for all reader threads to terminate before it can acquire the writer lock, so the reader threads cannot be allowed to last too long if we want to give the writer thread a chance to update the system. So there are still some serious constraints. On the positive side there is one important property: since only one writer is allowed, with no concurrent readers, there is no need for any transactional mechanism; all writes can only be read after the writer lock has been released, so no partial writes can be observed by any reader. Long running “transactions” still have a direct impact on the system’s throughput: a long running read transaction prevents writes and vice versa.
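This model maps directly onto System.Threading.ReaderWriterLockSlim; a minimal sketch:

using System.Threading;

class SharedState {
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private int value;

    public int Read() {
        rwLock.EnterReadLock();            // any number of readers may hold this concurrently
        try { return value; }
        finally { rwLock.ExitReadLock(); }
    }

    public void Write(int newValue) {
        rwLock.EnterWriteLock();           // waits for all readers to drain, blocks new readers
        try { value = newValue; }
        finally { rwLock.ExitWriteLock(); }
    }
}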

Next comes the versioned system:

model3

which uses a single, global writer lock. As before we can have any number of concurrent readers; the novelty is that, since existing versions are strictly immutable, the reader threads can execute concurrently with a writer thread. In practice a reader thread can last as long as needed (at the cost of having to keep some versions in memory). We still have a single writer lock, so a thread that wants to perform writes has to wait for the lock to become available; we are still single-threaded in terms of writes. As a consequence there is still no need for any transactional mechanism, as partial writes cannot be observed. Long running read-only transactions are possible, and we can also have long running write transactions, but at the cost of blocking other would-be writers. The cost of moving to this higher level of concurrency is an increase in memory usage, as all accessible versions must be preserved. Note that write transactions cannot fail when we are dealing with an essentially single-threaded writer system. Additionally the writer thread can directly modify the versioned variables, with no need to delay the writes to the commit phase. This model can also be fully single threaded, in which case we get:

model4

The last model is the one I have described in this blog:

model5b

Here we are adding support for long-running concurrent write transactions. In terms of locking we have the single global version lock, which is held very briefly (increment a number and copy it), and the longer per-transactional-box/variable locks that must be held for the duration of the validation and update phase. In the above diagram we have thread A waiting on thread D to acquire a box lock, and later thread C waiting on thread D. We have seen before that this can sometimes be mitigated by partitioning the data.

This last model comes at a cost: registration of reads and writes during the execution of the transaction body, plus a validation stage and a commit stage. Concurrent writer transactions also introduce the risk of conflicts, but we have seen that leveraging the semantics of data structures can greatly reduce this risk.

Any system with a decent level of concurrency will most likely have to use at least versioned variables. We must remember that server processors can have up to 18 Intel cores (expensive, $4115) or 16 AMD cores (cheaper, $719). You can use dual-socket systems with 8 cores per processor in the cloud; that’s 32 hyper-threaded cores at a relatively low cost.

I expect the difference between the single-threaded writer model and the concurrent writer model, in terms of both ease of implementation and performance, to be absolutely massive. However, when dealing with concurrent, long-running write transactions (I shall come back to discussing when a transaction can be considered a long-running one), there is no substitute for the full model.

Piecewise immutable programming

Copyright © 2011 Gabriel Zs. K. Horvath

Although not a programming language expert at all, I will try to muster all my limited knowledge to briefly discuss software transactions in terms of programming model. Be aware that this is all very tentative…

On the one hand the level of isolation and immutability is very much reminiscent of functional programming; on the other hand the mutability of the versioned variables is essentially imperative programming. To better understand this relationship let’s see, very tentatively, how my model can be used to code in a “pure” imperative way or in a more functional one.


Model specification

Copyright © 2011 Gabriel Zs. K. Horvath

This is my first attempt at giving a more formal definition of my software transactions model. Undoubtedly, a post which will be frequently updated.

Consider a system composed of the following:

  • A single transaction manager holding a unique global version number
  • Transactions
  • Threads
  • Semi-mutable versioned variables managed by the transaction manager
  • Non-versioned transaction-local immutable variables
  • External resources, these can be either read from or written to
  • A garbage collector

The program consists of the execution of transactions in one or more concurrent threads.


Joining transactions

Copyright 2011 © Gabriel Zs. K. Horvath

Up to now all the transactions have been running in independent threads. There hasn’t been any mention of sub-transactions or the possibility of joining threads.

Sub-transactions

Since all variables within a transaction are immutable, it is meaningless to think in terms of sub-transactions which commit changes. Instead we can have multiple threads running concurrently within a single transaction and merge the recorded write operations of the various threads when joining them. Since the threads within the same transaction must still register all their reads and writes independently of the main transaction, they are equivalent to normal transactions which happen to share the same version number as the main transaction. So far my model has no concept of sub-transactions.


Garbage collection


Copyright 2010-2011 © Gabriel Zs. K. Horvath

So far all the versions of the versioned variables have been kept in the variables’ history. This is obviously not a viable option, and an unnecessary one, as most versions are likely to quickly become unreachable. Let me first define what a reachable version is. Consider the following versioned variables:


Contiguous transactions and notifications

Copyright 2009-2011 © Gabriel Zs. K. Horvath

Contiguous transactions

Up to now atomicity has consisted in ensuring that writes to a number of versioned variables are “performed instantly”: there is no observable gap between the moments the variables involved in the transaction are modified. So we have no gap between writes, but what about gaps between transactions? After a transaction has committed, one might need to start a new transaction in the state the previous transaction left the system in. One could start a new transaction straightaway, hoping to catch the system in that state, but of course there is always a risk that another transaction commits in the meantime and modifies the state of the system before our follow-on transaction starts. A contiguous transaction is one which is guaranteed to see the system in the state its parent transaction put it in.

