Skip to content
Snippets Groups Projects
Commit fca8d3be authored by Jonas Leder's avatar Jonas Leder
Browse files

implement writing data to database

parent f8915bc4
Branches
No related tags found
1 merge request!1WIP: write values to a database
Pipeline #54568 failed
package messageworker
import (
"fmt"
"encoding/json"
"os"
zmq "github.com/pebbe/zmq4"
"jonasled.dev/jonasled/ems-esp-logger/database"
"jonasled.dev/jonasled/ems-esp-logger/database/tables"
"jonasled.dev/jonasled/ems-esp-logger/helper"
"jonasled.dev/jonasled/ems-esp-logger/log"
"jonasled.dev/jonasled/ems-esp-logger/types"
"jonasled.dev/jonasled/ems-esp-logger/zeromq"
)
func Run() {
......@@ -12,7 +18,39 @@ func Run() {
defer worker.Close()
worker.Connect(os.Getenv("ZEROMQ_WORKER"))
for {
task, _ := worker.Recv(0)
fmt.Println("Processing:", task)
taskData, _ := worker.Recv(0)
log.Log.Debug("Received new task: ", taskData)
var task types.Task
err := json.Unmarshal([]byte(taskData), &task)
if err != nil {
log.Log.Error("Failed decoding task as JSON: ", err.Error())
continue
}
var instance tables.Instance
err = database.Db.Where("name = ?", task.Instance).First(&instance).Error
if err != nil {
log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error())
zeromq.Pusher.Send(taskData, 0)
continue
}
var jsonData map[string]interface{}
err = json.Unmarshal([]byte(task.Data), &jsonData)
if err != nil {
log.Log.Error("Failed decoding boiler JSON: ", err.Error())
zeromq.Pusher.Send(taskData, 0)
continue
}
for key, value := range jsonData {
valueType := database.GetOrCreateValueType(key)
dbValue := tables.Value{
Date: task.Date,
ValueType: valueType,
Value: helper.AnyToString(value),
Instance: instance,
}
database.Db.Create(&dbValue)
}
log.Log.Info("Stored boiler data in database")
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment