@@ -5,30 +5,37 @@ import (
55 "fmt"
66 "net"
77 "sync"
8+ "time"
89
910 "github.com/minetest-go/minetest_client/commands"
1011 "github.com/minetest-go/minetest_client/packet"
1112)
1213
14+ var ErrTimeout = errors .New ("timeout" )
15+
1316type CommandClient struct {
14- conn net.Conn
15- Host string
16- Port int
17- PeerID uint16
18- sph * packet.SplitpacketHandler
19- netrx chan []byte
20- listeners []chan commands.Command
21- listener_lock * sync.RWMutex
17+ conn net.Conn
18+ Host string
19+ Port int
20+ PeerID uint16
21+ sph * packet.SplitpacketHandler
22+ netrx chan []byte
23+ listeners []chan commands.Command
24+ listener_lock * sync.RWMutex
25+ payload_listeners []chan []byte
26+ payload_listener_lock * sync.RWMutex
2227}
2328
2429func NewCommandClient (host string , port int ) * CommandClient {
2530 return & CommandClient {
26- Host : host ,
27- Port : port ,
28- sph : packet .NewSplitPacketHandler (),
29- netrx : make (chan []byte , 1000 ),
30- listeners : make ([]chan commands.Command , 0 ),
31- listener_lock : & sync.RWMutex {},
31+ Host : host ,
32+ Port : port ,
33+ sph : packet .NewSplitPacketHandler (),
34+ netrx : make (chan []byte , 1000 ),
35+ listeners : make ([]chan commands.Command , 0 ),
36+ listener_lock : & sync.RWMutex {},
37+ payload_listeners : make ([]chan []byte , 0 ),
38+ payload_listener_lock : & sync.RWMutex {},
3239 }
3340}
3441
@@ -77,6 +84,44 @@ func (c *CommandClient) RemoveListener(ch chan commands.Command) {
7784 c .listeners = newlisteners
7885}
7986
87+ func (c * CommandClient ) AddPayloadListener (ch chan []byte ) {
88+ c .payload_listener_lock .Lock ()
89+ defer c .payload_listener_lock .Unlock ()
90+ c .payload_listeners = append (c .payload_listeners , ch )
91+ }
92+
93+ func (c * CommandClient ) RemovePayloadListener (ch chan []byte ) {
94+ c .payload_listener_lock .Lock ()
95+ defer c .payload_listener_lock .Unlock ()
96+
97+ newlisteners := make ([]chan []byte , 0 )
98+ for _ , l := range c .payload_listeners {
99+ if l != ch {
100+ newlisteners = append (newlisteners , l )
101+ }
102+ }
103+ c .payload_listeners = newlisteners
104+ }
105+
106+ func (c * CommandClient ) WaitFor (cmd commands.Command , timeout time.Duration ) error {
107+ ch := make (chan []byte , 1000 )
108+ c .AddPayloadListener (ch )
109+ defer c .RemovePayloadListener (ch )
110+ until := time .Now ().Add (timeout )
111+
112+ for {
113+ select {
114+ case <- time .After (time .Until (until )):
115+ return ErrTimeout
116+ case payload := <- ch :
117+ cmdId := commands .GetCommandID (payload )
118+ if cmdId == cmd .GetCommandId () {
119+ return cmd .UnmarshalPacket (commands .GetCommandPayload (payload ))
120+ }
121+ }
122+ }
123+ }
124+
80125func (c * CommandClient ) emitCommand (cmd commands.Command ) {
81126 c .listener_lock .RLock ()
82127 defer c .listener_lock .RUnlock ()
@@ -150,6 +195,12 @@ func (c *CommandClient) Send(packet *packet.Packet) error {
150195}
151196
152197func (c * CommandClient ) handleCommandPayload (payload []byte ) error {
198+ c .payload_listener_lock .RLock ()
199+ for _ , ch := range c .payload_listeners {
200+ ch <- payload
201+ }
202+ c .payload_listener_lock .RUnlock ()
203+
153204 cmd , err := commands .Parse (payload )
154205 if err != nil {
155206 return err
0 commit comments