VPC FLow Logsのリリース直後から、クラメソさんの「VPC Flow LogsをElasticsearch + Kibana4で可視化する」と同じことを考えていました。週末に試行錯誤した結果をアウトプットします。
ログの取り方
AWS SDK for Ruby を利用してClodWatch Logsを取得する方法は以下の様になります。
# coding: utf-8
require 'aws-sdk-core'
cloudwatchlogs = Aws::CloudWatchLogs::Client.new(region: region )
# cloudwatchlogs.get_log_eventsのオプションを定義
options = {
log_group_name: log_group_name,
log_stream_name: log_stream_name,
}
# ログを取得
resp = cloudwatchlogs.get_log_events(options)
ただし、この方法でログを取得した場合、指定したlog_streamに格納されている大量のデータがレスポンスとして帰ってきます。デフォルトでは最大で1M Byte分のログが取得するようです。
Class: Aws::CloudWatchLogs::Client
By default, this operation returns as much log events as can fit in a response size of 1MB, up to 10,000 log events.
したがって、このコードを継続的に実行すると、最初から1M byte分のログを繰り返し取得してしまいます。これでは意味がありません。実行時には、前回実行分以降のログを取得してほしい。これを実現する方法が、get_log_events
のnext_token
オプションです。
増分ログの取り方
get_log_events
のレスポンスにはnext_forward_token
とnext_backward_token
が含まれています。これらは取得結果の次のページの位置を示しています。より新しいログの位置は名前的にnext_forward_token
が保持しているっぽいです。
Class: Aws::CloudWatchLogs::Client
resp.events #=> Array
resp.events[0].timestamp #=> Integer
resp.events[0].message #=> String
resp.events[0].ingestion_time #=> Integer
resp.next_forward_token #=> String
resp.next_backward_token #=> String
そこで、このtokenを利用してget_log_events
を実行するように、スクリプトを変更します。tokenの値は、fluentdっぽくstateファイルを作り、そこに書き込んでおきます。
# coding: utf-8
require 'aws-sdk-core'
require 'fileutils'
region = 'ap-northeast-1'
log_group_name = 'VPCFLowLog'
log_stream_name = 'eni-xxxxxxxx-all'
@state_file = Dir.pwd + "/" + log_group_name + "." + log_stream_name + ".state"
# トークンをstateファイルに書き込む
def write_token(token)
File.open(@state_file,"w") do |file|
file.puts(token)
end
end
# トークンをstateファイルから読み込む
def read_token
if File.exist?(@state_file) then
return File.read(@state_file).chomp
else
return
end
end
cloudwatchlogs = Aws::CloudWatchLogs::Client.new(region: region )
# cloudwatchlogs.get_log_eventsのオプションを定義
options = {
log_group_name: log_group_name,
log_stream_name: log_stream_name,
}
# もしstateファイルから前回のtokenが取得できたら、そのtokenをオプションに追加
if read_token != nil then
options[:next_token] = read_token
end
# ログを取得
resp = cloudwatchlogs.get_log_events(options)
# 取得したログからtokenを保存
write_token(resp.next_forward_token)
動作確認
ElasticSearchに投入済みのデータは以下の通りです。19:25:47までのログが格納されています。
next_token
をつけてget_log_events
したデータをElasticSearchに投入します。投入時のログは以下の通りです。19:28:40のデータ以降がElasticSearchに投入されていることがわかります。
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:28:40","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"157.7.235.92","dstaddr":"10.175.10.97","srcport":"123","dstport":"123","protocol":"17","packets":"1","bytes":"76","start":"1434277720","end":"1434277760","action":"ACCEPT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkmyyQj6bWWoL4TA","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.004s, query:n/a]
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:28:40","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"10.175.10.97","dstaddr":"157.7.235.92","srcport":"123","dstport":"123","protocol":"17","packets":"1","bytes":"76","start":"1434277720","end":"1434277760","action":"ACCEPT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkm4yQj6bWWoL4TB","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.003s, query:n/a]
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:28:40","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"162.255.180.213","dstaddr":"10.175.10.97","srcport":"1982","dstport":"445","protocol":"6","packets":"2","bytes":"96","start":"1434277720","end":"1434277760","action":"REJECT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkm9yQj6bWWoL4TC","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.003s, query:n/a]
(中略)
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:35:22","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"10.175.10.97","dstaddr":"46.17.98.184","srcport":"22","dstport":"27530","protocol":"6","packets":"1","bytes":"48","start":"1434278122","end":"1434278181","action":"ACCEPT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkoQyQj6bWWoL4TY","_version":1,"created":true}
2015-06-14 19:38:36 +0900: POST http://localhost:9200/aws/vpcflowlog [status:201, request:0.002s, query:n/a]
2015-06-14 19:38:36 +0900: > {"@timestamp":"2015-06-14 19:36:55","version":"2","account-id":"250369693989","interface-id":"eni-f5a92c83","srcaddr":"199.203.59.117","dstaddr":"10.175.10.97","srcport":"26600","dstport":"80","protocol":"6","packets":"1","bytes":"48","start":"1434278215","end":"1434278241","action":"REJECT","log-status":"OK"}
2015-06-14 19:38:36 +0900: < {"_index":"aws","_type":"vpcflowlog","_id":"AU3xpkoTyQj6bWWoL4TZ","_version":1,"created":true}
投入後のデータ一覧は以下の通りです。19:25:47以前のログが重複登録されることなく、19:25:47以降のログが増えました!!!
それっぽく動いたスクリプトは以下の通りです。cronで回してみてみようと思います。
# coding: utf-8
require "json"
require 'aws-sdk-core'
require 'elasticsearch'
require 'fileutils'
region = 'ap-northeast-1'
log_group_name = 'VPCFLowLog'
log_stream_name = 'eni-xxxxxxxx-all'
@state_file = Dir.pwd + "/" + log_group_name + "." + log_stream_name + ".state"
# トークンをstateファイルに書き込む
def write_token(token)
File.open(@state_file,"w") do |file|
file.puts(token)
end
end
# トークンをstateファイルから読み込む
def read_token
if File.exist?(@state_file) then
return File.read(@state_file).chomp
else
return
end
end
cloudwatchlogs = Aws::CloudWatchLogs::Client.new(region: region )
# cloudwatchlogs.get_log_eventsのオプションを定義
options = {
log_group_name: log_group_name,
log_stream_name: log_stream_name,
}
# もしstateファイルから前回のtokenが取得できたら、そのtokenをオプションに追加
if read_token != nil then
options[:next_token] = read_token
end
# ログを取得
resp = cloudwatchlogs.get_log_events(options)
# 取得したログからtokenを保存
write_token(resp.next_forward_token)
hash = {}
message_elements = Array.new()
message_field = [
"version",
"account-id",
"interface-id",
"srcaddr",
"dstaddr",
"srcport",
"dstport",
"protocol",
"packets",
"bytes",
"start",
"end",
"action",
"log-status"]
resp.events.each {|event|
hash["@timestamp"] = Time.at(event.timestamp/1000.0).strftime('%Y-%m-%d %H:%M:%S')
message_elements = event.message.split(" ")
message_elements.each.with_index(0) {|element,i|
hash[message_field[i]] = element
}
# BytesとPacketsをInteger型にすると、NODATAの時の-が型エラーになるので、捨てる
if hash["log-status"] != "NODATA" then
client = Elasticsearch::Client.new(hosts: "localhost:9200",log: true)
client.index(index:"aws", type:"vpcflowlog", body:hash.to_json)
end
}