Skip to content

Commit 9e783e7

Browse files
Adding support for list rules API filter parameters (#5417)
Adding support to filter rules with rules API query parameters Signed-off-by: Anand Rajagopal <[email protected]> --------- Signed-off-by: Anand Rajagopal <[email protected]>
1 parent eaf9463 commit 9e783e7

File tree

9 files changed

+809
-75
lines changed

9 files changed

+809
-75
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Querier/StoreGateway: Allow the tenant shard sizes to be a percent of total instances. #5393
1919
* [FEATURE] Added the flag `-alertmanager.api-concurrency` to configure alert manager api concurrency limit. #5412
2020
* [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421
21+
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
2122
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
2223
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
2324
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323

docs/contributing/how-integration-tests-work.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,4 @@ Integration tests have `requires_docker` tag (`// +build requires_docker` line f
4646

4747
## Isolation
4848

49-
Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.
49+
Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.

integration/e2ecortex/client.go

+63
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,69 @@ type ServerStatus struct {
361361
} `json:"data"`
362362
}
363363

364+
type RuleFilter struct {
365+
Namespaces []string
366+
RuleGroupNames []string
367+
RuleNames []string
368+
RuleType string
369+
}
370+
371+
func addQueryParams(urlValues url.Values, paramName string, params ...string) {
372+
for _, paramValue := range params {
373+
urlValues.Add(paramName, paramValue)
374+
}
375+
}
376+
377+
// GetPrometheusRulesWithFilter fetches the rules from the Prometheus endpoint /api/v1/rules.
378+
func (c *Client) GetPrometheusRulesWithFilter(filter RuleFilter) ([]*ruler.RuleGroup, error) {
379+
// Create HTTP request
380+
381+
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
382+
if err != nil {
383+
return nil, err
384+
}
385+
req.Header.Set("X-Scope-OrgID", c.orgID)
386+
387+
urlValues := req.URL.Query()
388+
addQueryParams(urlValues, "file[]", filter.Namespaces...)
389+
addQueryParams(urlValues, "rule_name[]", filter.RuleNames...)
390+
addQueryParams(urlValues, "rule_group[]", filter.RuleGroupNames...)
391+
addQueryParams(urlValues, "type", filter.RuleType)
392+
req.URL.RawQuery = urlValues.Encode()
393+
394+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
395+
defer cancel()
396+
397+
// Execute HTTP request
398+
res, err := c.httpClient.Do(req.WithContext(ctx))
399+
if err != nil {
400+
return nil, err
401+
}
402+
defer res.Body.Close()
403+
404+
body, err := io.ReadAll(res.Body)
405+
if err != nil {
406+
return nil, err
407+
}
408+
409+
// Decode the response.
410+
type response struct {
411+
Status string `json:"status"`
412+
Data ruler.RuleDiscovery `json:"data"`
413+
}
414+
415+
decoded := &response{}
416+
if err := json.Unmarshal(body, decoded); err != nil {
417+
return nil, err
418+
}
419+
420+
if decoded.Status != "success" {
421+
return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
422+
}
423+
424+
return decoded.Data.RuleGroups, nil
425+
}
426+
364427
// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
365428
func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
366429
// Create HTTP request

integration/ruler_test.go

+160
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"crypto/x509/pkix"
1010
"fmt"
1111
"math"
12+
"math/rand"
1213
"net/http"
1314
"os"
1415
"path/filepath"
@@ -17,6 +18,8 @@ import (
1718
"testing"
1819
"time"
1920

21+
"github.com/cortexproject/cortex/pkg/ruler"
22+
2023
"github.com/cortexproject/cortex/pkg/storage/tsdb"
2124

2225
"github.com/prometheus/common/model"
@@ -389,6 +392,163 @@ func TestRulerSharding(t *testing.T) {
389392
assert.ElementsMatch(t, expectedNames, actualNames)
390393
}
391394

395+
func TestRulerAPISharding(t *testing.T) {
396+
const numRulesGroups = 100
397+
398+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
399+
s, err := e2e.NewScenario(networkName)
400+
require.NoError(t, err)
401+
defer s.Close()
402+
403+
// Generate multiple rule groups, with 1 rule each.
404+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
405+
expectedNames := make([]string, numRulesGroups)
406+
alertCount := 0
407+
for i := 0; i < numRulesGroups; i++ {
408+
num := random.Intn(100)
409+
var ruleNode yaml.Node
410+
var exprNode yaml.Node
411+
412+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
413+
exprNode.SetString(strconv.Itoa(i))
414+
ruleName := fmt.Sprintf("test_%d", i)
415+
416+
expectedNames[i] = ruleName
417+
if num%2 == 0 {
418+
alertCount++
419+
ruleGroups[i] = rulefmt.RuleGroup{
420+
Name: ruleName,
421+
Interval: 60,
422+
Rules: []rulefmt.RuleNode{{
423+
Alert: ruleNode,
424+
Expr: exprNode,
425+
}},
426+
}
427+
} else {
428+
ruleGroups[i] = rulefmt.RuleGroup{
429+
Name: ruleName,
430+
Interval: 60,
431+
Rules: []rulefmt.RuleNode{{
432+
Record: ruleNode,
433+
Expr: exprNode,
434+
}},
435+
}
436+
}
437+
}
438+
439+
// Start dependencies.
440+
consul := e2edb.NewConsul()
441+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
442+
require.NoError(t, s.StartAndWaitReady(consul, minio))
443+
444+
// Configure the ruler.
445+
rulerFlags := mergeFlags(
446+
BlocksStorageFlags(),
447+
RulerFlags(),
448+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
449+
map[string]string{
450+
// Since we're not going to run any rule, we don't need the
451+
// store-gateway to be configured to a valid address.
452+
"-querier.store-gateway-addresses": "localhost:12345",
453+
// Enable the bucket index so we can skip the initial bucket scan.
454+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
455+
},
456+
)
457+
458+
// Start rulers.
459+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
460+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
461+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
462+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
463+
464+
// Upload rule groups to one of the rulers.
465+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
466+
require.NoError(t, err)
467+
468+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
469+
namespaceNameCount := make([]int, 5)
470+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
471+
for _, ruleGroup := range ruleGroups {
472+
index := nsRand.Intn(len(namespaceNames))
473+
namespaceNameCount[index] = namespaceNameCount[index] + 1
474+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
475+
}
476+
477+
// Wait until rulers have loaded all rules.
478+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
479+
480+
// Since rulers have loaded all rules, we expect that rules have been sharded
481+
// between the two rulers.
482+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
483+
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
484+
485+
testCases := map[string]struct {
486+
filter e2ecortex.RuleFilter
487+
resultCheckFn func(assert.TestingT, []*ruler.RuleGroup)
488+
}{
489+
"Filter for Alert Rules": {
490+
filter: e2ecortex.RuleFilter{
491+
RuleType: "alert",
492+
},
493+
resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
494+
assert.Len(t, ruleGroups, alertCount, "Expected %d rules but got %d", alertCount, len(ruleGroups))
495+
},
496+
},
497+
"Filter for Recording Rules": {
498+
filter: e2ecortex.RuleFilter{
499+
RuleType: "record",
500+
},
501+
resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
502+
assert.Len(t, ruleGroups, numRulesGroups-alertCount, "Expected %d rules but got %d", numRulesGroups-alertCount, len(ruleGroups))
503+
},
504+
},
505+
"Filter by Namespace Name": {
506+
filter: e2ecortex.RuleFilter{
507+
Namespaces: []string{namespaceNames[2]},
508+
},
509+
resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
510+
assert.Len(t, ruleGroups, namespaceNameCount[2], "Expected %d rules but got %d", namespaceNameCount[2], len(ruleGroups))
511+
},
512+
},
513+
"Filter by Namespace Name and Alert Rules": {
514+
filter: e2ecortex.RuleFilter{
515+
RuleType: "alert",
516+
Namespaces: []string{namespaceNames[2]},
517+
},
518+
resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
519+
for _, ruleGroup := range ruleGroups {
520+
rule := ruleGroup.Rules[0].(map[string]interface{})
521+
ruleType := rule["type"]
522+
assert.Equal(t, "alerting", ruleType, "Expected 'alerting' rule type but got %s", ruleType)
523+
}
524+
},
525+
},
526+
"Filter by Rule Names": {
527+
filter: e2ecortex.RuleFilter{
528+
RuleNames: []string{"rule_3", "rule_64", "rule_99"},
529+
},
530+
resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
531+
ruleNames := []string{}
532+
for _, ruleGroup := range ruleGroups {
533+
rule := ruleGroup.Rules[0].(map[string]interface{})
534+
ruleName := rule["name"]
535+
ruleNames = append(ruleNames, ruleName.(string))
536+
537+
}
538+
assert.Len(t, ruleNames, 3, "Expected %d rules but got %d", 3, len(ruleNames))
539+
},
540+
},
541+
}
542+
// For each test case, fetch the rules with configured filters, and ensure the results match.
543+
for name, tc := range testCases {
544+
t.Run(name, func(t *testing.T) {
545+
actualGroups, err := c.GetPrometheusRulesWithFilter(tc.filter)
546+
require.NoError(t, err)
547+
tc.resultCheckFn(t, actualGroups)
548+
})
549+
}
550+
}
551+
392552
func TestRulerAlertmanager(t *testing.T) {
393553
var namespaceOne = "test_/encoded_+namespace/?"
394554
ruleGroup := createTestRuleGroup(t)

pkg/ruler/api.go

+45-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ruler
22

33
import (
44
"encoding/json"
5+
"fmt"
56
io "io"
67
"net/http"
78
"net/url"
@@ -119,6 +120,26 @@ func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
119120
}
120121
}
121122

123+
func respondBadRequest(logger log.Logger, w http.ResponseWriter, msg string) {
124+
b, err := json.Marshal(&response{
125+
Status: "error",
126+
ErrorType: v1.ErrBadData,
127+
Error: msg,
128+
Data: nil,
129+
})
130+
131+
if err != nil {
132+
level.Error(logger).Log("msg", "error marshaling json response", "err", err)
133+
http.Error(w, err.Error(), http.StatusInternalServerError)
134+
return
135+
}
136+
137+
w.WriteHeader(http.StatusBadRequest)
138+
if n, err := w.Write(b); err != nil {
139+
level.Error(logger).Log("msg", "error writing response", "bytesWritten", n, "err", err)
140+
}
141+
}
142+
122143
// API is used to handle HTTP requests for the ruler service
123144
type API struct {
124145
ruler *Ruler
@@ -145,8 +166,27 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
145166
return
146167
}
147168

169+
if err := req.ParseForm(); err != nil {
170+
level.Error(logger).Log("msg", "error parsing form/query params", "err", err)
171+
respondBadRequest(logger, w, "error parsing form/query params")
172+
return
173+
}
174+
175+
typ := strings.ToLower(req.URL.Query().Get("type"))
176+
if typ != "" && typ != alertingRuleFilter && typ != recordingRuleFilter {
177+
respondBadRequest(logger, w, fmt.Sprintf("unsupported rule type %q", typ))
178+
return
179+
}
180+
181+
rulesRequest := RulesRequest{
182+
RuleNames: req.Form["rule_name[]"],
183+
RuleGroupNames: req.Form["rule_group[]"],
184+
Files: req.Form["file[]"],
185+
Type: typ,
186+
}
187+
148188
w.Header().Set("Content-Type", "application/json")
149-
rgs, err := a.ruler.GetRules(req.Context())
189+
rgs, err := a.ruler.GetRules(req.Context(), rulesRequest)
150190

151191
if err != nil {
152192
respondError(logger, w, err.Error())
@@ -238,7 +278,10 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
238278
}
239279

240280
w.Header().Set("Content-Type", "application/json")
241-
rgs, err := a.ruler.GetRules(req.Context())
281+
rulesRequest := RulesRequest{
282+
Type: alertingRuleFilter,
283+
}
284+
rgs, err := a.ruler.GetRules(req.Context(), rulesRequest)
242285

243286
if err != nil {
244287
respondError(logger, w, err.Error())

0 commit comments

Comments
 (0)