标签:日志文件 行锁 删除 ati 计数 todo name 学习 事务
定义:Redis 是完全开源免费基于内存亦可持久化的,遵守 BSD 协议,是一个高性能的 key-value 数据库。
特点:
优势:
Redis提供两种持久化机制 RDB 和 AOF 机制。
RDB(Redis DataBase)持久化方式
定义:记录 redis 数据库的所有键值对,在某个时间点将数据写入一个临时文件,持久化结束后,用这个临时文件替换上次持久化的文件,达到数据恢复。
优势:
劣势:
AOF(Append-only file)持久化方式
定义:把写操作指令,持续的写到一个类似日志文件里,保存为 aof 文件。
优势:
劣势:
##设置键“ZHKey”的过期时间为60s(单位默认为秒) EXPIRE ZHKey 60 ##移除键“ZHKey”的过期时间,使得键永不过期 PERSIST ZHKey
解析:
volatile 和 allkeys 规定了是对已设置过期时间的数据集淘汰数据还是从全部数据集淘汰数据,后面的 lru、ttl 以及 random 是三种不同的淘汰策略,再加上一种 no-enviction 永不回收的策略。
使用规则:
PS:每次使用redis新增数据时,会检查内存是否越界,如果大于 maxmemory 的限制,则根据设定好的策略进行回收。
(1)Redis 为了达到最快的读写速度将数据都读到内存中,并通过异步的方式将数据写入磁盘。所以 redis 具有快速和数据持久化的特征。
(2)如果不将数据放在内存中,磁盘 I/O 速度会严重影响 redis 的性能。
(3)如果设置了最大使用的内存,则数据已有记录数达到内存限值后不能继续插入新值。
redis中的集群分为三种:主从模式、哨兵模式、集群模式
特点:从服务器会对主服务器的数据和操作命令进行复制,主服务器可进行读写功能,从服务器只能进行读功能。
全量同步
Redis全量复制一般发生在Slave(从服务器)初始化阶段,这时Slave需要将Master上的所有数据都复制一份。具体步骤如下:
借用资料的一张图:(原文戳这里)
增量同步
Redis增量复制是指Slave(从服务器)初始化后开始正常工作时主服务器发生的写操作同步到从服务器的过程。增量复制的过程主要是主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令。
同步策略
使用方式:
#启动从服务器(6380端口),监听主服务器(6379端口),或者在redis的conf文件里面配置 redis-server --port 6380 --slaveof 127.0.0.1 6379
存在缺陷:当主服务器挂了的时候,从服务器无法提供写功能,凉凉思密达。
小结:首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
由于主从模式中的主服务器宕机后,无法继续进行写操作(从机只能进行读操作),那不就凉凉了么,所以引出哨兵模式。
概念:哨兵的作用就是监控redis主、从数据库是否正常运行,主出现故障自动将从数据库转换为主数据库。
哨兵的任务:(详细工作流程图解看这里)
监控: Sentinel 会不断地检查你的主服务器和从服务器是否运作正常。
提醒: 当被监控的某个 Redis 服务器出现问题时, Sentinel 可以通过 API 向管理员或者其他应用程序发送通知。
自动故障迁移:
使用方式:
##1主2从1哨兵 #启动主服务器,端口为6379 redis-server --port 6379 #启动从服务器,端口为6380,并监听IP为192.168.0.167 6379的主服务器 redis-server --port 6380 --slaveof 192.168.0.167 6379 #启动从服务器,端口为6381,并监听IP为192.168.0.167 6379的主服务器 redis-server --port 6381 --slaveof 192.168.0.167 6379 #配置哨兵文件sentinel.conf,这里的1代表1个哨兵
PS:只用配置主服务器就好,哨兵会自动识别从服务器 sentinel monitor mymaster 192.168.0.167 6379 1
我们常见的集群模式一般为“三主三从”,至少需要六台服务器来配合部署(至少需要三个主数据库才能正常运行,且太少达不到集群效果,可以选择哨兵模式),具体的部署流程可以看这里
特点:
使用方式:
#1、开启redis的集群模式 cluster-enabled yes #2、配置集群模式下的配置文件 cluster-config-file nodes-6379.conf #3、集群内节点之间支持最长响应时间 cluster-node-timeout 15000 #4、借助redis-tri.rb的工具可以部署集群,执行以下命令就可以成功创建集群 PS:--replicas 后面的1表示每台主服务器配置1个从服务器 redis-trib.rb create --replicas 1 127.0.0.1:6379 127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6384
安装redis-tri.rb命令 、redis-trib.rb命令详解
异步复制,并不能保证数据的强一致性。
事务相关的命令:(详细讲解看这里)
PS:redis的每条命令是原子性的,但是事务不支持原子性,即事务块中的某条语句报错后,其他语句一样会执行,且没有回滚。
选用Redis实现分布式锁原因:(1)Redis有很高的性能;(2)Redis命令对此支持较好,实现起来比较方便
相关命令:
SETNX
可以用于加锁(当键存在返回0,不存在设置一个新的键值对并返回1)
##当且仅当键“ZHKey”不存在时,set一个键为“ZHKey”的字符串键值对,返回1;若键“ZHKey”存在,则什么都不做,返回0
SETNX ZHKey 值
EXPIRE
设置键的超时时间
##为键设置一个超时时间,单位为秒,超过这个时间锁会自动释放,避免死锁。 EXPIRE ZHKey 60 #设置键为“ZHKey”的键值对,超时时间为60秒
DELETE
删除指定的键值对
##删除键为“ZHKey”的键值对
DELETE ZHKey
实现思想:(可参考此处)
/** * Redis实现分布式锁 */ public class RedisLock { //JedisPool(Jedis线程池) private final JedisPool jedisPool; /** * 构造方法实例化JedisPool * * @param jedisPool */ public RedisLock(JedisPool jedisPool) { this.jedisPool = jedisPool; } /** * 加锁 * * @param lockName 锁的key * @param acquireTimeout 获取超时时间 * @param timeout 锁的超时时间 * @return 锁标识(即锁的值 , 用于释放锁时的校验) */ public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) { //Jedis实例 Jedis conn = null; //锁的值(用于释放锁时的校验) String redisValue = null; try { // 获取连接 conn = jedisPool.getResource(); // 随机生成一个randomValue String randomValue = UUID.randomUUID().toString(); // 锁名,即key值 String lockKey = "lock:" + lockName; // 超时时间,上锁后超过此时间则自动释放锁 int lockExpire = (int) (timeout / 1000); // 获取锁的超时时间,超过这个时间则放弃获取锁 long end = System.currentTimeMillis() + acquireTimeout; // 当处于锁的超时时间之内时,进行判断(在超时时间内或者方法体返回值之前,会一直处于while块中) while (System.currentTimeMillis() < end) { if (conn.setnx(lockKey, randomValue) == 1) { //不存在当前键时,设置一个新的键值对,并设置超时时间 conn.expire(lockKey, lockExpire); // 返回randomValue值,用于释放锁时间确认 redisValue = randomValue; return redisValue; } // 返回-1代表key没有设置超时时间,为key设置一个超时时间 if (conn.ttl(lockKey) == -1) { conn.expire(lockKey, lockExpire); } try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (JedisException e) { e.printStackTrace(); } finally { //关闭Jedis if (conn != null) { conn.close(); } } return redisValue; } /** * 释放锁 * * @param lockName 锁的key * @param randomValue 释放锁的标识(即锁的值) * @return */ public boolean releaseLock(String lockName, String randomValue) { //Jedis实例 Jedis conn = null; // 锁名,即key值 String lockKey = "lock:" + lockName; //锁是否释放成功标识 boolean releaseFlag = false; try { // 获取连接 conn = jedisPool.getResource(); while (true) { // 监视lock,准备开始事务 conn.watch(lockKey); // 通过前面返回的randomValue值判断是不是该锁,若是该锁,则删除,释放锁 if (randomValue.equals(conn.get(lockKey))) { //conn.multi()标记开始事务 Transaction transaction = conn.multi(); //删除键值对 transaction.del(lockKey); //transaction.exec()执行事务块内命令,并返回结果集 List<Object> results = transaction.exec(); if (results == null) { continue; } releaseFlag = true; } //解除watch对所有key的监控 conn.unwatch(); break; } } catch (JedisException e) { e.printStackTrace(); } finally { //关闭Jedis if (conn != null) { conn.close(); } } return releaseFlag; } }
业务实现代码块:
/** * 模拟使用redis实现分布式锁的业务代码 */ public class redisLockService { //JedisPool池对象 private static JedisPool pool = null; //Redis分布式锁对象 private RedisLock lock = new RedisLock(pool); //静态代码块初始化JedisPool(最好放在配置文件中读取) static { JedisPoolConfig config = new JedisPoolConfig(); // 设置最大连接数(表示同时最多有100个数据库连接) config.setMaxTotal(100); // 设置最大空闲数(表示连接池中最多可空闲maxIdle个连接 ,这里取值为20,表示即使没有数据库连接时依然可以保持20空闲的连接,而不被清除,随时处于待命状态。) config.setMaxIdle(8); // 设置最大连接等待时间 config.setMaxWaitMillis(1000 * 100); // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的 config.setTestOnBorrow(true); pool = new JedisPool(config, "127.0.0.1", 6379, 3000); } public void lockSomeThings() { //加锁,返回锁的value值,供解锁时使用 String redisValue = lock.lockWithTimeout("goodsStockReduce", 5000, 1000); //TODO 这里可添加需要使用分布式锁的代码(如库存扣减服务) System.out.println(Thread.currentThread().getName() + "获得了锁"); //解锁 lock.releaseLock("resource", redisValue); } }
标签:日志文件 行锁 删除 ati 计数 todo name 学习 事务
原文地址:https://www.cnblogs.com/riches/p/12234153.html