Zookeeper實現分佈式鎖

一、Zookeeper簡介

Zookeeper是一個分佈式協同工具,提供了可靠的分佈式節點數據管理、狀態同步、配置維護等功能,使得分佈式應用中各個節點能夠協同工作。它具有高性能,遵循CP原則,即一致性和分區容錯性,保證了數據的一致性;同時可實現高可用性,保證節點的可用性。Zookeeper提供了輕量級的HTTP API,可以通過Java和其他編程語言使用。

二、鎖概念

鎖(Lock)是一種同步機制,用於管理對共享資源的訪問。在分佈式環境中,鎖的實現變得更加困難,需要確保各個節點的同步和互斥。分佈式鎖就是在分佈式環境中,實現鎖的一種方式,保證各個節點的數據一致性和互斥性。

三、Zookeeper分佈式鎖實現

在分佈式鎖的實現中,先來看看Zookeeper的數據模型。Zookeeper中的數據模型是非常簡單的,類似於Unix文件系統,它也是一個樹狀結構,稱為ZNode。ZNode可以存儲一些數據,可以看成一個目錄,其中包含了一些子節點。每個ZNode都有一個版本號,可以實現對ZNode的樂觀鎖控制。

    /**
     * ZooKeeper實現分佈式鎖
     * 使用zk.create()方法的EPHEMERAL_SEQUENTIAL模式實現鎖的節點創建
     */

    private ZooKeeper zk;
    private String lockName;
    private String lockPath;
    private int sessionTimeout;

    public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException {
        this.lockName = lockName;
        this.sessionTimeout = sessionTimeout;
        // 連接zookeeper
        zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    System.out.println("已連接");
                }
            }
        });
    }
    
    // 創建鎖的節點
    private void createLockNode(String lockName) throws InterruptedException {
        try {
            String basePath = "/locks";
            // 創建鎖的節點,使用EPHEMERAL_SEQUENTIAL模式實現
            lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException e) {
             throw new RuntimeException(e);
        } 
    }

    // 獲取鎖
    public synchronized boolean getLock() throws InterruptedException {
        // 創建鎖節點
        createLockNode(lockName);

        // 獲取/locks下的所有節點
        List allLocks = zk.getChildren("/locks", false);

        // 對節點按照名稱進行排序
        Collections.sort(allLocks);

        // 取出所有鎖節點中最小的節點,並判斷是否為當前鎖
        String currentLockPath = lockPath.substring("/locks".length() + 1);
        int index = allLocks.indexOf(currentLockPath);
        if (index == 0) { // 已經獲取到鎖
            return true;
        } else { // 未獲取到鎖,刪除當前鎖節點
            String preLockPath = "/locks/" + allLocks.get(index - 1);
            Stat stat = zk.exists(preLockPath, true);
            if (stat == null) {
                return getLock();
            } else {
                CountDownLatch latch = new CountDownLatch(1);
                Watcher watcher = new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        latch.countDown();
                    }
                };
                zk.getData(preLockPath, watcher, stat);
                latch.await();
                return getLock();
            }
        }
    }

    // 釋放鎖
    public synchronized void releaseLock() {
        try {
            zk.delete(lockPath, -1);
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }

四、分佈式鎖實現的問題和優化

4.1 問題

在上述Zookeeper分佈式鎖的實現中,會出現「驚群效應」的問題。當有多個節點需要獲取鎖時,由於各個節點的狀態變化都會觸發Zookeeper的watcher機制,會使得羊群效應嚴重,性能嚴重下降。

4.2 優化

避免「驚群效應」的解決方案有很多種,這裡介紹一種基於Zookeeper的改進方案。在Zookeeper的watcher機制下,多個節點的狀態改變都會觸發,導致了「驚群效應」的問題。因此,解決方案就是減少狀態改變的次數,這樣能減少watcher的觸發次數,提高性能。

改進方案如下:

  • 只在需要獲取鎖時,才創建鎖節點
  • 在獲取到鎖時,傳入一個timeout參數作為鎖的過期時間,如果在指定的時間內鎖未被釋放,自動刪除鎖節點
  • 在釋放鎖時,先檢查鎖是否已經失效
    /**
     * ZooKeeper實現分佈式鎖 - 改進版
     */
    private ZooKeeper zk;
    private String lockName;
    private String lockPath;
    private int sessionTimeout;

    public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException {
        this.lockName = lockName;
        this.sessionTimeout = sessionTimeout;
        // 連接zookeeper
        zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    System.out.println("已連接");
                }
            }
        });
    }

    // 獲取鎖
    public synchronized boolean getLock(long timeout) throws InterruptedException {
        long start = System.currentTimeMillis();

        // 創建鎖節點
        createLockNode(lockName);

        // 獲取/locks下的所有節點
        List allLocks = zk.getChildren("/locks", false);

        // 對節點按照名稱進行排序
        Collections.sort(allLocks);

        // 取出所有鎖節點中最小的節點,並判斷是否為當前鎖
        String currentLockPath = lockPath.substring("/locks".length() + 1);
        int index = allLocks.indexOf(currentLockPath);
        if (index == 0) { // 已經獲取到鎖
            return true;
        } else { // 未獲取到鎖,監視前一個節點
            String preLockPath = "/locks/" + allLocks.get(index - 1);
            Stat stat = zk.exists(preLockPath, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            });
            if (stat == null) {
                return getLock(timeout - (System.currentTimeMillis() - start));
            } else { // 等待前一個鎖釋放
                synchronized (this) {
                    wait(timeout - (System.currentTimeMillis() - start));
                }
                return getLock(timeout);
            }
        }
    }

    // 創建鎖的節點
    private void createLockNode(String lockName) throws InterruptedException {
        try {
            String basePath = "/locks";
            // 創建鎖的節點,使用EPHEMERAL_SEQUENTIAL模式實現
            lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }

    // 釋放鎖
    public synchronized void releaseLock() {
        try {
            zk.delete(lockPath, -1);
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }

    // 檢查鎖是否已經失效
    private boolean isLockExpired(String currentLockPath, long timeout) throws Exception {
        List allLocks = zk.getChildren("/locks", false);

        int index = allLocks.indexOf(currentLockPath);
        if (index < 0) { // 當前鎖節點已經不存在了
            return true;
        } else if (index == 0) { // 已經獲取到了鎖
            return false;
        } else { // 未獲取到鎖,檢查前一個節點是否已經失效
            String preLockPath = "/locks/" + allLocks.get(index - 1);
            Stat stat = zk.exists(preLockPath, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            });

            if (stat == null) { // 前一個節點已經失效
                return isLockExpired(currentLockPath, timeout);
            } else { // 前一個節點未失效
                CountDownLatch latch = new CountDownLatch(1);
                Watcher watcher = new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        latch.countDown();
                    }
                };
                zk.getData(preLockPath, watcher, stat);
                latch.await(timeout, TimeUnit.MILLISECONDS);
                return isLockExpired(currentLockPath, timeout);
            }
        }
    }

五、總結

在分佈式系統中,實現分佈式鎖是比較複雜的一件事情。Zookeeper提供可靠的分佈式節點數據管理功能,適合實現分佈式鎖。通過Zookeeper的全局有序性和watcher機制,能夠確保分佈式鎖的同步和互斥。同時,為了避免「驚群效應」和提高分佈式鎖的性能,可以通過上述的優化方案。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/285420.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-22 15:44
下一篇 2024-12-22 15:44

相關推薦

  • KeyDB Java:完美的分佈式高速緩存方案

    本文將從以下幾個方面對KeyDB Java進行詳細闡述:KeyDB Java的特點、安裝和配置、使用示例、性能測試。 一、KeyDB Java的特點 KeyDB Java是KeyD…

    編程 2025-04-29
  • Java Hmily分佈式事務解決方案

    分佈式系統是現在互聯網公司架構中的必備項,但隨着業務的不斷擴展,分佈式事務的問題也日益凸顯。為了解決分佈式事務問題,Java Hmily分佈式事務解決方案應運而生。本文將對Java…

    編程 2025-04-28
  • Zookeeper ACL 用戶 anyone 全面解析

    本文將從以下幾個方面對Zookeeper ACL中的用戶anyone進行全面的解析,並為讀者提供相關的示例代碼。 一、anyone 的作用是什麼? 在Zookeeper中,anyo…

    編程 2025-04-28
  • JL Transaction – 實現分佈式事務管理的利器

    本文將為大家介紹JL Transaction,這是一款可以實現分佈式事務管理的開源事務框架,它可以幫助企業在分佈式環境下有效地解決事務的一致性問題,從而保障系統的穩定性和可靠性。 …

    編程 2025-04-28
  • 使用RPC研發雲實現分佈式服務交互

    本文將基於RPC研發雲,闡述分佈式服務交互實現的過程和實現方式。 一、RPC研發雲簡介 RPC研發雲是一種基於分佈式架構的服務框架,在處理不同語言之間的通信上變得越來越流行。通過使…

    編程 2025-04-28
  • 分佈式文件系統數據分佈算法

    數據分佈算法是分佈式文件系統中的重要技術之一,它能夠實現將文件分散存儲於各個節點上,提高系統的可靠性和性能。在這篇文章中,我們將從多個方面對分佈式文件系統數據分佈算法進行詳細的闡述…

    編程 2025-04-27
  • Zookeeper啟動詳解

    一、下載和安裝Zookeeper 1、訪問 https://zookeeper.apache.org/releases.html 下載最新的穩定版本。 wget https://m…

    編程 2025-04-25
  • Zookeeper默認端口的詳細解析

    一、Zookeeper端口的概念 Zookeeper是一個分佈式的協調服務,這意味着它需要在多台服務器之間進行通信。在Zookeeper通信的過程中,需要使用端口來進行標識和傳輸數…

    編程 2025-04-25
  • 使用Spring Cloud Redis實現分佈式緩存管理

    一、背景介紹 在分佈式互聯網應用中,緩存技術扮演着非常重要的角色。緩存技術能夠有效減輕數據庫的訪問壓力,提高應用的訪問速度。在分佈式應用中,如何統一管理分佈式緩存成為了一項挑戰。本…

    編程 2025-04-24
  • 使用Kubernetes(K8s)搭建分佈式系統

    一、Kubernetes概述 Kubernetes是一個用於自動部署、擴展和管理容器化應用程序的開源平台。其提供了高可用性、自我修復能力和易於擴展的特徵,使得大規模、高度可用的分佈…

    編程 2025-04-24

發表回復

登錄後才能評論