22
33import com .acuity .iot .dsa .dslink .protocol .message .OutboundMessage ;
44import com .acuity .iot .dsa .dslink .transport .DSTransport ;
5+ import java .io .IOException ;
56import java .util .LinkedList ;
67import java .util .List ;
7- import java .util .logging .Logger ;
88import org .iot .dsa .dslink .DSIRequester ;
99import org .iot .dsa .dslink .DSLinkConnection ;
1010import org .iot .dsa .node .DSNode ;
1717 */
1818public abstract class DSSession extends DSNode {
1919
20+ ///////////////////////////////////////////////////////////////////////////
21+ // Constants
22+ ///////////////////////////////////////////////////////////////////////////
23+
24+ private static final int MAX_MSG_ID = 2147483647 ;
25+ private static final long MSG_TIMEOUT = 60000 ;
26+
2027 ///////////////////////////////////////////////////////////////////////////
2128 // Fields
2229 ///////////////////////////////////////////////////////////////////////////
2330
31+ private long lastRecv ;
32+ private long lastSend ;
33+ private int nextAck = -1 ;
34+ private int nextMessage = 1 ;
2435 private boolean connected = false ;
2536 private DSLinkConnection connection ;
26- private Logger logger ;
2737 private Object outgoingMutex = new Object ();
2838 private List <OutboundMessage > outgoingRequests = new LinkedList <OutboundMessage >();
2939 private List <OutboundMessage > outgoingResponses = new LinkedList <OutboundMessage >();
@@ -127,11 +137,28 @@ public DSLinkConnection getConnection() {
127137 }
128138
129139 @ Override
130- public Logger getLogger () {
131- if (logger == null ) {
132- logger = Logger .getLogger (getConnection ().getLink ().getLinkName () + ".session" );
140+ protected String getLogName () {
141+ return "Session" ;
142+ }
143+
144+ /**
145+ * The next ack id, or -1.
146+ */
147+ public synchronized int getNextAck () {
148+ int ret = nextAck ;
149+ nextAck = -1 ;
150+ return ret ;
151+ }
152+
153+ /**
154+ * Returns the next new message id.
155+ */
156+ public synchronized int getNextMessageId () {
157+ int ret = nextMessage ;
158+ if (++nextMessage > MAX_MSG_ID ) {
159+ nextMessage = 1 ;
133160 }
134- return logger ;
161+ return ret ;
135162 }
136163
137164 public abstract DSIRequester getRequester ();
@@ -140,17 +167,8 @@ public DSTransport getTransport() {
140167 return getConnection ().getTransport ();
141168 }
142169
143- /**
144- * True if there are any outbound requests or responses queued up.
145- */
146- protected final boolean hasMessagesToSend () {
147- if (!outgoingResponses .isEmpty ()) {
148- return true ;
149- }
150- if (!outgoingRequests .isEmpty ()) {
151- return true ;
152- }
153- return false ;
170+ protected boolean hasAckToSend () {
171+ return nextAck > 0 ;
154172 }
155173
156174 protected boolean hasOutgoingRequests () {
@@ -165,7 +183,16 @@ protected boolean hasOutgoingResponses() {
165183 * Override point, this returns the result of hasMessagesToSend.
166184 */
167185 protected boolean hasSomethingToSend () {
168- return hasMessagesToSend ();
186+ if (nextAck > 0 ) {
187+ return true ;
188+ }
189+ if (!outgoingResponses .isEmpty ()) {
190+ return true ;
191+ }
192+ if (!outgoingRequests .isEmpty ()) {
193+ return true ;
194+ }
195+ return false ;
169196 }
170197
171198 protected boolean isConnected () {
@@ -206,6 +233,16 @@ public void onDisconnect() {
206233 }
207234 }
208235
236+ /**
237+ * Call for each incoming message id that needs to be acked.
238+ */
239+ public synchronized void setNextAck (int nextAck ) {
240+ if (nextAck > 0 ) {
241+ this .nextAck = nextAck ;
242+ notifyOutgoing ();
243+ }
244+ }
245+
209246 /**
210247 * Called when the broker signifies that requests are allowed.
211248 */
@@ -220,20 +257,35 @@ public void setRequesterAllowed() {
220257 * implementation. A separate thread is spun off to manage writing.
221258 */
222259 public void run () {
260+ lastRecv = lastSend = System .currentTimeMillis ();
223261 new WriteThread (getConnection ().getLink ().getLinkName () + " Writer" ).start ();
224262 while (connected ) {
225263 try {
264+ verifyLastSend ();
226265 doRecvMessage ();
266+ lastRecv = System .currentTimeMillis ();
227267 } catch (Exception x ) {
228268 getTransport ().close ();
229269 if (connected ) {
230270 connected = false ;
231- severe (getPath (), x );
271+ error (getPath (), x );
232272 }
233273 }
234274 }
235275 }
236276
277+ private void verifyLastRead () throws IOException {
278+ if ((System .currentTimeMillis () - lastRecv ) > MSG_TIMEOUT ) {
279+ throw new IOException ("No message received in " + MSG_TIMEOUT + "ms" );
280+ }
281+ }
282+
283+ private void verifyLastSend () throws IOException {
284+ if ((System .currentTimeMillis () - lastSend ) > MSG_TIMEOUT ) {
285+ throw new IOException ("No message sent in " + MSG_TIMEOUT + "ms" );
286+ }
287+ }
288+
237289 ///////////////////////////////////////////////////////////////////////////
238290 // Inner Classes
239291 ///////////////////////////////////////////////////////////////////////////
@@ -251,23 +303,25 @@ private class WriteThread extends Thread {
251303 public void run () {
252304 try {
253305 while (connected ) {
306+ verifyLastRead ();
254307 synchronized (outgoingMutex ) {
255308 if (!hasSomethingToSend ()) {
256309 try {
257310 outgoingMutex .wait (5000 );
258311 } catch (InterruptedException x ) {
259- fine (getPath (), x );
312+ warn (getPath (), x );
260313 }
261314 continue ;
262315 }
263316 }
264317 doSendMessage ();
318+ lastSend = System .currentTimeMillis ();
265319 }
266320 } catch (Exception x ) {
267321 if (connected ) {
268322 connected = false ;
269323 getTransport ().close ();
270- severe (getPath (), x );
324+ error (getPath (), x );
271325 }
272326 }
273327 }
0 commit comments