Skip to content

Commit 791bc74

Browse files
author
Aaron
committed
Durable sessions
1 parent 906e1ab commit 791bc74

32 files changed

+1021
-1148
lines changed

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSSession.java

Lines changed: 122 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,31 @@
66
import java.io.IOException;
77
import java.util.LinkedList;
88
import java.util.List;
9+
import org.iot.dsa.conn.DSConnection;
10+
import org.iot.dsa.conn.DSIConnected;
911
import org.iot.dsa.dslink.DSIRequester;
1012
import org.iot.dsa.dslink.DSLinkConnection;
13+
import org.iot.dsa.node.DSBool;
14+
import org.iot.dsa.node.DSInfo;
1115
import org.iot.dsa.node.DSNode;
16+
import org.iot.dsa.util.DSException;
1217

1318
/**
1419
* The state of a connection to a broker as well as a protocol implementation. Not intended for link
1520
* implementors.
1621
*
1722
* @author Aaron Hansen
1823
*/
19-
public abstract class DSSession extends DSNode {
24+
public abstract class DSSession extends DSNode implements DSIConnected {
2025

2126
///////////////////////////////////////////////////////////////////////////
2227
// Class Fields
2328
///////////////////////////////////////////////////////////////////////////
2429

30+
protected static final String REQUESTER = "Requester";
31+
protected static final String REQUESTER_ALLOWED = "Requester Allowed";
32+
protected static final String RESPONDER = "Responder";
33+
2534
private static final int MAX_MSG_ID = Integer.MAX_VALUE;
2635
private static final long MSG_TIMEOUT = 60000;
2736

@@ -37,10 +46,12 @@ public abstract class DSSession extends DSNode {
3746
private long lastTimeSend;
3847
private int messageId = 0;
3948
private int nextMessage = 1;
40-
private Object outgoingMutex = new Object();
49+
private final Object outgoingMutex = new Object();
4150
private List<OutboundMessage> outgoingRequests = new LinkedList<OutboundMessage>();
4251
private List<OutboundMessage> outgoingResponses = new LinkedList<OutboundMessage>();
43-
protected boolean requesterAllowed = false;
52+
private DSInfo requesterAllowed = getInfo(REQUESTER_ALLOWED);
53+
private ReadThread readThread;
54+
private WriteThread writeThread;
4455

4556
///////////////////////////////////////////////////////////////////////////
4657
// Constructors
@@ -57,27 +68,12 @@ public DSSession(DSLinkConnection connection) {
5768
// Public Methods
5869
///////////////////////////////////////////////////////////////////////////
5970

60-
/**
61-
* Can be called by the subclass to force exit the run method.
62-
*/
63-
public void disconnect() {
64-
if (!connected) {
65-
return;
66-
}
67-
connected = false;
68-
getTransport().close();
69-
synchronized (outgoingMutex) {
70-
notifyAll();
71-
}
72-
info(getPath() + " locally closed");
73-
}
74-
7571
/**
7672
* Add a message to the outgoing request queue.
7773
*/
7874
public void enqueueOutgoingRequest(OutboundMessage arg) {
7975
if (connected) {
80-
if (!requesterAllowed) {
76+
if (!isRequesterAllowed()) {
8177
throw new IllegalStateException("Requests forbidden");
8278
}
8379
synchronized (outgoingMutex) {
@@ -134,58 +130,31 @@ public DSTransport getTransport() {
134130
return getConnection().getTransport();
135131
}
136132

137-
/**
138-
* Override point, called when the previous connection can be resumed. The the transport will
139-
* have already been set.
140-
*/
141-
public void onConnect() {
142-
connected = true;
143-
}
144-
145-
/**
146-
* Override point, when a connection attempt failed.
147-
*/
148-
public void onConnectFail() {
149-
connected = false;
133+
public boolean isRequesterAllowed() {
134+
return requesterAllowed.getElement().toBoolean();
150135
}
151136

152-
/**
153-
* Override point, called after the connection is closed.
154-
*/
155-
public void onDisconnect() {
156-
synchronized (outgoingMutex) {
157-
outgoingRequests.clear();
158-
outgoingResponses.clear();
159-
}
160-
}
161-
162-
/**
163-
* Called by the connection, this manages the running state and calls doRun for the specific
164-
* implementation. A separate thread is spun off to manage writing.
165-
*/
166-
public void run() {
167-
lastTimeRecv = lastTimeSend = System.currentTimeMillis();
168-
new WriteThread(getConnection().getLink().getLinkName() + " Writer").start();
169-
while (connected) {
170-
try {
171-
verifyLastSend();
172-
doRecvMessage();
173-
lastTimeRecv = System.currentTimeMillis();
174-
} catch (Exception x) {
175-
getTransport().close();
176-
if (connected) {
177-
connected = false;
178-
error(getPath(), x);
179-
}
180-
}
137+
@Override
138+
public void onChange(DSConnection connection) {
139+
switch (connection.getConnectionState()) {
140+
case CONNECTED:
141+
onConnected();
142+
break;
143+
case DISCONNECTED:
144+
onDisconnected();
145+
break;
146+
case DISCONNECTING:
147+
onDisconnecting();
148+
break;
149+
//case CONNECTING:
181150
}
182151
}
183152

184153
/**
185154
* Called when the broker signifies that requests are allowed.
186155
*/
187-
public void setRequesterAllowed() {
188-
requesterAllowed = true;
156+
public void setRequesterAllowed(boolean allowed) {
157+
put(requesterAllowed, DSBool.valueOf(allowed));
189158
}
190159

191160
public abstract boolean shouldEndMessage();
@@ -194,6 +163,12 @@ public void setRequesterAllowed() {
194163
// Protected Methods
195164
///////////////////////////////////////////////////////////////////////////
196165

166+
@Override
167+
protected void declareDefaults() {
168+
super.declareDefaults();
169+
declareDefault(REQUESTER_ALLOWED, DSBool.FALSE);
170+
}
171+
197172
/**
198173
* Can return null.
199174
*/
@@ -232,7 +207,7 @@ protected OutboundMessage dequeueOutgoingResponse() {
232207

233208
@Override
234209
protected String getLogName() {
235-
return "Session";
210+
return getLogName("session");
236211
}
237212

238213
protected int getMissingAcks() {
@@ -254,13 +229,7 @@ protected boolean hasAckToSend() {
254229
return ackToSend > 0;
255230
}
256231

257-
protected boolean hasOutgoingRequests() {
258-
return !outgoingRequests.isEmpty();
259-
}
260-
261-
protected boolean hasOutgoingResponses() {
262-
return !outgoingResponses.isEmpty();
263-
}
232+
protected abstract boolean hasPingToSend();
264233

265234
/**
266235
* Override point, this returns the result of hasMessagesToSend.
@@ -269,6 +238,9 @@ protected boolean hasSomethingToSend() {
269238
if (ackToSend >= 0) {
270239
return true;
271240
}
241+
if (hasPingToSend()) {
242+
return true;
243+
}
272244
if (waitingForAcks()) {
273245
return false;
274246
}
@@ -281,10 +253,6 @@ protected boolean hasSomethingToSend() {
281253
return false;
282254
}
283255

284-
protected boolean isConnected() {
285-
return connected;
286-
}
287-
288256
/**
289257
* Can be used to waking up a sleeping writer.
290258
*/
@@ -302,6 +270,51 @@ protected int numOutgoingResponses() {
302270
return outgoingResponses.size();
303271
}
304272

273+
/**
274+
* Creates the starts the read and write threads.
275+
*/
276+
protected void onConnected() {
277+
connected = true;
278+
lastTimeRecv = lastTimeSend = System.currentTimeMillis();
279+
readThread = new ReadThread(getConnection().getLink().getLinkName() + " Reader");
280+
writeThread = new WriteThread(getConnection().getLink().getLinkName() + " Writer");
281+
readThread.start();
282+
writeThread.start();
283+
}
284+
285+
/**
286+
* Clear the outgoing queues and waits for the the read and write threads to exit.
287+
*/
288+
protected void onDisconnected() {
289+
synchronized (outgoingMutex) {
290+
outgoingRequests.clear();
291+
outgoingResponses.clear();
292+
}
293+
try {
294+
writeThread.join(); //TODO - timeout?
295+
} catch (Exception x) {
296+
debug(getPath(), x);
297+
}
298+
try {
299+
readThread.join(); //TODO - timeout?
300+
} catch (Exception x) {
301+
debug(getPath(), x);
302+
}
303+
writeThread = null;
304+
readThread = null;
305+
}
306+
307+
/**
308+
* Sets the connected state to false so that the read and write threads will exit cleanly.
309+
*/
310+
protected void onDisconnecting() {
311+
if (!connected) {
312+
return;
313+
}
314+
connected = false;
315+
notifyOutgoing();
316+
}
317+
305318
protected void requeueOutgoingRequest(OutboundMessage arg) {
306319
synchronized (outgoingMutex) {
307320
outgoingRequests.add(arg);
@@ -364,7 +377,36 @@ private void verifyLastSend() throws IOException {
364377
///////////////////////////////////////////////////////////////////////////
365378

366379
/**
367-
* A separate thread is used for writing to the connection.
380+
* Receives messages.
381+
*/
382+
private class ReadThread extends Thread {
383+
384+
ReadThread(String name) {
385+
super(name);
386+
setDaemon(true);
387+
}
388+
389+
public void run() {
390+
DSLinkConnection conn = getConnection();
391+
try {
392+
while (connected) {
393+
verifyLastSend();
394+
doRecvMessage();
395+
conn.connOk();
396+
lastTimeRecv = System.currentTimeMillis();
397+
}
398+
} catch (Exception x) {
399+
if (connected) {
400+
connected = false;
401+
error(getPath(), x);
402+
}
403+
conn.connDown(DSException.makeMessage(x));
404+
}
405+
}
406+
}
407+
408+
/**
409+
* Sends messages.
368410
*/
369411
private class WriteThread extends Thread {
370412

@@ -374,6 +416,7 @@ private class WriteThread extends Thread {
374416
}
375417

376418
public void run() {
419+
DSLinkConnection conn = getConnection();
377420
try {
378421
while (connected) {
379422
verifyLastRead();
@@ -382,20 +425,21 @@ public void run() {
382425
try {
383426
outgoingMutex.wait(5000);
384427
} catch (InterruptedException x) {
385-
warn(getPath(), x);
428+
debug(getPath(), x);
386429
}
387430
continue;
388431
}
389432
}
390433
doSendMessage();
434+
conn.connOk();
391435
lastTimeSend = System.currentTimeMillis();
392436
}
393437
} catch (Exception x) {
394438
if (connected) {
395439
connected = false;
396-
getTransport().close();
397440
error(getPath(), x);
398441
}
442+
conn.connDown(DSException.makeMessage(x));
399443
}
400444
}
401445
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,6 @@ public void handleUpdate(int sid, String ts, String sts, DSElement value) {
9292
stub.process(timestamp, value, status);
9393
}
9494

95-
public void onConnect() {
96-
}
97-
98-
public void onConnectFail() {
99-
}
100-
101-
public void onDisconnect() {
102-
for (DSOutboundSubscribeStubs stubs : pendingSubscribe) {
103-
stubs.onDisconnect();
104-
}
105-
pendingSubscribe.clear();
106-
pendingUnsubscribe.clear();
107-
for (DSOutboundSubscribeStubs stubs : pathMap.values()) {
108-
stubs.onDisconnect();
109-
}
110-
sidMap.clear();
111-
pathMap.clear();
112-
}
113-
11495
@Override
11596
public void write(DSSession session, MessageWriter writer) {
11697
if (!pendingSubscribe.isEmpty()) {
@@ -213,6 +194,22 @@ protected void doWriteUnsubscribe(MessageWriter writer, Integer sid) {
213194
out.value(sid);
214195
}
215196

197+
protected void onConnected() {
198+
}
199+
200+
protected void onDisconnected() {
201+
for (DSOutboundSubscribeStubs stubs : pendingSubscribe) {
202+
stubs.onDisconnect();
203+
}
204+
pendingSubscribe.clear();
205+
pendingUnsubscribe.clear();
206+
for (DSOutboundSubscribeStubs stubs : pathMap.values()) {
207+
stubs.onDisconnect();
208+
}
209+
sidMap.clear();
210+
pathMap.clear();
211+
}
212+
216213
///////////////////////////////////////////////////////////////////////////
217214
// Package / Private Methods
218215
///////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)