A small CLI monitoring tool that decodes the proprietary [...] protocol and displays the information in a human-readable form.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ycli-mon/ui/amqp.go

76 lines
1.3 KiB

package ui
import (
"cli-mon/yabl"
"context"
json "encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"time"
)
// exchange is the name of the AMQP exchange that Consume publishes events to.
const exchange = "yabl-newproto-in"
// broker is the value returned by NewAmqp as a UI; it forwards events to an
// AMQP server as JSON messages.
type broker struct {
// uri is the AMQP address handed to amqp.Dial by Start.
uri string
// conn is the connection opened by Start and closed by Stop; nil until
// Start has run.
conn *amqp.Connection
// ch is the channel opened on conn by Start; Consume publishes on it.
ch *amqp.Channel
}
// NewAmqp builds a UI that sends events to the AMQP server at uri.
// No connection is made here; only the address is recorded.
func NewAmqp(uri string) UI {
	b := new(broker)
	b.uri = uri
	return b
}
// Consume encodes event as JSON and publishes it to the exchange with the
// routing key "<tag>.<action name>.<field>.<unit id>".
//
// Nothing is returned to the caller: every failure is reported on stdout and
// the event is dropped. An event is dropped when
//   - the AMQP channel has not been opened (Start was not called),
//   - the event is nil or lacks a tag, action name or field,
//   - the event cannot be encoded as JSON,
//   - the publish call fails or does not finish within 5 seconds.
func (b *broker) Consume(event *yabl.Event) {
	// Without a channel there is nothing to publish on; calling into a nil
	// channel would crash the whole tool.
	if b.ch == nil {
		fmt.Printf("Can't publish msg: AMQP channel is not open\n")
		return
	}

	// The routing key dereferences these pointers, so an incomplete event
	// must be rejected up front instead of triggering a nil dereference.
	if event == nil || event.Tag == nil || event.ActionName == nil || event.Field == nil {
		fmt.Printf("Can't publish msg: event is missing tag, action name or field\n")
		return
	}

	body, err := json.Marshal(event)
	if err != nil {
		fmt.Printf("Can't encode msg: %v\n", err)
		return
	}

	route := fmt.Sprintf(
		"%s.%s.%s.%d",
		*event.Tag,
		*event.ActionName,
		*event.Field,
		event.GetUnitId())

	// Bound the publish so a stalled broker cannot block the caller forever.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err = b.ch.PublishWithContext(ctx,
		exchange,
		route,
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
		})
	if err != nil {
		fmt.Printf("Can't publish msg: %v\n", err)
	}
}
// Start dials the AMQP server at b.uri and opens the channel that Consume
// publishes on.
//
// It panics if the connection or the channel cannot be established. The
// receiver's conn and ch fields are only set once both steps have succeeded,
// so a failed Start never leaves a half-initialized broker behind.
func (b *broker) Start() {
	conn, err := amqp.Dial(b.uri)
	if err != nil {
		panic(fmt.Sprintf("Can't connect to AMQP: %v", err))
	}

	ch, err := conn.Channel()
	if err != nil {
		// Release the connection before panicking so it is not leaked if the
		// panic is recovered further up. The close error is deliberately
		// ignored: the channel failure is the one worth reporting.
		_ = conn.Close()
		panic(fmt.Sprintf("Can't create AMQP channel: %v", err))
	}

	b.conn, b.ch = conn, ch

	// NOTE(review): b.uri is printed verbatim and may embed credentials
	// (amqp://user:password@host) — consider redacting before logging.
	fmt.Printf("AMQP client connected to %v, sending data to server..\n", b.uri)
}
// Stop releases the AMQP resources opened by Start. It is safe to call when
// Start was never run, because nil fields are skipped.
//
// The channel is closed before its connection: closing the connection first
// tears the channel down implicitly, which makes the later channel close act
// on an already-dead channel. Close errors are reported on stdout rather
// than discarded; they do not stop the remaining cleanup.
func (b *broker) Stop() {
	if b.ch != nil {
		if err := b.ch.Close(); err != nil {
			fmt.Printf("Can't close AMQP channel: %v\n", err)
		}
	}
	if b.conn != nil {
		if err := b.conn.Close(); err != nil {
			fmt.Printf("Can't close AMQP connection: %v\n", err)
		}
	}
}