VPC FLow Logsを継続的にElasticSearchに投入する

AWS
Published: 2015-06-14

 VPC FLow Logsのリリース直後から、クラメソさんの「VPC Flow LogsをElasticsearch + Kibana4で可視化する」と同じことを考えていました。週末に試行錯誤した結果をアウトプットします。

ログの取り方

 AWS SDK for Ruby を利用してClodWatch Logsを取得する方法は以下の様になります。

# coding: utf-8

require 'aws-sdk-core'

cloudwatchlogs = Aws::CloudWatchLogs::Client.new(region: region )

# cloudwatchlogs.get_log_eventsのオプションを定義
options = {
    log_group_name: log_group_name,
    log_stream_name: log_stream_name,
}

# ログを取得
resp = cloudwatchlogs.get_log_events(options)

 ただし、この方法でログを取得した場合、指定したlog_streamに格納されている大量のデータがレスポンスとして帰ってきます。デフォルトでは最大で1M Byte分のログが取得するようです。

Class: Aws::CloudWatchLogs::Client

By default, this operation returns as much log events as can fit in a response size of 1MB, up to 10,000 log events.

 したがって、このコードを継続的に実行すると、最初から1M byte分のログを繰り返し取得してしまいます。これでは意味がありません。実行時には、前回実行分以降のログを取得してほしい。これを実現する方法が、get_log_eventsnext_tokenオプションです。

増分ログの取り方

 get_log_eventsのレスポンスにはnext_forward_tokennext_backward_tokenが含まれています。これらは取得結果の次のページの位置を示しています。より新しいログの位置は名前的にnext_forward_tokenが保持しているっぽいです。

Class: Aws::CloudWatchLogs::Client

resp.events #=> Array

resp.events[0].timestamp #=> Integer

resp.events[0].message #=> String

resp.events[0].ingestion_time #=> Integer

resp.next_forward_token #=> String

resp.next_backward_token #=> String

 そこで、このtokenを利用してget_log_eventsを実行するように、スクリプトを変更します。tokenの値は、fluentdっぽくstateファイルを作り、そこに書き込んでおきます。

# coding: utf-8

require 'aws-sdk-core'
require 'fileutils'

region = 'ap-northeast-1'
log_group_name = 'VPCFLowLog'
log_stream_name = 'eni-xxxxxxxx-all'
@state_file = Dir.pwd + "/" + log_group_name + "." + log_stream_name + ".state"

# トークンをstateファイルに書き込む
def write_token(token)
    File.open(@state_file,"w") do |file|
        file.puts(token)
    end
end

# トークンをstateファイルから読み込む
def read_token
    if File.exist?(@state_file) then
        return File.read(@state_file).chomp
    else
        return 
    end
end

cloudwatchlogs = Aws::CloudWatchLogs::Client.new(region: region )

# cloudwatchlogs.get_log_eventsのオプションを定義
options = {
    log_group_name: log_group_name,
    log_stream_name: log_stream_name,
}

# もしstateファイルから前回のtokenが取得できたら、そのtokenをオプションに追加
if read_token != nil  then
    options[:next_token] = read_token
end

# ログを取得
resp = cloudwatchlogs.get_log_events(options)

# 取得したログからtokenを保存
write_token(resp.next_forward_token)

動作確認

 ElasticSearchに投入済みのデータは以下の通りです。19:25:47までのログが格納されています。

投入済みデータ

 next_tokenをつけてget_log_eventsしたデータをElasticSearchに投入します。投入時のログは以下の通りです。19:28:40のデータ以降がElasticSearchに投入されていることがわかります。

2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:28:40","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"157.7.235.92","dstaddr":"10.175.10.97","srcport":"123","dstport":"123","protocol":"17","packets":"1","bytes":"76","start":"1434277720","end":"1434277760","action":"ACCEPT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkmyyQj6bWWoL4TA","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.004s, query:n/a]
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:28:40","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"10.175.10.97","dstaddr":"157.7.235.92","srcport":"123","dstport":"123","protocol":"17","packets":"1","bytes":"76","start":"1434277720","end":"1434277760","action":"ACCEPT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkm4yQj6bWWoL4TB","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.003s, query:n/a]
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:28:40","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"162.255.180.213","dstaddr":"10.175.10.97","srcport":"1982","dstport":"445","protocol":"6","packets":"2","bytes":"96","start":"1434277720","end":"1434277760","action":"REJECT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkm9yQj6bWWoL4TC","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.003s, query:n/a]
(中略)
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:35:22","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"10.175.10.97","dstaddr":"46.17.98.184","srcport":"22","dstport":"27530","protocol":"6","packets":"1","bytes":"48","start":"1434278122","end":"1434278181","action":"ACCEPT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkoQyQj6bWWoL4TY","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.002s, query:n/a]
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:36:55","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"199.203.59.117","dstaddr":"10.175.10.97","srcport":"26600","dstport":"80","protocol":"6","packets":"1","bytes":"48","start":"1434278215","end":"1434278241","action":"REJECT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkoTyQj6bWWoL4TZ","_version":1,"created":true}

 投入後のデータ一覧は以下の通りです。19:25:47以前のログが重複登録されることなく、19:25:47以降のログが増えました!!!

投入済みデータ

 それっぽく動いたスクリプトは以下の通りです。cronで回してみてみようと思います。

 

# coding: utf-8

require "json"
require 'aws-sdk-core'
require 'elasticsearch'
require 'fileutils'

region = 'ap-northeast-1'
log_group_name = 'VPCFLowLog'
log_stream_name = 'eni-xxxxxxxx-all'
@state_file = Dir.pwd + "/" + log_group_name + "." + log_stream_name + ".state"

# トークンをstateファイルに書き込む
def write_token(token)
    File.open(@state_file,"w") do |file|
        file.puts(token)
    end
end

# トークンをstateファイルから読み込む
def read_token
    if File.exist?(@state_file) then
        return File.read(@state_file).chomp
    else
        return 
    end
end

cloudwatchlogs = Aws::CloudWatchLogs::Client.new(region: region )

# cloudwatchlogs.get_log_eventsのオプションを定義
options = {
    log_group_name: log_group_name,
    log_stream_name: log_stream_name,
}

# もしstateファイルから前回のtokenが取得できたら、そのtokenをオプションに追加
if read_token != nil  then
    options[:next_token] = read_token
end

# ログを取得
resp = cloudwatchlogs.get_log_events(options)

# 取得したログからtokenを保存
write_token(resp.next_forward_token)

hash = {}
message_elements = Array.new()

message_field = [
    "version",
    "account-id",
    "interface-id",
    "srcaddr",
    "dstaddr",
    "srcport",
    "dstport",
    "protocol",
    "packets",
    "bytes",
    "start",
    "end",
    "action",
    "log-status"]

resp.events.each {|event|

    hash["@timestamp"] = Time.at(event.timestamp/1000.0).strftime('%Y-%m-%d %H:%M:%S')
    message_elements = event.message.split(" ")
    message_elements.each.with_index(0)  {|element,i|
        hash[message_field[i]] = element
    }

    # BytesとPacketsをInteger型にすると、NODATAの時の-が型エラーになるので、捨てる
    if hash["log-status"] != "NODATA" then
        client = Elasticsearch::Client.new(hosts: "localhost:9200",log: true)
        client.index(index:"aws", type:"vpcflowlog", body:hash.to_json)
    end
}