H.4. pg_timetable#
H.4. pg_timetable
- H.4.1. О pg_timetable
- H.4.2. Основные функции
- H.4.3. Быстрый старт
- H.4.4. Параметры командной строки
- H.4.5. Для разработчика
- H.4.6. Поддержка
- H.4.7. Авторы
- H.4.8. Фон проекта
- H.4.9. Установка
- H.4.10. Компоненты
- H.4.11. Начало работы
- H.4.12. Примеры
- H.4.13. Миграция с других планировщиков
- H.4.14. REST API
- H.4.15. Схема базы данных
pg_timetable — это продвинутый планировщик задач для Tantor SE-1С, предлагающий множество преимуществ по сравнению с традиционными планировщиками, такими как cron и другие. Он полностью управляется базой данных и предоставляет несколько продвинутых концепций.
H.4.2. Основные функции
Задачи могут быть организованы в цепочки
Цепочка может состоять из встроенных команд, SQL и исполняемых файлов
Параметры могут быть переданы в цепочки
Пропущенные задачи (возможно, из-за простоя) могут быть повторно выполнены автоматически
Поддержка настраиваемых повторений
Встроенные задачи, такие как отправка электронных писем и т.д.
Полностью управляемая базой данных конфигурация
Полная поддержка ведения журналов на основе базы данных
Планирование в стиле Cron в часовом поясе сервера Tantor SE-1С
Необязательная защита от параллелизма
Задача и цепочка могут иметь настройки времени ожидания выполнения
H.4.3. Быстрый старт
Скачайте исполняемый файл pg_timetable
Убедитесь, что ваш сервер Tantor SE-1С запущен и имеет роль с привилегией
CREATEдля целевой базы данных, например,my_database=> CREATE ROLE scheduler PASSWORD 'somestrong'; my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
Создайте новую задачу, например, выполняйте
VACUUMкаждую ночь в 00:30 по часовому поясу сервера Postgresmy_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM'); add_job --------- 3 (1 row)Запустите pg_timetable
# pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
Успех!
H.4.4. Параметры командной строки
# ./pg_timetable
Application Options:
-c, --clientname= Unique name for application instance [$PGTT_CLIENTNAME]
--config= YAML configuration file
--no-program-tasks Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
-v, --version Output detailed version information [$PGTT_VERSION]
Connection:
-h, --host= Host (default: localhost) [$PGTT_PGHOST]
-p, --port= Port (default: 5432) [$PGTT_PGPORT]
-d, --dbname= Database name (default: timetable) [$PGTT_PGDATABASE]
-u, --user= User (default: scheduler) [$PGTT_PGUSER]
--password= User password [$PGTT_PGPASSWORD]
--sslmode=[disable|require] Connection SSL mode (default: disable) [$PGTT_PGSSLMODE]
--pgurl= Connection URL [$PGTT_URL]
--timeout= Connection timeout (default: 90) [$PGTT_TIMEOUT]
Logging:
--log-level=[debug|info|error] Verbosity level for stdout and log file (default: info)
--log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info)
--log-file= File name to store logs
--log-file-format=[json|text] Format of file logs (default: json)
--log-file-rotate Rotate log files
--log-file-size= Maximum size in MB of the log file before it gets rotated (default: 100)
--log-file-age= Number of days to retain old log files, 0 means forever (default: 0)
--log-file-number= Maximum number of old log files to retain, 0 to retain all (default: 0)
Start:
-f, --file= SQL script file to execute during startup
--init Initialize database schema to the latest version and exit. Can be used
with --upgrade
--upgrade Upgrade database to the latest version
--debug Run in debug mode. Only asynchronous chains will be executed
Resource:
--cron-workers= Number of parallel workers for scheduled chains (default: 16)
--interval-workers= Number of parallel workers for interval chains (default: 16)
--chain-timeout= Abort any chain that takes more than the specified number of
milliseconds
--task-timeout= Abort any task within a chain that takes more than the specified number
of milliseconds
REST:
--rest-port= REST API port (default: 0) [$PGTT_RESTPORT]
H.4.5. Для разработчика
Если вы хотите внести вклад в pg_timetable и помочь сделать его лучше, не стесняйтесь открыть задачу или даже рассмотреть возможность отправки запроса на добавление изменений. Вы также можете поставить звезду проекту pg_timetable и рассказать о нем миру.
H.4.6. Поддержка
Для профессиональной поддержки, пожалуйста, свяжитесь с Cybertec.
H.4.8. Фон проекта
Проект pg_timetable был запущен еще в 2019 году для внутренних нужд планирования в компании Cybertec.
Для получения дополнительной информации о мотивах проекта и целях проектирования см. оригинальную серию блог-постов, объявляющих о проекте, и последующие обновления функций.
Cybertec также предоставляет коммерческую поддержку с 9 до 5 и 24/7 для pg_timetable.
H.4.8.1. Обратная связь по проекту
Для запросов на добавление функций или помощи в устранении неполадок, пожалуйста, откройте проблему на странице Github проекта.
H.4.9. Установка
pg_timetable совместим с последними поддерживаемыми версиями Tantor SE-1С.
Примечание
Если вы хотите использовать pg_timetable со старыми версиями (9.5, 9.6 и 10)... пожалуйста, выполните эту SQL команду перед запуском pg_timetable:
CREATE OR REPLACE FUNCTION starts_with(text, text)
RETURNS bool AS
$$
SELECT
CASE WHEN length($2) > length($1) THEN
FALSE
ELSE
left($1, length($2)) = $2
END
$$
LANGUAGE SQL
IMMUTABLE STRICT PARALLEL SAFE
COST 5;
H.4.9.1. Официальные пакеты релизов
Вы можете найти бинарный пакет для вашей платформы на официальной странице выпусков. В настоящее время доступны пакеты для Windows, Linux и macOS.
H.4.9.2. Docker
Официальный образ docker можно найти здесь: https://hub.docker.com/r/cybertecpostgresql/pg_timetable
Примечание
Тег latest актуален с веткой master благодаря
этому
действию github. В производственной среде вам, вероятно, следует использовать
последний
стабильный
тег.
Запустите pg_timetable в Docker:
docker run --rm \ cybertecpostgresql/pg_timetable:latest \ -h 10.0.0.3 -p 54321 -c worker001
Запустите pg_timetable в Docker с переменными окружения:
docker run --rm \ -e PGTT_PGHOST=10.0.0.3 \ -e PGTT_PGPORT=54321 \ cybertecpostgresql/pg_timetable:latest \ -c worker001
H.4.10. Компоненты
Планирование в pg_timetable охватывает три различных уровня абстракции, чтобы облегчить повторное использование с другими параметрами или дополнительными расписаниями.
- Command:
Базовый уровень, команда, определяет что делать.
- Task:
Второй уровень, задача, представляет собой элемент цепочки (шаг) для выполнения одной из команд. С помощью задач мы определяем порядок команд, передаваемые аргументы (если есть), и как обрабатываются ошибки.
- Chain:
Третий уровень представляет собой связанные задачи, образующие цепочку задач. Цепочка определяет если, когда и как часто должна выполняться задача.
H.4.10.1. Команда
В настоящее время существует три различных типа команд:
-
SQL SQL фрагмент. Начало очистки, обновление материализованного представления или обработка данных.
-
PROGRAM Внешняя команда. Все, что может быть вызвано как внешний бинарный файл, включая оболочки, например,
bash,pwshи т.д. Внешняя команда будет вызвана с использованием exec.CommandContext из golang.-
BUILTIN Внутренняя команда. Предварительно встроенная функциональность, включенная в pg_timetable. К ним относятся:
NoOp,
Сон,
Журнал,
SendMail,
Скачать,
CopyFromFile,
КопироватьВФайл,
Shutdown.
H.4.10.2. Задача
Следующим строительным блоком является задача, которая просто представляет собой шаг в списке цепочки команд. Примером задач, объединенных в цепочку, может быть:
Загрузить файлы с сервера
Импорт файлов
Выполнение агрегаций
Создать отчет
Удалить файлы с диска
Примечание
Все задачи цепочки в
pg_timetable выполняются
в рамках одной транзакции. Однако, обратите внимание, что нет
возможности откатить задачи PROGRAM и
BUILTIN.
H.4.10.2.1. Таблица timetable.task
-
chain_id bigint Ссылка на цепочку, если задача
NULLсчитается отключенной-
task_order DOUBLE PRECISION Указывает порядок задачи в цепочке.
-
kind timetable.command_kind Тип команды. Может быть SQL (по умолчанию), PROGRAM или BUILTIN.
-
command text Содержит либо SQL-команду, либо путь к приложению или имя команды BUILTIN, которая будет выполнена.
-
run_as text Роль, от имени которой должна выполняться задача.
-
database_connection text Строка подключения для внешней базы данных, которая должна быть использована.
-
ignore_error boolean Укажите, следует ли продолжать выполнение следующей задачи после возникновения ошибки (по умолчанию:
false).-
autonomous boolean Укажите, следует ли выполнять задачу вне транзакции цепочки. Полезно для
VACUUM,CREATE DATABASE,CALLи т. д.-
timeout integer Прервать любую задачу в цепочке, которая занимает больше указанного количества миллисекунд.
Предупреждение
Если задача была
настроена с ignore_error, установленным в
true (значение по умолчанию -
false), процесс-работник будет сообщать об
успешном выполнении даже если задача в цепочке
завершится неудачей.
Как упоминалось выше, команды являются простыми скелетами (например, отправить email, вакуум и т.д.). В большинстве случаев они должны быть оживлены путем передачи входных параметров для выполнения.
H.4.10.2.2. Table timetable.parameter
-
task_id bigint Идентификатор задачи.
-
order_id integer Порядок параметра. Несколько параметров обрабатываются один за другим в соответствии с порядком.
-
value jsonb Значение JSON, содержащее параметры.
H.4.10.2.3. Формат значения параметра
В зависимости от команды аргумент kind может быть представлен различными значениями JSON.
- Kind
- Schema
Пример
-
SQL -
array '[ "one", 2, 3.14, false ]'::jsonb
-
-
PROGRAM -
array of strings '["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
-
-
BUILTIN: Sleep -
integer '5' :: jsonb
-
-
BUILTIN: Log -
any '"WARNING"'::jsonb '{"Status": "WARNING"}'::jsonb
-
-
BUILTIN: SendMail -
object '{ "username": "user@example.com", "password": "password", "serverhost": "smtp.example.com", "serverport": 587, "senderaddr": "user@example.com", "ccaddr": ["recipient_cc@example.com"], "bccaddr": ["recipient_bcc@example.com"], "toaddr": ["recipient@example.com"], "subject": "pg_timetable - No Reply", "attachment": ["/temp/attachments/Report.pdf","config.yaml"], "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}], "msgbody": "<h2>Hello User,</h2> <p>check some attachments!</p>", "contenttype": "text/html; charset=UTF-8" }'::jsonb
-
-
BUILTIN: Download -
object '{ "workersnum": 2, "fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"], "destpath": "." }'::jsonb
-
-
BUILTIN: CopyFromFile -
object '{ "sql": "COPY location FROM STDIN", "filename": "download/orte_ansi.txt" }'::jsonb
-
-
BUILTIN: CopyToFile -
object '{ "sql": "COPY location TO STDOUT", "filename": "download/location.txt" }'::jsonb
-
-
BUILTIN: Shutdown значение игнорируется
-
BUILTIN: NoOp значение игнорируется
H.4.10.3. Цепочка
Как только задачи были упорядочены, их необходимо запланировать как цепочку. Для этого pg_timetable основывается на расширенной строке cron, при этом добавляя множество вариантов конфигурации.
H.4.10.3.1. Table timetable.chain
-
chain_name text Уникальное имя цепочки.
-
run_at timetable.cron Стандартное значение в стиле cron в часовом поясе сервера Postgres или предложение
@after,@every,@reboot.-
max_instances integer Количество экземпляров, которые эта цепочка может иметь работающими одновременно.
-
timeout integer Прервать любую цепочку, которая занимает больше указанного количества миллисекунд.
-
live boolean Управление возможностью выполнения цепочки, когда она достигает своего расписания.
-
self_destruct boolean Самоуничтожение цепочки после успешного выполнения. Неудачные цепочки будут выполнены еще раз в соответствии с расписанием.
-
exclusive_execution boolean Указывает, следует ли выполнять цепочку исключительно, пока все остальные цепочки приостановлены.
-
client_name text Указывает, какой клиент должен выполнить цепочку. Установите это значение в NULL, чтобы разрешить выполнение любому клиенту.
-
timeout integer Прервать цепочку, которая занимает больше указанного количества миллисекунд.
-
on_error Содержит SQL для выполнения в случае ошибки. Если задача, вызвавшая ошибку, помечена как
ignore_error, то ничего не делается.
Примечание
Все цепочки в pg_timetable запланированы в часовом поясе сервера Tantor SE-1С. Вы можете изменить часовой пояс для текущей сессии при добавлении новых цепочек, например.
SET TIME ZONE 'UTC';
-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');
H.4.11. Начало работы
Разнообразные примеры можно найти в Примеры. Если вы хотите перейти с другого планировщика, вы можете использовать скрипты из Миграция с других планировщиков главы.
H.4.11.1. Добавить простую задачу
В реальном мире обычно достаточно использовать простые задания. Под этим термином мы понимаем:
job — это цепочка, содержащая только одну задачу (шаг);
это не использует сложную логику, а скорее простую команду;
это не требует сложной обработки транзакций, так как одна задача неявно выполняется как одна транзакция.
Для такой группы цепочек мы ввели специальную функцию
timetable.add_job().
- timetable.add_job(job_name, job_schedule, job_command, ...) RETURNS BIGINT
Создает простую цепочку из одной задачи
- Parameters:
job_name (text) – Уникальное имя цепочки и команды.
job_schedule (timetable.cron) – Расписание времени в синтаксисе cron в часовом поясе сервера Postgres
job_command (text) – SQL, который будет выполнен.
job_parameters (jsonb) – Аргументы для цепочки command. По умолчанию:
NULL.job_kind (timetable.command_kind) – Тип команды: SQL, PROGRAM или BUILTIN. По умолчанию:
SQL.job_client_name (text) – Указывает, какой клиент должен выполнить цепочку. Установите это значение в NULL, чтобы разрешить любому клиенту. По умолчанию:
NULL.job_max_instances (integer) – Количество экземпляров, которые эта цепочка может иметь, выполняясь одновременно. По умолчанию:
NULL.job_live (boolean) – Управляет возможностью выполнения цепочки при достижении расписания. По умолчанию:
TRUE.job_self_destruct (boolean) – Самоуничтожение цепочки после выполнения. По умолчанию:
FALSE.job_ignore_errors (boolean) – Игнорировать ошибки во время выполнения. По умолчанию:
TRUE.job_exclusive (boolean) – Выполнять цепочку в эксклюзивном режиме. По умолчанию:
FALSE.
- Returns:
ID созданной цепочки
- Return type:
целое число
H.4.11.2. Примеры
Запустите
public.my_func()в 00:05 каждый день в августе по часовому поясу сервера Postgres:SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');Запускать VACUUM на 23-й минуте каждого 2-го часа с 0 до 20 каждый день по часовому поясу сервера Postgres:
SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');Обновлять материализованное представление каждые 2 часа:
SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');Очистить таблицу журнала после pg_timetable перезапуска:
SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');Переиндексация в полночь по часовому поясу сервера Postgres по воскресеньям с помощью утилиты reindexdb:
используя базу данных по умолчанию под пользователем по умолчанию (без аргументов командной строки)
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');указание целевой базы данных и таблиц, и быть подробным
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');передача пароля с использованием переменной окружения через
bashоболочкуSELECT timetable.add_job('reindex', '0 0 * * 7', 'bash', '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb, 'PROGRAM');
H.4.12. Примеры
H.4.12.1. Основы
Этот пример демонстрирует, как создать базовую цепочку из одного шага с параметрами. Он использует CTE для непосредственного обновления таблиц схемы timetable.
SELECT timetable.add_job(
job_name => 'notify every minute',
job_schedule => '* * * * *',
job_command => 'SELECT pg_notify($1, $2)',
job_parameters => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
job_kind => 'SQL'::timetable.command_kind,
job_client_name => NULL,
job_max_instances => 1,
job_live => TRUE,
job_self_destruct => FALSE,
job_ignore_errors => TRUE
) as chain_id;
H.4.12.2. Отправить электронное письмо
Этот пример демонстрирует, как создать продвинутую задачу отправки электронной почты. Он будет проверять, есть ли электронные письма для отправки, отправлять их и записывать статус выполнения команды. Вам не нужно ничего настраивать, каждый параметр может быть указан во время создания цепочки.
DO $$
-- An example for using the SendMail task.
DECLARE
v_mail_task_id bigint;
v_log_task_id bigint;
v_chain_id bigint;
BEGIN
-- Get the chain id
INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
RETURNING chain_id INTO v_chain_id;
-- Add SendMail task
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
RETURNING task_id INTO v_mail_task_id;
-- Create the parameters for the SensMail task
-- "username": The username used for authenticating on the mail server
-- "password": The password used for authenticating on the mail server
-- "serverhost": The IP address or hostname of the mail server
-- "serverport": The port of the mail server
-- "senderaddr": The email that will appear as the sender
-- "ccaddr": String array of the recipients(Cc) email addresses
-- "bccaddr": String array of the recipients(Bcc) email addresses
-- "toaddr": String array of the recipients(To) email addresses
-- "subject": Subject of the email
-- "attachment": String array of the attachments (local file)
-- "attachmentdata": Pairs of name and base64-encoded content
-- "msgbody": The body of the email
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_mail_task_id, 1, '{
"username": "user@example.com",
"password": "password",
"serverhost": "smtp.example.com",
"serverport": 587,
"senderaddr": "user@example.com",
"ccaddr": ["recipient_cc@example.com"],
"bccaddr": ["recipient_bcc@example.com"],
"toaddr": ["recipient@example.com"],
"subject": "pg_timetable - No Reply",
"attachment": ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
"attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
"msgbody": "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
"contenttype": "text/html; charset=UTF-8"
}'::jsonb);
-- Add Log task and make it the last task using `task_order` column (=30)
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 30, 'BUILTIN', 'Log'
RETURNING task_id INTO v_log_task_id;
-- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
-- Since we're using special add_task() function we don't need to specify the `chain_id`.
-- Function will take the same `chain_id` from the parent task, SendMail in this particular case
PERFORM timetable.add_task(
kind => 'SQL',
parent_id => v_mail_task_id,
command => format(
$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>sername')
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
FROM sent_mail
ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$,
v_mail_task_id, v_log_task_id
),
order_delta => 10
);
-- In the end we should have something like this. Note, that even Log task was created earlier it will be executed ter
-- due to `task_order` column.
-- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
-- task_id | chain_id | task_order | kind | left
-- ---------+----------+------------+---------+---------------------------------------------------------------
-- 45 | 24 | 10 | BUILTIN | SendMail
-- 47 | 24 | 20 | SQL | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
-- 46 | 24 | 30 | BUILTIN | Log
-- (3 rows)
END;
$$
LANGUAGE PLPGSQL;
H.4.12.3. Загрузка, Преобразование и Импорт
Этот пример демонстрирует, как создать улучшенную трехшаговую цепочку с параметрами. Он использует оператор DO для непосредственного обновления таблиц схемы timetable.
-- Prepare the destination table 'location'
CREATE TABLE IF NOT EXISTS city(
city text,
lat numeric,
lng numeric,
country text,
iso2 text,
admin_name text,
capital text,
population bigint,
population_proper bigint);
-- An enhanced example consisting of three tasks:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` tension)
-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
DO $$
DECLARE
v_head_id bigint;
v_task_id bigint;
v_chain_id bigint;
BEGIN
-- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
INSERT INTO timetable.chain (chain_name, live)
VALUES ('Download locations and aggregate', TRUE)
RETURNING chain_id INTO v_chain_id;
-- Step 1. Download file from the server
-- Create the chain
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
RETURNING task_id INTO v_task_id;
-- Create the parameters for the step 1:
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1,
'{
"workersnum": 1,
"fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"],
"destpath": "."
}'::jsonb);
RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, task_id;
-- Step 2. Transform Unicode characters into ASCII
-- Create the program task to call 'uconv' and name it 'unaccent'
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
RETURNING task_id INTO v_task_id;
-- Create the parameters for the 'unaccent' task. Input and output files in this case
-- Under Windows we should call PowerShell instead of "uconv" with command:
-- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);
RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id;
-- Step 3. Import ASCII file to table using "CopyFromFile" built-in command
INSERT INTO timetable.task (chain_id, task_order, kind, command)
VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
RETURNING task_id INTO v_task_id;
-- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.v" }'::jsonb);
RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
RETURNING task_id INTO v_task_id;
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
END;
$$ LANGUAGE PLPGSQL;
H.4.12.4. Выполнение задач в автономной транзакции
Этот пример демонстрирует, как выполнять специальные задачи вне контекста транзакционной цепочки. Это полезно для специальных процедур и/или нетранзакционных операций, например, CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE и т.д.
-- An advanced example showing how to use atutonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands,
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN
RAISE notice '%', msg;
END;
$$ LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
PERFORM f('hey 1');
COMMIT;
PERFORM f('hey 2');
COMMIT;
END;
$$
LANGUAGE PLPGSQL;
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct)
VALUES (
'call proc() every 10 sec', -- chain_name,
'@every 10 seconds', -- run_at,
1, -- max_instances,
TRUE, -- live,
FALSE -- self_destruct
) RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
FROM cte_chain
RETURNING task_id
)
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;
H.4.12.5. Остановите планировщик и завершите сеанс
Этот пример демонстрирует, как завершить работу планировщика, используя специальную встроенную задачу. Это может быть использовано для управления окнами обслуживания, для перезапуска планировщика в целях обновления или для остановки сессии перед удалением базы данных.
-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintaining purposes or before database being destroyed.
-- One should take care of restarting pg_timetable if needed.
SELECT timetable.add_job (
job_name => 'Shutdown pg_timetable session on schedule',
job_schedule => '* * 1 * *',
job_command => 'Shutdown',
job_kind => 'BUILTIN'
);
H.4.12.6. Доступ к коду и результату предыдущей задачи из следующей задачи
Этот пример демонстрирует, как проверить код результата и вывод предыдущей задачи. Если последняя задача завершилась неудачно, это возможно только если ignore_error boolean = true установлено для этой задачи. В противном случае планировщик остановит цепочку. Этот пример показывает, как вычислить количество неудачных, успешных и общее количество выполненных задач. На основе этих значений мы можем вычислить коэффициент успеха.
WITH
cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
VALUES ('many tasks', '* * * * *', 1, true)
RETURNING chain_id
),
cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
FROM cte_chain, generate_series(1, 500) AS g(s)
RETURNING task_id
),
report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
s TEXT;
BEGIN
WITH report AS (
SELECT
count(*) FILTER (WHERE returncode = 0) AS success,
count(*) FILTER (WHERE returncode != 0) AS fail,
count(*) AS total
FROM timetable.execution_log
WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
AND txid = txid_current()
)
SELECT 'Tasks executed:' || total ||
'; succeeded: ' || success ||
'; failed: ' || fail ||
'; ratio: ' || 100.0*success/GREATEST(total,1)
INTO s
FROM report;
RAISE NOTICE '%', s;
END;
$$
$CMD$
FROM cte_chain
RETURNING task_id
)
SELECT v_chain_id FROM cte_chain
H.4.13. Миграция с других планировщиков
H.4.13.1. Перенос заданий из pg_cron в pg_timetable
Если вы хотите быстро экспортировать задания, запланированные из pg_cron в pg_timetable, вы можете использовать этот фрагмент SQL:
SELECT timetable.add_job(
job_name => COALESCE(jobname, 'job: ' || command),
job_schedule => schedule,
job_command => command,
job_kind => 'SQL',
job_live => active
) FROM cron.job;
timetable.add_job(), однако, имеет некоторые ограничения. Прежде всего, функция пометит созданную задачу как автономную, указывая, что планировщик должен выполнять задачу вне цепочки транзакций. Это не ошибка, но множество автономных цепочек может привести к использованию дополнительных соединений.
Во-вторых, параметры подключения к базе данных теряются для исходных pg_cron задач, делая все задачи локальными. Чтобы экспортировать всю доступную информацию как можно точнее, используйте этот фрагмент SQL под ролью, в которой они были запланированы в pg_cron:
SET ROLE 'scheduler'; -- set the role used by pg_cron
WITH cron_chain AS (
SELECT
nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
jobname,
schedule,
active,
command,
CASE WHEN
database != current_database()
OR nodename != 'localhost'
OR username != CURRENT_USER
OR nodeport != inet_server_port()
THEN
format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
END AS connstr
FROM
cron.job
),
cte_chain AS (
INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
SELECT
cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
FROM
cron_chain
),
cte_tasks AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
SELECT
cron_id, 1, 'SQL', command, connstr
FROM
cron_chain
RETURNING
chain_id, task_id
)
SELECT * FROM cte_tasks;
H.4.13.2. Перенос заданий из pgAgent в pg_timetable
Чтобы перенести задания из pgAgent, пожалуйста, используйте этот скрипт. pgAgent не имеет концепции задачи PROGRAM, поэтому для эмуляции шагов BATCH, pg_timetable будет выполнять их внутри оболочки. Вы можете изменить оболочку, отредактировав предложение CTE cte_shell.
CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;
WITH
cte_shell(shell, cmd_param) AS (
VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
SELECT
s.jscjobid,
s.jscname,
format('%s %s %s %s %s',
bool_array_to_cron(s.jscminutes),
bool_array_to_cron(s.jschours),
bool_array_to_cron(s.jscmonthdays),
bool_array_to_cron(s.jscmonths, 1),
bool_array_to_cron(s.jscweekdays, 1)) AS schedule
FROM
pgagent.pga_schedule s
WHERE s.jscenabled
AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
AND now() > s.jscstart
),
pga_chain AS (
SELECT
nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
jobid,
format('%s @ %s', jobname, jscname) AS jobname,
jobhostagent,
jobenabled,
schedule
FROM
pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
SELECT
chain_id, jobname, jobhostagent, schedule, jobenabled
FROM
pga_chain
),
pga_step AS (
SELECT
c.chain_id,
nextval('timetable.task_task_id_seq'::regclass) AS task_id,
rank() OVER (ORDER BY jstname) AS jstorder,
jstid,
jstname,
jstenabled,
CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
jstcode,
COALESCE(
NULLIF(jstconnstr, ''),
CASE
WHEN jstdbname = current_database() THEN NULL
WHEN jstdbname > '' THEN 'dbname=' || jstdbname
END
) AS jstconnstr,
jstonerror != 'f' AS jstignoreerror
FROM
pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection)
SELECT
task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind,
CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
jstconnstr
FROM
pga_step, cte_shell sh
),
cte_parameters AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT
task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
FROM
pga_step s, cte_shell sh
WHERE
s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;
H.4.14. REST API
pg_timetable имеет богатый REST API, который может использоваться внешними инструментами для выполнения запуска/остановки/инициализации/перезапуска/перезагрузки, любыми инструментами для выполнения HTTP-проверок состояния, и, конечно, также может использоваться для мониторинга.
Ниже вы найдете список pg_timetable REST API конечных точек.
H.4.14.1. Точки проверки состояния
-
GET /liveness Всегда возвращает код состояния HTTP
200, указывая на то, что pg_timetable работает.-
GET /readiness Возвращает HTTP статус-код
200, когда pg_timetable работает, и планировщик находится в основном цикле обработки цепочек. Если планировщик подключается к базе данных, создает схему базы данных или обновляет ее, он вернет HTTP статус-код503.
H.4.14.2. Управление цепочкой конечные точки
-
GET /startchain?id=<chain-id> Возвращает HTTP статус-код
200, если цепочка с данным идентификатором может быть добавлена в очередь рабочих. Однако это не означает, что выполнение цепочки начнется немедленно. Рабочий процесс должен выполнить проверку нагрузки и другие проверки перед началом выполнения цепочки. В случае ошибки возвращается HTTP статус-код400с последующим сообщением об ошибке.-
GET /stopchain?id=<chain-id> Возвращает HTTP статус-код
200, если цепочка с данным идентификатором работает в данный момент и может быть остановлена. Если цепочка выполняется, сигнал отмены будет отправлен немедленно. В случае ошибки возвращается HTTP статус-код400с сообщением об ошибке.
H.4.15. Схема базы данных
pg_timetable является приложением, управляемым базой данных. Во время первого запуска создается необходимая схема, если она отсутствует.
H.4.15.1. Основные таблицы и объекты
CREATE TABLE timetable.chain (
chain_id BIGSERIAL PRIMARY KEY,
chain_name TEXT NOT NULL UNIQUE,
run_at timetable.cron,
max_instances INTEGER,
timeout INTEGER DEFAULT 0,
live BOOLEAN DEFAULT FALSE,
self_destruct BOOLEAN DEFAULT FALSE,
exclusive_execution BOOLEAN DEFAULT FALSE,
client_name TEXT,
on_error TEXT
);
COMMENT ON TABLE timetable.chain IS
'Stores information about chains schedule';
COMMENT ON COLUMN timetable.chain.run_at IS
'Extended CRON-style time notation the chain has to be run at';
COMMENT ON COLUMN timetable.chain.max_instances IS
'Number of instances (clients) this chain can run in parallel';
COMMENT ON COLUMN timetable.chain.timeout IS
'Abort any chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.chain.live IS
'Indication that the chain is ready to run, set to FALSE to pause execution';
COMMENT ON COLUMN timetable.chain.self_destruct IS
'Indication that this chain will delete itself after successful run';
COMMENT ON COLUMN timetable.chain.exclusive_execution IS
'All parallel chains should be paused while executing this chain';
COMMENT ON COLUMN timetable.chain.client_name IS
'Only client with this name is allowed to run this chain, set to NULL to allow any client';
CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN');
CREATE TABLE timetable.task (
task_id BIGSERIAL PRIMARY KEY,
chain_id BIGINT REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE SCADE,
task_order DOUBLE PRECISION NOT NULL,
task_name TEXT,
kind timetable.command_kind NOT NULL DEFAULT 'SQL',
command TEXT NOT NULL,
run_as TEXT,
database_connection TEXT,
ignore_error BOOLEAN NOT NULL DEFAULT FALSE,
autonomous BOOLEAN NOT NULL DEFAULT FALSE,
timeout INTEGER DEFAULT 0
);
COMMENT ON TABLE timetable.task IS
'Holds information about chain elements aka tasks';
COMMENT ON COLUMN timetable.task.chain_id IS
'Link to the chain, if NULL task considered to be disabled';
COMMENT ON COLUMN timetable.task.task_order IS
'Indicates the order of task within a chain';
COMMENT ON COLUMN timetable.task.run_as IS
'Role name to run task as. Uses SET ROLE for SQL commands';
COMMENT ON COLUMN timetable.task.ignore_error IS
'Indicates whether a next task in a chain can be executed regardless of the success of the current one';
COMMENT ON COLUMN timetable.task.kind IS
'Indicates whether "command" is SQL, built-in function or an external program';
COMMENT ON COLUMN timetable.task.command IS
'Contains either an SQL command, or command string to be executed';
COMMENT ON COLUMN timetable.task.timeout IS
'Abort any task within a chain that takes more than the specified number of milliseconds';
-- parameter passing for a chain task
CREATE TABLE timetable.parameter(
task_id BIGINT REFERENCES timetable.task(task_id)
ON UPDATE CASCADE ON DELETE CASCADE,
order_id INTEGER CHECK (order_id > 0),
value JSONB,
PRIMARY KEY (task_id, order_id)
);
COMMENT ON TABLE timetable.parameter IS
'Stores parameters passed as arguments to a chain task';
CREATE UNLOGGED TABLE timetable.active_session(
client_pid BIGINT NOT NULL,
server_pid BIGINT NOT NULL,
client_name TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now()
);
COMMENT ON TABLE timetable.active_session IS
'Stores information about active sessions';
CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER');
CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
$$
SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1
$$
LANGUAGE sql;
CREATE TABLE timetable.log
(
ts TIMESTAMPTZ DEFAULT now(),
pid INTEGER NOT NULL,
log_level timetable.log_type NOT NULL,
client_name TEXT DEFAULT timetable.get_client_name(pg_backend_pid()),
message TEXT,
message_data jsonb
);
COMMENT ON TABLE timetable.log IS
'Stores log entries of active sessions';
CREATE TABLE timetable.execution_log (
chain_id BIGINT,
task_id BIGINT,
txid BIGINT NOT NULL,
last_run TIMESTAMPTZ DEFAULT now(),
finished TIMESTAMPTZ,
pid BIGINT,
returncode INTEGER,
kind timetable.command_kind,
command TEXT,
output TEXT,
client_name TEXT NOT NULL
);
COMMENT ON TABLE timetable.execution_log IS
'Stores log entries of executed tasks and chains';
CREATE UNLOGGED TABLE timetable.active_chain(
chain_id BIGINT NOT NULL,
client_name TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now()
);
COMMENT ON TABLE timetable.active_chain IS
'Stores information about active chains within session';
CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
BEGIN
IF pg_is_in_recovery() THEN
RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node';
RETURN FALSE;
END IF;
-- remove disconnected sessions
DELETE
FROM timetable.active_session
WHERE server_pid NOT IN (
SELECT pid
FROM pg_catalog.pg_stat_activity
WHERE application_name = 'pg_timetable'
);
DELETE
FROM timetable.active_chain
WHERE client_name NOT IN (
SELECT client_name FROM timetable.active_session
);
-- check if there any active sessions with the client name but different client pid
PERFORM 1
FROM timetable.active_session s
WHERE
s.client_pid <> worker_pid
AND s.client_name = worker_name
LIMIT 1;
IF FOUND THEN
RAISE NOTICE 'Another client is already connected to server with name: %', worker_name;
RETURN FALSE;
END IF;
-- insert current session information
INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, backend_pid());
RETURN TRUE;
END;
$CODE$
STRICT
LANGUAGE plpgsql;
H.4.15.2. Функции, связанные с работой
-- add_task() will add a task to the same chain as the task with `parent_id`
CREATE OR REPLACE FUNCTION timetable.add_task(
IN kind timetable.command_kind,
IN command TEXT,
IN parent_id BIGINT,
IN order_delta DOUBLE PRECISION DEFAULT 10
) RETURNS BIGINT AS $$
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT chain_id, task_order + $4, $1, $2 FROM timetable.task WHERE task_id = $3
RETURNING task_id
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_task IS 'Add a task to the same chain as the task with parent_id';
-- add_job() will add one-task chain to the system
CREATE OR REPLACE FUNCTION timetable.add_job(
job_name TEXT,
job_schedule timetable.cron,
job_command TEXT,
job_parameters JSONB DEFAULT NULL,
job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind,
job_client_name TEXT DEFAULT NULL,
job_max_instances INTEGER DEFAULT NULL,
job_live BOOLEAN DEFAULT TRUE,
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, clusive_execution, on_error)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, b_exclusive, job_on_error)
RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE
FROM cte_chain
RETURNING task_id
),
cte_param AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain
)
SELECT v_chain_id FROM cte_chain
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_job IS 'Add one-task chain (aka job) to the system';
-- notify_chain_start() will send notification to the worker to start the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_start(
chain_id BIGINT,
worker_name TEXT,
start_delay INTERVAL DEFAULT NULL
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "START", "Ts": %s, "Delay": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint,
COALESCE(EXTRACT(epoch FROM start_delay)::bigint, 0)
)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_start IS 'Send notification to the worker to start the chain';
-- notify_chain_stop() will send notification to the worker to stop the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_stop(
chain_id BIGINT,
worker_name TEXT
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "STOP", "Ts": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_stop IS 'Send notification to the worker to stop the chain';
-- move_task_up() will switch the order of the task execution with a previous task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_up(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order < ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_up IS 'Switch the order of the task execution with a previous task within chain';
-- move_task_down() will switch the order of the task execution with a following task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_down(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order > ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_down IS 'Switch the order of the task execution with a following task hin the chain';
-- delete_job() will delete the chain and its tasks from the system
CREATE OR REPLACE FUNCTION timetable.delete_job(IN job_name TEXT) RETURNS boolean AS $$
WITH del_chain AS (DELETE FROM timetable.chain WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM del_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_job IS 'Delete the chain and its tasks from the system';
-- delete_task() will delete the task from a chain
CREATE OR REPLACE FUNCTION timetable.delete_task(IN task_id BIGINT) RETURNS boolean AS $$
WITH del_task AS (DELETE FROM timetable.task WHERE task_id = $1 RETURNING task_id)
SELECT EXISTS(SELECT 1 FROM del_task)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_task IS 'Delete the task from a chain';
H.4.15.3. Функции, связанные с Сron
CREATE OR REPLACE FUNCTION timetable.cron_split_to_arrays(
cron text,
OUT mins integer[],
OUT hours integer[],
OUT days integer[],
OUT months integer[],
OUT dow integer[]
) RETURNS record AS $$
DECLARE
a_element text[];
i_index integer;
a_tmp text[];
tmp_item text;
a_range int[];
a_split text[];
a_res integer[];
max_val integer;
min_val integer;
dimensions constant text[] = '{"minutes", "hours", "days", "months", "days of week"}';
allowed_ranges constant integer[][] = '{{0,59},{0,23},{1,31},{1,12},{0,7}}';
BEGIN
a_element := regexp_split_to_array(cron, '\s+');
FOR i_index IN 1..5 LOOP
a_res := NULL;
a_tmp := string_to_array(a_element[i_index],',');
FOREACH tmp_item IN ARRAY a_tmp LOOP
IF tmp_item ~ '^[0-9]+$' THEN -- normal integer
a_res := array_append(a_res, tmp_item::int);
ELSIF tmp_item ~ '^[*]+$' THEN -- '*' any value
a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9]+[-][0-9]+$' THEN -- '-' range of values
a_range := regexp_split_to_array(tmp_item, '-');
a_range := array(select generate_series(a_range[1], a_range[2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9]+[\/][0-9]+$' THEN -- '/' step values
a_range := regexp_split_to_array(tmp_item, '/');
a_range := array(select generate_series(a_range[1], allowed_ranges[i_index][2], a_range[2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9-]+[\/][0-9]+$' THEN -- '-' range of values and '/' step values
a_split := regexp_split_to_array(tmp_item, '/');
a_range := regexp_split_to_array(a_split[1], '-');
a_range := array(select generate_series(a_range[1], a_range[2], a_split[2]::int));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[*]+[\/][0-9]+$' THEN -- '*' any value and '/' step values
a_split := regexp_split_to_array(tmp_item, '/');
a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2], a_split[2]::int));
a_res := array_cat(a_res, a_range);
ELSE
RAISE EXCEPTION 'Value ("%") not recognized', a_element[i_index]
USING HINT = 'fields separated by space or tab.'+
'Values allowed: numbers (value list with ","), '+
'any value with "*", range of value with "-" and step values with "/"!';
END IF;
END LOOP;
SELECT
ARRAY_AGG(x.val), MIN(x.val), MAX(x.val) INTO a_res, min_val, max_val
FROM (
SELECT DISTINCT UNNEST(a_res) AS val ORDER BY val) AS x;
IF max_val > allowed_ranges[i_index][2] OR min_val < allowed_ranges[i_index][1] OR a_res IS NULL THEN
RAISE EXCEPTION '% is out of range % for %', tmp_item, allowed_ranges[i_index:i_index][:], dimensions[i_index];
END IF;
CASE i_index
WHEN 1 THEN mins := a_res;
WHEN 2 THEN hours := a_res;
WHEN 3 THEN days := a_res;
WHEN 4 THEN months := a_res;
ELSE
dow := a_res;
END CASE;
END LOOP;
RETURN;
END;
$$ LANGUAGE PLPGSQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_months(
from_ts timestamptz,
allowed_months int[]
) RETURNS SETOF timestamptz AS $$
WITH
am(am) AS (SELECT UNNEST(allowed_months)),
genm(ts) AS ( --generated months
SELECT date_trunc('month', ts)
FROM pg_catalog.generate_series(from_ts, from_ts + INTERVAL '1 year', INTERVAL '1 month') g(ts)
)
SELECT ts FROM genm JOIN am ON date_part('month', genm.ts) = am.am
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_days(
from_ts timestamptz,
allowed_months int[],
allowed_days int[],
allowed_week_days int[]
) RETURNS SETOF timestamptz AS $$
WITH
ad(ad) AS (SELECT UNNEST(allowed_days)),
am(am) AS (SELECT * FROM timetable.cron_months(from_ts, allowed_months)),
gend(ts) AS ( --generated days
SELECT date_trunc('day', ts)
FROM am,
pg_catalog.generate_series(am.am, am.am + INTERVAL '1 month'
- INTERVAL '1 day', -- don't include the same day of the next month
INTERVAL '1 day') g(ts)
)
SELECT ts
FROM gend JOIN ad ON date_part('day', gend.ts) = ad.ad
WHERE extract(dow from ts)=ANY(allowed_week_days)
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_times(
allowed_hours int[],
allowed_minutes int[]
) RETURNS SETOF time AS $$
WITH
ah(ah) AS (SELECT UNNEST(allowed_hours)),
am(am) AS (SELECT UNNEST(allowed_minutes))
SELECT make_time(ah.ah, am.am, 0) FROM ah CROSS JOIN am
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_runs(
from_ts timestamp with time zone,
cron text
) RETURNS SETOF timestamptz AS $$
SELECT cd + ct
FROM
timetable.cron_split_to_arrays(cron) a,
timetable.cron_times(a.hours, a.mins) ct CROSS JOIN
timetable.cron_days(from_ts, a.months, a.days, a.dow) cd
WHERE cd + ct > from_ts
ORDER BY 1 ASC;
$$ LANGUAGE SQL STRICT;
CREATE DOMAIN timetable.cron AS TEXT CHECK(
VALUE = '@reboot'
OR substr(VALUE, 1, 6) IN ('@every', '@after')
AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL
OR VALUE ~ '^(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) +){4}(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) ?)$'
AND timetable.cron_split_to_arrays(VALUE) IS NOT NULL
);
COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support of interval values';
-- is_cron_in_time returns TRUE if timestamp is listed in cron expression
CREATE OR REPLACE FUNCTION timetable.is_cron_in_time(
run_at timetable.cron,
ts timestamptz
) RETURNS BOOLEAN AS $$
SELECT
CASE WHEN run_at IS NULL THEN
TRUE
ELSE
date_part('month', ts) = ANY(a.months)
AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow))
AND date_part('day', ts) = ANY(a.days)
AND date_part('hour', ts) = ANY(a.hours)
AND date_part('minute', ts) = ANY(a.mins)
END
FROM
timetable.cron_split_to_arrays(run_at) a
$$ LANGUAGE SQL;
CREATE OR REPLACE FUNCTION timetable.next_run(cron timetable.cron) RETURNS timestamptz AS $$
SELECT * FROM timetable.cron_runs(now(), cron) LIMIT 1
$$ LANGUAGE SQL STRICT;
H.4.15.4. ER-Диаграмма
