一、Zookeeper簡介
Zookeeper是一個分布式協同工具,提供了可靠的分布式節點數據管理、狀態同步、配置維護等功能,使得分布式應用中各個節點能夠協同工作。它具有高性能,遵循CP原則,即一致性和分區容錯性,保證了數據的一致性;同時可實現高可用性,保證節點的可用性。Zookeeper提供了輕量級的HTTP API,可以通過Java和其他編程語言使用。
二、鎖概念
鎖(Lock)是一種同步機制,用於管理對共享資源的訪問。在分布式環境中,鎖的實現變得更加困難,需要確保各個節點的同步和互斥。分布式鎖就是在分布式環境中,實現鎖的一種方式,保證各個節點的數據一致性和互斥性。
三、Zookeeper分布式鎖實現
在分布式鎖的實現中,先來看看Zookeeper的數據模型。Zookeeper中的數據模型是非常簡單的,類似於Unix文件系統,它也是一個樹狀結構,稱為ZNode。ZNode可以存儲一些數據,可以看成一個目錄,其中包含了一些子節點。每個ZNode都有一個版本號,可以實現對ZNode的樂觀鎖控制。
/**
* ZooKeeper實現分布式鎖
* 使用zk.create()方法的EPHEMERAL_SEQUENTIAL模式實現鎖的節點創建
*/
private ZooKeeper zk;
private String lockName;
private String lockPath;
private int sessionTimeout;
public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException {
this.lockName = lockName;
this.sessionTimeout = sessionTimeout;
// 連接zookeeper
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
System.out.println("已連接");
}
}
});
}
// 創建鎖的節點
private void createLockNode(String lockName) throws InterruptedException {
try {
String basePath = "/locks";
// 創建鎖的節點,使用EPHEMERAL_SEQUENTIAL模式實現
lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
// 獲取鎖
public synchronized boolean getLock() throws InterruptedException {
// 創建鎖節點
createLockNode(lockName);
// 獲取/locks下的所有節點
List allLocks = zk.getChildren("/locks", false);
// 對節點按照名稱進行排序
Collections.sort(allLocks);
// 取出所有鎖節點中最小的節點,並判斷是否為當前鎖
String currentLockPath = lockPath.substring("/locks".length() + 1);
int index = allLocks.indexOf(currentLockPath);
if (index == 0) { // 已經獲取到鎖
return true;
} else { // 未獲取到鎖,刪除當前鎖節點
String preLockPath = "/locks/" + allLocks.get(index - 1);
Stat stat = zk.exists(preLockPath, true);
if (stat == null) {
return getLock();
} else {
CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
latch.countDown();
}
};
zk.getData(preLockPath, watcher, stat);
latch.await();
return getLock();
}
}
}
// 釋放鎖
public synchronized void releaseLock() {
try {
zk.delete(lockPath, -1);
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
四、分布式鎖實現的問題和優化
4.1 問題
在上述Zookeeper分布式鎖的實現中,會出現“驚群效應”的問題。當有多個節點需要獲取鎖時,由於各個節點的狀態變化都會觸發Zookeeper的watcher機制,會使得羊群效應嚴重,性能嚴重下降。
4.2 優化
避免“驚群效應”的解決方案有很多種,這裡介紹一種基於Zookeeper的改進方案。在Zookeeper的watcher機制下,多個節點的狀態改變都會觸發,導致了“驚群效應”的問題。因此,解決方案就是減少狀態改變的次數,這樣能減少watcher的觸發次數,提高性能。
改進方案如下:
- 只在需要獲取鎖時,才創建鎖節點
- 在獲取到鎖時,傳入一個timeout參數作為鎖的過期時間,如果在指定的時間內鎖未被釋放,自動刪除鎖節點
- 在釋放鎖時,先檢查鎖是否已經失效
/**
* ZooKeeper實現分布式鎖 - 改進版
*/
private ZooKeeper zk;
private String lockName;
private String lockPath;
private int sessionTimeout;
public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException {
this.lockName = lockName;
this.sessionTimeout = sessionTimeout;
// 連接zookeeper
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
System.out.println("已連接");
}
}
});
}
// 獲取鎖
public synchronized boolean getLock(long timeout) throws InterruptedException {
long start = System.currentTimeMillis();
// 創建鎖節點
createLockNode(lockName);
// 獲取/locks下的所有節點
List allLocks = zk.getChildren("/locks", false);
// 對節點按照名稱進行排序
Collections.sort(allLocks);
// 取出所有鎖節點中最小的節點,並判斷是否為當前鎖
String currentLockPath = lockPath.substring("/locks".length() + 1);
int index = allLocks.indexOf(currentLockPath);
if (index == 0) { // 已經獲取到鎖
return true;
} else { // 未獲取到鎖,監視前一個節點
String preLockPath = "/locks/" + allLocks.get(index - 1);
Stat stat = zk.exists(preLockPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
synchronized (this) {
notifyAll();
}
}
});
if (stat == null) {
return getLock(timeout - (System.currentTimeMillis() - start));
} else { // 等待前一個鎖釋放
synchronized (this) {
wait(timeout - (System.currentTimeMillis() - start));
}
return getLock(timeout);
}
}
}
// 創建鎖的節點
private void createLockNode(String lockName) throws InterruptedException {
try {
String basePath = "/locks";
// 創建鎖的節點,使用EPHEMERAL_SEQUENTIAL模式實現
lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
// 釋放鎖
public synchronized void releaseLock() {
try {
zk.delete(lockPath, -1);
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
// 檢查鎖是否已經失效
private boolean isLockExpired(String currentLockPath, long timeout) throws Exception {
List allLocks = zk.getChildren("/locks", false);
int index = allLocks.indexOf(currentLockPath);
if (index < 0) { // 當前鎖節點已經不存在了
return true;
} else if (index == 0) { // 已經獲取到了鎖
return false;
} else { // 未獲取到鎖,檢查前一個節點是否已經失效
String preLockPath = "/locks/" + allLocks.get(index - 1);
Stat stat = zk.exists(preLockPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
synchronized (this) {
notifyAll();
}
}
});
if (stat == null) { // 前一個節點已經失效
return isLockExpired(currentLockPath, timeout);
} else { // 前一個節點未失效
CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
latch.countDown();
}
};
zk.getData(preLockPath, watcher, stat);
latch.await(timeout, TimeUnit.MILLISECONDS);
return isLockExpired(currentLockPath, timeout);
}
}
}
五、總結
在分布式系統中,實現分布式鎖是比較複雜的一件事情。Zookeeper提供可靠的分布式節點數據管理功能,適合實現分布式鎖。通過Zookeeper的全局有序性和watcher機制,能夠確保分布式鎖的同步和互斥。同時,為了避免“驚群效應”和提高分布式鎖的性能,可以通過上述的優化方案。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/285420.html
微信掃一掃
支付寶掃一掃