Skip to content

Commit e33cab7

Browse files
author
Aaron
committed
Durable sessions
1 parent 7a5830b commit e33cab7

File tree

11 files changed

+90
-38
lines changed

11 files changed

+90
-38
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ subprojects {
44
apply plugin: 'maven'
55

66
group 'org.iot-dsa'
7-
version '0.31.1'
7+
version '0.31.5'
88

99
sourceCompatibility = 1.6
1010
targetCompatibility = 1.6

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSSession.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public abstract class DSSession extends DSNode implements DSIConnected {
3131
protected static final String REQUESTER_ALLOWED = "Requester Allowed";
3232
protected static final String RESPONDER = "Responder";
3333

34+
private static final int MAX_MISSING_ACKS = 8;
3435
private static final int MAX_MSG_ID = Integer.MAX_VALUE;
3536
private static final long MSG_TIMEOUT = 60000;
3637

@@ -40,6 +41,7 @@ public abstract class DSSession extends DSNode implements DSIConnected {
4041

4142
private int ackRcvd = -1;
4243
private int ackToSend = -1;
44+
private int ackRequired = 0;
4345
private boolean connected = false;
4446
private DSLinkConnection connection;
4547
private long lastTimeRecv;
@@ -74,7 +76,7 @@ public DSSession(DSLinkConnection connection) {
7476
public void enqueueOutgoingRequest(OutboundMessage arg) {
7577
if (connected) {
7678
if (!isRequesterAllowed()) {
77-
throw new IllegalStateException("Requests forbidden");
79+
throw new IllegalStateException("Requester not allowed");
7880
}
7981
synchronized (outgoingMutex) {
8082
outgoingRequests.add(arg);
@@ -211,6 +213,9 @@ protected String getLogName() {
211213
}
212214

213215
protected int getMissingAcks() {
216+
if (ackRequired > 0) {
217+
return ackRequired - ackRcvd - 1;
218+
}
214219
return messageId - ackRcvd - 1;
215220
}
216221

@@ -339,6 +344,13 @@ protected void setAckRcvd(int ackRcvd) {
339344
notifyOutgoing();
340345
}
341346

347+
/**
348+
* Used to indicate that the current message ID requires an ack.
349+
*/
350+
protected void setAckRequired() {
351+
ackRequired = messageId;
352+
}
353+
342354
/**
343355
* Call for each incoming message id that needs to be acked.
344356
*/
@@ -350,7 +362,7 @@ protected void setAckToSend(int ackToSend) {
350362
}
351363

352364
protected boolean waitingForAcks() {
353-
boolean ret = getMissingAcks() > 8;
365+
boolean ret = getMissingAcks() > MAX_MISSING_ACKS;
354366
if (ret) {
355367
debug(debug() ? "Waiting for " + getMissingAcks() + " acks" : null);
356368
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public void onConnected() {
7878
}
7979

8080
public void onDisconnected() {
81-
subscriptions.onDisconnected();
8281
Iterator<Entry<Integer, DSOutboundStub>> it = requests.entrySet().iterator();
8382
Map.Entry<Integer, DSOutboundStub> me;
8483
while (it.hasNext()) {
@@ -90,6 +89,7 @@ public void onDisconnected() {
9089
}
9190
it.remove();
9291
}
92+
subscriptions.onDisconnected();
9393
}
9494

9595
@Override

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void close(Exception reason) {
108108
state = STATE_CLOSE_PENDING;
109109
closeReason = reason;
110110
enqueueResponse();
111-
debug(debug() ? getPath() + " list closed locally" : null);
111+
debug(debug() ? getPath() + " list closed locally" : null, reason);
112112
}
113113

114114
@Override

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscription.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public Integer getSubscriptionId() {
9191
return sid;
9292
}
9393

94+
public int getQos() {
95+
return qos;
96+
}
97+
9498
/**
9599
* For v2 only.
96100
*/
@@ -119,11 +123,27 @@ public void onUnsubscribed(DSTopic topic, DSNode node, DSInfo info) {
119123
close();
120124
}
121125

126+
/**
127+
* For v2 only.
128+
*/
122129
public DSInboundSubscription setCloseAfterUpdate(boolean closeAfterUpdate) {
123130
this.closeAfterUpdate = closeAfterUpdate;
124131
return this;
125132
}
126133

134+
public void setSubscriptionId(Integer id) {
135+
sid = id;
136+
}
137+
138+
public void setQos(Integer val) {
139+
synchronized (this) {
140+
if (val == 0) {
141+
updateHead = updateTail;
142+
}
143+
qos = val;
144+
}
145+
}
146+
127147
@Override
128148
public String toString() {
129149
return "Subscription (" + getSubscriptionId() + ") " + getPath();
@@ -164,7 +184,9 @@ public void update(long timestamp, DSIValue value, DSStatus status) {
164184
enqueued = true;
165185
}
166186
}
167-
manager.enqueue(this);
187+
if (sid != 0) {
188+
manager.enqueue(this);
189+
}
168190
}
169191

170192
///////////////////////////////////////////////////////////////////////////

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,26 @@ public DSResponder getResponder() {
7272
* Create or update a subscription.
7373
*/
7474
public DSInboundSubscription subscribe(Integer sid, String path, int qos) {
75-
trace(trace() ? "Subscribing " + path : null);
76-
DSInboundSubscription subscription = sidMap.get(sid);
77-
if (subscription == null) {
75+
DSInboundSubscription subscription = pathMap.get(path);
76+
if (subscription != null) {
77+
trace(trace() ? String.format("Updating (sid=%s,qos=%s) %s", sid, qos, path)
78+
: null);
79+
pathMap.remove(subscription.getPath());
80+
if (sid != subscription.getSubscriptionId()) {
81+
sidMap.remove(sid);
82+
subscription.setSubscriptionId(sid);
83+
sidMap.put(sid, subscription);
84+
}
85+
if (qos != subscription.getQos()) {
86+
subscription.setQos(qos);
87+
}
88+
enqueue(subscription);
89+
} else {
90+
trace(trace() ? String.format("Subscribing (sid=%s,qos=%s) %s", sid, qos, path)
91+
: null);
7892
subscription = makeSubscription(sid, path, qos);
7993
sidMap.put(sid, subscription);
8094
pathMap.put(path, subscription);
81-
} else if (!path.equals(subscription.getPath())) {
82-
unsubscribe(sid);
83-
return subscribe(sid, path, qos);
84-
} else {
85-
subscription.setQos(qos);
86-
//TODO refresh subscription, align w/v2
8795
}
8896
return subscription;
8997
}
@@ -94,7 +102,8 @@ public DSInboundSubscription subscribe(Integer sid, String path, int qos) {
94102
public void unsubscribe(Integer sid) {
95103
DSInboundSubscription subscription = sidMap.remove(sid);
96104
if (subscription != null) {
97-
trace(trace() ? "Unsubscribing " + subscription.getPath() : null);
105+
trace(trace() ? String.format("Unsubscribe (sid=%s) %s ", sid, subscription.getPath())
106+
: null);
98107
pathMap.remove(subscription.getPath());
99108
try {
100109
subscription.onClose();
@@ -148,11 +157,10 @@ protected void enqueue(DSInboundSubscription subscription) {
148157

149158
@Override
150159
protected String getLogName() {
151-
String pre = "";
152160
if (responder != null) {
153-
pre = responder.getLogName();
161+
return responder.getLogName() + ".subscriptions";
154162
}
155-
return pre + "." + getClass().getSimpleName();
163+
return getClass().getName();
156164
}
157165

158166
/**
@@ -173,8 +181,19 @@ protected void onConnected() {
173181
* Unsubscribes all.
174182
*/
175183
protected void onDisconnected() {
176-
for (Integer i : sidMap.keySet()) {
177-
unsubscribe(i);
184+
DSInboundSubscription sub;
185+
for (Map.Entry<String, DSInboundSubscription> me : pathMap.entrySet()) {
186+
sub = me.getValue();
187+
if (sub.getQos() < 2) {
188+
unsubscribe(sub.getSubscriptionId());
189+
} else {
190+
sidMap.remove(sub.getSubscriptionId());
191+
sub.setSubscriptionId(0);
192+
}
193+
}
194+
synchronized (this) {
195+
outbound.clear();
196+
enqueued = false;
178197
}
179198
}
180199

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public void onConnected() {
8080
}
8181

8282
public void onDisconnected() {
83+
getSubscriptions().onDisconnected();
8384
Iterator<Entry<Integer, DSStream>> it = inboundRequests.entrySet().iterator();
8485
Map.Entry<Integer, DSStream> me;
8586
while (it.hasNext()) {
@@ -91,7 +92,6 @@ public void onDisconnected() {
9192
}
9293
it.remove();
9394
}
94-
getSubscriptions().onDisconnected();
9595
}
9696

9797
public DSStream removeRequest(Integer rid) {

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/DS1Session.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,19 @@ protected void doRecvMessage() throws IOException {
112112
@Override
113113
protected void doSendMessage() {
114114
try {
115-
requestsNext = !requestsNext;
116-
beginMessage();
117115
if (!waitingForAcks()) {
116+
requestsNext = !requestsNext;
117+
beginMessage();
118118
send(requestsNext);
119119
if (!shouldEndMessage()) {
120120
send(!requestsNext);
121121
}
122+
endMessage();
123+
lastMessageSent = System.currentTimeMillis();
124+
if (requestsBegun || responsesBegun) {
125+
setAckRequired();
126+
}
122127
}
123-
endMessage();
124-
lastMessageSent = System.currentTimeMillis();
125128
} finally {
126129
requestsBegun = false;
127130
responsesBegun = false;
@@ -280,6 +283,7 @@ private void processEnvelope(DSIReader reader) {
280283
if (reader.next() != Token.BOOLEAN) {
281284
throw new IllegalStateException("Allowed not a boolean");
282285
}
286+
debug(debug() ? "Requester allowed" : null);
283287
setRequesterAllowed(reader.getBoolean());
284288
} else if (key.equals("salt")) {
285289
reader.next();

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ private void send(boolean requests) {
203203
msg = null;
204204
}
205205
}
206+
setAckRequired();
206207
} else if (hasPingToSend()) {
207208
msg = new PingMessage(this);
208209
} else if (hasAckToSend()) {

dslink-v2/src/main/java/org/iot/dsa/conn/DSConnection.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public abstract class DSConnection extends DSNode implements DSIStatus, Runnable
4141
// Class Fields
4242
///////////////////////////////////////////////////////////////////////////
4343

44-
static final String ENABLED = "Enabled";
4544
static final String FAILURE = "Failure";
4645
static final String LAST_OK = "Last OK";
4746
static final String LAST_FAIL = "Last Fail";
@@ -53,7 +52,6 @@ public abstract class DSConnection extends DSNode implements DSIStatus, Runnable
5352
// Instance Fields
5453
///////////////////////////////////////////////////////////////////////////
5554

56-
private DSInfo enabled = getInfo(ENABLED);
5755
private DSInfo failure = getInfo(FAILURE);
5856
private DSInfo lastFail = getInfo(LAST_FAIL);
5957
private DSInfo lastOk = getInfo(LAST_OK);
@@ -194,7 +192,7 @@ public boolean isConnected() {
194192
}
195193

196194
public boolean isEnabled() {
197-
return enabled.getElement().toBoolean();
195+
return true;
198196
}
199197

200198
/**
@@ -286,7 +284,6 @@ protected void configOk() {
286284

287285
@Override
288286
protected void declareDefaults() {
289-
declareDefault(ENABLED, DSBool.TRUE);
290287
declareDefault(STATUS, DSStatus.down).setReadOnly(true).setTransient(true);
291288
declareDefault(STATE, DSConnectionState.DISCONNECTED).setReadOnly(true).setTransient(true);
292289
declareDefault(STATE_TIME, DSDateTime.currentTime()).setReadOnly(true).setTransient(true);
@@ -327,14 +324,6 @@ protected boolean isOperational() {
327324
return isRunning() && isConfigOk() && isEnabled();
328325
}
329326

330-
protected void onChildChanged(DSInfo info) {
331-
if (info == enabled) {
332-
synchronized (this) {
333-
notify();
334-
}
335-
}
336-
}
337-
338327
/**
339328
* Subclasses must establish the connection. You must call connOk or connDown, but that can
340329
* be async after this method has returned. You can throw an exception from this method instead

0 commit comments

Comments
 (0)