Клиенты
Поскольку Tantor PipelineDB совместим с PostgreSQL 15+, то у него нет собственных клиентских библиотек. То есть любой клиент, который работает с PostgreSQL (или любой другой SQL-базой данных), будет работать с Tantor PipelineDB.
Здесь приведены примеры простого приложения Tantor PipelineDB на нескольких языках и разных клиентах. Приложение просто создает непрерывное представление:
CREATE VIEW continuous view (action=materialize) AS
SELECT x::integer, COUNT(*) FROM stream GROUP BY x;
Приложение генерирует 100,000
событий, получает 10
уникальных группировок для непрерывного представления и выводит результаты.
Python
Для работы с примером на Python, приведенным ниже, понадобится установить psycopg2.
import psycopg2
conn = psycopg2.connect('dbname=test user=user host=localhost port=5432')
pipeline = conn.cursor()
create_stream = """
CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb
"""
pipeline.execute(create_stream)
create_cv = """
CREATE VIEW continuous_view WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x
"""
pipeline.execute(create_cv)
conn.commit()
rows = []
for n in range(100000):
# 10 unique groupings
x = n % 10
rows.append({'x': x})
# Теперь запишите строки в поток
pipeline.executemany('INSERT INTO stream (x) VALUES (%(x)s)', rows)
# И прочтите результаты
pipeline.execute('SELECT * FROM continuous_view')
rows = pipeline.fetchall()
for row in rows:
x, count = row
print x, count
pipeline.execute('DROP VIEW continuous_view')
pipeline.close()
Ruby
В этом примере на Ruby используется pg gem.
require 'pg'
pipeline = PGconn.connect("dbname='test' user='user' host='localhost' port=5432")
# Это непрерывное представление будет выполнять 3 агрегации по трафику просмотра страниц, сгруппированные по URL:
#
# total_count - подсчитать количество общих просмотров страниц для каждого URL
# uniques - подсчитать количество уникальных пользователей для каждого URL-адреса
# p99_latency - определить задержку в 99-м процентиле для каждого URL-адреса
s = "
CREATE FOREIGN TABLE page_views (
url text,
cookie text,
latency integer
) SERVER pipelinedb"
pipeline.exec(s)
q = "
CREATE VIEW v WITH (action=materialize) AS
SELECT
url,
count(*) AS total_count,
count(DISTINCT cookie) AS uniques,
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency) AS p99_latency
FROM page_views GROUP BY url"
pipeline.exec(q)
for n in 1..10000 do
# 10 уникальных URL-адресов
url = '/some/url/%d' % (n % 10)
# 1000 уникальных файлов cookie
cookie = '%032d' % (n % 1000)
# Задержка равномерно распределена между значениями 1 и 100
latency = rand(101)
# ПРИМЕЧАНИЕ: было бы намного быстрее объединить их в один INSERT
# оператор, но для простоты выполняется по одному за раз
pipeline.exec(
"INSERT INTO page_views (url, cookie, latency) VALUES ('%s', '%s', %d)"
% [url, cookie, latency])
end
# Результат непрерывного представления можно запрашивать как любую другую таблицу или представление
rows = pipeline.exec('SELECT * FROM v ORDER BY url')
rows.each do |row|
puts row
end
# Очистка
pipeline.exec('DROP VIEW v')
Java
Для этого примера понадобится JDBC, установленный на вашем CLASSPATH
.
import java.util.Properties;
import java.sql.*;
public class Example {
static final String HOST = "localhost";
static final String DATABASE = "test";
static final String USER = "user";
public static void main(String[] args) throws SQLException {
// Подключиться к БД "test" через порт 5432
String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
ResultSet rs;
Properties props = new Properties();
props.setProperty("user", USER);
Connection conn = DriverManager.getConnection(url, props);
Statement stmt = conn.createStatement();
stmt.executeUpdate(
"CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb");
stmt.executeUpdate(
"CREATE VIEW v WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");
for (int i=0; i<100000; i++)
{
// 10 уникальных группировок
int x = i % 10;
// INSERT INTO stream (x) VALUES (x)
stmt.addBatch("INSERT INTO stream (x) VALUES (" + Integer.toString(x) + ")");
}
stmt.executeBatch();
rs = stmt.executeQuery("SELECT * FROM v");
while (rs.next())
{
int id = rs.getInt("x");
int count = rs.getInt("count");
System.out.println(id + " = " + count);
}
// Очистка
stmt.executeUpdate("DROP VIEW v");
conn.close();
}
}