本文档在高层次上描述了为 YARN 实现新应用程序的方法。
相关概念和流程
在应用的提交流程中,是应用客户端将应用提交到 YARN ResourceManager。这可以通过设置YarnClient来完成。YarnClient启动后,客户端可以设置应用程序环境,准备好包含应用程序的第一个容器ApplicationMaster(AM),然后提交申请。您需要提供诸如应用程序运行所需的本地文件/jar 的详细信息、需要执行的实际命令(带有必要的命令行参数)、任何操作系统环境设置(可选)等信息等等。实际上,您需要描述需要为 ApplicationMaster 启动的 Unix 进程。
然后 YARN ResourceManager 将在分配的容器上启动 ApplicationMaster(如指定的那样)。ApplicationMaster 与 YARN 集群通信,并处理应用程序执行。它以异步方式执行操作。在应用程序启动期间,ApplicationMaster 的主要任务是:
 (资料图)
(资料图)
任务 a) 可以通过AMRMClientAsync对象异步执行,在AMRMClientAsync.CallbackHandler 中指定事件处理方法事件处理程序的类型。事件处理程序需要显式设置给客户端。任务 b) 可以通过启动一个可运行对象来执行,该对象在分配了容器时启动容器。作为启动此容器的一部分,AM 必须指定具有启动信息(例如命令行规范、环境等)的ContainerLaunchContext。
在应用程序执行期间,ApplicationMaster 通过NMClientAsync对象与NodeManager 通信。所有的容器事件NMClientAsync.CallbackHandler处理并关联NMClientAsync。典型的回调处理程序处理客户端启动、停止、状态更新和错误。ApplicationMaster 还通过处理AMRMClientAsync.CallbackHandler的getProgress()方法向 ResourceManager 报告执行进度。
除了异步客户端之外,某些工作流(AMRMClient和NMClient)还有同步版本。推荐使用异步客户端,因为(主观上)使用更简单,本文将主要介绍异步客户端。有关同步客户端的更多信息,请参阅AMRMClient和NMClient。
接口
以下部分是一些重要的接口:
Client<–>ResourceManager通过使用YarnClient对象 ApplicationMaster<–>ResourceManager通过使用AMRMClientAsync对象,通过AMRMClientAsync.CallbackHandler异步处理事件 ApplicationMaster<–>NodeManager启动容器。通过使用NodeManagers沟通NMClientAsync对象,通过NMClientAsync.CallbackHandler处理容器事件特别提醒:
YARN 应用程序的三个主要协议(ApplicationClientProtocol、ApplicationMasterProtocol 和 ContainerManagementProtocol)仍然保留。3 个客户端封装了这 3 个协议,为 YARN 应用程序提供更简单的编程模型。 在极少数情况下,程序员可能希望直接使用这 3 种协议来实现应用程序。但是,请注意,对于一般用例,不再鼓励此类行为。编写一个简单的YARN应用
初始化和启动YarnClient
YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start();一旦client启动后,即可在yarn上创建应用,并获取应用id
YarnClientApplication app = yarnClient.createApplication();GetNewApplicationResponse appResponse = app.getNewApplicationResponse();YarnClientApplication 对新应用程序的响应还包含有关集群的信息,例如集群的最小/最大资源能力。 这是必需的,以确保您可以正确设置将在其中启动 ApplicationMaster 的容器的规范。 详情请参考 GetNewApplicationResponse。
客户端的主要关键是设置 ApplicationSubmissionContext,它定义了 RM 启动 AM 所需的所有信息。 客户需要将以下内容设置到上下文中:
应用信息:id, name
Queue, priority info:应用程序将被提交到的队列,为应用程序分配的优先级。
用户:提交申请的用户
ContainerLaunchContext:定义将在其中启动和运行 AM 的容器的信息。 如前所述,ContainerLaunchContext 定义了运行应用程序所需的所有必需信息,例如本地 Resources(二进制文件、jar、文件等)、环境设置(CLASSPATH 等)、要执行的命令和安全 Tokens (RECT)。
// set the application submission contextApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();ApplicationId appId = appContext.getApplicationId();appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);appContext.setApplicationName(appName);// set local resources for the application master// local files or archives as needed// In this scenario, the jar file for the application master is part of the local resourcesMap localResources = new HashMap();LOG.info("Copy App Master jar from local filesystem and add to local environment");// Copy the application master jar to the filesystem// Create a local resource to point to the destination jar pathFileSystem fs = FileSystem.get(conf);addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),    localResources, null);// Set the log4j properties if neededif (!log4jPropFile.isEmpty()) {  addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),      localResources, null);}// The shell script has to be made available on the final container(s)// where it will be executed.// To do this, we need to first copy into the filesystem that is visible// to the yarn framework.// We do not need to set this as a local resource for the application// master as the application master does not need it.String hdfsShellScriptLocation = "";long hdfsShellScriptLen = 0;long hdfsShellScriptTimestamp = 0;if (!shellScriptPath.isEmpty()) {  Path shellSrc = new Path(shellScriptPath);  String shellPathSuffix =      appName + "/" + appId.toString() + "/" + SCRIPT_PATH;  Path shellDst =      new Path(fs.getHomeDirectory(), shellPathSuffix);  fs.copyFromLocalFile(false, true, shellSrc, shellDst);  hdfsShellScriptLocation = shellDst.toUri().toString();  FileStatus shellFileStatus = fs.getFileStatus(shellDst);  hdfsShellScriptLen = shellFileStatus.getLen();  hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();}if (!shellCommand.isEmpty()) {  addToLocalResources(fs, null, shellCommandPath, appId.toString(),      localResources, shellCommand);}if (shellArgs.length > 0) {  addToLocalResources(fs, null, shellArgsPath, appId.toString(),      localResources, StringUtils.join(shellArgs, " "));}// Set the env variables to be setup in the env where the application master will be runLOG.info("Set the environment for the application master");Map env = new HashMap();// put location of shell script into env// using the env info, the application master will create the correct local resource for the// eventual containers that will be launched to execute the shell scriptsenv.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));// Add AppMaster.jar location to classpath// At some point we should not be required to add// the hadoop specific classpaths to the env.// It should be provided out of the box.// For now setting all required classpaths including// the classpath to "." for the application jarStringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())  .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");for (String c : conf.getStrings(    YarnConfiguration.YARN_APPLICATION_CLASSPATH,    YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);  classPathEnv.append(c.trim());}classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(  "./log4j.properties");// Set the necessary command to execute the application masterVector vargs = new Vector(30);// Set java executable commandLOG.info("Setting up app master command");vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");// Set Xmx based on am memory sizevargs.add("-Xmx" + amMemory + "m");// Set class namevargs.add(appMasterMainClass);// Set params for Application Mastervargs.add("--container_memory " + String.valueOf(containerMemory));vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));vargs.add("--num_containers " + String.valueOf(numContainers));vargs.add("--priority " + String.valueOf(shellCmdPriority));for (Map.Entry entry : shellEnv.entrySet()) {  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());}if (debugFlag) {  vargs.add("--debug");}vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");// Get final commandStringBuilder command = new StringBuilder();for (CharSequence str : vargs) {  command.append(str).append(" ");}LOG.info("Completed setting up app master command " + command.toString());List commands = new ArrayList();commands.add(command.toString());// Set up the container launch context for the application masterContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(  localResources, env, commands, null, null, null);// Set up resource type requirements// For now, both memory and vcores are supported, so we set memory and// vcores requirementsResource capability = Resource.newInstance(amMemory, amVCores);appContext.setResource(capability);// Service data is a binary blob that can be passed to the application// Not needed in this scenario// amContainer.setServiceData(serviceData);// Setup security tokensif (UserGroupInformation.isSecurityEnabled()) {  // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce  Credentials credentials = new Credentials();  String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);  if (tokenRenewer == null | | tokenRenewer.length() == 0) {    throw new IOException(      "Can"t get Master Kerberos principal for the RM to use as renewer");  }  // For now, only getting tokens for the default file-system.  final Token> tokens[] =      fs.addDelegationTokens(tokenRenewer, credentials);  if (tokens != null) {    for (Token> token : tokens) {      LOG.info("Got dt for " + fs.getUri() + "; " + token);    }  }  DataOutputBuffer dob = new DataOutputBuffer();  credentials.writeTokenStorageToStream(dob);  ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());  amContainer.setTokens(fsTokens);}appContext.setAMContainerSpec(amContainer);         设置过程完成后,客户端就可以提交具有指定优先级和队列的应用程序。
// Set the priority for the application masterPriority pri = Priority.newInstance(amPriority);appContext.setPriority(pri);// Set the queue to which this application is to be submitted in the RMappContext.setQueue(amQueue);// Submit the application to the applications manager// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);yarnClient.submitApplication(appContext);此时,RM 将接受申请,并在后台完成分配具有所需规格的容器的过程,然后最终在分配的容器上设置和启动 AM。
客户端可以通过多种方式跟踪实际任务的进度。
它可以通过 YarnClient 的 getApplicationReport() 方法与 RM 通信并请求应用程序的报告。
// Get application report for the appId we are interested inApplicationReport report = yarnClient.getApplicationReport(appId);从 RM 收到的 ApplicationReport 包括以下内容:
一般申请信息:申请id、提交申请的队列、提交申请的用户、申请开始时间。ApplicationMaster 详细信息:运行 AM 的主机,它正在侦听来自客户端的请求的 rpc 端口(如果有)以及客户端与 AM 通信所需的令牌。应用程序跟踪信息:如果应用程序支持某种形式的进度跟踪,它可以设置一个跟踪 url,该 url 可通过 ApplicationReport 的 getTrackingUrl() 方法获得,客户端可以查看该 url 以监控进度。应用程序状态:ResourceManager 看到的应用程序状态可通过 ApplicationReport#getYarnApplicationState 获得。 如果 YarnApplicationState 设置为 FINISHED,客户端应参考 ApplicationReport#getFinalApplicationStatus 来检查应用程序任务本身的实际成功/失败。 如果出现故障,ApplicationReport#getDiagnostics 可能有助于进一步了解故障。如果ApplicationMaster支持,client可以直接通过应用报告中的host:rpcport信息向AM自身查询进度更新。 如果可用,它还可以使用从报告中获取的跟踪 url。
在某些情况下,如果应用程序花费的时间太长或由于其他因素,客户端可能希望终止该应用程序。 YarnClient 支持 killApplication 调用,允许客户端通过 ResourceManager 向 AM 发送终止信号。 如果这样设计,ApplicationMaster 也可以通过客户端可以利用的 rpc 层支持中止调用。
yarnClient.killApplication(appId);编写ApplicationMaster(AM)
AM 是作业的实际所有者。 它将由 RM 启动,并通过客户提供有关其负责监督和完成的工作的所有必要信息和资源。
由于 AM 是在一个容器内启动的,该容器可能(很可能会)与其他容器共享一个物理主机,考虑到多租户的性质,除其他问题外,它不能对它可以侦听的预配置端口等做出任何假设 .
当 AM 启动时,几个参数通过环境变量提供给它。 其中包括 AM 容器的 ContainerId、应用程序提交时间和有关运行 ApplicationMaster 的 NM(NodeManager)主机的详细信息。 Ref ApplicationConstants 参数名称。
与 RM 的所有交互都需要一个 ApplicationAttemptId(在失败的情况下每个应用程序可以有多次尝试)。 ApplicationAttemptId 可以从 AM 的容器 id 中获取。 有一些辅助 API 可以将从环境中获得的值转换为对象。
Map envs = System.getenv();String containerIdString =    envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);if (containerIdString == null) {  // container id should always be set in the env by the framework  throw new IllegalArgumentException(      "ContainerId not set in the environment");}ContainerId containerId = ConverterUtils.toContainerId(containerIdString);ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId(); 在 AM 完全初始化之后,我们可以启动两个客户端:一个到 ResourceManager,一个到 NodeManagers。 我们使用自定义事件处理程序设置它们,我们将在本文后面详细讨论这些事件处理程序。
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);  amRMClient.init(conf);  amRMClient.start();  containerListener = createNMCallbackHandler();  nmClientAsync = new NMClientAsyncImpl(containerListener);  nmClientAsync.init(conf);  nmClientAsync.start();AM 必须向 RM 发出心跳,以通知它 AM 处于活动状态并且仍在运行。 RM 的超时到期间隔由可通过 YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS 访问的配置设置定义,默认值由 YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS 定义。 ApplicationMaster 需要向 ResourceManager 注册自己才能开始心跳。
// Register self with ResourceManager// This will start heartbeating to the RMappMasterHostname = NetUtils.getHostname();RegisterApplicationMasterResponse response = amRMClient    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,        appMasterTrackingUrl);在注册的响应中,如果包含最大资源能力。 您可能想使用它来检查应用程序的请求。
// Dump out information about cluster capability as seen by the// resource managerint maxMem = response.getMaximumResourceCapability().getMemory();LOG.info("Max mem capability of resources in this cluster " + maxMem);int maxVCores = response.getMaximumResourceCapability().getVirtualCores();LOG.info("Max vcores capability of resources in this cluster " + maxVCores);// A resource ask cannot exceed the max.if (containerMemory > maxMem) {  LOG.info("Container memory specified above max threshold of cluster."      + " Using max value." + ", specified=" + containerMemory + ", max="      + maxMem);  containerMemory = maxMem;}if (containerVirtualCores > maxVCores) {  LOG.info("Container virtual cores specified above max threshold of  cluster."    + " Using max value." + ", specified=" + containerVirtualCores + ", max="    + maxVCores);  containerVirtualCores = maxVCores;}List previousAMRunningContainers =    response.getContainersFromPreviousAttempts();LOG.info("Received " + previousAMRunningContainers.size()        + " previous AM"s running containers on AM registration."); 根据任务要求,AM 可以请求一组容器来运行其任务。 我们现在可以计算我们需要多少个容器,并请求这些容器。
List previousAMRunningContainers =    response.getContainersFromPreviousAttempts();LOG.info("Received " + previousAMRunningContainers.size()    + " previous AM"s running containers on AM registration.");int numTotalContainersToRequest =    numTotalContainers - previousAMRunningContainers.size();// Setup ask for containers from RM// Send request for containers to RM// Until we get our fully allocated quota, we keep on polling RM for// containers// Keep looping until all the containers are launched and shell script// executed on them ( regardless of success/failure).for (int i = 0; i < numTotalContainersToRequest; ++i) {  ContainerRequest containerAsk = setupContainerAskForRM();  amRMClient.addContainerRequest(containerAsk);} 在 setupContainerAskForRM() 中,需要设置以下两件事:资源能力:目前,YARN 支持基于内存的资源需求,因此请求应定义需要多少内存。 该值以 MB 为单位定义,并且必须小于集群的最大容量和最小容量的精确倍数。 内存资源对应于对任务容器施加的物理内存限制。 它还将支持基于计算的资源 (vCore),如代码中所示。
优先级:当请求容器集时,AM 可以为每个集定义不同的优先级。 例如,Map-Reduce AM 可以为 Map 任务所需的容器分配更高的优先级,为 Reduce 任务的容器分配更低的优先级。
private ContainerRequest setupContainerAskForRM() {  // setup requirements for hosts  // using * as any host will do for the distributed shell app  // set the priority for the request  Priority pri = Priority.newInstance(requestPriority);  // Set up resource type requirements  // For now, memory and CPU are supported so we set memory and cpu requirements  Resource capability = Resource.newInstance(containerMemory,    containerVirtualCores);  ContainerRequest request = new ContainerRequest(capability, null, null,      pri);  LOG.info("Requested container ask: " + request.toString());  return request;}在应用程序管理器发送容器分配请求后,容器将由 AMRMClientAsync 客户端的事件处理程序异步启动。 处理程序应实现 AMRMClientAsync.CallbackHandler 接口。
当分配了容器时,处理程序会设置一个线程来运行代码以启动容器。 这里我们使用名称 LaunchContainerRunnable 来进行演示。 我们将在本文的以下部分讨论 LaunchContainerRunnable 类。
@Overridepublic void onContainersAllocated(List allocatedContainers) {  LOG.info("Got response from RM for container ask, allocatedCnt="      + allocatedContainers.size());  numAllocatedContainers.addAndGet(allocatedContainers.size());  for (Container allocatedContainer : allocatedContainers) {    LaunchContainerRunnable runnableLaunchContainer =        new LaunchContainerRunnable(allocatedContainer, containerListener);    Thread launchThread = new Thread(runnableLaunchContainer);    // launch and start the container on a separate thread to keep    // the main thread unblocked    // as all containers may not be allocated at one go.    launchThreads.add(launchThread);    launchThread.start();  }} 在心跳时,事件处理程序报告应用程序的进度。
@Overridepublic float getProgress() {  // set progress to deliver to RM on next heartbeat  float progress = (float) numCompletedContainers.get()      / numTotalContainers;  return progress;}容器启动线程实际上是在 NM 上启动容器。 将容器分配给 AM 后,它需要遵循与客户端为将要在分配的容器上运行的最终任务设置 ContainerLaunchContext 所遵循的类似过程。 一旦定义了 ContainerLaunchContext,AM 就可以通过 NMClientAsync 启动它。
// Set the necessary command to execute on the allocated containerVector vargs = new Vector(5);// Set executable commandvargs.add(shellCommand);// Set shell script pathif (!scriptPath.isEmpty()) {  vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath    : ExecShellStringPath);}// Set args for the shell command if anyvargs.add(shellArgs);// Add log redirect paramsvargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");// Get final commandStringBuilder command = new StringBuilder();for (CharSequence str : vargs) {  command.append(str).append(" ");}List commands = new ArrayList();commands.add(command.toString());// Set up ContainerLaunchContext, setting local resource, environment,// command and token for constructor.// Note for tokens: Set up tokens for the container too. Today, for normal// shell commands, the container in distribute-shell doesn"t need any// tokens. We are populating them mainly for NodeManagers to be able to// download anyfiles in the distributed file-system. The tokens are// otherwise also useful in cases, for e.g., when one is running a// "hadoop dfs" command inside the distributed shell.ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(  localResources, shellEnv, commands, null, allTokens.duplicate(), null);containerListener.addContainer(container.getId(), container);nmClientAsync.startContainerAsync(container, ctx);    NMClientAsync 对象及其事件处理程序一起处理容器事件。 包括容器启动、停止、状态更新、发生错误。
ApplicationMaster确定工作完成后,需要通过AM-RM客户端注销自己,然后停止客户端。
try {  amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);} catch (YarnException ex) {  LOG.error("Failed to unregister application", ex);} catch (IOException e) {  LOG.error("Failed to unregister application", e);}amRMClient.stop();疑难解答
我如何将应用程序的 jar 分发到 YARN 集群中需要它的所有节点?
您可以使用 LocalResource 将资源添加到您的应用程序请求中。 这将导致 YARN 将资源分发到 ApplicationMaster 节点。 如果资源是 tgz、zip 或 jar – 您可以让 YARN 解压缩它。 然后,您需要做的就是将解压缩的文件夹添加到您的类路径中。 例如,在创建您的申请请求时:
File packageFile = new File(packagePath);URL packageUrl = ConverterUtils.getYarnUrlFromPath(    FileContext.getFileContext().makeQualified(new Path(packagePath)));packageResource.setResource(packageUrl);packageResource.setSize(packageFile.length());packageResource.setTimestamp(packageFile.lastModified());packageResource.setType(LocalResourceType.ARCHIVE);packageResource.setVisibility(LocalResourceVisibility.APPLICATION);resource.setMemory(memory);containerCtx.setResource(resource);containerCtx.setCommands(ImmutableList.of(    "java -cp "./package/*" some.class.to.Run "    + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "    + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));containerCtx.setLocalResources(    Collections.singletonMap("package", packageResource));appCtx.setApplicationId(appId);appCtx.setUser(user.getShortUserName);appCtx.setAMContainerSpec(containerCtx);yarnClient.submitApplication(appCtx);如您所见,setLocalResources 命令获取名称到资源的映射。 该名称成为您应用程序的 cwd 中的符号链接,因此您可以使用 ./package/* 引用其中的工件。
注意:Java 的类路径 (cp) 参数非常敏感。 确保语法完全正确。
一旦您的包被分发到您的 AM,每当您的 AM 启动一个新容器时,您都需要遵循相同的过程(假设您希望将资源发送到您的容器)。 代码是一样的。 您只需要确保为您的 AM 提供包路径(HDFS 或本地),以便它可以将资源 URL 与容器 ctx 一起发送。
如何获取ApplicationMaster的ApplicationAttemptId?
ApplicationAttemptId 将通过环境变量传递给 AM,环境变量中的值可以通过 ConverterUtils 辅助函数转换为 ApplicationAttemptId 对象。
为什么我的container被NodeManager kill?
这可能是由于高内存使用量超过了您请求的容器内存大小。 造成这种情况的原因有很多。 首先,查看 NodeManager 在终止您的容器时转储的进程树。 您感兴趣的两件事是物理内存和虚拟内存。 如果您超出了物理内存限制,则您的应用程序使用了过多的物理内存。 如果您正在运行 Java 应用程序,则可以使用 -hprof 查看堆中占用空间的内容。 如果您已经超出了虚拟内存,您可能需要增加集群范围的配置变量 yarn.nodemanager.vmem-pmem-ratio 的值。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/12468/

