Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/openai/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def __stream__(self) -> Iterator[_T]:
if sse.data.startswith("[DONE]"):
break

# Skip events with no data (e.g., standalone retry/id directives)
# Per SSE spec, these are valid meta-only events that shouldn't be parsed as JSON
if not sse.data or sse.data.strip() == "":
continue

# we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
if sse.event and sse.event.startswith("thread."):
data = sse.json()
Expand Down Expand Up @@ -161,6 +166,11 @@ async def __stream__(self) -> AsyncIterator[_T]:
if sse.data.startswith("[DONE]"):
break

# Skip events with no data (e.g., standalone retry/id directives)
# Per SSE spec, these are valid meta-only events that shouldn't be parsed as JSON
if not sse.data or sse.data.strip() == "":
continue

# we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
if sse.event and sse.event.startswith("thread."):
data = sse.json()
Expand Down
70 changes: 70 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,76 @@ def body() -> Iterator[bytes]:
assert sse.json() == {"content": "известни"}


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_retry_directive_without_data(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
"""Test that standalone retry directives (with no data field) are skipped.

This is a common pattern in SSE streams, especially from providers like Anthropic,
where a retry directive is sent at the beginning of the stream:

retry: 3000

data: {"actual":"content"}

The retry directive is a valid SSE meta-field but should not be parsed as JSON data.
"""
def body() -> Iterator[bytes]:
# Standalone retry directive (no data field)
yield b"retry: 3000\n"
yield b"\n"
# Actual data event
yield b'data: {"foo":true}\n'
yield b"\n"

iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

# First SSE event has retry but no data - should be yielded by decoder
sse = await iter_next(iterator)
assert sse.retry == 3000
assert sse.data == ""

# Second SSE event has actual data
sse = await iter_next(iterator)
assert sse.json() == {"foo": True}

await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_retry_with_id_directive_without_data(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
"""Test that SSE events with only meta-fields (retry, id, event) and no data are skipped.

Per the SSE spec, these are valid meta-only events that configure the event stream
but don't contain actual data to be processed.
"""
def body() -> Iterator[bytes]:
# Meta-only event with retry and id
yield b"id: msg_123\n"
yield b"retry: 5000\n"
yield b"\n"
# Actual data event
yield b"id: msg_124\n"
yield b'data: {"bar":false}\n'
yield b"\n"

iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

# First SSE event is meta-only
sse = await iter_next(iterator)
assert sse.id == "msg_123"
assert sse.retry == 5000
assert sse.data == ""

# Second SSE event has actual data
sse = await iter_next(iterator)
assert sse.id == "msg_124"
assert sse.json() == {"bar": False}

await assert_empty_iter(iterator)


async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]:
for chunk in iter:
yield chunk
Expand Down