Hadoop源码解析之Configuration简介


声明:本文转载自https://my.oschina.net/zhangxufeng/blog/1538894,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

       对配置文件的配置及解析是每个框架的基本且必不可少的部分,本文主要对Hadoop中的配置文件的解析类Configuration的基本结构及主要方法进行介绍。       Hadoop的配置文件的操作主要分为三个部分:配置的加载、属性读取和设置,本文将分别对其进行介绍。如下是Configuration的主要的成员属性:

/**    * 保存了所有的资源配置的来源信息,资源文件主要有以下几种形式: URL、String、Path、InputStream和Properties。   */ private ArrayList<Resource> resources = new ArrayList<Resource>();  /**   * 记录了配置文件中配置的final类型的属性名称,标记为final之后如果另外有同名的属性,那么该属性将不会被替换   */ private Set<String> finalParameters = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());  /**   * 记录了配置初始化后更新过的属性,其键为更新的属性名,值为更新操作的来源   */ private Map<String, String[]> updatingResource;  /**   * 记录了所有的属性,包括系统初始化以及后续设置的属性   */ private Properties properties;  /**   * 记录了除了初始化之后手动调用set方法设置的属性   */ private Properties overlay;  /**   * 记录了过期的属性   */ private static AtomicReference<DeprecationContext> deprecationContext = new AtomicReference<DeprecationContext>(new DeprecationContext(null, defaultDeprecations)); 

1.配置的加载

       Configuration中的配置文件的信息主要保存在resources属性中,该属性是一个ArrayList<Resource>类型,如下是Resource的结构:

private static class Resource {     private final Object resource;     private final String name;          // 构造方法,get和set方法略   } 

这里的name表示资源文件的名称,resource则表示具体的资源,其用一个Object类型,但具体的类型如下:

  • URL: 通过一个URL链接来进行读取;
  • String: 从当前classpath下该字符串所指定的文件读取;
  • Path: 以绝对路径指定的配置文件或者是使用url指定的配置;
  • InputStream: 以流的形式指定的配置文件;
  • Properties: 以属性配置类保存的配置信息。 除了可以通过上述方式进行配置的设置以外,Hadoop还设置了几个默认的配置文件:core-default.xml、core-site.xml和hadoop-site.xml,具体的读取代码如下:
  static {     // Add default resources     addDefaultResource("core-default.xml");     addDefaultResource("core-site.xml");      // print deprecation warning if hadoop-site.xml is found in classpath     ClassLoader cL = Thread.currentThread().getContextClassLoader();     if (cL == null) {       cL = Configuration.class.getClassLoader();     }     if (cL.getResource("hadoop-site.xml") != null) {       LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +           "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "           + "mapred-site.xml and hdfs-site.xml to override properties of " +           "core-default.xml, mapred-default.xml and hdfs-default.xml " +           "respectively");       addDefaultResource("hadoop-site.xml");     }   } 

这里需要注意的是,Hadoop对配置文件的读取并不是在Configuration类初始化时进行的,而是在获取配置信息时再进行读取,再此之前,配置相关的信息都保存在resources属性中。如下是addDefaultResource(String)方法的代码:

  public static synchronized void addDefaultResource(String name) {     if(!defaultResources.contains(name)) {       defaultResources.add(name);       for(Configuration conf : REGISTRY.keySet()) {         if(conf.loadDefaults) {           conf.reloadConfiguration();         }       }     }   } 

       从上述代码可以看出,在添加配置文件时,其先检查默认配置文件中是否有该配置文件,如果不存在,则将其添加到defaultResources列表中,并且其会对Configuration配置项进行检查,如果其配置了加载默认配置文件,那么其就会对该配置中保存配置信息的properties和finalParameters进行清空,以此触发属性的重新加载。        在初始化时将相关配置信息添加到resources和defaultResources列表之后,配置文件中属性的读取是在客户端具体调用get*方法时进行的,以下是初始化配置信息的代码:

  protected synchronized Properties getProps() {     if (properties == null) {       properties = new Properties();       Map<String, String[]> backup =           new ConcurrentHashMap<String, String[]>(updatingResource);       loadResources(properties, resources, quietmode);        if (overlay != null) {         properties.putAll(overlay);         for (Map.Entry<Object,Object> item: overlay.entrySet()) {           String key = (String)item.getKey();           String[] source = backup.get(key);           if(source != null) {             updatingResource.put(key, source);           }         }       }     }     return properties;   } 

上述代码中,首先判断properties是否为空,为空则进行初始化,否则直接返回,这也就是前面为什么将properties和finalParameters进行清空之后能够触发属性的重新加载的原因。在初始化配置信息的时候首先对更新的资源过的资源进行备份,然后调用loadResources()方法加载配置文件的信息,加载完之后还会将用户调用set*方法设置的属性(保存在overlay中)添加到properties中,并且将用户设置的信息添加到updatingResource中。如下是loadResources()方法的具体代码:

  private void loadResources(Properties properties, ArrayList<Resource> resources, boolean quiet) {     if(loadDefaults) {       for (String resource : defaultResources) {         loadResource(properties, new Resource(resource), quiet);       }     }          for (int i = 0; i < resources.size(); i++) {       Resource ret = loadResource(properties, resources.get(i), quiet);       if (ret != null) {         resources.set(i, ret);       }     }   } 

loadResources是对所有的配置文件信息进行加载的方法,其主要包括两个部分:加载默认配置文件和加载资源文件。在加载配置文件时除了Properties和Resource属性之外,还有一个boolean类型的属性quiet,该属性表示是否以“安静”模式加载配置文件,即加载时是否打印加载日志。如下是通过loadResource()加载配置文件的具体代码:

  private Resource loadResource(Properties properties,                                 Resource wrapper, boolean quiet) {     String name = UNKNOWN_RESOURCE;     try {       Object resource = wrapper.getResource();       name = wrapper.getName();       XMLStreamReader2 reader = null;       boolean returnCachedProperties = false;        if (resource instanceof URL) {                  // an URL resource         reader = (XMLStreamReader2)parse((URL)resource);       } else if (resource instanceof String) {        // a CLASSPATH resource         URL url = getResource((String)resource);         reader = (XMLStreamReader2)parse(url);       } else if (resource instanceof Path) {          // a file resource         // Can't use FileSystem API or we get an infinite loop         // since FileSystem uses Configuration API.  Use java.io.File instead.         File file = new File(((Path)resource).toUri().getPath())           .getAbsoluteFile();         if (file.exists()) {           if (!quiet) {             LOG.debug("parsing File " + file);           }           reader = (XMLStreamReader2)parse(new BufferedInputStream(               new FileInputStream(file)), ((Path)resource).toString());         }       } else if (resource instanceof InputStream) {         reader = (XMLStreamReader2)parse((InputStream)resource, null);         returnCachedProperties = true;       } else if (resource instanceof Properties) {         overlay(properties, (Properties)resource);       }        if (reader == null) {         if (quiet) {           return null;         }         throw new RuntimeException(resource + " not found");       }       Properties toAddTo = properties;       if(returnCachedProperties) {         toAddTo = new Properties();       }       DeprecationContext deprecations = deprecationContext.get();        StringBuilder token = new StringBuilder();       String confName = null;       String confValue = null;       String confInclude = null;       boolean confFinal = false;       boolean fallbackAllowed = false;       boolean fallbackEntered = false;       boolean parseToken = false;       LinkedList<String> confSource = new LinkedList<String>();        while (reader.hasNext()) {         switch (reader.next()) {         case XMLStreamConstants.START_ELEMENT:           switch (reader.getLocalName()) {           case "property":             confName = null;             confValue = null;             confFinal = false;             confSource.clear();              // First test for short format configuration             int attrCount = reader.getAttributeCount();             for (int i = 0; i < attrCount; i++) {               String propertyAttr = reader.getAttributeLocalName(i);               if ("name".equals(propertyAttr)) {                 confName = StringInterner.weakIntern(                     reader.getAttributeValue(i));               } else if ("value".equals(propertyAttr)) {                 confValue = StringInterner.weakIntern(                     reader.getAttributeValue(i));               } else if ("final".equals(propertyAttr)) {                 confFinal = "true".equals(reader.getAttributeValue(i));               } else if ("source".equals(propertyAttr)) {                 confSource.add(StringInterner.weakIntern(                     reader.getAttributeValue(i)));               }             }             break;           case "name":           case "value":           case "final":           case "source":             parseToken = true;             token.setLength(0);             break;           case "include":             // Determine href for xi:include             confInclude = null;             attrCount = reader.getAttributeCount();             for (int i = 0; i < attrCount; i++) {               String attrName = reader.getAttributeLocalName(i);               if ("href".equals(attrName)) {                 confInclude = reader.getAttributeValue(i);               }             }             if (confInclude == null) {               break;             }             // Determine if the included resource is a classpath resource             // otherwise fallback to a file resource             // xi:include are treated as inline and retain current source             URL include = getResource(confInclude);             if (include != null) {               Resource classpathResource = new Resource(include, name);               loadResource(properties, classpathResource, quiet);             } else {               URL url;               try {                 url = new URL(confInclude);                 url.openConnection().connect();               } catch (IOException ioe) {                 File href = new File(confInclude);                 if (!href.isAbsolute()) {                   // Included resources are relative to the current resource                   File baseFile = new File(name).getParentFile();                   href = new File(baseFile, href.getPath());                 }                 if (!href.exists()) {                   // Resource errors are non-fatal iff there is 1 xi:fallback                   fallbackAllowed = true;                   break;                 }                 url = href.toURI().toURL();               }               Resource uriResource = new Resource(url, name);               loadResource(properties, uriResource, quiet);             }             break;           case "fallback":             fallbackEntered = true;             break;           case "configuration":             break;           default:             break;           }           break;          case XMLStreamConstants.CHARACTERS:           if (parseToken) {             char[] text = reader.getTextCharacters();             token.append(text, reader.getTextStart(), reader.getTextLength());           }           break;          case XMLStreamConstants.END_ELEMENT:           switch (reader.getLocalName()) {           case "name":             if (token.length() > 0) {               confName = StringInterner.weakIntern(token.toString().trim());             }             break;           case "value":             if (token.length() > 0) {               confValue = StringInterner.weakIntern(token.toString());             }             break;           case "final":             confFinal = "true".equals(token.toString());             break;           case "source":             confSource.add(StringInterner.weakIntern(token.toString()));             break;           case "include":             if (fallbackAllowed && !fallbackEntered) {               throw new IOException("Fetch fail on include for '"                   + confInclude + "' with no fallback while loading '"                   + name + "'");             }             fallbackAllowed = false;             fallbackEntered = false;             break;           case "property":             if (confName == null || (!fallbackAllowed && fallbackEntered)) {               break;             }             confSource.add(name);             DeprecatedKeyInfo keyInfo =                 deprecations.getDeprecatedKeyMap().get(confName);             if (keyInfo != null) {               keyInfo.clearAccessed();               for (String key : keyInfo.newKeys) {                 // update new keys with deprecated key's value                 loadProperty(toAddTo, name, key, confValue, confFinal,                     confSource.toArray(new String[confSource.size()]));               }             } else {               loadProperty(toAddTo, name, confName, confValue, confFinal,                   confSource.toArray(new String[confSource.size()]));             }             break;           default:             break;           }         default:           break;         }       }       reader.close();        if (returnCachedProperties) {         overlay(properties, toAddTo);         return new Resource(toAddTo, name);       }       return null;     } catch (IOException e) {       LOG.fatal("error parsing conf " + name, e);       throw new RuntimeException(e);     } catch (XMLStreamException e) {       LOG.fatal("error parsing conf " + name, e);       throw new RuntimeException(e);     }   } 

从代码中可以看出,在加载每个配置文件的时候,首先通过instanceof判断wrapper对象中Object类型的属性resource是什么类型,然后根据具体不同的类型将其转换为一个XMLStreamReader2类型的reader。对于转换之后的reader,其会依次读取xml配置文件中的具体标签信息,如:name、value、final、source等等,并且将读取后的信息保存在properties中,这里需要注意的是,在代码中的case "include"处可以看出,配置文件中如果使用了<include></include>标签引入其他的配置文件,那么其会递归的调用loadResource()方法对其进行读取。        在加载完配置

2.属性的获取

       Configuration类主要是通过get方法获取相关属性的,其get方法的种类有三十多个,如:get(String)、get(String, String)、getBoolean(String, Boolean)、getClass(String, Class<?>)等等,但是最终调用的还是get(String, String)方法,该方法的作用是获取一个某个名称对应的属性值,具体代码如下:

  public String get(String name, String defaultValue) {     // 处理过期的属性     String[] names = handleDeprecation(deprecationContext.get(), name);     String result = null;     // 对属性值中形如${foo.bar}引入的其他属性进行替换     for(String n : names) {       result = substituteVars(getProps().getProperty(n, defaultValue));     }     return result;   } 

通过上述代码可以看出,get(String, String)方法主要做了两件事:处理过期键和对属性值进行标签替换处理。根据前面的介绍我们知道deprecationContext是一个AtomicReference<DeprecationContext>类型的变量,其保存有过期键的相关信息,并且这里使用AtomicReference包装,也就是说其获取和更新都是原子化的。这里我们首先讲解一下DeprecationContext具体实现方式,如下是该类的代码:

  private static class DeprecationContext {     /**      * 保存有过期键信息,其值为主要是该过期键更新后的键的信息      */     private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap;      /**      * 对于DeprecatedKeyInfo,其有一个String[] newKeys属性,即更新后的键信息,这里reverseDeprecatedKeyMap的      * 键为newKeys中的各个字符串,而值则对应于newKeys所在的DeprecatedKeyInfo对象在deprecatedKeyMap所属的键      */     private final Map<String, String> reverseDeprecatedKeyMap;      /**      * 根据另一个DeprecationContext对象实例化一个DeprecationContext对象,这里DeprecationDelta指的是除了DeprecationContext对象以外新增加的属性      */     @SuppressWarnings("unchecked")     DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) {       HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap = new HashMap<String, DeprecatedKeyInfo>();       HashMap<String, String> newReverseDeprecatedKeyMap = new HashMap<String, String>();       if (other != null) {         for (Entry<String, DeprecatedKeyInfo> entry : other.deprecatedKeyMap.entrySet()) {           newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());         }         for (Entry<String, String> entry : other.reverseDeprecatedKeyMap.entrySet()) {           newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());         }       }       for (DeprecationDelta delta : deltas) {         if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {           DeprecatedKeyInfo newKeyInfo = new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage());           newDeprecatedKeyMap.put(delta.key, newKeyInfo);           // 从这里可以看出,reverseDeprecatedKeyMap中的键为更新之后的过期键信息,而值为最初的过期键           for (String newKey : delta.getNewKeys()) {             newReverseDeprecatedKeyMap.put(newKey, delta.key);           }         }       }       this.deprecatedKeyMap = UnmodifiableMap.decorate(newDeprecatedKeyMap);       this.reverseDeprecatedKeyMap = UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);     }      Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() {       return deprecatedKeyMap;     }      Map<String, String> getReverseDeprecatedKeyMap() {       return reverseDeprecatedKeyMap;     }   } 

        DeprecationContext中主要有两个属性:deprecatedKeyMap和reverseDeprecatedKeyMap。对于deprecatedKeyMap,其键是过期的键,而值则主要保存该键被替换之后的新键的信息;对于reverseDeprecatedKeyMap,其键为某个过期键更新之后的键,而值则了被更新的过期键。         在介绍了DeprecationContext的具体结构之后,我们继续来看get(String, String)方法中handleDeprecation()方法的具体处理方式,以下是该方法的具体代码:

  private String[] handleDeprecation(DeprecationContext deprecations, String name) {     if (null != name) {       name = name.trim();     }     // 默认使用目标name作为返回值     String[] names = new String[]{name};     // 查询当前name所对应的过期键信息,并且获取其更新后的信息     DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);     if (keyInfo != null) {       if (!keyInfo.getAndSetAccessed()) {         logDeprecation(keyInfo.getWarningMessage(name));       }       // 将当前过期键更新后的名称返回       names = keyInfo.newKeys;     }     // 如果没用通过set*方法设置的属性,那么直接返回     Properties overlayProperties = getOverlay();     if (overlayProperties.isEmpty()) {       return names;     }      // 查找当前name在DeprecatedKeyInfo中对应的最新的keys,如果某个key的属性在overlay(用户设置的key-value)中存在,那么就将该key对应的     // 值更新该key的值为所更新的过期键的值     for (String n : names) {       String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n);  // 查找当前name是否在更新后的key中存在       if (deprecatedKey != null && overlayProperties.containsKey(n)) {  // 当前name在DeprecatedKeyInfo更新后的名字中存在,且在overlayProperties中也存在当前name的键         String deprecatedValue = overlayProperties.getProperty(deprecatedKey);         if (deprecatedValue != null) {           getProps().setProperty(n, deprecatedValue); // 将DeprecatedKeyInfo中新的键与旧的键所对应的值关联起来,保存在properties和overlay中           overlayProperties.setProperty(n, deprecatedValue);         }       }     }     return names;   } 

       总结来说,在handleDeprecation()方法中,其会查询当前的name是否是过期键,如果不是,则将当前name组装为一个数组返回,如果是,则通过该过期键获取其所对应的更新之后的键,并将其作为返回值。除此之外,其还会在调用者设置的属性(overlayProperties)中查询其是否保存有当前过期键所更新的键名的信息,如果有,则将其更新为该过期键所对应的值。        接下来我们回头看看get(String, String)方法,在通过handleDeprecation()方法获取到该name的相关信息之后,这里主要调用了substituteVars()方法,该方法的主要作用为对获取到的属性值中的占位符如${foo.bar}进行替换,这里getProps()方法即为前面所讲解的通过配置文件初始化相关属性的方法。以下是substituteVars()的具体实现:

  private String substituteVars(String expr) {     if (expr == null) {       return null;     }     String eval = expr;     for(int s = 0; s < MAX_SUBST; s++) {       // 返回值为长度为2的数组,该方法获取属性值中最内层占位符的起始和终止索引,如${prefix${foo.bar}suffix}将返回foo.bar的起始和终止索引       final int[] varBounds = findSubVariable(eval);       if (varBounds[SUB_START_IDX] == -1) {         return eval;       }       final String var = eval.substring(varBounds[SUB_START_IDX], varBounds[SUB_END_IDX]);       String val = null;       try {         // 判断是否为系统属性,系统属性以env.开头         if (var.startsWith("env.") && 4 < var.length()) {           String v = var.substring(4);           int i = 0;           for (; i < v.length(); i++) {             char c = v.charAt(i);             if (c == ':' && i < v.length() - 1 && v.charAt(i + 1) == '-') {               val = getenv(v.substring(0, i));               if (val == null || val.length() == 0) {                 val = v.substring(i + 2);               }               break;             } else if (c == '-') {               val = getenv(v.substring(0, i));               if (val == null) {                 val = v.substring(i + 1);               }               break;             }           }           if (i == v.length()) {             // i == v.length()说明为系统属性,那么返回值为系统属性             val = getenv(v);           }         } else {           // 如果不为系统属性,则从当前上下文环境中获取该属性           val = getProperty(var);         }       } catch(SecurityException se) {         LOG.warn("Unexpected SecurityException in Configuration", se);       }       if (val == null) {         // 如果既不是系统属性也不是环境属性,则在配置文件或者是用户设置的属性中查找         val = getRaw(var);       }       if (val == null) {         // val为null,说明该属性没有定义,则直接返回原始值         return eval;       }        final int dollar = varBounds[SUB_START_IDX] - "${".length();       final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();  // 因为这里SUB_END_INDEX实际上指向的就是"{"       final String refVar = eval.substring(dollar, afterRightBrace);        // 这里refVar即为要替换的占位符,如${foo.bar},走到这一步说明foo.bar有对应的属性值,即val,这里判断val中是否继续包含       // 有${foo.bar},如果有,则说明发生了嵌套占位,这种情况直接返回原始字符串,否则会发生无限循环       if (val.contains(refVar)) {         return expr;       }        eval = eval.substring(0, dollar) + val + eval.substring(afterRightBrace);     }     throw new IllegalStateException("Variable substitution depth too large: " + MAX_SUBST + " " + expr);   } 

       总结而言,substituteVars()方法主要对属性值中的占位符进行替换,对于属性值的获取,其可以通过三个途径进行:系统变量、当前环境变量和配置文件及用户设置的属性值,并且其优先级是:系统变量 > 当前环境变量 > 配置文件及用户设置的属性值。除此之外,该方法还会处理嵌套占位符,如${prefix${foo.bar}suffix},其是由内而外进行解析,直到不存在占位符,或者是进行解析的层数超过了20层(最外层的for循环控制,这里MAX_SUBST为20)。

3.属性设置

       相对而言,属性的设置要简单一些,和get方法类似,虽然set方法也有很多,但其最终还是调用的set(String, String, String)方法,其第一个和第二个参数为要设置的键值对,第三个参数则为当前属性值的来源。如下是该方法的具体实现:

  public void set(String name, String value, String source) {     Preconditions.checkArgument(name != null, "Property name must not be null");     Preconditions.checkArgument(value != null, "The value of property %s must not be null", name);     name = name.trim();     DeprecationContext deprecations = deprecationContext.get();     if (deprecations.getDeprecatedKeyMap().isEmpty()) {       // 初始化配置文件信息       getProps();     }     getOverlay().setProperty(name, value);     getProps().setProperty(name, value);     String newSource = (source == null ? "programmatically" : source);      if (!isDeprecated(name)) {       // 这里该name不是过期键分为两种情况,一种是在存储过期键的map(即deprecatedKeyMap)中没有相应数据,       // 而在更新的map(即reverseDeprecatedKeyMap)中有数据,第二种是在这两个map中都没有数据       updatingResource.put(name, new String[] {newSource});       // 判断该键是否为更新某一个过期键之后的键,如果是,则获取所有更新了该过期键的键       String[] altNames = getAlternativeNames(name);       if(altNames != null) {         // 更新所有该键所对应的过期键更新之后的键值         for(String n: altNames) {           if(!n.equals(name)) {             getOverlay().setProperty(n, value);             getProps().setProperty(n, value);             updatingResource.put(n, new String[] {newSource});           }         }       }     } else {       // 如果该键为过期键,则将该过期键更新之后的键的值都设置为新的值       String[] names = handleDeprecation(deprecationContext.get(), name);       String altSource = "because " + name + " is deprecated";       for(String n : names) {         getOverlay().setProperty(n, value);         getProps().setProperty(n, value);         updatingResource.put(n, new String[] {altSource});       }     }   } 

       对于set(String, String, String)方法,其不仅更新了当前键值对的属性,而且还判断了该键是否为过期键或者是更新过期键之后的键,如果是则将更新之后的键所对应的值都设置为新值。

本文发表于2017年09月18日 08:36
(c)注:本文转载自https://my.oschina.net/zhangxufeng/blog/1538894,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1941 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1