2022-07-19 09:11:50 +00:00
|
|
|
package dwingest
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
2022-07-25 08:55:25 +00:00
|
|
|
"fmt"
|
2022-07-19 09:11:50 +00:00
|
|
|
"log"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"git.joco.dk/sng/fermentord/internal/configuration"
|
|
|
|
"git.joco.dk/sng/fermentord/internal/controllers"
|
|
|
|
"git.joco.dk/sng/fermentord/pkg/temperature"
|
2022-07-23 14:10:34 +00:00
|
|
|
"git.joco.dk/sng/fermentord/pkg/tilt"
|
2022-07-19 09:11:50 +00:00
|
|
|
"github.com/getsentry/sentry-go"
|
2022-07-29 21:53:55 +00:00
|
|
|
"github.com/gofrs/uuid"
|
2022-07-19 09:11:50 +00:00
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
)
|
|
|
|
|
|
|
|
type DWIngest struct {
|
|
|
|
chTemperatureReading chan temperature.TemperatureReading
|
|
|
|
chState chan controllers.ChamberState
|
2022-07-25 08:55:25 +00:00
|
|
|
|
2022-07-29 21:53:55 +00:00
|
|
|
hub *sentry.Hub
|
|
|
|
serial int64
|
2022-07-19 09:11:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewDWIngest() *DWIngest {
|
|
|
|
return &DWIngest{
|
|
|
|
chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
|
|
|
|
chState: make(chan controllers.ChamberState, 100),
|
2022-07-25 08:55:25 +00:00
|
|
|
hub: sentry.CurrentHub().Clone(),
|
2022-07-19 09:11:50 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DWIngest) AddReading(reading temperature.TemperatureReading) {
|
2022-07-24 07:38:40 +00:00
|
|
|
select {
|
|
|
|
case p.chTemperatureReading <- reading:
|
2022-07-25 08:55:25 +00:00
|
|
|
break
|
|
|
|
|
2022-07-24 07:38:40 +00:00
|
|
|
default:
|
2022-07-25 08:55:25 +00:00
|
|
|
err := fmt.Errorf("channel overflow on dwingest temperature channel")
|
|
|
|
p.hub.CaptureException(err)
|
|
|
|
log.Print(err)
|
2022-07-24 07:38:40 +00:00
|
|
|
}
|
2022-07-19 09:11:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DWIngest) AddState(state controllers.ChamberState) {
|
2022-07-24 07:38:40 +00:00
|
|
|
select {
|
|
|
|
case p.chState <- state:
|
2022-07-25 08:55:25 +00:00
|
|
|
break
|
|
|
|
|
2022-07-24 07:38:40 +00:00
|
|
|
default:
|
2022-07-25 08:55:25 +00:00
|
|
|
err := fmt.Errorf("channel overflow on dwingest state channel")
|
|
|
|
p.hub.CaptureException(err)
|
|
|
|
log.Print(err)
|
2022-07-24 07:38:40 +00:00
|
|
|
}
|
2022-07-19 09:11:50 +00:00
|
|
|
}
|
|
|
|
|
2022-03-15 16:20:47 +00:00
|
|
|
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) {
|
2022-07-19 09:11:50 +00:00
|
|
|
defer wg.Done()
|
2022-07-25 08:55:25 +00:00
|
|
|
defer p.hub.Flush(10 * time.Second)
|
2022-07-19 09:11:50 +00:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case reading := <-p.chTemperatureReading:
|
2022-07-29 21:53:55 +00:00
|
|
|
p.publish(config.NATS.Subject.Temp, js, reading)
|
2022-07-19 09:11:50 +00:00
|
|
|
|
2022-07-23 14:10:34 +00:00
|
|
|
case state := <-p.chState:
|
2022-07-29 21:53:55 +00:00
|
|
|
p.publish(config.NATS.Subject.State, js, State{
|
2022-07-23 14:39:56 +00:00
|
|
|
Time: time.Now().UTC(),
|
2022-07-23 14:10:34 +00:00
|
|
|
State: controllers.ChamberStateMap[state],
|
2022-07-25 08:55:25 +00:00
|
|
|
})
|
2022-07-19 09:11:50 +00:00
|
|
|
|
2022-07-23 14:10:34 +00:00
|
|
|
case t := <-tilt.C:
|
2022-07-29 21:53:55 +00:00
|
|
|
p.publish(config.NATS.Subject.Tilt, js, Tilt{
|
2022-07-23 14:39:56 +00:00
|
|
|
Time: time.Now().UTC(),
|
2022-07-23 14:10:34 +00:00
|
|
|
Color: string(t.Color()),
|
|
|
|
Gravity: t.Gravity(),
|
|
|
|
Temperature: t.Celsius(),
|
2022-07-25 08:55:25 +00:00
|
|
|
})
|
2022-07-19 09:11:50 +00:00
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-07-23 14:10:34 +00:00
|
|
|
|
2022-07-29 21:53:55 +00:00
|
|
|
func (p *DWIngest) publish(subject string, js nats.JetStream, reading any) error {
|
2022-07-23 14:10:34 +00:00
|
|
|
b, err := json.Marshal(reading)
|
|
|
|
if err != nil {
|
2022-07-29 21:53:55 +00:00
|
|
|
p.hub.CaptureException(err)
|
2022-07-23 14:10:34 +00:00
|
|
|
log.Printf("Error marshaling JSON: %v", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-07-29 21:53:55 +00:00
|
|
|
msgID, err := uuid.NewV7(uuid.MillisecondPrecision)
|
|
|
|
if err != nil {
|
|
|
|
p.hub.CaptureException(err)
|
|
|
|
log.Print(err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
msg := nats.NewMsg(subject)
|
|
|
|
msg.Header.Add("Nats-Msg-Id", msgID.String())
|
|
|
|
msg.Data = b
|
|
|
|
_, err = js.PublishMsg(msg, nats.AckWait(5*time.Second))
|
2022-07-23 14:10:34 +00:00
|
|
|
if err != nil {
|
2022-07-29 21:53:55 +00:00
|
|
|
p.hub.CaptureException(err)
|
2022-07-26 10:59:17 +00:00
|
|
|
log.Print(err)
|
2022-07-29 21:53:55 +00:00
|
|
|
return err
|
2022-07-23 14:10:34 +00:00
|
|
|
}
|
|
|
|
|
2022-07-29 21:53:55 +00:00
|
|
|
return nil
|
2022-07-23 14:10:34 +00:00
|
|
|
}
|