@kangwg
2017-04-09T07:15:27.000000Z
字数 14962
阅读 1385
layout: post
title: "quartz"
subtitle: "quartz"
date: 2017-04-09 12:00:00
author: "kangwg"
header-img: "img/post-bg-2015.jpg"
catalog: true
一.简单的spring boot 的quartz
@Component@Configurable@EnableSchedulingpublic class TaskScheduler {private static Logger log = LoggerFactory.getLogger(TaskScheduler.class);@Scheduled(cron = "0 0 2 * * ?") // 每天2ampublic void test() {//do something}}
二.java 自带的定时器(java.util.Timer)
import java.util.*;import javax.annotation.PostConstruct;import org.slf4j.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import cn.swao.framework.def.StatusEnum;import cn.swao.jinyao.crawl.StartCatchService;import cn.swao.jinyao.model.*;import cn.swao.jinyao.repository.*;import cn.swao.jinyao.util.DataUtils;@Componentpublic class TaskScheduler {private static Logger log = LoggerFactory.getLogger(TaskScheduler.class);@Autowiredprivate StartCatchService startCatchService;//定时任务的方法存放@Autowiredprivate QuartzRepository quartzRepository;//获取定时任务的参数dao层@Autowiredpublic QuartzLogRepository quartzLogRepository; 定时任务日志记录@PostConstructpublic void doScheduleAuto() {List<Quartz> quartzList = quartzRepository.findAll();for (Quartz quartz : quartzList) {int status = quartz.getStatus();if (StatusEnum.VALID.getValue() == status) {addSchedule(quartz);}}}public void addSchedule(Quartz quartz) {String method = quartz.getMethod();// 要执行的方法String startTime = quartz.getStartTime();// 执行的开始时间String period = quartz.getPeriod();// 执行的间隔String explain = quartz.getExplain();// 说明Timer timer = new Timer();timer.scheduleAtFixedRate(new TimerTask() {public void run() {QuartzLog quartzLog = new QuartzLog(new Date(), method, 1, 0, "sucess");try {StartCatchService.class.getMethod(method).invoke(startCatchService);} catch (Exception e) {quartzLog.setResult(2);quartzLog.setMessage("fail");log.info("开启定时调度爬虫失败,method={},startTime={},period={},explain={}", method, startTime, period, explain);}log.info("定时调度爬取数据sucess:explain={}", explain);quartzLogRepository.save(quartzLog);}}, DataUtils.delayed(startTime), Long.valueOf(period));// 这里设定将延时每天固定执行log.info("加载定时调度爬取数据:method={},startTime={},period={},explain={},sucess", method, startTime, period, explain);}}
将quartz的调度信息放在表中,启动项目后自动加载
三.利用第三方和spring结合
导入jar包
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.2</version>
</dependency>
1.根据quartz自带的sql创建表
2.建立实体TaskJob用于存储和展现quartz信息,创建用于记录job运行的日志表
3.创建quartz.properties
添加配置
# jobListener
org.quartz.jobListener.globalLogJobListener.class=
# jobFactory
org.quartz.scheduler.jobFactory.class=
4.创建jobFactory用于job中spring的注入
import org.quartz.spi.TriggerFiredBundle;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.config.AutowireCapableBeanFactory;import org.springframework.scheduling.quartz.SpringBeanJobFactory;import org.springframework.stereotype.Component;@Componentpublic class AutowireJobFactory extends SpringBeanJobFactory {@Autowiredprivate AutowireCapableBeanFactory beanFactory;@Overrideprotected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {Object jobInstance = super.createJobInstance(bundle);beanFactory.autowireBean(jobInstance);return jobInstance;}}
5.创建quartz的config文件,用于读取配置和生成SchedulerFactoryBean
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.config.PropertiesFactoryBean;import org.springframework.context.ApplicationContext;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.quartz.SchedulerFactoryBean;import java.io.IOException;import java.util.Properties;@Configuration@EnableSchedulingpublic class SchedulerConfig {@Autowiredprivate AutowireJobFactory autowireJobFactory;@Beanpublic SchedulerFactoryBean schedulerFactoryBean() throws IOException {SchedulerFactoryBean factory = new SchedulerFactoryBean();factory.setOverwriteExistingJobs(true);// 延时启动factory.setStartupDelay(20);// 加载quartz数据源配置factory.setQuartzProperties(quartzProperties());factory.setAutoStartup(false);// 自定义Job Factory,用于Spring注入factory.setJobFactory(autowireJobFactory);return factory;}/*** 加载quartz数据源配置** @return* @throws IOException*/@Beanpublic Properties quartzProperties() throws IOException {PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));propertiesFactoryBean.afterPropertiesSet();return propertiesFactoryBean.getObject();}}
6.创建quartz的service层,用于创建和控制job
import java.text.ParseException;import java.util.Date;import java.util.List;import java.util.Map;import java.util.Set;import javax.annotation.PostConstruct;import org.quartz.*;import org.quartz.Trigger.TriggerState;import org.quartz.impl.matchers.GroupMatcher;import org.quartz.impl.triggers.CronTriggerImpl;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.context.ApplicationContext;import org.springframework.context.annotation.Lazy;import org.springframework.scheduling.quartz.SchedulerFactoryBean;import org.springframework.stereotype.Service;import com.google.common.collect.Lists;import cn.swao.framework.model.TaskJob;@Servicepublic class SchedulerService {private static Logger log = LoggerFactory.getLogger(SchedulerService.class);@Autowiredprivate SchedulerFactoryBean schedulerFactoryBean;@Autowiredprivate ApplicationContext applicationContext;private Scheduler scheduler;@PostConstructpublic void init() throws SchedulerException {scheduler = schedulerFactoryBean.getScheduler();scheduler.getContext().put("applicationContext", applicationContext);}/*** 启动quartz*/public void start() throws SchedulerException {if (!scheduler.isStarted())scheduler.start();}/*** 创建任务*/public boolean createJob(TaskJob taskJob) throws SchedulerException {boolean result = false;if (taskJob != null) {try {JobDetail jobDetail = this.createJobDetail(taskJob.getJobName(), taskJob.getJobGroup(), taskJob.getJobClassName(), taskJob.getDataMap(), taskJob.getDescription());List<Map<String, String>> triggerList = taskJob.getTriggerList();for (int i = 0; i < triggerList.size(); i++) {Map<String, String> map = triggerList.get(i);CronTriggerImpl trigger = (CronTriggerImpl) this.createTrigger(taskJob.getTriggerName(map), taskJob.getTriggerGroup(map), 
taskJob.getCronExpression(map));if (i > 0) {trigger.setJobName(taskJob.getJobName());trigger.setJobName(taskJob.getJobGroup());scheduler.scheduleJob(trigger);} else {scheduler.scheduleJob(jobDetail, trigger);}}log.info("createJob success taskJob:{}", taskJob);result = true;} catch (Exception e) {log.info(taskJob.getJobClassName() + ".createJob (添加任务 ) 发生异常", e);}}return result;}/*** 修改一个任务的触发时间*/public void modifyJobTime(String triggerName, String triggerGroupName, String time) throws SchedulerException, ParseException {TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);if (trigger == null) {return;}String oldTime = trigger.getCronExpression();if (!oldTime.equalsIgnoreCase(time)) {// 触发器TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();// 触发器名,触发器组triggerBuilder.withIdentity(triggerName, triggerGroupName);triggerBuilder.startNow();// 触发器时间设定triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(time));// 创建Trigger对象trigger = (CronTrigger) triggerBuilder.build();// 修改一个任务的触发时间this.rescheduleJob(triggerKey, trigger);log.info("modify success triggerName:{},triggerGroupName:{},time:{}", triggerName, triggerGroupName, time);}}/*** 追加触发器*/public void addJobTime(String jobName, String jobGroup, String triggerName, String triggerGroupName, String time) throws SchedulerException, ParseException {TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);JobKey jobKey = JobKey.jobKey(jobName, jobGroup);CronTriggerImpl trigger = (CronTriggerImpl) this.createTrigger(triggerName, triggerGroupName, time);if (scheduler.checkExists(jobKey)) {trigger.setJobName(jobName);trigger.setJobName(jobGroup);scheduler.scheduleJob(trigger);}}/*** 移除一个任务以及它的触发器**/public void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup) throws SchedulerException {this.pauseTrigger(triggerName, 
triggerGroup);this.unscheduleJob(triggerName, triggerGroup);this.deleteJob(jobName, jobGroup);log.info("removeJob success jobName{},jobGroup{},triggerName{},triggerGroup{}", jobName, jobGroup, triggerName, triggerGroup);}/*** 获取所有的任务** @return* @throws SchedulerException*/public List<TaskJob> getAllTaskJob() throws SchedulerException {List<TaskJob> list = Lists.newArrayList();for (String groupName : scheduler.getJobGroupNames()) {for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) {JobDetail jobDetail = scheduler.getJobDetail(jobKey);List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);TaskJob taskJob = new TaskJob();for (Trigger trigger : triggers) {TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());taskJob.setTrigger(trigger, triggerState.name());}taskJob.setJobDetail(jobDetail);list.add(taskJob);}}return list;}/*** 获取正在运行的任务** @throws SchedulerException*/public List<TaskJob> getAllExecutingTaskJob() throws SchedulerException {List<TaskJob> list = Lists.newArrayList();List<JobExecutionContext> currentlyExecutingJobs = scheduler.getCurrentlyExecutingJobs();for (JobExecutionContext jobExecutionContext : currentlyExecutingJobs) {Trigger trigger = jobExecutionContext.getTrigger();TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());TaskJob taskJob = new TaskJob(jobExecutionContext.getJobDetail(), jobExecutionContext.getTrigger(), triggerState.name());list.add(taskJob);}return list;}/*** 生成触发器*/public Trigger createTrigger(String triggerName, String triggerGroup, String cronExpression) {CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroup).withSchedule(cronScheduleBuilder).build();return trigger;}/*** 生成jobDetail*/public JobDetail createJobDetail(String jobName, String jobGroup, String jobClassName, Map<String, Object> dataMap, String description) throws 
ClassNotFoundException {Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(jobClassName);JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).withDescription(description).storeDurably().build();JobDataMap jobDataMap = jobDetail.getJobDataMap();jobDataMap.putAll(dataMap);return jobDetail;}/*** 获取JobDetail*/public JobDetail getJobDetail(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);JobDetail jobDetail = scheduler.getJobDetail(jobKey);return jobDetail;}/*** 获取Trigger*/public Trigger getTrigger(String triggerName, String triggerGroup) throws SchedulerException {TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);Trigger trigger = scheduler.getTrigger(triggerKey);return trigger;}/*** 删除相关的job任务**/public boolean deleteJob(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);return scheduler.deleteJob(jobKey);}/*** 停止一个job任务** @throws SchedulerException*/public void pauseJob(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);scheduler.pauseJob(jobKey);}/*** 恢复相关的job任务**/public void resumeJob(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);scheduler.resumeJob(jobKey);}/*** 通过触发器停止调度Job任务**/public boolean unscheduleJob(String triggerName, String triggerGroup) throws SchedulerException {TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);return scheduler.unscheduleJob(triggerKey);}/*** 重新恢复触发器相关的job任务**/public Date rescheduleJob(TriggerKey triggerkey, Trigger trigger) throws SchedulerException {return scheduler.rescheduleJob(triggerkey, trigger);}/*** 停止使用相关的触发器**/public void pauseTrigger(String triggerName, String triggerGroup) throws SchedulerException {TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, 
triggerGroup);scheduler.pauseTrigger(triggerKey);}/*** 恢复使用相关的触发器**/public void resumeTrigger(String triggerName, String triggerGroup) throws SchedulerException {TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);scheduler.resumeTrigger(triggerKey);}
4.创建jobFactory用于job中spring的注入
import org.quartz.spi.TriggerFiredBundle;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.config.AutowireCapableBeanFactory;import org.springframework.scheduling.quartz.SpringBeanJobFactory;import org.springframework.stereotype.Component;@Componentpublic class AutowireJobFactory extends SpringBeanJobFactory {@Autowiredprivate AutowireCapableBeanFactory beanFactory;@Overrideprotected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {Object jobInstance = super.createJobInstance(bundle);beanFactory.autowireBean(jobInstance);return jobInstance;}}
7.创建全局的job监听
import org.quartz.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class GlobalLogJobListener implements JobListener {private static Logger log = LoggerFactory.getLogger(GlobalLogJobListener.class);@Autowiredprivate ScheduleLogService scheduleLogService;//用于记录log的service@Overridepublic String getName() {return "globalLogJobListener";}// Scheduler 在 JobDetail 将要被执行时调用这个方法。@Overridepublic void jobToBeExecuted(JobExecutionContext context) {try {ApplicationContext applicationContext = (ApplicationContext) (context.getScheduler().getContext().get("applicationContext"));scheduleLogService = applicationContext.getBean(ScheduleLogService.class);} catch (SchedulerException e) {e.printStackTrace();}JobDetail jobDetail = context.getJobDetail();Job jobInstance = context.getJobInstance();this.getScheduleLog(jobDetail);}// Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个方法。@Overridepublic void jobExecutionVetoed(JobExecutionContext context) {}// Scheduler 在 JobDetail 被执行之后调用这个方法。@Overridepublic void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {JobDetail jobDetail = context.getJobDetail();ScheduleLog scheduleLog = this.getScheduleLog(jobDetail);// 执行方法后异常if (jobException != null) {String message = jobException.getMessage();scheduleLog.setResultMessage(message);scheduleLog.setResultCode(0);} else {scheduleLog.setResultMessage("success");scheduleLog.setResultCode(1);}ScheduleLog create = this.scheduleLogService.create(scheduleLog);log.info("excute job result:{}", create);}public ScheduleLog getScheduleLog(JobDetail jobDetail) {ScheduleLog scheduleLog = (ScheduleLog) jobDetail.getJobDataMap().get("scheduleLog");if (scheduleLog == null) {JobKey key = jobDetail.getKey();scheduleLog = new 
ScheduleLog();scheduleLog.setJobName(key.getName());scheduleLog.setJobGroup(key.getGroup());scheduleLog.setStartTime(new Date());jobDetail.getJobDataMap().put("scheduleLog", scheduleLog);}scheduleLog.setEndTime(new Date());return scheduleLog;}}
8.最后提供接口,让job可配置化,可以从界面上操控job