Skip to content

Commit 728791a

Browse files
authored
enhance: Store alias before wait for ready (#2894)
Signed-off-by: yangxuan <[email protected]>
1 parent 32d9aa4 commit 728791a

File tree

5 files changed

+131
-144
lines changed

5 files changed

+131
-144
lines changed

pymilvus/milvus_client/_utils.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import hashlib
2+
import logging
3+
4+
from pymilvus.orm.connections import connections
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def create_connection(
10+
uri: str,
11+
token: str = "",
12+
db_name: str = "",
13+
use_async: bool = False,
14+
*,
15+
user: str = "",
16+
password: str = "",
17+
**kwargs,
18+
) -> str:
19+
"""Get or create the connection to the Milvus server.
20+
21+
Returns:
22+
str: The alias of the connection
23+
"""
24+
using = kwargs.pop("alias", None)
25+
if not using:
26+
use_async_fmt = "async" if use_async else ""
27+
28+
auth_fmt = ""
29+
if user:
30+
auth_fmt = f"{user}"
31+
elif token:
32+
md5 = hashlib.new("md5", usedforsecurity=False)
33+
md5.update(token.encode())
34+
auth_fmt = f"{md5.hexdigest()}"
35+
36+
# different uri, auth, db_name cannot share the same connection
37+
not_empty = [v for v in [use_async_fmt, uri, db_name, auth_fmt] if v]
38+
using = "-".join(not_empty)
39+
40+
if connections.has_connection(using):
41+
return using
42+
43+
try:
44+
connections.connect(
45+
using, user, password, db_name, token, uri=uri, _async=use_async, **kwargs
46+
)
47+
except Exception as ex:
48+
logger.error("Failed to create new connection using: %s", using)
49+
raise ex from ex
50+
else:
51+
logger.debug("Created new connection using: %s", using)
52+
return using

pymilvus/milvus_client/async_milvus_client.py

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from pymilvus.orm.schema import FieldSchema
2323
from pymilvus.orm.types import DataType
2424

25+
from ._utils import create_connection
2526
from .check import validate_param
2627
from .index import IndexParam, IndexParams
2728

@@ -43,8 +44,15 @@ def __init__(
4344
timeout: Optional[float] = None,
4445
**kwargs,
4546
) -> None:
46-
self._using = self._create_connection(
47-
uri, user, password, db_name, token, timeout=timeout, **kwargs
47+
self._using = create_connection(
48+
uri,
49+
token,
50+
db_name,
51+
use_async=True,
52+
user=user,
53+
password=password,
54+
timeout=timeout,
55+
**kwargs,
4856
)
4957
self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus")
5058

@@ -815,30 +823,6 @@ async def close(self):
815823
def _get_connection(self):
816824
return connections._fetch_handler(self._using)
817825

818-
def _create_connection(
819-
self,
820-
uri: str,
821-
user: str = "",
822-
password: str = "",
823-
db_name: str = "",
824-
token: str = "",
825-
**kwargs,
826-
) -> str:
827-
"""Create the connection to the Milvus server."""
828-
using = kwargs.pop("alias", None)
829-
if not using or using == "":
830-
using = f"async-{uri}{user}"
831-
try:
832-
connections.connect(
833-
using, user, password, db_name, token, uri=uri, _async=True, **kwargs
834-
)
835-
except Exception as ex:
836-
logger.error("Failed to create new connection using: %s", using)
837-
raise ex from ex
838-
else:
839-
logger.debug("Created new connection using: %s", using)
840-
return using
841-
842826
def _extract_primary_field(self, schema_dict: Dict) -> dict:
843827
fields = schema_dict.get("fields", [])
844828
if not fields:

pymilvus/milvus_client/milvus_client.py

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import hashlib
21
import logging
32
from typing import Dict, List, Optional, Union
43

@@ -28,6 +27,7 @@
2827
from pymilvus.orm.iterator import QueryIterator, SearchIterator
2928
from pymilvus.orm.types import DataType
3029

30+
from ._utils import create_connection
3131
from .check import validate_param
3232
from .index import IndexParam, IndexParams
3333

@@ -62,8 +62,8 @@ def __init__(
6262
to None.
6363
Unit: second
6464
"""
65-
self._using = self._create_connection(
66-
uri, user, password, db_name, token, timeout=timeout, **kwargs
65+
self._using = create_connection(
66+
uri, token, db_name, user=user, password=password, timeout=timeout, **kwargs
6767
)
6868
self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus")
6969

@@ -919,41 +919,6 @@ def close(self):
919919
def _get_connection(self):
920920
return connections._fetch_handler(self._using)
921921

922-
def _create_connection(
923-
self,
924-
uri: str,
925-
user: str = "",
926-
password: str = "",
927-
db_name: str = "",
928-
token: str = "",
929-
**kwargs,
930-
) -> str:
931-
"""Create the connection to the Milvus server."""
932-
using = kwargs.pop("alias", None)
933-
if not using or using == "":
934-
base_using = f"{uri}-{db_name}"
935-
# different user cannot share the same connection
936-
if user:
937-
using = f"{base_using}-{user}"
938-
elif token:
939-
# make md5 of token
940-
md5 = hashlib.new("md5", usedforsecurity=False)
941-
md5.update(token.encode())
942-
using = f"{base_using}-{md5.hexdigest()}"
943-
else:
944-
using = f"{base_using}"
945-
946-
if connections.has_connection(using):
947-
return using
948-
try:
949-
connections.connect(using, user, password, db_name, token, uri=uri, **kwargs)
950-
except Exception as ex:
951-
logger.error("Failed to create new connection using: %s", using)
952-
raise ex from ex
953-
else:
954-
logger.debug("Created new connection using: %s", using)
955-
return using
956-
957922
def _extract_primary_field(self, schema_dict: Dict) -> dict:
958923
fields = schema_dict.get("fields", [])
959924
if not fields:

0 commit comments

Comments
 (0)