Statements
docker run --rm --name statements -e POSTGRES_PASSWORD=postgres -d -p 5432:5432 postgres
export PGUSER=postgres
export PGPASSWORD=postgres
export PGHOST=0.0.0.0
psql -f init.sql
psql -f populate.sql
psql -f chart.sql
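# Quick sanity check (illustrative, not part of the original gist): assuming
# init.sql and populate.sql create and fill the statements schema shown below,
# this confirms some data landed before running the chart query.
psql -c "SELECT count(*) FROM statements.statements_history_current_db;"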
-- SELECT
-- anon_1.ts,
-- SUM(anon_1.calls) / Greatest(Extract(epoch FROM anon_1.mesure_interval), 1) AS calls
-- FROM (
-- SELECT dbid,
-- ts,
-- Greatest(ts - Lead(ts) over (PARTITION BY dbid ORDER BY ts desc), '0 s') AS mesure_interval,
-- Greatest(SUM(calls) - Lead(SUM(calls)) over (PARTITION BY dbid ORDER BY ts desc), 0) AS calls,
-- LEAD(SUM(calls)) over (PARTITION BY dbid ORDER BY ts desc) AS lead_calls_origin,
-- SUM(calls) AS calls_origin
-- FROM (
-- SELECT agent_address,
-- agent_port,
-- dbid,
-- datname,
-- queryid,
-- userid,
-- base.ts,
-- base.calls
-- FROM statements.statements,
-- lateral
-- (
-- SELECT *
-- FROM (
-- SELECT row_number() over ( PARTITION BY queryid ORDER BY ts ) AS NUMBER,
-- count(*) over (PARTITION BY queryid) AS total,
-- *
-- FROM (
-- SELECT (record).*
-- FROM statements.statements_history_current
-- WHERE (record).ts <@ tstzrange('2009-03-01 00:00'::timestamp, '2009-03-02 12:00'::timestamp, '[]')
-- AND queryid = statements.statements.queryid
-- AND userid = statements.statements.userid
-- AND dbid = statements.statements.dbid
-- AND datname = 'mydb'
-- AND agent_address = '0.0.0.0'
-- AND agent_port = 2345
-- ) AS statements_history
-- ) AS sh
-- WHERE NUMBER % ( int8larger((total)/(10 +1),1) ) = 0
-- ) AS base
-- ) by_db
-- GROUP BY dbid, ts
-- ORDER BY ts
-- ) AS anon_1
-- WHERE TRUE
-- GROUP BY anon_1.ts,
-- anon_1.mesure_interval
-- ORDER BY anon_1.ts
-- ;
-- SELECT agent_address,
-- agent_port,
-- dbid,
-- datname,
-- queryid,
-- userid,
-- base.*
-- FROM statements.statements,
-- lateral
-- (
-- SELECT *
-- FROM (
-- SELECT row_number() OVER ( ORDER BY ts ) AS NUMBER,
-- count(*) OVER ( PARTITION BY 1 ) AS total,
-- *
-- FROM (
-- SELECT (record).ts
-- FROM statements.statements_history_current
-- WHERE (record).ts <@ tstzrange('2009-03-01 00:00'::timestamp, '2009-03-02 12:00'::timestamp, '[]')
-- AND dbid = statements.statements.dbid
-- AND agent_address = '0.0.0.0'
-- AND agent_port = 2345
-- GROUP BY (record).ts
-- ) AS statements_history
-- ) AS sh
-- WHERE NUMBER % ( int8larger((total)/(10 +1),1) ) = 0
-- ) AS base
-- ;
-- SET max_parallel_workers_per_gather = 0;
-- SET track_io_timing TO on;
-- SELECT
-- anon_1.ts,
-- SUM(anon_1.calls) / Greatest(Extract(epoch FROM anon_1.mesure_interval), 1) AS calls_per_sec
-- FROM (
-- SELECT
-- ts,
-- Greatest(ts - Lag(ts) over (ORDER BY ts), '0 s') AS mesure_interval,
-- Greatest(calls - Lag(calls) over (ORDER BY ts), 0) AS calls
-- FROM (
-- SELECT *
-- FROM (
-- SELECT row_number() OVER ( ORDER BY ts ) AS NUMBER,
-- count(*) OVER ( PARTITION BY 1 ) AS total,
-- *
-- FROM (
-- select (record).*
-- from statements.statements_history_current_db AS shc
-- -- JOIN statements.statements AS st
-- -- ON st.agent_address = shc.agent_address
-- -- AND st.agent_port = shc.agent_port
-- -- AND st.dbid = shc.dbid
-- WHERE (record).ts <@ tstzrange('2009-03-02 10:00'::timestamp, '2009-03-02 11:00'::timestamp, '[]')
-- AND agent_address = '0.0.0.0'
-- AND agent_port = 2345
-- ) as foo
-- ) AS sh
-- WHERE NUMBER % ( int8larger((total)/(20 +1),1) ) = 0
-- ) AS base
-- ) AS anon_1
-- WHERE TRUE
-- GROUP BY anon_1.ts,
-- anon_1.mesure_interval
-- ORDER BY anon_1.ts
-- ;
-- per database
-- EXPLAIN (ANALYZE, BUFFERS)
SELECT
    anon_1.ts,
    SUM(anon_1.calls) / Greatest(Extract(epoch FROM anon_1.mesure_interval), 1) AS calls_per_sec
FROM (
    SELECT
        ts,
        Greatest(ts - Lag(ts) over (ORDER BY ts), '0 s') AS mesure_interval,
        Greatest(calls - Lag(calls) over (ORDER BY ts), 0) AS calls
    FROM (
        SELECT *
        FROM (
            SELECT row_number() OVER ( ORDER BY ts ) AS NUMBER,
                   count(*) OVER ( PARTITION BY 1 ) AS total,
                   *
            FROM (
                SELECT (unnested.records).* AS record
                FROM (
                    SELECT psh.dbid, psh.coalesce_range, unnest(records) AS records
                    FROM statements.statements_history_db psh
                    WHERE coalesce_range && tstzrange(now() - interval '300 minutes', now(),'[]')
                        AND datname = 'mydatabase'
                        AND agent_address = '0.0.0.0'
                        AND agent_port = 2345
                ) AS unnested
                WHERE tstzrange((records).ts, (records).ts, '[]') <@ tstzrange(now() - interval '300 minutes', now(), '[]')
                UNION ALL
                select
                    (record).*
                from statements.statements_history_current_db AS shc
                WHERE tstzrange((record).ts, (record).ts, '[]') <@ tstzrange(now() - interval '300 minutes', now(), '[]')
                    AND datname = 'mydatabase'
                    AND agent_address = '0.0.0.0'
                    AND agent_port = 2345
                -- GROUP BY (record).ts
                -- ORDER by (record).ts
            ) as foo
        ) AS sh
        WHERE NUMBER % ( int8larger((total)/(50 +1),1) ) = 0
    ) AS base
) AS anon_1
WHERE TRUE
GROUP BY anon_1.ts, anon_1.mesure_interval
ORDER BY anon_1.ts
;
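-- Down-sampling note (illustrative, not part of the original query): the
-- "NUMBER % int8larger(total / (50 + 1), 1) = 0" filter above keeps roughly one
-- row out of every total/51, i.e. about 50-60 evenly spaced points. A minimal
-- standalone sketch of the same technique on a plain one-minute series:
-- SELECT ts
-- FROM (
--     SELECT ts,
--            row_number() OVER (ORDER BY ts) AS num,
--            count(*) OVER () AS total
--     FROM generate_series(now() - interval '300 minutes', now(), '1 minute') AS g(ts)
-- ) AS s
-- WHERE num % int8larger(total / (50 + 1), 1) = 0;  -- ~60 of the 301 rows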
--
-- Data insertion
-- Take the most recent data found in the statements tables and duplicate it
-- into the past
--
SET search_path TO statements, public;
WITH last_ts AS (
select (record).ts
from statements.statements_history_current
group by (record).ts
order by (record).ts desc
limit 1
)
DELETE FROM statements_history_current
WHERE (record).ts NOT IN (SELECT * FROM last_ts);
WITH last_ts AS (
select (record).ts
from statements.statements_history_current_db
group by (record).ts
order by (record).ts desc
limit 1
)
DELETE FROM statements_history_current_db
WHERE (record).ts NOT IN (SELECT * FROM last_ts);
TRUNCATE statements_src_tmp;
VACUUM ANALYZE;
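-- Illustrative check (not in the original script): after the DELETEs above,
-- each _current table should only keep its most recent snapshot timestamp.
-- SELECT count(DISTINCT (record).ts) FROM statements_history_current;     -- expected: 1
-- SELECT count(DISTINCT (record).ts) FROM statements_history_current_db;  -- expected: 1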
-- EXPLAIN ANALYZE
-- WITH _last AS (
-- select *, (record).*
-- from statements.statements_history_current
-- )
-- INSERT INTO statements_src_tmp
-- SELECT
-- _last.agent_address,
-- _last.agent_port,
-- _last.ts - interval '1 minute',
-- _last.userid,
-- '',
-- _last.dbid,
-- '',
-- _last.queryid,
-- '',
-- 2,
-- _last.total_time,
-- _last.rows,
-- _last.shared_blks_hit,
-- _last.shared_blks_read,
-- _last.shared_blks_dirtied,
-- _last.shared_blks_written,
-- _last.local_blks_hit,
-- _last.local_blks_read,
-- _last.local_blks_dirtied,
-- _last.local_blks_written,
-- _last.temp_blks_read,
-- _last.temp_blks_written,
-- _last.blk_read_time,
-- _last.blk_write_time
-- FROM _last;
DROP TABLE IF EXISTS statements.statements_history_current_copy;
CREATE TABLE statements.statements_history_current_copy
AS SELECT * FROM statements.statements_history_current;
DROP TABLE IF EXISTS statements.statements_history_current_db_copy;
CREATE TABLE statements.statements_history_current_db_copy
AS SELECT * FROM statements.statements_history_current_db;
DROP PROCEDURE IF EXISTS duplicate;
CREATE PROCEDURE duplicate() AS $PROC$
DECLARE
num_loops integer := 60 * 24 * 100;
BEGIN
FOR i IN 1..num_loops LOOP
IF i % 100 = 0 THEN
RAISE NOTICE '% %', i, num_loops;
COMMIT;
END IF;
WITH _last AS (
select s.*, (record).*
from statements.statements_history_current_copy shc
JOIN statements.statements s
ON s.queryid = shc.queryid
AND s.dbid = shc.dbid
AND s.userid = shc.userid
)
INSERT INTO statements_history_current
(
SELECT
_last.agent_address,
_last.agent_port,
_last.queryid,
_last.dbid,
_last.userid,
ROW(
_last.ts - i * interval '1 minute',
_last.calls - i,
_last.total_time,
_last.rows,
_last.shared_blks_hit,
_last.shared_blks_read,
_last.shared_blks_dirtied,
_last.shared_blks_written,
_last.local_blks_hit,
_last.local_blks_read,
_last.local_blks_dirtied,
_last.local_blks_written,
_last.temp_blks_read,
_last.temp_blks_written,
_last.blk_read_time,
_last.blk_write_time
)::statements.statements_history_record AS record
FROM _last
);
END LOOP;
FOR i IN 1..num_loops LOOP
IF i % 100 = 0 THEN
RAISE NOTICE '% %', i, num_loops;
COMMIT;
END IF;
WITH _last AS (
select *, (record).*
from statements.statements_history_current_db_copy
)
INSERT INTO statements_history_current_db
(
SELECT
_last.agent_address,
_last.agent_port,
_last.dbid,
_last.datname,
ROW(
_last.ts - i * interval '1 minute',
_last.calls - i,
_last.total_time,
_last.rows,
_last.shared_blks_hit,
_last.shared_blks_read,
_last.shared_blks_dirtied,
_last.shared_blks_written,
_last.local_blks_hit,
_last.local_blks_read,
_last.local_blks_dirtied,
_last.local_blks_written,
_last.temp_blks_read,
_last.temp_blks_written,
_last.blk_read_time,
_last.blk_write_time
)::statements.statements_history_record AS record
FROM _last
);
END LOOP;
RAISE NOTICE 'populated';
END;
$PROC$ LANGUAGE plpgsql; /* end of duplicate */
CALL duplicate();
-- SELECT process_statements('0.0.0.0', 2345);
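-- Illustrative check (not in the original script): after CALL duplicate(), the
-- per-query history should extend num_loops minutes into the past from the
-- snapshot that was copied.
-- SELECT min((record).ts), max((record).ts), count(*)
-- FROM statements.statements_history_current;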
DROP SCHEMA IF EXISTS application CASCADE;
CREATE SCHEMA application;
CREATE EXTENSION btree_gist;
SET search_path TO application, public;
CREATE OR REPLACE FUNCTION public.temboard_log (msg text) RETURNS void
LANGUAGE plpgsql
AS $_$
BEGIN
RAISE NOTICE '%', msg;
END;
$_$;
CREATE TABLE instances (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
agent_key TEXT,
hostname TEXT NOT NULL,
cpu INTEGER,
memory_size BIGINT,
pg_port INTEGER,
pg_version TEXT,
pg_version_summary TEXT,
pg_data TEXT,
notify BOOLEAN DEFAULT true,
PRIMARY KEY (agent_address, agent_port)
);
DROP SCHEMA IF EXISTS statements CASCADE;
CREATE SCHEMA statements;
SET search_path TO statements, public;
BEGIN;
CREATE TABLE metas(
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
coalesce_seq bigint NOT NULL default (1),
snapts timestamp with time zone NOT NULL default '-infinity'::timestamptz,
aggts timestamp with time zone NOT NULL default '-infinity'::timestamptz,
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port)
ON DELETE CASCADE
ON UPDATE CASCADE,
PRIMARY KEY (agent_address, agent_port)
);
CREATE TABLE statements (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
queryid BIGINT NOT NULL,
query TEXT NOT NULL,
dbid OID NOT NULL,
datname TEXT NOT NULL,
userid OID NOT NULL,
rolname TEXT NOT NULL,
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port)
ON DELETE CASCADE
ON UPDATE CASCADE,
PRIMARY KEY (agent_address, agent_port, queryid, dbid, userid)
);
CREATE TYPE statements_history_record AS (
ts TIMESTAMP WITH TIME ZONE,
calls BIGINT,
total_time DOUBLE PRECISION,
rows BIGINT,
shared_blks_hit BIGINT,
shared_blks_read BIGINT,
shared_blks_dirtied BIGINT,
shared_blks_written BIGINT,
local_blks_hit BIGINT,
local_blks_read BIGINT,
local_blks_dirtied BIGINT,
local_blks_written BIGINT,
temp_blks_read BIGINT,
temp_blks_written BIGINT,
blk_read_time DOUBLE PRECISION,
blk_write_time DOUBLE PRECISION
);
CREATE UNLOGGED TABLE statements_src_tmp (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
ts TIMESTAMP WITH TIME ZONE NOT NULL,
userid oid NOT NULL,
rolname TEXT NOT NULL,
dbid oid NOT NULL,
datname TEXT NOT NULL,
queryid BIGINT NOT NULL,
query TEXT NOT NULL,
calls BIGINT NOT NULL,
total_time DOUBLE PRECISION NOT NULL,
rows BIGINT NOT NULL,
shared_blks_hit BIGINT NOT NULL,
shared_blks_read BIGINT NOT NULL,
shared_blks_dirtied BIGINT NOT NULL,
shared_blks_written BIGINT NOT NULL,
local_blks_hit BIGINT NOT NULL,
local_blks_read BIGINT NOT NULL,
local_blks_dirtied BIGINT NOT NULL,
local_blks_written BIGINT NOT NULL,
temp_blks_read BIGINT NOT NULL,
temp_blks_written BIGINT NOT NULL,
blk_read_time DOUBLE PRECISION NOT NULL,
blk_write_time DOUBLE PRECISION NOT NULL
);
CREATE TABLE statements_history (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
queryid BIGINT NOT NULL,
dbid oid NOT NULL,
userid oid NOT NULL,
coalesce_range tstzrange NOT NULL,
records statements_history_record[] NOT NULL,
mins_in_range statements_history_record NOT NULL,
maxs_in_range statements_history_record NOT NULL,
FOREIGN KEY (agent_address, agent_port, queryid, dbid, userid) REFERENCES statements
ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX ON statements_history USING gist (agent_address, agent_port, queryid, coalesce_range);
CREATE TABLE statements_history_db (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
dbid oid NOT NULL,
coalesce_range tstzrange NOT NULL,
records statements_history_record[] NOT NULL,
mins_in_range statements_history_record NOT NULL,
maxs_in_range statements_history_record NOT NULL,
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port)
ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX ON statements_history_db USING gist (agent_address, agent_port, dbid, coalesce_range);
CREATE TABLE statements_history_current (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
queryid BIGINT NOT NULL,
dbid OID NOT NULL,
userid OID NOT NULL,
record statements_history_record NOT NULL,
FOREIGN KEY (agent_address, agent_port, queryid, dbid, userid) REFERENCES statements ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX ON statements_history_current (agent_address, agent_port, dbid, userid, queryid);
CREATE INDEX on statements_history_current USING GIST (tstzrange((record).ts, (record).ts, '[]'));
CREATE TABLE statements_history_current_db (
agent_address TEXT NOT NULL,
agent_port INTEGER NOT NULL,
dbid OID NOT NULL,
datname TEXT NOT NULL,
record statements_history_record NOT NULL,
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX ON statements_history_current_db (agent_address, agent_port, dbid);
CREATE INDEX ON statements_history_current_db (agent_address, agent_port);
CREATE INDEX on statements_history_current_db USING GIST (tstzrange((record).ts, (record).ts, '[]'));
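-- Illustrative note (not part of the original schema): the functional GiST
-- indexes above are what the chart queries earlier in this gist rely on for
-- their range-containment filters, e.g.:
-- SELECT count(*)
-- FROM statements_history_current_db
-- WHERE tstzrange((record).ts, (record).ts, '[]')
--       <@ tstzrange(now() - interval '1 hour', now(), '[]');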
CREATE OR REPLACE FUNCTION statements_aggregate(_address text, _port integer)
RETURNS void AS $PROC$
DECLARE
v_funcname text := 'statements_aggregate(' || _address || ':' || _port || ')';
v_rowcount bigint;
BEGIN
-- PERFORM temboard_log(format('running %I', v_funcname));
-- PERFORM prevent_concurrent_snapshot(_srvid);
-- aggregate statements table
INSERT INTO statements_history
SELECT agent_address, agent_port, queryid, dbid, userid,
tstzrange(min((record).ts), max((record).ts),'[]'),
array_agg(record),
ROW(min((record).ts),
min((record).calls),min((record).total_time),min((record).rows),
min((record).shared_blks_hit),min((record).shared_blks_read),
min((record).shared_blks_dirtied),min((record).shared_blks_written),
min((record).local_blks_hit),min((record).local_blks_read),
min((record).local_blks_dirtied),min((record).local_blks_written),
min((record).temp_blks_read),min((record).temp_blks_written),
min((record).blk_read_time),min((record).blk_write_time))::statements_history_record,
ROW(max((record).ts),
max((record).calls),max((record).total_time),max((record).rows),
max((record).shared_blks_hit),max((record).shared_blks_read),
max((record).shared_blks_dirtied),max((record).shared_blks_written),
max((record).local_blks_hit),max((record).local_blks_read),
max((record).local_blks_dirtied),max((record).local_blks_written),
max((record).temp_blks_read),max((record).temp_blks_written),
max((record).blk_read_time),max((record).blk_write_time))::statements_history_record
FROM statements_history_current
WHERE agent_address = _address AND agent_port = _port
GROUP BY agent_address, agent_port, queryid, dbid, userid;
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
-- perform temboard_log(format('%I (statements_history) - rowcount: %s',
-- v_funcname, v_rowcount));
DELETE FROM statements_history_current
WHERE agent_address = _address AND agent_port = _port;
-- aggregate db table
INSERT INTO statements_history_db
SELECT agent_address, agent_port, dbid,
tstzrange(min((record).ts), max((record).ts),'[]'),
array_agg(record),
ROW(min((record).ts),
min((record).calls),min((record).total_time),min((record).rows),
min((record).shared_blks_hit),min((record).shared_blks_read),
min((record).shared_blks_dirtied),min((record).shared_blks_written),
min((record).local_blks_hit),min((record).local_blks_read),
min((record).local_blks_dirtied),min((record).local_blks_written),
min((record).temp_blks_read),min((record).temp_blks_written),
min((record).blk_read_time),min((record).blk_write_time))::statements_history_record,
ROW(max((record).ts),
max((record).calls),max((record).total_time),max((record).rows),
max((record).shared_blks_hit),max((record).shared_blks_read),
max((record).shared_blks_dirtied),max((record).shared_blks_written),
max((record).local_blks_hit),max((record).local_blks_read),
max((record).local_blks_dirtied),max((record).local_blks_written),
max((record).temp_blks_read),max((record).temp_blks_written),
max((record).blk_read_time),max((record).blk_write_time))::statements_history_record
FROM statements_history_current_db
WHERE agent_address = _address AND agent_port = _port
GROUP BY agent_address, agent_port, dbid;
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
-- perform temboard_log(format('%I (statements_history_db) - rowcount: %s',
-- v_funcname, v_rowcount));
DELETE FROM statements_history_current_db
WHERE agent_address = _address AND agent_port = _port;
END;
$PROC$ LANGUAGE plpgsql; /* end of statements_aggregate */
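-- Illustrative usage (the original flow calls this from process_statements
-- below): fold the pending per-query and per-database rows for one agent into
-- the coalesced history tables.
-- SELECT statements_aggregate('0.0.0.0', 2345);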
CREATE OR REPLACE FUNCTION process_statements(_address text, _port integer) RETURNS void AS $PROC$
DECLARE
v_rowcount bigint;
v_coalesce integer := 100;
purge_seq bigint;
BEGIN
-- In this function, we process statements that have just been retrieved
-- from the agent, and also aggregate counters by database
-- PERFORM temboard_log('start of process_statements(' || _address || ':' || _port || ')');
-- Create a new meta row for the agent if one doesn't already exist
INSERT INTO metas (agent_address, agent_port) VALUES (_address, _port)
ON CONFLICT DO NOTHING;
-- Update meta with info from the current process (snapshot)
UPDATE metas
SET coalesce_seq = coalesce_seq + 1,
snapts = now()
WHERE agent_address = _address AND agent_port = _port
RETURNING coalesce_seq INTO purge_seq;
-- PERFORM temboard_log(format('coalesce_seq(%s:%s): %s', _address, _port, purge_seq));
-- Lock the tables to prevent multiple snapshots from running at the same
-- time and inserting data from the statements_src_tmp table several times,
-- which would lead to incoherent data.
-- Use NOWAIT to avoid waiting for the lock to be released.
LOCK TABLE statements.statements, statements.statements_history_current, statements.statements_history_current_db
IN SHARE MODE NOWAIT;
WITH capture AS (
SELECT *
FROM statements.statements_src_tmp
WHERE agent_address = _address AND agent_port = _port
),
missing_statements AS (
INSERT INTO statements.statements (agent_address, agent_port, queryid, query, dbid, datname, userid, rolname)
SELECT _address, _port, queryid, query, dbid, datname, userid, rolname
FROM capture c
ON CONFLICT DO NOTHING
),
by_query AS (
INSERT INTO statements.statements_history_current
SELECT _address, _port, queryid, dbid, userid,
ROW(
ts, calls, total_time, rows, shared_blks_hit, shared_blks_read,
shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read,
local_blks_dirtied, local_blks_written, temp_blks_read, temp_blks_written,
blk_read_time, blk_write_time
)::statements.statements_history_record AS record
FROM capture
),
by_database AS (
INSERT INTO statements.statements_history_current_db
SELECT _address, _port, dbid, datname,
ROW(
ts, sum(calls), sum(total_time), sum(rows), sum(shared_blks_hit), sum(shared_blks_read),
sum(shared_blks_dirtied), sum(shared_blks_written), sum(local_blks_hit), sum(local_blks_read),
sum(local_blks_dirtied), sum(local_blks_written), sum(temp_blks_read), sum(temp_blks_written),
sum(blk_read_time), sum(blk_write_time)
)::statements.statements_history_record AS record
FROM capture
GROUP BY dbid, datname, ts
)
SELECT count(*) INTO v_rowcount
FROM capture;
-- Coalesce data if needed
IF ( (purge_seq % v_coalesce ) = 0 )
THEN
PERFORM temboard_log(
format('coalesce needed, agent: %s:%s - seq: %s - coalesce seq: %s',
_address, _port, purge_seq, v_coalesce ));
UPDATE metas
SET aggts = now()
WHERE agent_address = _address AND agent_port = _port;
-- PERFORM temboard_log(format('aggregating: %s:%s', _address, _port));
EXECUTE format('SELECT statements_aggregate(''%s'', %s)', _address, _port);
END IF;
-- PERFORM temboard_log(format('%s statements were added', v_rowcount));
DELETE FROM statements.statements_src_tmp WHERE agent_address = _address AND agent_port = _port;
END;
$PROC$ language plpgsql; /* end of process_statements */
COMMIT;
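-- Minimal illustrative snapshot cycle (hypothetical values, not part of the
-- original script): the agent fills statements_src_tmp with the contents of
-- pg_stat_statements, then the server-side processing is triggered.
-- INSERT INTO statements.statements_src_tmp
-- VALUES ('0.0.0.0', 2345, now(), 10, 'postgres', 16384, 'mydb', 42,
--         'SELECT $1', 1, 0.5, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
-- SELECT statements.process_statements('0.0.0.0', 2345);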
--
-- Data insertion
--
SET search_path TO statements, public;
INSERT INTO application.instances (agent_address, agent_port, hostname)
VALUES ('0.0.0.0', 2345, 'instance.fqdn');
TRUNCATE statements_src_tmp;
TRUNCATE statements_history_current;
TRUNCATE statements_history_current_db;
TRUNCATE statements_history;
TRUNCATE statements_history_db;
TRUNCATE statements CASCADE;
CREATE OR REPLACE FUNCTION random_phrase() RETURNS text AS
$$
declare
result text := '';
begin
with symbols(characters) as (
VALUES ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789')
)
SELECT string_agg(substr(word, (random() * 8 + 1 )::integer) , ' ') as phrase
INTO result
FROM (
select string_agg(substr(characters, (random() * length(characters) + 1) :: INTEGER, 1), '') as word
from symbols
join generate_series(1, 10) as word(chr_idx) on 1 = 1 -- word length
join generate_series(1,100) as words(idx) on 1 = 1 -- # of words
group by idx
) as foo;
return result;
end;
$$ language plpgsql;
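-- Illustrative check (not part of the original script): random_phrase() returns
-- a space-separated phrase of 100 pseudo-random words, used as fake query text.
-- SELECT random_phrase();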
DROP PROCEDURE IF EXISTS populate;
CREATE PROCEDURE populate() AS $PROC$
DECLARE
num_statements integer := 100;
num_loops integer := 60;
_end timestamp = now();
_start timestamp := _end - interval '1 month';
BEGIN
FOR i IN 1..num_statements LOOP
INSERT INTO statements
VALUES ('0.0.0.0', 2345, i, random_phrase(), 16392, 'tpc', 1, 'toto');
END LOOP;
FOR i IN 1..num_statements LOOP
DROP SEQUENCE IF EXISTS calls_seq;
CREATE SEQUENCE calls_seq INCREMENT BY 6000;
INSERT INTO statements_history_current
VALUES (
'0.0.0.0',
2345,
i,
16392,
1,
ROW(
generate_series(_start, _end, '1 minute'),
nextval('calls_seq'),
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0
)::statements.statements_history_record
);
END LOOP;
DROP SEQUENCE IF EXISTS calls_seq;
CREATE SEQUENCE calls_seq INCREMENT BY 6000;
INSERT INTO statements_history_current_db
VALUES (
'0.0.0.0',
2345,
16392,
'tpc',
ROW(
generate_series(_start, _end, '1 minute'),
nextval('calls_seq') * num_statements,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0
)::statements.statements_history_record
);
END;
$PROC$ LANGUAGE plpgsql;
-- CALL populate();
CREATE OR REPLACE FUNCTION populate_src(_num_snapshots integer, datname text, dbid integer) RETURNS void AS $PROC$
DECLARE
_now timestamp := now();
ts timestamp;
num_statements integer := 500;
statements text[];
BEGIN
DROP SEQUENCE IF EXISTS calls_seq;
CREATE SEQUENCE calls_seq INCREMENT BY 60;
-- INSERT INTO statements_src_tmp
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 1, 'mydb', 1, _statement, nextval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1monthago, _1weekago, '1 minute');
SELECT array(select random_phrase() FROM generate_series(0, num_statements))
INTO statements;
FOR n IN REVERSE _num_snapshots..0 LOOP
PERFORM nextval('calls_seq');
IF (_num_snapshots - n) % 100 = 0 THEN
RAISE NOTICE 'snapshot % / %, calls %', _num_snapshots - n, _num_snapshots, currval('calls_seq');
RAISE NOTICE '%', clock_timestamp();
END IF;
ts = _now - (n || ' minutes')::INTERVAL;
FOR i IN 1..num_statements LOOP
INSERT INTO statements_src_tmp
VALUES (
'0.0.0.0',
2345,
ts,
1,
'toto',
dbid,
datname,
i,
statements[i],
currval('calls_seq'),
10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0
);
END LOOP;
PERFORM process_statements('0.0.0.0', 2345);
END LOOP;
-- INSERT INTO statements_src_tmp
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 1, 'mydb', 1, _statement, currval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1dayago + interval '1 minute', _now, '1 minute');
-- Same database, different queryid
-- DROP SEQUENCE calls_seq;
-- CREATE SEQUENCE calls_seq INCREMENT BY 6000;
-- INSERT INTO statements_src_tmp
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 1, 'mydb', 321, 'SELECT $1 + $2', nextval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1weekago, _now, '1 minute');
-- -- Different database, same queryid
-- DROP SEQUENCE calls_seq;
-- CREATE SEQUENCE calls_seq INCREMENT BY 6000;
-- INSERT INTO statements_src_tmp
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 123, 'other_db', 1, _statement, nextval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1weekago, _now, '1 minute');
RAISE NOTICE 'populated';
END;
$PROC$ LANGUAGE plpgsql; /* end of populate_src */
DROP INDEX statements_history_current_agent_address_agent_port_dbid_us_idx;
DROP INDEX statements_history_current_tstzrange_idx;
DROP INDEX statements_history_current_db_agent_address_agent_port_dbid_idx;
DROP INDEX statements_history_current_db_agent_address_agent_port_idx;
DROP INDEX statements_history_current_db_tstzrange_idx;
DROP INDEX statements_history_agent_address_agent_port_queryid_coalesc_idx;
DROP INDEX statements_history_db_agent_address_agent_port_dbid_coalesc_idx;
SELECT populate_src(60 * 24 * 4, 'new_database', 1); -- 4 days
-- SELECT populate_src((now() - interval '5 hours')::timestamp, (now() - interval '4 hour')::timestamp, 1, 'mydb');
-- SELECT process_statements('0.0.0.0', 2345);
-- SELECT populate_src((now() - interval '4 hours')::timestamp, (now() - interval '3 hour')::timestamp, 1, 'mydb');
-- SELECT process_statements('0.0.0.0', 2345);
-- SELECT populate_src((now() - interval '3 hours')::timestamp, (now() - interval '2 hour')::timestamp, 1, 'mydb');
-- SELECT process_statements('0.0.0.0', 2345);
-- SELECT populate_src((now() - interval '2 hours')::timestamp, (now() - interval '1 hour')::timestamp, 1, 'mydb');
-- SELECT process_statements('0.0.0.0', 2345);
-- DROP FUNCTION populate_src();
VACUUM ANALYZE;
SELECT count(*) from statements_history_current_db;
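-- Illustrative follow-up (not in the original script): the indexes dropped
-- before the bulk load could be recreated from the definitions in the schema
-- section above, for example:
-- CREATE INDEX ON statements_history_current (agent_address, agent_port, dbid, userid, queryid);
-- CREATE INDEX ON statements_history_current USING GIST (tstzrange((record).ts, (record).ts, '[]'));
-- CREATE INDEX ON statements_history_current_db (agent_address, agent_port, dbid);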
SET search_path TO statements, public;
TRUNCATE statements_src_tmp;
TRUNCATE statements_history_current;
TRUNCATE statements_history_current_db;
TRUNCATE statements_history;
TRUNCATE statements_history_db;
TRUNCATE statements CASCADE;
INSERT INTO application.instances (agent_address, agent_port, hostname)
VALUES ('0.0.0.0', 2345, 'instance.fqdn')
ON CONFLICT DO NOTHING;
CREATE OR REPLACE FUNCTION populate_src(_num_snapshots integer) RETURNS void AS $PROC$
DECLARE
ts timestamp;
num_statements integer := 500;
statements text[];
BEGIN
FOR n IN REVERSE _num_snapshots..0 LOOP
ts = clock_timestamp() - (n || ' minutes')::interval;
IF n % 100 = 0 THEN
RAISE NOTICE 'snapshot % / %', n, _num_snapshots;
RAISE NOTICE '%', ts;
END IF;
INSERT INTO statements_src_tmp
SELECT
'0.0.0.0',
2345,
ts,
userid,
rolname,
dbid,
datname,
queryid,
query,
calls,
total_time,
rows,
shared_blks_hit,
shared_blks_read,
shared_blks_dirtied,
shared_blks_written,
local_blks_hit,
local_blks_read,
local_blks_dirtied,
local_blks_written,
temp_blks_read,
temp_blks_written,
blk_read_time,
blk_write_time
FROM pg_stat_statements pgss
JOIN pg_authid ON pgss.userid = pg_authid.oid
JOIN pg_database ON pgss.dbid = pg_database.oid;
PERFORM process_statements('0.0.0.0', 2345);
END LOOP;
RAISE NOTICE 'populated';
END;
$PROC$ LANGUAGE plpgsql; /* end of populate_src */
-- SELECT populate_src(60 * 24 * 10, 'new_database', 1); -- 10 days
SELECT populate_src(60 * 24 * 1);