
Getting Started with Flume

1 Flume Overview

1.1 Definition

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data.


Flume is based on a streaming architecture and is flexible and simple to use.

1.2 Features

Flume can integrate with virtually any storage system.

When the incoming data rate exceeds the rate at which data can be written to the destination, Flume buffers the data, reducing the pressure on HDFS.

Transactions in Flume are based on the Channel. Two transactions (one on the sending side, one on the receiving side) ensure that messages are delivered reliably.

Flume uses two independent transactions: one for delivering events from the Source to the Channel, and one from the Channel to the Sink. Only when all events in a transaction have been successfully committed to the Channel does the Source consider the data read; likewise, an event is removed from the Channel only after it has been successfully written out by the Sink. The put side of this exchange is sketched below.
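As an illustration only (in practice the agent's ChannelProcessor does this for you; the sketch assumes an org.apache.flume.Channel named channel and an Event named event are already in scope), the put side of such a transaction looks roughly like this:

Transaction tx = channel.getTransaction();
tx.begin();
try {
    channel.put(event);   // stage the event inside the transaction
    tx.commit();          // only now does the event become visible to a Sink
} catch (ChannelException e) {
    tx.rollback();        // the Channel is left exactly as it was before
    throw e;
} finally {
    tx.close();
}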

1.3 Architecture

[Figure: Flume agent architecture (Source, Channel, Sink)]

1.3.1 Agent

An Agent is a JVM process that delivers data from a source to a destination in the form of events.

An Agent consists mainly of a Source, a Channel, and a Sink.

1.3.2 Source

The Source is the component responsible for receiving data into the Agent. It can handle many source types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.

1.3.3 Channel

The Channel is a buffer that sits between the Source and the Sink, so it allows them to operate at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.

Flume ships with two Channel types:

Memory Channel: an in-memory queue. It is fast and suitable when you do not need to care about data loss (if the process dies, buffered events are gone).

File Channel: writes all events to disk, so data is not lost if the process exits or the machine goes down.
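A minimal File Channel configuration sketch for comparison (checkpointDir and dataDirs are the standard File Channel properties; the paths and sizes here are just assumed values):

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000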

1.3.4 Sink

The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or forwards them to another Flume Agent.

The Sink is fully transactional. Before removing a batch of events from the Channel, each Sink starts a transaction with the Channel. Once the batch has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction; only after the transaction is committed does the Channel delete those events from its internal buffer.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.

1.3.5 Event

The Event is the unit of transfer, the basic unit in which Flume moves data from source to destination.

An Event consists of an optional header and a byte array carrying the payload. The header is a HashMap of key-value string pairs.

Typically one record (for example, one log line) becomes one Event; some sources also cap the size of a single Event (the spooling-directory LINE deserializer, for instance, defaults to 2048 bytes per line).
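A small, self-contained sketch of building an Event in code (assuming flume-ng-core is on the classpath; EventBuilder is the helper Flume itself provides for this, and the header key and value here are arbitrary examples):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("source-host", "hadoop102");          // optional key-value header pairs
        Event event = EventBuilder.withBody("a single log line",
                StandardCharsets.UTF_8, headers);         // the body is stored as a byte array
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}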

1.4 Topologies

[Figure: multi-agent serial (chained) topology]

In this mode, multiple Flume agents are chained in sequence, from the initial Source all the way to the storage system behind the final Sink. Chaining too many agents is not recommended: it hurts throughput, and if any agent in the chain goes down, the whole pipeline is affected.

[Figure: single source fanned out to multiple channels and sinks]

Flume supports sending the event flow to one or more destinations. In this mode the Source's data is copied into multiple Channels; with a replicating selector every Channel holds the same data and each Sink can deliver it to a different destination, while a multiplexing selector, sketched below, routes each event to a specific Channel instead.
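A rough sketch of a multiplexing selector configuration (the header name state and the mapping values are assumptions for illustration; events whose state header is CZ go to c1, US to c2, everything else falls back to c1):

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1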

[Figure: sink group (load balancing / failover) topology]

Flume also supports logically grouping multiple Sinks into one sink group; Flume then distributes data across the Sinks in the group, which is mainly used for load balancing and failover.

[Figure: aggregation topology, many agents fanning in to one collector agent]

This is the most common and most practical mode. A typical web application is spread across hundreds, sometimes thousands or even tens of thousands, of servers, and the logs they produce are painful to handle one by one. With this combination, each server runs a Flume agent that collects its logs and sends them to a central collector agent, which in turn uploads them to HDFS, Hive, HBase, JMS, and so on for log analysis.

1.5 Agent Internals

[Figure: internal data flow of a Flume Agent]

2 Flume Installation

1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.

2. Rename apache-flume-1.7.0-bin to flume.

3. Rename flume/conf/flume-env.sh.template to flume-env.sh and set JAVA_HOME in it, as sketched below.
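A sketch of those three steps as shell commands (the source directory and the JDK path are assumptions that match the rest of this walkthrough):

[djm@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
[djm@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
[djm@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[djm@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144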

3 Development Cases

3.1 Monitoring Port Data

Requirements:

The Flume agent listens on port 44444 on the local machine.

A client uses the netcat tool to send messages to port 44444.

Flume finally displays the received data on the console.

Steps:

1. Create the Agent configuration file flume-netcat-logger.conf in the job directory:

[djm@hadoop102 job]$ vim flume-netcat-logger.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Start the agent:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

Parameter notes:

--conf conf/ (or -c): the configuration files are stored in the conf/ directory.

--name a1 (or -n): names this Agent a1.

--conf-file job/flume-netcat-logger.conf (or -f): the configuration file Flume reads for this run is flume-netcat-logger.conf in the job directory.

-Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property at runtime, setting the console log level to INFO.
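To verify the setup, open another terminal and send a few lines with netcat (assuming the nc tool is installed); the netcat source acknowledges each accepted line with OK, and each line should appear as an INFO log event in the agent's console:

[djm@hadoop102 ~]$ nc localhost 44444
hello flume
OK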

3.2 Reading a Local File into HDFS in Real Time

Requirements:

Monitor the Hive log in real time and upload it to HDFS.

Steps:

1. Create the Agent configuration file flume-file-hdfs.conf in the job directory:

[djm@hadoop102 job]$ vim flume-file-hdfs.conf

2. Add the following content:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#prefix of the files uploaded to HDFS
a2.sinks.k2.hdfs.filePrefix = logs-
#whether to roll directories based on time
a2.sinks.k2.hdfs.round = true
#number of time units before a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
#the time unit used for the rounding above
a2.sinks.k2.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
#file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
#how often (seconds) to roll to a new file
a2.sinks.k2.hdfs.rollInterval = 60
#roll the file when it reaches this size (just under 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3. Start the agent:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

Note:

To read a file on a Linux system you have to read it the way a Linux command would. Since the Hive log lives on the Linux filesystem, the source type used here is exec (as in execute), which runs a Linux command to read the file.

3.3 Reading Directory Files into HDFS in Real Time

Requirements:

Use Flume to watch an entire directory and upload new files to HDFS.

Steps:

1. Create the Agent configuration file flume-dir-hdfs.conf in the job directory:

[djm@hadoop102 job]$ vim flume-dir-hdfs.conf

2. Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#ignore (do not upload) any file ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#prefix of the files uploaded to HDFS
a3.sinks.k3.hdfs.filePrefix = upload-
#whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
#number of time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
#the time unit used for the rounding above
a3.sinks.k3.hdfs.roundUnit = hour
#whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
#file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
#how often (seconds) to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
#roll the file when it reaches roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

3. Start the agent:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf

Note:

Do not create a file in the monitored directory and then keep modifying it; drop complete files in and let Flume pick them up (processed files are renamed with the .COMPLETED suffix). A quick test is sketched below.
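A quick way to exercise this agent (the file name and content are just examples):

[djm@hadoop102 flume]$ mkdir -p /opt/module/flume/upload
[djm@hadoop102 flume]$ echo "hello spooldir" > /tmp/test.log
[djm@hadoop102 flume]$ cp /tmp/test.log /opt/module/flume/upload/

After a few seconds the file should be renamed test.log.COMPLETED and its contents should show up under /flume/upload/<date>/<hour> in HDFS.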

3.4 Single Source, Multiple Outputs (Channel Selector)

Requirements:

Flume-1 monitors a file for changes and passes the new content to Flume-2.

Flume-2 stores the data in HDFS.

Flume-1 also passes the new content to Flume-3, which writes it to the local file system.

Steps:

1. Create the Agent configuration file flume-file-flume.conf in the group1 directory:

[djm@hadoop102 group1]$ vim flume-file-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 directory:

[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# an avro source acts as a data receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#prefix of the files uploaded to HDFS
a2.sinks.k1.hdfs.filePrefix = flume2-
#whether to roll directories based on time
a2.sinks.k1.hdfs.round = true
#number of time units before a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
#the time unit used for the rounding above
a2.sinks.k1.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#how often (seconds) to roll to a new file
a2.sinks.k1.hdfs.rollInterval = 600
#roll the file when it reaches roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-dir.conf in the group1 directory:

[djm@hadoop102 group1]$ vim flume-flume-dir.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the agents (downstream ones first):

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf

Notes:

Avro is a language-neutral data serialization and RPC framework.

The local output directory must already exist; if it does not, Flume will not create it.

The downstream agents (the ones whose avro sources the avro sinks point to) must be started before the upstream agent that sends to them.

3.5 Single Source, Multiple Outputs (Sink Group)

Requirements:

Flume-1 monitors port data and passes what it receives to Flume-2.

Flume-2 prints the data to the console.

Flume-1 also passes data to Flume-3, which likewise prints it to the console. (Because the two sinks sit in a load-balancing sink group, each individual event goes to either Flume-2 or Flume-3.)

Steps:

1. Create the Agent configuration file flume-netcat-flume.conf in the group2 directory.

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
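If failover rather than load balancing is wanted, the sink processor block above can be swapped out roughly as follows (these are the standard failover-processor properties; the priority values are assumptions, and the higher-priority sink k1 is used until it fails):

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000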

3. Create the Agent configuration file flume-flume-console1.conf in the group2 directory.

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-console2.conf in the group2 directory.

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the agents (the two console agents first, then the netcat agent):

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf

3.6 Aggregating Multiple Data Sources

Requirements:

Flume-1 on hadoop103 monitors the file /opt/module/group.log.

Flume-2 on hadoop102 monitors a data stream on a port.

Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the merged data to the console.

Steps:

1. Create the Agent configuration file flume1-logger-flume.conf in the group3 directory:

[djm@hadoop102 group3]$ vim flume1-logger-flume.conf 

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 directory:

[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume3-flume-logger.conf in the group3 directory:

[djm@hadoop102 group3]$ vim flume3-flume-logger.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

7. Distribute the configuration files to the other hosts:

[djm@hadoop102 group3]$ xsync /opt/module/flume/job

8. Start the agents (the collector on hadoop104 first):

[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf

4 Ganglia Installation

1. Install the httpd service and PHP:

yum -y install httpd php

2. Install the other dependencies:

yum -y install rrdtool perl-rrdtool rrdtool-devel

3. Install Ganglia:

rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web 

4. Edit the Ganglia web configuration file:

vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#

Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location> 

Important: the following configuration will not take effect:

<Location /ganglia>
  Order deny,allow
  Allow from all
</Location> 

5. Edit the gmetad configuration file:

vim /etc/ganglia/gmetad.conf 
data_source "hadoop102" 192.168.1.102

6. Edit the gmond configuration file:

vim /etc/ganglia/gmond.conf 
cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel { 
#bind_hostname = yes # Highly recommended, soon to be default. 
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true 

# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
} 

7. Check the SELinux status:

sestatus

If it is not disabled, edit the following configuration file and set SELINUX=disabled:

vim /etc/selinux/config

Or turn SELinux off temporarily:

setenforce 0

8. Start Ganglia:

systemctl start httpd
systemctl start gmetad 
systemctl start gmond

9. Open the page in a browser:

http://hadoop102/ganglia/

If you still get a permission error after completing the steps above, try loosening the permissions on the /var/lib/ganglia directory:

chmod -R 777 /var/lib/ganglia

5 Custom Source

Requirements:

[Figure: requirement - a custom Source that generates events from a configurable prefix (field) plus a counter, at a configurable interval (delay), and hands them to the Channel]

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    //fields read from the agent configuration file
    private Long delay;

    private String field;

    /**
     * Generate data, wrap it into events, and hand them to the channel.
     * @return Status.READY on success, Status.BACKOFF if interrupted
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read this source's properties from the agent configuration.
     * @param context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}

3. Package and test:

Build the jar with Maven and copy it to the /opt/module/flume/lib directory.

Create the Agent configuration file mysource.conf in the job directory:

[djm@hadoop102 job]$ vim mysource.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

6 Custom Sink

Requirements:

[Figure: requirement - a custom Sink that takes events from the Channel and logs each event body with a configurable prefix and suffix]

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        // every take from the Channel happens inside a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // poll until an event becomes available
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            // "write out" the event by logging its body with the configured prefix/suffix
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            // roll back on any failure so the event is not lost from the Channel
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}

3. Package and test:

Build the jar with Maven and copy it to the /opt/module/flume/lib directory.

Create the Agent configuration file mysink.conf in the job directory:

[djm@hadoop102 job]$ vim mysink.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

7 Flume Tuning

7.1 Source

Adding more Sources increases the capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure one Source per directory so that the Sources can keep up with newly generated data.

The batchSize parameter determines how many Events a Source ships to the Channel in one batch; increasing it appropriately improves the throughput of moving Events into the Channel.
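For example, for the exec source used earlier this is a single extra property (500 is just an assumed starting point to tune from):

a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.batchSize = 500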

7.2 Channel

Choosing the memory type gives the best Channel performance, but data may be lost if the Flume process dies unexpectedly.

Choosing the file type gives better fault tolerance, at the cost of performance compared to the Memory Channel; with a File Channel, configuring dataDirs with directories on different disks improves performance.

The capacity parameter sets the maximum number of Events the Channel can hold, and transactionCapacity sets the maximum number of Events a Source can write to, or a Sink can read from, the Channel in one transaction; transactionCapacity must be at least as large as the batchSize of the Source and of the Sink.
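A sketch of a consistent sizing, using assumed numbers purely for illustration (the source here must be a type that supports batchSize, such as exec or TAILDIR):

a1.sources.r1.batchSize = 1000
a1.sinks.k1.hdfs.batchSize = 1000
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000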

7.3 Sink

Adding more Sinks increases the capacity to consume Events, but more is not automatically better: extra Sinks occupy system resources and waste them unnecessarily, so use just enough.

The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of moving Events out of the Channel.
