diff --git a/messageworker/main.go b/messageworker/main.go index f7b6a93ffb9f31dc07a5ffecc6d673666ef5788e..99167c82601442d32c2db7c4986b42a5de2b9ff6 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -30,7 +30,7 @@ func Run() { err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) - queue.MainQueue.Enqueue(taskData) + queue.MainQueue.Enqueue(taskData, 60) continue } @@ -38,7 +38,7 @@ func Run() { err = json.Unmarshal([]byte(task.Data), &jsonData) if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) - queue.MainQueue.Enqueue(taskData) + queue.MainQueue.Enqueue(taskData, 60) continue } for key, value := range jsonData { diff --git a/mqttclient/main.go b/mqttclient/main.go index 268565e940fc826fe03f31bcfa7d560a64e3ea17..b349ef54911931b3440d377ca8c31acd233b9fcc 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m if err != nil { log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) } - queue.MainQueue.Enqueue(string(taskData)) + queue.MainQueue.Enqueue(string(taskData), 0) } } diff --git a/queue/queue.go b/queue/queue.go index 4bd627ce52db7ead5ab9f64a9e0aaa8263b3d073..fbc0bf4c4976fe23d5273a75b09d3ee245db9df5 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -4,16 +4,23 @@ import ( "encoding/json" "fmt" "os" + "time" ) +type QueueElement struct { + Value string `json:"value"` + EnqueueTime time.Time `json:"enqueue_time"` + MinDuration int `json:"min_duration"` // Minimum duration in seconds +} + type Queue struct { - Elements []string + Elements []QueueElement FilePath string } func NewQueue(filePath string) (*Queue, error) { q := &Queue{ - Elements: make([]string, 0), + Elements: make([]QueueElement, 0), FilePath: filePath, } @@ -30,17 +37,30 @@ func NewQueue(filePath string) (*Queue, error) { return q, nil } -func (q *Queue) Enqueue(element string) { - q.Elements = append(q.Elements, element) +func (q *Queue) Enqueue(element string, minDuration int) { + q.Elements = append(q.Elements, QueueElement{ + Value: element, + EnqueueTime: time.Now(), + MinDuration: minDuration, + }) } func (q *Queue) Dequeue() (string, error) { if len(q.Elements) == 0 { return "", fmt.Errorf("queue is empty") } - element := q.Elements[0] - q.Elements = q.Elements[1:] - return element, nil + + currentTime := time.Now() + for i, elem := range q.Elements { + // Check if the element has satisfied its minimum duration in the queue + if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) { + // Remove the element from the queue + q.Elements = append(q.Elements[:i], q.Elements[i+1:]...) + return elem.Value, nil + } + } + + return "", fmt.Errorf("no elements are ready for dequeuing") } func (q *Queue) Save() error {