From fca8d3be475113ad61773c16590e73bae8c348d8 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:16:08 +0100 Subject: [PATCH] implement writing data to database --- messageworker/main.go | 44 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/messageworker/main.go b/messageworker/main.go index 44334de..8281364 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -1,10 +1,16 @@ package messageworker import ( - "fmt" + "encoding/json" "os" zmq "github.com/pebbe/zmq4" + "jonasled.dev/jonasled/ems-esp-logger/database" + "jonasled.dev/jonasled/ems-esp-logger/database/tables" + "jonasled.dev/jonasled/ems-esp-logger/helper" + "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/types" + "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func Run() { @@ -12,7 +18,39 @@ func Run() { defer worker.Close() worker.Connect(os.Getenv("ZEROMQ_WORKER")) for { - task, _ := worker.Recv(0) - fmt.Println("Processing:", task) + taskData, _ := worker.Recv(0) + log.Log.Debug("Received new task: ", taskData) + var task types.Task + err := json.Unmarshal([]byte(taskData), &task) + if err != nil { + log.Log.Error("Failed decoding task as JSON: ", err.Error()) + continue + } + var instance tables.Instance + err = database.Db.Where("name = ?", task.Instance).First(&instance).Error + if err != nil { + log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) + zeromq.Pusher.Send(taskData, 0) + continue + } + + var jsonData map[string]interface{} + err = json.Unmarshal([]byte(task.Data), &jsonData) + if err != nil { + log.Log.Error("Failed decoding boiler JSON: ", err.Error()) + zeromq.Pusher.Send(taskData, 0) + continue + } + for key, value := range jsonData { + valueType := database.GetOrCreateValueType(key) + dbValue := tables.Value{ + Date: task.Date, + ValueType: valueType, + Value: helper.AnyToString(value), + Instance: instance, + } + database.Db.Create(&dbValue) + } + log.Log.Info("Stored boiler data in database") } } -- GitLab