Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/conf/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type EmailProviderConfiguration struct {
MagicLinkEnabled bool `json:"magic_link_enabled" default:"true" split_words:"true"`
}

type DBAdvisorConfiguration struct {
Enabled bool `json:"enabled" default:"true"`
SamplingInterval time.Duration `json:"sampling_interval" split_words:"true" default:"200ms"`
ObservationInterval time.Duration `json:"observation_interval" split_words:"true" default:"20s"`
}

// DBConfiguration holds all the database related configuration.
type DBConfiguration struct {
Driver string `json:"driver" required:"true"`
Expand All @@ -106,6 +112,8 @@ type DBConfiguration struct {
HealthCheckPeriod time.Duration `json:"health_check_period" split_words:"true"`
MigrationsPath string `json:"migrations_path" split_words:"true" default:"./migrations"`
CleanupEnabled bool `json:"cleanup_enabled" split_words:"true" default:"false"`

Advisor DBAdvisorConfiguration `json:"advisor"`
}

func (c *DBConfiguration) Validate() error {
Expand Down
98 changes: 98 additions & 0 deletions internal/storage/advisor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package storage

import (
"database/sql"
"math"
"time"
)

type Advisory struct {
LongWaitDurationSamples int
Over2WaitingSamples int
}

type Advisor struct {
StatsFunc func() sql.DBStats
AdviseFunc func(Advisory)
Interval time.Duration

Stats sql.DBStats
LastAdvisedAt time.Time

Iterations int

WaitDurationSamples []time.Duration
WaitCountSamples []int64
}

func (a *Advisor) Start(observeDuration time.Duration) {
a.setup(observeDuration)

go func() {
// after server start the db stats are going to be worse, so ignore that period
time.Sleep(observeDuration)

a.Stats = a.StatsFunc()

for {
time.Sleep(a.Interval)
a.loop()
}
}()
}

func (a *Advisor) setup(observeDuration time.Duration) {
nSamples := int(math.Round(observeDuration.Seconds() / a.Interval.Seconds()))

a.WaitDurationSamples = make([]time.Duration, nSamples)
a.WaitCountSamples = make([]int64, nSamples)
}

func (a *Advisor) loop() {
a.Iterations += 1
if a.Iterations < 0 {
a.Iterations = 0
}

previousStats := a.Stats
a.Stats = a.StatsFunc()

a.WaitDurationSamples[a.Iterations%len(a.WaitDurationSamples)] = a.Stats.WaitDuration - previousStats.WaitDuration
a.WaitCountSamples[a.Iterations%len(a.WaitCountSamples)] = a.Stats.WaitCount - previousStats.WaitCount

advise := false

longWaitDurationSamples := 0
if a.Iterations >= len(a.WaitDurationSamples) {
for _, sample := range a.WaitDurationSamples {
if sample >= a.Interval {
longWaitDurationSamples += 1
}
}

// 1/3 of the observation time was spent waiting for over one sampling interval
advise = longWaitDurationSamples >= (len(a.WaitDurationSamples) / 3)
}

over2WaitingSamples := 0
if !advise && a.Iterations >= len(a.WaitCountSamples) {
for _, sample := range a.WaitCountSamples {
if sample > 2 {
over2WaitingSamples += 1
}
}

// 1/3 of the observation time we saw more than 2 goroutines waiting for a connection
advise = over2WaitingSamples >= (len(a.WaitCountSamples) / 3)
}

if advise && time.Since(a.LastAdvisedAt) >= time.Hour {
a.LastAdvisedAt = time.Now()

a.AdviseFunc(Advisory{
LongWaitDurationSamples: longWaitDurationSamples,
Over2WaitingSamples: over2WaitingSamples,
})

}
}
96 changes: 96 additions & 0 deletions internal/storage/advisor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package storage

import (
"database/sql"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestAdvisorZeroChanges(t *testing.T) {
advised := false

advisor := Advisor{
AdviseFunc: func(advisory Advisory) {
advised = true
},
StatsFunc: func() sql.DBStats {
return sql.DBStats{}
},
Interval: time.Millisecond,
}

advisor.setup(10 * time.Millisecond)

require.Len(t, advisor.WaitDurationSamples, 10)
require.Len(t, advisor.WaitCountSamples, 10)

advisor.Stats = advisor.StatsFunc()

for i := 0; i < 11; i += 1 {
advisor.loop()
}

require.False(t, advised)
}

func TestAdvisorWaitDuration(t *testing.T) {
advised := 0

dbStats := sql.DBStats{}

advisor := Advisor{
AdviseFunc: func(advisory Advisory) {
advised += 1
},
StatsFunc: func() sql.DBStats {
return dbStats
},
Interval: time.Millisecond,
}

advisor.setup(10 * time.Millisecond)

require.Len(t, advisor.WaitDurationSamples, 10)
require.Len(t, advisor.WaitCountSamples, 10)

advisor.Stats = advisor.StatsFunc()

for i := 0; i < 20; i += 1 {
dbStats.WaitDuration += time.Millisecond
advisor.loop()
}

require.Equal(t, advised, 1)
}

func TestAdvisorWaitCount(t *testing.T) {
advised := 0

dbStats := sql.DBStats{}

advisor := Advisor{
AdviseFunc: func(advisory Advisory) {
advised += 1
},
StatsFunc: func() sql.DBStats {
return dbStats
},
Interval: time.Millisecond,
}

advisor.setup(10 * time.Millisecond)

require.Len(t, advisor.WaitDurationSamples, 10)
require.Len(t, advisor.WaitCountSamples, 10)

advisor.Stats = advisor.StatsFunc()

for i := 0; i < 20; i += 1 {
dbStats.WaitCount += 3
advisor.loop()
}

require.Equal(t, advised, 1)
}
22 changes: 20 additions & 2 deletions internal/storage/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func Dial(config *conf.GlobalConfiguration) (*Connection, error) {
}

if config.Metrics.Enabled {
registerOpenTelemetryDatabaseStats(db)
registerOpenTelemetryDatabaseStats(db, config)
}

return &Connection{db}, nil
}

func registerOpenTelemetryDatabaseStats(db *pop.Connection) {
func registerOpenTelemetryDatabaseStats(db *pop.Connection, config *conf.GlobalConfiguration) {
defer func() {
if rec := recover(); rec != nil {
logrus.WithField("error", rec).Error("registerOpenTelemetryDatabaseStats is not able to determine database object with reflection -- panicked")
Expand All @@ -111,6 +111,24 @@ func registerOpenTelemetryDatabaseStats(db *pop.Connection) {
} else {
logrus.Debug("registered OpenTelemetry stats metrics for database")
}

if config.DB.Advisor.Enabled {
advisor := Advisor{
StatsFunc: func() sql.DBStats {
return sqldb.Stats()
},
Interval: config.DB.Advisor.SamplingInterval,
AdviseFunc: func(advisory Advisory) {
logrus.WithFields(logrus.Fields{
"component": "db.advisor",
"long_wait_duration_samples": advisory.LongWaitDurationSamples,
"over_2_waiting_samples": advisory.Over2WaitingSamples,
}).Warn("Suboptimal database connection pool settings detected! Consider doubling the max DB pool size configuration")
},
}

advisor.Start(config.DB.Advisor.ObservationInterval)
}
}

type CommitWithError struct {
Expand Down