fermentord/internal/dwingest/nats.go

89 lines
2 KiB
Go
Raw Normal View History

2022-07-19 09:11:50 +00:00
package dwingest
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/pkg/temperature"
2022-07-23 14:10:34 +00:00
"git.joco.dk/sng/fermentord/pkg/tilt"
2022-07-19 09:11:50 +00:00
"github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go"
)
type DWIngest struct {
chTemperatureReading chan temperature.TemperatureReading
chState chan controllers.ChamberState
}
func NewDWIngest() *DWIngest {
return &DWIngest{
chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
chState: make(chan controllers.ChamberState, 100),
}
}
func (p *DWIngest) AddReading(reading temperature.TemperatureReading) {
p.chTemperatureReading <- reading
}
func (p *DWIngest) AddState(state controllers.ChamberState) {
p.chState <- state
}
2022-03-15 16:20:47 +00:00
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) {
2022-07-19 09:11:50 +00:00
hub := sentry.CurrentHub().Clone()
defer hub.Flush(10 * time.Second)
defer wg.Done()
for {
select {
case reading := <-p.chTemperatureReading:
2022-07-23 14:10:34 +00:00
publish(config.NATS.Subject.Temp, js, hub, reading)
2022-07-19 09:11:50 +00:00
2022-07-23 14:10:34 +00:00
case state := <-p.chState:
reading := State{
2022-07-23 14:39:56 +00:00
Time: time.Now().UTC(),
2022-07-23 14:10:34 +00:00
State: controllers.ChamberStateMap[state],
2022-07-19 09:11:50 +00:00
}
2022-07-23 14:10:34 +00:00
publish(config.NATS.Subject.State, js, hub, reading)
2022-07-19 09:11:50 +00:00
2022-07-23 14:10:34 +00:00
case t := <-tilt.C:
reading := Tilt{
2022-07-23 14:39:56 +00:00
Time: time.Now().UTC(),
2022-07-23 14:10:34 +00:00
Color: string(t.Color()),
Gravity: t.Gravity(),
Temperature: t.Celsius(),
2022-07-19 09:11:50 +00:00
}
2022-07-23 14:10:34 +00:00
publish(config.NATS.Subject.Tilt, js, hub, reading)
2022-07-19 09:11:50 +00:00
case <-ctx.Done():
close(p.chTemperatureReading)
close(p.chState)
return
}
}
}
2022-07-23 14:10:34 +00:00
func publish(subject string, js nats.JetStream, hub *sentry.Hub, reading any) error {
b, err := json.Marshal(reading)
if err != nil {
hub.CaptureException(err)
log.Printf("Error marshaling JSON: %v", err)
return err
}
_, err = js.Publish(subject, b)
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
return err
}