
Commit 99a3fc7

Copilot and Mte90 committed
Move DB operations to dedicated files in db folder
Co-authored-by: Mte90 <403283+Mte90@users.noreply.github.com>
1 parent d5d48d6 · commit 99a3fc7

File tree

4 files changed: +463 -363 lines


ai/analyzer.py

Lines changed: 8 additions & 245 deletions
@@ -2,8 +2,6 @@
 import json
 import time
 import traceback
-import sqlite3
-import importlib.resources
 import hashlib
 import math
 from pathlib import Path
@@ -13,6 +11,14 @@
 import threading
 
 from db.operations import store_file, needs_reindex, set_project_metadata_batch, get_project_metadata
+from db.vector_operations import (
+    connect_db as _connect_db,
+    load_sqlite_vector_extension as _load_sqlite_vector_extension,
+    ensure_chunks_and_meta as _ensure_chunks_and_meta,
+    insert_chunk_vector_with_retry as _insert_chunk_vector_with_retry,
+    search_vectors as _search_vectors,
+    get_chunk_text as _get_chunk_text,
+)
 from .openai import get_embedding_for_text, call_coding_api
 from llama_index.core import Document
 from utils.logger import get_logger
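
Note: the aliased imports keep every internal call site in analyzer.py (_connect_db, _search_vectors, and so on) unchanged. A minimal usage sketch of the relocated module, assuming the moved functions keep the signatures of the deleted helpers shown below; the database path and embedding dimension here are illustrative:

    # Sketch only: connect_db and search_vectors are assumed to match the
    # signatures of the deleted _connect_db/_search_vectors helpers below.
    from db.vector_operations import connect_db, load_sqlite_vector_extension, search_vectors

    conn = connect_db("project_index.db")  # sqlite3.Connection with busy_timeout set
    load_sqlite_vector_extension(conn)     # fails fast when the extension cannot load
    conn.close()

    # Nearest-neighbour lookup over the stored chunk embeddings.
    hits = search_vectors("project_index.db", q_vector=[0.0] * 1536, top_k=5)
    for hit in hits:
        print(hit["path"], hit["chunk_index"], hit["score"])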
@@ -49,18 +55,6 @@
 _THREADPOOL_WORKERS = max(16, EMBEDDING_CONCURRENCY + 8)
 _EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_THREADPOOL_WORKERS)
 
-# sqlite-vector defaults (sensible fixed defaults)
-SQLITE_VECTOR_PKG = "sqlite_vector.binaries"
-SQLITE_VECTOR_RESOURCE = "vector"
-SQLITE_VECTOR_VERSION_FN = "vector_version"  # SELECT vector_version();
-
-# Strict behavior: fail fast if extension can't be loaded or calls fail
-STRICT_VECTOR_INTEGRATION = True
-
-# Retry policy for DB-locked operations
-DB_LOCK_RETRY_COUNT = 6
-DB_LOCK_RETRY_BASE_DELAY = 0.05  # seconds, exponential backoff multiplier
-
 logger = get_logger(__name__)
 
 
@@ -84,7 +78,6 @@ def compute_file_hash(content: str) -> str:
     return hashlib.sha256(content.encode('utf-8')).hexdigest()
 
 
-# Simple chunker (character-based). Tunable CHUNK_SIZE, CHUNK_OVERLAP.
 def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
     if chunk_size <= 0:
         return [text]
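
Note: the retry constants removed above (DB_LOCK_RETRY_COUNT = 6, DB_LOCK_RETRY_BASE_DELAY = 0.05 s) drive the exponential backoff in _insert_chunk_vector_with_retry, deleted in the next hunk: attempt n sleeps DB_LOCK_RETRY_BASE_DELAY * 2^(n-1) seconds, i.e. 0.05, 0.10, 0.20, 0.40, 0.80, 1.60 s, roughly 3.15 s of total waiting before the "database is locked" error is re-raised.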
@@ -99,236 +92,6 @@ def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVE
     return chunks
 
 
-# --- sqlite-vector integration helpers ---------------------------------------
-def _connect_db(db_path: str, timeout: float = 30.0) -> sqlite3.Connection:
-    # timeout instructs sqlite to wait up to `timeout` seconds for locks
-    conn = sqlite3.connect(db_path, timeout=timeout, check_same_thread=False)
-    conn.row_factory = sqlite3.Row
-    try:
-        conn.execute("PRAGMA busy_timeout = 30000;")  # 30s
-    except Exception:
-        pass
-    return conn
-
-
-def _load_sqlite_vector_extension(conn: sqlite3.Connection) -> None:
-    """
-    Loads sqlite-vector binary from the installed python package and performs a lightweight
-    sanity check (calls vector_version() if available). Raises on error if STRICT_VECTOR_INTEGRATION.
-    """
-    try:
-        ext_path = importlib.resources.files(SQLITE_VECTOR_PKG) / SQLITE_VECTOR_RESOURCE
-        conn.load_extension(str(ext_path))
-        try:
-            conn.enable_load_extension(False)
-        except Exception:
-            pass
-        # optional quick check: call vector_version()
-        try:
-            cur = conn.execute(f"SELECT {SQLITE_VECTOR_VERSION_FN}()")
-            _ = cur.fetchone()
-        except Exception:
-            # version function may not be present; ignore
-            pass
-    except Exception as e:
-        if STRICT_VECTOR_INTEGRATION:
-            raise RuntimeError(f"Failed to load sqlite-vector extension: {e}") from e
-        else:
-            logger.warning("sqlite-vector extension not loaded: %s", e)
-
-
-def _ensure_chunks_and_meta(conn: sqlite3.Connection):
-    """
-    Create chunks table (if not exist) with embedding column and meta table for vector dimension.
-    Safe to call multiple times.
-    """
-    cur = conn.cursor()
-    cur.execute(
-        """
-        CREATE TABLE IF NOT EXISTS chunks (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            file_id INTEGER NOT NULL,
-            path TEXT NOT NULL,
-            chunk_index INTEGER NOT NULL,
-            embedding BLOB,
-            created_at TEXT DEFAULT (datetime('now'))
-        )
-        """
-    )
-    cur.execute(
-        """
-        CREATE TABLE IF NOT EXISTS vector_meta (
-            key TEXT PRIMARY KEY,
-            value TEXT
-        )
-        """
-    )
-    conn.commit()
-
-
-def _set_vector_dimension(conn: sqlite3.Connection, dim: int):
-    cur = conn.cursor()
-    cur.execute("INSERT OR REPLACE INTO vector_meta(key, value) VALUES('dimension', ?)", (str(dim),))
-    conn.commit()
-
-
-def _insert_chunk_vector_with_retry(conn: sqlite3.Connection, file_id: int, path: str, chunk_index: int, vector: List[float]) -> int:
-    """
-    Insert a chunk row with embedding using vector_as_f32(json); retries on sqlite3.OperationalError 'database is locked'.
-    Returns the chunks.rowid.
-    """
-    cur = conn.cursor()
-    # Ensure schema/meta present
-    _ensure_chunks_and_meta(conn)
-
-    # dimension handling: store or verify
-    cur.execute("SELECT value FROM vector_meta WHERE key = 'dimension'")
-    row = cur.fetchone()
-    dim = len(vector)
-    if not row:
-        _set_vector_dimension(conn, dim)
-        try:
-            conn.execute(f"SELECT vector_init('chunks', 'embedding', 'dimension={dim},type=FLOAT32,distance=COSINE')")
-        except Exception as e:
-            raise RuntimeError(f"vector_init failed: {e}") from e
-    else:
-        stored_dim = int(row[0])
-        if stored_dim != dim:
-            raise RuntimeError(f"Embedding dimension mismatch: stored={stored_dim}, new={dim}")
-
-    q_vec = json.dumps(vector)
-
-    attempt = 0
-    while True:
-        try:
-            # use vector_as_f32(json) as per API so extension formats blob
-            cur.execute("INSERT INTO chunks (file_id, path, chunk_index, embedding) VALUES (?, ?, ?, vector_as_f32(?))",
-                        (file_id, path, chunk_index, q_vec))
-            conn.commit()
-            return int(cur.lastrowid)
-        except sqlite3.OperationalError as e:
-            msg = str(e).lower()
-            if "database is locked" in msg and attempt < DB_LOCK_RETRY_COUNT:
-                attempt += 1
-                delay = DB_LOCK_RETRY_BASE_DELAY * (2 ** (attempt - 1))
-                time.sleep(delay)
-                continue
-            else:
-                raise RuntimeError(f"Failed to INSERT chunk vector (vector_as_f32 call): {e}") from e
-        except Exception as e:
-            raise RuntimeError(f"Failed to INSERT chunk vector (vector_as_f32 call): {e}") from e
-
-
-def _search_vectors(database_path: str, q_vector: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
-    """
-    Uses vector_full_scan to retrieve nearest neighbors from the chunks table.
-    Returns list of dicts: {file_id, path, chunk_index, score}
-    """
-    conn = _connect_db(database_path)
-    try:
-        _load_sqlite_vector_extension(conn)
-        _ensure_chunks_and_meta(conn)
-
-        q_json = json.dumps(q_vector)
-        cur = conn.cursor()
-        try:
-            cur.execute(
-                """
-                SELECT c.file_id, c.path, c.chunk_index, v.distance
-                FROM vector_full_scan('chunks', 'embedding', vector_as_f32(?), ?) AS v
-                JOIN chunks AS c ON c.rowid = v.rowid
-                ORDER BY v.distance ASC
-                LIMIT ?
-                """,
-                (q_json, top_k, top_k),
-            )
-            rows = cur.fetchall()
-        except Exception as e:
-            raise RuntimeError(f"vector_full_scan call failed: {e}") from e
-
-        results: List[Dict[str, Any]] = []
-        for file_id, path, chunk_index, distance in rows:
-            try:
-                score = 1.0 - float(distance)
-            except Exception:
-                score = float(distance)
-            results.append({"file_id": int(file_id), "path": path, "chunk_index": int(chunk_index), "score": score})
-        return results
-    finally:
-        conn.close()
-
-
-def _get_chunk_text(database_path: str, file_id: int, chunk_index: int) -> Optional[str]:
-    """
-    Get chunk text by reading from filesystem instead of database.
-    Uses project_path metadata and file path to read the actual file.
-    """
-    conn = _connect_db(database_path)
-    try:
-        cur = conn.cursor()
-        # Get file path from database
-        cur.execute("SELECT path FROM files WHERE id = ?", (file_id,))
-        row = cur.fetchone()
-        if not row:
-            logger.warning(f"File not found in database: file_id={file_id}")
-            return None
-
-        file_path = row[0]
-        if not file_path:
-            logger.warning(f"File path is empty for file_id={file_id}")
-            return None
-
-        # Get project path from metadata
-        project_path = get_project_metadata(database_path, "project_path")
-        if not project_path:
-            logger.error("Project path not found in metadata, cannot read file from filesystem")
-            raise RuntimeError("Project path metadata is missing - ensure the indexing process has stored project metadata properly")
-
-        # Construct full file path and resolve to absolute path
-        full_path = os.path.abspath(os.path.join(project_path, file_path))
-        normalized_project_path = os.path.abspath(project_path)
-
-        # Security check: ensure the resolved path is within the project directory
-        try:
-            common = os.path.commonpath([full_path, normalized_project_path])
-            if common != normalized_project_path:
-                logger.error(f"Path traversal attempt detected: {file_path} resolves outside project directory")
-                return None
-            if full_path != normalized_project_path and not full_path.startswith(normalized_project_path + os.sep):
-                logger.error(f"Path traversal attempt detected: {file_path} does not start with project directory")
-                return None
-        except ValueError:
-            logger.error(f"Path traversal attempt detected: {file_path} is on a different drive or incompatible path")
-            return None
-
-        # Read file content from filesystem
-        try:
-            with open(full_path, "r", encoding="utf-8", errors="replace") as fh:
-                content = fh.read()
-        except Exception as e:
-            logger.warning(f"Failed to read file from filesystem: {full_path}, error: {e}")
-            return None
-
-        if not content:
-            return None
-
-        # Extract the chunk
-        if CHUNK_SIZE <= 0:
-            return content
-
-        # Validate chunk_index
-        if chunk_index < 0:
-            logger.warning(f"Invalid chunk_index {chunk_index} for file_id={file_id}")
-            return None
-
-        step = max(1, CHUNK_SIZE - CHUNK_OVERLAP)
-        start = chunk_index * step
-        end = min(start + CHUNK_SIZE, len(content))
-        return content[start:end]
-    finally:
-        conn.close()
-
-
 # Main synchronous processing for a single file
 def _process_file_sync(
     semaphore: threading.Semaphore,
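
Note: the deleted _get_chunk_text never stores chunk text; it re-derives the chunk from the file on disk using the same arithmetic as chunk_text: step = max(1, CHUNK_SIZE - CHUNK_OVERLAP), and chunk i spans content[i * step : i * step + CHUNK_SIZE]. A worked example with illustrative values CHUNK_SIZE = 1000 and CHUNK_OVERLAP = 200: step = 800, so chunk 3 covers characters 2400 through 3399 and shares its first 200 characters with the tail of chunk 2.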
