Skip to content

Commit 6b96b21

Browse files
authored
feat: Data Source Api Update (#2468)
* Fix Signed-off-by: Kevin Zhang <[email protected]> * File source Signed-off-by: Kevin Zhang <[email protected]> * Redshift source Signed-off-by: Kevin Zhang <[email protected]> * Rest of them. Signed-off-by: Kevin Zhang <[email protected]> * Fix typo Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix lint Signed-off-by: Kevin Zhang <[email protected]> * Revert small change Signed-off-by: Kevin Zhang <[email protected]> * Address review comments Signed-off-by: Kevin Zhang <[email protected]>
1 parent 43847de commit 6b96b21

File tree

7 files changed

+187
-7
lines changed

7 files changed

+187
-7
lines changed

protos/feast/core/DataSource.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import "feast/core/DataFormat.proto";
2626
import "feast/types/Value.proto";
2727

2828
// Defines a Data Source that can be used to source Feature data
29-
// Next available id: 23
29+
// Next available id: 26
3030
message DataSource {
3131
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
3232
// but they are going to be reserved for backwards compatibility.
@@ -53,6 +53,12 @@ message DataSource {
5353
// Name of Feast project that this data source belongs to.
5454
string project = 21;
5555

56+
string description = 23;
57+
58+
map<string, string> tags = 24;
59+
60+
string owner = 25;
61+
5662
SourceType type = 1;
5763

5864
// Defines mapping between fields in the sourced data

sdk/python/feast/data_source.py

Lines changed: 92 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,20 @@ class DataSource(ABC):
162162
source to feature names in a feature table or view. Only used for feature
163163
columns, not entity or timestamp columns.
164164
date_partition_column (optional): Timestamp column used for partitioning.
165+
description (optional): A human-readable description.
166+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
167+
owner (optional): The owner of the data source, typically the email of the primary
168+
maintainer.
165169
"""
166170

167171
name: str
168172
event_timestamp_column: str
169173
created_timestamp_column: str
170174
field_mapping: Dict[str, str]
171175
date_partition_column: str
176+
description: str
177+
tags: Dict[str, str]
178+
owner: str
172179

173180
def __init__(
174181
self,
@@ -177,8 +184,27 @@ def __init__(
177184
created_timestamp_column: Optional[str] = None,
178185
field_mapping: Optional[Dict[str, str]] = None,
179186
date_partition_column: Optional[str] = None,
187+
description: Optional[str] = "",
188+
tags: Optional[Dict[str, str]] = None,
189+
owner: Optional[str] = "",
180190
):
181-
"""Creates a DataSource object."""
191+
"""
192+
Creates a DataSource object.
193+
Args:
194+
name: Name of data source, which should be unique within a project
195+
event_timestamp_column (optional): Event timestamp column used for point in time
196+
joins of feature values.
197+
created_timestamp_column (optional): Timestamp column indicating when the row
198+
was created, used for deduplicating rows.
199+
field_mapping (optional): A dictionary mapping of column names in this data
200+
source to feature names in a feature table or view. Only used for feature
201+
columns, not entity or timestamp columns.
202+
date_partition_column (optional): Timestamp column used for partitioning.
203+
description (optional): A human-readable description.
204+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
205+
owner (optional): The owner of the data source, typically the email of the primary
206+
maintainer.
207+
"""
182208
self.name = name
183209
self.event_timestamp_column = (
184210
event_timestamp_column if event_timestamp_column else ""
@@ -190,6 +216,9 @@ def __init__(
190216
self.date_partition_column = (
191217
date_partition_column if date_partition_column else ""
192218
)
219+
self.description = description or ""
220+
self.tags = tags or {}
221+
self.owner = owner or ""
193222

194223
def __hash__(self):
195224
return hash((id(self), self.name))
@@ -207,6 +236,9 @@ def __eq__(self, other):
207236
or self.created_timestamp_column != other.created_timestamp_column
208237
or self.field_mapping != other.field_mapping
209238
or self.date_partition_column != other.date_partition_column
239+
or self.tags != other.tags
240+
or self.owner != other.owner
241+
or self.description != other.description
210242
):
211243
return False
212244

@@ -303,13 +335,19 @@ def __init__(
303335
created_timestamp_column: Optional[str] = "",
304336
field_mapping: Optional[Dict[str, str]] = None,
305337
date_partition_column: Optional[str] = "",
338+
description: Optional[str] = "",
339+
tags: Optional[Dict[str, str]] = None,
340+
owner: Optional[str] = "",
306341
):
307342
super().__init__(
308343
name,
309344
event_timestamp_column,
310345
created_timestamp_column,
311346
field_mapping,
312347
date_partition_column,
348+
description=description,
349+
tags=tags,
350+
owner=owner,
313351
)
314352
self.kafka_options = KafkaOptions(
315353
bootstrap_servers=bootstrap_servers,
@@ -346,6 +384,9 @@ def from_proto(data_source: DataSourceProto):
346384
event_timestamp_column=data_source.event_timestamp_column,
347385
created_timestamp_column=data_source.created_timestamp_column,
348386
date_partition_column=data_source.date_partition_column,
387+
description=data_source.description,
388+
tags=dict(data_source.tags),
389+
owner=data_source.owner,
349390
)
350391

351392
def to_proto(self) -> DataSourceProto:
@@ -354,12 +395,14 @@ def to_proto(self) -> DataSourceProto:
354395
type=DataSourceProto.STREAM_KAFKA,
355396
field_mapping=self.field_mapping,
356397
kafka_options=self.kafka_options.to_proto(),
398+
description=self.description,
399+
tags=self.tags,
400+
owner=self.owner,
357401
)
358402

359403
data_source_proto.event_timestamp_column = self.event_timestamp_column
360404
data_source_proto.created_timestamp_column = self.created_timestamp_column
361405
data_source_proto.date_partition_column = self.date_partition_column
362-
363406
return data_source_proto
364407

365408
@staticmethod
@@ -377,16 +420,25 @@ class RequestDataSource(DataSource):
377420
Args:
378421
name: Name of the request data source
379422
schema: Schema mapping from the input feature name to a ValueType
423+
description (optional): A human-readable description.
424+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
425+
owner (optional): The owner of the request data source, typically the email of the primary
426+
maintainer.
380427
"""
381428

382429
name: str
383430
schema: Dict[str, ValueType]
384431

385432
def __init__(
386-
self, name: str, schema: Dict[str, ValueType],
433+
self,
434+
name: str,
435+
schema: Dict[str, ValueType],
436+
description: Optional[str] = "",
437+
tags: Optional[Dict[str, str]] = None,
438+
owner: Optional[str] = "",
387439
):
388440
"""Creates a RequestDataSource object."""
389-
super().__init__(name)
441+
super().__init__(name, description=description, tags=tags, owner=owner)
390442
self.schema = schema
391443

392444
def validate(self, config: RepoConfig):
@@ -403,7 +455,13 @@ def from_proto(data_source: DataSourceProto):
403455
schema = {}
404456
for key, val in schema_pb.items():
405457
schema[key] = ValueType(val)
406-
return RequestDataSource(name=data_source.name, schema=schema)
458+
return RequestDataSource(
459+
name=data_source.name,
460+
schema=schema,
461+
description=data_source.description,
462+
tags=dict(data_source.tags),
463+
owner=data_source.owner,
464+
)
407465

408466
def to_proto(self) -> DataSourceProto:
409467
schema_pb = {}
@@ -414,6 +472,9 @@ def to_proto(self) -> DataSourceProto:
414472
name=self.name,
415473
type=DataSourceProto.REQUEST_SOURCE,
416474
request_data_options=options,
475+
description=self.description,
476+
tags=self.tags,
477+
owner=self.owner,
417478
)
418479

419480
return data_source_proto
@@ -448,6 +509,9 @@ def from_proto(data_source: DataSourceProto):
448509
event_timestamp_column=data_source.event_timestamp_column,
449510
created_timestamp_column=data_source.created_timestamp_column,
450511
date_partition_column=data_source.date_partition_column,
512+
description=data_source.description,
513+
tags=dict(data_source.tags),
514+
owner=data_source.owner,
451515
)
452516

453517
@staticmethod
@@ -467,13 +531,19 @@ def __init__(
467531
stream_name: str,
468532
field_mapping: Optional[Dict[str, str]] = None,
469533
date_partition_column: Optional[str] = "",
534+
description: Optional[str] = "",
535+
tags: Optional[Dict[str, str]] = None,
536+
owner: Optional[str] = "",
470537
):
471538
super().__init__(
472539
name,
473540
event_timestamp_column,
474541
created_timestamp_column,
475542
field_mapping,
476543
date_partition_column,
544+
description=description,
545+
tags=tags,
546+
owner=owner,
477547
)
478548
self.kinesis_options = KinesisOptions(
479549
record_format=record_format, region=region, stream_name=stream_name
@@ -504,6 +574,9 @@ def to_proto(self) -> DataSourceProto:
504574
type=DataSourceProto.STREAM_KINESIS,
505575
field_mapping=self.field_mapping,
506576
kinesis_options=self.kinesis_options.to_proto(),
577+
description=self.description,
578+
tags=self.tags,
579+
owner=self.owner,
507580
)
508581

509582
data_source_proto.event_timestamp_column = self.event_timestamp_column
@@ -529,6 +602,9 @@ def __init__(
529602
schema: Dict[str, ValueType],
530603
batch_source: DataSource,
531604
event_timestamp_column="timestamp",
605+
description: Optional[str] = "",
606+
tags: Optional[Dict[str, str]] = None,
607+
owner: Optional[str] = "",
532608
):
533609
"""
534610
Creates a PushSource object.
@@ -539,8 +615,12 @@ def __init__(
539615
store to the online store, and when retrieving historical features.
540616
event_timestamp_column (optional): Event timestamp column used for point in time
541617
joins of feature values.
618+
description (optional): A human-readable description.
619+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
620+
owner (optional): The owner of the data source, typically the email of the primary
621+
maintainer.
542622
"""
543-
super().__init__(name)
623+
super().__init__(name, description=description, tags=tags, owner=owner)
544624
self.schema = schema
545625
self.batch_source = batch_source
546626
if not self.batch_source:
@@ -574,6 +654,9 @@ def from_proto(data_source: DataSourceProto):
574654
schema=schema,
575655
batch_source=batch_source,
576656
event_timestamp_column=data_source.event_timestamp_column,
657+
description=data_source.description,
658+
tags=dict(data_source.tags),
659+
owner=data_source.owner,
577660
)
578661

579662
def to_proto(self) -> DataSourceProto:
@@ -592,6 +675,9 @@ def to_proto(self) -> DataSourceProto:
592675
type=DataSourceProto.PUSH_SOURCE,
593676
push_options=options,
594677
event_timestamp_column=self.event_timestamp_column,
678+
description=self.description,
679+
tags=self.tags,
680+
owner=self.owner,
595681
)
596682

597683
return data_source_proto

sdk/python/feast/infra/offline_stores/bigquery_source.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ def __init__(
2424
date_partition_column: Optional[str] = "",
2525
query: Optional[str] = None,
2626
name: Optional[str] = None,
27+
description: Optional[str] = "",
28+
tags: Optional[Dict[str, str]] = None,
29+
owner: Optional[str] = "",
2730
):
2831
"""Create a BigQuerySource from an existing table or query.
2932
@@ -37,6 +40,10 @@ def __init__(
3740
date_partition_column (optional): Timestamp column used for partitioning.
3841
query (optional): SQL query to execute to generate data for this data source.
3942
name (optional): Name for the source. Defaults to the table_ref if not specified.
43+
description (optional): A human-readable description.
44+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
45+
owner (optional): The owner of the bigquery source, typically the email of the primary
46+
maintainer.
4047
Example:
4148
>>> from feast import BigQuerySource
4249
>>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table")
@@ -75,6 +82,9 @@ def __init__(
7582
created_timestamp_column,
7683
field_mapping,
7784
date_partition_column,
85+
description=description,
86+
tags=tags,
87+
owner=owner,
7888
)
7989

8090
# Note: Python requires redefining hash in child classes that override __eq__
@@ -94,6 +104,9 @@ def __eq__(self, other):
94104
and self.event_timestamp_column == other.event_timestamp_column
95105
and self.created_timestamp_column == other.created_timestamp_column
96106
and self.field_mapping == other.field_mapping
107+
and self.description == other.description
108+
and self.tags == other.tags
109+
and self.owner == other.owner
97110
)
98111

99112
@property
@@ -117,6 +130,9 @@ def from_proto(data_source: DataSourceProto):
117130
created_timestamp_column=data_source.created_timestamp_column,
118131
date_partition_column=data_source.date_partition_column,
119132
query=data_source.bigquery_options.query,
133+
description=data_source.description,
134+
tags=dict(data_source.tags),
135+
owner=data_source.owner,
120136
)
121137

122138
def to_proto(self) -> DataSourceProto:
@@ -125,6 +141,9 @@ def to_proto(self) -> DataSourceProto:
125141
type=DataSourceProto.BATCH_BIGQUERY,
126142
field_mapping=self.field_mapping,
127143
bigquery_options=self.bigquery_options.to_proto(),
144+
description=self.description,
145+
tags=self.tags,
146+
owner=self.owner,
128147
)
129148

130149
data_source_proto.event_timestamp_column = self.event_timestamp_column

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ def __init__(
4040
created_timestamp_column: Optional[str] = None,
4141
field_mapping: Optional[Dict[str, str]] = None,
4242
date_partition_column: Optional[str] = None,
43+
description: Optional[str] = "",
44+
tags: Optional[Dict[str, str]] = None,
45+
owner: Optional[str] = "",
4346
):
4447
# If no name, use the table_ref as the default name
4548
_name = name
@@ -54,6 +57,9 @@ def __init__(
5457
created_timestamp_column,
5558
field_mapping,
5659
date_partition_column,
60+
description=description,
61+
tags=tags,
62+
owner=owner,
5763
)
5864
warnings.warn(
5965
"The spark data source API is an experimental feature in alpha development. "
@@ -125,6 +131,9 @@ def from_proto(data_source: DataSourceProto) -> Any:
125131
event_timestamp_column=data_source.event_timestamp_column,
126132
created_timestamp_column=data_source.created_timestamp_column,
127133
date_partition_column=data_source.date_partition_column,
134+
description=data_source.description,
135+
tags=dict(data_source.tags),
136+
owner=data_source.owner,
128137
)
129138

130139
def to_proto(self) -> DataSourceProto:
@@ -133,6 +142,9 @@ def to_proto(self) -> DataSourceProto:
133142
type=DataSourceProto.CUSTOM_SOURCE,
134143
field_mapping=self.field_mapping,
135144
custom_options=self.spark_options.to_proto(),
145+
description=self.description,
146+
tags=self.tags,
147+
owner=self.owner,
136148
)
137149

138150
data_source_proto.event_timestamp_column = self.event_timestamp_column

0 commit comments

Comments
 (0)