javahdfs的簡單介紹

本文目錄一覽:

使用Java API操作HDFS時,_方法用於獲取文件列表?

當使用 Java API 操作 HDFS 時,可以使用 FileSystem.listFiles() 方法來獲取文件列表。該方法接受一個 Path 對象,表示要列舉文件的目錄,並返回一個 RemoteIteratorLocatedFileStatus 對象,該對象可用於迭代目錄中的文件。

例如,下面的代碼演示了如何使用 listFiles() 方法來獲取 HDFS 上的文件列表:

// 定義 HDFS 連接配置

Configuration conf = new Configuration();

// 獲取 HDFS FileSystem 對象

FileSystem fs = FileSystem.get(conf);

// 定義要列舉文件的目錄

Path dirPath = new Path(“/user/hadoop”);

// 獲取文件列表

RemoteIteratorLocatedFileStatus fileIter = fs.listFiles(dirPath, true);

// 遍歷文件列表

while (fileIter.hasNext()) {

// 獲取當前文件信息

LocatedFileStatus fileStatus = fileIter.next();

// 輸出文件名稱和大小

System.out.println(fileStatus.getPath().getName() + ” : ” + fileStatus.getLen());

}

java怎麼連接hdfs文件系統,需要哪些包?

apache的Hadoop項目提供一類api可以通過java工程操作hdfs中的文件,包括:文件打開,讀寫,刪除等、目錄的創建,刪除,讀取目錄中所有文件等。

1、到下載Hadoop,解壓後把所有jar加入項目的lib里

2、程序處理步驟: 1)得到Configuration對象,2)得到FileSystem對象,3)進行文件操作,簡單示例如下:

/**

*

*/

package org.jrs.wlh;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

/**

* @PutMeger.java

* java操作hdfs 往 hdfs中上傳數據

* @version $Revision$/br

* update: $Date$

*/

public class PutMeger {

public static void main(String[] args) throws IOException {

String[] str = new String[]{“E:\\hadoop\\UploadFileClient.java”,”hdfs://master:9000/user/hadoop/inccnt.java”};

Configuration conf = new Configuration();

FileSystem fileS= FileSystem.get(conf);

FileSystem localFile = FileSystem.getLocal(conf); //得到一個本地的FileSystem對象

Path input = new Path(str[0]); //設定文件輸入保存路徑

Path out = new Path(str[1]); //文件到hdfs輸出路徑

try{

FileStatus[] inputFile = localFile.listStatus(input); //listStatus得到輸入文件路徑的文件列表

FSDataOutputStream outStream = fileS.create(out); //創建輸出流

for (int i = 0; i inputFile.length; i++) {

System.out.println(inputFile[i].getPath().getName());

FSDataInputStream in = localFile.open(inputFile[i].getPath());

byte buffer[] = new byte[1024];

int bytesRead = 0;

while((bytesRead = in.read(buffer))0){ //按照字節讀取數據

System.out.println(buffer);

outStream.write(buffer,0,bytesRead);

}

in.close();

}

}catch(Exception e){

e.printStackTrace();

}

}

}

java修改hdfs上文件權限問題

看來你是開啟了HDFS的權限檢查功能,這樣你訪問HDFS,NameNode都會檢查訪問用戶的權限的。

你現在想要修改/process/startall.txt文件的權限,那process目錄以及startall.txt的有效用戶、有效組以及其權限是什麼呢?

假設process目錄與startall.txt原始的有效用戶和有效組分別為root和supergroup,原始權限為750的話,你若在自己電腦運行上述程序,它會自動獲取當前計算機的登錄用戶,假設為wyc,去訪問HDFS,很顯然,你的程序連process目錄都進不去的。

此外,想要更改一個目錄或文件的權限,當前用戶則必須是有效用戶或超級用戶才可以。

想要解決的話,嘿嘿, 如果你設置的hadoop.security.authentication property,也就是認證方式為simple的話(默認就是simple),那還可以鑽該認證方式的空子,運行程序是偽裝成有效用戶或者超級用戶即可。

此外,有一行代碼需要修改一下,我在實驗後發現設置權限那一行有誤,如下:

//hdfs.setPermission(dstPath, new FsPermission((short) 775));

hdfs.setPermission(dstPath, new FsPermission(“755”));

用java向hdfs上傳文件時,如何實現斷點續傳

@Component(“javaLargeFileUploaderServlet”)

@WebServlet(name = “javaLargeFileUploaderServlet”, urlPatterns = { “/javaLargeFileUploaderServlet” })

public class UploadServlet extends HttpRequestHandlerServlet

implements HttpRequestHandler {

private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);

@Autowired

UploadProcessor uploadProcessor;

@Autowired

FileUploaderHelper fileUploaderHelper;

@Autowired

ExceptionCodeMappingHelper exceptionCodeMappingHelper;

@Autowired

Authorizer authorizer;

@Autowired

StaticStateIdentifierManager staticStateIdentifierManager;

@Override

public void handleRequest(HttpServletRequest request, HttpServletResponse response)

throws IOException {

log.trace(“Handling request”);

Serializable jsonObject = null;

try {

// extract the action from the request

UploadServletAction actionByParameterName =

UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));

// check authorization

checkAuthorization(request, actionByParameterName);

// then process the asked action

jsonObject = processAction(actionByParameterName, request);

// if something has to be written to the response

if (jsonObject != null) {

fileUploaderHelper.writeToResponse(jsonObject, response);

}

}

// If exception, write it

catch (Exception e) {

exceptionCodeMappingHelper.processException(e, response);

}

}

private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)

throws MissingParameterException, AuthorizationException {

// check authorization

// if its not get progress (because we do not really care about authorization for get

// progress and it uses an array of file ids)

if (!actionByParameterName.equals(UploadServletAction.getProgress)) {

// extract uuid

final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);

// if this is init, the identifier is the one in parameter

UUID clientOrJobId;

String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);

if (actionByParameterName.equals(UploadServletAction.getConfig) parameter != null) {

clientOrJobId = UUID.fromString(parameter);

}

// if not, get it from manager

else {

clientOrJobId = staticStateIdentifierManager.getIdentifier();

}

// call authorizer

authorizer.getAuthorization(

request,

actionByParameterName,

clientOrJobId,

fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);

}

}

private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)

throws Exception {

log.debug(“Processing action ” + actionByParameterName.name());

Serializable returnObject = null;

switch (actionByParameterName) {

case getConfig:

String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);

returnObject =

uploadProcessor.getConfig(

parameterValue != null ? UUID.fromString(parameterValue) : null);

break;

case verifyCrcOfUncheckedPart:

returnObject = verifyCrcOfUncheckedPart(request);

break;

case prepareUpload:

returnObject = prepareUpload(request);

break;

case clearFile:

uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));

break;

case clearAll:

uploadProcessor.clearAll();

break;

case pauseFile:

ListUUID uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));

uploadProcessor.pauseFile(uuids);

break;

case resumeFile:

returnObject =

uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));

break;

case setRate:

uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),

Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));

break;

case getProgress:

returnObject = getProgress(request);

break;

}

return returnObject;

}

ListUUID getFileIdsFromString(String fileIds) {

String[] splittedFileIds = fileIds.split(“,”);

ListUUID uuids = Lists.newArrayList();

for (int i = 0; i splittedFileIds.length; i++) {

uuids.add(UUID.fromString(splittedFileIds[i]));

}

return uuids;

}

private Serializable getProgress(HttpServletRequest request)

throws MissingParameterException {

Serializable returnObject;

String[] ids =

new Gson()

.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);

CollectionUUID uuids = Collections2.transform(Arrays.asList(ids), new FunctionString, UUID() {

@Override

public UUID apply(String input) {

return UUID.fromString(input);

}

});

returnObject = Maps.newHashMap();

for (UUID fileId : uuids) {

try {

ProgressJson progress = uploadProcessor.getProgress(fileId);

((HashMapString, ProgressJson) returnObject).put(fileId.toString(), progress);

}

catch (FileNotFoundException e) {

log.debug(“No progress will be retrieved for ” + fileId + ” because ” + e.getMessage());

}

}

return returnObject;

}

private Serializable prepareUpload(HttpServletRequest request)

throws MissingParameterException, IOException {

// extract file information

PrepareUploadJson[] fromJson =

new Gson()

.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);

// prepare them

final HashMapString, UUID prepareUpload = uploadProcessor.prepareUpload(fromJson);

// return them

return Maps.newHashMap(Maps.transformValues(prepareUpload, new FunctionUUID, String() {

public String apply(UUID input) {

return input.toString();

};

}));

}

private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)

throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {

UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));

try {

uploadProcessor.verifyCrcOfUncheckedPart(fileId,

fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));

}

catch (InvalidCrcException e) {

// no need to log this exception, a fallback behaviour is defined in the

// throwing method.

// but we need to return something!

return Boolean.FALSE;

}

return Boolean.TRUE;

}

}

HDFS 系統架構

HDFS Architecture

Hadoop Distributed File System (HDFS) 是設計可以運行於普通商業硬件上的分布式文件系統。它跟現有的分布式文件系統有很多相通的地方,但是區別也是顯著的。HDFS具有高度容錯性能,被設計運行於低成本硬件上。HDFS可以嚮應用提供高吞吐帶寬,適合於大數據應用。HDFS 放寬了一些 POSIX 的要求,以開啟對文件系統數據的流式訪問。HDFS 最初是作為Apache Nutch web 搜索引擎項目的基礎設施開發的。HDFS 現在是 Apache Hadoop 核心項目的一部分。

HDFS是主從架構。一個HDFS集群包含一個NameNode,一個管理文件系統命名空間和控制客戶端訪問文件的master server。以及,若干的 DataNodes,通常集群的每個node一個,管理運行DataNode的節點上的存儲。HDFS 發布一個文件系統命名空間,並允許用戶數據已文件的形式存儲在上面。內部,一個文件被分成一個或多個塊,存儲在一組DataNodes上。NameNode 執行文件系統命名空間操作,比如:打開、關閉、重命名文件或目錄。它還確定塊到DataNodes的映射。DataNodes 負責向文件系統客戶端提供讀寫服務。DataNodes 根據 NameNode 的指令執行塊的創建、刪除以及複製。

NameNode 和 DataNode 是設計運行於普通商業機器的軟件。這些機器通常運行 GNU/Linux 操作系統。HDFS 是Java 語言編寫的;任何支持Java的機器都可以運行NameNode or DataNode 軟件。使用高移植性Java語言,意味着HDFS可以部署在很大範圍的機器上。一個典型的部署就是一台特定的機器只運行NameNode 軟件,而集群內的其他機器運行DataNode 軟件的一個實例。這種架構不排除一台機器上運行多個DataNodes ,但是在實際部署中很少見。

單 NameNode 節點的存在大大簡化了架構。NameNode 是所有HDFS 元數據的仲裁和倉庫。系統設計上,用戶數據永遠不經過NameNode。

HDFS 支持傳統的文件分級組織。用戶或應用可以創建目錄,並在目錄內存儲文件。 文件系統命名空間的層次結構跟其他文件系統類似;可以創建、刪除、移動、重命名文件。HDFS 支持 user quotas 和 access permissions 。 HDFS 不支持軟、硬鏈接。但是,HDFS 架構不排除實現這些功能。

雖然HDFS遵守 文件系統命名約定 ,一些路徑和名稱 (比如/.reserved 和.snapshot ) 保留了。比如功能 transparent encryption 和 snapshot 就使用的保留路徑。

NameNode 維護文件系統命名空間。任何文件系統命名空間或屬性的變化,都會被NameNode記錄。 應用可以指定HDFS應維護的文件副本數量。文件副本的數量被稱為該文件的複製因子 replication factor 。該信息存儲於NameNode。

HDFS 被設計用於在一個大規模集群上跨機器可靠地存儲巨大的文件。它以一序列的塊的方式存儲文件。每個文件都可以配置塊尺寸和複製因子。

一個文件除了最後一個塊外,其他的塊一樣大。在 append 和 hsync 添加了可變長度塊的支持後,用戶可以啟動一個新的塊,而不用填充最後一個塊到配置的塊大小。

應用可以指定一個文件的副本數量。複製因子可以在創建的時候指定,也可以以後更改。HDFS的文件只寫一次(除了 appends 和 truncates) ,並在任何時候只允許一個 writer 。

NameNode 指定塊複製的所有決策。它周期性的從集群的每個DataNodes 接受 Heartbeat 和 Blockreport。Heartbeat 的接受代表 DataNode 工作正常。Blockreport 包含了DataNode上所有塊的清單。

副本的位置對HDFS的可靠性和性能至關重要。副本位置的優化是HDFS和其他大多數分布式文件系統的區別。這是一個需要大量調優和經驗的特性。Rack-aware 複製策略的目的就是提高數據可靠性,可用性和網絡帶寬利用率。當前副本位置策略的實現是這個方向的第一步。實施該策略的短期目標是在生產環境驗證它,了解其更多的行為,為測試和研究更複雜的策略打下基礎。

大型HDFS實例運行在跨多個Rack的集群服務器上。不同rack的兩個node通信需要通過交換機。大多數情況下,同一rack內的帶寬大於rack之間的帶寬。

NameNode 通過在 Hadoop Rack Awareness 內的進程描述 判斷DataNode 屬於哪個rack id。一個簡單但是並非最佳的策略是將副本分布於不同的racks。這可以防止整個機架發生故障時丟失數據,並允許在讀取數據時使用多個機架的帶寬。該策略在群集中均勻地分布副本,使得組件故障時很容易平衡負載。 但是,該策略會增加寫入成本,因為寫入操作需要將塊傳輸到多個機架。

一般,複製因子設置為3, HDFS 的分布策略是:如果writer在datanode上則將一個副本放到本地機器, 如果writer不在datanode上則將一個副本放到writer所在機櫃的隨機datanode 上;另一個副本位於不同機架的node上;最後一個副本位於同一遠程機架的不同node上。 該策略減少了機架間的寫流量,提升了寫性能。機架故障的概率遠小於節點故障的概率;此策略不會影響數據可靠性和可用性承諾。但是,在讀取數據時,它確實減少了聚合帶寬,因為塊存儲於兩個機櫃而不是三個機櫃內。使用此策略,副本不會均勻的分布於機架上。1/3 副本 位於同一節點, 2/3 副本位於同一機架, 另1/3副本位於其他機架。該策略提升了寫性能而不影響數據可靠性和讀性能。

如果複製因子大於3,那麼第4個及以後的副本則隨機放置,只要滿足每個機架的副本在(replicas – 1) / racks + 2)之下。

因為 NameNode 不允許 DataNodes 擁有同一個塊的多個副本,所以副本的最大數就是DataNodes的數量。

在把對 存儲類型和存儲策略 的支持添加到 HDFS 後,除了上面介紹的rack awareness外, NameNode 會考慮其他副本排布的策略。NameNode 先基於rack awareness 選擇節點,然後檢查候選節點有文件關聯的策略需要的存儲空間。 如果候選節點沒有該存儲類型, NameNode 會查找其他節點。如果在第一條路徑中找不到足夠的節點來放置副本,NameNode會在第二條路徑中查找具有回滾存儲類型的節點。 、

當前,這裡描述的默認副本排布策略正在使用中。

為了最小化全局帶寬消耗和讀取延遲, HDFS 會嘗試從最靠近reader的副本響應讀取請求。如果在reader節點的同一機架上上存在副本,則該副本有限響應讀請求。如果HDFS集群跨多個數據中心,則本地數據中心優先。

啟動時,NameNode 會進入一個稱為 Safemode 的特殊狀態。當NameNode處於Safemode狀態時,不會複製數據塊。NameNode從DataNodes接收Heartbeat和Blockreport消息。Blockreport包含DataNode託管的數據塊列表。每個塊都指定了最小副本數。當數據塊的最小副本數已與NameNode簽入時,該塊被認為是安全複製的。在NameNode簽入安全複製數據塊的已配置百分比(加上額外的30秒)後,NameNode退出Safemode狀態。然後,它判斷列表內的數據塊清單是否少於副本指定的數量。NameNode 然後複製這些塊給其他 DataNodes。

HDFS 命名空間由 NameNode 存儲。NameNode 使用事務日誌 EditLog 來持久化的保存系統元數據的每次變更。比如,在HDFS創建一個新文件,NameNode會在 EditLog 插入一條記錄來指示該變更。類似的,變更文件的複製因子也會在 EditLog 插入一條新記錄。NameNode 以文件的形式,將 EditLog 保存在本地OS文件系統上。整個文件系統命名空間,包括塊到文件的映射、文件系統屬性,都存儲於名字為 FsImage 的文件內。 FsImage 也以文件的形式,存儲在NameNode的本地文件系統上。

NameNode 將包含整個文件系統和塊映射的image保存在內存中。當NameNode啟動時,或檢查點被預先定義的閾值觸發時,它會從磁盤讀取 FsImage 和 EditLog ,把 EditLog 內的事物應用到內存中的FsImage,再將新版本刷新回磁盤的新 FsImage 。然後會截斷舊的 EditLog ,因為它的事物已經應用到了持久化的 FsImage 上。 這個過程稱為檢查點 checkpoint 。檢查點的目的是通過對文件系統元數據進行快照並保存到FsImage,來確保HDFS擁有文件系統元數據的一致性視圖。儘管讀取 FsImage 是高效的,但是對 FsImage 直接增量修改是不高效的。不是對每次編輯修改 FsImage ,而是將每次編輯保存到 Editlog 。在檢查點期間,將 Editlog 的變更應用到 FsImage 。一個檢查點可以在固定周期(dfs.namenode.checkpoint.period)(以秒為單位)觸發,也可以文件系統事物數量達到某個值(dfs.namenode.checkpoint.txns)的時候觸發。

DataNode 在本地文件系統上以文件的形式存儲 HDFS data 。DataNode 不知道 HDFS 文件。它將HDFS data 的每個塊以獨立的文件存儲於本地文件系統上。DataNode 不在同一目錄創建所有的文件。而是,使用heuristic來確定每個目錄的最佳文件數量,並適當的創建子目錄。在一個目錄創建所有的本地文件是不好的,因為本地文件系統可能不支持單目錄的海量文件數量。當DataNode啟動的時候,它掃描本地文件系統,生成與本地文件系統一一對應的HDFS數據塊列表,然後報告給NameNode。這個報告稱為 Blockreport。

所有的HDFS通信協議都在TCP/IP協議棧上。客戶端與NameNode指定的端口建立連接。與NameNode以ClientProtocol 通信。DataNodes與NameNode以DataNode Protocol進行通信。遠程過程調用(RPC)封裝了Client Protocol 和 DataNode Protocol。設計上,NameNode從不啟動任何RPCs。相反,它只應答DataNodes or clients發出的RPC請求。

HDFS的主要目標是可靠的存儲數據,即使是在故障的情況下。常見故障類型有三種: NameNode failures , DataNode failures 和 network partitions 。

每個DataNode都周期性的向NameNode發送心跳信息。 一個 network partition 可能導致DataNodes子集丟失與NameNode的連接。NameNode會基於心跳信息的缺失來偵測這種情況。NameNode將沒有心跳信息的DataNodes標記為 dead ,並不再轉發任何IO請求給它們。任何註冊到dead DataNode的數據對HDFS將不再可用。DataNode death會導致某些塊的複製因子低於它們指定的值。NameNode不斷跟蹤需要複製的塊,並在必要時啟動複製。很多因素會導致重新複製:DataNode不可用,副本損壞,DataNode上硬盤故障,複製因子增加。

標記 DataNodes dead 的超時時間保守地設置了較長時間 (默認超過10分鐘) 以避免DataNodes狀態抖動引起的複製風暴。對於性能敏感的應用,用戶可以設置較短的周期來標記DataNodes為過期,讀寫時避免過期節點。

HDFS 架構支持數據再平衡schemes。如果一個DataNode的空餘磁盤空間低於閾值,sheme就會將數據從一個DataNode 移動到另外一個。在某些文件需求突然增長的情況下,sheme可能會在集群內動態的創建額外的副本,並再平衡其他數據。這些類型的數據再平衡schemes還沒有實現。

有可能從DataNode獲取的數據塊,到達的時候損壞了。這種損壞可能是由於存儲設備故障、網絡故障、軟件bug。HDFS客戶端軟件會HDFS的內容進行校驗。當客戶端創建HDFS文件的時候,它計算文件每個塊的校驗值,並以獨立的隱藏文件存儲在同一HDFS命名空間內。當客戶端檢索文件時候,它會校驗從每個DataNode獲取的數據,是否與關聯校驗文件內的校驗值匹配。 如果不匹配,客戶端可以從另外擁有副本塊的DataNode檢索。

FsImage 和 EditLog 是HDFS的核心數據結構。這些文件的損壞將導致HDFS實例異常。 因此,NameNode可以配置為支持多 FsImage 和 EditLog 副本模式。任何對 FsImage or EditLog 的更新都會導致每個 FsImages 和 EditLogs 的同步更新。 FsImage 和 EditLog 的同步更新會導致降低命名空間每秒的事物效率。但是,這種降級是可以接受的,因為HDFS應用是數據密集型,而不是元數據密集型。當NameNode重啟的時候,它會選擇最新的一致的 FsImage 和 EditLog 。

另外一種提供故障恢復能力的辦法是多NameNodes 開啟HA,以 shared storage on NFS or distributed edit log (called Journal)的方式。推薦後者。

Snapshots – 快照,支持在特定時刻存儲數據的副本。快照功能的一個用法,可以回滾一個故障的HDFS實例到已知工作良好的時候。

HDFS被設計與支持超大的文件。與HDFS適配的軟件都是處理大數據的。這些應用都只寫一次,但是它們會讀取一或多次,並且需要滿足流式讀速度。HDFS支持文件的 一次寫入-多次讀取 語義。 HDFS典型的塊大小是128 MB.。因此,HDFS文件被分割為128 MB的塊,可能的話每個塊都位於不同的DataNode上。

當客戶端以複製因子3寫入HDFS文件時,NameNode以 複製目標選擇算法 replication target choosing algorithm 檢索DataNodes 列表。該列表包含了承載該數據塊副本的DataNodes清單。然後客戶端寫入到第一個DataNode。第一DataNode逐步接受數據的一部分,將每一部分內容寫入到本地倉庫,並將該部分數據傳輸給清單上的第二DataNode。第二DataNode,按順序接受數據塊的每個部分,寫入到倉庫,然後將該部分數據刷新到第三DataNode。最終,第三DataNode將數據寫入到其本地倉庫。

因此,DataNode從管道的前一個DataNode獲取數據,同時轉發到管道的後一個DataNode。因此,數據是以管道的方式從一個DataNode傳輸到下一個的。

應用訪問HDFS有很多方式。原生的,HDFS 提供了 FileSystem Java API 來給應用調用。還提供了 C language wrapper for this Java API 和 REST API 。另外,還支持HTTP瀏覽器查看HDFS實例的文件。 通過使用 NFS gateway ,HDFS還可以掛載到客戶端作為本地文件系統的一部分。

HDFS的用戶數據是以文件和目錄的形式組織的。它提供了一個命令行接口 FS shell 來提供用戶交互。命令的語法類似於其他shell (比如:bash, csh)。如下是一些範例:

FS shell 的目標是向依賴於腳本語言的應用提供與存儲數據的交互。

DFSAdmin 命令用於管理HDFS集群。這些命令僅給HDFS管理員使用。如下範例:

如果啟用了回收站配置,那麼文件被 FS Shell 移除時並不會立即從HDFS刪除。HDFS會將其移動到回收站目錄(每個用戶都有回收站,位於 /user/username/.Trash )。只要文件還在回收站內,就可以快速恢復。

最近刪除的文件大多數被移動到 current 回收站目錄 ( /user/username/.Trash/Current ),在配置周期內,HDFS給 current目錄內的文件創建檢查點 checkpoints (位於 /user/username/.Trash/date ) ,並刪除舊的檢查點。參考 expunge command of FS shell 獲取更多關於回收站檢查點的信息。

在回收站過期後,NameNode從HDFS命名空間刪除文件。刪除文件會將文件關聯的塊釋放。注意,在用戶刪除文件和HDFS增加free空間之間,會有一個明顯的延遲。

如下範例展示了FS Shell如何刪除文件。我們在delete目錄下創建兩個文件(test1 test2)

我們刪除文件 test1。如下命令顯示文件被移動到回收站。

現在我們嘗試以skipTrash參數刪除文件,該參數將不將文件發送到回收站。文件將會從HDFS完全刪除。

我們檢查回收站,只有文件test1。

如上,文件test1進了回收站,文件test2被永久刪除了。

當縮減文件的複製因子時,NameNode選擇可以被刪除的多餘副本。下一個Heartbeat會通報此信息給DataNode。DataNode然後會刪除響應的塊,相應的剩餘空間會顯示在集群內。同樣,在setReplication API調用完成和剩餘空間在集群顯示之間會有一個時間延遲。

Hadoop JavaDoc API .

HDFS source code:

如何使用Java API讀寫HDFS

Java API讀寫HDFS

public class FSOptr {

/**

* @param args

*/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();

makeDir(conf);

rename(conf);

delete(conf);

}

// 創建文件目錄

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path dir = new Path(“/user/hadoop/data/20140318”);

boolean result = fs.mkdirs(dir);// 創建文件夾

System.out.println(“make dir :” + result);

// 創建文件,並寫入內容

Path dst = new Path(“/user/hadoop/data/20140318/tmp”);

byte[] buff = “hello,hadoop!”.getBytes();

FSDataOutputStream outputStream = fs.create(dst);

outputStream.write(buff, 0, buff.length);

outputStream.close();

FileStatus files[] = fs.listStatus(dst);

for (FileStatus file : files) {

System.out.println(file.getPath());

}

fs.close();

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path oldName = new Path(“/user/hadoop/data/20140318/1.txt”);

Path newName = new Path(“/user/hadoop/data/20140318/2.txt”);

fs.rename(oldName, newName);

FileStatus files[] = fs.listStatus(new Path(

“/user/hadoop/data/20140318”));

for (FileStatus file : files) {

System.out.println(file.getPath());

}

fs.close();

}

// 刪除文件

@SuppressWarnings(“deprecation”)

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path path = new Path(“/user/hadoop/data/20140318”);

if (fs.isDirectory(path)) {

FileStatus files[] = fs.listStatus(path);

for (FileStatus file : files) {

fs.delete(file.getPath());

}

} else {

fs.delete(path);

}

// 或者

fs.delete(path, true);

fs.close();

}

/**

* 下載,將hdfs文件下載到本地磁盤

*

* @param localSrc1

* 本地的文件地址,即文件的路徑

* @param hdfsSrc1

* 存放在hdfs的文件地址

*/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration();

FileSystem fs = null;

try {

fs = FileSystem.get(URI.create(hdfsSrc1), conf);

Path hdfs_path = new Path(hdfsSrc1);

Path local_path = new Path(localSrc1);

fs.copyToLocalFile(hdfs_path, local_path);

return true;

} catch (IOException e) {

e.printStackTrace();

}

return false;

}

/**

* 上傳,將本地文件copy到hdfs系統中

*

* @param localSrc

* 本地的文件地址,即文件的路徑

* @param hdfsSrc

* 存放在hdfs的文件地址

*/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in;

try {

in = new BufferedInputStream(new FileInputStream(localSrc));

Configuration conf = new Configuration();// 得到配置對象

FileSystem fs; // 文件系統

try {

fs = FileSystem.get(URI.create(hdfsSrc), conf);

// 輸出流,創建一個輸出流

OutputStream out = fs.create(new Path(hdfsSrc),

new Progressable() {

// 重寫progress方法

public void progress() {

// System.out.println(“上傳完一個設定緩存區大小容量的文件!”);

}

});

// 連接兩個流,形成通道,使輸入流向輸出流傳輸數據,

IOUtils.copyBytes(in, out, 10240, true); // in為輸入流對象,out為輸出流對象,4096為緩衝區大小,true為上傳後關閉流

return true;

} catch (IOException e) {

e.printStackTrace();

}

} catch (FileNotFoundException e) {

e.printStackTrace();

}

return false;

}

/**

* 移動

*

* @param old_st原來存放的路徑

* @param new_st移動到的路徑

*/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下載到服務器本地

boolean down_flag = sendFromHdfs(old_st, “/home/hadoop/文檔/temp”);

Configuration conf = new Configuration();

FileSystem fs = null;

// 刪除源文件

try {

fs = FileSystem.get(URI.create(old_st), conf);

Path hdfs_path = new Path(old_st);

fs.delete(hdfs_path);

} catch (IOException e) {

e.printStackTrace();

}

// 從服務器本地傳到新路徑

new_st = new_st + old_st.substring(old_st.lastIndexOf(“/”));

boolean uplod_flag = sendToHdfs1(“/home/hadoop/文檔/temp”, new_st);

if (down_flag uplod_flag) {

return true;

}

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path src = new Path(“/home/hadoop/word.txt”);

Path dst = new Path(“/user/hadoop/data/”);

fs.copyFromLocalFile(src, dst);

fs.close();

}

// 獲取給定目錄下的所有子目錄以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path path = new Path(“/user/hadoop”);

getFile(path, fs);

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fs.listStatus(path);

for (int i = 0; i fileStatus.length; i++) {

if (fileStatus[i].isDir()) {

Path p = new Path(fileStatus[i].getPath().toString());

getFile(p, fs);

} else {

System.out.println(fileStatus[i].getPath().toString());

}

}

}

//判斷文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystem.get(conf);

return fileSystem.exists(new Path(path));

}

//獲取hdfs集群所有主機結點數據

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

String[] names = new String[dataNodeStats.length];

System.out.println(“list of all the nodes in HDFS cluster:”); //print info

for(int i=0; i dataNodeStats.length; i++){

names[i] = dataNodeStats[i].getHostName();

System.out.println(names[i]); //print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

Path f = new Path(“/user/cluster/dfs.txt”);

FileStatus filestatus = fs.getFileStatus(f);

BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());

int blkCount = blkLocations.length;

for(int i=0; i blkCount; i++){

String[] hosts = blkLocations[i].getHosts();

//Do sth with the block hosts

System.out.println(hosts);

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

Path f = new Path(“/user/cluster/dfs.txt”);

FileStatus filestatus = fs.getFileStatus(f);

long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch

Date d = new Date(modificationTime);

System.out.println(d);

}

}

原創文章,作者:ICPT,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/139368.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
ICPT的頭像ICPT
上一篇 2024-10-04 00:22
下一篇 2024-10-04 00:22

相關推薦

  • Python簡單數學計算

    本文將從多個方面介紹Python的簡單數學計算,包括基礎運算符、函數、庫以及實際應用場景。 一、基礎運算符 Python提供了基礎的算術運算符,包括加(+)、減(-)、乘(*)、除…

    編程 2025-04-29
  • Python滿天星代碼:讓編程變得更加簡單

    本文將從多個方面詳細闡述Python滿天星代碼,為大家介紹它的優點以及如何在編程中使用。無論是剛剛接觸編程還是資深程序員,都能從中獲得一定的收穫。 一、簡介 Python滿天星代碼…

    編程 2025-04-29
  • Python海龜代碼簡單畫圖

    本文將介紹如何使用Python的海龜庫進行簡單畫圖,並提供相關示例代碼。 一、基礎用法 使用Python的海龜庫,我們可以控制一個小海龜在窗口中移動,並利用它的“畫筆”在窗口中繪製…

    編程 2025-04-29
  • Python櫻花樹代碼簡單

    本文將對Python櫻花樹代碼進行詳細的闡述和講解,幫助讀者更好地理解該代碼的實現方法。 一、簡介 櫻花樹是一種圖形效果,它的實現方法比較簡單。Python中可以通過turtle這…

    編程 2025-04-28
  • Python大神作品:讓編程變得更加簡單

    Python作為一種高級的解釋性編程語言,一直被廣泛地運用於各個領域,從Web開發、遊戲開發到人工智能,Python都扮演着重要的角色。Python的代碼簡潔明了,易於閱讀和維護,…

    編程 2025-04-28
  • 用Python實現簡單爬蟲程序

    在當今時代,互聯網上的信息量是爆炸式增長的,其中很多信息可以被利用。對於數據分析、數據挖掘或者其他一些需要大量數據的任務,我們可以使用爬蟲技術從各個網站獲取需要的信息。而Pytho…

    編程 2025-04-28
  • 如何製作一個簡單的換裝遊戲

    本文將從以下幾個方面,為大家介紹如何製作一個簡單的換裝遊戲: 1. 遊戲需求和界面設計 2. 使用HTML、CSS和JavaScript開發遊戲 3. 實現遊戲的基本功能:拖拽交互…

    編程 2025-04-27
  • Guava Limiter——限流器的簡單易用

    本文將從多個維度對Guava Limiter進行詳細闡述,介紹其定義、使用方法、工作原理和案例應用等方面,並給出完整的代碼示例,希望能夠幫助讀者更好地了解和使用該庫。 一、定義 G…

    編程 2025-04-27
  • 2的32次方-1:一個看似簡單卻又複雜的數字

    對於計算機領域的人來說,2的32次方-1(也就是十進制下的4294967295)這個數字並不陌生。它經常被用來表示IPv4地址或者無符號32位整數的最大值。但實際上,這個數字卻包含…

    編程 2025-04-27
  • 製作一個簡單的管理系統的成本及實現

    想要製作一個簡單的管理系統,需要進行技術選型、開發、測試等過程,那麼這個過程會花費多少錢呢?我們將從多個方面來闡述製作一個簡單的管理系統的成本及實現。 一、技術選型 當我們開始思考…

    編程 2025-04-27

發表回復

登錄後才能評論