Skip to content

Commit a29eceb

Browse files
author
Oleksandr Poliakov
committed
CSHARP-5560: CSOT: Refactor IOperationExecutor to use operation context
1 parent 2c7fb15 commit a29eceb

14 files changed

+1451
-1566
lines changed

src/MongoDB.Driver/AggregateHelper.cs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/* Copyright 2010-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Linq;
18+
using MongoDB.Bson;
19+
20+
namespace MongoDB.Driver
21+
{
22+
internal static class AggregateHelper
23+
{
24+
public static RenderedPipelineDefinition<TResult> RenderAggregatePipeline<TDocument, TResult>(PipelineDefinition<TDocument, TResult> pipeline, RenderArgs<TDocument> renderArgs, out bool isAggregateToCollection)
25+
{
26+
var renderedPipeline = pipeline.Render(renderArgs);
27+
28+
var lastStage = renderedPipeline.Documents.LastOrDefault();
29+
var lastStageName = lastStage?.GetElement(0).Name;
30+
isAggregateToCollection = lastStage != null && (lastStageName == "$out" || lastStageName == "$merge");
31+
32+
return renderedPipeline;
33+
}
34+
35+
public static CollectionNamespace GetOutCollection(BsonDocument outStage, DatabaseNamespace defaultDatabaseNamespace)
36+
{
37+
var stageName = outStage.GetElement(0).Name;
38+
switch (stageName)
39+
{
40+
case "$out":
41+
{
42+
var outValue = outStage[0];
43+
DatabaseNamespace outputDatabaseNamespace;
44+
string outputCollectionName;
45+
if (outValue.IsString)
46+
{
47+
outputDatabaseNamespace = defaultDatabaseNamespace;
48+
outputCollectionName = outValue.AsString;
49+
}
50+
else
51+
{
52+
outputDatabaseNamespace = new DatabaseNamespace(outValue["db"].AsString);
53+
outputCollectionName = outValue["coll"].AsString;
54+
}
55+
return new CollectionNamespace(outputDatabaseNamespace, outputCollectionName);
56+
}
57+
case "$merge":
58+
{
59+
var mergeArguments = outStage[0];
60+
DatabaseNamespace outputDatabaseNamespace;
61+
string outputCollectionName;
62+
if (mergeArguments.IsString)
63+
{
64+
outputDatabaseNamespace = defaultDatabaseNamespace;
65+
outputCollectionName = mergeArguments.AsString;
66+
}
67+
else
68+
{
69+
var into = mergeArguments.AsBsonDocument["into"];
70+
if (into.IsString)
71+
{
72+
outputDatabaseNamespace = defaultDatabaseNamespace;
73+
outputCollectionName = into.AsString;
74+
}
75+
else
76+
{
77+
if (into.AsBsonDocument.Contains("db"))
78+
{
79+
outputDatabaseNamespace = new DatabaseNamespace(into["db"].AsString);
80+
}
81+
else
82+
{
83+
outputDatabaseNamespace = defaultDatabaseNamespace;
84+
}
85+
outputCollectionName = into["coll"].AsString;
86+
}
87+
}
88+
return new CollectionNamespace(outputDatabaseNamespace, outputCollectionName);
89+
}
90+
default:
91+
throw new ArgumentException($"Unexpected stage name: {stageName}.");
92+
}
93+
}
94+
}
95+
}
96+

src/MongoDB.Driver/IOperationExecutor.cs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,40 @@
1313
* limitations under the License.
1414
*/
1515

16-
using System;
1716
using System.Threading;
1817
using System.Threading.Tasks;
19-
using MongoDB.Driver.Core.Bindings;
2018
using MongoDB.Driver.Core.Operations;
2119

2220
namespace MongoDB.Driver
2321
{
2422
internal interface IOperationExecutor
2523
{
26-
TResult ExecuteReadOperation<TResult>(IReadBinding binding, IReadOperation<TResult> operation, CancellationToken cancellationToken);
27-
Task<TResult> ExecuteReadOperationAsync<TResult>(IReadBinding binding, IReadOperation<TResult> operation, CancellationToken cancellationToken);
24+
TResult ExecuteReadOperation<TResult>(
25+
IReadOperation<TResult> operation,
26+
ReadOperationOptions options,
27+
IClientSessionHandle session = null,
28+
CancellationToken cancellationToken = default);
2829

29-
TResult ExecuteWriteOperation<TResult>(IWriteBinding binding, IWriteOperation<TResult> operation, CancellationToken cancellationToken);
30-
Task<TResult> ExecuteWriteOperationAsync<TResult>(IWriteBinding binding, IWriteOperation<TResult> operation, CancellationToken cancellationToken);
30+
Task<TResult> ExecuteReadOperationAsync<TResult>(
31+
IReadOperation<TResult> operation,
32+
ReadOperationOptions options,
33+
IClientSessionHandle session = null,
34+
CancellationToken cancellationToken = default);
35+
36+
TResult ExecuteWriteOperation<TResult>(
37+
IWriteOperation<TResult> operation,
38+
WriteOperationOptions options,
39+
IClientSessionHandle session = null,
40+
CancellationToken cancellationToken = default);
41+
42+
Task<TResult> ExecuteWriteOperationAsync<TResult>(
43+
IWriteOperation<TResult> operation,
44+
WriteOperationOptions options,
45+
IClientSessionHandle session = null,
46+
CancellationToken cancellationToken = default);
3147

3248
IClientSessionHandle StartImplicitSession(CancellationToken cancellationToken);
49+
3350
Task<IClientSessionHandle> StartImplicitSessionAsync(CancellationToken cancellationToken);
3451
}
3552
}

0 commit comments

Comments
 (0)