diff --git a/cmd/fermentord/loop.go b/cmd/fermentord/loop.go index 6847bad..72fd58f 100644 --- a/cmd/fermentord/loop.go +++ b/cmd/fermentord/loop.go @@ -51,7 +51,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config viper.WatchConfig() // NATS - ingest := dwingest.NewDWIngest() + ingest := dwingest.NewDWIngest(js) gpio, err := hw.NewGpio() if err != nil { @@ -63,7 +63,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config wg.Add(5) go display.Run(ctx, wg) go ctrl.Run(ctx, wg) - go ingest.Run(ctx, wg, js, config) + go ingest.Run(ctx, wg, config) go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight) go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second) @@ -89,7 +89,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config Time: time.Now().UTC(), Event: "ONEWIRE_RESET", } - if err := ingest.Publish(config.NATS.Subject.Event, js, e); err != nil { + if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil { hub.CaptureException(err) log.Print(err) } @@ -124,7 +124,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config Time: time.Now().UTC(), Event: evs, } - if err := ingest.Publish(config.NATS.Subject.Event, js, e); err != nil { + if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil { hub.CaptureException(err) log.Print(err) } diff --git a/internal/dwingest/nats.go b/internal/dwingest/nats.go index 2805f8d..95fe392 100644 --- a/internal/dwingest/nats.go +++ b/internal/dwingest/nats.go @@ -21,15 +21,17 @@ type DWIngest struct { chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState + js nats.JetStream hub *sentry.Hub serial int64 } -func NewDWIngest() *DWIngest { +func NewDWIngest(js nats.JetStream) *DWIngest { return &DWIngest{ chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chState: make(chan controllers.ChamberState, 100), hub: sentry.CurrentHub().Clone(), + js: js, } } @@ -57,23 +59,23 @@ func (p *DWIngest) AddState(state controllers.ChamberState) { } } -func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) { +func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, config *configuration.Configuration) { defer wg.Done() defer p.hub.Flush(10 * time.Second) for { select { case reading := <-p.chTemperatureReading: - p.Publish(config.NATS.Subject.Temp, js, reading) + p.Publish(config.NATS.Subject.Temp, reading) case state := <-p.chState: - p.Publish(config.NATS.Subject.State, js, State{ + p.Publish(config.NATS.Subject.State, State{ Time: time.Now().UTC(), State: controllers.ChamberStateMap[state], }) case t := <-tilt.C: - p.Publish(config.NATS.Subject.Tilt, js, Tilt{ + p.Publish(config.NATS.Subject.Tilt, Tilt{ Time: time.Now().UTC(), Color: string(t.Color()), Gravity: t.Gravity(), @@ -86,7 +88,7 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea } } -func (p *DWIngest) Publish(subject string, js nats.JetStream, reading any) error { +func (p *DWIngest) Publish(subject string, reading any) error { b, err := json.Marshal(reading) if err != nil { p.hub.CaptureException(err) @@ -103,7 +105,7 @@ func (p *DWIngest) Publish(subject string, js nats.JetStream, reading any) error msg := nats.NewMsg(subject) msg.Header.Add("Nats-Msg-Id", msgID.String()) msg.Data = b - _, err = js.PublishMsg(msg, nats.AckWait(5*time.Second)) + _, err = p.js.PublishMsg(msg, nats.AckWait(5*time.Second)) if err != nil { p.hub.CaptureException(err) log.Print(err)