`
student_lp
  • 浏览: 428648 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

一、介绍

1、异步消息

    异步消息是一个非常普通并且广泛使用的技术,例如Skype。这些服务都有如下特征:

  • 他们会在传输消息的时候或多或少加入一些随意的内容和一些比较正式的路由信息;
  • 他们都是异步的,也是就是说他们将生产者和消费者区分开来,因此可能将消息加入队列(例如某人发送给你一条消息,但是你不在线或者你的邮箱会受到一封Email)。
  • 生产者和消费者是具有不同知识的不同角色。

2、AMQP

    AMQP是一个异步消息传递所使用的应用层协议规范,是一个抽象的协议。AMQP当中有四个概念非常重要:虚拟主机(virtual host)、交换机(exchange)、队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。

    队列(queue)是你的消息的终点,可以理解成消息的容器。消息就一直在里面,直到有客户端连接到这个队列并且将其取走为止。

    交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键的属性,就是一个简单的字符串。交换机当中有一系列的绑定,即路由规则。每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机用5个核,另外3个核留下来做消息处理。

    一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。交换机的类型:

  • Fanout Exchange--不处理路由键。你只需要简单的将队列绑定到交换机上。 一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得一份复制的消息。Fanout交换机转发消息是最快的。
  • Direct Exchange--处理路由器。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog",则只有被标记为dog的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
  • Topic Exchange--将路由键和某模式进行匹配。此时队列需要绑定一个模式上。符号#匹配一个或多个词,符号*匹配不多不少一个词。因此audit.#能够匹配到audit.irs.corporate,但是audiit.*只能匹配到audit.irs。

    RabbitMQ是一个Erlang编写的AMQP服务器。他的核心原理非常简单:接收和发送消息。你可以把它想象成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,他处理的不是纸,而是接收、存储和发送二进制的数据--消息。


 3、RabbitMQ特点

    支持持久化:如果RabbitMQ死掉了,消息并不会丢失,当队列重启,一切都会恢复。

    RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,大多数用户都会选择持久化。消息队列持久化包括3个部分:

  • exchange持久化,在声明时指定durable=>1
  • queue持久化,在声明时指定durable=>1
  • 消息持久化,在投递时指定delivery_mode=>2(1是非持久化)

    如果exchange和queue都是持久化的,那么他们之间的binding也是持久化的。

    如果exchange和queue两者间有一个持久化,一个非持久化,就不允许建立绑定。

    RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放到磁盘。不过,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。

    良好的设计架构可以如下:在一个集群里,有3台以上机器,其中1台使用磁盘模式,其他使用内存模式。其他几台为内存模式的节点,无疑速度更快,因此客户端连接访问它们。而磁盘模式的 节点,由于磁盘IO相对较慢,因此仅作为数据备份使用。

二、原理

    一个用作发送消息,另一个接受消息并打印消息内容


其中:p为生产者;hello表示队列名称;c为消费者;首先要做的事情就是建立一个到RabbitMQ服务器的连接,在发送消息之前我们要确认队列是存在的,如果我们把消息发送到一个不存在的队列,RabbitMQ会丢弃这条消息。

    在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

三、RabbitMQ安装

    RabbitMQ使用的是AMQP协议。要使用它就必须需要一个使用同样协议的库。几乎所有的编程语言都有可选择的库。python也一样,可以从以下几个库中选择:py-amqplib、txAMQP、pika。

    下载RabbitMQ:http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.1/rabbitmq-server-3.2.1-1. noarch.rpm

     安装:rpm -ivh rabbitmq-server-3.2.1-1.noarch.rpm

四、结构图


Exchange:消息交换机,它指定消息按什么规则,路由到那个队列。

Queue:消息队列载体,每个消息都会被插入到一个或多个队列。

Channel:消息通道,在客户端的每个链接里,可建立多个Channel,每个Channel代表一个会话任务。

routing key:路由关键字,Exchange根据这个关键字进行消息投递。

消息队列的使用过程大概如下:

  • 客户端连接到消息队列服务器,打开一个Channel;
  • 客户端声明一个Exchange,并设置相关属性;
  • 客户端声明一个queue,并设置相关属性;
  • 客户端使用routing key,在Exchange和queue之间建立绑定关系。
  • 客户端投递消息到Exchange。

五、具体应用

1、服务器段设置

创建用户myuser和密码mypassword:
$ rabbitmqctl add_user myuser mypassword
创建虚拟主机名myvhost:
$ rabbitmqctl add_vhost myvhost
设置权限:
$ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
启动:
./rabbitmq-server
停止:
./rabbitmqctl stop

 2、客户端部分代码

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()

    AMQP支持在一个TCP连接上启用多个MQ通讯Channel,每个channel都可以被应用作为通讯流。每个AMQP程序都至少有一个连接和一个channel。

    每个channel都被分配一个整数标示,自动由connection()类的.channel()方法维护。或者你可以使用.channel(x)来指定channel标示,其中x是你想要使用的channel标示。通常情况下,推荐使用.channel()方法来自从分配channel标示,以便防止冲突。

    前面已经有了一个可用的连接和channel。现在代码将分成两个应用,生产者和消费者。先创建一个消费者程序,包含一个叫做po_box的队列和一个叫sorting_room的交换机。

chan.queue_declare(queue="po_box", durable=True,exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,auto_delete=False,)

   首先,创建了一个名叫po_box的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=false)。在常见durable的队列的时候,将auto_delete设置为FALSE是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成TRUE,只有尚有消费者活动的队列可以在rabbitMQ意外崩溃的时候自动恢复。

    另外一个标志exclusive。如果设置为TRUE,只有创建这个队列的消费者程序才允许连接该队列。这种队列对于这个消费者程序是私有的。

    还有另外一个交换机声明,创建了一个名字叫”sorting_room“的交换机。auto_delete和durable的含义和队列是一样的。但是.excange_declare()还有另外一个参数type,用来指定要创建的交换机的类型:fanout、direct和topic。

    到此为止,已经有了一个可以接受消息的队列和一个可以发送消息的交换机。不过需要创建一个绑定,把他们连接起来:

chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,
routing_key=”jason”)

    这个绑定的过程非常直接。任何送到交换机“sorting_room”的具有路由键“jason” 的消息都被路由到名为“po_box” 的队列。

    现在有两种方法从队列当中取出消息。第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):

msg = chan.basic_get("po_box")
print msg.body
chan.basic_ack(msg.delivery_tag)

    但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get(),你需要用chan.basic_consume()注册一个新消息到达的回调:

def recv_callback(msg):
    print 'Received: ' + msg.body
chan.basic_consume(queue='po_box', no_ack=True,
callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

    chan.wait() 放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel() 用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume() 注册的一致。在这个例子当中chan.basic_cancel() 不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。

   需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。

   下面的代码示例表明如何将一个简单消息发送到交换区sorting_room,并且标记为路由键jason

msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")

  你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。

 

   剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:

chan.close()
conn.close()

   结果:

生产者:
python amqp_publisher.py thank
python amqp_publisher.py think1
消费者:
Received:thank from channel #1
Received:thank1 from channel #1

六、队列模式

1、一个队列对应一个消费者:


 2、一个队列对应多个消费者


3、一个交换队列对应两个队列,注意要先建立交换机和队列的绑定,才可以发送消息:


4、一个交换机和一个队列进行绑定,交换机类型为direct


5、一个交换机和一个队列进行多个绑定,交换机类型为topic


6、远程程序调用


 

 

  • 大小: 50.6 KB
  • 大小: 9.4 KB
  • 大小: 234.3 KB
  • 大小: 9.2 KB
  • 大小: 24.2 KB
  • 大小: 21.5 KB
  • 大小: 34.7 KB
  • 大小: 29.9 KB
  • 大小: 37.5 KB
分享到:
评论

相关推荐

    RabbitMQ详解搭建

    RabbitMQ详解搭建

    Java思维导图xmind文件+导出图片

    详解RabbitMQ消息分发机制及主题消息分发 RabbitMQ消息路由机制分析 RabbitMQ消息确认机制 Redis redis数据结构分析 Redis主从复制原理及无磁盘复制分析 Redis管道模式详解 Redis缓存与数据库一致性问题解决...

    springBoor集成RabbitMQ详解

    springboot集成rabbitMq,此篇文档,不足之处请大家指正

    RabbitMQ 知识全面详解(值得珍藏)

    RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它使用Erlang语言编写,并且可以与多种编程语言进行交互,包括Java、.NET、C、Python等。RabbitMQ的核心概念包括生产者、消费者、队列和消息。...

    RabbitMQ技术详解

    本文来自于网络,本文主要介绍了RabbitMQ是什么,RabbitMQ为何会出现,RabbitMQ基础概念,RabbitMQ集群等。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java...

    RabbitMQ安装配置步骤详解.doc

    RabbitMQ安装配置步骤详解.doc

    20.消息中间件之RabbitMQ入门讲解

    简单介绍了RabbitMQ的内容,怎么使用控制界面,以及编写消息生产者和消息消费者

    RabbitMQ详解(超详细整理,值得珍藏)

    RabbitMQ详解(超详细整理,值得珍藏) 内容简介: 1、基本概念 2、系统架构 3、消费原理 4、高级特性 5、特性分析

    rabbitmq完整例子

    rabbitmq完整例子,包含客户端、服务端例子。里面集中了各种案例详解

    RabbitMQ 学习整理.pdf

    RabbitMQ 学习整理 包括RabbitMQ 配置详解,交换机有四种类型,分别为Direct,topic,headers,Fanout,消息确认机制,多线程处理消息,消费端的限流策略,回调等。都有代码,保证代码正确性都是自己的的从工程中直接...

    RabbitMQ消息中间件视频教程

    RabbitMQ消息中间件视频无加密教程,包括消息中间件的多个企业级应用场景案例详解

    RabbitMQ入门小Dome ------> RabbitMQDome.zip

    最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...

    RabbitMQ延迟队列及消息延迟推送实现详解

    主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    JAVA获取rabbitmq消息总数过程详解

    主要介绍了JAVA获取rabbitmq消息总数过程详解,公司使用的是rabbitMQ,需要做监控预警的job去监控rabbitMQ里面的堆积消息个数,如何使用rabbitMQ获取监控的队列里面的队列消息个数呢,需要的朋友可以参考下

    SpringMVCHibernate集成RabbitMQ

    SpringMVC和RabbitMQ集成使用详解,是一个完整的intellij idea项目(包含创建数据库脚本,jar等等),用IDEA打开就可以使用。 大致步骤如下: 1.需要引入的jar 2.集成rabbitmq的配置文件 3.服务端代码 ...

    详解spring boot集成RabbitMQ

    RabbitMQ作为AMQP的代表性产品,在项目中大量使用。结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题。

    详解Python操作RabbitMQ服务器消息队列的远程结果返回

    RabbitMQ是一款基于MQ的服务器,Python可以通过Pika库来进行程序操控,这里我们将来详解Python操作RabbitMQ服务器消息队列的远程结果返回:

    docker中安装rabbitmq(阿里云服务器)

    完成在docker中安装rabbitmq,简单易上手,命令行解释,每个命令的含义等详解。 解决了docker内部安装完成,外部浏览器无法访问的问题。

    Spring RabbitMQ死信机制原理实例详解

    主要介绍了Spring RabbitMQ死信机制原理实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    rabbitmq-prometheus:核心RabbitMQ指标的简约Prometheus导出器

    rabbitmq-prometheus:核心RabbitMQ指标的简约Prometheus导出器

Global site tag (gtag.js) - Google Analytics