Skip to content

Commit e399cf7

Browse files
committed
feat(java): add possibility to specify WriteConsistency parameter
1 parent 0e612cf commit e399cf7

File tree

9 files changed

+213
-35
lines changed

9 files changed

+213
-35
lines changed

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/internal/WriteKotlinApiImpl.kt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
package com.influxdb.client.kotlin.internal
2323

2424
import com.influxdb.client.InfluxDBClientOptions
25+
import com.influxdb.client.WriteApi.WriteParameters
26+
import com.influxdb.client.domain.WriteConsistency
2527
import com.influxdb.client.domain.WritePrecision
2628
import com.influxdb.client.internal.AbstractWriteBlockingClient
2729
import com.influxdb.client.internal.AbstractWriteClient
@@ -32,7 +34,6 @@ import kotlinx.coroutines.flow.Flow
3234
import kotlinx.coroutines.flow.asFlow
3335
import kotlinx.coroutines.flow.map
3436
import kotlinx.coroutines.flow.toList
35-
import java.util.*
3637

3738
/**
3839
* @author Jakub Bednar (20/04/2021 9:27)
@@ -117,7 +118,17 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
117118
val bucketOrOption = bucket ?: options.bucket.orEmpty()
118119
val orgOrOption = org ?: options.org.orEmpty()
119120

120-
write(bucketOrOption, orgOrOption, precision, records.toList().stream())
121+
val obj = object : WriteParameters {
122+
override fun getPrecision(): WritePrecision {
123+
return precision
124+
}
125+
126+
override fun getConsistency(): WriteConsistency? {
127+
return null
128+
}
129+
}
130+
131+
write(bucketOrOption, orgOrOption, obj, records.toList().stream())
121132
}
122133
}
123134

client-reactive/src/main/java/com/influxdb/client/reactive/internal/WriteReactiveApiImpl.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import javax.annotation.Nullable;
3232

3333
import com.influxdb.client.InfluxDBClientOptions;
34+
import com.influxdb.client.WriteApi;
35+
import com.influxdb.client.domain.WriteConsistency;
3436
import com.influxdb.client.domain.WritePrecision;
3537
import com.influxdb.client.internal.AbstractWriteClient;
3638
import com.influxdb.client.internal.AbstractWriteClient.BatchWriteData;
@@ -122,7 +124,7 @@ public Publisher<Success> writeRecords(@Nonnull final String bucket,
122124

123125
Flowable<BatchWriteData> stream = Flowable.fromPublisher(records).map(BatchWriteDataRecord::new);
124126

125-
return write(bucket, org, precision, stream);
127+
return write(bucket, org, new WriteReactiveParameters(precision), stream);
126128
}
127129

128130
@Override
@@ -174,7 +176,7 @@ public Publisher<Success> writePoints(@Nonnull final String bucket,
174176
.filter(Objects::nonNull)
175177
.map(point -> new BatchWriteDataPoint(point, precision, options));
176178

177-
return write(bucket, org, precision, stream);
179+
return write(bucket, org, new WriteReactiveParameters(precision), stream);
178180
}
179181

180182
@Override
@@ -225,19 +227,19 @@ public <M> Publisher<Success> writeMeasurements(@Nonnull final String bucket,
225227
Flowable<BatchWriteData> stream = Flowable.fromPublisher(measurements)
226228
.map(it -> new BatchWriteDataMeasurement(it, precision, options, measurementMapper));
227229

228-
return write(bucket, org, precision, stream);
230+
return write(bucket, org, new WriteReactiveParameters(precision), stream);
229231
}
230232

231233
@Nonnull
232234
@SuppressWarnings("MagicNumber")
233235
private Publisher<Success> write(@Nonnull final String bucket,
234236
@Nonnull final String organization,
235-
@Nonnull final WritePrecision precision,
237+
@Nonnull final WriteApi.WriteParameters parameters,
236238
@Nonnull final Flowable<BatchWriteData> stream) {
237239

238240
Arguments.checkNonEmpty(bucket, "bucket");
239241
Arguments.checkNonEmpty(organization, "organization");
240-
Arguments.checkNotNull(precision, "precision");
242+
Arguments.checkNotNull(parameters, "parameters");
241243
Arguments.checkNotNull(stream, "stream");
242244

243245
Flowable<String> batches = stream
@@ -293,7 +295,7 @@ private Publisher<Success> write(@Nonnull final String bucket,
293295
//
294296
.flatMapSingle(it -> service.postWriteRx(organization, bucket, it, null,
295297
"identity", "text/plain; charset=utf-8", null,
296-
"application/json", null, precision)
298+
"application/json", null, parameters.getPrecision(), parameters.getConsistency())
297299
)
298300
//
299301
// Map to Success
@@ -331,4 +333,26 @@ private Publisher<Success> write(@Nonnull final String bucket,
331333
return Flowable.error(toInfluxException(throwable));
332334
});
333335
}
336+
337+
private static final class WriteReactiveParameters implements WriteApi.WriteParameters {
338+
339+
private final WritePrecision writePrecision;
340+
341+
private WriteReactiveParameters(@Nonnull final WritePrecision writePrecision) {
342+
Arguments.checkNotNull(writePrecision, "writePrecision");
343+
this.writePrecision = writePrecision;
344+
}
345+
346+
@Nullable
347+
@Override
348+
public WritePrecision getPrecision() {
349+
return writePrecision;
350+
}
351+
352+
@Nullable
353+
@Override
354+
public WriteConsistency getConsistency() {
355+
return null;
356+
}
357+
}
334358
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* InfluxDB OSS API Service
3+
* The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint.
4+
*
5+
* OpenAPI spec version: 2.0.0
6+
*
7+
*
8+
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
9+
* https://openapi-generator.tech
10+
* Do not edit the class manually.
11+
*/
12+
13+
14+
package com.influxdb.client.domain;
15+
16+
import java.util.Objects;
17+
import java.util.Arrays;
18+
import com.google.gson.annotations.SerializedName;
19+
20+
import java.io.IOException;
21+
import com.google.gson.TypeAdapter;
22+
import com.google.gson.annotations.JsonAdapter;
23+
import com.google.gson.stream.JsonReader;
24+
import com.google.gson.stream.JsonWriter;
25+
26+
/**
27+
* Gets or Sets WriteConsistency
28+
*/
29+
@JsonAdapter(WriteConsistency.Adapter.class)
30+
public enum WriteConsistency {
31+
32+
ANY("any"),
33+
34+
ONE("one"),
35+
36+
QUORUM("quorum"),
37+
38+
ALL("all");
39+
40+
private String value;
41+
42+
WriteConsistency(String value) {
43+
this.value = value;
44+
}
45+
46+
public String getValue() {
47+
return value;
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return String.valueOf(value);
53+
}
54+
55+
public static WriteConsistency fromValue(String text) {
56+
for (WriteConsistency b : WriteConsistency.values()) {
57+
if (String.valueOf(b.value).equals(text)) {
58+
return b;
59+
}
60+
}
61+
return null;
62+
}
63+
64+
public static class Adapter extends TypeAdapter<WriteConsistency> {
65+
@Override
66+
public void write(final JsonWriter jsonWriter, final WriteConsistency enumeration) throws IOException {
67+
jsonWriter.value(enumeration.getValue());
68+
}
69+
70+
@Override
71+
public WriteConsistency read(final JsonReader jsonReader) throws IOException {
72+
String value = jsonReader.nextString();
73+
return WriteConsistency.fromValue(String.valueOf(value));
74+
}
75+
}
76+
}
77+

client/src/generated/java/com/influxdb/client/service/WriteService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.influxdb.client.domain.Error;
1111
import com.influxdb.client.domain.LineProtocolError;
1212
import com.influxdb.client.domain.LineProtocolLengthError;
13+
import com.influxdb.client.domain.WriteConsistency;
1314
import com.influxdb.client.domain.WritePrecision;
1415
import io.reactivex.rxjava3.core.Single;
1516
import retrofit2.Response;
@@ -33,14 +34,15 @@ public interface WriteService {
3334
* @param accept Content type that the client can understand. Writes only return a response body if they fail, e.g. due to a formatting problem or quota limit. #### InfluxDB Cloud - returns only &#x60;application/json&#x60; for format and limit errors. - returns only &#x60;text/html&#x60; for some quota limit errors. #### InfluxDB OSS - returns only &#x60;application/json&#x60; for format and limit errors. For more information about write errors, see how to [troubleshoot issues writing data](https://docs.influxdata.com/influxdb/v2.1/write-data/troubleshoot/). (optional, default to application/json)
3435
* @param orgID ID of the destination organization for writes. If both &#x60;orgID&#x60; and &#x60;org&#x60; are specified, &#x60;org&#x60; takes precedence. (optional)
3536
* @param precision Precision for unix timestamps in the line protocol of the request payload. (optional, default to null)
37+
* @param consistency Sets the write consistency for the point. InfluxDB assumes that the write consistency is &#39;one&#39; if you do not specify. Available with InfluxDB Enterprise clusters only. (optional, default to null)
3638
* @return Call&lt;Void&gt;
3739
*/
3840
@Headers({
3941
"Content-Type:text/plain"
4042
})
4143
@POST("api/v2/write")
4244
Call<Void> postWrite(
43-
@retrofit2.http.Query("org") String org, @retrofit2.http.Query("bucket") String bucket, @retrofit2.http.Body String body, @retrofit2.http.Header("Zap-Trace-Span") String zapTraceSpan, @retrofit2.http.Header("Content-Encoding") String contentEncoding, @retrofit2.http.Header("Content-Type") String contentType, @retrofit2.http.Header("Content-Length") Integer contentLength, @retrofit2.http.Header("Accept") String accept, @retrofit2.http.Query("orgID") String orgID, @retrofit2.http.Query("precision") WritePrecision precision
45+
@retrofit2.http.Query("org") String org, @retrofit2.http.Query("bucket") String bucket, @retrofit2.http.Body String body, @retrofit2.http.Header("Zap-Trace-Span") String zapTraceSpan, @retrofit2.http.Header("Content-Encoding") String contentEncoding, @retrofit2.http.Header("Content-Type") String contentType, @retrofit2.http.Header("Content-Length") Integer contentLength, @retrofit2.http.Header("Accept") String accept, @retrofit2.http.Query("orgID") String orgID, @retrofit2.http.Query("precision") WritePrecision precision, @retrofit2.http.Query("consistency") WriteConsistency consistency
4446
);
4547

4648
/**
@@ -56,11 +58,12 @@ Call<Void> postWrite(
5658
* @param accept Content type that the client can understand. Writes only return a response body if they fail, e.g. due to a formatting problem or quota limit. #### InfluxDB Cloud - returns only &#x60;application/json&#x60; for format and limit errors. - returns only &#x60;text/html&#x60; for some quota limit errors. #### InfluxDB OSS - returns only &#x60;application/json&#x60; for format and limit errors. For more information about write errors, see how to [troubleshoot issues writing data](https://docs.influxdata.com/influxdb/v2.1/write-data/troubleshoot/). (optional, default to application/json)
5759
* @param orgID ID of the destination organization for writes. If both &#x60;orgID&#x60; and &#x60;org&#x60; are specified, &#x60;org&#x60; takes precedence. (optional)
5860
* @param precision Precision for unix timestamps in the line protocol of the request payload. (optional, default to null)
61+
* @param consistency Sets the write consistency for the point. InfluxDB assumes that the write consistency is &#39;one&#39; if you do not specify. Available with InfluxDB Enterprise clusters only. (optional, default to null)
5962
* @return Single&lt;Response&lt;Void&gt;&gt;
6063
*/
6164
@POST("api/v2/write")
6265
Single<Response<Void>> postWriteRx(
63-
@retrofit2.http.Query("org") String org, @retrofit2.http.Query("bucket") String bucket, @retrofit2.http.Body String body, @retrofit2.http.Header("Zap-Trace-Span") String zapTraceSpan, @retrofit2.http.Header("Content-Encoding") String contentEncoding, @retrofit2.http.Header("Content-Type") String contentType, @retrofit2.http.Header("Content-Length") Integer contentLength, @retrofit2.http.Header("Accept") String accept, @retrofit2.http.Query("orgID") String orgID, @retrofit2.http.Query("precision") WritePrecision precision
66+
@retrofit2.http.Query("org") String org, @retrofit2.http.Query("bucket") String bucket, @retrofit2.http.Body String body, @retrofit2.http.Header("Zap-Trace-Span") String zapTraceSpan, @retrofit2.http.Header("Content-Encoding") String contentEncoding, @retrofit2.http.Header("Content-Type") String contentType, @retrofit2.http.Header("Content-Length") Integer contentLength, @retrofit2.http.Header("Accept") String accept, @retrofit2.http.Query("orgID") String orgID, @retrofit2.http.Query("precision") WritePrecision precision, @retrofit2.http.Query("consistency") WriteConsistency consistency
6467
);
6568

6669
}

client/src/main/java/com/influxdb/client/WriteApi.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import javax.annotation.Nullable;
2727
import javax.annotation.concurrent.ThreadSafe;
2828

29+
import com.influxdb.client.domain.WriteConsistency;
2930
import com.influxdb.client.domain.WritePrecision;
3031
import com.influxdb.client.write.Point;
3132
import com.influxdb.client.write.events.AbstractWriteEvent;
@@ -43,7 +44,7 @@
4344
* <p>
4445
*
4546
* <b>
46-
* The {@link WriteApi} uses background thread to ingesting data into InfluxDB and is suppose to run as a singleton.
47+
* The {@link WriteApi} uses background thread to ingesting data into InfluxDB and is suppose to run as a singleton.
4748
* </b>
4849
*
4950
* @author Jakub Bednar (bednar@github) (20/09/2018 10:58)
@@ -276,11 +277,37 @@ interface RetryOptions {
276277

277278
/**
278279
* The base for the exponential retry delay.
279-
*
280+
* <p>
280281
* The next delay is computed as: retryInterval * exponentialBase^(attempts-1) + random(jitterInterval)
281282
*
282283
* @return exponential base
283284
*/
284285
int getExponentialBase();
285286
}
287+
288+
/**
289+
* Write API optional parameters.
290+
*/
291+
interface WriteParameters {
292+
/**
293+
* Precision for unix timestamps in the line protocol of the request payload.
294+
*
295+
* @return {@link WritePrecision}
296+
*/
297+
@Nullable
298+
WritePrecision getPrecision();
299+
300+
/**
301+
* The write consistency for the point. InfluxDB assumes that the write consistency is
302+
* {@link WriteConsistency#ONE} if you do not specify consistency.
303+
* See the <a href="https://bit.ly/enterprise-consistency">InfluxDB Enterprise documentation</a>
304+
* for detailed descriptions of each consistency option.
305+
*
306+
* <b>Available with InfluxDB Enterprise clusters only!</b>
307+
*
308+
* @return {@link WriteConsistency}
309+
*/
310+
@Nullable
311+
WriteConsistency getConsistency();
312+
}
286313
}

client/src/main/java/com/influxdb/client/internal/AbstractWriteBlockingClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import javax.annotation.Nullable;
3030

3131
import com.influxdb.client.InfluxDBClientOptions;
32+
import com.influxdb.client.WriteApi;
3233
import com.influxdb.client.domain.WritePrecision;
3334
import com.influxdb.client.internal.AbstractWriteClient.BatchWriteDataMeasurement;
3435
import com.influxdb.client.service.WriteService;
@@ -61,7 +62,7 @@ public AbstractWriteBlockingClient(@Nonnull final WriteService service,
6162

6263
protected void write(@Nonnull final String bucket,
6364
@Nonnull final String organization,
64-
@Nonnull final WritePrecision precision,
65+
@Nonnull final WriteApi.WriteParameters parameters,
6566
@Nonnull final Stream<AbstractWriteClient.BatchWriteData> stream) {
6667

6768
String lineProtocol = stream.map(AbstractWriteClient.BatchWriteData::toLineProtocol)
@@ -76,11 +77,11 @@ protected void write(@Nonnull final String bucket,
7677

7778
LOG.log(Level.FINEST,
7879
"Writing time-series data into InfluxDB (org={0}, bucket={1}, precision={2})...",
79-
new Object[]{organization, bucket, precision});
80+
new Object[]{organization, bucket, parameters.getPrecision()});
8081

8182
Call<Void> voidCall = service.postWrite(organization, bucket, lineProtocol, null,
8283
"identity", "text/plain; charset=utf-8", null,
83-
"application/json", null, precision);
84+
"application/json", null, parameters.getPrecision(), parameters.getConsistency());
8485

8586
execute(voidCall);
8687

0 commit comments

Comments
 (0)