Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
- name: Attest
uses: actions/attest-build-provenance@v2
if: ${{ github.event_name == 'push' }}
with:
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
subject-digest: ${{ steps.build.outputs.digest }}
Expand Down
65 changes: 49 additions & 16 deletions src/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SpanStatusCode, trace } from "@opentelemetry/api";
import { PostgresSyncer } from "../sync/syncer.ts";
import { log } from "../log.ts";
import * as limiter from "./rate-limit.ts";
import { SyncRequest } from "./sync.dto.ts";
import { LiveQueryRequest, SyncRequest } from "./sync.dto.ts";
import { ZodError } from "zod/v4";
import { shutdownController } from "../shutdown.ts";
import { env } from "../env.ts";
Expand Down Expand Up @@ -157,42 +157,75 @@ async function onSync(req: Request) {
return Response.json(result, { status: 200 });
}

async function onSyncLiveQuery(req: Request) {
let body: LiveQueryRequest;
try {
body = LiveQueryRequest.parse(await req.json());
} catch (e: unknown) {
if (e instanceof ZodError) {
return Response.json(
{
kind: "error",
type: "invalid_body",
error: e.issues.map((issue) => issue.message).join("\n"),
},
{ status: 400 }
);
}
throw e;
}
const queries = await syncer.liveQuery(body.db);
if (queries.kind !== "ok") {
return Response.json(queries, { status: 500 });
}
return Response.json(queries, { status: 200 });
}

export function createServer(hostname: string, port: number) {
return Deno.serve(
{ hostname, port, signal: shutdownController.signal },
async (req, info) => {
const url = new URL(req.url);
log.http(req);

if (req.method === "OPTIONS") {
return new Response("OK", {
status: 200,
headers: corsHeaders,
});
}
if (url.pathname === "/") {
return Response.redirect(
"https://github.com/Query-Doctor/local-sync",
307
);
}
const limit = limiter.sync.check(url.pathname, info.remoteAddr.hostname);
if (limit.limited) {
return limiter.appendHeaders(
new Response("Rate limit exceeded", { status: 429 }),
limit
);
}
if (url.pathname === "/") {
return Response.redirect(
"https://github.com/Query-Doctor/local-sync",
307
);
function handleResponse(res: Response) {
for (const [key, value] of Object.entries(corsHeaders)) {
res.headers.set(key, value);
}
limiter.appendHeaders(res, limit);
return res;
}
if (url.pathname === "/postgres/all") {
if (req.method === "OPTIONS") {
return new Response("OK", {
status: 200,
headers: corsHeaders,
});
}
if (req.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
const res = await onSync(req);
for (const [key, value] of Object.entries(corsHeaders)) {
res.headers.set(key, value);
return handleResponse(res);
} else if (url.pathname === "/postgres/live") {
if (req.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
limiter.appendHeaders(res, limit);
return res;
const res = await onSyncLiveQuery(req);
return handleResponse(res);
}
return new Response("Not found", { status: 404 });
}
Expand Down
6 changes: 6 additions & 0 deletions src/server/sync.dto.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { z } from "zod/v4";
import { Connectable } from "../sync/connectable.ts";

export const LiveQueryRequest = z.object({
db: z.string().transform(Connectable.transform),
});

export type LiveQueryRequest = z.infer<typeof LiveQueryRequest>;

export const SyncRequest = z.object({
db: z.string().transform(Connectable.transform),
seed: z.coerce.number().min(0).max(1).default(0),
Expand Down
5 changes: 2 additions & 3 deletions src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,8 @@ ORDER BY
JOIN pg_user ON pg_user.usesysid = pg_stat_statements.userid
WHERE query not like '%pg_stat_statements%'
and query not like '%@qd_introspection%'
and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin')
LIMIT 10; -- @qd_introspection
`; // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated
and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin'); -- @qd_introspection
`; // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated
return {
kind: "ok",
queries: results,
Expand Down
51 changes: 22 additions & 29 deletions src/sync/syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from "./dependency-tree.ts";
import {
PostgresConnector,
RecentQuery,
RecentQueriesResult,
type TableMetadata,
} from "./pg-connector.ts";
import { PostgresSchemaLink } from "./schema.ts";
Expand All @@ -17,7 +17,7 @@ import { Connectable } from "./connectable.ts";

type SyncOptions = DependencyAnalyzerOptions;

type PostgresConnectionError = {
export type PostgresConnectionError = {
kind: "error";
type: "postgres_connection_error";
error: Error;
Expand All @@ -36,22 +36,6 @@ type PostgresSuperuserError = {

export type SyncNotice = DependencyResolutionNotice | PostgresSuperuserError;

type RecentQueries =
| {
kind: "ok";
results: RecentQuery[];
}
| {
kind: "error";
type: "postgres_error";
error: string;
}
| {
kind: "error";
type: "extension_not_installed";
extensionName: string;
};

export type SyncResult =
| {
kind: "ok";
Expand All @@ -60,7 +44,7 @@ export type SyncResult =
setup: string;
sampledRecords: Record<string, number>;
notices: SyncNotice[];
queries: RecentQueries;
queries: RecentQueriesResult;
metadata: TableMetadata[];
}
| PostgresConnectionError
Expand Down Expand Up @@ -165,28 +149,37 @@ export class PostgresSyncer {

const wrapped = schema + serializedResult.serialized;

let queries: RecentQueries;
if (recentQueries.kind === "ok") {
queries = {
kind: "ok",
results: recentQueries.queries,
};
} else {
queries = recentQueries;
}
return {
kind: "ok",
versionNum: databaseInfo.serverVersionNum,
version: databaseInfo.serverVersion,
sampledRecords: serializedResult.sampledRecords,
notices,
queries,
queries: recentQueries,
setup: wrapped,
metadata: serializedResult.schema,
};
}

liveQuery(
connectable: Connectable
): Promise<RecentQueriesResult | PostgresConnectionError> {
const sql = this.getConnection(connectable);
const connector = new PostgresConnector(sql);
return connector.getRecentQueries();
}

private async checkConnection(sql: postgres.Sql) {
await sql`select 1`;
}

private getConnection(connectable: Connectable) {
const urlString = connectable.toString();
let sql = this.connections.get(urlString);
if (!sql) {
sql = postgres(urlString);
this.connections.set(urlString, sql);
}
return sql;
}
}