Перейти к основному содержимому
Версия: 5.0

Создание типового пайплайна

Условные обозначения

  • LS_HOME - домашняя директория Logstash, обычно это /app/logstash/
  • PIPELINE_FILE — конфигурационный файл пайплайна
  • PIPELINE_ID — ID пайплайна
  • HOST_OS_DATA — узлы с ролью data
  • INDEX_NAME — название индекса

Расположение пайплайн файла

Создайте файл пайплайна <PIPELINE_FILE> в директории ${LS_HOME}/config/conf.d следующей командой:

sudo -u logstash touch ${LS_HOME}/config/conf.d/<PIPELINE_FILE>

Структура пайплайн файла

Конфигурационный файл пайплайна состоит из трех основных блоков: input, filter и output.

Блок input

Блок input определяет, откуда Logstash получает данные. Например, это может быть файл (в формате построчного лога, json, csv и т.д.), прием данных от агента, сбор с помощью LDAP, REST API и другое.

Если в одном конфигурационном файле указано несколько input-плагинов, каждый из них будет работать независимо — данные будут поступать из всех указанных источников параллельно. При использовании нескольких конфигураций одного и того же input-плагина (например, несколько file-блоков), каждый блок будет обрабатываться отдельно, согласно своим параметрам.

Пример блока input:

input {
file {
path => "/tmp/log_file.json"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => "json"
}
}

В этом примере используется file input-плагин со следующими параметрами для чтения из файла:

  • path — задает путь к файлу, который должен обрабатываться Logstash
  • start_position — указывает начальную позицию, с которой будет начинаться вычитывание данных из файла
  • sincedb_path — отвечает за путь к файлу, в котором хранится текущая позиция чтения. Если параметр не задан, Logstash использует файл в директории ${LS_HOME}/data/plugins/inputs/. Для принудительного чтения с начала можно указать /dev/null — это приведёт к сбросу позиции при перезапуске и возможному дублированию данных
  • codec — общий параметр для всех input плагинов, который задаёт способ декодирования входящих данных. В данном случае file input-плагин будет ожидать, что входящие строки файла представлены в json формате, и десериализовывать их в объекты

Блок filter

Блок filter отвечает за обработку входящих данных: их парсинг, фильтрацию и модификацию. Для парсинга логов обычно используется grok filter-плагин, построенный на работе с регулярными выражениями. Дополнительную информацию про grok filter-плагин можно найти в официальной документации Elastic. Блок filter также может включать плагин mutate, который позволяет применять такие опции, как add_field, remove_field и другие методы преобразования полей события. Дополнительную информацию об этих и других filter-плагинах можно найти в официальной документации Elastic. Выполнение всех плагинов внутри блока filter выполняется друг за другом.

Пример простого блока filter:

filter {
grok {
match => {
"message" => ['%{IP:client_ip}\s+"%{IP:remote_ip}"\s+\[%{HTTPDATE:timestamp}\]\s+%{WORD:http_method}]
}
}

mutate {
rename => { "http_method" => "method" }
add_field => { "service" => "web_server" }
}
}

Блок output

Блок output определяет, куда отправляются обработанные данные. Это может быть Elasticsearch, OpenSearch, файл или база данных.

Если в конфигурации указано несколько output-плагинов, каждое обработанное событие будет последовательно передано во все выходные блоки. Каждый output-блок получит свою копию события, что позволяет, например, параллельно отправлять данные в OpenSearch и сохранять их в файл для целей отладки.

Обратите внимание!

Если один output завершится ошибкой, остальные продолжат работу.

Пример типового блока output для OpenSearch:

output {
opensearch {
hosts => ["https://<HOST_OS_DATA[1]>:9200", "https://<HOST_OS_DATA[2]>:9200", "https://<HOST_OS_DATA[...]>:9200"]
index => "<INDEX_NAME>"
user => "logstash"
password => "${ES_PWD}"
ssl => true
cacert => "${LS_HOME}/config/ca-cert.pem"
ecs_compatibility => "disabled"
}
}

Для этого примера используются типовые параметры:

  • hosts — узлы с ролью data, можно указать несколько значений через запятую
  • index — имя индекса, в который будут записываться события. Для автоматического создания индексов с учетом номера недели можно использовать шаблон с датой, например: <INDEX_NAME>-%{+xxxx.ww}, где %{+xxxx.ww} подставляет год и номер недели (вместо данного подхода рекомендуется использовать политику с действием rollover политики ISM)
  • user — имя пользователя для аутентификации в OpenSearch
  • password — пароль для аутентификации в OpenSearch, в данном примере пароль берётся из хранилища паролей keystore (инструкция по добавлению пароля в хранилище доступна в соответствующем разделе добавление пароля в хранилище keystore)
  • ssl — параметр, определяющий необходимо ли использовать SSL
  • ssl_certificate_verification — уровень проверки сертификата
  • cacert — путь к CA-сертификату
  • ecs_compatibility — настройка совместимости с ECS

Добавление пайплайна в pipelines.yml

После создания пайплайна его необходимо добавить в файл pipelines.yml, чтобы при следующем старте Logstash произошел его запуск. Для этого перейдите в директорию ${LS_HOME}/config и откройте файл pipelines.yml для редактирования:

sudo nano ${LS_HOME}/config/pipelines.yml

Добавьте следующие строки в файл, выполнив команду:

- pipeline.id: <PIPELINE_ID>
path.config: "${LS_HOME}/config/conf.d/<PIPELINE_FILE>"
Обратите внимание!

Рекомендуется использовать <PIPELINE_ID>, совпадающий с именем вашего пайплайна, чтобы данный идентификатор включался в названии файла лога, облегчая их идентификацию и анализ.

Теперь перезапустите Logstash и проверьте его статус следующими командами:

sudo systemctl restart logstash 
sudo systemctl status logstash

Просмотр логов

Для проверки статистики запущенных пайплайнов выполните команду:

tail -n 100 /app/logs/logstash/logstash-plain.log

В списке запущенных пайплайнов (running_pipelines) должен присутствовать id вашего пайплайна (<PIPELINE_ID>). Пример отображения запущенных пайплайнов:

[2025-06-02T16:30:20,227][INFO ][logstash.agent           ] Pipelines running {:count=>3, :running_pipelines=>[:linux_io_monitoring, :win_activity, :ksc_applications], :non_running_pipelines=>[]}

Помимо основного лога Logstash (logstash-plain.log) у каждого пайплайна также есть свой лог, который сохраняется под именем pipeline_<PIPELINE_ID>.log, в котором доступна подробная информация о работе данного пайплайна. Пример лога:

[2025-06-02T16:30:06,222][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"linux_io_monitoring", "pipeline.workers"=>8, "pipeline.batch.size"=>225, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1800, "pipeline.sources"=>["/app/logstash/config/conf.d/linux_disk_io.conf"], :thread=>"#<Thread:0x2ebc0ca3 run>"}
[2025-06-02T16:30:07,953][INFO ][logstash.javapipeline ] Pipeline Java execution initialization time {"seconds"=>1.73}
[2025-06-02T16:30:08,678][INFO ][logstash.inputs.beats ] Starting input listener {:address=>"0.0.0.0:51123"}
[2025-06-02T16:30:08,779][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"linux_io_monitoring"}
[2025-06-02T16:30:09,058][INFO ][org.logstash.beats.Server] Starting server on port: 51123

Предпоследняя и последняя строки показывают, что пайплайн запущен.