AMQP
AMQP: Advanced Message Queuing Protocol
Commonly used open-source messaging frameworks (note: Kafka speaks its own wire protocol rather than AMQP; ActiveMQ supports AMQP):
https://github.com/apache/kafka
https://github.com/apache/activemq
MQTT
MQTT: Message Queuing Telemetry Transport
RabbitMQ
RabbitMQ is an open-source message broker written in Erlang that supports AMQP and MQTT.
rabbitmq-server is also known as the broker server.
RabbitMQ's three components:
- exchange: the entity producers publish messages to; it routes them onward
- binding: links an exchange to a queue and carries the message routing information
- queue: the entity that receives and stores messages
workflow:
producer (publish message) =>
rabbitmq-server: exchange -> binding -> queue =>
consumer
producer: creates messages and publishes them to the rabbitmq-server.
consumer: connects to the rabbitmq-server and subscribes to one or more queues.
connection: producers and consumers both connect to the rabbitmq-server over TCP.
channels: virtual connections multiplexed inside a TCP connection; the actual data flows over channels.
queue: both producers and consumers should declare the queue (a queue can only receive messages through an exchange).
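A minimal sketch of this flow with the pika Python client (host, queue name, and message body are illustrative assumptions):
import pika

# One TCP connection to the broker; channels are virtual connections inside it.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declaring the queue is idempotent, so producer and consumer can both do it.
channel.queue_declare(queue='hello')

# Publishing through the default ('') exchange routes by queue name.
channel.basic_publish(exchange='', routing_key='hello', body=b'hello world')
connection.close()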
Exchange types:
- fanout: every queue bound to this exchange receives the message
- direct: only the queue whose binding key exactly matches the message's routing key receives it
- topic: the binding key is a pattern; every queue bound with a pattern that matches the routing key receives the message
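A sketch of declaring each exchange type with pika (exchange, queue, and key names are illustrative):
import pika

channel = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')).channel()

# fanout: the routing key is ignored; every bound queue gets a copy.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# direct: only queues bound with an exactly matching key receive the message.
channel.exchange_declare(exchange='tasks', exchange_type='direct')
channel.queue_declare(queue='email')
channel.queue_bind(queue='email', exchange='tasks', routing_key='email')

# topic: the binding key is a pattern; '*' matches one word, '#' matches zero or more.
channel.exchange_declare(exchange='events', exchange_type='topic')
channel.queue_declare(queue='errors')
channel.queue_bind(queue='errors', exchange='events', routing_key='*.error.#')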
Message counters:
- messages: total number of messages in the queue (ready plus unacknowledged).
- messages_ready: messages waiting to be delivered to consumers.
- messages_unacknowledged: messages delivered to a consumer but not yet acked.
virtual hosts: essentially a logical RabbitMQ server of its own, with independent exchanges and queues; the default vhost is / (URL-encoded as %2F).
round-robin dispatch: messages are dispatched to consumers in turn; once a message is correctly received and acknowledged it is removed from the queue.
no-ack: each message is marked acked and removed from the queue as soon as the consumer receives it, whether or not processing finishes; if processing fails, the data is lost.
ack: the consumer sends the ack only after processing completes, so the message is deleted from the queue only once it has been handled; on failure it is dispatched to another consumer.
durable: message persistence; to keep data across a rabbitmq-server crash or host reboot, queues and messages must be persisted (see the sketch below).
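A sketch of manual acks plus durability with pika (queue name and handler are assumptions):
import pika

channel = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')).channel()

# durable=True keeps the queue across broker restarts; the messages must also
# be published as persistent (delivery_mode=2) to survive with it.
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='', routing_key='task_queue', body=b'work item',
                      properties=pika.BasicProperties(delivery_mode=2))

def callback(ch, method, properties, body):
    print('processing', body)
    # Ack only after the work is done; if the consumer dies first,
    # the broker redispatches the message to another consumer.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# auto_ack=False is the 'ack' mode described above; auto_ack=True is 'no-ack'.
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()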
Message state information:
- deliver: the message was delivered to a consumer.
- redeliver: the message was delivered to a consumer again.
- publish/subscribe: delivering the same message to multiple consumers.
- ack: messages that have been fully processed and acknowledged.
Installation
$ sudo apt-get install rabbitmq-server
Configuration
Since RabbitMQ 3.7 the config file uses the sysctl format; the old Erlang format is still supported.
Erlang format:
[
  {rabbit,
    [
      {heartbeat, 8000}
    ]
  }
].
sysctl format:
key = value
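For example, a minimal /etc/rabbitmq/rabbitmq.conf in sysctl format (values are illustrative; the heartbeat key mirrors the Erlang example above):
# /etc/rabbitmq/rabbitmq.conf
heartbeat = 8000
listeners.tcp.default = 5672
log.file.level = info
vm_memory_high_watermark.relative = 0.4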
rabbitmqctl commands
$ sudo rabbitmqctl [-n node] [-t timeout] [-q] <command> [command options]
Add users and grant privileges:
# The default guest/guest account can only connect from localhost.
add_user <username> <password>
delete_user <username>
change_password <username> <newpassword>
clear_password <username>
set_user_tags <username> <tag> ...   # e.g. administrator
list_users
Permission management:
add_vhost <vhostpath>
delete_vhost <vhostpath>
list_vhosts [<vhostinfoitem> ...]
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
clear_permissions [-p <vhostpath>] <username>
list_permissions [-p <vhostpath>]
list_user_permissions <username>
list_queues [-p <vhostpath>] [<queueinfoitem> ...]
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
list_connections [<connectioninfoitem> ...]
list_channels [<channelinfoitem> ...]
list_consumers [-p <vhostpath>]
status
environment
report
eval <expr>
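A typical sequence to create a vhost and a user (vhost, username, and password are hypothetical):
$ sudo rabbitmqctl add_vhost /myapp
$ sudo rabbitmqctl add_user alice s3cret
$ sudo rabbitmqctl set_user_tags alice administrator
$ sudo rabbitmqctl set_permissions -p /myapp alice ".*" ".*" ".*"
$ sudo rabbitmqctl list_permissions -p /myapp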
rabbitmq-plugins plugin management
Enable the web GUI:
$ rabbitmq-plugins enable rabbitmq_management
# web UI: http://localhost:15672 (guest/guest)
HAProxy
RabbitMQ cluster deployment:
- Install rabbitmq-server on every node.
- Edit /etc/hosts on every node so IPs and hostnames resolve.
- Synchronize the Erlang cookie (/var/lib/rabbitmq/.erlang.cookie) across all nodes.
- Start rabbitmq-server on every node.
  # rabbitmq-server -detached
- Join each slave node to the manager node to form the cluster.
  # rabbitmqctl stop_app
  # rabbitmqctl reset
  # rabbitmqctl join_cluster rabbit@manager
  # rabbitmqctl start_app
- Check the cluster status.
  # rabbitmqctl cluster_status
- Set the mirroring policy.
  # rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
HAProxy server deployment:
- Install haproxy on the haproxy server.
- Configure haproxy:
  # sudo vim /etc/haproxy/haproxy.cfg
  global
      log /dev/log local0
      log /dev/log local1 notice
      chroot /var/lib/haproxy
      user haproxy
      group haproxy
      daemon

  defaults
      log global
      mode tcp
      maxconn 10000
      timeout connect 3000
      timeout client 1000s
      timeout server 1000s

  frontend rabbitmq_front
      bind <haproxy-ip>:5672
      default_backend rabbitmq_backend

  backend rabbitmq_backend
      balance roundrobin
      server rabbitmq-master <master-ip>:5672 check
      server rabbitmq-slave <slave-ip>:5672 check

  listen rabbitmq_mgmt
      bind 0.0.0.0:15672
      balance roundrobin
      server <master-hostname> <master-ip>:15672 check
      server <slave-hostname> <slave-ip>:15672 check
- Restart the haproxy service.
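A quick check that AMQP traffic reaches the cluster through the proxy, sketched with pika (host and credentials are placeholders):
import pika

params = pika.ConnectionParameters(
    host='<haproxy-ip>', port=5672,
    credentials=pika.PlainCredentials('user', 'password'))
connection = pika.BlockingConnection(params)
print('connected:', connection.is_open)
connection.close()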
Celery
Available settings are listed below; in Django each setting must carry the configured namespace as a prefix.
https://docs.celeryproject.org/en/stable/userguide/configuration.html#new-lowercase-settings
Celery configuration in Django
CELERY_BROKER_URL = "amqp://user:pw@amqp:5672"
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_EVENT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_TASK_ACKS_LATE = False
CELERY_TASK_SERIALIZER = 'json'
from kombu import Exchange, Queue  # Queue/Exchange are kombu classes

CELERY_TASK_QUEUES = (
    Queue('queue1', Exchange('exchange1', type='direct'), routing_key='default', queue_arguments={'x-max-priority': 100}),
    Queue('queue2', Exchange('exchange2', type='direct'), routing_key='default', queue_arguments={'x-max-priority': 100}),
)
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_TASK_DEFAULT_DELIVERY_MODE = 'persistent'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_ROUTES = (
    {
        "proj.app1.tasks.task1": {
            "queue": "queue1",
            "routing_key": "default",
        }
    },
    {
        "proj.app2.tasks.task2": {
            "queue": "queue2",
            "routing_key": "default",
        }
    },
)
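With the routes above, a task defined in proj/app1/tasks.py lands on queue1; a minimal sketch (the task body is an assumption):
# proj/app1/tasks.py
from celery import shared_task

@shared_task
def task1(x, y):
    # Matched by the "proj.app1.tasks.task1" route above, so it is published
    # to exchange1 with routing_key 'default' and ends up in queue1.
    return x + y
Calling task1.delay(2, 3) then publishes to the broker instead of running inline.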
Create the Celery app in the project (conventionally proj/celery.py):
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'site_main.settings')
app = Celery('proj')
# The namespace makes Celery read settings prefixed with CELERY_ from the Django settings.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
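The usual companion snippet (per the Celery docs) loads the app whenever Django starts, in the project's __init__.py:
# proj/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)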
celery options:
celery [options] command [args]
-A, --app <app>
-b, --broker <broker>
--result-backend <result_backend>
--loader <loader>
--config <config>
--workdir <workdir>
-C, --no-color
-q, --quiet
--version
worker options:
-n, --hostname
-D, --detach
-l, --loglevel
-O default|fair
# The default concurrency is the number of CPUs; in a container this is the host's CPU count, not the CPU count from the resource limit.
-c, --concurrency
# prefork suits CPU-bound tasks best; eventlet/gevent suit IO-bound tasks.
-P, --pool prefork(default)|eventlet|gevent|solo
-E, --task-events, --events
-Q, --queues
-B, --beat
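For example, starting a worker that consumes both queues from the config above (node name and concurrency are illustrative):
$ celery -A proj worker -l info -Q queue1,queue2 -c 4 -P prefork -n worker1@%h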
beat options:
--detach
-s, --schedule
-S, --scheduler
-l, --loglevel
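For example (the schedule file path is illustrative):
$ celery -A proj beat -l info -s /var/run/celery/celerybeat-schedule --detach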
inspect options:
-t, --timeout
-d, --destination
-j, --json
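For example, listing the active tasks on one worker as JSON (the node name is illustrative):
$ celery -A proj inspect active -d worker1@myhost -t 10 -j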