标签:berkeleydb
最近搞一个高并发的服务中心,需要把数据写入到MySql中,结果测试发现最大TPS才4K,经过讨论后决定先把接收到的数据写到本地,然后通过同步线程再同步到MySql。
最初本地存储选用的SqlLite,结果测试发现SqlLite支持并发有问题;又选型BerkeleyDB,经过测试发现BerkeleyDB满足需求。
BerkeleyDB测试代码如下:
注:代码还有改造的地方,如initCheck方法去掉同步,改为初始化为同步,请在项目中自行修改
package test.berkelyDb;
import java.io.File;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.msgpack.MessagePack;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
public class TestBerkely {
public Environment env;
public Database db;
private static final String dbName = "jsf";
public synchronized void initAndCheck() throws Exception {
if (env != null && env.isValid()) {
return;
}
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setCacheSize(10*1024 * 1024);
try {
env = new Environment(new File("e:\\test"), envConfig);
} catch (Exception e) {
e.printStackTrace();
}
}
public void open() {
if(db != null){
return;
}
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setSortedDuplicates(true);
dbConfig.setAllowCreate(true);
try {
db = env.openDatabase(null, dbName, dbConfig);
} catch (Exception e) {
e.printStackTrace();
}
}
public void close() {
if (db != null) {
try {
db.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (env != null) {
try {
env.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public Object get(String key) throws Exception {
DatabaseEntry queryKey = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
queryKey.setData(key.getBytes("UTF-8"));
OperationStatus status = db.get(null, queryKey, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS) {
return new String(value.getData());
}
return null;
}
public boolean put(String key, byte values[]) throws Exception {
byte[] theKey = key.getBytes("UTF-8");
OperationStatus status = db.put(null, new DatabaseEntry(theKey),
new DatabaseEntry(values));
if (status == OperationStatus.SUCCESS) {
return true;
}
return false;
}
public boolean del(String key) throws Exception{
byte[] theKey = key.getBytes("UTF-8");
OperationStatus status = db.delete(null, new DatabaseEntry(theKey));
if(status == OperationStatus.SUCCESS) {
return true;
}
return false;
}
public static void main(String[] args) throws Exception {
final long len = 10000000;
final TestBerkely tb = new TestBerkely();
final AtomicInteger counter = new AtomicInteger(1);
tb.initAndCheck();
tb.open();
Timer timer = new Timer();
Client client = new Client();
client.setAlias("saf@0.0.1");
client.setAppPath("E:\\workspace\\MyProject\\bin");
client.setCreateTime(new Date());
client.setId(100000);
client.setInsKey(TestBerkely.class.getCanonicalName() + "::saf@0.0.1");
client.setInterfaceId(10092389);
client.setIp("192.168.229.39");
client.setPid(2398);
client.setProtocol(1);
client.setSafVer(120);
client.setSrcType(1);
client.setStartTime(System.currentTimeMillis());
client.setStatus(1);
client.setUniqKey("uniqKey");
client.setUpdateTime(new Date());
client.setUrlDesc("89uf92438yq29384yf");
MessagePack mp = new MessagePack();
mp.register(Client.class);
final byte data[] = mp.write(client);
ExecutorService exePool = Executors.newFixedThreadPool(10);
final long start = System.currentTimeMillis();
//统计TPS线程
timer.schedule(new TimerTask() {
@Override
public void run() {
long end = System.currentTimeMillis();
long time = (end-start)/1000;
if(time == 0){
return;
}
int current = counter.get();
System.out.println("***********----------------->" + (current*100/time/100f));
}
}, 1000, 2000);
for(int i=0; i<10; i++){
exePool.execute(new Runnable() {
@Override
public void run() {
int num = counter.getAndIncrement();
String key = "key" + num;
try {
while(true){
tb.put(key, data);
if(counter.get() < len) {
num = counter.getAndIncrement();
key = "key" + num;
continue;
}
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
exePool.shutdown();
try {
while(!exePool.awaitTermination(1, TimeUnit.SECONDS)){
}
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
long tps = len/((end - start)/1000);
System.out.println("tps------------------>" + tps);
timer.cancel();
tb.env.sync();
tb.env.cleanLog();
counter.set(0);
int errorNum = 0;
//检查写入数据
while(counter.get() < 100000){
if(tb.get("key" + counter.getAndIncrement()) == null){
errorNum++;
}
}
System.out.println("error data is ----------->" + errorNum);
tb.del("key" + 5000);
tb.env.sync();
tb.close();
}
}标签:berkeleydb
原文地址:http://jfzhang.blog.51cto.com/1934093/1567284