diff --git a/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml b/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml index c6495b91f2db..22d1cc172944 100644 --- a/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml +++ b/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml @@ -2,3 +2,4 @@ items: - { name: Обзор, href: index.md } - { name: Форматы данных, href: formats.md } - { name: Чекпойнты, href: checkpoints.md } +- { name: Водяные знаки, href: watermarks.md } diff --git a/ydb/docs/ru/core/concepts/streaming_query/watermarks.md b/ydb/docs/ru/core/concepts/streaming_query/watermarks.md new file mode 100644 index 000000000000..54dcc94e7c0f --- /dev/null +++ b/ydb/docs/ru/core/concepts/streaming_query/watermarks.md @@ -0,0 +1,9 @@ +# Водяные знаки + +Каждое [событие](../datamodel/topic.md#message) в системе потоковой обработки данных имеет ассоциированную с ним временную метку. Эта метка может равняться времени чтения события из [топика](../datamodel/topic.md), может быть получена из данных внутри события или из метаданных [топика](../datamodel/topic.md). + +Поверх этого времени события можно делать сортировку на потоке (внутри [MATCH_RECOGNIZE](../../yql/reference/syntax/select/match_recognize.md#order_by)) или агрегацию на временнОм окне ([GROUP BY HoppingWindow](../../yql/reference/syntax/select/group-by.md#hopping_window)). Эти потоковые операции должны знать текущее время, чтобы на основе этой информации генерировать выходные данные и делать это в режиме реального времени. Времени, получаемого из события, не всегда достаточно, так как события могут приходить нерегулярно или отфильтровываться на более ранних этапах. + +Для решения этой проблемы нужен водяной знак. Это наибольшая временная метка, которая гарантированно находится в прошлом для каждой партиции. + +В условиях распределенных систем, когда часы на разных устройствах могут дрейфовать, а данные - задерживаться из-за проблем с сетью, водяной знак не может полагаться на монотонное возрастание времени событий даже в рамках одной партиции. Для этого в формулу расчета водяного знака закладывается отставание. Сейчас водяной знак считается как время записи события в топик, уменьшенное на 5 секунд. diff --git a/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md b/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md index 8411f4a40e62..f19f9dfbbd1b 100644 --- a/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md +++ b/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md @@ -269,7 +269,7 @@ LIMIT 3; {% endif %} - ## GROUP BY ... HOP +## GROUP BY HOP {#hop} Сгруппировать таблицу по значениям указанных столбцов или выражений, а также подмножества по времени (окно времени). @@ -334,6 +334,48 @@ GROUP BY HOP(ts, "PT1М", "PT1M", "PT1M"); ``` +## GROUP BY HoppingWindow {#hopping_window} + +Новая версия [GROUP BY HOP](#hop) + +{% if select_command == "SELECT STREAM" %} +Отличается от предшественника тем, что не требует указания аргумента `delay` из-за обязательного использования [водяных знаков](../../../../concepts/streaming_query/watermarks.md) +{% else %} +Отличается от предшественника тем, что не требует указания игнорируемого аргумента `delay` +{% endif %} + +### Пример + +{% if select_command == "SELECT STREAM" %} +```yql +SELECT + key, + COUNT(*) +FROM my_topic +WITH ( + FORMAT = json_each_row, + SCHEMA ( + key String, + subkey String, + value String + ) +) +GROUP BY + key, + HoppingWindow(CAST(subkey AS Timestamp), "PT10S", "PT1M"); +``` +{% else %} +```yql +SELECT + key, + COUNT(*) +FROM my_table +GROUP BY + key, + HoppingWindow(CAST(subkey AS Timestamp), "PT10S", "PT1M"); +``` +{% endif %} + ## HAVING {#having} Фильтрация выборки `SELECT` по результатам вычисления [агрегатных функций](../../builtins/aggregation.md). Синтаксис аналогичен конструкции [`WHERE`](where.md). diff --git a/ydb/docs/ru/core/yql/reference/syntax/select/with.md b/ydb/docs/ru/core/yql/reference/syntax/select/with.md index 17df1d29560b..ee4fb981614f 100644 --- a/ydb/docs/ru/core/yql/reference/syntax/select/with.md +++ b/ydb/docs/ru/core/yql/reference/syntax/select/with.md @@ -24,6 +24,13 @@ * `projection.enabled` - флаг включения [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). Допустимые значения: `true`, `false`. * `projection..type` - тип поля [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). Допустимые значения: `integer`, `enum`, `date`. * `projection..` - расширенные свойства поля [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). +{% if select_command == "SELECT STREAM" %} +* `WATERMARK LATE EVENTS POLICY` - политика, определяющая реакцию на событие с временем меньшим, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md). Значение по умолчанию - `WATERMARK_ADJUST_LATE_EVENTS`. Выбрать что-то одно: + * `WATERMARK_ADJUST_LATE_EVENTS` - если у события время меньше, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md), то время этого события исправляется на значение [водяного знака](../../../../concepts/streaming_query/watermarks.md); + * `WATERMARK_DROP_LATE_EVENTS` - отбросить событие с временем меньшим, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md); +* `WATERMARK_GRANULARITY` - периодичность генерации водяных знаков. Чем она меньше, тем больше потребление CPU запросом и тем меньше задержка ответа, и наоборот. Значение по умолчанию - 1 секунда; +* `WATERMARK_IDLE_TIMEOUT` - период, после которого партиция без данных будет исключена из вычисления объединенного водяного знака. Значение по умолчанию - 5 секунд. +{% endif %} {% endif %} @@ -60,3 +67,19 @@ SELECT key, value FROM my_table WITH COLUMNS Struct; ```yql SELECT key, value FROM EACH($my_tables) WITH SCHEMA Struct>; ``` + +{% if select_command == "SELECT STREAM" %} +```yql +SELECT * +FROM my_topic +WITH ( + FORMAT = json_each_row, + SCHEMA ( + ts String + ), + WATERMARK_ADJUST_LATE_EVENTS, + WATERMARK_GRANULARITY="PT1S", + WATERMARK_IDLE_TIMEOUT="PT5S" +); +``` +{% endif %}