package dwingest import ( "context" "encoding/json" "log" "sync" "time" "git.joco.dk/sng/fermentord/internal/configuration" "git.joco.dk/sng/fermentord/internal/controllers" "git.joco.dk/sng/fermentord/pkg/temperature" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" ) type DWIngest struct { chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState } func NewDWIngest() *DWIngest { return &DWIngest{ chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chState: make(chan controllers.ChamberState, 100), } } func (p *DWIngest) AddReading(reading temperature.TemperatureReading) { p.chTemperatureReading <- reading } func (p *DWIngest) AddState(state controllers.ChamberState) { p.chState <- state } func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.ControllerConfig) { hub := sentry.CurrentHub().Clone() defer hub.Flush(10 * time.Second) defer wg.Done() for { select { case reading := <-p.chTemperatureReading: b, err := json.Marshal(reading) if err != nil { hub.CaptureException(err) log.Printf("Error marshaling JSON: %v", err) break } _, err = js.Publish(config.NATS.Subject.Temp, b) if err != nil { hub.CaptureException(err) log.Printf("Error publishing to NATS: %v", err) } case state := <-p.chState: _, err := js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state])) if err != nil { hub.CaptureException(err) log.Printf("Error publishing to NATS: %v", err) } case <-ctx.Done(): close(p.chTemperatureReading) close(p.chState) return } } }