Refactor dwingest
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Søren Rasmussen 2022-08-02 20:39:31 +02:00
parent 5ecc4ba27a
commit deb1ef3578
3 changed files with 60 additions and 38 deletions

View file

@ -81,24 +81,13 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) {
log.Print("Powering down one wire bus for 20 seconds") log.Print("Powering down one wire bus for 20 seconds")
gpio.StopOneWirePower() gpio.StopOneWirePower()
oneWirePowerUpTimer.Reset(20 * time.Second) oneWirePowerUpTimer.Reset(20 * time.Second)
ingest.AddEvent("ONEWIRE_RESET")
// Publish to NATS
e := dwingest.Event{
Time: time.Now().UTC(),
Event: "ONEWIRE_RESET",
}
if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil {
hub.CaptureException(err)
log.Print(err)
}
case <-oneWirePowerUpChannel: case <-oneWirePowerUpChannel:
log.Print("Powering up one wire bus") log.Print("Powering up one wire bus")
gpio.StartOneWirePower() gpio.StartOneWirePower()
case ev := <-gpio.C: case ev := <-gpio.C:
var evs string
switch ev { switch ev {
case hw.DoorClosed: case hw.DoorClosed:
log.Print("Door closed") log.Print("Door closed")
@ -107,24 +96,14 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) {
gpio.StartFan() gpio.StartFan()
} }
ctrl.Resume() ctrl.Resume()
evs = "DOOR_CLOSED" ingest.AddEvent("DOOR_CLOSED")
case hw.DoorOpened: case hw.DoorOpened:
log.Print("Door opened") log.Print("Door opened")
ctrl.Pause() ctrl.Pause()
gpio.LightsOn() gpio.LightsOn()
gpio.StopFan() gpio.StopFan()
evs = "DOOR_OPENED" ingest.AddEvent("DOOR_OPENED")
}
// Publish to NATS
e := dwingest.Event{
Time: time.Now().UTC(),
Event: evs,
}
if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil {
hub.CaptureException(err)
log.Print(err)
} }
case state := <-ctrl.C: case state := <-ctrl.C:

View file

@ -22,17 +22,18 @@ type DWIngest struct {
chTemperatureReading chan temperature.TemperatureReading chTemperatureReading chan temperature.TemperatureReading
chState chan controllers.ChamberState chState chan controllers.ChamberState
chEvent chan string
js nats.JetStream js nats.JetStream
config configuration.Configuration config configuration.Configuration
hub *sentry.Hub hub *sentry.Hub
serial int64
} }
func NewDWIngest(js nats.JetStream, config configuration.Configuration) *DWIngest { func NewDWIngest(js nats.JetStream, config configuration.Configuration) *DWIngest {
return &DWIngest{ return &DWIngest{
chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
chState: make(chan controllers.ChamberState, 100), chState: make(chan controllers.ChamberState, 100),
chEvent: make(chan string, 100),
hub: sentry.CurrentHub().Clone(), hub: sentry.CurrentHub().Clone(),
js: js, js: js,
config: config, config: config,
@ -64,6 +65,18 @@ func (p *DWIngest) AddState(state controllers.ChamberState) {
} }
} }
func (p *DWIngest) AddEvent(event string) {
select {
case p.chEvent <- event:
break
default:
err := fmt.Errorf("channel overflow on dwingest event channel")
p.hub.CaptureException(err)
log.Print(err)
}
}
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) { func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
defer p.hub.Flush(10 * time.Second) defer p.hub.Flush(10 * time.Second)
@ -71,21 +84,17 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) {
for { for {
select { select {
case reading := <-p.chTemperatureReading: case reading := <-p.chTemperatureReading:
p.Publish(p.config.NATS.Subject.Temp, reading) p.publishTemperatureReading(reading)
case state := <-p.chState: case state := <-p.chState:
p.Publish(p.config.NATS.Subject.State, State{ p.publishState(state)
Time: time.Now().UTC(),
State: controllers.ChamberStateMap[state],
})
case ev := <-p.chEvent:
p.publishEvent(ev)
// TODO Refactor
case t := <-tilt.C: case t := <-tilt.C:
p.Publish(p.config.NATS.Subject.Tilt, Tilt{ p.publishTilt(t)
Time: time.Now().UTC(),
Color: string(t.Color()),
Gravity: t.Gravity(),
Temperature: t.Celsius(),
})
case config := <-p.ConfigUpdates: case config := <-p.ConfigUpdates:
p.config = config p.config = config
@ -96,7 +105,7 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) {
} }
} }
func (p *DWIngest) Publish(subject string, reading any) error { func (p *DWIngest) publish(subject string, reading any) error {
b, err := json.Marshal(reading) b, err := json.Marshal(reading)
if err != nil { if err != nil {
p.hub.CaptureException(err) p.hub.CaptureException(err)
@ -113,7 +122,7 @@ func (p *DWIngest) Publish(subject string, reading any) error {
msg := nats.NewMsg(subject) msg := nats.NewMsg(subject)
msg.Header.Add("Nats-Msg-Id", msgID.String()) msg.Header.Add("Nats-Msg-Id", msgID.String())
msg.Data = b msg.Data = b
_, err = p.js.PublishMsg(msg, nats.AckWait(5*time.Second)) _, err = p.js.PublishMsg(msg, nats.AckWait(30*time.Second))
if err != nil { if err != nil {
p.hub.CaptureException(err) p.hub.CaptureException(err)
log.Print(err) log.Print(err)
@ -122,3 +131,36 @@ func (p *DWIngest) Publish(subject string, reading any) error {
return nil return nil
} }
func (p *DWIngest) publishTilt(t tilt.Tilt) error {
ev := Tilt{
Time: time.Now().UTC(),
Color: string(t.Color()),
Gravity: t.Gravity(),
Temperature: t.Celsius(),
}
return p.publish(p.config.NATS.Subject.Tilt, ev)
}
func (p *DWIngest) publishTemperatureReading(reading temperature.TemperatureReading) error {
return p.publish(p.config.NATS.Subject.Temp, reading)
}
func (p *DWIngest) publishState(state controllers.ChamberState) error {
st := State{
Time: time.Now().UTC(),
State: controllers.ChamberStateMap[state],
}
return p.publish(p.config.NATS.Subject.State, st)
}
func (p *DWIngest) publishEvent(event string) error {
ev := Event{
Time: time.Now().UTC(),
Event: event,
}
return p.publish(p.config.NATS.Subject.Event, ev)
}

View file

@ -58,6 +58,7 @@ func scan(ctx context.Context, timeout time.Duration) {
select { select {
case C <- t: case C <- t:
// Message sent // Message sent
default: default:
// No recipients on channel // No recipients on channel
err := fmt.Errorf("channel overflow on tilt reading channel") err := fmt.Errorf("channel overflow on tilt reading channel")