package ui import ( "cli-mon/yabl" "context" json "encoding/json" "fmt" amqp "github.com/rabbitmq/amqp091-go" "time" ) const exchange = "yabl-newproto-in" type broker struct { uri string conn *amqp.Connection ch *amqp.Channel } func NewAmqp(uri string) UI { return &broker{uri: uri} } func (b *broker) Consume(event *yabl.Event) { var bytes []byte var err error if bytes, err = json.Marshal(event); err != nil { return } route := fmt.Sprintf( "%s.%s.%s.%d", *event.Tag, *event.ActionName, *event.Field, event.GetUnitId()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err = b.ch.PublishWithContext(ctx, exchange, route, false, false, amqp.Publishing{ ContentType: "application/json", Body: bytes, }); err != nil { fmt.Printf("Can't publish msg: %v", err) } } func (b *broker) Start() { var err error b.conn, err = amqp.Dial(b.uri) if err != nil { panic(fmt.Sprintf("Can't connect to AMQP: %v", err)) } b.ch, err = b.conn.Channel() if err != nil { panic(fmt.Sprintf("Can't create AMQP channel: %v", err)) } fmt.Printf("AMQP client connected to %v, sending data to server..", b.uri) } func (b *broker) Stop() { if b.conn != nil { b.conn.Close() } if b.ch != nil { b.ch.Close() } }