44import logging
55import uuid
66import sys
7+ import asyncio
78
89from concurrent import futures
910from .exceptions import (JsonRpcException , JsonRpcRequestCancelled ,
1213log = logging .getLogger (__name__ )
1314JSONRPC_VERSION = '2.0'
1415CANCEL_METHOD = '$/cancelRequest'
16+ EXIT_METHOD = 'exit'
1517
1618
1719class Endpoint :
@@ -35,9 +37,25 @@ def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()),
3537 self ._client_request_futures = {}
3638 self ._server_request_futures = {}
3739 self ._executor_service = futures .ThreadPoolExecutor (max_workers = max_workers )
40+ self ._cancelledRequests = set ()
41+ self ._messageQueue = None
42+ self ._consume_task = None
43+
44+ def init_async (self ):
45+ self ._messageQueue = asyncio .Queue ()
46+ self ._consume_task = asyncio .create_task (self .consume_task ())
47+
48+ async def consume_task (self ):
49+ loop = asyncio .get_running_loop ()
50+ while loop .is_running ():
51+ message = await self ._messageQueue .get ()
52+ await asyncio .to_thread (self .consume , message )
53+ self ._messageQueue .task_done ()
3854
3955 def shutdown (self ):
4056 self ._executor_service .shutdown ()
57+ if self ._consume_task is not None :
58+ self ._consume_task .cancel ()
4159
4260 def notify (self , method , params = None ):
4361 """Send a JSON RPC notification to the client.
@@ -94,6 +112,21 @@ def callback(future):
94112 future .set_exception (JsonRpcRequestCancelled ())
95113 return callback
96114
115+ async def consume_async (self , message ):
116+ """Consume a JSON RPC message from the client and put it into a queue.
117+
118+ Args:
119+ message (dict): The JSON RPC message sent by the client
120+ """
121+ if message ['method' ] == CANCEL_METHOD :
122+ self ._cancelledRequests .add (message .get ('params' )['id' ])
123+
124+ # The exit message needs to be handled directly since the stream cannot be closed asynchronously
125+ if message ['method' ] == EXIT_METHOD :
126+ self .consume (message )
127+ else :
128+ await self ._messageQueue .put (message )
129+
97130 def consume (self , message ):
98131 """Consume a JSON RPC message from the client.
99132
@@ -182,6 +215,9 @@ def _handle_request(self, msg_id, method, params):
182215 except KeyError as e :
183216 raise JsonRpcMethodNotFound .of (method ) from e
184217
218+ if msg_id in self ._cancelledRequests :
219+ raise JsonRpcRequestCancelled ()
220+
185221 handler_result = handler (params )
186222
187223 if callable (handler_result ):
0 commit comments