11using System ;
2- using System . Collections . Concurrent ;
3- using System . Collections . Generic ;
42using System . IO ;
53using System . Linq ;
6- using System . Text ;
74using System . Threading ;
85using System . Threading . Tasks ;
9- using JsonRpc . Server ;
106using JsonRpc . Server . Messages ;
117using Newtonsoft . Json . Linq ;
128
139namespace JsonRpc
1410{
1511 public class InputHandler : IInputHandler
1612 {
17- private readonly TimeSpan _sleepTime = TimeSpan . FromMilliseconds ( 50 ) ;
1813 public const char CR = '\r ' ;
1914 public const char LF = '\n ' ;
2015 public static char [ ] CRLF = { CR , LF } ;
@@ -28,8 +23,7 @@ public class InputHandler : IInputHandler
2823 private Thread _inputThread ;
2924 private readonly IRequestRouter _requestRouter ;
3025 private readonly IResponseRouter _responseRouter ;
31- private readonly ConcurrentQueue < ( RequestProcessType type , Func < Task > request ) > _queue ;
32- private Thread _queueThread ;
26+ private readonly IScheduler _scheduler ;
3327
3428 public InputHandler (
3529 Stream input ,
@@ -47,31 +41,16 @@ IResponseRouter responseRouter
4741 _requestProcessIdentifier = requestProcessIdentifier ;
4842 _requestRouter = requestRouter ;
4943 _responseRouter = responseRouter ;
50- _queue = new ConcurrentQueue < ( RequestProcessType type , Func < Task > request ) > ( ) ;
5144
52- _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true } ;
53-
54- _queueThread = new Thread ( ProcessRequestQueue ) { IsBackground = true } ;
55- }
56-
57- internal InputHandler (
58- Stream input ,
59- IOutputHandler outputHandler ,
60- IReciever reciever ,
61- IRequestProcessIdentifier requestProcessIdentifier ,
62- IRequestRouter requestRouter ,
63- IResponseRouter responseRouter ,
64- TimeSpan sleepTime
65- ) : this ( input , outputHandler , reciever , requestProcessIdentifier , requestRouter , responseRouter )
66- {
67- _sleepTime = sleepTime ;
68- }
45+ _scheduler = new ProcessScheduler ( ) ;
46+ _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true , Name = "ProcessInputStream" } ;
47+ }
6948
7049 public void Start ( )
7150 {
7251 _outputHandler . Start ( ) ;
7352 _inputThread . Start ( ) ;
74- _queueThread . Start ( ) ;
53+ _scheduler . Start ( ) ;
7554 }
7655
7756 private async void ProcessInputStream ( )
@@ -85,33 +64,49 @@ private async void ProcessInputStream()
8564
8665 var buffer = new byte [ 300 ] ;
8766 var current = await _input . ReadAsync ( buffer , 0 , MinBuffer ) ;
67+ if ( current == 0 ) return ; // no more _input
8868 while ( current < MinBuffer ||
8969 buffer [ current - 4 ] != CR || buffer [ current - 3 ] != LF ||
9070 buffer [ current - 2 ] != CR || buffer [ current - 1 ] != LF )
9171 {
92- current += await _input . ReadAsync ( buffer , current , 1 ) ;
72+ var n = await _input . ReadAsync ( buffer , current , 1 ) ;
73+ if ( n == 0 ) return ; // no more _input, mitigates endless loop here.
74+ current += n ;
9375 }
9476
9577 var headersContent = System . Text . Encoding . ASCII . GetString ( buffer , 0 , current ) ;
9678 var headers = headersContent . Split ( HeaderKeys , StringSplitOptions . RemoveEmptyEntries ) ;
9779 long length = 0 ;
98- for ( var i = 0 ; i < headers . Length ; i += 2 )
80+ for ( var i = 1 ; i < headers . Length ; i += 2 )
9981 {
100- var header = headers [ 0 ] ;
101- var value = headers [ i + 1 ] . Trim ( ) ;
82+ // starting at i = 1 instead of 0 won't throw, if we have uneven headers' length
83+ var header = headers [ i - 1 ] ;
84+ var value = headers [ i ] . Trim ( ) ;
10285 if ( header . Equals ( "Content-Length" , StringComparison . OrdinalIgnoreCase ) )
10386 {
104- length = long . Parse ( value ) ;
87+ length = 0 ;
88+ long . TryParse ( value , out length ) ;
10589 }
10690 }
10791
108- var requestBuffer = new byte [ length ] ;
109-
110- await _input . ReadAsync ( requestBuffer , 0 , requestBuffer . Length ) ;
111-
112- var payload = System . Text . Encoding . UTF8 . GetString ( requestBuffer ) ;
113-
114- HandleRequest ( payload ) ;
92+ if ( length == 0 || length >= int . MaxValue )
93+ {
94+ HandleRequest ( string . Empty ) ;
95+ }
96+ else
97+ {
98+ var requestBuffer = new byte [ length ] ;
99+ var received = 0 ;
100+ while ( received < length )
101+ {
102+ var n = await _input . ReadAsync ( requestBuffer , received , requestBuffer . Length - received ) ;
103+ if ( n == 0 ) return ; // no more _input
104+ received += n ;
105+ }
106+ // TODO sometimes: encoding should be based on the respective header (including the wrong "utf8" value)
107+ var payload = System . Text . Encoding . UTF8 . GetString ( requestBuffer ) ;
108+ HandleRequest ( payload ) ;
109+ }
115110 }
116111 }
117112
@@ -162,24 +157,23 @@ private void HandleRequest(string request)
162157 {
163158 if ( item . IsRequest )
164159 {
165- _queue . Enqueue ( (
160+ _scheduler . Add (
166161 type ,
167162 async ( ) => {
168163 var result = await _requestRouter . RouteRequest ( item . Request ) ;
169-
170164 _outputHandler . Send ( result . Value ) ;
171165 }
172- ) ) ;
166+ ) ;
173167 }
174168 else if ( item . IsNotification )
175169 {
176- _queue . Enqueue ( (
170+ _scheduler . Add (
177171 type ,
178172 ( ) => {
179173 _requestRouter . RouteNotification ( item . Notification ) ;
180174 return Task . CompletedTask ;
181175 }
182- ) ) ;
176+ ) ;
183177 }
184178 else if ( item . IsError )
185179 {
@@ -189,42 +183,12 @@ private void HandleRequest(string request)
189183 }
190184 }
191185
192- private bool IsNextSerial ( )
193- {
194- return _queue . TryPeek ( out var queueResult ) && queueResult . type == RequestProcessType . Serial ;
195- }
196-
197- private async void ProcessRequestQueue ( )
198- {
199- while ( true )
200- {
201- if ( _queueThread == null ) return ;
202- var items = new List < Func < Task > > ( ) ;
203- while ( ! _queue . IsEmpty )
204- {
205- if ( IsNextSerial ( ) && items . Count > 0 )
206- {
207- break ;
208- }
209-
210- if ( _queue . TryDequeue ( out var queueResult ) )
211- items . Add ( queueResult . request ) ;
212- }
213-
214- await Task . WhenAll ( items . Select ( x => x ( ) ) ) ;
215-
216- if ( _queue . IsEmpty )
217- {
218- await Task . Delay ( _sleepTime ) ;
219- }
220- }
221- }
222186
223187 public void Dispose ( )
224188 {
225189 _outputHandler . Dispose ( ) ;
226190 _inputThread = null ;
227- _queueThread = null ;
191+ _scheduler . Dispose ( ) ;
228192 }
229193 }
230194}
0 commit comments