Tags: mapreduce, distributed, matrix multiplication, node triangle counting, hadoop
Using Hadoop to measure the importance of nodes in an undirected graph, where importance is expressed as the number of triangles each node belongs to.
When large graph-structured datasets are processed in a distributed fashion, computing and ranking node importance matters: the important nodes have to be identified first so they can be given special treatment. The rest of this post walks through one way to do it.
This post has three parts:
1. Generate the adjacency matrix of an undirected graph with Python
2. Draw the graph with Python
3. Count each node's triangles with Hadoop MapReduce
For matrix multiplication in Hadoop, see the earlier post: http://blog.csdn.net/thao6626/article/details/46472535
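Why squaring the adjacency matrix helps: for a 0/1 adjacency matrix A, entry (i,j) of A·A counts the length-2 paths from i to j, so summing (A·A)[i][j] over the neighbors j of i counts every triangle through i exactly twice (once for each of the other two triangle vertices). That is exactly the quantity the second MapReduce round computes before halving it. A minimal numpy sketch of the same arithmetic, useful only for checking the cluster output locally (numpy is an extra dependency, not part of the pipeline):

import numpy as np

# Toy undirected graph: one triangle 0-1-2 plus a dangling edge 1-3.
A = np.array([[0, 1, 1, 0],
              [1, 0, 1, 1],
              [1, 1, 0, 0],
              [0, 1, 0, 0]])

A2 = A.dot(A)                          # (A^2)[i][j] = number of length-2 paths from i to j
triangles = (A * A2).sum(axis=1) // 2  # each triangle at node i is counted twice
print(triangles)                       # -> [1 1 1 0]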
1. Generate the adjacency matrix of an undirected graph with Python
# coding:utf-8
__author__ = 'taohao'
import random


class AdjMatrix(object):
    def build_adjmatrix(self, dimension):
        # Writes two labelled copies (A and B) of one random symmetric 0/1 matrix,
        # one element per line: "A,row,col,value". Only the upper triangle is
        # generated; each off-diagonal value is mirrored to keep the matrix symmetric.
        temp = 1
        fd = open("./AdjMatrix.txt", 'w+')
        for i in range(1, dimension + 1):
            for j in range(temp, dimension + 1):
                if i == j:
                    # no self-loops: the diagonal is always 0
                    if i == dimension:
                        fd.write('A,' + str(i) + ',' + str(j) + ',' + '0' + '\n')
                        fd.write('B,' + str(i) + ',' + str(j) + ',' + '0')  # last line: no newline
                    else:
                        fd.write('A,' + str(i) + ',' + str(j) + ',' + '0' + '\n')
                        fd.write('B,' + str(i) + ',' + str(j) + ',' + '0' + '\n')
                else:
                    value = random.randint(0, 1)
                    fd.write('A,' + str(i) + ',' + str(j) + ',' + str(value) + '\n')
                    fd.write('A,' + str(j) + ',' + str(i) + ',' + str(value) + '\n')
                    fd.write('B,' + str(i) + ',' + str(j) + ',' + str(value) + '\n')
                    fd.write('B,' + str(j) + ',' + str(i) + ',' + str(value) + '\n')
            temp += 1
        fd.close()


if __name__ == '__main__':
    adjMatrix = AdjMatrix()
    adjMatrix.build_adjmatrix(10)
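For example, with build_adjmatrix(3) the file starts with lines like the following (edge values are random, so a real run will differ; A and B are two labelled copies of the same symmetric matrix, which lets the multiplication job tell its two operands apart):

A,1,1,0
B,1,1,0
A,1,2,1
A,2,1,1
B,1,2,1
B,2,1,1
...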
2. Draw the graph with Python
# coding:utf-8
__author__ = 'taohao'
import matplotlib.pyplot as plt
import networkx as nx


class DrawGraph(object):
    def __init__(self):
        self.graph = nx.Graph(name='graph')

    def build_graph(self):
        # Rebuild the graph from AdjMatrix.txt; the A lines alone describe the
        # whole matrix, so the B copy is ignored here.
        fd = open('./AdjMatrix.txt', 'r')
        for line in fd:
            item = line.split(',')
            print(item)
            if item[0] == 'A':
                self.graph.add_node(item[1])
                self.graph.add_node(item[2])
                if item[3][0] == '1':   # item[3] may carry a trailing '\n'
                    self.graph.add_edge(item[1], item[2])
        fd.close()

    def draw_graph(self):
        # draw_networkx() can display the labels of nodes
        nx.draw_networkx(self.graph, with_labels=True)
        plt.show()


if __name__ == '__main__':
    draw_graph = DrawGraph()
    draw_graph.build_graph()
    draw_graph.draw_graph()
The drawn graph (figure in the original post):
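Since the graph is already loaded into networkx, the expected result of part 3 can be cross-checked locally: nx.triangles() returns, for every node, the number of triangles that contain it, which is exactly what the MapReduce jobs below compute. A hypothetical helper that could be added to DrawGraph (not in the original script):

    def check_triangles(self):
        # Per-node triangle counts straight from networkx; handy for
        # verifying the MapReduce output on small graphs.
        print(nx.triangles(self.graph))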
3. Count each node's triangles with Hadoop MapReduce
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MatrixMutiply {

    /*
     * Both matrices are stored in one file. At first they were kept in separate
     * files, but Hadoop ran a map pass per file, so one map/reduce finished before
     * the other matrix had been read and the later reduce saw incomplete data.
     *
     * Storage format, one element per line:
     *   A,1,1,2   means element (row 1, column 1) of matrix A is 2
     *   A,1,2,1
     *   A,2,1,3
     *   A,2,2,4
     * One element per line also prevents input splitting from truncating an element.
     * The file is generated by the Python script above (BuildMatrix.py).
     */
    private static int colNumB = 10;
    private static int rowNumA = 10;

    public static class MatrixMapper extends Mapper<Object, Text, Text, Text> {
        /*
         * rowNumA and colNumB must be set manually to match the input.
         * Map phase: emit key/value pairs where
         *   key   = position (row,col) of an element of the result matrix
         *   value = one source-matrix entry needed to compute that element
         * Entries of the left matrix A and the right matrix B are emitted differently.
         */
        private Text mapOutputkey;
        private Text mapOutputvalue;

        @Override
        protected void map(Object key, Text value,
                Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            System.out.println("map input key:" + key);
            System.out.println("map input value:" + value);
            String[] matrixStrings = value.toString().split("\n");
            for (String item : matrixStrings) {
                String[] elemString = item.split(",");
                if (elemString[0].equals("A")) {   // must be equals(), not ==
                    // A[i][k] contributes to every result element C[i][j], j = 1..colNumB
                    for (int i = 1; i <= colNumB; i++) {
                        mapOutputkey = new Text(elemString[1] + "," + String.valueOf(i));
                        mapOutputvalue = new Text("A:" + elemString[2] + "," + elemString[3]);
                        context.write(mapOutputkey, mapOutputvalue);
                        System.out.println("mapoutA:" + mapOutputkey + mapOutputvalue);
                    }
                } else if (elemString[0].equals("B")) {
                    // B[k][j] contributes to every result element C[i][j], i = 1..rowNumA
                    for (int j = 1; j <= rowNumA; j++) {
                        mapOutputkey = new Text(String.valueOf(j) + "," + elemString[2]);
                        mapOutputvalue = new Text("B:" + elemString[1] + "," + elemString[3]);
                        context.write(mapOutputkey, mapOutputvalue);
                        System.out.println("mapoutB:" + mapOutputkey + mapOutputvalue);
                    }
                } else {
                    // just for debug
                    System.out.println("unexpected map input:" + item);
                }
            }
        }
    }

    public static class MatixReducer extends Reducer<Text, Text, Text, Text> {
        private HashMap<String, String> MatrixAHashmap = new HashMap<String, String>();
        private HashMap<String, String> MatrixBHashmap = new HashMap<String, String>();
        private String val;

        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce input key:" + key);
            // Clear the caches left over from the previous key.
            MatrixAHashmap.clear();
            MatrixBHashmap.clear();
            // Sort the incoming values back into the A row and the B column.
            for (Text item : value) {
                val = item.toString();
                String[] kv = val.substring(2).split(",");
                if (val.startsWith("A:")) {
                    MatrixAHashmap.put(kv[0], kv[1]);
                }
                if (val.startsWith("B:")) {
                    MatrixBHashmap.put(kv[0], kv[1]);
                }
            }
            /* just for debug */
            System.out.println("hashmapA:" + MatrixAHashmap);
            System.out.println("hashmapB:" + MatrixBHashmap);
            // C[i][j] = sum over k of A[i][k] * B[k][j]
            Iterator<String> iterator = MatrixAHashmap.keySet().iterator();
            int sum = 0;
            while (iterator.hasNext()) {
                String keyString = iterator.next();
                sum += Integer.parseInt(MatrixAHashmap.get(keyString))
                        * Integer.parseInt(MatrixBHashmap.get(keyString));
            }
            Text reduceOutputvalue = new Text(String.valueOf(sum));
            context.write(key, reduceOutputvalue);
            /* just for debug */
            System.out.println("reduce output key:" + key);
            System.out.println("reduce output value:" + reduceOutputvalue);
        }
    }

    public static class TriangleMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value,
                Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            /*
             * Takes the matrix-multiplication result as input.
             * The input key is assigned by Hadoop; the input value is one line of
             * the result file. Output:
             *   map output key:   row number
             *   map output value: column number + "," + element value
             * For example: key "1", value "1,2".
             */
            String[] valueString = value.toString().split("\t");
            String[] keyItems = valueString[0].split(",");
            Text outputKey = new Text(keyItems[0]);
            Text outputValue = new Text(keyItems[1] + "," + valueString[1]);
            context.write(outputKey, outputValue);
        }
    }

    public static class TriangleReducer extends Reducer<Text, Text, Text, Text> {
        private String[] matrix = new String[colNumB * colNumB];
        private boolean readGlobalMatrixFlag = false;
        private int[] rowValue = new int[colNumB];

        /* Load the original adjacency matrix (the A copy) from the local file. */
        private void getGlobalMatrix() {
            String ADJ_MATRIX_PATH = "/home/taohao/PycharmProjects/Webs/pythonScript/Matrix/AdjMatrix.txt";
            File file = new File(ADJ_MATRIX_PATH);
            BufferedReader bufferedReader = null;
            try {
                bufferedReader = new BufferedReader(new FileReader(file));
                String line = null;
                while ((line = bufferedReader.readLine()) != null) {
                    String[] items = line.split("[,\n]");
                    if (items[0].equals("A")) {
                        matrix[(Integer.parseInt(items[1]) - 1) * colNumB
                                + Integer.parseInt(items[2]) - 1] = items[3];
                    }
                }
                bufferedReader.close();
            } catch (Exception e) {
                System.out.println(e.toString());
            }
        }

        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            /* Solve for the triangles one row (one node) at a time. */
            if (!readGlobalMatrixFlag) {
                getGlobalMatrix();
                readGlobalMatrixFlag = true;
            }
            Iterator<Text> iterator = value.iterator();
            int rowSum = 0;
            while (iterator.hasNext()) {
                /*
                 * The column index carried inside the reduce input value identifies
                 * each element, because reduce input values may arrive out of order.
                 */
                String[] valueItems = iterator.next().toString().split(",");
                rowValue[Integer.parseInt(valueItems[0]) - 1] = Integer.parseInt(valueItems[1]);
            }
            int rowKey = Integer.parseInt(key.toString());
            for (int i = 0; i < colNumB; i++) {
                if (matrix[i + (rowKey - 1) * colNumB].equals("1")) {
                    rowSum += rowValue[i];
                }
            }
            rowSum = rowSum / 2;   // each triangle at this node was counted twice
            Text outputValue = new Text(String.valueOf(rowSum));
            context.write(key, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Configuration confTriangle = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: matrix <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "matrix");
        job.setJarByClass(MatrixMutiply.class);
        job.setMapperClass(MatrixMapper.class);
        /* By design the multiplication needs no combiner, so none is specified. */
        // job.setCombinerClass(MatixReducer.class);
        job.setReducerClass(MatixReducer.class);
        /* setOutputKeyClass/setOutputValueClass apply to both the map output and */
        /* the reduce output, so the two must use the same types. */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);

        Job jobTriangle = Job.getInstance(confTriangle, "triangle");
        jobTriangle.setJarByClass(MatrixMutiply.class);
        jobTriangle.setMapperClass(TriangleMapper.class);
        jobTriangle.setReducerClass(TriangleReducer.class);
        jobTriangle.setOutputKeyClass(Text.class);
        jobTriangle.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(jobTriangle, new Path("/trianglematrixoutput/part-r-00000"));
        FileOutputFormat.setOutputPath(jobTriangle, new Path("/triangleoutput"));
        System.exit(jobTriangle.waitForCompletion(true) ? 0 : 1);
    }
}
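A note on the hand-off between the two jobs: the first job writes its result through Hadoop's default TextOutputFormat, which puts a tab between key and value, so the part-r-00000 file consists of lines of the form "row,col<TAB>value", for example (illustrative numbers):

1,1	3
1,2	1
2,1	1

TriangleMapper splits each such line on the tab to recover the element's position and value.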
Round one multiplies the adjacency matrix by itself and writes the product to an output directory.
Round two takes that product as input and, by combining it with the original adjacency matrix, derives each node's triangle count.
Each MapReduce round needs its own job, so two Job instances are started, one per round. A possible invocation is sketched below.
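A possible end-to-end run (the jar name and input path are assumptions for illustration; the output directory of the first job, however, must be /trianglematrixoutput, because the second job's input path /trianglematrixoutput/part-r-00000 is hard-coded in main()):

hadoop fs -put AdjMatrix.txt /matrixinput/AdjMatrix.txt
hadoop jar matrix.jar MatrixMutiply /matrixinput /trianglematrixoutput

The per-node triangle counts then land under /triangleoutput. Also note that TriangleReducer reads AdjMatrix.txt from a hard-coded local path, so that file must be present on the node where the reducer runs.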
Original article: http://blog.csdn.net/thao6626/article/details/46653437