标签:method source 入库 ems 图片 search 解决 insert exec
使用Spark生成大量数据的过程中遇到一个问题:当 sc.parallelize(fukeData, 64) 的记录数特别大(例如500万、1000万条)时,执行会特别慢,而且会抛出内存溢出错误(GC overhead limit exceeded)。原因是 fukeData 列表要先在Driver端完整构建在内存中,再由 parallelize 切分分发。解决方案:每次生成的数据量不超过100万条,分多次调用,这样一共生成2000万条数据耗时十几分钟。
如果环境允许,也可以先在本地生成测试数据,再上传到HDFS供Spark测试,代码如下:
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * Writes a local CSV file of synthetic MR test data that can later be uploaded to HDFS for Spark tests.
 *
 * <p>Usage: {@code FileGenerate [outputPath] [rowCount]}. The defaults ({@code d://test.csv},
 * 20,000,000 rows) match the original hard-coded values. Every line has 151 comma-separated columns:
 * id, object_id, scan_start_time, scan_stop_time, insert_time, enodeb_id and mr_tadv_00 .. mr_tadv_144.
 * Lines end with CRLF.
 */
public class FileGenerate {
    /** Output path used when no path argument is given. */
    static final String DEFAULT_OUTPUT = "d://test.csv";
    /** Row count used when no count argument is given. */
    static final int DEFAULT_ROW_COUNT = 20000000;
    /** Number of mr_tadv_XX value columns per row. */
    static final int TADV_COLUMNS = 145;

    /**
     * Generates the CSV file.
     *
     * @param args optional {@code args[0]} output path, optional {@code args[1]} number of rows
     * @throws IOException if the file cannot be written
     */
    public static void main(String[] args) throws IOException {
        String output = args.length > 0 ? args[0] : DEFAULT_OUTPUT;
        int rowCount = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_ROW_COUNT;
        Random random = new Random();
        // SimpleDateFormat is not thread-safe, but main is single-threaded, so one instance can be reused
        // instead of allocating a new formatter for every row.
        SimpleDateFormat insertTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // try-with-resources closes the file even if writing fails part-way.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(output))) {
            writer.write(String.join(",", header()) + "\r\n");
            // Ids are produced on the fly rather than materializing a list of rowCount strings first.
            for (int id = 1; id <= rowCount; id++) {
                if (id % 100000 == 0) {
                    // Progress output and periodic flush every 100,000 rows.
                    System.out.println(id);
                    writer.flush();
                }
                List<String> row = buildRow(id, random, insertTimeFormat.format(new Date()));
                writer.write(String.join(",", row) + "\r\n");
            }
        }
    }

    /** Returns the 151 CSV header names, in the same order as the values produced by {@link #buildRow}. */
    static List<String> header() {
        List<String> fields = new ArrayList<String>(6 + TADV_COLUMNS);
        fields.add("id");
        fields.add("object_id");
        fields.add("scan_start_time");
        fields.add("scan_stop_time");
        fields.add("insert_time");
        fields.add("enodeb_id");
        for (int i = 0; i < TADV_COLUMNS; i++) {
            fields.add("mr_tadv_" + twoDigits(i));
        }
        return fields;
    }

    /**
     * Builds the column values of one data row.
     *
     * <p>Assumes 10,000 cells: object_id is derived from {@code id % 10000}, so each cell receives
     * rowCount / 10000 rows. enodeb_id is {@code object_id / 256}. Scan times fall on 2018-10-29 between
     * 12:xx and 16:xx, with the stop time 15 seconds after the start time.
     *
     * @param id         1-based row id
     * @param random     source of the random time parts and mr_tadv values (consumed in a fixed order)
     * @param insertTime value written to the insert_time column
     * @return the 151 column values
     */
    static List<String> buildRow(int id, Random random, String insertTime) {
        List<String> row = new ArrayList<String>(6 + TADV_COLUMNS);
        int objectId = (id % 10000) + 10000 * 256 + 1;
        int hour = random.nextInt(5) + 2;          // 2..6, rendered as 12..16
        int minute = random.nextInt(59) + 1;       // 1..59
        int secondStart = random.nextInt(30) + 1;  // 1..30
        int secondStop = secondStart + 15;         // 16..45
        // Zero-pad minute/second so the values are valid "yyyy-MM-dd HH:mm:ss" timestamps.
        String timePrefix = "2018-10-29 1" + hour + ":" + twoDigits(minute) + ":";
        row.add(String.valueOf(id));
        row.add(String.valueOf(objectId));
        row.add(timePrefix + twoDigits(secondStart));
        row.add(timePrefix + twoDigits(secondStop));
        row.add(insertTime);
        row.add(String.valueOf(objectId / 256));
        for (int i = 0; i < TADV_COLUMNS; i++) {
            row.add(String.valueOf(random.nextInt(100)));
        }
        return row;
    }

    /** Formats a non-negative int with at least two digits ("7" -> "07", "144" -> "144"). */
    private static String twoDigits(int value) {
        return value < 10 ? "0" + value : String.valueOf(value);
    }
}
下面是在Spark 2.2.0环境下生成2000万条测试数据的代码:
public class ESWriterTest extends Driver implements Serializable { private static final long serialVersionUID = 1L; private ExpressionEncoder<Row> encoder = null; private StructType type = null;
private String hdfdFilePath = "/user/my/streaming/test_es/*";
public ESWriterTest() { } @Override public void run() { initSchema(); generateTestData(); sparkSession.stop(); } private void initSchema() { type = DataTypes.createStructType(Arrays.asList(// DataTypes.createStructField("id", DataTypes.StringType, true), // DataTypes.createStructField("object_id", DataTypes.StringType, true), // DataTypes.createStructField("scan_start_time", DataTypes.StringType, true), // DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true), // DataTypes.createStructField("insert_time", DataTypes.StringType, true), // DataTypes.createStructField("enodeb_id", DataTypes.StringType, true))); for (int i = 0; i < 145; i++) { type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType); } encoder = RowEncoder.apply(type); } private void generateTestData() { generateData("/user/my/streaming/test_es/1/"); generateData("/user/my/streaming/test_es/2/"); generateData("/user/my/streaming/test_es/3/"); generateData("/user/my/streaming/test_es/4/"); generateData("/user/my/streaming/test_es/5/"); generateData("/user/my/streaming/test_es/6/"); generateData("/user/my/streaming/test_es/7/"); generateData("/user/my/streaming/test_es/8/"); generateData("/user/my/streaming/test_es/9/"); generateData("/user/my/streaming/test_es/10/"); generateData("/user/my/streaming/test_es/11/"); generateData("/user/my/streaming/test_es/12/"); generateData("/user/my/streaming/test_es/13/"); generateData("/user/my/streaming/test_es/14/"); generateData("/user/my/streaming/test_es/15/"); generateData("/user/my/streaming/test_es/16/"); generateData("/user/my/streaming/test_es/17/"); generateData("/user/my/streaming/test_es/18/"); generateData("/user/my/streaming/test_es/19/"); generateData("/user/my/streaming/test_es/20/"); // 支持的文件格式有:text、csv、json、parquet。 StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true))); Dataset<Row> rows = 
sparkSession.read().format("text").schema(structType).load(hdfdFilePath); rows.printSchema(); rows.show(10); } private void generateData(String hdfsDataFilePath) { List<Row> fukeData = new ArrayList<Row>(); for (int i = 1; i <= 1000000; i++) { fukeData.add(RowFactory.create(String.valueOf(i))); } StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("id", DataTypes.StringType, false))); JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD<Row> javaRDD = sc.parallelize(fukeData, 64); Dataset<Row> fukeDataset = sparkSession.createDataFrame(javaRDD, structType); Random random = new Random(); // 假设有1w个小区,数据一共200w条记录,那么每个小区有200条记录。 Dataset<Row> rows = fukeDataset.mapPartitions(new MapPartitionsFunction<Row, Row>() { private static final long serialVersionUID = 1L; @Override public Iterator<Row> call(Iterator<Row> idItems) throws Exception { List<Row> newRows = new ArrayList<Row>(); while (idItems.hasNext()) { String id = idItems.next().getString(0); List<Object> rowItems = new ArrayList<Object>(); // id int intId = Integer.valueOf(id); rowItems.add(id); // object_id String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1); rowItems.add(objectId); int hour = random.nextInt(5) + 2; int minute = random.nextInt(59) + 1; int second_start = random.nextInt(30) + 1; int second_stop = second_start + 15; String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start; String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop; // scan_start_time rowItems.add(scan_start_time); // scan_stop_time rowItems.add(scan_stop_time); // insert_time rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); // enodeb_id rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256)); for (int i = 0; i < 145; i++) { rowItems.add(String.valueOf(random.nextInt(100))); } newRows.add(RowFactory.create(rowItems.toArray())); } return newRows.iterator(); } }, 
encoder); rows.toJavaRDD().repartition(20).saveAsTextFile(hdfsDataFilePath); } }
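下面是在Spark 2.2.0环境下,使用BulkProcessor方式向ES 6.4.2插入2000万条记录的测试代码。调试过程中发现一个问题:不能在ForeachPartitionFunction的call函数中调用client.close()和bulkProcessor.close(),否则会抛出异常。当时推测的原因是这个client可能被多个executor共用。不过从代码看,每次调用call都会通过getClient()新建一个client,并不共享。因此更可能的原因是:BulkProcessor的bulk请求是异步发送的,在请求完成之前就关闭了client。稳妥的做法是先调用bulkProcessor.awaitClose(...)等待请求完成,再关闭client。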
private ExpressionEncoder<Row> encoder = null; private StructType type = null; private String hdfdFilePath = "/user/my/streaming/test_es/*"; public static void main(String[] args) { initSchema(); StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true))); Dataset<Row> lines = sparkSession.read().format("text").schema(structType).load(hdfdFilePath); Dataset<Row> rows = lines.map(new MapFunction<Row, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(Row value) throws Exception { List<Object> itemsList = new ArrayList<Object>(); String line = value.getAs("value"); String[] fields = line.split(","); for (String filed : fields) { itemsList.add(filed); } return RowFactory.create(itemsList.toArray()); } }, encoder); rows.show(10); rows.toJSON().foreachPartition(new EsForeachPartitionFunction()); sparkSession.stop(); } private void initSchema() { type = DataTypes.createStructType(Arrays.asList(// DataTypes.createStructField("id", DataTypes.StringType, true), // DataTypes.createStructField("object_id", DataTypes.StringType, true), // DataTypes.createStructField("scan_start_time", DataTypes.StringType, true), // DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true), // DataTypes.createStructField("insert_time", DataTypes.StringType, true), // DataTypes.createStructField("enodeb_id", DataTypes.StringType, true))); for (int i = 0; i < 145; i++) { type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType); } encoder = RowEncoder.apply(type); }
EsForeachPartitionFunction.java
import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.spark.api.java.function.ForeachPartitionFunction; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.transport.client.PreBuiltTransportClient; public class EsForeachPartitionFunction implements ForeachPartitionFunction<String> { private static final long serialVersionUID = 1L; @Override public void call(Iterator<String> rows) throws Exception { TransportClient client = null; BulkProcessor bulkProcessor = null; try { client = getClient(); bulkProcessor = getBulkProcessor(client); } catch (Exception ex) { System.out.println(ex.getMessage() + "\r\n" + ex.getStackTrace()); } Map<String, Object> mapType = new HashMap<String, Object>(); while (rows.hasNext()) { @SuppressWarnings("unchecked") Map<String, Object> map = new com.google.gson.Gson().fromJson(rows.next(), mapType.getClass()); bulkProcessor.add(new IndexRequest("twitter", "tweet").source(map)); } try { // Flush any remaining requests bulkProcessor.flush(); System.out.println("--------------------------------bulkProcessor.flush(); over...------------------------"); } catch (Exception ex) { System.out.println("" + ex.getMessage() + "\r\n" + ex.getStackTrace()); } try { // Or close the bulkProcessor if you don‘t need it anymore bulkProcessor.awaitClose(10, TimeUnit.MINUTES); 
System.out.println("--------------------------------bulkProcessor.awaitClose(10, TimeUnit.MINUTES); over...------------------------"); } catch (Exception ex) { System.out.println("" + ex.getMessage() + "\r\n" + ex.getStackTrace()); } } private BulkProcessor getBulkProcessor(TransportClient client) { BulkProcessor bulkProcessor = BulkProcessor// .builder(client, new BulkProcessor.Listener() { @Override public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) { // TODO Auto-generated method stub System.out.println("结束afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2)。。。。"); } @Override public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) { // TODO Auto-generated method stub System.out.println("结束afterBulk(long arg0, BulkRequest arg1, Throwable arg2)。。。。"); System.out.println(arg1.numberOfActions() + " data bulk failed,reason :" + arg2); } @Override public void beforeBulk(long arg0, BulkRequest arg1) { // TODO Auto-generated method stub System.out.println("开始。。。。"); } }) // .setBulkActions(10000)// .setBulkSize(new ByteSizeValue(64, ByteSizeUnit.MB))// .setFlushInterval(TimeValue.timeValueSeconds(5))// .setConcurrentRequests(1)// .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))// .build(); return bulkProcessor; } private TransportClient getClient() { Settings settings = Settings.builder()// .put("cluster.name", "es") // .put("client.transport.sniff", true)// .build(); PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings); TransportClient client = preBuiltTransportClient; // 10.205.201.97,10.205.201.98,10.205.201.96,10.205.201.95 try { client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.97"), 9300)); client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.98"), 9300)); client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.96"), 9300)); client.addTransportAddress(new 
TransportAddress(InetAddress.getByName("10.205.201.95"), 9300)); } catch (UnknownHostException e) { e.printStackTrace(); throw new RuntimeException(e); } return client; } }
依赖pom.xml
<!--Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- spark-sql provides SparkSession, Dataset, Row, RowFactory and RowEncoder used by the Spark code;
     spark-core alone does not contain these classes. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.4.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.4.2</version>
</dependency>
测试写入速度偏低,约3500条记录/秒。
关于ES+SPARK如何优化的文章请参考:
《Elasticsearch进阶(一)写入性能基准测试写入性能优化(56小时到5小时),chunk_size探讨》
《elasticsearch写入优化记录,从3000到8000/s》
《Spark2.x写入Elasticsearch的性能测试》
Spark下生成测试数据,并在Spark环境下使用BulkProcessor将测试数据入库到ES6.4.2
标签:method source 入库 ems 图片 search 解决 insert exec
原文地址:https://www.cnblogs.com/yy3b2007com/p/9885040.html