From 02dff095b36dd051fd619a9bb1f385e0719781fb Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Thu, 6 Nov 2025 23:14:24 -0500 Subject: [PATCH 1/5] Refactor Neo4j ingestion through configurable pipelines --- docs/getting-started/configuration.md | 38 ++ src/codebase_rag/config/settings.py | 6 +- .../knowledge/neo4j_knowledge_service.py | 409 ++++++++++-------- .../services/knowledge/pipeline_components.py | 253 +++++++++++ tests/services/test_pipeline_components.py | 103 +++++ 5 files changed, 630 insertions(+), 179 deletions(-) create mode 100644 src/codebase_rag/services/knowledge/pipeline_components.py create mode 100644 tests/services/test_pipeline_components.py diff --git a/docs/getting-started/configuration.md b/docs/getting-started/configuration.md index 9b7ac02..0d548bc 100644 --- a/docs/getting-started/configuration.md +++ b/docs/getting-started/configuration.md @@ -192,6 +192,44 @@ OPERATION_TIMEOUT=300 LARGE_DOCUMENT_TIMEOUT=600 ``` +### Ingestion Pipelines + +Document ingestion now uses [LlamaIndex ingestion pipelines](https://docs.llamaindex.ai/) with pluggable connectors, transformations, and writers. The service ships with three pipelines (`manual_input`, `file`, `directory`), and you can override or extend them from configuration by providing a JSON-style mapping in your `.env` file: + +```bash +INGESTION_PIPELINES='{ + "file": { + "transformations": [ + { + "class_path": "llama_index.core.node_parser.SimpleNodeParser", + "kwargs": {"chunk_size": 256, "chunk_overlap": 20} + }, + { + "class_path": "codebase_rag.services.knowledge.pipeline_components.MetadataEnrichmentTransformation", + "kwargs": {"metadata": {"language": "python"}} + } + ] + }, + "git": { + "connector": { + "class_path": "my_project.pipeline.GitRepositoryConnector", + "kwargs": {"branch": "main"} + }, + "transformations": [ + { + "class_path": "my_project.pipeline.CodeBlockParser", + "kwargs": {"max_tokens": 400} + } + ], + "writer": { + "class_path": "codebase_rag.services.knowledge.pipeline_components.Neo4jKnowledgeGraphWriter" + } + } +}' +``` + +Each entry is merged with the defaults. This means you can change chunking behaviour, add metadata enrichment steps, or register new data sources by publishing your own connector class. At runtime the knowledge service builds and reuses the configured pipeline instances so changes only require a service restart. 
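+
+To register a brand-new source such as the `git` pipeline above, the connector class only needs a constructor that accepts the configured `kwargs` (plus any runtime overrides) and `load_data()`/`aload_data()` methods that return LlamaIndex `Document` objects. The sketch below is illustrative only: the `GitRepositoryConnector` name and its `repo_path`/`branch` arguments are placeholders, and it assumes the repository is already checked out on disk.
+
+```python
+# my_project/pipeline.py -- hypothetical module referenced in the config above
+import asyncio
+from pathlib import Path
+from typing import Any, Dict, Optional, Sequence
+
+from llama_index.core import Document
+
+
+class GitRepositoryConnector:
+    """Illustrative connector that loads Python files from a local checkout."""
+
+    def __init__(self, repo_path: str, branch: str = "main",
+                 metadata: Optional[Dict[str, Any]] = None) -> None:
+        # repo_path and branch arrive through the connector "kwargs" in INGESTION_PIPELINES.
+        self._repo_path = Path(repo_path)
+        self._branch = branch
+        self._metadata = metadata or {}
+
+    def load_data(self) -> Sequence[Document]:
+        documents = []
+        for path in sorted(self._repo_path.rglob("*.py")):
+            documents.append(
+                Document(
+                    text=path.read_text(encoding="utf-8", errors="ignore"),
+                    metadata={"file_path": str(path), "branch": self._branch, **self._metadata},
+                )
+            )
+        return documents
+
+    async def aload_data(self) -> Sequence[Document]:
+        # The knowledge service awaits aload_data(), so offload blocking file I/O.
+        return await asyncio.to_thread(self.load_data)
+```
+
+Transformations are constructed the same way from their `kwargs`; writers additionally receive the live knowledge index and graph store when the pipeline bundle is built.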
+ ### Neo4j Performance Tuning For large repositories: diff --git a/src/codebase_rag/config/settings.py b/src/codebase_rag/config/settings.py index ab9cf0f..f8ade89 100644 --- a/src/codebase_rag/config/settings.py +++ b/src/codebase_rag/config/settings.py @@ -7,7 +7,7 @@ from pydantic_settings import BaseSettings from pydantic import Field -from typing import Optional, Literal +from typing import Optional, Literal, Dict, Any class Settings(BaseSettings): @@ -99,6 +99,10 @@ class Settings(BaseSettings): # Document Processing Settings max_document_size: int = Field(default=10 * 1024 * 1024, description="Maximum document size in bytes (10MB)") max_payload_size: int = Field(default=50 * 1024 * 1024, description="Maximum task payload size for storage (50MB)") + ingestion_pipelines: Dict[str, Dict[str, Any]] = Field( + default_factory=dict, + description="Optional ingestion pipeline overrides", + ) # API Settings cors_origins: list = Field(default=["*"], description="CORS allowed origins") diff --git a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py index 31184f6..b0489dc 100644 --- a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py +++ b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py @@ -4,18 +4,16 @@ supports multiple LLM and embedding model providers """ -from typing import List, Dict, Any, Optional, Union +from typing import List, Dict, Any, Optional from pathlib import Path import asyncio from loguru import logger import time from llama_index.core import ( - KnowledgeGraphIndex, - Document, + KnowledgeGraphIndex, Settings, StorageContext, - SimpleDirectoryReader ) # LLM Providers @@ -33,10 +31,12 @@ # Graph Store from llama_index.graph_stores.neo4j import Neo4jGraphStore -# Core components -from llama_index.core.node_parser import SimpleNodeParser - from codebase_rag.config import settings +from codebase_rag.services.knowledge.pipeline_components import ( + PipelineBundle, + build_pipeline_bundle, + merge_pipeline_configs, +) class Neo4jKnowledgeService: """knowledge graph service based on Neo4j's native vector index""" @@ -46,6 +46,7 @@ def __init__(self): self.knowledge_index = None self.query_engine = None self._initialized = False + self._pipeline_bundles: Dict[str, PipelineBundle] = {} # get timeout settings from config self.connection_timeout = settings.connection_timeout @@ -101,7 +102,7 @@ def _create_llm(self): def _create_embedding_model(self): """create embedding model instance based on config""" provider = settings.embedding_provider.lower() - + if provider == "ollama": return OllamaEmbedding( model_name=settings.ollama_embedding_model, @@ -139,7 +140,101 @@ def _create_embedding_model(self): ) else: raise ValueError(f"Unsupported embedding provider: {provider}") - + + def _default_pipeline_configs(self) -> Dict[str, Dict[str, Any]]: + """Return the built-in ingestion pipeline configuration.""" + + node_parser_config = { + "class_path": "llama_index.core.node_parser.SimpleNodeParser", + "kwargs": { + "chunk_size": settings.chunk_size, + "chunk_overlap": settings.chunk_overlap, + }, + } + metadata_transform = ( + "codebase_rag.services.knowledge.pipeline_components.MetadataEnrichmentTransformation" + ) + writer_path = ( + "codebase_rag.services.knowledge.pipeline_components.Neo4jKnowledgeGraphWriter" + ) + + return { + "manual_input": { + "connector": { + "class_path": "codebase_rag.services.knowledge.pipeline_components.ManualDocumentConnector", + }, + "transformations": 
[ + dict(node_parser_config), + { + "class_path": metadata_transform, + "kwargs": {"metadata": {"pipeline": "manual_input"}}, + }, + ], + "writer": {"class_path": writer_path}, + }, + "file": { + "connector": { + "class_path": "codebase_rag.services.knowledge.pipeline_components.SimpleFileConnector", + }, + "transformations": [ + dict(node_parser_config), + { + "class_path": metadata_transform, + "kwargs": {"metadata": {"pipeline": "file"}}, + }, + ], + "writer": {"class_path": writer_path}, + }, + "directory": { + "connector": { + "class_path": "codebase_rag.services.knowledge.pipeline_components.SimpleDirectoryConnector", + "kwargs": { + "recursive": True, + "file_extensions": [ + ".txt", + ".md", + ".py", + ".js", + ".ts", + ".sql", + ".json", + ".yaml", + ".yml", + ], + }, + }, + "transformations": [ + dict(node_parser_config), + { + "class_path": metadata_transform, + "kwargs": {"metadata": {"pipeline": "directory"}}, + }, + ], + "writer": {"class_path": writer_path}, + }, + } + + def _setup_ingestion_pipelines(self) -> None: + """Build ingestion pipelines from defaults and user configuration.""" + + default_config = self._default_pipeline_configs() + merged_config = merge_pipeline_configs(default_config, settings.ingestion_pipelines) + + bundles: Dict[str, PipelineBundle] = {} + for name, config in merged_config.items(): + try: + bundles[name] = build_pipeline_bundle( + name, + knowledge_index=self.knowledge_index, + graph_store=self.graph_store, + configuration=config, + ) + logger.debug(f"Built ingestion pipeline '{name}'") + except Exception as exc: + logger.error(f"Failed to build ingestion pipeline '{name}': {exc}") + + self._pipeline_bundles = bundles + async def initialize(self) -> bool: """initialize service""" try: @@ -202,203 +297,161 @@ async def initialize(self) -> bool: response_mode="tree_summarize", embedding_mode="hybrid" ) - + + self._setup_ingestion_pipelines() + self._initialized = True logger.success("Neo4j Knowledge Service initialized successfully") return True - + except Exception as e: logger.error(f"Failed to initialize Neo4j Knowledge Service: {e}") return False - - async def add_document(self, - content: str, - title: str = None, - metadata: Dict[str, Any] = None) -> Dict[str, Any]: - """add document to knowledge graph""" - if not self._initialized: - raise Exception("Service not initialized") - + + async def _run_ingestion_pipeline( + self, + pipeline_name: str, + *, + connector_overrides: Dict[str, Any], + timeout: Optional[int] = None, + ) -> Dict[str, Any]: + if pipeline_name not in self._pipeline_bundles: + raise ValueError(f"Pipeline '{pipeline_name}' is not configured") + + bundle = self._pipeline_bundles[pipeline_name] + connector = bundle.instantiate_connector(**connector_overrides) + + documents = await connector.aload_data() + documents = list(documents) + if not documents: + return { + "success": False, + "error": f"Pipeline '{pipeline_name}' produced no documents", + } + + timeout = timeout or self.operation_timeout + total_chars = sum(len(doc.text) for doc in documents) + logger.info( + f"Running pipeline '{pipeline_name}' with {len(documents)} documents (total chars: {total_chars})" + ) + + def _process_pipeline() -> Dict[str, Any]: + nodes = bundle.pipeline.run(show_progress=False, documents=documents) + bundle.writer.write(nodes) + return { + "nodes_count": len(nodes), + "documents_count": len(documents), + } + try: - # create document - doc = Document( - text=content, - metadata={ - "title": title or "Untitled", - "source": "manual_input", 
- "timestamp": time.time(), - **(metadata or {}) - } + result = await asyncio.wait_for( + asyncio.to_thread(_process_pipeline), + timeout=timeout, ) - - # select timeout based on document size - content_size = len(content) - timeout = self.operation_timeout if content_size < 10000 else self.large_document_timeout - - logger.info(f"Adding document '{title}' (size: {content_size} chars, timeout: {timeout}s)") - - # use async timeout control for insert operation - await asyncio.wait_for( - asyncio.to_thread(self.knowledge_index.insert, doc), - timeout=timeout + logger.info( + f"Pipeline '{pipeline_name}' completed with {result['nodes_count']} nodes" ) - - logger.info(f"Successfully added document: {title}") - return { "success": True, - "message": f"Document '{title}' added to knowledge graph", - "document_id": doc.doc_id, - "content_size": content_size + "pipeline": pipeline_name, + "documents_count": result["documents_count"], + "nodes_count": result["nodes_count"], + "total_chars": total_chars, } - except asyncio.TimeoutError: - error_msg = f"Document insertion timed out after {timeout}s" + error_msg = ( + f"Pipeline '{pipeline_name}' execution timed out after {timeout}s" + ) logger.error(error_msg) - return { - "success": False, - "error": error_msg, - "timeout": timeout - } - except Exception as e: - logger.error(f"Failed to add document: {e}") - return { - "success": False, - "error": str(e) - } + return {"success": False, "error": error_msg, "timeout": timeout} + except Exception as exc: + logger.error(f"Pipeline '{pipeline_name}' failed: {exc}") + return {"success": False, "error": str(exc)} + + async def add_document(self, + content: str, + title: str = None, + metadata: Dict[str, Any] = None) -> Dict[str, Any]: + """add document to knowledge graph""" + if not self._initialized: + raise Exception("Service not initialized") + + metadata = metadata or {} + metadata.setdefault("title", title or metadata.get("title", "Untitled")) + metadata.setdefault("source", metadata.get("source", "manual_input")) + metadata.setdefault("timestamp", metadata.get("timestamp", time.time())) + + content_size = len(content) + timeout = ( + self.operation_timeout if content_size < 10000 else self.large_document_timeout + ) + + result = await self._run_ingestion_pipeline( + "manual_input", + connector_overrides={ + "content": content, + "title": title, + "metadata": metadata, + }, + timeout=timeout, + ) + + if result.get("success"): + result.update({ + "message": f"Document '{metadata['title']}' added to knowledge graph", + "content_size": content_size, + }) + return result async def add_file(self, file_path: str) -> Dict[str, Any]: """add file to knowledge graph""" if not self._initialized: raise Exception("Service not initialized") - - try: - # read file - documents = await asyncio.to_thread( - lambda: SimpleDirectoryReader(input_files=[file_path]).load_data() + + absolute_path = str(Path(file_path).expanduser()) + result = await self._run_ingestion_pipeline( + "file", + connector_overrides={"file_path": absolute_path}, + ) + + if result.get("success"): + result.setdefault( + "message", + f"File '{absolute_path}' processed with {result.get('nodes_count', 0)} nodes", ) - - if not documents: - return { - "success": False, - "error": "No documents loaded from file" - } - - # batch insert, handle timeout for each document - success_count = 0 - errors = [] - - for i, doc in enumerate(documents): - try: - content_size = len(doc.text) - timeout = self.operation_timeout if content_size < 10000 else 
self.large_document_timeout - - await asyncio.wait_for( - asyncio.to_thread(self.knowledge_index.insert, doc), - timeout=timeout - ) - success_count += 1 - logger.debug(f"Added document {i+1}/{len(documents)} from {file_path}") - - except asyncio.TimeoutError: - error_msg = f"Document {i+1} timed out" - errors.append(error_msg) - logger.warning(error_msg) - except Exception as e: - error_msg = f"Document {i+1} failed: {str(e)}" - errors.append(error_msg) - logger.warning(error_msg) - - logger.info(f"Added {success_count}/{len(documents)} documents from {file_path}") - - return { - "success": success_count > 0, - "message": f"Added {success_count}/{len(documents)} documents from {file_path}", - "documents_count": len(documents), - "success_count": success_count, - "errors": errors - } - - except Exception as e: - logger.error(f"Failed to add file {file_path}: {e}") - return { - "success": False, - "error": str(e) - } + else: + result.setdefault("error", f"Failed to process file '{absolute_path}'") + return result - async def add_directory(self, + async def add_directory(self, directory_path: str, recursive: bool = True, file_extensions: List[str] = None) -> Dict[str, Any]: """batch add files in directory""" if not self._initialized: raise Exception("Service not initialized") - - try: - # set file extension filter - if file_extensions is None: - file_extensions = [".txt", ".md", ".py", ".js", ".ts", ".sql", ".json", ".yaml", ".yml"] - - # read directory - reader = SimpleDirectoryReader( - input_dir=directory_path, - recursive=recursive, - file_extractor={ext: None for ext in file_extensions} + + absolute_path = str(Path(directory_path).expanduser()) + overrides: Dict[str, Any] = { + "directory_path": absolute_path, + "recursive": recursive, + } + if file_extensions is not None: + overrides["file_extensions"] = file_extensions + + result = await self._run_ingestion_pipeline( + "directory", + connector_overrides=overrides, + ) + + if result.get("success"): + result.setdefault( + "message", + f"Directory '{absolute_path}' processed with {result.get('documents_count', 0)} documents", ) - - documents = await asyncio.to_thread(reader.load_data) - - if not documents: - return { - "success": False, - "error": "No documents found in directory" - } - - # batch insert, handle timeout for each document - success_count = 0 - errors = [] - - logger.info(f"Processing {len(documents)} documents from {directory_path}") - - for i, doc in enumerate(documents): - try: - content_size = len(doc.text) - timeout = self.operation_timeout if content_size < 10000 else self.large_document_timeout - - await asyncio.wait_for( - asyncio.to_thread(self.knowledge_index.insert, doc), - timeout=timeout - ) - success_count += 1 - - if i % 10 == 0: # record progress every 10 documents - logger.info(f"Progress: {i+1}/{len(documents)} documents processed") - - except asyncio.TimeoutError: - error_msg = f"Document {i+1} ({doc.metadata.get('file_name', 'unknown')}) timed out" - errors.append(error_msg) - logger.warning(error_msg) - except Exception as e: - error_msg = f"Document {i+1} ({doc.metadata.get('file_name', 'unknown')}) failed: {str(e)}" - errors.append(error_msg) - logger.warning(error_msg) - - logger.info(f"Successfully added {success_count}/{len(documents)} documents from {directory_path}") - - return { - "success": success_count > 0, - "message": f"Added {success_count}/{len(documents)} documents from {directory_path}", - "documents_count": len(documents), - "success_count": success_count, - "errors": errors - } - - 
except Exception as e: - logger.error(f"Failed to add directory {directory_path}: {e}") - return { - "success": False, - "error": str(e) - } + else: + result.setdefault("error", f"Failed to process directory '{absolute_path}'") + return result async def query(self, question: str, diff --git a/src/codebase_rag/services/knowledge/pipeline_components.py b/src/codebase_rag/services/knowledge/pipeline_components.py new file mode 100644 index 0000000..4ecda4c --- /dev/null +++ b/src/codebase_rag/services/knowledge/pipeline_components.py @@ -0,0 +1,253 @@ +"""Reusable ingestion pipeline components for the Neo4j knowledge service.""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass +from importlib import import_module +from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence, Type + +from llama_index.core import Document +from llama_index.core.ingestion import IngestionPipeline +from llama_index.core.schema import BaseNode, TransformComponent + +from codebase_rag.config import settings + +try: # pragma: no cover - optional dependency surface differs by version + from llama_index.core.ingestion import BaseConnector, BaseWriter +except ImportError: # pragma: no cover + from typing import Protocol + + class BaseConnector(Protocol): # type: ignore[misc] + """Minimal connector protocol used for runtime type checking.""" + + def load_data(self) -> Sequence[Document]: + ... + + async def aload_data(self) -> Sequence[Document]: + ... + + class BaseWriter(Protocol): # type: ignore[misc] + """Minimal writer protocol used for runtime type checking.""" + + def write(self, nodes: Sequence[BaseNode]) -> None: + ... + + async def awrite(self, nodes: Sequence[BaseNode]) -> None: + ... + + +class BaseTransformation(TransformComponent): + """Alias for TransformComponent for readability.""" + + # TransformComponent already implements __call__/acall + + +class ManualDocumentConnector(BaseConnector): + """Connector that materialises a single document from raw text input.""" + + def __init__( + self, + content: str, + *, + title: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + self._content = content + self._title = title or "Untitled" + self._metadata = metadata or {} + + def _build_document(self) -> Document: + base_metadata = { + "title": self._title, + "source": self._metadata.get("source", "manual_input"), + "timestamp": self._metadata.get("timestamp", time.time()), + } + merged_metadata = {**base_metadata, **self._metadata} + return Document(text=self._content, metadata=merged_metadata) + + def load_data(self) -> Sequence[Document]: + return [self._build_document()] + + async def aload_data(self) -> Sequence[Document]: + return self.load_data() + + +class SimpleFileConnector(BaseConnector): + """Connector that loads a single file via SimpleDirectoryReader.""" + + def __init__(self, file_path: str | Path, **reader_kwargs: Any) -> None: + self._file_path = Path(file_path) + self._reader_kwargs = reader_kwargs + + def load_data(self) -> Sequence[Document]: + from llama_index.core import SimpleDirectoryReader + + reader = SimpleDirectoryReader( + input_files=[str(self._file_path)], + **self._reader_kwargs, + ) + return reader.load_data() + + async def aload_data(self) -> Sequence[Document]: + return await asyncio.to_thread(self.load_data) + + +class SimpleDirectoryConnector(BaseConnector): + """Connector that loads all supported files from a directory.""" + + def __init__( + self, + directory_path: str | 
Path, + *, + recursive: bool = True, + file_extensions: Optional[Sequence[str]] = None, + reader_kwargs: Optional[Dict[str, Any]] = None, + ) -> None: + self._directory_path = Path(directory_path) + self._recursive = recursive + self._file_extensions = list(file_extensions or []) + self._reader_kwargs = reader_kwargs or {} + + def load_data(self) -> Sequence[Document]: + from llama_index.core import SimpleDirectoryReader + + file_extractor = None + if self._file_extensions: + file_extractor = {ext: None for ext in self._file_extensions} + + reader = SimpleDirectoryReader( + input_dir=str(self._directory_path), + recursive=self._recursive, + file_extractor=file_extractor, + **self._reader_kwargs, + ) + return reader.load_data() + + async def aload_data(self) -> Sequence[Document]: + return await asyncio.to_thread(self.load_data) + + +class MetadataEnrichmentTransformation(BaseTransformation): + """Inject static metadata into every processed node.""" + + def __init__(self, metadata: Optional[Dict[str, Any]] = None) -> None: + self._metadata = metadata or {} + + def __call__(self, nodes: Sequence[BaseNode], **kwargs: Any) -> Sequence[BaseNode]: + for node in nodes: + node.metadata.update(self._metadata) + return nodes + + +class Neo4jKnowledgeGraphWriter(BaseWriter): + """Writer that persists nodes through the KnowledgeGraphIndex.""" + + def __init__(self, knowledge_index, graph_store) -> None: + self._knowledge_index = knowledge_index + self._graph_store = graph_store + + def write(self, nodes: Sequence[BaseNode]) -> None: + if not nodes: + return + self._knowledge_index.insert_nodes(nodes) + + async def awrite(self, nodes: Sequence[BaseNode]) -> None: + if not nodes: + return + await asyncio.to_thread(self._knowledge_index.insert_nodes, nodes) + + +@dataclass +class PipelineBundle: + """Container for an ingestion pipeline and its runtime dependencies.""" + + name: str + connector_cls: Type[BaseConnector] + connector_kwargs: Dict[str, Any] + pipeline: IngestionPipeline + writer: BaseWriter + + def instantiate_connector(self, **overrides: Any) -> BaseConnector: + params = {**self.connector_kwargs, **overrides} + return self.connector_cls(**params) + + +def import_from_string(dotted_path: str) -> Any: + """Import a class from a dotted module path.""" + + module_path, _, attribute = dotted_path.rpartition(".") + if not module_path: + raise ImportError(f"Invalid class path: {dotted_path}") + module = import_module(module_path) + try: + return getattr(module, attribute) + except AttributeError as exc: # pragma: no cover - invalid config + raise ImportError(f"Module '{module_path}' has no attribute '{attribute}'") from exc + + +def build_pipeline_bundle( + name: str, + *, + knowledge_index, + graph_store, + configuration: Dict[str, Any], +) -> PipelineBundle: + """Construct a PipelineBundle from configuration metadata.""" + + connector_cfg = configuration.get("connector", {}) + connector_cls = import_from_string(connector_cfg.get("class_path", "")) + connector_kwargs = connector_cfg.get("kwargs", {}) + + transformations: List[TransformComponent] = [] + for transform_cfg in configuration.get("transformations", []): + transform_cls = import_from_string(transform_cfg["class_path"]) + kwargs = transform_cfg.get("kwargs", {}) + transformations.append(transform_cls(**kwargs)) + + if not transformations: + from llama_index.core.node_parser import SimpleNodeParser + + transformations.append( + SimpleNodeParser.from_defaults( + chunk_size=settings.chunk_size, + chunk_overlap=settings.chunk_overlap, + ) + ) 
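+    # Every pipeline therefore ends up with at least one transformation,
+    # even if an override supplies an empty "transformations" list.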
+ + writer_cfg = configuration.get("writer", {}) + writer_cls = import_from_string(writer_cfg.get("class_path", "")) + writer_kwargs = writer_cfg.get("kwargs", {}) + writer = writer_cls(knowledge_index=knowledge_index, graph_store=graph_store, **writer_kwargs) + + pipeline = IngestionPipeline(transformations=transformations) + return PipelineBundle( + name=name, + connector_cls=connector_cls, + connector_kwargs=connector_kwargs, + pipeline=pipeline, + writer=writer, + ) + + +def merge_pipeline_configs( + default_config: Dict[str, Dict[str, Any]], + override_config: Optional[Dict[str, Dict[str, Any]]], +) -> Dict[str, Dict[str, Any]]: + """Merge user supplied pipeline configuration with defaults.""" + + merged = {**default_config} + if not override_config: + return merged + + for key, value in override_config.items(): + if key in merged: + combined = dict(merged[key]) + combined.update(value) + merged[key] = combined + else: + merged[key] = value + return merged + diff --git a/tests/services/test_pipeline_components.py b/tests/services/test_pipeline_components.py new file mode 100644 index 0000000..830ac76 --- /dev/null +++ b/tests/services/test_pipeline_components.py @@ -0,0 +1,103 @@ +import asyncio +import importlib.util +from pathlib import Path +from typing import Dict + +import pytest + +try: + spec = importlib.util.spec_from_file_location( + "codebase_rag.services.knowledge.pipeline_components", + Path("src/codebase_rag/services/knowledge/pipeline_components.py"), + ) + pipeline_components = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(pipeline_components) +except ImportError: # pragma: no cover - dependency mismatch + pipeline_components = None + build_pipeline_bundle = merge_pipeline_configs = None +else: + build_pipeline_bundle = pipeline_components.build_pipeline_bundle + merge_pipeline_configs = pipeline_components.merge_pipeline_configs + +pytestmark = pytest.mark.skipif( + pipeline_components is None, reason="llama_index could not be imported" +) + + +class DummyKnowledgeIndex: + def __init__(self) -> None: + self.inserted_nodes = [] + + def insert_nodes(self, nodes): + self.inserted_nodes.extend(nodes) + + +class DummyGraphStore: + pass + + +@pytest.mark.asyncio +async def test_build_pipeline_bundle_executes_pipeline(): + config: Dict[str, Dict] = { + "connector": { + "class_path": "codebase_rag.services.knowledge.pipeline_components.ManualDocumentConnector", + "kwargs": {"metadata": {"source": "test"}}, + }, + "transformations": [ + { + "class_path": "llama_index.core.node_parser.SimpleNodeParser", + "kwargs": {"chunk_size": 64, "chunk_overlap": 0}, + }, + ], + "writer": { + "class_path": "codebase_rag.services.knowledge.pipeline_components.Neo4jKnowledgeGraphWriter", + }, + } + + knowledge_index = DummyKnowledgeIndex() + bundle = build_pipeline_bundle( + "test", + knowledge_index=knowledge_index, + graph_store=DummyGraphStore(), + configuration=config, + ) + + connector = bundle.instantiate_connector(content="hello world", title="Test") + documents = await connector.aload_data() + assert len(documents) == 1 + + nodes = await asyncio.to_thread( + bundle.pipeline.run, + False, + documents, + ) + assert nodes, "Pipeline should produce nodes" + + bundle.writer.write(nodes) + assert knowledge_index.inserted_nodes, "Writer should forward nodes to knowledge index" + + +def test_merge_pipeline_configs_allows_override(): + default = { + "file": { + "connector": {"class_path": "default.Connector"}, + "transformations": [], + 
"writer": {"class_path": "default.Writer"}, + } + } + override = { + "file": { + "connector": {"kwargs": {"recursive": False}}, + }, + "custom": { + "connector": {"class_path": "custom.Connector"}, + "transformations": [], + "writer": {"class_path": "custom.Writer"}, + }, + } + + merged = merge_pipeline_configs(default, override) + assert merged["file"]["connector"]["class_path"] == "default.Connector" + assert merged["file"]["connector"]["kwargs"] == {"recursive": False} + assert "custom" in merged From d60e1bf424938922b77bc9c83a57faa90c573793 Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Thu, 6 Nov 2025 23:47:18 -0500 Subject: [PATCH 2/5] fix: deep merge pipeline configs --- .../services/knowledge/pipeline_components.py | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/codebase_rag/services/knowledge/pipeline_components.py b/src/codebase_rag/services/knowledge/pipeline_components.py index 4ecda4c..a49140c 100644 --- a/src/codebase_rag/services/knowledge/pipeline_components.py +++ b/src/codebase_rag/services/knowledge/pipeline_components.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import copy import time from dataclasses import dataclass from importlib import import_module @@ -238,16 +239,30 @@ def merge_pipeline_configs( ) -> Dict[str, Dict[str, Any]]: """Merge user supplied pipeline configuration with defaults.""" - merged = {**default_config} + def _merge_values(default: Any, override: Any) -> Any: + if isinstance(default, dict) and isinstance(override, dict): + merged_dict: Dict[str, Any] = {} + for key in default.keys() | override.keys(): + if key in override: + if key in default: + merged_dict[key] = _merge_values(default[key], override[key]) + else: + merged_dict[key] = copy.deepcopy(override[key]) + else: + merged_dict[key] = copy.deepcopy(default[key]) + return merged_dict + if isinstance(override, dict): + return copy.deepcopy(override) + return override + + merged = copy.deepcopy(default_config) if not override_config: return merged for key, value in override_config.items(): if key in merged: - combined = dict(merged[key]) - combined.update(value) - merged[key] = combined + merged[key] = _merge_values(merged[key], value) else: - merged[key] = value + merged[key] = copy.deepcopy(value) return merged From b3edac869ca6f33ca32e484d5be26c7694b212e8 Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Fri, 7 Nov 2025 13:10:37 +0800 Subject: [PATCH 3/5] Update src/codebase_rag/services/knowledge/neo4j_knowledge_service.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../services/knowledge/neo4j_knowledge_service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py index b0489dc..ff583ce 100644 --- a/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py +++ b/src/codebase_rag/services/knowledge/neo4j_knowledge_service.py @@ -316,8 +316,10 @@ async def _run_ingestion_pipeline( timeout: Optional[int] = None, ) -> Dict[str, Any]: if pipeline_name not in self._pipeline_bundles: - raise ValueError(f"Pipeline '{pipeline_name}' is not configured") - + available_pipelines = ", ".join(self._pipeline_bundles.keys()) + raise ValueError( + f"Pipeline '{pipeline_name}' is not configured. 
Available pipelines: {available_pipelines}" + ) bundle = self._pipeline_bundles[pipeline_name] connector = bundle.instantiate_connector(**connector_overrides) From bc0a3a3243fb9cf4eab6af3cf05024620943a423 Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Fri, 7 Nov 2025 13:10:45 +0800 Subject: [PATCH 4/5] Update src/codebase_rag/services/knowledge/pipeline_components.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/codebase_rag/services/knowledge/pipeline_components.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codebase_rag/services/knowledge/pipeline_components.py b/src/codebase_rag/services/knowledge/pipeline_components.py index a49140c..3a91bf5 100644 --- a/src/codebase_rag/services/knowledge/pipeline_components.py +++ b/src/codebase_rag/services/knowledge/pipeline_components.py @@ -253,7 +253,7 @@ def _merge_values(default: Any, override: Any) -> Any: return merged_dict if isinstance(override, dict): return copy.deepcopy(override) - return override + return copy.deepcopy(override) merged = copy.deepcopy(default_config) if not override_config: From 67a2f34634e60715fb3128cb1968ad52c703cbff Mon Sep 17 00:00:00 2001 From: Roy Zhu Date: Fri, 7 Nov 2025 13:10:54 +0800 Subject: [PATCH 5/5] Update src/codebase_rag/services/knowledge/pipeline_components.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/codebase_rag/services/knowledge/pipeline_components.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codebase_rag/services/knowledge/pipeline_components.py b/src/codebase_rag/services/knowledge/pipeline_components.py index 3a91bf5..2fb686a 100644 --- a/src/codebase_rag/services/knowledge/pipeline_components.py +++ b/src/codebase_rag/services/knowledge/pipeline_components.py @@ -181,7 +181,7 @@ def import_from_string(dotted_path: str) -> Any: module_path, _, attribute = dotted_path.rpartition(".") if not module_path: - raise ImportError(f"Invalid class path: {dotted_path}") + raise ImportError(f"Invalid class path (must contain at least one dot): {dotted_path}") module = import_module(module_path) try: return getattr(module, attribute)