- Notifications
You must be signed in to change notification settings - Fork0
PansonPanson/Argo
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
Argo 是一款基于Java实现的最终一致性组件,主要用于处理分布式系统中的数据一致性问题。通过 SpringAOP ,将任务保存到数据库中,然后从数据库中读取任务来执⾏。核心目的:代码中保证“某个操作(Action)”最终⼀定可以执⾏成功。执行流程图:
<dependency> <groupId>top.panson</groupId> <artifactId>argo-core</artifactId> <version>0.0.1-SNAPSHOT</version></dependency>
CREATETABLE `argo_task` (`id`bigintNOT NULL AUTO_INCREMENT COMMENT'主键⾃增',`task_id`varchar(500) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ciNOT NULL COMMENT'⽤户⾃定义的任务名称,如果没有则使⽤⽅法签名',`task_status`intNOT NULL DEFAULT'0' COMMENT'执⾏状态',`execute_times`intNOT NULL COMMENT'执⾏次数',`execute_time`bigintNOT NULL COMMENT'执⾏时间',`parameter_types`varchar(255) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ciNOT NULL COMMENT'参数的类路径名称',`method_name`varchar(100) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ciNOT NULL COMMENT'⽅法名',`method_sign_name`varchar(200) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ciNOT NULL DEFAULT'' COMMENT'⽅法签名',`execute_interval_sec`intNOT NULL DEFAULT'60' COMMENT'执⾏间隔秒',`delay_time`intNOT NULL DEFAULT'60' COMMENT'延迟时间:单位秒',`task_parameter`varchar(200) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ciNOT NULL DEFAULT'' COMMENT'任务参数',`performance_way`intNOT NULL COMMENT'执⾏模式:1、⽴即执⾏ 2、调度执⾏',`thread_way`intNOT NULL COMMENT'线程模型 1、异步 2、同步',`error_msg`varchar(200) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ciNOT NULL DEFAULT'' COMMENT'执⾏的error信息',`alert_expression`varchar(100) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci DEFAULTNULL COMMENT'告警表达式',`alert_action_bean_name`varchar(255) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci DEFAULTNULL COMMENT'告警逻辑的的执⾏beanName',`fallback_class_name`varchar(255) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci DEFAULTNULL COMMENT'降级逻辑的的类路径',`fallback_error_msg`varchar(200) CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci DEFAULTNULL COMMENT'降级失败时的错误信息',`shard_key`bigint DEFAULT'0' COMMENT'任务分⽚键',`gmt_create` datetimeNOT NULL COMMENT'创建时间',`gmt_modified` datetimeNOT NULL COMMENT'修改时间',PRIMARY KEY (`id`), UNIQUE KEY`uk_id_shard_key` (`id`,`shard_key`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
spring: datasource: url: jdbc:mysql://localhost:3306/task-engine?characterEncoding=utf8&characterSetResults=utf8&autoReconnect=true&failOverReadOnly=false&useSSL=false username: root password: your password driver-class-name: com.mysql.cj.jdbc.Driver hikari: connection-timeout: 3000top: panson: argo: parallel: pool: thread-max-pool-size: 6 thread-core-pool-size: 6 thread-pool-keep-alive-time: 60 thread-pool-keep-alive-time-unit: SECONDS thread-pool-queue-size: 200 action: fail-count-threshold: 0 shard: task-sharded: false
@EnableTendConsistencyTask@EnableScheduling@SpringBootApplicationpublicclassApplication {publicstaticvoidmain(String[]args) {SpringApplication.run(Application.class,args); }}
@ConsistencyTask(id ="testAction",executeIntervalSec =2,delayTime =5,performanceWay =PerformanceEnum.EXECUTE_RIGHT_NOW,threadWay =ThreadWayEnum.ASYNC,fallbackClass =SendMessageFallbackHandler.class,alertActionBeanName ="normalAlerter")publicvoidaction(CreateOrderRequestcreateOrderRequest) {}
注解解释:
- id: 表示action的名字
- executeIntervalSec,默认20秒,表示失败间隔的基数,假如某个action已经失败了5次,那么第 5 次失败后需要等待(5+1) * executeIntervalSec 秒之后,才会进⾏第6次重试
- delayTime: 当前action需要延迟多久执⾏,默认不延迟
- performanceWay:当前的action是在当前线程执⾏,还是新创建线程执⾏。
- fallbackClass: 降级类,执⾏失败会调⽤降级类的同名⽅法
- alertActionBeanName:告警实例的beanName,需要实现ConsistencyFrameworkAlerter
任务失败重试是通过定时任务调⽤taskScheduleManager.performanceTask() ⽅法来实现的,底层逻辑就是根据条件从数据库中查询出来失败的任务,然后判断是否需要重试,执行后续逻辑。
在这个过程中,根据条件查询失败的任务,这⾥的条件允许⼀定程度的⾃定义。默认情况下⾏为是:每次查询当前时间 - 1⼩时 时间范围内的1000条失败的记录。
如果想要更改此逻辑,可以通过实现TaskTimeRangeQuery 接⼝来达到⽬的。如下:
@ComponentpublicclassMyTaskTimeRangeQueryimplementsTaskTimeRangeQuery {/** * 获取查询任务的初始时间 默认的开始时间为 查询12⼩时内的任务 * * @return 启始时间 */@OverridepublicDategetStartTime() {returnDateUtils.getDateByDayNum(newDate(),Calendar.HOUR, -12); }/** * 获取查询任务的结束时间 * * @return 结束时间 */@OverridepublicDategetEndTime() {returnnewDate(); }/** * 每次最多查询出多少个未完成的任务出来 * * @return 未完成的任务数量 */@OverridepublicLonglimitTaskCount() {return200L; }}
同时在配置文件中增加配置:
top:panson:argo:parallel:pool:task-schedule-time-range-class-name:top.panson.argo.example.range.MyTaskTimeRangeQuery
About
从零开始写数据一致性组件
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Releases
No releases published
Packages0
No packages published
Uh oh!
There was an error while loading.Please reload this page.