storm-hdfs 这个插件支持 Rotation Actions 这个功能,官方文档解释是这样的,
### File Rotation Actions
Both the HDFS bolt and Trident State implementation allow you to register any number of `RotationAction`s.
What `RotationAction`s do is provide a hook to allow you to perform some action right after a file is rotated. For
example, moving a file to a different location or renaming it.
大概意思就是,写完hdfs文件之后会调用这个注册的方法,可以做个善后工作,比如移动个文件夹啥的。官方给了个例子,MoveFileAction 也是当文件写完之后移动到另外一个文件夹中,可是坑爹的是这个类不好用。于是没办法只能自己手动写一个了,把这个公布出来大家一起分享。
package lanbo.storm.kafka.examples; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; /** * Created by lanbo on 15-2-13. */ public class CopyFileAction implements RotationAction { private static final Logger LOG = LoggerFactory.getLogger(CopyFileAction.class); private String destination; public CopyFileAction toDestination(String destDir){ destination = destDir; return this; } @Override public void execute(FileSystem fileSystem, Path filePath) throws IOException { Path destPath = new Path(destination, filePath.getName()); LOG.info("Moving file {} to {}", filePath, destPath); InputStream ins = fileSystem.open(filePath); OutputStream os = fileSystem.create(destPath); try{ IOUtils.copy(ins, os); fileSystem.delete(filePath,true); }catch(Exception e){ throw new IOException(e); }finally{ ins.close(); os.close(); } return; } }这个类简单的就能实现移动文件夹,以后会陆续更新很多更有用的工具
storm-hdfs RotationActions 接口用法
原文地址:http://blog.csdn.net/cuilanbo/article/details/43816317