Skip to content

Commit 593d871

Browse files
andrxMpdreamz
authored andcommitted
Added pipeline to ReindexDestination (#4183) (#4187)
1 parent b27b6f4 commit 593d871

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed

src/Nest/Document/Multiple/ReindexOnServer/ReindexDestination.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public interface IReindexDestination
2323

2424
OpType? OpType { get; set; }
2525

26+
/// <summary>
27+
/// Id of the pipeline to use to process documents
28+
/// </summary>
29+
string Pipeline { get; set; }
30+
2631
/// <summary>
2732
/// The routing to use when reindexing
2833
/// </summary>
@@ -49,6 +54,9 @@ public class ReindexDestination : IReindexDestination
4954
/// <inheritdoc />
5055
public OpType? OpType { get; set; }
5156

57+
/// <inheritdoc />
58+
public string Pipeline { get; set; }
59+
5260
/// <inheritdoc />
5361
public ReindexRouting Routing { get; set; }
5462

@@ -61,12 +69,16 @@ public class ReindexDestinationDescriptor : DescriptorBase<ReindexDestinationDes
6169
{
6270
IndexName IReindexDestination.Index { get; set; }
6371
OpType? IReindexDestination.OpType { get; set; }
72+
string IReindexDestination.Pipeline { get; set; }
6473
ReindexRouting IReindexDestination.Routing { get; set; }
6574
VersionType? IReindexDestination.VersionType { get; set; }
6675

6776
/// <inheritdoc cref="IReindexDestination.Routing" />
6877
public ReindexDestinationDescriptor Routing(ReindexRouting routing) => Assign(routing, (a, v) => a.Routing = v);
6978

79+
/// <inheritdoc cref="IReindexDestination.Pipeline" />
80+
public ReindexDestinationDescriptor Pipeline(string pipeline) => Assign(pipeline, (a, v) => a.Pipeline = v);
81+
7082
/// <inheritdoc cref="IReindexDestination.OpType" />
7183
public ReindexDestinationDescriptor OpType(OpType? opType) => Assign(opType, (a, v) => a.OpType = v);
7284

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using Elastic.Xunit.XunitPlumbing;
4+
using Elasticsearch.Net;
5+
using FluentAssertions;
6+
using Nest;
7+
using Tests.Core.Extensions;
8+
using Tests.Core.ManagedElasticsearch.Clusters;
9+
using Tests.Framework.EndpointTests;
10+
using Tests.Framework.EndpointTests.TestState;
11+
using static Nest.Infer;
12+
13+
namespace Tests.Document.Multiple.ReindexOnServer
14+
{
15+
[SkipVersion("<2.3.0", "")]
16+
public class ReindexOnServerPipelineApiTests
17+
: ApiIntegrationTestBase<IntrusiveOperationCluster, ReindexOnServerResponse, IReindexOnServerRequest, ReindexOnServerDescriptor,
18+
ReindexOnServerRequest>
19+
{
20+
public ReindexOnServerPipelineApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
21+
22+
protected override bool ExpectIsValid => true;
23+
24+
protected override object ExpectJson =>
25+
new
26+
{
27+
dest = new
28+
{
29+
index = $"{CallIsolatedValue}-clone",
30+
pipeline = $"{Pipeline}"
31+
},
32+
source = new
33+
{
34+
index = CallIsolatedValue
35+
},
36+
conflicts = "proceed"
37+
};
38+
39+
protected override int ExpectStatusCode => 200;
40+
41+
protected override Func<ReindexOnServerDescriptor, IReindexOnServerRequest> Fluent => d => d
42+
.Source(s => s
43+
.Index(CallIsolatedValue)
44+
)
45+
.Destination(s => s
46+
.Index(CallIsolatedValue + "-clone")
47+
.Pipeline($"{Pipeline}")
48+
)
49+
.Conflicts(Conflicts.Proceed)
50+
.Refresh();
51+
52+
protected override HttpMethod HttpMethod => HttpMethod.POST;
53+
54+
protected override ReindexOnServerRequest Initializer => new ReindexOnServerRequest()
55+
{
56+
Source = new ReindexSource
57+
{
58+
Index = CallIsolatedValue
59+
},
60+
Destination = new ReindexDestination
61+
{
62+
Index = CallIsolatedValue + "-clone",
63+
Pipeline = $"{Pipeline}"
64+
},
65+
Conflicts = Conflicts.Proceed,
66+
Refresh = true,
67+
};
68+
69+
protected virtual string Pipeline { get; } = "pipeline-id";
70+
71+
protected override bool SupportsDeserialization => false;
72+
73+
protected override string UrlPath => $"/_reindex?refresh=true";
74+
75+
protected override void IntegrationSetup(IElasticClient client, CallUniqueValues values)
76+
{
77+
var pipelineResponse = client.Ingest.PutPipeline(Pipeline, p => p
78+
.Processors(pr => pr
79+
.Set<Test>(t => t.Field(f => f.Flag).Value("Overridden"))
80+
)
81+
);
82+
pipelineResponse.ShouldBeValid($"Failed to set up pipeline named '{Pipeline}' required for bulk");
83+
84+
foreach (var index in values.Values)
85+
{
86+
Client.Index(new Test { Id = 1, Flag = "bar" }, i => i.Index(index).Refresh(Refresh.True));
87+
Client.Index(new Test { Id = 2, Flag = "bar" }, i => i.Index(index).Refresh(Refresh.True));
88+
}
89+
}
90+
91+
protected override LazyResponses ClientUsage() => Calls(
92+
(client, f) => client.ReindexOnServer(f),
93+
(client, f) => client.ReindexOnServerAsync(f),
94+
(client, r) => client.ReindexOnServer(r),
95+
(client, r) => client.ReindexOnServerAsync(r)
96+
);
97+
98+
protected override void OnAfterCall(IElasticClient client) => client.Indices.Refresh(CallIsolatedValue);
99+
100+
protected override void ExpectResponse(ReindexOnServerResponse response)
101+
{
102+
response.Task.Should().BeNull();
103+
response.Took.Should().BeGreaterThan(TimeSpan.FromMilliseconds(0));
104+
response.Total.Should().Be(2);
105+
response.Updated.Should().Be(0);
106+
response.Created.Should().Be(2);
107+
response.Batches.Should().Be(1);
108+
109+
var search = Client.Search<Test>(s => s
110+
.Index(CallIsolatedValue + "-clone")
111+
);
112+
search.Total.Should().BeGreaterThan(0);
113+
search.Documents.Should().OnlyContain(t => t.Flag == "Overridden");
114+
}
115+
116+
public class Test
117+
{
118+
public string Flag { get; set; }
119+
public long Id { get; set; }
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)