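Thrift is a cross-language remote procedure call (RPC) framework. Building on Thrift and on the thriftfs module shipped with Hadoop 1.x, this article puts together a thriftfs for Hadoop 2.x that external programs can call.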
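1. Preparation
1.1 Build Boost
Boost download: http://120.52.72.39/jaist.dl.sourceforge.net/c3pr90ntcsf0/project/boost/boost/1.60.0/boost_1_60_0.tar.gz
Extract Boost. On Windows, run bootstrap.bat, then run the generated b2.exe.
If several Visual Studio toolchains are installed and you want to build with a particular one, open that version's command prompt (Start -> Visual Studio -> Tools -> Command Prompt xxx), cd into the extracted Boost directory, and run b2.exe.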
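1.2 Build Thrift
Thrift download: http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz
After extracting, go to the lib\cpp folder, open thrift.sln (requires VS2010 or later), select the libthrift project, and add the Boost headers to its include path.
During the build, remove the .h and .cpp files you do not need and add the ones you do (this depends on your environment).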
1.3 Compile hadoopfs.thrift
Download the prebuilt Thrift compiler: http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.3/thrift-0.9.3.exe
Modify the hadoopfs.thrift file from Hadoop 1.x.
Contents of hadoopfs.thrift:
#!/usr/local/bin/thrift -java
#
# Thrift Service exported by Hadoop File System
# Dhruba Borthakur (dhruba@gmail.com)
#

/**
 * The available types in Thrift:
 *
 *  bool        Boolean, one byte
 *  byte        Signed byte
 *  i16         Signed 16-bit integer
 *  i32         Signed 32-bit integer
 *  i64         Signed 64-bit integer
 *  double      64-bit floating point value
 *  string      String
 *  binary      Blob (byte array)
 *  map<t1,t2>  Map from one type to another
 *  list<t1>    Ordered list of one type
 *  set<t1>     Set of unique elements of one type
 *
 */

namespace java org.apache.hadoop.thriftfs.api
namespace php hadoopfs

struct ThriftHandle {
  1: i64 id
}

struct Pathname {
  1: string pathname
}

struct FileStatus {
  1: string path,
  2: i64 length,
  3: bool isdir,
  4: i16 block_replication,
  5: i64 blocksize,
  6: i64 modification_time,
  7: string permission,
  8: string owner,
  9: string group
}

struct BlockLocation {
  1: list<string> hosts,  /* hostnames of datanodes */
  2: list<string> names,  /* hostname:portNumber of datanodes */
  3: i64 offset,          /* offset of the block in the file */
  4: i64 length           /* length of data */
}

exception MalformedInputException {
  1: string message
}

exception ThriftIOException {
  1: string message
}

service ThriftHadoopFileSystem
{
  // set inactivity timeout period. The period is specified in seconds.
  // if there are no RPC calls to the HadoopThrift server for this much
  // time, then the server kills itself.
  void setInactivityTimeoutPeriod(1:i64 periodInSeconds),

  // close session
  void shutdown(1:i32 status),

  // create a file and open it for writing
  ThriftHandle create(1:Pathname path) throws (1:ThriftIOException ouch),

  // create a file and open it for writing
  ThriftHandle createFile(1:Pathname path, 2:i16 mode,
                          3:bool overwrite, 4:i32 bufferSize,
                          5:i16 block_replication, 6:i64 blocksize)
                          throws (1:ThriftIOException ouch),

  // returns a handle to an existing file for reading
  ThriftHandle open(1:Pathname path) throws (1:ThriftIOException ouch),

  // returns a handle to an existing file for appending to it.
  ThriftHandle append(1:Pathname path) throws (1:ThriftIOException ouch),

  // write a string to the open handle for the file
  bool write(1:ThriftHandle handle, 2:binary data) throws (1:ThriftIOException ouch),

  // read some bytes from the open handle for the file
  binary read(1:ThriftHandle handle, 2:i64 offset, 3:i32 size) throws (1:ThriftIOException ouch),

  // close file
  bool close(1:ThriftHandle out) throws (1:ThriftIOException ouch),

  // delete file(s) or directory(s)
  bool rm(1:Pathname path, 2:bool recursive) throws (1:ThriftIOException ouch),

  // rename file(s) or directory(s)
  bool rename(1:Pathname path, 2:Pathname dest) throws (1:ThriftIOException ouch),

  // create directory
  bool mkdirs(1:Pathname path) throws (1:ThriftIOException ouch),

  // Does this pathname exist?
  bool exists(1:Pathname path) throws (1:ThriftIOException ouch),

  // Returns status about the path
  FileStatus stat(1:Pathname path) throws (1:ThriftIOException ouch),

  // If the path is a directory, then returns the list of pathnames in that directory
  list<FileStatus> listStatus(1:Pathname path) throws (1:ThriftIOException ouch),

  // Set permission for this file
  void chmod(1:Pathname path, 2:i16 mode) throws (1:ThriftIOException ouch),

  // set the owner and group of the file.
  void chown(1:Pathname path, 2:string owner, 3:string group) throws (1:ThriftIOException ouch),

  // set the replication factor for all blocks of the specified file
  void setReplication(1:Pathname path, 2:i16 replication) throws (1:ThriftIOException ouch),

  // get the locations of the blocks of this file
  list<BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 start, 3:i64 length) throws (1:ThriftIOException ouch),
}
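Generate the C++ and Java code
In cmd, change to the folder containing the Thrift compiler (the download is named thrift-0.9.3.exe; rename it to thrift.exe or substitute that name in the commands below), copy hadoopfs.thrift into the same folder, and run each of the following: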
thrift -gen java hadoopfs.thrift
thrift -gen cpp hadoopfs.thrift
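This produces the gen-cpp and gen-java folders, which contain the generated source files.
2. Write the HDFS server (Java)
2.1 In Eclipse, create a libthrift project. Copy the code under <thrift extract path>/lib/java/src into the project's src directory, then copy the code from the gen-java folder generated in 1.3 into src as well.
Extract Hadoop 2.x (download: http://mirrors.cnnic.cn/apache/hadoop/common/hadoop-2.6.3/hadoop-2.6.3.tar.gz).
Add the required jars to the Eclipse build path: under hadoop2.x/share/hadoop/, add every jar in the common, common/lib, hdfs, and hdfs/lib folders.
Modify the HadoopThriftServer code shipped with Hadoop 1.x as follows: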
package org.apache.hadoop.thriftfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
// Include Generated code
import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;

/**
 * ThriftHadoopFileSystem
 * A thrift wrapper around the Hadoop File System
 */
public class HadoopThriftServer extends ThriftHadoopFileSystem {

  static int serverPort = 0; // default port
  TServer server = null;

  public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface {

    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");

    // HDFS glue
    Configuration conf;
    FileSystem fs;

    // structure that maps each Thrift object into a hadoop object
    private long nextId = new Random().nextLong();
    private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
    private Daemon inactivityThread = null;

    // Detect inactive session
    private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
    private static volatile long inactivityRecheckInterval = 60 * 1000;
    private static volatile boolean fsRunning = true;
    private static long now;

    // allow outsider to change the hadoopthrift path
    public void setOption(String key, String val) {
    }

    /**
     * Current system time.
     * @return current time in msec.
     */
    static long now() {
      return System.currentTimeMillis();
    }

    /**
     * getVersion
     *
     * @return current version of the interface.
     */
    public String getVersion() {
      return "0.1";
    }

    /**
     * shutdown
     *
     * cleanly closes everything and exit.
     */
    @Override
    public void shutdown(int status) {
      LOG.info("HadoopThriftServer shutting down.");
      try {
        fs.close();
      } catch (IOException e) {
        LOG.warn("Unable to close file system");
      }
      Runtime.getRuntime().exit(status);
    }

    /**
     * Periodically checks to see if there is inactivity
     */
    class InactivityMonitor implements Runnable {
      @Override
      public void run() {
        while (fsRunning) {
          try {
            if (now() > now + inactivityPeriod) {
              LOG.warn("HadoopThriftServer Inactivity period of " +
                       inactivityPeriod + " expired... Stopping Server.");
              shutdown(-1);
            }
          } catch (Exception e) {
            LOG.error(StringUtils.stringifyException(e));
          }
          try {
            Thread.sleep(inactivityRecheckInterval);
          } catch (InterruptedException ie) {
          }
        }
      }
    }

    /**
     * HadoopThriftServer
     *
     * Constructor for the HadoopThriftServer glue with Thrift Class.
     *
     * @param name - the name of this handler
     */
    public HadoopThriftHandler(String name) {
      conf = new Configuration();
      now = now();
      try {
        inactivityThread = new Daemon(new InactivityMonitor());
        fs = FileSystem.get(conf);
      } catch (IOException e) {
        LOG.warn("Unable to open hadoop file system...");
        Runtime.getRuntime().exit(-1);
      }
    }

    /**
     * printStackTrace
     *
     * Helper function to print an exception stack trace to the log and not stderr
     *
     * @param e the exception
     */
    static private void printStackTrace(Exception e) {
      for (StackTraceElement s : e.getStackTrace()) {
        LOG.error(s);
      }
    }

    /**
     * Lookup a thrift object into a hadoop object
     */
    private synchronized Object lookup(long id) {
      return hadoopHash.get(Long.valueOf(id));
    }

    /**
     * Insert a thrift object into a hadoop object. Return its id.
     */
    private synchronized long insert(Object o) {
      nextId++;
      hadoopHash.put(nextId, o);
      return nextId;
    }

    /**
     * Delete a thrift object from the hadoop store.
     */
    private synchronized Object remove(long id) {
      return hadoopHash.remove(Long.valueOf(id));
    }

    /**
     * Implement the API exported by this thrift server
     */

    /** Set inactivity timeout period. The period is specified in seconds.
     * if there are no RPC calls to the HadoopThrift server for this much
     * time, then the server kills itself.
     */
    @Override
    public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
      inactivityPeriod = periodInSeconds * 1000; // in milli seconds
      if (inactivityRecheckInterval > inactivityPeriod) {
        inactivityRecheckInterval = inactivityPeriod;
      }
    }

    /**
     * Create a file and open it for writing
     */
    @Override
    public ThriftHandle create(Pathname path) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("create: " + path);
        FSDataOutputStream out = fs.create(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Create a file and open it for writing, delete file if it exists
     */
    @Override
    public ThriftHandle createFile(Pathname path, short mode, boolean overwrite,
                                   int bufferSize, short replication, long blockSize)
        throws ThriftIOException {
      try {
        now = now();
        LOG.debug("create: " + path +
                  " permission: " + mode +
                  " overwrite: " + overwrite +
                  " bufferSize: " + bufferSize +
                  " replication: " + replication +
                  " blockSize: " + blockSize);
        FSDataOutputStream out = fs.create(new Path(path.pathname),
                                           new FsPermission(mode),
                                           overwrite,
                                           bufferSize,
                                           replication,
                                           blockSize,
                                           null); // progress
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Opens an existing file and returns a handle to read it
     */
    @Override
    public ThriftHandle open(Pathname path) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("open: " + path);
        FSDataInputStream out = fs.open(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("opened: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Opens an existing file to append to it.
     */
    @Override
    public ThriftHandle append(Pathname path) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("append: " + path);
        FSDataOutputStream out = fs.append(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("appended: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * write to a file
     */
    @Override
    public boolean write(ThriftHandle tout, ByteBuffer data)
        throws ThriftIOException, TException {
      try {
        now = now();
        LOG.debug("write: " + tout.id);
        FSDataOutputStream out = (FSDataOutputStream) lookup(tout.id);
        // Retrieve all readable bytes in the buffer. remaining() rather than
        // limit() is used so the copy is still correct when the buffer's
        // position is not 0.
        byte[] bytes = new byte[data.remaining()];
        // transfer bytes from this buffer into the given destination array
        data.get(bytes);
        out.write(bytes, 0, bytes.length);
        data.clear();
        LOG.debug("wrote: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * read from a file
     */
    @Override
    public ByteBuffer read(ThriftHandle tout, long offset, int length)
        throws ThriftIOException, TException {
      try {
        now = now();
        LOG.debug("read: " + tout.id + " offset: " + offset + " length: " + length);
        FSDataInputStream in = (FSDataInputStream) lookup(tout.id);
        if (in.getPos() != offset) {
          in.seek(offset);
        }
        byte[] tmp = new byte[length];
        int numbytes = in.read(offset, tmp, 0, length);
        if (numbytes < 0) {
          numbytes = 0; // end of file: return an empty buffer instead of failing in wrap()
        }
        LOG.debug("read done: " + tout.id);
        return ByteBuffer.wrap(tmp, 0, numbytes);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Delete a file/directory
     */
    @Override
    public boolean rm(Pathname path, boolean recursive) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("rm: " + path + " recursive: " + recursive);
        boolean ret = fs.delete(new Path(path.pathname), recursive);
        LOG.debug("rm: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Move a file/directory
     */
    @Override
    public boolean rename(Pathname path, Pathname dest) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("rename: " + path + " destination: " + dest);
        boolean ret = fs.rename(new Path(path.pathname), new Path(dest.pathname));
        LOG.debug("rename: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * close file
     */
    @Override
    public boolean close(ThriftHandle tout) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("close: " + tout.id);
        Object obj = remove(tout.id);
        if (obj instanceof FSDataOutputStream) {
          FSDataOutputStream out = (FSDataOutputStream) obj;
          out.close();
        } else if (obj instanceof FSDataInputStream) {
          FSDataInputStream in = (FSDataInputStream) obj;
          in.close();
        } else {
          throw new ThriftIOException("Unknown thrift handle.");
        }
        LOG.debug("closed: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Create a directory
     */
    @Override
    public boolean mkdirs(Pathname path) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("mkdirs: " + path);
        boolean ret = fs.mkdirs(new Path(path.pathname));
        LOG.debug("mkdirs: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Does this pathname exist?
     */
    @Override
    public boolean exists(Pathname path) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("exists: " + path);
        boolean ret = fs.exists(new Path(path.pathname));
        LOG.debug("exists done: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Returns status about the specified pathname
     */
    @Override
    public org.apache.hadoop.thriftfs.api.FileStatus stat(Pathname path)
        throws ThriftIOException {
      try {
        now = now();
        LOG.debug("stat: " + path);
        org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(new Path(path.pathname));
        LOG.debug("stat done: " + path);
        return new org.apache.hadoop.thriftfs.api.FileStatus(
            stat.getPath().toString(),
            stat.getLen(),
            stat.isDir(),
            stat.getReplication(),
            stat.getBlockSize(),
            stat.getModificationTime(),
            stat.getPermission().toString(),
            stat.getOwner(),
            stat.getGroup());
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * If the specified pathname is a directory, then return the
     * list of pathnames in this directory
     */
    @Override
    public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(Pathname path)
        throws ThriftIOException {
      try {
        now = now();
        LOG.debug("listStatus: " + path);
        org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(new Path(path.pathname));
        LOG.debug("listStatus done: " + path);
        org.apache.hadoop.thriftfs.api.FileStatus tmp;
        List<org.apache.hadoop.thriftfs.api.FileStatus> value =
            new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
        for (int i = 0; i < stat.length; i++) {
          tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
              stat[i].getPath().toString(),
              stat[i].getLen(),
              stat[i].isDir(),
              stat[i].getReplication(),
              stat[i].getBlockSize(),
              stat[i].getModificationTime(),
              stat[i].getPermission().toString(),
              stat[i].getOwner(),
              stat[i].getGroup());
          value.add(tmp);
        }
        return value;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the permission of a pathname
     */
    @Override
    public void chmod(Pathname path, short mode) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("chmod: " + path + " mode " + mode);
        fs.setPermission(new Path(path.pathname), new FsPermission(mode));
        LOG.debug("chmod done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the owner & group of a pathname
     */
    @Override
    public void chown(Pathname path, String owner, String group)
        throws ThriftIOException {
      try {
        now = now();
        LOG.debug("chown: " + path + " owner: " + owner + " group: " + group);
        fs.setOwner(new Path(path.pathname), owner, group);
        LOG.debug("chown done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the replication factor of a file
     */
    @Override
    public void setReplication(Pathname path, short repl) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("setrepl: " + path + " replication factor: " + repl);
        fs.setReplication(new Path(path.pathname), repl);
        LOG.debug("setrepl done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Returns the block locations of this file
     */
    @Override
    public List<org.apache.hadoop.thriftfs.api.BlockLocation> getFileBlockLocations(
        Pathname path, long start, long length) throws ThriftIOException {
      try {
        now = now();
        LOG.debug("getFileBlockLocations: " + path);
        org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(new Path(path.pathname));
        org.apache.hadoop.fs.BlockLocation[] stat =
            fs.getFileBlockLocations(status, start, length);
        LOG.debug("getFileBlockLocations done: " + path);

        org.apache.hadoop.thriftfs.api.BlockLocation tmp;
        List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
            new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
        for (int i = 0; i < stat.length; i++) {
          // construct the list of hostnames from the array returned by HDFS
          List<String> hosts = new LinkedList<String>();
          String[] hostsHdfs = stat[i].getHosts();
          for (int j = 0; j < hostsHdfs.length; j++) {
            hosts.add(hostsHdfs[j]);
          }
          // construct the list of host:port from the array returned by HDFS
          List<String> names = new LinkedList<String>();
          String[] namesHdfs = stat[i].getNames();
          for (int j = 0; j < namesHdfs.length; j++) {
            names.add(namesHdfs[j]);
          }
          tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
              hosts, names, stat[i].getOffset(), stat[i].getLength());
          value.add(tmp);
        }
        return value;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }
  }

  // Bind to port. If the specified port is 0, then bind to random port.
  private ServerSocket createServerSocket(int port) throws IOException {
    try {
      ServerSocket sock = new ServerSocket();
      // Prevent 2MSL delay problem on server restarts
      sock.setReuseAddress(true);
      // Bind to listening port
      if (port == 0) {
        sock.bind(null);
        serverPort = sock.getLocalPort();
      } else {
        sock.bind(new InetSocketAddress(port));
      }
      return sock;
    } catch (IOException ioe) {
      throw new IOException("Could not create ServerSocket on port " + port + "." + ioe);
    }
  }

  /**
   * Constructs a server object
   */
  public HadoopThriftServer(String[] args) {
    if (args.length > 0) {
      serverPort = Integer.parseInt(args[0]);
    }
    try {
      ServerSocket ssock = createServerSocket(serverPort);
      TServerTransport serverTransport = new TServerSocket(ssock);
      Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
      ThriftHadoopFileSystem.Processor processor =
          new ThriftHadoopFileSystem.Processor(handler);
      TThreadPoolServer.Args options = new TThreadPoolServer.Args(serverTransport);
      options.minWorkerThreads(10);
      options.processor(processor);
      server = new TThreadPoolServer(options);
      System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
      HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" + serverPort + "]...");
      System.out.flush();
    } catch (Exception x) {
      x.printStackTrace();
    }
  }

  public static void main(String[] args) {
    HadoopThriftServer me = new HadoopThriftServer(args);
    me.server.serve();
  }
}
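The modified parts were highlighted in red in the original post. Note that file contents are transferred using Thrift's binary type (see write and read in hadoopfs.thrift).
Reference: "Problems encountered when transferring binary data with Thrift" (使用Thrift传输二进制数据遇到的问题)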
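On the Java side, a Thrift binary argument arrives as a java.nio.ByteBuffer. The sketch below uses only the JDK (no Thrift classes) to show why sizing the copy with remaining() is safer than limit(): when a buffer is a view into a larger array, its position is not 0 and limit() overstates the payload size. Whether the Thrift runtime actually hands the handler such an offset view depends on the transport in use, so treat that part as an assumption. The class and method names here are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BinaryPayloadSketch {
    // Copy only the readable bytes (position..limit) and leave the
    // caller's buffer position untouched by reading from a duplicate.
    static byte[] payload(ByteBuffer data) {
        byte[] bytes = new byte[data.remaining()];
        data.duplicate().get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] frame = "HDRhello".getBytes(StandardCharsets.US_ASCII);
        // A view that skips a 3-byte header: position = 3, limit = 8.
        ByteBuffer view = ByteBuffer.wrap(frame, 3, 5);
        System.out.println(view.limit());     // 8
        System.out.println(view.remaining()); // 5
        System.out.println(new String(payload(view), StandardCharsets.US_ASCII)); // hello
    }
}
```

Allocating new byte[view.limit()] and calling get() on this view would throw BufferUnderflowException, because only 5 bytes are readable.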
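3. Windows C++ client
3.1 Create a project. Add the Boost directory and Thrift's lib/cpp/src folder to the include path, and place the libthrift.lib built in 1.2 in the project root (or add its location to the library search path).
3.2 Copy the code in the gen-cpp folder generated in 1.3 into the project root and add it to the project.
3.3 Write the thriftfs client wrapper class:
HdfsClient.h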
#pragma once

#include <string>
#include <vector>

#include "hadoopfs_types.h"
#include "ThriftHadoopFileSystem.h"
#include <boost/shared_ptr.hpp>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using boost::shared_ptr;

#define FILEOPEN_SUCCESS 0

class HdfsClient
{
private:
    bool m_IsConn;
    shared_ptr<TTransport> m_Socket;
    shared_ptr<TBufferedTransport> m_Transport;
    shared_ptr<TBinaryProtocol> m_Protocol;
    shared_ptr<ThriftHadoopFileSystemClient> m_Client;
    ThriftHandle m_Handler;

public:
    HdfsClient(void);
    ~HdfsClient(void);

    bool connect(const std::string server, int port);
    bool shutdown();

    // upload / append a local file, download a remote file
    bool put(const std::string& localfile, const std::string& rem_path);
    bool append(const std::string& localfile, const std::string& rem_path);
    bool get(const std::string& rem_path, const std::string& localfile);

    // namespace operations
    bool rm(const std::string& rem_path, const bool recursive = false);
    bool mv(const std::string& src_path, const std::string& dst_path);
    bool mkdirs(const std::string& rem_path);
    bool exists(const std::string& rem_path);
    void ls(std::vector<FileStatus>& result, const std::string& path);

    // attributes
    void chmod(const std::string& path, const int16_t mode);
    void chown(const std::string& path, const std::string& owner);
    void setReplication(const std::string& path, const int16_t replication);
    void getFileBlockLocations(std::vector<BlockLocation>& result, const std::string& path,
                               const int64_t start, const int64_t length);
};
HdfsClient.cpp
#include "StdAfx.h"
#include "HdfsClient.h"
#include <stdio.h>

HdfsClient::HdfsClient(void)
{
    m_IsConn = false;
}

HdfsClient::~HdfsClient(void)
{
    if (m_IsConn)
        shutdown();
}

bool HdfsClient::connect(std::string server, int port)
{
    m_Socket = shared_ptr<TTransport>(new TSocket(server, port));
    m_Transport = shared_ptr<TBufferedTransport>(new TBufferedTransport(m_Socket));
    m_Protocol = shared_ptr<TBinaryProtocol>(new TBinaryProtocol(m_Transport));
    m_Client = shared_ptr<ThriftHadoopFileSystemClient>(new ThriftHadoopFileSystemClient(m_Protocol));

    try
    {
        m_Transport->open();
        // tell the HadoopThrift server to die after 60 minutes of inactivity
        m_Client->setInactivityTimeoutPeriod(3600);
        m_IsConn = true;
    }
    catch (const ThriftIOException& ex)
    {
        printf("ERROR: %s", ex.message.c_str());
        return false;
    }
    catch (const TException& ex)
    {
        // transport-level failures (for example a refused connection)
        printf("ERROR: %s", ex.what());
        return false;
    }
    return true;
}

bool HdfsClient::shutdown()
{
    try
    {
        m_Transport->close();
        m_IsConn = false;
    }
    catch (const ThriftIOException& ex)
    {
        printf("ERROR: %s", ex.message.c_str());
        return false;
    }
    return true;
}

bool HdfsClient::put(const std::string& localfile, const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    m_Client->create(m_Handler, ptname); // Create the specified file. Returns a handle to write data.
    if (m_Handler.id == 0) // error
        return false;

    FILE* fp = fopen(localfile.c_str(), "rb");
    if (fp == NULL) // check the fopen result directly instead of GetLastError()
    {
        m_Client->close(m_Handler);
        return false;
    }

    /* A. Alternative: read the whole file into memory and send it in one call.
    fseek(fp, 0L, SEEK_END);          // move the file position to the end
    long length = ftell(fp);          // get the file length
    fseek(fp, 0, SEEK_SET);           // move back to the start
    char* buffer = new char[length];
    //memset(buffer, '\0', length);
    fread(buffer, sizeof(char), length, fp);
    // write data to hdfs
    std::string content;
    content.append(buffer, buffer + length);
    m_Client->write(m_Handler, content); */

    // B. Read the file in chunks and upload each chunk. A memory-mapped file
    // could also be used to load the data before sending it to hdfs.
    size_t bufferSize = 1 << 20; // 1M
    size_t readSize = 0;
    char* buffer = new char[bufferSize];
    while ((readSize = fread(buffer, sizeof(char), bufferSize, fp)) > 0)
    {
        // write data to hdfs
        std::string content;
        content.append(buffer, buffer + readSize);
        m_Client->write(m_Handler, content);
    }
    fclose(fp);
    delete[] buffer;
    return m_Client->close(m_Handler);
}

bool HdfsClient::append(const std::string& localfile, const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    m_Client->append(m_Handler, ptname);
    if (m_Handler.id == 0) // error
        return false;

    FILE* fp = fopen(localfile.c_str(), "rb");
    if (fp == NULL)
    {
        m_Client->close(m_Handler);
        return false;
    }

    // Chunked upload, same approach as option B in put().
    size_t bufferSize = 1 << 20; // 1M
    size_t readSize = 0;
    char* buffer = new char[bufferSize];
    while ((readSize = fread(buffer, sizeof(char), bufferSize, fp)) > 0)
    {
        // write data to hdfs
        std::string content;
        content.append(buffer, buffer + readSize);
        m_Client->write(m_Handler, content);
    }
    fclose(fp);
    delete[] buffer;
    return m_Client->close(m_Handler);
}

bool HdfsClient::get(const std::string& rem_path, const std::string& localfile)
{
    Pathname ptname;
    ptname.__set_pathname(rem_path);
    m_Client->open(m_Handler, ptname);
    if (m_Handler.id == 0) // error
        return false;

    FileStatus rfstat;
    m_Client->stat(rfstat, ptname);

    int64_t offset = 0;
    int bufferSize = 1 << 20; // 1M
    std::string content;
    int contentlen = 0;

    FILE* fp = fopen(localfile.c_str(), "wb+");
    if (fp == NULL)
    {
        m_Client->close(m_Handler);
        return false;
    }

    while (offset < rfstat.length)
    {
        m_Client->read(content, m_Handler, offset, bufferSize);
        contentlen = static_cast<int>(content.length());
        if (contentlen > 0)
        {
            fwrite(content.c_str(), sizeof(char), contentlen, fp); // todo: can use multi thread to read and write
            offset += contentlen;
        }
        else
            break;
    }
    fclose(fp);
    return m_Client->close(m_Handler);
}

bool HdfsClient::rm(const std::string& rem_path, const bool recursive)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->rm(ptname, recursive);
}

bool HdfsClient::mv(const std::string& src_path, const std::string& dst_path)
{
    Pathname src_ptname, dst_ptname;
    src_ptname.pathname = src_path;
    dst_ptname.pathname = dst_path;
    return m_Client->rename(src_ptname, dst_ptname);
}

bool HdfsClient::mkdirs(const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->mkdirs(ptname);
}

bool HdfsClient::exists(const std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->exists(ptname);
}

void HdfsClient::ls(std::vector<FileStatus>& result, const std::string& path)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->listStatus(result, ptname);
}

void HdfsClient::chmod(const std::string& path, const int16_t mode)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->chmod(ptname, mode);
}

void HdfsClient::chown(const std::string& path, const std::string& owner)
{
    Pathname ptname;
    ptname.pathname = path;
    // keep the current group: look it up first, then change only the owner
    FileStatus rfstat;
    m_Client->stat(rfstat, ptname);
    m_Client->chown(ptname, owner, rfstat.group);
}

void HdfsClient::setReplication(const std::string& path, const int16_t replication)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->setReplication(ptname, replication);
}

void HdfsClient::getFileBlockLocations(std::vector<BlockLocation>& result, const std::string& path,
                                       const int64_t start, const int64_t length)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->getFileBlockLocations(result, ptname, start, length);
}

int main()
{
    std::string host = "192.168.0.111";
    int port = 50841;
    HdfsClient hdfs;
    std::string local_file = ".\\hadoop1.1.2-thriftfs.rar";
    std::string local_file2 = ".\\test.rar";
    std::string rem_file = "hdfs://master:9000/test.txt";
    std::string rem_dir = "hdfs://master:9000/";

    hdfs.connect(host, port);
    std::vector<FileStatus> result;
    hdfs.put(local_file, rem_file);
    //hdfs.append(local_file, rem_file);
    //hdfs.rm(rem_file);
    hdfs.ls(result, rem_dir);
    for (std::vector<FileStatus>::const_iterator itr = result.begin();
         itr != result.end(); itr++)
    {
        // length is a 64-bit value, so print it with %lld
        printf("%s\t%lld\n", itr->path.c_str(), (long long)itr->length);
    }
    hdfs.get(rem_file, local_file2);
    getchar();
    return 0;
}
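The download loop in get() follows a general pattern: request a fixed-size chunk at the current offset, advance by however many bytes actually came back, and stop on an empty chunk. A minimal Java sketch of that pattern is below. ChunkSource is a hypothetical stand-in for the remote read(handle, offset, size) call, and the in-memory source in main exists only so the example runs without a server.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

class ChunkedReadSketch {
    /** Hypothetical stand-in for the remote read(handle, offset, size) call. */
    interface ChunkSource {
        byte[] read(long offset, int size);
    }

    static byte[] readAll(ChunkSource source, long totalLength, int chunkSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long offset = 0;
        while (offset < totalLength) {
            byte[] chunk = source.read(offset, chunkSize);
            if (chunk.length == 0) {
                break; // nothing more to read; avoid looping forever
            }
            out.write(chunk, 0, chunk.length);
            offset += chunk.length; // advance by what was actually returned
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] remote = new byte[10];
        for (int i = 0; i < remote.length; i++) {
            remote[i] = (byte) i;
        }
        // In-memory source; the last chunk is shorter than requested.
        ChunkSource source = (offset, size) -> {
            int from = (int) Math.min(offset, remote.length);
            int to = Math.min(from + size, remote.length);
            return Arrays.copyOfRange(remote, from, to);
        };
        byte[] copy = readAll(source, remote.length, 4);
        System.out.println(Arrays.equals(remote, copy)); // true
    }
}
```

Advancing by the returned length rather than the requested size matters because a read may legitimately return fewer bytes than asked for.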
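4. Testing
4.1 Install and configure a Hadoop 2.x environment
(See online guides for the detailed steps.)
4.2 Write a script to start the server
First compile the server-side Java code, package it as a jar file (libthrift.jar), and place it in the libthrift folder.
Then copy core-site.xml and hdfs-site.xml from <hadoop install dir>/etc/hadoop/ into the directory containing the script; they are needed when accessing HDFS (see: http://blog.csdn.net/kkdelta/article/details/19908209).
start_thrift_server.sh script: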
#!/bin/sh

# Start with the current directory so that the core-site.xml and
# hdfs-site.xml copied there are found on the classpath.
CLASSPATH=.
HADOOP_DIR=/usr/hadoop-2.6.3

# the hadoop common libraries
for f in $HADOOP_DIR/share/hadoop/common/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the apache libraries
for f in $HADOOP_DIR/share/hadoop/common/lib/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the hadoop hdfs libraries
for f in $HADOOP_DIR/share/hadoop/hdfs/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the apache libraries
for f in $HADOOP_DIR/share/hadoop/hdfs/lib/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

# the thrift libraries
for f in ./libthrift/*.jar ; do
  CLASSPATH=$CLASSPATH:$f
done

java -Dcom.sun.management.jmxremote -cp "$CLASSPATH" org.apache.hadoop.thriftfs.HadoopThriftServer "$@"
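Run the script from its own directory and note the port number the server prints ("Starting the hadoop thrift server on port [...]"); the client needs it to connect. If no port is passed as the first argument, the server binds to a random free port.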
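The random port comes from asking the operating system for any free port by binding to port 0, which is what the server's createServerSocket does when no port is given. A minimal JDK-only sketch of that behaviour follows; the class and method names are illustrative and are not part of the server code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class EphemeralPortSketch {
    // Bind a listening socket; port 0 asks the OS to pick a free port.
    static ServerSocket bind(int port) throws IOException {
        ServerSocket sock = new ServerSocket();
        sock.setReuseAddress(true); // allow quick rebinding after a restart
        sock.bind(new InetSocketAddress(port));
        return sock;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket sock = bind(0)) {
            // The actual port is only known after bind() returns.
            System.out.println("listening on port " + sock.getLocalPort());
        }
    }
}
```

Because the chosen port changes on each start, passing a fixed port to the script is more convenient when the client has the port hard-coded, as the C++ main above does.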
Run the C++ client and verify that upload, download, and the other operations work correctly.
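One simple way to check an upload/download round trip is to compare the file that was uploaded with the file that was downloaded again (in the client's main, hadoop1.1.2-thriftfs.rar and test.rar). The sketch below is an assumed helper, not part of the original project; it compares two local files by size and SHA-256 digest using only the JDK.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

class RoundTripCheckSketch {
    // Stream the file through SHA-256 so large files need little memory.
    static byte[] sha256(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] buffer = new byte[1 << 16];
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                digest.update(buffer, 0, n);
            }
        }
        return digest.digest();
    }

    static boolean sameContent(Path a, Path b) throws IOException, NoSuchAlgorithmException {
        return Files.size(a) == Files.size(b) && Arrays.equals(sha256(a), sha256(b));
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: RoundTripCheckSketch <uploaded-file> <downloaded-file>");
            return;
        }
        boolean same = sameContent(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println(same ? "match" : "MISMATCH");
    }
}
```

A mismatch on a binary file such as a .rar archive is a useful hint that the payload was altered in transit, for example by being treated as text instead of binary.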
Original article: http://www.cnblogs.com/hikeepgoing/p/5295909.html