标签:fun model public tab refresh efi core prefix entry
首先配置ThreadPoolTaskScheduler线程池:
package cn.demo.support.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Spring configuration that publishes the {@link ThreadPoolTaskScheduler} used for
 * dynamically registered scheduled tasks.
 */
@Configuration
public class ScheduleConfig {

    /**
     * Builds the scheduler bean.
     *
     * <p>The pool holds 20 threads whose names start with {@code taskExecutor-}. The bean's
     * {@code shutdown} method is registered as its destroy method; on shutdown the scheduler
     * is configured to wait for running tasks, for at most 60 seconds.
     *
     * @return the configured, not yet initialized, scheduler
     */
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskScheduler taskExecutor() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();

        // Shutdown behaviour: let in-flight tasks finish, bounded by a timeout.
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);

        // Pool sizing and thread naming.
        scheduler.setThreadNamePrefix("taskExecutor-");
        scheduler.setPoolSize(20);

        return scheduler;
    }
}
配置定时任务调度类:在业务类中注入 HiveClusterSyncScheduler 后,即可调用其 addTask、cancelTask、updateScheduleTask 方法来动态添加、取消或更新定时任务:
1 package cn.jsfund.ngdp.support.batchSchedule; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.List; 6 import java.util.Map; 7 import java.util.Map.Entry; 8 import java.util.concurrent.ConcurrentHashMap; 9 import java.util.concurrent.ScheduledFuture; 10 11 import javax.annotation.PostConstruct; 12 import javax.annotation.Resource; 13 14 import org.apache.commons.lang.StringUtils; 15 import org.slf4j.Logger; 16 import org.slf4j.LoggerFactory; 17 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.jdbc.core.JdbcTemplate; 19 import org.springframework.scheduling.Trigger; 20 import org.springframework.scheduling.TriggerContext; 21 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; 22 import org.springframework.scheduling.support.CronSequenceGenerator; 23 import org.springframework.scheduling.support.CronTrigger; 24 import org.springframework.stereotype.Component; 25 import org.springframework.util.ObjectUtils; 26 27 import cn.jsfund.ngdp.support.config.BasicConfig; 28 import cn.jsfund.ngdp.support.exception.ServiceException; 29 import cn.jsfund.ngdp.support.model.bigdata.TaskDef; 30 import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService; 31 32 @SuppressWarnings("rawtypes") 33 @Component 34 public class HiveClusterSyncScheduler { 35 36 @Resource 37 private JdbcTemplate dmJdbcTemplate; 38 39 @Resource 40 private BasicConfig basicConfig; 41 42 @Resource 43 HiveClusterSyncService hiveClusterSyncService;//业务类 44 45 @Autowired 46 ThreadPoolTaskScheduler threadPoolTaskScheduler; 47 48 private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class); 49 50 private static final String MAPTASKKEY = "map_task_key"; 51 52 private static Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<String, ScheduledFuture>(); 53 54 @PostConstruct 55 public void init() { 56 refreshTasks(); 57 } 58 59 // @Scheduled(cron = 
"${bigData.hive.backup.schduler.refresh.crontab}") 60 public void refreshTasks() { 61 62 if (!"true".equals(basicConfig.getBackupEnabled())) { 63 logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************"); 64 return; 65 } 66 for (Entry<String, ScheduledFuture> entry : scheduledFutureMap.entrySet()) { 67 ScheduledFuture sf = entry.getValue(); 68 if (sf != null) { 69 sf.cancel(false); 70 } 71 } 72 scheduledFutureMap.clear(); 73 74 logger.info("*************开始扫描数据库,刷新定时任务*************"); 75 76 List<TaskDef> list = new ArrayList<>(); 77 78 try { 79 list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null) 80 .getContent(); 81 } catch (Exception e) { 82 logger.info("查询数据库异常,代码执行结束,异常信息:", e); 83 } 84 if (ObjectUtils.isEmpty(list)) { 85 logger.info("查询启动状态的任务记录为空,代码执行结束。"); 86 return; 87 } 88 for (TaskDef taskDef : list) { 89 String taskId = taskDef.getId(); 90 String crontab = taskDef.getCrontab(); 91 if (StringUtils.isBlank(crontab)) { 92 continue; 93 } 94 95 TaskThread taskThread = new TaskThread(taskId, crontab); 96 boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab); 97 if (!isValidExp) { 98 logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab); 99 continue; 100 } 101 ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, 102 new Trigger() { 103 104 @Override 105 public Date nextExecutionTime(TriggerContext triggerContext) { 106 107 return new CronTrigger(crontab).nextExecutionTime(triggerContext); 108 } 109 }); 110 111 scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); 112 } 113 logger.info("*************刷新定时任务完成*************"); 114 } 115 116 //添加计划 117 public void addTask(String taskId, String crontab) { 118 if (!"true".equals(basicConfig.getBackupEnabled())) { 119 return; 120 } 121 TaskThread taskThread = new TaskThread(taskId, crontab); 122 ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, 123 new Trigger() { 124 @Override 
125 public Date nextExecutionTime(TriggerContext triggerContext) { 126 return new CronTrigger(crontab).nextExecutionTime(triggerContext); 127 } 128 }); 129 scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); 130 } 131 132 //取消计划 133 public void cancelTask(Object... taskId) { 134 for (int i = 0; i < taskId.length; i++) { 135 ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId[i]); 136 if (sf != null) { 137 sf.cancel(false); 138 scheduledFutureMap.remove(MAPTASKKEY + taskId[i]); 139 } 140 } 141 } 142 143 //更新计划:先取消再添加 144 public void updateScheduleTask(String taskId, String crontab) throws ServiceException { 145 if (!"true".equals(basicConfig.getBackupEnabled())) { 146 return; 147 } 148 ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId); 149 if (sf != null) { 150 sf.cancel(false); 151 scheduledFutureMap.remove(MAPTASKKEY + taskId); 152 } 153 TaskThread taskThread = new TaskThread(taskId, crontab); 154 ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, 155 new Trigger() { 156 @Override 157 public Date nextExecutionTime(TriggerContext triggerContext) { 158 return new CronTrigger(crontab).nextExecutionTime(triggerContext); 159 } 160 }); 161 scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); 162 } 163 164 class TaskThread extends Thread { 165 166 private String taskId; 167 168 private String crontab; 169 170 public TaskThread(String taskId, String crontab) { 171 this.taskId = taskId; 172 this.crontab = crontab; 173 } 174 175 public void run() { 176 try { 177 hiveClusterSyncService.bootTask(taskId, crontab); 178 } catch (Exception e) { 179 logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage()); 180 } 181 } 182 } 183 184 }
package cn.jsfund.ngdp.support.batchSchedule;
import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;import javax.annotation.Resource;
import org.apache.commons.lang.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.scheduling.Trigger;import org.springframework.scheduling.TriggerContext;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.scheduling.support.CronSequenceGenerator;import org.springframework.scheduling.support.CronTrigger;import org.springframework.stereotype.Component;import org.springframework.util.ObjectUtils;
import cn.jsfund.ngdp.support.config.BasicConfig;import cn.jsfund.ngdp.support.exception.ServiceException;import cn.jsfund.ngdp.support.model.bigdata.TaskDef;import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;
@SuppressWarnings("rawtypes")@Componentpublic class HiveClusterSyncScheduler {
    @Resource    private JdbcTemplate dmJdbcTemplate;
    @Resource    private BasicConfig basicConfig;
    @Resource    HiveClusterSyncService hiveClusterSyncService;
    @Autowired    ThreadPoolTaskScheduler threadPoolTaskScheduler;
    private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);
    private static final String MAPTASKKEY = "map_task_key";
    private static Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<String, ScheduledFuture>();
    @PostConstruct    public void init() {        refreshTasks();    }
    //    @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}")    public void refreshTasks() {
        if (!"true".equals(basicConfig.getBackupEnabled())) {            logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************");            return;        }        for (Entry<String, ScheduledFuture> entry : scheduledFutureMap.entrySet()) {            ScheduledFuture sf = entry.getValue();            if (sf != null) {                sf.cancel(false);            }        }        scheduledFutureMap.clear();
        logger.info("*************开始扫描数据库,刷新定时任务*************");
        List<TaskDef> list = new ArrayList<>();
        try {            list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null)                    .getContent();        } catch (Exception e) {            logger.info("查询数据库异常,代码执行结束,异常信息:", e);        }        if (ObjectUtils.isEmpty(list)) {            logger.info("查询启动状态的任务记录为空,代码执行结束。");            return;        }        for (TaskDef taskDef : list) {            String taskId = taskDef.getId();            String crontab = taskDef.getCrontab();            if (StringUtils.isBlank(crontab)) {                continue;            }
            TaskThread taskThread = new TaskThread(taskId, crontab);            boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab);            if (!isValidExp) {                logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab);                continue;            }            ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,                    new Trigger() {
                        @Override                        public Date nextExecutionTime(TriggerContext triggerContext) {
                            return new CronTrigger(crontab).nextExecutionTime(triggerContext);                        }                    });
            scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);        }        logger.info("*************刷新定时任务完成*************");    }
    //添加计划    public void addTask(String taskId, String crontab) {        if (!"true".equals(basicConfig.getBackupEnabled())) {            return;        }        TaskThread taskThread = new TaskThread(taskId, crontab);        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,                new Trigger() {                    @Override                    public Date nextExecutionTime(TriggerContext triggerContext) {                        return new CronTrigger(crontab).nextExecutionTime(triggerContext);                    }                });        scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);    }
    //取消计划    public void cancelTask(Object... taskId) {        for (int i = 0; i < taskId.length; i++) {            ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId[i]);            if (sf != null) {                sf.cancel(false);                scheduledFutureMap.remove(MAPTASKKEY + taskId[i]);            }        }    }
    //更新计划:先取消再添加    public void updateScheduleTask(String taskId, String crontab) throws ServiceException {        if (!"true".equals(basicConfig.getBackupEnabled())) {            return;        }        ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId);        if (sf != null) {            sf.cancel(false);            scheduledFutureMap.remove(MAPTASKKEY + taskId);        }        TaskThread taskThread = new TaskThread(taskId, crontab);        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,                new Trigger() {                    @Override                    public Date nextExecutionTime(TriggerContext triggerContext) {                        return new CronTrigger(crontab).nextExecutionTime(triggerContext);                    }                });        scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);    }
    class TaskThread extends Thread {
        private String taskId;
        private String crontab;
        public TaskThread(String taskId, String crontab) {            this.taskId = taskId;            this.crontab = crontab;        }
        public void run() {            try {                hiveClusterSyncService.bootTask(taskId, crontab);            } catch (Exception e) {                logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage());            }        }    }
}
标签:fun model public tab refresh efi core prefix entry
原文地址:https://www.cnblogs.com/itfeng813/p/14629006.html