Skip to content

feat: scrape timeout #558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/postgres_exporter/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"io/ioutil"
"net/url"
Expand All @@ -25,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func (e *Exporter) discoverDatabaseDSNs() []string {
func (e *Exporter) discoverDatabaseDSNs(ctx context.Context) []string {
// connstring syntax is complex (and not sure if even regular).
// we don't need to parse it, so just superficially validate that it starts
// with a valid-ish keyword pair
Expand All @@ -50,7 +51,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
continue
}

server, err := e.servers.GetServer(dsn)
server, err := e.servers.GetServer(ctx, dsn)
if err != nil {
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
continue
Expand All @@ -60,7 +61,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
server.master = true

databaseNames, err := queryDatabases(server)
databaseNames, err := queryDatabases(ctx, server)
if err != nil {
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
continue
Expand Down Expand Up @@ -96,8 +97,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
return result
}

func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
server, err := e.servers.GetServer(dsn)
func (e *Exporter) scrapeDSN(ctx context.Context, ch chan<- prometheus.Metric, dsn string) error {
server, err := e.servers.GetServer(ctx, dsn)

if err != nil {
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
Expand All @@ -109,11 +110,11 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
}

// Check if map versions need to be updated
if err := e.checkMapVersions(ch, server); err != nil {
if err := e.checkMapVersions(ctx, ch, server); err != nil {
level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err)
}

return server.Scrape(ch, e.disableSettingsMetrics)
return server.Scrape(ctx, ch, e.disableSettingsMetrics)
}

// try to get the DataSource
Expand Down
3 changes: 2 additions & 1 deletion cmd/postgres_exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum duration of a scrape").Default("60s").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration()
logger = log.NewNopLogger()
)

Expand Down Expand Up @@ -105,7 +106,7 @@ func main() {
IncludeDatabases(*includeDatabases),
}

exporter := NewExporter(dsn, opts...)
exporter := NewExporter(dsn, scrapeTimeout, opts...)
defer func() {
exporter.servers.Close()
}()
Expand Down
11 changes: 6 additions & 5 deletions cmd/postgres_exporter/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"database/sql"
"errors"
"fmt"
Expand All @@ -27,7 +28,7 @@ import (

// Query within a namespace mapping and emit metrics. Returns fatal errors if
// the scrape fails, and a slice of errors if they were non-fatal.
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
func queryNamespaceMapping(ctx context.Context, server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
// Check for a query override for this namespace
query, found := server.queryOverrides[namespace]

Expand All @@ -45,9 +46,9 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
if !found {
// I've no idea how to avoid this properly at the moment, but this is
// an admin tool so you're not injecting SQL right?
rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
rows, err = server.db.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
} else {
rows, err = server.db.Query(query)
rows, err = server.db.QueryContext(ctx, query)
}
if err != nil {
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
Expand Down Expand Up @@ -183,7 +184,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa

// Iterate through all the namespace mappings in the exporter and run their
// queries.
func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error {
func queryNamespaceMappings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) map[string]error {
// Return a map of namespace -> errors
namespaceErrors := make(map[string]error)

Expand Down Expand Up @@ -225,7 +226,7 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
var nonFatalErrors []error
var err error
if scrapeMetric {
metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping)
metrics, nonFatalErrors, err = queryNamespaceMapping(ctx, server, namespace, mapping)
} else {
metrics = cachedMetric.metrics
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/postgres_exporter/pg_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"math"
"strconv"
Expand All @@ -24,7 +25,7 @@ import (
)

// Query the pg_settings view containing runtime variables
func querySettings(ch chan<- prometheus.Metric, server *Server) error {
func querySettings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error {
level.Debug(logger).Log("msg", "Querying pg_setting view", "server", server)

// pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html
Expand All @@ -33,7 +34,7 @@ func querySettings(ch chan<- prometheus.Metric, server *Server) error {
// types in normaliseUnit() below
query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');"

rows, err := server.db.Query(query)
rows, err := server.db.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
}
Expand Down
30 changes: 21 additions & 9 deletions cmd/postgres_exporter/postgres_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"crypto/sha256"
"database/sql"
"errors"
Expand Down Expand Up @@ -468,6 +469,7 @@ type Exporter struct {
psqlUp prometheus.Gauge
userQueriesError *prometheus.GaugeVec
totalScrapes prometheus.Counter
scrapeTimeout *time.Duration

// servers are used to allow re-using the DB connection between scrapes.
// servers contains metrics map and query overrides.
Expand Down Expand Up @@ -555,7 +557,7 @@ func parseConstLabels(s string) prometheus.Labels {
}

// NewExporter returns a new PostgreSQL exporter for the provided DSN.
func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
func NewExporter(dsn []string, scrapeTimeout *time.Duration, opts ...ExporterOpt) *Exporter {
e := &Exporter{
dsn: dsn,
builtinMetricMaps: builtinMetricMaps,
Expand All @@ -567,6 +569,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {

e.setupInternalMetrics()
e.servers = NewServers(ServerWithLabels(e.constantLabels))
e.scrapeTimeout = scrapeTimeout

return e
}
Expand Down Expand Up @@ -614,7 +617,16 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.scrape(ch)
var ctx context.Context
var cancel context.CancelFunc
if e.scrapeTimeout != nil {
ctx, cancel = context.WithTimeout(context.Background(), *e.scrapeTimeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()

e.scrape(ctx, ch)

ch <- e.duration
ch <- e.totalScrapes
Expand All @@ -630,9 +642,9 @@ func newDesc(subsystem, name, help string, labels prometheus.Labels) *prometheus
)
}

func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, error) {
func checkPostgresVersion(ctx context.Context, db *sql.DB, server string) (semver.Version, string, error) {
level.Debug(logger).Log("msg", "Querying PostgreSQL version", "server", server)
versionRow := db.QueryRow("SELECT version();")
versionRow := db.QueryRowContext(ctx, "SELECT version();")
var versionString string
err := versionRow.Scan(&versionString)
if err != nil {
Expand All @@ -647,8 +659,8 @@ func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, er
}

// Check and update the exporters query maps if the version has changed.
func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) error {
semanticVersion, versionString, err := checkPostgresVersion(server.db, server.String())
func (e *Exporter) checkMapVersions(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error {
semanticVersion, versionString, err := checkPostgresVersion(ctx, server.db, server.String())
if err != nil {
return fmt.Errorf("Error fetching version string on %q: %v", server, err)
}
Expand Down Expand Up @@ -709,7 +721,7 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
return nil
}

func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) {
defer func(begun time.Time) {
e.duration.Set(time.Since(begun).Seconds())
}(time.Now())
Expand All @@ -718,14 +730,14 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {

dsns := e.dsn
if e.autoDiscoverDatabases {
dsns = e.discoverDatabaseDSNs()
dsns = e.discoverDatabaseDSNs(ctx)
}

var errorsCount int
var connectionErrorsCount int

for _, dsn := range dsns {
if err := e.scrapeDSN(ch, dsn); err != nil {
if err := e.scrapeDSN(ctx, ch, dsn); err != nil {
errorsCount++

level.Error(logger).Log("err", err)
Expand Down
34 changes: 21 additions & 13 deletions cmd/postgres_exporter/postgres_exporter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package main

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

_ "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -38,11 +40,13 @@ type IntegrationSuite struct {

var _ = Suite(&IntegrationSuite{})

var testScrapeDuration = 10 * time.Second

func (s *IntegrationSuite) SetUpSuite(c *C) {
dsn := os.Getenv("DATA_SOURCE_NAME")
c.Assert(dsn, Not(Equals), "")

exporter := NewExporter(strings.Split(dsn, ","))
exporter := NewExporter(strings.Split(dsn, ","), &testScrapeDuration)
c.Assert(exporter, NotNil)
// Assign the exporter to the suite
s.e = exporter
Expand All @@ -59,24 +63,26 @@ func (s *IntegrationSuite) TestAllNamespacesReturnResults(c *C) {
}
}()

ctx := context.Background()

for _, dsn := range s.e.dsn {
// Open a database connection
server, err := NewServer(dsn)
c.Assert(server, NotNil)
c.Assert(err, IsNil)

// Do a version update
err = s.e.checkMapVersions(ch, server)
err = s.e.checkMapVersions(ctx, ch, server)
c.Assert(err, IsNil)

err = querySettings(ch, server)
err = querySettings(ctx, ch, server)
if !c.Check(err, Equals, nil) {
fmt.Println("## ERRORS FOUND")
fmt.Println(err)
}

// This should never happen in our test cases.
errMap := queryNamespaceMappings(ch, server)
errMap := queryNamespaceMappings(ctx, ch, server)
if !c.Check(len(errMap), Equals, 0) {
fmt.Println("## NAMESPACE ERRORS FOUND")
for namespace, err := range errMap {
Expand All @@ -99,14 +105,15 @@ func (s *IntegrationSuite) TestInvalidDsnDoesntCrash(c *C) {
}()

// Send a bad DSN
exporter := NewExporter([]string{"invalid dsn"})
ctx := context.Background()
exporter := NewExporter([]string{"invalid dsn"}, &testScrapeDuration)
c.Assert(exporter, NotNil)
exporter.scrape(ch)
exporter.scrape(ctx, ch)

// Send a DSN to a non-listening port.
exporter = NewExporter([]string{"postgresql://nothing:[email protected]:1/nothing"})
exporter = NewExporter([]string{"postgresql://nothing:[email protected]:1/nothing"}, &testScrapeDuration)
c.Assert(exporter, NotNil)
exporter.scrape(ch)
exporter.scrape(ctx, ch)
}

// TestUnknownMetricParsingDoesntCrash deliberately deletes all the column maps out
Expand All @@ -122,7 +129,7 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
dsn := os.Getenv("DATA_SOURCE_NAME")
c.Assert(dsn, Not(Equals), "")

exporter := NewExporter(strings.Split(dsn, ","))
exporter := NewExporter(strings.Split(dsn, ","), &testScrapeDuration)
c.Assert(exporter, NotNil)

// Convert the default maps into a list of empty maps.
Expand All @@ -137,7 +144,7 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
exporter.builtinMetricMaps = emptyMaps

// scrape the exporter and make sure it works
exporter.scrape(ch)
exporter.scrape(context.Background(), ch)
}

// TestExtendQueriesDoesntCrash tests that specifying extend.query-path doesn't
Expand All @@ -154,24 +161,25 @@ func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) {
c.Assert(dsn, Not(Equals), "")

exporter := NewExporter(
strings.Split(dsn, ","),
strings.Split(dsn, ","), &testScrapeDuration,
WithUserQueriesPath("../user_queries_test.yaml"),
)
c.Assert(exporter, NotNil)

// scrape the exporter and make sure it works
exporter.scrape(ch)
exporter.scrape(context.Background(), ch)
}

func (s *IntegrationSuite) TestAutoDiscoverDatabases(c *C) {
dsn := os.Getenv("DATA_SOURCE_NAME")

exporter := NewExporter(
strings.Split(dsn, ","),
&testScrapeDuration,
)
c.Assert(exporter, NotNil)

dsns := exporter.discoverDatabaseDSNs()
dsns := exporter.discoverDatabaseDSNs(context.Background())

c.Assert(len(dsns), Equals, 2)
}
5 changes: 3 additions & 2 deletions cmd/postgres_exporter/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"errors"
"fmt"

Expand Down Expand Up @@ -282,8 +283,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error
return nil
}

func queryDatabases(server *Server) ([]string, error) {
rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
func queryDatabases(ctx context.Context, server *Server) ([]string, error) {
rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
if err != nil {
return nil, fmt.Errorf("Error retrieving databases: %v", err)
}
Expand Down
Loading