在機器學習和數據科學的現代世界中,非常容易找到與眾不同的 Python Tools。這些軟體包包括 scikit-learn、NumPy 或pands,它們不能根據內存使用或處理時間的數據進行適當擴展。
預計將轉向分散式計算工具(傳統上稱為 Apache Spark )。然而,這可能意味著為一個全新的系統重組工作流,在熟悉的 Python 生態系統和不同的 Java 虛擬機(JVM) 世界之間導航,並使開發工作流明顯複雜化。
Dask 庫用於將分散式計算能力與數據科學 Python 開發的靈活性相結合,與 Python 的標準數據工具無縫集成。
讓我們考慮一個場景:我們有一個數據集,可能是一組非常大的文本文件,以便放入機器的內存中。我們可以利用 Python 中的文件流和另一個生成器工具來迭代數據集,而無需將它們載入到內存中。然而,另一個問題會出現,因為程序仍然在單線程上工作,這最終會限制速度,即使在內存管理之後。
因此, Python 提供了一個被稱為全局解釋器 Look 的安全特性(換句話說,大多數開發人員使用 CPython )來用 Python 編寫並行代碼,但這可能有點棘手。
因此,很少有好的解決方案可供選擇。此類解決方案涉及使用 GIL 以外的低級工具(例如在 Python 以外的編譯代碼中執行多線程重載的 NumPy)或利用 Python 代碼包中的多個進程/線程,例如多進程或 joblib 。
然而,很難嘗試並行化來加快代碼的速度,因此,即使流程完成得很正確,在可讀性較差的代碼中,開發人員需要完全重新設計流程,但系統資源有限。
對於實際上像上面這樣的大規模困難,分散式計算可以被認為是一個普遍的關鍵。工作被分配給分散式系統中的多個獨立工作機器,而不是僅僅試圖在單個設備上的多個線程中工作。
這些自主工作機器在其處理器和磁碟空間或內存中處理數據集的塊。這些工作機只通過相對簡單的消息傳遞相互通信或通過中央調度進行通信,而不是像多線程代碼那樣共享磁碟空間和內存。
分散式計算系統還允許開發人員在相當大的數據集上擴展代碼,以便在任意數量的工作人員上並行運行,以換取設置集中式調度器並使工作人員與每個工作人員完全分離的複雜設計。
讓我們了解什麼是 Dask 以及它是如何工作的。
Dask 是與其他社區項目協同開發設計的免費開源庫,如 【Pandas】NumPy**scikit-learn。它是一個並行計算庫,通過任務工作者和任務調度器來分配更廣泛的計算,並將它們分解成更小的計算。 Dask** 庫在比內存更大的數據集上提供分散式並行和多核執行。
Dask 通過其低級調度器和高級集合提供不同的實用程序。
- 低級調度器:Dask 提供動態的任務調度器,並行處理任務圖。這些執行機器控制著高級集合。但是,我們可以使用它們來支持用戶定義的習慣和工作負載。這些調度器具有較低的延遲(大約 1 毫秒),並且努力在較小的內存佔用中處理計算。 Dask 中的調度器是在複雜情況下或其他任務調度系統(如 IPython 並行或 Luigi )中指導使用多進程和線程庫的替代方案。
- 高級集合:Dask 提供了模仿 Pandas、列表和 NumPy 的高級數組、數據幀和包集合。然而,我們可以在不適合內存的數據集上並行操作這些。對於大數據集來說, Dask 的高級集合是 Pandas 和 NumPy 的替代品。
Dask 的用例提供了幾個示例工作流,其中 Dask 可以被認為是完美的匹配。
Dask 提供的調度器主要有兩種:單機調度器和分散式調度器。
- 單機調度器:單機調度器針對大於內存利用率進行了優化。這個調度程序簡單、相似,而且使用起來很便宜;然而,由於在單一機器上工作,它無法擴展。
- 分散式調度器:與單機調度器相比,分散式調度器更加複雜,並且完全非同步(連續無阻塞對話)。
建議在大多數情況下使用分散式計劃程序,因為它提供了一個由多個表格和帶有實時信息的圖表組成的可調節的互動式儀錶板。默認情況下,在初始化群集時,它在埠 8787 可用。
在進入安裝部分之前,讓我們先了解一下 Dask 集群。
一個集群是一個分散式或並行處理系統,包含一組相互連接的獨立計算機,它們作為一個單一的集成計算資源一起支持運行。集群中的一個節點可以被認為是單個或多進程器系統,如個人計算機、工作站,甚至是 SMP。
在集群世界中有各種各樣的架構形式,以決定我們如何在計算機之間精確地分配工作。讓我們了解集群的組織是如何在 Dask 中完成的。
Dask 網路由三部分組成:
- 集中式調度器:集中式調度器管理工作人員,並為他們分配需要完成的任務。
- 許多工人:許多工人執行計算,保留結果,並與其他工人交流結果。
- 一個或多個客戶端:一個或多個客戶端可以通過 Jupyter 筆記本或腳本與用戶進行交互。這些客戶還將工作提交給時間表,以便對工人進行處理。
客戶端將向調度發送請求,描述用於計算的代碼類型。一旦接收到請求,調度器就在工人之間分配工作以完成請求,最後工人完成計算工作。
正如我們所觀察到的, Dask 將這些大量的數據計算分為多個小的計算。
還值得注意的是 Dask 也可以基於集群部署在各種技術上,比如:
- 不可思議的群集
- 高性能計算集群處理工作經理,如 LSF、PBS、SGE、思樂姆或任何其他常見的科學和學術實驗室。
- Spark 或 Hadoop 集群處理紗。
我們可以使用 Anaconda 或者 pip 來安裝 Dask 。
通過 Anaconda 安裝 Dask 的語法如下:
conda install dask
或
我們可以簡單的在終端或者命令提示符下使用以下命令通過 pip 安裝 Dask :
$ pip install dask[complete]
一旦我們成功安裝了 Dask 庫,讓我們了解一下 Dask 界面。
Dask 提供不同的用戶界面。這些介面包含一組不同的分散式計算並行演算法。下面為數據科學搜索從業者陳述了一些重要的用戶界面,以擴展 NumPy、Pandas 和 scikit-learn。
- 陣列:並行 NumPy
- 數據幀:平行 Pandas
- 機器學習:並行 Scikit-Learn
達斯克陣列
Dask 中的數組在阻塞演算法的幫助下提供了一個大於內存的、並行的、n 維數組。換句話說,它是 NumPy 數組的分散式形式。
下面這張圖片將幫助我們了解 Dask 陣列的外觀:
正如我們所看到的,多個 NumPy 數組被組織成網格,以形成一個 Dask 數組。當我們創建一個 Dask 數組時,我們可以規定卡盤的大小,這定義了 NumPy 數組的大小。例如,如果我們在一個數組中有十個值,並且提供的塊大小為五,那麼它將返回兩個 NumPy 數組,每個數組有五個值。
Dask 陣列提供了以下描述的一些重要特性:
- 大於內存: Dask Arrays 讓我們可以處理比可用內存更大的數據集。Dask 有助於將數組分解成許多小片段,在這些片段上運行以減少計算的內存佔用,並有效地從磁碟流式傳輸數據。
- 並行: Dask 陣列利用所有內核進行並行計算。
- 阻塞演算法: Dask Arrays 還提供阻塞演算法,以便在塊或子矩陣上操作,而不是在數組的整行或整列上運行。該函數通過處理許多小的計算來幫助執行大的計算。
這裡有一些使用 Dask 創建數組的簡單案例。
例 1:藉助 Dask 數組創建隨機數組
import dask.array as darray
# using arange for creating an array with values from 0 to 15
my_array = darray.arange(16, chunks = 5)
print( my_array.compute())
# using chunks for checking the size of each chunk
print(my_array.chunks)
輸出:
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
((5, 5, 5, 1),)
說明:
在上面的程序中,我們從 dask 庫中導入了數組模塊,並使用 arange() 方法創建了一個由 16 個值組成的數組,並將塊大小分別定義為 5。然後,我們使用計算()方法列印數組。我們還使用組塊功能檢查了每個組塊的大小。結果,我們得到了結果數組,我們還可以觀察到數組分布在四個塊中,其中第一個、第二個和第三個塊各包含五個值,第四個塊只有一個值。
示例 2:將 NumPy 數組轉換為 Dask 數組
import numpy as np
import dask.array as darray
first_array = np.arange(15)
second_array = darray.from_array(first_array, chunks = 5)
# resulting in a dask array
print(second_array.compute())
輸出:
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14]
說明:
在上例中,我們導入了 NumPy 庫和 dask 庫的數組模塊。然後,我們使用方法創建了一個由 15 個值組成的 NumPy 數組作為 first_array 。然後,我們使用 from_array() 方法將組塊分別定義為 5,將第一 _ 數組轉換為 Dask 數組作為第二 _ 數組。然後我們使用計算()函數列印數組。
此外,Dask 陣列支持 NumPy 陣列的大部分功能。例如,我們可以使用 mean() 、 sum() 等等。
例 3:計算前 100 個數之和
import numpy as np
import dask.array as darray
# arange is used to create array on values from 0 to 100
first_array = np.arange(100)
# converting numpy array to dask array
second_array = darray.from_array(first_array, chunks = (10))
# computing mean of the array
print(second_array.sum().compute())
輸出:
4950
說明:
在上例中,我們導入了 NumPy 庫和 Dask 庫的數組模塊,並使用保證函數創建了一個從 1 到 100 的 NumPy 數組。然後,我們將 NumPy 數組轉換為 Dask 數組,並使用 sum() 函數列印 Dask 數組值的總和。因此,我們得到了前 100 個數字的總和。
我們已經討論了 Dask Python 的基本介紹,但是幾乎沒有重要的概念需要討論。教程的其餘部分將在第二部分中介紹。
原創文章,作者:簡單一點,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/126346.html