Add amqp support

pull/3/head
Terekhin Alexandr 2 years ago
parent 7b7c419f03
commit 234a50adb2
Signed by: didinst
GPG Key ID: D2EF94423C23BF12
  1. 4
      can/input.go
  2. 4
      can/input_test.go
  3. 14
      cli-mon.go
  4. 3
      go.mod
  5. 18
      go.sum
  6. 79
      ui/amqp.go
  7. 35
      ui/amqp_test.go
  8. 22
      yabl/strings.go

@ -52,11 +52,11 @@ func process(scanner *bufio.Scanner, ch chan<- *CanFrame) {
}
}
func ReadStdin() <-chan *CanFrame {
func ReadStdin(file *os.File) <-chan *CanFrame {
c := make(chan *CanFrame)
go func() {
scanner := bufio.NewScanner(os.Stdin)
scanner := bufio.NewScanner(file)
process(scanner, c)
close(c)
}()

@ -36,7 +36,7 @@ func Test_fromString(t *testing.T) {
want: &CanFrame{
Date: &time_m,
CanId: 0x4021,
Payload: []uint8{00, 00, 00, 00, 0xFC, 0xFF, 0xFF, 0xFF},
Payload: &[]uint8{00, 00, 00, 00, 0xFC, 0xFF, 0xFF, 0xFF},
},
},
{
@ -45,7 +45,7 @@ func Test_fromString(t *testing.T) {
want: &CanFrame{
Date: &time_p,
CanId: 0x100,
Payload: []uint8{00, 00, 00, 00, 00, 00, 0x64, 00},
Payload: &[]uint8{00, 00, 00, 00, 00, 00, 0x64, 00},
},
},
}

@ -4,7 +4,7 @@ import (
"cli-mon/can"
"cli-mon/ui"
"cli-mon/yabl"
"flag"
flag "flag"
"fmt"
"golang.org/x/sys/unix"
"os"
@ -13,16 +13,18 @@ import (
func main() {
filename := flag.String("f", "", "Candump filename")
canbus := flag.String("i", "", "CAN bus interface")
canbus := flag.String("i", "can0", "CAN bus interface")
stdin := flag.Bool("s", false, "Read from stdin")
gui := flag.Bool("gui", false, "Text mode gui")
gui := flag.Bool("gui", false, "Pseudo-GUI CLI interface")
amqp := flag.String("amqp", "", "Run output to AMQP broker url")
interval := time.Duration(*flag.Uint("t", 50, "Override data loss check interval, ms"))
flag.Parse()
var frames <-chan *can.CanFrame
switch {
case *stdin:
frames = can.ReadStdin()
frames = can.ReadStdin(os.Stdin)
case len(*filename) > 0:
frames = can.Readfile(filename)
case len(*canbus) > 0:
@ -75,6 +77,8 @@ func main() {
if *gui {
display = ui.NewUI()
} else if len(*amqp) > 0 {
display = ui.NewAmqp(*amqp)
} else {
display = ui.NewLog()
}
@ -92,7 +96,7 @@ func main() {
updates <- event
}
}
time.AfterFunc(50*time.Millisecond, f)
time.AfterFunc(interval*time.Millisecond, f)
}
time.AfterFunc(5*time.Second, f)

@ -3,13 +3,14 @@ module cli-mon
go 1.19
require (
github.com/gdamore/tcell/v2 v2.6.0
github.com/rabbitmq/amqp091-go v1.9.0
github.com/rivo/tview v0.0.0-20230621164836-6cc0565babaf
golang.org/x/sys v0.5.0
)
require (
github.com/gdamore/encoding v1.0.0 // indirect
github.com/gdamore/tcell/v2 v2.6.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/rivo/uniseg v0.4.3 // indirect

@ -1,17 +1,31 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko=
github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg=
github.com/gdamore/tcell/v2 v2.6.0 h1:OKbluoP9VYmJwZwq/iLb4BxwKcwGthaa1YNBJIyCySg=
github.com/gdamore/tcell/v2 v2.6.0/go.mod h1:be9omFATkdr0D9qewWW3d+MEvl5dha+Etb5y65J2H8Y=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rivo/tview v0.0.0-20230621164836-6cc0565babaf h1:IchpMMtnfvzg7T3je672bP1nKWz1M4tW3kMZT6CbgoM=
github.com/rivo/tview v0.0.0-20230621164836-6cc0565babaf/go.mod h1:nVwGv4MP47T0jvlk7KuTTjjuSmrGO4JF0iaiNt4bufE=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@ -40,3 +54,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -0,0 +1,79 @@
package ui
import (
"cli-mon/yabl"
"context"
json "encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"os"
"time"
)
const exchange = "yabl-newproto-in"
var hostname, _ = os.Hostname()
type broker struct {
uri string
conn *amqp.Connection
ch *amqp.Channel
}
func NewAmqp(uri string) UI {
return &broker{uri: uri}
}
func (b *broker) Consume(event *yabl.Event) {
var bytes []byte
var err error
if bytes, err = json.Marshal(event); err != nil {
return
}
route := fmt.Sprintf(
"%s.%s.%s.%d",
hostname,
event.ActionName,
event.Field,
event.GetUnitId())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err = b.ch.PublishWithContext(ctx,
exchange,
route,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: bytes,
}); err != nil {
fmt.Printf("Can't publish msg: %v", err)
}
}
func (b *broker) Start() {
var err error
b.conn, err = amqp.Dial(b.uri)
if err != nil {
panic(fmt.Sprintf("Can't connect to AMQP: %v", err))
}
b.ch, err = b.conn.Channel()
if err != nil {
panic(fmt.Sprintf("Can't create AMQP channel: %v", err))
}
fmt.Printf("AMQP client connected to %v, sending data to server..", b.uri)
}
func (b *broker) Stop() {
if b.conn != nil {
b.conn.Close()
}
if b.ch != nil {
b.ch.Close()
}
}

@ -0,0 +1,35 @@
package ui
import (
"cli-mon/yabl"
"testing"
"time"
)
func TestNewAmqp(t *testing.T) {
t.Run("Amqp Test", func(t *testing.T) {
client := NewAmqp("amqp://user:password@localhost:5672/")
if client == nil {
t.Errorf("Can't create client")
}
client.Start()
now := time.Now()
event := &yabl.Event{
Field: yabl.FBoardReady,
ActionName: yabl.PPuErrors,
Object: nil,
//unit: 3,
Updated: &now,
Value: yabl.ERROR,
}
client.Consume(event)
client.Stop()
})
}

@ -1,6 +1,10 @@
package yabl
import "fmt"
import (
"encoding/json"
"fmt"
"time"
)
func (t BooleanType) String() string {
switch t {
@ -211,3 +215,19 @@ func (t SignedAirTemp8BitType) String() string {
func (c ConnectedOut8bitType) String() string {
return fmt.Sprintf("%d", c)
}
func (u *Event) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
Unit uint
ActionName AName
Field FName
Updated *time.Time
Value string
}{
Unit: u.GetUnitId(),
ActionName: u.ActionName,
Field: u.Field,
Updated: u.Updated,
Value: fmt.Sprintf("%v", u.Value),
})
}

Loading…
Cancel
Save