消息队列

在产品结构上,将同步转为异步

优化oss备份功能,原来备份文件和回复响应是同步的,只有备份完才可以回复响应,导致响应时间变长,为了优化这个问题,可以使用消息队列,再起一个线程,让备份的实际操作让线程做,而原来线程只做发起备份操作,实现了备份与响应的异步。

目前主流的消息队列:kafka(java开发),rocketMQ(java开发),rabbitMQ(Erlang开发)

为了不学习对应的语言,使用非c++环境下的产品可以使用虚拟机,但是性能差(CPU执行翻译多一层,内存占用多),使用容器可以解决这个问题

容器

容器是轻量级的虚拟机,可以解决环境问题,提供隔离性

fedc863a4c2b5511e72778bb84794762.png

docker做了隐藏的工作,让上层的虚拟机认为自己是操作系统的独享者,则上层的一个个虚拟机就称为容器。

docker是如何做到隐藏的?

  1. namespace

    可以让每个进程眼中的文件系统和别的进程不一样,相当于从空间上隔离

  2. cgroup

    限制了每个进程可以使用的cpu资源的数量,相当于从cpu资源上隔离

容器相关的产品

docker是事实上的容器标准—>云原生的基石 DevOps

docker

容器:是一个动态的概念——>打包存储起来称为镜像:是一个静态的概念

容器和镜像的关系类似于,可执行程序和进程的关系

通过镜像可以启动一个容器

镜像

镜像可以从仓库拉取一个镜像,默认是从hub.docker.com拉取

执行docker所有指令要切换到root下执行

1
2
3
4
5
docker images #列出本地所有镜像
docker rmi ID#通过id删除镜像
docker rmi nginx::TAG#通过仓库名:tag删除
docker pull nginx:1.27.0#通过仓库名:标签名下载 下载是分层的 是因为要拉取的镜像有公共的依赖,就不下载了,类似于网盘的秒传功能
#只写仓库名不鞋tag默认tage是latest

从镜像中启动一个容器

1
2
3
docker run hello-world #启动容器 docker + run + 仓库名:标签
docker ps #展示所有运行中的容器
docker ps -a #展示所有的容器

删除容器,必须等容器已经停止才可以删除

1
2
docker rm ID #删除容器
docker rm NAME#通过名字删除容器

以守护进程的方式启动容器(后台容器)

1
docker run -d NAME#以镜像的名字启动容器

停止后台的容器

1
2
docker stop ID/NAME #用id或name停止容器
docker start ID #恢复已经停止的容器

启动容器,深入到容器内部,并且启动shell(命令行解释器)

1
2
3
docke run -it NAME /bin/bash#NAME是镜像的名字
ctrl + p + q#安全离开 不会导致程序终止
docker exec -it ID /bin/bash #回到容器内部

端口映射

外界是无法访问到容器的端口的,所以为了让外界可以访问容器的端口,所以需要端口映射

1
docker run -d -p 8888:80 NAME #-p表示端口映射,8888表示宿主机的端口,80表示容器的端口宿主机的端口不允许相同,容器的端口运行相同

如果把这个容器停止,然后删除,再重新启动一个镜像文件,那么之前容器内的修改不会被保存,

如想要保存可以把之前的容器打包成镜像

1
2
docker commit --help #查看帮助文档
docker commit ID NAME:TAGE #NAME表示仓库名,根据运行中的容器打包一个镜像

还可以持久化

在容器会有文件系统,在宿主机里面也有文件系统,可以把容器文件系统的一颗子树和宿主机的文件系统的一棵子树映射

a7a26f9dedd8d4f9860f3f80def088e0.png

相当于u盘插到了文件系统里

1
docker run d -p 8888:80 -v /root/yingshe/:/user/share/nagin/html/naginx #加-v选项,后面的格式 宿主机映射目录:容器映射目录

启动一个消息队列

1
docker run -d --hostname rabbitsvr --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /data/rabbitmq:/var/lib/rabbitmq rabbitmq:management

端口:5672支持AMQP协议 15672支持HTTP 25672给集群通信用

AMQP协议

AMQP(Advance message Queuing protocol)

bfe1567e0d880ff163763037f3f07959.png

负责存储A的消息,并进行转发,A称为生产者,B称为消费者

好处

  1. 同步转异步,原来跟A说完话才能处理别的事,有消息队列就可以说了交给消息队列,然后就可以处理别的事情了
  2. 排队,削峰,把可以把流量的峰值拉低,如果同时又大量的消息,消息队列可以让大量的消息排队,让消费者慢慢消化
  3. 支持持久化,避免消息丢失,如果消费者挂掉了,那么消息会存储在消息队列里,不会丢失

消息队列当中的组件

389496e94eaaa4fb7bb54d363d927d4f.png

exchange可以使用direct(直连)模式

d4e4f998a4995d24adba061ea32bef3d.png

每个队列上都会绑定一个routing key只有发来的消息的ronting key和队列绑定的routing key匹配 才会把消息交给该队列,如果两个队列都有相同的key,就复制一份给两个都发

消息队列使用

  1. 在rabbitMQ网页上新建交换器

    4fb6ed5e64766bc0af194a6019657325.png

  2. 在rabbitMQ网页上新建队列

    4fb6ed5e64766bc0af194a6019657325.png

  3. 添加绑定

要使用AMQP协议要安装两个库

1
2
3
4
5
6
rabbitmq-c-0.11xxx.tar.gz
SimpleAmqpClien.tar.gz
#安装第二个库 会缺依赖 所以要安装下面这些依赖
sudo apt install libboost-dev
sudo apt install libboost-chrono-dev
sudo apt install libboost-system-dev

代码使用rabbitmq

生产者

包含头文件

1
#include <SimpleAmqpclient/SimpleAmqpClient.h>

使用AMQP Channel

提供了工厂函数来构建对象,返回值是一个智能指针类型的对象,并且参数都提供了默认值

1
statc ptr_t Create(...);

使用

1
2
3
4
5
#include <SimpleAmqpclient/SimpleAmqpClient.h>

int main(){
Amqplient::Channel::ptr_t channel = AmqpClient::channel::Create();
}

构造消息

1
2
AmqpClient::BasicMessage::ptr_t message = AmqpClient::BasicMessage::Create("hello world");

发送消息

1
2
//发送消息
channel->BasicPublish("file_exchange","fileinfo",message);//第一个参数是交换器的名字,第二个参数是routeing key值,第三个是消息

生产者完整代码

1
2
3
4
5
6
7
#include <SimpleAmqpclient/SimpleAmqpClient.h>

int main(){
Amqplient::Channel::ptr_t channel = AmqpClient::channel::Create();
AmqpClient::BasicMessage::ptr_t message = AmqpClient::BasicMessage::Create("hello world");
channel->BasicPublish("file_exchange","fileinfo",message);
}

消费者

消费者的消费模式

83a4a756c648d2d7b2ce3c47f455f38e.png

阻塞的性能更好,因为消费者和生产者很可能不在一台计算机,所以每次检查是否有数据的代价是很高的,所以减少检查数据是否存在的行为,所以阻塞的性能更好

1,前面的步骤跟生产者类似

1
2
3
4
5
#include <SimpleAmqpclient/SimpleAmqpClient.h>

int main(){
Amqplient::Channel::ptr_t channel = AmqpClient::channel::Create();
}

2.调用BasicConsume

表示要开始消费,进入消费状态

1
channel->BasicConsume("myqueue","myqueue");//第一个参数是队列,第二个参数是tag可以对消息进行过滤,一般跟队列一样就可以

3.申请一片内存空间(Envelope)去接收消息

1
AmqpCient::Envelope::ptr_r * envelope;

4.执行真正的消费行为

1
2
3
4
5
6
bool flag = channel->asicConsumeMessage(envelope,3000);
if(flag){
std::cout << "message = "<< envelop->Message()->Body() << "\n";
}else{
cout << "timeout\n";
}

5.完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <SimpleAmqpclient/SimpleAmqpClient.h>

int main(){
Amqplient::Channel::ptr_t channel = AmqpClient::channel::Create();
channel->BasicConsme("myqueue","myqueue");
AmqpCient::Envelope::ptr_r * envelope;
bool flag = channel->asicConsumeMessage(envelope,3000);
if(flag){
std::cout << "message = "<< envelop->Message()->Body() << "\n";
}else{
cout << "timeout\n";
}
}

怎么在项目中接入消息队列

一开始是为了优化开始的思路,即先保存文件–>备份–>回复响应

改为:当服务端保存文件之后,把文件信息生产到消息队列里面,然后回复响应。

消费者做一个死循环,然后接受exchange中的消息,完成备份到oss的任务,实现了异步备份的效果

3597a5c3d22f62688f2552f8bec00b13.png

好处:

  1. 响应变快
  2. 备份和上传分离