码迷,mamicode.com
首页 > 其他好文 > 详细

hhhh

时间:2018-06-13 11:48:54      阅读:186      评论:0      收藏:0      [点我收藏+]

标签:assign   property   find   extra   environ   trap   configure   conf   ext   

public class LongRidesWithKafka {
    private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String RIDE_SPEED_GROUP = "rideSpeedGroup";
    private static final int MAX_EVENT_DELAY = 60; // rides are at most 60 sec out-of-order.

    public static void main(String[] args) throws Exception {
        final int popThreshold = 1; // threshold for popular places
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //  env.getConfig().setAutoWatermarkInterval(5000);
        //  env.setParallelism(8);
        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        //kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "flinktest",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());
        DataStream<TaxiRide> rides = env.addSource(consumer);

        DataStream<TaxiRide> keyedRides = rides.keyBy("rideId");
        // A complete taxi ride has a START event followed by an END event
        Pattern<TaxiRide, TaxiRide> completedRides =
                Pattern.<TaxiRide>begin("start")
                        .where(new SimpleCondition<TaxiRide>() {
                            @Override
                            public boolean filter(TaxiRide ride) throws Exception {
                                return ride.isStart;
                            }
                        })
                        .next("end")
                        .where(new SimpleCondition<TaxiRide>() {
                            @Override
                            public boolean filter(TaxiRide ride) throws Exception {
                                return !ride.isStart;
                            }
                        });

        // We want to find rides that have NOT been completed within 120 minutes
        PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.seconds(5)));

        OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout") {
        };
        SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
                timedout,
                new LongRides.TaxiRideTimedOut<TaxiRide>(),
                new LongRides.FlatSelectNothing<TaxiRide>()
        );
        longRides.getSideOutput(timedout).print();
        env.execute("Long Taxi Rides");
    }

    public static class TaxiRideTimedOut<TaxiRide> implements PatternFlatTimeoutFunction<TaxiRide, TaxiRide> {
        @Override
        public void timeout(Map<String, List<TaxiRide>> map, long l, Collector<TaxiRide> collector) throws Exception {
            TaxiRide rideStarted = map.get("start").get(0);
            collector.collect(rideStarted);
        }
    }

    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
        }
    }

    private static class TaxiRideTSExtractor extends AscendingTimestampExtractor<TaxiRide> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(TaxiRide ride) {

            //  Watermark Watermark = getCurrentWatermark();

            if (ride.isStart) {
                return ride.startTime.getMillis();
            } else {
                return ride.endTime.getMillis();
            }
        }
    }


    private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<TaxiRide> {

        private static final long serialVersionUID = -742759155861320823L;

        private long currentTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(TaxiRide ride, long previousElementTimestamp) {
            // the inputs are assumed to be of format (message,timestamp)

            if (ride.isStart) {
                this.currentTimestamp = ride.startTime.getMillis();
                return ride.startTime.getMillis();
            } else {
                this.currentTimestamp = ride.endTime.getMillis();
                return ride.endTime.getMillis();
            }
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
        }
    }

}

 

hhhh

标签:assign   property   find   extra   environ   trap   configure   conf   ext   

原文地址:https://www.cnblogs.com/WCFGROUP/p/9175810.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!