J.4. pg_timetable#
J.4. pg_timetable #
- J.4.1. Обзор
- J.4.2. Основные функции
- J.4.3. Быстрый старт
- J.4.4. Параметры командной строки
- J.4.5. Фон проекта
- J.4.6. Установка
- J.4.7. Компоненты
- J.4.8. Схема базы данных
- J.4.9. Начало работы
- J.4.10. Примеры
- J.4.11. Руководство по настройке цепочки YAML
- J.4.12. Миграция с других планировщиков
- J.4.13. REST API
- J.4.14. Формат определения цепочки YAML для pg_timetable
J.4.1. Обзор #
Версия: 6.2.0
pg_timetable — это продвинутый планировщик задач для Tantor SE, предлагающий множество преимуществ по сравнению с традиционными планировщиками, такими как cron и другие. Он полностью управляется базой данных и предоставляет несколько продвинутых концепций.
J.4.2. Основные функции #
Задачи могут быть организованы в цепочки
Цепочка может состоять из встроенных команд, SQL и исполняемых файлов
Параметры могут быть переданы в цепочки
Пропущенные задачи (возможно, из-за простоя) могут быть повторно выполнены автоматически
Поддержка настраиваемых повторений
Встроенные задачи, такие как отправка электронных писем и т.д.
Полностью управляемая базой данных конфигурация
Полная поддержка ведения журналов на основе базы данных
Планирование в стиле Cron в часовом поясе сервера Tantor SE
Необязательная защита от параллелизма
Задача и цепочка могут иметь настройки времени ожидания выполнения
J.4.3. Быстрый старт #
Скачайте исполняемый файл pg_timetable
Убедитесь, что ваш сервер Tantor SE запущен и имеет роль с привилегией
CREATEдля целевой базы данных, например,my_database=> CREATE ROLE scheduler PASSWORD 'somestrong'; my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
Создайте новую задачу, например, выполняйте
VACUUMкаждую ночь в 00:30 по часовому поясу сервера Postgresmy_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM'); add_job --------- 3 (1 row)Запустите pg_timetable
# pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
Успех!
J.4.4. Параметры командной строки #
# ./pg_timetable
Application Options:
-c, --clientname= Unique name for application instance [$PGTT_CLIENTNAME]
--config= YAML configuration file
--no-program-tasks Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
-v, --version Output detailed version information [$PGTT_VERSION]
--connstr PostgreSQL connection string [$PGTT_CONNSTR]
Logging:
--log-level=[debug|info|error] Verbosity level for stdout and log file (default: info)
--log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info)
--log-file= File name to store logs
--log-file-format=[json|text] Format of file logs (default: json)
--log-file-rotate Rotate log files
--log-file-size= Maximum size in MB of the log file before it gets rotated (default: 100)
--log-file-age= Number of days to retain old log files, 0 means forever (default: 0)
--log-file-number= Maximum number of old log files to retain, 0 to retain all (default: 0)
Start:
-f, --file= SQL script file to execute during startup
--init Initialize database schema to the latest version and exit. Can be used
with --upgrade
--upgrade Upgrade database to the latest version
--debug Run in debug mode. Only asynchronous chains will be executed
Resource:
--cron-workers= Number of parallel workers for scheduled chains (default: 16)
--interval-workers= Number of parallel workers for interval chains (default: 16)
--chain-timeout= Abort any chain that takes more than the specified number of
milliseconds
--task-timeout= Abort any task within a chain that takes more than the specified number
of milliseconds
REST:
--rest-port= REST API port (default: 0) [$PGTT_RESTPORT]
J.4.5. Фон проекта #
Проект pg_timetable был запущен еще в 2019 году для внутренних нужд планирования в компании Cybertec.
Для получения дополнительной информации о мотивах проекта и целях проектирования см. оригинальную серию блог-постов, объявляющих о проекте, и последующие обновления функций.
Cybertec также предоставляет коммерческую поддержку с 9 до 5 и 24/7 для pg_timetable.
J.4.5.1. Обратная связь по проекту #
Для запросов на добавление функций или помощи в устранении неполадок, пожалуйста, откройте проблему на странице Github проекта.
J.4.6. Установка #
pg_timetable совместим со всеми поддерживаемыми версиями Tantor SE.
J.4.6.1. Официальные пакеты релизов #
Вы можете найти бинарный пакет для вашей платформы на официальной странице Релизы.
J.4.6.2. Docker #
Официальный образ docker можно найти здесь: https://hub.docker.com/r/cybertecpostgresql/pg_timetable
Примечание
Тег latest актуален с веткой master благодаря
этому
действию github. В производственной среде вам, вероятно, следует использовать
последний
стабильный
тег.
Запустите pg_timetable в Docker:
docker run --rm \ cybertecpostgresql/pg_timetable:latest \ -h 10.0.0.3 -p 54321 -c worker001
Запустите pg_timetable в Docker с переменными окружения:
docker run --rm \ -e PGTT_PGHOST=10.0.0.3 \ -e PGTT_PGPORT=54321 \ cybertecpostgresql/pg_timetable:latest \ -c worker001
J.4.7. Компоненты #
Планирование в pg_timetable охватывает три различных уровня абстракции, чтобы облегчить повторное использование с другими параметрами или дополнительными расписаниями.
- Command:
Базовый уровень, команда, определяет что делать.
- Task:
Второй уровень, задача, представляет собой элемент цепочки (шаг) для выполнения одной из команд. С помощью задач мы определяем порядок команд, передаваемые аргументы (если есть), и как обрабатываются ошибки.
- Chain:
Третий уровень представляет собой связанные задачи, образующие цепочку задач. Цепочка определяет если, когда и как часто должна выполняться задача.
J.4.7.1. Команда #
В настоящее время существует три различных типа команд:
-
SQL SQL фрагмент. Начало очистки, обновление материализованного представления или обработка данных.
-
PROGRAM Внешняя команда. Все, что может быть вызвано как внешний бинарный файл, включая оболочки, например,
bash,pwshи т.д. Внешняя команда будет вызвана с использованием exec.CommandContext из golang.-
BUILTIN Внутренняя команда. Предварительно встроенная функциональность, включенная в pg_timetable. К ним относятся:
NoOp
Sleep
Log
SendMail
Download
CopyFromFile
CopyToFile
CopyFromProgram
CopyToProgram
Shutdown
J.4.7.2. Задача #
Следующим строительным блоком является задача, которая просто представляет собой шаг в списке цепочки команд. Примером задач, объединенных в цепочку, может быть:
Загрузить файлы с сервера
Импорт файлов
Выполнение агрегаций
Создать отчет
Удалить файлы с диска
Примечание
Все задачи цепочки в
pg_timetable выполняются
в рамках одной транзакции. Однако, обратите внимание, что нет
возможности откатить задачи PROGRAM и
BUILTIN.
J.4.7.2.1. Таблица timetable.task #
| Поле | Тип | Описание |
|---|---|---|
chain_id | bigint | Ссылка на цепочку; если NULL, задача считается отключённой |
task_order | DOUBLE PRECISION | Указывает порядок задачи внутри цепочки |
kind | timetable.command_kind | Тип команды. Может быть SQL (по умолчанию), PROGRAM или BUILTIN |
command | text | Содержит либо SQL-команду, либо путь к приложению, либо имя команды BUILTIN, которая будет выполнена |
run_as | text | Роль, от имени которой должна выполняться задача |
database_connection | text | Строка подключения к внешней базе данных, которая должна использоваться |
ignore_error | boolean | Указывает, должна ли следующая задача продолжаться после возникновения ошибки (по умолчанию: false) |
autonomous | boolean | Указывает, должна ли задача выполняться вне транзакции цепочки. Полезно для VACUUM, CREATE DATABASE, CALL и т.д. |
timeout | integer | Прервать любую задачу в цепочке, выполнение которой занимает больше указанного количества миллисекунд |
Предупреждение
Если для задачи установлено значение ignore_error в true (по умолчанию используется значение false), рабочий процесс будет сообщать об успешном выполнении даже если задача внутри цепочки завершилась с ошибкой.
Как упоминалось выше, команды являются простыми скелетами (например, отправить электронное письмо, вакуумировать и т.д.). В большинстве случаев они должны быть оживлены путем передачи входных параметров для выполнения.
J.4.7.2.2. Table timetable.parameter #
| Поле | Тип | Описание |
|---|---|---|
task_id | bigint | Идентификатор задачи |
order_id | integer | Порядок параметра. Несколько параметров обрабатываются один за другим в соответствии с этим порядком |
value | jsonb | Значение JSON, содержащее параметры |
J.4.7.2.3. Формат значения параметра #
В зависимости от command аргумент kind может быть представлен разными значениями JSON.
J.4.7.2.3.2. PROGRAM #
Schema: массив строк
Пример:
'["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
J.4.7.2.3.5. BUILTIN: SendMail #
Схема: объект
Пример:
'{
"username": "user@example.com",
"password": "password",
"serverhost": "smtp.example.com",
"serverport": 587,
"senderaddr": "user@example.com",
"ccaddr": ["recipient_cc@example.com"],
"bccaddr": ["recipient_bcc@example.com"],
"toaddr": ["recipient@example.com"],
"subject": "pg_timetable - No Reply",
"attachment": ["/temp/attachments/Report.pdf","config.yaml"],
"attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
"msgbody": "<h2>Hello User,</h2> <p>check some attachments!</p>",
"contenttype": "text/html; charset=UTF-8"
}'::jsonb
J.4.7.2.3.6. BUILTIN: Download #
Схема: объект
Пример:
'{
"workersnum": 2,
"fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"],
"destpath": "."
}'::jsonb
J.4.7.2.3.7. BUILTIN: CopyFromFile #
Схема: объект
Пример:
'{
"sql": "COPY location FROM STDIN",
"filename": "download/orte_ansi.txt"
}'::jsonb
J.4.7.2.3.8. BUILTIN: CopyToFile #
Схема: объект
Пример:
'{
"sql": "COPY location TO STDOUT",
"filename": "download/location.txt"
}'::jsonb
J.4.7.2.3.9. BUILTIN: CopyToProgram #
Схема: объект
Пример:
'{
"sql": "COPY location TO STDOUT",
"cmd": "sh",
"args": ["gzip", "-c", ">", "/tmp/output.gz"]
}'::jsonb
J.4.7.2.3.10. BUILTIN: CopyFromProgram #
Схема: объект
Пример:
'{
"sql": "COPY location FROM STDIN",
"cmd": "gunzip",
"args": ["-c", "/tmp/data.gz"]
}'::jsonb
J.4.7.2.3.11. BUILTIN: Shutdown #
значение проигнорировано
J.4.7.2.3.12. BUILTIN: NoOp #
значение проигнорировано
J.4.7.3. Цепочка #
Как только задачи были организованы, их необходимо запланировать как "цепочку". Для этого "pg_timetable" использует расширенную "cron"-строку, при этом добавляя множество вариантов конфигурации.
J.4.7.3.1. Таблица timetable.chain #
| Поле | Тип | Описание |
|---|---|---|
chain_name | text | Уникальное имя цепочки |
run_at | timetable.cron | Стандартное значение в стиле cron во временной зоне сервера Postgres или выражение @after, @every, @reboot |
max_instances | integer | Количество экземпляров, которые эта цепочка может иметь, работающих одновременно |
timeout | integer | Прервать любую цепочку, выполнение которой занимает больше указанного количества миллисекунд |
live | boolean | Управляет возможностью выполнения цепочки при достижении её расписания |
self_destruct | boolean | Самоуничтожить цепочку после успешного выполнения. Неудачные цепочки будут выполнены по расписанию еще один раз |
exclusive_execution | boolean | Указывает, должна ли цепочка выполняться эксклюзивно, пока все остальные цепочки приостановлены |
client_name | text | Указывает, какой клиент должен выполнять цепочку. Установите значение NULL, чтобы разрешить выполнение любому клиенту |
timeout | integer | Прервать цепочку, выполнение которой занимает больше указанного количества миллисекунд |
on_error | — | Содержит SQL-запрос, который выполняется в случае возникновения ошибки. Если задача, вызвавшая ошибку, помечена как ignore_error, то ничего не происходит |
Примечание
Все цепочки в "pg_timetable" планируются по часовому поясу сервера PostgreSQL. Вы можете изменить часовой пояс для "текущей сессии" при добавлении новых цепочек, например,
SET TIME ZONE 'UTC';
-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');
J.4.8. Схема базы данных #
pg_timetable является приложением, управляемым базой данных. Во время первого запуска создается необходимая схема, если она отсутствует.
J.4.8.1. Основные таблицы и объекты #
CREATE TABLE timetable.chain (
chain_id BIGSERIAL PRIMARY KEY,
chain_name TEXT NOT NULL UNIQUE,
run_at timetable.cron,
max_instances INTEGER,
timeout INTEGER DEFAULT 0,
live BOOLEAN DEFAULT FALSE,
self_destruct BOOLEAN DEFAULT FALSE,
exclusive_execution BOOLEAN DEFAULT FALSE,
client_name TEXT,
on_error TEXT
);
COMMENT ON TABLE timetable.chain IS
'Stores information about chains schedule';
COMMENT ON COLUMN timetable.chain.run_at IS
'Extended CRON-style time notation the chain has to be run at';
COMMENT ON COLUMN timetable.chain.max_instances IS
'Number of instances (clients) this chain can run in parallel';
COMMENT ON COLUMN timetable.chain.timeout IS
'Abort any chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.chain.live IS
'Indication that the chain is ready to run, set to FALSE to pause execution';
COMMENT ON COLUMN timetable.chain.self_destruct IS
'Indication that this chain will delete itself after successful run';
COMMENT ON COLUMN timetable.chain.exclusive_execution IS
'All parallel chains should be paused while executing this chain';
COMMENT ON COLUMN timetable.chain.client_name IS
'Only client with this name is allowed to run this chain, set to NULL to allow any client';
CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN');
CREATE TABLE timetable.task (
task_id BIGSERIAL PRIMARY KEY,
chain_id BIGINT REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE SCADE,
task_order DOUBLE PRECISION NOT NULL,
task_name TEXT,
kind timetable.command_kind NOT NULL DEFAULT 'SQL',
command TEXT NOT NULL,
run_as TEXT,
database_connection TEXT,
ignore_error BOOLEAN NOT NULL DEFAULT FALSE,
autonomous BOOLEAN NOT NULL DEFAULT FALSE,
timeout INTEGER DEFAULT 0
);
COMMENT ON TABLE timetable.task IS
'Holds information about chain elements aka tasks';
COMMENT ON COLUMN timetable.task.chain_id IS
'Link to the chain, if NULL task considered to be disabled';
COMMENT ON COLUMN timetable.task.task_order IS
'Indicates the order of task within a chain';
COMMENT ON COLUMN timetable.task.run_as IS
'Role name to run task as. Uses SET ROLE for SQL commands';
COMMENT ON COLUMN timetable.task.ignore_error IS
'Indicates whether a next task in a chain can be executed regardless of the success of the current one';
COMMENT ON COLUMN timetable.task.kind IS
'Indicates whether "command" is SQL, built-in function or an external program';
COMMENT ON COLUMN timetable.task.command IS
'Contains either an SQL command, or command string to be executed';
COMMENT ON COLUMN timetable.task.timeout IS
'Abort any task within a chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.task.autonomous IS
'Specify if the task should be executed out of the chain transaction. Useful for VACUUM, CREATE DATABASE, CALL etc.';
-- parameter passing for a chain task
CREATE TABLE timetable.parameter(
task_id BIGINT REFERENCES timetable.task(task_id)
ON UPDATE CASCADE ON DELETE CASCADE,
order_id INTEGER CHECK (order_id > 0),
value JSONB,
PRIMARY KEY (task_id, order_id)
);
COMMENT ON TABLE timetable.parameter IS
'Stores parameters passed as arguments to a chain task';
CREATE UNLOGGED TABLE timetable.active_session(
client_pid BIGINT NOT NULL,
server_pid BIGINT NOT NULL,
client_name TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now()
);
COMMENT ON TABLE timetable.active_session IS
'Stores information about active sessions';
CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER');
CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
$$
SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1
$$
LANGUAGE sql;
CREATE TABLE timetable.log
(
ts TIMESTAMPTZ DEFAULT now(),
pid INTEGER NOT NULL,
log_level timetable.log_type NOT NULL,
client_name TEXT DEFAULT timetable.get_client_name(pg_backend_pid()),
message TEXT,
message_data jsonb
);
COMMENT ON TABLE timetable.log IS
'Stores log entries of active sessions';
CREATE TABLE timetable.execution_log (
chain_id BIGINT,
task_id BIGINT,
txid BIGINT NOT NULL,
last_run TIMESTAMPTZ DEFAULT now(),
finished TIMESTAMPTZ,
pid BIGINT,
returncode INTEGER,
ignore_error BOOLEAN,
kind timetable.command_kind,
command TEXT,
output TEXT,
client_name TEXT NOT NULL
);
COMMENT ON TABLE timetable.execution_log IS
'Stores log entries of executed tasks and chains';
CREATE UNLOGGED TABLE timetable.active_chain(
chain_id BIGINT NOT NULL,
client_name TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now()
);
COMMENT ON TABLE timetable.active_chain IS
'Stores information about active chains within session';
CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
BEGIN
IF pg_is_in_recovery() THEN
RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node';
RETURN FALSE;
END IF;
-- remove disconnected sessions
DELETE
FROM timetable.active_session
WHERE server_pid NOT IN (
SELECT pid
FROM pg_catalog.pg_stat_activity
WHERE application_name = 'pg_timetable'
);
DELETE
FROM timetable.active_chain
WHERE client_name NOT IN (
SELECT client_name FROM timetable.active_session
);
-- check if there any active sessions with the client name but different client pid
PERFORM 1
FROM timetable.active_session s
WHERE
s.client_pid <> worker_pid
AND s.client_name = worker_name
LIMIT 1;
IF FOUND THEN
RAISE NOTICE 'Another client is already connected to server with name: %', worker_name;
RETURN FALSE;
END IF;
-- insert current session information
INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, backend_pid());
RETURN TRUE;
END;
$CODE$
STRICT
LANGUAGE plpgsql;
J.4.8.2. Функции, связанные с работой #
-- add_task() will add a task to the same chain as the task with `parent_id`
CREATE OR REPLACE FUNCTION timetable.add_task(
IN kind timetable.command_kind,
IN command TEXT,
IN parent_id BIGINT,
IN order_delta DOUBLE PRECISION DEFAULT 10
) RETURNS BIGINT AS $$
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT chain_id, task_order + $4, $1, $2 FROM timetable.task WHERE task_id = $3
RETURNING task_id
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_task IS 'Add a task to the same chain as the task with parent_id';
-- add_job() will add one-task chain to the system
CREATE OR REPLACE FUNCTION timetable.add_job(
job_name TEXT,
job_schedule timetable.cron,
job_command TEXT,
job_parameters JSONB DEFAULT NULL,
job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind,
job_client_name TEXT DEFAULT NULL,
job_max_instances INTEGER DEFAULT NULL,
job_live BOOLEAN DEFAULT TRUE,
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, clusive_execution, on_error)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, b_exclusive, job_on_error)
RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE
FROM cte_chain
RETURNING task_id
),
cte_param AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain
)
SELECT v_chain_id FROM cte_chain
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_job IS 'Add one-task chain (aka job) to the system';
-- notify_chain_start() will send notification to the worker to start the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_start(
chain_id BIGINT,
worker_name TEXT,
start_delay INTERVAL DEFAULT NULL
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "START", "Ts": %s, "Delay": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint,
COALESCE(EXTRACT(epoch FROM start_delay)::bigint, 0)
)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_start IS 'Send notification to the worker to start the chain';
-- notify_chain_stop() will send notification to the worker to stop the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_stop(
chain_id BIGINT,
worker_name TEXT
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "STOP", "Ts": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_stop IS 'Send notification to the worker to stop the chain';
-- move_task_up() will switch the order of the task execution with a previous task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_up(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order < ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_up IS 'Switch the order of the task execution with a previous task within chain';
-- move_task_down() will switch the order of the task execution with a following task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_down(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order > ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_down IS 'Switch the order of the task execution with a following task hin the chain';
-- delete_task() will delete the task from a chain
CREATE OR REPLACE FUNCTION timetable.delete_task(IN task_id BIGINT) RETURNS boolean AS $$
WITH del_task AS (DELETE FROM timetable.task WHERE task_id = $1 RETURNING task_id)
SELECT EXISTS(SELECT 1 FROM del_task)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_task IS 'Delete the task from a chain';
-- delete_job() will delete the chain and its tasks from the system
CREATE OR REPLACE FUNCTION timetable.delete_job(IN job_name TEXT) RETURNS boolean AS $$
WITH del_chain AS (DELETE FROM timetable.chain WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM del_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_job IS 'Delete the chain and its tasks from the system';
-- pause_job() will pause the chain (set live = false)
CREATE OR REPLACE FUNCTION timetable.pause_job(IN job_name TEXT) RETURNS boolean AS $$
WITH upd_chain AS (UPDATE timetable.chain SET live = false WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM upd_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.pause_job IS 'Pause the chain (set live = false)';
-- resume_job() will resume the chain (set live = true)
CREATE OR REPLACE FUNCTION timetable.resume_job(IN job_name TEXT) RETURNS boolean AS $$
WITH upd_chain AS (UPDATE timetable.chain SET live = true WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM upd_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.resume_job IS 'Resume the chain (set live = true)';
J.4.8.3. Функции, связанные с Сron #
CREATE OR REPLACE FUNCTION timetable.cron_split_to_arrays(
cron text,
OUT mins integer[],
OUT hours integer[],
OUT days integer[],
OUT months integer[],
OUT dow integer[]
) RETURNS record AS $$
DECLARE
a_element text[];
i_index integer;
a_tmp text[];
tmp_item text;
a_range int[];
a_split text[];
a_res integer[];
max_val integer;
min_val integer;
dimensions constant text[] = '{"minutes", "hours", "days", "months", "days of week"}';
allowed_ranges constant integer[][] = '{{0,59},{0,23},{1,31},{1,12},{0,7}}';
BEGIN
a_element := regexp_split_to_array(cron, '\s+');
FOR i_index IN 1..5 LOOP
a_res := NULL;
a_tmp := string_to_array(a_element[i_index],',');
FOREACH tmp_item IN ARRAY a_tmp LOOP
IF tmp_item ~ '^[0-9]+$' THEN -- normal integer
a_res := array_append(a_res, tmp_item::int);
ELSIF tmp_item ~ '^[*]+$' THEN -- '*' any value
a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9]+[-][0-9]+$' THEN -- '-' range of values
a_range := regexp_split_to_array(tmp_item, '-');
a_range := array(select generate_series(a_range[1], a_range[2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9]+[\/][0-9]+$' THEN -- '/' step values
a_range := regexp_split_to_array(tmp_item, '/');
a_range := array(select generate_series(a_range[1], allowed_ranges[i_index][2], a_range[2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9-]+[\/][0-9]+$' THEN -- '-' range of values and '/' step values
a_split := regexp_split_to_array(tmp_item, '/');
a_range := regexp_split_to_array(a_split[1], '-');
a_range := array(select generate_series(a_range[1], a_range[2], a_split[2]::int));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[*]+[\/][0-9]+$' THEN -- '*' any value and '/' step values
a_split := regexp_split_to_array(tmp_item, '/');
a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2], a_split[2]::int));
a_res := array_cat(a_res, a_range);
ELSE
RAISE EXCEPTION 'Value ("%") not recognized', a_element[i_index]
USING HINT = 'fields separated by space or tab.'+
'Values allowed: numbers (value list with ","), '+
'any value with "*", range of value with "-" and step values with "/"!';
END IF;
END LOOP;
SELECT
ARRAY_AGG(x.val), MIN(x.val), MAX(x.val) INTO a_res, min_val, max_val
FROM (
SELECT DISTINCT UNNEST(a_res) AS val ORDER BY val) AS x;
IF max_val > allowed_ranges[i_index][2] OR min_val < allowed_ranges[i_index][1] OR a_res IS NULL THEN
RAISE EXCEPTION '% is out of range % for %', tmp_item, allowed_ranges[i_index:i_index][:], dimensions[i_index];
END IF;
CASE i_index
WHEN 1 THEN mins := a_res;
WHEN 2 THEN hours := a_res;
WHEN 3 THEN days := a_res;
WHEN 4 THEN months := a_res;
ELSE
dow := a_res;
END CASE;
END LOOP;
RETURN;
END;
$$ LANGUAGE PLPGSQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_months(
from_ts timestamptz,
allowed_months int[]
) RETURNS SETOF timestamptz AS $$
WITH
am(am) AS (SELECT UNNEST(allowed_months)),
genm(ts) AS ( --generated months
SELECT date_trunc('month', ts)
FROM pg_catalog.generate_series(from_ts, from_ts + INTERVAL '1 year', INTERVAL '1 month') g(ts)
)
SELECT ts FROM genm JOIN am ON date_part('month', genm.ts) = am.am
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_days(
from_ts timestamptz,
allowed_months int[],
allowed_days int[],
allowed_week_days int[]
) RETURNS SETOF timestamptz AS $$
WITH
ad(ad) AS (SELECT UNNEST(allowed_days)),
am(am) AS (SELECT * FROM timetable.cron_months(from_ts, allowed_months)),
gend(ts) AS ( --generated days
SELECT date_trunc('day', ts)
FROM am,
pg_catalog.generate_series(am.am, am.am + INTERVAL '1 month'
- INTERVAL '1 day', -- don't include the same day of the next month
INTERVAL '1 day') g(ts)
)
SELECT ts
FROM gend JOIN ad ON date_part('day', gend.ts) = ad.ad
WHERE extract(dow from ts)=ANY(allowed_week_days)
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_times(
allowed_hours int[],
allowed_minutes int[]
) RETURNS SETOF time AS $$
WITH
ah(ah) AS (SELECT UNNEST(allowed_hours)),
am(am) AS (SELECT UNNEST(allowed_minutes))
SELECT make_time(ah.ah, am.am, 0) FROM ah CROSS JOIN am
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_runs(
from_ts timestamp with time zone,
cron text
) RETURNS SETOF timestamptz AS $$
SELECT cd + ct
FROM
timetable.cron_split_to_arrays(cron) a,
timetable.cron_times(a.hours, a.mins) ct CROSS JOIN
timetable.cron_days(from_ts, a.months, a.days, a.dow) cd
WHERE cd + ct > from_ts
ORDER BY 1 ASC;
$$ LANGUAGE SQL STRICT;
CREATE DOMAIN timetable.cron AS TEXT CHECK(
VALUE = '@reboot'
OR substr(VALUE, 1, 6) IN ('@every', '@after')
AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL
OR VALUE ~ '^(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) +){4}(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) ?)$'
AND timetable.cron_split_to_arrays(VALUE) IS NOT NULL
);
COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support of interval values';
-- is_cron_in_time returns TRUE if timestamp is listed in cron expression
CREATE OR REPLACE FUNCTION timetable.is_cron_in_time(
run_at timetable.cron,
ts timestamptz
) RETURNS BOOLEAN AS $$
SELECT
CASE WHEN run_at IS NULL THEN
TRUE
ELSE
date_part('month', ts) = ANY(a.months)
AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow))
AND date_part('day', ts) = ANY(a.days)
AND date_part('hour', ts) = ANY(a.hours)
AND date_part('minute', ts) = ANY(a.mins)
END
FROM
timetable.cron_split_to_arrays(run_at) a
$$ LANGUAGE SQL;
CREATE OR REPLACE FUNCTION timetable.next_run(cron timetable.cron) RETURNS timestamptz AS $$
SELECT * FROM timetable.cron_runs(now(), cron) LIMIT 1
$$ LANGUAGE SQL STRICT;
J.4.8.4. ER-Диаграмма #

J.4.9. Начало работы #
Разнообразные примеры можно найти в примерах. Если вы хотите перейти с другого планировщика, вы можете воспользоваться скриптами из главы миграции.
J.4.9.1. Добавить простое задание #
В реальном мире обычно достаточно использовать простые задания. Под этим термином мы понимаем:
задание — это цепочка, содержащая только одну задачу (шаг);
он не использует сложную логику, а скорее простую "команду;
это не требует сложной обработки транзакций, поскольку одна задача неявно выполняется как одна транзакция.
Для такой группы цепочек мы ввели специальную функцию timetable.add_job().
J.4.9.1.1. Функция: timetable.add_job() #
Создает простую цепочку из одной задачи
Возвращает: BIGINT
J.4.9.1.1.1. Параметры #
| Параметр | Тип | Описание | По умолчанию |
|---|---|---|---|
job_name | text | Уникальное имя для "цепочки" и "команды" | Обязательно |
job_schedule | timetable.cron | Расписание времени в синтаксисе cron по часовому поясу сервера Postgres | Обязательно |
job_command | text | SQL-запрос, который будет выполнен | Обязательно |
job_parameters | jsonb | Аргументы для цепочки "command | NULL |
job_kind | timetable.command_kind | Тип команды: SQL, PROGRAM или BUILTIN | SQL |
job_client_name | text | Указывает, какой клиент должен выполнять цепочку. Установите значение NULL, чтобы разрешить выполнение любому клиенту | NULL |
job_max_instances | integer | Количество экземпляров, которые эта цепочка может выполнять одновременно | NULL |
job_live | boolean | Управляет возможностью выполнения цепочки после достижения ею расписания | TRUE |
job_self_destruct | boolean | Самоуничтожить цепочку после выполнения | FALSE |
job_ignore_errors | boolean | Игнорировать ошибку во время выполнения | TRUE |
job_exclusive | boolean | Выполнять цепочку в эксклюзивном режиме | FALSE |
Возвращает: идентификатор созданной цепочки
J.4.9.2. Примеры #
Выполнять
public.my_func()в 00:05 каждый день в августе по часовому поясу сервера Postgres:SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');Запускать
VACUUMна 23-й минуте каждого второго часа с 0 до 20 каждый день по часовому поясу сервера Postgres:SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');Обновлять материализованное представление каждые 2 часа:
SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');Очистить таблицу журнала после перезапуска pg_timetable:
SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');Переиндексация в полночь по времени сервера Postgres по воскресеньям с помощью утилиты reindexdb:
используется база данных по умолчанию под пользователем по умолчанию (без аргументов командной строки)
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');указание целевой базы данных и таблиц, и быть подробным
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');передача пароля с помощью переменной окружения через оболочку
bashSELECT timetable.add_job('reindex', '0 0 * * 7', 'bash', '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb, 'PROGRAM');
J.4.10. Примеры #
J.4.10.1. Основы #
Этот пример демонстрирует, как создать базовую одноступенчатую цепочку с параметрами. Он использует CTE для прямого обновления таблиц схемы timetable.
SELECT timetable.add_job(
job_name => 'notify every minute',
job_schedule => '* * * * *',
job_command => 'SELECT pg_notify($1, $2)',
job_parameters => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
job_kind => 'SQL'::timetable.command_kind,
job_client_name => NULL,
job_max_instances => 1,
job_live => TRUE,
job_self_destruct => FALSE,
job_ignore_errors => TRUE
) as chain_id;
J.4.10.2. Отправить электронное письмо #
Этот пример демонстрирует, как создать расширенную задачу для отправки электронной почты. Она проверит, есть ли письма для отправки, отправит их и зафиксирует статус выполнения команды. Вам не нужно ничего настраивать, каждый параметр можно указать при создании цепочки.
DO $$
-- An example for using the SendMail task.
DECLARE
v_mail_task_id bigint;
v_log_task_id bigint;
v_chain_id bigint;
BEGIN
-- Get the chain id
INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
RETURNING chain_id INTO v_chain_id;
-- Add SendMail task
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
RETURNING task_id INTO v_mail_task_id;
-- Create the parameters for the SensMail task
-- "username": The username used for authenticating on the mail server
-- "password": The password used for authenticating on the mail server
-- "serverhost": The IP address or hostname of the mail server
-- "serverport": The port of the mail server
-- "senderaddr": The email that will appear as the sender
-- "ccaddr": String array of the recipients(Cc) email addresses
-- "bccaddr": String array of the recipients(Bcc) email addresses
-- "toaddr": String array of the recipients(To) email addresses
-- "subject": Subject of the email
-- "attachment": String array of the attachments (local file)
-- "attachmentdata": Pairs of name and base64-encoded content
-- "msgbody": The body of the email
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_mail_task_id, 1, '{
"username": "user@example.com",
"password": "password",
"serverhost": "smtp.example.com",
"serverport": 587,
"senderaddr": "user@example.com",
"ccaddr": ["recipient_cc@example.com"],
"bccaddr": ["recipient_bcc@example.com"],
"toaddr": ["recipient@example.com"],
"subject": "pg_timetable - No Reply",
"attachment": ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
"attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
"msgbody": "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
"contenttype": "text/html; charset=UTF-8"
}'::jsonb);
-- Add Log task and make it the last task using `task_order` column (=30)
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 30, 'BUILTIN', 'Log'
RETURNING task_id INTO v_log_task_id;
-- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
-- Since we're using special add_task() function we don't need to specify the `chain_id`.
-- Function will take the same `chain_id` from the parent task, SendMail in this particular case
PERFORM timetable.add_task(
kind => 'SQL',
parent_id => v_mail_task_id,
command => format(
$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
FROM sent_mail
ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$,
v_mail_task_id, v_log_task_id
),
order_delta => 10
);
-- In the end we should have something like this. Note, that even Log task was created earlier it will be executed later
-- due to `task_order` column.
-- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
-- task_id | chain_id | task_order | kind | left
-- ---------+----------+------------+---------+---------------------------------------------------------------
-- 45 | 24 | 10 | BUILTIN | SendMail
-- 47 | 24 | 20 | SQL | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
-- 46 | 24 | 30 | BUILTIN | Log
-- (3 rows)
END;
$$
LANGUAGE PLPGSQL;
J.4.10.3. Загрузка, Преобразование и Импорт #
Этот пример демонстрирует, как создать расширенную трехшаговую цепочку с параметрами. Он использует оператор DO для прямого обновления таблиц схемы timetable.
-- Prepare the destination table 'location'
CREATE TABLE IF NOT EXISTS public.city(
city text,
lat numeric,
lng numeric,
country text,
iso2 text,
admin_name text,
capital text,
population bigint,
population_proper bigint);
GRANT ALL ON public.city TO scheduler;
-- An enhanced example consisting of three tasks:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension)
-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
DO $$
DECLARE
v_task_id bigint;
v_chain_id bigint;
BEGIN
-- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
INSERT INTO timetable.chain (chain_name, live)
VALUES ('Download locations and aggregate', TRUE)
RETURNING chain_id INTO v_chain_id;
-- Step 1. Download file from the server
-- Create the chain
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
RETURNING task_id INTO v_task_id;
-- Create the parameters for the step 1:
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1,
'{
"workersnum": 1,
"fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"],
"destpath": "."
}'::jsonb);
RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;
-- Step 2. Transform Unicode characters into ASCII
-- Create the program task to call 'uconv' and name it 'unaccent'
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
RETURNING task_id INTO v_task_id;
-- Create the parameters for the 'unaccent' task. Input and output files in this case
-- Under Windows we should call PowerShell instead of "uconv" with command:
-- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);
RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id;
-- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
INSERT INTO timetable.task (chain_id, task_order, kind, command)
VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
RETURNING task_id INTO v_task_id;
-- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);
RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
RETURNING task_id INTO v_task_id;
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
RAISE NOTICE 'Step 4 completed. Cleanup task added with ID: %', v_task_id;
END;
$$ LANGUAGE PLPGSQL;
J.4.10.4. Выполнение задач в автономной транзакции #
Этот пример демонстрирует, как выполнять специальные задачи вне контекста транзакции цепочки. Это полезно для специальных процедур и/или нетранзакционных операций, например, CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE и т.д.
-- An advanced example showing how to use atutonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands,
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN
RAISE notice '%', msg;
END;
$$ LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
PERFORM f('hey 1');
COMMIT;
PERFORM f('hey 2');
COMMIT;
END;
$$
LANGUAGE PLPGSQL;
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct)
VALUES (
'call proc() every 10 sec', -- chain_name,
'@every 10 seconds', -- run_at,
1, -- max_instances,
TRUE, -- live,
FALSE -- self_destruct
) RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
FROM cte_chain
RETURNING task_id
)
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;
J.4.10.5. Завершить работу планировщика и завершить сеанс #
Этот пример демонстрирует, как завершить работу планировщика с помощью специальной встроенной задачи. Это может быть использовано для управления окнами обслуживания, для перезапуска планировщика с целью обновления или для остановки сессии перед удалением базы данных.
-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintaining purposes or before database being destroyed.
-- One should take care of restarting pg_timetable if needed.
SELECT timetable.add_job (
job_name => 'Shutdown pg_timetable session on schedule',
job_schedule => '* * 1 * *',
job_command => 'Shutdown',
job_kind => 'BUILTIN'
);
J.4.10.6. Доступ к коду и результату предыдущей задачи из следующей задачи #
Этот пример демонстрирует, как проверить код результата и вывод предыдущей задачи. Если последняя задача завершилась с ошибкой, это возможно только если для этой задачи установлено ignore_error boolean = true. В противном случае планировщик остановит цепочку. Этот пример показывает, как вычислить количество неудачных, успешных и общее число выполненных задач. На основе этих значений можно рассчитать коэффициент успешности.
WITH
cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
VALUES ('many tasks', '* * * * *', 1, true)
RETURNING chain_id
),
cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
FROM cte_chain, generate_series(1, 500) AS g(s)
RETURNING task_id
),
report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
s TEXT;
BEGIN
WITH report AS (
SELECT
count(*) FILTER (WHERE returncode = 0) AS success,
count(*) FILTER (WHERE returncode != 0) AS fail,
count(*) AS total
FROM timetable.execution_log
WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
AND txid = txid_current()
)
SELECT 'Tasks executed:' || total ||
'; succeeded: ' || success ||
'; failed: ' || fail ||
'; ratio: ' || 100.0*success/GREATEST(total,1)
INTO s
FROM report;
RAISE NOTICE '%', s;
END;
$$
$CMD$
FROM cte_chain
RETURNING task_id
)
SELECT v_chain_id FROM cte_chain
J.4.11. Руководство по настройке цепочки YAML #
В этом руководстве объясняется, как использовать YAML-файлы для определения цепочек pg_timetable в качестве альтернативы конфигурации на основе SQL.
J.4.11.1. Обзор #
Определения цепочек в YAML предоставляют человекочитаемый способ создания цепочек запланированных задач без написания SQL. Преимущества включают:
Создание сложных многоэтапных рабочих процессов с четкой структурой
Контроль версий ваших конфигураций цепочек
Легкий просмотр и изменение запланированных задач
Совместное использование шаблонов цепочек между средами
J.4.11.2. Основное использование #
# Load YAML chains pg_timetable --file chains.yaml postgresql://user:pass@host/db # Validate YAML without importing pg_timetable --file chains.yaml --validate # Replace existing chains with same names pg_timetable --file chains.yaml --replace postgresql://user:pass@host/db
J.4.11.3. YAML Format #
J.4.11.3.1. Базовая структура #
chains:
- name: "chain-name" # Required: unique identifier
schedule: "* * * * *" # Required: cron format
live: true # Optional: enable/disable chain
max_instances: 1 # Optional: max parallel executions
timeout: 30000 # Optional: timeout in milliseconds
self_destruct: false # Optional: delete after success
exclusive: false # Optional: pause other chains while running
client_name: "worker-1" # Optional: restrict to specific client
on_error: "SELECT log_error($1)" # Optional: error handling SQL
tasks: # Required: array of tasks
- name: "task-name" # Optional: task description
kind: "SQL" # Optional: SQL, PROGRAM, or BUILTIN
command: "SELECT now()" # Required: command to execute
run_as: "postgres" # Optional: role for SET ROLE
connect_string: "postgresql://user@host/otherdb" # Optional: different database connection
ignore_error: false # Optional: continue on error
autonomous: false # Optional: run outside transaction
timeout: 5000 # Optional: task timeout in ms
parameters: # Optional: task parameters, each entry causes separate execution
- ["value1", 42] # Parameters for SQL tasks are arrays of values
J.4.11.3.2. Параметры задачи #
Каждая задача может иметь несколько записей параметров, при этом каждая запись вызывает отдельное выполнение:
# SQL task parameters (arrays of values)
- name: "sql-task"
kind: "SQL"
command: "SELECT $1, $2, $3, $4"
parameters:
- ["one", 2, 3.14, false] # First execution
- ["two", 4, 6.28, true] # Second execution
# PROGRAM task parameters (arrays of command-line arguments)
- name: "program-task"
kind: "PROGRAM"
command: "iconv"
parameters:
- ["-x", "Latin-ASCII", "-o", "file1.txt", "input1.txt"]
- ["-x", "UTF-8", "-o", "file2.txt", "input2.txt"]
# BUILTIN: Sleep task (integer values)
- name: "sleep-task"
kind: "BUILTIN"
command: "Sleep"
parameters:
- 5 # Sleep for 5 seconds
- 10 # Then sleep for 10 seconds
# BUILTIN: Log task (string or object values)
- name: "log-task"
kind: "BUILTIN"
command: "Log"
parameters:
- "WARNING: Simple message"
- level: "WARNING"
details: "Object message"
# BUILTIN: SendMail task (complex object)
- name: "mail-task"
kind: "BUILTIN"
command: "SendMail"
parameters:
- username: "user@example.com"
password: "password123"
serverhost: "smtp.example.com"
serverport: 587
senderaddr: "user@example.com"
toaddr: ["recipient@example.com"]
subject: "Notification"
msgbody: "<p>Hello User</p>"
contenttype: "text/html; charset=UTF-8"
J.4.11.3.3. Примеры #
J.4.11.3.3.1. Простая SQL задача #
chains:
- name: "daily-cleanup"
schedule: "0 2 * * *" # 2 AM daily
live: true
tasks:
- name: "vacuum-tables"
command: "VACUUM ANALYZE"
J.4.11.3.3.2. Многошаговая цепочка #
chains:
- name: "data-pipeline"
schedule: "0 1 * * *" # 1 AM daily
live: true
max_instances: 1
timeout: 7200000 # 2 hours
tasks:
- name: "extract"
command: |
CREATE TEMP TABLE temp_data AS
SELECT * FROM source_table
WHERE date >= CURRENT_DATE - INTERVAL '1 day'
- name: "validate"
command: |
DO $$
BEGIN
IF (SELECT COUNT(*) FROM temp_data) = 0 THEN
RAISE EXCEPTION 'No data to process';
END IF;
END $$
- name: "transform"
command: "CALL transform_data_procedure()"
autonomous: true
- name: "load"
command: "INSERT INTO target_table SELECT * FROM temp_data"
J.4.11.3.3.3. Задачи программы #
chains:
- name: "backup-job"
schedule: "0 3 * * 0" # Sunday 3 AM
live: true
client_name: "backup-worker"
tasks:
- name: "database-backup"
kind: "PROGRAM"
command: "pg_dump"
parameters:
- ["-h", "localhost", "-U", "postgres", "-d", "mydb", "-f", "/backups/mydb.sql"]
timeout: 3600000 # 1 hour
- name: "compress-backup"
kind: "PROGRAM"
command: "gzip"
parameters:
- ["/backups/mydb.sql"]
J.4.11.3.3.4. Несколько цепочек в одном файле #
chains:
# Monitoring chain
- name: "health-check"
schedule: "*/15 * * * *" # Every 15 minutes
live: true
tasks:
- command: "SELECT check_database_health()"
# Cleanup chain
- name: "hourly-cleanup"
schedule: "0 * * * *" # Every hour
live: true
tasks:
- command: "DELETE FROM logs WHERE created_at < now() - interval '7 days'"
J.4.11.4. Расширенные возможности #
J.4.11.4.1. Обработка ошибок #
Управление поведением при ошибках с помощью ignore_error и on_error:
chains:
- name: "resilient-chain"
on_error: |
SELECT pg_notify('monitoring',
format('{"ConfigID": %s, "Message": "Something bad happened"}',
current_setting('pg_timetable.current_chain_id')::bigint))
tasks:
- name: "risky-task"
command: "SELECT might_fail()"
ignore_error: true # Continue chain execution even if this task fails
- name: "cleanup-task"
command: "SELECT cleanup()" # Always runs, even if previous task failed
J.4.11.4.2. Управление транзакциями #
Используйте autonomous: true для задач, которые необходимо выполнять вне основной транзакции:
tasks:
- name: "vacuum-task"
command: "VACUUM FULL heavy_table"
autonomous: true # Required for VACUUM FULL
- name: "create-database"
command: "CREATE DATABASE new_db"
autonomous: true # CREATE DATABASE requires autonomous transaction
J.4.11.4.3. Удалённые базы данных #
Выполнение задач в разных базах данных:
tasks:
- name: "cross-database-task"
command: "SELECT sync_data()"
connect_string: "postgresql://user:pass@other-host/other-db"
J.4.11.5. Валидация #
YAML-файлы проверяются при загрузке:
Синтаксис: Допустимый формат YAML
Структура: Обязательные поля присутствуют
Cron: Допустимые 5-полюсные cron-выражения
Виды задач: Должно быть SQL, PROGRAM или BUILTIN
Тайм-ауты: неотрицательные целые числа
Используйте --validate, чтобы проверить файлы без импорта:
pg_timetable --file chains.yaml --validate
J.4.11.6. Миграция с SQL #
J.4.11.6.1. Преобразование существующих цепочек #
Чтобы преобразовать SQL-цепочки в YAML:
Информация о цепочке запросов и задачах:
SELECT * FROM timetable.chain c WHERE c.chain_name = 'my-chain'; SELECT t.* FROM timetable.task t JOIN timetable.chain c ON t.chain_id = c.chain_id AND c.chain_name = 'my-chain' ORDER BY t.task_order;Преобразовать в формат YAML:
chain_name→namerun_at→schedulelive→livemax_instances→max_instancesПоля задачи отображаются напрямую
Проверьте преобразование:
pg_timetable --file converted.yaml --validate
J.4.11.6.2. Пример миграции #
Исходный SQL:
SELECT timetable.add_job(
job_name => 'daily-report',
job_schedule => '0 9 * * *',
job_command => 'CALL generate_report()',
job_live => TRUE
);
Преобразованный YAML:
chains:
- name: "daily-report"
schedule: "0 9 * * *"
live: true
tasks:
- command: "CALL generate_report()"
J.4.11.7. Лучшие практики #
J.4.11.7.1. Соглашения об именовании #
Используйте описательные имена в kebab-case
Включайте среду в имя для ясности
Группируйте связанные цепочки в одном файле
J.4.11.7.2. Документация #
Используйте комментарии YAML для документирования сложной логики
Включайте назначение и зависимости в имена задач
Задокументируйте значения параметров
chains:
- name: "etl-sales-data"
# Processes daily sales data from external API
# Depends on: external API availability, sales_raw table
schedule: "0 2 * * *"
tasks:
- name: "extract-from-api"
# Fetches last 24h of sales data from REST API
command: "SELECT fetch_sales_data($1)"
parameters: ["yesterday"]
J.4.11.7.3. Тестирование #
Всегда проверяйте YAML перед развертыванием
Тест с флагом
--validateИспользуйте неактивные цепочки для тестирования
Храните резервные копии рабочих конфигураций
J.4.11.7.4. Управление версиями #
Храните YAML-файлы в системе управления версиями
Используйте содержательные сообщения фиксаций
Тегированные релизы для производственных внедрений
Проверьте изменения перед слиянием
J.4.11.8. Устранение неполадок #
J.4.11.8.1. Распространённые проблемы #
Недопустимый синтаксис YAML:
Error: failed to parse YAML: yaml: line 5: found character that cannot start any token
→ Проверьте отступы и кавычки
Недопустимый формат cron:
Error: invalid cron format: 0 9 * * (expected 5 fields)
→ Убедитесь, что в cron ровно 5 полей
Цепочка уже существует:
Error: chain 'my-chain' already exists (use --replace flag to overwrite)
→ Используйте флаг --replace или выберите другое имя
Отсутствуют обязательные поля:
Error: chain 1: chain name is required
→ Проверьте, что все обязательные поля присутствуют
J.4.12. Миграция с других планировщиков #
J.4.12.1. Перенос заданий из pg_cron в pg_timetable #
Если вы хотите быстро экспортировать задания, запланированные из pg_cron в pg_timetable, вы можете использовать этот фрагмент SQL:
SELECT timetable.add_job(
job_name => COALESCE(jobname, 'job: ' || command),
job_schedule => schedule,
job_command => command,
job_kind => 'SQL',
job_live => active
) FROM cron.job;
timetable.add_job(), однако, имеет некоторые ограничения. Прежде всего, функция пометит созданную задачу как автономную, указывая, что планировщик должен выполнять задачу вне цепочки транзакций. Это не ошибка, но множество автономных цепочек может привести к использованию дополнительных соединений.
Во-вторых, параметры подключения к базе данных теряются для исходных pg_cron задач, делая все задачи локальными. Чтобы экспортировать всю доступную информацию как можно точнее, используйте этот фрагмент SQL под ролью, в которой они были запланированы в pg_cron:
SET ROLE 'scheduler'; -- set the role used by pg_cron
WITH cron_chain AS (
SELECT
nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
jobname,
schedule,
active,
command,
CASE WHEN
database != current_database()
OR nodename != 'localhost'
OR username != CURRENT_USER
OR nodeport != inet_server_port()
THEN
format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
END AS connstr
FROM
cron.job
),
cte_chain AS (
INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
SELECT
cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
FROM
cron_chain
),
cte_tasks AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
SELECT
cron_id, 1, 'SQL', command, connstr
FROM
cron_chain
RETURNING
chain_id, task_id
)
SELECT * FROM cte_tasks;
J.4.12.2. Перенос заданий из pgAgent в pg_timetable #
Чтобы перенести задания из pgAgent, пожалуйста, используйте этот скрипт. pgAgent не имеет концепции задачи PROGRAM, поэтому для эмуляции шагов BATCH, pg_timetable будет выполнять их внутри оболочки. Вы можете изменить оболочку, отредактировав предложение CTE cte_shell.
CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;
WITH
cte_shell(shell, cmd_param) AS (
VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
SELECT
s.jscjobid,
s.jscname,
format('%s %s %s %s %s',
bool_array_to_cron(s.jscminutes),
bool_array_to_cron(s.jschours),
bool_array_to_cron(s.jscmonthdays),
bool_array_to_cron(s.jscmonths, 1),
bool_array_to_cron(s.jscweekdays, 1)) AS schedule
FROM
pgagent.pga_schedule s
WHERE s.jscenabled
AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
AND now() > s.jscstart
),
pga_chain AS (
SELECT
nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
jobid,
format('%s @ %s', jobname, jscname) AS jobname,
jobhostagent,
jobenabled,
schedule
FROM
pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
SELECT
chain_id, jobname, jobhostagent, schedule, jobenabled
FROM
pga_chain
),
pga_step AS (
SELECT
c.chain_id,
nextval('timetable.task_task_id_seq'::regclass) AS task_id,
rank() OVER (ORDER BY jstname) AS jstorder,
jstid,
jstname,
jstenabled,
CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
jstcode,
COALESCE(
NULLIF(jstconnstr, ''),
CASE
WHEN jstdbname = current_database() THEN NULL
WHEN jstdbname > '' THEN 'dbname=' || jstdbname
END
) AS jstconnstr,
jstonerror != 'f' AS jstignoreerror
FROM
pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection)
SELECT
task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind,
CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
jstconnstr
FROM
pga_step, cte_shell sh
),
cte_parameters AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT
task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
FROM
pga_step s, cte_shell sh
WHERE
s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;
J.4.13. REST API #
pg_timetable имеет богатый REST API, который может использоваться внешними инструментами для выполнения запуска/остановки/инициализации/перезапуска/перезагрузки, любыми инструментами для выполнения HTTP-проверок состояния, и, конечно, также может использоваться для мониторинга.
Ниже вы найдете список pg_timetable REST API конечных точек.
J.4.13.1. Точки проверки состояния #
-
GET /liveness Всегда возвращает код состояния HTTP
200, указывая на то, что pg_timetable работает.-
GET /readiness Возвращает HTTP статус-код
200, когда pg_timetable работает, и планировщик находится в основном цикле обработки цепочек. Если планировщик подключается к базе данных, создает схему базы данных или обновляет ее, он вернет HTTP статус-код503.
J.4.13.2. Управление цепочкой конечные точки #
-
GET /startchain?id=<chain-id> Возвращает HTTP статус-код
200, если цепочка с данным идентификатором может быть добавлена в очередь рабочих. Однако это не означает, что выполнение цепочки начнется немедленно. Рабочий процесс должен выполнить проверку нагрузки и другие проверки перед началом выполнения цепочки. В случае ошибки возвращается HTTP статус-код400с последующим сообщением об ошибке.-
GET /stopchain?id=<chain-id> Возвращает HTTP статус-код
200, если цепочка с данным идентификатором работает в данный момент и может быть остановлена. Если цепочка выполняется, сигнал отмены будет отправлен немедленно. В случае ошибки возвращается HTTP статус-код400с сообщением об ошибке.
J.4.14. Формат определения цепочки YAML для pg_timetable #
Этот документ определяет формат YAML для описания цепочек запланированных задач в pg_timetable.
J.4.14.1. YAML-схема #
# Top-level structure
chains:
- name: "chain-name" # Required: chain_name (TEXT, unique)
schedule: "* * * * *" # Required: run_at (cron format)
live: true # Optional: live (BOOLEAN), default: false
max_instances: 1 # Optional: max_instances (INTEGER)
timeout: 30000 # Optional: timeout in milliseconds (INTEGER)
self_destruct: false # Optional: self_destruct (BOOLEAN), default: false
exclusive: false # Optional: exclusive_execution (BOOLEAN), default: false
client_name: "worker-1" # Optional: client_name (TEXT)
on_error: "SELECT log_error()" # Optional: on_error SQL (TEXT)
tasks: # Required: array of tasks
- name: "task-1" # Optional: task_name (TEXT)
kind: "SQL" # Optional: kind (SQL|PROGRAM|BUILTIN), default: SQL
command: "SELECT $1, $2" # Required: command (TEXT)
parameters: # Optional: parameters (array of execution parameters)
- ["value1", 42] # First execution with these parameters
- ["value2", 99] # Second execution with different parameters
run_as: "postgres" # Optional: run_as (TEXT) - role for SET ROLE
connect_string: "postgresql://user@host/otherdb" # Optional: database_connection (TEXT)
ignore_error: false # Optional: ignore_error (BOOLEAN), default: false
autonomous: false # Optional: autonomous (BOOLEAN), default: false
timeout: 5000 # Optional: timeout in milliseconds (INTEGER)
- name: "task-2"
kind: "PROGRAM"
command: "bash"
parameters: ["-c", "echo hello"]
ignore_error: true
J.4.14.2. Соответствие полей #
J.4.14.2.1. Уровень цепочки #
| Поле YAML | Столбец БД | Тип | По умолчанию | Описание |
|---|---|---|---|---|
name | chain_name | TEXT | обязательно | Уникальный идентификатор цепочки |
schedule | run_at | cron | обязательно | Расписание в стиле cron |
live | live | BOOLEAN | false | Активна ли цепочка |
max_instances | max_instances | INTEGER | null | Максимальное количество параллельных экземпляров |
timeout | timeout | INTEGER | 0 | Тайм-аут цепочки (мс) |
self_destruct | self_destruct | BOOLEAN | false | Удалить после успешного выполнения |
exclusive | exclusive_execution | BOOLEAN | false | Приостанавливать другие цепочки |
client_name | client_name | ТЕКСТ | null | Ограничить для конкретного клиента |
on_error | on_error | ТЕКСТ | null | SQL для обработки ошибок |
J.4.14.2.2. Уровень задачи #
| Поле YAML | Столбец БД | Тип | По умолчанию | Описание |
|---|---|---|---|---|
name | task_name | TEXT | null | Описание задачи |
kind | kind | ENUM | 'SQL' | Тип команды (SQL/PROGRAM/BUILTIN) |
command | command | TEXT | обязательно | Команда для выполнения |
параметры | через timetable.parameter | Массив любых значений | null | Массив значений параметров, хранящихся как отдельные строки JSONB с order_id |
run_as | run_as | TEXT | null | Роль для SET ROLE |
connect_string | database_connection | TEXT | null | Строка подключения |
ignore_error | ignore_error | BOOLEAN | false | Продолжать при ошибке |
autonomous | autonomous | BOOLEAN | false | Выполнять вне транзакции |
timeout | timeout | INTEGER | 0 | Тайм-аут задачи (мс) |
J.4.14.3. Порядок выполнения задач #
Задачи упорядочены последовательно внутри цепочки на основе их позиции в массиве. Система автоматически присвоит соответствующие значения task_order с интервалами (например, 10, 20, 30), чтобы обеспечить возможность будущих вставок.
J.4.14.4. Примеры #
J.4.14.4.1. Простая SQL задача #
chains:
- name: "daily-report"
schedule: "0 9 * * *" # 9 AM daily
live: true
tasks:
- name: "generate-report"
command: "CALL generate_daily_report()"
J.4.14.4.2. Многозадачная цепочка #
chains:
- name: "etl-pipeline"
schedule: "0 2 * * *" # 2 AM daily
live: true
max_instances: 1
timeout: 3600000 # 1 hour
tasks:
- name: "extract-data"
command: "SELECT extract_sales_data($1)"
parameters: ["2023-01-01"]
- name: "transform-data"
command: "CALL transform_sales_data()"
autonomous: true
- name: "load-data"
command: "CALL load_to_warehouse()"
ignore_error: false
J.4.14.4.3. Программная задача #
chains:
- name: "backup-job"
schedule: "0 3 * * 0" # Sunday 3 AM
live: true
tasks:
- name: "pg-dump"
kind: "PROGRAM"
command: "pg_dump"
parameters:
- ["-h", "localhost", "-U", "postgres", "-d", "mydb", "-f", "/backups/mydb.sql"]
J.4.14.5. Правила проверки #
Обязательные поля:
name,schedule,tasksиcommandдля каждой задачиУникальные имена: Имена цепочек должны быть уникальными в пределах базы данных
Допустимый Cron: Расписание должно быть в допустимом формате cron (5 полей)
Допустимый тип: Тип задачи должен быть одним из следующих: SQL, PROGRAM, BUILTIN
Типы параметров: Параметры могут быть любым типом, совместимым с JSON (строки, числа, логические значения, массивы, объекты) и хранятся как отдельные значения JSONB
Значения тайм-аутов: Должны быть неотрицательными целыми числами (миллисекунды)