标签:.text -- efault 文件系统 put import miss cto params
离线分析 : hadoop生态圈 HDFS, MapReduce(概念偏多), hive(底层是MapReduce), 离线业务分析80%都是使用hive
实时分析 : spark
数据结构 : 二叉树(面试) 动态规划, redis数据库, SSM三大框架
1. spring
2. springMVC
3. mybatis
@Test
public void mkdirTest() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 操作HDFS API 创建目录
fs.mkdirs(new Path("/customMk"));
//3 关闭资源
fs.close();
System.out.println("创建完成");
}
@Test
public void deleteMk() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 操作HDFS API 创建目录
fs.delete(new Path("/customMk"),true);
//3 关闭资源
fs.close();
System.out.println("删除完成");
}
@Test
public void testRename() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 操作HDFS API 修改名称
fs.rename(new Path("/a.txt"), new Path("/d.txt"));
//3 关闭资源
fs.close();
System.out.println("修改完成");
}
@Test
public void getFileInfo() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
/*
*
* while(String str:strs){
*
* System.out.println(str);
* }
*
* */
RemoteIterator<LocatedFileStatus> listfiles=fs.listFiles(new Path("/"), true);
while(listfiles.hasNext()) {
LocatedFileStatus status=listfiles.next();
//输出文件名称
System.out.println(status.getPath().getName());
//输出文件块
System.out.println(status.getBlockSize());
//文件的长度
System.out.println(status.getLen());
//文件的权限
System.out.println(status.getPermission());
//文件的副本数量
System.out.println(status.getReplication());
//文件的所属者
System.out.println(status.getOwner());
}
}
@Test
public void filePut() throws IllegalArgumentException, IOException {
//1 获得文件系统
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
// 2创建输入流
FileInputStream input=new FileInputStream(new File("c:/hello.txt"));
// 3获得输出流
FSDataOutputStream fos=fs.create(new Path("/hello21.txt"));
// 流拷贝 把input中的内容交给 output输出
IOUtils.copyBytes(input, fos, config);
// 5关闭资源
IOUtils.closeStream(input);
IOUtils.closeStream(fos);
}
@Test
public void getFile() throws IllegalArgumentException, IOException {
//1 获得文件系统
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 获得输入流,从HDFS读取
FSDataInputStream input=fs.open(new Path("/hello21.txt"));
//3 获得输出流
FileOutputStream output=new FileOutputStream(new File("c:/hello21.txt"));
//4 流拷贝
IOUtils.copyBytes(input, output, config);
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
对数据进行切片, 每一个切片对应一个mapTask , 假如一个文件被切成了10个切片, 就存在10个mapTask任务并行运行, 互不干扰
把每个mapTask阶段的输出进行整合
步骤:
文本 : inputFormat 输出到 map<K,V>
java python hadoop
hdfs html css
javascript scala
scala css hdfs
FileInputFormat 会读取一行文本输出给map 切分为 <>k ,v > 对, key 是文本中的偏移量, V 是文本中的内容 <0, java python hadoop>
<1, hdfs html css >
<2, javascript scala > ... <10 , scala css hdfs >
String word = value.toString(); ---->java python hadoop String[ ] words = word.split(" ");----> [java , python , hadoop]
接下来 , map 要输出给reduce <K, V > K:文本的内容, V: 单词出现的次数
Reduce进行接收 <K, V > K:文本的内容, V: 单词出现的次数
在整个MapReduce运行的时候存在如下进程: 1. mapTask 2. reduceTask 3. MRAPPMaster 任务管理的进程
如果要进行对象的传输, 则传输的内容必须进行序列化, 所以hadoop就创建了一些序列化类型
对象 | 序列化 |
---|---|
Long | longWritable |
Int | IntWrieable |
String | Text |
package org.qianfeng.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
*
* @author wubo
*
*Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
*输入 key 文本中偏移量
*value 文本中的内容
*
*输出 key 是文本的内容
*
*value 是单词出现的次数
*/
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text k=new Text();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
//1 获取一行的数据
String line=value.toString();
//2 切割 按照空格切分
String[] words=line.split(" ");
for(String word:words) {
k.set(word); //把String类型的word 转换为Text类型
//3 输出到Reduce
context.write(k, new IntWritable(1));
}
}
//需要实现Map方法编写业务逻辑
}
package org.qianfeng.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @author wubo
*
*
*hello 1
*hadoop 1
*
*hadoop 1
*
*hadoop 2
*
*把相同key的values进行累加
*/
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
int sum=0;
for(IntWritable count:values) {
sum+=count.get();
}
//输出
context.write(key, new IntWritable(sum));
}
}
package org.qianfeng.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获得配置信息
Configuration config=new Configuration();
// 实例化 job类 并且把配置信息传给job
Job job=Job.getInstance(config);
// 通过反射机制 加载主类的位置
job.setJarByClass(Driver.class);
//设置map和reduce类
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
//设置map的输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置redue的输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置文件的输入 输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]]));
FileOutputFormat.setOutputPath(job, new Path(args[1]]));
//提交任务
boolean result=job.waitForCompletion(true);
System.exit(result?0:1);
}
}
$HADOOP_HOME/bin/hdfs dfs -put a.txt /a.txt
$HADOOP_HOME/bin/hadoop jar wc.jar /a.txt /output
$HADOOP_HOME/bin/hdfs dfs -cat /output/pa...
标签:.text -- efault 文件系统 put import miss cto params
原文地址:https://www.cnblogs.com/xiayangdream/p/9975158.html