YARN


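Notice: this article is reposted from https://my.oschina.net/u/554140/blog/1785608 in order to share information, for study and discussion only. If it infringes on your rights, please contact me and I will remove it promptly.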

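1. What is YARN

YARN (Yet Another Resource Negotiator) is Hadoop's newer resource manager. It is a general-purpose resource management system that provides unified resource management and scheduling for the applications running on top of it.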

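2. YARN architecture

  1. ResourceManager (RM): a pure scheduler, dedicated to allocating and managing the resources available in the cluster.
  2. Container: the abstract representation of the resources allocated to a specific application, covering memory, CPU, and disk.
  3. NodeManager (NM): manages the local resources of a node. It launches application Containers, monitors their resource usage, and reports that usage to the RM.
  4. ApplicationMaster (AM): an instance of a framework-specific library. It negotiates resources with the RM and works with the NMs to run and monitor Containers and their resource consumption. The AM itself also runs as a Container.
  5. Client: an instance in the cluster that can submit applications to the RM, specifying the type of AM required to run the application.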
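
To make the division of roles above concrete, here is a minimal toy model in plain Java. It is only a sketch under stated assumptions: the class names (`ToyYarn`, `ToyNode`, `ToyResourceManager`, `ToyContainer`) are hypothetical, nothing here is the Hadoop API, and the first-fit policy is a stand-in chosen for brevity rather than how YARN's real schedulers decide placement.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the RM / NM / Container roles. Hypothetical names, not the Hadoop API. */
final class ToyYarn {

    /** A Container: a slice of one node's memory and vcores handed to an application. */
    static final class ToyContainer {
        final String host;
        final int memoryMb;
        final int vCores;

        ToyContainer(String host, int memoryMb, int vCores) {
            this.host = host;
            this.memoryMb = memoryMb;
            this.vCores = vCores;
        }
    }

    /** NodeManager stand-in: tracks the free resources of a single host. */
    static final class ToyNode {
        final String host;
        int freeMemoryMb;
        int freeVCores;

        ToyNode(String host, int memoryMb, int vCores) {
            this.host = host;
            this.freeMemoryMb = memoryMb;
            this.freeVCores = vCores;
        }

        boolean canFit(int memoryMb, int vCores) {
            return memoryMb <= freeMemoryMb && vCores <= freeVCores;
        }
    }

    /** ResourceManager stand-in: only schedules, it never runs application code. */
    static final class ToyResourceManager {
        private final List<ToyNode> nodes = new ArrayList<>();

        void register(ToyNode node) {
            nodes.add(node);
        }

        /** First-fit allocation (an assumption); returns null if no node has room. */
        ToyContainer allocate(int memoryMb, int vCores) {
            for (ToyNode node : nodes) {
                if (node.canFit(memoryMb, vCores)) {
                    node.freeMemoryMb -= memoryMb;
                    node.freeVCores -= vCores;
                    return new ToyContainer(node.host, memoryMb, vCores);
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        ToyResourceManager rm = new ToyResourceManager();
        rm.register(new ToyNode("node1", 4096, 4));
        rm.register(new ToyNode("node2", 8192, 8));

        // The AM is itself placed in a container, like any other task.
        ToyContainer am = rm.allocate(1024, 1);
        // This request no longer fits on node1, so it lands on node2.
        ToyContainer worker = rm.allocate(6144, 4);
        System.out.println(am.host + " " + worker.host); // prints "node1 node2"
    }
}
```

The point of the sketch is the separation of concerns: the scheduler only does bookkeeping over node capacity, while anything application-specific would live in the AM that runs inside the first container.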

Figure: YARN architecture

Figure: MR Client and Drill Client

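3. How to write a YARN application

  1. Client

    • Initialize and start a YarnClient
        // getConf() is assumed to come from the enclosing Configured/Tool class
        Configuration yarnConfig = new YarnConfiguration(getConf());
        YarnClient client = YarnClient.createYarnClient();
        client.init(yarnConfig);
        client.start();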
    • Create an application
        // Ask the RM for a new application; the response carries the new ApplicationId
        YarnClientApplication app = client.createApplication();
        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    • Set up the application submission context
        // 1. Basic information for the application submission context
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationId(appResponse.getApplicationId());
        appContext.setApplicationName(config.getProperty("app.name"));
        appContext.setApplicationType(config.getProperty("app.type"));
        appContext.setApplicationTags(
            new LinkedHashSet<>(Arrays.asList(config.getProperty("app.tags").split(","))));
        // queue: the default is "default"
        appContext.setQueue(config.getProperty("app.queue"));
        appContext.setPriority(
            Priority.newInstance(Integer.parseInt(config.getProperty("app.priority"))));
        // Resources (memory, vcores) requested for the AM container
        appContext.setResource(Resource.newInstance(
            Integer.parseInt(config.getProperty("am.memory")),
            Integer.parseInt(config.getProperty("am.vCores"))));

        // 2. AM container launch context
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        // 3. AM local resources: the Drill archive, localized and unpacked on the node
        Map<String, LocalResource> amLocalResources = new LinkedHashMap<>();
        LocalResource drillArchive = Records.newRecord(LocalResource.class);
        drillArchive.setResource(
            ConverterUtils.getYarnUrlFromPath(drillArchiveFileStatus.getPath()));
        drillArchive.setSize(drillArchiveFileStatus.getLen());
        drillArchive.setTimestamp(drillArchiveFileStatus.getModificationTime());
        drillArchive.setType(LocalResourceType.ARCHIVE);
        drillArchive.setVisibility(LocalResourceVisibility.PUBLIC);
        amLocalResources.put(config.getProperty("drill.archive.name"), drillArchive);
        amContainer.setLocalResources(amLocalResources);

        // 4. AM environment
        Map<String, String> amEnvironment = new LinkedHashMap<>();
        // Add the Hadoop classpath
        for (String classpath : yarnConfig.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            Apps.addToEnvironment(amEnvironment, Environment.CLASSPATH.name(),
                classpath.trim(), ApplicationConstants.CLASS_PATH_SEPARATOR);
        }
        // Add everything in the container's working directory
        Apps.addToEnvironment(amEnvironment, Environment.CLASSPATH.name(),
            Environment.PWD.$() + File.separator + "*", ApplicationConstants.CLASS_PATH_SEPARATOR);
        // Pass the client configuration to the AM as a Base64-encoded environment variable
        StringWriter sw = new StringWriter();
        config.store(sw, "");
        String configBase64Binary =
            DatatypeConverter.printBase64Binary(sw.toString().getBytes("UTF-8"));
        Apps.addToEnvironment(amEnvironment, "DRILL_ON_YARN_CONFIG", configBase64Binary,
            ApplicationConstants.CLASS_PATH_SEPARATOR);
        amContainer.setEnvironment(amEnvironment);

        // 5. AM command: run drill-am.sh, redirecting stdout and stderr to the container log directory
        List<String> commands = new ArrayList<>();
        commands.add(Environment.SHELL.$$());
        commands.add(config.getProperty("drill.archive.name") + "/bin/drill-am.sh");
        commands.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDOUT);
        commands.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDERR);
        StringBuilder amCommand = new StringBuilder();
        for (String str : commands) {
            amCommand.append(str).append(" ");
        }
        // Drop the trailing space
        amCommand.setLength(amCommand.length() - " ".length());
        amContainer.setCommands(Collections.singletonList(amCommand.toString()));

        // 6. Security tokens (only on a secured cluster)
        if (UserGroupInformation.isSecurityEnabled()) {
            Credentials credentials = new Credentials();
            String tokenRenewer = yarnConfig.get(YarnConfiguration.RM_PRINCIPAL);
            final Token<?>[] tokens = fileSystem.addDelegationTokens(tokenRenewer, credentials);
            DataOutputBuffer dob = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(dob);
            ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
            amContainer.setTokens(fsTokens);
        }

        appContext.setAMContainerSpec(amContainer);
    • Submit the application
        client.submitApplication(appContext);
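
The client code above ships its `Properties` to the AM as a Base64 string in the `DRILL_ON_YARN_CONFIG` environment variable, but the decoding side is not shown in the article. The following is a minimal sketch of such a round trip, assuming the AM simply reads the variable and reverses the encoding. The class name `ConfigEnvCodec` and its methods are hypothetical, and it uses `java.util.Base64` in place of `DatatypeConverter`, which is no longer bundled with the JDK from Java 11 onward.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

/** Hypothetical helper: carries a Properties object through an environment variable. */
final class ConfigEnvCodec {

    /** Client side: serialize the properties and Base64-encode the text. */
    static String encode(Properties config) throws IOException {
        StringWriter sw = new StringWriter();
        config.store(sw, null);
        return Base64.getEncoder()
            .encodeToString(sw.toString().getBytes(StandardCharsets.UTF_8));
    }

    /** AM side (assumed): decode the variable's value back into Properties. */
    static Properties decode(String base64) throws IOException {
        String text = new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
        Properties config = new Properties();
        config.load(new StringReader(text));
        return config;
    }

    public static void main(String[] args) throws IOException {
        Properties config = new Properties();
        config.setProperty("app.name", "drill-on-yarn");
        config.setProperty("am.memory", "1024");

        String encoded = encode(config);
        // In the AM this value would come from System.getenv("DRILL_ON_YARN_CONFIG").
        Properties restored = decode(encoded);
        System.out.println(restored.getProperty("app.name")); // prints "drill-on-yarn"
    }
}
```

Encoding the whole file as one Base64 token keeps newlines, `=` signs and other characters in the properties text from interfering with how the environment is passed to the container.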
  2. ApplicationMaster (AM)

    1. Initialize AMRMClientAsync
        YarnConfiguration yarnConfig = new YarnConfiguration();
        // 5000 is the heartbeat interval to the RM, in milliseconds
        AMRMClientAsync<ContainerRequest> amrmClientAsync =
            AMRMClientAsync.createAMRMClientAsync(5000, new AMRMCallbackHandler());
        amrmClientAsync.init(yarnConfig);
        amrmClientAsync.start();
    2. Initialize NMClientAsync
        // yarnConfig is the YarnConfiguration created when initializing AMRMClientAsync
        NMClientAsync nmClientAsync = NMClientAsync.createNMClientAsync(new NMCallbackHandler());
        nmClientAsync.init(yarnConfig);
        nmClientAsync.start();
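    3. Register the ApplicationMaster (AM)
        // getLocalHost() returns an InetAddress, so take its host name
        String thisHostName = InetAddress.getLocalHost().getHostName();
        // Arguments: AM host, AM RPC port (0: none), tracking URL (empty: none)
        amrmClientAsync.registerApplicationMaster(thisHostName, 0, "");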
    4. Add ContainerRequests
        // One request per node; relaxLocality=false pins each request to the named host
        for (NodeReport containerReport : containerReports) {
            ContainerRequest containerRequest = new ContainerRequest(capability,
                new String[] {containerReport.getNodeId().getHost()},
                null, priority, false);
            amrmClientAsync.addContainerRequest(containerRequest);
        }
    5. Launch containers
        // yarnConfig, appConfig and nmClientAsync are assumed to be static fields of the enclosing class
        private static class AMRMCallbackHandler implements AMRMClientAsync.CallbackHandler {
            @Override
            public void onContainersAllocated(List<Container> containers) {
                for (Container container : containers) {
                    ContainerLaunchContext containerContext =
                        Records.newRecord(ContainerLaunchContext.class);

                    // Environment: Hadoop classpath plus everything in the working directory
                    Map<String, String> containerEnvironment = new LinkedHashMap<>();
                    for (String classpath : yarnConfig.getStrings(
                            YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                            YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
                        Apps.addToEnvironment(containerEnvironment, Environment.CLASSPATH.name(),
                            classpath.trim(), ApplicationConstants.CLASS_PATH_SEPARATOR);
                    }
                    Apps.addToEnvironment(containerEnvironment, Environment.CLASSPATH.name(),
                        Environment.PWD.$() + File.separator + "*",
                        ApplicationConstants.CLASS_PATH_SEPARATOR);
                    containerContext.setEnvironment(containerEnvironment);

                    // Local resources: the Drill archive uploaded to the file system
                    Map<String, LocalResource> containerResources = new LinkedHashMap<>();
                    LocalResource drillArchive = Records.newRecord(LocalResource.class);
                    String drillArchivePath = appConfig.getProperty("fs.upload.dir")
                        + appConfig.getProperty("drill.archive.name");
                    Path path = new Path(drillArchivePath);
                    FileStatus fileStatus;
                    try {
                        fileStatus = FileSystem.get(yarnConfig).getFileStatus(path);
                    } catch (IOException e) {
                        // The callback cannot throw checked exceptions
                        throw new UncheckedIOException(e);
                    }
                    drillArchive.setResource(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()));
                    drillArchive.setSize(fileStatus.getLen());
                    drillArchive.setTimestamp(fileStatus.getModificationTime());
                    drillArchive.setType(LocalResourceType.ARCHIVE);
                    drillArchive.setVisibility(LocalResourceVisibility.PUBLIC);
                    containerResources.put(appConfig.getProperty("drill.archive.name"), drillArchive);
                    containerContext.setLocalResources(containerResources);

                    // Command: run drillbit.sh, redirecting stdout and stderr to the log directory
                    List<String> commands = new ArrayList<>();
                    commands.add(Environment.SHELL.$$());
                    commands.add(appConfig.getProperty("drill.archive.name") + "/bin/drillbit.sh run");
                    commands.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDOUT);
                    commands.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDERR);
                    StringBuilder containerCommand = new StringBuilder();
                    for (String str : commands) {
                        containerCommand.append(str).append(" ");
                    }
                    // Drop the trailing space
                    containerCommand.setLength(containerCommand.length() - " ".length());
                    containerContext.setCommands(
                        Collections.singletonList(containerCommand.toString()));

                    // Ask the NM that owns the container to start it
                    nmClientAsync.startContainerAsync(container, containerContext);
                }
            }

            // The remaining CallbackHandler methods (onContainersCompleted, onShutdownRequest,
            // onNodesUpdated, getProgress, onError) must also be implemented; omitted here.
        }
    6. Unregister the ApplicationMaster (AM)
        // Arguments: final application status, diagnostic message, tracking URL (null: none)
        amrmClientAsync.unregisterApplicationMaster(appStatus, appMessage, null);
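
Both the client (for the AM) and the AM (for each drillbit) assemble the launch command the same way: shell, script, then stdout and stderr redirections, joined by single spaces. As a small sketch of that one step, the helper below does the joining with `String.join` instead of a `StringBuilder` plus trimming. The class name `LaunchCommand` and the literal arguments in `main` are hypothetical placeholders; in the code above the shell and log directory come from `Environment.SHELL.$$()` and `ApplicationConstants.LOG_DIR_EXPANSION_VAR`, which are expanded on the node when the container is launched.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles a container launch command line. */
final class LaunchCommand {

    /**
     * Joins shell, script and output redirections with single spaces.
     * logDir is used verbatim, so it may be a real path or an expansion placeholder.
     */
    static String build(String shell, String script, String logDir) {
        List<String> parts = new ArrayList<>();
        parts.add(shell);
        parts.add(script);
        parts.add("1>" + logDir + "/stdout");
        parts.add("2>" + logDir + "/stderr");
        // String.join leaves no trailing separator, so no trimming is needed.
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        // Placeholder values chosen for illustration only.
        System.out.println(build("/bin/bash", "drill/bin/drillbit.sh run", "/tmp/logs"));
        // prints "/bin/bash drill/bin/drillbit.sh run 1>/tmp/logs/stdout 2>/tmp/logs/stderr"
    }
}
```

The result is a single string, which matches how the article passes the command to `setCommands` as a one-element list.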

Figure: example of a resource request sent by the client to the ResourceManager

Figure: interaction between the ApplicationMaster and the NodeManager

Published on March 27, 2018, 22:38