redis分布式锁的⼏种实现⽅式,以及Redisson的配置和使⽤最近在开发中涉及到了多个客户端的对redis的某个key同时进⾏增删的问题。这⾥就会涉及⼀个问题:锁
先举例在分布式系统中不加锁会出现问题:
  redis中存放了某个⽤户的账户余额,例如100 (⽤户id:余额)
  A端需要对⽤户扣费-1,需要两步:
    A1.将该⽤户的⽬前余额取出来(100)
    A2.将余额扣除⼀部分(99)后再插⼊到redis中
  B端需要对⽤户充值+10,需要两步:
    B1.将该⽤户的⽬前余额取出来(99)
    B2.将余额添加充值额度(109)后再插⼊到redis中
  我们的期望执⾏顺序是A1、A2、B1、B2  结果就会是109
  但是如果不加锁,就会出现A1、B1、A2、B2(110)或者其他各种随机情况,这样就会造成数据错误。
Redis加锁的⼏种实现⽅式
  ⽅式⼀,⾃⼰造轮⼦
    之前参考的很多博客,关于redis加锁都是先setNX()获取锁,然后再setExpire()设置锁的有效时间。
    然⽽这样的话获取锁的操作就不是原⼦性的了,如果setNX后系统宕机,就会造成锁死,系统阻塞。
    根据官⽅的推荐(redis.io/topics/distlock),最好使⽤set命令:SET key value [EX seconds] [PX milliseconds] [NX|XX]           EX PX设置有效时间    NX属性的作⽤就是如果key存在就返回失败,否则插⼊数据。
    需要注意的是:
      在Redis 2.6.12之前,set只能返回OK,所以⽆法判断操作是否成功,所以也就不适⽤。
      如果使⽤的是spring-boot-starter-data-redis依赖,那么在2.x版本之前的接⼝也不⽀持上述的set操作
    java代码:
//获取锁
//锁的键值需要具有标志性。
//例如,现在有两个系统需要对key=user_id,value=user_balance进⾏操作,这时就可以设计这个键的锁为user_id+"_key"
String user_id="1";
String key=user_id+"_key";
//值设置为⼀个随机数(下⾯讲原因)
String random_value=UUID.randomUUID().toString();
//只有2.0以上的版本才⽀持set返回插⼊结果Boolean
//此命令的意思是只有key不存在,才插⼊值,并且设置有效时间为10s
connection.Bytes(), Bytes(), Expiration.seconds(10), SetOption.SET_IF_ABSENT);
//本⽰例由于依赖版本低于2.0,所以⽆法接受set设置结果
Boolean result=true;
return result;
});
//进⾏更新操作...
//释放锁
//为什么释放之前要⽐较⼀下?
//这是为了防⽌删除掉别⼈的锁,例如此场景中:如果我们的中间操作超过了10s那么锁会⾃动释放,这时别⼈会再获取锁。
//如果我们执⾏完中间就直接删除锁的话,就会把别⼈的锁删除
if(redisTemplate.opsForValue().get(key)==random_value) {
redisTemplate.delete(key);
}
可以发现,如果⾃⼰来实现的话,受限很多。并且这还是最基本的操作,包括出错重试等功能都没有。
所以我们要学习redis推荐的reids⼯具redisson
  ⽅式⼆:集成redisson(github/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95)
  ⼀.添加依赖
<dependency>
<groupId&disson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.1</version>
</dependency>
  ⼆.在resources⽂件夹添加配置⽂件l
singleServerConfig:
#连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
pingTimeout: 1000
#连接超时,单位:毫秒
connectTimeout: 10000
#命令等待超时,单位:毫秒
timeout: 3000
#命令失败重试次数
retryAttempts: 3
#命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
#重新连接时间间隔,单位:毫秒
reconnectionTimeout: 3000
#执⾏失败最⼤次数
failedAttempts: 3
#单个连接最⼤订阅数量
subscriptionsPerConnection: 5
setoption
#客户端名称
clientName: null
#地址
address: "redis://192.168.1.16:6379"
#数据库编号
database: 0
#密码
password: xiaokong
#发布和订阅连接的最⼩空闲连接数
subscriptionConnectionMinimumIdleSize: 1
#发布和订阅连接池⼤⼩
subscriptionConnectionPoolSize: 50
#最⼩空闲连接数
connectionMinimumIdleSize: 32
#连接池⼤⼩
connectionPoolSize: 64
#是否启⽤DNS监测
dnsMonitoring: false
#DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !&dec.JsonJacksonCodec> {}
transportMode : "NIO"
  三.在Application中设置RedissonClient
batis.spring.annotation.MapperScan;
disson.Redisson;
disson.api.RedissonClient;
fig.Config;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import t.annotation.Bean;
import io.ClassPathResource;
import ansaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableTransactionManagement
@MapperScan("apper")
public class Application {
public static void main(String [] args) {
SpringApplication.run(Application.class, args);
}
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
RedissonClient redisson = ate(
Config.fromYAML(new ClassPathResource("l").getInputStream()));
return redisson;
}
}
  四.在代码中使⽤
@Autowired
private RedissonClient redisson;
@Test
public void redisson() {
String user_id="1";
String key=user_id+"_key";
//获取锁
RLock lock = Lock(key);
lock.lock();
/
/执⾏具体逻辑...
RBucket<Object> bucket = Bucket("a");
bucket.set("bb");
lock.unlock();
}
需要注意的是redisson的使⽤和redisTemplate有⽐较⼤的区别,这⾥简单介绍⼀下⼏个特性:(刚⽤时迷了很久,希望⼤家能少⾛些弯路)
1.在redisson中不需要set指令,举个例⼦:
RBucket<Object> bucket = Bucket("a");
bucket.set("bb");
在这两条语句中,我们只获取了key="a"的bucket类型对象(⾥⾯可以装⼀个任意对象)。然后修改bucket⾥⾯⼀个值,其实这时["a","bb"]已经被存⼊redis了
2.所有的值都是结构体
和上例的RBucket结构体⼀样,redisson提供了⼗⼏种结构体(github/redisson/redisson/wiki/7.-
%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88)供我们使⽤,当取值时,redisson也会⾃动将值转换成对应的结构体。所以如果使⽤redisson取redisTemplate放⼊的值,就要⼩⼼报错
⽅式三.基于redlock的算法讨论
这种我还没有具体实现过,为什么会出现这种算法,主要是应对redis服务器宕机的问题。当redis宕机时,即使有主从,但是依然会有⼀个同步间隔。这样就会造成数据流失。
当然,更为严重的是,在分布式情况下,丢失的是锁,我们知道⼀般⽤锁的数据都是⽐较重要的。
⼀个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B⼜可以获得⼀个锁。这样就会造成数据错误。
redlock主要思想就是做数据冗余。建⽴5台独⽴的集,当我们发送⼀个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错。
当然具体是很复杂,想研究的可以看看(redis.io/topics/distlock)
⽅式四.使⽤zookeeper+redis来管理锁
就像之前讨论的,⽅式2只能保证客户端的正确,却⽆法保证服务端的宕机数据丢失。⽅式三的数据完整性很⾼,但是管理起来很复杂。这时就有了⼀个折中的做法:
将锁存放在zookeeper中,由于zookeeper与redis的场景不同,所以zookeeper的算法对数据的完整性要求很⾼。在分布式的zookeeper中,数据是很难丢失的。
这样,我们就可以把锁放到zookeeper中,来保证锁的完整性。
好吧,这个我也没有实验过(羞耻),不过⽹上⼜很多这⽅⾯的博客,以后⽤到再说吧~~~⼀般项⽬⽤⽅式⼆就可以啦,