Технологии

Atomic insert in Clickhouse

В этой статье мы поговорим о том, как реализовать поведение атомарной вставки в ClickHouse. Рассмотрим несколько вариантов, подсветим их сильные и слабые стороны, а также, когда каждый из них применять. Задача Мы хотим добиться, чтобы не было случаев, когда мы начали вставку, а пользователь прочитал данные до её завершения и получил неактуальный (неполный) набор данных. Неатомарная вставка = риск чтения некорректного набора данных. Сценарии, когда такое может произойти: Удалили партицию и хотели начать вставлять данные взамен удалённой, но пользователь обратился к этому интервалу. Вставка данных оборвалась (из-за проблем с сетью, например). Вставляем данные батчами, и пользователь проверил наличие данных, но не проверил, что все батчи на месте — прочитал часть данных. Перейдём к реализации атомарной вставки в ClickHouse. Вариант 1. Настройка min_insert_block_size_rows Подход описан в статье от Altinity. Суть его заключается в том, что мы можем управлять размером батча при вставке данных и принудительно вставить данные единым блоком. Метод сработает, если при вставке мы создадим только 1 парт (какие критерии этого можно прочитать в статье) Скрипт для реализации подхода -- Создаем целевую таблицу drop table if exists core.data; create table if not exists core.data ( num_int Int64, partiton_num String ) engine = MergeTree order by num_int; -- Вставляем данные insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, intDiv(number, 1_000_000) as partiton_num from numbers(100_000_000); -- Видим что партов больше чем 1 SELECT count(), min(rows), max(rows), sum(rows) FROM system.parts WHERE (level = 0) AND (table = 'data'); -- count()|min(rows)|max(rows)|sum(rows)| -- -------+---------+---------+---------+ -- 96| 397345| 1048449|100000000| -- Пересоздаем целевую таблицу drop table if exists core.data; create table if not exists core.data ( num_int Int64, partiton_num String ) engine = MergeTree order by num_int; -- Вставляем данные но уже с настройкамии -- input_format_parallel_parsing=0, -- min_insert_block_size_bytes=0, -- min_insert_block_size_rows=1000000000 -- так мы задаем что если строк меньше чем 1_000_000_000 -- то данные будут вставлены единым партом insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, intDiv(number, 1_000_000) as partiton_num from numbers(100_000_000) settings input_format_parallel_parsing=0, min_insert_block_size_bytes=0, min_insert_block_size_rows=1_000_000_000; -- Видим что создался только 1 парт SELECT count(), min(rows), max(rows), sum(rows) FROM system.parts WHERE (level = 0) AND (table = 'data'); -- count()|min(rows)|max(rows)|sum(rows)| -- -------+---------+---------+---------+ -- 1|100000000|100000000|100000000| Кажется что можно остановиться на этом варианте, но как писал выше у него есть минусы и зоны применения. Плюсы: Лаконичная и простая реализация Минусы: Высокое потребление памяти при вставке, ниже видем что даже на "игрушечном" примере разница значительная (в 10 раз). Вызвано это тем что весь парт предварительно пишется в оперативную память event_time |memory_usage_gb|query | -------------------+---------------+------------------+ 2025-11-06 17:02:17| 3.5|with settings | 2025-11-06 17:04:13| 0.034|with out settings | Если захотим вставлять данные батчами, то вариант не подойдет Когда применять: Небольшие таблицы Единичная вствка в одну партицию Если скорость разработки у вас в приоритете, и вы готовы принять, то что будет использоваться оперативная память на вставку Вариант 2. Временная таблица + переименование Вариант заключается в том, что мы вставляем новые данные во временную таблицу, после чего меняем местами названия таблиц (временная становится основной, а основная — временной). Возможно это благодаря операции exchange в ClickHouse. exchange tables table1 and table2 Скрипт для реализации подхода -- Создаем целевую таблицу drop table if exists core.data; create table if not exists core.data ( num_int Int64, partiton_num String ) engine = MergeTree order by num_int; -- Наполняем целевую таблицу данными insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, intDiv(number, 1_000_000) as partiton_num from numbers(100_000_000); -- Видим что в ней числа от 0 до 99_999_999 select min(num_int), max(num_int), from core.data; -- Создаем временную таблицу drop table if exists stage.data_tmp; create table if not exists stage.data_tmp ( num_int Int64, partiton_num String ) engine = MergeTree order by num_int; -- Вставляем в нее числа в диапазоне 100_000_000 - 200_000_000 insert into stage.data_tmp ( num_int, partiton_num ) select toInt64(number) num_int, intDiv(number, 1_000_000) as partiton_num from numbers(100_000_000, 100_000_000); -- Меняем местами временную и целевую таблицы exchange tables core.data and stage.data_tmp; -- Видим что в целевой обновились данные -- и произошло это абсолютно атомарно select min(num_int), max(num_int) from core.data; Плюсы: Достаточно простая и понятная реализация. Можно откатиться, если снова переименовать таблицы (восстановить предыдущее состояние). Нет затрат на дополнительную оперативную память. Минусы: Нужна дополнительная таблица, в которой необходимо поддерживать консистентность полей с основной. Дополнительное место на диске (можно очищать tmp после переименования, и тогда этот минус почти незначительный). Когда применять: Таблицы ещё относительно небольшие, их можно полностью пересобрать. Вариант 3. Временная таблица + replace партиции Если мы не готовы полностью перезаписывать всю таблицу, мы можем доработать подход из варианта 2 и перезаписывать только нужную партицию. Операция replace partition в ClickHouse работает атомарно, то есть партиция обновится сразу, а не отдельными блоками. Скрипт для реализации подхода -- Создаем целевую таблицу -- Партиционируем таблицу (в 1й партиции миллион строк) drop table if exists core.data; create table if not exists core.data ( num_int Int64, partiton_num String ) engine = MergeTree partition by partiton_num order by num_int; -- Наполняем целевую таблицу данными -- от 1 до 100М insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, intDiv(number, 1_000_000) as partiton_num from numbers(100_000_000); -- Видим что в партиции с id '99' -- дежит диапазон 99М - 100М select min(num_int), max(num_int) from core.data where partiton_num = '99'; -- Создаем временную таблицу drop table if exists stage.data_tmp; create table if not exists stage.data_tmp ( num_int Int64, partiton_num String ) engine = MergeTree partition by partiton_num order by num_int; -- Для временной таблицы заполняем только партицию с id '99' -- Причем делаем диамазон 100М - 101М insert into stage.data_tmp ( num_int, partiton_num ) select toInt64(number) num_int, '99' as partiton_num from numbers(100_000_000, 1_000_000); -- Проверяем что вставился именно диапазон 100М - 101М select min(num_int), max(num_int) from stage.data_tmp where partiton_num = '99'; -- Делаем атомарную операцию replace partition -- Партиция '99' копируется из stage.data_tmp и заменяется в core.data alter table core.data replace partition '99' from stage.data_tmp; -- Видим что в целевой таблице данные изменены -- В партиции '99' диапазон 100М - 101М select min(num_int), max(num_int) from core.data where partiton_num = '99'; Плюсы: Перезапись короткого интервала, что чаще используется, чем обновление всей таблицы. Нет затрат на дополнительную оперативную память. Минусы: Нужна дополнительная таблица, в которой необходимо поддерживать консистентность полей с основной. Дополнительное место на диске (можно очищать tmp после переименования, что уменьшает минус). Когда применять: В случаях, когда используется стратегия обновления через перезапись партиций. Вариант 4. View с фильтрами Считаю, что поддерживать временную и целевую таблицы консистентными (одинаковый набор полей и их порядок) может быть затруднительно, особенно если вы синхронизируете их вручную. Часто пользователи обращаются к таблицам не напрямую, а через view, где может быть реализована легковесная логика. В таком случае мы можем использовать это view как инструмент атомарного обновления данных. Подход: Имеем таблицу и view поверх неё. Во view используем фильтр, который выбирает только актуальные данные. Например, фильтр может быть таким: partiton_num < 100. Добавляем новую партицию данных insert'ом в целевую таблицу. Пересоздаём view с обновлённым фильтром. Скрипт для реализации подхода -- Создаем целевую таблицу drop table if exists core.data; create table if not exists core.data ( num_int Int64, partiton_num Int16 ) engine = MergeTree partition by partiton_num order by num_int; -- Наполняем целевую таблицу данными -- от 1 до 100М insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, intDiv(number, 1_000_000) as partiton_num from numbers(100_000_000); -- Создаем view для целевой таблицы create or replace view core.v_data as select num_int, partiton_num from core.data where partiton_num < 100; -- Видим что доступны данные в интервале -- от 1 до 100М select min(num_int), max(num_int) from core.v_data; -- Вставляем еще блок данных в диапазоне 100М - 101М -- Для наглядности делаем это в 2 подхода -- 1й батч insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, '100' as partiton_num from numbers(100_000_000, 500_000); -- 2й батч insert into core.data ( num_int, partiton_num ) select toInt64(number) num_int, '100' as partiton_num from numbers(100_500_000, 500_000); -- Видим что пока нам все еще доступен интервал -- от 1 до 100М select min(num_int), max(num_int) from core.v_data; -- Пересоздаем view и тем самым предоставляя пользователю новые данные create or replace view core.v_data as select num_int, partiton_num from core.data where partiton_num < 101; -- Видим что доступны данные до 101М select min(num_int), max(num_int) from core.v_data; Плюсы: Нет дополнительных затрат на дисковое пространство. Легко реализовать, если view уже используется. Минусы: Не самый гибкий подход. Если требуется перезапись существующих данных, придётся дважды обновлять view для одной вставки (до и после), что приведёт к большим объёмам DDL-операций на кластере. Когда применять: Если view уже используется для работы с данными. Если фильтр выбора актуальных данных не слишком сложный (подходит для таблиц, где данные в основном только добавляются и редко перезаписываются). Примечания и советы: Используйте insert_quorum (https://clickhouse.com/docs/operations/settings/settings#insert\_quorum). Без этой настройки есть риск, что данные запишутся только на одну реплику, а затем чтение будет происходить с другой, на которую данные ещё не успели реплицироваться. Спасибо что прочитали до конца Приглашаю вступить в мой Telegram-канал - flow_data, там я буду публиковать материалы и заметки по Data Engineering.

Фильтры и сортировка