Spark RPC Usage Guide
Overview
To implement a server on top of Spark RPC, you must implement either ThreadSafeRpcEndpoint or RpcEndpoint.
Services are usually built on the former, which, as the name suggests, guarantees thread-safe message handling.
You typically need to implement four methods (the trait is sketched below):
onStart: runs when the endpoint starts; do internal initialization and launch background threads here.
receive: handles requests sent by a client that require no reply.
receiveAndReply: handles requests sent by a client and returns a response.
onStop: runs when the endpoint stops; put cleanup work here.
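For orientation, here is a simplified sketch of the endpoint API in org.apache.spark.rpc (abridged from the Spark 2.x source; the default method bodies and connection callbacks such as onConnected/onError are omitted):

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  // Fire-and-forget messages (sent via RpcEndpointRef.send) are dispatched here.
  def receive: PartialFunction[Any, Unit]

  // Request/response messages (sent via ask/askSync) are dispatched here;
  // answer with context.reply(...) or context.sendFailure(...).
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

  def onStart(): Unit
  def onStop(): Unit
}

// Marker trait: the RpcEnv guarantees that messages to such an endpoint are
// processed one at a time, so handlers need no extra synchronization.
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint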
Server-side example code
package org.apache.spark

import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * Created by cloud on 18/1/18.
 *
 * Note: Spark's RPC classes are private[spark], which is why this demo
 * lives in the org.apache.spark package.
 */
class ZsparkRpcServer(
    override val rpcEnv: RpcEnv,
    val conf: SparkConf
  ) extends ThreadSafeRpcEndpoint with Logging {

  private val scheduledThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("echo-thread")
  private var scheduledThreadFuture: ScheduledFuture[_] = _

  // Runs when the endpoint starts: schedule a background task that logs
  // the current time every 2 seconds after an initial 3-second delay.
  override def onStart(): Unit = {
    scheduledThreadFuture = scheduledThread.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val simpleTime = new SimpleDateFormat("yy-MM-dd HH:mm:ss")
        logInfo(simpleTime.format(new Date()))
      }
    }, 3000L, 2000L, TimeUnit.MILLISECONDS)
  }

  // Fire-and-forget messages (RpcEndpointRef.send) land here; no reply is sent.
  override def receive: PartialFunction[Any, Unit] = {
    case ZRequest(command) => logInfo(command.toUpperCase)
  }

  // Request/response messages (ask/askSync) land here; answer via context.reply.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ZRequest(command) => context.reply(ZResponse(command.reverse))
    case _ => context.reply(ZResponse("ECHO".reverse))
  }

  // Runs when the endpoint stops: cancel the scheduled task and shut down
  // the executor.
  override def onStop(): Unit = {
    if (scheduledThreadFuture != null) {
      scheduledThreadFuture.cancel(true)
    }
    scheduledThread.shutdownNow()
  }
}

object ZsparkRpcServer {
  val SN = "z-cloud-echo"      // RpcEnv (system) name
  val EN = "z-cloud-echo-ser"  // endpoint name that clients look up

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    // Create a server-mode RpcEnv on the local host, port 23456, then
    // register the endpoint under the name EN and block until shutdown.
    val rpcEnv = RpcEnv.create(SN, Utils.localHostName(), 23456, conf, securityManager)
    rpcEnv.setupEndpoint(EN, new ZsparkRpcServer(rpcEnv, conf))
    rpcEnv.awaitTermination()
  }
}

case class ZRequest(command: String)
case class ZResponse(result: String)
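Note that setupEndpoint returns an RpcEndpointRef, so you can smoke-test the endpoint inside the same JVM before writing a remote client. A minimal sketch, using hypothetical lines that could replace the setupEndpoint call in the main above:

val ref = rpcEnv.setupEndpoint(EN, new ZsparkRpcServer(rpcEnv, conf))
ref.send(ZRequest("local-echo"))                          // receive() logs "LOCAL-ECHO"
println(ref.askSync[ZResponse](ZRequest("abc")).result)   // prints "cba"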
Client-side example code
package org.apache.spark

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

import org.apache.spark.rpc.{RpcAddress, RpcTimeout}
import org.apache.spark.util.{ThreadUtils, Utils}

object ZsparkRpcClient {
  def main(args: Array[String]): Unit = {
    val host = Utils.localHostName()
    val port = 23456
    val sparkConf = new SparkConf()
    val securityManager = new SecurityManager(sparkConf)
    // clientMode = true: this RpcEnv only connects out; it does not
    // listen on the given port itself.
    val rpcEnv = RpcEnv.create(ZsparkRpcServer.SN, host, host, port,
      sparkConf, securityManager, clientMode = true)
    // Look up the remote endpoint by address and registered name.
    val rpcEnvRef = rpcEnv.setupEndpointRef(RpcAddress(host, port), ZsparkRpcServer.EN)
    /*
    // Fetching the response asynchronously:
    val res = rpcEnvRef.ask[ZResponse](ZRequest("cli-echo"))
    res.onComplete {
      case Success(v) => println(v)
      case Failure(e) => println(e)
    }(ThreadUtils.sameThread)
    // Sending a message that needs no reply from the server:
    rpcEnvRef.send(ZRequest("non-response"))
    */

    // Fetching the response synchronously:
    for (i <- args) {
      rpcEnvRef.send(ZRequest(i)) // fire-and-forget; handled by receive()
      val rpcTimeout = new RpcTimeout(FiniteDuration(3000L, TimeUnit.MILLISECONDS), "timeout")
      val res = rpcEnvRef.askSync[ZResponse](ZRequest(i), rpcTimeout)
      println(res.result)
    }
    rpcEnv.shutdown()
  }
}
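If you want the asynchronous ask but still need the result at a specific point, you can also block on the returned Future yourself. A minimal sketch, assuming the same setup as above plus the scala.concurrent imports shown:

import scala.concurrent.Await
import scala.concurrent.duration._

val future = rpcEnvRef.ask[ZResponse](ZRequest("cli-echo")) // Future[ZResponse]
val response = Await.result(future, 3.seconds)              // throws on timeout or failure
println(response.result)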
Starting the RPC service
# You can launch the service with Spark's wrapper scripts as shown here, or build your own runtime environment.
# The advantage of the wrapper scripts is that they are simple to use and directly reuse Spark's environment, configuration, and log management.
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"
exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.ZsparkRpcServer 1 "$@"
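The same wrapper script can stop the daemon later. Assuming the class name and instance number match the start command above:

"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.ZsparkRpcServer 1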