标签:
自从前段时间的一个事故让队列里缓存的大量关键数据丢失后,一直琢磨着弄一个能持久化到本地文件的队列,这样即使系统再次发生意外,我也不至于再苦逼的修数据了。选定使用mappedbytebuffer来实现,但做出来的原型不够理想。《高性能队列Fqueue的设计和使用实践》这篇文章给了我莫大的帮助。 当然只是借鉴了大致的文件系统结构,绝大部分还是按自己的想法来的。
上图就是队列的文件系统,index文件记录了队列当前的读写文件号,读写位置和读写计数。队列的size是通过读写计数writeCounter-readCounter的方式记录的,这样做的好处是可以做到读写分离。运行时size用一个AtomicInteger变量记录,系统初始化加载队列时才用到读写计数差。block文件记录了实际的入队数据,每个block必须要有足够的空间写入4(len)+data.length+4(EOF)长度的数据,否则写入一个EOF标记,换一个新的block开始写入数据,而当读取到这个EOF时,表示这个block读取完毕,载入下一个block,如果有的话,释放并准备删除当前block。现在规定的block大小32MB,按我的使用场景,每个block可以写入100W数据(PS:protostuff-runtime挺不错的,我用它做的序列化)。
最后附上代码:
public class MFQueuePool { private static final Logger LOGGER = LoggerFactory.getLogger(MFQueuePool.class); private static final BlockingQueue<String> DELETING_QUEUE = new LinkedBlockingQueue<>(); private static MFQueuePool instance = null; private String fileBackupPath; private Map<String, MFQueue> fQueueMap; private ScheduledExecutorService syncService; private MFQueuePool(String fileBackupPath) { this.fileBackupPath = fileBackupPath; File fileBackupDir = new File(fileBackupPath); if (!fileBackupDir.exists() && !fileBackupDir.mkdir()) { throw new IllegalArgumentException("can not create directory"); } this.fQueueMap = scanDir(fileBackupDir); this.syncService = Executors.newSingleThreadScheduledExecutor(); this.syncService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { for (MFQueue MFQueue : fQueueMap.values()) { MFQueue.sync(); } deleteBlockFile(); } }, 10L, 10L, TimeUnit.SECONDS); } private void deleteBlockFile() { String blockFilePath = DELETING_QUEUE.poll(); if (StringUtils.isNotBlank(blockFilePath)) { File delFile = new File(blockFilePath); try { if (!delFile.delete()) { LOGGER.warn("block file:{} delete failed", blockFilePath); } } catch (SecurityException e) { LOGGER.error("security manager exists, delete denied"); } } } private static void toClear(String filePath) { DELETING_QUEUE.add(filePath); } private Map<String, MFQueue> scanDir(File fileBackupDir) { if (!fileBackupDir.isDirectory()) { throw new IllegalArgumentException("it is not a directory"); } Map<String, MFQueue> exitsFQueues = new HashMap<>(); File[] indexFiles = fileBackupDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return MFQueueIndex.isIndexFile(name); } }); if (ArrayUtils.isNotEmpty(indexFiles)) { for (File indexFile : indexFiles) { String queueName = MFQueueIndex.parseQueueName(indexFile.getName()); exitsFQueues.put(queueName, new MFQueue(queueName, fileBackupPath)); } } return exitsFQueues; } public synchronized static void init(String deployPath) { if (instance == null) { instance = new MFQueuePool(deployPath); } } private void disposal() { this.syncService.shutdown(); for (MFQueue MFQueue : fQueueMap.values()) { MFQueue.close(); } while (!DELETING_QUEUE.isEmpty()) { deleteBlockFile(); } } public synchronized static void destory() { if (instance != null) { instance.disposal(); instance = null; } } private MFQueue getQueueFromPool(String queueName) { if (fQueueMap.containsKey(queueName)) { return fQueueMap.get(queueName); } MFQueue MFQueue = new MFQueue(queueName, fileBackupPath); fQueueMap.put(queueName, MFQueue); return MFQueue; } public synchronized static MFQueue getFQueue(String queueName) { if (StringUtils.isBlank(queueName)) { throw new IllegalArgumentException("empty queue name"); } return instance.getQueueFromPool(queueName); } public static class MFQueue extends AbstractQueue<byte[]> { private String queueName; private String fileBackupDir; private MFQueueIndex index; private MFQueueBlock readBlock; private MFQueueBlock writeBlock; private ReentrantLock readLock; private ReentrantLock writeLock; private AtomicInteger size; public MFQueue(String queueName, String fileBackupDir) { this.queueName = queueName; this.fileBackupDir = fileBackupDir; this.readLock = new ReentrantLock(); this.writeLock = new ReentrantLock(); this.index = new MFQueueIndex(MFQueueIndex.formatIndexFilePath(queueName, fileBackupDir)); this.size = new AtomicInteger(index.getWriteCounter() - index.getReadCounter()); this.writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, index.getWriteNum(), fileBackupDir)); if (index.getReadNum() == index.getWriteNum()) { this.readBlock = this.writeBlock.duplicate(); } else { this.readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, index.getReadNum(), fileBackupDir)); } } @Override public Iterator<byte[]> iterator() { throw new UnsupportedOperationException(); } @Override public int size() { return this.size.get(); } private void rotateNextWriteBlock() { int nextWriteBlockNum = index.getWriteNum() + 1; nextWriteBlockNum = (nextWriteBlockNum < 0) ? 0 : nextWriteBlockNum; writeBlock.putEOF(); if (index.getReadNum() == index.getWriteNum()) { writeBlock.sync(); } else { writeBlock.close(); } writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, nextWriteBlockNum, fileBackupDir)); index.putWriteNum(nextWriteBlockNum); index.putWritePosition(0); } @Override public boolean offer(byte[] bytes) { if (ArrayUtils.isEmpty(bytes)) { return true; } writeLock.lock(); try { if (!writeBlock.isSpaceAvailable(bytes.length)) { rotateNextWriteBlock(); } writeBlock.write(bytes); size.incrementAndGet(); return true; } finally { writeLock.unlock(); } } private void rotateNextReadBlock() { if (index.getReadNum() == index.getWriteNum()) { // 读缓存块的滑动必须发生在写缓存块滑动之后 return; } int nextReadBlockNum = index.getReadNum() + 1; nextReadBlockNum = (nextReadBlockNum < 0) ? 0 : nextReadBlockNum; readBlock.close(); String blockPath = readBlock.getBlockFilePath(); if (nextReadBlockNum == index.getWriteNum()) { readBlock = writeBlock.duplicate(); } else { readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, nextReadBlockNum, fileBackupDir)); } index.putReadNum(nextReadBlockNum); index.putReadPosition(0); MFQueuePool.toClear(blockPath); } @Override public byte[] poll() { readLock.lock(); try { if (readBlock.eof()) { rotateNextReadBlock(); } byte[] bytes = readBlock.read(); if (bytes != null) { size.decrementAndGet(); } return bytes; } finally { readLock.unlock(); } } @Override public byte[] peek() { throw new UnsupportedOperationException(); } public void sync() { index.sync(); // read block只读,不用同步 writeBlock.sync(); } public void close() { writeBlock.close(); if (index.getReadNum() != index.getWriteNum()) { readBlock.close(); } index.reset(); index.close(); } } @SuppressWarnings("UnusedDeclaration") private static class MFQueueIndex { private static final String MAGIC = "v.1.0000"; private static final String INDEX_FILE_SUFFIX = ".idx"; private static final int INDEX_SIZE = 32; private static final int READ_NUM_OFFSET = 8; private static final int READ_POS_OFFSET = 12; private static final int READ_CNT_OFFSET = 16; private static final int WRITE_NUM_OFFSET = 20; private static final int WRITE_POS_OFFSET = 24; private static final int WRITE_CNT_OFFSET = 28; private int p11, p12, p13, p14, p15, p16, p17, p18; // 缓存行填充 32B private volatile int readPosition; // 12 读索引位置 private volatile int readNum; // 8 读索引文件号 private volatile int readCounter; // 16 总读取数量 private int p21, p22, p23, p24, p25, p26, p27, p28; // 缓存行填充 32B private volatile int writePosition; // 24 写索引位置 private volatile int writeNum; // 20 写索引文件号 private volatile int writeCounter; // 28 总写入数量 private int p31, p32, p33, p34, p35, p36, p37, p38; // 缓存行填充 32B private RandomAccessFile indexFile; private FileChannel fileChannel; // 读写分离 private MappedByteBuffer writeIndex; private MappedByteBuffer readIndex; public MFQueueIndex(String indexFilePath) { File file = new File(indexFilePath); try { if (file.exists()) { this.indexFile = new RandomAccessFile(file, "rw"); byte[] bytes = new byte[8]; this.indexFile.read(bytes, 0, 8); if (!MAGIC.equals(new String(bytes))) { throw new IllegalArgumentException("version mismatch"); } this.readNum = indexFile.readInt(); this.readPosition = indexFile.readInt(); this.readCounter = indexFile.readInt(); this.writeNum = indexFile.readInt(); this.writePosition = indexFile.readInt(); this.writeCounter = indexFile.readInt(); this.fileChannel = indexFile.getChannel(); this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE); this.writeIndex = writeIndex.load(); this.readIndex = (MappedByteBuffer) writeIndex.duplicate(); } else { this.indexFile = new RandomAccessFile(file, "rw"); this.fileChannel = indexFile.getChannel(); this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE); this.readIndex = (MappedByteBuffer) writeIndex.duplicate(); putMagic(); putReadNum(0); putReadPosition(0); putReadCounter(0); putWriteNum(0); putWritePosition(0); putWriteCounter(0); } } catch (Exception e) { throw new IllegalArgumentException(e); } } public static boolean isIndexFile(String fileName) { return fileName.endsWith(INDEX_FILE_SUFFIX); } public static String parseQueueName(String indexFileName) { String fileName = indexFileName.substring(0, indexFileName.lastIndexOf(‘.‘)); return fileName.split("_")[1]; } public static String formatIndexFilePath(String queueName, String fileBackupDir) { return fileBackupDir + File.separator + String.format("findex_%s%s", queueName, INDEX_FILE_SUFFIX); } public int getReadNum() { return this.readNum; } public int getReadPosition() { return this.readPosition; } public int getReadCounter() { return this.readCounter; } public int getWriteNum() { return this.writeNum; } public int getWritePosition() { return this.writePosition; } public int getWriteCounter() { return this.writeCounter; } public void putMagic() { this.writeIndex.position(0); this.writeIndex.put(MAGIC.getBytes()); } public void putWritePosition(int writePosition) { this.writeIndex.position(WRITE_POS_OFFSET); this.writeIndex.putInt(writePosition); this.writePosition = writePosition; } public void putWriteNum(int writeNum) { this.writeIndex.position(WRITE_NUM_OFFSET); this.writeIndex.putInt(writeNum); this.writeNum = writeNum; } public void putWriteCounter(int writeCounter) { this.writeIndex.position(WRITE_CNT_OFFSET); this.writeIndex.putInt(writeCounter); this.writeCounter = writeCounter; } public void putReadNum(int readNum) { this.readIndex.position(READ_NUM_OFFSET); this.readIndex.putInt(readNum); this.readNum = readNum; } public void putReadPosition(int readPosition) { this.readIndex.position(READ_POS_OFFSET); this.readIndex.putInt(readPosition); this.readPosition = readPosition; } public void putReadCounter(int readCounter) { this.readIndex.position(READ_CNT_OFFSET); this.readIndex.putInt(readCounter); this.readCounter = readCounter; } public void reset() { int size = writeCounter - readCounter; putReadCounter(0); putWriteCounter(size); if (size == 0 && readNum == writeNum) { putReadPosition(0); putWritePosition(0); } } public void sync() { if (writeIndex != null) { writeIndex.force(); } } public void close() { try { if (writeIndex == null) { return; } sync(); AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { Method getCleanerMethod = writeIndex.getClass().getMethod("cleaner"); getCleanerMethod.setAccessible(true); sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(writeIndex); cleaner.clean(); } catch (Exception e) { LOGGER.error("close fqueue index file failed", e); } return null; } }); writeIndex = null; readIndex = null; fileChannel.close(); indexFile.close(); } catch (IOException e) { LOGGER.error("close fqueue index file failed", e); } } } private static class MFQueueBlock { private static final String BLOCK_FILE_SUFFIX = ".blk"; // 数据文件 private static final int BLOCK_SIZE = 32 * 1024 * 1024; // 32MB private final int EOF = -1; private String blockFilePath; private MFQueueIndex index; private RandomAccessFile blockFile; private FileChannel fileChannel; private ByteBuffer byteBuffer; private MappedByteBuffer mappedBlock; public MFQueueBlock(String blockFilePath, MFQueueIndex index, RandomAccessFile blockFile, FileChannel fileChannel, ByteBuffer byteBuffer, MappedByteBuffer mappedBlock) { this.blockFilePath = blockFilePath; this.index = index; this.blockFile = blockFile; this.fileChannel = fileChannel; this.byteBuffer = byteBuffer; this.mappedBlock = mappedBlock; } public MFQueueBlock(MFQueueIndex index, String blockFilePath) { this.index = index; this.blockFilePath = blockFilePath; try { File file = new File(blockFilePath); this.blockFile = new RandomAccessFile(file, "rw"); this.fileChannel = blockFile.getChannel(); this.mappedBlock = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, BLOCK_SIZE); this.byteBuffer = mappedBlock.load(); } catch (Exception e) { throw new IllegalArgumentException(e); } } public MFQueueBlock duplicate() { return new MFQueueBlock(this.blockFilePath, this.index, this.blockFile, this.fileChannel, this.byteBuffer.duplicate(), this.mappedBlock); } public static String formatBlockFilePath(String queueName, int fileNum, String fileBackupDir) { return fileBackupDir + File.separator + String.format("fblock_%s_%d%s", queueName, fileNum, BLOCK_FILE_SUFFIX); } public String getBlockFilePath() { return blockFilePath; } public void putEOF() { this.byteBuffer.position(index.getWritePosition()); this.byteBuffer.putInt(EOF); } public boolean isSpaceAvailable(int len) { int increment = len + 4; int writePosition = index.getWritePosition(); return BLOCK_SIZE >= increment + writePosition + 4; // 保证最后有4字节的空间可以写入EOF } public boolean eof() { int readPosition = index.getReadPosition(); return readPosition > 0 && byteBuffer.getInt(readPosition) == EOF; } public int write(byte[] bytes) { int len = bytes.length; int increment = len + 4; int writePosition = index.getWritePosition(); byteBuffer.position(writePosition); byteBuffer.putInt(len); byteBuffer.put(bytes); index.putWritePosition(increment + writePosition); index.putWriteCounter(index.getWriteCounter() + 1); return increment; } public byte[] read() { byte[] bytes; int readNum = index.getReadNum(); int readPosition = index.getReadPosition(); int writeNum = index.getWriteNum(); int writePosition = index.getWritePosition(); if (readNum == writeNum && readPosition >= writePosition) { return null; } byteBuffer.position(readPosition); int dataLength = byteBuffer.getInt(); if (dataLength <= 0) { return null; } bytes = new byte[dataLength]; byteBuffer.get(bytes); index.putReadPosition(readPosition + bytes.length + 4); index.putReadCounter(index.getReadCounter() + 1); return bytes; } public void sync() { if (mappedBlock != null) { mappedBlock.force(); } } public void close() { try { if (mappedBlock == null) { return; } sync(); AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { Method getCleanerMethod = mappedBlock.getClass().getMethod("cleaner"); getCleanerMethod.setAccessible(true); sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(mappedBlock); cleaner.clean(); } catch (Exception e) { LOGGER.error("close fqueue block file failed", e); } return null; } }); mappedBlock = null; byteBuffer = null; fileChannel.close(); blockFile.close(); } catch (IOException e) { LOGGER.error("close fqueue block file failed", e); } } } }
标签:
原文地址:http://my.oschina.net/xnkl/blog/477690