知其然知其所以然,Flink运行底层原理和源码剖析(字节、阿里大数据面经)

第一章:Flink 是怎么跑起来的?——从入口点开始扒源码

Flink 项目那么大,要看源码从哪儿开始?别慌,带你一点点扒。

一般来说,我们提交一个 Flink 程序,大多是通过 StreamExecutionEnvironment.getExecutionEnvironment().execute() 这句熟得不能再熟的代码。那么我们今天就从这条链开始撸,看看底下藏了多少妖魔鬼怪。

StreamExecutionEnvironment 背后是个啥?

这货是用户入口类,不管你是写批的还是流的,基本都绕不过去它。我们直接看 getExecutionEnvironment() 这个方法:

public static StreamExecutionEnvironment getExecutionEnvironment() {
    return Utils.resolveFactory(threadContextClassLoader).createExecutionEnvironment();
}

看到没,它其实调了个 Utils.resolveFactory(...).createExecutionEnvironment(),这一看就是 SPI 风格,动态决定到底是创建本地的,还是集群的环境。

resolveFactory 做了什么?

public static ExecutionEnvironmentFactory resolveFactory(ClassLoader classLoader) {
    final ServiceLoader<ExecutionEnvironmentFactory> loader = ServiceLoader.load(ExecutionEnvironmentFactory.class, classLoader);
    for (ExecutionEnvironmentFactory factory : loader) {
        return factory;
    }
    return defaultFactory;
}

关键在 ServiceLoader,它是 Java 标准的 SPI 实现,也就是说只要在 META-INF/services/ 目录下注册了实现类,它就能把你捞出来。

在 Flink 中会有哪些实现呢?你猜到了:本地执行的 LocalStreamEnvironmentFactory,还有远程执行的 RemoteStreamEnvironmentFactory。源码里默认走的是本地:

return defaultFactory; // new LocalStreamEnvironmentFactory()

createExecutionEnvironment 是谁实现的?

我们看 LocalStreamEnvironmentFactory

@Override
public StreamExecutionEnvironment createExecutionEnvironment() {
    return new LocalStreamEnvironment();
}

LocalStreamEnvironment 做了哪些事?

构造函数里干了不少事,最核心的就是给 ExecutionConfig 设置默认参数、启动 EmbeddedJobExecutor 等等。我们重点看 execute() 这个链路是怎么调用的。

execute() -> StreamGraph -> JobGraph

你写的 Flink 作业,一开始就是一个 API DAG,后面它会被转成可调度的 JobGraph,而这一系列的“转化过程”正藏在 StreamExecutionEnvironment#execute() 里:

public JobExecutionResult execute(String jobName) throws Exception {
    StreamGraph streamGraph = getStreamGraph(jobName);
    return execute(streamGraph);
}

getStreamGraph() 是重点:

public StreamGraph getStreamGraph(String jobName) {
    this.streamGraph.setJobName(jobName);
    return this.streamGraph;
}

等等,不对啊?这看着像缓存,真相呢?要看下 StreamGraph 到底是啥时候构造出来的。

StreamGraph 是怎么一步步 build 出来的?

看 API,比如你写了:

env.fromElements(1,2,3).map(x -> x * 2).print();

每一步其实都会调用一个 addTransformation() 方法。

例如:

DataStream<T> map(...) {
    return new SingleOutputStreamOperator<>(...)
        .transform("Map", TypeInformation, new MapOperator());
}

transform(...) 最终会走到:

public <R> SingleOutputStreamOperator<R> transform(...) {
    transformation = new OneInputTransformation<>(...);
    environment.addOperator(transformation);
    return new SingleOutputStreamOperator<>(...);
}

注意 addOperator() 会把这个 transformation 放进 transformations 列表里,这些 Transformation 就是构造 StreamGraph 的基础。

接下来,StreamGraphGenerator 会把这些 Transformation 转成节点:

StreamGraphGenerator generator = new StreamGraphGenerator(transformations, config, checkpointCfg);
StreamGraph streamGraph = generator.generate();

StreamGraph 到 JobGraph

这步是 Flink 中最核心的“编译”过程。

JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

这个方法会做很多事,包括 operator chaining、slot sharing group 设置、checkpoint config 生效等。你可以想象,StreamGraph 更像“语法树”,而 JobGraph 是“可执行计划”。

到这里为止,你写的 API DAG,已经变成了可提交的 JobGraph,下一步就是:提交到 cluster 去跑了。

第二章:JobGraph 是怎么提交给集群的?——LocalEnvironment 的幕后推手

继续沿着 LocalStreamEnvironment#execute(StreamGraph) 看,它最终走到了:

JobClient jobClient = executeAsync(streamGraph);
JobExecutionResult result = jobClient.getJobExecutionResult().get();

你没看错,这里走的是异步提交 + 阻塞等待,典型的 submit-and-wait。

来重点看 executeAsync()

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    JobGraph jobGraph = streamGraph.getJobGraph();
    configureJobGraph(jobGraph);
    return jobExecutor.execute(jobGraph);
}

这就出现了关键角色:jobExecutor,它其实是个 ExecutorServiceLoader 配出来的 JobExecutor 实现,在 Local 模式下,它就是 EmbeddedExecutor

也就是说,在本地模式中,Flink 是在一个嵌套线程里跑了个 mini 集群,JobManager、TaskManager 全部“假装自己在远方”。

EmbeddedExecutor 的 execute 过程

我们看下 EmbeddedExecutor#execute(JobGraph)

public CompletableFuture<JobClient> execute(JobGraph jobGraph) {
    DispatcherGateway gateway = miniCluster.submitJob(jobGraph);
    return CompletableFuture.completedFuture(new EmbeddedJobClient(gateway, jobGraph));
}

你没看错,真正提交作业的是 MiniCluster#submitJob

MiniCluster 是 Flink 本地模式最核心的模拟集群,它内部其实真的维护了一堆组件:Dispatcher、JobManagerRunner、TaskManager。对,就是你集群里那一堆东东,只不过缩小了规模,搬到了 JVM 进程里。

MiniCluster 是怎么玩 Dispatcher 的?

public DispatcherGateway submitJob(JobGraph jobGraph) throws Exception {
    Dispatcher dispatcher = this.dispatcher;
    dispatcher.submitJob(jobGraph);
    return dispatcher.getGateway();
}

我们一步步来追进去看 Dispatcher 的 submitJob

public void submitJob(JobGraph jobGraph) throws Exception {
    jobManagerRunner = createJobManagerRunner(jobGraph);
    jobManagerRunner.start();
}

JobManagerRunner:核心中的核心

它内部封装了 JobMaster,是负责调度每个 Job 的老大哥。

public void start() {
    jobMaster.start();
}

JobMaster 的启动过程会注册资源管理器、分发 ExecutionGraph、启动 slot 分配逻辑,等等等等,一堆调度初始化逻辑。

第三章:ExecutionGraph 是怎么生出来的?——调度大脑初现端倪

说真的,ExecutionGraph 是 Flink 调度系统的灵魂。如果说 JobGraph 是结构图,那 ExecutionGraph 就是调度图,它定义了:

  • 哪些 task 会启动,怎么切分成子任务;
  • 每个 subtask 的生命周期,状态转移逻辑;
  • 如何重启失败的节点,以及如何协同 checkpoint。

入口点在哪?

还记得上章里 JobMaster#start() 被调了吗?在它启动过程中,有一个特别重要的方法:

jobManagerRunner.createJobMaster().initializeJobMaster();

而这个 initializeJobMaster() 会调用:

ExecutionGraph executionGraph = schedulerNG.getExecutionGraph();

没错,ExecutionGraph 的创建,是由 Scheduler 负责的!

Scheduler 是谁?

在新版 Flink(尤其是 1.15+)中,调度器默认是 DefaultScheduler,它继承自 SchedulerBase。而构造 Scheduler 时,会传入一个叫 ExecutionGraphFactory 的东东。

ExecutionGraph executionGraph = executionGraphFactory.createAndRestoreExecutionGraph(...);

好,接下来咱们进入正题——到底 ExecutionGraph 是怎么构造出来的?

ExecutionGraphFactory 做了哪些黑科技?

public ExecutionGraph createAndRestoreExecutionGraph(...) {
    ExecutionGraph executionGraph = createExecutionGraph();
    restoreState(executionGraph);
    return executionGraph;
}

它做了两件大事:

  • 构造 ExecutionGraph(基于 JobGraph);
  • 如果有保存点(savepoint/checkpoint),就进行状态恢复。

这两步都很关键,我们先关注构造部分:

ExecutionGraph executionGraph = new ExecutionGraph(...);
executionGraph.attachJobGraph(jobGraph);

熟悉的构造函数 + attach 方法组合拳来了。

attachJobGraph:组装逻辑集中地

这是整个 ExecutionGraph 生成过程中,最复杂的一环,绝对的“重头戏”。源码就像一锅炖肉,全是料:

for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
    executionVertices = createExec

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

17年+码农经历了很多次面试,多次作为面试官面试别人,多次大数据面试和面试别人,深知哪些面试题是会被经常问到。 在多家企业从0到1开发过离线数仓实时数仓等多个大型项目,详细介绍项目架构等企业内部秘不外传的资料,介绍踩过的坑和开发干货,分享多个拿来即用的大数据ETL工具,让小白用户快速入门并精通,指导如何入职后快速上手。 计划更新内容100篇以上,包括一些企业内部秘不外宣的干货,欢迎订阅!

全部评论

相关推荐

我的人生算是废了,23届裸辞空档一年,存款只能坚持几个月了,找不到像样的工作了,人生何去何从。
梦想是成为七海千秋:这大环境下为什么要裸辞呀,风险真的挺大的,而且社招的话23届没有太多的竞争力,不过既然已经裸辞了就不要焦虑慢慢找。
我的求职精神状态
点赞 评论 收藏
分享
使用AJAX进行异步通信的基本步骤如下:https://www.nowcoder.com/issue/tutorial?zhuanlanId=Mg58Em&amp;uuid=43521d43a8e341f888324dd690363024创建XMLHttpRequest对象:使用JavaScript代码创建一个XMLHttpRequest对象,该对象用于进行异步通信。为XMLHttpRequest对象添加事件监听器:为XMLHttpRequest对象添加事件监听器,以便在通信状态改变时接收回调。创建请求:使用XMLHttpRequest对象的open方法创建一个HTTP请求。其中,需要指定请求的方法(GET或POST)和目标URL。设置请求头部:使用XMLHttpRequest对象的setRequestHeader方法设置请求头部,以便向服务器传递必要的信息,如数据格式等。发送请求:使用XMLHttpRequest对象的send方法发送请求。对于GET请求,可以将参数拼接到URL后面;对于POST请求,可以将参数以字符串或FormData对象的形式传递。处理响应:在XMLHttpRequest对象的事件回调函数中,使用responseText或responseXML属性来获取服务器的响应数据。可以使用这些数据来更新页面或进行其他操作。AJAX的原理是通过XMLHttpRequest对象实现与服务器的异步通信。在传统的同步通信中,浏览器发起请求后需要等待服务器响应,并在等待期间无法进行其他操作。而使用AJAX进行异步通信时,浏览器可以在发送请求后继续执行其他代码,不需要等待服务器响应。当响应返回后,浏览器会调用注册的回调函数来处理响应数据,从而实现异步更新页面内容。AJAX主要用于以下方面:https://www.nowcoder.com/issue/tutorial?zhuanlanId=Mg58Em&amp;uuid=43521d43a8e341f888324dd690363024动态加载内容:可以在页面加载完成后通过AJAX请求服务器获取额外的内容,如文章列表、评论等。表单数据提交:可以通过AJAX将表单数据异步提交给服务器,而不需要刷新整个页面。轮询更新:可以周期性地向服务器发送请求,以获取热点数据的更新。需要注意的是,由于AJAX请求涉及跨域问题,可能会遇到安全性限制。在跨域请求时,需要服务器允许相关的请求,并且需要特别处理响应的数据。
社畜职场交流圈
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务