This commit is contained in:
parent
9e9992a0d9
commit
7f4718fabc
9 changed files with 124 additions and 62 deletions
|
@ -11,9 +11,10 @@ import (
|
|||
)
|
||||
|
||||
func shutdownHTTP(ctx context.Context, wg *sync.WaitGroup, srv *http.Server) {
|
||||
defer wg.Done()
|
||||
|
||||
hub := sentry.CurrentHub().Clone()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
defer wg.Done()
|
||||
|
||||
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"git.joco.dk/sng/fermentord/internal/controllers"
|
||||
"git.joco.dk/sng/fermentord/internal/dwingest"
|
||||
"git.joco.dk/sng/fermentord/internal/hw"
|
||||
"git.joco.dk/sng/fermentord/internal/lcd"
|
||||
"git.joco.dk/sng/fermentord/pkg/temperature"
|
||||
"git.joco.dk/sng/fermentord/pkg/tilt"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
@ -20,18 +21,18 @@ import (
|
|||
)
|
||||
|
||||
func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) {
|
||||
hub := sentry.CurrentHub().Clone()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
defer wg.Done()
|
||||
|
||||
hub := sentry.CurrentHub().Clone()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
|
||||
// Display
|
||||
/*display, err := lcd.NewLCD()
|
||||
display, err := lcd.NewLCD()
|
||||
if err != nil {
|
||||
hub.CaptureException(err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer display.Close()
|
||||
*/
|
||||
|
||||
// Controller
|
||||
ctrl := controllers.NewChamberController(*config)
|
||||
|
@ -60,8 +61,8 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
|
|||
}
|
||||
defer gpio.Close()
|
||||
|
||||
wg.Add(4)
|
||||
//go display.Run(ctx, wg)
|
||||
wg.Add(5)
|
||||
go display.Run(ctx, wg)
|
||||
go ctrl.Run(ctx, wg)
|
||||
go ingest.Run(ctx, wg, js, config)
|
||||
go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight)
|
||||
|
|
|
@ -57,8 +57,7 @@ func main() {
|
|||
mux.HandleFunc("/health", api.HealthCheck)
|
||||
|
||||
// Main
|
||||
ctxb := context.Background()
|
||||
ctx, shutdown := context.WithCancel(ctxb)
|
||||
ctx, shutdown := context.WithCancel(context.Background())
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
wg.Add(1)
|
||||
|
@ -71,7 +70,7 @@ func main() {
|
|||
|
||||
// Initiate graceful shutdown.
|
||||
wg.Add(1)
|
||||
go shutdownHTTP(ctxb, wg, srv)
|
||||
go shutdownHTTP(context.Background(), wg, srv)
|
||||
shutdown()
|
||||
wg.Wait()
|
||||
nc.Close()
|
||||
|
|
|
@ -2,7 +2,6 @@ package configuration
|
|||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -56,9 +55,6 @@ type Configuration struct {
|
|||
}
|
||||
|
||||
func LoadConfiguration() *Configuration {
|
||||
hub := sentry.CurrentHub().Clone()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
|
||||
viper.SetDefault("cooler_enabled", true)
|
||||
viper.SetDefault("heater_enabled", true)
|
||||
viper.SetDefault("http.port", 8000)
|
||||
|
@ -91,7 +87,7 @@ func LoadConfiguration() *Configuration {
|
|||
|
||||
config := &Configuration{}
|
||||
if err := viper.Unmarshal(config); err != nil {
|
||||
hub.CaptureException(err)
|
||||
sentry.CaptureException(err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package controllers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -62,8 +63,8 @@ func NewChamberController(config configuration.Configuration) *ChamberController
|
|||
}
|
||||
|
||||
func (p *ChamberController) Run(ctx context.Context, wg *sync.WaitGroup) {
|
||||
defer p.hub.Flush(10 * time.Second)
|
||||
defer wg.Done()
|
||||
defer p.hub.Flush(10 * time.Second)
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
|
||||
|
@ -173,9 +174,10 @@ func (p *ChamberController) computeChamberState() ChamberState {
|
|||
//p.chamberTemperature > chamberTargetTemp+p.config.DeltaTemperatureCool
|
||||
|
||||
if cooler && heater {
|
||||
// This should not happen!
|
||||
log.Print("Trying to set cooler and heater on at the same time. This should NOT happen! Setting IDLE mode.")
|
||||
p.hub.CaptureMessage("Trying to set cooler and heater on at the same time")
|
||||
// This should NOT happen!
|
||||
err := fmt.Errorf("heater and cooler activated at same time")
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
next = ChamberStateIdle
|
||||
} else if !cooler && !heater {
|
||||
next = ChamberStateIdle
|
||||
|
|
|
@ -3,6 +3,7 @@ package dwingest
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -18,61 +19,66 @@ import (
|
|||
type DWIngest struct {
|
||||
chTemperatureReading chan temperature.TemperatureReading
|
||||
chState chan controllers.ChamberState
|
||||
|
||||
hub *sentry.Hub
|
||||
}
|
||||
|
||||
func NewDWIngest() *DWIngest {
|
||||
return &DWIngest{
|
||||
chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
|
||||
chState: make(chan controllers.ChamberState, 100),
|
||||
hub: sentry.CurrentHub().Clone(),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *DWIngest) AddReading(reading temperature.TemperatureReading) {
|
||||
// Non-blocking send
|
||||
select {
|
||||
case p.chTemperatureReading <- reading:
|
||||
break
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("channel overflow on dwingest temperature channel")
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *DWIngest) AddState(state controllers.ChamberState) {
|
||||
// Non-blocking send
|
||||
select {
|
||||
case p.chState <- state:
|
||||
break
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("channel overflow on dwingest state channel")
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) {
|
||||
hub := sentry.CurrentHub().Clone()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
defer wg.Done()
|
||||
defer p.hub.Flush(10 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case reading := <-p.chTemperatureReading:
|
||||
publish(config.NATS.Subject.Temp, js, hub, reading)
|
||||
publish(config.NATS.Subject.Temp, js, p.hub, reading)
|
||||
|
||||
case state := <-p.chState:
|
||||
reading := State{
|
||||
publish(config.NATS.Subject.State, js, p.hub, State{
|
||||
Time: time.Now().UTC(),
|
||||
State: controllers.ChamberStateMap[state],
|
||||
}
|
||||
publish(config.NATS.Subject.State, js, hub, reading)
|
||||
})
|
||||
|
||||
case t := <-tilt.C:
|
||||
reading := Tilt{
|
||||
publish(config.NATS.Subject.Tilt, js, p.hub, Tilt{
|
||||
Time: time.Now().UTC(),
|
||||
Color: string(t.Color()),
|
||||
Gravity: t.Gravity(),
|
||||
Temperature: t.Celsius(),
|
||||
}
|
||||
publish(config.NATS.Subject.Tilt, js, hub, reading)
|
||||
})
|
||||
|
||||
case <-ctx.Done():
|
||||
close(p.chTemperatureReading)
|
||||
close(p.chState)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"git.joco.dk/sng/fermentord/pkg/temperature"
|
||||
device "github.com/d2r2/go-hd44780"
|
||||
"github.com/d2r2/go-i2c"
|
||||
"github.com/getsentry/sentry-go"
|
||||
)
|
||||
|
||||
type LCD struct {
|
||||
|
@ -24,6 +25,7 @@ type LCD struct {
|
|||
chState chan controllers.ChamberState
|
||||
chSetpoint chan float64
|
||||
lastUpdate time.Time
|
||||
hub *sentry.Hub
|
||||
}
|
||||
|
||||
func NewLCD() (*LCD, error) {
|
||||
|
@ -34,6 +36,7 @@ func NewLCD() (*LCD, error) {
|
|||
chState: make(chan controllers.ChamberState, 10),
|
||||
chSetpoint: make(chan float64, 10),
|
||||
lastUpdate: time.Now(),
|
||||
hub: sentry.CurrentHub().Clone(),
|
||||
}
|
||||
|
||||
p.bus, err = i2c.NewI2C(0x27, 0)
|
||||
|
@ -47,30 +50,32 @@ func NewLCD() (*LCD, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
err = p.lcd.BacklightOn()
|
||||
err = p.lcd.BacklightOff()
|
||||
if err != nil {
|
||||
p.bus.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
/*
|
||||
err = p.lcd.ShowMessage("Fermentor", device.SHOW_LINE_1)
|
||||
if err != nil {
|
||||
p.bus.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = p.lcd.ShowMessage("Initializing", device.SHOW_LINE_2 /*|device.SHOW_BLANK_PADDING*/)
|
||||
err = p.lcd.ShowMessage("Initializing", device.SHOW_LINE_2|device.SHOW_BLANK_PADDING)
|
||||
if err != nil {
|
||||
p.bus.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
*/
|
||||
return p, nil
|
||||
|
||||
}
|
||||
|
||||
func (p *LCD) Close() error {
|
||||
if err := p.lcd.BacklightOff(); err != nil {
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
|
||||
|
@ -78,19 +83,44 @@ func (p *LCD) Close() error {
|
|||
}
|
||||
|
||||
func (p *LCD) SetTemperature(t temperature.TemperatureReading) {
|
||||
p.chTemp <- t
|
||||
select {
|
||||
case p.chTemp <- t:
|
||||
break
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("channel overflow on display temperature channel")
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *LCD) SetSetpointTemp(t float64) {
|
||||
p.chSetpoint <- t
|
||||
select {
|
||||
case p.chSetpoint <- t:
|
||||
break
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("channel overflow on display setpoint temperature channel")
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *LCD) SetState(state controllers.ChamberState) {
|
||||
p.chState <- state
|
||||
select {
|
||||
case p.chState <- state:
|
||||
break
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("channel overflow on display state channel")
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *LCD) Run(ctx context.Context, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
defer p.hub.Flush(10 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -98,7 +128,11 @@ func (p *LCD) Run(ctx context.Context, wg *sync.WaitGroup) {
|
|||
p.ambient = t.Ambient
|
||||
p.chamber = t.Chamber
|
||||
p.wort = t.Wort
|
||||
p.update()
|
||||
|
||||
if err := p.update(); err != nil {
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
|
||||
case state := <-p.chState:
|
||||
switch state {
|
||||
|
@ -111,19 +145,29 @@ func (p *LCD) Run(ctx context.Context, wg *sync.WaitGroup) {
|
|||
case controllers.ChamberStateCooling:
|
||||
p.state = "Co"
|
||||
}
|
||||
p.update()
|
||||
|
||||
if err := p.update(); err != nil {
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
|
||||
case t := <-p.chSetpoint:
|
||||
p.setpoint = t
|
||||
p.update()
|
||||
|
||||
if err := p.update(); err != nil {
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
p.lcd.ShowMessage("Fermentor", device.SHOW_LINE_1|device.SHOW_BLANK_PADDING)
|
||||
p.lcd.ShowMessage("Shutting down", device.SHOW_LINE_2|device.SHOW_BLANK_PADDING)
|
||||
|
||||
close(p.chSetpoint)
|
||||
close(p.chState)
|
||||
close(p.chTemp)
|
||||
if err := p.lcd.ShowMessage("Fermentor", device.SHOW_LINE_1|device.SHOW_BLANK_PADDING); err != nil {
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
if err := p.lcd.ShowMessage("Shutting down", device.SHOW_LINE_2|device.SHOW_BLANK_PADDING); err != nil {
|
||||
p.hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ package temperature
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
|
@ -61,9 +62,10 @@ func configure(config configuration.Configuration) {
|
|||
}
|
||||
|
||||
func PollSensors(ctx context.Context, wg *sync.WaitGroup, readingInterval time.Duration, filterWeight float64) {
|
||||
defer wg.Done()
|
||||
|
||||
hub := sentry.CurrentHub().Clone()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
defer wg.Done()
|
||||
|
||||
if readingInterval < sensorConversionTime {
|
||||
log.Fatalf("Reading interval must be at least %v ms.", sensorConversionTime)
|
||||
|
@ -179,7 +181,9 @@ func readSensors(hub *sentry.Hub) {
|
|||
break
|
||||
|
||||
default:
|
||||
log.Fatal("Temperature channel overflow!")
|
||||
err := fmt.Errorf("channel overflow on ds18b20 temperature channel")
|
||||
hub.CaptureException(err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -219,12 +223,12 @@ func read(sensor string) (int64, error) {
|
|||
raw := string(data)
|
||||
|
||||
if !strings.Contains(raw, " YES") {
|
||||
return 0.0, ErrReadSensor
|
||||
return 0.0, fmt.Errorf("%w: checksum failed", ErrReadSensor)
|
||||
}
|
||||
|
||||
i := strings.LastIndex(raw, "t=")
|
||||
if i == -1 {
|
||||
return 0.0, ErrReadSensor
|
||||
return 0.0, fmt.Errorf("%w: t= not found", ErrReadSensor)
|
||||
}
|
||||
|
||||
c, err := strconv.ParseInt(raw[i+2:len(raw)-1], 10, 64)
|
||||
|
|
|
@ -2,21 +2,27 @@ package tilt
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
)
|
||||
|
||||
var (
|
||||
C chan Tilt
|
||||
hub *sentry.Hub
|
||||
)
|
||||
|
||||
func init() {
|
||||
C = make(chan Tilt, 10)
|
||||
hub = sentry.CurrentHub().Clone()
|
||||
}
|
||||
|
||||
func PollSensors(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, scanDuration time.Duration) {
|
||||
defer wg.Done()
|
||||
defer hub.Flush(10 * time.Second)
|
||||
|
||||
if interval < scanDuration {
|
||||
log.Fatal("Unable to use interval < scanDuration")
|
||||
|
@ -54,6 +60,9 @@ func scan(ctx context.Context, timeout time.Duration) {
|
|||
// Message sent
|
||||
default:
|
||||
// No recipients on channel
|
||||
err := fmt.Errorf("channel overflow on tilt reading channel")
|
||||
hub.CaptureException(err)
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue