Движок распределённой таблицы
Чтобы создать движок распределённой таблицы в ClickHouse Cloud, можно использовать табличные функции remote и remoteSecure.
Синтаксис Distributed(...) в ClickHouse Cloud использовать нельзя.
Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределённую обработку запросов на нескольких серверах. Чтение автоматически распараллеливается. Во время чтения используются индексы таблиц на удалённых серверах, если они есть.
Создание таблицы
Из таблицы
Когда таблица Distributed указывает на таблицу на текущем сервере, вы можете заимствовать её схему:
Параметры движка Distributed
| Параметр | Описание |
|---|---|
cluster | Имя кластера в конфигурационном файле сервера |
database | Имя удалённой базы данных |
table | Имя удалённой таблицы |
sharding_key (необязательно) | Ключ шардинга. Указание sharding_key необходимо в следующих случаях:
|
policy_name (необязательно) | Имя политики, которое будет использоваться для хранения временных файлов при фоновой отправке |
См. также
- настройка distributed_foreground_insert
- MergeTree для примеров
Параметры Distributed
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
fsync_after_insert | Выполнять fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС сбросила все вставленные данные в файл на диске инициирующего узла. | false |
fsync_directories | Выполнять fsync для каталогов. Гарантирует, что ОС обновила метаданные каталога после операций, связанных с фоновыми вставками в таблицу Distributed (после вставки, после отправки данных на шард и т. д.). | false |
skip_unavailable_shards | Если true, ClickHouse молча пропускает недоступные шарды. Шард помечается как недоступный, когда: 1) к нему нет доступа из‑за ошибки соединения; 2) шард не может быть разрешён через DNS; 3) таблица не существует на шарде. | false |
bytes_to_throw_insert | Если количество сжатых байт, ожидающих фоновой операции INSERT, превысит это значение, будет выброшено исключение. 0 — не выбрасывать. | 0 |
bytes_to_delay_insert | Если количество сжатых байт, ожидающих фоновой операции INSERT, превысит это значение, выполнение запроса будет задержано. 0 — не задерживать. | 0 |
max_delay_to_insert | Максимальная задержка вставки данных в таблицу Distributed в секундах, если имеется много данных (байт), ожидающих фоновой отправки. | 60 |
background_insert_batch | То же, что и distributed_background_insert_batch. | 0 |
background_insert_split_batch_on_failure | То же, что и distributed_background_insert_split_batch_on_failure. | 0 |
background_insert_sleep_time_ms | То же, что и distributed_background_insert_sleep_time_ms. | 0 |
background_insert_max_sleep_time_ms | То же, что и distributed_background_insert_max_sleep_time_ms. | 0 |
flush_on_detach | Сбрасывать данные на удалённые узлы при DETACH/DROP/остановке сервера. | true |
Параметры надёжности (fsync_...):
- Влияют только на фоновые
INSERT(то есть приdistributed_foreground_insert=false), когда данные сначала сохраняются на диск инициирующего узла, а затем в фоне отправляются на шарды. - Могут существенно снизить производительность
INSERT. - Влияют на запись данных, хранящихся внутри каталога распределённой таблицы, на узел, который принял вашу вставку. Если вам нужны гарантии записи данных в базовые таблицы MergeTree, см. параметры надёжности (
...fsync...) вsystem.merge_tree_settings
Для параметров ограничений вставки (..._insert) см. также:
- параметр
distributed_foreground_insert - параметр
prefer_localhost_replica bytes_to_throw_insertобрабатывается раньше, чемbytes_to_delay_insert, поэтому его не следует устанавливать в значение меньше значенияbytes_to_delay_insert.
Пример
Данные будут читаться со всех серверов в кластере logs из таблицы default.hits, расположенной на каждом сервере кластера. Данные не только читаются, но и частично обрабатываются на удалённых серверах (насколько это возможно). Например, для запроса с GROUP BY данные будут агрегироваться на удалённых серверах, а промежуточные состояния агрегатных функций будут отправлены на сервер, выполняющий запрос. Затем данные будут дополнительно агрегированы.
Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase().
Кластеры
Кластеры настраиваются в конфигурационном файле сервера:
Здесь определяется кластер с именем logs, который состоит из двух шардов, каждый из которых содержит по две реплики. Шарды — это серверы, в которых хранятся разные части данных (чтобы прочитать все данные, необходимо обратиться ко всем шардам). Реплики — это серверы-дубликаты (чтобы прочитать все данные, можно обратиться к данным на любой из реплик).
Имена кластеров не должны содержать точек.
Для каждого сервера указываются параметры host, port, а также, при необходимости, user, password, secure, compression, bind_host:
| Parameter | Description | Default Value |
|---|---|---|
host | Адрес удалённого сервера. Можно использовать либо доменное имя, либо адрес IPv4 или IPv6. Если указать доменное имя, сервер при запуске выполнит DNS-запрос, и результат будет храниться, пока сервер работает. Если DNS-запрос завершится ошибкой, сервер не запустится. Если вы измените DNS-запись, перезапустите сервер. | - |
port | TCP-порт для обмена сообщениями (tcp_port в конфигурации, обычно 9000). Не путайте с http_port. | - |
user | Имя пользователя для подключения к удалённому серверу. У этого пользователя должны быть права на подключение к указанному серверу. Доступ настраивается в файле users.xml. Дополнительную информацию см. в разделе Права доступа. | default |
password | Пароль для подключения к удалённому серверу (не маскируется). | '' |
secure | Использовать ли защищённое SSL/TLS-соединение. Обычно также требуется указать порт (защищённый порт по умолчанию — 9440). Сервер должен прослушивать <tcp_port_secure>9440</tcp_port_secure> и быть настроен с корректными сертификатами. | false |
compression | Использовать сжатие данных. | true |
bind_host | Исходный адрес, который следует использовать при подключении к удалённому серверу с этого узла. Поддерживаются только адреса IPv4. Предназначен для сложных сценариев развертывания, когда необходимо задать исходный IP-адрес, используемый ClickHouse для распределённых запросов. | - |
При указании реплик одна из доступных реплик будет выбрана для каждого шарда при чтении. Можно настроить алгоритм балансировки нагрузки (предпочтения при выборе реплики для доступа) — см. настройку load_balancing. Если не удалось установить подключение с сервером, выполняется попытка подключения с коротким таймаутом. Если подключиться не удалось, выбирается следующая реплика и так далее для всех реплик. Если попытка подключения не удалась для всех реплик, попытка будет повторена тем же образом несколько раз. Это повышает устойчивость, но не обеспечивает полной отказоустойчивости: удалённый сервер может принять подключение, но не работать или работать некорректно.
Можно указать только один шард (в этом случае обработку запроса корректнее называть удалённой, а не распределённой) или любое количество шардов. В каждом шарде можно указать от одной до любого количества реплик. Можно задать разное количество реплик для каждого шарда.
В конфигурации можно указать произвольное количество кластеров.
Для просмотра ваших кластеров используйте таблицу system.clusters.
Движок Distributed позволяет работать с кластером как с локальным сервером. Однако конфигурацию кластера нельзя задавать динамически — её необходимо настроить в конфигурационном файле сервера. Обычно все серверы в кластере имеют одинаковую конфигурацию кластера (хотя это и не обязательно). Кластеры из конфигурационного файла обновляются на лету, без перезапуска сервера.
Если при каждом выполнении запроса нужно отправлять его в неизвестный заранее набор шардов и реплик, нет необходимости создавать таблицу Distributed — вместо этого используйте табличную функцию remote. См. раздел Table functions.
Запись данных
Существует два способа записи данных в кластер:
Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждый шард. Другими словами, выполнять прямые запросы INSERT в удалённые таблицы в кластере, на которые указывает таблица Distributed. Это наиболее гибкое решение, поскольку вы можете использовать любую схему шардинга, даже нетривиальную, в силу требований предметной области. Это также наиболее оптимальное решение, так как данные могут записываться в разные шарды полностью независимо.
Во-вторых, вы можете выполнять запросы INSERT в таблицу Distributed. В этом случае таблица будет самостоятельно распределять вставленные данные по серверам. Чтобы выполнять запись в таблицу Distributed, у неё должен быть настроен параметр sharding_key (кроме случая, когда существует только один шард).
Для каждого шарда в конфигурационном файле может быть задан <weight>. По умолчанию вес равен 1. Данные распределяются по шардам в объёме, пропорциональном весу шарда. Все веса шардов суммируются, затем вес каждого шарда делится на эту сумму для определения доли каждого шарда. Например, если есть два шарда и первый имеет вес 1, а второй — вес 2, первому будет отправляться одна треть (1 / 3) вставляемых строк, а второму — две трети (2 / 3).
Для каждого шарда в конфигурационном файле может быть задан параметр internal_replication. Если этот параметр установлен в true, операция записи выбирает первую «здоровую» реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основе таблицы Distributed, являются реплицируемыми таблицами (например, любые движки таблиц Replicated*MergeTree). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на остальные реплики.
Если internal_replication установлено в false (значение по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed сама реплицирует данные. Это хуже, чем использование реплицируемых таблиц, потому что согласованность реплик не проверяется, и со временем они будут содержать немного отличающиеся данные.
Для выбора шарда, на который будет отправлена строка данных, анализируется выражение шардинга и берётся остаток от его деления на суммарный вес шардов. Строка отправляется на шард, соответствующий полуинтервалу остатков от prev_weights до prev_weights + weight, где prev_weights — суммарный вес шардов с наименьшими номерами, а weight — вес данного шарда. Например, если есть два шарда, и первый имеет вес 9, а второй — вес 10, строка будет отправлена на первый шард для остатков из диапазона [0, 9), и на второй — для остатков из диапазона [9, 19).
Выражение шардинга может быть любым выражением на основе констант и столбцов таблицы, которое возвращает целое число. Например, вы можете использовать выражение rand() для случайного распределения данных или UserID для распределения по остатку от деления идентификатора пользователя (тогда данные одного пользователя будут находиться на одном шарде, что упрощает выполнение IN и JOIN по пользователям). Если один из столбцов распределяется недостаточно равномерно, вы можете обернуть его в хеш-функцию, например intHash64(UserID).
Простой остаток от деления — ограниченное решение для шардинга и подходит не всегда. Он работает для средних и больших объёмов данных (десятки серверов), но не для очень больших объёмов данных (сотни серверов и более). В последнем случае используйте схему шардинга, требуемую предметной областью, вместо использования таблиц Distributed.
О схеме шардинга следует задуматься в следующих случаях:
- Используются запросы, требующие объединения данных (
INилиJOIN) по определённому ключу. Если данные распределены по сегментам по этому ключу, можно использовать локальныеINилиJOINвместоGLOBAL INилиGLOBAL JOIN, что значительно эффективнее. - Используется большое количество серверов (сотни и более) с большим числом небольших запросов, например запросов к данным отдельных клиентов (например, веб-сайтов, рекламодателей или партнёров). Чтобы небольшие запросы не влияли на весь кластер, имеет смысл размещать данные одного клиента на одном шарде. В качестве альтернативы можно настроить двухуровневый шардинг: разделить весь кластер на «слои», где слой может состоять из нескольких шардов. Данные одного клиента располагаются на одном слое, но при необходимости в слой можно добавлять шарды, а данные внутри них распределяются случайным образом. Для каждого слоя создаются таблицы
Distributed, а для глобальных запросов создаётся одна общая distributed таблица.
Данные записываются в фоновом режиме. При вставке в таблицу блок данных просто записывается в локальную файловую систему. Данные отправляются на удалённые серверы в фоновом режиме как можно скорее. Периодичность отправки данных управляется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Движок Distributed отправляет каждый файл с вставленными данными отдельно, но можно включить пакетную отправку файлов с помощью настройки distributed_background_insert_batch. Эта настройка повышает производительность кластера за счёт более эффективного использования ресурсов локального сервера и сети. Следует проверять, были ли данные успешно отправлены, просматривая список файлов (данных, ожидающих отправки) в каталоге таблицы: /var/lib/clickhouse/data/database/table/. Количество потоков, выполняющих фоновые задачи, можно задать настройкой background_distributed_schedule_pool_size.
Если сервер был аварийно остановлен или пережил жёсткую перезагрузку (например, из‑за сбоя оборудования) после выполнения INSERT в таблицу Distributed, вставленные данные могут быть потеряны. Если в каталоге таблицы обнаруживается повреждённая часть данных, она переносится в подкаталог broken и больше не используется.
Чтение данных
При выполнении запроса к таблице Distributed запросы SELECT отправляются на все шарды и выполняются независимо от того, как распределены данные по шардам (они могут быть распределены полностью случайным образом). При добавлении нового шарда нет необходимости переносить в него старые данные. Вместо этого можно записывать в него новые данные, используя более высокий вес — данные будут распределены немного неравномерно, но запросы по-прежнему будут выполняться корректно и эффективно.
Когда включён параметр max_parallel_replicas, обработка запросов параллелизуется по всем репликам внутри одного шарда. Дополнительную информацию см. в разделе max_parallel_replicas.
Чтобы узнать больше о том, как обрабатываются распределённые запросы in и global in, см. документацию.
Виртуальные столбцы
_shard_num
_shard_num — содержит значение shard_num из таблицы system.clusters. Тип: UInt32.
См. также
- Описание виртуальных столбцов
- Настройка
background_distributed_schedule_pool_size - Функции
shardNum()иshardCount()