码迷,mamicode.com
首页 > 其他好文 > 详细

在不同版本hdfs集群之间转移数据

时间:2015-06-05 17:38:17      阅读:226      评论:0      收藏:0      [点我收藏+]

标签:hadoop   超人学院   hbase   

在不同版本hdfs集群之间转移数据
    最简单的办法就是把src集群的数据导到本地,然后起另一个进程将本地数据传到des集群上去。 
    不过这有几个问题:

  • 效率降低 
  • 占用本地磁盘空间 
  • 不能应付实时导数据需求     
  • 两个进程需要协调,复杂度增加 

    
更好的办法是在同一个进程内一边读src数据,一边写des集群。不过这相当于在同一个进程空间内加载两个版本的hadoop jar包,这就需要在程序中使用两个classloader来实现。 
    以下代码可以实现classloader加载自定义的jar包,并生成需要的Configuration对象:
Java代码

  • URL[] jarUrls = new URL[1];   
  • jarUrls[0]=new File(des_jar_path).toURI().toURL();       
  • ClassLoader jarloader = new URLClassLoader(jarUrls, null);       
  • Class Proxy = Class.forName("yourclass", true, jarloader);       
  • Configuration conf = (Configuration)Proxy.newInstance();  
URL[] jarUrls = new URL[1];
jarUrls[0]=newFile(des_jar_path).toURI().toURL();
ClassLoader jarloader = newURLClassLoader(jarUrls, null);
Class Proxy =Class.forName("yourclass", true, jarloader);
Configuration conf =(Configuration)Proxy.newInstance();


    
但是由于在生成HTable对象时,需要使用这个conf对象,而加载这个conf对象的代码本身是由默认的classloader加载的,也就是0.19.2的jar包。所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本的。那怎么办呢? 
    琢磨了一会,发现如果要实现以上功能,必须将生成HTable对象,以及以后的所有hbase操作都使用这个新的classloader,因此这个新的classloader必须加载除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封装进去。在外面用反射来调用。
    这样的话,通常构造函数都不为空了,因此需要用到Constructor来构造一个自定义的构造函数 
    代码段如下:
Java代码

  • main.java       
  • void init(){       
  •     ClassLoader jarloader = generateJarLoader();       
  •     Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);       
  •     Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});       
  •     Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);       
  •     proxy = con.newInstance(new Object[]{path, tablename, autoflush});       
  • }       
  • void put(){       
  • ...       
  •     while((line = getLine()) != null) {       
  •         proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));       
  •         Method addPut = proxy.getClass().getMethod("addPut",       
  •                 new Class[]{String.class, String.class, String.class});       
  •         addPut.invoke(proxy, new Object[]{field, column, encode});       
  •         proxy.getClass().getMethod("putLine").invoke(proxy);       
  •     }       
  • }       
  •   
  • ClassLoader generateJarLoader() throws IOException {       
  •       String libPath = System.getProperty("java.ext.dirs");       
  •       FileFilter filter = new FileFilter() {       
  •       @Override  
  •       public boolean accept(File pathname) {       
  •         if(pathname.getName().startsWith("hadoop-0.19.2"))       
  •           return false;       
  •         else  
  •             return pathname.getName().endsWith(".jar");       
  •       }       
  •       };       
  •       File[] jars = new File(libPath).listFiles(filter);       
  •       URL[] jarUrls = new URL[jars.length+1];   
  •                
  •       int k = 0;   
  •       for (int i = 0; i < jars.length; i++) {       
  •         jarUrls[k++] = jars.toURI().toURL();       
  •       }       
  •       jarUrls[k] = new File("hadoop-0.20.205.jar")       
  •       ClassLoader jarloader = new URLClassLoader(jarUrls, null);       
  •       return jarloader;       
  • }  
main.java
void init(){
  ClassLoader jarloader = generateJarLoader();
  Class Proxy =Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
  Constructor con = Proxy.getConstructor(new Class[]{String.class,String.class, boolean.class});
  Boolean autoflush =param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
  proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
  while((line = getLine()) != null) {
   proxy.getClass().getMethod("generatePut",String.class).invoke(proxy,line.getField(rowkey));
   Method addPut = proxy.getClass().getMethod("addPut",
      new Class[]{String.class, String.class, String.class});
   addPut.invoke(proxy, new Object[]{field, column, encode});
   proxy.getClass().getMethod("putLine").invoke(proxy);
  }
}
ClassLoader generateJarLoader()throws IOException {
      String libPath =System.getProperty("java.ext.dirs");
      FileFilter filter = new FileFilter() {
      @Override
      public boolean accept(File pathname) {
        if(pathname.getName().startsWith("hadoop-0.19.2"))
          return false;
        else
         returnpathname.getName().endsWith(".jar");
      }
      };
      File[] jars = newFile(libPath).listFiles(filter);
      URL[] jarUrls = new URL[jars.length+1];
   
      int k = 0;
      for (int i = 0; i < jars.length; i++){
        jarUrls[k++] = jars.toURI().toURL();
      }
      jarUrls[k] = newFile("hadoop-0.20.205.jar")
      ClassLoader jarloader = newURLClassLoader(jarUrls, null);
     return jarloader;
}
Java代码

  • HBaseProxy.java       
  • public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)       
  •      throws IOException{       
  •         Configuration conf = new Configuration();       
  •         conf.addResource(new Path(hbase_conf));       
  •         config = new Configuration(conf);       
  •         htable = new HTable(config, tableName);       
  •         admin = new HBaseAdmin(config);       
  •         htable.setAutoFlush(autoflush);       
  •     }       
  • public void addPut(String field, String column, String encode) throws IOException {       
  •     try {       
  •             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   
  •                     field.getBytes(encode));       
  •         } catch (UnsupportedEncodingException e) {       
  •             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   
  •                     field.getBytes());       
  •         }       
  •                
  •     }       
  •     public void generatePut(String rowkey){       
  •         p = new Put(rowkey.getBytes());       
  •     }       
  •            
  •     public void putLine() throws IOException{       
  •         htable.put(p);       
  •     }  
HBaseProxy.java
public HBaseProxy(Stringhbase_conf, String tableName, boolean autoflush)
     throws IOException{
   Configuration conf = new Configuration();
   conf.addResource(new Path(hbase_conf));
   config = new Configuration(conf);
   htable = new HTable(config, tableName);
   admin = new HBaseAdmin(config);
   htable.setAutoFlush(autoflush);
  }
public void addPut(Stringfield, String column, String encode) throws IOException {
    try {
     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
        field.getBytes(encode));
   } catch (UnsupportedEncodingException e) {
     p.add(column.split(":")[0].getBytes(),column.split(":")[1].getBytes(),
        field.getBytes());
   }
   
  }
    public void generatePut(String rowkey){
   p = new Put(rowkey.getBytes());
  }
  
    public void putLine() throws IOException{
   htable.put(p);
  }

    总之,在同一个进程中加载多个classloader时一定要注意,classloader A所加载的对象是不能转换成classloader B的对象的,当然也不能使用。两个空间的相互调用只能用java的基本类型或是反射。


更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:技术分享

关注超人学院java免费学习交流群:技术分享

在不同版本hdfs集群之间转移数据

标签:hadoop   超人学院   hbase   

原文地址:http://blog.csdn.net/crxy2014/article/details/46378845

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!