Skip to content

Commit a1cb224

Browse files
committed
fix: writing message before upgrade is finished
1 parent 1a271f1 commit a1cb224

File tree

2 files changed

+41
-36
lines changed

2 files changed

+41
-36
lines changed

driver/src/main/java/oracle/nosql/driver/httpclient/HttpProtocolNegotiationHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import static oracle.nosql.driver.util.LogUtil.logFine;
1111

1212
import java.net.SocketAddress;
13-
import java.util.Objects;
1413
import java.util.logging.Logger;
1514

1615
import io.netty.channel.ChannelHandlerContext;

driver/src/main/java/oracle/nosql/driver/httpclient/HttpUtil.java

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,17 @@
99

1010
import static io.netty.handler.logging.LogLevel.DEBUG;
1111

12-
import java.net.InetSocketAddress;
1312
import java.net.SocketAddress;
1413
import java.util.Objects;
1514

16-
import io.netty.buffer.Unpooled;
15+
import io.netty.channel.ChannelDuplexHandler;
1716
import io.netty.channel.ChannelHandlerContext;
18-
import io.netty.channel.ChannelInboundHandlerAdapter;
19-
import io.netty.channel.ChannelOutboundHandler;
2017
import io.netty.channel.ChannelPipeline;
2118
import io.netty.channel.ChannelPromise;
22-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2319
import io.netty.handler.codec.http.HttpClientCodec;
2420
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
25-
import io.netty.handler.codec.http.HttpHeaderNames;
2621
import io.netty.handler.codec.http.HttpMessage;
27-
import io.netty.handler.codec.http.HttpMethod;
2822
import io.netty.handler.codec.http.HttpObjectAggregator;
29-
import io.netty.handler.codec.http.HttpVersion;
3023
import io.netty.handler.codec.http2.DefaultHttp2Connection;
3124
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
3225
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
@@ -83,7 +76,7 @@ public static void configureH2C(ChannelPipeline p, int maxChunkSize, int maxCont
8376

8477
HttpClientCodec sourceCodec = new HttpClientCodec(4096, 8192, maxChunkSize);
8578
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(connectionHandler);
86-
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, maxContentLength);
79+
HttpClientUpgradeHandler upgradeHandler = new UpgradeHandler(sourceCodec, upgradeCodec, maxContentLength);
8780
p.addLast(sourceCodec,
8881
upgradeHandler,
8982
new UpgradeRequestHandler(maxContentLength));
@@ -101,35 +94,31 @@ public static void writeBufferedMessages(ChannelHandlerContext ctx, RecyclableAr
10194
bufferedMessages.recycle();
10295
}
10396

97+
private static final class UpgradeHandler extends HttpClientUpgradeHandler {
98+
public UpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec, int maxContentLength) {
99+
super(sourceCodec, upgradeCodec, maxContentLength);
100+
}
101+
102+
@Override
103+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
104+
super.handlerRemoved(ctx);
105+
ctx.fireUserEventTriggered(UpgradeFinishedEvent.INSTANCE);
106+
}
107+
}
108+
104109
/**
105110
* A handler that triggers the cleartext upgrade to HTTP/2 by sending an initial HTTP request.
106111
*/
107-
private static final class UpgradeRequestHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
112+
private static final class UpgradeRequestHandler extends ChannelDuplexHandler {
108113
private final int maxContentLength;
109114
private final RecyclableArrayList bufferedMessages = RecyclableArrayList.newInstance();
115+
private boolean upgradeTried = false;
116+
private boolean upgrading = false;
110117

111118
public UpgradeRequestHandler(int maxContentLength) {
112119
this.maxContentLength = maxContentLength;
113120
}
114121

115-
@Override
116-
public void channelActive(ChannelHandlerContext ctx) throws Exception {
117-
DefaultFullHttpRequest upgradeRequest =
118-
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/", Unpooled.EMPTY_BUFFER);
119-
120-
// Set HOST header as the remote peer may require it.
121-
InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
122-
String hostString = remote.getHostString();
123-
if (hostString == null) {
124-
hostString = remote.getAddress().getHostAddress();
125-
}
126-
upgradeRequest.headers().set(HttpHeaderNames.HOST, hostString + ':' + remote.getPort());
127-
128-
ctx.writeAndFlush(upgradeRequest);
129-
130-
ctx.fireChannelActive();
131-
}
132-
133122
@Override
134123
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
135124
super.handlerRemoved(ctx);
@@ -142,23 +131,33 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
142131
HttpClientUpgradeHandler.UpgradeEvent reg = (HttpClientUpgradeHandler.UpgradeEvent) evt;
143132
if (reg == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
144133
ctx.pipeline().addAfter(ctx.name(), AGG_HANDLER_NAME, new HttpObjectAggregator(maxContentLength));
145-
ctx.pipeline().remove(this);
146-
} else if (reg == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
147-
ctx.pipeline().remove(this);
148134
}
149135
}
136+
if (evt instanceof UpgradeFinishedEvent) {
137+
// Upgrade Finished (both SUCCESSFUL or REJECTED)
138+
// The HttpClientUpgradeHandler is removed from pipeline
139+
// The pipeline is now configured, and ready
140+
// Remove this handler and flush the buffered messages
141+
ctx.pipeline().remove(this);
142+
}
150143
ctx.fireUserEventTriggered(evt);
151144
}
152145

153146
@Override
154147
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
155148
if (msg instanceof HttpMessage) {
156-
Pair<Object, ChannelPromise> p = Pair.of(msg, promise);
157-
this.bufferedMessages.add(p);
158-
return;
149+
// Only let the very first HttpMessage pass through
150+
// The H2C upgrade handler modifies the first message,
151+
// and tries to negotiate a new protocol with the server
152+
// This process takes one round trip
153+
if (upgrading) {
154+
Pair<Object, ChannelPromise> p = Pair.of(msg, promise);
155+
this.bufferedMessages.add(p);
156+
return;
157+
}
158+
upgrading = true;
159159
}
160160

161-
// let non-http message to pass, so the HTTP2 preface and settings frame can be sent
162161
ctx.write(msg, promise);
163162
}
164163

@@ -198,6 +197,13 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
198197
}
199198
}
200199

200+
public static final class UpgradeFinishedEvent {
201+
private static final UpgradeFinishedEvent INSTANCE = new UpgradeFinishedEvent();
202+
203+
private UpgradeFinishedEvent() {
204+
}
205+
}
206+
201207
public static class Pair<A, B> {
202208

203209
public final A first;

0 commit comments

Comments
 (0)