标签:hadoop单元测试
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.junit.Assert.*;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.*;
/**
 * Integration test that brings up a {@link MiniKdc}, a {@link MiniDFSCluster} and a
 * {@link MiniYARNCluster} configured for Kerberos, then runs a word-count MapReduce job
 * against them and checks the job output.
 *
 * <p>The KDC and its keytab are shared by the whole class ({@code @BeforeClass} /
 * {@code @AfterClass}); the HDFS and YARN mini clusters are started by the test itself and
 * stopped after every test method.
 */
public class TestClusterWithKerberos {

    /** File (relative to the working directory) that carries the running cluster's configuration to the client. */
    private static final String CLIENT_CONF_FILE = "conf.Xml";

    private static File baseDir;
    private static String hdfsPrincipal;
    private static MiniKdc kdc;
    private static String keytab;
    private static String spnegoPrincipal;

    private MiniYARNCluster yarnCluster;
    private MiniDFSCluster cluster;

    /**
     * Starts the MiniKdc in a fresh scratch directory and creates one keytab holding the
     * service principal ({@code <user>/localhost}) and the SPNEGO principal ({@code HTTP/localhost}).
     *
     * @throws Exception if the KDC cannot be started or the principals cannot be created
     */
    @BeforeClass
    public static void initKdc() throws Exception {
        // Use this test's own name for the scratch directory so it cannot collide with
        // (or delete) the directory of the test this setup code was originally copied from.
        baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
                TestClusterWithKerberos.class.getSimpleName());
        FileUtil.fullyDelete(baseDir);
        assertTrue(baseDir.mkdirs());

        Properties kdcConf = MiniKdc.createConf();
        kdc = new MiniKdc(kdcConf, baseDir);
        kdc.start();

        // Install a fixed login user; its short name is used to build the principal names below.
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("tjj");
        UserGroupInformation.setLoginUser(ugi);
        String userName = UserGroupInformation.getLoginUser().getShortUserName();

        File keytabFile = new File(baseDir, userName + ".keytab");
        keytab = keytabFile.getAbsolutePath();
        kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");
        hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();
        spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
        System.out.println("keytab "+keytab+"hdfsPrincipal "+hdfsPrincipal);
    }

    /** Stops the KDC (if it was started) and removes the scratch directory (if it was created). */
    @AfterClass
    public static void shutdownKdc() {
        if (kdc != null) {
            kdc.stop();
        }
        // baseDir stays null when initKdc() failed before assigning it.
        if (baseDir != null) {
            FileUtil.fullyDelete(baseDir);
        }
    }

    /**
     * Stops the YARN and HDFS mini clusters started by {@link #startCluster(HdfsConfiguration)}.
     * The HDFS cluster is shut down even if stopping YARN throws.
     */
    @After
    public void stopClusters() {
        try {
            if (yarnCluster != null) {
                yarnCluster.stop();
                yarnCluster = null;
            }
        } finally {
            if (cluster != null) {
                cluster.shutdown();
                cluster = null;
            }
        }
    }

    /**
     * Starts a one-datanode HDFS mini cluster and a one-node-manager YARN mini cluster from
     * {@code conf}, then writes the YARN cluster's effective configuration to
     * {@link #CLIENT_CONF_FILE} so that a client can load it.
     *
     * @param conf configuration used for both clusters
     * @throws IOException if a cluster fails to start or the configuration file cannot be written
     */
    private void startCluster(HdfsConfiguration conf) throws IOException {
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();

        yarnCluster = new MiniYARNCluster("MiniClusterStartsWithCountJobTest", // testName
                1,  // number of node managers
                1,  // number of local log dirs per node manager
                1); // number of hdfs dirs per node manager
        yarnCluster.init(conf);
        yarnCluster.start();

        // Close the stream explicitly so the file handle is released once the XML is written.
        OutputStream confOut = new FileOutputStream(new File(CLIENT_CONF_FILE));
        try {
            yarnCluster.getConfig().writeXml(confOut);
        } finally {
            confOut.close();
        }
    }

    /**
     * Runs a word-count job on the secured mini clusters and verifies a sample of the counts.
     *
     * @throws Exception on any cluster, file system or job failure
     */
    @Test
    public void testWithMiniCluster() throws Exception {
        HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");
        YarnConfiguration yarnConf = createYarnSecureConfig();
        clusterConf.addResource(yarnConf);
        startCluster(clusterConf);

        // The client side only knows what the running cluster wrote to the configuration file.
        Configuration conf = new Configuration();
        conf.addResource(FileUtils.openInputStream(new File(CLIENT_CONF_FILE)));

        String dataFile = "sample.txt";
        FileSystem fs = FileSystem.get(conf);
        Path inDir = new Path("testing/wordcount/input");
        Path outDir = new Path("testing/wordcount/output");
        fs.delete(inDir, true);
        fs.delete(outDir, true);

        // create the input data files
        List<String> content = new ArrayList<String>();
        content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");
        writeHDFSContent(fs, inDir, dataFile, content);

        // set up the job, submit the job and wait for it complete
        Job job = Job.getInstance(conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(BasicWordCount.TokenizerMapper.class);
        job.setReducerClass(BasicWordCount.IntSumReducer.class);
        FileInputFormat.addInputPath(job, inDir);
        FileOutputFormat.setOutputPath(job, outDir);
        job.waitForCompletion(true);
        assertTrue(job.isSuccessful());

        // now check that the output is as expected
        List<String> results = getJobResults(fs, outDir, 11);
        assertTrue(results.contains("She\t1"));
        assertTrue(results.contains("sells\t2"));

        // clean up after test case
        fs.delete(inDir, true);
        fs.delete(outDir, true);
    }

    // Disabled alternative that submits the job through ToolRunner instead of the Job API.
    // It refers to ToolRunner and WordCount, neither of which is imported or defined in the
    // visible part of this file.
    /* @Test
    public void wordcount() throws Exception {
        HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");
        YarnConfiguration yarnConf = createYarnSecureConfig();
        clusterConf.addResource(yarnConf);
        startCluster(clusterConf);
        Configuration conf = new Configuration();
        conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));
        String IN_DIR = "testing/wordcount/input";
        String OUT_DIR = "testing/wordcount/output";
        String DATA_FILE = "sample.txt";
        FileSystem fs = FileSystem.get(conf);
        Path inDir = new Path(IN_DIR);
        Path outDir = new Path(OUT_DIR);
        fs.delete(inDir, true);
        fs.delete(outDir, true);
        // create the input data files
        List<String> content = new ArrayList<String>();
        content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");
        writeHDFSContent(fs, inDir, DATA_FILE, content);
        String[] args = new String[]{IN_DIR,OUT_DIR};
        int exitCode = ToolRunner.run(conf,new WordCount(), args);
        fs.delete(inDir, true);
        fs.delete(outDir, true);
    }*/

    /**
     * Writes {@code content} to {@code dir/fileName}, one list entry per line, encoded as UTF-8.
     *
     * <p>Each entry is followed by a newline so that the last word of one entry is not fused
     * with the first word of the next, and the stream is closed even when a write fails.
     *
     * @param fs file system to write to
     * @param dir directory that receives the file
     * @param fileName name of the file inside {@code dir}
     * @param content lines to write
     * @throws IOException if the file cannot be created or written
     */
    private void writeHDFSContent(FileSystem fs, Path dir, String fileName, List<String> content) throws IOException {
        FSDataOutputStream out = fs.create(new Path(dir, fileName));
        try {
            for (String line : content) {
                // getBytes keeps non-ASCII characters intact; writeBytes would drop the high byte of each char.
                out.write(line.getBytes("UTF-8"));
                out.write('\n');
            }
        } finally {
            out.close();
        }
    }

    /**
     * Reads the reducer output file(s) whose name contains {@code part-r-00000} under
     * {@code outDir} and returns their lines.
     *
     * <p>Fails the test if a matching file has fewer or more than {@code numLines} lines.
     *
     * @param fs file system holding the job output
     * @param outDir job output directory
     * @param numLines exact number of lines expected in each matching file
     * @return the lines read, in file order
     * @throws Exception if the directory cannot be listed or a file cannot be read
     */
    protected List<String> getJobResults(FileSystem fs, Path outDir, int numLines) throws Exception {
        List<String> results = new ArrayList<String>();
        for (FileStatus file : fs.listStatus(outDir)) {
            String name = file.getPath().getName();
            if (!name.contains("part-r-00000")) {
                continue;
            }
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(outDir, name)), "UTF-8"));
            try {
                for (int i = 0; i < numLines; i++) {
                    String line = reader.readLine();
                    if (line == null) {
                        fail("Results are not what was expected");
                    }
                    System.out.println("line info: "+line);
                    results.add(line);
                }
                // There must be nothing left after the expected number of lines.
                assertNull(reader.readLine());
            } finally {
                // Release the reader even when one of the assertions above fails.
                reader.close();
            }
        }
        return results;
    }

    /**
     * Builds an HDFS configuration with Kerberos authentication, block access tokens,
     * encrypted data transfer and HTTPS-only web endpoints, using the class-wide keytab and
     * principals, and generates the SSL key stores it needs.
     *
     * @param dataTransferProtection value for {@code DFS_DATA_TRANSFER_PROTECTION_KEY}
     * @return the secured configuration
     * @throws Exception if the SSL configuration cannot be set up
     */
    private HdfsConfiguration createSecureConfig(String dataTransferProtection) throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);

        // NameNode and DataNode share the same principal and keytab.
        conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
        conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
        conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
        conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
        conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);

        conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
        conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);

        // HTTPS only; the addresses use port 0 on localhost.
        conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
        conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
        conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");

        conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
        // See https://issues.apache.org/jira/browse/HDFS-7431
        conf.setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);

        String keystoresDir = baseDir.getAbsolutePath();
        String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());
        KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
        return conf;
    }

    /**
     * Builds the YARN/MapReduce side of the secure configuration: keytab and principal for the
     * resource manager, node manager and job history server, the container executor, the
     * shuffle auxiliary service, and YARN as the MapReduce framework.
     *
     * @return the YARN configuration
     */
    private YarnConfiguration createYarnSecureConfig() {
        YarnConfiguration conf = new YarnConfiguration();

        // yarn secure config
        conf.set("yarn.resourcemanager.keytab", keytab);
        conf.set("yarn.resourcemanager.principal", hdfsPrincipal);
        conf.set("yarn.nodemanager.keytab", keytab);
        conf.set("yarn.nodemanager.principal", hdfsPrincipal);

        // Alternative executor that was tried:
        // org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor
        conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor");
        // NOTE(review): hard-coded absolute path; presumably specific to the author's machine - confirm.
        conf.set("yarn.nodemanager.linux-container-executor.path", "/container/container-executor");

        conf.set("mapreduce.jobhistory.keytab", keytab);
        conf.set("mapreduce.jobhistory.principal", hdfsPrincipal);

        // See https://issues.apache.org/jira/browse/YARN-1289
        conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");

        // enable security
        conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);

        // Use the YARN runner; see
        // http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn
        conf.set("mapreduce.framework.name", "yarn");
        return conf;
    }
}
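Quoted answer from the Stack Overflow question linked in createYarnSecureConfig ("Cannot initialize cluster in Hadoop 2 with YARN"):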
I have run into similar issues today. In my case I was building an über jar, where some dependency (I have not found the culprit yet) was bringing in a META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider with the contents:
org.apache.hadoop.mapred.LocalClientProtocolProvider
I provided my own in the project (e.g. put it on the classpath) with the following:
org.apache.hadoop.mapred.YarnClientProtocolProvider
and the correct one is picked up. I suspect you are seeing something similar. To fix it, create the file described above and put it on the classpath. If I find the culprit jar, I will update the answer.
hadoop-mapreduce-client-common-2.6.0.jar (presumably the jar that ships this services file; the edited file contents follow):
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#org.apache.hadoop.mapred.LocalClientProtocolProvider
org.apache.hadoop.mapred.YarnClientProtocolProvider
MiniYARNCluster MiniDFSCluster Kerberos
标签:hadoop单元测试
原文地址:http://tangjj.blog.51cto.com/1848040/1686809