Skip to content

Commit fce2e29

Browse files
committed
feat(java): refactor Write API parameters to one class
1 parent 321ffbc commit fce2e29

File tree

8 files changed

+207
-186
lines changed

8 files changed

+207
-186
lines changed

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/internal/WriteKotlinApiImpl.kt

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
package com.influxdb.client.kotlin.internal
2323

2424
import com.influxdb.client.InfluxDBClientOptions
25-
import com.influxdb.client.WriteApi.WriteParameters
26-
import com.influxdb.client.domain.WriteConsistency
2725
import com.influxdb.client.domain.WritePrecision
2826
import com.influxdb.client.internal.AbstractWriteBlockingClient
2927
import com.influxdb.client.internal.AbstractWriteClient
3028
import com.influxdb.client.kotlin.WriteKotlinApi
3129
import com.influxdb.client.service.WriteService
3230
import com.influxdb.client.write.Point
31+
import com.influxdb.client.write.WriteParameters
3332
import kotlinx.coroutines.flow.Flow
3433
import kotlinx.coroutines.flow.asFlow
3534
import kotlinx.coroutines.flow.map
@@ -118,17 +117,15 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
118117
val bucketOrOption = bucket ?: options.bucket.orEmpty()
119118
val orgOrOption = org ?: options.org.orEmpty()
120119

121-
val obj = object : WriteParameters {
122-
override fun getPrecision(): WritePrecision {
123-
return precision
124-
}
120+
write(records, WriteParameters(bucketOrOption, orgOrOption, precision))
121+
}
125122

126-
override fun getConsistency(): WriteConsistency? {
127-
return null
128-
}
129-
}
123+
private suspend fun write(
124+
records: Flow<AbstractWriteClient.BatchWriteData>,
125+
parameters: WriteParameters
126+
) {
130127

131-
write(bucketOrOption, orgOrOption, obj, records.toList().stream())
128+
write(parameters, records.toList().stream())
132129
}
133130
}
134131

client-reactive/src/main/java/com/influxdb/client/reactive/internal/WriteReactiveApiImpl.java

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import javax.annotation.Nullable;
3232

3333
import com.influxdb.client.InfluxDBClientOptions;
34-
import com.influxdb.client.WriteApi;
3534
import com.influxdb.client.domain.WriteConsistency;
3635
import com.influxdb.client.domain.WritePrecision;
3736
import com.influxdb.client.internal.AbstractWriteClient;
@@ -44,6 +43,7 @@
4443
import com.influxdb.client.reactive.WriteReactiveApi;
4544
import com.influxdb.client.service.WriteService;
4645
import com.influxdb.client.write.Point;
46+
import com.influxdb.client.write.WriteParameters;
4747
import com.influxdb.internal.AbstractRestClient;
4848
import com.influxdb.utils.Arguments;
4949

@@ -124,7 +124,7 @@ public Publisher<Success> writeRecords(@Nonnull final String bucket,
124124

125125
Flowable<BatchWriteData> stream = Flowable.fromPublisher(records).map(BatchWriteDataRecord::new);
126126

127-
return write(bucket, org, new WriteReactiveParameters(precision), stream);
127+
return write(new WriteParameters(bucket, org, precision), stream);
128128
}
129129

130130
@Override
@@ -176,7 +176,7 @@ public Publisher<Success> writePoints(@Nonnull final String bucket,
176176
.filter(Objects::nonNull)
177177
.map(point -> new BatchWriteDataPoint(point, precision, options));
178178

179-
return write(bucket, org, new WriteReactiveParameters(precision), stream);
179+
return write(new WriteParameters(bucket, org, precision), stream);
180180
}
181181

182182
@Override
@@ -227,18 +227,14 @@ public <M> Publisher<Success> writeMeasurements(@Nonnull final String bucket,
227227
Flowable<BatchWriteData> stream = Flowable.fromPublisher(measurements)
228228
.map(it -> new BatchWriteDataMeasurement(it, precision, options, measurementMapper));
229229

230-
return write(bucket, org, new WriteReactiveParameters(precision), stream);
230+
return write(new WriteParameters(bucket, org, precision), stream);
231231
}
232232

233233
@Nonnull
234234
@SuppressWarnings("MagicNumber")
235-
private Publisher<Success> write(@Nonnull final String bucket,
236-
@Nonnull final String organization,
237-
@Nonnull final WriteApi.WriteParameters parameters,
235+
private Publisher<Success> write(@Nonnull final WriteParameters parameters,
238236
@Nonnull final Flowable<BatchWriteData> stream) {
239237

240-
Arguments.checkNonEmpty(bucket, "bucket");
241-
Arguments.checkNonEmpty(organization, "organization");
242238
Arguments.checkNotNull(parameters, "parameters");
243239
Arguments.checkNotNull(stream, "stream");
244240

@@ -293,9 +289,16 @@ private Publisher<Success> write(@Nonnull final String bucket,
293289
//
294290
// HTTP post
295291
//
296-
.flatMapSingle(it -> service.postWriteRx(organization, bucket, it, null,
297-
"identity", "text/plain; charset=utf-8", null,
298-
"application/json", null, parameters.getPrecision(), parameters.getConsistency())
292+
.flatMapSingle(it -> {
293+
String organization = parameters.organizationSafe(options);
294+
String bucket = parameters.bucketSafe(options);
295+
WritePrecision precision = parameters.precisionSafe(options);
296+
WriteConsistency consistency = parameters.consistencySafe(options);
297+
298+
return service.postWriteRx(organization, bucket, it, null,
299+
"identity", "text/plain; charset=utf-8", null,
300+
"application/json", null, precision, consistency);
301+
}
299302
)
300303
//
301304
// Map to Success
@@ -333,26 +336,4 @@ private Publisher<Success> write(@Nonnull final String bucket,
333336
return Flowable.error(toInfluxException(throwable));
334337
});
335338
}
336-
337-
private static final class WriteReactiveParameters implements WriteApi.WriteParameters {
338-
339-
private final WritePrecision writePrecision;
340-
341-
private WriteReactiveParameters(@Nonnull final WritePrecision writePrecision) {
342-
Arguments.checkNotNull(writePrecision, "writePrecision");
343-
this.writePrecision = writePrecision;
344-
}
345-
346-
@Nullable
347-
@Override
348-
public WritePrecision getPrecision() {
349-
return writePrecision;
350-
}
351-
352-
@Nullable
353-
@Override
354-
public WriteConsistency getConsistency() {
355-
return null;
356-
}
357-
}
358339
}

client/src/main/java/com/influxdb/client/WriteApi.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import javax.annotation.Nullable;
2727
import javax.annotation.concurrent.ThreadSafe;
2828

29-
import com.influxdb.client.domain.WriteConsistency;
3029
import com.influxdb.client.domain.WritePrecision;
3130
import com.influxdb.client.write.Point;
3231
import com.influxdb.client.write.events.AbstractWriteEvent;
@@ -284,30 +283,4 @@ interface RetryOptions {
284283
*/
285284
int getExponentialBase();
286285
}
287-
288-
/**
289-
* Write API optional parameters.
290-
*/
291-
interface WriteParameters {
292-
/**
293-
* Precision for unix timestamps in the line protocol of the request payload.
294-
*
295-
* @return {@link WritePrecision}
296-
*/
297-
@Nullable
298-
WritePrecision getPrecision();
299-
300-
/**
301-
* The write consistency for the point. InfluxDB assumes that the write consistency is
302-
* {@link WriteConsistency#ONE} if you do not specify consistency.
303-
* See the <a href="https://bit.ly/enterprise-consistency">InfluxDB Enterprise documentation</a>
304-
* for detailed descriptions of each consistency option.
305-
*
306-
* <b>Available with InfluxDB Enterprise clusters only!</b>
307-
*
308-
* @return {@link WriteConsistency}
309-
*/
310-
@Nullable
311-
WriteConsistency getConsistency();
312-
}
313286
}

client/src/main/java/com/influxdb/client/internal/AbstractWriteBlockingClient.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@
2929
import javax.annotation.Nullable;
3030

3131
import com.influxdb.client.InfluxDBClientOptions;
32-
import com.influxdb.client.WriteApi;
32+
import com.influxdb.client.domain.WriteConsistency;
3333
import com.influxdb.client.domain.WritePrecision;
3434
import com.influxdb.client.internal.AbstractWriteClient.BatchWriteDataMeasurement;
3535
import com.influxdb.client.service.WriteService;
36+
import com.influxdb.client.write.WriteParameters;
3637
import com.influxdb.internal.AbstractRestClient;
3738
import com.influxdb.utils.Arguments;
3839

@@ -60,9 +61,7 @@ public AbstractWriteBlockingClient(@Nonnull final WriteService service,
6061
this.service = service;
6162
}
6263

63-
protected void write(@Nonnull final String bucket,
64-
@Nonnull final String organization,
65-
@Nonnull final WriteApi.WriteParameters parameters,
64+
protected void write(@Nonnull final WriteParameters parameters,
6665
@Nonnull final Stream<AbstractWriteClient.BatchWriteData> stream) {
6766

6867
String lineProtocol = stream.map(AbstractWriteClient.BatchWriteData::toLineProtocol)
@@ -75,13 +74,18 @@ protected void write(@Nonnull final String bucket,
7574
return;
7675
}
7776

77+
String organization = parameters.organizationSafe(options);
78+
String bucket = parameters.bucketSafe(options);
79+
WritePrecision precision = parameters.precisionSafe(options);
80+
WriteConsistency consistency = parameters.consistencySafe(options);
81+
7882
LOG.log(Level.FINEST,
7983
"Writing time-series data into InfluxDB (org={0}, bucket={1}, precision={2})...",
80-
new Object[]{organization, bucket, parameters.getPrecision()});
84+
new Object[]{organization, bucket, precision});
8185

8286
Call<Void> voidCall = service.postWrite(organization, bucket, lineProtocol, null,
8387
"identity", "text/plain; charset=utf-8", null,
84-
"application/json", null, parameters.getPrecision(), parameters.getConsistency());
88+
"application/json", null, precision, consistency);
8589

8690
execute(voidCall);
8791

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 19 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.influxdb.client.domain.WritePrecision;
4141
import com.influxdb.client.service.WriteService;
4242
import com.influxdb.client.write.Point;
43+
import com.influxdb.client.write.WriteParameters;
4344
import com.influxdb.client.write.events.AbstractWriteEvent;
4445
import com.influxdb.client.write.events.BackpressureEvent;
4546
import com.influxdb.client.write.events.WriteErrorEvent;
@@ -140,7 +141,7 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
140141
//
141142
// Group by key - same bucket, same org
142143
//
143-
.concatMap(it -> it.groupBy(batchWrite -> batchWrite.batchWriteOptions))
144+
.concatMap(it -> it.groupBy(batchWrite -> batchWrite.writeParameters))
144145
//
145146
// Create Write Point = bucket, org, ... + data
146147
//
@@ -223,23 +224,23 @@ public void write(@Nonnull final String bucket,
223224
stream.subscribe(
224225
dataPoint -> {
225226
WritePrecision precision = dataPoint.point.getPrecision();
226-
write(new BatchWriteOptions(bucket, organization, precision), Flowable.just(dataPoint));
227+
write(new WriteParameters(bucket, organization, precision), Flowable.just(dataPoint));
227228
},
228229
throwable -> publish(new WriteErrorEvent(throwable)));
229230
}
230231

231-
public void write(@Nonnull final BatchWriteOptions batchWriteOptions,
232+
public void write(@Nonnull final WriteParameters writeParameters,
232233
@Nonnull final Publisher<AbstractWriteClient.BatchWriteData> stream) {
233234

234-
Arguments.checkNotNull(batchWriteOptions, "batchWriteOptions");
235+
Arguments.checkNotNull(writeParameters, "writeParameters");
235236
Arguments.checkNotNull(stream, "data to write");
236237

237238
if (processor.hasComplete()) {
238239
throw new InfluxException(CLOSED_EXCEPTION);
239240
}
240241

241242
Flowable.fromPublisher(stream)
242-
.map(it -> new BatchWriteItem(batchWriteOptions, it))
243+
.map(it -> new BatchWriteItem(writeParameters, it))
243244
.subscribe(processor::onNext, throwable -> publish(new WriteErrorEvent(throwable)));
244245
}
245246

@@ -352,74 +353,20 @@ public String toLineProtocol() {
352353
*/
353354
final class BatchWriteItem {
354355

355-
private BatchWriteOptions batchWriteOptions;
356+
private WriteParameters writeParameters;
356357
private BatchWriteData data;
357358

358-
private BatchWriteItem(@Nonnull final BatchWriteOptions batchWriteOptions,
359+
private BatchWriteItem(@Nonnull final WriteParameters writeParameters,
359360
@Nonnull final BatchWriteData data) {
360361

361-
Arguments.checkNotNull(batchWriteOptions, "data");
362-
Arguments.checkNotNull(data, "write options");
362+
Arguments.checkNotNull(writeParameters, "writeParameters");
363+
Arguments.checkNotNull(data, "data");
363364

364-
this.batchWriteOptions = batchWriteOptions;
365+
this.writeParameters = writeParameters;
365366
this.data = data;
366367
}
367368
}
368369

369-
/**
370-
* The options to apply to a @{@link BatchWriteItem}.
371-
*/
372-
final class BatchWriteOptions implements WriteApi.WriteParameters {
373-
374-
private String bucket;
375-
private String organization;
376-
private WritePrecision precision;
377-
378-
BatchWriteOptions(@Nonnull final String bucket,
379-
@Nonnull final String organization,
380-
@Nonnull final WritePrecision precision) {
381-
382-
Arguments.checkNonEmpty(bucket, "bucket");
383-
Arguments.checkNonEmpty(organization, "organization");
384-
Arguments.checkNotNull(precision, "TimeUnit.precision is required");
385-
386-
this.bucket = bucket;
387-
this.organization = organization;
388-
this.precision = precision;
389-
}
390-
391-
@Override
392-
public boolean equals(final Object o) {
393-
if (this == o) {
394-
return true;
395-
}
396-
if (!(o instanceof BatchWriteOptions)) {
397-
return false;
398-
}
399-
BatchWriteOptions that = (BatchWriteOptions) o;
400-
return Objects.equals(bucket, that.bucket)
401-
&& Objects.equals(organization, that.organization)
402-
&& precision == that.precision;
403-
}
404-
405-
@Override
406-
public int hashCode() {
407-
return Objects.hash(bucket, organization, precision);
408-
}
409-
410-
@Nullable
411-
@Override
412-
public WritePrecision getPrecision() {
413-
return precision;
414-
}
415-
416-
@Nullable
417-
@Override
418-
public WriteConsistency getConsistency() {
419-
return null;
420-
}
421-
}
422-
423370
@SuppressWarnings("rawtypes")
424371
private final class ToWritePointsMaybe implements Function<BatchWriteItem, Maybe<Notification<Response>>> {
425372

@@ -439,14 +386,15 @@ public Maybe<Notification<Response>> apply(final BatchWriteItem batchWrite) {
439386
}
440387

441388
// Parameters
442-
String organization = batchWrite.batchWriteOptions.organization;
443-
String bucket = batchWrite.batchWriteOptions.bucket;
444-
WritePrecision precision = batchWrite.batchWriteOptions.precision;
389+
String organization = batchWrite.writeParameters.organizationSafe(options);
390+
String bucket = batchWrite.writeParameters.bucketSafe(options);
391+
WritePrecision precision = batchWrite.writeParameters.precisionSafe(options);
392+
WriteConsistency consistency = batchWrite.writeParameters.consistencySafe(options);
445393

446394
Maybe<Response<Void>> requestSource = service
447395
.postWriteRx(organization, bucket, content, null,
448396
"identity", "text/plain; charset=utf-8", null,
449-
"application/json", null, precision, null)
397+
"application/json", null, precision, consistency)
450398
.toMaybe();
451399

452400
return requestSource
@@ -496,9 +444,9 @@ public Maybe<Notification<Response>> apply(final BatchWriteItem batchWrite) {
496444
private WriteSuccessEvent toSuccessEvent(@Nonnull final BatchWriteItem batchWrite, final String lineProtocol) {
497445

498446
return new WriteSuccessEvent(
499-
batchWrite.batchWriteOptions.organization,
500-
batchWrite.batchWriteOptions.bucket,
501-
batchWrite.batchWriteOptions.precision,
447+
batchWrite.writeParameters.organizationSafe(options),
448+
batchWrite.writeParameters.bucketSafe(options),
449+
batchWrite.writeParameters.precisionSafe(options),
502450
lineProtocol);
503451
}
504452
}

0 commit comments

Comments
 (0)