11using System ;
2- using System . Collections . Concurrent ;
3- using System . Collections . Generic ;
42using System . IO ;
53using System . Linq ;
6- using System . Text ;
74using System . Threading ;
85using System . Threading . Tasks ;
9- using JsonRpc . Server ;
106using JsonRpc . Server . Messages ;
117using Newtonsoft . Json . Linq ;
128
139namespace JsonRpc
1410{
1511 public class InputHandler : IInputHandler
1612 {
17- private readonly TimeSpan _sleepTime = TimeSpan . FromMilliseconds ( 50 ) ;
1813 public const char CR = '\r ' ;
1914 public const char LF = '\n ' ;
2015 public static char [ ] CRLF = { CR , LF } ;
@@ -28,8 +23,7 @@ public class InputHandler : IInputHandler
2823 private Thread _inputThread ;
2924 private readonly IRequestRouter _requestRouter ;
3025 private readonly IResponseRouter _responseRouter ;
31- private readonly ConcurrentQueue < ( RequestProcessType type , Func < Task > request ) > _queue ;
32- private Thread _queueThread ;
26+ private readonly IScheduler _scheduler ;
3327
3428 public InputHandler (
3529 TextReader input ,
@@ -46,31 +40,16 @@ IResponseRouter responseRouter
4640 _requestProcessIdentifier = requestProcessIdentifier ;
4741 _requestRouter = requestRouter ;
4842 _responseRouter = responseRouter ;
49- _queue = new ConcurrentQueue < ( RequestProcessType type , Func < Task > request ) > ( ) ;
5043
51- _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true } ;
52-
53- _queueThread = new Thread ( ProcessRequestQueue ) { IsBackground = true } ;
54- }
55-
56- internal InputHandler (
57- TextReader input ,
58- IOutputHandler outputHandler ,
59- IReciever reciever ,
60- IRequestProcessIdentifier requestProcessIdentifier ,
61- IRequestRouter requestRouter ,
62- IResponseRouter responseRouter ,
63- TimeSpan sleepTime
64- ) : this ( input , outputHandler , reciever , requestProcessIdentifier , requestRouter , responseRouter )
65- {
66- _sleepTime = sleepTime ;
67- }
44+ _scheduler = new ProcessScheduler ( ) ;
45+ _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true , Name = "ProcessInputStream" } ;
46+ }
6847
6948 public void Start ( )
7049 {
7150 _outputHandler . Start ( ) ;
7251 _inputThread . Start ( ) ;
73- _queueThread . Start ( ) ;
52+ _scheduler . Start ( ) ;
7453 }
7554
7655 private async void ProcessInputStream ( )
@@ -81,10 +60,14 @@ private async void ProcessInputStream()
8160
8261 var buffer = new char [ 300 ] ;
8362 var current = await _input . ReadBlockAsync ( buffer , 0 , MinBuffer ) ;
63+ if ( current == 0 ) return ; // no more _input
64+
8465 while ( current < MinBuffer || buffer [ current - 4 ] != CR || buffer [ current - 3 ] != LF ||
8566 buffer [ current - 2 ] != CR || buffer [ current - 1 ] != LF )
8667 {
87- current += await _input . ReadBlockAsync ( buffer , current , 1 ) ;
68+ var n = await _input . ReadBlockAsync ( buffer , current , 1 ) ;
69+ if ( n == 0 ) return ; // no more _input, mitigates endless loop here.
70+ current += n ;
8871 }
8972
9073 var headersContent = new string ( buffer , 0 , current ) ;
@@ -97,17 +80,28 @@ private async void ProcessInputStream()
9780 var value = headers [ i ] . Trim ( ) ;
9881 if ( header . Equals ( "Content-Length" , StringComparison . OrdinalIgnoreCase ) )
9982 {
100- length = long . Parse ( value ) ;
83+ length = 0 ;
84+ long . TryParse ( value , out length ) ;
10185 }
10286 }
10387
104- var requestBuffer = new char [ length ] ;
105-
106- await _input . ReadBlockAsync ( requestBuffer , 0 , requestBuffer . Length ) ;
107-
108- var payload = new string ( requestBuffer ) ;
109-
110- HandleRequest ( payload ) ;
88+ if ( length == 0 || length >= int . MaxValue )
89+ {
90+ HandleRequest ( string . Empty ) ;
91+ }
92+ else
93+ {
94+ var requestBuffer = new char [ length ] ;
95+ var received = 0 ;
96+ while ( received < length )
97+ {
98+ var n = await _input . ReadBlockAsync ( requestBuffer , received , requestBuffer . Length - received ) ;
99+ if ( n == 0 ) return ; // no more _input
100+ received += n ;
101+ }
102+ var payload = new string ( requestBuffer ) ;
103+ HandleRequest ( payload ) ;
104+ }
111105 }
112106 }
113107
@@ -158,24 +152,23 @@ private void HandleRequest(string request)
158152 {
159153 if ( item . IsRequest )
160154 {
161- _queue . Enqueue ( (
155+ _scheduler . Add (
162156 type ,
163157 async ( ) => {
164158 var result = await _requestRouter . RouteRequest ( item . Request ) ;
165-
166159 _outputHandler . Send ( result . Value ) ;
167160 }
168- ) ) ;
161+ ) ;
169162 }
170163 else if ( item . IsNotification )
171164 {
172- _queue . Enqueue ( (
165+ _scheduler . Add (
173166 type ,
174167 ( ) => {
175168 _requestRouter . RouteNotification ( item . Notification ) ;
176169 return Task . CompletedTask ;
177170 }
178- ) ) ;
171+ ) ;
179172 }
180173 else if ( item . IsError )
181174 {
@@ -185,42 +178,12 @@ private void HandleRequest(string request)
185178 }
186179 }
187180
188- private bool IsNextSerial ( )
189- {
190- return _queue . TryPeek ( out var queueResult ) && queueResult . type == RequestProcessType . Serial ;
191- }
192-
193- private async void ProcessRequestQueue ( )
194- {
195- while ( true )
196- {
197- if ( _queueThread == null ) return ;
198- var items = new List < Func < Task > > ( ) ;
199- while ( ! _queue . IsEmpty )
200- {
201- if ( IsNextSerial ( ) && items . Count > 0 )
202- {
203- break ;
204- }
205-
206- if ( _queue . TryDequeue ( out var queueResult ) )
207- items . Add ( queueResult . request ) ;
208- }
209-
210- await Task . WhenAll ( items . Select ( x => x ( ) ) ) ;
211-
212- if ( _queue . IsEmpty )
213- {
214- await Task . Delay ( _sleepTime ) ;
215- }
216- }
217- }
218181
219182 public void Dispose ( )
220183 {
221184 _outputHandler . Dispose ( ) ;
222185 _inputThread = null ;
223- _queueThread = null ;
186+ _scheduler . Dispose ( ) ;
224187 }
225188 }
226189}
0 commit comments