decs) {
+ this.decs = decs;
+ }
+
+ /**
+ * Decode a full message (reverse order if stacking).
+ */
+ public byte[] decode(final byte[] data) throws Exception {
+ byte[] out = data;
+ for (int i = decs.size() - 1; i >= 0; i--) {
+ out = decs.get(i).decode(out);
+ }
+ return out;
+ }
+ }
+}
\ No newline at end of file
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/PerMessageDeflate.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/PerMessageDeflate.java
new file mode 100644
index 0000000000..9d9fb155d0
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/PerMessageDeflate.java
@@ -0,0 +1,186 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.extension;
+
+import java.io.ByteArrayOutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits;
+import org.apache.hc.core5.annotation.Internal;
+
+@Internal
+public final class PerMessageDeflate implements WebSocketExtensionChain {
+ private static final byte[] TAIL = new byte[]{0x00, 0x00, (byte) 0xFF, (byte) 0xFF};
+
+ private final boolean enabled;
+ private final boolean serverNoContextTakeover;
+ private final boolean clientNoContextTakeover;
+ private final Integer clientMaxWindowBits; // negotiated or null
+ private final Integer serverMaxWindowBits; // negotiated or null
+
+ public PerMessageDeflate(final boolean enabled,
+ final boolean serverNoContextTakeover,
+ final boolean clientNoContextTakeover,
+ final Integer clientMaxWindowBits,
+ final Integer serverMaxWindowBits) {
+ this.enabled = enabled;
+ this.serverNoContextTakeover = serverNoContextTakeover;
+ this.clientNoContextTakeover = clientNoContextTakeover;
+ this.clientMaxWindowBits = clientMaxWindowBits;
+ this.serverMaxWindowBits = serverMaxWindowBits;
+ }
+
+ @Override
+ public int rsvMask() {
+ return FrameHeaderBits.RSV1;
+ }
+
+ @Override
+ public Encoder newEncoder() {
+ if (!enabled) {
+ return (data, first, fin) -> new Encoded(data, false);
+ }
+ return new Encoder() {
+ private final Deflater def = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw DEFLATE
+
+ @Override
+ public Encoded encode(final byte[] data, final boolean first, final boolean fin) {
+ final byte[] out = first && fin
+ ? compressMessage(data)
+ : compressFragment(data, fin);
+ // RSV1 on first compressed data frame only
+ return new Encoded(out, first);
+ }
+
+ private byte[] compressMessage(final byte[] data) {
+ return doDeflate(data, true, true, clientNoContextTakeover);
+ }
+
+ private byte[] compressFragment(final byte[] data, final boolean fin) {
+ return doDeflate(data, fin, true,fin && clientNoContextTakeover);
+ }
+
+ private byte[] doDeflate(final byte[] data,
+ final boolean fin,
+ final boolean stripTail,
+ final boolean maybeReset) {
+ if (data == null || data.length == 0) {
+ if (fin && maybeReset) {
+ def.reset();
+ }
+ return new byte[0];
+ }
+ def.setInput(data);
+ final ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(128, data.length / 2));
+ final byte[] buf = new byte[8192];
+ while (!def.needsInput()) {
+ final int n = def.deflate(buf, 0, buf.length, Deflater.SYNC_FLUSH);
+ if (n > 0) {
+ out.write(buf, 0, n);
+ } else {
+ break;
+ }
+ }
+ byte[] all = out.toByteArray();
+ if (stripTail && all.length >= 4) {
+ final int newLen = all.length - 4; // strip 00 00 FF FF
+ if (newLen <= 0) {
+ all = new byte[0];
+ } else {
+ final byte[] trimmed = new byte[newLen];
+ System.arraycopy(all, 0, trimmed, 0, newLen);
+ all = trimmed;
+ }
+ }
+ if (fin && maybeReset) {
+ def.reset();
+ }
+ return all;
+ }
+ };
+ }
+
+ @Override
+ public Decoder newDecoder() {
+ if (!enabled) {
+ return payload -> payload;
+ }
+ return new Decoder() {
+ private final Inflater inf = new Inflater(true);
+
+ @Override
+ public byte[] decode(final byte[] compressedMessage) throws Exception {
+ final byte[] withTail;
+ if (compressedMessage == null || compressedMessage.length == 0) {
+ withTail = TAIL.clone();
+ } else {
+ withTail = new byte[compressedMessage.length + 4];
+ System.arraycopy(compressedMessage, 0, withTail, 0, compressedMessage.length);
+ System.arraycopy(TAIL, 0, withTail, compressedMessage.length, 4);
+ }
+
+ inf.setInput(withTail);
+ final ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(128, withTail.length * 2));
+ final byte[] buf = new byte[8192];
+ while (!inf.needsInput()) {
+ final int n = inf.inflate(buf);
+ if (n > 0) {
+ out.write(buf, 0, n);
+ } else {
+ break;
+ }
+ }
+ if (serverNoContextTakeover) {
+ inf.reset();
+ }
+ return out.toByteArray();
+ }
+ };
+ }
+
+ // optional getters for logging/tests
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public boolean isServerNoContextTakeover() {
+ return serverNoContextTakeover;
+ }
+
+ public boolean isClientNoContextTakeover() {
+ return clientNoContextTakeover;
+ }
+
+ public Integer getClientMaxWindowBits() {
+ return clientMaxWindowBits;
+ }
+
+ public Integer getServerMaxWindowBits() {
+ return serverMaxWindowBits;
+ }
+}
\ No newline at end of file
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/WebSocketExtensionChain.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/WebSocketExtensionChain.java
new file mode 100644
index 0000000000..977014f18d
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/WebSocketExtensionChain.java
@@ -0,0 +1,80 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.extension;
+
+import org.apache.hc.core5.annotation.Internal;
+
+/**
+ * Generic extension hook for payload transform (e.g., permessage-deflate).
+ * Implementations may return RSV mask (usually RSV1) and indicate whether
+ * the first frame of a message should set RSV.
+ */
+@Internal
+public interface WebSocketExtensionChain {
+
+ /**
+ * RSV bits this extension uses on the first data frame (e.g. 0x40 for RSV1).
+ */
+ int rsvMask();
+
+ /**
+ * Create a thread-confined encoder instance (app thread).
+ */
+ Encoder newEncoder();
+
+ /**
+ * Create a thread-confined decoder instance (I/O thread).
+ */
+ Decoder newDecoder();
+
+ /**
+ * Encoded fragment result.
+ */
+ final class Encoded {
+ public final byte[] payload;
+ public final boolean setRsvOnFirst;
+
+ public Encoded(final byte[] payload, final boolean setRsvOnFirst) {
+ this.payload = payload;
+ this.setRsvOnFirst = setRsvOnFirst;
+ }
+ }
+
+ interface Encoder {
+ /**
+ * Encode one fragment; return transformed payload and whether to set RSV on FIRST frame.
+ */
+ Encoded encode(byte[] data, boolean first, boolean fin);
+ }
+
+ interface Decoder {
+ /**
+ * Decode a full message produced with this extension.
+ */
+ byte[] decode(byte[] payload) throws Exception;
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/package-info.java
new file mode 100644
index 0000000000..b2cf046271
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/extension/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * WebSocket extension SPI and implementations.
+ *
+ * Includes the generic {@code Extension} SPI, chaining support, and a
+ * client-side permessage-deflate (RFC 7692) implementation.
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket.core.extension;
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/FrameHeaderBits.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/FrameHeaderBits.java
new file mode 100644
index 0000000000..9e76108f55
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/FrameHeaderBits.java
@@ -0,0 +1,49 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.frame;
+
+import org.apache.hc.core5.annotation.Internal;
+
+/**
+ * WebSocket frame header bit masks (RFC 6455 §5.2).
+ */
+@Internal
+public final class FrameHeaderBits {
+ private FrameHeaderBits() {
+ }
+
+ // First header byte
+ public static final int FIN = 0x80;
+ public static final int RSV1 = 0x40;
+ public static final int RSV2 = 0x20;
+ public static final int RSV3 = 0x10;
+ // low 4 bits (0x0F) are opcode
+
+ // Second header byte
+ public static final int MASK_BIT = 0x80; // client->server payload mask bit
+ // low 7 bits (0x7F) are payload len indicator
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/FrameOpcode.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/FrameOpcode.java
new file mode 100644
index 0000000000..524cffc33d
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/FrameOpcode.java
@@ -0,0 +1,88 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.frame;
+
+import org.apache.hc.core5.annotation.Internal;
+
+/**
+ * RFC 6455 opcode constants + helpers.
+ */
+@Internal
+public final class FrameOpcode {
+ public static final int CONT = 0x0;
+ public static final int TEXT = 0x1;
+ public static final int BINARY = 0x2;
+ public static final int CLOSE = 0x8;
+ public static final int PING = 0x9;
+ public static final int PONG = 0xA;
+
+ private FrameOpcode() {
+ }
+
+ /**
+ * Control frames have the high bit set in the low nibble (0x8–0xF).
+ */
+ public static boolean isControl(final int opcode) {
+ return (opcode & 0x08) != 0;
+ }
+
+ /**
+ * Data opcodes (not continuation).
+ */
+ public static boolean isData(final int opcode) {
+ return opcode == TEXT || opcode == BINARY;
+ }
+
+ /**
+ * Continuation opcode.
+ */
+ public static boolean isContinuation(final int opcode) {
+ return opcode == CONT;
+ }
+
+ /**
+ * Optional: human-readable name for debugging.
+ */
+ public static String name(final int opcode) {
+ switch (opcode) {
+ case CONT:
+ return "CONT";
+ case TEXT:
+ return "TEXT";
+ case BINARY:
+ return "BINARY";
+ case CLOSE:
+ return "CLOSE";
+ case PING:
+ return "PING";
+ case PONG:
+ return "PONG";
+ default:
+ return "0x" + Integer.toHexString(opcode);
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/WebSocketFrameWriter.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/WebSocketFrameWriter.java
new file mode 100644
index 0000000000..40255ae95a
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/WebSocketFrameWriter.java
@@ -0,0 +1,189 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.frame;
+
+import static org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits.FIN;
+import static org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits.MASK_BIT;
+import static org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits.RSV1;
+import static org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits.RSV2;
+import static org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits.RSV3;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hc.client5.http.websocket.core.message.CloseCodec;
+import org.apache.hc.core5.annotation.Internal;
+
+/**
+ * RFC 6455 frame writer with helpers to build into an existing target buffer.
+ *
+ * @since 5.6
+ */
+@Internal
+public final class WebSocketFrameWriter {
+
+ // -- Text/Binary -----------------------------------------------------------
+
+ public ByteBuffer text(final CharSequence data, final boolean fin) {
+ final ByteBuffer payload = data == null ? ByteBuffer.allocate(0)
+ : StandardCharsets.UTF_8.encode(data.toString());
+ // Client → server MUST be masked
+ return frame(FrameOpcode.TEXT, payload, fin, true);
+ }
+
+ public ByteBuffer binary(final ByteBuffer data, final boolean fin) {
+ final ByteBuffer payload = data == null ? ByteBuffer.allocate(0) : data.asReadOnlyBuffer();
+ return frame(FrameOpcode.BINARY, payload, fin, true);
+ }
+
+ // -- Control frames (FIN=true, payload ≤ 125, never compressed) -----------
+
+ public ByteBuffer ping(final ByteBuffer payloadOrNull) {
+ final ByteBuffer p = payloadOrNull == null ? ByteBuffer.allocate(0) : payloadOrNull.asReadOnlyBuffer();
+ if (p.remaining() > 125) {
+ throw new IllegalArgumentException("PING payload > 125 bytes");
+ }
+ return frame(FrameOpcode.PING, p, true, true);
+ }
+
+ public ByteBuffer pong(final ByteBuffer payloadOrNull) {
+ final ByteBuffer p = payloadOrNull == null ? ByteBuffer.allocate(0) : payloadOrNull.asReadOnlyBuffer();
+ if (p.remaining() > 125) {
+ throw new IllegalArgumentException("PONG payload > 125 bytes");
+ }
+ return frame(FrameOpcode.PONG, p, true, true);
+ }
+
+ public ByteBuffer close(final int code, final String reason) {
+ if (!CloseCodec.isValidToSend(code)) {
+ throw new IllegalArgumentException("Invalid close code to send: " + code);
+ }
+ final String safeReason = CloseCodec.truncateReasonUtf8(reason);
+ final ByteBuffer reasonBuf = safeReason.isEmpty()
+ ? ByteBuffer.allocate(0)
+ : StandardCharsets.UTF_8.encode(safeReason);
+
+ if (reasonBuf.remaining() > 123) {
+ throw new IllegalArgumentException("Close reason too long (UTF-8 bytes > 123)");
+ }
+
+ final ByteBuffer p = ByteBuffer.allocate(2 + reasonBuf.remaining());
+ p.put((byte) (code >> 8 & 0xFF));
+ p.put((byte) (code & 0xFF));
+ if (reasonBuf.hasRemaining()) {
+ p.put(reasonBuf);
+ }
+ p.flip();
+ return frame(FrameOpcode.CLOSE, p, true, true);
+ }
+
+ public ByteBuffer closeEcho(final ByteBuffer payload) {
+ final ByteBuffer p = payload == null ? ByteBuffer.allocate(0) : payload.asReadOnlyBuffer();
+ if (p.remaining() > 125) {
+ throw new IllegalArgumentException("Close payload > 125 bytes");
+ }
+ return frame(FrameOpcode.CLOSE, p, true, true);
+ }
+
+ // -- Core framing ----------------------------------------------------------
+
+ public ByteBuffer frame(final int opcode, final ByteBuffer payload, final boolean fin, final boolean mask) {
+ return frameWithRSV(opcode, payload, fin, mask, 0);
+ }
+
+ public ByteBuffer frameWithRSV(final int opcode, final ByteBuffer payload, final boolean fin,
+ final boolean mask, final int rsvBits) {
+ final int len = payload == null ? 0 : payload.remaining();
+ final int hdrExtra = len <= 125 ? 0 : len <= 0xFFFF ? 2 : 8;
+ final int maskLen = mask ? 4 : 0;
+ final ByteBuffer out = ByteBuffer.allocate(2 + hdrExtra + maskLen + len).order(ByteOrder.BIG_ENDIAN);
+ frameIntoWithRSV(opcode, payload, fin, mask, rsvBits, out);
+ out.flip();
+ return out;
+ }
+
+ public ByteBuffer frameInto(final int opcode, final ByteBuffer payload, final boolean fin,
+ final boolean mask, final ByteBuffer out) {
+ return frameIntoWithRSV(opcode, payload, fin, mask, 0, out);
+ }
+
+ public ByteBuffer frameIntoWithRSV(final int opcode, final ByteBuffer payload, final boolean fin,
+ final boolean mask, final int rsvBits, final ByteBuffer out) {
+ final int len = payload == null ? 0 : payload.remaining();
+
+ if (FrameOpcode.isControl(opcode)) {
+ if (!fin) {
+ throw new IllegalArgumentException("Control frames must not be fragmented (FIN=false)");
+ }
+ if (len > 125) {
+ throw new IllegalArgumentException("Control frame payload > 125 bytes");
+ }
+ if ((rsvBits & (RSV1 | RSV2 | RSV3)) != 0) {
+ throw new IllegalArgumentException("RSV bits must be 0 on control frames");
+ }
+ }
+
+ final int finBit = fin ? FIN : 0;
+ out.put((byte) (finBit | rsvBits & (RSV1 | RSV2 | RSV3) | opcode & 0x0F));
+
+ if (len <= 125) {
+ out.put((byte) ((mask ? MASK_BIT : 0) | len));
+ } else if (len <= 0xFFFF) {
+ out.put((byte) ((mask ? MASK_BIT : 0) | 126));
+ out.putShort((short) len);
+ } else {
+ out.put((byte) ((mask ? MASK_BIT : 0) | 127));
+ out.putLong(len & 0x7FFF_FFFF_FFFF_FFFFL);
+ }
+
+ int[] mkey = null;
+ if (mask) {
+ mkey = new int[]{rnd(), rnd(), rnd(), rnd()};
+ out.put((byte) mkey[0]).put((byte) mkey[1]).put((byte) mkey[2]).put((byte) mkey[3]);
+ }
+
+ if (len > 0) {
+ final ByteBuffer src = payload.asReadOnlyBuffer();
+ int i = 0; // simpler, safer mask index
+ while (src.hasRemaining()) {
+ int b = src.get() & 0xFF;
+ if (mask) {
+ b ^= mkey[i & 3];
+ i++;
+ }
+ out.put((byte) b);
+ }
+ }
+ return out;
+ }
+
+ private static int rnd() {
+ return ThreadLocalRandom.current().nextInt(256);
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/package-info.java
new file mode 100644
index 0000000000..10191101ed
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/frame/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * Low-level WebSocket frame helpers.
+ *
+ * Opcode constants, header bit masks, and frame writer utilities aligned
+ * with RFC 6455.
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket.core.frame;
\ No newline at end of file
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/message/CloseCodec.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/message/CloseCodec.java
new file mode 100644
index 0000000000..f0c5c6a42e
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/message/CloseCodec.java
@@ -0,0 +1,190 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.message;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.core5.annotation.Internal;
+
+/**
+ * Helpers for RFC6455 CLOSE parsing & validation.
+ */
+@Internal
+public final class CloseCodec {
+
+ private CloseCodec() {
+ }
+
+
+ /**
+ * Reads the close status code from the payload buffer, if present.
+ * Returns {@code 1005} (“no status code present”) when the payload
+ * does not contain at least two bytes.
+ */
+ public static int readCloseCode(final ByteBuffer payloadRO) {
+ if (payloadRO == null || payloadRO.remaining() < 2) {
+ return 1005; // “no status code present”
+ }
+ final int b1 = payloadRO.get() & 0xFF;
+ final int b2 = payloadRO.get() & 0xFF;
+ return (b1 << 8) | b2;
+ }
+
+ /**
+ * Reads the close reason from the remaining bytes of the payload
+ * as UTF-8. Returns an empty string if there is no payload left.
+ */
+ public static String readCloseReason(final ByteBuffer payloadRO) {
+ if (payloadRO == null || !payloadRO.hasRemaining()) {
+ return "";
+ }
+ final ByteBuffer dup = payloadRO.slice();
+ return StandardCharsets.UTF_8.decode(dup).toString();
+ }
+
+ // ---- RFC validation (sender & receiver) ---------------------------------
+
+ /**
+ * RFC 6455 §7.4.2: MUST NOT appear on the wire.
+ */
+ private static boolean isForbiddenOnWire(final int code) {
+ return code == 1005 || code == 1006 || code == 1015;
+ }
+
+ /**
+ * Codes defined by RFC 6455 to send (and likewise valid to receive).
+ */
+ private static boolean isRfcDefined(final int code) {
+ switch (code) {
+ case 1000: // normal
+ case 1001: // going away
+ case 1002: // protocol error
+ case 1003: // unsupported data
+ case 1007: // invalid payload data
+ case 1008: // policy violation
+ case 1009: // message too big
+ case 1010: // mandatory extension
+ case 1011: // internal error
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Application/reserved range that may be sent by endpoints.
+ */
+ private static boolean isAppRange(final int code) {
+ return code >= 3000 && code <= 4999;
+ }
+
+ /**
+ * Validate a code we intend to PUT ON THE WIRE (sender-side).
+ */
+ public static boolean isValidToSend(final int code) {
+ if (code < 0) {
+ return false;
+ }
+ if (isForbiddenOnWire(code)) {
+ return false;
+ }
+ return isRfcDefined(code) || isAppRange(code);
+ }
+
+ /**
+ * Validate a code we PARSED FROM THE WIRE (receiver-side).
+ */
+ public static boolean isValidToReceive(final int code) {
+ // 1005, 1006, 1015 must not appear on the wire
+ if (isForbiddenOnWire(code)) {
+ return false;
+ }
+ // Same allowed sets otherwise
+ return isRfcDefined(code) || isAppRange(code);
+ }
+
+ // ---- Reason handling: max 123 bytes (2 bytes used by code) --------------
+
+ /**
+ * Returns a UTF-8 string truncated to ≤ 123 bytes, preserving code-points.
+ * This ensures that a CLOSE frame payload (2-byte status code + reason)
+ * never exceeds the 125-byte control frame limit.
+ */
+ public static String truncateReasonUtf8(final String reason) {
+ if (reason == null || reason.isEmpty()) {
+ return "";
+ }
+ final byte[] bytes = reason.getBytes(StandardCharsets.UTF_8);
+ if (bytes.length <= 123) {
+ return reason;
+ }
+ int i = 0;
+ int byteCount = 0;
+ while (i < reason.length()) {
+ final int cp = reason.codePointAt(i);
+ final int charCount = Character.charCount(cp);
+ final int extra = new String(Character.toChars(cp))
+ .getBytes(StandardCharsets.UTF_8).length;
+ if (byteCount + extra > 123) {
+ break;
+ }
+ byteCount += extra;
+ i += charCount;
+ }
+ return reason.substring(0, i);
+ }
+
+ // ---- Encoding -----------------------------------------------------------
+
+ /**
+ * Encodes a close status code and reason into a payload suitable for a
+ * CLOSE control frame:
+ *
+ *
+ * payload[0] = high-byte of status code
+ * payload[1] = low-byte of status code
+ * payload[2..] = UTF-8 bytes of the (possibly truncated) reason
+ *
+ *
+ * The reason is internally truncated to ≤ 123 UTF-8 bytes to ensure the
+ * resulting payload never exceeds the 125-byte control frame limit.
+ *
+ * The caller is expected to have already validated the status code with
+ * {@link #isValidToSend(int)}.
+ */
+ public static byte[] encode(final int statusCode, final String reason) {
+ final String truncated = truncateReasonUtf8(reason);
+ final byte[] reasonBytes = truncated.getBytes(StandardCharsets.UTF_8);
+ // 2 bytes for the status code
+ final byte[] payload = new byte[2 + reasonBytes.length];
+ payload[0] = (byte) ((statusCode >>> 8) & 0xFF);
+ payload[1] = (byte) (statusCode & 0xFF);
+ System.arraycopy(reasonBytes, 0, payload, 2, reasonBytes.length);
+ return payload;
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/message/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/message/package-info.java
new file mode 100644
index 0000000000..58fbede0be
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/message/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * Message-level helpers and codecs.
+ *
+ * Utilities for parsing and validating message semantics (e.g., CLOSE
+ * status code and reason handling).
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket.core.message;
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/package-info.java
new file mode 100644
index 0000000000..df1ab9df89
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * Core WebSocket implementation utilities.
+ *
+ * Implementation detail packages live under {@code core}. These are not
+ * part of the public API and may change without notice.
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket.core;
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/util/ByteBufferPool.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/util/ByteBufferPool.java
new file mode 100644
index 0000000000..555325323e
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/util/ByteBufferPool.java
@@ -0,0 +1,127 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.core5.annotation.Internal;
+
+/**
+ * Lock-free fixed-size ByteBuffer pool with a hard capacity limit.
+ * Buffers are cleared before reuse. Non-matching capacities are dropped.
+ *
+ * @since 5.6
+ */
+@Internal
+public final class ByteBufferPool {
+
+ private final ConcurrentLinkedQueue pool = new ConcurrentLinkedQueue<>();
+ private final AtomicInteger pooled = new AtomicInteger(0);
+
+ private final int bufferSize;
+ private final int maxCapacity;
+ private final boolean direct;
+
+ public ByteBufferPool(final int bufferSize, final int maxCapacity) {
+ this(bufferSize, maxCapacity, false);
+ }
+
+ public ByteBufferPool(final int bufferSize, final int maxCapacity, final boolean direct) {
+ if (bufferSize <= 0 || maxCapacity < 0) {
+ throw new IllegalArgumentException("Invalid pool configuration");
+ }
+ this.bufferSize = bufferSize;
+ this.maxCapacity = maxCapacity;
+ this.direct = direct;
+ }
+
+ /**
+ * Acquire a buffer or allocate a new one if the pool is empty.
+ */
+ public ByteBuffer acquire() {
+ final ByteBuffer buf = pool.poll();
+ if (buf != null) {
+ pooled.decrementAndGet();
+ buf.clear();
+ return buf;
+ }
+ return direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+ }
+
+ /**
+ * Return a buffer to the pool iff it matches the configured capacity and there is room.
+ */
+ public void release(final ByteBuffer buffer) {
+ if (buffer == null || buffer.capacity() != bufferSize) {
+ return;
+ }
+ buffer.clear();
+ for (;;) {
+ final int n = pooled.get();
+ if (n >= maxCapacity) {
+ return;
+ }
+ if (pooled.compareAndSet(n, n + 1)) {
+ pool.offer(buffer);
+ return;
+ }
+ }
+ }
+
+ /**
+ * Drain the pool.
+ */
+ public void clear() {
+ while (pool.poll() != null) { /* drain */ }
+ pooled.set(0);
+ }
+
+ /**
+ * Size in bytes of pooled buffers.
+ */
+ public int bufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Backwards-compatible accessor for callers expecting getBufferSize().
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public int maxCapacity() {
+ return maxCapacity;
+ }
+
+ public int pooledCount() {
+ return pooled.get();
+ }
+
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/util/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/util/package-info.java
new file mode 100644
index 0000000000..c3e7f67c99
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/core/util/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * Message-level helpers and codecs.
+ *
+ * Utilities for parsing and validating message semantics (e.g., CLOSE
+ * status code and reason handling).
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket.core.util;
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/package-info.java
new file mode 100644
index 0000000000..7f9d5d3fe5
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/package-info.java
@@ -0,0 +1,74 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * Client-side WebSocket support built on top of Apache HttpClient.
+ *
+ * This package provides the public API for establishing and using
+ * WebSocket connections according to RFC 6455. WebSocket sessions
+ * are created by upgrading an HTTP request and are backed internally
+ * by the non-blocking I/O reactor used by the HttpClient async APIs.
+ *
+ * Core abstractions
+ *
+ * - {@link org.apache.hc.client5.http.websocket.api.WebSocket WebSocket} –
+ * application view of a single WebSocket connection, used to send
+ * text and binary messages and initiate the close handshake.
+ * - {@link org.apache.hc.client5.http.websocket.api.WebSocketListener WebSocketListener} –
+ * callback interface that receives inbound messages, pings, pongs,
+ * errors, and close notifications.
+ * - {@link org.apache.hc.client5.http.websocket.api.WebSocketClientConfig WebSocketClientConfig} –
+ * immutable configuration for timeouts, maximum frame and message
+ * sizes, auto-pong behaviour, and buffer management.
+ * - {@link org.apache.hc.client5.http.websocket.client.CloseableWebSocketClient CloseableWebSocketClient} –
+ * high-level client for establishing WebSocket connections.
+ * - {@link org.apache.hc.client5.http.websocket.client.WebSocketClients WebSocketClients} and
+ * {@link org.apache.hc.client5.http.websocket.client.WebSocketClientBuilder WebSocketClientBuilder} –
+ * factory and builder for creating and configuring WebSocket clients.
+ *
+ *
+ * Threading model
+ * Outbound operations on {@code WebSocket} are thread-safe and may be
+ * invoked from arbitrary application threads. Inbound callbacks on
+ * {@code WebSocketListener} are normally executed on I/O dispatcher
+ * threads; listeners should avoid long blocking operations.
+ *
+ * Close handshake
+ * The implementation follows the close handshake defined in RFC 6455.
+ * Applications should initiate shutdown via
+ * {@link org.apache.hc.client5.http.websocket.api.WebSocket#close(int, String)}
+ * and treat receipt of a close frame as a terminal event. The configured
+ * {@code closeWaitTimeout} controls how long the client will wait for the
+ * peer's close frame before the underlying connection is closed.
+ *
+ * Classes in {@code org.apache.hc.client5.http.websocket.core} and
+ * {@code org.apache.hc.client5.http.websocket.transport} are internal
+ * implementation details and are not intended for direct use.
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket;
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketFrameDecoder.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketFrameDecoder.java
new file mode 100644
index 0000000000..3153ed5bab
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketFrameDecoder.java
@@ -0,0 +1,172 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hc.client5.http.websocket.core.exceptions.WebSocketProtocolException;
+import org.apache.hc.client5.http.websocket.core.frame.FrameOpcode;
+import org.apache.hc.core5.annotation.Internal;
+
+@Internal
+public final class WebSocketFrameDecoder {
+ private final int maxFrameSize;
+ private final boolean strictNoExtensions;
+
+ private int opcode;
+ private boolean fin;
+ private boolean rsv1, rsv2, rsv3;
+ private ByteBuffer payload = ByteBuffer.allocate(0);
+ private final boolean expectMasked;
+
+
+
+ public WebSocketFrameDecoder(final int maxFrameSize, final boolean strictNoExtensions) {
+ this(maxFrameSize, strictNoExtensions, false);
+ }
+
+ public WebSocketFrameDecoder(final int maxFrameSize) {
+ this(maxFrameSize, true, false);
+ }
+
+ public WebSocketFrameDecoder(final int maxFrameSize,
+ final boolean strictNoExtensions,
+ final boolean expectMasked) {
+ this.maxFrameSize = maxFrameSize;
+ this.strictNoExtensions = strictNoExtensions;
+ this.expectMasked = expectMasked;
+ }
+
+ public boolean decode(final ByteBuffer in) {
+ in.mark();
+ if (in.remaining() < 2) {
+ in.reset();
+ return false;
+ }
+
+ final int b0 = in.get() & 0xFF;
+ final int b1 = in.get() & 0xFF;
+
+ fin = (b0 & 0x80) != 0;
+ rsv1 = (b0 & 0x40) != 0;
+ rsv2 = (b0 & 0x20) != 0;
+ rsv3 = (b0 & 0x10) != 0;
+
+ if (strictNoExtensions && (rsv1 || rsv2 || rsv3)) {
+ throw new WebSocketProtocolException(1002, "RSV bits set without extension");
+ }
+
+ opcode = b0 & 0x0F;
+
+ if (opcode != 0 && opcode != 1 && opcode != 2 && opcode != 8 && opcode != 9 && opcode != 10) {
+ throw new WebSocketProtocolException(1002, "Reserved/unknown opcode: " + opcode);
+ }
+
+ final boolean masked = (b1 & 0x80) != 0;
+ long len = b1 & 0x7F;
+
+ // Mode-aware masking rule
+ if (masked != expectMasked) {
+ if (expectMasked) {
+ // server decoding client frames: clients MUST mask
+ throw new WebSocketProtocolException(1002, "Client frame is not masked");
+ } else {
+ // client decoding server frames: servers MUST NOT mask
+ throw new WebSocketProtocolException(1002, "Server frame is masked");
+ }
+ }
+
+ if (len == 126) {
+ if (in.remaining() < 2) {
+ in.reset();
+ return false;
+ }
+ len = in.getShort() & 0xFFFF;
+ } else if (len == 127) {
+ if (in.remaining() < 8) {
+ in.reset();
+ return false;
+ }
+ final long l = in.getLong();
+ if (l < 0) {
+ throw new WebSocketProtocolException(1002, "Negative length");
+ }
+ len = l;
+ }
+
+ if (FrameOpcode.isControl(opcode)) {
+ if (!fin) {
+ throw new WebSocketProtocolException(1002, "fragmented control frame");
+ }
+ if (len > 125) {
+ throw new WebSocketProtocolException(1002, "control frame too large");
+ }
+ // (RSV checks above already cover RSV!=0)
+ }
+
+ if (len > Integer.MAX_VALUE || maxFrameSize > 0 && len > maxFrameSize) {
+ throw new WebSocketProtocolException(1009, "Frame too large: " + len);
+ }
+
+ if (in.remaining() < len) {
+ in.reset();
+ return false;
+ }
+
+ final ByteBuffer data = ByteBuffer.allocate((int) len);
+ for (int i = 0; i < len; i++) {
+ data.put(in.get());
+ }
+ data.flip();
+ payload = data.asReadOnlyBuffer();
+ return true;
+ }
+
+ public int opcode() {
+ return opcode;
+ }
+
+ public boolean fin() {
+ return fin;
+ }
+
+ public boolean rsv1() {
+ return rsv1;
+ }
+
+ public boolean rsv2() {
+ return rsv2;
+ }
+
+ public boolean rsv3() {
+ return rsv3;
+ }
+
+ public ByteBuffer payload() {
+ return payload.asReadOnlyBuffer();
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketInbound.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketInbound.java
new file mode 100644
index 0000000000..ceaa5185f1
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketInbound.java
@@ -0,0 +1,447 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.client5.http.websocket.core.exceptions.WebSocketProtocolException;
+import org.apache.hc.client5.http.websocket.core.frame.FrameOpcode;
+import org.apache.hc.client5.http.websocket.core.message.CloseCodec;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Inbound path: decoding, validation, fragment assembly, close handshake.
+ */
+@Internal
+final class WebSocketInbound {
+
+ private final WebSocketSessionState s;
+ private final WebSocketOutbound out;
+
+ WebSocketInbound(final WebSocketSessionState state, final WebSocketOutbound outbound) {
+ this.s = state;
+ this.out = outbound;
+ }
+
+ // ---- lifecycle ----
+ void onConnected(final IOSession ioSession) {
+ ioSession.setSocketTimeout(Timeout.DISABLED);
+ ioSession.setEventMask(EventMask.READ | EventMask.WRITE);
+ }
+
+ void onTimeout(final IOSession ioSession, final Timeout timeout) {
+ try {
+ final String msg = "I/O timeout: " + (timeout != null ? timeout : Timeout.ZERO_MILLISECONDS);
+ s.listener.onError(new java.util.concurrent.TimeoutException(msg));
+ } catch (final Throwable ignore) {
+ }
+ }
+
+ void onException(final IOSession ioSession, final Exception cause) {
+ try {
+ s.listener.onError(cause);
+ } catch (final Throwable ignore) {
+ }
+ }
+
+ void onDisconnected(final IOSession ioSession) {
+ if (s.open.getAndSet(false)) {
+ try {
+ s.listener.onClose(1006, "abnormal closure");
+ } catch (final Throwable ignore) {
+ }
+ }
+ if (s.readBuf != null) {
+ s.bufferPool.release(s.readBuf);
+ s.readBuf = null;
+ }
+ out.drainAndRelease();
+ ioSession.clearEvent(EventMask.READ | EventMask.WRITE);
+ }
+
+ void onInputReady(final IOSession ioSession, final ByteBuffer src) {
+ try {
+ if (!s.open.get() && !s.closeSent.get()) {
+ return;
+ }
+
+ if (s.readBuf == null) {
+ s.readBuf = s.bufferPool.acquire();
+ if (s.readBuf == null) {
+ return;
+ }
+ }
+
+ if (src != null && src.hasRemaining()) {
+ appendToInbuf(src);
+ }
+
+ int n;
+ do {
+ ByteBuffer rb = s.readBuf;
+ if (rb == null) {
+ rb = s.bufferPool.acquire();
+ if (rb == null) {
+ return;
+ }
+ s.readBuf = rb;
+ }
+ rb.clear();
+ n = ioSession.read(rb);
+ if (n > 0) {
+ rb.flip();
+ appendToInbuf(rb);
+ }
+ } while (n > 0);
+
+ if (n < 0) {
+ onDisconnected(ioSession);
+ return;
+ }
+
+ s.inbuf.flip();
+ for (; ; ) {
+ final boolean has;
+ try {
+ has = s.decoder.decode(s.inbuf);
+ } catch (final RuntimeException rte) {
+ final int code = rte instanceof WebSocketProtocolException
+ ? ((WebSocketProtocolException) rte).closeCode
+ : 1002;
+ initiateCloseAndWait(ioSession, code, rte.getMessage());
+ s.inbuf.clear();
+ return;
+ }
+ if (!has) {
+ break;
+ }
+
+ final int op = s.decoder.opcode();
+ final boolean fin = s.decoder.fin();
+ final boolean r1 = s.decoder.rsv1();
+ final boolean r2 = s.decoder.rsv2();
+ final boolean r3 = s.decoder.rsv3();
+ final ByteBuffer payload = s.decoder.payload();
+
+ if (r2 || r3) {
+ initiateCloseAndWait(ioSession, 1002, "RSV2/RSV3 not supported");
+ s.inbuf.clear();
+ return;
+ }
+ if (r1 && s.decChain == null) {
+ initiateCloseAndWait(ioSession, 1002, "RSV1 without negotiated extension");
+ s.inbuf.clear();
+ return;
+ }
+
+ if (s.closeSent.get() && op != FrameOpcode.CLOSE) {
+ continue;
+ }
+
+ if (FrameOpcode.isControl(op)) {
+ if (!fin) {
+ initiateCloseAndWait(ioSession, 1002, "fragmented control frame");
+ s.inbuf.clear();
+ return;
+ }
+ if (payload.remaining() > 125) {
+ initiateCloseAndWait(ioSession, 1002, "control frame too large");
+ s.inbuf.clear();
+ return;
+ }
+ }
+
+ switch (op) {
+ case FrameOpcode.PING: {
+ try {
+ s.listener.onPing(payload.asReadOnlyBuffer());
+ } catch (final Throwable ignore) {
+ }
+ if (s.cfg.isAutoPong()) {
+ out.enqueueCtrl(out.pooledFrame(FrameOpcode.PONG, payload.asReadOnlyBuffer(), true));
+ }
+ break;
+ }
+ case FrameOpcode.PONG: {
+ try {
+ s.listener.onPong(payload.asReadOnlyBuffer());
+ } catch (final Throwable ignore) {
+ }
+ break;
+ }
+ case FrameOpcode.CLOSE: {
+ final ByteBuffer ro = payload.asReadOnlyBuffer();
+ int code = 1005;
+ String reason = "";
+ final int len = ro.remaining();
+
+ if (len == 1) {
+ initiateCloseAndWait(ioSession, 1002, "Close frame length of 1 is invalid");
+ s.inbuf.clear();
+ return;
+ } else if (len >= 2) {
+ final ByteBuffer dup = ro.slice();
+ code = CloseCodec.readCloseCode(dup);
+
+ if (!CloseCodec.isValidToReceive(code)) {
+ initiateCloseAndWait(ioSession, 1002, "Invalid close code: " + code);
+ s.inbuf.clear();
+ return;
+ }
+
+ if (dup.hasRemaining()) {
+ final CharsetDecoder dec = StandardCharsets.UTF_8
+ .newDecoder()
+ .onMalformedInput(CodingErrorAction.REPORT)
+ .onUnmappableCharacter(CodingErrorAction.REPORT);
+ try {
+ reason = dec.decode(dup.asReadOnlyBuffer()).toString();
+ } catch (final CharacterCodingException badUtf8) {
+ initiateCloseAndWait(ioSession, 1007, "Invalid UTF-8 in close reason");
+ s.inbuf.clear();
+ return;
+ }
+ }
+ }
+
+ notifyCloseOnce(code, reason);
+
+ s.closeReceived.set(true);
+
+ if (!s.closeSent.get()) {
+ out.enqueueCtrl(out.pooledCloseEcho(ro));
+ }
+
+ s.session.setSocketTimeout(s.cfg.getCloseWaitTimeout());
+ s.closeAfterFlush = true;
+ ioSession.clearEvent(EventMask.READ);
+ ioSession.setEvent(EventMask.WRITE);
+ s.inbuf.clear();
+ return;
+ }
+ case FrameOpcode.CONT: {
+ if (s.assemblingOpcode == -1) {
+ initiateCloseAndWait(ioSession, 1002, "Unexpected continuation frame");
+ s.inbuf.clear();
+ return;
+ }
+ if (r1) {
+ initiateCloseAndWait(ioSession, 1002, "RSV1 set on continuation");
+ s.inbuf.clear();
+ return;
+ }
+ appendToMessage(payload, ioSession);
+ if (fin) {
+ deliverAssembledMessage();
+ }
+ break;
+ }
+ case FrameOpcode.TEXT:
+ case FrameOpcode.BINARY: {
+ if (s.assemblingOpcode != -1) {
+ initiateCloseAndWait(ioSession, 1002, "New data frame while fragmented message in progress");
+ s.inbuf.clear();
+ return;
+ }
+ if (!fin) {
+ startMessage(op, payload, r1, ioSession);
+ break;
+ }
+ if (s.cfg.getMaxMessageSize() > 0 && payload.remaining() > s.cfg.getMaxMessageSize()) {
+ initiateCloseAndWait(ioSession, 1009, "Message too big");
+ break;
+ }
+ if (r1 && s.decChain != null) {
+ final byte[] comp = toBytes(payload);
+ final byte[] plain;
+ try {
+ plain = s.decChain.decode(comp);
+ } catch (final Exception e) {
+ initiateCloseAndWait(ioSession, 1007, "Extension decode failed");
+ s.inbuf.clear();
+ return;
+ }
+ deliverSingle(op, ByteBuffer.wrap(plain));
+ } else {
+ deliverSingle(op, payload.asReadOnlyBuffer());
+ }
+ break;
+ }
+ default: {
+ initiateCloseAndWait(ioSession, 1002, "Unsupported opcode: " + op);
+ s.inbuf.clear();
+ return;
+ }
+ }
+ }
+ s.inbuf.compact();
+ } catch (final Exception ex) {
+ onException(ioSession, ex);
+ ioSession.close(CloseMode.GRACEFUL);
+ }
+ }
+
+ private void appendToInbuf(final ByteBuffer src) {
+ if (src == null || !src.hasRemaining()) {
+ return;
+ }
+ if (s.inbuf.remaining() < src.remaining()) {
+ final int need = s.inbuf.position() + src.remaining();
+ final int newCap = Math.max(s.inbuf.capacity() * 2, need);
+ final ByteBuffer bigger = ByteBuffer.allocate(newCap);
+ s.inbuf.flip();
+ bigger.put(s.inbuf);
+ s.inbuf = bigger;
+ }
+ s.inbuf.put(src);
+ }
+
+ private void startMessage(final int opcode, final ByteBuffer payload, final boolean rsv1, final IOSession ioSession) {
+ s.assemblingOpcode = opcode;
+ s.assemblingCompressed = rsv1 && s.decChain != null;
+ s.assemblingBytes = new java.io.ByteArrayOutputStream(Math.max(1024, payload.remaining()));
+ s.assemblingSize = 0L;
+ appendToMessage(payload, ioSession);
+ }
+
+ private void appendToMessage(final ByteBuffer payload, final IOSession ioSession) {
+ final ByteBuffer dup = payload.asReadOnlyBuffer();
+ final int n = dup.remaining();
+ s.assemblingSize += n;
+ if (s.cfg.getMaxMessageSize() > 0 && s.assemblingSize > s.cfg.getMaxMessageSize()) {
+ initiateCloseAndWait(ioSession, 1009, "Message too big");
+ return;
+ }
+ final byte[] tmp = new byte[n];
+ dup.get(tmp);
+ s.assemblingBytes.write(tmp, 0, n);
+ }
+
+ private void deliverAssembledMessage() {
+ final byte[] body = s.assemblingBytes.toByteArray();
+ final int op = s.assemblingOpcode;
+ final boolean compressed = s.assemblingCompressed;
+
+ s.assemblingOpcode = -1;
+ s.assemblingCompressed = false;
+ s.assemblingBytes = null;
+ s.assemblingSize = 0L;
+
+ byte[] data = body;
+ if (compressed && s.decChain != null) {
+ try {
+ data = s.decChain.decode(body);
+ } catch (final Exception e) {
+ try {
+ s.listener.onError(e);
+ } catch (final Throwable ignore) {
+ }
+ return;
+ }
+ }
+
+ if (op == FrameOpcode.TEXT) {
+ final CharsetDecoder dec = StandardCharsets.UTF_8.newDecoder()
+ .onMalformedInput(CodingErrorAction.REPORT)
+ .onUnmappableCharacter(CodingErrorAction.REPORT);
+ try {
+ final CharBuffer cb = dec.decode(ByteBuffer.wrap(data));
+ try {
+ s.listener.onText(cb, true);
+ } catch (final Throwable ignore) {
+ }
+ } catch (final CharacterCodingException cce) {
+ initiateCloseAndWait(s.session, 1007, "Invalid UTF-8 in text message");
+ }
+ } else if (op == FrameOpcode.BINARY) {
+ try {
+ s.listener.onBinary(ByteBuffer.wrap(data).asReadOnlyBuffer(), true);
+ } catch (final Throwable ignore) {
+ }
+ }
+ }
+
+ private void deliverSingle(final int op, final ByteBuffer payloadRO) {
+ if (op == FrameOpcode.TEXT) {
+ final CharsetDecoder dec = StandardCharsets.UTF_8.newDecoder()
+ .onMalformedInput(CodingErrorAction.REPORT)
+ .onUnmappableCharacter(CodingErrorAction.REPORT);
+ try {
+ final CharBuffer cb = dec.decode(payloadRO);
+ try {
+ s.listener.onText(cb, true);
+ } catch (final Throwable ignore) {
+ }
+ } catch (final CharacterCodingException cce) {
+ initiateCloseAndWait(s.session, 1007, "Invalid UTF-8 in text message");
+ }
+ } else if (op == FrameOpcode.BINARY) {
+ try {
+ s.listener.onBinary(payloadRO, true);
+ } catch (final Throwable ignore) {
+ }
+ }
+ }
+
+ private static byte[] toBytes(final ByteBuffer buf) {
+ final ByteBuffer b = buf.asReadOnlyBuffer();
+ final byte[] out = new byte[b.remaining()];
+ b.get(out);
+ return out;
+ }
+
+ private void initiateCloseAndWait(final IOSession ioSession, final int code, final String reason) {
+ if (!s.closeSent.get()) {
+ try {
+ final String truncated = CloseCodec.truncateReasonUtf8(reason);
+ final byte[] payloadBytes = CloseCodec.encode(code, truncated);
+ out.enqueueCtrl(out.pooledFrame(FrameOpcode.CLOSE, ByteBuffer.wrap(payloadBytes), true));
+ } catch (final Throwable ignore) {
+ }
+ s.session.setSocketTimeout(s.cfg.getCloseWaitTimeout());
+ }
+ notifyCloseOnce(code, reason);
+ }
+
+ private void notifyCloseOnce(final int code, final String reason) {
+ if (s.open.getAndSet(false)) {
+ try {
+ s.listener.onClose(code, reason == null ? "" : reason);
+ } catch (final Throwable ignore) {
+ }
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketIoHandler.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketIoHandler.java
new file mode 100644
index 0000000000..f45720c487
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketIoHandler.java
@@ -0,0 +1,121 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.client5.http.websocket.api.WebSocket;
+import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
+import org.apache.hc.client5.http.websocket.api.WebSocketListener;
+import org.apache.hc.client5.http.websocket.core.extension.ExtensionChain;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * RFC6455/7692 WebSocket handler front-end. Delegates to WsInbound / WsOutbound.
+ */
+@Internal
+public final class WebSocketIoHandler implements IOEventHandler {
+
+ private final WebSocketSessionState state;
+ private final WebSocketInbound inbound;
+ private final WebSocketOutbound outbound;
+ private final AsyncClientEndpoint endpoint;
+ private final AtomicBoolean endpointReleased;
+
+ public WebSocketIoHandler(final ProtocolIOSession session,
+ final WebSocketListener listener,
+ final WebSocketClientConfig cfg,
+ final ExtensionChain chain,
+ final AsyncClientEndpoint endpoint) {
+ this.state = new WebSocketSessionState(session, listener, cfg, chain);
+ this.outbound = new WebSocketOutbound(state);
+ this.inbound = new WebSocketInbound(state, outbound);
+ this.endpoint = endpoint;
+ this.endpointReleased = new AtomicBoolean(false);
+ }
+
+ /**
+ * Expose the application WebSocket facade.
+ */
+ public WebSocket exposeWebSocket() {
+ return outbound.facade();
+ }
+
+ // ---- IOEventHandler ----
+ @Override
+ public void connected(final IOSession ioSession) {
+ inbound.onConnected(ioSession);
+ }
+
+ @Override
+ public void inputReady(final IOSession ioSession, final ByteBuffer src) {
+ inbound.onInputReady(ioSession, src);
+ }
+
+ @Override
+ public void outputReady(final IOSession ioSession) {
+ outbound.onOutputReady(ioSession);
+ }
+
+ @Override
+ public void timeout(final IOSession ioSession, final Timeout timeout) {
+ inbound.onTimeout(ioSession, timeout);
+ // Best-effort graceful close on timeout
+ ioSession.close(CloseMode.GRACEFUL);
+ }
+
+ @Override
+ public void exception(final IOSession ioSession, final Exception cause) {
+ inbound.onException(ioSession, cause);
+ ioSession.close(CloseMode.GRACEFUL);
+ }
+
+ @Override
+ public void disconnected(final IOSession ioSession) {
+ inbound.onDisconnected(ioSession);
+ ioSession.clearEvent(EventMask.READ | EventMask.WRITE);
+ // Ensure the underlying protocol session does not linger
+ state.session.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE);
+ if (endpoint != null && endpointReleased.compareAndSet(false, true)) {
+ try {
+ endpoint.releaseAndDiscard();
+ } catch (final Throwable ignore) {
+ // best effort
+ }
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketOutbound.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketOutbound.java
new file mode 100644
index 0000000000..856f4c21eb
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketOutbound.java
@@ -0,0 +1,486 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hc.client5.http.websocket.api.WebSocket;
+import org.apache.hc.client5.http.websocket.core.frame.FrameOpcode;
+import org.apache.hc.client5.http.websocket.core.message.CloseCodec;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Outbound path: frame building, queues, writing, and the app-facing WebSocket facade.
+ */
+@Internal
+final class WebSocketOutbound {
+
+ static final class OutFrame {
+
+ final ByteBuffer buf;
+ final boolean pooled;
+
+ OutFrame(final ByteBuffer buf, final boolean pooled) {
+ this.buf = buf;
+ this.pooled = pooled;
+ }
+ }
+
+ private final WebSocketSessionState s;
+ private final WebSocket facade;
+
+ WebSocketOutbound(final WebSocketSessionState s) {
+ this.s = s;
+ this.facade = new Facade();
+ }
+
+ WebSocket facade() {
+ return facade;
+ }
+
+ // ---------------------------------------------------- IO writing ---------
+
+ void onOutputReady(final IOSession ioSession) {
+ try {
+ int framesThisTick = 0;
+
+ while (framesThisTick < s.maxFramesPerTick) {
+
+ if (s.activeWrite != null && s.activeWrite.buf.hasRemaining()) {
+ final int written = ioSession.write(s.activeWrite.buf);
+ if (written == 0) {
+ ioSession.setEvent(EventMask.WRITE);
+ return;
+ }
+ if (!s.activeWrite.buf.hasRemaining()) {
+ release(s.activeWrite);
+ s.activeWrite = null;
+ framesThisTick++;
+ } else {
+ ioSession.setEvent(EventMask.WRITE);
+ return;
+ }
+ continue;
+ }
+
+ final OutFrame ctrl = s.ctrlOutbound.poll();
+ if (ctrl != null) {
+ s.activeWrite = ctrl;
+ continue;
+ }
+
+ final OutFrame data = s.dataOutbound.poll();
+ if (data != null) {
+ s.activeWrite = data;
+ continue;
+ }
+
+ ioSession.clearEvent(EventMask.WRITE);
+ if (s.closeAfterFlush && s.activeWrite == null && s.ctrlOutbound.isEmpty() && s.dataOutbound.isEmpty()) {
+ ioSession.close(CloseMode.GRACEFUL);
+ }
+ return;
+ }
+
+ if (s.activeWrite != null && s.activeWrite.buf.hasRemaining()) {
+ ioSession.setEvent(EventMask.WRITE);
+ } else {
+ ioSession.clearEvent(EventMask.WRITE);
+ }
+
+ if (s.closeAfterFlush && s.activeWrite == null && s.ctrlOutbound.isEmpty() && s.dataOutbound.isEmpty()) {
+ ioSession.close(CloseMode.GRACEFUL);
+ }
+
+ } catch (final Exception ex) {
+ try {
+ s.listener.onError(ex);
+ } finally {
+ s.session.close(CloseMode.GRACEFUL);
+ }
+ }
+ }
+
+ private void release(final OutFrame frame) {
+ if (frame.pooled) {
+ s.bufferPool.release(frame.buf);
+ }
+ }
+
+ boolean enqueueCtrl(final OutFrame frame) {
+ final boolean closeFrame = isCloseFrame(frame.buf);
+
+ if (!closeFrame && (!s.open.get() || s.closeSent.get())) {
+ release(frame);
+ return false;
+ }
+
+ if (closeFrame) {
+ if (!s.closeSent.compareAndSet(false, true)) {
+ release(frame);
+ return false;
+ }
+ } else {
+ final int max = s.cfg.getMaxOutboundControlQueue();
+ if (max > 0 && s.ctrlOutbound.size() >= max) {
+ release(frame);
+ return false;
+ }
+ }
+ s.ctrlOutbound.offer(frame);
+ s.session.setEvent(EventMask.WRITE);
+ return true;
+ }
+
+
+ boolean enqueueData(final OutFrame frame) {
+ if (!s.open.get() || s.closeSent.get()) {
+ release(frame);
+ return false;
+ }
+ s.dataOutbound.offer(frame);
+ s.session.setEvent(EventMask.WRITE);
+ return true;
+ }
+
+ private static boolean isCloseFrame(final ByteBuffer buf) {
+ if (buf.remaining() < 2) {
+ return false;
+ }
+ final int pos = buf.position();
+ final byte b1 = buf.get(pos);
+ final int opcode = b1 & 0x0F;
+ return opcode == FrameOpcode.CLOSE;
+ }
+
+ // package-private so WebSocketInbound can use them
+ OutFrame pooledFrame(final int opcode, final ByteBuffer payload, final boolean fin) {
+ final ByteBuffer ro = payload == null ? ByteBuffer.allocate(0) : payload.asReadOnlyBuffer();
+ final int len = ro.remaining();
+
+ final int headerEstimate;
+ if (len <= 125) {
+ headerEstimate = 2 + 4; // 2-byte header + 4-byte mask
+ } else if (len <= 0xFFFF) {
+ headerEstimate = 4 + 4; // 4-byte header + 4-byte mask
+ } else {
+ headerEstimate = 10 + 4; // 10-byte header + 4-byte mask
+ }
+
+ final int totalSize = headerEstimate + len;
+
+ final ByteBuffer buf;
+ final boolean pooled;
+ if (totalSize <= s.bufferPool.getBufferSize()) {
+ buf = s.bufferPool.acquire();
+ pooled = true;
+ } else {
+ buf = ByteBuffer.allocate(totalSize);
+ pooled = false;
+ }
+
+ buf.clear();
+ // opcode (int), payload (ByteBuffer), fin (boolean), mask (boolean), out (ByteBuffer)
+ s.writer.frameInto(opcode, ro, fin, true, buf);
+ buf.flip();
+
+ return new OutFrame(buf, pooled);
+ }
+
+ // package-private so WebSocketInbound can use it for close echo
+ OutFrame pooledCloseEcho(final ByteBuffer payload) {
+ final ByteBuffer ro = payload == null ? ByteBuffer.allocate(0) : payload.asReadOnlyBuffer();
+ final int len = ro.remaining();
+
+ final int headerEstimate;
+ if (len <= 125) {
+ headerEstimate = 2 + 4;
+ } else if (len <= 0xFFFF) {
+ headerEstimate = 4 + 4;
+ } else {
+ headerEstimate = 10 + 4;
+ }
+
+ final int totalSize = headerEstimate + len;
+
+ final ByteBuffer buf;
+ final boolean pooled;
+ if (totalSize <= s.bufferPool.getBufferSize()) {
+ buf = s.bufferPool.acquire();
+ pooled = true;
+ } else {
+ buf = ByteBuffer.allocate(totalSize);
+ pooled = false;
+ }
+
+ buf.clear();
+ s.writer.frameInto(FrameOpcode.CLOSE, ro, true, true, buf);
+ buf.flip();
+
+ return new OutFrame(buf, pooled);
+ }
+
+ // package-private: used by WebSocketInbound.onDisconnected()
+ void drainAndRelease() {
+ if (s.activeWrite != null) {
+ release(s.activeWrite);
+ s.activeWrite = null;
+ }
+ OutFrame f;
+ while ((f = s.ctrlOutbound.poll()) != null) {
+ release(f);
+ }
+ while ((f = s.dataOutbound.poll()) != null) {
+ release(f);
+ }
+ }
+
+ // --------------------------------------------------------- Facade --------
+
+ private final class Facade implements WebSocket {
+
+ @Override
+ public boolean isOpen() {
+ return s.open.get() && !s.closeSent.get();
+ }
+
+ @Override
+ public boolean ping(final ByteBuffer data) {
+ if (!s.open.get() || s.closeSent.get()) {
+ return false;
+ }
+ final ByteBuffer ro = data == null ? ByteBuffer.allocate(0) : data.asReadOnlyBuffer();
+ if (ro.remaining() > 125) {
+ return false;
+ }
+ return enqueueCtrl(pooledFrame(FrameOpcode.PING, ro, true));
+ }
+
+ @Override
+ public boolean pong(final ByteBuffer data) {
+ if (!s.open.get() || s.closeSent.get()) {
+ return false;
+ }
+ final ByteBuffer ro = data == null ? ByteBuffer.allocate(0) : data.asReadOnlyBuffer();
+ if (ro.remaining() > 125) {
+ return false;
+ }
+ return enqueueCtrl(pooledFrame(FrameOpcode.PONG, ro, true));
+ }
+
+ @Override
+ public boolean sendText(final CharSequence data, final boolean finalFragment) {
+ if (!s.open.get() || s.closeSent.get() || data == null) {
+ return false;
+ }
+ final ByteBuffer utf8 = StandardCharsets.UTF_8.encode(data.toString());
+ return sendData(FrameOpcode.TEXT, utf8, finalFragment);
+ }
+
+ @Override
+ public boolean sendBinary(final ByteBuffer data, final boolean finalFragment) {
+ if (!s.open.get() || s.closeSent.get() || data == null) {
+ return false;
+ }
+ return sendData(FrameOpcode.BINARY, data, finalFragment);
+ }
+
+ private boolean sendData(final int opcode, final ByteBuffer data, final boolean fin) {
+ synchronized (s.writeLock) {
+ int currentOpcode = s.outOpcode == -1 ? opcode : FrameOpcode.CONT;
+ if (s.outOpcode == -1) {
+ s.outOpcode = opcode;
+ }
+
+ final ByteBuffer ro = data.asReadOnlyBuffer();
+ boolean ok = true;
+
+ while (ro.hasRemaining()) {
+ if (!s.open.get() || s.closeSent.get()) {
+ ok = false;
+ break;
+ }
+
+ final int n = Math.min(ro.remaining(), s.outChunk);
+
+ final int oldLimit = ro.limit();
+ final int newLimit = ro.position() + n;
+ ro.limit(newLimit);
+ final ByteBuffer slice = ro.slice();
+ ro.limit(oldLimit);
+ ro.position(newLimit);
+
+ final boolean lastSlice = !ro.hasRemaining() && fin;
+ if (!enqueueData(pooledFrame(currentOpcode, slice, lastSlice))) {
+ ok = false;
+ break;
+ }
+ currentOpcode = FrameOpcode.CONT;
+ }
+
+ if (fin || !ok) {
+ s.outOpcode = -1;
+ }
+ return ok;
+ }
+ }
+
+
+ @Override
+ public boolean sendTextBatch(final List fragments, final boolean finalFragment) {
+ if (!s.open.get() || s.closeSent.get() || fragments == null || fragments.isEmpty()) {
+ return false;
+ }
+ synchronized (s.writeLock) {
+ int currentOpcode = s.outOpcode == -1 ? FrameOpcode.TEXT : FrameOpcode.CONT;
+ if (s.outOpcode == -1) {
+ s.outOpcode = FrameOpcode.TEXT;
+ }
+
+ for (int i = 0; i < fragments.size(); i++) {
+ final CharSequence part = Args.notNull(fragments.get(i), "fragment");
+ final ByteBuffer utf8 = StandardCharsets.UTF_8.encode(part.toString());
+ final ByteBuffer ro = utf8.asReadOnlyBuffer();
+
+ while (ro.hasRemaining()) {
+ if (!s.open.get() || s.closeSent.get()) {
+ s.outOpcode = -1;
+ return false;
+ }
+ final int n = Math.min(ro.remaining(), s.outChunk);
+
+ final int oldLimit = ro.limit();
+ final int newLimit = ro.position() + n;
+ ro.limit(newLimit);
+ final ByteBuffer slice = ro.slice();
+ ro.limit(oldLimit);
+ ro.position(newLimit);
+
+ final boolean isLastFragment = i == fragments.size() - 1;
+ final boolean lastSlice = !ro.hasRemaining() && isLastFragment && finalFragment;
+
+ if (!enqueueData(pooledFrame(currentOpcode, slice, lastSlice))) {
+ s.outOpcode = -1;
+ return false;
+ }
+ currentOpcode = FrameOpcode.CONT;
+ }
+ }
+
+ if (finalFragment) {
+ s.outOpcode = -1;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean sendBinaryBatch(final List fragments, final boolean finalFragment) {
+ if (!s.open.get() || s.closeSent.get() || fragments == null || fragments.isEmpty()) {
+ return false;
+ }
+ synchronized (s.writeLock) {
+ int currentOpcode = s.outOpcode == -1 ? FrameOpcode.BINARY : FrameOpcode.CONT;
+ if (s.outOpcode == -1) {
+ s.outOpcode = FrameOpcode.BINARY;
+ }
+
+ for (int i = 0; i < fragments.size(); i++) {
+ final ByteBuffer src = Args.notNull(fragments.get(i), "fragment").asReadOnlyBuffer();
+
+ while (src.hasRemaining()) {
+ if (!s.open.get() || s.closeSent.get()) {
+ s.outOpcode = -1;
+ return false;
+ }
+ final int n = Math.min(src.remaining(), s.outChunk);
+
+ final int oldLimit = src.limit();
+ final int newLimit = src.position() + n;
+ src.limit(newLimit);
+ final ByteBuffer slice = src.slice();
+ src.limit(oldLimit);
+ src.position(newLimit);
+
+ final boolean isLastFragment = i == fragments.size() - 1;
+ final boolean lastSlice = !src.hasRemaining() && isLastFragment && finalFragment;
+
+ if (!enqueueData(pooledFrame(currentOpcode, slice, lastSlice))) {
+ s.outOpcode = -1;
+ return false;
+ }
+ currentOpcode = FrameOpcode.CONT;
+ }
+ }
+
+ if (finalFragment) {
+ s.outOpcode = -1;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public CompletableFuture close(final int statusCode, final String reason) {
+ final CompletableFuture future = new CompletableFuture<>();
+
+ if (!s.open.get()) {
+ future.completeExceptionally(
+ new IllegalStateException("WebSocket is already closed"));
+ return future;
+ }
+
+ if (!CloseCodec.isValidToSend(statusCode)) {
+ future.completeExceptionally(
+ new IllegalArgumentException("Invalid close status code: " + statusCode));
+ return future;
+ }
+
+ final String truncated = CloseCodec.truncateReasonUtf8(reason);
+ final byte[] payloadBytes = CloseCodec.encode(statusCode, truncated);
+ final ByteBuffer payload = ByteBuffer.wrap(payloadBytes);
+
+ if (!enqueueCtrl(pooledFrame(FrameOpcode.CLOSE, payload, true))) {
+ future.completeExceptionally(
+ new IllegalStateException("WebSocket is closing or already closed"));
+ return future;
+ }
+
+ // cfg.getCloseWaitTimeout() is a Timeout, IOSession.setSocketTimeout(Timeout)
+ s.session.setSocketTimeout(s.cfg.getCloseWaitTimeout());
+ future.complete(null);
+ return future;
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketSessionState.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketSessionState.java
new file mode 100644
index 0000000000..c185a98444
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketSessionState.java
@@ -0,0 +1,116 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
+import org.apache.hc.client5.http.websocket.api.WebSocketListener;
+import org.apache.hc.client5.http.websocket.core.extension.ExtensionChain;
+import org.apache.hc.client5.http.websocket.core.frame.WebSocketFrameWriter;
+import org.apache.hc.client5.http.websocket.core.util.ByteBufferPool;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+
+/**
+ * Shared state & resources.
+ */
+@Internal
+final class WebSocketSessionState {
+
+ // External
+ final ProtocolIOSession session;
+ final WebSocketListener listener;
+ final WebSocketClientConfig cfg;
+
+ // Extensions
+ final ExtensionChain.EncodeChain encChain; // (not used yet for outbound compression)
+ final ExtensionChain.DecodeChain decChain;
+
+ // Buffers & codec
+ final ByteBufferPool bufferPool;
+ final WebSocketFrameWriter writer = new WebSocketFrameWriter();
+ final WebSocketFrameDecoder decoder;
+
+ // Read side
+ ByteBuffer readBuf;
+ ByteBuffer inbuf = ByteBuffer.allocate(4096);
+
+ // Outbound queues
+ final ConcurrentLinkedQueue ctrlOutbound = new ConcurrentLinkedQueue<>();
+ final ConcurrentLinkedQueue dataOutbound = new ConcurrentLinkedQueue<>();
+ WebSocketOutbound.OutFrame activeWrite = null;
+
+ // Flags / locks
+ final AtomicBoolean open = new AtomicBoolean(true);
+ final AtomicBoolean closeSent = new AtomicBoolean(false);
+ final AtomicBoolean closeReceived = new AtomicBoolean(false);
+ volatile boolean closeAfterFlush = false;
+ final Object writeLock = new Object();
+
+ // Message assembly
+ int assemblingOpcode = -1;
+ boolean assemblingCompressed = false;
+ java.io.ByteArrayOutputStream assemblingBytes = null;
+ long assemblingSize = 0L;
+
+ // Outbound fragmentation
+ int outOpcode = -1;
+ final int outChunk;
+ final int maxFramesPerTick;
+
+ WebSocketSessionState(final ProtocolIOSession session,
+ final WebSocketListener listener,
+ final WebSocketClientConfig cfg,
+ final ExtensionChain chain) {
+ this.session = session;
+ this.listener = listener;
+ this.cfg = cfg;
+
+ this.decoder = new WebSocketFrameDecoder(cfg.getMaxFrameSize(), false);
+
+ this.outChunk = Math.max(256, cfg.getOutgoingChunkSize());
+ this.maxFramesPerTick = Math.max(1, cfg.getMaxFramesPerTick());
+
+ if (chain != null && !chain.isEmpty()) {
+ this.encChain = chain.newEncodeChain();
+ this.decChain = chain.newDecodeChain();
+ } else {
+ this.encChain = null;
+ this.decChain = null;
+ }
+
+ final int poolBufSize = Math.max(8192, this.outChunk);
+ final int poolCapacity = Math.max(16, cfg.getIoPoolCapacity());
+ this.bufferPool = new ByteBufferPool(poolBufSize, poolCapacity, cfg.isDirectBuffers());
+
+ // Borrow one read buffer upfront
+ this.readBuf = bufferPool.acquire();
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketUpgrader.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketUpgrader.java
new file mode 100644
index 0000000000..c94f8473d7
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketUpgrader.java
@@ -0,0 +1,114 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import org.apache.hc.client5.http.websocket.api.WebSocket;
+import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
+import org.apache.hc.client5.http.websocket.api.WebSocketListener;
+import org.apache.hc.client5.http.websocket.core.extension.ExtensionChain;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolUpgradeHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bridges HttpCore protocol upgrade to a WebSocket {@link WebSocketIoHandler}.
+ *
+ * IMPORTANT: This class does NOT call {@link WebSocketListener#onOpen(WebSocket)}.
+ * The caller performs notification after {@code switchProtocol(...)} completes.
+ */
+@Internal
+public final class WebSocketUpgrader implements ProtocolUpgradeHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketUpgrader.class);
+
+ private final WebSocketListener listener;
+ private final WebSocketClientConfig cfg;
+ private final ExtensionChain chain;
+ private final AsyncClientEndpoint endpoint;
+
+ /**
+ * The WebSocket facade created during {@link #upgrade}.
+ */
+ private volatile WebSocket webSocket;
+
+ public WebSocketUpgrader(
+ final WebSocketListener listener,
+ final WebSocketClientConfig cfg,
+ final ExtensionChain chain) {
+ this(listener, cfg, chain, null);
+ }
+
+ public WebSocketUpgrader(
+ final WebSocketListener listener,
+ final WebSocketClientConfig cfg,
+ final ExtensionChain chain,
+ final AsyncClientEndpoint endpoint) {
+ this.listener = listener;
+ this.cfg = cfg;
+ this.chain = chain;
+ this.endpoint = endpoint;
+ }
+
+ /**
+ * Returns the {@link WebSocket} created during {@link #upgrade}.
+ */
+ public WebSocket getWebSocket() {
+ return webSocket;
+ }
+
+ @Override
+ public void upgrade(final ProtocolIOSession ioSession,
+ final FutureCallback callback) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Installing WsHandler on {}", ioSession);
+ }
+
+ final WebSocketIoHandler handler = new WebSocketIoHandler(ioSession, listener, cfg, chain, endpoint);
+ ioSession.upgrade(handler);
+
+ this.webSocket = handler.exposeWebSocket();
+
+ if (callback != null) {
+ callback.completed(ioSession);
+ }
+ } catch (final Exception ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("WebSocket upgrade failed", ex);
+ }
+ if (callback != null) {
+ callback.failed(ex);
+ } else {
+ throw ex;
+ }
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/package-info.java b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/package-info.java
new file mode 100644
index 0000000000..07a727f485
--- /dev/null
+++ b/httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/package-info.java
@@ -0,0 +1,37 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+/**
+ * Integration with Apache HttpCore I/O reactor.
+ *
+ * Protocol upgrade hooks and the reactor {@code IOEventHandler} that
+ * implements RFC 6455/7692 on top of HttpCore. Internal API — subject
+ * to change without notice.
+ *
+ * @since 5.6
+ */
+package org.apache.hc.client5.http.websocket.transport;
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/api/WebSocketClientConfigTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/api/WebSocketClientConfigTest.java
new file mode 100644
index 0000000000..d4d94b2ff0
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/api/WebSocketClientConfigTest.java
@@ -0,0 +1,57 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.api;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Test;
+
+final class WebSocketClientConfigTest {
+
+ @Test
+ void builderDefaultsAndCustom() {
+ final WebSocketClientConfig def = WebSocketClientConfig.custom().build();
+ assertTrue(def.isAutoPong());
+ assertTrue(def.getMaxFrameSize() > 0);
+ assertTrue(def.getMaxMessageSize() > 0);
+
+ final WebSocketClientConfig cfg = WebSocketClientConfig.custom()
+ .setAutoPong(false)
+ .setMaxFrameSize(1024)
+ .setMaxMessageSize(2048)
+ .setConnectTimeout(Timeout.ofSeconds(3))
+ .build();
+
+ assertFalse(cfg.isAutoPong());
+ assertEquals(1024, cfg.getMaxFrameSize());
+ assertEquals(2048, cfg.getMaxMessageSize());
+ assertEquals(Timeout.ofSeconds(3), cfg.getConnectTimeout());
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/client/WebSocketClientTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/client/WebSocketClientTest.java
new file mode 100644
index 0000000000..5e233d0ef9
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/client/WebSocketClientTest.java
@@ -0,0 +1,344 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.client5.http.websocket.api.WebSocket;
+import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
+import org.apache.hc.client5.http.websocket.api.WebSocketListener;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.IOReactorStatus;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.jupiter.api.Test;
+
+final class WebSocketClientTest {
+
+ private static final class NoNetworkClient extends CloseableWebSocketClient {
+
+ @Override
+ public void start() {
+ // no-op
+ }
+
+ @Override
+ public IOReactorStatus getStatus() {
+ return IOReactorStatus.ACTIVE;
+ }
+
+ @Override
+ public void awaitShutdown(final TimeValue waitTime) {
+ // no-op
+ }
+
+ @Override
+ public void initiateShutdown() {
+ // no-op
+ }
+
+ // ModalCloseable (if your ModalCloseable declares this)
+ public void close(final CloseMode closeMode) {
+ // no-op
+ }
+
+ // Closeable
+ @Override
+ public void close() {
+ // no-op – needed for try-with-resources
+ }
+
+ @Override
+ protected CompletableFuture doConnect(
+ final URI uri,
+ final WebSocketListener listener,
+ final WebSocketClientConfig cfg,
+ final HttpContext context) {
+
+ final CompletableFuture f = new CompletableFuture<>();
+ final LocalLoopWebSocket ws = new LocalLoopWebSocket(listener, cfg);
+ try {
+ listener.onOpen(ws);
+ } catch (final Throwable ignore) {
+ }
+ f.complete(ws);
+ return f;
+ }
+ }
+
+ private static final class LocalLoopWebSocket implements WebSocket {
+ private final WebSocketListener listener;
+ private final WebSocketClientConfig cfg;
+ private volatile boolean open = true;
+
+ LocalLoopWebSocket(final WebSocketListener listener, final WebSocketClientConfig cfg) {
+ this.listener = listener;
+ this.cfg = cfg != null ? cfg : WebSocketClientConfig.custom().build();
+ }
+
+ @Override
+ public boolean sendText(final CharSequence data, final boolean finalFragment) {
+ if (!open) {
+ return false;
+ }
+ if (cfg.getMaxMessageSize() > 0 && data != null && data.length() > cfg.getMaxMessageSize()) {
+ // Simulate client closing due to oversized message
+ try {
+ listener.onClose(1009, "Message too big");
+ } catch (final Throwable ignore) {
+ }
+ open = false;
+ return false;
+ }
+ try {
+ final CharBuffer cb = data != null ? CharBuffer.wrap(data) : CharBuffer.allocate(0);
+ listener.onText(cb, finalFragment);
+ } catch (final Throwable ignore) {
+ }
+ return true;
+ }
+
+ @Override
+ public boolean sendBinary(final ByteBuffer data, final boolean finalFragment) {
+ if (!open) {
+ return false;
+ }
+ try {
+ listener.onBinary(data != null ? data.asReadOnlyBuffer() : ByteBuffer.allocate(0), finalFragment);
+ } catch (final Throwable ignore) {
+ }
+ return true;
+ }
+
+ @Override
+ public boolean ping(final ByteBuffer data) {
+ if (!open) {
+ return false;
+ }
+ try {
+ listener.onPong(data != null ? data.asReadOnlyBuffer() : ByteBuffer.allocate(0));
+ } catch (final Throwable ignore) {
+ }
+ return true;
+ }
+
+ @Override
+ public boolean pong(final ByteBuffer data) {
+ // In a real client this would send a PONG; here it's a no-op.
+ return open;
+ }
+
+ @Override
+ public CompletableFuture close(final int statusCode, final String reason) {
+ final CompletableFuture f = new CompletableFuture<>();
+ if (!open) {
+ f.complete(null);
+ return f;
+ }
+ open = false;
+ try {
+ listener.onClose(statusCode, reason != null ? reason : "");
+ } catch (final Throwable ignore) {
+ }
+ f.complete(null);
+ return f;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public boolean sendTextBatch(final List fragments, final boolean finalFragment) {
+ if (!open) {
+ return false;
+ }
+ if (fragments == null || fragments.isEmpty()) {
+ return true;
+ }
+ for (int i = 0; i < fragments.size(); i++) {
+ final boolean last = i == fragments.size() - 1 && finalFragment;
+ if (!sendText(fragments.get(i), last)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean sendBinaryBatch(final List fragments, final boolean finalFragment) {
+ if (!open) {
+ return false;
+ }
+ if (fragments == null || fragments.isEmpty()) {
+ return true;
+ }
+ for (int i = 0; i < fragments.size(); i++) {
+ final boolean last = i == fragments.size() - 1 && finalFragment;
+ if (!sendBinary(fragments.get(i), last)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ private static CloseableWebSocketClient newClient() {
+ final CloseableWebSocketClient c = new NoNetworkClient();
+ c.start();
+ return c;
+ }
+
+ // ------------------------------- Tests -----------------------------------
+
+ @Test
+ void echo_uncompressed_no_network() throws Exception {
+ final CountDownLatch done = new CountDownLatch(1);
+ final StringBuilder echoed = new StringBuilder();
+
+ try (final CloseableWebSocketClient client = newClient()) {
+ final WebSocketClientConfig cfg = WebSocketClientConfig.custom()
+ .enablePerMessageDeflate(false)
+ .build();
+
+ client.connect(URI.create("ws://example/echo"), new WebSocketListener() {
+ private WebSocket ws;
+
+ @Override
+ public void onOpen(final WebSocket ws) {
+ this.ws = ws;
+ final String prefix = "hello @ " + Instant.now() + " — ";
+ final StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 16; i++) {
+ sb.append(prefix);
+ }
+ ws.sendText(sb, true);
+ }
+
+ @Override
+ public void onText(final CharBuffer text, final boolean last) {
+ echoed.append(text);
+ ws.close(1000, "done");
+ }
+
+ @Override
+ public void onClose(final int code, final String reason) {
+ assertEquals(1000, code);
+ assertEquals("done", reason);
+ assertTrue(echoed.length() > 0);
+ done.countDown();
+ }
+
+ @Override
+ public void onError(final Throwable ex) {
+ done.countDown();
+ }
+ }, cfg, null);
+
+ assertTrue(done.await(3, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void ping_interleaved_fragmentation_no_network() throws Exception {
+ final CountDownLatch gotText = new CountDownLatch(1);
+ final CountDownLatch gotPong = new CountDownLatch(1);
+
+ try (final CloseableWebSocketClient client = newClient()) {
+ final WebSocketClientConfig cfg = WebSocketClientConfig.custom()
+ .enablePerMessageDeflate(false)
+ .build();
+
+ client.connect(URI.create("ws://example/interleave"), new WebSocketListener() {
+
+ @Override
+ public void onOpen(final WebSocket ws) {
+ ws.ping(StandardCharsets.UTF_8.encode("ping"));
+ ws.sendText("hello", true);
+ }
+
+ @Override
+ public void onText(final CharBuffer text, final boolean last) {
+ gotText.countDown();
+ }
+
+ @Override
+ public void onPong(final ByteBuffer payload) {
+ gotPong.countDown();
+ }
+ }, cfg, null);
+
+ assertTrue(gotPong.await(2, TimeUnit.SECONDS));
+ assertTrue(gotText.await(2, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void max_message_1009_no_network() throws Exception {
+ final CountDownLatch done = new CountDownLatch(1);
+ final int maxMessage = 2048;
+
+ try (final CloseableWebSocketClient client = newClient()) {
+ final WebSocketClientConfig cfg = WebSocketClientConfig.custom()
+ .setMaxMessageSize(maxMessage)
+ .enablePerMessageDeflate(false)
+ .build();
+
+ client.connect(URI.create("ws://example/echo"), new WebSocketListener() {
+ @Override
+ public void onOpen(final WebSocket ws) {
+ final StringBuilder sb = new StringBuilder();
+ final String chunk = "1234567890abcdef-";
+ while (sb.length() <= maxMessage * 2) {
+ sb.append(chunk);
+ }
+ ws.sendText(sb, true);
+ }
+
+ @Override
+ public void onClose(final int code, final String reason) {
+ assertEquals(1009, code);
+ done.countDown();
+ }
+ }, cfg, null);
+
+ assertTrue(done.await(2, TimeUnit.SECONDS));
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/extension/ExtensionChainTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/extension/ExtensionChainTest.java
new file mode 100644
index 0000000000..39408db577
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/extension/ExtensionChainTest.java
@@ -0,0 +1,50 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.extension;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+final class ExtensionChainTest {
+
+ @Test
+ void addAndUsePmce_decodeRoundTrip() throws Exception {
+ final ExtensionChain chain = new ExtensionChain();
+ final PerMessageDeflate pmce = new PerMessageDeflate(true, true, true, null, null);
+ chain.add(pmce);
+
+ final byte[] data = "compress me please".getBytes(StandardCharsets.UTF_8);
+
+ final WebSocketExtensionChain.Encoded enc = pmce.newEncoder().encode(data, true, true);
+ final byte[] back = chain.newDecodeChain().decode(enc.payload);
+
+ assertArrayEquals(data, back);
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/extension/MessageDeflateTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/extension/MessageDeflateTest.java
new file mode 100644
index 0000000000..1436c98353
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/extension/MessageDeflateTest.java
@@ -0,0 +1,90 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.extension;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits;
+import org.junit.jupiter.api.Test;
+
+final class MessageDeflateTest {
+
+ @Test
+ void rsvMask_isRSV1() {
+ final PerMessageDeflate pmce = new PerMessageDeflate(true, false, false, null, null);
+ assertEquals(FrameHeaderBits.RSV1, pmce.rsvMask());
+ }
+
+ @Test
+ void encode_setsRSVOnlyOnFirst() {
+ final PerMessageDeflate pmce = new PerMessageDeflate(true, false, false, null, null);
+ final WebSocketExtensionChain.Encoder enc = pmce.newEncoder();
+
+ final byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
+
+ final WebSocketExtensionChain.Encoded first = enc.encode(data, true, false);
+ final WebSocketExtensionChain.Encoded cont = enc.encode(data, false, true);
+
+ assertTrue(first.setRsvOnFirst, "RSV on first fragment");
+ assertFalse(cont.setRsvOnFirst, "no RSV on continuation");
+ assertNotEquals(0, first.payload.length);
+ assertNotEquals(0, cont.payload.length);
+ }
+
+ @Test
+ void roundTrip_message() throws Exception {
+ final PerMessageDeflate pmce = new PerMessageDeflate(true, true, true, null, null);
+ final WebSocketExtensionChain.Encoder enc = pmce.newEncoder();
+ final WebSocketExtensionChain.Decoder dec = pmce.newDecoder();
+
+ final String s = "The quick brown fox jumps over the lazy dog. "
+ + "The quick brown fox jumps over the lazy dog.";
+ final byte[] plain = s.getBytes(StandardCharsets.UTF_8);
+
+ // Single-frame message: first=true, fin=true
+ final byte[] wire = enc.encode(plain, true, true).payload;
+
+ assertTrue(wire.length > 0);
+ assertFalse(endsWithTail(wire), "tail must be stripped on wire");
+
+ final byte[] roundTrip = dec.decode(wire);
+ assertArrayEquals(plain, roundTrip);
+ }
+
+ private static boolean endsWithTail(final byte[] b) {
+ if (b.length < 4) {
+ return false;
+ }
+ return b[b.length - 4] == 0x00 && b[b.length - 3] == 0x00 && (b[b.length - 2] & 0xFF) == 0xFF && (b[b.length - 1] & 0xFF) == 0xFF;
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/frame/FrameReaderTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/frame/FrameReaderTest.java
new file mode 100644
index 0000000000..16e8a14515
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/frame/FrameReaderTest.java
@@ -0,0 +1,183 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.frame;
+
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hc.client5.http.websocket.core.exceptions.WebSocketProtocolException;
+import org.apache.hc.client5.http.websocket.transport.WebSocketFrameDecoder;
+import org.junit.jupiter.api.Test;
+
+class FrameReaderTest {
+
+ private static ByteBuffer serverTextFrame(final String s) {
+ final byte[] p = s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ final int len = p.length;
+ final ByteBuffer buf;
+ if (len <= 125) {
+ buf = ByteBuffer.allocate(2 + len);
+ buf.put((byte) 0x81); // FIN|TEXT
+ buf.put((byte) len); // no MASK
+ } else if (len <= 0xFFFF) {
+ buf = ByteBuffer.allocate(2 + 2 + len);
+ buf.put((byte) 0x81);
+ buf.put((byte) 126);
+ buf.putShort((short) len);
+ } else {
+ buf = ByteBuffer.allocate(2 + 8 + len);
+ buf.put((byte) 0x81);
+ buf.put((byte) 127);
+ buf.putLong(len);
+ }
+ buf.put(p);
+ buf.flip();
+ return buf;
+ }
+
+ @Test
+ void decode_small_text_unmasked() {
+ final ByteBuffer f = serverTextFrame("hello");
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(8192);
+ assertTrue(d.decode(f));
+ assertEquals(FrameOpcode.TEXT, d.opcode());
+ assertTrue(d.fin());
+ assertFalse(d.rsv1());
+ assertEquals("hello", java.nio.charset.StandardCharsets.UTF_8.decode(d.payload()).toString());
+ }
+
+ @Test
+ void decode_extended_126_length() {
+ final byte[] p = new byte[300];
+ for (int i = 0; i < p.length; i++) {
+ p[i] = (byte) (i & 0xFF);
+ }
+ final ByteBuffer f = ByteBuffer.allocate(2 + 2 + p.length);
+ f.put((byte) 0x82); // FIN|BINARY
+ f.put((byte) 126);
+ f.putShort((short) p.length);
+ f.put(p);
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(4096);
+ assertTrue(d.decode(f));
+ assertEquals(FrameOpcode.BINARY, d.opcode());
+ final ByteBuffer payload = d.payload();
+ final byte[] got = new byte[p.length];
+ payload.get(got);
+ assertArrayEquals(p, got);
+ }
+
+ @Test
+ void decode_extended_127_length() {
+ final int len = 66000;
+ final byte[] p = new byte[len];
+ Arrays.fill(p, (byte) 0xAB);
+ final ByteBuffer f = ByteBuffer.allocate(2 + 8 + len);
+ f.put((byte) 0x82); // FIN|BINARY
+ f.put((byte) 127);
+ f.putLong(len);
+ f.put(p);
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(len + 64);
+ assertTrue(d.decode(f));
+ assertEquals(len, d.payload().remaining());
+ }
+
+ @Test
+ void masked_server_frame_is_rejected() {
+ // FIN|TEXT, MASK bit set, len=0, + 4-byte mask key
+ final ByteBuffer f = ByteBuffer.allocate(2 + 4);
+ f.put((byte) 0x81);
+ f.put((byte) 0x80);
+ f.putInt(0x11223344);
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(1024);
+ assertThrows(WebSocketProtocolException.class, () -> d.decode(f));
+ }
+
+ @Test
+ void rsv_bits_without_extension_is_rejected() {
+ final ByteBuffer f = ByteBuffer.allocate(2);
+ f.put((byte) 0xC1); // FIN|RSV1|TEXT
+ f.put((byte) 0x00); // no mask, len=0
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(1024); // strict by default
+ final WebSocketProtocolException ex =
+ assertThrows(WebSocketProtocolException.class, () -> d.decode(f));
+ assertEquals(1002, ex.closeCode);
+ }
+
+ @Test
+ void partial_buffer_returns_false_and_does_not_consume() {
+ final ByteBuffer f = ByteBuffer.allocate(2);
+ f.put((byte) 0x81);
+ f.put((byte) 0x7E); // says 126 (extended), but no length bytes present
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(1024);
+ final int pos = f.position();
+ assertFalse(d.decode(f));
+ assertEquals(pos, f.position(), "decoder must reset position on incomplete frame");
+ }
+
+ @Test
+ void negative_127_length_throws() {
+ final ByteBuffer f = ByteBuffer.allocate(2 + 8);
+ f.put((byte) 0x82);
+ f.put((byte) 127);
+ f.putLong(-1L);
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(1024);
+ assertThrows(WebSocketProtocolException.class, () -> d.decode(f));
+ }
+
+ @Test
+ void frame_too_large_throws() {
+ final int len = 2000;
+ final ByteBuffer f = ByteBuffer.allocate(2 + 2 + len);
+ f.put((byte) 0x82);
+ f.put((byte) 126);
+ f.putShort((short) len);
+ f.put(new byte[len]);
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(1024); // max frame size smaller than len
+ assertThrows(WebSocketProtocolException.class, () -> d.decode(f));
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/frame/FrameWriterTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/frame/FrameWriterTest.java
new file mode 100644
index 0000000000..2e38e2d4bf
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/frame/FrameWriterTest.java
@@ -0,0 +1,187 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.frame;
+
+import static org.apache.hc.client5.http.websocket.core.frame.FrameHeaderBits.RSV1;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+
+class FrameWriterTest {
+
+ private static class Parsed {
+ int b0;
+ int b1;
+ int opcode;
+ boolean fin;
+ boolean mask;
+ long len;
+ final byte[] maskKey = new byte[4];
+ int headerLen;
+ ByteBuffer payloadSlice;
+ }
+
+ private static Parsed parse(final ByteBuffer frame) {
+ final ByteBuffer frameCopy = frame.asReadOnlyBuffer();
+ final Parsed r = new Parsed();
+ r.b0 = frameCopy.get() & 0xFF;
+ r.fin = (r.b0 & 0x80) != 0;
+ r.opcode = r.b0 & 0x0F;
+
+ r.b1 = frameCopy.get() & 0xFF;
+ r.mask = (r.b1 & 0x80) != 0;
+ final int low = r.b1 & 0x7F;
+ if (low <= 125) {
+ r.len = low;
+ } else if (low == 126) {
+ r.len = frameCopy.getShort() & 0xFFFF;
+ } else {
+ r.len = frameCopy.getLong();
+ }
+
+ if (r.mask) {
+ frameCopy.get(r.maskKey);
+ }
+ r.headerLen = frameCopy.position();
+ r.payloadSlice = frameCopy.slice();
+ return r;
+ }
+
+ private static byte[] unmask(final Parsed p) {
+ final byte[] out = new byte[(int) p.len];
+ for (int i = 0; i < out.length; i++) {
+ int b = p.payloadSlice.get(i) & 0xFF;
+ b ^= p.maskKey[i & 3] & 0xFF;
+ out[i] = (byte) b;
+ }
+ return out;
+ }
+
+ @Test
+ void text_small_masked_roundtrip() {
+ final WebSocketFrameWriter w = new WebSocketFrameWriter();
+ final ByteBuffer f = w.text("hello", true);
+ final Parsed p = parse(f);
+ assertTrue(p.fin);
+ assertEquals(FrameOpcode.TEXT, p.opcode);
+ assertTrue(p.mask, "client frame must be masked");
+ assertEquals(5, p.len);
+ assertArrayEquals("hello".getBytes(StandardCharsets.UTF_8), unmask(p));
+ }
+
+ @Test
+ void binary_len_126_masked_roundtrip() {
+ final byte[] payload = new byte[300];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) (i & 0xFF);
+ }
+
+ final WebSocketFrameWriter w = new WebSocketFrameWriter();
+ final ByteBuffer f = w.binary(ByteBuffer.wrap(payload), true);
+
+ final Parsed p = parse(f);
+ assertTrue(p.mask);
+ assertEquals(FrameOpcode.BINARY, p.opcode);
+ assertEquals(300, p.len);
+ assertArrayEquals(payload, unmask(p));
+ }
+
+ @Test
+ void binary_len_127_masked_roundtrip() {
+ final int len = 70000;
+ final byte[] payload = new byte[len];
+ java.util.Arrays.fill(payload, (byte) 0xA5);
+
+ final WebSocketFrameWriter w = new WebSocketFrameWriter();
+ final ByteBuffer f = w.binary(ByteBuffer.wrap(payload), true);
+
+ final Parsed p = parse(f);
+ assertTrue(p.mask);
+ assertEquals(FrameOpcode.BINARY, p.opcode);
+ assertEquals(len, p.len);
+ assertArrayEquals(payload, unmask(p));
+ }
+
+ @Test
+ void rsv1_set_with_frameWithRSV() {
+ final WebSocketFrameWriter w = new WebSocketFrameWriter();
+ final ByteBuffer payload = StandardCharsets.UTF_8.encode("x");
+ // Use RSV1 bit
+ final ByteBuffer f = w.frameWithRSV(FrameOpcode.TEXT, payload, true, true, RSV1);
+ final Parsed p = parse(f);
+ assertTrue(p.fin);
+ assertEquals(FrameOpcode.TEXT, p.opcode);
+ assertTrue((p.b0 & RSV1) != 0, "RSV1 must be set");
+ assertArrayEquals("x".getBytes(StandardCharsets.UTF_8), unmask(p));
+ }
+
+ @Test
+ void close_frame_contains_code_and_reason() {
+ final WebSocketFrameWriter w = new WebSocketFrameWriter();
+ final ByteBuffer f = w.close(1000, "done");
+ final Parsed p = parse(f);
+ assertTrue(p.mask);
+ assertEquals(FrameOpcode.CLOSE, p.opcode);
+ assertTrue(p.len >= 2);
+
+ final byte[] raw = unmask(p);
+ final int code = (raw[0] & 0xFF) << 8 | raw[1] & 0xFF;
+ final String reason = new String(raw, 2, raw.length - 2, StandardCharsets.UTF_8);
+
+ assertEquals(1000, code);
+ assertEquals("done", reason);
+ }
+
+ @Test
+ void closeEcho_masks_and_preserves_payload() {
+ // Build a close payload manually
+ final byte[] reason = "bye".getBytes(StandardCharsets.UTF_8);
+ final ByteBuffer payload = ByteBuffer.allocate(2 + reason.length);
+ payload.put((byte) (1000 >>> 8));
+ payload.put((byte) (1000 & 0xFF));
+ payload.put(reason);
+ payload.flip();
+
+ final WebSocketFrameWriter w = new WebSocketFrameWriter();
+ final ByteBuffer f = w.closeEcho(payload);
+ final Parsed p = parse(f);
+
+ assertTrue(p.mask);
+ assertEquals(FrameOpcode.CLOSE, p.opcode);
+ assertEquals(2 + reason.length, p.len);
+
+ final byte[] got = unmask(p);
+ assertEquals(1000, (got[0] & 0xFF) << 8 | got[1] & 0xFF);
+ assertEquals("bye", new String(got, 2, got.length - 2, StandardCharsets.UTF_8));
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/message/CloseCodecTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/message/CloseCodecTest.java
new file mode 100644
index 0000000000..8f813af0c3
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/core/message/CloseCodecTest.java
@@ -0,0 +1,57 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.core.message;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+final class CloseCodecTest {
+
+ @Test
+ void readEmptyIs1005() {
+ final ByteBuffer empty = ByteBuffer.allocate(0);
+ assertEquals(1005, CloseCodec.readCloseCode(empty.asReadOnlyBuffer()));
+ assertEquals("", CloseCodec.readCloseReason(empty.asReadOnlyBuffer()));
+ }
+
+ @Test
+ void readCodeAndReason() {
+ final ByteBuffer payload = ByteBuffer.allocate(2 + 4);
+ payload.put((byte) 0x03).put((byte) 0xE8); // 1000
+ payload.put(StandardCharsets.UTF_8.encode("done"));
+ payload.flip();
+
+ // Use the SAME buffer so the position advances
+ final ByteBuffer buf = payload.asReadOnlyBuffer();
+ assertEquals(1000, CloseCodec.readCloseCode(buf)); // advances position by 2
+ assertEquals("done", CloseCodec.readCloseReason(buf)); // reads remaining bytes only
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/example/WebSocketEchoClient.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/example/WebSocketEchoClient.java
new file mode 100644
index 0000000000..cb621c16aa
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/example/WebSocketEchoClient.java
@@ -0,0 +1,126 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.example;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.client5.http.websocket.api.WebSocket;
+import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
+import org.apache.hc.client5.http.websocket.api.WebSocketListener;
+import org.apache.hc.client5.http.websocket.client.CloseableWebSocketClient;
+import org.apache.hc.client5.http.websocket.client.WebSocketClientBuilder;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+
+public final class WebSocketEchoClient {
+
+ public static void main(final String[] args) throws Exception {
+ final URI uri = URI.create(args.length > 0 ? args[0] : "ws://localhost:8080/echo");
+ final CountDownLatch done = new CountDownLatch(1);
+
+ final WebSocketClientConfig cfg = WebSocketClientConfig.custom()
+ .enablePerMessageDeflate(true)
+ .offerServerNoContextTakeover(true)
+ .offerClientNoContextTakeover(true)
+ .offerClientMaxWindowBits(15)
+ .setCloseWaitTimeout(Timeout.ofMilliseconds(200))
+ .build();
+
+ try (final CloseableWebSocketClient client = WebSocketClientBuilder.create()
+ .defaultConfig(cfg)
+ .build()) {
+
+ System.out.println("[TEST] connecting: " + uri);
+ client.start();
+
+ client.connect(uri, new WebSocketListener() {
+ private WebSocket ws;
+
+ @Override
+ public void onOpen(final WebSocket ws) {
+ this.ws = ws;
+ System.out.println("[TEST] open: " + uri);
+
+ final String prefix = "hello from hc5 WS @ " + Instant.now() + " — ";
+ final StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 256; i++) {
+ sb.append(prefix);
+ }
+ final String msg = sb.toString();
+
+ ws.sendText(msg, true);
+ System.out.println("[TEST] sent (chars=" + msg.length() + ")");
+ }
+
+ @Override
+ public void onText(final CharBuffer text, final boolean last) {
+ final int len = text.length();
+ final CharSequence preview = len > 120 ? text.subSequence(0, 120) + "…" : text;
+ System.out.println("[TEST] text (chars=" + len + "): " + preview);
+ ws.close(1000, "done");
+ }
+
+ @Override
+ public void onPong(final ByteBuffer payload) {
+ System.out.println("[TEST] pong: " + StandardCharsets.UTF_8.decode(payload));
+ }
+
+ @Override
+ public void onClose(final int code, final String reason) {
+ System.out.println("[TEST] close: " + code + " " + reason);
+ done.countDown();
+ }
+
+ @Override
+ public void onError(final Throwable ex) {
+ ex.printStackTrace(System.err);
+ done.countDown();
+ }
+ }, cfg).exceptionally(ex -> {
+ done.countDown();
+ return null;
+ });
+
+ if (!done.await(12, TimeUnit.SECONDS)) {
+ System.err.println("[TEST] Timed out waiting for echo/close");
+ System.exit(1);
+ }
+
+ // Tidy shutdown: ask for shutdown, then wait briefly for the reactor to stop.
+ // Try-with-resources will still call close(GRACEFUL) at the end.
+ client.initiateShutdown();
+ client.awaitShutdown(TimeValue.ofSeconds(2));
+ }
+ }
+}
+
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/example/WebSocketEchoServer.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/example/WebSocketEchoServer.java
new file mode 100644
index 0000000000..d592ce6b87
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/example/WebSocketEchoServer.java
@@ -0,0 +1,118 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.example;
+
+import java.nio.ByteBuffer;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * WebSocketEchoServer
+ *
+ * A tiny embedded Jetty WebSocket server that echoes back any TEXT or BINARY message
+ * it receives. This is intended for local development and interoperability testing of
+ * {@code WebSocketClient} and is not production hardened.
+ *
+ * Features
+ *
+ * - HTTP upgrade to RFC 6455 WebSocket on path {@code /echo}
+ * - Echoes TEXT and BINARY frames
+ * - Compatible with permessage-deflate (RFC 7692); Jetty will negotiate it if offered
+ *
+ *
+ * Usage
+ *
+ * # Default port 8080
+ * java -cp ... org.apache.hc.client5.http.websocket.example.WebSocketEchoServer
+ *
+ * # Custom port
+ * java -cp ... org.apache.hc.client5.http.websocket.example.WebSocketEchoServer 9090
+ *
+ *
+ * Once started, the server listens on {@code ws://localhost:<port>/echo}.
+ *
+ * Notes
+ *
+ * - If the port is already in use, Jetty will fail to start with {@code BindException}.
+ * - Idle timeout is set to 30 seconds for simplicity.
+ *
+ */
+public final class WebSocketEchoServer {
+
+ public static void main(final String[] args) throws Exception {
+ final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
+
+ final Server server = new Server(port);
+ final ServletContextHandler ctx = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ ctx.setContextPath("/");
+ server.setHandler(ctx);
+
+ ctx.addServlet(new ServletHolder(new EchoServlet()), "/echo");
+ server.start();
+ System.out.println("[WS-Server] up at ws://localhost:" + port + "/echo");
+ server.join();
+ }
+
+ /**
+ * Simple servlet that wires a Jetty WebSocket endpoint at {@code /echo}.
+ */
+ public static final class EchoServlet extends WebSocketServlet {
+ @Override
+ public void configure(final WebSocketServletFactory factory) {
+ factory.getPolicy().setIdleTimeout(30_000);
+ // Jetty will negotiate permessage-deflate automatically if supported.
+ factory.setCreator((req, resp) -> new EchoSocket());
+ }
+ }
+
+ /**
+ * Echoes back text and binary messages.
+ */
+ public static final class EchoSocket extends WebSocketAdapter {
+ @Override
+ public void onWebSocketText(final String msg) {
+ final Session s = getSession();
+ if (s != null && s.isOpen()) {
+ s.getRemote().sendString(msg, null);
+ }
+ }
+
+ @Override
+ public void onWebSocketBinary(final byte[] payload, final int off, final int len) {
+ final Session s = getSession();
+ if (s != null && s.isOpen()) {
+ s.getRemote().sendBytes(ByteBuffer.wrap(payload, off, len), null);
+ }
+ }
+ }
+}
diff --git a/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/transport/WsDecoderTest.java b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/transport/WsDecoderTest.java
new file mode 100644
index 0000000000..ebe8eea001
--- /dev/null
+++ b/httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/transport/WsDecoderTest.java
@@ -0,0 +1,104 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.websocket.transport;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hc.client5.http.websocket.core.exceptions.WebSocketProtocolException;
+import org.apache.hc.client5.http.websocket.core.frame.FrameOpcode;
+import org.junit.jupiter.api.Test;
+
+class WsDecoderTest {
+
+ @Test
+ void serverMaskedFrame_isRejected() {
+ // Build a minimal TEXT frame with MASK bit set (which servers MUST NOT set).
+ // 0x81 FIN|TEXT, 0x80 | 0 = mask + length 0, then 4-byte masking key.
+ final ByteBuffer buf = ByteBuffer.allocate(2 + 4);
+ buf.put((byte) 0x81);
+ buf.put((byte) 0x80); // MASK set, len=0
+ buf.putInt(0x11223344);
+ buf.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(8192);
+ assertThrows(WebSocketProtocolException.class, () -> d.decode(buf));
+ }
+
+ @Test
+ void extendedLen_126_and_127_parse() {
+ // A FIN|BINARY with 126 length, len=300
+ final byte[] payload = new byte[300];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) (i & 0xFF);
+ }
+
+ final ByteBuffer f126 = ByteBuffer.allocate(2 + 2 + payload.length);
+ f126.put((byte) 0x82); // FIN+BINARY
+ f126.put((byte) 126);
+ f126.putShort((short) payload.length);
+ f126.put(payload);
+ f126.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(4096);
+ assertTrue(d.decode(f126));
+ assertEquals(FrameOpcode.BINARY, d.opcode());
+ assertEquals(payload.length, d.payload().remaining());
+
+ // Now 127 with len=65540 (> 0xFFFF)
+ final int big = 65540;
+ final byte[] p2 = new byte[big];
+ final ByteBuffer f127 = ByteBuffer.allocate(2 + 8 + p2.length);
+ f127.put((byte) 0x82);
+ f127.put((byte) 127);
+ f127.putLong(big);
+ f127.put(p2);
+ f127.flip();
+
+ final WebSocketFrameDecoder d2 = new WebSocketFrameDecoder(big + 32);
+ assertTrue(d2.decode(f127));
+ assertEquals(big, d2.payload().remaining());
+ }
+
+ @Test
+ void partialBuffer_returnsFalse_and_consumesNothing() {
+ final ByteBuffer f = ByteBuffer.allocate(2);
+ f.put((byte) 0x81);
+ f.put((byte) 0x7E); // says 126, but no length bytes present
+ f.flip();
+
+ final WebSocketFrameDecoder d = new WebSocketFrameDecoder(1024);
+ // Should mark/reset and return false; buffer remains at same position after call (no throw).
+ final int posBefore = f.position();
+ assertFalse(d.decode(f));
+ assertEquals(posBefore, f.position());
+ }
+}
diff --git a/httpclient5-websocket/src/test/resources/log4j2.xml b/httpclient5-websocket/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..ce9e796abd
--- /dev/null
+++ b/httpclient5-websocket/src/test/resources/log4j2.xml
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 24e9be9aab..351e1f9d36 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
1.55.0
1.26.2
2.9.3
+ 9.4.54.v20240208
@@ -135,6 +136,11 @@
httpclient5-sse
${project.version}
+
+ org.apache.httpcomponents.client5
+ httpclient5-websocket
+ ${project.version}
+
org.slf4j
slf4j-api
@@ -265,6 +271,22 @@
caffeine
${caffeine.version}
+
+ org.eclipse.jetty
+ jetty-servlet
+ ${jetty.version}
+
+
+ org.eclipse.jetty.websocket
+ websocket-server
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty.version}
+ test
+
@@ -273,6 +295,7 @@
httpclient5-sse
httpclient5-observation
httpclient5-fluent
+ httpclient5-websocket
httpclient5-cache
httpclient5-testing
@@ -498,6 +521,10 @@
Apache HttpClient SSE
org.apache.hc.client5.http.sse*
+
+ Apache HttpClient SSE
+ org.apache.hc.client5.http.websocket*
+