标签:
使用 MyBatis 连接 Oracle 读取数据,然后写入 MongoDB。
数据量 1.3 亿条,写入速度为每分钟 7～10 万条。
SyncPacker_201603.java
package syncPacker; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.http.message.BasicNameValuePair; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.Mongo; import com.pro.framework.action.BaseController; import logCenter.SendLog; import net.sf.json.JSONObject; import syncPacker.bean.PatentBibliographicChangeBean; import syncPacker.bean.SyncDataPackageBean; import utils.DateUtils; import utils.DatetimeUtils; import utils.HttpUtils; /** * 增量数据分块打包 全处理 * * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action * * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean. 
* tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000 */ @Service public class SyncPacker_201603 extends BaseController { private static final long serialVersionUID = 1L; // 初始化:数据库接口 @Autowired private SyncPackerDao dao; // 初始化:发送端地址 @Value("${url_syncSender}") private String url_syncSender; // 初始化:本地文件存储路径 @Value("${path_syncPacker_package}") private String path_syncPacker_package; // 初始化:读取最大数据包名称的地址 @Value("${url_selectMaxPackageNumber}") private String url_selectMaxPackageNumber; // 初始化:存储最大数据包名称的地址 @Value("${url_insertSyncDataPackage}") private String url_insertSyncDataPackage; // 初始化:查询条件Bean private SyncDataPackageBean bean = new SyncDataPackageBean(); // 初始化:查询结果List private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>(); // 初始化:形成的数据包名称 private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>(); // 初始化:已打包的增量数据ID表单 private List<String> pdaIdList = new ArrayList<String>(); // 初始化:用于删除数据的临时ID List private List<String> idPartList = new ArrayList<String>(); // 初始化:传输协议 private HttpUtils httpUtils = new HttpUtils(); // 初始化:键值串 private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>(); // POST 结果 private String str_postResult; // 初始化:发送url返回的信息 private JSONObject json_postResult; // 初始化:历史最大包编号 private String maxPackageNumber; // 初始化:记录打包完成后的数据版本 private Integer centerNodeDataVersion; // 发送远程日志 private SendLog sendLog = new SendLog(); // 记录本地日志 private Logger logger = Logger.getLogger(SyncPacker_201603.class); // 初始化:判断程序是否正在运行 public static boolean isRunning = false; // 本次处理完成后的最大包编号 private String packedPackageNumber; // 用于返回json的成功信息 private String success = "success"; /** 文件补发用:指定数据重新打包 */ // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102 public String packByPackageNumber() throws Exception { logMemory("本次请求处理开始。", 
bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() ); String job_start = DateUtils.getCurrentTimeString(0); try { for ( int i = Integer.valueOf(bean.getPackageNumberStart()); i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) { // 开始时间 String package_start = DateUtils.getCurrentTimeString(0); // 包编号 String packageNumber = String.format( "%06d", i ); logMemory("开始,包编号:", packageNumber ); //(1)读历史表 pbcList = selectList_changeHistory( packageNumber ); if( null == pbcList ) { logMemory("<span style=\"color:red;\">数据为空 !!!</span>", ""); } else{ logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() )); //(2)插入MongoDB insertMongoDB( pbcList ); pbcList.clear(); }; logMemory("传输结束,包编号:" + packageNumber ," 用时: " + DatetimeUtils.getDistanceTimes_string(package_start, DateUtils.getCurrentTimeString(0)) ); } } catch (Exception e) { // 日志输出 logMemory("系统发生异常", e.getMessage()); e.printStackTrace(); } logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() + " 用时: " + DatetimeUtils.getDistanceTimes_string( job_start, DateUtils.getCurrentTimeString(0)) ); return SUCCESS; } /** * 读历史表 * @param packageNumber * @return */ private List<PatentBibliographicChangeBean> selectList_changeHistory( String packageNumber ){ for ( int i = 0; i < 100; i++ ) { try { return dao.selectList_changeHistory( packageNumber ); } catch (Exception e) { // TODO Auto-generated catch block // e.printStackTrace(); logMemory("系统发生异常", e.getMessage()); try { Thread.sleep(500); logMemory("暂停 0.5 秒", "" ); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }// loop end return null ; } /** * 插入 MongoDB */ public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) { //# MongoDB(数据加载目标) String syncLoadIntoMongoDbService = "10.78.2.23:27017"; String syncLoadIntoMongoDbName = "patent_search_extend"; String syncLoadIntoMongoTable = "patent_bibliographic_20160319"; // 加载开始 // 
logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService ); Mongo m = null; try { m = new Mongo( syncLoadIntoMongoDbService ); } catch (UnknownHostException e) { e.printStackTrace(); logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage()); } // 库名 DB db = m.getDB( syncLoadIntoMongoDbName ); // logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName ); // 表名 DBCollection collection = db.getCollection( syncLoadIntoMongoTable ); // logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable ); // 循环列表,将每个元素插入数据库 for( PatentBibliographicChangeBean pbcBean : pbcList ){ //(1)读取一条著录数据 // JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() ); // if( null == json ) return ; // 列,值 BasicDBObject insDoc = new BasicDBObject(); insDoc.put("abstract_No" , pbcBean.getAbstract_No() ); insDoc.put("app_Addr" , pbcBean.getApp_Addr() ); insDoc.put("app_Cn" , pbcBean.getApp_Cn() ); insDoc.put("app_Country" , pbcBean.getApp_Country() ); insDoc.put("app_Date" , pbcBean.getApp_Date() ); insDoc.put("app_Name" , pbcBean.getApp_Name() ); insDoc.put("app_Sn" , pbcBean.getApp_Sn() ); insDoc.put("app_Type" , pbcBean.getApp_Type() ); insDoc.put("app_Zip" , pbcBean.getApp_Zip() ); insDoc.put("ecla" , pbcBean.getEcla() ); insDoc.put("fi" , pbcBean.getFi() ); insDoc.put("ft" , pbcBean.getFt() ); insDoc.put("id" , pbcBean.getId() ); insDoc.put("inv_Title" , pbcBean.getInv_Title() ); insDoc.put("invent_Type" , pbcBean.getInvent_Type() ); insDoc.put("inventor" , pbcBean.getInventor() ); insDoc.put("ipc_Standard" , pbcBean.getIpc_Standard() ); insDoc.put("locarno" , pbcBean.getLocarno() ); insDoc.put("operation_Time" , pbcBean.getOperation_Time() ); insDoc.put("operation_Type" , pbcBean.getOperation_Type() ); insDoc.put("package_Name" , pbcBean.getPackage_Name() ); insDoc.put("pct_App_Cn" , pbcBean.getPct_App_Cn() ); insDoc.put("pct_App_Date" , pbcBean.getPct_App_Date() ); insDoc.put("pct_App_Sn" , pbcBean.getPct_App_Sn() ); 
insDoc.put("pct_Date" , pbcBean.getPct_Date() ); insDoc.put("pct_Pub_Cn" , pbcBean.getPct_Pub_Cn() ); insDoc.put("pct_Pub_Date" , pbcBean.getPct_Pub_Date() ); insDoc.put("pct_Pub_Lang" , pbcBean.getPct_Pub_Lang() ); insDoc.put("pct_Pub_Sn" , pbcBean.getPct_Pub_Sn() ); insDoc.put("prn" , pbcBean.getPrn() ); insDoc.put("prn_Cn" , pbcBean.getPrn_Cn() ); insDoc.put("prn_Date" , pbcBean.getPrn_Date() ); insDoc.put("prn_Sn" , pbcBean.getPrn_Sn() ); insDoc.put("prn_Type" , pbcBean.getPrn_Type() ); insDoc.put("pub_Cn" , pbcBean.getPub_Cn() ); insDoc.put("pub_Date" , pbcBean.getPub_Date() ); insDoc.put("pub_Sn" , pbcBean.getPub_Sn() ); insDoc.put("pub_Type" , pbcBean.getPub_Type() ); insDoc.put("uc" , pbcBean.getUc() ); collection.insert(insDoc); insDoc.clear(); } // 循环遍历pdaBeanList // System.out.println("loading ..."); // logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size()); // 当前记录编号 // int currentRowNumber = 0; if ( m != null) m.close(); // System.out.println("Load finished."); } /** 记录日志 */ private void logMemory(String behavior, String content) { // 向服务器发送日志 // sendLog.send("syncPacker", behavior, content); // 记录本地日志 logger.info(DateUtils.getNow() + " " + behavior + " :" + content); // 控制台输出日志 // System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content); } @Override public String insert() throws Exception { return null; } @Override public String update() throws Exception { return null; } @Override public String selectList() throws Exception { return null; } @Override public String delete() throws Exception { return null; } public static boolean isRunning() { return isRunning; } public static void setRunning(boolean isRunning) { SyncPacker_201603.isRunning = isRunning; } public Integer getCenterNodeDataVersion() { return centerNodeDataVersion; } public void setCenterNodeDataVersion(Integer centerNodeDataVersion) { this.centerNodeDataVersion = centerNodeDataVersion; } public String getSuccess() { return success; } public void 
setSuccess(String success) { this.success = success; } public String getMaxPackageNumber() { return maxPackageNumber; } public void setMaxPackageNumber(String maxPackageNumber) { this.maxPackageNumber = maxPackageNumber; } public String getPackedPackageNumber() { return packedPackageNumber; } public void setPackedPackageNumber(String packedPackageNumber) { this.packedPackageNumber = packedPackageNumber; } public SyncDataPackageBean getBean() { return bean; } public void setBean(SyncDataPackageBean bean) { this.bean = bean; } }
下面是早期版本的一个备份(代码中带有行号),留作记录:
1 package syncPacker; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileReader; 6 import java.io.FileWriter; 7 import java.io.IOException; 8 import java.net.UnknownHostException; 9 import java.util.ArrayList; 10 import java.util.Date; 11 import java.util.HashMap; 12 import java.util.List; 13 import java.util.Map; 14 15 import org.apache.http.message.BasicNameValuePair; 16 import org.apache.log4j.Logger; 17 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.stereotype.Service; 20 21 import com.mongodb.BasicDBObject; 22 import com.mongodb.DB; 23 import com.mongodb.DBCollection; 24 import com.mongodb.Mongo; 25 import com.pro.framework.action.BaseController; 26 27 import logCenter.SendLog; 28 import net.sf.json.JSONObject; 29 import syncPacker.bean.PatentBibliographicChangeBean; 30 import syncPacker.bean.SyncDataPackageBean; 31 import utils.DateUtils; 32 import utils.DatetimeUtils; 33 import utils.HttpUtils; 34 35 /** 36 * 增量数据分块打包 全处理 37 * 38 * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action 39 * 40 * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean. 
41 * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000 42 */ 43 @Service 44 public class SyncPacker_201603 extends BaseController { 45 46 private static final long serialVersionUID = 1L; 47 48 // 初始化:数据库接口 49 @Autowired 50 private SyncPackerDao dao; 51 52 // 初始化:发送端地址 53 @Value("${url_syncSender}") 54 private String url_syncSender; 55 56 // 初始化:本地文件存储路径 57 @Value("${path_syncPacker_package}") 58 private String path_syncPacker_package; 59 60 // 初始化:读取最大数据包名称的地址 61 @Value("${url_selectMaxPackageNumber}") 62 private String url_selectMaxPackageNumber; 63 64 // 初始化:存储最大数据包名称的地址 65 @Value("${url_insertSyncDataPackage}") 66 private String url_insertSyncDataPackage; 67 68 // 初始化:查询条件Bean 69 private SyncDataPackageBean bean = new SyncDataPackageBean(); 70 // 初始化:查询结果List 71 private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>(); 72 // 初始化:形成的数据包名称 73 private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>(); 74 // 初始化:已打包的增量数据ID表单 75 private List<String> pdaIdList = new ArrayList<String>(); 76 // 初始化:用于删除数据的临时ID List 77 private List<String> idPartList = new ArrayList<String>(); 78 // 初始化:传输协议 79 private HttpUtils httpUtils = new HttpUtils(); 80 // 初始化:键值串 81 private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>(); 82 // POST 结果 83 private String str_postResult; 84 // 初始化:发送url返回的信息 85 private JSONObject json_postResult; 86 // 初始化:历史最大包编号 87 private String maxPackageNumber; 88 // 初始化:记录打包完成后的数据版本 89 private Integer centerNodeDataVersion; 90 // 发送远程日志 91 private SendLog sendLog = new SendLog(); 92 // 记录本地日志 93 private Logger logger = Logger.getLogger(SyncPacker_201603.class); 94 // 初始化:判断程序是否正在运行 95 public static boolean isRunning = false; 96 97 // 本次处理完成后的最大包编号 98 private String packedPackageNumber; 99 // 用于返回json的成功信息 100 private String success = "success"; 101 102 /** 主处理:数据同步_增量数据分块打包 */ 103 // @Scheduled(cron = 
"${scheduled_syncPacker}") 104 public String pack() { 105 106 logMemory("scheduled_syncPacker start", ""); 107 if (isRunning) { 108 logger.info(DateUtils.getNow() + "正在运行,退出"); 109 return SUCCESS; 110 } 111 isRunning = true; 112 // 数据包数量 113 int packagesAmount = 0; 114 Date startTime = new Date(); 115 116 for (int i = 1; i <= 1; i++) { 117 logMemory("循环内部(单个数据包)开始:内部编号", "" + i); 118 119 try { 120 // 1.读取定量数据 121 selectPbcList(); 122 if (pbcList.size() <= 0) { 123 logMemory("无数据", "退出"); 124 break; 125 } 126 127 // 2.形成文件 128 if (save2disk()) { 129 packagesAmount++; 130 // 3.转移数据 131 transferFromChange2History(); 132 } 133 } catch (Exception e) { 134 // 日志输出 135 logMemory("系统发生异常", e.getMessage()); 136 e.printStackTrace(); 137 break; 138 } finally { 139 // 清空临时List 140 paramList.clear(); 141 pbcList.clear(); 142 pbcList_withPackageName.clear(); 143 pdaIdList.clear(); 144 System.gc(); 145 } 146 logMemory("循环内部(单个数据包)结束,内部编号", "" + i); 147 } 148 // loop end 149 150 // 清空临时List 151 paramList.clear(); 152 pbcList.clear(); 153 pbcList_withPackageName.clear(); 154 pdaIdList.clear(); 155 System.gc(); 156 157 isRunning = false; 158 logMemory("本次打包结束,新增数据包数量:" + packagesAmount + ",历时:" 159 + DatetimeUtils.getDistanceTimes_string(startTime, new Date()), ""); 160 logMemory("finished", "scheduled_syncPacker end"); 161 162 return SUCCESS; 163 } 164 165 /** selectPbcList() */ 166 // 读取 专利著录变化 数据 数据库有连接失败的时候,尝试 10 次 167 private void selectPbcList() { 168 169 // 日志输出 170 logMemory("增量数据分块打包开始执行", ""); 171 172 for (int i = 0; i < 10; i++) { 173 try { 174 // -------------- 查询分块著录信息增量数据 -------------- 175 logMemory("准备读取数据", ""); 176 // 每次取10000 177 pbcList = dao.selectList(bean); 178 logMemory("读取完毕", "数据量" + pbcList.size()); 179 return; 180 } catch (Exception e) { 181 // 日志输出 182 logMemory("系统发生异常", e.getMessage()); 183 e.printStackTrace(); 184 } 185 } 186 // loop end 187 return; 188 } 189 190 /** 191 * save2disk() 192 * 193 * 将打包文件存盘 194 */ 195 private boolean save2disk() throws 
Exception { 196 197 // -------------- 分块打包增量数据 -------------- 198 199 // 查询上一次处理最后生成的数据包名称 200 logMemory("准备查询最后一个数据包名称", ""); 201 202 // 发送url,请求对应服务器返回最大数据包名称 203 // syncDataPackageBean = dao.lastDataPackageInfo(0); 204 str_postResult = httpUtils.post(url_selectMaxPackageNumber, null); 205 logMemory("取最大包编号:" + url_selectMaxPackageNumber, " 返回 " + str_postResult); 206 207 // 判断是否成功发送信息 208 if (str_postResult == null || "".equals(str_postResult)) { 209 // 发送失败,跳出循环,结束程序 210 logMemory("失败!", " 返回为空"); 211 isRunning = false; 212 return false; 213 } 214 215 // 解析Post返回的Json 216 json_postResult = JSONObject.fromObject(str_postResult); 217 218 // 判断是否成功获取到最大包编号 219 if ("success".equalsIgnoreCase(json_postResult.getString("success"))) { 220 // 发送成功,进行下一步操作 221 maxPackageNumber = json_postResult.get("maxPackageNumber").toString(); 222 } else { 223 // 发送失败,跳出循环,结束程序 224 logMemory("失败!", "没有返回成功标志。"); 225 isRunning = false; 226 return false; 227 } 228 229 logMemory("上一次最后一个数据包名称", maxPackageNumber); 230 logMemory("准备将数据写入文件", " 路径:" + path_syncPacker_package); 231 232 // 得到新数据包名,同时分块打包增量数据 233 pbcList_withPackageName = DataPacking_2.CompressPdaData(pbcList, maxPackageNumber, path_syncPacker_package); 234 235 // 日志输出 236 logMemory("新数据包名称", pbcList_withPackageName.get(0).getPackage_Name() + ".txt"); 237 logMemory("转换完成,数量", String.valueOf(pbcList_withPackageName.size())); 238 239 // 记录打包完成的数据版本信息(流水号) 240 centerNodeDataVersion = Integer.parseInt(pbcList_withPackageName.get(0).getPackage_Name()); 241 242 // -------------- 记录已完成的数据包名称 -------------- 243 // 发送打包完成的数据包名称(流水号) 244 // dao.insertPacName(syncDataPackageBean); 245 paramList = new ArrayList<BasicNameValuePair>(); 246 paramList.add(0, 247 new BasicNameValuePair("packedPackageNumber", pbcList_withPackageName.get(0).getPackage_Name())); 248 249 str_postResult = httpUtils.post(url_insertSyncDataPackage, paramList); 250 logMemory("新增数据包:" + url_insertSyncDataPackage, " 返回 " + str_postResult); 251 252 // 判断是否成功发送信息 253 if 
(str_postResult == null || "".equals(str_postResult)) { 254 // 发送失败,跳出循环,结束程序 255 logMemory("失败!", " 返回为空"); 256 isRunning = false; 257 return false; 258 } else { 259 // 解析Post返回的Json 260 json_postResult = JSONObject.fromObject(str_postResult); 261 // 判断是否成功发送信息 262 if (!"success".equalsIgnoreCase(json_postResult.getString("success"))) { 263 // 发送失败,跳出循环,结束程序 264 logMemory("失败!", "没有返回成功标志。"); 265 isRunning = false; 266 return false; 267 } 268 } 269 logMemory("新增数据包,完成", ""); 270 return true; 271 } 272 273 /** 274 * transferFromChange2History 275 * 276 * 将数据从 Change 表移至 History 表 277 */ 278 private void transferFromChange2History() throws Exception { 279 280 // -------------- 添加数据至归档表 -------------- 281 logMemory("准备将已打包的记录移入归档表,数据量:", String.valueOf(pbcList_withPackageName.size())); 282 283 // 根据打包后的全量数据量,循环执行插入历史表 284 for (int j = 0; j < pbcList_withPackageName.size(); j++) { 285 // 执行SQL插入操作语句 286 dao.appendIntoChangeHistory(pbcList_withPackageName.get(j)); 287 pdaIdList.add(pbcList_withPackageName.get(j).getId()); 288 } 289 // 日志输出 290 logMemory("插入归档表,结束", String.valueOf(pdaIdList.size())); 291 292 // -------------- 删除操作表的数据 -------------- 293 logMemory("准备删除专利著录信息变化表中的数据,数据量", String.valueOf(pdaIdList.size())); 294 // List元素数量 295 double total = pdaIdList.size(); 296 // 总SQL删除语句执行次数 297 int loop = (int) Math.ceil(total / 1000); 298 // List下标控制 299 int idnumber; 300 for (int k = 0; k < loop; k++) { 301 // 每次只能删除 1000 条,否则SQL长度超长,出错。 302 for (int z = 0; z < 1000; z++) { 303 idnumber = k * 1000 + z; 304 if (idnumber >= total) { 305 break; 306 } 307 // 循环拼接删除条件List 308 idPartList.add(pbcList_withPackageName.get(idnumber).getId()); 309 } 310 // 执行批量删除操作 311 dao.removeFromChangeTable(idPartList); 312 idPartList.clear(); 313 } 314 // 日志输出 315 logMemory("全部删除完毕,删除数据量:", String.valueOf(pdaIdList.size())); 316 } 317 318 /** 获取历史最大Json编号 */ 319 // http://10.78.2.22:8080/PatentSearchExtend/syncPacker!selectMaxPackageNum.action 320 public String selectMaxPackageNum() { 
321 try { 322 maxPackageNumber = dao.getMaxPackageNumber(); 323 } catch (Exception e) { 324 e.printStackTrace(); 325 this.setSuccess(false); 326 } 327 return SUCCESS; 328 } 329 330 /** 存储当前打包Json编号 */ 331 // http://10.78.2.22:8080/PatentSearchExtend/syncPacker!insertSyncDataPackage.action 332 public String insertSyncDataPackage() { 333 try { 334 dao.insertSyncDataPackage(packedPackageNumber); 335 } catch (Exception e) { 336 e.printStackTrace(); 337 this.setSuccess(false); 338 } 339 return SUCCESS; 340 } 341 342 /** 文件补发用:指定数据重新打包 */ 343 // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102 344 public String packByPackageNumber() throws Exception { 345 346 logMemory("本次请求处理开始。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() ); 347 348 try { 349 StringBuffer fileFullName; 350 // 函数计算用Integer型数据 351 // int packageNumberStart = Integer.valueOf(bean.getPackageNumberStart()); 352 // int packageNumberEnd = Integer.valueOf(bean.getPackageNumberEnd()); 353 // 重新设置bean地数据包编号 354 // bean.setPackageNumberStart(String.format("%06d", packageNumberStart)); 355 // bean.setPackageNumberEnd(String.format("%06d", packageNumberEnd)); 356 // 输出日志 357 // logMemory("本次手动补包的开始编号为", bean.getPackageNumberStart()); 358 // logMemory("本次手动补包的结束编号为", bean.getPackageNumberEnd()); 359 360 // 打包指定数据包编号对应的数据 361 // for (int i = 0; i < Integer.valueOf(bean.getPackageNumberEnd()) 362 // - Integer.valueOf(bean.getPackageNumberStart()) + 1; i++) { 363 for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 364 i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) { 365 366 String date_start = DateUtils.getCurrentTimeString(0); 367 368 String packageNumber = String.format( "%06d", i ); 369 370 logMemory("开始,包编号:", packageNumber ); 371 372 // 删除现有文件 373 // logMemory("开始配置数据包存储路径", ""); 374 // fileFullName = new StringBuffer(); 375 //// fileFullName.append(path_syncPacker_package); 376 // 
fileFullName.append("d:/Patent_Bibliographic_Change_Packages/PBC_"); 377 // fileFullName.append( packageNumber ); 378 // fileFullName.append(".txt"); 379 // logMemory("数据包全路径为", fileFullName.toString()); 380 // 381 // File file; 382 // file = new File(fileFullName.toString()); 383 // if(file.exists()&&file.isFile()) file.delete(); 384 385 // if (file.isFile() && file.exists()) { 386 // logMemory("原数据包存在", fileFullName.toString()); 387 // file.delete(); 388 // logMemory("原数据包已删除", fileFullName.toString()); 389 // } else { 390 // logMemory("原数据包不存在,准备创建新文件", fileFullName.toString()); 391 // } 392 393 // 打包数据 394 // logMemory("设置新的文件包编号", String.format("%06d", "" + i)); 395 // bean.setPackageNumber(String.format("%06d", "" + i)); 396 // pbcList = dao.repackList(bean); 397 398 pbcList = packByPackageNumber_readHistory( packageNumber ); 399 400 if( null == pbcList ) continue ; 401 402 logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() )); 403 404 // Map<String, JSONObject> bibliographicMap = new HashMap(); 405 // FileWriter writer = new FileWriter( fileFullName.toString(), true); 406 // 407 // for( PatentBibliographicChangeBean pbcBean : pbcList ){ 408 // 409 // // 读取 著录 数据 410 // bibliographicMap.put( pbcBean.getId(), packByPackageNumber_readBibl( pbcBean.getId() )); 411 // try { 412 // //打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件 413 // writer.write( packByPackageNumber_readBibl( pbcBean.getId() ).toString() + ‘\r‘ ); 414 // } catch (IOException e) { 415 // e.printStackTrace(); 416 // } 417 // } 418 // writer.close(); 419 insertMongoDB( pbcList ); 420 421 //// File file = new File(fileName); 422 // BufferedReader reader = null; 423 // try { 424 // System.out.println("以行为单位读取文件内容,一次读一整行:"); 425 // reader = new BufferedReader(new FileReader(file)); 426 // String tempString = null; 427 // int line = 1; 428 // // 一次读入一行,直到读入null为文件结束 429 // while ((tempString = reader.readLine()) != null) { 430 // // 显示行号 431 // System.out.println("line " + line + ": " + tempString); 432 // 
line++; 433 // } 434 // reader.close(); 435 // } catch (IOException e) { 436 // e.printStackTrace(); 437 // } finally { 438 // if (reader != null) { 439 // try { 440 // reader.close(); 441 // } catch (IOException e1) { 442 // } 443 // } 444 // } 445 446 // for( String mapKey : bibliographicMap.keySet()){ 447 // JSONObject bibliographic = bibliographicMap.get( mapKey ); 448 // System.out.println( bibliographic ); 449 // } 450 451 // logMemory("数据打包完毕数据", fileFullName.toString()); 452 pbcList.clear(); 453 454 logMemory(DateUtils.getNow() + " 传输结束,包编号:" + packageNumber ," 用时: " 455 + DatetimeUtils.getDistanceTimes_string(date_start, DateUtils.getCurrentTimeString(0))); 456 } 457 } catch (Exception e) { 458 // 日志输出 459 logMemory("系统发生异常", e.getMessage()); 460 e.printStackTrace(); 461 } 462 logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() ); 463 464 return SUCCESS; 465 } 466 467 468 /** 469 * 读历史表 470 * @param packageNumber 471 * @return 472 */ 473 private List<PatentBibliographicChangeBean> packByPackageNumber_readHistory( String packageNumber ){ 474 475 for (int i = 0; i < 10; i++) { 476 try { 477 return dao.packByPackageNumber_readHistory( packageNumber ); 478 } catch (Exception e) { 479 // TODO Auto-generated catch block 480 e.printStackTrace(); 481 logMemory("系统发生异常", e.getMessage()); 482 try { 483 Thread.sleep(500); 484 } catch (InterruptedException e1) { 485 // TODO Auto-generated catch block 486 e1.printStackTrace(); 487 } 488 } 489 }// loop end 490 return null ; 491 492 } 493 494 495 /** 496 * 读 著录数据 497 * @return 498 */ 499 private String packByPackageNumber_readBibliographic( String patentId ){ 500 501 StringBuffer sb = new StringBuffer(); 502 503 try { 504 pbcList = dao.packByPackageNumber_readBibliographic( patentId ); 505 506 for( PatentBibliographicChangeBean pbcBean : pbcList ){ 507 // 读取 著录 数据 508 sb.append( pbcBean.toString() ); 509 } 510 511 } catch (Exception e) { 512 // TODO Auto-generated catch block 513 
e.printStackTrace(); 514 } 515 return sb.toString() ; 516 } 517 518 private JSONObject packByPackageNumber_readBibl( String patentId ){ 519 520 StringBuffer sb = new StringBuffer(); 521 JSONObject json = new JSONObject(); 522 523 for (int i = 0; i < 10; i++) { 524 try { 525 pbcList = dao.packByPackageNumber_readBibliographic( patentId ); 526 527 for( PatentBibliographicChangeBean pbcBean : pbcList ){ 528 // 读取 著录 数据 529 json = JSONObject.fromObject( pbcBean ); 530 } 531 return json ; 532 533 } catch (Exception e) { 534 // TODO Auto-generated catch block 535 e.printStackTrace(); 536 logMemory("系统发生异常", e.getMessage()); 537 try { 538 Thread.sleep(500); 539 } catch (InterruptedException e1) { 540 // TODO Auto-generated catch block 541 e1.printStackTrace(); 542 } 543 } 544 }// loop end 545 546 return null ; 547 } 548 549 550 /** 551 * 插入 MongoDB 552 */ 553 public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) { 554 555 //# MongoDB(数据加载目标) 556 String syncLoadIntoMongoDbService = "10.78.2.23:27017"; 557 String syncLoadIntoMongoDbName = "patent_search_extend"; 558 String syncLoadIntoMongoTable = "patent_bibliographic_20160319"; 559 560 // 加载开始 561 // logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService ); 562 563 Mongo m = null; 564 try { 565 m = new Mongo( syncLoadIntoMongoDbService ); 566 } catch (UnknownHostException e) { 567 e.printStackTrace(); 568 logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage()); 569 } 570 571 // 库名 572 DB db = m.getDB( syncLoadIntoMongoDbName ); 573 // logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName ); 574 575 // 表名 576 DBCollection collection = db.getCollection( syncLoadIntoMongoTable ); 577 // logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable ); 578 579 // 循环列表,将每个元素插入数据库 580 for( PatentBibliographicChangeBean pbcBean : pbcList ){ 581 582 // 读取一条著录数据 583 JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() ); 584 585 if( null == 
json ) return ; 586 587 // 列,值 588 BasicDBObject insDoc = new BasicDBObject(); 589 590 insDoc.put("abstract_No" , json.get( "abstract_No" ).toString()); 591 insDoc.put("app_Addr" , json.get( "app_Addr" ).toString()); 592 insDoc.put("app_Cn" , json.get( "app_Cn" ).toString()); 593 insDoc.put("app_Country" , json.get( "app_Country" ).toString()); 594 insDoc.put("app_Date" , json.get( "app_Date" ).toString()); 595 insDoc.put("app_Name" , json.get( "app_Name" ).toString()); 596 insDoc.put("app_Sn" , json.get( "app_Sn" ).toString()); 597 insDoc.put("app_Type" , json.get( "app_Type" ).toString()); 598 insDoc.put("app_Zip" , json.get( "app_Zip" ).toString()); 599 insDoc.put("ecla" , json.get( "ecla" ).toString()); 600 insDoc.put("fi" , json.get( "fi" ).toString()); 601 insDoc.put("ft" , json.get( "ft" ).toString()); 602 insDoc.put("id" , json.get( "id" ).toString()); 603 insDoc.put("inv_Title" , json.get( "inv_Title" ).toString()); 604 insDoc.put("invent_Type" , json.get( "invent_Type" ).toString()); 605 insDoc.put("inventor" , json.get( "inventor" ).toString()); 606 insDoc.put("ipc_Standard" , json.get( "ipc_Standard" ).toString()); 607 insDoc.put("locarno" , json.get( "locarno" ).toString()); 608 insDoc.put("operation_Time" , json.get( "operation_Time" ).toString()); 609 insDoc.put("operation_Type" , json.get( "operation_Type" ).toString()); 610 insDoc.put("package_Name" , json.get( "package_Name" ).toString()); 611 insDoc.put("pct_App_Cn" , json.get( "pct_App_Cn" ).toString()); 612 insDoc.put("pct_App_Date" , json.get( "pct_App_Date" ).toString()); 613 insDoc.put("pct_App_Sn" , json.get( "pct_App_Sn" ).toString()); 614 insDoc.put("pct_Date" , json.get( "pct_Date" ).toString()); 615 insDoc.put("pct_Pub_Cn" , json.get( "pct_Pub_Cn" ).toString()); 616 insDoc.put("pct_Pub_Date" , json.get( "pct_Pub_Date" ).toString()); 617 insDoc.put("pct_Pub_Lang" , json.get( "pct_Pub_Lang" ).toString()); 618 insDoc.put("pct_Pub_Sn" , json.get( "pct_Pub_Sn" ).toString()); 619 
insDoc.put("prn" , json.get( "prn" ).toString()); 620 insDoc.put("prn_Cn" , json.get( "prn_Cn" ).toString()); 621 insDoc.put("prn_Date" , json.get( "prn_Date" ).toString()); 622 insDoc.put("prn_Sn" , json.get( "prn_Sn" ).toString()); 623 insDoc.put("prn_Type" , json.get( "prn_Type" ).toString()); 624 insDoc.put("pub_Cn" , json.get( "pub_Cn" ).toString()); 625 insDoc.put("pub_Date" , json.get( "pub_Date" ).toString()); 626 insDoc.put("pub_Sn" , json.get( "pub_Sn" ).toString()); 627 insDoc.put("pub_Type" , json.get( "pub_Type" ).toString()); 628 insDoc.put("uc" , json.get( "uc" ).toString()); 629 630 collection.insert(insDoc); 631 insDoc.clear(); 632 633 } 634 635 // 循环遍历pdaBeanList 636 // System.out.println("loading ..."); 637 // logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size()); 638 639 // 当前记录编号 640 // int currentRowNumber = 0; 641 642 if ( m != null) m.close(); 643 644 // System.out.println("Load finished."); 645 } 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 /** 记录日志 */ 665 private void logMemory(String behavior, String content) { 666 // 向服务器发送日志 667 // sendLog.send("syncPacker", behavior, content); 668 // 记录本地日志 669 logger.info(DateUtils.getNow() + " " + behavior + " :" + content); 670 // 控制台输出日志 671 // System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content); 672 } 673 674 @Override 675 public String insert() throws Exception { 676 return null; 677 } 678 679 @Override 680 public String update() throws Exception { 681 return null; 682 } 683 684 @Override 685 public String selectList() throws Exception { 686 return null; 687 } 688 689 @Override 690 public String delete() throws Exception { 691 return null; 692 } 693 694 public static boolean isRunning() { 695 return isRunning; 696 } 697 698 public static void setRunning(boolean isRunning) { 699 SyncPacker_201603.isRunning = isRunning; 700 } 701 702 public Integer getCenterNodeDataVersion() { 703 return centerNodeDataVersion; 704 } 705 
706 public void setCenterNodeDataVersion(Integer centerNodeDataVersion) { 707 this.centerNodeDataVersion = centerNodeDataVersion; 708 } 709 710 public String getSuccess() { 711 return success; 712 } 713 714 public void setSuccess(String success) { 715 this.success = success; 716 } 717 718 public String getMaxPackageNumber() { 719 return maxPackageNumber; 720 } 721 722 public void setMaxPackageNumber(String maxPackageNumber) { 723 this.maxPackageNumber = maxPackageNumber; 724 } 725 726 public String getPackedPackageNumber() { 727 return packedPackageNumber; 728 } 729 730 public void setPackedPackageNumber(String packedPackageNumber) { 731 this.packedPackageNumber = packedPackageNumber; 732 } 733 734 public SyncDataPackageBean getBean() { 735 return bean; 736 } 737 738 public void setBean(SyncDataPackageBean bean) { 739 this.bean = bean; 740 } 741 }
# log4j.rootLogger = INFO, stdout log4j.rootLogger= INFO, syncLoader # + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + # 数据加载器( syncLoader ) # + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + log4j.logger.com.neusoft.patent.search.extend.syncLoader.SyncLoaderAction = INFO, syncLoader # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - log4j.appender.syncLoader.File = ${webapp.root}/logs/syncLoader_20160318/log.html log4j.appender.syncLoader = org.apache.log4j.RollingFileAppender log4j.appender.syncLoader.Encoding = GBK log4j.appender.syncLoader.MaxFileSize = 5MB log4j.appender.syncLoader.MaxBackupIndex = 100 log4j.appender.syncLoader.Threshold = INFO log4j.appender.syncLoader.layout = org.apache.log4j.HTMLLayout
SyncPackerDao.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
  PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="syncPacker.SyncPackerDao">

    <!--
        Select one package worth of bibliographic change rows, oldest first,
        joined with their bibliographic data from sipo_vdb.

        Oracle assigns ROWNUM before ORDER BY is applied, so the change table is
        sorted in an inner query first and the row limit is applied one level up;
        limiting and sorting in the same query block would pick arbitrary rows.
        The outer ORDER BY keeps the result ordered after the join.

        NOTE(review): ${tableName} is substituted into the SQL text as-is (an
        identifier cannot be a bind variable). The caller must restrict it to a
        fixed set of known table names before this statement runs.
    -->
    <select id="selectList"
            parameterType="syncPacker.bean.SyncDataPackageBean"
            resultType="syncPacker.bean.PatentBibliographicChangeBean">
        select t.id,
               t.operation_time,
               t.operation_type,
               sv.app_cn,
               sv.app_sn,
               sv.app_type,
               sv.app_date,
               sv.pub_cn,
               sv.pub_sn,
               sv.pub_type,
               sv.pub_date,
               sv.invent_type,
               sv.app_name,
               sv.inventor,
               sv.app_country,
               sv.app_addr,
               sv.app_zip,
               sv.inv_title,
               sv.abstract_no,
               sv.pct_date,
               sv.pct_app_cn,
               sv.pct_app_sn,
               sv.pct_app_date,
               sv.pct_pub_cn,
               sv.pct_pub_sn,
               sv.pct_pub_lang,
               sv.pct_pub_date,
               sv.ipc_standard,
               sv.ecla,
               sv.uc,
               sv.fi,
               sv.ft,
               sv.prn_cn,
               sv.prn_sn,
               sv.prn_type,
               sv.prn_date,
               sv.prn,
               sv.locarno
          from (select s.id, s.operation_time, s.operation_type
                  from (select c.id, c.operation_time, c.operation_type
                          from ${tableName} c
                         order by c.operation_time asc) s
                 where rownum &lt;= #{maxRowsPerSyncPackerPackage,jdbcType=NUMERIC}) t
          left join sipo_vdb sv
            on t.id = sv.id
         order by t.operation_time asc
    </select>

    <!-- Append one packed change row to the archive (history) table. -->
    <insert id="appendIntoChangeHistory"
            parameterType="syncPacker.bean.PatentBibliographicChangeBean">
        insert into e_bibliographic_change_history
            (id,
             operation_time,
             operation_type,
             package_number)
        values
            (#{Id},
             #{Operation_Time},
             #{Operation_Type},
             #{Package_Name})
    </insert>

    <!--
        Delete the given ids from the working change table.
        NOTE(review): an empty list renders "in ()" and Oracle rejects IN lists
        with more than 1000 literals; the caller presumably passes bounded,
        non-empty chunks. Confirm against the caller.
    -->
    <delete id="removeFromChangeTable" parameterType="java.util.List">
        delete from
            e_bibliographic_change_temp
        where id in
        <foreach collection="list" index="index" item="item" open="(" separator="," close=")">
            #{item}
        </foreach>
    </delete>

    <!--
        Read the largest package number recorded so far (0 when the table is empty).
        NOTE(review): ifnull is MySQL syntax; if sync_data_package lives in the
        Oracle schema this must be nvl(...). Confirm which data source runs it.
    -->
    <select id="getMaxPackageNumber" resultType="String">
        select
            ifnull(max(sync_data_package.packageNumber),0) packageNumber
        from
            sync_data_package
    </select>

    <!-- Record a package number in the send-record table. -->
    <insert id="insertSyncDataPackage" parameterType="String">
        insert into
            sync_data_package
            (packageNumber)
        values
            (#{theMaxSerialNumber})
    </insert>

    <!--
        Re-pack: read the rows archived under one package number together with
        their bibliographic data, ordered by transfer time. The ordering is done
        on the outer query because the order of a derived table is not
        guaranteed to survive the join.
    -->
    <select id="repackList"
            parameterType="syncPacker.bean.SyncDataPackageBean"
            resultType="syncPacker.bean.PatentBibliographicChangeBean">
        select t.id,
               t.operation_time,
               t.operation_type,
               sv.app_cn,
               sv.app_sn,
               sv.app_type,
               sv.app_date,
               sv.pub_cn,
               sv.pub_sn,
               sv.pub_type,
               sv.pub_date,
               sv.invent_type,
               sv.app_name,
               sv.inventor,
               sv.app_country,
               sv.app_addr,
               sv.app_zip,
               sv.inv_title,
               sv.abstract_no,
               sv.pct_date,
               sv.pct_app_cn,
               sv.pct_app_sn,
               sv.pct_app_date,
               sv.pct_pub_cn,
               sv.pct_pub_sn,
               sv.pct_pub_lang,
               sv.pct_pub_date,
               sv.ipc_standard,
               sv.ecla,
               sv.uc,
               sv.fi,
               sv.ft,
               sv.prn_cn,
               sv.prn_sn,
               sv.prn_type,
               sv.prn_date,
               sv.prn,
               sv.locarno
          from (select h.id, h.operation_time, h.operation_type, h.package_number, h.transfer_time
                  from E_BIBLIOGRAPHIC_CHANGE_HISTORY h
                 where h.package_number = #{packageNumber}) t
          left join sipo_vdb sv
            on t.id = sv.id
         order by t.transfer_time asc
    </select>

    <!--
        Re-pack: read the bibliographic data of every record archived under
        one package number.
        Parameter: the package number.
    -->
    <select id="selectList_changeHistory"
            parameterType="java.lang.String"
            resultType="syncPacker.bean.PatentBibliographicChangeBean">

        select
            sv.id,
            sv.app_cn,
            sv.app_sn,
            sv.app_type,
            sv.app_date,
            sv.pub_cn,
            sv.pub_sn,
            sv.pub_type,
            sv.pub_date,
            sv.invent_type,
            sv.app_name,
            sv.inventor,
            sv.app_country,
            sv.app_addr,
            sv.app_zip,
            sv.inv_title,
            sv.abstract_no,
            sv.pct_date,
            sv.pct_app_cn,
            sv.pct_app_sn,
            sv.pct_app_date,
            sv.pct_pub_cn,
            sv.pct_pub_sn,
            sv.pct_pub_lang,
            sv.pct_pub_date,
            sv.ipc_standard,
            sv.ecla,
            sv.uc,
            sv.fi,
            sv.ft,
            sv.prn_cn,
            sv.prn_sn,
            sv.prn_type,
            sv.prn_date,
            sv.prn,
            sv.locarno

        from
            sipo_vdb sv

        where
            exists (
                select 1 from e_bibliographic_change_history t
                where sv.ID = t.id and t.package_number = #{packageNumber}
            )

    </select>

</mapper>
标签:
原文地址:http://www.cnblogs.com/livon/p/5296045.html