Reference manual
The ØMQ distribution comes with a detailed reference manual in both UNIX manual page and HTML format.
The reference manual for the latest release can also be viewed on-line.
Sample code
A simple client/server application
This example implements a simple "SQL server" following the client/server model. The server listens for requests using unicast TCP. Each request is a simple NULL-terminated ASCII string. Upon receiving a request, the server sends back a canned response of OK, also as a NULL-terminated string.
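The message framing described above (an ASCII string followed by a terminating zero byte) can be sketched without any ØMQ binding. The following Python 3 snippet is only an illustration of that framing; the helper names encode_request and decode_request are hypothetical and are not part of ØMQ.

```python
def encode_request(text):
    """Return text as ASCII bytes followed by a single terminating zero byte."""
    return text.encode("ascii") + b"\x00"


def decode_request(payload):
    """Return the text that precedes the first zero byte in payload."""
    end = payload.find(b"\x00")
    if end < 0:
        raise ValueError("payload is not NULL-terminated")
    return payload[:end].decode("ascii")


if __name__ == "__main__":
    wire = encode_request("SELECT * FROM mytable")
    print(len(wire))              # length of the query plus one terminator byte
    print(decode_request(wire))   # the original query text
```

The C and C++ programs in this example produce the same layout by copying strlen (...) + 1 bytes into the message, so the terminator travels with the text.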
The server application, using C bindings:
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main ()
{
    int rc;
    void *ctx, *s;
    zmq_msg_t query, resultset;
    const char *query_string, *resultset_string = "OK";

    /* Initialise 0MQ context, requesting a single application thread
       and a single I/O thread */
    ctx = zmq_init (1, 1, 0);
    assert (ctx);

    /* Create a ZMQ_REP socket to receive requests and send replies */
    s = zmq_socket (ctx, ZMQ_REP);
    assert (s);

    /* Bind to the TCP transport and port 5555 on the 'lo' interface */
    rc = zmq_bind (s, "tcp://lo:5555");
    assert (rc == 0);

    while (1) {
        /* Allocate an empty message to receive a query into */
        rc = zmq_msg_init (&query);
        assert (rc == 0);

        /* Receive a message, blocks until one is available */
        rc = zmq_recv (s, &query, 0);
        assert (rc == 0);

        /* Process the query */
        query_string = (const char *)zmq_msg_data (&query);
        printf ("Received query: '%s'\n", query_string);
        zmq_msg_close (&query);

        /* Allocate a response message and fill in an example response */
        rc = zmq_msg_init_size (&resultset, strlen (resultset_string) + 1);
        assert (rc == 0);
        memcpy (zmq_msg_data (&resultset), resultset_string,
            strlen (resultset_string) + 1);

        /* Send back our canned response */
        rc = zmq_send (s, &resultset, 0);
        assert (rc == 0);
        zmq_msg_close (&resultset);
    }
}
The client application, using C++ bindings:
#include <stdio.h>
#include <string.h>
#include <zmq.hpp>

int main ()
{
    try {
        // Initialise 0MQ context with one application and one I/O thread
        zmq::context_t ctx (1, 1);

        // Create a ZMQ_REQ socket to send requests and receive replies
        zmq::socket_t s (ctx, ZMQ_REQ);

        // Connect it to port 5555 on localhost using the TCP transport
        s.connect ("tcp://localhost:5555");

        // Construct an example zmq::message_t with our query
        // (strlen and memcpy come from <string.h>)
        const char *query_string = "SELECT * FROM mytable";
        zmq::message_t query (strlen (query_string) + 1);
        memcpy (query.data (), query_string, strlen (query_string) + 1);

        // Send the query
        s.send (query);

        // Receive and display the result
        zmq::message_t resultset;
        s.recv (&resultset);
        const char *resultset_string = (const char *)resultset.data ();
        printf ("Received response: '%s'\n", resultset_string);
    }
    catch (std::exception &e) {
        // 0MQ throws standard exceptions just like any other C++ API
        printf ("An error occurred: %s\n", e.what());
        return 1;
    }

    return 0;
}
To build this example, ensure that you have ØMQ built with --with-c and --with-cpp to include the C and C++ bindings, and run the following:
gcc -o server server.c `pkg-config --cflags --libs libzmq`
g++ -o client client.cpp `pkg-config --cflags --libs libzmq`
You can then run ./server and ./client.
Publish/subscribe example
This example implements a publisher that publishes a continuous stream of 8-byte messages, each containing a message ID as a long long in big-endian byte order. We use the native operations of each language to serialize the long long value into a byte buffer.
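The wire format described above (one signed 64-bit integer in big-endian byte order per message) can be checked on its own before involving any sockets. The following Python 3 sketch uses only the standard struct module; encode_msg_id and decode_msg_id are hypothetical helper names chosen for illustration.

```python
from struct import pack, unpack


def encode_msg_id(msg_id):
    """Pack msg_id as a signed 64-bit integer in big-endian byte order."""
    return pack(">q", msg_id)


def decode_msg_id(payload):
    """Unpack a signed 64-bit big-endian integer from an 8-byte payload."""
    return unpack(">q", payload)[0]


if __name__ == "__main__":
    wire = encode_msg_id(10000)
    print(len(wire))             # every message is exactly 8 bytes long
    print(decode_msg_id(wire))   # round-trips back to the original ID
```

The '>q' format string is the same one the Python subscriber in this example uses, and Java's ByteBuffer writes in big-endian order by default, so both ends of the example agree on the layout.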
The publisher application, using Java bindings:
import java.nio.*;
import org.zmq.*;

class publisherApp
{
    public static void main (String [] args)
    {
        // Initialise 0MQ with a single application and I/O thread
        org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);

        // Create a PUB socket for port 5555 on the lo interface
        org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB);
        s.bind ("tcp://lo:5555");

        for (long msg_id = 1; ; msg_id++) {
            // Create a new, empty, 8 byte message
            byte msg [] = new byte [8];

            // Fill it with the current message ID
            ByteBuffer bb = ByteBuffer.wrap (msg);
            bb.putLong (msg_id);

            // Publish our message
            s.send (msg, 0);
        }
    }
}
The subscriber application, using Python bindings:
import libpyzmq
from struct import unpack

def main ():
    # Initialise 0MQ with a single application and I/O thread
    ctx = libpyzmq.Context (1, 1)

    # Create a SUB socket ...
    s = libpyzmq.Socket (ctx, libpyzmq.SUB)

    # ... ask to subscribe to all messages ...
    s.setsockopt (libpyzmq.SUBSCRIBE , "")

    # ... request the tcp transport with the endpoint localhost:5555
    s.connect ("tcp://localhost:5555")

    while True:
        # Receive a message
        msg = s.recv ()

        # Unpack the msg_id from it
        msg_id = unpack ('>q', msg)[0]

        # Show progress every 10k messages
        if (msg_id % 10000) == 0:
            print msg_id

if __name__ == "__main__":
    main ()
To build this example, ensure that you have ØMQ built with --with-java and --with-python to include the Java and Python bindings, and run the following to build the Java publisher:
javac -cp .../zeromq/bindings/java/Zmq.jar ./publisherApp.java
Replace …/zeromq with the path to your ØMQ source tree.
You can then run the Java publisher as follows (again, replacing …/zeromq as appropriate):
java -Djava.library.path=/usr/local/lib -cp .../zeromq/bindings/java/Zmq.jar:. publisherApp
And the Python subscriber as follows:
python ./subscriber.py
Chat example
A simple example for newcomers: an instant messaging application. Get the code here.
Camera example
This example demonstrates a more complex ØMQ application using multiple threads and transports. To get the code, see the git tree.
Cookbook
Message subscriptions
A message that receivers are going to subscribe to must start with a NULL-terminated string (the topic). To send the message ABCDE with the topic xy.z, do the following:
zmq::message_t msg (10);
memcpy (msg.data (), "xy.z\x00" "ABCDE", 10);
Subscribing to all messages:
s.setsockopt (ZMQ_SUBSCRIBE, "", 0);
Subscribing to a particular topic:
s.setsockopt (ZMQ_SUBSCRIBE, "xy.z\x00", 5);
Subscribing to topics with a specified prefix:
s.setsockopt (ZMQ_SUBSCRIBE, "xy.", 3);
Multiple subscriptions can be active at the same time. In this case, any message matching at least one subscription will be passed to the user:
s.setsockopt (ZMQ_SUBSCRIBE, "x.", 2);
s.setsockopt (ZMQ_SUBSCRIBE, "y.", 2);
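The matching rule described above (each subscription is a byte prefix, and a message is passed on if it matches at least one active subscription) can be modelled in a few lines. This Python 3 sketch only illustrates the rule as this section states it; it is not how ØMQ implements filtering, and make_message and is_delivered are hypothetical helpers.

```python
def make_message(topic, body):
    """Build a message: the topic, a zero byte, then the body."""
    return topic + b"\x00" + body


def is_delivered(message, subscriptions):
    """Return True if message starts with at least one subscription prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


if __name__ == "__main__":
    msg = make_message(b"xy.z", b"ABCDE")
    print(is_delivered(msg, [b""]))            # empty prefix: all messages
    print(is_delivered(msg, [b"xy.z\x00"]))    # one particular topic
    print(is_delivered(msg, [b"xy."]))         # any topic starting with "xy."
    print(is_delivered(msg, [b"x.", b"y."]))   # neither prefix matches "xy.z"
```

Including the terminating zero byte in the subscription is what separates an exact topic match from a prefix match: "xy.z" followed by the zero byte matches only the topic xy.z, while "xy.z" alone would also match a topic such as xy.zz.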
Sending a message using multicast
// Initialise 0mq context with a single application and single I/O thread
zmq::context_t ctx (1, 1);

// Create a ZMQ_PUB socket
zmq::socket_t s (ctx, ZMQ_PUB);

// Limit the throughput rate to 10Mb/s
int64_t rate = 10000;
s.setsockopt (ZMQ_RATE, &rate, sizeof (rate));

// Connect our socket to the multicast group 224.0.0.1:5555 on the eth0 interface
s.connect ("udp://eth0;224.0.0.1:5555");

// Send a message
zmq::message_t msg (10);
memcpy (msg.data (), "xy.z\x00" "ABCDE", 10);
s.send (msg);
Receiving a message using multicast
// Initialise 0mq context with a single application and single I/O thread
zmq::context_t ctx (1, 1);

// Create a ZMQ_SUB socket and connect it to the multicast group 224.0.0.1:5555 on eth0
zmq::socket_t s (ctx, ZMQ_SUB);
s.connect ("udp://eth0;224.0.0.1:5555");

// Subscribe to all messages
s.setsockopt (ZMQ_SUBSCRIBE, "", 0);

// Receive a single message
zmq::message_t msg;
s.recv (&msg);
Using zmq_forwarder to set up a forwarding device
Create the following configuration file, myconfig.cfg. The forwarder will connect to IP address 192.168.0.1 and port 5555 to get the message feed. It will republish the feed to all applications that connect to its interface eth0, port 5556:
<forwarder>
    <in>
        <connect addr = "tcp://192.168.0.1:5555"/>
    </in>
    <out>
        <bind addr = "tcp://eth0:5556"/>
    </out>
</forwarder>
To start the forwarder:
zmq_forwarder ./myconfig.cfg
Establishing inter-thread communication
The following example shows how to establish a connection between a worker thread and the main thread using the inproc transport. The messaging pattern used (publish/subscribe) is arbitrary and can be replaced by a messaging pattern of your choice.
void *worker_routine (void *arg)
{
    // Connect to the inproc endpoint created by the main thread.
    zmq::context_t *ctx = (zmq::context_t*) arg;
    zmq::socket_t sw (*ctx, ZMQ_PUB);
    sw.connect ("inproc://pipe");
    ...
}

int main ()
{
    zmq::context_t ctx (2, 1);

    // Create inproc endpoint called 'pipe'.
    zmq::socket_t sm (ctx, ZMQ_SUB);
    sm.setsockopt (ZMQ_SUBSCRIBE, "", 0);
    sm.bind ("inproc://pipe");

    // Start the worker thread.
    pthread_t worker;
    int rc = pthread_create (&worker, NULL, worker_routine, (void*) &ctx);
    assert (rc == 0);
    ...
}
