成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

一、Flume--數(shù)據(jù)采集器基本原理和使用

一、概述

1、flume是什么

1) Flume提供一個分布式的,可靠的,對大數(shù)據(jù)量的日志進行高效收集、聚集、移動的服務(wù),F(xiàn)lume只能在Linux環(huán)境下運行。
2) Flume基于流式架構(gòu),容錯性強,也很靈活簡單,架構(gòu)簡單。
3) Flume、Kafka用來實時進行數(shù)據(jù)收集,Spark、Storm用來實時處理數(shù)據(jù),impala用來實時查詢。

專注于為中小企業(yè)提供做網(wǎng)站、成都做網(wǎng)站服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)永康免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

2、flume的基本架構(gòu)

一、Flume--數(shù)據(jù)采集器基本原理和使用

圖1.1 flume架構(gòu)

說到flume的架構(gòu),直接拿官網(wǎng)的圖來說就足夠了。
首先在每個數(shù)據(jù)源上都會部署一個 flume agent ,這個agent就是用來采取數(shù)據(jù)的。
這個agent由3個組件組成:source,channel,sink。而在flume中,數(shù)據(jù)傳輸?shù)幕締挝皇莈vent。下面講講這幾個概念

(1)source

用于從數(shù)據(jù)源采集數(shù)據(jù),并將數(shù)據(jù)傳輸在channel中。source支持多種數(shù)據(jù)源采集方式。比如監(jiān)聽端口采集數(shù)據(jù),從文件中采集,從目錄中采集,從http服務(wù)中采集等。

(2)channel

位于source和sink之間,是數(shù)據(jù)的一個暫存區(qū)域。一般情況下,從source流出數(shù)據(jù)的速率和sink流出的數(shù)據(jù)的速率會有所差異。所以需要一個空間暫存那些還沒辦法傳輸?shù)絪ink進行處理的數(shù)據(jù)。所以channel類似于一個緩沖區(qū),一個隊列。

(3)sink

從channel獲取數(shù)據(jù),并將數(shù)據(jù)寫到目標源。目標源支持多種,比如本地文件、hdfs、kafka、下一個flume agent的source等均可。

(4)event

傳輸單元,flume傳輸?shù)幕締挝?,包?headers和body兩部分,header可以添加一些頭部信息,body則是數(shù)據(jù)。

3、flume傳輸過程

基于上面的概念,流程基本很清晰,source監(jiān)控數(shù)據(jù)源,如果產(chǎn)生新的數(shù)據(jù),則獲取數(shù)據(jù),并封裝成一個event,然后將event傳輸?shù)絚hannel,接著sink從channel拉取數(shù)據(jù)寫入到目標源中。

?

二、flume的使用

1、flume部署

flume的程序本身的部署非常簡單,
(1)部署jdk1.8
(2)解壓flume的程序壓縮包到指定目錄,然后添加環(huán)境變量即可
(3)修改配置文件

cd /opt/modules/apache-flume-1.8.0-bin

將模板配置文件復制重命名為正式配置文件
cp conf/flume-env.sh.template conf/flume-env.sh

添加jdk家目錄變量
vim conf/flume-env.sh
加上這句
export JAVA_HOME=/opt/modules/jdk1.8.0_144

這就完成配置了,基本沒啥難度。flume的使用重點在于agent的配置文件的編寫,根據(jù)業(yè)務(wù)場景不同,配置也不同。簡單來說其實就是對source,channel,sink三大組件的工作屬性的配置。

?

2、agent定義流程

agent的配置其實就是對source、channel、sink的配置。主要有5個步驟,下面看看這個流程是怎樣的。

# 1、定義的agent名稱,指定使用的source sinks channels的名稱
# 可以有多個source sinks channels。
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1>

# 2、定義source工作屬性。
# 基本格式就是 agent名.sources.source名.參數(shù)名=value
# 第一個參數(shù)都是type,就是指定source類型的
<Agent>.sources.<Source>.type=xxxx
<Agent>.sources.<Source>.<parameter1>=xxxx
<Agent>.sources.<Source>.<parameter2>=xxxx
.........

# 3、設(shè)置channel工作屬性.格式都是類似的
# 第一個參數(shù)都是type,就是指定channel類型的
<Agent>.channels.<Channel1>.type=xxxxx
<Agent>.channels.<Channel1>.<parameter1>=xxxxx
<Agent>.channels.<Channel1>.<parameter2>=xxxxx
.........

# 4、設(shè)置sink工作屬性
# 第一個參數(shù)都是type,就是指定sink類型的
<Agent>.sinks.<sink>.type=xxxxx
<Agent>.sinks.<sink>.<parameter1>=xxxxx
<Agent>.sinks.<sink>.<parameter2>=xxxxx
...............

# 5、設(shè)置source以及sink使用的channel,通過channel將兩者連接起來
<Agent>.sources.<Source>.channels = <Channel1>
<Agent>.sinks.<Sink>.channel = <Channel1>

這就是agent定義的完整流程,source、channel、sink每個都有不同的類型,每個類型定義的參數(shù)會有差異。下面看看source、channel、sink中常用的類型(想看完整的全部的類型就看官網(wǎng)吧)

3、常用source的類型

(1)netcat--從tcp端口獲取數(shù)據(jù)

常用屬性:
type:需指定為  netcat
bind:監(jiān)聽的主機名或者ip
port:監(jiān)聽的端口

例子:監(jiān)聽在 0.0.0.0:6666端口
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

(2)exec--執(zhí)行命令輸出作為數(shù)據(jù)源

常用屬性:
type:需指定為 exec
command:運行的命令
shell:運行名為所需的shell,如 /bin/bash -c

例子:監(jiān)控文件的新增內(nèi)容
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sourcesr.r1.shell = /bin/bash -c

(3)spooldir--監(jiān)控目錄內(nèi)容

常用的屬性:
type:設(shè)置為 spooldir
spoolDir:監(jiān)控的目錄路徑
fileSuffix:上傳完成的文件加上指定的后綴,默認是 .COMPLETED
fileHeader:是否在event的header添加一個key標明該文件的絕對路徑,默認為false
ignorePattern:正則匹配,忽略的文件
還有其他很多參數(shù),具體到官網(wǎng)上看吧

例子:
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp結(jié)尾的文件,不上傳
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

(4)avro--flume之間串聯(lián)的中間格式

這個源比較特別,通常用在上一個flume的sink 輸出,然后作為下一個flume的輸入的格式。

常用的屬性:
type:需指定為  avro
bind:監(jiān)聽的主機名或者ip,只能是agent所在主機的ip或者hostname
port:監(jiān)聽的端口

例子:
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

(5)TAILDIR--監(jiān)控文件或者目錄內(nèi)容變化(1.7以及之后才有)

? spoolDir有一個bug,就是已經(jīng)上傳完成的文件,不能再追加內(nèi)容,否則會報錯,而且也無法讀取到新的文件內(nèi)容。所以spooldir只能用來監(jiān)控目錄下新文件的變化,沒辦法監(jiān)控已有文件的內(nèi)容變化。以往這種情況只能使用 exec源,然后使用tail -f xxxlog 的方式來監(jiān)聽文件內(nèi)容變化,但是這種方式有缺陷,就是容易丟失數(shù)據(jù)。而在flume1.7之后有一個新的source,叫TAILDIR,可以直接監(jiān)聽文件變化的內(nèi)容??纯从梅ǎ?/p>

常用屬性:
type:TAILDIR ,記住,要全部大寫
filegroups:要監(jiān)聽的文件組的名字,可以有多個文件組
filegroups.<filegroupName>:指定文件組的包含哪些文件,可以使用擴展正則表達式,這里可以有的小技巧 /path/.*  這樣就可以監(jiān)聽目錄下的所有文件內(nèi)容的變化
positionFile:這個文件json格式記錄了目錄下每個文件的inode,以及pos偏移量
fileHeader:是否添加header

屬性過多,可以當官網(wǎng)看:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source

例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2  有兩個文件組
# 文件組1內(nèi)容
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
# 使用正則表達式指定文件組
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

下面再說說上面說到的 positionFile 這個東東,看看它的格式:

[{"inode":408241856,"pos":27550,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/flume.log.COMPLETED"},

{"inode":406278032,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bi
n/logs/words.txt.COMPLETED"},{"inode":406278035,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/words.txt"},

{"inode":406278036,"pos":34,"file":"/opt/modules/apache
-flume-1.8.0-bin/logs/test.txt"}]

分析:
1、每個文件都是一個json串,由多個json串組成一個類似于數(shù)組的東西。
2、每個json包含內(nèi)容有:
    inode:這個什么意思就自己具體看看文件系統(tǒng)的基本知識吧
    pos:開始監(jiān)聽文件內(nèi)容的起始偏移量
    file:文件絕對路徑名
3、小技巧:
(1)如果監(jiān)聽目錄時,某些文件已存在,那么flume默認是從文件最后作為監(jiān)聽起始點進行監(jiān)聽。當文件內(nèi)容更新時,flume會獲取,然后sink。接著就會更新pos值。所以因為這個特點,就算flume agent突然崩了,下一次啟動時,自動從上次崩潰的pos開始監(jiān)聽,而不是從最新的文件末尾開始監(jiān)聽。這樣就不會丟失數(shù)據(jù)了,而且不會重復讀取舊數(shù)據(jù)。
(2)從(1)可知,pos就是實時更新的一個文件內(nèi)容監(jiān)聽點,如果我們想文件從頭開始監(jiān)聽,有時候有需求,需要將監(jiān)聽目錄下的文件全部傳輸一邊。這時候很簡單,將json文件中的pos改為0就好了。
4、如果沒有指定positionFile路徑,默認為/USER_HOME/.flume/taildir_position.json

4、常用channel類型

(1)memory--用內(nèi)存作為暫存空間

常用的屬性:
type:需指定為  memory
capacity:存儲在channel中event數(shù)量的最大值
transactionCapacity:一次傳輸?shù)膃vent的最大數(shù)量 

例子:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

(2)file--使用磁盤文件作為暫存空間

常用的屬性:
type:需指定為  file
checkpointDir:存儲checkpoint文件的目錄
dataDirs:存儲數(shù)據(jù)的目錄

例子:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

(3)SPILLABLEMEMORY--文件+內(nèi)存作為暫存空間

這個類型是將內(nèi)存+文件作為channel,當容量空間超過內(nèi)存時就寫到文件中

常用的屬性:
type:指定為 SPILLABLEMEMORY
memoryCapacity:使用內(nèi)存存儲的event的最大數(shù)量
overflowCapacity:存儲到文件event的最大數(shù)量
byteCapacity:使用內(nèi)存存儲的event的最大容量,單位是 bytes
checkpointDir:存儲checkpoint文件的目錄
dataDirs:存儲數(shù)據(jù)的目錄

例子:
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

(4)kafka--作為channel

生產(chǎn)環(huán)境中,flume+kafka也是常用的技術(shù)棧,但是一般是將kafka作為sink目標

常用屬性:
type:設(shè)置為 org.apache.flume.channel.kafka.KafkaChannel
bootstrap.servers:kafka集群的服務(wù)器, ip:port,ip2:port,....
topic:kafka中的topic
consumer.group.id:消費者的groupid

例子:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

5、常用sink類型

(1)logger--直接作為log信息輸出

常用屬性:
type:logger

例子:
a1.sinks.k1.type = logger

這個類型比較簡單,一般用于調(diào)試時使用

(2)avro--串聯(lián)flume的中間格式

這個類型主要就是用來給下一個flume作為輸入的格式,是字節(jié)流的方式,而且是序列化的序列。

常用屬性:
type:avro
hostname:輸出目標的主機名或者ip,可以任意主機,不局限于本機
ip:輸出到的端口

例子:
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

(3)hdfs--直接寫入到hdfs

常用屬性:
type:hdfs
hdfs.path:存儲路徑 , hdfs://namenode:port/PATH
hdfs.filePrefix:上傳的文件的前綴(額外加上的)
hdfs.round:是否按時間滾動文件夾
hdfs.roundValue:滾動的時間值
hdfs.roundUnit:滾動的時間的單位
hdfs.userLocalTimeStamp:是否使用本地時間戳,true還是false
hdfs.batchSize:積攢多少個event才flush到hdfs 一次
hdfs.fileType:文件類型,DataStream(普通文件),SequenceFile(二進制格式,默認),CompressedStream(壓縮格式)
hdfs.rollInterval:多久生成一個新的文件,單位是秒
hdfs.rollSize:文件滾動大小,單位是 bytes
hdfs.rollCount:文件滾動是否與event數(shù)量有關(guān),true 還是false
hdfs.minBlockReplicas:最小副本數(shù)

例子:
#指定sink的類型為存儲在hdfs中
a2.sinks.k2.type = hdfs
# 路徑命名為按小時
a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = king-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設(shè)置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件,單位是秒
a2.sinks.k2.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小,單位是bytes
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a2.sinks.k2.hdfs.rollCount = 0
#最小副本數(shù)
a2.sinks.k2.hdfs.minBlockReplicas = 1

(4)file_roll--存儲到本地文件系統(tǒng)

常用屬性:
type:file_roll
sink.directory:存儲路徑

例子:
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /var/log/flum

(5)kafka--存儲到kafka集群中

常用屬性:
tpye:org.apache.flume.sink.kafka.KafkaSink
kafka.topic:kafka話題名
kafka.bootstrap.servers:集群服務(wù)器列表,以逗號分隔
kafka.flumeBatchSize:刷寫到kafka的event數(shù)量
kafka.producer.acks:接收到時返回ack信息時,寫入的最少的副本數(shù)
kafka.producer.compression.type:壓縮類型

例子:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

6、攔截器interceptors 常用類型

攔截器interceptors并不是必須的,它是工作在source和channel之間的一個組件,用于過濾source來的數(shù)據(jù),并輸出到channel。
使用格式:

先指定攔截器的名字,然后對每個攔截器進行工作屬性配置
<agent>.sources.<source>.interceptors = <interceptor>
<agent>.sources.<source>.interceptors.<interceptor>.<param> = xxxx

(1)timestamp時間戳攔截器

在event 的header中添加一個字段,用于標明時間戳如:headers:{timestamp:111111}。

常用屬性:
type:timestamp
headerName:在header中的key名字,默認是 timestamp

例子:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

(2)host主機名攔截器

在event 的header中添加一個字段,用于標明host戳,如:headers:{host:bigdata121}。

常用屬性:
type:host
hostHeader:在header中的key名字,默認是 host
useIP:用ip還是主機名

例子:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

(3)UUID攔截器

在event 的header中添加一個字段,用于標明uuid如:headers:{id:111111}。

常用屬性:
type:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headName:在header中的key名字,默認是 id
prefix:給每個UUID添加前綴

(4)search_replace查詢替換

使用正則匹配,然后替換指定字符

常用屬性:
type:search_replace
searchPattern:匹配的正則
replaceString:替換的字符串
charset:字符集,默認UTF-8

例子:刪除特定字符開頭的字符串
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

(5)regex_filter正則過濾

正則匹配,匹配到的丟棄或者留下

常用屬性:
type:regex_filter
regex:正則
excludeEvents:true為過濾掉匹配的,false為留下匹配的

例子:
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
#如果excludeEvents設(shè)為false,表示過濾掉不是以A開頭的events。如果excludeEvents設(shè)為true,則表示過濾掉以A開頭的events。
a1.sources.r1.interceptors.i1.excludeEvents = true

(6) regex_extractor正則抽取

這里其實是利用正則的分組匹配來獲取多個匹配組,然后將每個組的匹配值存儲到header中,key可以自定義。

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# 指定類型為 regex_extractor
a1.sources.r1.interceptors.i1.type = regex_extractor
# 分組匹配的正則
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
# 兩個分組各自的key別名
a1.sources.r1.interceptors.i1.serializers = s1 s2
# 分別設(shè)置key的名字
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

(7)自定義攔截器

繼承接口 org.apache.flume.interceptor.Interceptor,實現(xiàn)里面的特定方法,如:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    /**
     * 攔截source發(fā)送到通道channel中的消息
     * 處理單個event
     * @param event 接收過濾的event
     * @return event    根據(jù)業(yè)務(wù)處理后的event
     */
    @Override
    public Event intercept(Event event) {
        // 獲取事件對象中的字節(jié)數(shù)據(jù)
        byte[] arr = event.getBody();
        // 將獲取的數(shù)據(jù)轉(zhuǎn)換成大寫
        event.setBody(new String(arr).toUpperCase().getBytes());
        // 返回到消息中
        return event;
    }

    // 處理event集合
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new ArrayList<>();
        for (Event event : events) {
            list.add(intercept(event));
        }
        return list;
    }

    //用來返回攔截器對象
    public static class Builder implements Interceptor.Builder {
        // 獲取配置文件的屬性
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

pom.xml依賴

<dependencies>
        <!-- flume核心依賴 -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>

在 agent的配置文件中指定攔截器

a1.sources.r1.interceptors = i1
#全類名$Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

運行命令:

bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume_Andy-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

-C 指定額外的jar包的路徑,就是我們自己寫的攔截器的jar包

也可以將jar包放到flume程序目錄的lib目錄下

三、flume案例

1、讀取文件到hdfs

# 1.定義agent的名字a2.以及定義這個agent中的source,sink,channel的名字
a2.sources = r2
a2.sinks = k2
a2.channels = c2

#2.定義Source,定義數(shù)據(jù)來源
# 定義source類型是exec,執(zhí)行命令的方式
a2.sources.r2.type = exec
# 命令
a2.sources.r2.command = tail -F /tmp/access.log
# 使用的shell
a2.sources.r2.shell = /bin/bash -c

#3.定義sink
#指定sink的類型為存儲在hdfs中
a2.sinks.k2.type = hdfs
# 路徑命名為按小時
a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = king-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設(shè)置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件,單位是秒
a2.sinks.k2.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小,單位是bytes
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a2.sinks.k2.hdfs.rollCount = 0
#最小副本數(shù)
a2.sinks.k2.hdfs.minBlockReplicas = 1

# 4.定義Channel,類型、容量限制、傳輸容量限制 
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# 5.鏈接,通過channel將source和sink連接起來
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

啟動flume-agent:

/opt/module/flume1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \   flume配置目錄
--name a2 \                             agent名字
--conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf  agent配置
-Dflume.root.logger=INFO,console          打印日志到終端

2、多flume聯(lián)合,一對多

flume1:輸出到flume2和flume3
flume2:輸出到本地文件
flume3:輸出到hdfs

flume1.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 將數(shù)據(jù)流復制給多個channel。啟動復制模式
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c

# 這是k1 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141

# 這是k2 sink
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata111
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 給source接入連接兩個channel.每個channel對應(yīng)一個sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume2.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a2.sinks.k1.hdfs.rollCount = 0
#最小副本數(shù)
a2.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
#備注:此處的文件夾需要先創(chuàng)建好
a3.sinks.k1.sink.directory = /opt/flume3

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

啟動時,先啟動flume2和flume3,最后啟動flume1。啟動命令不重復了。

3、多flume聯(lián)合,多對一

多臺server產(chǎn)生的日志,需要各自監(jiān)控,然后匯總起來存儲,這種場景很多。
flume1(監(jiān)聽文件)和flume2(監(jiān)聽端口)各自收集數(shù)據(jù),然后分別sink到flume3,flume3負責匯總寫入hdfs
flume1.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata111
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
#上傳文件的前綴
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照時間滾動文件夾
a3.sinks.k1.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a3.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余數(shù)
a3.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

啟動時先啟動flume3,然后啟動flume1和flume2

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf

測試可以通過 telnet bigdata111 44444 端口來發(fā)送數(shù)據(jù)
可以在/opt/Andy文件中追加數(shù)據(jù)

當前文章:一、Flume--數(shù)據(jù)采集器基本原理和使用
文章出自:http://jinyejixie.com/article4/jjhjie.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供軟件開發(fā)、商城網(wǎng)站域名注冊、搜索引擎優(yōu)化電子商務(wù)、響應(yīng)式網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
察哈| 永仁县| 海盐县| 三门县| 沧州市| 江孜县| 莱州市| 洛川县| 岳西县| 平昌县| 锡林郭勒盟| 台南县| 连城县| 大冶市| 永城市| 连平县| 弥渡县| 类乌齐县| 栾城县| 平利县| 星座| 南郑县| 蓬安县| 荣成市| 广平县| 尼玛县| 樟树市| 大安市| 平昌县| 五原县| 景洪市| 手机| 内乡县| 石渠县| 滁州市| 邛崃市| 清徐县| 农安县| 新化县| 威信县| 小金县|