From af25d9a3ba43d87b51ffe69c17b6a083a163beec Mon Sep 17 00:00:00 2001
From: Kebe
Date: Wed, 27 Aug 2025 11:30:37 +0000
Subject: [PATCH] [Feature][Response API] Add streaming support for non-harmony

Signed-off-by: Kebe
---
 .../openai/responses/test_basic.py           |  16 +
 vllm/entrypoints/context.py                  |   8 +
 vllm/entrypoints/openai/serving_responses.py | 440 +++++++++++++++---
 3 files changed, 393 insertions(+), 71 deletions(-)

diff --git a/tests/v1/entrypoints/openai/responses/test_basic.py b/tests/v1/entrypoints/openai/responses/test_basic.py
index 7a0baa5767cb..e192a8af0c56 100644
--- a/tests/v1/entrypoints/openai/responses/test_basic.py
+++ b/tests/v1/entrypoints/openai/responses/test_basic.py
@@ -2,6 +2,7 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 
 import openai  # use the official client for correctness check
+import openai.types.responses as openai_responses_types
 import pytest
 
 
@@ -86,3 +87,18 @@ async def test_logprobs(client: openai.AsyncOpenAI):
     outputs = response.output
     assert outputs[-1].content[-1].logprobs
     assert len(outputs[-1].content[-1].logprobs[0].top_logprobs) == 5
+
+
+@pytest.mark.asyncio
+async def test_streaming(client: openai.AsyncOpenAI):
+    stream = await client.responses.create(
+        input="What is 13 * 24?",
+        stream=True,
+    )
+    events = [event async for event in stream]
+    assert isinstance(events[0], openai_responses_types.ResponseCreatedEvent)
+    assert any(
+        isinstance(event, openai_responses_types.ResponseTextDeltaEvent)
+        for event in events)
+    assert isinstance(events[-1],
+                      openai_responses_types.ResponseCompletedEvent)
diff --git a/vllm/entrypoints/context.py b/vllm/entrypoints/context.py
index 9d587e866933..073849f886d3 100644
--- a/vllm/entrypoints/context.py
+++ b/vllm/entrypoints/context.py
@@ -49,9 +49,17 @@ class SimpleContext(ConversationContext):
 
     def __init__(self):
         self.last_output = None
+        self.num_prompt_tokens = 0
+        self.num_output_tokens = 0
+        self.num_cached_tokens = 0
 
     def append_output(self, output) -> None:
         self.last_output = output
+        if not isinstance(output, RequestOutput):
+            raise ValueError("SimpleContext only supports RequestOutput.")
+        self.num_prompt_tokens = len(output.prompt_token_ids or [])
+        self.num_cached_tokens = output.num_cached_tokens or 0
+        self.num_output_tokens += len(output.outputs[0].token_ids or [])
 
     def need_builtin_tool_call(self) -> bool:
         return False
diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py
index 899cb07b2b37..2adadfaff6d0 100644
--- a/vllm/entrypoints/openai/serving_responses.py
+++ b/vllm/entrypoints/openai/serving_responses.py
@@ -24,7 +24,8 @@
                                     ResponseOutputMessage, ResponseOutputText,
                                     ResponseReasoningItem,
                                     ResponseReasoningTextDeltaEvent,
-                                    ResponseReasoningTextDoneEvent)
+                                    ResponseReasoningTextDoneEvent,
+                                    response_text_delta_event)
 from openai.types.responses.response_output_text import (Logprob,
                                                          LogprobTopLogprob)
 # yapf: enable
@@ -46,7 +47,7 @@
 from vllm.entrypoints.logger import RequestLogger
 # yapf conflicts with isort for this block
 # yapf: disable
-from vllm.entrypoints.openai.protocol import (ErrorResponse,
+from vllm.entrypoints.openai.protocol import (DeltaMessage, ErrorResponse,
                                               InputTokensDetails,
                                               OutputTokensDetails,
                                               RequestResponseMetadata,
@@ -459,9 +460,9 @@ async def responses_full_generator(
 
         # Calculate usage.
assert final_res.prompt_token_ids is not None - num_prompt_tokens = len(final_res.prompt_token_ids) - num_generated_tokens = len(final_output.token_ids) - num_cached_tokens = final_res.num_cached_tokens + num_prompt_tokens = context.num_prompt_tokens + num_generated_tokens = context.num_output_tokens + num_cached_tokens = context.num_cached_tokens num_reasoning_tokens = 0 usage = ResponseUsage( @@ -537,6 +538,28 @@ def _create_response_logprobs( )) return out + def _create_stream_response_logprobs( + self, + token_ids: Sequence[int], + logprobs: Optional[SampleLogprobs], + tokenizer: AnyTokenizer, + top_logprobs: Optional[int] = None + ) -> list[response_text_delta_event.Logprob]: + lgs = self._create_response_logprobs(token_ids=token_ids, + logprobs=logprobs, + tokenizer=tokenizer, + top_logprobs=top_logprobs) + return [ + response_text_delta_event.Logprob( + token=lg.token, + logprob=lg.logprob, + top_logprobs=[ + response_text_delta_event.LogprobTopLogprob( + token=tl.token, logprob=tl.logprob) + for tl in lg.top_logprobs + ]) for lg in lgs + ] + def _make_response_output_items( self, request: ResponsesRequest, @@ -829,7 +852,7 @@ def _make_store_not_supported_error(self) -> ErrorResponse: status_code=HTTPStatus.BAD_REQUEST, ) - async def _process_streaming_events( + async def _process_simple_streaming_events( self, request: ResponsesRequest, sampling_params: SamplingParams, @@ -839,47 +862,284 @@ async def _process_streaming_events( tokenizer: AnyTokenizer, request_metadata: RequestResponseMetadata, created_time: int, + _send_event: Callable[[BaseModel], str], ) -> AsyncGenerator[str, None]: - sequence_number = 0 + current_content_index = 0 # FIXME: this number is never changed + current_output_index = 0 + current_item_id = "" # FIXME: this number is never changed + reasoning_parser = None + if self.reasoning_parser: + reasoning_parser = self.reasoning_parser(tokenizer) + previous_text = "" + previous_token_ids: list[int] = [] + first_delta_sent = False + previous_delta_messages: list[DeltaMessage] = [] + async for ctx in result_generator: + assert isinstance(ctx, SimpleContext) + if ctx.last_output is None: + continue + if ctx.last_output.outputs: + output = ctx.last_output.outputs[0] + if reasoning_parser: + delta_message = \ + reasoning_parser.extract_reasoning_content_streaming( + previous_text=previous_text, + current_text=previous_text + output.text, + delta_text=output.text, + previous_token_ids=previous_token_ids, + current_token_ids=previous_token_ids + + output.token_ids, + delta_token_ids=output.token_ids, + ) + else: + delta_message = DeltaMessage(content=output.text, ) + previous_text += output.text + previous_token_ids += output.token_ids + if not delta_message: + continue + if not first_delta_sent: + if delta_message.reasoning_content: + yield _send_event( + openai_responses_types. + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. + ResponseReasoningItem( + type="reasoning", + id=current_item_id, + summary=[], + status="in_progress", + ), + )) + else: + yield _send_event( + openai_responses_types. + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. 
+ ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="in_progress", + ), + )) + yield _send_event( + openai_responses_types.ResponseContentPartAddedEvent( + type="response.content_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + )) + first_delta_sent = True + # todo(kebe7jun) tool call support + + # check delta message and previous delta message are + # same as content or reasoning content + if (previous_delta_messages + and previous_delta_messages[-1].reasoning_content + is not None and delta_message.content is not None): + # from reasoning to normal content, send done + # event for reasoning + reason_content = ''.join( + pm.reasoning_content for pm in previous_delta_messages + if pm.reasoning_content is not None) + yield _send_event( + ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + item_id=current_item_id, + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text=reason_content, + )) + reasoning_item = ResponseReasoningItem( + type="reasoning", + content=[ + ResponseReasoningTextContent( + text=reason_content, + type="reasoning_text", + ), + ], + status="completed", + id=current_item_id, + summary=[], + ) + yield _send_event( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=reasoning_item, + )) + yield _send_event( + openai_responses_types.ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types.ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="in_progress", + ), + )) + current_output_index += 1 + yield _send_event( + openai_responses_types.ResponseContentPartAddedEvent( + type="response.content_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + )) + # reset previous delta messages + previous_delta_messages = [] - def _send_event(event: BaseModel): - nonlocal sequence_number - # Set sequence_number if the event has this attribute - if hasattr(event, 'sequence_number'): - event.sequence_number = sequence_number - sequence_number += 1 - # Get event type from the event's type field if it exists - event_type = getattr(event, 'type', 'unknown') - return (f"event: {event_type}\n" - f"data: {event.model_dump_json(indent=None)}\n\n") + if delta_message.reasoning_content is not None: + yield _send_event( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + sequence_number=-1, + content_index=current_content_index, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.reasoning_content, + )) + elif delta_message.content is not None: + yield _send_event( + openai_responses_types.ResponseTextDeltaEvent( + type="response.output_text.delta", + sequence_number=-1, + content_index=current_content_index, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.content, + logprobs=self._create_stream_response_logprobs( + 
token_ids=output.token_ids, + logprobs=output.logprobs, + tokenizer=tokenizer, + top_logprobs=request.top_logprobs, + ) + if request.is_include_output_logprobs() else None, + )) + previous_delta_messages.append(delta_message) + if previous_delta_messages: + if previous_delta_messages[-1].reasoning_content is not None: + reason_content = ''.join(pm.reasoning_content + for pm in previous_delta_messages + if pm.reasoning_content is not None) + yield _send_event( + ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + item_id=current_item_id, + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text=reason_content, + )) + reasoning_item = ResponseReasoningItem( + type="reasoning", + content=[ + ResponseReasoningTextContent( + text=reason_content, + type="reasoning_text", + ), + ], + status="completed", + id=current_item_id, + summary=[], + ) + yield _send_event( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=reasoning_item, + )) + elif previous_delta_messages[-1].content is not None: + final_content = ''.join(pm.content + for pm in previous_delta_messages + if pm.content is not None) + yield _send_event( + openai_responses_types.ResponseTextDoneEvent( + type="response.output_text.done", + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text=final_content, + logprobs=[], + item_id=current_item_id, + )) + part = ResponseOutputText( + text=final_content, + type="output_text", + annotations=[], + ) + yield _send_event( + openai_responses_types.ResponseContentPartDoneEvent( + type="response.content_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=part, + )) + item = ResponseOutputMessage( + type="message", + role="assistant", + content=[ + part, + ], + status="completed", + id=current_item_id, + summary=[], + ) + yield _send_event( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=item, + )) + + async def _process_harmony_streaming_events( + self, + request: ResponsesRequest, + sampling_params: SamplingParams, + result_generator: AsyncIterator[Optional[ConversationContext]], + context: ConversationContext, + model_name: str, + tokenizer: AnyTokenizer, + request_metadata: RequestResponseMetadata, + created_time: int, + _send_event: Callable[[BaseModel], str], + ) -> AsyncGenerator[str, None]: current_content_index = 0 # FIXME: this number is never changed current_output_index = 0 current_item_id = "" # FIXME: this number is never changed sent_output_item_added = False - initial_response = ResponsesResponse.from_request( - request, - sampling_params, - model_name=model_name, - created_time=created_time, - output=[], - status="in_progress", - usage=None, - ).model_dump() - yield _send_event( - ResponseCreatedEvent( - type="response.created", - sequence_number=-1, - response=initial_response, - )) - yield _send_event( - ResponseInProgressEvent( - type="response.in_progress", - sequence_number=-1, - response=initial_response, - )) - async for ctx in result_generator: assert isinstance(ctx, StreamingHarmonyContext) @@ -1229,29 +1489,6 @@ def _send_event(event: BaseModel): ), )) - async def empty_async_generator(): - # A hack to trick Python to think this is a generator but in fact - # it immediately returns. 
- if False: - yield - - final_response = await self.responses_full_generator( - request, - sampling_params, - empty_async_generator(), - context, - model_name, - tokenizer, - request_metadata, - created_time=created_time, - ) - yield _send_event( - openai_responses_types.ResponseCompletedEvent( - type="response.completed", - sequence_number=-1, - response=final_response.model_dump(), - )) - async def responses_stream_generator( self, request: ResponsesRequest, @@ -1266,16 +1503,77 @@ async def responses_stream_generator( # TODO: # 1. Handle disconnect - if not isinstance(context, StreamingHarmonyContext): - raise NotImplementedError( - "Streaming is not supported for responses API without Harmony." - ) - created_time = created_time or int(time.time()) + sequence_number = 0 + + def _send_event(event: BaseModel): + nonlocal sequence_number + # Set sequence_number if the event has this attribute + if hasattr(event, 'sequence_number'): + event.sequence_number = sequence_number + sequence_number += 1 + # Get event type from the event's type field if it exists + event_type = getattr(event, 'type', 'unknown') + return (f"event: {event_type}\n" + f"data: {event.model_dump_json(indent=None)}\n\n") + async with AsyncExitStack() as exit_stack: - await context.init_tool_sessions(self.tool_server, exit_stack) - async for event_data in self._process_streaming_events( - request, sampling_params, result_generator, context, - model_name, tokenizer, request_metadata, created_time): + processer = None + if self.use_harmony: + await context.init_tool_sessions(self.tool_server, exit_stack) + processer = self._process_harmony_streaming_events + else: + processer = self._process_simple_streaming_events + + initial_response = ResponsesResponse.from_request( + request, + sampling_params, + model_name=model_name, + created_time=created_time, + output=[], + status="in_progress", + usage=None, + ).model_dump() + yield _send_event( + ResponseCreatedEvent( + type="response.created", + sequence_number=-1, + response=initial_response, + )) + yield _send_event( + ResponseInProgressEvent( + type="response.in_progress", + sequence_number=-1, + response=initial_response, + )) + + async for event_data in processer(request, sampling_params, + result_generator, context, + model_name, tokenizer, + request_metadata, created_time, + _send_event): yield event_data + + async def empty_async_generator(): + # A hack to trick Python to think this is a generator but + # in fact it immediately returns. + if False: + yield + + final_response = await self.responses_full_generator( + request, + sampling_params, + empty_async_generator(), + context, + model_name, + tokenizer, + request_metadata, + created_time=created_time, + ) + yield _send_event( + openai_responses_types.ResponseCompletedEvent( + type="response.completed", + sequence_number=-1, + response=final_response.model_dump(), + ))
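
Usage sketch (illustrative, not part of the diff): with this patch applied, the Responses API streams SSE events for non-harmony models as well. The snippet below shows one way a client might consume the stream; the base URL, API key, and model name are placeholders for whatever the local vLLM server actually serves.

    import asyncio

    import openai


    async def main() -> None:
        # Assumes a vLLM OpenAI-compatible server on localhost:8000; adjust as needed.
        client = openai.AsyncOpenAI(base_url="http://localhost:8000/v1",
                                    api_key="EMPTY")
        stream = await client.responses.create(
            model="my-model",  # placeholder: use the served model name
            input="What is 13 * 24?",
            stream=True,
        )
        # Event types mirror those emitted above: response.created first,
        # then response.output_text.delta chunks, then response.completed.
        async for event in stream:
            if event.type == "response.output_text.delta":
                print(event.delta, end="", flush=True)
            elif event.type == "response.completed":
                print()


    asyncio.run(main())

When a reasoning parser is configured, reasoning tokens arrive as response.reasoning_text.delta events before the regular output text, as implemented in _process_simple_streaming_events above.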