Skip to content

Commit 587d524

Browse files
committed
Max processing time limiter
1 parent c0e2418 commit 587d524

File tree

11 files changed

+372
-10
lines changed

11 files changed

+372
-10
lines changed

config/parameters.yml.dist

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,5 @@ parameters:
7979
env(MAILQUEUE_BATCH_PERIOD): '5'
8080
messaging.mail_queue_throttle: '%%env(MAILQUEUE_THROTTLE)%%'
8181
env(MAILQUEUE_THROTTLE): '5'
82+
messaging.max_process_time: '%%env(MESSAGING_MAX_PROCESS_TIME)%%'
83+
env(MESSAGING_MAX_PROCESS_TIME): '600'

config/services/services.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,13 @@ services:
120120
arguments:
121121
- !tagged_iterator { tag: 'phplist.bounce_action_handler' }
122122

123+
PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter:
124+
autowire: true
125+
autoconfigure: true
126+
arguments:
127+
$maxSeconds: '%messaging.max_process_time%'
128+
129+
123130
PhpList\Core\Domain\Identity\Service\PermissionChecker:
124131
autowire: true
125132
autoconfigure: true

src/Domain/Messaging/Model/Message/MessageStatus.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public function allowedTransitions(): array
2525
self::Draft, self::Suspended => [self::Submitted],
2626
self::Submitted => [self::Prepared, self::InProcess],
2727
self::Prepared => [self::InProcess],
28-
self::InProcess => [self::Sent, self::Suspended],
28+
self::InProcess => [self::Sent, self::Suspended, self::Submitted],
2929
self::Requeued => [self::InProcess, self::Suspended],
3030
self::Sent => [],
3131
};
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Messaging\Service\Handler;
6+
7+
use DateInterval;
8+
use DateTime;
9+
use Doctrine\ORM\EntityManagerInterface;
10+
use PhpList\Core\Domain\Messaging\Model\Message;
11+
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
12+
use Psr\Log\LoggerInterface;
13+
use Symfony\Component\Console\Output\OutputInterface;
14+
15+
class RequeueHandler
16+
{
17+
public function __construct(
18+
private readonly LoggerInterface $logger,
19+
private readonly EntityManagerInterface $entityManager
20+
) {
21+
}
22+
23+
public function handle(Message $campaign, ?OutputInterface $output = null): bool
24+
{
25+
$schedule = $campaign->getSchedule();
26+
$interval = $schedule->getRequeueInterval() ?? 0;
27+
$until = $schedule->getRequeueUntil();
28+
29+
if ($interval <= 0) {
30+
return false;
31+
}
32+
$now = new DateTime();
33+
if ($until instanceof DateTime && $now > $until) {
34+
return false;
35+
}
36+
37+
$base = $schedule->getEmbargo() instanceof DateTime && $schedule->getEmbargo() > new DateTime()
38+
? clone $schedule->getEmbargo()
39+
: new DateTime();
40+
$next = (clone $base)->add(new DateInterval('PT' . max(1, $interval) . 'M'));
41+
if ($until instanceof DateTime && $next > $until) {
42+
return false;
43+
}
44+
45+
$schedule->setEmbargo($next);
46+
$campaign->setSchedule($schedule);
47+
$campaign->getMetadata()->setStatus(MessageStatus::Submitted);
48+
$this->entityManager->flush();
49+
50+
$output?->writeln(sprintf(
51+
'Requeued campaign; next embargo at %s',
52+
$next->format(DateTime::ATOM)
53+
));
54+
$this->logger->info('Campaign requeued with new embargo', [
55+
'campaign_id' => $campaign->getId(),
56+
'embargo' => $next->format(DateTime::ATOM),
57+
]);
58+
59+
return true;
60+
}
61+
62+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Messaging\Service;
6+
7+
use Psr\Log\LoggerInterface;
8+
use Symfony\Component\Console\Output\OutputInterface;
9+
10+
/**
11+
* Limits the total processing time of a long-running operation.
12+
*/
13+
class MaxProcessTimeLimiter
14+
{
15+
private float $startedAt = 0.0;
16+
private int $maxSeconds;
17+
18+
public function __construct(private readonly LoggerInterface $logger, ?int $maxSeconds = null)
19+
{
20+
$this->maxSeconds = $maxSeconds ?? 600;
21+
}
22+
23+
public function start(): void
24+
{
25+
$this->startedAt = microtime(true);
26+
}
27+
28+
public function shouldStop(?OutputInterface $output = null): bool
29+
{
30+
if ($this->maxSeconds <= 0) {
31+
return false;
32+
}
33+
if ($this->startedAt <= 0.0) {
34+
$this->start();
35+
}
36+
$elapsed = microtime(true) - $this->startedAt;
37+
if ($elapsed >= $this->maxSeconds) {
38+
$this->logger->warning(sprintf('Reached max processing time of %d seconds', $this->maxSeconds));
39+
$output?->writeln('Reached max processing time; stopping cleanly.');
40+
41+
return true;
42+
}
43+
44+
return false;
45+
}
46+
}

src/Domain/Messaging/Service/Processor/CampaignProcessor.php

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,19 @@
1010
use PhpList\Core\Domain\Messaging\Model\Message\UserMessageStatus;
1111
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
1212
use PhpList\Core\Domain\Messaging\Repository\UserMessageRepository;
13+
use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler;
1314
use PhpList\Core\Domain\Messaging\Service\RateLimitedCampaignMailer;
15+
use PhpList\Core\Domain\Messaging\Service\MaxProcessTimeLimiter;
1416
use PhpList\Core\Domain\Messaging\Service\MessageProcessingPreparator;
1517
use PhpList\Core\Domain\Subscription\Service\Provider\SubscriberProvider;
18+
use PhpList\Core\Domain\Subscription\Model\Subscriber;
1619
use Psr\Log\LoggerInterface;
1720
use Symfony\Component\Console\Output\OutputInterface;
1821
use Throwable;
1922

23+
/**
24+
* @SuppressWarnings(PHPMD.CouplingBetweenObjects)
25+
*/
2026
class CampaignProcessor
2127
{
2228
private RateLimitedCampaignMailer $mailer;
@@ -25,21 +31,27 @@ class CampaignProcessor
2531
private MessageProcessingPreparator $messagePreparator;
2632
private LoggerInterface $logger;
2733
private UserMessageRepository $userMessageRepository;
34+
private MaxProcessTimeLimiter $timeLimiter;
35+
private RequeueHandler $requeueHandler;
2836

2937
public function __construct(
3038
RateLimitedCampaignMailer $mailer,
3139
EntityManagerInterface $entityManager,
3240
SubscriberProvider $subscriberProvider,
3341
MessageProcessingPreparator $messagePreparator,
3442
LoggerInterface $logger,
35-
UserMessageRepository $userMessageRepository
43+
UserMessageRepository $userMessageRepository,
44+
MaxProcessTimeLimiter $timeLimiter,
45+
RequeueHandler $requeueHandler
3646
) {
3747
$this->mailer = $mailer;
3848
$this->entityManager = $entityManager;
3949
$this->subscriberProvider = $subscriberProvider;
4050
$this->messagePreparator = $messagePreparator;
4151
$this->logger = $logger;
4252
$this->userMessageRepository = $userMessageRepository;
53+
$this->timeLimiter = $timeLimiter;
54+
$this->requeueHandler = $requeueHandler;
4355
}
4456

4557
public function process(Message $campaign, ?OutputInterface $output = null): void
@@ -49,7 +61,15 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
4961

5062
$this->updateMessageStatus($campaign, MessageStatus::InProcess);
5163

64+
$this->timeLimiter->start();
65+
$stoppedEarly = false;
66+
5267
foreach ($subscribers as $subscriber) {
68+
if ($this->timeLimiter->shouldStop($output)) {
69+
$stoppedEarly = true;
70+
break;
71+
}
72+
5373
$existing = $this->userMessageRepository->findOneByUserAndMessage($subscriber, $campaign);
5474
if ($existing && $existing->getStatus() !== UserMessageStatus::Todo) {
5575
continue;
@@ -61,6 +81,8 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
6181

6282
if (!filter_var($subscriber->getEmail(), FILTER_VALIDATE_EMAIL)) {
6383
$this->updateUserMessageStatus($userMessage, UserMessageStatus::InvalidEmailAddress);
84+
$this->unconfirmSubscriber($subscriber);
85+
$output?->writeln('Invalid email, marking unconfirmed: ' . $subscriber->getEmail());
6486
continue;
6587
}
6688

@@ -80,9 +102,21 @@ public function process(Message $campaign, ?OutputInterface $output = null): voi
80102
}
81103
}
82104

105+
if ($stoppedEarly && $this->requeueHandler->handle($campaign, $output)) {
106+
return;
107+
}
108+
83109
$this->updateMessageStatus($campaign, MessageStatus::Sent);
84110
}
85111

112+
private function unconfirmSubscriber(Subscriber $subscriber): void
113+
{
114+
if ($subscriber->isConfirmed()) {
115+
$subscriber->setConfirmed(false);
116+
$this->entityManager->flush();
117+
}
118+
}
119+
86120
private function updateMessageStatus(Message $message, MessageStatus $status): void
87121
{
88122
$message->getMetadata()->setStatus($status);

src/Domain/Subscription/Service/Provider/SubscriberProvider.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public function getSubscribersForMessage(Message $message): array
3636
foreach ($lists as $list) {
3737
$listSubscribers = $this->subscriberRepository->getSubscribersBySubscribedListId($list->getId());
3838
foreach ($listSubscribers as $subscriber) {
39-
$subscribers[$subscriber->getId()] = $subscriber;
39+
$subscribers[$subscriber->getEmail()] = $subscriber;
4040
}
4141
}
4242

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Tests\Unit\Domain\Messaging\Service\Handler;
6+
7+
use DateInterval;
8+
use DateTime;
9+
use Doctrine\ORM\EntityManagerInterface;
10+
use PhpList\Core\Domain\Messaging\Model\Message;
11+
use PhpList\Core\Domain\Messaging\Model\Message\MessageContent;
12+
use PhpList\Core\Domain\Messaging\Model\Message\MessageFormat;
13+
use PhpList\Core\Domain\Messaging\Model\Message\MessageMetadata;
14+
use PhpList\Core\Domain\Messaging\Model\Message\MessageOptions;
15+
use PhpList\Core\Domain\Messaging\Model\Message\MessageSchedule;
16+
use PhpList\Core\Domain\Messaging\Model\Message\MessageStatus;
17+
use PhpList\Core\Domain\Messaging\Service\Handler\RequeueHandler;
18+
use PHPUnit\Framework\MockObject\MockObject;
19+
use PHPUnit\Framework\TestCase;
20+
use Psr\Log\LoggerInterface;
21+
use Symfony\Component\Console\Output\OutputInterface;
22+
23+
class RequeueHandlerTest extends TestCase
24+
{
25+
private LoggerInterface&MockObject $logger;
26+
private EntityManagerInterface&MockObject $em;
27+
private OutputInterface&MockObject $output;
28+
29+
protected function setUp(): void
30+
{
31+
$this->logger = $this->createMock(LoggerInterface::class);
32+
$this->em = $this->createMock(EntityManagerInterface::class);
33+
$this->output = $this->createMock(OutputInterface::class);
34+
}
35+
36+
private function createMessage(
37+
?int $requeueInterval,
38+
?DateTime $requeueUntil,
39+
?DateTime $embargo
40+
): Message {
41+
$format = new MessageFormat(htmlFormatted: false, sendFormat: null);
42+
$schedule = new MessageSchedule(
43+
repeatInterval: null,
44+
repeatUntil: null,
45+
requeueInterval: $requeueInterval,
46+
requeueUntil: $requeueUntil,
47+
embargo: $embargo
48+
);
49+
$metadata = new MessageMetadata(MessageStatus::Draft);
50+
$content = new MessageContent('(no subject)');
51+
$options = new MessageOptions();
52+
53+
return new Message($format, $schedule, $metadata, $content, $options, owner: null, template: null);
54+
}
55+
56+
public function testReturnsFalseWhenIntervalIsZeroOrNegative(): void
57+
{
58+
$handler = new RequeueHandler($this->logger, $this->em);
59+
$message = $this->createMessage(0, null, null);
60+
61+
$this->em->expects($this->never())->method('flush');
62+
$this->output->expects($this->never())->method('writeln');
63+
$this->logger->expects($this->never())->method('info');
64+
65+
$result = $handler->handle($message, $this->output);
66+
67+
$this->assertFalse($result);
68+
$this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus());
69+
}
70+
71+
public function testReturnsFalseWhenNowIsAfterRequeueUntil(): void
72+
{
73+
$handler = new RequeueHandler($this->logger, $this->em);
74+
$past = (new DateTime())->sub(new DateInterval('PT5M'));
75+
$message = $this->createMessage(5, $past, null);
76+
77+
$this->em->expects($this->never())->method('flush');
78+
$this->logger->expects($this->never())->method('info');
79+
80+
$result = $handler->handle($message, $this->output);
81+
82+
$this->assertFalse($result);
83+
$this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus());
84+
}
85+
86+
public function testRequeuesFromFutureEmbargoAndSetsSubmittedStatus(): void
87+
{
88+
$handler = new RequeueHandler($this->logger, $this->em);
89+
$embargo = (new DateTime())->add(new DateInterval('PT5M'));
90+
$interval = 10;
91+
$message = $this->createMessage($interval, null, $embargo);
92+
93+
$this->em->expects($this->once())->method('flush');
94+
$this->output->expects($this->once())->method('writeln');
95+
$this->logger->expects($this->once())->method('info');
96+
97+
$result = $handler->handle($message, $this->output);
98+
99+
$this->assertTrue($result);
100+
$this->assertSame(MessageStatus::Submitted, $message->getMetadata()->getStatus());
101+
102+
$expectedNext = (clone $embargo)->add(new DateInterval('PT' . $interval . 'M'));
103+
$actualNext = $message->getSchedule()->getEmbargo();
104+
$this->assertInstanceOf(DateTime::class, $actualNext);
105+
$this->assertEquals($expectedNext->format(DateTime::ATOM), $actualNext->format(DateTime::ATOM));
106+
}
107+
108+
public function testRequeuesFromNowWhenEmbargoIsNullOrPast(): void
109+
{
110+
$handler = new RequeueHandler($this->logger, $this->em);
111+
$interval = 3;
112+
$message = $this->createMessage($interval, null, null);
113+
114+
$this->em->expects($this->once())->method('flush');
115+
$this->logger->expects($this->once())->method('info');
116+
117+
$before = new DateTime();
118+
$result = $handler->handle($message, $this->output);
119+
$after = new DateTime();
120+
121+
$this->assertTrue($result);
122+
$this->assertSame(MessageStatus::Submitted, $message->getMetadata()->getStatus());
123+
124+
$embargo = $message->getSchedule()->getEmbargo();
125+
$this->assertInstanceOf(DateTime::class, $embargo);
126+
127+
$minExpected = (clone $before)->add(new DateInterval('PT' . $interval . 'M'));
128+
$maxExpected = (clone $after)->add(new DateInterval('PT' . $interval . 'M'));
129+
130+
$this->assertGreaterThanOrEqual($minExpected->getTimestamp(), $embargo->getTimestamp());
131+
$this->assertLessThanOrEqual($maxExpected->getTimestamp(), $embargo->getTimestamp());
132+
}
133+
134+
public function testReturnsFalseWhenNextEmbargoExceedsUntil(): void
135+
{
136+
$handler = new RequeueHandler($this->logger, $this->em);
137+
$embargo = (new DateTime())->add(new DateInterval('PT1M'));
138+
$interval = 10;
139+
$until = (clone $embargo)->add(new DateInterval('PT5M')); // next would be +10, which exceeds until
140+
$message = $this->createMessage($interval, $until, $embargo);
141+
142+
$this->em->expects($this->never())->method('flush');
143+
$this->logger->expects($this->never())->method('info');
144+
145+
$result = $handler->handle($message, $this->output);
146+
147+
$this->assertFalse($result);
148+
$this->assertSame(MessageStatus::Draft, $message->getMetadata()->getStatus());
149+
$this->assertEquals(
150+
$embargo->format(DateTime::ATOM),
151+
$message->getSchedule()->getEmbargo()?->format(DateTime::ATOM)
152+
);
153+
}
154+
}

0 commit comments

Comments
 (0)