Hadoop has many components, and complex functionality is usually built with a mixed architecture. Kudu is actually somewhat similar to HBase. Kudu's main features:
1. Supports primary keys (similar to a relational database)
2. Supports transactional operations; data can be inserted, deleted, updated, and queried
3. Supports a variety of data types
4. Supports ALTER TABLE; non-primary-key columns can be dropped
5. Supports INSERT, UPDATE, DELETE, UPSERT
6. Supports hash and range partitioning
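The example later in this post uses hash partitioning only; feature 6 also allows range partitioning on primary-key columns. A sketch of the Impala DDL (the table name and split values below are illustrative, not from the original post):

```sql
-- Range-partition the table on id; these split points are made up
CREATE TABLE kudu_range_table (
  id BIGINT,
  name STRING,
  PRIMARY KEY(id)
)
PARTITION BY RANGE (id) (
  PARTITION VALUES < 100,
  PARTITION 100 <= VALUES < 200,
  PARTITION 200 <= VALUES
)
STORED AS KUDU;
```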
Start the shell: impala-shell -i node1ip
The full CRUD syntax is covered in the official documentation, so I won't list it all here:
http://kudu.apache.org/docs/kudu_impala_integration.html
Create a table
CREATE TABLE kudu_table (
  id STRING,
  name STRING,
  age INT,
  PRIMARY KEY(id, name)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
Insert data
INSERT INTO kudu_table
SELECT * FROM impala_table;
Note
All of the SQL statements above are executed in Impala. Like HBase, Kudu is a NoSQL store; Kudu itself only provides an API, and Impala integrates with Kudu to give it a SQL interface.
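To illustrate, the UPDATE, DELETE, and UPSERT verbs from the feature list are issued the same way inside impala-shell, and Impala pushes them down to Kudu. A sketch using the kudu_table created above (the values are illustrative):

```sql
-- Insert-or-overwrite by primary key (id, name)
UPSERT INTO kudu_table VALUES ('1', 'lin', 20);
UPDATE kudu_table SET age = 21 WHERE id = '1';
DELETE FROM kudu_table WHERE id = '1' AND name = 'lin';
```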
My Git repo:
https://github.com/LinMingQiang/spark-util/tree/spark-kudu
pom.xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>1.1.0</version>
    <exclusions>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark_2.10</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-mapreduce</artifactId>
    <version>1.3.1</version>
    <exclusions>
        <exclusion>
            <artifactId>jsp-api</artifactId>
            <groupId>javax.servlet.jsp</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
Read data with Kudu's native client API
import scala.collection.JavaConversions._
import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.kudu.client.{KuduPredicate, KuduScanToken}

val client = new KuduClientBuilder("master2").build()
// Tables created through Impala are named "impala::<database>.<table>"
val table = client.openTable("impala::default.kudu_pc_log")
client.getTablesList.getTablesList.foreach { println }
val schema = table.getSchema()
// Push the predicate id = "1" down to Kudu
val kp = KuduPredicate.newComparisonPredicate(schema.getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, "1")
val scanner = client.newScanTokenBuilder(table)
  .addPredicate(kp)
  .limit(100)
  .build()
val token = scanner.get(0)
val scan = KuduScanToken.deserializeIntoScanner(token.serialize(), client)
while (scan.hasMoreRows()) {
  val results = scan.nextRows()
  while (results.hasNext()) {
    val rowresult = results.next()
    println(rowresult.getString("id"))
  }
}
Read and write with kudu-spark (KuduContext)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.client.KuduPredicate
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.Type

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test"))
val sparksql = new SQLContext(sc)
import sparksql.implicits._
val kuduMaster = "master2" // Kudu master address, same as above
val a = new KuduContext(kuduMaster, sc)
def getKuduRDD() {
  val tableName = "impala::default.kudu_pc_log"
  val columnProjection = Seq("id", "name") // columns to project
  val kp = KuduPredicate.newComparisonPredicate(new ColumnSchemaBuilder("id", Type.STRING).build(), KuduPredicate.ComparisonOp.EQUAL, "q")
  val df = a.kuduRDD(sc, tableName, columnProjection, Array(kp))
  df.foreach { x => println(x.mkString(",")) }
}
def writetoKudu() {
  val tableName = "impala::default.student"
  val rdd = sc.parallelize(Array("k", "b", "a")).map { n => STU(n.hashCode, n) }
  val data = rdd.toDF() // needs sparksql.implicits._
  a.insertRows(data, tableName)
}
case class STU(id: Int, name: String)
Original post: http://www.cnblogs.com/liuchuanfeng/p/7212033.html