[关闭]
@kangwg 2017-04-09T15:15:27.000000Z 字数 14962 阅读 1291

layout: post
title: "quartz"
subtitle: "quartz"
date: 2017-04-09 12:00:00
author: "kangwg"
header-img: "img/post-bg-2015.jpg"
catalog: true

- quartz

一.简单的spring boot 的quartz

  1. @Component
  2. @Configurable
  3. @EnableScheduling // turns on processing of the @Scheduled annotation used below
  4. public class TaskScheduler {
  5. private static Logger log = LoggerFactory.getLogger(TaskScheduler.class);
  6. @Scheduled(cron = "0 0 2 * * ?") // fires every day at 02:00
  7. public void test() {
  8. // placeholder: the scheduled work goes here
  9. }
  10. }

二.java 自带的定时器(java.util.Timer)

  1. import java.util.*;
  2. import javax.annotation.PostConstruct;
  3. import org.slf4j.*;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import cn.swao.framework.def.StatusEnum;
  7. import cn.swao.jinyao.crawl.StartCatchService;
  8. import cn.swao.jinyao.model.*;
  9. import cn.swao.jinyao.repository.*;
  10. import cn.swao.jinyao.util.DataUtils;
  11. /**
  12. * Loads the timer definitions stored through QuartzRepository when the bean starts
  13. * and runs each enabled one on its own java.util.Timer.
  14. */
  15. @Component
  16. public class TaskScheduler {
  17. private static Logger log = LoggerFactory.getLogger(TaskScheduler.class);
  18. @Autowired
  19. private StartCatchService startCatchService;// bean whose methods the timers invoke
  20. @Autowired
  21. private QuartzRepository quartzRepository;// DAO that supplies the schedule parameters
  22. @Autowired
  23. public QuartzLogRepository quartzLogRepository;// stores one log record per execution
  24. /**
  25. * Runs once after injection and schedules every stored entry whose status equals StatusEnum.VALID.
  26. */
  27. @PostConstruct
  28. public void doScheduleAuto() {
  29. List<Quartz> quartzList = quartzRepository.findAll();
  30. for (Quartz quartz : quartzList) {
  31. int status = quartz.getStatus();
  32. if (StatusEnum.VALID.getValue() == status) {
  33. addSchedule(quartz);
  34. }
  35. }
  36. }
  37. /**
  38. * Schedules a fixed-rate task that reflectively calls the no-argument StartCatchService method named by the entry.
  39. *
  40. * @param quartz entry providing the method name, start time, period and explanation
  41. */
  42. public void addSchedule(Quartz quartz) {
  43. String method = quartz.getMethod();// name of the method to invoke
  44. String startTime = quartz.getStartTime();// when the first run should happen
  45. String period = quartz.getPeriod();// interval between runs
  46. String explain = quartz.getExplain();// human-readable description
  47. Timer timer = new Timer();
  48. timer.scheduleAtFixedRate(new TimerTask() {
  49. @Override
  50. public void run() {
  51. QuartzLog quartzLog = new QuartzLog(new Date(), method, 1, 0, "sucess");
  52. try {
  53. StartCatchService.class.getMethod(method).invoke(startCatchService);
  54. // report success only when the invocation really completed
  55. log.info("定时调度爬取数据sucess:explain={}", explain);
  56. } catch (Exception e) {
  57. quartzLog.setResult(2);
  58. quartzLog.setMessage("fail");
  59. // the exception is passed as the last argument so the stack trace is logged
  60. log.error("开启定时调度爬虫失败,method={},startTime={},period={},explain={}", method, startTime, period, explain, e);
  61. }
  62. try {
  63. quartzLogRepository.save(quartzLog);
  64. } catch (RuntimeException e) {
  65. // an exception escaping run() would terminate the Timer thread and stop all later runs
  66. log.error("save quartz log failed, method={}", method, e);
  67. }
  68. }
  69. }, DataUtils.delayed(startTime), Long.parseLong(period));// java.util.Timer interprets the period in milliseconds
  70. log.info("加载定时调度爬取数据:method={},startTime={},period={},explain={},sucess", method, startTime, period, explain);
  71. }
  72. }

将quartz的调度信息放在表中,启动项目后自动加载

三.利用第三方和spring结合
导入jar包

    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.2.2</version>
    </dependency>

1.根据quartz自带的sql创建表
2.建立实体TaskJob用于存储和展现quartz信息,创建用于记录job运行的日志表
3.创建quartz.properties
添加配置
--jobListener
org.quartz.jobListener.globalLogJobListener.class=
--jobFactory
org.quartz.scheduler.jobFactory.class=

4.创建jobFactory用于job中spring的注入

  1. import org.quartz.spi.TriggerFiredBundle;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
  4. import org.springframework.scheduling.quartz.SpringBeanJobFactory;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class AutowireJobFactory extends SpringBeanJobFactory { // job factory that autowires every job instance it creates
  8. @Autowired
  9. private AutowireCapableBeanFactory beanFactory;
  10. @Override
  11. protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
  12. Object jobInstance = super.createJobInstance(bundle); // let the parent factory build the job object
  13. beanFactory.autowireBean(jobInstance); // then have Spring populate its injected members
  14. return jobInstance;
  15. }
  16. }

5.创建quartz的config文件用于读取配置和生成SchedulerFactoryBean

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.beans.factory.config.PropertiesFactoryBean;
  3. import org.springframework.context.ApplicationContext;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.core.io.ClassPathResource;
  7. import org.springframework.scheduling.annotation.EnableScheduling;
  8. import org.springframework.scheduling.quartz.SchedulerFactoryBean;
  9. import java.io.IOException;
  10. import java.util.Properties;
  11. @Configuration
  12. @EnableScheduling
  13. public class SchedulerConfig {
  14. @Autowired
  15. private AutowireJobFactory autowireJobFactory;
  16. @Bean
  17. public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
  18. SchedulerFactoryBean factory = new SchedulerFactoryBean();
  19. factory.setOverwriteExistingJobs(true);
  20. // delayed startup; NOTE(review): auto-startup is disabled below, so this delay may never apply - confirm
  21. factory.setStartupDelay(20);
  22. // load the Quartz settings read from quartz.properties
  23. factory.setQuartzProperties(quartzProperties());
  24. factory.setAutoStartup(false);
  25. // custom job factory so that job instances receive Spring injection
  26. factory.setJobFactory(autowireJobFactory);
  27. return factory;
  28. }
  29. /**
  30. * Loads the Quartz configuration from the classpath resource /quartz.properties.
  31. *
  32. * @return the properties read from that resource
  33. * @throws IOException declared by the factory bean calls used to read the resource
  34. */
  35. @Bean
  36. public Properties quartzProperties() throws IOException {
  37. PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
  38. propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
  39. propertiesFactoryBean.afterPropertiesSet();
  40. return propertiesFactoryBean.getObject();
  41. }
  42. }

6.创建quartz的service层,用于创建和控制job

  1. import java.text.ParseException;
  2. import java.util.Date;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Set;
  6. import javax.annotation.PostConstruct;
  7. import org.quartz.*;
  8. import org.quartz.Trigger.TriggerState;
  9. import org.quartz.impl.matchers.GroupMatcher;
  10. import org.quartz.impl.triggers.CronTriggerImpl;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
  15. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  16. import org.springframework.context.ApplicationContext;
  17. import org.springframework.context.annotation.Lazy;
  18. import org.springframework.scheduling.quartz.SchedulerFactoryBean;
  19. import org.springframework.stereotype.Service;
  20. import com.google.common.collect.Lists;
  21. import cn.swao.framework.model.TaskJob;
  22. @Service
  23. public class SchedulerService {
  24. private static Logger log = LoggerFactory.getLogger(SchedulerService.class);
  25. @Autowired
  26. private SchedulerFactoryBean schedulerFactoryBean;
  27. @Autowired
  28. private ApplicationContext applicationContext;
  29. private Scheduler scheduler;
  30. @PostConstruct
  31. public void init() throws SchedulerException {
  32. scheduler = schedulerFactoryBean.getScheduler(); // keep the Scheduler produced by the factory bean
  33. scheduler.getContext().put("applicationContext", applicationContext); // publish the Spring context in the scheduler context under this key
  34. }
  35. /**
  36. * Starts the Quartz scheduler unless it has already been started.
  37. */
  38. public void start() throws SchedulerException {
  39. if (!scheduler.isStarted())
  40. scheduler.start();
  41. }
  42. /**
  43. * 创建任务
  44. */
  45. public boolean createJob(TaskJob taskJob) throws SchedulerException {
  46. boolean result = false;
  47. if (taskJob != null) {
  48. try {
  49. JobDetail jobDetail = this.createJobDetail(taskJob.getJobName(), taskJob.getJobGroup(), taskJob.getJobClassName(), taskJob.getDataMap(), taskJob.getDescription());
  50. List<Map<String, String>> triggerList = taskJob.getTriggerList();
  51. for (int i = 0; i < triggerList.size(); i++) {
  52. Map<String, String> map = triggerList.get(i);
  53. CronTriggerImpl trigger = (CronTriggerImpl) this.createTrigger(taskJob.getTriggerName(map), taskJob.getTriggerGroup(map), taskJob.getCronExpression(map));
  54. if (i > 0) {
  55. trigger.setJobName(taskJob.getJobName());
  56. trigger.setJobName(taskJob.getJobGroup());
  57. scheduler.scheduleJob(trigger);
  58. } else {
  59. scheduler.scheduleJob(jobDetail, trigger);
  60. }
  61. }
  62. log.info("createJob success taskJob:{}", taskJob);
  63. result = true;
  64. } catch (Exception e) {
  65. log.info(taskJob.getJobClassName() + ".createJob (添加任务 ) 发生异常", e);
  66. }
  67. }
  68. return result;
  69. }
  70. /**
  71. * Changes the cron expression of an existing trigger; does nothing when the trigger is missing or the expression is unchanged (compared ignoring case).
  72. */
  73. public void modifyJobTime(String triggerName, String triggerGroupName, String time) throws SchedulerException, ParseException {
  74. TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
  75. CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
  76. if (trigger == null) {
  77. return;
  78. }
  79. String oldTime = trigger.getCronExpression();
  80. if (!oldTime.equalsIgnoreCase(time)) {
  81. // builder for the replacement trigger
  82. TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
  83. // same trigger name and trigger group as the one being replaced
  84. triggerBuilder.withIdentity(triggerName, triggerGroupName);
  85. triggerBuilder.startNow();
  86. // new cron schedule
  87. triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(time));
  88. // build the Trigger object
  89. trigger = (CronTrigger) triggerBuilder.build();
  90. // swap the stored trigger for the new one
  91. this.rescheduleJob(triggerKey, trigger);
  92. log.info("modify success triggerName:{},triggerGroupName:{},time:{}", triggerName, triggerGroupName, time);
  93. }
  94. }
  95. /**
  96. * Adds one more cron trigger to an existing job; does nothing when no job with the given name and group exists.
  97. */
  98. public void addJobTime(String jobName, String jobGroup, String triggerName, String triggerGroupName, String time) throws SchedulerException, ParseException {
  99. // the trigger is built first and attached only if the target job is already stored
  100. JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
  101. CronTriggerImpl trigger = (CronTriggerImpl) this.createTrigger(triggerName, triggerGroupName, time);
  102. if (scheduler.checkExists(jobKey)) {
  103. trigger.setJobName(jobName);
  104. trigger.setJobGroup(jobGroup); // fixed: a second setJobName(...) call overwrote the job name with the group
  105. scheduler.scheduleJob(trigger);
  106. }
  107. }
  108. /**
  109. * Removes a job together with the given trigger: pauses the trigger, unschedules it, then deletes the job.
  110. *
  111. */
  112. public void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup) throws SchedulerException {
  113. this.pauseTrigger(triggerName, triggerGroup);
  114. this.unscheduleJob(triggerName, triggerGroup);
  115. this.deleteJob(jobName, jobGroup);
  116. log.info("removeJob success jobName{},jobGroup{},triggerName{},triggerGroup{}", jobName, jobGroup, triggerName, triggerGroup);
  117. }
  118. /**
  119. * Lists every job known to the scheduler, across all job groups, with its triggers and their current states.
  120. *
  121. * @return one TaskJob per stored job
  122. * @throws SchedulerException if a scheduler query fails
  123. */
  124. public List<TaskJob> getAllTaskJob() throws SchedulerException {
  125. List<TaskJob> list = Lists.newArrayList();
  126. for (String groupName : scheduler.getJobGroupNames()) {
  127. for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) {
  128. JobDetail jobDetail = scheduler.getJobDetail(jobKey);
  129. List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
  130. TaskJob taskJob = new TaskJob();
  131. for (Trigger trigger : triggers) {
  132. TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
  133. taskJob.setTrigger(trigger, triggerState.name());
  134. }
  135. taskJob.setJobDetail(jobDetail);
  136. list.add(taskJob);
  137. }
  138. }
  139. return list;
  140. }
  141. /**
  142. * Lists the jobs that the scheduler reports as currently executing, one TaskJob per execution context.
  143. *
  144. * @throws SchedulerException if a scheduler query fails
  145. */
  146. public List<TaskJob> getAllExecutingTaskJob() throws SchedulerException {
  147. List<TaskJob> list = Lists.newArrayList();
  148. List<JobExecutionContext> currentlyExecutingJobs = scheduler.getCurrentlyExecutingJobs();
  149. for (JobExecutionContext jobExecutionContext : currentlyExecutingJobs) {
  150. Trigger trigger = jobExecutionContext.getTrigger();
  151. TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
  152. TaskJob taskJob = new TaskJob(jobExecutionContext.getJobDetail(), jobExecutionContext.getTrigger(), triggerState.name());
  153. list.add(taskJob);
  154. }
  155. return list;
  156. }
  157. /**
  158. * Builds a cron trigger with the given name, group and cron expression; the trigger is not registered with the scheduler here.
  159. */
  160. public Trigger createTrigger(String triggerName, String triggerGroup, String cronExpression) {
  161. CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
  162. Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroup).withSchedule(cronScheduleBuilder).build();
  163. return trigger;
  164. }
  165. /**
  166. * Builds a durable JobDetail for the job class named by jobClassName and copies dataMap into its JobDataMap.
  167. */
  168. public JobDetail createJobDetail(String jobName, String jobGroup, String jobClassName, Map<String, Object> dataMap, String description) throws ClassNotFoundException {
  169. Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(jobClassName);
  170. JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).withDescription(description).storeDurably().build();
  171. JobDataMap jobDataMap = jobDetail.getJobDataMap();
  172. jobDataMap.putAll(dataMap); // NOTE(review): a null dataMap would presumably fail here - confirm callers never pass null
  173. return jobDetail;
  174. }
  175. /**
  176. * Looks up the JobDetail stored under the given job name and group.
  177. */
  178. public JobDetail getJobDetail(String jobName, String jobGroup) throws SchedulerException {
  179. JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
  180. JobDetail jobDetail = scheduler.getJobDetail(jobKey);
  181. return jobDetail;
  182. }
  183. /**
  184. * Looks up the Trigger stored under the given trigger name and group.
  185. */
  186. public Trigger getTrigger(String triggerName, String triggerGroup) throws SchedulerException {
  187. TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
  188. Trigger trigger = scheduler.getTrigger(triggerKey);
  189. return trigger;
  190. }
  191. /**
  192. * Deletes the job with the given name and group and returns the result of Scheduler.deleteJob.
  193. *
  194. */
  195. public boolean deleteJob(String jobName, String jobGroup) throws SchedulerException {
  196. JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
  197. return scheduler.deleteJob(jobKey);
  198. }
  199. /**
  200. * Pauses the job with the given name and group.
  201. *
  202. * @throws SchedulerException if the scheduler rejects the request
  203. */
  204. public void pauseJob(String jobName, String jobGroup) throws SchedulerException {
  205. JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
  206. scheduler.pauseJob(jobKey);
  207. }
  208. /**
  209. * Resumes the job with the given name and group.
  210. *
  211. */
  212. public void resumeJob(String jobName, String jobGroup) throws SchedulerException {
  213. JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
  214. scheduler.resumeJob(jobKey);
  215. }
  216. /**
  217. * Removes the given trigger from the scheduler so it no longer fires its job; returns the result of Scheduler.unscheduleJob.
  218. *
  219. */
  220. public boolean unscheduleJob(String triggerName, String triggerGroup) throws SchedulerException {
  221. TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
  222. return scheduler.unscheduleJob(triggerKey);
  223. }
  224. /**
  225. * Replaces the trigger stored under triggerkey with the given trigger and returns the Date reported by Scheduler.rescheduleJob.
  226. *
  227. */
  228. public Date rescheduleJob(TriggerKey triggerkey, Trigger trigger) throws SchedulerException {
  229. return scheduler.rescheduleJob(triggerkey, trigger);
  230. }
  231. /**
  232. * Pauses the trigger with the given name and group.
  233. *
  234. */
  235. public void pauseTrigger(String triggerName, String triggerGroup) throws SchedulerException {
  236. TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
  237. scheduler.pauseTrigger(triggerKey);
  238. }
  239. /**
  240. * Resumes the trigger with the given name and group.
  241. *
  242. */
  243. public void resumeTrigger(String triggerName, String triggerGroup) throws SchedulerException {
  244. TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);
  245. scheduler.resumeTrigger(triggerKey);
  246. }

4.创建jobFactory用于job中spring的注入

  1. import org.quartz.spi.TriggerFiredBundle;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
  4. import org.springframework.scheduling.quartz.SpringBeanJobFactory;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class AutowireJobFactory extends SpringBeanJobFactory { // job factory that autowires every job instance it creates
  8. @Autowired
  9. private AutowireCapableBeanFactory beanFactory;
  10. @Override
  11. protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
  12. Object jobInstance = super.createJobInstance(bundle); // let the parent factory build the job object
  13. beanFactory.autowireBean(jobInstance); // then have Spring populate its injected members
  14. return jobInstance;
  15. }
  16. }

7.创建全局的job监听

  1. import org.quartz.*;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.context.ApplicationContext;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Date;
  8. /**
  9. * Job listener that builds one ScheduleLog per job execution and persists it through ScheduleLogService.
  10. */
  11. @Component
  12. public class GlobalLogJobListener implements JobListener {
  13. private static Logger log = LoggerFactory.getLogger(GlobalLogJobListener.class);
  14. @Autowired
  15. private ScheduleLogService scheduleLogService;// service used to persist the execution log
  16. @Override
  17. public String getName() {
  18. return "globalLogJobListener";
  19. }
  20. /**
  21. * Called by the scheduler when a JobDetail is about to be executed: resolves the log service from the
  22. * Spring context stored in the scheduler context and opens the log entry for this execution.
  23. */
  24. @Override
  25. public void jobToBeExecuted(JobExecutionContext context) {
  26. try {
  27. ApplicationContext applicationContext = (ApplicationContext) (context.getScheduler().getContext().get("applicationContext"));
  28. scheduleLogService = applicationContext.getBean(ScheduleLogService.class);
  29. } catch (SchedulerException e) {
  30. // log through the logger instead of printStackTrace so the failure reaches the application log
  31. log.error("failed to resolve ScheduleLogService from the scheduler context", e);
  32. }
  33. this.getScheduleLog(context.getJobDetail());
  34. }
  35. /**
  36. * Called by the scheduler when a JobDetail was about to be executed but a TriggerListener vetoed it.
  37. */
  38. @Override
  39. public void jobExecutionVetoed(JobExecutionContext context) {
  40. }
  41. /**
  42. * Called by the scheduler after a JobDetail has been executed: records the outcome and persists the log entry.
  43. *
  44. * @param context execution context of the finished job
  45. * @param jobException exception thrown by the job, or null when it completed normally
  46. */
  47. @Override
  48. public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
  49. JobDetail jobDetail = context.getJobDetail();
  50. ScheduleLog scheduleLog = this.getScheduleLog(jobDetail);
  51. // result code 0 with the exception message on failure, 1 with "success" otherwise
  52. if (jobException != null) {
  53. String message = jobException.getMessage();
  54. scheduleLog.setResultMessage(message);
  55. scheduleLog.setResultCode(0);
  56. } else {
  57. scheduleLog.setResultMessage("success");
  58. scheduleLog.setResultCode(1);
  59. }
  60. if (this.scheduleLogService == null) {
  61. // the lookup in jobToBeExecuted failed; report it instead of throwing a NullPointerException
  62. log.warn("ScheduleLogService unavailable, job result not persisted:{}", scheduleLog);
  63. return;
  64. }
  65. ScheduleLog create = this.scheduleLogService.create(scheduleLog);
  66. log.info("excute job result:{}", create);
  67. }
  68. /**
  69. * Returns the ScheduleLog kept in the job's data map under "scheduleLog", creating and storing it
  70. * (with job name, job group and start time) on first use; the end time is refreshed on every call.
  71. */
  72. public ScheduleLog getScheduleLog(JobDetail jobDetail) {
  73. ScheduleLog scheduleLog = (ScheduleLog) jobDetail.getJobDataMap().get("scheduleLog");
  74. if (scheduleLog == null) {
  75. JobKey key = jobDetail.getKey();
  76. scheduleLog = new ScheduleLog();
  77. scheduleLog.setJobName(key.getName());
  78. scheduleLog.setJobGroup(key.getGroup());
  79. scheduleLog.setStartTime(new Date());
  80. jobDetail.getJobDataMap().put("scheduleLog", scheduleLog);
  81. }
  82. scheduleLog.setEndTime(new Date());
  83. return scheduleLog;
  84. }
  85. }

8.最后提供接口,让job可配置化,可以从界面上操控job

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注