In this blog posting I'm going to show how to quickly implement a multithreaded server.
A server is an application that accepts connections from clients, processes their requests and sends back replies. A multithreaded server processes each request in a separate thread, so that a request from one client never blocks a request from another client.
First, let's write a simple client so that we can test the server afterwards.
As always, we start with the ØMQ header file and other header files we'll need:
#include <zmq.hpp>
#include <string.h>
Then, in the main routine, we initialise the library. Like most ordinary applications, we use exactly one I/O thread:
zmq::context_t ctx (1);
Now, let's create a socket. The client is a requester (sends requests and receives replies) so uses a REQ socket:
zmq::socket_t s (ctx, ZMQ_REQ);
Once we have the socket we can connect it to the server. For now, let's assume the server will be running on the local machine, listening for new connections on port 5555:
s.connect ("tcp://localhost:5555");
At this point we can send a request to the server. Our server will be a dummy application that doesn't need any data in the request, so let's just send it 10 binary zeroes:
zmq::message_t request (10);
memset (request.data (), 0, request.size ());
s.send (request);
Once the request is sent, we wait for a reply. As the server won't send any useful data in the reply, we'll just ignore it:
zmq::message_t reply;
s.recv (&reply);
That's it. We have the client ready. Now let's write the multithreaded server itself.
First, let's include some header files:
#include <zmq.hpp>
#include <assert.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
Create the main function and initialise the ØMQ library. You'd have to be doing weird stuff to need more than one I/O thread:
zmq::context_t ctx (1);
We create an in-process endpoint for the worker threads to connect to. We'll use an XREQ socket instead of a REQ socket so that processing of one request won't block other requests:
zmq::socket_t workers (ctx, ZMQ_XREQ);
workers.bind ("inproc://workers");
We create a TCP endpoint for client applications to connect to. Recall that when writing the client we assumed the server would be running on the local machine on port 5555. Thus we bind the socket to the loopback interface and port 5555. Also, we'll use an XREP socket instead of a REP socket so that processing of one request won't block other requests:
zmq::socket_t clients (ctx, ZMQ_XREP);
clients.bind ("tcp://lo:5555");
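Why does the XREQ/XREP swap matter? REQ and REP sockets enforce a strict send/receive alternation: a REQ socket won't accept a second request until the reply to the first one has arrived. If the dispatcher talked to the workers through a plain REQ socket, only one request could be in flight across the whole worker pool at any time. The following is a toy, single-threaded model of that rule in plain C++, with no 0MQ calls; `ReqModel` and `XreqModel` are hypothetical names that only mimic the behaviour described here.

```cpp
#include <cassert>
#include <cstddef>

// Toy model of a REQ socket: at most one request may be outstanding.
class ReqModel {
public:
    // Fails (returns false) while the reply to the previous request is pending.
    bool send() {
        if (awaiting_reply_) return false;
        awaiting_reply_ = true;
        return true;
    }
    // Fails if there is no outstanding request to receive a reply for.
    bool recv() {
        if (!awaiting_reply_) return false;
        awaiting_reply_ = false;
        return true;
    }

private:
    bool awaiting_reply_ = false;
};

// Toy model of an XREQ socket: any number of requests may be outstanding.
class XreqModel {
public:
    bool send() {
        ++outstanding_;
        return true;
    }
    // Fails if no request is awaiting a reply.
    bool recv() {
        if (outstanding_ == 0) return false;
        --outstanding_;
        return true;
    }
    std::size_t outstanding() const { return outstanding_; }

private:
    std::size_t outstanding_ = 0;
};
```

For example, a second send() in a row on `ReqModel` fails, while `XreqModel` accepts ten sends before any reply has come back.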
We'll use one dispatcher thread and ten worker threads. At this point, let's launch the worker threads:
for (int i = 0; i != 10; i++) {
    pthread_t worker;
    int rc = pthread_create (&worker, NULL, worker_routine, (void*) &ctx);
    assert (rc == 0);
}
The last thing to do in the main thread is to run a queue device, which will act as a message dispatcher loop, transferring requests from the clients to the worker threads and replies from the workers back to the clients:
zmq::device (ZMQ_QUEUE, clients, workers);
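To make the dispatcher's job concrete, here is a toy, single-threaded model of what the queue device does with each message, assuming 0MQ 2.x behaviour: the XREQ side hands requests to the connected workers in round-robin order, and the XREP side remembers which client connection each request came from so that the reply can be routed back to it. In real 0MQ that identity travels as an opaque part of the message envelope; here it is simplified to an integer, and `Envelope` and `QueueModel` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// A message as seen by the dispatcher: the client tag plus the payload.
struct Envelope {
    int client_id;     // stands in for the connection identity XREP records
    std::string body;
};

// Toy, single-threaded model of the queue device's two directions.
class QueueModel {
public:
    explicit QueueModel(std::size_t workers) : inboxes_(workers) {}

    // Client -> worker: hand the request to the next worker in round-robin
    // order and return that worker's index.
    std::size_t dispatch(const Envelope &request) {
        std::size_t w = next_;
        inboxes_[w].push_back(request);
        next_ = (next_ + 1) % inboxes_.size();
        return w;
    }

    // A worker's reply keeps the tag of the request it answers.
    static Envelope reply_to(const Envelope &request, const std::string &body) {
        return Envelope{request.client_id, body};
    }

    // Worker -> client: deliver the reply to the client named by its tag.
    void route(const Envelope &reply) {
        outboxes_[reply.client_id].push_back(reply.body);
    }

    const std::vector<Envelope> &inbox(std::size_t w) const { return inboxes_[w]; }

    std::vector<std::string> outbox(int client_id) const {
        auto it = outboxes_.find(client_id);
        if (it == outboxes_.end()) return {};
        return it->second;
    }

private:
    std::vector<std::vector<Envelope>> inboxes_;
    std::map<int, std::vector<std::string>> outboxes_;
    std::size_t next_ = 0;
};
```

The tag is why the worker code never has to know who asked: in 0MQ 2.x the worker's REP socket keeps the envelope of the request it is serving and attaches it to the reply, and the XREP socket uses it to pick the right client connection.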
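Now we have the main function ready. What's missing is the body of the worker thread. Note that worker_routine has to be declared before main, because main passes it to pthread_create. We'll create the empty body first, then fill it in: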
void *worker_routine (void *arg)
{
}
The first thing to do is to retrieve the ØMQ context object that the main thread passed to the worker thread as an argument:
zmq::context_t *ctx = (zmq::context_t*) arg;
Then let's open a socket (we are going to reply to requests so we'll use REP socket type) and connect it to the in-process endpoint created by the main thread:
zmq::socket_t s (*ctx, ZMQ_REP);
s.connect ("inproc://workers");
We won't bother with a clean shutdown of the server, so let's just create an infinite loop here. Within the loop we'll receive a request dispatched by the main thread, process it and send the reply back to the requester (via the dispatcher in the main thread). Given that we have no real work to do, let's just sleep for a second while processing each request:
while (true) {
    zmq::message_t request;
    s.recv (&request);
    sleep (1);
    zmq::message_t reply (10);
    memset (reply.data (), 0, reply.size ());
    s.send (reply);
}
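The same architecture can be modelled with standard C++ threads to see why ten workers help: while one worker sleeps on a request, the others keep taking new ones. The sketch below replaces the 0MQ sockets with a hypothetical mutex-protected `BlockingQueue`, and, unlike the server above, it stops its workers with a "poison pill" message (a negative `client_id`) so that the function can return. It illustrates the threading model only and is not a substitute for the 0MQ code.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

struct Message {
    int client_id;  // must be non-negative for real requests
    int payload;
};

// Minimal blocking queue standing in for the pipes between dispatcher and workers.
class BlockingQueue {
public:
    void push(Message m) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(m);
        }
        cond_.notify_one();
    }
    Message pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !items_.empty(); });
        Message m = items_.front();
        items_.pop_front();
        return m;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<Message> items_;
};

// Runs `workers` threads; each takes a request, "works" for `delay`
// (like the sleep (1) above) and posts a reply with the same client_id.
std::vector<Message> serve(const std::vector<Message> &requests,
                           std::size_t workers,
                           std::chrono::milliseconds delay) {
    BlockingQueue in, out;
    std::vector<std::thread> pool;
    for (std::size_t i = 0; i != workers; i++) {
        pool.emplace_back([&in, &out, delay] {
            while (true) {
                Message req = in.pop();
                if (req.client_id < 0) break;  // poison pill: stop this worker
                std::this_thread::sleep_for(delay);
                out.push(Message{req.client_id, req.payload + 1});
            }
        });
    }
    for (const Message &r : requests) in.push(r);
    std::vector<Message> replies;
    for (std::size_t i = 0; i != requests.size(); i++) replies.push_back(out.pop());
    for (std::size_t i = 0; i != workers; i++) in.push(Message{-1, 0});
    for (std::thread &t : pool) t.join();
    return replies;
}
```

With 20 requests, 10 workers and 50 ms of "work" each, serve() should finish in roughly 100 ms rather than the second or so a single thread would need.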
That's it. We have a multi-threaded server and a client to send requests. Here is the source code in compact form:
Enjoy!

I looked at the latest zmq_socket.txt from the git repository but I did not see these socket types defined. Based on your description above they sound like non-blocking variants of REQ and REP. True?
Yes. True.
"Yes. True."
Where are they documented?!? What other types are lurking out there someplace? Googling ZMQ_XREP turns up various copies of this page and an assortment of people reporting that 'ZMQ_XREP' is undefined…
Worst. Documentation. Ever.
They are not documented because they are not yet considered to have stable semantics. However, you can contribute the documentation if you feel like it. I'll be happy to answer your questions on the topic.
I am trying to write something similar to the multithreaded server, with three worker threads. I want to connect each worker thread to three clients but on different sockets, i.e. I have three sources talking to the server and the server responding to each of them using a different worker_function.
Can you please help me with this?
Why not use 3 different sockets?
How does the worker thread know which client it has to send the answer to?
It's automatic. 0MQ sockets handle the routing of answers to the correct clients for you.
Out of sheer interest: Would this work also without the QUEUE device, i.e. implementing one's own scheduling strategy?
Yes, sure. Queue doesn't do much anyway. Have a look at its implementation here:
http://github.com/sustrik/zeromq2/blob/master/src/queue.cpp
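As an example of a strategy of one's own, here is a hypothetical least-busy scheduler: instead of plain round-robin, each request goes to the worker with the fewest requests still in progress. It is a plain C++ sketch of the bookkeeping only. Wiring it into a real dispatcher is assumed, not shown: it would need a way to address individual workers rather than rely on XREQ's round-robin, plus a call to `completed()` whenever a worker's reply passes back through the dispatcher.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical least-busy scheduler: tracks how many requests each worker is
// still processing and picks the least loaded one (lowest index on ties).
class LeastBusyScheduler {
public:
    explicit LeastBusyScheduler(std::size_t workers) : busy_(workers, 0) {}

    // Choose a worker for a new request and count it as busy.
    std::size_t assign() {
        auto it = std::min_element(busy_.begin(), busy_.end());
        ++*it;
        return static_cast<std::size_t>(it - busy_.begin());
    }

    // Call when a worker's reply has been passed back to the client.
    void completed(std::size_t worker) {
        assert(busy_[worker] > 0);
        --busy_[worker];
    }

    std::size_t load(std::size_t worker) const { return busy_[worker]; }

private:
    std::vector<std::size_t> busy_;
};
```

Unlike round-robin, this keeps a slow request from piling further work onto the worker that is stuck on it.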
Can the client get responses asynchronously? I mean without issuing an explicit recv() or polling the socket with zmq_poll(). Something like registering a callback to be invoked when the reply comes back?
What about this:
Not what I had in mind. I need to allow for any number of outstanding requests, not just one, and subscribers as well. I know I can use zmq_poll() and implement this myself, but I was wondering if 0MQ had a more flexible dispatching mechanism than just zmq_poll(). I don't really like that interface at all.
Use an XREQ socket in the client instead of a REQ socket. That allows for any number of outstanding requests. Is that what you had in mind?
No, by "any number of outstanding requests" I meant in an arbitrary number of sockets as well. It's a dispatching issue, not a socket type issue. The same would apply to SUB sockets. It looks like I'll have to stick to zmq_poll() for now.
Thanks anyway!
Each 0MQ socket can handle an arbitrary number of TCP connections in the background, if that's what you mean. For example, you can do the following:
Exactly, but to actually get messages from those sockets, I have to either call recv() on each one, or call zmq_poll() with all three. My original question was whether there was another message dispatch mechanism so I can just pass a message callback and have 0MQ handle things behind the scenes. For example, the way 29West's LBM does it.
We are missing each other somewhere… There is only one socket in the example :|
Matt, I agree that such a thing would be useful. Since it can be built on top of the zmq_poll primitive with a little additional magic, why not submit it as a patch back to the library?
Matt, msustrik and I were chatting about this on IRC. Any chance you could join the channel (freenode, zeromq channel) and provide more details on what you are looking for?
I *think* I know what you want, but I don't want to misrepresent you.
Alternately, post more details here.
Martin, yes, I can see the example has only one socket. My question was more general than the specific example. Sorry if that wasn't clear from the start.
Cremes, yes, as I mentioned above, I know this can be implemented more or less easily on top of zmq_poll(), but that's not the right interface to use as a primitive (IMHO, of course).
I'll give IRC a try later tonight. I won't have any time before then.
Thanks!
Matt,
It's already 10:20 PM here, so I won't be on IRC later on. Feel free to discuss with Chuck though.
What we've figured out so far is that you may be asking for something like this:
s.send (request, callback);
expecting that callback will be invoked once the reply arrives.
The sending thread won't be blocked and will be free to send more requests.
Getting any closer or am I on a wrong track?
Yes, that's exactly what I had in mind, and also extend it to SUB sockets, so I can do something like
s.subscribe("topic", callback, arg)
and have callback(msg, arg) invoked whenever a message arrives on that topic. BTW, this is in the context of porting an LBM-based messaging library to 0MQ. I'm currently using zmq_poll() to do this background dispatching, but I'm not very happy with it.
Thanks!
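The `s.send (request, callback)` idea discussed above isn't part of the 0MQ API, but its bookkeeping is straightforward to layer on top of a socket that allows several outstanding requests. Below is a minimal sketch, assuming a hypothetical `AsyncRequester` class: each request gets a numeric tag, its callback is parked in a table, and the callback fires when a reply with that tag is handed to `on_reply()` by whatever receive loop you run (for instance one built on zmq_poll()). No sockets are involved; the outgoing requests are only recorded, and the assumption is that the server echoes the tag back with its reply.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical requester with per-request callbacks (not a 0MQ class).
class AsyncRequester {
public:
    using Callback = std::function<void(const std::string &)>;

    // Returns at once with the request's tag; the caller is free to send more.
    int send(const std::string &request, Callback cb) {
        int tag = next_tag_++;
        pending_[tag] = std::move(cb);
        // A real implementation would write (tag, request) to an XREQ socket
        // here; the server would have to echo the tag back in its reply.
        sent_.emplace_back(tag, request);
        return tag;
    }

    // To be called by the receive loop for every reply that arrives.
    // Returns false for an unknown or already-answered tag.
    bool on_reply(int tag, const std::string &reply) {
        auto it = pending_.find(tag);
        if (it == pending_.end()) return false;
        Callback cb = std::move(it->second);
        pending_.erase(it);  // erase first so the callback may send again
        cb(reply);
        return true;
    }

    std::size_t outstanding() const { return pending_.size(); }
    const std::vector<std::pair<int, std::string>> &sent() const { return sent_; }

private:
    int next_tag_ = 0;
    std::map<int, Callback> pending_;
    std::vector<std::pair<int, std::string>> sent_;
};
```

Replies may arrive in any order; each one reaches only the callback registered for its tag.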
I also ran into a similar problem using zmq for a pub-sub server.
The simplest way to describe my application: it's a messaging server to which clients connect using normal TCP sockets over the intranet/internet. Its purpose is to exchange messages between them.
I am using boost::asio to implement the TCP server part of the app. Each client connection is tied to a SUB socket with a unique topic, and all clients publish messages to the PUB socket that the SUB sockets are connected to.
Now the tricky part: client connections are managed by asio, which means there is no thread per connection, so I don't have a thread to wait on the SUB socket attached to each connection. So I should use zmq_poll, but it looks complicated to work with when you have 10,000 SUB sockets in the server process. It would be great if there were a wrapper around zmq_poll providing something like what Matt asked for:
s.subscribe("topic", callback, arg)
Kindly advise me in this regard. Did anyone write such a wrapper?
Thank you.
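A wrapper of the kind asked about here could look roughly like the sketch below: a hypothetical `TopicDispatcher` that keeps a table of (prefix, callback, arg) entries and, for every message received, calls each callback whose prefix matches, mirroring the prefix matching a 0MQ SUB socket applies to its subscriptions. It only does the dispatching; receiving the messages (from one SUB socket, or from many watched by zmq_poll()) is left to the caller.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical dispatcher in the spirit of s.subscribe("topic", callback, arg).
class TopicDispatcher {
public:
    using Callback = void (*)(const std::string &msg, void *arg);

    // An empty prefix matches every message, as with an empty 0MQ subscription.
    void subscribe(const std::string &prefix, Callback cb, void *arg) {
        subs_.push_back(Subscription{prefix, cb, arg});
    }

    // Feeds one received message through the subscription table and returns
    // the number of callbacks invoked.
    int dispatch(const std::string &msg) const {
        int calls = 0;
        for (const Subscription &s : subs_) {
            if (msg.compare(0, s.prefix.size(), s.prefix) == 0) {
                s.cb(msg, s.arg);
                ++calls;
            }
        }
        return calls;
    }

private:
    struct Subscription {
        std::string prefix;
        Callback cb;
        void *arg;
    };
    std::vector<Subscription> subs_;
};
```

The callback signature follows the `callback(msg, arg)` form proposed above; a `std::function` would work equally well.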
It would be interesting to demonstrate how to do a proper shutdown of the above server.
I have been scratching my head over how to evolve the above sample to support that, particularly if the shutdown is to be initiated by a request processed by a worker thread.
You can destroy the context. In such case all blocking functions in whatever thread exit with ETERM error. Also, any subsequent attempts to use sockets result in ETERM.
Thanks,
I finally managed to achieve what I wanted, but it wasn't an easy road. I guess it is mainly the C++ binding that got me in trouble.
There is probably a nicer solution but this is what I had to do:
- Add a shutdown method to the context (and make sure it isn't called more than once, otherwise the term method asserts). That way a worker thread can call this method instead of destroying the context, which could still be in use.
- Remove all asserts from zmq::queue, otherwise the program asserts on exit.
- Add a timeout value to the poll call in zmq::queue and exit the loop on an ETERM error code. For some reason, when the timeout is set to -1, zmq::poll sometimes doesn't return even after the context has been closed.
Yes. The devices should exit when ETERM is returned from the poll. This will be done shortly.
As for poll, it should return when the context is terminated. If it does not, it's a bug. Can you please report it to the bug tracker?
Thanks.
Martin
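The effect of destroying the context, as described above, can be mimicked in plain C++ to show why it is a workable shutdown path even when a worker thread initiates it: every thread blocked in a receive call wakes up and gets a "terminated" result instead of a message. In the sketch below, the hypothetical `TerminableQueue::terminate()` plays the role of terminating the context, and a `false` return from `recv()` or `send()` plays the role of ETERM. `terminate()` is safe to call more than once, which sidesteps the double-termination assert mentioned above.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Toy stand-in for a context plus socket (not a 0MQ class).
class TerminableQueue {
public:
    // Returns false once terminated (the analogue of ETERM on later use).
    bool send(const std::string &msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (terminated_) return false;
            items_.push_back(msg);
        }
        cond_.notify_one();
        return true;
    }
    // Blocks until a message arrives or terminate() is called; returns false
    // on termination (any messages still queued are discarded).
    bool recv(std::string &out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return terminated_ || !items_.empty(); });
        if (terminated_) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }
    // Wakes every blocked receiver; safe to call repeatedly and from any
    // thread, e.g. a worker that has just processed a "shutdown" request.
    void terminate() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            terminated_ = true;
        }
        cond_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<std::string> items_;
    bool terminated_ = false;
};
```

A worker loop then becomes `while (q.recv (msg)) { ... }`, and the thread exits cleanly once any worker calls `terminate()`.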
You guys may take a look at our implementation:
http://sysgears.blog.com/2010/12/08/load-balancing-work-between-java-threads-using-zeromq/
We hope it can be found useful.
Hi Max,
I've added the link to the 0MQ blog. You may also announce it on the 0MQ mailing list.
Thanks!
Martin