1.编写BinSFTPDataSource数据源,用于生成响应的InputStream流.编写过程中注意流的关闭,否则容易造成Too many files 异常.
package org.apache.solr.handler.dataimport; import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE; import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.regex.Pattern; /** */ public class BinSFTPDataSource extends DataSource<InputStream> { Logger LOG = LoggerFactory.getLogger(BinSFTPDataSource.class); private Session session ; private ChannelSftp channel; private InputStream is; private String baseUrl; private String username; private String password; private String host; private int connectionTimeout = CONNECTION_TIMEOUT; private int readTimeout = READ_TIMEOUT; private Context context; private Properties initProps; public BinSFTPDataSource() { } @Override public void init(Context context, Properties initProps) { this.context = context; this.initProps = initProps; baseUrl = getInitPropWithReplacements(BASE_URL); String cTimeout = getInitPropWithReplacements(CONNECTION_TIMEOUT_FIELD_NAME); String rTimeout = getInitPropWithReplacements(READ_TIMEOUT_FIELD_NAME); username = getInitPropWithReplacements(USERNAME); password = getInitPropWithReplacements(PASSWORD); host = getInitPropWithReplacements(HOST); if (cTimeout != null) { try { connectionTimeout = Integer.parseInt(cTimeout); } catch (NumberFormatException e) { LOG.warn("Invalid connection timeout: " + cTimeout); } } if (rTimeout != null) { try { readTimeout = Integer.parseInt(rTimeout); } catch (NumberFormatException e) { LOG.warn("Invalid read timeout: " + rTimeout); } } try { JSch jsch = new JSch(); // 创建JSch对象 session = jsch.getSession(username, host, PORT); if (password != null) session.setPassword(password); Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); session.setTimeout(readTimeout); session.connect(connectionTimeout); } catch (JSchException e) { close(); e.printStackTrace(); } } @Override public InputStream getData(String filename) { if(StringUtils.isEmpty(filename)) return null; if(StringUtils.isNotEmpty(baseUrl)) filename = baseUrl + filename; try { LOG.info("session isConnect:"+session.isConnected()); channel = (ChannelSftp) session.openChannel("sftp"); channel.connect(); // 建立SFTP通道的连接 LOG.info("channel isConnect:"+channel.isConnected()); is = channel.get(filename); return is; } catch (Exception e) { close(); LOG.error("Exception thrown while getting data", e); wrapAndThrow(SEVERE, e, "Exception in invoking url " +filename); return null;// unreachable } } @Override public void close() { if(is!=null) try { is.close(); } catch (IOException e) { e.printStackTrace(); } if(channel!=null) channel.disconnect(); // if(session!=null) session.disconnect(); } public String getBaseUrl() { return baseUrl; } private String getInitPropWithReplacements(String propertyName) { final String expr = initProps.getProperty(propertyName); if (expr == null) { return null; } return context.replaceTokens(expr); } static final Pattern URIMETHOD = Pattern.compile("sftp:/", Pattern.CASE_INSENSITIVE); public static final String ENCODING = "encoding"; public static final String BASE_URL = "baseUrl"; public static final String UTF_8 = "UTF-8"; public static final String CONNECTION_TIMEOUT_FIELD_NAME = "connectionTimeout"; public static final String READ_TIMEOUT_FIELD_NAME = "readTimeout"; public static final int CONNECTION_TIMEOUT = 5000; public static final int READ_TIMEOUT = 10000; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; public static final String HOST = "host"; public static final int PORT = 22; }
2. 编写URLListEntityProcessor.java类,用于循环遍历多url文件.
package org.apache.solr.handler.dataimport; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 主要用于抽取多个文件内容.可是是本地主机也可以是远程主机上的文件 */ public class URLListEntityProcessor extends EntityProcessorBase { /** * 文件名字符串 */ protected String fileNames; /** * 文件名字符串分隔符 */ protected String regex; /** * data-config.xml中给定的基础目录 */ protected String baseDir; /** * The recursive given in data-config. Default value is false. */ protected boolean recursive = false; @Override public void init(Context context) { super.init(context); fileNames = context.getEntityAttribute(FILE_NAMES); if (fileNames != null) { fileNames = context.replaceTokens(fileNames); } regex = context.getEntityAttribute(REGEX); if (regex != null) { regex = context.replaceTokens(regex); } baseDir = context.getEntityAttribute(BASE_DIR); if (baseDir == null) throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "‘baseDir‘ is a required attribute"); baseDir = context.replaceTokens(baseDir); String r = context.getEntityAttribute(RECURSIVE); if (r != null) recursive = Boolean.parseBoolean(r); } @Override public Map<String, Object> nextRow() { if (rowIterator != null) return getNext(); List<Map<String, Object>> fileDetails = new ArrayList<Map<String, Object>>(); getUrls(fileDetails); rowIterator = fileDetails.iterator(); return getNext(); } private void getUrls(final List<Map<String, Object>> fileDetails) { String[] names = fileNames.split(regex); for(String name : names){ Map<String, Object> details = new HashMap<String, Object>(); details.put(FILE_NAME, baseDir+name); fileDetails.add(details); } } public static final String DIR = "fileDir"; public static final String ABSOLUTE_FILE = "fileAbsolutePath"; public static final String FILE_NAME = "fileName"; public static final String FILE_NAMES = "fileNames"; public static final String BASE_DIR = "baseDir"; public static final String REGEX = "regex"; public static final String RECURSIVE = "recursive"; }
<dataConfig> <dataSource name="jdbc" driver="oracle.jdbc.driver.OracleDriver" url="jdbc:oracle:thin:@" user="kms_iep" password="kms_iep" batchSize="2000"/> <dataSource name="binSftp" type="BinSFTPDataSource" username="kms" password="kms" host="" connectionTimeout="10000" readTimeout="20000" /> <document> <entity pk="ID" dataSource="jdbc" name="province" query="select (provincecode || ‘_‘ || kng_id) as id, kng_id, kng_type as type, kng_title as title, provincecode, opertime, modify_date, url, pack_month_fee, pack_type, pack_sen_flow, filename_html, (‘/kmsinterface/jt/province_bak/‘ || provincecode || ‘/‘ || to_char(opertime, ‘yyyymmdd‘) || substr(filepath,instr(filepath,‘/‘,2)) || ‘/‘ ) as path, (‘/kmsinterface/‘ || provincecode ||filepath) as bakpath from IEP_UPLOAD_DOCUMENT t where kng_status = 0 and provincecode=‘ah‘ and to_char(opertime,‘yyyy-mm-dd‘)=‘2014-12-24‘ " deltaQuery="select (provincecode || ‘_‘ || kng_id) as id from IEP_UPLOAD_DOCUMENT where kng_status = 0 and opertime > to_date(‘${dih.last_index_time}‘,‘yyyy-mm-dd hh24:mi:ss‘) order by opertime asc" deltaImportQuery="select * from ( select (provincecode || ‘_‘ || kng_id) as id, kng_id, kng_type as type, kng_title as title, provincecode, opertime, modify_date, url, pack_month_fee, pack_type, pack_sen_flow, filename_html, (‘/kmsinterface/jt/province_bak/‘ || provincecode || ‘/‘ || to_char(opertime, ‘yyyymmdd‘) || substr(filepath,instr(filepath,‘/‘,2)) || ‘/‘ ) as path, (‘/kmsinterface/‘ || provincecode ||filepath) as bakpath from IEP_UPLOAD_DOCUMENT t where kng_status = 0 ) where id = ‘${dih.delta.ID}‘" deletePKQuery="select (provincecode || ‘_‘ || kng_id) as id from IEP_UPLOAD_DOCUMENT where kng_status = 1 and opertime > to_date(‘${dih.last_index_time}‘,‘yyyy-mm-dd hh24:mi:ss‘) order by id desc" transformer="DateFormatTransformer,RegexTransformer" onError="skip"> <field column="ID" name="id" /> <field column="KNG_ID" name="kng_id" /> <field column="type" name="type" /> <field column="TITLE" name="title" /> <field column="PROVINCECODE" name="provincecode" /> <field column="OPERTIME" name="opertime" dateTimeFormat="yyyy-MM-dd HH:mm:ss"/> <field column="MODIFY_DATE" name="modify_date" dateTimeFormat="yyyy-MM-dd HH:mm:ss"/> <field column="URL" name="url" /> <field column="PACK_MONTH_FEE" name="pack_month_fee" /> <field column="PACK_TYPE" name="pack_type" /> <field column="PACK_SEN_FLOW" name="pack_sen_flow" /> <entity name="urllist1" processor="URLListEntityProcessor" baseDir="/kms/solr${province.PATH}" fileNames="${province.FILENAME_HTML}" regex=","> <!--解析附件--> <entity name="test1" processor="TikaEntityProcessor" url="${urllist1.fileName}" dataSource="url" format="text" transformer="HTMLStripTransformer,RegexTransformer" onError="skip"> <field column="text" name="content" stripHTML="true" regex="\t|\r|\n|\s" replaceWith="" /> </entity> </entity> <entity name="urllist2" processor="URLListEntityProcessor" baseDir="/kms/solr${province.BAKPATH}" fileNames="${province.FILENAME_HTML}" regex=","> <entity name="test2" processor="TikaEntityProcessor" url="${urllist2.fileName}" dataSource="url" format="text" transformer="HTMLStripTransformer,RegexTransformer" onError="skip"> <field column="text" name="content" stripHTML="true" regex="\t|\r|\n|\s" replaceWith="" /> </entity> </entity> </entity> </document> </dataConfig>