# pg job scheduler: pg_timetable
> `pg_timetable` is an advanced job scheduler for PostgreSQL that offers many advantages over traditional schedulers such as cron. It is driven entirely by the database and provides several advanced concepts.

* GitHub: https://github.com/cybertec-postgresql/pg_timetable
* Doc: https://pg-timetable.readthedocs.io/en/master/index.html
* Website: https://www.cybertec-postgresql.com/

## Installation

```bash
$ tar -zxf pg_timetable_4.5.0_Linux_x86_64.tar.gz
$ cp pg_timetable_4.5.0_Linux_x86_64/pg_timetable $PGHOME/bin
$ pg_timetable
Configuration error: The required flag `-c, --clientname` was not specified
Usage:
  pg_timetable

Application Options:
  -c, --clientname=                        Unique name for application instance [$PGTT_CLIENTNAME]
      --config=                            YAML configuration file
      --no-program-tasks                   Disable executing of PROGRAM tasks [$PGTT_NOPROGRAMTASKS]
  -v, --version                            Output detailed version information [$PGTT_VERSION]

Connection:
  -h, --host=                              PostgreSQL host (default: localhost) [$PGTT_PGHOST]
  -p, --port=                              PostgreSQL port (default: 5432) [$PGTT_PGPORT]
  -d, --dbname=                            PostgreSQL database name (default: timetable) [$PGTT_PGDATABASE]
  -u, --user=                              PostgreSQL user (default: scheduler) [$PGTT_PGUSER]
      --password=                          PostgreSQL user password [$PGTT_PGPASSWORD]
      --sslmode=[disable|require]          What SSL priority use for connection (default: disable)
      --pgurl=                             PostgreSQL connection URL [$PGTT_URL]
      --timeout=                           PostgreSQL connection timeout (default: 90) [$PGTT_TIMEOUT]

Logging:
      --log-level=[debug|info|error]           Verbosity level for stdout and log file (default: info)
      --log-database-level=[debug|info|error]  Verbosity level for database storing (default: info)
      --log-file=                              File name to store logs
      --log-file-format=[json|text]            Format of file logs (default: json)

Start:
  -f, --file=                              SQL script file to execute during startup
      --init                               Initialize database schema to the latest version and exit. Can be used with --upgrade
      --upgrade                            Upgrade database to the latest version
      --debug                              Run in debug mode. Only asynchronous chains will be executed

Resource:
      --cron-workers=                      Number of parallel workers for scheduled chains (default: 16)
      --interval-workers=                  Number of parallel workers for interval chains (default: 16)
      --chain-timeout=                     Abort any chain that takes more than the specified number of milliseconds
      --task-timeout=                      Abort any task within a chain that takes more than the specified number of milliseconds

REST:
      --rest-port=                         REST API port (default: 0) [$PGTT_RESTPORT]

# Start pg_timetable; the first run automatically creates the schema and its tables and functions
pg_timetable postgresql://postgres@localhost:5432/stockdb --clientname=pgtimetable01

# Start in the background
nohup pg_timetable postgresql://postgres@localhost:5432/stockdb --clientname=pgtimetable01 \
  --log-file=/u01/pgdata/data_5432/pg_timetable01.log > /dev/null 2>&1 &
```

## Examples

* Basic task

```sql
SELECT timetable.add_job(
    job_name            => 'notify every minute',
    job_schedule        => '* * * * *',
    job_command         => 'SELECT pg_notify($1, $2)',
    job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
    job_kind            => 'SQL'::timetable.command_kind,
    job_client_name     => NULL,
    job_max_instances   => 1,
    job_live            => TRUE,
    job_self_destruct   => FALSE,
    job_ignore_errors   => TRUE
) as chain_id;
```

* Sending email

```sql
DO $$
    -- An example for using the SendMail task.
DECLARE
    v_mail_task_id bigint;
    v_log_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Get the chain id
    INSERT INTO timetable.chain (chain_name, max_instances, live)
    VALUES ('Send Mail', 1, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Add SendMail task
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
    RETURNING task_id INTO v_mail_task_id;

    -- Create the parameters for the SendMail task
        -- "username":       The username used for authenticating on the mail server
        -- "password":       The password used for authenticating on the mail server
        -- "serverhost":     The IP address or hostname of the mail server
        -- "serverport":     The port of the mail server
        -- "senderaddr":     The email that will appear as the sender
        -- "ccaddr":         String array of the recipients(Cc) email addresses
        -- "bccaddr":        String array of the recipients(Bcc) email addresses
        -- "toaddr":         String array of the recipients(To) email addresses
        -- "subject":        Subject of the email
        -- "attachment":     String array of the attachments (local file)
        -- "attachmentdata": Pairs of name and base64-encoded content
        -- "msgbody":        The body of the email
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_mail_task_id, 1, '{
            "username":     "user@example.com",
            "password":     "password",
            "serverhost":   "smtp.example.com",
            "serverport":   587,
            "senderaddr":   "user@example.com",
            "ccaddr":       ["recipient_cc@example.com"],
            "bccaddr":      ["recipient_bcc@example.com"],
            "toaddr":       ["recipient@example.com"],
            "subject":      "pg_timetable - No Reply",
            "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
            "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
            "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
            "contenttype":  "text/html; charset=UTF-8"
            }'::jsonb);

    -- Add Log task and make it the last task using `task_order` column (=30)
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
    RETURNING task_id INTO v_log_task_id;

    -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
    -- Since we're using special add_task() function we don't need to specify the `chain_id`.
    -- Function will take the same `chain_id` from the parent task, SendMail in this particular case
    PERFORM timetable.add_task(
        kind => 'SQL',
        parent_id => v_mail_task_id,
        command => format(
            $query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
            INSERT INTO timetable.parameter (task_id, order_id, value)
            SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
            FROM sent_mail
            ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$,
            v_mail_task_id, v_log_task_id
        ),
        order_delta => 10
    );

    -- In the end we should have something like this. Note that even though the Log task
    -- was created earlier, it will be executed later due to the `task_order` column.

    -- timetable=> SELECT task_id, chain_id, task_order, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
    --  task_id | chain_id | task_order |  kind   |                             left
    -- ---------+----------+------------+---------+---------------------------------------------------------------
    --       45 |       24 |         10 | BUILTIN | SendMail
    --       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
    --       46 |       24 |         30 | BUILTIN | Log
    -- (3 rows)
END;
$$
LANGUAGE PLPGSQL;
```

* Download, transform and import

```sql
-- Prepare the destination table 'city'
CREATE TABLE IF NOT EXISTS city(
    city text,
    lat numeric,
    lng numeric,
    country text,
    iso2 text,
    admin_name text,
    capital text,
    population bigint,
    population_proper bigint);

-- An enhanced example consisting of three tasks (plus a final cleanup task):
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension)
-- 3. Import text file as CSV file using BUILT-IN command (can be done with `psql -c \copy`)
DO $$
DECLARE
    v_head_id bigint;
    v_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live)
    VALUES ('Download locations and aggregate', TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Download file from the server
    -- Create the task
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the step 1:
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1,
           '{
                "workersnum": 1,
                "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"],
                "destpath": "."
            }'::jsonb);

    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;

    -- Step 2. Transform Unicode characters into ASCII
    -- Create the program task to call 'uconv' and name it 'unaccent'
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the 'unaccent' task. Input and output files in this case
    -- Under Windows we should call PowerShell instead of "uconv" with command:
    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);

    RAISE NOTICE 'Step 2 completed. Unaccent task added with ID: %', v_task_id;

    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
        VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
    RETURNING task_id INTO v_task_id;

    -- Add the parameters for the import task. Execute client-side COPY to 'city' from 'mt_ansi.csv'
    INSERT INTO timetable.parameter (task_id, order_id, value)
        VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);

    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;

    -- Step 4. Remove the downloaded and converted .csv files
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
    RETURNING task_id INTO v_task_id;

    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
END;
$$ LANGUAGE PLPGSQL;
```

* Running tasks in autonomous transactions, e.g. `CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE`

```sql
-- An advanced example showing how to use autonomous tasks.
-- This one-task chain will execute test_proc() procedure.
-- Since the procedure makes two commits (one after each call to f())
-- we cannot use it as a regular task, because all regular tasks
-- must be executed in the context of a single chain transaction.
-- Same rule applies for some other SQL commands,
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
BEGIN
    RAISE notice '%', msg;
END;
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE PROCEDURE test_proc () AS $$
BEGIN
    PERFORM f('hey 1');
    COMMIT;
    PERFORM f('hey 2');
    COMMIT;
END;
$$
LANGUAGE PLPGSQL;

WITH
    cte_chain (v_chain_id) AS (
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct)
        VALUES (
            'call proc() every 10 sec', -- chain_name,
            '@every 10 seconds',        -- run_at,
            1,     -- max_instances,
            TRUE,  -- live,
            FALSE  -- self_destruct
        ) RETURNING chain_id
    ),
    cte_task(v_task_id) AS (
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
        SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;
```

* Shutting down the scheduler and terminating the session

```sql
-- This one-task chain (aka job) will terminate pg_timetable session.
-- This is useful for maintenance purposes or before the database is destroyed.
-- One should take care of restarting pg_timetable if needed.
SELECT timetable.add_job (
    job_name     => 'Shutdown pg_timetable session on schedule',
    job_schedule => '* * 1 * *',
    job_command  => 'Shutdown',
    job_kind     => 'BUILTIN'
);
```

* Accessing the results and output of previous tasks from a later task

```sql
WITH
    cte_chain (v_chain_id) AS (
        -- let's create a new chain and add tasks to it later
        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
        VALUES ('many tasks', '* * * * *', 1, true)
        RETURNING chain_id
    ),
    cte_tasks(v_task_id) AS (
        -- now we'll add 500 tasks to the chain, some of them will fail
        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
        SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
        FROM cte_chain, generate_series(1, 500) AS g(s)
        RETURNING task_id
    ),
    report_task(v_task_id) AS (
        -- and the last reporting task will calculate the statistic
        INSERT INTO timetable.task (chain_id, task_order, kind, command)
        SELECT v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
    s TEXT;
BEGIN
    WITH report AS (
        SELECT
            count(*) FILTER (WHERE returncode = 0) AS success,
            count(*) FILTER (WHERE returncode != 0) AS fail,
            count(*) AS total
        FROM timetable.execution_log
        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
          AND txid = txid_current()
    )
    SELECT 'Tasks executed:' || total ||
         '; succeeded: ' || success ||
         '; failed: ' || fail ||
         '; ratio: ' || 100.0*success/GREATEST(total,1)
    INTO s
    FROM report;
    RAISE NOTICE '%', s;
END;
$$
$CMD$
        FROM cte_chain
        RETURNING task_id
    )
SELECT v_chain_id FROM cte_chain;
```
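The two start commands in the installation section differ only in the log file and the shell redirection. As a rough sketch, a small shell helper could assemble that command line from a client name and a database name. The function name `pgtt_command` and the fallback to the `postgres` user are assumptions made for this illustration and are not part of pg_timetable. The `PGTT_PGHOST`, `PGTT_PGPORT` and `PGTT_PGUSER` variable names are taken from the usage text printed by `pg_timetable`. The helper only prints the command and never runs it.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not shipped with pg_timetable): builds the start
# command used in the installation section and prints it as a dry run.

pgtt_command() {
    local client="$1"
    local dbname="$2"
    local logfile="${3:-}"
    # Assumption: fall back to the values used in this document when the
    # PGTT_* environment variables are not set.
    local host="${PGTT_PGHOST:-localhost}"
    local port="${PGTT_PGPORT:-5432}"
    local user="${PGTT_PGUSER:-postgres}"

    if [ -z "$client" ] || [ -z "$dbname" ]; then
        echo "usage: pgtt_command CLIENTNAME DBNAME [LOGFILE]" >&2
        return 1
    fi

    local cmd="pg_timetable postgresql://${user}@${host}:${port}/${dbname} --clientname=${client}"
    if [ -n "$logfile" ]; then
        cmd="${cmd} --log-file=${logfile}"
    fi
    printf '%s\n' "$cmd"
}

# Dry run: print the command instead of executing it.
pgtt_command pgtimetable01 stockdb /u01/pgdata/data_5432/pg_timetable01.log
```

To actually start the scheduler in the background, the printed command could be wrapped in `nohup ... > /dev/null 2>&1 &` as in the installation section.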
vleity
July 19, 2025, 11:06