/* PolarDBforPostgreSQL/external/polar_stat_sql/polar_stat_sql.c */
/*-------------------------------------------------------------------------
* polar_stat_sql.c
* There are two parts: one is the collection of information about the kernel,
* and the other is the statistics about the execution plan nodes.
*
* Copyright (c) 2021, Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* IDENTIFICATION
* external/polar_stat_sql/polar_stat_sql.c
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/time.h>
#include <sys/resource.h>
#endif
#ifndef HAVE_GETRUSAGE
#include "rusagestub.h"
#endif
#include "access/hash.h"
#include "access/parallel.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/instrument.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "parser/analyze.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/polar_lock_stats.h"
#include "storage/polar_io_stat.h"
#include "storage/spin.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/polar_sql_time_stat.h"
/* POLAR */
#include "utils/polar_sql_time_stat.h"
PG_MODULE_MAGIC;
#define PGSS_DUMP_FILE "pg_stat/polar_stat_sql.stat"
typedef uint64 pgss_queryid;
#define USAGE_INCREASE (1.0)
#define USAGE_DECREASE_FACTOR (0.99) /* decreased every pgss_entry_dealloc */
#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */
#define USAGE_INIT (1.0) /* including initial planning */
/*
* ru_inblock block size is 512 bytes with Linux
* see http://lkml.indiana.edu/hypermail/linux/kernel/0703.2/0937.html
*/
#define RUSAGE_BLOCK_SIZE 512 /* Size of a block for getrusage() */
#define TIMEVAL_DIFF(start, end) (((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \
- ((double) start.tv_sec + (double) start.tv_usec / 1000000.0))
#define TIMESPEC_DIFF_IN_MS(t2, t1) ((t2 - t1)/1000.0)
#define LOCK_WAIT_TIMING(tag) \
TIMESPEC_DIFF_IN_MS(polar_lock_stat_local_summary[tag].wait_time, \
queryDesc->polar_sql_info->lock_stats_table_start[tag].wait_time);
/*
* Extension version number, for supporting older extension versions' objects
*/
typedef enum pgssVersion
{
PGSS_V1_0 = 0,
PGSS_V1_1,
} pgssVersion;
/*
 * Column positions of the tuple returned by the polar_stat_sql SRF.
 * Presumably this must match the column order of the extension's SQL-level
 * function/view definition — verify against the .sql install script before
 * reordering.  COL_POLAR_STAT_SQL_MAX is the total column count.
 */
typedef enum POLAR_SS_COL_DEF {
	COL_QUERY_ID = 0,
	COL_USER_ID,
	COL_DB_ID,
	/* getrusage()-derived counters */
	COL_READS,
	COL_WRITES,
	COL_USER_TIME,
	COL_SYSTEM_TIME,
	COL_MINFLTS,
	COL_MAJFLTS,
	COL_NSWAPS,
	COL_MSGSNDS,
	COL_MSGRCVS,
	COL_NSIGNALS,
	COL_NVCSWS,
	COL_NIVCSWS,
	/* plan-node statistics */
	COL_SCAN_ROWS,
	COL_SCAN_TIME,
	COL_SCAN_COUNT,
	COL_JOIN_ROWS,
	COL_JOIN_TIME,
	COL_JOIN_COUNT,
	COL_SORT_ROWS,
	COL_SORT_TIME,
	COL_SORT_COUNT,
	COL_GROUP_ROWS,
	COL_GROUP_TIME,
	COl_GROUP_COUNT,			/* NOTE: lowercase 'l' typo; kept because the
								 * identifier is referenced elsewhere in this
								 * file */
	COL_HASH_ROWS,
	COL_HASH_MEMORY,
	COL_HASH_COUNT,
	/* SQL life-cycle timing */
	COL_PARSE_TIME,
	COL_ANALYZE_TIME,
	COL_REWRITE_TIME,
	COL_PLAN_TIME,
	COL_EXECUTE_TIME,
	/* lock wait timing */
	COL_LWLOCK_WAIT,
	COL_REL_LOCK_WAIT,
	COL_XACT_LOCK_WAIT,
	COL_PAGE_LOCK_WAIT,
	COL_TUPLE_LOCK_WAIT,
	/* shared-storage I/O statistics */
	COL_SHARED_READ_PS,
	COL_SHARED_WRITE_PS,
	COL_SHARED_READ_THROUGHPUT,
	COL_SHARED_WRITE_THROUGHPUT,
	COL_SHARED_READ_LATENCY,
	COL_SHARED_WRITE_LATENCY,
	COL_IO_OPEN_NUM,
	COL_IO_SEEK_COUNT,
	COL_IO_OPEN_TIME,
	COL_IO_SEEK_TIME,
	COL_POLAR_STAT_SQL_MAX		/* total number of columns */
} POLAR_SS_COL_DEF;
static const uint32 PGSS_FILE_HEADER = 0x0d756e10;
/*
 * Current getrusage counters.
 *
 * For platform without getrusage support, we rely on postgres implementation
 * defined in rusagestub.h, which only supports user and system time.
 *
 * Note that the following counters are not maintained on GNU/Linux:
 * - ru_nswap
 * - ru_msgsnd
 * - ru_msgrcv
 * - ru_nsignals
 */
typedef struct pgssCounters
{
	double		usage;			/* usage factor (for LRU-style eviction) */
	float8		utime;			/* CPU user time */
	float8		stime;			/* CPU system time */
	int64		minflts;		/* page reclaims (soft page faults) */
	int64		majflts;		/* page faults (hard page faults) */
	int64		nswaps;			/* swaps */
	int64		reads;			/* Physical block reads */
	int64		writes;			/* Physical block writes */
	int64		msgsnds;		/* IPC messages sent */
	int64		msgrcvs;		/* IPC messages received */
	int64		nsignals;		/* signals received */
	int64		nvcsws;			/* voluntary context switches */
	int64		nivcsws;		/* involuntary context switches */
	/* plan-node statistics, accumulated from pgssPlanInfo */
	double		scan_rows;		/* scan rows */
	double		scan_time;		/* scan time */
	int64		scan_count;		/* scan count */
	double		join_rows;		/* join rows */
	double		join_time;		/* join time */
	int64		join_count;		/* join count */
	double		sort_rows;		/* sort rows */
	double		sort_time;		/* sort time */
	int64		sort_count;		/* sort count */
	double		group_rows;		/* group rows */
	double		group_time;		/* group time */
	int64		group_count;	/* group count */
	double		hash_rows;		/* hash rows */
	int64		hash_memory;	/* hash memory, unit kB */
	int64		hash_count;		/* hash count */
	/* SQL life-cycle timing (milliseconds) */
	double		parse_time;		/* total time for sql parse */
	double		analyze_time;	/* total time for sql analyze */
	double		rewrite_time;	/* total time for sql rewrite */
	double		plan_time;		/* total time for sql plan */
	double		execute_time;	/* total time for sql execute */
	/* lock wait timing (milliseconds) */
	double		lwlock_wait;	/* lwlock wait time */
	double		rel_lock_wait;	/* rel lock wait time */
	double		xact_lock_wait; /* xact lock wait time */
	double		page_lock_wait; /* page lock wait time */
	double		tuple_lock_wait;	/* tuple lock wait time */
	/*
	 * Shared-storage I/O deltas; accumulated from PolarCollectIoStat
	 * (see polar_collect_io_stat) — exact units defined by that module.
	 */
	int64		shared_read_ps;
	int64		shared_write_ps;
	int64		shared_read_throughput;
	int64		shared_write_throughput;
	double		shared_read_latency;
	double		shared_write_latency;
	int64		io_open_num;
	int64		io_seek_count;
	double		io_open_time;
	double		io_seek_time;
} pgssCounters;
/* max #queries to store. pg_stat_statements.max is used (see pgss_setmax) */
static int	pgss_max = 0;

/*
 * Hashtable key that defines the identity of a hashtable entry. We use the
 * same hash as pg_stat_statements
 */
typedef struct pgssHashKey
{
	Oid			userid;			/* user OID */
	Oid			dbid;			/* database OID */
	pgss_queryid queryid;		/* query identifier */
} pgssHashKey;

/*
 * Statistics per (user, database, query) triple — one shared hash entry.
 */
typedef struct pgssEntry
{
	pgssHashKey key;			/* hash key of entry - MUST BE FIRST */
	pgssCounters counters;		/* statistics for this query */
	slock_t		mutex;			/* protects the counters only */
} pgssEntry;

/*
 * Global shared state
 */
typedef struct pgssSharedState
{
	LWLock	   *lock;			/* protects hashtable search/modification */
} pgssSharedState;
/*
 * Per-query accumulator for plan-node statistics.  Filled in by
 * pgss_stat_plan_node() at ExecutorEnd time and merged into the shared
 * pgssEntry by pgss_entry_store().
 */
typedef struct pgssPlanInfo
{
	double		scan_rows;		/* scan rows */
	double		scan_time;		/* scan time */
	int64		scan_count;		/* scan count */
	double		join_rows;		/* join rows */
	double		join_time;		/* join time */
	int64		join_count;		/* join count */
	double		sort_rows;		/* sort rows */
	double		sort_time;		/* sort time */
	int64		sort_count;		/* sort count */
	double		group_rows;		/* group rows */
	double		group_time;		/* group time */
	int64		group_count;	/* group count */
	double		hash_rows;		/* hash rows */
	int64		hash_memory;	/* hash memory, unit kB */
	int64		hash_count;		/* hash count */
} pgssPlanInfo;
/*
 * QPS: shared per-statement-type counters.  Presumably incremented by
 * record_query_type() from the post-parse-analyze hook — the updater's body
 * is outside this chunk, so verify there before relying on semantics.
 */
typedef struct QueryTypeStat
{
	LWLock	   *lock;
	/* DQL */
	pg_atomic_uint64 select_count;
	/* DML */
	pg_atomic_uint64 update_count;
	pg_atomic_uint64 insert_count;
	pg_atomic_uint64 delete_count;
	pg_atomic_uint64 explain_count;
	pg_atomic_uint64 lock_count;
	/* DDL */
	pg_atomic_uint64 create_count;
	pg_atomic_uint64 drop_count;
	pg_atomic_uint64 rename_count;
	pg_atomic_uint64 truncate_count;
	pg_atomic_uint64 comment_count;
	pg_atomic_uint64 alter_count;
	/* DCL */
	pg_atomic_uint64 grant_count;
} QueryTypeStat;
static QueryTypeStat* query_type_stat = NULL;
static void add_various_tuple(Tuplestorestate *state, TupleDesc tdesc, char* sqltype, char* cmdtype,
const int arraylen, int64 count, Datum *values,
bool *nulls, int array_size);
static void record_query_type(Query *query);
static void pgsk_post_parse_analyze(ParseState *pstate, Query *query);
/* QPS end */
/* saved hook address in case of unload */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
static ExecutorRun_hook_type prev_ExecutorRun = NULL;
static ExecutorFinish_hook_type prev_ExecutorFinish = NULL;
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL;
static ProcessUtility_hook_type prev_ProcessUtility = NULL;
/* Links to shared memory state */
static pgssSharedState *pgss = NULL;
static HTAB *polar_stat_sql_hash = NULL;
/*--- Functions --- */
void _PG_init(void);
void _PG_fini(void);
extern PGDLLEXPORT Datum polar_stat_sql_reset(PG_FUNCTION_ARGS);
extern PGDLLEXPORT Datum polar_stat_sql(PG_FUNCTION_ARGS);
extern PGDLLEXPORT Datum polar_stat_sql_1_0(PG_FUNCTION_ARGS);
extern PGDLLEXPORT Datum polar_stat_sql_1_1(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(polar_stat_sql_reset);
PG_FUNCTION_INFO_V1(polar_stat_sql);
PG_FUNCTION_INFO_V1(polar_stat_sql_1_0);
PG_FUNCTION_INFO_V1(polar_stat_sql_1_1);
static void polar_stat_sql_internal(FunctionCallInfo fcinfo, pgssVersion
api_version);
static void pgss_setmax(void);
static Size pgss_memsize(void);
static void polar_stat_shmem_startup(void);
static void pgss_shmem_shutdown(int code, Datum arg);
static void polar_stat_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void polar_stat_ExecutorEnd(QueryDesc *queryDesc);
static void polar_stat_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once);
static void
polar_stat_ExecutorFinish(QueryDesc *queryDesc);
static void
polar_stat_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context,
ParamListInfo params, QueryEnvironment *queryEnv,
DestReceiver *dest, char *completionTag);
static pgssEntry *pgss_entry_alloc(pgssHashKey *key);
static void pgss_entry_dealloc(void);
static void pgss_entry_reset(void);
static void pgss_entry_store(pgss_queryid queryId, pgssCounters counters, pgssPlanInfo *plan_info, PolarCollectIoStat *io_stat_info, QueryDesc *queryDesc);
static uint32 polar_stat_sql_hash_fn(const void *key, Size keysize);
static int pgss_match_fn(const void *key1, const void *key2, Size keysize);
static void pgss_stat_plan_node(PlanState *planstate, pgssPlanInfo *planInfo);
static void pgss_stat_subplan_node(List *plans, pgssPlanInfo *planInfo);
static bool pgss_assign_linux_hz_check_hook(int *newval, void **extra, GucSource source);
static Size pgss_queryids_array_size(void);
static int pgss_linux_hz;
static bool polar_ss_enable_stat; /* whether to enable the extension */
static bool polar_ss_getuage; /* whether to enable getuage */
static bool polar_ss_gather_plan_info; /* whether to gather plan node information */
static bool polar_ss_plan_need_time; /* whether to gather plan node information need time*/
static bool polar_ss_save; /* whether to save stats across shutdown */
static bool polar_pgsk_qps_monitor; /* whether to monitor qps */
static double polar_ss_sample_rate; /* sample rate of sql */
static bool is_sql_sampled; /* sql level sampled */
static bool is_stat_enable; /* stat enable switch */
/* Current nesting depth of ExecutorRun+ProcessUtility calls */
static int nested_level = 0;
/*
 * Module load callback: register GUCs, reserve shared memory, and install
 * all executor/utility/parse hooks.  Must run during shared_preload_libraries
 * processing so shared-memory requests happen before postmaster startup
 * completes.
 */
void
_PG_init(void)
{
	if (!process_shared_preload_libraries_in_progress)
	{
		elog(ERROR, "This module can only be loaded via shared_preload_libraries");
		return;					/* unreachable: elog(ERROR) does not return */
	}

	DefineCustomIntVariable("polar_stat_sql.linux_hz",
							"Inform polar_stat_sql of the linux CONFIG_HZ config option",
							"This is used by polar_stat_sql to compensate for sampling errors "
							"in getrusage due to the kernel adhering to its ticks. The default value, -1, "
							"tries to guess it at startup. ",
							&pgss_linux_hz,
							-1,
							-1,
							INT_MAX,
							PGC_USERSET,
							0,
							pgss_assign_linux_hz_check_hook,
							NULL,
							NULL);

	/*
	 * POLAR: whether to enable the extension
	 */
	DefineCustomBoolVariable("polar_stat_sql.enable_stat",
							 "Whether to enable the extension",
							 NULL,
							 &polar_ss_enable_stat,
							 false,
							 PGC_SUSET,
							 0,
							 NULL,
							 NULL,
							 NULL);

	DefineCustomRealVariable("polar_stat_sql.sample_rate",
							 "Sampling rate. 1 means every query, 0.2 means 1 in five queries",
							 NULL,
							 &polar_ss_sample_rate,
							 0.5,
							 0.0,
							 1.0,
							 PGC_USERSET,
							 0,
							 NULL,
							 NULL,
							 NULL);

	/*
	 * POLAR: whether to enable getrusage
	 */
	DefineCustomBoolVariable("polar_stat_sql.enable_getrusage",
							 "Whether to enable getuage",
							 NULL,
							 &polar_ss_getuage,
							 false,
							 PGC_SUSET,
							 0,
							 NULL,
							 NULL,
							 NULL);

	/*
	 * POLAR: whether to gather plan nodes info switch
	 */
	DefineCustomBoolVariable("polar_stat_sql.enable_gather_plan_info",
							 "Whether to gather plan nodes info",
							 NULL,
							 &polar_ss_gather_plan_info,
							 false,
							 PGC_SUSET,
							 0,
							 NULL,
							 NULL,
							 NULL);

	/*
	 * POLAR: whether to gather plan nodes need time
	 */
	DefineCustomBoolVariable("polar_stat_sql.enable_plan_need_time",
							 "Whether to gather plan nodes need time information",
							 NULL,
							 &polar_ss_plan_need_time,
							 false,
							 PGC_SUSET,
							 0,
							 NULL,
							 NULL,
							 NULL);

	/*
	 * POLAR: Set the default value to be 'false' here.
	 */
	DefineCustomBoolVariable("polar_stat_sql.save",
							 "Save polar statistics for across server shutdowns.",
							 NULL,
							 &polar_ss_save,
							 false,
							 PGC_SIGHUP,
							 0,
							 NULL,
							 NULL,
							 NULL);

	/*
	 * POLAR: Set the default value to be 'false' here,
	 */
	DefineCustomBoolVariable("polar_stat_sql.enable_qps_monitor",
							 "Selects whether to monitor qps ",
							 NULL,
							 &polar_pgsk_qps_monitor,
							 false,
							 PGC_SUSET,
							 0,
							 NULL,
							 NULL,
							 NULL);

	EmitWarningsOnPlaceholders("polar_stat_sql");

	/* set pgss_max if needed */
	pgss_setmax();

	/* reserve shared memory and the tranche for the global access lock */
	RequestAddinShmemSpace(pgss_memsize());
	RequestNamedLWLockTranche("polar_stat_sql", 1);

	/* Install hook: save previous hooks so they can be chained/restored */
	prev_shmem_startup_hook = shmem_startup_hook;
	shmem_startup_hook = polar_stat_shmem_startup;
	prev_post_parse_analyze_hook = post_parse_analyze_hook;
	post_parse_analyze_hook = pgsk_post_parse_analyze;
	prev_ExecutorStart = ExecutorStart_hook;
	ExecutorStart_hook = polar_stat_ExecutorStart;
	prev_ExecutorRun = ExecutorRun_hook;
	ExecutorRun_hook = polar_stat_ExecutorRun;
	prev_ExecutorFinish = ExecutorFinish_hook;
	ExecutorFinish_hook = polar_stat_ExecutorFinish;
	prev_ExecutorEnd = ExecutorEnd_hook;
	ExecutorEnd_hook = polar_stat_ExecutorEnd;
	prev_ProcessUtility = ProcessUtility_hook;
	ProcessUtility_hook = polar_stat_ProcessUtility;
}
/*
 * Module unload callback: restore every hook replaced in _PG_init.
 */
void
_PG_fini(void)
{
	/* uninstall hook */
	ExecutorStart_hook = prev_ExecutorStart;
	ExecutorRun_hook = prev_ExecutorRun;
	ExecutorFinish_hook = prev_ExecutorFinish;
	ExecutorEnd_hook = prev_ExecutorEnd;
	shmem_startup_hook = prev_shmem_startup_hook;
	post_parse_analyze_hook = prev_post_parse_analyze_hook;
	ProcessUtility_hook = prev_ProcessUtility;
}
/*
 * GUC check hook for polar_stat_sql.linux_hz.
 *
 * When the value is -1 (the default), auto-detect the kernel tick rate:
 * busy-loop until getrusage() reports a change in user CPU time, then take
 * the reciprocal of the observed tick interval.  The detected value replaces
 * -1 in *newval.  Always accepts the setting.
 */
static bool
pgss_assign_linux_hz_check_hook(int *newval, void **extra, GucSource source)
{
	int			val = *newval;
	struct rusage myrusage;
	struct timeval previous_value;

	/* In that case we try to guess it */
	if (val == -1)
	{
		elog(LOG, "Auto detecting polar_stat_sql.linux_hz parameter...");
		getrusage(RUSAGE_SELF, &myrusage);
		previous_value = myrusage.ru_utime;

		/* spin (burning CPU on purpose) until ru_utime advances one tick */
		while (myrusage.ru_utime.tv_usec == previous_value.tv_usec &&
			   myrusage.ru_utime.tv_sec == previous_value.tv_sec)
		{
			getrusage(RUSAGE_SELF, &myrusage);
		}

		/* HZ = 1 / (tick length in seconds) */
		*newval = (int) (1 / ((myrusage.ru_utime.tv_sec - previous_value.tv_sec) +
							  (myrusage.ru_utime.tv_usec - previous_value.tv_usec) / 1000000.));
		elog(LOG, "polar_stat_sql.linux_hz is set to %d", *newval);
	}
	return true;
}
/*
 * shmem_startup hook: create or attach to the shared-memory state
 * (QueryTypeStat, pgssSharedState, and the stats hash table), and — on the
 * first initialization only — optionally reload statistics dumped by
 * pgss_shmem_shutdown at the previous clean shutdown.
 *
 * Fix vs. previous version: removed the `buffer` local, which was never
 * allocated and only dead-checked (`if (buffer) pfree(buffer)`) in the
 * error path.
 */
static void
polar_stat_shmem_startup(void)
{
	bool		found;
	HASHCTL		info;
	FILE	   *file;
	int			i;
	uint32		header;
	int32		num;

	if (prev_shmem_startup_hook)
		prev_shmem_startup_hook();

	/* reset in case this is a restart within the postmaster */
	pgss = NULL;
	query_type_stat = NULL;

	/* Create or attach to the shared memory state */
	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	query_type_stat = ShmemInitStruct("query_type_stat",
									  sizeof(QueryTypeStat),
									  &found);

	/* global access lock */
	pgss = ShmemInitStruct("polar_stat_sql",
						   sizeof(pgssSharedState),
						   &found);

	if (!found)
	{
		/* First time through: grab our named LWLock from the tranche */
		LWLockPadded *locks = GetNamedLWLockTranche("polar_stat_sql");

		pgss->lock = &(locks[0]).lock;
	}

	/* set pgss_max if needed */
	pgss_setmax();

	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(pgssHashKey);
	info.entrysize = sizeof(pgssEntry);
	info.hash = polar_stat_sql_hash_fn;
	info.match = pgss_match_fn;

	/* allocate stats shared memory hash */
	polar_stat_sql_hash = ShmemInitHash("polar_stat_sql hash",
										pgss_max, pgss_max,
										&info,
										HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);

	LWLockRelease(AddinShmemInitLock);

	/* only the postmaster/standalone backend registers the dump-on-exit hook */
	if (!IsUnderPostmaster)
		on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);

	/*
	 * Done if some other process already completed our initialization.
	 */
	if (found)
		return;

	/*
	 * If we were told not to load old statistics, we're done. (Note we do
	 * not try to unlink any old dump file in this case. This seems a bit
	 * questionable but it's the historical behavior.)
	 */
	if (!polar_ss_save)
		return;

	/* Load stat file, don't care about locking */
	file = AllocateFile(PGSS_DUMP_FILE, PG_BINARY_R);
	if (file == NULL)
	{
		if (errno == ENOENT)
			return;				/* ignore not-found error */
		goto error;
	}

	/* check if header is valid */
	if (fread(&header, sizeof(uint32), 1, file) != 1 ||
		header != PGSS_FILE_HEADER)
		goto error;

	/* get number of entries */
	if (fread(&num, sizeof(int32), 1, file) != 1)
		goto error;

	for (i = 0; i < num; i++)
	{
		pgssEntry	temp;
		pgssEntry  *entry;

		if (fread(&temp, sizeof(pgssEntry), 1, file) != 1)
			goto error;

		entry = pgss_entry_alloc(&temp.key);

		/* copy in the actual stats */
		entry->counters = temp.counters;
		/* don't initialize spinlock, already done */
	}

	FreeFile(file);

	/*
	 * Remove the file so it's not included in backups/replication slaves,
	 * etc. A new file will be written on next shutdown.
	 */
	unlink(PGSS_DUMP_FILE);
	return;

error:
	ereport(LOG,
			(errcode_for_file_access(),
			 errmsg("could not read polar_stat_sql file \"%s\": %m",
					PGSS_DUMP_FILE)));
	if (file)
		FreeFile(file);
	/* delete bogus file, don't care of errors in this case */
	unlink(PGSS_DUMP_FILE);
}
/*
 * shmem_shutdown hook: dump statistics into PGSS_DUMP_FILE so they can be
 * reloaded by polar_stat_shmem_startup after a clean restart.  Writes to a
 * ".tmp" file first, then renames it into place atomically.
 *
 * Fixes vs. previous version:
 *  - the error message claimed "could not read" (this path only *writes*)
 *    and named the final file rather than the temp file being written;
 *  - the error path unlinked PGSS_DUMP_FILE, destroying the previous good
 *    dump; it now removes only the incomplete ".tmp" file.
 */
static void
pgss_shmem_shutdown(int code, Datum arg)
{
	FILE	   *file;
	HASH_SEQ_STATUS hash_seq;
	int32		num_entries;
	pgssEntry  *entry;

	/* Don't try to dump during a crash. */
	if (code)
		return;

	/* Safety check: shared state may never have been initialized */
	if (!pgss)
		return;

	/* Don't dump if told not to. */
	if (!polar_ss_save)
		return;

	file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
	if (file == NULL)
		goto error;

	if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
		goto error;

	num_entries = hash_get_num_entries(polar_stat_sql_hash);
	if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
		goto error;

	hash_seq_init(&hash_seq, polar_stat_sql_hash);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
	{
		if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1)
		{
			/* note: we assume hash_seq_term won't change errno */
			hash_seq_term(&hash_seq);
			goto error;
		}
	}

	/* FreeFile flushes and closes; a failure here is a write failure too */
	if (FreeFile(file))
	{
		file = NULL;
		goto error;
	}

	/*
	 * Rename file into place, replacing any previous dump atomically.
	 */
	if (rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE) != 0)
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not rename polar_stat_sql file \"%s\": %m",
						PGSS_DUMP_FILE ".tmp")));

	return;

error:
	ereport(LOG,
			(errcode_for_file_access(),
			 errmsg("could not write polar_stat_sql file \"%s\": %m",
					PGSS_DUMP_FILE ".tmp")));
	if (file)
		FreeFile(file);
	/* remove the incomplete temp file; leave any previous dump intact */
	unlink(PGSS_DUMP_FILE ".tmp");
}
/*
 * Mirror pg_stat_statements.max into pgss_max, since we want to store the
 * same number of entries as pg_stat_statements.  A no-op once pgss_max has
 * already been set.
 */
static void pgss_setmax(void)
{
	const char *guc_name = "pg_stat_statements.max";
	const char *guc_value;

	if (pgss_max != 0)
		return;

	guc_value = GetConfigOption(guc_name, true, false);

	/*
	 * Lookup fails when pg_stat_statements was not loaded ahead of us in
	 * shared_preload_libraries; hint the user about the ordering.
	 */
	if (guc_value == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("unrecognized configuration parameter \"%s\"",
						guc_name),
				 errhint("make sure pg_stat_statements is loaded,\n"
						 "and make sure polar_stat_sql is present after pg_stat_statements"
						 " in the shared_preload_libraries setting")));

	pgss_max = atoi(guc_value);
}
/*
 * Total shared-memory request: the shared state struct, the stats hash
 * table, and the per-backend queryid array.
 */
static Size pgss_memsize(void)
{
	Size		bytes;

	Assert(pgss_max != 0);

	bytes = MAXALIGN(sizeof(pgssSharedState));
	bytes = add_size(bytes, hash_estimate_size(pgss_max, sizeof(pgssEntry)));
	bytes = add_size(bytes, MAXALIGN(pgss_queryids_array_size()));

	return bytes;
}
/*
 * Size of the per-backend queryid flag array.
 *
 * queryid isn't pushed to parallel workers, so we keep one slot per possible
 * BackendId: all regular connections, the autovacuum launcher plus its
 * workers, every background worker, and one extra slot because BackendId
 * numbering starts at 1.
 */
static Size
pgss_queryids_array_size(void)
{
	int			nslots = MaxConnections + autovacuum_max_workers + 1
						 + max_worker_processes + 1;

	return sizeof(bool) * nslots;
}
/*
 * support functions
 */

/*
 * Accumulate one finished query's statistics into the shared hash table,
 * keyed by (current user, current database, queryId).
 *
 * Locking protocol: take pgss->lock shared for the lookup; if the entry is
 * missing, release and re-acquire exclusive to create it, then update the
 * counters under the entry's own spinlock.
 */
static void
pgss_entry_store(pgss_queryid queryId, pgssCounters counters, pgssPlanInfo *plan_info,
				 PolarCollectIoStat *io_stat_info, QueryDesc *queryDesc)
{
	volatile pgssEntry *e;
	pgssHashKey key;
	pgssEntry  *entry;

	/* Safety check... */
	if (!pgss || !polar_stat_sql_hash)
		return;

	/* Set up key for hashtable search */
	key.userid = GetUserId();
	key.dbid = MyDatabaseId;
	key.queryid = queryId;

	/* Lookup the hash table entry with shared lock. */
	LWLockAcquire(pgss->lock, LW_SHARED);

	entry = (pgssEntry *) hash_search(polar_stat_sql_hash, &key, HASH_FIND, NULL);

	/* Create new entry, if not present */
	if (!entry)
	{
		/* Need exclusive lock to make a new hashtable entry - promote */
		LWLockRelease(pgss->lock);
		LWLockAcquire(pgss->lock, LW_EXCLUSIVE);

		/* OK to create a new hashtable entry */
		entry = pgss_entry_alloc(&key);
	}

	/*
	 * Grab the spinlock while updating the counters (see comment about
	 * locking rules at the head of the file)
	 */
	e = (volatile pgssEntry *) entry;
	SpinLockAcquire(&e->mutex);

	/* usage drives LRU eviction in pgss_entry_dealloc */
	e->counters.usage += USAGE_INCREASE;

	/* getrusage-derived deltas computed in polar_stat_ExecutorEnd */
	e->counters.utime += counters.utime;
	e->counters.stime += counters.stime;
	e->counters.minflts += counters.minflts;
	e->counters.majflts += counters.majflts;
	e->counters.nswaps += counters.nswaps;
	e->counters.reads += counters.reads;
	e->counters.writes += counters.writes;
	e->counters.msgsnds += counters.msgsnds;
	e->counters.msgrcvs += counters.msgrcvs;
	e->counters.nsignals += counters.nsignals;
	e->counters.nvcsws += counters.nvcsws;
	e->counters.nivcsws += counters.nivcsws;

	/* plan-node stats gathered by pgss_stat_plan_node */
	e->counters.scan_rows += plan_info->scan_rows;
	e->counters.scan_time += plan_info->scan_time;
	e->counters.scan_count += plan_info->scan_count;
	e->counters.join_rows += plan_info->join_rows;
	e->counters.join_time += plan_info->join_time;
	e->counters.join_count += plan_info->join_count;
	e->counters.sort_rows += plan_info->sort_rows;
	e->counters.sort_time += plan_info->sort_time;
	e->counters.sort_count += plan_info->sort_count;
	e->counters.group_rows += plan_info->group_rows;
	e->counters.group_time += plan_info->group_time;
	e->counters.group_count += plan_info->group_count;
	e->counters.hash_rows += plan_info->hash_rows;
	e->counters.hash_memory += plan_info->hash_memory;
	e->counters.hash_count += plan_info->hash_count;

	/* sql stat time (microseconds converted to milliseconds) */
	if (polar_enable_track_sql_time_stat)
	{
		e->counters.execute_time += polar_sql_time_stat_local_summary.execute_time / 1000.0;

		/*
		 * Only the top sql need to record the parse/analyze/rewrite/plan time.
		 */
		if (nested_level == 0)
		{
			e->counters.parse_time += polar_sql_time_stat_local_summary.parse_time / 1000.0;
			e->counters.analyze_time += polar_sql_time_stat_local_summary.analyze_time / 1000.0;
			e->counters.rewrite_time += polar_sql_time_stat_local_summary.rewrite_time / 1000.0;
			e->counters.plan_time += polar_sql_time_stat_local_summary.plan_time / 1000.0;

			/*
			 * When all the sql execute finish, need to init the global record.
			 */
			polar_init_sql_time_local_stats();
		}
	}

	/* lwlock timing: delta between summary now and the ExecutorStart snapshot */
	e->counters.lwlock_wait += TIMESPEC_DIFF_IN_MS(
		polar_lwlock_stat_local_summary.wait_time,
		queryDesc->polar_sql_info->lwlock_stat_start.wait_time);

	/* lock timing, grouped per locktag class */
	e->counters.rel_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_RELATION);
	e->counters.rel_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_RELATION_EXTEND);
	e->counters.xact_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_TRANSACTION);
	e->counters.xact_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_VIRTUALTRANSACTION);
	e->counters.xact_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_SPECULATIVE_TOKEN);
	e->counters.page_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_PAGE);
	e->counters.tuple_lock_wait += LOCK_WAIT_TIMING(LOCKTAG_TUPLE);

	/* per-query shared-storage I/O deltas */
	e->counters.shared_read_ps += io_stat_info->shared_read_ps;
	e->counters.shared_write_ps += io_stat_info->shared_write_ps;
	e->counters.shared_read_throughput += io_stat_info->shared_read_throughput;
	e->counters.shared_write_throughput += io_stat_info->shared_write_throughput;
	e->counters.shared_read_latency += io_stat_info->shared_read_latency;
	e->counters.shared_write_latency += io_stat_info->shared_write_latency;
	e->counters.io_open_num += io_stat_info->io_open_num;
	e->counters.io_seek_count += io_stat_info->io_seek_count;
	e->counters.io_open_time += io_stat_info->io_open_time;
	e->counters.io_seek_time += io_stat_info->io_seek_time;

	SpinLockRelease(&e->mutex);
	LWLockRelease(pgss->lock);
}
/*
 * Find or create a hashtable entry for the given key.
 * Caller must hold an exclusive lock on pgss->lock.
 */
static pgssEntry *pgss_entry_alloc(pgssHashKey *key)
{
	pgssEntry  *slot;
	bool		present;

	/* Evict least-used entries until there is room for one more. */
	while (hash_get_num_entries(polar_stat_sql_hash) >= pgss_max)
		pgss_entry_dealloc();

	/* Find or create an entry with desired hash code */
	slot = (pgssEntry *) hash_search(polar_stat_sql_hash, key, HASH_ENTER, &present);

	if (!present)
	{
		/* Fresh entry: zero the stats, seed the usage, set up the spinlock. */
		memset(&slot->counters, 0, sizeof(pgssCounters));
		slot->counters.usage = USAGE_INIT;
		/* re-initialize the mutex each time ... we assume no one using it */
		SpinLockInit(&slot->mutex);
	}

	return slot;
}
/*
* qsort comparator for sorting into increasing usage order
*/
static int
entry_cmp(const void *lhs, const void *rhs)
{
double l_usage = (*(pgssEntry *const *) lhs)->counters.usage;
double r_usage = (*(pgssEntry *const *) rhs)->counters.usage;
if (l_usage < r_usage)
return -1;
else if (l_usage > r_usage)
return +1;
else
return 0;
}
/*
 * Deallocate least used entries.
 * Caller must hold an exclusive lock on pgss->lock.
 *
 * Every entry's usage is decayed by USAGE_DECREASE_FACTOR during the scan;
 * then the USAGE_DEALLOC_PERCENT least-used entries (at least 10) are
 * removed from the hash.
 */
static void
pgss_entry_dealloc(void)
{
	HASH_SEQ_STATUS scan;
	pgssEntry **candidates;
	pgssEntry  *ent;
	int			ntotal = 0;
	int			ndrop;
	int			idx;

	candidates = palloc(hash_get_num_entries(polar_stat_sql_hash) * sizeof(pgssEntry *));

	hash_seq_init(&scan, polar_stat_sql_hash);
	while ((ent = hash_seq_search(&scan)) != NULL)
	{
		ent->counters.usage *= USAGE_DECREASE_FACTOR;
		candidates[ntotal++] = ent;
	}

	qsort(candidates, ntotal, sizeof(pgssEntry *), entry_cmp);

	ndrop = Max(10, ntotal * USAGE_DEALLOC_PERCENT / 100);
	ndrop = Min(ndrop, ntotal);

	for (idx = 0; idx < ndrop; idx++)
		hash_search(polar_stat_sql_hash, &candidates[idx]->key, HASH_REMOVE, NULL);

	pfree(candidates);
}
/*
 * Remove every entry from the stats hash, under an exclusive lock.
 */
static void pgss_entry_reset(void)
{
	HASH_SEQ_STATUS scan;
	pgssEntry  *ent;

	LWLockAcquire(pgss->lock, LW_EXCLUSIVE);

	hash_seq_init(&scan, polar_stat_sql_hash);
	while ((ent = hash_seq_search(&scan)) != NULL)
		hash_search(polar_stat_sql_hash, &ent->key, HASH_REMOVE, NULL);

	LWLockRelease(pgss->lock);
}
/*
 * Hooks
 */

/*
 * ExecutorStart hook: decide whether this query is sampled, allocate the
 * per-query collector, snapshot the starting rusage/lock/I-O counters into
 * queryDesc->polar_sql_info, and request per-node instrumentation when plan
 * stats are wanted.  polar_stat_ExecutorEnd diffs against these snapshots.
 */
static void
polar_stat_ExecutorStart (QueryDesc *queryDesc, int eflags)
{
	/* roll the dice once per query; sample_rate == 1.0 samples everything */
	is_sql_sampled = (random() <= (MAX_RANDOM_VALUE * polar_ss_sample_rate));
	/* latch the GUC so Start and End see a consistent value for this query */
	is_stat_enable = polar_ss_enable_stat;

	if (is_stat_enable && queryDesc->polar_sql_info == NULL)
	{
		/* palloc0 zeroes the collector, so unset snapshots read as zero */
		queryDesc->polar_sql_info = palloc0(sizeof(PolarStatSqlCollector));
	}

	if (is_stat_enable && is_sql_sampled) {
		/* capture kernel usage stats in rusage_start */
		if (polar_ss_getuage)
			getrusage(RUSAGE_SELF, &(queryDesc->polar_sql_info->rusage_start));

		if (polar_enable_track_sql_time_stat)
		{
			INSTR_TIME_SET_CURRENT(queryDesc->polar_sql_info->polar_execute_start);
		}

		/*
		 * Enable per-node instrumentation if plan execute info is required.
		 * We need set the instrument_options before the function standard_ExecutorStart
		 */
		if (queryDesc->plannedstmt->queryId != UINT64CONST(0)
			&& polar_ss_gather_plan_info)
		{
			queryDesc->instrument_options |= INSTRUMENT_POLAR_PLAN;
			if (polar_ss_plan_need_time)
				queryDesc->instrument_options |= INSTRUMENT_TIMER;
		}

		/* snapshot lwlock/lock/I-O counters so ExecutorEnd can diff them */
		queryDesc->polar_sql_info->lwlock_stat_start = polar_lwlock_stat_local_summary;
		memcpy(queryDesc->polar_sql_info->lock_stats_table_start, polar_lock_stat_local_summary,
			   sizeof(queryDesc->polar_sql_info->lock_stats_table_start));
		polar_collect_io_stat(&queryDesc->polar_sql_info->start_io_stat, MyBackendId);
	}

	/* give control back to PostgreSQL */
	if (prev_ExecutorStart)
		prev_ExecutorStart(queryDesc, eflags);
	else
		standard_ExecutorStart(queryDesc, eflags);
}
/*
 * ExecutorRun hook: all we need do is track nesting depth, so that
 * pgss_entry_store can attribute parse/analyze/rewrite/plan time only to
 * top-level statements (nested_level == 0).
 */
static void
polar_stat_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
					   bool execute_once)
{
	nested_level++;
	PG_TRY();
	{
		if (prev_ExecutorRun)
			prev_ExecutorRun(queryDesc, direction, count, execute_once);
		else
			standard_ExecutorRun(queryDesc, direction, count, execute_once);
		/* decremented on both the success and the error path */
		nested_level--;
	}
	PG_CATCH();
	{
		nested_level--;
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
 * ExecutorFinish hook: all we need do is track nesting depth
 * (see polar_stat_ExecutorRun).
 */
static void
polar_stat_ExecutorFinish(QueryDesc *queryDesc)
{
	nested_level++;
	PG_TRY();
	{
		if (prev_ExecutorFinish)
			prev_ExecutorFinish(queryDesc);
		else
			standard_ExecutorFinish(queryDesc);
		/* decremented on both the success and the error path */
		nested_level--;
	}
	PG_CATCH();
	{
		nested_level--;
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
 * ProcessUtility hook: chain to the previous hook (or the standard
 * implementation) while tracking nesting depth, exactly like the
 * executor hooks above.
 */
static void
polar_stat_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
						  ProcessUtilityContext context,
						  ParamListInfo params, QueryEnvironment *queryEnv,
						  DestReceiver *dest, char *completionTag)
{
	nested_level++;
	PG_TRY();
	{
		if (prev_ProcessUtility)
			prev_ProcessUtility(pstmt, queryString,
								context, params, queryEnv,
								dest, completionTag);
		else
			standard_ProcessUtility(pstmt, queryString,
									context, params, queryEnv,
									dest, completionTag);
		/* decremented on both the success and the error path */
		nested_level--;
	}
	PG_CATCH();
	{
		nested_level--;
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
 * ExecutorEnd hook: compute the per-query deltas (rusage, execute time,
 * plan-node stats, I/O counters) against the snapshots taken in
 * polar_stat_ExecutorStart, fold them into the shared hash via
 * pgss_entry_store, and release the per-query collector.
 *
 * Fix vs. previous version: with polar_ss_getuage disabled, the code read
 * an *uninitialized* rusage_end to compute deltas that were then discarded
 * by a memset — reads of indeterminate values.  rusage_end and counters are
 * now zero-initialized up front; net results are unchanged in both modes.
 */
static void
polar_stat_ExecutorEnd (QueryDesc *queryDesc)
{
	pgss_queryid queryId;
	struct rusage rusage_start;
	struct rusage rusage_end;
	pgssCounters counters;
	pgssPlanInfo planInfo = {0};
	PolarCollectIoStat start_io_stat = {0};
	PolarCollectIoStat end_io_stat = {0};
	PolarCollectIoStat interval_io_stat = {0};

	/* never compute from or publish indeterminate values */
	memset(&rusage_end, 0, sizeof(rusage_end));
	memset(&counters, 0, sizeof(counters));

	if (is_stat_enable && is_sql_sampled) {
		/* capture kernel usage stats in rusage_end */
		if (polar_ss_getuage)
			getrusage(RUSAGE_SELF, &rusage_end);

		if (polar_enable_track_sql_time_stat
			&& !(INSTR_TIME_IS_ZERO(queryDesc->polar_sql_info->polar_execute_start)))
		{
			instr_time	end;
			uint64		diff;

			INSTR_TIME_SET_CURRENT(end);
			diff = INSTR_TIME_GET_MICROSEC(end) - INSTR_TIME_GET_MICROSEC(queryDesc->polar_sql_info->polar_execute_start);
			polar_sql_time_stat_local_summary.execute_time = diff;
		}

		/* gather plan nodes information */
		if (polar_ss_gather_plan_info)
			pgss_stat_plan_node(queryDesc->planstate, &planInfo);

		queryId = queryDesc->plannedstmt->queryId;
		/* zeroed by palloc0 if getrusage collection was disabled at start */
		rusage_start = queryDesc->polar_sql_info->rusage_start;

		/* Compute CPU time delta */
		counters.utime = 1000.0 * TIMEVAL_DIFF(rusage_start.ru_utime, rusage_end.ru_utime);
		counters.stime = 1000.0 * TIMEVAL_DIFF(rusage_start.ru_stime, rusage_end.ru_stime);

		if (queryDesc->totaltime)
		{
			/* Make sure stats accumulation is done */
			InstrEndLoop(queryDesc->totaltime);

			/*
			 * We only consider values greater than 3 * linux tick, otherwise
			 * the bias is too big
			 */
			if (queryDesc->totaltime->total < (3. / pgss_linux_hz))
			{
				counters.stime = 0;
				counters.utime = 1000.0 * queryDesc->totaltime->total;
			}
		}

		/* Compute the rest of the counters */
		counters.minflts = rusage_end.ru_minflt - rusage_start.ru_minflt;
		counters.majflts = rusage_end.ru_majflt - rusage_start.ru_majflt;
		counters.nswaps = rusage_end.ru_nswap - rusage_start.ru_nswap;
		counters.reads = rusage_end.ru_inblock - rusage_start.ru_inblock;
		counters.writes = rusage_end.ru_oublock - rusage_start.ru_oublock;
		counters.msgsnds = rusage_end.ru_msgsnd - rusage_start.ru_msgsnd;
		counters.msgrcvs = rusage_end.ru_msgrcv - rusage_start.ru_msgrcv;
		counters.nsignals = rusage_end.ru_nsignals - rusage_start.ru_nsignals;
		counters.nvcsws = rusage_end.ru_nvcsw - rusage_start.ru_nvcsw;
		counters.nivcsws = rusage_end.ru_nivcsw - rusage_start.ru_nivcsw;

		/* If disable getuage, discard the (all-zero-based) rusage deltas */
		if (!polar_ss_getuage)
			memset(&counters, 0, sizeof(counters));

		/* per-query shared-storage I/O deltas */
		polar_collect_io_stat(&end_io_stat, MyBackendId);
		start_io_stat = queryDesc->polar_sql_info->start_io_stat;
		interval_io_stat.shared_read_ps = end_io_stat.shared_read_ps - start_io_stat.shared_read_ps;
		interval_io_stat.shared_write_ps = end_io_stat.shared_write_ps - start_io_stat.shared_write_ps;
		interval_io_stat.shared_read_throughput = end_io_stat.shared_read_throughput - start_io_stat.shared_read_throughput;
		interval_io_stat.shared_write_throughput = end_io_stat.shared_write_throughput - start_io_stat.shared_write_throughput;
		interval_io_stat.shared_read_latency = end_io_stat.shared_read_latency - start_io_stat.shared_read_latency;
		interval_io_stat.shared_write_latency = end_io_stat.shared_write_latency - start_io_stat.shared_write_latency;
		interval_io_stat.io_open_num = end_io_stat.io_open_num - start_io_stat.io_open_num;
		interval_io_stat.io_seek_count = end_io_stat.io_seek_count - start_io_stat.io_seek_count;
		interval_io_stat.io_open_time = end_io_stat.io_open_time - start_io_stat.io_open_time;
		interval_io_stat.io_seek_time = end_io_stat.io_seek_time - start_io_stat.io_seek_time;

		/* store current number of block reads and writes */
		pgss_entry_store(queryId, counters, &planInfo, &interval_io_stat, queryDesc);

		pfree(queryDesc->polar_sql_info);
		queryDesc->polar_sql_info = NULL;
	}

	/* give control back to PostgreSQL */
	if (prev_ExecutorEnd)
		prev_ExecutorEnd(queryDesc);
	else
		standard_ExecutorEnd(queryDesc);
}
/*
* get hash nodes memory information.
*/
/*
 * get_hash_memory_info
 *		Return the peak memory (in kB) used by a Hash node's hash table,
 *		merging instrumentation from parallel workers when present.
 *
 * planstate must be a HashState.  Returns 0 if no batch was ever built
 * (e.g. the join was never executed).
 */
static int64
get_hash_memory_info(PlanState *planstate)
{
	HashState  *hashstate = (HashState *) planstate;
	int64		spacePeakKb = 0;
	HashInstrumentation hinstrument = {0};

	/*
	 * In a parallel query, the leader process may or may not have run the
	 * hash join, and even if it did it may not have built a hash table due to
	 * timing (if it started late it might have seen no tuples in the outer
	 * relation and skipped building the hash table). Therefore we have to be
	 * prepared to get instrumentation data from all participants.
	 */
	if (hashstate->hashtable)
	{
		hinstrument.nbatch = hashstate->hashtable->nbatch;
		hinstrument.space_peak = hashstate->hashtable->spacePeak;
	}

	/*
	 * Merge results from workers. In the parallel-oblivious case, the
	 * results from all participants should be identical, except where
	 * participants didn't run the join at all so have no data. In the
	 * parallel-aware case, we need to consider all the results. Each worker
	 * may have seen a different subset of batches and we want to find the
	 * highest memory usage for any one batch across all batches.
	 */
	if (hashstate->shared_info)
	{
		SharedHashInfo *shared_info = hashstate->shared_info;
		int			i;

		for (i = 0; i < shared_info->num_workers; ++i)
		{
			/*
			 * worker_hi points into the in-place hinstrument[] array inside
			 * shared_info, so it can never be NULL; the former
			 * "if (!worker_hi) continue;" check was dead code and has been
			 * removed.  A worker that never ran the join simply reports
			 * nbatch == 0 and is skipped below.
			 */
			HashInstrumentation *worker_hi = &shared_info->hinstrument[i];

			if (worker_hi->nbatch > 0)
			{
				/*
				 * Normally every participant should agree on the number of
				 * batches too, but it's possible for a backend that started
				 * late and missed the whole join not to have the final nbatch
				 * number. So we'll take the largest number.
				 */
				hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);

				/*
				 * In a parallel-aware hash join, for now we report the
				 * maximum peak memory reported by any worker.
				 */
				hinstrument.space_peak =
					Max(hinstrument.space_peak, worker_hi->space_peak);
			}
		}
	}

	/* Round bytes up to whole kilobytes. */
	if (hinstrument.nbatch > 0)
		spacePeakKb = (hinstrument.space_peak + 1023) / 1024;

	return spacePeakKb;
}
/*
* Statistical execution plan node information
*/
/*
 * pgss_stat_plan_node
 *		Recursively walk a PlanState tree and accumulate per-node-class
 *		statistics (rows, elapsed time, node counts) into *planInfo.
 *
 * Nodes without instrumentation are ignored.  As of extension version 1.9
 * only scan, join, sort, group/agg and hash nodes are tracked; every other
 * node type is traversed but contributes nothing.
 */
static void
pgss_stat_plan_node(PlanState *planstate, pgssPlanInfo *planInfo)
{
	Plan	   *node;
	double		ntuples;
	double		total_ms;

	/* Nothing to record without a planstate and instrumentation data. */
	if (planstate == NULL || planstate->instrument == NULL)
		return;

	node = planstate->plan;

	/* Make sure stats accumulation is done before reading the counters. */
	InstrEndLoop(planstate->instrument);

	ntuples = planstate->instrument->ntuples;
	total_ms = 1000.0 * planstate->instrument->total;	/* seconds -> ms */

	switch (nodeTag(node))
	{
		case T_Scan:
		case T_SeqScan:
		case T_SampleScan:
		case T_IndexScan:
		case T_IndexOnlyScan:
		case T_BitmapIndexScan:
		case T_BitmapHeapScan:
		case T_TidScan:
		case T_SubqueryScan:
		case T_FunctionScan:
		case T_ValuesScan:
		case T_TableFuncScan:
		case T_CteScan:
		case T_NamedTuplestoreScan:
		case T_WorkTableScan:
		case T_ForeignScan:
		case T_CustomScan:
			planInfo->scan_rows += ntuples;
			planInfo->scan_time += total_ms;
			planInfo->scan_count += 1;
			break;

		case T_NestLoop:
		case T_MergeJoin:
		case T_HashJoin:
			planInfo->join_rows += ntuples;
			planInfo->join_time += total_ms;
			planInfo->join_count += 1;
			break;

		case T_Sort:
			planInfo->sort_rows += ntuples;
			planInfo->sort_time += total_ms;
			planInfo->sort_count += 1;
			break;

		case T_Group:
		case T_Agg:
			planInfo->group_rows += ntuples;
			planInfo->group_time += total_ms;
			planInfo->group_count += 1;
			break;

		case T_Hash:
			planInfo->hash_rows += ntuples;
			planInfo->hash_memory += get_hash_memory_info(planstate);	/* kB */
			planInfo->hash_count += 1;
			break;

		default:
			break;
	}

	/* Recurse into children in the standard order: initPlans first, then
	 * the outer (left) and inner (right) subtrees, then subPlans. */
	if (planstate->initPlan)
		pgss_stat_subplan_node(planstate->initPlan, planInfo);
	if (outerPlanState(planstate))
		pgss_stat_plan_node(outerPlanState(planstate), planInfo);
	if (innerPlanState(planstate))
		pgss_stat_plan_node(innerPlanState(planstate), planInfo);
	if (planstate->subPlan)
		pgss_stat_subplan_node(planstate->subPlan, planInfo);
}
/*
* Statistical execution sub plan node information
*/
/*
 * pgss_stat_subplan_node
 *		Accumulate plan-node statistics for every SubPlanState in a list
 *		by delegating each one's planstate to pgss_stat_plan_node.
 */
static void
pgss_stat_subplan_node(List *plans, pgssPlanInfo *planInfo)
{
	ListCell   *cell;

	foreach(cell, plans)
	{
		SubPlanState *substate = (SubPlanState *) lfirst(cell);

		pgss_stat_plan_node(substate->planstate, planInfo);
	}
}
/*
* Calculate hash value for a key
*/
/*
 * polar_stat_sql_hash_fn
 *		dynahash HASH_FUNCTION callback: hash a pgssHashKey by XOR-ing the
 *		hashes of its three components (userid, dbid, queryid).
 */
static uint32
polar_stat_sql_hash_fn(const void *key, Size keysize)
{
	const pgssHashKey *hkey = (const pgssHashKey *) key;
	uint32		h;

	h = hash_uint32((uint32) hkey->userid);
	h ^= hash_uint32((uint32) hkey->dbid);
	h ^= hash_uint32((uint32) hkey->queryid);

	return h;
}
/*
* Compare two keys - zero means match
*/
/*
 * pgss_match_fn
 *		dynahash HASH_COMPARE callback for pgssHashKey.
 *
 * Returns 0 when all three key fields match, 1 otherwise (memcmp-style
 * zero-means-equal convention).
 */
static int
pgss_match_fn(const void *key1, const void *key2, Size keysize)
{
	const pgssHashKey *a = (const pgssHashKey *) key1;
	const pgssHashKey *b = (const pgssHashKey *) key2;

	if (a->userid != b->userid)
		return 1;
	if (a->dbid != b->dbid)
		return 1;
	if (a->queryid != b->queryid)
		return 1;

	return 0;
}
/*
* Reset statistics.
*/
/*
 * polar_stat_sql_reset
 *		SQL-callable function: discard all accumulated statistics entries.
 *
 * Errors out if the extension's shared state was never created, i.e. the
 * library was not listed in shared_preload_libraries.
 */
PGDLLEXPORT Datum
polar_stat_sql_reset(PG_FUNCTION_ARGS)
{
	if (pgss == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("polar_stat_sql must be loaded via shared_preload_libraries")));

	pgss_entry_reset();

	PG_RETURN_VOID();
}
/*
 * polar_stat_sql
 *		Legacy SQL entry point; delegates to the 1.0 column layout.
 */
PGDLLEXPORT Datum
polar_stat_sql(PG_FUNCTION_ARGS)
{
	polar_stat_sql_internal(fcinfo, PGSS_V1_0);

	return (Datum) 0;
}
/*
 * polar_stat_sql_1_0
 *		SQL entry point for extension version 1.0 (base column set).
 */
PGDLLEXPORT Datum
polar_stat_sql_1_0(PG_FUNCTION_ARGS)
{
	polar_stat_sql_internal(fcinfo, PGSS_V1_0);

	return (Datum) 0;
}
/*
 * polar_stat_sql_1_1
 *		SQL entry point for extension version 1.1 (adds timing, lock-wait
 *		and shared-I/O columns on top of the 1.0 set).
 */
PGDLLEXPORT Datum
polar_stat_sql_1_1(PG_FUNCTION_ARGS)
{
	polar_stat_sql_internal(fcinfo, PGSS_V1_1);

	return (Datum) 0;
}
/*
 * polar_stat_sql_internal
 *		Workhorse for the polar_stat_sql SQL functions: materializes one
 *		output row per (queryid, userid, dbid) entry of the shared hash
 *		table into a tuplestore handed back through rsinfo.
 *
 * api_version controls how many columns are filled in: the extra columns
 * (parse/plan/execute timing, lock waits, shared I/O stats) are only
 * emitted for PGSS_V1_1 and later.
 */
static void
polar_stat_sql_internal(FunctionCallInfo fcinfo, pgssVersion api_version)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	HASH_SEQ_STATUS hash_seq;
	pgssEntry  *entry;

	/* Shared state only exists when preloaded at server start. */
	if (!pgss)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("polar_stat_sql must be loaded via shared_preload_libraries")));

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* Switch into long-lived context to construct returned data structures */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * Walk every hash entry under a shared LWLock (blocks entry add/remove
	 * but not counter updates, which are guarded per-entry below).
	 */
	LWLockAcquire(pgss->lock, LW_SHARED);
	hash_seq_init(&hash_seq, polar_stat_sql_hash);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
	{
		Datum		values[COL_POLAR_STAT_SQL_MAX];
		bool		nulls[COL_POLAR_STAT_SQL_MAX];
		pgssCounters tmp;
		int64		reads, writes;

		memset(values, 0, sizeof(values));
		memset(nulls, 0, sizeof(nulls));

		/* copy counters to a local variable to keep locking time short */
		{
			volatile pgssEntry *e = (volatile pgssEntry *) entry;

			SpinLockAcquire(&e->mutex);
			tmp = e->counters;
			SpinLockRelease(&e->mutex);
		}

		/* Key columns identifying the statement. */
		values[COL_QUERY_ID] = Int64GetDatum(entry->key.queryid);
		values[COL_USER_ID] = ObjectIdGetDatum(entry->key.userid);
		values[COL_DB_ID] = ObjectIdGetDatum(entry->key.dbid);

		/* getrusage block counts are scaled to bytes for output. */
		reads = tmp.reads * RUSAGE_BLOCK_SIZE;
		writes = tmp.writes * RUSAGE_BLOCK_SIZE;
		values[COL_READS] = Int64GetDatumFast(reads);
		values[COL_WRITES] = Int64GetDatumFast(writes);

		/* Remaining getrusage-derived counters, passed through as-is. */
		values[COL_USER_TIME] = Float8GetDatumFast(tmp.utime);
		values[COL_SYSTEM_TIME] = Float8GetDatumFast(tmp.stime);
		values[COL_MINFLTS] = Int64GetDatumFast(tmp.minflts);
		values[COL_MAJFLTS] = Int64GetDatumFast(tmp.majflts);
		values[COL_NSWAPS] = Int64GetDatumFast(tmp.nswaps);
		values[COL_MSGSNDS] = Int64GetDatumFast(tmp.msgsnds);
		values[COL_MSGRCVS] = Int64GetDatumFast(tmp.msgrcvs);
		values[COL_NSIGNALS] = Int64GetDatumFast(tmp.nsignals);
		values[COL_NVCSWS] = Int64GetDatumFast(tmp.nvcsws);
		values[COL_NIVCSWS] = Int64GetDatumFast(tmp.nivcsws);

		/* Per-plan-node-class statistics (see pgss_stat_plan_node). */
		values[COL_SCAN_ROWS] = Float8GetDatumFast(tmp.scan_rows);
		values[COL_SCAN_TIME] = Float8GetDatumFast(tmp.scan_time);
		values[COL_SCAN_COUNT] = Int64GetDatumFast(tmp.scan_count);
		values[COL_JOIN_ROWS] = Float8GetDatumFast(tmp.join_rows);
		values[COL_JOIN_TIME] = Float8GetDatumFast(tmp.join_time);
		values[COL_JOIN_COUNT] = Int64GetDatumFast(tmp.join_count);
		values[COL_SORT_ROWS] = Float8GetDatumFast(tmp.sort_rows);
		values[COL_SORT_TIME] = Float8GetDatumFast(tmp.sort_time);
		values[COL_SORT_COUNT] = Int64GetDatumFast(tmp.sort_count);
		values[COL_GROUP_ROWS] = Float8GetDatumFast(tmp.group_rows);
		values[COL_GROUP_TIME] = Float8GetDatumFast(tmp.group_time);
		/* NOTE(review): constant name uses lowercase 'l' ("COl_"); it is
		 * declared elsewhere with that spelling, so it must stay as-is. */
		values[COl_GROUP_COUNT] = Int64GetDatumFast(tmp.group_count);
		values[COL_HASH_ROWS] = Float8GetDatumFast(tmp.hash_rows);
		values[COL_HASH_MEMORY] = Int64GetDatumFast(tmp.hash_memory);
		values[COL_HASH_COUNT] = Int64GetDatumFast(tmp.hash_count);

		/* Columns added in extension version 1.1. */
		if (api_version >= PGSS_V1_1)
		{
			values[COL_PARSE_TIME] = Float8GetDatumFast(tmp.parse_time);
			values[COL_ANALYZE_TIME] = Float8GetDatumFast(tmp.analyze_time);
			values[COL_REWRITE_TIME] = Float8GetDatumFast(tmp.rewrite_time);
			values[COL_PLAN_TIME] = Float8GetDatumFast(tmp.plan_time);
			values[COL_EXECUTE_TIME] = Float8GetDatumFast(tmp.execute_time);
			values[COL_LWLOCK_WAIT] = Float8GetDatumFast(tmp.lwlock_wait);
			values[COL_REL_LOCK_WAIT] = Float8GetDatumFast(tmp.rel_lock_wait);
			values[COL_XACT_LOCK_WAIT] = Float8GetDatumFast(tmp.xact_lock_wait);
			values[COL_PAGE_LOCK_WAIT] = Float8GetDatumFast(tmp.page_lock_wait);
			values[COL_TUPLE_LOCK_WAIT] = Float8GetDatumFast(tmp.tuple_lock_wait);
			values[COL_SHARED_READ_PS] = Int64GetDatumFast(tmp.shared_read_ps);
			values[COL_SHARED_WRITE_PS] = Int64GetDatumFast(tmp.shared_write_ps);
			values[COL_SHARED_READ_THROUGHPUT] = Int64GetDatumFast(tmp.shared_read_throughput);
			values[COL_SHARED_WRITE_THROUGHPUT] = Int64GetDatumFast(tmp.shared_write_throughput);
			values[COL_SHARED_READ_LATENCY] = Float8GetDatumFast(tmp.shared_read_latency);
			values[COL_SHARED_WRITE_LATENCY] = Float8GetDatumFast(tmp.shared_write_latency);
			values[COL_IO_OPEN_NUM] = Int64GetDatumFast(tmp.io_open_num);
			values[COL_IO_SEEK_COUNT] = Int64GetDatumFast(tmp.io_seek_count);
			values[COL_IO_OPEN_TIME] = Float8GetDatumFast(tmp.io_open_time);
			values[COL_IO_SEEK_TIME] = Float8GetDatumFast(tmp.io_seek_time);
		}

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}
	LWLockRelease(pgss->lock);

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);
}
/*
 * pgsk_post_parse_analyze
 *		post_parse_analyze_hook: chain to any previously installed hook,
 *		then tally the analyzed query under its command type.
 */
static void
pgsk_post_parse_analyze(ParseState *pstate, Query *query)
{
	/* Give any earlier hook a chance to run first. */
	if (prev_post_parse_analyze_hook)
		prev_post_parse_analyze_hook(pstate, query);

	/* Bump the counter matching this query's command type. */
	record_query_type(query);
}
/*
* Add various query type to tuple.
*/
/*
 * add_various_tuple
 *		Append one (sql_type, cmd_type, count) row to the given tuplestore.
 *
 * values/nulls are caller-supplied scratch arrays of at least array_size
 * elements; they are zeroed here before being filled.  arraylen is only
 * used to assert we did not write past the caller's expectations.
 */
static void
add_various_tuple(Tuplestorestate *state, TupleDesc tdesc, char *sqltype, char *cmdtype,
				  const int arraylen, int64 count, Datum *values,
				  bool *nulls, int array_size)
{
	int			attno = 0;

	/* Start from a clean slate: all columns non-null, zero Datums. */
	MemSet(values, 0, sizeof(Datum) * array_size);
	MemSet(nulls, 0, sizeof(bool) * array_size);

	values[attno++] = CStringGetTextDatum(sqltype);
	values[attno++] = CStringGetTextDatum(cmdtype);
	values[attno++] = Int64GetDatumFast(count);
	Assert(attno <= arraylen);

	tuplestore_putvalues(state, tdesc, values, nulls);
}
/*
* Record call times of each query type
*/
/*
 * record_query_type
 *		Record call times of each query type.
 *
 * Increments the shared-memory atomic counter matching the analyzed
 * query's command type.  Does nothing unless the polar_pgsk_qps_monitor
 * GUC is enabled.  Utility statements are further classified by node tag;
 * statement kinds not listed below are deliberately not counted.
 */
static void
record_query_type(Query *query)
{
	if (polar_pgsk_qps_monitor)
	{
		switch (query->commandType)
		{
			case CMD_SELECT:
				pg_atomic_add_fetch_u64(&(query_type_stat->select_count), 1);
				break;
			case CMD_UPDATE:
				pg_atomic_add_fetch_u64(&(query_type_stat->update_count), 1);
				break;
			case CMD_INSERT:
				pg_atomic_add_fetch_u64(&(query_type_stat->insert_count), 1);
				break;
			case CMD_DELETE:
				pg_atomic_add_fetch_u64(&(query_type_stat->delete_count), 1);
				break;
			case CMD_UTILITY:
				/* Utility queries always carry the raw utility statement. */
				Assert(query->utilityStmt != NULL);
				switch (query->utilityStmt->type)
				{
					case T_ExplainStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->explain_count), 1);
						break;
					case T_CreateStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->create_count), 1);
						break;
					case T_DropStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->drop_count), 1);
						break;
					case T_RenameStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->rename_count), 1);
						break;
					case T_TruncateStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->truncate_count), 1);
						break;
					case T_CommentStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->comment_count), 1);
						break;
					case T_GrantStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->grant_count), 1);
						break;
					case T_LockStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->lock_count), 1);
						break;

					/* All ALTER-family statements share one counter. */
					case T_AlterTableStmt:
					case T_AlterTableCmd:
					case T_AlterDomainStmt:
					case T_AlterDefaultPrivilegesStmt:
					case T_AlterFunctionStmt:
					case T_AlterSeqStmt:
					case T_AlterRoleStmt:
					case T_AlterDatabaseStmt:
					case T_AlterDatabaseSetStmt:
					case T_AlterRoleSetStmt:
					case T_AlterOpFamilyStmt:
					case T_AlterObjectDependsStmt:
					case T_AlterObjectSchemaStmt:
					case T_AlterOwnerStmt:
					case T_AlterOperatorStmt:
					case T_AlterEnumStmt:
					case T_AlterTSDictionaryStmt:
					case T_AlterTSConfigurationStmt:
					case T_AlterFdwStmt:
					case T_AlterForeignServerStmt:
					case T_AlterUserMappingStmt:
					case T_AlterTableSpaceOptionsStmt:
					case T_AlterTableMoveAllStmt:
					case T_AlterExtensionStmt:
					case T_AlterExtensionContentsStmt:
					case T_AlterEventTrigStmt:
					case T_AlterSystemStmt:
					case T_AlterPolicyStmt:
					case T_AlterPublicationStmt:
					case T_AlterSubscriptionStmt:
					case T_AlterCollationStmt:
						pg_atomic_add_fetch_u64(&(query_type_stat->alter_count), 1);
						break;
					default:
						/* Other utility statements are not tracked. */
						break;
				}
				break;
			default:
				/* CMD_NOTHING, CMD_UNKNOWN etc. are not tracked. */
				break;
		}
	}
}
PG_FUNCTION_INFO_V1(polar_stat_query_count);
Datum
polar_stat_query_count(PG_FUNCTION_ARGS)
{
const int POLAR_QUERYTYPE_STAT_COLS = 3;
int index = 1;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
Datum values[POLAR_QUERYTYPE_STAT_COLS];
bool nulls[POLAR_QUERYTYPE_STAT_COLS];
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
tupdesc = CreateTemplateTupleDesc(POLAR_QUERYTYPE_STAT_COLS, false);
TupleDescInitEntry(tupdesc, (AttrNumber) index++, "sql_type",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) index++, "cmp_type",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) index++, "count",
INT8OID, -1, 0);
Assert(index - 1 <= POLAR_QUERYTYPE_STAT_COLS);
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/* DQL */
add_various_tuple(tupstore, tupdesc, "DQL", "SELECT", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->select_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
/* DML */
add_various_tuple(tupstore, tupdesc, "DML", "INSERT", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->insert_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DML", "UPDATE", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->update_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DML", "DELETE", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->delete_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DML", "EXPLAIN", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->explain_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DML", "LOCKTABLE", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->lock_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
/* DDL */
add_various_tuple(tupstore, tupdesc, "DDL", "CREATE", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->create_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DDL", "DROP", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->drop_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DDL", "RENAME", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->rename_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DDL", "TRUNCATE", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->truncate_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DDL", "COMMENT", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->comment_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
add_various_tuple(tupstore, tupdesc, "DDL", "ALTER", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->alter_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
/* DCL */
add_various_tuple(tupstore, tupdesc, "DCL", "GRANT", POLAR_QUERYTYPE_STAT_COLS,
pg_atomic_read_u64(&(query_type_stat->grant_count)),
values, nulls, POLAR_QUERYTYPE_STAT_COLS);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
return (Datum) 0;
}