Skip to content

Commit 22f5b42

Browse files
committed
fix #2220 allow more control in BulkAll how the buffer of IEnumerable<T> is tranlated into a single bulk request, rather then always doing IndexMany on it
(cherry picked from commit 813b088)
1 parent 7f5988b commit 22f5b42

File tree

3 files changed

+84
-46
lines changed

3 files changed

+84
-46
lines changed

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,15 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
108108
var r = this._partionedBulkRequest;
109109
var response = await this._client.BulkAsync(s =>
110110
{
111-
s.IndexMany(buffer);
112-
s.Index(r.Index).Type(r.Type);
111+
s.Index(r.Index).Type(r.Type);
112+
if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
113+
else s.IndexMany(buffer);
113114
if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline);
114115
if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
115116
if (!string.IsNullOrEmpty(r.Routing)) s.Routing(r.Routing);
116117
if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString());
118+
119+
117120
return s;
118121
}, this._compositeCancelToken).ConfigureAwait(false);
119122

src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using Elasticsearch.Net;
34

45
namespace Nest
56
{
6-
public interface IBulkAllRequest<T>
7+
public interface IBulkAllRequest<T> where T : class
78
{
89
/// <summary> In case of a 429 (too busy) how long should we wait before retrying</summary>
910
Time BackOffTime { get; set; }
@@ -51,26 +52,51 @@ public interface IBulkAllRequest<T>
5152
Time Timeout { get; set; }
5253

5354
///<summary>The pipeline id to preprocess all the incoming documents with</summary>
54-
string Pipeline { get; set; }
55-
55+
string Pipeline { get; set; }
56+
57+
/// <summary>
58+
/// By default the bulkall helper simply calls <see cref="BulkDescriptor.IndexMany"/> on the buffer.
59+
/// There might be case where you'd like more control over this. By setting this callback you are in complete control
60+
/// of describing how the buffer should be translated to a bulk operation. Maybe you want to enforce all documents are newly created using
61+
/// <see cref="BulkDescriptor.CreateMany"/> or maybe a piece of metadata on <typeparamref name="T"/> controls where you need to call
62+
/// <see cref="BulkDescriptor.Update{T, TPartialDocument}(Func{BulkUpdateDescriptor{T, TPartialDocument}, IBulkUpdateOperation{T, TPartialDocument}})"/>
63+
/// or <see cref="BulkDescriptor.Index{T}(Func{BulkIndexDescriptor{T}, IBulkIndexOperation{T}})"/>
64+
/// </summary>
65+
Action<BulkDescriptor, IList<T>> BufferToBulk { get; set; }
5666
}
5767

58-
public class BulkAllRequest<T> : IBulkAllRequest<T>
68+
public class BulkAllRequest<T> : IBulkAllRequest<T>
69+
where T : class
5970
{
71+
/// <inheritdoc />
6072
public Time BackOffTime { get; set; }
73+
/// <inheritdoc />
6174
public int? Size { get; set; }
75+
/// <inheritdoc />
6276
public int? MaxDegreeOfParallelism { get; set; }
77+
/// <inheritdoc />
6378
public int? BackOffRetries { get; set; }
79+
/// <inheritdoc />
6480
public IEnumerable<T> Documents { get; private set; }
6581

82+
/// <inheritdoc />
6683
public IndexName Index { get; set; }
84+
/// <inheritdoc />
6785
public TypeName Type { get; set; }
86+
/// <inheritdoc />
6887
public int? WaitForActiveShards { get; set; }
88+
/// <inheritdoc />
6989
public Refresh? Refresh { get; set; }
90+
/// <inheritdoc />
7091
public bool RefreshOnCompleted { get; set; }
92+
/// <inheritdoc />
7193
public string Routing { get; set; }
94+
/// <inheritdoc />
7295
public Time Timeout { get; set; }
73-
public string Pipeline { get; set; }
96+
/// <inheritdoc />
97+
public string Pipeline { get; set; }
98+
/// <inheritdoc />
99+
public Action<BulkDescriptor, IList<T>> BufferToBulk { get; set; }
74100

75101
public BulkAllRequest(IEnumerable<T> documents)
76102
{
@@ -98,7 +124,8 @@ public class BulkAllDescriptor<T> : DescriptorBase<BulkAllDescriptor<T>, IBulkAl
98124
bool IBulkAllRequest<T>.RefreshOnCompleted { get; set; }
99125
string IBulkAllRequest<T>.Routing { get; set; }
100126
Time IBulkAllRequest<T>.Timeout { get; set; }
101-
string IBulkAllRequest<T>.Pipeline { get; set; }
127+
string IBulkAllRequest<T>.Pipeline { get; set; }
128+
Action<BulkDescriptor, IList<T>> IBulkAllRequest<T>.BufferToBulk { get; set; }
102129

103130
public BulkAllDescriptor(IEnumerable<T> documents)
104131
{
@@ -148,5 +175,8 @@ public BulkAllDescriptor<T> BackOffRetries(int? backoffs) =>
148175
/// <inheritdoc />
149176
public BulkAllDescriptor<T> Pipeline(string pipeline) => Assign(p => p.Pipeline = pipeline);
150177

178+
/// <inheritdoc />
179+
public BulkAllDescriptor<T> BufferToBulk(Action<BulkDescriptor, IList<T>> modifier) => Assign(p => p.BufferToBulk = modifier);
180+
151181
}
152182
}

src/Tests/Document/Multiple/BulkAll/BulkAllApiTests.cs

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public BulkAllApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage)
3333
this._client = cluster.Client;
3434
}
3535

36-
[I] public void ReturnsExpectedResponse()
36+
[I]
37+
public void ReturnsExpectedResponse()
3738
{
3839
var index = CreateIndexName();
3940
var handle = new ManualResetEvent(false);
@@ -42,24 +43,24 @@ [I] public void ReturnsExpectedResponse()
4243
var pages = 100;
4344
var seenPages = 0;
4445
var numberOfDocuments = size * pages;
45-
var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
46-
47-
//first we setup our cold observable
46+
var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
47+
48+
//first we setup our cold observable
4849
var observableBulk = this._client.BulkAll(documents, f => f
4950
.MaxDegreeOfParallelism(8)
5051
.BackOffTime(TimeSpan.FromSeconds(10))
5152
.BackOffRetries(2)
5253
.Size(size)
5354
.RefreshOnCompleted()
5455
.Index(index)
55-
);
56-
//we set up an observer
56+
);
57+
//we set up an observer
5758
var bulkObserver = new BulkAllObserver(
5859
onError: (e) => { throw e; },
5960
onCompleted: () => handle.Set(),
6061
onNext: (b) => Interlocked.Increment(ref seenPages)
61-
);
62-
//when we subscribe the observable becomes hot
62+
);
63+
//when we subscribe the observable becomes hot
6364
observableBulk.Subscribe(bulkObserver);
6465

6566
handle.WaitOne(TimeSpan.FromMinutes(5));
@@ -71,7 +72,8 @@ [I] public void ReturnsExpectedResponse()
7172
bulkObserver.TotalNumberOfRetries.Should().Be(0);
7273
}
7374

74-
[I] public void DisposingObservableCancelsBulkAll()
75+
[I]
76+
public void DisposingObservableCancelsBulkAll()
7577
{
7678
var index = CreateIndexName();
7779
var handle = new ManualResetEvent(false);
@@ -80,30 +82,30 @@ [I] public void DisposingObservableCancelsBulkAll()
8082
var pages = 100;
8183
var seenPages = 0;
8284
var numberOfDocuments = size * pages;
83-
var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
84-
85-
//first we setup our cold observable
85+
var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
86+
87+
//first we setup our cold observable
8688
var observableBulk = this._client.BulkAll(documents, f => f
8789
.MaxDegreeOfParallelism(8)
8890
.BackOffTime(TimeSpan.FromSeconds(10))
8991
.BackOffRetries(2)
9092
.Size(size)
9193
.RefreshOnCompleted()
9294
.Index(index)
93-
);
94-
//we set up an observer
95+
);
96+
//we set up an observer
9597
var bulkObserver = new BulkAllObserver(
9698
onError: (e) => { throw e; },
9799
onCompleted: () => handle.Set(),
98100
onNext: (b) => Interlocked.Increment(ref seenPages)
99-
);
100-
//when we subscribe the observable becomes hot
101-
observableBulk.Subscribe(bulkObserver);
102-
103-
//we wait N seconds to see some bulks
101+
);
102+
//when we subscribe the observable becomes hot
103+
observableBulk.Subscribe(bulkObserver);
104+
105+
//we wait N seconds to see some bulks
104106
handle.WaitOne(TimeSpan.FromSeconds(3));
105-
observableBulk.Dispose();
106-
//we wait N seconds to give in flight request a chance to cancel
107+
observableBulk.Dispose();
108+
//we wait N seconds to give in flight request a chance to cancel
107109
handle.WaitOne(TimeSpan.FromSeconds(3));
108110

109111
seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
@@ -113,7 +115,8 @@ [I] public void DisposingObservableCancelsBulkAll()
113115
bulkObserver.TotalNumberOfRetries.Should().Be(0);
114116
}
115117

116-
[I] public void CancelBulkAll()
118+
[I]
119+
public void CancelBulkAll()
117120
{
118121
var index = CreateIndexName();
119122
var handle = new ManualResetEvent(false);
@@ -122,9 +125,9 @@ [I] public void CancelBulkAll()
122125
var pages = 100;
123126
var seenPages = 0;
124127
var numberOfDocuments = size * pages;
125-
var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
126-
127-
//first we setup our cold observable
128+
var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
129+
130+
//first we setup our cold observable
128131
var tokenSource = new CancellationTokenSource();
129132
var observableBulk = this._client.BulkAll(documents, f => f
130133
.MaxDegreeOfParallelism(8)
@@ -133,21 +136,21 @@ [I] public void CancelBulkAll()
133136
.Size(size)
134137
.RefreshOnCompleted()
135138
.Index(index)
136-
, tokenSource.Token);
137-
138-
//we set up an observer
139+
, tokenSource.Token);
140+
141+
//we set up an observer
139142
var bulkObserver = new BulkAllObserver(
140143
onError: (e) => { throw e; },
141144
onCompleted: () => handle.Set(),
142145
onNext: (b) => Interlocked.Increment(ref seenPages)
143-
);
144-
//when we subscribe the observable becomes hot
145-
observableBulk.Subscribe(bulkObserver);
146-
147-
//we wait Nseconds to see some bulks
146+
);
147+
//when we subscribe the observable becomes hot
148+
observableBulk.Subscribe(bulkObserver);
149+
150+
//we wait Nseconds to see some bulks
148151
handle.WaitOne(TimeSpan.FromSeconds(3));
149-
tokenSource.Cancel();
150-
//we wait Nseconds to give in flight request a chance to cancel
152+
tokenSource.Cancel();
153+
//we wait Nseconds to give in flight request a chance to cancel
151154
handle.WaitOne(TimeSpan.FromSeconds(3));
152155

153156
seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
@@ -157,7 +160,8 @@ [I] public void CancelBulkAll()
157160
bulkObserver.TotalNumberOfRetries.Should().Be(0);
158161
}
159162

160-
[I] public async Task AwaitBulkAll()
163+
[I]
164+
public async Task AwaitBulkAll()
161165
{
162166
var index = CreateIndexName();
163167
var handle = new ManualResetEvent(false);
@@ -176,11 +180,12 @@ [I] public async Task AwaitBulkAll()
176180
.Size(size)
177181
.RefreshOnCompleted()
178182
.Index(index)
183+
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
179184
, tokenSource.Token);
180185

181186

182187
await observableBulk
183-
.ForEachAsync(x=> Interlocked.Increment(ref seenPages), tokenSource.Token);
188+
.ForEachAsync(x => Interlocked.Increment(ref seenPages), tokenSource.Token);
184189

185190
seenPages.Should().Be(pages);
186191
var count = this._client.Count<SmallObject>(f => f.Index(index));

0 commit comments

Comments
 (0)