Skip to content

Commit e2eba69

Browse files
authored
feat: Add async/await Write API without batching (#102)
1 parent 3547608 commit e2eba69

File tree

7 files changed

+682
-1
lines changed

7 files changed

+682
-1
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.10.0 [unreleased]
22

3+
### Features
4+
1. [#23](https://github.com/influxdata/influxdb-client-csharp/pull/102): Added WriteApiAsync for asynchronous write without batching
5+
36
## 1.9.0 [2020-06-19]
47

58
### Features

Client.Test/ItWriteApiAsyncTest.cs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using InfluxDB.Client.Api.Domain;
6+
using InfluxDB.Client.Core;
7+
using InfluxDB.Client.Core.Flux.Domain;
8+
using InfluxDB.Client.Core.Test;
9+
using InfluxDB.Client.Writes;
10+
using NodaTime;
11+
using NUnit.Framework;
12+
13+
namespace InfluxDB.Client.Test
14+
{
15+
[TestFixture]
16+
public class ItWriteApiAsyncTest : AbstractItClientTest
17+
{
18+
[SetUp]
19+
public new async Task SetUp()
20+
{
21+
_organization = await FindMyOrg();
22+
23+
var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
24+
25+
_bucket = await Client.GetBucketsApi()
26+
.CreateBucketAsync(GenerateName("h2o"), retention, _organization);
27+
28+
//
29+
// Add Permissions to read and write to the Bucket
30+
//
31+
var resource = new PermissionResource(PermissionResource.TypeEnum.Buckets, _bucket.Id, null,
32+
_organization.Id);
33+
34+
var readBucket = new Permission(Permission.ActionEnum.Read, resource);
35+
var writeBucket = new Permission(Permission.ActionEnum.Write, resource);
36+
37+
var loggedUser = await Client.GetUsersApi().MeAsync();
38+
Assert.IsNotNull(loggedUser);
39+
40+
var authorization = await Client.GetAuthorizationsApi()
41+
.CreateAuthorizationAsync(_organization,
42+
new List<Permission> {readBucket, writeBucket});
43+
44+
_token = authorization.Token;
45+
46+
Client.Dispose();
47+
var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(_token.ToCharArray())
48+
.Org(_organization.Id).Bucket(_bucket.Id).Build();
49+
Client = InfluxDBClientFactory.Create(options);
50+
51+
_writeApi = Client.GetWriteApiAsync();
52+
}
53+
54+
private Bucket _bucket;
55+
private Organization _organization;
56+
private string _token;
57+
58+
private WriteApiAsync _writeApi;
59+
60+
[Measurement("h2o")]
61+
private class H20Measurement
62+
{
63+
[Column("location", IsTag = true)] public string Location { get; set; }
64+
65+
[Column("water_level")] public double? Level { get; set; }
66+
67+
[Column(IsTimestamp = true)] public DateTime Time { get; set; }
68+
}
69+
70+
[Test]
71+
public async Task Write()
72+
{
73+
// By LineProtocol
74+
await _writeApi.WriteRecordAsync(WritePrecision.S, "h2o,location=coyote_creek water_level=1.0 1");
75+
await _writeApi.WriteRecordAsync(_bucket.Name, _organization.Name, WritePrecision.S,
76+
"h2o,location=coyote_creek water_level=2.0 2");
77+
await _writeApi.WriteRecordsAsync(WritePrecision.S,
78+
new List<string>
79+
{
80+
"h2o,location=coyote_creek water_level=3.0 3",
81+
"h2o,location=coyote_creek water_level=4.0 4"
82+
});
83+
await _writeApi.WriteRecordsAsync(_bucket.Name, _organization.Name, WritePrecision.S,
84+
"h2o,location=coyote_creek water_level=5.0 5",
85+
"h2o,location=coyote_creek water_level=6.0 6");
86+
87+
// By DataPoint
88+
await _writeApi.WritePointAsync(PointData.Measurement("h2o").Tag("location", "coyote_creek")
89+
.Field("water_level", 7.0D).Timestamp(7L, WritePrecision.S));
90+
await _writeApi.WritePointAsync(_bucket.Name, _organization.Name,
91+
PointData.Measurement("h2o").Tag("location", "coyote_creek").Field("water_level", 8.0D)
92+
.Timestamp(8L, WritePrecision.S));
93+
await _writeApi.WritePointsAsync(new List<PointData>
94+
{
95+
PointData.Measurement("h2o").Tag("location", "coyote_creek").Field("water_level", 9.0D)
96+
.Timestamp(9L, WritePrecision.S),
97+
PointData.Measurement("h2o").Tag("location", "coyote_creek").Field("water_level", 10.0D)
98+
.Timestamp(10L, WritePrecision.S)
99+
});
100+
await _writeApi.WritePointsAsync(_bucket.Name, _organization.Name,
101+
PointData.Measurement("h2o").Tag("location", "coyote_creek")
102+
.Field("water_level", 11.0D)
103+
.Timestamp(11L, WritePrecision.S),
104+
PointData.Measurement("h2o").Tag("location", "coyote_creek")
105+
.Field("water_level", 12.0D)
106+
.Timestamp(12L, WritePrecision.S));
107+
108+
// By Measurement
109+
DateTime dtDateTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
110+
await _writeApi.WriteMeasurementAsync(WritePrecision.S,
111+
new H20Measurement
112+
{
113+
Location = "coyote_creek", Level = 13.0D, Time = dtDateTime.AddSeconds(13)
114+
});
115+
116+
await _writeApi.WriteMeasurementAsync(_bucket.Name, _organization.Name, WritePrecision.S,
117+
new H20Measurement
118+
{
119+
Location = "coyote_creek", Level = 14.0D, Time = dtDateTime.AddSeconds(14)
120+
});
121+
await _writeApi.WriteMeasurementsAsync(WritePrecision.S, new List<H20Measurement>
122+
{
123+
new H20Measurement
124+
{
125+
Location = "coyote_creek", Level = 15.0D, Time = dtDateTime.AddSeconds(15)
126+
},
127+
new H20Measurement
128+
{
129+
Location = "coyote_creek", Level = 16.0D, Time = dtDateTime.AddSeconds(16)
130+
}
131+
});
132+
await _writeApi.WriteMeasurementsAsync(_bucket.Name, _organization.Name, WritePrecision.S,
133+
new H20Measurement
134+
{
135+
Location = "coyote_creek", Level = 17.0D, Time = dtDateTime.AddSeconds(17)
136+
},
137+
new H20Measurement
138+
{
139+
Location = "coyote_creek", Level = 18.0D, Time = dtDateTime.AddSeconds(18)
140+
});
141+
142+
List<FluxTable> query = await Client.GetQueryApi().QueryAsync(
143+
"from(bucket:\"" + _bucket.Name +
144+
"\") |> range(start: 1970-01-01T00:00:00.000000001Z)",
145+
_organization.Id);
146+
147+
Assert.AreEqual(1, query.Count);
148+
Assert.AreEqual(18, query[0].Records.Count);
149+
150+
for (var ii = 1; ii <= 17; ii++)
151+
{
152+
var record = query[0].Records[ii - 1];
153+
Assert.AreEqual("h2o", record.GetMeasurement());
154+
Assert.AreEqual((double) ii, record.GetValue());
155+
Assert.AreEqual("water_level", record.GetField());
156+
Assert.AreEqual(Instant.FromDateTimeUtc(dtDateTime.AddSeconds(ii)), record.GetTime());
157+
}
158+
}
159+
}
160+
}

Client/InfluxDBClient.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,20 @@ public WriteApi GetWriteApi()
104104
{
105105
return GetWriteApi(WriteOptions.CreateNew().Build());
106106
}
107+
108+
/// <summary>
109+
/// Get the Write async client.
110+
/// </summary>
111+
/// <returns>the new client instance for the Write API Async without batching</returns>
112+
public WriteApiAsync GetWriteApiAsync()
113+
{
114+
var service = new WriteService((Configuration) _apiClient.Configuration)
115+
{
116+
ExceptionFactory = _exceptionFactory
117+
};
118+
119+
return new WriteApiAsync(_options, service, this);
120+
}
107121

108122
/// <summary>
109123
/// Get the Write client.

Client/README.md

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,10 @@ namespace Examples
300300

301301
## Writes
302302

303-
For writing data we use [WriteApi](https://github.com/influxdata/influxdb-client-csharp/blob/master/Client/WriteApi.cs#L1) that supports:
303+
For writing data we use [WriteApi](https://github.com/influxdata/influxdb-client-csharp/blob/master/Client/WriteApi.cs#L1) or
304+
[WriteApiAsync](https://github.com/influxdata/influxdb-client-csharp/blob/feat/write-api-async/Client/WriteApiAsync.cs) which is simplified version of WriteApi without batching support.
305+
306+
[WriteApi](https://github.com/influxdata/influxdb-client-csharp/blob/master/Client/WriteApi.cs#L1) supports:
304307

305308
1. writing data using [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/), Data Point, POCO
306309
2. use batching for writes
@@ -499,6 +502,85 @@ namespace Examples
499502
}
500503
```
501504

505+
#### Using WriteApiAsync
506+
```c#
507+
using System;
508+
using System.Threading.Tasks;
509+
using InfluxDB.Client;
510+
using InfluxDB.Client.Api.Domain;
511+
using InfluxDB.Client.Core;
512+
using InfluxDB.Client.Writes;
513+
514+
namespace Examples
515+
{
516+
public static class WriteApiAsyncExample
517+
{
518+
[Measurement("temperature")]
519+
private class Temperature
520+
{
521+
[Column("location", IsTag = true)] public string Location { get; set; }
522+
523+
[Column("value")] public double Value { get; set; }
524+
525+
[Column(IsTimestamp = true)] public DateTime Time;
526+
}
527+
528+
public static async Task Main(string[] args)
529+
{
530+
var influxDbClient = InfluxDBClientFactory.Create("http://localhost:9999",
531+
"my-user", "my-password".ToCharArray());
532+
533+
//
534+
// Write Data
535+
//
536+
var writeApiAsync = influxDbClient.GetWriteApiAsync();
537+
538+
//
539+
//
540+
// Write by LineProtocol
541+
//
542+
await writeApiAsync.WriteRecordAsync("my-bucket", "my-org", WritePrecision.Ns,
543+
"temperature,location=north value=60.0");
544+
545+
//
546+
//
547+
// Write by Data Point
548+
//
549+
var point = PointData.Measurement("temperature")
550+
.Tag("location", "west")
551+
.Field("value", 55D)
552+
.Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns);
553+
554+
await writeApiAsync.WritePointAsync("my-bucket", "my-org", point);
555+
556+
//
557+
// Write by POCO
558+
//
559+
var temperature = new Temperature {Location = "south", Value = 62D, Time = DateTime.UtcNow};
560+
561+
await writeApiAsync.WriteMeasurementAsync("my-bucket", "my-org", WritePrecision.Ns, temperature);
562+
563+
//
564+
// Check written data
565+
//
566+
var tables = await influxDbClient.GetQueryApi()
567+
.QueryAsync("from(bucket:\"my-bucket\") |> range(start: 0)", "my-org");
568+
569+
tables.ForEach(table =>
570+
{
571+
var fluxRecords = table.Records;
572+
fluxRecords.ForEach(record =>
573+
{
574+
Console.WriteLine($"{record.GetTime()}: {record.GetValue()}");
575+
});
576+
});
577+
578+
influxDbClient.Dispose();
579+
}
580+
}
581+
}
582+
```
583+
502584
#### Default Tags
503585

504586
Sometimes is useful to store same information in every measurement e.g. `hostname`, `location`, `customer`.

0 commit comments

Comments
 (0)