jds实现方式
上一篇文章中我们介绍了JAVA 简单同步框架的诞生,这一次我们介绍下其是如何实现的。本文基于 1.0.1-SNAPSHOT
版本进行说明。
回顾
考虑JDS JAVA Data Sync 数据同步框架 中提到的几点:
数据分发 ,偏移量持久化
重试机制
消息持久化
偏移量持久化
实际场景中服务会停机,服务重启的时候需要重新读取最新的数据偏移量状态(最后同步到了哪条记录),需要从最后同步的那条记录开始继续同步
重试
消息分发过程中可能会遇到各种问题,比如网络抖动,目标服务停止,业务逻辑出错等等。数据在某些场景下需要重发,而有些场景可能不需要重发;重发也需要考虑间隔问题,不能一直死循环重发
消息持久化
基于上面2条,自然还需要将消息进行持久化,否则服务重启会造成要同步的数据丢失。
架构
主要分为2大模块(逻辑上)
Core 提供API接口和数据处理逻辑
存储层 提供数据持久化能力(包括待分发数据和偏移量)
Core 模块
核心模块,提供了 API,数据处理的能力。接收用户数据,传输给底层存储;启动分发任务,调用用户逻辑进行数据处理;失败数据进行重试;分发成功调用存储层持久化偏移量;抽象存储层逻辑接口
存储层
该层要实现Core提供的接口,可以根据选择的组件进行具体的实现,比如实现基于Mongodb或者MySQL的存储层,甚至可以基于Kafka实现。
核心代码讲解
结构
该项目是一个标准的Maven项目。
在 1.0.1-SNAPSHOT
版本中,项目结构如下:
api 就是Core模块
xxx-provider 具体的存储层实现(该版本只实现了mongodb)
其他都是项目信息
代码逻辑入口
入口类为DataSync
主要方法有:
因为要配置参数,提供了一个builder DataSyncBuilder
,负责源数据,目标地址,还有设置具体的provicer。
启动流程
调用 DataSync.start()
,会调用 init
函数进行初始化,调用 internalStoreDataHandler.internalStoreDataHandler
进行provider的准备工作(比如连接数据库,创建表等),然后启动一个线程,调用 provider 获取数据 internalStoreDataHandler.poll()
其中提到的 provider 的 internalStoreDataHandler
对象是 InternalStoreDataHandler
接口的一个实例,具体 provider 负责具体实现。
可以看到 InternalStoreDataHandler
接口的方法和 DataSync
的方法基本一致,其实面向 DataSync
的操作最终会委托给 provider 的 InternalStoreDataHandler
实现来处理。
数据分发逻辑
接着看 数据交给用户处理逻辑:
以上就是主要的数据分发逻辑,下面介绍 provider(基于mongo实现)的主要逻辑。
Provider 设计
在 core api 模块中,给 provider 提供了一个接口 InternalStoreDataHandler
,同时还提供了一个 抽象类 InternalAbstractStoreDataHandler
方便 provider 获取DataSync的一些共享数据,比如 builder 中的信息。
Mongodb-Provider实现
在 mongodb-provider
模块中,MongoStoreDataHandlerImpl
类实现了InternalAbstractStoreDataHandler
。接下来我们来看下主要内容。
MongoStoreDataHandlerImpl 这里使用了 spring-data-mongodb
依赖,提供了一个基于 MongoTemplate 的 构造方法,传入该参数,可以使用其操作mongodb数据库;同时还有一个collectionName参数,该参数是用于保存数据的collection名字。
存储数据逻辑
存储数据需要设置来源(所有数据都可以放在同一个collection中保存),这样在取数据的时候能够区分不同数据;然后使用 mongoTemplate save 方法保存数据。
查询数据
offset 处理
框架现存问题
mongodb-provider 问题
mongodb-provider 该存储层在实现比较offset的时候,使用的是 mongodb 的ObjectID,而ObjectId的构造在分片集群下不能保证严格的插入顺序,所以这个地方只适合单击或者副本集的mongodb server。
优化的的方案是,可以使用自定义的一个collection,使用一条记录,使用inc函数,这样会严格递增,保证分片下也是严格有序,不会因为比较大小而造成数据遗漏。
重试机制
如果数据发送失败,现在固定写死了等待3s后重试。其实可以使用策略模式,给用户暴露一个配置重试策略的接口。封装好的重试框架有:
spring-retry
guava-retrying
总结
使用的设计模式:构建器 builder 模式; 策略模式(重试策略); xxx-provider
并发控制: synchronize;LockSupport 暂停,唤醒线程
面向接口编程:internalStoreDataHandler
参考资料
Mongodb ObjectID 说明 https://docs.mongodb.com/manual/reference/bson-types/#std-label-objectid
spring-retrying https://docs.spring.io/spring-batch/docs/current/reference/html/retry.html
guava-retrying https://github.com/rholder/guava-retrying