本文目錄一覽:
Hadoop系列之HDFS架構
本篇文章翻譯了Hadoop系列下的 HDFS Architecture ,原文最初經過筆者翻譯後大概有6000字,之後筆者對內容進行了精簡化壓縮,從而使筆者自己和其他讀者們閱讀本文時能夠更加高效快速的完成對Hadoop的學習或複習。本文主要介紹了Hadoop的整體架構,包括但不限於節點概念、命名空間、數據容錯機制、數據管理方式、簡單的腳本命令和垃圾回收概念。
PS:筆者新手一枚,如果看出哪裡存在問題,歡迎下方留言!
Hadoop Distributed File System(HDFS)是高容錯、高吞吐量、用於處理海量數據的分佈式文件系統。
HDFS一般由成百上千的機器組成,每個機器存儲整個數據集的一部分數據,機器故障的快速發現與恢復是HDFS的核心目標。
HDFS對接口的核心目標是高吞吐量而非低延遲。
HDFS支持海量數據集合,一個集群一般能夠支持千萬以上數量級的文件。
HDFS應用需要對文件寫一次讀多次的接口模型,文件變更只支持尾部添加和截斷。
HDFS的海量數據與一致性接口特點,使得遷移計算以適應文件內容要比遷移數據從而支持計算更加高效。
HDFS支持跨平台使用。
HDFS使用主從架構。一個HDFS集群由一個NameNode、一個主服務器(用於管理系統命名空間和控制客戶端文件接口)、大量的DataNode(一般一個節點一個,用於管理該節點數據存儲)。HDFS對外暴露了文件系統命名空間並允許在文件中存儲用戶數據。一個文件被分成一個或多個塊,這些塊存儲在一組DataNode中。NameNode執行文件系統命名空間的打開關閉重命名等命令並記錄著塊和DataNode之間的映射。DataNode用於處理客戶端的讀寫請求和塊的相關操作。NameNode和DataNode一般運行在GNU/Linux操作系統上,HDFS使用Java語言開發的,因此NameNode和DataNode可以運行在任何支持Java的機器上,再加上Java語言的高度可移植性,使得HDFS可以發佈在各種各樣的機器上。一個HDFS集群中運行一個NameNode,其他機器每個運行一個(也可以多個,非常少見)DataNode。NameNode簡化了系統的架構,只用於存儲所有HDFS元數據,用戶數據不會進入該節點。下圖為HDFS架構圖:
HDFS支持傳統的分層文件管理,用戶或者應用能夠在目錄下創建目錄或者文件。文件系統命名空間和其他文件系統是相似的,支持創建、刪除、移動和重命名文件。HDFS支持用戶數量限制和訪問權限控制,不支持軟硬鏈接,用戶可以自己實現軟硬鏈接。NameNode控制該命名空間,命名空間任何變動幾乎都要記錄到NameNode中。應用可以在HDFS中對文件聲明複製次數,這個次數叫做複製係數,會被記錄到NameNode中。
HDFS將每個文件存儲為一個或多個塊,並為文件設置了塊的大小和複製係數從而支持文件容錯。一個文件所有的塊(除了最後一個塊)大小相同,後來支持了可變長度的塊。複製係數在創建文件時賦值,後續可以更改。文件在任何時候只能有一個writer。NameNode負責塊複製,它周期性收到每個數據節點的心跳和塊報告,心跳錶示數據節點的正常運作,塊報告包含了這個DataNode的所有塊。
副本存儲方案對於HDFS的穩定性和性能至關重要。為了提升數據可靠性、靈活性和充分利用網絡帶寬,HDFS引入了機架感知的副本存儲策略,該策略只是副本存儲策略的第一步,為後續優化打下基礎。大型HDFS集群一般運行於橫跨許多支架的計算機集群中,一般情況下同一支架中兩個節點數據傳輸快於不同支架。一種簡單的方法是將副本存放在單獨的機架上,從而防止丟失數據並提高帶寬,但是增加了數據寫入的負擔。一般情況下,複製係數是3,HDFS存儲策略是將第一份副本存儲到本地機器或者同一機架下一個隨機DataNode,另外兩份副本存儲到同一個遠程機架的不同DataNode。NameNode不允許同一DataNode存儲相同副本多次。在機架感知的策略基礎上,後續支持了 存儲類型和機架感知相結合的策略 ,簡單來說就是在機架感知基礎上判斷DataNode是否支持該類型的文件,不支持則尋找下一個。
HDFS讀取數據使用就近原則,首先尋找相同機架上是否存在副本,其次本地數據中心,最後遠程數據中心。
啟動時,NameNode進入安全模式,該模式下不會發生數據塊複製,NameNode接收來自DataNode的心跳和塊報告,每個塊都有一個最小副本數量n,數據塊在NameNode接受到該塊n次後,認為這個數據塊完成安全複製。當完成安全複製的數據塊比例達到一個可配的百分比值並再過30s後,NameNode退出安全模式,最後判斷是否仍然存在未達到最小複製次數的數據塊,並對這些塊進行複製操作。
NameNode使用名為EditLog的事務日誌持續記錄文件系統元數據的每一次改動(如創建文件、改變複製係數),使用名為FsImage的文件存儲全部的文件系統命名空間(包括塊到文件的映射關係和文件系統的相關屬性),EditLog和FsImage都存儲在NameNode本地文件系統中。NameNode在內存中保存着元數據和塊映射的快照,當NameNode啟動後或者某個配置項達到閾值時,會從磁盤中讀取EditLog和FsImage,通過EditLog新的記錄更新內存中的FsImage,再講新版本的FsImage刷新到磁盤中,然後截斷EditLog中已經處理的記錄,這個過程就是一個檢查點。檢查點的目的是確保文件系統通過在內存中使用元數據的快照從而持續的觀察元數據的變更並將快照信息存儲到磁盤FsImage中。檢查點通過下面兩個配置參數出發,時間周期(dfs.namenode.checkpoint.period)和文件系統事務數量(dfs.namenode.checkpoint.txns),二者同時配置時,滿足任意一個條件就會觸發檢查點。
所有的HDFS網絡協議都是基於TCP/IP的,客戶端建立一個到NameNode機器的可配置的TCP端口,用於二者之間的交互。DataNode使用DataNode協議和NameNode交互,RPC包裝了客戶端協議和DataNode協議,通過設計,NameNode不會發起RPC,只負責響應來自客戶端或者DataNode的RPC請求。
HDFS的核心目標是即使在失敗或者錯誤情況下依然能夠保證數據可靠性,三種常見失敗情況包括NameNode故障、DataNode故障和network partitions。
網絡分區可能會導致部分DataNode市區和NameNode的連接,NameNode通過心跳包判斷並將失去連接的DataNode標記為掛掉狀態,於是所有註冊到掛掉DataNode的數據都不可用了,可能會導致部分數據塊的複製數量低於了原本配置的複製係數。NameNode不斷地追蹤哪些需要複製的塊並在必要時候進行複製,觸發條件包含多種情況:DataNode不可用、複製亂碼、硬件磁盤故障或者認為增大負值係數。為了避免DataNode的狀態不穩定導致的複製風暴,標記DataNode掛掉的超時時間設置比較長(默認10min),用戶可以設置更短的時間間隔來標記DataNode為陳舊狀態從而避免在對讀寫性能要求高的請求上使用這些陳舊節點。
HDFS架構兼容數據各種重新平衡方案,一種方案可以在某個DataNode的空閑空間小於某個閾值時將數據移動到另一個DataNode上;在某個特殊文件突然有高的讀取需求時,一種方式是積極創建額外副本並且平衡集群中的其他數據。這些類型的平衡方案暫時還未實現(不太清楚現有方案是什麼…)。
存儲設備、網絡或者軟件的問題都可能導致從DataNode獲取的數據發生亂碼,HDFS客戶端實現了對文件內容的校驗,客戶端在創建文件時,會計算文件中每個塊的校驗值並存儲到命名空間,當客戶端取回數據後會使用校驗值對每個塊進行校驗,如果存在問題,客戶端就會去另一個DataNode獲取這個塊的副本。
FsImage和EditLog是HDFS的核心數據結構,他們的錯誤會導致整個HDFS掛掉,因此,NameNode應該支持時刻維持FsImage和EditLog的多分複製文件,它們的任何改變所有文件應該同步更新。另一個選擇是使用 shared storage on NFS 或者 distributed edit log 支持多個NameNode,官方推薦 distributed edit log 。
快照能夠存儲某一特殊時刻的數據副本,從而支持HDFS在發生錯誤時會滾到上一個穩定版本。
HDFS的應用場景是大的數據集下,且數據只需要寫一次但是要讀取一到多次並且支持流速讀取數據。一般情況下一個塊大小為128MB,因此一個文件被切割成128MB的大塊,且每個快可能分佈在不同的DataNode。
當客戶端在複製係數是3的條件下寫數據時,NameNode通過目標選擇算法收到副本要寫入的DataNode的集合,第1個DataNode開始一部分一部分的獲取數據,把每個部分存儲到本地並轉發給第2個DataNode,第2個DataNode同樣的把每個部分存儲到本地並轉發給第3個DataNode,第3個DataNode將數據存儲到本地,這就是管道複製。
HDFS提供了多種訪問方式,比如 FileSystem Java API 、 C language wrapper for this Java API 和 REST API ,而且還支持瀏覽器直接瀏覽。通過使用 NFS gateway ,客戶端可以在本地文件系統上安裝HDFS。
HDFS使用目錄和文件的方式管理數據,並提供了叫做 FS shell 的命令行接口,下面有一些簡單的命令:
DFSAdmin命令集合用於管理HDFS集群,這些命令只有集群管理員可以使用,下面有一些簡單的命令:
正常的HDFS安裝都會配置一個web服務,通過可配的TCP端口對外暴露命名空間,從而使得用戶可以通過web瀏覽器查看文件內容。
如果垃圾回收配置打開,通過FS shell移除的文件不會立刻刪除,而是會移動到一個垃圾文件專用的目錄(/user/username/.Trash),類似回收站,只要文件還存在於那個目錄下,則隨時可以被回復。絕大多數最近刪除的文件都被移動到了垃圾目錄(/user/username/.Trash/Current),並且HDFS每個一段時間在這個目錄下創建一個檢查點用於刪除已經過期的舊的檢查點,詳情見 expunge command of FS shell 。在垃圾目錄中的文件過期後,NameNode會刪除這個文件,文件刪除會引起這個文件的所有塊的空間空閑,需要注意的是在文件被刪除之後和HDFS的可用空間變多之間會有一些時間延遲(個人認為是垃圾回收機制佔用的時間)。下面是一些簡單的理解刪除文件的例子:
當文件複製係數減小時,NameNode會選擇多餘的需要刪除的副本,在收到心跳包時將刪除信息發送給DataNode。和上面一樣,這個刪除操作也是需要一些時間後,才能在集群上展現空閑空間的增加。
HDFS Architecture
我寫java程序能遠程調用hdfs命令嗎?
最差的情況可以在遠端寫個socketserver, 然後你寫個client連, 讓server調hadoop命令
如何使用Java API讀寫HDFS
Java API讀寫HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 創建文件目錄
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path(“/user/hadoop/data/20140318”);
boolean result = fs.mkdirs(dir);// 創建文件夾
System.out.println(“make dir :” + result);
// 創建文件,並寫入內容
Path dst = new Path(“/user/hadoop/data/20140318/tmp”);
byte[] buff = “hello,hadoop!”.getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path(“/user/hadoop/data/20140318/1.txt”);
Path newName = new Path(“/user/hadoop/data/20140318/2.txt”);
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
“/user/hadoop/data/20140318”));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 刪除文件
@SuppressWarnings(“deprecation”)
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(“/user/hadoop/data/20140318”);
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下載,將hdfs文件下載到本地磁盤
*
* @param localSrc1
* 本地的文件地址,即文件的路徑
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上傳,將本地文件copy到hdfs系統中
*
* @param localSrc
* 本地的文件地址,即文件的路徑
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置對象
FileSystem fs; // 文件系統
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 輸出流,創建一個輸出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重寫progress方法
public void progress() {
// System.out.println(“上傳完一個設定緩存區大小容量的文件!”);
}
});
// 連接兩個流,形成通道,使輸入流向輸出流傳輸數據,
IOUtils.copyBytes(in, out, 10240, true); // in為輸入流對象,out為輸出流對象,4096為緩衝區大小,true為上傳後關閉流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移動
*
* @param old_st原來存放的路徑
* @param new_st移動到的路徑
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下載到服務器本地
boolean down_flag = sendFromHdfs(old_st, “/home/hadoop/文檔/temp”);
Configuration conf = new Configuration();
FileSystem fs = null;
// 刪除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 從服務器本地傳到新路徑
new_st = new_st + old_st.substring(old_st.lastIndexOf(“/”));
boolean uplod_flag = sendToHdfs1(“/home/hadoop/文檔/temp”, new_st);
if (down_flag uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path(“/home/hadoop/word.txt”);
Path dst = new Path(“/user/hadoop/data/”);
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 獲取給定目錄下的所有子目錄以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(“/user/hadoop”);
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判斷文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//獲取hdfs集群所有主機結點數據
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println(“list of all the nodes in HDFS cluster:”); //print info
for(int i=0; i dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path(“/user/cluster/dfs.txt”);
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path(“/user/cluster/dfs.txt”);
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
如何用java程序把本地文件拷貝到hdfs上並顯示進度
把程序打成jar包放到Linux上
轉到目錄下執行命令 hadoop jar mapreducer.jar /home/clq/export/java/count.jar hdfs://ubuntu:9000/out06/count/
上面一個是本地文件,一個是上傳hdfs位置
成功後出現:打印出來,你所要打印的字符。
package com.clq.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class FileCopyWithProgress {
//********************************
//把本地的一個文件拷貝到hdfs上
//********************************
public static void main(String[] args) throws IOException {
String localSrc = args[0];
String dst = args[1];
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
FSDataOutputStream out = fs.create(new Path(dst), new Progressable() {
@Override
public void progress() {
System.out.print(“.”);
}
});
IOUtils.copyBytes(in, out, conf, true);
}
}
可能出現異常:
Exception in thread “main” org.apache.hadoop.ipc.RemoteException: java.io.IOException: Cannot create /out06; already exists as a directory
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1569)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1527)
at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:710)
at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:689)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
說明你這個路徑在hdfs上已經存在,換一個即可。
原創文章,作者:DBYCZ,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/129708.html