Skip to content

Commit 7a5830b

Browse files
author
Aaron
committed
Durable sessions
1 parent 9ea00d0 commit 7a5830b

37 files changed

+719
-487
lines changed

dslink-v2-poc/dslink.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
"value": "org.iot.dsa.dslink.poc.MainNode"
1111
},
1212
"log": {
13-
"desc": "all, trace, debug, fine, warn, info, error, admin, fatal, none",
13+
"desc": "all, trace, debug, warn, info, error, admin, fatal, none",
1414
"type": "enum",
15-
"value": "warn"
15+
"value": "info"
1616
},
1717
"token": {
1818
"desc": "Authentication token for the broker.",

dslink-v2-websocket/src/main/java/org/iot/dsa/dslink/websocket/WsBinaryTransport.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import java.io.IOException;
66
import java.net.URI;
77
import java.nio.ByteBuffer;
8-
import java.util.List;
9-
import java.util.Map;
108
import javax.websocket.*;
119
import org.glassfish.tyrus.client.ClientManager;
1210
import org.iot.dsa.util.DSException;
@@ -100,7 +98,7 @@ public DSTransport open() {
10098
client.setDefaultMaxBinaryMessageBufferSize(64 * 1024);
10199
client.setDefaultMaxTextMessageBufferSize(64 * 1024);
102100
client.connectToServer(this, new URI(getConnectionUrl()));
103-
fine(fine() ? "Transport open" : null);
101+
debug(debug() ? "Transport open" : null);
104102
} catch (Exception x) {
105103
DSException.throwRuntime(x);
106104
}

dslink-v2-websocket/src/main/java/org/iot/dsa/dslink/websocket/WsTextTransport.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,15 @@
88
import java.io.Reader;
99
import java.io.Writer;
1010
import java.net.URI;
11-
import java.util.List;
12-
import java.util.Map;
13-
import javax.websocket.*;
11+
import javax.websocket.ClientEndpoint;
12+
import javax.websocket.CloseReason;
13+
import javax.websocket.EndpointConfig;
14+
import javax.websocket.OnClose;
15+
import javax.websocket.OnError;
16+
import javax.websocket.OnMessage;
17+
import javax.websocket.OnOpen;
18+
import javax.websocket.RemoteEndpoint;
19+
import javax.websocket.Session;
1420
import org.glassfish.tyrus.client.ClientManager;
1521
import org.iot.dsa.util.DSException;
1622

@@ -54,6 +60,7 @@ public DSTransport close() {
5460
return this;
5561
}
5662
open = false;
63+
debug(debug() ? "WsTextTransport.close()" : null, new Exception());
5764
try {
5865
if (session != null) {
5966
session.close();
@@ -126,7 +133,9 @@ public void onMessage(Session session, String msgPart, boolean isLast) {
126133
@OnOpen
127134
public void onOpen(Session session, EndpointConfig config) {
128135
open = true;
136+
buffer.open();
129137
this.session = session;
138+
debug("WsTextTransport open");
130139
}
131140

132141
@Override
@@ -141,9 +150,6 @@ public DSTransport open() {
141150
client.setDefaultMaxBinaryMessageBufferSize(64 * 1024);
142151
client.setDefaultMaxTextMessageBufferSize(64 * 1024);
143152
client.connectToServer(this, new URI(getConnectionUrl()));
144-
buffer.open();
145-
open = true;
146-
fine(fine() ? "Transport open" : null);
147153
} catch (Exception x) {
148154
DSException.throwRuntime(x);
149155
}
@@ -159,7 +165,7 @@ public DSTransport open() {
159165
*/
160166
public void write(String text, boolean isLast) {
161167
if (!open) {
162-
throw new DSIoException("Closed " + getConnectionUrl());
168+
throw new DSIoException("Not open " + getConnectionUrl());
163169
}
164170
try {
165171
RemoteEndpoint.Basic basic = session.getBasicRemote();

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/Template.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/io/DSCharBuffer.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public int available() {
4545
public synchronized void clear() {
4646
length = 0;
4747
offset = 0;
48-
notify();
48+
notifyAll();
4949
}
5050

5151
/**
@@ -56,7 +56,7 @@ public synchronized void close() {
5656
return;
5757
}
5858
open = false;
59-
notify();
59+
notifyAll();
6060
}
6161

6262
/**
@@ -68,7 +68,7 @@ public synchronized void close(RuntimeException toThrow) {
6868
}
6969
closeException = toThrow;
7070
open = false;
71-
notify();
71+
notifyAll();
7272
}
7373

7474
/**
@@ -106,7 +106,7 @@ public synchronized void open() {
106106
offset = 0;
107107
open = true;
108108
closeException = null;
109-
notify();
109+
notifyAll();
110110
}
111111

112112
/**
@@ -128,7 +128,7 @@ public synchronized void put(char b) {
128128
}
129129
buffer[length + offset] = b;
130130
length++;
131-
notify();
131+
notifyAll();
132132
}
133133

134134
/**
@@ -163,7 +163,7 @@ public synchronized void put(char[] msg, int off, int len) {
163163
//System.arraycopy(msg, off, buffer, length + offset, len);
164164
System.arraycopy(msg, off, buffer, offset, len);
165165
length += len;
166-
notify();
166+
notifyAll();
167167
}
168168

169169
/**
@@ -185,7 +185,7 @@ public synchronized void put(String msg) {
185185
}
186186
msg.getChars(0, len, buffer, offset + length);
187187
length += len;
188-
notify();
188+
notifyAll();
189189
}
190190

191191
/**
@@ -204,7 +204,7 @@ public synchronized int read() {
204204
} catch (InterruptedException ignore) {
205205
}
206206
}
207-
notify();
207+
notifyAll();
208208
if (!open) {
209209
if (length == 0) {
210210
if (closeException != null) {
@@ -241,7 +241,7 @@ public synchronized int read(char[] buf, int off, int len) {
241241
} catch (InterruptedException ignore) {
242242
}
243243
}
244-
notify();
244+
notifyAll();
245245
if (!open) {
246246
if (length == 0) {
247247
if (closeException != null) {

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSSession.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,14 +289,15 @@ protected void onDisconnected() {
289289
synchronized (outgoingMutex) {
290290
outgoingRequests.clear();
291291
outgoingResponses.clear();
292+
outgoingMutex.notifyAll();
292293
}
293294
try {
294-
writeThread.join(); //TODO - timeout?
295+
writeThread.join();
295296
} catch (Exception x) {
296297
debug(getPath(), x);
297298
}
298299
try {
299-
readThread.join(); //TODO - timeout?
300+
readThread.join();
300301
} catch (Exception x) {
301302
debug(getPath(), x);
302303
}
@@ -332,7 +333,7 @@ protected void requeueOutgoingResponse(OutboundMessage arg) {
332333
*/
333334
protected void setAckRcvd(int ackRcvd) {
334335
if (ackRcvd < this.ackRcvd) {
335-
warn(warn() ? String.format("Ack rcvd %s < last %s", ackRcvd, this.ackRcvd) : null);
336+
debug(debug() ? String.format("Ack rcvd %s < last %s", ackRcvd, this.ackRcvd) : null);
336337
}
337338
this.ackRcvd = ackRcvd;
338339
notifyOutgoing();
@@ -399,8 +400,8 @@ public void run() {
399400
if (connected) {
400401
connected = false;
401402
error(getPath(), x);
403+
conn.connDown(DSException.makeMessage(x));
402404
}
403-
conn.connDown(DSException.makeMessage(x));
404405
}
405406
}
406407
}
@@ -438,8 +439,8 @@ public void run() {
438439
if (connected) {
439440
connected = false;
440441
error(getPath(), x);
442+
conn.connDown(DSException.makeMessage(x));
441443
}
442-
conn.connDown(DSException.makeMessage(x));
443444
}
444445
}
445446
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void close() {
8686
}
8787
state = STATE_CLOSE_PENDING;
8888
enqueueResponse();
89-
fine(fine() ? getPath() + " invoke closed locally" : null);
89+
debug(debug() ? getPath() + " invoke closed locally" : null);
9090
}
9191

9292
@Override
@@ -97,7 +97,7 @@ public void close(Exception reason) {
9797
closeReason = reason;
9898
state = STATE_CLOSE_PENDING;
9999
enqueueResponse();
100-
fine(fine() ? getPath() + " invoke closed locally" : null);
100+
debug(debug() ? getPath() + " invoke closed locally" : null);
101101
}
102102

103103
/**
@@ -133,7 +133,7 @@ public void onClose(Integer requestId) {
133133
return;
134134
}
135135
state = STATE_CLOSED;
136-
fine(debug() ? getPath() + " invoke closed" : null);
136+
debug(debug() ? getPath() + " invoke closed" : null);
137137
synchronized (this) {
138138
updateHead = updateTail = null;
139139
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void close() {
9797
}
9898
state = STATE_CLOSE_PENDING;
9999
enqueueResponse();
100-
fine(fine() ? getPath() + " list closed locally" : null);
100+
debug(debug() ? getPath() + " list closed locally" : null);
101101
}
102102

103103
@Override
@@ -108,7 +108,7 @@ public void close(Exception reason) {
108108
state = STATE_CLOSE_PENDING;
109109
closeReason = reason;
110110
enqueueResponse();
111-
fine(fine() ? getPath() + " list closed locally" : null);
111+
debug(debug() ? getPath() + " list closed locally" : null);
112112
}
113113

114114
@Override
@@ -137,7 +137,7 @@ public void onClose(Integer requestId) {
137137
return;
138138
}
139139
state = STATE_CLOSED;
140-
fine(debug() ? getPath() + " list closed" : null);
140+
debug(debug() ? getPath() + " list closed" : null);
141141
synchronized (this) {
142142
updateHead = updateTail = null;
143143
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscription.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,14 +294,14 @@ void onClose() {
294294
closeHandler.onClose(getSubscriptionId());
295295
}
296296
} catch (Exception x) {
297-
manager.warn(manager.getPath(), x);
297+
manager.debug(manager.getPath(), x);
298298
}
299299
try {
300300
if (node != null) {
301301
node.unsubscribe(DSNode.VALUE_TOPIC, child, this);
302302
}
303303
} catch (Exception x) {
304-
manager.warn(manager.getPath(), x);
304+
manager.debug(manager.getPath(), x);
305305
}
306306
}
307307

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void unsubscribe(Integer sid) {
9999
try {
100100
subscription.onClose();
101101
} catch (Exception x) {
102-
warn(warn() ? subscription.toString() : null, x);
102+
debug(debug() ? subscription.toString() : null, x);
103103
}
104104
}
105105
}
@@ -146,6 +146,15 @@ protected void enqueue(DSInboundSubscription subscription) {
146146
responder.sendResponse(this);
147147
}
148148

149+
@Override
150+
protected String getLogName() {
151+
String pre = "";
152+
if (responder != null) {
153+
pre = responder.getLogName();
154+
}
155+
return pre + "." + getClass().getSimpleName();
156+
}
157+
149158
/**
150159
* Returns a DSInboundSubscription for v1.
151160
*

0 commit comments

Comments
 (0)