只要注册ofo就送你10块钱,还等什么,快来注册吧
简介
面向消息队列是一个总体比较合理的应用系统集成方案。
ActiveMQ是JMS消息通信规范的一个实现。消息通信模式主要有发布-订阅、点对点。
基础流程
ActiveMQ启动服务的过程:
- 获得JMS connection factory,通过提供特定环境的连接信息来构造factory。
- 利用factory构造JMS connection.
- 启动connection
- 通过connection创建JMS session.
- 指定JMS destination.
- 创建JMS producer和JMS message并提供destination.
- 创建JMS consumer和注册JMS message listener.
- 发送和接收JMS message.
- 关闭所有JMS资源,包括connection, session, producer, consumer等。
publish-subscribe
发布订阅模式类似于订阅报纸。
例子
publisher
publisher是属于发布信息的一方,它通过定义一个或者多个topic,然后给这些topic发送消息。
publisher的构造函数如下:
public Publisher() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
}
按照前面说的流程定义了基本的connectionFactory
, connection
,session
,producer
。。
接着定义一系列的topic让所有的consumer来订阅
protected void setTopics(String[] stocks) throws JMSException {
destinations = new Destination[stocks.length];
for(int i = 0; i < stocks.length; i++) {
destinations[i] = session.createTopic("STOCKS." + stocks[i]);
}
}
定义好topic之后要给这些指定的topic发消息:
protected void sendMessage(String[] stocks) throws JMSException {
for(int i = 0; i < stocks.length; i++) {
Message message = createStockMessage(stocks[i], session);
System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
producer.send(destinations[i], message);
}
}
protected Message createStockMessage(String stock, Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("stock", stock);
message.setDouble("price", 1.00);
message.setDouble("offer", 0.01);
message.setBoolean("up", true);
return message;
}
在sendMessage
方法里遍历每个topic
,然后给每个topic
发送定义的Message
消息。
publisher发布消息
public static void main(String[] args) throws JMSException {
if(args.length < 1)
throw new IllegalArgumentException();
// Create publisher
Publisher publisher = new Publisher();
// Set topics
publisher.setTopics(args);
for(int i = 0; i < 10; i++) {
publisher.sendMessage(args);
System.out.println("Publisher '" + i + " price messages");
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
// Close all resources
publisher.close();
}
close
方法关闭资源:
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
consumer
具体的步骤:
1.初始化资源。
- 接收消息。
- 必要的时候关闭资源。
初始化资源放到构造函数里面:
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
接收和处理消息的方法
分为同步和异步的:
- 同步
MessageConsumer.receive()
方法 异步 注册
MessageListener
,使用MessageConsumer.setMessageListener()
。public static void main(String[] args) throws JMSException {
Consumer consumer = new Consumer(); for (String stock : args) { Destination destination = consumer.getSession().createTopic("STOCKS." + stock); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new Listener()); }
}
public Session getSession() {
return session;
}
这里的代码不要当真了,写得很烂
Listener
负责处理接收到的消息:
public class Listener implements MessageListener {
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage)message;
String stock = map.getString("stock");
double price = map.getDouble("price");
double offer = map.getDouble("offer");
boolean up = map.getBoolean("up");
DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
实现了MessageListener
接口,里面的onMessage
方法在接收到消息之后会被调用的方法。
实现pub-sub模式的步骤
两者设定一个共同的topic。
在publisher端通过session
创建producer
,根据指定的参数创建destination
,然后将消息和destination
作为producer.send()
方法的参数。
在consumer端也要创建类似的connection
,session
。通过session
得到destination
,再通过session.createConsumer(destination)
来得到一个MessageConsumer
对象。有了这个MessageConsumer
就可以自行选择是直接同步的receive
消息还是注册listener了。
p2p
在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和pub-sub的区别在于一个消息会发送给订阅此topic的多个订阅者,而在p2p里queue的消息只能被一个接受者接受。
发送者
public Publisher() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
}
发送消息:
public void sendMessage() throws JMSException {
for(int i = 0; i < jobs.length; i++)
{
String job = jobs[i];
Destination destination = session.createQueue("JOBS." + job);
Message message = session.createObjectMessage(i);
System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);
producer.send(destination, message);
}
}
消息发送者的启动代码:
public static void main(String[] args) throws JMSException {
Publisher publisher = new Publisher();
for(int i = 0; i < 10; i++) {
publisher.sendMessage();
System.out.println("Published " + i + " job messages");
try {
Thread.sleep(1000);
} catch (InterruptedException x) {
e.printStackTrace();
}
}
publisher.close();
}
在这里发送10条消息,在每个sendMessage
的方法里实际上是针对每个queue
发送了10条。
接收者
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public static void main(String[] args) throws JMSException {
Consumer consumer = new Consumer();
for (String job : consumer.jobs) {
Destination destination = consumer.getSession().createQueue("JOBS." + job);
MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
messageConsumer.setMessageListener(new Listener(job));
}
}
public Session getSession() {
return session;
}
MessageListener
接口实现类
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
public class Listener implements MessageListener {
private String job;
public Listener(String job) {
this.job = job;
}
public void onMessage(Message message) {
try {
//do something here
System.out.println(job + " id:" + ((ObjectMessage)message).getObject());
} catch (Exception e) {
e.printStackTrace();
}
}
}
比较pub-sub和p2p模式
基本的处理流程都是类似的,除了在pub-sub中要通过createTopic
来设置topic,而在p2p中要通过createQueue
来创建通信队列。
request-response
和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而实际中的很多应用需要双方都能给对方发送消息。
请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
以下这种方式只能说是很差,并不是什么高明的做法
在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo
和JMSCorrelationID
消息头来将通信的双方关联起来。另外,QueueRequestor
和TopicRequestor
能够支持简单的请求/应答过程。
// client side
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
...
// send a request..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);
producer.send(message);
client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination
以及correlationID
。那么在处理消息的server端得到这个消息后就知道该发送给谁了。Server端的大致流程如下:
public void onMessage(Message request) {
Message response = session.createMessage();
response.setJMSCorrelationID(request.getJMSCorrelationID())
producer.send(request.getJMSReplyTo(), response)
}
这里是在server端注册MessageListener
,通过设置返回信息的CorrelationID
和JMSReplyTo
将信息返回。
Client:
public Client() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(transacted, ackMode);
Destination adminQueue = session.createQueue(clientQueueName);
//Setup a message producer to send message to the queue the server is consuming from
this.producer = session.createProducer(adminQueue);
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Create a temporary queue that this client will listen for responses on then create a consumer
//that consumes message from this temporary queue...for a real application a client should reuse
//the same temp queue for each message to the server...one temp queue per client
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
//This class will handle the messages to the temp queue as well
responseConsumer.setMessageListener(this);
//Now create the actual message you want to send
TextMessage txtMessage = session.createTextMessage();
txtMessage.setText("MyProtocolMessage");
//Set the reply to field to the temp queue you created above, this is the queue the server
//will respond to
txtMessage.setJMSReplyTo(tempDest);
//Set a correlation ID so when you get a response you know which sent message the response is for
//If there is never more than one outstanding message to the server then the
//same correlation ID can be used for all the messages...if there is more than one outstanding
//message to the server you would presumably want to associate the correlation ID with this
//message somehow...a Map works good
String correlationId = this.createRandomString();
txtMessage.setJMSCorrelationID(correlationId);
this.producer.send(txtMessage);
} catch (JMSException e) {
//Handle the exception appropriately
}
}
这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);
这一句设置。另外一个是自己要接收的消息destination, 通过Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest);
这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);
指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);
方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。
同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener
接口的方式:
public void onMessage(Message message) {
String messageText = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
messageText = textMessage.getText();
System.out.println("messageText = " + messageText);
}
} catch (JMSException e) {
//Handle the exception appropriately
}
}
Server:
这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID
来发送返回的消息:
public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//Set the correlation ID from the received message to be the correlation id of the response message
//this lets the client identify which message this is a response to if it has more than
//one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID());
//Send the response to the Destination specified by the JMSReplyTo field of the received message,
//this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
//Handle the exception appropriately
}
}
前面,在replyProducer.send()
方法里,message.getJMSReplyTo()
就得到了要发送消息回去的destination。
另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:
public Server() {
try {
//This message broker is embedded
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
} catch (Exception e) {
//Handle the exception appropriately
}
//Delegating the handling of messages to another class, instantiate it before setting up JMS so it
//is ready to handle messages
this.messageProtocol = new MessageProtocol();
this.setupMessageQueueConsumer();
}
private void setupMessageQueueConsumer() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(this.transacted, ackMode);
Destination adminQueue = this.session.createQueue(messageQueueName);
//Setup a message producer to respond to messages from clients, we will get the destination
//to send to from the JMSReplyTo header field from a Message
this.replyProducer = this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
} catch (JMSException e) {
//Handle the exception appropriately
}
}
对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。
一些应用和改进
回顾前面三种基本的通信方式,发现它们都存在着一定的共同点,比如说都要初始化ConnectionFactory
, Connection
, Session
等。在使用完之后都要将这些资源关闭。如果每一个实现它的通信端都这么写一通的话,其实是一种简单的重复。从工程的角度来看是完全没有必要的。
解决方法
通过工厂方法封装这些对象的创建和销毁,然后简单的通过调用工厂方法的方式得到它们。
既然基本的流程都是在开头创建资源在结尾销毁,也可以采用Template Method模式的思路。通过继承一个抽象类,在抽象类里提供了资源的封装。所有继承的类只要实现怎么去使用这些资源的方法就可以了。
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最快的浏览器
中国人都在使用的地球上最厉害的安全软件
中国人都在使用的地球上最好的看图王
中国人都在使用的地球上最快速的视频软件
中国人都在使用的地球上最全的视频软件
中国人都在使用的地球上最好最全的压缩软件
中国人都在使用的地球上最好的音乐播放器
中国人都在使用的地球上最安全的杀毒软件
中国人都在使用的地球上最全的影视大全