一、Zookeeper简介
Zookeeper是一个分布式协同工具,提供了可靠的分布式节点数据管理、状态同步、配置维护等功能,使得分布式应用中各个节点能够协同工作。它具有高性能,遵循CP原则,即一致性和分区容错性,保证了数据的一致性;同时可实现高可用性,保证节点的可用性。Zookeeper提供了轻量级的HTTP API,可以通过Java和其他编程语言使用。
二、锁概念
锁(Lock)是一种同步机制,用于管理对共享资源的访问。在分布式环境中,锁的实现变得更加困难,需要确保各个节点的同步和互斥。分布式锁就是在分布式环境中,实现锁的一种方式,保证各个节点的数据一致性和互斥性。
三、Zookeeper分布式锁实现
在分布式锁的实现中,先来看看Zookeeper的数据模型。Zookeeper中的数据模型是非常简单的,类似于Unix文件系统,它也是一个树状结构,称为ZNode。ZNode可以存储一些数据,可以看成一个目录,其中包含了一些子节点。每个ZNode都有一个版本号,可以实现对ZNode的乐观锁控制。
/**
* ZooKeeper实现分布式锁
* 使用zk.create()方法的EPHEMERAL_SEQUENTIAL模式实现锁的节点创建
*/
private ZooKeeper zk;
private String lockName;
private String lockPath;
private int sessionTimeout;
public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException {
this.lockName = lockName;
this.sessionTimeout = sessionTimeout;
// 连接zookeeper
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
System.out.println("已连接");
}
}
});
}
// 创建锁的节点
private void createLockNode(String lockName) throws InterruptedException {
try {
String basePath = "/locks";
// 创建锁的节点,使用EPHEMERAL_SEQUENTIAL模式实现
lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
// 获取锁
public synchronized boolean getLock() throws InterruptedException {
// 创建锁节点
createLockNode(lockName);
// 获取/locks下的所有节点
List allLocks = zk.getChildren("/locks", false);
// 对节点按照名称进行排序
Collections.sort(allLocks);
// 取出所有锁节点中最小的节点,并判断是否为当前锁
String currentLockPath = lockPath.substring("/locks".length() + 1);
int index = allLocks.indexOf(currentLockPath);
if (index == 0) { // 已经获取到锁
return true;
} else { // 未获取到锁,删除当前锁节点
String preLockPath = "/locks/" + allLocks.get(index - 1);
Stat stat = zk.exists(preLockPath, true);
if (stat == null) {
return getLock();
} else {
CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
latch.countDown();
}
};
zk.getData(preLockPath, watcher, stat);
latch.await();
return getLock();
}
}
}
// 释放锁
public synchronized void releaseLock() {
try {
zk.delete(lockPath, -1);
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
四、分布式锁实现的问题和优化
4.1 问题
在上述Zookeeper分布式锁的实现中,会出现“惊群效应”的问题。当有多个节点需要获取锁时,由于各个节点的状态变化都会触发Zookeeper的watcher机制,会使得羊群效应严重,性能严重下降。
4.2 优化
避免“惊群效应”的解决方案有很多种,这里介绍一种基于Zookeeper的改进方案。在Zookeeper的watcher机制下,多个节点的状态改变都会触发,导致了“惊群效应”的问题。因此,解决方案就是减少状态改变的次数,这样能减少watcher的触发次数,提高性能。
改进方案如下:
- 只在需要获取锁时,才创建锁节点
- 在获取到锁时,传入一个timeout参数作为锁的过期时间,如果在指定的时间内锁未被释放,自动删除锁节点
- 在释放锁时,先检查锁是否已经失效
/**
* ZooKeeper实现分布式锁 - 改进版
*/
private ZooKeeper zk;
private String lockName;
private String lockPath;
private int sessionTimeout;
public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException {
this.lockName = lockName;
this.sessionTimeout = sessionTimeout;
// 连接zookeeper
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
System.out.println("已连接");
}
}
});
}
// 获取锁
public synchronized boolean getLock(long timeout) throws InterruptedException {
long start = System.currentTimeMillis();
// 创建锁节点
createLockNode(lockName);
// 获取/locks下的所有节点
List allLocks = zk.getChildren("/locks", false);
// 对节点按照名称进行排序
Collections.sort(allLocks);
// 取出所有锁节点中最小的节点,并判断是否为当前锁
String currentLockPath = lockPath.substring("/locks".length() + 1);
int index = allLocks.indexOf(currentLockPath);
if (index == 0) { // 已经获取到锁
return true;
} else { // 未获取到锁,监视前一个节点
String preLockPath = "/locks/" + allLocks.get(index - 1);
Stat stat = zk.exists(preLockPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
synchronized (this) {
notifyAll();
}
}
});
if (stat == null) {
return getLock(timeout - (System.currentTimeMillis() - start));
} else { // 等待前一个锁释放
synchronized (this) {
wait(timeout - (System.currentTimeMillis() - start));
}
return getLock(timeout);
}
}
}
// 创建锁的节点
private void createLockNode(String lockName) throws InterruptedException {
try {
String basePath = "/locks";
// 创建锁的节点,使用EPHEMERAL_SEQUENTIAL模式实现
lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
// 释放锁
public synchronized void releaseLock() {
try {
zk.delete(lockPath, -1);
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
// 检查锁是否已经失效
private boolean isLockExpired(String currentLockPath, long timeout) throws Exception {
List allLocks = zk.getChildren("/locks", false);
int index = allLocks.indexOf(currentLockPath);
if (index < 0) { // 当前锁节点已经不存在了
return true;
} else if (index == 0) { // 已经获取到了锁
return false;
} else { // 未获取到锁,检查前一个节点是否已经失效
String preLockPath = "/locks/" + allLocks.get(index - 1);
Stat stat = zk.exists(preLockPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
synchronized (this) {
notifyAll();
}
}
});
if (stat == null) { // 前一个节点已经失效
return isLockExpired(currentLockPath, timeout);
} else { // 前一个节点未失效
CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
latch.countDown();
}
};
zk.getData(preLockPath, watcher, stat);
latch.await(timeout, TimeUnit.MILLISECONDS);
return isLockExpired(currentLockPath, timeout);
}
}
}
五、总结
在分布式系统中,实现分布式锁是比较复杂的一件事情。Zookeeper提供可靠的分布式节点数据管理功能,适合实现分布式锁。通过Zookeeper的全局有序性和watcher机制,能够确保分布式锁的同步和互斥。同时,为了避免“惊群效应”和提高分布式锁的性能,可以通过上述的优化方案。
原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/285420.html
微信扫一扫
支付宝扫一扫