码迷,mamicode.com
首页 > 其他好文 > 详细

模拟实现配置中心配置发生变化时秒级推送至客户端代码思路

时间:2019-05-04 00:42:08      阅读:163      评论:0      收藏:0      [点我收藏+]

标签:syn   style   col   request   etc   factory   spring   setvalue   time   

import com.alibaba.fastjson.JSON;
import com.xuebusi.spring.study.http.BasicHttpUtil;
import com.xuebusi.spring.study.model.ConfData;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 模拟实现配置中心配置发生变化时秒级推送至客户端
 * 操作步骤:
 * 1.请求http://host:port/conf/add?key=&value=接口添加数据
 * 2.请求http://host:port/conf/get?key=接口查看数据
 * 3.请求http://host:port/conf/set?key=&value=接口修改数据
 * 4.观察控制台日志会看到配置变化的通知
 * <p>
 * 原理:
 * 1.项目启动时,模拟一个客户端调用 /conf/monitor 接口请求数据,创建DeferredResult对象,
 * 以客户端ID为key,以DeferredResult对象为value放入 deferredResultMap;
 * 2.当客户端调用修改接口时,就将修改后的数据放入 changedDataMap(这里用一个map存,实际应该存数据库);
 * 3.通过另一个线程定时(间隔1s)查看 changedDataMap 中变化的数据;
 * 4.如果数据有变化就遍历deferredResultMap通过 deferredResult.setResult() 通知给所有的客户端;
 *
 * @author syj
 */
@RestController
@RequestMapping
public class ConfController implements InitializingBean {

    private static final int BEAT_TIME_OUT = 30;

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    private static Map<String, ConfData> changedDataMap = new ConcurrentHashMap<>();

    private Map<String, DeferredResult> deferredResultMap = new ConcurrentHashMap();

    // 模拟数据库存储配置
    public static Map<String, ConfData> confDataMap = new ConcurrentHashMap<>();

    /**
     * 获取最新配置
     * 使用 DeferredResult 返回异步结果
     *
     * @param clientId
     * @return
     */
    @GetMapping("/conf/monitor")
    public DeferredResult<String> monitor(String clientId) {
        DeferredResult<String> result = new DeferredResult<>(BEAT_TIME_OUT * 1000L, "30秒超时啦");
        deferredResultMap.put(clientId, result);
        return result;
    }

    /**
     * 查询所有配置
     *
     * @return
     */
    @GetMapping("/conf/list")
    public Result<Map<String, ConfData>> list() {
        return new Result<>(confDataMap);
    }

    /**
     * 查询配置
     *
     * @param key
     * @return
     */
    @GetMapping("/conf/get")
    public Result<ConfData> get(String key) {
        ConfData confData = confDataMap.get(key);
        if (confData == null) {
            return new Result<ConfData>(Result.FAIL_CODE, "key不存在");
        } else {
            return new Result<ConfData>(confData);
        }
    }

    /**
     * 修改配置
     *
     * @param key
     * @param value
     * @return
     */
    @GetMapping("/conf/set")
    public Result<String> set(String key, String value) {
        ConfData confData = confDataMap.get(key);
        if (confData == null) {
            return new Result<String>(Result.FAIL_CODE, "key不存在");
        } else {
            if (!confData.getValue().equals(value)) {
                confData.setValue(value);
                confDataMap.put(key, confData);
                // 同时放到 changedDataMap 中一份最新数据
                changedDataMap.put(key, confData);
            }
            return Result.SUCCESS;
        }
    }

    /**
     * 新增配置
     *
     * @param key
     * @param value
     * @return
     */
    @GetMapping("/conf/add")
    public Result<String> add(String key, String value) {
        ConfData confData = confDataMap.get(key);
        if (confData != null) {
            return new Result<String>(Result.FAIL_CODE, "key已经存在了");
        }
        confDataMap.put(key, new ConfData(key, value));
        return Result.SUCCESS;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        // 配置中心启动线程监控配置变化
        executorService.execute(() -> {
            while (true) {
                Collection<ConfData> values = changedDataMap.values();
                if (values != null && values.size() > 0) {
                    for (String key : changedDataMap.keySet()) {
                        for (String clientId : deferredResultMap.keySet()) {
                            DeferredResult deferredResult = deferredResultMap.get(clientId);
                            String msg = "通知客户端" + clientId + "配置" + key + "发生变化:" + JSON.toJSONString(changedDataMap.get(key));
                            deferredResult.setResult(msg);

                            // 移除已经通知过的客户端
                            deferredResultMap.remove(clientId);

                            // 移除已经通知过的数据
                            changedDataMap.remove(key);
                        }
                    }
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 模拟客户端 http请求最新配置
        executorService.execute(() -> {
            String clientId = UUID.randomUUID().toString().replace("-", "");
            while (true) {
                String url = "http://localhost:8888/conf/monitor?clientId=" + clientId;
                String result = BasicHttpUtil.get(url, BEAT_TIME_OUT + 30);
                System.out.println(Thread.currentThread().getName() + "----http调用结果:" + result);
            }
        });
    }

    /**
     * 返回结果
     *
     * @param <T>
     */
    public static class Result<T> {

        public static int SUCCESS_CODE = 200;
        public static int FAIL_CODE = 500;
        public static final Result<String> SUCCESS = new Result<>(SUCCESS_CODE, "操作成功");
        public static final Result<String> FAIL = new Result<>(FAIL_CODE, "操作失败");

        private int code;
        private String msg;
        private T data;

        public Result(T data) {
            this.data = data;
        }

        public Result(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public Result(int code, String msg, T data) {
            this.code = code;
            this.msg = msg;
            this.data = data;
        }

        public int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        public T getData() {
            return data;
        }

        public void setData(T data) {
            this.data = data;
        }
    }
}

 

模拟实现配置中心配置发生变化时秒级推送至客户端代码思路

标签:syn   style   col   request   etc   factory   spring   setvalue   time   

原文地址:https://www.cnblogs.com/jun1019/p/10807055.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!