本文目錄一覽:
- 1、使用Java API操作HDFS時,_方法用於獲取文件列表?
- 2、java怎麼連接hdfs文件系統,需要哪些包?
- 3、java修改hdfs上文件權限問題
- 4、用java向hdfs上傳文件時,如何實現斷點續傳
- 5、HDFS 系統架構
- 6、如何使用Java API讀寫HDFS
使用Java API操作HDFS時,_方法用於獲取文件列表?
當使用 Java API 操作 HDFS 時,可以使用 FileSystem.listFiles() 方法來獲取文件列表。該方法接受一個 Path 對象,表示要列舉文件的目錄,並返回一個 RemoteIteratorLocatedFileStatus 對象,該對象可用於迭代目錄中的文件。
例如,下面的代碼演示了如何使用 listFiles() 方法來獲取 HDFS 上的文件列表:
// 定義 HDFS 連接配置
Configuration conf = new Configuration();
// 獲取 HDFS FileSystem 對象
FileSystem fs = FileSystem.get(conf);
// 定義要列舉文件的目錄
Path dirPath = new Path(“/user/hadoop”);
// 獲取文件列表
RemoteIteratorLocatedFileStatus fileIter = fs.listFiles(dirPath, true);
// 遍歷文件列表
while (fileIter.hasNext()) {
// 獲取當前文件信息
LocatedFileStatus fileStatus = fileIter.next();
// 輸出文件名稱和大小
System.out.println(fileStatus.getPath().getName() + ” : ” + fileStatus.getLen());
}
java怎麼連接hdfs文件系統,需要哪些包?
apache的Hadoop項目提供一類api可以通過java工程操作hdfs中的文件,包括:文件打開,讀寫,刪除等、目錄的創建,刪除,讀取目錄中所有文件等。
1、到下載Hadoop,解壓後把所有jar加入項目的lib里
2、程序處理步驟: 1)得到Configuration對象,2)得到FileSystem對象,3)進行文件操作,簡單示例如下:
/**
*
*/
package org.jrs.wlh;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* @PutMeger.java
* java操作hdfs 往 hdfs中上傳數據
* @version $Revision$/br
* update: $Date$
*/
public class PutMeger {
public static void main(String[] args) throws IOException {
String[] str = new String[]{“E:\\hadoop\\UploadFileClient.java”,”hdfs://master:9000/user/hadoop/inccnt.java”};
Configuration conf = new Configuration();
FileSystem fileS= FileSystem.get(conf);
FileSystem localFile = FileSystem.getLocal(conf); //得到一個本地的FileSystem對象
Path input = new Path(str[0]); //設定文件輸入保存路徑
Path out = new Path(str[1]); //文件到hdfs輸出路徑
try{
FileStatus[] inputFile = localFile.listStatus(input); //listStatus得到輸入文件路徑的文件列表
FSDataOutputStream outStream = fileS.create(out); //創建輸出流
for (int i = 0; i inputFile.length; i++) {
System.out.println(inputFile[i].getPath().getName());
FSDataInputStream in = localFile.open(inputFile[i].getPath());
byte buffer[] = new byte[1024];
int bytesRead = 0;
while((bytesRead = in.read(buffer))0){ //按照字節讀取數據
System.out.println(buffer);
outStream.write(buffer,0,bytesRead);
}
in.close();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
java修改hdfs上文件權限問題
看來你是開啟了HDFS的權限檢查功能,這樣你訪問HDFS,NameNode都會檢查訪問用戶的權限的。
你現在想要修改/process/startall.txt文件的權限,那process目錄以及startall.txt的有效用戶、有效組以及其權限是什麼呢?
假設process目錄與startall.txt原始的有效用戶和有效組分別為root和supergroup,原始權限為750的話,你若在自己電腦運行上述程序,它會自動獲取當前計算機的登錄用戶,假設為wyc,去訪問HDFS,很顯然,你的程序連process目錄都進不去的。
此外,想要更改一個目錄或文件的權限,當前用戶則必須是有效用戶或超級用戶才可以。
想要解決的話,嘿嘿, 如果你設置的hadoop.security.authentication property,也就是認證方式為simple的話(默認就是simple),那還可以鑽該認證方式的空子,運行程序是偽裝成有效用戶或者超級用戶即可。
此外,有一行代碼需要修改一下,我在實驗後發現設置權限那一行有誤,如下:
//hdfs.setPermission(dstPath, new FsPermission((short) 775));
hdfs.setPermission(dstPath, new FsPermission(“755”));
用java向hdfs上傳文件時,如何實現斷點續傳
@Component(“javaLargeFileUploaderServlet”)
@WebServlet(name = “javaLargeFileUploaderServlet”, urlPatterns = { “/javaLargeFileUploaderServlet” })
public class UploadServlet extends HttpRequestHandlerServlet
implements HttpRequestHandler {
private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);
@Autowired
UploadProcessor uploadProcessor;
@Autowired
FileUploaderHelper fileUploaderHelper;
@Autowired
ExceptionCodeMappingHelper exceptionCodeMappingHelper;
@Autowired
Authorizer authorizer;
@Autowired
StaticStateIdentifierManager staticStateIdentifierManager;
@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws IOException {
log.trace(“Handling request”);
Serializable jsonObject = null;
try {
// extract the action from the request
UploadServletAction actionByParameterName =
UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));
// check authorization
checkAuthorization(request, actionByParameterName);
// then process the asked action
jsonObject = processAction(actionByParameterName, request);
// if something has to be written to the response
if (jsonObject != null) {
fileUploaderHelper.writeToResponse(jsonObject, response);
}
}
// If exception, write it
catch (Exception e) {
exceptionCodeMappingHelper.processException(e, response);
}
}
private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
throws MissingParameterException, AuthorizationException {
// check authorization
// if its not get progress (because we do not really care about authorization for get
// progress and it uses an array of file ids)
if (!actionByParameterName.equals(UploadServletAction.getProgress)) {
// extract uuid
final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);
// if this is init, the identifier is the one in parameter
UUID clientOrJobId;
String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
if (actionByParameterName.equals(UploadServletAction.getConfig) parameter != null) {
clientOrJobId = UUID.fromString(parameter);
}
// if not, get it from manager
else {
clientOrJobId = staticStateIdentifierManager.getIdentifier();
}
// call authorizer
authorizer.getAuthorization(
request,
actionByParameterName,
clientOrJobId,
fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);
}
}
private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
throws Exception {
log.debug(“Processing action ” + actionByParameterName.name());
Serializable returnObject = null;
switch (actionByParameterName) {
case getConfig:
String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
returnObject =
uploadProcessor.getConfig(
parameterValue != null ? UUID.fromString(parameterValue) : null);
break;
case verifyCrcOfUncheckedPart:
returnObject = verifyCrcOfUncheckedPart(request);
break;
case prepareUpload:
returnObject = prepareUpload(request);
break;
case clearFile:
uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case clearAll:
uploadProcessor.clearAll();
break;
case pauseFile:
ListUUID uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
uploadProcessor.pauseFile(uuids);
break;
case resumeFile:
returnObject =
uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case setRate:
uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
break;
case getProgress:
returnObject = getProgress(request);
break;
}
return returnObject;
}
ListUUID getFileIdsFromString(String fileIds) {
String[] splittedFileIds = fileIds.split(“,”);
ListUUID uuids = Lists.newArrayList();
for (int i = 0; i splittedFileIds.length; i++) {
uuids.add(UUID.fromString(splittedFileIds[i]));
}
return uuids;
}
private Serializable getProgress(HttpServletRequest request)
throws MissingParameterException {
Serializable returnObject;
String[] ids =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
CollectionUUID uuids = Collections2.transform(Arrays.asList(ids), new FunctionString, UUID() {
@Override
public UUID apply(String input) {
return UUID.fromString(input);
}
});
returnObject = Maps.newHashMap();
for (UUID fileId : uuids) {
try {
ProgressJson progress = uploadProcessor.getProgress(fileId);
((HashMapString, ProgressJson) returnObject).put(fileId.toString(), progress);
}
catch (FileNotFoundException e) {
log.debug(“No progress will be retrieved for ” + fileId + ” because ” + e.getMessage());
}
}
return returnObject;
}
private Serializable prepareUpload(HttpServletRequest request)
throws MissingParameterException, IOException {
// extract file information
PrepareUploadJson[] fromJson =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);
// prepare them
final HashMapString, UUID prepareUpload = uploadProcessor.prepareUpload(fromJson);
// return them
return Maps.newHashMap(Maps.transformValues(prepareUpload, new FunctionUUID, String() {
public String apply(UUID input) {
return input.toString();
};
}));
}
private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
try {
uploadProcessor.verifyCrcOfUncheckedPart(fileId,
fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
}
catch (InvalidCrcException e) {
// no need to log this exception, a fallback behaviour is defined in the
// throwing method.
// but we need to return something!
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}
HDFS 系統架構
HDFS Architecture
Hadoop Distributed File System (HDFS) 是設計可以運行於普通商業硬件上的分布式文件系統。它跟現有的分布式文件系統有很多相通的地方,但是區別也是顯著的。HDFS具有高度容錯性能,被設計運行於低成本硬件上。HDFS可以嚮應用提供高吞吐帶寬,適合於大數據應用。HDFS 放寬了一些 POSIX 的要求,以開啟對文件系統數據的流式訪問。HDFS 最初是作為Apache Nutch web 搜索引擎項目的基礎設施開發的。HDFS 現在是 Apache Hadoop 核心項目的一部分。
HDFS是主從架構。一個HDFS集群包含一個NameNode,一個管理文件系統命名空間和控制客戶端訪問文件的master server。以及,若干的 DataNodes,通常集群的每個node一個,管理運行DataNode的節點上的存儲。HDFS 發布一個文件系統命名空間,並允許用戶數據已文件的形式存儲在上面。內部,一個文件被分成一個或多個塊,存儲在一組DataNodes上。NameNode 執行文件系統命名空間操作,比如:打開、關閉、重命名文件或目錄。它還確定塊到DataNodes的映射。DataNodes 負責向文件系統客戶端提供讀寫服務。DataNodes 根據 NameNode 的指令執行塊的創建、刪除以及複製。
NameNode 和 DataNode 是設計運行於普通商業機器的軟件。這些機器通常運行 GNU/Linux 操作系統。HDFS 是Java 語言編寫的;任何支持Java的機器都可以運行NameNode or DataNode 軟件。使用高移植性Java語言,意味着HDFS可以部署在很大範圍的機器上。一個典型的部署就是一台特定的機器只運行NameNode 軟件,而集群內的其他機器運行DataNode 軟件的一個實例。這種架構不排除一台機器上運行多個DataNodes ,但是在實際部署中很少見。
單 NameNode 節點的存在大大簡化了架構。NameNode 是所有HDFS 元數據的仲裁和倉庫。系統設計上,用戶數據永遠不經過NameNode。
HDFS 支持傳統的文件分級組織。用戶或應用可以創建目錄,並在目錄內存儲文件。 文件系統命名空間的層次結構跟其他文件系統類似;可以創建、刪除、移動、重命名文件。HDFS 支持 user quotas 和 access permissions 。 HDFS 不支持軟、硬鏈接。但是,HDFS 架構不排除實現這些功能。
雖然HDFS遵守 文件系統命名約定 ,一些路徑和名稱 (比如/.reserved 和.snapshot ) 保留了。比如功能 transparent encryption 和 snapshot 就使用的保留路徑。
NameNode 維護文件系統命名空間。任何文件系統命名空間或屬性的變化,都會被NameNode記錄。 應用可以指定HDFS應維護的文件副本數量。文件副本的數量被稱為該文件的複製因子 replication factor 。該信息存儲於NameNode。
HDFS 被設計用於在一個大規模集群上跨機器可靠地存儲巨大的文件。它以一序列的塊的方式存儲文件。每個文件都可以配置塊尺寸和複製因子。
一個文件除了最後一個塊外,其他的塊一樣大。在 append 和 hsync 添加了可變長度塊的支持後,用戶可以啟動一個新的塊,而不用填充最後一個塊到配置的塊大小。
應用可以指定一個文件的副本數量。複製因子可以在創建的時候指定,也可以以後更改。HDFS的文件只寫一次(除了 appends 和 truncates) ,並在任何時候只允許一個 writer 。
NameNode 指定塊複製的所有決策。它周期性的從集群的每個DataNodes 接受 Heartbeat 和 Blockreport。Heartbeat 的接受代表 DataNode 工作正常。Blockreport 包含了DataNode上所有塊的清單。
副本的位置對HDFS的可靠性和性能至關重要。副本位置的優化是HDFS和其他大多數分布式文件系統的區別。這是一個需要大量調優和經驗的特性。Rack-aware 複製策略的目的就是提高數據可靠性,可用性和網絡帶寬利用率。當前副本位置策略的實現是這個方向的第一步。實施該策略的短期目標是在生產環境驗證它,了解其更多的行為,為測試和研究更複雜的策略打下基礎。
大型HDFS實例運行在跨多個Rack的集群服務器上。不同rack的兩個node通信需要通過交換機。大多數情況下,同一rack內的帶寬大於rack之間的帶寬。
NameNode 通過在 Hadoop Rack Awareness 內的進程描述 判斷DataNode 屬於哪個rack id。一個簡單但是並非最佳的策略是將副本分布於不同的racks。這可以防止整個機架發生故障時丟失數據,並允許在讀取數據時使用多個機架的帶寬。該策略在群集中均勻地分布副本,使得組件故障時很容易平衡負載。 但是,該策略會增加寫入成本,因為寫入操作需要將塊傳輸到多個機架。
一般,複製因子設置為3, HDFS 的分布策略是:如果writer在datanode上則將一個副本放到本地機器, 如果writer不在datanode上則將一個副本放到writer所在機櫃的隨機datanode 上;另一個副本位於不同機架的node上;最後一個副本位於同一遠程機架的不同node上。 該策略減少了機架間的寫流量,提升了寫性能。機架故障的概率遠小於節點故障的概率;此策略不會影響數據可靠性和可用性承諾。但是,在讀取數據時,它確實減少了聚合帶寬,因為塊存儲於兩個機櫃而不是三個機櫃內。使用此策略,副本不會均勻的分布於機架上。1/3 副本 位於同一節點, 2/3 副本位於同一機架, 另1/3副本位於其他機架。該策略提升了寫性能而不影響數據可靠性和讀性能。
如果複製因子大於3,那麼第4個及以後的副本則隨機放置,只要滿足每個機架的副本在(replicas – 1) / racks + 2)之下。
因為 NameNode 不允許 DataNodes 擁有同一個塊的多個副本,所以副本的最大數就是DataNodes的數量。
在把對 存儲類型和存儲策略 的支持添加到 HDFS 後,除了上面介紹的rack awareness外, NameNode 會考慮其他副本排布的策略。NameNode 先基於rack awareness 選擇節點,然後檢查候選節點有文件關聯的策略需要的存儲空間。 如果候選節點沒有該存儲類型, NameNode 會查找其他節點。如果在第一條路徑中找不到足夠的節點來放置副本,NameNode會在第二條路徑中查找具有回滾存儲類型的節點。 、
當前,這裡描述的默認副本排布策略正在使用中。
為了最小化全局帶寬消耗和讀取延遲, HDFS 會嘗試從最靠近reader的副本響應讀取請求。如果在reader節點的同一機架上上存在副本,則該副本有限響應讀請求。如果HDFS集群跨多個數據中心,則本地數據中心優先。
啟動時,NameNode 會進入一個稱為 Safemode 的特殊狀態。當NameNode處於Safemode狀態時,不會複製數據塊。NameNode從DataNodes接收Heartbeat和Blockreport消息。Blockreport包含DataNode託管的數據塊列表。每個塊都指定了最小副本數。當數據塊的最小副本數已與NameNode簽入時,該塊被認為是安全複製的。在NameNode簽入安全複製數據塊的已配置百分比(加上額外的30秒)後,NameNode退出Safemode狀態。然後,它判斷列表內的數據塊清單是否少於副本指定的數量。NameNode 然後複製這些塊給其他 DataNodes。
HDFS 命名空間由 NameNode 存儲。NameNode 使用事務日誌 EditLog 來持久化的保存系統元數據的每次變更。比如,在HDFS創建一個新文件,NameNode會在 EditLog 插入一條記錄來指示該變更。類似的,變更文件的複製因子也會在 EditLog 插入一條新記錄。NameNode 以文件的形式,將 EditLog 保存在本地OS文件系統上。整個文件系統命名空間,包括塊到文件的映射、文件系統屬性,都存儲於名字為 FsImage 的文件內。 FsImage 也以文件的形式,存儲在NameNode的本地文件系統上。
NameNode 將包含整個文件系統和塊映射的image保存在內存中。當NameNode啟動時,或檢查點被預先定義的閾值觸發時,它會從磁盤讀取 FsImage 和 EditLog ,把 EditLog 內的事物應用到內存中的FsImage,再將新版本刷新回磁盤的新 FsImage 。然後會截斷舊的 EditLog ,因為它的事物已經應用到了持久化的 FsImage 上。 這個過程稱為檢查點 checkpoint 。檢查點的目的是通過對文件系統元數據進行快照並保存到FsImage,來確保HDFS擁有文件系統元數據的一致性視圖。儘管讀取 FsImage 是高效的,但是對 FsImage 直接增量修改是不高效的。不是對每次編輯修改 FsImage ,而是將每次編輯保存到 Editlog 。在檢查點期間,將 Editlog 的變更應用到 FsImage 。一個檢查點可以在固定周期(dfs.namenode.checkpoint.period)(以秒為單位)觸發,也可以文件系統事物數量達到某個值(dfs.namenode.checkpoint.txns)的時候觸發。
DataNode 在本地文件系統上以文件的形式存儲 HDFS data 。DataNode 不知道 HDFS 文件。它將HDFS data 的每個塊以獨立的文件存儲於本地文件系統上。DataNode 不在同一目錄創建所有的文件。而是,使用heuristic來確定每個目錄的最佳文件數量,並適當的創建子目錄。在一個目錄創建所有的本地文件是不好的,因為本地文件系統可能不支持單目錄的海量文件數量。當DataNode啟動的時候,它掃描本地文件系統,生成與本地文件系統一一對應的HDFS數據塊列表,然後報告給NameNode。這個報告稱為 Blockreport。
所有的HDFS通信協議都在TCP/IP協議棧上。客戶端與NameNode指定的端口建立連接。與NameNode以ClientProtocol 通信。DataNodes與NameNode以DataNode Protocol進行通信。遠程過程調用(RPC)封裝了Client Protocol 和 DataNode Protocol。設計上,NameNode從不啟動任何RPCs。相反,它只應答DataNodes or clients發出的RPC請求。
HDFS的主要目標是可靠的存儲數據,即使是在故障的情況下。常見故障類型有三種: NameNode failures , DataNode failures 和 network partitions 。
每個DataNode都周期性的向NameNode發送心跳信息。 一個 network partition 可能導致DataNodes子集丟失與NameNode的連接。NameNode會基於心跳信息的缺失來偵測這種情況。NameNode將沒有心跳信息的DataNodes標記為 dead ,並不再轉發任何IO請求給它們。任何註冊到dead DataNode的數據對HDFS將不再可用。DataNode death會導致某些塊的複製因子低於它們指定的值。NameNode不斷跟蹤需要複製的塊,並在必要時啟動複製。很多因素會導致重新複製:DataNode不可用,副本損壞,DataNode上硬盤故障,複製因子增加。
標記 DataNodes dead 的超時時間保守地設置了較長時間 (默認超過10分鐘) 以避免DataNodes狀態抖動引起的複製風暴。對於性能敏感的應用,用戶可以設置較短的周期來標記DataNodes為過期,讀寫時避免過期節點。
HDFS 架構支持數據再平衡schemes。如果一個DataNode的空餘磁盤空間低於閾值,sheme就會將數據從一個DataNode 移動到另外一個。在某些文件需求突然增長的情況下,sheme可能會在集群內動態的創建額外的副本,並再平衡其他數據。這些類型的數據再平衡schemes還沒有實現。
有可能從DataNode獲取的數據塊,到達的時候損壞了。這種損壞可能是由於存儲設備故障、網絡故障、軟件bug。HDFS客戶端軟件會HDFS的內容進行校驗。當客戶端創建HDFS文件的時候,它計算文件每個塊的校驗值,並以獨立的隱藏文件存儲在同一HDFS命名空間內。當客戶端檢索文件時候,它會校驗從每個DataNode獲取的數據,是否與關聯校驗文件內的校驗值匹配。 如果不匹配,客戶端可以從另外擁有副本塊的DataNode檢索。
FsImage 和 EditLog 是HDFS的核心數據結構。這些文件的損壞將導致HDFS實例異常。 因此,NameNode可以配置為支持多 FsImage 和 EditLog 副本模式。任何對 FsImage or EditLog 的更新都會導致每個 FsImages 和 EditLogs 的同步更新。 FsImage 和 EditLog 的同步更新會導致降低命名空間每秒的事物效率。但是,這種降級是可以接受的,因為HDFS應用是數據密集型,而不是元數據密集型。當NameNode重啟的時候,它會選擇最新的一致的 FsImage 和 EditLog 。
另外一種提供故障恢復能力的辦法是多NameNodes 開啟HA,以 shared storage on NFS or distributed edit log (called Journal)的方式。推薦後者。
Snapshots – 快照,支持在特定時刻存儲數據的副本。快照功能的一個用法,可以回滾一個故障的HDFS實例到已知工作良好的時候。
HDFS被設計與支持超大的文件。與HDFS適配的軟件都是處理大數據的。這些應用都只寫一次,但是它們會讀取一或多次,並且需要滿足流式讀速度。HDFS支持文件的 一次寫入-多次讀取 語義。 HDFS典型的塊大小是128 MB.。因此,HDFS文件被分割為128 MB的塊,可能的話每個塊都位於不同的DataNode上。
當客戶端以複製因子3寫入HDFS文件時,NameNode以 複製目標選擇算法 replication target choosing algorithm 檢索DataNodes 列表。該列表包含了承載該數據塊副本的DataNodes清單。然後客戶端寫入到第一個DataNode。第一DataNode逐步接受數據的一部分,將每一部分內容寫入到本地倉庫,並將該部分數據傳輸給清單上的第二DataNode。第二DataNode,按順序接受數據塊的每個部分,寫入到倉庫,然後將該部分數據刷新到第三DataNode。最終,第三DataNode將數據寫入到其本地倉庫。
因此,DataNode從管道的前一個DataNode獲取數據,同時轉發到管道的後一個DataNode。因此,數據是以管道的方式從一個DataNode傳輸到下一個的。
應用訪問HDFS有很多方式。原生的,HDFS 提供了 FileSystem Java API 來給應用調用。還提供了 C language wrapper for this Java API 和 REST API 。另外,還支持HTTP瀏覽器查看HDFS實例的文件。 通過使用 NFS gateway ,HDFS還可以掛載到客戶端作為本地文件系統的一部分。
HDFS的用戶數據是以文件和目錄的形式組織的。它提供了一個命令行接口 FS shell 來提供用戶交互。命令的語法類似於其他shell (比如:bash, csh)。如下是一些範例:
FS shell 的目標是向依賴於腳本語言的應用提供與存儲數據的交互。
DFSAdmin 命令用於管理HDFS集群。這些命令僅給HDFS管理員使用。如下範例:
如果啟用了回收站配置,那麼文件被 FS Shell 移除時並不會立即從HDFS刪除。HDFS會將其移動到回收站目錄(每個用戶都有回收站,位於 /user/username/.Trash )。只要文件還在回收站內,就可以快速恢復。
最近刪除的文件大多數被移動到 current 回收站目錄 ( /user/username/.Trash/Current ),在配置周期內,HDFS給 current目錄內的文件創建檢查點 checkpoints (位於 /user/username/.Trash/date ) ,並刪除舊的檢查點。參考 expunge command of FS shell 獲取更多關於回收站檢查點的信息。
在回收站過期後,NameNode從HDFS命名空間刪除文件。刪除文件會將文件關聯的塊釋放。注意,在用戶刪除文件和HDFS增加free空間之間,會有一個明顯的延遲。
如下範例展示了FS Shell如何刪除文件。我們在delete目錄下創建兩個文件(test1 test2)
我們刪除文件 test1。如下命令顯示文件被移動到回收站。
現在我們嘗試以skipTrash參數刪除文件,該參數將不將文件發送到回收站。文件將會從HDFS完全刪除。
我們檢查回收站,只有文件test1。
如上,文件test1進了回收站,文件test2被永久刪除了。
當縮減文件的複製因子時,NameNode選擇可以被刪除的多餘副本。下一個Heartbeat會通報此信息給DataNode。DataNode然後會刪除響應的塊,相應的剩餘空間會顯示在集群內。同樣,在setReplication API調用完成和剩餘空間在集群顯示之間會有一個時間延遲。
Hadoop JavaDoc API .
HDFS source code:
如何使用Java API讀寫HDFS
Java API讀寫HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 創建文件目錄
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path(“/user/hadoop/data/20140318”);
boolean result = fs.mkdirs(dir);// 創建文件夾
System.out.println(“make dir :” + result);
// 創建文件,並寫入內容
Path dst = new Path(“/user/hadoop/data/20140318/tmp”);
byte[] buff = “hello,hadoop!”.getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path(“/user/hadoop/data/20140318/1.txt”);
Path newName = new Path(“/user/hadoop/data/20140318/2.txt”);
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
“/user/hadoop/data/20140318”));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 刪除文件
@SuppressWarnings(“deprecation”)
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(“/user/hadoop/data/20140318”);
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下載,將hdfs文件下載到本地磁盤
*
* @param localSrc1
* 本地的文件地址,即文件的路徑
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上傳,將本地文件copy到hdfs系統中
*
* @param localSrc
* 本地的文件地址,即文件的路徑
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置對象
FileSystem fs; // 文件系統
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 輸出流,創建一個輸出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重寫progress方法
public void progress() {
// System.out.println(“上傳完一個設定緩存區大小容量的文件!”);
}
});
// 連接兩個流,形成通道,使輸入流向輸出流傳輸數據,
IOUtils.copyBytes(in, out, 10240, true); // in為輸入流對象,out為輸出流對象,4096為緩衝區大小,true為上傳後關閉流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移動
*
* @param old_st原來存放的路徑
* @param new_st移動到的路徑
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下載到服務器本地
boolean down_flag = sendFromHdfs(old_st, “/home/hadoop/文檔/temp”);
Configuration conf = new Configuration();
FileSystem fs = null;
// 刪除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 從服務器本地傳到新路徑
new_st = new_st + old_st.substring(old_st.lastIndexOf(“/”));
boolean uplod_flag = sendToHdfs1(“/home/hadoop/文檔/temp”, new_st);
if (down_flag uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path(“/home/hadoop/word.txt”);
Path dst = new Path(“/user/hadoop/data/”);
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 獲取給定目錄下的所有子目錄以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(“/user/hadoop”);
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判斷文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//獲取hdfs集群所有主機結點數據
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println(“list of all the nodes in HDFS cluster:”); //print info
for(int i=0; i dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path(“/user/cluster/dfs.txt”);
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path(“/user/cluster/dfs.txt”);
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
原創文章,作者:ICPT,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/139368.html