1.1 JMS简介
JMS(Java Messaging Service) 是 Java 平台上有关面向消息中间件的技术规范, 它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、 接收消息的接口简化企业应用的开发。
1.2 ActiveMQ简介
ActiveMQ 是 Apache 出品, 最流行的, 能力强劲的开源消息总线。 ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现, 尽管 JMS 规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
对消息的传递有两种类型:
- 一种是点对点(Queue):消息的生产者和消费者都可以是多个,但是对同一条消息而言,消息的制造者只有一个,消费者也只有一个,生产者和消费者一一对应
- 另一种是发布/订阅模式:即一个生产者产生并发送一个消息后,可以由多个消费者进行接收。
JMS定义的五种消息正文格式
- StreamMessage–Java原始值的数据流
- MapMessage–一套名称-值对
- TextMessage–一个字符串对象
- ObjectMessage–一个序列化的Java对象
- ByteMessage–一个字节的数据流
ActiveMQ安装:
下载并解压
tar -zxvf apache-activemq-5.12.0-bin.tar.gz
|
为apache-activemq-5.12.0赋权
chmod 777 apache-activemq-5.12.0
|
进入apache-activemq-5.12.0\bin,并且给activemq赋权
启动
ActiveMQ的默认管理端口:8161,通信端口:61616
ActiveMQ示例
点对点模式
(1) 创建activemqDemo,引入依赖
<groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version> </dependency>
|
(2) 创建QueueProducer(核心步骤)
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFaction("tcp://192.168.25.133:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test-queue");
MessageProductor producer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage("ActiveMQ消息测试!");
producer.send(textMessage);
producer.close(); session.close(); connection.close();
|
(3)消息消费者
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.133:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test-queue");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener(){ public void onMessage(Message message){ TextMessage textMessage = (TextMessage)message; try{ String text = textMEssage.getText(); System.out.println(text); }catch(JMSException e){ ····· } } });
System.in.read();
consumer.close(); session.close(); connection.close();
|
发布/订阅模式
(1)创建消息生产者
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.133:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("text-topic");
MessageProducer producer = session.CreateProducer(topic);
TextMessage textMessage = session.createTextMEssage("这是一个topic消息测试");
producer.send(textMessage);
producer.close(); session.close(); connection.close();
|
(2)创建消息消费者
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.133:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageConsumer consumer=session.createConsumer(topic);
consumer.setMessageListener(new MessageListener(){ public void onMessage(Message message){ TextMessage textMessage = (TextMessage)message; try{ String text = textMessage.getText(); System.out.println(text); }catch(){ ······· } } });
System.in.read();
consumer.close(); session.close(); connection.close();
|