columnar_migrator#

columnar_migrator

columnar_migrator

columnar_migrator — скрипт, предназначенный для миграции данных из базы данных PostgreSQL с расширением Citus Columnar в базу данных PostgreSQL с расширением Hydra Columnar, обеспечивая целостность и совместимость данных.

Документация для инструмента миграции данных PostgreSQL

Обзор

Этот скрипт облегчает миграцию данных из базы данных PostgreSQL, работающей с расширением Citus Columnar (версия 15), в базу данных PostgreSQL, оснащенную расширением Hydra Columnar (версия 16). Он обеспечивает совместимость и целостность данных в процессе миграции, учитывая изменения в структурах метаданных между двумя колонными расширениями.

Назначение

Основные цели этого скрипта:

  • Адаптировать структуры колонок таблиц и их связанные метаданные из Citus в формат Hydra.

  • Для эффективной передачи больших наборов данных с использованием пакетной и параллельной обработки для минимизации операционного простоя.

  • Для поддержания целостности и согласованности данных на протяжении всего процесса миграции.

Описание

Написанный на Python, скрипт использует asyncpg для асинхронных операций с базой данных, повышая производительность при крупномасштабных передачах данных. Он включает детализированное ведение журнала для прозрачности и устранения неполадок. Подход к миграции включает генерацию соответствующих команд DDL для новой среды базы данных и использование параллелизма для более быстрой репликации данных.

Ключевые особенности

  • Управление параллелизмом: Использует семафоры для ограничения количества одновременных задач передачи данных.

  • Пакетная обработка: Передает данные пакетами для эффективной обработки больших наборов данных.

  • Обработка переподключений: Обеспечивает беспрерывную передачу данных после прерываний соединения.

  • Проверка количества строк: Сравнивает количество строк между исходными и целевыми таблицами после передачи для обеспечения целостности данных.

  • Пользовательские запросы для существующих таблиц: Проверяет наличие существующих таблиц в целевой базе данных и запрашивает у пользователя разрешение на их удаление, если необходимо.

Использование

Флаги ввода

Скрипт принимает различные аргументы командной строки для указания конфигураций исходной и целевой баз данных:

  • --source-db-host: Имя хоста или IP-адрес исходной базы данных (по умолчанию: localhost).

  • --source-db-port: Номер порта для исходной базы данных (обязательно).

  • --source-db-name: Имя исходной базы данных (по умолчанию: postgres).

  • --source-db-user: Имя пользователя для исходной базы данных (по умолчанию: postgres).

  • --source-db-password: Пароль для пользователя исходной базы данных (по умолчанию: пусто).

  • --target-db-host: Имя хоста или IP-адрес целевой базы данных (по умолчанию: localhost).

  • --target-db-port: Номер порта для целевой базы данных (обязательно).

  • --target-db-name: Имя целевой базы данных (по умолчанию: postgres).

  • --target-db-user: Имя пользователя для целевой базы данных (по умолчанию: postgres).

  • --target-db-password: Пароль для пользователя целевой базы данных (по умолчанию: пусто).

  • --batch-size: Количество строк в партии при передаче данных (по умолчанию: 100000).

  • --concurrency: Количество параллельных задач для параллельной передачи данных (по умолчанию: 2).

Пример команды

Запустите миграцию, используя следующую команду:

python columnar_migrator.py /
--source-db-host 192.168.1.100 /
--source-db-port 5432 / 
--source-db-user admin /
--source-db-password securepass /
--target-db-host 192.168.1.101 /
--target-db-port 5433 /
--target-db-user admin /
--target-db-password securepass /
--batch-size 100000 /
--concurrency 4

Процесс выполнения

  • Инициализация: Установите пул соединений для исходной и целевой баз данных. Проверьте наличие существующих таблиц в целевой базе данных и запросите у пользователя действия, если таковые найдены.

  • Выполнение DDL и копирование данных: Создайте необходимые расширения и выполните команды DDL для настройки схемы целевой базы данных. Запланируйте задачи передачи данных, используя семафор для ограничения параллелизма. Передавайте данные партиями, обрабатывая переподключение в случае потери соединения.

  • Обработка переподключения: Если соединение потеряно во время передачи, попытайтесь переподключиться и возобновить передачу.

  • Проверка количества строк: После переноса каждой таблицы сравните количество строк между исходной и целевой базами данных. Запишите результаты сравнения.

Требования

Чтобы обеспечить плавное выполнение этого скрипта миграции, должны быть выполнены следующие требования:

  • Python 3.7 или новее.

  • Библиотека asyncpg установлена для облегчения асинхронных операций с базой данных.

Установите необходимую библиотеку Python с помощью pip:

pip install asyncpg colorlog

Убедитесь, что оба экземпляра PostgreSQL доступны и что у пользователя есть необходимые разрешения для выполнения команд и обработки данных. ## Заключение

Этот скрипт необходим для администраторов баз данных, обновляющих установки PostgreSQL с новыми колонковыми технологиями. Он обеспечивает эффективный и надежный процесс миграции, минимизируя время простоя и поддерживая целостность данных между двумя активно работающими серверами.

Детальный обзор кода и документация

Инициализация и конфигурация:

  • async def create_pools(source_db_config, target_db_config): Устанавливает пул соединений для исходной и целевой баз данных.

  • async def check_existing_tables(conn, tables): Проверяет, существуют ли таблицы в целевой базе данных.

  • async def prompt_user_for_deletion(existing_tables): Запрашивает у пользователя подтверждение на удаление существующих таблиц.

  • async def drop_existing_tables(conn, tables): Удаляет существующие таблицы из целевой базы данных, если пользователь подтверждает.

Основная функция выполнения:

  • async def execute_ddl_and_copy_data(target_db_config, ddl_commands, source_db_config, batch_size, concurrency): Создает пулы подключений и проверяет существующие таблицы. Выполняет DDL команды для настройки схемы целевой базы данных. Управляет процессом передачи данных с использованием контроля параллелизма и пакетной обработки.

Передача данных и обработка переподключений:

  • async def transfer_data_copy_to_tables(source_pool, target_pool, obj, batch_size, task_id, total_rows): Передает данные партиями. Обрабатывает переподключение, если соединение потеряно, и возобновляет процесс передачи.

Логика переподключения:

  • async def reconnect_new(pool): Пытается переподключиться к базе данных несколько раз с задержкой между попытками.