基于Flink构建用户实时基础行为工程

来源: Qunar技术沙龙 作者: 孙赵宏 | 发布时间: 2019-02-20 09:30:00

本篇文章给大家介绍下基于Flink构建用户实时基础行为工程的相关实践,包括Flink相关的技术点和基础行为实时工程的业务。

孙赵宏,2018年4月加入去哪儿网,后端大数据研发工程师,目前在大住宿事业部/公共技术中心负责用户基础行为数据工程的研发。是一个贱贱的geek,想象力和技术同样重要!工匠精神编码,谦卑态度学习,勇敢品质创新!


导读

Hi,小伙伴们!今天我给大家介绍下基于 Flink 构建用户实时基础行为工程的相关实践,包括 Flink 相关的技术点和基础行为实时工程的业务。 Flink 是目前 Qunar 主推的实时数据处理开源平台,用于替代 SparkStreaming。如果你们使用 Flink 也是和我们之前一样,不知道如何使用我们的 Flink 实时计算平台,或者不知道该怎样合理利用其 Features 去更好构建我们的工程,再或者你想了解每天处理超过12亿条实时数据,数据实时性达到秒级,QPS 可支持10万的用户实时基础行为工程的技术实现,你在后面应该能找到你的答案。

Flink简介

Apache Flink 是一个面向数据流处理和批量数据处理的分布式的开源计算框架,能够支持流处理和批处理两种应用类型。有着低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理的特点。 Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。这与 sparkstreaming 不同,sparkstreaming 是将流处理视为无限个有界的批处理(microbatch)。

1 Flink 特点

(1)有状态计算的 Exactly-once 语义。状态是指 flink 能够维护数据在时序上的聚类和聚合,同时它的 checkpoint 机制可以方便快速的做出失败重试; (2)支持带有事件时间(event time)语义的流处理和窗口处理。事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下; (3)支持高度灵活的窗口(window)操作。支持基于 time、count、session,以及 data-driven 的窗口操作,能很好的对现实环境中的创建的数据进行建模; (4)轻量的容错处理(fault tolerance)。 它使得系统既能保持高的吞吐率又能保证 exactly-once 的一致性。通过轻量的 state snapshots 实现; (5)支持高吞吐、低延迟、高性能的流处理; (6)支持 savepoints 机制(一般手动触发)。即可以将应用的运行状态保存下来;在升级应用或者处理历史数据是能够做到无状态丢失和最小停机时间; (7)支持大规模的集群模式,支持 yarn、Mesos。可运行在成千上万的节点上; (8)支持具有 Backpressure 功能的持续流模型; (9)Flink 在 JVM 内部实现了自己的内存管理,包括完善的内存架构和 OOM error prevention;

(10)支持迭代计算; (11)支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果进行缓存。

2 Flink 分布式 runtime

(1)JobManager 主要工作是协调分布式系统的运行。比如协调各个任务的执行时间,管理 checkpoint 和协调异常状态的恢复等。 (2)TaskManager 是任务的真正执行者,包括数据流的缓存和交换等操作。 (3)client 不是 Flink Runtime 的一部分,也不参与任务的真正执行,只是用来启动 Job 时生成执行计划并交给 JobManager。

3 Flink 流式(DataStream)编程模型

3.1 编程抽象层级

最底层为有状态的流,通过处理函数进入 DadaStream 的 API 处理层。DataStream API 层也叫核心 API 层,一般大部分编程工作都集中于此,包括业务处理,聚合,关联等逻辑操作。Table API 层和 SQL 层其实是可以提供基于 schema 的 SQL 查询形式,目前较少使用。

3.2 流式编程

Flink 编程模型中三大元素分别是 Source,Operator 和 Sink。Flink 的流起始于 source,经过 Transformation Operator 对流进行处理,最后在 Sink 进行持久化。

用户实时基础行为工程简介

1 用户基础行为工程架构

首先适用 Flink 订阅各个业务线日志的 Kafka 集群 Topic,实时数据进入 Flink 集群中运行的各个业务线对应的 Job 进行数据清洗,为保证实时性和增加系统吞吐量,直接按照业务线为 Key 存入 redis 中。Server 端按照 Gid 和 Username 从 redis 中取出此用户所有业务线的所有行为并与离线数据合并,通过 dubbo 接口返回给客户端。为了减小服务的压力,数据的截取、解压缩等耗 CPU 资源的操作都在客户端进行。

2 基础用户行为工程的意义

目前我们提供的服务是提供一个用户100天内的实时行为,包括了机票、酒店、火车票、门票、度假、车票、大搜等业务线的搜索、点击、收藏、订单、预定等行为,数据源为 Hotdog 日志,Kylin 日志,业务线日志等。为了补充度假,门票和景点的商品信息,使用了skuvacationinfo,skuticketinfo,skusightinfo 这三个 SKU 库。 基于用户行为,我们可以做精细化留存评估,让留存数据更有价值和指导意义。也可以进行质量评估,需要基于用户行为并且贴合业务去评估,比如某个景点的搜索点击行为在同类中较高,那我们就可以认为此景点为优质景点从而增加此景点的推荐权重等。用户行为也可以用作产品分析,用数据量化产品核心功能,让产品迭代排期更科学,部门配合更高效。用户行为还可以更好实现用户分群、用户分层等精准营销。 现在基础用户行为工程已经服务于首页所有的推荐场景。用户实时行为服务已经广泛用于定向广告、首页预制词、单品推荐、目的地推荐等多个场景。我们在个性化推荐场景中进行了测试,使用实时行为服务比使用T-1日行为数据,点击率提升20%。

DataStream 的典型算子(operator)使用举例

1 使用 Filter 对流进行过滤

某些时候一种日志流中包含了许多种不同行为类型的日志,但业务处理时我们只需要对一种行为类型的日志进行清洗,这时我们可以使用DataStream 的 Filter 来对数据流进行过滤。

  1. dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0;

  2. }

  3. });

2 使用 Split 和 select 对流进行拆分

实际业务中经常会遇到一种日志包含了多种不同的业务或者行为,但我们想将不同的业务分流后分别处理,这就使用到了分流 split 算子。

  1. SplitStream<Integer> split = someDataStream.split( new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) {

  2.        List<String> output = new ArrayList<String>(); if (value % 2 == 0) {

  3.            output.add("even");

  4.        } else {

  5.            output.add("odd");

  6.        } return output;

  7.    }

  8. });

分流过后,再使用 select 算子进行拆分。

  1. SplitStream<Integer> split;

  2. DataStream<Integer> even = split.select("even");

  3. DataStream<Integer> odd = split.select("odd");

  4. DataStream<Integer> all = split.select("even","odd");

逻辑非常的清晰,实现非常简单。

3 使用 Join 实现双流的聚合

当我们要实现两个数据流中的数据关联的时候,我们可能想到使用 redis 等缓存中间件对中间数据进行缓存,幸运的是 Flink 的算子中已经提供了两条流进行关联的操作 Join。Join 操作和 SQL 的 Join 道理是一样的,需要用 where 指定 Join 的字段和用equalTo 指定 Join 的条件,最后使用 apply 对 Join 成功的结果进行处理。节约了中间件的资源。

  1. dataStream.join(otherStream)

  2.    .where(<key selector>).equalTo(<key selector>)

  3.    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

  4.    .apply { ... }

容错策略

1 异常重试策略

特殊情况下(如遇上无法解析的数据或者出现未知异常)导致任务执行失败,Flink 会根据自己的 checkpoints 来进行自动重启恢复。 Flink 的 checkpointing 机制会存储状态的一致性快照,配置了不同的状态存储策略,checkpoints 就会保存在不同的地方,比如 JM 的内存,文件系统或是数据库。 当前我们设置5秒触发一次 checkpoint 保存,为了节约集群内存资源我们选择保存的位置为 HDFS。 重启的策略支持自定义,集群的重启策略可以通过 flink-conf.yaml的restart-strategy 来进行集群级别的控制,也可以在 Job 级别进行设置,分为固定延迟重启策略(Fixed Delay Restart Strategy)和失败率重启策略(Failure rate Restart Strategy)。 固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。 失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 当前我们使用 Job 级别的设置,重启1次,间隔10秒。

2 停机恢复策略

任务有更新时,Flink 版本升级时,系统升级或系统迁移等情况下,需要停掉 running 状态的 Job,此时如何保证正在处理的数据不会丢失?这就用到了 Flink 的另一个特性 Savepoint。 不同于 checkpoint 的自动触发机制,Savepoint 是手动触发的。Savepoint 为全局一致性快照,可以保存数据源 offset,operator 操作状态等信息。 (1)停止 job 并保存 savepoit

  1. ./bin/flink cancel -s [savepointDirectory] <jobID>

(2)启动 job 并开启 savepoint

  1. ./bin/flink run -s <savepointPath> ...

(3)使用 WEBUI 启动 Job 并输入 savepoint 路径

3 监控告警

Flink 提供了 Metrics,相当于我们的 Qmonitor 来对各项指标进行监控,API 丰富且使用简单。监控的指标可以投射到 Watcher上面。目前我们所有的任务包括了“单位时间接收的数据”、“单位时间处理失败的数据”、“数据从产生到进入 Flink 的延时”、“数据在 Flink 处理的实际时间”、“单位时间持久化成功率”等监控。

可用性

1 HA

目前实时处理使用的是公司 Flink 集群,4台 taskmanager,16个 slot。DB 使用的是 mysql-mmm 高可用方式。Redis 使用10个实例的集群。Server 使用8台虚机做的 NG。没有单点问题。

2 使用 Flink Backpressure 应对流量洪峰

Flink 自带背压感知功能。使我们不用手动去缓存过剩的消息,Flink 会自动控制消费速度。其实现方式印证了“最简单的办法往往最有效”这个道理。Flink 使用分布式阻塞队列来作为有界缓冲区。如同 Java 里通用的阻塞队列跟处理线程进行连接一样,一旦队列达到容量上限,一个相对较慢的接受者将拖慢发送者。

(1)记录“A”进入Flink,然后被Task 1处理; (2)Task 1处理后的结果被序列化进缓冲区; (3)task 2从缓冲区内读取一些数据,缓冲区内将有更多的空间; (4)如果task 2处理的较慢,task1的缓存区将很快填满。发送速度随之下降。 我们可以通过 WEBUI 来查看 Flink 各个 operator 的背压状态。

3 使用 EventTime、Window 和 WaterMark 处理无序流量(数据延时)

实际处理实时数据的过程中,由于日志收集和传递发送过程中,难免会在时间上乱序,就导致在处理前后有依赖性或者关联性的数据的时候出现问题。Flink 可以使用 EventTime 和 WaterMark 优雅的处理无序流量问题。

3.1 Time

在 flink 中元素可以设置3种不同的时间模型:-Processing time此时间为元素进入 operator 后被赋予的当前算子所在服务器的本地时间戳。简单说就是算子时间。-Event time此时间为元素真正的产生时间(例如日志内容中的时间戳),所以通常情况下需要我们从原始的日志内容中提取出来。-Ingestion time此时间的赋值是在元素从 source 发出,刚进入 operator 时此时的服务器本地时间戳。

从上面的简介中可以看出,如果我们想让保证元素的顺序与其最初产生的顺序一致,我们需要使用 EventTime 时间模型。

3.2 Window

Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的缓存区,我们在这些缓存区中对数据进行处理和计算。是一个相对比较好理解的概念。Window 分为滚动窗口、滑动窗口、会话窗口、全局窗口等。可以根据业务需求去选择使用。由于篇幅原因,这里暂时不详细介绍各自的原理和使用。

3.3 Watermark

首先介绍一下 watermark 的由来。当我们使用 window 去处理 EventTime 的乱序流时,难免会遇到延迟的元素,但我们又不想无限期的等下去,所以我们想要这么一种方式去告诉 window 停止等待,马上进入计算,watermark 就是做此工作的。 Watermark 是衡量 EventTime 的流进度的一种方式。我们可以把水印视为 flink 插入流中的一个元素,它也拥有一个时间戳,只是这个元素不会像普通元素那样被做逻辑处理。

以上图为例,W(11)和W(17) 是两个 watermark 分别携带的时间戳为11和17,当算子处理到W(11)时,首先它先识别出这是一个watermark 对象,然后它会知道时间戳小于11的元素已经不会再进入此 window 了,于是触发当前 window 进行计算并将11缓存到自己的状态中。

3.4 编程实现

对 EventTime、Window 和 WaterMark 的概念的介绍完后,我们来了解下实践中这三者是如何进行配合来处理乱序流的。

3.4.1 首先开启 EventTime 时间模型

  1. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我们需要告诉 Flink 我们需要使用 EventTime 的元素时间模型。

3.4.2 设置 window

  1. ds.window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1)))

这里我们例子中选用的滑动窗口,根据业务调整窗口的大小和窗口滑动的距离。

3.4.3 设置 watermark

设置 watermark 有两种方式。 (1)AssignerWithPeriodicWatermarks:定时更新的 watermark。 (2)AssignerWithPunctuatedWatermarks:每个元素到来都要设置一个 watermark。 虽然后者更精准,但是大数据量的情况下会影响性能,一般使用前者。

  1.        static AssignerWithPeriodicWatermarks assigner = new AssignerWithPeriodicWatermarks<String>() {

  2. private final long maxTimeLag = 60000;//60 secs @Nullable  @Override

  3. public Watermark getCurrentWatermark() { //设置允许延后60秒  return new Watermark(System.currentTimeMillis() - maxTimeLag);

  4.       }

  5. @Override

  6. public long extractTimestamp(String logStr, long l) {

  7.            JsonObject jsonObject =

  8.                 (JsonObject) new JsonParser().parse(logStr).getAsJsonObject(); return jsonObject.get("timestamp").getAsLong();

  9.        }

  10.    };

  11. }

重写 getCurrentWatermark()来设置生成 watermark 的方式。重写 extractTimestamp()来提取元素的 eventtime。在WEBUI 上可以实时查看 watermark 状态。

4 性能扩展

当业务扩展时,只需要申请新的 taskmanager,扩容 Redis 实例,申请虚机,然后提交新的 Job 即可,水平扩展非常简单方便,完全不影响在运行的业务。 感谢浏览,如有问题欢迎指正或者联系我进行讨论。有其他希望详细了解的 Flink 的点请在下面留言给我,别忘记点赞哦,谢谢!

公众号导航