Small CLI monitoring tool to decode proprietary [...] protocol and display info in human readable representation
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ycli-mon/ui/amqp.go

150 lines
3.0 KiB

package ui
import (
"cli-mon/yabl"
"context"
json "encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"time"
)
const (
exchange = "yabl-newproto-exch"
brokerQueue = "yabl-newproto-conv"
routingKey = "*.newproto.in.#"
routingKeyFormat = "%s.newproto.out.%s.%s.%d"
)
type broker struct {
uri string
conn *amqp.Connection
writeCh *amqp.Channel
}
type communicator struct {
broker
tag string
readCh *amqp.Channel
toBus chan<- *yabl.Event
}
func NewAmqp(uri string) Consumer {
return &broker{uri: uri}
}
func NewAmqpWithEventProducer(uri string, tag string, toBus chan<- *yabl.Event) Consumer {
return &communicator{broker: broker{uri: uri}, tag: tag, toBus: toBus}
}
func (b *broker) Consume(event *yabl.Event) {
var bytes []byte
var err error
if bytes, err = json.Marshal(event); err != nil {
return
}
route := fmt.Sprintf(
routingKeyFormat,
*event.Tag,
*event.ActionName,
*event.Field,
event.GetUnitId())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err = b.writeCh.PublishWithContext(ctx,
exchange,
route,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: bytes,
}); err != nil {
fmt.Printf("Can't publish msg: %v", err)
}
}
func (b *broker) Start() {
var err error
b.conn, err = amqp.Dial(b.uri)
if err != nil {
panic(fmt.Sprintf("Can't connect to AMQP: %v", err))
}
b.writeCh, err = b.conn.Channel()
if err != nil {
panic(fmt.Sprintf("Can't create writer AMQP channel: %v", err))
}
fmt.Printf("AMQP client connected to %v, sending data to server..", b.uri)
}
func (c *communicator) Start() {
c.broker.Start()
var err error
c.readCh, err = c.conn.Channel()
if err != nil {
panic(fmt.Sprintf("Can't create reader AMQP channel: %v", err))
}
var queue amqp.Queue
queue, err = c.readCh.QueueDeclare(
brokerQueue, // name
false, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(fmt.Errorf("failed to declare reader queue: %w", err))
}
if err = c.readCh.QueueBind(queue.Name, routingKey, exchange, false, nil); err != nil {
panic(fmt.Errorf("failed to bind reader queue: %w", err))
}
var events <-chan amqp.Delivery
events, err = c.readCh.Consume(
queue.Name, // brockerQueue
c.tag, // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(fmt.Errorf("failed to register reader consumer: %w", err))
}
go func() {
defer c.readCh.Close()
defer c.Stop()
for d := range events {
event := &yabl.Event{}
if err := json.Unmarshal(d.Body, event); err != nil {
log.Println("unmarshalling error:", err.Error())
continue
}
c.toBus <- event
}
}()
}
func (b *broker) Stop() {
if b.conn != nil {
b.conn.Close()
}
if b.writeCh != nil {
b.writeCh.Close()
}
}