Elastic Job是一个不错的分布式定时任务调度开源组件。使用它实现定时任务简单又方便,网上有很多种做法,今天记录一下我在项目中的一种简单使用方式。在介绍具体做法之前,先给出Elastic Job的官网,没用过想了解的可以前往:
http://elasticjob.io/
基于springboot的项目配置很简单,先来看看maven依赖:
<!-- 任务调度 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<!-- 如果需要任务管理相关的接口则可以加上,不需要则不加 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-lifecycle</artifactId>
<version>2.1.5</version>
</dependency>
核心依赖前面两个就足以,如果你想对任务更好的控制可以把第三个依赖也加上,此文不介绍管理功能。
来看springboot工程的application.properties,我们加入如下的配置:
#任务注册中心地址,多个逗号隔开
job.registry.center.address=172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181
#任务注册中心命名空间
job.registry.center.namespace=elastic-job-lite
#起价计算任务
job.startPrice.cron=0/5 * * * * ?
job.startPrice.shardingTotalCount=1
job.startPrice.shardingItemParameters=
job.startPrice.jobParameter=
接下来任务中心配置类JobRegistryCenterConfig,如下:
package cn.lovecto.promotion.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
* 任务注册中心
*
*/
@Configuration
public class JobRegistryCenterConfig {
@Value("${job.registry.center.address}")
private String connectString;
@Value("${job.registry.center.namespace}")
private String namespace;
/**
* 任务注册中心
* @return
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(connectString, namespace));
}
}
为了更方便的创建任务类,我们定义一个任务抽象类AbstractJob,抽象类中包含了这个任务需要的主要参数,如任务执行时间表达式、任务分片总数、任务分片参数、任务参数等。AbstractJob实现SimpleJob接口,代码如下:
package cn.lovecto.promotion.job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
* 抽象的任务
*
*
*/
public abstract class AbstractJob implements SimpleJob {
protected static final Logger LOG = LoggerFactory.getLogger("job");
/** 任务执行时间表达式 */
private String cron;
/** 任务分片总数 */
private Integer shardingTotalCount;
/** 任务分片参数 */
private String shardingItemParameters;
/** 任务参数 */
private String jobParameter;
/** 是否已经启动 */
private boolean started = false;
public AbstractJob() {
super();
}
/**
* 构造函数
*
* @param cron
* @param shardingTotalCount
* @param shardingItemParameters
* @param jobParameter
*/
public AbstractJob(String cron, Integer shardingTotalCount,
String shardingItemParameters, String jobParameter) {
super();
this.cron = cron;
this.shardingTotalCount = shardingTotalCount;
this.shardingItemParameters = shardingItemParameters;
this.jobParameter = jobParameter;
}
/**
* 启动任务,调用此方法,将启动任务调度
* @param zookeeperRegistryCenter 任务注册中心
*/
protected synchronized void start(ZookeeperRegistryCenter zookeeperRegistryCenter) {
// 保证一个任务只执行一次start
if (this.started){
return;
}
this.started = true;
SpringJobScheduler s = new SpringJobScheduler(this,
zookeeperRegistryCenter, getLiteJobConfiguration());
s.init();
}
@Override
public void execute(ShardingContext shardingContext) {
LOG.info(String.format("Thread ID:%s,任务总片数:%s," + "当前分片项:%s,当前参数:%s,"
+ "当前任务名称:%s,当前任务参数:%s", Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(), shardingContext.getJobParameter()));
executeJob(shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobParameter());
}
/**
* 执行任务
*
* @param shardingTotalCount
* 任务分片数
* @param shardingItem
* 当前分配序号
* @param parameter
* 当前分配任务参数
*/
public abstract void executeJob(Integer shardingTotalCount,
Integer shardingItem, String itemParameter, String jobParameter);
/**
* 获取任务配置
* @return
*/
protected LiteJobConfiguration getLiteJobConfiguration() {
return LiteJobConfiguration
.newBuilder(
new SimpleJobConfiguration(JobCoreConfiguration
.newBuilder(this.getClass().getName(),
this.cron, this.shardingTotalCount)
.shardingItemParameters(
this.shardingItemParameters)
.jobParameter(this.jobParameter).build(), this
.getClass().getCanonicalName()))
.overwrite(true).build();
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
public Integer getShardingTotalCount() {
return shardingTotalCount;
}
public void setShardingTotalCount(Integer shardingTotalCount) {
this.shardingTotalCount = shardingTotalCount;
}
public String getShardingItemParameters() {
return shardingItemParameters;
}
public void setShardingItemParameters(String shardingItemParameters) {
this.shardingItemParameters = shardingItemParameters;
}
/**
* @return the jobParameter
*/
public String getJobParameter() {
return jobParameter;
}
/**
* @param jobParameter
* the jobParameter to set
*/
public void setJobParameter(String jobParameter) {
this.jobParameter = jobParameter;
}
}
抽象类中的excute会打印当前任务的一些线程信息及任务分片信息,子类只需要继承抽象类,实现executeJob方法即可。
由于我们是基于springboot,为了让应用启动后,任务就自动注册,我们只需要在任务类上加入@Component,在任务类bean初始化完成后,执行一个init方法即可开启任务,这里用一个TestJob为例,如下:
package cn.lovecto.promotion.job;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
@Component
public class TestJob extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger("job");
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public TestJob(
@Value("${job.startPrice.cron}") String cron,
@Value("${job.startPrice.shardingTotalCount}") Integer shardingTotalCount,
@Value("${job.startPrice.shardingItemParameters}") String shardingItemParameters,
@Value("${job.startPrice.jobParameter}") String jobParameter) {
super(cron, shardingTotalCount, shardingItemParameters, jobParameter);
}
@PostConstruct
public void init() {
start(zookeeperRegistryCenter);
}
@Override
public void executeJob(Integer shardingTotalCount, Integer shardingItem,
String itemParameter, String jobParameter) {
log.info("这是一个测试任务");
}
}
TestJob的构造方法参数直接使用application.properties中配置的相关参数,启动应用后,执行结果如下:
INFO 2018-08-29 13:33:55.216 [AbstractJob.java:83] Thread ID:49,任务总片数:1,当前分片项:0,当前参数:null,当前任务名称:cn.lovecto.promotion.job.TestJob,当前任务参数:
INFO 2018-08-29 13:33:55.217 [TestJob.java:47] 这是一个测试任务
INFO 2018-08-29 13:34:00.082 [AbstractJob.java:83] Thread ID:49,任务总片数:1,当前分片项:0,当前参数:null,当前任务名称:cn.lovecto.promotion.job.TestJob,当前任务参数:
INFO 2018-08-29 13:34:00.082 [TestJob.java:47] 这是一个测试任务
INFO 2018-08-29 13:34:05.068 [AbstractJob.java:83] Thread ID:49,任务总片数:1,当前分片项:0,当前参数:null,当前任务名称:cn.lovecto.promotion.job.TestJob,当前任务参数:
INFO 2018-08-29 13:34:05.069 [TestJob.java:47] 这是一个测试任务
这种方式适合一些应用需要定时任务但不需要做太多任务管理功能的方式,如果任务分片数为1,能保证每次任务只有一个节点执行,如果有多个分片,则能够分担单个机器的负载,使整个大的任务被分配到几个节点进行执行,分配逻辑可根据自己的业务场景来处理,如按照数据库主键hash、取模等,或者按照分页策略等。