Skip to content

Commit 702ec49

Browse files
TremaMigueladchia
andauthored
feat: Add support for DynamoDB online_read in batches (#2371)
* feat: dynamodb onlin read in batches Signed-off-by: Miguel Trejo <[email protected]> * run linters and format Signed-off-by: Miguel Trejo <[email protected]> * feat: batch_size parameter Signed-off-by: Miguel Trejo <[email protected]> * docs: typo in batch_size description Signed-off-by: Miguel Trejo <[email protected]> * trailing white space Signed-off-by: Miguel Trejo <[email protected]> * fix: batch_size is last argument Signed-off-by: Miguel Trejo <[email protected]> * test: dynamodb online store online_read in batches Signed-off-by: Miguel Trejo <[email protected]> * test: mock dynamodb behavior Signed-off-by: Miguel Trejo <[email protected]> * feat: batch_size value must be less than 40 Signed-off-by: Miguel Trejo <[email protected]> * feat: batch_size defaults to 40 Signed-off-by: Miguel Trejo <[email protected]> * feat: sort dynamodb responses Signed-off-by: Miguel Trejo <[email protected]> * resolve merge conflicts Signed-off-by: Miguel Trejo <[email protected]> * test online response proto with redshift:dynamodb Signed-off-by: Miguel Trejo <[email protected]> * feat: consistency in batch_size process Signed-off-by: Miguel Trejo <[email protected]> * fix: return batch_size times None Signed-off-by: Miguel Trejo <[email protected]> * remove debug code Signed-off-by: Miguel Trejo <[email protected]> * typo in docstring Signed-off-by: Miguel Trejo <[email protected]> * batch_size in onlineconfigstore Signed-off-by: Miguel Trejo <[email protected]> Co-authored-by: Danny Chiao <[email protected]>
1 parent 45db6dc commit 702ec49

File tree

3 files changed

+170
-18
lines changed

3 files changed

+170
-18
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import itertools
1415
import logging
1516
from datetime import datetime
1617
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
@@ -50,10 +51,16 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5051
"""Online store type selector"""
5152

5253
region: StrictStr
53-
""" AWS Region Name """
54+
"""AWS Region Name"""
5455

5556
table_name_template: StrictStr = "{project}.{table_name}"
56-
""" DynamoDB table name template """
57+
"""DynamoDB table name template"""
58+
59+
sort_response: bool = True
60+
"""Whether or not to sort BatchGetItem response."""
61+
62+
batch_size: int = 40
63+
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
5764

5865

5966
class DynamoDBOnlineStore(OnlineStore):
@@ -211,26 +218,46 @@ def online_read(
211218
online_config = config.online_store
212219
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
213220
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
221+
table_instance = dynamodb_resource.Table(
222+
_get_table_name(online_config, config, table)
223+
)
214224

215225
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
216-
for entity_key in entity_keys:
217-
table_instance = dynamodb_resource.Table(
218-
_get_table_name(online_config, config, table)
219-
)
220-
entity_id = compute_entity_id(entity_key)
226+
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]
227+
batch_size = online_config.batch_size
228+
sort_response = online_config.sort_response
229+
entity_ids_iter = iter(entity_ids)
230+
while True:
231+
batch = list(itertools.islice(entity_ids_iter, batch_size))
232+
# No more items to insert
233+
if len(batch) == 0:
234+
break
235+
batch_entity_ids = {
236+
table_instance.name: {
237+
"Keys": [{"entity_id": entity_id} for entity_id in batch]
238+
}
239+
}
221240
with tracing_span(name="remote_call"):
222-
response = table_instance.get_item(Key={"entity_id": entity_id})
223-
value = response.get("Item")
224-
225-
if value is not None:
226-
res = {}
227-
for feature_name, value_bin in value["values"].items():
228-
val = ValueProto()
229-
val.ParseFromString(value_bin.value)
230-
res[feature_name] = val
231-
result.append((datetime.fromisoformat(value["event_ts"]), res))
241+
response = dynamodb_resource.batch_get_item(
242+
RequestItems=batch_entity_ids
243+
)
244+
response = response.get("Responses")
245+
table_responses = response.get(table_instance.name)
246+
if table_responses:
247+
if sort_response:
248+
table_responses = self._sort_dynamodb_response(
249+
table_responses, entity_ids
250+
)
251+
for tbl_res in table_responses:
252+
res = {}
253+
for feature_name, value_bin in tbl_res["values"].items():
254+
val = ValueProto()
255+
val.ParseFromString(value_bin.value)
256+
res[feature_name] = val
257+
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
232258
else:
233-
result.append((None, None))
259+
batch_size_nones = ((None, None),) * len(batch)
260+
result.extend(batch_size_nones)
234261
return result
235262

236263
def _get_dynamodb_client(self, region: str):
@@ -243,6 +270,20 @@ def _get_dynamodb_resource(self, region: str):
243270
self._dynamodb_resource = _initialize_dynamodb_resource(region)
244271
return self._dynamodb_resource
245272

273+
def _sort_dynamodb_response(self, responses: list, order: list):
274+
"""DynamoDB Batch Get Item doesn't return items in a particular order."""
275+
# Assign an index to order
276+
order_with_index = {value: idx for idx, value in enumerate(order)}
277+
# Sort table responses by index
278+
table_responses_ordered = [
279+
(order_with_index[tbl_res["entity_id"]], tbl_res) for tbl_res in responses
280+
]
281+
table_responses_ordered = sorted(
282+
table_responses_ordered, key=lambda tup: tup[0]
283+
)
284+
_, table_responses_ordered = zip(*table_responses_ordered)
285+
return table_responses_ordered
286+
246287

247288
def _initialize_dynamodb_client(region: str):
248289
return boto3.client("dynamodb", region_name=region)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from dataclasses import dataclass
2+
3+
import pytest
4+
from moto import mock_dynamodb2
5+
6+
from feast.infra.offline_stores.file import FileOfflineStoreConfig
7+
from feast.infra.online_stores.dynamodb import (
8+
DynamoDBOnlineStore,
9+
DynamoDBOnlineStoreConfig,
10+
)
11+
from feast.repo_config import RepoConfig
12+
from tests.utils.online_store_utils import (
13+
_create_n_customer_test_samples,
14+
_create_test_table,
15+
_insert_data_test_table,
16+
)
17+
18+
REGISTRY = "s3://test_registry/registry.db"
19+
PROJECT = "test_aws"
20+
PROVIDER = "aws"
21+
TABLE_NAME = "dynamodb_online_store"
22+
REGION = "us-west-2"
23+
24+
25+
@dataclass
26+
class MockFeatureView:
27+
name: str
28+
29+
30+
@pytest.fixture
31+
def repo_config():
32+
return RepoConfig(
33+
registry=REGISTRY,
34+
project=PROJECT,
35+
provider=PROVIDER,
36+
online_store=DynamoDBOnlineStoreConfig(region=REGION),
37+
offline_store=FileOfflineStoreConfig(),
38+
)
39+
40+
41+
@mock_dynamodb2
42+
@pytest.mark.parametrize("n_samples", [5, 50, 100])
43+
def test_online_read(repo_config, n_samples):
44+
"""Test DynamoDBOnlineStore online_read method."""
45+
_create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
46+
data = _create_n_customer_test_samples(n=n_samples)
47+
_insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
48+
49+
entity_keys, features = zip(*data)
50+
dynamodb_store = DynamoDBOnlineStore()
51+
returned_items = dynamodb_store.online_read(
52+
config=repo_config,
53+
table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"),
54+
entity_keys=entity_keys,
55+
)
56+
assert len(returned_items) == len(data)
57+
assert [item[1] for item in returned_items] == list(features)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from datetime import datetime
2+
3+
import boto3
4+
5+
from feast import utils
6+
from feast.infra.online_stores.helpers import compute_entity_id
7+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
8+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
9+
10+
11+
def _create_n_customer_test_samples(n=10):
12+
return [
13+
(
14+
EntityKeyProto(
15+
join_keys=["customer"], entity_values=[ValueProto(string_val=str(i))]
16+
),
17+
{
18+
"avg_orders_day": ValueProto(float_val=1.0),
19+
"name": ValueProto(string_val="John"),
20+
"age": ValueProto(int64_val=3),
21+
},
22+
)
23+
for i in range(n)
24+
]
25+
26+
27+
def _create_test_table(project, tbl_name, region):
28+
client = boto3.client("dynamodb", region_name=region)
29+
client.create_table(
30+
TableName=f"{project}.{tbl_name}",
31+
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
32+
AttributeDefinitions=[{"AttributeName": "entity_id", "AttributeType": "S"}],
33+
BillingMode="PAY_PER_REQUEST",
34+
)
35+
36+
37+
def _delete_test_table(project, tbl_name, region):
38+
client = boto3.client("dynamodb", region_name=region)
39+
client.delete_table(TableName=f"{project}.{tbl_name}")
40+
41+
42+
def _insert_data_test_table(data, project, tbl_name, region):
43+
dynamodb_resource = boto3.resource("dynamodb", region_name=region)
44+
table_instance = dynamodb_resource.Table(f"{project}.{tbl_name}")
45+
for entity_key, features in data:
46+
entity_id = compute_entity_id(entity_key)
47+
with table_instance.batch_writer() as batch:
48+
batch.put_item(
49+
Item={
50+
"entity_id": entity_id,
51+
"event_ts": str(utils.make_tzaware(datetime.utcnow())),
52+
"values": {k: v.SerializeToString() for k, v in features.items()},
53+
}
54+
)

0 commit comments

Comments
 (0)