diff --git a/go.mod b/go.mod index 7c4a2a58d4d50..3f8a52d8ee99c 100644 --- a/go.mod +++ b/go.mod @@ -423,6 +423,7 @@ require ( github.com/RoaringBitmap/roaring v0.9.4 // indirect github.com/acomagu/bufpipe v1.0.3 // indirect github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f // indirect + github.com/bitly/go-simplejson v0.5.1 github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/blevesearch/go-porterstemmer v1.0.3 // indirect github.com/blevesearch/mmap-go v1.0.4 // indirect @@ -443,6 +444,7 @@ require ( github.com/go-logr/logr v1.2.4 // @grafana/grafana-app-platform-squad github.com/go-logr/stdr v1.2.2 // indirect github.com/google/go-github v17.0.0+incompatible // @grafana/grafana-app-platform-squad + github.com/grafadruid/go-druid v0.0.6 github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hmarr/codeowners v1.1.2 // @grafana/grafana-as-code github.com/imdario/mergo v0.3.13 // indirect diff --git a/go.sum b/go.sum index 300c53900f164..716b9b327def6 100644 --- a/go.sum +++ b/go.sum @@ -794,6 +794,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= +github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= @@ -1756,6 +1758,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v1.3.0/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= +github.com/grafadruid/go-druid v0.0.6 h1:Nt9jQrhrtHi1BJICN9aDJgYDmBmc10pJYpQiuwAsxa4= +github.com/grafadruid/go-druid v0.0.6/go.mod h1:KY3a6MrVMKkXgMTwBS9Nrhm1E8OWyR4gd0WzUi8d/zM= github.com/grafana/alerting v0.0.0-20230606080147-55b8d71c7890 h1:ubNIgVGX4PQ9YI1nWnt2mky3il8clWSjdo3NFSD26DQ= github.com/grafana/alerting v0.0.0-20230606080147-55b8d71c7890/go.mod h1:zEflOvMVchYhRbFb5ziXVR/JG67FOLBzQTjhHh9xaI4= github.com/grafana/codejen v0.0.3 h1:tAWxoTUuhgmEqxJPOLtJoxlPBbMULFwKFOcRsPRPXDw= @@ -1857,6 +1861,7 @@ github.com/hashicorp/go-plugin v1.2.2/go.mod h1:F9eH4LrE/ZsRdbwhfjs9k9HoDUwAHnYt github.com/hashicorp/go-plugin v1.4.9 h1:ESiK220/qE0aGxWdzKIvRH69iLiuN/PjoLTm69RoWtU= github.com/hashicorp/go-plugin v1.4.9/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= 
+github.com/hashicorp/go-retryablehttp v0.6.7/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/go-retryablehttp v0.6.8/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/go-retryablehttp v0.7.1/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/go-retryablehttp v0.7.2 h1:AcYqCvkpalPnPF2pn0KamgwamS42TqUDDYFRKq/RAd0= diff --git a/pkg/api/plugin_resource_test.go b/pkg/api/plugin_resource_test.go index 97c08eaf6e014..ce4a4632287a7 100644 --- a/pkg/api/plugin_resource_test.go +++ b/pkg/api/plugin_resource_test.go @@ -63,7 +63,7 @@ func TestCallResource(t *testing.T) { cfg.Azure = &azsettings.AzureSettings{} coreRegistry := coreplugin.ProvideCoreRegistry(nil, &cloudwatch.CloudWatchService{}, nil, nil, nil, nil, - nil, nil, nil, nil, testdatasource.ProvideService(), nil, nil, nil, nil, nil, nil) + nil, nil, nil, nil, testdatasource.ProvideService(), nil, nil, nil, nil, nil, nil, nil) pCfg, err := config.ProvideConfig(setting.ProvideProvider(cfg), cfg, featuremgmt.WithFeatures()) require.NoError(t, err) reg := registry.ProvideService() diff --git a/pkg/plugins/backendplugin/coreplugin/registry.go b/pkg/plugins/backendplugin/coreplugin/registry.go index e029fba1069d3..3a2dd3471b755 100644 --- a/pkg/plugins/backendplugin/coreplugin/registry.go +++ b/pkg/plugins/backendplugin/coreplugin/registry.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/grafana/pkg/tsdb/azuremonitor" cloudmonitoring "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" + "github.com/grafana/grafana/pkg/tsdb/druid" "github.com/grafana/grafana/pkg/tsdb/elasticsearch" pyroscope "github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource" "github.com/grafana/grafana/pkg/tsdb/grafanads" @@ -37,6 +38,7 @@ const ( InfluxDB = "influxdb" Loki = "loki" OpenTSDB = "opentsdb" + Druid = "grafadruid-druid-datasource" Prometheus = "prometheus" Tempo = "tempo" TestData = "testdata" @@ -84,7 +86,7 @@ func NewRegistry(store map[string]backendplugin.PluginFactoryFunc) *Registry { func ProvideCoreRegistry(am *azuremonitor.Service, cw *cloudwatch.CloudWatchService, cm *cloudmonitoring.Service, es *elasticsearch.Service, grap *graphite.Service, idb *influxdb.Service, lk *loki.Service, otsdb *opentsdb.Service, pr *prometheus.Service, t *tempo.Service, td *testdatasource.Service, pg *postgres.Service, my *mysql.Service, - ms *mssql.Service, graf *grafanads.Service, pyroscope *pyroscope.Service, parca *parca.Service) *Registry { + ms *mssql.Service, graf *grafanads.Service, pyroscope *pyroscope.Service, parca *parca.Service, dr *druid.Service) *Registry { return NewRegistry(map[string]backendplugin.PluginFactoryFunc{ CloudWatch: asBackendPlugin(cw.Executor), CloudMonitoring: asBackendPlugin(cm), @@ -103,6 +105,7 @@ func ProvideCoreRegistry(am *azuremonitor.Service, cw *cloudwatch.CloudWatchServ Grafana: asBackendPlugin(graf), Pyroscope: asBackendPlugin(pyroscope), Parca: asBackendPlugin(parca), + Druid: asBackendPlugin(dr), }) } diff --git a/pkg/plugins/manager/manager_integration_test.go b/pkg/plugins/manager/manager_integration_test.go index f87061bbee54e..33009e6e776bf 100644 --- a/pkg/plugins/manager/manager_integration_test.go +++ b/pkg/plugins/manager/manager_integration_test.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/grafana/pkg/tsdb/azuremonitor" cloudmonitoring 
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" + "github.com/grafana/grafana/pkg/tsdb/druid" "github.com/grafana/grafana/pkg/tsdb/elasticsearch" pyroscope "github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource" "github.com/grafana/grafana/pkg/tsdb/grafanads" @@ -111,8 +112,9 @@ func TestIntegrationPluginManager(t *testing.T) { graf := grafanads.ProvideService(sv2, nil) phlare := pyroscope.ProvideService(hcp, acimpl.ProvideAccessControl(cfg)) parca := parca.ProvideService(hcp) + druid := druid.ProvideService(hcp) - coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf, phlare, parca) + coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf, phlare, parca, druid) pCfg, err := config.ProvideConfig(setting.ProvideProvider(cfg), cfg, featuremgmt.WithFeatures()) require.NoError(t, err) @@ -224,6 +226,7 @@ func verifyCorePluginCatalogue(t *testing.T, ctx context.Context, ps *store.Serv "zipkin": {}, "grafana-pyroscope-datasource": {}, "parca": {}, + "druid": {}, } expApps := map[string]struct{}{ diff --git a/pkg/plugins/pfs/corelist/corelist_load_gen.go b/pkg/plugins/pfs/corelist/corelist_load_gen.go index 83808c4e74499..a01a3cd72f904 100644 --- a/pkg/plugins/pfs/corelist/corelist_load_gen.go +++ b/pkg/plugins/pfs/corelist/corelist_load_gen.go @@ -84,5 +84,7 @@ func corePlugins(rt *thema.Runtime) []pfs.ParsedPlugin { parsePluginOrPanic("public/app/plugins/panel/trend", "trend", rt), parsePluginOrPanic("public/app/plugins/panel/welcome", "welcome", rt), parsePluginOrPanic("public/app/plugins/panel/xychart", "xychart", rt), + parsePluginOrPanic("public/app/plugins/datasource/grafadruid-druid-datasource", "grafadruid_druid_datasource", rt), + parsePluginOrPanic("public/app/plugins/panel/sankey-panel-0.5.0", "ismael_sankey_panel", rt), } } diff --git a/pkg/server/wire.go b/pkg/server/wire.go index 22a33edce4ff5..fb9ce3ffa511b 100644 --- a/pkg/server/wire.go +++ b/pkg/server/wire.go @@ -65,7 +65,7 @@ import ( "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/folder/folderimpl" - "github.com/grafana/grafana/pkg/services/grafana-apiserver" + grafanaapiserver "github.com/grafana/grafana/pkg/services/grafana-apiserver" "github.com/grafana/grafana/pkg/services/grpcserver" grpccontext "github.com/grafana/grafana/pkg/services/grpcserver/context" "github.com/grafana/grafana/pkg/services/grpcserver/interceptors" @@ -146,6 +146,7 @@ import ( "github.com/grafana/grafana/pkg/tsdb/azuremonitor" cloudmonitoring "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" + "github.com/grafana/grafana/pkg/tsdb/druid" "github.com/grafana/grafana/pkg/tsdb/elasticsearch" pyroscope "github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource" "github.com/grafana/grafana/pkg/tsdb/grafanads" @@ -165,6 +166,7 @@ import ( ) var wireBasicSet = wire.NewSet( + druid.ProvideService, legacydataservice.ProvideService, wire.Bind(new(legacydata.RequestHandler), new(*legacydataservice.Service)), annotationsimpl.ProvideService, diff --git a/pkg/tsdb/druid/column_type.go b/pkg/tsdb/druid/column_type.go 
new file mode 100644 index 0000000000000..20433d84ec1e9 --- /dev/null +++ b/pkg/tsdb/druid/column_type.go @@ -0,0 +1,54 @@ +package druid + +import ( + "strconv" + "strings" + "time" +) + +func detectColumnType(c *responseColumn, pos int, rows [][]interface{}) { + t := map[columnType]int{"nil": 0} + maxRowsToScan := (len(rows) / 5) + 1 + for _, row := range rows[:maxRowsToScan] { + switch v := row[pos].(type) { + case string: + _, err := strconv.Atoi(v) + if err == nil { + t[ColumnInt]++ + continue + } + _, err = strconv.ParseBool(v) + if err == nil { + t[ColumnBool]++ + continue + } + // TODO is there any other timestamp format possible? + _, err = time.Parse("2006-01-02T15:04:05.000Z", v) + if err == nil { + t[ColumnTime]++ + continue + } + t[ColumnString]++ + continue + case float64: + if c.Name == "__time" || strings.Contains(strings.ToLower(c.Name), "time_") { + t[ColumnTime]++ + continue + } + t[ColumnFloat]++ + continue + case bool: + t[ColumnBool]++ + continue + } + } + key := ColumnString + maxVal := 0 + for k, v := range t { + if v > maxVal { + maxVal = v + key = k + } + } + c.Type = key +} diff --git a/pkg/tsdb/druid/druid.go b/pkg/tsdb/druid/druid.go new file mode 100644 index 0000000000000..44344952809b0 --- /dev/null +++ b/pkg/tsdb/druid/druid.go @@ -0,0 +1,907 @@ +package druid + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math" + "net/http" + "strconv" + "strings" + "time" + + "github.com/bitly/go-simplejson" + "github.com/grafadruid/go-druid" + druidquerybuilder "github.com/grafadruid/go-druid/builder" + druidquery "github.com/grafadruid/go-druid/builder/query" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/infra/httpclient" + "github.com/grafana/grafana/pkg/tsdb/druid/result" +) + +// Internal interval and range variables +var ( + varInterval = variableVariants("__interval") + varIntervalMs = variableVariants("__interval_ms") + varRange = variableVariants("__range") + varRangeS = variableVariants("__range_s") + varRangeMs = variableVariants("__range_ms") + varRateInterval = variableVariants("__rate_interval") +) + +func variableVariants(base string) []string { + return []string{ + fmt.Sprintf(`"${%s}"`, base), + fmt.Sprintf(`"$%s"`, base), + fmt.Sprintf(`$%s`, base), + fmt.Sprintf(`${%s}`, base), + } +} + +type druidQuery struct { + Builder map[string]interface{} `json:"builder"` + Settings map[string]interface{} `json:"settings"` +} + +type druidResponse struct { + Reference string + Columns []responseColumn + Rows [][]interface{} +} + +type columnType string + +const ( + ColumnString columnType = "string" + ColumnTime columnType = "time" + ColumnBool columnType = "bool" + ColumnInt columnType = "int" + ColumnFloat columnType = "float" +) + +type responseColumn struct { + Name string + Type columnType +} + +type druidInstanceSettings struct { + client *druid.Client + defaultQuerySettings map[string]interface{} +} + +func (s *druidInstanceSettings) Dispose() { + s.client.Close() +} + +func newDataSourceInstance(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + data, err := simplejson.NewJson(settings.JSONData) + if err != nil { + return &druidInstanceSettings{}, err + } + secureData := 
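`detectColumnType` decides a column's type by majority vote over a sample of roughly the first fifth of the rows (`len(rows)/5 + 1`, so it implicitly assumes at least one row). A usage sketch, assuming the package compiles as in the hunk above; the data is made up:

```go
package druid

import "testing"

func TestDetectColumnTypeSketch(t *testing.T) {
	// Five rows means the first two are sampled; both parse as integers,
	// so int wins the vote even though later rows are plain strings.
	rows := [][]interface{}{{"42"}, {"7"}, {"n/a"}, {"n/a"}, {"n/a"}}
	col := responseColumn{Name: "value"}
	detectColumnType(&col, 0, rows)
	if col.Type != ColumnInt {
		t.Fatalf("expected %q, got %q", ColumnInt, col.Type)
	}
}
```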
settings.DecryptedSecureJSONData + + var druidOpts []druid.ClientOption + if retryMax := data.Get("connection.retryableRetryMax").MustInt(-1); retryMax != -1 { + druidOpts = append(druidOpts, druid.WithRetryMax(retryMax)) + } + if retryWaitMin := data.Get("connection.retryableRetryWaitMin").MustInt(-1); retryWaitMin != -1 { + druidOpts = append(druidOpts, druid.WithRetryWaitMin(time.Duration(retryWaitMin)*time.Millisecond)) + } + if retryWaitMax := data.Get("connection.retryableRetryWaitMax").MustInt(-1); retryWaitMax != -1 { + druidOpts = append(druidOpts, druid.WithRetryWaitMax(time.Duration(retryWaitMax)*time.Millisecond)) + } + if basicAuth := data.Get("connection.basicAuth").MustBool(); basicAuth { + druidOpts = append(druidOpts, druid.WithBasicAuth(data.Get("connection.basicAuthUser").MustString(), secureData["connection.basicAuthPassword"])) + } + if skipTLS := data.Get("connection.skipTls").MustBool(); skipTLS { + druidOpts = append(druidOpts, druid.WithSkipTLSVerify()) + } + + c, err := druid.NewClient(data.Get("connection.url").MustString(), druidOpts...) + if err != nil { + return &druidInstanceSettings{}, err + } + + return &druidInstanceSettings{ + client: c, + defaultQuerySettings: prepareQuerySettings(settings.JSONData), + }, nil +} + +func prepareQuerySettings(data json.RawMessage) map[string]interface{} { + var d map[string]interface{} + settings := make(map[string]interface{}) + err := json.Unmarshal(data, &d) + if err != nil { + return settings + } + for k, v := range d { + if strings.HasPrefix(k, "query.") { + settings[strings.TrimPrefix(k, "query.")] = v + } + } + return settings +} + +func mergeSettings(settings ...map[string]interface{}) map[string]interface{} { + stg := make(map[string]interface{}) + for _, s := range settings { + for k, v := range s { + stg[k] = v + } + } + return stg +} + +func newDatasource() datasource.ServeOpts { + ds := &Service{ + im: datasource.NewInstanceManager(newDataSourceInstance), + } + + return datasource.ServeOpts{ + QueryDataHandler: ds, + CheckHealthHandler: ds, + CallResourceHandler: ds, + } +} + +type Service struct { + im instancemgmt.InstanceManager +} + +func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { + return newDataSourceInstance +} + +func ProvideService(httpClientProvider httpclient.Provider) *Service { + return &Service{ + im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), + } +} + +func (ds *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + var err error + var body interface{} + var code int + body = "Unknown error" + code = 500 + switch req.Path { + case "query-variable": + switch req.Method { + case "POST": + body, err = ds.QueryVariableData(ctx, req) + if err == nil { + code = 200 + } + default: + body = "Method not supported" + } + default: + body = "Path not supported" + } + resp := &backend.CallResourceResponse{Status: code} + resp.Body, err = json.Marshal(body) + sender.Send(resp) + return nil +} + +type grafanaMetricFindValue struct { + Value interface{} `json:"value"` + Text string `json:"text"` +} + +func (ds *Service) QueryVariableData(ctx context.Context, req *backend.CallResourceRequest) ([]grafanaMetricFindValue, error) { + s, err := ds.settings(req.PluginContext, ctx) + if err != nil { + return []grafanaMetricFindValue{}, err + } + return ds.queryVariable(req.Body, s, toHTTPHeaders(req.Headers)) +} + +func (ds *Service) queryVariable(qry []byte, s 
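`newDataSourceInstance` reads flat, dot-prefixed keys from the datasource JSON: `connection.*` keys configure the go-druid client (URL, retries, basic auth, TLS), while `query.*` keys are split off by `prepareQuerySettings` into default query settings. A test-style sketch of that split, with an illustrative payload:

```go
package druid

import (
	"reflect"
	"testing"
)

func TestPrepareQuerySettingsSketch(t *testing.T) {
	// connection.* keys are consumed by the client setup and ignored here;
	// only query.* keys survive, with the prefix stripped.
	raw := []byte(`{
		"connection.url": "http://localhost:8888",
		"connection.retryableRetryMax": 5,
		"query.contextParameters": [{"name": "timeout", "value": 30000}]
	}`)
	got := prepareQuerySettings(raw)
	want := map[string]interface{}{
		"contextParameters": []interface{}{
			map[string]interface{}{"name": "timeout", "value": float64(30000)},
		},
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("got %#v, want %#v", got, want)
	}
}
```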
*druidInstanceSettings, headers http.Header) ([]grafanaMetricFindValue, error) { + // feature: probably implement a short (1s ? 500ms ? configurable in datasource ? beware memory: constrain size ?) life cache (druidInstanceSettings.cache ?) and early return then + response := []grafanaMetricFindValue{} + q, stg, err := ds.prepareQuery(qry, s) + if err != nil { + return response, err + } + r, err := ds.oldExecuteQuery("variable", q, s, stg, headers) + if err != nil { + return response, err + } + response, err = ds.prepareVariableResponse(r, stg) + return response, err +} + +func (ds *Service) prepareVariableResponse(resp *druidResponse, settings map[string]interface{}) ([]grafanaMetricFindValue, error) { + // refactor: probably some method that returns a container (make([]whattypeever, 0)) and its related appender func based on column type + response := []grafanaMetricFindValue{} + for ic, c := range resp.Columns { + for _, r := range resp.Rows { + switch c.Type { + case "string": + if r[ic] != nil { + response = append(response, grafanaMetricFindValue{Value: r[ic].(string), Text: r[ic].(string)}) + } + case "float": + if r[ic] != nil { + response = append(response, grafanaMetricFindValue{Value: r[ic].(float64), Text: fmt.Sprintf("%f", r[ic].(float64))}) + } + case "int": + if r[ic] != nil { + i, err := strconv.Atoi(r[ic].(string)) + if err != nil { + i = 0 + } + response = append(response, grafanaMetricFindValue{Value: i, Text: r[ic].(string)}) + } + case "bool": + var b bool + var err error + b, ok := r[ic].(bool) + if !ok { + b, err = strconv.ParseBool(r[ic].(string)) + if err != nil { + b = false + } + } + var i int + if b { + i = 1 + } else { + i = 0 + } + response = append(response, grafanaMetricFindValue{Value: i, Text: strconv.FormatBool(b)}) + case "time": + var t time.Time + var err error + if r[ic] == nil { + r[ic] = 0.0 + } + switch r[ic].(type) { + case string: + t, err = time.Parse("2006-01-02T15:04:05.000Z", r[ic].(string)) + if err != nil { + t = time.Now() + } + case float64: + sec, dec := math.Modf(r[ic].(float64) / 1000) + t = time.Unix(int64(sec), int64(dec*(1e9))) + } + response = append(response, grafanaMetricFindValue{Value: t.Unix(), Text: t.Format(time.UnixDate)}) + } + } + } + return response, nil +} + +func (ds *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + result := &backend.CheckHealthResult{ + Status: backend.HealthStatusError, + Message: "Can't connect to Druid", + } + + i, err := ds.im.Get(ctx, req.PluginContext) + if err != nil { + result.Message = "Can't get Druid instance" + return result, nil + } + + status, _, err := i.(*druidInstanceSettings).client.Common().Status() + if err != nil { + result.Message = "Can't fetch Druid status" + return result, nil + } + + result.Status = backend.HealthStatusOk + result.Message = fmt.Sprintf("Successfully connected to Druid %s", status.Version) + return result, nil +} + +func (ds *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + response := backend.NewQueryDataResponse() + s, err := ds.settings(req.PluginContext, ctx) + if err != nil { + return response, err + } + + for _, q := range req.Queries { + response.Responses[q.RefID] = ds.query(q, s, toHTTPHeaders(req.Headers)) + } + + return response, nil +} + +func toHTTPHeaders(rawOriginal interface{}) http.Header { + headers := http.Header{} + switch original := rawOriginal.(type) { + case map[string]string: + for k, v := range original {
+ // This is a temporary fix: the list of allowed headers should be configurable. + if k != "Cookie" { + continue + } + headers.Set(k, v) + } + case map[string][]string: + for k, vv := range original { + // This is a temporary fix: the list of allowed headers should be configurable. + if k != "Cookie" { + continue + } + for _, v := range vv { + headers.Set(k, v) + } + } + } + return headers +} + +func (ds *Service) settings(ctx backend.PluginContext, getCtx context.Context) (*druidInstanceSettings, error) { + s, err := ds.im.Get(getCtx, ctx) + if err != nil { + return nil, err + } + return s.(*druidInstanceSettings), nil +} + +func (ds *Service) query(qry backend.DataQuery, s *druidInstanceSettings, headers http.Header) backend.DataResponse { + rawQuery := interpolateVariables(string(qry.JSON), qry.Interval, qry.TimeRange.Duration()) + + // feature: probably implement a short (1s ? 500ms ? configurable in datasource ? beware memory: constrain size ?) life cache (druidInstanceSettings.cache ?) and early return then + response := backend.DataResponse{} + q, stg, err := ds.prepareQuery([]byte(rawQuery), s) + if err != nil { + response.Error = err + return response + } + r, err := ds.executeQuery(qry.RefID, q, s, stg, headers) + if err != nil { + response.Error = err + return response + } + response, err = ds.prepareResponse(r, stg) + if err != nil { + // note: the error could be set by prepareResponse, but this gives a chance to react to it here + response.Error = err + } + return response +} + +func interpolateVariables(expr string, interval time.Duration, timeRange time.Duration) string { + rangeMs := timeRange.Milliseconds() + rangeSRounded := int64(math.Round(float64(rangeMs) / 1000.0)) + + expr = multiReplace(expr, varIntervalMs, strconv.FormatInt(int64(interval/time.Millisecond), 10)) + expr = multiReplace(expr, varInterval, formatDuration(interval)) + expr = multiReplace(expr, varRangeMs, strconv.FormatInt(rangeMs, 10)) + expr = multiReplace(expr, varRangeS, strconv.FormatInt(rangeSRounded, 10)) + expr = multiReplace(expr, varRange, strconv.FormatInt(rangeSRounded, 10)+"s") + expr = multiReplace(expr, varRateInterval, interval.String()) + + return expr +} + +func multiReplace(s string, olds []string, new string) string { + res := s + for _, old := range olds { + res = strings.ReplaceAll(res, old, new) + } + return res +} + +func formatDuration(inter time.Duration) string { + day := time.Hour * 24 + year := day * 365 + if inter >= year { + return fmt.Sprintf("%dy", inter/year) + } + + if inter >= day { + return fmt.Sprintf("%dd", inter/day) + } + + if inter >= time.Hour { + return fmt.Sprintf("%dh", inter/time.Hour) + } + + if inter >= time.Minute { + return fmt.Sprintf("%dm", inter/time.Minute) + } + + if inter >= time.Second { + return fmt.Sprintf("%ds", inter/time.Second) + } + + if inter >= time.Millisecond { + return fmt.Sprintf("%dms", inter/time.Millisecond) + } + + return "1ms" +} + +func (ds *Service) prepareQuery(qry []byte, s *druidInstanceSettings) (druidquerybuilder.Query, map[string]interface{}, error) { + var q druidQuery + err := json.Unmarshal(qry, &q) + if err != nil { + return nil, nil, err + } + var defaultQueryContext map[string]interface{} + if defaultContextParameters, ok := s.defaultQuerySettings["contextParameters"]; ok { + defaultQueryContext = ds.prepareQueryContext(defaultContextParameters.([]interface{})) + } + q.Builder["context"] = defaultQueryContext + if queryContextParameters, ok := q.Settings["contextParameters"]; ok { + q.Builder["context"] = mergeSettings(
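`interpolateVariables` substitutes Grafana's interval and range variables textually before the query JSON is parsed. The replacement order above matters, because `$__interval` is a prefix of `$__interval_ms` and `$__range` a prefix of `$__range_s`; note also that the quoted variants consume their surrounding quotes, so numeric values can be injected unquoted. A sketch with illustrative values:

```go
package druid

import (
	"strings"
	"testing"
	"time"
)

func TestInterpolateVariablesSketch(t *testing.T) {
	// A 30s interval becomes 30000 ms; a 6h range becomes 21600 s.
	expr := `{"context":{"timeoutMs":$__interval_ms},"limit":$__range_s}`
	out := interpolateVariables(expr, 30*time.Second, 6*time.Hour)
	if !strings.Contains(out, `"timeoutMs":30000`) || !strings.Contains(out, `"limit":21600`) {
		t.Fatalf("unexpected interpolation: %s", out)
	}
}
```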
defaultQueryContext, + ds.prepareQueryContext(queryContextParameters.([]interface{}))) + } + jsonQuery, err := json.Marshal(q.Builder) + if err != nil { + return nil, nil, err + } + query, err := s.client.Query().Load(jsonQuery) + // feature: could ensure __time column is selected, time interval is set based on qry given timerange and consider max data points ? + return query, mergeSettings(s.defaultQuerySettings, q.Settings), err +} + +func (ds *Service) prepareQueryContext(parameters []interface{}) map[string]interface{} { + ctx := make(map[string]interface{}) + for _, parameter := range parameters { + p := parameter.(map[string]interface{}) + ctx[p["name"].(string)] = p["value"] + } + return ctx +} + +func (ds *Service) executeQuery( + queryRef string, + q druidquerybuilder.Query, + s *druidInstanceSettings, + settings map[string]interface{}, + headers http.Header, +) (*data.Frame, error) { + var resultFramer result.Framer + qtyp := q.Type() + switch qtyp { + case "sql": + q.(*druidquery.SQL).SetResultFormat("array").SetHeader(true) + return nil, errors.New("not implemented") + case "timeseries": + var r result.TimeseriesResult + _, err := s.client.Query().Execute(q, &r) + if err != nil { + return nil, fmt.Errorf("Query error: %w", err) + } + resultFramer = &r + case "topN": + var r result.TopNResult + _, err := s.client.Query().Execute(q, &r) + if err != nil { + return nil, fmt.Errorf("Query error: %w", err) + } + resultFramer = &r + case "groupBy": + var r result.GroupByResult + _, err := s.client.Query().Execute(q, &r) + if err != nil { + return nil, fmt.Errorf("Query error: %w", err) + } + resultFramer = &r + case "scan": + q.(*druidquery.Scan).SetResultFormat("compactedList") + return nil, errors.New("not implemented") + case "search": + return nil, errors.New("not implemented") + case "timeBoundary": + return nil, errors.New("not implemented") + case "dataSourceMetadata": + return nil, errors.New("not implemented") + case "segmentMetadata": + return nil, errors.New("not implemented") + default: + return nil, errors.New("unknown query type") + } + f := resultFramer.Frame() + f.Name = queryRef + return f, nil +} + +func (ds *Service) oldExecuteQuery(queryRef string, q druidquerybuilder.Query, s *druidInstanceSettings, settings map[string]interface{}, headers http.Header) (*druidResponse, error) { + // refactor: probably need to extract per-query preprocessor and postprocessor into a per-query file. load those "plugins" (ak. QueryProcessor ?) 
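Since `mergeSettings` lets later maps win, per-query `contextParameters` override the datasource defaults whenever both are present, which is exactly what the assignment above relies on. A minimal sketch:

```go
package druid

import "testing"

func TestMergeSettingsSketch(t *testing.T) {
	defaults := map[string]interface{}{"timeout": 30000, "useCache": true}
	perQuery := map[string]interface{}{"timeout": 5000}

	merged := mergeSettings(defaults, perQuery)

	// The per-query timeout wins; untouched defaults survive.
	if merged["timeout"] != 5000 || merged["useCache"] != true {
		t.Fatalf("unexpected merge result: %#v", merged)
	}
}
```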
into a register and then do something like plugins[q.Type()].preprocess(q) and plugins[q.Type()].postprocess(r) + r := &druidResponse{Reference: queryRef} + qtyp := q.Type() + switch qtyp { + case "sql": + q.(*druidquery.SQL).SetResultFormat("array").SetHeader(true) + case "scan": + q.(*druidquery.Scan).SetResultFormat("compactedList") + } + var res json.RawMessage + _, err := s.client.Query().Execute(q, &res) + if err != nil { + return r, err + } + switch qtyp { + case "sql": + var sqlr []interface{} + err := json.Unmarshal(res, &sqlr) + if err == nil && len(sqlr) > 1 { + for _, row := range sqlr[1:] { + r.Rows = append(r.Rows, row.([]interface{})) + } + for i, c := range sqlr[0].([]interface{}) { + col := responseColumn{ + Name: c.(string), + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "timeseries": + var tsResult result.TimeseriesResult + err := json.Unmarshal(res, &tsResult) + if err != nil { + return r, err + } + if len(tsResult) == 0 { + return r, nil + } + columns := tsResult.Columns() + for _, result := range tsResult { + var row []interface{} + t := result.Timestamp + if t.IsZero() { + // If timestamp not set, use value from previous row. + // This can happen when grand total is calculated. + t = r.Rows[len(r.Rows)-1][0].(time.Time) + } + row = append(row, t) + colResults := result.Result + for _, c := range columns[1:] { + row = append(row, colResults[c]) + } + r.Rows = append(r.Rows, row) + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + case "topN": + var tn []map[string]interface{} + err := json.Unmarshal(res, &tn) + if err == nil && len(tn) > 0 { + columns := []string{"timestamp"} + for c := range tn[0]["result"].([]interface{})[0].(map[string]interface{}) { + columns = append(columns, c) + } + for _, result := range tn { + for _, record := range result["result"].([]interface{}) { + var row []interface{} + row = append(row, result["timestamp"]) + o := record.(map[string]interface{}) + for _, c := range columns[1:] { + row = append(row, o[c]) + } + r.Rows = append(r.Rows, row) + } + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "groupBy": + var gb []map[string]interface{} + err := json.Unmarshal(res, &gb) + if err == nil && len(gb) > 0 { + columns := []string{"timestamp"} + for c := range gb[0]["event"].(map[string]interface{}) { + columns = append(columns, c) + } + for _, result := range gb { + var row []interface{} + row = append(row, result["timestamp"]) + colResults := result["event"].(map[string]interface{}) + for _, c := range columns[1:] { + row = append(row, colResults[c]) + } + r.Rows = append(r.Rows, row) + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "scan": + var scanr []map[string]interface{} + err := json.Unmarshal(res, &scanr) + if err == nil && len(scanr) > 0 { + for _, e := range scanr[0]["events"].([]interface{}) { + r.Rows = append(r.Rows, e.([]interface{})) + } + for i, c := range scanr[0]["columns"].([]interface{}) { + col := responseColumn{ + Name: c.(string), + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "search": + var s []map[string]interface{} + err := json.Unmarshal(res, &s) + if err == nil && len(s) > 0 { + columns := 
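The `sql` branch above depends on Druid's `array` result format with `header` enabled: the first array element carries the column names and every following element is a data row. A sketch of that shape with a hypothetical payload (not captured from a live Druid):

```go
package druid

import (
	"encoding/json"
	"testing"
)

func TestSQLArrayHeaderShapeSketch(t *testing.T) {
	res := []byte(`[["__time","count"],["2022-10-14T08:08:10.000Z",47]]`)

	var sqlr []interface{}
	if err := json.Unmarshal(res, &sqlr); err != nil {
		t.Fatal(err)
	}

	// First element: header row with the column names; the rest: data rows.
	header := sqlr[0].([]interface{})
	rows := sqlr[1:]
	if header[0].(string) != "__time" || len(rows) != 1 {
		t.Fatalf("unexpected shape: %#v", sqlr)
	}
}
```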
[]string{"timestamp"} + for c := range s[0]["result"].([]interface{})[0].(map[string]interface{}) { + columns = append(columns, c) + } + for _, result := range s { + for _, record := range result["result"].([]interface{}) { + var row []interface{} + row = append(row, result["timestamp"]) + o := record.(map[string]interface{}) + for _, c := range columns[1:] { + row = append(row, o[c]) + } + r.Rows = append(r.Rows, row) + } + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "timeBoundary": + var tb []map[string]interface{} + err := json.Unmarshal(res, &tb) + if err == nil && len(tb) > 0 { + columns := []string{"timestamp"} + for c := range tb[0]["result"].(map[string]interface{}) { + columns = append(columns, c) + } + for _, result := range tb { + var row []interface{} + row = append(row, result["timestamp"]) + colResults := result["result"].(map[string]interface{}) + for _, c := range columns[1:] { + row = append(row, colResults[c]) + } + r.Rows = append(r.Rows, row) + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "dataSourceMetadata": + var dsm []map[string]interface{} + err := json.Unmarshal(res, &dsm) + if err == nil && len(dsm) > 0 { + columns := []string{"timestamp"} + for c := range dsm[0]["result"].(map[string]interface{}) { + columns = append(columns, c) + } + for _, result := range dsm { + var row []interface{} + row = append(row, result["timestamp"]) + colResults := result["result"].(map[string]interface{}) + for _, c := range columns[1:] { + row = append(row, colResults[c]) + } + r.Rows = append(r.Rows, row) + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + } + case "segmentMetadata": + var sm []map[string]interface{} + err := json.Unmarshal(res, &sm) + if err == nil && len(sm) > 0 { + var columns []string + switch settings["view"].(string) { + case "base": + for k, v := range sm[0] { + if k != "aggregators" && k != "columns" && k != "timestampSpec" { + if k == "intervals" { + for i := range v.([]interface{}) { + pos := strconv.Itoa(i) + columns = append(columns, "interval_start_"+pos) + columns = append(columns, "interval_stop_"+pos) + } + } else { + columns = append(columns, k) + } + } + } + for _, result := range sm { + var row []interface{} + for _, c := range columns { + var col interface{} + if strings.HasPrefix(c, "interval_") { + parts := strings.Split(c, "_") + pos := 0 + if parts[1] == "stop" { + pos = 1 + } + idx, err := strconv.Atoi(parts[2]) + if err != nil { + return r, errors.New("interval parsing goes wrong") + } + ii := result["intervals"].([]interface{})[idx] + col = strings.Split(ii.(string), "/")[pos] + } else { + col = result[c] + } + row = append(row, col) + } + r.Rows = append(r.Rows, row) + } + case "aggregators": + for _, v := range sm[0]["aggregators"].(map[string]interface{}) { + columns = append(columns, "aggregator") + for k := range v.(map[string]interface{}) { + columns = append(columns, k) + } + break + } + for _, result := range sm { + for k, v := range result["aggregators"].(map[string]interface{}) { + var row []interface{} + for _, c := range columns { + var col interface{} + if c == "aggregator" { + col = k + } else { + col = v.(map[string]interface{})[c] + } + row = append(row, col) + } + r.Rows = append(r.Rows, row) + } 
+ } + case "columns": + for _, v := range sm[0]["columns"].(map[string]interface{}) { + columns = append(columns, "column") + for k := range v.(map[string]interface{}) { + columns = append(columns, k) + } + break + } + for _, result := range sm { + for k, v := range result["columns"].(map[string]interface{}) { + var row []interface{} + for _, c := range columns { + var col interface{} + if c == "column" { + col = k + } else { + col = v.(map[string]interface{})[c] + } + row = append(row, col) + } + r.Rows = append(r.Rows, row) + } + } + case "timestampspec": + for k := range sm[0]["timestampSpec"].(map[string]interface{}) { + columns = append(columns, k) + } + for _, result := range sm { + var row []interface{} + for _, c := range columns { + col := result["timestampSpec"].(map[string]interface{})[c] + row = append(row, col) + } + r.Rows = append(r.Rows, row) + } + } + for i, c := range columns { + col := responseColumn{ + Name: c, + } + detectColumnType(&col, i, r.Rows) + r.Columns = append(r.Columns, col) + } + + } + default: + return r, errors.New("unknown query type") + } + return r, err +} + +func (ds *Service) prepareResponse(frame *data.Frame, settings map[string]interface{}) (backend.DataResponse, error) { + // refactor: probably some method that returns a container (make([]whattypeever, 0)) and its related appender func based on column type) + response := backend.DataResponse{} + // TODO support those settings + // hideEmptyColumns, _ := settings["hideEmptyColumns"].(bool) + // responseLimit, _ := settings["responseLimit"].(float64) + format, found := settings["format"] + if !found { + format = "long" + } else { + format = format.(string) + } + // convert to other formats if specified + if format == "wide" && len(frame.Fields) > 0 { + f, err := data.LongToWide(frame, nil) + if err == nil { + frame = f + } + } else if format == "log" && len(frame.Fields) > 0 { + f, err := longToLog(frame, settings) + if err == nil { + frame = f + } + } + response.Frames = append(response.Frames, frame) + return response, nil +} + +func longToLog(longFrame *data.Frame, settings map[string]interface{}) (*data.Frame, error) { + logFrame := data.NewFrame("response") + logFrame.SetMeta(&data.FrameMeta{PreferredVisualization: data.VisTypeLogs}) + // fetch settings + logColumnTime, found := settings["logColumnTime"] + if !found { + logColumnTime = "__time" + } else { + logColumnTime = logColumnTime.(string) + } + logColumnLevel, found := settings["logColumnLevel"] + if !found { + logColumnLevel = "level" + } else { + logColumnLevel = logColumnLevel.(string) + } + logColumnMessage, found := settings["logColumnMessage"] + if !found { + logColumnMessage = "message" + } else { + logColumnMessage = logColumnMessage.(string) + } + // make sure the special time and message fields come first in the frame because that's how + // the log ui decides what time and message to display + for _, f := range longFrame.Fields { + if f.Name == logColumnTime || f.Name == logColumnMessage { + logFrame.Fields = append(logFrame.Fields, f) + } + } + // now copy over the rest of the fields + for _, f := range longFrame.Fields { + if f.Name == logColumnTime { + // skip because time already copied above. 
does not skip message because we want it + // included twice since otherwise it won't be available as a detected field + continue + } else if f.Name == logColumnLevel { + f.Name = "level" + } + logFrame.Fields = append(logFrame.Fields, f) + } + return logFrame, nil +} diff --git a/pkg/tsdb/druid/result/common.go b/pkg/tsdb/druid/result/common.go new file mode 100644 index 0000000000000..5d15a866c8d6b --- /dev/null +++ b/pkg/tsdb/druid/result/common.go @@ -0,0 +1,82 @@ +package result + +import ( + "fmt" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend/log" + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type Framer interface { + Frame() *data.Frame +} + +const timestampColumn = "timestamp" + +func toTypedResults(raw []interface{}) interface{} { + // TODO this is not perfect. Maybe we should get this from the query? + switch raw[0].(type) { + // TODO this can be done with generics, but we should wait until go is updated in this repo + case string: + results := make([]string, len(raw)) + for i, v := range raw { + vv, _ := v.(string) + results[i] = vv + } + return results + case int: + results := make([]int, len(raw)) + for i, v := range raw { + vv, _ := v.(int) + results[i] = vv + } + return results + case int32: + results := make([]int32, len(raw)) + for i, v := range raw { + vv, _ := v.(int32) + results[i] = vv + } + return results + case int64: + results := make([]int64, len(raw)) + for i, v := range raw { + vv, _ := v.(int64) + results[i] = vv + } + return results + case float32: + results := make([]float32, len(raw)) + for i, v := range raw { + vv, _ := v.(float32) + results[i] = vv + } + return results + case float64: + results := make([]float64, len(raw)) + for i, v := range raw { + vv, _ := v.(float64) + results[i] = vv + } + return results + case bool: + results := make([]bool, len(raw)) + for i, v := range raw { + vv, _ := v.(bool) + results[i] = vv + } + return results + case time.Time: + results := make([]time.Time, len(raw)) + for i, v := range raw { + vv, _ := v.(time.Time) + results[i] = vv + } + return results + default: + // TODO better error handling + log.DefaultLogger.Debug(fmt.Sprintf("Unsupported type %T", raw[0])) + return nil + } +} diff --git a/pkg/tsdb/druid/result/groupby.go b/pkg/tsdb/druid/result/groupby.go new file mode 100644 index 0000000000000..e3cb2a49b9472 --- /dev/null +++ b/pkg/tsdb/druid/result/groupby.go @@ -0,0 +1,74 @@ +package result + +import ( + "sort" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type GroupByResult []GroupByRecord + +// Frame returns data formatted as Grafana Frame. +func (t *GroupByResult) Frame() *data.Frame { + columns := t.Columns() + fields := make([]*data.Field, len(columns)) + for i, column := range columns { + labels := data.Labels{} + fields[i] = data.NewField(column, labels, t.Values(column)) + } + return data.NewFrame("", fields...) +} + +// Columns returns list of columns. It calls `Columns()` on first record. If +// no records are available it returns nil. +func (t *GroupByResult) Columns() []string { + for _, r := range *t { + return r.Columns() + } + return nil +} + +// Values returns all values for given column. 
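`toTypedResults` infers the Go element type of a column from its first value, so a column of JSON numbers (decoded as `float64`) comes back as a `[]float64` that `data.NewField` accepts directly. A sketch, assuming the package compiles as shown:

```go
package result

import "testing"

func TestToTypedResultsSketch(t *testing.T) {
	raw := []interface{}{float64(2.083), float64(3.846)}

	// The first element is a float64, so the whole column comes back as []float64.
	typed, ok := toTypedResults(raw).([]float64)
	if !ok || len(typed) != 2 || typed[0] != 2.083 {
		t.Fatalf("unexpected conversion: %#v", toTypedResults(raw))
	}
}
```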
+func (t *GroupByResult) Values(column string) interface{} { + if len(*t) == 0 { + return nil + } + results := make([]interface{}, len(*t)) + for i, r := range *t { + results[i] = r.Value(column) + } + return toTypedResults(results) +} + +type GroupByRecord struct { + Timestamp time.Time `json:"timestamp"` + Event map[string]interface{} `json:"event"` +} + +// Columns returns list of columns for given record. +// The first column will always be "timestamp" followed by other columns sorted +// alphabetically. +func (t *GroupByRecord) Columns() []string { + columns := make([]string, len(t.Event)+1) + columns[0] = timestampColumn + i := 1 + for c := range t.Event { + columns[i] = c + i++ + } + sort.Strings(columns[1:]) + return columns +} + +// Value returns value for given column. +func (t *GroupByRecord) Value(column string) interface{} { + if column == timestampColumn { + return t.Timestamp + } + v, ok := t.Event[column] + if !ok { + return nil + } + return v +} diff --git a/pkg/tsdb/druid/result/groupby_test.go b/pkg/tsdb/druid/result/groupby_test.go new file mode 100644 index 0000000000000..80cc4cb335b67 --- /dev/null +++ b/pkg/tsdb/druid/result/groupby_test.go @@ -0,0 +1,60 @@ +package result + +import ( + "encoding/json" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/assert" +) + +func TestGroupByResultUnmarshal(t *testing.T) { + input := []byte(`[ + { + "timestamp": "2022-10-14T08:08:10.000Z", + "event": { + "dog_count": 47, + "dog_rate": 2.083, + "dog_name": "foo" + } + }, + { + "timestamp": "2022-10-14T08:08:11.000Z", + "event": { + "dog_count": 75, + "dog_rate": 3.846, + "dog_name": "bar" + } + } + ]`) + + var res GroupByResult + err := json.Unmarshal(input, &res) + assert.Nil(t, err, "Failed to unmarshal response") + assert.Equal(t, len(res), 2, "Wrong number of unmarshalled results") + frame := res.Frame() + assert.Equal(t, len(frame.Fields), 4, "Wrong number of framed fields") + + assert.Equal(t, frame.Fields[0].Name, "timestamp") + assert.Equal(t, frame.Fields[0].Type(), data.FieldTypeTime) + assert.Equal(t, frame.Fields[0].Len(), 2) + assert.Equal(t, frame.Fields[0].At(0), time.Time(time.Date(2022, time.October, 14, 8, 8, 10, 0, time.UTC))) + assert.Equal(t, frame.Fields[0].At(1), time.Time(time.Date(2022, time.October, 14, 8, 8, 11, 0, time.UTC))) + + assert.Equal(t, frame.Fields[1].Name, "dog_count") + assert.Equal(t, frame.Fields[1].Type(), data.FieldTypeFloat64) + assert.Equal(t, frame.Fields[1].At(0), float64(47)) + assert.Equal(t, frame.Fields[1].At(1), float64(75)) + + assert.Equal(t, frame.Fields[2].Name, "dog_name") + assert.Equal(t, frame.Fields[2].Type(), data.FieldTypeString) + assert.Equal(t, frame.Fields[2].At(0), "foo") + assert.Equal(t, frame.Fields[2].At(1), "bar") + + assert.Equal(t, frame.Fields[3].Name, "dog_rate") + assert.Equal(t, frame.Fields[3].Type(), data.FieldTypeFloat64) + assert.Equal(t, frame.Fields[3].At(0), float64(2.083)) + assert.Equal(t, frame.Fields[3].At(1), float64(3.846)) + +} diff --git a/pkg/tsdb/druid/result/timeseries.go b/pkg/tsdb/druid/result/timeseries.go new file mode 100644 index 0000000000000..ec9e20e64fa52 --- /dev/null +++ b/pkg/tsdb/druid/result/timeseries.go @@ -0,0 +1,74 @@ +package result + +import ( + "sort" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type TimeseriesResult []TimeseriesRecord + +// Frame returns data formatted as Grafana Frame. 
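The deterministic column order (`timestamp` first, then event keys sorted alphabetically) is what lets the test above assert field positions even though Go randomizes map iteration. A sketch:

```go
package result

import (
	"reflect"
	"testing"
	"time"
)

func TestGroupByRecordColumnsSketch(t *testing.T) {
	rec := GroupByRecord{
		Timestamp: time.Now(),
		Event:     map[string]interface{}{"b": 1, "a": 2},
	}

	// timestamp always leads; the event keys follow in sorted order.
	want := []string{"timestamp", "a", "b"}
	if got := rec.Columns(); !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}
}
```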
+func (t *TimeseriesResult) Frame() *data.Frame { + columns := t.Columns() + fields := make([]*data.Field, len(columns)) + for i, column := range columns { + labels := data.Labels{} + fields[i] = data.NewField(column, labels, t.Values(column)) + } + return data.NewFrame("", fields...) +} + +// Columns returns list of columns. It calls `Columns()` on first record. If +// no records are available it returns nil. +func (t *TimeseriesResult) Columns() []string { + for _, r := range *t { + return r.Columns() + } + return nil +} + +// Values returns all values for given column. +func (t *TimeseriesResult) Values(column string) interface{} { + if len(*t) == 0 { + return nil + } + results := make([]interface{}, len(*t)) + for i, r := range *t { + results[i] = r.Value(column) + } + return toTypedResults(results) +} + +type TimeseriesRecord struct { + Timestamp time.Time `json:"timestamp"` + Result map[string]interface{} `json:"result"` +} + +// Columns returns list of columns for given record. +// The first column will always be "timestamp" followed by other columns sorted +// alphabetically. +func (t *TimeseriesRecord) Columns() []string { + columns := make([]string, len(t.Result)+1) + columns[0] = timestampColumn + i := 1 + for c := range t.Result { + columns[i] = c + i++ + } + sort.Strings(columns[1:]) + return columns +} + +// Value returns value for given column. +func (t *TimeseriesRecord) Value(column string) interface{} { + if column == timestampColumn { + return t.Timestamp + } + v, ok := t.Result[column] + if !ok { + return nil + } + return v +} diff --git a/pkg/tsdb/druid/result/timeseries_test.go b/pkg/tsdb/druid/result/timeseries_test.go new file mode 100644 index 0000000000000..1154b0f0d522b --- /dev/null +++ b/pkg/tsdb/druid/result/timeseries_test.go @@ -0,0 +1,60 @@ +package result + +import ( + "encoding/json" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/assert" +) + +func TestTimeseriesResultUnmarshal(t *testing.T) { + input := []byte(`[ + { + "timestamp": "2022-10-14T08:08:10.000Z", + "result": { + "dog_count": 47, + "dog_rate": 2.083, + "dog_name": "foo" + } + }, + { + "timestamp": "2022-10-14T08:08:11.000Z", + "result": { + "dog_count": 75, + "dog_rate": 3.846, + "dog_name": "bar" + } + } + ]`) + + var res TimeseriesResult + err := json.Unmarshal(input, &res) + assert.Nil(t, err, "Failed to unmarshal response") + assert.Equal(t, len(res), 2, "Wrong number of unmarshalled results") + frame := res.Frame() + assert.Equal(t, len(frame.Fields), 4, "Wrong number of framed fields") + + assert.Equal(t, frame.Fields[0].Name, "timestamp") + assert.Equal(t, frame.Fields[0].Type(), data.FieldTypeTime) + assert.Equal(t, frame.Fields[0].Len(), 2) + assert.Equal(t, frame.Fields[0].At(0), time.Time(time.Date(2022, time.October, 14, 8, 8, 10, 0, time.UTC))) + assert.Equal(t, frame.Fields[0].At(1), time.Time(time.Date(2022, time.October, 14, 8, 8, 11, 0, time.UTC))) + + assert.Equal(t, frame.Fields[1].Name, "dog_count") + assert.Equal(t, frame.Fields[1].Type(), data.FieldTypeFloat64) + assert.Equal(t, frame.Fields[1].At(0), float64(47)) + assert.Equal(t, frame.Fields[1].At(1), float64(75)) + + assert.Equal(t, frame.Fields[2].Name, "dog_name") + assert.Equal(t, frame.Fields[2].Type(), data.FieldTypeString) + assert.Equal(t, frame.Fields[2].At(0), "foo") + assert.Equal(t, frame.Fields[2].At(1), "bar") + + assert.Equal(t, frame.Fields[3].Name, "dog_rate") + assert.Equal(t, frame.Fields[3].Type(), data.FieldTypeFloat64) + 
assert.Equal(t, frame.Fields[3].At(0), float64(2.083)) + assert.Equal(t, frame.Fields[3].At(1), float64(3.846)) + +} diff --git a/pkg/tsdb/druid/result/topn.go b/pkg/tsdb/druid/result/topn.go new file mode 100644 index 0000000000000..ba64fa7dc508b --- /dev/null +++ b/pkg/tsdb/druid/result/topn.go @@ -0,0 +1,78 @@ +package result + +import ( + "sort" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type TopNResult []TopNRecord + +// Frame returns data formatted as Grafana Frame. +func (t *TopNResult) Frame() *data.Frame { + columns := t.Columns() + fields := make([]*data.Field, len(columns)) + for i, column := range columns { + labels := data.Labels{} + fields[i] = data.NewField(column, labels, t.Values(column)) + } + return data.NewFrame("", fields...) +} + +// Columns returns list of columns. It calls `Columns()` on first record. If +// no records are available it returns nil. +func (t *TopNResult) Columns() []string { + for _, r := range *t { + return r.Columns() + } + return nil +} + +// Values returns all values for given column. +func (t *TopNResult) Values(column string) interface{} { + results := []interface{}{} + for _, r := range *t { + results = append(results, r.Values(column)...) + } + return toTypedResults(results) +} + +type TopNRecord struct { + Timestamp time.Time `json:"timestamp"` + Result []map[string]interface{} `json:"result"` +} + +// Columns returns list of columns for given record. +// It assumes that every map from Result has the same columns, so it gets +// the list from first item. +// The first column will always be "timestamp" followed by other columns sorted +// alphabetically. +func (t *TopNRecord) Columns() []string { + for _, result := range t.Result { + columns := make([]string, len(result)+1) + columns[0] = timestampColumn + i := 1 + for c := range result { + columns[i] = c + i++ + } + sort.Strings(columns[1:]) + return columns + } + return nil +} + +// Values returns the values for the given column, one value per entry in Result.
+func (t *TopNRecord) Values(column string) []interface{} { + values := []interface{}{} + for _, result := range t.Result { + if column == timestampColumn { + values = append(values, t.Timestamp) + continue + } + v, _ := result[column] + values = append(values, v) + } + return values +} diff --git a/pkg/tsdb/druid/result/topn_test.go b/pkg/tsdb/druid/result/topn_test.go new file mode 100644 index 0000000000000..7fb2503aae7cc --- /dev/null +++ b/pkg/tsdb/druid/result/topn_test.go @@ -0,0 +1,59 @@ +package result + +import ( + "encoding/json" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/assert" +) + +func TestTopNResultUnmarshal(t *testing.T) { + input := []byte(`[ + { + "timestamp": "2022-10-14T08:08:10.000Z", + "result": [ + { + "dog_count": 47, + "dog_rate": 2.083, + "dog_name": "foo" + }, + { + "dog_count": 75, + "dog_rate": 3.846, + "dog_name": "bar" + } + ] + } + ]`) + + var res TopNResult + err := json.Unmarshal(input, &res) + assert.Nil(t, err, "Failed to unmarshal response") + assert.Equal(t, len(res), 1, "Wrong number of unmarshalled results") + frame := res.Frame() + assert.Equal(t, len(frame.Fields), 4, "Wrong number of framed fields") + + assert.Equal(t, frame.Fields[0].Name, "timestamp") + assert.Equal(t, frame.Fields[0].Type(), data.FieldTypeTime) + assert.Equal(t, frame.Fields[0].Len(), 2) + assert.Equal(t, frame.Fields[0].At(0), time.Time(time.Date(2022, time.October, 14, 8, 8, 10, 0, time.UTC))) + assert.Equal(t, frame.Fields[0].At(1), time.Time(time.Date(2022, time.October, 14, 8, 8, 10, 0, time.UTC))) + + assert.Equal(t, frame.Fields[1].Name, "dog_count") + assert.Equal(t, frame.Fields[1].Type(), data.FieldTypeFloat64) + assert.Equal(t, frame.Fields[1].At(0), float64(47)) + assert.Equal(t, frame.Fields[1].At(1), float64(75)) + + assert.Equal(t, frame.Fields[2].Name, "dog_name") + assert.Equal(t, frame.Fields[2].Type(), data.FieldTypeString) + assert.Equal(t, frame.Fields[2].At(0), "foo") + assert.Equal(t, frame.Fields[2].At(1), "bar") + + assert.Equal(t, frame.Fields[3].Name, "dog_rate") + assert.Equal(t, frame.Fields[3].Type(), data.FieldTypeFloat64) + assert.Equal(t, frame.Fields[3].At(0), float64(2.083)) + assert.Equal(t, frame.Fields[3].At(1), float64(3.846)) + +} diff --git a/public/microfrontends/fn_dashboard/index.html b/public/microfrontends/fn_dashboard/index.html index 42478827d766e..37811ad878659 100644 --- a/public/microfrontends/fn_dashboard/index.html +++ b/public/microfrontends/fn_dashboard/index.html @@ -27,7 +27,7 @@
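Tying the pieces together, the wire shape a panel query arrives in is the `druidQuery` pair from druid.go: `builder` holds the native Druid query that is handed to the go-druid loader, and `settings` holds plugin options such as `format`. A final sketch with an illustrative payload:

```go
package druid

import (
	"encoding/json"
	"testing"
)

func TestDruidQueryShapeSketch(t *testing.T) {
	payload := []byte(`{
		"builder": {"queryType": "scan", "dataSource": {"type": "table", "name": "wikipedia"}},
		"settings": {"format": "long"}
	}`)

	var q druidQuery
	if err := json.Unmarshal(payload, &q); err != nil {
		t.Fatal(err)
	}
	if q.Builder["queryType"] != "scan" || q.Settings["format"] != "long" {
		t.Fatalf("unexpected decode: %#v", q)
	}
}
```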