package dwingest import ( "context" "encoding/json" "fmt" "log" "sync" "time" "git.joco.dk/sng/fermentord/internal/configuration" "git.joco.dk/sng/fermentord/internal/controllers" "git.joco.dk/sng/fermentord/pkg/temperature" "git.joco.dk/sng/fermentord/pkg/tilt" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" ) type DWIngest struct { chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState hub *sentry.Hub } func NewDWIngest() *DWIngest { return &DWIngest{ chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chState: make(chan controllers.ChamberState, 100), hub: sentry.CurrentHub().Clone(), } } func (p *DWIngest) AddReading(reading temperature.TemperatureReading) { select { case p.chTemperatureReading <- reading: break default: err := fmt.Errorf("channel overflow on dwingest temperature channel") p.hub.CaptureException(err) log.Print(err) } } func (p *DWIngest) AddState(state controllers.ChamberState) { select { case p.chState <- state: break default: err := fmt.Errorf("channel overflow on dwingest state channel") p.hub.CaptureException(err) log.Print(err) } } func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) { defer wg.Done() defer p.hub.Flush(10 * time.Second) for { select { case reading := <-p.chTemperatureReading: publish(config.NATS.Subject.Temp, js, p.hub, reading) case state := <-p.chState: publish(config.NATS.Subject.State, js, p.hub, State{ Time: time.Now().UTC(), State: controllers.ChamberStateMap[state], }) case t := <-tilt.C: publish(config.NATS.Subject.Tilt, js, p.hub, Tilt{ Time: time.Now().UTC(), Color: string(t.Color()), Gravity: t.Gravity(), Temperature: t.Celsius(), }) case <-ctx.Done(): return } } } func publish(subject string, js nats.JetStream, hub *sentry.Hub, reading any) error { b, err := json.Marshal(reading) if err != nil { hub.CaptureException(err) log.Printf("Error marshaling JSON: %v", err) return err } _, err = js.Publish(subject, b) if err != nil { hub.CaptureException(err) log.Print(err) } return err }