Jstorm到Flink在今日头条的迁移实践#

本文内容如下:

  • 引入Flink的背景
  • Flink集群的构建过程
  • 构建流式管理平台

引入Flink的背景#

下面这幅图展示的是字节跳动公司的业务场景 01

首先,应用层有广告,也有AB测,也有推送和数据仓库的一些业务。然后在使用J storm的过程中,增加了一层模板主要应用于storm的计算模型,使用的语言是python。所以说中间相对抽象了一个schema,跑在最下面一层J storm计算引擎的上面。

字节跳动公司有很多J-storm集群,在当时17年7月份的时候,也就是在计划迁移到Flink之前,J storm集群的规模大概是下图所示的规模级别,当时已经有5000台机器左右了。 02

接下来,介绍下迁移Flink的整个过程。先详细地介绍一下当时J-Storm是怎么用的。 03

上面是一个word count的例子:左边是一个目录结构,这个目录结构在resources下面,里面的Spout/Bolt的逻辑都是一些python脚本写的。然后在最外层还有一个topology_online.yaml配置文件。 这个配置文件是用来干什么的?就是把所有的Spout和Bolt串联起来构成一个有向无关图,也就是DAG图。这就是使用J storm时的整个目录结构,大部分用户都是这样用的。右边是Spout和Bolt的逻辑,其实是抽象出来了一个函数,就在这里面写业务方面的函数,然后将tuple_batch也就是上游流下来的数据去做一些计算逻辑。 04

下面详细介绍一下配置文件的信息,其实我们有整个拓扑结构拓扑的信息,比如说作业名叫什么,作业需要多少资源,需要多少work数。这里面会有单个的spout和Bolt的配置信息,比如是消费的topic还是一些并发度?

除了这些信息还有整个这个数据流的流转,比如说spout的输出,输出messsage的消息等等。最后还有整个的Spout到Bolt之间的shuffle逻辑。这就是我们之前Jstorm的整个使用方式。最后会把整个目录结构里面的内容去解析出来,根据配置文件把整个storm的拓扑结构构建出来,然后提交到集群上面去跑。

使用Jstorm集群遇到了什么问题呢?第一个问题,因为我们当时是用使用python写的代码,整个集群是没有内存隔离的,job和work之间是没有内存限制的。比如说在实际过程中会经常遇到一个用户,他可能代码写的有问题导致一个work可能占了70G内存,把机器的内存占了1/3。第二个问题就是说业务团队之间没有扩大管理,预算和审核是无头绪的。我们当时都是都是跑在一个大集群上面,然后个别业务是单独跑在一些小集群,但是我们每次都是资源不足,也没办法梳理这个预算。

第三个问题就是集群过多,运维平台化做得不太好,都是靠人来运维的。这个时候集群多了基本上是管不过来的。

第四个问题就是说我们用python写的代码,有些性能比较差。但是我们在Storm的基础上面去推广这个Java也比较难,因为我们部分同事实际上是不认可Java的,因为他觉得java开发速度太慢了。 05

我们当时想解决上面的问题,一个思路是把Jstorm放在yarn上面,直接把Jstorm在yarn上面兼容做这一套。后来因为知道阿里在用Flink所以去调研Flink,发现了Flink的一些优势,所以想尝试用Flink解决存在的问题。

使用Flink首先第一个问题可以成功解决,因为Flink作业是跑在yarn上面的,这就解决了内存隔离的问题。然后Yarn也是支持队列的,我们可以根据业务去划分队列,这样我们的扩大预算审核的问题得到解决了。我们也不需要自己运维一个集群了,因为有yarn去管理我们的资源,这样也节省了运维成员。在此基础上还可以做一些物理隔离队列,其实物理隔离队列现在也遇到了问题。因为物理隔离队列只是说这个机器隔离了,但是相当于是机柜也没有隔离网络带宽也没有隔离,所以说即使是物理隔离队列,现在也遇到比如说和离线作业共用机柜的时候,这个机柜的出口带宽被打满的问题。针对这些问题,我们后续可能想在这个离线离线集群上面做QOS这种流量级别的方式来解决这个问题。

Flink实际上是可以兼容Storm的,比如说之前的历史作业是可以迁移过来的,不需要维护两套计算引擎。Flink支持一些高优先级的API比如说支持SQL以及窗口等特性包括说checkpoint。我们头条的业务对exactly-once的需求不是特别的强烈。

06

以上就是Flink的优势,于是我们就决定从J storm往Flink去迁移。

Flink集群的构建过程#

在迁移的过程中,第一件事情是先把Flink集群建立起来。一开始肯定都是追求稳定性,比如说把离线的yarn集群隔离开,然后不依赖于HDFS也可以把Hdfs线上的name node, name space隔离出来。然后我们梳理了原来storm上面的作业,哪些作业属于不同的业务,然后映射到不同的队列里面去,最后把一些特殊的队列也隔离开来。这是我们准备这个Fink集群的时候考虑的几个点。 07

下面就考虑Flink怎么兼容J storm,然后把它迁移过来。 08

我们当时Flink用的是1.32版本,因为Flink有Flink-storm这个工程,它能把Storm作业转化成Flink作业,我们就借鉴这些技术上实现了一个Flink –jstorm。相当于把一个J storm的拓扑结构转化成了一个Flink job。只做完这件事情是不够的,因为我们有一系列的外围工具需要去对齐。比如说之前提交作业的时候是通过一个脚本提交的让用户去屏蔽一些其他的参数。使用 flink的话我们同样也是需要构建这么一个脚本,然后去提交Flink Job,最后停止flink Job。第三点是构建flink job外围工具,自动注册报警,比如说消费延迟报警,自动注册这个Dashboard以及一些log service,所有的这些为外围工具都要和原来的服务去对齐。

对齐完之后,我们需要构建一个迁移脚本,迁移的过程中最困难的是资源配置这一块。因为原来Storm用了多少资源,Storm怎么配,这对于迁移的用户来说,如果是第一次做肯定是不了解这些东西。因此我们写这么一个脚本,帮用户生成它Flink集群里面对应的资源使用情况。这些工作做完了之后,我们就开始去迁移。到现在为止,整体迁移完了,还剩下十个左右的作业没有迁移完。现在集群规模达到了大概是6000多台。 09

在迁移的过程中我们有一些其他优化,比如说J storm是能够支持task和work维度的重启的,Flink这一块做得不是特别好。我们在这方面做了一些优化实现了一个single task和single tm粒度的重启,这样就解决部分作业因为task重启导致整个作业全部重启。

构建流式管理平台#

10

迁移完之后,我们又构建了一个流式管理平台。这个平台是为了解决实际过程中遇到了一些问题,比如说整个机群挂了无法确定哪些作业在上面跑着,也通知不到具体的用户,有些用户作业都不知道自己提交了哪些作业。我们构建流式作业的时候目标实际上就是和其他的管理平台是一样的,比如说我们提供一些界面操作,然后提供一个版本管理,就是为了方便方便用户升级和回滚的操作,我们还提供了一站式的查问题的工具:把一些用户需要的信息都聚合在一个页面上面,防止用户不断跳来跳去以及避免不同系统之间的切换。有一些历史记录之前不管是跑在yarn上面还是跑到storm上面,我一个作业被别人kill到了,其实我都是不知道的。针对这个问题我们提供了一些历史操作记录的一些目标。

设计这个管理平台的时候,我们考虑到提供这么一个前端管理平台可能只是针对公司内部的一部分产品,其他的产品也做了自己的一套前端。他们可以用一个模板,根据自己的逻辑去生成一个storm任务。基于此,我们把整个管理平台抽象了两层:最上一层实际上相当于一个面向用户或者说是类似于前端的一个产品。中间这一层实际上是一个类似于提交作业调度任务,这一层只负责提任务,然后停任务,管理生命周期以及因为故障导致作业失败了,将作业重新拉起来。这是中间层TSS层做的事情。

这样,我们就可以对接到所有的前端平台。通过一个RPC进行TSS通信,就把所有的底层的服务和Filnk和Yarn还有HDFS这些交互的底层的逻辑完全屏蔽开来了。 11

接下来,用户写一个作业就比较简单了,流程如下:

第一步用户先要生成自己的一个作业模板,我们这边通过maven提供的脚本架去生成一些作业的schema,这个作业执行完之后,它会把帮你把一些porm文件,还有一些类似于kafkasource这种常规的组件都帮你准备好,然后你直接在这个模板里面填自己的主要逻辑就可以了。因为我们写Java程序遇到最多的一个问题就是包冲突问题。所以porm文件帮助用户把一些可能冲突的一些jar包都给以exclude掉,这样包冲突的概率会越来越小。

12

我们测试作业基本上是用IDEA或者local模式去测试,也提供了一个脚本去提交作业,通过这个脚本提交到stage环境上面。在提交注册在平台上面去注册这个作业,然后添加一些配置信息。 13

下面是一个代码版本管理的界面: 14

把整个作业提交之后如下图所示: 15

提交完一个作业之后,用户可能想看作业运行的状态怎么样,我们通过四种方式去给用户展示他的作业运行状态的。 16

第一个是Flink UI,也就是官方自带的UI用户可以去看。第二个是Dashboard,我们展示了作业里面的task维度,QPS以及task之间的网络buffer,这些重要的信息汇聚到一起创建了一个Dashboard,这样可能查问题的时候方便一些。第三个是错误日志,其实和大家的思路一样,把一个分布式的日志然后聚合起来,然后写到ES上面去。第四是做了一个Jobtrace的工具,就是我们把Flink里面常见的一些异常匹配出来,然后直接给用户一个wiki的使用指南,告诉用户比如说你的作业OM了需要扩大内存。只要用户的作业出现了某些问题,我们把已知的所有的异常都会匹配给用户。 下面是ES的kibana: 17

这是我们Jobtrace的功能,我们把Flink的这些常见的异常都匹配出来,每一个异常其实对应了一个wiki然后去让用户去解决自己的问题。 18

最后分享下我们的近期规划,前面的基本做完并且趋于稳定了,但是现在又遇到了一些新的问题。比如资源使用率这个问题,因为用户提交作业的时候,用户对资源不是特别敏感就随意把一个资源提上去了,可能他明明需要两个CPU,但是他提了四个CPU。我们想通过一个工具能够监控到他需要多少资源,然后通知yarn去把这个资源给重置了。就是动态调整job资源,自动把资源重置。

第二个问题是优化作业重启速度。我们这边好多业务是根据流式计算的指标来监控它业务的稳定性,如果最上游重启一个作业,底下一群人收到报警说线上出现一些问题了。原因是最上游某一个作业再重启。我们想把重启时间间隔去做到最短或者是无缝重启,这是下一阶段需要去探索探索的一个问题。

第四点:Flink SQL也刚上线,可能需要一些精力投入去推广。

最后一点,我们希望在此抽象出更多的模式作业模型来,因为我们本身是有一些比如说kafka2ES,kafka2hdfs这些需求,能不能把他们抽象成一个schema,然后去对外提供一些服务。

19

以上就是我本次分享的主要内容,感谢Flink的举办者和参与者,感谢我们的同事,因为以上的分享内容是我和我们同事一起做的。