Flume sinks案例HDFS Sink(每 5 秒在 hdfs 上创建一个新的文件夹)

参考网址:hdfs sinks
Flume sinks案例HDFS Sink(每 5 秒在 hdfs 上创建一个新的文件夹)

  1. %t Unix 时间戳,毫秒
  2. %{host} 替换名为"host"的事件 header 的值。支持任意标题名称。
  3. %a 星期几的短名,即 Mon, Tue,
  4. %A 星期几的全名,即 Monday, Tuesday, …
  5. %b 月份短名,即 Jan, Feb, …
  6. %B 月份全名,即 January, February, …
  7. %c 时间和日期,即 Thu Mar 3 23:05:25 2005
  8. %d day of month (01)
  9. %e day of month without padding (1)
  10. %D date; same as %m/%d/%y
  11. %H hour (00…23)
  12. %I hour (01…12)
  13. %j day of year (001…366)
  14. %k 小时 ( 0…23)
  15. %m 月份 (01…12)
  16. %n 不加前缀的月份 (1…12)
  17. %M 分钟(00…59)
  18. %p locale’s equivalent of am or pm
  19. %s seconds since 1970-01-01 00:00:00 UTC
  20. %S second (00…59)
  21. %y 年份最后两位 (00…99)
  22. %Y year (2010)
  23. %z +hhmm 数字时区 (for example, -0400)
  24. type - 组件类型名称,必须是 hdfs
  25. hdfs.path - HDFS 路 径 , 如 hdfs://mycluster/flume/mydata
  26. hdfs.filePrefix FlumeData flume 在 hdfs 目录中创建文件的前缀
  27. hdfs.fileSuffix - flume 在 hdfs 目录中创建文件的后缀。
  28. hdfs.inUsePrefix - flume 正在写入的临时文件的前缀
  29. hdfs.inUseSuffix .tmp flume 正在写入的临时文件的后缀
  30. hdfs.rollInterval 30 多长时间写一个新的文件 (0 = 不写新 的文件),单位秒
  31. hdfs.rollSize 1024 文件多大写新文件单位字节(0: 不基 于文件大小写新文件)
  32. hdfs.rollCount 10 当写一个新的文件之前要求当前文件写 入多少事件(0 = 不基于事件数写新文 件)
  33. hdfs.idleTimeout 0 多长时间没有新增事件则关闭文件(0 =不自动关闭文件)单位为秒
  34. hdfs.batchSize 100 写多少个事件开始向 HDFS 刷数据
  35. hdfs.codeC - 压缩格式:gzip, bzip2, lzo, lzop, snappy
  36. hdfs.fileType SequenceFil e 当前支持三个值:SequenceFile, DataStream,CompressedStream。 (1)DataStream 不压缩输出文件,不 要 设 置 codeC (2)CompressedStream 必 须 设 置 codeC
  37. hdfs.maxOpenFiles 5000 最大打开多少个文件。如果数量超了则 关闭最旧的文件
  38. hdfs.minBlockReplicas - 对每个 hdfs 的 block 设置最小副本 数。如果不指定,则使用 hadoop 的配 置的值。1
  39. hdfs.writeFormat - 对于 sequence file 记录的类型。 Text 或者 Writable(默认值)
  40. hdfs.callTimeout 10000 为 HDFS 操作如 open、write、flush、 close 准备的时间。如果 HDFS 操作很 慢,则可以设置这个值大一点儿。单位 毫秒
  41. hdfs.threadsPoolSize 10 每个 HDFS sink 的用于 HDFS io 操作 的线程数 (open, write, etc.)
  42. hdfs.rollTimerPoolSiz e 1 每个 HDFS sink 使用几个线程用于调 度计时文件滚动。
  43. hdfs.round false 支持文件夹滚动的属性。是否需要新建 文件夹。如果设置为 true,则会影响所 有的基于时间的逃逸字符,除了%t。
  44. hdfs.roundValue 1 该值与 roundUnit 一起指定文件夹滚 动的时长,会四舍五入
  45. hdfs.roundUnit second 控制文件夹个数。多长时间生成新文件 夹。可以设置为- second, minute 或者 hour.
  46. hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
  47. hdfs.useLocalTimeStam p false 一般设置为 true,使用本地时间。如果 不使用本地时间,要求 flume 发送的事 件 header 中带有时间戳。该时间用于 替换逃逸字符

启动 hadoop 集群上的 hdfs,
将 node1 上的 option_sdir 拷贝 option_hdfs1,并修改:

# example.conf: A single-node Flume configuration 
# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source 
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/log
a1.sources.r1.fileHeader = true
a1.sources.r1.fileSuffix=.neusoft
# Describe the sink 
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 node1 上的 flume

[root@node1 ~]# flume-ng agent -n a1 --conf-file option_hdfs1 -Dflume.root.logger=INFO,console

通过浏览器 node2:50070 访问 hdfs 目录,发现/flume 并不存在。
复制一个 node1 连接的 xshell 终端,/root/log 目录下拷贝文本文件

[root@node1 ~]# cp wc.txt log/

通过浏览器 node2:50070 访问 hdfs 目录,发现/flume 出现了
Flume sinks案例HDFS Sink(每 5 秒在 hdfs 上创建一个新的文件夹)

上一篇:HTTP系列(一)URI、URL、URN的区别


下一篇:flume入门