Skip to content

Commit 5760e0f

Browse files
authored
feat(linq): optimize Flux Query for querying one time-series (#197)
1 parent 89e46f7 commit 5760e0f

File tree

10 files changed

+289
-119
lines changed

10 files changed

+289
-119
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Features
44
1. [#194](https://github.com/influxdata/influxdb-client-csharp/pull/194): Add possibility to handle HTTP response from InfluxDB server [write]
5+
1. [#197](https://github.com/influxdata/influxdb-client-csharp/pull/197): Optimize Flux Query for querying one time-series [LINQ]
56

67
### Bug Fixes
78
1. [#193](https://github.com/influxdata/influxdb-client-csharp/pull/193): Create services without API implementation

Client.Linq.Test/InfluxDBQueryVisitorTest.cs

Lines changed: 120 additions & 52 deletions
Large diffs are not rendered by default.

Client.Linq.Test/ItInfluxDBQueryableTest.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,18 @@ public void QueryTake()
9292

9393
var sensors = query.ToList();
9494

95+
Assert.AreEqual(2*2, sensors.Count);
96+
}
97+
98+
[Test]
99+
public void QueryTakeMultipleTimeSeries()
100+
{
101+
var query = (from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _client.GetQueryApiSync(),
102+
new QueryableOptimizerSettings {QueryMultipleTimeSeries = true})
103+
select s).Take(2);
104+
105+
var sensors = query.ToList();
106+
95107
Assert.AreEqual(2, sensors.Count);
96108
}
97109

Client.Linq.Test/QueryAggregatorTest.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using InfluxDB.Client.Core.Test;
2+
using InfluxDB.Client.Linq;
23
using InfluxDB.Client.Linq.Internal;
34
using NUnit.Framework;
45

@@ -76,10 +77,10 @@ public void Range()
7677
"from(bucket: p1) " +
7778
$"|> {range} " +
7879
"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
79-
"|> drop(columns: [\"_start\", \"_stop\", \"_measurement\"]) " +
80-
"|> group()";
80+
"|> drop(columns: [\"_start\", \"_stop\", \"_measurement\"])";
8181

82-
Assert.AreEqual(expected, _aggregator.BuildFluxQuery(), $"Expected Range: {range}, Shift: {shift}");
82+
var settings = new QueryableOptimizerSettings();
83+
Assert.AreEqual(expected, _aggregator.BuildFluxQuery(settings), $"Expected Range: {range}, Shift: {shift}");
8384
}
8485
}
8586
}

Client.Linq/InfluxDBQueryable.cs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,27 @@
88

99
namespace InfluxDB.Client.Linq
1010
{
11+
/// <summary>
12+
/// The settings for a Query optimization.
13+
/// </summary>
14+
public class QueryableOptimizerSettings
15+
{
16+
public QueryableOptimizerSettings()
17+
{
18+
QueryMultipleTimeSeries = false;
19+
}
20+
21+
/// <summary>
22+
/// Gets or sets whether the drive is used to query multiple time series.
23+
/// Setting this variable to true will change how the produced Flux Query looks like:
24+
/// <list type="bullet">
25+
/// <item>appends <a href="https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/built-in/transformations/group/">group operator</a></item>
26+
/// <item>enable use default sorting: <i>sort(columns: ["_time"], desc: false)</i></item>
27+
/// </list>
28+
/// </summary>
29+
public bool QueryMultipleTimeSeries { get; set; }
30+
}
31+
1132
/// <summary>
1233
/// Main entry point to query InfluxDB by LINQ
1334
/// </summary>
@@ -19,34 +40,40 @@ public class InfluxDBQueryable<T> : QueryableBase<T>
1940
/// <param name="bucket">Specifies the source bucket.</param>
2041
/// <param name="org">Specifies the source organization.</param>
2142
/// <param name="queryApi">The underlying API to execute Flux Query.</param>
43+
/// <param name="queryableOptimizerSettings">Settings for a Query optimization</param>
2244
/// <returns>new instance for of Queryable</returns>
23-
public static InfluxDBQueryable<T> Queryable(string bucket, string org, QueryApiSync queryApi)
45+
public static InfluxDBQueryable<T> Queryable(string bucket, string org, QueryApiSync queryApi,
46+
QueryableOptimizerSettings queryableOptimizerSettings = default)
2447
{
25-
return Queryable(bucket, org, queryApi, new DefaultMemberNameResolver());
48+
return Queryable(bucket, org, queryApi, new DefaultMemberNameResolver(), queryableOptimizerSettings);
2649
}
27-
50+
2851
/// <summary>
2952
/// Create a new instance of IQueryable.
3053
/// </summary>
3154
/// <param name="bucket">Specifies the source bucket.</param>
3255
/// <param name="org">Specifies the source organization.</param>
3356
/// <param name="queryApi">The underlying API to execute Flux Query.</param>
3457
/// <param name="memberResolver">Resolver for customized names.</param>
58+
/// <param name="queryableOptimizerSettings">Settings for a Query optimization</param>
3559
/// <returns>new instance for of Queryable</returns>
36-
public static InfluxDBQueryable<T> Queryable(string bucket, string org, QueryApiSync queryApi, IMemberNameResolver memberResolver)
60+
public static InfluxDBQueryable<T> Queryable(string bucket, string org, QueryApiSync queryApi,
61+
IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings = default)
3762
{
38-
return new InfluxDBQueryable<T>(bucket, org, queryApi, memberResolver);
63+
return new InfluxDBQueryable<T>(bucket, org, queryApi, memberResolver, queryableOptimizerSettings);
3964
}
40-
65+
4166
/// <summary>
4267
/// Create a new instance of IQueryable.
4368
/// </summary>
4469
/// <param name="bucket">Specifies the source bucket.</param>
4570
/// <param name="org">Specifies the source organization.</param>
4671
/// <param name="queryApi">The underlying API to execute Flux Query.</param>
4772
/// <param name="memberResolver">Resolver for customized names.</param>
48-
public InfluxDBQueryable(string bucket, string org, QueryApiSync queryApi, IMemberNameResolver memberResolver) : base(CreateQueryParser(),
49-
CreateExecutor(bucket, org, queryApi, memberResolver))
73+
/// <param name="queryableOptimizerSettings">Settings for a Query optimization</param>
74+
public InfluxDBQueryable(string bucket, string org, QueryApiSync queryApi, IMemberNameResolver memberResolver,
75+
QueryableOptimizerSettings queryableOptimizerSettings = default) : base(CreateQueryParser(),
76+
CreateExecutor(bucket, org, queryApi, memberResolver, queryableOptimizerSettings))
5077
{
5178
}
5279

@@ -77,18 +104,20 @@ public Api.Domain.Query ToDebugQuery()
77104
return generateQuery;
78105
}
79106

80-
private static IQueryExecutor CreateExecutor(string bucket, string org, QueryApiSync queryApi, IMemberNameResolver memberResolver)
107+
private static IQueryExecutor CreateExecutor(string bucket, string org, QueryApiSync queryApi,
108+
IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings = default)
81109
{
82110
Arguments.CheckNonEmptyString(bucket, nameof(bucket));
83111
Arguments.CheckNonEmptyString(org, nameof(org));
84112
Arguments.CheckNotNull(queryApi, nameof(queryApi));
85-
86-
return new InfluxDBQueryExecutor(bucket, org, queryApi, memberResolver);
113+
114+
return new InfluxDBQueryExecutor(bucket, org, queryApi, memberResolver,
115+
queryableOptimizerSettings ?? new QueryableOptimizerSettings());
87116
}
88117

89118
private static QueryParser CreateQueryParser()
90119
{
91120
return QueryParser.CreateDefault();
92121
}
93122
}
94-
}
123+
}

Client.Linq/Internal/QueryAggregator.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ internal class QueryAggregator
6464
private ResultFunction _resultFunction;
6565
private readonly List<string> _filterByTags;
6666
private readonly List<string> _filterByFields;
67-
private readonly List<(string, string)> _orders;
67+
private readonly List<(string, string, bool, string)> _orders;
6868

6969
internal QueryAggregator()
7070
{
7171
_resultFunction = ResultFunction.None;
7272
_limitNOffsetAssignments = new List<LimitOffsetAssignment>();
7373
_filterByTags = new List<string>();
7474
_filterByFields = new List<string>();
75-
_orders = new List<(string, string)>();
75+
_orders = new List<(string, string, bool, string)>();
7676
}
7777

7878
internal void AddBucket(string bucket)
@@ -133,9 +133,9 @@ internal void AddSubQueries(QueryAggregator aggregator)
133133
_orders.AddRange(aggregator._orders);
134134
}
135135

136-
internal void AddOrder(string orderPart, string desc)
136+
internal void AddOrder(string column, string columnVariable, bool descending, string descendingVariable)
137137
{
138-
_orders.Add((orderPart, desc));
138+
_orders.Add((column, columnVariable, descending, descendingVariable));
139139
}
140140

141141
internal void AddResultFunction(ResultFunction resultFunction)
@@ -145,8 +145,10 @@ internal void AddResultFunction(ResultFunction resultFunction)
145145
_resultFunction = resultFunction;
146146
}
147147

148-
internal string BuildFluxQuery()
148+
internal string BuildFluxQuery(QueryableOptimizerSettings settings)
149149
{
150+
Arguments.CheckNotNull(settings, nameof(settings));
151+
150152
var transforms = new List<string>();
151153
var parts = new List<string>
152154
{
@@ -155,14 +157,20 @@ internal string BuildFluxQuery()
155157
BuildFilter(_filterByTags),
156158
"pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")",
157159
"drop(columns: [\"_start\", \"_stop\", \"_measurement\"])",
158-
"group()",
160+
settings.QueryMultipleTimeSeries ? "group()" : "",
159161
BuildFilter(_filterByFields)
160162
};
161163

162164
// https://docs.influxdata.com/influxdb/cloud/reference/flux/stdlib/built-in/transformations/sort/
163-
foreach (var (column, desc) in _orders)
165+
foreach (var ((column, columnVariable, descending, descendingVariable), index) in _orders.Select((value, i) => (value, i)))
164166
{
165-
parts.Add(BuildOperator("sort", "columns", new List<string> {column}, "desc", desc));
167+
// skip default sorting if don't query to multiple time series
168+
if (!settings.QueryMultipleTimeSeries && index == 0 && column == "_time" && !descending)
169+
{
170+
continue;
171+
}
172+
173+
parts.Add(BuildOperator("sort", "columns", new List<string> {columnVariable}, "desc", descendingVariable));
166174
}
167175

168176
// https://docs.influxdata.com/influxdb/cloud/reference/flux/stdlib/built-in/transformations/limit/

Client.Linq/Internal/QueryExecutor.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,24 @@ internal class InfluxDBQueryExecutor : IQueryExecutor
2121
private readonly string _org;
2222
private readonly QueryApiSync _queryApi;
2323
private readonly IMemberNameResolver _memberResolver;
24+
private readonly QueryableOptimizerSettings _queryableOptimizerSettings;
2425

2526
/// <summary>
2627
///
2728
/// </summary>
2829
/// <param name="bucket">Specifies the source bucket.</param>
2930
/// <param name="org">Specifies the source organization.</param>
3031
/// <param name="queryApi">The underlying API to execute Flux Query.</param>
31-
public InfluxDBQueryExecutor(string bucket, string org, QueryApiSync queryApi, IMemberNameResolver memberResolver)
32+
/// <param name="memberResolver">Resolver for customized names.</param>
33+
/// <param name="queryableOptimizerSettings">Settings for a Query optimization</param>
34+
public InfluxDBQueryExecutor(string bucket, string org, QueryApiSync queryApi,
35+
IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings)
3236
{
3337
_bucket = bucket;
3438
_org = org;
3539
_queryApi = queryApi;
3640
_memberResolver = memberResolver;
41+
_queryableOptimizerSettings = queryableOptimizerSettings;
3742
}
3843

3944
/// <summary>
@@ -85,11 +90,22 @@ public IEnumerable<T> ExecuteCollection<T>(QueryModel queryModel)
8590
/// <returns>Query to Invoke</returns>
8691
internal Query GenerateQuery(QueryModel queryModel, out QueryResultsSettings settings)
8792
{
88-
var visitor = new InfluxDBQueryVisitor(_bucket, _memberResolver);
89-
visitor.VisitQueryModel(queryModel);
93+
var visitor = QueryVisitor(queryModel);
9094

9195
settings = new QueryResultsSettings(queryModel);
9296
return visitor.GenerateQuery();
9397
}
98+
99+
/// <summary>
100+
/// Create QueryVisitor for specified model.
101+
/// </summary>
102+
/// <param name="queryModel">Query Model</param>
103+
/// <returns>Query Visitor</returns>
104+
internal InfluxDBQueryVisitor QueryVisitor(QueryModel queryModel)
105+
{
106+
var visitor = new InfluxDBQueryVisitor(_bucket, _memberResolver, _queryableOptimizerSettings);
107+
visitor.VisitQueryModel(queryModel);
108+
return visitor;
109+
}
94110
}
95111
}

Client.Linq/Internal/QueryGenerationContext.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,23 @@ internal class QueryGenerationContext
55
internal readonly QueryAggregator QueryAggregator;
66
internal readonly IMemberNameResolver MemberResolver;
77
internal readonly VariableAggregator Variables;
8+
internal readonly QueryableOptimizerSettings QueryableOptimizerSettings;
89

9-
internal QueryGenerationContext(QueryAggregator queryAggregator, VariableAggregator variableAggregator,
10-
IMemberNameResolver memberResolver)
10+
internal QueryGenerationContext(
11+
QueryAggregator queryAggregator,
12+
VariableAggregator variableAggregator,
13+
IMemberNameResolver memberResolver,
14+
QueryableOptimizerSettings queryableOptimizerSettings)
1115
{
1216
QueryAggregator = queryAggregator;
1317
Variables = variableAggregator;
1418
MemberResolver = memberResolver;
19+
QueryableOptimizerSettings = queryableOptimizerSettings;
1520
}
1621

1722
internal QueryGenerationContext Clone(QueryAggregator queryAggregator)
1823
{
19-
return new QueryGenerationContext(queryAggregator, Variables, MemberResolver);
24+
return new QueryGenerationContext(queryAggregator, Variables, MemberResolver, QueryableOptimizerSettings);
2025
}
2126
}
2227
}

Client.Linq/Internal/QueryVisitor.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ internal class InfluxDBQueryVisitor : QueryModelVisitorBase
1717
{
1818
private readonly QueryGenerationContext _context;
1919

20-
internal InfluxDBQueryVisitor(string bucket, IMemberNameResolver memberResolver) :
21-
this(new QueryGenerationContext(new QueryAggregator(), new VariableAggregator(), memberResolver))
20+
internal InfluxDBQueryVisitor(
21+
string bucket,
22+
IMemberNameResolver memberResolver,
23+
QueryableOptimizerSettings queryableOptimizerSettings) :
24+
this(new QueryGenerationContext(new QueryAggregator(), new VariableAggregator(), memberResolver, queryableOptimizerSettings))
2225
{
2326
var bucketVariable = _context.Variables.AddNamedVariable(bucket);
2427
_context.QueryAggregator.AddBucket(bucketVariable);
@@ -59,7 +62,7 @@ internal File BuildFluxAST()
5962

6063
internal string BuildFluxQuery()
6164
{
62-
return _context.QueryAggregator.BuildFluxQuery();
65+
return _context.QueryAggregator.BuildFluxQuery(_context.QueryableOptimizerSettings);
6366
}
6467

6568
public override void VisitWhereClause(WhereClause whereClause, QueryModel queryModel, int index)
@@ -155,11 +158,11 @@ public override void VisitOrderByClause(OrderByClause orderByClause, QueryModel
155158

156159
foreach (var ordering in orderByClause.Orderings)
157160
{
158-
var orderPart = _context.Variables
159-
.AddNamedVariable(GetFluxExpression(ordering.Expression, orderByClause));
160-
var desc = _context.Variables
161-
.AddNamedVariable(ordering.OrderingDirection == OrderingDirection.Desc);
162-
_context.QueryAggregator.AddOrder(orderPart, desc);
161+
var column = GetFluxExpression(ordering.Expression, orderByClause);
162+
var columnVariable = _context.Variables.AddNamedVariable(column);
163+
var descending = ordering.OrderingDirection == OrderingDirection.Desc;
164+
var descendingVariable = _context.Variables.AddNamedVariable(descending);
165+
_context.QueryAggregator.AddOrder(column, columnVariable, descending, descendingVariable);
163166
}
164167
}
165168

0 commit comments

Comments
 (0)