Отслеживание Command Processing End-to-End Latency с помощью заголовков сообщений Kafka¶
1. Цели¶
Обычно с клиентами заключаются SLA-контракты на время обработки Системой команд, поступающих во входящие командные топики Kafka. Они же не могут ждать бесконечно и хотят какой-то определённости!:-)
Отсюда необходимость иметь какой-то инструмент для отслеживания Command Processing End-to-End Latency. Могла бы помочь система трассировки (APM, Jaeger, Ключ-Астром, etc), но её может не быть в моменте или интеграция долгая/сложная. К тому же, системы трассировки обычно семплируют данные и дают только верхнеуровневую картинку.
Они не смогут помочь, когда требуется гранулярность до уровня конкретного сообщения, а не "статистика". А ещё нужно переключаться между разными системами: логи → трассировка → обратно логи. В этом месте можно представить себе "аварийку" в 2 часа ночи или задачу по разработке PostMortem.
Цель
Для выполнения SLO необходимо с помощью заголовков сообщений Kafka отслеживать Command Processing End-to-End Latency: от момента отправки сообщения клиентом во входящий командный топик до получения результата в исходящем топике событий.
2. Ограничения¶
- В распредёленных системах (построенные по микросервисной архитектуре как раз такие), есть фундаментальные проблемы со временем. См. Time Matters. Используя заголовки сообщений Kafka для отслеживания времени следует знать и принимать известные ограничения.
- Ресурсы команды ограничены.
3. Задачи¶
Для достижения цели с учётом ограничений, можно подумать о решении следующих задач по принципу "сначала поставим ёлочку, а уж потом развешаем игрушки":
-
"Елочка" — первостепенные:
-
Обеспечить отслеживание разницы времени между (одна цифра для быстрого отслеживания SLO):
- временем создания клиентом инициирующего сообщения, публикуемого им во входящий командный топик и;
- временем создания системой результирующего сообщения, публикуемого ей в исходящий топик событий.
Service Boundary
Подразумевается, что Система не отвечает за то, с какой задержкой клиент вычитывает сообщения. Это его обязанность — делать это постоянно и с минимальной задержкой.
Service Boundary проходит по Producer в исходящем топике событий.
-
-
"Игрушки" — второстепенные:
- Обеспечить отслеживание времени "отлеживания" в топиках (lag) накопительным итогом. Так можно будет оценить, какие consumer не справляются с нагрузкой, чтобы выполнить их доумощнение/оптимизацию.
- Обеспечить отслеживание времени обработки в микросервисах накопительным итогом. Так можно будет понять "вклад" compute каждого хопа во всей цепочке, чтобы оптимизировать узкие места.
- Обеспечить отслеживание длины цепочки (числа хопов) с ограничением их числа. Это поможет митигировать риск возникновения бесконечных циклов обработки сообщений. К тому же, счетчик хопов является простой и дешевой метрикой, которую каждый микросервис может экспортировать в Prometheus. Это позволяет строить в Grafana дашборды, показывающие распределение длин цепочек обработки для разных типов команд. Рост среднего или максимального числа хопов со временем является явным сигналом об усложнении архитектуры или появлении "узких мест", который сложно получить другими способами без написания сложных запросов к данным Jaeger или ELK.
4. Описание подхода¶
Подход должен быть простым и "защищённым" от перекосов времени и мульти‑хопов, поэтому:
- Не хранить подробную "историю" по каждому микросервису цепочки в заголовках. Это раздувает сообщения и усложняет поддержку.
- Использовать единую схему "накопительных счётчиков" в миллисекундах, которые обновляются каждым сервисом на каждом
хопе:
- Суммарное время обработки по цепочке (processing).
- Суммарное время ожидания в очередях Kafka между хопами (wait).
- Итоговая метрика total = processing + wait.
- Для вычисления Command Processing End-to-End Latency события, от команды нужны два источника:
chain-origin-created-at
(метка времени исходной команды);- накопительные счётчики из события (или контрольная разница
event.created-at
−chain-origin-created-at
).
- Для локальной диагностики на "последнем хопе" добавляем только длительность текущего этапа (
chain-stage-duration-ms
). Идентификатор сервиса берём из уже существующегоsource-service-id
. - Для алертов и SLO достаточно контролировать p95/p99 и max@chain-total-ms на исходящих событиях.
5. Принципы измерения и устойчивости¶
- Clock-skew и доверие к внешним часам:
- Времена обработки на каждом хопе измеряем локально как дельты в миллисекундах (монотонные часы1), и только сумму дельт переносим в заголовки → это устойчиво к рассинхронизации часов между сервисами.
- Ожидание в очереди считаем как wall‑clock1 разницу:
stage_start_wall_time
−consumed_message.created-at
. Это отражает фактическую задержку в Kafka и паузы потребителей.
- Источник "начала цепочки"
chain-origin-created-at
переносим сквозь всю цепочку. Если клиент не заполнил — устанавливаем в первом внутреннем потребителе изconsumed.created-at
. - Итоговая проверка:
- Для события:
chain-total-ms
должен практически совпадать с (chain-processing-ms
+chain-wait-ms
), допустимы небольшие расхождения из‑за точности часов.
- Для события:
- Fan-out/fan-in:
- При разветвлении (fan-out) дети наследуют
chain-origin-created-at
и накопительные счётчики родителя. - При слиянии (fan-in) берём:
chain-origin-created-at
= минимальное среди родителей;chain-processing-ms
= максимум среди родителей + собственная обработка;chain-wait-ms
= максимум среди родителей + собственное ожидание;
- Это не занижает
total
и остаётся консервативной (верхней) оценкой.
- При разветвлении (fan-out) дети наследуют
6. Состав и правила¶
Note
Здесь и далее подразумевается, что к каждому заголовку добавлятся префикс x-1234-abc-
, где 1234-abc
— это
идентификатор Системы.
Поддержка int языками программирования Golang, Java, Python
Язык | int8 (max) | int16 (max) | int32 (max) | int64 (max) | Поддержка типов |
---|---|---|---|---|---|
Golang | 127 | 32767 | 2147483647 | 9223372036854775807 | Полная |
Java | 127 | 32767 | 2147483647 | 9223372036854775807 | Полная |
Python | - | - | - | - | Нет (только int) |
1. "Ёлочка": заголовки для решения первостепенных задач¶
Для отслеживания Command Processing End-to-End Latency всей цепочки используется chain-total-ms
. Значение получаем
через расчёт временных меток (created-at − chain-origin-created-at)
(1).
- Помним и принимаем фундаментальные ограничения времени в распределенных системах. См. Time Matters.
Имя заголовка | Тип | Правило заполнения / семантика | Обязат. для public.{commands/events} | Обязат. для internal.{commands/events} |
---|---|---|---|---|
chain-origin-created-at |
UTC в формате RFC 3339 с миллисекундами, суффикс Z. Пример: 2025-09-24T12:34:56.789Z . |
Временная метка "начала цепочки". Для внешней команды — равно её created-at ; для любых производных сообщений копируется без изменений.Если отсутствует у входящей команды — установить при первом потреблении = consumed.created-at . |
рекомендован для входящих команд да для производных исходящих событий |
да для всех производных сообщений |
chain-total-ms |
int64 (миллисекунды) | Должен равняться (created-at − chain-origin-created-at) . Пересчитывается продюсером на каждом хопе перед отправкой. |
да для производных исходящих событий | да для всех производных сообщений |
Примечания:
- Заголовок
chain-origin-created-at
должен копироваться каждым микросервисом в цепочке при публикации нового сообщения. - Формат меток времени: согласно Time Matters: Выбор формата меток времени для заголовков Kafka.
2. "Игрушки": заголовки для решения второстепенных задач¶
Заголовки для отслеживания времени "отлеживания" в топиках и времени компьюта в микросервисах:
Имя заголовка | Тип | Правило заполнения / семантика | Обязат. для public.{commands/events} | Обязат. для internal.{commands/events} |
---|---|---|---|---|
chain-processing-ms |
int64 (миллисекунды) | Сумма обработок по всем пройденным хопам. Каждый сервис добавляет (t_end_monotonic − t_start_monotonic) .Если заголовка нет — считать 0. |
нет | да для всех производных сообщений |
chain-wait-ms |
int64 (миллисекунды) | Сумма ожиданий в очередях Kafka между хопами. На каждом хопе добавить max(0, now_wall − consumed.created-at) .Если заголовка нет — считать 0. |
нет | да для всех производных сообщений |
chain-stage-duration-ms |
int64 (миллисекунды) | Длительность обработки текущим сервисом: (t_end_monotonic − t_start_monotonic) .Для определения "кто обрабатывал" используется source-service-id . |
нет | да для всех производных сообщений |
Примечания:
chain-*
заголовки должны копироваться и обновляться каждым микросервисом при публикации нового сообщения.- Формат меток времени: согласно Time Matters: Выбор формата меток времени для заголовков Kafka.
Заголовки для отслеживания длины цепочки команд/событий и разрыва петель:
Имя заголовка | Тип | Правило заполнения / семантика | Обязат. для public.{commands/events} | Обязат. для internal.{commands/events} |
---|---|---|---|---|
chain-hop-count |
int8 | Текущее число хопов в цепочке. Если отсутствует у входящего сообщения — установить при первом потреблении = 1. На каждом хопе увеличивать на 1. Retry != хоп. |
нет | да для всех сообщений |
chain-ttl-hops |
int8 | Верхняя граница допустимых хопов (например, 20). На каждом хопе: декремент на 1 + проверка на исчерпание для защиты от петель. При достижении нуля — отправка в DLQ с меткой причины loop_protection . |
нет | да для всех сообщений |
4. Алгоритм обновления заголовков¶
Обновляют каждый микросервис, каждый хоп (retry != хоп).
1. "Ёлочка": для решения первостепенных задач¶
- На входе (consume):
- Считать
consumed.created-at
. - Считать
chain-origin-created-at
; если отсутствует —chain-origin-created-at
:=consumed.created-at
.
- Считать
- Обработка: выполнить бизнес-логику.
- На выходе (produce):
- Создать/обновить переменные:
new_created-at
:= текущее wall‑время.new_chain_total
:=new_created-at
−chain-origin-created-at
.
- Записать заголовки:
chain-origin-created-at
:=chain-origin-created-at
(pass-through).created-at
:=new_created-at
.chain-total-ms
: =new_chain_total
.
- Создать/обновить переменные:
2. "Игрушки": для решения второстепенных задач¶
Дополнительно к "ёлочке":
- На входе (consume):
chain-processing-ms
:= заголовок или 0.chain-wait-ms
:= заголовок или 0.chain-hop-count
:= заголовок или 1.chain-ttl-hops
:= заголовок или 19.now_wall
:= текущее wall‑время;t_start_mono
:= текущее монотонное время.wait_delta
:= max(0,now_wall
−consumed.created-at
) в мс.
- Проверка на петли и шторм ретраев:
- Если
chain-ttl-hops
== 0, то отправка в DLQ с меткой причиныloop_protection
.
- Если
- Обработка: выполнить бизнес-логику.
- На выходе (produce):
- Создать/обновить переменные:
t_end_mono
:= текущее монотонное время.stage_delta
:= (t_end_mono
−t_start_mono
) в мс.new_chain_processing
:=chain-processing-ms
+stage_delta
.new_chain_wait
:=chain-wait-ms
+wait_delta
.
- Записать заголовки:
chain-processing-ms
:=new_chain_processing
.chain-wait-ms
:=new_chain_wait
.chain-stage-duration-ms
:=stage_delta
.chain-hop-count
:= инкремент на 1.chain-ttl-hops
: = декремент на 1.
- Создать/обновить переменные:
5. Пример цепочки "входящая команда → обработка → исходящее событие"¶
Пример для "ёлочки":
Хоп | Заголовок | Значение |
---|---|---|
1. Команда от клиента клиент → публикует public.command в командный топик Kafka |
created-at |
2025-09-17T10:00:00.000Z |
2. Микросервис-обработчик A (ожидание в топике 120 мс; обработка 80 мс) → публикует intenral.{event/command} |
created-at |
2025-09-17T10:00:00.200Z |
chain-origin-created-at |
2025-09-17T10:00:00.000Z | |
chain-total-ms |
200 | |
3. Микросервис-обработчик B (ожидание в топике 60 мс; обработка 40 мс) → публикует intenral.{event/command} |
created-at |
2025-09-17T10:00:00.300Z |
chain-origin-created-at |
2025-09-17T10:00:00.000Z | |
chain-total-ms |
300 | |
4. Микросервис-обработчик С (ожидание в топике 40 мс; обработка 60 мс) → публикует public.event |
created-at |
2025-09-17T10:00:00.400Z |
chain-origin-created-at |
2025-09-17T10:00:00.000Z | |
chain-total-ms |
400 |
Пример для "игрушек":
Хоп | Заголовок | Значение |
---|---|---|
1. Команда от клиента клиент → публикует public.command в командный топик Kafka |
created-at |
2025-09-17T10:00:00.000Z |
2. Микросервис-обработчик A (ожидание в топике 120 мс; обработка 80 мс) → публикует intenral.{event/command} |
created-at |
2025-09-17T10:00:00.200Z |
chain-origin-created-at |
2025-09-17T10:00:00.000Z | |
chain-stage-duration-ms |
80 | |
chain-processing-ms |
80 | |
chain-wait-ms |
120 | |
chain-total-ms |
200 | |
chain-hop-count |
2 | |
chain-ttl-hops |
18 | |
3. Микросервис-обработчик B (ожидание в топике 60 мс; обработка 40 мс) → публикует internal.{event/command} |
created-at |
2025-09-17T10:00:00.300Z |
chain-origin-created-at |
2025-09-17T10:00:00.000Z | |
chain-stage-duration-ms |
40 | |
chain-processing-ms |
120 | |
chain-wait-ms |
180 | |
chain-total-ms |
300 | |
chain-hop-count |
3 | |
chain-ttl-hops |
17 | |
4. Микросервис-обработчик С (ожидание в топике 40 мс; обработка 60 мс) → публикует public.event |
created-at |
2025-09-17T10:00:00.400Z |
chain-origin-created-at |
2025-09-17T10:00:00.000Z | |
chain-total-ms |
400 |
В обоих случаях цепочка завершается чтением сообщения клиентом.
Для расчёта Command Processing End-to-End Latency всей цепочки клиент прибавляет к chain-total-ms
время,
потребовавшееся ему для вычитки сообщения из топика: (текущее wall‑время - chain-origin-created-at
).
Для расчёта SLO по времени обработки команды Системой вычисляется разница (created-at
- chain-origin-created-at
).
В качестве SLI и источника для алертов используется chain-total-ms
.
Во втором случае внутри Системы в заголовках Kafka передается дополнительная информация и осуществляется предотвращение возникновения петель.
Описанный выше подход решает все задачи для достижения поставленной цели.
6. Что ещё можно сделать¶
Можно в chain-заголовках хранить не последние/накопительные значения, а историю по каждому хопу между микросервисами с соблюдением порядка:
Заголовок | Значение |
---|---|
chain-processing-ms |
[50, 100, 150, 200] |
chain-wait-ms |
[100, 200, 300, 400] |
chain-total-ms |
[150, 450, 900, 1500] |
В этом случае:
- Заголовок
chain-stage-duration-ms
не потребуется, т.к. время компьюта на последнем хопе будет записано последним значением в массивchain-processing-ms
. - Заголовок
chain-hop-count
не потребуется, т.к. всегда можно вычислить длину массива вchain-total-ms
. chain-total-ms
будет вычисляться как суммаchain-processing-ms
иchain-wait-ms
. Для определения Command Processing End-to-End Latency берётся последнее значение из массива.
Для полной картины может потребоваться добавление ещё 2-х заголовков для отслеживания микросервисов в цепочке:
Заголовок | Значение |
---|---|
chain-source-system-id |
["1234-abc", "1234-abc", "1234-abc", "1234-abc" |
chain-source-service-id |
["microservice-a", "microservice-b", "microservice-c", "microservice-d"] |
К плюсам данного подхода можно отнести лучшую наблюдаемость: сразу видно всю историю и вклад в latency каждого хопа.
К минусам:
- Разрастание размера заголовков сообщений Kafka. Заголовки являются частью общего сообщения, поэтому они
подчиняются ограничению
message.max.bytes
(брокер; по умолчанию 1 МБ) иmax.request.size
(продюсер). Превысили размер — получили ошибку producer. Разработчикам придётся знать и помнить об этом. - Дополнительные трудозатраты на реализацию.
Что дальше?