Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/query-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import crypto from "node:crypto";
import { RawRecentQuery, RecentQuery } from "./sync/pg-connector.ts";

interface CacheEntry {
firstSeen: number;
lastSeen: number;
}

export class QueryCache {
list: Record<string, CacheEntry> = {};
private readonly createdAt: number;

constructor() {
this.createdAt = Date.now();
}

isCached(key: string): boolean {
const entry = this.list[key];
if (!entry) {
return false;
}
return true;
}

isNew(key: string): boolean {
const entry = this.list[key];
if (!entry) {
return true;
}
return entry.firstSeen >= this.createdAt;
}

store(db: string, query: string) {
const key = hash(db, query);
const now = Date.now();
if (this.list[key]) {
this.list[key].lastSeen = now;
} else {
this.list[key] = { firstSeen: now, lastSeen: now };
}
return key;
}

getFirstSeen(key: string): number {
return this.list[key]?.firstSeen || Date.now();
}

sync(db: string, queries: RawRecentQuery[]): RecentQuery[] {
return queries.map(query => {
const key = this.store(db, query.query);
return {
...query,
firstSeen: this.getFirstSeen(key)
};
});
}
}

export const queryCache = new QueryCache();

function hash(db: string, query: string): string {
return crypto.createHash("sha256").update(JSON.stringify([db, query])).digest("hex")
}
10 changes: 7 additions & 3 deletions src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export type SerializeResult = {
sampledRecords: Record<TableName, number>;
};

export type RecentQuery = {
export type RawRecentQuery = {
username: string;
query: string;
meanTime: number;
Expand All @@ -60,6 +60,10 @@ export type RecentQuery = {
topLevel: boolean;
};

export type RecentQuery = RawRecentQuery & {
firstSeen: number;
};

export type RecentQueriesError =
| {
kind: "error";
Expand All @@ -75,7 +79,7 @@ export type RecentQueriesError =
export type RecentQueriesResult =
| {
kind: "ok";
queries: RecentQuery[];
queries: RawRecentQuery[];
}
| RecentQueriesError;

Expand Down Expand Up @@ -449,7 +453,7 @@ ORDER BY

public async getRecentQueries(): Promise<RecentQueriesResult> {
try {
const results = await this.sql<RecentQuery[]>`
const results = await this.sql<RawRecentQuery[]>`
SELECT
pg_user.usename as "username",
query,
Expand Down
29 changes: 25 additions & 4 deletions src/sync/syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { PostgresSchemaLink } from "./schema.ts";
import { withSpan } from "../otel.ts";
import { SpanStatusCode } from "@opentelemetry/api";
import { Connectable } from "./connectable.ts";
import { queryCache } from "../query-cache.ts";

type SyncOptions = DependencyAnalyzerOptions;

Expand Down Expand Up @@ -113,6 +114,10 @@ export class PostgresSyncer {
})(),
]);

if (recentQueries.kind === "ok") {
recentQueries.queries = queryCache.sync(urlString, recentQueries.queries);
}

if (dependencies.kind !== "ok") {
return dependencies;
}
Expand Down Expand Up @@ -144,12 +149,28 @@ export class PostgresSyncer {
};
}

liveQuery(
async liveQuery(
connectable: Connectable
): Promise<RecentQueriesResult | PostgresConnectionError> {
const sql = this.getConnection(connectable);
const connector = new PostgresConnector(sql);
return connector.getRecentQueries();
try {
const urlString = connectable.toString();
const sql = this.getConnection(connectable);
const connector = new PostgresConnector(sql);

const queries = await connector.getRecentQueries();

if (queries.kind === "ok") {
queries.queries = queryCache.sync(urlString, queries.queries);
}

return queries;
} catch (error) {
return {
kind: "error",
type: "postgres_connection_error",
error: error instanceof Error ? error : new Error(String(error)),
};
}
}

private checkConnection(sql: postgres.Sql) {
Expand Down