From fe94a51acda9a647ed5cd21ba4975f1f9eb99fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Rasmussen?= Date: Sat, 23 Jul 2022 16:10:34 +0200 Subject: [PATCH] Publish Tilt readings to DWIngest --- internal/configuration/config.go | 2 ++ internal/dwingest/nats.go | 46 +++++++++++++++++++++----------- internal/dwingest/types.go | 11 ++++++++ pkg/tilt/main.go | 10 +++++++ 4 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 internal/dwingest/types.go diff --git a/internal/configuration/config.go b/internal/configuration/config.go index ce8b39f..07ec1f4 100644 --- a/internal/configuration/config.go +++ b/internal/configuration/config.go @@ -16,6 +16,7 @@ type Configuration struct { Event string `mapstructure:"event"` State string `mapstructure:"state"` Temp string `mapstructure:"temp"` + Tilt string `mapstructure:"tilt"` } `mapstructure:"subject"` } `mapstructure:"nats"` @@ -59,6 +60,7 @@ func LoadConfiguration() *Configuration { viper.SetDefault("nats.subject.event", "FERMENTOR.event") viper.SetDefault("nats.subject.state", "FERMENTOR.state") viper.SetDefault("nats.subject.temp", "FERMENTOR.temp") + viper.SetDefault("nats.subject.tilt", "FERMENTOR.tilt") viper.SetDefault("nats.url", "nats.service.consul") viper.SetDefault("pid.kd", 2.0) viper.SetDefault("pid.ki", 0.0001) diff --git a/internal/dwingest/nats.go b/internal/dwingest/nats.go index 9cd09a3..b4af7cc 100644 --- a/internal/dwingest/nats.go +++ b/internal/dwingest/nats.go @@ -10,6 +10,7 @@ import ( "git.joco.dk/sng/fermentord/internal/configuration" "git.joco.dk/sng/fermentord/internal/controllers" "git.joco.dk/sng/fermentord/pkg/temperature" + "git.joco.dk/sng/fermentord/pkg/tilt" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" ) @@ -42,25 +43,21 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea for { select { case reading := <-p.chTemperatureReading: - b, err := json.Marshal(reading) - if err != nil { - hub.CaptureException(err) - log.Printf("Error marshaling JSON: %v", err) - break - } - - _, err = js.Publish(config.NATS.Subject.Temp, b) - if err != nil { - hub.CaptureException(err) - log.Printf("Error publishing to NATS: %v", err) - } + publish(config.NATS.Subject.Temp, js, hub, reading) case state := <-p.chState: - _, err := js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state])) - if err != nil { - hub.CaptureException(err) - log.Printf("Error publishing to NATS: %v", err) + reading := State{ + State: controllers.ChamberStateMap[state], } + publish(config.NATS.Subject.State, js, hub, reading) + + case t := <-tilt.C: + reading := Tilt{ + Color: string(t.Color()), + Gravity: t.Gravity(), + Temperature: t.Celsius(), + } + publish(config.NATS.Subject.Tilt, js, hub, reading) case <-ctx.Done(): close(p.chTemperatureReading) @@ -71,3 +68,20 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea } } + +func publish(subject string, js nats.JetStream, hub *sentry.Hub, reading any) error { + b, err := json.Marshal(reading) + if err != nil { + hub.CaptureException(err) + log.Printf("Error marshaling JSON: %v", err) + return err + } + + _, err = js.Publish(subject, b) + if err != nil { + hub.CaptureException(err) + log.Printf("Error publishing to NATS: %v", err) + } + + return err +} diff --git a/internal/dwingest/types.go b/internal/dwingest/types.go new file mode 100644 index 0000000..55bfe9b --- /dev/null +++ b/internal/dwingest/types.go @@ -0,0 +1,11 @@ +package dwingest + +type State struct { + State string `json:"state"` +} + +type Tilt struct { + Color string `json:"color"` + Gravity float64 `json:"gravity"` + Temperature float64 `json:"temperature"` +} diff --git a/pkg/tilt/main.go b/pkg/tilt/main.go index 2ee909f..92f2541 100644 --- a/pkg/tilt/main.go +++ b/pkg/tilt/main.go @@ -7,6 +7,14 @@ import ( "time" ) +var ( + C chan Tilt +) + +func init() { + C = make(chan Tilt, 10) +} + func PollSensors(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, scanDuration time.Duration) { if interval < scanDuration { log.Fatal("Unable to use interval < scanDuration") @@ -34,5 +42,7 @@ func scan(ctx context.Context, timeout time.Duration) { color := string(t.Color()) metricGravity.WithLabelValues(color).Set(t.Gravity()) metricTemp.WithLabelValues(color).Set(t.Celsius()) + + C <- t } }