Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ items:
- { name: Обзор, href: index.md }
- { name: Форматы данных, href: formats.md }
- { name: Чекпойнты, href: checkpoints.md }
- { name: Водяные знаки, href: watermarks.md }
9 changes: 9 additions & 0 deletions ydb/docs/ru/core/concepts/streaming_query/watermarks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Водяные знаки

Каждое [событие](../datamodel/topic.md#message) в системе потоковой обработки данных имеет ассоциированную с ним временную метку. Эта метка может равняться времени чтения события из [топика](../datamodel/topic.md), может быть получена из данных внутри события или из метаданных [топика](../datamodel/topic.md).

Поверх этого времени события можно делать сортировку на потоке (внутри [MATCH_RECOGNIZE](../../yql/reference/syntax/select/match_recognize.md#order_by)) или агрегацию на временнОм окне ([GROUP BY HoppingWindow](../../yql/reference/syntax/select/group-by.md#hopping_window)). Эти потоковые операции должны знать текущее время, чтобы на основе этой информации генерировать выходные данные и делать это в режиме реального времени. Времени, получаемого из события, не всегда достаточно, так как события могут приходить нерегулярно или отфильтровываться на более ранних этапах.

Для решения этой проблемы нужен водяной знак. Это наибольшая временная метка, которая гарантированно находится в прошлом для каждой партиции.

В условиях распределенных систем, когда часы на разных устройствах могут дрейфовать, а данные - задерживаться из-за проблем с сетью, водяной знак не может полагаться на монотонное возрастание времени событий даже в рамках одной партиции. Для этого в формулу расчета водяного знака закладывается отставание. Сейчас водяной знак считается как время записи события в топик, уменьшенное на 5 секунд.
44 changes: 43 additions & 1 deletion ydb/docs/ru/core/yql/reference/syntax/select/group-by.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ LIMIT 3;

{% endif %}

## GROUP BY ... HOP
## GROUP BY HOP {#hop}

Сгруппировать таблицу по значениям указанных столбцов или выражений, а также подмножества по времени (окно времени).

Expand Down Expand Up @@ -334,6 +334,48 @@ GROUP BY
HOP(ts, "PT1М", "PT1M", "PT1M");
```

## GROUP BY HoppingWindow {#hopping_window}

Новая версия [GROUP BY HOP](#hop)

{% if select_command == "SELECT STREAM" %}
Отличается от предшественника тем, что не требует указания аргумента `delay` из-за обязательного использования [водяных знаков](../../../../concepts/streaming_query/watermarks.md)
{% else %}
Отличается от предшественника тем, что не требует указания игнорируемого аргумента `delay`
{% endif %}

### Пример

{% if select_command == "SELECT STREAM" %}
```yql
SELECT
key,
COUNT(*)
FROM my_topic
WITH (
FORMAT = json_each_row,
SCHEMA (
key String,
subkey String,
value String
)
)
GROUP BY
key,
HoppingWindow(CAST(subkey AS Timestamp), "PT10S", "PT1M");
```
{% else %}
```yql
SELECT
key,
COUNT(*)
FROM my_table
GROUP BY
key,
HoppingWindow(CAST(subkey AS Timestamp), "PT10S", "PT1M");
```
{% endif %}

## HAVING {#having}

Фильтрация выборки `SELECT` по результатам вычисления [агрегатных функций](../../builtins/aggregation.md). Синтаксис аналогичен конструкции [`WHERE`](where.md).
Expand Down
23 changes: 23 additions & 0 deletions ydb/docs/ru/core/yql/reference/syntax/select/with.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
* `projection.enabled` - флаг включения [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). Допустимые значения: `true`, `false`.
* `projection.<field_name>.type` - тип поля [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). Допустимые значения: `integer`, `enum`, `date`.
* `projection.<field_name>.<options>` - расширенные свойства поля [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md).
{% if select_command == "SELECT STREAM" %}
* `WATERMARK LATE EVENTS POLICY` - политика, определяющая реакцию на событие с временем меньшим, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md). Значение по умолчанию - `WATERMARK_ADJUST_LATE_EVENTS`. Выбрать что-то одно:
* `WATERMARK_ADJUST_LATE_EVENTS` - если у события время меньше, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md), то время этого события исправляется на значение [водяного знака](../../../../concepts/streaming_query/watermarks.md);
* `WATERMARK_DROP_LATE_EVENTS` - отбросить событие с временем меньшим, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md);
* `WATERMARK_GRANULARITY` - периодичность генерации водяных знаков. Чем она меньше, тем больше потребление CPU запросом и тем меньше задержка ответа, и наоборот. Значение по умолчанию - 1 секунда;
* `WATERMARK_IDLE_TIMEOUT` - период, после которого партиция без данных будет исключена из вычисления объединенного водяного знака. Значение по умолчанию - 5 секунд.
{% endif %}

{% endif %}

Expand Down Expand Up @@ -60,3 +67,19 @@ SELECT key, value FROM my_table WITH COLUMNS Struct<value:Int32?>;
```yql
SELECT key, value FROM EACH($my_tables) WITH SCHEMA Struct<key:String, value:List<Int32>>;
```

{% if select_command == "SELECT STREAM" %}
```yql
SELECT *
FROM my_topic
WITH (
FORMAT = json_each_row,
SCHEMA (
ts String
),
WATERMARK_ADJUST_LATE_EVENTS,
WATERMARK_GRANULARITY="PT1S",
WATERMARK_IDLE_TIMEOUT="PT5S"
);
```
{% endif %}
Loading