Skip to content

server-side streaming must call hasNext() before cancel #10490

@misumisery

Description

@misumisery

hello, when i use server-side streaming, i have a problem.

i get iterators from server list, put them into priority queue. i only get top n from queue. after called many times, i get OutOfMemoryError.

exception:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory
        at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
        at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
        at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:624)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:203)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:187)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
        at io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
        at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:541)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:97)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:277)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more

my client code like this:

try (Context.CancellableContext c = Context.current().withCancellation()) {
  Context toRestore = c.attach();
  try {
    PriorityBlockingQueue<PriorityQueueObject> queue = new PriorityBlockingQueue<>();
    for (int i = 0; i < num; i++) {
      Iterator<ListObjectResult> iterator = ObjectGrpc.newBlockingStub(channelList[i]).listObjects(listObjectRequest);
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
    while (!queue.isEmpty() && keyMap.size() < maxKeys) {
      PriorityQueueObject priorityQueueObject = queue.poll();
      ListObjectResult temp = priorityQueueObject.getListObjectResult();
      keyMap.put(temp.getKey(), temp);
      Iterator<ListObjectResult> iterator = priorityQueueObject.getIterator();
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
  }
  finally {
    c.detach(toRestore);
  }
}

but when i add this, this exception do not appear.

try (Context.CancellableContext c = Context.current().withCancellation()) {
  Context toRestore = c.attach();
  try {
    PriorityBlockingQueue<PriorityQueueObject> queue = new PriorityBlockingQueue<>();
    for (int i = 0; i < num; i++) {
      Iterator<ListObjectResult> iterator = ObjectGrpc.newBlockingStub(channelList[i]).listObjects(listObjectRequest);
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
    while (!queue.isEmpty() && keyMap.size() < maxKeys) {
      PriorityQueueObject priorityQueueObject = queue.poll();
      ListObjectResult temp = priorityQueueObject.getListObjectResult();
      keyMap.put(temp.getKey(), temp);
      Iterator<ListObjectResult> iterator = priorityQueueObject.getIterator();
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
    // add this do not cause OutOfMemoryError
    while (!queue.isEmpty()) {
      PriorityQueueObject priorityQueueObject = queue.poll();
      priorityQueueObject.getIterator().hasNext();
    }
  }
  finally {
    c.detach(toRestore);
  }
}

i don't know why, can someone help me explain?

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions