From 6051c916818c1f7aa4d7c5dc4d844ec29a3a5c5f Mon Sep 17 00:00:00 2001 From: Tatevik Date: Wed, 10 Sep 2025 13:09:15 +0400 Subject: [PATCH 01/13] MessageStatusEnum --- .../Model/Dto/Message/MessageMetadataDto.php | 4 +++- .../Model/Message/MessageMetadata.php | 15 +++++++----- .../Messaging/Model/Message/MessageStatus.php | 24 +++++++++++++++++++ .../Service/Builder/MessageBuilder.php | 2 +- .../Service/Manager/MessageManager.php | 8 +++++++ .../Service/Processor/CampaignProcessor.php | 2 +- .../Repository/MessageRepositoryTest.php | 6 ++--- .../Service/SubscriberDeletionServiceTest.php | 2 +- .../Service/Builder/MessageBuilderTest.php | 15 ++---------- .../Builder/MessageContentBuilderTest.php | 2 +- .../Builder/MessageFormatBuilderTest.php | 2 +- .../Builder/MessageOptionsBuilderTest.php | 2 +- .../Builder/MessageScheduleBuilderTest.php | 2 +- .../Service/Manager/MessageManagerTest.php | 17 ++++++------- .../Processor/CampaignProcessorTest.php | 12 +++++----- 15 files changed, 71 insertions(+), 44 deletions(-) create mode 100644 src/Domain/Messaging/Model/Message/MessageStatus.php diff --git a/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php b/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php index 0776c1d1..91802df2 100644 --- a/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php +++ b/src/Domain/Messaging/Model/Dto/Message/MessageMetadataDto.php @@ -4,10 +4,12 @@ namespace PhpList\Core\Domain\Messaging\Model\Dto\Message; +use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; + class MessageMetadataDto { public function __construct( - public readonly string $status, + public readonly MessageStatus $status, ) { } } diff --git a/src/Domain/Messaging/Model/Message/MessageMetadata.php b/src/Domain/Messaging/Model/Message/MessageMetadata.php index 123103ff..5563a25f 100644 --- a/src/Domain/Messaging/Model/Message/MessageMetadata.php +++ b/src/Domain/Messaging/Model/Message/MessageMetadata.php @@ -33,13 +33,13 @@ class MessageMetadata implements EmbeddableInterface private ?DateTime $sendStart; public function __construct( - ?string $status = null, + ?MessageStatus $status = null, int $bounceCount = 0, ?DateTime $entered = null, ?DateTime $sent = null, ?DateTime $sendStart = null, ) { - $this->status = $status; + $this->status = $status->value ?? null; $this->processed = false; $this->viewed = 0; $this->bounceCount = $bounceCount; @@ -48,14 +48,17 @@ public function __construct( $this->sendStart = $sendStart; } - public function getStatus(): ?string + /** + * @SuppressWarnings("PHPMD.StaticAccess") + */ + public function getStatus(): ?MessageStatus { - return $this->status; + return MessageStatus::from($this->status); } - public function setStatus(string $status): self + public function setStatus(MessageStatus $status): self { - $this->status = $status; + $this->status = $status->value; return $this; } diff --git a/src/Domain/Messaging/Model/Message/MessageStatus.php b/src/Domain/Messaging/Model/Message/MessageStatus.php new file mode 100644 index 00000000..ad8d4876 --- /dev/null +++ b/src/Domain/Messaging/Model/Message/MessageStatus.php @@ -0,0 +1,24 @@ + true, + default => false, + }; + } +} diff --git a/src/Domain/Messaging/Service/Builder/MessageBuilder.php b/src/Domain/Messaging/Service/Builder/MessageBuilder.php index e8807170..85e74331 100644 --- a/src/Domain/Messaging/Service/Builder/MessageBuilder.php +++ b/src/Domain/Messaging/Service/Builder/MessageBuilder.php @@ -45,7 +45,7 @@ public function build(MessageDtoInterface $createMessageDto, object $context = n return $context->getExisting(); } - $metadata = new Message\MessageMetadata($createMessageDto->getMetadata()->status); + $metadata = new Message\MessageMetadata(Message\MessageStatus::Draft); return new Message($format, $schedule, $metadata, $content, $options, $context->getOwner(), $template); } diff --git a/src/Domain/Messaging/Service/Manager/MessageManager.php b/src/Domain/Messaging/Service/Manager/MessageManager.php index 7b263083..c73f31ca 100644 --- a/src/Domain/Messaging/Service/Manager/MessageManager.php +++ b/src/Domain/Messaging/Service/Manager/MessageManager.php @@ -43,6 +43,14 @@ public function updateMessage( return $message; } + public function updateStatus(Message $message, Message\MessageStatus $status): Message + { + $message->getMetadata()->setStatus($status); + $this->messageRepository->save($message); + + return $message; + } + public function delete(Message $message): void { $this->messageRepository->remove($message); diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index 13a100a3..cb051519 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -69,7 +69,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi usleep(100000); } - $campaign->getMetadata()->setStatus('sent'); + $campaign->getMetadata()->setStatus(Message\MessageStatus::Sent); $this->entityManager->flush(); } } diff --git a/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php b/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php index d7435815..1bd98735 100644 --- a/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php +++ b/tests/Integration/Domain/Messaging/Repository/MessageRepositoryTest.php @@ -48,7 +48,7 @@ public function testMessageIsPersistedAndFetchedCorrectly(): void $message = new Message( new MessageFormat(true, 'text'), new MessageSchedule(1, null, 3, null, null), - new MessageMetadata('done'), + new MessageMetadata(Message\MessageStatus::Sent), new MessageContent('Hello world!'), new MessageOptions(), $admin @@ -62,7 +62,7 @@ public function testMessageIsPersistedAndFetchedCorrectly(): void self::assertCount(1, $foundMessages); self::assertInstanceOf(Message::class, $foundMessages[0]); - self::assertSame('done', $foundMessages[0]->getMetadata()->getStatus()); + self::assertSame(Message\MessageStatus::Sent, $foundMessages[0]->getMetadata()->getStatus()); self::assertSame('Hello world!', $foundMessages[0]->getContent()->getSubject()); } @@ -77,7 +77,7 @@ public function testGetByOwnerIdReturnsOnlyOwnedMessages(): void $msg1 = new Message( new MessageFormat(true, MessageFormat::FORMAT_TEXT), new MessageSchedule(1, null, 3, null, null), - new MessageMetadata('done'), + new MessageMetadata(Message\MessageStatus::Sent), new MessageContent('Owned by Admin 1!'), new MessageOptions(), $admin1 diff --git a/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php b/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php index b3bfda0c..03704f53 100644 --- a/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php +++ b/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php @@ -58,7 +58,7 @@ public function testDeleteSubscriberWithRelatedDataDoesNotThrowDoctrineError(): $msg = new Message( format: new MessageFormat(true, MessageFormat::FORMAT_TEXT), schedule: new MessageSchedule(1, null, 3, null, null), - metadata: new MessageMetadata('done'), + metadata: new MessageMetadata(Message\MessageStatus::Sent), content: new MessageContent('Owned by Admin 1!'), options: new MessageOptions(), owner: $admin diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php index 564bd34d..d99d041a 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageBuilderTest.php @@ -2,9 +2,8 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; -use Error; use InvalidArgumentException; use PhpList\Core\Domain\Identity\Model\Administrator; use PhpList\Core\Domain\Messaging\Model\Dto\CreateMessageDto; @@ -64,7 +63,7 @@ private function createRequest(): CreateMessageDto formatOptions: [] ), metadata: new MessageMetadataDto( - status: 'draft' + status: Message\MessageStatus::Draft ), options: new MessageOptionsDto( fromField: '', @@ -117,16 +116,6 @@ public function testBuildsNewMessage(): void $this->builder->build($request, $context); } - public function testThrowsExceptionOnInvalidRequest(): void - { - $this->expectException(Error::class); - - $this->builder->build( - $this->createMock(CreateMessageDto::class), - new MessageContext($this->createMock(Administrator::class)) - ); - } - public function testThrowsExceptionOnInvalidContext(): void { $this->expectException(InvalidArgumentException::class); diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php index 2b1aa771..21f90692 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageContentBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use InvalidArgumentException; use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageContentDto; diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php index 8d9320a0..1bd576f5 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageFormatBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use InvalidArgumentException; use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageFormatDto; diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php index c6795d29..754177a2 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageOptionsBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use InvalidArgumentException; use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageOptionsDto; diff --git a/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php b/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php index 38f04338..25a89052 100644 --- a/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php +++ b/tests/Unit/Domain/Messaging/Service/Builder/MessageScheduleBuilderTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace PhpList\Core\Tests\Unit\Domain\Service\Builder; +namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Builder; use DateTime; use InvalidArgumentException; diff --git a/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php b/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php index aa1a47e0..94064485 100644 --- a/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php +++ b/tests/Unit/Domain/Messaging/Service/Manager/MessageManagerTest.php @@ -13,6 +13,7 @@ use PhpList\Core\Domain\Messaging\Model\Dto\Message\MessageScheduleDto; use PhpList\Core\Domain\Messaging\Model\Dto\UpdateMessageDto; use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\Message\MessageContent; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; use PhpList\Core\Domain\Messaging\Service\Builder\MessageBuilder; use PhpList\Core\Domain\Messaging\Service\Manager\MessageManager; @@ -34,7 +35,7 @@ public function testCreateMessageReturnsPersistedMessage(): void requeueInterval: 60 * 12, requeueUntil: '2025-04-20T00:00:00+00:00', ); - $metadata = new MessageMetadataDto('draft'); + $metadata = new MessageMetadataDto(Message\MessageStatus::Draft); $content = new MessageContentDto('Subject', 'Full text', 'Short text', 'Footer'); $options = new MessageOptionsDto('from@example.com', 'to@example.com', 'reply@example.com', 'all-users'); @@ -50,11 +51,11 @@ public function testCreateMessageReturnsPersistedMessage(): void $authUser = $this->createMock(Administrator::class); $expectedMessage = $this->createMock(Message::class); - $expectedContent = $this->createMock(\PhpList\Core\Domain\Messaging\Model\Message\MessageContent::class); + $expectedContent = $this->createMock(MessageContent::class); $expectedMetadata = $this->createMock(Message\MessageMetadata::class); $expectedContent->method('getSubject')->willReturn('Subject'); - $expectedMetadata->method('getStatus')->willReturn('draft'); + $expectedMetadata->method('getStatus')->willReturn(Message\MessageStatus::Draft); $expectedMessage->method('getContent')->willReturn($expectedContent); $expectedMessage->method('getMetadata')->willReturn($expectedMetadata); @@ -71,7 +72,7 @@ public function testCreateMessageReturnsPersistedMessage(): void $message = $manager->createMessage($request, $authUser); $this->assertSame('Subject', $message->getContent()->getSubject()); - $this->assertSame('draft', $message->getMetadata()->getStatus()); + $this->assertSame(Message\MessageStatus::Draft, $message->getMetadata()->getStatus()); } public function testUpdateMessageReturnsUpdatedMessage(): void @@ -88,7 +89,7 @@ public function testUpdateMessageReturnsUpdatedMessage(): void requeueInterval: 0, requeueUntil: '2025-04-20T00:00:00+00:00', ); - $metadata = new MessageMetadataDto('draft'); + $metadata = new MessageMetadataDto(Message\MessageStatus::Draft); $content = new MessageContentDto( 'Updated Subject', 'Updated Full text', @@ -115,11 +116,11 @@ public function testUpdateMessageReturnsUpdatedMessage(): void $authUser = $this->createMock(Administrator::class); $existingMessage = $this->createMock(Message::class); - $expectedContent = $this->createMock(\PhpList\Core\Domain\Messaging\Model\Message\MessageContent::class); + $expectedContent = $this->createMock(MessageContent::class); $expectedMetadata = $this->createMock(Message\MessageMetadata::class); $expectedContent->method('getSubject')->willReturn('Updated Subject'); - $expectedMetadata->method('getStatus')->willReturn('draft'); + $expectedMetadata->method('getStatus')->willReturn(Message\MessageStatus::Draft); $existingMessage->method('getContent')->willReturn($expectedContent); $existingMessage->method('getMetadata')->willReturn($expectedMetadata); @@ -136,6 +137,6 @@ public function testUpdateMessageReturnsUpdatedMessage(): void $message = $manager->updateMessage($updateRequest, $existingMessage, $authUser); $this->assertSame('Updated Subject', $message->getContent()->getSubject()); - $this->assertSame('draft', $message->getMetadata()->getStatus()); + $this->assertSame(Message\MessageStatus::Draft, $message->getMetadata()->getStatus()); } } diff --git a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php index b2c51c71..009dd5fd 100644 --- a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php +++ b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php @@ -61,7 +61,7 @@ public function testProcessWithNoSubscribers(): void $metadata->expects($this->once()) ->method('setStatus') - ->with('sent'); + ->with(Message\MessageStatus::Sent); $this->entityManager->expects($this->once()) ->method('flush'); @@ -89,7 +89,7 @@ public function testProcessWithInvalidSubscriberEmail(): void $metadata->expects($this->once()) ->method('setStatus') - ->with('sent'); + ->with(Message\MessageStatus::Sent); $this->entityManager->expects($this->once()) ->method('flush'); @@ -136,7 +136,7 @@ public function testProcessWithValidSubscriberEmail(): void $metadata->expects($this->once()) ->method('setStatus') - ->with('sent'); + ->with(Message\MessageStatus::Sent); $this->entityManager->expects($this->once()) ->method('flush'); @@ -183,7 +183,7 @@ public function testProcessWithMailerException(): void $metadata->expects($this->once()) ->method('setStatus') - ->with('sent'); + ->with(Message\MessageStatus::Sent); $this->entityManager->expects($this->once()) ->method('flush'); @@ -223,7 +223,7 @@ public function testProcessWithMultipleSubscribers(): void $metadata->expects($this->once()) ->method('setStatus') - ->with('sent'); + ->with(Message\MessageStatus::Sent); $this->entityManager->expects($this->once()) ->method('flush'); @@ -266,7 +266,7 @@ public function testProcessWithNullOutput(): void $metadata->expects($this->once()) ->method('setStatus') - ->with('sent'); + ->with(Message\MessageStatus::Sent); $this->entityManager->expects($this->once()) ->method('flush'); From 228a1ae0e1ff2b7db21830488d74928d8c8d326f Mon Sep 17 00:00:00 2001 From: Tatevik Date: Thu, 11 Sep 2025 11:01:46 +0400 Subject: [PATCH 02/13] Status validate --- .../Messaging/Model/Message/MessageStatus.php | 20 ++++++++++++++++--- .../Service/Manager/MessageManager.php | 5 +++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/Domain/Messaging/Model/Message/MessageStatus.php b/src/Domain/Messaging/Model/Message/MessageStatus.php index ad8d4876..dfb0f2d7 100644 --- a/src/Domain/Messaging/Model/Message/MessageStatus.php +++ b/src/Domain/Messaging/Model/Message/MessageStatus.php @@ -14,11 +14,25 @@ enum MessageStatus: string case Suspended = 'suspended'; case Requeued = 'requeued'; - public function isFinal(): bool + /** + * Allowed transitions for each state + * + * @return MessageStatus[] + */ + public function allowedTransitions(): array { return match ($this) { - self::Sent, self::Suspended => true, - default => false, + self::Draft, self::Suspended => [self::Submitted], + self::Submitted => [self::Prepared, self::InProcess], + self::Prepared => [self::InProcess], + self::InProcess => [self::Sent, self::Suspended], + self::Requeued => [self::InProcess, self::Suspended], + self::Sent => [], }; } + + public function canTransitionTo(self $next): bool + { + return in_array($next, $this->allowedTransitions(), true); + } } diff --git a/src/Domain/Messaging/Service/Manager/MessageManager.php b/src/Domain/Messaging/Service/Manager/MessageManager.php index c73f31ca..334db264 100644 --- a/src/Domain/Messaging/Service/Manager/MessageManager.php +++ b/src/Domain/Messaging/Service/Manager/MessageManager.php @@ -4,6 +4,7 @@ namespace PhpList\Core\Domain\Messaging\Service\Manager; +use InvalidArgumentException; use PhpList\Core\Domain\Identity\Model\Administrator; use PhpList\Core\Domain\Messaging\Model\Dto\MessageContext; use PhpList\Core\Domain\Messaging\Model\Dto\MessageDtoInterface; @@ -45,6 +46,10 @@ public function updateMessage( public function updateStatus(Message $message, Message\MessageStatus $status): Message { + if (!$message->getMetadata()->getStatus()->canTransitionTo($status)) { + throw new InvalidArgumentException('Invalid status transition'); + } + $message->getMetadata()->setStatus($status); $this->messageRepository->save($message); From 0b030537210f45d416566828fa4ee8402cbbbf62 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Thu, 11 Sep 2025 11:25:22 +0400 Subject: [PATCH 03/13] Embargo check --- src/Domain/Messaging/Command/ProcessQueueCommand.php | 7 ++++++- .../Messaging/Model/Message/MessageMetadata.php | 5 +++++ .../Messaging/Repository/MessageRepository.php | 12 ++++++++++++ .../Messaging/Service/Manager/MessageManager.php | 5 ----- .../Service/Processor/CampaignProcessor.php | 7 +++++++ 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/Domain/Messaging/Command/ProcessQueueCommand.php b/src/Domain/Messaging/Command/ProcessQueueCommand.php index 820d403d..d8d38652 100644 --- a/src/Domain/Messaging/Command/ProcessQueueCommand.php +++ b/src/Domain/Messaging/Command/ProcessQueueCommand.php @@ -4,6 +4,8 @@ namespace PhpList\Core\Domain\Messaging\Command; +use DateTimeImmutable; +use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; @@ -54,7 +56,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int $this->messagePreparator->ensureSubscribersHaveUuid($output); $this->messagePreparator->ensureCampaignsHaveUuid($output); - $campaigns = $this->messageRepository->findBy(['status' => 'submitted']); + $campaigns = $this->messageRepository->getByStatusAndEmbargo( + status: MessageStatus::Submitted, + embargo: new DateTimeImmutable() + ); foreach ($campaigns as $campaign) { $this->campaignProcessor->process($campaign, $output); diff --git a/src/Domain/Messaging/Model/Message/MessageMetadata.php b/src/Domain/Messaging/Model/Message/MessageMetadata.php index 5563a25f..156539b2 100644 --- a/src/Domain/Messaging/Model/Message/MessageMetadata.php +++ b/src/Domain/Messaging/Model/Message/MessageMetadata.php @@ -6,6 +6,7 @@ use DateTime; use Doctrine\ORM\Mapping as ORM; +use InvalidArgumentException; use PhpList\Core\Domain\Common\Model\Interfaces\EmbeddableInterface; #[ORM\Embeddable] @@ -58,7 +59,11 @@ public function getStatus(): ?MessageStatus public function setStatus(MessageStatus $status): self { + if (!$this->getStatus()->canTransitionTo($status)) { + throw new InvalidArgumentException('Invalid status transition'); + } $this->status = $status->value; + return $this; } diff --git a/src/Domain/Messaging/Repository/MessageRepository.php b/src/Domain/Messaging/Repository/MessageRepository.php index 3da7ebf3..0ae8bc18 100644 --- a/src/Domain/Messaging/Repository/MessageRepository.php +++ b/src/Domain/Messaging/Repository/MessageRepository.php @@ -4,6 +4,7 @@ namespace PhpList\Core\Domain\Messaging\Repository; +use DateTimeImmutable; use PhpList\Core\Domain\Common\Model\Filter\FilterRequestInterface; use PhpList\Core\Domain\Common\Repository\AbstractRepository; use PhpList\Core\Domain\Common\Repository\Interfaces\PaginatableRepositoryInterface; @@ -74,4 +75,15 @@ public function incrementBounceCount(int $messageId): void ->getQuery() ->execute(); } + + public function getByStatusAndEmbargo(Message\MessageStatus $status, DateTimeImmutable $embargo): array + { + return $this->createQueryBuilder('m') + ->where('m.status = :status') + ->andWhere('m.embargo IS NULL OR m.embargo <= :embargo') + ->setParameter('status', $status->value) + ->setParameter('embargo', $embargo) + ->getQuery() + ->getResult(); + } } diff --git a/src/Domain/Messaging/Service/Manager/MessageManager.php b/src/Domain/Messaging/Service/Manager/MessageManager.php index 334db264..c73f31ca 100644 --- a/src/Domain/Messaging/Service/Manager/MessageManager.php +++ b/src/Domain/Messaging/Service/Manager/MessageManager.php @@ -4,7 +4,6 @@ namespace PhpList\Core\Domain\Messaging\Service\Manager; -use InvalidArgumentException; use PhpList\Core\Domain\Identity\Model\Administrator; use PhpList\Core\Domain\Messaging\Model\Dto\MessageContext; use PhpList\Core\Domain\Messaging\Model\Dto\MessageDtoInterface; @@ -46,10 +45,6 @@ public function updateMessage( public function updateStatus(Message $message, Message\MessageStatus $status): Message { - if (!$message->getMetadata()->getStatus()->canTransitionTo($status)) { - throw new InvalidArgumentException('Invalid status transition'); - } - $message->getMetadata()->setStatus($status); $this->messageRepository->save($message); diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index cb051519..f4e80843 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -38,7 +38,14 @@ public function __construct( public function process(Message $campaign, ?OutputInterface $output = null): void { + $campaign->getMetadata()->setStatus(Message\MessageStatus::Prepared); + $this->entityManager->flush(); + $subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign); + + $campaign->getMetadata()->setStatus(Message\MessageStatus::InProcess); + $this->entityManager->flush(); + // phpcs:ignore Generic.Commenting.Todo // @todo check $ISPrestrictions logic foreach ($subscribers as $subscriber) { From 6874b8565ff176237335034a5814b9fc42a0e643 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Thu, 11 Sep 2025 12:44:41 +0400 Subject: [PATCH 04/13] IspRestrictions --- config/parameters.yml.dist | 3 + config/services/services.yml | 6 ++ src/Domain/Common/IspRestrictionsProvider.php | 76 +++++++++++++++++++ src/Domain/Common/Model/IspRestrictions.php | 22 ++++++ 4 files changed, 107 insertions(+) create mode 100644 src/Domain/Common/IspRestrictionsProvider.php create mode 100644 src/Domain/Common/Model/IspRestrictions.php diff --git a/config/parameters.yml.dist b/config/parameters.yml.dist index 54c649d8..a3884231 100644 --- a/config/parameters.yml.dist +++ b/config/parameters.yml.dist @@ -68,3 +68,6 @@ parameters: graylog_host: 'graylog.example.com' graylog_port: 12201 + + app.phplist_isp_conf_path: '%%env(APP_PHPLIST_ISP_CONF_PATH)%%' + env(APP_PHPLIST_ISP_CONF_PATH): '/etc/phplist.conf' diff --git a/config/services/services.yml b/config/services/services.yml index f1b68e74..7b419089 100644 --- a/config/services/services.yml +++ b/config/services/services.yml @@ -44,6 +44,12 @@ services: autowire: true autoconfigure: true + PhpList\Core\Domain\Common\IspRestrictionsProvider: + autowire: true + autoconfigure: true + arguments: + $confPath: '%app.phplist_isp_conf_path%' + PhpList\Core\Domain\Messaging\Service\ConsecutiveBounceHandler: autowire: true autoconfigure: true diff --git a/src/Domain/Common/IspRestrictionsProvider.php b/src/Domain/Common/IspRestrictionsProvider.php new file mode 100644 index 00000000..0e4bb21a --- /dev/null +++ b/src/Domain/Common/IspRestrictionsProvider.php @@ -0,0 +1,76 @@ +confPath) || !is_readable($this->confPath)) { + return new IspRestrictions(null, null, null); + } + + $contents = file_get_contents($this->confPath); + if ($contents === false) { + $this->logger->warning('Cannot read ISP restrictions file', ['path' => $this->confPath]); + return new IspRestrictions(null, null, null); + } + + $maxBatch = null; + $minBatchPeriod = null; + $lockFile = null; + + $raw = []; + foreach (preg_split('/\R/', $contents) as $line) { + $line = trim($line); + if ($line === '' || str_starts_with($line, '#') || str_starts_with($line, ';')) { + continue; + } + $parts = explode('=', $line, 2); + if (\count($parts) !== 2) { + continue; + } + [$key, $val] = array_map('trim', $parts); + $raw[$key] = $val; + + switch ($key) { + case 'maxbatch': + if ($val !== '' && ctype_digit($val)) { + $maxBatch = (int) $val; + } + break; + case 'minbatchperiod': + if ($val !== '' && ctype_digit($val)) { + $minBatchPeriod = (int) $val; + } + break; + case 'lockfile': + if ($val !== '') { + $lockFile = $val; + } + break; + } + } + + if ($maxBatch !== null || $minBatchPeriod !== null || $lockFile !== null) { + $this->logger->info('ISP restrictions detected', [ + 'path' => $this->confPath, + 'maxbatch' => $maxBatch, + 'minbatchperiod' => $minBatchPeriod, + 'lockfile' => $lockFile, + ]); + } + + return new IspRestrictions($maxBatch, $minBatchPeriod, $lockFile, $raw); + } +} diff --git a/src/Domain/Common/Model/IspRestrictions.php b/src/Domain/Common/Model/IspRestrictions.php new file mode 100644 index 00000000..08fd9149 --- /dev/null +++ b/src/Domain/Common/Model/IspRestrictions.php @@ -0,0 +1,22 @@ +maxBatch === null && $this->minBatchPeriod === null && $this->lockFile === null; + } +} From 351f73a0287cfc078d1d3e38208f11a2fd308ed6 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Fri, 12 Sep 2025 10:32:40 +0400 Subject: [PATCH 05/13] IspRestrictions --- config/parameters.yml.dist | 8 ++ config/services.yml | 4 - config/services/providers.yml | 4 + config/services/services.yml | 4 + .../Service/Processor/CampaignProcessor.php | 74 ++++++++++++++++--- 5 files changed, 78 insertions(+), 16 deletions(-) diff --git a/config/parameters.yml.dist b/config/parameters.yml.dist index a3884231..e042fa4a 100644 --- a/config/parameters.yml.dist +++ b/config/parameters.yml.dist @@ -71,3 +71,11 @@ parameters: app.phplist_isp_conf_path: '%%env(APP_PHPLIST_ISP_CONF_PATH)%%' env(APP_PHPLIST_ISP_CONF_PATH): '/etc/phplist.conf' + + # Message sending + messaging.mail_queue_batch_size: '%%env(MAILQUEUE_BATCH_SIZE)%%' + env(MAILQUEUE_BATCH_SIZE): '5' + messaging.mail_queue_period: '%%env(MAILQUEUE_BATCH_PERIOD)%%' + env(MAILQUEUE_BATCH_PERIOD): '5' + messaging.mail_queue_throttle: '%%env(MAILQUEUE_THROTTLE)%%' + env(MAILQUEUE_THROTTLE): '5' diff --git a/config/services.yml b/config/services.yml index 47be8241..b21dc5aa 100644 --- a/config/services.yml +++ b/config/services.yml @@ -7,10 +7,6 @@ services: autoconfigure: true public: false - PhpList\Core\Core\ConfigProvider: - arguments: - $config: '%app.config%' - PhpList\Core\Core\ApplicationStructure: public: true diff --git a/config/services/providers.yml b/config/services/providers.yml index 226c4e81..b1d90720 100644 --- a/config/services/providers.yml +++ b/config/services/providers.yml @@ -2,3 +2,7 @@ services: PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider: autowire: true autoconfigure: true + + PhpList\Core\Core\ConfigProvider: + arguments: + $config: '%app.config%' diff --git a/config/services/services.yml b/config/services/services.yml index 7b419089..fb113957 100644 --- a/config/services/services.yml +++ b/config/services/services.yml @@ -35,6 +35,10 @@ services: autowire: true autoconfigure: true public: true + arguments: + $mailqueueBatchSize: '%messaging.mail_queue_batch_size%' + $mailqueueBatchPeriod: '%messaging.mail_queue_period%' + $mailqueueThrottle: '%messaging.mail_queue_throttle%' PhpList\Core\Domain\Common\ClientIpResolver: autowire: true diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index f4e80843..5e891c97 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -7,6 +7,7 @@ use Doctrine\ORM\EntityManagerInterface; use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; +use PhpList\Core\Domain\Common\IspRestrictionsProvider; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; @@ -21,6 +22,10 @@ class CampaignProcessor private SubscriberProvider $subscriberProvider; private MessageProcessingPreparator $messagePreparator; private LoggerInterface $logger; + private ?IspRestrictionsProvider $ispRestrictionsProvider; + private ?int $mailqueueBatchSize; + private ?int $mailqueueBatchPeriod; + private ?int $mailqueueThrottle; public function __construct( MailerInterface $mailer, @@ -28,27 +33,67 @@ public function __construct( SubscriberProvider $subscriberProvider, MessageProcessingPreparator $messagePreparator, LoggerInterface $logger, + ?IspRestrictionsProvider $ispRestrictionsProvider = null, + ?int $mailqueueBatchSize = null, + ?int $mailqueueBatchPeriod = null, + ?int $mailqueueThrottle = null, ) { $this->mailer = $mailer; $this->entityManager = $entityManager; $this->subscriberProvider = $subscriberProvider; $this->messagePreparator = $messagePreparator; $this->logger = $logger; + $this->ispRestrictionsProvider = $ispRestrictionsProvider; + $this->mailqueueBatchSize = $mailqueueBatchSize; + $this->mailqueueBatchPeriod = $mailqueueBatchPeriod; + $this->mailqueueThrottle = $mailqueueThrottle; } public function process(Message $campaign, ?OutputInterface $output = null): void { - $campaign->getMetadata()->setStatus(Message\MessageStatus::Prepared); - $this->entityManager->flush(); - + $this->updateMessageStatus($campaign, Message\MessageStatus::Prepared); + $ispRestrictions = $this->ispRestrictionsProvider->load(); $subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign); - $campaign->getMetadata()->setStatus(Message\MessageStatus::InProcess); - $this->entityManager->flush(); + $cfgBatch = ($this->mailqueueBatchSize ?? 0); + $ispMax = isset($ispRestrictions->maxBatch) ? (int)$ispRestrictions->maxBatch : null; + + $cfgPeriod = ($this->mailqueueBatchPeriod ?? 0); + $ispMinPeriod = ($ispRestrictions->minBatchPeriod ?? 0); + + $cfgThrottle = ($this->mailqueueThrottle ?? 0); + $ispMinThrottle = (int)($ispRestrictions->minThrottle ?? 0); + + if ($cfgBatch <= 0) { + $batchSize = $ispMax !== null ? max(0, $ispMax) : 0; + } else { + $batchSize = $ispMax !== null ? min($cfgBatch, max(1, $ispMax)) : $cfgBatch; + } + + $batchPeriod = max(0, $cfgPeriod, $ispMinPeriod); + + $throttleSec = max(0, $cfgThrottle, $ispMinThrottle); + + $sentInBatch = 0; + $batchStart = microtime(true); + + $this->updateMessageStatus($campaign, Message\MessageStatus::InProcess); - // phpcs:ignore Generic.Commenting.Todo - // @todo check $ISPrestrictions logic foreach ($subscribers as $subscriber) { + if ($batchSize > 0 && $batchPeriod > 0 && $sentInBatch >= $batchSize) { + $elapsed = microtime(true) - $batchStart; + $remaining = (int)ceil($batchPeriod - $elapsed); + if ($remaining > 0) { + $output?->writeln(sprintf( + 'Batch limit reached, sleeping %ds to respect MAILQUEUE_BATCH_PERIOD', + $remaining + )); + sleep($remaining); + } + $batchStart = microtime(true); + $sentInBatch = 0; + } + if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) { continue; } @@ -62,9 +107,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi try { $this->mailer->send($email); - - // phpcs:ignore Generic.Commenting.Todo - // @todo log somewhere that this subscriber got email + $sentInBatch++; } catch (Throwable $e) { $this->logger->error($e->getMessage(), [ 'subscriber_id' => $subscriber->getId(), @@ -73,10 +116,17 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi $output?->writeln('Failed to send to: ' . $subscriber->getEmail()); } - usleep(100000); + if ($throttleSec > 0) { + sleep($throttleSec); + } } - $campaign->getMetadata()->setStatus(Message\MessageStatus::Sent); + $this->updateMessageStatus($campaign, Message\MessageStatus::Sent); + } + + private function updateMessageStatus(Message $message, Message\MessageStatus $status): void + { + $message->getMetadata()->setStatus($status); $this->entityManager->flush(); } } From 94155dc4a633974367d4448269fe9a39331415ef Mon Sep 17 00:00:00 2001 From: Tatevik Date: Fri, 12 Sep 2025 11:28:03 +0400 Subject: [PATCH 06/13] SendRateLimiter --- config/services/services.yml | 4 + .../Service/Processor/CampaignProcessor.php | 60 ++----------- .../Messaging/Service/SendRateLimiter.php | 88 +++++++++++++++++++ .../Command/ProcessQueueCommandTest.php | 16 ++-- .../Processor/CampaignProcessorTest.php | 50 +++++------ .../Messaging/Service/SendRateLimiterTest.php | 86 ++++++++++++++++++ 6 files changed, 217 insertions(+), 87 deletions(-) create mode 100644 src/Domain/Messaging/Service/SendRateLimiter.php create mode 100644 tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php diff --git a/config/services/services.yml b/config/services/services.yml index fb113957..611d9afe 100644 --- a/config/services/services.yml +++ b/config/services/services.yml @@ -35,6 +35,10 @@ services: autowire: true autoconfigure: true public: true + + PhpList\Core\Domain\Messaging\Service\SendRateLimiter: + autowire: true + autoconfigure: true arguments: $mailqueueBatchSize: '%messaging.mail_queue_batch_size%' $mailqueueBatchPeriod: '%messaging.mail_queue_period%' diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index 5e891c97..eb7bd496 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -7,7 +7,7 @@ use Doctrine\ORM\EntityManagerInterface; use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; -use PhpList\Core\Domain\Common\IspRestrictionsProvider; +use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; @@ -22,10 +22,7 @@ class CampaignProcessor private SubscriberProvider $subscriberProvider; private MessageProcessingPreparator $messagePreparator; private LoggerInterface $logger; - private ?IspRestrictionsProvider $ispRestrictionsProvider; - private ?int $mailqueueBatchSize; - private ?int $mailqueueBatchPeriod; - private ?int $mailqueueThrottle; + private SendRateLimiter $rateLimiter; public function __construct( MailerInterface $mailer, @@ -33,66 +30,25 @@ public function __construct( SubscriberProvider $subscriberProvider, MessageProcessingPreparator $messagePreparator, LoggerInterface $logger, - ?IspRestrictionsProvider $ispRestrictionsProvider = null, - ?int $mailqueueBatchSize = null, - ?int $mailqueueBatchPeriod = null, - ?int $mailqueueThrottle = null, + SendRateLimiter $rateLimiter, ) { $this->mailer = $mailer; $this->entityManager = $entityManager; $this->subscriberProvider = $subscriberProvider; $this->messagePreparator = $messagePreparator; $this->logger = $logger; - $this->ispRestrictionsProvider = $ispRestrictionsProvider; - $this->mailqueueBatchSize = $mailqueueBatchSize; - $this->mailqueueBatchPeriod = $mailqueueBatchPeriod; - $this->mailqueueThrottle = $mailqueueThrottle; + $this->rateLimiter = $rateLimiter; } public function process(Message $campaign, ?OutputInterface $output = null): void { $this->updateMessageStatus($campaign, Message\MessageStatus::Prepared); - $ispRestrictions = $this->ispRestrictionsProvider->load(); $subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign); - $cfgBatch = ($this->mailqueueBatchSize ?? 0); - $ispMax = isset($ispRestrictions->maxBatch) ? (int)$ispRestrictions->maxBatch : null; - - $cfgPeriod = ($this->mailqueueBatchPeriod ?? 0); - $ispMinPeriod = ($ispRestrictions->minBatchPeriod ?? 0); - - $cfgThrottle = ($this->mailqueueThrottle ?? 0); - $ispMinThrottle = (int)($ispRestrictions->minThrottle ?? 0); - - if ($cfgBatch <= 0) { - $batchSize = $ispMax !== null ? max(0, $ispMax) : 0; - } else { - $batchSize = $ispMax !== null ? min($cfgBatch, max(1, $ispMax)) : $cfgBatch; - } - - $batchPeriod = max(0, $cfgPeriod, $ispMinPeriod); - - $throttleSec = max(0, $cfgThrottle, $ispMinThrottle); - - $sentInBatch = 0; - $batchStart = microtime(true); - $this->updateMessageStatus($campaign, Message\MessageStatus::InProcess); foreach ($subscribers as $subscriber) { - if ($batchSize > 0 && $batchPeriod > 0 && $sentInBatch >= $batchSize) { - $elapsed = microtime(true) - $batchStart; - $remaining = (int)ceil($batchPeriod - $elapsed); - if ($remaining > 0) { - $output?->writeln(sprintf( - 'Batch limit reached, sleeping %ds to respect MAILQUEUE_BATCH_PERIOD', - $remaining - )); - sleep($remaining); - } - $batchStart = microtime(true); - $sentInBatch = 0; - } + $this->rateLimiter->awaitTurn($output); if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) { continue; @@ -107,7 +63,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi try { $this->mailer->send($email); - $sentInBatch++; + $this->rateLimiter->afterSend(); } catch (Throwable $e) { $this->logger->error($e->getMessage(), [ 'subscriber_id' => $subscriber->getId(), @@ -115,10 +71,6 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi ]); $output?->writeln('Failed to send to: ' . $subscriber->getEmail()); } - - if ($throttleSec > 0) { - sleep($throttleSec); - } } $this->updateMessageStatus($campaign, Message\MessageStatus::Sent); diff --git a/src/Domain/Messaging/Service/SendRateLimiter.php b/src/Domain/Messaging/Service/SendRateLimiter.php new file mode 100644 index 00000000..8516657a --- /dev/null +++ b/src/Domain/Messaging/Service/SendRateLimiter.php @@ -0,0 +1,88 @@ +initializeLimits(); + } + + private function initializeLimits(): void + { + $isp = $this->ispRestrictionsProvider->load(); + + $cfgBatch = $this->mailqueueBatchSize ?? 0; + $ispMax = isset($isp->maxBatch) ? (int)$isp->maxBatch : null; + + $cfgPeriod = $this->mailqueueBatchPeriod ?? 0; + $ispMinPeriod = $isp->minBatchPeriod ?? 0; + + $cfgThrottle = $this->mailqueueThrottle ?? 0; + $ispMinThrottle = (int)($isp->minThrottle ?? 0); + + if ($cfgBatch <= 0) { + $this->batchSize = $ispMax !== null ? max(0, $ispMax) : 0; + } else { + $this->batchSize = $ispMax !== null ? min($cfgBatch, max(1, $ispMax)) : $cfgBatch; + } + $this->batchPeriod = max(0, $cfgPeriod, $ispMinPeriod); + $this->throttleSec = max(0, $cfgThrottle, $ispMinThrottle); + + $this->sentInBatch = 0; + $this->batchStart = microtime(true); + } + + /** + * Call before attempting to send another message. It will sleep if needed to + * respect batch limits. Returns true when it's okay to proceed. + */ + public function awaitTurn(?OutputInterface $output = null): bool + { + if ($this->batchSize > 0 && $this->batchPeriod > 0 && $this->sentInBatch >= $this->batchSize) { + $elapsed = microtime(true) - $this->batchStart; + $remaining = (int)ceil($this->batchPeriod - $elapsed); + if ($remaining > 0) { + $output?->writeln(sprintf( + 'Batch limit reached, sleeping %ds to respect MAILQUEUE_BATCH_PERIOD', + $remaining + )); + sleep($remaining); + } + $this->batchStart = microtime(true); + $this->sentInBatch = 0; + } + return true; + } + + /** + * Call after a successful sending to update counters and apply per-message throttle. + */ + public function afterSend(): void + { + $this->sentInBatch++; + if ($this->throttleSec > 0) { + sleep($this->throttleSec); + } + } +} diff --git a/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php b/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php index 79ece9bd..e8f370bb 100644 --- a/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php +++ b/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php @@ -82,8 +82,8 @@ public function testExecuteWithNoCampaigns(): void ->method('ensureCampaignsHaveUuid'); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([]); $this->campaignProcessor->expects($this->never()) @@ -112,8 +112,8 @@ public function testExecuteWithCampaigns(): void $campaign = $this->createMock(Message::class); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([$campaign]); $this->campaignProcessor->expects($this->once()) @@ -145,8 +145,8 @@ public function testExecuteWithMultipleCampaigns(): void $campaign2 = $this->createMock(Message::class); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([$campaign1, $campaign2]); $this->campaignProcessor->expects($this->exactly(2)) @@ -179,8 +179,8 @@ public function testExecuteWithProcessorException(): void $campaign = $this->createMock(Message::class); $this->messageRepository->expects($this->once()) - ->method('findBy') - ->with(['status' => 'submitted']) + ->method('getByStatusAndEmbargo') + ->with($this->anything(), $this->anything()) ->willReturn([$campaign]); $this->campaignProcessor->expects($this->once()) diff --git a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php index 009dd5fd..ac76dffb 100644 --- a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php +++ b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php @@ -11,6 +11,7 @@ use PhpList\Core\Domain\Messaging\Model\Message\MessageMetadata; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; +use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; use PhpList\Core\Domain\Subscription\Model\Subscriber; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; use PHPUnit\Framework\MockObject\MockObject; @@ -29,6 +30,7 @@ class CampaignProcessorTest extends TestCase private LoggerInterface&MockObject $logger; private OutputInterface&MockObject $output; private CampaignProcessor $campaignProcessor; + private SendRateLimiter&MockObject $rateLimiter; protected function setUp(): void { @@ -38,13 +40,17 @@ protected function setUp(): void $this->messagePreparator = $this->createMock(MessageProcessingPreparator::class); $this->logger = $this->createMock(LoggerInterface::class); $this->output = $this->createMock(OutputInterface::class); + $this->rateLimiter = $this->createMock(SendRateLimiter::class); + $this->rateLimiter->method('awaitTurn'); + $this->rateLimiter->method('afterSend'); $this->campaignProcessor = new CampaignProcessor( $this->mailer, $this->entityManager, $this->subscriberProvider, $this->messagePreparator, - $this->logger + $this->logger, + $this->rateLimiter ); } @@ -59,11 +65,10 @@ public function testProcessWithNoSubscribers(): void ->with($campaign) ->willReturn([]); - $metadata->expects($this->once()) - ->method('setStatus') - ->with(Message\MessageStatus::Sent); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->mailer->expects($this->never()) @@ -87,11 +92,10 @@ public function testProcessWithInvalidSubscriberEmail(): void ->with($campaign) ->willReturn([$subscriber]); - $metadata->expects($this->once()) - ->method('setStatus') - ->with(Message\MessageStatus::Sent); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->messagePreparator->expects($this->never()) @@ -134,11 +138,10 @@ public function testProcessWithValidSubscriberEmail(): void return true; })); - $metadata->expects($this->once()) - ->method('setStatus') - ->with(Message\MessageStatus::Sent); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, $this->output); @@ -181,11 +184,10 @@ public function testProcessWithMailerException(): void ->method('writeln') ->with('Failed to send to: test@example.com'); - $metadata->expects($this->once()) - ->method('setStatus') - ->with(Message\MessageStatus::Sent); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, $this->output); @@ -221,11 +223,10 @@ public function testProcessWithMultipleSubscribers(): void $this->mailer->expects($this->exactly(2)) ->method('send'); - $metadata->expects($this->once()) - ->method('setStatus') - ->with(Message\MessageStatus::Sent); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, $this->output); @@ -264,11 +265,10 @@ public function testProcessWithNullOutput(): void 'campaign_id' => 123, ]); - $metadata->expects($this->once()) - ->method('setStatus') - ->with(Message\MessageStatus::Sent); + $metadata->expects($this->atLeastOnce()) + ->method('setStatus'); - $this->entityManager->expects($this->once()) + $this->entityManager->expects($this->atLeastOnce()) ->method('flush'); $this->campaignProcessor->process($campaign, null); diff --git a/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php b/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php new file mode 100644 index 00000000..f45b4c79 --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php @@ -0,0 +1,86 @@ +ispProvider = $this->createMock(IspRestrictionsProvider::class); + } + + public function testInitializesLimitsFromConfigOnly(): void + { + $this->ispProvider->method('load')->willReturn(new IspRestrictions(null, null, null)); + $limiter = new SendRateLimiter( + ispRestrictionsProvider: $this->ispProvider, + mailqueueBatchSize: 5, + mailqueueBatchPeriod: 10, + mailqueueThrottle: 2 + ); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->never())->method('writeln'); + + $this->assertTrue($limiter->awaitTurn($output)); + } + + public function testBatchLimitTriggersWaitMessageAndResetsCounters(): void + { + $this->ispProvider->method('load')->willReturn(new IspRestrictions(2, 1, null)); + $limiter = new SendRateLimiter( + $this->ispProvider, + mailqueueBatchSize: 10, + mailqueueBatchPeriod: 1, + mailqueueThrottle: 0 + ); + + $limiter->afterSend(); + $limiter->afterSend(); + + $output = $this->createMock(OutputInterface::class); + // We cannot reliably assert the exact second, but we assert a message called at least once + $output->expects($this->atLeast(0))->method('writeln'); + + // Now awaitTurn should detect batch full and attempt to sleep and reset. + $this->assertTrue($limiter->awaitTurn($output)); + + // Next afterSend should increase the counter again without exception + $limiter->afterSend(); + // Reaching here means no fatal due to internal counter/reset logic + $this->assertTrue(true); + } + + public function testThrottleSleepsPerMessagePathIsCallable(): void + { + $this->ispProvider->method('load')->willReturn(new IspRestrictions(null, null, null)); + $limiter = new SendRateLimiter( + $this->ispProvider, + mailqueueBatchSize: 0, + mailqueueBatchPeriod: 0, + mailqueueThrottle: 1 + ); + + // We cannot speed up sleep without extensions; just call method to ensure no exceptions + $start = microtime(true); + $limiter->afterSend(); + $elapsed = microtime(true) - $start; + + // Ensure it likely slept at least ~0.5s + if ($elapsed < 0.3) { + $this->markTestIncomplete('Environment too fast to detect sleep; logic path executed.'); + } + $this->assertTrue(true); + } +} From 0f455f5706c1694a4df1ab1d6d54a63b430b392b Mon Sep 17 00:00:00 2001 From: Tatevik Date: Fri, 12 Sep 2025 13:15:05 +0400 Subject: [PATCH 07/13] UserMessageStatus --- .../Model/Message/UserMessageStatus.php | 16 +++++++++++ src/Domain/Messaging/Model/UserMessage.php | 12 +++++--- .../Repository/UserMessageRepository.php | 7 +++++ .../Service/Processor/CampaignProcessor.php | 28 ++++++++++++++++++- .../Service/SubscriberDeletionServiceTest.php | 2 +- .../Processor/CampaignProcessorTest.php | 16 +++++++---- 6 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 src/Domain/Messaging/Model/Message/UserMessageStatus.php diff --git a/src/Domain/Messaging/Model/Message/UserMessageStatus.php b/src/Domain/Messaging/Model/Message/UserMessageStatus.php new file mode 100644 index 00000000..1237cfe8 --- /dev/null +++ b/src/Domain/Messaging/Model/Message/UserMessageStatus.php @@ -0,0 +1,16 @@ +viewed; } - public function getStatus(): ?string + /** + * @SuppressWarnings("PHPMD.StaticAccess") + */ + public function getStatus(): ?UserMessageStatus { - return $this->status; + return UserMessageStatus::from($this->status); } public function setViewed(?DateTime $viewed): self @@ -76,9 +80,9 @@ public function setViewed(?DateTime $viewed): self return $this; } - public function setStatus(?string $status): self + public function setStatus(?UserMessageStatus $status): self { - $this->status = $status; + $this->status = $status->value; return $this; } } diff --git a/src/Domain/Messaging/Repository/UserMessageRepository.php b/src/Domain/Messaging/Repository/UserMessageRepository.php index a19c5823..6fbdceac 100644 --- a/src/Domain/Messaging/Repository/UserMessageRepository.php +++ b/src/Domain/Messaging/Repository/UserMessageRepository.php @@ -5,7 +5,14 @@ namespace PhpList\Core\Domain\Messaging\Repository; use PhpList\Core\Domain\Common\Repository\AbstractRepository; +use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\UserMessage; +use PhpList\Core\Domain\Subscription\Model\Subscriber; class UserMessageRepository extends AbstractRepository { + public function findOneByUserAndMessage(Subscriber $subscriber, Message $campaign): ?UserMessage + { + return $this->findOneBy(['user' => $subscriber, 'message' => $campaign]); + } } diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index eb7bd496..10844b1d 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -6,6 +6,9 @@ use Doctrine\ORM\EntityManagerInterface; use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\UserMessage; +use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus; +use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; @@ -23,6 +26,7 @@ class CampaignProcessor private MessageProcessingPreparator $messagePreparator; private LoggerInterface $logger; private SendRateLimiter $rateLimiter; + private UserMessageRepository $userMessageRepository; public function __construct( MailerInterface $mailer, @@ -31,6 +35,7 @@ public function __construct( MessageProcessingPreparator $messagePreparator, LoggerInterface $logger, SendRateLimiter $rateLimiter, + UserMessageRepository $userMessageRepository ) { $this->mailer = $mailer; $this->entityManager = $entityManager; @@ -38,6 +43,7 @@ public function __construct( $this->messagePreparator = $messagePreparator; $this->logger = $logger; $this->rateLimiter = $rateLimiter; + $this->userMessageRepository = $userMessageRepository; } public function process(Message $campaign, ?OutputInterface $output = null): void @@ -48,12 +54,25 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi $this->updateMessageStatus($campaign, Message\MessageStatus::InProcess); foreach ($subscribers as $subscriber) { + $existing = $this->userMessageRepository->findOneByUserAndMessage($subscriber, $campaign); + if ($existing && $existing->getStatus() !== UserMessageStatus::Todo->value) { + continue; + } + + $userMessage = $existing ?? new UserMessage($subscriber, $campaign); + $userMessage->setStatus(UserMessageStatus::Active); + $this->entityManager->persist($userMessage); + $this->entityManager->flush(); + $this->rateLimiter->awaitTurn($output); if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) { + $this->updateUserMessageStatus($userMessage, UserMessageStatus::InvalidEmailAddress); continue; } + $this->messagePreparator->processMessageLinks($campaign, $subscriber->getId()); + $email = (new Email()) ->from('news@example.com') ->to($subscriber->getEmail()) @@ -63,8 +82,10 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi try { $this->mailer->send($email); + $this->updateUserMessageStatus($userMessage, UserMessageStatus::Sent); $this->rateLimiter->afterSend(); } catch (Throwable $e) { + $this->updateUserMessageStatus($userMessage, UserMessageStatus::NotSent); $this->logger->error($e->getMessage(), [ 'subscriber_id' => $subscriber->getId(), 'campaign_id' => $campaign->getId(), @@ -81,4 +102,9 @@ private function updateMessageStatus(Message $message, Message\MessageStatus $st $message->getMetadata()->setStatus($status); $this->entityManager->flush(); } -} + + private function updateUserMessageStatus(UserMessage $userMessage, Message\UserMessageStatus $status): void + { + $userMessage->setStatus($status); + $this->entityManager->flush(); + }} diff --git a/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php b/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php index 03704f53..9019fd30 100644 --- a/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php +++ b/tests/Integration/Domain/Subscription/Service/SubscriberDeletionServiceTest.php @@ -92,7 +92,7 @@ public function testDeleteSubscriberWithRelatedDataDoesNotThrowDoctrineError(): $this->entityManager->persist($linkTrackUmlClick); $userMessage = new UserMessage($subscriber, $msg); - $userMessage->setStatus('sent'); + $userMessage->setStatus(Message\UserMessageStatus::Sent); $this->entityManager->persist($userMessage); $userMessageBounce = new UserMessageBounce(1, new DateTime()); diff --git a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php index ac76dffb..a97c48f9 100644 --- a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php +++ b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php @@ -9,6 +9,7 @@ use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Model\Message\MessageContent; use PhpList\Core\Domain\Messaging\Model\Message\MessageMetadata; +use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; @@ -31,6 +32,7 @@ class CampaignProcessorTest extends TestCase private OutputInterface&MockObject $output; private CampaignProcessor $campaignProcessor; private SendRateLimiter&MockObject $rateLimiter; + private UserMessageRepository&MockObject $userMessageRepository; protected function setUp(): void { @@ -41,16 +43,18 @@ protected function setUp(): void $this->logger = $this->createMock(LoggerInterface::class); $this->output = $this->createMock(OutputInterface::class); $this->rateLimiter = $this->createMock(SendRateLimiter::class); + $this->userMessageRepository = $this->createMock(UserMessageRepository::class); $this->rateLimiter->method('awaitTurn'); $this->rateLimiter->method('afterSend'); $this->campaignProcessor = new CampaignProcessor( - $this->mailer, - $this->entityManager, - $this->subscriberProvider, - $this->messagePreparator, - $this->logger, - $this->rateLimiter + mailer: $this->mailer, + entityManager: $this->entityManager, + subscriberProvider: $this->subscriberProvider, + messagePreparator: $this->messagePreparator, + logger: $this->logger, + rateLimiter: $this->rateLimiter, + userMessageRepository: $this->userMessageRepository, ); } From 9e1ad9c873c91a55a9c1fd69387bc8bd4872abff Mon Sep 17 00:00:00 2001 From: Tatevik Date: Fri, 12 Sep 2025 13:29:59 +0400 Subject: [PATCH 08/13] Refactor --- src/Domain/Common/IspRestrictionsProvider.php | 116 +++++++++++++----- .../Service/Processor/CampaignProcessor.php | 3 +- 2 files changed, 89 insertions(+), 30 deletions(-) diff --git a/src/Domain/Common/IspRestrictionsProvider.php b/src/Domain/Common/IspRestrictionsProvider.php index 0e4bb21a..dd3f5ef1 100644 --- a/src/Domain/Common/IspRestrictionsProvider.php +++ b/src/Domain/Common/IspRestrictionsProvider.php @@ -16,52 +16,114 @@ public function __construct( public function load(): IspRestrictions { - if (!is_file($this->confPath) || !is_readable($this->confPath)) { + $contents = $this->readConfigFile(); + if ($contents === null) { return new IspRestrictions(null, null, null); } + [$raw, $maxBatch, $minBatchPeriod, $lockFile] = $this->parseContents($contents); + + $this->logIfDetected($maxBatch, $minBatchPeriod, $lockFile); + + return new IspRestrictions($maxBatch, $minBatchPeriod, $lockFile, $raw); + } + + private function readConfigFile(): ?string + { + if (!is_file($this->confPath) || !is_readable($this->confPath)) { + return null; + } $contents = file_get_contents($this->confPath); if ($contents === false) { $this->logger->warning('Cannot read ISP restrictions file', ['path' => $this->confPath]); - return new IspRestrictions(null, null, null); + return null; } + return $contents; + } + /** + * @return array{0: array, 1: ?int, 2: ?int, 3: ?string} + */ + private function parseContents(string $contents): array + { $maxBatch = null; $minBatchPeriod = null; $lockFile = null; - $raw = []; + foreach (preg_split('/\R/', $contents) as $line) { - $line = trim($line); - if ($line === '' || str_starts_with($line, '#') || str_starts_with($line, ';')) { - continue; - } - $parts = explode('=', $line, 2); - if (\count($parts) !== 2) { + [$key, $val] = $this->parseLine($line); + if ($key === null) { continue; } - [$key, $val] = array_map('trim', $parts); $raw[$key] = $val; + [$maxBatch, $minBatchPeriod, $lockFile] = $this->applyKeyValue( + $key, + $val, + $maxBatch, + $minBatchPeriod, + $lockFile + ); + } - switch ($key) { - case 'maxbatch': - if ($val !== '' && ctype_digit($val)) { - $maxBatch = (int) $val; - } - break; - case 'minbatchperiod': - if ($val !== '' && ctype_digit($val)) { - $minBatchPeriod = (int) $val; - } - break; - case 'lockfile': - if ($val !== '') { - $lockFile = $val; - } - break; + return [$raw, $maxBatch, $minBatchPeriod, $lockFile]; + } + + /** + * @return array{0: ?string, 1: string} + */ + private function parseLine(string $line): array + { + $line = trim($line); + if ($line === '' || str_starts_with($line, '#') || str_starts_with($line, ';')) { + return [null, '']; + } + $parts = explode('=', $line, 2); + if (\count($parts) !== 2) { + return [null, '']; + } + + return array_map('trim', $parts); + } + + /** + * @param string $key + * @param string $val + * @param ?int $maxBatch + * @param ?int $minBatchPeriod + * @param ?string $lockFile + * @return array{0: ?int, 1: ?int, 2: ?string} + */ + private function applyKeyValue( + string $key, + string $val, + ?int $maxBatch, + ?int $minBatchPeriod, + ?string $lockFile + ): array { + if ($key === 'maxbatch') { + if ($val !== '' && ctype_digit($val)) { + $maxBatch = (int) $val; } + return [$maxBatch, $minBatchPeriod, $lockFile]; } + if ($key === 'minbatchperiod') { + if ($val !== '' && ctype_digit($val)) { + $minBatchPeriod = (int) $val; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + if ($key === 'lockfile') { + if ($val !== '') { + $lockFile = $val; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + return [$maxBatch, $minBatchPeriod, $lockFile]; + } + private function logIfDetected(?int $maxBatch, ?int $minBatchPeriod, ?string $lockFile): void + { if ($maxBatch !== null || $minBatchPeriod !== null || $lockFile !== null) { $this->logger->info('ISP restrictions detected', [ 'path' => $this->confPath, @@ -70,7 +132,5 @@ public function load(): IspRestrictions 'lockfile' => $lockFile, ]); } - - return new IspRestrictions($maxBatch, $minBatchPeriod, $lockFile, $raw); } } diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index 10844b1d..163e494f 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -61,8 +61,7 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi $userMessage = $existing ?? new UserMessage($subscriber, $campaign); $userMessage->setStatus(UserMessageStatus::Active); - $this->entityManager->persist($userMessage); - $this->entityManager->flush(); + $this->userMessageRepository->save($userMessage); $this->rateLimiter->awaitTurn($output); From 2f911ca1d0f088c73c5e8ac457a761c57f93be93 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Mon, 15 Sep 2025 10:43:53 +0400 Subject: [PATCH 09/13] RateLimitedCampaignMailer --- config/services/providers.yml | 6 +++ config/services/services.yml | 4 +- src/Domain/Common/IspRestrictionsProvider.php | 3 +- src/Domain/Common/Model/IspRestrictions.php | 3 +- .../Service/Processor/CampaignProcessor.php | 40 ++++++--------- .../Service/RateLimitedCampaignMailer.php | 42 ++++++++++++++++ .../Processor/CampaignProcessorTest.php | 49 ++++++++++--------- 7 files changed, 91 insertions(+), 56 deletions(-) create mode 100644 src/Domain/Messaging/Service/RateLimitedCampaignMailer.php diff --git a/config/services/providers.yml b/config/services/providers.yml index b1d90720..bb4524c3 100644 --- a/config/services/providers.yml +++ b/config/services/providers.yml @@ -6,3 +6,9 @@ services: PhpList\Core\Core\ConfigProvider: arguments: $config: '%app.config%' + + PhpList\Core\Domain\Common\IspRestrictionsProvider: + autowire: true + autoconfigure: true + arguments: + $confPath: '%app.phplist_isp_conf_path%' diff --git a/config/services/services.yml b/config/services/services.yml index 611d9afe..f6f3f6b9 100644 --- a/config/services/services.yml +++ b/config/services/services.yml @@ -52,11 +52,9 @@ services: autowire: true autoconfigure: true - PhpList\Core\Domain\Common\IspRestrictionsProvider: + PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer: autowire: true autoconfigure: true - arguments: - $confPath: '%app.phplist_isp_conf_path%' PhpList\Core\Domain\Messaging\Service\ConsecutiveBounceHandler: autowire: true diff --git a/src/Domain/Common/IspRestrictionsProvider.php b/src/Domain/Common/IspRestrictionsProvider.php index dd3f5ef1..4095f5ce 100644 --- a/src/Domain/Common/IspRestrictionsProvider.php +++ b/src/Domain/Common/IspRestrictionsProvider.php @@ -12,7 +12,8 @@ class IspRestrictionsProvider public function __construct( private readonly string $confPath, private readonly LoggerInterface $logger, - ) {} + ) { + } public function load(): IspRestrictions { diff --git a/src/Domain/Common/Model/IspRestrictions.php b/src/Domain/Common/Model/IspRestrictions.php index 08fd9149..c3fc56b4 100644 --- a/src/Domain/Common/Model/IspRestrictions.php +++ b/src/Domain/Common/Model/IspRestrictions.php @@ -11,8 +11,7 @@ public function __construct( public readonly ?int $minBatchPeriod, public readonly ?string $lockFile, public readonly array $raw = [], - ) - { + ) { } public function isEmpty(): bool diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index 163e494f..ce383516 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -8,33 +8,30 @@ use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Model\UserMessage; use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus; +use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; +use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; -use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Mailer\MailerInterface; -use Symfony\Component\Mime\Email; use Throwable; class CampaignProcessor { - private MailerInterface $mailer; + private RateLimitedCampaignMailer $mailer; private EntityManagerInterface $entityManager; private SubscriberProvider $subscriberProvider; private MessageProcessingPreparator $messagePreparator; private LoggerInterface $logger; - private SendRateLimiter $rateLimiter; private UserMessageRepository $userMessageRepository; public function __construct( - MailerInterface $mailer, + RateLimitedCampaignMailer $mailer, EntityManagerInterface $entityManager, SubscriberProvider $subscriberProvider, MessageProcessingPreparator $messagePreparator, LoggerInterface $logger, - SendRateLimiter $rateLimiter, UserMessageRepository $userMessageRepository ) { $this->mailer = $mailer; @@ -42,20 +39,19 @@ public function __construct( $this->subscriberProvider = $subscriberProvider; $this->messagePreparator = $messagePreparator; $this->logger = $logger; - $this->rateLimiter = $rateLimiter; $this->userMessageRepository = $userMessageRepository; } public function process(Message $campaign, ?OutputInterface $output = null): void { - $this->updateMessageStatus($campaign, Message\MessageStatus::Prepared); + $this->updateMessageStatus($campaign, MessageStatus::Prepared); $subscribers = $this->subscriberProvider->getSubscribersForMessage($campaign); - $this->updateMessageStatus($campaign, Message\MessageStatus::InProcess); + $this->updateMessageStatus($campaign, MessageStatus::InProcess); foreach ($subscribers as $subscriber) { $existing = $this->userMessageRepository->findOneByUserAndMessage($subscriber, $campaign); - if ($existing && $existing->getStatus() !== UserMessageStatus::Todo->value) { + if ($existing && $existing->getStatus() !== UserMessageStatus::Todo) { continue; } @@ -63,26 +59,17 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi $userMessage->setStatus(UserMessageStatus::Active); $this->userMessageRepository->save($userMessage); - $this->rateLimiter->awaitTurn($output); - if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) { $this->updateUserMessageStatus($userMessage, UserMessageStatus::InvalidEmailAddress); continue; } - $this->messagePreparator->processMessageLinks($campaign, $subscriber->getId()); - - $email = (new Email()) - ->from('news@example.com') - ->to($subscriber->getEmail()) - ->subject($campaign->getContent()->getSubject()) - ->text($campaign->getContent()->getTextMessage()) - ->html($campaign->getContent()->getText()); + $processed = $this->messagePreparator->processMessageLinks($campaign, $subscriber->getId()); try { + $email = $this->mailer->composeEmail($processed, $subscriber); $this->mailer->send($email); $this->updateUserMessageStatus($userMessage, UserMessageStatus::Sent); - $this->rateLimiter->afterSend(); } catch (Throwable $e) { $this->updateUserMessageStatus($userMessage, UserMessageStatus::NotSent); $this->logger->error($e->getMessage(), [ @@ -93,17 +80,18 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi } } - $this->updateMessageStatus($campaign, Message\MessageStatus::Sent); + $this->updateMessageStatus($campaign, MessageStatus::Sent); } - private function updateMessageStatus(Message $message, Message\MessageStatus $status): void + private function updateMessageStatus(Message $message, MessageStatus $status): void { $message->getMetadata()->setStatus($status); $this->entityManager->flush(); } - private function updateUserMessageStatus(UserMessage $userMessage, Message\UserMessageStatus $status): void + private function updateUserMessageStatus(UserMessage $userMessage, UserMessageStatus $status): void { $userMessage->setStatus($status); $this->entityManager->flush(); - }} + } +} diff --git a/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php b/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php new file mode 100644 index 00000000..b9cc4600 --- /dev/null +++ b/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php @@ -0,0 +1,42 @@ +mailer = $mailer; + $this->limiter = $limiter; + } + + public function composeEmail(Message $processed, Subscriber $subscriber): Email + { + return (new Email()) + ->from('news@example.com') + ->to($subscriber->getEmail()) + ->subject($processed->getContent()->getSubject()) + ->text($processed->getContent()->getTextMessage()) + ->html($processed->getContent()->getText()); + } + + /** + * @throws TransportExceptionInterface + */ + public function send(Email $email): void + { + $this->limiter->awaitTurn(); + $this->mailer->send($email); + $this->limiter->afterSend(); + } +} diff --git a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php index a97c48f9..124e0ecf 100644 --- a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php +++ b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php @@ -12,40 +12,35 @@ use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; -use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; +use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer; use PhpList\Core\Domain\Subscription\Model\Subscriber; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Mailer\MailerInterface; use Symfony\Component\Mime\Email; class CampaignProcessorTest extends TestCase { - private MailerInterface&MockObject $mailer; - private EntityManagerInterface&MockObject $entityManager; - private SubscriberProvider&MockObject $subscriberProvider; - private MessageProcessingPreparator&MockObject $messagePreparator; - private LoggerInterface&MockObject $logger; - private OutputInterface&MockObject $output; + private RateLimitedCampaignMailer|MockObject $mailer; + private EntityManagerInterface|MockObject $entityManager; + private SubscriberProvider|MockObject $subscriberProvider; + private MessageProcessingPreparator|MockObject $messagePreparator; + private LoggerInterface|MockObject $logger; + private OutputInterface|MockObject $output; private CampaignProcessor $campaignProcessor; - private SendRateLimiter&MockObject $rateLimiter; - private UserMessageRepository&MockObject $userMessageRepository; + private UserMessageRepository|MockObject $userMessageRepository; protected function setUp(): void { - $this->mailer = $this->createMock(MailerInterface::class); + $this->mailer = $this->createMock(RateLimitedCampaignMailer::class); $this->entityManager = $this->createMock(EntityManagerInterface::class); $this->subscriberProvider = $this->createMock(SubscriberProvider::class); $this->messagePreparator = $this->createMock(MessageProcessingPreparator::class); $this->logger = $this->createMock(LoggerInterface::class); $this->output = $this->createMock(OutputInterface::class); - $this->rateLimiter = $this->createMock(SendRateLimiter::class); $this->userMessageRepository = $this->createMock(UserMessageRepository::class); - $this->rateLimiter->method('awaitTurn'); - $this->rateLimiter->method('afterSend'); $this->campaignProcessor = new CampaignProcessor( mailer: $this->mailer, @@ -53,7 +48,6 @@ protected function setUp(): void subscriberProvider: $this->subscriberProvider, messagePreparator: $this->messagePreparator, logger: $this->logger, - rateLimiter: $this->rateLimiter, userMessageRepository: $this->userMessageRepository, ); } @@ -131,16 +125,23 @@ public function testProcessWithValidSubscriberEmail(): void ->with($campaign, 1) ->willReturn($campaign); + $this->mailer->expects($this->once()) + ->method('composeEmail') + ->with($campaign, $subscriber) + ->willReturnCallback(function ($processed, $sub) use ($campaign, $subscriber) { + $this->assertSame($campaign, $processed); + $this->assertSame($subscriber, $sub); + return (new Email()) + ->from('news@example.com') + ->to('test@example.com') + ->subject('Test Subject') + ->text('Test text message') + ->html('

Test HTML message

'); + }); + $this->mailer->expects($this->once()) ->method('send') - ->with($this->callback(function (Email $email) { - $this->assertEquals('test@example.com', $email->getTo()[0]->getAddress()); - $this->assertEquals('news@example.com', $email->getFrom()[0]->getAddress()); - $this->assertEquals('Test Subject', $email->getSubject()); - $this->assertEquals('Test text message', $email->getTextBody()); - $this->assertEquals('

Test HTML message

', $email->getHtmlBody()); - return true; - })); + ->with($this->isInstanceOf(Email::class)); $metadata->expects($this->atLeastOnce()) ->method('setStatus'); @@ -281,7 +282,7 @@ public function testProcessWithNullOutput(): void /** * Creates a mock for the Message class with content */ - private function createCampaignMock(): Message&MockObject + private function createCampaignMock(): Message|MockObject { $campaign = $this->createMock(Message::class); $content = $this->createMock(MessageContent::class); From 58aa938a94c12425e1eeabddb7d89bd4624b9020 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Mon, 15 Sep 2025 12:02:40 +0400 Subject: [PATCH 10/13] RateLimitedCampaignMailerTest --- .../Service/RateLimitedCampaignMailer.php | 12 +- .../Service/RateLimitedCampaignMailerTest.php | 134 ++++++++++++++++++ 2 files changed, 144 insertions(+), 2 deletions(-) create mode 100644 tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php diff --git a/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php b/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php index b9cc4600..7691f970 100644 --- a/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php +++ b/src/Domain/Messaging/Service/RateLimitedCampaignMailer.php @@ -22,8 +22,16 @@ public function __construct(MailerInterface $mailer, SendRateLimiter $limiter) public function composeEmail(Message $processed, Subscriber $subscriber): Email { - return (new Email()) - ->from('news@example.com') + $email = new Email(); + if ($processed->getOptions()->getFromField() !== '') { + $email->from($processed->getOptions()->getFromField()); + } + + if ($processed->getOptions()->getReplyTo() !== '') { + $email->replyTo($processed->getOptions()->getReplyTo()); + } + + return $email ->to($subscriber->getEmail()) ->subject($processed->getContent()->getSubject()) ->text($processed->getContent()->getTextMessage()) diff --git a/tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php b/tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php new file mode 100644 index 00000000..6e2011ff --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/RateLimitedCampaignMailerTest.php @@ -0,0 +1,134 @@ +mailer = $this->createMock(MailerInterface::class); + $this->limiter = $this->createMock(SendRateLimiter::class); + $this->sut = new RateLimitedCampaignMailer($this->mailer, $this->limiter); + } + + public function testComposeEmailSetsHeadersAndBody(): void + { + $message = $this->buildMessage( + subject: 'Subject', + textBody: 'Plain text', + htmlBody: '

HTML

', + from: 'from@example.com', + replyTo: 'reply@example.com' + ); + + $subscriber = new Subscriber(); + $this->setSubscriberEmail($subscriber, 'user@example.com'); + + $email = $this->sut->composeEmail($message, $subscriber); + + $this->assertInstanceOf(Email::class, $email); + $this->assertSame('user@example.com', $email->getTo()[0]->getAddress()); + $this->assertSame('Subject', $email->getSubject()); + $this->assertSame('from@example.com', $email->getFrom()[0]->getAddress()); + $this->assertSame('reply@example.com', $email->getReplyTo()[0]->getAddress()); + $this->assertSame('Plain text', $email->getTextBody()); + $this->assertSame('

HTML

', $email->getHtmlBody()); + } + + public function testComposeEmailWithoutOptionalHeaders(): void + { + $message = $this->buildMessage( + subject: 'No headers', + textBody: 'text', + htmlBody: 'h', + from: '', + replyTo: '' + ); + + $subscriber = new Subscriber(); + $this->setSubscriberEmail($subscriber, 'user2@example.com'); + + $email = $this->sut->composeEmail($message, $subscriber); + + $this->assertSame('user2@example.com', $email->getTo()[0]->getAddress()); + $this->assertSame('No headers', $email->getSubject()); + $this->assertSame([], $email->getFrom()); + $this->assertSame([], $email->getReplyTo()); + } + + public function testSendUsesLimiterAroundMailer(): void + { + $email = (new Email())->to('someone@example.com'); + + $this->limiter->expects($this->once())->method('awaitTurn'); + $this->mailer + ->expects($this->once()) + ->method('send') + ->with($this->isInstanceOf(Email::class)); + $this->limiter->expects($this->once())->method('afterSend'); + + $this->sut->send($email); + } + + private function buildMessage( + string $subject, + string $textBody, + string $htmlBody, + string $from, + string $replyTo + ): Message { + $content = new MessageContent( + subject: $subject, + text: $htmlBody, + textMessage: $textBody, + footer: null, + ); + $format = new MessageFormat( + htmlFormatted: true, + sendFormat: MessageFormat::FORMAT_HTML, + formatOptions: [MessageFormat::FORMAT_HTML] + ); + $schedule = new MessageSchedule( + repeatInterval: 0, + repeatUntil: null, + requeueInterval: 0, + requeueUntil: null, + embargo: null + ); + $metadata = new MessageMetadata(); + $options = new MessageOptions(fromField: $from, toField: '', replyTo: $replyTo); + + return new Message($format, $schedule, $metadata, $content, $options, null, null); + } + + /** + * Subscriber has no public setter for email, so we use reflection. + */ + private function setSubscriberEmail(Subscriber $subscriber, string $email): void + { + $ref = new ReflectionProperty($subscriber, 'email'); + $ref->setValue($subscriber, $email); + } +} From 13d6249e8b2c6757ce9d70f708e42260770c42e9 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Mon, 15 Sep 2025 13:06:41 +0400 Subject: [PATCH 11/13] Rate limit initialized from history --- .../Repository/UserMessageRepository.php | 17 +++++++++++++++++ .../Messaging/Service/SendRateLimiter.php | 15 +++++++++++++++ .../Messaging/Service/SendRateLimiterTest.php | 8 ++++++-- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/Domain/Messaging/Repository/UserMessageRepository.php b/src/Domain/Messaging/Repository/UserMessageRepository.php index 6fbdceac..e8268025 100644 --- a/src/Domain/Messaging/Repository/UserMessageRepository.php +++ b/src/Domain/Messaging/Repository/UserMessageRepository.php @@ -4,8 +4,10 @@ namespace PhpList\Core\Domain\Messaging\Repository; +use DateTimeInterface; use PhpList\Core\Domain\Common\Repository\AbstractRepository; use PhpList\Core\Domain\Messaging\Model\Message; +use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus; use PhpList\Core\Domain\Messaging\Model\UserMessage; use PhpList\Core\Domain\Subscription\Model\Subscriber; @@ -15,4 +17,19 @@ public function findOneByUserAndMessage(Subscriber $subscriber, Message $campaig { return $this->findOneBy(['user' => $subscriber, 'message' => $campaign]); } + + /** + * Counts how many user messages have status "sent" since the given time. + */ + public function countSentSince(DateTimeInterface $since): int + { + $queryBuilder = $this->createQueryBuilder('um'); + $queryBuilder->select('COUNT(um)') + ->where('um.createdAt > :since') + ->andWhere('um.status = :status') + ->setParameter('since', $since) + ->setParameter('status', UserMessageStatus::Sent->value); + + return (int) $queryBuilder->getQuery()->getSingleScalarResult(); + } } diff --git a/src/Domain/Messaging/Service/SendRateLimiter.php b/src/Domain/Messaging/Service/SendRateLimiter.php index 8516657a..378b80d5 100644 --- a/src/Domain/Messaging/Service/SendRateLimiter.php +++ b/src/Domain/Messaging/Service/SendRateLimiter.php @@ -4,7 +4,10 @@ namespace PhpList\Core\Domain\Messaging\Service; +use DateInterval; +use DateTimeImmutable; use PhpList\Core\Domain\Common\IspRestrictionsProvider; +use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; use Symfony\Component\Console\Output\OutputInterface; /** @@ -18,9 +21,11 @@ class SendRateLimiter private int $throttleSec; private int $sentInBatch = 0; private float $batchStart = 0.0; + private bool $initializedFromHistory = false; public function __construct( private readonly IspRestrictionsProvider $ispRestrictionsProvider, + private readonly UserMessageRepository $userMessageRepository, private readonly ?int $mailqueueBatchSize = null, private readonly ?int $mailqueueBatchPeriod = null, private readonly ?int $mailqueueThrottle = null, @@ -51,6 +56,7 @@ private function initializeLimits(): void $this->sentInBatch = 0; $this->batchStart = microtime(true); + $this->initializedFromHistory = false; } /** @@ -59,6 +65,13 @@ private function initializeLimits(): void */ public function awaitTurn(?OutputInterface $output = null): bool { + if (!$this->initializedFromHistory && $this->batchSize > 0 && $this->batchPeriod > 0) { + $since = (new DateTimeImmutable())->sub(new DateInterval('PT' . $this->batchPeriod . 'S')); + $alreadySent = $this->userMessageRepository->countSentSince($since); + $this->sentInBatch = max($this->sentInBatch, $alreadySent); + $this->initializedFromHistory = true; + } + if ($this->batchSize > 0 && $this->batchPeriod > 0 && $this->sentInBatch >= $this->batchSize) { $elapsed = microtime(true) - $this->batchStart; $remaining = (int)ceil($this->batchPeriod - $elapsed); @@ -71,7 +84,9 @@ public function awaitTurn(?OutputInterface $output = null): bool } $this->batchStart = microtime(true); $this->sentInBatch = 0; + $this->initializedFromHistory = false; } + return true; } diff --git a/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php b/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php index f45b4c79..e9ba27c0 100644 --- a/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php +++ b/tests/Unit/Domain/Messaging/Service/SendRateLimiterTest.php @@ -6,6 +6,7 @@ use PhpList\Core\Domain\Common\IspRestrictionsProvider; use PhpList\Core\Domain\Common\Model\IspRestrictions; +use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; use PhpList\Core\Domain\Messaging\Service\SendRateLimiter; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; @@ -25,6 +26,7 @@ public function testInitializesLimitsFromConfigOnly(): void $this->ispProvider->method('load')->willReturn(new IspRestrictions(null, null, null)); $limiter = new SendRateLimiter( ispRestrictionsProvider: $this->ispProvider, + userMessageRepository: $this->createMock(UserMessageRepository::class), mailqueueBatchSize: 5, mailqueueBatchPeriod: 10, mailqueueThrottle: 2 @@ -40,7 +42,8 @@ public function testBatchLimitTriggersWaitMessageAndResetsCounters(): void { $this->ispProvider->method('load')->willReturn(new IspRestrictions(2, 1, null)); $limiter = new SendRateLimiter( - $this->ispProvider, + ispRestrictionsProvider: $this->ispProvider, + userMessageRepository: $this->createMock(UserMessageRepository::class), mailqueueBatchSize: 10, mailqueueBatchPeriod: 1, mailqueueThrottle: 0 @@ -66,7 +69,8 @@ public function testThrottleSleepsPerMessagePathIsCallable(): void { $this->ispProvider->method('load')->willReturn(new IspRestrictions(null, null, null)); $limiter = new SendRateLimiter( - $this->ispProvider, + ispRestrictionsProvider: $this->ispProvider, + userMessageRepository: $this->createMock(UserMessageRepository::class), mailqueueBatchSize: 0, mailqueueBatchPeriod: 0, mailqueueThrottle: 1 From c0e24183b1b8c236f16f2e1a54264b0abbd1251d Mon Sep 17 00:00:00 2001 From: Tatevik Date: Mon, 15 Sep 2025 13:54:22 +0400 Subject: [PATCH 12/13] Check maintenance mode --- .../Configuration/Service/Manager/ConfigManager.php | 6 ++++++ src/Domain/Messaging/Command/ProcessQueueCommand.php | 10 ++++++++++ .../Messaging/Command/ProcessQueueCommandTest.php | 4 +++- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/Domain/Configuration/Service/Manager/ConfigManager.php b/src/Domain/Configuration/Service/Manager/ConfigManager.php index cae380be..1a9c356f 100644 --- a/src/Domain/Configuration/Service/Manager/ConfigManager.php +++ b/src/Domain/Configuration/Service/Manager/ConfigManager.php @@ -17,6 +17,12 @@ public function __construct(ConfigRepository $configRepository) $this->configRepository = $configRepository; } + public function inMaintenanceMode(): bool + { + $config = $this->getByItem('maintenancemode'); + return $config?->getValue() === '1'; + } + /** * Get a configuration item by its key */ diff --git a/src/Domain/Messaging/Command/ProcessQueueCommand.php b/src/Domain/Messaging/Command/ProcessQueueCommand.php index d8d38652..d2c7cbfa 100644 --- a/src/Domain/Messaging/Command/ProcessQueueCommand.php +++ b/src/Domain/Messaging/Command/ProcessQueueCommand.php @@ -5,6 +5,7 @@ namespace PhpList\Core\Domain\Messaging\Command; use DateTimeImmutable; +use PhpList\Core\Domain\Configuration\Service\Manager\ConfigManager; use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; @@ -26,18 +27,21 @@ class ProcessQueueCommand extends Command private LockFactory $lockFactory; private MessageProcessingPreparator $messagePreparator; private CampaignProcessor $campaignProcessor; + private ConfigManager $configManager; public function __construct( MessageRepository $messageRepository, LockFactory $lockFactory, MessageProcessingPreparator $messagePreparator, CampaignProcessor $campaignProcessor, + ConfigManager $configManager ) { parent::__construct(); $this->messageRepository = $messageRepository; $this->lockFactory = $lockFactory; $this->messagePreparator = $messagePreparator; $this->campaignProcessor = $campaignProcessor; + $this->configManager = $configManager; } /** @@ -52,6 +56,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int return Command::FAILURE; } + if ($this->configManager->inMaintenanceMode()) { + $output->writeln('The system is in maintenance mode, stopping. Try again later.'); + + return Command::FAILURE; + } + try { $this->messagePreparator->ensureSubscribersHaveUuid($output); $this->messagePreparator->ensureCampaignsHaveUuid($output); diff --git a/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php b/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php index e8f370bb..d76f63c0 100644 --- a/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php +++ b/tests/Unit/Domain/Messaging/Command/ProcessQueueCommandTest.php @@ -5,6 +5,7 @@ namespace PhpList\Core\Tests\Unit\Domain\Messaging\Command; use Exception; +use PhpList\Core\Domain\Configuration\Service\Manager\ConfigManager; use PhpList\Core\Domain\Messaging\Command\ProcessQueueCommand; use PhpList\Core\Domain\Messaging\Model\Message; use PhpList\Core\Domain\Messaging\Repository\MessageRepository; @@ -41,7 +42,8 @@ protected function setUp(): void $this->messageRepository, $lockFactory, $this->messageProcessingPreparator, - $this->campaignProcessor + $this->campaignProcessor, + $this->createMock(ConfigManager::class), ); $application = new Application(); From 17e46079a86d71dcdf9a9b932f43fc17e2a9fe21 Mon Sep 17 00:00:00 2001 From: Tatevik Date: Wed, 17 Sep 2025 11:05:15 +0400 Subject: [PATCH 13/13] Max processing time limiter --- config/parameters.yml.dist | 2 + config/services/services.yml | 7 + .../Messaging/Model/Message/MessageStatus.php | 2 +- .../Service/Handler/RequeueHandler.php | 60 +++++++ .../Service/MaxProcessTimeLimiter.php | 46 ++++++ .../Service/Processor/CampaignProcessor.php | 36 +++- .../Service/Provider/SubscriberProvider.php | 2 +- .../Service/Handler/RequeueHandlerTest.php | 155 ++++++++++++++++++ .../Service/MaxProcessTimeLimiterTest.php | 53 ++++++ .../Processor/CampaignProcessorTest.php | 4 + .../Provider/SubscriberProviderTest.php | 14 +- 11 files changed, 371 insertions(+), 10 deletions(-) create mode 100644 src/Domain/Messaging/Service/Handler/RequeueHandler.php create mode 100644 src/Domain/Messaging/Service/MaxProcessTimeLimiter.php create mode 100644 tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php create mode 100644 tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php diff --git a/config/parameters.yml.dist b/config/parameters.yml.dist index e042fa4a..e34a7d2b 100644 --- a/config/parameters.yml.dist +++ b/config/parameters.yml.dist @@ -79,3 +79,5 @@ parameters: env(MAILQUEUE_BATCH_PERIOD): '5' messaging.mail_queue_throttle: '%%env(MAILQUEUE_THROTTLE)%%' env(MAILQUEUE_THROTTLE): '5' + messaging.max_process_time: '%%env(MESSAGING_MAX_PROCESS_TIME)%%' + env(MESSAGING_MAX_PROCESS_TIME): '600' diff --git a/config/services/services.yml b/config/services/services.yml index f6f3f6b9..1afd1fc5 100644 --- a/config/services/services.yml +++ b/config/services/services.yml @@ -120,6 +120,13 @@ services: arguments: - !tagged_iterator { tag: 'phplist.bounce_action_handler' } + PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter: + autowire: true + autoconfigure: true + arguments: + $maxSeconds: '%messaging.max_process_time%' + + PhpList\Core\Domain\Identity\Service\PermissionChecker: autowire: true autoconfigure: true diff --git a/src/Domain/Messaging/Model/Message/MessageStatus.php b/src/Domain/Messaging/Model/Message/MessageStatus.php index dfb0f2d7..90b7f987 100644 --- a/src/Domain/Messaging/Model/Message/MessageStatus.php +++ b/src/Domain/Messaging/Model/Message/MessageStatus.php @@ -25,7 +25,7 @@ public function allowedTransitions(): array self::Draft, self::Suspended => [self::Submitted], self::Submitted => [self::Prepared, self::InProcess], self::Prepared => [self::InProcess], - self::InProcess => [self::Sent, self::Suspended], + self::InProcess => [self::Sent, self::Suspended, self::Submitted], self::Requeued => [self::InProcess, self::Suspended], self::Sent => [], }; diff --git a/src/Domain/Messaging/Service/Handler/RequeueHandler.php b/src/Domain/Messaging/Service/Handler/RequeueHandler.php new file mode 100644 index 00000000..6a1d9d95 --- /dev/null +++ b/src/Domain/Messaging/Service/Handler/RequeueHandler.php @@ -0,0 +1,60 @@ +getSchedule(); + $interval = $schedule->getRequeueInterval() ?? 0; + $until = $schedule->getRequeueUntil(); + + if ($interval <= 0) { + return false; + } + $now = new DateTime(); + if ($until instanceof DateTime && $now > $until) { + return false; + } + + $embargoIsInFuture = $schedule->getEmbargo() instanceof DateTime && $schedule->getEmbargo() > new DateTime(); + $base = $embargoIsInFuture ? clone $schedule->getEmbargo() : new DateTime(); + $next = (clone $base)->add(new DateInterval('PT' . max(1, $interval) . 'M')); + if ($until instanceof DateTime && $next > $until) { + return false; + } + + $schedule->setEmbargo($next); + $campaign->setSchedule($schedule); + $campaign->getMetadata()->setStatus(MessageStatus::Submitted); + $this->entityManager->flush(); + + $output?->writeln(sprintf( + 'Requeued campaign; next embargo at %s', + $next->format(DateTime::ATOM) + )); + $this->logger->info('Campaign requeued with new embargo', [ + 'campaign_id' => $campaign->getId(), + 'embargo' => $next->format(DateTime::ATOM), + ]); + + return true; + } +} diff --git a/src/Domain/Messaging/Service/MaxProcessTimeLimiter.php b/src/Domain/Messaging/Service/MaxProcessTimeLimiter.php new file mode 100644 index 00000000..c5269aaa --- /dev/null +++ b/src/Domain/Messaging/Service/MaxProcessTimeLimiter.php @@ -0,0 +1,46 @@ +maxSeconds = $maxSeconds ?? 600; + } + + public function start(): void + { + $this->startedAt = microtime(true); + } + + public function shouldStop(?OutputInterface $output = null): bool + { + if ($this->maxSeconds <= 0) { + return false; + } + if ($this->startedAt <= 0.0) { + $this->start(); + } + $elapsed = microtime(true) - $this->startedAt; + if ($elapsed >= $this->maxSeconds) { + $this->logger->warning(sprintf('Reached max processing time of %d seconds', $this->maxSeconds)); + $output?->writeln('Reached max processing time; stopping cleanly.'); + + return true; + } + + return false; + } +} diff --git a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php index ce383516..92313e28 100644 --- a/src/Domain/Messaging/Service/Processor/CampaignProcessor.php +++ b/src/Domain/Messaging/Service/Processor/CampaignProcessor.php @@ -10,13 +10,19 @@ use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus; use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus; use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; +use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler; use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer; +use PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider; +use PhpList\Core\Domain\Subscription\Model\Subscriber; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\OutputInterface; use Throwable; +/** + * @SuppressWarnings(PHPMD.CouplingBetweenObjects) + */ class CampaignProcessor { private RateLimitedCampaignMailer $mailer; @@ -25,6 +31,8 @@ class CampaignProcessor private MessageProcessingPreparator $messagePreparator; private LoggerInterface $logger; private UserMessageRepository $userMessageRepository; + private MaxProcessTimeLimiter $timeLimiter; + private RequeueHandler $requeueHandler; public function __construct( RateLimitedCampaignMailer $mailer, @@ -32,7 +40,9 @@ public function __construct( SubscriberProvider $subscriberProvider, MessageProcessingPreparator $messagePreparator, LoggerInterface $logger, - UserMessageRepository $userMessageRepository + UserMessageRepository $userMessageRepository, + MaxProcessTimeLimiter $timeLimiter, + RequeueHandler $requeueHandler ) { $this->mailer = $mailer; $this->entityManager = $entityManager; @@ -40,6 +50,8 @@ public function __construct( $this->messagePreparator = $messagePreparator; $this->logger = $logger; $this->userMessageRepository = $userMessageRepository; + $this->timeLimiter = $timeLimiter; + $this->requeueHandler = $requeueHandler; } public function process(Message $campaign, ?OutputInterface $output = null): void @@ -49,7 +61,15 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi $this->updateMessageStatus($campaign, MessageStatus::InProcess); + $this->timeLimiter->start(); + $stoppedEarly = false; + foreach ($subscribers as $subscriber) { + if ($this->timeLimiter->shouldStop($output)) { + $stoppedEarly = true; + break; + } + $existing = $this->userMessageRepository->findOneByUserAndMessage($subscriber, $campaign); if ($existing && $existing->getStatus() !== UserMessageStatus::Todo) { continue; @@ -61,6 +81,8 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) { $this->updateUserMessageStatus($userMessage, UserMessageStatus::InvalidEmailAddress); + $this->unconfirmSubscriber($subscriber); + $output?->writeln('Invalid email, marking unconfirmed: ' . $subscriber->getEmail()); continue; } @@ -80,9 +102,21 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi } } + if ($stoppedEarly && $this->requeueHandler->handle($campaign, $output)) { + return; + } + $this->updateMessageStatus($campaign, MessageStatus::Sent); } + private function unconfirmSubscriber(Subscriber $subscriber): void + { + if ($subscriber->isConfirmed()) { + $subscriber->setConfirmed(false); + $this->entityManager->flush(); + } + } + private function updateMessageStatus(Message $message, MessageStatus $status): void { $message->getMetadata()->setStatus($status); diff --git a/src/Domain/Subscription/Service/Provider/SubscriberProvider.php b/src/Domain/Subscription/Service/Provider/SubscriberProvider.php index 5ec6b177..72b473be 100644 --- a/src/Domain/Subscription/Service/Provider/SubscriberProvider.php +++ b/src/Domain/Subscription/Service/Provider/SubscriberProvider.php @@ -36,7 +36,7 @@ public function getSubscribersForMessage(Message $message): array foreach ($lists as $list) { $listSubscribers = $this->subscriberRepository->getSubscribersBySubscribedListId($list->getId()); foreach ($listSubscribers as $subscriber) { - $subscribers[$subscriber->getId()] = $subscriber; + $subscribers[$subscriber->getEmail()] = $subscriber; } } diff --git a/tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php b/tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php new file mode 100644 index 00000000..5bfb1114 --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/Handler/RequeueHandlerTest.php @@ -0,0 +1,155 @@ +logger = $this->createMock(LoggerInterface::class); + $this->em = $this->createMock(EntityManagerInterface::class); + $this->output = $this->createMock(OutputInterface::class); + } + + private function createMessage( + ?int $requeueInterval, + ?DateTime $requeueUntil, + ?DateTime $embargo + ): Message { + $format = new MessageFormat(htmlFormatted: false, sendFormat: null); + $schedule = new MessageSchedule( + repeatInterval: null, + repeatUntil: null, + requeueInterval: $requeueInterval, + requeueUntil: $requeueUntil, + embargo: $embargo + ); + $metadata = new MessageMetadata(MessageStatus::Draft); + $content = new MessageContent('(no subject)'); + $options = new MessageOptions(); + + return new Message($format, $schedule, $metadata, $content, $options, owner: null, template: null); + } + + public function testReturnsFalseWhenIntervalIsZeroOrNegative(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $message = $this->createMessage(0, null, null); + + $this->em->expects($this->never())->method('flush'); + $this->output->expects($this->never())->method('writeln'); + $this->logger->expects($this->never())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertFalse($result); + $this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus()); + } + + public function testReturnsFalseWhenNowIsAfterRequeueUntil(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $past = (new DateTime())->sub(new DateInterval('PT5M')); + $message = $this->createMessage(5, $past, null); + + $this->em->expects($this->never())->method('flush'); + $this->logger->expects($this->never())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertFalse($result); + $this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus()); + } + + public function testRequeuesFromFutureEmbargoAndSetsSubmittedStatus(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $embargo = (new DateTime())->add(new DateInterval('PT5M')); + $interval = 10; + $message = $this->createMessage($interval, null, $embargo); + + $this->em->expects($this->once())->method('flush'); + $this->output->expects($this->once())->method('writeln'); + $this->logger->expects($this->once())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertTrue($result); + $this->assertSame(MessageStatus::Submitted, $message->getMetadata()->getStatus()); + + $expectedNext = (clone $embargo)->add(new DateInterval('PT' . $interval . 'M')); + $actualNext = $message->getSchedule()->getEmbargo(); + $this->assertInstanceOf(DateTime::class, $actualNext); + $this->assertEquals($expectedNext->format(DateTime::ATOM), $actualNext->format(DateTime::ATOM)); + } + + public function testRequeuesFromNowWhenEmbargoIsNullOrPast(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $interval = 3; + $message = $this->createMessage($interval, null, null); + + $this->em->expects($this->once())->method('flush'); + $this->logger->expects($this->once())->method('info'); + + $before = new DateTime(); + $result = $handler->handle($message, $this->output); + $after = new DateTime(); + + $this->assertTrue($result); + $this->assertSame(MessageStatus::Submitted, $message->getMetadata()->getStatus()); + + $embargo = $message->getSchedule()->getEmbargo(); + $this->assertInstanceOf(DateTime::class, $embargo); + + $minExpected = (clone $before)->add(new DateInterval('PT' . $interval . 'M')); + $maxExpected = (clone $after)->add(new DateInterval('PT' . $interval . 'M')); + + $this->assertGreaterThanOrEqual($minExpected->getTimestamp(), $embargo->getTimestamp()); + $this->assertLessThanOrEqual($maxExpected->getTimestamp(), $embargo->getTimestamp()); + } + + public function testReturnsFalseWhenNextEmbargoExceedsUntil(): void + { + $handler = new RequeueHandler($this->logger, $this->em); + $embargo = (new DateTime())->add(new DateInterval('PT1M')); + $interval = 10; + // next would be +10, which exceeds until + $until = (clone $embargo)->add(new DateInterval('PT5M')); + $message = $this->createMessage($interval, $until, $embargo); + + $this->em->expects($this->never())->method('flush'); + $this->logger->expects($this->never())->method('info'); + + $result = $handler->handle($message, $this->output); + + $this->assertFalse($result); + $this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus()); + $this->assertEquals( + $embargo->format(DateTime::ATOM), + $message->getSchedule()->getEmbargo()?->format(DateTime::ATOM) + ); + } +} diff --git a/tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php b/tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php new file mode 100644 index 00000000..5944ca3e --- /dev/null +++ b/tests/Unit/Domain/Messaging/Service/MaxProcessTimeLimiterTest.php @@ -0,0 +1,53 @@ +logger = $this->createMock(LoggerInterface::class); + } + + public function testShouldNotStopWhenMaxSecondsIsZero(): void + { + $limiter = new MaxProcessTimeLimiter(logger: $this->logger, maxSeconds: 0); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->never())->method('writeln'); + $this->logger->expects($this->never())->method('warning'); + + $limiter->start(); + usleep(200_000); + $this->assertFalse($limiter->shouldStop($output)); + } + + public function testShouldStopAfterThresholdAndLogAndOutput(): void + { + $limiter = new MaxProcessTimeLimiter(logger: $this->logger, maxSeconds: 1); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->once()) + ->method('writeln') + ->with('Reached max processing time; stopping cleanly.'); + + $this->logger->expects($this->once()) + ->method('warning') + ->with($this->stringContains('Reached max processing time of 1 seconds')); + + $this->assertFalse($limiter->shouldStop($output)); + + usleep(1_200_000); + $this->assertTrue($limiter->shouldStop($output)); + } +} diff --git a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php index 124e0ecf..26aec09f 100644 --- a/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php +++ b/tests/Unit/Domain/Messaging/Service/Processor/CampaignProcessorTest.php @@ -10,6 +10,8 @@ use PhpList\Core\Domain\Messaging\Model\Message\MessageContent; use PhpList\Core\Domain\Messaging\Model\Message\MessageMetadata; use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository; +use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler; +use PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter; use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator; use PhpList\Core\Domain\Messaging\Service\Processor\CampaignProcessor; use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer; @@ -49,6 +51,8 @@ protected function setUp(): void messagePreparator: $this->messagePreparator, logger: $this->logger, userMessageRepository: $this->userMessageRepository, + timeLimiter: $this->createMock(MaxProcessTimeLimiter::class), + requeueHandler: $this->createMock(RequeueHandler::class), ); } diff --git a/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php b/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php index 6adbde10..9efdeac2 100644 --- a/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php +++ b/tests/Unit/Domain/Subscription/Service/Provider/SubscriberProviderTest.php @@ -26,8 +26,8 @@ protected function setUp(): void $this->subscriberListRepository = $this->createMock(SubscriberListRepository::class); $this->subscriberProvider = new SubscriberProvider( - $this->subscriberRepository, - $this->subscriberListRepository, + subscriberRepository: $this->subscriberRepository, + subscriberListRepository: $this->subscriberListRepository, ); } @@ -82,9 +82,9 @@ public function testGetSubscribersForMessageWithOneListAndSubscribersReturnsSubs ->willReturn([$subscriberList]); $subscriber1 = $this->createMock(Subscriber::class); - $subscriber1->method('getId')->willReturn(1); + $subscriber1->method('getEmail')->willReturn('test1@example.am'); $subscriber2 = $this->createMock(Subscriber::class); - $subscriber2->method('getId')->willReturn(2); + $subscriber2->method('getEmail')->willReturn('test2@exsmple.am'); $this->subscriberRepository ->expects($this->once()) @@ -114,11 +114,11 @@ public function testGetSubscribersForMessageWithMultipleListsReturnsUniqueSubscr ->willReturn([$subscriberList1, $subscriberList2]); $subscriber1 = $this->createMock(Subscriber::class); - $subscriber1->method('getId')->willReturn(1); + $subscriber1->method('getEmail')->willReturn('test1@example.am'); $subscriber2 = $this->createMock(Subscriber::class); - $subscriber2->method('getId')->willReturn(2); + $subscriber2->method('getEmail')->willReturn('test2@example.am'); $subscriber3 = $this->createMock(Subscriber::class); - $subscriber3->method('getId')->willReturn(3); + $subscriber3->method('getEmail')->willReturn('test3@example.am'); $this->subscriberRepository ->expects($this->exactly(2))