- Notifications
You must be signed in to change notification settings - Fork1.1k
Distributed Scheduled Job Framework
License
ltsopensource/light-task-scheduler
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
LTS(light-task-scheduler)主要用于解决分布式任务调度问题,支持实时任务,定时任务和Cron任务。有较好的伸缩性,扩展性,健壮稳定性而被多家公司使用,同时也希望开源爱好者一起贡献。
github地址:https://github.com/ltsopensource/light-task-scheduler
oschina地址:http://git.oschina.net/hugui/light-task-scheduler
例子:https://github.com/ltsopensource/lts-examples
文档地址(正在更新中,后面以这个为准):https://www.gitbook.com/book/qq254963746/lts/details
这两个地址都会同步更新。感兴趣,请加QQ群:109500214 (加群密码: hello world)一起探讨、完善。越多人支持,就越有动力去更新,喜欢记得右上角star哈。
- 优化JobContext中的BizLogger,由原来的去掉了threadlocal,解决taskTracker多线程的问题, 去掉LtsLoggerFactory.getLogger()用法
LTS 有主要有以下四种节点:
- JobClient:主要负责提交任务, 并接收任务执行反馈结果。
- JobTracker:负责接收并分配任务,任务调度。
- TaskTracker:负责执行任务,执行完反馈给JobTracker。
- LTS-Admin:(管理后台)主要负责节点管理,任务队列管理,监控管理等。
其中JobClient,JobTracker,TaskTracker节点都是无状态的。可以部署多个并动态的进行删减,来实现负载均衡,实现更大的负载量, 并且框架采用FailStore策略使LTS具有很好的容错能力。
LTS注册中心提供多种实现(Zookeeper,redis等),注册中心进行节点信息暴露,master选举。(Mongo or Mysql)存储任务队列和任务执行日志, netty or mina做底层通信, 并提供多种序列化方式fastjson, hessian2, java等。
LTS支持任务类型:
- 实时任务:提交了之后立即就要执行的任务。
- 定时任务:在指定时间点执行的任务,譬如 今天3点执行(单次)。
- Cron任务:CronExpression,和quartz类似(但是不是使用quartz实现的)譬如 0 0/1 * * * ?
支持动态修改任务参数,任务执行时间等设置,支持后台动态添加任务,支持Cron任务暂停,支持手动停止正在执行的任务(有条件),支持任务的监控统计,支持各个节点的任务执行监控,JVM监控等等.
- 英文名称 NodeGroup,一个节点组等同于一个小的集群,同一个节点组中的各个节点是对等的,等效的,对外提供相同的服务。
- 每个节点组中都有一个master节点,这个master节点是由LTS动态选出来的,当一个master节点挂掉之后,LTS会立马选出另外一个master节点,框架提供API监听接口给用户。
- 顾名思义,这个主要是用于失败了存储的,主要用于节点容错,当远程数据交互失败之后,存储在本地,等待远程通信恢复的时候,再将数据提交。
- FailStore主要用户JobClient的任务提交,TaskTracker的任务反馈,TaskTracker的业务日志传输的场景下。
- FailStore目前提供几种实现:leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用于可以自由选择使用哪种,用户也可以采用SPI扩展使用自己的实现。
下图是一个标准的实时任务执行流程。

LTS可以完全不用Spring框架,但是考虑到很用用户项目中都是用了Spring框架,所以LTS也提供了对Spring的支持,包括Xml和注解,引入lts-spring.jar即可。
在TaskTracker端提供了业务日志记录器,供应用程序使用,通过这个业务日志器,可以将业务日志提交到JobTracker,这些业务日志可以通过任务ID串联起来,可以在LTS-Admin中实时查看任务的执行进度。
SPI扩展可以达到零侵入,只需要实现相应的接口,并实现即可被LTS使用,目前开放出来的扩展接口有
- 对任务队列的扩展,用户可以不选择使用mysql或者mongo作为队列存储,也可以自己实现。
- 对业务日志记录器的扩展,目前主要支持console,mysql,mongo,用户也可以通过扩展选择往其他地方输送日志。
当正在执行任务的TaskTracker宕机之后,JobTracker会立马将分配在宕机的TaskTracker的所有任务再分配给其他正常的TaskTracker节点执行。
可以对JobTracker,TaskTracker节点进行资源监控,任务监控等,可以实时的在LTS-Admin管理后台查看,进而进行合理的资源调配。
LTS框架提供四种执行结果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,并对每种结果采取相应的处理机制,譬如重试。
- EXECUTE_SUCCESS: 执行成功,这种情况,直接反馈客户端(如果任务被设置了要反馈给客户端)。
- EXECUTE_FAILED:执行失败,这种情况,直接反馈给客户端,不进行重试。
- EXECUTE_LATER:稍后执行(需要重试),这种情况,不反馈客户端,重试策略采用1min,2min,3min的策略,默认最大重试次数为10次,用户可以通过参数设置修改这个重试次数。
- EXECUTE_EXCEPTION:执行异常, 这种情况也会重试(重试策略,同上)
采用FailStore机制来进行节点容错,Fail And Store,不会因为远程通信的不稳定性而影响当前应用的运行。具体FailStore说明,请参考概念说明中的FailStore说明。
项目主要采用maven进行构建,目前提供shell脚本的打包。环境依赖:Java(jdk1.6+)Maven
用户使用一般分为两种:
可以通过maven命令将lts的jar包上传到本地仓库中。在父pom.xml中添加相应的repository,并用deploy命令上传即可。具体引用方式可以参考lts中的例子即可。
需要将lts的各个模块打包成单独的jar包,并且将所有lts依赖包引入。具体引用哪些jar包可以参考lts中的例子即可。
提供(cmd)windows和(shell)linux两种版本脚本来进行编译和部署:
运行根目录下的
sh build.sh或build.cmd脚本,会在dist目录下生成lts-{version}-bin文件夹下面是其目录结构,其中bin目录主要是JobTracker和LTS-Admin的启动脚本。
jobtracker中是 JobTracker的配置文件和需要使用到的jar包,lts-admin是LTS-Admin相关的war包和配置文件。lts-{version}-bin的文件结构
--lts-${version}-bin |--bin | |--jobtracker.cmd | |--jobtracker.sh | |--lts-admin.cmd | |--lts-admin.sh | |--lts-monitor.cmd | |--lts-monitor.sh | |--tasktracker.sh |--conf | |--log4j.properties | |--lts-admin.cfg | |--lts-monitor.cfg | |--readme.txt | |--tasktracker.cfg | |--zoo | |--jobtracker.cfg | |--log4j.properties | |--lts-monitor.cfg |--lib | |-- *.jar |--war |--jetty | |--lib | |-- *.jar |--lts-admin.war
- JobTracker启动。如果你想启动一个节点,直接修改下
conf/zoo下的配置文件,然后运行sh jobtracker.sh zoo start即可,如果你想启动两个JobTracker节点,那么你需要拷贝一份zoo,譬如命名为zoo2,修改下zoo2下的配置文件,然后运行sh jobtracker.sh zoo2 start即可。logs文件夹下生成jobtracker-zoo.out日志。 - LTS-Admin启动.修改
conf/lts-monitor.cfg和conf/lts-admin.cfg下的配置,然后运行bin下的sh lts-admin.sh或lts-admin.cmd脚本即可。logs文件夹下会生成lts-admin.out日志,启动成功在日志中会打印出访问地址,用户可以通过这个访问地址访问了。
需要引入lts的jar包有lts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依赖jar。
JobClientjobClient =newRetryJobClient();jobClient.setNodeGroup("test_jobClient");jobClient.setClusterName("test_cluster");jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");jobClient.start();// 提交任务Jobjob =newJob();job.setTaskId("3213213123");job.setParam("shopId","11111");job.setTaskTrackerNodeGroup("test_trade_TaskTracker");// job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式// job.setTriggerTime(new Date()); // 支持指定时间执行Responseresponse =jobClient.submitJob(job);
<beanid="jobClient"class="com.github.ltsopensource.spring.JobClientFactoryBean"> <propertyname="clusterName"value="test_cluster"/> <propertyname="registryAddress"value="zookeeper://127.0.0.1:2181"/> <propertyname="nodeGroup"value="test_jobClient"/> <propertyname="masterChangeListeners"> <list> <beanclass="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <propertyname="jobFinishedHandler"> <beanclass="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/> </property> <propertyname="configs"> <props> <!--参数 --> <propkey="job.fail.store">leveldb</prop> </props> </property></bean>
@ConfigurationpublicclassLTSSpringConfig {@Bean(name ="jobClient")publicJobClientgetJobClient()throwsException {JobClientFactoryBeanfactoryBean =newJobClientFactoryBean();factoryBean.setClusterName("test_cluster");factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");factoryBean.setNodeGroup("test_jobClient");factoryBean.setMasterChangeListeners(newMasterChangeListener[]{newMasterChangeListenerImpl() });Propertiesconfigs =newProperties();configs.setProperty("job.fail.store","leveldb");factoryBean.setConfigs(configs);factoryBean.afterPropertiesSet();returnfactoryBean.getObject(); }}
需要引入lts的jar包有lts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依赖jar。
publicclassMyJobRunnerimplementsJobRunner {@OverridepublicResultrun(JobContextjobContext)throwsThrowable {try {// TODO 业务逻辑// 会发送到 LTS (JobTracker上)jobContext.getBizLogger().info("测试,业务日志啊啊啊啊啊"); }catch (Exceptione) {returnnewResult(Action.EXECUTE_FAILED,e.getMessage()); }returnnewResult(Action.EXECUTE_SUCCESS,"执行成功了,哈哈"); }}
TaskTrackertaskTracker =newTaskTracker();taskTracker.setJobRunnerClass(MyJobRunner.class);taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");taskTracker.setNodeGroup("test_trade_TaskTracker");taskTracker.setClusterName("test_cluster");taskTracker.setWorkThreads(20);taskTracker.start();
<beanid="taskTracker"class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean"init-method="start"> <propertyname="jobRunnerClass"value="com.github.ltsopensource.example.support.MyJobRunner"/> <propertyname="bizLoggerLevel"value="INFO"/> <propertyname="clusterName"value="test_cluster"/> <propertyname="registryAddress"value="zookeeper://127.0.0.1:2181"/> <propertyname="nodeGroup"value="test_trade_TaskTracker"/> <propertyname="workThreads"value="20"/> <propertyname="masterChangeListeners"> <list> <beanclass="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <propertyname="configs"> <props> <propkey="job.fail.store">leveldb</prop> </props> </property></bean>
@ConfigurationpublicclassLTSSpringConfigimplementsApplicationContextAware {privateApplicationContextapplicationContext;@OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException {this.applicationContext =applicationContext; }@Bean(name ="taskTracker")publicTaskTrackergetTaskTracker()throwsException {TaskTrackerAnnotationFactoryBeanfactoryBean =newTaskTrackerAnnotationFactoryBean();factoryBean.setApplicationContext(applicationContext);factoryBean.setClusterName("test_cluster");factoryBean.setJobRunnerClass(MyJobRunner.class);factoryBean.setNodeGroup("test_trade_TaskTracker");factoryBean.setBizLoggerLevel("INFO");factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");factoryBean.setMasterChangeListeners(newMasterChangeListener[]{newMasterChangeListenerImpl() });factoryBean.setWorkThreads(20);Propertiesconfigs =newProperties();configs.setProperty("job.fail.store","leveldb");factoryBean.setConfigs(configs);factoryBean.afterPropertiesSet();// factoryBean.start();returnfactoryBean.getObject(); }}
一般在一个JVM中只需要一个JobClient实例即可,不要为每种任务都新建一个JobClient实例,这样会大大的浪费资源,因为一个JobClient可以提交多种任务。相同的一个JVM一般也尽量保持只有一个TaskTracker实例即可,多了就可能造成资源浪费。当遇到一个TaskTracker要运行多种任务的时候,请参考下面的 "一个TaskTracker执行多种任务"。
有的时候,业务场景需要执行多种任务,有些人会问,是不是要每种任务类型都要一个TaskTracker去执行。我的答案是否定的,如果在一个JVM中,最好使用一个TaskTracker去运行多种任务,因为一个JVM中使用多个TaskTracker实例比较浪费资源(当然当你某种任务量比较多的时候,可以将这个任务单独使用一个TaskTracker节点来执行)。那么怎么才能实现一个TaskTracker执行多种任务呢。下面是我给出来的参考例子。
/** * 总入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class) * JobClient 提交 任务时指定 Job 类型 job.setParam("type", "aType") */publicclassJobRunnerDispatcherimplementsJobRunner {privatestaticfinalConcurrentHashMap<String/*type*/,JobRunner>JOB_RUNNER_MAP =newConcurrentHashMap<String,JobRunner>();static {JOB_RUNNER_MAP.put("aType",newJobRunnerA());// 也可以从Spring中拿JOB_RUNNER_MAP.put("bType",newJobRunnerB()); }@OverridepublicResultrun(JobContextjobContext)throwsThrowable {Jobjob =jobContext.getJob();Stringtype =job.getParam("type");returnJOB_RUNNER_MAP.get(type).run(job); }}classJobRunnerAimplementsJobRunner {@OverridepublicResultrun(JobContextjobContext)throwsThrowable {// TODO A类型Job的逻辑returnnull; }}classJobRunnerBimplementsJobRunner {@OverridepublicResultrun(JobContextjobContext)throwsThrowable {// TODO B类型Job的逻辑returnnull; }}
一般在编写TaskTracker的时候,只需要测试JobRunner的实现逻辑是否正确,又不想启动LTS进行远程测试。为了方便测试,LTS提供了JobRunner的快捷测试方法。自己的测试类集成com.github.ltsopensource.tasktracker.runner.JobRunnerTester即可,并实现initContext和newJobRunner方法即可。如lts-examples中的例子:
publicclassTestJobRunnerTesterextendsJobRunnerTester {publicstaticvoidmain(String[]args)throwsThrowable {// Mock Job 数据Jobjob =newJob();job.setTaskId("2313213");JobContextjobContext =newJobContext();jobContext.setJob(job);JobExtInfojobExtInfo =newJobExtInfo();jobExtInfo.setRetry(false);jobContext.setJobExtInfo(jobExtInfo);// 运行测试TestJobRunnerTestertester =newTestJobRunnerTester();Resultresult =tester.run(jobContext);System.out.println(JSON.toJSONString(result)); }@OverrideprotectedvoidinitContext() {// TODO 初始化Spring容器 }@OverrideprotectedJobRunnernewJobRunner() {returnnewTestJobRunner(); }}
对于Quartz的Cron任务只需要在Spring配置中增加一下代码就可以接入LTS平台
<beanclass="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean"> <propertyname="clusterName"value="test_cluster"/> <propertyname="registryAddress"value="zookeeper://127.0.0.1:2181"/> <propertyname="nodeGroup"value="quartz_test_group"/></bean>
@SpringBootApplication@EnableJobTracker// 启动JobTracker@EnableJobClient// 启动JobClient@EnableTaskTracker// 启动TaskTracker@EnableMonitor// 启动MonitorpublicclassApplication {publicstaticvoidmain(String[]args) {SpringApplication.run(Application.class,args); }}
剩下的就只是在application.properties中添加相应的配置就行了, 具体见lts-example中的com.github.ltsopensource.examples.springboot包下的例子
当机器有内网两个网卡的时候,有时候,用户想让LTS的流量走外网网卡,那么需要在host中,把主机名称的映射地址改为外网网卡地址即可,内网同理。
如果在节点启动的时候设置节点标识,LTS会默认设置一个UUID为节点标识,可读性会比较差,但是能保证每个节点的唯一性,如果用户能自己保证节点标识的唯一性,可以通过setIdentity 来设置,譬如如果每个节点都是部署在一台机器(一个虚拟机)上,那么可以将identity设置为主机名称
支持JobLogger,JobQueue等等的SPI扩展
工作年限 三年以上
学历要求 本科
期望层级 P6(资深Java工程师)/P7(技术专家)
岗位描述
会员平台,负责阿里巴巴集团的用户体系,支持集团内各线业务线用户类需求,支持集团对外合作的用户通和业务通。包括各个端的用户登录&授权、Session体系、注册、账户管理、账户安全等功能,底层的用户信息服务,会话和凭证管理等等,是集团最核心的产品线之一,每天承载千亿次调用量、峰值千万QPS、以及分布全球的混合云架构等等。
作为软件工程师,你将会在我们的核心产品上工作,这些产品为我们的商业基础设施提供关键功能,取决于你的兴趣和经验,你可以在如下的一个或多个领域工作:全球化,用户体验,数据安全,机器学习,系统高可用性等等。
- 独立完成中小型项目的系统分析、设计,并主导完成详细设计和编码的任务,确保项目的进度和质量;
- 能够在团队中完成code review的任务,确保相关代码的有效性和正确性,并能够通过code review提供相关性能以及稳定性的建议;
- 参与建设通用、灵活、智能的业务支撑平台,支撑上层多场景的复杂业务。岗位要求
- 扎实的java编程基础,熟悉常用的Java开源框架;
- 具有基于数据库、缓存、分布式存储开发高性能、高可用数据应用的实际经验,熟练掌握LINUX操作系统;
- 具备良好的识别和设计通用框架及模块的能力;
- 热爱技术,工作认真、严谨,对系统质量有近乎苛刻的要求意识,善于沟通与团队协作;
- 具备大型电子商务网站或金融行业核心系统开发、设计工作经验者优先;
- 具备大数据处理、算法、机器学习类工作经验优先。感兴趣,可以发简历到hugui.hg@alibaba-inc.com 欢迎投递
About
Distributed Scheduled Job Framework
Topics
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors13
Uh oh!
There was an error while loading.Please reload this page.

