Zero-copy and Multi-part Messages

In high performance networking copying data is considered harmful to performance and avoided as much as possible. The technique of avoiding all the copies is known as "zero-copy".

This article demonstrates the impact of single copy of the data on latency. It shows, for example, that for 256MB of data, single copy can increase latency by 0.1 second!

Obviously, data are copied from memory to network interface card and vice versa, they are copied on user space/kernel space boundary etc. This article in Linux Journal gives detailed explanation of what's going on under the hood of the operating system and what are the ways to get as close to the zero-copy as possible.

However, in this blog we are going to discuss only a single instance of copying the data, namely copying user data into ØMQ messages.

Consider the following example. We'll create a message million bytes long and copy the user data into it before sending:

zmq_msg_t msg;
zmq_msg_init_size (&msg, 1000000);
memcpy (zmq_msg_data (&msg), buffer, 1000000);
zmq_send (s, &msg, 0);

The memcpy part looks suspicious. We have the data in the buffer already so why not send the buffer itself instead of copying it to the message? Is ØMQ capable of such thing?

Actually, yes. It is and it has always been. All we have to do is to define deallocation function for the buffer and pass it to ØMQ along with the buffer:

void my_free (void *data, void *hint)
{
    //  We've allocated the buffer using malloc and
    //  at this point we deallocate it using free.
    free (data);
}

Once the deallocation function is defined we can create a "zero-copy" message and pass it the buffer and deallocation function:

zmq_msg_t msg;
void *hint = NULL;
zmq_msg_init_data (&msg, buffer, 1000000, my_free, hint);
zmq_send (s, &msg, 0);

Note that the buffer is now owned by the message. It will be deallocated once the message is sent. We must not deallocate the buffer ourselves!

Also note the hint parameter. It can be used if more complex allocation mechanism is used. Say we allocated the chunk using some "allocator" object and we have to deallocate it via the same object. In such case we can pass the pointer to allocator as a hint to zmq_msg_init_data and modify the deallocation function as follows:

void my_free (void *data, void *hint)
{
    ((allocator_t*) hint)->free (data);
}

We've got rid of the copying, right?

Well, not entirely. In some cases the above may work. In other cases it is insufficient.

Consider the case when we have two large matrices — each 100MB long — which we want to transfer. Unfortunately they are not contiguous in the memory. Each was allocated using separate malloc invocation and thus we cannot describe both using single data pointer.

Why not send them as two separate messages then? Consider say REQ socket. It load balances messages. In other words, if there are two REP sockets connected to it, sending two messages would result in first matrix being dispatched to one REP socket while the second to the other REP socket. This is not what we want. We want the two matrices to form an atomic unit of transfer. They should never be split apart.

It seems that in this case we need something equivalent to POSIX gather arrays. For those unfamiliar with Berkeley socket API, gather array is an array of data chunks that's sent to the networking stack using a single call.

But would that account for all possible scenarios?

There's still a scenario where it won't help. namely, when the two matrices don't exist at the same time. First one is created, sent and deallocated, then second one. In such case the gather array would be of no use. There's no single point in time when we own all the data and thus are able to fill in the gather array.

The new feature in ØMQ called "multi-part message" solves the problem. To put it simply, it allows you to concatenate multiple messages into a single message:

zmq_msg_t msg1;
zmq_msg_init_data (&msg1, matrix1, matrix1_size, my_free, NULL);
zmq_send (s, &msg, ZMQ_SNDMORE);
zmq_msg_t msg2;
zmq_msg_init_data (&msg2, matrix2, matrix2_size, my_free, NULL);
zmq_send (s, &msg, 0);

It looks almost exactly as if you were sending two separate messages except for passing ZMQ_SNDMORE flag to the first send. The flag says: "Hold on! There are more data going to be added to this message!"

The important point to note is that although all parts of the message are treated as a single atomic unit of transfer, the boundaries between message parts are strictly preserved. In other words, if you send a message consisting of two message parts, each 100 bytes long, on the other side you'll never receive a single message part 200 bytes long. Or two message parts, 50 and 150 bytes long. Or even four message parts, each 50 bytes long. You'll get exactly what you've sent — two message parts, each 100 bytes long in the same order as they were sent.

This fact allows for using multi-part messages for adding coarse-grained structure to your message. The example with two matrices illustrates the point. You send the two matrices as two message parts and thus avoid the copy. However, at the same time the matrices are cleanly separated, each residing in its own message part and you are guaranteed that the separation will be preserved even on the receiving side. Consequently you don't have to put matrix size into the message or invent any kind of "matrix delimiters".

Another interesting use of multi-part messages is to combine them with PUB/SUB sockets. Publish/subscribe messaging pattern allows for subscribing for particular subset of messages. Subscription is a chunk of data supplied by receiver, saying "please, send me all the messages beginning with these data":

zmq_setsockopt (s, ZMQ_SUBSCRIBE, "ABC", 3);

Obviously, sender has to place the appropriate data at the beginning of the message to make it delivered to the specific subscriber:

zmq_msg_t msg;
zmq_msg_init_size (&msg, 6);
memcpy (zmq_msg_data (&msg), "ABCxyz", 6);

The part of the message that is checked against the subscriptions is called topic. In our case the topic is "ABC".

When the topic is of variable length you need a delimiter to separate is from the rest of the message, so that subscription mechanism doesn't incidentally consider beginning of the data to be a continuation of the topic. Following example uses "pipe" symbol as delimiter:

zmq_msg_t msg;
zmq_msg_init_size (&msg, 7);
memcpy (zmq_msg_data (&msg), "ABC|xyz", 7);

While this works, it's a bit ugly. Even more importantly, if the topic happens to be binary data, there's no spare symbol we can use as the delimiter.

Elegant solution is to use a two-part message. Subscriptions are always evaluated only against the first message part, so we can place the topic into the first message part while the rest of the data into the second one (or even into several subsequent message parts):

zmq_msg_t topic;
zmq_msg_init_size (&topic, 3);
memcpy (zmq_msg_data (&topic, "ABC", 3);
zmq_send (s, &topic, ZMQ_SNDMORE);
zmq_msg_t value;
zmq_msg_init_size (&value, 3);
memcpy (zmq_msg_data (&value, "xyz", 3);
zmq_send (s, &value, 0);

One final remark. When receiving a message you may know that each message consists of two parts, say "topic" and "value". However, in other scenarios you may have no idea how many message parts there are in the message. In such case ØMQ allows you to ask the socket whether there are more message parts to be received or not. This is done using ZMQ_RCVMORE socket option:

zmq_recv (s, &msg, 0);
int64_t more;
size_t more_size = sizeof (more);
zmq_getsockopt (s, ZMQ_RCVMORE, &more, &more_size);
if (more) ...