package ui

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"

	"cli-mon/yabl"
)

const (
	// exchange is the AMQP exchange events are published to and the reader
	// queue is bound to.
	exchange = "yabl-newproto-exch"
	// brokerQueue is the name of the queue declared by communicator.Start.
	brokerQueue = "yabl-newproto-conv"
	// routingKey is the binding pattern used for the reader queue.
	routingKey = "*.newproto.in.#"
	// routingKeyFormat builds the routing key of an outgoing event from its
	// tag, action name, field and unit id (in that order).
	routingKeyFormat = "%s.newproto.out.%s.%s.%d"
)

// broker publishes events to the AMQP exchange. It must be started with
// Start before Consume can deliver anything.
type broker struct {
	uri     string
	conn    *amqp.Connection
	writeCh *amqp.Channel
}

// communicator is a broker that additionally reads deliveries from the
// broker queue and forwards the decoded events to toBus.
type communicator struct {
	broker
	tag    string
	readCh *amqp.Channel
	toBus  chan<- *yabl.Event
}

// NewAmqp returns a write-only AMQP consumer for the given URI. The
// connection is not opened until Start is called.
func NewAmqp(uri string) Consumer {
	return &broker{uri: uri}
}

// NewAmqpWithEventProducer returns an AMQP consumer that, besides publishing,
// reads events from the broker queue under the consumer tag `tag` and sends
// them to toBus. The connection is not opened until Start is called.
func NewAmqpWithEventProducer(uri string, tag string, toBus chan<- *yabl.Event) Consumer {
	return &communicator{broker: broker{uri: uri}, tag: tag, toBus: toBus}
}

// Consume serializes event as JSON and publishes it to the exchange with a
// routing key derived from the event. Publishing is bounded by a 5 second
// timeout.
//
// Events that cannot be routed (nil event, or nil Tag/ActionName/Field) and
// events received before Start has opened the write channel are logged and
// dropped instead of causing a nil-pointer panic.
func (b *broker) Consume(event *yabl.Event) {
	if event == nil {
		log.Println("amqp: dropping nil event")
		return
	}
	// All three fields are dereferenced to build the routing key.
	if event.Tag == nil || event.ActionName == nil || event.Field == nil {
		log.Println("amqp: dropping event without tag, action name or field")
		return
	}
	if b.writeCh == nil {
		log.Println("amqp: dropping event, client is not started")
		return
	}

	body, err := json.Marshal(event)
	if err != nil {
		log.Printf("amqp: can't marshal event: %v", err)
		return
	}

	route := fmt.Sprintf(
		routingKeyFormat,
		*event.Tag, *event.ActionName, *event.Field, event.GetUnitId())

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err = b.writeCh.PublishWithContext(ctx,
		exchange,
		route,
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
		})
	if err != nil {
		fmt.Printf("Can't publish msg: %v\n", err)
	}
}

// Start dials the AMQP server and opens the channel used for publishing.
// It panics if the connection or the channel cannot be established.
func (b *broker) Start() {
	var err error

	b.conn, err = amqp.Dial(b.uri)
	if err != nil {
		panic(fmt.Sprintf("Can't connect to AMQP: %v", err))
	}

	b.writeCh, err = b.conn.Channel()
	if err != nil {
		panic(fmt.Sprintf("Can't create writer AMQP channel: %v", err))
	}

	fmt.Printf("AMQP client connected to %v, sending data to server..\n", b.uri)
}

// Start starts the embedded broker, then opens a second channel, declares
// the reader queue, binds it to the exchange and launches a goroutine that
// decodes every delivery into a yabl.Event and sends it to toBus.
// It panics if any of the setup steps fails.
func (c *communicator) Start() {
	c.broker.Start()

	var err error
	c.readCh, err = c.conn.Channel()
	if err != nil {
		panic(fmt.Sprintf("Can't create reader AMQP channel: %v", err))
	}

	var queue amqp.Queue
	queue, err = c.readCh.QueueDeclare(
		brokerQueue, // name
		false,       // durable
		true,        // delete when unused
		true,        // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		panic(fmt.Errorf("failed to declare reader queue: %w", err))
	}

	if err = c.readCh.QueueBind(queue.Name, routingKey, exchange, false, nil); err != nil {
		panic(fmt.Errorf("failed to bind reader queue: %w", err))
	}

	var events <-chan amqp.Delivery
	events, err = c.readCh.Consume(
		queue.Name, // queue
		c.tag,      // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	if err != nil {
		panic(fmt.Errorf("failed to register reader consumer: %w", err))
	}

	go func() {
		// Deferred calls run in reverse order: the reader channel is closed
		// first, then Stop closes the write channel and the connection.
		defer c.Stop()
		defer c.readCh.Close()

		// The loop ends when the delivery channel is closed.
		for d := range events {
			event := &yabl.Event{}
			if err := json.Unmarshal(d.Body, event); err != nil {
				log.Println("unmarshalling error:", err.Error())
				continue
			}
			c.toBus <- event
		}
	}()
}

// Stop closes the write channel and then the connection, skipping whichever
// of them was never opened.
func (b *broker) Stop() {
	// Close the channel before the connection it belongs to. Close errors
	// are deliberately ignored: shutdown is best-effort and Stop may run
	// after the connection has already gone away.
	if b.writeCh != nil {
		_ = b.writeCh.Close()
	}
	if b.conn != nil {
		_ = b.conn.Close()
	}
}