11using System ;
22using System . Collections . Concurrent ;
33using System . Collections . Generic ;
4+ using System . ComponentModel ;
5+ using System . Diagnostics ;
6+ using System . Linq ;
7+ using System . Reactive ;
8+ using System . Reactive . Concurrency ;
9+ using System . Reactive . Disposables ;
10+ using System . Reactive . Linq ;
11+ using System . Reactive . Subjects ;
412using System . Threading ;
513using System . Threading . Tasks ;
614using Microsoft . Extensions . Logging ;
@@ -9,122 +17,97 @@ namespace OmniSharp.Extensions.JsonRpc
917{
1018 public class ProcessScheduler : IScheduler
1119 {
20+ private readonly int ? _concurrency ;
1221 private readonly ILogger < ProcessScheduler > _logger ;
13- private readonly BlockingCollection < ( RequestProcessType type , string name , Func < Task > request ) > _queue ;
14- private readonly CancellationTokenSource _cancel ;
15- private readonly Thread _thread ;
22+ private readonly IObserver < ( RequestProcessType type , string name , IObservable < Unit > request ) > _enqueue ;
23+ private readonly IObservable < ( RequestProcessType type , string name , IObservable < Unit > request ) > _queue ;
24+ private bool _disposed = false ;
25+ private readonly CompositeDisposable _disposable = new CompositeDisposable ( ) ;
26+ private readonly System . Reactive . Concurrency . IScheduler _scheduler ;
1627
17- public ProcessScheduler ( ILoggerFactory loggerFactory )
28+ public ProcessScheduler ( ILoggerFactory loggerFactory , int ? concurrency ) : this ( loggerFactory , concurrency ,
29+ new EventLoopScheduler (
30+ _ => new Thread ( _ ) { IsBackground = true , Name = "ProcessRequestQueue" } ) )
1831 {
19- _logger = loggerFactory . CreateLogger < ProcessScheduler > ( ) ;
20- _queue = new BlockingCollection < ( RequestProcessType type , string name , Func < Task > request ) > ( ) ;
21- _cancel = new CancellationTokenSource ( ) ;
22- _thread = new Thread ( ProcessRequestQueue ) { IsBackground = true , Name = "ProcessRequestQueue" } ;
2332 }
2433
25- public void Start ( )
34+ internal ProcessScheduler ( ILoggerFactory loggerFactory , int ? concurrency ,
35+ System . Reactive . Concurrency . IScheduler scheduler )
2636 {
27- _thread . Start ( ) ;
28- }
37+ _concurrency = concurrency ;
38+ _logger = loggerFactory . CreateLogger < ProcessScheduler > ( ) ;
2939
30- public void Add ( RequestProcessType type , string name , Func < Task > request )
31- {
32- _queue . Add ( ( type , name , request ) ) ;
40+ var subject = new Subject < ( RequestProcessType type , string name , IObservable < Unit > request ) > ( ) ;
41+ _disposable . Add ( subject ) ;
42+ _enqueue = subject ;
43+ _scheduler = scheduler ;
44+ _queue = subject ;
3345 }
3446
35- private Task Start ( Func < Task > request )
47+ public void Start ( )
3648 {
37- var t = request ( ) ;
38- if ( t . Status == TaskStatus . Created ) // || t.Status = TaskStatus.WaitingForActivation ?
39- t . Start ( ) ;
40- return t ;
41- }
49+ var obs = Observable . Create < Unit > ( observer => {
50+ var cd = new CompositeDisposable ( ) ;
4251
43- private List < Task > RemoveCompleteTasks ( List < Task > list )
44- {
45- if ( list . Count == 0 ) return list ;
52+ var observableQueue =
53+ new BehaviorSubject < ( RequestProcessType type , ReplaySubject < IObservable < Unit > > observer ) > ( (
54+ RequestProcessType . Serial , new ReplaySubject < IObservable < Unit > > ( int . MaxValue ) ) ) ;
55+
56+ cd . Add ( _queue . Subscribe ( item => {
57+ if ( observableQueue . Value . type != item . type )
58+ {
59+ observableQueue . Value . observer . OnCompleted ( ) ;
60+ observableQueue . OnNext ( ( item . type , new ReplaySubject < IObservable < Unit > > ( int . MaxValue ) ) ) ;
61+ }
62+
63+ observableQueue . Value . observer . OnNext ( HandleRequest ( item . name , item . request ) ) ;
64+ } ) ) ;
4665
47- var result = new List < Task > ( ) ;
48- foreach ( var t in list )
66+ cd . Add ( observableQueue
67+ . Select ( item => {
68+ var ( type , replay ) = item ;
69+
70+ if ( type == RequestProcessType . Serial )
71+ return replay . Concat ( ) ;
72+
73+ return _concurrency . HasValue
74+ ? replay . Merge ( _concurrency . Value )
75+ : replay . Merge ( ) ;
76+ } )
77+ . Concat ( )
78+ . Subscribe ( observer )
79+ ) ;
80+
81+ return cd ;
82+ } ) ;
83+
84+ _disposable . Add ( obs
85+ // .ObserveOn(_scheduler)
86+ . Subscribe ( _ => { } )
87+ ) ;
88+
89+ IObservable < Unit > HandleRequest ( string name , IObservable < Unit > request )
4990 {
50- if ( t . IsFaulted )
51- {
52- // TODO: Handle Fault
53- }
54- else if ( ! t . IsCompleted )
55- {
56- result . Add ( t ) ;
57- }
91+ return request
92+ . Catch < Unit , OperationCanceledException > ( ex => Observable . Empty < Unit > ( ) )
93+ . Catch < Unit , Exception > ( ex => {
94+ _logger . LogCritical ( Events . UnhandledException , ex , "Unhandled exception executing {Name}" ,
95+ name ) ;
96+ return Observable . Empty < Unit > ( ) ;
97+ } ) ;
5898 }
59- return result ;
6099 }
61100
62- public long _TestOnly_NonCompleteTaskCount = 0 ;
63- private void ProcessRequestQueue ( )
101+ public void Add ( RequestProcessType type , string name , IObservable < Unit > request )
64102 {
65- // see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
66- // no need to be async, because this thing already allocated a thread on it's own.
67- var token = _cancel . Token ;
68- var waitables = new List < Task > ( ) ;
69- try
70- {
71- while ( ! token . IsCancellationRequested )
72- {
73- if ( _queue . TryTake ( out var item , Timeout . Infinite , token ) )
74- {
75- var ( type , name , request ) = item ;
76- try
77- {
78- if ( type == RequestProcessType . Serial )
79- {
80- Task . WaitAll ( waitables . ToArray ( ) , token ) ;
81- Start ( request ) . Wait ( token ) ;
82- }
83- else if ( type == RequestProcessType . Parallel )
84- {
85- waitables . Add ( Start ( request ) ) ;
86- }
87- else
88- throw new NotImplementedException ( "Only Serial and Parallel execution types can be handled currently" ) ;
89- waitables = RemoveCompleteTasks ( waitables ) ;
90- Interlocked . Exchange ( ref _TestOnly_NonCompleteTaskCount , waitables . Count ) ;
91- }
92- catch ( OperationCanceledException ex ) when ( ex . CancellationToken == token )
93- {
94- throw ;
95- }
96- catch ( Exception e )
97- {
98- // TODO: Should we rethrow or swallow?
99- // If an exception happens... the whole system could be in a bad state, hence this throwing currently.
100- _logger . LogCritical ( Events . UnhandledException , e , "Unhandled exception executing {Name}" , name ) ;
101- throw ;
102- }
103- }
104- }
105- }
106- catch ( OperationCanceledException ex ) when ( ex . CancellationToken == token )
107- {
108- // OperationCanceledException - The CancellationToken has been canceled.
109- Task . WaitAll ( waitables . ToArray ( ) , TimeSpan . FromMilliseconds ( 1000 ) ) ;
110- var keeponrunning = RemoveCompleteTasks ( waitables ) ;
111- Interlocked . Exchange ( ref _TestOnly_NonCompleteTaskCount , keeponrunning . Count ) ;
112- keeponrunning . ForEach ( ( t ) =>
113- {
114- // TODO: There is no way to abort a Task. As we don't construct the tasks, we can do nothing here
115- // Option is: change the task factory "Func<Task> request" to a "Func<CancellationToken, Task> request"
116- } ) ;
117- }
103+ _enqueue . OnNext ( ( type , name , request ) ) ;
118104 }
119105
120- private bool _disposed = false ;
121106 public void Dispose ( )
122107 {
123108 if ( _disposed ) return ;
124109 _disposed = true ;
125- _cancel . Cancel ( ) ;
126- _thread . Join ( ) ;
127- _cancel . Dispose ( ) ;
110+ _disposable . Dispose ( ) ;
128111 }
129112 }
130113}
0 commit comments