[[https://www.elastic.co/de/elasticsearch|Elasticsearch]] ist eine Open-Source-Such- und Analyselösung in [[coding:java|Java]], die hauptsächlich für das **Durchsuchen und Analysieren großer Mengen von Daten in Echtzeit** verwendet wird. Es wurde ursprünglich von der Firma Elastic entwickelt und ist ein wichtiger Bestandteil des sogenannten [[elk-stack|ELK-Stacks]] (Elasticsearch, [[Logstash]] und [[Kibana]]), der häufig zur Verarbeitung und Visualisierung von Logdaten eingesetzt wird. Elasticsearch **basiert auf Apache Lucene**, einer leistungsstarken, Open-Source-Textsuchbibliothek.
Siehe auch [[OpenSearch]], [[Wazuh]]
{
"query": {
"match_all": {}
},
"sort": [{ "@timestamp": "asc" }]
}
#Delete datastream
curl -X DELETE "https://172.21.0.134:9200/_data_stream/test-vie-srv-ex01"
# find all agents
# windows
curl -X GET "https://172.21.0.134:9200/winlogbeat-8.17.4/_search?pretty" --insecure -u "xxxxxxx:xxxxxx" -d "{\"size\":0,\"aggs\":{\"unique_agents\":{\"terms\":{\"field\":\"agent.name\",\"size\":10000}}}}" -H "Content-Type:application/json"
# linux
curl -X GET "https://172.21.0.134:9200/winlogbeat-8.17.4/_search?pretty" --insecure -u "xxxxxxx:xxxxxx" -d '{"size":0,"aggs":{"unique_agents":{"terms":{"field":"agent.name","size":10000}}}}' -H "Content-Type:application/json" | jq -r '.aggregations.unique_agents.buckets[] | "\(.key)"'
[[https://www.elastic.co/de/what-is/open-x-pack|X-Pack]] ist eine Plugin-Suite und bietet eine Sammlung von kommerziellen Erweiterungen und Funktionen, um die Fähigkeiten von Elasticsearch zu erweitern. **X-Pack bietet verschiedene Module in den Bereichen Sicherheit, Überwachung, Reporting und [[machine_learning|maschinelles Lernen]]**.
=====Installation=====
====Windows====
In Windows gibt es einen Installer (und ein ZIP).
====Linux====
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.0-amd64.deb
sudo dpkg -i elasticsearch-7.10.0-amd64.deb
sudo apt --fix-broken install
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo sh -c 'echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" > /etc/apt/sources.list.d/elastic-7.x.list'
sudo apt update
sudo apt install elasticsearch
# sudo nano /etc/elasticsearch/elasticsearch.yml
cluster.name: my-cluster
network.host: 0.0.0.0
http.port: 9200
sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch
curl -X GET "localhost:9200/"
====Docker====
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms512m -Xmx512m
- ELASTIC_PASSWORD=dein_sicheres_passwort
ports:
- 9200:9200
- 9300:9300
volumes:
- esdata:/usr/share/elasticsearch/data
networks:
- es-net
volumes:
esdata:
driver: local
networks:
es-net:
driver: bridge
sudo curl -X GET "https://localhost:9200/_cat/indices" --insecure -u elastic:dein_sicheres_passwort
=====Service=====
Als Service unter Windows einrichten: Im Ordner elasticsearch/bin gibt es das Script elasticsearch-service.bat.
elasticsearch-service.bat install
elasticsearch-service.bat start
elasticsearch-service.bat stop
elasticsearch-service.bat remove
=====Shards=====
Elasticsearch, "shards" sind die grundlegenden Einheiten, in die ein Index aufgeteilt wird, um die Daten zu speichern und zu verteilen. Elasticsearch ist ein verteiltes Such- und Analysetool, das entwickelt wurde, um große Mengen strukturierter oder unstrukturierter Daten zu verwalten und abzufragen. Sharding ist eine Schlüsselkomponente dieses verteilten Ansatzes.
Jeder Shard ist eine Instanz von Lucene, der Volltextsuch-Engine, die von Elasticsearch verwendet wird.
Lucene ist für die Indexierung, das Durchsuchen und das Abrufen von Dokumenten in einem Shard verantwortlich. Elasticsearch organisiert und verteilt diese Shards über verschiedene Knoten in einem Cluster, um eine skalierbare und zuverlässige Suchinfrastruktur bereitzustellen.
====Primary Shards====
Ein Elasticsearch-Index kann aus einer oder mehreren "Primary Shards" bestehen. Jeder Primary Shard ist eine eigenständige Einheit, die eine Teilmenge der Indexdaten enthält. Diese Daten werden über verschiedene Server oder Knoten im Elasticsearch-Cluster verteilt, wodurch die Last auf verschiedene Hardware-Ressourcen verteilt wird und die Suche und Abfrage parallelisiert werden können.
====Replica Shards====
Jede Primary Shard kann auch über eine konfigurierbare Anzahl von "Replica Shards" verfügen. Replika-Shards sind Kopien der Primary Shards und dienen dazu, die Hochverfügbarkeit und Ausfallsicherheit des Indexes zu gewährleisten. Wenn ein Knoten ausfällt oder nicht mehr erreichbar ist, können die Replika-Shards auf anderen Knoten weiterhin abgefragt werden.
* Replicas werden automatisch erstellt, wenn ein Index erstellt wird. Die Anzahl der Replikate kann nachträglich erhöht oder verringert werden.
* Replicas befinden sich normalerweise auf anderen Servern oder Knoten im Cluster als die Primärshards, um Redundanz und Verfügbarkeit sicherzustellen.
* Suchanfragen können parallel auf Primär- und Replikashards ausgeführt werden, um die Antwortzeit und die Auslastung des Systems zu verbessern.
* Wenn ein Knoten ausfällt oder ein Primärshard nicht verfügbar ist, kann ein Replikashard automatisch zum neuen Primärshard werden, um die Verfügbarkeit aufrechtzuerhalten.
=====Datatypes=====
* **Text**: Wird für Volltextsuche verwendet. Inhalte werden analysiert (z. B. in Tokens zerlegt) und in einem inversen Index gespeichert. Ideal für große, durchsuchbare Textblöcke (z. B. Artikel oder Beschreibungen). (KEIN wildcard)
* **Keyword**: Eignet sich für nicht-analysierte Werte. Inhalte werden genau so gespeichert, wie sie eingegeben wurden. Geeignet für Filter, Sortierung und Aggregationen (z. B. Tags, Kategorien, IDs). (wildcard)
* **Integer**, Long, Short, Byte: Unterschiedliche Ganzzahltypen mit verschiedenen Größenbereichen.
* **Float**, **Double**: Gleitkommazahlen für präzise numerische Berechnungen.
* **Scaled Float**: Speziell für Gleitkommazahlen, die als ganzzahliger Wert gespeichert werden, skaliert durch einen Faktor (z. B. für Geldbeträge).
* **Integer_range**, Float_range, Long_range, Date_range, etc.: Speichert Wertebereiche und unterstützt Abfragen über Überlappung oder Inklusion.
* **Date**: Unterstützt verschiedene Formate (ISO 8601, benutzerdefinierte Formate). Speichert Zeitstempel und unterstützt Datumsmanipulation (z. B. Zeitdifferenzen).
* **Object**: Ermöglicht geschachtelte Datenstrukturen. Wird als einfache JSON-Objekte gespeichert.
* **Nested**: Ähnlich wie object, aber jedes Element wird separat indiziert. Ermöglicht präzisere Abfragen in verschachtelten Daten.
=====Queries=====
GET /myIndex/_search
{
"size": 10,
"query": {
"wildcard": {
"data.win.eventdata.authenticationPackageName": "NTLM"
}
}
}
GET /myIndex/_search
{
"size": 10,
"query": {
"wildcard": {
"data.win.eventdata.authenticationPackageName": "NTLM"
}
},
"sort": [{ "@timestamp": "asc" }],
"search_after": ["2025-03-04T18:56:45.890+0000"]
}
GET /myIndex/_search
{
"size": 10,
"query": {
"wildcard": {
"data.win.eventdata.authenticationPackageName": "NTLM"
}
},
"sort": [{ "@timestamp": "asc" }]
}
GET /myIndex/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"data.win.eventdata.targetUserName": "dom_docusnap"
}
},
{
"wildcard": {
"data.win.system.message": "*logged on*"
}
}
]
}
}
}
=====API=====
Passwort für user "elastic" wird beim ersten Start in der Console angezeigt. Auch der Enrollment-Token.
#add user
curl -X POST "http://localhost:9200/_security/user/manuel" -H "Content-Type: application/json" -d "{ \"password\": \"s3cr3t\", \"roles\": [\"superuser\"], \"full_name\": \"Manuel Zarat\" }" -u elastic:xxx
#delete user
curl -X DELETE "https://localhost:9200/_security/user/manuel" -u elastic:xxx
# create index
curl -X PUT "http://localhost:9200/mein_index" -H "Content-Type: application/json" -d "{ \"settings\": { \"number_of_shards\": 5, \"number_of_replicas\": 2}, \"mappings\": { \"properties\": { \"field1\": { \"type\": \"text\" }, \"field2\": { \"type\": \"text\" } } } }" -u elastic:xxx
#list indices
curl -X GET "http://localhost:9200/_cat/indices?v" -u elastic:xxx
#delete index
curl -X DELETE "http://localhost:9200/mein_index" -u elastic:xxx
#add entry
curl -X POST "http://localhost:9200/mein_index/_doc" -H "Content-Type: application/json" -d "{\"field1\": \"testname\", \"field2\": \"testkeyword\"}" -u elastic:xxx
#search entry
curl -X GET "http://localhost:9200/mein_index/_search" -H "Content-Type: application/json" -d "{ \"query\": { \"match\": { \"field1\": \"testname\" } } }" -u elastic:xxx
curl -X GET "http://localhost:9200/mein_index/_search" -H "Content-Type: application/json" -d "{ \"query\": { \"wildcard\": { \"field1\": \"test*\" } } }" -u elastic:xxx
# As Text
curl -X GET "http://localhost:9200/_sql?format=txt" -H "Content-Type: application/json" -d "{\"query\": \"SELECT * FROM mein_index WHERE MATCH(field1,'hund') AND NOT MATCH(field2, 'katze')\"}" --insecure -u elastic:xxx
# As JSON
curl -X GET "http://localhost:9200/_sql?format=json" -H "Content-Type: application/json" -d "{\"query\": \"SELECT * FROM mein_index WHERE MATCH(field1,'hund') AND NOT MATCH(field2, 'katze')\"}" --insecure -u elastic:xxx
# For LIKE the field must be a "keyword"
curl -X GET "http://localhost:9200/_sql?format=txt" -H "Content-Type: application/json" -d "{\"query\": \"SELECT * FROM mein_index WHERE field1 LIKE 'D:\\Musik\\Hip%%'\"}" --insecure -u elastic:xxx
curl -X GET "http://localhost:9200/_sql?format=txt" -H "Content-Type: application/json" -d "{\"query\": \"SELECT * FROM mein_index WHERE field1 RLIKE 'D:\\Mus.*\\Hip.*'\"}" --insecure -u elastic:xxx
=====Python=====
import logging
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(message)s',
level=logging.INFO)
class Crawler:
def __init__(self, urls=[]):
self.visited_urls = []
self.urls_to_visit = urls
def download_url(self, url):
return requests.get(url).text
def get_linked_urls(self, url, html):
soup = BeautifulSoup(html, 'html.parser')
for link in soup.find_all('a'):
path = link.get('href')
if path and path.startswith('/'):
path = urljoin(url, path)
yield path
def add_url_to_visit(self, url):
if url not in self.visited_urls and url not in self.urls_to_visit:
self.urls_to_visit.append(url)
def crawl(self, url):
html = self.download_url(url)
for url in self.get_linked_urls(url, html):
self.add_url_to_visit(url)
def run(self):
while self.urls_to_visit:
url = self.urls_to_visit.pop(0)
logging.info(f'Crawling: {url}')
try:
self.crawl(url)
except Exception:
logging.exception(f'Failed to crawl: {url}')
finally:
self.visited_urls.append(url)
if __name__ == '__main__':
Crawler(urls=['https://www.imdb.com/']).run()
====Create Index====
import os
from elasticsearch import Elasticsearch
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
es = Elasticsearch(
['https://localhost:9200'],
basic_auth=("elastic", "secure"),
verify_certs=False
)
index_name = "test"
if not es.indices.exists(index=index_name):
es.indices.create(index=index_name, body={
"mappings": {
"properties": {
"url": {"type": "text"},
"title": {"type": "text"}
}
}
})
print(f"Index '{index_name}' wurde erstellt.")
else:
print(f"Index '{index_name}' existiert bereits.")
====Insert Data====
import os
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
es = Elasticsearch(
['https://localhost:9200'],
basic_auth=("elastic", "lunikoff"),
verify_certs=False
)
index_name = "test"
# Testeinträge als Dokumente
documents = [
{"url": "heute.at", "title": "Das war heute"},
{"url": "gestern.at", "title": "Das war gestern"},
]
# Dokumente in den Index einfügen
for doc in documents:
es.index(index=index_name, body=doc)
print(f"Dokument hinzugefügt: {doc}")
print("Alles wurde indexiert.")
====Query Data====
from elasticsearch import Elasticsearch
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
es = Elasticsearch(
['https://localhost:9200'],
basic_auth=("elastic", "lunikoff"),
verify_certs=False
)
data = {
"query": {
"wildcard": {
"url": "heute*"
}
},
"size": 100
}
response = es.search(index="test", body=data, scroll="1m")
if 'hits' not in response or len(response['hits']['hits']) == 0:
print("Keine Ergebnisse gefunden.")
exit(1)
scroll_id = response['_scroll_id']
res_c = 0
for hit in response['hits']['hits']:
res_c += 1
print("%(url)s; %(title)s" % hit["_source"])
while True:
response = es.scroll(body={"scroll_id": scroll_id, "scroll": "1m"})
print("Got a response")
if 'hits' not in response or len(response['hits']['hits']) == 0:
break
for hit in response['hits']['hits']:
res_c += 1
print("%(.url)s; %(.title)s" % hit["_source"])
es.clear_scroll(body={"scroll_id": scroll_id})
print("We had", res_c, "results")
====Delete Index====
import csv
import glob
import os
from elasticsearch import Elasticsearch
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
es = Elasticsearch(
['https://localhost:9200'],
basic_auth=("elastic", "secure"),
verify_certs=False
)
index_name = input('Index name: ')
# Prüfen, ob der Index existiert und ihn löschen
if es.indices.exists(index=index_name):
es.indices.delete(index=index_name)
print(f"Index '{index_name}' wurde gelöscht.")
else:
print(f"Index '{index_name}' existiert nicht.")
====Scroll Query====
from elasticsearch import Elasticsearch
# Initialisiere Elasticsearch-Verbindung
es = Elasticsearch(
['http://localhost:9200'],
basic_auth=("elastic", "PqGVK*4uj4r8_kUzmyWA"),
verify_certs=False
)
# Definiere den Query ohne Scroll und sende den ersten Search-Request
data = {
"size": 1000, # Maximale Anzahl an Ergebnissen pro Anfrage
"query": {
"match_all": {}
}
}
# Erster Scroll-Request
response = es.search(index="test", body=data, scroll="1m")
# Holen Sie sich den Scroll-ID aus der ersten Antwort
scroll_id = response['_scroll_id']
res_c = 0
# Solange es Ergebnisse gibt, hole weitere Ergebnisse
while True:
# Hole den nächsten Scroll-Chunk
response = es.scroll(scroll_id=scroll_id, scroll="1m")
# Wenn keine Treffer mehr vorhanden sind, beende die Schleife
if len(response['hits']['hits']) == 0:
break
# Verarbeite die Treffer
for hit in response['hits']['hits']:
res_c += 1
print("%(FolderPath)s %(IdentityReference)s: %(FileSystemRights)s" % hit["_source"])
# Wenn du fertig bist, rufe clear_scroll auf, um den Scroll-Context zu löschen
es.clear_scroll(scroll_id=scroll_id)
print("We had ", res_c, " results")
====QueryBuilder====
class ElasticsearchQueryBuilder:
def __init__(self):
self.query = {
"query": {
"bool": {
"must": [],
"should": [],
"must_not": []
}
}
}
def add_condition(self, method, field, value):
"""
Add a condition to the query.
The method is one of: 'match', 'wildcard', 'term', 'range'
"""
if method == "match":
return {"match": {field: value}}
elif method == "wildcard":
return {"wildcard": {field: value}}
elif method == "term":
return {"term": {field: value}}
elif method == "range":
return {"range": {field: {"gte": value}}}
else:
raise ValueError(f"Unsupported method: {method}")
def build_query(self, conditions, operator="AND", sort_field=None, sort_order="asc"):
"""
Build the query with the provided conditions, logical operator, and optional sorting.
'AND' means all conditions must match, 'OR' means any condition can match.
"""
if operator == "AND":
self.query["query"]["bool"]["must"] = conditions
elif operator == "OR":
self.query["query"]["bool"]["should"] = conditions
self.query["query"]["bool"]["minimum_should_match"] = 1
else:
raise ValueError("Operator must be 'AND' or 'OR'")
# Füge die Sortierung hinzu, falls ein Sortierfeld angegeben wurde
if sort_field:
self.query["sort"] = [{sort_field: {"order": sort_order}}]
return self.query
def display_query(self):
"""Print the generated query in a pretty JSON format."""
import json
print(json.dumps(self.query, indent=2))
def add_nested_condition(self, method, field, value):
"""
Add a nested condition, creating a bool query within another bool query.
"""
nested_condition = {
"bool": {
"must": [self.add_condition(method, field, value)]
}
}
return nested_condition
# Beispielnutzung
def main():
# Initialisierung des Query Builders
builder = ElasticsearchQueryBuilder()
# Verschachtelte Bedingungen erstellen
must_conditions = []
must_conditions.append(builder.add_condition("match", "data.win.eventdata.authenticationPackageName", "NTLM"))
must_conditions.append(builder.add_condition("wildcard", "data.win.eventdata.targetUserName", "*"))
# Eine verschachtelte "bool" Abfrage innerhalb der "must" Bedingungen
nested_condition = builder.add_nested_condition("term", "data.win.eventdata.lmPackageName", "NTLM V1")
must_conditions.append(nested_condition)
# Die Bedingungen zur Query hinzufügen (Operator AND)
query = builder.build_query(must_conditions, operator="AND", sort_field="timestamp", sort_order="desc")
# Die erzeugte Query ausgeben
builder.display_query()
if __name__ == "__main__":
main()
====Query Builder Module====
import json
class ElasticsearchQueryBuilder:
def __init__(self):
self.query = {
"query": {
"bool": {
"must": [],
"should": [],
"must_not": []
}
}
}
def add_condition(self, method, field, value):
"""
Add a condition to the query.
The method is one of: 'match', 'wildcard', 'term', 'range'
"""
if method == "match":
return {"match": {field: value}}
elif method == "wildcard":
return {"wildcard": {field: value}}
elif method == "term":
return {"term": {field: value}}
elif method == "range":
return {"range": {field: {"gte": value}}}
else:
raise ValueError(f"Unsupported method: {method}")
def build_query(self, conditions, operator="AND", sort_field=None, sort_order="asc"):
"""
Build the query with the provided conditions, logical operator, and optional sorting.
'AND' means all conditions must match, 'OR' means any condition can match.
"""
if operator == "AND":
self.query["query"]["bool"]["must"] = conditions
elif operator == "OR":
self.query["query"]["bool"]["should"] = conditions
self.query["query"]["bool"]["minimum_should_match"] = 1
else:
raise ValueError("Operator must be 'AND' or 'OR'")
# Füge die Sortierung hinzu, falls ein Sortierfeld angegeben wurde
if sort_field:
self.query["sort"] = [{sort_field: {"order": sort_order}}]
return self.query
def display_query(self):
"""Print the generated query in a pretty JSON format."""
print(json.dumps(self.query, indent=2))
def add_nested_condition(self, method, field, value):
"""
Add a nested condition, creating a bool query within another bool query.
"""
nested_condition = {
"bool": {
"must": [self.add_condition(method, field, value)]
}
}
return nested_condition
Usage:
from elasticsearch_query_builder import ElasticsearchQueryBuilder
# Initialisierung des Query Builders
builder = ElasticsearchQueryBuilder()
# Verschachtelte Bedingungen erstellen
must_conditions = []
must_conditions.append(builder.add_condition("match", "data.win.eventdata.authenticationPackageName", "NTLM"))
must_conditions.append(builder.add_condition("wildcard", "data.win.eventdata.targetUserName", "*"))
# Eine verschachtelte "bool" Abfrage innerhalb der "must" Bedingungen
nested_condition = builder.add_nested_condition("term", "data.win.eventdata.lmPackageName", "NTLM V1")
must_conditions.append(nested_condition)
# Die Bedingungen zur Query hinzufügen (Operator AND)
query = builder.build_query(must_conditions, operator="AND", sort_field="timestamp", sort_order="desc")
# Die erzeugte Query ausgeben
builder.display_query()
# Initialisierung des Query Builders
builder = ElasticsearchQueryBuilder()
# Verschachtelte Bedingungen erstellen
must_conditions = []
must_conditions.append(builder.add_condition("match", "data.win.eventdata.authenticationPackageName", "NTLM"))
must_conditions.append(builder.add_condition("wildcard", "data.win.eventdata.targetUserName", "*"))
# Erste verschachtelte Bedingung
nested_condition_1 = builder.add_nested_condition("term", "data.win.eventdata.lmPackageName", "NTLM V1")
# Zweite verschachtelte Bedingung
nested_condition_2 = builder.add_nested_condition("term", "data.win.eventdata.targetDomainName", "example.com")
# Die verschachtelten Bedingungen zur Liste der "must" Bedingungen hinzufügen
must_conditions.append(nested_condition_1)
must_conditions.append(nested_condition_2)
# Die Bedingungen zur Query hinzufügen (Operator AND)
query = builder.build_query(must_conditions, operator="AND", sort_field="timestamp", sort_order="desc")
# Die erzeugte Query ausgeben
builder.display_query()
# Initialisierung des Query Builders
builder = ElasticsearchQueryBuilder()
# Verschachtelte Bedingungen erstellen
must_conditions = []
must_conditions.append(builder.add_condition("match", "data.win.eventdata.authenticationPackageName", "NTLM"))
must_conditions.append(builder.add_condition("wildcard", "data.win.eventdata.targetUserName", "*"))
# Erste verschachtelte Bedingung mit zwei "should" Bedingungen
nested_condition_1 = {
"bool": {
"should": [
builder.add_condition("term", "data.win.eventdata.lmPackageName", "NTLM V1"),
builder.add_condition("wildcard", "data.win.eventdata.targetDomainName", "example*")
],
"minimum_should_match": 1 # Minimum eine der "should" Bedingungen muss zutreffen
}
}
# Zweite verschachtelte Bedingung mit zwei "should" Bedingungen
nested_condition_2 = {
"bool": {
"should": [
builder.add_condition("term", "data.win.eventdata.lmPackageName", "NTLM V1"),
builder.add_condition("wildcard", "data.win.eventdata.targetDomainName", "example*")
],
"minimum_should_match": 1 # Minimum eine der "should" Bedingungen muss zutreffen
}
}
# Die verschachtelten Bedingungen zur Liste der "must" und "should" Bedingungen hinzufügen
should_conditions = []
should_conditions.append(nested_condition_1)
should_conditions.append(nested_condition_2)
# Die "must"-Bedingungen und "should"-Bedingungen kombinieren
query = builder.build_query(must_conditions, operator="AND", sort_field="timestamp", sort_order="desc")
# Füge die "should"-Bedingungen in den "bool" Block ein
query["query"]["bool"]["should"] = should_conditions
# Die erzeugte Query ausgeben
builder.display_query()
=====Bash=====
====Scroll Query====
#!/bin/bash
# Elasticsearch-Server-URL
ES_HOST="https://localhost:9200"
INDEX="wazuh-alerts*"
USER="admin"
PASS="SecretPassword"
# Erste Suchanfrage mit Scroll
response=$(curl -X POST "$ES_HOST/$INDEX/_search?scroll=1m" -u "$USER:$PASS" -H "Content-Type: application/json" -d '{
"size": 10,
"query": {
"match_all": {}
}
}' --insecure)
# Scroll-ID extrahieren
scroll_id=$(echo "$response" | jq -r '._scroll_id')
# Anzahl der Treffer zählen
res_c=0
# Loop über die Ergebnisse
while true; do
read -p "Press enter to continue"
# Treffer verarbeiten
hits=$(echo "$response" | jq -c '.hits.hits[]?')
# Wenn keine Treffer mehr vorhanden sind, abbrechen
if [[ -z "$hits" ]]; then
break
fi
# Treffer ausgeben
echo "$hits" | while read -r hit; do
#folder_path=$(echo "$hit" | jq -r '._source.FolderPath // empty')
echo $hit |jq
((res_c++))
done
# Nächste Scroll-Anfrage senden
response=$(curl -X POST "$ES_HOST/_search/scroll" -u "$USER:$PASS" -H "Content-Type: application/json" -d "{
\"scroll\": \"1m\",
\"scroll_id\": \"$scroll_id\"
}" --insecure)
# Neuen Scroll-ID extrahieren
scroll_id=$(echo "$response" | jq -r '._scroll_id')
done
# Scroll-Context aufräumen
curl -s -X DELETE "$ES_HOST/_search/scroll" -u "$USER:$PASS" -H "Content-Type: application/json" -d "{
\"scroll_id\": \"$scroll_id\"
}" > /dev/null
echo "We had $res_c results"
====Query Builder====
#!/bin/bash
add_condition() {
local method="$1"
local field="$2"
local value="$3"
case "$method" in
"match")
echo "{\"match\": {\"$field\": \"$value\"}}"
;;
"wildcard")
echo "{\"wildcard\": {\"$field\": \"$value\"}}"
;;
"term")
echo "{\"term\": {\"$field\": \"$value\"}}"
;;
"range")
echo "{\"range\": {\"$field\": {\"gte\": \"$value\"}}}"
;;
*)
echo "Unsupported method: $method"
return 1
;;
esac
}
add_nested_condition() {
local method="$1"
local field="$2"
local value="$3"
local condition
condition=$(add_condition "$method" "$field" "$value")
echo "{\"bool\": {\"must\": [$condition]}}"
}
build_query() {
local conditions="$1"
local operator="$2"
local sort_field="$3"
local sort_order="$4"
local query="{\"query\": {\"bool\": {\"must\": [], \"should\": [], \"must_not\": []}}}"
if [[ "$operator" == "AND" ]]; then
query=$(echo "$query" | jq ".query.bool.must = $conditions")
elif [[ "$operator" == "OR" ]]; then
query=$(echo "$query" | jq ".query.bool.should = $conditions | .query.bool.minimum_should_match = 1")
else
echo "Operator must be 'AND' or 'OR'"
return 1
fi
if [[ -n "$sort_field" ]]; then
query=$(echo "$query" | jq ".sort = [{\"$sort_field\": {\"order\": \"$sort_order\"}}]")
fi
echo "$query"
}
display_query() {
local query="$1"
echo "$query" | jq .
}
# Example
and_conditions=()
and_conditions+=("$(add_condition "match" "data.win.eventdata.authenticationPackageName" "NTLM")")
and_conditions+=("$(add_condition "wildcard" "data.win.eventdata.targetUserName" "*")")
nested_condition=$(add_nested_condition "term" "data.win.eventdata.lmPackageName" "NTLM V1")
and_conditions+=("$nested_condition")
and_conditions_json=$(printf "%s\n" "${and_conditions[@]}" | jq -s add)
query=$(build_query "$and_conditions_json" "AND" "timestamp" "desc")
display_query "$query"
=====CLI=====
elastic-search-users useradd manuel -p ToPsEcRet
elasticsearch-users roles manuel -a superuser
elasticsearch-users roles manuel -r superuser
=====Links=====
* [[https://kruschecompany.com/de/elastic-stack-elk|Elastic Stack (ELK) Tutorial]]
* [[https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html|Setting up Elasticsearch]]