Flink之New York City Taxi

背景

共享单车及打车软件在国内较为普遍,如果数据完善,不失为一份分析的好素材,但是国内的数据确实太难寻觅,这里只找到了两份纽约的的行程数据

共享单车行程数据集:Citi Bikers
出租车行程数据集:New York City Taxi & Limousine Commission

本次以New York City Taxi作为示例数据进行分析

数据集

网站New York City Taxi & Limousine Commission提供了关于纽约市从2009-2015年关于出租车驾驶的公共数据集。

nycTaxiRides.gz

TaxiRides 行程信息。每次出行包含两条记录。type标识为 行程开始start 和 行程结束end。
数据集结构

1
2
3
4
5
6
7
8
9
10
11
rideId         : int(10)         // 唯一行程id
taxiId : int(10) // 出租车唯一id
driverId : int(10) // 出租车司机唯一id
type : varchar(5) // START行程开始,END行程结束 每次出行包含两条记录
startTime : datetime(6) // 行程开始时间
endTime : datetime(6) // 行程结束时间 对于行程开始记录该值为 1970-01-01 00:00:00
startLon : decimal(10,6) // 开始行程的经度
startLat : decimal(10,6) // 开始行程的纬度
endLon : decimal(10,6) // 结束的经度
endLat : decimal(10,6) // 结束的纬度
passengerCnt : int(3) // 乘客数量

TaxiRides 数据示例

rideId,type,startTime,endTime,startLon,startLat,endLon,endLat,passengerCnt,driverId,taxiId

8cUzkd

nycTaxiFares.gz

TaxiFares 费用信息。 与上面行程信息对应

1
2
3
4
5
6
7
8
rideId         : int(10)      // 唯一行程id
taxiId : int(10) // 出租车唯一id
driverId : int(10) // 出租车司机唯一id
startTime : datetime(6) // 行程开始时间
paymentType : varchar(6) // 现金(CASH)或刷卡(CARD)
tip : float // 小费
tolls : float // 过路费
totalFare : float // 总计车费

TaxiFares 数据示例

rideId,taxiId,driverId,startTime,paymentType,tip,tolls,totalFare

k0RZE3

目标

1、将每次车程的 TaxiRide 和 TaxiFare 记录依据相同的rideId连接在一起
2、对于每个不同的 rideId,恰好有三个事件:

  • TaxiRide START 事件
  • TaxiRide END 事件
  • 一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)

最终完成一个 RideAndFare 的输出

3、对这个输出进行统计过滤聚合完成案例要求

我们暂时假设数据流是通过kafka进行传输
FRPL8q

案例

基础操作:生成数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定义出租车-车程数据源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();

// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();

基础操作:过滤和连接

过滤

例如我们现在只想查看发生在纽约的行车记录。

1
2
3
4
5
6
7
8
9
10
11
public class GeoUtils {
// geo boundaries of the area of NYC
public static double LON_EAST = -73.7;
public static double LON_WEST = -74.05;
public static double LAT_NORTH = 41.0;
public static double LAT_SOUTH = 40.5;
public static boolean isInNYC(double lat, double lon) {
return !(lon > LON_EAST || lon < LON_WEST) &&
!(lat > LAT_NORTH || lat < LAT_SOUTH);
}
}

过滤器

1
2
3
4
5
6
7
private static class NYCFilter implements FilterFunction<TaxiRide> {  
@Override
public boolean filter(TaxiRide taxiRide) throws Exception {
return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
}
}
连接

我们需要把TaxiRide和TaxiFare两者的数据记录结合。在这个过程中,我们要同时处理两个source的流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.example.datastream.rideandfare;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;


public class RideAndFareJob {

public static void main(String[] args) throws Exception {

// 初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000L);
env.setStateBackend(new FsStateBackend("file:///mnt/data/checkpoints"));

env.setParallelism(2);

// 定义出租车-车程数据源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();

// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();


// 从车程事件中过滤类型为Start的车程数据,并按车程标识 rideId 分组
KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
.filter(ride -> ride.type==='START').keyBy(TaxiRide::getRideId);

// 付车费事件按行程标识 rideId 分组
KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
.keyBy(TaxiFare::getRideId);

rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
.uid("enrichment") // uid for this operator's state
.name("enrichment") // name for this operator in the web UI
.addSink(new PrintSinkFunction<>());


env.execute("Join Rides with Fares");
}
}

案例3:每种乘客数量的行车事件数

我们的另一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车… 当然,我们仍然只关心纽约的行车事件。
TODO:

案例4:每5分钟的进入的车辆数

为了持续地监测纽约的交通流量,需要计算出每个区块每5分钟的进入的车辆数。我们只关心至少有5辆车子进入的区块。
TODO:

案例5:收入最高出租车司机

我们想统计每个小时收入topN的司机
TODO:

相关链接

flink-training
ververica
flink-learning-in-action

http://wuchong.me/blog/2019/08/20/flink-sql-training/
https://www.cnblogs.com/bjwu/p/9973521.html
https://article.itxueyuan.com/0Kp2pR
https://www.cnblogs.com/luxh/p/16427196.html