Skip to content

Commit acb618b

Browse files
Added ability to define the scheduler used for both input and output (#458)
1 parent 30cbd6e commit acb618b

File tree

8 files changed

+62
-22
lines changed

8 files changed

+62
-22
lines changed

src/JsonRpc/Connection.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.IO.Pipelines;
3+
using System.Reactive.Concurrency;
34
using System.Threading.Tasks;
45
using Microsoft.Extensions.Logging;
56

@@ -22,6 +23,7 @@ public Connection(
2223
TimeSpan requestTimeout,
2324
bool supportContentModified,
2425
int concurrency,
26+
IScheduler scheduler,
2527
CreateResponseExceptionHandler? getException = null
2628
) =>
2729
_inputHandler = new InputHandler(
@@ -36,7 +38,8 @@ public Connection(
3638
getException,
3739
requestTimeout,
3840
supportContentModified,
39-
concurrency > 1 ? (int?) concurrency : null
41+
concurrency > 1 ? (int?) concurrency : null,
42+
scheduler
4043
);
4144

4245
public void Open()

src/JsonRpc/InputHandler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public InputHandler(
7070
CreateResponseExceptionHandler? getException,
7171
TimeSpan requestTimeout,
7272
bool supportContentModified,
73-
int? concurrency
73+
int? concurrency,
74+
IScheduler scheduler
7475
)
7576
{
7677
_pipeReader = pipeReader;
@@ -87,7 +88,7 @@ public InputHandler(
8788
loggerFactory,
8889
supportContentModified,
8990
concurrency,
90-
TaskPoolScheduler.Default
91+
scheduler
9192
);
9293
_headersBuffer = new Memory<byte>(new byte[HeadersFinishedLength]);
9394
_contentLengthBuffer = new Memory<byte>(new byte[ContentLengthLength]);

src/JsonRpc/JsonRpcServerOptionsBase.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.IO;
44
using System.IO.Pipelines;
55
using System.Linq;
6+
using System.Reactive.Concurrency;
67
using System.Reactive.Disposables;
78
using System.Reflection;
89
using Microsoft.Extensions.DependencyInjection;
@@ -31,6 +32,8 @@ public ILoggerFactory LoggerFactory
3132
public IEnumerable<Assembly> Assemblies { get; set; } = Enumerable.Empty<Assembly>();
3233
public IRequestProcessIdentifier? RequestProcessIdentifier { get; set; }
3334
public int? Concurrency { get; set; }
35+
public IScheduler InputScheduler { get; set; } = TaskPoolScheduler.Default;
36+
public IScheduler OutputScheduler { get; set; } = TaskPoolScheduler.Default;
3437
public CreateResponseExceptionHandler? CreateResponseException { get; set; }
3538
public OnUnhandledExceptionHandler? OnUnhandledException { get; set; }
3639
public bool SupportsContentModified { get; set; } = true;

src/JsonRpc/JsonRpcServerOptionsExtensions.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Reactive.Concurrency;
2+
13
namespace OmniSharp.Extensions.JsonRpc
24
{
35
public static class JsonRpcServerOptionsExtensions
@@ -7,5 +9,41 @@ public static JsonRpcServerOptions WithSerializer(this JsonRpcServerOptions opti
79
options.Serializer = serializer;
810
return options;
911
}
12+
13+
/// <summary>
14+
/// Sets both input and output schedulers to the same scheduler
15+
/// </summary>
16+
/// <param name="options"></param>
17+
/// <param name="inputScheduler"></param>
18+
/// <returns></returns>
19+
public static JsonRpcServerOptions WithScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler)
20+
{
21+
options.InputScheduler = options.OutputScheduler = inputScheduler;
22+
return options;
23+
}
24+
25+
/// <summary>
26+
/// Sets the scheduler used during reading input
27+
/// </summary>
28+
/// <param name="options"></param>
29+
/// <param name="inputScheduler"></param>
30+
/// <returns></returns>
31+
public static JsonRpcServerOptions WithInputScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler)
32+
{
33+
options.InputScheduler = inputScheduler;
34+
return options;
35+
}
36+
37+
/// <summary>
38+
/// Sets the scheduler use during writing output
39+
/// </summary>
40+
/// <param name="options"></param>
41+
/// <param name="outputScheduler"></param>
42+
/// <returns></returns>
43+
public static JsonRpcServerOptions WithOutputScheduler(this JsonRpcServerOptions options, IScheduler outputScheduler)
44+
{
45+
options.OutputScheduler = outputScheduler;
46+
return options;
47+
}
1048
}
1149
}

src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.IO.Pipelines;
33
using System.Linq;
4+
using System.Reactive.Concurrency;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using DryIoc;
@@ -32,6 +33,8 @@ internal static IContainer AddJsonRpcServerCore<T>(this IContainer container, Js
3233
container.RegisterInstance(options.MaximumRequestTimeout, serviceKey: nameof(options.MaximumRequestTimeout));
3334
container.RegisterInstance(options.SupportsContentModified, serviceKey: nameof(options.SupportsContentModified));
3435
container.RegisterInstance(options.Concurrency ?? -1, serviceKey: nameof(options.Concurrency));
36+
container.RegisterInstance(options.InputScheduler, serviceKey: nameof(options.InputScheduler));
37+
container.RegisterInstance(options.OutputScheduler, serviceKey: nameof(options.OutputScheduler));
3538
if (options.CreateResponseException != null)
3639
{
3740
container.RegisterInstance(options.CreateResponseException);
@@ -40,15 +43,18 @@ internal static IContainer AddJsonRpcServerCore<T>(this IContainer container, Js
4043
container.RegisterMany<OutputHandler>(
4144
nonPublicServiceTypes: true,
4245
made: Parameters.Of
43-
.Type<PipeWriter>(serviceKey: nameof(options.Output)),
46+
.Type<PipeWriter>(serviceKey: nameof(options.Output))
47+
.Type<IScheduler>(serviceKey: nameof(options.OutputScheduler)),
4448
reuse: Reuse.Singleton
4549
);
4650
container.Register<Connection>(
4751
made: new Made.TypedMade<Connection>().Parameters
4852
.Type<PipeReader>(serviceKey: nameof(options.Input))
4953
.Type<TimeSpan>(serviceKey: nameof(options.MaximumRequestTimeout))
5054
.Type<bool>(serviceKey: nameof(options.SupportsContentModified))
51-
.Name("concurrency", serviceKey: nameof(options.Concurrency)),
55+
.Name("concurrency", serviceKey: nameof(options.Concurrency))
56+
.Type<IScheduler>(serviceKey: nameof(options.InputScheduler))
57+
,
5258
reuse: Reuse.Singleton
5359
);
5460

src/JsonRpc/OutputHandler.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,6 @@ private bool ShouldSend(object value)
6363
return _outputFilters.Any(z => z.ShouldOutput(value));
6464
}
6565

66-
public OutputHandler(
67-
PipeWriter pipeWriter,
68-
ISerializer serializer,
69-
IEnumerable<IOutputFilter> outputFilters,
70-
ILogger<OutputHandler> logger
71-
) : this(
72-
pipeWriter,
73-
serializer,
74-
outputFilters,
75-
TaskPoolScheduler.Default,
76-
logger
77-
)
78-
{
79-
}
80-
8166
public void Send(object? value)
8267
{
8368
if (_queue.IsDisposed || _disposable.IsDisposed || value == null) return;

src/Shared/LanguageProtocolServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Linq;
3+
using System.Reactive.Concurrency;
34
using DryIoc;
45
using Microsoft.Extensions.DependencyInjection;
56
using OmniSharp.Extensions.JsonRpc;

test/JsonRpc.Tests/InputHandlerTests.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.IO;
33
using System.IO.Pipelines;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reflection;
67
using System.Text;
78
using System.Threading;
@@ -35,7 +36,8 @@ private InputHandler NewHandler(
3536
IRequestProcessIdentifier requestProcessIdentifier,
3637
IRequestRouter<IHandlerDescriptor?> requestRouter,
3738
ILoggerFactory loggerFactory,
38-
IResponseRouter responseRouter
39+
IResponseRouter responseRouter,
40+
IScheduler? scheduler = null
3941
) =>
4042
new InputHandler(
4143
inputStream,
@@ -49,7 +51,8 @@ IResponseRouter responseRouter
4951
null,
5052
TimeSpan.FromSeconds(30),
5153
true,
52-
null
54+
null,
55+
scheduler ?? TaskPoolScheduler.Default
5356
);
5457

5558
[Fact]

0 commit comments

Comments
 (0)