Flume
Overview
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive amounts of log data.
Flume is based on a streaming architecture and is flexible and simple. It can be integrated with any storage process. When the incoming data rate is higher than the rate at which data can be written to the destination store, Flume buffers the data, easing the pressure on HDFS.
Transactions in Flume are based on the Channel and use a two-transaction model (sender + receiver) to ensure that messages are delivered reliably.
Flume uses two independent transactions to handle event delivery from Source to Channel and from Channel to Sink. Only when all the data in a transaction has been successfully committed to the Channel does the Source consider the data fully read; likewise, only data that has been successfully written out by the Sink is removed from the Channel.
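As a rough Java sketch of this put/take pattern (not Flume's internal implementation, only the public Channel/Transaction API it exposes; the channel and event objects are assumed to come from a running agent, and a real Source normally drives the put side through its ChannelProcessor, as in the custom Source later in this article):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class ChannelTransactionSketch {
    // Put side: the Source only treats data as "read" once the batch is committed to the Channel.
    public static void putWithTransaction(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event);   // staged inside the transaction
            tx.commit();          // only now does the event become visible to the Sink
        } catch (Throwable t) {
            tx.rollback();        // nothing reaches the Channel on failure; report or retry in real code
        } finally {
            tx.close();
        }
    }

    // Take side: the event is removed from the Channel only after a successful write-out and commit.
    public static void takeWithTransaction(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();   // may be null if the Channel is empty
            if (event != null) {
                // write the event to the destination (HDFS, the next agent, ...) here
            }
            tx.commit();                    // only now is the event removed from the Channel
        } catch (Throwable t) {
            tx.rollback();                  // the event stays in the Channel and can be retried
        } finally {
            tx.close();
        }
    }
}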
Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events. An Agent consists mainly of a Source, a Channel, and a Sink.
Source
The Source is the component that receives data into the Agent. It can handle many data types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
Channel
The Channel is a buffer between the Source and the Sink, which allows the Source and the Sink to operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels:
Memory Channel: an in-memory queue; it is fast and suitable when losing data is not a concern.
File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.
Sink
The Sink continuously polls the Channel for events and removes them in batches, writing each batch to a storage or indexing system or sending it to another Flume Agent.
The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel; once the transaction is committed, the Channel deletes those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.
Event
The Event is the transport unit and the basic unit of data transfer in Flume; data travels from source to destination in the form of events.
An Event consists of an optional header and a byte array carrying the data; the header is a HashMap holding key-value string pairs.
Typically one record is one Event; an Event is cut every 2048 bytes.
This mode chains multiple Flume agents in sequence, from the initial Source through to the storage system that the final Sink delivers to. Bridging too many Flume agents in this mode is not recommended: too many agents not only hurt throughput, but if any Flume node along the way goes down during transfer, the whole pipeline is affected.
Flume supports flowing events to one or more destinations. In this mode the source data is replicated into multiple Channels, each holding the same data, and each Sink can then deliver to a different destination.
Flume supports logically grouping multiple Sinks into one sink group; Flume then distributes the data across the different Sinks, which mainly addresses load balancing and failover.
This mode is the most common and very practical. Everyday web applications are usually spread across hundreds of servers, and large ones across thousands or even tens of thousands, so the logs they produce are cumbersome to process. This Flume composition solves the problem well: a Flume agent deployed on each server collects its logs and sends them to a central log-collecting Flume agent, which then uploads them to hdfs, hive, hbase, jms, and so on for log analysis.
Agent internals
Flume deployment
1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory
2. Rename apache-flume-1.7.0-bin to flume
3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh and configure JAVA_HOME in flume-env.sh
Requirements analysis:
The server side listens on local port 44444.
The netcat tool is used to send messages to port 44444.
Finally the data is displayed on the console.
Implementation steps:
1. Create the Agent configuration file flume-netcat-logger.conf in the job folder
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Parameter notes:
--conf conf/: the configuration files are stored in the conf/ directory.
--name a1: names this Agent a1.
--conf-file job/flume-netcat-logger.conf: the configuration file Flume reads for this run is flume-netcat-logger.conf in the job folder.
-Dflume.root.logger=INFO,console: -D means the flume.root.logger property is overridden dynamically while Flume runs, setting the console log level to INFO.
Real-time monitoring of a file to HDFS
Requirements analysis:
Monitor the Hive log in real time and upload it to HDFS.
Implementation steps:
1. Create the Agent configuration file flume-file-hdfs.conf in the job folder
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2. Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compressed types are supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size of each file (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf
Note:
To read a file on a Linux system, the command must follow Linux command conventions. Since the Hive log lives on a Linux system, the source type chosen for reading the file is exec, short for execute, meaning a Linux command is executed to read the file.
Real-time monitoring of a directory to HDFS
Requirements analysis:
Use Flume to monitor all files in a directory.
Implementation steps:
1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
# Redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compressed types are supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size of each file, roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
Note:
Do not create and continuously modify files in the monitored directory.
Requirements analysis:
Flume-1 monitors file changes and passes them to Flume-2; Flume-2 stores them in HDFS.
At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 writes them to the local file system.
1. Create the Agent configuration file flume-file-flume.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# The avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# The avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size of each file, roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-dir.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
Notes:
Avro is a language-neutral data serialization and RPC framework.
The local output directory must already exist; if it does not, Flume will not create it.
The downstream jobs (the ones containing the Avro sources that the sinks point to) must be started first.
Load balancing (Sink group)
Requirements analysis:
Flume-1 monitors port data and passes the changes to Flume-2; Flume-2 displays the data on the console.
At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 also displays the data on the console.
Implementation steps:
1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-console2.conf in the group2 folder
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
Requirements analysis:
Flume-1 on hadoop103 monitors the file /opt/module/group.log.
Flume-2 on hadoop102 monitors the data stream on a port.
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
Implementation steps:
1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7. Distribute the configuration files
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the jobs
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
Ganglia deployment
1. Install the httpd service and php
yum -y install httpd php
2. Install other dependencies
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4. Modify the ganglia configuration file
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
# Require local
Require all granted
# Require ip 10.1.2.3
# Require host example.org
</Location>
Note in particular: the following configuration does not work:
<Location /ganglia>
Order deny,allow
Allow from all
</Location>
5. Modify the gmetad configuration file
vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102
6. Modify the gmond configuration file
vim /etc/ganglia/gmond.conf
cluster {
#name = "unspecified"
name = "hadoop102"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
#mcast_join = 239.2.11.71
host = 192.168.10.102
port = 8649
ttl = 1
}
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
#mcast_join = 239.2.11.71
port = 8649
#bind = 239.2.11.71
bind = 192.168.10.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7. Check the SELinux status
sestatus
If it is not disabled, modify the following configuration file (set SELINUX=disabled):
vim /etc/selinux/config
Or temporarily disable SELinux:
setenforce 0
8. Start ganglia
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit
http://hadoop102/ganglia/
If you still see a permission error after completing the steps above, try changing the permissions of the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
Custom Source
Requirements analysis:
Implementation:
1. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Fields read from the configuration file
    private Long delay;
    private String field;

    /**
     * Receive data, wrap it into events one by one, and write them to the channel.
     * @return Status.READY on success, Status.BACKOFF on failure
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                // The ChannelProcessor handles the put transaction with the Channel.
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the configuration file.
     * @param context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}
3. Package and test
Use Maven to package the jar and upload it to the /opt/module/flume/lib directory.
Create the Agent configuration file mysource.conf in the job folder
[djm@hadoop102 job]$ vim mysource.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
Custom Sink
Requirements analysis:
Implementation:
1. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Every take from the Channel happens inside a transaction.
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // Wait until an event is available in the Channel.
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            // "Write out" the event: here it is simply logged with the configured prefix and suffix.
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            // On failure, roll back so the event stays in the Channel.
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}
3. Package and test
Use Maven to package the jar and upload it to the /opt/module/flume/lib directory.
Create the Agent configuration file mysink.conf in the job folder
[djm@hadoop102 job]$ vim mysink.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
Flume parameter tuning
Source
Increasing the number of Sources increases the ability to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so that there is enough capacity to pick up newly produced data.
The batchSize parameter determines how many Events a Source ships to the Channel in one batch; increasing it appropriately improves the performance of moving Events from the Source to the Channel.
Channel
With type set to memory, the Channel performs best, but data may be lost if the Flume process dies unexpectedly.
With type set to file, the Channel is more fault tolerant but slower than a Memory Channel; when using a File Channel, configuring dataDirs with directories on different disks improves performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number of Events the Sink reads from the Channel per transaction; transactionCapacity must be no smaller than the batchSize of the Source and the Sink.
Sink
Increasing the number of Sinks increases the ability to consume Events, but more is not always better: too many Sinks tie up system resources and waste them unnecessarily, so use only as many as needed.
The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; increasing it appropriately improves the performance of moving Events out of the Channel.