Перейти к содержанию

Интеграция систем с помощью Apache Kafka

Настройки брокера

Kafka 3.8 Documentation — 3. Configuration.

Настройка Значение по умолчанию Моё значение Комментарий
auto.create.topics.enable true false Отключить автоматическое создание топиков.
Управлять явно.
delete.topic.enable true false в PROD средах В production-средах рекомендуется запрещать удаление топиков.
Если потребуется, можно временно изменить его значение, перезапустить брокеры, удалить топик, а затем вернуть значение обратно.
default.replication.factor 1 min.insync.replicas + 2 Позволяет разрешить одно запланированное отключение в наборе реплик и одно незапланированное.
log.retention.hours 168 часов или 1 неделя. Рассчитывать. Для изменения рекомендуется использовать более приоритетные опции: log.retention.minutes и log.retention.ms.
log.retention.bytes -1 (без ограничений) Рассчитывать. Значение указывается из расчёта на раздел, а не на топик.
log.segment.bytes 1073741824 (1 gibibyte) Рассчитывать, особенно для топиков с низкой частотой генерации сообщений. Kafka удаляет данные только на уровне сегментов. Если сегмент не заполняется до log.segment.bytes, он не будет удален, пока не истечет время хранения.
Для топиков с низкой частотой сообщений можно уменьшить log.segment.bytes, чтобы сегменты удалялись чаще, даже если они не заполнены.
log.roll.hours 168 часов или 1 неделя. Рассчитывать. Для изменения рекомендуется использовать более приоритетную опцию log.roll.ms.
min.insync.replicas 1 A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all" When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
max.message.bytes 1048588 после компрессии, если она включена Не менять. При изменении иметь в виду, что эта опция должна быть согласована с другими. Проверять документацию.

Producer

ProduceRecordProducer.

Отправка возможна синхронная и асинхронная. Метод producer.send() возвращает объект RecordMetadata с информацией о топике, разделе и смещении записанного сообщения и другими метаданными. Обычно, эта информация не нужна Producer.

При асинхронной отправке хочется получать от брокера информацию, удалось ли вообще отправить сообщение. Это позволит сгенерировать исключение, записать сообщение в лог, записать сообщение в файл ошибок для последующего анализа.

Для этого Producer поддерживает добавление Callback при отправке записи: класс org.apache.kafka.clients.producer. Callback и метод onCompletion().

Note

Не рекомендуется выполнять блокирующие операции внутри callback. Вместо этого следует использовать другой поток для одновременного выполнения блокирующей операции.

Настройка

  1. bootstrap.servers — необязательно указывать все брокеры, можно указать 2.
  2. key.serializer:
    1. имя класса, реализующего интерфейс org.apache.kafka.common.serialization.Serializer;
    2. используется для сериализации объекта ключа в байтовый массив;
    3. если не будет ключей, то указать тип Void для ключа и VoidSerializer.
  3. value.serializer — аналогично key.serializer.
  4. client.id:
    1. логический идентификатор клиента и приложения, в котором он используется;
    2. используется брокерами для идентификации сообщений;
    3. строка, указывать что-то осмысленное;
    4. будет фигурировать в логах и метриках, а также для определения квот.
  5. acks:
    1. сколько реплик разделов должны получить запись, чтобы она считалась успешной;
    2. по умолчанию требуется только лидера реплики;
    3. acks=0 — не ждёт ответа от брокера;
    4. acks=1 — ждёт ответа от лидера реплики;
    5. acks=all — ждёт ответа от всех ISR.

Время доставки сообщения

Сколько времени потребуется, пока вызов функции send() не завершится успешно или с ошибкой. Это время, которое мы готовы потратить, пока Kafka не ответит успешно или пока мы не будем готовы сдаться и не признаем поражение.

Настройка Значение по умолчанию Моё значение Комментарий
max.block.ms 60000 (1 минута) Увеличиывем при проблемах в сети, уменьшаем, когда нужна более быстрая реакция на проблемы.
delivery.timeout.ms 120000 (2 минуты) Значение должно быть >= linger.ms + retry.backoff.ms + request.timeout.ms.
request.timeout.ms 30000 (30 секунд) Увеличиывем при проблемах в сети, уменьшаем, когда нужна более быстрая реакция на проблемы.
retry.backoff.ms 100 Это начальные интервал между retry. Увеличивается экспоненциально до retry.backoff.max.ms.
linger.ms 0 Задержка перед отправкой пакета. Увеличивает пропускную способность сети и эффективность сжатия.
buffer.memory 33554432 (32 МБ) Увеличивать имеет смысл, только если producer генерирует большое количество сообщений за короткий промежуток времени, и вы хотите избежать блокировок или ошибок.
compression.type none lz4, если важна скорость и мало CPU-ресурсов, иначе gzip Потребители (consumers) автоматически распаковывают данные, поэтому настройка сжатия на стороне продюсера не требует изменений на стороне потребителя.
batch.size 16384 (16 КБ) Увеличивать имеет смысл, если вы отправляете большие объёмы данных и хотите максимизировать пропускную способность.
Уменьшение может привести к увеличению количества запросов к брокеру и снижению производительности.
Увеличение может увеличить задержку и потребление памяти на стороне продюсера и если задержка не является критичным фактором.
max.in.flight.requests.per.connection 5 Для обеспечения гарантии упорядоченности необходимо либо retries=0, либо enable.idempotence=true.
max.request.size 1048576 (1 МБ) Делать одинаковым с настройкой брокеров message.max.bytes и косньюмеров fetch.max.bytes.
receive.buffer.bytes 32768 (32 kibibytes)
Имеет смысл думать об увеличении, когда consumer/producer взаимодействует с брокерами из другого ЦОД, когда latency больше и throughput ниже.
send.buffer.bytes 131072 (128 kibibytes) Аналогично receive.buffer.bytes.
enable.idempotence false true Producer будет прикреплять порядковый номер к каждой отправляемой записи. Брокер будет проверять их на дубли.
retries 2147483647 Предпочтительно не изменять и оперировать delivery.timeout.ms.

Исключения

В классе KafkaProducer 2 типа исключений:

  1. Те, которые можно исправить повторной попыткой отправки сообщения — retriable. Например, ошибки соединения, ошибка «отсутствует ведущий узел для раздела». Можно настроить KafkaProducer так, чтобы при таких ошибках отправка повторялась автоматически (с ограничением max числа попыток).
  2. Те, которые невозможно исправить повторной отправкой сообщения. Например, «сообщение слишком велико».

Возможные исключения на стороне Producer:

  1. SerializationException — неудачная сериализация сообщения.
  2. BufferExhaustedException или TimeoutException — при переполнении буфера.
  3. InterruptException — при сбое отправляющего потока.
  4. DuplicateSequenceException — безобидное исключение при enable.idempotence=true и получении брокером дубликата сообщения.
  5. TimeoutException — время ожидания превышает max.block.ms.
  6. InvalidRecordException — когда сообщение не соответствует ожидаемому формату или содержит некорректные данные.

Note

В целом, поскольку producer обрабатывает повторные попытки сам, нет смысла обрабатывать их в рамках логики приложения. Вместо этого, необходимо сосредоточить усилия на обработке ошибок, не допускающих повторных попыток, или случаев, когда повторные попытки были исчерпаны.

Сериализаторы

По умолчанию — StringSerializer.

Есть ещё Apache Avro, Thrift, Protobuf.

Схемы Apache Avro предпочтительно хранить в Schema Registry. Если его нет, то можно создать файл схемы в формате JSON и сохранить его в файле .avsc. Далее, этот файл использовать со стороны Producer и Consumer для сериализации/десериализации.

Партиции

Partitioners:

  1. DefaultPartitioner;
  2. RoundRobinPartitioner;
  3. UniformStickyPartitioner.

Kafka распределяет сообщения по партициям внутри топика. Ключ сообщения используется для определения, в какую партицию оно будет отправлено. По умолчанию Kafka использует хэш ключа для выбора партиции. Это гарантирует, что все сообщения с одинаковым ключом попадут в одну и ту же партицию, что важно для обеспечения порядка обработки сообщений.

Если порядок сообщений не важен или если все сообщения независимы, можно отправлять их без ключа. В этом случае Kafka будет распределять сообщения по партициям случайным образом (батчами).

Рекомендуется заранее определять количество партиций и никогда не добавлять новые.

Заголовки

Заголовки сообщений в Apache Kafka — это метаданные, которые можно добавить к каждому сообщению. Они представляют собой пары ключ-значение и могут использоваться для передачи дополнительной информации, которая не является частью основного тела сообщения. Вот несколько рекомендаций, что можно записывать в заголовки на стороне Producer:

  1. Идентификаторы и метаданные
    • Trace ID / Correlation ID: Для трассировки сообщений в распределенных системах. Это помогает отслеживать цепочку событий.
    • Message ID: Уникальный идентификатор сообщения для отслеживания и логирования.
    • Source Application: Название приложения или сервиса, отправившего сообщение.
    • Timestamp: Временная метка создания сообщения (если она не дублирует timestamp Kafka).
  2. Информация о версии
    • Schema Version: Версия схемы данных (например, если используется Avro или Protobuf).
    • API Version: Версия API, используемого для создания сообщения.
  3. Маршрутизация и фильтрация
    • Routing Key: Ключ для маршрутизации сообщения в определенный топик или партицию.
    • Tags / Labels: Метки для фильтрации или категоризации сообщений (например, "high-priority", "test", "production").
  4. Безопасность
    • Authentication Token: Токен для аутентификации или авторизации.
    • Encryption Key ID: Идентификатор ключа шифрования, если сообщение зашифровано.
  5. Бизнес-логика
    • User ID: Идентификатор пользователя, связанного с сообщением.
    • Transaction ID: Идентификатор транзакции, если сообщение является частью бизнес-процесса.
    • Event Type: Тип события (например, "order_created", "user_updated").
  6. Технические метаданные
    • Compression Type: Тип сжатия данных (если используется).
    • Content-Type: Тип содержимого (например, "application/json", "avro/binary").
  7. Отладка и мониторинг
    • Debug Flag: Флаг для включения дополнительной отладочной информации.
    • Environment: Среда, в которой было создано сообщение (например, "dev", "staging", "prod").

Квоты и регулирование запросов

Следует проектировать и настраивать. Настройка через конфиг требует рестарта брокеров. Динамическая настройка — нет. См. детали в Quota Configuration.

Прочее

  1. Топики:
    1. для запросов
    2. для ответов
    3. названия
  2. Партиции
    1. Если >1 consumer, то нужно добавлять партиции.
    2. Сообщения упорядочены только в рамках 1 партиции.
  3. Размер сегмента
  4. TTL сообщения
  5. replication factor
  6. Формат сераилизации
  7. Распределение Leaders по Brokers
  8. Message
    1. Key
    2. Value
    3. Timestamp
    4. Headers
    5. Сжатие payload
  9. Настройка min.insync.replicas
  10. Producer acks
    1. 1
  11. Семантика - at-least-once

Обеспечение идемпотентности

  1. Идемпотентный продюсер: enable.idempotence=true, acks=all
  2. Использование транзакций — transactional.id
  3. Дедупликация на стороне потребителя — уникальные ID сообщений + Redis или БД.

Сравнение алгоритмов компрессии

Характеристика GZIP Snappy LZ4 ZSTD
Степень сжатия Высокая Низкая Средняя Высокая
Скорость сжатия Низкая Высокая Очень высокая Средняя
Скорость распаковки Низкая Высокая Очень высокая Высокая
Использование CPU Высокое Низкое Очень низкое Среднее
Пропускная способность Низкая Высокая Очень высокая Высокая
Параметр конфигурации compression.type=gzip compression.type=snappy compression.type=lz4 compression.type=zstd
Оптимально для Максимальной экономии места при некритичной производительности Баланса между производительностью и сжатием Высокопроизводительных систем с большим объемом данных Хорошего сжатия при приемлемой производительности
Коэффициент сжатия (типичный) 2.7x - 3.5x 1.5x - 1.7x 1.8x - 2.1x 2.5x - 3.0x
Поддержка в Kafka С ранних версий С ранних версий С версии 0.10.0.0 С версии 2.1.0
Зависимости Встроена в JDK Требует нативную библиотеку Требует нативную библиотеку Требует нативную библиотеку
Совместимость с клиентами Высокая Высокая Высокая Может потребоваться обновление для старых клиентов

Примечание: Фактические показатели могут варьироваться в зависимости от характера данных, конфигурации оборудования и других факторов. В Kafka 2.8 все перечисленные алгоритмы сжатия полностью поддерживаются.

Протоколы безопасности подключения к Kafka

В Apache Kafka 2.8 доступны несколько протоколов безопасности для защиты подключений к кластеру. Рассмотрим основные из них:

1. SSL/TLS (Secure Sockets Layer / Transport Layer Security)

  • Описание: SSL/TLS используется для шифрования данных, передаваемых между клиентами и брокерами Kafka, а также между самими брокерами. Это обеспечивает конфиденциальность и целостность данных.
  • Использование:
    • Шифрование трафика между клиентами и брокерами.
    • Аутентификация клиентов с использованием сертификатов.
  • Преимущества:
    • Высокий уровень безопасности.
    • Поддержка аутентификации на основе сертификатов.
  • Недостатки:
    • Сложность настройки и управления сертификатами.
    • Некоторое снижение производительности из-за шифрования.

2. SASL (Simple Authentication and Security Layer)

  • Описание: SASL предоставляет механизмы аутентификации для клиентов и брокеров Kafka. Поддерживает несколько механизмов, таких как PLAIN, SCRAM, GSSAPI (Kerberos).
  • Механизмы SASL:
    • SASL/PLAIN: Простой механизм аутентификации с использованием имени пользователя и пароля. Не рекомендуется для использования в производственных средах без дополнительного шифрования (например, TLS).
    • SASL/SCRAM: Более безопасный механизм, использующий хэширование паролей и защиту от атак повторного воспроизведения. Подходит для производственных сред.
    • SASL/GSSAPI (Kerberos): Использует протокол Kerberos для аутентификации. Обеспечивает высокий уровень безопасности и подходит для корпоративных сред.
  • Преимущества:
    • Гибкость в выборе механизмов аутентификации.
    • Поддержка Kerberos для интеграции с корпоративными системами.
  • Недостатки:
    • Сложность настройки, особенно для Kerberos.
    • SASL/PLAIN уязвим без дополнительного шифрования.

3. SASL_SSL

  • Описание: Комбинация SASL и SSL/TLS, которая обеспечивает как аутентификацию, так и шифрование данных.
  • Использование:
    • Аутентификация клиентов с использованием SASL.
    • Шифрование трафика с помощью SSL/TLS.
  • Преимущества:
    • Высокий уровень безопасности за счет комбинации аутентификации и шифрования.
    • Подходит для производственных сред.
  • Недостатки:
    • Сложность настройки и управления сертификатами и механизмами SASL.

4. PLAINTEXT

  • Описание: Отсутствие шифрования и аутентификации. Данные передаются в открытом виде.
  • Использование:
    • Только для тестовых сред или внутренних сетей с низкими требованиями к безопасности.
  • Преимущества:
    • Простота настройки.
  • Недостатки:
    • Отсутствие безопасности. Не подходит для производственных сред.

Сравнение протоколов безопасности

Протокол Шифрование Аутентификация Сложность настройки Рекомендации по использованию
SSL/TLS Да Сертификаты Высокая Для шифрования трафика и аутентификации на основе сертификатов.
SASL/PLAIN Нет Имя/пароль Низкая Только для тестовых сред с дополнительным шифрованием.
SASL/SCRAM Нет Имя/пароль Средняя Для производственных сред с аутентификацией.
SASL/GSSAPI Нет Kerberos Высокая Для корпоративных сред с интеграцией Kerberos.
SASL_SSL Да SASL Высокая Для производственных сред с аутентификацией и шифрованием.
PLAINTEXT Нет Нет Низкая Только для тестовых сред.

Что дальше?

  1. Нашли эту статью полезной? Поделитесь ею и помогите распространить знания!
  2. Нашли ошибку или есть идеи 💡 о том, что и как я могу улучшить? Напишите мне в Telegram.
  3. Хотите узнать обо мне больше? Читайте здесь.