H.3. pg_timetable #

pg_timetable is an advanced job scheduler for PostgreSQL, offering many advantages over traditional schedulers such as cron. It is completely database-driven and provides several advanced concepts.

H.3.1. About pg_timetable #

Version: 5.9.0

GitHub

H.3.2. Main features #

  • Tasks can be arranged in chains

  • A chain can consist of built-in commands, SQL and executables

  • Parameters can be passed to chains

  • Missed tasks (possibly due to downtime) can be retried automatically

  • Support for configurable repetitions

  • Built-in tasks such as sending emails, etc.

  • Fully database-driven configuration

  • Full support for database-driven logging

  • Cron-style scheduling in the PostgreSQL server time zone

  • Optional concurrency protection

  • Tasks and chains can have execution timeout settings

H.3.3. Quick Start #

  1. Download the pg_timetable executable

  2. Make sure your PostgreSQL server is up and running and has a role with the CREATE privilege on the target database, e.g.

    my_database=> CREATE ROLE scheduler LOGIN PASSWORD 'somestrong';
    my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;
    
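  3. Create a new job, e.g. run VACUUM each night at 00:30 in the Postgres server time zone. The timetable schema is created by pg_timetable itself, so if it does not exist yet, run pg_timetable once with --init first:

    my_database=> SELECT timetable.add_job('nightly-vacuum', '30 0 * * *', 'VACUUM');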
    add_job
    ---------
          3
    (1 row)
    
  4. Run pg_timetable

    # pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer
    
  5. PROFIT!

H.3.4. Command line options #

# ./pg_timetable

Application Options:
  -c, --clientname=                                Unique name for application instance [$PGTT_CLIENTNAME]
      --config=                                    YAML configuration file
      --no-program-tasks                           Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
  -v, --version                                    Output detailed version information [$PGTT_VERSION]

Connection:
  -h, --host=                                      PostgreSQL host (default: localhost) [$PGTT_PGHOST]
  -p, --port=                                      PostgreSQL port (default: 5432) [$PGTT_PGPORT]
  -d, --dbname=                                    PostgreSQL database name (default: timetable) [$PGTT_PGDATABASE]
  -u, --user=                                      PostgreSQL user (default: scheduler) [$PGTT_PGUSER]
      --password=                                  PostgreSQL user password [$PGTT_PGPASSWORD]
      --sslmode=[disable|require]                  Connection SSL mode (default: disable) [$PGTT_PGSSLMODE]
      --pgurl=                                     PostgreSQL connection URL [$PGTT_URL]
      --timeout=                                   PostgreSQL connection timeout (default: 90) [$PGTT_TIMEOUT]

Logging:
      --log-level=[debug|info|error]               Verbosity level for stdout and log file (default: info)
      --log-database-level=[debug|info|error|none] Verbosity level for database storing (default: info)
      --log-file=                                  File name to store logs
      --log-file-format=[json|text]                Format of file logs (default: json)
      --log-file-rotate                            Rotate log files
      --log-file-size=                             Maximum size in MB of the log file before it gets rotated (default: 100)
      --log-file-age=                              Number of days to retain old log files, 0 means forever (default: 0)
      --log-file-number=                           Maximum number of old log files to retain, 0 to retain all (default: 0)

Start:
  -f, --file=                                      SQL script file to execute during startup
      --init                                       Initialize database schema to the latest version and exit. Can be used
                                                   with --upgrade
      --upgrade                                    Upgrade database to the latest version
      --debug                                      Run in debug mode. Only asynchronous chains will be executed

Resource:
      --cron-workers=                              Number of parallel workers for scheduled chains (default: 16)
      --interval-workers=                          Number of parallel workers for interval chains (default: 16)
      --chain-timeout=                             Abort any chain that takes more than the specified number of
                                                   milliseconds
      --task-timeout=                              Abort any task within a chain that takes more than the specified number
                                                   of milliseconds

REST:
      --rest-port=                                 REST API port (default: 0) [$PGTT_RESTPORT]

H.3.5. Contributing #

If you want to contribute to pg_timetable and help make it better, feel free to open an issue or even submit a pull request. You can also give the pg_timetable project a star and tell the world about it.

H.3.6. Support #

For professional support, please contact Cybertec.

H.3.7. Authors #

Implementation: Pavlo Golub

Initial idea and draft design: Hans-Jürgen Schönig

H.3.8. Project background #

The pg_timetable project got started back in 2019 for internal scheduling needs at Cybertec.

For more background on the project's motivations and design goals, see the original series of blog posts announcing the project and the subsequent feature updates.

Cybertec also provides commercial 9-to-5 and 24/7 support for pg_timetable.

H.3.8.1. Project feedback #

For feature requests or troubleshooting assistance, please open an issue on the project’s GitHub page.

H.3.9. Installation #

pg_timetable is compatible with the latest supported PostgreSQL versions: 11, 12, 13, 14 (stable); 15 (dev).

Note

If you want to use pg_timetable with older versions (9.5, 9.6 and 10), execute this SQL command before running pg_timetable:

-- On PostgreSQL 9.5, remove PARALLEL SAFE below; that option is only available since 9.6.
CREATE OR REPLACE FUNCTION starts_with(text, text)
RETURNS bool AS
$$
SELECT
    CASE WHEN length($2) > length($1) THEN
        FALSE
    ELSE
        left($1, length($2)) = $2
    END
$$
LANGUAGE SQL
IMMUTABLE STRICT PARALLEL SAFE
COST 5;

H.3.9.1. Official release packages #

You can find binary packages for your platform on the official Releases page. Currently, Windows, Linux and macOS packages are available.

H.3.9.2. Docker #

The official docker image can be found here: https://hub.docker.com/r/cybertecpostgresql/pg_timetable

Note

The latest tag is kept up to date with the master branch by a GitHub Actions workflow. In production, you probably want to use the latest stable tag.

Run pg_timetable in Docker:

docker run --rm \
cybertecpostgresql/pg_timetable:latest \
-h 10.0.0.3 -p 54321 -c worker001

Run pg_timetable in Docker with Environment variables:

docker run --rm \
-e PGTT_PGHOST=10.0.0.3 \
-e PGTT_PGPORT=54321 \
cybertecpostgresql/pg_timetable:latest \
-c worker001

H.3.9.3. Build from sources #

  1. Download and install Go on your system.

  2. Clone the pg_timetable repository:

    $ git clone https://github.com/cybertec-postgresql/pg_timetable.git
    $ cd pg_timetable
    
  3. Run pg_timetable:

    $ go run main.go --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
    
  4. Alternatively, build a binary and run it:

    $ go build
    $ ./pg_timetable --dbname=dbname --clientname=worker001 --user=scheduler --password=strongpwd
    
  5. (Optional) Run tests in all sub-folders of the project:

    $ psql --command="CREATE USER scheduler PASSWORD 'somestrong'"
    $ createdb --owner=scheduler timetable
    $ go test -failfast -timeout=300s -count=1 -p 1 ./...
    

H.3.10. Components #

Scheduling in pg_timetable is organized into three abstraction levels, which makes it easy to reuse commands with other parameters or additional schedules.

Command:

The base level, command, defines what to do.

Task:

The second level, task, represents a chain element (step) that runs one of the commands. Tasks define the order of commands, the arguments passed (if any), and how errors are handled.

Chain:

The third level, chain, represents connected tasks forming a chain. A chain defines if, when, and how often a job should be executed.

H.3.10.1. Command #

Currently, there are three different kinds of commands:

SQL

An SQL snippet, e.g. starting a cleanup, refreshing a materialized view or processing data.

PROGRAM

External command. Anything that can be called as an external binary, including shells, e.g. bash, pwsh, etc. The external command is called using Go's exec.CommandContext.

BUILTIN

Internal command. Prebuilt functionality included in pg_timetable. These include:

  • NoOp,

  • Sleep,

  • Log,

  • SendMail,

  • Download,

  • CopyFromFile,

  • CopyToFile,

  • Shutdown.
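A PROGRAM task receives its arguments as a JSON array of strings (see the parameter value format described in this chapter). The following Go sketch shows how such a command might be prepared with exec.CommandContext, using context.WithTimeout to approximate a per-task timeout in milliseconds. It is a simplified illustration under these assumptions, not pg_timetable's actual code; buildProgramCmd is a hypothetical name.

```go
package main

import (
	"context"
	"encoding/json"
	"os/exec"
	"time"
)

// buildProgramCmd decodes a PROGRAM task parameter (a JSON array of strings)
// and prepares the external command without starting it. A positive
// timeoutMs bounds the run time: exec.CommandContext kills the process once
// the context expires. The caller must call cancel after the command ends.
func buildProgramCmd(command string, param []byte, timeoutMs int) (*exec.Cmd, context.CancelFunc, error) {
	var args []string
	if len(param) > 0 {
		if err := json.Unmarshal(param, &args); err != nil {
			return nil, nil, err
		}
	}
	var ctx context.Context
	var cancel context.CancelFunc
	if timeoutMs > 0 {
		ctx, cancel = context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	} else {
		ctx, cancel = context.WithCancel(context.Background())
	}
	return exec.CommandContext(ctx, command, args...), cancel, nil
}
```

The returned command has not been started; you would run it with, e.g., cmd.CombinedOutput() and call cancel() afterwards.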

H.3.10.2. Task #

The next building block is a task, which simply represents a step in a list of chain commands. An example of tasks combined in a chain would be:

  1. Download files from a server

  2. Import files

  3. Run aggregations

  4. Build report

  5. Remove the files from disk

Note

All tasks of a chain in pg_timetable are executed within one transaction (except tasks marked as autonomous). Note, however, that PROGRAM and BUILTIN tasks cannot be rolled back.

H.3.10.2.1. Table timetable.task #
chain_id bigint

Link to the chain. If NULL, the task is considered to be disabled.

task_order DOUBLE PRECISION

Indicates the order of the task within a chain.

kind timetable.command_kind

The type of the command. Can be SQL (default), PROGRAM or BUILTIN.

command text

Contains either an SQL command, a path to an application, or the name of the BUILTIN command to be executed.

run_as text

The role as which the task should be executed.

database_connection text

The connection string for the external database that should be used.

ignore_error boolean

Specifies whether the chain should proceed to the next task if this task fails (default: false).

autonomous boolean

Specifies whether the task should be executed outside of the chain transaction. Useful for VACUUM, CREATE DATABASE, CALL, etc.

timeout integer

Abort any task within a chain that takes more than the specified number of milliseconds.

Warning

If the task has been configured with ignore_error set to true (the default value is false), the worker process will report a success on execution even if the task within the chain fails.
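To make the ordering and error rules concrete, here is a simplified Go model of a chain run: tasks execute in ascending task_order, a failure stops the chain unless the failing task has ignore_error set, and an ignored failure still lets the chain report success. The Task type and runChain function are hypothetical; transactions, autonomous tasks and timeouts are deliberately left out.

```go
package main

import (
	"errors"
	"sort"
)

// Task mirrors a few columns of timetable.task, for illustration only.
type Task struct {
	Order       float64 // task_order (DOUBLE PRECISION)
	IgnoreError bool    // ignore_error
	Run         func() error
}

// runChain executes tasks in ascending task_order. It returns the orders of
// the tasks that were started and the error that aborted the chain, if any.
// Errors from tasks with IgnoreError set are swallowed.
func runChain(tasks []Task) ([]float64, error) {
	sorted := append([]Task(nil), tasks...)
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].Order < sorted[j].Order })
	var started []float64
	for _, t := range sorted {
		started = append(started, t.Order)
		if err := t.Run(); err != nil && !t.IgnoreError {
			return started, err
		}
	}
	return started, nil
}

// errTask is a sample error for trying the model out.
var errTask = errors.New("task failed")
```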

As mentioned above, commands are simple skeletons (e.g. send email, vacuum, etc.). In most cases, they have to be brought to life by passing input parameters to the execution.

H.3.10.2.2. Table timetable.parameter #
task_id bigint

The ID of the task.

order_id integer

The order of the parameter. Several parameters are processed one by one according to the order.

value jsonb

A JSON value containing the parameters.

H.3.10.2.3. Parameter value format #

Depending on the command kind, the argument can be represented by different JSON values:

SQL (schema: array)

    '[ "one", 2, 3.14, false ]'::jsonb

PROGRAM (schema: array of strings)

    '["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb

BUILTIN: Sleep (schema: integer)

    '5'::jsonb

BUILTIN: Log (schema: any)

    '"WARNING"'::jsonb
    '{"Status": "WARNING"}'::jsonb

BUILTIN: SendMail (schema: object)

    '{
        "username":     "user@example.com",
        "password":     "password",
        "serverhost":   "smtp.example.com",
        "serverport":   587,
        "senderaddr":   "user@example.com",
        "ccaddr":       ["recipient_cc@example.com"],
        "bccaddr":      ["recipient_bcc@example.com"],
        "toaddr":       ["recipient@example.com"],
        "subject":      "pg_timetable - No Reply",
        "attachment":   ["/temp/attachments/Report.pdf","config.yaml"],
        "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
        "msgbody":      "<h2>Hello User,</h2> <p>check some attachments!</p>",
        "contenttype":   "text/html; charset=UTF-8"
    }'::jsonb

BUILTIN: Download (schema: object)

    '{
        "workersnum": 2,
        "fileurls": ["http://example.com/foo.gz", "https://example.com/bar.csv"],
        "destpath": "."
    }'::jsonb

BUILTIN: CopyFromFile (schema: object)

    '{
        "sql": "COPY location FROM STDIN",
        "filename": "download/orte_ansi.txt"
    }'::jsonb

BUILTIN: CopyToFile (schema: object)

    '{
        "sql": "COPY location TO STDOUT",
        "filename": "download/location.txt"
    }'::jsonb

BUILTIN: Shutdown

    The value is ignored.

BUILTIN: NoOp

    The value is ignored.

H.3.10.3. Chain #

Once tasks have been arranged, they have to be scheduled as a chain. For this, pg_timetable builds upon an enhanced cron string and adds multiple configuration options.

H.3.10.3.1. Table timetable.chain #
chain_name text

The unique name of the chain.

run_at timetable.cron

Standard cron-style value in the Postgres server time zone, or an @after, @every or @reboot clause.

max_instances integer

The number of instances that this chain may have running at the same time.

timeout integer

Abort any chain that takes more than the specified number of milliseconds.

live boolean

Controls whether the chain may be executed once it reaches its schedule.

self_destruct boolean

Self-destruct the chain after successful execution. Failed chains will be executed again according to the schedule.

exclusive_execution boolean

Specifies whether the chain should be executed exclusively while all other chains are paused.

client_name text

Specifies which client should execute the chain. Set this to NULL to allow any client.

on_error text

Holds SQL to execute if an error occurs. If the task that produced the error is marked with ignore_error, nothing is done.
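max_instances limits how many runs of the same chain may overlap. pg_timetable enforces this itself; purely as an illustration of the idea, the Go sketch below uses a buffered channel as a counting semaphore. The instanceLimiter type is hypothetical, and refusing (rather than queueing) a run when no slot is free is an assumption of this sketch.

```go
package main

// instanceLimiter allows at most a fixed number of concurrent runs of one chain.
type instanceLimiter struct {
	slots chan struct{}
}

// newInstanceLimiter creates a limiter with limit slots (limit must be > 0).
func newInstanceLimiter(limit int) *instanceLimiter {
	return &instanceLimiter{slots: make(chan struct{}, limit)}
}

// tryStart reports whether a new instance may start; it never blocks.
func (l *instanceLimiter) tryStart() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// finish releases the slot taken by a successful tryStart.
func (l *instanceLimiter) finish() {
	<-l.slots
}
```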

Note

All chains in pg_timetable are scheduled at the PostgreSQL server time zone. You can change the timezone for the current session when adding new chains, e.g.

SET TIME ZONE 'UTC';

-- Run VACUUM at 00:05 every day in August UTC
SELECT timetable.add_job('vacuum-august-utc', '5 0 * 8 *', 'VACUUM');

H.3.11. Getting started #

A variety of examples can be found in the Samples section. If you want to migrate from a different scheduler, you can use the scripts from the Migration from other schedulers section.

H.3.11.1. Add simple job #

In the real world, simple jobs are usually enough. By this term we mean that:

  • the job is a chain with only one task (step) in it;

  • it doesn’t use complicated logic, but rather a simple command;

  • it doesn’t require complex transaction handling, since one task is implicitly executed as a single transaction.

For such a group of chains we’ve introduced a special function timetable.add_job().

timetable.add_job(job_name, job_schedule, job_command, ...) RETURNS BIGINT

Creates a simple one-task chain

Parameters:
  • job_name (text) – The unique name of the chain and command.

  • job_schedule (timetable.cron) – Time schedule in cron syntax (in the Postgres server time zone), or an @every, @after or @reboot clause

  • job_command (text) – The command to execute: SQL text, a program, or the name of a BUILTIN command, depending on job_kind.

  • job_parameters (jsonb) – Arguments for the chain command. Default: NULL.

  • job_kind (timetable.command_kind) – Kind of the command: SQL, PROGRAM or BUILTIN. Default: SQL.

  • job_client_name (text) – Specifies which client should execute the chain. Set this to NULL to allow any client. Default: NULL.

  • job_max_instances (integer) – The number of instances that this chain may have running at the same time. Default: NULL.

  • job_live (boolean) – Controls whether the chain may be executed once it reaches its schedule. Default: TRUE.

  • job_self_destruct (boolean) – Self-destruct the chain after execution. Default: FALSE.

  • job_ignore_errors (boolean) – Ignore errors during execution. Default: TRUE.

  • job_exclusive (boolean) – Execute the chain in the exclusive mode. Default: FALSE.

Returns:

the ID of the created chain

Return type:

bigint

H.3.11.2. Examples #

  1. Run public.my_func() at 00:05 every day in August (Postgres server time zone):

    SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');
    
  2. Run VACUUM at minute 23 past every 2nd hour from 0 through 20 every day (Postgres server time zone):

    SELECT timetable.add_job('run-vacuum', '23 0-20/2 * * *', 'VACUUM');
    
  3. Refresh materialized view every 2 hours:

    SELECT timetable.add_job('refresh-matview', '@every 2 hours', 'REFRESH MATERIALIZED VIEW public.mat_view');
    
  4. Clear log table after pg_timetable restart:

    SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE timetable.log');
    
  5. Reindex at midnight (Postgres server time zone) on Sundays with the reindexdb utility:

    • using the default database under the default user (no command-line arguments)

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');
      
    • specifying the target database and table, and being verbose

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'reindexdb',
          '["--table=foo", "--dbname=postgres", "--verbose"]'::jsonb, 'PROGRAM');
      
    • passing password using environment variable through bash shell

      SELECT timetable.add_job('reindex', '0 0 * * 7', 'bash',
          '["-c", "PGPASSWORD=5m3R7K4754p4m reindexdb -U postgres -h 192.168.0.221 -v"]'::jsonb,
          'PROGRAM');
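The cron expressions above use five fields (minute, hour, day of month, month, day of week). To see how a field such as 0-20/2 in the second example expands to hours 0, 2, ..., 20, here is a Go sketch of a single-field expander following common cron semantics. It is not pg_timetable's parser and ignores the @every, @after and @reboot forms; expandField is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// expandField returns the sorted values matched by one cron field within
// [lo, hi]. Supported forms: "*", "n", "a-b", each optionally followed by
// "/step", and comma-separated lists of these. As in many cron
// implementations, "n/step" runs from n up to hi.
func expandField(field string, lo, hi int) ([]int, error) {
	seen := make(map[int]bool)
	var out []int
	for _, part := range strings.Split(field, ",") {
		expr, step, hasStep := part, 1, false
		if base, s, ok := strings.Cut(part, "/"); ok {
			n, err := strconv.Atoi(s)
			if err != nil || n <= 0 {
				return nil, fmt.Errorf("bad step in %q", part)
			}
			expr, step, hasStep = base, n, true
		}
		start, end := lo, hi
		if expr != "*" {
			a, b, isRange := strings.Cut(expr, "-")
			var err error
			if start, err = strconv.Atoi(a); err != nil {
				return nil, fmt.Errorf("bad value in %q", part)
			}
			switch {
			case isRange:
				if end, err = strconv.Atoi(b); err != nil {
					return nil, fmt.Errorf("bad range in %q", part)
				}
			case !hasStep:
				end = start
			}
		}
		if start < lo || end > hi || start > end {
			return nil, fmt.Errorf("%q is outside [%d, %d]", part, lo, hi)
		}
		for v := start; v <= end; v += step {
			if !seen[v] {
				seen[v] = true
				out = append(out, v)
			}
		}
	}
	sort.Ints(out)
	return out, nil
}
```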
      

H.3.12. Samples #

H.3.12.1. Basic #

This sample demonstrates how to create a basic one-step chain with parameters. It uses the timetable.add_job() function with named arguments.

SELECT timetable.add_job(
    job_name            => 'notify every minute',
    job_schedule        => '* * * * *',
    job_command         => 'SELECT pg_notify($1, $2)',
    job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
    job_kind            => 'SQL'::timetable.command_kind,
    job_client_name     => NULL,
    job_max_instances   => 1,
    job_live            => TRUE,
    job_self_destruct   => FALSE,
    job_ignore_errors   => TRUE
) as chain_id;

H.3.12.2. Send email #

This sample demonstrates how to create an advanced email job. It checks whether there are emails to send, sends them, and logs the status of the command execution. You don’t need to set up anything beforehand; every parameter can be specified during chain creation.

DO $$
    -- An example for using the SendMail task.
DECLARE
    v_mail_task_id bigint;
    v_log_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Get the chain id
    INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Add SendMail task
    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
    RETURNING task_id INTO v_mail_task_id;

    -- Create the parameters for the SendMail task
        -- "username":        The username used for authenticating on the mail server
        -- "password":        The password used for authenticating on the mail server
        -- "serverhost":      The IP address or hostname of the mail server
        -- "serverport":      The port of the mail server
        -- "senderaddr":      The email address that will appear as the sender
        -- "ccaddr":          String array of the recipients' (Cc) email addresses
        -- "bccaddr":         String array of the recipients' (Bcc) email addresses
        -- "toaddr":          String array of the recipients' (To) email addresses
        -- "subject":         Subject of the email
        -- "attachment":      String array of the attachments (local files)
        -- "attachmentdata":  Pairs of name and base64-encoded content
        -- "msgbody":         The body of the email
        -- "contenttype":     The content type of the body, e.g. text/html; charset=UTF-8

    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_mail_task_id, 1, '{
                "username":     "user@example.com",
                "password":     "password",
                "serverhost":   "smtp.example.com",
                "serverport":   587,
                "senderaddr":   "user@example.com",
                "ccaddr":       ["recipient_cc@example.com"],
                "bccaddr":      ["recipient_bcc@example.com"],
                "toaddr":       ["recipient@example.com"],
                "subject":      "pg_timetable - No Reply",
                "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
                "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
                "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
                "contenttype":  "text/html; charset=UTF-8"
                }'::jsonb);
    
    -- Add Log task and make it the last task using `task_order` column (=30)
    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
    RETURNING task_id INTO v_log_task_id;

    -- Add a housekeeping task that will delete the sent mail parameters and update the parameter of the Log task created above
    -- Since we're using the special add_task() function, we don't need to specify the `chain_id`.
    -- The function takes the same `chain_id` from the parent task, SendMail in this particular case
    PERFORM timetable.add_task(
        kind => 'SQL', 
        parent_id => v_mail_task_id,
        command => format(
$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
INSERT INTO timetable.parameter (task_id, order_id, value) 
SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
FROM sent_mail
ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$, 
                v_mail_task_id, v_log_task_id
            ),
        order_delta => 10
    );

-- In the end we should have something like this. Note that even though the Log task was created earlier, it will be executed later
-- due to the `task_order` column.

-- timetable=> SELECT task_id, chain_id, task_order, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
--  task_id | chain_id | task_order |  kind   |                             left
-- ---------+----------+------------+---------+---------------------------------------------------------------
--       45 |       24 |         10 | BUILTIN | SendMail
--       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
--       46 |       24 |         30 | BUILTIN | Log
-- (3 rows)

END;
$$
LANGUAGE PLPGSQL;

H.3.12.3. Download, Transform and Import #

This sample demonstrates how to create an enhanced four-step chain with parameters. It uses a DO statement to directly update the timetable schema tables.

-- Prepare the destination table 'city'
CREATE TABLE IF NOT EXISTS city(
    city text,
    lat numeric,
    lng numeric,
    country text,
    iso2 text,
    admin_name text,
    capital text,
    population bigint,
    population_proper bigint);

-- An enhanced example consisting of four tasks:
-- 1. Download a text file from the internet using a BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using a PROGRAM command (can be done with the `unaccent` PostgreSQL extension)
-- 3. Import the text file as a CSV file using a BUILT-IN command (can be done with `psql -c \copy`)
-- 4. Remove the downloaded .csv files using a PROGRAM command
DO $$
DECLARE
    v_head_id bigint;
    v_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live)
    VALUES ('Download locations and aggregate', TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Download file from the server
    -- Create the task
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the step 1:
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, 
           '{
                "workersnum": 1,
                "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"], 
                "destpath": "."
            }'::jsonb);
    
    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;

    -- Step 2. Transform Unicode characters into ASCII
    -- Create the program task to call 'uconv' and name it 'unaccent'
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the 'unaccent' task. Input and output files in this case
    -- Under Windows we should call PowerShell instead of "uconv" with command:
    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);

    RAISE NOTICE 'Step 2 completed. Unaccent task added with ID: %', v_task_id;

    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
        VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
    RETURNING task_id INTO v_task_id;

    -- Add the parameters for the import task. Execute client-side COPY to 'city' from 'mt_ansi.csv'
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);

    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;

    -- Step 4. Remove the downloaded and converted .csv files using bash
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
    RETURNING task_id INTO v_task_id;

    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
END;
$$ LANGUAGE PLPGSQL;

H.3.12.4. Run tasks in autonomous transaction #

This sample demonstrates how to run special tasks out of chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.

-- An advanced example showing how to use autonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since procedure will make two commits (after f1() and f2())
-- we cannot use it as a regular task, because all regular tasks 
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands, 
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN 
    RAISE notice '%', msg; 
END;
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
    PERFORM f('hey 1');
    COMMIT;
    PERFORM f('hey 2');
    COMMIT;
END;
$$
LANGUAGE PLPGSQL;

WITH
    cte_chain (v_chain_id) AS (
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct) 
        VALUES (
            'call proc() every 10 sec', -- chain_name, 
            '@every 10 seconds',        -- run_at,
            1,     -- max_instances, 
            TRUE,  -- live, 
            FALSE -- self_destruct
        ) RETURNING chain_id
    ),
    cte_task(v_task_id) AS (
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
        SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;

H.3.12.5. Shutdown the scheduler and terminate the session #

This sample demonstrates how to shut down the scheduler using a special built-in task. This can be used to control maintenance windows, to restart the scheduler for update purposes, or to stop the session before the database is dropped.

-- This one-task chain (aka job) will terminate the pg_timetable session.
-- This is useful for maintenance purposes or before the database is dropped.
-- One should take care of restarting pg_timetable if needed.

SELECT timetable.add_job (
    job_name     => 'Shutdown pg_timetable session on schedule',
    job_schedule => '* * 1 * *',
    job_command  => 'Shutdown',
    job_kind     => 'BUILTIN'
);

H.3.12.6. Access previous task result code and output from the next task #

This sample demonstrates how to check the result code and output of a previous task. A chain can continue past a failed task only if ignore_error = true is set for that task; otherwise, the scheduler stops the chain. This sample shows how to count the failed, successful, and total tasks executed. Based on these values, we can calculate the success ratio.

WITH 
    cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live) 
        VALUES ('many tasks', '* * * * *', 1, true)
        RETURNING chain_id
    ),
    cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
        SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
        FROM cte_chain, generate_series(1, 500) AS g(s)
        RETURNING task_id
    ),
    report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
        INSERT INTO timetable.task (chain_id, task_order, kind, command)
        SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
    s TEXT;
BEGIN
    WITH report AS (
        SELECT 
        count(*) FILTER (WHERE returncode = 0) AS success,
        count(*) FILTER (WHERE returncode != 0) AS fail,
        count(*) AS total
        FROM timetable.execution_log 
        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
          AND txid = txid_current()
    )
    SELECT 'Tasks executed:' || total || 
         '; succeeded: ' || success || 
         '; failed: ' || fail || 
         '; ratio: ' || 100.0*success/GREATEST(total,1)
    INTO s
    FROM report;
    RAISE NOTICE '%', s;
END;
$$
$CMD$
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id FROM cte_chain;
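The report task's arithmetic can be checked outside the database. This Go sketch (the function name chainStats is hypothetical) treats return code 0 as success and divides by max(total, 1), mirroring GREATEST(total, 1) in the SQL above.

```go
package main

// chainStats mirrors the report query: a task with return code 0 succeeded,
// anything else failed, and ratio is the success percentage. Dividing by
// max(total, 1) avoids a division by zero for an empty log.
func chainStats(returnCodes []int) (success, fail, total int, ratio float64) {
	for _, rc := range returnCodes {
		if rc == 0 {
			success++
		} else {
			fail++
		}
	}
	total = len(returnCodes)
	denom := total
	if denom < 1 {
		denom = 1
	}
	ratio = 100.0 * float64(success) / float64(denom)
	return success, fail, total, ratio
}
```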

H.3.13. Migration from other schedulers #

H.3.13.1. Migrate jobs from pg_cron to pg_timetable #

If you want to quickly export jobs scheduled in pg_cron to pg_timetable, you can use this SQL snippet:

SELECT timetable.add_job(
    job_name            => COALESCE(jobname, 'job: ' || command),
    job_schedule        => schedule,
    job_command         => command,
    job_kind            => 'SQL',
    job_live            => active
) FROM cron.job;

The timetable.add_job() function, however, has some limitations. First of all, it marks the created task as autonomous, meaning the scheduler executes the task outside the chain transaction. This is not an error, but many autonomous chains may cause extra connections to be used.

Secondly, the database connection parameters of the source pg_cron jobs are lost, making all jobs local. To export all available information as precisely as possible, run this SQL snippet under the role the jobs were scheduled with in pg_cron:

SET ROLE 'scheduler'; -- set the role used by pg_cron

WITH cron_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS cron_id,
        jobname,
        schedule,
        active,
        command,
        CASE WHEN 
            database != current_database()
            OR nodename != 'localhost'
            OR username != CURRENT_USER
            OR nodeport != inet_server_port() 
        THEN
            format('host=%s port=%s dbname=%s user=%s', nodename, nodeport, database, username)
        END AS connstr
    FROM
        cron.job
),
cte_chain AS (
 INSERT INTO timetable.chain (chain_id, chain_name, run_at, live)
     SELECT 
         cron_id, COALESCE(jobname, 'cronjob' || cron_id), schedule, active
     FROM
         cron_chain
),
cte_tasks AS (
 INSERT INTO timetable.task (chain_id, task_order, kind, command, database_connection)
     SELECT
         cron_id, 1, 'SQL', command, connstr
     FROM
         cron_chain
     RETURNING
         chain_id, task_id
)
SELECT * FROM cte_tasks;
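The CASE expression in the snippet above decides whether a migrated task needs its own connection string. The following Python sketch mirrors that decision for a single cron.job row. cron_job_connstr is a hypothetical helper written only for illustration, and the current database, user, and server port are passed in explicitly instead of being read from the server.

```python
from typing import Optional


def cron_job_connstr(nodename: str, nodeport: int, database: str, username: str,
                     current_database: str, current_user: str,
                     server_port: Optional[int]) -> Optional[str]:
    """Hypothetical mirror of the CASE expression: return a libpq-style
    connection string for a pg_cron job that targets another database,
    host, user, or port; return None for a local job."""
    # inet_server_port() is NULL on Unix-socket connections; in SQL the
    # comparison then yields NULL and does not by itself force a remote task.
    port_differs = server_port is not None and nodeport != server_port
    is_remote = (
        database != current_database
        or nodename != "localhost"
        or username != current_user
        or port_differs
    )
    if not is_remote:
        return None
    return f"host={nodename} port={nodeport} dbname={database} user={username}"


# A job scheduled for another host keeps its connection parameters.
print(cron_job_connstr("db2.example", 5432, "app", "scheduler", "app", "scheduler", 5432))
```

Returning None corresponds to a NULL database_connection, so the task runs on the scheduler's own connection.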

H.3.13.2. Migrate jobs from pgAgent to pg_timetable #

To migrate jobs from pgAgent, use this script. pgAgent does not have the concept of a PROGRAM task, so to emulate BATCH steps, pg_timetable executes them inside a shell. You can change the shell by editing the cte_shell CTE clause.
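In the script that follows, each BATCH step becomes a PROGRAM task whose command is the shell from cte_shell and whose single parameter is the JSON array [cmd_param, step code]. Assuming pg_timetable passes the elements of that array to the program as command-line arguments, the resulting invocation is roughly the one in this Python sketch. run_batch_step is a hypothetical helper for trying a step locally, not part of pg_timetable.

```python
import json
import subprocess


def run_batch_step(shell: str, cmd_param: str, step_code: str) -> subprocess.CompletedProcess:
    """Hypothetical helper: run a migrated pgAgent batch step the way the
    PROGRAM task describes it (command = shell, arguments = parameter array)."""
    # The migration stores the arguments as a JSON array, e.g. ["-c", "<code>"].
    parameter = json.dumps([cmd_param, step_code])
    argv = [shell, *json.loads(parameter)]
    return subprocess.run(argv, capture_output=True, text=True, check=False)


result = run_batch_step("sh", "-c", "echo migrated step")
print(result.returncode, result.stdout.strip())  # 0 migrated step
```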

CREATE OR REPLACE FUNCTION bool_array_to_cron(bool[], start_with int4 DEFAULT 0) RETURNS TEXT AS
$$
WITH u AS (
 SELECT unnest($1) e, generate_series($2, array_length($1, 1)-1+$2) AS i 
)
SELECT COALESCE(string_agg(i::text, ','), '*') FROM u WHERE e
$$
LANGUAGE sql;


WITH
cte_shell(shell, cmd_param) AS (
 VALUES ('sh', '-c') -- set the shell you want to use for batch steps, e.g. "pwsh -c", "cmd /C"
),
pga_schedule AS (
 SELECT
     s.jscjobid,
     s.jscname,
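     format('%s %s %s %s %s', 
         bool_array_to_cron(s.jscminutes),               -- minutes 0-59
         bool_array_to_cron(s.jschours),                 -- hours 0-23
         bool_array_to_cron(s.jscmonthdays[1:31], 1),    -- days 1-31; the 32nd "last day" flag has no cron equivalent
         bool_array_to_cron(s.jscmonths, 1),             -- months 1-12
         bool_array_to_cron(s.jscweekdays)) AS schedule  -- weekdays 0-6, Sunday = 0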
 FROM 
     pgagent.pga_schedule s  
 WHERE s.jscenabled 
         AND now() < COALESCE(s.jscend, 'infinity'::timestamptz)
         AND now() > s.jscstart
),
pga_chain AS (
    SELECT
        nextval('timetable.chain_chain_id_seq'::regclass) AS chain_id,
        jobid,
        format('%s @ %s', jobname, jscname) AS jobname,
        jobhostagent,
        jobenabled,
        schedule
    FROM
        pgagent.pga_job JOIN pga_schedule ON jobid = jscjobid
),
cte_chain AS (
 INSERT INTO timetable.chain (chain_id, chain_name, client_name, run_at, live)
     SELECT 
         chain_id, jobname, jobhostagent, schedule, jobenabled
     FROM
         pga_chain
),
pga_step AS (
 SELECT 
     c.chain_id,
     nextval('timetable.task_task_id_seq'::regclass) AS task_id,
     rank() OVER (ORDER BY jstname) AS jstorder,
     jstid,
     jstname,
     jstenabled,
     CASE jstkind WHEN 'b' THEN 'PROGRAM' ELSE 'SQL' END AS jstkind,
     jstcode,
     COALESCE(
         NULLIF(jstconnstr, ''), 
         CASE 
             WHEN jstdbname = current_database() THEN NULL
             WHEN jstdbname > '' THEN 'dbname=' || jstdbname 
         END
     ) AS jstconnstr,
     jstonerror != 'f' AS jstignoreerror
 FROM
     pga_chain c JOIN pgagent.pga_jobstep js ON c.jobid = js.jstjobid
),
cte_tasks AS (
 INSERT INTO timetable.task(task_id, chain_id, task_name, task_order, kind, command, database_connection, ignore_error)
     SELECT
         task_id, chain_id, jstname, jstorder, jstkind::timetable.command_kind, 
         CASE jstkind WHEN 'SQL' THEN jstcode ELSE sh.shell END,
         jstconnstr,
         jstignoreerror -- TRUE unless the pgAgent on-error action is 'f' (fail)
     FROM
         pga_step, cte_shell sh
),
cte_parameters AS (
 INSERT INTO timetable.parameter (task_id, order_id, value)
     SELECT 
         task_id, 1, jsonb_build_array(sh.cmd_param, s.jstcode)
     FROM
         pga_step s, cte_shell sh
     WHERE 
         s.jstkind = 'PROGRAM'
)
SELECT * FROM pga_chain;
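The bool_array_to_cron() helper turns each pgAgent flag array into one cron field: the position of every TRUE element, shifted by start_with, becomes a listed value, and an array with no TRUE element becomes *. A Python sketch of the same mapping, written only to illustrate the SQL function:

```python
from typing import Sequence


def bool_array_to_cron(flags: Sequence[bool], start_with: int = 0) -> str:
    """Mirror of the SQL helper: the flag at 0-based position i maps to the
    cron value i + start_with; if no flag is set, the field becomes '*'."""
    values = [str(i + start_with) for i, flag in enumerate(flags) if flag]
    return ",".join(values) if values else "*"


minutes = [m in (0, 30) for m in range(60)]      # run at :00 and :30
hours = [False] * 24                             # no hour selected
months = [mon == 1 for mon in range(1, 13)]      # January only

print(bool_array_to_cron(minutes))               # 0,30
print(bool_array_to_cron(hours))                 # *
print(bool_array_to_cron(months, start_with=1))  # 1
```

The start_with offset exists because pgAgent arrays are indexed from 1, while cron fields such as minutes and hours start at 0 and months start at 1.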

H.3.14. REST API #

pg_timetable has a rich REST API that external tools can use to perform start, stop, reinitialize, restart, and reload operations, to run HTTP health checks, and for monitoring.

Below you will find the list of pg_timetable REST API endpoints.

H.3.14.1. Health check endpoints #

GET /liveness

Always returns HTTP status code 200, indicating that pg_timetable is running.

GET /readiness

Returns HTTP status code 200 when pg_timetable is running and the scheduler is in the main loop processing chains. While the scheduler is connecting to the database, creating the database schema, or upgrading it, the endpoint returns HTTP status code 503.
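A deployment script can wait for the scheduler to leave the connect and schema-upgrade phase by polling GET /readiness. This Python sketch uses only the standard library. It assumes the REST API is enabled and reachable at base_url (for example http://localhost:8008, a hypothetical address; use the port you configured), and it treats a 503 or a refused connection as "not ready yet". wait_until_ready is a hypothetical helper name.

```python
import time
import urllib.request


def wait_until_ready(base_url: str, timeout_s: float = 30.0, interval_s: float = 1.0) -> bool:
    """Poll GET /readiness until it answers 200 or timeout_s expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(f"{base_url}/readiness", timeout=interval_s) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # HTTPError (e.g. 503 during schema creation or upgrade) and
            # URLError (connection refused) both derive from OSError.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

For example, wait_until_ready("http://localhost:8008", timeout_s=60) blocks for up to a minute. GET /liveness could be polled the same way, but it answers 200 as soon as the process runs, so it does not tell you that chains are being processed.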

H.3.14.2. Chain management endpoints #

GET /startchain?id=<chain-id>

Returns HTTP status code 200 if the chain with the given id can be added to the worker queue. This does not, however, mean that chain execution starts immediately: the worker performs load and other checks before starting the chain. In the case of an error, HTTP status code 400 is returned, followed by an error message.

GET /stopchain?id=<chain-id>

Returns HTTP status code 200 if the chain with the given id is currently running and can be stopped. If the chain is running, the cancel signal is sent immediately. In the case of an error, HTTP status code 400 is returned, followed by an error message.
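Both chain management endpoints take the chain id as a query parameter and report failure through HTTP 400 with a message body. A minimal Python client sketch, assuming the REST API is reachable at base_url; chain_request is a hypothetical helper name, and a True result from /startchain only means the chain was queued, as described above.

```python
import urllib.error
import urllib.parse
import urllib.request
from typing import Tuple


def chain_request(base_url: str, action: str, chain_id: int) -> Tuple[bool, str]:
    """Send GET /startchain?id=<chain-id> or /stopchain?id=<chain-id>.

    Returns (True, body) on HTTP 200 and (False, error message) on an HTTP
    error such as 400.
    """
    if action not in ("startchain", "stopchain"):
        raise ValueError(f"unsupported action: {action!r}")
    url = f"{base_url}/{action}?{urllib.parse.urlencode({'id': chain_id})}"
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return True, resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as err:
        # The response body explains why the chain could not be queued or stopped.
        return False, err.read().decode("utf-8", errors="replace")
```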

H.3.15. Database Schema #

pg_timetable is a database-driven application. On first start, the necessary schema is created if it is absent.

H.3.15.1. Main tables and objects #

CREATE TABLE timetable.chain (
    chain_id            BIGSERIAL   PRIMARY KEY,
    chain_name          TEXT        NOT NULL UNIQUE,
    run_at              timetable.cron,
    max_instances       INTEGER,
    timeout             INTEGER     DEFAULT 0,
    live                BOOLEAN     DEFAULT FALSE,
    self_destruct       BOOLEAN     DEFAULT FALSE,
    exclusive_execution BOOLEAN     DEFAULT FALSE,
    client_name         TEXT,
    on_error            TEXT
);

COMMENT ON TABLE timetable.chain IS
    'Stores information about chains schedule';
COMMENT ON COLUMN timetable.chain.run_at IS
    'Extended CRON-style time notation the chain has to be run at';
COMMENT ON COLUMN timetable.chain.max_instances IS
    'Number of instances (clients) this chain can run in parallel';
COMMENT ON COLUMN timetable.chain.timeout IS
    'Abort any chain that takes more than the specified number of milliseconds';
COMMENT ON COLUMN timetable.chain.live IS
    'Indication that the chain is ready to run, set to FALSE to pause execution';
COMMENT ON COLUMN timetable.chain.self_destruct IS
    'Indication that this chain will delete itself after successful run';
COMMENT ON COLUMN timetable.chain.exclusive_execution IS
    'All parallel chains should be paused while executing this chain';
COMMENT ON COLUMN timetable.chain.client_name IS
    'Only client with this name is allowed to run this chain, set to NULL to allow any client';    

CREATE TYPE timetable.command_kind AS ENUM ('SQL', 'PROGRAM', 'BUILTIN');

CREATE TABLE timetable.task (
    task_id             BIGSERIAL               PRIMARY KEY,
    chain_id            BIGINT                  REFERENCES timetable.chain(chain_id) ON UPDATE CASCADE ON DELETE CASCADE,
    task_order          DOUBLE PRECISION        NOT NULL,
    task_name           TEXT,
    kind                timetable.command_kind  NOT NULL DEFAULT 'SQL',
    command             TEXT                    NOT NULL,
    run_as              TEXT,
    database_connection TEXT,
    ignore_error        BOOLEAN                 NOT NULL DEFAULT FALSE,
    autonomous          BOOLEAN                 NOT NULL DEFAULT FALSE,
    timeout             INTEGER                 DEFAULT 0
);          

COMMENT ON TABLE timetable.task IS
    'Holds information about chain elements aka tasks';
COMMENT ON COLUMN timetable.task.chain_id IS
    'Link to the chain, if NULL task considered to be disabled';
COMMENT ON COLUMN timetable.task.task_order IS
    'Indicates the order of task within a chain';    
COMMENT ON COLUMN timetable.task.run_as IS
    'Role name to run task as. Uses SET ROLE for SQL commands';
COMMENT ON COLUMN timetable.task.ignore_error IS
    'Indicates whether a next task in a chain can be executed regardless of the success of the current one';
COMMENT ON COLUMN timetable.task.kind IS
    'Indicates whether "command" is SQL, built-in function or an external program';
COMMENT ON COLUMN timetable.task.command IS
    'Contains either an SQL command, or command string to be executed';
COMMENT ON COLUMN timetable.task.timeout IS
    'Abort any task within a chain that takes more than the specified number of milliseconds';

-- parameter passing for a chain task
CREATE TABLE timetable.parameter(
    task_id     BIGINT  REFERENCES timetable.task(task_id)
                        ON UPDATE CASCADE ON DELETE CASCADE,
    order_id    INTEGER CHECK (order_id > 0),
    value       JSONB,
    PRIMARY KEY (task_id, order_id)
);

COMMENT ON TABLE timetable.parameter IS
    'Stores parameters passed as arguments to a chain task';

CREATE UNLOGGED TABLE timetable.active_session(
    client_pid  BIGINT  NOT NULL,
    server_pid  BIGINT  NOT NULL,
    client_name TEXT    NOT NULL,
    started_at  TIMESTAMPTZ DEFAULT now()
);

COMMENT ON TABLE timetable.active_session IS
    'Stores information about active sessions';

CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'INFO', 'ERROR', 'PANIC', 'USER');

CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
$$
    SELECT client_name FROM timetable.active_session WHERE server_pid = $1 LIMIT 1
$$
LANGUAGE sql;

CREATE TABLE timetable.log
(
    ts              TIMESTAMPTZ         DEFAULT now(),
    pid             INTEGER             NOT NULL,
    log_level       timetable.log_type  NOT NULL,
    client_name     TEXT                DEFAULT timetable.get_client_name(pg_backend_pid()),
    message         TEXT,
    message_data    jsonb
);

COMMENT ON TABLE timetable.log IS
    'Stores log entries of active sessions';

CREATE TABLE timetable.execution_log (
    chain_id    BIGINT,
    task_id     BIGINT,
    txid        BIGINT NOT NULL,
    last_run    TIMESTAMPTZ DEFAULT now(),
    finished    TIMESTAMPTZ,
    pid         BIGINT,
    returncode  INTEGER,
    kind        timetable.command_kind,
    command     TEXT,
    output      TEXT,
    client_name TEXT        NOT NULL
);

COMMENT ON TABLE timetable.execution_log IS
    'Stores log entries of executed tasks and chains';

CREATE UNLOGGED TABLE timetable.active_chain(
    chain_id    BIGINT  NOT NULL,
    client_name TEXT    NOT NULL,
    started_at  TIMESTAMPTZ DEFAULT now()
);

COMMENT ON TABLE timetable.active_chain IS
    'Stores information about active chains within session';

CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
BEGIN
    IF pg_is_in_recovery() THEN
        RAISE NOTICE 'Cannot obtain lock on a replica. Please, use the primary node';
        RETURN FALSE;
    END IF;
    -- remove disconnected sessions
    DELETE
        FROM timetable.active_session
        WHERE server_pid NOT IN (
            SELECT pid
            FROM pg_catalog.pg_stat_activity
            WHERE application_name = 'pg_timetable'
        );
    DELETE 
        FROM timetable.active_chain 
        WHERE client_name NOT IN (
            SELECT client_name FROM timetable.active_session
        );
    -- check if there are any active sessions with the same client name but a different client pid
    PERFORM 1
        FROM timetable.active_session s
        WHERE
            s.client_pid <> worker_pid
            AND s.client_name = worker_name
        LIMIT 1;
    IF FOUND THEN
        RAISE NOTICE 'Another client is already connected to server with name: %', worker_name;
        RETURN FALSE;
    END IF;
    -- insert current session information
    INSERT INTO timetable.active_session(client_pid, client_name, server_pid) VALUES (worker_pid, worker_name, pg_backend_pid());
    RETURN TRUE;
END;
$CODE$
STRICT
LANGUAGE plpgsql;
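timetable.try_lock_client_name() guarantees that only one pg_timetable process works under a given client name: it refuses on a replica, drops sessions whose backend has disconnected (and the active chains of clients that are gone), refuses if another client process holds the name, and otherwise registers the caller. The Python model below replays those steps on in-memory lists so the decision can be followed without a database; SchedulerState and the set of live backend pids (standing in for the pg_stat_activity rows with application_name = 'pg_timetable') are assumptions of this sketch.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class Session:
    client_pid: int
    server_pid: int
    client_name: str


@dataclass
class SchedulerState:
    """In-memory stand-ins for timetable.active_session and active_chain."""
    sessions: List[Session] = field(default_factory=list)
    active_chains: List[Tuple[int, str]] = field(default_factory=list)  # (chain_id, client_name)


def try_lock_client_name(state: SchedulerState, worker_pid: int, worker_name: str,
                         backend_pid: int, live_backend_pids: Set[int],
                         in_recovery: bool = False) -> bool:
    """Model of the PL/pgSQL function above."""
    if in_recovery:  # the lock cannot be taken on a replica
        return False
    # Remove disconnected sessions, then chains of clients that are gone.
    state.sessions = [s for s in state.sessions if s.server_pid in live_backend_pids]
    live_names = {s.client_name for s in state.sessions}
    state.active_chains = [c for c in state.active_chains if c[1] in live_names]
    # Refuse if another client process already uses this name.
    if any(s.client_name == worker_name and s.client_pid != worker_pid for s in state.sessions):
        return False
    state.sessions.append(Session(worker_pid, backend_pid, worker_name))
    return True
```

For example, a second process started with --clientname=vacuumer is refused while the first one's backend is still connected, and succeeds once that backend has gone.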

H.3.15.4. ER-Diagram #

[Figure: ER-Diagram of the pg_timetable database schema]