Tags: style blog http color java os usage io strong
This is an original article; reposting is welcome, but please credit the source: http://guoyunsky.iteye.com/blog/1169912
This blog has moved to my own site: http://www.yun5u.com/
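Queues are everywhere, but most queue implementations keep their data in memory. With too much data there is a risk of running out of memory, and holding a large amount of memory for a long time also hurts performance. A web crawler is a good example: if the URLs waiting to be fetched are kept in memory and there are too many of them, memory will certainly blow up. While reading the Heritrix source code, I found that Heritrix implements a persistent queue on top of BDB, so I pulled that code out into a standalone class. It has been very handy in my everyday work, so I'm sharing it here. And because the data is persisted, the queue is not a one-shot structure the way an in-memory queue is: its contents can be kept and accumulated across runs.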
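BDB is well known for being fast and embedded. Still, I think this persistent queue is best suited to a single machine and is not a good fit for distributed setups, which also have to deal with communication, load balancing, redundancy and so on. For those, another database can be used instead.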
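Here is roughly how it works. BDB is a key-value database, while a queue is FIFO. So this persistent queue uses an element's position as the BDB key and the element itself as the BDB value, and keeps two counters that record the positions of the two ends of the queue: the head and the tail. When an element is inserted, it is stored under the current tail position and the tail moves forward by one; when an element is taken out, the entry stored under the head position is read and removed, and the head moves forward by one. That is essentially the whole idea. The class also extends AbstractQueue, so it can be used anywhere a java.util.Queue is expected. The code follows; it depends on bdb-je (Berkeley DB Java Edition), commons-io and junit, which can be downloaded from the attachment.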
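To make the head/tail scheme concrete, here is a minimal in-memory sketch of the same idea. It is an illustration only: a `java.util.TreeMap` stands in for the sorted BDB database, and the class name `HeadTailQueueSketch` is hypothetical, not part of the original code. Keys are positions; `offer` writes at the tail and then advances it, and `poll` removes the entry at the head and advances it.

```java
import java.util.TreeMap;

// Hypothetical illustration: a TreeMap plays the role of the sorted BDB database.
class HeadTailQueueSketch {
    private final TreeMap<Long, String> store = new TreeMap<>(); // stand-in for BDB
    private long head = 0; // key of the oldest element
    private long tail = 0; // key the next offer() will use

    void offer(String value) {
        store.put(tail, value); // write the element under the tail position...
        tail++;                 // ...then move the tail forward
    }

    String poll() {
        // Skip positions with no stored value, as the BDB version's peek() does.
        while (head < tail) {
            String value = store.remove(head);
            head++;
            if (value != null) {
                return value;
            }
        }
        return null; // empty: the head has caught up with the tail
    }

    int size() {
        return (int) (tail - head);
    }

    public static void main(String[] args) {
        HeadTailQueueSketch q = new HeadTailQueueSketch();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        System.out.println(q.poll()); // a
        System.out.println(q.size()); // 2
    }
}
```

Because keys only ever grow, the sorted store keeps elements in insertion order, which is exactly the FIFO order a queue needs.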
1. A custom BDB environment class that caches the StoredClassCatalog so it can be shared
package com.guoyun.util;

import java.io.File;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;

/**
 * A BDB environment that lazily creates one StoredClassCatalog and caches it,
 * so every database opened in this environment can share the same catalog.
 */
public class BdbEnvironment extends Environment {
    private StoredClassCatalog classCatalog;
    private Database classCatalogDB;

    public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
        super(envHome, envConfig);
    }

    /** Returns the shared class catalog, opening its backing database on first use. */
    public synchronized StoredClassCatalog getClassCatalog() {
        if (classCatalog == null) {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            try {
                classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                classCatalog = new StoredClassCatalog(classCatalogDB);
            } catch (DatabaseException e) {
                throw new RuntimeException(e);
            }
        }
        return classCatalog;
    }

    /** Closes the catalog database (if it was opened) before closing the environment. */
    @Override
    public synchronized void close() throws DatabaseException {
        if (classCatalogDB != null) {
            classCatalogDB.close();
            classCatalogDB = null;
            classCatalog = null;
        }
        super.close();
    }
}
2. The BDB-based persistent queue
package com.guoyun.util;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.FileUtils;

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.collections.StoredSortedMap;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentConfig;

/**
 * FIFO queue persisted in Berkeley DB JE. Each element is stored under a Long key
 * equal to its position; headIndex is the key of the oldest element and tailIndex
 * is the key the next offer() will use.
 */
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
        Serializable {
    private static final long serialVersionUID = 3427799316155220967L;
    private transient BdbEnvironment dbEnv; // only set if this queue created the environment
    private transient Database queueDb;
    private transient StoredSortedMap<Long, E> queueMap;
    private transient String dbDir;
    private transient String dbName;
    private final AtomicLong headIndex = new AtomicLong(0);
    private final AtomicLong tailIndex = new AtomicLong(0);
    private transient E peekItem = null; // cached head element, cleared by poll()

    /** Wraps an already opened database and shares the caller's class catalog. */
    public BdbPersistentQueue(Database db, Class<E> valueClass, StoredClassCatalog classCatalog) {
        this.dbName = db.getDatabaseName();
        bindDatabase(db, valueClass, classCatalog);
    }

    /** Opens (creating if needed) an environment in dbDir and a database named dbName. */
    public BdbPersistentQueue(String dbDir, String dbName, Class<E> valueClass) {
        this.dbDir = dbDir;
        this.dbName = dbName;
        createAndBindDatabase(dbDir, dbName, valueClass);
    }

    public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog) {
        // String and the primitive wrappers get a compact tuple binding;
        // other Serializable types are stored via Java serialization and the class catalog.
        EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
        if (valueBinding == null) {
            valueBinding = new SerialBinding<E>(classCatalog, valueClass);
        }
        queueDb = db;
        queueMap = new StoredSortedMap<Long, E>(
                db,
                TupleBinding.getPrimitiveBinding(Long.class),
                valueBinding,
                true);
        // The indices are not stored on disk, so rebuild them from the existing keys;
        // otherwise a reopened queue would ignore old data and overwrite key 0.
        if (!queueMap.isEmpty()) {
            headIndex.set(queueMap.firstKey());
            tailIndex.set(queueMap.lastKey() + 1);
        }
        peekItem = null;
    }

    private void createAndBindDatabase(String dbDir, String dbName, Class<E> valueClass)
            throws DatabaseException {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(false);

        // Deferred write keeps changes in the cache; they are only guaranteed
        // to be on disk after sync() or close().
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(false);
        dbConfig.setDeferredWrite(true);

        dbEnv = new BdbEnvironment(new File(dbDir), envConfig);
        Database db = dbEnv.openDatabase(null, dbName, dbConfig);
        bindDatabase(db, valueClass, dbEnv.getClassCatalog());
    }

    @Override
    public Iterator<E> iterator() {
        // Values come back in key order, i.e. from head to tail.
        return queueMap.values().iterator();
    }

    @Override
    public int size() {
        synchronized (tailIndex) {
            synchronized (headIndex) {
                return (int) (tailIndex.get() - headIndex.get());
            }
        }
    }

    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException(); // null is reserved to mean "empty" in peek()/poll()
        }
        synchronized (tailIndex) {
            long key = tailIndex.get();
            queueMap.put(key, e);
            // Publish the new tail only after the record is written, so a concurrent
            // peek() never treats a not-yet-written key as a hole and skips it.
            tailIndex.set(key + 1);
        }
        return true;
    }

    @Override
    public E peek() {
        synchronized (headIndex) {
            if (peekItem != null) {
                return peekItem;
            }
            // Advance past missing keys until a stored element is found
            // or the head catches up with the tail (empty queue).
            while (headIndex.get() < tailIndex.get()) {
                E headItem = queueMap.get(headIndex.get());
                if (headItem != null) {
                    peekItem = headItem;
                    return headItem;
                }
                headIndex.incrementAndGet();
            }
            return null;
        }
    }

    @Override
    public E poll() {
        synchronized (headIndex) {
            E headItem = peek(); // leaves headIndex on the oldest stored element
            if (headItem != null) {
                queueMap.remove(headIndex.getAndIncrement());
                peekItem = null;
            }
            return headItem;
        }
    }

    /** Flushes deferred writes and closes the database, and the environment if this queue opened it. */
    public void close() {
        try {
            if (queueDb != null) {
                try {
                    queueDb.sync();
                } catch (UnsupportedOperationException e) {
                    // sync() is only supported for deferred-write databases; still close below
                }
                queueDb.close();
                queueDb = null;
            }
            if (dbEnv != null) {
                dbEnv.close();
                dbEnv = null;
            }
        } catch (DatabaseException e) {
            e.printStackTrace();
        }
    }

    /**
     * Unlike a normal Queue.clear(): if this queue created its own environment, the
     * database is removed and dbDir is deleted; a database passed in from outside is
     * only closed. Either way the queue cannot be used afterwards.
     */
    @Override
    public void clear() {
        try {
            if (queueDb != null) {
                queueDb.close(); // a database must be closed before it can be removed
                queueDb = null;
            }
            if (dbEnv != null) {
                dbEnv.removeDatabase(null, dbName);
                dbEnv.close();
                dbEnv = null;
            }
        } catch (DatabaseException e) {
            e.printStackTrace();
        } finally {
            try {
                if (dbDir != null) {
                    FileUtils.deleteDirectory(new File(dbDir));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
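`headIndex` and `tailIndex` live only in memory, while the stored keys survive in the database directory. For the queue's contents to carry over between runs, as described at the start of the post, a reopened queue has to rebuild both indices from the keys already on disk. The sketch below isolates that step: a `java.util.TreeMap` stands in for the persisted `StoredSortedMap`, and `IndexRecoverySketch`/`recover` are hypothetical names, not part of the original code. If no keys are stored, both indices start at 0; otherwise the head is the smallest stored key and the tail is one past the largest.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration of restoring head/tail from keys that survived a restart.
class IndexRecoverySketch {
    /** Returns {head, tail} for a sorted map whose keys are queue positions. */
    static long[] recover(SortedMap<Long, String> persisted) {
        if (persisted.isEmpty()) {
            return new long[] {0L, 0L}; // fresh queue
        }
        long head = persisted.firstKey();    // oldest element not yet polled
        long tail = persisted.lastKey() + 1; // next free position
        return new long[] {head, tail};
    }

    public static void main(String[] args) {
        SortedMap<Long, String> persisted = new TreeMap<>();
        // Simulate a previous run that offered positions 0..4 and polled 0..2,
        // so only keys 3 and 4 are still stored.
        for (long i = 3; i <= 4; i++) {
            persisted.put(i, "item" + i);
        }
        long[] idx = recover(persisted);
        System.out.println(idx[0] + " " + idx[1]); // 3 5
    }
}
```

Since polled entries are deleted, the smallest remaining key is always the next element to hand out, so no extra bookkeeping has to be written to disk.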
3. Test class, checking data correctness and performance
package com.guoyun.util;

import java.io.File;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import junit.framework.TestCase;

public class BdbPersistentQueueTest extends TestCase {
    private Queue<String> memoryQueue;
    private Queue<String> persistentQueue;

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        memoryQueue = new LinkedBlockingQueue<String>();
        // The BDB environment directory must exist before the environment is opened.
        String dbDir = "E:/java/test/bdbDir";
        File file = new File(dbDir);
        if (!file.isDirectory()) {
            file.mkdirs();
        }
        persistentQueue = new BdbPersistentQueue<String>(dbDir, "pq", String.class);
    }

    @Override
    protected void tearDown() throws Exception {
        memoryQueue.clear();
        memoryQueue = null;
        // BdbPersistentQueue.clear() also removes the database and deletes dbDir.
        persistentQueue.clear();
        persistentQueue = null;
        super.tearDown();
    }

    /** Removes elements until the queue is empty; returns how many were removed. */
    public int drain(Queue<String> queue) {
        int count = 0;
        while (true) {
            try {
                queue.remove();
                count++;
            } catch (NoSuchElementException e) {
                return count;
            }
        }
    }

    /** Adds the strings "0" .. "size-1" in order. */
    public void fill(Queue<String> queue, int size) {
        for (int i = 0; i < size; i++) {
            queue.add(i + "");
        }
    }

    public void checkTime(int size) {
        System.out.println("1. In-memory Queue: time to fill and drain");
        long start = System.nanoTime();
        fill(memoryQueue, size);
        long time = System.nanoTime() - start;
        System.out.println("\tFill " + size + " items: " + (double) time / 1000000 + " ms, per item: " + ((double) time / size) + " ns");
        start = System.nanoTime();
        drain(memoryQueue);
        time = System.nanoTime() - start;
        System.out.println("\tDrain " + size + " items: " + (double) time / 1000000 + " ms, per item: " + ((double) time / size) + " ns");

        System.out.println("2. Persistent Queue: time to fill and drain");
        start = System.nanoTime();
        fill(persistentQueue, size);
        time = System.nanoTime() - start;
        // Per item in milliseconds: ns / size / 1,000,000
        System.out.println("\tFill " + size + " items: " + (double) time / 1000000 + " ms, per item: " + ((double) time / size / 1000000) + " ms");
        start = System.nanoTime();
        drain(persistentQueue);
        time = System.nanoTime() - start;
        // Per item in microseconds: ns / size / 1,000
        System.out.println("\tDrain " + size + " items: " + (double) time / 1000000 + " ms, per item: " + ((double) time / size / 1000) + " µs");
    }

    public void testTime_hundredThousand() {
        System.out.println("========Testing 100000 (one hundred thousand) items=================");
        checkTime(100000);
    }

    public void testTime_mil() {
        System.out.println("========Testing 1000000 (one million) items=================");
        checkTime(1000000);
    }

    public void testTime_tenMil() {
        System.out.println("========Testing 10000000 (ten million) items=================");
        checkTime(10000000);
    }

    /** Removes size elements and fails the test unless they come out in insertion order. */
    public void checkDataExact(Queue<String> queue, String queueName, int size) {
        assertEquals("Error size of " + queueName, size, queue.size());
        for (int i = 0; i < size; i++) {
            assertEquals("Error " + queueName + " at " + i, i + "", queue.remove());
        }
    }

    public void testExact() {
        int size = 100;
        fill(memoryQueue, size);
        fill(persistentQueue, size);

        checkDataExact(memoryQueue, "MemoryQueue", size);
        checkDataExact(persistentQueue, "PersistentQueue", size);
    }
}
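`checkTime` repeats the same fill, time, drain, time sequence for each queue and prints per-item costs in several different units. A hedged sketch of folding that into a single helper for any `Queue<String>` follows; `QueueTiming`, `timeFillAndDrain` and `report` are hypothetical names, and the sketch only uses JDK classes, so it runs against the in-memory queue without BDB on the classpath. It drains with `poll()` until it returns `null` instead of relying on the exception thrown by `remove()`, and reports every per-item figure in nanoseconds.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: one fill/drain timing routine for any Queue<String>.
class QueueTiming {
    /** Fills the queue with "0".."size-1", drains it, and returns {fillNanos, drainNanos, drainedCount}. */
    static long[] timeFillAndDrain(Queue<String> queue, int size) {
        long start = System.nanoTime();
        for (int i = 0; i < size; i++) {
            queue.add(Integer.toString(i));
        }
        long fillNanos = System.nanoTime() - start;

        start = System.nanoTime();
        long drained = 0;
        while (queue.poll() != null) { // poll() returns null once the queue is empty
            drained++;
        }
        long drainNanos = System.nanoTime() - start;
        return new long[] {fillNanos, drainNanos, drained};
    }

    /** Prints totals in ms and per-item costs in ns. */
    static void report(String label, long[] r, int size) {
        System.out.printf("%s: fill %.3f ms (%.1f ns/item), drain %.3f ms (%.1f ns/item)%n",
                label, r[0] / 1e6, (double) r[0] / size, r[1] / 1e6, (double) r[1] / size);
    }

    public static void main(String[] args) {
        int size = 100000;
        long[] r = timeFillAndDrain(new LinkedBlockingQueue<String>(), size);
        report("memory", r, size);
    }
}
```

Passing a `BdbPersistentQueue<String>` instead of the `LinkedBlockingQueue` would time the persistent queue the same way, since it is also a `Queue<String>`.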
4. Performance results
========Testing 100000 (one hundred thousand) items=================
1. In-memory Queue: time to fill and drain
Fill 100000 items: 53.550787 ms, per item: 535.50787 ns
Drain 100000 items: 27.09901 ms, per item: 270.9901 ns
2. Persistent Queue: time to fill and drain
Fill 100000 items: 1399.644305 ms, per item: 0.01399644305 ms
Drain 100000 items: 2104.765179 ms, per item: 21.04765179 µs
Persistent writes took 26x as long as in-memory writes, and reads 77x as long.
========Testing 1000000 (one million) items=================
1. In-memory Queue: time to fill and drain
Fill 1000000 items: 699.105888 ms, per item: 699.105888 ns
Drain 1000000 items: 158.792281 ms, per item: 158.792281 ns
2. Persistent Queue: time to fill and drain
Fill 1000000 items: 11978.132218 ms, per item: 0.011978132218 ms
Drain 1000000 items: 22355.617205 ms, per item: 22.355617204999998 µs
Persistent writes took 17x as long as in-memory writes, and reads 141x as long.
========Testing 10000000 (ten million) items=================
1. In-memory Queue: time to fill and drain
Fill 10000000 items: 9678.377046 ms, per item: 967.8377046 ns
Drain 10000000 items: 1473.416825 ms, per item: 147.3416825 ns
2. Persistent Queue: time to fill and drain
Fill 10000000 items: 151177.036391 ms, per item: 0.0151177036391 ms
Drain 10000000 items: 361642.655135 ms, per item: 36.164265513500006 µs
Persistent writes took 15x as long as in-memory writes, and reads 245x as long.
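As the numbers show, even with tens of millions of records, writing or reading a single item takes well under a millisecond (roughly 12 to 15 µs per write and 21 to 36 µs per read), so BDB's performance really is impressive. Also, as the data volume grows, the write slowdown relative to the in-memory queue shrinks (26x, 17x, 15x), while the read slowdown grows (77x, 141x, 245x).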
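The slowdown factors and per-item costs quoted for the persistent queue are plain divisions of the reported totals. The sketch below redoes that arithmetic from the measured numbers in this post; `PerfArithmetic`, `slowdown` and `microsPerItem` are hypothetical names. To one decimal place the write factors come out at 26.1, 17.1 and 15.6 and the read factors at 77.7, 140.8 and 245.4, so the whole-number factors in the results are approximations.

```java
// Hypothetical helper that recomputes the post's ratios from its measured totals (ms).
class PerfArithmetic {
    /** How many times longer the persistent queue took than the in-memory one. */
    static double slowdown(double persistentMs, double memoryMs) {
        return persistentMs / memoryMs;
    }

    /** Per-item cost in microseconds: total ms * 1000 / item count. */
    static double microsPerItem(double totalMs, int items) {
        return totalMs * 1000.0 / items;
    }

    public static void main(String[] args) {
        int[] sizes = {100000, 1000000, 10000000};
        // Columns: memory fill, memory drain, persistent fill, persistent drain (ms), as reported above.
        double[][] ms = {
            {53.550787, 27.09901, 1399.644305, 2104.765179},
            {699.105888, 158.792281, 11978.132218, 22355.617205},
            {9678.377046, 1473.416825, 151177.036391, 361642.655135},
        };
        for (int i = 0; i < sizes.length; i++) {
            double[] r = ms[i];
            System.out.printf("%,d items: write %.1fx (%.1f us/item), read %.1fx (%.1f us/item)%n",
                    sizes[i],
                    slowdown(r[2], r[0]), microsPerItem(r[2], sizes[i]),
                    slowdown(r[3], r[1]), microsPerItem(r[3], sizes[i]));
        }
    }
}
```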
A Persistent Queue Based on Berkeley DB
Original article: http://www.cnblogs.com/zheh/p/3934344.html