fermentord/cmd/fermentord/main.go

196 lines
4.4 KiB
Go

package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
"git.joco.dk/sng/fermentord/internal/api"
"git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/internal/dwingest"
"git.joco.dk/sng/fermentord/internal/hw"
"git.joco.dk/sng/fermentord/internal/lcd"
"git.joco.dk/sng/fermentord/internal/metrics"
"git.joco.dk/sng/fermentord/pkg/daemon"
"git.joco.dk/sng/fermentord/pkg/temperature"
"git.joco.dk/sng/fermentord/pkg/tilt"
"github.com/fsnotify/fsnotify"
"github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper"
)
// mainLoop wires the LCD, the chamber controller, the NATS ingest and the
// sensor pollers together, then dispatches temperature readings and
// controller state changes until ctx is cancelled.
//
// The caller must already have called wg.Add(1) for mainLoop itself; mainLoop
// adds five more entries for the worker goroutines it starts, each of which is
// handed wg. A failure to initialise the LCD or the GPIO is reported to Sentry
// and terminates the process.
func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) {
	hub := sentry.CurrentHub().Clone()
	defer hub.Flush(10 * time.Second)
	defer wg.Done()

	// fatal reports err to Sentry and exits the process. log.Fatal ends in
	// os.Exit, which does not run deferred calls, so the deferred hub.Flush
	// above never fires on this path; flush explicitly or the captured event
	// may be dropped before it is sent.
	fatal := func(err error) {
		hub.CaptureException(err)
		hub.Flush(10 * time.Second)
		log.Fatal(err)
	}

	// Display
	display, err := lcd.NewLCD()
	if err != nil {
		fatal(err)
	}
	defer display.Close()

	// Controller
	ctrl := controllers.NewChamberController("Chamber 1", *config)

	// NATS
	ingest := dwingest.NewDWIngest()

	// Configuration reload: push the configuration to the temperature poller
	// and the controller, and show the setpoint on the display.
	// NOTE(review): the callback re-sends the same *config value; whether the
	// pointed-to struct is refreshed when the file changes is not visible
	// here — confirm.
	loadConfiguration := func() {
		temperature.ConfigUpdate <- *config
		ctrl.ConfigUpdates <- *config
		display.SetSetpointTemp(config.FermentationTemperature)
	}

	viper.OnConfigChange(func(in fsnotify.Event) {
		loadConfiguration()
	})

	// NOTE(review): this first push happens before the worker goroutines
	// below are started; if these channels are unbuffered and only drained by
	// those workers, the send would block here — confirm.
	loadConfiguration()
	viper.WatchConfig()

	gpio, err := hw.NewGpio()
	if err != nil {
		fatal(err)
	}
	defer gpio.Close()

	// One WaitGroup entry per worker started below.
	wg.Add(5)
	go display.Run(ctx, wg)
	go ctrl.Run(ctx, wg)
	go ingest.Run(ctx, wg, js, config)
	go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight)
	go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second)

	for {
		select {
		case reading := <-temperature.Reading:
			// Fan each temperature reading out to controller, ingest and display.
			ctrl.SetTemperature(reading)
			ingest.AddReading(reading)
			display.SetTemperature(reading)

		case state := <-ctrl.C:
			// Apply the controller's new state to the hardware, then record
			// and show it.
			gpioSetState(state, gpio)
			ingest.AddState(state)
			display.SetState(state)

		case <-ctx.Done():
			return
		}
	}
}
// gpioSetState applies a chamber state to the hardware. For each known state
// it logs the transition, updates the state metric and switches the cooler
// and heater outputs, always stopping the opposing device before starting the
// requested one. Any state other than idle, cooling or heating is ignored.
func gpioSetState(state controllers.ChamberState, gpio *hw.Gpio) {
	if state == controllers.ChamberStateIdle {
		log.Print("Setting chamber state idle")
		metrics.State.Set(metrics.MetricStateIdle)
		gpio.StopCooler()
		gpio.StopHeater()
		return
	}

	if state == controllers.ChamberStateCooling {
		log.Print("Setting chamber state cooling")
		metrics.State.Set(metrics.MetricStateCooling)
		gpio.StopHeater()
		gpio.StartCooler()
		return
	}

	if state == controllers.ChamberStateHeating {
		log.Print("Setting chamber state heating")
		metrics.State.Set(metrics.MetricStateHeating)
		gpio.StopCooler()
		gpio.StartHeater()
	}
}
// main is the daemon entry point. It initialises Sentry, loads the
// configuration, connects to NATS and adds the JetStream stream, starts the
// HTTP server (Prometheus metrics and health check) and the main loop, then
// waits in daemon.WaitForSignalsDefault before shutting the HTTP server and
// the main loop down and closing the NATS connection.
func main() {
	// Sentry
	err := sentry.Init(sentry.ClientOptions{
		Dsn:              "https://7278625538334140991ce433e0ad292f@sentry.joco.dk/24",
		TracesSampleRate: 0.1,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sentry.Flush(10 * time.Second)

	// Configuration
	config := configuration.LoadConfiguration()

	// NATS
	nc, err := nats.Connect(config.NATS.URL)
	if err != nil {
		log.Fatal(err)
	}

	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	// The stream captures every subject one level below the stream name.
	_, err = js.AddStream(&nats.StreamConfig{
		Name: config.NATS.Stream,
		Subjects: []string{
			fmt.Sprintf("%v.*", config.NATS.Stream),
		},
		Replicas:  2,
		Retention: nats.WorkQueuePolicy,
	})
	if err != nil {
		log.Fatal(err)
	}

	// HTTP
	mux := http.NewServeMux()
	srv := &http.Server{
		Addr:    fmt.Sprintf(":%v", config.HTTP.Port),
		Handler: mux,
	}

	// Named healthAPI so the local does not shadow the imported api package.
	healthAPI := api.NewAPI(nc)

	// Metrics
	mux.Handle("/metrics", promhttp.Handler())
	mux.HandleFunc("/health", healthAPI.HealthCheck)

	// Main
	ctxb := context.Background()
	ctx, shutdown := context.WithCancel(ctxb)
	wg := &sync.WaitGroup{}

	wg.Add(1)
	go mainLoop(ctx, wg, js, config)

	go func() {
		// ListenAndServe always returns a non-nil error. ErrServerClosed is
		// the expected outcome of the graceful shutdown below; anything else
		// (for example a failure to bind the port) means metrics and health
		// checks are unavailable, so surface it instead of discarding it.
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			sentry.CurrentHub().Clone().CaptureException(err)
			log.Printf("HTTP server stopped: %v", err)
		}
	}()

	daemon.WaitForSignalsDefault()
	log.Print("Shutting down")

	// Initiate graceful shutdown. The HTTP shutdown gets the uncancelled
	// background context so that its own timeout governs how long it may take.
	wg.Add(1)
	go shutdownHTTP(ctxb, wg, srv)
	shutdown()

	wg.Wait()
	nc.Close()
	log.Print("Shutdown complete")
}
// shutdownHTTP shuts srv down, allowing at most ten seconds (or less if ctx
// ends sooner) for the shutdown to complete. A shutdown error is reported to
// Sentry and logged. wg.Done is called when the function returns, so the
// caller must have added one entry to wg beforehand.
func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) {
	sentryHub := sentry.CurrentHub().Clone()
	defer sentryHub.Flush(10 * time.Second)
	defer wg.Done()

	// Upper bound on how long the server shutdown may take.
	const grace = 10 * time.Second

	deadlineCtx, release := context.WithTimeout(ctx, grace)
	defer release()

	err := srv.Shutdown(deadlineCtx)
	if err == nil {
		return
	}

	sentryHub.CaptureException(err)
	log.Printf("Error shutting down HTTP server: %v", err)
}