From 14ba6564d2486a142aa46f4b3bc496357fecf817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Rasmussen?= Date: Tue, 16 Nov 2021 06:19:24 +0100 Subject: [PATCH] New direction? --- cmd/fermentord/config.go | 65 ++++++ cmd/fermentord/config.hcl | 27 +++ cmd/fermentord/conversion.go | 22 +++ cmd/fermentord/db.go | 73 +++++++ cmd/fermentord/main.go | 250 +++++++++--------------- internal/api/http.go | 78 ++++---- internal/controllers/chamber.go | 162 +++++++++++---- internal/controllers/config.go | 24 ++- internal/controllers/hysteresis.go | 226 --------------------- internal/controllers/hysteresis_test.go | 52 ----- internal/dal/db.go | 44 ++++- internal/dal/db_migrations.go | 2 + internal/dal/readings.go | 6 +- internal/hw/gpio.go | 9 - internal/metrics/prometheus.go | 8 +- pkg/daemon/wait.go | 28 +++ pkg/temperature/ds18b20.go | 73 ++++--- 17 files changed, 591 insertions(+), 558 deletions(-) create mode 100644 cmd/fermentord/config.go create mode 100644 cmd/fermentord/config.hcl create mode 100644 cmd/fermentord/conversion.go create mode 100644 cmd/fermentord/db.go delete mode 100644 internal/controllers/hysteresis.go delete mode 100644 internal/controllers/hysteresis_test.go create mode 100644 pkg/daemon/wait.go diff --git a/cmd/fermentord/config.go b/cmd/fermentord/config.go new file mode 100644 index 0000000..75a5753 --- /dev/null +++ b/cmd/fermentord/config.go @@ -0,0 +1,65 @@ +package main + +import ( + "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/pkg/temperature" + "github.com/getsentry/sentry-go" + "github.com/rs/zerolog/log" + "github.com/spf13/viper" +) + +type configuration struct { + controllers.ControllerConfig + + HTTP struct { + Port int16 `mapstructure:"port"` + } `mapstructure:"http"` + + DB struct { + Local struct { + DSN string `mapstructure:"dsn"` + } `mapstructure:"local"` + } `mapstructure:"db"` +} + +func loadConfiguration() *configuration { + config := &configuration{} + + viper.SetDefault("pid.kp", 2.0) + viper.SetDefault("pid.ki", 0.0001) + viper.SetDefault("pid.kd", 2.0) + viper.SetDefault("db.local.dsn", "/var/lib/fermentord/local.db") + viper.SetDefault("http.port", 8000) + viper.SetDefault("db.local.dsn", "./fermentord.db") + + viper.AllowEmptyEnv(true) + viper.AutomaticEnv() + viper.AddConfigPath("/etc") + viper.SetConfigName("fermentord") + viper.SetConfigType("hcl") + + if err := viper.ReadInConfig(); err != nil { + log.Debug(). + Err(err). + Msg("Error loading configuration") + } + + if err := viper.Unmarshal(&config); err != nil { + sentry.CaptureException(err) + log.Fatal(). + Err(err). + Msg("Error unmarshalling configuration") + } + + return config +} + +func reloadConfiguration(config *configuration, ctrl *controllers.ChamberController) { + temperature.ConfigUpdates <- []string{ + config.Sensor.Ambient, + config.Sensor.Chamber, + config.Sensor.Wort, + } + + ctrl.ConfigUpdates <- config.ControllerConfig +} diff --git a/cmd/fermentord/config.hcl b/cmd/fermentord/config.hcl new file mode 100644 index 0000000..87f9fce --- /dev/null +++ b/cmd/fermentord/config.hcl @@ -0,0 +1,27 @@ +// fermentord + +fermentation_temp = 15.0 +delta_temp = 0.5 + +sensors { + wort = "asdf" + chamber = "sldkfj" + ambient = "skdjf" +} + +limits { + min_chamber_temp = 5 + min_cooler_runtime_secs = 300 + max_cooler_runtime_secs = 86400 + min_cooler_cooldown_secs = 300 +} + +pid { + kp = 2.0 + ki = 0.0001 + kd = 2.0 +} + +db "local" { + dsn = "/var/lib/fermentord/local.db" +} diff --git a/cmd/fermentord/conversion.go b/cmd/fermentord/conversion.go new file mode 100644 index 0000000..be5ce74 --- /dev/null +++ b/cmd/fermentord/conversion.go @@ -0,0 +1,22 @@ +package main + +import ( + "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/internal/dal" +) + +func convertChamberState(state controllers.ChamberState) dal.ChamberState { + switch state { + case controllers.ChamberStateIdle: + return dal.ChamberStateIdle + + case controllers.ChamberStateCooling: + return dal.ChamberStateCooling + + case controllers.ChamberStateHeating: + return dal.ChamberStateHeating + + default: + return dal.ChamberStateIdle + } +} diff --git a/cmd/fermentord/db.go b/cmd/fermentord/db.go new file mode 100644 index 0000000..4575e6b --- /dev/null +++ b/cmd/fermentord/db.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "database/sql" + "sync" + "time" + + "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/internal/dal" + "git.joco.dk/sng/fermentord/pkg/temperature" + "github.com/getsentry/sentry-go" + "github.com/rs/zerolog/log" +) + +func migrateDB(config configuration) { + migrator, err := dal.NewMigrator(config.DB.Local.DSN) + if err != nil { + sentry.CaptureException(err) + log.Fatal().Err(err).Msg("Error initializing DB migrations") + } + migrator.Up() + serr, dberr := migrator.Close() + if serr != nil { + sentry.CaptureException(serr) + log.Fatal().Err(serr).Msg("DB migration source error") + } + if dberr != nil { + sentry.CaptureException(dberr) + log.Fatal().Err(dberr).Msg("DB migration database error") + } +} + +func persistData(ctx context.Context, wg *sync.WaitGroup, db *dal.DAL, chState <-chan controllers.ChamberState, chTemp <-chan temperature.TemperatureReading) { + defer wg.Done() + + for { + select { + case state, ok := <-chState: + if !ok { + break + } + + ctxTo, cancel := context.WithTimeout(ctx, 10*time.Second) + err := db.InReadWriteTransaction(ctxTo, func(tx *sql.Tx) error { + return dal.SaveChamberState(ctxTo, tx, convertChamberState(state)) + }) + if err != nil { + sentry.CaptureException(err) + log.Error().Err(err).Msg("Error persisting state change") + } + cancel() + + case t, ok := <-chTemp: + if !ok { + break + } + + ctxTo, cancel := context.WithTimeout(ctx, 10*time.Second) + err := db.InReadWriteTransaction(ctxTo, func(tx *sql.Tx) error { + return dal.SaveTemperatureReading(ctx, tx, t.Time, t.Sensor, t.MilliDegrees) + }) + if err != nil { + sentry.CaptureException(err) + log.Error().Err(err).Msg("Error persisting temperature reading") + } + cancel() + + case <-ctx.Done(): + return + } + } +} diff --git a/cmd/fermentord/main.go b/cmd/fermentord/main.go index 2aaa19b..9db6707 100644 --- a/cmd/fermentord/main.go +++ b/cmd/fermentord/main.go @@ -2,11 +2,9 @@ package main import ( "context" + "fmt" "net/http" - "os" - "os/signal" "sync" - "syscall" "time" "git.joco.dk/sng/fermentord/internal/api" @@ -14,6 +12,7 @@ import ( "git.joco.dk/sng/fermentord/internal/dal" "git.joco.dk/sng/fermentord/internal/hw" "git.joco.dk/sng/fermentord/internal/metrics" + "git.joco.dk/sng/fermentord/pkg/daemon" "git.joco.dk/sng/fermentord/pkg/temperature" "github.com/fsnotify/fsnotify" "github.com/getsentry/sentry-go" @@ -22,93 +21,70 @@ import ( "github.com/spf13/viper" ) -type configuration struct { - Db struct { - Dsn string `mapstructure:"dsn"` - } `mapstructure:"db"` - Temperature struct { - WortSensor string `mapstructure:"wort_sensor"` - ChamberSensor string `mapstructure:"chamber_sensor"` - AmbientSensor string `mapstructure:"ambient_sensor"` - } `mapstructure:"temperature"` - Hysteresis controllers.Config `mapstructure:"hysteresis"` -} - -var ( - config configuration - ctx context.Context - wg *sync.WaitGroup - hysteresis *controllers.Hysteresis - gpio *hw.Gpio -) - -func bool2ChamberState(state bool) dal.ChamberState { - if state { - return dal.ChamberStateCooling - } - return dal.ChamberStateIdle -} - -func mainLoop(db *dal.DAL, wg *sync.WaitGroup) { +func mainLoop(ctx context.Context, wg *sync.WaitGroup, db *dal.DAL, config *configuration) { defer wg.Done() + temperature.Initialize() + + // Controller + chCtrlTemp := make(chan temperature.TemperatureReading, 10) + defer close(chCtrlTemp) + ctrl := controllers.NewChamberController("Chamber 1", config.ControllerConfig, chCtrlTemp) + + // Configuration reload + viper.OnConfigChange(func(in fsnotify.Event) { + reloadConfiguration(config, ctrl) + }) + viper.WatchConfig() + reloadConfiguration(config, ctrl) + + gpio, err := hw.NewGpio() + if err != nil { + sentry.CaptureException(err) + log.Fatal(). + Err(err). + Msg("Error initializing GPIO") + } + defer gpio.Close() + + chDbTemp := make(chan temperature.TemperatureReading, 10) + chDbState := make(chan controllers.ChamberState, 10) + defer close(chDbTemp) + defer close(chDbState) + + wg.Add(3) + go persistData(ctx, wg, db, chDbState, chDbTemp) + go ctrl.Run(ctx, wg) + go temperature.PollSensors(ctx, wg, 1*time.Second) + for { select { + case reading := <-temperature.C: + chCtrlTemp <- reading + chDbTemp <- reading + + case state := <-ctrl.C: + switch state { + case controllers.ChamberStateIdle: + metrics.State.Set(metrics.MetricStateIdle) + gpio.StopCooler() + gpio.StopHeater() + + case controllers.ChamberStateCooling: + metrics.State.Set(metrics.MetricStateCooling) + gpio.StopHeater() + gpio.StartCooler() + + case controllers.ChamberStateHeating: + metrics.State.Set(metrics.MetricStateHeating) + gpio.StopCooler() + gpio.StartHeater() + } + + chDbState <- state + case <-ctx.Done(): return - - case state := <-hysteresis.C: - ctx := context.Background() - - if state { - metrics.State.Set(1) - //gpio.StartCooler() - } else { - metrics.State.Set(0) - //gpio.StopCooler() - } - - dbCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - tx, err := db.Begin() - if err != nil { - sentry.CaptureException(err) - log.Error().Err(err).Msg("Failed to start DB transaction") - } else { - dal.SaveChamberState(dbCtx, tx, bool2ChamberState(state)) - err = tx.Commit() - if err != nil { - sentry.CaptureException(err) - log.Error().Err(err).Msg("Failed to commit DB transaction") - } - } - cancel() - - case reading := <-temperature.C: - ctx := context.Background() - - switch reading.Sensor { - case config.Temperature.AmbientSensor: - hysteresis.UpdateAmbientTemperature(reading.Degrees()) - - case config.Temperature.ChamberSensor: - hysteresis.UpdateChamberTemperature(reading.Degrees()) - - case config.Temperature.WortSensor: - hysteresis.UpdateWortTemperature(reading.Degrees()) - } - - dbCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - tx, err := db.Begin() - if err != nil { - log.Error().Err(err).Msg("Failed to start DB transaction") - } else { - dal.SaveTemperatureReading(dbCtx, tx, reading.Sensor, reading.MilliDegrees) - err = tx.Commit() - if err != nil { - log.Error().Err(err).Msg("Failed to commit DB transaction") - } - } - cancel() } } } @@ -120,79 +96,32 @@ func main() { TracesSampleRate: 0.01, }) if err != nil { - log.Fatal().Err(err).Msg("Failed to initialize Sentry.") + log.Fatal(). + Err(err). + Msg("Failed to initialize Sentry.") } defer sentry.Flush(10 * time.Second) // Configuration - viper.AllowEmptyEnv(true) - viper.AutomaticEnv() - viper.SetDefault("db.dsn", "./fermentord.db") - viper.AddConfigPath("/etc") - viper.SetConfigName("fermentord") - viper.SetConfigType("hcl") - viper.ReadInConfig() - viper.Unmarshal(&config) - - ctxb := context.Background() - ctx, shutdown := context.WithCancel(ctxb) - wg = &sync.WaitGroup{} - - temperature.Initialize(10 * time.Second) - hysteresis = controllers.NewHysteresis("Chamber 1") - - reloadConfig := func() { - temperature.SetSensors([]string{ - config.Temperature.AmbientSensor, - config.Temperature.ChamberSensor, - config.Temperature.WortSensor, - }) - hysteresis.SetConfig(&config.Hysteresis) - } - - viper.OnConfigChange(func(in fsnotify.Event) { - reloadConfig() - }) - viper.WatchConfig() - reloadConfig() + config := loadConfiguration() // Database - migrator, err := dal.NewMigrator(config.Db.Dsn) - if err != nil { - log.Fatal().Err(err).Msg("Error initializing DB migrations") - } - migrator.Up() - serr, dberr := migrator.Close() - if serr != nil { - log.Fatal().Err(serr).Msg("DB migration source error") - } - if dberr != nil { - log.Fatal().Err(dberr).Msg("DB migration database error") - } + migrateDB(*config) var db *dal.DAL - if db, err := dal.NewDAL(config.Db.Dsn); err != nil { + if db, err := dal.NewDAL(config.DB.Local.DSN); err != nil { + sentry.CaptureException(err) log.Fatal().Err(err).Msg("Error connecting to DB") } else { - db.Initialize() defer db.Close() } - // GPIO - log.Info().Msg("Initializing GPIO") - gpio, err := hw.NewGpio() - if err != nil { - log.Fatal().Err(err).Send() - } - defer gpio.Close() - // HTTP mux := http.NewServeMux() srv := &http.Server{ - Addr: ":8000", + Addr: fmt.Sprintf(":%v", config.HTTP.Port), Handler: mux, } - api := api.NewAPI(db) // Metrics @@ -202,31 +131,32 @@ func main() { mux.HandleFunc("/state-changes", api.GetStateChanges) // Main - wg.Add(2) - go temperature.Serve(ctx, wg) - go mainLoop(db, wg) + ctxb := context.Background() + ctx, shutdown := context.WithCancel(ctxb) + wg := &sync.WaitGroup{} + + wg.Add(1) + go mainLoop(ctx, wg, db, config) + go srv.ListenAndServe() - done := make(chan os.Signal, 1) - signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-done - - go func() { - ctx, cancel := context.WithTimeout(ctxb, 10*time.Second) - defer cancel() - err := srv.Shutdown(ctx) - if err != nil { - log.Error().Err(err).Msg("Error shutting down HTTP server.") - } - }() + daemon.WaitForExitSignal() // Initiate graceful shutdown. + wg.Add(1) + go shutdownHTTP(ctxb, wg, srv) shutdown() - - // Wait for graceful shutdown. wg.Wait() - - // TODO PID for heating - // TODO Data export - // TODO SQL migrations https://github.com/golang-migrate/migrate/tree/master/database/sqlite +} + +func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) { + defer wg.Done() + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := srv.Shutdown(ctx2); err != nil { + sentry.CaptureException(err) + log.Error(). + Err(err). + Msg("Error shutting down HTTP server.") + } } diff --git a/internal/api/http.go b/internal/api/http.go index 3dda13f..5f38213 100644 --- a/internal/api/http.go +++ b/internal/api/http.go @@ -2,6 +2,7 @@ package api import ( "context" + "database/sql" "encoding/json" "fmt" "net/http" @@ -15,7 +16,7 @@ import ( type API struct { db *dal.DAL - config *controllers.Config + config *controllers.ControllerConfig configChangeCallback func() } @@ -27,15 +28,14 @@ func NewAPI(db *dal.DAL) *API { func (a *API) HealthCheck(w http.ResponseWriter, r *http.Request) { defer sentry.Recover() + ctxb := context.Background() - tx, err := a.db.Begin() - if err != nil { - http.Error(w, "Unhealthy", http.StatusServiceUnavailable) - return - } - - var version string - err = tx.QueryRow("SELECT version()").Scan(&version) + ctx, cancel := context.WithTimeout(ctxb, 5*time.Second) + defer cancel() + err := a.db.InReadOnlyTransaction(ctx, func(tx *sql.Tx) error { + _, err := tx.Query("SELECT version()") + return err + }) if err != nil { http.Error(w, "Unhealthy", http.StatusServiceUnavailable) return @@ -57,13 +57,7 @@ func (a *API) GetConfig(w http.ResponseWriter, r *http.Request) { func (a *API) GetTemperatures(w http.ResponseWriter, r *http.Request) { defer sentry.Recover() - - ctx := context.Background() - tx, err := a.db.Begin() - defer tx.Rollback() - if err != nil { - log.Panic().Err(err).Msg("Failed to start DB transaction.") - } + ctxb := context.Background() keys, ok := r.URL.Query()["since"] if !ok || len(keys[0]) < 1 { @@ -77,30 +71,32 @@ func (a *API) GetTemperatures(w http.ResponseWriter, r *http.Request) { return } - dbCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctxb, 30*time.Second) defer cancel() - readings, err := dal.LoadTemperatureReadingsSince(dbCtx, tx, since) + var data []byte + err = a.db.InReadOnlyTransaction(ctx, func(tx *sql.Tx) error { + readings, err := dal.LoadTemperatureReadingsSince(ctx, tx, since) + if err != nil { + return err + } + + data, err = json.MarshalIndent(readings, "", " ") + if err != nil { + return err + } + + return nil + }) if err != nil { log.Panic().Err(err).Msg("Failed to load temperature readings from DB.") } - tx.Commit() - - data, err := json.MarshalIndent(readings, "", " ") - if err != nil { - log.Panic().Err(err).Msg("Failed to serialize temperature readings to JSON.") - } fmt.Fprintf(w, string(data)) } func (a *API) GetStateChanges(w http.ResponseWriter, r *http.Request) { defer sentry.Recover() - - ctx := context.Background() - tx, err := a.db.Begin() - if err != nil { - log.Panic().Err(err).Msg("Failed to start DB transaction.") - } + ctxb := context.Background() keys, ok := r.URL.Query()["since"] if !ok || len(keys[0]) < 1 { @@ -114,15 +110,23 @@ func (a *API) GetStateChanges(w http.ResponseWriter, r *http.Request) { return } - dbCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctxb, 30*time.Second) defer cancel() - readings, err := dal.LoadChamberStateChangesSince(dbCtx, tx, since) - if err != nil { - log.Panic().Err(err).Msg("Failed to load temperature readings from DB.") - } - tx.Commit() + var data []byte + err = a.db.InReadOnlyTransaction(ctx, func(tx *sql.Tx) error { + readings, err := dal.LoadChamberStateChangesSince(ctx, tx, since) + if err != nil { + return err + } + + data, err = json.MarshalIndent(readings, "", " ") + if err != nil { + return err + } + + return nil + }) - data, err := json.MarshalIndent(readings, "", " ") if err != nil { log.Panic().Err(err).Msg("Failed to serialize temperature readings to JSON.") } diff --git a/internal/controllers/chamber.go b/internal/controllers/chamber.go index d49b06b..31eee93 100644 --- a/internal/controllers/chamber.go +++ b/internal/controllers/chamber.go @@ -1,9 +1,11 @@ package controllers import ( + "context" "sync" "time" + "git.joco.dk/sng/fermentord/pkg/temperature" "github.com/rs/zerolog/log" ) @@ -16,13 +18,14 @@ const ( ) type ChamberController struct { - config *Config + ConfigUpdates chan ControllerConfig + config ControllerConfig // Current state. chamberState ChamberState lastChamberStateChange time.Time + lastCoolerStateChange time.Time C chan ChamberState - mutex *sync.Mutex name string @@ -31,48 +34,139 @@ type ChamberController struct { chamberTemperature float64 wortTemperature float64 - pid *PIDController - initialized bool + chTemp <-chan temperature.TemperatureReading + + pid *PIDController } -func NewChamberController(name string, config *Config) *ChamberController { +func NewChamberController(name string, config ControllerConfig, chTemp <-chan temperature.TemperatureReading) *ChamberController { return &ChamberController{ - C: make(chan ChamberState), - name: name, - config: config, - pid: NewPIDController(config.PID.Kp, config.PID.Ki, config.PID.Kd), - mutex: &sync.Mutex{}, + C: make(chan ChamberState), + name: name, + config: config, + pid: NewPIDController(config.PID.Kp, config.PID.Ki, config.PID.Kd), + chTemp: chTemp, + chamberState: ChamberStateIdle, + ConfigUpdates: make(chan ControllerConfig, 1), } } -func (p *ChamberController) Run() { +func (p *ChamberController) Run(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + ticker := time.NewTicker(1 * time.Second) + for { - offset := p.pid.Compute(p.wortTemperature, p.config.FermentationTemperature) - chamberTargetTemp := p.config.FermentationTemperature + offset - log.Debug().Float64("chamber_target_temp", chamberTargetTemp).Send() + select { + case <-ticker.C: + state := p.computeChamberState() + p.setChamberState(state) - // TODO Merge hysteresis algorithm into this one. + case temp := <-p.chTemp: + p.setTemperature(temp) - if !p.initialized { - // heater off - // cooler off - continue - } + case c := <-p.ConfigUpdates: + p.config = c - if p.wortTemperature < p.config.FermentationTemperature && p.chamberTemperature < chamberTargetTemp { - // heater on - } else if p.wortTemperature >= p.config.FermentationTemperature || p.chamberTemperature >= chamberTargetTemp { - // heater off - } else { - // heater off - } - - if p.wortTemperature > p.config.FermentationTemperature && p.chamberTemperature > chamberTargetTemp { - // cooler on - } else if p.wortTemperature <= p.config.FermentationTemperature || p.chamberTemperature <= chamberTargetTemp { - // cooler off - } else { - // cooler off + case <-ctx.Done(): + ticker.Stop() + p.setChamberState(ChamberStateIdle) + return } } } + +func (p *ChamberController) setTemperature(t temperature.TemperatureReading) { + switch t.Sensor { + case p.config.Sensor.Ambient: + p.ambientTemperature = t.Degrees() + + case p.config.Sensor.Chamber: + p.chamberTemperature = t.Degrees() + + case p.config.Sensor.Wort: + p.wortTemperature = t.Degrees() + + default: + log.Warn(). + Str("sensor", t.Sensor). + Int64("temp", t.MilliDegrees). + Msg("Unknown sensor") + } +} + +func (p *ChamberController) setChamberState(state ChamberState) { + if state == p.chamberState { + return + } + + log.Info(). + Int("from", int(p.chamberState)). + Int("to", int(state)). + Msgf("State changed") + + if p.chamberState == ChamberStateCooling || state == ChamberStateCooling { + p.lastCoolerStateChange = time.Now() + } + + p.chamberState = state + p.lastChamberStateChange = time.Now() + p.C <- state +} + +func (p *ChamberController) computeChamberState() ChamberState { + offset := p.pid.Compute(p.wortTemperature, p.config.FermentationTemperature) + chamberTargetTemp := p.config.FermentationTemperature + offset + log.Debug(). + Float64("amb_temp", p.ambientTemperature). + Float64("chamber_temp", p.chamberTemperature). + Float64("wort_temp", p.wortTemperature). + Float64("setpoint_temp", p.config.FermentationTemperature). + Float64("offset", offset). + Float64("chamber_target_temp", chamberTargetTemp). + Float64("kp", p.pid.kp). + Float64("ki", p.pid.ki). + Float64("kd", p.pid.kd). + Send() + + runtimeSecs := time.Since(p.lastChamberStateChange).Seconds() + + if p.chamberState == ChamberStateCooling { + // Ensure compressor min. runtime + if runtimeSecs < p.config.Limits.MinCoolerRuntimeSecs { + return p.chamberState + } + + // Limit compressor runtime + if runtimeSecs > p.config.Limits.MaxCoolerRuntimeSecs { + return ChamberStateIdle + } + + // Limit chamber min. temp. + if p.chamberTemperature < p.config.Limits.MinChamberTemperature { + return ChamberStateIdle + } + } + + var next ChamberState + heater := p.wortTemperature < p.config.FermentationTemperature-p.config.DeltaTemperature && p.chamberTemperature < chamberTargetTemp-p.config.DeltaTemperature + cooler := p.wortTemperature > p.config.FermentationTemperature+p.config.DeltaTemperature && p.chamberTemperature > chamberTargetTemp+p.config.DeltaTemperature + + if cooler && heater { + // This should not happen! + next = ChamberStateIdle + } else if !cooler && !heater { + next = ChamberStateIdle + } else if cooler { + next = ChamberStateCooling + } else if heater { + next = ChamberStateHeating + } + + // Ensure compressor cooldown + if p.chamberState != ChamberStateCooling && next == ChamberStateCooling && time.Since(p.lastCoolerStateChange).Seconds() < p.config.Limits.MinCoolerCooldownSecs { + return ChamberStateIdle + } + + return next +} diff --git a/internal/controllers/config.go b/internal/controllers/config.go index 3782d0e..ca03b56 100644 --- a/internal/controllers/config.go +++ b/internal/controllers/config.go @@ -1,13 +1,23 @@ package controllers -type Config struct { +type ControllerConfig struct { + Sensor struct { + Wort string `mapstructure:"wort"` + Chamber string `mapstructure:"chamber"` + Ambient string `mapstructure:"ambient"` + } `mapstructure:"sensors"` + FermentationTemperature float64 `mapstructure:"fermentation_temp"` - MaxWortDelta float64 `mapstructure:"max_wort_delta"` - MinChamberTemperature float64 `mapstructure:"min_chamber_temp"` - MinCoolerRuntimeSecs float64 `mapstructure:"min_cooler_runtime_secs"` // 900 - MaxCoolerRuntimeSecs float64 `mapstructure:"max_cooler_runtime_secs"` // 86400 - MinCoolerCooldownSecs float64 `mapstructure:"min_cooler_cooldown_secs"` // 900 - PID struct { + DeltaTemperature float64 `mapstructure:"delta_temp"` + + Limits struct { + MinChamberTemperature float64 `mapstructure:"min_chamber_temp"` + MinCoolerRuntimeSecs float64 `mapstructure:"min_cooler_runtime_secs"` // 900 + MaxCoolerRuntimeSecs float64 `mapstructure:"max_cooler_runtime_secs"` // 86400 + MinCoolerCooldownSecs float64 `mapstructure:"min_cooler_cooldown_secs"` // 900 + } `mapstructure:"limits"` + + PID struct { Kp float64 `mapstructure:"kp"` // 2.0 Ki float64 `mapstructure:"ki"` // 0.0001 Kd float64 `mapstructure:"kd"` // 2.0 diff --git a/internal/controllers/hysteresis.go b/internal/controllers/hysteresis.go deleted file mode 100644 index 5097f2d..0000000 --- a/internal/controllers/hysteresis.go +++ /dev/null @@ -1,226 +0,0 @@ -package controllers - -import ( - "sync" - "time" - - "github.com/rs/zerolog/log" -) - -// Hysteresis will only run when the ambient temperature is above the -// fermentation temperature. The compressor will run until the wort -// is cooled to FermentationTemperature, the chamber is cooled to -// MinChamberTemperature or the compressor has run for MaxCoolerRuntimeSecs. -// The compressor will run for min. MinCoolerRuntimeSecs before switching off, -// regardless of temperatures. Once turned off, the compressor will not be -// turned on before MinCoolerCooldownSecs has expired, regardless of temperatures. -type Hysteresis struct { - config *Config - - // Current state. - coolerState bool - lastCoolerStateChange time.Time - C chan bool - mutex *sync.Mutex - - name string - - // Current temperature readings. - ambientTemperature float64 - chamberTemperature float64 - wortTemperature float64 -} - -func NewHysteresis(name string) *Hysteresis { - return &Hysteresis{ - C: make(chan bool), - name: name, - mutex: &sync.Mutex{}, - } -} - -func (h *Hysteresis) SetConfig(newConfig *Config) { - h.lock() - h.config = newConfig - h.unlock() - h.update() -} - -func (h *Hysteresis) UpdateAmbientTemperature(value float64) { - h.lock() - h.ambientTemperature = value - h.unlock() - h.update() -} - -func (h *Hysteresis) UpdateChamberTemperature(value float64) { - h.lock() - h.chamberTemperature = value - h.unlock() - h.update() -} - -func (h *Hysteresis) UpdateWortTemperature(value float64) { - h.lock() - h.wortTemperature = value - h.unlock() - h.update() -} - -func (h *Hysteresis) lock() { - h.mutex.Lock() -} - -func (h *Hysteresis) unlock() { - h.mutex.Unlock() -} - -func (h *Hysteresis) GetCoolerState() bool { - return h.coolerState -} - -func (h *Hysteresis) setState(state bool) { - h.lock() - - if state == h.coolerState { - h.unlock() - return - } - - log.Debug().Bool("state", state).Msg("Setting state") - h.coolerState = state - h.lastCoolerStateChange = time.Now() - h.unlock() - - h.C <- state -} - -func (h *Hysteresis) update() { - // Compressor runtime has highest priority. - lastStateChangeAgeSecs := time.Since(h.lastCoolerStateChange).Seconds() - if h.coolerState { - // Keep the cooler running until min. runtime is reached. - if lastStateChangeAgeSecs < h.config.MinCoolerRuntimeSecs { - log.Debug(). - Str("name", h.name). - Float64("runtime", lastStateChangeAgeSecs). - Float64("min_runtime", h.config.MinCoolerRuntimeSecs). - Bool("state", h.coolerState). - Msg("Cooler kept running as it's runtime threshold is not yet reached.") - return - } - - // Stop the cooler if it's runtime exceeds max. - if lastStateChangeAgeSecs > h.config.MaxCoolerRuntimeSecs { - log.Info(). - Str("name", h.name). - Float64("runtime", lastStateChangeAgeSecs). - Float64("max_runtime", h.config.MaxCoolerRuntimeSecs). - Bool("state", false). - Msg("Cooler stopped as it's runtime exceeds max. threshold.") - h.setState(false) - return - } - } else { - // Keep the cooler off until min. cooldown time is reached. - if lastStateChangeAgeSecs < h.config.MinCoolerCooldownSecs { - log.Debug(). - Str("name", h.name). - Float64("runtime", lastStateChangeAgeSecs). - Float64("min_cooldown", h.config.MinCoolerCooldownSecs). - Bool("state", h.coolerState). - Msg("Cooler kept off as it's cooldown time threshold is not yet reached.") - return - } - } - - // Do not allow the cooler to run when the ambient temperature is below - // the fermentation temperature. - if h.ambientTemperature < h.config.FermentationTemperature { - if h.coolerState { - log.Info(). - Str("name", h.name). - Float64("ambient_temp", h.ambientTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Msg("Turn off cooler as ambient temperature is less than fermentation temperature.") - h.setState(false) - } else { - log.Debug(). - Str("name", h.name). - Float64("ambient_temp", h.ambientTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Msg("Cooler kept off as ambient temperature is less than fermentation temperature.") - } - return - } - - // Do not allow the cooler to run if the chamber temperature is below - // minimum. - if h.chamberTemperature <= h.config.MinChamberTemperature { - if h.coolerState { - log.Info(). - Str("name", h.name). - Float64("chamber_temp", h.chamberTemperature). - Float64("min_chamber_temp", h.config.MinChamberTemperature). - Msg("Cooler turned off as chamber temperature dropped below threshold.") - h.setState(false) - } else { - log.Debug(). - Str("name", h.name). - Float64("ambient_temp", h.ambientTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Msg("Cooler kept off as chamber temperature is below threshold.") - } - return - } - - // Keep the cooler stopped until the wort delta temperature reaches it's threshold. - delta := h.wortTemperature - h.config.FermentationTemperature - if h.coolerState { - // Keep the cooler running until the wort reaches the desired temp. - if delta <= 0 { - // TODO Investigate how much the wort temp. drops below fermentation temp. - // TODO A threshold may be needed. - log.Info(). - Str("name", h.name). - Float64("wort_temp", h.wortTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Float64("delta", delta). - Float64("max_wort_delta", h.config.MaxWortDelta). - Msg("Cooler stopped as wort is cooled down to fermentation temp.") - h.setState(false) - return - } else { - log.Debug(). - Str("name", h.name). - Float64("wort_temp", h.wortTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Float64("delta", delta). - Float64("max_wort_delta", h.config.MaxWortDelta). - Msg("Cooler kept running as the wort has not yet reached fermentation temp.") - } - } else { - // Start the cooler when delta exceeds threshold. - if delta > h.config.MaxWortDelta { - log.Info(). - Str("name", h.name). - Float64("wort_temp", h.wortTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Float64("delta", delta). - Float64("max_wort_delta", h.config.MaxWortDelta). - Msg("Cooler started as wort delta exceeds threshold.") - h.setState(true) - return - } else { - log.Debug(). - Str("name", h.name). - Float64("wort_temp", h.wortTemperature). - Float64("fermentation_temp", h.config.FermentationTemperature). - Float64("delta", delta). - Float64("max_wort_delta", h.config.MaxWortDelta). - Msg("Cooler kept off as delta does not yet exceed threshold.") - // TODO If this is reached, then we can lower the delta as min. cooldown is reached. - // TODO This is relevant for PID. - } - } -} diff --git a/internal/controllers/hysteresis_test.go b/internal/controllers/hysteresis_test.go deleted file mode 100644 index 14e36df..0000000 --- a/internal/controllers/hysteresis_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package controllers - -import ( - "testing" - "time" -) - -func discard(h *Hysteresis) { - for { - <-h.C - } -} - -func TestHysteresisNormalCooling(t *testing.T) { - c := &Config{ - FermentationTemperature: 20, - MaxWortDelta: 0.5, - MinChamberTemperature: 2, - MinCoolerRuntimeSecs: 300, - MaxCoolerRuntimeSecs: 86400, - MinCoolerCooldownSecs: 300, - } - - h := NewHysteresis("test") - h.config = c - h.ambientTemperature = 25 - h.chamberTemperature = 22 - h.wortTemperature = 20 - go discard(h) - h.update() - - assert := func(expectedState bool) { - actualState := h.GetCoolerState() - if actualState != expectedState { - t.Errorf("Expected cooler state %v, but got %v", expectedState, actualState) - } - } - - assert(false) - - h.lastCoolerStateChange = h.lastCoolerStateChange.Add(-1 * time.Hour) - h.UpdateWortTemperature(20.5) - assert(false) - - h.lastCoolerStateChange = h.lastCoolerStateChange.Add(-1 * time.Hour) - h.UpdateWortTemperature(20.6) - assert(true) - - h.lastCoolerStateChange = h.lastCoolerStateChange.Add(-1 * time.Hour) - h.UpdateWortTemperature(20) - assert(false) -} diff --git a/internal/dal/db.go b/internal/dal/db.go index 870e7a6..e25a42b 100644 --- a/internal/dal/db.go +++ b/internal/dal/db.go @@ -1,6 +1,7 @@ package dal import ( + "context" "database/sql" _ "modernc.org/sqlite" @@ -22,15 +23,46 @@ func NewDAL(dsn string) (*DAL, error) { return dal, nil } -func (dal *DAL) Close() error { - return dal.db.Close() +func (p *DAL) Close() error { + return p.db.Close() } -func (dal *DAL) Initialize() error { - // TODO +func (p *DAL) InReadWriteTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error { + tx, err := p.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + + if err = fn(tx); err != nil { + if err2 := tx.Rollback(); err2 != nil { + return err2 + } + return err + } + + if err = tx.Commit(); err != nil { + return err + } + return nil } -func (dal *DAL) Begin() (*sql.Tx, error) { - return dal.db.Begin() +func (p *DAL) InReadOnlyTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error { + tx, err := p.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + if err != nil { + return err + } + + if err = fn(tx); err != nil { + if err2 := tx.Rollback(); err2 != nil { + return err2 + } + return err + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil } diff --git a/internal/dal/db_migrations.go b/internal/dal/db_migrations.go index 8e32dc5..5a39ecb 100644 --- a/internal/dal/db_migrations.go +++ b/internal/dal/db_migrations.go @@ -1,5 +1,7 @@ package dal +// SQL migrations https://github.com/golang-migrate/migrate/tree/master/database/sqlite + import ( "embed" "net/http" diff --git a/internal/dal/readings.go b/internal/dal/readings.go index 70b53b3..bff9dac 100644 --- a/internal/dal/readings.go +++ b/internal/dal/readings.go @@ -56,9 +56,9 @@ func LoadTemperatureReadingsSince(ctx context.Context, tx *sql.Tx, since time.Ti return readings, nil } -func SaveTemperatureReading(ctx context.Context, tx *sql.Tx, sensor string, value int64) error { - q := "INSERT INTO temperature_reading (sensor, value) VALUES (?, ?)" - _, err := tx.ExecContext(ctx, q, sensor, value) +func SaveTemperatureReading(ctx context.Context, tx *sql.Tx, ts time.Time, sensor string, value int64) error { + q := "INSERT INTO temperature_reading (c_time, sensor, value) VALUES (?, ?, ?)" + _, err := tx.ExecContext(ctx, q, ts, sensor, value) return err } diff --git a/internal/hw/gpio.go b/internal/hw/gpio.go index e672362..747f638 100644 --- a/internal/hw/gpio.go +++ b/internal/hw/gpio.go @@ -6,7 +6,6 @@ import ( const ( pinCoolerPower = 20 - pinFanPower = 16 pinHeaterPower = 21 off = 0 @@ -16,7 +15,6 @@ const ( type Gpio struct { chip *gpiod.Chip coolerPower *gpiod.Line - fanPower *gpiod.Line heaterPower *gpiod.Line } @@ -34,11 +32,6 @@ func NewGpio() (*Gpio, error) { return nil, err } - p.fanPower, err = p.chip.RequestLine(pinFanPower, gpiod.AsOutput(1)) - if err != nil { - return nil, err - } - p.heaterPower, err = p.chip.RequestLine(pinHeaterPower, gpiod.AsOutput(1)) if err != nil { return nil, err @@ -49,11 +42,9 @@ func NewGpio() (*Gpio, error) { func (p *Gpio) Close() { p.coolerPower.SetValue(off) - p.fanPower.SetValue(off) p.heaterPower.SetValue(off) p.heaterPower.Close() - p.fanPower.Close() p.coolerPower.Close() p.chip.Close() } diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go index f7f967f..abd53f4 100644 --- a/internal/metrics/prometheus.go +++ b/internal/metrics/prometheus.go @@ -5,11 +5,17 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) +const ( + MetricStateIdle float64 = iota + MetricStateCooling + MetricStateHeating +) + var ( State = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "fermentord", Subsystem: "state", - Name: "state_current", + Name: "current", Help: "The state of the fermentor. 0=idle, 1=cooling, 2=heating", }) diff --git a/pkg/daemon/wait.go b/pkg/daemon/wait.go new file mode 100644 index 0000000..4c78e31 --- /dev/null +++ b/pkg/daemon/wait.go @@ -0,0 +1,28 @@ +package daemon + +import ( + "os" + "os/signal" + "syscall" + + "github.com/rs/zerolog/log" +) + +var ( + Interrupt = os.Interrupt + Quit os.Signal = syscall.SIGQUIT + Terminate os.Signal = syscall.SIGTERM +) + +func WaitForSignal(sig ...os.Signal) { + ch := make(chan os.Signal, 1) + signal.Notify(ch, sig...) + s := <-ch + log.Debug(). + Str("signal", s.String()). + Msg("Signal caught") +} + +func WaitForExitSignal() { + WaitForSignal(Interrupt, Quit, Terminate) +} diff --git a/pkg/temperature/ds18b20.go b/pkg/temperature/ds18b20.go index 52045a3..005a109 100644 --- a/pkg/temperature/ds18b20.go +++ b/pkg/temperature/ds18b20.go @@ -1,70 +1,86 @@ package temperature +// Kernel documentation: https://www.kernel.org/doc/html/latest/w1/slaves/w1_therm.html + import ( "context" "errors" "io/ioutil" + "os" "strconv" "strings" "sync" "time" "git.joco.dk/sng/fermentord/internal/metrics" + "github.com/getsentry/sentry-go" "github.com/rs/zerolog/log" ) var ( - ErrReadSensor = errors.New("Failed to read sensor temperature") - - C chan *TemperatureReading - - sensors []string - mutex *sync.Mutex - ticker *time.Ticker + // ErrReadSensor indicates that the sensor could not be read + ErrReadSensor = errors.New("failed to read sensor") ) -func Initialize(readingInterval time.Duration) { - C = make(chan *TemperatureReading) +var ( + // C receives the temperature readings + C chan TemperatureReading + + // ConfigUpdates receives the list of sensors to read + ConfigUpdates chan []string + sensors []string +) + +func Initialize() { sensors = make([]string, 0) - mutex = &sync.Mutex{} - ticker = time.NewTicker(readingInterval) + C = make(chan TemperatureReading, 30) + ConfigUpdates = make(chan []string, 1) } -func SetSensors(newSensors []string) { - mutex.Lock() - sensors = newSensors - mutex.Unlock() -} - -func Serve(ctx context.Context, wg *sync.WaitGroup) { +func PollSensors(ctx context.Context, wg *sync.WaitGroup, readingInterval time.Duration) { defer wg.Done() + if readingInterval < 750*time.Millisecond { + log.Fatal(). + Dur("interval", readingInterval). + Msg("Reading interval must be at least 750 ms.") + } + + ticker := time.NewTicker(readingInterval) + defer ticker.Stop() + for { select { case <-ticker.C: + // Trigger a bulk read and wait 750ms for sensors to convert data. + triggerBulkRead() + time.Sleep(750 * time.Millisecond) + readSensors() + case c := <-ConfigUpdates: + sensors = c + case <-ctx.Done(): - ticker.Stop() + close(C) return } } } func readSensors() { - mutex.Lock() - defer mutex.Unlock() - for _, sensor := range sensors { start := time.Now() t, err := read(sensor) dur := time.Since(start).Seconds() if err != nil { + sentry.CaptureException(err) log.Error(). Err(err). Str("sensor", sensor). Msg("Error reading temperature sensor.") + continue } metrics.TemperatureSensorReadingDegreesCelcius. @@ -75,7 +91,7 @@ func readSensors() { WithLabelValues("sensor", sensor). Observe(dur) - C <- &TemperatureReading{ + C <- TemperatureReading{ Time: time.Now(), Sensor: sensor, MilliDegrees: t, @@ -83,6 +99,17 @@ func readSensors() { } } +func triggerBulkRead() error { + f, err := os.OpenFile("/sys/bus/w1/w1_bus_master/therm_bulk_read", os.O_APPEND, os.ModeAppend) + if err != nil { + return err + } + defer f.Close() + + _, err = f.WriteString("trigger\n") + return err +} + // read returns the temperature of the specified sensor in millidegrees celcius. func read(sensor string) (int64, error) { path := "/sys/bus/w1/devices/" + sensor + "/w1_slave"