A multithreaded server in 10 mins

In this blog posting I'm going to show how to quickly implement a multithreaded server.

A server is an application that accepts connections from clients, then processes client requests and sends back replies. A multithreaded server processes each requests in a separate thread so that a request from one client won't ever block a request from another client.

First, let's write a simple client so that we can test the server afterwards.

As always, we start with the ØMQ header file and other header files we'll need:

#include <zmq.hpp>
#include <string.h>

Then, in the main routine we'll initialise the library. Like most normal applications we use exactly one I/O thread:

zmq::context_t ctx (1);

Now, let's create a socket. The client is a requester (sends requests and receives replies) so uses a REQ socket:

zmq::socket_t s (ctx, ZMQ_REQ);

Once we have the socket we can connect it to the server. For now, let's assume the server will be running on the local machine listening for new connection on port 5555:

s.connect ("tcp://localhost:5555");

At this point we can send a request to the server. Our server will be a dummy application so it doesn't need any data in the request, so let's send it 10 binary zeroes:

zmq::message_t request (10);
memset (request.data (), 0, request.size ());
s.send (request);

When the request is sent, we'll wait for a reply. As the server won't send any useful data in the reply we'll just ignore it:

zmq::message_t reply;
s.recv (&reply);

That's it. We have the client ready. Now let's write the multithreaded server itself.

First, let's include some header files:

#include <zmq.hpp>
#include <assert.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

Create the main function and initialise ØMQ library. You have to be doing weird stuff to need more than one I/O thread:

zmq::context_t ctx (1);

We create an in-process endpoint for worker threads to connect to. We'll use XREQ socket instead of REQ so that processing of one request won't block other requests:

zmq::socket_t workers (ctx, ZMQ_XREQ);
workers.bind ("inproc://workers");

We create a TCP endpoint for client applications to connect to. Recall that when writing the client we've made assumption that server will be running on the local machine on port 5555. Thus we bind the socket to the loopback interface and port 5555. Also, we'll use XREP socket instead of REP so that processing of one request won't block other requests:

zmq::socket_t clients (ctx, ZMQ_XREP);
clients.bind ("tcp://lo:5555");

We'll use one dispatcher thread and ten worker threads. At this point, let's launch the worker threads:

for (int i = 0; i != 10; i++) {
    pthread_t worker;
    int rc = pthread_create (&worker, NULL, worker_routine, (void*) &ctx);
    assert (rc == 0);
}

The last thing to do in the main thread is to run a queue device which will act a message dispatcher loop transferring the requests from clients to the worker threads and replies from workers back to the clients:

zmq::device (ZMQ_QUEUE, clients, workers);

Now we have main function ready. What's missing is the body of the worker thread. We'll create the empty body first, then fill it in:

void *worker_routine (void *arg)
{
}

First thing to do is to retrieve ØMQ context object that main thread sent to the worker thread as an argument:

zmq::context_t *ctx = (zmq::context_t*) arg;

Then let's open a socket (we are going to reply to requests so we'll use REP socket type) and connect it to the in-process endpoint created by the main thread:

zmq::socket_t s (*ctx, ZMQ_REP);
s.connect ("inproc://workers");

We won't mess with decent shutdown of the server so let's just create an infinite loop here. Within the loop we'll get a request dispatched by the main thread, process it and send the reply back to the requester (via dispatcher in the main thread). Given that we have no real work to do, let's just sleep for a while when processing a request:

while (true) {
    zmq::message_t request;
    s.recv (&request);

    sleep (1);

    zmq::message_t reply (10);
    memset (reply.data (), 0, reply.size ());
    s.send (reply);
}

That's it. We have a multi-threaded server and a client to send requests. Here is the source code in compact form:

Enjoy!

Comments: 30

Add a New Comment