Thursday, September 19, 2013

Simple JMS Topic Example

I was recently dealing with some JMS issues, and to investigate them I developed a simple utility to send/receive messages using ActiveMQ. Although it is a trivial thing to do, I thought I would share it here in case someone finds it useful.

You can find the complete code here and import it into Eclipse.


Here is a simple topic producer:
package com.test;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publishes a single non-persistent text message to a JMS topic.
 *
 * <p>Each call to {@link #run()} opens its own connection to the broker at
 * {@code brokerUrl}, sends one {@link TextMessage} to {@code topicName} and
 * closes the connection again, whether or not the send succeeded.
 */
public class SimpleTopicProducer implements Runnable
{
    private final String brokerUrl;
    // Kept public so that existing code reading this field keeps compiling.
    public final String topicName;

    /**
     * @param brokerUrl URL handed to {@link ActiveMQConnectionFactory}
     * @param topicName name of the topic the message is published to
     */
    public SimpleTopicProducer(String brokerUrl, String topicName)
    {
        this.brokerUrl = brokerUrl;
        this.topicName = topicName;
    }

    /**
     * Connects to the broker, sends one text message and disconnects.
     * Any failure is printed to standard output; nothing is rethrown.
     */
    @Override
    public void run()
    {
        Connection connection = null;
        try
        {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

            // Create a Connection
            connection = connectionFactory.createConnection();
            connection.start();

            // Create a non-transacted, auto-acknowledging Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (a Topic)
            Destination destination = session.createTopic(topicName);

            // Create a MessageProducer from the Session to the Topic
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Create a message
            String text = "SimpleTopicProducer - From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);

            // Send first, report afterwards, so a failed send is never logged as sent
            producer.send(message);
            System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
        }
        catch (Exception e)
        {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
        finally
        {
            // Closing the connection also closes the session and producer created from it,
            // so this single call releases everything even when an earlier step failed.
            if (connection != null)
            {
                try
                {
                    connection.close();
                }
                catch (Exception e)
                {
                    System.out.println("Caught: " + e);
                    e.printStackTrace();
                }
            }
        }
    }
}

Here is a simple topic consumer:
package com.test;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Subscribes to a JMS topic and prints every message received until a fixed
 * lifetime has elapsed or the connection reports a failure.
 *
 * <p>Each call to {@link #run()} opens its own connection to the broker at
 * {@code brokerUrl} and closes it again before returning, whether the loop
 * ended normally or with an exception.
 */
public class SimpleTopicConsumer implements Runnable, ExceptionListener
{
    /** Longest time a single receive call waits before the lifetime is re-checked. */
    private static final long RECEIVE_TIMEOUT_MS = 1000;

    private final String brokerUrl;
    private final String topicName;
    private final int lifetime;

    // Set from onException(), which runs on a different thread than run(); hence volatile.
    private volatile boolean connectionFailed;

    /**
     * @param brokerUrl URL handed to {@link ActiveMQConnectionFactory}
     * @param topicName name of the topic to subscribe to
     * @param lifetime  how long to keep consuming, in milliseconds
     */
    public SimpleTopicConsumer(String brokerUrl, String topicName, int lifetime)
    {
        this.brokerUrl = brokerUrl;
        this.topicName = topicName;
        this.lifetime = lifetime;
    }

    /**
     * Consumes and prints messages from the topic until the lifetime is over
     * or {@link #onException(JMSException)} has been called.
     * Any failure is printed to standard output; nothing is rethrown.
     */
    @Override
    public void run()
    {
        Connection connection = null;
        try
        {
            System.out.println("SimpleTopicConsumer, started on " + topicName);

            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

            // Create a Connection; register the listener before starting delivery
            connection = connectionFactory.createConnection();
            connection.setExceptionListener(this);
            connection.start();

            // Create a non-transacted, auto-acknowledging Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (a Topic)
            Destination destination = session.createTopic(topicName);

            // Create a MessageConsumer from the Session to the Topic
            MessageConsumer consumer = session.createConsumer(destination);

            long deadline = System.currentTimeMillis() + lifetime;

            while (!connectionFailed)
            {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                {
                    System.out.println("Time's up, exiting...");
                    break;
                }

                // Wait for a message, but never past the deadline. The timeout is always
                // positive here, which matters because receive(0) would block indefinitely.
                Message message = consumer.receive(Math.min(remaining, RECEIVE_TIMEOUT_MS));

                if (message == null)
                {
                    continue;
                }

                if (message instanceof TextMessage)
                {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("SimpleTopicConsumer - Received (text): " + text);
                }
                else
                {
                    System.out.println("SimpleTopicConsumer - Received: " + message);
                }
            }
        }
        catch (Exception e)
        {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
        finally
        {
            // Closing the connection also closes the session and consumer created from it,
            // so this single call releases everything even when an earlier step failed.
            if (connection != null)
            {
                try
                {
                    connection.close();
                }
                catch (Exception e)
                {
                    System.out.println("Caught: " + e);
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Called when the connection reports a problem. Prints the exception and
     * asks the receive loop in {@link #run()} to stop.
     *
     * @param ex the problem reported for the connection
     */
    public synchronized void onException(JMSException ex)
    {
        System.out.println("JMS Exception occured.  Shutting down client.");
        System.out.println("Caught: " + ex);
        connectionFailed = true;
    }
}

This is the main program that puts everything together:
package com.test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Entry point that starts one {@link SimpleTopicConsumer} per topic and,
 * when {@code START_PRODUCERS} is enabled, one {@link SimpleTopicProducer}
 * per topic as well.
 *
 * <p>Topic names are taken from the command line; without arguments the
 * default set in {@code TOPICS} is used.
 */
public class SimpleJmsApp
{
    private static final String BROKER_URL = "tcp://localhost:61616?jms.prefetchPolicy.all=1000";
    private static final int CONSUME_LIFE_TIME_IN_MS = 3600 * 1000;
    // Flip to true to also publish one message per topic after the consumers are up.
    private static final boolean START_PRODUCERS = false;
    // Default topics; never modified, main() works on its own set.
    private static final Set<String> TOPICS = new HashSet<>(Arrays.asList("test_topic"));

    /**
     * Starts the consumers (and optionally the producers) on their own threads.
     *
     * @param args topic names; when empty, the default topics are used
     * @throws Exception if the pause before starting the producers is interrupted
     */
    public static void main(String[] args) throws Exception
    {
        Set<String> topics;
        if (args.length > 0)
        {
            topics = new HashSet<>(Arrays.asList(args));
        }
        else
        {
            topics = TOPICS;
        }

        System.out.println("Now starting consumers...");
        for (String topic : topics)
        {
            SimpleTopicConsumer consumer = new SimpleTopicConsumer(BROKER_URL, topic, CONSUME_LIFE_TIME_IN_MS);
            thread(consumer, false);
        }

        if (START_PRODUCERS)
        {
            // Presumably gives the consumers time to subscribe before anything is published.
            Thread.sleep(1000);
            System.out.println("starting producers...");
            for (String topic : topics)
            {
                SimpleTopicProducer producer = new SimpleTopicProducer(BROKER_URL, topic);
                thread(producer, false);
            }
        }
    }

    /**
     * Runs the given task on a newly created and started thread.
     *
     * @param runnable the task to run
     * @param daemon   whether the new thread is marked as a daemon thread
     */
    public static void thread(Runnable runnable, boolean daemon)
    {
        Thread worker = new Thread(runnable);
        worker.setDaemon(daemon);
        worker.start();
    }
}