fermentord/cmd/fermentord/main.go

198 lines
4.6 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"git.joco.dk/sng/fermentord/internal/api"
"git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/internal/hw"
"git.joco.dk/sng/fermentord/internal/metrics"
"git.joco.dk/sng/fermentord/pkg/daemon"
"git.joco.dk/sng/fermentord/pkg/temperature"
"github.com/fsnotify/fsnotify"
"github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper"
)
// mainLoop wires the temperature sensors, the chamber controller, the GPIO
// outputs, the persistence worker and NATS together, and runs until ctx is
// cancelled.
//
// It starts three goroutines (persistData, ctrl.Run and
// temperature.PollSensors), each handed wg, and then loops:
//   - every reading received on temperature.C is forwarded to the controller
//     and to the persistence worker, and published as JSON on
//     config.NATS.Subject.Temp;
//   - every state received on ctrl.C is applied to the GPIO outputs and the
//     state metric, forwarded to the persistence worker, and published on
//     config.NATS.Subject.State.
//
// mainLoop calls wg.Done on return, so the caller must wg.Add(1) before
// starting it. If the GPIO cannot be opened the process is terminated through
// log.Fatal.
func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.ControllerConfig) {
	hub := sentry.CurrentHub().Clone()
	defer hub.Flush(10 * time.Second)
	defer wg.Done()

	// Controller
	chCtrlTemp := make(chan temperature.TemperatureReading, 10)
	defer close(chCtrlTemp)

	ctrl := controllers.NewChamberController("Chamber 1", *config, chCtrlTemp)

	// Configuration reload
	// NOTE(review): the OnConfigChange callback presumably runs on viper's
	// watcher goroutine, concurrently with this loop reading config — confirm
	// that ReloadConfiguration synchronises access.
	viper.OnConfigChange(func(in fsnotify.Event) {
		controllers.ReloadConfiguration(config, ctrl)
	})
	viper.WatchConfig()
	controllers.ReloadConfiguration(config, ctrl)

	gpio, err := hw.NewGpio()
	if err != nil {
		hub.CaptureException(err)
		// log.Fatal exits through os.Exit, which skips deferred calls; flush
		// here so the captured exception is actually delivered.
		hub.Flush(10 * time.Second)
		log.Fatal(err)
	}
	defer gpio.Close()

	chDbTemp := make(chan temperature.TemperatureReading, 10)
	chDbState := make(chan controllers.ChamberState, 10)
	defer close(chDbTemp)
	defer close(chDbState)

	wg.Add(3)
	go persistData(ctx, wg, chDbState, chDbTemp, config)
	go ctrl.Run(ctx, wg)
	go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight)

	for {
		select {
		case reading := <-temperature.C:
			// Forward the reading to both consumers. Each send also watches
			// ctx so that a consumer which has stopped draining its channel
			// cannot block shutdown forever.
			select {
			case chCtrlTemp <- reading:
			case <-ctx.Done():
				return
			}
			select {
			case chDbTemp <- reading:
			case <-ctx.Done():
				return
			}

			b, err := json.Marshal(reading)
			if err != nil {
				hub.CaptureException(err)
				log.Printf("Error marshaling JSON: %v", err)
				// Nothing to publish for this reading; wait for the next event.
				continue
			}

			// Publish to NATS
			if _, err := js.Publish(config.NATS.Subject.Temp, b); err != nil {
				hub.CaptureException(err)
				log.Printf("Error publishing to NATS: %v", err)
			}

		case state := <-ctrl.C:
			// In every state the opposing actuator is stopped before the
			// requested one is started.
			switch state {
			case controllers.ChamberStateIdle:
				log.Printf("Setting chamber state idle")
				metrics.State.Set(metrics.MetricStateIdle)
				gpio.StopCooler()
				gpio.StopHeater()

			case controllers.ChamberStateCooling:
				log.Printf("Setting chamber state cooling")
				metrics.State.Set(metrics.MetricStateCooling)
				gpio.StopHeater()
				gpio.StartCooler()

			case controllers.ChamberStateHeating:
				log.Printf("Setting chamber state heating")
				metrics.State.Set(metrics.MetricStateHeating)
				gpio.StopCooler()
				gpio.StartHeater()
			}

			// Same cancellation-aware send as for the readings above.
			select {
			case chDbState <- state:
			case <-ctx.Done():
				return
			}

			// Publish to NATS
			if _, err := js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state])); err != nil {
				hub.CaptureException(err)
				log.Printf("Error publishing to NATS: %v", err)
			}

		case <-ctx.Done():
			return
		}
	}
}
// main is the daemon entry point. It initialises Sentry, loads the
// configuration, connects to NATS and adds the JetStream stream, registers the
// /metrics and /health HTTP endpoints, and then runs mainLoop and the HTTP
// server in the background until daemon.WaitForSignalsDefault returns.
// After that it shuts the HTTP server down, cancels mainLoop's context, waits
// for the goroutines tracked by the wait group and closes the NATS connection.
//
// Any failure while setting up Sentry or NATS terminates the process through
// log.Fatal.
func main() {
	// Sentry
	err := sentry.Init(sentry.ClientOptions{
		Dsn:              "https://7278625538334140991ce433e0ad292f@sentry.joco.dk/24",
		TracesSampleRate: 0.1,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sentry.Flush(10 * time.Second)

	// Configuration
	config := configuration.LoadConfiguration()

	// NATS
	nc, err := nats.Connect(config.NATS.URL)
	if err != nil {
		log.Fatal(err)
	}

	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	_, err = js.AddStream(&nats.StreamConfig{
		Name: config.NATS.Stream,
		Subjects: []string{
			fmt.Sprintf("%v.*", config.NATS.Stream),
		},
		Replicas:  2,
		Retention: nats.WorkQueuePolicy,
	})
	if err != nil {
		log.Fatal(err)
	}

	// HTTP
	mux := http.NewServeMux()
	srv := &http.Server{
		Addr:    fmt.Sprintf(":%v", config.HTTP.Port),
		Handler: mux,
		// Bound the time a client may take to send its request headers so
		// stalled connections cannot be held open indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Named apiHandler so the imported api package is not shadowed.
	apiHandler := api.NewAPI(nc)

	// Metrics
	mux.Handle("/metrics", promhttp.Handler())
	mux.HandleFunc("/health", apiHandler.HealthCheck)

	// Main
	ctxb := context.Background()
	ctx, shutdown := context.WithCancel(ctxb)
	wg := &sync.WaitGroup{}

	wg.Add(1)
	go mainLoop(ctx, wg, js, config)

	go func() {
		hub := sentry.CurrentHub().Clone()
		// ListenAndServe always returns a non-nil error; ErrServerClosed is
		// the normal result of srv.Shutdown. Anything else (e.g. the port is
		// already in use) is reported but deliberately not fatal, so the
		// chamber keeps being controlled even without the HTTP endpoints.
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			hub.CaptureException(err)
			log.Printf("HTTP server error: %v", err)
		}
	}()

	daemon.WaitForSignalsDefault()
	log.Print("Shutting down")

	// Initiate graceful shutdown. The HTTP shutdown gets the uncancelled base
	// context so that it can apply its own timeout.
	wg.Add(1)
	go shutdownHTTP(ctxb, wg, srv)
	shutdown()

	wg.Wait()
	nc.Close()

	log.Print("Shutdown complete")
}
// shutdownHTTP asks srv to shut down, allowing at most ten seconds on top of
// whatever deadline ctx already carries. A shutdown error is reported to
// Sentry and logged; it is not returned. wg.Done is called when the function
// returns, so the caller must have registered it with wg.Add(1).
func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) {
	sentryHub := sentry.CurrentHub().Clone()
	defer sentryHub.Flush(10 * time.Second)
	defer wg.Done()

	deadlineCtx, release := context.WithTimeout(ctx, 10*time.Second)
	defer release()

	err := srv.Shutdown(deadlineCtx)
	if err == nil {
		return
	}

	sentryHub.CaptureException(err)
	log.Printf("Error shutting down HTTP server: %v", err)
}