Skip to content

Commit 1d78907

Browse files
committed
Big VPN redesign for thread-safety, performance & reliability
Previously, we had one thread to read from the VPN, one reader & writer thread per VPN session when required, from a pool, and one NIO thread that orchestrated the threadpool. This had a lot of races: data arrived in the session during writing, writes couldn't safely set the NIO subscriptions to handle write retries, and it was easy to fall into useless hot NIO loops or miss events entirely. In practice, this was easiest demoed with any speed test app: under load, the connection would lock up and lose data, dramatically dropping connection perf, and it'd also probably burn a bunch of CPU en route. Now, we have one thread that handles VPN reads, one that handles VPN writes (async, with reliable retries), and one that handles all upstream reads & writes with non-blocking NIO.
1 parent 15eb972 commit 1d78907

12 files changed

+491
-493
lines changed

app/src/main/java/tech/httptoolkit/android/ProxyVpnRunnable.kt

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package tech.httptoolkit.android
33
import android.os.ParcelFileDescriptor
44
import android.util.Log
55
import android.util.SparseArray
6-
import tech.httptoolkit.android.vpn.ClientPacketWriterImpl
6+
import tech.httptoolkit.android.vpn.ClientPacketWriter
77
import tech.httptoolkit.android.vpn.SessionHandler
88
import tech.httptoolkit.android.vpn.SessionManager
99
import tech.httptoolkit.android.vpn.socket.SocketNIODataService
@@ -27,23 +27,22 @@ class ProxyVpnRunnable(
2727
@Volatile private var running = false
2828

2929
// Packets from device apps downstream, heading upstream via this VPN
30-
private val clientReader = FileInputStream(vpnInterface.fileDescriptor)
30+
private val vpnReadStream = FileInputStream(vpnInterface.fileDescriptor)
3131

3232
// Packets from upstream servers, received by this VPN
33-
private val clientWriter = FileOutputStream(vpnInterface.fileDescriptor)
34-
private val clientPacketWriter = ClientPacketWriterImpl(clientWriter)
33+
private val vpnWriteStream = FileOutputStream(vpnInterface.fileDescriptor)
34+
private val vpnPacketWriter = ClientPacketWriter(vpnWriteStream)
35+
private val vpnPacketWriterThread = Thread(vpnPacketWriter)
3536

36-
// Allocate the buffer for a single packet.
37-
private val packet = ByteBuffer.allocate(MAX_PACKET_LEN)!!
37+
// Background service & task for non-blocking socket
38+
private val nioService = SocketNIODataService(vpnPacketWriter)
39+
private val dataServiceThread = Thread(nioService, "Socket NIO thread")
3840

39-
// SessionHandler, which handles sessions whilst writing packets
40-
private val handler = SessionHandler.getInstance().apply {
41-
setWriter(clientPacketWriter)
42-
}
41+
private val manager = SessionManager(nioService)
42+
private val handler = SessionHandler(manager, nioService, vpnPacketWriter)
4343

44-
// Background service & task for non-blocking socket
45-
private val dataService = SocketNIODataService(clientPacketWriter)
46-
private val dataServiceThread = Thread(dataService, "Socket NIO thread")
44+
// Allocate the buffer for a single packet.
45+
private val packet = ByteBuffer.allocate(MAX_PACKET_LEN)!!
4746

4847
// Our redirect rules, defining which traffic should be forwarded to what proxy address
4948
private val portRedirections = SparseArray<InetSocketAddress>().apply {
@@ -61,8 +60,9 @@ class ProxyVpnRunnable(
6160

6261
Log.i(TAG, "Vpn thread starting")
6362

64-
SessionManager.INSTANCE.setTcpPortRedirections(portRedirections)
63+
manager.setTcpPortRedirections(portRedirections)
6564
dataServiceThread.start()
65+
vpnPacketWriterThread.start()
6666

6767
var data: ByteArray
6868
var length: Int
@@ -71,7 +71,7 @@ class ProxyVpnRunnable(
7171
while (running) {
7272
data = packet.array()
7373

74-
length = clientReader.read(data)
74+
length = vpnReadStream.read(data)
7575
if (length > 0) {
7676
try {
7777
packet.limit(length)
@@ -93,14 +93,16 @@ class ProxyVpnRunnable(
9393
}
9494

9595
Log.i(TAG, "Vpn thread shutting down")
96-
9796
}
9897

9998
fun stop() {
10099
if (running) {
101100
running = false
102-
dataService.setShutdown(true)
101+
nioService.shutdown()
103102
dataServiceThread.interrupt()
103+
104+
vpnPacketWriter.shutdown()
105+
vpnPacketWriterThread.interrupt()
104106
} else {
105107
Log.w(TAG, "Vpn runnable stopped, but it's not running")
106108
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2014 AT&T
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package tech.httptoolkit.android.vpn;
18+
19+
import android.util.Log;
20+
21+
import java.io.FileOutputStream;
22+
import java.io.IOException;
23+
import java.nio.channels.AsynchronousFileChannel;
24+
import java.nio.channels.Selector;
25+
import java.util.Queue;
26+
import java.util.concurrent.BlockingDeque;
27+
import java.util.concurrent.BlockingQueue;
28+
import java.util.concurrent.ConcurrentLinkedQueue;
29+
import java.util.concurrent.LinkedBlockingDeque;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
32+
import tech.httptoolkit.android.TagKt;
33+
34+
/**
35+
* write packet data back to VPN client stream. This class is thread safe.
36+
* @author Borey Sao
37+
* Date: May 22, 2014
38+
*/
39+
public class ClientPacketWriter implements Runnable {
40+
41+
private final String TAG = TagKt.getTAG(this);
42+
43+
private final FileOutputStream clientWriter;
44+
45+
private volatile boolean shutdown = false;
46+
private final BlockingDeque<byte[]> packetQueue = new LinkedBlockingDeque<>();
47+
48+
public ClientPacketWriter(FileOutputStream clientWriter) {
49+
this.clientWriter = clientWriter;
50+
}
51+
52+
public void write(byte[] data) {
53+
if (data.length > 30000) throw new Error("Packet too large");
54+
Log.i(TAG, "Putting " + data.length + " bytes on the write queue");
55+
packetQueue.addLast(data);
56+
}
57+
58+
public void shutdown() {
59+
this.shutdown = true;
60+
}
61+
62+
@Override
63+
public void run() {
64+
while (!this.shutdown) {
65+
try {
66+
byte[] data = this.packetQueue.take();
67+
try {
68+
this.clientWriter.write(data);
69+
} catch (IOException e) {
70+
Log.e(TAG, "Error writing " + data.length + " bytes to the VPN");
71+
e.printStackTrace();
72+
73+
this.packetQueue.addFirst(data); // Put the data back, so it's resent
74+
Thread.sleep(10); // Add an arbitrary tiny pause, in case that helps
75+
}
76+
} catch (InterruptedException e) { }
77+
}
78+
}
79+
}

app/src/main/java/tech/httptoolkit/android/vpn/ClientPacketWriterImpl.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

app/src/main/java/tech/httptoolkit/android/vpn/IClientPacketWriter.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

app/src/main/java/tech/httptoolkit/android/vpn/Session.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import android.util.Log;
2020

2121
import tech.httptoolkit.android.vpn.network.ip.IPv4Header;
22+
import tech.httptoolkit.android.vpn.socket.ICloseSession;
2223
import tech.httptoolkit.android.vpn.transport.tcp.TCPHeader;
2324
import tech.httptoolkit.android.vpn.transport.udp.UDPHeader;
2425
import tech.httptoolkit.android.vpn.util.PacketUtil;
@@ -36,15 +37,15 @@
3637
* Date: May 19, 2014
3738
*/
3839
public class Session {
39-
private static final String TAG = "Session";
40+
private final String TAG = "Session";
4041

4142
private AbstractSelectableChannel channel;
4243

43-
private int destIp = 0;
44-
private int destPort = 0;
44+
private final int destIp;
45+
private final int destPort;
4546

46-
private int sourceIp = 0;
47-
private int sourcePort = 0;
47+
private final int sourceIp;
48+
private final int sourcePort;
4849

4950
//sequence received from client
5051
private long recSequence = 0;
@@ -69,10 +70,10 @@ public class Session {
6970
private boolean isConnected = false;
7071

7172
//receiving buffer for storing data from remote host
72-
private ByteArrayOutputStream receivingStream;
73+
private final ByteArrayOutputStream receivingStream;
7374

7475
//sending buffer for storing data from vpn client to be send to destination host
75-
private ByteArrayOutputStream sendingStream;
76+
private final ByteArrayOutputStream sendingStream;
7677

7778
private boolean hasReceivedLastSegment = false;
7879

@@ -85,7 +86,7 @@ public class Session {
8586
private boolean closingConnection = false;
8687

8788
//indicate data from client is ready for sending to destination
88-
private boolean isDataForSendingReady = false;
89+
private volatile boolean isDataForSendingReady = false;
8990

9091
//store data for retransmission
9192
private byte[] unackData = null;
@@ -102,24 +103,31 @@ public class Session {
102103
//indicate that vpn client has sent FIN flag and it has been acked
103104
private boolean ackedToFin = false;
104105

105-
//indicate that this session is currently being worked on by some SocketDataWorker already
106-
private volatile boolean isBusyRead = false;
107-
private volatile boolean isBusyWrite = false;
108-
109106
//closing session and aborting connection, will be done by background task
110107
private volatile boolean abortingConnection = false;
111108

112109
private SelectionKey selectionkey = null;
113110

114111
public long connectionStartTime = 0;
112+
113+
private final ICloseSession sessionCloser;
115114

116-
Session(int sourceIp, int sourcePort, int destinationIp, int destinationPort){
115+
Session(
116+
int sourceIp,
117+
int sourcePort,
118+
int destinationIp,
119+
int destinationPort,
120+
ICloseSession sessionCloser
121+
) {
117122
receivingStream = new ByteArrayOutputStream();
118123
sendingStream = new ByteArrayOutputStream();
124+
119125
this.sourceIp = sourceIp;
120126
this.sourcePort = sourcePort;
121127
this.destIp = destinationIp;
122128
this.destPort = destinationPort;
129+
130+
this.sessionCloser = sessionCloser;
123131
}
124132

125133
/**
@@ -357,18 +365,6 @@ boolean isAckedToFin() {
357365
return ackedToFin;
358366
}
359367

360-
public boolean isBusyRead() {
361-
return isBusyRead;
362-
}
363-
public void setBusyread(boolean isbusyread) {
364-
this.isBusyRead = isbusyread;
365-
}
366-
public boolean isBusywrite() {
367-
return isBusyWrite;
368-
}
369-
public void setBusywrite(boolean isbusywrite) {
370-
this.isBusyWrite = isbusywrite;
371-
}
372368
public boolean isAbortingConnection() {
373369
return abortingConnection;
374370
}
@@ -378,7 +374,7 @@ public void setAbortingConnection(boolean abortingConnection) {
378374
public SelectionKey getSelectionKey() {
379375
return selectionkey;
380376
}
381-
void setSelectionKey(SelectionKey selectionkey) {
377+
public void setSelectionKey(SelectionKey selectionkey) {
382378
this.selectionkey = selectionkey;
383379
}
384380

@@ -403,6 +399,10 @@ public void unsubscribeKey(int OP) {
403399
}
404400
}
405401

402+
public void closeSession() {
403+
this.sessionCloser.closeSession(this);
404+
}
405+
406406
public String getSessionKey() {
407407
return Session.getSessionKey(this.destIp, this.destPort, this.sourceIp, this.sourcePort);
408408
}

0 commit comments

Comments
 (0)