Skip to content
Snippets Groups Projects
main.go 2.81 KiB
Newer Older
  • Learn to ignore specific revisions
  • Jonas Leder's avatar
    Jonas Leder committed
    package mqttclient
    
    import (
    
    	"encoding/json"
    
    Jonas Leder's avatar
    Jonas Leder committed
    	"fmt"
    	"os"
    
    	"time"
    
    Jonas Leder's avatar
    Jonas Leder committed
    
    	mqtt "github.com/eclipse/paho.mqtt.golang"
    
    	"jonasled.dev/jonasled/ems-esp-logger/csv"
    
    Jonas Leder's avatar
    Jonas Leder committed
    	"jonasled.dev/jonasled/ems-esp-logger/helper"
    	"jonasled.dev/jonasled/ems-esp-logger/log"
    
    	"jonasled.dev/jonasled/ems-esp-logger/types"
    
    	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
    
    Jonas Leder's avatar
    Jonas Leder committed
    )
    
    var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	log.Log.Debugf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
    }
    
    var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic())
    
    	log.Log.Info("Received new boiler data")
    
    	csv.JsonToCsv(string(msg.Payload()))
    
    	if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" {
    
    		dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW"))
    
    	}
    	if os.Getenv("OUTPUT_DATABSE") != "" {
    
    		task := types.Task{
    			Date:     time.Now().UTC(),
    			Data:     string(msg.Payload()),
    			Instance: os.Getenv("INSTANCE_NAME"),
    		}
    		taskData, err := json.Marshal(task)
    		if err != nil {
    			log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
    		}
    		zeromq.Pusher.Send(string(taskData), 0)
    
    Jonas Leder's avatar
    Jonas Leder committed
    
    }
    
    var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    	log.Log.Info("Successfully connected to MQTT broker")
    }
    
    var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    	log.Log.Error("Connection to MQTT broker lost: ", err.Error())
    }
    
    func Init() {
    	opts := mqtt.NewClientOptions()
    	opts.AddBroker(fmt.Sprintf("tcp://%s:%s", os.Getenv("MQTT_HOST"), os.Getenv("MQTT_PORT")))
    	opts.SetClientID("ems-esp-logger_" + helper.GenerateRandomString(8))
    	if os.Getenv("MQTT_USERNAME") != "" {
    		opts.SetUsername(os.Getenv("MQTT_USERNAME"))
    		opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
    	}
    	opts.SetDefaultPublishHandler(messagePubHandler)
    	opts.OnConnect = connectHandler
    	opts.OnConnectionLost = connectLostHandler
    	client := mqtt.NewClient(opts)
    	if token := client.Connect(); token.Wait() && token.Error() != nil {
    		log.Log.Fatal("Failed connecting to MQTT server: ", token.Error())
    	}
    
    	client.Subscribe(os.Getenv("MQTT_TOPIC_BOILER"), 1, messagePubHandlerBoiler)
    
    Jonas Leder's avatar
    Jonas Leder committed
    }
    
    
    func dumpRawData(data string, filename string) {
    
    
    	var jsonData map[string]interface{}
    	err := json.Unmarshal([]byte(data), &jsonData)
    	if err != nil {
    
    Jonas Leder's avatar
    Jonas Leder committed
    		log.Log.Error(err)
    		return
    
    	}
    
    	currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
    	jsonData["date"] = currentTime
    
    	updatedData, err := json.Marshal(jsonData)
    	if err != nil {
    
    Jonas Leder's avatar
    Jonas Leder committed
    		log.Log.Error(err)
    		return
    
    	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    	if err != nil {
    
    Jonas Leder's avatar
    Jonas Leder committed
    		log.Log.Error(err)
    		return
    
    	}
    	defer file.Close()
    
    
    	file.WriteString(string(updatedData))
    
    	file.WriteString("\n")
    }