package mongodb;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.util.ReflectionUtils;

// One "item:score" pair; sorts by score in descending order.
class Item implements Comparable<Item> {
    String value;
    double weight;

    public Item(String v) {
        value = v;
        weight = Double.parseDouble(value.split(":")[1]);
    }

    public int compareTo(Item o) {
        return this.weight == o.weight ? 0 : (this.weight > o.weight ? -1 : 1);
    }
}

public class BatchUpdateSim {

    // Turns one tab-separated line, e.g. "90003\t20718001:1.0,2077635:1.0,2053809:1.0",
    // into a document string: {"key":90003,"values":[{20718001:1.0},{2077635:1.0},{2053809:1.0}]}
    public static String parse(String str) {
        String[] fields = str.split("\t");
        String[] valueArray = fields[1].split(",");

        // sort the "item:score" pairs by score, highest first
        Item[] items = new Item[valueArray.length];
        for (int i = 0; i < items.length; i++) {
            items[i] = new Item(valueArray[i]);
        }
        Arrays.sort(items);

        for (int i = 0; i < valueArray.length; i++) {
            valueArray[i] = "{" + items[i].value + "}";
        }
        String valueStr = StringUtils.join(valueArray, ",");
        return "{\"key\":" + fields[0] + ",\"values\":[" + valueStr + "]}";
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.addResource("/usr/local/hadoop/conf/core-site.xml");
        conf.addResource("/usr/local/hadoop/conf/hdfs-site.xml");
        conf.addResource("/usr/local/hadoop/conf/mapred-site.xml");

        // String HDFS = "hdfs://webdm-cluster";
        // String HDFS = "hdfs://localhost:9000";
        String HDFS = args[0];
        FileSystem hdfs = FileSystem.get(URI.create(HDFS), conf);

        String filePath = args[1];
        FSDataInputStream fin = hdfs.open(new Path(filePath));

        // CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // CompressionCodec codec = factory.getCodec(new Path(filePath)); // auto-detect the codec from the HDFS file suffix
        // Class<?> codecClass = Class.forName("com.hadoop.compression.lzo.LzoCodec");
        // CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

        BufferedReader reader = null;
        String line;
        int count = 0;
        RecsysDb db = RecsysDb.getInstance();
        String itemSimName = args[2];
        try {
            // if (codec == null) {
            reader = new BufferedReader(new InputStreamReader(fin));
            // reader = new BufferedReader(new InputStreamReader(fin, "UTF-8"));
            // } else {
            //     System.out.println("compression codec detected");
            //     CompressionInputStream comInputStream = codec.createInputStream(fin);
            //     reader = new BufferedReader(new InputStreamReader(comInputStream));
            // }

            // read the similarity file line by line and push each document to MongoDB
            while ((line = reader.readLine()) != null) {
                // if (count == 0) System.out.println(line);
                String strJson = parse(line);
                db.updateItemSim(itemSimName, strJson);
                count++;
                if (count % 1000 == 0) System.out.println("count:" + count);
            }
        } finally {
            if (reader != null) {
                reader.close();
            }
            System.out.println(count);
        }
    }
}
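The RecsysDb class used above is not shown in the original post. As a rough idea only, here is a minimal singleton sketch of what it might look like on top of the mongo-java-driver 2.12.4 jar referenced in the run script below; the host, port, database name, and the choice of "key" as the upsert filter are assumptions, not taken from the original.

package mongodb;

import java.net.UnknownHostException;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.util.JSON;

// Hypothetical sketch of the RecsysDb singleton used by BatchUpdateSim.
public class RecsysDb {
    private static RecsysDb instance;
    private final DB db;

    private RecsysDb() throws UnknownHostException {
        MongoClient client = new MongoClient("localhost", 27017); // assumed address
        db = client.getDB("recsys");                              // assumed database name
    }

    public static synchronized RecsysDb getInstance() {
        if (instance == null) {
            try {
                instance = new RecsysDb();
            } catch (UnknownHostException e) {
                throw new RuntimeException(e);
            }
        }
        return instance;
    }

    // Upserts one similarity document (the string produced by BatchUpdateSim.parse)
    // into the given collection, keyed by its "key" field.
    public void updateItemSim(String collName, String strJson) {
        DBCollection coll = db.getCollection(collName);
        DBObject doc = (DBObject) JSON.parse(strJson);
        coll.update(new BasicDBObject("key", doc.get("key")), doc, true, false);
    }
}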
hd_core="/usr/local/hadoop/hadoop-core-1.1.2.jar"; s4j="/usr/local/hadoop/lib/slf4j-log4j12-1.6.1.jar"; s4japi="/usr/local/hadoop/lib/slf4j-api-1.6.1.jar"; log4j="/usr/local/hadoop/lib/log4j-1.2.17.jar"; guva="/usr/local/hadoop/lib/guava-11.0.2.jar"; clog="/usr/local/hadoop/lib/commons-logging-1.1.1.jar"; cconf="/usr/local/hadoop/lib/commons-configuration-1.6.jar"; cl="/usr/local/hadoop/lib/commons-lang-2.5.jar"; ccli="/usr/local/hadoop/lib/commons-cli-1.2.jar"; protobuf="/usr/local/hadoop/lib/protobuf-java-2.4.0a.jar"; hdfs="/usr/local/hadoop/lib/hadoop-hdfs-1.1.2.jar"; mongodb="/data/home/liulian/linger/jars/mongo-java-driver-2.12.4.jar"; libs=( $hd_core $s4j $s4japi $log4j $guva $clog $cconf $cl $ccli $protobuf $hdfs $mongodb ) libstr="" for jarlib in ${libs[@]}; do libstr=${jarlib}":"${libstr} done echo $libstr; java -Xbootclasspath/a:${libstr} -jar ../jars/updateSim.jar hdfs://10.200.91.164:9000 tv_sim/result/000000_0 tvItemsimColl
Original post (in Chinese): http://blog.csdn.net/lingerlanlan/article/details/42178675