Skip to content
This repository was archived by the owner on Dec 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ class BukkitCloudPlayerManagerImpl :
override fun createPlayer(
uuid: UUID,
name: String,
proxy: Boolean,
ip: Inet4Address,
serverName: String
proxyName: String?,
serverName: String?,
ip: Inet4Address
) = BukkitClientCloudPlayerImpl(uuid, name).also {
if (proxy) {
it.proxyServerName = serverName
} else {
it.serverName = serverName
}
it.proxyServerName = serverName
Copy link

Copilot AI Nov 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proxy and server names are being swapped. Line 27 should set it.proxyServerName = proxyName and line 28 should set it.serverName = serverName. Currently both are incorrectly set to serverName.

Suggested change
it.proxyServerName = serverName
it.proxyServerName = proxyName

Copilot uses AI. Check for mistakes.
it.serverName = serverName
}

override fun getAudience(uuid: UUID): Audience? = Bukkit.getPlayer(uuid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@
import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket
import dev.slne.surf.cloud.api.common.netty.packet.NettyPacketInfo
import dev.slne.surf.cloud.core.client.netty.ClientNettyClientImpl
import dev.slne.surf.cloud.core.client.player.commonPlayerManagerImpl
import dev.slne.surf.cloud.core.client.server.ClientCloudServerImpl
import dev.slne.surf.cloud.core.client.server.ClientProxyCloudServerImpl
import dev.slne.surf.cloud.core.client.server.serverManagerImpl
import dev.slne.surf.cloud.core.client.sync.SyncRegistryImpl
import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope
import dev.slne.surf.cloud.core.common.coroutines.PacketHandlerScope
import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope
import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl
import dev.slne.surf.cloud.core.common.netty.network.protocol.common.ClientboundSetVelocitySecretPacket
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.*
import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.*
import dev.slne.surf.cloud.core.common.netty.registry.listener.NettyListenerRegistry
import dev.slne.surf.cloud.core.common.plugin.task.CloudSynchronizeTaskManager
import dev.slne.surf.surfapi.core.api.util.logger
import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf
import dev.slne.surf.surfapi.core.api.util.mutableObjectSetOf
import kotlinx.coroutines.launch
import net.kyori.adventure.nbt.BinaryTagIO
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean

class ClientSynchronizingPacketListenerImpl(
override val client: ClientNettyClientImpl,
Expand All @@ -27,11 +33,18 @@
) : ClientCommonPacketListenerImpl(connection), ClientSynchronizingPacketListener {

private val log = logger()
private val hydratingPlayers = AtomicBoolean(false)
private val pendingHydrationPlayers =
mutableObjectListOf<ClientboundSyncPlayerHydrationChunkPacket.Entry>()

private var currentLargePpdcUuid: UUID? = null
private var currentLargePpdc: ByteArray? = null
private val pendingLargePpdcs = mutableObjectSetOf<UUID>()

fun startSynchronizing() {
statusUpdater.switchState(AbstractStatusUpdater.State.SYNCHRONIZING)

BeforeStartTaskScope.launch {
SynchronizeTasksScope.launch {
CloudSynchronizeTaskManager.executeTasks(client)

statusUpdater.switchState(AbstractStatusUpdater.State.SYNCHRONIZE_WAIT_FOR_SERVER)
Expand Down Expand Up @@ -134,6 +147,117 @@
TODO("Not yet implemented")
}

override fun handleSyncPlayerHydrationStart(packet: ClientboundSyncPlayerHydrationStartPacket) {
if (!hydratingPlayers.compareAndSet(false, true)) {
log.atWarning()
.log("Tried to start player hydration twice")
return
}
}

override fun handleSyncPlayerHydrationChunk(packet: ClientboundSyncPlayerHydrationChunkPacket) {
if (!hydratingPlayers.get()) {
log.atWarning()
.log("Received player hydration chunk before start")
return
}

pendingHydrationPlayers.addAll(packet.entries)
}

override fun handleSyncPlayerHydrationEnd(packet: ClientboundSyncPlayerHydrationEndPacket) {
if (!hydratingPlayers.compareAndSet(true, false)) {
log.atWarning()
.log("Tried to end player hydration twice")
return
}

for (data in pendingHydrationPlayers) {
val player = commonPlayerManagerImpl.createExistingPlayer(
data.uuid,
data.name,
data.playerIp,
data.serverName,
data.proxyName
)

data.pdcOrCallback.ifLeft { tag ->
player.overwritePpdc(tag)
}.ifRight { callback ->
pendingLargePpdcs.add(callback)
}
}

pendingHydrationPlayers.clear()
}

override fun handleSyncLargerPlayerPersistentDataContainerStart(packet: ClientboundSyncLargePlayerPersistentDataContainerStartPacket) {
if (currentLargePpdcUuid != null) {
log.atWarning()
.log("Received start of large PPD container before end of previous one (%s)", currentLargePpdcUuid)

Check notice on line 197 in surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Incorrect formatting

Missing whitespace

Check notice on line 197 in surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Incorrect formatting

Missing whitespace

Check notice on line 197 in surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Incorrect formatting

Incorrect whitespace
return
}

currentLargePpdcUuid = packet.playerUuid
currentLargePpdc = null
}

override fun handleSyncLargerPlayerPersistentDataContainerChunk(packet: ClientboundSyncLargePlayerPersistentDataContainerChunkPacket) {
if (currentLargePpdcUuid == null) {
log.atWarning()
.log("Received chunk of large PPD container before start")
return
}

val existing = currentLargePpdc
val payload = packet.payload

currentLargePpdc = if (existing == null) {
payload
} else {
existing + payload
}
}

override fun handleSyncLargerPlayerPersistentDataContainerEnd(packet: ClientboundSyncLargePlayerPersistentDataContainerEndPacket) {
val uuid = currentLargePpdcUuid
val payload = currentLargePpdc
if (uuid == null || payload == null) {
log.atWarning()
.log("Received end of large PPD container before start")
return
}

currentLargePpdcUuid = null
currentLargePpdc = null

pendingLargePpdcs.remove(uuid)
val player = commonPlayerManagerImpl.getPlayer(uuid)
if (player == null) {
log.atWarning()
.log("Received large PPD container end for unknown player (%s)", uuid)
return
}

val tag = payload.inputStream().use { stream ->
BinaryTagIO.reader().read(stream)
}

player.overwritePpdc(tag)
}

override fun handleSynchronizePlayerMutes(packet: ClientboundSynchronizePlayerMutes) {
val player = commonPlayerManagerImpl.getPlayer(packet.playerUuid)

if (player == null) {
log.atWarning()
.log("Received mute update for unknown player (%s)", packet.playerUuid)
return
}

player.punishmentManager.updateMutes(packet.mutes)
}

override fun handlePacket(packet: NettyPacket) {
val listeners = NettyListenerRegistry.getListeners(packet.javaClass) ?: return
if (listeners.isEmpty()) return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,11 @@ abstract class CommonClientCloudPlayerManagerImpl<Platform : Audience, P : Clien
player.serverName = serverName
}

override fun removeProxyServer(
player: P,
serverName: String
) {
override fun removeProxyServer(player: P) {
player.proxyServerName = null
}

override fun removeServer(
player: P,
serverName: String
) {
override fun removeServer(player: P) {
player.serverName = null
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dev.slne.surf.cloud.core.common.coroutines

import dev.slne.surf.cloud.api.common.util.threadFactory
import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope.unnamedTask
import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope.unnamedTask
import dev.slne.surf.surfapi.core.api.util.logger
import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf
import kotlinx.coroutines.*
Expand All @@ -22,13 +22,13 @@
@JvmStatic
protected val log = logger()
private val scopes = mutableObjectListOf<BaseScope>()
fun terminateAll() {

Check warning on line 25 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Function "terminateAll" is never used
scopes.forEach { it.cancel("Shutdown") }
}
}

init {
scopes.add(this)

Check notice on line 31 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Leaking 'this' in constructor

Leaking 'this' in constructor of non-final class BaseScope
}

override val coroutineContext =
Expand Down Expand Up @@ -65,7 +65,7 @@
name = "netty-tick"
)

object QueueProcessingScope : BaseScope(

Check warning on line 68 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Object "QueueProcessingScope" is never used
dispatcher = Executors.newSingleThreadExecutor(threadFactory {
nameFormat("queue-thread-%d")
daemon(false)
Expand All @@ -73,7 +73,7 @@
name = "queue"
)

object QueueDisplayScope : BaseScope(

Check warning on line 76 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Object "QueueDisplayScope" is never used
dispatcher = Executors.newSingleThreadExecutor(threadFactory {
nameFormat("queue-display-thread-%d")
daemon(false)
Expand Down Expand Up @@ -122,7 +122,7 @@
name = "ktor"
)

object NameHistoryScope : BaseScope(

Check warning on line 125 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Object "NameHistoryScope" is never used
dispatcher = Dispatchers.IO,
name = "name-history"
)
Expand All @@ -147,7 +147,7 @@
name = "common-punishment-handlers"
)

object PunishmentDatabaseScope : BaseScope(

Check warning on line 150 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Object "PunishmentDatabaseScope" is never used
dispatcher = Dispatchers.IO,
name = "punishment-database"
)
Expand All @@ -172,14 +172,14 @@
name = "punishment-cache-refresh"
)

object BeforeStartTaskScope : BaseScope(
dispatcher = Dispatchers.IO,
name = "before-start-task",
object SynchronizeTasksScope : BaseScope(
dispatcher = Dispatchers.Default,
name = "synchronize-tasks",
coroutineExceptionHandler = CoroutineExceptionHandler { context, throwable ->
val task = context[TaskName] ?: unnamedTask
log.atWarning()
.withCause(throwable)
.log("Unhandled exception in before start task: $task")
.log("Unhandled exception in synchronize task: $task")
}
) {
@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,18 @@ import dev.slne.surf.cloud.api.common.netty.packet.ResponseNettyPacket
import dev.slne.surf.cloud.api.common.server.CloudServer
import dev.slne.surf.cloud.api.common.server.CommonCloudServer
import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl
import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf
import dev.slne.surf.surfapi.core.api.util.synchronize
import kotlinx.coroutines.CompletableDeferred
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.time.Duration

abstract class CommonNettyClientImpl(
override val serverCategory: String,
override val serverName: String
) : NettyClient {
private val packetQueue by lazy { mutableObject2ObjectMapOf<NettyPacket, CompletableDeferred<Boolean>?>().synchronize() }

@Volatile
private var _connection: ConnectionImpl? = null
set(value) {
field = value

if (value != null) {
synchronized(packetQueue) {
packetQueue.forEach { (packet, deferred) ->
if (deferred != null) {
value.sendWithIndication(packet, deferred = deferred)
} else {
value.send(packet)
}
}
packetQueue.clear()
}
}
}
private val packetQueue = ConcurrentLinkedQueue<QueuedPacket>()

override val connection get() = _connection ?: error("connection not yet set")

Expand All @@ -47,29 +30,39 @@ abstract class CommonNettyClientImpl(

override fun fireAndForget(packet: NettyPacket) {
val connection = _connection
if (connection == null) {
packetQueue[packet] = null
} else {
if (connection != null) {
connection.send(packet)
return
}

packetQueue.add(QueuedPacket(packet, null))

val connectionNow = _connection
if (connectionNow != null) {
drainQueue(connectionNow)
}
}

override suspend fun fire(packet: NettyPacket, convertExceptions: Boolean): Boolean {
val connection = _connection
if (connection == null) {
val result = runCatching {
val deferred = CompletableDeferred<Boolean>()
packetQueue[packet] = deferred
deferred.await()
}
if (connection != null) {
return connection.sendWithIndication(packet, convertExceptions)
}

if (convertExceptions) {
return result.getOrDefault(false)
}
val deferred = CompletableDeferred<Boolean>()
packetQueue.add(QueuedPacket(packet, deferred))

return result.getOrThrow()
val connectionNow = _connection
if (connectionNow != null) {
drainQueue(connectionNow)
}

val result = runCatching { deferred.await() }

return if (convertExceptions) {
result.getOrDefault(false)
} else {
return connection.sendWithIndication(packet, convertExceptions)
result.getOrThrow()
}
}

Expand All @@ -85,5 +78,24 @@ abstract class CommonNettyClientImpl(
fun initConnection(connection: ConnectionImpl) {
check(_connection == null) { "Connection already set" }
_connection = connection
drainQueue(connection)
}


private fun drainQueue(connection: ConnectionImpl) {
while (true) {
val queued = packetQueue.poll() ?: break
val (packet, deferred) = queued
if (deferred != null) {
connection.sendWithIndication(packet, deferred = deferred)
} else {
connection.send(packet)
}
}
}

private data class QueuedPacket(
val packet: NettyPacket,
val deferred: CompletableDeferred<Boolean>?
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@

private var disconnectionHandled = false

private var packetsReceived = 0

Check warning on line 99 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Property "packetsReceived" is never used
override var receivedPackets = 0
private set
override var sentPackets = 0
Expand Down Expand Up @@ -125,7 +125,7 @@
var disconnectionDetails: DisconnectionDetails? = null
private set

var encrypted: Boolean = false

Check warning on line 128 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Property "encrypted" is never used
private set

@Volatile
Expand Down Expand Up @@ -413,7 +413,7 @@
)
}

suspend fun initiateServerboundRunningConnection(

Check warning on line 416 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Function "initiateServerboundRunningConnection" is never used
hostname: String,
port: Int,
listener: ClientLoginPacketListener
Expand Down Expand Up @@ -712,7 +712,6 @@
handleDisconnection()
}


this.averageSentPackets = lerp(
0.75f, this.sentPackets.toFloat(),
this.averageSentPackets
Expand Down Expand Up @@ -775,7 +774,7 @@
_channel?.config()?.isAutoRead = false
}

fun enableAutoRead() {

Check warning on line 777 in surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Unused symbol

Function "enableAutoRead" is never used
_channel?.config()?.isAutoRead = true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import dev.slne.surf.cloud.api.common.netty.packet.ResponseNettyPacket
import dev.slne.surf.cloud.api.common.util.netty.UnifiedReadOnlyChannelHandler
import dev.slne.surf.surfapi.core.api.util.logger
import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf
import dev.slne.surf.surfapi.core.api.util.synchronize
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import kotlinx.coroutines.CompletableDeferred
Expand All @@ -16,7 +15,7 @@ import java.util.*
class RespondingPacketSendHandler : UnifiedReadOnlyChannelHandler<NettyPacket>() {
private val log = logger()
private val respondingPackets =
mutableObject2ObjectMapOf<UUID, CompletableDeferred<ResponseNettyPacket>>().synchronize()
mutableObject2ObjectMapOf<UUID, CompletableDeferred<ResponseNettyPacket>>()
Copy link

Copilot AI Nov 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The respondingPackets map is accessed from multiple methods (handleRead on line 41 and handleWrite on line 65) that can be called concurrently from Netty's event loop threads. The map should be thread-safe. Consider using ConcurrentHashMap instead of the unsynchronized mutableObject2ObjectMapOf.

Suggested change
mutableObject2ObjectMapOf<UUID, CompletableDeferred<ResponseNettyPacket>>()
ConcurrentHashMap<UUID, CompletableDeferred<ResponseNettyPacket>>()

Copilot uses AI. Check for mistakes.

@Suppress("DEPRECATION")
override fun handleRead(
Expand Down Expand Up @@ -49,7 +48,7 @@ class RespondingPacketSendHandler : UnifiedReadOnlyChannelHandler<NettyPacket>()
}
}

@Suppress("DEPRECATION")
@Suppress("UNCHECKED_CAST")
override fun handleWrite(
ctx: ChannelHandlerContext,
msg: NettyPacket,
Expand Down
Loading
Loading