From deb1ef3578ab0dfe27ba415f5f93aeb101d04068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Rasmussen?= Date: Tue, 2 Aug 2022 20:39:31 +0200 Subject: [PATCH] Refactor dwingest --- cmd/fermentord/loop.go | 27 ++------------- internal/dwingest/nats.go | 70 +++++++++++++++++++++++++++++++-------- pkg/tilt/main.go | 1 + 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/cmd/fermentord/loop.go b/cmd/fermentord/loop.go index b179cd1..8924577 100644 --- a/cmd/fermentord/loop.go +++ b/cmd/fermentord/loop.go @@ -81,24 +81,13 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) { log.Print("Powering down one wire bus for 20 seconds") gpio.StopOneWirePower() oneWirePowerUpTimer.Reset(20 * time.Second) - - // Publish to NATS - e := dwingest.Event{ - Time: time.Now().UTC(), - Event: "ONEWIRE_RESET", - } - if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil { - hub.CaptureException(err) - log.Print(err) - } + ingest.AddEvent("ONEWIRE_RESET") case <-oneWirePowerUpChannel: log.Print("Powering up one wire bus") gpio.StartOneWirePower() case ev := <-gpio.C: - var evs string - switch ev { case hw.DoorClosed: log.Print("Door closed") @@ -107,24 +96,14 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) { gpio.StartFan() } ctrl.Resume() - evs = "DOOR_CLOSED" + ingest.AddEvent("DOOR_CLOSED") case hw.DoorOpened: log.Print("Door opened") ctrl.Pause() gpio.LightsOn() gpio.StopFan() - evs = "DOOR_OPENED" - } - - // Publish to NATS - e := dwingest.Event{ - Time: time.Now().UTC(), - Event: evs, - } - if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil { - hub.CaptureException(err) - log.Print(err) + ingest.AddEvent("DOOR_OPENED") } case state := <-ctrl.C: diff --git a/internal/dwingest/nats.go b/internal/dwingest/nats.go index 78be56a..3baeb97 100644 --- a/internal/dwingest/nats.go +++ b/internal/dwingest/nats.go @@ -22,17 +22,18 @@ type DWIngest struct { chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState + chEvent chan string js nats.JetStream config configuration.Configuration hub *sentry.Hub - serial int64 } func NewDWIngest(js nats.JetStream, config configuration.Configuration) *DWIngest { return &DWIngest{ chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chState: make(chan controllers.ChamberState, 100), + chEvent: make(chan string, 100), hub: sentry.CurrentHub().Clone(), js: js, config: config, @@ -64,6 +65,18 @@ func (p *DWIngest) AddState(state controllers.ChamberState) { } } +func (p *DWIngest) AddEvent(event string) { + select { + case p.chEvent <- event: + break + + default: + err := fmt.Errorf("channel overflow on dwingest event channel") + p.hub.CaptureException(err) + log.Print(err) + } +} + func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() defer p.hub.Flush(10 * time.Second) @@ -71,21 +84,17 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) { for { select { case reading := <-p.chTemperatureReading: - p.Publish(p.config.NATS.Subject.Temp, reading) + p.publishTemperatureReading(reading) case state := <-p.chState: - p.Publish(p.config.NATS.Subject.State, State{ - Time: time.Now().UTC(), - State: controllers.ChamberStateMap[state], - }) + p.publishState(state) + case ev := <-p.chEvent: + p.publishEvent(ev) + + // TODO Refactor case t := <-tilt.C: - p.Publish(p.config.NATS.Subject.Tilt, Tilt{ - Time: time.Now().UTC(), - Color: string(t.Color()), - Gravity: t.Gravity(), - Temperature: t.Celsius(), - }) + p.publishTilt(t) case config := <-p.ConfigUpdates: p.config = config @@ -96,7 +105,7 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) { } } -func (p *DWIngest) Publish(subject string, reading any) error { +func (p *DWIngest) publish(subject string, reading any) error { b, err := json.Marshal(reading) if err != nil { p.hub.CaptureException(err) @@ -113,7 +122,7 @@ func (p *DWIngest) Publish(subject string, reading any) error { msg := nats.NewMsg(subject) msg.Header.Add("Nats-Msg-Id", msgID.String()) msg.Data = b - _, err = p.js.PublishMsg(msg, nats.AckWait(5*time.Second)) + _, err = p.js.PublishMsg(msg, nats.AckWait(30*time.Second)) if err != nil { p.hub.CaptureException(err) log.Print(err) @@ -122,3 +131,36 @@ func (p *DWIngest) Publish(subject string, reading any) error { return nil } + +func (p *DWIngest) publishTilt(t tilt.Tilt) error { + ev := Tilt{ + Time: time.Now().UTC(), + Color: string(t.Color()), + Gravity: t.Gravity(), + Temperature: t.Celsius(), + } + + return p.publish(p.config.NATS.Subject.Tilt, ev) +} + +func (p *DWIngest) publishTemperatureReading(reading temperature.TemperatureReading) error { + return p.publish(p.config.NATS.Subject.Temp, reading) +} + +func (p *DWIngest) publishState(state controllers.ChamberState) error { + st := State{ + Time: time.Now().UTC(), + State: controllers.ChamberStateMap[state], + } + + return p.publish(p.config.NATS.Subject.State, st) +} + +func (p *DWIngest) publishEvent(event string) error { + ev := Event{ + Time: time.Now().UTC(), + Event: event, + } + + return p.publish(p.config.NATS.Subject.Event, ev) +} diff --git a/pkg/tilt/main.go b/pkg/tilt/main.go index 5c492fd..2238eb8 100644 --- a/pkg/tilt/main.go +++ b/pkg/tilt/main.go @@ -58,6 +58,7 @@ func scan(ctx context.Context, timeout time.Duration) { select { case C <- t: // Message sent + default: // No recipients on channel err := fmt.Errorf("channel overflow on tilt reading channel")