F.43. PGQ#

F.43. PGQ

F.43. PGQ

F.43.1. О pgAudit

Версия: 3.5

GitHub

F.43.2. Описание

PGQ - это решение для очередей от Skytools. Решение для репликации Londiste_Tutorial - это фоновый потребитель, построенный на основе PGQ, и API доступен для создания любого асинхронного обработчика, основанного на очередях.

Подсказка: Начиная с 2021 года есть предложение о дополнительной части стандарта SQL: SQL/PGQ" (Property Graph Queries). Не путайте данную информацию с этим предложением.

F.43.3. Какие проблемы решает PGQ?

PGQ решит асинхронную пакетную обработку живых транзакций.

Это означает, что вы выполняете некоторые INSERT/DELETE/UPDATE строк из вашей рабочей среды, и вы хотите вызвать некоторые действия, но не в момент COMMIT, а позже. Не в слишком отдаленном будущем, просто асинхронно: без блокировки рабочей транзакции.

Каждое приложение определенного размера будет нуждаться в отложенной обработке некоторых операций на более поздний срок, и PGQ - это универсальное высокопроизводительное решение, созданное для PostgreSQL, которое позволяет реализовать именно это: пакетную обработку. PGQ будет заботиться о асинхронном потреблении событий, управлении ошибками, поведении очереди и т. д., и он поставляется с простым SQL API для этого.

F.43.4. Установка и настройка

Вам потребуется иметь экземпляр тикера, работающий на базе данных, где происходит генерация событий, что означает, что вам нужно предоставить файл конфигурации ticker.ini и запустить демона тикера.

F.43.4.1. Тикер

См. Руководство по Londiste, The_ticker_daemon для введения в тему, примера конфигурации и инструкций по запуску демона.

Тикер будет генерировать тики, которые затем будут служить границами для пакетных событий. Эти пакеты генерируются по требованию, каждый раз, когда потребитель запрашивает новый пакет, и будут генерироваться таким образом, чтобы каждый пакет содержал либо события, стоимостью в ticker_max_lag секунд, либо ticker_max_count событий, в зависимости от того, что произойдет раньше.

Демон отвечает за обслуживание очередей, что достигается путем ротации событий INSERT в 3 таблицах и использования операции TRUNCATE, как только одна из таблиц больше не содержит необработанных событий.

F.43.4.2. Производство и потребление событий

Чтобы не потерять ни одно событие, убедитесь, что есть хотя бы один зарегистрированный потребитель перед тем, как производить какое-либо событие, так как события, в которых никто не заинтересован, теряются.

Вы можете иметь столько потребителей, сколько вам нужно, в одной очереди событий, но все они будут видеть одни и те же события, а не разделять нагрузку. Это имеет смысл, если учесть, что основное применение - репликация: потребители являются репликами, и когда их больше одного, вы хотите, чтобы все они обрабатывали одни и те же события, чтобы все реплики имели одинаковый набор данных.

F.43.5. Генерация событий

Документация вполне ясно говорит об этом, вы можете создавать события либо с использованием API на основе функций, либо с использованием триггеров. Последний вариант предпочтительнее, так как он обеспечивает простую проверку типов данных SQL и известный интерфейс для вставки событий, классическую INSERT INTO.

F.43.6. Написание потребителя PGQ

Вашим лучшим вариантом будет написать код потребителя на Python, следующим лучшим вариантом - PHP. Если вы хотите использовать существующий код, написанный на другом языке, вам придется самостоятельно написать код поддержки демона операционной системы и цикл вокруг точек входа API PGQ.

Вся логика PGQ написана в виде расширения PostgreSQL и кода на стороне сервера, поэтому все, что вам нужно от вашего языка, это способ подключиться к вашему серверу PostgreSQL и вызвать функции.

F.43.6.1. Как использовать PGQ SQL API

Сначала вам понадобится, чтобы ваш код потребителя мог зарегистрироваться и отменить регистрацию в существующей очереди.

Затем основная идея довольно проста: потребитель циклически вызывает pgq.next_batch() как можно быстрее, пока не получит NULL, вот тогда он должен отдохнуть.

Пожалуйста, обратите внимание, что демон тикер будет генерировать тики каждый ticker_idle_period, когда ваша система не производит события, поэтому совершенно нормально (и ожидаемо) получать пустые пакеты в периоды низкой активности, например, при отладке вашего первого потребителя. Пустые пакеты нужно потреблять как можно быстрее, только позвольте вашему пользовательскому подписчику отдохнуть, когда pgq.next_batch() возвращает NULL.

Когда pgq.next_batch() возвращает идентификатор (bigint), вы получаете возможность обработать события пакета, которые вы получаете с помощью pgq.get_batch_events(batch_id). Что делать с каждым событием должно быть специфично для кода потребителя, а не в общем интерфейсе абстракции PGQ потребления (или библиотеки, пакета, как называется ваше средство программирования).

Функция обработки событий должна иметь возможность помечать события как обработанные (OK), "неудачные" или для "повторной попытки" позже. В случае повторной попытки, событие будет введено снова в последующую партию, в зависимости от того, через какой промежуток времени вы хотите повторить его.

После обработки всех событий пакета, необходимо вызвать pgq.finish_batch(batch_id), а затем выполнить COMMIT вашей транзакции обработки пакета.

В псевдо-языке это дает:

do
  batch_id = pgq.next_batch()
  if batch_id is not null
  then
    BEGIN;
    events = pgq.get_batch_events(batch_id);

    // this could be a function pointer or a virtual method or a delegate, e.g.
    user_defined_process_event(events);

    pgq.finish_batch(batch_id);

    COMMIT;
  end if;
while true;

F.43.6.2. Удаленное потребление

Это ситуация, когда обработка событий происходит на другой базе данных, чем та, где события были созданы. Londiste является хорошим примером удаленного потребления.

В этом случае вам потребуется реализовать способ избежать повторной обработки пакета, когда COMMIT обработки прошел успешно, но pgq.finish_batch() (который должен быть выполнен на производящей базе данных, а не на потребляющей) завершился неудачей. Как решить проблему в трех других сценариях оставляется на усмотрение читателя.

Идея pgq_ext заключается в записи на потребительской базе данных последнего обработанного идентификатора пакета (batch_id) и выполнении обновления в рамках транзакции обработки. Данные могут быть уточнены до последнего обработанного идентификатора события (event id) в случаях, когда у вас нет простого способа откатить всю обработку пакетов, если сбой произошел на удаленном сервере.

Как справочная информация, вы можете захотеть знать, что londiste использует pgq_ext, но перемещает записи SQL (функции и таблицы) в схему londiste.

F.43.6.3. Нетранзакционная обработка

Ваш код потребителя может отправлять почту вместо изменения состояния базы данных. В этом случае обработка не является транзакционной (вы не можете откатить пакетную обработку), и вам придется самостоятельно решать проблемы надежности, PGQ не предоставит волшебство.

F.43.6.4. Использование Python API

F.43.6.4.1. Использование: счетчик строк для ускорения count(*)

Вот фрагмент кода на Python для потребителя PGQ:

import pgq 
class RowCounter(pgq.Consumer): 
    def process_batch(self, db, batch_id, ev_list): 
        tbl = self.cf.get('table_name'); delta = 0 
        for ev in ev_list: 
            if   ev.type == 'I' and ev.extra1 == tbl: delta += 1 
            elif ev.type == 'D' and ev.extra1 == tbl: delta -= 1 
            ev.tag_done() 
        q = 'select update_stats(%s, %s)' 
        db.cursor().execute(q, [tbl, delta]) 
RowCounter('row_counter', 'db', sys.argv[1:]).start() 

F.43.6.5. Использование PHP API

Был также представлен API для PHP, позволяющий легко написать демон-потребитель PGQ на PHP. Смотрите libphp-pgq для более подробной информации.

Вот пример

#!/usr/bin/php5
<?php
require( "pgq/PGQRemoteConsumer.php" );
require("conf/duration.php");

define("CONFIGURATION", "conf/duration.php");

$con_src = "dbname=foo_db port=5432 host=localhost";
$con_dst = "dbname=bar_db port=5432 host=localhost"

class PGQDaemonExample extends PGQRemoteConsumer
{
   public function config( )
   {
       unset($Config);
       if( $this->log !== null )
           $this->log->notice("Reloading configuration (HUP) from '%s'", CONFIGURATION);

       global $Config;
       require(CONFIGURATION);

       $this->loglevel = $Config["LOGLEVEL"];
       $this->logfile  = $Config["LOGFILE"];
       $this->delay    = $Config["DELAY"];
   }

   public function process_event( &$event ) 
   {
       $this->log->notice("Starting process event");

       $id = $event->data["id"];
       $code = $event->data["code"];
       $data = $event->data["data"];

       $this->log->notice("Processing event : %d ", $id);

       $query = sprintf( "UPDATE table SET ... WHERE ...", $code );
       $this->log->debug( $query );
       $result = pg_query( $this->pg_dst_con, $query );

       if( $result === False ) 
       {
           $this->log->error( "Unable to update : %s ", $query );
           $event->retry_delay = 2 * $this->delay;
           return PGQ_EVENT_RETRY;
       }

       return PGQ_EVENT_OK;
   }
}

$daemon = new PGQDaemonExample( "mydaemon", "daemonq", "table", $argc, $argv, $con_src, $con_dst );
?>