Message middleware solution JMS
1. What is message middleware
Messaging middleware uses efficient and reliable message delivery mechanisms for platform-independent data exchange, and integrates distributed systems based on data communication. By providing message passing and message queuing models, it can expand the communication between processes in a distributed environment. For message middleware, the common roles are roughly Producer (producer), Consumer (consumer)
Common messaging middleware products
(1) ActiveMQ
ActiveMQ is the most popular and powerful open source message bus produced by Apache. ActiveMQ is a JMS Provider implementation that fully supports JMS1.1 and J2EE 1.4 specifications. We introduce the use of ActiveMQ in this course.
(2) RabbitMQ
The leading realization of AMQP protocol supports multiple scenarios. Taobao's MySQL cluster uses it for communication internally. The communication component of the OpenStack open source cloud platform was first used in the financial industry.
(3) ZeroMQ
The fastest message queuing system in history
(4) Kafka
A sub-project under Apache. Features: High throughput, a throughput rate of 10W/s can be reached on an ordinary server; a completely distributed system. Suitable for processing massive data.
2. Introduction to JMS
2.1, what is JMS
JMS (Java Messaging Service) is a technical specification for message-oriented middleware on the Java platform. It is convenient for Java applications in the messaging system to exchange messages, and it simplifies the development of enterprise applications by providing standard interfaces for generating, sending, and receiving messages .
JMS itself only defines a series of interface specifications, which is a vendor-independent API for accessing messaging systems. It is similar to JDBC (java Database Connectivity): here, JDBC is an API that can be used to access many different relational databases, while JMS provides the same access method independent of vendors to access messaging services. Many vendors currently support JMS, including IBM's MQSeries, BEA's Weblogic JMS service, and Progress's SonicMQ. These are just a few examples. JMS enables you to send messages from one JMS client to another JML client through a messaging service (sometimes called a message broker or router). A message is a type of object in JMS, which consists of two parts: a header and a message body. The header consists of routing information and metadata about the message. The message body carries the data or payload of the application.
JMS defines five different message body formats, as well as the message types called, allowing you to send and receive data in different forms, providing
Some levels of compatibility with existing message formats.
· TextMessage--a string object
· MapMessage--a set of name-value pairs
· ObjectMessage--a serialized Java object
· BytesMessage--a byte data stream
StreamMessage - data stream of Java primitive values
2.2, JMS messaging type
There are two types of message delivery
One is point-to-point, that is, one producer and one consumer correspond one-to-one.
The other is the publish/subscribe model, that is, after a producer generates a message and sends it, it can be received by multiple consumers.
3. ActiveMQ download and installation
3.1, official website download
Download from the official website: http://activemq.apache.org/
3.2. Installation (Linux)
(1) Upload apache-activemq-5.12.0-bin.tar.gz to the server
(2) Unzip this file
tar zxvf apache-activemq-5.12.0-bin.tar.gz
(3)为apache-activemq-5.12.0Directory empowerment
chmod 777 apache-activemq-5.12.0
(6)start up
./activemq start
The following prompt appears to indicate success!
Assuming the server address is 192.168.25.135, open the browser and enter the address http://192.168.25.135:8161/ You can enter the ActiveMQ management page
Click to enter the management page
Enter the user name and password are both admin
Enter the main interface
Point-to-point message queue
The meaning of each column in the list
Number Of Pending Messages: Messages waiting to be consumed This is the number of messages currently not out of the queue.
Number Of Consumers: Consumers This is the number of consumers on the consumer side
Messages Enqueued: The total number of messages entering the queue, including those out of the queue.
Messages Dequeued: Messages out of the queue can be understood as the amount consumed.
4, JMS small Demo
4.1, point-to-point mode
The point-to-point mode is mainly based on a queue. When connecting to a queue, the sender does not need to know whether the receiver is receiving or not. It can directly send a message to ActiveMQ. The sent message will enter the queue first. If there is a receiver When listening, it will be sent to the receiving end. If there is no receiving end, it will be stored in the activemq server until the receiving end receives the message. The point-to-point message mode can have multiple senders and multiple receivers, but one message is only It will be received by a receiving end, and which receiving end is connected to ActiveMQ first will receive it first, and the subsequent receiving end will not receive the message.
4.2, message producer
(1) Create the project jmsDemo and introduce dependencies
org.apache.activemqactivemq-client5.13.4
(2) Create the main method code of the class QueueProducer as follows:
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 MessageProducer producer = session.createProducer(queue); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close();
Two parameters of session creation in step 4 of the above code:
The first parameter is whether to use transaction
The second parameter is the confirmation mode of the message
• AUTO_ACKNOWLEDGE = 1 automatic confirmation
• CLIENT_ACKNOWLEDGE = 2 The client confirms manually
• DUPS_OK_ACKNOWLEDGE = 3 automatic batch confirmation
• SESSION_TRANSACTED = 0 transaction is submitted and confirmed
Query through the ActiveMQ management interface after running
4.3、消息消费者
创建类QueueConsumer ,main方法代码如下:
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(queue); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close();
See the console output after execution
Run test
Open two or more consumers at the same time, run the producer again, observe the output of each consumer's console, you will find that only one consumer will receive the message.
5. Publish/subscribe model
5.1, message producer
Create a class TopicProducer, the main method code is as follows:
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close();
running result
5.2, message consumers
Create a class TopicConsumer, the main method code is as follows:
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close();
5.3, run the test
Open more than 2 consumers at the same time, run the producer again, observe the output of each consumer's console, you will find that each consumer will receive the message.
6. Spring integrates JMS
6.1, point-to-point mode
Message producer
(1) Create the project springjms_producer, and introduce SpringJms, activeMQ and unit test related dependencies in the POM file
(2) Create the spring configuration file applicationContext-jms-producer.xml under src/main/resources
(3) Create a message producer class under the cn.itcast.demo package
@Componentpublic class QueueProducer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueTextDestination; /** * 发送文本消息 * @param text */ public void sendTextMessage(final String text){ jmsTemplate.send(queueTextDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(text); } }); }}
(4) Unit testing
Create a test class in src/test/java
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-producer.xml")public class TestQueue { @Autowired private QueueProducer queueProducer; @Test public void testSend(){ queueProducer.sendTextMessage("SpringJms-点对点"); } }
Message consumer
(1) Create the springjms_consumer project and introduce dependencies in the POM file (same as the previous project)
(2) Create the configuration file applicationContext-jms-consumer-queue.xml
(3) Write monitoring class
public class MyMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }}
(4) Create a test class
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-consumer-queue.xml")public class TestQueue { @Test public void testQueue(){ try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }
6.2, publish/subscribe model
Message producer
(1) Add configuration in applicationContext-jms-producer.xml of project springjms_producer
(2) Create a producer class
@Componentpublic class TopicProducer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination topicTextDestination; /** * 发送文本消息 * @param text */ public void sendTextMessage(final String text){ jmsTemplate.send(topicTextDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(text); } }); }}
(3) Write test class
import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import cn.itcast.demo.TopicProducer;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-activemq-producer.xml")public class TestTopic { @Autowired private TopicProducer topicProducer; @Test public void sendTextQueue(){ topicProducer.sendTextMessage(); } }
Message consumer
(1) Create the configuration file applicationContext-jms-consumer-topic.xml in the activemq-spring-consumer project
(2) Write test class
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-consumer-topic.xml")public class TestTopic { @Test public void testTopic(){ try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }