diff --git a/.env.example b/.env.example index b7fe1737c86f1a88c714c5fe23c3e24da0dceaef..f092a53734c9ac9e2dcc4fd194a680ca5851fe4f 100644 --- a/.env.example +++ b/.env.example @@ -7,4 +7,5 @@ MQTT_PASSWORD=password MQTT_TOPIC_BOILER=ems-esp/boiler_data LOG_LEVEL=INFO OUTPUT_FILE_NAME="boiler.csv" -OUTPUT_FILE_NAME_RAW="boiler.txt" \ No newline at end of file +OUTPUT_FILE_NAME_RAW="boiler.txt" +ZEROMQ_BIND="tcp://127.0.0.1:5557" \ No newline at end of file diff --git a/go.mod b/go.mod index 97a60efbe837b4d6502015f98416ee8f90f056f6..c44106770b367c5d0024830f37f854cbfb3d1750 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.4 require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/joho/godotenv v1.5.1 + github.com/pebbe/zmq4 v1.2.11 github.com/sirupsen/logrus v1.9.3 github.com/wind-c/comqtt/v2 v2.6.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 04b0286ab491326e40257bfeb9b34ed69e0ec4bc..40a424b91906e065bdccad1ab026b948a2e6d11c 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= +github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= diff --git a/main.go b/main.go index 26cb826c56625dd59de61e6af4bec586071a480b..edee1a5f4c6e35acc902ef5afe0da742cac66317 100644 --- a/main.go +++ b/main.go @@ -8,10 +8,14 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttserver" + "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func main() { log.Init() + defer zeromq.Pusher.Close() + + zeromq.Init() if os.Getenv("MQTT_SERVER_ENABLED") == "true" { log.Log.Info("Starting embedded MQTT server") mqttserver.Start() diff --git a/mqttclient/main.go b/mqttclient/main.go index f459ffc630cd6d842c82ff9cda190264e1efd6f8..5f56dfcf92fea268831a9b8620ca2e582f0a278a 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -10,6 +10,7 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { @@ -20,8 +21,26 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic()) log.Log.Info("Received new boiler data") csv.JsonToCsv(string(msg.Payload())) + + var jsonData map[string]interface{} + err := json.Unmarshal(msg.Payload(), &jsonData) + if err != nil { + panic(err) + } + + currentTime := time.Now().Local().Format("2006-01-02 15:04:05") + jsonData["date"] = currentTime + + updatedData, err := json.Marshal(jsonData) + if err != nil { + panic(err) + } + if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" { - dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW")) + dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW")) + } + if os.Getenv("OUTPUT_DATABSE") != "" { + zeromq.Pusher.Send(string(updatedData), 0) } } @@ -53,26 +72,12 @@ func Init() { } func dumpRawData(data string, filename string) { - var jsonData map[string]interface{} - err := json.Unmarshal([]byte(data), &jsonData) - if err != nil { - panic(err) - } - - currentTime := time.Now().Local().Format("2006-01-02 15:04:05") - jsonData["date"] = currentTime - - updatedData, err := json.Marshal(jsonData) - if err != nil { - panic(err) - } - file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { panic(err) } defer file.Close() - file.WriteString(string(updatedData)) + file.WriteString(data) file.WriteString("\n") } diff --git a/zeromq/main.go b/zeromq/main.go new file mode 100644 index 0000000000000000000000000000000000000000..d27d819f88324b4f886f248852a7836b9df7b3ce --- /dev/null +++ b/zeromq/main.go @@ -0,0 +1,20 @@ +package zeromq + +import ( + "os" + + zmq "github.com/pebbe/zmq4" + "jonasled.dev/jonasled/ems-esp-logger/log" +) + +var Pusher *zmq.Socket + +func Init() { + var err error + Pusher, err = zmq.NewSocket(zmq.PUSH) + if err != nil { + log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error()) + } + Pusher.Bind(os.Getenv("ZEROMQ_BIND")) + log.Log.Info("Successfully initialized Message broker") +}