Перейти к основному содержимому
Версия: 5.1

Генерация идентификатора кластера и подготовка хранилищ Kafka

Идентификатор кластера - это уникальный идентификатор, генерируемый при создании кластера. Служит внутренней идентификации узлов кластера и управления метаданными. Изменение ID кластера Kafka приведет к его неработоспособности.

На всех узлах кластера необходимо подготовить хранилища Kafka с помощью утилиты kafka-storage.sh. Она создаст необходимую структуру каталогов и файлов для хранения данных Kafka.

  1. На любом узле Kafka выполнить команду для генерации cluster_id:
$ KAFKA_CLUSTER_ID="$(JAVA_HOME=/app/jdk/ /app/kafka/bin/kafka-storage.sh random-uuid)"

Получить значение переменной:

$ echo $KAFKA_CLUSTER_ID
  1. Переменная должна быть задана на всех узлах кластера. Значение берется с узла, на котором был впервые сгенерирован cluster_id. Выполните команду:
$ KAFKA_CLUSTER_ID= mK5QOoYJQFaaGLq9wV6uiA
  1. На всех узлах Kafka с ролью controller выполните команду:
$ JAVA_HOME=/app/jdk/ /app/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /app/kafka/config/kraft/controller.properties
  1. На всех узлах Kafka с ролью broker выполните команду:
$ JAVA_HOME=/app/jdk/ /app/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /app/kafka/config/kraft/broker.properties

Настройка хранилища сертификатов

В примере рассматривается использование локального центра сертификации. Также каждому узлу Kafka будет выдан индивидуальный сертификат. Потребуются корневой, промежуточный(при наличии) и сертификаты узлов в формате .pem.

Сертификаты промежуточные и корневые, которые используются для проверки подлинности других сторон размещаются в truststore. Сертификат и приватный ключ сервера хранится в keystore. Наименование truststore и keystore не имеет значение и можно выбрать любое название.

Пример таблицы сертификатов:

Common Name (CN)CSTLOSubject Alternative Name
adminkfkRUMoscowMoscowWork-
broker1RUMoscowMoscowWorkDNS:broker1, IP:192.168.0.54
broker2RUMoscowMoscowWorkDNS:broker2, IP:192.168.0.55
broker3RUMoscowMoscowWorkDNS:broker3, IP:192.168.0.56
broker4RUMoscowMoscowWorkDNS:broker4, IP:192.168.0.57
broker5RUMoscowMoscowWorkDNS:broker5, IP:192.168.0.58
controller1RUMoscowMoscowWorkDNS:controller1, IP:192.168.0.51
controller2RUMoscowMoscowWorkDNS:controller2, IP:192.168.0.52
controller3RUMoscowMoscowWorkDNS:controller3, IP:192.168.0.53
producer_host1RUMoscowMoscowWorkDNS:producer_host1, IP:192.168.0.71
consumer_host1RUMoscowMoscowWorkDNS:consumer_host1, IP:192.168.0.81
Примечание

Корневой (ca-cert.pem) и промежуточный (ca-sub-cert.pem) сертификаты нужно разместить в хранилище с именем ca-truststore.jks.

Для добавление корневого сертификата выполните команду:

$ JAVA_HOME=/app/jdk/ /app/jdk/bin/keytool -keystore /app/certs/ca-truststore.jks -alias ca-root -importcert -file ca-cert.pem

При выполнении команды будет создан keystore. Система запросит ввод нового пароля хранилища. Этот пароль потребуется для добавления новых сертификатов и его будет использовать Kafka при обращении к truststore. Затем появится запрос: Trust this certificate? [no]:, на который нужно ответить yes. В конце будет сообщение: Certificate was added to keystore.

Для добавления промежуточного сертификата выполните аналогичную команду:

$JAVA_HOME=/app/jdk/ /app/jdk/bin/keytool -keystore /app/certs/ca-truststore.jks
-alias ca-sub-root -importcert -file ca-sub-cert.pem

Хранилище ca-truststore.jks должно быть одинаковым для всех узлов Kafka и распространяться на остальные хосты любым удобным способом.

Сертификат сервера и соответствующий ему закрытый ключ необходимо разместить в хранилище kafka.keystore.jks. Поскольку keytool не поддерживает отдельный импорт ключа, требуется предварительно объединить ключ и сертификат в формат .p12. Для этого выполните команду:

$ openssl pkcs12 -export -in broker1-cert.pem -inkey broker1-key.pem -name broker1 -out broker1.p12

Система запросит ввод пароля Enter Export Password. Указанный пароль необходимо сохранить — он потребуется при импорте сертификата в хранилище kafka.keystore.jks. Для импорта сертификата в хранилище выполните команду:

$ JAVA_HOME=/app/jdk/ /app/jdk/bin/keytool  -importkeystore -destkeystore /app/certs/ kafka.keystore.jks -srckeystore broker1.p12 -srcstoretype PKCS12

Система попросит задать новый пароль Enter destination keystore password — его следует сохранить. После этого потребуется ввести ранее установленный пароль экспорта Export Password. В завершении операции система выведет: Entry for alias broker1 successfully imported. Import command completed: 1 entries successfully imported, 0 entries failed or cancelled.

Для проверки импортированных сертификатов в хранилище можно использовать следующие команды:

$ JAVA_HOME=/app/jdk/ /app/jdk/bin/keytool -keystore /app/certs/kafka.keystore.jks --list
$ JAVA_HOME=/app/jdk/ /app/jdk/bin/keytool -keystore /app/certs/ca-truststore.jks –list
Примечание

Пароли для доступа к хранилищам сертификатов следует указать в конфигурационных файлах /app/kafka/config/kraft/broker.properties и /app/kafka/config/kraft/controller.properties в разделе Security, используя переменные ssl.keystore.password и ssl.truststore.password.

Те же действия по импорту сертификата субъекта в хранилище kafka.keystore.jks необходимо выполнить на каждом узле Kafka, поскольку для каждого узла был подготовлен отдельный сертификат.

Для администрирования кластера создан отдельный сертификат с CN = adminkfk. Нам потребуется создать для него отдельный keystore. Его можно разместить на любом узле кластера Kafka, либо стороннем узле (в таком случае - потребуется дополнительные инструменты для управление кластером. В рамках данной статьи не рассматривается). В качестве примера управление кластером будет осуществляться только с первого узла с ролью broker. На первом узле c ролью broker выполните команды:

$ openssl pkcs12 -export -in adminkfk-cert.pem -inkey adminkfk-key.pem -name adminkfk -out adminkfk.p12
$ JAVA_HOME=/app/jdk/ /app/jdk/bin/keytool -importkeystore -destkeystore /app/certs/adminkfk.keystore.jks -srckeystore adminkfk.p12 -srcstoretype PKCS12

Пароли для доступа к хранилищам ca-truststore.jks и adminkfk.keystore.jks следует указать в файле /app/certs/adminkfk.properties:

security.protocol=SSL
ssl.keystore.location=/app/certs/adminkfk.keystore.jks
ssl.keystore.password=PasswordKeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PasswordTruststore
client.id=adminkfk
ssl.endpoint.identification.algorithm=

Обратите внимание на параметр ssl.endpoint.identification.algorithm=. Аналогичный парамер указан в конфигурационных файлах controller.properties, broker.properties. Также используется в клиентских конфигурациях (producer/consumer). Служит для проверки соответствия имени хоста в сертификате сервера или клиента, с фактическим именем хоста при подключении.

По умолчанию имеет значение https - включает проверку имени хоста, по аналогии с https. Kafka сравнивает CN или SAN в сертификате с именем хоста, с которого выполняется подключение. При использовании пустой строки - отключается проверка имени хоста.