H.3. pg_timetable #
- H.3.1. About pg_timetable
- H.3.2. Main features
- H.3.3. Quick Start
- H.3.4. Command line options
- H.3.5. Contributing
- H.3.6. Support
- H.3.7. Authors
- H.3.8. Project background
- H.3.9. Installation
- H.3.10. Components
- H.3.11. Getting started
- H.3.12. Samples
- H.3.13. Migration from others schedulers
- H.3.14. REST API
- H.3.15. Database Schema
H.3.1. About pg_timetable #
pg_timetable is an advanced job scheduler for PostgreSQL that offers many advantages over traditional schedulers such as cron. It is completely database driven and adds concepts such as task chains, parameters, and built-in commands.
H.3.2. Main features #
Tasks can be arranged in chains
A chain can consist of built-in commands, SQL and executables
Parameters can be passed to chains
Missed tasks (possibly due to downtime) can be retried automatically
Support for configurable repetitions
Built-in tasks such as sending emails, etc.
Fully database driven configuration
Full support for database driven logging
Cron-style scheduling at the PostgreSQL server time zone
Optional concurrency protection
Tasks and chains can have execution timeout settings
H.3.3. Quick Start #
Download the pg_timetable executable
Make sure your PostgreSQL server is up and running and has a role with the CREATE privilege for a target database, e.g.
my_database=> CREATE ROLE scheduler LOGIN PASSWORD 'somestrong';
my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
Create a new job, e.g. run VACUUM at minute 30 of every hour, Postgres server time zone:
my_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM');
 add_job
---------
       3
(1 row)
Run pg_timetable:
# pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
PROFIT!
H.3.4. Command line options #
# ./pg_timetable

Application Options:
  -c, --clientname=                                 Unique name for application instance [$PGTT_CLIENTNAME]
      --config=                                     YAML configuration file
      --no-program-tasks                            Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
  -v, --version                                     Output detailed version information [$PGTT_VERSION]

Connection:
  -h, --host=                                       PostgreSQL host (default: localhost) [$PGTT_PGHOST]
  -p, --port=                                       PostgreSQL port (default: 5432) [$PGTT_PGPORT]
  -d, --dbname=                                     PostgreSQL database name (default: timetable) [$PGTT_PGDATABASE]
  -u, --user=                                       PostgreSQL user (default: scheduler) [$PGTT_PGUSER]
      --password=                                   PostgreSQL user password [$PGTT_PGPASSWORD]
      --sslmode=[disable|require]                   Connection SSL mode (default: disable) [$PGTT_PGSSLMODE]
      --pgurl=                                      PostgreSQL connection URL [$PGTT_URL]
      --timeout=                                    PostgreSQL connection timeout (default: 90) [$PGTT_TIMEOUT]

Logging:
      --log-level=[debug|info|error]                Verbosity level for stdout and log file (default: info)
      --log-database-level=[debug|info|error|none]  Verbosity level for database storing (default: info)
      --log-file=                                   File name to store logs
      --log-file-format=[json|text]                 Format of file logs (default: json)
      --log-file-rotate                             Rotate log files
      --log-file-size=                              Maximum size in MB of the log file before it gets rotated (default: 100)
      --log-file-age=                               Number of days to retain old log files, 0 means forever (default: 0)
      --log-file-number=                            Maximum number of old log files to retain, 0 to retain all (default: 0)

Start:
  -f, --file=                                       SQL script file to execute during startup
      --init                                        Initialize database schema to the latest version and exit. Can be used with --upgrade
      --upgrade                                     Upgrade database to the latest version
      --debug                                       Run in debug mode. Only asynchronous chains will be executed

Resource:
      --cron-workers=                               Number of parallel workers for scheduled chains (default: 16)
      --interval-workers=                           Number of parallel workers for interval chains (default: 16)
      --chain-timeout=                              Abort any chain that takes more than the specified number of milliseconds
      --task-timeout=                               Abort any task within a chain that takes more than the specified number of milliseconds

REST:
      --rest-port=                                  REST API port (default: 0) [$PGTT_RESTPORT]
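When pg_timetable is launched from a wrapper script, the long options listed above can be assembled from a plain mapping. The following is a minimal sketch, not part of pg_timetable: the helper name build_command and the sample values are assumptions made for illustration, and only flags that appear in the option list above are used.

```python
import shlex


def build_command(executable, options):
    """Build an argv list from a mapping of long option names to values.

    True  -> bare switch (e.g. --no-program-tasks)
    False/None -> option omitted
    other -> --name=value
    """
    argv = [executable]
    for name, value in options.items():
        if value is True:
            argv.append(f"--{name}")
        elif value is False or value is None:
            continue
        else:
            argv.append(f"--{name}={value}")
    return argv


# Hypothetical values; the flag names come from the option list above.
argv = build_command("./pg_timetable", {
    "clientname": "worker001",
    "host": "10.0.0.3",
    "port": 54321,
    "no-program-tasks": True,
    "debug": False,
})
# shlex.join quotes arguments safely for display or for pasting into a shell.
print(shlex.join(argv))
```

The resulting list can be passed as-is to subprocess.run, which avoids shell quoting issues for values such as passwords.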
H.3.5. Contributing #
If you want to contribute to pg_timetable and help make it better, feel free to open an issue or even consider submitting a pull request. You can also give the pg_timetable project a star and tell the world about it.
H.3.8. Project background #
The pg_timetable project got started back in 2019 for internal scheduling needs at Cybertec.
For more background on the project motivations and design goals see the original series of blogposts announcing the project and the following feature updates.
Cybertec also provides commercial 9-to-5 and 24/7 support for pg_timetable.
H.3.8.1. Project feedback #
For feature requests or troubleshooting assistance, please open an issue on the project's GitHub page.
H.3.9. Installation #
pg_timetable is compatible with the latest supported PostgreSQL versions: 11, 12, 13, 14 (stable); 15 (dev).
Note
If you want to use pg_timetable with older versions (9.5, 9.6 and 10), please execute this SQL command before running pg_timetable:
CREATE OR REPLACE FUNCTION starts_with(text, text)
RETURNS bool AS
$$
    SELECT
        CASE WHEN length($2) > length($1) THEN
            FALSE
        ELSE
            left($1, length($2)) = $2
        END
$$
LANGUAGE SQL
IMMUTABLE STRICT PARALLEL SAFE
COST 5;
H.3.9.1. Official release packages #
You may find a binary package for your platform on the official Releases page. Right now Windows, Linux and macOS packages are available.
H.3.9.2. Docker #
The official docker image can be found here: https://hub.docker.com/r/cybertecpostgresql/pg_timetable
Note
The latest tag is up to date with the master branch thanks to this GitHub action. In production you probably want to use the latest stable tag.
Run pg_timetable in Docker:
docker run --rm \
  cybertecpostgresql/pg_timetable:latest \
  -h 10.0.0.3 -p 54321 -c worker001
Run pg_timetable in Docker with Environment variables:
docker run --rm \
  -e PGTT_PGHOST=10.0.0.3 \
  -e PGTT_PGPORT=54321 \
  cybertecpostgresql/pg_timetable:latest \
  -c worker001
H.3.9.3. Build from sources #
Download and install Go on your system.
Clone pg_timetable repo:
$ git clone https://github.com/cybertec-postgresql/pg_timetable.git
$ cd pg_timetable
Run pg_timetable:
$ go run main.go --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
Alternatively, build a binary and run it:
$ go build
$ ./pg_timetable --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
(Optional) Run tests in all sub-folders of the project:
$ psql --command="CREATE USER scheduler PASSWORD 'somestrong'"
$ createdb --owner=scheduler timetable
$ go test -failfast -timeout=300s -count=1 -p 1 ./...
H.3.10. Components #
The scheduling in pg_timetable encompasses three different abstraction levels to facilitate the reuse with other parameters or additional schedules.
- Command:
The base level, command, defines what to do.
- Task:
The second level, task, represents a chain element (step) that runs one of the commands. Tasks define the order of commands, the arguments passed (if any), and how errors are handled.
- Chain:
The third level, chain, represents connected tasks forming a chain. A chain defines if, when, and how often a job should be executed.
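The three levels can be pictured as nested data structures. The sketch below is only a mental model in Python, not pg_timetable code: the class names Task and Chain and the cleanup command are assumptions, while the field names loosely follow the timetable.task and timetable.chain columns described later.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class Task:
    """A chain step: which command to run, in what order, with what arguments."""
    task_order: float
    kind: str          # 'SQL', 'PROGRAM' or 'BUILTIN'
    command: str       # the command level: what to do
    parameters: List[Any] = field(default_factory=list)
    ignore_error: bool = False


@dataclass
class Chain:
    """Connected tasks plus the schedule that decides if and when they run."""
    chain_name: str
    run_at: Optional[str] = None
    tasks: List[Task] = field(default_factory=list)

    def ordered_tasks(self) -> List[Task]:
        # Execution order follows task_order, not insertion order.
        return sorted(self.tasks, key=lambda task: task.task_order)


chain = Chain("Send Mail", run_at="* * * * *")
chain.tasks.append(Task(10, "BUILTIN", "SendMail"))
chain.tasks.append(Task(30, "BUILTIN", "Log"))
# Hypothetical cleanup statement, added last but ordered between the two.
chain.tasks.append(Task(20, "SQL", "DELETE FROM sent_mail_queue"))
print([task.command for task in chain.ordered_tasks()])
```

Because task_order is a floating-point value, a new step can always be slotted between two existing ones without renumbering the rest.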
H.3.10.1. Command #
Currently, there are three different kinds of commands:
-
SQL
SQL snippet. Starting a cleanup, refreshing a materialized view or processing data.
-
PROGRAM
External command. Anything that can be called as an external binary, including shells, e.g. bash, pwsh, etc. The external command will be called using golang's exec.CommandContext.
-
BUILTIN
Internal command. A prebuilt functionality included in pg_timetable. These include: NoOp, Sleep, Log, SendMail, Download, CopyFromFile, CopyToFile, Shutdown.
H.3.10.2. Task #
The next building block is a task, which simply represents a step in a list of chain commands. An example of tasks combined in a chain would be:
Download files from a server
Import files
Run aggregations
Build report
Remove the files from disk
Note
All tasks of a chain in pg_timetable are executed within one transaction. Please note, however, that PROGRAM and BUILTIN tasks cannot be rolled back.
H.3.10.2.1. Table timetable.task #
-
chain_id bigint
Link to the chain. If NULL, the task is considered disabled.
-
task_order DOUBLE PRECISION
Indicates the order of the task within a chain.
-
kind timetable.command_kind
The type of the command. Can be SQL (default), PROGRAM or BUILTIN.
-
command text
Contains either a SQL command, a path to an application, or the name of the BUILTIN command which will be executed.
-
run_as text
The role under which the task should be executed.
-
database_connection text
The connection string for the external database that should be used.
-
ignore_error boolean
Specify if the next task should proceed after encountering an error (default: false).
-
autonomous boolean
Specify if the task should be executed out of the chain transaction. Useful for VACUUM, CREATE DATABASE, CALL etc.
-
timeout integer
Abort any task within a chain that takes more than the specified number of milliseconds.
Warning
If the task has been configured with ignore_error set to true (the default value is false), the worker process will report a success on execution even if the task within the chain fails.
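The effect of ignore_error can be illustrated with a toy model. This sketch is an assumption-laden simplification in Python, not the scheduler's actual implementation: run_chain and failing_task are hypothetical names, and transactions, timeouts and autonomous tasks are deliberately left out.

```python
def run_chain(tasks):
    """Run (name, action, ignore_error) tuples in order.

    Returns (names_of_tasks_attempted, chain_reported_as_successful).
    """
    attempted = []
    for name, action, ignore_error in tasks:
        attempted.append(name)
        try:
            action()
        except Exception:
            if not ignore_error:
                # A failing task without ignore_error stops the chain.
                return attempted, False
            # With ignore_error the failure is swallowed and the chain goes on.
    return attempted, True


def failing_task():
    raise RuntimeError("simulated task failure")


steps = [
    ("download", lambda: None, False),
    ("import", failing_task, True),   # fails, but is ignored
    ("report", lambda: None, False),
]
print(run_chain(steps))
```

In this model the chain with the ignored failure still reports success, which is exactly why the warning above matters when monitoring relies on the chain status alone.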
As mentioned above, commands are simple skeletons (e.g. send email, vacuum, etc.). In most cases, they have to be brought to life by passing input parameters to the execution.
H.3.10.2.2. Table timetable.parameter #
-
task_id bigint
The ID of the task.
-
order_id integer
The order of the parameter. Several parameters are processed one by one according to the order.
-
value jsonb
A JSON value containing the parameters.
H.3.10.2.3. Parameter value format #
Depending on the command kind, the argument can be represented by different JSON values.
-
SQL
Schema: array
Example: '[ "one", 2, 3.14, false ]'::jsonb
-
PROGRAM
Schema: array of strings
Example: '["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
-
BUILTIN: Sleep
Schema: integer
Example: '5' :: jsonb
-
BUILTIN: Log
Schema: any
Example: '"WARNING"'::jsonb
Example: '{"Status": "WARNING"}'::jsonb
-
BUILTIN: SendMail
Schema: object
Example:
'{
    "username": "user@example.com",
    "password": "password",
    "serverhost": "smtp.example.com",
    "serverport": 587,
    "senderaddr": "user@example.com",
    "ccaddr": ["recipient_cc@example.com"],
    "bccaddr": ["recipient_bcc@example.com"],
    "toaddr": ["recipient@example.com"],
    "subject": "pg_timetable - No Reply",
    "attachment": ["/temp/attachments/Report.pdf","config.yaml"],
    "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
    "msgbody": "<h2>Hello User,</h2> <p>check some attachments!</p>",
    "contenttype": "text/html; charset=UTF-8"
}'::jsonb
-
BUILTIN: Download
Schema: object
Example:
'{
    "workersnum": 2,
    "fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"],
    "destpath": "."
}'::jsonb
-
BUILTIN: CopyFromFile
Schema: object
Example:
'{
    "sql": "COPY location FROM STDIN",
    "filename": "download/orte_ansi.txt"
}'::jsonb
-
BUILTIN: CopyToFile
Schema: object
Example:
'{
    "sql": "COPY location TO STDOUT",
    "filename": "download/location.txt"
}'::jsonb
-
BUILTIN: Shutdown
Schema: value ignored
-
BUILTIN: NoOp
Schema: value ignored
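When parameter values are generated by a script, it can help to check their shape before they are written to timetable.parameter. The following Python sketch encodes the schema column of the table above; the helper names matches_schema and to_jsonb_literal are assumptions for illustration and are not provided by pg_timetable.

```python
import json


def matches_schema(kind, value):
    """Check a Python value against the JSON shape listed for a command kind."""
    if kind == "SQL":
        return isinstance(value, list)
    if kind == "PROGRAM":
        return isinstance(value, list) and all(isinstance(v, str) for v in value)
    if kind == "BUILTIN: Sleep":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, int) and not isinstance(value, bool)
    if kind in ("BUILTIN: SendMail", "BUILTIN: Download",
                "BUILTIN: CopyFromFile", "BUILTIN: CopyToFile"):
        return isinstance(value, dict)
    # Log accepts any value; Shutdown and NoOp ignore it.
    return True


def to_jsonb_literal(value):
    """Render a value as a SQL string literal cast to jsonb."""
    text = json.dumps(value)
    # Single quotes are doubled so the text is safe inside a SQL string literal.
    return "'" + text.replace("'", "''") + "'::jsonb"


print(to_jsonb_literal(["-c", "rm *.csv"]))
print(matches_schema("PROGRAM", ["-x", "Latin-ASCII"]))
print(matches_schema("BUILTIN: Sleep", "5"))
```

For real applications a parameterized query is preferable to building literals by hand; the literal form is shown only because it mirrors the examples in the table.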
H.3.10.3. Chain #
Once tasks have been arranged, they have to be scheduled as a chain. For this, pg_timetable builds upon an enhanced cron string and adds multiple configuration options.
H.3.10.3.1. Table timetable.chain #
-
chain_name text
The unique name of the chain.
-
run_at timetable.cron
Standard cron-style value at Postgres server time zone or @after, @every, @reboot clause.
-
max_instances integer
The number of instances of this chain that may run at the same time.
-
timeout integer
Abort any chain that takes more than the specified number of milliseconds.
-
live boolean
Control if the chain may be executed once it reaches its schedule.
-
self_destruct boolean
Self destruct the chain after successful execution. Failed chains will be executed according to the schedule one more time.
-
exclusive_execution boolean
Specifies whether the chain should be executed exclusively while all other chains are paused.
-
client_name text
Specifies which client should execute the chain. Set this to NULL to allow any client.
-
on_error text
Holds SQL to execute if an error occurs. If the task that produced the error is marked with ignore_error, nothing is done.
Note
All chains in pg_timetable are scheduled at the PostgreSQL server time zone. You can change the timezone for the current session when adding new chains, e.g.
SET TIME ZONE 'UTC';
-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');
H.3.11. Getting started #
A variety of examples can be found in the Samples section. If you want to migrate from a different scheduler, you can use the scripts from the Migration from others schedulers chapter.
H.3.11.1. Add simple job #
In the real world, simple jobs are usually enough. By a simple job we mean that:
the job is a chain with only one task (step) in it;
it doesn't use complicated logic, but rather a simple command;
it doesn't require complex transaction handling, since one task is implicitly executed as a single transaction.
For such chains we've introduced a special function, timetable.add_job().
- timetable.add_job(job_name, job_schedule, job_command, ...) RETURNS BIGINT
Creates a simple one-task chain
- Parameters:
job_name (text) – The unique name of the chain and command.
job_schedule (timetable.cron) – Time schedule in cron syntax at Postgres server time zone.
job_command (text) – The command which will be executed: SQL by default, or a program or built-in command depending on job_kind.
job_parameters (jsonb) – Arguments for the chain command. Default: NULL.
job_kind (timetable.command_kind) – Kind of the command: SQL, PROGRAM or BUILTIN. Default: SQL.
job_client_name (text) – Specifies which client should execute the chain. Set this to NULL to allow any client. Default: NULL.
job_max_instances (integer) – The number of instances of this chain that may run at the same time. Default: NULL.
job_live (boolean) – Control if the chain may be executed once it reaches its schedule. Default: TRUE.
job_self_destruct (boolean) – Self destruct the chain after execution. Default: FALSE.
job_ignore_errors (boolean) – Ignore error during execution. Default: TRUE.
job_exclusive (boolean) – Execute the chain in the exclusive mode. Default: FALSE.
- Returns:
the ID of the created chain
- Return type:
bigint
H.3.11.2. Examples #
Run public.my_func() at 00:05 every day in August, Postgres server time zone:
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');
Run VACUUM at minute 23 past every 2nd hour from 0 through 20 every day Postgres server time zone:
SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');
Refresh materialized view every 2 hours:
SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');
Clear log table after pg_timetable restart:
SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');
Reindex at midnight Postgres server time zone on Sundays with reindexdb utility:
using default database under default user (no command line arguments)
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');
specifying target database and tables, and be verbose
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');
passing password using environment variable through bash shell
SELECT timetable.add_job('reindex', '0 0 * * 7', 'bash', '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb, 'PROGRAM');
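To see which values a cron field such as 0-20/2 selects, the field can be expanded by hand. The sketch below is a simplified Python illustration and not pg_timetable's own parser: expand_field is a hypothetical helper that handles only the forms used in the examples above (a number, a range, *, an optional /step, and comma-separated lists).

```python
def expand_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    low and high are the bounds of the field, e.g. 0..59 for minutes.
    """
    values = set()
    for part in field.split(","):
        range_part, _, step_part = part.partition("/")
        step = int(step_part) if step_part else 1
        if range_part == "*":
            start, end = low, high
        elif "-" in range_part:
            first, last = range_part.split("-")
            start, end = int(first), int(last)
        else:
            start = end = int(range_part)
        values.update(range(start, end + 1, step))
    return sorted(values)


# '23 0-20/2 * * *' from the VACUUM example above.
print(expand_field("23", 0, 59))      # minutes
print(expand_field("0-20/2", 0, 23))  # hours
```

For the VACUUM example this yields minute 23 and the even hours from 0 through 20, matching the description of the schedule.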
H.3.12. Samples #
H.3.12.1. Basic #
This sample demonstrates how to create a basic one-step chain with parameters. It calls timetable.add_job() with named arguments.
SELECT timetable.add_job(
    job_name            => 'notify every minute',
    job_schedule        => '* * * * *',
    job_command         => 'SELECT pg_notify($1, $2)',
    job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
    job_kind            => 'SQL'::timetable.command_kind,
    job_client_name     => NULL,
    job_max_instances   => 1,
    job_live            => TRUE,
    job_self_destruct   => FALSE,
    job_ignore_errors   => TRUE
) as chain_id;
H.3.12.2. Send email #
This sample demonstrates how to create an advanced email job. It sends an email, removes the mail parameters once the email is sent, and logs the status of the command execution. You don't need to set up anything in advance; every parameter can be specified during the chain creation.
DO $$
    -- An example for using the SendMail task.
DECLARE
    v_mail_task_id bigint;
    v_log_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Get the chain id
    INSERT INTO timetable.chain (chain_name, max_instances, live)
    VALUES ('Send Mail', 1, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Add SendMail task
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
    RETURNING task_id INTO v_mail_task_id;

    -- Create the parameters for the SendMail task
        -- "username":       The username used for authenticating on the mail server
        -- "password":       The password used for authenticating on the mail server
        -- "serverhost":     The IP address or hostname of the mail server
        -- "serverport":     The port of the mail server
        -- "senderaddr":     The email that will appear as the sender
        -- "ccaddr":         String array of the recipients(Cc) email addresses
        -- "bccaddr":        String array of the recipients(Bcc) email addresses
        -- "toaddr":         String array of the recipients(To) email addresses
        -- "subject":        Subject of the email
        -- "attachment":     String array of the attachments (local file)
        -- "attachmentdata": Pairs of name and base64-encoded content
        -- "msgbody":        The body of the email
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_mail_task_id, 1, '{
            "username":     "user@example.com",
            "password":     "password",
            "serverhost":   "smtp.example.com",
            "serverport":   587,
            "senderaddr":   "user@example.com",
            "ccaddr":       ["recipient_cc@example.com"],
            "bccaddr":      ["recipient_bcc@example.com"],
            "toaddr":       ["recipient@example.com"],
            "subject":      "pg_timetable - No Reply",
            "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
            "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
            "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
            "contenttype":  "text/html; charset=UTF-8"
        }'::jsonb);

    -- Add Log task and make it the last task using `task_order` column (=30)
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
    RETURNING task_id INTO v_log_task_id;

    -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
    -- Since we're using special add_task() function we don't need to specify the `chain_id`.
    -- Function will take the same `chain_id` from the parent task, SendMail in this particular case
    PERFORM timetable.add_task(
        kind => 'SQL',
        parent_id => v_mail_task_id,
        command => format(
            $query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
            INSERT INTO timetable.parameter (task_id, order_id, value)
            SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
            FROM sent_mail
            ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$,
            v_mail_task_id, v_log_task_id
        ),
        order_delta => 10
    );

    -- In the end we should have something like this. Note that even though the Log task was created earlier,
    -- it will be executed later due to the `task_order` column.

    -- timetable=> SELECT task_id, chain_id, task_order, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
    --  task_id | chain_id | task_order |  kind   |                        left
    -- ---------+----------+------------+---------+----------------------------------------------------
    --       45 |       24 |         10 | BUILTIN | SendMail
    --       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
    --       46 |       24 |         30 | BUILTIN | Log
    -- (3 rows)
END;
$$
LANGUAGE PLPGSQL;
H.3.12.3. Download, Transform and Import #
This sample demonstrates how to create an enhanced three-step chain with parameters. It uses a DO statement to directly update the timetable schema tables.
-- Prepare the destination table 'city'
CREATE TABLE IF NOT EXISTS city(
    city text,
    lat numeric,
    lng numeric,
    country text,
    iso2 text,
    admin_name text,
    capital text,
    population bigint,
    population_proper bigint);

-- An enhanced example consisting of three tasks:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension)
-- 3. Import text file as CSV file using BUILT-IN command (can be done with `psql -c \copy`)
DO $$
DECLARE
    v_head_id bigint;
    v_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live)
    VALUES ('Download locations and aggregate', TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Download file from the server
    -- Create the task
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the step 1:
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1,
        '{
            "workersnum": 1,
            "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"],
            "destpath": "."
        }'::jsonb);

    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;

    -- Step 2. Transform Unicode characters into ASCII
    -- Create the program task to call 'uconv' and name it 'unaccent'
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the 'unaccent' task. Input and output files in this case
    -- Under Windows we should call PowerShell instead of "uconv" with command:
    -- Set-content "mt_ansi.csv" ((Get-content "mt.csv").Normalize("FormD") -replace '\p{M}', '')
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);

    RAISE NOTICE 'Step 2 completed. Unaccent task added with ID: %', v_task_id;

    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
    RETURNING task_id INTO v_task_id;

    -- Add the parameters for the import task. Execute client side COPY to 'city' from 'mt_ansi.csv'
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);

    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;

    -- Cleanup: remove the .csv files from disk
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
    RETURNING task_id INTO v_task_id;

    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
END;
$$ LANGUAGE PLPGSQL;
H.3.12.4. Run tasks in autonomous transaction #
This sample demonstrates how to run special tasks out of chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
-- An advanced example showing how to use autonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands,
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN
    RAISE notice '%', msg;
END;
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
    PERFORM f('hey 1');
    COMMIT;
    PERFORM f('hey 2');
    COMMIT;
END;
$$
LANGUAGE PLPGSQL;

WITH
    cte_chain (v_chain_id) AS (
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct)
        VALUES (
            'call proc() every 10 sec', -- chain_name,
            '@every 10 seconds',        -- run_at,
            1,     -- max_instances,
            TRUE,  -- live,
            FALSE  -- self_destruct
        ) RETURNING chain_id
    ),
    cte_task(v_task_id) AS (
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
        SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;
H.3.12.5. Shutdown the scheduler and terminate the session #
This sample demonstrates how to shut down the scheduler using a special built-in task. This can be used to control maintenance windows, to restart the scheduler for update purposes, or to stop the session before the database is dropped.
-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintenance purposes or before the database is destroyed.
-- One should take care of restarting pg_timetable if needed.
SELECT timetable.add_job (
    job_name     => 'Shutdown pg_timetable session on schedule',
    job_schedule => '* * 1 * *',
    job_command  => 'Shutdown',
    job_kind     => 'BUILTIN'
);
H.3.12.6. Access previous task result code and output from the next task #
This sample demonstrates how to check the result code and output of a previous task. Inspecting a failed task from a later task is possible only if ignore_error is set to true for the failed task; otherwise, the scheduler stops the chain. The sample counts the failed, successful, and total number of tasks executed and uses these values to calculate the success ratio.
WITH
    cte_chain (v_chain_id) AS (
        -- let's create a new chain and add tasks to it later
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
        VALUES ('many tasks', '* * * * *', 1, true)
        RETURNING chain_id
    ),
    cte_tasks(v_task_id) AS (
        -- now we'll add 500 tasks to the chain, some of them will fail
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
        SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
        FROM cte_chain, generate_series(1, 500) AS g(s)
        RETURNING task_id
    ),
    report_task(v_task_id) AS (
        -- and the last reporting task will calculate the statistic
        INSERT INTO timetable.task (chain_id, task_order, kind, command)
        SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
    s TEXT;
BEGIN
    WITH report AS (
        SELECT
            count(*) FILTER (WHERE returncode = 0) AS success,
            count(*) FILTER (WHERE returncode != 0) AS fail,
            count(*) AS total
        FROM timetable.execution_log
        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
          AND txid = txid_current()
    )
    SELECT 'Tasks executed:' || total ||
           '; succeeded: ' || success ||
           '; failed: ' || fail ||
           '; ratio: ' || 100.0*success/GREATEST(total,1)
    INTO s
    FROM report;
    RAISE NOTICE '%', s;
END;
$$
$CMD$
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id FROM cte_chain
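The arithmetic of the reporting task can be checked outside the database. The Python sketch below mirrors the three counters and the ratio expression 100.0*success/GREATEST(total,1) from the query above; the function name summarize and the sample return codes are assumptions made for illustration.

```python
def summarize(return_codes):
    """Return (total, success, fail, ratio) for a list of task return codes."""
    total = len(return_codes)
    success = sum(1 for code in return_codes if code == 0)
    fail = sum(1 for code in return_codes if code != 0)
    # max(total, 1) plays the role of GREATEST(total, 1): it avoids a
    # division by zero when no tasks have been logged yet.
    ratio = 100.0 * success / max(total, 1)
    return total, success, fail, ratio


# Hypothetical return codes of four executed tasks.
print(summarize([0, 1, 0, 0]))
print(summarize([]))
```

With three successes out of four tasks the ratio is 75 percent, and an empty log yields a ratio of zero instead of an error.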
H.3.13. Migration from others schedulers #
H.3.13.1. Migrate jobs from pg_cron to pg_timetable #
If you want to quickly export jobs scheduled from pg_cron to pg_timetable, you can use this SQL snippet:
SELECT timetable.add_job(
    job_name     => COALESCE(jobname, 'job: ' || command),
    job_schedule => schedule,
    job_command  => command,
    job_kind     => 'SQL',
    job_live     => active
) FROM cron.job;
The timetable.add_job() function, however, has some limitations. First of all, it marks the created task as autonomous, which tells the scheduler to execute the task out of the chain transaction. This is not an error, but many autonomous chains may cause some extra connections to be used.
Secondly, the database connection parameters of the source pg_cron jobs are lost, making all jobs local. To export all available information as precisely as possible, use this SQL snippet under the role the jobs were scheduled with in pg_cron:
SET ROLE 'scheduler'; -- set the role used by pg_cron

WITH cron_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
        jobname,
        schedule,
        active,
        command,
        CASE WHEN
            database != current_database()
            OR nodename != 'localhost'
            OR username != CURRENT_USER
            OR nodeport != inet_server_port()
        THEN
            format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
        END AS connstr
    FROM
        cron.job
),
cte_chain AS (
    INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
        SELECT
            cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
        FROM
            cron_chain
),
cte_tasks AS (
    INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
        SELECT
            cron_id, 1, 'SQL', command, connstr
        FROM
            cron_chain
        RETURNING
            chain_id, task_id
)
SELECT * FROM cte_tasks;
H.3.13.2. Migrate jobs from pgAgent to pg_timetable #
To migrate jobs from pgAgent, please use this script. pgAgent doesn't have the concept of a PROGRAM task, so to emulate BATCH steps, pg_timetable will execute them inside the shell. You may change the shell by editing the cte_shell CTE clause.
CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
    SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;

WITH
cte_shell(shell, cmd_param) AS (
    VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
    SELECT
        s.jscjobid,
        s.jscname,
        format('%s %s %s %s %s',
            bool_array_to_cron(s.jscminutes),
            bool_array_to_cron(s.jschours),
            bool_array_to_cron(s.jscmonthdays),
            bool_array_to_cron(s.jscmonths, 1),
            bool_array_to_cron(s.jscweekdays, 1)) AS schedule
    FROM
        pgagent.pga_schedule s
    WHERE
        s.jscenabled
        AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
        AND now() > s.jscstart
),
pga_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
        jobid,
        format('%s @ %s', jobname, jscname) AS jobname,
        jobhostagent,
        jobenabled,
        schedule
    FROM
        pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
    INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
        SELECT
            chain_id, jobname, jobhostagent, schedule, jobenabled
        FROM
            pga_chain
),
pga_step AS (
    SELECT
        c.chain_id,
        nextval('timetable.task_task_id_seq'::regclass) AS task_id,
        rank() OVER (ORDER BY jstname) AS jstorder,
        jstid,
        jstname,
        jstenabled,
        CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
        jstcode,
        COALESCE(
            NULLIF(jstconnstr, ''),
            CASE
                WHEN jstdbname = current_database() THEN NULL
                WHEN jstdbname > '' THEN 'dbname=' || jstdbname
            END
        ) AS jstconnstr,
        jstonerror != 'f' AS jstignoreerror
    FROM
        pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
    INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection)
        SELECT
            task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind,
            CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
            jstconnstr
        FROM
            pga_step, cte_shell sh
),
cte_parameters AS (
    INSERT INTO timetable.parameter (task_id, order_id, value)
        SELECT
            task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
        FROM
            pga_step s, cte_shell sh
        WHERE
            s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;
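The helper bool_array_to_cron() in the script above turns pgAgent's boolean arrays into cron fields. Its logic can be restated in a few lines of Python for readers who want to check a conversion by hand; this is an illustrative re-implementation under that reading of the SQL, not code shipped with pg_timetable or pgAgent.

```python
def bool_array_to_cron(flags, start_with=0):
    """Convert a list of booleans into a cron field.

    Positions are numbered from start_with; the numbers of all true
    positions are joined with commas, and '*' is returned when none is set.
    """
    selected = [str(index)
                for index, flag in enumerate(flags, start=start_with)
                if flag]
    return ",".join(selected) if selected else "*"


# 60 minute flags with only minutes 0 and 30 enabled.
minutes = [i in (0, 30) for i in range(60)]
# 12 month flags, numbered from 1 as in the script above.
months = [False] * 12
print(bool_array_to_cron(minutes))
print(bool_array_to_cron(months, 1))
```

Months and weekdays are passed with start_with set to 1 in the migration script because their arrays are numbered from one rather than from zero.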
H.3.14. REST API #
pg_timetable has a REST API that external tools can use to start and stop chains, to perform HTTP health checks, and for monitoring.
Below you will find the list of pg_timetable REST API endpoints.
H.3.14.1. Health check endpoints #
-
GET /liveness
Always returns HTTP status code 200, indicating that pg_timetable is running.
-
GET /readiness
Returns HTTP status code 200 when pg_timetable is running and the scheduler is in the main loop processing chains. While the scheduler is still connecting to the database, creating the database schema, or upgrading it, the endpoint returns HTTP status code 503.
H.3.14.2. Chain management endpoints #
- GET /startchain?id=<chain-id>
  Returns HTTP status code 200 if the chain with the given id can be added to the worker queue. This does not mean that chain execution starts immediately: it is up to the worker to perform load and other checks before starting the chain. In the case of an error, HTTP status code 400 is returned, followed by an error message.
- GET /stopchain?id=<chain-id>
  Returns HTTP status code 200 if the chain with the given id is currently running and can be stopped. In that case, the cancel signal is sent immediately. In the case of an error, HTTP status code 400 is returned, followed by an error message.
H.3.15. Database Schema #
pg_timetable is a database driven application. During the first start the necessary schema is created if absent.
H.3.15.1. Main tables and objects #
CREATE TABLE timetable.chain ( chain_id BIGSERIAL PRIMARY KEY, chain_name TEXT NOT NULL UNIQUE, run_at timetable.cron, max_instances INTEGER, timeout INTEGER DEFAULT 0, live BOOLEAN DEFAULT FALSE, self_destruct BOOLEAN DEFAULT FALSE, exclusive_execution BOOLEAN DEFAULT FALSE, client_name TEXT, on_error TEXT ); COMMENT ON TABLE timetable.chain IS 'Stores information about chains schedule'; COMMENT ON COLUMN timetable.chain.run_at IS 'Extended CRON-style time notation the chain has to be run at'; COMMENT ON COLUMN timetable.chain.max_instances IS 'Number of instances (clients) this chain can run in parallel'; COMMENT ON COLUMN timetable.chain.timeout IS 'Abort any chain that takes more than the specified number of milliseconds'; COMMENT ON COLUMN timetable.chain.live IS 'Indication that the chain is ready to run, set to FALSE to pause execution'; COMMENT ON COLUMN timetable.chain.self_destruct IS 'Indication that this chain will delete itself after successful run'; COMMENT ON COLUMN timetable.chain.exclusive_execution IS 'All parallel chains should be paused while executing this chain'; COMMENT ON COLUMN timetable.chain.client_name IS 'Only client with this name is allowed to run this chain, set to NULL to allow any client'; CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN'); CREATE TABLE timetable.task ( task_id BIGSERIAL PRIMARY KEY, chain_id BIGINT REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE CASCADE, task_order DOUBLE PRECISION NOT NULL, task_name TEXT, kind timetable.command_kind NOT NULL DEFAULT 'SQL', command TEXT NOT NULL, run_as TEXT, database_connection TEXT, ignore_error BOOLEAN NOT NULL DEFAULT FALSE, autonomous BOOLEAN NOT NULL DEFAULT FALSE, timeout INTEGER DEFAULT 0 ); COMMENT ON TABLE timetable.task IS 'Holds information about chain elements aka tasks'; COMMENT ON COLUMN timetable.task.chain_id IS 'Link to the chain, if NULL task considered to be disabled'; COMMENT ON COLUMN timetable.task.task_order IS 
'Indicates the order of task within a chain'; COMMENT ON COLUMN timetable.task.run_as IS 'Role name to run task as. Uses SET ROLE for SQL commands'; COMMENT ON COLUMN timetable.task.ignore_error IS 'Indicates whether a next task in a chain can be executed regardless of the success of the current one'; COMMENT ON COLUMN timetable.task.kind IS 'Indicates whether "command" is SQL, built-in function or an external program'; COMMENT ON COLUMN timetable.task.command IS 'Contains either an SQL command, or command string to be executed'; COMMENT ON COLUMN timetable.task.timeout IS 'Abort any task within a chain that takes more than the specified number of milliseconds'; -- parameter passing for a chain task CREATE TABLE timetable.parameter( task_id BIGINT REFERENCES timetable.task(task_id) ON UPDATE CASCADE ON DELETE CASCADE, order_id INTEGER CHECK (order_id > 0), value JSONB, PRIMARY KEY (task_id, order_id) ); COMMENT ON TABLE timetable.parameter IS 'Stores parameters passed as arguments to a chain task'; CREATE UNLOGGED TABLE timetable.active_session( client_pid BIGINT NOT NULL, server_pid BIGINT NOT NULL, client_name TEXT NOT NULL, started_at TIMESTAMPTZ DEFAULT now() ); COMMENT ON TABLE timetable.active_session IS 'Stores information about active sessions'; CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER'); CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS $$ SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1 $$ LANGUAGE sql; CREATE TABLE timetable.log ( ts TIMESTAMPTZ DEFAULT now(), pid INTEGER NOT NULL, log_level timetable.log_type NOT NULL, client_name TEXT DEFAULT timetable.get_client_name(pg_backend_pid()), message TEXT, message_data jsonb ); COMMENT ON TABLE timetable.log IS 'Stores log entries of active sessions'; CREATE TABLE timetable.execution_log ( chain_id BIGINT, task_id BIGINT, txid BIGINT NOT NULL, last_run TIMESTAMPTZ DEFAULT now(), finished TIMESTAMPTZ, 
pid BIGINT, returncode INTEGER, kind timetable.command_kind, command TEXT, output TEXT, client_name TEXT NOT NULL ); COMMENT ON TABLE timetable.execution_log IS 'Stores log entries of executed tasks and chains'; CREATE UNLOGGED TABLE timetable.active_chain( chain_id BIGINT NOT NULL, client_name TEXT NOT NULL, started_at TIMESTAMPTZ DEFAULT now() ); COMMENT ON TABLE timetable.active_chain IS 'Stores information about active chains within session'; CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT) RETURNS bool AS $CODE$ BEGIN IF pg_is_in_recovery() THEN RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node'; RETURN FALSE; END IF; -- remove disconnected sessions DELETE FROM timetable.active_session WHERE server_pid NOT IN ( SELECT pid FROM pg_catalog.pg_stat_activity WHERE application_name = 'pg_timetable' ); DELETE FROM timetable.active_chain WHERE client_name NOT IN ( SELECT client_name FROM timetable.active_session ); -- check if there are any active sessions with the client name but different client pid PERFORM 1 FROM timetable.active_session s WHERE s.client_pid <> worker_pid AND s.client_name = worker_name LIMIT 1; IF FOUND THEN RAISE NOTICE 'Another client is already connected to server with name: %', worker_name; RETURN FALSE; END IF; -- insert current session information INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, pg_backend_pid()); RETURN TRUE; END; $CODE$ STRICT LANGUAGE plpgsql;
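The tables above relate to each other as chain, then task, then parameter, so a plain join is enough to see what is scheduled. The queries below are a minimal sketch that uses only columns from the listing. Treating a non-zero `returncode` as a failure is an assumption made for illustration, not something stated by the schema.

```sql
-- List every chain with its tasks and their parameters, in execution order.
SELECT c.chain_name,
       c.run_at,
       c.live,
       t.task_order,
       t.kind,
       t.command,
       p.value AS parameters
FROM timetable.chain c
JOIN timetable.task t USING (chain_id)
LEFT JOIN timetable.parameter p USING (task_id)
ORDER BY c.chain_name, t.task_order, p.order_id;

-- Show the most recent task runs that look like failures
-- (assumption: a non-zero returncode indicates an error).
SELECT chain_id, task_id, last_run, finished, returncode, output
FROM timetable.execution_log
WHERE returncode <> 0
ORDER BY last_run DESC
LIMIT 20;
```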
H.3.15.2. Jobs related functions #
-- add_task() will add a task to the same chain as the task with `parent_id` CREATE OR REPLACE FUNCTION timetable.add_task( IN kind timetable.command_kind, IN command TEXT, IN parent_id BIGINT, IN order_delta DOUBLE PRECISION DEFAULT 10 ) RETURNS BIGINT AS $$ INSERT INTO timetable.task (chain_id, task_order, kind, command) SELECT chain_id, task_order + $4, $1, $2 FROM timetable.task WHERE task_id = $3 RETURNING task_id $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.add_task IS 'Add a task to the same chain as the task with parent_id'; -- add_job() will add one-task chain to the system CREATE OR REPLACE FUNCTION timetable.add_job( job_name TEXT, job_schedule timetable.cron, job_command TEXT, job_parameters JSONB DEFAULT NULL, job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind, job_client_name TEXT DEFAULT NULL, job_max_instances INTEGER DEFAULT NULL, job_live BOOLEAN DEFAULT TRUE, job_self_destruct BOOLEAN DEFAULT FALSE, job_ignore_errors BOOLEAN DEFAULT TRUE, job_exclusive BOOLEAN DEFAULT FALSE, job_on_error TEXT DEFAULT NULL ) RETURNS BIGINT AS $$ WITH cte_chain (v_chain_id) AS ( INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error) VALUES (job_name, job_schedule, job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error) RETURNING chain_id ), cte_task(v_task_id) AS ( INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous) SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE FROM cte_chain RETURNING task_id ), cte_param AS ( INSERT INTO timetable.parameter (task_id, order_id, value) SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain ) SELECT v_chain_id FROM cte_chain $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.add_job IS 'Add one-task chain (aka job) to the system'; -- notify_chain_start() will send notification to the worker to start the chain CREATE OR REPLACE FUNCTION 
timetable.notify_chain_start( chain_id BIGINT, worker_name TEXT, start_delay INTERVAL DEFAULT NULL ) RETURNS void AS $$ SELECT pg_notify( worker_name, format('{"ConfigID": %s, "Command": "START", "Ts": %s, "Delay": %s}', chain_id, EXTRACT(epoch FROM clock_timestamp())::bigint, COALESCE(EXTRACT(epoch FROM start_delay)::bigint, 0) ) ) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.notify_chain_start IS 'Send notification to the worker to start the chain'; -- notify_chain_stop() will send notification to the worker to stop the chain CREATE OR REPLACE FUNCTION timetable.notify_chain_stop( chain_id BIGINT, worker_name TEXT ) RETURNS void AS $$ SELECT pg_notify( worker_name, format('{"ConfigID": %s, "Command": "STOP", "Ts": %s}', chain_id, EXTRACT(epoch FROM clock_timestamp())::bigint) ) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.notify_chain_stop IS 'Send notification to the worker to stop the chain'; -- move_task_up() will switch the order of the task execution with a previous task within the chain CREATE OR REPLACE FUNCTION timetable.move_task_up(IN task_id BIGINT) RETURNS boolean AS $$ WITH current_task (ct_chain_id, ct_id, ct_order) AS ( SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1 ), tasks(t_id, t_new_order) AS ( SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w) FROM timetable.task t, current_task ct WHERE chain_id = ct_chain_id AND (task_order < ct_order OR task_id = ct_id) WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order)) LIMIT 2 ), upd AS ( UPDATE timetable.task t SET task_order = t_new_order FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL RETURNING true ) SELECT COUNT(*) > 0 FROM upd $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.move_task_up IS 'Switch the order of the task execution with a previous task within chain'; -- move_task_down() will switch the order of the task execution with a following task within the chain CREATE OR REPLACE FUNCTION 
timetable.move_task_down(IN task_id BIGINT) RETURNS boolean AS $$ WITH current_task (ct_chain_id, ct_id, ct_order) AS ( SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1 ), tasks(t_id, t_new_order) AS ( SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w) FROM timetable.task t, current_task ct WHERE chain_id = ct_chain_id AND (task_order > ct_order OR task_id = ct_id) WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order)) LIMIT 2 ), upd AS ( UPDATE timetable.task t SET task_order = t_new_order FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL RETURNING true ) SELECT COUNT(*) > 0 FROM upd $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.move_task_down IS 'Switch the order of the task execution with a following task within the chain'; -- delete_job() will delete the chain and its tasks from the system CREATE OR REPLACE FUNCTION timetable.delete_job(IN job_name TEXT) RETURNS boolean AS $$ WITH del_chain AS (DELETE FROM timetable.chain WHERE chain.chain_name = $1 RETURNING chain_id) SELECT EXISTS(SELECT 1 FROM del_chain) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.delete_job IS 'Delete the chain and its tasks from the system'; -- delete_task() will delete the task from a chain CREATE OR REPLACE FUNCTION timetable.delete_task(IN task_id BIGINT) RETURNS boolean AS $$ WITH del_task AS (DELETE FROM timetable.task WHERE task_id = $1 RETURNING task_id) SELECT EXISTS(SELECT 1 FROM del_task) $$ LANGUAGE SQL; COMMENT ON FUNCTION timetable.delete_task IS 'Delete the task from a chain';
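The job functions above can be combined into a short workflow: create a one-task chain, append a second task, ask a worker to start the chain, and finally remove it. This is a minimal sketch that relies only on the signatures shown in the listing. The job name `nightly-report` and the commands are hypothetical, and the example assumes a worker started with `--clientname=vacuumer` that listens on a notification channel of the same name.

```sql
-- 1. Create a one-task chain scheduled for 00:30 every day; returns the chain_id.
SELECT timetable.add_job(
    job_name     => 'nightly-report',     -- hypothetical job name
    job_schedule => '30 0 * * *',
    job_command  => 'SELECT 1'            -- placeholder command
);

-- 2. Append a second task after the existing one (task_order + 10 by default).
SELECT timetable.add_task(
    kind      => 'SQL',
    command   => 'ANALYZE',
    parent_id => t.task_id
)
FROM timetable.task t
JOIN timetable.chain c USING (chain_id)
WHERE c.chain_name = 'nightly-report';

-- 3. Ask the worker to start the chain now instead of waiting for the schedule
--    (assumption: the worker's client name is 'vacuumer').
SELECT timetable.notify_chain_start(c.chain_id, 'vacuumer')
FROM timetable.chain c
WHERE c.chain_name = 'nightly-report';

-- 4. Remove the chain; its tasks go with it through ON DELETE CASCADE.
SELECT timetable.delete_job('nightly-report');
```

Named notation (`=>`) is used so that the many optional `add_job` parameters can keep their defaults.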
H.3.15.3. Cron related functions #
CREATE OR REPLACE FUNCTION timetable.cron_split_to_arrays( cron text, OUT mins integer[], OUT hours integer[], OUT days integer[], OUT months integer[], OUT dow integer[] ) RETURNS record AS $$ DECLARE a_element text[]; i_index integer; a_tmp text[]; tmp_item text; a_range int[]; a_split text[]; a_res integer[]; max_val integer; min_val integer; dimensions constant text[] = '{"minutes", "hours", "days", "months", "days of week"}'; allowed_ranges constant integer[][] = '{{0,59},{0,23},{1,31},{1,12},{0,7}}'; BEGIN a_element := regexp_split_to_array(cron, '\s+'); FOR i_index IN 1..5 LOOP a_res := NULL; a_tmp := string_to_array(a_element[i_index],','); FOREACH tmp_item IN ARRAY a_tmp LOOP IF tmp_item ~ '^[0-9]+$' THEN -- normal integer a_res := array_append(a_res, tmp_item::int); ELSIF tmp_item ~ '^[*]+$' THEN -- '*' any value a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2])); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[0-9]+[-][0-9]+$' THEN -- '-' range of values a_range := regexp_split_to_array(tmp_item, '-'); a_range := array(select generate_series(a_range[1], a_range[2])); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[0-9]+[\/][0-9]+$' THEN -- '/' step values a_range := regexp_split_to_array(tmp_item, '/'); a_range := array(select generate_series(a_range[1], allowed_ranges[i_index][2], a_range[2])); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[0-9-]+[\/][0-9]+$' THEN -- '-' range of values and '/' step values a_split := regexp_split_to_array(tmp_item, '/'); a_range := regexp_split_to_array(a_split[1], '-'); a_range := array(select generate_series(a_range[1], a_range[2], a_split[2]::int)); a_res := array_cat(a_res, a_range); ELSIF tmp_item ~ '^[*]+[\/][0-9]+$' THEN -- '*' any value and '/' step values a_split := regexp_split_to_array(tmp_item, '/'); a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2], a_split[2]::int)); a_res := array_cat(a_res, 
a_range); ELSE RAISE EXCEPTION 'Value ("%") not recognized', a_element[i_index] USING HINT = 'fields separated by space or tab.'+ 'Values allowed: numbers (value list with ","), '+ 'any value with "*", range of value with "-" and step values with "/"!'; END IF; END LOOP; SELECT ARRAY_AGG(x.val), MIN(x.val), MAX(x.val) INTO a_res, min_val, max_val FROM ( SELECT DISTINCT UNNEST(a_res) AS val ORDER BY val) AS x; IF max_val > allowed_ranges[i_index][2] OR min_val < allowed_ranges[i_index][1] OR a_res IS NULL THEN RAISE EXCEPTION '% is out of range % for %', tmp_item, allowed_ranges[i_index:i_index][:], dimensions[i_index]; END IF; CASE i_index WHEN 1 THEN mins := a_res; WHEN 2 THEN hours := a_res; WHEN 3 THEN days := a_res; WHEN 4 THEN months := a_res; ELSE dow := a_res; END CASE; END LOOP; RETURN; END; $$ LANGUAGE PLPGSQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_months( from_ts timestamptz, allowed_months int[] ) RETURNS SETOF timestamptz AS $$ WITH am(am) AS (SELECT UNNEST(allowed_months)), genm(ts) AS ( --generated months SELECT date_trunc('month', ts) FROM pg_catalog.generate_series(from_ts, from_ts + INTERVAL '1 year', INTERVAL '1 month') g(ts) ) SELECT ts FROM genm JOIN am ON date_part('month', genm.ts) = am.am $$ LANGUAGE SQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_days( from_ts timestamptz, allowed_months int[], allowed_days int[], allowed_week_days int[] ) RETURNS SETOF timestamptz AS $$ WITH ad(ad) AS (SELECT UNNEST(allowed_days)), am(am) AS (SELECT * FROM timetable.cron_months(from_ts, allowed_months)), gend(ts) AS ( --generated days SELECT date_trunc('day', ts) FROM am, pg_catalog.generate_series(am.am, am.am + INTERVAL '1 month' - INTERVAL '1 day', -- don't include the same day of the next month INTERVAL '1 day') g(ts) ) SELECT ts FROM gend JOIN ad ON date_part('day', gend.ts) = ad.ad WHERE extract(dow from ts)=ANY(allowed_week_days) $$ LANGUAGE SQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_times( allowed_hours int[], 
allowed_minutes int[] ) RETURNS SETOF time AS $$ WITH ah(ah) AS (SELECT UNNEST(allowed_hours)), am(am) AS (SELECT UNNEST(allowed_minutes)) SELECT make_time(ah.ah, am.am, 0) FROM ah CROSS JOIN am $$ LANGUAGE SQL STRICT; CREATE OR REPLACE FUNCTION timetable.cron_runs( from_ts timestamp with time zone, cron text ) RETURNS SETOF timestamptz AS $$ SELECT cd + ct FROM timetable.cron_split_to_arrays(cron) a, timetable.cron_times(a.hours, a.mins) ct CROSS JOIN timetable.cron_days(from_ts, a.months, a.days, a.dow) cd WHERE cd + ct > from_ts ORDER BY 1 ASC; $$ LANGUAGE SQL STRICT; CREATE DOMAIN timetable.cron AS TEXT CHECK( VALUE = '@reboot' OR substr(VALUE, 1, 6) IN ('@every', '@after') AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL OR VALUE ~ '^(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) +){4}(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) ?)$' AND timetable.cron_split_to_arrays(VALUE) IS NOT NULL ); COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support of interval values'; -- is_cron_in_time returns TRUE if timestamp is listed in cron expression CREATE OR REPLACE FUNCTION timetable.is_cron_in_time( run_at timetable.cron, ts timestamptz ) RETURNS BOOLEAN AS $$ SELECT CASE WHEN run_at IS NULL THEN TRUE ELSE date_part('month', ts) = ANY(a.months) AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow)) AND date_part('day', ts) = ANY(a.days) AND date_part('hour', ts) = ANY(a.hours) AND date_part('minute', ts) = ANY(a.mins) END FROM timetable.cron_split_to_arrays(run_at) a $$ LANGUAGE SQL; CREATE OR REPLACE FUNCTION timetable.next_run(cron timetable.cron) RETURNS timestamptz AS $$ SELECT * FROM timetable.cron_runs(now(), cron) LIMIT 1 $$ LANGUAGE SQL STRICT;
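The cron helpers above can be tried directly from `psql` to check how an expression is interpreted before it is assigned to a chain. The calls below are a minimal sketch with arbitrary example expressions. Results that involve timestamps depend on the current time and the session time zone, so no output is shown for them.

```sql
-- Expand a cron expression into its allowed minutes, hours, days, months,
-- and days of week. Here: 00:30 on Monday through Friday.
SELECT * FROM timetable.cron_split_to_arrays('30 0 * * 1-5');

-- The next three run times for "every Monday at 12:00", counted from now.
SELECT * FROM timetable.cron_runs(now(), '0 12 * * 1') LIMIT 3;

-- The single next run time for a nightly schedule.
SELECT timetable.next_run('30 0 * * *');

-- Check whether the current minute matches the schedule.
SELECT timetable.is_cron_in_time('30 0 * * *', now());
```

Note that `cron_split_to_arrays` accepts only five-field expressions, so interval-style values such as `@every` and `@after`, which the `timetable.cron` domain also permits, are not suitable inputs for these calls.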
H.3.15.4. ER-Diagram #
