系统序列号生成服务是写的一个jar包,不依赖其他服务和数据,以下提供部分代码作为一个思路,作为大量数据订单生成时,不再使用数据库表的自增设置,由个系统模块自行生成。
+ 生成时间 yyMMddHHmmss
+ 3位服务节点(001 到 999)
+ N位滚动序列(000001 到 999999 长度可自定义)
+ 每秒钟单节点产生序列大于 999999 将会造成序列重复
+ 每秒钟单节点产生序列小于 999999 则不会重复.
+ 应用启动后扫描zk /sequence/${appName}/${seqName}/ 下的子节点.
+ 无节点则从001 开始创建节点.
+ 有节点则判断是否有绑定关系,有则继续使用无则新创建。
+ 序列基本本地生成(除启动时注册zk),省去其它网络开销,数据库开销。
+ 序列批次生成,每次生成一个批次放入队列。
+ 通过zk 解决分布式问题,每台机器部署的应用生成的序列不会重复。
+ 动态节点的扩容及减少无需修改配置
+ 不支持同一个应用在同一台机器上部署多个。
+ 其它未知情况
<dependency>
<groupId>com.system.commons</groupId>
<artifactId>commons-sequence</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
+ JDK 1.8
+ Zookeeper 3.5.1-alpha
+ 第三方jar依赖
<dependency>
<groupId>com.system.commons</groupId>
<artifactId>commons</artifactId>
</dependency>
<dependency>
<groupId>com.system.logback</groupId>
<artifactId>logback</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
package com.system.commons.sequence.zk; import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.Map; import java.util.Objects; /** * 序列生成工具<br/> * * 组成结构:yyMMddHHmm(可自定义) + 3位服务节点(001 到 999) + N位滚动序列(000001 到 999999 长度可自定义)<br/> * * 重复性解决方案:每分钟单节点产生序列大于 999999 将会造成序列重复,如果小于该数值则不会重复.<br/> * * 分布式解决方案:应用节点启动后扫描zookeeper sequence 节点 /sequence/${appName}/ 下的子节点 * 如无子节点则从001 开始创建节点,如有子节点则 001 > 节点 < 已有节点最小节点值 或 已有节点最大值 > 节点 < 999 * * 每个服务节点startUp 后会去zk 寻找属于自己的服务节点标志,若找不到则在/sequence/${appName}/下创建 * 一个新的跟机器及应用绑定的服务节点标志,若找到了则使用已有的服务节点标志。 * */ @Slf4j public class SequenceFactory { /** 缓存已经实例化的序列化生产者 */ private static Map<String,SequenceProducer> producerMap = new HashMap<>(); /** 将构造函数私有化 */ private SequenceFactory(){} /** * 获取序列化生产者 * * @param zkAddress zookeeper 连接地址(ip:port) * @param appName 应用名称 * @param seqName 序列名称 * @param length 序列长度(建议至少18位,18位意味着单机并发超过999笔/秒后序列号将会重复) * @return 序列化生产者 */ public static SequenceProducer getProducer(String zkAddress,String appName,String seqName,Integer length) { SequenceProducer producer; synchronized (SequenceFactory.class) { producer = producerMap.get(appName + seqName); if (Objects.isNull(producer)) { producer = new SequenceProducer(appName,seqName,zkAddress,length); producerMap.put(appName + seqName,producer); } } return producer; } }
package com.system.commons.sequence.zk; import com.system.commons.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 序列生成工具<br/> * * 组成结构:yyMMddHHmm + 3位服务节点(001 到 999) + N位滚动序列(000001 到 999999 长度可自定义)<br/> * * 重复性解决方案:每分钟单节点产生序列大于 999999 将会造成序列重复,如果小于该数值则不会重复.<br/> * * 分布式解决方案:应用节点启动后扫描zookeeper sequence 节点 /sequence/${appName}/${seqName}/ 下的子节点 * 如无子节点则从001 开始创建节点,如有子节点则 001 > 节点 < 已有节点最小节点值 或 已有节点最大值 > 节点 < 999 * * 每个服务节点startUp 后会去zk 寻找属于自己的服务节点标志,若找不到则在/sequence/${appName}/${seqName}/下创建 * 一个新的跟机器及应用绑定的服务节点标志,若找到了则使用已有的服务节点标志。 * */ @Slf4j public final class SequenceProducer extends BaseSequenceProducer { /** 自增长序列ID */ private static Long _currentSequence = 1L; /** 序列缓存队列 */ private BlockingQueue<String> _sequenceQueue; /** 序列匹配应用名称 */ private String _appName; /** 序列名称 */ private String _seqName; /** Zookeeper 连接地址 */ private String _zkAddress; /** 序列总长度 */ private int _length; /** 序列前缀格式 */ private String _dataPattern = DateUtil.partPattern; /** 左补零长度 */ private int _leftPadLength; /** 最大序列号 */ private Long _maxSeq; /** 上一次生成凭证号的日期 */ private String _lastGenerateTime; private SequenceProducer(){} SequenceProducer(String appName, String seqName, String zkAddress,Integer length) { _appName = appName; _seqName = seqName; _zkAddress = zkAddress; if (Objects.nonNull(length) ) { _length = length; } if (length < 18) { log.error("序列号长度小于18位是不安全的,请另行实现"); System.exit(1); } if (length > 128) { log.error("序列号长度大于128位,请另行实现"); System.exit(1); } _leftPadLength = _length - _dataPattern.length() - 3; _maxSeq = (long) Math.pow(100, 2) - 1; _sequenceQueue = new ArrayBlockingQueue<>(_maxSeq.intValue()); } /** * 获取自定长序列 格式:yyMMddHHmm(自定义) + 3位服务节点标志(左补零) + 19位(可自定义)自增数字(左补零) * * @return 指定长度序列 */ public synchronized String getSequenceNo(){ String sequence = null; try { // 当前时间 String currentDate = DateUtil.getCurrent(_dataPattern); // 判断上次序列生成时间是否为空,如果为空则意味着是第一次生成(初始化生成时间) if(StringUtils.isBlank(_lastGenerateTime)){ _lastGenerateTime = currentDate; } // 判断是否需要重置:重置滚动号,上次生成日期,序列池 if(!StringUtils.isBlank(_lastGenerateTime) && !_lastGenerateTime.equals(currentDate)){ _lastGenerateTime = currentDate; _sequenceQueue.clear(); _currentSequence = 1L; } if (_sequenceQueue.isEmpty()) { generate(); } sequence = _sequenceQueue.poll(100, TimeUnit.MILLISECONDS); } catch (Exception e) { log.error(e.getMessage(),e); System.exit(1); } return sequence; } /** * 重新生成序列 * * 按既定时间重新生成序列,如:每分钟生成100万,一分钟后如果没被消耗完也会将队列里的序列清空按新的时间重新生成序列 * */ private void generate() { String seqNode = registerSeqNode(_zkAddress,_appName,_seqName); try { for (int i = 0; i < 1000; i++) { if (_currentSequence >= _maxSeq) { _currentSequence = 1L; } StringBuilder sequence = new StringBuilder(); sequence.append(_lastGenerateTime); sequence.append(seqNode); String seqNo = StringUtils.leftPad(String.valueOf(_currentSequence), _leftPadLength,"0"); _currentSequence ++; sequence.append(seqNo); _sequenceQueue.put(sequence.toString()); } } catch (Exception e) { log.error("生成序列号异常,系统退出...\r\n Error :{},Detail :{}",e.getMessage(),e); System.exit(1); } } /** * 注册序列服务节点(用于解决分布式部署生成重复序列) * * @param zkAddress zookeeper 连接地址 * @param appName 应用名称 * @param seqName 序列名称 * @return 序列服务节点001 ~ 999 */ private String registerSeqNode(String zkAddress, String appName, String seqName) { String seqNode = BaseSequenceProducer.selectSeqNode(zkAddress,appName,seqName); if (StringUtils.isBlank(seqNode)){ log.error("无法选择序列节点,ZK 连接异常 !"); } return seqNode; } }
package com.system.commons.sequence.zk; import com.system.commons.sequence.zk.utils.NodeSelector; import org.apache.commons.lang3.StringUtils; import java.util.HashMap; import java.util.Map; import java.util.Objects; /** * 简介 * */ public class BaseSequenceProducer { private static NodeSelector nodeSelector; private static Map<String,String> seqNodeMap = new HashMap<>(); public static synchronized String selectSeqNode(String zkAddress, String appName, String seqName){ String seqNode = seqNodeMap.get(appName + seqName); if (StringUtils.isNoneBlank(seqNode)) { return seqNode; } if (Objects.isNull(nodeSelector)){ nodeSelector = new NodeSelector(); } seqNode = nodeSelector.generateServerNode(zkAddress,appName,seqName); seqNodeMap.put(appName + seqName,seqNode); return seqNode; } }
package com.system.commons.sequence.zk.utils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.util.List; /** * 节点选择器<br/> * */ @Slf4j public class NodeSelector { /** zookeeper 客户端连接 */ private static CuratorFramework client = null; /** zookeeper 连接字符串 */ private static String connectString; /** zookeeper 序列节点名称 */ private static final String PATH = "/sequence"; /** 应用名称 */ private String applicationName; /** 序列名称 */ private String sequenceName; /** zookeeper 连接超时时间 */ private static final int _connection_timeout = 1000 * 10; /** Session 超时时间(一周,为了防止网络抖动节点被重复使用) */ private static final int _session_timeout = 1000 * 60 * 60 * 24 * 7; /** zookeeper 连接重试最大次数 */ private static final int _max_retry_times = 10; /** zookeeper 连接重试间隔休眠时间 */ private static final int _retry_sleep_times = 1000 * 30; private static String localIp = IPHelper.getLocalIP(); /** * 生成服务节点 * * @param connectString zookeeper连接字符串 * @param applicationName 应用名称 * @param sequenceName 序列名称 * @return 服务节点 */ public String generateServerNode (String connectString,String applicationName,String sequenceName){ if (StringUtils.isBlank(connectString)){ log.error("zookeeper 连接地址为空,系统异常退出."); System.exit(1); } if (StringUtils.isBlank(applicationName)) { log.error("应用名称为空,系统异常退出."); System.exit(1); } if (StringUtils.isBlank(NodeSelector.connectString)){ NodeSelector.connectString = connectString; } this.applicationName = applicationName; this.sequenceName = sequenceName; synchronized (NodeSelector.class){ connectZookeeper(); } return lockAndSelectNode(); } /** * 连接zookeeper服务 */ private void connectZookeeper(){ try{ if (null == client || !CuratorFrameworkState.STARTED.equals(client.getState())){ client = createSimple(); client = createWithOptions( connectString, new ExponentialBackoffRetry(_retry_sleep_times, _max_retry_times), _connection_timeout, _session_timeout ); client.start(); } createRootNode(); createAppNode(); createSeqNode(); } catch (Exception e){ log.error(e.getMessage(),e); System.exit(1); } } private void createRootNode() throws Exception{ if (null == client.checkExists().forPath(PATH)){ client.create() .withMode(CreateMode.PERSISTENT) .forPath(PATH, "project sequence node".getBytes()); } } private void createAppNode() throws Exception{ String pathData = "project [" + applicationName + "] sequence"; if (null == client.checkExists().forPath(PATH + "/" + applicationName)){ client.create() .withMode(CreateMode.PERSISTENT) .forPath(PATH + "/" + applicationName, pathData.getBytes()); } } private void createSeqNode() throws Exception{ if (null == client.checkExists().forPath(PATH + "/" + applicationName + "/" + sequenceName)){ client.create() .withMode(CreateMode.PERSISTENT) .forPath(PATH + "/" + applicationName + "/" + sequenceName); } } /** * 锁定并生成服务节点 * * @return 服务节点 */ private String lockAndSelectNode(){ try { String node = selectNode(); if (StringUtils.isBlank(node)){ throw new NullPointerException("selectNode return null"); } if ( null == client.checkExists().forPath(PATH + "/" + applicationName + "/" + sequenceName + "/" + node)){ client.create() .withMode(CreateMode.PERSISTENT) .forPath(PATH + "/" + applicationName + "/" + sequenceName + "/" + node,localIp.getBytes()); } return node; } catch (Exception e){ log.error(e.getMessage(),e); System.exit(1); } return null; } private CuratorFramework createSimple() { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); return CuratorFrameworkFactory.newClient(NodeSelector.connectString, retryPolicy); } private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { return CuratorFrameworkFactory .builder() .connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) .build(); } /*** * 选择节点 * * @return 服务节点 */ private String selectNode(){ synchronized (NodeSelector.class){ try{ List<String> list = client.getChildren().forPath(PATH + "/" + applicationName + "/" + sequenceName); int minNodeData = 1; if (list.isEmpty()){ return StringUtils.leftPad(String.valueOf(minNodeData),3,"0"); } int [] pathDataArr = new int [list.size()]; int position = 0; for (String path : list){ pathDataArr[position] = Integer.valueOf(path); position ++; String data = new String(client.getData().forPath(PATH + "/" + applicationName + "/" + sequenceName + "/"+path)); if (!data.equals("") && data.equals(localIp)){ return StringUtils.leftPad(path,3,"0"); } } sort(pathDataArr); int node = pathDataArr[0] > minNodeData ? pathDataArr[0] - 1 : pathDataArr[pathDataArr.length -1] + 1; return StringUtils.leftPad(String.valueOf(node),3,"0"); } catch (Exception e) { log.error(e.getMessage(),e); } } return null; } /** * 冒泡排序 * * @param arr 需要排序的数组 */ private static void sort(int [] arr){ for(int i = 0 ; i < arr.length-1 ; i++){ for(int j = i+1 ; j < arr.length ; j++){ int temp ; if(arr[i] > arr[j]){ temp = arr[j]; arr[j] = arr[i]; arr[i] = temp; } } } } }
package com.system.commons.sequence.zk.utils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.util.Enumeration; /** * IP获取帮助类 * * */ @Slf4j public class IPHelper { private static volatile String IP_ADDRESS = ""; private static final String LOCAL_IP = "127.0.0.1"; /** * 获取本地IP * * @return IP地址 */ public static String getLocalIP() { if (StringUtils.isNotBlank(IP_ADDRESS)) { return IP_ADDRESS; } try { Enumeration allNetInterfaces = NetworkInterface.getNetworkInterfaces(); InetAddress ip; while (allNetInterfaces.hasMoreElements()) { NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement(); Enumeration addresses = netInterface.getInetAddresses(); while (addresses.hasMoreElements()) { ip = (InetAddress) addresses.nextElement(); if (ip != null && ip instanceof Inet4Address) { String tip = ip.getHostAddress(); if(LOCAL_IP.equals(tip)){ continue; } IP_ADDRESS = tip; return IP_ADDRESS; } } } } catch (SocketException e) { log.error("获取本机IP Socket异常:{}", e); }catch (Exception e) { log.error("获取本机IP异常:{}", e); } return LOCAL_IP; } }
package com.system.commons.sequence.redis.impl; import com.google.common.base.Strings; import com.system.commons.sequence.redis.SequenceFacade; import com.system.commons.sequence.redis.utils.SeqRedisManager; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTime; /** * 获取序列 * * <p> * 1 获取32位数字序列 * 2 获取定长序列 * </p> */ @Slf4j public class SequenceImpl extends SeqRedisManager implements SequenceFacade { /** * 日期格式:yyyyMMdd */ public static final String datePattern = "yyyyMMdd"; /** * 32长序列key */ public static final String LENGTH_32_KEY = "SEQUENCE:LENGTH_32_KEY"; /** * 自定义长序列key */ public static final String CUSTOM_KEY = "SEQUENCE:CUSTOM_KEY:"; /** * 获取32位数字序列 * * @return 32位长序列 */ @Override public String getUniqueSeq() { Long num = autoIncrement(LENGTH_32_KEY); String seq = DateTime.now().toString(datePattern) + Strings.padStart(String.valueOf(num), 24, ‘0‘); log.debug("32位序列:{}",seq); return seq; } /** * 获取定长序列 * 默认从0开始,超过指定长度后从0开始循环 * * @param key key * @param length 长度 * @return 指定长度序列 */ @Override public synchronized String getSeqByKey(String key, int length) { Long num = autoIncrement(CUSTOM_KEY+key); if(String.valueOf(num).length() > length){ num = autoIncrementBy(CUSTOM_KEY+key,-num); } String seq = Strings.padStart(String.valueOf(num), length, ‘0‘); log.debug("自定义长度序列:{}",seq); return seq; } }
package com.system.commons.sequence.redis.utils; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisSentinelConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import redis.clients.jedis.JedisPoolConfig; /** * redis实现 * * <p> * 1、redis 自增长 * 2、redis 增长定值 * </p> */ @Slf4j public class SeqRedisManager { /** * 用户 */ private static final String hostName = "requirepass" ; /** * 密码 */ private static final String pwd = "BaoFu@pay629" ; /** * 实例配置 */ @Setter public RedisSentinelConfiguration redisSentinelConfiguration; /** * 基础配置 */ @Setter public JedisPoolConfig jedisPoolConfig; /** * redisTemplate */ public StringRedisTemplate redisTemplate; /** * 初始化用户\密码 */ public void init(){ JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisSentinelConfiguration); jedisConnectionFactory.setHostName(hostName); jedisConnectionFactory.setPassword(pwd); jedisConnectionFactory.setPoolConfig(jedisPoolConfig); jedisConnectionFactory.afterPropertiesSet(); redisTemplate = new StringRedisTemplate(jedisConnectionFactory); log.info("序列初始化完成"); } /** * redis 自增长 * * @param keyEnum 关键字 */ public Long autoIncrement(final String keyEnum) { Long result = redisTemplate.execute((RedisConnection connection) -> { byte[] redisKey = redisTemplate.getStringSerializer().serialize(keyEnum); Long num = connection.incr(redisKey); log.debug("keyEnum:{} num:{}", keyEnum, num); return num; }); log.debug("incr response:{}", result); return result; } /** * redis 增长定值 * * @param keyEnum 关键字 * @param incrementBy 增长值 */ public Long autoIncrementBy(final String keyEnum,long incrementBy) { Long result = redisTemplate.execute((RedisConnection connection) -> { byte[] redisKey = redisTemplate.getStringSerializer().serialize(keyEnum); Long num = connection.incrBy(redisKey, incrementBy); log.debug("keyEnum:{} num:{},incrementBy:{}",keyEnum,num,incrementBy); return num; }); log.debug("incrBy response:{}", result); return result; } }
package com.system.commons.sequence.redis; /** * 获取序列 * * <p> * 1 获取32位数字序列 * 2 获取定长序列 * </p> */ public interface SequenceFacade { /** * 获取32位数字序列 * * @return 32位长序列 */ String getUniqueSeq(); /** * 获取定长序列 * 默认从0开始,超过指定长度后从0开始循环(可能出现全是0的序列) * * @param key key * @param length 长度 * @return 指定长度序列 */ String getSeqByKey(String key,int length); }
package com.system.commons.sequence; import com.system.commons.sequence.zk.SequenceFactory; import com.system.commons.sequence.zk.SequenceProducer; /** * 序列号测试类 */ public class Test { public static void main(String[] args) throws InterruptedException { String zkAddress = "10.0.21.56:2181"; for (int j = 0 ; j < 10 ; j ++) { new Thread (()->{ SequenceProducer producer = SequenceFactory.getProducer(zkAddress,"account","receipt",32); System.out.println("receiptNo => " +producer.getSequenceNo()); }).start(); new Thread (()->{ SequenceProducer producer = SequenceFactory.getProducer(zkAddress,"account","test",18); System.out.println("testNo =>" + producer.getSequenceNo()); }).start(); } Thread.sleep(1000 * 60 * 60); } }
本文出自 “让希望不再失望!” 博客,谢绝转载!
原文地址:http://peterz2011.blog.51cto.com/3186140/1839214