Создание типового пайплайна
Условные обозначения
LS_HOME
- домашняя директория Logstash, обычно это/app/logstash/
PIPELINE_FILE
— конфигурационный файл пайплайнаPIPELINE_ID
— ID пайплайнаHOST_OS_DATA
— узлы с ролью dataINDEX_NAME
— название индекса
Расположение пайплайн файла
Создайте файл пайплайна <PIPELINE_FILE>
в директории ${LS_HOME}/config/conf.d
следующей командой:
sudo -u logstash touch ${LS_HOME}/config/conf.d/<PIPELINE_FILE>
Структура пайплайн файла
Конфигурационный файл пайплайна состоит из трех основных блоков: input
, filter
и output
.
Блок input
Блок input
определяет, откуда Logstash получает данные. Например, это может быть файл (в формате построчного лога, json, csv и т.д.), прием данных от агента, сбор с помощью LDAP, REST API и другое.
Если в одном конфигурационном файле указано несколько input-плагинов, каждый из них будет работать независимо — данные будут поступать из всех указанных источников параллельно. При использовании нескольких конфигураций одного и того же input-плагина (например, несколько file-блоков), каждый блок будет обрабатываться отдельно, согласно своим параметрам.
Пример блока input
:
input {
file {
path => "/tmp/log_file.json"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => "json"
}
}
В этом примере используется file
input-плагин со следующими параметрами для чтения из файла:
path
— задает путь к файлу, который должен обрабатываться Logstashstart_position
— указывает начальную позицию, с которой будет начинаться вычитывание данных из файлаsincedb_path
— отвечает за путь к файлу, в котором хранится текущая позиция чтения. Если параметр не задан, Logstash использует файл в директории${LS_HOME}/data/plugins/inputs/
. Для принудительного чтения с начала можно указать /dev/null — это приведёт к сбросу позиции при перезапуске и возможному дублированию данныхcodec
— общий параметр для всех input плагинов, который задаёт способ декодирования входящих данных. В данном случае file input-плагин будет ожидать, что входящие строки файла представлены в json формате, и десериализовывать их в объекты
Блок filter
Блок filter
отвечает за обработку входящих данных: их парсинг, фильтрацию и модификацию. Для парсинга логов обычно используется grok filter-плагин, построенный на работе с регулярными выражениями. Дополнительную информацию про grok filter-плагин можно найти в официальной документации Elastic. Блок filter
также может включать плагин mutate, который позволяет применять такие опции, как add_field
, remove_field
и другие методы преобразования полей события. Дополнительную информацию об этих и других filter-плагинах можно найти в официальной документации Elastic. Выполнение всех плагинов внутри блока filter
выполняется друг за другом.
Пример простого блока filter
:
filter {
grok {
match => {
"message" => ['%{IP:client_ip}\s+"%{IP:remote_ip}"\s+\[%{HTTPDATE:timestamp}\]\s+%{WORD:http_method}]
}
}
mutate {
rename => { "http_method" => "method" }
add_field => { "service" => "web_server" }
}
}
Блок output
Блок output
определяет, куда отправляются обработанные данные. Это может быть Elasticsearch, OpenSearch, файл или база данных.
Если в конфигурации указано несколько output-плагинов, каждое обработанное событие будет последовательно передано во все выходные блоки. Каждый output-блок получит свою копию события, что позволяет, например, параллельно отправлять данные в OpenSearch и сохранять их в файл для целей отладки.
Если один output
завершится ошибкой, остальные продолжат работу.
Пример типового блока output
для OpenSearch:
output {
opensearch {
hosts => ["https://<HOST_OS_DATA[1]>:9200", "https://<HOST_OS_DATA[2]>:9200", "https://<HOST_OS_DATA[...]>:9200"]
index => "<INDEX_NAME>"
user => "logstash"
password => "${ES_PWD}"
ssl => true
cacert => "${LS_HOME}/config/ca-cert.pem"
ecs_compatibility => "disabled"
}
}
Для этого примера используются типовые параметры:
hosts
— узлы с ролью data, можно указать несколько значений через запятуюindex
— имя индекса, в который будут записываться события. Для автоматического создания индексов с учетом номера недели можно использовать шаблон с датой, например:<INDEX_NAME>-%{+xxxx.ww}
, где%{+xxxx.ww}
подставляет год и номер недели (вместо данного подхода рекомендуется использовать политику с действием rollover политики ISM)user
— имя пользователя для аутентификации в OpenSearchpassword
— пароль для аутентификации в OpenSearch, в данном примере пароль берётся из хранилища паролей keystore (инструкция по добавлению пароля в хранилище доступна в соответствующем разделе добавление пароля в хранилище keystore)ssl
— параметр, определяющий необходимо ли использовать SSLssl_certificate_verification
— уровень проверки сертификатаcacert
— путь к CA-сертификатуecs_compatibility
— настройка совместимости с ECS
Добавление пайплайна в pipelines.yml
После создания пайплайна его необходимо добавить в файл pipelines.yml
, чтобы при следующем старте Logstash произошел его запуск. Для этого перейдите в директорию ${LS_HOME}/config
и откройте файл pipelines.yml
для редактирования:
sudo nano ${LS_HOME}/config/pipelines.yml
Добавьте следующие строки в файл, выполнив команду:
- pipeline.id: <PIPELINE_ID>
path.config: "${LS_HOME}/config/conf.d/<PIPELINE_FILE>"
Рекомендуется использовать <PIPELINE_ID>
, совпадающий с именем вашего пайплайна, чтобы данный идентификатор включался в названии файла лога, облегчая их идентификацию и анализ.
Теперь перезапустите Logstash и проверьте его статус следующими командами:
sudo systemctl restart logstash
sudo systemctl status logstash
Просмотр логов
Для проверки статистики запущенных пайплайнов выполните команду:
tail -n 100 /app/logs/logstash/logstash-plain.log
В списке запущенных пайплайнов (running_pipelines
) должен присутствовать id вашего пайплайна (<PIPELINE_ID>
). Пример отображения запущенных пайплайнов:
[2025-06-02T16:30:20,227][INFO ][logstash.agent ] Pipelines running {:count=>3, :running_pipelines=>[:linux_io_monitoring, :win_activity, :ksc_applications], :non_running_pipelines=>[]}
Помимо основного лога Logstash (logstash-plain.log) у каждого пайплайна также есть свой лог, который сохраняется под именем pipeline_<PIPELINE_ID>.log
, в котором доступна подробная информация о работе данного пайплайна. Пример лога:
[2025-06-02T16:30:06,222][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"linux_io_monitoring", "pipeline.workers"=>8, "pipeline.batch.size"=>225, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1800, "pipeline.sources"=>["/app/logstash/config/conf.d/linux_disk_io.conf"], :thread=>"#<Thread:0x2ebc0ca3 run>"}
[2025-06-02T16:30:07,953][INFO ][logstash.javapipeline ] Pipeline Java execution initialization time {"seconds"=>1.73}
[2025-06-02T16:30:08,678][INFO ][logstash.inputs.beats ] Starting input listener {:address=>"0.0.0.0:51123"}
[2025-06-02T16:30:08,779][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"linux_io_monitoring"}
[2025-06-02T16:30:09,058][INFO ][org.logstash.beats.Server] Starting server on port: 51123
Предпоследняя и последняя строки показывают, что пайплайн запущен.