Настройка узлов Kafka
Примеры конфигурационных файлов, задающих роль узла в кластере, расположены в каталоге /app/kafka/config/kraft
и представлены файлами controller.properties
и broker.properties
.
В таблице представлены основные параметры конфигурации, требующие редактирования на этапе базовой настройки Kafka
:
Основные параметры конфигурации Kafka:
Параметр | Описание | Примечание |
---|---|---|
process.roles | Задает роль узла: broker или controller . | - |
node.id | Уникальный ID узла в кластере. | - |
controller.quorum.voters | Перечень контроллеров с их адресами и портами. | - |
listeners | Список URI и имен слушателей, через которые Kafka принимает подключения. Каждый слушатель должен иметь уникальные имя и порт. Для привязки ко всем интерфейсам используйте 0.0.0.0 , оставьте хост пустым — для интерфейса по умолчанию. | Примеры: PLAINTEXT://myhost:9092 , SSL://:9091 , CLIENT://0.0.0.0:9092 , REPLICATION://localhost:9093 . |
controller.listener.names | Устанавливает адреса и порты, по которым клиенты (producer и consumer) будут подключаться к брокерам. Особенно актуально при работе за NAT или с несколькими сетевыми интерфейсами. | - |
listener.security.protocol.map | Определяет соответствие между именами слушателей и используемыми протоколами безопасности. Позволяет задать разные настройки безопасности (SSL, SASL) для разных портов или IP. Каждый слушатель указывается один раз. | Например, для INTERNAL: listener.name.internal.ssl.keystore.location . Если параметр не задан, используется общее значение (ssl.keystore.location ). В KRaft, если не задан явный маппинг, слушатели из controller.listener.names по умолчанию используют PLAINTEXT. |
log.dirs | Каталоги, где Kafka хранит данные логов. | Указывается список путей, разделенных запятыми. |
num.partitions | Устанавливает количество партиций по умолчанию для каждого топика. | Значение параметра будет заменено, если оно задано при создании топика. |
offsets.topic.replication.factor | Определяет фактор репликации для топика смещений. | Увеличьте значение для повышения доступности. Внутренний топик не будет создан, пока размер кластера не удовлетворит этому требованию. |
transaction.state.log.replication.factor | Устанавливает фактор репликации для топика транзакций. | Увеличьте значение для повышения доступности. Внутренний топик не будет создан, пока размер кластера не удовлетворит этому требованию. |
transaction.state.log.min.isr | Переопределяет значение min.insync.replicas топика для транзакций. | - |
min.insync.replicas | Определяет минимальное число реплик, которые должны подтвердить запись при использовании значения acks=all (или -1 ). | Если требуемое число реплик недоступно, producer получит ошибку (NotEnoughReplicas или NotEnoughReplicasAfterAppend ). Совместная настройка acks и min.insync.replicas повышает надежность доставки сообщений. |
log.retention.hours | Определяет срок хранения логов (в часах) перед их удалением. | Альтернативные параметры: log.retention.ms , log.retention.minutes . |
log.segment.bytes | Максимальный размер одного файла-сегмента партиции. | - |
ssl.keystore.location | Местоположение файла хранилища ключей. | - |
ssl.keystore.password | Пароль к keystore-файлу, необходимый при использовании SSL и заданном пути к хранилищу ключей. | Не требуется, если не задан ssl.keystore.location . Не поддерживается для PEM-формата. |
ssl.truststore.location | Расположение файла хранилища доверенных сертификатов. | - |
ssl.truststore.password | Пароль для файла хранилища доверенных сертификатов. | При отсутствии пароля файл будет использоваться, но без проверки целостности. Не поддерживается в формате PEM. |
ssl.client.auth | Определяет, требуется ли клиентская аутентификация при использовании SSL-соединения. | Поддерживаются следующие значения: required — клиент обязан пройти аутентификацию. requested — аутентификация желательна, но не обязательна, клиент может не предоставлять данные. none — клиентская аутентификация отключена. |
super.users | Список суперпользователей с полными правами доступа. | Игнорируют настройки ACL. |
ssl.principal.mapping.rules | Список правил преобразования полного имени (CN) из сертификата клиента в короткое имя. | Правила применяются по порядку. Первое подходящее правило используется, остальные игнорируются. |
Настройка узлов Kafka с ролью controller
Пример настройки первого узла с ролью controller
- файл /app/kafka/config/kraft/controller.properties
:
Пример
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=CONTROLLER://192.168.0.51:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDKeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDTruststore
ssl.client.auth=required
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:controller1;User:conroller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
allow.everyone.if.no.acl.found=False
ssl.principal.mapping.rules=RULE:^CN=(adminkfk),O=Work,L=Moscow,ST=Moscow,C=RU$/$1/L,RULE:^CN=(controller[1-3]),O=Work,L=Moscow,ST=Moscow,C=RU$/$1/L, RULE:^ CN=(broker[1-5]),O=Work,L=Moscow,ST=Moscow,C=RU$/$1/L
Аналогично первому узлу, необходимо настроить остальные узлы с ролью controller
.
Настройка узлов Kafka c ролью broker
Пример настройки первого узла с ролью broker
:
Пример
process.roles=broker
node.id=4
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=SSL:// 192.168.0.54:9092
inter.broker.listener.name=SSL
advertised.listeners=SSL:// 192.168.0.54:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL,SSL:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDkeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDtruststore
ssl.client.auth=requested
allow.everyone.if.no.acl.found=False
auto.create.topics.enable=false
super.users=User:contreoller1;User:controller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
ssl.principal.mapping.rules=RULE:^CN=producer_hosts[1-2],O=Work,L=Moscow,ST=Moscow,C=RU$/producer_user/L,RULE:^CN=consumer_hosts[1-3],O=Work,L=Moscow,ST=Moscow,C=RU$/consumer_user/L,RULE:^CN=(adminkfk),O=Innotech,L=Moscow,ST=Moscow,C=RU$/$1/L
Настройка сервисов Kafka
На серверах Kafka
необходимо создать конфигурационные файлы для управления сервисом Kafka
. Названия файлов будут идентичны для узлов с ролью controller
и broker
- /etc/systemd/system/kafka.service
.
- Пример конфигурационного файла для сервиса
Kafka
на узле c рольюcontroller
:
[Unit]
Description=Apache Kafka server (controller)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/controller.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
LimitNOFILE=infinity
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
- Пример конфигурационного файла
/app/service/kafka.service
для сервисаKafka
на узле с рольюbroker
:
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/broker.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
LimitNOFILE=infinity
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
kafka.service_broker
После создания файлов сервисов необходимо перечитать все файлы юнитов из стандартных путей (/etc/systemd/system
, /usr/lib/systemd/system
и т.д.). Для этого выполните команду:
systemctl daemon-reload