Inhaltsverzeichnis

Logstash ist ein Open-Source-Tool, das als Teil des elk-stack-Stacks (Elasticsearch, Logstash, Kibana) entwickelt wurde und von Elastic (früher ElasticSearch) bereitgestellt wird. Der ELK-Stack ist eine beliebte Lösung für das Sammeln, Verarbeiten, Indexieren und Visualisieren von großen Mengen an Daten, insbesondere von Logdateien und anderen Ereignisdaten.

X-Pack ist ein Feature-Pack das Elasticsearch und Kibana um Authentication und Authorization erweitert.

Logstash dient hauptsächlich dazu, Daten aus verschiedenen Quellen zu sammeln, zu transformieren und in einheitlicher Form an Elasticsearch oder andere Datenbanken weiterzuleiten. Es ermöglicht die Aufnahme von Daten aus verschiedenen Quellen wie Logdateien, Datenbanken, APIs und mehr. Anschließend können diese Daten in geeigneter Weise umgewandelt, normalisiert und in eine Datenbank wie Elasticsearch eingefügt werden, um sie später zu durchsuchen und zu analysieren.

Die Hauptfunktionen von Logstash umfassen:

Grok ein Plugin um strukturierte Daten aus unstrukturierten Protokollnachrichten zu parsen und zu extrahieren. Es ist besonders nützlich, wenn es um Protokolle geht, die kein vordefiniertes Format oder Schema haben. GROK Webinar

input {
  # Eingangskonfiguration, z. B. Datei, Beats, Kafka, usw.
}

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:host} %{DATA:severity}: %{GREEDYDATA:message}" }
  }
}

output {
  # Ausgabekonfiguration, z. B. Elasticsearch, stdout, usw.
}

Installation

Windows

logstash-7.12.0-windows-x86_64.zip

Docker

docker pull docker.elastic.co/logstash/logstash:8.13.2

Greedydata Newline Error

See Stackoverflow

Alerts

Man kann Alarme bei bestimmten Lognachrichten einstellen.

Konfiguration

Siehe auch Elastic Guide - LogStash

Config

/etc/logstash.conf

Apache Access Log Example

input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

Apache Access Log Example mit Conditions

input {
  file {
    path => "/tmp/*_log"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

Run logstash

bin/logstash -f logstash-apache.conf
# the json input
root@monitoring:~# cat tmp.json 
{"message":{"someField":"someValue"}}


# the logstash configuration file
root@monitoring:~# cat /etc/logstash/conf.d/test.conf
input {
  tcp {
    port => 5400
    codec => json
  }
}

filter{
}

output {
   stdout {
     codec => rubydebug
   }
}


# starting the logstash server
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf


# sending the json to logstash with netcat
nc localhost 5400 <  tmp.json

# the logstash output via stdout
{
       "message" => {
        "someField" => "someValue"
    },
      "@version" => "1",
    "@timestamp" => "2016-02-02T13:31:18.703Z",
          "host" => "0:0:0:0:0:0:0:1",
          "port" => 56812
}
input {
 http {
    host => "0.0.0.0"
    port => "8080"
  }
}

output {
    stdout {}
}
input {
  file {
    path => "D:/test.csv"
    start_position => "beginning"
  }
}

filter {
  csv {
    columns => [ "a", "b" ]
    separator => ","
    skip_header => "true"
  }
}

output {
  elasticsearch { 
    hosts => ["http://localhost:9200"] 
    index => "test"
    user => "elastic"        
    password => ""  
  }
}

API Postman

GET server:port/_node/pipelines