diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 62ae032..8042166 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -69,6 +69,7 @@ jobs: mv /tmp/.buildx-cache-new /tmp/.buildx-cache - name: Attest uses: actions/attest-build-provenance@v2 + if: ${{ github.event_name == 'push' }} with: subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} subject-digest: ${{ steps.build.outputs.digest }} diff --git a/src/server/http.ts b/src/server/http.ts index e7cf57e..43a0a7e 100644 --- a/src/server/http.ts +++ b/src/server/http.ts @@ -2,7 +2,7 @@ import { SpanStatusCode, trace } from "@opentelemetry/api"; import { PostgresSyncer } from "../sync/syncer.ts"; import { log } from "../log.ts"; import * as limiter from "./rate-limit.ts"; -import { SyncRequest } from "./sync.dto.ts"; +import { LiveQueryRequest, SyncRequest } from "./sync.dto.ts"; import { ZodError } from "zod/v4"; import { shutdownController } from "../shutdown.ts"; import { env } from "../env.ts"; @@ -157,6 +157,30 @@ async function onSync(req: Request) { return Response.json(result, { status: 200 }); } +async function onSyncLiveQuery(req: Request) { + let body: LiveQueryRequest; + try { + body = LiveQueryRequest.parse(await req.json()); + } catch (e: unknown) { + if (e instanceof ZodError) { + return Response.json( + { + kind: "error", + type: "invalid_body", + error: e.issues.map((issue) => issue.message).join("\n"), + }, + { status: 400 } + ); + } + throw e; + } + const queries = await syncer.liveQuery(body.db); + if (queries.kind !== "ok") { + return Response.json(queries, { status: 500 }); + } + return Response.json(queries, { status: 200 }); +} + export function createServer(hostname: string, port: number) { return Deno.serve( { hostname, port, signal: shutdownController.signal }, @@ -164,6 +188,18 @@ export function createServer(hostname: string, port: number) { const url = new URL(req.url); log.http(req); + if (req.method === "OPTIONS") { + return new Response("OK", { + status: 200, + headers: corsHeaders, + }); + } + if (url.pathname === "/") { + return Response.redirect( + "https://github.com/Query-Doctor/local-sync", + 307 + ); + } const limit = limiter.sync.check(url.pathname, info.remoteAddr.hostname); if (limit.limited) { return limiter.appendHeaders( @@ -171,28 +207,25 @@ export function createServer(hostname: string, port: number) { limit ); } - if (url.pathname === "/") { - return Response.redirect( - "https://github.com/Query-Doctor/local-sync", - 307 - ); + function handleResponse(res: Response) { + for (const [key, value] of Object.entries(corsHeaders)) { + res.headers.set(key, value); + } + limiter.appendHeaders(res, limit); + return res; } if (url.pathname === "/postgres/all") { - if (req.method === "OPTIONS") { - return new Response("OK", { - status: 200, - headers: corsHeaders, - }); - } if (req.method !== "POST") { return new Response("Method not allowed", { status: 405 }); } const res = await onSync(req); - for (const [key, value] of Object.entries(corsHeaders)) { - res.headers.set(key, value); + return handleResponse(res); + } else if (url.pathname === "/postgres/live") { + if (req.method !== "POST") { + return new Response("Method not allowed", { status: 405 }); } - limiter.appendHeaders(res, limit); - return res; + const res = await onSyncLiveQuery(req); + return handleResponse(res); } return new Response("Not found", { status: 404 }); } diff --git a/src/server/sync.dto.ts b/src/server/sync.dto.ts index 589b70f..f2f4948 100644 --- a/src/server/sync.dto.ts +++ b/src/server/sync.dto.ts @@ -1,6 +1,12 @@ import { z } from "zod/v4"; import { Connectable } from "../sync/connectable.ts"; +export const LiveQueryRequest = z.object({ + db: z.string().transform(Connectable.transform), +}); + +export type LiveQueryRequest = z.infer; + export const SyncRequest = z.object({ db: z.string().transform(Connectable.transform), seed: z.coerce.number().min(0).max(1).default(0), diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index ee614d9..5099ad8 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -436,9 +436,8 @@ ORDER BY JOIN pg_user ON pg_user.usesysid = pg_stat_statements.userid WHERE query not like '%pg_stat_statements%' and query not like '%@qd_introspection%' - and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin') - LIMIT 10; -- @qd_introspection - `; // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated + and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin'); -- @qd_introspection + `; // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated return { kind: "ok", queries: results, diff --git a/src/sync/syncer.ts b/src/sync/syncer.ts index f2a9030..d6b5ad1 100644 --- a/src/sync/syncer.ts +++ b/src/sync/syncer.ts @@ -7,7 +7,7 @@ import { } from "./dependency-tree.ts"; import { PostgresConnector, - RecentQuery, + RecentQueriesResult, type TableMetadata, } from "./pg-connector.ts"; import { PostgresSchemaLink } from "./schema.ts"; @@ -17,7 +17,7 @@ import { Connectable } from "./connectable.ts"; type SyncOptions = DependencyAnalyzerOptions; -type PostgresConnectionError = { +export type PostgresConnectionError = { kind: "error"; type: "postgres_connection_error"; error: Error; @@ -36,22 +36,6 @@ type PostgresSuperuserError = { export type SyncNotice = DependencyResolutionNotice | PostgresSuperuserError; -type RecentQueries = - | { - kind: "ok"; - results: RecentQuery[]; - } - | { - kind: "error"; - type: "postgres_error"; - error: string; - } - | { - kind: "error"; - type: "extension_not_installed"; - extensionName: string; - }; - export type SyncResult = | { kind: "ok"; @@ -60,7 +44,7 @@ export type SyncResult = setup: string; sampledRecords: Record; notices: SyncNotice[]; - queries: RecentQueries; + queries: RecentQueriesResult; metadata: TableMetadata[]; } | PostgresConnectionError @@ -165,28 +149,37 @@ export class PostgresSyncer { const wrapped = schema + serializedResult.serialized; - let queries: RecentQueries; - if (recentQueries.kind === "ok") { - queries = { - kind: "ok", - results: recentQueries.queries, - }; - } else { - queries = recentQueries; - } return { kind: "ok", versionNum: databaseInfo.serverVersionNum, version: databaseInfo.serverVersion, sampledRecords: serializedResult.sampledRecords, notices, - queries, + queries: recentQueries, setup: wrapped, metadata: serializedResult.schema, }; } + liveQuery( + connectable: Connectable + ): Promise { + const sql = this.getConnection(connectable); + const connector = new PostgresConnector(sql); + return connector.getRecentQueries(); + } + private async checkConnection(sql: postgres.Sql) { await sql`select 1`; } + + private getConnection(connectable: Connectable) { + const urlString = connectable.toString(); + let sql = this.connections.get(urlString); + if (!sql) { + sql = postgres(urlString); + this.connections.set(urlString, sql); + } + return sql; + } }