Files
ea-chatbot-lg/backend/tests/test_review_fix_vfs.py

51 lines
1.5 KiB
Python

import pytest
from ea_chatbot.utils.vfs import VFSHelper
from ea_chatbot.graph.workers.data_analyst.mapping import prepare_worker_input
from ea_chatbot.graph.state import AgentState
def test_vfs_isolation_deep_copy():
"""Verify that worker VFS is deep-copied from global state."""
global_vfs = {
"data.txt": {
"content": "original",
"metadata": {"tags": ["a"]}
}
}
state = AgentState(
checklist=[{"task": "test", "worker": "data_analyst"}],
current_step=0,
messages=[],
question="test",
analysis={},
next_action="",
iterations=0,
vfs=global_vfs,
plots=[],
dfs={}
)
worker_input = prepare_worker_input(state)
worker_vfs = worker_input["vfs_state"]
# Mutate worker VFS nested object
worker_vfs["data.txt"]["metadata"]["tags"].append("b")
# Global VFS should remain unchanged
assert global_vfs["data.txt"]["metadata"]["tags"] == ["a"]
def test_vfs_schema_normalization():
"""Verify that VFSHelper handles inconsistent VFS entries."""
vfs = {
"raw.txt": "just a string", # Inconsistent with standard Dict[str, Any] entry
"valid.txt": {"content": "data", "metadata": {}}
}
helper = VFSHelper(vfs)
# Should not crash during read
content, metadata = helper.read("raw.txt")
assert content == "just a string"
assert metadata == {}
# Should not crash during list/other ops if they assume dict
assert "raw.txt" in helper.list()