columnar_migrator#
columnar_migrator
columnar_migrator — скрипт, предназначенный для миграции данных из базы данных PostgreSQL с расширением Citus Columnar в базу данных PostgreSQL с расширением Hydra Columnar, обеспечивая целостность и совместимость данных.
Документация для инструмента миграции данных PostgreSQL
Обзор
Этот скрипт облегчает миграцию данных из базы данных PostgreSQL, работающей с расширением Citus Columnar (версия 15), в базу данных PostgreSQL, оснащенную расширением Hydra Columnar (версия 16). Он обеспечивает совместимость и целостность данных в процессе миграции, учитывая изменения в структурах метаданных между двумя колонными расширениями.
Назначение
Основные цели этого скрипта:
Адаптировать структуры колонок таблиц и их связанные метаданные из Citus в формат Hydra.
Для эффективной передачи больших наборов данных с использованием пакетной и параллельной обработки для минимизации операционного простоя.
Для поддержания целостности и согласованности данных на протяжении всего процесса миграции.
Описание
Написанный на Python, скрипт использует asyncpg для асинхронных операций с базой данных, повышая производительность при крупномасштабных передачах данных. Он включает детализированное ведение журнала для прозрачности и устранения неполадок. Подход к миграции включает генерацию соответствующих команд DDL для новой среды базы данных и использование параллелизма для более быстрой репликации данных.
Ключевые особенности
Управление параллелизмом: Использует семафоры для ограничения количества одновременных задач передачи данных.
Пакетная обработка: Передает данные пакетами для эффективной обработки больших наборов данных.
Обработка переподключений: Обеспечивает беспрерывную передачу данных после прерываний соединения.
Проверка количества строк: Сравнивает количество строк между исходными и целевыми таблицами после передачи для обеспечения целостности данных.
Пользовательские запросы для существующих таблиц: Проверяет наличие существующих таблиц в целевой базе данных и запрашивает у пользователя разрешение на их удаление, если необходимо.
Использование
Флаги ввода
Скрипт принимает различные аргументы командной строки для указания конфигураций исходной и целевой баз данных:
--source-db-host
: Имя хоста или IP-адрес исходной базы данных (по умолчанию:localhost
).--source-db-port
: Номер порта для исходной базы данных (обязательно).--source-db-name
: Имя исходной базы данных (по умолчанию:postgres
).--source-db-user
: Имя пользователя для исходной базы данных (по умолчанию:postgres
).--source-db-password
: Пароль для пользователя исходной базы данных (по умолчанию: пусто).--target-db-host
: Имя хоста или IP-адрес целевой базы данных (по умолчанию:localhost
).--target-db-port
: Номер порта для целевой базы данных (обязательно).--target-db-name
: Имя целевой базы данных (по умолчанию:postgres
).--target-db-user
: Имя пользователя для целевой базы данных (по умолчанию:postgres
).--target-db-password
: Пароль для пользователя целевой базы данных (по умолчанию: пусто).--batch-size
: Количество строк в партии при передаче данных (по умолчанию: 100000).--concurrency
: Количество параллельных задач для параллельной передачи данных (по умолчанию: 2).
Пример команды
Запустите миграцию, используя следующую команду:
python columnar_migrator.py / --source-db-host 192.168.1.100 / --source-db-port 5432 / --source-db-user admin / --source-db-password securepass / --target-db-host 192.168.1.101 / --target-db-port 5433 / --target-db-user admin / --target-db-password securepass / --batch-size 100000 / --concurrency 4
Процесс выполнения
Инициализация: Установите пул соединений для исходной и целевой баз данных. Проверьте наличие существующих таблиц в целевой базе данных и запросите у пользователя действия, если таковые найдены.
Выполнение DDL и копирование данных: Создайте необходимые расширения и выполните команды DDL для настройки схемы целевой базы данных. Запланируйте задачи передачи данных, используя семафор для ограничения параллелизма. Передавайте данные партиями, обрабатывая переподключение в случае потери соединения.
Обработка переподключения: Если соединение потеряно во время передачи, попытайтесь переподключиться и возобновить передачу.
Проверка количества строк: После переноса каждой таблицы сравните количество строк между исходной и целевой базами данных. Запишите результаты сравнения.
Требования
Чтобы обеспечить плавное выполнение этого скрипта миграции, должны быть выполнены следующие требования:
Python 3.7 или новее.
Библиотека
asyncpg
установлена для облегчения асинхронных операций с базой данных.
Установите необходимую библиотеку Python с помощью pip:
pip install asyncpg colorlog
Убедитесь, что оба экземпляра PostgreSQL доступны и что у пользователя есть необходимые разрешения для выполнения команд и обработки данных. ## Заключение
Этот скрипт необходим для администраторов баз данных, обновляющих установки PostgreSQL с новыми колонковыми технологиями. Он обеспечивает эффективный и надежный процесс миграции, минимизируя время простоя и поддерживая целостность данных между двумя активно работающими серверами.
Детальный обзор кода и документация
Инициализация и конфигурация:
async def create_pools(source_db_config, target_db_config)
: Устанавливает пул соединений для исходной и целевой баз данных.async def check_existing_tables(conn, tables)
: Проверяет, существуют ли таблицы в целевой базе данных.async def prompt_user_for_deletion(existing_tables)
: Запрашивает у пользователя подтверждение на удаление существующих таблиц.async def drop_existing_tables(conn, tables)
: Удаляет существующие таблицы из целевой базы данных, если пользователь подтверждает.
Основная функция выполнения:
async def execute_ddl_and_copy_data(target_db_config, ddl_commands, source_db_config, batch_size, concurrency)
: Создает пулы подключений и проверяет существующие таблицы. Выполняет DDL команды для настройки схемы целевой базы данных. Управляет процессом передачи данных с использованием контроля параллелизма и пакетной обработки.
Передача данных и обработка переподключений:
async def transfer_data_copy_to_tables(source_pool, target_pool, obj, batch_size, task_id, total_rows)
: Передает данные партиями. Обрабатывает переподключение, если соединение потеряно, и возобновляет процесс передачи.
Логика переподключения:
async def reconnect_new(pool)
: Пытается переподключиться к базе данных несколько раз с задержкой между попытками.