1: Monitoring port data (official example)

vim flume-netcat-logger.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
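To verify this agent, start it with the configuration above and write to port 44444 with netcat; each line you type should be printed by the logger sink. A minimal sketch, assuming the config file is saved under job/ as in the later examples and that nc is installed:

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
# In a second terminal, send a test event; it appears in the agent's console output
[root@hadoop102 flume]$ nc localhost 44444
hello flume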
2: Monitoring a single appended file in real time

vim flume-file-hdfs.conf

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/datas/A.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories based on time
a2.sinks.k2.hdfs.round = true
# How many time units before creating a new directory
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported. DataStream means an uncompressed data stream
a2.sinks.k2.hdfs.fileType = DataStream
# How often to roll a new file (60 seconds)
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size per file (about 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
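To exercise this agent, start it and append a line to the tailed file; tail -F keeps following the file even if it is rotated, and the HDFS sink writes into an hourly directory. A sketch, assuming the config is saved under job/ and /opt/module/datas/A.log exists:

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
# Append a test line; the exec source picks it up via tail -F
[root@hadoop102 flume]$ echo "test event" >> /opt/module/datas/A.log
# Check the result (the path uses the current local date and hour)
[root@hadoop102 flume]$ hdfs dfs -ls /flume/$(date +%Y%m%d)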
3: Monitoring multiple new files in a directory in real time

vim flume-dir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.7.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore (do not upload) any file ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new directory
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll a new file (60 seconds)
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size per file (about 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Case: reading directory files into HDFS in real time

2. Start the directory-monitoring agent:
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Notes on using the Spooling Directory Source:
Do not create and then keep modifying files inside the monitored directory.
Files that have been fully uploaded are renamed with the .COMPLETED suffix.
The monitored directory is scanned for changes every 500 milliseconds.
3. Add files to the upload directory
Create the upload directory under /opt/module/flume:
[root@hadoop102 flume]$ mkdir upload
Add files to the upload directory:
[root@hadoop102 upload]$ touch bawei.txt
[root@hadoop102 upload]$ touch bawei.tmp
[root@hadoop102 upload]$ touch bawei.log
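Once the agent has picked the files up, the renaming behavior described above can be checked directly. A quick verification sketch, assuming the paths from this example and that the agent uses its local time for the HDFS path:

[root@hadoop102 upload]$ ls
# bawei.txt and bawei.log should now carry the .COMPLETED suffix;
# bawei.tmp is untouched because it matches ignorePattern
[root@hadoop102 upload]$ hdfs dfs -ls /flume/upload/$(date +%Y%m%d)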
4: Monitoring multiple appended files in a directory in real time

vim flume-taildir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume-1.7.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.7.0/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume-1.7.0/files/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload2/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new directory
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll a new file (60 seconds)
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size per file (about 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
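To test the TAILDIR source, start the agent, create the monitored directory, and append to files whose names match the f1 (.*file.*) and f2 (.*log.*) patterns. Unlike the exec source, read offsets are persisted in tail_dir.json, so the source resumes where it left off after a restart. A sketch, assuming the paths from this example:

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
[root@hadoop102 flume]$ mkdir /opt/module/flume-1.7.0/files
# file1.txt matches filegroup f1; log1.txt matches filegroup f2
[root@hadoop102 flume]$ echo hello >> /opt/module/flume-1.7.0/files/file1.txt
[root@hadoop102 flume]$ echo world >> /opt/module/flume-1.7.0/files/log1.txt
[root@hadoop102 flume]$ hdfs dfs -ls /flume/upload2/$(date +%Y%m%d)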
Original post: https://www.cnblogs.com/xjqi/p/12923104.html