Перейти к основному содержимому
Версия: 5.1

Настройка узлов Kafka

Примеры конфигурационных файлов, задающих роль узла в кластере, расположены в каталоге /app/kafka/config/kraft и представлены файлами controller.properties и broker.properties.

В таблице представлены основные параметры конфигурации, требующие редактирования на этапе базовой настройки Kafka:

Основные параметры конфигурации Kafka:

ПараметрОписаниеПримечание
process.rolesЗадает роль узла: broker или controller.-
node.idУникальный ID узла в кластере.-
controller.quorum.votersПеречень контроллеров с их адресами и портами.-
listenersСписок URI и имен слушателей, через которые Kafka принимает подключения. Каждый слушатель должен иметь уникальные имя и порт. Для привязки ко всем интерфейсам используйте 0.0.0.0, оставьте хост пустым — для интерфейса по умолчанию.Примеры: PLAINTEXT://myhost:9092, SSL://:9091, CLIENT://0.0.0.0:9092, REPLICATION://localhost:9093.
controller.listener.namesУстанавливает адреса и порты, по которым клиенты (producer и consumer) будут подключаться к брокерам. Особенно актуально при работе за NAT или с несколькими сетевыми интерфейсами.-
listener.security.protocol.mapОпределяет соответствие между именами слушателей и используемыми протоколами безопасности. Позволяет задать разные настройки безопасности (SSL, SASL) для разных портов или IP. Каждый слушатель указывается один раз.Например, для INTERNAL: listener.name.internal.ssl.keystore.location. Если параметр не задан, используется общее значение (ssl.keystore.location). В KRaft, если не задан явный маппинг, слушатели из controller.listener.names по умолчанию используют PLAINTEXT.
log.dirsКаталоги, где Kafka хранит данные логов.Указывается список путей, разделенных запятыми.
num.partitionsУстанавливает количество партиций по умолчанию для каждого топика.Значение параметра будет заменено, если оно задано при создании топика.
offsets.topic.replication.factorОпределяет фактор репликации для топика смещений.Увеличьте значение для повышения доступности. Внутренний топик не будет создан, пока размер кластера не удовлетворит этому требованию.
transaction.state.log.replication.factorУстанавливает фактор репликации для топика транзакций.Увеличьте значение для повышения доступности. Внутренний топик не будет создан, пока размер кластера не удовлетворит этому требованию.
transaction.state.log.min.isrПереопределяет значение min.insync.replicas топика для транзакций.-
min.insync.replicasОпределяет минимальное число реплик, которые должны подтвердить запись при использовании значения acks=all (или -1).Если требуемое число реплик недоступно, producer получит ошибку (NotEnoughReplicas или NotEnoughReplicasAfterAppend). Совместная настройка acks и min.insync.replicas повышает надежность доставки сообщений.
log.retention.hoursОпределяет срок хранения логов (в часах) перед их удалением.Альтернативные параметры: log.retention.ms, log.retention.minutes.
log.segment.bytesМаксимальный размер одного файла-сегмента партиции.-
ssl.keystore.locationМестоположение файла хранилища ключей.-
ssl.keystore.passwordПароль к keystore-файлу, необходимый при использовании SSL и заданном пути к хранилищу ключей.Не требуется, если не задан ssl.keystore.location. Не поддерживается для PEM-формата.
ssl.truststore.locationРасположение файла хранилища доверенных сертификатов.-
ssl.truststore.passwordПароль для файла хранилища доверенных сертификатов.При отсутствии пароля файл будет использоваться, но без проверки целостности. Не поддерживается в формате PEM.
ssl.client.authОпределяет, требуется ли клиентская аутентификация при использовании SSL-соединения.Поддерживаются следующие значения: required — клиент обязан пройти аутентификацию. requested — аутентификация желательна, но не обязательна, клиент может не предоставлять данные. none — клиентская аутентификация отключена.
super.usersСписок суперпользователей с полными правами доступа.Игнорируют настройки ACL.
ssl.principal.mapping.rulesСписок правил преобразования полного имени (CN) из сертификата клиента в короткое имя.Правила применяются по порядку. Первое подходящее правило используется, остальные игнорируются.

Настройка узлов Kafka с ролью controller

Пример настройки первого узла с ролью controller - файл /app/kafka/config/kraft/controller.properties:

Пример
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=CONTROLLER://192.168.0.51:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDKeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDTruststore
ssl.client.auth=required
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:controller1;User:conroller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
allow.everyone.if.no.acl.found=False
ssl.principal.mapping.rules=RULE:^CN=(adminkfk),O=Work,L=Moscow,ST=Moscow,C=RU$/$1/L,RULE:^CN=(controller[1-3]),O=Work,L=Moscow,ST=Moscow,C=RU$/$1/L, RULE:^ CN=(broker[1-5]),O=Work,L=Moscow,ST=Moscow,C=RU$/$1/L
Примечание

Аналогично первому узлу, необходимо настроить остальные узлы с ролью controller.

Настройка узлов Kafka c ролью broker

Пример настройки первого узла с ролью broker:

Пример
process.roles=broker
node.id=4
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=SSL:// 192.168.0.54:9092
inter.broker.listener.name=SSL
advertised.listeners=SSL:// 192.168.0.54:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL,SSL:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDkeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDtruststore
ssl.client.auth=requested
allow.everyone.if.no.acl.found=False
auto.create.topics.enable=false
super.users=User:contreoller1;User:controller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
ssl.principal.mapping.rules=RULE:^CN=producer_hosts[1-2],O=Work,L=Moscow,ST=Moscow,C=RU$/producer_user/L,RULE:^CN=consumer_hosts[1-3],O=Work,L=Moscow,ST=Moscow,C=RU$/consumer_user/L,RULE:^CN=(adminkfk),O=Innotech,L=Moscow,ST=Moscow,C=RU$/$1/L

Настройка сервисов Kafka

На серверах Kafka необходимо создать конфигурационные файлы для управления сервисом Kafka. Названия файлов будут идентичны для узлов с ролью controller и broker - /etc/systemd/system/kafka.service.

  1. Пример конфигурационного файла для сервиса Kafka на узле c ролью controller:
[Unit]
Description=Apache Kafka server (controller)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/controller.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
LimitNOFILE=infinity

Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
  1. Пример конфигурационного файла /app/service/kafka.service для сервиса Kafka на узле с ролью broker:
 [Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/broker.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh

TimeoutSec=30
LimitNOFILE=infinity

Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
kafka.service_broker

После создания файлов сервисов необходимо перечитать все файлы юнитов из стандартных путей (/etc/systemd/system, /usr/lib/systemd/system и т.д.). Для этого выполните команду:

systemctl daemon-reload