Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
BasePartialProcessor,
BatchProcessor,
EventType,
ExceptionInfo,
FailureResponse,
SuccessResponse,
batch_processor,
)
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor

__all__ = (
Expand Down
27 changes: 10 additions & 17 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import sys
from abc import ABC, abstractmethod
from enum import Enum
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError
from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError, ExceptionInfo
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
Expand All @@ -30,8 +29,6 @@ class EventType(Enum):
# type specifics
#
has_pydantic = "pydantic" in sys.modules
ExceptionInfo = Tuple[Type[BaseException], BaseException, TracebackType]
OptExcInfo = Union[ExceptionInfo, Tuple[None, None, None]]

# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
Expand Down Expand Up @@ -61,7 +58,7 @@ class BasePartialProcessor(ABC):
def __init__(self):
self.success_messages: List[BatchEventTypes] = []
self.fail_messages: List[BatchEventTypes] = []
self.exceptions: List = []
self.exceptions: List[ExceptionInfo] = []

@abstractmethod
def _prepare(self):
Expand Down Expand Up @@ -132,15 +129,15 @@ def success_handler(self, record, result: Any) -> SuccessResponse:
self.success_messages.append(record)
return entry

def failure_handler(self, record, exception: OptExcInfo) -> FailureResponse:
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
"""
Keeps track of batch records that failed processing

Parameters
----------
record: Any
record that failed processing
exception: OptExcInfo
exception: ExceptionInfo
Exception information containing type, value, and traceback (sys.exc_info())

Returns
Expand Down Expand Up @@ -411,32 +408,28 @@ def _get_messages_to_report(self) -> Dict[str, str]:
def _collect_sqs_failures(self):
if self.model:
return {"itemIdentifier": msg.messageId for msg in self.fail_messages}
else:
return {"itemIdentifier": msg.message_id for msg in self.fail_messages}
return {"itemIdentifier": msg.message_id for msg in self.fail_messages}

def _collect_kinesis_failures(self):
if self.model:
# Pydantic model uses int but Lambda poller expects str
return {"itemIdentifier": msg.kinesis.sequenceNumber for msg in self.fail_messages}
else:
return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages}
return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages}

def _collect_dynamodb_failures(self):
if self.model:
return {"itemIdentifier": msg.dynamodb.SequenceNumber for msg in self.fail_messages}
else:
return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages}
return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages}

@overload
def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
...
... # pragma: no cover

@overload
def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes:
...
... # pragma: no cover

def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
if model is not None:
return model.parse_obj(record)
else:
return self._DATA_CLASS_MAPPING[event_type](record)
return self._DATA_CLASS_MAPPING[event_type](record)
11 changes: 7 additions & 4 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
Batch processing exceptions
"""
import traceback
from typing import Optional, Tuple
from types import TracebackType
from typing import List, Optional, Tuple, Type

ExceptionInfo = Tuple[Type[BaseException], BaseException, TracebackType]


class BaseBatchProcessingError(Exception):
def __init__(self, msg="", child_exceptions=()):
def __init__(self, msg="", child_exceptions: Optional[List[ExceptionInfo]] = None):
super().__init__(msg)
self.msg = msg
self.child_exceptions = child_exceptions
Expand All @@ -24,7 +27,7 @@ def format_exceptions(self, parent_exception_str):
class SQSBatchProcessingError(BaseBatchProcessingError):
"""When at least one message within a batch could not be processed"""

def __init__(self, msg="", child_exceptions: Optional[Tuple[Exception]] = None):
def __init__(self, msg="", child_exceptions: Optional[List[ExceptionInfo]] = None):
super().__init__(msg, child_exceptions)

# Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent
Expand All @@ -37,7 +40,7 @@ def __str__(self):
class BatchProcessingError(BaseBatchProcessingError):
"""When all batch records failed to be processed"""

def __init__(self, msg="", child_exceptions: Optional[Tuple[Exception]] = None):
def __init__(self, msg="", child_exceptions: Optional[List[ExceptionInfo]] = None):
super().__init__(msg, child_exceptions)

def __str__(self):
Expand Down
4 changes: 2 additions & 2 deletions aws_lambda_powertools/utilities/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ def handler(event: Order, context: LambdaContext):

@overload
def parse(event: Dict[str, Any], model: Type[Model]) -> Model:
...
... # pragma: no cover


@overload
def parse(event: Dict[str, Any], model: Type[Model], envelope: Type[Envelope]) -> EnvelopeModel:
...
... # pragma: no cover


def parse(event: Dict[str, Any], model: Type[Model], envelope: Optional[Type[Envelope]] = None):
Expand Down
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ exclude_lines = [
# Don't complain if non-runnable code isn't run:
"if 0:",
"if __name__ == .__main__.:",

# Ignore type function overload
"@overload",

# Ignore interfaces, future protocols, and @overload impl
"...",
]

[tool.isort]
Expand Down
12 changes: 12 additions & 0 deletions tests/functional/idempotency/test_idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,3 +1057,15 @@ def two(data):
assert one(data=mock_event) == "one"
assert two(data=mock_event) == "two"
assert len(persistence_store.table.method_calls) == 4


def test_invalid_dynamodb_persistence_layer():
# Scenario constructing a DynamoDBPersistenceLayer with a key_attr matching sort_key_attr should fail
with pytest.raises(ValueError) as ve:
DynamoDBPersistenceLayer(
table_name="Foo",
key_attr="id",
sort_key_attr="id",
)
# and raise a ValueError
assert str(ve.value) == "key_attr [id] and sort_key_attr [id] cannot be the same!"
4 changes: 3 additions & 1 deletion tests/functional/test_utilities_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,5 +832,7 @@ def lambda_handler(event, context):
return processor.response()

# WHEN/THEN
with pytest.raises(BatchProcessingError):
with pytest.raises(BatchProcessingError) as e:
lambda_handler(event, {})
ret = str(e)
assert ret is not None