Logstash ist ein Open-Source-Tool, das als Teil des elk-stack-Stacks (Elasticsearch, Logstash, Kibana) entwickelt wurde und von Elastic (früher ElasticSearch) bereitgestellt wird. Der ELK-Stack ist eine beliebte Lösung für das Sammeln, Verarbeiten, Indexieren und Visualisieren von großen Mengen an Daten, insbesondere von Logdateien und anderen Ereignisdaten.
Logstash dient hauptsächlich dazu, Daten aus verschiedenen Quellen zu sammeln, zu transformieren und in einheitlicher Form an Elasticsearch oder andere Datenbanken weiterzuleiten. Es ermöglicht die Aufnahme von Daten aus verschiedenen Quellen wie Logdateien, Datenbanken, APIs und mehr. Anschließend können diese Daten in geeigneter Weise umgewandelt, normalisiert und in eine Datenbank wie Elasticsearch eingefügt werden, um sie später zu durchsuchen und zu analysieren.
Die Hauptfunktionen von Logstash umfassen:
Grok ein Plugin um strukturierte Daten aus unstrukturierten Protokollnachrichten zu parsen und zu extrahieren. Es ist besonders nützlich, wenn es um Protokolle geht, die kein vordefiniertes Format oder Schema haben. GROK Webinar
input { # Eingangskonfiguration, z. B. Datei, Beats, Kafka, usw. } filter { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:host} %{DATA:severity}: %{GREEDYDATA:message}" } } } output { # Ausgabekonfiguration, z. B. Elasticsearch, stdout, usw. }
docker pull docker.elastic.co/logstash/logstash:8.13.2
See Stackoverflow
Man kann Alarme bei bestimmten Lognachrichten einstellen.
Siehe auch Elastic Guide - LogStash
Config
/etc/logstash.conf
Apache Access Log Example
input { file { path => "/tmp/access_log" start_position => "beginning" } } filter { if [path] =~ "access" { mutate { replace => { "type" => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
Apache Access Log Example mit Conditions
input { file { path => "/tmp/*_log" } } filter { if [path] =~ "access" { mutate { replace => { type => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } else if [path] =~ "error" { mutate { replace => { type => "apache_error" } } } else { mutate { replace => { type => "random_logs" } } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
Run logstash
bin/logstash -f logstash-apache.conf
# the json input root@monitoring:~# cat tmp.json {"message":{"someField":"someValue"}} # the logstash configuration file root@monitoring:~# cat /etc/logstash/conf.d/test.conf input { tcp { port => 5400 codec => json } } filter{ } output { stdout { codec => rubydebug } } # starting the logstash server /opt/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf # sending the json to logstash with netcat nc localhost 5400 < tmp.json # the logstash output via stdout { "message" => { "someField" => "someValue" }, "@version" => "1", "@timestamp" => "2016-02-02T13:31:18.703Z", "host" => "0:0:0:0:0:0:0:1", "port" => 56812 }
input { http { host => "0.0.0.0" port => "8080" } } output { stdout {} }
input { file { path => "D:/test.csv" start_position => "beginning" } } filter { csv { columns => [ "a", "b" ] separator => "," skip_header => "true" } } output { elasticsearch { hosts => ["http://localhost:9200"] index => "test" user => "elastic" password => "" } }
GET server:port/_node/pipelines