Skip to content

Commit f5e8ffa

Browse files
committed
Add ProducerBatch.__lt__ for heapq (#2698)
1 parent 6191f98 commit f5e8ffa

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

kafka/producer/record_accumulator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ def __str__(self):
179179
return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
180180
self.topic_partition, self.records.next_offset())
181181

182+
# for heapq
183+
def __lt__(self, other):
184+
return self.created < other.created
185+
182186

183187
class RecordAccumulator(object):
184188
"""

test/test_record_accumulator.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,22 @@ def test_batch_cannot_complete_twice():
120120
assert record_metadata.offset == 500
121121
assert record_metadata.timestamp == 10
122122

123+
def test_producer_batch_lt(tp):
124+
records = MemoryRecordsBuilder(
125+
magic=2, compression_type=0, batch_size=100000)
126+
b1 = ProducerBatch(tp, records, now=1)
127+
b2 = ProducerBatch(tp, records, now=2)
128+
129+
assert b1 < b2
130+
assert not b1 < b1
131+
132+
import heapq
133+
q = []
134+
heapq.heappush(q, b2)
135+
heapq.heappush(q, b1)
136+
assert q[0] == b1
137+
assert q[1] == b2
138+
123139
def test_linger(tp, cluster):
124140
now = 0
125141
accum = RecordAccumulator(linger_ms=10)

0 commit comments

Comments
 (0)