diff --git a/surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/player/BukkitCloudPlayerManagerImpl.kt b/surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/player/BukkitCloudPlayerManagerImpl.kt index 75b5e8a5..e8859fce 100644 --- a/surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/player/BukkitCloudPlayerManagerImpl.kt +++ b/surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/player/BukkitCloudPlayerManagerImpl.kt @@ -20,15 +20,12 @@ class BukkitCloudPlayerManagerImpl : override fun createPlayer( uuid: UUID, name: String, - proxy: Boolean, - ip: Inet4Address, - serverName: String + proxyName: String?, + serverName: String?, + ip: Inet4Address ) = BukkitClientCloudPlayerImpl(uuid, name).also { - if (proxy) { - it.proxyServerName = serverName - } else { - it.serverName = serverName - } + it.proxyServerName = serverName + it.serverName = serverName } override fun getAudience(uuid: UUID): Audience? = Bukkit.getPlayer(uuid) diff --git a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt index 741ef926..da5521b8 100644 --- a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt @@ -4,12 +4,13 @@ import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket import dev.slne.surf.cloud.api.common.netty.packet.NettyPacketInfo import dev.slne.surf.cloud.core.client.netty.ClientNettyClientImpl +import dev.slne.surf.cloud.core.client.player.commonPlayerManagerImpl import dev.slne.surf.cloud.core.client.server.ClientCloudServerImpl import dev.slne.surf.cloud.core.client.server.ClientProxyCloudServerImpl import dev.slne.surf.cloud.core.client.server.serverManagerImpl import dev.slne.surf.cloud.core.client.sync.SyncRegistryImpl -import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope import dev.slne.surf.cloud.core.common.coroutines.PacketHandlerScope +import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl import dev.slne.surf.cloud.core.common.netty.network.protocol.common.ClientboundSetVelocitySecretPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.* @@ -17,7 +18,12 @@ import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.* import dev.slne.surf.cloud.core.common.netty.registry.listener.NettyListenerRegistry import dev.slne.surf.cloud.core.common.plugin.task.CloudSynchronizeTaskManager import dev.slne.surf.surfapi.core.api.util.logger +import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf +import dev.slne.surf.surfapi.core.api.util.mutableObjectSetOf import kotlinx.coroutines.launch +import net.kyori.adventure.nbt.BinaryTagIO +import java.util.* +import java.util.concurrent.atomic.AtomicBoolean class ClientSynchronizingPacketListenerImpl( override val client: ClientNettyClientImpl, @@ -27,11 +33,18 @@ class ClientSynchronizingPacketListenerImpl( ) : ClientCommonPacketListenerImpl(connection), ClientSynchronizingPacketListener { private val log = logger() + private val hydratingPlayers = AtomicBoolean(false) + private val pendingHydrationPlayers = + mutableObjectListOf() + + private var currentLargePpdcUuid: UUID? = null + private var currentLargePpdc: ByteArray? = null + private val pendingLargePpdcs = mutableObjectSetOf() fun startSynchronizing() { statusUpdater.switchState(AbstractStatusUpdater.State.SYNCHRONIZING) - BeforeStartTaskScope.launch { + SynchronizeTasksScope.launch { CloudSynchronizeTaskManager.executeTasks(client) statusUpdater.switchState(AbstractStatusUpdater.State.SYNCHRONIZE_WAIT_FOR_SERVER) @@ -134,6 +147,117 @@ class ClientSynchronizingPacketListenerImpl( TODO("Not yet implemented") } + override fun handleSyncPlayerHydrationStart(packet: ClientboundSyncPlayerHydrationStartPacket) { + if (!hydratingPlayers.compareAndSet(false, true)) { + log.atWarning() + .log("Tried to start player hydration twice") + return + } + } + + override fun handleSyncPlayerHydrationChunk(packet: ClientboundSyncPlayerHydrationChunkPacket) { + if (!hydratingPlayers.get()) { + log.atWarning() + .log("Received player hydration chunk before start") + return + } + + pendingHydrationPlayers.addAll(packet.entries) + } + + override fun handleSyncPlayerHydrationEnd(packet: ClientboundSyncPlayerHydrationEndPacket) { + if (!hydratingPlayers.compareAndSet(true, false)) { + log.atWarning() + .log("Tried to end player hydration twice") + return + } + + for (data in pendingHydrationPlayers) { + val player = commonPlayerManagerImpl.createExistingPlayer( + data.uuid, + data.name, + data.playerIp, + data.serverName, + data.proxyName + ) + + data.pdcOrCallback.ifLeft { tag -> + player.overwritePpdc(tag) + }.ifRight { callback -> + pendingLargePpdcs.add(callback) + } + } + + pendingHydrationPlayers.clear() + } + + override fun handleSyncLargerPlayerPersistentDataContainerStart(packet: ClientboundSyncLargePlayerPersistentDataContainerStartPacket) { + if (currentLargePpdcUuid != null) { + log.atWarning() + .log("Received start of large PPD container before end of previous one (%s)", currentLargePpdcUuid) + return + } + + currentLargePpdcUuid = packet.playerUuid + currentLargePpdc = null + } + + override fun handleSyncLargerPlayerPersistentDataContainerChunk(packet: ClientboundSyncLargePlayerPersistentDataContainerChunkPacket) { + if (currentLargePpdcUuid == null) { + log.atWarning() + .log("Received chunk of large PPD container before start") + return + } + + val existing = currentLargePpdc + val payload = packet.payload + + currentLargePpdc = if (existing == null) { + payload + } else { + existing + payload + } + } + + override fun handleSyncLargerPlayerPersistentDataContainerEnd(packet: ClientboundSyncLargePlayerPersistentDataContainerEndPacket) { + val uuid = currentLargePpdcUuid + val payload = currentLargePpdc + if (uuid == null || payload == null) { + log.atWarning() + .log("Received end of large PPD container before start") + return + } + + currentLargePpdcUuid = null + currentLargePpdc = null + + pendingLargePpdcs.remove(uuid) + val player = commonPlayerManagerImpl.getPlayer(uuid) + if (player == null) { + log.atWarning() + .log("Received large PPD container end for unknown player (%s)", uuid) + return + } + + val tag = payload.inputStream().use { stream -> + BinaryTagIO.reader().read(stream) + } + + player.overwritePpdc(tag) + } + + override fun handleSynchronizePlayerMutes(packet: ClientboundSynchronizePlayerMutes) { + val player = commonPlayerManagerImpl.getPlayer(packet.playerUuid) + + if (player == null) { + log.atWarning() + .log("Received mute update for unknown player (%s)", packet.playerUuid) + return + } + + player.punishmentManager.updateMutes(packet.mutes) + } + override fun handlePacket(packet: NettyPacket) { val listeners = NettyListenerRegistry.getListeners(packet.javaClass) ?: return if (listeners.isEmpty()) return diff --git a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/player/CommonClientCloudPlayerManagerImpl.kt b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/player/CommonClientCloudPlayerManagerImpl.kt index b76499b5..687ebab1 100644 --- a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/player/CommonClientCloudPlayerManagerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/player/CommonClientCloudPlayerManagerImpl.kt @@ -25,17 +25,11 @@ abstract class CommonClientCloudPlayerManagerImpl val task = context[TaskName] ?: unnamedTask log.atWarning() .withCause(throwable) - .log("Unhandled exception in before start task: $task") + .log("Unhandled exception in synchronize task: $task") } ) { @JvmStatic diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/CommonNettyClientImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/CommonNettyClientImpl.kt index 653f646f..8518c21c 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/CommonNettyClientImpl.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/CommonNettyClientImpl.kt @@ -7,35 +7,18 @@ import dev.slne.surf.cloud.api.common.netty.packet.ResponseNettyPacket import dev.slne.surf.cloud.api.common.server.CloudServer import dev.slne.surf.cloud.api.common.server.CommonCloudServer import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl -import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf -import dev.slne.surf.surfapi.core.api.util.synchronize import kotlinx.coroutines.CompletableDeferred import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentLinkedQueue import kotlin.time.Duration abstract class CommonNettyClientImpl( override val serverCategory: String, override val serverName: String ) : NettyClient { - private val packetQueue by lazy { mutableObject2ObjectMapOf?>().synchronize() } - + @Volatile private var _connection: ConnectionImpl? = null - set(value) { - field = value - - if (value != null) { - synchronized(packetQueue) { - packetQueue.forEach { (packet, deferred) -> - if (deferred != null) { - value.sendWithIndication(packet, deferred = deferred) - } else { - value.send(packet) - } - } - packetQueue.clear() - } - } - } + private val packetQueue = ConcurrentLinkedQueue() override val connection get() = _connection ?: error("connection not yet set") @@ -47,29 +30,39 @@ abstract class CommonNettyClientImpl( override fun fireAndForget(packet: NettyPacket) { val connection = _connection - if (connection == null) { - packetQueue[packet] = null - } else { + if (connection != null) { connection.send(packet) + return + } + + packetQueue.add(QueuedPacket(packet, null)) + + val connectionNow = _connection + if (connectionNow != null) { + drainQueue(connectionNow) } } override suspend fun fire(packet: NettyPacket, convertExceptions: Boolean): Boolean { val connection = _connection - if (connection == null) { - val result = runCatching { - val deferred = CompletableDeferred() - packetQueue[packet] = deferred - deferred.await() - } + if (connection != null) { + return connection.sendWithIndication(packet, convertExceptions) + } - if (convertExceptions) { - return result.getOrDefault(false) - } + val deferred = CompletableDeferred() + packetQueue.add(QueuedPacket(packet, deferred)) - return result.getOrThrow() + val connectionNow = _connection + if (connectionNow != null) { + drainQueue(connectionNow) + } + + val result = runCatching { deferred.await() } + + return if (convertExceptions) { + result.getOrDefault(false) } else { - return connection.sendWithIndication(packet, convertExceptions) + result.getOrThrow() } } @@ -85,5 +78,24 @@ abstract class CommonNettyClientImpl( fun initConnection(connection: ConnectionImpl) { check(_connection == null) { "Connection already set" } _connection = connection + drainQueue(connection) + } + + + private fun drainQueue(connection: ConnectionImpl) { + while (true) { + val queued = packetQueue.poll() ?: break + val (packet, deferred) = queued + if (deferred != null) { + connection.sendWithIndication(packet, deferred = deferred) + } else { + connection.send(packet) + } + } } + + private data class QueuedPacket( + val packet: NettyPacket, + val deferred: CompletableDeferred? + ) } \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt index c5326a38..1d8d9466 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt @@ -712,7 +712,6 @@ class ConnectionImpl( handleDisconnection() } - this.averageSentPackets = lerp( 0.75f, this.sentPackets.toFloat(), this.averageSentPackets diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/RespondingPacketSendHandler.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/RespondingPacketSendHandler.kt index 207b5018..bfb57f4c 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/RespondingPacketSendHandler.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/RespondingPacketSendHandler.kt @@ -6,7 +6,6 @@ import dev.slne.surf.cloud.api.common.netty.packet.ResponseNettyPacket import dev.slne.surf.cloud.api.common.util.netty.UnifiedReadOnlyChannelHandler import dev.slne.surf.surfapi.core.api.util.logger import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf -import dev.slne.surf.surfapi.core.api.util.synchronize import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelPromise import kotlinx.coroutines.CompletableDeferred @@ -16,7 +15,7 @@ import java.util.* class RespondingPacketSendHandler : UnifiedReadOnlyChannelHandler() { private val log = logger() private val respondingPackets = - mutableObject2ObjectMapOf>().synchronize() + mutableObject2ObjectMapOf>() @Suppress("DEPRECATION") override fun handleRead( @@ -49,7 +48,7 @@ class RespondingPacketSendHandler : UnifiedReadOnlyChannelHandler() } } - @Suppress("DEPRECATION") + @Suppress("UNCHECKED_CAST") override fun handleWrite( ctx: ChannelHandlerContext, msg: NettyPacket, diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/TickablePacketListener.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/TickablePacketListener.kt index 61a7ea29..46394170 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/TickablePacketListener.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/TickablePacketListener.kt @@ -1,9 +1,7 @@ package dev.slne.surf.cloud.core.common.netty.network -import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf -import dev.slne.surf.surfapi.core.api.util.synchronize -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock +import dev.slne.surf.surfapi.core.api.util.logger +import java.util.concurrent.ConcurrentLinkedQueue interface TickablePacketListener : PacketListener { @@ -14,22 +12,31 @@ interface TickablePacketListener : PacketListener { } abstract class CommonTickablePacketListener : TickablePacketListener { - private val schedules = mutableObjectListOf Unit>().synchronize() - private val schedulesMutex = Mutex() + companion object { + private val log = logger() + } + + private val schedules = ConcurrentLinkedQueue Unit>() override suspend fun tick() { - schedulesMutex.withLock { - schedules.forEach { it() } - schedules.clear() + while (true) { + val task = schedules.poll() ?: break + + try { + task() + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Error while executing scheduled task") + } } + tick0() } protected abstract suspend fun tick0() - suspend fun schedule(function: suspend () -> Unit) { - schedulesMutex.withLock { - schedules.add(function) - } + fun schedule(function: suspend () -> Unit) { + schedules.add(function) } } \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt index f2792ab4..87bc5cf5 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt @@ -25,5 +25,15 @@ interface ClientSynchronizingPacketListener : ClientCommonPacketListener, fun handlePlayerCacheHydrateChunk(packet: ClientboundPlayerCacheHydrateChunkPacket) fun handlePlayerCacheHydrateEnd(packet: ClientboundPlayerCacheHydrateEndPacket) + fun handleSyncPlayerHydrationStart(packet: ClientboundSyncPlayerHydrationStartPacket) + fun handleSyncPlayerHydrationChunk(packet: ClientboundSyncPlayerHydrationChunkPacket) + fun handleSyncPlayerHydrationEnd(packet: ClientboundSyncPlayerHydrationEndPacket) + + fun handleSyncLargerPlayerPersistentDataContainerStart(packet: ClientboundSyncLargePlayerPersistentDataContainerStartPacket) + fun handleSyncLargerPlayerPersistentDataContainerChunk(packet: ClientboundSyncLargePlayerPersistentDataContainerChunkPacket) + fun handleSyncLargerPlayerPersistentDataContainerEnd(packet: ClientboundSyncLargePlayerPersistentDataContainerEndPacket) + + fun handleSynchronizePlayerMutes(packet: ClientboundSynchronizePlayerMutes) + fun handlePacket(packet: NettyPacket) } \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerChunkPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerChunkPacket.kt new file mode 100644 index 00000000..9ac1c907 --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerChunkPacket.kt @@ -0,0 +1,31 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.ByteBufCodecs +import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket + +@SurfNettyPacket( + "cloud:clientbound:sync_large_player_ppdc/chunk", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +class ClientboundSyncLargePlayerPersistentDataContainerChunkPacket(val payload: ByteArray) : + NettyPacket(), InternalNettyPacket { + companion object { + val STREAM_CODEC = StreamCodec.composite( + ByteBufCodecs.BYTE_ARRAY_CODEC, + ClientboundSyncLargePlayerPersistentDataContainerChunkPacket::payload, + ::ClientboundSyncLargePlayerPersistentDataContainerChunkPacket + ) + } + + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSyncLargerPlayerPersistentDataContainerChunk(this) + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerEndPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerEndPacket.kt new file mode 100644 index 00000000..3daa6260 --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerEndPacket.kt @@ -0,0 +1,24 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.streamCodecUnitSimple +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket + +@SurfNettyPacket( + "cloud:clientbound:sync_large_player_ppdc/end", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +object ClientboundSyncLargePlayerPersistentDataContainerEndPacket : NettyPacket(), + InternalNettyPacket { + val STREAM_CODEC = streamCodecUnitSimple(this) + + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSyncLargerPlayerPersistentDataContainerEnd(this) + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerStartPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerStartPacket.kt new file mode 100644 index 00000000..e9ca5a23 --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncLargePlayerPersistentDataContainerStartPacket.kt @@ -0,0 +1,32 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.ByteBufCodecs +import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket +import java.util.* + +@SurfNettyPacket( + "cloud:clientbound:sync_large_player_ppdc/start", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +class ClientboundSyncLargePlayerPersistentDataContainerStartPacket(val playerUuid: UUID) : + NettyPacket(), InternalNettyPacket { + companion object { + val STREAM_CODEC = StreamCodec.composite( + ByteBufCodecs.UUID_CODEC, + ClientboundSyncLargePlayerPersistentDataContainerStartPacket::playerUuid, + ::ClientboundSyncLargePlayerPersistentDataContainerStartPacket + ) + } + + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSyncLargerPlayerPersistentDataContainerStart(this) + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationChunkPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationChunkPacket.kt new file mode 100644 index 00000000..e9c4821f --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationChunkPacket.kt @@ -0,0 +1,68 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.ByteBufCodecs +import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec +import dev.slne.surf.cloud.api.common.netty.network.codec.composite +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.api.common.util.Either +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket +import net.kyori.adventure.nbt.CompoundBinaryTag +import java.net.Inet4Address +import java.util.* + +@SurfNettyPacket( + "cloud:clientbound:sync_players/chunk", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +class ClientboundSyncPlayerHydrationChunkPacket( + val entries: MutableList +) : NettyPacket(), InternalNettyPacket { + + companion object { + val STREAM_CODEC = StreamCodec.composite( + Entry.STREAM_CODEC.apply(ByteBufCodecs.list()), + ClientboundSyncPlayerHydrationChunkPacket::entries, + ::ClientboundSyncPlayerHydrationChunkPacket + ) + } + + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSyncPlayerHydrationChunk(this) + } + + data class Entry( + val uuid: UUID, + val name: String, + val serverName: String?, + val proxyName: String?, + val playerIp: Inet4Address, + val pdcOrCallback: Either + ) { + companion object { + val STREAM_CODEC = StreamCodec.composite( + ByteBufCodecs.UUID_CODEC, + Entry::uuid, + ByteBufCodecs.STRING_CODEC, + Entry::name, + ByteBufCodecs.STRING_CODEC.apply(ByteBufCodecs::nullable), + Entry::serverName, + ByteBufCodecs.STRING_CODEC.apply(ByteBufCodecs::nullable), + Entry::proxyName, + ByteBufCodecs.INET_4_ADDRESS_CODEC, + Entry::playerIp, + ByteBufCodecs.either( + ByteBufCodecs.COMPOUND_TAG_CODEC, + ByteBufCodecs.UUID_CODEC + ), + Entry::pdcOrCallback, + ::Entry + ) + } + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationEndPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationEndPacket.kt new file mode 100644 index 00000000..30578349 --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationEndPacket.kt @@ -0,0 +1,22 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.streamCodecUnitSimple +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket + +@SurfNettyPacket( + "cloud:clientbound:sync_players/end", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +object ClientboundSyncPlayerHydrationEndPacket : NettyPacket(), InternalNettyPacket { + val STREAM_CODEC = streamCodecUnitSimple(this) + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSyncPlayerHydrationEnd(this) + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationStartPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationStartPacket.kt new file mode 100644 index 00000000..9925a6d8 --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSyncPlayerHydrationStartPacket.kt @@ -0,0 +1,22 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.streamCodecUnitSimple +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket + +@SurfNettyPacket( + "cloud:clientbound:sync_players/start", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +object ClientboundSyncPlayerHydrationStartPacket : NettyPacket(), InternalNettyPacket { + val STREAM_CODEC = streamCodecUnitSimple(this) + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSyncPlayerHydrationStart(this) + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSynchronizePlayerMutes.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSynchronizePlayerMutes.kt new file mode 100644 index 00000000..70c2876c --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundSynchronizePlayerMutes.kt @@ -0,0 +1,37 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.codec.ByteBufCodecs +import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket +import dev.slne.surf.cloud.core.common.player.punishment.type.PunishmentMuteImpl +import java.util.* + +@SurfNettyPacket( + "cloud:clientbound:synchronize_punishments/mutes", + PacketFlow.CLIENTBOUND, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.NETTY +) +class ClientboundSynchronizePlayerMutes( + val playerUuid: UUID, + val mutes: MutableList +) : NettyPacket(), InternalNettyPacket { + companion object { + val STREAM_CODEC = StreamCodec.composite( + ByteBufCodecs.UUID_CODEC, + ClientboundSynchronizePlayerMutes::playerUuid, + PunishmentMuteImpl.STREAM_CODEC.apply(ByteBufCodecs.list()), + ClientboundSynchronizePlayerMutes::mutes, + ::ClientboundSynchronizePlayerMutes + ) + } + + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleSynchronizePlayerMutes(this) + } +} \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt index 8cc13959..7e7e4ed1 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt @@ -26,6 +26,13 @@ object SynchronizingProtocols { .addPacket(FinishSynchronizingPacket.STREAM_CODEC) .addPacket(ClientboundSynchronizeFinishPacket.STREAM_CODEC) .addPacket(ClientboundSetVelocitySecretPacket.STREAM_CODEC) + .addPacket(ClientboundSyncPlayerHydrationStartPacket.STREAM_CODEC) + .addPacket(ClientboundSyncPlayerHydrationChunkPacket.STREAM_CODEC) + .addPacket(ClientboundSyncPlayerHydrationEndPacket.STREAM_CODEC) + .addPacket(ClientboundSyncLargePlayerPersistentDataContainerStartPacket.STREAM_CODEC) + .addPacket(ClientboundSyncLargePlayerPersistentDataContainerEndPacket.STREAM_CODEC) + .addPacket(ClientboundSyncLargePlayerPersistentDataContainerChunkPacket.STREAM_CODEC) + .addPacket(ClientboundSynchronizePlayerMutes.STREAM_CODEC) } val CLIENTBOUND by lazy { CLIENTBOUND_TEMPLATE.bind(::SurfByteBuf) } diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CloudPlayerManagerImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CloudPlayerManagerImpl.kt index 844410b2..048ddaef 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CloudPlayerManagerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CloudPlayerManagerImpl.kt @@ -41,16 +41,16 @@ abstract class CloudPlayerManagerImpl

: CloudPlayerMa abstract fun createPlayer( uuid: UUID, name: String, - proxy: Boolean, - ip: Inet4Address, - serverName: String + proxyName: String?, + serverName: String?, + ip: Inet4Address ): P abstract fun updateProxyServer(player: P, serverName: String) abstract fun updateServer(player: P, serverName: String) - abstract fun removeProxyServer(player: P, serverName: String) - abstract fun removeServer(player: P, serverName: String) + abstract fun removeProxyServer(player: P) + abstract fun removeServer(player: P) abstract fun getProxyServerName(player: P): String? abstract fun getServerName(player: P): String? @@ -63,6 +63,38 @@ abstract class CloudPlayerManagerImpl

: CloudPlayerMa playerCache.currentValues().forEach(action) } + fun createExistingPlayer( + uuid: UUID, + name: String, + ip: Inet4Address, + serverName: String?, + proxyName: String?, + extraInit: P.() -> Unit = {} + ): P { + val existing = playerCache.getOrNull(uuid) + if (existing != null) { + if (proxyName == null) { + removeProxyServer(existing) + } else { + updateProxyServer(existing, proxyName) + } + + if (serverName == null) { + removeServer(existing) + } else { + updateServer(existing, serverName) + } + + return existing.apply { this.extraInit() } + } + + val player = createPlayer(uuid, name, proxyName, serverName, ip) + player.extraInit() + playerCache.put(uuid, player) + + return player + } + /** * Updates the server or proxy information of an existing player, * or creates a new player if one does not exist. @@ -98,7 +130,13 @@ abstract class CloudPlayerManagerImpl

: CloudPlayerMa return try { val player = playerCache.get(uuid) { - val newPlayer = createPlayer(uuid, name, proxy, ip, serverName) + val newPlayer = createPlayer( + uuid, + name, + if (proxy) serverName else null, + if (!proxy) serverName else null, + ip + ) newPlayer.extraInit() if (runPreJoinTasks) { @@ -274,9 +312,9 @@ abstract class CloudPlayerManagerImpl

: CloudPlayerMa val oldProxy = getProxyServerName(player) val oldServer = getServerName(player) if (proxy) { - removeProxyServer(player, serverName) + removeProxyServer(player) } else { - removeServer(player, serverName) + removeServer(player) } onServerDisconnect(uuid, player, serverName) diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CommonCloudPlayerImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CommonCloudPlayerImpl.kt index 1a2d531f..187c669f 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CommonCloudPlayerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/CommonCloudPlayerImpl.kt @@ -8,6 +8,8 @@ import dev.slne.surf.cloud.api.common.server.CloudServerManager import dev.slne.surf.cloud.core.common.player.ppdc.PersistentPlayerDataContainerViewImpl import dev.slne.surf.cloud.core.common.player.ppdc.TrackingPlayerPersistentDataContainerImpl import dev.slne.surf.cloud.core.common.player.ppdc.network.PdcPatch +import net.kyori.adventure.nbt.BinaryTagIO +import java.io.ByteArrayOutputStream import java.time.ZonedDateTime import java.util.* import java.util.concurrent.locks.ReentrantReadWriteLock @@ -51,6 +53,24 @@ abstract class CommonCloudPlayerImpl(uuid: UUID, override val name: String) : } } + fun estimatedPpdcSize(): Int { + return ppdcReentrantLock.read { + ByteArrayOutputStream().use { os -> + BinaryTagIO.writer().write(ppdc.tag, os) + os.size() + } + } + } + + fun ppdcToByteArray(): ByteArray { + return ppdcReentrantLock.read { + ByteArrayOutputStream().use { os -> + BinaryTagIO.writer().write(ppdc.tag, os) + os.toByteArray() + } + } + } + override suspend fun connectToServer( group: String, diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/punishment/CloudPlayerPunishmentManagerImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/punishment/CloudPlayerPunishmentManagerImpl.kt index 66f68046..a0261f62 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/punishment/CloudPlayerPunishmentManagerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/player/punishment/CloudPlayerPunishmentManagerImpl.kt @@ -9,15 +9,15 @@ import dev.slne.surf.cloud.api.common.player.punishment.type.ban.PunishmentBan import dev.slne.surf.cloud.api.common.player.punishment.type.kick.PunishmentKick import dev.slne.surf.cloud.api.common.player.punishment.type.mute.PunishmentMute import dev.slne.surf.cloud.api.common.player.punishment.type.warn.PunishmentWarn -import dev.slne.surf.surfapi.core.api.util.emptyObjectList -import dev.slne.surf.surfapi.core.api.util.toObjectList import dev.slne.surf.cloud.core.common.player.PunishmentManager import dev.slne.surf.cloud.core.common.player.punishment.type.PunishmentBanImpl import dev.slne.surf.cloud.core.common.player.punishment.type.PunishmentKickImpl import dev.slne.surf.cloud.core.common.player.punishment.type.PunishmentMuteImpl import dev.slne.surf.cloud.core.common.player.punishment.type.PunishmentWarnImpl import dev.slne.surf.cloud.core.common.util.bean +import dev.slne.surf.surfapi.core.api.util.emptyObjectList import dev.slne.surf.surfapi.core.api.util.logger +import dev.slne.surf.surfapi.core.api.util.toObjectList import it.unimi.dsi.fastutil.objects.ObjectList import org.jetbrains.annotations.Unmodifiable import java.util.* @@ -66,6 +66,28 @@ class CloudPlayerPunishmentManagerImpl(private val playerUuid: UUID) : return (if (sort) mutes.sorted() else mutes).toObjectList() } + fun rawCachedMutes(): MutableList { + return cachedMutes + } + + fun updateMutes(updated: List) { + if (!cachedMutesFetched) { + cachedMutes = CopyOnWriteArrayList(updated) + cachedMutesFetched = true + return + } + + val iterator = cachedMutes.iterator() + while (iterator.hasNext()) { + val mute = iterator.next() + if (updated.any { it.punishmentId == mute.punishmentId }) { + iterator.remove() + } + } + + cachedMutes.addAll(updated) + } + @Suppress("UNCHECKED_CAST") suspend fun cacheMutes() { cachedMutes = CopyOnWriteArrayList(fetchMutes() as ObjectList) diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/plugin/task/CloudSynchronizeTaskManager.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/plugin/task/CloudSynchronizeTaskManager.kt index 890e4b2e..a0da6794 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/plugin/task/CloudSynchronizeTaskManager.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/plugin/task/CloudSynchronizeTaskManager.kt @@ -2,7 +2,7 @@ package dev.slne.surf.cloud.core.common.plugin.task import dev.slne.surf.cloud.api.common.netty.NettyClient import dev.slne.surf.cloud.api.common.plugin.spring.task.CloudInitialSynchronizeTask -import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope +import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope import dev.slne.surf.surfapi.core.api.util.logger import kotlinx.coroutines.launch import org.springframework.core.annotation.AnnotationAwareOrderComparator @@ -35,7 +35,7 @@ object CloudSynchronizeTaskManager { .log("Executing initial synchronize task: ${task.name} (${position + 1}/${tasks.size})") val duration = measureTime { - BeforeStartTaskScope.launch(BeforeStartTaskScope.TaskName(task.name, position)) { + SynchronizeTasksScope.launch(SynchronizeTasksScope.TaskName(task.name, position)) { task.execute(client) }.join() } diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt index a3696a36..7ef87003 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt @@ -4,11 +4,13 @@ import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket import dev.slne.surf.cloud.api.common.netty.packet.NettyPacketInfo import dev.slne.surf.cloud.api.common.player.CloudPlayerManager -import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope import dev.slne.surf.cloud.core.common.coroutines.PacketHandlerScope +import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl -import dev.slne.surf.cloud.core.common.netty.network.protocol.common.ClientboundSetVelocitySecretPacket -import dev.slne.surf.cloud.core.common.netty.network.protocol.running.* +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.RunningProtocols +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ServerboundCreateOfflineCloudPlayerIfNotExistsPacket +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ClientboundSynchronizeFinishPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.FinishSynchronizingPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ServerSynchronizingPacketListener @@ -16,11 +18,8 @@ import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.Serv import dev.slne.surf.cloud.core.common.netty.registry.listener.NettyListenerRegistry import dev.slne.surf.cloud.core.common.plugin.task.CloudSynchronizeTaskManager import dev.slne.surf.cloud.standalone.netty.server.NettyServerImpl -import dev.slne.surf.cloud.standalone.netty.server.ProxySecretHolder import dev.slne.surf.cloud.standalone.netty.server.ServerClientImpl -import dev.slne.surf.cloud.standalone.netty.server.network.config.SynchronizeRegistriesTask -import dev.slne.surf.cloud.standalone.netty.server.network.config.SynchronizeUserTask -import dev.slne.surf.cloud.standalone.server.serverManagerImpl +import dev.slne.surf.cloud.standalone.netty.server.network.config.* import dev.slne.surf.cloud.standalone.sync.SyncRegistryImpl import dev.slne.surf.surfapi.core.api.util.logger import kotlinx.coroutines.launch @@ -52,12 +51,13 @@ class ServerSynchronizingPacketListenerImpl( check(state == State.START) { "Cannot start synchronizing from state $state" } state = State.SYNCHRONIZING - BeforeStartTaskScope.launch { - send(ClientboundSetVelocitySecretPacket(ProxySecretHolder.currentSecret())) - send(ClientboundBatchUpdateServer(serverManagerImpl.retrieveAllServers())) - + SynchronizeTasksScope.launch { + SetVelocitySecretTask.execute(client) + SynchronizeServersTask.execute(client) SynchronizeRegistriesTask.execute(client) SynchronizeUserTask.execute(client) + SynchronizePlayerPunishmentManagers.execute(client) + CloudSynchronizeTaskManager.executeTasks(client) state = State.WAIT_FOR_CLIENT diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SetVelocitySecretTask.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SetVelocitySecretTask.kt new file mode 100644 index 00000000..4e2629e8 --- /dev/null +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SetVelocitySecretTask.kt @@ -0,0 +1,12 @@ +package dev.slne.surf.cloud.standalone.netty.server.network.config + +import dev.slne.surf.cloud.api.common.netty.NettyClient +import dev.slne.surf.cloud.api.common.plugin.spring.task.CloudInitialSynchronizeTask +import dev.slne.surf.cloud.core.common.netty.network.protocol.common.ClientboundSetVelocitySecretPacket +import dev.slne.surf.cloud.standalone.netty.server.ProxySecretHolder + +object SetVelocitySecretTask: CloudInitialSynchronizeTask { + override suspend fun execute(client: NettyClient) { + client.fireAndForget(ClientboundSetVelocitySecretPacket(ProxySecretHolder.currentSecret())) + } +} \ No newline at end of file diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizePlayerPunishmentManagers.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizePlayerPunishmentManagers.kt new file mode 100644 index 00000000..f766357b --- /dev/null +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizePlayerPunishmentManagers.kt @@ -0,0 +1,25 @@ +package dev.slne.surf.cloud.standalone.netty.server.network.config + +import dev.slne.surf.cloud.api.common.netty.NettyClient +import dev.slne.surf.cloud.api.common.plugin.spring.task.CloudInitialSynchronizeTask +import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ClientboundSynchronizePlayerMutes +import dev.slne.surf.cloud.standalone.player.StandaloneCloudPlayerImpl +import dev.slne.surf.cloud.standalone.player.standalonePlayerManagerImpl + +object SynchronizePlayerPunishmentManagers : CloudInitialSynchronizeTask { + override suspend fun execute(client: NettyClient) { + val players = standalonePlayerManagerImpl.getRawOnlinePlayers() + synchronizeCachedMutes(client, players) + } + + private fun synchronizeCachedMutes( + client: NettyClient, + players: List + ) { + for (player in players) { + val mutes = player.punishmentManager.rawCachedMutes() + if (mutes.isEmpty()) continue + client.fireAndForget(ClientboundSynchronizePlayerMutes(player.uuid, mutes)) + } + } +} \ No newline at end of file diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt index d4e725ab..c6523e5a 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt @@ -9,7 +9,7 @@ import dev.slne.surf.cloud.standalone.sync.SyncRegistryImpl object SynchronizeRegistriesTask : CloudInitialSynchronizeTask { override suspend fun execute(client: NettyClient) { - client.connection.send(ClientboundBatchSyncValuePacket(SyncRegistryImpl.instance.prepareBatchSyncValues())) - client.connection.send(ClientboundBatchSyncSetPacket(SyncRegistryImpl.instance.prepareBatchSyncSets())) + client.fireAndForget(ClientboundBatchSyncValuePacket(SyncRegistryImpl.instance.prepareBatchSyncValues())) + client.fireAndForget(ClientboundBatchSyncSetPacket(SyncRegistryImpl.instance.prepareBatchSyncSets())) } } \ No newline at end of file diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeServersTask.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeServersTask.kt new file mode 100644 index 00000000..862d32a8 --- /dev/null +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeServersTask.kt @@ -0,0 +1,12 @@ +package dev.slne.surf.cloud.standalone.netty.server.network.config + +import dev.slne.surf.cloud.api.common.netty.NettyClient +import dev.slne.surf.cloud.api.common.plugin.spring.task.CloudInitialSynchronizeTask +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ClientboundBatchUpdateServer +import dev.slne.surf.cloud.standalone.server.serverManagerImpl + +object SynchronizeServersTask : CloudInitialSynchronizeTask { + override suspend fun execute(client: NettyClient) { + client.fireAndForget(ClientboundBatchUpdateServer(serverManagerImpl.retrieveAllServers())) + } +} \ No newline at end of file diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeUserTask.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeUserTask.kt index 4333ed2a..dba7cdb6 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeUserTask.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeUserTask.kt @@ -1,10 +1,157 @@ package dev.slne.surf.cloud.standalone.netty.server.network.config +import com.github.benmanes.caffeine.cache.Caffeine +import com.google.common.flogger.StackSize import dev.slne.surf.cloud.api.common.netty.NettyClient import dev.slne.surf.cloud.api.common.plugin.spring.task.CloudInitialSynchronizeTask +import dev.slne.surf.cloud.api.common.util.Either +import dev.slne.surf.cloud.core.common.netty.network.MAX_PACKET_SIZE +import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.* +import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ClientboundSyncPlayerHydrationChunkPacket.Entry +import dev.slne.surf.cloud.standalone.player.StandaloneCloudPlayerImpl +import dev.slne.surf.cloud.standalone.player.standalonePlayerManagerImpl +import dev.slne.surf.surfapi.core.api.util.logger +import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf +import dev.slne.surf.surfapi.core.api.util.mutableObjectSetOf +import dev.slne.surf.surfapi.core.api.util.toMutableObjectList +import io.netty.buffer.Unpooled +import it.unimi.dsi.fastutil.objects.ObjectSet +import net.kyori.adventure.nbt.CompoundBinaryTag +import java.util.* +import java.util.concurrent.TimeUnit + +object SynchronizeUserTask : CloudInitialSynchronizeTask { + private const val MAX_PDC_SIZE = MAX_PACKET_SIZE - 0xF4240 // ~7 MB for pdc + private const val MAX_HYDRATION_DATA_SIZE = MAX_PACKET_SIZE - 0x186A0 + private val log = logger() + + private val pendingPdcs = Caffeine.newBuilder() + .expireAfterWrite(2, TimeUnit.MINUTES) + .removalListener> { key, value, cause -> + if (value != null && value.isNotEmpty()) { + log.atWarning() + .withStackTrace(StackSize.SMALL) + .log( + "Unable to transfer all player PDCs to server '%s'. Pending PDC callbacks: %s. Cause: %s", + key, + value, + cause + ) + } + } + .build>() -object SynchronizeUserTask: CloudInitialSynchronizeTask { override suspend fun execute(client: NettyClient) { + val players = standalonePlayerManagerImpl.getRawOnlinePlayers() + + syncPlayerHydration(client, players) + syncPendingPersistentDataContainers(client, players) + } + + private fun syncPlayerHydration( + client: NettyClient, + players: List + ) { + val tempBuf = Unpooled.directBuffer(1024) + try { + client.fireAndForget(ClientboundSyncPlayerHydrationStartPacket) + + val current = mutableObjectListOf() + var currentSize = 0 + + fun estimatePlayerSize(entry: Entry): Int { + tempBuf.clear() + Entry.STREAM_CODEC.encode(tempBuf, entry) + return tempBuf.readableBytes() + } + + for (player in players) { + val data = player.toHydrationData(client) + val playerSize = estimatePlayerSize(data) + + if (currentSize + playerSize > MAX_HYDRATION_DATA_SIZE && current.isNotEmpty()) { + client.fireAndForget(ClientboundSyncPlayerHydrationChunkPacket(current.toMutableObjectList())) + current.clear() + currentSize = 0 + } + + current.add(data) + currentSize += playerSize + } + + if (current.isNotEmpty()) { + client.fireAndForget(ClientboundSyncPlayerHydrationChunkPacket(current.toMutableObjectList())) + } + + current.clear() + tempBuf.clear() + + client.fireAndForget(ClientboundSyncPlayerHydrationEndPacket) + } finally { + tempBuf.release() + } + } + + private fun syncPendingPersistentDataContainers( + client: NettyClient, + players: List + ) { + val pendingPdcs = pendingPdcs.getIfPresent(client.serverName) + if (pendingPdcs.isNullOrEmpty()) return + val iterator = pendingPdcs.iterator() + while (iterator.hasNext()) { + val uuid = iterator.next() + val player = players.find { it.uuid == uuid } ?: run { + log.atWarning() + .log( + "Unable to find player with UUID '%s' in hydration list. Maybe the player already disconnected?", + uuid + ) + continue + } + + client.fireAndForget( + ClientboundSyncLargePlayerPersistentDataContainerStartPacket( + uuid + ) + ) + + val data = player.ppdcToByteArray() + var offset = 0 + while (offset < data.size) { + val chunkSize = minOf(MAX_PDC_SIZE, data.size - offset) + val chunk = data.copyOfRange(offset, offset + chunkSize) + offset += chunkSize + + client.fireAndForget( + ClientboundSyncLargePlayerPersistentDataContainerChunkPacket( + chunk + ) + ) + } + + client.fireAndForget(ClientboundSyncLargePlayerPersistentDataContainerEndPacket) + iterator.remove() + } + } + + private fun StandaloneCloudPlayerImpl.toHydrationData(client: NettyClient): Entry { + val ppdcSize = estimatedPpdcSize() + val fits = MAX_PDC_SIZE >= ppdcSize + val pdcOrCallback: Either = if (fits) { + Either.left(persistentData.toTagCompound()) + } else { + pendingPdcs.get(client.serverName) { mutableObjectSetOf() }.add(uuid) + Either.right(uuid) + } + return Entry( + uuid, + name, + server?.name, + proxyServer?.name, + ip, + pdcOrCallback + ) } } \ No newline at end of file diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/player/StandaloneCloudPlayerManagerImpl.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/player/StandaloneCloudPlayerManagerImpl.kt index 109a479d..58af117f 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/player/StandaloneCloudPlayerManagerImpl.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/player/StandaloneCloudPlayerManagerImpl.kt @@ -95,23 +95,23 @@ class StandaloneCloudPlayerManagerImpl : CloudPlayerManagerImpl().synchronize() + private val items = ConcurrentHashMap() override fun getCoroutineSession(plugin: StandalonePlugin): CoroutineSession { val session = items[plugin] @@ -20,9 +18,10 @@ class CoroutineManagerImpl : CoroutineManager { } override fun setupCoroutineSession(plugin: StandalonePlugin) { - check(plugin !in items) { "Coroutine session for plugin ${plugin.meta.name} is already initialized" } - items[plugin] = CoroutineSessionImpl(plugin) - + items.compute(plugin) { _, value -> + check(value == null) { "Coroutine session for plugin ${plugin.meta.name} is already initialized" } + CoroutineSessionImpl(plugin) + } } override fun disable(plugin: StandalonePlugin) { diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/plugin/entrypoint/classloader/ByteCodeModifyingURLClassloader.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/plugin/entrypoint/classloader/ByteCodeModifyingURLClassloader.kt index 8b00fcd2..0749d00d 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/plugin/entrypoint/classloader/ByteCodeModifyingURLClassloader.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/plugin/entrypoint/classloader/ByteCodeModifyingURLClassloader.kt @@ -1,7 +1,5 @@ package dev.slne.surf.cloud.standalone.plugin.entrypoint.classloader -import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf -import dev.slne.surf.surfapi.core.api.util.synchronize import java.io.IOException import java.io.UncheckedIOException import java.net.JarURLConnection @@ -10,6 +8,7 @@ import java.net.URL import java.net.URLClassLoader import java.security.CodeSigner import java.security.CodeSource +import java.util.concurrent.ConcurrentHashMap import java.util.jar.Attributes import java.util.jar.Manifest @@ -26,7 +25,7 @@ class ByteCodeModifyingURLClassloader( } } - private val manifestCache = mutableObject2ObjectMapOf().synchronize() + private val manifestCache = ConcurrentHashMap() override fun findClass(name: String): Class<*> { val path = name.replace('.', '/') + ".class" @@ -43,7 +42,7 @@ class ByteCodeModifyingURLClassloader( val lastDot = name.lastIndexOf('.') if (lastDot != -1) { - val pkgName = name.substring(0, lastDot) + val pkgName = name.take(lastDot) // Check if package already loaded. val manifest = url.manifest() diff --git a/surf-cloud-velocity/src/main/kotlin/dev/slne/surf/cloud/velocity/player/VelocityCloudPlayerManagerImpl.kt b/surf-cloud-velocity/src/main/kotlin/dev/slne/surf/cloud/velocity/player/VelocityCloudPlayerManagerImpl.kt index 25d483a1..29ceb30f 100644 --- a/surf-cloud-velocity/src/main/kotlin/dev/slne/surf/cloud/velocity/player/VelocityCloudPlayerManagerImpl.kt +++ b/surf-cloud-velocity/src/main/kotlin/dev/slne/surf/cloud/velocity/player/VelocityCloudPlayerManagerImpl.kt @@ -19,16 +19,13 @@ class VelocityCloudPlayerManagerImpl : override fun createPlayer( uuid: UUID, name: String, - proxy: Boolean, - ip: Inet4Address, - serverName: String + proxyName: String?, + serverName: String?, + ip: Inet4Address ): VelocityClientCloudPlayerImpl { return VelocityClientCloudPlayerImpl(uuid, name).also { - if (proxy) { - it.proxyServerName = serverName - } else { - it.serverName = serverName - } + it.proxyServerName = serverName + it.serverName = serverName } }