对配置文件的配置及解析是每个框架的基本且必不可少的部分,本文主要对Hadoop中的配置文件的解析类Configuration的基本结构及主要方法进行介绍。 Hadoop的配置文件的操作主要分为三个部分:配置的加载、属性读取和设置,本文将分别对其进行介绍。如下是Configuration的主要的成员属性:
/** * 保存了所有的资源配置的来源信息,资源文件主要有以下几种形式: URL、String、Path、InputStream和Properties。 */ private ArrayList<Resource> resources = new ArrayList<Resource>(); /** * 记录了配置文件中配置的final类型的属性名称,标记为final之后如果另外有同名的属性,那么该属性将不会被替换 */ private Set<String> finalParameters = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); /** * 记录了配置初始化后更新过的属性,其键为更新的属性名,值为更新操作的来源 */ private Map<String, String[]> updatingResource; /** * 记录了所有的属性,包括系统初始化以及后续设置的属性 */ private Properties properties; /** * 记录了除了初始化之后手动调用set方法设置的属性 */ private Properties overlay; /** * 记录了过期的属性 */ private static AtomicReference<DeprecationContext> deprecationContext = new AtomicReference<DeprecationContext>(new DeprecationContext(null, defaultDeprecations));
1.配置的加载
Configuration中的配置文件的信息主要保存在resources属性中,该属性是一个ArrayList<Resource>类型,如下是Resource的结构:
private static class Resource { private final Object resource; private final String name; // 构造方法,get和set方法略 }
这里的name表示资源文件的名称,resource则表示具体的资源,其用一个Object类型,但具体的类型如下:
- URL: 通过一个URL链接来进行读取;
- String: 从当前classpath下该字符串所指定的文件读取;
- Path: 以绝对路径指定的配置文件或者是使用url指定的配置;
- InputStream: 以流的形式指定的配置文件;
- Properties: 以属性配置类保存的配置信息。 除了可以通过上述方式进行配置的设置以外,Hadoop还设置了几个默认的配置文件:core-default.xml、core-site.xml和hadoop-site.xml,具体的读取代码如下:
static { // Add default resources addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml"); // print deprecation warning if hadoop-site.xml is found in classpath ClassLoader cL = Thread.currentThread().getContextClassLoader(); if (cL == null) { cL = Configuration.class.getClassLoader(); } if (cL.getResource("hadoop-site.xml") != null) { LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " + "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " + "mapred-site.xml and hdfs-site.xml to override properties of " + "core-default.xml, mapred-default.xml and hdfs-default.xml " + "respectively"); addDefaultResource("hadoop-site.xml"); } }
这里需要注意的是,Hadoop对配置文件的读取并不是在Configuration类初始化时进行的,而是在获取配置信息时再进行读取,再此之前,配置相关的信息都保存在resources属性中。如下是addDefaultResource(String)方法的代码:
public static synchronized void addDefaultResource(String name) { if(!defaultResources.contains(name)) { defaultResources.add(name); for(Configuration conf : REGISTRY.keySet()) { if(conf.loadDefaults) { conf.reloadConfiguration(); } } } }
从上述代码可以看出,在添加配置文件时,其先检查默认配置文件中是否有该配置文件,如果不存在,则将其添加到defaultResources列表中,并且其会对Configuration配置项进行检查,如果其配置了加载默认配置文件,那么其就会对该配置中保存配置信息的properties和finalParameters进行清空,以此触发属性的重新加载。 在初始化时将相关配置信息添加到resources和defaultResources列表之后,配置文件中属性的读取是在客户端具体调用get*方法时进行的,以下是初始化配置信息的代码:
protected synchronized Properties getProps() { if (properties == null) { properties = new Properties(); Map<String, String[]> backup = new ConcurrentHashMap<String, String[]>(updatingResource); loadResources(properties, resources, quietmode); if (overlay != null) { properties.putAll(overlay); for (Map.Entry<Object,Object> item: overlay.entrySet()) { String key = (String)item.getKey(); String[] source = backup.get(key); if(source != null) { updatingResource.put(key, source); } } } } return properties; }
上述代码中,首先判断properties是否为空,为空则进行初始化,否则直接返回,这也就是前面为什么将properties和finalParameters进行清空之后能够触发属性的重新加载的原因。在初始化配置信息的时候首先对更新的资源过的资源进行备份,然后调用loadResources()方法加载配置文件的信息,加载完之后还会将用户调用set*方法设置的属性(保存在overlay中)添加到properties中,并且将用户设置的信息添加到updatingResource中。如下是loadResources()方法的具体代码:
private void loadResources(Properties properties, ArrayList<Resource> resources, boolean quiet) { if(loadDefaults) { for (String resource : defaultResources) { loadResource(properties, new Resource(resource), quiet); } } for (int i = 0; i < resources.size(); i++) { Resource ret = loadResource(properties, resources.get(i), quiet); if (ret != null) { resources.set(i, ret); } } }
loadResources是对所有的配置文件信息进行加载的方法,其主要包括两个部分:加载默认配置文件和加载资源文件。在加载配置文件时除了Properties和Resource属性之外,还有一个boolean类型的属性quiet,该属性表示是否以“安静”模式加载配置文件,即加载时是否打印加载日志。如下是通过loadResource()加载配置文件的具体代码:
private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) { String name = UNKNOWN_RESOURCE; try { Object resource = wrapper.getResource(); name = wrapper.getName(); XMLStreamReader2 reader = null; boolean returnCachedProperties = false; if (resource instanceof URL) { // an URL resource reader = (XMLStreamReader2)parse((URL)resource); } else if (resource instanceof String) { // a CLASSPATH resource URL url = getResource((String)resource); reader = (XMLStreamReader2)parse(url); } else if (resource instanceof Path) { // a file resource // Can't use FileSystem API or we get an infinite loop // since FileSystem uses Configuration API. Use java.io.File instead. File file = new File(((Path)resource).toUri().getPath()) .getAbsoluteFile(); if (file.exists()) { if (!quiet) { LOG.debug("parsing File " + file); } reader = (XMLStreamReader2)parse(new BufferedInputStream( new FileInputStream(file)), ((Path)resource).toString()); } } else if (resource instanceof InputStream) { reader = (XMLStreamReader2)parse((InputStream)resource, null); returnCachedProperties = true; } else if (resource instanceof Properties) { overlay(properties, (Properties)resource); } if (reader == null) { if (quiet) { return null; } throw new RuntimeException(resource + " not found"); } Properties toAddTo = properties; if(returnCachedProperties) { toAddTo = new Properties(); } DeprecationContext deprecations = deprecationContext.get(); StringBuilder token = new StringBuilder(); String confName = null; String confValue = null; String confInclude = null; boolean confFinal = false; boolean fallbackAllowed = false; boolean fallbackEntered = false; boolean parseToken = false; LinkedList<String> confSource = new LinkedList<String>(); while (reader.hasNext()) { switch (reader.next()) { case XMLStreamConstants.START_ELEMENT: switch (reader.getLocalName()) { case "property": confName = null; confValue = null; confFinal = false; confSource.clear(); // First test for short format configuration int attrCount = reader.getAttributeCount(); for (int i = 0; i < attrCount; i++) { String propertyAttr = reader.getAttributeLocalName(i); if ("name".equals(propertyAttr)) { confName = StringInterner.weakIntern( reader.getAttributeValue(i)); } else if ("value".equals(propertyAttr)) { confValue = StringInterner.weakIntern( reader.getAttributeValue(i)); } else if ("final".equals(propertyAttr)) { confFinal = "true".equals(reader.getAttributeValue(i)); } else if ("source".equals(propertyAttr)) { confSource.add(StringInterner.weakIntern( reader.getAttributeValue(i))); } } break; case "name": case "value": case "final": case "source": parseToken = true; token.setLength(0); break; case "include": // Determine href for xi:include confInclude = null; attrCount = reader.getAttributeCount(); for (int i = 0; i < attrCount; i++) { String attrName = reader.getAttributeLocalName(i); if ("href".equals(attrName)) { confInclude = reader.getAttributeValue(i); } } if (confInclude == null) { break; } // Determine if the included resource is a classpath resource // otherwise fallback to a file resource // xi:include are treated as inline and retain current source URL include = getResource(confInclude); if (include != null) { Resource classpathResource = new Resource(include, name); loadResource(properties, classpathResource, quiet); } else { URL url; try { url = new URL(confInclude); url.openConnection().connect(); } catch (IOException ioe) { File href = new File(confInclude); if (!href.isAbsolute()) { // Included resources are relative to the current resource File baseFile = new File(name).getParentFile(); href = new File(baseFile, href.getPath()); } if (!href.exists()) { // Resource errors are non-fatal iff there is 1 xi:fallback fallbackAllowed = true; break; } url = href.toURI().toURL(); } Resource uriResource = new Resource(url, name); loadResource(properties, uriResource, quiet); } break; case "fallback": fallbackEntered = true; break; case "configuration": break; default: break; } break; case XMLStreamConstants.CHARACTERS: if (parseToken) { char[] text = reader.getTextCharacters(); token.append(text, reader.getTextStart(), reader.getTextLength()); } break; case XMLStreamConstants.END_ELEMENT: switch (reader.getLocalName()) { case "name": if (token.length() > 0) { confName = StringInterner.weakIntern(token.toString().trim()); } break; case "value": if (token.length() > 0) { confValue = StringInterner.weakIntern(token.toString()); } break; case "final": confFinal = "true".equals(token.toString()); break; case "source": confSource.add(StringInterner.weakIntern(token.toString())); break; case "include": if (fallbackAllowed && !fallbackEntered) { throw new IOException("Fetch fail on include for '" + confInclude + "' with no fallback while loading '" + name + "'"); } fallbackAllowed = false; fallbackEntered = false; break; case "property": if (confName == null || (!fallbackAllowed && fallbackEntered)) { break; } confSource.add(name); DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(confName); if (keyInfo != null) { keyInfo.clearAccessed(); for (String key : keyInfo.newKeys) { // update new keys with deprecated key's value loadProperty(toAddTo, name, key, confValue, confFinal, confSource.toArray(new String[confSource.size()])); } } else { loadProperty(toAddTo, name, confName, confValue, confFinal, confSource.toArray(new String[confSource.size()])); } break; default: break; } default: break; } } reader.close(); if (returnCachedProperties) { overlay(properties, toAddTo); return new Resource(toAddTo, name); } return null; } catch (IOException e) { LOG.fatal("error parsing conf " + name, e); throw new RuntimeException(e); } catch (XMLStreamException e) { LOG.fatal("error parsing conf " + name, e); throw new RuntimeException(e); } }
从代码中可以看出,在加载每个配置文件的时候,首先通过instanceof判断wrapper对象中Object类型的属性resource是什么类型,然后根据具体不同的类型将其转换为一个XMLStreamReader2类型的reader。对于转换之后的reader,其会依次读取xml配置文件中的具体标签信息,如:name、value、final、source等等,并且将读取后的信息保存在properties中,这里需要注意的是,在代码中的case "include"处可以看出,配置文件中如果使用了<include></include>标签引入其他的配置文件,那么其会递归的调用loadResource()方法对其进行读取。 在加载完配置
2.属性的获取
Configuration类主要是通过get方法获取相关属性的,其get方法的种类有三十多个,如:get(String)、get(String, String)、getBoolean(String, Boolean)、getClass(String, Class<?>)等等,但是最终调用的还是get(String, String)方法,该方法的作用是获取一个某个名称对应的属性值,具体代码如下:
public String get(String name, String defaultValue) { // 处理过期的属性 String[] names = handleDeprecation(deprecationContext.get(), name); String result = null; // 对属性值中形如${foo.bar}引入的其他属性进行替换 for(String n : names) { result = substituteVars(getProps().getProperty(n, defaultValue)); } return result; }
通过上述代码可以看出,get(String, String)方法主要做了两件事:处理过期键和对属性值进行标签替换处理。根据前面的介绍我们知道deprecationContext是一个AtomicReference<DeprecationContext>类型的变量,其保存有过期键的相关信息,并且这里使用AtomicReference包装,也就是说其获取和更新都是原子化的。这里我们首先讲解一下DeprecationContext具体实现方式,如下是该类的代码:
private static class DeprecationContext { /** * 保存有过期键信息,其值为主要是该过期键更新后的键的信息 */ private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap; /** * 对于DeprecatedKeyInfo,其有一个String[] newKeys属性,即更新后的键信息,这里reverseDeprecatedKeyMap的 * 键为newKeys中的各个字符串,而值则对应于newKeys所在的DeprecatedKeyInfo对象在deprecatedKeyMap所属的键 */ private final Map<String, String> reverseDeprecatedKeyMap; /** * 根据另一个DeprecationContext对象实例化一个DeprecationContext对象,这里DeprecationDelta指的是除了DeprecationContext对象以外新增加的属性 */ @SuppressWarnings("unchecked") DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) { HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap = new HashMap<String, DeprecatedKeyInfo>(); HashMap<String, String> newReverseDeprecatedKeyMap = new HashMap<String, String>(); if (other != null) { for (Entry<String, DeprecatedKeyInfo> entry : other.deprecatedKeyMap.entrySet()) { newDeprecatedKeyMap.put(entry.getKey(), entry.getValue()); } for (Entry<String, String> entry : other.reverseDeprecatedKeyMap.entrySet()) { newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue()); } } for (DeprecationDelta delta : deltas) { if (!newDeprecatedKeyMap.containsKey(delta.getKey())) { DeprecatedKeyInfo newKeyInfo = new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage()); newDeprecatedKeyMap.put(delta.key, newKeyInfo); // 从这里可以看出,reverseDeprecatedKeyMap中的键为更新之后的过期键信息,而值为最初的过期键 for (String newKey : delta.getNewKeys()) { newReverseDeprecatedKeyMap.put(newKey, delta.key); } } } this.deprecatedKeyMap = UnmodifiableMap.decorate(newDeprecatedKeyMap); this.reverseDeprecatedKeyMap = UnmodifiableMap.decorate(newReverseDeprecatedKeyMap); } Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() { return deprecatedKeyMap; } Map<String, String> getReverseDeprecatedKeyMap() { return reverseDeprecatedKeyMap; } }
DeprecationContext中主要有两个属性:deprecatedKeyMap和reverseDeprecatedKeyMap。对于deprecatedKeyMap,其键是过期的键,而值则主要保存该键被替换之后的新键的信息;对于reverseDeprecatedKeyMap,其键为某个过期键更新之后的键,而值则了被更新的过期键。 在介绍了DeprecationContext的具体结构之后,我们继续来看get(String, String)方法中handleDeprecation()方法的具体处理方式,以下是该方法的具体代码:
private String[] handleDeprecation(DeprecationContext deprecations, String name) { if (null != name) { name = name.trim(); } // 默认使用目标name作为返回值 String[] names = new String[]{name}; // 查询当前name所对应的过期键信息,并且获取其更新后的信息 DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name); if (keyInfo != null) { if (!keyInfo.getAndSetAccessed()) { logDeprecation(keyInfo.getWarningMessage(name)); } // 将当前过期键更新后的名称返回 names = keyInfo.newKeys; } // 如果没用通过set*方法设置的属性,那么直接返回 Properties overlayProperties = getOverlay(); if (overlayProperties.isEmpty()) { return names; } // 查找当前name在DeprecatedKeyInfo中对应的最新的keys,如果某个key的属性在overlay(用户设置的key-value)中存在,那么就将该key对应的 // 值更新该key的值为所更新的过期键的值 for (String n : names) { String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n); // 查找当前name是否在更新后的key中存在 if (deprecatedKey != null && overlayProperties.containsKey(n)) { // 当前name在DeprecatedKeyInfo更新后的名字中存在,且在overlayProperties中也存在当前name的键 String deprecatedValue = overlayProperties.getProperty(deprecatedKey); if (deprecatedValue != null) { getProps().setProperty(n, deprecatedValue); // 将DeprecatedKeyInfo中新的键与旧的键所对应的值关联起来,保存在properties和overlay中 overlayProperties.setProperty(n, deprecatedValue); } } } return names; }
总结来说,在handleDeprecation()方法中,其会查询当前的name是否是过期键,如果不是,则将当前name组装为一个数组返回,如果是,则通过该过期键获取其所对应的更新之后的键,并将其作为返回值。除此之外,其还会在调用者设置的属性(overlayProperties)中查询其是否保存有当前过期键所更新的键名的信息,如果有,则将其更新为该过期键所对应的值。 接下来我们回头看看get(String, String)方法,在通过handleDeprecation()方法获取到该name的相关信息之后,这里主要调用了substituteVars()方法,该方法的主要作用为对获取到的属性值中的占位符如${foo.bar}进行替换,这里getProps()方法即为前面所讲解的通过配置文件初始化相关属性的方法。以下是substituteVars()的具体实现:
private String substituteVars(String expr) { if (expr == null) { return null; } String eval = expr; for(int s = 0; s < MAX_SUBST; s++) { // 返回值为长度为2的数组,该方法获取属性值中最内层占位符的起始和终止索引,如${prefix${foo.bar}suffix}将返回foo.bar的起始和终止索引 final int[] varBounds = findSubVariable(eval); if (varBounds[SUB_START_IDX] == -1) { return eval; } final String var = eval.substring(varBounds[SUB_START_IDX], varBounds[SUB_END_IDX]); String val = null; try { // 判断是否为系统属性,系统属性以env.开头 if (var.startsWith("env.") && 4 < var.length()) { String v = var.substring(4); int i = 0; for (; i < v.length(); i++) { char c = v.charAt(i); if (c == ':' && i < v.length() - 1 && v.charAt(i + 1) == '-') { val = getenv(v.substring(0, i)); if (val == null || val.length() == 0) { val = v.substring(i + 2); } break; } else if (c == '-') { val = getenv(v.substring(0, i)); if (val == null) { val = v.substring(i + 1); } break; } } if (i == v.length()) { // i == v.length()说明为系统属性,那么返回值为系统属性 val = getenv(v); } } else { // 如果不为系统属性,则从当前上下文环境中获取该属性 val = getProperty(var); } } catch(SecurityException se) { LOG.warn("Unexpected SecurityException in Configuration", se); } if (val == null) { // 如果既不是系统属性也不是环境属性,则在配置文件或者是用户设置的属性中查找 val = getRaw(var); } if (val == null) { // val为null,说明该属性没有定义,则直接返回原始值 return eval; } final int dollar = varBounds[SUB_START_IDX] - "${".length(); final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length(); // 因为这里SUB_END_INDEX实际上指向的就是"{" final String refVar = eval.substring(dollar, afterRightBrace); // 这里refVar即为要替换的占位符,如${foo.bar},走到这一步说明foo.bar有对应的属性值,即val,这里判断val中是否继续包含 // 有${foo.bar},如果有,则说明发生了嵌套占位,这种情况直接返回原始字符串,否则会发生无限循环 if (val.contains(refVar)) { return expr; } eval = eval.substring(0, dollar) + val + eval.substring(afterRightBrace); } throw new IllegalStateException("Variable substitution depth too large: " + MAX_SUBST + " " + expr); }
总结而言,substituteVars()方法主要对属性值中的占位符进行替换,对于属性值的获取,其可以通过三个途径进行:系统变量、当前环境变量和配置文件及用户设置的属性值,并且其优先级是:系统变量 > 当前环境变量 > 配置文件及用户设置的属性值。除此之外,该方法还会处理嵌套占位符,如${prefix${foo.bar}suffix},其是由内而外进行解析,直到不存在占位符,或者是进行解析的层数超过了20层(最外层的for循环控制,这里MAX_SUBST为20)。
3.属性设置
相对而言,属性的设置要简单一些,和get方法类似,虽然set方法也有很多,但其最终还是调用的set(String, String, String)方法,其第一个和第二个参数为要设置的键值对,第三个参数则为当前属性值的来源。如下是该方法的具体实现:
public void set(String name, String value, String source) { Preconditions.checkArgument(name != null, "Property name must not be null"); Preconditions.checkArgument(value != null, "The value of property %s must not be null", name); name = name.trim(); DeprecationContext deprecations = deprecationContext.get(); if (deprecations.getDeprecatedKeyMap().isEmpty()) { // 初始化配置文件信息 getProps(); } getOverlay().setProperty(name, value); getProps().setProperty(name, value); String newSource = (source == null ? "programmatically" : source); if (!isDeprecated(name)) { // 这里该name不是过期键分为两种情况,一种是在存储过期键的map(即deprecatedKeyMap)中没有相应数据, // 而在更新的map(即reverseDeprecatedKeyMap)中有数据,第二种是在这两个map中都没有数据 updatingResource.put(name, new String[] {newSource}); // 判断该键是否为更新某一个过期键之后的键,如果是,则获取所有更新了该过期键的键 String[] altNames = getAlternativeNames(name); if(altNames != null) { // 更新所有该键所对应的过期键更新之后的键值 for(String n: altNames) { if(!n.equals(name)) { getOverlay().setProperty(n, value); getProps().setProperty(n, value); updatingResource.put(n, new String[] {newSource}); } } } } else { // 如果该键为过期键,则将该过期键更新之后的键的值都设置为新的值 String[] names = handleDeprecation(deprecationContext.get(), name); String altSource = "because " + name + " is deprecated"; for(String n : names) { getOverlay().setProperty(n, value); getProps().setProperty(n, value); updatingResource.put(n, new String[] {altSource}); } } }
对于set(String, String, String)方法,其不仅更新了当前键值对的属性,而且还判断了该键是否为过期键或者是更新过期键之后的键,如果是则将更新之后的键所对应的值都设置为新值。