-
Notifications
You must be signed in to change notification settings - Fork 1
Refactor Neo4j knowledge query pipeline #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Neo4j knowledge query pipeline #24
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| async def execute_cypher( | ||
| self, | ||
| query: str, | ||
| parameters: Optional[Dict[str, Any]] = None, | ||
| ) -> Dict[str, Any]: | ||
| """execute cypher query""" | ||
| if not self._initialized: | ||
| raise Exception("Service not initialized") | ||
|
|
||
| try: | ||
| # try to get basic statistics, add timeout control | ||
| try: | ||
| # if graph store supports statistics query | ||
| stats = await asyncio.wait_for( | ||
| asyncio.to_thread(lambda: { | ||
| "index_type": "KnowledgeGraphIndex with Neo4j vector store", | ||
| "graph_store_type": type(self.graph_store).__name__, | ||
| "initialized": self._initialized | ||
| }), | ||
| timeout=self.connection_timeout | ||
| ) | ||
|
|
||
| return { | ||
| "success": True, | ||
| "statistics": stats, | ||
| "message": "Knowledge graph is active" | ||
| } | ||
|
|
||
| except asyncio.TimeoutError: | ||
| return { | ||
| "success": False, | ||
| "error": f"Statistics retrieval timed out after {self.connection_timeout}s" | ||
| } | ||
|
|
||
| result = await asyncio.wait_for( | ||
| asyncio.to_thread( | ||
| self.graph_store.query, | ||
| query, | ||
| parameters or {}, | ||
| ), | ||
| timeout=self.operation_timeout, | ||
| ) | ||
|
|
||
| return { | ||
| "success": True, | ||
| "result": result, | ||
| "query": query, | ||
| } | ||
|
|
||
| except asyncio.TimeoutError: | ||
| error_msg = f"Cypher execution timed out after {self.operation_timeout}s" | ||
| logger.error(error_msg) | ||
| return { | ||
| "success": False, | ||
| "error": error_msg, | ||
| "timeout": self.operation_timeout, | ||
| } | ||
| except Exception as e: | ||
| logger.error(f"Failed to get statistics: {e}") | ||
| logger.error(f"Failed to execute cypher query: {e}") | ||
| return { | ||
| "success": False, | ||
| "error": str(e) | ||
| "error": str(e), | ||
| } | ||
| async def clear_knowledge_base(self) -> Dict[str, Any]: | ||
| """clear knowledge base""" | ||
|
|
||
| async def export_graph(self, output_path: Union[str, Path]) -> Dict[str, Any]: | ||
| """export knowledge graph to file""" | ||
| if not self._initialized: | ||
| raise Exception("Service not initialized") | ||
|
|
||
|
|
||
| output_path = Path(output_path) | ||
| try: | ||
| # recreate empty index, add timeout control | ||
| storage_context = StorageContext.from_defaults( | ||
| graph_store=self.graph_store | ||
| export_result = await asyncio.wait_for( | ||
| asyncio.to_thread(self.graph_store.export_graph, str(output_path)), | ||
| timeout=self.operation_timeout, | ||
| ) | ||
|
|
||
| self.knowledge_index = await asyncio.wait_for( | ||
| asyncio.to_thread(lambda: KnowledgeGraphIndex( | ||
| nodes=[], | ||
| storage_context=storage_context, | ||
| show_progress=True | ||
| )), | ||
| timeout=self.connection_timeout | ||
| ) | ||
|
|
||
| # recreate query engine | ||
| self.query_engine = self.knowledge_index.as_query_engine( | ||
| include_text=True, | ||
| response_mode="tree_summarize", | ||
| embedding_mode="hybrid" | ||
| ) | ||
|
|
||
| logger.info("Knowledge base cleared successfully") | ||
|
|
||
|
|
||
| return { | ||
| "success": True, | ||
| "message": "Knowledge base cleared successfully" | ||
| "output_path": str(output_path), | ||
| "result": export_result, | ||
| } | ||
|
|
||
| except asyncio.TimeoutError: | ||
| error_msg = f"Clear operation timed out after {self.connection_timeout}s" | ||
| error_msg = f"Graph export timed out after {self.operation_timeout}s" | ||
| logger.error(error_msg) | ||
| return { | ||
| "success": False, | ||
| "error": error_msg | ||
| "error": error_msg, | ||
| "timeout": self.operation_timeout, | ||
| } | ||
| except Exception as e: | ||
| logger.error(f"Failed to clear knowledge base: {e}") | ||
| logger.error(f"Failed to export graph: {e}") | ||
| return { | ||
| "success": False, | ||
| "error": str(e) | ||
| "error": str(e), | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore get_statistics/clear_knowledge_base API
The service no longer exposes get_statistics or clear_knowledge_base—the section now defines execute_cypher and export_graph instead. However, multiple API and MCP entry points (e.g. api/routes.py’s /knowledge/statistics and /knowledge/clear handlers, MCP tools, and their unit tests) still call knowledge_service.get_statistics() and knowledge_service.clear_knowledge_base(). At runtime those imports will raise AttributeError as soon as any of those endpoints or tools are invoked, effectively breaking the existing statistics/clear flows. Either keep the previous methods (even as wrappers) or update all call sites to the new names before merging.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors the Neo4j knowledge query pipeline by introducing a reusable Neo4jRAGPipeline class that orchestrates knowledge-graph and vector retrievers with a shared response synthesizer. The query API is extended to expose fine-grained pipeline configuration options, tool execution hooks, and step-by-step traces, replacing the previous monolithic query_engine approach.
Key Changes:
- Introduced
Neo4jRAGPipelineandPipelineConfigclasses to compose graph/vector retrieval, response synthesis, and optional tool execution in a single pipeline - Extended the knowledge query API to accept granular configuration (
use_graph,use_vector,use_tools,top_k,graph_depth,tool_kwargs) - Updated documentation (README, API guide, architecture docs) to reflect the new pipeline structure and mapping of query modes to retrieval steps
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
src/codebase_rag/services/knowledge/neo4j_knowledge_service.py |
Core refactor introducing Neo4jRAGPipeline class and updating Neo4jKnowledgeService to build/use the pipeline instead of a single query_engine |
src/codebase_rag/api/routes.py |
Extended QueryRequest model and route to accept new pipeline configuration fields |
docs/guide/knowledge/query.md |
Added pipeline step mapping tables, updated code examples to use new field names and response structure |
docs/architecture/dataflow.md |
Replaced old query flow narrative with new pipeline-centric step descriptions and mermaid diagram |
docs/architecture/components.md |
Updated component initialization and query method signatures to reflect pipeline structure |
docs/api/rest.md |
Expanded API documentation with new request parameters and detailed response structure including pipeline traces |
README_CN.md |
Updated Chinese README examples to include new query parameters |
README.md |
Updated English README examples to include new query parameters |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| """Merge retrieved nodes by keeping the highest scoring entry per node id.""" | ||
|
|
||
| for node in nodes: | ||
| node_id = node.node.node_id if node.node else node.node_id |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The node ID extraction pattern node.node.node_id if node.node else node.node_id is duplicated in multiple methods (_merge_nodes, _summarize_nodes, _format_source_nodes, and search_similar_nodes). Extract this into a helper method to reduce duplication and improve maintainability.
| def _collect_statistics() -> Dict[str, Any]: | ||
| base_stats: Dict[str, Any] = { | ||
| "initialized": self._initialized, | ||
| "graph_store_type": type(self.graph_store).__name__ | ||
| if self.graph_store | ||
| else None, | ||
| "vector_index_type": type(self.vector_index).__name__ | ||
| if self.vector_index | ||
| else None, | ||
| "pipeline": { | ||
| "default_top_k": getattr(self.query_pipeline, "default_top_k", None), | ||
| "default_graph_depth": getattr( | ||
| self.query_pipeline, "default_graph_depth", None | ||
| ), | ||
| "supports_tools": bool(self.function_tools), | ||
| }, | ||
| } |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The nested function _collect_statistics uses self from the enclosing scope, which requires careful handling when moved to a thread via asyncio.to_thread (line 1012). While this works in Python, consider making this a regular instance method to avoid potential closure-related issues and improve code clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot open a new pull request to apply changes based on this feedback
| | Mode / Flags | Graph Retrieval | Vector Retrieval | Tool Hooks | | ||
| | --- | --- | --- | --- | | ||
| | `hybrid` (default) | ✅ | ✅ | Optional (`use_tools`) | | ||
| | `graph_only` | ✅ | ❌ | Optional (`use_tools`) | | ||
| | `vector_only` | ❌ | ✅ | Optional (`use_tools`) | | ||
| | Custom (`use_graph` / `use_vector`) | As configured | As configured | Optional (`use_tools`) | |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same pipeline step mapping table is duplicated at lines 234-241 and lines 436-443. Consider using a reference or include to avoid maintaining two identical copies of this documentation.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Summary
Testing
Codex Task