diff --git a/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncMap.kt b/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncMap.kt new file mode 100644 index 00000000..863d4b74 --- /dev/null +++ b/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncMap.kt @@ -0,0 +1,61 @@ +package dev.slne.surf.cloud.api.common.sync + +import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec +import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf +import it.unimi.dsi.fastutil.objects.Object2ObjectMap +import kotlinx.serialization.KSerializer +import kotlinx.serialization.serializer +import org.jetbrains.annotations.ApiStatus +import org.jetbrains.annotations.Unmodifiable + +typealias SyncMapListener = (key: K, oldValue: V?, newValue: V?) -> Unit + +interface SyncMap : Object2ObjectMap { + val id: String + + /** + * There is generally no need to use this codec directly, as this sync map should always + * be automatically synchronized across the network. + */ + @get:ApiStatus.Obsolete + val codec: StreamCodec> + + fun subscribe(listener: SyncMapListener): Boolean + fun snapshot(): @Unmodifiable Object2ObjectMap + + companion object { + operator fun invoke( + id: String, + keyCodec: StreamCodec, + valueCodec: StreamCodec + ): SyncMap = of(id, keyCodec, valueCodec) + + inline operator fun invoke(id: String): SyncMap = serializable(id) + operator fun invoke( + id: String, + keySerializer: KSerializer, + valueSerializer: KSerializer + ): SyncMap = serializable(id, keySerializer, valueSerializer) + + inline fun serializable( + id: String, + ): SyncMap = serializable(id, serializer(), serializer()) + + fun serializable( + id: String, + keySerializer: KSerializer, + valueSerializer: KSerializer, + ): SyncMap = of( + id, + SurfByteBuf.streamCodecFromKotlin(keySerializer), + SurfByteBuf.streamCodecFromKotlin(valueSerializer) + ) + + fun of( + id: String, + keyCodec: StreamCodec, + valueCodec: StreamCodec + ): SyncMap = + SyncRegistry.instance.createSyncMap(id, keyCodec, valueCodec) + } +} diff --git a/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncRegistry.kt b/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncRegistry.kt index 318c7ea4..0702e762 100644 --- a/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncRegistry.kt +++ b/surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/sync/SyncRegistry.kt @@ -19,6 +19,12 @@ interface SyncRegistry { codec: StreamCodec ): SyncSet + fun createSyncMap( + id: String, + keyCodec: StreamCodec, + valueCodec: StreamCodec + ): SyncMap + companion object { @InternalApi val instance = requiredService() diff --git a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientRunningPacketListenerImpl.kt b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientRunningPacketListenerImpl.kt index b8cacec3..67a8b6e0 100644 --- a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientRunningPacketListenerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientRunningPacketListenerImpl.kt @@ -444,6 +444,16 @@ class ClientRunningPacketListenerImpl( } } + override fun handleSyncMapDelta(packet: SyncMapDeltaPacket) { + try { + SyncRegistryImpl.instance.handleSyncMapDelta(packet) + } catch (e: Throwable) { + log.atWarning() + .withCause(e) + .log("Failed to handle sync map delta for packet $packet") + } + } + override fun handleSetVelocitySecret(packet: ClientboundSetVelocitySecretPacket) { try { client.velocitySecret = packet.secret diff --git a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt index 741ef926..59830ca7 100644 --- a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt +++ b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt @@ -84,6 +84,16 @@ class ClientSynchronizingPacketListenerImpl( } } + override fun handleBatchSyncMap(packet: ClientboundBatchSyncMapPacket) { + try { + SyncRegistryImpl.instance.applyBatchSyncMaps(packet.syncMaps) + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Failed to apply batch sync maps for packet $packet") + } + } + override fun handleBatchUpdateServer(packet: ClientboundBatchUpdateServer) { serverManagerImpl.batchUpdateServer(packet.servers.map { data -> if (data.proxy) { @@ -111,6 +121,16 @@ class ClientSynchronizingPacketListenerImpl( } } + override fun handleSyncMapDelta(packet: SyncMapDeltaPacket) { + try { + SyncRegistryImpl.instance.handleSyncMapDelta(packet) + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Failed to handle sync map delta for packet $packet") + } + } + override fun handleSetVelocitySecret(packet: ClientboundSetVelocitySecretPacket) { try { client.velocitySecret = packet.secret diff --git a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt index f68b5b1e..203b7147 100644 --- a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt +++ b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt @@ -4,10 +4,12 @@ import com.google.auto.service.AutoService import dev.slne.surf.cloud.api.client.netty.packet.fireAndForget import dev.slne.surf.cloud.api.common.config.properties.CloudProperties import dev.slne.surf.cloud.api.common.sync.SyncRegistry +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncMapDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket import dev.slne.surf.cloud.core.common.sync.BasicSyncValue import dev.slne.surf.cloud.core.common.sync.CommonSyncRegistryImpl +import dev.slne.surf.cloud.core.common.sync.SyncMapImpl import dev.slne.surf.cloud.core.common.sync.SyncSetImpl import dev.slne.surf.surfapi.core.api.util.logger import dev.slne.surf.surfapi.core.api.util.mutableObject2LongMapOf @@ -33,6 +35,17 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() { SyncSetDeltaPacket(syncSet, added, changeId, element).fireAndForget() } + override fun afterChange( + syncMap: SyncMapImpl, + key: K, + oldValue: V?, + newValue: V?, + changeId: Long + ) { + super.afterChange(syncMap, key, oldValue, newValue, changeId) + SyncMapDeltaPacket(syncMap, key, oldValue, newValue, changeId).fireAndForget() + } + fun applyBatchSyncValue(syncValues: List>) { require(frozen) { "SyncRegistry is not frozen, cannot apply batch" } @@ -72,6 +85,37 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() { } } + fun handleSyncMapDelta(packet: SyncMapDeltaPacket) { + if (!packet.registered) return + val map = + getMap(packet.mapId) + ?: error("SyncMap with id '${packet.mapId}' is not registered") + val lastChangeId = lastChangeIds.getLong(packet.mapId) + if (packet.changeId <= lastChangeId) { + log.atInfo() + .log("Ignoring stale SyncMapDeltaPacket for map '${packet.mapId}' with changeId ${packet.changeId}, last known changeId is $lastChangeId") + return + } + + if (packet.newValue != null) { + map.putInternal(packet.key, packet.newValue) + } else { + map.removeInternal(packet.key) + } + + lastChangeIds[packet.mapId] = packet.changeId + } + + fun applyBatchSyncMaps(bulk: List>>) { + bulk.forEach { (id, snapshot) -> + val map = getMap(id) + if (map != null) { + map.putAllInternal(snapshot) + lastChangeIds[id] = Long.MAX_VALUE // Reset change ID to max after bulk update + } + } + } + companion object { val instance by lazy { CommonSyncRegistryImpl.instance as SyncRegistryImpl } } diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/common/CommonSynchronizingRunningPacketListener.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/common/CommonSynchronizingRunningPacketListener.kt index 2a31f385..0f8219fc 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/common/CommonSynchronizingRunningPacketListener.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/common/CommonSynchronizingRunningPacketListener.kt @@ -1,6 +1,7 @@ package dev.slne.surf.cloud.core.common.netty.network.protocol.common import dev.slne.surf.cloud.core.common.netty.network.PacketListener +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncMapDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket @@ -9,4 +10,6 @@ interface CommonSynchronizingRunningPacketListener : PacketListener { fun handleSyncValueChange(packet: SyncValueChangePacket) fun handleSyncSetDelta(packet: SyncSetDeltaPacket) + + fun handleSyncMapDelta(packet: SyncMapDeltaPacket) } \ No newline at end of file diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/RunningProtocols.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/RunningProtocols.kt index 6987bcc0..512cb667 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/RunningProtocols.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/RunningProtocols.kt @@ -67,6 +67,7 @@ object RunningProtocols { .addPacket(RequestPlayerPermissionPacket.STREAM_CODEC) .addPacket(SyncValueChangePacket.STREAM_CODEC) .addPacket(SyncSetDeltaPacket.STREAM_CODEC) + .addPacket(SyncMapDeltaPacket.STREAM_CODEC) .addPacket(ClientboundSetVelocitySecretPacket.STREAM_CODEC) .addPacket(WhitelistStatusResponsePacket.STREAM_CODEC) .addPacket(WhitelistResponsePacket.STREAM_CODEC) @@ -140,6 +141,7 @@ object RunningProtocols { .addPacket(ServerboundQueuePlayerToGroupPacket.STREAM_CODEC) .addPacket(SyncValueChangePacket.STREAM_CODEC) .addPacket(SyncSetDeltaPacket.STREAM_CODEC) + .addPacket(SyncMapDeltaPacket.STREAM_CODEC) .addPacket(ServerboundCreateOfflineCloudPlayerIfNotExistsPacket.STREAM_CODEC) .addPacket(ServerboundRequestWhitelistStatusPacket.STREAM_CODEC) .addPacket(WhitelistStatusResponsePacket.STREAM_CODEC) diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/SyncMapDeltaPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/SyncMapDeltaPacket.kt new file mode 100644 index 00000000..02a7338e --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/running/SyncMapDeltaPacket.kt @@ -0,0 +1,95 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.running + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.api.common.netty.packet.packetCodec +import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf +import dev.slne.surf.cloud.api.common.netty.protocol.buffer.encodeError +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket +import dev.slne.surf.cloud.core.common.netty.network.protocol.common.CommonSynchronizingRunningPacketListener +import dev.slne.surf.cloud.core.common.sync.CommonSyncRegistryImpl +import dev.slne.surf.cloud.core.common.sync.SyncMapImpl + +@SurfNettyPacket( + "cloud:sync_map_delta", + PacketFlow.BIDIRECTIONAL, + handlerMode = PacketHandlerMode.DEFAULT +) +class SyncMapDeltaPacket : NettyPacket, + InternalNettyPacket { + companion object { + val STREAM_CODEC = packetCodec(SyncMapDeltaPacket::write, ::SyncMapDeltaPacket) + } + + val mapId: String + val key: Any? + val oldValue: Any? + val newValue: Any? + val changeId: Long + val registered: Boolean + + constructor( + syncMap: SyncMapImpl<*, *>, + key: Any?, + oldValue: Any?, + newValue: Any?, + changeId: Long + ) { + this.mapId = syncMap.id + this.key = key + this.oldValue = oldValue + this.newValue = newValue + this.changeId = changeId + this.registered = true + } + + constructor(buf: SurfByteBuf) { + this.mapId = buf.readUtf() + this.changeId = buf.readLong() + val map = CommonSyncRegistryImpl.instance.getMap(mapId) + + if (map == null) { + this.registered = false + this.key = null + this.oldValue = null + this.newValue = null + buf.skipBytes(buf.readableBytes()) + } else { + this.registered = true + this.key = map.keyCodec.decode(buf) + val hasOldValue = buf.readBoolean() + this.oldValue = if (hasOldValue) map.valueCodec.decode(buf) else null + val hasNewValue = buf.readBoolean() + this.newValue = if (hasNewValue) map.valueCodec.decode(buf) else null + } + } + + private fun write(buf: SurfByteBuf) { + buf.writeUtf(mapId) + buf.writeLong(changeId) + val map = CommonSyncRegistryImpl.instance.getMap(mapId) + ?: encodeError("SyncMap '$mapId' is not registered") + + map.keyCodec.encode(buf, key) + + if (oldValue != null) { + buf.writeBoolean(true) + map.valueCodec.encode(buf, oldValue) + } else { + buf.writeBoolean(false) + } + + if (newValue != null) { + buf.writeBoolean(true) + map.valueCodec.encode(buf, newValue) + } else { + buf.writeBoolean(false) + } + } + + override fun handle(listener: CommonSynchronizingRunningPacketListener) { + listener.handleSyncMapDelta(this) + } +} diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt index f2792ab4..597733ec 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientSynchronizingPacketListener.kt @@ -19,6 +19,8 @@ interface ClientSynchronizingPacketListener : ClientCommonPacketListener, fun handleBatchSyncSet(packet: ClientboundBatchSyncSetPacket) + fun handleBatchSyncMap(packet: ClientboundBatchSyncMapPacket) + fun handleBatchUpdateServer(packet: ClientboundBatchUpdateServer) fun handlePlayerCacheHydrateStart(packet: ClientboundPlayerCacheHydrateStartPacket) diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncMapPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncMapPacket.kt new file mode 100644 index 00000000..4cbcf2f7 --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncMapPacket.kt @@ -0,0 +1,81 @@ +package dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing + +import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket +import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol +import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow +import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket +import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode +import dev.slne.surf.cloud.api.common.netty.packet.packetCodec +import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf +import dev.slne.surf.cloud.api.common.netty.protocol.buffer.encodeError +import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket +import dev.slne.surf.cloud.core.common.sync.CommonSyncRegistryImpl +import dev.slne.surf.cloud.core.common.sync.SyncMapImpl +import dev.slne.surf.surfapi.core.api.util.logger +import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf + +@SurfNettyPacket( + "cloud:batch_sync_map", + PacketFlow.BIDIRECTIONAL, + ConnectionProtocol.SYNCHRONIZING, + handlerMode = PacketHandlerMode.DEFAULT +) +class ClientboundBatchSyncMapPacket : NettyPacket, InternalNettyPacket { + companion object { + private val log = logger() + val STREAM_CODEC = + packetCodec(ClientboundBatchSyncMapPacket::write, ::ClientboundBatchSyncMapPacket) + } + + val syncMaps: List>> + + constructor(syncMaps: Map>) { + this.syncMaps = syncMaps.map { (key, value) -> key to value.toMap() } + } + + private constructor(buf: SurfByteBuf) { + val unknownSyncMaps = mutableObjectListOf() + + syncMaps = buf.readList { buf -> + val syncId = buf.readUtf() + val syncSize = buf.readInt() + + val syncMap = CommonSyncRegistryImpl.instance.getMap(syncId) + if (syncMap == null) { + buf.skipBytes(syncSize) + unknownSyncMaps.add(syncId) + null + } else { + syncId to syncMap.codec.decode(buf) + } + }.filterNotNull() + + if (unknownSyncMaps.isNotEmpty()) { + log.atWarning() + .log("Unknown sync maps: [${unknownSyncMaps.joinToString(", ")}]") + } + } + + private fun write(buf: SurfByteBuf) { + buf.writeCollection(syncMaps) { buf, (syncId, map) -> + buf.writeUtf(syncId) + + // Reserve 4 bytes for length + val lengthIndex = buf.writerIndex() + buf.writeInt(0) + + val startIndex = buf.writerIndex() + val syncMap = CommonSyncRegistryImpl.instance.getMap(syncId) + ?: encodeError("SyncMap '$syncId' is not registered in SyncRegistry") + syncMap.codec.encode(buf, map) + val endIndex = buf.writerIndex() + + // Write the actual length of the encoded value + buf.setInt(lengthIndex, endIndex - startIndex) + } + } + + override fun handle(listener: ClientSynchronizingPacketListener) { + listener.handleBatchSyncMap(this) + } +} diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt index 8cc13959..b69f7b4b 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/SynchronizingProtocols.kt @@ -6,6 +6,7 @@ import dev.slne.surf.cloud.core.common.netty.network.protocol.ProtocolInfoBuilde import dev.slne.surf.cloud.core.common.netty.network.protocol.common.* import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ClientboundBatchUpdateServer import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ServerboundCreateOfflineCloudPlayerIfNotExistsPacket +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncMapDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket @@ -22,7 +23,9 @@ object SynchronizingProtocols { .addPacket(SyncValueChangePacket.STREAM_CODEC) .addPacket(ClientboundBatchSyncValuePacket.STREAM_CODEC) .addPacket(ClientboundBatchSyncSetPacket.STREAM_CODEC) + .addPacket(ClientboundBatchSyncMapPacket.STREAM_CODEC) .addPacket(SyncSetDeltaPacket.STREAM_CODEC) + .addPacket(SyncMapDeltaPacket.STREAM_CODEC) .addPacket(FinishSynchronizingPacket.STREAM_CODEC) .addPacket(ClientboundSynchronizeFinishPacket.STREAM_CODEC) .addPacket(ClientboundSetVelocitySecretPacket.STREAM_CODEC) @@ -40,6 +43,7 @@ object SynchronizingProtocols { .addPacket(ServerboundSynchronizeFinishAcknowledgedPacket.STREAM_CODEC) .addPacket(SyncValueChangePacket.STREAM_CODEC) .addPacket(SyncSetDeltaPacket.STREAM_CODEC) + .addPacket(SyncMapDeltaPacket.STREAM_CODEC) .addPacket(FinishSynchronizingPacket.STREAM_CODEC) .addPacket(ServerboundCreateOfflineCloudPlayerIfNotExistsPacket.STREAM_CODEC) } diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/CommonSyncRegistryImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/CommonSyncRegistryImpl.kt index bef90150..785fd4ee 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/CommonSyncRegistryImpl.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/CommonSyncRegistryImpl.kt @@ -2,6 +2,7 @@ package dev.slne.surf.cloud.core.common.sync import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf +import dev.slne.surf.cloud.api.common.sync.SyncMap import dev.slne.surf.cloud.api.common.sync.SyncRegistry import dev.slne.surf.cloud.api.common.sync.SyncSet import dev.slne.surf.cloud.api.common.sync.SyncValue @@ -14,6 +15,7 @@ abstract class CommonSyncRegistryImpl : SyncRegistry { protected val syncValues = mutableObject2ObjectMapOf>() protected val syncSets = mutableObject2ObjectMapOf>() + protected val syncMaps = mutableObject2ObjectMapOf>() fun freeze() { require(!frozen) { "SyncRegistry is already frozen" } @@ -34,6 +36,13 @@ abstract class CommonSyncRegistryImpl : SyncRegistry { check(previous == null) { "SyncSet with id '${syncSet.id}' was already registered" } } + fun register(syncMap: SyncMapImpl) { + require(!frozen) { "SyncRegistry is frozen and cannot accept new SyncMaps" } + + val previous = syncMaps.put(syncMap.id, syncMap) + check(previous == null) { "SyncMap with id '${syncMap.id}' was already registered" } + } + @OverridingMethodsMustInvokeSuper open fun afterChange(syncValue: BasicSyncValue<*>) { require(frozen) { "SyncRegistry is not frozen, cannot process afterChange" } @@ -51,6 +60,18 @@ abstract class CommonSyncRegistryImpl : SyncRegistry { require(syncSet.id in syncSets) { "SyncSet with id '${syncSet.id}' is not registered" } } + @OverridingMethodsMustInvokeSuper + open fun afterChange( + syncMap: SyncMapImpl, + key: K, + oldValue: V?, + newValue: V?, + changeId: Long + ) { + require(frozen) { "SyncRegistry is not frozen, cannot process afterChange" } + require(syncMap.id in syncMaps) { "SyncMap with id '${syncMap.id}' is not registered" } + } + fun getSyncValueCodec(syncId: String): StreamCodec? { require(frozen) { "SyncRegistry is not frozen, cannot get codec" } @@ -61,6 +82,9 @@ abstract class CommonSyncRegistryImpl : SyncRegistry { @Suppress("UNCHECKED_CAST") fun getSet(id: String): SyncSetImpl? = syncSets[id] as? SyncSetImpl + @Suppress("UNCHECKED_CAST") + fun getMap(id: String): SyncMapImpl? = syncMaps[id] as? SyncMapImpl + fun updateSyncValue(syncId: String, value: Any?) { require(frozen) { "SyncRegistry is not frozen, cannot update SyncValue" } @@ -83,6 +107,14 @@ abstract class CommonSyncRegistryImpl : SyncRegistry { return SyncSetImpl(id, codec) } + override fun createSyncMap( + id: String, + keyCodec: StreamCodec, + valueCodec: StreamCodec + ): SyncMap { + return SyncMapImpl(id, keyCodec, valueCodec) + } + companion object { val instance by lazy { SyncRegistry.instance as CommonSyncRegistryImpl } } diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/SyncMapImpl.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/SyncMapImpl.kt new file mode 100644 index 00000000..d7d3590f --- /dev/null +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/sync/SyncMapImpl.kt @@ -0,0 +1,218 @@ +package dev.slne.surf.cloud.core.common.sync + +import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec +import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf +import dev.slne.surf.cloud.api.common.sync.SyncMap +import dev.slne.surf.cloud.api.common.sync.SyncMapListener +import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf +import dev.slne.surf.surfapi.core.api.util.toObject2ObjectMap +import dev.slne.surf.surfapi.core.api.util.logger +import it.unimi.dsi.fastutil.objects.Object2ObjectMap +import it.unimi.dsi.fastutil.objects.ObjectIterator +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.atomic.AtomicLong +import java.util.function.BiConsumer +import java.util.function.Consumer + +class SyncMapImpl( + override val id: String, + val keyCodec: StreamCodec, + val valueCodec: StreamCodec +) : SyncMap { + override val codec: StreamCodec> = StreamCodec.of( + { buf, value -> + buf.writeMap(value, + { buf, key -> keyCodec.encode(buf, key) }, + { buf, v -> valueCodec.encode(buf, v) } + ) + }, + { buf -> + buf.readMap( + { mutableObject2ObjectMapOf(it) }, + { buf -> keyCodec.decode(buf) }, + { buf -> valueCodec.decode(buf) } + ) + } + ) + + private val backing = ConcurrentHashMap() + private val listeners = CopyOnWriteArrayList>() + private val changeCounter = AtomicLong() + + init { + CommonSyncRegistryImpl.instance.register(this) + } + + override fun subscribe(listener: SyncMapListener) = listeners.addIfAbsent(listener) + + fun putInternal(key: K, value: V): V? = backing.put(key, value) + fun removeInternal(key: K): V? = backing.remove(key) + fun putAllInternal(entries: Map) = backing.putAll(entries) + + override fun put(key: K, value: V): V? { + val oldValue = backing.put(key, value) + if (oldValue != value) { + fireDelta(key, oldValue, value) + } + return oldValue + } + + override fun remove(key: K): V? { + val oldValue = backing.remove(key) + if (oldValue != null) { + fireDelta(key, oldValue, null) + } + return oldValue + } + + private fun fireDelta(key: K, oldValue: V?, newValue: V?) { + val changeId = changeCounter.incrementAndGet() + callListeners(key, oldValue, newValue) + CommonSyncRegistryImpl.instance.afterChange(this, key, oldValue, newValue, changeId) + } + + private fun callListeners(key: K, oldValue: V?, newValue: V?) { + for (listener in listeners) { + try { + listener(key, oldValue, newValue) + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Error while notifying listener for SyncMap '$id'") + } + } + } + + override val size: Int get() = backing.size + override fun containsKey(key: K) = backing.containsKey(key) + override fun containsValue(value: V) = backing.containsValue(value) + override fun get(key: K): V? = backing[key] + override fun isEmpty() = backing.isEmpty() + + override fun putAll(from: Map) { + from.forEach { (key, value) -> put(key, value) } + } + + override fun clear() { + backing.keys.toList().forEach { remove(it) } + } + + override fun iterator(): ObjectIterator> = SyncMapIterator() + + override fun snapshot(): Object2ObjectMap = backing.toObject2ObjectMap() + + override val keys: MutableSet get() = backing.keys + override val values: MutableCollection get() = backing.values + override val entries: MutableSet> get() = backing.entries + + override fun object2ObjectEntrySet(): Object2ObjectMap.FastEntrySet { + return object : Object2ObjectMap.FastEntrySet { + override val size: Int get() = backing.size + override fun isEmpty() = backing.isEmpty() + override fun contains(element: Object2ObjectMap.Entry?) = + element != null && backing[element.key] == element.value + override fun iterator(): ObjectIterator> = + this@SyncMapImpl.iterator() + override fun add(element: Object2ObjectMap.Entry): Boolean { + put(element.key, element.value) + return true + } + override fun remove(element: Object2ObjectMap.Entry?): Boolean { + if (element != null && backing[element.key] == element.value) { + this@SyncMapImpl.remove(element.key) + return true + } + return false + } + override fun containsAll(elements: Collection>) = + elements.all { contains(it) } + override fun addAll(elements: Collection>): Boolean { + var modified = false + for (entry in elements) { + put(entry.key, entry.value) + modified = true + } + return modified + } + override fun removeAll(elements: Collection>): Boolean { + var modified = false + for (entry in elements) { + if (remove(entry)) modified = true + } + return modified + } + override fun retainAll(elements: Collection>): Boolean { + val toRemove = backing.entries.filter { entry -> + !elements.any { it.key == entry.key && it.value == entry.value } + } + var modified = false + for (entry in toRemove) { + this@SyncMapImpl.remove(entry.key) + modified = true + } + return modified + } + override fun clear() = this@SyncMapImpl.clear() + override fun fastIterator(): ObjectIterator> = iterator() + override fun fastForEach(consumer: Consumer>) { + backing.forEach { (k, v) -> + consumer.accept(object : Object2ObjectMap.Entry { + override val key = k + override val value = v + override fun setValue(value: V): V { + val old = backing[k] + put(k, value) + return old ?: value + } + }) + } + } + } + } + + private inner class SyncMapIterator : ObjectIterator> { + private val iterator = backing.entries.iterator() + private var last: K? = null + + override fun hasNext() = iterator.hasNext() + + override fun next(): Object2ObjectMap.Entry { + val entry = iterator.next() + last = entry.key + return object : Object2ObjectMap.Entry { + override val key = entry.key + override val value = entry.value + override fun setValue(value: V): V { + val old = entry.value + put(entry.key, value) + return old + } + } + } + + override fun remove() { + val keyToRemove = last ?: error("next() must be called before remove()") + this@SyncMapImpl.remove(keyToRemove) + last = null + } + + override fun forEachRemaining(action: Consumer>) { + iterator.forEachRemaining { entry -> + action.accept(object : Object2ObjectMap.Entry { + override val key = entry.key + override val value = entry.value + override fun setValue(value: V): V { + val old = entry.value + put(entry.key, value) + return old + } + }) + } + } + } + + companion object { + private val log = logger() + } +} diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerRunningPacketListenerImpl.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerRunningPacketListenerImpl.kt index 939ca2a1..f22cb843 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerRunningPacketListenerImpl.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerRunningPacketListenerImpl.kt @@ -700,6 +700,16 @@ class ServerRunningPacketListenerImpl( } } + override fun handleSyncMapDelta(packet: SyncMapDeltaPacket) { + try { + SyncRegistryImpl.instance.handleSyncMapDeltaPacket(packet, connection) + } catch (e: Throwable) { + log.atWarning() + .withCause(e) + .log("Failed to handle sync map delta packet: %s", packet.mapId) + } + } + override fun handleCreateOfflineCloudPlayerIfNotExists(packet: ServerboundCreateOfflineCloudPlayerIfNotExistsPacket) { CloudPlayerManager.getOfflinePlayer(packet.uuid, true) } diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt index a3696a36..3d69bb44 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/ServerSynchronizingPacketListenerImpl.kt @@ -106,6 +106,16 @@ class ServerSynchronizingPacketListenerImpl( } } + override fun handleSyncMapDelta(packet: SyncMapDeltaPacket) { + try { + SyncRegistryImpl.instance.handleSyncMapDeltaPacket(packet, connection) + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Failed to handle sync map delta for packet $packet") + } + } + override fun handleCreateOfflineCloudPlayerIfNotExists(packet: ServerboundCreateOfflineCloudPlayerIfNotExistsPacket) { CloudPlayerManager.getOfflinePlayer(packet.uuid, true) } diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt index d4e725ab..c5eba694 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/netty/server/network/config/SynchronizeRegistriesTask.kt @@ -2,6 +2,7 @@ package dev.slne.surf.cloud.standalone.netty.server.network.config import dev.slne.surf.cloud.api.common.netty.NettyClient import dev.slne.surf.cloud.api.common.plugin.spring.task.CloudInitialSynchronizeTask +import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ClientboundBatchSyncMapPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ClientboundBatchSyncSetPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.ClientboundBatchSyncValuePacket import dev.slne.surf.cloud.standalone.sync.SyncRegistryImpl @@ -11,5 +12,6 @@ object SynchronizeRegistriesTask : CloudInitialSynchronizeTask { override suspend fun execute(client: NettyClient) { client.connection.send(ClientboundBatchSyncValuePacket(SyncRegistryImpl.instance.prepareBatchSyncValues())) client.connection.send(ClientboundBatchSyncSetPacket(SyncRegistryImpl.instance.prepareBatchSyncSets())) + client.connection.send(ClientboundBatchSyncMapPacket(SyncRegistryImpl.instance.prepareBatchSyncMaps())) } } \ No newline at end of file diff --git a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/sync/SyncRegistryImpl.kt b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/sync/SyncRegistryImpl.kt index cd36d099..1c0eeada 100644 --- a/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/sync/SyncRegistryImpl.kt +++ b/surf-cloud-standalone/src/main/kotlin/dev/slne/surf/cloud/standalone/sync/SyncRegistryImpl.kt @@ -4,10 +4,12 @@ import com.google.auto.service.AutoService import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket import dev.slne.surf.cloud.api.common.sync.SyncRegistry import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl +import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncMapDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket import dev.slne.surf.cloud.core.common.sync.BasicSyncValue import dev.slne.surf.cloud.core.common.sync.CommonSyncRegistryImpl +import dev.slne.surf.cloud.core.common.sync.SyncMapImpl import dev.slne.surf.cloud.core.common.sync.SyncSetImpl import dev.slne.surf.cloud.core.common.util.bean import dev.slne.surf.cloud.standalone.netty.server.NettyServerImpl @@ -42,6 +44,24 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() { broadcast(SyncSetDeltaPacket(syncSet, added, changeId, element)) } + override fun afterChange( + syncMap: SyncMapImpl, + key: K, + oldValue: V?, + newValue: V?, + changeId: Long + ) { + super.afterChange(syncMap, key, oldValue, newValue, changeId) + val lastChangeId = lastChangeIds.getLong(syncMap.id) + if (changeId <= lastChangeId) { + log.atInfo() + .log("Ignoring stale SyncMapDeltaPacket for map '${syncMap.id}' with changeId $changeId, last known changeId is $lastChangeId") + return + } + lastChangeIds[syncMap.id] = changeId + broadcast(SyncMapDeltaPacket(syncMap, key, oldValue, newValue, changeId)) + } + fun handleChangePacket(packet: SyncValueChangePacket, sender: ConnectionImpl) { if (!packet.registered) return val syncId = packet.syncId @@ -77,6 +97,32 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() { broadcast(packet, sender) } + fun handleSyncMapDeltaPacket(packet: SyncMapDeltaPacket, sender: ConnectionImpl) { + if (!packet.registered) return + val map = getMap(packet.mapId) + if (map == null) { + log.atWarning() + .log("SyncMap with id '${packet.mapId}' not found, cannot apply delta") + return + } + + val lastChangeId = lastChangeIds.getLong(packet.mapId) + if (packet.changeId <= lastChangeId) { + log.atInfo() + .log("Ignoring stale SyncMapDeltaPacket for map '${packet.mapId}' with changeId ${packet.changeId}, last known changeId is $lastChangeId") + return + } + + if (packet.newValue != null) { + map.putInternal(packet.key, packet.newValue) + } else { + map.removeInternal(packet.key) + } + lastChangeIds[packet.mapId] = packet.changeId + + broadcast(packet, sender) + } + fun prepareBatchSyncValues(): Map> { require(frozen) { "SyncRegistry is not frozen, cannot prepare batch" } @@ -88,6 +134,11 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() { return syncSets.toMap() } + fun prepareBatchSyncMaps(): Map> { + require(frozen) { "SyncRegistry is not frozen, cannot prepare batch sync map" } + return syncMaps.toMap() + } + private fun broadcast(packet: NettyPacket, sender: ConnectionImpl? = null) { if (sender == null) { bean().connection.broadcast(packet) diff --git a/surf-cloud-test-plugin/surf-cloud-test-standalone/src/main/kotlin/dev/slne/surf/cloudtest/standalone/test/sync/SyncMapTest.kt b/surf-cloud-test-plugin/surf-cloud-test-standalone/src/main/kotlin/dev/slne/surf/cloudtest/standalone/test/sync/SyncMapTest.kt new file mode 100644 index 00000000..635cbe0a --- /dev/null +++ b/surf-cloud-test-plugin/surf-cloud-test-standalone/src/main/kotlin/dev/slne/surf/cloudtest/standalone/test/sync/SyncMapTest.kt @@ -0,0 +1,61 @@ +package dev.slne.surf.cloudtest.standalone.test.sync + +import dev.slne.surf.cloud.api.common.sync.SyncMap +import kotlinx.serialization.Serializable +import org.springframework.stereotype.Component + +@Component +class SyncMapTest { + val syncMap = SyncMap("player_data_map") + + fun test() { + syncMap.subscribe { key, oldValue, newValue -> + println("SyncMap changed for key '$key': $oldValue -> $newValue") + } + + // Add some initial data + syncMap["player1"] = PlayerData( + uuid = "uuid-1", + name = "Player1", + score = 100 + ) + + syncMap["player2"] = PlayerData( + uuid = "uuid-2", + name = "Player2", + score = 200 + ) + + println("Current map size: ${syncMap.size}") + println("Player1 data: ${syncMap["player1"]}") + println("Player2 data: ${syncMap["player2"]}") + + // Update existing entry + syncMap["player1"] = PlayerData( + uuid = "uuid-1", + name = "Player1", + score = 150 + ) + + // Remove entry + syncMap.remove("player2") + println("Map size after removal: ${syncMap.size}") + + // Iterate over entries + println("All players:") + for ((key, value) in syncMap) { + println(" $key -> $value") + } + + // Create snapshot for read-only access + val snapshot = syncMap.snapshot() + println("Snapshot size: ${snapshot.size}") + } + + @Serializable + data class PlayerData( + val uuid: String, + val name: String, + val score: Int + ) +}