designetwork

ネットワークを軸としたIT技術メモ

Logstashで内容ごとに送信先を複数に振り分ける設定

※本記事の内容は古いです。pipelines.ymlでパイプライン分割する方法を推奨します。

www.elastic.co

- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
  queue.type: persisted

Logstashでは設定したConfigは全体的に有効になるため、シンプルな設定では単一の出力設定となる。そこで、ifで項目により条件分岐させることで、複数の出力を設定できる。

前回こちらの記事で紹介した汎用的な設計を元に、Logstashからの送信先を複数に分散、振り分ける設定を追加する。

Logstashの設定

  • input/filter

inputのConfigファイルはこちらのとおり。syslog系のログをモニタリングして収集する。tagsにより、Fluentd(td-agent)と同様の処理を実装する。

/etc/logstash/conf.d/messages.conf

input {
  file {
    path => "/var/log/messages"
    tags => ["sys", "logstash_messages", "%{host}" ]
    type => syslog
  }
  file {
    path => "/var/log/cron"
    tags => ["sys", "logstash_cron", "%{host}" ]
    type => syslog
  }
  file {
    path => "/var/log/secure"
    tags => ["sys", "logstash_secure", "%{host}" ]
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

今回は試験用に/var/log/secure/var/log/cronchmod 644で読み取れるようにしておく。デフォルトだと600で開けず以下のエラーが発生する。

[2017-04-10T02:14:50,228][WARN ][logstash.inputs.file     ] failed to open /var/log/cron: Permission denied - /var/log/cron
[2017-04-10T02:14:50,519][WARN ][logstash.inputs.file     ] failed to open /var/log/secure: Permission denied - /var/log/secure
  • output

出力用の設定は次の通り。Fluentd(td-agent)の<match tag>と同様にtagにより送信・出力先を複数に振り分ける。配列の項目を比較する際は次のようになる。今回Elasticsearchは同一サーバを使用したが、別々に指定できる。ちなみに、上記filterプラグインでも既にシンプルなif分岐を使用している。

比較は基本的にRubyの文法に従う。詳細はオフィシャルを参照。

/etc/logstash/conf.d/output.conf

output {
  if [tags][1] == "logstash_messages" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "%{tags[0]}.%{tags[1]}-%{+YYYY.MM.dd}"
    }
    file {
      path => "/var/log/logstash/%{tags[0]}/%{tags[1]}.log"
    }
  } else if [tags][1] == "logstash_cron" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "%{tags[0]}.%{tags[1]}-%{+YYYY.MM.dd}"
    }
    file {
      path => "/var/log/logstash/others/%{tags[1]}.log"
    }
  } else {
    file {
      path => "/var/log/logstash/others/other.log"
    }
  }
}

配列の内容を比較する場合は[<type>][<number>]を指定する。通常だと[<type>]のみ。

実装結果

Elasticsearchのindexは設定通り、messagesとcronのみが作成されている。

$ curl http://localhost:9200/sys.logstash_cron-*
{"sys.logstash_cron-2017.04.09":{"aliases":{},"mappings":{"syslog":...

$ curl http://localhost:9200/sys.logstash_messages-*
{"sys.logstash_messages-2017.04.09":{"aliases":{},"mappings":{"syslog":...

$ curl http://localhost:9200/sys.logstash_secure-*
{}

fileへのバックアップはこちらの通り。設定した通りにファイルが生成されている。

$ sudo tree /var/log/logstash/

/var/log/logstash/
├── logstash-plain.log
├── others
│   ├── logstash_cron.log
│   └── other.log
└── sys
    └── logstash_messages.log

まとめ - Logstashで内容ごとに送信先を複数に振り分ける設定

if文を使用することで、Logstashで内容ごとにログの送信先を複数に分散、振り分けることができた。ログ種別に応じて、Elasticsearch、fileなど柔軟に保存方法を設定できる。