Skip to content
Snippets Groups Projects
Commit d49cc442 authored by Jonas Leder's avatar Jonas Leder
Browse files

extend the queue, to support elements, to stay a given time in the queue

parent a69809c6
No related branches found
No related tags found
1 merge request!1WIP: write values to a database
Pipeline #54572 passed
...@@ -30,7 +30,7 @@ func Run() { ...@@ -30,7 +30,7 @@ func Run() {
err = database.Db.Where("name = ?", task.Instance).First(&instance).Error err = database.Db.Where("name = ?", task.Instance).First(&instance).Error
if err != nil { if err != nil {
log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error())
queue.MainQueue.Enqueue(taskData) queue.MainQueue.Enqueue(taskData, 60)
continue continue
} }
...@@ -38,7 +38,7 @@ func Run() { ...@@ -38,7 +38,7 @@ func Run() {
err = json.Unmarshal([]byte(task.Data), &jsonData) err = json.Unmarshal([]byte(task.Data), &jsonData)
if err != nil { if err != nil {
log.Log.Error("Failed decoding boiler JSON: ", err.Error()) log.Log.Error("Failed decoding boiler JSON: ", err.Error())
queue.MainQueue.Enqueue(taskData) queue.MainQueue.Enqueue(taskData, 60)
continue continue
} }
for key, value := range jsonData { for key, value := range jsonData {
......
...@@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m ...@@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
if err != nil { if err != nil {
log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
} }
queue.MainQueue.Enqueue(string(taskData)) queue.MainQueue.Enqueue(string(taskData), 0)
} }
} }
......
...@@ -4,16 +4,23 @@ import ( ...@@ -4,16 +4,23 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"time"
) )
type QueueElement struct {
Value string `json:"value"`
EnqueueTime time.Time `json:"enqueue_time"`
MinDuration int `json:"min_duration"` // Minimum duration in seconds
}
type Queue struct { type Queue struct {
Elements []string Elements []QueueElement
FilePath string FilePath string
} }
func NewQueue(filePath string) (*Queue, error) { func NewQueue(filePath string) (*Queue, error) {
q := &Queue{ q := &Queue{
Elements: make([]string, 0), Elements: make([]QueueElement, 0),
FilePath: filePath, FilePath: filePath,
} }
...@@ -30,17 +37,30 @@ func NewQueue(filePath string) (*Queue, error) { ...@@ -30,17 +37,30 @@ func NewQueue(filePath string) (*Queue, error) {
return q, nil return q, nil
} }
func (q *Queue) Enqueue(element string) { func (q *Queue) Enqueue(element string, minDuration int) {
q.Elements = append(q.Elements, element) q.Elements = append(q.Elements, QueueElement{
Value: element,
EnqueueTime: time.Now(),
MinDuration: minDuration,
})
} }
func (q *Queue) Dequeue() (string, error) { func (q *Queue) Dequeue() (string, error) {
if len(q.Elements) == 0 { if len(q.Elements) == 0 {
return "", fmt.Errorf("queue is empty") return "", fmt.Errorf("queue is empty")
} }
element := q.Elements[0]
q.Elements = q.Elements[1:] currentTime := time.Now()
return element, nil for i, elem := range q.Elements {
// Check if the element has satisfied its minimum duration in the queue
if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) {
// Remove the element from the queue
q.Elements = append(q.Elements[:i], q.Elements[i+1:]...)
return elem.Value, nil
}
}
return "", fmt.Errorf("no elements are ready for dequeuing")
} }
func (q *Queue) Save() error { func (q *Queue) Save() error {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment