Интеграция систем с помощью Apache Kafka
Настройки брокера
Kafka 3.8 Documentation — 3. Configuration.
Настройка | Значение по умолчанию | Моё значение | Комментарий |
---|---|---|---|
auto.create.topics.enable |
true |
false |
Отключить автоматическое создание топиков. Управлять явно. |
delete.topic.enable |
true |
false в PROD средах |
В production-средах рекомендуется запрещать удаление топиков. Если потребуется, можно временно изменить его значение, перезапустить брокеры, удалить топик, а затем вернуть значение обратно. |
default.replication.factor |
1 | min.insync.replicas + 2 |
Позволяет разрешить одно запланированное отключение в наборе реплик и одно незапланированное. |
log.retention.hours |
168 часов или 1 неделя. | Рассчитывать. | Для изменения рекомендуется использовать более приоритетные опции: log.retention.minutes и log.retention.ms . |
log.retention.bytes |
-1 (без ограничений) | Рассчитывать. | Значение указывается из расчёта на раздел, а не на топик. |
log.segment.bytes |
1073741824 (1 gibibyte) | Рассчитывать, особенно для топиков с низкой частотой генерации сообщений. | Kafka удаляет данные только на уровне сегментов. Если сегмент не заполняется до log.segment.bytes , он не будет удален, пока не истечет время хранения.Для топиков с низкой частотой сообщений можно уменьшить log.segment.bytes , чтобы сегменты удалялись чаще, даже если они не заполнены. |
log.roll.hours |
168 часов или 1 неделя. | Рассчитывать. | Для изменения рекомендуется использовать более приоритетную опцию log.roll.ms . |
min.insync.replicas |
1 | A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all" |
When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend ). |
max.message.bytes |
1048588 после компрессии, если она включена | Не менять. | При изменении иметь в виду, что эта опция должна быть согласована с другими. Проверять документацию. |
Producer
ProduceRecord
→ Producer
.
Отправка возможна синхронная и асинхронная. Метод producer.send()
возвращает объект RecordMetadata
с информацией
о топике, разделе и смещении записанного сообщения и другими метаданными. Обычно, эта информация не нужна Producer
.
При асинхронной отправке хочется получать от брокера информацию, удалось ли вообще отправить сообщение. Это позволит сгенерировать исключение, записать сообщение в лог, записать сообщение в файл ошибок для последующего анализа.
Для этого Producer поддерживает добавление Callback при отправке записи: класс org.apache.kafka.clients.producer.
Callback
и метод onCompletion()
.
Note
Не рекомендуется выполнять блокирующие операции внутри callback. Вместо этого следует использовать другой поток для одновременного выполнения блокирующей операции.
Настройка
bootstrap.servers
— необязательно указывать все брокеры, можно указать 2.key.serializer
:- имя класса, реализующего интерфейс
org.apache.kafka.common.serialization.Serializer
; - используется для сериализации объекта ключа в байтовый массив;
- если не будет ключей, то указать тип
Void
для ключа иVoidSerializer
.
- имя класса, реализующего интерфейс
value.serializer
— аналогичноkey.serializer
.client.id
:- логический идентификатор клиента и приложения, в котором он используется;
- используется брокерами для идентификации сообщений;
- строка, указывать что-то осмысленное;
- будет фигурировать в логах и метриках, а также для определения квот.
acks
:- сколько реплик разделов должны получить запись, чтобы она считалась успешной;
- по умолчанию требуется только лидера реплики;
acks=0
— не ждёт ответа от брокера;acks=1
— ждёт ответа от лидера реплики;acks=all
— ждёт ответа от всех ISR.
Время доставки сообщения
Сколько времени потребуется, пока вызов функции send()
не завершится успешно или с ошибкой. Это время, которое мы
готовы потратить, пока Kafka не ответит успешно или пока мы не будем готовы сдаться и не признаем поражение.
Настройка | Значение по умолчанию | Моё значение | Комментарий |
---|---|---|---|
max.block.ms |
60000 (1 минута) | — | Увеличиывем при проблемах в сети, уменьшаем, когда нужна более быстрая реакция на проблемы. |
delivery.timeout.ms |
120000 (2 минуты) | — | Значение должно быть >= linger.ms + retry.backoff.ms + request.timeout.ms . |
request.timeout.ms |
30000 (30 секунд) | — | Увеличиывем при проблемах в сети, уменьшаем, когда нужна более быстрая реакция на проблемы. |
retry.backoff.ms |
100 | — | Это начальные интервал между retry. Увеличивается экспоненциально до retry.backoff.max.ms . |
linger.ms |
0 | — | Задержка перед отправкой пакета. Увеличивает пропускную способность сети и эффективность сжатия. |
buffer.memory |
33554432 (32 МБ) | — | Увеличивать имеет смысл, только если producer генерирует большое количество сообщений за короткий промежуток времени, и вы хотите избежать блокировок или ошибок. |
compression.type |
none |
lz4 , если важна скорость и мало CPU-ресурсов, иначе gzip |
Потребители (consumers) автоматически распаковывают данные, поэтому настройка сжатия на стороне продюсера не требует изменений на стороне потребителя. |
batch.size |
16384 (16 КБ) | — | Увеличивать имеет смысл, если вы отправляете большие объёмы данных и хотите максимизировать пропускную способность. Уменьшение может привести к увеличению количества запросов к брокеру и снижению производительности. Увеличение может увеличить задержку и потребление памяти на стороне продюсера и если задержка не является критичным фактором. |
max.in.flight.requests.per.connection |
5 | — | Для обеспечения гарантии упорядоченности необходимо либо retries=0 , либо enable.idempotence=true . |
max.request.size |
1048576 (1 МБ) | — | Делать одинаковым с настройкой брокеров message.max.bytes и косньюмеров fetch.max.bytes . |
receive.buffer.bytes |
32768 (32 kibibytes) | — |
Имеет смысл думать об увеличении, когда consumer/producer взаимодействует с брокерами из другого ЦОД, когда latency больше и throughput ниже. |
send.buffer.bytes |
131072 (128 kibibytes) | — | Аналогично receive.buffer.bytes . |
enable.idempotence |
false |
true |
Producer будет прикреплять порядковый номер к каждой отправляемой записи. Брокер будет проверять их на дубли. |
retries |
2147483647 | — | Предпочтительно не изменять и оперировать delivery.timeout.ms . |
Исключения
В классе KafkaProducer
2 типа исключений:
- Те, которые можно исправить повторной попыткой отправки сообщения — retriable. Например, ошибки соединения,
ошибка «отсутствует ведущий узел для раздела». Можно настроить
KafkaProducer
так, чтобы при таких ошибках отправка повторялась автоматически (с ограничением max числа попыток). - Те, которые невозможно исправить повторной отправкой сообщения. Например, «сообщение слишком велико».
Возможные исключения на стороне Producer:
SerializationException
— неудачная сериализация сообщения.BufferExhaustedException
илиTimeoutException
— при переполнении буфера.InterruptException
— при сбое отправляющего потока.DuplicateSequenceException
— безобидное исключение приenable.idempotence=true
и получении брокером дубликата сообщения.TimeoutException
— время ожидания превышаетmax.block.ms
.InvalidRecordException
— когда сообщение не соответствует ожидаемому формату или содержит некорректные данные.
Note
В целом, поскольку producer обрабатывает повторные попытки сам, нет смысла обрабатывать их в рамках логики приложения. Вместо этого, необходимо сосредоточить усилия на обработке ошибок, не допускающих повторных попыток, или случаев, когда повторные попытки были исчерпаны.
Сериализаторы
По умолчанию — StringSerializer
.
Есть ещё Apache Avro, Thrift, Protobuf.
Схемы Apache Avro предпочтительно хранить в Schema Registry. Если его нет, то можно создать файл схемы в формате
JSON и сохранить его в файле .avsc
. Далее, этот файл использовать со стороны Producer и Consumer для
сериализации/десериализации.
Партиции
Partitioners:
DefaultPartitioner
;RoundRobinPartitioner
;UniformStickyPartitioner
.
Kafka распределяет сообщения по партициям внутри топика. Ключ сообщения используется для определения, в какую партицию оно будет отправлено. По умолчанию Kafka использует хэш ключа для выбора партиции. Это гарантирует, что все сообщения с одинаковым ключом попадут в одну и ту же партицию, что важно для обеспечения порядка обработки сообщений.
Если порядок сообщений не важен или если все сообщения независимы, можно отправлять их без ключа. В этом случае Kafka будет распределять сообщения по партициям случайным образом (батчами).
Рекомендуется заранее определять количество партиций и никогда не добавлять новые.
Заголовки
Заголовки сообщений в Apache Kafka — это метаданные, которые можно добавить к каждому сообщению. Они представляют собой пары ключ-значение и могут использоваться для передачи дополнительной информации, которая не является частью основного тела сообщения. Вот несколько рекомендаций, что можно записывать в заголовки на стороне Producer:
- Идентификаторы и метаданные
- Trace ID / Correlation ID: Для трассировки сообщений в распределенных системах. Это помогает отслеживать цепочку событий.
- Message ID: Уникальный идентификатор сообщения для отслеживания и логирования.
- Source Application: Название приложения или сервиса, отправившего сообщение.
- Timestamp: Временная метка создания сообщения (если она не дублирует timestamp Kafka).
- Информация о версии
- Schema Version: Версия схемы данных (например, если используется Avro или Protobuf).
- API Version: Версия API, используемого для создания сообщения.
- Маршрутизация и фильтрация
- Routing Key: Ключ для маршрутизации сообщения в определенный топик или партицию.
- Tags / Labels: Метки для фильтрации или категоризации сообщений (например, "high-priority", "test", "production").
- Безопасность
- Authentication Token: Токен для аутентификации или авторизации.
- Encryption Key ID: Идентификатор ключа шифрования, если сообщение зашифровано.
- Бизнес-логика
- User ID: Идентификатор пользователя, связанного с сообщением.
- Transaction ID: Идентификатор транзакции, если сообщение является частью бизнес-процесса.
- Event Type: Тип события (например, "order_created", "user_updated").
- Технические метаданные
- Compression Type: Тип сжатия данных (если используется).
- Content-Type: Тип содержимого (например, "application/json", "avro/binary").
- Отладка и мониторинг
- Debug Flag: Флаг для включения дополнительной отладочной информации.
- Environment: Среда, в которой было создано сообщение (например, "dev", "staging", "prod").
Квоты и регулирование запросов
Следует проектировать и настраивать. Настройка через конфиг требует рестарта брокеров. Динамическая настройка — нет. См. детали в Quota Configuration.
Прочее
- Топики:
- для запросов
- для ответов
- названия
- Партиции
- Если >1 consumer, то нужно добавлять партиции.
- Сообщения упорядочены только в рамках 1 партиции.
- Размер сегмента
- TTL сообщения
- replication factor
- Формат сераилизации
- Распределение Leaders по Brokers
- Message
- Key
- Value
- Timestamp
- Headers
- Сжатие payload
- Настройка
min.insync.replicas
- Producer acks
1
- Семантика -
at-least-once
Обеспечение идемпотентности
- Идемпотентный продюсер:
enable.idempotence=true
,acks=all
- Использование транзакций —
transactional.id
- Дедупликация на стороне потребителя — уникальные ID сообщений + Redis или БД.
Сравнение алгоритмов компрессии
Характеристика | GZIP | Snappy | LZ4 | ZSTD |
---|---|---|---|---|
Степень сжатия | Высокая | Низкая | Средняя | Высокая |
Скорость сжатия | Низкая | Высокая | Очень высокая | Средняя |
Скорость распаковки | Низкая | Высокая | Очень высокая | Высокая |
Использование CPU | Высокое | Низкое | Очень низкое | Среднее |
Пропускная способность | Низкая | Высокая | Очень высокая | Высокая |
Параметр конфигурации | compression.type=gzip | compression.type=snappy | compression.type=lz4 | compression.type=zstd |
Оптимально для | Максимальной экономии места при некритичной производительности | Баланса между производительностью и сжатием | Высокопроизводительных систем с большим объемом данных | Хорошего сжатия при приемлемой производительности |
Коэффициент сжатия (типичный) | 2.7x - 3.5x | 1.5x - 1.7x | 1.8x - 2.1x | 2.5x - 3.0x |
Поддержка в Kafka | С ранних версий | С ранних версий | С версии 0.10.0.0 | С версии 2.1.0 |
Зависимости | Встроена в JDK | Требует нативную библиотеку | Требует нативную библиотеку | Требует нативную библиотеку |
Совместимость с клиентами | Высокая | Высокая | Высокая | Может потребоваться обновление для старых клиентов |
Примечание: Фактические показатели могут варьироваться в зависимости от характера данных, конфигурации оборудования и других факторов. В Kafka 2.8 все перечисленные алгоритмы сжатия полностью поддерживаются.
Протоколы безопасности подключения к Kafka
В Apache Kafka 2.8 доступны несколько протоколов безопасности для защиты подключений к кластеру. Рассмотрим основные из них:
1. SSL/TLS (Secure Sockets Layer / Transport Layer Security)
- Описание: SSL/TLS используется для шифрования данных, передаваемых между клиентами и брокерами Kafka, а также между самими брокерами. Это обеспечивает конфиденциальность и целостность данных.
- Использование:
- Шифрование трафика между клиентами и брокерами.
- Аутентификация клиентов с использованием сертификатов.
- Преимущества:
- Высокий уровень безопасности.
- Поддержка аутентификации на основе сертификатов.
- Недостатки:
- Сложность настройки и управления сертификатами.
- Некоторое снижение производительности из-за шифрования.
2. SASL (Simple Authentication and Security Layer)
- Описание: SASL предоставляет механизмы аутентификации для клиентов и брокеров Kafka. Поддерживает несколько механизмов, таких как PLAIN, SCRAM, GSSAPI (Kerberos).
- Механизмы SASL:
- SASL/PLAIN: Простой механизм аутентификации с использованием имени пользователя и пароля. Не рекомендуется для использования в производственных средах без дополнительного шифрования (например, TLS).
- SASL/SCRAM: Более безопасный механизм, использующий хэширование паролей и защиту от атак повторного воспроизведения. Подходит для производственных сред.
- SASL/GSSAPI (Kerberos): Использует протокол Kerberos для аутентификации. Обеспечивает высокий уровень безопасности и подходит для корпоративных сред.
- Преимущества:
- Гибкость в выборе механизмов аутентификации.
- Поддержка Kerberos для интеграции с корпоративными системами.
- Недостатки:
- Сложность настройки, особенно для Kerberos.
- SASL/PLAIN уязвим без дополнительного шифрования.
3. SASL_SSL
- Описание: Комбинация SASL и SSL/TLS, которая обеспечивает как аутентификацию, так и шифрование данных.
- Использование:
- Аутентификация клиентов с использованием SASL.
- Шифрование трафика с помощью SSL/TLS.
- Преимущества:
- Высокий уровень безопасности за счет комбинации аутентификации и шифрования.
- Подходит для производственных сред.
- Недостатки:
- Сложность настройки и управления сертификатами и механизмами SASL.
4. PLAINTEXT
- Описание: Отсутствие шифрования и аутентификации. Данные передаются в открытом виде.
- Использование:
- Только для тестовых сред или внутренних сетей с низкими требованиями к безопасности.
- Преимущества:
- Простота настройки.
- Недостатки:
- Отсутствие безопасности. Не подходит для производственных сред.
Сравнение протоколов безопасности
Протокол | Шифрование | Аутентификация | Сложность настройки | Рекомендации по использованию |
---|---|---|---|---|
SSL/TLS | Да | Сертификаты | Высокая | Для шифрования трафика и аутентификации на основе сертификатов. |
SASL/PLAIN | Нет | Имя/пароль | Низкая | Только для тестовых сред с дополнительным шифрованием. |
SASL/SCRAM | Нет | Имя/пароль | Средняя | Для производственных сред с аутентификацией. |
SASL/GSSAPI | Нет | Kerberos | Высокая | Для корпоративных сред с интеграцией Kerberos. |
SASL_SSL | Да | SASL | Высокая | Для производственных сред с аутентификацией и шифрованием. |
PLAINTEXT | Нет | Нет | Низкая | Только для тестовых сред. |