11using System ;
2- using System . Collections . Concurrent ;
3- using System . Collections . Generic ;
42using System . IO ;
53using System . Linq ;
6- using System . Text ;
74using System . Threading ;
85using System . Threading . Tasks ;
9- using JsonRpc . Server ;
106using JsonRpc . Server . Messages ;
117using Newtonsoft . Json . Linq ;
128
@@ -27,9 +23,7 @@ public class InputHandler : IInputHandler
2723 private Thread _inputThread ;
2824 private readonly IRequestRouter _requestRouter ;
2925 private readonly IResponseRouter _responseRouter ;
30- private readonly BlockingCollection < ( RequestProcessType type , Func < Task > request ) > _queue ;
31- private readonly CancellationTokenSource _cancelQueue ;
32- private Thread _queueThread ;
26+ private readonly ProcessScheduler _scheduler ;
3327
3428 public InputHandler (
3529 TextReader input ,
@@ -46,19 +40,16 @@ IResponseRouter responseRouter
4640 _requestProcessIdentifier = requestProcessIdentifier ;
4741 _requestRouter = requestRouter ;
4842 _responseRouter = responseRouter ;
49- _queue = new BlockingCollection < ( RequestProcessType type , Func < Task > request ) > ( ) ;
50- _cancelQueue = new CancellationTokenSource ( ) ;
5143
44+ _scheduler = new ProcessScheduler ( ) ;
5245 _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true } ;
53-
54- _queueThread = new Thread ( ProcessRequestQueue ) { IsBackground = true } ;
5546 }
5647
5748 public void Start ( )
5849 {
5950 _outputHandler . Start ( ) ;
6051 _inputThread . Start ( ) ;
61- _queueThread . Start ( ) ;
52+ _scheduler . Start ( ) ;
6253 }
6354
6455 private async void ProcessInputStream ( )
@@ -145,24 +136,24 @@ private void HandleRequest(string request)
145136 {
146137 if ( item . IsRequest )
147138 {
148- _queue . Add ( (
139+ _scheduler . Add (
149140 type ,
150141 async ( ) => {
151142 var result = await _requestRouter . RouteRequest ( item . Request ) ;
152143
153144 _outputHandler . Send ( result . Value ) ;
154145 }
155- ) ) ;
146+ ) ;
156147 }
157148 else if ( item . IsNotification )
158149 {
159- _queue . Add ( (
150+ _scheduler . Add (
160151 type ,
161152 ( ) => {
162153 _requestRouter . RouteNotification ( item . Notification ) ;
163154 return Task . CompletedTask ;
164155 }
165- ) ) ;
156+ ) ;
166157 }
167158 else if ( item . IsError )
168159 {
@@ -172,46 +163,12 @@ private void HandleRequest(string request)
172163 }
173164 }
174165
175- private Task Start ( Func < Task > request )
176- {
177- var t = request ( ) ;
178- t . Start ( ) ;
179- return t ;
180- }
181-
182- private async void ProcessRequestQueue ( )
183- {
184- // see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
185- var token = _cancelQueue . Token ;
186- var waitables = new List < Task > ( ) ;
187- while ( true )
188- {
189- if ( _queueThread == null ) return ;
190- if ( _queue . TryTake ( out var item , Timeout . Infinite , token ) )
191- {
192- var ( type , request ) = item ;
193- if ( type == RequestProcessType . Serial )
194- {
195- await Task . WhenAll ( waitables ) ;
196- waitables . Clear ( ) ;
197- await Start ( request ) ;
198- }
199- else if ( type == RequestProcessType . Parallel )
200- {
201- waitables . Add ( Start ( request ) ) ;
202- }
203- else
204- throw new NotImplementedException ( "Only Serial and Parallel execution types can be handled currently" ) ;
205- }
206- }
207- }
208166
209167 public void Dispose ( )
210168 {
211169 _outputHandler . Dispose ( ) ;
212170 _inputThread = null ;
213- _queueThread = null ;
214- _cancelQueue . Cancel ( ) ;
171+ _scheduler ? . Dispose ( ) ;
215172 }
216173 }
217174}
0 commit comments