rabbitmq小记

前言

​ 前段时间使用RabbitMQ做了一个小需求,也是一个很奇葩的需求,批量刷新一个表的外键ID,本来这个需求可以直接代码一步到位的,但是组里大佬多想了一些,考虑到了数据一致性,在事物执行的同时,避免同步方案将其他的订单商品同时同步过来,从而导致有部分订单和商品的外键没有修改成药店的主键,从而考虑使用RabbitMQ来异步的重新开一个事物去修改订单和商品的外键。

​ 所以,在这里使用RabbitMQ并不是为了解耦,也不是为了削峰,而是为了我也不知道为什么,反正就是另开了一个事物

开始

​ 这是我第一次接触消息中间件,之前间接的使用过消息中间件,在使用DataX数据同步工具的时候,算是间接的使用了,DataX会将需要取的数据放到RabbitMQ里面,然后在从RabbitMQ里面把数据拉下来。去操作这些数据,不管你是从MySql拉下来,放到oracle,还是从别人服务器的MySql拉下来放到我们自己的MySql都行,这就起到了一个解耦的作用。

​ 这次算是正式的接触了RabbitMQ,但是由于当时写需求的时候有时间限制,囫囵吞枣的把代码写完了,并没有过多的了解,加上公司架构组还针对RabbitMQ上面封装了一层,因此在不使用项目框架的时候,要让程序正常运行,需要做的改动还是挺多的,所以依旧老规矩,重写一个demo,让他正常跑起来

介绍

​ 其实不会做什么介绍啊,理论的东西,不是很适合大龄程序员的,要什么理论,拿起键盘就干,但是这样容易吃亏的,所以这里还是学着说一说

​ RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queuing Protocol)的开源实现,MQ的全程是Message Queue,翻译过来就是消息队列的意思

​ RabbitMQ是有三个端,生产者,消费者,和RabbitMQ本身,生产者负责生产数据到服务端,也就是到RabbitMQ上去,消费端就是RabbitMQ会推送数据到消费端,其中会有几个推的模式,后面慢慢道来

几个重要概念

Exchange:消息交换机,他指定每个消息按什么规则投递到哪个队列里面

Queue:队列,也就是消息的载体,每个消息都会投到一个或多个队列中

Binding:绑定,他是将Exchange和Queue按照路由的规则绑定起来

Routing Key:路由关键字,exchange根据这个关键字进行消息投递

Producer:消息生产者,就是上文说到的负责生产数据到服务端的

Consumer:消息消费者,就是RabbitMQ会推送到订阅了该列队的消费端

Channel:消息通道,在客户端的每个链接里面可以建立多个Channel,但是本文这没有直接用到。

几个交换机

direct交换器

这个是默认交换器,下文的例子就是使用的默认交换器

他是根据路由建去匹配的,消息会投递到绑定了相应的路由的队列上,我在网上盗了几张图,帮助自己理解

Topic交换器

这个交换器是根据路由键的某种通配符去匹配发送,可以使用 和 # 去匹配 表示分段 #表示0或多个字符

比如路由键的名字是 com.routingkey.test 如果我要去匹配,就可以去使用#.test 或者com.routingkey.*

这样做可以灵活的订阅自己的想要的信息,也可以按照这个规则去发布订阅这个消息

fanout交换器

这个就是发布订阅模式的交换器,当发送成功一条消息时,会广播到所有绑定了这个交换器的队列上。

这里有一个原来在ERP做过的场景,但是当时没有用消息队列来处理,简单说一下,当用户下单之后,系统会生成采购订单,然后会有一系列的订单流转,但是当在生成采购订单的时候,不紧紧只是生成了一些采购出库单,而是会生成其他平台的采购入库单,或者销售出库单(不同场景下的出库,所生成的订单有区别)如果按照这个交换器的模式去做的话,生成一条采购订单之后,发送一条消息,需要生成采购出库,订阅此消息即可,如果当时会生成其他更多的单据,只用在订阅该消息,这样就有了解耦,添加新需求的成本大大降低

代码

废话不多说了,看代码吧,很早之前写的一个,由于工作原因,这篇文章搁置下来了,现在继续补充完成

配置账号密码,这个简单就不用说了,是写在配置文件里面,只用读一下就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* @author CongZ
* @classname MqConfig
* @create 2019/6/4
**/
@ConfigurationProperties(prefix = "demo.mq.changemerchantid.event.rabbitmq")
public class MqConfig {
private String host;
private int port;
private String username;
private String password;

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}
}
发送方配置

主要就是连接配置,发送的模版配置,再就是创建队列了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* @author CongZ
* @classname 发送方的配置
* @create 2019/6/4
**/
@Configuration
@EnableConfigurationProperties(MqConfig.class)
public class TestAutoConfiguration {

private final MqConfig mqConfig;

/**
* 简单配置,这里直接定义字符串
**/
private String ExchangeName="changeName-test";
private String RoutingKey="RoutingKey-test";
private String QueueName="QueueName-test-2019-06-04";

@Autowired
public TestAutoConfiguration(MqConfig mqConfig) {
this.mqConfig = mqConfig;
}

/***
* 连接配置
* @return
*/
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(mqConfig.getHost());
connectionFactory.setPort(mqConfig.getPort());
connectionFactory.setUsername(mqConfig.getUsername());
connectionFactory.setPassword(mqConfig.getPassword());
return connectionFactory;
}

/***
* 命令发送模版
* @return
*/
public RabbitTemplate TestRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
checkAndInitQueue(connectionFactory());
//设置JSON转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//设置交换器
rabbitTemplate.setExchange(ExchangeName);
//设置路由key
rabbitTemplate.setRoutingKey(RoutingKey);
//设置队列名
rabbitTemplate.setQueue(QueueName);
return rabbitTemplate;
}

@Bean(name = "testService")
public TestService TestCommandService() {
TestService commandService = new TestService(TestRabbitTemplate());
return commandService;
}

/***
* 初始化队列
* @param connectionFactory
* @return
*/
public boolean checkAndInitQueue(ConnectionFactory connectionFactory){
RabbitAdmin admin = new RabbitAdmin(connectionFactory);

/**
* 创建交换器
* 这里有四种交换器的类型
* DirectExchange 默认交换器 由路由去匹配
* TopicExchange 根据路由key和某模式去匹配,广播的升级版
* FanoutExchange 广播,即发送消息到Exchange,所有的Queue都能接收到消息,然后由消费者消费
* HeadersExchange 不去会匹配路由key和绑定的key,会去匹配消息里面的消息头去匹配q
*/
admin.declareExchange(new DirectExchange(ExchangeName));
Map<String, Object> args = new HashMap<>();
/**
* 定义队列
* 参数1:队列名
* 参数2:是否持久化 因为队列是放在内存里面,
* 当rabbitmq重启会丢失,设置为True,消息会保存到Erlang自带的Mnesia数据库中
* rabbitmq重启就回读取数据库
* 参数3:是否独占
* 如果独占,则会给当前队列加锁,其他消费者是不能访问,一般设置为false
* 如果为true,则一个队列只能有一个消费者消费
* 参数4:是否自动删除,当最后一个消费者断开之后
* 队列是否会被自动删除 在RabbitMq Management中consumers=0时队列就会自动删除
* 参数5:其他参数
*/
admin.declareQueue(new Queue(QueueName, true, false, false, args));

/**
* 绑定队列
*/
admin.declareBinding(new Binding(QueueName, Binding.DestinationType.QUEUE, ExchangeName, RoutingKey, null));
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author CongZ
* @classname 发送的类,这里我们一般在写项目的时候会单独抽出来
* @create 2019/6/5
**/
public class TestService {

private final RabbitTemplate template;

public TestService(RabbitTemplate template){
this.template = template;
}

public boolean send(String str) {
template.convertAndSend(str);
return true;
}
}
消费方配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* @author CongZ
* @classname 消费端配置
* @create 2019/6/5
**/

@Configuration
@EnableConfigurationProperties(MqConfig.class)
public class TestContainer {

private final MqConfig mqConfig;

@Autowired
public TestContainer(MqConfig mqConfig) {
this.mqConfig = mqConfig;
}

public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(mqConfig.getHost());
connectionFactory.setPort(mqConfig.getPort());
connectionFactory.setUsername(mqConfig.getUsername());
connectionFactory.setPassword(mqConfig.getPassword());
return connectionFactory;
}

@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置连接
factory.setConnectionFactory(connectionFactory());
//设置消息序列化类型
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。
factory.setConcurrentConsumers(1);
//设置最大的
factory.setMaxConcurrentConsumers(1);
//设置每次请求发送给每个Consumer的消息数量。
factory.setPrefetchCount(1);
//设置事务当中可以处理的消息数量。
factory.setTxSize(1);

//factory.setErrorHandler(); //设置处理失败的处理类
/***
* 设置消费端的应答模式
*
*/
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
消费的类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @author CongZ
* @classname TestHandler
* @create 2019/6/12
**/
@Component
public class TestHandler {


// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
// exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
// key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
// )
@RabbitListener(queues="QueueName-test-2019-06-04",containerFactory ="singleListenerContainer")
public void handler(Message message, Channel channel) throws IOException {
System.out.println("监听消费端的消息:"+new String(message.getBody()));
try{
// String is=null;
// is.toString();

/***
* false表示 只确认当前的消费者收到一个消息,true 表示确认所有的消费者获得消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(Exception ex){
System.out.println("消息处理失败,重新丢到队列去");
/***
* multiple false 只拒绝当前标签提供的消息,true表示拒绝所有的消息
* requeue false 表示直接丢掉,true表示重新入队
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}

讲一下

上述的内容都挺简单的,是按照公司的配置做一些改版,搭建出来,spring boot更方便的搭建方式,比我的更通俗易懂,但是这个更方便让自己知道始末。

这篇文章只是开始,后面打算研究rabbitmq的重试机制,使用死信队列,以及延时加载。上文的例子,是使用的手动确认方式,当消费失败,则该消息会重新放到队列,继续消费,这样会导致一个问题,会频繁重复加载,则会对后面的消息造成阻塞,。

一般在项目中会使用自动确认的模式,即不报错就会自动消费。但是在复杂情况下也会有例外

记一下

[]: https://www.cnblogs.com/piaolingzxh/p/5448927.html
[]: https://www.jianshu.com/p/2c5eebfd0e95
[]: https://juejin.im/post/5a12ffd451882578da0d7b3a

在这做一个记录,上面三篇文章写的挺好的

介绍了关于消息确认以及延时列队的内容,下次写关于rabbitmq的内容,就去研究一下

总结

这些内容耗时太长了,由于这段时间工作生活的一些原因,导致本来已经开篇的文章,一直搁置,最后都不知道自己要写什么,结果就是这篇文章看上去很散乱,所以尽早结束重新开篇。

-------------本文结束感谢您的阅读-------------
0%