1
1
import asyncio
2
2
import json
3
- from pathlib import Path
3
+ import logging
4
4
5
- from typing_extensions import override
5
+ from more_itertools import flatten
6
+ from typing_extensions import Self , override
6
7
7
8
from crawlee ._consts import METADATA_FILENAME
9
+ from crawlee .configuration import Configuration as CrawleeConfiguration
8
10
from crawlee .storage_clients ._file_system import FileSystemKeyValueStoreClient
11
+ from crawlee .storage_clients .models import KeyValueStoreRecord
9
12
10
13
from apify ._configuration import Configuration
11
14
15
+ logger = logging .getLogger (__name__ )
16
+
12
17
13
18
class ApifyFileSystemKeyValueStoreClient (FileSystemKeyValueStoreClient ):
14
19
"""Apify-specific implementation of the `FileSystemKeyValueStoreClient`.
@@ -17,23 +22,39 @@ class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
17
22
directory, except for the metadata file and the `INPUT.json` file.
18
23
"""
19
24
25
@override
@classmethod
async def open(
    cls,
    *,
    id: str | None,
    name: str | None,
    alias: str | None,
    configuration: CrawleeConfiguration,
) -> Self:
    """Open the key-value store client and sanitize its input files.

    The arguments are forwarded unchanged to the parent class's `open`.
    Once the client exists, `_sanitize_input_json_files` is run on it
    before it is handed back to the caller.

    Args:
        id: Forwarded to the parent `open`.
        name: Forwarded to the parent `open`.
        alias: Forwarded to the parent `open`.
        configuration: Forwarded to the parent `open`.

    Returns:
        The opened client, after input-file sanitization has run.
    """
    opened_client = await super().open(
        id=id,
        name=name,
        alias=alias,
        configuration=configuration,
    )

    # Private access is intentional here: this is a factory method of the same class.
    await opened_client._sanitize_input_json_files()  # noqa: SLF001
    return opened_client
40
+
20
41
@override
21
42
async def purge (self ) -> None :
22
43
"""Purges the key-value store by deleting all its contents.
23
44
24
45
It deletes all files in the key-value store directory, except for the metadata file and
25
46
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
26
47
"""
27
- kvs_input_key = Configuration .get_global_configuration ().input_key
28
-
29
- # First try to find the alternative format of the input file and process it if it exists.
30
- for file_path in self .path_to_kvs .glob ('*' ):
31
- if file_path .name == f'{ kvs_input_key } .json' :
32
- await self ._process_input_json (file_path )
48
+ configuration = Configuration .get_global_configuration ()
33
49
34
50
async with self ._lock :
51
+ files_to_keep = set (
52
+ flatten ([key , f'{ key } .{ METADATA_FILENAME } ' ] for key in configuration .input_key_candidates )
53
+ )
54
+ files_to_keep .add (METADATA_FILENAME )
55
+
35
56
for file_path in self .path_to_kvs .glob ('*' ):
36
- if file_path .name in { METADATA_FILENAME , kvs_input_key , f' { kvs_input_key } . { METADATA_FILENAME } ' } :
57
+ if file_path .name in files_to_keep :
37
58
continue
38
59
if file_path .is_file ():
39
60
await asyncio .to_thread (file_path .unlink , missing_ok = True )
@@ -43,15 +64,40 @@ async def purge(self) -> None:
43
64
update_modified_at = True ,
44
65
)
45
66
46
- async def _process_input_json (self , path : Path ) -> None :
47
- """Process simple input json file to format expected by the FileSystemKeyValueStoreClient.
67
async def _sanitize_input_json_files(self) -> None:
    """Handle missing metadata for input files.

    Looks in the store directory for files named after the configured input
    key candidates and re-stores their JSON content through `set_value`
    (the original code describes this as refreshing metadata to prevent
    inconsistencies).

    * If the file for the canonical input key exists, only that file is
      re-stored; any alternative input file that also exists is reported
      with a warning and otherwise left alone.
    * If it does not exist, every alternative input file that exists is
      re-stored under its own key.

    All filesystem access is pushed to a worker thread so the event loop is
    never blocked, and files are read as bytes so that `json.loads` detects
    the encoding (UTF-8/16/32, including a UTF-8 BOM) instead of relying on
    the platform's locale encoding.
    """
    configuration = Configuration.get_global_configuration()
    canonical_key = configuration.canonical_input_key
    alternative_keys = configuration.input_key_candidates - {canonical_key}

    def load_json(path):
        # Bytes input lets `json.loads` pick the right decoder on any locale.
        return json.loads(path.read_bytes())

    canonical_input_file = self.path_to_kvs / canonical_key

    if await asyncio.to_thread(canonical_input_file.exists):
        # Refresh metadata to prevent inconsistencies.
        input_data = await asyncio.to_thread(load_json, canonical_input_file)
        await self.set_value(key=canonical_key, value=input_data)

        for alternative_key in alternative_keys:
            alternative_input_file = self.path_to_kvs / alternative_key
            if await asyncio.to_thread(alternative_input_file.exists):
                logger.warning('Redundant input file found: %s', alternative_input_file)
        return

    for alternative_key in alternative_keys:
        alternative_input_file = self.path_to_kvs / alternative_key

        # Only process files that actually exist.
        if not await asyncio.to_thread(alternative_input_file.exists):
            continue

        # Refresh metadata to prevent inconsistencies.
        input_data = await asyncio.to_thread(load_json, alternative_input_file)
        await self.set_value(key=alternative_key, value=input_data)
92
+
93
@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
    """Fetch a record, treating all input key candidates as aliases of each other.

    For a key that is not one of the configured input key candidates this
    simply delegates to the parent implementation.

    For an input key candidate, every candidate is looked up and the first
    record found is returned. The lookup order is deterministic: the
    canonical input key first (when it is among the candidates), then the
    remaining candidates in sorted order. Iterating the candidate set
    directly would make the winner arbitrary when more than one input file
    is present.

    Args:
        key: The key of the record to fetch.

    Returns:
        The matching record, or `None` if no record exists for the key
        (or, for input keys, for any of the candidates).
    """
    configuration = Configuration.get_global_configuration()
    candidates = configuration.input_key_candidates

    if key not in candidates:
        return await super().get_value(key=key)

    canonical_key = configuration.canonical_input_key
    # `False` sorts before `True`, so the canonical key is tried first.
    lookup_order = sorted(candidates, key=lambda candidate: (candidate != canonical_key, candidate))

    for candidate in lookup_order:
        record = await super().get_value(key=candidate)
        if record is not None:
            return record

    # `key` itself is one of the candidates and was already tried above,
    # so another lookup would only repeat a miss.
    return None
0 commit comments