A Doubly-MMapped Contiguous Shared-Memory Lock-Free Queue


I realise the title is a bit of a mouthful. It was either that or “No-copy IPC lock-free queue for variable length messages”. Naming is hard.

DISCLAIMER: It’s not uncommon to need to send messages to another process as fast as possible, but in 99% of cases the best answer is boost::interprocess[1]. Even in the last 1%, in production one should obviously be very careful before attempting to re-invent the wheel. However, as the spirit of this blog is to learn and show how the sausage is made, we shall do just that 😉

Here is the code for the people that just want to get to the point.

Intro

It’s easy(ish) to create a ring-based lock-free queue that is Single Producer-Single Consumer[2], but it gets tricky if one also needs variable-size messages. The problem with variable-size messages is that the end of the ring and the beginning of the ring could hold the two halves of a single message, so we’d have to copy it into a contiguous buffer before reading it. This is usually okay, but for a (very) small number of users this is not really an option: needless calls to memcpy should be avoided and messages should be read straight off the queue’s buffer.

There is one way (that I know of) to have a contiguous address space that loops around, and it boils down to mmapping the ring region twice and making sure the two mappings are next to one another. This is what this project accomplishes.

Implementation

To abstract the calls to mmap and friends we use a small RAII class called mapped_memory. This will take care of unmapping things for us when we are done using them. The double mapping magic happens in a factory function that builds our reader and writer queues:

static Derived queue_factory(const std::string& queue_filename,
                             const std::string& control_block_filename) {
  const size_t size = std::filesystem::file_size(queue_filename);
  // Let's reserve the space before we mmap the buffer twice.
  mapped_memory double_mapping(2 * size);
  // Now we do the mapping of the same file twice in the contiguous region
  // we reserved. This will invalidate the previous mapping btw.
  mapped_memory first_mapping(
      queue_filename, size, double_mapping.get_address());
  mapped_memory second_mapping(
      queue_filename, size,
      first_mapping.get_address() + first_mapping.get_length());
  double_mapping.release();
  [...]

Ignoring for the moment the return type Derived, the first step is creating an anonymous mapping which is twice the size of the ring buffer; this is to ensure that we can actually map two regions one next to the other. Right after that we create the two file mappings, passing the same filename and size to both, but crucially passing the address of the end of the first mapping as the starting address of the second.
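For the curious, here is a hedged sketch of what the underlying Linux syscalls look like. The repo wraps them in the mapped_memory RAII class, so the exact flags and the helper below are my assumptions rather than the project’s code:

#include <sys/mman.h>
#include <cstddef>

// Sketch only: reserve 2*size of address space, then map the same file twice
// back to back. MAP_FIXED deliberately replaces the reservation.
char* map_file_twice(int fd, std::size_t size) {
  // 1) Reserve 2*size bytes of contiguous virtual address space.
  void* reserved = mmap(nullptr, 2 * size, PROT_NONE,
                        MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if (reserved == MAP_FAILED) return nullptr;
  // 2) Map the file over the first half of the reservation.
  void* first = mmap(reserved, size, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_FIXED, fd, 0);
  if (first == MAP_FAILED) return nullptr;
  // 3) Map the same file again, right at the end of the first mapping.
  void* second = mmap(static_cast<char*>(reserved) + size, size,
                      PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, fd, 0);
  if (second == MAP_FAILED) return nullptr;
  return static_cast<char*>(first);
}

With this layout, writing a byte at offset i (for i < size) is visible at offset i + size and vice versa, which is exactly what lets a message straddle the “end” of the ring while still being contiguous in virtual memory.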

One notable detail of the above function is that it takes a file path to a “control block”: a file containing the following data structure:

using std::hardware_destructive_interference_size;

struct control_block {
  alignas(hardware_destructive_interference_size) std::atomic<uint64_t> version;
  std::atomic<uint64_t> next_read_offset;
  alignas(hardware_destructive_interference_size) std::atomic<uint64_t> next_write_offset;
};

We note that we use a feature of C++17: std::hardware_destructive_interference_size[3]. This helps us split the two important members of the header, the reader and writer offsets, onto two different cache lines. This is a common technique in lock-free data structures: you don’t want the reader thread/process to invalidate the writer’s cache line while moving its pointer, and vice versa.
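As an aside, some standard libraries shipped this constant rather late; a common hedge, sketched below and not necessarily what the repo does, is to fall back to a conservative value via the feature-test macro:

#include <cstddef>
#include <new>

// Use the standard constant when available, otherwise assume 64 bytes,
// which is typical for x86-64 cache lines.
#if defined(__cpp_lib_hardware_interference_size)
inline constexpr std::size_t cache_line = std::hardware_destructive_interference_size;
#else
inline constexpr std::size_t cache_line = 64;
#endif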

As mentioned, we use our queue_factory function to construct the two “queue” classes: queue_reader and queue_writer. The first one is:

class queue_reader : public detail::queue_base<queue_reader> {
 public:
  const_view get_buffer(size_t bytes_to_read);
  void pop(size_t bytes_to_pop);
  [...]

The reader application has to call get_buffer to attempt to read N bytes; the return value can be an empty buffer if N bytes are not yet available (i.e. the writer is being slow). The returned const_view is a non-owning buffer akin to asio::const_buffer (which could be a drop-in replacement in production), containing a pointer and a size member. Its interface is:

struct const_view {
  explicit operator bool() const { return _ptr; }
  size_t size() const { return _len; }
  const char* data() const { return _ptr; }
  [...]

The buffer overloads operator bool, so it’s easy to check whether the call to get_buffer was successful.

Notably, the user is handed a pointer to the internal buffer, so no copy is made. Once the user has read the message, they have to call pop to advance the internal reader pointer; this notifies the writer that some space has just been freed up.
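To make the wrap-around trick concrete, here is a hypothetical sketch of a reader built on top of the control block and the double mapping; the field names and memory orderings below are assumptions, not the repo’s actual implementation:

#include <atomic>
#include <cstddef>
#include <cstdint>

// Sketch: offsets grow monotonically and are reduced modulo the ring size
// only when indexing into the buffer.
struct reader_sketch {
  std::atomic<uint64_t>* next_read_offset;   // lives in the control block
  std::atomic<uint64_t>* next_write_offset;  // lives in the control block
  const char* buffer;                        // start of the first mapping
  std::size_t size;                          // size of ONE mapping

  const char* get_buffer(std::size_t bytes_to_read) const {
    const uint64_t read = next_read_offset->load(std::memory_order_relaxed);
    const uint64_t write = next_write_offset->load(std::memory_order_acquire);
    if (write - read < bytes_to_read) return nullptr;  // writer not done yet
    // (read % size) may be close to the end of the first mapping, but the
    // second mapping makes the requested span contiguous in virtual memory.
    return buffer + (read % size);
  }

  void pop(std::size_t bytes_to_pop) {
    // Publish the new read offset so the writer sees the freed-up space.
    next_read_offset->fetch_add(bytes_to_pop, std::memory_order_release);
  }
};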

Similarly the writer class is:

class queue_writer : public detail::queue_base<queue_writer> {
 public:
  mutable_view get_buffer(size_t bytes_to_write);
  void push(size_t bytes_to_push);
  [...]

The only other noteworthy detail is the conspicuous CRTP[4]: it is just there as a trick to inherit the factory function we described above, which can then use the Derived template parameter as its return type.
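For context, a minimal sketch of how that trick can look (the names below are illustrative, not lifted from the repo):

#include <string>

namespace detail {
// The factory is written once in the base template, but returns whichever
// derived type inherits it.
template <typename Derived>
class queue_base {
 public:
  static Derived queue_factory(const std::string& queue_filename,
                               const std::string& control_block_filename) {
    // ... the shared double-mapping and control-block setup goes here ...
    return Derived{};
  }
};
}  // namespace detail

class queue_reader : public detail::queue_base<queue_reader> { /* ... */ };
class queue_writer : public detail::queue_base<queue_writer> { /* ... */ };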

Examples

There are two apps in the repo: a reader and a writer. Starting with the writer, after a bit of boilerplate, we can find

queue_writer writer =
    queue_writer::queue_factory(queue_filepath, control_block_file);
const std::string message("message");
const size_t total_message_size = sizeof(message_header) + message.size();

queue_writer::mutable_view buffer = writer.get_buffer(total_message_size);
if (buffer) {
  const uint64_t tsc = __rdtsc();
  message_header header{1, static_cast<uint32_t>(message.size()), tsc};
  std::memcpy(buffer.data(), &header, sizeof(header));
  std::memcpy(buffer.data() + sizeof(header), message.data(), message.size());
  writer.push(total_message_size);
  [...]

In the first line we set up the writer and hard-code a message to send. In a real scenario the message could be of any length and would obviously not be the same every time.

The following lines mention a header. This is a customizable struct that is used by the reader to make sense of the incoming message. In our example the header is:

#pragma pack(push, 1)
struct message_header {
  uint32_t version;
  uint32_t size;
  uint64_t timestamp;
};
#pragma pack(pop)

We notice right away that the struct is packed, even though the members are carefully ordered so as not to leave any padding anyway. What we actually want to achieve with packing is changing its alignment[6] to 1. This is because we need to read it straight off the buffer via reinterpret_cast, which is already technically UB (at least until we get std::bless/std::start_lifetime_as[7][8]); on top of that, if the struct kept its natural alignment the compiler could assume aligned addresses, and a misaligned read could lead to crashes.
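For readers worried about the reinterpret_cast, a standard-compliant alternative (not what the example apps do, and it costs a copy of the header only) is to memcpy the header out of the buffer; compilers typically turn this small fixed-size copy into a couple of register loads. The helper below is illustrative:

#include <cstring>

// Copy the (small, packed) header out of the raw buffer instead of casting.
message_header read_header(const char* buffer) {
  message_header header;
  std::memcpy(&header, buffer, sizeof(header));
  return header;
}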

After that we just call get_buffer and memcpy into the buffer, followed by a push of the written bytes.

The reader is symmetric as expected:

const queue_reader::const_view header_buffer =
    reader.get_buffer(sizeof(message_header));
if (header_buffer) {
  const message_header* const header =
      reinterpret_cast<const message_header*>(header_buffer.data());
  const queue_reader::const_view message_buffer = reader.get_buffer(header->size);
  std::string_view read_message(message_buffer.data(), header->size);
  Logger::Info("Read:", read_message, ", timestamp", header->timestamp,
               ", delta:", delta_tsc);
  reader.pop(header_buffer.size() + message_buffer.size());
  [...]

In the above you can see that we first reinterpret_cast the header; from it we know how long the incoming message is, so we can call get_buffer again.

Performance Analysis

In the reader’s and writer’s code you can find a timer and a vector that stores the deltas of TSC[9] between sending and receiving for a million messages. We can run the reader/writer with

taskset -c 6 ./apps/reader --num_messages 1000000
taskset -c 5 ./apps/writer

We are greeted with the output:

Receiving 1000000 messages took: 59490ms
INFO: Average: 110.163
INFO: st. dev: 1534.65
INFO: 99th percentile: 238

The total time it took is irrelevant since we are artificially stalling the writer after every message to simulate “sparse events”. Feel free to modify the code of the writer to send messages in a loop and measure throughput instead.

Then we can use an included gnuplot script (see the “plot” directory) to produce the following histograms of TSC deltas:

The TSC deltas in the graph are in “cycles”, and we notice that the distribution is somewhat discrete. This kind of spectrum reflects the different kinds of “cache misses” our application is experiencing. Regardless, we achieved an impressive speed: considering that my test machine has an invariant TSC of about 1696 MHz, we are looking at latencies hovering around 60 nanoseconds here!
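For reference, the back-of-the-envelope conversion from cycles to wall-clock time goes like this (the helper below is illustrative and not part of the repo):

// Convert TSC cycles to nanoseconds, assuming the ~1696 MHz invariant TSC
// of the test machine.
double tsc_cycles_to_ns(double cycles, double tsc_hz = 1.696e9) {
  return cycles / tsc_hz * 1e9;
}
// tsc_cycles_to_ns(110.163) ≈ 65 ns   (average)
// tsc_cycles_to_ns(238)     ≈ 140 ns  (99th percentile)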

Lots more could be done to investigate the performance of this queue, and perhaps we will in a follow-up post. To name a few things we should definitely do:

  • Run with “perf stat”. Perf can quickly tell us how many cache misses we have, how many branch mispredictions, and whether the CPU is stalled waiting for instructions or for data (here almost certainly the latter).
  • Check the generated assembly. It could be fun to see how the code gets optimised on the hot path on an x86 platform.
  • Plug the code into a more realistic application and measure performance again. There is only so much information that micro-benchmarking can give you; you need to profile in a real environment.

That’s it for the moment. As usual, if you have any comments and/or corrections and/or suggestion please do let me know 😉

References

[1] https://www.boost.org/doc/libs/1_63_0/doc/html/interprocess.html

[2] https://github.com/facebook/folly/blob/main/folly/ProducerConsumerQueue.h

[3] https://en.cppreference.com/w/cpp/thread/hardware_destructive_interference_size

[4] https://en.wikipedia.org/wiki/Curiously_recurring_template_pattern

[5] https://www.oreilly.com/library/view/97-things-every/9780596809515/ch55.html

[6] https://en.wikipedia.org/wiki/Data_structure_alignment

[7] https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p0593r4.html#166x-implicit-object-creation-ptrbless

[8] https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2590r0.pdf

[9] https://en.wikipedia.org/wiki/Time_Stamp_Counter
