13
13
14
14
use Flowpack \JobQueue \Common \Queue \Message ;
15
15
use Flowpack \JobQueue \Common \Queue \QueueInterface ;
16
+ use TYPO3 \Flow \Utility \Algorithms ;
17
+ use Flowpack \JobQueue \Common \Exception as JobQueueException ;
16
18
17
19
/**
18
20
* A queue implementation using Redis as the queue backend
@@ -58,51 +60,49 @@ class RedisQueue implements QueueInterface
58
60
protected $ maxReconnectDelay = 30.0 ;
59
61
60
62
/**
61
- * Constructor
62
- *
63
63
* @param string $name
64
64
* @param array $options
65
+ * @throws JobQueueException
65
66
*/
66
- public function __construct ($ name , array $ options = array () )
67
+ public function __construct ($ name , array $ options = [] )
67
68
{
68
69
$ this ->name = $ name ;
69
70
if (isset ($ options ['defaultTimeout ' ])) {
70
71
$ this ->defaultTimeout = (integer )$ options ['defaultTimeout ' ];
71
72
}
72
- $ this ->clientOptions = isset ($ options ['client ' ]) ? $ options ['client ' ] : array ();
73
-
73
+ $ this ->clientOptions = isset ($ options ['client ' ]) ? $ options ['client ' ] : [];
74
74
$ this ->client = new \Redis ();
75
75
if (!$ this ->connectClient ()) {
76
- throw new \ Flowpack \ JobQueue \ Common \ Exception ('Could not connect to Redis ' , 1467382685 );
76
+ throw new JobQueueException ('Could not connect to Redis ' , 1467382685 );
77
77
}
78
78
}
79
79
80
80
/**
81
- * Submit a message to the queue
82
- *
83
- * @param Message $message
84
- * @return void
81
+ * @inheritdoc
82
+ */
83
+ public function getName ()
84
+ {
85
+ return $ this ->name ;
86
+ }
87
+
88
+ /**
89
+ * @inheritdoc
85
90
*/
86
- public function submit (Message $ message )
91
+ public function submit ($ payload , array $ options = [] )
87
92
{
88
93
$ this ->checkClientConnection ();
89
- if ($ message ->getIdentifier () !== null ) {
90
- $ added = $ this ->client ->sAdd ("queue: {$ this ->name }:ids " , $ message ->getIdentifier ());
91
- if (!$ added ) {
92
- return ;
93
- }
94
+ $ messageId = Algorithms::generateUUID ();
95
+ $ idStored = $ this ->client ->hSet ("queue: {$ this ->name }:ids " , $ messageId , json_encode ($ payload ));
96
+ if ($ idStored === 0 ) {
97
+ return null ;
94
98
}
95
- $ encodedMessage = $ this -> encodeMessage ( $ message );
96
- $ this ->client ->lPush ("queue: {$ this ->name }:messages " , $ encodedMessage );
97
- $ message -> setState (Message:: STATE_SUBMITTED ) ;
99
+
100
+ $ this ->client ->lPush ("queue: {$ this ->name }:messages " , $ messageId );
101
+ return $ messageId ;
98
102
}
99
103
100
104
/**
101
- * Wait for a message in the queue and return the message for processing
102
- * (without safety queue)
103
- *
104
- * @param int $timeout
105
- * @return Message The received message or NULL if a timeout occurred
105
+ * @inheritdoc
106
106
*/
107
107
public function waitAndTake ($ timeout = null )
108
108
{
@@ -111,149 +111,128 @@ public function waitAndTake($timeout = null)
111
111
}
112
112
$ this ->checkClientConnection ();
113
113
$ keyAndValue = $ this ->client ->brPop ("queue: {$ this ->name }:messages " , $ timeout );
114
- $ value = isset ($ keyAndValue [1 ]) ? $ keyAndValue [1 ] : null ;
115
- if (is_string ($ value )) {
116
- $ message = $ this ->decodeMessage ($ value );
117
-
118
- if ($ message ->getIdentifier () !== null ) {
119
- $ this ->client ->sRem ("queue: {$ this ->name }:ids " , $ message ->getIdentifier ());
120
- }
121
-
122
- // The message is marked as done
123
- $ message ->setState (Message::STATE_DONE );
124
-
125
- return $ message ;
126
- } else {
114
+ $ messageId = isset ($ keyAndValue [1 ]) ? $ keyAndValue [1 ] : null ;
115
+ if ($ messageId === null ) {
127
116
return null ;
128
117
}
118
+ $ message = $ this ->getMessageById ($ messageId );
119
+ if ($ message !== null ) {
120
+ $ this ->client ->hDel ("queue: {$ this ->name }:ids " , $ messageId );
121
+ }
122
+ return $ message ;
129
123
}
130
124
131
125
/**
132
- * Wait for a message in the queue and save the message to a safety queue
133
- *
134
- * TODO: Idea for implementing a TTR (time to run) with monitoring of safety queue. E.g.
135
- * use different queue names with encoded times? With "brpoplpush" we cannot modify the
136
- * queued item on transfer to the safety queue and we cannot update a timestamp to mark
137
- * the run start time in the message, so separate keys should be used for this.
138
- *
139
- * @param int $timeout
140
- * @return Message
126
+ * @inheritdoc
141
127
*/
142
128
public function waitAndReserve ($ timeout = null )
143
129
{
144
130
if ($ timeout === null ) {
145
131
$ timeout = $ this ->defaultTimeout ;
146
132
}
147
133
$ this ->checkClientConnection ();
148
- $ value = $ this ->client ->brpoplpush ("queue: {$ this ->name }:messages " , "queue: {$ this ->name }:processing " , $ timeout );
149
- if (is_string ($ value )) {
150
- $ message = $ this ->decodeMessage ($ value );
151
- if ($ message ->getIdentifier () !== null ) {
152
- $ this ->client ->sRem ("queue: {$ this ->name }:ids " , $ message ->getIdentifier ());
153
- }
154
- return $ message ;
155
- } else {
156
- return null ;
157
- }
134
+ $ messageId = $ this ->client ->brpoplpush ("queue: {$ this ->name }:messages " , "queue: {$ this ->name }:processing " , $ timeout );
135
+ return $ this ->getMessageById ($ messageId );
158
136
}
159
137
160
138
/**
161
- * Mark a message as finished
162
- *
163
- * @param Message $message
164
- * @return boolean TRUE if the message could be removed
139
+ * @inheritdoc
140
+ */
141
+ public function release ($ messageId , array $ options = [])
142
+ {
143
+ $ this ->checkClientConnection ();
144
+ $ this ->client ->lRem ("queue: {$ this ->name }:processing " , $ messageId , 0 );
145
+ $ numberOfReleases = (integer )$ this ->client ->hGet ("queue: {$ this ->name }:releases " , $ messageId );
146
+ $ this ->client ->hSet ("queue: {$ this ->name }:releases " , $ messageId , $ numberOfReleases + 1 );
147
+ $ this ->client ->lPush ("queue: {$ this ->name }:messages " , $ messageId );
148
+ }
149
+
150
+ /**
151
+ * @inheritdoc
165
152
*/
166
- public function finish ( Message $ message )
153
+ public function abort ( $ messageId )
167
154
{
168
155
$ this ->checkClientConnection ();
169
- $ originalValue = $ message ->getOriginalValue ();
170
- $ success = $ this ->client ->lRem ("queue: {$ this ->name }:processing " , $ originalValue , 0 ) > 0 ;
171
- if ($ success ) {
172
- $ message ->setState (Message::STATE_DONE );
156
+ $ numberOfRemoved = $ this ->client ->lRem ("queue: {$ this ->name }:processing " , $ messageId , 0 );
157
+ if ($ numberOfRemoved === 1 ) {
158
+ $ this ->client ->lPush ("queue: {$ this ->name }:failed " , $ messageId );
173
159
}
174
- return $ success ;
175
160
}
176
161
177
162
/**
178
- * Peek for messages
179
- *
180
- * @param integer $limit
181
- * @return Message[] Messages or empty array if no messages were present
163
+ * @inheritdoc
164
+ */
165
+ public function finish ($ messageId )
166
+ {
167
+ $ this ->checkClientConnection ();
168
+ $ this ->client ->hDel ("queue: {$ this ->name }:ids " , $ messageId );
169
+ $ this ->client ->hDel ("queue: {$ this ->name }:releases " , $ messageId );
170
+ return $ this ->client ->lRem ("queue: {$ this ->name }:processing " , $ messageId , 0 ) > 0 ;
171
+ }
172
+
173
+ /**
174
+ * @inheritdoc
182
175
*/
183
176
public function peek ($ limit = 1 )
184
177
{
185
178
$ this ->checkClientConnection ();
186
179
$ result = $ this ->client ->lRange ("queue: {$ this ->name }:messages " , -($ limit ), -1 );
187
- if (is_array ($ result ) && count ($ result ) > 0 ) {
188
- $ messages = array ();
189
- foreach ($ result as $ value ) {
190
- $ message = $ this ->decodeMessage ($ value );
191
- // The message is still submitted and should not be processed!
192
- $ message ->setState (Message::STATE_SUBMITTED );
193
- $ messages [] = $ message ;
194
- }
195
- return $ messages ;
180
+ if (!is_array ($ result ) || count ($ result ) === 0 ) {
181
+ return [];
182
+ }
183
+ $ messages = [];
184
+ foreach ($ result as $ messageId ) {
185
+ $ encodedPayload = $ this ->client ->hGet ("queue: {$ this ->name }:ids " , $ messageId );
186
+ $ messages [] = new Message ($ messageId , json_decode ($ encodedPayload , true ));
196
187
}
197
- return array () ;
188
+ return $ messages ;
198
189
}
199
190
200
191
/**
201
- * Count messages in the queue
202
- *
203
- * @return integer
192
+ * @inheritdoc
204
193
*/
205
194
public function count ()
206
195
{
207
196
$ this ->checkClientConnection ();
208
- $ count = $ this ->client ->lLen ("queue: {$ this ->name }:messages " );
209
- return $ count ;
197
+ return $ this ->client ->lLen ("queue: {$ this ->name }:messages " );
210
198
}
211
199
212
200
/**
213
- * Encode a message
214
- *
215
- * Updates the original value property of the message to resemble the
216
- * encoded representation.
217
- *
218
- * @param Message $message
219
- * @return string
201
+ * @return void
220
202
*/
221
- protected function encodeMessage ( Message $ message )
203
+ public function setUp ( )
222
204
{
223
- $ value = json_encode ($ message ->toArray ());
224
- $ message ->setOriginalValue ($ value );
225
- return $ value ;
205
+ $ this ->checkClientConnection ();
226
206
}
227
207
228
208
/**
229
- * Decode a message from a string representation
230
- *
231
- * @param string $value
232
- * @return Message
209
+ * @inheritdoc
233
210
*/
234
- protected function decodeMessage ( $ value )
211
+ public function flush ( )
235
212
{
236
- $ decodedMessage = json_decode ($ value , true );
237
- $ message = new Message ($ decodedMessage ['payload ' ]);
238
- if (isset ($ decodedMessage ['identifier ' ])) {
239
- $ message ->setIdentifier ($ decodedMessage ['identifier ' ]);
240
- }
241
- $ message ->setOriginalValue ($ value );
242
- return $ message ;
213
+ $ this ->checkClientConnection ();
214
+ $ this ->client ->flushDB ();
243
215
}
244
216
245
217
/**
246
- *
247
- * @param string $identifier
218
+ * @param string $messageId
248
219
* @return Message
249
220
*/
250
- public function getMessage ( $ identifier )
221
+ protected function getMessageById ( $ messageId )
251
222
{
252
- return null ;
223
+ if (!is_string ($ messageId )) {
224
+ return null ;
225
+ }
226
+ $ encodedPayload = $ this ->client ->hGet ("queue: {$ this ->name }:ids " , $ messageId );
227
+ $ numberOfReleases = (integer )$ this ->client ->hGet ("queue: {$ this ->name }:releases " , $ messageId );
228
+ return new Message ($ messageId , json_decode ($ encodedPayload , true ), $ numberOfReleases );
253
229
}
254
230
255
231
/**
256
232
* Check if the Redis client connection is still up and reconnect if Redis was disconnected
233
+ *
234
+ * @return void
235
+ * @throws JobQueueException
257
236
*/
258
237
protected function checkClientConnection ()
259
238
{
@@ -268,7 +247,7 @@ protected function checkClientConnection()
268
247
}
269
248
if ($ reconnect ) {
270
249
if (!$ this ->connectClient ()) {
271
- throw new \ Flowpack \ JobQueue \ Common \ Exception ('Could not connect to Redis ' , 1467382685 );
250
+ throw new JobQueueException ('Could not connect to Redis ' , 1467382685 );
272
251
}
273
252
}
274
253
}
@@ -286,20 +265,16 @@ protected function connectClient()
286
265
$ host = isset ($ this ->clientOptions ['host ' ]) ? $ this ->clientOptions ['host ' ] : '127.0.0.1 ' ;
287
266
$ port = isset ($ this ->clientOptions ['port ' ]) ? $ this ->clientOptions ['port ' ] : 6379 ;
288
267
$ database = isset ($ this ->clientOptions ['database ' ]) ? $ this ->clientOptions ['database ' ] : 0 ;
289
-
290
268
// The connection read timeout should be higher than the timeout for blocking operations!
291
269
$ timeout = isset ($ this ->clientOptions ['timeout ' ]) ? $ this ->clientOptions ['timeout ' ] : round ($ this ->defaultTimeout * 1.5 );
292
270
$ connected = $ this ->client ->connect ($ host , $ port , $ timeout ) && $ this ->client ->select ($ database );
293
-
294
271
// Break the cycle that could cause a high CPU load
295
272
if (!$ connected ) {
296
273
usleep ($ this ->reconnectDelay * 1e6 );
297
274
$ this ->reconnectDelay = min ($ this ->reconnectDelay * $ this ->reconnectDecay , $ this ->maxReconnectDelay );
298
275
} else {
299
276
$ this ->reconnectDelay = 1.0 ;
300
277
}
301
-
302
278
return $ connected ;
303
279
}
304
-
305
280
}
0 commit comments