
Apache ActiveMQ Best Practices Tutorial

Mary Zheng, December 5th, 2017 (Last Updated: March 4th, 2019)

Apache ActiveMQ is an open source messaging server written in Java which implements the JMS 1.1 specification. In this tutorial, you will learn how to develop a few Java applications which integrate with ActiveMQ to send messages to and receive messages from destinations. If you already know how to install and configure ActiveMQ, you can skip the first four chapters.

Table Of Contents

1. Introduction
2. Install an Apache ActiveMQ Server
3. Start the Apache ActiveMQ Server
4. Monitor the Apache ActiveMQ Server
5. Business Use Cases
6. Define JMS Message
6.1 Message Destination
6.2 Message Header
6.3 Message Body
6.4 Configure Virtual Topic
7. Apache ActiveMQ Java Client Library
8. Publish Message Application
8.1 ActiveMQMessageProducer
8.2 ActiveMQMessageProducerTest
8.3 Execution Output
8.4 OnBoardNewCustomerApp
9. Consume Message Application
9.1 ActiveMQMessageConsumer
9.2 ActiveMQMessageConsumerMainApp
9.3 Execution Output
10. Integration with Spring JMS
10.1 Add Spring JMS dependency
10.2 Configure Spring Beans
10.3 MessageSender
10.4 BillingAppListener
10.5 SupportAppListener
10.6 ConfigBillingForNewCustomerApp
10.7 ConfigSupportForNewCustomerApp
10.8 Run as Distributed Systems
11. Integrating with Tomcat
11.1 Configure Tomcat Resource
11.2 Look up JNDI Resource
12. Common Problems
12.1 Slow Consumer Application
12.2 ActiveMQ Sends Unwanted messages to Virtual Topic Queue
12.3 Exception Handler
13. Summary
14. References
15. Download the Source Code

1. Introduction

Apache ActiveMQ (AMQ) is a JMS 1.1 implementation from the Apache Software Foundation.

AMQ is a message broker which translates the messages from the sender to the receiver. Message brokers are the building blocks of message-oriented middleware (MOM) architecture.

AMQ is one of the best open source messaging and Integration Patterns servers. It provides communication between applications and fulfills both notification and interoperation needs among them.

2. Install an Apache ActiveMQ Server

Most business applications treat AMQ as an infrastructure resource. In this tutorial, we install AMQ as a standalone server. Following the installation instructions, we installed AMQ 5.15.0.

3. Start the Apache ActiveMQ Server

Navigate to the \apache-activemq-5.15.0\bin\win64 directory and click on activemq.bat to start the server.

The output below demonstrates that the server started successfully.

server.log

jvm 1    |  INFO | Apache ActiveMQ 5.15.0 (localhost, ID:SL2LS431841-57319-1512184574307-0:1) started
jvm 1    |  INFO | For help or more information please see: http://activemq.apache.org

4. Monitor the Apache ActiveMQ Server

AMQ provides a web console application for monitoring and administration. After the AMQ server starts, follow the steps below to launch the web console.

  • Open a browser: Chrome, IE, Firefox, etc.
  • Enter the URL: localhost:8161/admin/index.jsp
  • Enter admin/admin as username/password

Here you should see the “Welcome” page. Users can send, read, and delete messages via the web console.

5. Business Use Cases

Company X provides services to customers. Each new customer must be set up in both the billing and support systems.

In this tutorial, we will demonstrate how to build a customer on-boarding process, a billing system, and a support application, and how to integrate them via AMQ:

  • OnBoardNewCustomerApp, which sets up new customers and sends the new customer events to the ActiveMQ customer topic
  • ConfigBillingForNewCustomerApp, which listens to the new customer events from the virtual topic and configures the customer in the billing application
  • ConfigSupportForNewCustomerApp, which listens to the new customer events from the virtual topic and configures the customer in the support application

6. Define JMS Message

6.1 Message Destination

For this business use case, both the billing and support systems get notified when a new customer joins. We choose the publish/subscribe message pattern to build the OnBoardNewCustomerApp, which publishes the customer event to the AMQ broker topic VirtualTopic.Customer.Topic.
There are three special characters reserved by AMQ when naming a destination:

  • . is used to separate names in a path
  • * is used to match any name in a path
  • > is used to recursively match any destination starting from this name
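
To make the three reserved characters concrete, here is a minimal sketch of how a wildcard pattern could be compared with a destination name. It is a simplified illustration, not the broker's own matcher: the class DestinationWildcards and its matches method are hypothetical names, and the sketch assumes that > must cover at least one remaining name.

```java
/** Simplified illustration of AMQ destination wildcards; not the broker's own matcher. */
final class DestinationWildcards {

    /** Returns true when the dot-separated destination name matches the pattern. */
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" accepts every remaining name; this sketch requires at least one.
                return i < d.length;
            }
            if (i >= d.length) {
                return false; // the destination has fewer names than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // a literal name differs
            }
        }
        return p.length == d.length; // no ">" was seen, so the lengths must agree
    }

    public static void main(String[] args) {
        System.out.println(matches("*.group2.*", "test.group2.queue1"));              // true
        System.out.println(matches("*.group2.*", "test.group1.queue1"));              // false
        System.out.println(matches("VirtualTopic.>", "VirtualTopic.Customer.Topic")); // true
    }
}
```

The destination names used here are the ones that appear later in this tutorial.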

6.2 Message Header

The message header provides metadata about the message, used by both clients and AMQ brokers. JMS pre-defines a set of message headers and properties. Two examples:

  • JMSXGroupID: use this if you want a group of messages to always go to the same consumer
  • JMSCorrelationID: use this to link messages together

6.3 Message Body

The message body is the actual message that integrates the applications. For this example, the message is the JSON representation of the CustomerEvent.

CustomerEvent

package jcg.demo.model;

public class CustomerEvent {

    private String type;
    private Integer customerId;

    public CustomerEvent(String type, Integer customerId) {
        this.type = type;
        this.customerId = customerId;
    }

    public String getType() {
        return type;
    }

    public Integer getCustomerId() {
        return customerId;
    }

    public String toString() {
        return "CustomerEvent: type(" + type + "), customerId(" + customerId + ")";
    }

    public String getCustomerDetailUri() {
        return "https://localhost:8080/support/customer/" + customerId;
    }
}
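
To show what this event looks like as a message body, here is a minimal sketch that renders the two fields as JSON by hand. The tutorial itself uses Gson for this step; the class CustomerEventJson and its toJson method are hypothetical names introduced only for illustration, and the sketch assumes the type value needs no JSON escaping.

```java
/** Hand-rolled JSON rendering of a customer event, for illustration only. */
final class CustomerEventJson {

    /** Builds the JSON text for the two CustomerEvent fields (no escaping is applied). */
    static String toJson(String type, int customerId) {
        return "{\"type\":\"" + type + "\",\"customerId\":" + customerId + "}";
    }

    public static void main(String[] args) {
        // prints {"type":"NEWCUSTOMER","customerId":79}
        System.out.println(toJson("NEWCUSTOMER", 79));
    }
}
```

In real code a JSON library is the safer choice, because it handles escaping and nested objects.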

6.4 Configure Virtual Topic

The AMQ server installation comes with a ready-to-use configuration file. Modify activemq.xml and add the configuration below so that the AMQ broker forwards messages from any topic whose name matches VirtualTopic.> (for example VirtualTopic.Customer.Topic) to any virtual topic queue whose name starts with Consumer.*.

activemq.xml

<destinationInterceptors>
    <virtualDestinationInterceptor>
        <virtualDestinations>
            <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
            <virtualTopic name="JCG.>" prefix="VTC.*." selectorAware="true"/>
        </virtualDestinations>
    </virtualDestinationInterceptor>
</destinationInterceptors>

  • line 4: Configure Virtual Topic to disable selectorAware
  • line 5: Configure Virtual Topic to enable selectorAware

Restart the AMQ server after the configuration file updates.
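
The prefix attribute determines the queue names that consumers use. As a rough sketch, the queue name for a given consumer can be derived by replacing the * in the prefix with the consumer's name and appending the topic name. The class VirtualTopicNames and its consumerQueue method are hypothetical helpers, not part of the ActiveMQ API.

```java
/** Derives virtual topic queue names from a prefix such as "Consumer.*."; illustration only. */
final class VirtualTopicNames {

    /** Replaces the "*" in the prefix with the consumer name and appends the topic name. */
    static String consumerQueue(String prefix, String consumerName, String topicName) {
        return prefix.replace("*", consumerName) + topicName;
    }

    public static void main(String[] args) {
        // prints Consumer.Billing.VirtualTopic.Customer.Topic
        System.out.println(consumerQueue("Consumer.*.", "Billing", "VirtualTopic.Customer.Topic"));
        // prints VTC.DZONE.JCG.Mary.Topic
        System.out.println(consumerQueue("VTC.*.", "DZONE", "JCG.Mary.Topic"));
    }
}
```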

7. Apache ActiveMQ Java Client Library

Add ActiveMQ Java library to the project pom.xml.

pom.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.0</version>
</dependency>

8. Publish Message Application

In this example, you will see how to create ActiveMQMessageProducer to send the messages.

8.1 ActiveMQMessageProducer

A Java class that wraps the ActiveMQ Java API to send the messages.

ActiveMQMessageProducer

package jcg.demo.activemq;

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

import com.google.gson.Gson;

import jcg.demo.jms.util.DataUtil;

/**
 * A simple message producer which sends the message to the ActiveMQ Broker.
 *
 * @author Mary.Zheng
 *
 */
public class ActiveMQMessageProducer {

    private static final String ACTION_ID_HEADER = "actionId";
    private static final String ACTION_HEADER = "action";

    private ConnectionFactory connFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    // https://docs.oracle.com/javaee/7/api/javax/jms/MessageProducer.html
    private MessageProducer msgProducer;

    private String activeMqBrokerUri;
    private String username;
    private String password;

    public ActiveMQMessageProducer(final String activeMqBrokerUri, final String username, final String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }

    public void setup(final boolean transacted, final boolean isDestinationTopic, final String destinationName)
            throws JMSException {
        // The order matters: factory, connection, session, destination, producer.
        setConnectionFactory(activeMqBrokerUri, username, password);
        setConnection();
        setSession(transacted);
        setDestination(isDestinationTopic, destinationName);
        setMsgProducer();
    }

    public void close() throws JMSException {
        if (msgProducer != null) {
            msgProducer.close();
            msgProducer = null;
        }

        if (session != null) {
            session.close();
            session = null;
        }

        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    public void commit(final boolean transacted) throws JMSException {
        if (transacted) {
            session.commit();
        }
    }

    public void sendMessage(final String actionVal) throws JMSException {
        TextMessage textMessage = buildTextMessageWithProperty(actionVal);
        msgProducer.send(destination, textMessage);
        // msgProducer.send(textMessage, DeliveryMode.NON_PERSISTENT, 0, 0);
    }

    private TextMessage buildTextMessageWithProperty(final String action) throws JMSException {
        Gson gson = new Gson();
        String eventMsg = gson.toJson(DataUtil.buildDummyCustomerEvent());
        TextMessage textMessage = session.createTextMessage(eventMsg);

        Random rand = new Random();
        int value = rand.nextInt(100);
        textMessage.setStringProperty(ACTION_HEADER, action);
        textMessage.setStringProperty(ACTION_ID_HEADER, String.valueOf(value));

        return textMessage;
    }

    private void setDestination(final boolean isDestinationTopic, final String destinationName) throws JMSException {
        if (isDestinationTopic) {
            destination = session.createTopic(destinationName);
        } else {
            destination = session.createQueue(destinationName);
        }
    }

    private void setMsgProducer() throws JMSException {
        msgProducer = session.createProducer(destination);
    }

    private void setSession(final boolean transacted) throws JMSException {
        // transacted=true for better performance to push message in batch mode
        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
    }

    private void setConnection() throws JMSException {
        connection = connFactory.createConnection();
        connection.start();
    }

    private void setConnectionFactory(final String activeMqBrokerUri, final String username, final String password) {
        connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);

        ((ActiveMQConnectionFactory) connFactory).setUseAsyncSend(true);

        RedeliveryPolicy policy = ((ActiveMQConnectionFactory) connFactory).getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(500);
        policy.setBackOffMultiplier(2);
        policy.setUseExponentialBackOff(true);
        policy.setMaximumRedeliveries(2);
    }
}
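  • setup(): wires the connection factory, connection, session, destination, and producer in the correct order. Spring JMS dependency injection takes care of this for you.
  • close(): closes the producer, session, and connection. Spring JMS takes care of this for you.
  • sendMessage(): the commented-out send call defines the durability of the message. All messages are persistent by default; switching to DeliveryMode.NON_PERSISTENT trades durability for better performance.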
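
The redelivery policy configured in the connection factory (initial delay 500 ms, back-off multiplier 2, exponential back-off enabled, at most 2 redeliveries) implies a simple delay schedule. The sketch below only works through that arithmetic; the class RedeliverySchedule is a hypothetical helper, and it deliberately ignores other policy settings such as a maximum delay or collision avoidance, so the real client may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Works through the exponential back-off arithmetic; not the ActiveMQ implementation. */
final class RedeliverySchedule {

    /** Returns the delay in milliseconds before each redelivery attempt. */
    static List<Long> delays(long initialDelayMs, double multiplier, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 0; attempt < maxRedeliveries; attempt++) {
            result.add(delay);
            delay = (long) (delay * multiplier); // each attempt waits longer than the previous one
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [500, 1000] for the values used in this tutorial
        System.out.println(delays(500, 2, 2));
    }
}
```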

8.2 ActiveMQMessageProducerTest

This JUnit test sends messages to various destinations. It is a convenient way to send a message to a destination.

ActiveMQMessageProducerTest

package jcg.demo.activemq;

import javax.jms.JMSException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import jcg.demo.jms.util.DataUtil;

public class ActiveMQMessageProducerTest {

    private ActiveMQMessageProducer msgQueueSender;

    @Before
    public void setup() {
        msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
    }

    @After
    public void cleanup() throws JMSException {
        msgQueueSender.close();
    }

    @Test
    public void send_msg_to_no_transaction_Queue() throws JMSException {
        msgQueueSender.setup(false, false, DataUtil.TEST_GROUP1_QUEUE_1);
        msgQueueSender.sendMessage("JCG");
    }

    @Test
    public void send_msg_to_Group2_Queue1() throws JMSException {
        msgQueueSender.setup(false, false, DataUtil.TEST_GROUP2_QUEUE_1);
        msgQueueSender.sendMessage("JCG");
    }

    @Test
    public void send_msg_to_transaction_Group1_Queue2() throws JMSException {
        msgQueueSender.setup(true, false, DataUtil.TEST_GROUP1_QUEUE_2);
        msgQueueSender.sendMessage("DEMO");
        msgQueueSender.commit(true);
    }

    @Test
    public void send_msg_to_no_transaction_Group1_Topic() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.TEST_GROUP1_TOPIC);
        msgQueueSender.sendMessage("MZHENG");
    }

    @Test
    public void send_msg_to_Virtual_Topic() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
        msgQueueSender.sendMessage("MZHENG");
    }

    @Test
    public void send_msg_to_Virtual_Topic_WithSelector() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.TEST_VTC_TOPIC_SELECTOR);
        msgQueueSender.sendMessage("DZONE");
    }
}
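  • send_msg_to_no_transaction_Queue: sends to the queue test.group1.queue1
  • send_msg_to_Group2_Queue1: sends to the queue test.group2.queue1
  • send_msg_to_transaction_Group1_Queue2: sends to the queue test.group1.queue2
  • send_msg_to_no_transaction_Group1_Topic: sends to the normal topic test.group1.topic
  • send_msg_to_Virtual_Topic: sends to the selector unaware topic VirtualTopic.Customer.Topic
  • send_msg_to_Virtual_Topic_WithSelector: sends to the selector aware topic JCG.Mary.Topic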

8.3 Execution Output

We ran the ActiveMQMessageProducerTest to send messages to three queues and three topics. You can verify the result in the AMQ web console. There is one pending message in each of the three queues: test.group1.queue1, test.group1.queue2, and test.group2.queue1.

There is one message in each of the three topics: JCG.Mary.Topic, test.group1.topic, and VirtualTopic.Customer.Topic.

8.4 OnBoardNewCustomerApp

OnBoardNewCustomerApp sends the new customer message to VirtualTopic.Customer.Topic.

OnBoardNewCustomerApp

package jcg.demo.activemq.app;

import jcg.demo.activemq.ActiveMQMessageProducer;
import jcg.demo.jms.util.DataUtil;

public class OnBoardNewCustomerApp {

    public static void main(String[] args) {
        ActiveMQMessageProducer msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
        try {
            msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
            msgQueueSender.sendMessage("CUSTOMER");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Executing OnBoardNewCustomerApp sends a customer message to VirtualTopic.Customer.Topic. However, since there is no consumer yet, the AMQ broker does not forward the message to any virtual topic queue.
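
The behavior described above can be pictured with a small in-memory model: a message published to the virtual topic is copied only into the consumer queues that already exist at publish time. This is a toy model under that assumption, not broker code; the class VirtualTopicModel and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of virtual topic fan-out; it does not talk to a broker. */
final class VirtualTopicModel {

    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** A consumer queue comes into existence when a consumer first subscribes. */
    void subscribe(String queueName) {
        queues.computeIfAbsent(queueName, k -> new ArrayList<>());
    }

    /** Copies the message into every existing queue and returns the number of copies made. */
    int publish(String message) {
        queues.values().forEach(q -> q.add(message));
        return queues.size();
    }

    /** Returns the messages waiting in the given queue (empty if the queue does not exist). */
    List<String> pending(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        VirtualTopicModel topic = new VirtualTopicModel();
        System.out.println(topic.publish("event-1")); // 0, no consumer queue exists yet
        topic.subscribe("Consumer.Billing.VirtualTopic.Customer.Topic");
        topic.subscribe("Consumer.Support.VirtualTopic.Customer.Topic");
        System.out.println(topic.publish("event-2")); // 2, one copy per consumer queue
        System.out.println(topic.pending("Consumer.Billing.VirtualTopic.Customer.Topic")); // [event-2]
    }
}
```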

9. Consume Message Application

9.1 ActiveMQMessageConsumer

A message consumer that uses the AMQ Java API.

ActiveMQMessageConsumer

package jcg.demo.activemq;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * A simple message consumer which consumes the message from ActiveMQ Broker.
 *
 * @author Mary.Zheng
 *
 */
public class ActiveMQMessageConsumer implements MessageListener {

    private String activeMqBrokerUri;
    private String username;
    private String password;

    private boolean isDestinationTopic;
    private String destinationName;
    private String selector;
    private String clientId;

    public ActiveMQMessageConsumer(String activeMqBrokerUri, String username, String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }

    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
        if (clientId != null) {
            factory.setClientID(clientId);
        }
        Connection connection = factory.createConnection();
        if (clientId != null) {
            connection.setClientID(clientId);
        }
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        setConsumer(session);

        connection.start();
        System.out.println(Thread.currentThread().getName() + ": ActiveMQMessageConsumer Waiting for messages at "
                + destinationName);
    }

    private void setConsumer(Session session) throws JMSException {
        MessageConsumer consumer = null;
        if (isDestinationTopic) {
            Topic topic = session.createTopic(destinationName);

            if (selector == null) {
                consumer = session.createConsumer(topic);
            } else {
                consumer = session.createConsumer(topic, selector);
            }
        } else {
            Destination destination = session.createQueue(destinationName);

            if (selector == null) {
                consumer = session.createConsumer(destination);
            } else {
                consumer = session.createConsumer(destination, selector);
            }
        }

        consumer.setMessageListener(this);
    }

    @Override
    public void onMessage(Message message) {
        String msg;
        try {
            msg = String.format(
                    "[%s]: ActiveMQMessageConsumer Received message from [ %s] - Headers: [ %s] Message: [ %s ]",
                    Thread.currentThread().getName(), destinationName, getPropertyNames(message),
                    ((TextMessage) message).getText());
            System.out.println(msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private String getPropertyNames(Message message) throws JMSException {
        String props = "";
        // getPropertyNames() returns a raw Enumeration whose elements are property name strings.
        @SuppressWarnings("unchecked")
        Enumeration<String> properties = message.getPropertyNames();
        while (properties.hasMoreElements()) {
            String propKey = properties.nextElement();
            props += propKey + "=" + message.getStringProperty(propKey) + " ";
        }
        return props;
    }

    public void setSelector(String selector) {
        this.selector = selector;
    }

    public boolean isDestinationTopic() {
        return isDestinationTopic;
    }

    public String getDestinationName() {
        return destinationName;
    }

    public String getSelector() {
        return selector;
    }

    public String getClientId() {
        return clientId;
    }

    public void setDestinationTopic(boolean isDestinationTopic) {
        this.isDestinationTopic = isDestinationTopic;
    }

    public void setDestinationName(String destinationName) {
        this.destinationName = destinationName;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
  • Class declaration: create ActiveMQMessageConsumer by implementing javax.jms.MessageListener
  • run(): set the connection clientID
  • Topic branch: create a topic
  • Topic branch: create a message consumer from a topic without a selector
  • Topic branch: create a message consumer from a topic with a selector
  • Queue branch: create a queue
  • Queue branch: create a message consumer from a queue without a selector
  • Queue branch: create a message consumer from a queue with a selector
  • consumer.setMessageListener(this): register the message listener
  • onMessage(): override the onMessage callback

9.2 ActiveMQMessageConsumerMainApp

Create ActiveMQMessageConsumerMainApp to consume from various destinations.

ActiveMQMessageConsumerMainApp

package jcg.demo.activemq.app;

import javax.jms.JMSException;

import jcg.demo.activemq.ActiveMQMessageConsumer;
import jcg.demo.jms.util.DataUtil;

public class ActiveMQMessageConsumerMainApp {

    public static void main(String[] args) {
        consumeCustomerVTCQueue();
        consumerVTCQueueWithSelector();
        consumeGroup1Topic();
        consumeAllGroup2();
        consume_queue_with_prefetchsize();
    }

    private static void consumeCustomerVTCQueue() {
        // the message in the topic before this subscriber starts will not be
        // picked up.
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("Consumer.zheng." + DataUtil.CUSTOMER_VTC_TOPIC);

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consumerVTCQueueWithSelector() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("VTC.DZONE." + DataUtil.TEST_VTC_TOPIC_SELECTOR);
        queueMsgListener.setSelector("action='DZONE'");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consumeGroup1Topic() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_TOPIC);
        queueMsgListener.setDestinationTopic(true);

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consumeAllGroup2() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("*.group2.*");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void exclusive_queue_Consumer() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP2_QUEUE_2 + "?consumer.exclusive=true");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consume_queue_with_prefetchsize() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_QUEUE_2 + "?consumer.prefetchSize=10");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
  • consumeCustomerVTCQueue(): consumes from the virtual topic queue Consumer.zheng.VirtualTopic.Customer.Topic
  • consumerVTCQueueWithSelector(): consumes from the virtual topic queue VTC.DZONE.JCG.Mary.Topic with the message selector set to action='DZONE'
  • consumeGroup1Topic(): consumes from the topic test.group1.topic
  • consumeAllGroup2(): consumes from any queue whose name matches *.group2.*
  • exclusive_queue_Consumer(): sets an exclusive message consumer. If that consumer goes down, the broker fails over and picks another consumer to continue.
  • consume_queue_with_prefetchsize(): sets the prefetch size for the consumer
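
The last two consumers pass options to the broker by appending a query string to the destination name. As a minimal sketch of that naming convention, the hypothetical helper below splits such a string into the plain destination name and its options. It only illustrates the format; the ActiveMQ client does its own parsing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Splits "name?key=value&key=value" into its parts; an illustration of the naming format. */
final class DestinationOptions {

    /** Returns the destination name without the option suffix. */
    static String name(String destination) {
        int q = destination.indexOf('?');
        return q < 0 ? destination : destination.substring(0, q);
    }

    /** Returns the options after the "?" as key/value pairs, in their original order. */
    static Map<String, String> options(String destination) {
        Map<String, String> result = new LinkedHashMap<>();
        int q = destination.indexOf('?');
        if (q < 0) {
            return result; // no options were appended
        }
        for (String pair : destination.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String destination = "test.group1.queue2?consumer.prefetchSize=10";
        System.out.println(name(destination));    // test.group1.queue2
        System.out.println(options(destination)); // {consumer.prefetchSize=10}
    }
}
```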

9.3 Execution Output

Now start the ActiveMQMessageConsumerMainApp. Here is the application output:

ActiveMQMessageConsumerMainApp Output

main: ActiveMQMessageConsumer Waiting for messages at Consumer.zheng.VirtualTopic.Customer.Topic
main: ActiveMQMessageConsumer Waiting for messages at VTC.DZONE.JCG.Mary.Topic
main: ActiveMQMessageConsumer Waiting for messages at test.group1.topic
main: ActiveMQMessageConsumer Waiting for messages at *.group2.*
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ *.group2.*] - Headers: [ action=JCG actionId=40 ] Message: [ {"type":"NEWCUSTOMER","customerId":79} ]
main: ActiveMQMessageConsumer Waiting for messages at test.group1.queue2?consumer.prefetchSize=10
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ test.group1.queue2?consumer.prefetchSize=10] - Headers: [ action=DEMO actionId=84 ] Message: [ {"type":"NEWCUSTOMER","customerId":28} ]

Now execute OnBoardNewCustomerApp a couple of times. The running consumer application prints two more lines to its console, as shown in the output below.

ActiveMQMessageConsumerMainApp Output Continue

[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=15 ] Message: [ {"type":"NEWCUSTOMER","customerId":51} ]
[ActiveMQ Session Task-2]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=75 ] Message: [ {"type":"NEWCUSTOMER","customerId":73} ]

Always verify and confirm via the AMQ web console.

10. Integration with Spring JMS

Spring JMS provides a JMS integration framework that simplifies the use of the JMS API.

10.1 Add Spring JMS dependency

Add Spring JMS library to the project pom.xml.

pom.xml

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>

10.2 Configure Spring Beans

Add Spring JMS Beans to the context.

JmsConfig

package jcg.demo.spring.jms.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;

import jcg.demo.spring.jms.component.JmsExceptionListener;

@Configuration
@EnableJms
@ComponentScan(basePackages = "jcg.demo.spring.jms.component, jcg.demo.spring.service")
public class JmsConfig {

    private String concurrency = "1-10";
    private String brokerURI = "tcp://localhost:61616";

    @Autowired
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(JmsExceptionListener jmsExceptionListener) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(jmsConnectionFactory(jmsExceptionListener));
        factory.setDestinationResolver(destinationResolver());
        factory.setConcurrency(concurrency);
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean
    @Autowired
    public ConnectionFactory jmsConnectionFactory(JmsExceptionListener jmsExceptionListener) {
        return createJmsConnectionFactory(brokerURI, jmsExceptionListener);
    }

    private ConnectionFactory createJmsConnectionFactory(String brokerURI, JmsExceptionListener jmsExceptionListener) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURI);
        activeMQConnectionFactory.setExceptionListener(jmsExceptionListener);

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        return connectionFactory;
    }

    @Bean(name = "jmsQueueTemplate")
    @Autowired
    public JmsTemplate createJmsQueueTemplate(ConnectionFactory jmsConnectionFactory) {
        return new JmsTemplate(jmsConnectionFactory);
    }

    @Bean(name = "jmsTopicTemplate")
    @Autowired
    public JmsTemplate createJmsTopicTemplate(ConnectionFactory jmsConnectionFactory) {
        JmsTemplate template = new JmsTemplate(jmsConnectionFactory);
        template.setPubSubDomain(true);
        return template;
    }

    @Bean
    public DestinationResolver destinationResolver() {
        return new DynamicDestinationResolver();
    }
}

As you can see, the order in which these beans are created is managed by Spring dependency injection.

10.3 MessageSender

A class to send messages based on Spring JMS framework.

MessageSender

package jcg.demo.spring.jms.component;

import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    @Autowired
    private JmsTemplate jmsTopicTemplate;

    public void postToQueue(final String queueName, final String message) {
        MessageCreator messageCreator = new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        };

        jmsQueueTemplate.send(queueName, messageCreator);
    }

    public void postToQueue(final String queueName, Map<String, String> headers, final String message) {
        jmsQueueTemplate.send(queueName, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                // Copy each header entry into a JMS string property.
                headers.forEach((k, v) -> {
                    try {
                        msg.setStringProperty(k, v);
                    } catch (JMSException e) {
                        System.out.println(
                                String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                    }
                });
                return msg;
            }
        });
    }

    public void postToTopic(final String topicName, Map<String, String> headers, final String message) {
        jmsTopicTemplate.send(topicName, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                // Copy each header entry into a JMS string property.
                headers.forEach((k, v) -> {
                    try {
                        msg.setStringProperty(k, v);
                    } catch (JMSException e) {
                        System.out.println(
                                String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                    }
                });
                return msg;
            }
        });
    }
}

As you can see, the MessageSender is simpler than the ActiveMQMessageProducer created in step 8.1.

10.4 BillingAppListener

A listener that listens for the new customer events and integrates with the billing system.

BillingAppListener

package jcg.demo.spring.jms.component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.BillingService;
import jcg.demo.spring.service.MessageTransformer;

@Component
public class BillingAppListener {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    @Autowired
    private BillingService billingService;

    @Autowired
    private MessageTransformer msgTransformer;

    private String queueName = "Consumer.Billing." + DataUtil.CUSTOMER_VTC_TOPIC;

    public String receiveMessage() throws JMSException {
        System.out.println(Thread.currentThread().getName() + ": BillingAppListener receiveMessage.");

        Destination destination = new ActiveMQQueue(queueName);
        TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);

        CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
        return billingService.handleNewCustomer(customerEvt);
    }
}

As you can see, this class is simpler than the ActiveMQMessageConsumer created in step 9.1.

10.5 SupportAppListener

A listener that listens for the new customer events and integrates with the support system.

SupportAppListener

package jcg.demo.spring.jms.component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.MessageTransformer;
import jcg.demo.spring.service.SupportService;

@Component
public class SupportAppListener {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    @Autowired
    private SupportService supportService;

    @Autowired
    private MessageTransformer msgTransformer;

    private String queueName = "Consumer.Support." + DataUtil.CUSTOMER_VTC_TOPIC;

    public String receiveMessage() throws JMSException {
        System.out.println(Thread.currentThread().getName() + ": SupportAppListener receiveMessage.");

        Destination destination = new ActiveMQQueue(queueName);
        TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);

        CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
        return supportService.handleNewCustomer(customerEvt);
    }
}

10.6 ConfigBillingForNewCustomerApp

Configure a Spring context that consumes the new customer events and integrates with the billing system.

ConfigBillingForNewCustomerApp

package jcg.demo.spring.jms.app;

import java.net.URISyntaxException;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;

import com.google.gson.Gson;

import jcg.demo.spring.jms.component.BillingAppListener;
import jcg.demo.spring.jms.config.JmsConfig;

@Configuration
public class ConfigBillingForNewCustomerApp {

    public static void main(String[] args) throws URISyntaxException, Exception {
        Gson gson = new Gson();

        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
        context.register(ConfigBillingForNewCustomerApp.class);

        try {
            BillingAppListener billingAppListener = (BillingAppListener) context.getBean("billingAppListener");

            System.out.println("ConfigBillingForewCustomerApp receives " + billingAppListener.receiveMessage());
        } finally {
            context.close();
        }
    }
}

10.7 ConfigSupportforNewCustomerApp

Configure a Spring context that consumes the new customer events and integrates them with the support system.

ConfigSupportforNewCustomerApp

package jcg.demo.spring.jms.app;

import java.net.URISyntaxException;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;

import com.google.gson.Gson;

import jcg.demo.spring.jms.component.SupportAppListener;
import jcg.demo.spring.jms.config.JmsConfig;

@Configuration
public class ConfigSupportForNewCustomerApp {

    public static void main(String[] args) throws URISyntaxException, Exception {
        Gson gson = new Gson();

        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
        context.register(ConfigSupportForNewCustomerApp.class);

        try {
            SupportAppListener supportAppListener = (SupportAppListener) context.getBean("supportAppListener");
            System.out.println("supportAppListener receives " + supportAppListener.receiveMessage());
        } finally {
            context.close();
        }
    }
}

10.8 Run as Distributed Systems

So far, we have built one Java JMS application, OnBoardNewCustomerApp, and two Spring JMS applications: ConfigBillingForNewCustomerApp and ConfigSupportForNewCustomerApp. Now it is time to run them together so that the customer onboarding process integrates with both the billing and support systems. Start the two consumer applications first.

ConfigBillingForNewCustomerApp Output

main: ConfigBillingForNewCustomerApp receiveMessage.

ConfigSupportForNewCustomerApp Output

main: ConfigSupportForNewCustomerApp receiveMessage.

Next, execute OnBoardNewCustomerApp. You will see that both consumers receive the customer message and process it.

ConfigBillingForNewCustomerApp Output (Continued)

ConfigBillingForewCustomerApp receives BillingService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)

ConfigSupportForNewCustomerApp Output (Continued)

ConfigSupportForNewCustomerApp receives SupportService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)

You just witnessed a working distributed system.

11. Integrating with Tomcat

11.1 Configure Tomcat Resource

Configure Tomcat's context.xml with an AMQ resource as shown below.

context.xml

<Resource name="jms/ConnectionFactory"
          global="jms/ConnectionFactory"
          auth="Container"
          type="org.apache.activemq.ActiveMQConnectionFactory"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          brokerURL="tcp://localhost:61616"
          userName="admin"
          password="admin"
          useEmbeddedBroker="false"/>

11.2 Look up JNDI Resource

Use jndiContext.lookup to look up the ActiveMQConnectionFactory from the JNDI resource.

JmsConfig

private ConnectionFactory createJmsConnectionFactory(String jndiName, JMSExceptionListener exceptionListener) {
    CachingConnectionFactory connectionFactory = null;
    try {
        Context jndiContext = new InitialContext();
        Context envContext = (Context) jndiContext.lookup("java:comp/env");
        ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) envContext.lookup(jndiName);
        connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        connectionFactory.setExceptionListener(exceptionListener);
    } catch (NamingException e) {
        String msg = String.format("Unable to get JMS container with name %s ", jndiName);
        throw new RuntimeException(msg, e);
    }
    return connectionFactory;
}

12. Common Problems

There are three common problems when developing an ActiveMQ application.

12.1 Slow Consumer Application

When the AMQ console shows a growing number of pending messages, the consumer application is processing messages more slowly than the producer publishes them. There are several ways to address this issue:

  • Throttle the publishers so that they publish messages at a rate the consumers can sustain
  • Have the publishers publish messages to different destinations to reduce the number of messages each consumer handles
  • Speed up the consumers by moving long-running processing off the main thread onto an asynchronous thread
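
The third option can be sketched in plain Java as follows. This is a minimal illustration and not the tutorial's code: the class AsyncMessageHandler, its onMessage method, and the pool sizes are all hypothetical. The receiving thread hands each message body to a bounded worker pool and returns immediately; when the pool is saturated, CallerRunsPolicy makes the receiving thread do the work itself, which slows consumption instead of letting the backlog grow in memory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative handler (hypothetical, not part of the tutorial's source code).
final class AsyncMessageHandler {

    private final ThreadPoolExecutor workers;
    private final AtomicInteger processed = new AtomicInteger();

    AsyncMessageHandler(int threads, int queueCapacity) {
        // Bounded queue plus CallerRunsPolicy: if the workers fall behind,
        // the submitting (receiving) thread runs the task itself.
        workers = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Called from the thread that receives messages; normally returns quickly. */
    void onMessage(String body) {
        workers.execute(() -> process(body));
    }

    private void process(String body) {
        // Placeholder for the long-running business logic.
        processed.incrementAndGet();
    }

    int processedCount() {
        return processed.get();
    }

    /** Stops accepting work and waits for the queued messages to finish. */
    void shutdown() {
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncMessageHandler handler = new AsyncMessageHandler(4, 100);
        for (int i = 0; i < 1000; i++) {
            handler.onMessage("event-" + i);
        }
        handler.shutdown();
        System.out.println("processed " + handler.processedCount());
    }
}
```

One trade-off to keep in mind: if the JMS session acknowledges a message as soon as it is received, a failure in the worker thread happens after acknowledgement, so the application has to handle that failure itself.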

12.2 ActiveMQ Sends Unwanted Messages to Virtual Topic Queue

There is a bug in the AMQ broker that sends unwanted messages to the virtual topic queue when a selector is defined. Our solution is to let the applications handle the selector by setting selectorAware to false.
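
One way an application can handle the selection itself is to inspect each received message's properties and skip the ones it does not want. The sketch below shows that idea in plain Java, with a Map standing in for the message properties. The class ClientSideSelector and the property name "eventType" are assumptions made for illustration; the value NEWCUSTOMER is taken from the execution output shown earlier.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative client-side filter (hypothetical, not part of the tutorial's source code).
final class ClientSideSelector {

    /** Accepts a message only if the named property equals the expected value. */
    static Predicate<Map<String, String>> propertyEquals(String name, String expected) {
        return properties -> expected.equals(properties.get(name));
    }

    public static void main(String[] args) {
        // "eventType" is an assumed property name.
        Predicate<Map<String, String>> wanted = propertyEquals("eventType", "NEWCUSTOMER");

        Map<String, String> newCustomer = Collections.singletonMap("eventType", "NEWCUSTOMER");
        Map<String, String> other = Collections.singletonMap("eventType", "OTHER");

        System.out.println("new customer accepted: " + wanted.test(newCustomer));
        System.out.println("other accepted: " + wanted.test(other));
    }
}
```

In a real consumer, the same predicate would be evaluated against the properties read from the JMS message before the message is passed to the business service.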

12.3 Exception Handler

Some applications redeliver a message back to its destination when they encounter an exception. This may jam up the destination if processing fails again. A better solution is to have a separate exception handler deal with any exceptions.
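
A minimal sketch of this separation, in plain Java, is shown below. The class SafeMessageProcessor and its FailureHandler interface are hypothetical names introduced for illustration. The business logic never puts a message back on the destination; failures are passed to a dedicated handler, which in a real system might log the error or park the message somewhere for later inspection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative processor (hypothetical, not part of the tutorial's source code).
final class SafeMessageProcessor {

    /** Receives the message body and the exception when processing fails. */
    interface FailureHandler {
        void handle(String body, Exception cause);
    }

    private final Consumer<String> businessLogic;
    private final FailureHandler failureHandler;

    SafeMessageProcessor(Consumer<String> businessLogic, FailureHandler failureHandler) {
        this.businessLogic = businessLogic;
        this.failureHandler = failureHandler;
    }

    /** Returns true if the message was processed, false if it went to the failure handler. */
    boolean process(String body) {
        try {
            businessLogic.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Do not redeliver to the original destination; delegate instead.
            failureHandler.handle(body, e);
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        SafeMessageProcessor processor = new SafeMessageProcessor(
                body -> {
                    if (body.isEmpty()) {
                        throw new IllegalArgumentException("empty message body");
                    }
                },
                (body, cause) -> parked.add(cause.getMessage()));

        System.out.println("valid message processed: " + processor.process("{\"customerId\":41}"));
        System.out.println("empty message processed: " + processor.process(""));
        System.out.println("parked failures: " + parked);
    }
}
```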

13. Summary

In this tutorial, we outlined the steps to install and configure the AMQ server and demonstrated:

  • how to install and configure the AMQ server
  • how to build AMQ applications via ActiveMQ library
  • how to build AMQ applications with Spring JMS framework
  • how to integrate with Tomcat web container

We also described three common problems when developing an AMQ application.

14. References

    1. ActiveMQ in Action
    2. Apache ActiveMQ

15. Download the Source Code

This example builds several Java applications to send and receive messages via the AMQ broker.

Download
You can download the full source code of this example here: Apache ActiveMQ Best Practices Tutorial

