标签:tst ring new t throws thread private config 插入 with
<dependencies>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.1</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- 注意版本号,版本号不对也会报错 -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
package com.ley;
import com.ley.pojo.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
public class Application_Spark {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("spark with ch");
SparkSession session = SparkSession.builder().config(conf).getOrCreate();
JavaSparkContext context = new JavaSparkContext(session.sparkContext());
List<Person> personList = Arrays.asList(
new Person(1,2,3,4,"qq qb", GenderEnum.male),
new Person(11,12,13,14,"blabla", GenderEnum.female)
);
JavaRDD<Person> rdd = context.parallelize(personList);
rdd.foreachPartition(rowIterator -> {
String sqlText = "insert into user (uage, name, gender) values (";
try{
Connection connection = MyDBManager.instance().get();
Statement statement = connection.createStatement();
// TODO convert to batch insert
while (rowIterator.hasNext()){
Person person = rowIterator.next();
sqlText += person.getUage() + "," + "‘" + person.getName() + "‘," + "‘" + person.getGender().getKey() + "‘)";
statement.execute(sqlText);
}
}catch (SQLException e){
e.printStackTrace();
}finally {
MyDBManager.instance().returnBack();
}
});
MyDBManager.instance().dispose();
}
}
// 自定义数据库连接池
public class MyDBManager {
private List<MyConnection> CONNECTION_POOL = new ArrayList<>();
private static ThreadLocal<MyConnection> CACHED_CONNECTION = new ThreadLocal<>();
private int NUMBER = 10;
private String username = "default";
private String password = "123456";
private String url = "jdbc:clickhouse://cdh101:8123/default";
private static MyDBManager INSTANCE;
static {
try {
INSTANCE = new MyDBManager();
} catch (SQLException e) {
e.printStackTrace();
}
}
@Data
@AllArgsConstructor
private class MyConnection {
private Connection connection;
private boolean used;
// not implement yet.
private int timeout;
}
private MyDBManager() throws SQLException {
for(int i = 0; i < NUMBER; i++) {
Connection connection = DriverManager.getConnection(url, username, password);
CONNECTION_POOL.add(new MyConnection(connection, false, -1));
}
}
public static MyDBManager instance() {
return INSTANCE;
}
public synchronized Connection get() {
MyConnection myConnection = null;
while(true) {
Optional<MyConnection> connection = CONNECTION_POOL.stream().filter(x -> !x.used).findFirst();
if(connection.isPresent()) {
myConnection = connection.get();
break;
}
sleep();
}
myConnection.setUsed(true);
CACHED_CONNECTION.remove();
CACHED_CONNECTION.set(myConnection);
return myConnection.getConnection();
}
private void sleep() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* should provide timeout check in another thread.
*/
public synchronized void returnBack() {
MyConnection myConnection = CACHED_CONNECTION.get();
myConnection.setUsed(false);
CACHED_CONNECTION.remove();
}
public void dispose() {
CONNECTION_POOL.stream().forEach(x -> {
try {
x.getConnection().close();
} catch (SQLException e) {
e.printStackTrace();
}
});
}
}
// 测试对象
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Person implements Serializable {
private int uage;
private int age;
private int age2;
private int age3;
private String name;
private GenderEnum gender;
}
public enum GenderEnum {
male("male", 0), female("female", 1);
private String key;
private int val;
GenderEnum(String key, int val) {
this.key = key;
this.val= val;
}
public String getKey() {
return key;
}
}
标签:tst ring new t throws thread private config 插入 with
原文地址:https://www.cnblogs.com/abc608088/p/14810196.html