Skip to content

Commit 0bb5e8c

Browse files
authored
feat: Use a daemon thread to monitor the go feature server exclusively (#2391)
* feat: Use a daemon thread to monitor the go feature server exclusively Also reenable the tests for lifecycle management of the goserver Signed-off-by: Achal Shah <[email protected]> * fix workflow Signed-off-by: Achal Shah <[email protected]> * Fix setup.py build_python_protos Signed-off-by: Achal Shah <[email protected]> * CR comments Signed-off-by: Achal Shah <[email protected]> * mkdir is needed Signed-off-by: Achal Shah <[email protected]> * explicit commit Signed-off-by: Achal Shah <[email protected]> * flush file explicitly Signed-off-by: Achal Shah <[email protected]> * Flush registry changes Signed-off-by: Achal Shah <[email protected]> * Signal handler can only be triggered from main thread Signed-off-by: Achal Shah <[email protected]> * join on background thread when cancelling and update test Signed-off-by: Achal Shah <[email protected]> * fix redis Signed-off-by: Achal Shah <[email protected]> * add back go_build.py Signed-off-by: Achal Shah <[email protected]> * wait on grpc connection check Signed-off-by: Achal Shah <[email protected]> * Add a more robust wait Signed-off-by: Achal Shah <[email protected]> * Add a sleep Signed-off-by: Achal Shah <[email protected]> * Remove explicitt cleanup Signed-off-by: Achal Shah <[email protected]> * More defensive Signed-off-by: Achal Shah <[email protected]> * Even more defensive Signed-off-by: Achal Shah <[email protected]> * Stop join Signed-off-by: Achal Shah <[email protected]> * Clean up the process to ensure next one is fine Signed-off-by: Achal Shah <[email protected]> * join thread Signed-off-by: Achal Shah <[email protected]> * remove a sleep Signed-off-by: Achal Shah <[email protected]> * Cleanup Signed-off-by: Achal Shah <[email protected]>
1 parent ae133fd commit 0bb5e8c

File tree

18 files changed

+181
-237
lines changed

18 files changed

+181
-237
lines changed

.github/workflows/linter.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,6 @@ jobs:
6464
run: |
6565
pip install --upgrade "pip>=21.3.1"
6666
- name: Install dependencies
67-
run: make install-go-ci-dependencies
67+
run: make install-go-proto-dependencies
6868
- name: Lint go
6969
run: make lint-go

.github/workflows/unit_tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ jobs:
8484
with:
8585
go-version: 1.17.7
8686
- name: Install dependencies
87-
run: make install-go-ci-dependencies
87+
run: make install-go-proto-dependencies
8888
- name: Compile protos
8989
run: make compile-protos-go
9090
- name: Test

Makefile

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ build: protos build-java build-docker build-html
3535

3636
# Python SDK
3737

38-
install-python-ci-dependencies: install-go-ci-dependencies
38+
install-python-ci-dependencies: install-go-proto-dependencies
3939
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-ci-requirements.txt
4040
cd sdk/python && COMPILE_GO=true python setup.py develop
4141

@@ -125,19 +125,21 @@ build-java-no-tests:
125125

126126
# Go SDK
127127

128-
install-go-ci-dependencies:
128+
install-go-proto-dependencies:
129129
go install google.golang.org/protobuf/cmd/[email protected]
130130
go install google.golang.org/grpc/cmd/[email protected]
131131

132-
compile-protos-go: install-go-ci-dependencies
132+
install-protoc-dependencies:
133133
pip install grpcio-tools==1.34.0
134+
135+
compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
134136
python sdk/python/setup.py build_go_protos
135137

136138
compile-go-feature-server: compile-protos-go
137139
go mod tidy
138140
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver
139141

140-
test-go: install-go-ci-dependencies
142+
test-go: compile-protos-go
141143
go test ./...
142144

143145
format-go:

go/cmd/goserver/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ type FeastEnvConfig struct {
2323
}
2424

2525
// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus
26-
2726
func main() {
2827

2928
var feastEnvConfig FeastEnvConfig
Binary file not shown.

sdk/python/feast/feature_store.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,7 @@ class FeatureStore:
108108

109109
@log_exceptions
110110
def __init__(
111-
self,
112-
repo_path: Optional[str] = None,
113-
config: Optional[RepoConfig] = None,
114-
go_server_use_thread: bool = False,
111+
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
115112
):
116113
"""
117114
Creates a FeatureStore object.
@@ -135,7 +132,6 @@ def __init__(
135132
self._registry._initialize_registry()
136133
self._provider = get_provider(self.config, self.repo_path)
137134
self._go_server = None
138-
self._go_server_use_thread = go_server_use_thread
139135

140136
@log_exceptions
141137
def version(self) -> str:
@@ -733,6 +729,10 @@ def apply(
733729
service.name, project=self.project, commit=False
734730
)
735731

732+
# If a go server is running, kill it so that it can be recreated in `update_infra` with
733+
# the latest registry state.
734+
self.kill_go_server()
735+
736736
self._get_provider().update_infra(
737737
project=self.project,
738738
tables_to_delete=views_to_delete if not partial else [],
@@ -754,6 +754,8 @@ def teardown(self):
754754

755755
entities = self.list_entities()
756756

757+
self.kill_go_server()
758+
757759
self._get_provider().teardown_infra(self.project, tables, entities)
758760
self._registry.teardown()
759761

@@ -1233,11 +1235,8 @@ def get_online_features(
12331235
if self.config.go_feature_server:
12341236
# Lazily start the go server on the first request
12351237
if self._go_server is None:
1236-
self._go_server = GoServer(
1237-
str(self.repo_path.absolute()),
1238-
self.config,
1239-
self._go_server_use_thread,
1240-
)
1238+
self._go_server = GoServer(str(self.repo_path.absolute()), self.config,)
1239+
self._go_server._shared_connection._check_grpc_connection()
12411240
return self._go_server.get_online_features(
12421241
features, columnar, full_feature_names
12431242
)
@@ -1860,12 +1859,7 @@ def serve_transformations(self, port: int) -> None:
18601859
def kill_go_server(self):
18611860
if self._go_server:
18621861
self._go_server.kill_go_server_explicitly()
1863-
1864-
def set_go_server_use_thread(self, use: bool):
1865-
if self._go_server:
1866-
self._go_server.set_use_thread(use)
1867-
else:
1868-
self._go_server_use_thread = use
1862+
self._go_server = None
18691863

18701864

18711865
def _validate_entity_values(join_key_values: Dict[str, List[Value]]):

0 commit comments

Comments
 (0)