标签:
DNSToSwitchMapping 是一个接口,定义了以下几个方法:
/**
 * Contract for translating host identifiers (DNS names or IP addresses)
 * into switch information, expressed as network paths.
 */
public interface DNSToSwitchMapping {

  /**
   * Maps every entry of <i>names</i> to its network path. The returned list
   * corresponds one-to-one, in order, with the argument list.
   * <p>
   * For an argument element such as x.y.com the matching result is a path
   * of the form /foo/rack, where / is the root and 'foo' is the switch that
   * 'rack' is attached to. The host name / IP address itself is not part of
   * the path. How many components a path has depends on the topology of
   * the cluster.
   * <p>
   * When a name cannot be mapped to a rack, implementations should answer
   * with {@link NetworkTopology#DEFAULT_RACK}. The bundled implementations
   * do so, although it is not a formal requirement.
   *
   * @param names the hosts to resolve (may be empty)
   * @return the resolved network paths; empty when <i>names</i> is empty
   */
  List<String> resolve(List<String> names);

  /**
   * Reloads every cached mapping.
   *
   * If the implementation keeps a cache, the cache is cleared so that later
   * lookups get a chance to observe new data.
   */
  void reloadCachedMappings();

  /**
   * Reloads the cached mappings of specific nodes only.
   *
   * If the implementation keeps a cache for these nodes, their entries are
   * cleared so that later lookups observe updated data.
   *
   * @param names the nodes whose cached mappings are to be discarded
   */
  void reloadCachedMappings(List<String> names);
}
AbstractDNSToSwitchMapping 是实现了 DNSToSwitchMapping 接口的抽象类:
@InterfaceAudience.Public @InterfaceStability.Evolving public abstract class AbstractDNSToSwitchMapping implements DNSToSwitchMapping, Configurable { private Configuration conf; /** * Create an unconfigured instance */ protected AbstractDNSToSwitchMapping() { } /** * Create an instance, caching the configuration file. * This constructor does not call {@link #setConf(Configuration)}; if * a subclass extracts information in that method, it must call it explicitly. * @param conf the configuration */ protected AbstractDNSToSwitchMapping(Configuration conf) { this.conf = conf; } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; } /** * Predicate that indicates that the switch mapping is known to be * single-switch. The base class returns false: it assumes all mappings are * multi-rack. Subclasses may override this with methods that are more aware * of their topologies. * * <p/> * * This method is used when parts of Hadoop need know whether to apply * single rack vs multi-rack policies, such as during block placement. * Such algorithms behave differently if they are on multi-switch systems. * </p> * * @return true if the mapping thinks that it is on a single switch */ public boolean isSingleSwitch() { return false; } /** * Get a copy of the map (for diagnostics) * @return a clone of the map or null for none known */ public Map<String, String> getSwitchMap() { return null; } /** * Generate a string listing the switch mapping implementation, * the mapping for every known node and the number of nodes and * unique switches known about -each entry to a separate line. * @return a string that can be presented to the ops team or used in * debug messages. 
*/ public String dumpTopology() { Map<String, String> rack = getSwitchMap(); StringBuilder builder = new StringBuilder(); builder.append("Mapping: ").append(toString()).append("\n"); if (rack != null) { builder.append("Map:\n"); Set<String> switches = new HashSet<String>(); for (Map.Entry<String, String> entry : rack.entrySet()) { builder.append(" ") .append(entry.getKey()) .append(" -> ") .append(entry.getValue()) .append("\n"); switches.add(entry.getValue()); } builder.append("Nodes: ").append(rack.size()).append("\n"); builder.append("Switches: ").append(switches.size()).append("\n"); } else { builder.append("No topology information"); } return builder.toString(); } protected boolean isSingleSwitchByScriptPolicy() { return conf != null && conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null; } /** * Query for a {@link DNSToSwitchMapping} instance being on a single * switch. * <p/> * This predicate simply assumes that all mappings not derived from * this class are multi-switch. * @param mapping the mapping to query * @return true if the base class says it is single switch, or the mapping * is not derived from this class. */ public static boolean isMappingSingleSwitch(DNSToSwitchMapping mapping) { return mapping != null && mapping instanceof AbstractDNSToSwitchMapping && ((AbstractDNSToSwitchMapping) mapping).isSingleSwitch(); } }
/** * A cached implementation of DNSToSwitchMapping that takes an * raw DNSToSwitchMapping and stores the resolved network location in * a cache. The following calls to a resolved network location * will get its location from the cache. * */ @InterfaceAudience.Public @InterfaceStability.Evolving public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping { private Map<String, String> cache = new ConcurrentHashMap<String, String>(); /** * The uncached mapping */ protected final DNSToSwitchMapping rawMapping; /** * cache a raw DNS mapping * @param rawMapping the raw mapping to cache */ public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) { this.rawMapping = rawMapping; } /** * @param names a list of hostnames to probe for being cached * @return the hosts from 'names' that have not been cached previously */ private List<String> getUncachedHosts(List<String> names) { // find out all names without cached resolved location List<String> unCachedHosts = new ArrayList<String>(names.size()); for (String name : names) { if (cache.get(name) == null) { unCachedHosts.add(name); } } return unCachedHosts; } /** * Caches the resolved host:rack mappings. The two list * parameters must be of equal size. * * @param uncachedHosts a list of hosts that were uncached * @param resolvedHosts a list of resolved host entries where the element * at index(i) is the resolved value for the entry in uncachedHosts[i] */ private void cacheResolvedHosts(List<String> uncachedHosts, List<String> resolvedHosts) { // Cache the result if (resolvedHosts != null) { for (int i=0; i<uncachedHosts.size(); i++) { cache.put(uncachedHosts.get(i), resolvedHosts.get(i)); } } } /** * @param names a list of hostnames to look up (can be be empty) * @return the cached resolution of the list of hostnames/addresses. 
* or null if any of the names are not currently in the cache */ private List<String> getCachedHosts(List<String> names) { List<String> result = new ArrayList<String>(names.size()); // Construct the result for (String name : names) { String networkLocation = cache.get(name); if (networkLocation != null) { result.add(networkLocation); } else { return null; } } return result; } @Override public List<String> resolve(List<String> names) { // normalize all input names to be in the form of IP addresses names = NetUtils.normalizeHostNames(names); List <String> result = new ArrayList<String>(names.size()); if (names.isEmpty()) { return result; } List<String> uncachedHosts = getUncachedHosts(names); // Resolve the uncached hosts List<String> resolvedHosts = rawMapping.resolve(uncachedHosts); //cache them cacheResolvedHosts(uncachedHosts, resolvedHosts); //now look up the entire list in the cache return getCachedHosts(names); } /** * Get the (host x switch) map. * @return a copy of the cached map of hosts to rack */ @Override public Map<String, String> getSwitchMap() { Map<String, String > switchMap = new HashMap<String, String>(cache); return switchMap; } @Override public String toString() { return "cached switch mapping relaying to " + rawMapping; } /** * Delegate the switch topology query to the raw mapping, via * {@link AbstractDNSToSwitchMapping#isMappingSingleSwitch(DNSToSwitchMapping)} * @return true iff the raw mapper is considered single-switch. */ @Override public boolean isSingleSwitch() { return isMappingSingleSwitch(rawMapping); } @Override public void reloadCachedMappings() { cache.clear(); } @Override public void reloadCachedMappings(List<String> names) { for (String name : names) { cache.remove(name); } } }
protected static class RawScriptBasedMapping extends AbstractDNSToSwitchMapping { private String scriptName; private int maxArgs; //max hostnames per call of the script private static final Log LOG = LogFactory.getLog(ScriptBasedMapping.class); /** * Set the configuration and extract the configuration parameters of interest * @param conf the new configuration */ @Override public void setConf (Configuration conf) { super.setConf(conf); if (conf != null) { scriptName = conf.get(SCRIPT_FILENAME_KEY); maxArgs = conf.getInt(SCRIPT_ARG_COUNT_KEY, DEFAULT_ARG_COUNT); } else { scriptName = null; maxArgs = 0; } } /** * Constructor. The mapping is not ready to use until * {@link #setConf(Configuration)} has been called */ public RawScriptBasedMapping() {} @Override public List<String> resolve(List<String> names) { List<String> m = new ArrayList<String>(names.size()); if (names.isEmpty()) { return m; } if (scriptName == null) { for (String name : names) { m.add(NetworkTopology.DEFAULT_RACK); } return m; } String output = runResolveCommand(names, scriptName); if (output != null) { StringTokenizer allSwitchInfo = new StringTokenizer(output); while (allSwitchInfo.hasMoreTokens()) { String switchInfo = allSwitchInfo.nextToken(); m.add(switchInfo); } if (m.size() != names.size()) { // invalid number of entries returned by the script LOG.error("Script " + scriptName + " returned " + Integer.toString(m.size()) + " values when " + Integer.toString(names.size()) + " were expected."); return null; } } else { // an error occurred. return null to signify this. // (exn was already logged in runResolveCommand) return null; } return m; } /** * Build and execute the resolution command. The command is * executed in the directory specified by the system property * "user.dir" if set; otherwise the current working directory is used * @param args a list of arguments * @return null if the number of arguments is out of range, * or the output of the command. 
*/ protected String runResolveCommand(List<String> args, String commandScriptName) { int loopCount = 0; if (args.size() == 0) { return null; } StringBuilder allOutput = new StringBuilder(); int numProcessed = 0; if (maxArgs < MIN_ALLOWABLE_ARGS) { LOG.warn("Invalid value " + Integer.toString(maxArgs) + " for " + SCRIPT_ARG_COUNT_KEY + "; must be >= " + Integer.toString(MIN_ALLOWABLE_ARGS)); return null; } while (numProcessed != args.size()) { int start = maxArgs * loopCount; List<String> cmdList = new ArrayList<String>(); cmdList.add(commandScriptName); for (numProcessed = start; numProcessed < (start + maxArgs) && numProcessed < args.size(); numProcessed++) { cmdList.add(args.get(numProcessed)); } File dir = null; String userDir; if ((userDir = System.getProperty("user.dir")) != null) { dir = new File(userDir); } ShellCommandExecutor s = new ShellCommandExecutor( cmdList.toArray(new String[cmdList.size()]), dir); try { s.execute(); allOutput.append(s.getOutput()).append(" "); } catch (Exception e) { LOG.warn("Exception running " + s, e); return null; } loopCount++; } return allOutput.toString(); } /** * Declare that the mapper is single-switched if a script was not named * in the configuration. * @return true iff there is no script */ @Override public boolean isSingleSwitch() { return scriptName == null; } @Override public String toString() { return scriptName != null ? ("script " + scriptName) : NO_SCRIPT; } @Override public void reloadCachedMappings() { // Nothing to do here, since RawScriptBasedMapping has no cache, and // does not inherit from CachedDNSToSwitchMapping } @Override public void reloadCachedMappings(List<String> names) { // Nothing to do here, since RawScriptBasedMapping has no cache, and // does not inherit from CachedDNSToSwitchMapping } }
/**
 * Builds and runs the resolution command, passing at most {@code maxArgs}
 * arguments per invocation. The command runs in the directory named by the
 * system property "user.dir" if that is set; otherwise no working directory
 * is passed to the executor.
 *
 * @param args the arguments to hand to the script
 * @param commandScriptName the script to execute
 * @return null if there are no arguments, if the configured argument limit
 *         is below the allowed minimum, or if an invocation threw;
 *         otherwise the output of all invocations, each followed by a
 *         single space
 */
protected String runResolveCommand(List<String> args,
                                   String commandScriptName) {
  if (args.size() == 0) {
    return null;
  }
  if (maxArgs < MIN_ALLOWABLE_ARGS) {
    LOG.warn("Invalid value " + maxArgs + " for " + SCRIPT_ARG_COUNT_KEY
        + "; must be >= " + MIN_ALLOWABLE_ARGS);
    return null;
  }

  StringBuilder combinedOutput = new StringBuilder();
  int batchStart = 0;   // index of the first argument of the current batch
  int consumed = 0;     // index one past the last argument handed out
  while (consumed != args.size()) {
    List<String> command = new ArrayList<String>();
    command.add(commandScriptName);

    int batchLimit = batchStart + maxArgs;
    consumed = batchStart;
    while (consumed < batchLimit && consumed < args.size()) {
      command.add(args.get(consumed));
      consumed++;
    }

    String userDir = System.getProperty("user.dir");
    File workingDir = (userDir != null) ? new File(userDir) : null;

    ShellCommandExecutor executor = new ShellCommandExecutor(
        command.toArray(new String[command.size()]), workingDir);
    try {
      executor.execute();
      combinedOutput.append(executor.getOutput()).append(" ");
    } catch (Exception e) {
      LOG.warn("Exception running " + executor, e);
      return null;
    }
    batchStart += maxArgs;
  }
  return combinedOutput.toString();
}
ShellCommandExecutor 的构造方法实现如下:
/**
 * Creates an executor with an unmodified environment and no timeout.
 *
 * @param execString The command to execute with arguments
 * @param dir If not-null, the working directory for the command
 */
public ShellCommandExecutor(String[] execString, File dir) {
  this(execString, dir, null);
}

/**
 * Creates an executor with no timeout.
 *
 * @param execString The command to execute with arguments
 * @param dir If not-null, the working directory for the command
 * @param env If not-null, key-value pairs to add to the command's
 *            environment
 */
public ShellCommandExecutor(String[] execString, File dir,
                            Map<String, String> env) {
  this(execString, dir, env, 0L);
}

/**
 * Create a new instance of the ShellCommandExecutor to execute a command.
 *
 * @param execString The command to execute with arguments; the array is
 *                   cloned, so later changes by the caller are not seen
 * @param dir If not-null, specifies the directory which should be set
 *            as the current working directory for the command.
 *            If null, the current working directory is not modified.
 * @param env If not-null, environment of the command will include the
 *            key-value pairs specified in the map. If null, the current
 *            environment is not modified.
 * @param timeout Specifies the time in milliseconds, after which the
 *                command will be killed and the status marked as timedout.
 *                If 0, the command will not be timed out.
 */
public ShellCommandExecutor(String[] execString, File dir,
                            Map<String, String> env, long timeout) {
  this.command = execString.clone();
  this.timeOutInterval = timeout;
  if (dir != null) {
    setWorkingDirectory(dir);
  }
  if (env != null) {
    setEnvironment(env);
  }
}
/**
 * Executes the shell command by delegating to {@code run()}.
 *
 * @throws IOException if running the command fails
 */
public void execute() throws IOException {
  run();
}
/** check to see if a command needs to be executed and execute if needed */ protected void run() throws IOException { if (lastTime + interval > Time.now()) return; exitCode = 0; // reset for next run runCommand(); }
runCommand的源代码如下:
/** Run a command */ private void runCommand() throws IOException { ProcessBuilder builder = new ProcessBuilder(getExecString()); Timer timeOutTimer = null; ShellTimeoutTimerTask timeoutTimerTask = null; timedOut = new AtomicBoolean(false); completed = new AtomicBoolean(false); if (environment != null) { builder.environment().putAll(this.environment); } if (dir != null) { builder.directory(this.dir); } builder.redirectErrorStream(redirectErrorStream); if (Shell.WINDOWS) { synchronized (WindowsProcessLaunchLock) { // To workaround the race condition issue with child processes // inheriting unintended handles during process launch that can // lead to hangs on reading output and error streams, we // serialize process creation. More info available at: // http://support.microsoft.com/kb/315939 process = builder.start(); } } else { process = builder.start(); } if (timeOutInterval > 0) { timeOutTimer = new Timer("Shell command timeout"); timeoutTimerTask = new ShellTimeoutTimerTask( this); //One time scheduling. 
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval); } final BufferedReader errReader = new BufferedReader(new InputStreamReader(process .getErrorStream())); BufferedReader inReader = new BufferedReader(new InputStreamReader(process .getInputStream())); final StringBuffer errMsg = new StringBuffer(); // read error and input streams as this would free up the buffers // free the error stream buffer Thread errThread = new Thread() { @Override public void run() { try { String line = errReader.readLine(); while((line != null) && !isInterrupted()) { errMsg.append(line); errMsg.append(System.getProperty("line.separator")); line = errReader.readLine(); } } catch(IOException ioe) { LOG.warn("Error reading the error stream", ioe); } } }; try { errThread.start(); } catch (IllegalStateException ise) { } try { parseExecResult(inReader); // parse the output // clear the input stream buffer String line = inReader.readLine(); while(line != null) { line = inReader.readLine(); } // wait for the process to finish and check the exit code exitCode = process.waitFor(); // make sure that the error thread exits joinThread(errThread); completed.set(true); //the timeout thread handling //taken care in finally block if (exitCode != 0) { throw new ExitCodeException(exitCode, errMsg.toString()); } } catch (InterruptedException ie) { throw new IOException(ie.toString()); } finally { if (timeOutTimer != null) { timeOutTimer.cancel(); } // close the input stream try { // JDK 7 tries to automatically drain the input streams for us // when the process exits, but since close is not synchronized, // it creates a race if we close the stream first and the same // fd is recycled. the stream draining thread will attempt to // drain that fd!! 
it may block, OOM, or cause bizarre behavior // see: https://bugs.openjdk.java.net/browse/JDK-8024521 // issue is fixed in build 7u60 InputStream stdout = process.getInputStream(); synchronized (stdout) { inReader.close(); } } catch (IOException ioe) { LOG.warn("Error while closing the input stream", ioe); } if (!completed.get()) { errThread.interrupt(); joinThread(errThread); } try { InputStream stderr = process.getErrorStream(); synchronized (stderr) { errReader.close(); } } catch (IOException ioe) { LOG.warn("Error while closing the error stream", ioe); } process.destroy(); lastTime = Time.now(); } }
ScriptBasedMapping,CachedDNSToSwitchMapping,AbstractDNSToSwitchMapping,DNSToSwitchMapping类层次分析
标签:
原文地址:http://blog.csdn.net/houzhizhen/article/details/51354033