一、pom.xml
4.0.0 yofc root 1.0-SNAPSHOT org.apache.hadoop hadoop-client 2.9.2 junit junit 4.12 test org.apache.maven.plugins maven-compiler-plugin 3.8.0 UTF-8
二、测试
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.log4j.BasicConfigurator;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.io.IOException;import java.net.URI;public class HDFSClient { private Configuration conf; private FileSystem fs; @Before public void init() throws Exception { // 设置 HADOOP_HOME 环境变量 System.setProperty("hadoop.home.dir", "D:/DevelopTools/hadoop-2.9.2/"); // 日志初始化 BasicConfigurator.configure(); conf = new Configuration(); //conf.set("fs.defaultFS", "hdfs://192.168.8.136:9000"); //fs = FileSystem.get(conf ); // 获取 hdfs 客户端对象,指定用户名,避免无权限 fs = FileSystem.get(new URI("hdfs://192.168.8.136:9000"), conf, "root"); } @After public void close() throws IOException { fs.close(); } // 在 hdfs 上创建文件夹 @Test public void mkdirs() throws IOException { fs.mkdirs(new Path("/10086/")); }}
文件上传
@Testpublic void testCopyFromLocalFile() throws Exception{ fs.copyFromLocalFile(new Path("D:/MyFile/Downloads/Writage-1.12.msi"), new Path("/Writage-1.12.msi"));}
手动 IO 流方式
@Testpublic void putFileToHDFS() throws Exception{ // 获取输入流 FileInputStream fis = new FileInputStream(new File("D://MyFile/Downloads/Writage-1.12.msi")); // 获取输出流 FSDataOutputStream fos = fs.create(new Path("/Writage-1.12.msi")); // 流的对拷 IOUtils.copyBytes(fis, fos, conf); // 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis);}
文件下载
@Testpublic void testCopyToLocalFile() throws Exception{ // fs.copyToLocalFile(new Path("/AAA.txt"), new Path("d:/BBB.txt")); /** * delSrc:是否删除原数据 * src:hdfs 路径 * dst:本地 路径 * useRawLocalFileSystem:crc 文件完整性校验 */ fs.copyToLocalFile(false, new Path("/Writage-1.12.msi"), new Path("D://Writage.msi"), true);}
手动 IO 流方式
@Testpublic void getFileFromHDFS() throws Exception{ // 获取输入流 FSDataInputStream fis = fs.open(new Path("/Writage-1.12.msi")); // 获取输出流 FileOutputStream fos = new FileOutputStream(new File("D://Writage.msi")); // 流的对拷 IOUtils.copyBytes(fis, fos, conf); // 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis);}
分块方式,这里要下载的文件被 hdfs 切割成了 3 块
// 下载第一块@Testpublic void readFileSeek1() throws Exception{ // 获取输入流 FSDataInputStream fis = fs.open(new Path("/hadoop-2.9.2-win10-64.tar.gz")); // 获取输出流 FileOutputStream fos = new FileOutputStream(new File("D://hadoop-2.9.2-win10-64.tar.gz")); // 流的对拷(只拷贝128m) byte[] buf = new byte[1024]; for (int i = 0; i < 1024 * 128; i++) { fis.read(buf); fos.write(buf); } // 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis);}// 下载第二块@Testpublic void readFileSeek2() throws Exception{ // 获取输入流 FSDataInputStream fis = fs.open(new Path("/hadoop-2.9.2-win10-64.tar.gz")); // 设置指定读取的起点 fis.seek(1024*1024*128); // 获取输出流 FileOutputStream fos = new FileOutputStream(new File("D://hadoop-2.9.2-win10-64.tar.gz2")); // 流的对拷(只拷贝128m) byte[] buf = new byte[1024]; for (int i = 0; i < 1024 * 128; i++) { fis.read(buf); fos.write(buf); } // 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis);}// 下载第三块@Testpublic void readFileSeek3() throws Exception{ // 获取输入流 FSDataInputStream fis = fs.open(new Path("/hadoop-2.9.2-win10-64.tar.gz")); // 设置指定读取的起点 fis.seek(1024*1024*128*2); // 获取输出流 FileOutputStream fos = new FileOutputStream(new File("D://hadoop-2.9.2-win10-64.tar.gz3")); // 流的对拷 IOUtils.copyBytes(fis, fos, conf); // 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis);}
分块下载完毕后合并文件
# Windows 环境下# 将 hadoop-2.9.2-win10-64.tar.gz2 追加到 hadoop-2.9.2-win10-64.tar.gztype hadoop-2.9.2-win10-64.tar.gz2 >> hadoop-2.9.2-win10-64.tar.gz# 将 hadoop-2.9.2-win10-64.tar.gz3 追加到 hadoop-2.9.2-win10-64.tar.gztype hadoop-2.9.2-win10-64.tar.gz3 >> hadoop-2.9.2-win10-64.tar.gz# 最后 hadoop-2.9.2-win10-64.tar.gz 就是一个完整的文件了
文件删除
@Testpublic void testDelete() throws Exception{ /** * var1:hdfs 路径 * var2:是否递归删除,若为文件夹则必须为 true */ fs.delete(new Path("/Writage-1.12.msi"), true);}
重命名
@Testpublic void testRename() throws Exception{ // 把根目录的 10086 改为 mkmk fs.rename(new Path("/10086/"), new Path("/mkmk/"));}
查看文件详情
@Testpublic void testListFiles() throws Exception{ // 递归获取根目录下的所有文件 RemoteIteratorlistFiles = fs.listFiles(new Path("/"), true); while(listFiles.hasNext()){ LocatedFileStatus fileStatus = listFiles.next(); // 文件名称 System.out.println(fileStatus.getPath().getName()); // 文件权限 System.out.println(fileStatus.getPermission()); // 文件长度 System.out.println(fileStatus.getLen()); // 文件块信息 BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { // 文件块所在的主机名 String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.println(host); } } System.out.println("-------------------"); }}
判断是文件还是文件夹
@Testpublic void testListStatus() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path("/")); for (FileStatus fileStatus : listStatus) { if (fileStatus.isFile()) { System.out.println("文件:"+fileStatus.getPath().getName()); }else{ System.out.println("文件夹:"+fileStatus.getPath().getName()); } }}