Skip to content
1 change: 0 additions & 1 deletion descope/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
REFRESH_SESSION_COOKIE_NAME,
SESSION_COOKIE_NAME,
DeliveryMethod,
User,
)
from descope.exceptions import AuthException
170 changes: 100 additions & 70 deletions descope/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
DeliveryMethod,
EndpointsV1,
OAuthProviders,
User,
)
from descope.exceptions import AuthException

Expand Down Expand Up @@ -206,20 +205,25 @@ def _compose_logout_url() -> str:
return EndpointsV1.logoutPath

@staticmethod
def _get_identifier_name_by_method(method: DeliveryMethod) -> str:
def _get_identifier_by_method(
method: DeliveryMethod, user: dict
) -> Tuple[str, str]:
if method is DeliveryMethod.EMAIL:
return "email"
email = user.get("email", "")
return "email", email
elif method is DeliveryMethod.PHONE:
return "phone"
phone = user.get("phone", "")
return "phone", phone
elif method is DeliveryMethod.WHATSAPP:
return "phone"
whatsapp = user.get("phone", "")
return ("whatsapp", whatsapp)
else:
raise AuthException(
500, "identifier failure", f"Unknown delivery method {method}"
)

def sign_up_otp(
self, method: DeliveryMethod, identifier: str, user: User = None
self, method: DeliveryMethod, identifier: str, user: dict = None
) -> None:
"""
Sign up a new user by OTP
Expand All @@ -244,10 +248,12 @@ def sign_up_otp(
f"Identifier {identifier} is not valid by delivery method {method}",
)

body = {self._get_identifier_name_by_method(method): identifier}
body = {"externalID": identifier}

if user is not None:
body["user"] = user.get_data()
body["user"] = user
method_str, val = self._get_identifier_by_method(method, user)
body[method_str] = val

uri = AuthClient._compose_signup_url(method)
response = requests.post(
Expand Down Expand Up @@ -283,7 +289,7 @@ def sign_in_otp(self, method: DeliveryMethod, identifier: str) -> None:
)

body = {
self._get_identifier_name_by_method(method): identifier,
"externalID": identifier,
}

uri = AuthClient._compose_signin_url(method)
Expand All @@ -295,9 +301,7 @@ def sign_in_otp(self, method: DeliveryMethod, identifier: str) -> None:
if not response.ok:
raise AuthException(response.status_code, "", response.text)

def verify_code(
self, method: DeliveryMethod, identifier: str, code: str
) -> Tuple[dict, dict]: # Tuple(dict of claims, dict of tokens)
def verify_code(self, method: DeliveryMethod, identifier: str, code: str) -> dict:
"""Verify OTP code sent by the delivery method that chosen

Args:
Expand Down Expand Up @@ -327,7 +331,7 @@ def verify_code(
f"Identifier {identifier} is not valid by delivery method {method}",
)

body = {self._get_identifier_name_by_method(method): identifier, "code": code}
body = {"externalID": identifier, "code": code}

uri = AuthClient._compose_verify_code_url(method)
response = requests.post(
Expand All @@ -338,14 +342,48 @@ def verify_code(
if not response.ok:
raise AuthException(response.status_code, "", response.reason)

session_token = response.cookies.get(SESSION_COOKIE_NAME)
refresh_token = response.cookies.get(REFRESH_SESSION_COOKIE_NAME)

claims, tokens = self._validate_and_load_tokens(session_token, refresh_token)
return (claims, tokens)
resp = response.json()
jwt_response = self._generate_jwt_response(
resp, response.cookies.get(REFRESH_SESSION_COOKIE_NAME, None)
)
return jwt_response

def _generate_auth_info(self, response_body, cookie) -> dict:
tokens = {}
for token in response_body["jwts"]:
token_claims = self._validate_and_load_tokens(token, None)
token_claims["projectId"] = token_claims.pop(
"iss"
) # replace the key name from iss->projectId
token_claims["userId"] = token_claims.pop(
"sub"
) # replace the key name from sub->userId
tokens[token_claims["cookieName"]] = token_claims

if cookie:
token_claims = self._validate_and_load_tokens(cookie, None)
token_claims["projectId"] = token_claims.pop(
"iss"
) # replace the key name from iss->projectId
token_claims["userId"] = token_claims.pop(
"sub"
) # replace the key name from sub->userId
tokens[token_claims["cookieName"]] = token_claims

return tokens

def _generate_jwt_response(self, response_body, cookie) -> dict:
tokens = self._generate_auth_info(response_body, cookie)
jwt_response = {
"error": response_body.get("error", ""),
"jwts": tokens,
"user": response_body.get("user", ""),
"firstSeen": response_body.get("firstSeen", True),
}
return jwt_response

def sign_up_magiclink(
self, method: DeliveryMethod, identifier: str, uri: str, user: User = None
self, method: DeliveryMethod, identifier: str, uri: str, user: dict = None
) -> None:
"""
Sign up a new user by magic link
Expand All @@ -372,10 +410,16 @@ def sign_up_magiclink(
f"Identifier {identifier} is not valid by delivery method {method}",
)

body = {self._get_identifier_name_by_method(method): identifier, "URI": uri}
body = {
"externalID": identifier,
"URI": uri,
"crossDevice": False,
}

if user is not None:
body["user"] = user.get_data()
body["user"] = user
method_str, val = self._get_identifier_by_method(method, user)
body[method_str] = val

requestUri = AuthClient._compose_signup_magiclink_url(method)
response = requests.post(
Expand Down Expand Up @@ -414,7 +458,11 @@ def sign_in_magiclink(
f"Identifier {identifier} is not valid by delivery method {method}",
)

body = {self._get_identifier_name_by_method(method): identifier, "URI": uri}
body = {
"externalID": identifier,
"URI": uri,
"crossDevice": False,
}

requestUri = AuthClient._compose_signin_magiclink_url(method)
response = requests.post(
Expand All @@ -425,9 +473,7 @@ def sign_in_magiclink(
if not response.ok:
raise AuthException(response.status_code, "", response.text)

def verify_magiclink(
self, code: str
) -> Tuple[dict, dict]: # Tuple(dict of claims, dict of tokens)
def verify_magiclink(self, code: str) -> dict:
"""Verify magiclink

Args:
Expand All @@ -453,13 +499,13 @@ def verify_magiclink(
if not response.ok:
raise AuthException(response.status_code, "", response.reason)

session_token = response.cookies.get(SESSION_COOKIE_NAME)
refresh_token = response.cookies.get(REFRESH_SESSION_COOKIE_NAME)

claims, tokens = self._validate_and_load_tokens(session_token, refresh_token)
return (claims, tokens)
resp = response.json()
jwt_response = self._generate_jwt_response(
resp, response.cookies.get(REFRESH_SESSION_COOKIE_NAME, None)
)
return jwt_response

def refresh_token(self, signed_token: str, signed_refresh_token: str) -> str:
def refresh_token(self, signed_token: str, signed_refresh_token: str) -> dict:
cookies = {
SESSION_COOKIE_NAME: signed_token,
REFRESH_SESSION_COOKIE_NAME: signed_refresh_token,
Expand All @@ -479,23 +525,20 @@ def refresh_token(self, signed_token: str, signed_refresh_token: str) -> str:
f"Failed to refresh token with error: {response.text}",
)

res_cookies = response.cookies
ds_cookie = res_cookies.get(SESSION_COOKIE_NAME, None)
if not ds_cookie:
raise AuthException(
401, "Refresh token failed", "Failed to get new refreshed token"
)
return ds_cookie
resp = response.json()
auth_info = self._generate_auth_info(
resp, response.cookies.get(REFRESH_SESSION_COOKIE_NAME, None)
)
return auth_info

def _validate_and_load_tokens(
self, signed_token: str, signed_refresh_token: str
) -> Tuple[dict, dict]: # Tuple(dict of claims, dict of tokens)

if signed_token is None or signed_refresh_token is None:
) -> dict:
if signed_token is None:
raise AuthException(
401,
"token validation failure",
f"signed token {signed_token} or/and signed refresh token {signed_refresh_token} are empty",
f"signed token {signed_token} is empty",
)

try:
Expand Down Expand Up @@ -544,11 +587,10 @@ def _validate_and_load_tokens(
claims = jwt.decode(
jwt=signed_token, key=copy_key[0].key, algorithms=[alg_header]
)
tokens = {
SESSION_COOKIE_NAME: signed_token,
REFRESH_SESSION_COOKIE_NAME: signed_refresh_token,
}
return (claims, tokens)

claims["jwt"] = signed_token
return claims

except ExpiredSignatureError:
# Session token expired, check that refresh token is valid
try:
Expand All @@ -561,36 +603,21 @@ def _validate_and_load_tokens(
raise AuthException(
401, "token validation failure", f"refresh token is not valid, {e}"
)

# Refresh token is valid now refresh the session token
refreshed_session_token = self.refresh_token(
signed_token, signed_refresh_token
)
# Parse the new session token
try:
claims = jwt.decode(
jwt=refreshed_session_token,
key=copy_key[0].key,
algorithms=[alg_header],
)
tokens = {
SESSION_COOKIE_NAME: refreshed_session_token,
REFRESH_SESSION_COOKIE_NAME: signed_refresh_token,
}
return (claims, tokens)
except Exception as e:
raise AuthException(
401,
"token validation failure",
f"new session token is not valid, {e}",
)
auth_info = self.refresh_token(signed_token, signed_refresh_token)

claims = auth_info[SESSION_COOKIE_NAME]
return claims

except Exception as e:
raise AuthException(
401, "token validation failure", f"token is not valid, {e}"
)

def validate_session_request(
self, signed_token: str, signed_refresh_token: str
) -> Tuple[dict, dict]: # Tuple(dict of claims, dict of tokens)
) -> dict:
"""
Validate session request by verify the session JWT session token
and session refresh token in case it expired
Expand All @@ -610,7 +637,10 @@ def validate_session_request(
AuthException: for any case token is not valid means session is not
authorized
"""
return self._validate_and_load_tokens(signed_token, signed_refresh_token)
token_claims = self._validate_and_load_tokens(
signed_token, signed_refresh_token
)
return {token_claims["cookieName"]: token_claims}

def logout(
self, signed_token: str, signed_refresh_token: str
Expand Down
20 changes: 2 additions & 18 deletions descope/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum

DEFAULT_BASE_URI = "https://localhost:8443"
DEFAULT_FETCH_PUBLIC_KEY_URI = "https://localhost:8443" # will use the same base uri as above once gateway will be available
DEFAULT_BASE_URI = "https://localhost:8443" # "http://127.0.0.1:8191"
DEFAULT_FETCH_PUBLIC_KEY_URI = "https://localhost:8443" # "http://127.0.0.1:8152" # will use the same base uri as above once gateway will be available

PHONE_REGEX = """^(?:(?:\\(?(?:00|\\+)([1-4]\\d\\d|[1-9]\\d?)\\)?)?[\\-\\.\\ \\\\/]?)?((?:\\(?\\d{1,}\\)?[\\-\\.\\ \\\\/]?){0,})(?:[\\-\\.\\ \\\\/]?(?:#|ext\\.?|extension|x)[\\-\\.\\ \\\\/]?(\\d+))?$"""

Expand Down Expand Up @@ -31,19 +31,3 @@ class DeliveryMethod(Enum):


OAuthProviders = ["facebook", "github", "google", "microsoft", "gitlab", "apple"]


class User:
def __init__(self, username: str, name: str, phone: str, email: str):
self.username = username
self.name = name
self.phone = phone
self.email = email

def get_data(self):
return {
"username": self.username,
"name": self.name,
"phone": self.phone,
"email": self.email,
}
Loading