From c75af5dce41f8b96dcf3e518de4b8c06097a5197 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 13:12:34 +0100 Subject: [PATCH 01/15] write new messages to ZeroMQ broker --- .env.example | 3 ++- go.mod | 1 + go.sum | 2 ++ main.go | 4 ++++ mqttclient/main.go | 37 +++++++++++++++++++++---------------- zeromq/main.go | 20 ++++++++++++++++++++ 6 files changed, 50 insertions(+), 17 deletions(-) create mode 100644 zeromq/main.go diff --git a/.env.example b/.env.example index b7fe173..f092a53 100644 --- a/.env.example +++ b/.env.example @@ -7,4 +7,5 @@ MQTT_PASSWORD=password MQTT_TOPIC_BOILER=ems-esp/boiler_data LOG_LEVEL=INFO OUTPUT_FILE_NAME="boiler.csv" -OUTPUT_FILE_NAME_RAW="boiler.txt" \ No newline at end of file +OUTPUT_FILE_NAME_RAW="boiler.txt" +ZEROMQ_BIND="tcp://127.0.0.1:5557" \ No newline at end of file diff --git a/go.mod b/go.mod index 97a60ef..c441067 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.4 require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/joho/godotenv v1.5.1 + github.com/pebbe/zmq4 v1.2.11 github.com/sirupsen/logrus v1.9.3 github.com/wind-c/comqtt/v2 v2.6.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 04b0286..40a424b 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= +github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= diff --git a/main.go b/main.go index 26cb826..edee1a5 100644 --- a/main.go +++ b/main.go @@ -8,10 +8,14 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttserver" + "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func main() { log.Init() + defer zeromq.Pusher.Close() + + zeromq.Init() if os.Getenv("MQTT_SERVER_ENABLED") == "true" { log.Log.Info("Starting embedded MQTT server") mqttserver.Start() diff --git a/mqttclient/main.go b/mqttclient/main.go index f459ffc..5f56dfc 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -10,6 +10,7 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { @@ -20,8 +21,26 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic()) log.Log.Info("Received new boiler data") csv.JsonToCsv(string(msg.Payload())) + + var jsonData map[string]interface{} + err := json.Unmarshal(msg.Payload(), &jsonData) + if err != nil { + panic(err) + } + + currentTime := time.Now().Local().Format("2006-01-02 15:04:05") + jsonData["date"] = currentTime + + updatedData, err := json.Marshal(jsonData) + if err != nil { + panic(err) + } + if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" { - dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW")) + dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW")) + } + if os.Getenv("OUTPUT_DATABSE") != "" { + zeromq.Pusher.Send(string(updatedData), 0) } } @@ -53,26 +72,12 @@ func Init() { } func dumpRawData(data string, filename string) { - var jsonData map[string]interface{} - err := json.Unmarshal([]byte(data), &jsonData) - if err != nil { - panic(err) - } - - currentTime := time.Now().Local().Format("2006-01-02 15:04:05") - jsonData["date"] = currentTime - - updatedData, err := json.Marshal(jsonData) - if err != nil { - panic(err) - } - file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { panic(err) } defer file.Close() - file.WriteString(string(updatedData)) + file.WriteString(data) file.WriteString("\n") } diff --git a/zeromq/main.go b/zeromq/main.go new file mode 100644 index 0000000..d27d819 --- /dev/null +++ b/zeromq/main.go @@ -0,0 +1,20 @@ +package zeromq + +import ( + "os" + + zmq "github.com/pebbe/zmq4" + "jonasled.dev/jonasled/ems-esp-logger/log" +) + +var Pusher *zmq.Socket + +func Init() { + var err error + Pusher, err = zmq.NewSocket(zmq.PUSH) + if err != nil { + log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error()) + } + Pusher.Bind(os.Getenv("ZEROMQ_BIND")) + log.Log.Info("Successfully initialized Message broker") +} -- GitLab From 1fe5f8d2d4a3c83b0cf6fbdd51175fc14a303e33 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 13:17:00 +0100 Subject: [PATCH 02/15] initialize dummy worker for processing tasks --- main.go | 4 +++- messageworker/main.go | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 messageworker/main.go diff --git a/main.go b/main.go index edee1a5..a746003 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( _ "github.com/joho/godotenv/autoload" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/messageworker" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttserver" "jonasled.dev/jonasled/ems-esp-logger/zeromq" @@ -21,8 +22,9 @@ func main() { mqttserver.Start() } mqttclient.Init() + go messageworker.Run() for { - time.Sleep(time.Second) + time.Sleep(time.Second) // reduce CPU usage by adding a short sleep here instead of a empty for loop } } diff --git a/messageworker/main.go b/messageworker/main.go new file mode 100644 index 0000000..44334de --- /dev/null +++ b/messageworker/main.go @@ -0,0 +1,18 @@ +package messageworker + +import ( + "fmt" + "os" + + zmq "github.com/pebbe/zmq4" +) + +func Run() { + worker, _ := zmq.NewSocket(zmq.PULL) + defer worker.Close() + worker.Connect(os.Getenv("ZEROMQ_WORKER")) + for { + task, _ := worker.Recv(0) + fmt.Println("Processing:", task) + } +} -- GitLab From 06285f76b435ae8abbbc8bb4c5480a721e75bbe5 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 13:32:49 +0100 Subject: [PATCH 03/15] initialize database connection with sqlite or MySQL --- database/main.go | 88 +++++++++++++++++++++++++++++++++++++ database/tables/instance.go | 7 +++ go.mod | 18 ++++++++ go.sum | 42 ++++++++++++++++++ main.go | 9 +++- 5 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 database/main.go create mode 100644 database/tables/instance.go diff --git a/database/main.go b/database/main.go new file mode 100644 index 0000000..08706f6 --- /dev/null +++ b/database/main.go @@ -0,0 +1,88 @@ +package database + +import ( + "fmt" + "os" + "time" + + "github.com/glebarez/sqlite" + gormloggerlogrus "github.com/nekomeowww/gorm-logger-logrus" + "github.com/sirupsen/logrus" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" + "jonasled.dev/jonasled/ems-esp-logger/database/tables" + "jonasled.dev/jonasled/ems-esp-logger/log" +) + +var Db *gorm.DB + +func Init() { + var err error + var dbdriver gorm.Dialector + switch os.Getenv("OUTPUT_DATABASE_TYPE") { + case "sqlite": + dbdriver = sqlite.Open(os.Getenv("OUTPUT_DATABSE")) + case "mysql": + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + os.Getenv("OUTPUT_DATABASE_USER"), + os.Getenv("OUTPUT_DATABASE_PASSWORD"), + os.Getenv("OUTPUT_DATABASE_HOST"), + os.Getenv("OUTPUT_DATABSE"), + ) + dbdriver = mysql.Open(dsn) + default: + log.Log.Fatal("Please set OUTPUT_DATABASE_TYPE to a valid value") + } + Db, err = gorm.Open(dbdriver, &gorm.Config{ + Logger: gormloggerlogrus.New(gormloggerlogrus.Options{ + Logger: logrus.NewEntry(log.Log), + LogLevel: logger.Error, + IgnoreRecordNotFoundError: false, + SlowThreshold: time.Millisecond * 200, + FileWithLineNumField: "file", + }), + }) + + if err != nil { + log.Log.Fatal("Failed initializing database: ", err.Error()) + } + + if os.Getenv("OUTPUT_DATABASE_EXECUTE_MIGRATIONS") != "false" { + log.Log.Info("Executing database migrations") + Db.AutoMigrate(&tables.Instance{}) + } +} + +func CreateInstance() { + instanceName := os.Getenv("INSTANCE_NAME") + instanceDescription := os.Getenv("INSTANCE_DESCRIPTION") + if instanceName == "" || instanceDescription == "" { + log.Log.Fatal("INSTANCE_NAME and INSTANCE_DESCRIPTION must be set") + } + var instance tables.Instance + err := Db.Where("name = ?", instanceName).First(&instance).Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + // Create a new instance if it doesn't exist + instance = tables.Instance{ + Name: instanceName, + Description: instanceDescription, + } + if err := Db.Create(&instance).Error; err != nil { + log.Log.Fatalf("Failed to create instance: %v", err) + } + log.Log.Infof("Created new instance: %+v\n", instance) + } else { + log.Log.Fatalf("Failed to query database: %v", err) + } + } else { + // Update the description if the instance exists + instance.Description = instanceDescription + if err := Db.Save(&instance).Error; err != nil { + log.Log.Fatalf("Failed to update instance description: %v", err) + } + log.Log.Infof("Updated instance description: %+v\n", instance) + } +} diff --git a/database/tables/instance.go b/database/tables/instance.go new file mode 100644 index 0000000..1ecfb7f --- /dev/null +++ b/database/tables/instance.go @@ -0,0 +1,7 @@ +package tables + +type Instance struct { + ID uint `gorm:"primaryKey"` + Name string + Description string +} diff --git a/go.mod b/go.mod index c441067..1bd3fb7 100644 --- a/go.mod +++ b/go.mod @@ -4,18 +4,36 @@ go 1.23.4 require ( github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/glebarez/sqlite v1.11.0 github.com/joho/godotenv v1.5.1 + github.com/nekomeowww/gorm-logger-logrus v1.0.8 github.com/pebbe/zmq4 v1.2.11 github.com/sirupsen/logrus v1.9.3 github.com/wind-c/comqtt/v2 v2.6.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gorm.io/driver/mysql v1.5.7 + gorm.io/gorm v1.25.12 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/glebarez/go-sqlite v1.21.2 // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/xid v1.5.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + modernc.org/libc v1.22.5 // indirect + modernc.org/mathutil v1.5.0 // indirect + modernc.org/memory v1.5.0 // indirect + modernc.org/sqlite v1.23.1 // indirect ) diff --git a/go.sum b/go.sum index 40a424b..5e87357 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,44 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= +github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k= +github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GMw= +github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY= +github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA= github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -28,8 +54,11 @@ golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= @@ -37,3 +66,16 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYs gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo= +gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= +gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= +modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= +modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= +modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= +modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds= +modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= +modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM= +modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk= diff --git a/main.go b/main.go index a746003..46e6f53 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "time" _ "github.com/joho/godotenv/autoload" + "jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/messageworker" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" @@ -14,9 +15,15 @@ import ( func main() { log.Init() - defer zeromq.Pusher.Close() zeromq.Init() + defer zeromq.Pusher.Close() + + if os.Getenv("OUTPUT_DATABSE") != "" { + database.Init() + database.CreateInstance() + } + if os.Getenv("MQTT_SERVER_ENABLED") == "true" { log.Log.Info("Starting embedded MQTT server") mqttserver.Start() -- GitLab From 7f9414ca00d181864df307e7f92e3358c34ff232 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 13:39:28 +0100 Subject: [PATCH 04/15] define tables for value types and value --- database/main.go | 2 +- database/tables/valueTypes.go | 7 +++++++ database/tables/values.go | 13 +++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 database/tables/valueTypes.go create mode 100644 database/tables/values.go diff --git a/database/main.go b/database/main.go index 08706f6..2b62e9d 100644 --- a/database/main.go +++ b/database/main.go @@ -50,7 +50,7 @@ func Init() { if os.Getenv("OUTPUT_DATABASE_EXECUTE_MIGRATIONS") != "false" { log.Log.Info("Executing database migrations") - Db.AutoMigrate(&tables.Instance{}) + Db.AutoMigrate(&tables.Instance{}, &tables.ValueType{}, &tables.Value{}) } } diff --git a/database/tables/valueTypes.go b/database/tables/valueTypes.go new file mode 100644 index 0000000..5e3c14f --- /dev/null +++ b/database/tables/valueTypes.go @@ -0,0 +1,7 @@ +package tables + +type ValueType struct { + ID uint `gorm:"primaryKey"` + Name string + Description string +} diff --git a/database/tables/values.go b/database/tables/values.go new file mode 100644 index 0000000..51edd6e --- /dev/null +++ b/database/tables/values.go @@ -0,0 +1,13 @@ +package tables + +import "time" + +type Value struct { + ID uint `gorm:"primaryKey"` + Date time.Time + InstanceID int + Instance Instance + ValueTypeID int + ValueType ValueType + Value string +} -- GitLab From f3f90042fef456b88d91b5fc80dbaea6f8cc4939 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 13:46:11 +0100 Subject: [PATCH 05/15] introduce struct for transmitting task --- mqttclient/main.go | 45 ++++++++++++++++++++++++++++----------------- types/task.go | 9 +++++++++ 2 files changed, 37 insertions(+), 17 deletions(-) create mode 100644 types/task.go diff --git a/mqttclient/main.go b/mqttclient/main.go index 5f56dfc..ee1c61e 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -10,6 +10,7 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/types" "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) @@ -22,25 +23,20 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m log.Log.Info("Received new boiler data") csv.JsonToCsv(string(msg.Payload())) - var jsonData map[string]interface{} - err := json.Unmarshal(msg.Payload(), &jsonData) - if err != nil { - panic(err) - } - - currentTime := time.Now().Local().Format("2006-01-02 15:04:05") - jsonData["date"] = currentTime - - updatedData, err := json.Marshal(jsonData) - if err != nil { - panic(err) - } - if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" { - dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW")) + dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW")) } if os.Getenv("OUTPUT_DATABSE") != "" { - zeromq.Pusher.Send(string(updatedData), 0) + task := types.Task{ + Date: time.Now().UTC(), + Data: string(msg.Payload()), + Instance: os.Getenv("INSTANCE_NAME"), + } + taskData, err := json.Marshal(task) + if err != nil { + log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) + } + zeromq.Pusher.Send(string(taskData), 0) } } @@ -72,12 +68,27 @@ func Init() { } func dumpRawData(data string, filename string) { + + var jsonData map[string]interface{} + err := json.Unmarshal([]byte(data), &jsonData) + if err != nil { + panic(err) + } + + currentTime := time.Now().Local().Format("2006-01-02 15:04:05") + jsonData["date"] = currentTime + + updatedData, err := json.Marshal(jsonData) + if err != nil { + panic(err) + } + file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { panic(err) } defer file.Close() - file.WriteString(data) + file.WriteString(string(updatedData)) file.WriteString("\n") } diff --git a/types/task.go b/types/task.go new file mode 100644 index 0000000..44e1d04 --- /dev/null +++ b/types/task.go @@ -0,0 +1,9 @@ +package types + +import "time" + +type Task struct { + Date time.Time + Instance string + Data string +} -- GitLab From 7fd01df016012e5e29b575a1201c9d7606eef1ec Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 13:59:42 +0100 Subject: [PATCH 06/15] never exit with a panic --- mqttclient/main.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mqttclient/main.go b/mqttclient/main.go index ee1c61e..6207aa5 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -72,7 +72,8 @@ func dumpRawData(data string, filename string) { var jsonData map[string]interface{} err := json.Unmarshal([]byte(data), &jsonData) if err != nil { - panic(err) + log.Log.Error(err) + return } currentTime := time.Now().Local().Format("2006-01-02 15:04:05") @@ -80,12 +81,14 @@ func dumpRawData(data string, filename string) { updatedData, err := json.Marshal(jsonData) if err != nil { - panic(err) + log.Log.Error(err) + return } file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - panic(err) + log.Log.Error(err) + return } defer file.Close() -- GitLab From b13408912d9c6949a6fae9f9cbcc3c1b400e4bd3 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:00:03 +0100 Subject: [PATCH 07/15] move createInstance to a seperate file --- database/createInstance.go | 42 ++++++++++++++++++++++++++++++++++++++ database/main.go | 33 ------------------------------ 2 files changed, 42 insertions(+), 33 deletions(-) create mode 100644 database/createInstance.go diff --git a/database/createInstance.go b/database/createInstance.go new file mode 100644 index 0000000..509ac93 --- /dev/null +++ b/database/createInstance.go @@ -0,0 +1,42 @@ +package database + +import ( + "os" + + "gorm.io/gorm" + "jonasled.dev/jonasled/ems-esp-logger/database/tables" + "jonasled.dev/jonasled/ems-esp-logger/log" +) + +func CreateInstance() { + instanceName := os.Getenv("INSTANCE_NAME") + instanceDescription := os.Getenv("INSTANCE_DESCRIPTION") + if instanceName == "" || instanceDescription == "" { + log.Log.Fatal("INSTANCE_NAME and INSTANCE_DESCRIPTION must be set") + } + var instance tables.Instance + err := Db.Where("name = ?", instanceName).First(&instance).Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + // Create a new instance if it doesn't exist + instance = tables.Instance{ + Name: instanceName, + Description: instanceDescription, + } + if err := Db.Create(&instance).Error; err != nil { + log.Log.Fatalf("Failed to create instance: %v", err) + } + log.Log.Infof("Created new instance: %+v\n", instance) + } else { + log.Log.Fatalf("Failed to query database: %v", err) + } + } else { + // Update the description if the instance exists + instance.Description = instanceDescription + if err := Db.Save(&instance).Error; err != nil { + log.Log.Fatalf("Failed to update instance description: %v", err) + } + log.Log.Infof("Updated instance description: %+v\n", instance) + } +} diff --git a/database/main.go b/database/main.go index 2b62e9d..93b6959 100644 --- a/database/main.go +++ b/database/main.go @@ -53,36 +53,3 @@ func Init() { Db.AutoMigrate(&tables.Instance{}, &tables.ValueType{}, &tables.Value{}) } } - -func CreateInstance() { - instanceName := os.Getenv("INSTANCE_NAME") - instanceDescription := os.Getenv("INSTANCE_DESCRIPTION") - if instanceName == "" || instanceDescription == "" { - log.Log.Fatal("INSTANCE_NAME and INSTANCE_DESCRIPTION must be set") - } - var instance tables.Instance - err := Db.Where("name = ?", instanceName).First(&instance).Error - - if err != nil { - if err == gorm.ErrRecordNotFound { - // Create a new instance if it doesn't exist - instance = tables.Instance{ - Name: instanceName, - Description: instanceDescription, - } - if err := Db.Create(&instance).Error; err != nil { - log.Log.Fatalf("Failed to create instance: %v", err) - } - log.Log.Infof("Created new instance: %+v\n", instance) - } else { - log.Log.Fatalf("Failed to query database: %v", err) - } - } else { - // Update the description if the instance exists - instance.Description = instanceDescription - if err := Db.Save(&instance).Error; err != nil { - log.Log.Fatalf("Failed to update instance description: %v", err) - } - log.Log.Infof("Updated instance description: %+v\n", instance) - } -} -- GitLab From b424a1a17315e276732c630712a1b75a923a9535 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:00:23 +0100 Subject: [PATCH 08/15] add function to get a valueType --- database/getOrCreateValueType.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 database/getOrCreateValueType.go diff --git a/database/getOrCreateValueType.go b/database/getOrCreateValueType.go new file mode 100644 index 0000000..c8d85a7 --- /dev/null +++ b/database/getOrCreateValueType.go @@ -0,0 +1,30 @@ +package database + +import ( + "gorm.io/gorm" + "jonasled.dev/jonasled/ems-esp-logger/database/tables" + "jonasled.dev/jonasled/ems-esp-logger/log" +) + +func getOrCreateValueType(name string) tables.ValueType { + var valueType tables.ValueType + err := Db.Where("name = ?", name).First(&valueType).Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + // Create a new instance if it doesn't exist + valueType = tables.ValueType{ + Name: name, + } + if err := Db.Create(&valueType).Error; err != nil { + log.Log.Fatalf("Failed to create value type: %v", err) + } + log.Log.Infof("Created new value type: %+v\n", valueType) + } else { + log.Log.Errorf("Failed to query database: %v", err) + } + } + + return valueType + +} -- GitLab From f8915bc46e899f707354c0752c15c552620be394 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:10:18 +0100 Subject: [PATCH 09/15] export function --- database/getOrCreateValueType.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/getOrCreateValueType.go b/database/getOrCreateValueType.go index c8d85a7..9e51c1a 100644 --- a/database/getOrCreateValueType.go +++ b/database/getOrCreateValueType.go @@ -6,7 +6,7 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/log" ) -func getOrCreateValueType(name string) tables.ValueType { +func GetOrCreateValueType(name string) tables.ValueType { var valueType tables.ValueType err := Db.Where("name = ?", name).First(&valueType).Error -- GitLab From fca8d3be475113ad61773c16590e73bae8c348d8 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:16:08 +0100 Subject: [PATCH 10/15] implement writing data to database --- messageworker/main.go | 44 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/messageworker/main.go b/messageworker/main.go index 44334de..8281364 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -1,10 +1,16 @@ package messageworker import ( - "fmt" + "encoding/json" "os" zmq "github.com/pebbe/zmq4" + "jonasled.dev/jonasled/ems-esp-logger/database" + "jonasled.dev/jonasled/ems-esp-logger/database/tables" + "jonasled.dev/jonasled/ems-esp-logger/helper" + "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/types" + "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func Run() { @@ -12,7 +18,39 @@ func Run() { defer worker.Close() worker.Connect(os.Getenv("ZEROMQ_WORKER")) for { - task, _ := worker.Recv(0) - fmt.Println("Processing:", task) + taskData, _ := worker.Recv(0) + log.Log.Debug("Received new task: ", taskData) + var task types.Task + err := json.Unmarshal([]byte(taskData), &task) + if err != nil { + log.Log.Error("Failed decoding task as JSON: ", err.Error()) + continue + } + var instance tables.Instance + err = database.Db.Where("name = ?", task.Instance).First(&instance).Error + if err != nil { + log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) + zeromq.Pusher.Send(taskData, 0) + continue + } + + var jsonData map[string]interface{} + err = json.Unmarshal([]byte(task.Data), &jsonData) + if err != nil { + log.Log.Error("Failed decoding boiler JSON: ", err.Error()) + zeromq.Pusher.Send(taskData, 0) + continue + } + for key, value := range jsonData { + valueType := database.GetOrCreateValueType(key) + dbValue := tables.Value{ + Date: task.Date, + ValueType: valueType, + Value: helper.AnyToString(value), + Instance: instance, + } + database.Db.Create(&dbValue) + } + log.Log.Info("Stored boiler data in database") } } -- GitLab From 09243f2820ca47a09894479149bdb36b0b336349 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:39:02 +0100 Subject: [PATCH 11/15] remove zeromq --- .gitignore | 4 +++- go.mod | 1 - go.sum | 2 -- main.go | 5 ++-- messageworker/main.go | 20 ++++++++-------- mqttclient/main.go | 4 ++-- queue/main.go | 35 +++++++++++++++++++++++++++ queue/queue.go | 55 +++++++++++++++++++++++++++++++++++++++++++ zeromq/main.go | 20 ---------------- 9 files changed, 107 insertions(+), 39 deletions(-) create mode 100644 queue/main.go create mode 100644 queue/queue.go delete mode 100644 zeromq/main.go diff --git a/.gitignore b/.gitignore index 0400075..68f09f4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +!.gitkeep .env boiler.csv -boiler.txt \ No newline at end of file +boiler.txt +queues/ \ No newline at end of file diff --git a/go.mod b/go.mod index 1bd3fb7..f1f7efa 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/glebarez/sqlite v1.11.0 github.com/joho/godotenv v1.5.1 github.com/nekomeowww/gorm-logger-logrus v1.0.8 - github.com/pebbe/zmq4 v1.2.11 github.com/sirupsen/logrus v1.9.3 github.com/wind-c/comqtt/v2 v2.6.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 5e87357..2713a23 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY= github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA= -github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= -github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= diff --git a/main.go b/main.go index 46e6f53..1c83f09 100644 --- a/main.go +++ b/main.go @@ -10,14 +10,13 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/messageworker" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttserver" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" + "jonasled.dev/jonasled/ems-esp-logger/queue" ) func main() { log.Init() - zeromq.Init() - defer zeromq.Pusher.Close() + queue.Init() if os.Getenv("OUTPUT_DATABSE") != "" { database.Init() diff --git a/messageworker/main.go b/messageworker/main.go index 8281364..f7b6a93 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -2,26 +2,26 @@ package messageworker import ( "encoding/json" - "os" + "time" - zmq "github.com/pebbe/zmq4" "jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/database/tables" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func Run() { - worker, _ := zmq.NewSocket(zmq.PULL) - defer worker.Close() - worker.Connect(os.Getenv("ZEROMQ_WORKER")) for { - taskData, _ := worker.Recv(0) + taskData, err := queue.MainQueue.Dequeue() + if err != nil { + time.Sleep(time.Second) + continue + } log.Log.Debug("Received new task: ", taskData) var task types.Task - err := json.Unmarshal([]byte(taskData), &task) + err = json.Unmarshal([]byte(taskData), &task) if err != nil { log.Log.Error("Failed decoding task as JSON: ", err.Error()) continue @@ -30,7 +30,7 @@ func Run() { err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) - zeromq.Pusher.Send(taskData, 0) + queue.MainQueue.Enqueue(taskData) continue } @@ -38,7 +38,7 @@ func Run() { err = json.Unmarshal([]byte(task.Data), &jsonData) if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) - zeromq.Pusher.Send(taskData, 0) + queue.MainQueue.Enqueue(taskData) continue } for key, value := range jsonData { diff --git a/mqttclient/main.go b/mqttclient/main.go index 6207aa5..268565e 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -10,8 +10,8 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { @@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m if err != nil { log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) } - zeromq.Pusher.Send(string(taskData), 0) + queue.MainQueue.Enqueue(string(taskData)) } } diff --git a/queue/main.go b/queue/main.go new file mode 100644 index 0000000..af4deda --- /dev/null +++ b/queue/main.go @@ -0,0 +1,35 @@ +package queue + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/wind-c/comqtt/v2/cluster/log" +) + +var MainQueue Queue + +func Init() { + queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json") + if err != nil { + log.Fatal("Error initializing main queue:", err) + } + + // Set up signal handling for clean exit + exitChan := make(chan os.Signal, 1) + signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-exitChan + fmt.Println("\nSaving main queue state before exiting...") + if err := queue.Save(); err != nil { + fmt.Println("Error saving main queue state:", err) + } else { + fmt.Println("Main queue state saved successfully.") + } + os.Exit(0) + }() + MainQueue = *queue +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..4bd627c --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,55 @@ +package queue + +import ( + "encoding/json" + "fmt" + "os" +) + +type Queue struct { + Elements []string + FilePath string +} + +func NewQueue(filePath string) (*Queue, error) { + q := &Queue{ + Elements: make([]string, 0), + FilePath: filePath, + } + + // Load state from file + if _, err := os.Stat(filePath); err == nil { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("error reading state file: %w", err) + } + if err := json.Unmarshal(data, &q.Elements); err != nil { + return nil, fmt.Errorf("error parsing state file: %w", err) + } + } + return q, nil +} + +func (q *Queue) Enqueue(element string) { + q.Elements = append(q.Elements, element) +} + +func (q *Queue) Dequeue() (string, error) { + if len(q.Elements) == 0 { + return "", fmt.Errorf("queue is empty") + } + element := q.Elements[0] + q.Elements = q.Elements[1:] + return element, nil +} + +func (q *Queue) Save() error { + data, err := json.Marshal(q.Elements) + if err != nil { + return fmt.Errorf("error serializing state: %w", err) + } + if err := os.WriteFile(q.FilePath, data, 0644); err != nil { + return fmt.Errorf("error writing state to file: %w", err) + } + return nil +} diff --git a/zeromq/main.go b/zeromq/main.go deleted file mode 100644 index d27d819..0000000 --- a/zeromq/main.go +++ /dev/null @@ -1,20 +0,0 @@ -package zeromq - -import ( - "os" - - zmq "github.com/pebbe/zmq4" - "jonasled.dev/jonasled/ems-esp-logger/log" -) - -var Pusher *zmq.Socket - -func Init() { - var err error - Pusher, err = zmq.NewSocket(zmq.PUSH) - if err != nil { - log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error()) - } - Pusher.Bind(os.Getenv("ZEROMQ_BIND")) - log.Log.Info("Successfully initialized Message broker") -} -- GitLab From a69809c61846275a8ee0b2513bc01fa9b1597fd7 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:39:57 +0100 Subject: [PATCH 12/15] update example config --- .env.example | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index f092a53..415106b 100644 --- a/.env.example +++ b/.env.example @@ -1,11 +1,24 @@ -MQTT_SERVER_ENABLED=false +MQTT_SERVER_ENABLED=true MQTT_LISTEN=0.0.0.0:1883 # only required for MQTT server -MQTT_HOST=1.2.3.4 +MQTT_HOST=127.0.0.1 MQTT_PORT=1883 MQTT_USERNAME=mqtt -MQTT_PASSWORD=password +MQTT_PASSWORD=mqtt MQTT_TOPIC_BOILER=ems-esp/boiler_data + LOG_LEVEL=INFO + OUTPUT_FILE_NAME="boiler.csv" OUTPUT_FILE_NAME_RAW="boiler.txt" -ZEROMQ_BIND="tcp://127.0.0.1:5557" \ No newline at end of file + +QUEUE_STATE_FOLDER="./queues" + +OUTPUT_DATABSE=ems_esp_logger +OUTPUT_DATABASE_USER=root +OUTPUT_DATABASE_PASSWORD=password +OUTPUT_DATABASE_HOST=localhost +OUTPUT_DATABASE_TYPE=mysql +OUTPUT_DATABASE_EXECUTE_MIGRATIONS=true + +INSTANCE_NAME=test +INSTANCE_DESCRIPTION="My test instance" \ No newline at end of file -- GitLab From d49cc442a6f0c6a8982af28d94c91d51b7862485 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:49:26 +0100 Subject: [PATCH 13/15] extend the queue, to support elements, to stay a given time in the queue --- messageworker/main.go | 4 ++-- mqttclient/main.go | 2 +- queue/queue.go | 34 +++++++++++++++++++++++++++------- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/messageworker/main.go b/messageworker/main.go index f7b6a93..99167c8 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -30,7 +30,7 @@ func Run() { err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) - queue.MainQueue.Enqueue(taskData) + queue.MainQueue.Enqueue(taskData, 60) continue } @@ -38,7 +38,7 @@ func Run() { err = json.Unmarshal([]byte(task.Data), &jsonData) if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) - queue.MainQueue.Enqueue(taskData) + queue.MainQueue.Enqueue(taskData, 60) continue } for key, value := range jsonData { diff --git a/mqttclient/main.go b/mqttclient/main.go index 268565e..b349ef5 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m if err != nil { log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) } - queue.MainQueue.Enqueue(string(taskData)) + queue.MainQueue.Enqueue(string(taskData), 0) } } diff --git a/queue/queue.go b/queue/queue.go index 4bd627c..fbc0bf4 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -4,16 +4,23 @@ import ( "encoding/json" "fmt" "os" + "time" ) +type QueueElement struct { + Value string `json:"value"` + EnqueueTime time.Time `json:"enqueue_time"` + MinDuration int `json:"min_duration"` // Minimum duration in seconds +} + type Queue struct { - Elements []string + Elements []QueueElement FilePath string } func NewQueue(filePath string) (*Queue, error) { q := &Queue{ - Elements: make([]string, 0), + Elements: make([]QueueElement, 0), FilePath: filePath, } @@ -30,17 +37,30 @@ func NewQueue(filePath string) (*Queue, error) { return q, nil } -func (q *Queue) Enqueue(element string) { - q.Elements = append(q.Elements, element) +func (q *Queue) Enqueue(element string, minDuration int) { + q.Elements = append(q.Elements, QueueElement{ + Value: element, + EnqueueTime: time.Now(), + MinDuration: minDuration, + }) } func (q *Queue) Dequeue() (string, error) { if len(q.Elements) == 0 { return "", fmt.Errorf("queue is empty") } - element := q.Elements[0] - q.Elements = q.Elements[1:] - return element, nil + + currentTime := time.Now() + for i, elem := range q.Elements { + // Check if the element has satisfied its minimum duration in the queue + if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) { + // Remove the element from the queue + q.Elements = append(q.Elements[:i], q.Elements[i+1:]...) + return elem.Value, nil + } + } + + return "", fmt.Errorf("no elements are ready for dequeuing") } func (q *Queue) Save() error { -- GitLab From 58b58eb2b19adbf95a6142160c39ed99d1cb1b44 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:51:14 +0100 Subject: [PATCH 14/15] add missing / --- queue/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue/main.go b/queue/main.go index af4deda..f59aa3b 100644 --- a/queue/main.go +++ b/queue/main.go @@ -12,7 +12,7 @@ import ( var MainQueue Queue func Init() { - queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json") + queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "/main.json") if err != nil { log.Fatal("Error initializing main queue:", err) } -- GitLab From 5fe39d6832eb0edd7dd0f57ae3bed161b4f31ccb Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:56:53 +0100 Subject: [PATCH 15/15] fix storing queue data in json file --- queue/main.go | 5 +++-- queue/queue.go | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/queue/main.go b/queue/main.go index f59aa3b..c802f1d 100644 --- a/queue/main.go +++ b/queue/main.go @@ -21,15 +21,16 @@ func Init() { exitChan := make(chan os.Signal, 1) signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM) + MainQueue = *queue go func() { <-exitChan fmt.Println("\nSaving main queue state before exiting...") - if err := queue.Save(); err != nil { + fmt.Println("\nCurrent queue size: ", MainQueue.GetCurrentSize()) + if err := MainQueue.Save(); err != nil { fmt.Println("Error saving main queue state:", err) } else { fmt.Println("Main queue state saved successfully.") } os.Exit(0) }() - MainQueue = *queue } diff --git a/queue/queue.go b/queue/queue.go index fbc0bf4..8f00ac0 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -5,6 +5,8 @@ import ( "fmt" "os" "time" + + "jonasled.dev/jonasled/ems-esp-logger/log" ) type QueueElement struct { @@ -33,6 +35,7 @@ func NewQueue(filePath string) (*Queue, error) { if err := json.Unmarshal(data, &q.Elements); err != nil { return nil, fmt.Errorf("error parsing state file: %w", err) } + log.Log.Info("Initialized queue, current size: ", q.GetCurrentSize()) } return q, nil } @@ -63,6 +66,10 @@ func (q *Queue) Dequeue() (string, error) { return "", fmt.Errorf("no elements are ready for dequeuing") } +func (q *Queue) GetCurrentSize() int { + return len(q.Elements) +} + func (q *Queue) Save() error { data, err := json.Marshal(q.Elements) if err != nil { -- GitLab