Skip to content

Commit 43da230

Browse files
authored
feat: Metadata changes & making data sources top-level objects to power Feast UI (feast-dev#2336)
* Squash commits for metadata changes Signed-off-by: Danny Chiao <[email protected]> * tests Signed-off-by: Danny Chiao <[email protected]> * Add more tests Signed-off-by: Danny Chiao <[email protected]> * lint Signed-off-by: Danny Chiao <[email protected]> * Add apply test Signed-off-by: Danny Chiao <[email protected]> * Add apply test Signed-off-by: Danny Chiao <[email protected]> * Add apply test Signed-off-by: Danny Chiao <[email protected]> * lint Signed-off-by: Danny Chiao <[email protected]> * fix bigquery source Signed-off-by: Danny Chiao <[email protected]> * fix test Signed-off-by: Danny Chiao <[email protected]> * fix spark source Signed-off-by: Danny Chiao <[email protected]> * fix spark source Signed-off-by: Danny Chiao <[email protected]>
1 parent 5481caf commit 43da230

30 files changed

+718
-101
lines changed

protos/feast/core/DataSource.proto

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import "feast/core/DataFormat.proto";
2626
import "feast/types/Value.proto";
2727

2828
// Defines a Data Source that can be used to source Feature data
29+
// Next available id: 22
2930
message DataSource {
3031
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
3132
// but they are going to be reserved for backwards compatibility.
@@ -45,6 +46,13 @@ message DataSource {
4546
REQUEST_SOURCE = 7;
4647

4748
}
49+
50+
// Unique name of data source within the project
51+
string name = 20;
52+
53+
// Name of Feast project that this data source belongs to.
54+
string project = 21;
55+
4856
SourceType type = 1;
4957

5058
// Defines mapping between fields in the sourced data
@@ -156,9 +164,7 @@ message DataSource {
156164

157165
// Defines options for DataSource that sources features from request data
158166
message RequestDataOptions {
159-
// Name of the request data source
160-
string name = 1;
161-
167+
reserved 1;
162168
// Mapping of feature name to type
163169
map<string, feast.types.ValueType.Enum> schema = 2;
164170
}

protos/feast/core/OnDemandFeatureView.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ message OnDemandFeatureViewSpec {
4848
map<string, OnDemandInput> inputs = 4;
4949

5050
UserDefinedFunction user_defined_function = 5;
51-
52-
5351
}
5452

5553
message OnDemandFeatureViewMeta {

protos/feast/core/Registry.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ import "feast/core/FeatureView.proto";
2828
import "feast/core/InfraObject.proto";
2929
import "feast/core/OnDemandFeatureView.proto";
3030
import "feast/core/RequestFeatureView.proto";
31+
import "feast/core/DataSource.proto";
3132
import "feast/core/SavedDataset.proto";
3233
import "google/protobuf/timestamp.proto";
3334

35+
// Next id: 13
3436
message Registry {
3537
repeated Entity entities = 1;
3638
repeated FeatureTable feature_tables = 2;
3739
repeated FeatureView feature_views = 6;
40+
repeated DataSource data_sources = 12;
3841
repeated OnDemandFeatureView on_demand_feature_views = 8;
3942
repeated RequestFeatureView request_feature_views = 9;
4043
repeated FeatureService feature_services = 7;

protos/feast/core/RequestFeatureView.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
2222
option java_outer_classname = "RequestFeatureViewProto";
2323
option java_package = "feast.proto.core";
2424

25-
import "feast/core/FeatureView.proto";
26-
import "feast/core/Feature.proto";
2725
import "feast/core/DataSource.proto";
2826

2927
message RequestFeatureView {

protos/feast/core/SavedDataset.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ option java_outer_classname = "SavedDatasetProto";
2323
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
2424

2525
import "google/protobuf/timestamp.proto";
26-
import "feast/core/FeatureViewProjection.proto";
2726
import "feast/core/DataSource.proto";
27+
import "feast/core/FeatureService.proto";
2828

2929
message SavedDatasetSpec {
3030
// Name of the dataset. Must be unique since it's possible to overwrite dataset by name
@@ -44,6 +44,9 @@ message SavedDatasetSpec {
4444

4545
SavedDatasetStorage storage = 6;
4646

47+
// Optional and only populated if generated from a feature service fetch
48+
string feature_service_name = 8;
49+
4750
// User defined metadata
4851
map<string, string> tags = 7;
4952
}

protos/feast/core/ValidationProfile.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ option java_package = "feast.proto.core";
2222
option java_outer_classname = "ValidationProfile";
2323
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
2424

25-
import "google/protobuf/timestamp.proto";
2625
import "feast/core/SavedDataset.proto";
2726

2827
message GEValidationProfiler {

protos/feast/storage/Redis.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
syntax = "proto3";
1818

19-
import "feast/types/Field.proto";
2019
import "feast/types/Value.proto";
2120

2221
package feast.storage;

sdk/python/feast/cli.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,56 @@ def endpoint(ctx: click.Context):
126126
_logger.info("There is no active feature server.")
127127

128128

129+
@cli.group(name="data-sources")
130+
def data_sources_cmd():
131+
"""
132+
Access data sources
133+
"""
134+
pass
135+
136+
137+
@data_sources_cmd.command("describe")
138+
@click.argument("name", type=click.STRING)
139+
@click.pass_context
140+
def data_source_describe(ctx: click.Context, name: str):
141+
"""
142+
Describe a data source
143+
"""
144+
repo = ctx.obj["CHDIR"]
145+
cli_check_repo(repo)
146+
store = FeatureStore(repo_path=str(repo))
147+
148+
try:
149+
data_source = store.get_data_source(name)
150+
except FeastObjectNotFoundException as e:
151+
print(e)
152+
exit(1)
153+
154+
print(
155+
yaml.dump(
156+
yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
157+
)
158+
)
159+
160+
161+
@data_sources_cmd.command(name="list")
162+
@click.pass_context
163+
def data_source_list(ctx: click.Context):
164+
"""
165+
List all data sources
166+
"""
167+
repo = ctx.obj["CHDIR"]
168+
cli_check_repo(repo)
169+
store = FeatureStore(repo_path=str(repo))
170+
table = []
171+
for datasource in store.list_data_sources():
172+
table.append([datasource.name, datasource.__class__])
173+
174+
from tabulate import tabulate
175+
176+
print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))
177+
178+
129179
@cli.group(name="entities")
130180
def entities_cmd():
131181
"""

sdk/python/feast/data_source.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class DataSource(ABC):
139139
DataSource that can be used to source features.
140140
141141
Args:
142+
name: Name of data source, which should be unique within a project
142143
event_timestamp_column (optional): Event timestamp column used for point in time
143144
joins of feature values.
144145
created_timestamp_column (optional): Timestamp column indicating when the row
@@ -149,19 +150,22 @@ class DataSource(ABC):
149150
date_partition_column (optional): Timestamp column used for partitioning.
150151
"""
151152

153+
name: str
152154
event_timestamp_column: str
153155
created_timestamp_column: str
154156
field_mapping: Dict[str, str]
155157
date_partition_column: str
156158

157159
def __init__(
158160
self,
161+
name: str,
159162
event_timestamp_column: Optional[str] = None,
160163
created_timestamp_column: Optional[str] = None,
161164
field_mapping: Optional[Dict[str, str]] = None,
162165
date_partition_column: Optional[str] = None,
163166
):
164167
"""Creates a DataSource object."""
168+
self.name = name
165169
self.event_timestamp_column = (
166170
event_timestamp_column if event_timestamp_column else ""
167171
)
@@ -173,12 +177,16 @@ def __init__(
173177
date_partition_column if date_partition_column else ""
174178
)
175179

180+
def __hash__(self):
181+
return hash((id(self), self.name))
182+
176183
def __eq__(self, other):
177184
if not isinstance(other, DataSource):
178185
raise TypeError("Comparisons should only involve DataSource class objects.")
179186

180187
if (
181-
self.event_timestamp_column != other.event_timestamp_column
188+
self.name != other.name
189+
or self.event_timestamp_column != other.event_timestamp_column
182190
or self.created_timestamp_column != other.created_timestamp_column
183191
or self.field_mapping != other.field_mapping
184192
or self.date_partition_column != other.date_partition_column
@@ -206,7 +214,9 @@ def from_proto(data_source: DataSourceProto) -> Any:
206214
cls = get_data_source_class_from_type(data_source.data_source_class_type)
207215
return cls.from_proto(data_source)
208216

209-
if data_source.file_options.file_format and data_source.file_options.file_url:
217+
if data_source.request_data_options and data_source.request_data_options.schema:
218+
data_source_obj = RequestDataSource.from_proto(data_source)
219+
elif data_source.file_options.file_format and data_source.file_options.file_url:
210220
from feast.infra.offline_stores.file_source import FileSource
211221

212222
data_source_obj = FileSource.from_proto(data_source)
@@ -246,7 +256,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
246256
@abstractmethod
247257
def to_proto(self) -> DataSourceProto:
248258
"""
249-
Converts an DataSourceProto object to its protobuf representation.
259+
Converts a DataSourceProto object to its protobuf representation.
250260
"""
251261
raise NotImplementedError
252262

@@ -296,6 +306,7 @@ def get_table_column_names_and_types(
296306

297307
def __init__(
298308
self,
309+
name: str,
299310
event_timestamp_column: str,
300311
bootstrap_servers: str,
301312
message_format: StreamFormat,
@@ -305,6 +316,7 @@ def __init__(
305316
date_partition_column: Optional[str] = "",
306317
):
307318
super().__init__(
319+
name,
308320
event_timestamp_column,
309321
created_timestamp_column,
310322
field_mapping,
@@ -335,6 +347,7 @@ def __eq__(self, other):
335347
@staticmethod
336348
def from_proto(data_source: DataSourceProto):
337349
return KafkaSource(
350+
name=data_source.name,
338351
field_mapping=dict(data_source.field_mapping),
339352
bootstrap_servers=data_source.kafka_options.bootstrap_servers,
340353
message_format=StreamFormat.from_proto(
@@ -348,6 +361,7 @@ def from_proto(data_source: DataSourceProto):
348361

349362
def to_proto(self) -> DataSourceProto:
350363
data_source_proto = DataSourceProto(
364+
name=self.name,
351365
type=DataSourceProto.STREAM_KAFKA,
352366
field_mapping=self.field_mapping,
353367
kafka_options=self.kafka_options.to_proto(),
@@ -363,6 +377,9 @@ def to_proto(self) -> DataSourceProto:
363377
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
364378
return type_map.redshift_to_feast_value_type
365379

380+
def get_table_query_string(self) -> str:
381+
raise NotImplementedError
382+
366383

367384
class RequestDataSource(DataSource):
368385
"""
@@ -373,19 +390,14 @@ class RequestDataSource(DataSource):
373390
schema: Schema mapping from the input feature name to a ValueType
374391
"""
375392

376-
@staticmethod
377-
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
378-
raise NotImplementedError
379-
380393
name: str
381394
schema: Dict[str, ValueType]
382395

383396
def __init__(
384397
self, name: str, schema: Dict[str, ValueType],
385398
):
386399
"""Creates a RequestDataSource object."""
387-
super().__init__()
388-
self.name = name
400+
super().__init__(name)
389401
self.schema = schema
390402

391403
def validate(self, config: RepoConfig):
@@ -402,21 +414,28 @@ def from_proto(data_source: DataSourceProto):
402414
schema = {}
403415
for key in schema_pb.keys():
404416
schema[key] = ValueType(schema_pb.get(key))
405-
return RequestDataSource(
406-
name=data_source.request_data_options.name, schema=schema
407-
)
417+
return RequestDataSource(name=data_source.name, schema=schema)
408418

409419
def to_proto(self) -> DataSourceProto:
410420
schema_pb = {}
411421
for key, value in self.schema.items():
412422
schema_pb[key] = value.value
413-
options = DataSourceProto.RequestDataOptions(name=self.name, schema=schema_pb)
423+
options = DataSourceProto.RequestDataOptions(schema=schema_pb)
414424
data_source_proto = DataSourceProto(
415-
type=DataSourceProto.REQUEST_SOURCE, request_data_options=options
425+
name=self.name,
426+
type=DataSourceProto.REQUEST_SOURCE,
427+
request_data_options=options,
416428
)
417429

418430
return data_source_proto
419431

432+
def get_table_query_string(self) -> str:
433+
raise NotImplementedError
434+
435+
@staticmethod
436+
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
437+
raise NotImplementedError
438+
420439

421440
class KinesisSource(DataSource):
422441
def validate(self, config: RepoConfig):
@@ -430,6 +449,7 @@ def get_table_column_names_and_types(
430449
@staticmethod
431450
def from_proto(data_source: DataSourceProto):
432451
return KinesisSource(
452+
name=data_source.name,
433453
field_mapping=dict(data_source.field_mapping),
434454
record_format=StreamFormat.from_proto(
435455
data_source.kinesis_options.record_format
@@ -445,8 +465,12 @@ def from_proto(data_source: DataSourceProto):
445465
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
446466
pass
447467

468+
def get_table_query_string(self) -> str:
469+
raise NotImplementedError
470+
448471
def __init__(
449472
self,
473+
name: str,
450474
event_timestamp_column: str,
451475
created_timestamp_column: str,
452476
record_format: StreamFormat,
@@ -456,6 +480,7 @@ def __init__(
456480
date_partition_column: Optional[str] = "",
457481
):
458482
super().__init__(
483+
name,
459484
event_timestamp_column,
460485
created_timestamp_column,
461486
field_mapping,
@@ -475,7 +500,8 @@ def __eq__(self, other):
475500
)
476501

477502
if (
478-
self.kinesis_options.record_format != other.kinesis_options.record_format
503+
self.name != other.name
504+
or self.kinesis_options.record_format != other.kinesis_options.record_format
479505
or self.kinesis_options.region != other.kinesis_options.region
480506
or self.kinesis_options.stream_name != other.kinesis_options.stream_name
481507
):
@@ -485,6 +511,7 @@ def __eq__(self, other):
485511

486512
def to_proto(self) -> DataSourceProto:
487513
data_source_proto = DataSourceProto(
514+
name=self.name,
488515
type=DataSourceProto.STREAM_KINESIS,
489516
field_mapping=self.field_mapping,
490517
kinesis_options=self.kinesis_options.to_proto(),

0 commit comments

Comments
 (0)