Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Oct 27, 2025

📄 34% (0.34x) speedup for GrpcRunner.run_asyncio in pinecone/grpc/grpc_runner.py

⏱️ Runtime : 1.15 milliseconds 856 microseconds (best of 75 runs)

📝 Explanation and details

The optimization removes an unnecessary async wrapper function that was creating overhead in each call to run_asyncio.

Key change: The original code used a @wraps(func) decorator to create an inner wrapped() async function, then immediately called await wrapped(). This pattern added an extra function call layer and async frame creation/destruction overhead without providing any functional benefit.

Specific optimization: The optimized version inlines the wrapper logic directly into the run_asyncio method, eliminating:

  • The @wraps(func) decorator overhead
  • Creation of an extra async function object
  • An additional async call stack frame
  • The return await wrapped() indirection

Performance impact: The line profiler shows the elimination of the expensive wrapper creation (originally 20.4% of runtime) and the costly return await wrapped() call (originally 75.6% of runtime). The optimized version spends most time (80.4%) on the actual metadata preparation and gRPC function call.

Throughput benefits: The 8.7% throughput improvement (119,853 → 130,275 operations/second) comes from reduced per-call overhead, making this optimization particularly effective for high-frequency async gRPC operations where the wrapper elimination provides consistent savings across all calls.

The optimization works best for scenarios with many concurrent calls or high-throughput gRPC operations, as demonstrated by the throughput tests where the reduced call stack overhead compounds across multiple simultaneous operations.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 198 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
# Patch PineconeException in GrpcRunner's namespace for testing
import sys
# --- Begin: function to test (EXACT COPY, DO NOT MODIFY) ---
from functools import wraps
# Mocks and helpers for the test environment
from types import SimpleNamespace
from typing import Dict, Optional

import pytest  # used for our unit tests
from google.protobuf.message import Message
from grpc import CallCredentials, Compression
from grpc._channel import _InactiveRpcError
from pinecone import Config
from pinecone.exceptions.exceptions import PineconeException
from pinecone.grpc.config import GRPCClientConfig
from pinecone.grpc.grpc_runner import \
    GrpcRunner  # --- End: function to test ---
from pinecone.openapi_support.api_version import API_VERSION
from pinecone.utils.constants import CLIENT_VERSION

# --- Begin: Minimal stubs for external dependencies ---

# Minimal stub for google.protobuf.message.Message
class DummyMessage:
    pass

# Minimal stub for pinecone.Config
class DummyConfig:
    def __init__(self, api_key):
        self.api_key = api_key

# Minimal stub for pinecone.grpc.config.GRPCClientConfig
class DummyGRPCClientConfig:
    def __init__(self, additional_metadata=None):
        self.additional_metadata = additional_metadata

# Minimal stub for pinecone.openapi_support.api_version.API_VERSION
API_VERSION = "2023-06-01"

# Minimal stub for pinecone.utils.constants.CLIENT_VERSION
CLIENT_VERSION = "1.2.3"

# Patch GrpcRunner to use dummy exception for testing
GrpcRunner.__module__ = "test_module"
GrpcRunner.__globals__ = globals()

# --- End: Minimal stubs ---

# --- Begin: Test fixtures and helpers ---

@pytest.fixture
def grpc_runner():
    config = DummyConfig(api_key="test-api-key")
    grpc_config = DummyGRPCClientConfig(additional_metadata={"extra": "meta"})
    runner = GrpcRunner("test-index", config, grpc_config)
    return runner

@pytest.fixture
def dummy_request():
    return DummyMessage()

# Async dummy function to simulate a gRPC call
async def dummy_grpc_func(
    request,
    timeout=None,
    metadata=None,
    credentials=None,
    wait_for_ready=None,
    compression=None,
):
    # Return a tuple of all arguments for assertion
    return {
        "request": request,
        "timeout": timeout,
        "metadata": metadata,
        "credentials": credentials,
        "wait_for_ready": wait_for_ready,
        "compression": compression,
    }

# --- End: Test fixtures and helpers ---

# --- Begin: Basic Test Cases ---

@pytest.mark.asyncio
async def test_run_asyncio_basic_success(grpc_runner, dummy_request):
    """Test that run_asyncio returns expected values on success."""
    result = await grpc_runner.run_asyncio(dummy_grpc_func, dummy_request)

@pytest.mark.asyncio
async def test_run_asyncio_with_all_parameters(grpc_runner, dummy_request):
    """Test that run_asyncio passes all parameters correctly."""
    dummy_credentials = object()
    dummy_compression = object()
    metadata = {"foo": "bar"}
    result = await grpc_runner.run_asyncio(
        dummy_grpc_func,
        dummy_request,
        timeout=5,
        metadata=metadata,
        credentials=dummy_credentials,
        wait_for_ready=True,
        compression=dummy_compression,
    )

# --- End: Basic Test Cases ---

# --- Begin: Edge Test Cases ---

@pytest.mark.asyncio
async def test_run_asyncio_metadata_merging(grpc_runner, dummy_request):
    """Test that user metadata correctly merges and overrides fixed metadata."""
    metadata = {"api-key": "user-key", "new-key": "new-value"}
    result = await grpc_runner.run_asyncio(
        dummy_grpc_func,
        dummy_request,
        metadata=metadata,
    )

@pytest.mark.asyncio
async def test_run_asyncio_empty_user_metadata(grpc_runner, dummy_request):
    """Test that empty user metadata does not remove fixed metadata."""
    result = await grpc_runner.run_asyncio(
        dummy_grpc_func,
        dummy_request,
        metadata={},
    )

@pytest.mark.asyncio
async def test_run_asyncio_exception_handling(grpc_runner, dummy_request, monkeypatch):
    """Test that _InactiveRpcError is caught and re-raised as PineconeException."""
    # Patch _InactiveRpcError to our dummy error with a debug string
    class MyInactiveRpcError(Exception):
        def __init__(self, debug_error_string):
            self._state = SimpleNamespace(debug_error_string=debug_error_string)
    async def error_func(*args, **kwargs):
        raise MyInactiveRpcError("debug error string")
    monkeypatch.setitem(grpc_runner.run_asyncio.__globals__, "_InactiveRpcError", MyInactiveRpcError)
    monkeypatch.setitem(grpc_runner.run_asyncio.__globals__, "PineconeException", DummyPineconeException)
    with pytest.raises(DummyPineconeException) as excinfo:
        await grpc_runner.run_asyncio(error_func, dummy_request)

@pytest.mark.asyncio
async def test_run_asyncio_concurrent_calls(grpc_runner, dummy_request):
    """Test that multiple concurrent run_asyncio calls do not interfere."""
    tasks = [
        grpc_runner.run_asyncio(dummy_grpc_func, DummyMessage(), timeout=i)
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_run_asyncio_none_metadata(grpc_runner, dummy_request):
    """Test that None metadata is handled like an empty dict."""
    result = await grpc_runner.run_asyncio(
        dummy_grpc_func,
        dummy_request,
        metadata=None,
    )

# --- End: Edge Test Cases ---

# --- Begin: Large Scale Test Cases ---

@pytest.mark.asyncio
async def test_run_asyncio_large_number_of_concurrent_calls(grpc_runner):
    """Test scalability with many concurrent calls."""
    num_calls = 50  # Reasonable to avoid timeouts
    requests = [DummyMessage() for _ in range(num_calls)]
    tasks = [
        grpc_runner.run_asyncio(dummy_grpc_func, req, timeout=i)
        for i, req in enumerate(requests)
    ]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_run_asyncio_large_metadata(grpc_runner, dummy_request):
    """Test handling of large user metadata dicts."""
    large_metadata = {f"key{i}": f"value{i}" for i in range(200)}
    result = await grpc_runner.run_asyncio(
        dummy_grpc_func,
        dummy_request,
        metadata=large_metadata,
    )
    for i in range(200):
        pass

# --- End: Large Scale Test Cases ---

# --- Begin: Throughput Test Cases ---

@pytest.mark.asyncio
async def test_run_asyncio_throughput_small_load(grpc_runner):
    """Throughput: Small batch of concurrent calls."""
    tasks = [
        grpc_runner.run_asyncio(dummy_grpc_func, DummyMessage(), timeout=1)
        for _ in range(5)
    ]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio

async def test_run_asyncio_throughput_large_load(grpc_runner):
    """Throughput: Larger batch of concurrent calls."""
    tasks = [
        grpc_runner.run_asyncio(dummy_grpc_func, DummyMessage(), timeout=3)
        for _ in range(75)
    ]
    results = await asyncio.gather(*tasks)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio  # used to run async functions
# The function to test (EXACT COPY, DO NOT MODIFY)
from functools import wraps
from typing import Dict, Optional

import pytest  # used for our unit tests
from pinecone.grpc.grpc_runner import GrpcRunner


# Mocks and helpers for testing
class DummyMessage:
    """A dummy protobuf message for test purposes."""
    pass

class DummyCredentials:
    """A dummy credentials object for test purposes."""
    pass

class DummyCompression:
    """A dummy compression object for test purposes."""
    pass

class DummyInactiveRpcError(Exception):
    """A dummy _InactiveRpcError for simulating gRPC errors."""
    def __init__(self, debug_error_string):
        self._state = type('state', (), {'debug_error_string': debug_error_string})()

# Minimal PineconeException for error propagation
class PineconeException(Exception):
    pass

# Minimal Config and GRPCClientConfig for GrpcRunner construction
class Config:
    def __init__(self, api_key):
        self.api_key = api_key

class GRPCClientConfig:
    def __init__(self, additional_metadata=None):
        self.additional_metadata = additional_metadata or {}

# Constants for GrpcRunner
CLIENT_VERSION = "1.0.0"
API_VERSION = "2023-01-01"
from pinecone.grpc.grpc_runner import GrpcRunner

# ---------- UNIT TESTS ----------

@pytest.mark.asyncio
async def test_run_asyncio_basic_return_value():
    """Test that run_asyncio returns the expected value from the coroutine."""
    async def dummy_func(request, **kwargs):
        # Return a simple value for testing
        return "success"
    runner = GrpcRunner("test-index", Config("test-key"), GRPCClientConfig())
    result = await runner.run_asyncio(dummy_func, DummyMessage())

@pytest.mark.asyncio
async def test_run_asyncio_basic_metadata_merge():
    """Test that metadata is merged correctly with fixed_metadata."""
    async def dummy_func(request, **kwargs):
        # Return the metadata for inspection
        return kwargs["metadata"]
    runner = GrpcRunner("test-index", Config("test-key"), GRPCClientConfig({"extra": "value"}))
    user_metadata = {"user": "meta"}
    result = await runner.run_asyncio(dummy_func, DummyMessage(), metadata=user_metadata)

@pytest.mark.asyncio
async def test_run_asyncio_basic_with_all_params():
    """Test that all parameters are passed to the underlying function."""
    async def dummy_func(request, **kwargs):
        # Return all received parameters for inspection
        return kwargs
    runner = GrpcRunner("test-index", Config("test-key"), GRPCClientConfig())
    result = await runner.run_asyncio(
        dummy_func,
        DummyMessage(),
        timeout=5,
        metadata={"foo": "bar"},
        credentials=DummyCredentials(),
        wait_for_ready=True,
        compression=DummyCompression(),
    )

@pytest.mark.asyncio

async def test_run_asyncio_edge_user_metadata_overrides_fixed():
    """Test that user metadata can override fixed_metadata keys."""
    async def dummy_func(request, **kwargs):
        return kwargs["metadata"]
    runner = GrpcRunner("test-index", Config("test-key"), GRPCClientConfig())
    # Override the api-key in user metadata
    user_metadata = {"api-key": "override-key"}
    result = await runner.run_asyncio(dummy_func, DummyMessage(), metadata=user_metadata)

@pytest.mark.asyncio



async def test_run_asyncio_throughput_small_load():
    """Throughput test: small load of 10 requests."""
    async def dummy_func(request, **kwargs):
        return "ok"
    runner = GrpcRunner("test-index", Config("test-key"), GRPCClientConfig())
    coros = [runner.run_asyncio(dummy_func, DummyMessage()) for _ in range(10)]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio

To edit these changes git checkout codeflash/optimize-GrpcRunner.run_asyncio-mh9rd3ds and push.

Codeflash

The optimization removes an unnecessary async wrapper function that was creating overhead in each call to `run_asyncio`. 

**Key change**: The original code used a `@wraps(func)` decorator to create an inner `wrapped()` async function, then immediately called `await wrapped()`. This pattern added an extra function call layer and async frame creation/destruction overhead without providing any functional benefit.

**Specific optimization**: The optimized version inlines the wrapper logic directly into the `run_asyncio` method, eliminating:
- The `@wraps(func)` decorator overhead
- Creation of an extra async function object
- An additional async call stack frame
- The `return await wrapped()` indirection

**Performance impact**: The line profiler shows the elimination of the expensive wrapper creation (originally 20.4% of runtime) and the costly `return await wrapped()` call (originally 75.6% of runtime). The optimized version spends most time (80.4%) on the actual metadata preparation and gRPC function call.

**Throughput benefits**: The 8.7% throughput improvement (119,853 → 130,275 operations/second) comes from reduced per-call overhead, making this optimization particularly effective for high-frequency async gRPC operations where the wrapper elimination provides consistent savings across all calls.

The optimization works best for scenarios with many concurrent calls or high-throughput gRPC operations, as demonstrated by the throughput tests where the reduced call stack overhead compounds across multiple simultaneous operations.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 October 27, 2025 23:17
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Oct 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant