码迷,mamicode.com
首页 > 其他好文 > 详细

MiniYARNCluster   MiniDFSCluster Kerberos

时间:2015-08-21 17:23:36      阅读:302      评论:0      收藏:0      [点我收藏+]

标签:hadoop单元测试


import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;

import static org.apache.hadoop.hdfs.DFSConfigKeys.*;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;

import static org.junit.Assert.*;


import java.io.*;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;


import org.apache.commons.io.FileUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.hdfs.HdfsConfiguration;

import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;

import org.apache.hadoop.http.HttpConfig;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.minikdc.MiniKdc;

import org.apache.hadoop.security.SecurityUtil;

import org.apache.hadoop.security.UserGroupInformation;

import org.apache.hadoop.security.ssl.KeyStoreTestUtil;

import org.apache.hadoop.yarn.conf.YarnConfiguration;

import org.apache.hadoop.yarn.server.MiniYARNCluster;

import org.junit.*;


public class TestClusterWithKerberos {



    // Scratch directory handed to the MiniKdc; also holds the keytab and is used
    // as the keystore directory by createSecureConfig.
    private static File baseDir;

    // "<user>/localhost@<realm>" principal created in initKdc; reused for the
    // HDFS, YARN and job-history daemons in this test.
    private static String hdfsPrincipal;

    // In-process KDC shared by the whole class (started in initKdc, stopped in shutdownKdc).
    private static MiniKdc kdc;

    // Absolute path of the keytab file that initKdc passes to kdc.createPrincipal.
    private static String keytab;

    // "HTTP/localhost@<realm>" principal used for web (SPNEGO) authentication.
    private static String spnegoPrincipal;

    // YARN mini cluster created by startCluster.
    private MiniYARNCluster yarnCluster;

    // HDFS mini cluster created by startCluster.
    private MiniDFSCluster cluster;



    @BeforeClass

    public static void initKdc() throws Exception {

        baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),

                SaslDataTransferTestCase.class.getSimpleName());

        FileUtil.fullyDelete(baseDir);

        assertTrue(baseDir.mkdirs());


        Properties kdcConf = MiniKdc.createConf();

        kdc = new MiniKdc(kdcConf, baseDir);

        kdc.start();

        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("tjj");

        UserGroupInformation.setLoginUser(ugi);

        String userName = UserGroupInformation.getLoginUser().getShortUserName();

        File keytabFile = new File(baseDir, userName + ".keytab");

        keytab = keytabFile.getAbsolutePath();

        kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");

        hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();

        spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();

        System.out.println("keytab "+keytab+"hdfsPrincipal "+hdfsPrincipal);

    }


    @AfterClass

    public static void shutdownKdc() {

        if (kdc != null) {

            kdc.stop();

        }

        FileUtil.fullyDelete(baseDir);

    }






   

    private void startCluster(HdfsConfiguration conf) throws IOException {

        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();//

        cluster.waitActive();

        yarnCluster = new MiniYARNCluster("MiniClusterStartsWithCountJobTest", // testName

                1, // number of node managers

                1, // number of local log dirs per node manager

                1); // number of hdfs dirs per node manager

        yarnCluster.init(conf);




        yarnCluster.start();

        yarnCluster.getConfig().writeXml(new FileOutputStream(new File("conf.Xml")));

    }


    @Test

    public void testWithMiniCluster() throws Exception {


        HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");

        YarnConfiguration yarnConf =  createYarnSecureConfig();

        clusterConf.addResource(yarnConf);

        startCluster(clusterConf);


        Configuration conf = new Configuration();

       conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));

 

        String IN_DIR = "testing/wordcount/input";

        String OUT_DIR = "testing/wordcount/output";

        String DATA_FILE = "sample.txt";


        FileSystem fs = FileSystem.get(conf);

        Path inDir = new Path(IN_DIR);

        Path outDir = new Path(OUT_DIR);


        fs.delete(inDir, true);

        fs.delete(outDir, true);


        // create the input data files

        List<String> content = new ArrayList<String>();

        content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");

        writeHDFSContent(fs, inDir, DATA_FILE, content);


        // set up the job, submit the job and wait for it complete

     

        Job job = Job.getInstance(conf);


        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(BasicWordCount.TokenizerMapper.class);

        job.setReducerClass(BasicWordCount.IntSumReducer.class);

        FileInputFormat.addInputPath(job, inDir);

        FileOutputFormat.setOutputPath(job, outDir);

        job.waitForCompletion(true);

        assertTrue(job.isSuccessful());


        // now check that the output is as expected

        List<String> results = getJobResults(fs, outDir, 11);


        assertTrue(results.contains("She\t1"));

        assertTrue(results.contains("sells\t2"));



        // clean up after test case

        fs.delete(inDir, true);

        fs.delete(outDir, true);

    }

   /* @Test

    public void wordcount() throws Exception {


        HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");

        YarnConfiguration yarnConf =  createYarnSecureConfig();

        clusterConf.addResource(yarnConf);

        startCluster(clusterConf);


        Configuration conf = new Configuration();

        conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));

        String IN_DIR = "testing/wordcount/input";

        String OUT_DIR = "testing/wordcount/output";

        String DATA_FILE = "sample.txt";


        FileSystem fs = FileSystem.get(conf);

        Path inDir = new Path(IN_DIR);

        Path outDir = new Path(OUT_DIR);


        fs.delete(inDir, true);

        fs.delete(outDir, true);


        // create the input data files

        List<String> content = new ArrayList<String>();

        content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");

        writeHDFSContent(fs, inDir, DATA_FILE, content);

        String[] args = new String[]{IN_DIR,OUT_DIR};

        int exitCode = ToolRunner.run(conf,new WordCount(), args);

        fs.delete(inDir, true);

        fs.delete(outDir, true);

    }*/

    private void writeHDFSContent(FileSystem fs, Path dir, String fileName, List<String> content) throws IOException {

        Path newFilePath = new Path(dir, fileName);

        FSDataOutputStream out = fs.create(newFilePath);

        for (String line : content) {

            out.writeBytes(line);

        }

        out.close();

    }


    protected List<String> getJobResults(FileSystem fs, Path outDir, int numLines) throws Exception {

        List<String> results = new ArrayList<String>();

        FileStatus[] fileStatus = fs.listStatus(outDir);

        for (FileStatus file : fileStatus) {

            String name = file.getPath().getName();

            if (name.contains("part-r-00000")) {

                Path filePath = new Path(outDir + "/" + name);

                BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath)));

                for (int i = 0; i < numLines; i++) {

                    String line = reader.readLine();

                    if (line == null) {

                        fail("Results are not what was expected");

                    }

                    System.out.println("line info: "+line);

                    results.add(line);

                }

                assertNull(reader.readLine());

                reader.close();

            }

        }

        return results;

    }


    private HdfsConfiguration createSecureConfig(String dataTransferProtection) throws Exception {

        HdfsConfiguration conf = new HdfsConfiguration();

        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);

        conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);

        conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);

        conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);

        conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);

        conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);

        conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);

        conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);

        conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());

        conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");

        conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");

        conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);

        conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");//https://issues.apache.org/jira/browse/HDFS-7431

        String keystoresDir = baseDir.getAbsolutePath();

        String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());

        KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);

        return conf;

    }

    private YarnConfiguration createYarnSecureConfig(){


        YarnConfiguration conf = new YarnConfiguration();



        //yarn secure config

        conf.set("yarn.resourcemanager.keytab", keytab);

        conf.set("yarn.resourcemanager.principal", hdfsPrincipal);


        conf.set("yarn.nodemanager.keytab", keytab);

        conf.set("yarn.nodemanager.principal", hdfsPrincipal);

        //   conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor");

        conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor");

       

        conf.set("yarn.nodemanager.linux-container-executor.path", "/container/container-executor");

        conf.set("mapreduce.jobhistory.keytab", keytab);

        conf.set("mapreduce.jobhistory.principal", hdfsPrincipal);

        conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");//https://issues.apache.org/jira/browse/YARN-1289


        //enable security


        conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");

        //yarn

       conf.set("mapreduce.framework.name", "yarn");  //http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn   use Yarn runner

        return conf;

    }

}



down vote

I ran into a similar issue today. In my case I was building an über jar, and some dependency (I have not found the culprit yet) was bringing in a META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider file with the following contents:


org.apache.hadoop.mapred.LocalClientProtocolProvider

I provided my own in the project (e.g. put it on the classpath) with the following:


org.apache.hadoop.mapred.YarnClientProtocolProvider

With that file in place, the correct provider is picked up. I suspect you are seeing a similar problem. To fix it, create the file described above and put it on the classpath. If I find the culprit JAR, I will update this answer.



http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn


hadoop-mapreduce-client-common-2.6.0.jar

#

#   Licensed under the Apache License, Version 2.0 (the "License");

#   you may not use this file except in compliance with the License.

#   You may obtain a copy of the License at

#

#       http://www.apache.org/licenses/LICENSE-2.0

#

#   Unless required by applicable law or agreed to in writing, software

#   distributed under the License is distributed on an "AS IS" BASIS,

#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

#   See the License for the specific language governing permissions and

#   limitations under the License.

#

#org.apache.hadoop.mapred.LocalClientProtocolProvider

org.apache.hadoop.mapred.YarnClientProtocolProvider


MiniYARNCluster   MiniDFSCluster Kerberos

标签:hadoop单元测试

原文地址:http://tangjj.blog.51cto.com/1848040/1686809

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!