Клиенты

Поскольку PipelineDB совместим с PostgreSQL 9.5, у него нет собственных клиентских библиотек. То есть любой клиент, который работает с PostgreSQL (или любой другой SQL-базой данных), будет работать с PipelineDB.

Здесь приведены примеры простого приложения PipelineDB на нескольких языках и разных клиентах. Приложение просто создает CONTINUOUS VIEW:

CREATE CONTINUOUS VIEW continuous view AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

Приложение затем генерирует 100,000 событий, получает 10 уникальных группировок для CONTINUOUS VIEW, и выводит результаты.

Python

Для работы с примером на Python, приведенным ниже, понадобится установить psycopg2.

import psycopg2

conn = psycopg2.connect('dbname=test user=user host=localhost port=5432') pipeline = conn.cursor()

create_cv = """ CREATE CONTINUOUS VIEW continuous_view AS SELECT x::integer, COUNT(\*) FROM stream GROUP BY x """ pipeline.execute(create_cv) conn.commit()

rows = \[]

for n in range(100000): # 10 unique groupings x = n % 10 rows.append({'x': x})

\# Теперь запишите строки в поток pipeline.executemany(INSERT INTO stream (x) VALUES (%(x)s), rows)

\# И прочтите результаты pipeline.execute(SELECT * FROM continuous_view) rows = pipeline.fetchall()

for row in rows: x, count = row

    print x, count

pipeline.execute('DROP CONTINUOUS VIEW continuous_view') pipeline.close()

Ruby

В этом примере на Ruby используется pg gem.

require 'pg'
pipeline = PGconn.connect("dbname='test' user='user' host='localhost' port=5432")

# Это непрерывное представление будет выполнять 3 агрегации по трафику просмотра страниц, сгруппированные по URL:
#
# total_count - подсчитать количество общих просмотров страниц для каждого URL
# uniques - подсчитать количество уникальных пользователей для каждого URL-адреса
# p99_latency - определить задержку в 99-м процентиле для каждого URL-адреса

q = "
CREATE CONTINUOUS VIEW v AS
SELECT
  url::text,
  count(*) AS total_count,
  count(DISTINCT cookie::text) AS uniques,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY latency::integer) AS p99_latency
FROM page_views GROUP BY url"

pipeline.exec(q)

for n in 1..10000 do
  # 10 уникальных URL-адресов
  url = '/some/url/%d' % (n % 10)

  # 1000 уникальных файлов cookie
  cookie = '%032d' % (n % 1000)

  # задержка равномерно распределена между значениями 1 и 100
  latency = rand(101)

  # ПРИМЕЧАНИЕ: было бы намного быстрее объединить их в один INSERT
  # оператор, но для простоты выполняется по одному за раз
  pipeline.exec(
  "INSERT INTO page_views (url, cookie, latency) VALUES ('%s', '%s', %d)"
        % [url, cookie, latency])
end

# Результат непрерывного представления можно запрашивать как любую другую таблицу или представление
rows = pipeline.exec('SELECT * FROM v ORDER BY url')

rows.each do |row|
  puts row
end

# Очистка
pipeline.exec('DROP CONTINUOUS VIEW v')

Java

Для этого примера понадобится JDBC, установленный на вашем CLASSPATH.

import java.util.Properties; import java.sql.\*;

public class Example {

  static final String HOST = "localhost";
  static final String DATABASE = "test";
  static final String USER = "user";

  public static void main(String[] args) throws SQLException {

    // Подключиться к БД "test" через порт 5432
    String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
    ResultSet  rs;
    Properties props = new Properties();

    props.setProperty("user", USER);
    Connection conn = DriverManager.getConnection(url, props);

    Statement stmt = conn.createStatement();
    stmt.executeUpdate(
      "CREATE CONTINUOUS VIEW v AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");

    for (int i=0; i<100000; i++)
    {
      // 10 уникальных группировок
      int x = i % 10;

      // INSERT INTO stream (x) VALUES (x)
      stmt.addBatch("INSERT INTO stream (x) VALUES (" + Integer.toString(x) + ")");
    }

    stmt.executeBatch();

    rs = stmt.executeQuery("SELECT * FROM v");
    while (rs.next())
    {
      int id = rs.getInt("x");
      int count = rs.getInt("count");

      System.out.println(id + " = " + count);
    }

    // Очистка
    stmt.executeUpdate("DROP CONTINUOUS VIEW v");
    conn.close();
  }
}