A-A+

RabbitMQ实例教程:路由选择

2016年01月27日 站长资讯 暂无评论

在前面的例子中,我们构建了一个简单的日志系统来日志消息通过广播传送到多个接受者。本文将介绍如何订阅消息的子集。比如,我们能够将关键的错误信息写到日志文件中,同时也能够在控制台打印所有的日志消息。

消息绑定(Bindings)

在前面的例子中,我们使用下面的代码方式再次绑定。

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定表述的是交换机和队列之间的关系,可以理解为队列对交换机中的消息感兴趣。

绑定的参数是 routingKey,为避免混淆,我们将basic_publish参数称为binding key,下面是使用关键字绑定binding的例子。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

它表示绑定关键字依赖于交换机类型,对于我们之前定义的 fanout 交换机,它会忽略绑定的关键字。即为所有订阅者发送消息。

Direct 交换机(Direct exchange)

在前面的例子中,我们将所有消息广播给了所有消费者,现在我们想在原有的基础上做些改进对日志消息进行过滤。比如我们只把严重错误的日志写入日志文件,这样能节省磁盘空间。

fanout交换机的灵活性比较差,它只是盲目的广播所有消息,现在使用direct交换机代替它。direct交换机的算法非常简单-当绑定关键字与消息的路由关键字匹配时就会将消息传递给队列。

如上图所示,X交换机绑定了两个队列Q1和Q2。Q1队列绑定了orange关键字,Q2队列绑定green和black关键字。路由中带有orange路由关键字的消息会转发给Q1,带有black或green关键字的消息会转发给Q2,其它的消息就会废弃。

多重绑定(Multiple bindings)

一个路由使用同一个绑定代码绑定到多个队列是合法的。上图中,X路由器使用black代码绑定到Q1和Q2。这种情况下,direct交换机会像fanout交换机一样,把消息广播给所有队列。

提交日志

我们继续使用日志系统作为我们的模型。我们使用direct交换机来代替fanout交换机,将日志等级severity作为路由代码,这样我们就能根据日志等级决定接收哪些消息。

(1)创建direct交换机 http://www.xiaoxiongboke.com/

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

(2)准备发送消息

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

(3)将日志等级划分为:info,warning和error,根据日志等级创建不同的绑定。

消息订阅

接收消息与之前中的例子一样,不同之处在于会对每个 Severity 创建一个新的绑定。

  1. String queueName = channel.queueDeclare().getQueue();  
  2. for(String severity : argv){      
  3.   channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  4. }  

大总结

  1. EmitLogDirect.java  
  2.    
  3. package com.favccxx.favrabbit;  
  4.    
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8.    
  9. public class EmitLogDirect {  
  10.    
  11.     private static final String EXCHANGE_NAME = "direct_logs";  
  12.    
  13.     public static void main(String[] argv) throws Exception {  
  14.         ConnectionFactory factory = new ConnectionFactory();  
  15.         factory.setHost("localhost");  
  16.         Connection connection = factory.newConnection();  
  17.         Channel channel = connection.createChannel();  
  18.    
  19.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  20.    
  21.         String[] serveritys = {"debug", "info", "warning", "error"};  
  22.         String[] messages = {"This is a DEBUG message!", "This is a INFO message!", "This is a WARNING message!", "This is a ERROR message!"};  
  23.            
  24.         for(int i=0; i<serveritys.length; i++){  
  25.             channel.basicPublish(EXCHANGE_NAME, serveritys[i], null, messages[i].getBytes("UTF-8"));  
  26.             System.out.println(" [x] Sent '" + serveritys[i] + "':'" + messages[i] + "'");  
  27.         }  
  28.            
  29.            
  30.         channel.close();  
  31.         connection.close();  
  32.     }  
  33.    
  34. }  

控制台输出内容如下

  1. [x] Sent 'debug':'This is a DEBUG message!'  
  2. [x] Sent 'info':'This is a INFO message!'  
  3. [x] Sent 'warning':'This is a WARNING message!'  
  4. [x] Sent 'error':'This is a ERROR message!'  

为了简化代码实现,我们使用了一个for循环来分别接收各种不同严重程度的日志信息。但在一个真实的应用环境中,我们可能需要3~4个接收器。

  1. ReceiveLogsDirect.java  
  2.    
  3. package com.favccxx.favrabbit;  
  4.    
  5. import java.io.IOException;  
  6.    
  7. import com.rabbitmq.client.AMQP;  
  8. import com.rabbitmq.client.Channel;  
  9. import com.rabbitmq.client.Connection;  
  10. import com.rabbitmq.client.ConnectionFactory;  
  11. import com.rabbitmq.client.Consumer;  
  12. import com.rabbitmq.client.DefaultConsumer;  
  13. import com.rabbitmq.client.Envelope;  
  14.    
  15. public class ReceiveLogsDirect {  
  16.    
  17.     private static final String EXCHANGE_NAME = "direct_logs";  
  18.    
  19.     public static void main(String[] argv) throws Exception {  
  20.         ConnectionFactory factory = new ConnectionFactory();  
  21.         factory.setHost("localhost");  
  22.         Connection connection = factory.newConnection();  
  23.         Channel channel = connection.createChannel();  
  24.    
  25.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  26.         String queueName = channel.queueDeclare().getQueue();  
  27.    
  28.         String[] serveritys = { "debug", "info", "warning", "error" };  
  29.         for (String severity : serveritys) {  
  30.             channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  31.             Consumer consumer = new DefaultConsumer(channel) {  
  32.                 @Override  
  33.                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
  34.                         byte[] body) throws IOException {  
  35.                     String message = new String(body, "UTF-8");  
  36.                     System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");  
  37.                 }  
  38.             };  
  39.             channel.basicConsume(queueName, true, consumer);  
  40.         }  
  41.    
  42.     }  
  43. }  

控制台输出内容如下:

  1. [x] Received 'debug':'This is a DEBUG message!'  
  2. [x] Received 'info':'This is a INFO message!'  
  3. [x] Received 'warning':'This is a WARNING message!'  
  4. [x] Received 'error':'This is a ERROR message!'  
标签:

给我留言