A-A+

Python通过amqp消息队列协议中的Qpid实现数据通信

2015年11月25日 站长资讯 暂无评论

这两天看了消息队列通信,打算在配置平台上应用起来。以前用过zeromq但是这东西太快了,还有就是rabbitmq有点大,新浪的朋友推荐了qpid,简单轻便。自己总结了下文档,大家可以瞅瞅。

AMQP(消息队列协议Advanced Message Queuing Protocol)是一种消息协议 ,等同于JMS,但是JMS只是java平台的方案,AMQP是一个跨语言的协议。

AMQP 不分语言平台,主流的语言都支持,运维这边的perl,python,ruby更是支持,所以大家就放心用吧。

主流的消息队列通信类型:

点对点:A 发消息给 B。

广播:A 发给所有其他人的消息

组播:A 发给多个但不是所有其他人的消息。

Requester/response:类似访问网页的通信方式,客户端发请求并等待,服务端回复该请求

Pub-sub:类似杂志发行,出版杂志的人并不知道谁在看这本杂志,订阅的人并不关心谁在发表这本杂志。出版的人只管将信息发布出去,订阅的人也只在需要的时候收到该信息。

Store-and-forward:存储转发模型类似信件投递,写信的人将消息写给某人,但在将信件发出的时候,收信的人并不一定在家等待,也并不知道有消息给他。但这个消息不会丢失,会放在收信者的信箱中。这种模型允许信息的异步交换。

其他通信模型。。。

Publisher --->Exchange ---> MessageQueue --->Consumer

整个过程是异步的.Publisher,Consumer相互不知道对方的存在,Exchange负责交换/路由,依靠Routing Key,每个消息者有一个Routing Key,每个Binding将自已感兴趣的RoutingKey告诉Exchange,以便Exchange将相关的消息转发给相应的Queue !

几个概念

Producer,Routing Key,Exchange,Binding,Queue,Consumer.

Producer: 消息的创建者,消息的发送者

Routing Key:唯一用来映射消息该进入哪个队列的标识

Exchange:负责消息的路由,交换

Binding:定义Queue和Exchange的映射关系

Queue:消息队列

Consumer:消息的使用者

Exchange类型

Fan-Out:类似于广播方式,不管RoutingKey

Direct:根据RoutingKey,进行关联投寄

Topic:类似于Direct,但是支持多个Key关联,以组的方式投寄。

key以.来定义界限。类似于usea.news,usea.weather.这两个消息是一组的。

QPID

Qpid 是 Apache 开发的一款面向对象的消息中间件,它是一个 AMQP 的实现,可以和其他符合 AMQP 协议的系统进行通信。Qpid 提供了 C++/Python/Java/C# 等主流编程语言的客户端库,安装使用非常方便。相对于其他的 AMQP 实现,Qpid 社区十分活跃,有望成为标准 AMQP 中间件产品。除了符合 AMQP 基本要求之外,Qpid 提供了很多额外的 HA 特性,非常适于集群环境下的消息通信!

基本功能外提供以下特性:

采用 Corosync(?)来保证集群环境下的Fault-tolerant(?) 特性

支持XML的Exchange,消息为XML时,彩用Xquery过滤

支持plugin

提供安全认证,可对producer/consumer提供身份认证

qpidd --port --no-data-dir --auth

port:端口

--no-data-dir:不指定数据目录

--auth:不启用安全身份认证

启动后自动创建一些Exchange,amp.topic,amp.direct,amp.fanout

tools:

Qpid-config:维护Queue,Exchange,内部配置

Qpid-route:配置broker Federation(联盟?集群?)

Qpid-tool:监控

咱们说完介绍了,这里就赶紧测试下。

服务器端的安装:

yum install qpid-cpp-server

yum install qpid-tools

/etc/init.d/qpidd start

发布端的测试代码:

  1. #!/usr/bin/env python  
  2. #xiaorui.cc  
  3. import optparse, time  
  4. from qpid.messaging import *  
  5. from qpid.util import URL  
  6. from qpid.log import enable, DEBUG, WARN  
  7. def nameval(st):  
  8.   idx = st.find("=")  
  9.   if idx >= 0:  
  10.     name = st[0:idx]  
  11.     value = st[idx+1:]  
  12.   else:  
  13.     name = st  
  14.     value = None  
  15.   return name, value  
  16. parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",  
  17.                                description="Send messages to the supplied address.")  
  18. parser.add_option("-b", "--broker", default="localhost",  
  19.                   help="connect to specified BROKER (default %default)")   
  20. parser.add_option("-r", "--reconnect", action="store_true",  
  21.                   help="enable auto reconnect")  
  22. parser.add_option("-i", "--reconnect-interval", type="float"default=3,  
  23.                   help="interval between reconnect attempts")  
  24. parser.add_option("-l", "--reconnect-limit", type="int",  
  25.                   help="maximum number of reconnect attempts")  
  26. parser.add_option("-c", "--count", type="int"default=1,  
  27.                   help="stop after count messages have been sent, zero disables (default %default)")   
  28. parser.add_option("-t", "--timeout", type="float"default=None,  
  29.                   help="exit after the specified time")  
  30. parser.add_option("-I", "--id", help="use the supplied id instead of generating one")  
  31. parser.add_option("-S", "--subject", help="specify a subject")  
  32. parser.add_option("-R", "--reply-to", help="specify reply-to address")  
  33. parser.add_option("-P", "--property", dest="properties"action="append"default=[],  
  34.                   metavar="NAME=VALUE"help="specify message property")  
  35. parser.add_option("-M", "--map", dest="entries"action="append"default=[],   
  36.                   metavar="KEY=VALUE",  
  37.                   help="specify map entry for message body")  
  38. parser.add_option("-v", dest="verbose"action="store_true",  
  39.                   help="enable logging")  
  40. opts, args = parser.parse_args()  
  41. if opts.verbose:  
  42.   enable("qpid", DEBUG)  
  43. else:  
  44.   enable("qpid", WARN)  
  45. if opts.id is None:  
  46.   spout_id = str(uuid4())  
  47. else:  
  48.   spout_id = opts.id  
  49. if args:  
  50.   addr = args.pop(0)  
  51. else:  
  52.   parser.error("address is required")  
  53. content = None  
  54. if args:  
  55.   text = " ".join(args)  
  56.   
  57.   
  58. else:  
  59.   text = None  
  60. if opts.entries:  
  61.   content = {}  
  62.   if text:  
  63.     content["text"] = text  
  64.   for e in opts.entries:  
  65.     name, val = nameval(e)  
  66.     content[name] = val  
  67. else:  
  68.   content = text  
  69. conn = Connection(opts.broker,  
  70.                   reconnect=opts.reconnect,  
  71.                   reconnect_interval=opts.reconnect_interval,  
  72.                   reconnect_limit=opts.reconnect_limit)  
  73. try:  
  74.   conn.open()  
  75.   ssn = conn.session()  
  76.   snd = ssn.sender(addr)  
  77.   count = 0  
  78.   start = time.time()  
  79.   while (opts.count == 0 or count < opts.count) and \   
  80.         (opts.timeout is None or time.time() - start < opts.timeout):  
  81.     msg = Message(subject=opts.subject,  
  82.                   reply_to=opts.reply_to,  
  83.                   contentcontent=content)  
  84.     msg.properties["spout-id"] = "%s:%s" % (spout_id, count)  
  85.     for p in opts.properties:  
  86.       name, val = nameval(p)  
  87.       msg.properties[name] = val  
  88.     snd.send(msg)  
  89.     count += 1  
  90.     print msg  
  91. except SendError, e:  
  92.   print e  
  93. except KeyboardInterrupt:  
  94.   pass  
  95. conn.close()  

客户端的测试代码:

  1. #!/usr/bin/env python  
  2. #xiaorui.cc  
  3. import optparse  
  4. from qpid.messaging import *  
  5. from qpid.util import URL  
  6. from qpid.log import enable, DEBUG, WARN  
  7. parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",  
  8.                                description="Drain messages from the supplied address.")  
  9. parser.add_option("-b", "--broker", default="localhost",  
  10.                   help="connect to specified BROKER (default %default)")  
  11. parser.add_option("-c", "--count", type="int",  
  12.                   help="number of messages to drain")   
  13. parser.add_option("-f", "--forever", action="store_true",  
  14.                   help="ignore timeout and wait forever")  
  15. parser.add_option("-r", "--reconnect", action="store_true",  
  16.                   help="enable auto reconnect")  
  17. parser.add_option("-i", "--reconnect-interval", type="float"default=3,  
  18.                   help="interval between reconnect attempts")  
  19. parser.add_option("-l", "--reconnect-limit", type="int",  
  20.                   help="maximum number of reconnect attempts")   
  21. parser.add_option("-t", "--timeout", type="float"default=0,  
  22.                   help="timeout in seconds to wait before exiting (default %default)")  
  23. parser.add_option("-p", "--print", dest="format"default="%(M)s",  
  24.                   help="format string for printing messages (default %default)")  
  25. parser.add_option("-v", dest="verbose"action="store_true",  
  26.                   help="enable logging")  
  27. opts, args = parser.parse_args()  
  28. if opts.verbose:  
  29.   enable("qpid", DEBUG)  
  30. else:  
  31.   enable("qpid", WARN)  
  32. if args:  
  33.   addr = args.pop(0)   
  34. else:  
  35.   parser.error("address is required")  
  36. if opts.forever:  
  37.   timeout = None  
  38. else:  
  39.   timeout = opts.timeout  
  40. class Formatter:  
  41.   def __init__(self, message):  
  42.     self.message = message  
  43.     self.environ = {"M": self.message,  
  44.                     "P": self.message.properties,  
  45.                     "C": self.message.content}  
  46.   def __getitem__(self, st):  
  47.     return eval(st, self.environ)  
  48. conn = Connection(opts.broker,  
  49.                   reconnect=opts.reconnect,  
  50.                   reconnect_interval=opts.reconnect_interval,  
  51.   
  52.   
  53.                   reconnect_limit=opts.reconnect_limit)  
  54. try:  
  55.   conn.open()  
  56.   ssn = conn.session()  
  57.   rcv = ssn.receiver(addr)  
  58.   count = 0  
  59.   while not opts.count or count < opts.count:  
  60.     try:  
  61.       msg = rcv.fetch(timeouttimeout=timeout)  
  62.       print opts.format % Formatter(msg)  
  63.       count += 1  
  64.       ssn.acknowledge()  
  65.     except Empty:  
  66.       break  
  67. except ReceiverError, e:  
  68.   print e  
  69. except KeyboardInterrupt:  
  70.   pass  
  71. conn.close()  

Browse 模式的意思是,浏览的意思,一个特殊的需求,我访问了一次,别人也能访问。

Consume 模式的意思是,我浏览了一次后,删除这一条。别人就访问不到啦。

这个是浏览模式.

sub-pub 通信的例子

Pub-sub 是另一种很有用的通信模型。恐怕它的名字就源于出版发行这种现实中的信息传递方式吧,publisher 就是出版商,subscriber 就是订阅者。

服务端

qpid-config add exchange topic news-service

./spout news-service/news xiaorui.cc

客户端:

./drain -t 120 news-service/#.news

PUB端,创建TOPIC点 !

SUB端,也就是接收端!

总结:

qpid挺好用的,比rabbitmq要轻型,比zeromq保险点! 各方面的文档也都很健全,值得一用。 话说,这三个消息队列我也都用过,最一开始用的是redis的pubsub做日志收集和信息通知,后来在做集群相关的项目的时候,我自己搞了一套zeromq的分布式任务分发,和saltstack很像,当然了远没有万人用的salt强大。 rabbitmq的用法,更是看中他的安全和持久化,当然性能真的不咋地。

关于qpid的性能我没有亲自做大量的测试,但是听朋友说,加持久化可以到7k,不加持久化可以到1500左右。

标签:

给我留言