Publish Tilt readings to DWIngest

This commit is contained in:
Søren Rasmussen 2022-07-23 16:10:34 +02:00
parent 7dbffce49b
commit fe94a51acd
4 changed files with 53 additions and 16 deletions

View file

@ -16,6 +16,7 @@ type Configuration struct {
Event string `mapstructure:"event"` Event string `mapstructure:"event"`
State string `mapstructure:"state"` State string `mapstructure:"state"`
Temp string `mapstructure:"temp"` Temp string `mapstructure:"temp"`
Tilt string `mapstructure:"tilt"`
} `mapstructure:"subject"` } `mapstructure:"subject"`
} `mapstructure:"nats"` } `mapstructure:"nats"`
@ -59,6 +60,7 @@ func LoadConfiguration() *Configuration {
viper.SetDefault("nats.subject.event", "FERMENTOR.event") viper.SetDefault("nats.subject.event", "FERMENTOR.event")
viper.SetDefault("nats.subject.state", "FERMENTOR.state") viper.SetDefault("nats.subject.state", "FERMENTOR.state")
viper.SetDefault("nats.subject.temp", "FERMENTOR.temp") viper.SetDefault("nats.subject.temp", "FERMENTOR.temp")
viper.SetDefault("nats.subject.tilt", "FERMENTOR.tilt")
viper.SetDefault("nats.url", "nats.service.consul") viper.SetDefault("nats.url", "nats.service.consul")
viper.SetDefault("pid.kd", 2.0) viper.SetDefault("pid.kd", 2.0)
viper.SetDefault("pid.ki", 0.0001) viper.SetDefault("pid.ki", 0.0001)

View file

@ -10,6 +10,7 @@ import (
"git.joco.dk/sng/fermentord/internal/configuration" "git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers" "git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/pkg/temperature" "git.joco.dk/sng/fermentord/pkg/temperature"
"git.joco.dk/sng/fermentord/pkg/tilt"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
@ -42,25 +43,21 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea
for { for {
select { select {
case reading := <-p.chTemperatureReading: case reading := <-p.chTemperatureReading:
b, err := json.Marshal(reading) publish(config.NATS.Subject.Temp, js, hub, reading)
if err != nil {
hub.CaptureException(err)
log.Printf("Error marshaling JSON: %v", err)
break
}
_, err = js.Publish(config.NATS.Subject.Temp, b)
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
case state := <-p.chState: case state := <-p.chState:
_, err := js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state])) reading := State{
if err != nil { State: controllers.ChamberStateMap[state],
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
} }
publish(config.NATS.Subject.State, js, hub, reading)
case t := <-tilt.C:
reading := Tilt{
Color: string(t.Color()),
Gravity: t.Gravity(),
Temperature: t.Celsius(),
}
publish(config.NATS.Subject.Tilt, js, hub, reading)
case <-ctx.Done(): case <-ctx.Done():
close(p.chTemperatureReading) close(p.chTemperatureReading)
@ -71,3 +68,20 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea
} }
} }
func publish(subject string, js nats.JetStream, hub *sentry.Hub, reading any) error {
b, err := json.Marshal(reading)
if err != nil {
hub.CaptureException(err)
log.Printf("Error marshaling JSON: %v", err)
return err
}
_, err = js.Publish(subject, b)
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
return err
}

View file

@ -0,0 +1,11 @@
package dwingest
type State struct {
State string `json:"state"`
}
type Tilt struct {
Color string `json:"color"`
Gravity float64 `json:"gravity"`
Temperature float64 `json:"temperature"`
}

View file

@ -7,6 +7,14 @@ import (
"time" "time"
) )
var (
C chan Tilt
)
func init() {
C = make(chan Tilt, 10)
}
func PollSensors(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, scanDuration time.Duration) { func PollSensors(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, scanDuration time.Duration) {
if interval < scanDuration { if interval < scanDuration {
log.Fatal("Unable to use interval < scanDuration") log.Fatal("Unable to use interval < scanDuration")
@ -34,5 +42,7 @@ func scan(ctx context.Context, timeout time.Duration) {
color := string(t.Color()) color := string(t.Color())
metricGravity.WithLabelValues(color).Set(t.Gravity()) metricGravity.WithLabelValues(color).Set(t.Gravity())
metricTemp.WithLabelValues(color).Set(t.Celsius()) metricTemp.WithLabelValues(color).Set(t.Celsius())
C <- t
} }
} }