Search This Blog

Monday 11 February 2013

JSON messages with RabbitMQ

Using a JSON message format with RabbitMQ gives you something that's a bit more friendly in polyglot systems, and leaves your application with more future flexibility. Although we can send serialized Java objects, this puts restrictions on the recipients of your messages, as they're also going to need to be written in Java. JSON (JavaScript Object Notation) is a rather common alternative that is more flexible and portable across different languages and platforms.

In the example below we use the following

RabbitMQ 3.0 - http://www.rabbitmq.com/
JSON-Simple - http://code.google.com/p/json-simple/

1. Create a start up script to start a single RabbitMQ server.

# Port and node name used by this standalone RabbitMQ node.
export RABBITMQ_NODE_PORT=5676
export RABBITMQ_NODENAME=rabbit_standalone

# Location of the unpacked RabbitMQ server distribution.
export RABBIT_HOME=/Users/papicella/vmware/software/rabittMQ/rabbitmq_server-3.0.0

# Start the broker detached from this shell. The path is quoted so that an
# install directory containing spaces does not get split into several words.
"$RABBIT_HOME/sbin/rabbitmq-server" -detached

# List sockets on the node port to check that the broker is listening.
netstat -an | grep "$RABBITMQ_NODE_PORT"


2. Start as shown below.

[Mon Feb 11 21:07:17 papicella@:~/rabbitMQ/standalone-rabbit ] $ sudo ./node1.sh
Warning: PID file not written; -detached was passed.

tcp4       0      0  *.5676                 *.*                    LISTEN    
tcp46      0      0  *.5676                 *.*                    LISTEN   
 

3. Create a RECEIVE client which will create a QUEUE and wait for messages. It expects each message to be a JSON string.

JSONRecv.java
  
package pas.au.rabbitmq30.tutorial.helloworld.json;

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Console receiver for the JSON example.
 *
 * <p>Connects to a RabbitMQ broker on {@code localhost:5676}, declares the
 * {@code json-example} queue and then prints every message it receives,
 * after parsing the message body as a JSON object.
 */
public class JSONRecv
{
    private static final String QUEUE_NAME = "json-example";
    private static final String HOST = "localhost";
    private static final int PORT = 5676;

    /**
     * Charset used to decode message bodies. Named explicitly so decoding does
     * not depend on the platform default charset of the receiving machine.
     */
    private static final String MESSAGE_CHARSET = "UTF-8";

    private final JSONParser parser;

    /** Creates a receiver with its own JSON parser. */
    public JSONRecv()
    {
        parser = new JSONParser();
    }

    /**
     * Connects to the broker and prints each received JSON message until the
     * process is stopped or an error occurs.
     *
     * <p>This method does not return normally: it loops forever waiting for
     * deliveries. If the loop ends with an exception, the connection is closed
     * before the exception is propagated.
     *
     * @throws Exception if connecting, consuming or parsing a message fails,
     *         or if a message is valid JSON but not a JSON object
     */
    public void run() throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);

        Connection connection = factory.newConnection();
        try
        {
            Channel channel = connection.createChannel();

            // Same declaration as the sender: non-durable, non-exclusive,
            // not auto-deleted, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument 'true' requests automatic acknowledgement.
            channel.basicConsume(QUEUE_NAME, true, consumer);

            while (true)
            {
                // Blocks until the next message arrives.
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), MESSAGE_CHARSET);
                JSONObject obj = (JSONObject) parser.parse(message);

                System.out.println(" [x] Received '" + obj.toJSONString() + "'");
            }
        }
        finally
        {
            // Only close a connection that is still open, so a failure caused
            // by the connection shutting down is not masked by a second
            // exception thrown from close().
            if (connection.isOpen())
            {
                connection.close();
            }
        }
    }

    /**
     * Starts the receiver.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the receiver fails; see {@link #run()}
     */
    public static void main(String[] args) throws Exception
    {
        JSONRecv test = new JSONRecv();
        test.run();
    }

}

4. Create a SEND client and place 10 JSON objects onto the QUEUE

JSONSend.java
  
package pas.au.rabbitmq30.tutorial.helloworld.json;

import org.json.simple.JSONObject;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Console sender for the JSON example.
 *
 * <p>Connects to a RabbitMQ broker on {@code localhost:5676}, declares the
 * {@code json-example} queue and publishes ten small JSON objects to it.
 */
public class JSONSend
{
    private static final String QUEUE_NAME = "json-example";
    private static final String HOST = "localhost";
    private static final int PORT = 5676;
    private static final int MESSAGE_COUNT = 10;

    /**
     * Charset used to encode message bodies. Named explicitly so encoding does
     * not depend on the platform default charset of the sending machine.
     */
    private static final String MESSAGE_CHARSET = "UTF-8";

    /** Creates a sender; no state is needed until {@link #run()} is called. */
    public JSONSend()
    {
    }

    /**
     * Connects to the broker, publishes {@code Person1} .. {@code Person10}
     * as JSON objects to the queue, then closes the channel and connection.
     *
     * <p>The connection is closed even when declaring the queue or publishing
     * fails part-way through.
     *
     * @throws Exception if connecting, declaring the queue, publishing or
     *         closing fails
     */
    public void run() throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);

        Connection connection = factory.newConnection();
        try
        {
            // Report success only once the connection has been established.
            System.out.println("connected to rabbitMQ on localhost ...");

            Channel channel = connection.createChannel();

            // Same declaration as the receiver: non-durable, non-exclusive,
            // not auto-deleted, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 1; i <= MESSAGE_COUNT; i++)
            {
                // Serialize once and reuse the text for both the payload and the log line.
                String json = createPerson(i).toJSONString();

                // Empty exchange name: publish using the queue name as routing key.
                channel.basicPublish("", QUEUE_NAME, null, json.getBytes(MESSAGE_CHARSET));
                System.out.println(" [x] Sent '" + json + "'");
            }

            channel.close();
        }
        finally
        {
            // Only close a connection that is still open, so a failure caused
            // by the connection shutting down is not masked by a second
            // exception thrown from close().
            if (connection.isOpen())
            {
                connection.close();
            }
        }
    }

    /**
     * Builds the JSON object sent for one person.
     *
     * @param index number appended to {@code "Person"} to form the name
     * @return a JSON object with a {@code name} and an {@code age} entry
     */
    // JSONObject.put is an unchecked call because json-simple's JSONObject is
    // not generic; the warning is suppressed here only, not for all of run().
    @SuppressWarnings("unchecked")
    private static JSONObject createPerson(int index)
    {
        JSONObject person = new JSONObject();
        person.put("name", String.format("Person%s", index));
        person.put("age", Integer.valueOf(37));
        return person;
    }

    /**
     * Starts the sender.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the sender fails; see {@link #run()}
     */
    public static void main(String[] args) throws Exception
    {
        JSONSend test = new JSONSend();
        test.run();
    }

}

5. Run JSONRecv.java and verify output as follows

[*] Waiting for messages. To exit press CTRL+C

6. Run JSONSend.java and then check the output from JSONRecv.java.

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '{"name":"Person1","age":37}'
 [x] Received '{"name":"Person2","age":37}'
 [x] Received '{"name":"Person3","age":37}'
 [x] Received '{"name":"Person4","age":37}'
 [x] Received '{"name":"Person5","age":37}'
 [x] Received '{"name":"Person6","age":37}'
 [x] Received '{"name":"Person7","age":37}'
 [x] Received '{"name":"Person8","age":37}'
 [x] Received '{"name":"Person9","age":37}'
 [x] Received '{"name":"Person10","age":37}'


For more information on RabbitMQ view the link below.

http://www.rabbitmq.com/


No comments: