Skip to content

Commit 5481caf

Browse files
author
Vitaly Sergeyev
authored
feat: Event timestamps response (#2355)
* ability to get event timestamps from online response Signed-off-by: Vitaly Sergeyev <[email protected]> * fix event timestamp bugs Signed-off-by: Vitaly Sergeyev <[email protected]> * python formatting Signed-off-by: Vitaly Sergeyev <[email protected]> * optional param to retrieve event_timestamp in online_response Signed-off-by: Vitaly Sergeyev <[email protected]> * formatting Signed-off-by: Vitaly Sergeyev <[email protected]> * renaming param Signed-off-by: Vitaly Sergeyev <[email protected]>
1 parent d1b7c67 commit 5481caf

File tree

4 files changed

+88
-6
lines changed

4 files changed

+88
-6
lines changed

sdk/python/feast/feature_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1508,10 +1508,10 @@ def _read_from_online_store(
15081508

15091509
# Each row is a set of features for a given entity key. We only need to convert
15101510
# the data to Protobuf once.
1511-
row_ts_proto = Timestamp()
15121511
null_value = Value()
15131512
read_row_protos = []
15141513
for read_row in read_rows:
1514+
row_ts_proto = Timestamp()
15151515
row_ts, feature_data = read_row
15161516
if row_ts is not None:
15171517
row_ts_proto.FromDatetime(row_ts)

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
Union,
2828
)
2929

30+
import pytz
3031
from google.protobuf.timestamp_pb2 import Timestamp
3132
from pydantic import StrictStr
3233
from pydantic.typing import Literal
@@ -302,5 +303,5 @@ def _get_features_for_entity(
302303
if not res:
303304
return None, None
304305
else:
305-
timestamp = datetime.fromtimestamp(res_ts.seconds)
306+
timestamp = datetime.fromtimestamp(res_ts.seconds, tz=pytz.utc)
306307
return timestamp, res

sdk/python/feast/online_response.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
2121
from feast.type_map import feast_value_type_to_python_type
2222

23+
TIMESTAMP_POSTFIX: str = "__ts"
24+
2325

2426
class OnlineResponse:
2527
"""
26-
Defines a online response in feast.
28+
Defines an online response in feast.
2729
"""
2830

2931
def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
@@ -44,9 +46,12 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
4446
del result.event_timestamps[idx]
4547
break
4648

47-
def to_dict(self) -> Dict[str, Any]:
49+
def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
4850
"""
4951
Converts GetOnlineFeaturesResponse features into a dictionary form.
52+
53+
Args:
54+
include_event_timestamps: bool Optionally include feature event timestamps in the dictionary
5055
"""
5156
response: Dict[str, List[Any]] = {}
5257

@@ -58,11 +63,22 @@ def to_dict(self) -> Dict[str, Any]:
5863
else:
5964
response[feature_ref].append(native_type_value)
6065

66+
if include_event_timestamps:
67+
event_ts = result.event_timestamps[idx].seconds
68+
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
69+
if timestamp_ref not in response:
70+
response[timestamp_ref] = [event_ts]
71+
else:
72+
response[timestamp_ref].append(event_ts)
73+
6174
return response
6275

63-
def to_df(self) -> pd.DataFrame:
76+
def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
6477
"""
6578
Converts GetOnlineFeaturesResponse features into a pandas DataFrame.
79+
80+
Args:
81+
include_event_timestamps: bool Optionally include feature event timestamps in the dataframe
6682
"""
6783

68-
return pd.DataFrame(self.to_dict())
84+
return pd.DataFrame(self.to_dict(include_event_timestamps))

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
FeatureNameCollisionError,
1919
RequestDataNotFoundInEntityRowsException,
2020
)
21+
from feast.online_response import TIMESTAMP_POSTFIX
2122
from feast.wait import wait_retry_backoff
2223
from tests.integration.feature_repos.repo_configuration import (
2324
Environment,
@@ -322,6 +323,70 @@ def get_online_features_dict(
322323
return dict1
323324

324325

326+
@pytest.mark.integration
327+
@pytest.mark.universal
328+
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
329+
def test_online_retrieval_with_event_timestamps(
330+
environment, universal_data_sources, full_feature_names
331+
):
332+
fs = environment.feature_store
333+
entities, datasets, data_sources = universal_data_sources
334+
feature_views = construct_universal_feature_views(data_sources)
335+
336+
fs.apply([driver(), feature_views.driver, feature_views.global_fv])
337+
338+
# fake data to ingest into Online Store
339+
data = {
340+
"driver_id": [1, 2],
341+
"conv_rate": [0.5, 0.3],
342+
"acc_rate": [0.6, 0.4],
343+
"avg_daily_trips": [4, 5],
344+
"event_timestamp": [
345+
pd.to_datetime(1646263500, utc=True, unit="s"),
346+
pd.to_datetime(1646263600, utc=True, unit="s"),
347+
],
348+
"created": [
349+
pd.to_datetime(1646263500, unit="s"),
350+
pd.to_datetime(1646263600, unit="s"),
351+
],
352+
}
353+
df_ingest = pd.DataFrame(data)
354+
355+
# directly ingest data into the Online Store
356+
fs.write_to_online_store("driver_stats", df_ingest)
357+
358+
response = fs.get_online_features(
359+
features=[
360+
"driver_stats:avg_daily_trips",
361+
"driver_stats:acc_rate",
362+
"driver_stats:conv_rate",
363+
],
364+
entity_rows=[{"driver": 1}, {"driver": 2}],
365+
)
366+
df = response.to_df(True)
367+
assertpy.assert_that(len(df)).is_equal_to(2)
368+
assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1)
369+
assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)
370+
assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
371+
1646263500
372+
)
373+
assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
374+
1646263600
375+
)
376+
assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
377+
1646263500
378+
)
379+
assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
380+
1646263600
381+
)
382+
assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
383+
1646263500
384+
)
385+
assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
386+
1646263600
387+
)
388+
389+
325390
@pytest.mark.integration
326391
@pytest.mark.universal
327392
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))

0 commit comments

Comments
 (0)