-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Closed
Description
Hello, I have a problem when using server-side streaming.
I obtain one iterator from each server in a list and put the iterators into a priority queue, but I only take the top n results from the queue. After this code has been called many times, I get an OutOfMemoryError.
Exception:
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:624)
at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:203)
at io.grpc.netty.shaded.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:187)
at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
at io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:541)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:97)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:277)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
My client code looks like this:
// Reporter's original client code (the version that eventually fails with OutOfMemoryError).
// It merges the results of one listObjects call per channel through a priority queue and
// stops as soon as keyMap holds maxKeys entries.
// num, channelList, listObjectRequest, keyMap and maxKeys are declared outside this snippet.
// try-with-resources closes the cancellable context c when the block exits.
try (Context.CancellableContext c = Context.current().withCancellation()) {
// attach() returns the context that is handed back to detach() in the finally block.
Context toRestore = c.attach();
try {
// Each queue entry pairs the most recently read result with the iterator it came from.
// The ordering is defined by PriorityQueueObject, which is not shown here.
PriorityBlockingQueue<PriorityQueueObject> queue = new PriorityBlockingQueue<>();
// Start one listObjects call per channel and seed the queue with its first result, if any.
for (int i = 0; i < num; i++) {
Iterator<ListObjectResult> iterator = ObjectGrpc.newBlockingStub(channelList[i]).listObjects(listObjectRequest);
if (iterator.hasNext()) {
ListObjectResult listObjectResult = iterator.next();
queue.offer(new PriorityQueueObject(listObjectResult, iterator));
}
}
// Take the head entry, record it in keyMap, then advance that entry's iterator and
// re-queue it with its next result.
// The loop ends when the queue is empty or keyMap has reached maxKeys entries; in the
// latter case the iterators still held in the queue are not read any further here.
while (!queue.isEmpty() && keyMap.size() < maxKeys) {
PriorityQueueObject priorityQueueObject = queue.poll();
ListObjectResult temp = priorityQueueObject.getListObjectResult();
keyMap.put(temp.getKey(), temp);
Iterator<ListObjectResult> iterator = priorityQueueObject.getIterator();
if (iterator.hasNext()) {
ListObjectResult listObjectResult = iterator.next();
queue.offer(new PriorityQueueObject(listObjectResult, iterator));
}
}
}
finally {
// Hand back the context returned by attach() above.
c.detach(toRestore);
}
}
But when I add the following extra loop after the main loop, this exception does not appear:
// Reporter's modified client code: identical to the first snippet except for one extra loop
// after the main merge loop. The reporter states that the OutOfMemoryError does not occur
// with this version.
// num, channelList, listObjectRequest, keyMap and maxKeys are declared outside this snippet.
// try-with-resources closes the cancellable context c when the block exits.
try (Context.CancellableContext c = Context.current().withCancellation()) {
// attach() returns the context that is handed back to detach() in the finally block.
Context toRestore = c.attach();
try {
// Each queue entry pairs the most recently read result with the iterator it came from.
// The ordering is defined by PriorityQueueObject, which is not shown here.
PriorityBlockingQueue<PriorityQueueObject> queue = new PriorityBlockingQueue<>();
// Start one listObjects call per channel and seed the queue with its first result, if any.
for (int i = 0; i < num; i++) {
Iterator<ListObjectResult> iterator = ObjectGrpc.newBlockingStub(channelList[i]).listObjects(listObjectRequest);
if (iterator.hasNext()) {
ListObjectResult listObjectResult = iterator.next();
queue.offer(new PriorityQueueObject(listObjectResult, iterator));
}
}
// Take the head entry, record it in keyMap, then advance that entry's iterator and
// re-queue it with its next result, until the queue is empty or keyMap has maxKeys entries.
while (!queue.isEmpty() && keyMap.size() < maxKeys) {
PriorityQueueObject priorityQueueObject = queue.poll();
ListObjectResult temp = priorityQueueObject.getListObjectResult();
keyMap.put(temp.getKey(), temp);
Iterator<ListObjectResult> iterator = priorityQueueObject.getIterator();
if (iterator.hasNext()) {
ListObjectResult listObjectResult = iterator.next();
queue.offer(new PriorityQueueObject(listObjectResult, iterator));
}
}
// Reporter's workaround: with this extra loop the OutOfMemoryError does not occur.
// It calls hasNext() once on every iterator still left in the queue and discards the
// result; it does not read the remaining elements.
// NOTE(review): why this single hasNext() call avoids the error is the open question of
// this report; confirm against the grpc-java blocking-stub implementation.
while (!queue.isEmpty()) {
PriorityQueueObject priorityQueueObject = queue.poll();
priorityQueueObject.getIterator().hasNext();
}
}
finally {
// Hand back the context returned by attach() above.
c.detach(toRestore);
}
}
I don't know why this happens. Can someone help explain it?