Skip to content
This repository was archived by the owner on Dec 10, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package dev.slne.surf.cloud.api.common.sync

import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf
import it.unimi.dsi.fastutil.objects.Object2ObjectMap
import kotlinx.serialization.KSerializer
import kotlinx.serialization.serializer
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.Unmodifiable

typealias SyncMapListener<K, V> = (key: K, oldValue: V?, newValue: V?) -> Unit

interface SyncMap<K, V> : Object2ObjectMap<K, V> {
val id: String

/**
* There is generally no need to use this codec directly, as this sync map should always
* be automatically synchronized across the network.
*/
@get:ApiStatus.Obsolete
val codec: StreamCodec<SurfByteBuf, Map<K, V>>

fun subscribe(listener: SyncMapListener<K, V>): Boolean
fun snapshot(): @Unmodifiable Object2ObjectMap<K, V>

companion object {
operator fun <K, V> invoke(
id: String,
keyCodec: StreamCodec<SurfByteBuf, K>,
valueCodec: StreamCodec<SurfByteBuf, V>
): SyncMap<K, V> = of(id, keyCodec, valueCodec)

inline operator fun <reified K, reified V> invoke(id: String): SyncMap<K, V> = serializable(id)
operator fun <K, V> invoke(
id: String,
keySerializer: KSerializer<K>,
valueSerializer: KSerializer<V>
): SyncMap<K, V> = serializable(id, keySerializer, valueSerializer)

inline fun <reified K, reified V> serializable(
id: String,
): SyncMap<K, V> = serializable(id, serializer(), serializer())

fun <K, V> serializable(
id: String,
keySerializer: KSerializer<K>,
valueSerializer: KSerializer<V>,
): SyncMap<K, V> = of(
id,
SurfByteBuf.streamCodecFromKotlin(keySerializer),
SurfByteBuf.streamCodecFromKotlin(valueSerializer)
)

fun <K, V> of(
id: String,
keyCodec: StreamCodec<SurfByteBuf, K>,
valueCodec: StreamCodec<SurfByteBuf, V>
): SyncMap<K, V> =
SyncRegistry.instance.createSyncMap(id, keyCodec, valueCodec)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ interface SyncRegistry {
codec: StreamCodec<SurfByteBuf, T>
): SyncSet<T>

fun <K, V> createSyncMap(
id: String,
keyCodec: StreamCodec<SurfByteBuf, K>,
valueCodec: StreamCodec<SurfByteBuf, V>
): SyncMap<K, V>

companion object {
@InternalApi
val instance = requiredService<SyncRegistry>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ class ClientRunningPacketListenerImpl(
}
}

override fun handleSyncMapDelta(packet: SyncMapDeltaPacket) {
try {
SyncRegistryImpl.instance.handleSyncMapDelta(packet)
} catch (e: Throwable) {
log.atWarning()
.withCause(e)
.log("Failed to handle sync map delta for packet $packet")
}
}

override fun handleSetVelocitySecret(packet: ClientboundSetVelocitySecretPacket) {
try {
client.velocitySecret = packet.secret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class ClientSynchronizingPacketListenerImpl(
}
}

override fun handleBatchSyncMap(packet: ClientboundBatchSyncMapPacket) {
try {
SyncRegistryImpl.instance.applyBatchSyncMaps(packet.syncMaps)
} catch (e: Exception) {
log.atWarning()
.withCause(e)
.log("Failed to apply batch sync maps for packet $packet")
}
}

override fun handleBatchUpdateServer(packet: ClientboundBatchUpdateServer) {
serverManagerImpl.batchUpdateServer(packet.servers.map { data ->
if (data.proxy) {
Expand Down Expand Up @@ -111,6 +121,16 @@ class ClientSynchronizingPacketListenerImpl(
}
}

override fun handleSyncMapDelta(packet: SyncMapDeltaPacket) {
try {
SyncRegistryImpl.instance.handleSyncMapDelta(packet)
} catch (e: Exception) {
log.atWarning()
.withCause(e)
.log("Failed to handle sync map delta for packet $packet")
}
}

override fun handleSetVelocitySecret(packet: ClientboundSetVelocitySecretPacket) {
try {
client.velocitySecret = packet.secret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import com.google.auto.service.AutoService
import dev.slne.surf.cloud.api.client.netty.packet.fireAndForget
import dev.slne.surf.cloud.api.common.config.properties.CloudProperties
import dev.slne.surf.cloud.api.common.sync.SyncRegistry
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncMapDeltaPacket
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket
import dev.slne.surf.cloud.core.common.sync.BasicSyncValue
import dev.slne.surf.cloud.core.common.sync.CommonSyncRegistryImpl
import dev.slne.surf.cloud.core.common.sync.SyncMapImpl
import dev.slne.surf.cloud.core.common.sync.SyncSetImpl
import dev.slne.surf.surfapi.core.api.util.logger
import dev.slne.surf.surfapi.core.api.util.mutableObject2LongMapOf
Expand All @@ -33,6 +35,17 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() {
SyncSetDeltaPacket(syncSet, added, changeId, element).fireAndForget()
}

override fun <K, V> afterChange(
syncMap: SyncMapImpl<K, V>,
key: K,
oldValue: V?,
newValue: V?,
changeId: Long
) {
super.afterChange(syncMap, key, oldValue, newValue, changeId)
SyncMapDeltaPacket(syncMap, key, oldValue, newValue, changeId).fireAndForget()
}

fun applyBatchSyncValue(syncValues: List<Pair<String, Any?>>) {
require(frozen) { "SyncRegistry is not frozen, cannot apply batch" }

Expand Down Expand Up @@ -72,6 +85,37 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() {
}
}

fun handleSyncMapDelta(packet: SyncMapDeltaPacket) {
if (!packet.registered) return
val map =
getMap<Any?, Any?>(packet.mapId)
?: error("SyncMap with id '${packet.mapId}' is not registered")
val lastChangeId = lastChangeIds.getLong(packet.mapId)
if (packet.changeId <= lastChangeId) {
log.atInfo()
.log("Ignoring stale SyncMapDeltaPacket for map '${packet.mapId}' with changeId ${packet.changeId}, last known changeId is $lastChangeId")
return
}

if (packet.newValue != null) {
map.putInternal(packet.key, packet.newValue)
} else {
map.removeInternal(packet.key)
}

lastChangeIds[packet.mapId] = packet.changeId
}

fun applyBatchSyncMaps(bulk: List<Pair<String, Map<Any?, Any?>>>) {
bulk.forEach { (id, snapshot) ->
val map = getMap<Any?, Any?>(id)
if (map != null) {
map.putAllInternal(snapshot)
lastChangeIds[id] = Long.MAX_VALUE // Reset change ID to max after bulk update
}
}
}

companion object {
val instance by lazy { CommonSyncRegistryImpl.instance as SyncRegistryImpl }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.slne.surf.cloud.core.common.netty.network.protocol.common

import dev.slne.surf.cloud.core.common.netty.network.PacketListener
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncMapDeltaPacket
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncSetDeltaPacket
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.SyncValueChangePacket

Expand All @@ -9,4 +10,6 @@ interface CommonSynchronizingRunningPacketListener : PacketListener {
fun handleSyncValueChange(packet: SyncValueChangePacket)

fun handleSyncSetDelta(packet: SyncSetDeltaPacket)

fun handleSyncMapDelta(packet: SyncMapDeltaPacket)
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ object RunningProtocols {
.addPacket(RequestPlayerPermissionPacket.STREAM_CODEC)
.addPacket(SyncValueChangePacket.STREAM_CODEC)
.addPacket(SyncSetDeltaPacket.STREAM_CODEC)
.addPacket(SyncMapDeltaPacket.STREAM_CODEC)
.addPacket(ClientboundSetVelocitySecretPacket.STREAM_CODEC)
.addPacket(WhitelistStatusResponsePacket.STREAM_CODEC)
.addPacket(WhitelistResponsePacket.STREAM_CODEC)
Expand Down Expand Up @@ -140,6 +141,7 @@ object RunningProtocols {
.addPacket(ServerboundQueuePlayerToGroupPacket.STREAM_CODEC)
.addPacket(SyncValueChangePacket.STREAM_CODEC)
.addPacket(SyncSetDeltaPacket.STREAM_CODEC)
.addPacket(SyncMapDeltaPacket.STREAM_CODEC)
.addPacket(ServerboundCreateOfflineCloudPlayerIfNotExistsPacket.STREAM_CODEC)
.addPacket(ServerboundRequestWhitelistStatusPacket.STREAM_CODEC)
.addPacket(WhitelistStatusResponsePacket.STREAM_CODEC)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package dev.slne.surf.cloud.core.common.netty.network.protocol.running

import dev.slne.surf.cloud.api.common.meta.SurfNettyPacket
import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow
import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket
import dev.slne.surf.cloud.api.common.netty.packet.PacketHandlerMode
import dev.slne.surf.cloud.api.common.netty.packet.packetCodec
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.encodeError
import dev.slne.surf.cloud.core.common.netty.network.InternalNettyPacket
import dev.slne.surf.cloud.core.common.netty.network.protocol.common.CommonSynchronizingRunningPacketListener
import dev.slne.surf.cloud.core.common.sync.CommonSyncRegistryImpl
import dev.slne.surf.cloud.core.common.sync.SyncMapImpl

@SurfNettyPacket(
"cloud:sync_map_delta",
PacketFlow.BIDIRECTIONAL,
handlerMode = PacketHandlerMode.DEFAULT
)
class SyncMapDeltaPacket : NettyPacket,
InternalNettyPacket<CommonSynchronizingRunningPacketListener> {
companion object {
val STREAM_CODEC = packetCodec(SyncMapDeltaPacket::write, ::SyncMapDeltaPacket)
}

val mapId: String
val key: Any?
val oldValue: Any?
val newValue: Any?
val changeId: Long
val registered: Boolean

constructor(
syncMap: SyncMapImpl<*, *>,
key: Any?,
oldValue: Any?,
newValue: Any?,
changeId: Long
) {
this.mapId = syncMap.id
this.key = key
this.oldValue = oldValue
this.newValue = newValue
this.changeId = changeId
this.registered = true
}

constructor(buf: SurfByteBuf) {
this.mapId = buf.readUtf()
this.changeId = buf.readLong()
val map = CommonSyncRegistryImpl.instance.getMap<Any?, Any?>(mapId)

if (map == null) {
this.registered = false
this.key = null
this.oldValue = null
this.newValue = null
buf.skipBytes(buf.readableBytes())
} else {
this.registered = true
this.key = map.keyCodec.decode(buf)
val hasOldValue = buf.readBoolean()
this.oldValue = if (hasOldValue) map.valueCodec.decode(buf) else null
val hasNewValue = buf.readBoolean()
this.newValue = if (hasNewValue) map.valueCodec.decode(buf) else null
}
}

private fun write(buf: SurfByteBuf) {
buf.writeUtf(mapId)
buf.writeLong(changeId)
val map = CommonSyncRegistryImpl.instance.getMap<Any?, Any?>(mapId)
?: encodeError("SyncMap '$mapId' is not registered")

map.keyCodec.encode(buf, key)

if (oldValue != null) {
buf.writeBoolean(true)
map.valueCodec.encode(buf, oldValue)
} else {
buf.writeBoolean(false)
}

if (newValue != null) {
buf.writeBoolean(true)
map.valueCodec.encode(buf, newValue)
} else {
buf.writeBoolean(false)
}
}

override fun handle(listener: CommonSynchronizingRunningPacketListener) {
listener.handleSyncMapDelta(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ interface ClientSynchronizingPacketListener : ClientCommonPacketListener,

fun handleBatchSyncSet(packet: ClientboundBatchSyncSetPacket)

fun handleBatchSyncMap(packet: ClientboundBatchSyncMapPacket)

fun handleBatchUpdateServer(packet: ClientboundBatchUpdateServer)

fun handlePlayerCacheHydrateStart(packet: ClientboundPlayerCacheHydrateStartPacket)
Expand Down
Loading