背景 共享单车及打车软件在国内较为普遍,如果数据完善,不失为一份分析的好素材,但是国内的数据确实太难寻觅,这里只找到了两份纽约的的行程数据
共享单车行程数据集:Citi Bikers 出租车行程数据集:New York City Taxi & Limousine Commission
本次以New York City Taxi作为示例数据进行分析
数据集 网站New York City Taxi & Limousine Commission 提供了关于纽约市从2009-2015年关于出租车驾驶的公共数据集。
nycTaxiRides.gz
TaxiRides 行程信息。每次出行包含两条记录。type标识为 行程开始start 和 行程结束end。 数据集结构
1 2 3 4 5 6 7 8 9 10 11 rideId : int(10) // 唯一行程id taxiId : int(10) // 出租车唯一id driverId : int(10) // 出租车司机唯一id type : varchar(5) // START行程开始,END行程结束 每次出行包含两条记录 startTime : datetime(6) // 行程开始时间 endTime : datetime(6) // 行程结束时间 对于行程开始记录该值为 1970-01-01 00:00:00 startLon : decimal(10,6) // 开始行程的经度 startLat : decimal(10,6) // 开始行程的纬度 endLon : decimal(10,6) // 结束的经度 endLat : decimal(10,6) // 结束的纬度 passengerCnt : int(3) // 乘客数量
TaxiRides 数据示例
rideId,type,startTime,endTime,startLon,startLat,endLon,endLat,passengerCnt,driverId,taxiId
nycTaxiFares.gz
TaxiFares 费用信息。 与上面行程信息对应
1 2 3 4 5 6 7 8 rideId : int(10) // 唯一行程id taxiId : int(10) // 出租车唯一id driverId : int(10) // 出租车司机唯一id startTime : datetime(6) // 行程开始时间 paymentType : varchar(6) // 现金(CASH)或刷卡(CARD) tip : float // 小费 tolls : float // 过路费 totalFare : float // 总计车费
TaxiFares 数据示例
rideId,taxiId,driverId,startTime,paymentType,tip,tolls,totalFare
目标 1、将每次车程的 TaxiRide 和 TaxiFare 记录依据相同的rideId连接在一起 2、对于每个不同的 rideId,恰好有三个事件:
TaxiRide START 事件
TaxiRide END 事件
一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)
最终完成一个 RideAndFare 的输出
3、对这个输出进行统计过滤聚合完成案例要求
我们暂时假设数据流是通过kafka进行传输
案例 基础操作:生成数据流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // 定义出租车-车程数据源 KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder() .setBootstrapServers("192.168.0.192:9092") .setTopics("TOPIC_RIDE") .setGroupId("TEST_GROUP") .setClientIdPrefix("ride") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new TaxiRideDeserialization()) .build(); // 定义出租车-车费数据源 KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder() .setBootstrapServers("192.168.0.192:9092") .setTopics("TOPIC_FARE") .setGroupId("TEST_GROUP") .setClientIdPrefix("fare") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new TaxiFareDeserialization()) .build();
基础操作:过滤和连接 过滤 例如我们现在只想查看发生在纽约的行车记录。
1 2 3 4 5 6 7 8 9 10 11 public class GeoUtils { // geo boundaries of the area of NYC public static double LON_EAST = -73.7; public static double LON_WEST = -74.05; public static double LAT_NORTH = 41.0; public static double LAT_SOUTH = 40.5; public static boolean isInNYC(double lat, double lon) { return !(lon > LON_EAST || lon < LON_WEST) && !(lat > LAT_NORTH || lat < LAT_SOUTH); } }
过滤器
1 2 3 4 5 6 7 private static class NYCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) throws Exception { return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat); } }
连接 我们需要把TaxiRide和TaxiFare两者的数据记录结合。在这个过程中,我们要同时处理两个source的流数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.example.datastream.rideandfare; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; public class RideAndFareJob { public static void main(String[] args) throws Exception { // 初始化环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000L); env.setStateBackend(new FsStateBackend("file:///mnt/data/checkpoints")); env.setParallelism(2); // 定义出租车-车程数据源 KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder() .setBootstrapServers("192.168.0.192:9092") .setTopics("TOPIC_RIDE") .setGroupId("TEST_GROUP") .setClientIdPrefix("ride") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new TaxiRideDeserialization()) .build(); // 定义出租车-车费数据源 KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder() .setBootstrapServers("192.168.0.192:9092") .setTopics("TOPIC_FARE") .setGroupId("TEST_GROUP") .setClientIdPrefix("fare") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new TaxiFareDeserialization()) .build(); // 从车程事件中过滤类型为Start的车程数据,并按车程标识 rideId 分组 KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source") .filter(ride -> ride.type==='START').keyBy(TaxiRide::getRideId); // 付车费事件按行程标识 rideId 分组 KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source") .keyBy(TaxiFare::getRideId); rideStream.connect(fareStream).flatMap(new EnrichmentFunction()) .uid("enrichment") // uid for this operator's state .name("enrichment") // name for this operator in the web UI .addSink(new PrintSinkFunction<>()); env.execute("Join Rides with Fares"); } }
案例3:每种乘客数量的行车事件数 我们的另一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车… 当然,我们仍然只关心纽约的行车事件。 TODO:
案例4:每5分钟的进入的车辆数 为了持续地监测纽约的交通流量,需要计算出每个区块每5分钟的进入的车辆数。我们只关心至少有5辆车子进入的区块。 TODO:
案例5:收入最高出租车司机 我们想统计每个小时收入topN的司机 TODO:
相关链接 flink-training ververica flink-learning-in-action
http://wuchong.me/blog/2019/08/20/flink-sql-training/ https://www.cnblogs.com/bjwu/p/9973521.html https://article.itxueyuan.com/0Kp2pR https://www.cnblogs.com/luxh/p/16427196.html