博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息队列改造踩过的坑
阅读量:4083 次
发布时间:2019-05-25

本文共 9113 字,大约阅读时间需要 30 分钟。

1、背景

当两个系统之间存在比较频繁的数据交互,那么就需要考虑两个系统之间的数据传输方式。第一次考虑使用rest接口调用的方式,但是缺点很明显,一个系统宕机时,另外一个系统的业务流程就没法进行下去,耦合性太强;继而想到使用异步队列的形式来解决耦合的问题,也就有了后续的

2、缺陷

  • Redis也可以看做为一个数据库, 那么业务单据的保存和发送消息到redis上其实属于两个事务,那么必然会出现消息发送成功时,本地事务提交失败导致本地的业务数据回滚,而消息已经发送出去,无法保证可靠性。
  • 发送消息与业务单据之间依赖性太强: 发送消息需要连接redis, 如果redis宕机,那么会导致业务单据也无法进行下去,系统与redis之间没有解耦。
  • 无法进行集群部署,可拓展性比较差## 3、解决方案
  • 切换成现有的RumbaMQ可实现最终一致的分布式事务。本身它不提供消息服务, 而是提供了消息服务的抽象访问层,实现了多种主流的消息框架,比如:RabbitMQ、RocketMQ等, 本次改造选择RabbitMQ作为底层消息服务,并通过新的evCall服务实现对它的访问, 它保证了业务单据和发送消息在同一个事务中,同时也将发送消息与业务单据之间实现解耦。
  • rumba-mq版本: 1.16.0; evCall版本: 1.1.0
    evCall实现

4、消息切换过程记录

4.1. 组件升级

  • rumba-commons-biz:1.16.0 =》 1.23.1
  • rumba-commons-mini:1.21.0 =》 1.29.2问题:vaidator校验
Caused by: org.hibernate.HibernateException: Unable to get the default Bean Validation factory    at org.hibernate.cfg.beanvalidation.BeanValidationActivator.applyDDL(BeanValidationActivator.java:127).....Caused by: javax.validation.ValidationException: Unable to create a Configuration, because no Bean Validation provider could be found. Add a provider like Hibernate Validator (RI) to your classpath.    at javax.validation.Validation$GenericBootstrapImpl.configure(Validation.java:271)    at javax.validation.Validation.buildDefaultValidatorFactory(Validation.java:110)    at org.hibernate.cfg.beanvalidation.TypeSafeActivator.getValidatorFactory(TypeSafeActivator.java:380)    ... 91 more

原因

由于新版本mini组件依赖了validation-api,但未依赖实现。jpa检查选择默认为auto,检测到validation-api后,无实现则报错。
解决方案

  • 引入validation实现
org.hibernate.validator
hibernate-validator
  • jpa属性显式的指示不检查。
... 省略其他配置 ...
none

4.2 动态数据源

应用系统实际部署时会存在多个数据库的情况,目前客户使用中最多能达到8个数据库。在应用系统中会根据不同的门店切换到不同的数据库中执行该门店的业务。那么evCall保存到调用任务表时就需要与业务事务保持一致。

问题1: 数据源切换
系统中使用DsRouter类作为数据源,使用门店号作为数据源分片的key值,通过AOP切面的形式设置不同的keyHolder,从而切换到不同的数据源DataSource。而evCall暂时不支持该种方式的动态数据源。

public static final String BUYPOOL = "BUYPOOL"; // 根据storeCode分片的BUYPOOL  public static final String HDPOS = "HDPOS"; // 根据storeCode分片的HDPOS  @Value("${stos.dsrouter.singleStoreUseStoreCode:false}")  private boolean singleStoreUseStoreCode;  /** key: cat value : key:storeCode value:ds */  private Map
> dataSourceCache; /** key: cat value : key:storeCode value:dburl */ private Map
> dataSourceUrlCache; /** key: cat value : key:dbkey value:ds */ private Map
> dataSourceByDbKeyCache; /** key: cat value : key:dbkey value:dburl */ private Map
> dataSourceUrlByDbKeyCache; @Override public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection(); } @Override public Connection getConnection(String username, String password) throws SQLException {
return determineTargetDataSource().getConnection(username, password); } private DataSource determineTargetDataSource() throws SQLException {
DataSourceKey key = keyHolder.get(); if (key == null) {
for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
if (org.hibernate.tool.hbm2ddl.SchemaUpdate.class.getName().equals(ste.getClassName()) && "execute".equals(ste.getMethodName()) || (org.hibernate.cfg.SettingsFactory.class.getName().equals(ste.getClassName()) && "buildSettings".equals(ste.getMethodName())) ) {
Iterator
> iter = getTargetDataSources().entrySet().iterator(); return iter.next().getValue(); } } throw new RuntimeException("No DataSourceKey in current thread, call setKey() first."); } return getDataSource(key.cat, key.storeCode); } private final ThreadLocal
keyHolder = new ThreadLocal
();

解决方案 EvCall提供了一个真正意义上的多通道能力。因为当你在应用系统中建立多个通道的时候,工具内部的各种设施也都是多份的,包括调度器、线程池,甚至在数据库中的数据表也是多份的。这样设计的目的就是尽量减少不同通道之间出现资源争抢的现象,从而在最大程度上做到资源隔离。

利用这种特性,在应用启动的时候根据DsRouter.json文件(如下图)配置的数据源分片,按每个数据源配置不同的name(用dbkey属性)启动多个evCallManager,每个manager都会启动一个守护线程,对应每个不同的数据源。

[   {
"cat": "CENTER", "url": "jdbc:postgresql://localhost:5432/dev_h4csct", "username": "username", "password": "password", "dbkey": "db1", "storecodes": [ "0901", "9787", "0001" ] }, {
"cat": "HDPOS", "url": "jdbc:postgresql://localhost:5432/dev_h4csct", "username": "username", "password": "password", "dbkey": "db2", "storecodes": [ "0103", "0105", "0106", "0107", "0110" ] }]

1. 根据数据源分片创建多个evCallManager

private void prepare() {
// 取到所有的数据源 DsRouter dsRouter = appCtx.getBean(DsRouter.class); Map
dataSourceMap = dsRouter.getTargetDataSourceByDbKeyCache(DsRouter.HDPOS); MQConnection conn = getConnection(); managers = new ConcurrentHashMap
(); // 按数据源分片添加多个evCall管道 for(String dbKey : dataSourceMap.keySet()) {
ReliableEventExecutor executor = new ReliableEventExecutor(conn, getMessageWrapperCodec()); EvCallManager evCallManager = new EvCallManager.Builder(dataSourceMap.get(dbKey)) .properties(properties.getProperties()) .autoStart(false) .name(dbKey) .executor(new EvCallBatchExecutorBinder(executor) .batchSize(properties.getExecutorBatchSize())) .build(); managers.put(dbKey, evCallManager); } assert !managers.isEmpty() && managers.get(0) != null; }

2. 启动evCall

public void start() {
if (isRunning()) {
return; } synchronized (this) {
if (isRunning()) {
return; } if (StringUtil.toBoolean(PropertiesUtils.getPropertiesValue("${h4cs.rumbamq.autoStartup:false}"))) {
debug("Start ReliableEventManager ..."); try {
prepare(); for(EvCallManager evCallManager : managers.values()) {
evCallManager.start(); } } catch (Exception e) {
throw new ReliableEventException(e, "Failed to start ReliableEventManager."); } } } }```**问题2:jpa和jdbcTemplate同时使用的时候是否存在事务不一致**系统目前还是使用JPA来操作单据,集成evCall后,业务单据通过JPA来保存、evCall通过jdbcTemplate来保存消息到调用任务表,是否会存在事务的问题。**排查** 通过查看evCall的源代码可以发现,它是通过注册TransactionAdapter来控制消息的插入,而事务提交由业务代码部分来做控制,所以就不存在事务不一致的问题,经过测试也确认了该场景。 ```java private class TransactionAdapter extends TransactionSynchronizationAdapter {
@Override public void beforeCommit(boolean readOnly) {
// 将requestsCache中内容插入数据库。 List
requests = requestsCache.get(); if (requests == null || requests.isEmpty()) {
return; } debug("Save {} requests to database: {}", requests.size(), requests); dbContext.getEvcRequestDao().batchInsert(requests); // 跟踪日志 for (EvcRequest request : requests) {
trace(LogMessage.byValue(TRACE_EVENT_COMMIT, request)); } } @Override public void afterCompletion(int status) {
List
requests = requestsCache.get(); if (requests == null) {
return; } try {
if (TransactionSynchronization.STATUS_COMMITTED == status) {
if (!requests.isEmpty()) {
try {
debug("Transaction committed, try to schedule {} requests: {}", requests.size(), requests); metricsAgent.onSubmit(requests); scheduler.trySchedule(requests); } catch (Exception e) {
LOGGER.warn(e.getMessage(), e); } } metricsAgent.onCommit(); } else {
for (EvcRequest r : requests) {
trace(LogMessage.byValue(TRACE_EVENT_ROLLBACK, r)); } } } finally {
requestsCache.remove(); } } }

4.3 业务单据消费保证顺序

超市门店与总部之间业务单据交互时, 同一张单据的不同状态流会发起不同的消息,在业务上就需要保证这张单据的几条消息是有序消费的,**而MQ消费端是不保证有序消费的。**解决方案

  • 针对系统中同一张单据的不同业务动作加一个编号,在消费端保证同一张单据内消息消费有序。
  • 具体实现:生产端增加一张业务单据编号表busOrderProducerSerial,用来存储每张单据的发送消息编号;消费端增加一张业务单据编号表busOrderConsumerSerial, 用来存储每张单据的消费消息编号。 只有当前序编号(上一个动作编号)消费完成后,后序编号(下一个动作编号)的消息才能开始消费。以直配进货单为例: 门店生产端
    直配进货收货
    保存单据,发送消息A,门店数据库表busOrderProducerSerial增加一条记录: num= 单据单号, cls = 单据类型, serialNo = 1
    直配进货单进行完成操作
    保存单据,发送消息B,如果busOrderProducerSerial表存在相同单号的单据记录,则将serailNo加1,否则插入一条新的记录:num= 单据单号, cls = 单据类型, serialNo = 1
    直配进货单完成后作废
    保存单据,发送消息C,如果busOrderProducerSerial表存在相同单号的单据记录,则将serailNo加1,否则插入一条新的记录:num= 单据单号, cls = 单据类型, serialNo = 1
    总部消费端(消费时,每条消息都先找比它的编号小1的消息是否已消费)
    先收到消息A,直接执行,向总部数据库busOrderConsumerSerial增加一条记录:num= 单据单号, cls = 单据类型, serialNo = 1;
    先收到消息B,先判断消息A是否消费成功(busOrderConsumerSerial表中是否存在编号为1的记录),如果消费成功,则跳过该条消息,否则进行消费处理,更新busOrderConsumerSerial表中的记录:serailNo = serialNo + 1;
    先收到消息C, 先判断消息A是否消费成功(busOrderConsumerSerial表中是否存在编号为2的记录),如果消费成功,则跳过该条消息,否则进行消费处理,更新busOrderConsumerSerial表中的记录:serailNo = serialNo + 1;
    同一张单据的消息保证消费有序
    其他小问题
  • 由于消息的消费还是无序的,只有在消费端强行控制了它的有序,那么有些消息势必会有延迟性。
  • 消息消费失败一次后就挪到死信队列中,等待存活时间到期时会重新路由到消费队列中,这个时间目前没有好的办法去灵活控制
  • 某些消息即使重试很多次也无法成功,日志文件里就会出现很多无效的日志记录,对生产日志会产生一定的影响, 目前是将消息产生的错误信息剥离出来,单独存放一个日志文件。
  • 如果是通过getBean()的方式取主题的话,那么应用启动的时候不会自动创建主题,需要运维手动创建好主题,否则应用启动会报错。

5、总结

整个改造后,消息中间件RabbitMQ与应用之间进行了解耦,减少了应用系统对中间件的依赖,执行效率比Redis要提升不少,整体消费性能有所提高。

转载地址:http://rqani.baihongyu.com/

你可能感兴趣的文章
码上用它开始Flutter混合开发——FlutterBoost
查看>>
Flutter Boost的router管理
查看>>
Android Flutter混合编译
查看>>
微信小程序 Audio API
查看>>
Amaze UI React框架快速体验上手
查看>>
延展操作符和解构的简单实际应用
查看>>
超自然的箭头函数学习初步
查看>>
令人振奋的Class(上)
查看>>
令人振奋的Class(下):继承和实战代码示例
查看>>
[提高]ES2015中最惊艳特性之生成器函数初步
查看>>
微信小程序学习用精品demo:跟单
查看>>
小程序请求API接口,网络请求封装
查看>>
ES6中Json的与Map之间的转换
查看>>
ES6中Set和WeakSet的使用
查看>>
[React Native]react-native-scrollable-tab-view(进阶篇)
查看>>
React Native开源封装AES,MD5加密模块(react-native-encryption-library)
查看>>
使用Vue.js框架搭建火车票查询系统
查看>>
1-vuejs2.0实战:仿豆瓣app项目,创建自定义组件tabbar
查看>>
基于vue的下拉刷新&滚动刷新指令
查看>>
Vue2 移动端开发环境搭建
查看>>