diff --git a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt index f68b5b1e..0cd43a83 100644 --- a/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt +++ b/surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/sync/SyncRegistryImpl.kt @@ -62,12 +62,12 @@ class SyncRegistryImpl : CommonSyncRegistryImpl() { lastChangeIds[packet.setId] = packet.changeId } - fun applyBatchSyncSets(bulk: List>>) { - bulk.forEach { (id, snapshot) -> + fun applyBatchSyncSets(bulk: List, Long>>) { + bulk.forEach { (id, snapshot, changeId) -> val set = getSet(id) if (set != null) { set.addAllInternal(snapshot) - lastChangeIds[id] = Long.MAX_VALUE // Reset change ID to max after bulk update + lastChangeIds[id] = changeId } } } diff --git a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncSetPacket.kt b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncSetPacket.kt index 9f02703a..2d0d34c4 100644 --- a/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncSetPacket.kt +++ b/surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/synchronizing/ClientboundBatchSyncSetPacket.kt @@ -27,10 +27,10 @@ class ClientboundBatchSyncSetPacket : NettyPacket, InternalNettyPacket>> + val syncSets: List, Long>> constructor(syncValues: Map>) { - this.syncSets = syncValues.map { (key, value) -> key to value.toSet() } + this.syncSets = syncValues.map { (key, value) -> Triple(key, value.toSet(), value.currentChangeId) } } private constructor(buf: SurfByteBuf) { @@ -43,10 +43,13 @@ class ClientboundBatchSyncSetPacket : NettyPacket, InternalNettyPacket(syncId) if (syncSet == null) { buf.skipBytes(syncSize) + buf.readLong() // Skip the change ID as well unknownSyncValues.add(syncId) null } else { - syncId to syncSet.codec.decode(buf) + val set = syncSet.codec.decode(buf) + val changeId = buf.readLong() + Triple(syncId, set, changeId) } }.filterNotNull() @@ -57,7 +60,7 @@ class ClientboundBatchSyncSetPacket : NettyPacket, InternalNettyPacket + buf.writeCollection(syncSets) { buf, (syncId, set, changeId) -> buf.writeUtf(syncId) // Reserve 4 bytes for length @@ -72,6 +75,9 @@ class ClientboundBatchSyncSetPacket : NettyPacket, InternalNettyPacket(override val id: String, val valueCodec: StreamCodec>() private val changeCounter = AtomicLong() + val currentChangeId: Long + get() = changeCounter.get() init { CommonSyncRegistryImpl.instance.register(this)