New direction?

This commit is contained in:
Søren Rasmussen 2021-11-16 06:19:24 +01:00
parent bc6f62af23
commit 14ba6564d2
17 changed files with 591 additions and 558 deletions

65
cmd/fermentord/config.go Normal file
View file

@ -0,0 +1,65 @@
package main
import (
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/pkg/temperature"
"github.com/getsentry/sentry-go"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
type configuration struct {
controllers.ControllerConfig
HTTP struct {
Port int16 `mapstructure:"port"`
} `mapstructure:"http"`
DB struct {
Local struct {
DSN string `mapstructure:"dsn"`
} `mapstructure:"local"`
} `mapstructure:"db"`
}
func loadConfiguration() *configuration {
config := &configuration{}
viper.SetDefault("pid.kp", 2.0)
viper.SetDefault("pid.ki", 0.0001)
viper.SetDefault("pid.kd", 2.0)
viper.SetDefault("db.local.dsn", "/var/lib/fermentord/local.db")
viper.SetDefault("http.port", 8000)
viper.SetDefault("db.local.dsn", "./fermentord.db")
viper.AllowEmptyEnv(true)
viper.AutomaticEnv()
viper.AddConfigPath("/etc")
viper.SetConfigName("fermentord")
viper.SetConfigType("hcl")
if err := viper.ReadInConfig(); err != nil {
log.Debug().
Err(err).
Msg("Error loading configuration")
}
if err := viper.Unmarshal(&config); err != nil {
sentry.CaptureException(err)
log.Fatal().
Err(err).
Msg("Error unmarshalling configuration")
}
return config
}
func reloadConfiguration(config *configuration, ctrl *controllers.ChamberController) {
temperature.ConfigUpdates <- []string{
config.Sensor.Ambient,
config.Sensor.Chamber,
config.Sensor.Wort,
}
ctrl.ConfigUpdates <- config.ControllerConfig
}

27
cmd/fermentord/config.hcl Normal file
View file

@ -0,0 +1,27 @@
// fermentord
fermentation_temp = 15.0
delta_temp = 0.5
sensors {
wort = "asdf"
chamber = "sldkfj"
ambient = "skdjf"
}
limits {
min_chamber_temp = 5
min_cooler_runtime_secs = 300
max_cooler_runtime_secs = 86400
min_cooler_cooldown_secs = 300
}
pid {
kp = 2.0
ki = 0.0001
kd = 2.0
}
db "local" {
dsn = "/var/lib/fermentord/local.db"
}

View file

@ -0,0 +1,22 @@
package main
import (
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/internal/dal"
)
func convertChamberState(state controllers.ChamberState) dal.ChamberState {
switch state {
case controllers.ChamberStateIdle:
return dal.ChamberStateIdle
case controllers.ChamberStateCooling:
return dal.ChamberStateCooling
case controllers.ChamberStateHeating:
return dal.ChamberStateHeating
default:
return dal.ChamberStateIdle
}
}

73
cmd/fermentord/db.go Normal file
View file

@ -0,0 +1,73 @@
package main
import (
"context"
"database/sql"
"sync"
"time"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/internal/dal"
"git.joco.dk/sng/fermentord/pkg/temperature"
"github.com/getsentry/sentry-go"
"github.com/rs/zerolog/log"
)
func migrateDB(config configuration) {
migrator, err := dal.NewMigrator(config.DB.Local.DSN)
if err != nil {
sentry.CaptureException(err)
log.Fatal().Err(err).Msg("Error initializing DB migrations")
}
migrator.Up()
serr, dberr := migrator.Close()
if serr != nil {
sentry.CaptureException(serr)
log.Fatal().Err(serr).Msg("DB migration source error")
}
if dberr != nil {
sentry.CaptureException(dberr)
log.Fatal().Err(dberr).Msg("DB migration database error")
}
}
func persistData(ctx context.Context, wg *sync.WaitGroup, db *dal.DAL, chState <-chan controllers.ChamberState, chTemp <-chan temperature.TemperatureReading) {
defer wg.Done()
for {
select {
case state, ok := <-chState:
if !ok {
break
}
ctxTo, cancel := context.WithTimeout(ctx, 10*time.Second)
err := db.InReadWriteTransaction(ctxTo, func(tx *sql.Tx) error {
return dal.SaveChamberState(ctxTo, tx, convertChamberState(state))
})
if err != nil {
sentry.CaptureException(err)
log.Error().Err(err).Msg("Error persisting state change")
}
cancel()
case t, ok := <-chTemp:
if !ok {
break
}
ctxTo, cancel := context.WithTimeout(ctx, 10*time.Second)
err := db.InReadWriteTransaction(ctxTo, func(tx *sql.Tx) error {
return dal.SaveTemperatureReading(ctx, tx, t.Time, t.Sensor, t.MilliDegrees)
})
if err != nil {
sentry.CaptureException(err)
log.Error().Err(err).Msg("Error persisting temperature reading")
}
cancel()
case <-ctx.Done():
return
}
}
}

View file

@ -2,11 +2,9 @@ package main
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"os"
"os/signal"
"sync" "sync"
"syscall"
"time" "time"
"git.joco.dk/sng/fermentord/internal/api" "git.joco.dk/sng/fermentord/internal/api"
@ -14,6 +12,7 @@ import (
"git.joco.dk/sng/fermentord/internal/dal" "git.joco.dk/sng/fermentord/internal/dal"
"git.joco.dk/sng/fermentord/internal/hw" "git.joco.dk/sng/fermentord/internal/hw"
"git.joco.dk/sng/fermentord/internal/metrics" "git.joco.dk/sng/fermentord/internal/metrics"
"git.joco.dk/sng/fermentord/pkg/daemon"
"git.joco.dk/sng/fermentord/pkg/temperature" "git.joco.dk/sng/fermentord/pkg/temperature"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
@ -22,93 +21,70 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
) )
type configuration struct { func mainLoop(ctx context.Context, wg *sync.WaitGroup, db *dal.DAL, config *configuration) {
Db struct {
Dsn string `mapstructure:"dsn"`
} `mapstructure:"db"`
Temperature struct {
WortSensor string `mapstructure:"wort_sensor"`
ChamberSensor string `mapstructure:"chamber_sensor"`
AmbientSensor string `mapstructure:"ambient_sensor"`
} `mapstructure:"temperature"`
Hysteresis controllers.Config `mapstructure:"hysteresis"`
}
var (
config configuration
ctx context.Context
wg *sync.WaitGroup
hysteresis *controllers.Hysteresis
gpio *hw.Gpio
)
func bool2ChamberState(state bool) dal.ChamberState {
if state {
return dal.ChamberStateCooling
}
return dal.ChamberStateIdle
}
func mainLoop(db *dal.DAL, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
temperature.Initialize()
// Controller
chCtrlTemp := make(chan temperature.TemperatureReading, 10)
defer close(chCtrlTemp)
ctrl := controllers.NewChamberController("Chamber 1", config.ControllerConfig, chCtrlTemp)
// Configuration reload
viper.OnConfigChange(func(in fsnotify.Event) {
reloadConfiguration(config, ctrl)
})
viper.WatchConfig()
reloadConfiguration(config, ctrl)
gpio, err := hw.NewGpio()
if err != nil {
sentry.CaptureException(err)
log.Fatal().
Err(err).
Msg("Error initializing GPIO")
}
defer gpio.Close()
chDbTemp := make(chan temperature.TemperatureReading, 10)
chDbState := make(chan controllers.ChamberState, 10)
defer close(chDbTemp)
defer close(chDbState)
wg.Add(3)
go persistData(ctx, wg, db, chDbState, chDbTemp)
go ctrl.Run(ctx, wg)
go temperature.PollSensors(ctx, wg, 1*time.Second)
for { for {
select { select {
case reading := <-temperature.C:
chCtrlTemp <- reading
chDbTemp <- reading
case state := <-ctrl.C:
switch state {
case controllers.ChamberStateIdle:
metrics.State.Set(metrics.MetricStateIdle)
gpio.StopCooler()
gpio.StopHeater()
case controllers.ChamberStateCooling:
metrics.State.Set(metrics.MetricStateCooling)
gpio.StopHeater()
gpio.StartCooler()
case controllers.ChamberStateHeating:
metrics.State.Set(metrics.MetricStateHeating)
gpio.StopCooler()
gpio.StartHeater()
}
chDbState <- state
case <-ctx.Done(): case <-ctx.Done():
return return
case state := <-hysteresis.C:
ctx := context.Background()
if state {
metrics.State.Set(1)
//gpio.StartCooler()
} else {
metrics.State.Set(0)
//gpio.StopCooler()
}
dbCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
tx, err := db.Begin()
if err != nil {
sentry.CaptureException(err)
log.Error().Err(err).Msg("Failed to start DB transaction")
} else {
dal.SaveChamberState(dbCtx, tx, bool2ChamberState(state))
err = tx.Commit()
if err != nil {
sentry.CaptureException(err)
log.Error().Err(err).Msg("Failed to commit DB transaction")
}
}
cancel()
case reading := <-temperature.C:
ctx := context.Background()
switch reading.Sensor {
case config.Temperature.AmbientSensor:
hysteresis.UpdateAmbientTemperature(reading.Degrees())
case config.Temperature.ChamberSensor:
hysteresis.UpdateChamberTemperature(reading.Degrees())
case config.Temperature.WortSensor:
hysteresis.UpdateWortTemperature(reading.Degrees())
}
dbCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
tx, err := db.Begin()
if err != nil {
log.Error().Err(err).Msg("Failed to start DB transaction")
} else {
dal.SaveTemperatureReading(dbCtx, tx, reading.Sensor, reading.MilliDegrees)
err = tx.Commit()
if err != nil {
log.Error().Err(err).Msg("Failed to commit DB transaction")
}
}
cancel()
} }
} }
} }
@ -120,79 +96,32 @@ func main() {
TracesSampleRate: 0.01, TracesSampleRate: 0.01,
}) })
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize Sentry.") log.Fatal().
Err(err).
Msg("Failed to initialize Sentry.")
} }
defer sentry.Flush(10 * time.Second) defer sentry.Flush(10 * time.Second)
// Configuration // Configuration
viper.AllowEmptyEnv(true) config := loadConfiguration()
viper.AutomaticEnv()
viper.SetDefault("db.dsn", "./fermentord.db")
viper.AddConfigPath("/etc")
viper.SetConfigName("fermentord")
viper.SetConfigType("hcl")
viper.ReadInConfig()
viper.Unmarshal(&config)
ctxb := context.Background()
ctx, shutdown := context.WithCancel(ctxb)
wg = &sync.WaitGroup{}
temperature.Initialize(10 * time.Second)
hysteresis = controllers.NewHysteresis("Chamber 1")
reloadConfig := func() {
temperature.SetSensors([]string{
config.Temperature.AmbientSensor,
config.Temperature.ChamberSensor,
config.Temperature.WortSensor,
})
hysteresis.SetConfig(&config.Hysteresis)
}
viper.OnConfigChange(func(in fsnotify.Event) {
reloadConfig()
})
viper.WatchConfig()
reloadConfig()
// Database // Database
migrator, err := dal.NewMigrator(config.Db.Dsn) migrateDB(*config)
if err != nil {
log.Fatal().Err(err).Msg("Error initializing DB migrations")
}
migrator.Up()
serr, dberr := migrator.Close()
if serr != nil {
log.Fatal().Err(serr).Msg("DB migration source error")
}
if dberr != nil {
log.Fatal().Err(dberr).Msg("DB migration database error")
}
var db *dal.DAL var db *dal.DAL
if db, err := dal.NewDAL(config.Db.Dsn); err != nil { if db, err := dal.NewDAL(config.DB.Local.DSN); err != nil {
sentry.CaptureException(err)
log.Fatal().Err(err).Msg("Error connecting to DB") log.Fatal().Err(err).Msg("Error connecting to DB")
} else { } else {
db.Initialize()
defer db.Close() defer db.Close()
} }
// GPIO
log.Info().Msg("Initializing GPIO")
gpio, err := hw.NewGpio()
if err != nil {
log.Fatal().Err(err).Send()
}
defer gpio.Close()
// HTTP // HTTP
mux := http.NewServeMux() mux := http.NewServeMux()
srv := &http.Server{ srv := &http.Server{
Addr: ":8000", Addr: fmt.Sprintf(":%v", config.HTTP.Port),
Handler: mux, Handler: mux,
} }
api := api.NewAPI(db) api := api.NewAPI(db)
// Metrics // Metrics
@ -202,31 +131,32 @@ func main() {
mux.HandleFunc("/state-changes", api.GetStateChanges) mux.HandleFunc("/state-changes", api.GetStateChanges)
// Main // Main
wg.Add(2) ctxb := context.Background()
go temperature.Serve(ctx, wg) ctx, shutdown := context.WithCancel(ctxb)
go mainLoop(db, wg) wg := &sync.WaitGroup{}
wg.Add(1)
go mainLoop(ctx, wg, db, config)
go srv.ListenAndServe() go srv.ListenAndServe()
done := make(chan os.Signal, 1) daemon.WaitForExitSignal()
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-done
go func() {
ctx, cancel := context.WithTimeout(ctxb, 10*time.Second)
defer cancel()
err := srv.Shutdown(ctx)
if err != nil {
log.Error().Err(err).Msg("Error shutting down HTTP server.")
}
}()
// Initiate graceful shutdown. // Initiate graceful shutdown.
wg.Add(1)
go shutdownHTTP(ctxb, wg, srv)
shutdown() shutdown()
// Wait for graceful shutdown.
wg.Wait() wg.Wait()
}
// TODO PID for heating
// TODO Data export func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) {
// TODO SQL migrations https://github.com/golang-migrate/migrate/tree/master/database/sqlite defer wg.Done()
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := srv.Shutdown(ctx2); err != nil {
sentry.CaptureException(err)
log.Error().
Err(err).
Msg("Error shutting down HTTP server.")
}
} }

View file

@ -2,6 +2,7 @@ package api
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -15,7 +16,7 @@ import (
type API struct { type API struct {
db *dal.DAL db *dal.DAL
config *controllers.Config config *controllers.ControllerConfig
configChangeCallback func() configChangeCallback func()
} }
@ -27,15 +28,14 @@ func NewAPI(db *dal.DAL) *API {
func (a *API) HealthCheck(w http.ResponseWriter, r *http.Request) { func (a *API) HealthCheck(w http.ResponseWriter, r *http.Request) {
defer sentry.Recover() defer sentry.Recover()
ctxb := context.Background()
tx, err := a.db.Begin() ctx, cancel := context.WithTimeout(ctxb, 5*time.Second)
if err != nil { defer cancel()
http.Error(w, "Unhealthy", http.StatusServiceUnavailable) err := a.db.InReadOnlyTransaction(ctx, func(tx *sql.Tx) error {
return _, err := tx.Query("SELECT version()")
} return err
})
var version string
err = tx.QueryRow("SELECT version()").Scan(&version)
if err != nil { if err != nil {
http.Error(w, "Unhealthy", http.StatusServiceUnavailable) http.Error(w, "Unhealthy", http.StatusServiceUnavailable)
return return
@ -57,13 +57,7 @@ func (a *API) GetConfig(w http.ResponseWriter, r *http.Request) {
func (a *API) GetTemperatures(w http.ResponseWriter, r *http.Request) { func (a *API) GetTemperatures(w http.ResponseWriter, r *http.Request) {
defer sentry.Recover() defer sentry.Recover()
ctxb := context.Background()
ctx := context.Background()
tx, err := a.db.Begin()
defer tx.Rollback()
if err != nil {
log.Panic().Err(err).Msg("Failed to start DB transaction.")
}
keys, ok := r.URL.Query()["since"] keys, ok := r.URL.Query()["since"]
if !ok || len(keys[0]) < 1 { if !ok || len(keys[0]) < 1 {
@ -77,30 +71,32 @@ func (a *API) GetTemperatures(w http.ResponseWriter, r *http.Request) {
return return
} }
dbCtx, cancel := context.WithTimeout(ctx, 30*time.Second) ctx, cancel := context.WithTimeout(ctxb, 30*time.Second)
defer cancel() defer cancel()
readings, err := dal.LoadTemperatureReadingsSince(dbCtx, tx, since) var data []byte
err = a.db.InReadOnlyTransaction(ctx, func(tx *sql.Tx) error {
readings, err := dal.LoadTemperatureReadingsSince(ctx, tx, since)
if err != nil {
return err
}
data, err = json.MarshalIndent(readings, "", " ")
if err != nil {
return err
}
return nil
})
if err != nil { if err != nil {
log.Panic().Err(err).Msg("Failed to load temperature readings from DB.") log.Panic().Err(err).Msg("Failed to load temperature readings from DB.")
} }
tx.Commit()
data, err := json.MarshalIndent(readings, "", " ")
if err != nil {
log.Panic().Err(err).Msg("Failed to serialize temperature readings to JSON.")
}
fmt.Fprintf(w, string(data)) fmt.Fprintf(w, string(data))
} }
func (a *API) GetStateChanges(w http.ResponseWriter, r *http.Request) { func (a *API) GetStateChanges(w http.ResponseWriter, r *http.Request) {
defer sentry.Recover() defer sentry.Recover()
ctxb := context.Background()
ctx := context.Background()
tx, err := a.db.Begin()
if err != nil {
log.Panic().Err(err).Msg("Failed to start DB transaction.")
}
keys, ok := r.URL.Query()["since"] keys, ok := r.URL.Query()["since"]
if !ok || len(keys[0]) < 1 { if !ok || len(keys[0]) < 1 {
@ -114,15 +110,23 @@ func (a *API) GetStateChanges(w http.ResponseWriter, r *http.Request) {
return return
} }
dbCtx, cancel := context.WithTimeout(ctx, 30*time.Second) ctx, cancel := context.WithTimeout(ctxb, 30*time.Second)
defer cancel() defer cancel()
readings, err := dal.LoadChamberStateChangesSince(dbCtx, tx, since) var data []byte
err = a.db.InReadOnlyTransaction(ctx, func(tx *sql.Tx) error {
readings, err := dal.LoadChamberStateChangesSince(ctx, tx, since)
if err != nil { if err != nil {
log.Panic().Err(err).Msg("Failed to load temperature readings from DB.") return err
} }
tx.Commit()
data, err := json.MarshalIndent(readings, "", " ") data, err = json.MarshalIndent(readings, "", " ")
if err != nil {
return err
}
return nil
})
if err != nil { if err != nil {
log.Panic().Err(err).Msg("Failed to serialize temperature readings to JSON.") log.Panic().Err(err).Msg("Failed to serialize temperature readings to JSON.")
} }

View file

@ -1,9 +1,11 @@
package controllers package controllers
import ( import (
"context"
"sync" "sync"
"time" "time"
"git.joco.dk/sng/fermentord/pkg/temperature"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -16,13 +18,14 @@ const (
) )
type ChamberController struct { type ChamberController struct {
config *Config ConfigUpdates chan ControllerConfig
config ControllerConfig
// Current state. // Current state.
chamberState ChamberState chamberState ChamberState
lastChamberStateChange time.Time lastChamberStateChange time.Time
lastCoolerStateChange time.Time
C chan ChamberState C chan ChamberState
mutex *sync.Mutex
name string name string
@ -31,48 +34,139 @@ type ChamberController struct {
chamberTemperature float64 chamberTemperature float64
wortTemperature float64 wortTemperature float64
chTemp <-chan temperature.TemperatureReading
pid *PIDController pid *PIDController
initialized bool
} }
func NewChamberController(name string, config *Config) *ChamberController { func NewChamberController(name string, config ControllerConfig, chTemp <-chan temperature.TemperatureReading) *ChamberController {
return &ChamberController{ return &ChamberController{
C: make(chan ChamberState), C: make(chan ChamberState),
name: name, name: name,
config: config, config: config,
pid: NewPIDController(config.PID.Kp, config.PID.Ki, config.PID.Kd), pid: NewPIDController(config.PID.Kp, config.PID.Ki, config.PID.Kd),
mutex: &sync.Mutex{}, chTemp: chTemp,
chamberState: ChamberStateIdle,
ConfigUpdates: make(chan ControllerConfig, 1),
} }
} }
func (p *ChamberController) Run() { func (p *ChamberController) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(1 * time.Second)
for { for {
select {
case <-ticker.C:
state := p.computeChamberState()
p.setChamberState(state)
case temp := <-p.chTemp:
p.setTemperature(temp)
case c := <-p.ConfigUpdates:
p.config = c
case <-ctx.Done():
ticker.Stop()
p.setChamberState(ChamberStateIdle)
return
}
}
}
func (p *ChamberController) setTemperature(t temperature.TemperatureReading) {
switch t.Sensor {
case p.config.Sensor.Ambient:
p.ambientTemperature = t.Degrees()
case p.config.Sensor.Chamber:
p.chamberTemperature = t.Degrees()
case p.config.Sensor.Wort:
p.wortTemperature = t.Degrees()
default:
log.Warn().
Str("sensor", t.Sensor).
Int64("temp", t.MilliDegrees).
Msg("Unknown sensor")
}
}
func (p *ChamberController) setChamberState(state ChamberState) {
if state == p.chamberState {
return
}
log.Info().
Int("from", int(p.chamberState)).
Int("to", int(state)).
Msgf("State changed")
if p.chamberState == ChamberStateCooling || state == ChamberStateCooling {
p.lastCoolerStateChange = time.Now()
}
p.chamberState = state
p.lastChamberStateChange = time.Now()
p.C <- state
}
func (p *ChamberController) computeChamberState() ChamberState {
offset := p.pid.Compute(p.wortTemperature, p.config.FermentationTemperature) offset := p.pid.Compute(p.wortTemperature, p.config.FermentationTemperature)
chamberTargetTemp := p.config.FermentationTemperature + offset chamberTargetTemp := p.config.FermentationTemperature + offset
log.Debug().Float64("chamber_target_temp", chamberTargetTemp).Send() log.Debug().
Float64("amb_temp", p.ambientTemperature).
Float64("chamber_temp", p.chamberTemperature).
Float64("wort_temp", p.wortTemperature).
Float64("setpoint_temp", p.config.FermentationTemperature).
Float64("offset", offset).
Float64("chamber_target_temp", chamberTargetTemp).
Float64("kp", p.pid.kp).
Float64("ki", p.pid.ki).
Float64("kd", p.pid.kd).
Send()
// TODO Merge hysteresis algorithm into this one. runtimeSecs := time.Since(p.lastChamberStateChange).Seconds()
if !p.initialized { if p.chamberState == ChamberStateCooling {
// heater off // Ensure compressor min. runtime
// cooler off if runtimeSecs < p.config.Limits.MinCoolerRuntimeSecs {
continue return p.chamberState
} }
if p.wortTemperature < p.config.FermentationTemperature && p.chamberTemperature < chamberTargetTemp { // Limit compressor runtime
// heater on if runtimeSecs > p.config.Limits.MaxCoolerRuntimeSecs {
} else if p.wortTemperature >= p.config.FermentationTemperature || p.chamberTemperature >= chamberTargetTemp { return ChamberStateIdle
// heater off
} else {
// heater off
} }
if p.wortTemperature > p.config.FermentationTemperature && p.chamberTemperature > chamberTargetTemp { // Limit chamber min. temp.
// cooler on if p.chamberTemperature < p.config.Limits.MinChamberTemperature {
} else if p.wortTemperature <= p.config.FermentationTemperature || p.chamberTemperature <= chamberTargetTemp { return ChamberStateIdle
// cooler off
} else {
// cooler off
} }
} }
var next ChamberState
heater := p.wortTemperature < p.config.FermentationTemperature-p.config.DeltaTemperature && p.chamberTemperature < chamberTargetTemp-p.config.DeltaTemperature
cooler := p.wortTemperature > p.config.FermentationTemperature+p.config.DeltaTemperature && p.chamberTemperature > chamberTargetTemp+p.config.DeltaTemperature
if cooler && heater {
// This should not happen!
next = ChamberStateIdle
} else if !cooler && !heater {
next = ChamberStateIdle
} else if cooler {
next = ChamberStateCooling
} else if heater {
next = ChamberStateHeating
}
// Ensure compressor cooldown
if p.chamberState != ChamberStateCooling && next == ChamberStateCooling && time.Since(p.lastCoolerStateChange).Seconds() < p.config.Limits.MinCoolerCooldownSecs {
return ChamberStateIdle
}
return next
} }

View file

@ -1,12 +1,22 @@
package controllers package controllers
type Config struct { type ControllerConfig struct {
Sensor struct {
Wort string `mapstructure:"wort"`
Chamber string `mapstructure:"chamber"`
Ambient string `mapstructure:"ambient"`
} `mapstructure:"sensors"`
FermentationTemperature float64 `mapstructure:"fermentation_temp"` FermentationTemperature float64 `mapstructure:"fermentation_temp"`
MaxWortDelta float64 `mapstructure:"max_wort_delta"` DeltaTemperature float64 `mapstructure:"delta_temp"`
Limits struct {
MinChamberTemperature float64 `mapstructure:"min_chamber_temp"` MinChamberTemperature float64 `mapstructure:"min_chamber_temp"`
MinCoolerRuntimeSecs float64 `mapstructure:"min_cooler_runtime_secs"` // 900 MinCoolerRuntimeSecs float64 `mapstructure:"min_cooler_runtime_secs"` // 900
MaxCoolerRuntimeSecs float64 `mapstructure:"max_cooler_runtime_secs"` // 86400 MaxCoolerRuntimeSecs float64 `mapstructure:"max_cooler_runtime_secs"` // 86400
MinCoolerCooldownSecs float64 `mapstructure:"min_cooler_cooldown_secs"` // 900 MinCoolerCooldownSecs float64 `mapstructure:"min_cooler_cooldown_secs"` // 900
} `mapstructure:"limits"`
PID struct { PID struct {
Kp float64 `mapstructure:"kp"` // 2.0 Kp float64 `mapstructure:"kp"` // 2.0
Ki float64 `mapstructure:"ki"` // 0.0001 Ki float64 `mapstructure:"ki"` // 0.0001

View file

@ -1,226 +0,0 @@
package controllers
import (
"sync"
"time"
"github.com/rs/zerolog/log"
)
// Hysteresis will only run when the ambient temperature is above the
// fermentation temperature. The compressor will run until the wort
// is cooled to FermentationTemperature, the chamber is cooled to
// MinChamberTemperature or the compressor has run for MaxCoolerRuntimeSecs.
// The compressor will run for min. MinCoolerRuntimeSecs before switching off,
// regardless of temperatures. Once turned off, the compressor will not be
// turned on before MinCoolerCooldownSecs has expired, regardless of temperatures.
type Hysteresis struct {
config *Config
// Current state.
coolerState bool
lastCoolerStateChange time.Time
C chan bool
mutex *sync.Mutex
name string
// Current temperature readings.
ambientTemperature float64
chamberTemperature float64
wortTemperature float64
}
func NewHysteresis(name string) *Hysteresis {
return &Hysteresis{
C: make(chan bool),
name: name,
mutex: &sync.Mutex{},
}
}
func (h *Hysteresis) SetConfig(newConfig *Config) {
h.lock()
h.config = newConfig
h.unlock()
h.update()
}
func (h *Hysteresis) UpdateAmbientTemperature(value float64) {
h.lock()
h.ambientTemperature = value
h.unlock()
h.update()
}
func (h *Hysteresis) UpdateChamberTemperature(value float64) {
h.lock()
h.chamberTemperature = value
h.unlock()
h.update()
}
func (h *Hysteresis) UpdateWortTemperature(value float64) {
h.lock()
h.wortTemperature = value
h.unlock()
h.update()
}
func (h *Hysteresis) lock() {
h.mutex.Lock()
}
func (h *Hysteresis) unlock() {
h.mutex.Unlock()
}
func (h *Hysteresis) GetCoolerState() bool {
return h.coolerState
}
func (h *Hysteresis) setState(state bool) {
h.lock()
if state == h.coolerState {
h.unlock()
return
}
log.Debug().Bool("state", state).Msg("Setting state")
h.coolerState = state
h.lastCoolerStateChange = time.Now()
h.unlock()
h.C <- state
}
func (h *Hysteresis) update() {
// Compressor runtime has highest priority.
lastStateChangeAgeSecs := time.Since(h.lastCoolerStateChange).Seconds()
if h.coolerState {
// Keep the cooler running until min. runtime is reached.
if lastStateChangeAgeSecs < h.config.MinCoolerRuntimeSecs {
log.Debug().
Str("name", h.name).
Float64("runtime", lastStateChangeAgeSecs).
Float64("min_runtime", h.config.MinCoolerRuntimeSecs).
Bool("state", h.coolerState).
Msg("Cooler kept running as it's runtime threshold is not yet reached.")
return
}
// Stop the cooler if it's runtime exceeds max.
if lastStateChangeAgeSecs > h.config.MaxCoolerRuntimeSecs {
log.Info().
Str("name", h.name).
Float64("runtime", lastStateChangeAgeSecs).
Float64("max_runtime", h.config.MaxCoolerRuntimeSecs).
Bool("state", false).
Msg("Cooler stopped as it's runtime exceeds max. threshold.")
h.setState(false)
return
}
} else {
// Keep the cooler off until min. cooldown time is reached.
if lastStateChangeAgeSecs < h.config.MinCoolerCooldownSecs {
log.Debug().
Str("name", h.name).
Float64("runtime", lastStateChangeAgeSecs).
Float64("min_cooldown", h.config.MinCoolerCooldownSecs).
Bool("state", h.coolerState).
Msg("Cooler kept off as it's cooldown time threshold is not yet reached.")
return
}
}
// Do not allow the cooler to run when the ambient temperature is below
// the fermentation temperature.
if h.ambientTemperature < h.config.FermentationTemperature {
if h.coolerState {
log.Info().
Str("name", h.name).
Float64("ambient_temp", h.ambientTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Msg("Turn off cooler as ambient temperature is less than fermentation temperature.")
h.setState(false)
} else {
log.Debug().
Str("name", h.name).
Float64("ambient_temp", h.ambientTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Msg("Cooler kept off as ambient temperature is less than fermentation temperature.")
}
return
}
// Do not allow the cooler to run if the chamber temperature is below
// minimum.
if h.chamberTemperature <= h.config.MinChamberTemperature {
if h.coolerState {
log.Info().
Str("name", h.name).
Float64("chamber_temp", h.chamberTemperature).
Float64("min_chamber_temp", h.config.MinChamberTemperature).
Msg("Cooler turned off as chamber temperature dropped below threshold.")
h.setState(false)
} else {
log.Debug().
Str("name", h.name).
Float64("ambient_temp", h.ambientTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Msg("Cooler kept off as chamber temperature is below threshold.")
}
return
}
// Keep the cooler stopped until the wort delta temperature reaches it's threshold.
delta := h.wortTemperature - h.config.FermentationTemperature
if h.coolerState {
// Keep the cooler running until the wort reaches the desired temp.
if delta <= 0 {
// TODO Investigate how much the wort temp. drops below fermentation temp.
// TODO A threshold may be needed.
log.Info().
Str("name", h.name).
Float64("wort_temp", h.wortTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Float64("delta", delta).
Float64("max_wort_delta", h.config.MaxWortDelta).
Msg("Cooler stopped as wort is cooled down to fermentation temp.")
h.setState(false)
return
} else {
log.Debug().
Str("name", h.name).
Float64("wort_temp", h.wortTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Float64("delta", delta).
Float64("max_wort_delta", h.config.MaxWortDelta).
Msg("Cooler kept running as the wort has not yet reached fermentation temp.")
}
} else {
// Start the cooler when delta exceeds threshold.
if delta > h.config.MaxWortDelta {
log.Info().
Str("name", h.name).
Float64("wort_temp", h.wortTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Float64("delta", delta).
Float64("max_wort_delta", h.config.MaxWortDelta).
Msg("Cooler started as wort delta exceeds threshold.")
h.setState(true)
return
} else {
log.Debug().
Str("name", h.name).
Float64("wort_temp", h.wortTemperature).
Float64("fermentation_temp", h.config.FermentationTemperature).
Float64("delta", delta).
Float64("max_wort_delta", h.config.MaxWortDelta).
Msg("Cooler kept off as delta does not yet exceed threshold.")
// TODO If this is reached, then we can lower the delta as min. cooldown is reached.
// TODO This is relevant for PID.
}
}
}

View file

@ -1,52 +0,0 @@
package controllers
import (
"testing"
"time"
)
func discard(h *Hysteresis) {
for {
<-h.C
}
}
func TestHysteresisNormalCooling(t *testing.T) {
c := &Config{
FermentationTemperature: 20,
MaxWortDelta: 0.5,
MinChamberTemperature: 2,
MinCoolerRuntimeSecs: 300,
MaxCoolerRuntimeSecs: 86400,
MinCoolerCooldownSecs: 300,
}
h := NewHysteresis("test")
h.config = c
h.ambientTemperature = 25
h.chamberTemperature = 22
h.wortTemperature = 20
go discard(h)
h.update()
assert := func(expectedState bool) {
actualState := h.GetCoolerState()
if actualState != expectedState {
t.Errorf("Expected cooler state %v, but got %v", expectedState, actualState)
}
}
assert(false)
h.lastCoolerStateChange = h.lastCoolerStateChange.Add(-1 * time.Hour)
h.UpdateWortTemperature(20.5)
assert(false)
h.lastCoolerStateChange = h.lastCoolerStateChange.Add(-1 * time.Hour)
h.UpdateWortTemperature(20.6)
assert(true)
h.lastCoolerStateChange = h.lastCoolerStateChange.Add(-1 * time.Hour)
h.UpdateWortTemperature(20)
assert(false)
}

View file

@ -1,6 +1,7 @@
package dal package dal
import ( import (
"context"
"database/sql" "database/sql"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
@ -22,15 +23,46 @@ func NewDAL(dsn string) (*DAL, error) {
return dal, nil return dal, nil
} }
func (dal *DAL) Close() error { func (p *DAL) Close() error {
return dal.db.Close() return p.db.Close()
}
func (p *DAL) InReadWriteTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error {
tx, err := p.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return err
}
if err = fn(tx); err != nil {
if err2 := tx.Rollback(); err2 != nil {
return err2
}
return err
}
if err = tx.Commit(); err != nil {
return err
} }
func (dal *DAL) Initialize() error {
// TODO
return nil return nil
} }
func (dal *DAL) Begin() (*sql.Tx, error) { func (p *DAL) InReadOnlyTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error {
return dal.db.Begin() tx, err := p.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return err
}
if err = fn(tx); err != nil {
if err2 := tx.Rollback(); err2 != nil {
return err2
}
return err
}
if err = tx.Commit(); err != nil {
return err
}
return nil
} }

View file

@ -1,5 +1,7 @@
package dal package dal
// SQL migrations https://github.com/golang-migrate/migrate/tree/master/database/sqlite
import ( import (
"embed" "embed"
"net/http" "net/http"

View file

@ -56,9 +56,9 @@ func LoadTemperatureReadingsSince(ctx context.Context, tx *sql.Tx, since time.Ti
return readings, nil return readings, nil
} }
func SaveTemperatureReading(ctx context.Context, tx *sql.Tx, sensor string, value int64) error { func SaveTemperatureReading(ctx context.Context, tx *sql.Tx, ts time.Time, sensor string, value int64) error {
q := "INSERT INTO temperature_reading (sensor, value) VALUES (?, ?)" q := "INSERT INTO temperature_reading (c_time, sensor, value) VALUES (?, ?, ?)"
_, err := tx.ExecContext(ctx, q, sensor, value) _, err := tx.ExecContext(ctx, q, ts, sensor, value)
return err return err
} }

View file

@ -6,7 +6,6 @@ import (
const ( const (
pinCoolerPower = 20 pinCoolerPower = 20
pinFanPower = 16
pinHeaterPower = 21 pinHeaterPower = 21
off = 0 off = 0
@ -16,7 +15,6 @@ const (
type Gpio struct { type Gpio struct {
chip *gpiod.Chip chip *gpiod.Chip
coolerPower *gpiod.Line coolerPower *gpiod.Line
fanPower *gpiod.Line
heaterPower *gpiod.Line heaterPower *gpiod.Line
} }
@ -34,11 +32,6 @@ func NewGpio() (*Gpio, error) {
return nil, err return nil, err
} }
p.fanPower, err = p.chip.RequestLine(pinFanPower, gpiod.AsOutput(1))
if err != nil {
return nil, err
}
p.heaterPower, err = p.chip.RequestLine(pinHeaterPower, gpiod.AsOutput(1)) p.heaterPower, err = p.chip.RequestLine(pinHeaterPower, gpiod.AsOutput(1))
if err != nil { if err != nil {
return nil, err return nil, err
@ -49,11 +42,9 @@ func NewGpio() (*Gpio, error) {
func (p *Gpio) Close() { func (p *Gpio) Close() {
p.coolerPower.SetValue(off) p.coolerPower.SetValue(off)
p.fanPower.SetValue(off)
p.heaterPower.SetValue(off) p.heaterPower.SetValue(off)
p.heaterPower.Close() p.heaterPower.Close()
p.fanPower.Close()
p.coolerPower.Close() p.coolerPower.Close()
p.chip.Close() p.chip.Close()
} }

View file

@ -5,11 +5,17 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
) )
const (
MetricStateIdle float64 = iota
MetricStateCooling
MetricStateHeating
)
var ( var (
State = promauto.NewGauge(prometheus.GaugeOpts{ State = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "fermentord", Namespace: "fermentord",
Subsystem: "state", Subsystem: "state",
Name: "state_current", Name: "current",
Help: "The state of the fermentor. 0=idle, 1=cooling, 2=heating", Help: "The state of the fermentor. 0=idle, 1=cooling, 2=heating",
}) })

28
pkg/daemon/wait.go Normal file
View file

@ -0,0 +1,28 @@
package daemon
import (
"os"
"os/signal"
"syscall"
"github.com/rs/zerolog/log"
)
var (
Interrupt = os.Interrupt
Quit os.Signal = syscall.SIGQUIT
Terminate os.Signal = syscall.SIGTERM
)
func WaitForSignal(sig ...os.Signal) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, sig...)
s := <-ch
log.Debug().
Str("signal", s.String()).
Msg("Signal caught")
}
func WaitForExitSignal() {
WaitForSignal(Interrupt, Quit, Terminate)
}

View file

@ -1,70 +1,86 @@
package temperature package temperature
// Kernel documentation: https://www.kernel.org/doc/html/latest/w1/slaves/w1_therm.html
import ( import (
"context" "context"
"errors" "errors"
"io/ioutil" "io/ioutil"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"git.joco.dk/sng/fermentord/internal/metrics" "git.joco.dk/sng/fermentord/internal/metrics"
"github.com/getsentry/sentry-go"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
var ( var (
ErrReadSensor = errors.New("Failed to read sensor temperature") // ErrReadSensor indicates that the sensor could not be read
ErrReadSensor = errors.New("failed to read sensor")
C chan *TemperatureReading
sensors []string
mutex *sync.Mutex
ticker *time.Ticker
) )
func Initialize(readingInterval time.Duration) { var (
C = make(chan *TemperatureReading) // C receives the temperature readings
C chan TemperatureReading
// ConfigUpdates receives the list of sensors to read
ConfigUpdates chan []string
sensors []string
)
func Initialize() {
sensors = make([]string, 0) sensors = make([]string, 0)
mutex = &sync.Mutex{} C = make(chan TemperatureReading, 30)
ticker = time.NewTicker(readingInterval) ConfigUpdates = make(chan []string, 1)
} }
func SetSensors(newSensors []string) { func PollSensors(ctx context.Context, wg *sync.WaitGroup, readingInterval time.Duration) {
mutex.Lock()
sensors = newSensors
mutex.Unlock()
}
func Serve(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
if readingInterval < 750*time.Millisecond {
log.Fatal().
Dur("interval", readingInterval).
Msg("Reading interval must be at least 750 ms.")
}
ticker := time.NewTicker(readingInterval)
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// Trigger a bulk read and wait 750ms for sensors to convert data.
triggerBulkRead()
time.Sleep(750 * time.Millisecond)
readSensors() readSensors()
case c := <-ConfigUpdates:
sensors = c
case <-ctx.Done(): case <-ctx.Done():
ticker.Stop() close(C)
return return
} }
} }
} }
func readSensors() { func readSensors() {
mutex.Lock()
defer mutex.Unlock()
for _, sensor := range sensors { for _, sensor := range sensors {
start := time.Now() start := time.Now()
t, err := read(sensor) t, err := read(sensor)
dur := time.Since(start).Seconds() dur := time.Since(start).Seconds()
if err != nil { if err != nil {
sentry.CaptureException(err)
log.Error(). log.Error().
Err(err). Err(err).
Str("sensor", sensor). Str("sensor", sensor).
Msg("Error reading temperature sensor.") Msg("Error reading temperature sensor.")
continue
} }
metrics.TemperatureSensorReadingDegreesCelcius. metrics.TemperatureSensorReadingDegreesCelcius.
@ -75,7 +91,7 @@ func readSensors() {
WithLabelValues("sensor", sensor). WithLabelValues("sensor", sensor).
Observe(dur) Observe(dur)
C <- &TemperatureReading{ C <- TemperatureReading{
Time: time.Now(), Time: time.Now(),
Sensor: sensor, Sensor: sensor,
MilliDegrees: t, MilliDegrees: t,
@ -83,6 +99,17 @@ func readSensors() {
} }
} }
func triggerBulkRead() error {
f, err := os.OpenFile("/sys/bus/w1/w1_bus_master/therm_bulk_read", os.O_APPEND, os.ModeAppend)
if err != nil {
return err
}
defer f.Close()
_, err = f.WriteString("trigger\n")
return err
}
// read returns the temperature of the specified sensor in millidegrees celcius. // read returns the temperature of the specified sensor in millidegrees celcius.
func read(sensor string) (int64, error) { func read(sensor string) (int64, error) {
path := "/sys/bus/w1/devices/" + sensor + "/w1_slave" path := "/sys/bus/w1/devices/" + sensor + "/w1_slave"