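We need a "single-point worker" system that ensures that, in a distributed environment, each scheduled task is active on only one instance at any given moment. For example, a production application may run on 6 machines and contain 30 scheduled tasks, some of which (such as "data statistics" tasks) must run in a "single-threaded" fashion. The reason for avoiding concurrency is not at the Java level; it may lie in database operations, message consumption, or message pushing. A "data statistics" task for a given metric only needs to run once a day, and running it more often is wasted effort, so scheduled tasks of this kind need to be "single-pointed". At the same time, if a task exits abnormally without reporting a result, we still expect another instance in the cluster to take it over on its own initiative. In poorly designed architectures, some developers trigger specific scripts by hand, and some web projects enable tasks by configuring a specific host. Some scheduled tasks may rely on the clustering support in Quartz (quartz-cluster), but that requires additional database support.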
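Here we use ZooKeeper to implement this. The example demonstrates the following features:
1) Single-point workers.
2) Worker balancing (30 workers spread fairly evenly across 6 machines).
3) Takeover of failed workers.
However, several problems remain to be solved:
1) Takeover is not guaranteed to be timely: when a task's executor fails, another sid takes over only after an expiry period has elapsed.
2) In rare cases, a task may still be executed by 2 sids at the same time.
3) In rare cases, for a very short time, a task is not held by any sid and is left "orphaned".
Although ZooKeeper provides a watch mechanism, watches cannot fully eliminate the problems above, and they add code complexity. In the end I gave up on the idea of using watches in this class.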
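The takeover delay in problem 1 follows from how liveness is detected without watches: the current holder periodically writes a timestamp to ZooKeeper, and other instances treat the holder as failed once that timestamp is too old (the sample code further down works this way). A minimal sketch of that check, using only the JDK; `HeartbeatCheck`, `encode`, `isStale`, and the timeout values are illustrative names and numbers, not part of the original source:

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: models the timestamp payload a holder stores in ZooKeeper.
class HeartbeatCheck {

    // Encode a timestamp the way a holder would before writing it to its node.
    static byte[] encode(long nowMillis) {
        return String.valueOf(nowMillis).getBytes(StandardCharsets.UTF_8);
    }

    // A heartbeat is stale when it is older than timeoutMillis.
    // Missing or unparsable data is treated as stale so the job can be reassigned.
    static boolean isStale(byte[] data, long nowMillis, long timeoutMillis) {
        if (data == null || data.length == 0) {
            return true;
        }
        try {
            long last = Long.parseLong(new String(data, StandardCharsets.UTF_8).trim());
            return last + timeoutMillis < nowMillis;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        byte[] fresh = encode(now - 1_000);
        byte[] old = encode(now - 10_000);
        System.out.println("fresh stale? " + isStale(fresh, now, 5_000));
        System.out.println("old stale? " + isStale(old, now, 5_000));
    }
}
```

The timestamp is written on one machine and compared on another, so the timeout has to exceed the heartbeat interval plus the expected clock skew. A timeout that is too short leads to problems 2 and 3; one that is too long makes problem 1 worse.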
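Note: in ZooKeeper, exists and create/delete are not atomic with respect to each other. Even if exists reports that a node is absent, a subsequent create of that node may still throw NodeExistsException, because another ZooKeeper client may be performing the same operations concurrently.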
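Note: ZooKeeper does not delete a node that still has children (delete fails with NotEmptyException), so removing a subtree means deleting the children first or using a recursive helper. Likewise, a node can only be created if all of its ancestors already exist. The deeper the hierarchy, the more unwieldy the data structure becomes for ZooKeeper's underlying storage.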
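On the create side, both notes point to the same defensive idiom: create each level of a path in order, and treat "node already exists" as success instead of trusting an earlier exists check. The sketch below shows the idea against a tiny in-memory stand-in for the znode tree, so it runs without a ZooKeeper server. `FakeTree`, `NodeExists`, `NoParent`, and `ensurePath` are hypothetical names; with the real client the corresponding pieces would be `create` and `NodeExistsException`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a znode tree (an assumption for illustration, not the ZooKeeper API).
class FakeTree {
    static class NodeExists extends RuntimeException {}
    static class NoParent extends RuntimeException {}

    private final Set<String> nodes = ConcurrentHashMap.newKeySet();

    // Mirrors create semantics: fails if the parent is missing or the node already exists.
    void create(String path) {
        int idx = path.lastIndexOf('/');
        if (idx < 0) {
            throw new IllegalArgumentException("path must be absolute: " + path);
        }
        String parent = path.substring(0, idx);
        if (!parent.isEmpty() && !nodes.contains(parent)) {
            throw new NoParent();
        }
        // Set.add is atomic, so exactly one concurrent creator wins.
        if (!nodes.add(path)) {
            throw new NodeExists();
        }
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }

    // Create every level of the path in order; losing a race to another creator is fine.
    void ensurePath(String path) {
        StringBuilder current = new StringBuilder();
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue;
            }
            current.append('/').append(segment);
            try {
                create(current.toString());
            } catch (NodeExists ignored) {
                // Another client (or an earlier call) already created this level.
            }
        }
    }

    public static void main(String[] args) {
        FakeTree tree = new FakeTree();
        tree.ensurePath("/serverType/jobType/register/sid1");
        tree.ensurePath("/serverType/jobType/register/sid2"); // shared parents are tolerated
        System.out.println(tree.exists("/serverType/jobType/register/sid2"));
    }
}
```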
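Data structure and design:
1) serverType identifies the current application. Each application should have its own serverType so that data is grouped per application. jobType is the task type or task name. The nodes below indicate that three instances, sid1, sid2 and sid3 (for example Tomcat instances or physical machine identifiers), take part in the jobType task of a given serverType. znode path format:
/serverType/jobType/register/sid1
............................../sid2
............................../sid3
2) Indicates that this jobType is currently run by sid1. znode path format:
/serverType/jobType/alive/sid1 data: null (the sample code below stores the time of the last heartbeat here instead)
3) /serverType/jobType data: cronExpression; the task's cron expression is stored as the node data.
4) {todo} The number of tasks run by each sid under a serverType. It can be used to balance tasks by assigning a new task to the sid with the fewest tasks.
/serverType/sid1 data: task count.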
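The layout above maps onto a few path-building helpers, and item 4 (still a to-do in the original) amounts to picking the sid with the fewest running tasks. A sketch of both, using only the JDK; `JobPaths`, `pickLeastLoaded`, and breaking ties by sid name are assumptions made for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helpers for the znode layout described above.
class JobPaths {

    static String job(String serverType, String jobType) {
        return "/" + serverType + "/" + jobType;
    }

    static String register(String serverType, String jobType, String sid) {
        return job(serverType, jobType) + "/register/" + sid;
    }

    static String alive(String serverType, String jobType, String sid) {
        return job(serverType, jobType) + "/alive/" + sid;
    }

    // Returns the sid with the smallest task count, or null if the map is empty.
    static String pickLeastLoaded(Map<String, Integer> taskCounts) {
        String best = null;
        int bestCount = 0;
        // TreeMap iterates in sid order, so ties go to the lexicographically smallest sid.
        for (Map.Entry<String, Integer> e : new TreeMap<>(taskCounts).entrySet()) {
            if (best == null || e.getValue() < bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("sid1", 5);
        counts.put("sid2", 3);
        counts.put("sid3", 3);
        System.out.println(alive("app", "stat", pickLeastLoaded(counts)));
    }
}
```

In a real deployment the counts would be read from the `/serverType/sid` nodes, and the read-then-assign step would race with other instances, so the result is a hint for balancing rather than a guarantee.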
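Below is my sample code. The code used in production differs from it, so treat this as a reference only. The example is based on ZooKeeper + Quartz 2.1. Corrections are welcome:
1) TestMain.java: test bootstrap class.
2) PrintNumberJob.java: a simple task that prints a random number.
3) PrintTimeJob.java: a simple task that prints the current time.
4) SingleWorkerManager.java: the core class. It handles tasks submitted by callers and makes sure the outcome is as expected. It is built around 2 internal worker threads, which handle ZooKeeper data synchronization and the hand-off of user tasks respectively.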
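The hand-off between a caller and the internal worker thread in `SingleWorkerManager` goes through a `SynchronousQueue`: an `offer` with a timeout succeeds only if the consumer thread takes the item in time. A self-contained sketch of that pattern; `HandoffDemo`, its methods, and the string payload are illustrative and not the original classes:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HandoffDemo {

    // Returns true only if a consumer thread accepted the item within the timeout.
    static boolean submit(SynchronousQueue<String> queue, String item, long timeoutMillis)
            throws InterruptedException {
        return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Starts a daemon consumer that takes exactly one item and records it.
    static Thread startConsumer(final SynchronousQueue<String> queue,
                                final AtomicReference<String> received) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    received.set(queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        // No consumer is waiting yet, so this offer times out.
        System.out.println("accepted without consumer: " + submit(queue, "job-A", 100));

        AtomicReference<String> received = new AtomicReference<String>();
        Thread consumer = startConsumer(queue, received);
        System.out.println("accepted with consumer: " + submit(queue, "job-B", 2_000));
        consumer.join();
        System.out.println("consumer received: " + received.get());
    }
}
```

A `SynchronousQueue` has no capacity, so a submitted task is never silently buffered: the caller learns whether the worker thread actually picked it up.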
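Unfortunately the source is very long. Although I have formatted it as carefully as I could, it is still not easy on the eyes, so I suggest downloading the code to read it. Thank you.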
SingleWorkerManager.java
- package com.sample.zk.singleWorker;
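- 
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.SynchronousQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.ReentrantLock;
- 
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException.BadVersionException;
- import org.apache.zookeeper.KeeperException.NoNodeException;
- import org.apache.zookeeper.KeeperException.NodeExistsException;
- import org.apache.zookeeper.Transaction;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.Watcher.Event.EventType;
- import org.apache.zookeeper.ZKUtil;
- import org.apache.zookeeper.ZooDefs.Ids;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.ZooKeeper.States;
- import org.apache.zookeeper.data.Stat;
- import org.quartz.CronScheduleBuilder;
- import org.quartz.Job;
- import org.quartz.JobBuilder;
- import org.quartz.JobDetail;
- import org.quartz.Scheduler;
- import org.quartz.Trigger;
- import org.quartz.TriggerBuilder;
- import org.quartz.TriggerKey;
- import org.quartz.impl.StdSchedulerFactory;
- 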
- public class SingleWorkerManager {
-
- private static final String GROUP = "single-worker";
- private Scheduler scheduler;
- private ZooKeeper zkClient;
- private String serverType = "_-default-_";
- private static final String REGISTER = "/register";
- private static final String ALIVE = "/alive";
-
- private Watcher dw = new InnerZK();
-
- // read by caller threads and both worker threads, so it must be volatile
- private volatile boolean isAlive = false;
-
- private Object tag = new Object();
-
- private ReentrantLock lock = new ReentrantLock();
-
- private String sid;
-
- private SynchronousQueue<Worker> outgoingWorker = new SynchronousQueue<Worker>();
-
-
- private Map<String,Worker> allWorkers = new HashMap<String,Worker>();
-
-
- private Map<String,Worker> selfWorkers = new HashMap<String,Worker>();
-
-
- private Thread syncThread;
-
- private Thread workerThread;
-
-
- public SingleWorkerManager(String sid){
- this(sid,null);
-
- }
-
- public SingleWorkerManager(String sid,String sType){
- if(sType != null){
- this.serverType = sType;
- }
- try{
- zkClient = new ZooKeeper(Constants.connectString, 3000, dw,false);
- }catch(Exception e){
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- this.sid = sid;
- syncThread = new Thread(new SyncHandler());
- syncThread.setDaemon(true);
- syncThread.start();
- }
-
-
- public void start(){
- try{
- scheduler = StdSchedulerFactory.getDefaultScheduler();
- scheduler.start();
- workerThread = new Thread(new WorkerHandler());
- workerThread.setDaemon(true);
- workerThread.start();
- isAlive = true;
- synchronized (tag) {
- tag.notifyAll();
- }
-
-
- sync();
- }catch(Exception e){
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
-
- public void close(){
- lock.lock();
- try{
- isAlive = false;
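- // start() may never have been called, so scheduler and workerThread can still be null
- if(scheduler != null){
- scheduler.shutdown();
- }
- if (syncThread != null && syncThread.isAlive()) {
- syncThread.interrupt();
- }
- if(workerThread != null && workerThread.isAlive()){
- workerThread.interrupt();
- }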
- if(zkClient != null){
- zkClient.close();
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
-
-
- public void unschedule(String jobName){
- lock.lock();
- try{
- String jobPath = "/" + serverType + "/" + jobName;
- if(zkClient.exists(jobPath, false) != null){
- // the job node has register/alive children, and a plain delete of a
- // non-empty node fails with NotEmptyException, so delete the subtree
- // with org.apache.zookeeper.ZKUtil
- ZKUtil.deleteRecursive(zkClient, jobPath);
- }
- }catch(NoNodeException e){
- // already removed by another instance
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
-
-
- public boolean schedule(Class<? extends Job> jobClass,String cronExpression){
- if(!isAlive){
- throw new IllegalStateException("worker has been closed!");
- }
- try{
- Worker worker = this.build(jobClass, cronExpression);
- return outgoingWorker.offer(worker,15,TimeUnit.SECONDS);
- }catch(Exception e){
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- private Worker build(Class<? extends Job> jobClass,String cronExpression){
- String name = jobClass.getName();
- JobDetail job = JobBuilder.newJob(jobClass).withIdentity(name,GROUP).build();
- CronScheduleBuilder sb = CronScheduleBuilder.cronSchedule(cronExpression);
- Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name, GROUP).withSchedule(sb).build();
- return new Worker(job, trigger,cronExpression);
- }
-
-
-
-
-
- private boolean isReady(){
- if(!isAlive){
- return false;
- }
- if(scheduler == null || zkClient == null){
- return false;
- }
- try{
- if(scheduler.isShutdown() || !scheduler.isStarted()){
- return false;
- }
- }catch(Exception e){
- e.printStackTrace();
- return false;
- }
- if(zkClient.getState().isConnected()){
- return true;
- }
- return false;
- }
-
-
- private void syncSelfWorker(){
- lock.lock();
- try{
- if(!isReady()){
- throw new RuntimeException("Scheduler error..");
- }
-
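- // iterate over a copy: entries are removed from selfWorkers inside the loop
- for(String job : new ArrayList<String>(selfWorkers.keySet())){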
- String jobPath = "/" + serverType + "/" + job;
-
-
- if(zkClient.exists(jobPath, false) == null){
- allWorkers.remove(job);
- Worker cw = selfWorkers.remove(job);
- if(cw != null){
- if(scheduler.checkExists(cw.getJob().getKey())){
- scheduler.unscheduleJob(cw.getTrigger().getKey());
- }
- }
- continue;
- }
- String alive = "/" + serverType + "/" + job + ALIVE;
-
- List<String> alives = zkClient.getChildren(alive, false);
- if(alives == null || alives.isEmpty()){
-
- continue;
- }
- if(alives.size() == 1){
- String holder = alives.get(0);
-
- if(holder.equalsIgnoreCase(sid)){
- byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();
- zkClient.setData(alive + "/" + sid, data, -1);
- continue;
- }
- }
-
-
- if(zkClient.exists(alive + "/" + sid, false) != null){
- try{
- zkClient.delete(alive + "/" + sid, -1);
- scheduler.unscheduleJob(new TriggerKey(job, GROUP));
- selfWorkers.remove(job);
- }catch(NoNodeException e){
-
- }catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
-
-
-
- private void sync(){
- lock.lock();
- try{
- if(!isReady()){
- throw new RuntimeException("Scheduler error..");
- }
-
- Stat tstat = zkClient.exists("/" + serverType,false);
- if(tstat == null){
- try{
- zkClient.create("/" + serverType, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }catch(NodeExistsException e){
-
- }
- }
-
- syncSelfWorker();
-
-
-
- List<String> allJobs = zkClient.getChildren("/" + serverType, false);
- if(allJobs == null){
- throw new RuntimeException("NO jobs, error..");
- }
- allWorkers.clear();
- for(String job : allJobs){
- try{
-
- byte[] data = zkClient.getData("/" + serverType + "/" + job, false, null);
- if(data == null || data.length == 0){
- continue;
- }
-
-
- Class<? extends Job> jobClass = (Class<? extends Job>)ClassLoader.getSystemClassLoader().loadClass(job);
- Worker worker = build(jobClass, new String(data));
- allWorkers.put(job,worker);
-
- String registerPath = "/" + serverType + "/" + job + REGISTER + "/" + sid;
-
- if(zkClient.exists(registerPath, false) == null){
- try{
- zkClient.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }catch(NodeExistsException ex){
-
- }
- }
-
- String alivePath = "/" + serverType + "/" + job + ALIVE +"/" + sid;
-
- if(zkClient.exists(alivePath, false) == null){
- continue;
- }
-
- try{
- boolean exists = scheduler.checkExists(worker.getJob().getKey());
- if(!exists){
-
- scheduler.scheduleJob(worker.getJob(),worker.getTrigger());
- selfWorkers.put(job,worker);
- }
- }catch(Exception e){
- e.printStackTrace();
- zkClient.delete(alivePath, -1);
-
- selfWorkers.remove(job);
- }
- }catch(ClassNotFoundException e){
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
-
-
- class InnerZK implements Watcher {
-
- public void process(WatchedEvent event) {
-
- if (event.getType() != EventType.None) {
-
- return;
- }
-
-
-
- switch (event.getState()) {
- case SyncConnected:
- System.out.println("Connected...");
-
- sync();
- break;
- case Expired:
- System.out.println("Expired...");
- break;
-
- case Disconnected:
-
- System.out.println("Connecting....");
- break;
- case AuthFailed:
- close();
- throw new RuntimeException("ZK Connection auth failed...");
- default:
- break;
- }
- }
-
- }
-
-
- private void scheduler(){
- lock.lock();
- for(String job : allWorkers.keySet()){
- try{
-
-
-
-
-
- String alivePath = "/" + serverType + "/" + job + ALIVE;
- List<String> children = zkClient.getChildren(alivePath, false);
- if(children == null || children.isEmpty()){
-
- String registerPath = "/" + serverType + "/" + job + REGISTER;
- List<String> rc = zkClient.getChildren(registerPath, false);
-
- if(rc == null || rc.isEmpty()){
- continue;
- }
- Collections.shuffle(rc);
- String tsid = rc.get(0);
- try{
- byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();
- zkClient.create(alivePath + "/" + tsid, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-
-
-
-
-
- System.out.println("Job switch,SID:" + tsid + ",JOB :" + job);
- }catch(NodeExistsException e){
-
- }
- continue;
- }
-
-
- for(String id : children){
- String tpath = alivePath + "/" + id;
- Stat stat = new Stat();
- byte[] data = zkClient.getData(tpath, false,stat);
- long time = Long.valueOf(new String(data));
- long current = System.currentTimeMillis();
-
-
-
- if(time + 1500 < current){
- try{
- zkClient.delete(tpath, stat.getVersion());
- }catch(BadVersionException e){
-
- }catch(NoNodeException e){
-
- }
- }else{
- System.out.println(id + " :" + job);
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- lock.unlock();
- }
-
- class SyncHandler implements Runnable {
-
- public void run() {
- try {
- int i = 0;
- int l = 10;
- while (true) {
- synchronized (tag) {
- // block until start() has run; an interrupt propagates to the
- // outer catch and ends this thread
- while(!isAlive){
- tag.wait();
- }
- }
- System.out.println("Sync handler,running...tid: " + Thread.currentThread().getId());
- if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {
- lock.lock();
- try {
-
- zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false);
- System.out.println("Reconnected success!...");
- } catch (Exception e) {
- e.printStackTrace();
- i++;
- Thread.sleep(3000 + i * l);
- } finally {
- lock.unlock();
- }
- continue;
- }
- if (zkClient.getState().isConnected()) {
- sync();
- scheduler();
- Thread.sleep(3000);
- i = 0;
- }else{
- Thread.sleep(3000);
- }
- }
- } catch (InterruptedException e) {
- System.out.println("SID:" + sid + ",SyncHandler Exit...");
- close();
- }
-
- }
- }
-
-
- class WorkerHandler implements Runnable{
- private Set<Worker> pending = new HashSet<Worker>();
- private int count = 0;
-
-
- private boolean register(Worker worker){
- String jobName = worker.getJob().getKey().getName();
- lock.lock();
- try{
- Transaction tx = zkClient.transaction();
- String jobPath = "/" + serverType + "/" + jobName;
- if(zkClient.exists(jobPath, false) == null){
- tx.create(jobPath, worker.getCronExpression().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- String registerPath = "/" + serverType + "/" + jobName+ REGISTER;
- if(zkClient.exists(registerPath, false) == null){
- tx.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- String alivePath = "/" + serverType + "/" + jobName+ ALIVE;
- if(zkClient.exists(alivePath, false) == null){
- tx.create(alivePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- tx.create(registerPath + "/" + sid, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- tx.commit();
- return true;
- }catch(NodeExistsException e){
- // a node was created concurrently or this sid is already registered;
- // sync() creates a missing register node on its next pass
- return true;
- }catch(Exception e){
- e.printStackTrace();
- // keep the worker for a later retry and report the failure
- pending.add(worker);
- return false;
- }finally{
- lock.unlock();
- }
- }
-
- public void run(){
- try{
- while(true){
- synchronized (tag) {
- // block until start() has run; an interrupt propagates to the
- // outer catch and ends this thread
- while(!isAlive){
- tag.wait();
- }
- }
- System.out.println("Worker handler,running...");
- if(zkClient != null && zkClient.getState().isConnected()){
- System.out.println("Register...");
- Worker worker = outgoingWorker.take();
- register(worker);
- if(!pending.isEmpty()){
- Thread.sleep(500);
- Iterator<Worker> it = pending.iterator();
- while(it.hasNext()){
- boolean isOk = register(it.next());
- if(!isOk){
- count++;
- Thread.sleep(1000);
- }else{
- count = 0;
- it.remove();
- }
-
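- if(count > 20){
- // give up on the remaining workers; stop iterating because
- // the set has just been cleared
- pending.clear();
- count = 0;
- break;
- }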
- }
- }
-
- }else{
- Thread.sleep(1000);
- }
- }
-
- }catch(InterruptedException e){
- System.out.println("SID:" + sid + ",WorkerHandler Exit...");
- close();
- }
- }
- }
-
-
- public void clear(){
- lock.lock();
- try{
- if(zkClient != null && zkClient.getState().isConnected()){
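- // the serverType node has children, so a plain delete would fail with
- // NotEmptyException; remove the whole subtree instead
- ZKUtil.deleteRecursive(zkClient, "/" + serverType);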
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
-
- }
For the other helper classes, please refer to the source code in the attachment. Thank you.
ZooKeeper in practice: SingleWorker code sample
Original article: http://www.cnblogs.com/scwanglijun/p/4268730.html