Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions chia/cmds/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,13 @@ def get_root_history(
from chia.cmds.data_funcs import get_root_history_cmd

run(get_root_history_cmd(rpc_port=data_rpc_port, store_id=id))


# CLI entry point for `data validate`: hands the RPC port option to validate_data_cmd and drives the
# resulting coroutine with run().
# NOTE: these notes are comments rather than a docstring because click uses a command function's
# docstring as its help text, which would change the `--help` output.
@data_cmd.command("validate", short_help="Run integrity checks for data layer db.")
@create_rpc_port_option()
def validate(data_rpc_port: int) -> None:
    # Imported inside the function body, the same way the get_root_history command does it.
    from chia.cmds.data_funcs import validate_data_cmd

    validation = validate_data_cmd(rpc_port=data_rpc_port)
    run(validation)
11 changes: 11 additions & 0 deletions chia/cmds/data_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,14 @@ async def get_root_history_cmd(
print(f"Connection error. Check if data is running at {rpc_port}")
except Exception as e:
print(f"Exception from 'data': {e}")


async def validate_data_cmd(rpc_port: Optional[int]) -> None:
    """Call the data RPC client's ``validate_data`` and print whatever it returns.

    ``rpc_port`` is passed to ``get_client``; the context manager yields a client together with a
    port value that replaces ``rpc_port`` for the rest of the function.

    Nothing is raised to the caller: a connection failure and any other exception are each reported
    with a printed message instead.
    """
    try:
        async with get_client(rpc_port) as (client, rpc_port):
            print(await client.validate_data())
    except aiohttp.ClientConnectorError:
        print(f"Connection error. Check if data is running at {rpc_port}")
    except Exception as e:
        print(f"Exception from 'data': {e}")
3 changes: 3 additions & 0 deletions chia/data_layer/data_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,6 @@ async def periodically_fetch_data(self) -> None:
await asyncio.sleep(fetch_data_interval)
except asyncio.CancelledError:
pass

async def validate_data(self) -> None:
    """Await ``check()`` on this object's ``data_store``; any exception it raises propagates."""
    store = self.data_store
    await store.check()
10 changes: 10 additions & 0 deletions chia/data_layer/data_layer_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,13 @@ def __init__(self, node_hashes: List[bytes32]) -> None:
bytes_objects=node_hashes,
)
)


class AncestorTableError(IntegrityError):
    """Integrity error reporting nodes whose recorded ancestor is wrong.

    The exception message is produced by ``build_message_with_hashes`` from the fixed headline
    "Found nodes with wrong ancestor:" and the supplied node hashes.
    """

    def __init__(self, node_hashes: List[bytes32]) -> None:
        text = build_message_with_hashes(
            message="Found nodes with wrong ancestor:",
            bytes_objects=node_hashes,
        )
        super().__init__(text)
47 changes: 42 additions & 5 deletions chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
NodeHashError,
TerminalLeftRightError,
TreeGenerationIncrementingError,
AncestorTableError,
)
from chia.data_layer.data_layer_types import (
InternalNode,
Expand Down Expand Up @@ -358,12 +359,48 @@ async def _check_hashes(self, *, lock: bool = True) -> None:
if len(bad_node_hashes) > 0:
raise NodeHashError(node_hashes=bad_node_hashes)

# Integrity check for the `ancestors` table.
# For every row of `root` that has a node hash, the nodes returned by get_left_to_right_ordering()
# are scanned; for both children of each InternalNode, the `ancestors` row selected for that child
# (same tree, generation at or before the root's generation) must name that internal node.
# Raises AncestorTableError carrying the child hashes whose row is missing or names another node.
# `lock` is forwarded to locked_transaction(); the nested ordering call passes lock=False.
# NOTE(review): indentation was lost in this view, so whether the final `raise` runs inside or
# after the transaction block cannot be told from here -- confirm against the real file.
async def _check_ancestors_table(self, *, lock: bool = True) -> None:
async with self.db_wrapper.locked_transaction(lock=lock):
# Load every row of `root` as a Root object.
cursor = await self.db.execute("SELECT * FROM root")
roots = [Root.from_row(row=row) async for row in cursor]
# Child hashes found to have a missing or mismatching ancestor row.
hashes: List[bytes32] = []

for root in roots:
# A root without a node hash has nothing to verify.
if root.node_hash is None:
continue
# The positional True is the third argument of get_left_to_right_ordering (presumably
# get_subtree_only -- confirm against its signature); lock=False presumably because the
# transaction opened above already holds the lock.
nodes = await self.get_left_to_right_ordering(root.node_hash, root.tree_id, True, lock=False)
for node in nodes:
# Only internal nodes have children to check.
if isinstance(node, InternalNode):
for child_hash in (node.left_hash, node.right_hash):
# Fetch the ancestor recorded for this child, limited to generations <= the root's.
# NOTE(review): pairing `ancestor` with MAX(generation) relies on the database returning
# the bare column from the max row (SQLite behaviour) -- confirm the backend.
cursor = await self.db.execute(
"""
SELECT ancestors.ancestor AS ancestor, MAX(ancestors.generation) AS generation
FROM ancestors
WHERE ancestors.hash == :hash
AND ancestors.tree_id == :tree_id
AND ancestors.generation <= :generation
GROUP BY hash
""",
{
"hash": child_hash.hex(),
"tree_id": root.tree_id.hex(),
"generation": root.generation,
},
)
row = await cursor.fetchone()
# Both a missing row and a row naming a different parent count as failures.
# NOTE(review): the same child hash can be appended once per root it fails under,
# so the reported list may contain duplicates.
if row is None or row["ancestor"] != node.hash.hex():
hashes.append(child_hash)

if len(hashes) > 0:
raise AncestorTableError(node_hashes=hashes)

# Registry of the integrity-check methods; each entry takes the DataStore instance and returns an
# awaitable of None (per the annotation).
# NOTE(review): presumably check() walks this tuple in order -- its body is not visible here, confirm.
_checks: Tuple[Callable[["DataStore"], Awaitable[None]], ...] = (
_check_internal_key_value_are_null,
_check_internal_left_right_are_bytes32,
_check_terminal_left_right_are_null,
_check_roots_are_incrementing,
_check_hashes,
# Ancestors-table verification, listed last.
_check_ancestors_table,
)

async def create_tree(self, tree_id: bytes32, *, lock: bool = True, status: Status = Status.PENDING) -> bool:
Expand Down Expand Up @@ -949,18 +986,18 @@ async def get_left_to_right_ordering(
lock: bool = True,
num_nodes: int = 1000000000,
) -> List[Node]:
ancestors = await self.get_ancestors(node_hash, tree_id, lock=True)
path_hashes = {node_hash, *(ancestor.hash for ancestor in ancestors)}
# The hashes that need to be traversed, initialized here as the hashes to the right of the ancestors
# ordered from shallowest (root) to deepest (leaves) so .pop() from the end gives the deepest first.
if not get_subtree_only:
ancestors = await self.get_ancestors(node_hash, tree_id, lock=lock)
path_hashes = {node_hash, *(ancestor.hash for ancestor in ancestors)}
# The hashes that need to be traversed, initialized here as the hashes to the right of the ancestors
# ordered from shallowest (root) to deepest (leaves) so .pop() from the end gives the deepest first.
stack = [ancestor.right_hash for ancestor in reversed(ancestors) if ancestor.right_hash not in path_hashes]
else:
stack = []
nodes: List[Node] = []
while len(nodes) < num_nodes:
try:
node = await self.get_node(node_hash)
node = await self.get_node(node_hash, lock=lock)
except Exception:
return []
if isinstance(node, TerminalNode):
Expand Down
10 changes: 10 additions & 0 deletions chia/rpc/data_layer_rpc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def get_routes(self) -> Dict[str, Callable[[Any], Any]]:
"/unsubscribe": self.unsubscribe,
"/get_kv_diff": self.get_kv_diff,
"/get_root_history": self.get_root_history,
"/validate_data": self.validate_data,
}

async def create_data_store(self, request: Dict[str, Any]) -> Dict[str, Any]:
Expand Down Expand Up @@ -258,3 +259,12 @@ async def get_kv_diff(self, request: Dict[str, Any]) -> Dict[str, Any]:
for rec in records:
res.insert(0, {"type": rec.type.name, "key": rec.key.hex(), "value": rec.value.hex()})
return {"diff": res}

async def validate_data(self, request: Dict[str, Any]) -> Dict[str, Any]:
    """
    RPC handler for "/validate_data".

    Awaits ``validate_data()`` on the attached service and answers ``{"valid": True}`` when that
    call returns. ``request`` is accepted but not read. Raises ``Exception`` with the message
    "Data layer not created" when no service is attached; exceptions from the service propagate.
    """
    service = self.service
    if service is None:
        raise Exception("Data layer not created")

    await service.validate_data()
    return {"valid": True}
4 changes: 4 additions & 0 deletions chia/rpc/data_layer_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,7 @@ async def get_kv_diff(self, store_id: bytes32, hash_1: bytes32, hash_2: bytes32)
async def get_root_history(self, store_id: bytes32) -> Dict[str, Any]:
    """Fetch "get_root_history" with the store id hex-encoded under "id" and return the response as-is."""
    payload = {"id": store_id.hex()}
    return await self.fetch("get_root_history", payload)  # type: ignore[no-any-return]

async def validate_data(self) -> Dict[str, Any]:
    """Fetch "validate_data" with an empty payload and return the response unchanged."""
    return await self.fetch("validate_data", {})  # type: ignore[no-any-return]