From 6f10e2d51ff88378459437f653803efd40bd08c3 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 17:25:25 +0100 Subject: [PATCH] implement HTTP upload for new log messages --- go.mod | 11 +++++---- go.sum | 24 +++++++++++-------- messageworker/client.go | 52 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 71 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index a268147..8de657f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/gin-gonic/gin v1.10.0 github.com/glebarez/sqlite v1.11.0 + github.com/go-resty/resty/v2 v2.16.5 github.com/google/uuid v1.3.0 github.com/joho/godotenv v1.5.1 github.com/nekomeowww/gorm-logger-logrus v1.0.8 @@ -47,11 +48,11 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/crypto v0.25.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.22.5 // indirect diff --git a/go.sum b/go.sum index f6dff72..38a6f3b 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptdhTM= +github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= @@ -106,19 +108,21 @@ github.com/wind-c/comqtt/v2 v2.6.0/go.mod h1:6O4VilBrTQ/cNIcmIgNdMLCK9DTiLRxkal0 golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= diff --git a/messageworker/client.go b/messageworker/client.go index 17b1c85..4dfb814 100644 --- a/messageworker/client.go +++ b/messageworker/client.go @@ -5,6 +5,7 @@ import ( "os" "time" + "github.com/go-resty/resty/v2" "jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/database/tables" "jonasled.dev/jonasled/ems-esp-logger/helper" @@ -18,6 +19,9 @@ func RunClient() { log.Log.Info("Intialized local client with direct datbase connection") runDbClient() + } else { + log.Log.Info("Using remote server as data storage") + runHttpClient() } } @@ -25,6 +29,48 @@ func RunServer() { runDbClient() } +func runHttpClient() { + client := resty.New() + client.SetBaseURL(os.Getenv("API_ENDPOINT")) + client.SetHeader("Authentication", os.Getenv("INSTANCE_APIKEY")) + client.SetHeader("X-Instance", os.Getenv("INSTANCE_NAME")) + + for { + taskData, taskId, err := queue.MainQueue.Dequeue() + if err != nil { + time.Sleep(time.Second) + continue + } + log.Log.Debug("Received new task: ", taskData) + var task types.Task + err = json.Unmarshal([]byte(taskData), &task) + if err != nil { + log.Log.Error("Failed reading task data, discarding task: ", err.Error()) + continue + } + resp, err := client.R().SetBody(task).Post("/api/instance") + if err != nil { + log.Log.Error("Failed uploading task to server: ", err.Error()) + queue.MainQueue.Enqueue(taskData, 60) + queue.MainQueue.MarkDone(taskId) + continue + } + + if resp.StatusCode() != 200 { + log.Log.Error("Received invalid status code from server ", resp.StatusCode(), " response: ", string(resp.Body())) + queue.MainQueue.Enqueue(taskData, 60) + queue.MainQueue.MarkDone(taskId) + continue + } + + err = queue.MainQueue.MarkDone(taskId) + if err != nil { + log.Log.Error("Failed marking element in queue as done: ", err.Error()) + } + log.Log.Info("Successfully uploaded dataset to server") + } +} + func runDbClient() { for { taskData, taskId, err := queue.MainQueue.Dequeue() @@ -36,14 +82,17 @@ func runDbClient() { var task types.Task err = json.Unmarshal([]byte(taskData), &task) if err != nil { - log.Log.Error("Failed decoding task as JSON: ", err.Error()) + log.Log.Error("Failed reading task data, discarding task: ", err.Error()) + queue.MainQueue.MarkDone(taskId) continue } + var instance tables.Instance err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) queue.MainQueue.Enqueue(taskData, 60) + queue.MainQueue.MarkDone(taskId) continue } @@ -52,6 +101,7 @@ func runDbClient() { if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) queue.MainQueue.Enqueue(taskData, 60) + queue.MainQueue.MarkDone(taskId) continue } valuesToInsert := []tables.Value{} -- GitLab