@kangwg
2017-04-09T15:15:27.000000Z
字数 14962
阅读 1291
layout: post
title: "quartz"
subtitle: "quartz"
date: 2017-04-09 12:00:00
author: "kangwg"
header-img: "img/post-bg-2015.jpg"
catalog: true
一.简单的spring boot 的quartz
@Component
@Configurable
@EnableScheduling
public class TaskScheduler {
private static Logger log = LoggerFactory.getLogger(TaskScheduler.class);
@Scheduled(cron = "0 0 2 * * ?") // 每天2am
public void test() {
//do something
}
}
二.java 自带的quartz
import java.util.*;
import javax.annotation.PostConstruct;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.swao.framework.def.StatusEnum;
import cn.swao.jinyao.crawl.StartCatchService;
import cn.swao.jinyao.model.*;
import cn.swao.jinyao.repository.*;
import cn.swao.jinyao.util.DataUtils;
@Component
public class TaskScheduler {
private static Logger log = LoggerFactory.getLogger(TaskScheduler.class);
@Autowired
private StartCatchService startCatchService;//定时任务的方法存放
@Autowired
private QuartzRepository quartzRepository;//获取定时任务的参数dao层
@Autowired
public QuartzLogRepository quartzLogRepository; 定时任务日志记录
@PostConstruct
public void doScheduleAuto() {
List<Quartz> quartzList = quartzRepository.findAll();
for (Quartz quartz : quartzList) {
int status = quartz.getStatus();
if (StatusEnum.VALID.getValue() == status) {
addSchedule(quartz);
}
}
}
public void addSchedule(Quartz quartz) {
String method = quartz.getMethod();// 要执行的方法
String startTime = quartz.getStartTime();// 执行的开始时间
String period = quartz.getPeriod();// 执行的间隔
String explain = quartz.getExplain();// 说明
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
QuartzLog quartzLog = new QuartzLog(new Date(), method, 1, 0, "sucess");
try {
StartCatchService.class.getMethod(method).invoke(startCatchService);
} catch (Exception e) {
quartzLog.setResult(2);
quartzLog.setMessage("fail");
log.info("开启定时调度爬虫失败,method={},startTime={},period={},explain={}", method, startTime, period, explain);
}
log.info("定时调度爬取数据sucess:explain={}", explain);
quartzLogRepository.save(quartzLog);
}
}, DataUtils.delayed(startTime), Long.valueOf(period));// 这里设定将延时每天固定执行
log.info("加载定时调度爬取数据:method={},startTime={},period={},explain={},sucess", method, startTime, period, explain);
}
}
将quartz的调度信息放在表中,启动项目后自动加载
三.利用第三方和spring结合
导入jar包
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.2</version>
</dependency>
1.根据quartz自带的sql创建表
2.建立实体TaskJob用于存储和展现quartz信息,创建用于记录job运行的日志表
3.创建quartz.properties
添加配置
--jobListener
org.quartz.jobListener.globalLogJobListener.class=
--jobFactory
org.quartz.scheduler.jobFactory.class=
4.创建jobFactory用于job中spring的注入
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
@Component
public class AutowireJobFactory extends SpringBeanJobFactory {
@Autowired
private AutowireCapableBeanFactory beanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
beanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
5.创建quartz的config文件由于读取配置和生成SchedulerFactoryBean
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.io.IOException;
import java.util.Properties;
@Configuration
@EnableScheduling
public class SchedulerConfig {
@Autowired
private AutowireJobFactory autowireJobFactory;
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
// 延时启动
factory.setStartupDelay(20);
// 加载quartz数据源配置
factory.setQuartzProperties(quartzProperties());
factory.setAutoStartup(false);
// 自定义Job Factory,用于Spring注入
factory.setJobFactory(autowireJobFactory);
return factory;
}
/**
* 加载quartz数据源配置
*
* @return
* @throws IOException
*/
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
}
6.创建quartz的service层,用于创建和控制job
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.quartz.*;
import org.quartz.Trigger.TriggerState;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
import cn.swao.framework.model.TaskJob;
@Service
public class SchedulerService {
private static Logger log = LoggerFactory.getLogger(SchedulerService.class);
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private ApplicationContext applicationContext;
private Scheduler scheduler;
@PostConstruct
public void init() throws SchedulerException {
scheduler = schedulerFactoryBean.getScheduler();
scheduler.getContext().put("applicationContext", applicationContext);
}
/**
* 启动quartz
*/
public void start() throws SchedulerException {
if (!scheduler.isStarted())
scheduler.start();
}
/**
* 创建任务
*/
public boolean createJob(TaskJob taskJob) throws SchedulerException {
boolean result = false;
if (taskJob != null) {
try {
JobDetail jobDetail = this.createJobDetail(taskJob.getJobName(), taskJob.getJobGroup(), taskJob.getJobClassName(), taskJob.getDataMap(), taskJob.getDescription());
List<Map<String, String>> triggerList = taskJob.getTriggerList();
for (int i = 0; i < triggerList.size(); i++) {
Map<String, String> map = triggerList.get(i);
CronTriggerImpl trigger = (CronTriggerImpl) this.createTrigger(taskJob.getTriggerName(map), taskJob.getTriggerGroup(map), taskJob.getCronExpression(map));
if (i > 0) {
trigger.setJobName(taskJob.getJobName());
trigger.setJobName(taskJob.getJobGroup());
scheduler.scheduleJob(trigger);
} else {
scheduler.scheduleJob(jobDetail, trigger);
}
}
log.info("createJob success taskJob:{}", taskJob);
result = true;
} catch (Exception e) {
log.info(taskJob.getJobClassName() + ".createJob (添加任务 ) 发生异常", e);
}
}
return result;
}
/**
* 修改一个任务的触发时间
*/
public void modifyJobTime(String triggerName, String triggerGroupName, String time) throws SchedulerException, ParseException {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(time)) {
// 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(time));
// 创建Trigger对象
trigger = (CronTrigger) triggerBuilder.build();
// 修改一个任务的触发时间
this.rescheduleJob(triggerKey, trigger);
log.info("modify success triggerName:{},triggerGroupName:{},time:{}", triggerName, triggerGroupName, time);
}
}
/**
* 追加触发器
*/
public void addJobTime(String jobName, String jobGroup, String triggerName, String triggerGroupName, String time) throws SchedulerException, ParseException {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
CronTriggerImpl trigger = (CronTriggerImpl) this.createTrigger(triggerName, triggerGroupName, time);
if (scheduler.checkExists(jobKey)) {
trigger.setJobName(jobName);
trigger.setJobName(jobGroup);
scheduler.scheduleJob(trigger);
}
}
/**
* 移除一个任务以及它的触发器
*
*/
public void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup) throws SchedulerException {
this.pauseTrigger(triggerName, triggerGroup);
this.unscheduleJob(triggerName, triggerGroup);
this.deleteJob(jobName, jobGroup);
log.info("removeJob success jobName{},jobGroup{},triggerName{},triggerGroup{}", jobName, jobGroup, triggerName, triggerGroup);
}
/**
* 获取所有的任务
*
* @return
* @throws SchedulerException
*/
public List<TaskJob> getAllTaskJob() throws SchedulerException {
List<TaskJob> list = Lists.newArrayList();
for (String groupName : scheduler.getJobGroupNames()) {
for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) {
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
TaskJob taskJob = new TaskJob();
for (Trigger trigger : triggers) {
TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
taskJob.setTrigger(trigger, triggerState.name());
}
taskJob.setJobDetail(jobDetail);
list.add(taskJob);
}
}
return list;
}
/**
* 获取正在运行的任务
*
* @throws SchedulerException
*/
public List<TaskJob> getAllExecutingTaskJob() throws SchedulerException {
List<TaskJob> list = Lists.newArrayList();
List<JobExecutionContext> currentlyExecutingJobs = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext jobExecutionContext : currentlyExecutingJobs) {
Trigger trigger = jobExecutionContext.getTrigger();
TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
TaskJob taskJob = new TaskJob(jobExecutionContext.getJobDetail(), jobExecutionContext.getTrigger(), triggerState.name());
list.add(taskJob);
}
return list;
}
/**
* 生成触发器
*/
public Trigger createTrigger(String triggerName, String triggerGroup, String cronExpression) {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroup).withSchedule(cronScheduleBuilder).build();
return trigger;
}
/**
* 生成jobDetail
*/
public JobDetail createJobDetail(String jobName, String jobGroup, String jobClassName, Map<String, Object> dataMap, String description) throws ClassNotFoundException {
Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(jobClassName);
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).withDescription(description).storeDurably().build();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.putAll(dataMap);
return jobDetail;
}
/**
* 获取JobDetail
*/
public JobDetail getJobDetail(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
return jobDetail;
}
/**
* 获取Trigger
*/
public Trigger getTrigger(String triggerName, String triggerGroup) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
Trigger trigger = scheduler.getTrigger(triggerKey);
return trigger;
}
/**
* 删除相关的job任务
*
*/
public boolean deleteJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
return scheduler.deleteJob(jobKey);
}
/**
* 停止一个job任务
*
* @throws SchedulerException
*/
public void pauseJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
scheduler.pauseJob(jobKey);
}
/**
* 恢复相关的job任务
*
*/
public void resumeJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
scheduler.resumeJob(jobKey);
}
/**
* 通过触发器停止调度Job任务
*
*/
public boolean unscheduleJob(String triggerName, String triggerGroup) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
return scheduler.unscheduleJob(triggerKey);
}
/**
* 重新恢复触发器相关的job任务
*
*/
public Date rescheduleJob(TriggerKey triggerkey, Trigger trigger) throws SchedulerException {
return scheduler.rescheduleJob(triggerkey, trigger);
}
/**
* 停止使用相关的触发器
*
*/
public void pauseTrigger(String triggerName, String triggerGroup) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
scheduler.pauseTrigger(triggerKey);
}
/**
* 恢复使用相关的触发器
*
*/
public void resumeTrigger(String triggerName, String triggerGroup) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
scheduler.resumeTrigger(triggerKey);
}
4.创建jobFactory用于job中spring的注入
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
@Component
public class AutowireJobFactory extends SpringBeanJobFactory {
@Autowired
private AutowireCapableBeanFactory beanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
beanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
7.创建全局的job监听
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class GlobalLogJobListener implements JobListener {
private static Logger log = LoggerFactory.getLogger(GlobalLogJobListener.class);
@Autowired
private ScheduleLogService scheduleLogService;//用于记录log的service
@Override
public String getName() {
return "globalLogJobListener";
}
// Scheduler 在 JobDetail 将要被执行时调用这个方法。
@Override
public void jobToBeExecuted(JobExecutionContext context) {
try {
ApplicationContext applicationContext = (ApplicationContext) (context.getScheduler().getContext().get("applicationContext"));
scheduleLogService = applicationContext.getBean(ScheduleLogService.class);
} catch (SchedulerException e) {
e.printStackTrace();
}
JobDetail jobDetail = context.getJobDetail();
Job jobInstance = context.getJobInstance();
this.getScheduleLog(jobDetail);
}
// Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个方法。
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
}
// Scheduler 在 JobDetail 被执行之后调用这个方法。
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
JobDetail jobDetail = context.getJobDetail();
ScheduleLog scheduleLog = this.getScheduleLog(jobDetail);
// 执行方法后异常
if (jobException != null) {
String message = jobException.getMessage();
scheduleLog.setResultMessage(message);
scheduleLog.setResultCode(0);
} else {
scheduleLog.setResultMessage("success");
scheduleLog.setResultCode(1);
}
ScheduleLog create = this.scheduleLogService.create(scheduleLog);
log.info("excute job result:{}", create);
}
public ScheduleLog getScheduleLog(JobDetail jobDetail) {
ScheduleLog scheduleLog = (ScheduleLog) jobDetail.getJobDataMap().get("scheduleLog");
if (scheduleLog == null) {
JobKey key = jobDetail.getKey();
scheduleLog = new ScheduleLog();
scheduleLog.setJobName(key.getName());
scheduleLog.setJobGroup(key.getGroup());
scheduleLog.setStartTime(new Date());
jobDetail.getJobDataMap().put("scheduleLog", scheduleLog);
}
scheduleLog.setEndTime(new Date());
return scheduleLog;
}
}
8.最后提供接口,让job可配置化,可以从界面上操控job