J.4. pg_timetable#
J.4. pg_timetable #
- J.4.1. Overview
- J.4.2. Main features
- J.4.3. Quick Start
- J.4.4. Command line options
- J.4.5. Project background
- J.4.6. Installation
- J.4.7. Components
- J.4.8. Database Schema
- J.4.9. Getting started
- J.4.10. Samples
- J.4.11. YAML Chain Configuration Guide
- J.4.12. Migration from others schedulers
- J.4.13. REST API
- J.4.14. YAML Chain Definition Format for pg_timetable
J.4.1. Overview #
Version: 6.2.0
pg_timetable is an advanced job scheduler for Tantor SE, offering many advantages over traditional schedulers such as cron and others. It is completely database driven and provides a couple of advanced concepts.
J.4.2. Main features #
Tasks can be arranged in chains
A chain can consist of built-int commands, SQL and executables
Parameters can be passed to chains
Missed tasks (possibly due to downtime) can be retried automatically
Support for configurable repetitions
Built-in tasks such as sending emails, etc.
Fully database driven configuration
Full support for database driven logging
Cron-style scheduling at the Tantor SE server time zone
Optional concurrency protection
Task and chain can have execution timeout settings
J.4.3. Quick Start #
Download pg_timetable executable
Make sure your Tantor SE server is up and running and has a role with
CREATEprivilege for a target database, e.g.my_database=> CREATE ROLE scheduler PASSWORD 'somestrong'; my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
Create a new job, e.g. run
VACUUMeach night at 00:30 Postgres server time zonemy_database=> SELECT timetable.add_job('frequent-vacuum', '30 * * * *', 'VACUUM'); add_job --------- 3 (1 row)Run the pg_timetable
# pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
PROFIT!
J.4.4. Command line options #
# ./pg_timetable
Application Options:
-c, --clientname= Unique name for application instance [$PGTT_CLIENTNAME]
--config= YAML configuration file
--no-program-tasks Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
-v, --version Output detailed version information [$PGTT_VERSION]
--connstr PostgreSQL connection string [$PGTT_CONNSTR]
Logging:
--log-level=[debug|info|error] Verbosity level for stdout and log file (default: info)
--log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info)
--log-file= File name to store logs
--log-file-format=[json|text] Format of file logs (default: json)
--log-file-rotate Rotate log files
--log-file-size= Maximum size in MB of the log file before it gets rotated (default: 100)
--log-file-age= Number of days to retain old log files, 0 means forever (default: 0)
--log-file-number= Maximum number of old log files to retain, 0 to retain all (default: 0)
Start:
-f, --file= SQL script file to execute during startup
--init Initialize database schema to the latest version and exit. Can be used
with --upgrade
--upgrade Upgrade database to the latest version
--debug Run in debug mode. Only asynchronous chains will be executed
Resource:
--cron-workers= Number of parallel workers for scheduled chains (default: 16)
--interval-workers= Number of parallel workers for interval chains (default: 16)
--chain-timeout= Abort any chain that takes more than the specified number of
milliseconds
--task-timeout= Abort any task within a chain that takes more than the specified number
of milliseconds
REST:
--rest-port= REST API port (default: 0) [$PGTT_RESTPORT]
J.4.5. Project background #
The pg_timetable project got started back in 2019 for internal scheduling needs at Cybertec.
For more background on the project motivations and design goals see the original series of blogposts announcing the project and the following feature updates.
Cybertec also provides commercial 9-to-5 and 24/7 support for pg_timetable.
J.4.5.1. Project feedback #
For feature requests or troubleshooting assistance please open an issue on project’s Github page.
J.4.6. Installation #
pg_timetable is compatible with all supported Tantor SE versions.
J.4.6.1. Official release packages #
You may find binary package for your platform on the official Releases page.
J.4.6.2. Docker #
The official docker image can be found here: https://hub.docker.com/r/cybertecpostgresql/pg_timetable
Note
The latest tag is up to date with the master
branch thanks to
this
github action. In production you probably want to use the
latest
stable
tag.
Run pg_timetable in Docker:
docker run --rm \ cybertecpostgresql/pg_timetable:latest \ -h 10.0.0.3 -p 54321 -c worker001
Run pg_timetable in Docker with Environment variables:
docker run --rm \ -e PGTT_PGHOST=10.0.0.3 \ -e PGTT_PGPORT=54321 \ cybertecpostgresql/pg_timetable:latest \ -c worker001
J.4.7. Components #
The scheduling in pg_timetable encompasses three different abstraction levels to facilitate the reuse with other parameters or additional schedules.
- Command:
The base level, command, defines what to do.
- Task:
The second level, task, represents a chain element (step) to run one of the commands. With tasks we define order of commands, arguments passed (if any), and how errors are handled.
- Chain:
The third level represents a connected tasks forming a chain of tasks. Chain defines if, when, and how often a job should be executed.
J.4.7.1. Command #
Currently, there are three different kinds of commands:
-
SQL SQL snippet. Starting a cleanup, refreshing a materialized view or processing data.
-
PROGRAM External Command. Anything that can be called as an external binary, including shells, e.g.
bash,pwsh, etc. The external command will be called using golang’s exec.CommandContext.-
BUILTIN Internal Command. A prebuilt functionality included in pg_timetable. These include:
NoOp
Sleep
Log
SendMail
Download
CopyFromFile
CopyToFile
CopyFromProgram
CopyToProgram
Shutdown
J.4.7.2. Task #
The next building block is a task, which simply represents a step in a list of chain commands. An example of tasks combined in a chain would be:
Download files from a server
Import files
Run aggregations
Build report
Remove the files from disk
Note
All tasks of the chain in
pg_timetable are executed
within one transaction. However, please, pay attention there is
no opportunity to rollback PROGRAM and
BUILTIN tasks.
J.4.7.2.1. Table timetable.task #
| Field | Type | Description |
|---|---|---|
chain_id | bigint | Link to the chain, if NULL task considered to be disabled |
task_order | DOUBLE PRECISION | Indicates the order of task within a chain |
kind | timetable.command_kind | The type of the command. Can be SQL (default), PROGRAM or BUILTIN |
command | text | Contains either a SQL command, a path to application or name of the BUILTIN command which will be executed |
run_as | text | The role as which the task should be executed as |
database_connection | text | The connection string for the external database that should be used |
ignore_error | boolean | Specify if the next task should proceed after encountering an error (default: false) |
autonomous | boolean | Specify if the task should be executed out of the chain transaction. Useful for VACUUM, CREATE DATABASE, CALL etc. |
timeout | integer | Abort any task within a chain that takes more than the specified number of milliseconds |
Warning
If the "task has been configured with ignore_error set to true (the default value is false), the worker process will report a success on execution even if the task within the chain fails.
As mentioned above, "commands are simple skeletons (e.g. send email, vacuum, etc.). In most cases, they have to be brought to live by passing input parameters to the execution.
J.4.7.2.2. Table timetable.parameter #
| Field | Type | Description |
|---|---|---|
task_id | bigint | The ID of the task |
order_id | integer | The order of the parameter. Several parameters are processed one by one according to the order |
value | jsonb | A JSON value containing the parameters |
J.4.7.2.3. Parameter value format #
Depending on the "command kind argument can be represented by different JSON values.
J.4.7.2.3.2. PROGRAM #
Schema: array of strings
Example:
'["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb
J.4.7.2.3.5. BUILTIN: SendMail #
Schema: object
Example:
'{
"username": "user@example.com",
"password": "password",
"serverhost": "smtp.example.com",
"serverport": 587,
"senderaddr": "user@example.com",
"ccaddr": ["recipient_cc@example.com"],
"bccaddr": ["recipient_bcc@example.com"],
"toaddr": ["recipient@example.com"],
"subject": "pg_timetable - No Reply",
"attachment": ["/temp/attachments/Report.pdf","config.yaml"],
"attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
"msgbody": "<h2>Hello User,</h2> <p>check some attachments!</p>",
"contenttype": "text/html; charset=UTF-8"
}'::jsonb
J.4.7.2.3.6. BUILTIN: Download #
Schema: object
Example:
'{
"workersnum": 2,
"fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"],
"destpath": "."
}'::jsonb
J.4.7.2.3.7. BUILTIN: CopyFromFile #
Schema: object
Example:
'{
"sql": "COPY location FROM STDIN",
"filename": "download/orte_ansi.txt"
}'::jsonb
J.4.7.2.3.8. BUILTIN: CopyToFile #
Schema: object
Example:
'{
"sql": "COPY location TO STDOUT",
"filename": "download/location.txt"
}'::jsonb
J.4.7.2.3.9. BUILTIN: CopyToProgram #
Schema: object
Example:
'{
"sql": "COPY location TO STDOUT",
"cmd": "sh",
"args": ["gzip", "-c", ">", "/tmp/output.gz"]
}'::jsonb
J.4.7.2.3.10. BUILTIN: CopyFromProgram #
Schema: object
Example:
'{
"sql": "COPY location FROM STDIN",
"cmd": "gunzip",
"args": ["-c", "/tmp/data.gz"]
}'::jsonb
J.4.7.2.3.11. BUILTIN: Shutdown #
value ignored
J.4.7.2.3.12. BUILTIN: NoOp #
value ignored
J.4.7.3. Chain #
Once tasks have been arranged, they have to be scheduled as a "chain. For this, "pg_timetable builds upon the enhanced "cron-string, all the while adding multiple configuration options.
J.4.7.3.1. Table timetable.chain #
| Field | Type | Description |
|---|---|---|
chain_name | text | The unique name of the chain |
run_at | timetable.cron | Standard cron-style value at Postgres server time zone or @after, @every, @reboot clause |
max_instances | integer | The amount of instances that this chain may have running at the same time |
timeout | integer | Abort any chain that takes more than the specified number of milliseconds |
live | boolean | Control if the chain may be executed once it reaches its schedule |
self_destruct | boolean | Self destruct the chain after successful execution. Failed chains will be executed according to the schedule one more time |
exclusive_execution | boolean | Specifies whether the chain should be executed exclusively while all other chains are paused |
client_name | text | Specifies which client should execute the chain. Set this to NULL to allow any client |
timeout | integer | Abort a chain that takes more than the specified number of milliseconds |
on_error | — | Holds SQL to execute if an error occurs. If task produced an error is marked with ignore_error then nothing is done |
Note
All chains in "pg_timetable are scheduled at the PostgreSQL server time zone. You can change the timezone for the "current session when adding new chains, e.g.
SET TIME ZONE 'UTC';
-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'VACUUM');
J.4.8. Database Schema #
pg_timetable is a database driven application. During the first start the necessary schema is created if absent.
J.4.8.1. Main tables and objects #
CREATE TABLE timetable.chain (
chain_id BIGSERIAL PRIMARY KEY,
chain_name TEXT NOT NULL UNIQUE,
run_at timetable.cron,
max_instances INTEGER,
timeout INTEGER DEFAULT 0,
live BOOLEAN DEFAULT FALSE,
self_destruct BOOLEAN DEFAULT FALSE,
exclusive_execution BOOLEAN DEFAULT FALSE,
client_name TEXT,
on_error TEXT
);
COMMENT ON TABLE timetable.chain IS
'Stores information about chains schedule';
COMMENT ON COLUMN timetable.chain.run_at IS
'Extended CRON-style time notation the chain has to be run at';
COMMENT ON COLUMN timetable.chain.max_instances IS
'Number of instances (clients) this chain can run in parallel';
COMMENT ON COLUMN timetable.chain.timeout IS
'Abort any chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.chain.live IS
'Indication that the chain is ready to run, set to FALSE to pause execution';
COMMENT ON COLUMN timetable.chain.self_destruct IS
'Indication that this chain will delete itself after successful run';
COMMENT ON COLUMN timetable.chain.exclusive_execution IS
'All parallel chains should be paused while executing this chain';
COMMENT ON COLUMN timetable.chain.client_name IS
'Only client with this name is allowed to run this chain, set to NULL to allow any client';
CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN');
CREATE TABLE timetable.task (
task_id BIGSERIAL PRIMARY KEY,
chain_id BIGINT REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE SCADE,
task_order DOUBLE PRECISION NOT NULL,
task_name TEXT,
kind timetable.command_kind NOT NULL DEFAULT 'SQL',
command TEXT NOT NULL,
run_as TEXT,
database_connection TEXT,
ignore_error BOOLEAN NOT NULL DEFAULT FALSE,
autonomous BOOLEAN NOT NULL DEFAULT FALSE,
timeout INTEGER DEFAULT 0
);
COMMENT ON TABLE timetable.task IS
'Holds information about chain elements aka tasks';
COMMENT ON COLUMN timetable.task.chain_id IS
'Link to the chain, if NULL task considered to be disabled';
COMMENT ON COLUMN timetable.task.task_order IS
'Indicates the order of task within a chain';
COMMENT ON COLUMN timetable.task.run_as IS
'Role name to run task as. Uses SET ROLE for SQL commands';
COMMENT ON COLUMN timetable.task.ignore_error IS
'Indicates whether a next task in a chain can be executed regardless of the success of the current one';
COMMENT ON COLUMN timetable.task.kind IS
'Indicates whether "command" is SQL, built-in function or an external program';
COMMENT ON COLUMN timetable.task.command IS
'Contains either an SQL command, or command string to be executed';
COMMENT ON COLUMN timetable.task.timeout IS
'Abort any task within a chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.task.autonomous IS
'Specify if the task should be executed out of the chain transaction. Useful for VACUUM, CREATE DATABASE, CALL etc.';
-- parameter passing for a chain task
CREATE TABLE timetable.parameter(
task_id BIGINT REFERENCES timetable.task(task_id)
ON UPDATE CASCADE ON DELETE CASCADE,
order_id INTEGER CHECK (order_id > 0),
value JSONB,
PRIMARY KEY (task_id, order_id)
);
COMMENT ON TABLE timetable.parameter IS
'Stores parameters passed as arguments to a chain task';
CREATE UNLOGGED TABLE timetable.active_session(
client_pid BIGINT NOT NULL,
server_pid BIGINT NOT NULL,
client_name TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now()
);
COMMENT ON TABLE timetable.active_session IS
'Stores information about active sessions';
CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER');
CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
$$
SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1
$$
LANGUAGE sql;
CREATE TABLE timetable.log
(
ts TIMESTAMPTZ DEFAULT now(),
pid INTEGER NOT NULL,
log_level timetable.log_type NOT NULL,
client_name TEXT DEFAULT timetable.get_client_name(pg_backend_pid()),
message TEXT,
message_data jsonb
);
COMMENT ON TABLE timetable.log IS
'Stores log entries of active sessions';
CREATE TABLE timetable.execution_log (
chain_id BIGINT,
task_id BIGINT,
txid BIGINT NOT NULL,
last_run TIMESTAMPTZ DEFAULT now(),
finished TIMESTAMPTZ,
pid BIGINT,
returncode INTEGER,
ignore_error BOOLEAN,
kind timetable.command_kind,
command TEXT,
output TEXT,
client_name TEXT NOT NULL
);
COMMENT ON TABLE timetable.execution_log IS
'Stores log entries of executed tasks and chains';
CREATE UNLOGGED TABLE timetable.active_chain(
chain_id BIGINT NOT NULL,
client_name TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now()
);
COMMENT ON TABLE timetable.active_chain IS
'Stores information about active chains within session';
CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
BEGIN
IF pg_is_in_recovery() THEN
RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node';
RETURN FALSE;
END IF;
-- remove disconnected sessions
DELETE
FROM timetable.active_session
WHERE server_pid NOT IN (
SELECT pid
FROM pg_catalog.pg_stat_activity
WHERE application_name = 'pg_timetable'
);
DELETE
FROM timetable.active_chain
WHERE client_name NOT IN (
SELECT client_name FROM timetable.active_session
);
-- check if there any active sessions with the client name but different client pid
PERFORM 1
FROM timetable.active_session s
WHERE
s.client_pid <> worker_pid
AND s.client_name = worker_name
LIMIT 1;
IF FOUND THEN
RAISE NOTICE 'Another client is already connected to server with name: %', worker_name;
RETURN FALSE;
END IF;
-- insert current session information
INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, backend_pid());
RETURN TRUE;
END;
$CODE$
STRICT
LANGUAGE plpgsql;
J.4.8.2. Jobs related functions #
-- add_task() will add a task to the same chain as the task with `parent_id`
CREATE OR REPLACE FUNCTION timetable.add_task(
IN kind timetable.command_kind,
IN command TEXT,
IN parent_id BIGINT,
IN order_delta DOUBLE PRECISION DEFAULT 10
) RETURNS BIGINT AS $$
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT chain_id, task_order + $4, $1, $2 FROM timetable.task WHERE task_id = $3
RETURNING task_id
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_task IS 'Add a task to the same chain as the task with parent_id';
-- add_job() will add one-task chain to the system
CREATE OR REPLACE FUNCTION timetable.add_job(
job_name TEXT,
job_schedule timetable.cron,
job_command TEXT,
job_parameters JSONB DEFAULT NULL,
job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind,
job_client_name TEXT DEFAULT NULL,
job_max_instances INTEGER DEFAULT NULL,
job_live BOOLEAN DEFAULT TRUE,
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, clusive_execution, on_error)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, b_exclusive, job_on_error)
RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE
FROM cte_chain
RETURNING task_id
),
cte_param AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain
)
SELECT v_chain_id FROM cte_chain
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_job IS 'Add one-task chain (aka job) to the system';
-- notify_chain_start() will send notification to the worker to start the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_start(
chain_id BIGINT,
worker_name TEXT,
start_delay INTERVAL DEFAULT NULL
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "START", "Ts": %s, "Delay": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint,
COALESCE(EXTRACT(epoch FROM start_delay)::bigint, 0)
)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_start IS 'Send notification to the worker to start the chain';
-- notify_chain_stop() will send notification to the worker to stop the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_stop(
chain_id BIGINT,
worker_name TEXT
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "STOP", "Ts": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_stop IS 'Send notification to the worker to stop the chain';
-- move_task_up() will switch the order of the task execution with a previous task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_up(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order < ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_up IS 'Switch the order of the task execution with a previous task within chain';
-- move_task_down() will switch the order of the task execution with a following task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_down(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order > ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_down IS 'Switch the order of the task execution with a following task hin the chain';
-- delete_task() will delete the task from a chain
CREATE OR REPLACE FUNCTION timetable.delete_task(IN task_id BIGINT) RETURNS boolean AS $$
WITH del_task AS (DELETE FROM timetable.task WHERE task_id = $1 RETURNING task_id)
SELECT EXISTS(SELECT 1 FROM del_task)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_task IS 'Delete the task from a chain';
-- delete_job() will delete the chain and its tasks from the system
CREATE OR REPLACE FUNCTION timetable.delete_job(IN job_name TEXT) RETURNS boolean AS $$
WITH del_chain AS (DELETE FROM timetable.chain WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM del_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_job IS 'Delete the chain and its tasks from the system';
-- pause_job() will pause the chain (set live = false)
CREATE OR REPLACE FUNCTION timetable.pause_job(IN job_name TEXT) RETURNS boolean AS $$
WITH upd_chain AS (UPDATE timetable.chain SET live = false WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM upd_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.pause_job IS 'Pause the chain (set live = false)';
-- resume_job() will resume the chain (set live = true)
CREATE OR REPLACE FUNCTION timetable.resume_job(IN job_name TEXT) RETURNS boolean AS $$
WITH upd_chain AS (UPDATE timetable.chain SET live = true WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM upd_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.resume_job IS 'Resume the chain (set live = true)';
J.4.8.3. Сron related functions #
CREATE OR REPLACE FUNCTION timetable.cron_split_to_arrays(
cron text,
OUT mins integer[],
OUT hours integer[],
OUT days integer[],
OUT months integer[],
OUT dow integer[]
) RETURNS record AS $$
DECLARE
a_element text[];
i_index integer;
a_tmp text[];
tmp_item text;
a_range int[];
a_split text[];
a_res integer[];
max_val integer;
min_val integer;
dimensions constant text[] = '{"minutes", "hours", "days", "months", "days of week"}';
allowed_ranges constant integer[][] = '{{0,59},{0,23},{1,31},{1,12},{0,7}}';
BEGIN
a_element := regexp_split_to_array(cron, '\s+');
FOR i_index IN 1..5 LOOP
a_res := NULL;
a_tmp := string_to_array(a_element[i_index],',');
FOREACH tmp_item IN ARRAY a_tmp LOOP
IF tmp_item ~ '^[0-9]+$' THEN -- normal integer
a_res := array_append(a_res, tmp_item::int);
ELSIF tmp_item ~ '^[*]+$' THEN -- '*' any value
a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9]+[-][0-9]+$' THEN -- '-' range of values
a_range := regexp_split_to_array(tmp_item, '-');
a_range := array(select generate_series(a_range[1], a_range[2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9]+[\/][0-9]+$' THEN -- '/' step values
a_range := regexp_split_to_array(tmp_item, '/');
a_range := array(select generate_series(a_range[1], allowed_ranges[i_index][2], a_range[2]));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[0-9-]+[\/][0-9]+$' THEN -- '-' range of values and '/' step values
a_split := regexp_split_to_array(tmp_item, '/');
a_range := regexp_split_to_array(a_split[1], '-');
a_range := array(select generate_series(a_range[1], a_range[2], a_split[2]::int));
a_res := array_cat(a_res, a_range);
ELSIF tmp_item ~ '^[*]+[\/][0-9]+$' THEN -- '*' any value and '/' step values
a_split := regexp_split_to_array(tmp_item, '/');
a_range := array(select generate_series(allowed_ranges[i_index][1], allowed_ranges[i_index][2], a_split[2]::int));
a_res := array_cat(a_res, a_range);
ELSE
RAISE EXCEPTION 'Value ("%") not recognized', a_element[i_index]
USING HINT = 'fields separated by space or tab.'+
'Values allowed: numbers (value list with ","), '+
'any value with "*", range of value with "-" and step values with "/"!';
END IF;
END LOOP;
SELECT
ARRAY_AGG(x.val), MIN(x.val), MAX(x.val) INTO a_res, min_val, max_val
FROM (
SELECT DISTINCT UNNEST(a_res) AS val ORDER BY val) AS x;
IF max_val > allowed_ranges[i_index][2] OR min_val < allowed_ranges[i_index][1] OR a_res IS NULL THEN
RAISE EXCEPTION '% is out of range % for %', tmp_item, allowed_ranges[i_index:i_index][:], dimensions[i_index];
END IF;
CASE i_index
WHEN 1 THEN mins := a_res;
WHEN 2 THEN hours := a_res;
WHEN 3 THEN days := a_res;
WHEN 4 THEN months := a_res;
ELSE
dow := a_res;
END CASE;
END LOOP;
RETURN;
END;
$$ LANGUAGE PLPGSQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_months(
from_ts timestamptz,
allowed_months int[]
) RETURNS SETOF timestamptz AS $$
WITH
am(am) AS (SELECT UNNEST(allowed_months)),
genm(ts) AS ( --generated months
SELECT date_trunc('month', ts)
FROM pg_catalog.generate_series(from_ts, from_ts + INTERVAL '1 year', INTERVAL '1 month') g(ts)
)
SELECT ts FROM genm JOIN am ON date_part('month', genm.ts) = am.am
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_days(
from_ts timestamptz,
allowed_months int[],
allowed_days int[],
allowed_week_days int[]
) RETURNS SETOF timestamptz AS $$
WITH
ad(ad) AS (SELECT UNNEST(allowed_days)),
am(am) AS (SELECT * FROM timetable.cron_months(from_ts, allowed_months)),
gend(ts) AS ( --generated days
SELECT date_trunc('day', ts)
FROM am,
pg_catalog.generate_series(am.am, am.am + INTERVAL '1 month'
- INTERVAL '1 day', -- don't include the same day of the next month
INTERVAL '1 day') g(ts)
)
SELECT ts
FROM gend JOIN ad ON date_part('day', gend.ts) = ad.ad
WHERE extract(dow from ts)=ANY(allowed_week_days)
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_times(
allowed_hours int[],
allowed_minutes int[]
) RETURNS SETOF time AS $$
WITH
ah(ah) AS (SELECT UNNEST(allowed_hours)),
am(am) AS (SELECT UNNEST(allowed_minutes))
SELECT make_time(ah.ah, am.am, 0) FROM ah CROSS JOIN am
$$ LANGUAGE SQL STRICT;
CREATE OR REPLACE FUNCTION timetable.cron_runs(
from_ts timestamp with time zone,
cron text
) RETURNS SETOF timestamptz AS $$
SELECT cd + ct
FROM
timetable.cron_split_to_arrays(cron) a,
timetable.cron_times(a.hours, a.mins) ct CROSS JOIN
timetable.cron_days(from_ts, a.months, a.days, a.dow) cd
WHERE cd + ct > from_ts
ORDER BY 1 ASC;
$$ LANGUAGE SQL STRICT;
CREATE DOMAIN timetable.cron AS TEXT CHECK(
VALUE = '@reboot'
OR substr(VALUE, 1, 6) IN ('@every', '@after')
AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL
OR VALUE ~ '^(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) +){4}(((\d+,)+\d+|(\d+(\/|-)\d+)|(\*(\/|-)\d+)|\d+|\*) ?)$'
AND timetable.cron_split_to_arrays(VALUE) IS NOT NULL
);
COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support of interval values';
-- is_cron_in_time returns TRUE if timestamp is listed in cron expression
CREATE OR REPLACE FUNCTION timetable.is_cron_in_time(
run_at timetable.cron,
ts timestamptz
) RETURNS BOOLEAN AS $$
SELECT
CASE WHEN run_at IS NULL THEN
TRUE
ELSE
date_part('month', ts) = ANY(a.months)
AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow))
AND date_part('day', ts) = ANY(a.days)
AND date_part('hour', ts) = ANY(a.hours)
AND date_part('minute', ts) = ANY(a.mins)
END
FROM
timetable.cron_split_to_arrays(run_at) a
$$ LANGUAGE SQL;
CREATE OR REPLACE FUNCTION timetable.next_run(cron timetable.cron) RETURNS timestamptz AS $$
SELECT * FROM timetable.cron_runs(now(), cron) LIMIT 1
$$ LANGUAGE SQL STRICT;
J.4.8.4. ER-Diagram #

J.4.9. Getting started #
A variety of examples can be found in the samples. If you want to migrate from a different scheduler, you can use scripts from migration chapter.
J.4.9.1. Add simple job #
In a real world usually it's enough to use simple jobs. Under this term we understand:
job is a chain with only one "task (step) in it;
it doesn't use complicated logic, but rather simple "command;
it doesn't require complex transaction handling, since one task is implicitely executed as a single transaction.
For such a group of chains we've introduced a special function timetable.add_job().
J.4.9.1.1. Function: timetable.add_job() #
Creates a simple one-task chain
"Returns: BIGINT
J.4.9.1.1.1. Parameters #
| Parameter | Type | Description | Default |
|---|---|---|---|
job_name | text | The unique name of the "chain and "command | Required |
job_schedule | timetable.cron | Time schedule in сron syntax at Postgres server time zone | Required |
job_command | text | The SQL which will be executed | Required |
job_parameters | jsonb | Arguments for the chain "command | NULL |
job_kind | timetable.command_kind | Kind of the command: SQL, PROGRAM or BUILTIN | SQL |
job_client_name | text | Specifies which client should execute the chain. Set this to NULL to allow any client | NULL |
job_max_instances | integer | The amount of instances that this chain may have running at the same time | NULL |
job_live | boolean | Control if the chain may be executed once it reaches its schedule | TRUE |
job_self_destruct | boolean | Self destruct the chain after execution | FALSE |
job_ignore_errors | boolean | Ignore error during execution | TRUE |
job_exclusive | boolean | Execute the chain in the exclusive mode | FALSE |
"Returns: the ID of the created chain
J.4.9.2. Examples #
Run
public.my_func()at 00:05 every day in August Postgres server time zone:SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');Run
VACUUMat minute 23 past every 2nd hour from 0 through 20 every day Postgres server time zone:SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');Refresh materialized view every 2 hours:
SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');Clear log table after "pg_timetable restart:
SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');Reindex at midnight Postgres server time zone on Sundays with reindexdb utility:
using default database under default user (no command line arguments)
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');specifying target database and tables, and be verbose
SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');passing password using environment variable through
bashshellSELECT timetable.add_job('reindex', '0 0 * * 7', 'bash', '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb, 'PROGRAM');
J.4.10. Samples #
J.4.10.1. Basic #
This sample demonstrates how to create a basic one-step chain with parameters. It uses CTE to directly update the timetable schema tables.
SELECT timetable.add_job(
job_name => 'notify every minute',
job_schedule => '* * * * *',
job_command => 'SELECT pg_notify($1, $2)',
job_parameters => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
job_kind => 'SQL'::timetable.command_kind,
job_client_name => NULL,
job_max_instances => 1,
job_live => TRUE,
job_self_destruct => FALSE,
job_ignore_errors => TRUE
) as chain_id;
J.4.10.2. Send email #
This sample demonstrates how to create an advanced email job. It will check if there are emails to send, will send them and log the status of the command execution. You don't need to setup anything, every parameter can be specified during the chain creation.
DO $$
-- An example for using the SendMail task.
DECLARE
v_mail_task_id bigint;
v_log_task_id bigint;
v_chain_id bigint;
BEGIN
-- Get the chain id
INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
RETURNING chain_id INTO v_chain_id;
-- Add SendMail task
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
RETURNING task_id INTO v_mail_task_id;
-- Create the parameters for the SensMail task
-- "username": The username used for authenticating on the mail server
-- "password": The password used for authenticating on the mail server
-- "serverhost": The IP address or hostname of the mail server
-- "serverport": The port of the mail server
-- "senderaddr": The email that will appear as the sender
-- "ccaddr": String array of the recipients(Cc) email addresses
-- "bccaddr": String array of the recipients(Bcc) email addresses
-- "toaddr": String array of the recipients(To) email addresses
-- "subject": Subject of the email
-- "attachment": String array of the attachments (local file)
-- "attachmentdata": Pairs of name and base64-encoded content
-- "msgbody": The body of the email
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_mail_task_id, 1, '{
"username": "user@example.com",
"password": "password",
"serverhost": "smtp.example.com",
"serverport": 587,
"senderaddr": "user@example.com",
"ccaddr": ["recipient_cc@example.com"],
"bccaddr": ["recipient_bcc@example.com"],
"toaddr": ["recipient@example.com"],
"subject": "pg_timetable - No Reply",
"attachment": ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
"attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
"msgbody": "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
"contenttype": "text/html; charset=UTF-8"
}'::jsonb);
-- Add Log task and make it the last task using `task_order` column (=30)
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 30, 'BUILTIN', 'Log'
RETURNING task_id INTO v_log_task_id;
-- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
-- Since we're using special add_task() function we don't need to specify the `chain_id`.
-- Function will take the same `chain_id` from the parent task, SendMail in this particular case
PERFORM timetable.add_task(
kind => 'SQL',
parent_id => v_mail_task_id,
command => format(
$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
FROM sent_mail
ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$,
v_mail_task_id, v_log_task_id
),
order_delta => 10
);
-- In the end we should have something like this. Note, that even Log task was created earlier it will be executed later
-- due to `task_order` column.
-- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
-- task_id | chain_id | task_order | kind | left
-- ---------+----------+------------+---------+---------------------------------------------------------------
-- 45 | 24 | 10 | BUILTIN | SendMail
-- 47 | 24 | 20 | SQL | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
-- 46 | 24 | 30 | BUILTIN | Log
-- (3 rows)
END;
$$
LANGUAGE PLPGSQL;
J.4.10.3. Download, Transform and Import #
This sample demonstrates how to create enhanced three-step chain with parameters. It uses DO statement to directly update the timetable schema tables.
-- Prepare the destination table 'location'
CREATE TABLE IF NOT EXISTS public.city(
city text,
lat numeric,
lng numeric,
country text,
iso2 text,
admin_name text,
capital text,
population bigint,
population_proper bigint);
GRANT ALL ON public.city TO scheduler;
-- An enhanced example consisting of three tasks:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension)
-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
DO $$
DECLARE
v_task_id bigint;
v_chain_id bigint;
BEGIN
-- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
INSERT INTO timetable.chain (chain_name, live)
VALUES ('Download locations and aggregate', TRUE)
RETURNING chain_id INTO v_chain_id;
-- Step 1. Download file from the server
-- Create the chain
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
RETURNING task_id INTO v_task_id;
-- Create the parameters for the step 1:
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1,
'{
"workersnum": 1,
"fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"],
"destpath": "."
}'::jsonb);
RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;
-- Step 2. Transform Unicode characters into ASCII
-- Create the program task to call 'uconv' and name it 'unaccent'
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
RETURNING task_id INTO v_task_id;
-- Create the parameters for the 'unaccent' task. Input and output files in this case
-- Under Windows we should call PowerShell instead of "uconv" with command:
-- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);
RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id;
-- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
INSERT INTO timetable.task (chain_id, task_order, kind, command)
VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
RETURNING task_id INTO v_task_id;
-- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);
RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
RETURNING task_id INTO v_task_id;
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
RAISE NOTICE 'Step 4 completed. Cleanup task added with ID: %', v_task_id;
END;
$$ LANGUAGE PLPGSQL;
J.4.10.4. Run tasks in autonomous transaction #
This sample demonstrates how to run special tasks out of chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
-- An advanced example showing how to use atutonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands,
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN
RAISE notice '%', msg;
END;
$$ LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
PERFORM f('hey 1');
COMMIT;
PERFORM f('hey 2');
COMMIT;
END;
$$
LANGUAGE PLPGSQL;
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct)
VALUES (
'call proc() every 10 sec', -- chain_name,
'@every 10 seconds', -- run_at,
1, -- max_instances,
TRUE, -- live,
FALSE -- self_destruct
) RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
FROM cte_chain
RETURNING task_id
)
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;
J.4.10.5. Shutdown the scheduler and terminate the session #
This sample demonstrates how to shutdown the scheduler using special built-in task. This can be used to control maintenance windows, to restart the scheduler for update purposes, or to stop session before the database should be dropped.
-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintaining purposes or before database being destroyed.
-- One should take care of restarting pg_timetable if needed.
SELECT timetable.add_job (
job_name => 'Shutdown pg_timetable session on schedule',
job_schedule => '* * 1 * *',
job_command => 'Shutdown',
job_kind => 'BUILTIN'
);
J.4.10.6. Access previous task result code and output from the next task #
This sample demonstrates how to check the result code and output of a previous task. If the last task failed, that is possible only if ignore_error boolean = true is set for that task. Otherwise, a scheduler will stop the chain. This sample shows how to calculate failed, successful, and the total number of tasks executed. Based on these values, we can calculate the success ratio.
WITH
cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
VALUES ('many tasks', '* * * * *', 1, true)
RETURNING chain_id
),
cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
FROM cte_chain, generate_series(1, 500) AS g(s)
RETURNING task_id
),
report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
s TEXT;
BEGIN
WITH report AS (
SELECT
count(*) FILTER (WHERE returncode = 0) AS success,
count(*) FILTER (WHERE returncode != 0) AS fail,
count(*) AS total
FROM timetable.execution_log
WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
AND txid = txid_current()
)
SELECT 'Tasks executed:' || total ||
'; succeeded: ' || success ||
'; failed: ' || fail ||
'; ratio: ' || 100.0*success/GREATEST(total,1)
INTO s
FROM report;
RAISE NOTICE '%', s;
END;
$$
$CMD$
FROM cte_chain
RETURNING task_id
)
SELECT v_chain_id FROM cte_chain
J.4.11. YAML Chain Configuration Guide #
This guide explains how to use YAML files to define pg_timetable chains as an alternative to SQL-based configuration.
J.4.11.1. Overview #
YAML chain definitions provide a human-readable way to create scheduled task chains without writing SQL. Benefits include:
Creating complex multi-step workflows with clear structure
Version controlling your chain configurations
Easy review and modification of scheduled tasks
Sharing chain templates across environments
J.4.11.2. Basic Usage #
# Load YAML chains pg_timetable --file chains.yaml postgresql://user:pass@host/db # Validate YAML without importing pg_timetable --file chains.yaml --validate # Replace existing chains with same names pg_timetable --file chains.yaml --replace postgresql://user:pass@host/db
J.4.11.3. YAML Format #
J.4.11.3.1. Basic Structure #
chains:
- name: "chain-name" # Required: unique identifier
schedule: "* * * * *" # Required: cron format
live: true # Optional: enable/disable chain
max_instances: 1 # Optional: max parallel executions
timeout: 30000 # Optional: timeout in milliseconds
self_destruct: false # Optional: delete after success
exclusive: false # Optional: pause other chains while running
client_name: "worker-1" # Optional: restrict to specific client
on_error: "SELECT log_error($1)" # Optional: error handling SQL
tasks: # Required: array of tasks
- name: "task-name" # Optional: task description
kind: "SQL" # Optional: SQL, PROGRAM, or BUILTIN
command: "SELECT now()" # Required: command to execute
run_as: "postgres" # Optional: role for SET ROLE
connect_string: "postgresql://user@host/otherdb" # Optional: different database connection
ignore_error: false # Optional: continue on error
autonomous: false # Optional: run outside transaction
timeout: 5000 # Optional: task timeout in ms
parameters: # Optional: task parameters, each entry causes separate execution
- ["value1", 42] # Parameters for SQL tasks are arrays of values
J.4.11.3.2. Task Parameters #
Each task can have multiple parameter entries, with each entry causing a separate execution:
# SQL task parameters (arrays of values)
- name: "sql-task"
kind: "SQL"
command: "SELECT $1, $2, $3, $4"
parameters:
- ["one", 2, 3.14, false] # First execution
- ["two", 4, 6.28, true] # Second execution
# PROGRAM task parameters (arrays of command-line arguments)
- name: "program-task"
kind: "PROGRAM"
command: "iconv"
parameters:
- ["-x", "Latin-ASCII", "-o", "file1.txt", "input1.txt"]
- ["-x", "UTF-8", "-o", "file2.txt", "input2.txt"]
# BUILTIN: Sleep task (integer values)
- name: "sleep-task"
kind: "BUILTIN"
command: "Sleep"
parameters:
- 5 # Sleep for 5 seconds
- 10 # Then sleep for 10 seconds
# BUILTIN: Log task (string or object values)
- name: "log-task"
kind: "BUILTIN"
command: "Log"
parameters:
- "WARNING: Simple message"
- level: "WARNING"
details: "Object message"
# BUILTIN: SendMail task (complex object)
- name: "mail-task"
kind: "BUILTIN"
command: "SendMail"
parameters:
- username: "user@example.com"
password: "password123"
serverhost: "smtp.example.com"
serverport: 587
senderaddr: "user@example.com"
toaddr: ["recipient@example.com"]
subject: "Notification"
msgbody: "<p>Hello User</p>"
contenttype: "text/html; charset=UTF-8"
J.4.11.3.3. Examples #
J.4.11.3.3.1. Simple SQL Job #
chains:
- name: "daily-cleanup"
schedule: "0 2 * * *" # 2 AM daily
live: true
tasks:
- name: "vacuum-tables"
command: "VACUUM ANALYZE"
J.4.11.3.3.2. Multi-Step Chain #
chains:
- name: "data-pipeline"
schedule: "0 1 * * *" # 1 AM daily
live: true
max_instances: 1
timeout: 7200000 # 2 hours
tasks:
- name: "extract"
command: |
CREATE TEMP TABLE temp_data AS
SELECT * FROM source_table
WHERE date >= CURRENT_DATE - INTERVAL '1 day'
- name: "validate"
command: |
DO $$
BEGIN
IF (SELECT COUNT(*) FROM temp_data) = 0 THEN
RAISE EXCEPTION 'No data to process';
END IF;
END $$
- name: "transform"
command: "CALL transform_data_procedure()"
autonomous: true
- name: "load"
command: "INSERT INTO target_table SELECT * FROM temp_data"
J.4.11.3.3.3. Program Tasks #
chains:
- name: "backup-job"
schedule: "0 3 * * 0" # Sunday 3 AM
live: true
client_name: "backup-worker"
tasks:
- name: "database-backup"
kind: "PROGRAM"
command: "pg_dump"
parameters:
- ["-h", "localhost", "-U", "postgres", "-d", "mydb", "-f", "/backups/mydb.sql"]
timeout: 3600000 # 1 hour
- name: "compress-backup"
kind: "PROGRAM"
command: "gzip"
parameters:
- ["/backups/mydb.sql"]
J.4.11.3.3.4. Multiple Chains in One File #
chains:
# Monitoring chain
- name: "health-check"
schedule: "*/15 * * * *" # Every 15 minutes
live: true
tasks:
- command: "SELECT check_database_health()"
# Cleanup chain
- name: "hourly-cleanup"
schedule: "0 * * * *" # Every hour
live: true
tasks:
- command: "DELETE FROM logs WHERE created_at < now() - interval '7 days'"
J.4.11.4. Advanced Features #
J.4.11.4.1. Error Handling #
Control error behavior with ignore_error and on_error:
chains:
- name: "resilient-chain"
on_error: |
SELECT pg_notify('monitoring',
format('{"ConfigID": %s, "Message": "Something bad happened"}',
current_setting('pg_timetable.current_chain_id')::bigint))
tasks:
- name: "risky-task"
command: "SELECT might_fail()"
ignore_error: true # Continue chain execution even if this task fails
- name: "cleanup-task"
command: "SELECT cleanup()" # Always runs, even if previous task failed
J.4.11.4.2. Transaction Control #
Use autonomous: true for tasks that need to run outside the main transaction:
tasks:
- name: "vacuum-task"
command: "VACUUM FULL heavy_table"
autonomous: true # Required for VACUUM FULL
- name: "create-database"
command: "CREATE DATABASE new_db"
autonomous: true # CREATE DATABASE requires autonomous transaction
J.4.11.4.3. Remote Databases #
Execute tasks on different databases:
tasks:
- name: "cross-database-task"
command: "SELECT sync_data()"
connect_string: "postgresql://user:pass@other-host/other-db"
J.4.11.5. Validation #
YAML files are validated when loaded:
Syntax: Valid YAML format
Structure: Required fields present
Cron: Valid 5-field cron expressions
Task kinds: Must be SQL, PROGRAM, or BUILTIN
Timeouts: Non-negative integers
Use --validate to check files without importing:
pg_timetable --file chains.yaml --validate
J.4.11.6. Migration from SQL #
J.4.11.6.1. Converting Existing Chains #
To convert SQL-based chains to YAML:
Query chain and tasks information:
SELECT * FROM timetable.chain c WHERE c.chain_name = 'my-chain'; SELECT t.* FROM timetable.task t JOIN timetable.chain c ON t.chain_id = c.chain_id AND c.chain_name = 'my-chain' ORDER BY t.task_order;Map to YAML format:
chain_name→namerun_at→schedulelive→livemax_instances→max_instancesTask fields map directly
Test the conversion:
pg_timetable --file converted.yaml --validate
J.4.11.6.2. Example Migration #
Original SQL:
SELECT timetable.add_job(
job_name => 'daily-report',
job_schedule => '0 9 * * *',
job_command => 'CALL generate_report()',
job_live => TRUE
);
Converted YAML:
chains:
- name: "daily-report"
schedule: "0 9 * * *"
live: true
tasks:
- command: "CALL generate_report()"
J.4.11.7. Best Practices #
J.4.11.7.1. Naming Conventions #
Use descriptive, kebab-case names
Include environment in name for clarity
Group related chains in same file
J.4.11.7.2. Documentation #
Use YAML comments to document complex logic
Include purpose and dependencies in task names
Document parameter meanings
chains:
- name: "etl-sales-data"
# Processes daily sales data from external API
# Depends on: external API availability, sales_raw table
schedule: "0 2 * * *"
tasks:
- name: "extract-from-api"
# Fetches last 24h of sales data from REST API
command: "SELECT fetch_sales_data($1)"
parameters: ["yesterday"]
J.4.11.7.3. Testing #
Always validate YAML before deployment
Test with
--validateflagUse non-live chains for testing
Keep backups of working configurations
J.4.11.7.4. Version Control #
Store YAML files in version control
Use meaningful commit messages
Tag releases for production deployments
Review changes before merging
J.4.11.8. Troubleshooting #
J.4.11.8.1. Common Issues #
Invalid YAML syntax:
Error: failed to parse YAML: yaml: line 5: found character that cannot start any token
→ Check indentation and quotes
Invalid cron format:
Error: invalid cron format: 0 9 * * (expected 5 fields)
→ Ensure cron has exactly 5 fields
Chain already exists:
Error: chain 'my-chain' already exists (use --replace flag to overwrite)
→ Use --replace flag or choose different name
Missing required fields:
Error: chain 1: chain name is required
→ Check all required fields are present
J.4.12. Migration from others schedulers #
J.4.12.1. Migrate jobs from pg_cron to pg_timetable #
If you want to quickly export jobs scheduled from pg_cron to pg_timetable, you can use this SQL snippet:
SELECT timetable.add_job(
job_name => COALESCE(jobname, 'job: ' || command),
job_schedule => schedule,
job_command => command,
job_kind => 'SQL',
job_live => active
) FROM cron.job;
The timetable.add_job(), however, has some limitations. First of all, the function will mark the task created as autonomous, specifying scheduler should execute the task out of the chain transaction. It’s not an error, but many autonomous chains may cause some extra connections to be used.
Secondly, database connection parameters are lost for source pg_cron jobs, making all jobs local. To export every information available precisely as possible, use this SQL snippet under the role they were scheduled in pg_cron:
SET ROLE 'scheduler'; -- set the role used by pg_cron
WITH cron_chain AS (
SELECT
nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
jobname,
schedule,
active,
command,
CASE WHEN
database != current_database()
OR nodename != 'localhost'
OR username != CURRENT_USER
OR nodeport != inet_server_port()
THEN
format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
END AS connstr
FROM
cron.job
),
cte_chain AS (
INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
SELECT
cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
FROM
cron_chain
),
cte_tasks AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
SELECT
cron_id, 1, 'SQL', command, connstr
FROM
cron_chain
RETURNING
chain_id, task_id
)
SELECT * FROM cte_tasks;
J.4.12.2. Migrate jobs from pgAgent to pg_timetable #
To migrate jobs from pgAgent, please use this script. pgAgent doesn’t have concept of PROGRAM task, thus to emulate BATCH steps, pg_timetable will execute them inside the shell. You may change the shell by editing cte_shell CTE clause.
CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;
WITH
cte_shell(shell, cmd_param) AS (
VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
SELECT
s.jscjobid,
s.jscname,
format('%s %s %s %s %s',
bool_array_to_cron(s.jscminutes),
bool_array_to_cron(s.jschours),
bool_array_to_cron(s.jscmonthdays),
bool_array_to_cron(s.jscmonths, 1),
bool_array_to_cron(s.jscweekdays, 1)) AS schedule
FROM
pgagent.pga_schedule s
WHERE s.jscenabled
AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
AND now() > s.jscstart
),
pga_chain AS (
SELECT
nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
jobid,
format('%s @ %s', jobname, jscname) AS jobname,
jobhostagent,
jobenabled,
schedule
FROM
pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
SELECT
chain_id, jobname, jobhostagent, schedule, jobenabled
FROM
pga_chain
),
pga_step AS (
SELECT
c.chain_id,
nextval('timetable.task_task_id_seq'::regclass) AS task_id,
rank() OVER (ORDER BY jstname) AS jstorder,
jstid,
jstname,
jstenabled,
CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
jstcode,
COALESCE(
NULLIF(jstconnstr, ''),
CASE
WHEN jstdbname = current_database() THEN NULL
WHEN jstdbname > '' THEN 'dbname=' || jstdbname
END
) AS jstconnstr,
jstonerror != 'f' AS jstignoreerror
FROM
pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection)
SELECT
task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind,
CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
jstconnstr
FROM
pga_step, cte_shell sh
),
cte_parameters AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT
task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
FROM
pga_step s, cte_shell sh
WHERE
s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;
J.4.13. REST API #
pg_timetable has a rich REST API, which can be used by external tools in order to perform start/stop/reinitialize/restarts/reloads, by any kind of tools to perform HTTP health checks, and of course, could also be used for monitoring.
Below you will find the list of pg_timetable REST API endpoints.
J.4.13.1. Health check endpoints #
-
GET /liveness Always returns HTTP status code
200, indicating that pg_timetable is running.-
GET /readiness Returns HTTP status code
200when the pg_timetable is running, and the scheduler is in the main loop processing chains. If the scheduler connects to the database, creates the database schema, or upgrades it, it will return the HTTP status code503.
J.4.13.2. Chain management endpoints #
-
GET /startchain?id=<chain-id> Returns HTTP status code
200if the chain with the given id can be added to the worker queue. It doesn’t, however, mean the chain execution starts immediately. It is up to the worker to perform load and other checks before starting the chain. In the case of an error, the HTTP status code400followed by an error message returned.-
GET /stopchain?id=<chain-id> Returns HTTP status code
200if the chain with the given id is working at the moment and can be stopped. If the chain is running the cancel signal would be sent immediately. In the case of an error, the HTTP status code400followed by an error message returned.
J.4.14. YAML Chain Definition Format for pg_timetable #
This document defines the YAML format for defining chains of scheduled tasks in pg_timetable.
J.4.14.1. YAML Schema #
# Top-level structure
chains:
- name: "chain-name" # Required: chain_name (TEXT, unique)
schedule: "* * * * *" # Required: run_at (cron format)
live: true # Optional: live (BOOLEAN), default: false
max_instances: 1 # Optional: max_instances (INTEGER)
timeout: 30000 # Optional: timeout in milliseconds (INTEGER)
self_destruct: false # Optional: self_destruct (BOOLEAN), default: false
exclusive: false # Optional: exclusive_execution (BOOLEAN), default: false
client_name: "worker-1" # Optional: client_name (TEXT)
on_error: "SELECT log_error()" # Optional: on_error SQL (TEXT)
tasks: # Required: array of tasks
- name: "task-1" # Optional: task_name (TEXT)
kind: "SQL" # Optional: kind (SQL|PROGRAM|BUILTIN), default: SQL
command: "SELECT $1, $2" # Required: command (TEXT)
parameters: # Optional: parameters (array of execution parameters)
- ["value1", 42] # First execution with these parameters
- ["value2", 99] # Second execution with different parameters
run_as: "postgres" # Optional: run_as (TEXT) - role for SET ROLE
connect_string: "postgresql://user@host/otherdb" # Optional: database_connection (TEXT)
ignore_error: false # Optional: ignore_error (BOOLEAN), default: false
autonomous: false # Optional: autonomous (BOOLEAN), default: false
timeout: 5000 # Optional: timeout in milliseconds (INTEGER)
- name: "task-2"
kind: "PROGRAM"
command: "bash"
parameters: ["-c", "echo hello"]
ignore_error: true
J.4.14.2. Field Mappings #
J.4.14.2.1. Chain Level #
| YAML Field | DB Column | Type | Default | Description |
|---|---|---|---|---|
name | chain_name | TEXT | required | Unique chain identifier |
schedule | run_at | cron | required | Cron-style schedule |
live | live | BOOLEAN | false | Whether chain is active |
max_instances | max_instances | INTEGER | null | Max parallel instances |
timeout | timeout | INTEGER | 0 | Chain timeout (ms) |
self_destruct | self_destruct | BOOLEAN | false | Delete after success |
exclusive | exclusive_execution | BOOLEAN | false | Pause other chains |
client_name | client_name | TEXT | null | Restrict to specific client |
on_error | on_error | TEXT | null | Error handling SQL |
J.4.14.2.2. Task Level #
| YAML Field | DB Column | Type | Default | Description |
|---|---|---|---|---|
name | task_name | TEXT | null | Task description |
kind | kind | ENUM | 'SQL' | Command type (SQL/PROGRAM/BUILTIN) |
command | command | TEXT | required | Command to execute |
parameters | via timetable.parameter | Array of any | null | Array of parameter values stored as individual JSONB rows with order_id |
run_as | run_as | TEXT | null | Role for SET ROLE |
connect_string | database_connection | TEXT | null | Connection string |
ignore_error | ignore_error | BOOLEAN | false | Continue on error |
autonomous | autonomous | BOOLEAN | false | Execute outside transaction |
timeout | timeout | INTEGER | 0 | Task timeout (ms) |
J.4.14.3. Task Ordering #
Tasks are ordered sequentially within a chain based on their array position. The system will automatically assign appropriate task_order values with spacing (e.g., 10, 20, 30) to allow future insertions.
J.4.14.4. Examples #
J.4.14.4.1. Simple SQL Job #
chains:
- name: "daily-report"
schedule: "0 9 * * *" # 9 AM daily
live: true
tasks:
- name: "generate-report"
command: "CALL generate_daily_report()"
J.4.14.4.2. Multi-task Chain #
chains:
- name: "etl-pipeline"
schedule: "0 2 * * *" # 2 AM daily
live: true
max_instances: 1
timeout: 3600000 # 1 hour
tasks:
- name: "extract-data"
command: "SELECT extract_sales_data($1)"
parameters: ["2023-01-01"]
- name: "transform-data"
command: "CALL transform_sales_data()"
autonomous: true
- name: "load-data"
command: "CALL load_to_warehouse()"
ignore_error: false
J.4.14.4.3. Program Task #
chains:
- name: "backup-job"
schedule: "0 3 * * 0" # Sunday 3 AM
live: true
tasks:
- name: "pg-dump"
kind: "PROGRAM"
command: "pg_dump"
parameters:
- ["-h", "localhost", "-U", "postgres", "-d", "mydb", "-f", "/backups/mydb.sql"]
J.4.14.5. Validation Rules #
Required Fields:
name,schedule,tasks, andcommandfor each taskUnique Names: Chain names must be unique across the database
Valid Cron: Schedule must be valid cron format (5 fields)
Valid Kind: Task kind must be one of: SQL, PROGRAM, BUILTIN
Parameter Types: Parameters can be any JSON-compatible type (strings, numbers, booleans, arrays, objects) and are stored as individual JSONB values
Timeout Values: Must be non-negative integers (milliseconds)