H.2. pg_timetable#

H.2. pg_timetable

H.2. pg_timetable #

pg_timetable является продвинутым планировщиком заданий для PostgreSQL, предлагая множество преимуществ по сравнению с традиционными планировщиками, такими как крон и другие. Он полностью управляется базой данных и предоставляет несколько продвинутых концепций.

H.2.1. О pg_timetable #

Версия : 5.9.0

GitHub

H.2.2. Основные функции #

  • Задачи могут быть организованы в цепочки

  • Цепочка может состоять из встроенных команд, SQL и исполняемых файлов

  • Параметры могут быть переданы в цепочки

  • Пропущенные задачи (возможно, из-за простоя) могут быть повторно выполнены автоматически

  • Поддержка настраиваемых повторений

  • Встроенные задачи, такие как отправка электронных писем и т.д.

  • Полностью управляемая базой данных конфигурация

  • Полная поддержка ведения журналов на основе базы данных

  • Планирование в стиле Cron в часовом поясе сервера PostgreSQL

  • Необязательная защита от параллелизма

  • Задача и цепочка могут иметь настройки времени ожидания выполнения

H.2.3. Быстрый старт #

  1. Скачайте исполняемый файл pg_timetable

  2. Убедитесь, что ваш сервер PostgreSQL запущен и имеет роль с привилегией CREATE для целевой базы данных, например,

    my_database=> CREATE ROLE scheduler PASSWORD 'somestrong';
    my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
    
  3. Создайте новую задачу, например, выполняйте VACUUM каждую ночь в 00:30 по часовому поясу сервера Postgres

    my_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM');
    add_job
    ---------
          3
    (1 row)
    
  4. Запустите pg_timetable

    # pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
    
  5. Успех!

H.2.4. Параметры командной строки #

# ./pg_timetable

Application Options:
  -c, --clientname=                                Unique name for application instance [$PGTT_CLIENTNAME]
      --config=                                    YAML configuration file
      --no-program-tasks                           Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
  -v, --version                                    Output detailed version information [$PGTT_VERSION]

Connection:
  -h, --host=                                      PostgreSQL host (default: localhost) [$PGTT_PGHOST]
  -p, --port=                                      PostgreSQL port (default: 5432) [$PGTT_PGPORT]
  -d, --dbname=                                    PostgreSQL database name (default: timetable) [$PGTT_PGDATABASE]
  -u, --user=                                      PostgreSQL user (default: scheduler) [$PGTT_PGUSER]
      --password=                                  PostgreSQL user password [$PGTT_PGPASSWORD]
      --sslmode=[disable|require]                  Connection SSL mode (default: disable) [$PGTT_PGSSLMODE]
      --pgurl=                                     PostgreSQL connection URL [$PGTT_URL]
      --timeout=                                   PostgreSQL connection timeout (default: 90) [$PGTT_TIMEOUT]

Logging:
      --log-level=[debug|info|error]               Verbosity level for stdout and log file (default: info)
      --log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info)
      --log-file=                                  File name to store logs
      --log-file-format=[json|text]                Format of file logs (default: json)
      --log-file-rotate                            Rotate log files
      --log-file-size=                             Maximum size in MB of the log file before it gets rotated (default: 100)
      --log-file-age=                              Number of days to retain old log files, 0 means forever (default: 0)
      --log-file-number=                           Maximum number of old log files to retain, 0 to retain all (default: 0)

Start:
  -f, --file=                                      SQL script file to execute during startup
      --init                                       Initialize database schema to the latest version and exit. Can be used
                                                   with --upgrade
      --upgrade                                    Upgrade database to the latest version
      --debug                                      Run in debug mode. Only asynchronous chains will be executed

Resource:
      --cron-workers=                              Number of parallel workers for scheduled chains (default: 16)
      --interval-workers=                          Number of parallel workers for interval chains (default: 16)
      --chain-timeout=                             Abort any chain that takes more than the specified number of
                                                   milliseconds
      --task-timeout=                              Abort any task within a chain that takes more than the specified number
                                                   of milliseconds

REST:
      --rest-port=                                 REST API port (default: 0) [$PGTT_RESTPORT]

H.2.5. Вклад #

Если вы хотите внести вклад в pg_timetable и помочь сделать его лучше, не стесняйтесь открыть обсуждение или даже рассмотреть возможность отправки запроса на изменение. Вы также можете поставить звезду проекту pg_timetable проект, и рассказать о нём миру.

H.2.6. Поддержка #

Для профессиональной поддержки, пожалуйста, свяжитесь с Cybertec.

H.2.7. Авторы #

Реализация: Павло Голуб

Изначальная идея и проектирование: Ханс-Юрген Шёниг

H.2.8. Фон проекта #

Проект pg_timetable был запущен еще в 2019 году для внутренних нужд планирования в компании Cybertec.

Для получения дополнительной информации о мотивах проекта и целях проектирования см. оригинальную серию блог-постов, объявляющих о проекте, и последующие обновления функций.

Cybertec также предоставляет коммерческую поддержку с 9 до 5 и 24/7 для pg_timetable.

H.2.8.1. Обратная связь по проекту #

Для запросов на добавление функций или помощи в устранении неполадок, пожалуйста, откройте проблему на странице Github проекта.

H.2.9. Установка #

pg_timetable совместим с последними поддерживаемыми версиями PostgreSQL: 11, 12, 13, 14 (стабильные); 15 (разработка).

Примечание

Если вы хотите использовать pg_timetable с более старыми версиями (9.5, 9.6 и 10)... пожалуйста, выполните эту SQL команду перед запуском pg_timetable:

CREATE OR REPLACE FUNCTION starts_with(text, text)
RETURNS bool AS
$$
SELECT
    CASE WHEN length($2) > length($1) THEN
        FALSE
    ELSE
        left($1, length($2)) = $2
    END
$$
LANGUAGE SQL
IMMUTABLE STRICT PARALLEL SAFE
COST 5;

H.2.9.1. Официальные пакеты релизов #

Вы можете найти бинарный пакет для вашей платформы на официальной странице выпусков . Прямо сейчас Windows, Linux и macOS пакеты доступны.

H.2.9.2. Docker #

Официальный образ docker можно найти здесь: https://hub.docker.com/r/cybertecpostgresql/pg_timetable

Примечание

Тег latest актуален с веткой master благодаря этому действию github. В производственной среде вам, вероятно, следует использовать последний стабильный тег.

Запустите pg_timetable в Docker:

docker run --rm \
cybertecpostgresql/pg_timetable:latest \
-h 10.0.0.3 -p 54321 -c worker001

Запустите pg_timetable в Docker с переменными окружения:

docker run --rm \
-e PGTT_PGHOST=10.0.0.3 \
-e PGTT_PGPORT=54321 \
cybertecpostgresql/pg_timetable:latest \
-c worker001

H.2.9.3. Сборка из исходников #

  1. Скачайте и установите Go на вашу систему.

  2. Склонируйте pg_timetable репозиторий:

    $ git clone https://github.com/cybertec-postgresql/pg_timetable.git
    $ cd pg_timetable
    
  3. Запустите pg_timetable:

    $ go run main.go --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
    
  4. Или создайте бинарный файл и запустите его:

    $ go build
    $ ./pg_timetable --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
    
  5. (Необязательно) Запустите тесты во всех подкаталогах проекта:

    $ psql --command="CREATE USER scheduler PASSWORD 'somestrong'"
    $ createdb --owner=scheduler timetable
    $ go test -failfast -timeout=300s -count=1 -p 1 ./...
    

H.2.10. Компоненты #

Планирование в pg_timetable включает три различных уровня абстракции, чтобы облегчить повторное использование с другими параметрами или дополнительными расписаниями.

Command:

Базовый уровень, команда , определяет что делать.

Task:

Второй уровень, task , представляет собой элемент цепочки (шаг) для выполнения одной из команд. С помощью задачи мы определяем порядок команд, передаваемые аргументы (если есть), и как обрабатываются ошибки.

Chain:

Третий уровень представляет собой связанные задачи, образующие цепочку задач. Цепочка определяет если, когда и как часто задача должна выполняться.

H.2.10.1. Команда #

В настоящее время существует три различных типа команд:

SQL

SQL фрагмент. Начало очистки, обновление материализованного представления или обработка данных.

PROGRAM

Внешняя команда. Все, что может быть вызвано как внешний бинарный файл, включая оболочки, например, bash, pwsh и т.д. Внешняя команда будет вызвана с использованием exec.CommandContext из golang.

BUILTIN

Внутренняя команда. Встроенная функциональность, включенная в pg_timetable . Эти включают:

  • NoOp,

  • Сон,

  • Журнал,

  • SendMail,

  • Скачать,

  • CopyFromFile,

  • КопироватьВФайл,

  • Shutdown.

H.2.10.2. Задача #

Следующим строительным блоком является task , который просто представляет собой шаг в списке цепочки команд. Пример задач, объединенных в цепочку, может быть:

  1. Загрузить файлы с сервера

  2. Импорт файлов

  3. Выполнение агрегаций

  4. Создать отчет

  5. Удалить файлы с диска

Примечание

Все задачи цепочки в pg_timetable выполняются в рамках одной транзакции. Однако, обратите внимание, что нет возможности откатить задачи PROGRAM и BUILTIN.

H.2.10.2.1. Таблица timetable.task #
chain_id bigint

Ссылка на цепочку, если задача NULL считается отключенной

task_order DOUBLE PRECISION

Указывает порядок задачи в цепочке.

kind timetable.command_kind

Тип команды. Может быть SQL (по умолчанию), PROGRAM или BUILTIN.

command text

Содержит либо SQL-команду, либо путь к приложению или имя команды BUILTIN, которая будет выполнена.

run_as text

Роль, от имени которой должна выполняться задача.

database_connection text

Строка подключения для внешней базы данных, которая должна быть использована.

ignore_error boolean

Укажите, следует ли продолжать выполнение следующей задачи после возникновения ошибки (по умолчанию: false).

autonomous boolean

Укажите, следует ли выполнять задачу вне транзакции цепочки. Полезно для VACUUM, CREATE DATABASE, CALL и т. д.

timeout integer

Прервать любую задачу в цепочке, которая занимает больше указанного количества миллисекунд.

Предупреждение

Если task был настроен с ignore_error, установленным в true (значение по умолчанию - false), рабочий процесс будет сообщать об успешном выполнении даже если задача в цепочке завершится неудачей.

Как упоминалось выше, команды являются простыми скелетами (например, отправить email, вакуум и т.д.). В большинстве случаев, они должны быть оживлены путем передачи входных параметров для выполнения.

H.2.10.2.2. Table timetable.parameter #
task_id bigint

Идентификатор задачи.

order_id integer

Порядок параметра. Несколько параметров обрабатываются один за другим в соответствии с порядком.

value jsonb

Значение JSON, содержащее параметры.

H.2.10.2.3. Формат значения параметра #

В зависимости от команда аргумент kind может быть представлен различными значениями JSON.

Kind
Schema

Пример

SQL
array
'[ "one", 2, 3.14, false ]'::jsonb
PROGRAM
array of strings
'["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
BUILTIN: Sleep
integer
'5' :: jsonb
BUILTIN: Log
any
'"WARNING"'::jsonb
'{"Status": "WARNING"}'::jsonb
BUILTIN: SendMail
object
'{
    "username":     "user@example.com",
    "password":     "password",
    "serverhost":   "smtp.example.com",
    "serverport":   587,
    "senderaddr":   "user@example.com",
    "ccaddr":       ["recipient_cc@example.com"],
    "bccaddr":      ["recipient_bcc@example.com"],
    "toaddr":       ["recipient@example.com"],
    "subject":      "pg_timetable - No Reply",
    "attachment":   ["/temp/attachments/Report.pdf","config.yaml"],
    "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
    "msgbody":      "<h2>Hello User,</h2> <p>check some attachments!</p>",
    "contenttype":   "text/html; charset=UTF-8"
}'::jsonb
BUILTIN: Download
object
'{
    "workersnum": 2,
    "fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"],
    "destpath": "."
}'::jsonb
BUILTIN: CopyFromFile
object
'{
    "sql": "COPY location FROM STDIN",
    "filename": "download/orte_ansi.txt"
}'::jsonb
BUILTIN: CopyToFile
object
'{
    "sql": "COPY location TO STDOUT",
    "filename": "download/location.txt"
}'::jsonb
BUILTIN: Shutdown

значение игнорируется

BUILTIN: NoOp

значение игнорируется

H.2.10.3. Цепочка #

Как только задачи были упорядочены, их необходимо запланировать как цепочка . Для этого, pg_timetable основывается на улучшенной крон -строка, при этом добавляя несколько параметров конфигурации.

H.2.10.3.1. Table timetable.chain #
chain_name text

Уникальное имя цепочки.

run_at timetable.cron

Стандартное значение в стиле cron в часовом поясе сервера Postgres или @after, @every, @reboot условие.

max_instances integer

Количество экземпляров, которые эта цепочка может иметь работающими одновременно.

timeout integer

Прервать любую цепочку, которая занимает больше указанного количества миллисекунд.

live boolean

Управление возможностью выполнения цепочки, когда она достигает своего расписания.

self_destruct boolean

Самоуничтожение цепочки после успешного выполнения. Неудачные цепочки будут выполнены еще раз в соответствии с расписанием.

exclusive_execution boolean

Указывает, следует ли выполнять цепочку исключительно, пока все остальные цепочки приостановлены.

client_name text

Указывает, какой клиент должен выполнить цепочку. Установите это значение в NULL, чтобы разрешить выполнение любому клиенту.

timeout integer

Прервать цепочку, которая занимает больше указанного количества миллисекунд.

on_error

Содержит SQL для выполнения в случае ошибки. Если задача, вызвавшая ошибку, помечена как ignore_error, то ничего не делается.

Примечание

Все цепочки в pg_timetable запланированы в часовом поясе сервера PostgreSQL. Вы можете изменить часовой пояс для текущая сессия при добавлении новых цепочек, например

SET TIME ZONE 'UTC';

-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');

H.2.11. Начало работы #

Разнообразные примеры можно найти в Примеры. Если вы хотите перейти с другого планировщика, вы можете использовать скрипты из Миграция с других планировщиков главы.

H.2.11.1. Добавить простую задачу #

В реальном мире обычно достаточно использовать простые задания. Под этим термином мы понимаем:

  • работа является цепочкой с только одним task (шаг) в нем;

  • он не использует сложную логику, а скорее простую команда;

  • это не требует сложной обработки транзакций, так как одна задача неявно выполняется как одна транзакция.

Для такой группы цепочек мы ввели специальную функцию timetable.add_job().

timetable.add_job(job_name, job_schedule, job_command, ...) RETURNS BIGINT

Создает простую цепочку из одной задачи

Parameters:
  • job_name (текст) – Уникальное имя цепочка и команда.

  • job_schedule (timetable.cron) – Расписание времени в синтаксисе cron в часовом поясе сервера Postgres

  • job_command (текст) – SQL, который будет выполнен.

  • job_parameters (jsonb) – Аргументы для цепочки команда . По умолчанию: NULL.

  • job_kind (timetable.command_kind) – Вид команды: SQL, PROGRAM или BUILTIN. По умолчанию: SQL.

  • job_client_name (текст) – Указывает, какой клиент должен выполнить цепочку. Установите это значение в NULL, чтобы разрешить выполнение любому клиенту. По умолчанию: NULL.

  • job_max_instances (integer) – Количество экземпляров, которые эта цепочка может иметь, работающих одновременно. По умолчанию: NULL.

  • job_live (boolean) – Управляет возможностью выполнения цепочки, когда она достигает своего расписания. По умолчанию: TRUE.

  • job_self_destruct (boolean) – Самоуничтожение цепочки после выполнения. По умолчанию: FALSE.

  • job_ignore_errors (boolean) – Игнорировать ошибку во время выполнения. По умолчанию: TRUE.

  • job_exclusive (boolean) – Выполнить цепочку в эксклюзивном режиме. По умолчанию: FALSE.

Returns:

ID созданной цепочки

Return type:

целое число

H.2.11.2. Примеры #

  1. Запустите public.my_func() в 00:05 каждый день в августе по часовому поясу сервера Postgres:

    SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');
    
  2. Запускать VACUUM на 23-й минуте каждого 2-го часа с 0 до 20 каждый день по часовому поясу сервера Postgres:

    SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');
    
  3. Обновлять материализованное представление каждые 2 часа:

    SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');
    
  4. Очистить таблицу журнала после pg_timetable перезапуск:

    SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');
    
  5. Переиндексация в полночь по часовому поясу сервера Postgres по воскресеньям с утилитой reindexdb :

    • используя базу данных по умолчанию под пользователем по умолчанию (без аргументов командной строки)

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');
      
    • указание целевой базы данных и таблиц, и быть подробным

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb',
          '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');
      
    • передача пароля с использованием переменной окружения через bash оболочку

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'bash',
          '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb,
          'PROGRAM');
      

H.2.12. Примеры #

H.2.12.1. Основы #

Этот пример демонстрирует, как создать базовую цепочку из одного шага с параметрами. Он использует CTE для непосредственного обновления расписание схема таблицы.

SELECT timetable.add_job(
    job_name            => 'notify every minute',
    job_schedule        => '* * * * *',
    job_command         => 'SELECT pg_notify($1, $2)',
    job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
    job_kind            => 'SQL'::timetable.command_kind,
    job_client_name     => NULL,
    job_max_instances   => 1,
    job_live            => TRUE,
    job_self_destruct   => FALSE,
    job_ignore_errors   => TRUE
) as chain_id;

H.2.12.2. Отправить электронное письмо #

Этот пример демонстрирует, как создать продвинутую задачу отправки электронной почты. Он будет проверять, есть ли электронные письма для отправки, отправлять их и записывать статус выполнения команды. Вам не нужно ничего настраивать, каждый параметр может быть указан во время создания цепочки.

DO $$
    -- An example for using the SendMail task.
DECLARE
    v_mail_task_id bigint;
    v_log_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Get the chain id
    INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Add SendMail task
    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
    RETURNING task_id INTO v_mail_task_id;

    -- Create the parameters for the SensMail task
        -- "username":       The username used for authenticating on the mail server
        -- "password":        The password used for authenticating on the mail server
        -- "serverhost":      The IP address or hostname of the mail server
        -- "serverport":      The port of the mail server
        -- "senderaddr":      The email that will appear as the sender
        -- "ccaddr":         String array of the recipients(Cc) email addresses
        -- "bccaddr":        String array of the recipients(Bcc) email addresses
        -- "toaddr":          String array of the recipients(To) email addresses
        -- "subject":        Subject of the email
        -- "attachment":      String array of the attachments (local file)
        -- "attachmentdata":  Pairs of name and base64-encoded content
        -- "msgbody":        The body of the email

    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_mail_task_id, 1, '{
                "username":     "user@example.com",
                "password":     "password",
                "serverhost":   "smtp.example.com",
                "serverport":   587,
                "senderaddr":   "user@example.com",
                "ccaddr":       ["recipient_cc@example.com"],
                "bccaddr":      ["recipient_bcc@example.com"],
                "toaddr":       ["recipient@example.com"],
                "subject":      "pg_timetable - No Reply",
                "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
                "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
                "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
                "contenttype":  "text/html; charset=UTF-8"
                }'::jsonb);
    
    -- Add Log task and make it the last task using `task_order` column (=30)
    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
    RETURNING task_id INTO v_log_task_id;

    -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
    -- Since we're using special add_task() function we don't need to specify the `chain_id`.
    -- Function will take the same `chain_id` from the parent task, SendMail in this particular case
    PERFORM timetable.add_task(
        kind => 'SQL', 
        parent_id => v_mail_task_id,
        command => format(
$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>sername')
INSERT INTO timetable.parameter (task_id, order_id, value) 
SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
FROM sent_mail
ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$, 
                v_mail_task_id, v_log_task_id
            ),
        order_delta => 10
    );

-- In the end we should have something like this. Note, that even Log task was created earlier it will be executed ter
-- due to `task_order` column.

-- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order;  
--  task_id | chain_id | task_order |  kind   |                             left
-- ---------+----------+------------+---------+---------------------------------------------------------------
--       45 |       24 |         10 | BUILTIN | SendMail
--       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
--       46 |       24 |         30 | BUILTIN | Log
-- (3 rows)

END;
$$
LANGUAGE PLPGSQL;

H.2.12.3. Загрузка, Преобразование и Импорт #

Этот пример демонстрирует, как создать улучшенную трехшаговую цепочку с параметрами. Используется оператор DO для непосредственного обновления расписание схема таблицы.

-- Prepare the destination table 'location'
CREATE TABLE IF NOT EXISTS city(
    city text,
    lat numeric,
    lng numeric,
    country text,
    iso2 text,
    admin_name text,
    capital text,
    population bigint,
    population_proper bigint);

-- An enhanced example consisting of three tasks:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL tension) 
-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
DO $$
DECLARE
    v_head_id bigint;
    v_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live)
    VALUES ('Download locations and aggregate', TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Download file from the server
    -- Create the chain
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the step 1:
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, 
           '{
                "workersnum": 1,
                "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"], 
                "destpath": "."
            }'::jsonb);
    
    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, task_id;

    -- Step 2. Transform Unicode characters into ASCII
    -- Create the program task to call 'uconv' and name it 'unaccent'
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the 'unaccent' task. Input and output files in this case
    -- Under Windows we should call PowerShell instead of "uconv" with command:
    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);

    RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id;

    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
        VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
    RETURNING task_id INTO v_task_id;

    -- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.v" }'::jsonb);

    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;

    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
    RETURNING task_id INTO v_task_id;

    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
END;
$$ LANGUAGE PLPGSQL;

H.2.12.4. Выполнение задач в автономной транзакции #

Этот пример демонстрирует, как выполнять специальные задачи вне контекста транзакционной цепочки. Это полезно для специальных процедур и/или нетранзакционных операций, например, CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE и т.д.

-- An advanced example showing how to use atutonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks 
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands, 
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN 
    RAISE notice '%', msg; 
END;
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
    PERFORM f('hey 1');
    COMMIT;
    PERFORM f('hey 2');
    COMMIT;
END;
$$
LANGUAGE PLPGSQL;

WITH
    cte_chain (v_chain_id) AS (
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct) 
        VALUES (
            'call proc() every 10 sec', -- chain_name, 
            '@every 10 seconds',        -- run_at,
            1,     -- max_instances, 
            TRUE,  -- live, 
            FALSE -- self_destruct
        ) RETURNING chain_id
    ),
    cte_task(v_task_id) AS (
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
        SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;

H.2.12.5. Остановите планировщик и завершите сеанс #

Этот пример демонстрирует, как завершить работу планировщика, используя специальную встроенную задачу. Это может быть использовано для управления окнами обслуживания, для перезапуска планировщика в целях обновления или для остановки сессии перед удалением базы данных.

-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintaining purposes or before database being destroyed.
-- One should take care of restarting pg_timetable if needed.

SELECT timetable.add_job (
    job_name     => 'Shutdown pg_timetable session on schedule',
    job_schedule => '* * 1 * *',
    job_command  => 'Shutdown',
    job_kind     => 'BUILTIN'
);

H.2.12.6. Доступ к коду и результату предыдущей задачи из следующей задачи #

Этот пример демонстрирует, как проверить код результата и вывод предыдущей задачи. Если последняя задача завершилась неудачно, это возможно только если ignore_error boolean = true установлено для этой задачи. В противном случае планировщик остановит цепочку. Этот пример показывает, как вычислить количество неудачных, успешных и общее количество выполненных задач. На основе этих значений мы можем вычислить коэффициент успеха.

WITH 
    cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live) 
        VALUES ('many tasks', '* * * * *', 1, true)
        RETURNING chain_id
    ),
    cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
        SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
        FROM cte_chain, generate_series(1, 500) AS g(s)
        RETURNING task_id
    ),
    report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
        INSERT INTO timetable.task (chain_id, task_order, kind, command)
        SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
    s TEXT;
BEGIN
    WITH report AS (
        SELECT 
        count(*) FILTER (WHERE returncode = 0) AS success,
        count(*) FILTER (WHERE returncode != 0) AS fail,
        count(*) AS total
        FROM timetable.execution_log 
        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
          AND txid = txid_current()
    )
    SELECT 'Tasks executed:' || total || 
         '; succeeded: ' || success || 
         '; failed: ' || fail || 
         '; ratio: ' || 100.0*success/GREATEST(total,1)
    INTO s
    FROM report;
    RAISE NOTICE '%', s;
END;
$$
$CMD$
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id FROM cte_chain

H.2.13. Миграция с других планировщиков #

H.2.13.1. Перенос заданий из pg_cron в pg_timetable #

Если вы хотите быстро экспортировать задания, запланированные из pg_cron в pg_timetable, вы можете использовать этот фрагмент SQL:

SELECT timetable.add_job(
    job_name            => COALESCE(jobname, 'job: ' || command),
    job_schedule        => schedule,
    job_command         => command,
    job_kind            => 'SQL',
    job_live            => active
) FROM cron.job;

timetable.add_job(), однако, имеет некоторые ограничения. Прежде всего, функция пометит созданную задачу как автономный , указывая, что планировщик должен выполнить задачу вне цепочки транзакции. Это не ошибка, но многие автономные цепочки могут вызвать использование дополнительных подключений.

Во-вторых, параметры подключения к базе данных теряются для исходных pg_cron задач, делая все задачи локальными. Чтобы экспортировать всю доступную информацию как можно точнее, используйте этот фрагмент SQL под ролью, в которой они были запланированы в pg_cron:

SET ROLE 'scheduler'; -- set the role used by pg_cron

WITH cron_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
        jobname,
        schedule,
        active,
        command,
        CASE WHEN 
            database != current_database()
            OR nodename != 'localhost'
            OR username != CURRENT_USER
            OR nodeport != inet_server_port() 
        THEN
            format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
        END AS connstr
    FROM
        cron.job
),
cte_chain AS (
 INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
     SELECT 
         cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
     FROM
         cron_chain
),
cte_tasks AS (
 INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
     SELECT
         cron_id, 1, 'SQL', command, connstr
     FROM
         cron_chain
     RETURNING
         chain_id, task_id
)
SELECT * FROM cte_tasks;

H.2.13.2. Перенос заданий из pgAgent в pg_timetable #

Чтобы перенести задания из pgAgent , пожалуйста, используйте этот скрипт. pgAgent не имеет концепции задачи PROGRAM, поэтому для эмуляции шагов BATCH, pg_timetable будет выполнять их внутри оболочки. Вы можете изменить оболочку, отредактировав cte_shell CTE выражение.

CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
 SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i 
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;


WITH
cte_shell(shell, cmd_param) AS (
 VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
 SELECT
     s.jscjobid,
     s.jscname,
     format('%s %s %s %s %s', 
         bool_array_to_cron(s.jscminutes), 
         bool_array_to_cron(s.jschours), 
         bool_array_to_cron(s.jscmonthdays), 
         bool_array_to_cron(s.jscmonths, 1), 
         bool_array_to_cron(s.jscweekdays, 1)) AS schedule
 FROM 
     pgagent.pga_schedule s  
 WHERE s.jscenabled 
         AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
         AND now() > s.jscstart
),
pga_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
        jobid,
        format('%s @ %s', jobname, jscname) AS jobname,
        jobhostagent,
        jobenabled,
        schedule
    FROM
        pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
 INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
     SELECT 
         chain_id, jobname, jobhostagent, schedule, jobenabled
     FROM
         pga_chain
),
pga_step AS (
 SELECT 
     c.chain_id,
     nextval('timetable.task_task_id_seq'::regclass) AS task_id,
     rank() OVER (ORDER BY jstname) AS jstorder,
     jstid,
     jstname,
     jstenabled,
     CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
     jstcode,
     COALESCE(
         NULLIF(jstconnstr, ''), 
         CASE 
             WHEN jstdbname = current_database() THEN NULL
             WHEN jstdbname > '' THEN 'dbname=' || jstdbname 
         END
     ) AS jstconnstr,
     jstonerror != 'f' AS jstignoreerror
 FROM
     pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
 INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection)
     SELECT
         task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind, 
         CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
         jstconnstr
     FROM
         pga_step, cte_shell sh
),
cte_parameters AS (
 INSERT INTO timetable.parameter (task_id, order_id, value)
     SELECT 
         task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
     FROM
         pga_step s, cte_shell sh
     WHERE 
         s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;

H.2.14. REST API #

pg_timetable имеет богатый REST API, который может использоваться внешними инструментами для выполнения запуска/остановки/инициализации/перезапуска/перезагрузки, любыми инструментами для выполнения проверок состояния HTTP, и, конечно, также может использоваться для мониторинга.

Ниже вы найдете список pg_timetable REST API конечные точки.

H.2.14.1. Точки проверки состояния #

GET /liveness

Всегда возвращает HTTP статус-код 200, указывая, что pg_timetable работает.

GET /readiness

Возвращает HTTP статус-код 200, когда pg_timetable работает, и планировщик находится в основном цикле обработки цепочек. Если планировщик подключается к базе данных, создает схему базы данных или обновляет ее, он вернет HTTP статус-код 503.

H.2.14.2. Управление цепочкой конечные точки #

GET /startchain?id=<chain-id>

Возвращает HTTP статус-код 200, если цепочка с данным идентификатором может быть добавлена в очередь рабочих. Однако это не означает, что выполнение цепочки начнется немедленно. Рабочий должен выполнить проверку нагрузки и другие проверки перед началом выполнения цепочки. В случае ошибки возвращается HTTP статус-код 400 с последующим сообщением об ошибке.

GET /stopchain?id=<chain-id>

Возвращает HTTP статус-код 200, если цепочка с данным идентификатором работает в данный момент и может быть остановлена. Если цепочка выполняется, сигнал отмены будет отправлен немедленно. В случае ошибки возвращается HTTP статус-код 400 с сообщением об ошибке.

H.2.15. Схема базы данных #

pg_timetable является приложением, управляемым базой данных. При первом запуске создается необходимая схема, если она отсутствует.

H.2.15.1. Основные таблицы и объекты #

CREATE TABLE timetable.chain (
    chain_id            BIGSERIAL   PRIMARY KEY,
    chain_name          TEXT        NOT NULL UNIQUE,
    run_at              timetable.cron,
    max_instances       INTEGER,
    timeout             INTEGER     DEFAULT 0,
    live                BOOLEAN     DEFAULT FALSE,
    self_destruct       BOOLEAN     DEFAULT FALSE,
    exclusive_execution BOOLEAN     DEFAULT FALSE,
    client_name         TEXT,
    on_error            TEXT
);

COMMENT ON TABLE timetable.chain IS
    'Stores information about chains schedule';
COMMENT ON COLUMN timetable.chain.run_at IS
    'Extended CRON-style time notation the chain has to be run at';
COMMENT ON COLUMN timetable.chain.max_instances IS
    'Number of instances (clients) this chain can run in parallel';
COMMENT ON COLUMN timetable.chain.timeout IS
    'Abort any chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.chain.live IS
    'Indication that the chain is ready to run, set to FALSE to pause execution';
COMMENT ON COLUMN timetable.chain.self_destruct IS
    'Indication that this chain will delete itself after successful run';
COMMENT ON COLUMN timetable.chain.exclusive_execution IS
    'All parallel chains should be paused while executing this chain';
COMMENT ON COLUMN timetable.chain.client_name IS
    'Only client with this name is allowed to run this chain, set to NULL to allow any client';    

CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN');

CREATE TABLE timetable.task (
    task_id             BIGSERIAL               PRIMARY KEY,
    chain_id            BIGINT                  REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE SCADE,
    task_order          DOUBLE PRECISION        NOT NULL,
    task_name           TEXT,
    kind                timetable.command_kind  NOT NULL DEFAULT 'SQL',
    command             TEXT                    NOT NULL,
    run_as              TEXT,
    database_connection TEXT,
    ignore_error        BOOLEAN                 NOT NULL DEFAULT FALSE,
    autonomous          BOOLEAN                 NOT NULL DEFAULT FALSE,
    timeout             INTEGER                 DEFAULT 0
);          

COMMENT ON TABLE timetable.task IS
    'Holds information about chain elements aka tasks';
COMMENT ON COLUMN timetable.task.chain_id IS
    'Link to the chain, if NULL task considered to be disabled';
COMMENT ON COLUMN timetable.task.task_order IS
    'Indicates the order of task within a chain';    
COMMENT ON COLUMN timetable.task.run_as IS
    'Role name to run task as. Uses SET ROLE for SQL commands';
COMMENT ON COLUMN timetable.task.ignore_error IS
    'Indicates whether a next task in a chain can be executed regardless of the success of the current one';
COMMENT ON COLUMN timetable.task.kind IS
    'Indicates whether "command" is SQL, built-in function or an external program';
COMMENT ON COLUMN timetable.task.command IS
    'Contains either an SQL command, or command string to be executed';
COMMENT ON COLUMN timetable.task.timeout IS
    'Abort any task within a chain that takes more than the specified number of milliseconds';

-- parameter passing for a chain task
CREATE TABLE timetable.parameter(
    task_id     BIGINT  REFERENCES timetable.task(task_id)
                        ON UPDATE CASCADE ON DELETE CASCADE,
    order_id    INTEGER CHECK (order_id > 0),
    value       JSONB,
    PRIMARY KEY (task_id, order_id)
);

COMMENT ON TABLE timetable.parameter IS
    'Stores parameters passed as arguments to a chain task';

CREATE UNLOGGED TABLE timetable.active_session(
    client_pid  BIGINT  NOT NULL,
    server_pid  BIGINT  NOT NULL,
    client_name TEXT    NOT NULL,
    started_at  TIMESTAMPTZ DEFAULT now()
);

COMMENT ON TABLE timetable.active_session IS
    'Stores information about active sessions';

CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER');

CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
$$
    SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1
$$
LANGUAGE sql;

CREATE TABLE timetable.log
(
    ts              TIMESTAMPTZ         DEFAULT now(),
    pid             INTEGER             NOT NULL,
    log_level       timetable.log_type  NOT NULL,
    client_name     TEXT                DEFAULT timetable.get_client_name(pg_backend_pid()),
    message         TEXT,
    message_data    jsonb
);

COMMENT ON TABLE timetable.log IS
    'Stores log entries of active sessions';

CREATE TABLE timetable.execution_log (
    chain_id    BIGINT,
    task_id     BIGINT,
    txid        BIGINT NOT NULL,
    last_run    TIMESTAMPTZ DEFAULT now(),
    finished    TIMESTAMPTZ,
    pid         BIGINT,
    returncode  INTEGER,
    kind        timetable.command_kind,
    command     TEXT,
    output      TEXT,
    client_name TEXT        NOT NULL
);

COMMENT ON TABLE timetable.execution_log IS
    'Stores log entries of executed tasks and chains';

CREATE UNLOGGED TABLE timetable.active_chain(
    chain_id    BIGINT  NOT NULL,
    client_name TEXT    NOT NULL,
    started_at  TIMESTAMPTZ DEFAULT now()
);

COMMENT ON TABLE timetable.active_chain IS
    'Stores information about active chains within session';

CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
BEGIN
    IF pg_is_in_recovery() THEN
        RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node';
        RETURN FALSE;
    END IF;
    -- remove disconnected sessions
    DELETE
        FROM timetable.active_session
        WHERE server_pid NOT IN (
            SELECT pid
            FROM pg_catalog.pg_stat_activity
            WHERE application_name = 'pg_timetable'
        );
    DELETE 
        FROM timetable.active_chain 
        WHERE client_name NOT IN (
            SELECT client_name FROM timetable.active_session
        );
    -- check if there any active sessions with the client name but different client pid
    PERFORM 1
        FROM timetable.active_session s
        WHERE
            s.client_pid <> worker_pid
            AND s.client_name = worker_name
        LIMIT 1;
    IF FOUND THEN
        RAISE NOTICE 'Another client is already connected to server with name: %', worker_name;
        RETURN FALSE;
    END IF;
    -- insert current session information
    INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, backend_pid());
    RETURN TRUE;
END;
$CODE$
STRICT
LANGUAGE plpgsql;

H.2.15.4. ER-Диаграмма #

ER-Diagram