标签:rect name eric 内核 内存模型 win amp tco parameter
一、Flink内核解析,针对版本1.12.0,四大块:任务的提交流程,组件通信,任务的调度,内存管理。
任务的提交流程:以命令行的提交命令开始追踪类,20多个步骤,几十个类,数千行代码量,最后画一个PPT动图
组件通信:actor的模型,akka基本原理和实现,5大关键角色:代理转发,处理细节,PPT动图
任务调度:streamGraph-jobGraph-ext,四个图的讲解,在什么步骤还是生成图,调用位置,如何转换,
task任务调度:调度器,调度模型,调度策略
task执行:以map算子为例,详细展示
内存管理:1.10之后的内存模型做了改进,jobmanage,taskmanage,
如何jvm内存不足的处理,内存的分配过程,数据结构,特有的组件,网络传输的内存管理如何实现
反压过程的讲解
二、yarn-per-job的提交流程讲解,企业常用的模式
前提启动hadoop,安装了flink,不需要启动flink集群
nc -lk 9999启动端口 flink run -t yarn-per-job /opt/module/flink-1.11.3/examples/streaming/SocketWindowWordCount.jar --port 9999
启动完成之后会多出这几个类
2466 NodeManager 4596 YarnTaskExecutorRunner 4426 YarnJobClusterEntrypoint 2124 DataNode 4749 Jps 1983 NameNode 3679 CliFrontend
执行客户端的入口类
org.apache.flink.client.cli.CliFrontend
进入main方法看主要逻辑,123,细枝末节太多,看主要逻辑,不然容易混乱
// 1. find the configuration directory
// 拿路径 final String configurationDirectory = getConfigurationDirectoryFromEnv(); // 2. load the global configuration
// 拿配置 final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); // 3. load the custom command lines
// 加载自定义命令行 三种顺序:1.generic,2.yarn,3.default final List<CustomCommandLine> customCommandLines = loadCustomCommandLines( configuration, configurationDirectory); try { final CliFrontend cli = new CliFrontend( configuration, customCommandLines); SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// 核心逻辑run方法 int retCode = SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseAndRun(args)); System.exit(retCode); }
进入cli.parseAndRun(args));
run -t yarn-per-job /opt/module/flink-1.11.3/examples/streaming/SocketWindowWordCount.jar --port 9999
// get action 这个拿到命令行参数的第一个,run String action = args[0]; // remove action from parameters 除开run 后面的参数 final String[] params = Arrays.copyOfRange(args, 1, args.length); switch (action) { case ACTION_RUN: //进入
run(params); return 0; case ACTION_RUN_APPLICATION: runApplication(params); return 0; case ACTION_LIST: list(params); return 0; case ACTION_INFO: info(params); return 0; case ACTION_CANCEL: cancel(params); return 0; case ACTION_STOP: stop(params); return 0; case ACTION_SAVEPOINT: savepoint(params); return 0; case "-h": case "--help": CliFrontendParser.printHelp(customCommandLines); return 0; case "-v": case "--version": String version = EnvironmentInformation.getVersion(); String commitID = EnvironmentInformation.getRevisionInformation().commitId; System.out.print("Version: " + version); System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID); return 0; default: System.out.printf("\"%s\" is not a valid action.\n", action); System.out.println(); System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); System.out.println(); System.out.println("Specify the version option (-v or --version) to print Flink version."); System.out.println(); System.out.println("Specify the help option (-h or --help) to get help on the command."); return 1; }
进入 run(params);
//默认配置
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
//获取命令行 final CommandLine commandLine = getCommandLine(commandOptions, args, true); // evaluate help flag if (commandLine.hasOption(HELP_OPTION.getOpt())) { CliFrontendParser.printHelpForRun(customCommandLines); return; } final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine)); final ProgramOptions programOptions = ProgramOptions.create(commandLine); final List<URL> jobJars = getJobJarAndDependencies(programOptions); final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars); LOG.debug("Effective executor configuration: {}", effectiveConfiguration); final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration); try {
//执行 executeProgram(effectiveConfiguration, program); }
这个就是最终执行:
executeProgram(effectiveConfiguration, program);
标签:rect name eric 内核 内存模型 win amp tco parameter
原文地址:https://www.cnblogs.com/fi0108/p/14899055.html