activemq的几种基本通信方式总结

中国人最喜欢访问的网站
只要注册ofo就送你10块钱,还等什么,快来注册吧

简介

面向消息队列是一个总体比较合理的应用系统集成方案。
ActiveMQ是JMS消息通信规范的一个实现。消息通信模式主要有发布-订阅、点对点。

基础流程

ActiveMQ启动服务的过程:

  1. 获得JMS connection factory,通过提供特定环境的连接信息来构造factory。
  2. 利用factory构造JMS connection.
  3. 启动connection
  4. 通过connection创建JMS session.
  5. 指定JMS destination.
  6. 创建JMS producer和JMS message并提供destination.
  7. 创建JMS consumer和注册JMS message listener.
  8. 发送和接收JMS message.
  9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

publish-subscribe

发布订阅模式类似于订阅报纸。

JMS发布订阅模式示意图

例子

publisher

publisher是属于发布信息的一方,它通过定义一个或者多个topic,然后给这些topic发送消息。
publisher的构造函数如下:

public Publisher() throws JMSException {  
    factory = new ActiveMQConnectionFactory(brokerURL);  
    connection = factory.createConnection();  
    try {  
        connection.start();  
    } catch (JMSException jmse) {  
        connection.close();  
        throw jmse;  
    }  
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    producer = session.createProducer(null);  
}

按照前面说的流程定义了基本的connectionFactory, connection,session,producer。。

接着定义一系列的topic让所有的consumer来订阅

protected void setTopics(String[] stocks) throws JMSException {  
    destinations = new Destination[stocks.length];  
    for(int i = 0; i < stocks.length; i++) {  
        destinations[i] = session.createTopic("STOCKS." + stocks[i]);  
    }  
}  

定义好topic之后要给这些指定的topic发消息:

protected void sendMessage(String[] stocks) throws JMSException {  
    for(int i = 0; i < stocks.length; i++) {  
        Message message = createStockMessage(stocks[i], session);  
        System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);  
        producer.send(destinations[i], message);  
    }  
}  

protected Message createStockMessage(String stock, Session session) throws JMSException {  
    MapMessage message = session.createMapMessage();  
    message.setString("stock", stock);  
    message.setDouble("price", 1.00);  
    message.setDouble("offer", 0.01);  
    message.setBoolean("up", true);          
    return message;  
}  

sendMessage方法里遍历每个topic,然后给每个topic发送定义的Message消息。

publisher发布消息

public static void main(String[] args) throws JMSException {  
    if(args.length < 1)  
        throw new IllegalArgumentException();  

    // Create publisher       
    Publisher publisher = new Publisher();  

    // Set topics  
    publisher.setTopics(args);  

    for(int i = 0; i < 10; i++) {  
        publisher.sendMessage(args);  
        System.out.println("Publisher '" + i + " price messages");  
        try {  
            Thread.sleep(1000);  
        } catch(InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
    // Close all resources  
    publisher.close();  
}  

close方法关闭资源:

public void close() throws JMSException {  
    if (connection != null) {  
        connection.close();  
    }  
}  

consumer

具体的步骤:
1.初始化资源。

  1. 接收消息。
  2. 必要的时候关闭资源。

初始化资源放到构造函数里面:

public Consumer() throws JMSException {  
    factory = new ActiveMQConnectionFactory(brokerURL);  
    connection = factory.createConnection();  
    connection.start();  
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
}  

接收和处理消息的方法

分为同步和异步的:

  • 同步 MessageConsumer.receive()方法
  • 异步 注册MessageListener,使用MessageConsumer.setMessageListener()

    public static void main(String[] args) throws JMSException {

    Consumer consumer = new Consumer();  
    for (String stock : args) {  
        Destination destination = consumer.getSession().createTopic("STOCKS." + stock);  
        MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
        messageConsumer.setMessageListener(new Listener());  
    }  
    

    }

    public Session getSession() {

    return session;  
    

    }

这里的代码不要当真了,写得很烂

Listener负责处理接收到的消息:

public class Listener implements MessageListener {  
    public void onMessage(Message message) {  
        try {  
            MapMessage map = (MapMessage)message;  
            String stock = map.getString("stock");  
            double price = map.getDouble("price");  
            double offer = map.getDouble("offer");  
            boolean up = map.getBoolean("up");  
            DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );  
            System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  

实现了MessageListener接口,里面的onMessage方法在接收到消息之后会被调用的方法。

实现pub-sub模式的步骤

两者设定一个共同的topic。

在publisher端通过session创建producer,根据指定的参数创建destination,然后将消息和destination作为producer.send()方法的参数。

在consumer端也要创建类似的connection,session。通过session得到destination,再通过session.createConsumer(destination)来得到一个MessageConsumer对象。有了这个MessageConsumer就可以自行选择是直接同步的receive消息还是注册listener了。

p2p

JMS点对点模式示意图

在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和pub-sub的区别在于一个消息会发送给订阅此topic的多个订阅者,而在p2p里queue的消息只能被一个接受者接受。

发送者

public Publisher() throws JMSException {  
    factory = new ActiveMQConnectionFactory(brokerURL);  
    connection = factory.createConnection();  
    connection.start();  
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    producer = session.createProducer(null);  
}

发送消息:

public void sendMessage() throws JMSException {  
    for(int i = 0; i < jobs.length; i++)  
    {  
        String job = jobs[i];  
        Destination destination = session.createQueue("JOBS." + job);  
        Message message = session.createObjectMessage(i);  
        System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);  
        producer.send(destination, message);  
    }  
}  

消息发送者的启动代码:

public static void main(String[] args) throws JMSException {  
    Publisher publisher = new Publisher();  
    for(int i = 0; i < 10; i++) {  
        publisher.sendMessage();  
        System.out.println("Published " + i + " job messages");  
    try {  
            Thread.sleep(1000);  
        } catch (InterruptedException x) {  
            e.printStackTrace();  
        }  
    }  
    publisher.close();  
}  

在这里发送10条消息,在每个sendMessage的方法里实际上是针对每个queue发送了10条。

接收者

public Consumer() throws JMSException {  
    factory = new ActiveMQConnectionFactory(brokerURL);  
    connection = factory.createConnection();  
    connection.start();  
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
}  


public static void main(String[] args) throws JMSException {  
        Consumer consumer = new Consumer();  
        for (String job : consumer.jobs) {  
            Destination destination = consumer.getSession().createQueue("JOBS." + job);  
            MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
            messageConsumer.setMessageListener(new Listener(job));  
        }  
    }  

    public Session getSession() {  
        return session;  
    }

MessageListener接口实现类

import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.ObjectMessage;  

public class Listener implements MessageListener {        
    private String job;  

    public Listener(String job) {  
        this.job = job;  
    }  

    public void onMessage(Message message) {  
        try {  
            //do something here  
            System.out.println(job + " id:" + ((ObjectMessage)message).getObject());  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }        
}

比较pub-sub和p2p模式

基本的处理流程都是类似的,除了在pub-sub中要通过createTopic来设置topic,而在p2p中要通过createQueue来创建通信队列。

request-response

和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而实际中的很多应用需要双方都能给对方发送消息。
请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。

以下这种方式只能说是很差,并不是什么高明的做法

JMS实际应用中产生的应答方式示意图
在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyToJMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestorTopicRequestor能够支持简单的请求/应答过程。

// client side  
Destination tempDest = session.createTemporaryQueue();  
MessageConsumer responseConsumer = session.createConsumer(tempDest);  
...  

// send a request..  
message.setJMSReplyTo(tempDest)  
message.setJMSCorrelationID(myCorrelationID);  

producer.send(message);  

client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination以及correlationID。那么在处理消息的server端得到这个消息后就知道该发送给谁了。Server端的大致流程如下:

public void onMessage(Message request) {  

  Message response = session.createMessage();  
  response.setJMSCorrelationID(request.getJMSCorrelationID())  

  producer.send(request.getJMSReplyTo(), response)  
}  

这里是在server端注册MessageListener,通过设置返回信息的CorrelationIDJMSReplyTo将信息返回。

Client:

public Client() {  
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
    Connection connection;  
    try {  
        connection = connectionFactory.createConnection();  
        connection.start();  
        Session session = connection.createSession(transacted, ackMode);  
        Destination adminQueue = session.createQueue(clientQueueName);  

        //Setup a message producer to send message to the queue the server is consuming from  
        this.producer = session.createProducer(adminQueue);  
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  

        //Create a temporary queue that this client will listen for responses on then create a consumer  
        //that consumes message from this temporary queue...for a real application a client should reuse  
        //the same temp queue for each message to the server...one temp queue per client  
        Destination tempDest = session.createTemporaryQueue();  
        MessageConsumer responseConsumer = session.createConsumer(tempDest);  

        //This class will handle the messages to the temp queue as well  
        responseConsumer.setMessageListener(this);  

        //Now create the actual message you want to send  
        TextMessage txtMessage = session.createTextMessage();  
        txtMessage.setText("MyProtocolMessage");  

        //Set the reply to field to the temp queue you created above, this is the queue the server  
        //will respond to  
        txtMessage.setJMSReplyTo(tempDest);  

        //Set a correlation ID so when you get a response you know which sent message the response is for  
        //If there is never more than one outstanding message to the server then the  
        //same correlation ID can be used for all the messages...if there is more than one outstanding  
        //message to the server you would presumably want to associate the correlation ID with this  
        //message somehow...a Map works good  
        String correlationId = this.createRandomString();  
        txtMessage.setJMSCorrelationID(correlationId);  
        this.producer.send(txtMessage);  
    } catch (JMSException e) {  
        //Handle the exception appropriately  
    }  
}  

这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);这一句设置。另外一个是自己要接收的消息destination, 通过Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。

同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:

public void onMessage(Message message) {  
    String messageText = null;  
    try {  
        if (message instanceof TextMessage) {  
            TextMessage textMessage = (TextMessage) message;  
            messageText = textMessage.getText();  
            System.out.println("messageText = " + messageText);  
        }  
    } catch (JMSException e) {  
        //Handle the exception appropriately  
    }  
}  

Server:

这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:

public void onMessage(Message message) {  
    try {  
        TextMessage response = this.session.createTextMessage();  
        if (message instanceof TextMessage) {  
            TextMessage txtMsg = (TextMessage) message;  
            String messageText = txtMsg.getText();  
            response.setText(this.messageProtocol.handleProtocolMessage(messageText));  
        }  

        //Set the correlation ID from the received message to be the correlation id of the response message  
        //this lets the client identify which message this is a response to if it has more than  
        //one outstanding message to the server  
        response.setJMSCorrelationID(message.getJMSCorrelationID());  

        //Send the response to the Destination specified by the JMSReplyTo field of the received message,  
        //this is presumably a temporary queue created by the client  
        this.replyProducer.send(message.getJMSReplyTo(), response);  
    } catch (JMSException e) {  
        //Handle the exception appropriately  
    }  
}

前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要发送消息回去的destination。
另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:

public Server() {  
    try {  
        //This message broker is embedded  
        BrokerService broker = new BrokerService();  
        broker.setPersistent(false);  
        broker.setUseJmx(false);  
        broker.addConnector(messageBrokerUrl);  
        broker.start();  
    } catch (Exception e) {  
        //Handle the exception appropriately  
    }  

    //Delegating the handling of messages to another class, instantiate it before setting up JMS so it  
    //is ready to handle messages  
    this.messageProtocol = new MessageProtocol();  
    this.setupMessageQueueConsumer();  
}  

private void setupMessageQueueConsumer() {  
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);  
    Connection connection;  
    try {  
        connection = connectionFactory.createConnection();  
        connection.start();  
        this.session = connection.createSession(this.transacted, ackMode);  
        Destination adminQueue = this.session.createQueue(messageQueueName);  

        //Setup a message producer to respond to messages from clients, we will get the destination  
        //to send to from the JMSReplyTo header field from a Message  
        this.replyProducer = this.session.createProducer(null);  
        this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  

        //Set up a consumer to consume messages off of the admin queue  
        MessageConsumer consumer = this.session.createConsumer(adminQueue);  
        consumer.setMessageListener(this);  
    } catch (JMSException e) {  
        //Handle the exception appropriately  
    }  
}

对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。

一些应用和改进

回顾前面三种基本的通信方式,发现它们都存在着一定的共同点,比如说都要初始化ConnectionFactory, Connection, Session等。在使用完之后都要将这些资源关闭。如果每一个实现它的通信端都这么写一通的话,其实是一种简单的重复。从工程的角度来看是完全没有必要的。

解决方法

通过工厂方法封装这些对象的创建和销毁,然后简单的通过调用工厂方法的方式得到它们。
既然基本的流程都是在开头创建资源在结尾销毁,也可以采用Template Method模式的思路。通过继承一个抽象类,在抽象类里提供了资源的封装。所有继承的类只要实现怎么去使用这些资源的方法就可以了。

快下载安装吧,今天头条送你钱啦!!!!
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最快的浏览器
中国人都在使用的地球上最厉害的安全软件
中国人都在使用的地球上最好的看图王
中国人都在使用的地球上最快速的视频软件
中国人都在使用的地球上最全的视频软件
中国人都在使用的地球上最好最全的压缩软件
中国人都在使用的地球上最好的音乐播放器
中国人都在使用的地球上最安全的杀毒软件
中国人都在使用的地球上最全的影视大全