F.51. PGQ — универсальная, высокопроизводительная очередь без блокировок с простым API на основе SQL-функций#
F.51. PGQ — универсальная, высокопроизводительная очередь без блокировок с простым API на основе SQL-функций #
Версия: 3.5.1
F.51.1. Обзор #
PGQ является решением для очередей от Skytools. Решение для репликации Londiste представляет собой демон-потребитель, построенный на основе PGQ, и API доступен для создания любых асинхронных процессов, основанных на очередях.
F.51.2. Какие проблемы решает PGQ? #
PGQ решит асинхронную пакетную обработку живых транзакций.
Это означает, что вы выполняете некоторые INSERT/DELETE/UPDATE строк из
вашей рабочей среды, и нужно вызвать некоторые действия, но
не в момент COMMIT, а позже. Не в слишком отдаленном будущем,
просто асинхронно: без блокировки рабочей транзакции.
Каждое приложение определенного размера будет нуждаться в отложенной обработке некоторых операций на более поздний срок, и PGQ - это универсальное высокопроизводительное решение, созданное для PostgreSQL, которое позволяет реализовать именно это: пакетную обработку. PGQ будет заботиться о асинхронном потреблении событий, управлении ошибками, поведении очереди и т. д., и он поставляется с простым SQL API для этого.
F.51.3. Установка и настройка #
Вам потребуется иметь экземпляр тикера, работающий на базе данных, где происходит генерация событий, что означает, что вам нужно предоставить файл конфигурации ticker.ini и запустить демона тикера.
F.51.3.1. Тикер #
См. Londiste_Tutorial#The_ticker_daemon для введения в тему, примера конфигурации и как запустить демон.
Тикер будет генерировать тики, которые затем будут служить границами для пакетных событий. Эти пакеты генерируются по требованию, каждый раз, когда потребитель запрашивает новый пакет, и будут генерироваться таким образом, чтобы каждый пакет содержал либо события, стоимостью в ticker_max_lag секунд, либо ticker_max_count событий, в зависимости от того, что произойдет раньше.
Демон отвечает за обслуживание очередей, что достигается путем ротации событий INSERT в 3 таблицах и использования операции TRUNCATE, как только одна из таблиц больше не содержит необработанных событий.
F.51.3.2. Производство и потребление событий #
Чтобы не потерять ни одно событие, убедитесь, что есть хотя бы один зарегистрированный потребитель перед тем, как производить какое-либо событие, так как события, в которых никто не заинтересован, теряются.
Вы можете иметь столько потребителей, сколько вам нужно, в одной очереди событий, но все они будут видеть одни и те же события, а не разделять нагрузку. Это имеет смысл, если учесть, что основное применение - репликация: потребители являются репликами, и когда их больше одного, нужно, чтобы все они обрабатывали одни и те же события, чтобы все реплики имели одинаковый набор данных.
F.51.4. Генерация событий #
Документация вполне ясно говорит об этом, вы можете создавать события либо с использованием API на основе функций, либо с использованием триггеров. Последний вариант предпочтительнее, так как он обеспечивает простую проверку типов данных SQL и известный интерфейс для вставки событий, классическую INSERT INTO.
F.51.5. Написание потребителя PGQ #
Вашим лучшим вариантом будет написать код потребителя на Python, следующим лучшим вариантом - PHP. Если нужно использовать существующий код, написанный на другом языке, вам придется самостоятельно написать код поддержки демона операционной системы и цикл вокруг точек входа API PGQ.
Вся логика PGQ написана в виде расширения PostgreSQL и кода на стороне сервера, поэтому все, что вам нужно от вашего языка, это способ подключиться к вашему серверу PostgreSQL и вызвать функции.
F.51.5.1. Как использовать PGQ SQL API #
Сначала вам понадобится, чтобы ваш код потребителя мог зарегистрироваться и отменить регистрацию в существующей очереди.
Затем основная идея довольно проста: потребитель циклически вызывает pgq.next_batch() как можно быстрее, пока не получит NULL, вот тогда он должен отдохнуть.
Пожалуйста, обратите внимание, что демон тикер будет генерировать тики каждый ticker_idle_period, когда ваша система не производит события, поэтому совершенно нормально (и ожидаемо) получать пустые пакеты в периоды низкой активности, например, при отладке вашего первого потребителя. Пустые пакеты нужно потреблять как можно быстрее, только позвольте вашему пользовательскому подписчику отдохнуть, когда pgq.next_batch() возвращает NULL.
Когда pgq.next_batch() возвращает идентификатор (bigint), вы получаете возможность обработать события пакета, которые вы получаете с помощью pgq.get_batch_events(batch_id). Что делать с каждым событием должно быть специфично для кода потребителя, а не в общем интерфейсе абстракции PGQ потребления (или библиотеки, пакета, как называется ваше средство программирования).
Функция обработки событий должна иметь возможность помечать события как обработанные (OK), "неудачные" или для "повторной попытки" позже. В случае повторной попытки, событие будет введено снова в последующую партию, в зависимости от того, через какой промежуток времени нужно повторить его.
После обработки всех событий пакета, необходимо вызвать pgq.finish_batch(batch_id), а затем выполнить COMMIT вашей транзакции обработки пакета.
В псевдо-языке это дает:
do
batch_id = pgq.next_batch()
if batch_id is not null
then
BEGIN;
events = pgq.get_batch_events(batch_id);
// this could be a function pointer or a virtual method or a delegate, e.g.
user_defined_process_event(events);
pgq.finish_batch(batch_id);
COMMIT;
end if;
while true;
F.51.5.2. Удаленное потребление #
Это ситуация, когда обработка событий происходит на другой базе данных, чем та, где события были созданы. Londiste является хорошим примером удаленного потребления.
В этом случае вам потребуется реализовать способ избежать повторной обработки пакета, когда COMMIT обработки прошел успешно, но pgq.finish_batch() (который должен быть выполнен на производящей базе данных, а не на потребляющей) завершился неудачей. Как решить проблему в трех других сценариях оставляется на усмотрение читателя.
Идея pgq_ext заключается в записи на потребительской базе данных последнего обработанного идентификатора пакета (batch_id) и выполнении обновления в рамках транзакции обработки. Данные могут быть уточнены до последнего обработанного идентификатора события (event id) в случаях, когда у вас нет простого способа откатить всю обработку пакетов, если сбой произошел на удаленном сервере.
Как справочная информация, вы можете захотеть знать, что londiste использует pgq_ext, но перемещает записи SQL (функции и таблицы) в схему londiste.
F.51.5.3. Нетранзакционная обработка #
Ваш код потребителя может отправлять почту вместо изменения состояния базы данных. В этом случае обработка не является транзакционной (вы не можете откатить пакетную обработку), и вам придется самостоятельно решать проблемы надежности, PGQ не предоставит волшебство.
F.51.5.4. Использование Python API #
[[Skytools]] написаны в основном на языке Python и предоставляют все необходимое для написания собственных потребителей.
F.51.5.4.1. Использование: счетчик строк для ускорения count(*) #
Вот фрагмент кода на Python для потребителя PGQ:
import pgq
class RowCounter(pgq.Consumer):
def process_batch(self, db, batch_id, ev_list):
tbl = self.cf.get('table_name'); delta = 0
for ev in ev_list:
if ev.type == 'I' and ev.extra1 == tbl: delta += 1
elif ev.type == 'D' and ev.extra1 == tbl: delta -= 1
ev.tag_done()
q = 'select update_stats(%s, %s)'
db.cursor().execute(q, [tbl, delta])
RowCounter('row_counter', 'db', sys.argv[1:]).start()
F.51.5.5. Использование PHP API #
Также был внесён вклад в виде PHP API, позволяющего легко писать демон-потребитель PGQ на PHP.
Вот пример
#!/usr/bin/php5
<?php
require( "pgq/PGQRemoteConsumer.php" );
require("conf/duration.php");
define("CONFIGURATION", "conf/duration.php");
$con_src = "dbname=foo_db port=5432 host=localhost";
$con_dst = "dbname=bar_db port=5432 host=localhost"
class PGQDaemonExample extends PGQRemoteConsumer
{
public function config( )
{
unset($Config);
if( $this->log !== null )
$this->log->notice("Reloading configuration (HUP) from '%s'", CONFIGURATION);
global $Config;
require(CONFIGURATION);
$this->loglevel = $Config["LOGLEVEL"];
$this->logfile = $Config["LOGFILE"];
$this->delay = $Config["DELAY"];
}
public function process_event( &$event )
{
$this->log->notice("Starting process event");
$id = $event->data["id"];
$code = $event->data["code"];
$data = $event->data["data"];
$this->log->notice("Processing event : %d ", $id);
$query = sprintf( "UPDATE table SET ... WHERE ...", $code );
$this->log->debug( $query );
$result = pg_query( $this->pg_dst_con, $query );
if( $result === False )
{
$this->log->error( "Unable to update : %s ", $query );
$event->retry_delay = 2 * $this->delay;
return PGQ_EVENT_RETRY;
}
return PGQ_EVENT_OK;
}
}
$daemon = new PGQDaemonExample( "mydaemon", "daemonq", "table", $argc, $argv, $con_src, $con_dst );
?>
F.51.5.6. Java API #
Также доступен Java API для потребителей.