H.2. pg_timetable#
H.2. pg_timetable #
- H.2.1. О pg_timetable
- H.2.2. Основные функции
- H.2.3. Быстрый старт
- H.2.4. Параметры командной строки
- H.2.5. Вклад
- H.2.6. Поддержка
- H.2.7. Авторы
- H.2.8. Фон проекта
- H.2.9. Установка
- H.2.10. Компоненты
- H.2.11. Начало работы
- H.2.12. Примеры
- H.2.13. Миграция с других планировщиков
- H.2.14. REST API
- H.2.15. Схема базы данных
pg_timetable является продвинутым планировщиком заданий для PostgreSQL, предлагая множество преимуществ по сравнению с традиционными планировщиками, такими как крон и другие. Он полностью управляется базой данных и предоставляет несколько продвинутых концепций.
H.2.2. Основные функции #
Задачи могут быть организованы в цепочки
Цепочка может состоять из встроенных команд, SQL и исполняемых файлов
Параметры могут быть переданы в цепочки
Пропущенные задачи (возможно, из-за простоя) могут быть повторно выполнены автоматически
Поддержка настраиваемых повторений
Встроенные задачи, такие как отправка электронных писем и т.д.
Полностью управляемая базой данных конфигурация
Полная поддержка ведения журналов на основе базы данных
Планирование в стиле Cron в часовом поясе сервера PostgreSQL
Необязательная защита от параллелизма
Задача и цепочка могут иметь настройки времени ожидания выполнения
H.2.3. Быстрый старт #
Скачайте исполняемый файл pg_timetable
Убедитесь, что ваш сервер PostgreSQL запущен и имеет роль с привилегией
CREATE
для целевой базы данных, например,my_database=> CREATE ROLE scheduler PASSWORD 'somestrong'; my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
Создайте новую задачу, например, выполняйте
VACUUM
каждую ночь в 00:30 по часовому поясу сервера Postgresmy_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM'); add_job --------- 3 (1 row)
Запустите pg_timetable
# pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
Успех!
H.2.4. Параметры командной строки #
# ./pg_timetable Application Options: -c, --clientname= Unique name for application instance [$PGTT_CLIENTNAME] --config= YAML configuration file --no-program-tasks Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS] -v, --version Output detailed version information [$PGTT_VERSION] Connection: -h, --host= PostgreSQL host (default: localhost) [$PGTT_PGHOST] -p, --port= PostgreSQL port (default: 5432) [$PGTT_PGPORT] -d, --dbname= PostgreSQL database name (default: timetable) [$PGTT_PGDATABASE] -u, --user= PostgreSQL user (default: scheduler) [$PGTT_PGUSER] --password= PostgreSQL user password [$PGTT_PGPASSWORD] --sslmode=[disable|require] Connection SSL mode (default: disable) [$PGTT_PGSSLMODE] --pgurl= PostgreSQL connection URL [$PGTT_URL] --timeout= PostgreSQL connection timeout (default: 90) [$PGTT_TIMEOUT] Logging: --log-level=[debug|info|error] Verbosity level for stdout and log file (default: info) --log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info) --log-file= File name to store logs --log-file-format=[json|text] Format of file logs (default: json) --log-file-rotate Rotate log files --log-file-size= Maximum size in MB of the log file before it gets rotated (default: 100) --log-file-age= Number of days to retain old log files, 0 means forever (default: 0) --log-file-number= Maximum number of old log files to retain, 0 to retain all (default: 0) Start: -f, --file= SQL script file to execute during startup --init Initialize database schema to the latest version and exit. Can be used with --upgrade --upgrade Upgrade database to the latest version --debug Run in debug mode. Only asynchronous chains will be executed Resource: --cron-workers= Number of parallel workers for scheduled chains (default: 16) --interval-workers= Number of parallel workers for interval chains (default: 16) --chain-timeout= Abort any chain that takes more than the specified number of milliseconds --task-timeout= Abort any task within a chain that takes more than the specified number of milliseconds REST: --rest-port= REST API port (default: 0) [$PGTT_RESTPORT]
H.2.5. Вклад #
Если вы хотите внести вклад в pg_timetable и помочь сделать его лучше, не стесняйтесь открыть обсуждение или даже рассмотреть возможность отправки запроса на изменение. Вы также можете поставить звезду проекту pg_timetable проект, и рассказать о нём миру.
H.2.8. Фон проекта #
Проект pg_timetable был запущен еще в 2019 году для внутренних нужд планирования в компании Cybertec.
Для получения дополнительной информации о мотивах проекта и целях проектирования см. оригинальную серию блог-постов, объявляющих о проекте, и последующие обновления функций.
Cybertec также предоставляет коммерческую поддержку с 9 до 5 и 24/7 для pg_timetable.
H.2.8.1. Обратная связь по проекту #
Для запросов на добавление функций или помощи в устранении неполадок, пожалуйста, откройте проблему на странице Github проекта.
H.2.9. Установка #
pg_timetable совместим с последними поддерживаемыми версиями PostgreSQL: 11, 12, 13, 14 (стабильные); 15 (разработка).
Примечание
Если вы хотите использовать pg_timetable с более старыми версиями (9.5, 9.6 и 10)... пожалуйста, выполните эту SQL команду перед запуском pg_timetable:
CREATE OR REPLACE FUNCTION starts_with(text, text) RETURNS bool AS $$ SELECT CASE WHEN length($2) > length($1) THEN FALSE ELSE left($1, length($2)) = $2 END $$ LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE COST 5;
H.2.9.1. Официальные пакеты релизов #
Вы можете найти бинарный пакет для вашей платформы на официальной странице выпусков . Прямо сейчас Windows, Linux и macOS пакеты доступны.
H.2.9.2. Docker #
Официальный образ docker можно найти здесь: https://hub.docker.com/r/cybertecpostgresql/pg_timetable
Примечание
Тег latest
актуален с веткой master благодаря
этому
действию github. В производственной среде вам, вероятно, следует использовать
последний
стабильный
тег.
Запустите pg_timetable в Docker:
docker run --rm \ cybertecpostgresql/pg_timetable:latest \ -h 10.0.0.3 -p 54321 -c worker001
Запустите pg_timetable в Docker с переменными окружения:
docker run --rm \ -e PGTT_PGHOST=10.0.0.3 \ -e PGTT_PGPORT=54321 \ cybertecpostgresql/pg_timetable:latest \ -c worker001
H.2.9.3. Сборка из исходников #
Скачайте и установите Go на вашу систему.
Склонируйте pg_timetable репозиторий:
$ git clone https://github.com/cybertec-postgresql/pg_timetable.git $ cd pg_timetable
Запустите pg_timetable:
$ go run main.go --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
Или создайте бинарный файл и запустите его:
$ go build $ ./pg_timetable --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
(Необязательно) Запустите тесты во всех подкаталогах проекта:
$ psql --command="CREATE USER scheduler PASSWORD 'somestrong'" $ createdb --owner=scheduler timetable $ go test -failfast -timeout=300s -count=1 -p 1 ./...
H.2.10. Компоненты #
Планирование в pg_timetable включает три различных уровня абстракции, чтобы облегчить повторное использование с другими параметрами или дополнительными расписаниями.
- Command:
Базовый уровень, команда , определяет что делать.
- Task:
Второй уровень, task , представляет собой элемент цепочки (шаг) для выполнения одной из команд. С помощью задачи мы определяем порядок команд, передаваемые аргументы (если есть), и как обрабатываются ошибки.
- Chain:
Третий уровень представляет собой связанные задачи, образующие цепочку задач. Цепочка определяет если, когда и как часто задача должна выполняться.
H.2.10.1. Команда #
В настоящее время существует три различных типа команд:
-
SQL
SQL фрагмент. Начало очистки, обновление материализованного представления или обработка данных.
-
PROGRAM
Внешняя команда. Все, что может быть вызвано как внешний бинарный файл, включая оболочки, например,
bash
,pwsh
и т.д. Внешняя команда будет вызвана с использованием exec.CommandContext из golang.-
BUILTIN
Внутренняя команда. Встроенная функциональность, включенная в pg_timetable . Эти включают:
NoOp,
Сон,
Журнал,
SendMail,
Скачать,
CopyFromFile,
КопироватьВФайл,
Shutdown.
H.2.10.2. Задача #
Следующим строительным блоком является task , который просто представляет собой шаг в списке цепочки команд. Пример задач, объединенных в цепочку, может быть:
Загрузить файлы с сервера
Импорт файлов
Выполнение агрегаций
Создать отчет
Удалить файлы с диска
Примечание
Все задачи цепочки в
pg_timetable
выполняются в рамках одной транзакции. Однако, обратите внимание, что нет возможности откатить задачи PROGRAM
и BUILTIN
.
H.2.10.2.1. Таблица timetable.task #
-
chain_id bigint
Ссылка на цепочку, если задача
NULL
считается отключенной-
task_order DOUBLE PRECISION
Указывает порядок задачи в цепочке.
-
kind timetable.command_kind
Тип команды. Может быть SQL (по умолчанию), PROGRAM или BUILTIN.
-
command text
Содержит либо SQL-команду, либо путь к приложению или имя команды BUILTIN, которая будет выполнена.
-
run_as text
Роль, от имени которой должна выполняться задача.
-
database_connection text
Строка подключения для внешней базы данных, которая должна быть использована.
-
ignore_error boolean
Укажите, следует ли продолжать выполнение следующей задачи после возникновения ошибки (по умолчанию:
false
).-
autonomous boolean
Укажите, следует ли выполнять задачу вне транзакции цепочки. Полезно для
VACUUM
,CREATE DATABASE
,CALL
и т. д.-
timeout integer
Прервать любую задачу в цепочке, которая занимает больше указанного количества миллисекунд.
Предупреждение
Если
task
был
настроен с ignore_error
, установленным в
true
(значение по умолчанию -
false
), рабочий процесс будет сообщать об
успешном выполнении даже если задача в цепочке
завершится неудачей.
Как упоминалось выше, команды являются простыми скелетами (например, отправить email, вакуум и т.д.). В большинстве случаев, они должны быть оживлены путем передачи входных параметров для выполнения.
H.2.10.2.2. Table timetable.parameter #
-
task_id bigint
Идентификатор задачи.
-
order_id integer
Порядок параметра. Несколько параметров обрабатываются один за другим в соответствии с порядком.
-
value jsonb
Значение JSON, содержащее параметры.
H.2.10.2.3. Формат значения параметра #
В зависимости от команда аргумент kind может быть представлен различными значениями JSON.
- Kind
- Schema
Пример
-
SQL
-
array
'[ "one", 2, 3.14, false ]'::jsonb
-
-
PROGRAM
-
array of strings
'["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
-
-
BUILTIN: Sleep
-
integer
'5' :: jsonb
-
-
BUILTIN: Log
-
any
'"WARNING"'::jsonb '{"Status": "WARNING"}'::jsonb
-
-
BUILTIN: SendMail
-
object
'{ "username": "user@example.com", "password": "password", "serverhost": "smtp.example.com", "serverport": 587, "senderaddr": "user@example.com", "ccaddr": ["recipient_cc@example.com"], "bccaddr": ["recipient_bcc@example.com"], "toaddr": ["recipient@example.com"], "subject": "pg_timetable - No Reply", "attachment": ["/temp/attachments/Report.pdf","config.yaml"], "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}], "msgbody": "<h2>Hello User,</h2> <p>check some attachments!</p>", "contenttype": "text/html; charset=UTF-8" }'::jsonb
-
-
BUILTIN: Download
-
object
'{ "workersnum": 2, "fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"], "destpath": "." }'::jsonb
-
-
BUILTIN: CopyFromFile
-
object
'{ "sql": "COPY location FROM STDIN", "filename": "download/orte_ansi.txt" }'::jsonb
-
-
BUILTIN: CopyToFile
-
object
'{ "sql": "COPY location TO STDOUT", "filename": "download/location.txt" }'::jsonb
-
-
BUILTIN: Shutdown
значение игнорируется
-
BUILTIN: NoOp
значение игнорируется
H.2.10.3. Цепочка #
Как только задачи были упорядочены, их необходимо запланировать как цепочка . Для этого, pg_timetable основывается на улучшенной крон -строка, при этом добавляя несколько параметров конфигурации.
H.2.10.3.1. Table timetable.chain #
-
chain_name text
Уникальное имя цепочки.
-
run_at timetable.cron
Стандартное значение в стиле cron в часовом поясе сервера Postgres или
@after
,@every
,@reboot
условие.-
max_instances integer
Количество экземпляров, которые эта цепочка может иметь работающими одновременно.
-
timeout integer
Прервать любую цепочку, которая занимает больше указанного количества миллисекунд.
-
live boolean
Управление возможностью выполнения цепочки, когда она достигает своего расписания.
-
self_destruct boolean
Самоуничтожение цепочки после успешного выполнения. Неудачные цепочки будут выполнены еще раз в соответствии с расписанием.
-
exclusive_execution boolean
Указывает, следует ли выполнять цепочку исключительно, пока все остальные цепочки приостановлены.
-
client_name text
Указывает, какой клиент должен выполнить цепочку. Установите это значение в NULL, чтобы разрешить выполнение любому клиенту.
-
timeout integer
Прервать цепочку, которая занимает больше указанного количества миллисекунд.
-
on_error
Содержит SQL для выполнения в случае ошибки. Если задача, вызвавшая ошибку, помечена как
ignore_error
, то ничего не делается.
Примечание
Все цепочки в pg_timetable запланированы в часовом поясе сервера PostgreSQL. Вы можете изменить часовой пояс для текущая сессия при добавлении новых цепочек, например
SET TIME ZONE 'UTC'; -- Run VACUUM at 00:05 every day in August UTC SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');
H.2.11. Начало работы #
Разнообразные примеры можно найти в Примеры. Если вы хотите перейти с другого планировщика, вы можете использовать скрипты из Миграция с других планировщиков главы.
H.2.11.1. Добавить простую задачу #
В реальном мире обычно достаточно использовать простые задания. Под этим термином мы понимаем:
работа является цепочкой с только одним task (шаг) в нем;
он не использует сложную логику, а скорее простую команда;
это не требует сложной обработки транзакций, так как одна задача неявно выполняется как одна транзакция.
Для такой группы цепочек мы ввели специальную функцию
timetable.add_job()
.
- timetable.add_job(job_name, job_schedule, job_command, ...) RETURNS BIGINT
Создает простую цепочку из одной задачи
- Parameters:
job_name (текст) – Уникальное имя цепочка и команда.
job_schedule (timetable.cron) – Расписание времени в синтаксисе cron в часовом поясе сервера Postgres
job_command (текст) – SQL, который будет выполнен.
job_parameters (jsonb) – Аргументы для цепочки команда . По умолчанию:
NULL
.job_kind (timetable.command_kind) – Вид команды: SQL, PROGRAM или BUILTIN. По умолчанию:
SQL
.job_client_name (текст) – Указывает, какой клиент должен выполнить цепочку. Установите это значение в NULL, чтобы разрешить выполнение любому клиенту. По умолчанию:
NULL
.job_max_instances (integer) – Количество экземпляров, которые эта цепочка может иметь, работающих одновременно. По умолчанию:
NULL
.job_live (boolean) – Управляет возможностью выполнения цепочки, когда она достигает своего расписания. По умолчанию:
TRUE
.job_self_destruct (boolean) – Самоуничтожение цепочки после выполнения. По умолчанию:
FALSE
.job_ignore_errors (boolean) – Игнорировать ошибку во время выполнения. По умолчанию:
TRUE
.job_exclusive (boolean) – Выполнить цепочку в эксклюзивном режиме. По умолчанию:
FALSE
.
- Returns:
ID созданной цепочки
- Return type:
целое число
H.2.11.2. Примеры #
Запустите
public.my_func()
в 00:05 каждый день в августе по часовому поясу сервера Postgres:SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');
Запускать VACUUM на 23-й минуте каждого 2-го часа с 0 до 20 каждый день по часовому поясу сервера Postgres:
SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');
Обновлять материализованное представление каждые 2 часа:
SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');
Очистить таблицу журнала после pg_timetable перезапуск:
SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');
Переиндексация в полночь по часовому поясу сервера Postgres по воскресеньям с утилитой reindexdb :
используя базу данных по умолчанию под пользователем по умолчанию (без аргументов командной строки)
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');
указание целевой базы данных и таблиц, и быть подробным
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');
передача пароля с использованием переменной окружения через
bash
оболочкуSELECT timetable.add_job('reindex', '0 0 * * 7', 'bash', '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb, 'PROGRAM');
H.2.12. Примеры #
H.2.12.1. Основы #
Этот пример демонстрирует, как создать базовую цепочку из одного шага с параметрами. Он использует CTE для непосредственного обновления расписание схема таблицы.
SELECT timetable.add_job( job_name => 'notify every minute', job_schedule => '* * * * *', job_command => 'SELECT pg_notify($1, $2)', job_parameters => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb, job_kind => 'SQL'::timetable.command_kind, job_client_name => NULL, job_max_instances => 1, job_live => TRUE, job_self_destruct => FALSE, job_ignore_errors => TRUE ) as chain_id;
H.2.12.2. Отправить электронное письмо #
Этот пример демонстрирует, как создать продвинутую задачу отправки электронной почты. Он будет проверять, есть ли электронные письма для отправки, отправлять их и записывать статус выполнения команды. Вам не нужно ничего настраивать, каждый параметр может быть указан во время создания цепочки.
DO $$ -- An example for using the SendMail task. DECLARE v_mail_task_id bigint; v_log_task_id bigint; v_chain_id bigint; BEGIN -- Get the chain id INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE) RETURNING chain_id INTO v_chain_id; -- Add SendMail task INSERT INTO timetable.task (chain_id, task_order, kind, command) SELECT v_chain_id, 10, 'BUILTIN', 'SendMail' RETURNING task_id INTO v_mail_task_id; -- Create the parameters for the SensMail task -- "username": The username used for authenticating on the mail server -- "password": The password used for authenticating on the mail server -- "serverhost": The IP address or hostname of the mail server -- "serverport": The port of the mail server -- "senderaddr": The email that will appear as the sender -- "ccaddr": String array of the recipients(Cc) email addresses -- "bccaddr": String array of the recipients(Bcc) email addresses -- "toaddr": String array of the recipients(To) email addresses -- "subject": Subject of the email -- "attachment": String array of the attachments (local file) -- "attachmentdata": Pairs of name and base64-encoded content -- "msgbody": The body of the email INSERT INTO timetable.parameter (task_id, order_id, value) VALUES (v_mail_task_id, 1, '{ "username": "user@example.com", "password": "password", "serverhost": "smtp.example.com", "serverport": 587, "senderaddr": "user@example.com", "ccaddr": ["recipient_cc@example.com"], "bccaddr": ["recipient_bcc@example.com"], "toaddr": ["recipient@example.com"], "subject": "pg_timetable - No Reply", "attachment": ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"], "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}], "msgbody": "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!", "contenttype": "text/html; charset=UTF-8" }'::jsonb); -- Add Log task and make it the last task using `task_order` column (=30) INSERT INTO timetable.task (chain_id, task_order, kind, command) SELECT v_chain_id, 30, 'BUILTIN', 'Log' RETURNING task_id INTO v_log_task_id; -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task -- Since we're using special add_task() function we don't need to specify the `chain_id`. -- Function will take the same `chain_id` from the parent task, SendMail in this particular case PERFORM timetable.add_task( kind => 'SQL', parent_id => v_mail_task_id, command => format( $query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>sername') INSERT INTO timetable.parameter (task_id, order_id, value) SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';')) FROM sent_mail ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$, v_mail_task_id, v_log_task_id ), order_delta => 10 ); -- In the end we should have something like this. Note, that even Log task was created earlier it will be executed ter -- due to `task_order` column. -- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order; -- task_id | chain_id | task_order | kind | left -- ---------+----------+------------+---------+--------------------------------------------------------------- -- 45 | 24 | 10 | BUILTIN | SendMail -- 47 | 24 | 20 | SQL | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p -- 46 | 24 | 30 | BUILTIN | Log -- (3 rows) END; $$ LANGUAGE PLPGSQL;
H.2.12.3. Загрузка, Преобразование и Импорт #
Этот пример демонстрирует, как создать улучшенную трехшаговую цепочку с параметрами. Используется оператор DO для непосредственного обновления расписание схема таблицы.
-- Prepare the destination table 'location' CREATE TABLE IF NOT EXISTS city( city text, lat numeric, lng numeric, country text, iso2 text, admin_name text, capital text, population bigint, population_proper bigint); -- An enhanced example consisting of three tasks: -- 1. Download text file from internet using BUILT-IN command -- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL tension) -- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`) DO $$ DECLARE v_head_id bigint; v_task_id bigint; v_chain_id bigint; BEGIN -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron) INSERT INTO timetable.chain (chain_name, live) VALUES ('Download locations and aggregate', TRUE) RETURNING chain_id INTO v_chain_id; -- Step 1. Download file from the server -- Create the chain INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error) VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE) RETURNING task_id INTO v_task_id; -- Create the parameters for the step 1: INSERT INTO timetable.parameter (task_id, order_id, value) VALUES (v_task_id, 1, '{ "workersnum": 1, "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"], "destpath": "." }'::jsonb); RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, task_id; -- Step 2. Transform Unicode characters into ASCII -- Create the program task to call 'uconv' and name it 'unaccent' INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name) VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent') RETURNING task_id INTO v_task_id; -- Create the parameters for the 'unaccent' task. Input and output files in this case -- Under Windows we should call PowerShell instead of "uconv" with command: -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '') INSERT INTO timetable.parameter (task_id, order_id, value) VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb); RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id; -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command INSERT INTO timetable.task (chain_id, task_order, kind, command) VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile') RETURNING task_id INTO v_task_id; -- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt' INSERT INTO timetable.parameter (task_id, order_id, value) VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.v" }'::jsonb); RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id; INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name) VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv') RETURNING task_id INTO v_task_id; INSERT INTO timetable.parameter (task_id, order_id, value) VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb); END; $$ LANGUAGE PLPGSQL;
H.2.12.4. Выполнение задач в автономной транзакции #
Этот пример демонстрирует, как выполнять специальные задачи вне контекста транзакционной цепочки. Это полезно для специальных процедур и/или нетранзакционных операций, например, CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE и т.д.
-- An advanced example showing how to use atutonomous tasks. -- This one-task chain will execute test_proc() procedure. -- Since procedure will make two commits (after f1() and f2()) -- we cannot use it as a regular task, because all regular tasks -- must be executed in the context of a single chain transaction. -- Same rule applies for some other SQL commands, -- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc. CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$ BEGIN RAISE notice '%', msg; END; $$ LANGUAGE PLPGSQL; CREATE OR REPLACE PROCEDURE test_proc () AS $$ BEGIN PERFORM f('hey 1'); COMMIT; PERFORM f('hey 2'); COMMIT; END; $$ LANGUAGE PLPGSQL; WITH cte_chain (v_chain_id) AS ( INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct) VALUES ( 'call proc() every 10 sec', -- chain_name, '@every 10 seconds', -- run_at, 1, -- max_instances, TRUE, -- live, FALSE -- self_destruct ) RETURNING chain_id ), cte_task(v_task_id) AS ( INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous) SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE FROM cte_chain RETURNING task_id ) SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;
H.2.12.5. Остановите планировщик и завершите сеанс #
Этот пример демонстрирует, как завершить работу планировщика, используя специальную встроенную задачу. Это может быть использовано для управления окнами обслуживания, для перезапуска планировщика в целях обновления или для остановки сессии перед удалением базы данных.
-- This one-task chain (aka job) will terminate pg_timetable session. -- This is useful for maintaining purposes or before database being destroyed. -- One should take care of restarting pg_timetable if needed. SELECT timetable.add_job ( job_name => 'Shutdown pg_timetable session on schedule', job_schedule => '* * 1 * *', job_command => 'Shutdown', job_kind => 'BUILTIN' );
H.2.12.6. Доступ к коду и результату предыдущей задачи из следующей задачи #
Этот пример демонстрирует, как проверить код результата и вывод предыдущей задачи. Если последняя задача завершилась неудачно, это возможно только если ignore_error boolean = true установлено для этой задачи. В противном случае планировщик остановит цепочку. Этот пример показывает, как вычислить количество неудачных, успешных и общее количество выполненных задач. На основе этих значений мы можем вычислить коэффициент успеха.
WITH cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later INSERT INTO timetable.chain (chain_name, run_at, max_instances, live) VALUES ('many tasks', '* * * * *', 1, true) RETURNING chain_id ), cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error) SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true FROM cte_chain, generate_series(1, 500) AS g(s) RETURNING task_id ), report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic INSERT INTO timetable.task (chain_id, task_order, kind, command) SELECT v_chain_id, 501, 'SQL', $CMD$DO $$ DECLARE s TEXT; BEGIN WITH report AS ( SELECT count(*) FILTER (WHERE returncode = 0) AS success, count(*) FILTER (WHERE returncode != 0) AS fail, count(*) AS total FROM timetable.execution_log WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint AND txid = txid_current() ) SELECT 'Tasks executed:' || total || '; succeeded: ' || success || '; failed: ' || fail || '; ratio: ' || 100.0*success/GREATEST(total,1) INTO s FROM report; RAISE NOTICE '%', s; END; $$ $CMD$ FROM cte_chain RETURNING task_id ) SELECT v_chain_id FROM cte_chain
H.2.13. Миграция с других планировщиков #
H.2.13.1. Перенос заданий из pg_cron в pg_timetable #
Если вы хотите быстро экспортировать задания, запланированные из pg_cron в pg_timetable, вы можете использовать этот фрагмент SQL:
SELECT timetable.add_job( job_name => COALESCE(jobname, 'job: ' || command), job_schedule => schedule, job_command => command, job_kind => 'SQL', job_live => active ) FROM cron.job;
timetable.add_job(), однако, имеет некоторые ограничения. Прежде всего, функция пометит созданную задачу как автономный , указывая, что планировщик должен выполнить задачу вне цепочки транзакции. Это не ошибка, но многие автономные цепочки могут вызвать использование дополнительных подключений.
Во-вторых, параметры подключения к базе данных теряются для исходных pg_cron задач, делая все задачи локальными. Чтобы экспортировать всю доступную информацию как можно точнее, используйте этот фрагмент SQL под ролью, в которой они были запланированы в pg_cron:
SET ROLE 'scheduler'; -- set the role used by pg_cron WITH cron_chain AS ( SELECT nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id, jobname, schedule, active, command, CASE WHEN database != current_database() OR nodename != 'localhost' OR username != CURRENT_USER OR nodeport != inet_server_port() THEN format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username) END AS connstr FROM cron.job ), cte_chain AS ( INSERT INTO timetable.chain (chain_id, chain_name, run_at, live) SELECT cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active FROM cron_chain ), cte_tasks AS ( INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection) SELECT cron_id, 1, 'SQL', command, connstr FROM cron_chain RETURNING chain_id, task_id ) SELECT * FROM cte_tasks;
H.2.13.2. Перенос заданий из pgAgent в pg_timetable #
Чтобы перенести задания из pgAgent , пожалуйста, используйте этот скрипт. pgAgent не имеет концепции задачи PROGRAM, поэтому для эмуляции шагов BATCH, pg_timetable будет выполнять их внутри оболочки. Вы можете изменить оболочку, отредактировав cte_shell CTE выражение.
CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS $$ WITH u AS ( SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i ) SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e $$ LANGUAGE sql; WITH cte_shell(shell, cmd_param) AS ( VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C" ), pga_schedule AS ( SELECT s.jscjobid, s.jscname, format('%s %s %s %s %s', bool_array_to_cron(s.jscminutes), bool_array_to_cron(s.jschours), bool_array_to_cron(s.jscmonthdays), bool_array_to_cron(s.jscmonths, 1), bool_array_to_cron(s.jscweekdays, 1)) AS schedule FROM pgagent.pga_schedule s WHERE s.jscenabled AND now() < COALESCE(s.jscend, 'infinity'::timestamptz) AND now() > s.jscstart ), pga_chain AS ( SELECT nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id, jobid, format('%s @ %s', jobname, jscname) AS jobname, jobhostagent, jobenabled, schedule FROM pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid ), cte_chain AS ( INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live) SELECT chain_id, jobname, jobhostagent, schedule, jobenabled FROM pga_chain ), pga_step AS ( SELECT c.chain_id, nextval('timetable.task_task_id_seq'::regclass) AS task_id, rank() OVER (ORDER BY jstname) AS jstorder, jstid, jstname, jstenabled, CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind, jstcode, COALESCE( NULLIF(jstconnstr, ''), CASE WHEN jstdbname = current_database() THEN NULL WHEN jstdbname > '' THEN 'dbname=' || jstdbname END ) AS jstconnstr, jstonerror != 'f' AS jstignoreerror FROM pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid ), cte_tasks AS ( INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection) SELECT task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind, CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END, jstconnstr FROM pga_step, cte_shell sh ), cte_parameters AS ( INSERT INTO timetable.parameter (task_id, order_id, value) SELECT task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode) FROM pga_step s, cte_shell sh WHERE s.jstkind = 'PROGRAM' ) SELECT * FROM pga_chain;
H.2.14. REST API #
pg_timetable имеет богатый REST API, который может использоваться внешними инструментами для выполнения запуска/остановки/инициализации/перезапуска/перезагрузки, любыми инструментами для выполнения проверок состояния HTTP, и, конечно, также может использоваться для мониторинга.
Ниже вы найдете список pg_timetable REST API конечные точки.
H.2.14.1. Точки проверки состояния #
-
GET /liveness
Всегда возвращает HTTP статус-код
200
, указывая, что pg_timetable работает.-
GET /readiness
Возвращает HTTP статус-код
200
, когда pg_timetable работает, и планировщик находится в основном цикле обработки цепочек. Если планировщик подключается к базе данных, создает схему базы данных или обновляет ее, он вернет HTTP статус-код503
.
H.2.14.2. Управление цепочкой конечные точки #
-
GET /startchain?id=<chain-id>
Возвращает HTTP статус-код
200
, если цепочка с данным идентификатором может быть добавлена в очередь рабочих. Однако это не означает, что выполнение цепочки начнется немедленно. Рабочий должен выполнить проверку нагрузки и другие проверки перед началом выполнения цепочки. В случае ошибки возвращается HTTP статус-код400
с последующим сообщением об ошибке.-
GET /stopchain?id=<chain-id>
Возвращает HTTP статус-код
200
, если цепочка с данным идентификатором работает в данный момент и может быть остановлена. Если цепочка выполняется, сигнал отмены будет отправлен немедленно. В случае ошибки возвращается HTTP статус-код400
с сообщением об ошибке.
H.2.15. Схема базы данных #
pg_timetable является приложением, управляемым базой данных. При первом запуске создается необходимая схема, если она отсутствует.
H.2.15.1. Основные таблицы и объекты #
CREATE TABLE timetable.chain ( chain_id BIGSERIAL PRIMARY KEY, chain_name TEXT NOT NULL UNIQUE, run_at timetable.cron, max_instances INTEGER, timeout INTEGER DEFAULT 0, live BOOLEAN DEFAULT FALSE, self_destruct BOOLEAN DEFAULT FALSE, exclusive_execution BOOLEAN DEFAULT FALSE, client_name TEXT, on_error TEXT ); COMMENT ON TABLE timetable.chain IS 'Stores information about chains schedule'; COMMENT ON COLUMN timetable.chain.run_at IS 'Extended CRON-style time notation the chain has to be run at'; COMMENT ON COLUMN timetable.chain.max_instances IS 'Number of instances (clients) this chain can run in parallel'; COMMENT ON COLUMN timetable.chain.timeout IS 'Abort any chain that takes more than the specified number of milliseconds'; COMMENT ON COLUMN timetable.chain.live IS 'Indication that the chain is ready to run, set to FALSE to pause execution'; COMMENT ON COLUMN timetable.chain.self_destruct IS 'Indication that this chain will delete itself after successful run'; COMMENT ON COLUMN timetable.chain.exclusive_execution IS 'All parallel chains should be paused while executing this chain'; COMMENT ON COLUMN timetable.chain.client_name IS 'Only client with this name is allowed to run this chain, set to NULL to allow any client'; CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN'); CREATE TABLE timetable.task ( task_id BIGSERIAL PRIMARY KEY, chain_id BIGINT REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE SCADE, task_order DOUBLE PRECISION NOT NULL, task_name TEXT, kind timetable.command_kind NOT NULL DEFAULT 'SQL', command TEXT NOT NULL, run_as TEXT, database_connection TEXT, ignore_error BOOLEAN NOT NULL DEFAULT FALSE, autonomous BOOLEAN NOT NULL DEFAULT FALSE, timeout INTEGER DEFAULT 0 ); COMMENT ON TABLE timetable.task IS 'Holds information about chain elements aka tasks'; COMMENT ON COLUMN timetable.task.chain_id IS 'Link to the chain, if NULL task considered to be disabled'; COMMENT ON COLUMN timetable.task.task_order IS 'Indicates the order of task within a chain'; COMMENT ON COLUMN timetable.task.run_as IS 'Role name to run task as. Uses SET ROLE for SQL commands'; COMMENT ON COLUMN timetable.task.ignore_error IS 'Indicates whether a next task in a chain can be executed regardless of the success of the current one'; COMMENT ON COLUMN timetable.task.kind IS 'Indicates whether "command" is SQL, built-in function or an external program'; COMMENT ON COLUMN timetable.task.command IS 'Contains either an SQL command, or command string to be executed'; COMMENT ON COLUMN timetable.task.timeout IS 'Abort any task within a chain that takes more than the specified number of milliseconds'; -- parameter passing for a chain task CREATE TABLE timetable.parameter( task_id BIGINT REFERENCES timetable.task(task_id) ON UPDATE CASCADE ON DELETE CASCADE, order_id INTEGER CHECK (order_id > 0), value JSONB, PRIMARY KEY (task_id, order_id) ); COMMENT ON TABLE timetable.parameter IS 'Stores parameters passed as arguments to a chain task'; CREATE UNLOGGED TABLE timetable.active_session( client_pid BIGINT NOT NULL, server_pid BIGINT NOT NULL, client_name TEXT NOT NULL, started_at TIMESTAMPTZ DEFAULT now() ); COMMENT ON TABLE timetable.active_session IS 'Stores information about active sessions'; CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER'); CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS $$ SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1 $$ LANGUAGE sql; CREATE TABLE timetable.log ( ts TIMESTAMPTZ DEFAULT now(), pid INTEGER NOT NULL, log_level timetable.log_type NOT NULL, client_name TEXT DEFAULT timetable.get_client_name(pg_backend_pid()), message TEXT, message_data jsonb ); COMMENT ON TABLE timetable.log IS 'Stores log entries of active sessions'; CREATE TABLE timetable.execution_log ( chain_id BIGINT, task_id BIGINT, txid BIGINT NOT NULL, last_run TIMESTAMPTZ DEFAULT now(), finished TIMESTAMPTZ, pid BIGINT, returncode INTEGER, kind timetable.command_kind, command TEXT, output TEXT, client_name TEXT NOT NULL ); COMMENT ON TABLE timetable.execution_log IS 'Stores log entries of executed tasks and chains'; CREATE UNLOGGED TABLE timetable.active_chain( chain_id BIGINT NOT NULL, client_name TEXT NOT NULL, started_at TIMESTAMPTZ DEFAULT now() ); COMMENT ON TABLE timetable.active_chain IS 'Stores information about active chains within session'; CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT) RETURNS bool AS $CODE$ BEGIN IF pg_is_in_recovery() THEN RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node'; RETURN FALSE; END IF; -- remove disconnected sessions DELETE FROM timetable.active_session WHERE server_pid NOT IN ( SELECT pid FROM pg_catalog.pg_stat_activity WHERE application_name = 'pg_timetable' ); DELETE FROM timetable.active_chain WHERE client_name NOT IN ( SELECT client_name FROM timetable.active_session ); -- check if there any active sessions with the client name but different client pid PERFORM 1 FROM timetable.active_session s WHERE s.client_pid <> worker_pid AND s.client_name = worker_name LIMIT 1; IF FOUND THEN RAISE NOTICE 'Another client is already connected to server with name: %', worker_name; RETURN FALSE; END IF; -- insert current session information INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, backend_pid()); RETURN TRUE; END; $CODE$ STRICT LANGUAGE plpgsql;
H.2.15.2. Функции, связанные с работой #
-- add_task() will add a task to the same chain as the task with `parent_id` CREATE OR REPLACE FUNCTION timetable.add_task( IN kind timetable.command_kind, IN command TEXT, IN parent_id BIGINT, IN order_delta DOUBLE PRECISION DEFAULT 10 ) RETURNS BIGINT AS $$ INSERT INTO timetable.task (chain_id, task_order, kind, command) SELECT chain_id, task_order + $4, $1, $2 FROM timetable.task WHERE task_id = $3 RETURNING task_id $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.add_task IS 'Add a task to the same chain as the task with parent_id'; -- add_job() will add one-task chain to the system CREATE OR REPLACE FUNCTION timetable.add_job( job_name TEXT, job_schedule timetable.cron, job_command TEXT, job_parameters JSONB DEFAULT NULL, job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind, job_client_name TEXT DEFAULT NULL, job_max_instances INTEGER DEFAULT NULL, job_live BOOLEAN DEFAULT TRUE, job_self_destruct BOOLEAN DEFAULT FALSE, job_ignore_errors BOOLEAN DEFAULT TRUE, job_exclusive BOOLEAN DEFAULT FALSE, job_on_error TEXT DEFAULT NULL ) RETURNS BIGINT AS $$ WITH cte_chain (v_chain_id) AS ( INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, clusive_execution, on_error) VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, b_exclusive, job_on_error) RETURNING chain_id ), cte_task(v_task_id) AS ( INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous) SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE FROM cte_chain RETURNING task_id ), cte_param AS ( INSERT INTO timetable.parameter (task_id, order_id, value) SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain ) SELECT v_chain_id FROM cte_chain $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.add_job IS 'Add one-task chain (aka job) to the system'; -- notify_chain_start() will send notification to the worker to start the chain CREATE OR REPLACE FUNCTION timetable.notify_chain_start( chain_id BIGINT, worker_name TEXT, start_delay INTERVAL DEFAULT NULL ) RETURNS void AS $$ SELECT pg_notify( worker_name, format('{"ConfigID": %s, "Command": "START", "Ts": %s, "Delay": %s}', chain_id, EXTRACT(epoch FROM clock_timestamp())::bigint, COALESCE(EXTRACT(epoch FROM start_delay)::bigint, 0) ) ) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.notify_chain_start IS 'Send notification to the worker to start the chain'; -- notify_chain_stop() will send notification to the worker to stop the chain CREATE OR REPLACE FUNCTION timetable.notify_chain_stop( chain_id BIGINT, worker_name TEXT ) RETURNS void AS $$ SELECT pg_notify( worker_name, format('{"ConfigID": %s, "Command": "STOP", "Ts": %s}', chain_id, EXTRACT(epoch FROM clock_timestamp())::bigint) ) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.notify_chain_stop IS 'Send notification to the worker to stop the chain'; -- move_task_up() will switch the order of the task execution with a previous task within the chain CREATE OR REPLACE FUNCTION timetable.move_task_up(IN task_id BIGINT) RETURNS boolean AS $$ WITH current_task (ct_chain_id, ct_id, ct_order) AS ( SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1 ), tasks(t_id, t_new_order) AS ( SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w) FROM timetable.task t, current_task ct WHERE chain_id = ct_chain_id AND (task_order < ct_order OR task_id = ct_id) WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order)) LIMIT 2 ), upd AS ( UPDATE timetable.task t SET task_order = t_new_order FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL RETURNING true ) SELECT COUNT(*) > 0 FROM upd $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.move_task_up IS 'Switch the order of the task execution with a previous task within chain'; -- move_task_down() will switch the order of the task execution with a following task within the chain CREATE OR REPLACE FUNCTION timetable.move_task_down(IN task_id BIGINT) RETURNS boolean AS $$ WITH current_task (ct_chain_id, ct_id, ct_order) AS ( SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1 ), tasks(t_id, t_new_order) AS ( SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w) FROM timetable.task t, current_task ct WHERE chain_id = ct_chain_id AND (task_order > ct_order OR task_id = ct_id) WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order)) LIMIT 2 ), upd AS ( UPDATE timetable.task t SET task_order = t_new_order FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL RETURNING true ) SELECT COUNT(*) > 0 FROM upd $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.move_task_down IS 'Switch the order of the task execution with a following task hin the chain'; -- delete_job() will delete the chain and its tasks from the system CREATE OR REPLACE FUNCTION timetable.delete_job(IN job_name TEXT) RETURNS boolean AS $$ WITH del_chain AS (DELETE FROM timetable.chain WHERE chain.chain_name = $1 RETURNING chain_id) SELECT EXISTS(SELECT 1 FROM del_chain) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.delete_job IS 'Delete the chain and its tasks from the system'; -- delete_task() will delete the task from a chain CREATE OR REPLACE FUNCTION timetable.delete_task(IN task_id BIGINT) RETURNS boolean AS $$ WITH del_task AS (DELETE FROM timetable.task WHERE task_id = $1 RETURNING task_id) SELECT EXISTS(SELECT 1 FROM del_task) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.delete_task IS 'Delete the task from a chain';
H.2.15.3. Функции, связанные с Сron #
CREATE OR REPLACE FUNCTION timetable.cron_split_to_arrays( cron text, OUT mins integer[], OUT hours integer[], OUT days integer[], OUT months integer[], OUT dow integer[] ) RETURNS record AS $$ DECLARE a_element text[]; i_index integer; a_tmp text[]; tmp_item text; a_range int[]; a_split text[]; a_res integer[]; max_val integer; min_val integer; dimensions constant text[] = '{"minutes", "hours", "days", "months", "days of week"}'; allowed_ranges constant integer[][] = '{{0,59},{0,23},{1,31},{1,12},{0,7}}'; BEGIN a_element := regexp_split_to_array(cron, '\s+'); FOR i_index IN 1..5 LOOP a_res := NULL; a_tmp := string_to_array(a_element[i_index],','); FOREACH tmp_item IN ARRAY a_tmp LOOP IF tmp_item ~ '^[0-9]+$' THEN -- normal integer a_res := array_append(a_res, tmp_item::int); ELSIF tmp_item ~ '^[*]+$' THEN -- '*' any value a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2])); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[0-9]+[-][0-9]+$' THEN -- '-' range of values a_range := regexp_split_to_array(tmp_item, '-'); a_range := array(select generate_series(a_range[1], a_range[2])); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[0-9]+[\/][0-9]+$' THEN -- '/' step values a_range := regexp_split_to_array(tmp_item, '/'); a_range := array(select generate_series(a_range[1], allowed_ranges[i_index][2], a_range[2])); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[0-9-]+[\/][0-9]+$' THEN -- '-' range of values and '/' step values a_split := regexp_split_to_array(tmp_item, '/'); a_range := regexp_split_to_array(a_split[1], '-'); a_range := array(select generate_series(a_range[1], a_range[2], a_split[2]::int)); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[*]+[\/][0-9]+$' THEN -- '*' any value and '/' step values a_split := regexp_split_to_array(tmp_item, '/'); a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2], a_split[2]::int)); a_res := array_cat(a_res, a_range); ELSE RAISE EXCEPTION 'Value ("%") not recognized', a_element[i_index] USING HINT = 'fields separated by space or tab.'+ 'Values allowed: numbers (value list with ","), '+ 'any value with "*", range of value with "-" and step values with "/"!'; END IF; END LOOP; SELECT ARRAY_AGG(x.val), MIN(x.val), MAX(x.val) INTO a_res, min_val, max_val FROM ( SELECT DISTINCT UNNEST(a_res) AS val ORDER BY val) AS x; IF max_val > allowed_ranges[i_index][2] OR min_val < allowed_ranges[i_index][1] OR a_res IS NULL THEN RAISE EXCEPTION '% is out of range % for %', tmp_item, allowed_ranges[i_index:i_index][:], dimensions[i_index]; END IF; CASE i_index WHEN 1 THEN mins := a_res; WHEN 2 THEN hours := a_res; WHEN 3 THEN days := a_res; WHEN 4 THEN months := a_res; ELSE dow := a_res; END CASE; END LOOP; RETURN; END; $$ LANGUAGE PLPGSQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_months( from_ts timestamptz, allowed_months int[] ) RETURNS SETOF timestamptz AS $$ WITH am(am) AS (SELECT UNNEST(allowed_months)), genm(ts) AS ( --generated months SELECT date_trunc('month', ts) FROM pg_catalog.generate_series(from_ts, from_ts + INTERVAL '1 year', INTERVAL '1 month') g(ts) ) SELECT ts FROM genm JOIN am ON date_part('month', genm.ts) = am.am $$ LANGUAGE SQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_days( from_ts timestamptz, allowed_months int[], allowed_days int[], allowed_week_days int[] ) RETURNS SETOF timestamptz AS $$ WITH ad(ad) AS (SELECT UNNEST(allowed_days)), am(am) AS (SELECT * FROM timetable.cron_months(from_ts, allowed_months)), gend(ts) AS ( --generated days SELECT date_trunc('day', ts) FROM am, pg_catalog.generate_series(am.am, am.am + INTERVAL '1 month' - INTERVAL '1 day', -- don't include the same day of the next month INTERVAL '1 day') g(ts) ) SELECT ts FROM gend JOIN ad ON date_part('day', gend.ts) = ad.ad WHERE extract(dow from ts)=ANY(allowed_week_days) $$ LANGUAGE SQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_times( allowed_hours int[], allowed_minutes int[] ) RETURNS SETOF time AS $$ WITH ah(ah) AS (SELECT UNNEST(allowed_hours)), am(am) AS (SELECT UNNEST(allowed_minutes)) SELECT make_time(ah.ah, am.am, 0) FROM ah CROSS JOIN am $$ LANGUAGE SQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_runs( from_ts timestamp with time zone, cron text ) RETURNS SETOF timestamptz AS $$ SELECT cd + ct FROM timetable.cron_split_to_arrays(cron) a, timetable.cron_times(a.hours, a.mins) ct CROSS JOIN timetable.cron_days(from_ts, a.months, a.days, a.dow) cd WHERE cd + ct > from_ts ORDER BY 1 ASC; $$ LANGUAGE SQL STRICT; CREATE DOMAIN timetable.cron AS TEXT CHECK( VALUE = '@reboot' OR substr(VALUE, 1, 6) IN ('@every', '@after') AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL OR VALUE ~ '^(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) +){4}(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) ?)$' AND timetable.cron_split_to_arrays(VALUE) IS NOT NULL ); COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support of interval values'; -- is_cron_in_time returns TRUE if timestamp is listed in cron expression CREATE OR REPLACE FUNCTION timetable.is_cron_in_time( run_at timetable.cron, ts timestamptz ) RETURNS BOOLEAN AS $$ SELECT CASE WHEN run_at IS NULL THEN TRUE ELSE date_part('month', ts) = ANY(a.months) AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow)) AND date_part('day', ts) = ANY(a.days) AND date_part('hour', ts) = ANY(a.hours) AND date_part('minute', ts) = ANY(a.mins) END FROM timetable.cron_split_to_arrays(run_at) a $$ LANGUAGE SQL; CREATE OR REPLACE FUNCTION timetable.next_run(cron timetable.cron) RETURNS timestamptz AS $$ SELECT * FROM timetable.cron_runs(now(), cron) LIMIT 1 $$ LANGUAGE SQL STRICT;
H.2.15.4. ER-Диаграмма #
