Skip to content

Commit d4deebc

Browse files
lukaszdudek-silvairLloyd Wallis
authored and
Lloyd Wallis
committed
Add CQs management methods to the client (influxdata#681)
* Add CQs management methods to the client
1 parent 574697b commit d4deebc

File tree

4 files changed

+232
-0
lines changed

4 files changed

+232
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77
## [Unreleased]
88

99
### Added
10+
- Add `get_list_continuous_queries`, `drop_continuous_query`, and `create_continuous_query` management methods for
11+
continuous queries (#681 thx @lukaszdudek-silvair)
1012
- query() now accepts a bind_params argument for parameter binding (#678 thx @clslgrnc)
1113

1214
### Changed

influxdb/client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,98 @@ def get_list_privileges(self, username):
940940
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
941941
return list(self.query(text).get_points())
942942

943+
def get_list_continuous_queries(self):
944+
"""Get the list of continuous queries in InfluxDB.
945+
946+
:return: all CQs in InfluxDB
947+
:rtype: list of dictionaries
948+
949+
:Example:
950+
951+
::
952+
953+
>> cqs = client.get_list_cqs()
954+
>> cqs
955+
[
956+
{
957+
u'db1': []
958+
},
959+
{
960+
u'db2': [
961+
{
962+
u'name': u'vampire',
963+
u'query': u'CREATE CONTINUOUS QUERY vampire ON '
964+
'mydb BEGIN SELECT count(dracula) INTO '
965+
'mydb.autogen.all_of_them FROM '
966+
'mydb.autogen.one GROUP BY time(5m) END'
967+
}
968+
]
969+
}
970+
]
971+
"""
972+
query_string = "SHOW CONTINUOUS QUERIES"
973+
return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]
974+
975+
def create_continuous_query(self, name, select, database=None,
976+
resample_opts=None):
977+
r"""Create a continuous query for a database.
978+
979+
:param name: the name of continuous query to create
980+
:type name: str
981+
:param select: select statement for the continuous query
982+
:type select: str
983+
:param database: the database for which the continuous query is
984+
created. Defaults to current client's database
985+
:type database: str
986+
:param resample_opts: resample options
987+
:type resample_opts: str
988+
989+
:Example:
990+
991+
::
992+
993+
>> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
994+
... 'FROM "cpu" GROUP BY time(1m)'
995+
>> client.create_continuous_query(
996+
... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
997+
... )
998+
>> client.get_list_continuous_queries()
999+
[
1000+
{
1001+
'db_name': [
1002+
{
1003+
'name': 'cpu_mean',
1004+
'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
1005+
'ON "db_name" '
1006+
'RESAMPLE EVERY 10s FOR 2m '
1007+
'BEGIN SELECT mean("value") '
1008+
'INTO "cpu_mean" FROM "cpu" '
1009+
'GROUP BY time(1m) END'
1010+
}
1011+
]
1012+
}
1013+
]
1014+
"""
1015+
query_string = (
1016+
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
1017+
).format(quote_ident(name), quote_ident(database or self._database),
1018+
' RESAMPLE ' + resample_opts if resample_opts else '', select)
1019+
self.query(query_string)
1020+
1021+
def drop_continuous_query(self, name, database=None):
1022+
"""Drop an existing continuous query for a database.
1023+
1024+
:param name: the name of continuous query to drop
1025+
:type name: str
1026+
:param database: the database for which the continuous query is
1027+
dropped. Defaults to current client's database
1028+
:type database: str
1029+
"""
1030+
query_string = (
1031+
"DROP CONTINUOUS QUERY {0} ON {1}"
1032+
).format(quote_ident(name), quote_ident(database or self._database))
1033+
self.query(query_string)
1034+
9431035
def send_packet(self, packet, protocol='json', time_precision=None):
9441036
"""Send an UDP packet.
9451037

influxdb/tests/client_test.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,114 @@ def test_get_list_privileges_fails(self):
10351035
with _mocked_session(cli, 'get', 401):
10361036
cli.get_list_privileges('test')
10371037

1038+
def test_get_list_continuous_queries(self):
1039+
"""Test getting a list of continuous queries."""
1040+
data = {
1041+
"results": [
1042+
{
1043+
"statement_id": 0,
1044+
"series": [
1045+
{
1046+
"name": "testdb01",
1047+
"columns": ["name", "query"],
1048+
"values": [["testname01", "testquery01"],
1049+
["testname02", "testquery02"]]
1050+
},
1051+
{
1052+
"name": "testdb02",
1053+
"columns": ["name", "query"],
1054+
"values": [["testname03", "testquery03"]]
1055+
},
1056+
{
1057+
"name": "testdb03",
1058+
"columns": ["name", "query"]
1059+
}
1060+
]
1061+
}
1062+
]
1063+
}
1064+
1065+
with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
1066+
self.assertListEqual(
1067+
self.cli.get_list_continuous_queries(),
1068+
[
1069+
{
1070+
'testdb01': [
1071+
{'name': 'testname01', 'query': 'testquery01'},
1072+
{'name': 'testname02', 'query': 'testquery02'}
1073+
]
1074+
},
1075+
{
1076+
'testdb02': [
1077+
{'name': 'testname03', 'query': 'testquery03'}
1078+
]
1079+
},
1080+
{
1081+
'testdb03': []
1082+
}
1083+
]
1084+
)
1085+
1086+
@raises(Exception)
1087+
def test_get_list_continuous_queries_fails(self):
1088+
"""Test failing to get a list of continuous queries."""
1089+
with _mocked_session(self.cli, 'get', 400):
1090+
self.cli.get_list_continuous_queries()
1091+
1092+
def test_create_continuous_query(self):
1093+
"""Test continuous query creation."""
1094+
data = {"results": [{}]}
1095+
with requests_mock.Mocker() as m:
1096+
m.register_uri(
1097+
requests_mock.GET,
1098+
"http://localhost:8086/query",
1099+
text=json.dumps(data)
1100+
)
1101+
query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
1102+
'"events" GROUP BY time(10m)'
1103+
self.cli.create_continuous_query('cq_name', query, 'db_name')
1104+
self.assertEqual(
1105+
m.last_request.qs['q'][0],
1106+
'create continuous query "cq_name" on "db_name" begin select '
1107+
'count("value") into "6_months"."events" from "events" group '
1108+
'by time(10m) end'
1109+
)
1110+
self.cli.create_continuous_query('cq_name', query, 'db_name',
1111+
'EVERY 10s FOR 2m')
1112+
self.assertEqual(
1113+
m.last_request.qs['q'][0],
1114+
'create continuous query "cq_name" on "db_name" resample '
1115+
'every 10s for 2m begin select count("value") into '
1116+
'"6_months"."events" from "events" group by time(10m) end'
1117+
)
1118+
1119+
@raises(Exception)
1120+
def test_create_continuous_query_fails(self):
1121+
"""Test failing to create a continuous query."""
1122+
with _mocked_session(self.cli, 'get', 400):
1123+
self.cli.create_continuous_query('cq_name', 'select', 'db_name')
1124+
1125+
def test_drop_continuous_query(self):
1126+
"""Test dropping a continuous query."""
1127+
data = {"results": [{}]}
1128+
with requests_mock.Mocker() as m:
1129+
m.register_uri(
1130+
requests_mock.GET,
1131+
"http://localhost:8086/query",
1132+
text=json.dumps(data)
1133+
)
1134+
self.cli.drop_continuous_query('cq_name', 'db_name')
1135+
self.assertEqual(
1136+
m.last_request.qs['q'][0],
1137+
'drop continuous query "cq_name" on "db_name"'
1138+
)
1139+
1140+
@raises(Exception)
1141+
def test_drop_continuous_query_fails(self):
1142+
"""Test failing to drop a continuous query."""
1143+
with _mocked_session(self.cli, 'get', 400):
1144+
self.cli.drop_continuous_query('cq_name', 'db_name')
1145+
10381146
def test_invalid_port_fails(self):
10391147
"""Test invalid port fail for TestInfluxDBClient object."""
10401148
with self.assertRaises(ValueError):

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,36 @@ def test_drop_retention_policy(self):
722722
rsp
723723
)
724724

725+
def test_create_continuous_query(self):
726+
"""Test continuous query creation."""
727+
self.cli.create_retention_policy('some_rp', '1d', 1)
728+
query = 'select count("value") into "some_rp"."events" from ' \
729+
'"events" group by time(10m)'
730+
self.cli.create_continuous_query('test_cq', query, 'db')
731+
cqs = self.cli.get_list_continuous_queries()
732+
expected_cqs = [
733+
{
734+
'db': [
735+
{
736+
'name': 'test_cq',
737+
'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
738+
'BEGIN SELECT count(value) INTO '
739+
'db.some_rp.events FROM db.autogen.events '
740+
'GROUP BY time(10m) END'
741+
}
742+
]
743+
}
744+
]
745+
self.assertEqual(cqs, expected_cqs)
746+
747+
def test_drop_continuous_query(self):
748+
"""Test continuous query drop."""
749+
self.test_create_continuous_query()
750+
self.cli.drop_continuous_query('test_cq', 'db')
751+
cqs = self.cli.get_list_continuous_queries()
752+
expected_cqs = [{'db': []}]
753+
self.assertEqual(cqs, expected_cqs)
754+
725755
def test_issue_143(self):
726756
"""Test for PR#143 from repo."""
727757
pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z')

0 commit comments

Comments
 (0)