标签:share 原因 arc warning 流式 输出 recover center output
https://blog.csdn.net/shshheyi/article/details/84893371
[root@localhost ~]# vim /etc/hosts # 三台机器都需要操作
192.168.28.131 master
192.168.77.130 slave1
192.168.77.134 slave2
注:修改hosts中,是立即生效的,无需source或者
vim /etc/sysconfig/network
HOSTNAME=master #最后一行添加
[root@localhost ~]# reboot
使用uname -a 可以查看hostname是多少,就可以知道是否修改生效了
集群之间的机器需要相互通信,所以我们得先配置免密码登录。在三台机器上分别运行如下命令,生成密钥对
[root@master ~]# ssh-keygen -t rsa # 三台机器都需要执行这个命令生成密钥对
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
0d:00:bd:a3:69:b7:03:d5:89:dc:a8:a2:ca:28:d6:06 root@hadoop000
The key‘s randomart image is:
+--[ RSA 2048]----+
| .o. |
| .. |
| . *.. |
| B +o |
| = .S . |
| E. * . |
| .oo o . |
|=. o o |
|*.. . |
+-----------------+
[root@master ~]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
[root@master ~]# ls .ssh/
authorized_keys id_rsa id_rsa.pub known_hosts
以master为主,执行以下命令,分别把公钥拷贝到其他机器上
[root@master ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub master
[root@master ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub slave1
[root@master ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub slave2
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
root用户使用wget命令将JDK下载到/usr/local/src/目录下
cd /usr/local/src/
tar -zxvf jdk-8u151-linux-x64.tar.gz
mv ./jdk1.8.0_151 /usr/local/jdk1
vim /etc/profile # 增加如下内容
export JAVA_HOME=/usr/local/jdk1.8.0_181
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib:${JRE_HOME}/lib:${JRE_HOME}/lib/charsets.jar
export PATH=$PATH:$JAVA_HOME/bin:/usr/local/mysql/bin/
source /etc/profile #使文件生效
为了便于管理,给Master的hdfs的NameNode、DataNode及临时文件,在用户目录下创建目录
mkdir -p /data/hdfs/name
mkdir -p /data/hdfs/data
mkdir -p /data/hdfs/tmp
然后将这些目录通过scp命令拷贝到Slave1和Slave2的相同目录下。
首先到Apache官网(http://www.apache.org/dyn/closer.cgi/hadoop/common/)下载Hadoop,从中选择推荐的下载镜像(https://hadoop.apache.org/releases.html),我选择hadoop-3.2.0的版本,并使用以下命令下载到Master机器的/usr/local/目录
cd /usr/local
wget https://mirrors.cnnic.cn/apache/hadoop/common/stable/hadoop-3.2.0.tar.gz
tar -zxvf hadoop-3.2.0.tar.gz
vim /etc/profile
export HADOOP_HOME=/usr/local/hadoop-3.2.0
export PATH=$HADOOP_HOME/bin:$PATH
source /etc/profile #使环境变量生效
hadoop #发现可以有提示了,则表示配置生效了
进入目录/usr/local/hadoop-3.2.0/etc/hadoop,依次修改core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml以及slaves文件
cd /usr/local/hadoop-3.2.0/etc/hadoop
ls
vim core-site.xml
<configuration>
<!-- 指定hadoop运行时产生文件的存储路径 -->
<property>
<name>hadoop.tmp.dir</name>
<value>file:/data/hdfs/tmp</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<!-- 指定HDFS老大(namenode)的通信地址 -->
<property>
<name>fs.default.name</name> #fs.defaultFS 集群模式
<value>hdfs://master:9000</value> #主节点上改为hdfs://0.0.0.0:9000
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
</configuration>
注意:hadoop.tmp.dir的value填写对应前面创建的目录
vim hdfs-site.xml
<configuration>
<!-- 设置hdfs副本数量 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!-- 设置namenode存放的路径 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/data/hdfs/name</value>
<final>true</final>
</property>
<property>
<!-- 设置datanode存放的路径 -->
<name>dfs.datanode.data.dir</name>
<value>file:/data/hdfs/data</value>
<final>true</final>
</property>
<!-- 设置namenode的http通讯地址 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:9001</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<!-- 主节点地址 -->
<property>
<name>dfs.namenode.http-address</name>
<value>master:50070</value> #主节点上修改为0.0.0.0:50070
<description>开启50070端口,不然web不能访问hadoop</description>
</property>
</configuration>
注意:dfs.namenode.name.dir和dfs.datanode.data.dir的value填写对应前面创建的目录
复制template,生成xml,命令如下:
cp mapred-site.xml.template mapred-site.xml
vim mapred-site.xml
<!-- 通知框架MR使用YARN -->
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
</configuration>
vim yarn-site.xml
<property>
<name>yarn.resourcemanager.address</name>
<value>master:18040</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:18030</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:18088</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:18025</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:18141</value>
</property>
<!-- reducer取数据的方式是mapreduce_shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce.shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
cd /usr/local/hadoop-3.2.0/etc/hadoop
vim hadoop-env.sh
添加
export JAVA_HOME=/usr/local/jdk1.8.0_181
将原来的localhost删除,改成如下内容
vim /usr/local/hadoop-3.2.0/etc/hadoop/workers
vim /usr/local/hadoop-3.2.0/etc/hadoop/slaves
最后,将整个hadoop-3.2.0文件夹及其子文件夹使用scp复制到slave1和slave2的相同目录中:
scp -r /usr/local/hadoop-3.2.0 root@slave1: /usr/local
scp -r /usr/local/hadoop-3.2.0 root@slave2: /usr/local
在每台机子上都执行此操作:
# systemctl stop firewalld && systemctl disable firewalld
systemctl stop firewalld.service #停止防火墙
systemctl disable firewalld.service #禁止防火墙开机启动
firewall-cmd --state #检查防火墙状态
永久关闭selinux
vi /etc/selinux/config
登录阿里云——》云服务ECS——》网络与安全(选择安全组)——》点击对应得实例
执行命令:
hadoop namenode -format
# 开启dfs,包括namenode,datanode,secondarynamenode服务
sbin/start-dfs.sh
# 开启yarn,包括resourcemanager,nodemanager
sbin/start-yarn.sh
#查看集群情况
hadoop dfsadmin -report
/usr/local/hadoop-3.2.0/sbin/stop-all.sh #停止所有服务
/usr/local/hadoop-3.2.0/sbin/start-all.sh #启动所有服务
执行命令如下:
/usr/local/hadoop-3.2.0/sbin/hadoop-daemon.sh start namenode
jps
执行命令如下:
/usr/local/hadoop-3.2.0/sbin/hadoop-daemons.sh start datanode
解决办法:
ssh-keygen -t rsa #然后一直按回车,选择默认的操作即可
cd /root/.ssh
cp id_rsa.pub authorized_keys
/usr/local/hadoop-3.2.0/sbin/start-yarn.sh
运行成功
运行失败
解决办法:
进入/usr/local/hadoop-3.2.0/sbin目录
注意是在文件开始空白处
在start-dfs.sh,stop-dfs.sh中:
HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root
在start-yarn.sh,stop-yarn.sh中
YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root
l 有服务器上$HADOOP_HOME/etc/hadoop下workers文件新增slave3的配置
slave1
slave2
slave3
l 启动新节点上的DataNode和NodeManager
在新节点上启动datanode
$HADOOP_HOME/bin/hdfs --daemon start datanode
#在新节点上启动nodemanager
$HADOOP_HOME/bin/yarn --daemon start nodemanager
l 查看集群状态
#查看hdfs各节点状态
$HADOOP_HOME/bin/hdfs dfsadmin -report
#查看yarn各节点状态
$HADOOP_HOME/bin/yarn node -list
执行命令:
/usr/local/hadoop-3.2.0/bin/hdfs dfsadmin -report
浏览器登录:http://192.168.28.131:18088/cluster
浏览器登录:http://192.168.28.131:50070
http://hive.apache.org/downloads.html
cd /usr/local
wget https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-2.3.5/apache-hive-2.3.5-bin.tar.gz
tar -zxvf apache-hive-2.3.5-bin.tar.gz
vim /etc/profile
#在文件结尾添加内容如下
export HIVE_HOME=/usr/local/apache-hive-2.3.5-bin
export PATH=$PATH:$HIVE_HOME/bin
source /etc/profile #使环境变量生效
进入/usr/local/apache-hive-2.3.5-bin/conf目录
执行命令:
cp hive-default.xml.template hive-site.xml
<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!--使用hadoop新建hdfs目录 -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<!--使用hadoop新建hdfs临时目录 -->
<property>
<name>hive.downloaded.resources.dir</name>
<value>/tmp/hive/${hive.session.id}_resources</value>
<description>Temporary local directory </description>
</property>
<property>
<name>hive.metastore.local</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
[root@master conf]# hadoop fs -mkdir -p /user/hive/warehouse
[root@master conf]# hadoop fs -mkdir -p /tmp/hive
[root@master conf]# hadoop fs -chmod 777 /user/hive/warehouse
[root@master conf]# hadoop fs -chmod 777 /tmp/hive
进入/usr/local/apache-hive-2.3.5-bin/conf目录
cp hive-env.sh.template hive-env.sh
vim hive-env.sh
export HADOOP_HOME=/usr/local/hadoop-3.2.0
export HIVE_CONF_DIR=/usr/local/apache-hive-2.3.5-bin/conf
export HIVE_AUX_JARS_PATH=/usr/local/apache-hive-2.3.5-bin/lib
#CentOS7的yum源中默认好像是没有mysql
cd /usr/local
wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm
rpm -ivh mysql-community-release-el7-5.noarch.rpm
yum install mysql-server mysql-devel -y
查看mysql状态启动及停止
service mysqld status
service mysqld start
service mysqld stop
vim /etc/my.cnf
skip-grant-tables #[mysqld] 部分,skip-grant-tables添加 保存退出
service mysqld restart
mysql -uroot -p #进入mysql中,此时不需要输入密码就可以登陆
mysql>show databases;
mysql>use mysql;
mysql>UPDATE user SET password=PASSWORD(‘123456‘) WHERE user=‘root‘;
mysql>FLUSH PRIVILEGES;
mysql>QUIT
将之前更改的配置文件/etc/my.cnf中的 skip-grant-tables 删除
service mysqld restart
修改/etc/my.cnf配置文件,在[mysqld]下添加编码配置,如下所示:
[mysqld]
character_set_server=utf8
init_connect=‘SET NAMES utf8‘
#重载授权表:
mysql>GRANT ALL PRIVILEGES ON *.* TO ‘root‘@‘%‘ IDENTIFIED BY ‘123456‘ WITH GRANT OPTION;
mysql>create database hive;
mysql>FLUSH PRIVILEGES;
mysql>create database hive;
mysql>create user ‘hive‘ identified by ‘hive‘;
mysql>grant all privileges on *.* to ‘hive‘ with grant option;
mysql>flush privileges;
wget https://gitee.com/boyuecom/tool/raw/master/mysql-connector-java-5.1.6-bin.jar
cp mysql-connector-java-5.1.6-bin.jar /usr/local/apache-hive-2.3.5-bin/lib
执行命令:
schematool -initSchema -dbType mysql
hive
http://www.scala-lang.org/downloads
cd /usr/local
wget https://downloads.lightbend.com/scala/2.13.0/scala-2.13.0.tgz
tar -zxf scala-2.13.0.tgz
vim /etc/profile
export SCALA_HOME=/usr/local/scala-2.13.0
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile #使环境变量生效
cp conf/log4j.properties.template conf/log4j.properties
在第一行替换:
log4j.rootCategory=INFO, console
通过:
log4j.rootCategory=WARN, console
scala -version
Spark是基于内存计算的大数据分布式计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群
1.提供分布式计算功能,将分布式存储的数据读入,同时将任务分发到各个节点进行计算;
2.基于内存计算,将磁盘数据读入内存,将计算的中间结果保存在内存,这样可以很好的进行迭代运算;
3.支持高容错;
4.提供多计算范式
http://spark.apache.org/downloads.html
cd /usr/local
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf spark-2.1.0-bin-hadoop2.7.tgz
vim /etc/profile
export SPARK_HOME=/usr/local/spark-2.1.0-bin-hadoop2.7
export PATH=$SPARK_HOME/bin:$PATH
source /etc/profile #使环境变量生效
cd /usr/local/spark-2.1.0-bin-hadoop2.7/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export SCALA_HOME=/usr/local/scala-2.13.0
export JAVA_HOME=/usr/local/jdk1.8.0_181
export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/usr/local/hadoop-3.2.0/etc/hadoop
cd /usr/local/spark-2.1.0-bin-hadoop2.7/conf
cp slaves.template slaves
vim slaves
slave1
slave2
vim wordcount.txt
Hello hadoop
hello spark
hello bigdata
执行下列命令:
hadoop fs -mkdir -p /Hadoop/Input
hadoop fs -put wordcount.txt /Hadoop/Input
hadoop jar /usr/local/hadoop-3.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.0.jar wordcount /Hadoop/Input /Hadoop/Output
等待mapreduce执行完毕后,查看结果
hadoop fs -cat /Hadoop/Output/*
hadoop集群搭建成功!
spark-submit 详细参数说明
参数名 |
参数 |
--master |
master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local |
--deploy-mode |
在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
--class |
应用程序的主类,仅针对 java 或 scala 应用 |
--name |
应用程序的名称 |
--jars |
用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
--packages |
包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
--exclude-packages |
为了避免冲突 而指定不包含的 package |
--repositories |
远程 repository |
--conf PROP=VALUE |
指定 spark 配置属性的值;例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
--properties-file |
加载的配置文件,默认为 conf/spark-defaults.conf |
--driver-memory |
Driver内存,默认 1G |
--driver-java-options |
传给 driver 的额外的 Java 选项 |
--driver-library-path |
传给 driver 的额外的库路径 |
--driver-class-path |
传给 driver 的额外的类路径 |
--driver-cores |
Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
--executor-memory |
每个 executor 的内存,默认是1G |
--total-executor-cores |
所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
--num-executors |
启动的 executor 数量。默认为2。在 yarn 下使用 |
--executor-core |
每个 executor 的核数。在yarn或者standalone下使用 |
Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下几种方式设置Master:
local:所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式;
local[K]:指定使用几个线程来运行计算,比如local[4]就是运行4个Worker线程。通常我们的CPU有几个Core,就指定几个线程,最大化利用CPU的计算能力;
local[*]:这种模式直接帮你按照CPU最多Cores来设置线程数了。
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.0.jar \
100
(1)基本语法
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
(2)参数说明:
--master 指定Master的地址,默认为Local
--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
--deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
--conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”
application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar
application-arguments: 传给main()方法的参数
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个
3)结果展示
该算法是利用蒙特·卡罗算法求PI
l 概述
Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出
yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。
l 安装使用
1) 修改hadoop配置文件yarn-site.xml,添加如下内容:
vim /usr/local/hadoop-3.2.0/etc/hadoop/yarn-site.xml
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
2)修改spark-env.sh,添加如下配置:
vim /usr/local/spark-2.1.0-bin-hadoop2.7/conf spark-env.sh
export YARN_CONF_DIR=/usr/local/hadoop-3.2.0/etc/hadoop
3)分发配置文件
[atguigu@hadoop102 conf]$xsync/opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml
[atguigu@hadoop102 conf]$ xsync spark-env.sh
4)执行一个程序
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.0.jar \
100
注意:在提交任务之前需启动HDFS以及YARN集群。
l 日志查看
1) 修改配置文件spark-defaults.conf
cd /usr/local/spark-2.1.0-bin-hadoop2.7/conf
cp spark-defaults.conf.template spark-defaults.conf
添加如下内容:
spark.yarn.historyServer.address=master:18080
spark.history.ui.port=18080
2)重启spark历史服务
[atguigu@hadoop102 spark]$ sbin/stop-history-server.sh
[atguigu@hadoop102 spark]$ sbin/start-history-server.sh
3)提交任务到Yarn执行
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.0.jar \
100
4)Web页面查看日志
[root@master spark-2.1.0-bin-hadoop2.7]#vim wordcount.txt
Hello hadoop
hello spark
hello bigdata
执行下列命令:
[root@master spark-2.1.0-bin-hadoop2.7]#hadoop fs -mkdir -p /Hadoop/Input
[root@master spark-2.1.0-bin-hadoop2.7]#hadoop fs -put wordcount.txt /Hadoop/Input
[root@master spark-2.1.0-bin-hadoop2.7]#bin/spark-shell
scala> sc
scala> val file=sc.textFile("hdfs://master:9000/Hadoop/Input/wordcount.txt")
scala> val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
scala> rdd.collect()
scala> rdd.foreach(println)
import os
import sys
spark_name = os.environ.get(‘SPARK_HOME‘,None)
if not spark_name:
raise ValueErrorError(‘spark环境没有配置好‘)
sys.path.insert(0,os.path.join(spark_name,‘python‘))
sys.path.insert(0,os.path.join(spark_name,‘python/lib/py4j-0.10.4-src.zip‘))
exec(open(os.path.join(spark_name,‘python/pyspark/shell.py‘)).read())
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。
从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
1)使用parallelize()从集合创建
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2)使用makeRDD()从集合创建
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,我们会在第4章详细介绍。
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
1. 变换(Transformations):
特点: 懒执行,变换只是一些指令集并不会去马上执行,需要等到有Actions操作的时候才会真正的据算结果
比如: map() flatMap() groupByKey reduceByKey
2. 操作(Actions):
特点: 立即执行
比如: count() take() collect() top() first()
RDD的持久化存储(cache和persist)
默认情况下使用Action在RDD上时Spark 会重新计算刷新RDD.但是这俩种持久化方法可以将RDD放在内存当中,这样第二次使用的时候action在RDD上时候Spark 不会重新计算刷新RDD
rows = sc.textFile(‘/user/hadoop/hello.txt‘)
rows.persist() # 或者 rows.cache()
rows.count() # 第一次执行,会将RDD放在内存上
rows.count() # 第二次执行不会重新从文件读取RDD
过滤,将符合条件的数据留下来
l reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止
l reduceByKey就是对元素为键值对的RDD中Key相同的元素的Value进行reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的键值对。(去键重)
pathA = [(‘a‘,1),(‘b‘,1),(‘c‘,2),(‘d‘,3)]
pathB = [(‘c‘,1),(‘d‘,3),(‘e‘,3),(‘f‘,4),]
a = sc.parallelize(pathA)
b = sc.parallelize(pathB)
print(a.join(b).collect()) # 交集
print(a.rightOuterJoin(b).collect()) # 右连接
print(a.leftOuterJoin(b).collect()) # 左连接
print(a.cogroup(b).collect()) # 全连接
print(a.subtractByKey(b).collect()) # 减连接
take(n) 返回前n个元素
top(n) 返回最大的n个元素
first() 返回第一个元素
collect() 返回所有元素,一般元素少的话才会使用
lookup(key) 返回某键值下的所有值
collectAsMap()返回的是一MAP形式的串行化结果
countByKey() 返回的是每一键组内的记录数
【Example】
#!/usr/bin/env
#-*- coding:utf-8 -*-
import os
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("boye").getOrCreate()
#运行在本地(local),2个线程
# spark = SparkSession.builder.appName("test").master("local[2]").getOrCreate()
sc = spark.sparkContext
textFile = sc.textFile("file:///usr/local/test/urls")
#获取域名
#rdd = textFile.filter(lambda x:x.__contains__("http")).map( lambda x:(x.split("\t")[1].split("/")[2],1))
#获取url
rdd = textFile.filter(lambda x:x.__contains__("http")).map( lambda x:(x.split("\t")[1],1))
rdd = rdd.reduceByKey(lambda a,b:a+b)
#sortBy 升序排序
rdd = rdd.sortBy(lambda x:x[0]).map(lambda x:"\t".join([str(i) for i in x]))
os.popen(‘rm -rf /usr/local/test/spark_urls‘)
rdd.saveAsTextFile("file:///usr/local/test/spark_urls")
cd /usr/local/
wget http://archive.apache.org/dist/hbase/1.3.1/hbase-1.3.1-bin.tar.gz
tar -zxvf hbase-1.3.1-bin.tar.gz
cd /usr/local/hbase-1.3.1/conf
vim hbase-site.xml
<!-- 指定HRegion服务器的位置,即数据存放位置 -->
<property>
<name>hbase.rootdir</name>
<value>file:///tmp/hbase</value>
</property>
<!-- 指定HRegion服务器的位置,即数据存放位置 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
</property>
<!-- 指定HLog和Hfile的副本个数 -->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- 指定HRegion服务器的位置,即数据存放位置 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/hbase</value>
</property>
<!-- 指定HBase运行模式,false表示单机模式或伪分布式,true表示完全分布式模式 -->
<property>
<name>hbase.clister.distributed</name>
<value>true</value>
</property>
<!-- 指定master位置 -->
<property>
<name>hbase.master</name>
<value>hdfs://master:60000</value>
</property>
<!-- 指定zookeeper集群 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>master,slave1,slave2</value>
</property>
vim /etc/profile
最后一行添加
export HBASE_HOME=/usr/local/hbase-1.3.1
export PATH=$HBASE_HOME/bin:$PATH
source /etc/profile #使环境变量生效
进入bin目录
cd $HBASE_HOME/bin
sh start-hbase.sh
jps
sh start-dfs.sh
sh start-hbase.sh
sh start-dfs.sh
sh zookeeper.sh start
sh start-hbase.sh
sh stop-hbase.sh
Could not start ZK at requested port of 2181. ZK was started at port: 2182. Aborting a
HMaster和HRegionServer是Hbase的两个子进程,但是使用jps发现没有启动起来,所以去我们配置的logs查看错误信息。提示:
Could not start ZK at requested port of 2181. ZK was started at port: 2182. Aborting as clients (e.g. shell) will not be able to find this ZK quorum.
但是在hbase-env.sh文件中设置了export HBASE_MANAGES_ZK=false
设置不使用自带zookeeper,这一步设置完按理说就可以使用独立的zookeeper程序了,但是还是报错。很明显,这是启动自带zookeeper与独立zookeeper冲突了。因为把hbase.cluster.distributed设置为false,也就是让hbase以standalone模式运行时,依然会去启动自带的zookeeper。
所以要做如下设置,值为true:
vim conf/hbase-site.xml
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
HBase是一个分布式的、面向列的开源数据库,源于google的一篇论文《bigtable:一个结构化数据的分布式存储系统》。HBase是Google Bigtable的开源实现,它利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协同服务。
HBase以表的形式存储数据。表有行和列组成。列划分为若干个列族/列簇(column family)
HBase数据模型
HBase是一个面向列的数据库,在表中它由行排序。表模式定义只能列族,也就是键值对。一个表有多个列族以及每一个列族可以有任意数量的列。后续列的值连续存储在磁盘上。表中的每个单元格值都具有时间戳。总之,在一个HBase:
² 表是行的集合。
² 行是列族的集合。
² 列族是列的集合。
² 列是键值对的集合。
hbase shell #进入Hbase数据库
help [‘command‘] 查看帮助命令
status #查询服务器状态
whoami #查询当前用户
version #当前hbase使用的版本号
操作 |
命令表达式 |
创建表 |
create ‘table_name, ‘family1‘,‘family2‘,‘familyN‘ |
添加记录 |
put ‘table_name‘, ‘rowkey‘, ‘family:column‘, ‘value‘ |
查看记录 |
get ‘table_name‘, ‘rowkey‘ --查询单条记录 |
查看表中的记录总数 |
count ‘table_name‘ --这个命令并不快 |
删除记录 |
第一种方式删除一条记录单列的数据,第二种方式删除整条记录 delete ‘table_name‘ ,‘rowkey‘,‘family_name:column‘ deleteall ‘table_name‘,‘rowkey‘ |
删除一张表 |
1、disable ‘table_name‘ 2、drop ‘table_name‘ |
查看所有记录 |
scan "table_name" ,{LIMIT=>10} --LIMIT=>10 只返回10条记录 |
#语法 : grant <user> <permissions> <table> <column family> <column qualifier> 参数后面用逗号分隔
# 权限用五个字母表示: "RWXCA".
# READ(‘R‘), WRITE(‘W‘), EXEC(‘X‘), CREATE(‘C‘), ADMIN(‘A‘)
# 例如,给用户‘test‘分配对表t1有读写的权限,
hbase(main)> grant ‘test‘,‘RW‘,‘t1‘
# 语法:user_permission <table>
# 例如,查看表t1的权限列表
hbase(main)> user_permission ‘t1‘
# 与分配权限类似,语法:revoke <user> <table> <column family> <column qualifier>
# 例如,收回test用户在表t1上的权限
hbase(main)> revoke ‘test‘,‘t1‘
语法:create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}
create ‘表名称‘, ‘列名称1‘,‘列名称2‘,‘列名称N‘
create ‘table1‘, ‘tab1_id‘, ‘tab1_add‘, ‘tab1_info‘
list
describe "table1"
exists ‘table2‘
disable ‘table1‘
drop ‘table1‘
注:先要屏蔽该表,才能对该表进行删除
is_enabled ‘table1‘
is_disabled ‘table1‘
² 修改emp表的personal data列族的VERSIONS值为5
alter ‘emp’,NAME=>’personal data’,VERSIONS=>5
² 可以将表设置为只读模式,命令如下:
alter ‘tablename’,READONLY
² 删除表范围运算符,需首先将表disable:
alter ‘tablename’,METHOD=>’table_att_unset’,NAME=>’MAX_FILESIZE’
删除列族,需首先将表disable:
alter ‘tablename’,’delete’=>’column family’
删除一个列族之后,这个列族的数据也会全部被删除
语法:put <table>,<rowkey>,<family:column>,<value>,<timestamp>
create ‘member‘,‘member_id‘,‘address‘,‘info‘
put ‘member‘, ‘scutshuxue1‘, ‘info:age‘, ‘24‘
put ‘member‘, ‘scutshuxue2‘, ‘info:birthday‘, ‘1987-06-17‘
put ‘member‘, ‘scutshuxue3‘, ‘info:company‘, ‘alibaba‘
put ‘member‘, ‘scutshuxue‘, ‘address:contry‘, ‘china‘
put ‘member‘, ‘scutshuxue‘, ‘address:province‘, ‘zhejiang‘
put ‘member‘, ‘scutshuxue‘, ‘address:city‘, ‘hangzhou‘
scan "表名称" , [‘列名称:‘]
# 语法:scan <table>, {COLUMNS => [ <family:column>,.... ], LIMIT => num}
scan ‘User‘, {LIMIT => 2}
scan “table1”
# 语法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
count ‘member‘
get ‘表名称‘, ‘行名称‘
# 语法:get <table>,<rowkey>,[<family:column>,....]
获得一行的所有数据
get ‘member‘,‘scutshuxue‘
获得某行,某列族的所有数据
get ‘member‘,‘scutshuxue‘,‘info‘
获得某行,某列族,某列的所有数据
get ‘member‘,‘scutshuxue‘,‘info:company‘
hbase(main):002:0> incr ‘table‘,‘id‘,‘column_famaly1:addr‘
COUNTER VALUE = 1
0 row(s) in 0.0340 seconds
put ‘表名称‘, ‘行名称‘, ‘列名称:‘, ‘值‘
put ‘member‘, ‘scutshuxue‘, ‘info:age‘, 99 --把scutshuxue年龄改为99
delete ‘表名‘ ,‘行名称‘ , ‘列名称‘
# 语法:delete <table>, <rowkey>, <family:column> , <timestamp>,必须指定列名
delete ‘member‘, ‘scutshuxue‘, ‘info:age‘ --删除行‘scutshuxue‘, 列族为‘info‘ 中age的值
# 语法:deleteall <table>, <rowkey>, <family:column> , <timestamp>,可以不指定列名,删除整行数据
deleteall ‘member‘, ‘scutshuxue‘ --删除整行
truncate ‘member‘
官网:http://archive.apache.org/dist/zookeeper/
l 下载
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
l 解压
tar -zxvf zookeeper-3.4.5.tar.gz
l 进入解压目录,创建data和logs目录
cd /usr/local/zookeeper-3.4.5
mkdir data
mkdir logs
l 在conf目录下新建zoo.cfg文件,写入以下内容保存
vim /usr/local/zookeeper-3.4.5/conf/zoo.cfg
tickTime=2000
dataDir=/usr/local/zookeeper-3.4.5/data
dataLogDir=/usr/local/zookeeper-3.4.5/logs
clientPort=2181
进入bin目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行:
cd /usr/local/zookeeper-3.4.5/bin
./zkServer.sh start
./zkServer.sh stop
./zkServer.sh restart
./zkServer.sh status
ps -aux | grep ‘zookeeper‘
伪集群模式就是在同一主机启动多个zookeeper并组成集群,
下边以在192.168.28.131主机上创3个zookeeper组集群为例。
将通过第一大点安装的zookeeper,复制成zookeeper1/zookeeper2/zookeeper3三份
l zookeeper1配置
zookeeper1配置文件conf/zoo.cfg修改如下:
tickTime=2000
dataDir=/usr/local/zookeeper1/data
dataLogDir=/usr/local/zookeeper1/logs
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.28.131:2888:3888
server.2=192.168.28.131:4888:5888
server.3=192.168.28.131:6888:7888
l zookeeper1的data/myid配置如下
echo ‘1‘ > data/myid
l zookeeper2配置
zookeeper2配置文件conf/zoo.cfg修改如下:
tickTime=2000
dataDir=/usr/local/zookeeper2/data
dataLogDir=/usr/local/zookeeper2/logs
clientPort=3181
initLimit=5
syncLimit=2
server.1=192.168.28.131:2888:3888
server.2=192.168.28.131:4888:5888
server.3=192.168.28.131:6888:7888
l zookeeper2的data/myid配置如下:
echo ‘2‘ > data/myid
l zookeeper3配置
zookeeper3配置文件conf/zoo.cfg修改如下:
tickTime=2000
dataDir=/usr/local/zookeeper3/data
dataLogDir=/usr/local/zookeeper3/logs
clientPort=4181
initLimit=5
syncLimit=2
server.1=192.168.28.131:2888:3888
server.2=192.168.28.131:4888:5888
server.3=192.168.28.131:6888:7888
l zookeeper3的data/myid配置如下:
echo ‘3‘ > data/myid
l 最后使用命令把三个zookeeper都启动即可,启动顺序随意没要求
sh /usr/local/zookeeper1/bin/zkServer.sh start
sh /usr/local/zookeeper2/bin/zkServer.sh start
sh /usr/local/zookeeper3/bin/zkServer.sh start
集群模式就是在不同主机上安装zookeeper然后组成集群的模式;下边以在192.168.28.131/132/133三台主机为例。
将第1.1到1.3步中安装好的zookeeper打包复制到132和133上,并都解压到同样的目录下。
三个zookeeper的conf/zoo.cfg修改如下:
tickTime=2000
dataDir=/usr/local/zookeeper-3.4.5/data
dataLogDir=/usr/local/zookeeper-3.4.5/logs
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.220.131:2888:3888
server.2=192.168.220.132:2888:3888
server.3=192.168.220.133:2888:3888
对于132和133,由于安装目录都是zookeeper-3.4.5所以dataDir和dataLogDir不需要改变,又由于在不同机器上所以clientPort也不需要改变
所以此时132和133的conf/zoo.cfg的内容与131一样即可
l 131 data/myid修改如下
echo ‘1‘ > data/myid
l 132 data/myid修改如下
echo ‘2‘ > data/myid
l 133 data/myid修改如下
echo ‘3‘ > data/myid
以下命令不管是单机、伪集群、集群模式都适用;伪集群和集群模式随便连接其中一个zookeeper即可。
进入zookeeper的bin目录,使用zkCli连接zookeeper
./bin/zkServer.sh status #查看集群状态
./zkCli.sh # 默认连接localhost:2181
./zkCli.sh -server 192.168.220.128:2181 #指定ip和端口
命令 |
描述 |
help |
查看所有支持的命令 |
ls / |
查看目录下有哪些节点。以根目录为例 |
create /example_path "example_data" |
创建一个节点;加-s表示创建顺序节点,即会自动在给定的路径后面再加上一个数字串,保证路径不重复;默认是持久节点,加-e是临时节点 |
get /example_path |
查看节点内容;返回第一行即是节点的内容,如果第一行空白或null那就说明该节点创建时就没有值;后续的cZxid到numChildren都是该节点的一些属性信息;其中numChildren标识该节点下有多少个子节点 |
delete /example_path |
删除一个没有子节点的节点 |
rmr /example_path |
递规删除节点及其所有子节点 |
quit |
退出zkCli |
参考:https://blog.csdn.net/luanpeng825485697/article/details/81036028
在大数据中,使用了大量的数据。 关于数据,我们有两个主要挑战。第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据。为了克服这些挑战,您必须需要一个消息系统。
Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。
消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它。 分布式消息传递基于可靠消息队列的概念。 消息在客户端应用程序和消息传递系统之间异步排队。 有两种类型的消息模式可用 - 一种是点对点,另一种是发布 - 订阅(pub-sub)消息系统。 大多数消息模式遵循 pub-sub 。
在点对点系统中,消息被保留在队列中。 一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。 一旦消费者读取队列中的消息,它就从该队列中消失。 该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。 下图描述了结构。
在发布 - 订阅系统中,消息被保留在主题中。 与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。 一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。
Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
以下是Kafka的几个好处:
Kafka非常快,并保证零停机和零数据丢失。
Kafka可以在许多用例中使用。 其中一些列出如下:
l 指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
l 日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
l 流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。
需要Kafka
Kafka是一个统一的平台,用于处理所有实时数据Feed。 Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。 Kafka非常快,执行2百万写/秒。 Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存。 这使得将数据从页面缓存传输到网络套接字非常有效。
http://kafka.apache.org/downloads
l 下载
wget https://mirrors.cnnic.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
l 解压
tar -zxvf kafka_2.11-2.3.0.tgz
l 进入目录
cd /usr/local/kafka_2.11-2.3.0
l 修改配置文件
vi conf/server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/usr/local/kafka_2.11-2.3.0/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
l 配置环境变量
sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/usr/local/kafka_2.11-2.3.0
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 module]$ source /etc/profile
l 启动服务器
注:需要先启动 zookeeper
bin/kafka-server-start.sh config/server.properties
l 停止服务器
bin/kafka-server-stop.sh config/server.properties
https://www.cnblogs.com/rilley/p/5391268.html
#每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0
#kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/usr/local/kafka/kafka-logs
#消息体的最大大小,单位是字节
message.max.bytes = 1000000
zookeeper.connect=master:2181
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3 #分区
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
#数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据
log.retention.hours=168
#控制toppic分区中每个segment(00000000000000006223.log)的大小
log.segment.bytes=1024 * 1024 * 1024
#这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.roll.hours = 24*7
#topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes =-1 没有大小限制log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1
#文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000
log.cleaner.enable=false #是否开启日志压缩
log.cleaner.threads =1 #日志压缩运行的线程数
#日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024
zookeeper.connection.timeout.ms=60000
#对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms = 1 day
#对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.size.max.bytes = 10 * 1024 * 1024
#当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes = 4096
group.initial.rebalance.delay.ms=0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
参数说明:
–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
–replication-factor:指定副本数量
–partitions:指定分区数量
–topic:主题名称
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
拷贝server.properties三份
cd /usr/local/kafka_2.11-2.3.0
cp server.properties server-1.properties
cp server.properties server-2.properties
cp server.properties server-3.properties
l 修改server-1.properties文件
# broker的全局唯一编号,不能重复
broker.id=1
# 监听
listeners=PLAINTEXT://:9093
# 日志目录
log.dirs=/home/hadoop/kafka-logs-1
l 修改server-2.properties文件
# broker的全局唯一编号,不能重复
broker.id=2
# 监听
listeners=PLAINTEXT://:9094
# 日志目录
log.dirs=/home/hadoop/kafka-logs-2
l 修改server-3.properties文件
# broker的全局唯一编号,不能重复
broker.id=3
# 监听
listeners=PLAINTEXT://:9095
# 日志目录
log.dirs=/home/hadoop/kafka-logs-3
l 启动Zookeeper
cd /usr/local/zookeeper-3.4.5/bin
sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
l 启动Kafka(分别启动server1、2、3)
cd /usr/local/kafka_2.11-2.3.0
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
bin/kafka-server-start.sh -daemon config/server-3.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myjob --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myjob --group myjob-group
l 查看指定topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
l 查看积压
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
LogEndOffset:下一条将要被加入到日志的消息的位移
CurrentOffset:当前消费的位移
LAG :消息堆积量
l 修改主题
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
l 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first
注:需要server.properties中设置delete.topic.enable=true否则只是标记删除。
l 查看某个Topic的详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first
l 修改分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6
删除主题
语法:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
zookeeper 集群启动没有问题,集群状态也正常,但是启动kafka 确报了这个错误:
Timed out waiting for connection while in state: CONNECTING (连接超时)
原因:
a) zookeeper 访问不了。 查看下zookeeper (网络是否通)是否正常启动。
b) kafka 的zookeeper (server.properties里面)访问地址不正确,检查一下。
c) kafka 的 broker.id (server.properties里面)没有注释掉,这里集群最好注释掉,不要手动指定。
d) 修改 kafka 配置 连接超时间,这里是以毫秒为单位。
zookeeper.connection.timeout.ms=60000
启动Kafka报Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 107
原因:启动kafka报JAVA HotSpot 内存不足
直接在bin下面
vim kafka-server-start.sh
《hadoop 集群搭建、spark安装、Hbase安装、Hive安装、Kafka安装》
标签:share 原因 arc warning 流式 输出 recover center output
原文地址:https://www.cnblogs.com/boye169/p/13394594.html