logstash採集與清洗資料到elasticsearch案例實戰

NO IMAGE

Logstash的使用

logstash支援把配置寫入檔案 xxx.conf,然後通過讀取配置檔案來採集資料
./bin/logstash –f xxx.conf

logstash最終會把資料封裝成json型別,預設會新增@timestamp時間欄位、host主機欄位、type欄位。原訊息資料會整個封裝進message欄位。如果資料處理過程中,使用者解析新增了多個欄位,則最終結果又會多出多個欄位。也可以在資料處理過程中移除多個欄位,總之,logstash最終輸出的資料格式是json格式。

Logstash的結構

Logstash由 input,filter,output三個元件去完成採集資料
如下是一個logstash的配置例項:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
input {
    file {
        type => "log"
        path => "/log/*/*.log"
        discover_interval => 10
        start_position => "beginning"
    }
}
filter {
}
output {
    elasticsearch {
    index => "log-%{ YYYY.MM.dd}"
    hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"]
    }
    stdout {codec => rubydebug}
}

input

input元件負責讀取資料,可以採用file外掛讀取本地文字檔案,stdin外掛讀取標準輸入資料,tcp外掛讀取網路資料,log4j外掛讀取log4j傳送過來的資料等等。

filter

filter外掛負責過濾解析input讀取的資料,可以用grok外掛正則解析資料,date外掛解析日期,json外掛解析json等等。

output

output外掛負責將filter處理過的資料輸出。可以用elasticsearch外掛輸出到es,rediss外掛輸出到redis,stdout外掛標準輸出,kafka外掛輸出到kafka等等
trade.log日誌採集。

 

trade.log日誌採集

配置內容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
input {
    file {
        type => "tradelog"
        path => "/home/elk/his/trade.log*"
        discover_interval => 5
        start_position => "beginning"
         
        sincedb_path => "/home/elk/myconf/sincedb_trade.txt"
        sincedb_write_interval => 15
         
        codec => plain { charset => "GB2312" }
    }   
}
 
filter {
    grok {
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }   
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|"  }
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|"  }   
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|"  }
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }
        remove_field  => "message"
    }   
    date {
        match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
    }   
}
 
output {
    if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
        stdout {codec => rubydebug}
         
        elasticsearch {
            index => "log4j-tradelog"
            hosts => ["168.7.1.67:9200"]
            manage_template => true
            template_overwrite => true
            template_name => "log4j-tradelog"
            template => "/home/elk/myconf/tradelog_template.json"
        }
    }   
}

input

1. start_position:設定beginning保證從檔案開頭讀取資料。
2. path:填入檔案路徑。
3. type:自定義型別為tradelog,由使用者任意填寫。
4. codec:設定讀取檔案的編碼為GB2312,使用者也可以設定為UTF-8等等
5. discover_interval:每隔多久去檢查一次被監聽的 path 下是否有新檔案,預設值是15秒
6. sincedb_path:設定記錄原始檔讀取位置的檔案,預設為檔案所在位置的隱藏檔案。
7. sincedb_write_interval:每隔15秒記錄一下檔案讀取位置

filter

日誌格式如下:

1
2
2016-05-09 09:49:13,817 [] [ACTIVE] ExecuteThread: '1' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.StartLogCommand.execute(StartLogCommand.java:46) - FrontPageFinProdListQry|IP: 192.168.1.105|MAC: A1345C05-26C1-4263-8845-01CFCA6EC4FD|
2016-05-09 09:49:13,928 [] [ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.EndLogCommand.execute(EndLogCommand.java:44) - FrontPageAdvertListQry|IP: 192.168.1.105|MAC: A1245C05-26C1-4263-8845-01CFCA6EC4FD|Success|

 

grok外掛

因為該日誌中有5種格式如下,以最後幾個我需要的欄位為例說明:

1
2
3
4
5
交易名|登入名|編號|ip地址|mac地址|返回結果|異常資訊
交易名|登入名|編號|ip地址|mac地址|返回結果|
交易名|登入名|編號|ip地址|mac地址|
交易名|ip地址|mac地址|返回結果|
交易名|ip地址|mac地址|

所以採用5種正則規則去匹配,logstash預設會從上到下按規則去匹配,直到匹配上為止。(日誌中的多行錯誤資訊,匹配不上,logstash會在tags欄位新增”_ grokparsefailure”,所以後面輸出的時候會用if條件判斷過濾掉解析失敗的行訊息)

注意:5種正則規則的上下順序,下面的規則放在上面會導致可能內容解析不全,比如源資料是:請求交易名|操作員登入名|操作員編號|ip地址|mac地址|返回結果|異常資訊,如果按照“請求交易名|ip地址|mac地址|”規則去匹配,只能識別出3個欄位,而且匹配成功,不繼續往下執行,這樣識別的內容就不全。

logstash內建了很多正則匹配規則,使用者可以直接呼叫這些規則來解析,例如%{WORD:result} 表示呼叫WORD規則(即識別字串規則)來解析並最後賦值給result欄位(result欄位會自動建立)。
下面以第一條match規則為例來說明:

1
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }

首先行首使用DATESTAMP_CN規則來識別時間,並賦值給logdate欄位名;然後.*識別任意字串(.代表任意一個字元,包括特殊字元,*代表個數是任意個);然後使用WORD規則(即匹配字串規則,不包含特殊字元)識別到字串並賦值給opeType欄位;後面同理。這些WORD、IP、GREEDYDATA規則都是logstash內部grok-patterns檔案已經定義好了的規則。使用者可以直接拿來使用。

注意:[@metadata]表示logdate這個欄位在資料處理過程中只是一個臨時欄位,最後不會真的輸出。避免了使用remove_field手動移除欄位。

注意:logstash預設不支援”YYYY-MM-dd HH:mm:ss,SSS”格式的時間匹配,需要自己定義正規表示式到logstash-2.3.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns檔案中。grok-patterns檔案中追加2行內容:

1
2
DATE_CN %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}
DATESTAMP_CN %{DATE_CN} %{TIME}

注意:logstash的正規表示式採用ruby語言正規表示式,具體語法可以參考網上。

remove_field => “message”表示解析完成之後刪除原來的 message欄位,避免重複。

date外掛

1
match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]

logstash預設的時間欄位是@timestamp,如果不設定的話,預設是資料採集時候的時間,這裡我們將日誌列印的時間(即解析出的logdate欄位的內容)設定為@timestamp內容,方便之後kibana根據時間檢索。

注意:解析出來的@timestamp會比實際時間早8個小時,這是內建utc時間格式問題,kibana頁面展示的時候會根據瀏覽器當前時區自動轉換回來,這裡不作處理。

output

 

1
2
3
4
5
6
7
8
9
10
11
if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
    stdout {codec => rubydebug}
     
    elasticsearch {
        index => "log4j-tradelog"
        hosts => ["134.7.1.67:9200"]
        manage_template => true
        template_overwrite => true
        template => "/home/elk/myconf/tradelog_template.json"
    }
}

前面提到過,如果grok解析失敗,會在tags欄位自動新增_grokparsefailure值,如果date解析失敗,會在tags欄位自動新增_dateparsefailure值。所以最後的輸出,我們採用條件過濾掉解析失敗的行內容。最終的每一行內容解析成json,一路存入elasticsearch,另一路進行標準輸出。

elasticsearch外掛

index:要匯入的es索引
host:es地址,有多個節點配置多個節點
template:指定elasticsearch的mapping模板檔案,如果該索引不存在,logstash會根據這個mapping模板去自動建立索引。

stdout外掛

rubydebug標準輸出,便於除錯,可以不使用該外掛。

最終解析出結果示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
      "@version" => "1",
    "@timestamp" => "2016-05-09T01:44:48.366Z",
          "path" => "/home/elk/e.log",
          "host" => "ccc7",
          "type" => "tradelog",
       "opeType" => "WZQry",
          "name" => "lhcsssz2",
           "oid" => "abzzak",
            "ip" => "192.168.44.105",
           "mac" => "A1345C05-26C1-4253-8845-01CFCA8EC4FD",
        "result" => "Success"
}

error.log採集

日誌例項:

1
2016-09-29 17:13:24,184|ncid=1100343164|oid=acaatv|loginName=zhenglw1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.intenft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg=對不起!記錄沒有找到

配置檔案如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
input {
    file {
        path => "/home/elk/his/error.log*"
        type => "errorlog"
        start_position => "beginning"
        discover_interval => 5
         
        codec => multiline {
            charset => "GB2312"
            pattern => "^%{DATESTAMP_CN}"
            negate => true
            what => "next"       
        }
         
        sincedb_path => "/home/elk/myconf/sincedb_error.txt"
        sincedb_write_interval => 15
    }   
}
 
filter {
    grok {
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]}%{GREEDYDATA:[@metadata][keyvalue]}" }
        remove_field  => "message"
    }   
    date {
        match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
    }
    kv {
        source => "[@metadata][keyvalue]"
        field_split => "\|"
        value_split => "="
    }
}
 
output {
    if "multiline" in [tags] {
        stdout {codec => rubydebug}
        elasticsearch {
            index => "log4j-errorlog-3"
            hosts => ["168.7.1.67:9200"]
            manage_template => true
            template_overwrite => true
            template => "/home/elk/myconf/errorlog_template.json"
        }
    }   
}

 

input

8. start_position:設定beginning保證從檔案開頭讀取資料。
9. path:填入檔案路徑。
10. type:自定義型別為tradelog,由使用者任意填寫。
11. codec:multiline外掛
12. discover_interval:每隔多久去檢查一次被監聽的 path 下是否有新檔案,預設值是15秒
13. sincedb_path:設定記錄原始檔讀取位置的檔案,預設為檔案所在位置的隱藏檔案。
14. sincedb_write_interval:每隔15秒記錄一下檔案讀取位置

multiline外掛

logstash預設讀取一行內容為一個訊息,因為錯誤日誌包含堆疊資訊,多行對應一個訊息,所以使用該外掛合併多行為一條訊息。
pattern:以”YYYY-MM-dd HH:mm:ss,SSS”格式開頭的匹配為一條訊息。
negate:true 表示正向使用該patttern
what:匹配到的日期屬於下一條訊息
charset:設定檔案編碼

filter

grok外掛

匹配日期到logdata欄位,匹配剩下的所有字串到keyvalue臨時欄位,”GREEDYDATA”正規表示式為”.*”

date外掛

1
match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]

logstash預設的時間欄位是@timestamp,如果不設定的話,預設是資料採集時候的時間,這裡我們將日誌列印的時間(即解析出的logdate欄位的內容)設定為@timestamp內容,方便之後kibana根據時間檢索。

注意:解析出來的@timestamp會比實際時間早8個小時,這是內建utc時間格式問題,kibana頁面展示的時候會根據瀏覽器當前時區自動轉換回來,這裡不作處理。

kv外掛

source:解析前面grok獲取的keyvalue欄位

1
(比如:|ncid=1100783164|oid=acaatv|loginName=zhew1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.inteft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg=對不起!記錄沒有找到)

field_split:按”|”切分key-value對
value_split:按”=”切分key 和 value,最終切分出來key作為欄位名,value作為欄位值

output

1
2
3
4
5
6
7
8
9
10
11
12
output {
    if "multiline" in [tags] {
        stdout {codec => rubydebug}
        elasticsearch {
            index => "log4j-errorlog-3"
            hosts => ["168.7.1.67:9200"]
            manage_template => true
            template_overwrite => true
            template => "/home/elk/myconf/errorlog_template.json"
        }
    }  
}

該日誌有2種格式的日誌,一種是單行的錯誤資訊日誌,一種是多行的包含堆疊資訊的日誌,這2種日誌內容重複,那麼只需要解析單行格式的日誌。kv外掛解析多行格式的日誌時, tags欄位裡沒有”multipline”值(原因是因為grok解析的時候不能解析換行符),所以可以通過if條件判斷tags欄位是否有”multipline”值,來過濾掉多行格式的日誌。

elasticsearch外掛

index:要匯入的es索引
host:es地址,有多個節點配置多個節點
template:指定elasticsearch的mapping模板檔案,如果該索引不存在,logstash會根據這個mapping模板去自動建立索引。

最終解析的結果示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
    "@timestamp" => "2016-09-29T09:14:22.194Z",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "path" => "/home/elk/stst.log",
          "host" => "ci7",
          "type" => "sttlog",
          "ncid" => "1143164",
           "oid" => "acav",
     "loginName" => "zhew1",
       "transId" => "MyQuery",
       "traceId" => "N/A8C4E-047",
    "exceptType" => "com.exception.AppRTException",
    "exceptCode" => "CORESYNATIVE_82243",
     "exceptMsg" => "對不起!根據賬號獲取客戶資訊錯誤"
}

 

總結:

注意:

logstash filter中的每一個外掛都有add_field,remove_field,add_tag,remove_tag 4個功能。

附錄:

mapping模板檔案

tradelog:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
{
    "template": "log4j-tradelog*",
    "settings": {
        "index.number_of_shards": 3,
        "number_of_replicas": 0
    },
    "mappings": {
        "tradelog": {
            "_all": {
                "enabled": false
            },
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "strict_date_optional_time||epoch_millis",
                    "doc_values": true
                },
                "@version": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "exception": {
                    "type": "string",
                    "index": "analyzed"
                },
                "path": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "host": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "ip": {
                    "type": "ip",
                    "index": "not_analyzed"
                },
                "logger_name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "mac": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "oid": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "opeType": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "priority": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "result": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "type": {
                    "type": "string",
                    "index": "not_analyzed"
                }
            }
        }
    }
}

 

errorlog:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
{
    "template": "log4j-errorlog*",
    "settings": {
        "index.number_of_shards": 3,
        "number_of_replicas": 0
    },
    "mappings": {
        "errorlog": {
            "_all": {
                "enabled": false
            },
            "properties": {
                "host": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "ncid": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "type": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "@version": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "exceptType": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "@timestamp": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "exceptCode": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "transId": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "priority": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "oid": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "traceId": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "exceptMsg": {
                    "type": "string",
                    "index": "analyzed"
                },
                "path": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "logger_name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "loginName": {
                    "type": "string",
                    "index": "not_analyzed"
                }
            }
        }
    }
}

############################################################################

轉自:https://www.2cto.com/kf/201610/560348.html