标签:berkeleydb
最近搞一个高并发的服务中心,需要把数据写入到MySql中,结果测试发现最大TPS才4K,经过讨论后决定先把接收到的数据写到本地,然后通过同步线程再同步到MySql。
最初本地存储选用的SqlLite,结果测试发现SqlLite支持并发有问题;又选型BerkeleyDB,经过测试发现BerkeleyDB满足需求。
BerkeleyDB测试代码如下:
注:代码还有改造的地方,如initCheck方法去掉同步,改为初始化为同步,请在项目中自行修改
package test.berkelyDb; import java.io.File; import java.util.Date; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.msgpack.MessagePack; import com.sleepycat.je.Cursor; import com.sleepycat.je.CursorConfig; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; public class TestBerkely { public Environment env; public Database db; private static final String dbName = "jsf"; public synchronized void initAndCheck() throws Exception { if (env != null && env.isValid()) { return; } EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setCacheSize(10*1024 * 1024); try { env = new Environment(new File("e:\\test"), envConfig); } catch (Exception e) { e.printStackTrace(); } } public void open() { if(db != null){ return; } DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setSortedDuplicates(true); dbConfig.setAllowCreate(true); try { db = env.openDatabase(null, dbName, dbConfig); } catch (Exception e) { e.printStackTrace(); } } public void close() { if (db != null) { try { db.close(); } catch (Exception e) { e.printStackTrace(); } } if (env != null) { try { env.close(); } catch (Exception e) { e.printStackTrace(); } } } public Object get(String key) throws Exception { DatabaseEntry queryKey = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); queryKey.setData(key.getBytes("UTF-8")); OperationStatus status = db.get(null, queryKey, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { return new String(value.getData()); } return null; } public boolean put(String key, byte values[]) throws Exception { byte[] theKey = key.getBytes("UTF-8"); OperationStatus status = db.put(null, new DatabaseEntry(theKey), new DatabaseEntry(values)); if (status == OperationStatus.SUCCESS) { return true; } return false; } public boolean del(String key) throws Exception{ byte[] theKey = key.getBytes("UTF-8"); OperationStatus status = db.delete(null, new DatabaseEntry(theKey)); if(status == OperationStatus.SUCCESS) { return true; } return false; } public static void main(String[] args) throws Exception { final long len = 10000000; final TestBerkely tb = new TestBerkely(); final AtomicInteger counter = new AtomicInteger(1); tb.initAndCheck(); tb.open(); Timer timer = new Timer(); Client client = new Client(); client.setAlias("saf@0.0.1"); client.setAppPath("E:\\workspace\\MyProject\\bin"); client.setCreateTime(new Date()); client.setId(100000); client.setInsKey(TestBerkely.class.getCanonicalName() + "::saf@0.0.1"); client.setInterfaceId(10092389); client.setIp("192.168.229.39"); client.setPid(2398); client.setProtocol(1); client.setSafVer(120); client.setSrcType(1); client.setStartTime(System.currentTimeMillis()); client.setStatus(1); client.setUniqKey("uniqKey"); client.setUpdateTime(new Date()); client.setUrlDesc("89uf92438yq29384yf"); MessagePack mp = new MessagePack(); mp.register(Client.class); final byte data[] = mp.write(client); ExecutorService exePool = Executors.newFixedThreadPool(10); final long start = System.currentTimeMillis(); //统计TPS线程 timer.schedule(new TimerTask() { @Override public void run() { long end = System.currentTimeMillis(); long time = (end-start)/1000; if(time == 0){ return; } int current = counter.get(); System.out.println("***********----------------->" + (current*100/time/100f)); } }, 1000, 2000); for(int i=0; i<10; i++){ exePool.execute(new Runnable() { @Override public void run() { int num = counter.getAndIncrement(); String key = "key" + num; try { while(true){ tb.put(key, data); if(counter.get() < len) { num = counter.getAndIncrement(); key = "key" + num; continue; } break; } } catch (Exception e) { e.printStackTrace(); } } }); } exePool.shutdown(); try { while(!exePool.awaitTermination(1, TimeUnit.SECONDS)){ } } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); long tps = len/((end - start)/1000); System.out.println("tps------------------>" + tps); timer.cancel(); tb.env.sync(); tb.env.cleanLog(); counter.set(0); int errorNum = 0; //检查写入数据 while(counter.get() < 100000){ if(tb.get("key" + counter.getAndIncrement()) == null){ errorNum++; } } System.out.println("error data is ----------->" + errorNum); tb.del("key" + 5000); tb.env.sync(); tb.close(); } }
标签:berkeleydb
原文地址:http://jfzhang.blog.51cto.com/1934093/1567284