package messageworker import ( "encoding/json" "time" "jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/database/tables" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" ) func Run() { for { taskData, taskId, err := queue.MainQueue.Dequeue() if err != nil { time.Sleep(time.Second) continue } log.Log.Debug("Received new task: ", taskData) var task types.Task err = json.Unmarshal([]byte(taskData), &task) if err != nil { log.Log.Error("Failed decoding task as JSON: ", err.Error()) return } var instance tables.Instance err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) queue.MainQueue.Enqueue(taskData, 60) return } var jsonData map[string]interface{} err = json.Unmarshal([]byte(task.Data), &jsonData) if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) queue.MainQueue.Enqueue(taskData, 60) return } valuesToInsert := []tables.Value{} for key, value := range jsonData { valueType := database.GetOrCreateValueType(key) dbValue := tables.Value{ Date: task.Date, ValueType: valueType, Value: helper.AnyToString(value), Instance: instance, } valuesToInsert = append(valuesToInsert, dbValue) } database.Db.Create(&valuesToInsert) err = queue.MainQueue.MarkDone(taskId) if err != nil { log.Log.Error("Failed marking element in queue as done: ", err.Error()) } log.Log.Info("Stored boiler data in database") } }