diff --git a/cmd/fermentord/gpio.go b/cmd/fermentord/gpio.go new file mode 100644 index 0000000..94e7b10 --- /dev/null +++ b/cmd/fermentord/gpio.go @@ -0,0 +1,31 @@ +package main + +import ( + "log" + + "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/internal/hw" + "git.joco.dk/sng/fermentord/internal/metrics" +) + +func gpioSetState(state controllers.ChamberState, gpio *hw.Gpio) { + switch state { + case controllers.ChamberStateIdle: + log.Printf("Setting chamber state idle") + metrics.State.Set(metrics.MetricStateIdle) + gpio.StopCooler() + gpio.StopHeater() + + case controllers.ChamberStateCooling: + log.Printf("Setting chamber state cooling") + metrics.State.Set(metrics.MetricStateCooling) + gpio.StopHeater() + gpio.StartCooler() + + case controllers.ChamberStateHeating: + log.Printf("Setting chamber state heating") + metrics.State.Set(metrics.MetricStateHeating) + gpio.StopCooler() + gpio.StartHeater() + } +} diff --git a/cmd/fermentord/http.go b/cmd/fermentord/http.go new file mode 100644 index 0000000..aceab2a --- /dev/null +++ b/cmd/fermentord/http.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + "log" + "net/http" + "sync" + "time" + + "github.com/getsentry/sentry-go" +) + +func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) { + hub := sentry.CurrentHub().Clone() + defer hub.Flush(10 * time.Second) + defer wg.Done() + + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := srv.Shutdown(ctx2); err != nil { + hub.CaptureException(err) + log.Printf("Error shutting down HTTP server: %v", err) + } +} diff --git a/cmd/fermentord/loop.go b/cmd/fermentord/loop.go new file mode 100644 index 0000000..63f91ea --- /dev/null +++ b/cmd/fermentord/loop.go @@ -0,0 +1,119 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "sync" + "time" + + "git.joco.dk/sng/fermentord/internal/configuration" + "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/internal/dwingest" + "git.joco.dk/sng/fermentord/internal/hw" + "git.joco.dk/sng/fermentord/internal/lcd" + "git.joco.dk/sng/fermentord/pkg/temperature" + "git.joco.dk/sng/fermentord/pkg/tilt" + "github.com/fsnotify/fsnotify" + "github.com/getsentry/sentry-go" + "github.com/nats-io/nats.go" + "github.com/spf13/viper" +) + +func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) { + hub := sentry.CurrentHub().Clone() + defer hub.Flush(10 * time.Second) + defer wg.Done() + + // Display + display, err := lcd.NewLCD() + if err != nil { + hub.CaptureException(err) + log.Fatal(err) + } + defer display.Close() + + // Controller + ctrl := controllers.NewChamberController("Chamber 1", *config) + + // NATS + ingest := dwingest.NewDWIngest() + + // Configuration reload + loadConfiguration := func() { + temperature.ConfigUpdate <- *config + ctrl.ConfigUpdates <- *config + display.SetSetpointTemp(config.FermentationTemperature) + } + + viper.OnConfigChange(func(in fsnotify.Event) { + loadConfiguration() + }) + loadConfiguration() + viper.WatchConfig() + + gpio, err := hw.NewGpio() + if err != nil { + hub.CaptureException(err) + log.Fatal(err) + } + defer gpio.Close() + + wg.Add(5) + go display.Run(ctx, wg) + go ctrl.Run(ctx, wg) + go ingest.Run(ctx, wg, js, config) + go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight) + go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second) + + for { + select { + case reading := <-temperature.Reading: + ctrl.SetTemperature(reading) + ingest.AddReading(reading) + display.SetTemperature(reading) + + case ev := <-gpio.C: + var evs string + + switch ev { + case hw.DoorClosed: + gpio.LightsOff() + gpio.StartFan() + ctrl.Resume() + evs = "DOOR_CLOSED" + + case hw.DoorOpened: + ctrl.Pause() + gpio.LightsOn() + gpio.StopFan() + evs = "DOOR_OPENED" + } + + b, err := json.Marshal(map[string]interface{}{ + "time": time.Now().UTC(), + "event": evs, + }) + if err != nil { + hub.CaptureException(err) + log.Printf("Error marshaling JSON: %v", err) + break + } + + // Publish to NATS + _, err = js.Publish(config.NATS.Subject.Event, b) + if err != nil { + hub.CaptureException(err) + log.Printf("Error publishing to NATS: %v", err) + } + + case state := <-ctrl.C: + gpioSetState(state, gpio) + ingest.AddState(state) + display.SetState(state) + + case <-ctx.Done(): + return + } + } +} diff --git a/cmd/fermentord/main.go b/cmd/fermentord/main.go index 431ef8b..608b55b 100644 --- a/cmd/fermentord/main.go +++ b/cmd/fermentord/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "log" "net/http" @@ -11,141 +10,12 @@ import ( "git.joco.dk/sng/fermentord/internal/api" "git.joco.dk/sng/fermentord/internal/configuration" - "git.joco.dk/sng/fermentord/internal/controllers" - "git.joco.dk/sng/fermentord/internal/dwingest" - "git.joco.dk/sng/fermentord/internal/hw" - "git.joco.dk/sng/fermentord/internal/lcd" - "git.joco.dk/sng/fermentord/internal/metrics" "git.joco.dk/sng/fermentord/pkg/daemon" - "git.joco.dk/sng/fermentord/pkg/temperature" - "git.joco.dk/sng/fermentord/pkg/tilt" - "github.com/fsnotify/fsnotify" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/viper" ) -func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) { - hub := sentry.CurrentHub().Clone() - defer hub.Flush(10 * time.Second) - defer wg.Done() - - // Display - display, err := lcd.NewLCD() - if err != nil { - hub.CaptureException(err) - log.Fatal(err) - } - defer display.Close() - - // Controller - ctrl := controllers.NewChamberController("Chamber 1", *config) - - // NATS - ingest := dwingest.NewDWIngest() - - // Configuration reload - loadConfiguration := func() { - temperature.ConfigUpdate <- *config - ctrl.ConfigUpdates <- *config - display.SetSetpointTemp(config.FermentationTemperature) - } - - viper.OnConfigChange(func(in fsnotify.Event) { - loadConfiguration() - }) - loadConfiguration() - viper.WatchConfig() - - gpio, err := hw.NewGpio() - if err != nil { - hub.CaptureException(err) - log.Fatal(err) - } - defer gpio.Close() - - wg.Add(5) - go display.Run(ctx, wg) - go ctrl.Run(ctx, wg) - go ingest.Run(ctx, wg, js, config) - go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight) - go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second) - - for { - select { - case reading := <-temperature.Reading: - ctrl.SetTemperature(reading) - ingest.AddReading(reading) - display.SetTemperature(reading) - - case ev := <-gpio.C: - var evs string - - switch ev { - case hw.DoorClosed: - gpio.LightsOff() - gpio.StartFan() - ctrl.Resume() - evs = "DOOR_CLOSED" - - case hw.DoorOpened: - ctrl.Pause() - gpio.LightsOn() - gpio.StopFan() - evs = "DOOR_OPENED" - } - - b, err := json.Marshal(map[string]interface{}{ - "time": time.Now().UTC(), - "event": evs, - }) - if err != nil { - hub.CaptureException(err) - log.Printf("Error marshaling JSON: %v", err) - break - } - - // Publish to NATS - _, err = js.Publish(config.NATS.Subject.Event, b) - if err != nil { - hub.CaptureException(err) - log.Printf("Error publishing to NATS: %v", err) - } - - case state := <-ctrl.C: - gpioSetState(state, gpio) - ingest.AddState(state) - display.SetState(state) - - case <-ctx.Done(): - return - } - } -} - -func gpioSetState(state controllers.ChamberState, gpio *hw.Gpio) { - switch state { - case controllers.ChamberStateIdle: - log.Printf("Setting chamber state idle") - metrics.State.Set(metrics.MetricStateIdle) - gpio.StopCooler() - gpio.StopHeater() - - case controllers.ChamberStateCooling: - log.Printf("Setting chamber state cooling") - metrics.State.Set(metrics.MetricStateCooling) - gpio.StopHeater() - gpio.StartCooler() - - case controllers.ChamberStateHeating: - log.Printf("Setting chamber state heating") - metrics.State.Set(metrics.MetricStateHeating) - gpio.StopCooler() - gpio.StartHeater() - } -} - func main() { // Sentry err := sentry.Init(sentry.ClientOptions{ @@ -166,7 +36,7 @@ func main() { log.Fatal(err) } - js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) + js, err := nc.JetStream(nats.PublishAsyncMaxPending(4096)) if err != nil { log.Fatal(err) } @@ -216,16 +86,3 @@ func main() { nc.Close() log.Print("Shutdown complete") } - -func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) { - hub := sentry.CurrentHub().Clone() - defer hub.Flush(10 * time.Second) - defer wg.Done() - - ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := srv.Shutdown(ctx2); err != nil { - hub.CaptureException(err) - log.Printf("Error shutting down HTTP server: %v", err) - } -} diff --git a/internal/configuration/config.go b/internal/configuration/config.go index 77a1a8d..a419fa8 100644 --- a/internal/configuration/config.go +++ b/internal/configuration/config.go @@ -31,9 +31,10 @@ type Configuration struct { Weight float64 `mapstructure:"weight"` } `mapstructure:"sensors"` - FermentationTemperature float64 `mapstructure:"fermentation_temp"` - DeltaTemperatureCool float64 `mapstructure:"delta_temp_cool"` - DeltaTemperatureHeat float64 `mapstructure:"delta_temp_heat"` + FermentationTemperature float64 `mapstructure:"fermentation_temp"` + DeltaTemperatureCool float64 `mapstructure:"delta_temp_cool"` + DeltaTemperatureHeat float64 `mapstructure:"delta_temp_heat"` + HeaterDutyCycle time.Duration `mapstructure:"heater_duty_cycle"` Limits struct { MinChamberTemperature float64 `mapstructure:"min_chamber_temp"` @@ -55,6 +56,7 @@ func LoadConfiguration() *Configuration { hub := sentry.CurrentHub().Clone() defer hub.Flush(10 * time.Second) + viper.SetDefault("heater_duty_cycle", 1*time.Second) viper.SetDefault("http.port", 8000) viper.SetDefault("nats.stream", "DWJONDAHL") viper.SetDefault("nats.subject.event", "DWJONDAHL.ingest.fermentor.event")