
Using the Spark Cassandra Connector


1. Prepare Cassandra

Start cqlsh:

 

CQLSH_HOST=172.16.163.131 bin/cqlsh

 

cqlsh>CREATE KEYSPACE productlogs WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '2' };

cqlsh>CREATE TABLE productlogs.logs (
    ids uuid,
    app_name text,
    app_version text,
    city text,
    client_time timestamp,
    country text,
    created_at timestamp,
    cs_count int,
    device_id text,
    id int,
    modle_name text,
    province text,
    remote_ip text,
    updated_at timestamp,
    PRIMARY KEY (ids)
);
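To confirm the table was created as intended, it can be inspected from cqlsh (an optional check):

cqlsh>DESCRIBE TABLE productlogs.logs;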

 

2. The spark-cassandra-connector jar

Create an empty sbt project, pull in the connector as a dependency, and package everything as spark-cassandra-connector-full.jar.

The point of this step: the official connector jar does not bundle its dependencies, so when using the official jar directly you have to track down the dependencies yourself, and different connector versions depend on different packages and versions. For simplicity, just build one full (assembly) jar, as sketched below.
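As a rough sketch (the project layout, plugin version, and connector version here are assumptions, not from the original post), an sbt-assembly build along these lines produces such a jar:

// build.sbt
name := "spark-cassandra-connector-full"
scalaVersion := "2.10.5"  // the Spark 1.5.2 pre-built binaries use Scala 2.10
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

Running sbt assembly then yields a single jar containing the connector plus its transitive dependencies, which can be renamed to spark-cassandra-connector-full.jar.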

3. Start spark-shell

 

/opt/db/spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master spark://u1:7077 --conf spark.cassandra.connection.host=172.16.163.131 --jars ~/spark-cassandra-connector-full.jar

The commands below are run inside spark-shell.

4. Prepare the data source:

// Many write-ups stop the current sc and start a new one just to point Spark at Cassandra.
// Restarting is indeed unnecessary, but note that sc.getConf returns a copy of the conf, so
// calling sc.getConf.set("spark.cassandra.connection.host", ...) after startup has no effect;
// supply the host when launching spark-shell instead (the --conf flag in step 3).
// Read the raw data from HDFS
scala>val lines = sc.textFile("/data/logs")
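// The parsing below assumes each line of /data/logs is tab-separated with 13 fields in the
// order: id, app_name, app_version, client_time, device_id, modle_name, cs_count, created_at,
// updated_at, remote_ip, country, province, city. A hypothetical example line (not from the
// original post):
// 1\tmyapp\t1.0.2\t2016-04-01 12:00:00\tdev-001\tmodel-x\t3\t2016-04-01 12:00:01\t2016-04-01 12:00:02\t1.2.3.4\tCN\tGuangdong\tShenzhen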
// Import the required namespaces
scala>import org.apache.spark.sql._
scala>import org.apache.spark.sql.types._
scala>import com.datastax.spark.connector._
scala>import java.util.UUID
// Define the schema
scala>val schema = StructType(
  StructField("ids", StringType, true) ::
    StructField("id", IntegerType, true) ::
    StructField("app_name", StringType, true) ::
    StructField("app_version", StringType, true) ::
    StructField("client_time", TimestampType, true) ::
    StructField("device_id", StringType, true) ::
    StructField("modle_name", StringType, true) ::
    StructField("cs_count", IntegerType, true) ::
    StructField("created_at", TimestampType, true) ::
    StructField("updated_at", TimestampType, true) ::
    StructField("remote_ip", StringType, true) ::
    StructField("country", StringType, true) ::
    StructField("province", StringType, true) ::
   StructField("city", StringType, true) :: Nil)
// Parse each line into a Row that matches the schema
scala>val rowRDD = lines.map(_.split("\t")).map(p => Row(UUID.randomUUID().toString(), p(0).toInt, p(1), p(2), java.sql.Timestamp.valueOf(p(3)), p(4), p(5), p(6).toInt, java.sql.Timestamp.valueOf(p(7)), java.sql.Timestamp.valueOf(p(8)), p(9), p(10), p(11), p(12)))
scala>val df = sqlContext.createDataFrame(rowRDD, schema)
scala>df.registerTempTable("logs")
// Take a look at the result
scala>sqlContext.sql("select * from logs limit 1").show

 

5. Save the data to Cassandra

scala>import org.apache.spark.sql.cassandra._
scala>df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "logs", "keyspace" -> "productlogs")).save()
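To double-check the write from the Cassandra side, back in cqlsh:

cqlsh>SELECT * FROM productlogs.logs LIMIT 1;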

6. Read back the data just saved:

scala>import org.apache.spark.sql.cassandra._
scala>val cdf = sqlContext.read.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "logs", "keyspace" -> "productlogs")).
  load()
scala>cdf.registerTempTable("logs_just_saved")
scala>sqlContext.sql("select * from logs_just_saved limit 1").show
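For comparison, the same table can also be read through the connector's RDD API (available via the com.datastax.spark.connector._ import from step 4); a minimal sketch:

scala>val rdd = sc.cassandraTable("productlogs", "logs")
scala>rdd.first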

   

 


Original article: http://www.cnblogs.com/piaolingzxh/p/5427568.html
