package dwingest import ( "context" "encoding/json" "fmt" "log" "sync" "time" "git.joco.dk/snr/fermentord/internal/configuration" "git.joco.dk/snr/fermentord/internal/controllers" "git.joco.dk/snr/fermentord/pkg/temperature" "git.joco.dk/snr/fermentord/pkg/tilt" "github.com/getsentry/sentry-go" "github.com/gofrs/uuid" "github.com/nats-io/nats.go" ) type DWIngest struct { ConfigUpdates chan configuration.Configuration chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState chEvent chan string js nats.JetStream config configuration.Configuration hub *sentry.Hub } func NewDWIngest(js nats.JetStream, config configuration.Configuration) *DWIngest { return &DWIngest{ chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chState: make(chan controllers.ChamberState, 100), chEvent: make(chan string, 100), hub: sentry.CurrentHub().Clone(), js: js, config: config, ConfigUpdates: make(chan configuration.Configuration, 1), } } func (p *DWIngest) AddReading(reading temperature.TemperatureReading) { select { case p.chTemperatureReading <- reading: break default: err := fmt.Errorf("channel overflow on dwingest temperature channel") p.hub.CaptureException(err) log.Print(err) } } func (p *DWIngest) AddState(state controllers.ChamberState) { select { case p.chState <- state: break default: err := fmt.Errorf("channel overflow on dwingest state channel") p.hub.CaptureException(err) log.Print(err) } } func (p *DWIngest) AddEvent(event string) { select { case p.chEvent <- event: break default: err := fmt.Errorf("channel overflow on dwingest event channel") p.hub.CaptureException(err) log.Print(err) } } func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() defer p.hub.Flush(10 * time.Second) for { select { case reading := <-p.chTemperatureReading: p.publishTemperatureReading(reading) case state := <-p.chState: p.publishState(state) case ev := <-p.chEvent: p.publishEvent(ev) // TODO Refactor case t := <-tilt.C: p.publishTilt(t) case config := <-p.ConfigUpdates: p.config = config case <-ctx.Done(): return } } } func (p *DWIngest) publish(subject string, reading any) error { b, err := json.Marshal(reading) if err != nil { p.hub.CaptureException(err) log.Printf("Error marshaling JSON: %v", err) return err } msgID, err := uuid.NewV7() if err != nil { p.hub.CaptureException(err) log.Print(err) return err } msg := nats.NewMsg(subject) msg.Header.Add("Nats-Msg-Id", msgID.String()) msg.Data = b _, err = p.js.PublishMsg(msg, nats.AckWait(30*time.Second)) if err != nil { p.hub.CaptureException(err) log.Print(err) return err } return nil } func (p *DWIngest) publishTilt(t tilt.Tilt) error { ev := Tilt{ Time: time.Now().UTC(), BrewUUID: p.config.Brew.UUID, Color: string(t.Color()), Gravity: t.Gravity(), Temperature: t.Celsius(), } return p.publish(p.config.NATS.Subject.Tilt, ev) } func (p *DWIngest) publishTemperatureReading(reading temperature.TemperatureReading) error { reading.BrewUUID = p.config.Brew.UUID return p.publish(p.config.NATS.Subject.Temp, reading) } func (p *DWIngest) publishState(state controllers.ChamberState) error { st := State{ Time: time.Now().UTC(), BrewUUID: p.config.Brew.UUID, State: controllers.ChamberStateMap[state], } return p.publish(p.config.NATS.Subject.State, st) } func (p *DWIngest) publishEvent(event string) error { ev := Event{ Time: time.Now().UTC(), BrewUUID: p.config.Brew.UUID, Event: event, } return p.publish(p.config.NATS.Subject.Event, ev) }