python(11)上:RabbitMQ使用详细介绍

发布时间:2017-09-02 15:43:24
python(11)上:RabbitMQ使用详细介绍

上节回顾
一、RabbitMQ 消息队列介绍
二、RabbitMQ基本示例.
  1、Rabbitmq 安装
  2、基本示例
  3、RabbitMQ 消息分发轮询
三、RabbitMQ 消息持久化(durable、properties)
  1、RabbitMQ 相关命令
  2、消息持久化
四、RabbitMQ 广播模式(exchange)
  1、fanout 纯广播、all
  2、direct 有选择的接收消息
  3、topic 更细致的过滤
  4、RabbitMQ RPC 实现(Remote procedure call)

上节回顾

主要讲了协程、进程、异步IO多路复用。
协程和IO多路复用都是单线程的。

epoll 在linux下通过这个模块libevent.so实现 gevent 在底层也是用了libevent.so

gevent可以理解为一个更上层的封装。
使用select或者selectors,每接收或发送数据一次都要select一次

twisted异步网络框架,强大又庞大,不支持python3 (代码量python中排top3)。几乎把所有的网络服务都重写了一遍。

一、RabbitMQ 消息队列介绍

RabbitMQ也是消息队列,那RabbitMQ和之前python的Queue有什么区别么?

py 消息队列: 线程 queue(同一进程下线程之间进行交互) 进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)

如果是两个完全独立的python程序,也是不能用上面两个queue进行交互的,或者和其他语言交互有哪些实现方式呢。
【Disk、Socket、其他中间件】这里中间件不仅可以支持两个程序之间交互,可以支持多个程序,可以维护好多个程序的队列。

像这种公共的中间件有好多成熟的产品:
RabbitMQ
ZeroMQ
ActiveMQ
……

RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha
可以维护很多的队列

RabbitMQ 教程官网:

几个概念说明:

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

二、RabbitMQ基本示例. 1、Rabbitmq 安装

ubuntu系统

install rabbitmq-server # 直接搞定

以下centos系统
1)Install Erlang

# For EL5: rpm -Uvh # For EL6: rpm -Uvh # For EL7: rpm -Uvh yum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server on service rabbitmq-server stop/start 2、基本示例

发送端 producer

import pika # 建立一个实例 connection = pika.BlockingConnection( pika.ConnectionParameters('localhost',5672) # 默认端口5672,可不写 ) # 声明一个管道,在管道里发消息 channel = connection.channel() # 在管道里声明queue channel.queue_declare(queue='hello') # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', # queue名字 body='Hello World!') # 消息内容 print(" [x] Sent 'Hello World!'") connection.close() # 队列关闭

接收端 consumer

import pika import time # 建立实例 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # 声明管道 channel = connection.channel() # 为什么又声明了一个‘hello’队列? # 如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次。 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): # 四个参数为标准格式 print(ch, method, properties) # 打印看一下是什么 # 管道内存对象 内容相关信息 后面讲 print(" [x] Received %r" % body) time.sleep(15) ch.basic_ack(delivery_tag = method.delivery_tag) # 告诉生成者,消息处理完成 channel.basic_consume( # 消费消息 callback, # 如果收到消息,就调用callback函数来处理消息 queue='hello', # 你要从那个队列里收消息 # no_ack=True # 写的话,如果接收消息,机器宕机消息就丢了 # 一般不写。宕机则生产者检测到发给其他消费者 ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始消费消息 3、RabbitMQ 消息分发轮询 上面的只是一个生产者、一个消费者,能不能一个生产者多个消费者呢?
可以上面的例子,多启动几个消费者consumer,看一下消息的接收情况。
采用轮询机制;把消息依次分发 假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理?
(可以模拟消费端断了,分别注释和不注释 no_ack=True 看一下)
你没给我回复确认,就代表消息没处理完。

企业建站2800元起,携手武汉肥猫科技,做一个有见地的颜值派!更多优惠请戳:武汉网络推广 http://www.feimao666.com