问题导读:
1.node向master发送心跳之后等待反馈的最大时长由哪个参数来决定,默认多长时间?
2.当primary sink(可以认为是第一collector)故障后,重启primary sink的一个延迟时间,在此期间,agent将把数据发送到secondary sink(可能是第二collector)由哪个参数来决定?
3.collector的默认发送目录通过哪个参数可以配置?
4.flume command执行命令的方式有几种?
Flume配置文件(flume-site.conf)
1、 watchdog
watchdog.restarts.max
|
watchdog每分钟重启的最大数???
|
2、 common node
flume.config.heartbeat.period
|
node发送心跳周期,默认5000(毫秒)
|
flume.node.status.port
|
node web端口
|
flume.node.heartbeat.backoff.ceiling
|
node向master发送心跳之后等待反馈的最大时长,默认60000(毫秒)
|
flume.node.http.autofindport
|
如果已有node启动,允许第二个node自动选择一个未使用的端口做web服务。多个node的界面端口从35862、35863向后延续
|
3、agent
flume.agent.logdir
|
agent日志路径
|
flume.agent.logdir.maxage
|
当前处于打开状态agent日志文件收集信息的时长,在这之后该日志文件将会被关闭,并将数据发送到网络,默认10000(毫秒)
|
flume.agent.logdir.retransmit
|
在end-to-end模式下agent向collector发送数据失败后再次发送的间隔时长,默认60000(毫秒),建议至少是flume.collector.roll.millis的两倍
|
flume.agent.failover.backoff.initial
|
当primary sink(可以认为是第一collector)故障后,重启primary sink的一个延迟时间,在此期间,agent将把数据发送到secondary sink(可能是第二collector)
|
flume.agent.failover.backoff.max
|
在一定时限内尝试链接故障节点失败后,agent将把数据转发向备用节点
|
4、collector
flume.collector.event.host
|
默认collector地址
|
flume.collector.port
|
默认collector端口
|
flume.collector.dfs.dir
|
最终数据发向目录(默认),可以是本地,可以是hdfs,默认是/tmp
|
flume.collector.dfs.compress.codec
|
压缩格式GzipCodec, DefaultCodec (deflate), BZip2Codec,默认是None
|
flume.collector.roll.millis
|
hdfs文件切换(关闭后新建)的时长
|
flume.collector.output.format
|
collector发送数据格式avro, avrojson(默认), avrodata…
|
5、master
flume.master.servers
|
用逗号分隔多个master地址列表
|
flume.master.store
|
master配置存储方式(zookeeper/memory) zookeeper保证master的配置在多master节点之间同步,memory则保存在内存中,其配置随着master宕机而丢失
|
flume.master.serverid
|
master的唯一标识
|
flume.master.http.port
|
http端口
|
flume.master.heartbeat.missed.max
|
判断节点失效的最大未达心跳数
|
flume.master.savefile
|
当前flume配置文件的路径,默认conf/current.flume
|
flume.master.savefile.autoload
|
启动时是否加载current.flume,默认false
|
flume.master.gossip.period
|
master通信周期(毫秒)
|
flume.master.heartbeat.rpc
|
THRIFT/AVRO
|
flume.event.rpc
|
THRIFT/AVRO
|
flume.report.server.rpc.type
|
THRIFT/AVRO
|
6、zookeeper
flume.master.zk.logdir
|
zookeeper日志路径
|
7、thrift
flume.thrift.socket.timeout.ms
|
thrift网络连接超时时间(毫秒)
|
1.command shell(flume command)
help
|
帮助
|
connect master:port
|
登录master
|
config logicalnode source sink
|
为逻辑节点配置一个source到sink的映射
|
getnodestatus
|
获得节点状态(HELLO, CONFIGURING, ACTIVE, IDLE, ERROR, DECOMMISSIONED, LOST )
HELLO, node启动时
CONFIGURING, node被配置后
ACTIVE, 一个event从source送达到sink
IDLE, source中所有evnet发送完毕后
ERROR, 节点故障退出,数据没有flush
DECOMMISSIONED, node被master移除
LOST, master长时间未收到node心跳
|
getconfigs
|
获得配置
|
getmappings [physical node]
|
如果physical node参数被省略,将显示所有logical node到physical node的映射关系
|
exec
|
同步执行命令
|
Source file
|
执行脚本.
|
submit
|
异步执行命令
|
wait ms [cmdid]
|
设定一个时间,周期检查命令进程的状态(success or failure)
|
waitForNodesActive ms node1 [node2 […]]
|
设定一个时间,检查node是否处于使用(configuring, active)状态
|
waitForNodesDone ms node1 [node2 […]]
|
设定一个时间,检查node是否处于未用(IDLE, ERROR, LOST)状态
|
quit
|
退出
|
2.command shell(exec & submit command)
双引号
|
包含转义字符的java string
|
单引号
|
能引住除单引号之外的所有字符
|
noop
|
touch master, 不做操作
|
config logicalnode source sink
|
为逻辑节点配置source到sink的映射
|
multiconfig flumespec
|
|
unconfig logicalnode
|
取消逻辑节点的配置,影响master调整故障转移列表(failover list)
|
refreshAll logicalnode
|
刷新
|
save filename
|
保存current configuration到master硬盘
|
load filename
|
从master中加载current configuration
|
map physicalnode logicalnode
|
配置物理节点到逻辑节点的映射关系,master的配置将被同步到logicalnode
|
spawn physicalnode logicalnode
|
恢复
|
decommission logicalnode
|
|
unmap physicalnode logicalnode
|
取消映射
|
unmapAll
|
全部取消
|
purge logicalnode
|
清除状态,类似重启一个logical node, 适用于(DECOMMISSIONED、 LOST)状态
|
purgeAll
|
清除所有logical node的状态
|
Flume
Source
1、Flume’s Tiered Event Sources
collectorSource[(port)]
|
Collector source,监听端口汇聚数据
|
autoCollectorSource
|
通过master协调物理节点自动汇聚数据
|
logicalSource
|
逻辑source,由master分配端口并监听rpcSink
|
2、Flume’s Basic Sources
null
|
|
console
|
监听用户编辑历史和快捷键输入,只在node_nowatch模式下可用
|
stdin
|
监听标准输入,只在node_nowatch模式下可用,每行将作为一个event source
|
rpcSource(port)
|
由rpc框架(thrift/avro)监听tcp端口
|
text("filename")
|
一次性读取一个文本,每行为一个event
|
tail("filename"[, startFromEnd=false])
|
每行为一个event。监听文件尾部的追加行,如果startFromEnd为true,tail将从文件尾读取,如果为false,tail将从文件开始读取全部数据
|
multitail("filename"[, file2 [,file3… ] ])
|
同上,同时监听多个文件的末尾
|
tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]])
|
监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子目录的深度
|
seqfile("filename")
|
监听hdfs的sequencefile,全路径
|
syslogUdp(port)
|
监听Udp端口
|
syslogTcp(port)
|
监听Tcp端口
|
syslogTcp1(port)
|
只监听Tcp端口的一个链接
|
execPeriodic("cmdline", ms)
|
周期执行指令,监听指令的输出,整个输出都被作为一个event
|
execStream("cmdline")
|
执行指令,监听指令的输出,输出的每一行被作为一个event
|
exec("cmdline"[,aggregate=false[,restart=false[,period=0]]])
|
执行指令,监听指令的输出,aggregate如果为true,整个输出作为一个event如果为false,则每行作为一个event。如果restart为true,则按period为周期重新运行
|
synth(msgCount,msgSize)
|
随即产生字符串event,msgCount为产生数量,msgSize为串长度
|
synthrndsize(msgCount,minSize,maxSize)
|
同上,minSize – maxSize
|
nonlsynth(msgCount,msgSize)
|
|
asciisynth(msgCount,msgSize)
|
Ascii码字符
|
twitter("username","pw"[,"url"])
|
尼玛twitter的插件啊
|
irc("server",port, "nick","chan")
|
|
scribe[(+port)]
|
Scribe插件
|
report[(periodMillis)]
|
生成所有physical node报告为事件源
|
Flume
Sinks
1、Flume’s Collector Tier Event Sinks
collectorSink( "fsdir","fsfileprefix",rollmillis)
|
collectorSink,数据通过collector汇聚之后发送到hdfs, fsdir 是hdfs目录,fsfileprefix为文件前缀码
|
2、Flume’s Agent Tier Event Sinks
agentSink[("machine"[,port])]
|
Defaults to agentE2ESink,如果省略,machine参数,默认使用flume.collector.event.host与flume.collector.event.port作为默认collecotr(以下同此)
|
agentE2ESink[("machine"[,port])]
|
执着的agent,如果agent发送event没有收到collector成功写入的状态码,该event将被agent重复发送,直到接到成功写入的状态码
|
agentDFOSink[("machine" [,port])]
|
本地热备agent,agent发现collector节点故障后,不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中
|
agentBESink[("machine"[,port])]
|
不负责的agent,如果collector故障,将不做任何处理,它发送的数据也将被直接丢弃
|
agentE2EChain("m1[:_p1_]" [,"m2[:_p2_]"[,…]])
|
指定多个collector提高可用性。 当向主collector发送event失效后,转向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍...
|
agentDFOChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])
|
同上,当向所有的collector发送事件失效后,他会将event缓存到本地磁盘,并检查collector状态,尝试重新发送
|
agentBEChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])
|
同上,当向所有的collector发送事件失效后,他会将event丢弃
|
autoE2EChain
|
无需指定collector, 由master协调管理event的流向
|
autoDFOChain
|
同上
|
autoBEChain
|
同上
|
3、Flume’s Logical Sinks
logicalSink("logicalnode")
|
|
4、Flume’s Basic Sinks
在不使用collector收集event的情况下,可将source直接发向basic sinks
null
|
null
|
console[("formatter")]
|
转发到控制台
|
text("txtfile" [,"formatter"])
|
转发到文本文件
|
seqfile("filename")
|
转发到seqfile
|
dfs("hdfspath")
|
转发到hdfs
|
customdfs("hdfspath"[, "format"])
|
自定义格式dfs
|
+escapedCustomDfs("hdfspath", "file", "format")
|
|
rpcSink("host"[, port])
|
Rpc框架
|
syslogTcp("host"[,port])
|
发向网络地址
|
irc("host",port, "nick", "chan")
|
|