首页 > 编程语言 > 详细

flink Transitive Closure算法,实现寻找新的可达路径

时间:2019-07-04 11:23:31      阅读:91      评论:0      收藏:0      [点我收藏+]

标签:对比   from   rate   nta   包括   return   asc   计划   字段   


1、Transitive Closure是翻译闭包传递?我觉得直译不准确,意译应该是传递特性直至特性关闭,也符合本例中传递路径,寻找路径可达,直到可达路径不存在(即关闭)。


 * @Author: xu.dm
 * @Date: 2019/7/3 11:41
 * @Version: 1.0
 * @Description: 传递闭包算法,本例中就是根据成对路径,查找和生成新的可达路径
 * 例如:1-2,2-4这两对数据,可以得出新的可达路径1-4。
 * 迭代算法步骤:
 * 1、获取成对数据集edges,里面包括路径对,比如 1->2,2->4,2->5等,如果是无向边,还可以反转数据集union之前的数据。本例按有向边处理
 * 2、生成迭代头paths可迭代数据集
 * 3、用paths和原始数据集edges做join连接,找出头尾相连的数据nextPaths,即类似1->2,2->4这种,然后生成新的路径1->4。
 * 4、新的路径集nextPaths和迭代头数据集paths进行并集操作,即union操作,生成新的nextPaths,这个时候它包含了新旧两种数据
 *    在这里总是nextPaths>=paths
 * 5、去重操作,第一次迭代不会重复,但是第二次迭代开始就会有重复数据,通过groupBy全字段,去分组第一条即可达到去重效果
 * 6、以上核心迭代体完成,后面需要形成迭代闭环,确定迭代退出条件
 * 7、退出原理:每次迭代完成后,需要检查是否新的路径产生,如果没有则表示迭代可以结束
 * 8、可达寻路步骤完成后,通过对比nextPaths和paths,如果nextPaths>paths,表示有新路径生成,需要继续迭代,直到nextPaths=paths
 * 9、这里有一个迭代重要的概念,paths和nextPaths是通过迭代闭环不断更新的
 * 10、本例中迭代头和迭代尾的数据流向:paths->nextPaths->paths.
 * 11、本例通过bulk迭代方式实现了delta迭代的效果
public class TransitiveClosureNaive {
    public static void main(String args[]) throws Exception {
        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface

        final int maxIterations = params.getInt("iterations", 10);

        DataSet<Tuple2<Long, Long>> edges;
            edges = env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
        }else {
            System.out.println("Executing TransitiveClosureNaive example with default edges data set.");
            System.out.println("Use --edges to specify file input.");
            edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);

        IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);

        DataSet<Tuple2<Long,Long>> nextPaths = paths
                .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                     left: Path (z,x) - 通过z可达x
                     right: Edge (x,y) - 通过x可达y
                     out: Path (z,y) - 最终输出z可达y
                    public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
                        return new Tuple2<>(left.f0,right.f1);
                //转发第一个输入Tuple2<Long, Long>中的第一个字段,转发第二个输入Tuple2<Long, Long>中的第二个字段
                .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {

        DataSet<Tuple2<Long,Long>> newPaths = paths
                .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    Set<Tuple2<Long, Long>> prevSet = new HashSet<>();
                    public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
                        for(Tuple2<Long,Long> prev:prevPaths){
                        for(Tuple2<Long,Long> next:nextPaths){

        DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);

        // emit result
        if (params.has("output")) {
            transitiveClosure.writeAsCsv(params.get("output"), "\n", " ");

            // execute program explicitly, because file sinks are lazy
            env.execute("Transitive Closure Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");



public class ConnectedComponentsData {
    public static final long[] VERTICES  = new long[] {
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};

    public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
        List<Long> verticesList = new LinkedList<Long>();
        for (long vertexId : VERTICES) {
        return env.fromCollection(verticesList);

    public static final Object[][] EDGES = new Object[][] {
            new Object[]{1L, 2L},
            new Object[]{2L, 3L},
            new Object[]{2L, 4L},
            new Object[]{3L, 5L},
            new Object[]{6L, 7L},
            new Object[]{8L, 9L},
            new Object[]{8L, 10L},
            new Object[]{5L, 11L},
            new Object[]{11L, 12L},
            new Object[]{10L, 13L},
            new Object[]{9L, 14L},
            new Object[]{13L, 14L},
            new Object[]{1L, 15L},
            new Object[]{16L, 1L}

    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

        List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
        for (Object[] edge : EDGES) {
            edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
        return env.fromCollection(edgeList);





flink Transitive Closure算法,实现寻找新的可达路径

标签:对比   from   rate   nta   包括   return   asc   计划   字段   


评论 一句话评论(0
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com