feat(utils): Implement VFSHelper for in-memory artifact management
This commit is contained in:
33
backend/src/ea_chatbot/utils/vfs.py
Normal file
33
backend/src/ea_chatbot/utils/vfs.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from typing import Dict, Any, Optional, Tuple, List
|
||||
|
||||
class VFSHelper:
|
||||
"""Helper class for managing in-memory Virtual File System (VFS) artifacts."""
|
||||
|
||||
def __init__(self, vfs_state: Dict[str, Any]):
|
||||
"""Initialize with a reference to the VFS state from AgentState."""
|
||||
self._vfs = vfs_state
|
||||
|
||||
def write(self, filename: str, content: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
|
||||
"""Write a file to the VFS."""
|
||||
self._vfs[filename] = {
|
||||
"content": content,
|
||||
"metadata": metadata or {}
|
||||
}
|
||||
|
||||
def read(self, filename: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]:
|
||||
"""Read a file and its metadata from the VFS. Returns (None, None) if not found."""
|
||||
file_data = self._vfs.get(filename)
|
||||
if file_data:
|
||||
return file_data["content"], file_data["metadata"]
|
||||
return None, None
|
||||
|
||||
def list(self) -> List[str]:
|
||||
"""List all filenames in the VFS."""
|
||||
return list(self._vfs.keys())
|
||||
|
||||
def delete(self, filename: str) -> bool:
|
||||
"""Delete a file from the VFS. Returns True if deleted, False if not found."""
|
||||
if filename in self._vfs:
|
||||
del self._vfs[filename]
|
||||
return True
|
||||
return False
|
||||
Reference in New Issue
Block a user