# 70 lines
# 2.8 KiB
# Python
import copy
|
|
from typing import Dict, Any, Optional, Tuple, List
|
|
from ea_chatbot.utils.logging import get_logger
|
|
|
|
# Module-level logger for the VFS utilities, obtained from the project's
# logging helper under the name "utils:vfs".
logger = get_logger("utils:vfs")
|
|
|
|
def safe_vfs_copy(vfs_state: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Perform a safe deep copy of the VFS state.

    Every entry is deep-copied on its own, so a single non-copyable artifact
    (e.g., one holding a DB handle) cannot prevent the remaining entries from
    being copied. When an entry fails to copy, the failure is logged and the
    entry is replaced with a descriptive error marker. This prevents crashing
    the graph/persistence while making the failure explicit.

    Args:
        vfs_state: Mapping of filename to artifact data. ``None`` or an empty
            mapping yields an empty dict.

    Returns:
        A new dict with the same keys as ``vfs_state``. Each value is either
        an independent deep copy of the original value or, if copying failed,
        an error artifact of the form
        ``{"content": "<ERROR: ...>", "metadata": {"type": "error", ...}}``.
    """
    new_vfs: Dict[str, Any] = {}

    # A missing/empty state has nothing to copy; returning early keeps this
    # helper from raising on ``None`` (its whole purpose is to not crash).
    if not vfs_state:
        return new_vfs

    for filename, data in vfs_state.items():
        try:
            # Attempt a standard deepcopy for isolation
            new_vfs[filename] = copy.deepcopy(data)
        except Exception as e:
            # Deliberately broad: any failure while copying one artifact must
            # not abort the copy of the whole VFS.
            reason = str(e)
            logger.error(
                f"CRITICAL: VFS artifact '{filename}' is NOT copyable/serializable: {reason}. "
                "Replacing with error placeholder to prevent graph crash."
            )
            # Replace with a standardized error artifact
            new_vfs[filename] = {
                "content": f"<ERROR: This artifact could not be persisted or copied: {reason}>",
                "metadata": {
                    "type": "error",
                    "error": reason,
                    "original_filename": filename,
                },
            }

    return new_vfs
|
|
|
|
class VFSHelper:
    """Helper class for managing in-memory Virtual File System (VFS) artifacts.

    The helper operates directly on the dict it is given (no copy is made), so
    every write/delete is visible through the caller's own reference to that
    dict. Entries written through this helper have the shape
    ``{"content": <any>, "metadata": <dict>}``.
    """

    def __init__(self, vfs_state: Dict[str, Any]):
        """Initialize with a reference to the VFS state from AgentState."""
        self._vfs = vfs_state

    def write(self, filename: str, content: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Write a file to the VFS, replacing any existing entry of that name.

        Args:
            filename: Key under which the artifact is stored.
            content: Artifact payload; stored as-is (not copied).
            metadata: Optional metadata dict; a falsy value is stored as ``{}``.
        """
        self._vfs[filename] = {
            "content": content,
            "metadata": metadata or {},
        }

    def read(self, filename: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]:
        """Read a file and its metadata from the VFS.

        Returns:
            ``(content, metadata)`` when the file exists; ``metadata`` is
            always a dict in that case. ``(None, None)`` if not found (an
            entry whose stored value is ``None`` is treated as not found).
        """
        file_data = self._vfs.get(filename)
        if file_data is None:
            return None, None

        # Handle raw values (backwards compatibility or inconsistent schema)
        if not isinstance(file_data, dict) or "content" not in file_data:
            return file_data, {}

        metadata = file_data.get("metadata")
        # Only ``None``/missing is normalized: a stored (possibly empty) dict
        # is returned by reference so callers that mutate it still update the
        # VFS entry, while a found file never yields ``None`` metadata.
        if metadata is None:
            metadata = {}
        return file_data["content"], metadata

    def list(self) -> List[str]:
        """List all filenames in the VFS."""
        # Unpacking instead of ``list(...)`` avoids a call that reads like a
        # recursive call to this method.
        return [*self._vfs]

    def delete(self, filename: str) -> bool:
        """Delete a file from the VFS. Returns True if deleted, False if not found."""
        try:
            del self._vfs[filename]
        except KeyError:
            return False
        return True
|