diff --git a/cmd/fermentord/main.go b/cmd/fermentord/main.go index f73e475..ceb0642 100644 --- a/cmd/fermentord/main.go +++ b/cmd/fermentord/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "log" "net/http" @@ -12,6 +11,7 @@ import ( "git.joco.dk/sng/fermentord/internal/api" "git.joco.dk/sng/fermentord/internal/configuration" "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/internal/dwingest" "git.joco.dk/sng/fermentord/internal/hw" "git.joco.dk/sng/fermentord/internal/metrics" "git.joco.dk/sng/fermentord/pkg/daemon" @@ -33,6 +33,9 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config defer close(chCtrlTemp) ctrl := controllers.NewChamberController("Chamber 1", *config, chCtrlTemp) + // NATS + ingest := dwingest.NewDWIngest() + // Configuration reload viper.OnConfigChange(func(in fsnotify.Event) { controllers.ReloadConfiguration(config, ctrl) @@ -47,28 +50,16 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config } defer gpio.Close() - wg.Add(2) + wg.Add(3) go ctrl.Run(ctx, wg) + go ingest.Run(ctx, wg, js, config) go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight) for { select { case reading := <-temperature.C: chCtrlTemp <- reading - - b, err := json.Marshal(reading) - if err != nil { - hub.CaptureException(err) - log.Printf("Error marshaling JSON: %v", err) - break - } - - // Publish to NATS - _, err = js.Publish(config.NATS.Subject.Temp, b) - if err != nil { - hub.CaptureException(err) - log.Printf("Error publishing to NATS: %v", err) - } + ingest.AddReading(reading) case state := <-ctrl.C: switch state { @@ -91,12 +82,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config gpio.StartHeater() } - // Publish to NATS - _, err = js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state])) - if err != nil { - hub.CaptureException(err) - log.Printf("Error publishing to NATS: %v", err) - } + ingest.AddState(state) case <-ctx.Done(): return diff --git a/internal/dwingest/nats.go b/internal/dwingest/nats.go new file mode 100644 index 0000000..f16b5bc --- /dev/null +++ b/internal/dwingest/nats.go @@ -0,0 +1,73 @@ +package dwingest + +import ( + "context" + "encoding/json" + "log" + "sync" + "time" + + "git.joco.dk/sng/fermentord/internal/configuration" + "git.joco.dk/sng/fermentord/internal/controllers" + "git.joco.dk/sng/fermentord/pkg/temperature" + "github.com/getsentry/sentry-go" + "github.com/nats-io/nats.go" +) + +type DWIngest struct { + chTemperatureReading chan temperature.TemperatureReading + chState chan controllers.ChamberState +} + +func NewDWIngest() *DWIngest { + return &DWIngest{ + chTemperatureReading: make(chan temperature.TemperatureReading, 3600), + chState: make(chan controllers.ChamberState, 100), + } +} + +func (p *DWIngest) AddReading(reading temperature.TemperatureReading) { + p.chTemperatureReading <- reading +} + +func (p *DWIngest) AddState(state controllers.ChamberState) { + p.chState <- state +} + +func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.ControllerConfig) { + hub := sentry.CurrentHub().Clone() + defer hub.Flush(10 * time.Second) + defer wg.Done() + + for { + select { + case reading := <-p.chTemperatureReading: + b, err := json.Marshal(reading) + if err != nil { + hub.CaptureException(err) + log.Printf("Error marshaling JSON: %v", err) + break + } + + _, err = js.Publish(config.NATS.Subject.Temp, b) + if err != nil { + hub.CaptureException(err) + log.Printf("Error publishing to NATS: %v", err) + } + + case state := <-p.chState: + _, err := js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state])) + if err != nil { + hub.CaptureException(err) + log.Printf("Error publishing to NATS: %v", err) + } + + case <-ctx.Done(): + close(p.chTemperatureReading) + close(p.chState) + + return + } + + } +}