J.4. pg_timetable#

J.4. pg_timetable

J.4. pg_timetable #

J.4.1. Обзор #

Версия: 6.2.0

Скачать с nexus-public

pg_timetable — это продвинутый планировщик задач для Tantor SE, предлагающий множество преимуществ по сравнению с традиционными планировщиками, такими как cron и другие. Он полностью управляется базой данных и предоставляет несколько продвинутых концепций.

J.4.2. Основные функции #

  • Задачи могут быть организованы в цепочки

  • Цепочка может состоять из встроенных команд, SQL и исполняемых файлов

  • Параметры могут быть переданы в цепочки

  • Пропущенные задачи (возможно, из-за простоя) могут быть повторно выполнены автоматически

  • Поддержка настраиваемых повторений

  • Встроенные задачи, такие как отправка электронных писем и т.д.

  • Полностью управляемая базой данных конфигурация

  • Полная поддержка ведения журналов на основе базы данных

  • Планирование в стиле Cron в часовом поясе сервера Tantor SE

  • Необязательная защита от параллелизма

  • Задача и цепочка могут иметь настройки времени ожидания выполнения

J.4.3. Быстрый старт #

  1. Скачайте исполняемый файл pg_timetable

  2. Убедитесь, что ваш сервер Tantor SE запущен и имеет роль с привилегией CREATE для целевой базы данных, например,

    my_database=> CREATE ROLE scheduler PASSWORD 'somestrong';
    my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
    
  3. Создайте новую задачу, например, выполняйте VACUUM каждую ночь в 00:30 по часовому поясу сервера Postgres

    my_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM');
    add_job
    ---------
          3
    (1 row)
    
  4. Запустите pg_timetable

    # pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
    
  5. Успех!

J.4.4. Параметры командной строки #

# ./pg_timetable

Application Options:
  -c, --clientname=                                Unique name for application instance [$PGTT_CLIENTNAME]
      --config=                                    YAML configuration file
      --no-program-tasks                           Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
  -v, --version                                    Output detailed version information [$PGTT_VERSION]
      --connstr                                    PostgreSQL connection string [$PGTT_CONNSTR]

Logging:
      --log-level=[debug|info|error]               Verbosity level for stdout and log file (default: info)
      --log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info)
      --log-file=                                  File name to store logs
      --log-file-format=[json|text]                Format of file logs (default: json)
      --log-file-rotate                            Rotate log files
      --log-file-size=                             Maximum size in MB of the log file before it gets rotated (default: 100)
      --log-file-age=                              Number of days to retain old log files, 0 means forever (default: 0)
      --log-file-number=                           Maximum number of old log files to retain, 0 to retain all (default: 0)

Start:
  -f, --file=                                      SQL script file to execute during startup
      --init                                       Initialize database schema to the latest version and exit. Can be used
                                                   with --upgrade
      --upgrade                                    Upgrade database to the latest version
      --debug                                      Run in debug mode. Only asynchronous chains will be executed

Resource:
      --cron-workers=                              Number of parallel workers for scheduled chains (default: 16)
      --interval-workers=                          Number of parallel workers for interval chains (default: 16)
      --chain-timeout=                             Abort any chain that takes more than the specified number of
                                                   milliseconds
      --task-timeout=                              Abort any task within a chain that takes more than the specified number
                                                   of milliseconds

REST:
      --rest-port=                                 REST API port (default: 0) [$PGTT_RESTPORT]

J.4.5. Фон проекта #

Проект pg_timetable был запущен еще в 2019 году для внутренних нужд планирования в компании Cybertec.

Для получения дополнительной информации о мотивах проекта и целях проектирования см. оригинальную серию блог-постов, объявляющих о проекте, и последующие обновления функций.

Cybertec также предоставляет коммерческую поддержку с 9 до 5 и 24/7 для pg_timetable.

J.4.5.1. Обратная связь по проекту #

Для запросов на добавление функций или помощи в устранении неполадок, пожалуйста, откройте проблему на странице Github проекта.

J.4.6. Установка #

pg_timetable совместим со всеми поддерживаемыми версиями Tantor SE.

J.4.6.1. Официальные пакеты релизов #

Вы можете найти бинарный пакет для вашей платформы на официальной странице Релизы.

J.4.6.2. Docker #

Официальный образ docker можно найти здесь: https://hub.docker.com/r/cybertecpostgresql/pg_timetable

Примечание

Тег latest актуален с веткой master благодаря этому действию github. В производственной среде вам, вероятно, следует использовать последний стабильный тег.

Запустите pg_timetable в Docker:

docker run --rm \
cybertecpostgresql/pg_timetable:latest \
-h 10.0.0.3 -p 54321 -c worker001

Запустите pg_timetable в Docker с переменными окружения:

docker run --rm \
-e PGTT_PGHOST=10.0.0.3 \
-e PGTT_PGPORT=54321 \
cybertecpostgresql/pg_timetable:latest \
-c worker001

J.4.7. Компоненты #

Планирование в pg_timetable охватывает три различных уровня абстракции, чтобы облегчить повторное использование с другими параметрами или дополнительными расписаниями.

Command:

Базовый уровень, команда, определяет что делать.

Task:

Второй уровень, задача, представляет собой элемент цепочки (шаг) для выполнения одной из команд. С помощью задач мы определяем порядок команд, передаваемые аргументы (если есть), и как обрабатываются ошибки.

Chain:

Третий уровень представляет собой связанные задачи, образующие цепочку задач. Цепочка определяет если, когда и как часто должна выполняться задача.

J.4.7.1. Команда #

В настоящее время существует три различных типа команд:

SQL

SQL фрагмент. Начало очистки, обновление материализованного представления или обработка данных.

PROGRAM

Внешняя команда. Все, что может быть вызвано как внешний бинарный файл, включая оболочки, например, bash, pwsh и т.д. Внешняя команда будет вызвана с использованием exec.CommandContext из golang.

BUILTIN

Внутренняя команда. Предварительно встроенная функциональность, включенная в pg_timetable. К ним относятся:

  • NoOp

  • Sleep

  • Log

  • SendMail

  • Download

  • CopyFromFile

  • CopyToFile

  • CopyFromProgram

  • CopyToProgram

  • Shutdown

J.4.7.2. Задача #

Следующим строительным блоком является задача, которая просто представляет собой шаг в списке цепочки команд. Примером задач, объединенных в цепочку, может быть:

  1. Загрузить файлы с сервера

  2. Импорт файлов

  3. Выполнение агрегаций

  4. Создать отчет

  5. Удалить файлы с диска

Примечание

Все задачи цепочки в pg_timetable выполняются в рамках одной транзакции. Однако, обратите внимание, что нет возможности откатить задачи PROGRAM и BUILTIN.

J.4.7.2.1. Таблица timetable.task #
ПолеТипОписание
chain_idbigintСсылка на цепочку; если NULL, задача считается отключённой
task_orderDOUBLE PRECISIONУказывает порядок задачи внутри цепочки
kindtimetable.command_kindТип команды. Может быть SQL (по умолчанию), PROGRAM или BUILTIN
commandtextСодержит либо SQL-команду, либо путь к приложению, либо имя команды BUILTIN, которая будет выполнена
run_astextРоль, от имени которой должна выполняться задача
database_connectiontextСтрока подключения к внешней базе данных, которая должна использоваться
ignore_errorbooleanУказывает, должна ли следующая задача продолжаться после возникновения ошибки (по умолчанию: false)
autonomousbooleanУказывает, должна ли задача выполняться вне транзакции цепочки. Полезно для VACUUM, CREATE DATABASE, CALL и т.д.
timeoutintegerПрервать любую задачу в цепочке, выполнение которой занимает больше указанного количества миллисекунд

Предупреждение

Если для задачи установлено значение ignore_error в true (по умолчанию используется значение false), рабочий процесс будет сообщать об успешном выполнении даже если задача внутри цепочки завершилась с ошибкой.

Как упоминалось выше, команды являются простыми скелетами (например, отправить электронное письмо, вакуумировать и т.д.). В большинстве случаев они должны быть оживлены путем передачи входных параметров для выполнения.

J.4.7.2.2. Table timetable.parameter #
ПолеТипОписание
task_idbigintИдентификатор задачи
order_idintegerПорядок параметра. Несколько параметров обрабатываются один за другим в соответствии с этим порядком
valuejsonbЗначение JSON, содержащее параметры
J.4.7.2.3. Формат значения параметра #

В зависимости от command аргумент kind может быть представлен разными значениями JSON.

J.4.7.2.3.1. SQL #

Схема: массив

Пример:

'[ "one", 2, 3.14, false ]'::jsonb
J.4.7.2.3.2. PROGRAM #

Schema: массив строк

Пример:

'["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
J.4.7.2.3.3. BUILTIN: Sleep #

Схема: integer

Пример:

'5' :: jsonb
J.4.7.2.3.4. BUILTIN: Log #

Схема: любой

Примеры:

'"WARNING"'::jsonb
'{"Status": "WARNING"}'::jsonb
J.4.7.2.3.5. BUILTIN: SendMail #

Схема: объект

Пример:

'{
    "username":     "user@example.com",
    "password":     "password",
    "serverhost":   "smtp.example.com",
    "serverport":   587,
    "senderaddr":   "user@example.com",
    "ccaddr":       ["recipient_cc@example.com"],
    "bccaddr":      ["recipient_bcc@example.com"],
    "toaddr":       ["recipient@example.com"],
    "subject":      "pg_timetable - No Reply",
    "attachment":   ["/temp/attachments/Report.pdf","config.yaml"],
    "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
    "msgbody":      "<h2>Hello User,</h2> <p>check some attachments!</p>",
    "contenttype":   "text/html; charset=UTF-8"
}'::jsonb
J.4.7.2.3.6. BUILTIN: Download #

Схема: объект

Пример:

'{
    "workersnum": 2, 
    "fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"], 
    "destpath": "."
}'::jsonb
J.4.7.2.3.7. BUILTIN: CopyFromFile #

Схема: объект

Пример:

'{
    "sql": "COPY location FROM STDIN", 
    "filename": "download/orte_ansi.txt" 
}'::jsonb
J.4.7.2.3.8. BUILTIN: CopyToFile #

Схема: объект

Пример:

'{
    "sql": "COPY location TO STDOUT", 
    "filename": "download/location.txt" 
}'::jsonb
J.4.7.2.3.9. BUILTIN: CopyToProgram #

Схема: объект

Пример:

'{
    "sql": "COPY location TO STDOUT",
    "cmd": "sh",
    "args": ["gzip", "-c", ">", "/tmp/output.gz"]
}'::jsonb
J.4.7.2.3.10. BUILTIN: CopyFromProgram #

Схема: объект

Пример:

'{
    "sql": "COPY location FROM STDIN",
    "cmd": "gunzip",
    "args": ["-c", "/tmp/data.gz"]
}'::jsonb
J.4.7.2.3.11. BUILTIN: Shutdown #

значение проигнорировано

J.4.7.2.3.12. BUILTIN: NoOp #

значение проигнорировано

J.4.7.3. Цепочка #

Как только задачи были организованы, их необходимо запланировать как "цепочку". Для этого "pg_timetable" использует расширенную "cron"-строку, при этом добавляя множество вариантов конфигурации.

J.4.7.3.1. Таблица timetable.chain #
ПолеТипОписание
chain_nametextУникальное имя цепочки
run_attimetable.cronСтандартное значение в стиле cron во временной зоне сервера Postgres или выражение @after, @every, @reboot
max_instancesintegerКоличество экземпляров, которые эта цепочка может иметь, работающих одновременно
timeoutintegerПрервать любую цепочку, выполнение которой занимает больше указанного количества миллисекунд
livebooleanУправляет возможностью выполнения цепочки при достижении её расписания
self_destructbooleanСамоуничтожить цепочку после успешного выполнения. Неудачные цепочки будут выполнены по расписанию еще один раз
exclusive_executionbooleanУказывает, должна ли цепочка выполняться эксклюзивно, пока все остальные цепочки приостановлены
client_nametextУказывает, какой клиент должен выполнять цепочку. Установите значение NULL, чтобы разрешить выполнение любому клиенту
timeoutintegerПрервать цепочку, выполнение которой занимает больше указанного количества миллисекунд
on_errorСодержит SQL-запрос, который выполняется в случае возникновения ошибки. Если задача, вызвавшая ошибку, помечена как ignore_error, то ничего не происходит

Примечание

Все цепочки в "pg_timetable" планируются по часовому поясу сервера PostgreSQL. Вы можете изменить часовой пояс для "текущей сессии" при добавлении новых цепочек, например,

SET TIME ZONE 'UTC';

-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');

J.4.8. Схема базы данных #

pg_timetable является приложением, управляемым базой данных. Во время первого запуска создается необходимая схема, если она отсутствует.

J.4.8.1. Основные таблицы и объекты #

CREATE TABLE timetable.chain (
    chain_id            BIGSERIAL   PRIMARY KEY,
    chain_name          TEXT        NOT NULL UNIQUE,
    run_at              timetable.cron,
    max_instances       INTEGER,
    timeout             INTEGER     DEFAULT 0,
    live                BOOLEAN     DEFAULT FALSE,
    self_destruct       BOOLEAN     DEFAULT FALSE,
    exclusive_execution BOOLEAN     DEFAULT FALSE,
    client_name         TEXT,
    on_error            TEXT
);

COMMENT ON TABLE timetable.chain IS
    'Stores information about chains schedule';
COMMENT ON COLUMN timetable.chain.run_at IS
    'Extended CRON-style time notation the chain has to be run at';
COMMENT ON COLUMN timetable.chain.max_instances IS
    'Number of instances (clients) this chain can run in parallel';
COMMENT ON COLUMN timetable.chain.timeout IS
    'Abort any chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.chain.live IS
    'Indication that the chain is ready to run, set to FALSE to pause execution';
COMMENT ON COLUMN timetable.chain.self_destruct IS
    'Indication that this chain will delete itself after successful run';
COMMENT ON COLUMN timetable.chain.exclusive_execution IS
    'All parallel chains should be paused while executing this chain';
COMMENT ON COLUMN timetable.chain.client_name IS
    'Only client with this name is allowed to run this chain, set to NULL to allow any client';    

CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN');

CREATE TABLE timetable.task (
    task_id             BIGSERIAL               PRIMARY KEY,
    chain_id            BIGINT                  REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE SCADE,
    task_order          DOUBLE PRECISION        NOT NULL,
    task_name           TEXT,
    kind                timetable.command_kind  NOT NULL DEFAULT 'SQL',
    command             TEXT                    NOT NULL,
    run_as              TEXT,
    database_connection TEXT,
    ignore_error        BOOLEAN                 NOT NULL DEFAULT FALSE,
    autonomous          BOOLEAN                 NOT NULL DEFAULT FALSE,
    timeout             INTEGER                 DEFAULT 0
);          

COMMENT ON TABLE timetable.task IS
    'Holds information about chain elements aka tasks';
COMMENT ON COLUMN timetable.task.chain_id IS
    'Link to the chain, if NULL task considered to be disabled';
COMMENT ON COLUMN timetable.task.task_order IS
    'Indicates the order of task within a chain';    
COMMENT ON COLUMN timetable.task.run_as IS
    'Role name to run task as. Uses SET ROLE for SQL commands';
COMMENT ON COLUMN timetable.task.ignore_error IS
    'Indicates whether a next task in a chain can be executed regardless of the success of the current one';
COMMENT ON COLUMN timetable.task.kind IS
    'Indicates whether "command" is SQL, built-in function or an external program';
COMMENT ON COLUMN timetable.task.command IS
    'Contains either an SQL command, or command string to be executed';
COMMENT ON COLUMN timetable.task.timeout IS
    'Abort any task within a chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.task.autonomous IS
    'Specify if the task should be executed out of the chain transaction. Useful for VACUUM, CREATE DATABASE, CALL etc.';

-- parameter passing for a chain task
CREATE TABLE timetable.parameter(
    task_id     BIGINT  REFERENCES timetable.task(task_id)
                        ON UPDATE CASCADE ON DELETE CASCADE,
    order_id    INTEGER CHECK (order_id > 0),
    value       JSONB,
    PRIMARY KEY (task_id, order_id)
);

COMMENT ON TABLE timetable.parameter IS
    'Stores parameters passed as arguments to a chain task';

CREATE UNLOGGED TABLE timetable.active_session(
    client_pid  BIGINT  NOT NULL,
    server_pid  BIGINT  NOT NULL,
    client_name TEXT    NOT NULL,
    started_at  TIMESTAMPTZ DEFAULT now()
);

COMMENT ON TABLE timetable.active_session IS
    'Stores information about active sessions';

CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER');

CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
$$
    SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1
$$
LANGUAGE sql;

CREATE TABLE timetable.log
(
    ts              TIMESTAMPTZ         DEFAULT now(),
    pid             INTEGER             NOT NULL,
    log_level       timetable.log_type  NOT NULL,
    client_name     TEXT                DEFAULT timetable.get_client_name(pg_backend_pid()),
    message         TEXT,
    message_data    jsonb
);

COMMENT ON TABLE timetable.log IS
    'Stores log entries of active sessions';

CREATE TABLE timetable.execution_log (
    chain_id        BIGINT,
    task_id         BIGINT,
    txid            BIGINT NOT NULL,
    last_run        TIMESTAMPTZ DEFAULT now(),
    finished        TIMESTAMPTZ,
    pid             BIGINT,
    returncode      INTEGER,
    ignore_error    BOOLEAN,
    kind            timetable.command_kind,
    command         TEXT,
    output          TEXT,
    client_name     TEXT        NOT NULL
);

COMMENT ON TABLE timetable.execution_log IS
    'Stores log entries of executed tasks and chains';

CREATE UNLOGGED TABLE timetable.active_chain(
    chain_id    BIGINT  NOT NULL,
    client_name TEXT    NOT NULL,
    started_at  TIMESTAMPTZ DEFAULT now()
);

COMMENT ON TABLE timetable.active_chain IS
    'Stores information about active chains within session';

CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
BEGIN
    IF pg_is_in_recovery() THEN
        RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node';
        RETURN FALSE;
    END IF;
    -- remove disconnected sessions
    DELETE
        FROM timetable.active_session
        WHERE server_pid NOT IN (
            SELECT pid
            FROM pg_catalog.pg_stat_activity
            WHERE application_name = 'pg_timetable'
        );
    DELETE 
        FROM timetable.active_chain 
        WHERE client_name NOT IN (
            SELECT client_name FROM timetable.active_session
        );
    -- check if there any active sessions with the client name but different client pid
    PERFORM 1
        FROM timetable.active_session s
        WHERE
            s.client_pid <> worker_pid
            AND s.client_name = worker_name
        LIMIT 1;
    IF FOUND THEN
        RAISE NOTICE 'Another client is already connected to server with name: %', worker_name;
        RETURN FALSE;
    END IF;
    -- insert current session information
    INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, backend_pid());
    RETURN TRUE;
END;
$CODE$
STRICT
LANGUAGE plpgsql;

J.4.8.4. ER-Диаграмма #

ER-Diagram

J.4.9. Начало работы #

Разнообразные примеры можно найти в примерах. Если вы хотите перейти с другого планировщика, вы можете воспользоваться скриптами из главы миграции.

J.4.9.1. Добавить простое задание #

В реальном мире обычно достаточно использовать простые задания. Под этим термином мы понимаем:

  • задание — это цепочка, содержащая только одну задачу (шаг);

  • он не использует сложную логику, а скорее простую "команду;

  • это не требует сложной обработки транзакций, поскольку одна задача неявно выполняется как одна транзакция.

Для такой группы цепочек мы ввели специальную функцию timetable.add_job().

J.4.9.1.1. Функция: timetable.add_job() #

Создает простую цепочку из одной задачи

Возвращает: BIGINT

J.4.9.1.1.1. Параметры #
ПараметрТипОписаниеПо умолчанию
job_nametextУникальное имя для "цепочки" и "команды"Обязательно
job_scheduletimetable.cronРасписание времени в синтаксисе cron по часовому поясу сервера PostgresОбязательно
job_commandtextSQL-запрос, который будет выполненОбязательно
job_parametersjsonbАргументы для цепочки "commandNULL
job_kindtimetable.command_kindТип команды: SQL, PROGRAM или BUILTINSQL
job_client_nametextУказывает, какой клиент должен выполнять цепочку. Установите значение NULL, чтобы разрешить выполнение любому клиентуNULL
job_max_instancesintegerКоличество экземпляров, которые эта цепочка может выполнять одновременноNULL
job_livebooleanУправляет возможностью выполнения цепочки после достижения ею расписанияTRUE
job_self_destructbooleanСамоуничтожить цепочку после выполненияFALSE
job_ignore_errorsbooleanИгнорировать ошибку во время выполненияTRUE
job_exclusivebooleanВыполнять цепочку в эксклюзивном режимеFALSE

Возвращает: идентификатор созданной цепочки

J.4.9.2. Примеры #

  1. Выполнять public.my_func() в 00:05 каждый день в августе по часовому поясу сервера Postgres:

    SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');
    
  2. Запускать VACUUM на 23-й минуте каждого второго часа с 0 до 20 каждый день по часовому поясу сервера Postgres:

    SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');
    
  3. Обновлять материализованное представление каждые 2 часа:

    SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');
    
  4. Очистить таблицу журнала после перезапуска pg_timetable:

    SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');
    
  5. Переиндексация в полночь по времени сервера Postgres по воскресеньям с помощью утилиты reindexdb:

    • используется база данных по умолчанию под пользователем по умолчанию (без аргументов командной строки)

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');
      
    • указание целевой базы данных и таблиц, и быть подробным

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', 
          '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');
      
    • передача пароля с помощью переменной окружения через оболочку bash

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'bash', 
          '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb, 
          'PROGRAM');
      

J.4.10. Примеры #

J.4.10.1. Основы #

Этот пример демонстрирует, как создать базовую одноступенчатую цепочку с параметрами. Он использует CTE для прямого обновления таблиц схемы timetable.

SELECT timetable.add_job(
    job_name            => 'notify every minute',
    job_schedule        => '* * * * *',
    job_command         => 'SELECT pg_notify($1, $2)',
    job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
    job_kind            => 'SQL'::timetable.command_kind,
    job_client_name     => NULL,
    job_max_instances   => 1,
    job_live            => TRUE,
    job_self_destruct   => FALSE,
    job_ignore_errors   => TRUE
) as chain_id;

J.4.10.2. Отправить электронное письмо #

Этот пример демонстрирует, как создать расширенную задачу для отправки электронной почты. Она проверит, есть ли письма для отправки, отправит их и зафиксирует статус выполнения команды. Вам не нужно ничего настраивать, каждый параметр можно указать при создании цепочки.

DO $$
    -- An example for using the SendMail task.
DECLARE
    v_mail_task_id bigint;
    v_log_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Get the chain id
    INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Add SendMail task
    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
    RETURNING task_id INTO v_mail_task_id;

    -- Create the parameters for the SensMail task
        -- "username":	      The username used for authenticating on the mail server
        -- "password":        The password used for authenticating on the mail server
        -- "serverhost":      The IP address or hostname of the mail server
        -- "serverport":      The port of the mail server
        -- "senderaddr":      The email that will appear as the sender
        -- "ccaddr":	      String array of the recipients(Cc) email addresses
        -- "bccaddr":	      String array of the recipients(Bcc) email addresses
        -- "toaddr":          String array of the recipients(To) email addresses
        -- "subject":	      Subject of the email
        -- "attachment":      String array of the attachments (local file)
        -- "attachmentdata":  Pairs of name and base64-encoded content
        -- "msgbody":	      The body of the email

    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_mail_task_id, 1, '{
                "username":     "user@example.com",
                "password":     "password",
                "serverhost":   "smtp.example.com",
                "serverport":   587,
                "senderaddr":   "user@example.com",
                "ccaddr":       ["recipient_cc@example.com"],
                "bccaddr":      ["recipient_bcc@example.com"],
                "toaddr":       ["recipient@example.com"],
                "subject":      "pg_timetable - No Reply",
                "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
                "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
                "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
                "contenttype":  "text/html; charset=UTF-8"
                }'::jsonb);
    
    -- Add Log task and make it the last task using `task_order` column (=30)
    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
    RETURNING task_id INTO v_log_task_id;

    -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
    -- Since we're using special add_task() function we don't need to specify the `chain_id`.
    -- Function will take the same `chain_id` from the parent task, SendMail in this particular case
    PERFORM timetable.add_task(
        kind => 'SQL', 
        parent_id => v_mail_task_id,
        command => format(
$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
INSERT INTO timetable.parameter (task_id, order_id, value) 
SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
FROM sent_mail
ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$, 
                v_mail_task_id, v_log_task_id
            ),
        order_delta => 10
    );

-- In the end we should have something like this. Note, that even Log task was created earlier it will be executed later
-- due to `task_order` column.

-- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order;  
--  task_id | chain_id | task_order |  kind   |                             left
-- ---------+----------+------------+---------+---------------------------------------------------------------
--       45 |       24 |         10 | BUILTIN | SendMail
--       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
--       46 |       24 |         30 | BUILTIN | Log
-- (3 rows)

END;
$$
LANGUAGE PLPGSQL;

J.4.10.3. Загрузка, Преобразование и Импорт #

Этот пример демонстрирует, как создать расширенную трехшаговую цепочку с параметрами. Он использует оператор DO для прямого обновления таблиц схемы timetable.

-- Prepare the destination table 'location'
CREATE TABLE IF NOT EXISTS public.city(
    city text,
    lat numeric,
    lng numeric,
    country text,
    iso2 text,
    admin_name text,
    capital text,
    population bigint,
    population_proper bigint);
   
GRANT ALL ON public.city TO scheduler;

-- An enhanced example consisting of three tasks:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension) 
-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
DO $$
DECLARE
    v_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live)
    VALUES ('Download locations and aggregate', TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Download file from the server
    -- Create the chain
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the step 1:
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, 
           '{
                "workersnum": 1,
                "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"], 
                "destpath": "."
            }'::jsonb);
    
    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;

    -- Step 2. Transform Unicode characters into ASCII
    -- Create the program task to call 'uconv' and name it 'unaccent'
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the 'unaccent' task. Input and output files in this case
    -- Under Windows we should call PowerShell instead of "uconv" with command:
    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);

    RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id;

    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
        VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
    RETURNING task_id INTO v_task_id;

    -- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);

    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;

    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
    RETURNING task_id INTO v_task_id;

    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
   
   RAISE NOTICE 'Step 4 completed. Cleanup task added with ID: %', v_task_id;
END;
$$ LANGUAGE PLPGSQL;

J.4.10.4. Выполнение задач в автономной транзакции #

Этот пример демонстрирует, как выполнять специальные задачи вне контекста транзакции цепочки. Это полезно для специальных процедур и/или нетранзакционных операций, например, CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE и т.д.

-- An advanced example showing how to use atutonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks 
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands, 
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN 
    RAISE notice '%', msg; 
END;
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
    PERFORM f('hey 1');
    COMMIT;
    PERFORM f('hey 2');
    COMMIT;
END;
$$
LANGUAGE PLPGSQL;

WITH
    cte_chain (v_chain_id) AS (
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct) 
        VALUES (
            'call proc() every 10 sec', -- chain_name, 
            '@every 10 seconds',        -- run_at,
            1,     -- max_instances, 
            TRUE,  -- live, 
            FALSE -- self_destruct
        ) RETURNING chain_id
    ),
    cte_task(v_task_id) AS (
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
        SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;

J.4.10.5. Завершить работу планировщика и завершить сеанс #

Этот пример демонстрирует, как завершить работу планировщика с помощью специальной встроенной задачи. Это может быть использовано для управления окнами обслуживания, для перезапуска планировщика с целью обновления или для остановки сессии перед удалением базы данных.

-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintaining purposes or before database being destroyed.
-- One should take care of restarting pg_timetable if needed.

SELECT timetable.add_job (
    job_name     => 'Shutdown pg_timetable session on schedule',
    job_schedule => '* * 1 * *',
    job_command  => 'Shutdown',
    job_kind     => 'BUILTIN'
);

J.4.10.6. Доступ к коду и результату предыдущей задачи из следующей задачи #

Этот пример демонстрирует, как проверить код результата и вывод предыдущей задачи. Если последняя задача завершилась с ошибкой, это возможно только если для этой задачи установлено ignore_error boolean = true. В противном случае планировщик остановит цепочку. Этот пример показывает, как вычислить количество неудачных, успешных и общее число выполненных задач. На основе этих значений можно рассчитать коэффициент успешности.

WITH 
    cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live) 
        VALUES ('many tasks', '* * * * *', 1, true)
        RETURNING chain_id
    ),
    cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
        SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
        FROM cte_chain, generate_series(1, 500) AS g(s)
        RETURNING task_id
    ),
    report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
        INSERT INTO timetable.task (chain_id, task_order, kind, command)
        SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
    s TEXT;
BEGIN
    WITH report AS (
        SELECT 
        count(*) FILTER (WHERE returncode = 0) AS success,
        count(*) FILTER (WHERE returncode != 0) AS fail,
        count(*) AS total
        FROM timetable.execution_log 
        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
          AND txid = txid_current()
    )
    SELECT 'Tasks executed:' || total || 
         '; succeeded: ' || success || 
         '; failed: ' || fail || 
         '; ratio: ' || 100.0*success/GREATEST(total,1)
    INTO s
    FROM report;
    RAISE NOTICE '%', s;
END;
$$
$CMD$
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id FROM cte_chain

J.4.11. Руководство по настройке цепочки YAML #

В этом руководстве объясняется, как использовать YAML-файлы для определения цепочек pg_timetable в качестве альтернативы конфигурации на основе SQL.

J.4.11.1. Обзор #

Определения цепочек в YAML предоставляют человекочитаемый способ создания цепочек запланированных задач без написания SQL. Преимущества включают:

  • Создание сложных многоэтапных рабочих процессов с четкой структурой

  • Контроль версий ваших конфигураций цепочек

  • Легкий просмотр и изменение запланированных задач

  • Совместное использование шаблонов цепочек между средами

J.4.11.2. Основное использование #

# Load YAML chains
pg_timetable --file chains.yaml postgresql://user:pass@host/db

# Validate YAML without importing
pg_timetable --file chains.yaml --validate

# Replace existing chains with same names
pg_timetable --file chains.yaml --replace postgresql://user:pass@host/db

J.4.11.3. YAML Format #

J.4.11.3.1. Базовая структура #
chains:
  - name: "chain-name"                 # Required: unique identifier
    schedule: "* * * * *"              # Required: cron format
    live: true                         # Optional: enable/disable chain
    max_instances: 1                   # Optional: max parallel executions
    timeout: 30000                     # Optional: timeout in milliseconds
    self_destruct: false               # Optional: delete after success
    exclusive: false                   # Optional: pause other chains while running
    client_name: "worker-1"            # Optional: restrict to specific client
    on_error: "SELECT log_error($1)"   # Optional: error handling SQL
    tasks:                             # Required: array of tasks
      - name: "task-name"              # Optional: task description
        kind: "SQL"                    # Optional: SQL, PROGRAM, or BUILTIN
        command: "SELECT now()"        # Required: command to execute
        run_as: "postgres"             # Optional: role for SET ROLE
        connect_string: "postgresql://user@host/otherdb"  # Optional: different database connection
        ignore_error: false            # Optional: continue on error
        autonomous: false              # Optional: run outside transaction
        timeout: 5000                  # Optional: task timeout in ms
        parameters:                    # Optional: task parameters, each entry causes separate execution
          - ["value1", 42]             # Parameters for SQL tasks are arrays of values
J.4.11.3.2. Параметры задачи #

Каждая задача может иметь несколько записей параметров, при этом каждая запись вызывает отдельное выполнение:

# SQL task parameters (arrays of values)
- name: "sql-task"
  kind: "SQL"
  command: "SELECT $1, $2, $3, $4"
  parameters:
    - ["one", 2, 3.14, false]    # First execution
    - ["two", 4, 6.28, true]     # Second execution

# PROGRAM task parameters (arrays of command-line arguments)
- name: "program-task" 
  kind: "PROGRAM"
  command: "iconv"
  parameters:
    - ["-x", "Latin-ASCII", "-o", "file1.txt", "input1.txt"]
    - ["-x", "UTF-8", "-o", "file2.txt", "input2.txt"]

# BUILTIN: Sleep task (integer values)
- name: "sleep-task"
  kind: "BUILTIN"
  command: "Sleep"
  parameters:
    - 5    # Sleep for 5 seconds
    - 10   # Then sleep for 10 seconds

# BUILTIN: Log task (string or object values)
- name: "log-task"
  kind: "BUILTIN"
  command: "Log"
  parameters:
    - "WARNING: Simple message"
    - level: "WARNING"
      details: "Object message"

# BUILTIN: SendMail task (complex object)
- name: "mail-task"
  kind: "BUILTIN"
  command: "SendMail"
  parameters:
    - username: "user@example.com"
      password: "password123"
      serverhost: "smtp.example.com"
      serverport: 587
      senderaddr: "user@example.com"
      toaddr: ["recipient@example.com"]
      subject: "Notification"
      msgbody: "<p>Hello User</p>"
      contenttype: "text/html; charset=UTF-8"
J.4.11.3.3. Примеры #
J.4.11.3.3.1. Простая SQL задача #
chains:
  - name: "daily-cleanup"
    schedule: "0 2 * * *"  # 2 AM daily
    live: true
    
    tasks:
      - name: "vacuum-tables"
        command: "VACUUM ANALYZE"
J.4.11.3.3.2. Многошаговая цепочка #
chains:
  - name: "data-pipeline"
    schedule: "0 1 * * *"  # 1 AM daily
    live: true
    max_instances: 1
    timeout: 7200000  # 2 hours
    
    tasks:
      - name: "extract"
        command: |
          CREATE TEMP TABLE temp_data AS
          SELECT * FROM source_table 
          WHERE date >= CURRENT_DATE - INTERVAL '1 day'
          
      - name: "validate"
        command: |
          DO $$
          BEGIN
            IF (SELECT COUNT(*) FROM temp_data) = 0 THEN
              RAISE EXCEPTION 'No data to process';
            END IF;
          END $$
          
      - name: "transform"
        command: "CALL transform_data_procedure()"
        autonomous: true
        
      - name: "load"
        command: "INSERT INTO target_table SELECT * FROM temp_data"
J.4.11.3.3.3. Задачи программы #
chains:
  - name: "backup-job"
    schedule: "0 3 * * 0"  # Sunday 3 AM
    live: true
    client_name: "backup-worker"
    
    tasks:
      - name: "database-backup"
        kind: "PROGRAM"
        command: "pg_dump"
        parameters:
          - ["-h", "localhost", "-U", "postgres", "-d", "mydb", "-f", "/backups/mydb.sql"]
        timeout: 3600000  # 1 hour
        
      - name: "compress-backup"
        kind: "PROGRAM" 
        command: "gzip"
        parameters: 
          - ["/backups/mydb.sql"]
J.4.11.3.3.4. Несколько цепочек в одном файле #
chains:
  # Monitoring chain
  - name: "health-check"
    schedule: "*/15 * * * *"  # Every 15 minutes
    live: true
    
    tasks:
      - command: "SELECT check_database_health()"
      
  # Cleanup chain  
  - name: "hourly-cleanup"
    schedule: "0 * * * *"  # Every hour
    live: true
    
    tasks:
      - command: "DELETE FROM logs WHERE created_at < now() - interval '7 days'"

J.4.11.4. Расширенные возможности #

J.4.11.4.1. Обработка ошибок #

Управление поведением при ошибках с помощью ignore_error и on_error:

chains:
  - name: "resilient-chain"
    on_error: |
      SELECT pg_notify('monitoring', 
            format('{"ConfigID": %s, "Message": "Something bad happened"}', 
                current_setting('pg_timetable.current_chain_id')::bigint))
    
    tasks:
      - name: "risky-task"
        command: "SELECT might_fail()"
        ignore_error: true  # Continue chain execution even if this task fails
        
      - name: "cleanup-task"
        command: "SELECT cleanup()"  # Always runs, even if previous task failed
J.4.11.4.2. Управление транзакциями #

Используйте autonomous: true для задач, которые необходимо выполнять вне основной транзакции:

tasks:
  - name: "vacuum-task"
    command: "VACUUM FULL heavy_table"
    autonomous: true  # Required for VACUUM FULL
    
  - name: "create-database"
    command: "CREATE DATABASE new_db"
    autonomous: true  # CREATE DATABASE requires autonomous transaction
J.4.11.4.3. Удалённые базы данных #

Выполнение задач в разных базах данных:

tasks:
  - name: "cross-database-task"
    command: "SELECT sync_data()"
    connect_string: "postgresql://user:pass@other-host/other-db"

J.4.11.5. Валидация #

YAML-файлы проверяются при загрузке:

  • Синтаксис: Допустимый формат YAML

  • Структура: Обязательные поля присутствуют

  • Cron: Допустимые 5-полюсные cron-выражения

  • Виды задач: Должно быть SQL, PROGRAM или BUILTIN

  • Тайм-ауты: неотрицательные целые числа

Используйте --validate, чтобы проверить файлы без импорта:

pg_timetable --file chains.yaml --validate

J.4.11.6. Миграция с SQL #

J.4.11.6.1. Преобразование существующих цепочек #

Чтобы преобразовать SQL-цепочки в YAML:

  1. Информация о цепочке запросов и задачах:

    SELECT *
    FROM timetable.chain c 
    WHERE c.chain_name = 'my-chain';
    
    SELECT t.*
    FROM timetable.task t JOIN 
         timetable.chain c ON t.chain_id = c.chain_id AND c.chain_name = 'my-chain'
    ORDER BY t.task_order;
    
  2. Преобразовать в формат YAML:

    • chain_namename

    • run_atschedule

    • livelive

    • max_instancesmax_instances

    • Поля задачи отображаются напрямую

  3. Проверьте преобразование:

    pg_timetable --file converted.yaml --validate
    
J.4.11.6.2. Пример миграции #

Исходный SQL:

SELECT timetable.add_job(
    job_name => 'daily-report',
    job_schedule => '0 9 * * *',
    job_command => 'CALL generate_report()',
    job_live => TRUE
);

Преобразованный YAML:

chains:
  - name: "daily-report"
    schedule: "0 9 * * *"
    live: true
    
    tasks:
      - command: "CALL generate_report()"

J.4.11.7. Лучшие практики #

J.4.11.7.1. Соглашения об именовании #
  • Используйте описательные имена в kebab-case

  • Включайте среду в имя для ясности

  • Группируйте связанные цепочки в одном файле

J.4.11.7.2. Документация #
  • Используйте комментарии YAML для документирования сложной логики

  • Включайте назначение и зависимости в имена задач

  • Задокументируйте значения параметров

chains:
  - name: "etl-sales-data"
    # Processes daily sales data from external API
    # Depends on: external API availability, sales_raw table
    schedule: "0 2 * * *"
    
    tasks:
      - name: "extract-from-api"
        # Fetches last 24h of sales data from REST API
        command: "SELECT fetch_sales_data($1)"
        parameters: ["yesterday"]
J.4.11.7.3. Тестирование #
  • Всегда проверяйте YAML перед развертыванием

  • Тест с флагом --validate

  • Используйте неактивные цепочки для тестирования

  • Храните резервные копии рабочих конфигураций

J.4.11.7.4. Управление версиями #
  • Храните YAML-файлы в системе управления версиями

  • Используйте содержательные сообщения фиксаций

  • Тегированные релизы для производственных внедрений

  • Проверьте изменения перед слиянием

J.4.11.8. Устранение неполадок #

J.4.11.8.1. Распространённые проблемы #

Недопустимый синтаксис YAML:

Error: failed to parse YAML: yaml: line 5: found character that cannot start any token

→ Проверьте отступы и кавычки

Недопустимый формат cron:

Error: invalid cron format: 0 9 * * (expected 5 fields)

→ Убедитесь, что в cron ровно 5 полей

Цепочка уже существует:

Error: chain 'my-chain' already exists (use --replace flag to overwrite)

→ Используйте флаг --replace или выберите другое имя

Отсутствуют обязательные поля:

Error: chain 1: chain name is required

→ Проверьте, что все обязательные поля присутствуют

J.4.12. Миграция с других планировщиков #

J.4.12.1. Перенос заданий из pg_cron в pg_timetable #

Если вы хотите быстро экспортировать задания, запланированные из pg_cron в pg_timetable, вы можете использовать этот фрагмент SQL:

SELECT timetable.add_job(
    job_name            => COALESCE(jobname, 'job: ' || command),
    job_schedule        => schedule,
    job_command         => command,
    job_kind            => 'SQL',
    job_live            => active
) FROM cron.job;

timetable.add_job(), однако, имеет некоторые ограничения. Прежде всего, функция пометит созданную задачу как автономную, указывая, что планировщик должен выполнять задачу вне цепочки транзакций. Это не ошибка, но множество автономных цепочек может привести к использованию дополнительных соединений.

Во-вторых, параметры подключения к базе данных теряются для исходных pg_cron задач, делая все задачи локальными. Чтобы экспортировать всю доступную информацию как можно точнее, используйте этот фрагмент SQL под ролью, в которой они были запланированы в pg_cron:

SET ROLE 'scheduler'; -- set the role used by pg_cron

WITH cron_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
        jobname,
        schedule,
        active,
        command,
        CASE WHEN 
            database != current_database()
            OR nodename != 'localhost'
            OR username != CURRENT_USER
            OR nodeport != inet_server_port() 
        THEN
            format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
        END AS connstr
    FROM
        cron.job
),
cte_chain AS (
 INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
     SELECT 
         cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
     FROM
         cron_chain
),
cte_tasks AS (
 INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
     SELECT
         cron_id, 1, 'SQL', command, connstr
     FROM
         cron_chain
     RETURNING
         chain_id, task_id
)
SELECT * FROM cte_tasks;

J.4.12.2. Перенос заданий из pgAgent в pg_timetable #

Чтобы перенести задания из pgAgent, пожалуйста, используйте этот скрипт. pgAgent не имеет концепции задачи PROGRAM, поэтому для эмуляции шагов BATCH, pg_timetable будет выполнять их внутри оболочки. Вы можете изменить оболочку, отредактировав предложение CTE cte_shell.

CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
 SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i 
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;


WITH
cte_shell(shell, cmd_param) AS (
 VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
 SELECT
     s.jscjobid,
     s.jscname,
     format('%s %s %s %s %s', 
         bool_array_to_cron(s.jscminutes), 
         bool_array_to_cron(s.jschours), 
         bool_array_to_cron(s.jscmonthdays), 
         bool_array_to_cron(s.jscmonths, 1), 
         bool_array_to_cron(s.jscweekdays, 1)) AS schedule
 FROM 
     pgagent.pga_schedule s  
 WHERE s.jscenabled 
         AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
         AND now() > s.jscstart
),
pga_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
        jobid,
        format('%s @ %s', jobname, jscname) AS jobname,
        jobhostagent,
        jobenabled,
        schedule
    FROM
        pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
 INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
     SELECT 
         chain_id, jobname, jobhostagent, schedule, jobenabled
     FROM
         pga_chain
),
pga_step AS (
 SELECT 
     c.chain_id,
     nextval('timetable.task_task_id_seq'::regclass) AS task_id,
     rank() OVER (ORDER BY jstname) AS jstorder,
     jstid,
     jstname,
     jstenabled,
     CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
     jstcode,
     COALESCE(
         NULLIF(jstconnstr, ''), 
         CASE 
             WHEN jstdbname = current_database() THEN NULL
             WHEN jstdbname > '' THEN 'dbname=' || jstdbname 
         END
     ) AS jstconnstr,
     jstonerror != 'f' AS jstignoreerror
 FROM
     pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
 INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection)
     SELECT
         task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind, 
         CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
         jstconnstr
     FROM
         pga_step, cte_shell sh
),
cte_parameters AS (
 INSERT INTO timetable.parameter (task_id, order_id, value)
     SELECT 
         task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
     FROM
         pga_step s, cte_shell sh
     WHERE 
         s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;

J.4.13. REST API #

pg_timetable имеет богатый REST API, который может использоваться внешними инструментами для выполнения запуска/остановки/инициализации/перезапуска/перезагрузки, любыми инструментами для выполнения HTTP-проверок состояния, и, конечно, также может использоваться для мониторинга.

Ниже вы найдете список pg_timetable REST API конечных точек.

J.4.13.1. Точки проверки состояния #

GET /liveness

Всегда возвращает код состояния HTTP 200, указывая на то, что pg_timetable работает.

GET /readiness

Возвращает HTTP статус-код 200, когда pg_timetable работает, и планировщик находится в основном цикле обработки цепочек. Если планировщик подключается к базе данных, создает схему базы данных или обновляет ее, он вернет HTTP статус-код 503.

J.4.13.2. Управление цепочкой конечные точки #

GET /startchain?id=<chain-id>

Возвращает HTTP статус-код 200, если цепочка с данным идентификатором может быть добавлена в очередь рабочих. Однако это не означает, что выполнение цепочки начнется немедленно. Рабочий процесс должен выполнить проверку нагрузки и другие проверки перед началом выполнения цепочки. В случае ошибки возвращается HTTP статус-код 400 с последующим сообщением об ошибке.

GET /stopchain?id=<chain-id>

Возвращает HTTP статус-код 200, если цепочка с данным идентификатором работает в данный момент и может быть остановлена. Если цепочка выполняется, сигнал отмены будет отправлен немедленно. В случае ошибки возвращается HTTP статус-код 400 с сообщением об ошибке.

J.4.14. Формат определения цепочки YAML для pg_timetable #

Этот документ определяет формат YAML для описания цепочек запланированных задач в pg_timetable.

J.4.14.1. YAML-схема #

# Top-level structure
chains:
  - name: "chain-name"                        # Required: chain_name (TEXT, unique)
    schedule: "* * * * *"                     # Required: run_at (cron format)
    live: true                                # Optional: live (BOOLEAN), default: false
    max_instances: 1                          # Optional: max_instances (INTEGER)
    timeout: 30000                            # Optional: timeout in milliseconds (INTEGER)
    self_destruct: false                      # Optional: self_destruct (BOOLEAN), default: false
    exclusive: false                          # Optional: exclusive_execution (BOOLEAN), default: false  
    client_name: "worker-1"                   # Optional: client_name (TEXT)
    on_error: "SELECT log_error()"            # Optional: on_error SQL (TEXT)
    
    tasks:                                                # Required: array of tasks
      - name: "task-1"                                    # Optional: task_name (TEXT)
        kind: "SQL"                                       # Optional: kind (SQL|PROGRAM|BUILTIN), default: SQL
        command: "SELECT $1, $2"                          # Required: command (TEXT)
        parameters:                                       # Optional: parameters (array of execution parameters)
          - ["value1", 42]                                # First execution with these parameters
          - ["value2", 99]                                # Second execution with different parameters
        run_as: "postgres"                                # Optional: run_as (TEXT) - role for SET ROLE
        connect_string: "postgresql://user@host/otherdb"  # Optional: database_connection (TEXT)
        ignore_error: false                               # Optional: ignore_error (BOOLEAN), default: false
        autonomous: false                                 # Optional: autonomous (BOOLEAN), default: false
        timeout: 5000                                     # Optional: timeout in milliseconds (INTEGER)
        
      - name: "task-2"
        kind: "PROGRAM"
        command: "bash"
        parameters: ["-c", "echo hello"]
        ignore_error: true

J.4.14.2. Соответствие полей #

J.4.14.2.1. Уровень цепочки #
Поле YAMLСтолбец БДТипПо умолчаниюОписание
namechain_nameTEXTобязательноУникальный идентификатор цепочки
schedulerun_atcronобязательноРасписание в стиле cron
liveliveBOOLEANfalseАктивна ли цепочка
max_instancesmax_instancesINTEGERnullМаксимальное количество параллельных экземпляров
timeouttimeoutINTEGER0Тайм-аут цепочки (мс)
self_destructself_destructBOOLEANfalseУдалить после успешного выполнения
exclusiveexclusive_executionBOOLEANfalseПриостанавливать другие цепочки
client_nameclient_nameТЕКСТnullОграничить для конкретного клиента
on_erroron_errorТЕКСТnullSQL для обработки ошибок
J.4.14.2.2. Уровень задачи #
Поле YAMLСтолбец БДТипПо умолчаниюОписание
nametask_nameTEXTnullОписание задачи
kindkindENUM'SQL'Тип команды (SQL/PROGRAM/BUILTIN)
commandcommandTEXTобязательноКоманда для выполнения
параметрычерез timetable.parameterМассив любых значенийnullМассив значений параметров, хранящихся как отдельные строки JSONB с order_id
run_asrun_asTEXTnullРоль для SET ROLE
connect_stringdatabase_connectionTEXTnullСтрока подключения
ignore_errorignore_errorBOOLEANfalseПродолжать при ошибке
autonomousautonomousBOOLEANfalseВыполнять вне транзакции
timeouttimeoutINTEGER0Тайм-аут задачи (мс)

J.4.14.3. Порядок выполнения задач #

Задачи упорядочены последовательно внутри цепочки на основе их позиции в массиве. Система автоматически присвоит соответствующие значения task_order с интервалами (например, 10, 20, 30), чтобы обеспечить возможность будущих вставок.

J.4.14.4. Примеры #

J.4.14.4.1. Простая SQL задача #
chains:
  - name: "daily-report"
    schedule: "0 9 * * *"  # 9 AM daily
    live: true
    tasks:
      - name: "generate-report"
        command: "CALL generate_daily_report()"
J.4.14.4.2. Многозадачная цепочка #
chains:
  - name: "etl-pipeline"
    schedule: "0 2 * * *"  # 2 AM daily
    live: true
    max_instances: 1
    timeout: 3600000  # 1 hour
    
    tasks:
      - name: "extract-data"
        command: "SELECT extract_sales_data($1)"
        parameters: ["2023-01-01"]
        
      - name: "transform-data"  
        command: "CALL transform_sales_data()"
        autonomous: true
        
      - name: "load-data"
        command: "CALL load_to_warehouse()"
        ignore_error: false
J.4.14.4.3. Программная задача #
chains:
  - name: "backup-job"
    schedule: "0 3 * * 0"  # Sunday 3 AM
    live: true
    
    tasks:
      - name: "pg-dump"
        kind: "PROGRAM"
        command: "pg_dump"
        parameters: 
          - ["-h", "localhost", "-U", "postgres", "-d", "mydb", "-f", "/backups/mydb.sql"]

J.4.14.5. Правила проверки #

  1. Обязательные поля: name, schedule, tasks и command для каждой задачи

  2. Уникальные имена: Имена цепочек должны быть уникальными в пределах базы данных

  3. Допустимый Cron: Расписание должно быть в допустимом формате cron (5 полей)

  4. Допустимый тип: Тип задачи должен быть одним из следующих: SQL, PROGRAM, BUILTIN

  5. Типы параметров: Параметры могут быть любым типом, совместимым с JSON (строки, числа, логические значения, массивы, объекты) и хранятся как отдельные значения JSONB

  6. Значения тайм-аутов: Должны быть неотрицательными целыми числами (миллисекунды)