44 "errors"
55 "fmt"
66 "net"
7- "sync"
8- "time"
97
108 "github.com/minetest-go/minetest_client/commands"
119 "github.com/minetest-go/minetest_client/packet"
@@ -14,28 +12,22 @@ import (
1412var ErrTimeout = errors .New ("timeout" )
1513
1614type CommandClient struct {
17- conn net.Conn
18- Host string
19- Port int
20- PeerID uint16
21- sph * packet.SplitpacketHandler
22- netrx chan []byte
23- listeners []chan commands.Command
24- listener_lock * sync.RWMutex
25- payload_listeners []chan []byte
26- payload_listener_lock * sync.RWMutex
15+ conn net.Conn
16+ Host string
17+ Port int
18+ PeerID uint16
19+ sph * packet.SplitpacketHandler
20+ netrx chan []byte
21+ cmd_chan chan commands.Command
2722}
2823
2924func NewCommandClient (host string , port int ) * CommandClient {
3025 return & CommandClient {
31- Host : host ,
32- Port : port ,
33- sph : packet .NewSplitPacketHandler (),
34- netrx : make (chan []byte , 1000 ),
35- listeners : make ([]chan commands.Command , 0 ),
36- listener_lock : & sync.RWMutex {},
37- payload_listeners : make ([]chan []byte , 0 ),
38- payload_listener_lock : & sync.RWMutex {},
26+ Host : host ,
27+ Port : port ,
28+ sph : packet .NewSplitPacketHandler (),
29+ netrx : make (chan []byte , 1000 ),
30+ cmd_chan : make (chan commands.Command , 1000 ),
3931 }
4032}
4133
@@ -59,79 +51,12 @@ func (c *CommandClient) Disconnect() error {
5951 return err
6052 }
6153 close (c .netrx )
62- for _ , l := range c .listeners {
63- close (l )
64- }
65-
54+ close (c .cmd_chan )
6655 return c .conn .Close ()
6756}
6857
69- func (c * CommandClient ) AddListener (ch chan commands.Command ) {
70- c .listener_lock .Lock ()
71- defer c .listener_lock .Unlock ()
72- c .listeners = append (c .listeners , ch )
73- }
74-
75- func (c * CommandClient ) RemoveListener (ch chan commands.Command ) {
76- c .listener_lock .Lock ()
77- defer c .listener_lock .Unlock ()
78- newlisteners := make ([]chan commands.Command , 0 )
79- for _ , l := range c .listeners {
80- if l != ch {
81- newlisteners = append (newlisteners , l )
82- }
83- }
84- c .listeners = newlisteners
85- }
86-
87- func (c * CommandClient ) AddPayloadListener (ch chan []byte ) {
88- c .payload_listener_lock .Lock ()
89- defer c .payload_listener_lock .Unlock ()
90- c .payload_listeners = append (c .payload_listeners , ch )
91- }
92-
93- func (c * CommandClient ) RemovePayloadListener (ch chan []byte ) {
94- c .payload_listener_lock .Lock ()
95- defer c .payload_listener_lock .Unlock ()
96-
97- newlisteners := make ([]chan []byte , 0 )
98- for _ , l := range c .payload_listeners {
99- if l != ch {
100- newlisteners = append (newlisteners , l )
101- }
102- }
103- c .payload_listeners = newlisteners
104- }
105-
106- func (c * CommandClient ) WaitFor (cmd commands.Command , timeout time.Duration ) error {
107- ch := make (chan []byte , 1000 )
108- c .AddPayloadListener (ch )
109- defer c .RemovePayloadListener (ch )
110- until := time .Now ().Add (timeout )
111-
112- for {
113- select {
114- case <- time .After (time .Until (until )):
115- return ErrTimeout
116- case payload := <- ch :
117- cmdId := commands .GetCommandID (payload )
118- if cmdId == cmd .GetCommandId () {
119- return cmd .UnmarshalPacket (commands .GetCommandPayload (payload ))
120- }
121- }
122- }
123- }
124-
125- func (c * CommandClient ) emitCommand (cmd commands.Command ) {
126- c .listener_lock .RLock ()
127- defer c .listener_lock .RUnlock ()
128-
129- for _ , ch := range c .listeners {
130- select {
131- case ch <- cmd :
132- default :
133- }
134- }
58+ func (c * CommandClient ) CommandChannel () chan commands.Command {
59+ return c .cmd_chan
13560}
13661
13762func (c * CommandClient ) SendOriginalCommand (cmd commands.Command ) error {
@@ -195,31 +120,36 @@ func (c *CommandClient) Send(packet *packet.Packet) error {
195120}
196121
197122func (c * CommandClient ) handleCommandPayload (payload []byte ) error {
198- c .payload_listener_lock .RLock ()
199- for _ , ch := range c .payload_listeners {
200- ch <- payload
201- }
202- c .payload_listener_lock .RUnlock ()
123+ //fmt.Printf("Received bytes: len=%d, cmdId=%d\n", len(payload), commands.GetCommandID(payload))
203124
204125 cmd , err := commands .Parse (payload )
205126 if err != nil {
206127 return err
207128 }
208- c .emitCommand (cmd )
129+
130+ c .cmd_chan <- cmd
209131 return nil
210132}
211133
212134func (c * CommandClient ) onReceive (p * packet.Packet ) error {
213- //fmt.Printf("Received packet: %s\n", p)
214-
135+ //fmt.Printf("Packet: %s\n", p.String())
215136 if p .PacketType == packet .Reliable || p .PacketType == packet .Original {
216137 if p .ControlType == packet .SetPeerID {
217138 c .PeerID = p .PeerID
218- cmd := & commands.ServerSetPeer {
219- PeerID : p .PeerID ,
139+ cmd := & commands.ServerSetPeer {PeerID : p .PeerID }
140+
141+ // send as raw payload to potential listeners
142+ payload , err := commands .CreatePayload (cmd )
143+ if err != nil {
144+ return fmt .Errorf ("peerId marshal error: %v" , err )
145+ }
146+
147+ err = c .handleCommandPayload (payload )
148+ if err != nil {
149+ return fmt .Errorf ("handleCommandPayload error: %v" , err )
220150 }
221151
222- c .emitCommand ( cmd )
152+ c .cmd_chan <- cmd
223153 }
224154 }
225155
0 commit comments