RabbitMQ

AMQP

AMQP: Advanced Message Queuing Protocol

Commonly used open-source message brokers (RabbitMQ and ActiveMQ implement AMQP; Kafka uses its own protocol):

https://github.com/rabbitmq

https://github.com/apache/kafka

https://github.com/apache/activemq

MQTT

MQTT: Message Queuing Telemetry Transport, a lightweight publish/subscribe messaging protocol


RabbitMQ

https://github.com/rabbitmq

RabbitMQ is an open-source message broker written in Erlang that implements AMQP and also supports MQTT (via a plugin).

rabbitmq-server is also called the broker server.

The three routing components of RabbitMQ:

  • exchange: the entity that messages are published to; it routes them on to queues
  • binding: links an exchange to a queue and carries the routing information for messages
  • queue: the entity that receives and stores messages

workflow:

producer (publish message) => rabbitmq-server (exchange -> binding -> queue) => consumer

producer: produces messages and publishes them to the rabbitmq-server.

consumer: connects to the rabbitmq-server and subscribes to one or more queues.

connection: both producers and consumers connect to the rabbitmq-server over TCP.

channels: virtual connections multiplexed inside a TCP connection; the actual message traffic flows over channels.

queue: both the producer and the consumer should declare the queue (a queue can only receive messages through an exchange).
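
A minimal sketch of the workflow above using the Python pika client; the broker address, queue name, and message body are assumptions for illustration:

# producer side: publish one message via the default exchange (routed by queue name)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()        # a channel inside the TCP connection
channel.queue_declare(queue='hello')  # declare the queue on the broker
channel.basic_publish(exchange='', routing_key='hello', body='hello world')
connection.close()

# consumer side: subscribe to the same queue and print each message
def callback(ch, method, properties, body):
    print(body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()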

Exchange types (see the sketch after this list):

  • fanout: every queue bound to the exchange receives a copy of the message
  • direct: the message is delivered only to the queue(s) whose binding key exactly matches the message's routing key
  • topic: the message is delivered to every queue whose binding key (which may contain wildcards) matches the routing key
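
A sketch of the three exchange types with pika; the exchange, queue, and routing-key names below are made up for illustration:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# fanout: the routing key is ignored, every bound queue gets a copy
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.queue_declare(queue='log_q1')
channel.queue_bind(queue='log_q1', exchange='logs')
channel.basic_publish(exchange='logs', routing_key='', body='broadcast')

# direct: delivered only to queues bound with an exactly matching routing key
channel.exchange_declare(exchange='jobs', exchange_type='direct')
channel.queue_declare(queue='error_q')
channel.queue_bind(queue='error_q', exchange='jobs', routing_key='error')
channel.basic_publish(exchange='jobs', routing_key='error', body='an error job')

# topic: the binding key may contain wildcards (* = exactly one word, # = zero or more words)
channel.exchange_declare(exchange='events', exchange_type='topic')
channel.queue_declare(queue='kernel_q')
channel.queue_bind(queue='kernel_q', exchange='events', routing_key='kern.*')
channel.basic_publish(exchange='events', routing_key='kern.critical', body='kernel event')

connection.close()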

Message counters (as reported by rabbitmqctl list_queues):

  • messages: the total number of messages in the queue (ready plus unacknowledged)
  • messages_ready: messages waiting to be delivered to consumers
  • messages_unacknowledged: messages that have been delivered to a consumer but not yet acked

virtual hosts: each vhost is essentially an independent rabbitmq server with its own exchanges and queues; the default vhost is / (URL-encoded as %2F).

round-robin dispatch: messages are dispatched to consumers in turn; once a message has been received (and acknowledged) by a consumer, it is removed from the queue.

no-ack (auto-ack): a message is marked as acknowledged and removed from the queue as soon as the consumer receives it, whether or not processing succeeds; if processing fails, the message is lost.

ack (manual ack): the consumer sends the ack only after it has finished processing, so a message is removed from the queue only once it has actually been handled; if the consumer fails, the message is re-dispatched to another consumer.

durable: message persistence; to make sure messages survive a rabbitmq-server crash or a host reboot, both the queue and the messages must be persisted.
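
A sketch that combines manual ack, a prefetch limit, and durable/persistent messages, again with pika; the queue name and the do_work() helper are placeholders:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# a durable queue plus persistent messages survive a broker restart
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='some task',
    properties=pika.BasicProperties(delivery_mode=2),  # 2 = persistent
)

def on_message(ch, method, properties, body):
    do_work(body)                                   # placeholder for the real processing
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after the work is done

channel.basic_qos(prefetch_count=1)  # hand each consumer one unacked message at a time
channel.basic_consume(queue='task_queue', on_message_callback=on_message)  # manual ack (auto_ack defaults to False)
channel.start_consuming()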

Message state information:

  • deliver: a message is delivered to a consumer
  • redeliver: a message is delivered to a consumer again, e.g. after an earlier delivery was not acked
  • publish/subscribe: delivering the same message to multiple consumers is called publish/subscribe
  • ack: messages that have been processed and acknowledged

Installation

$ sudo apt-get install rabbitmq-server

Configuration

Starting with RabbitMQ 3.7 the configuration file uses the sysctl format (rabbitmq.conf), and the old Erlang format is still supported.

Erlang format:

[
    {rabbit,
        [
            {heartbeat, 8000}
        ]
    }
].

sysctl format:

key = value
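
For example, the heartbeat setting from the Erlang example above becomes a single line in rabbitmq.conf:

heartbeat = 8000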

rabbitmqctl commands

$ sudo rabbitmqctl [-n node] [-t timeout] [-q] <commands> [command options]

Add users and grant them roles:

# The default guest/guest account can only be used from localhost.
add_user <username> <password>
delete_user <username>
change_password <username> <newpassword>
clear_password <username>
set_user_tags <username> administrator
list_users

Vhost and permission management:

add_vhost <vhostpath>
delete_vhost <vhostpath>
list_vhosts [<vhostinfoitem> ...]
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
clear_permissions [-p <vhostpath>] <username>
list_permissions [-p <vhostpath>]
list_user_permissions <username>

list_queues [-p <vhostpath>] [<queueinfoitem> ...]
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
list_connections [<connectioninfoitem> ...]
list_channels [<channelinfoitem> ...]
list_consumers [-p <vhostpath>]
status
environment
report
eval <expr>
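
For example, a typical sequence that creates an administrator account with full permissions on the default vhost and then checks queue depths (the user name and password are placeholders):

$ sudo rabbitmqctl add_user admin secret
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
$ sudo rabbitmqctl list_queues -p / name messages messages_ready messages_unacknowledged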

rabbitmq-plugins plugin management

Enable the web GUI (management plugin):

$ rabbitmq-plugins enable rabbitmq_management
# http://localhost:15672 guest/guest

HAProxy

RabbitMQ cluster deployment:

  1. Install rabbitmq-server on every node.

  2. Edit /etc/hosts on every node so that each node's IP address and hostname can be resolved.

  3. Synchronize the Erlang cookie (/var/lib/rabbitmq/.erlang.cookie) across all nodes.

  4. Start rabbitmq-server on every node.

     # rabbitmq-server -detached
    
  5. Join every slave node to the manager node to form the cluster (run on each slave).

     # rabbitmqctl stop_app
     # rabbitmqctl reset
     # rabbitmqctl join_cluster rabbit@manager
     # rabbitmqctl start_app
    
  6. Check the cluster status

     # rabbitmqctl cluster_status
    
  7. Set the HA (mirroring) policy

     # rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    

HAProxy server deployment:

  1. Install haproxy on the HAProxy server.

  2. Configure haproxy:

     # sudo vim /etc/haproxy/haproxy.cfg
     global
         log /dev/log    local0
         log /dev/log    local1 notice
         chroot /var/lib/haproxy
         user haproxy
         group haproxy
         daemon
     defaults
         log     global
         mode    tcp
         maxconn 10000
         timeout connect 3000
         timeout client 1000s
         timeout server 1000s
     frontend rabbitmq_front
         bind <haproxy-ip>:5672
         default_backend rabbitmq_backend
     backend rabbitmq_backend
         balance roundrobin
         server rabbitmq-master <master-ip>:5672 check
         server rabbitmq-slave <slave-ip>:5672 check
     listen rabbitmq_management
         bind 0.0.0.0:15672
         balance roundrobin
         server <master-hostname> <master-ip>:15672 check
         server <slave-hostname> <slave-ip>:15672 check
    
  3. Restart the haproxy service.


Celery

https://github.com/celery

Available settings; with Django each setting name must be prefixed with the configured namespace (CELERY_ below).

https://docs.celeryproject.org/en/stable/userguide/configuration.html#new-lowercase-settings

Celery configuration in the Django settings:

from kombu import Queue, Exchange

CELERY_BROKER_URL = "amqp://user:pw@amqp:5672"
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_EVENT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_TASK_ACKS_LATE = False

CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_QUEUES = (
    Queue('queue1', Exchange('exchange1', type='direct'), routing_key='default', queue_arguments={'x-max-priority': 100}),
    Queue('queue2', Exchange('exchange2', type='direct'), routing_key='default', queue_arguments={'x-max-priority': 100}),
)
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_TASK_DEFAULT_DELIVERY_MODE = 'persistent'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_ROUTES = (
    {
        "proj.app1.tasks.task1": {
            "queue": "queue1",
            "routing_key": "default"
        }
    },
    {
        "proj.app2.tasks.task2": {
            "queue": "queue2",
            "routing_key": "default"
        }
    },
)

Create the Celery app in the project:

import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'site_main.settings')

app = Celery('proj')

# namespace='CELERY' makes celery look up settings prefixed with CELERY_ in the Django settings.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()
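
A minimal sketch of a task module matching the routes configured above; the path proj/app1/tasks.py, the task body, and its arguments are assumptions:

# proj/app1/tasks.py
from celery import shared_task

@shared_task
def task1(x, y):
    # CELERY_TASK_ROUTES above sends this task to queue1 with routing_key 'default'
    return x + y

The task can then be dispatched with task1.delay(1, 2), which uses the configured route, or with task1.apply_async(args=(1, 2), queue='queue1', priority=5); the priority argument only has an effect because the queues above declare x-max-priority.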

celery options:

celery [options] command [args]

-A, --app <app>
-b, --broker <broker>
--result-backend <result_backend>
--loader <loader>
--config <config>
--workdir <workdir>
-C, --no-color
-q, --quiet
--version

worker options:

-n, --hostname
-D, --detach
-l, --loglevel
-O default|fair
# The default concurrency is the number of CPUs; in a container this is the host's CPU count, not the container's CPU resource limit.
-c, --concurrency
# prefork is best for CPU-bound tasks; eventlet/gevent are best for IO-bound tasks.
-P, --pool prefork(default)|eventlet|gevent|solo
-E, --task-events, --events
-Q, --queues
-B, --beat
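
For example, starting a worker for the app above that consumes both queues (the concurrency value and queue names are illustrative):

$ celery -A proj worker -l info -Q queue1,queue2 -c 4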

beat options:

--detach
-s, --schedule
-S, --scheduler
-l, --loglevel
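
A sketch of a periodic schedule for beat, added to the app module shown earlier; the entry name, crontab, and arguments are assumptions:

from celery.schedules import crontab

# run task1 every day at 07:30 (CELERY_TIMEZONE above is UTC)
app.conf.beat_schedule = {
    'run-task1-every-morning': {
        'task': 'proj.app1.tasks.task1',
        'schedule': crontab(hour=7, minute=30),
        'args': (1, 2),
    },
}

With the Django namespace above, the same schedule can also be declared in the settings module as CELERY_BEAT_SCHEDULE.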

inspect options:

-t, --timeout
-d, --destination
-j, --json