Skip to content

Commit 2746cb9

Browse files
committed
Removed busy waiting in the processing scheduler thread ProcessRequestQueue()
1 parent c71fef1 commit 2746cb9

File tree

2 files changed

+27
-40
lines changed

2 files changed

+27
-40
lines changed

src/JsonRpc/InputHandler.cs

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ namespace JsonRpc
1414
{
1515
public class InputHandler : IInputHandler
1616
{
17-
private readonly TimeSpan _sleepTime = TimeSpan.FromMilliseconds(50);
1817
public const char CR = '\r';
1918
public const char LF = '\n';
2019
public static char[] CRLF = { CR, LF };
@@ -28,7 +27,8 @@ public class InputHandler : IInputHandler
2827
private Thread _inputThread;
2928
private readonly IRequestRouter _requestRouter;
3029
private readonly IResponseRouter _responseRouter;
31-
private readonly ConcurrentQueue<(RequestProcessType type, Func<Task> request)> _queue;
30+
private readonly BlockingCollection<(RequestProcessType type, Func<Task> request)> _queue;
31+
private readonly CancellationTokenSource _cancelQueue;
3232
private Thread _queueThread;
3333

3434
public InputHandler(
@@ -46,26 +46,14 @@ IResponseRouter responseRouter
4646
_requestProcessIdentifier = requestProcessIdentifier;
4747
_requestRouter = requestRouter;
4848
_responseRouter = responseRouter;
49-
_queue = new ConcurrentQueue<(RequestProcessType type, Func<Task> request)>();
49+
_queue = new BlockingCollection<(RequestProcessType type, Func<Task> request)>();
50+
_cancelQueue = new CancellationTokenSource();
5051

5152
_inputThread = new Thread(ProcessInputStream) { IsBackground = true };
5253

5354
_queueThread = new Thread(ProcessRequestQueue) { IsBackground = true };
5455
}
5556

56-
internal InputHandler(
57-
TextReader input,
58-
IOutputHandler outputHandler,
59-
IReciever reciever,
60-
IRequestProcessIdentifier requestProcessIdentifier,
61-
IRequestRouter requestRouter,
62-
IResponseRouter responseRouter,
63-
TimeSpan sleepTime
64-
) : this(input, outputHandler, reciever, requestProcessIdentifier, requestRouter, responseRouter)
65-
{
66-
_sleepTime = sleepTime;
67-
}
68-
6957
public void Start()
7058
{
7159
_outputHandler.Start();
@@ -157,7 +145,7 @@ private void HandleRequest(string request)
157145
{
158146
if (item.IsRequest)
159147
{
160-
_queue.Enqueue((
148+
_queue.Add((
161149
type,
162150
async () => {
163151
var result = await _requestRouter.RouteRequest(item.Request);
@@ -168,7 +156,7 @@ private void HandleRequest(string request)
168156
}
169157
else if (item.IsNotification)
170158
{
171-
_queue.Enqueue((
159+
_queue.Add((
172160
type,
173161
() => {
174162
_requestRouter.RouteNotification(item.Notification);
@@ -184,33 +172,32 @@ private void HandleRequest(string request)
184172
}
185173
}
186174

187-
private bool IsNextSerial()
188-
{
189-
return _queue.TryPeek(out var queueResult) && queueResult.type == RequestProcessType.Serial;
190-
}
191-
192175
private async void ProcessRequestQueue()
193176
{
194-
while (true)
177+
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
178+
var token = _cancelQueue.Token;
179+
var waitables = new List<Task>();
180+
while(true)
195181
{
196182
if (_queueThread == null) return;
197-
var items = new List<Func<Task>>();
198-
while (!_queue.IsEmpty)
183+
if (_queue.TryTake(out var item, Timeout.Infinite, token))
199184
{
200-
if (IsNextSerial() && items.Count > 0)
185+
var (type, request) = item;
186+
var task = request();
187+
if (type == RequestProcessType.Serial)
201188
{
202-
break;
189+
await Task.WhenAll(waitables);
190+
waitables.Clear();
191+
task.Start();
192+
await task;
203193
}
204-
205-
if (_queue.TryDequeue(out var queueResult))
206-
items.Add(queueResult.request);
207-
}
208-
209-
await Task.WhenAll(items.Select(x => x()));
210-
211-
if (_queue.IsEmpty)
212-
{
213-
await Task.Delay(_sleepTime);
194+
else if (type == RequestProcessType.Parallel)
195+
{
196+
task.Start();
197+
waitables.Add(task);
198+
}
199+
else
200+
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
214201
}
215202
}
216203
}
@@ -220,6 +207,7 @@ public void Dispose()
220207
_outputHandler.Dispose();
221208
_inputThread = null;
222209
_queueThread = null;
210+
_cancelQueue.Cancel();
223211
}
224212
}
225213
}

test/JsonRpc.Tests/InputHandlerTests.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ private static InputHandler NewHandler(
3838
reciever,
3939
requestProcessIdentifier,
4040
requestRouter,
41-
responseRouter,
42-
TimeSpan.Zero);
41+
responseRouter);
4342
handler.Start();
4443
cts.Wait();
4544
return handler;

0 commit comments

Comments
 (0)