簡述hadoop搭建步驟「hadoop編程的介紹」

前言

之前工作中,有接觸到大數據的需求,雖然當時我們體系有專門的大數據部門,但是由於當時我們中台重構,整個體系的開發量巨大,共用一個大數據部門,人手已經忙不過來,沒法辦,為了趕時間,我自己負責的系統的大數據相關操作,由我們自己承擔了。此前對大數據的知識了解的很少,於是晚上回去花時間突擊大數據知識,白天就開始上手干,一邊學一邊做,總算在部門規定的時間,跟系統一起上線了。後來的維護迭代就交給大數據去了,雖然接觸大數據的時間不長,但是對我來說,確是很有意思的一段經歷,覺得把當時匆匆學的知識點,再仔細回顧回顧,整理下。

01 大數據概述

大數據: 就是對海量數據進行分析處理,得到一些有價值的信息,然後幫助企業做出判斷和決策.

處理流程:

  • ​ 1:獲取數據
  • ​ 2:處理數據
  • ​ 3:展示結果

02 Hadoop介紹

Hadoop是一個分布式系基礎框架,它允許使用簡單的編程模型跨大型計算機的大型數據集進行分布式處理.它主要解決兩個問題

  • 大數據存儲問題: HDFS
  • 大數據計算問題:MapReduce

2.1 問題一: 大文件怎麼存儲?

假設一個文件非常非常大,大小為1PB/a.txt, 大到世界上所有的高級計算機都存儲不下, 怎麼辦?

  • 為了保存大文件, 需要把文件放在多個機器上
    • 文件要分塊 block(128M)
    • 不同的塊放在不同的 HDFS 節點
  • 同時為了對外提供統一的訪問, 讓外部可以像是訪問本機一樣訪問分布式文件系統
    • 有一個統一的 HDFS Master
    • 它保存整個系統的文件信息
    • 所有的文件元數據的修改都從 Master 開始

2. 2 問題二: 大數據怎麼計算?

從一個網絡日誌文件中計算獨立 IP, 以及其出現的次數
如果數據量特別大,我們可以將,整個任務拆開, 劃分為比較小的任務, 從而進行計算呢。

2.3 問題三: 如何將這些計算任務跑在集群中?

如果能夠在不同的節點上並行執行, 更有更大的提升, 如何把這些任務跑在集群中?

  • 可以設置一個集群的管理者, 這個地方叫做 Yarn這個集群管理者有一個 Master, 用於接收和分配任務這個集群管理者有多個 Slave, 用於運行任務

2.4 Hadoop 的組成

  • Hadoop分布式文件系統(HDFS) 提供對應用程序數據的高吞吐量訪問的分布式文件系統
  • Hadoop Common 其他Hadoop模塊所需的Java庫和實用程序。這些庫提供文件系統和操作系統級抽象,並包含啟動Hadoop所需的必要Java文件和腳本
  • Hadoop MapReduce 基於YARN的大型數據集並行處理系統
  • Hadoop YARN 作業調度和集群資源管理的框架

2.5 Hadoop前生今世

  1. Hadoop最早起源於Nutch。Nutch的設計目標是構建一個大型的全網搜索引擎,包括網頁抓取、索引、查詢等功能,但隨着抓取網頁數量的增加,遇到了嚴重的可擴展性問題——如何解決數十億網頁的存儲和索引問題。
  2. 2003年、2004年谷歌發表的兩篇論文為該問題提供了可行的解決方案。

——分布式文件系統(GFS),可用於處理海量網頁的存儲

——分布式計算框架MAPREDUCE,可用於處理海量網頁的索引計算問題。

  1. Nutch的開發人員完成了相應的開源實現HDFSMAPREDUCE,並從Nutch中剝離成為獨立項目HADOOP,到2008年1月,HADOOP成為Apache頂級項目.

狹義上來說,hadoop就是單獨指代hadoop這個軟件,

HDFS :分布式文件系統

MapReduce : 分布式計算系統

廣義上來說,hadoop指代大數據的一個生態圈,包括很多其他的軟件

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

2.6 hadoop的架構模型

1.x的版本架構模型介紹

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

文件系統核心模塊:

NameNode:集群當中的主節點,管理元數據(文件的大小,文件的位置,文件的權限),主要用於管理集群當中的各種數據

secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

數據計算核心模塊:

JobTracker:接收用戶的計算請求任務,並分配任務給從節點

TaskTracker:負責執行主節點JobTracker分配的任務

2.x的版本架構模型介紹

第一種:NameNode與ResourceManager單節點架構模型

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據

secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

數據計算核心模塊:

ResourceManager:接收用戶的計算請求任務,並負責集群的資源分配

NodeManager:負責執行主節點APPmaster分配的任務

第二種:NameNode單節點與ResourceManager高可用架構模型

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據

secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

數據計算核心模塊:

ResourceManager:接收用戶的計算請求任務,並負責集群的資源分配,以及計算任務的劃分,通過zookeeper實現ResourceManager的高可用

NodeManager:負責執行主節點ResourceManager分配的任務

第三種:NameNode高可用與ResourceManager單節點架構模型

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據,其中nameNode可以有兩個,形成高可用狀態

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

JournalNode:文件系統元數據信息管理

數據計算核心模塊:

ResourceManager:接收用戶的計算請求任務,並負責集群的資源分配,以及計算任務的劃分

NodeManager:負責執行主節點ResourceManager分配的任務

第四種:NameNode與ResourceManager高可用架構模型

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據,一般都是使用兩個,實現HA高可用

JournalNode:元數據信息管理進程,一般都是奇數個

DataNode:從節點,用於數據的存儲

數據計算核心模塊:

ResourceManager:Yarn平台的主節點,主要用於接收各種任務,通過兩個,構建成高可用

NodeManager:Yarn平台的從節點,主要用於處理ResourceManager分配的任務

03 Hadoop 核心介紹

3.1 HDFS

HDFS(Hadoop Distributed File System) 是一個 Apache Software Foundation 項目, 是 Apache Hadoop 項目的一個子項目. Hadoop 非常適於存儲大型數據 (比如 TB 和 PB), 其就是使用 HDFS 作為存儲系統. HDFS 使用多台計算機存儲文件, 並且提供統一的訪問接口, 像是訪問一個普通文件系統一樣使用分布式文件系統. HDFS 對數據文件的訪問通過流的方式進行處理, 這意味着通過命令和 MapReduce 程序的方式可以直接使用 HDFS. HDFS 是容錯的, 且提供對大數據集的高吞吐量訪問.

HDFS 的一個非常重要的特點就是一次寫入、多次讀取, 該模型降低了對並發控制的要求, 簡化了數據聚合性, 支持高吞吐量訪問. 而吞吐量是大數據系統的一個非常重要的指標, 吞吐量高意味着能處理的數據量就大.

3.1.1 設計目標

  • 通過跨多個廉價計算機集群分布數據和處理來節約成本
  • 通過自動維護多個數據副本和在故障發生時來實現可靠性
  • 它們為存儲和處理超大規模數據提供所需的擴展能力。

3.1.2 HDFS 的歷史

  1. Doug Cutting 在做 Lucene 的時候, 需要編寫一個爬蟲服務, 這個爬蟲寫的並不順利, 遇到了一些問題, 諸如: 如何存儲大規模的數據, 如何保證集群的可伸縮性, 如何動態容錯等
  2. 2013年的時候, Google 發布了三篇論文, 被稱作為三駕馬車, 其中有一篇叫做 GFS, 是描述了 Google 內部的一個叫做 GFS 的分布式大規模文件系統, 具有強大的可伸縮性和容錯性
  3. Doug Cutting 後來根據 GFS 的論文, 創造了一個新的文件系統, 叫做 HDFS

3.1.3 HDFS 的架構

  1. NameNode 是一個中心服務器, 單一節點(簡化系統的設計和實現), 負責管理文件系統的名字空間(NameSpace)以及客戶端對文件的訪問
  2. 文件操作, NameNode 是負責文件元數據的操作, DataNode 負責處理文件內容的讀寫請求, 跟文件內容相關的數據流不經過 NameNode, 只詢問它跟哪個 DataNode聯繫, 否則 NameNode 會成為系統的瓶頸
  3. 副本存放在哪些 DataNode 上由 NameNode 來控制, 根據全局情況作出塊放置決定, 讀取文件時 NameNode 盡量讓用戶先讀取最近的副本, 降低讀取網絡開銷和讀取延時
  4. NameNode 全權管理數據庫的複製, 它周期性的從集群中的每個DataNode 接收心跳信合和狀態報告, 接收到心跳信號意味着 DataNode 節點工作正常, 塊狀態報告包含了一個該 DataNode 上所有的數據列表
什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

3.1.4 HDFS文件副本和Block塊存儲

所有的文件都是以 block 塊的方式存放在 HDFS 文件系統當中, 在 Hadoop1 當中, 文件的 block 塊默認大小是64M, hadoop2 當中, 文件的 block 塊大小默認是 128M, block 塊的大小可以通過 hdfs-site.xml 當中的配置文件進行指定

<property>
    <name>dfs.block.size</name>
    <value>塊大小 以字節為單位</value>
</property>

(1)引入塊機制的好處

  • 一個文件有可能大於集群中任意一個磁盤
  • 使用塊抽象而不是文件可以簡化存儲子系統
  • 塊非常適合用於數據備份進而提供數據容錯能力和可用性

(2)塊緩存

通常 DataNode 從磁盤中讀取塊, 但對於訪問頻繁的文件, 其對應的塊可能被顯式的緩存在 DataNode 的內存中, 以堆外塊緩存的形式存在. 默認情況下,一個塊僅緩存在一個 DataNode 的內存中,當然可以針對每個文件配置 DataNode 的數量. 作業調度器通過在緩存塊的 DataNode 上運行任務, 可以利用塊緩存的優勢提高讀操作的性能.

例如:

連接(join) 操作中使用的一個小的查詢表就是塊緩存的一個很好的候選

用戶或應用通過在緩存池中增加一個 Cache Directive 來告訴 NameNode 需要緩存哪些文件及存多久. 緩存池(Cache Pool) 是一個擁有管理緩存權限和資源使用的管理性分組.

例如一個文件 130M, 會被切分成 2 個 block 塊, 保存在兩個 block 塊裡面, 實際佔用磁盤 130M 空間, 而不是佔用256M的磁盤空間

(3)HDFS 文件權限驗證

HDFS 的文件權限機制與 Linux 系統的文件權限機制類似

r:read  w:write  x:execute

權限 x 對於文件表示忽略, 對於文件夾表示是否有權限訪問其內容 如果 Linux 系統用戶 zhangsan 使用 Hadoop 命令創建一個文件, 那麼這個文件在 HDFS 當中的 Owner 就是 zhangsan HDFS 文件權限的目的, 防止好人做錯事, 而不是阻止壞人做壞事. HDFS相信你告訴我你是誰, 你就是誰

3.1.5 HDFS 的元信息和 SecondaryNameNode

當 Hadoop 的集群當中, 只有一個 NameNode 的時候, 所有的元數據信息都保存在了 FsImage 與 Eidts 文件當中, 這兩個文件就記錄了所有的數據的元數據信息, 元數據信息的保存目錄配置在了 hdfs-site.xml 當中

<property>
  <name>dfs.namenode.name.dir</name>
  <value>file:///export/servers/hadoop-3.1.1/datas/namenode/namenodedatas</value>
</property>
<property>
  <name>dfs.namenode.edits.dir</name>
  <value>file:///export/servers/hadoop-3.1.1/datas/dfs/nn/edits</value>
</property>

(1) FsImage 和 Edits 詳解

  • editsedits 存放了客戶端最近一段時間的操作日誌客戶端對 HDFS 進行寫文件時會首先被記錄在 edits 文件中edits 修改時元數據也會更新每次 HDFS 更新時 edits 先更新後客戶端才會看到最新信息
  • fsimageNameNode 中關於元數據的鏡像, 一般稱為檢查點, fsimage 存放了一份比較完整的元數據信息因為 fsimage 是 NameNode 的完整的鏡像, 如果每次都加載到內存生成樹狀拓撲結構,這是非常耗內存和CPU, 所以一般開始時對 NameNode 的操作都放在 edits 中fsimage 內容包含了 NameNode 管理下的所有 DataNode 文件及文件 block 及 block 所在的 DataNode 的元數據信息.隨着 edits 內容增大, 就需要在一定時間點和 fsimage 合併

(2)fsimage 中的文件信息查看

官方查看文檔

使用命令 hdfs oiv

cd /export/servers/hadoop-3.1.1/datas/namenode/namenodedatas
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml

(3) edits 中的文件信息查看

官方查看文檔

使用命令 hdfs oev

cd /export/servers/hadoop-3.1.1/datas/dfs/nn/edits
hdfs oev -i  edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML

(4) SecondaryNameNode 如何輔助管理 fsimage 與 edits 文件?

  • SecondaryNameNode 定期合併 fsimage 和 edits, 把 edits 控制在一個範圍內
  • 配置 SecondaryNameNodeSecondaryNameNode 在 conf/masters 中指定在 masters 指定的機器上, 修改 hdfs-site.xml<property> <name>dfs.http.address</name> <value>host:50070</value> </property> 修改 core-site.xml, 這一步不做配置保持默認也可以<!– 多久記錄一次 HDFS 鏡像, 默認 1小時 –> <property> <name>fs.checkpoint.period</name> <value>3600</value> </property> <!– 一次記錄多大, 默認 64M –> <property> <name>fs.checkpoint.size</name> <value>67108864</value> </property>
  1. SecondaryNameNode 通知 NameNode 切換 editlog
  2. SecondaryNameNode 從 NameNode 中獲得 fsimage 和 editlog(通過http方式)
  3. SecondaryNameNode 將 fsimage 載入內存, 然後開始合併 editlog, 合併之後成為新的 fsimage
  4. SecondaryNameNode 將新的 fsimage 發回給 NameNode
  5. NameNode 用新的 fsimage 替換舊的 fsimage

(5)特點

  • 完成合併的是 SecondaryNameNode, 會請求 NameNode 停止使用 edits, 暫時將新寫操作放入一個新的文件中 edits.new
  • SecondaryNameNode 從 NameNode 中通過 Http GET 獲得 edits, 因為要和 fsimage 合併, 所以也是通過 Http Get 的方式把 fsimage 加載到內存, 然後逐一執行具體對文件系統的操作, 與 fsimage 合併, 生成新的 fsimage, 然後通過 Http POST 的方式把 fsimage 發送給 NameNode. NameNode 從 SecondaryNameNode 獲得了 fsimage 後會把原有的 fsimage 替換為新的 fsimage, 把 edits.new 變成 edits. 同時會更新 fstime
  • Hadoop 進入安全模式時需要管理員使用 dfsadmin 的 save namespace 來創建新的檢查點
  • SecondaryNameNode 在合併 edits 和 fsimage 時需要消耗的內存和 NameNode 差不多, 所以一般把 NameNode 和 SecondaryNameNode 放在不同的機器上

3.1.6 HDFS 文件寫入過程

  1. Client 發起文件上傳請求, 通過 RPC 與 NameNode 建立通訊, NameNode 檢查目標文件是否已存在, 父目錄是否存在, 返回是否可以上傳
  2. Client 請求第一個 block 該傳輸到哪些 DataNode 服務器上
  3. NameNode 根據配置文件中指定的備份數量及機架感知原理進行文件分配, 返回可用的 DataNode 的地址如: A, B, C
  4. Hadoop 在設計時考慮到數據的安全與高效, 數據文件默認在 HDFS 上存放三份, 存儲策略為本地一份, 同機架內其它某一節點上一份, 不同機架的某一節點上一份。
  5. Client 請求 3 台 DataNode 中的一台 A 上傳數據(本質上是一個 RPC 調用,建立 pipeline ), A 收到請求會繼續調用 B, 然後 B 調用 C, 將整個 pipeline 建立完成, 後逐級返回 client
  6. Client 開始往 A 上傳第一個 block(先從磁盤讀取數據放到一個本地內存緩存), 以 packet 為單位(默認64K), A 收到一個 packet 就會傳給 B, B 傳給 C. A 每傳一個 packet 會放入一個應答隊列等待應答
  7. 數據被分割成一個個 packet 數據包在 pipeline 上依次傳輸, 在 pipeline 反方向上, 逐個發送 ack(命令正確應答), 最終由 pipeline 中第一個 DataNode 節點 A 將 pipelineack 發送給 Client
  8. 當一個 block 傳輸完成之後, Client 再次請求 NameNode 上傳第二個 block 到服務 1

3.1.7. HDFS 文件讀取過程

  1. Client向NameNode發起RPC請求,來確定請求文件block所在的位置;
  2. NameNode會視情況返迴文件的部分或者全部block列表,對於每個block,NameNode 都會返回含有該 block 副本的 DataNode 地址; 這些返回的 DN 地址,會按照集群拓撲結構得出 DataNode 與客戶端的距離,然後進行排序,排序兩個規則:網絡拓撲結構中距離 Client 近的排靠前;心跳機制中超時彙報的 DN 狀態為 STALE,這樣的排靠後;
  3. Client 選取排序靠前的 DataNode 來讀取 block,如果客戶端本身就是DataNode,那麼將從本地直接獲取數據(短路讀取特性);
  4. 底層上本質是建立 Socket Stream(FSDataInputStream),重複的調用父類 DataInputStream 的 read 方法,直到這個塊上的數據讀取完畢;
  5. 當讀完列表的 block 後,若文件讀取還沒有結束,客戶端會繼續向NameNode 獲取下一批的 block 列表;
  6. 讀取完一個 block 都會進行 checksum 驗證,如果讀取 DataNode 時出現錯誤,客戶端會通知 NameNode,然後再從下一個擁有該 block 副本的DataNode 繼續讀。
  7. read 方法是並行的讀取 block 信息,不是一塊一塊的讀取;NameNode 只是返回Client請求包含塊的DataNode地址,並不是返回請求塊的數據;
  8. 最終讀取來所有的 block 會合併成一個完整的最終文件。

3.1.8. HDFS 的 API 操作

(1)導入 Maven 依賴

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
  <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.0.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0</version>
  </dependency>
  <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
  </dependency>
</dependencies>

(2)概述

在 Java 中操作 HDFS, 主要涉及以下 Class:

  • Configuration該類的對象封轉了客戶端或者服務器的配置
  • FileSystem該類的對象是一個文件系統對象, 可以用該對象的一些方法來對文件進行操作, 通過 FileSystem 的靜態方法 get 獲得該對象FileSystem fs = FileSystem.get(conf) get 方法從 conf 中的一個參數 fs.defaultFS 的配置值判斷具體是什麼類型的文件系統如果我們的代碼中沒有指定 fs.defaultFS, 並且工程 ClassPath 下也沒有給定相應的配置, conf 中的默認值就來自於 Hadoop 的 Jar 包中的 core-default.xml默認值為 file:///, 則獲取的不是一個 DistributedFileSystem 的實例, 而是一個本地文件系統的客戶端對象

(3)獲取 FileSystem 的幾種方式

  • 第一種方式
@Test
public void getFileSystem() throws URISyntaxException, IOException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), configuration);
    System.out.println(fileSystem.toString());
}
  • 第二種方式
@Test
public void getFileSystem2() throws URISyntaxException, IOException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020");
    FileSystem fileSystem = FileSystem.get(new URI("/"), configuration);
    System.out.println(fileSystem.toString());
}
  • 第三種方式
@Test
public void getFileSystem3() throws URISyntaxException, IOException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://192.168.52.250:8020"), configuration);
    System.out.println(fileSystem.toString());
}
  • 第四種方式
@Test
public void getFileSystem4() throws  Exception{
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020");
    FileSystem fileSystem = FileSystem.newInstance(configuration);
    System.out.println(fileSystem.toString());
}

(4)遍歷 HDFS 中所有文件

  • 遞歸遍歷
@Test
public void listFile() throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.100:8020"), new Configuration());
    FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
    for (FileStatus fileStatus : fileStatuses) {
        if(fileStatus.isDirectory()){
            Path path = fileStatus.getPath();
            listAllFiles(fileSystem,path);
        }else{
            System.out.println("文件路徑為"+fileStatus.getPath().toString());

        }
    }
}
public void listAllFiles(FileSystem fileSystem,Path path) throws  Exception{
    FileStatus[] fileStatuses = fileSystem.listStatus(path);
    for (FileStatus fileStatus : fileStatuses) {
        if(fileStatus.isDirectory()){
            listAllFiles(fileSystem,fileStatus.getPath());
        }else{
            Path path1 = fileStatus.getPath();
            System.out.println("文件路徑為"+path1);
        }
    }
}
  • 使用 API 遍歷
@Test
public void listMyFiles()throws Exception{
    //獲取fileSystem類
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    //獲取RemoteIterator 得到所有的文件或者文件夾,第一個參數指定遍歷的路徑,第二個參數表示是否要遞歸遍歷
    RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
    while (locatedFileStatusRemoteIterator.hasNext()){
        LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
        System.out.println(next.getPath().toString());
    }
    fileSystem.close();
}

(5)下載文件到本地

@Test
public void getFileToLocal()throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log"));
    FileOutputStream fileOutputStream = new FileOutputStream(new File("c:install.log"));
    IOUtils.copy(open,fileOutputStream );
    IOUtils.closeQuietly(open);
    IOUtils.closeQuietly(fileOutputStream);
    fileSystem.close();
}

(6)HDFS 上創建文件夾

@Test
public void mkdirs() throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test"));
    fileSystem.close();
}

(7)HDFS 文件上傳

@Test
public void putData() throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    fileSystem.copyFromLocalFile(new Path("file:///c:install.log"),new Path("/hello/mydir/test"));
    fileSystem.close();
}

(8)偽造用戶

  1. 停止hdfs集群,在node01機器上執行以下命令
cd /export/servers/hadoop-3.1.1
sbin/stop-dfs.sh
  1. 修改node01機器上的hdfs-site.xml當中的配置文件
cd /export/servers/hadoop-3.1.1/etc/hadoop
vim hdfs-site.xml
<property>
    <name>dfs.permissions.enabled</name>
    <value>true</value>
</property>
  1. 修改完成之後配置文件發送到其他機器上面去
scp hdfs-site.xml node02:$PWD
scp hdfs-site.xml node03:$PWD
  1. 重啟hdfs集群
cd /export/servers/hadoop-3.1.1
sbin/start-dfs.sh
  1. 隨意上傳一些文件到我們hadoop集群當中準備測試使用
cd /export/servers/hadoop-3.1.1/etc/hadoop
hdfs dfs -mkdir /config
hdfs dfs -put *.xml /config
hdfs dfs -chmod 600 /config/core-site.xml
  1. 使用代碼準備下載文件
@Test
public void getConfig()throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
    fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml"));
    fileSystem.close();
}

(9)小文件合併

由於 Hadoop 擅長存儲大文件,因為大文件的元數據信息比較少,如果 Hadoop 集群當中有大量的小文件,那麼每個小文件都需要維護一份元數據信息,會大大的增加集群管理元數據的內存壓力,所以在實際工作當中,如果有必要一定要將小文件合併成大文件進行一起處理

在我們的 HDFS 的 Shell 命令模式下,可以通過命令行將很多的 hdfs 文件合併成一個大文件下載到本地

cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml

既然可以在下載的時候將這些小文件合併成一個大文件一起下載,那麼肯定就可以在上傳的時候將小文件合併到一個大文件裡面去

@Test
public void mergeFile() throws  Exception{
    //獲取分布式文件系統
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
    FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml"));
    //獲取本地文件系統
    LocalFileSystem local = FileSystem.getLocal(new Configuration());
    //通過本地文件系統獲取文件列表,為一個集合
    FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:傳智播客大數據離線階段課程資料3、大數據離線第三天上傳小文件合併"));
    for (FileStatus fileStatus : fileStatuses) {
        FSDataInputStream inputStream = local.open(fileStatus.getPath());
       IOUtils.copy(inputStream,outputStream);
        IOUtils.closeQuietly(inputStream);
    }
    IOUtils.closeQuietly(outputStream);
    local.close();
    fileSystem.close();
}

04 MapReduce介紹

MapReduce思想在生活中處處可見。或多或少都曾接觸過這種思想。MapReduce的思想核心是“分而治之”,適用於大量複雜的任務處理場景(大規模數據處理場景)。

Map負責“分”,即把複雜的任務分解為若干個“簡單的任務”來並行處理。可以進行拆分的前提是這些小任務可以並行計算,彼此間幾乎沒有依賴關係。
Reduce負責“合”,即對map階段的結果進行全局匯總。
MapReduce運行在yarn集群

  • ResourceManager
  • NodeManager

這兩個階段合起來正是MapReduce思想的體現。

4.1 MapReduce設計思想和架構

MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,並發運行在Hadoop集群上。

Hadoop MapReduce構思:
分而治之
對相互間不具有計算依賴關係的大數據,實現並行最自然的辦法就是採取分而治之的策略。並行計算的第一個重要問題是如何劃分計算任務或者計算數據以便對劃分的子任務或數據塊同時進行計算。不可分拆的計算任務或相互間有依賴關係的數據無法進行並行計算!
統一構架,隱藏系統層細節
如何提供統一的計算框架,如果沒有統一封裝底層細節,那麼程序員則需要考慮諸如數據存儲、劃分、分發、結果收集、錯誤恢復等諸多細節;為此,MapReduce設計並提供了統一的計算框架,為程序員隱藏了絕大多數系統
層面的處理細節。
MapReduce最大的亮點在於通過抽象模型和計算框架把需要做什麼(what need to do)與具體怎麼做(how to do)分開了,為程序員提供一個抽象和高層的編程接口和框架。程序員僅需要關心其應用層的具體計算問題,僅需編寫少量的處理應用本身計算問題的程序代碼。如何具體完成這個並行計算任務所相關的諸多系統層細節被隱藏起來,交給計算框架去處理:從分布代碼的執行,到大到數千小到單個節點集群的自動調度使用。
構建抽象模型:Map和Reduce
MapReduce借鑒了函數式語言中的思想,用Map和Reduce兩個函數提供了高層的並行編程抽象模型
Map: 對一組數據元素進行某種重複式的處理;
Reduce: 對Map的中間結果進行某種進一步的結果整理。
Map和Reduce為程序員提供了一個清晰的操作接口抽象描述。MapReduce
處理的數據類型是鍵值對。
MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現:
Map: (k1; v1) → [(k2; v2)]
Reduce: (k2; [v2]) → [(k3; v3)]

MapReduce 框架結構
一個完整的mapreduce程序在分布式運行時有三類實例進程:

  1. MRAppMaster 負責整個程序的過程調度及狀態協調
  2. MapTask 負責map階段的整個數據處理流程
  3. ReduceTask 負責reduce階段的整個數據處理流程

4.2 MapReduce編程規範

MapReduce 的開發一共有八個步驟, 其中 Map 階段分為 2 個步驟,Shuffle 階段 4個步驟,Reduce 階段分為 2 個步驟

Map 階段 2 個步驟

  1. 設置 InputFormat 類, 將數據切分為 Key-Value(K1和V1) 對, 輸入到第二步
  2. 自定義 Map 邏輯, 將第一步的結果轉換成另外的 Key-Value(K2和V2) 對, 輸出結果

Shuffle 階段 4 個步驟

  1. 對輸出的 Key-Value 對進行分區
  2. 對不同分區的數據按照相同的 Key 排序
  3. (可選) 對分組過的數據初步規約, 降低數據的網絡拷貝
  4. 對數據進行分組, 相同 Key 的 Value 放入一個集合中

Reduce 階段 2 個步驟

  1. 對多個 Map 任務的結果進行排序以及合併, 編寫 Reduce 函數實現自己的邏輯, 對輸入的 Key-Value 進行處理, 轉為新的 Key-Value(K3和V3)輸出
  2. 設置 OutputFormat 處理並保存 Reduce 輸出的 Key-Value 數據

轉換為代碼,例子如下

Map階段

public class WordCountMapper extends Mapper<Text,Text,Text, LongWritable> {

    /**
     * K1-----V1
     * A -----A
     * B -----B
     * C -----C
     *
     * K2-----V2
     * A -----1
     * B -----1
     * C -----1
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key,new LongWritable(1));
    }
}

Reduce階段

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0L;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

shuffle階段,舉一個分區的例子:

public class WordCountPartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text text, LongWritable longWritable, int i) {
        if (text.toString().length() > 5) {
            return 1;
        }
        return 0;
    }
}

主方法

public class JobMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new JobMain(),args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "wordcout");
        job.setJarByClass(JobMain.class);
        //輸入
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/"));
        //map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //shuffle階段
        job.setPartitionerClass(WordCountPartitioner.class);
        job.setNumReduceTasks(2);

        //reduce階段
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //輸出
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("/"));
        return 0;
    }
}

4.4 MapTask運行機制

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

具體步驟:

  1. 讀取數據組件 InputFormat(默認 TextInputFormat) 會通過 getSplits 方法對輸入目錄中文件進行邏輯切片規劃得到 splits, 有多少個 split 就對應啟動多少個MapTask . split 與 block 的對應關係默認是一對一
  2. 將輸入文件切分為 splits 之後, 由 RecordReader 對象 (默認是LineRecordReader)進行讀取, 以 n 作為分隔符, 讀取一行數據, 返回 <key,value> . Key 表示每行首字符偏移值,Value 表示這一行文本內容
  3. 讀取 split 返回 <key,value> , 進入用戶自己繼承的 Mapper 類中,執行用戶重寫的 map 函數, RecordReader 讀取一行這裡調用一次
  4. Mapper 邏輯結束之後, 將 Mapper 的每條結果通過 context.write 進行collect數據收集. 在 collect 中, 會先對其進行分區處理,默認使用 HashPartitioner。
  • MapReduce 提供 Partitioner 接口, 它的作用就是根據 Key 或 Value 及Reducer 的數量來決定當前的這對輸出數據最終應該交由哪個 Reduce task處理, 默認對 Key Hash 後再以 Reducer 數量取模. 默認的取模方式只是為了平均 Reducer 的處理能力, 如果用戶自己對 Partitioner 有需求, 可以訂製並設置到 Job 上。
  1. 接下來, 會將數據寫入內存, 內存中這片區域叫做環形緩衝區, 緩衝區的作用是批量收集Mapper 結果, 減少磁盤 IO 的影響. 我們的 Key/Value 對以及 Partition 的結果都會被寫入緩衝區. 當然, 寫入之前,Key 與 Value 值都會被序列化成字節數組。
  • 環形緩衝區其實是一個數組, 數組中存放着 Key, Value 的序列化數據和 Key,Value 的元數據信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及Value 的長度. 環形結構是一個抽象概念
  • 緩衝區是有大小限制, 默認是 100MB. 當 Mapper 的輸出結果很多時, 就可能會撐爆內存, 所以需要在一定條件下將緩衝區中的數據臨時寫入磁盤, 然後重新利用這塊緩衝區. 這個從內存往磁盤寫數據的過程被稱為 Spill, 中文可譯為溢寫. 這個溢寫是由單獨線程來完成, 不影響往緩衝區寫 Mapper 結果的線程.溢寫線程啟動時不應該阻止 Mapper 的結果輸出, 所以整個緩衝區有個溢寫的比例 spill.percent . 這個比例默認是 0.8, 也就是當緩衝區的數據已經達到閾值 buffer size * spill percent = 100MB * 0.8 = 80MB , 溢寫線程啟動,鎖定這 80MB 的內存, 執行溢寫過程. Mapper 的輸出結果還可以往剩下的20MB 內存中寫, 互不影響
  1. 當溢寫線程啟動後, 需要對這 80MB 空間內的 Key 做排序 (Sort). 排序是 MapReduce模型默認的行為, 這裡的排序也是對序列化的字節做的排序如果 Job 設置過 Combiner, 那麼現在就是使用 Combiner 的時候了. 將有相同 Key 的 Key/Value 對的 Value 加起來, 減少溢寫到磁盤的數據量.Combiner 會優化 MapReduce 的中間結果, 所以它在整個模型中會多次使用那哪些場景才能使用 Combiner 呢? 從這裡分析, Combiner 的輸出是Reducer 的輸入, Combiner 絕不能改變最終的計算結果. Combiner 只應該用於那種 Reduce 的輸入 Key/Value 與輸出 Key/Value 類型完全一致, 且不影響最終結果的場景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它對 Job 執行效率有幫助, 反之會影響 Reducer 的最終結果
  2. 合併溢寫文件, 每次溢寫會在磁盤上生成一個臨時文件 (寫之前判斷是否有 Combiner),如果 Mapper 的輸出結果真的很大, 有多次這樣的溢寫發生, 磁盤上相應的就會有多個臨時文件存在. 當整個數據處理結束之後開始對磁盤中的臨時文件進行 Merge 合併, 因為最終的文件只有一個, 寫入磁盤, 並且為這個文件提供了一個索引文件, 以記錄每個reduce對應數據的偏移量

4.5 ReduceTask工作機制

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?

Reduce 大致分為 copy、sort、reduce 三個階段,重點在前兩個階段。copy 階段包含一個 eventFetcher 來獲取已完成的 map 列表,由 Fetcher 線程去 copy 數據,在此過程中會啟動兩個 merge 線程,分別為 inMemoryMerger 和 onDiskMerger,分別將內存中的數據 merge 到磁盤和將磁盤中的數據進行 merge。待數據 copy 完成之後,copy 階段就完成了,開始進行 sort 階段,sort 階段主要是執行 finalMerge 操作,純粹的 sort 階段,完成之後就是 reduce 階段,調用用戶定義的 reduce 函數進行處理

詳細步驟:

  1. Copy階段 ,簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求maptask獲取屬於自己的文件。
  2. Merge階段 。這裡的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩衝區中,這裡的緩衝區大小要比map端的更為靈活。merge有三種形式:內存到內存;內存到磁盤;磁盤到磁盤。默認情況下第一種形式不啟用。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然後在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,然後啟動第三種磁盤到磁盤的merge方式生成最終的文件。
  3. 合併排序 。把分散的數據合併成一個大的數據後,還會再對合併後的數據排序。
  4. 對排序後的鍵值對調用reduce方法 ,鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS文件中。

4.6 Shuffle具體流程

map 階段處理的數據如何傳遞給 reduce 階段,是 MapReduce 框架中最關鍵的一個流程,這個流程就叫 shuffle
shuffle: 洗牌、發牌 ——(核心機制:數據分區,排序,分組,規約,合併等過程)

什麼是 Hadoop?它主要能解決“大數據”的哪兩個問題?
  1. Collect階段 :將 MapTask 的結果輸出到默認大小為 100M 的環形緩衝區,保存的是key/value,Partition 分區信息等。
  2. Spill階段 :當內存中的數據量達到一定的閥值的時候,就會將數據寫入本地磁盤,在將數據寫入磁盤之前需要對數據進行一次排序的操作,如果配置了 combiner,還會將有相同分區號和 key 的數據進行排序。
  3. Merge階段 :把所有溢出的臨時文件進行一次合併操作,以確保一個 MapTask 最終只產生一個中間數據文件。
  4. Copy階段 :ReduceTask 啟動 Fetcher 線程到已經完成 MapTask 的節點上複製一份屬於自己的數據,這些數據默認會保存在內存的緩衝區中,當內存的緩衝區達到一定的閥值的時候,就會將數據寫到磁盤之上。
  5. Merge階段 :在 ReduceTask 遠程複製數據的同時,會在後台開啟兩個線程對內存到本地的數據文件進行合併操作。
  6. Sort階段 :在對數據進行合併的同時,會進行排序操作,由於 MapTask 階段已經對數據進行了局部的排序,ReduceTask 只需保證 Copy 的數據的最終整體有效性即可。Shuffle 中的緩衝區大小會影響到 mapreduce 程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快

緩衝區的大小可以通過參數調整, 參數:mapreduce.task.io.sort.mb 默認100M

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/253444.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
投稿專員的頭像投稿專員
上一篇 2024-12-14 02:28
下一篇 2024-12-14 02:28

相關推薦

發表回復

登錄後才能評論