
Hadoop: Analyzing Node Importance in a Graph by Counting Each Node's Triangles

Posted: 2015-06-27 10:00:14

Tags: mapreduce, distributed, matrix multiplication, node triangle counting, hadoop

Using Hadoop to measure node importance in an undirected graph, expressed as the number of triangles each node belongs to:

Ranking nodes by importance matters when processing large, graph-structured data in a distributed setting: finding the important nodes, and handling them specially, is often a key step.

The solution is worked through below.


This article has three parts:

1. Generating the adjacency matrix of an undirected graph with Python

2. Drawing the graph with Python

3. Counting each node's triangles with Hadoop MapReduce


For matrix multiplication with Hadoop, see the earlier article: http://blog.csdn.net/thao6626/article/details/46472535
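Before going distributed, the whole computation can be sanity-checked locally. For an undirected graph with adjacency matrix A (symmetric, zero diagonal), the number of triangles through node i equals diag(A³)ᵢ / 2, which is exactly what the two MapReduce rounds below compute: first the product A·A, then a row-wise combination with A. A NumPy sketch of that identity (a local illustration, not part of the Hadoop pipeline):

```python
import numpy as np

rng = np.random.default_rng(42)
n = 10
# random symmetric 0/1 adjacency matrix with a zero diagonal
upper = np.triu(rng.integers(0, 2, size=(n, n)), k=1)
A = upper + upper.T

# round 1 of the pipeline: the matrix product A*A
A2 = A @ A
# round 2: for each row i, sum A2[i][j] over the neighbours j of i, then halve
triangles = (A2 * A).sum(axis=1) // 2

# the same quantity in closed form: diag(A^3) / 2
assert np.array_equal(triangles, np.diag(A @ A @ A) // 2)
print(triangles)
```

Each triangle is counted once per corner, so the counts always sum to three times the number of triangles in the graph.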


1. Generating the adjacency matrix of an undirected graph with Python

# coding:utf-8
__author__ = 'taohao'
import random


class AdjMatrix(object):

    def build_adjmatrix(self, dimension):
        # write a random symmetric 0/1 adjacency matrix twice (labelled A
        # and B), one "name,row,col,value" element per line
        fd = open("./AdjMatrix.txt", 'w+')
        for i in range(1, dimension + 1):
            for j in range(i, dimension + 1):
                if i == j:
                    # zero diagonal: no self-loops
                    fd.write('A,' + str(i) + ',' + str(j) + ',' + '0' + '\n')
                    fd.write('B,' + str(i) + ',' + str(j) + ',' + '0' + '\n')
                else:
                    value = random.randint(0, 1)
                    fd.write('A,' + str(i) + ',' + str(j) + ',' + str(value) + '\n')
                    fd.write('A,' + str(j) + ',' + str(i) + ',' + str(value) + '\n')
                    fd.write('B,' + str(i) + ',' + str(j) + ',' + str(value) + '\n')
                    fd.write('B,' + str(j) + ',' + str(i) + ',' + str(value) + '\n')
        fd.close()


if __name__ == '__main__':
    adjMatrix = AdjMatrix()
    adjMatrix.build_adjmatrix(10)
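The generated file can be read back to check the invariants the later jobs rely on: the A copy must be symmetric with a zero diagonal. A small round-trip sketch (the file name and helper names here are illustrative, not from the script above):

```python
import random

def build_adjmatrix(path, dimension):
    # same "name,row,col,value" format as the script above
    with open(path, 'w') as fd:
        for i in range(1, dimension + 1):
            for j in range(i, dimension + 1):
                value = 0 if i == j else random.randint(0, 1)
                for name in ('A', 'B'):
                    fd.write('%s,%d,%d,%d\n' % (name, i, j, value))
                    if i != j:
                        fd.write('%s,%d,%d,%d\n' % (name, j, i, value))

def read_matrix(path, dimension, name='A'):
    # rebuild the named matrix from the flat element lines
    m = [[0] * dimension for _ in range(dimension)]
    with open(path) as fd:
        for line in fd:
            tag, i, j, v = line.strip().split(',')
            if tag == name:
                m[int(i) - 1][int(j) - 1] = int(v)
    return m

build_adjmatrix('AdjMatrix_check.txt', 10)
m = read_matrix('AdjMatrix_check.txt', 10)
# the adjacency matrix of an undirected graph without self-loops
assert all(m[i][j] == m[j][i] for i in range(10) for j in range(10))
assert all(m[i][i] == 0 for i in range(10))
```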


2. Drawing the graph with Python

# coding:utf-8
__author__ = 'taohao'
import matplotlib.pyplot as plt
import networkx as nx


class DrawGraph(object):

    def __init__(self):
        self.graph = nx.Graph(name='graph')

    def build_graph(self):
        fd = open('./AdjMatrix.txt', 'r')
        for line in fd:
            item = line.strip().split(',')
            # the A and B copies are identical, so only the A lines are used
            if item[0] == 'A':
                self.graph.add_node(item[1])
                self.graph.add_node(item[2])
                if item[3] == '1':
                    self.graph.add_edge(item[1], item[2])
        fd.close()

    def draw_graph(self):
        # draw_networkx() also displays the node labels
        nx.draw_networkx(self.graph, with_labels=True)
        plt.show()


if __name__ == '__main__':
    draw_graph = DrawGraph()
    draw_graph.build_graph()
    draw_graph.draw_graph()
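For cross-checking the MapReduce result later, networkx can also count per-node triangles directly (a verification sketch, not part of the original pipeline; the small graph here is made up for the example):

```python
import networkx as nx

# small fixed graph: two triangles sharing the edge (1, 2)
G = nx.Graph()
G.add_edges_from([(1, 2), (2, 3), (1, 3), (1, 4), (2, 4)])

# nx.triangles returns {node: number of triangles through that node}
counts = nx.triangles(G)
print(counts)  # {1: 2, 2: 2, 3: 1, 4: 1}
```

This gives a ground truth to compare the Hadoop output against on small inputs.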

The resulting graph:

(figure: networkx drawing of the generated undirected graph)





3. Counting each node's triangles with Hadoop MapReduce

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Iterator;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;  
  
public class MatrixMutiply {  
    /* 
     * Both input matrices are stored in a single file.
     * When they were stored in two files, Hadoop ran a separate map for each
     * file, so a map/reduce round could see only one matrix's data and the
     * later reduce then failed; a single file avoids this.
     * Storage format:
     * A,1,1,2   means row 1, column 1 of matrix A holds the value 2
     * A,1,2,1
     * A,2,1,3
     * A,2,2,4
     * One element per line also keeps input splitting from cutting an
     * element apart during the map phase.
     * The matrix file is generated by the Python script BuildMatrix.py.
     */  
    private static int colNumB = 10;
    private static int rowNumA = 10;
	
    public static class MatrixMapper extends Mapper<Object, Text, Text, Text>{  
        /* 
         * rowNumA and colNumB have to be set manually.
         * Map phase: reorganize the data as key/value pairs.
         * key: the position of an element of the result matrix
         * value: the source-matrix entry that contributes to that element
         * Note that the left matrix (A) and the right matrix (B) have to be
         * emitted in different layouts.
         */  
        private Text mapOutputkey;  
        private Text mapOutputvalue;  
          
        @Override  
        protected void map(Object key, Text value,  
                Mapper<Object, Text, Text, Text>.Context context)  
                throws IOException, InterruptedException {  
            System.out.println("map input key:" + key);  
            System.out.println("map input value:" + value);  
            String[] matrixStrings = value.toString().split("\n");  
            for(String item : matrixStrings){  
                System.out.println("item:"+ item);  
                String[] elemString = item.split(",");  
                for(String string : elemString){  
                    System.out.println("element" + string);  
                }  
  
                System.out.println("elemString[0]:"+elemString[0]);  
                if(elemString[0].equals("A")){   // must compare with equals(), not ==  
                    /* 
                     * emit the A-matrix entries; mind the key/value layout details
                     * */  
                    for(int i=1; i<=colNumB; i++){  
                        mapOutputkey = new Text(elemString[1] + "," + String.valueOf(i));  
                        mapOutputvalue = new Text("A:" + elemString[2] + "," + elemString[3]);  
                        context.write(mapOutputkey, mapOutputvalue);  
                        System.out.println("mapoutA:"+mapOutputkey+mapOutputvalue);  
                    }  
                }  
                /* 
                 * emit the B-matrix entries; the layout differs from A's
                 * */  
                else if(elemString[0].equals("B")){  
                    for(int j=1; j<=rowNumA; j++){  
                        mapOutputkey = new Text(String.valueOf(j) + "," + elemString[2]);  
                        mapOutputvalue = new Text("B:" + elemString[1] + "," + elemString[3]);  
                        context.write(mapOutputkey, mapOutputvalue);  
                        System.out.println("mapoutB"+mapOutputkey+mapOutputvalue);  
                    }  
                }  
                else{   // just for debug  
                    System.out.println("mapout else else :--------------->"+ item);  
                }  
            }  
        }  
    }  
      
    public static class MatixReducer extends Reducer<Text, Text, Text, Text> {  
  
        private HashMap<String, String> MatrixAHashmap = new HashMap<String, String>();  
        private HashMap<String, String> MatrixBHashmap = new HashMap<String, String>();  
        private String val;   
          
        @Override  
        protected void reduce(Text key, Iterable<Text> value,  
                Reducer<Text, Text, Text, Text>.Context context)  
                throws IOException, InterruptedException {  
            // the maps are instance fields and persist across reduce() calls,
            // so clear them for every key
            MatrixAHashmap.clear();
            MatrixBHashmap.clear();
            System.out.println("reduce input key:" + key);  
            for(Text item : value){  
                val = item.toString();  
                System.out.println("val------------"+val);  
                if(!val.equals("0")){  
                    String[] kv = val.substring(2).split(",");  
                    if(val.startsWith("A:")){  
                        MatrixAHashmap.put(kv[0], kv[1]);  
                    }  
                    if(val.startsWith("B:")){  
                        MatrixBHashmap.put(kv[0], kv[1]);  
                    }  
  
                }  
            }  
            /* just for debug */
            System.out.println("hashmapA:" + MatrixAHashmap);
            System.out.println("hashmapB:" + MatrixBHashmap);
            // dot product of the matching row of A and column of B
            Iterator<String> iterator = MatrixAHashmap.keySet().iterator();
            int sum = 0;
            while(iterator.hasNext()){
                String keyString = iterator.next();
                sum += Integer.parseInt(MatrixAHashmap.get(keyString)) *
                        Integer.parseInt(MatrixBHashmap.get(keyString));
            }
            //LongWritable reduceOutputvalue = new LongWritable(sum);
            Text reduceOutputvalue = new Text(String.valueOf(sum));
            context.write(key, reduceOutputvalue);
            /* just for debug */
            System.out.println("reduce output key:" + key);
            System.out.println("reduce output value:" + reduceOutputvalue);
        }  
    }  
    
    public static class TriangleMapper extends Mapper<Object, Text, Text, Text>{
    	
		@Override
		protected void map(Object key, Text value,
				Mapper<Object, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			/*
			 * The map input is the matrix-product output, one line per call;
			 * the map input key is assigned by Hadoop itself.
			 * Each input line looks like "i,j<TAB>value".
			 * map output key: the row number i
			 * map output value: column number + "," + element value
			 * for example:
			 * key: 1
			 * value: 1,2
			 * */
            String[] valueString = value.toString().split("\t");
            String[] keyItems = valueString[0].split(",");
            Text outputKey = new Text(keyItems[0]);
            Text outputValue = new Text(keyItems[1] + "," + valueString[1]);
            context.write(outputKey, outputValue);
		}
    	
    }
    
    public static class TriangleReducer extends Reducer<Text, Text, Text, Text>{
    	private String[] matrix = new String[colNumB*colNumB];
    	private boolean readGlobalMatrixFlag = false;
    	private int[] rowValue = new int[colNumB];
    	
    	
    	/*
    	 * Load the original adjacency matrix.
    	 * Note: this reads a local filesystem path, so it only works when the
    	 * reducer runs on the machine holding the file (e.g. local mode).
    	 * */
    	private void getGlobalMatrix() {
    		String ADJ_MATRIX_PATH = "/home/taohao/PycharmProjects/Webs/pythonScript/Matrix/AdjMatrix.txt";
    		File file = new File(ADJ_MATRIX_PATH);
    		BufferedReader bufferedReader = null;
    		try {
    			bufferedReader = new BufferedReader(new FileReader(file));
    			String line = null;
    			while((line = bufferedReader.readLine()) != null){
    				String[] items = line.split("[,\n]"); 
    				if(items[0].equals("A")){
    					matrix[(Integer.parseInt(items[1])-1)* colNumB + Integer.parseInt(items[2]) - 1] = items[3];
    				}
    			}
    			bufferedReader.close();
    		} catch (Exception e) {
    			System.out.println(e.toString());
    		}
		}

		@Override
		protected void reduce(Text key, Iterable<Text> value,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			/*
			 * Count triangles one row (i.e. one node) at a time.
			 * */
			if(!readGlobalMatrixFlag){
				getGlobalMatrix();
				readGlobalMatrixFlag = true;
			}
			Iterator<Text> iterator = value.iterator();
            int rowSum = 0;
            while(iterator.hasNext()){
            	/*
            	 * Place each element by the column index carried in the reduce
            	 * input value: the values are not guaranteed to arrive in order.
            	 * */
                 String[] valueItems = iterator.next().toString().split(",");
                 rowValue[Integer.parseInt(valueItems[0])-1] = Integer.parseInt(valueItems[1]); 
            }
			int rowKey = Integer.parseInt(key.toString());
			for(int i = 0; i < colNumB; i++){
				if(matrix[i + (rowKey-1)*colNumB].equals("1")){
					rowSum += rowValue[i];
				}
			}
			// each triangle through this node has been counted twice
			rowSum = rowSum / 2;
			Text outputValue = new Text(String.valueOf(rowSum));
			context.write(key, outputValue);
		}
    	
    }
    
    public static void main(String[] args) throws Exception{  
        Configuration conf = new Configuration();  
        Configuration confTriangle = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
        if(otherArgs.length != 2){  
            System.err.println("Usage: matrix <in> <out>");  
            System.exit(2);  
        }  
          
        Job job = Job.getInstance(conf, "matrix");
        
        job.setJarByClass(MatrixMutiply.class);  
        job.setMapperClass(MatrixMapper.class);
        /* by design no combiner is needed here, so none is specified */  
//      job.setCombinerClass(MatixReducer.class);    
        job.setReducerClass(MatixReducer.class);  
        /* setOutputKeyClass/setOutputValueClass apply to BOTH the map output
           and the reduce output, so the map and reduce output types must match */  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
        
        Job jobTriangle = Job.getInstance(confTriangle, "triangle");
        jobTriangle.setJarByClass(MatrixMutiply.class);
        jobTriangle.setMapperClass(TriangleMapper.class);
        jobTriangle.setReducerClass(TriangleReducer.class);
        jobTriangle.setOutputKeyClass(Text.class);  
        jobTriangle.setOutputValueClass(Text.class);
        // the second job reads the first job's output directory
        FileInputFormat.addInputPath(jobTriangle, new Path(otherArgs[1] + "/part-r-00000"));  
        FileOutputFormat.setOutputPath(jobTriangle, new Path("/triangleoutput"));
        System.exit(jobTriangle.waitForCompletion(true) ? 0 : 1);
    }  
}  

There are two MapReduce rounds:

The first round multiplies the adjacency matrix by itself and writes the product to an output directory.

The second round takes that product as its input and combines it with the original adjacency matrix to obtain the final per-node triangle counts.


Each round needs its own job to drive it, so two Job instances are launched, one per round.
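The second round's per-row logic can be sketched in plain Python for illustration (the function and variable names here are mine, not from the job):

```python
def triangles_for_node(a2_row, a_row):
    # TriangleReducer's rowSum: add A2[i][j] wherever A[i][j] == 1,
    # then halve, since each triangle through node i is counted twice
    return sum(v for v, flag in zip(a2_row, a_row) if flag == 1) // 2

# worked example: the complete graph on 3 nodes contains one triangle
A = [[0, 1, 1],
     [1, 0, 1],
     [1, 1, 0]]
# round 1: the matrix product A*A
A2 = [[sum(A[i][k] * A[k][j] for k in range(3)) for j in range(3)]
      for i in range(3)]
# round 2: one call per row of the product
counts = [triangles_for_node(A2[i], A[i]) for i in range(3)]
print(counts)  # [1, 1, 1]: every node lies on the single triangle
```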



Copyright notice: this is the author's original article and may not be reposted without permission.


Original article: http://blog.csdn.net/thao6626/article/details/46653437
