标签:collect too clu poi slice name 交互 shel spark
Spring Cloud Flow与Apache Spark集成点击左上角,关注:“锅外的大佬”
专注分享国外最新技术内容
帮助每位开发者更优秀地成长
SpringCloudDataFlow是用于构建数据集成和实时数据处理管道的工具包。 在这种情况下,管道(Pipelines)是使用 SpringCloudStream或 SpringCloudTask框架构建的 SpringBoot应用程序。
在本教程中,我们将展示如何将 SpringCloudDataFlow与 ApacheSpark一起使用。
首先,我们需要运行数据流服务器(Data Flow Server)才能部署我们的作业( jobs)。要在本地运行数据流服务器,需要使用 spring-cloud-starter-dataflow-server-local依赖创建一个新项目:
<dependency>
<groupId>
org.springframework.cloud
</groupId>
<artifactId>
spring-cloud-starter-dataflow-server-local
</artifactId>
<version>
1.7.4.RELEASE
</version>
</dependency>
之后,使用 @EnableDataFlowServer来注解服务中的主类(main class):
@EnableDataFlowServer
@SpringBootApplication
public
class
SpringDataFlowServerApplication
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
SpringDataFlowServerApplication
.
class
,
args
);
}
}
运行此应用程序后,本地数据流服务运行在端口 9393。
我们将 SparkJob作为本地单体应用程序创建,这样我们就不需要任何集群来运行它。
首先,添加 Spark依赖
<dependency>
<groupId>
org.apache.spark
</groupId>
<artifactId>
spark-core_2.10
</artifactId>
<version>
2.4.0
</version>
</dependency>
对 job来说,就是为了求 pi的近似值:
public
class
PiApproximation
{
public
static
void
main
(
String
[]
args
)
{
SparkConf
conf
=
new
SparkConf
().
setAppName
(
"BaeldungPIApproximation"
);
JavaSparkContext
context
=
new
JavaSparkContext
(
conf
);
int
slices
=
args
.
length
>=
1
?
Integer
.
valueOf
(
args
[
0
])
:
2
;
int
n
=
(
100000L
*
slices
)
>
Integer
.
MAX_VALUE
?
Integer
.
MAX_VALUE
:
100000
*
slices
;
List
<
Integer
>
xs
=
IntStream
.
rangeClosed
(
0
,
n
)
.
mapToObj
(
element
->
Integer
.
valueOf
(
element
))
.
collect
(
Collectors
.
toList
());
JavaRDD
<
Integer
>
dataSet
=
context
.
parallelize
(
xs
,
slices
);
JavaRDD
<
Integer
>
pointsInsideTheCircle
=
dataSet
.
map
(
integer
->
{
double
x
=
Math
.
random
()
*
2
-
1
;
double
y
=
Math
.
random
()
*
2
-
1
;
return
(
x
*
x
+
y
*
y
)
<
1
?
1
:
0
;
});
int
count
=
pointsInsideTheCircle
.
reduce
((
integer
,
integer2
)
->
integer
+
integer2
);
System
.
out
.
println
(
"The pi was estimated as:"
+
count
/
n
);
context
.
stop
();
}
}
DataFlowShell是一个 允许我们与服务器交互的应用程序。 Shell使用 DSL命令来描述数据流。
要使用 DataFlowShell,我们要创建一个运行它的项目。 首先,需要 spring-cloud-dataflow-shell依赖:
<dependency>
<groupId>
org.springframework.cloud
</groupId>
<artifactId>
spring-cloud-dataflow-shell
</artifactId>
<version>
1.7.4.RELEASE
</version>
</dependency>
添加依赖项后,我们可以创建主类来运行 DataFlowShell:
@EnableDataFlowShell
@SpringBootApplication
public
class
SpringDataFlowShellApplication
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
SpringDataFlowShellApplication
.
class
,
args
);
}
}
为了部署我们的项目,可在三个版本( cluster, yarn和 client)中使用 ApacheSpark所谓 任务运行器(task runner)—— 我们将使用 client版本。 任务运行器(task runner)是真正运行 Sparkjob的实例。为此,我们首先需要使用 DataFlowShell注册 task:
app
register
--
type task
--
name spark
-
client
--
uri
maven
:
//org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
task允许我们指定多个不同的参数,其中一些参数是可选的,但是一些参数是正确部署 Sparkjob所必需的:
task create spark1
definition
"spark-client spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
请注意, spark.app-jar是我们 job中 fat-jar的路径。成功创建任务后,我们可以使用以下命令继续运行它:
task launch spark1
这将调用 task的执行。
在本教程中,我们展示了如何使用 SpringCloudDataFlow框架来处理 ApacheSpark数据。 有关 SpringCloudDataFlow框架的更多信息,请参阅文档。
所有代码示例都可以在 GitHub上找到。
原文链接:https://www.baeldung.com/spring-cloud-data-flow-spark
作者:baeldung
译者:Leesen
推荐阅读:快速掌握FileChannel
上篇好文:Spring Boot中使用RSocket
点击在看,和我一起帮助更多开发者!
Spring Cloud Flow与Apache Spark集成
标签:collect too clu poi slice name 交互 shel spark
原文地址:https://blog.51cto.com/14901350/2524842