package temperature // Kernel documentation: https://www.kernel.org/doc/html/latest/w1/slaves/w1_therm.html import ( "context" "errors" "fmt" "log" "os" "path/filepath" "strconv" "strings" "sync" "time" "git.joco.dk/snr/fermentord/internal/configuration" "git.joco.dk/snr/fermentord/internal/metrics" "github.com/getsentry/sentry-go" ) type BulkReadBusState int const ( NaN = -999 BulkReadIdle BulkReadBusState = 0 BulkReadBusy BulkReadBusState = -1 BulkReadReady BulkReadBusState = 1 sensorConversionTime = 750 * time.Millisecond busPollDelay = 10 * time.Millisecond ) var ( // ErrReadSensor indicates that the sensor could not be read ErrReadSensor = errors.New("failed to read sensor") ) var ( // Reading receives the temperature readings Reading chan TemperatureReading ConfigUpdate chan configuration.Configuration RequestReset chan bool accErrTrigger int sensors []Sensor ) func init() { Reading = make(chan TemperatureReading, 30) ConfigUpdate = make(chan configuration.Configuration, 1) RequestReset = make(chan bool) sensors = make([]Sensor, 0) } func configure(config configuration.Configuration) { sensors = []Sensor{ NewSensor("Ambient", config.Sensors.Ambient, config.Sensors.Weight), NewSensor("Chamber", config.Sensors.Chamber, config.Sensors.Weight), NewSensor("Wort", config.Sensors.Wort, config.Sensors.Weight), } } func PollSensors(ctx context.Context, wg *sync.WaitGroup, readingInterval time.Duration, filterWeight float64) { defer wg.Done() hub := sentry.CurrentHub().Clone() defer hub.Flush(10 * time.Second) if readingInterval < sensorConversionTime { log.Fatalf("Reading interval must be at least %v ms.", sensorConversionTime) } ticker := time.NewTicker(readingInterval) defer ticker.Stop() for { select { case <-ticker.C: poll_bus: state, err := pollBusState() if err != nil { accErrTrigger++ hub.CaptureException(err) log.Print(err) break } switch state { case BulkReadBusy: time.Sleep(busPollDelay) goto poll_bus case BulkReadIdle: start := time.Now() // Trigger a bulk read to start conversion on all sensors. if err := triggerBulkRead(); err != nil { accErrTrigger++ hub.CaptureException(err) log.Print(err) break } // Ensure that we wait for sensors to convert data. deltaSleep := sensorConversionTime - time.Since(start) if deltaSleep > 0 { // Wait for sensors to convert data. time.Sleep(deltaSleep) } goto poll_bus case BulkReadReady: if !readSensors(hub) { accErrTrigger++ } else { accErrTrigger = 0 } } case c := <-ConfigUpdate: configure(c) case <-ctx.Done(): return } if accErrTrigger > 60 { select { case RequestReset <- true: log.Print("Thermal bulk read failed 60 times in a row -- bus reset") // Reset counter to allow 60 more reads accErrTrigger = 0 default: log.Fatal("Thermal bulk read failed 60 times in a row -- terminating") } } } } func readSensors(hub *sentry.Hub) bool { r := TemperatureReading{ Time: time.Now(), Ambient: NaN, Chamber: NaN, Wort: NaN, } hasReadFailure := false for _, sensor := range sensors { time.Sleep(50 * time.Millisecond) t, err := read(sensor.Path) if err != nil { hasReadFailure = true sensor.Fail() hub.CaptureException(err) log.Printf("%v/%v", sensor.Name, err) metrics.TemperatureSensorReadingStatus.WithLabelValues(sensor.Name, "failed").Inc() continue } tw := sensor.Update(t) switch sensor.Name { case "Ambient": r.Ambient = tw case "Chamber": r.Chamber = tw case "Wort": r.Wort = tw } // TODO Move metrics out metrics.TemperatureSensorReadingDegreesCelcius. WithLabelValues(sensor.Name). Set(tw) var statusStr string if err == nil { statusStr = "ok" } else { statusStr = "fail" } metrics.TemperatureSensorReadingStatus. WithLabelValues(sensor.Name, statusStr). Inc() } // Throw away reading if any sensor failed if hasReadFailure { return false } select { case Reading <- r: break default: err := fmt.Errorf("channel overflow on ds18b20 temperature channel") hub.CaptureException(err) log.Fatal(err) } return true } func triggerBulkRead() error { f, err := os.OpenFile("/sys/bus/w1/devices/w1_bus_master1/therm_bulk_read", os.O_WRONLY, 0644) if err != nil { return err } defer f.Close() _, err = f.WriteString("trigger\n") return err } func pollBusState() (BulkReadBusState, error) { b, err := os.ReadFile("/sys/bus/w1/devices/w1_bus_master1/therm_bulk_read") if err != nil { return BulkReadIdle, err } i, err := strconv.Atoi(strings.TrimSpace(string(b))) if err != nil { return BulkReadIdle, err } return BulkReadBusState(i), nil } // read returns the temperature of the specified sensor in millidegrees celcius. func read(sensor string) (int64, error) { path := filepath.Join("/sys/bus/w1/devices", sensor, "w1_slave") data, err := os.ReadFile(path) if err != nil { return NaN, err } raw := string(data) if raw == "" { return NaN, fmt.Errorf("%v: %w: empty file", sensor, ErrReadSensor) } if !strings.Contains(raw, " YES") { return NaN, fmt.Errorf("%v: %w: checksum failed [%v]", sensor, ErrReadSensor, raw) } i := strings.LastIndex(raw, "t=") if i == -1 { return NaN, fmt.Errorf("%v: %w: t= not found in [%v]", sensor, ErrReadSensor, raw) } c, err := strconv.ParseInt(raw[i+2:len(raw)-1], 10, 64) if err != nil { return NaN, err } // Ignore magic value -128 if c == -128000 { return NaN, fmt.Errorf("%v: magic value -128 detected", sensor) } return c, nil }