diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index d7b97cdf9..3d54d45ac 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::time::Duration; -use tokio::{select, spawn}; +use tokio::spawn; use tracing::{debug, error, info}; use super::super::{publisher::Table, Error}; @@ -113,35 +113,38 @@ impl Publisher { let progress = Progress::new_stream(); loop { - select! { - replication_data = slot.replicate(Duration::MAX) => { - match replication_data { - Ok(Some(ReplicationData::CopyData(data))) => { - let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = data.replication_meta() { - // If the LSN hasn't moved, we reached the end of the stream. - // If Postgres is getting requesting reply, provide our LSN now. - if !stream.set_current_lsn(ka.wal_end) || ka.reply() { - slot.status_update(stream.status_update()).await?; - } - debug!("origin at lsn {} [{}]", Lsn::from_i64(ka.wal_end), slot.server()?.addr()); - ka.wal_end - } else { - if let Some(status_update) = stream.handle(data).await? { - slot.status_update(status_update).await?; - } - stream.lsn() - }; - progress.update(stream.bytes_sharded(), lsn); + let replication_data = slot.replicate(Duration::MAX).await; + match replication_data { + Ok(Some(ReplicationData::CopyData(data))) => { + let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = + data.replication_meta() + { + // If the LSN hasn't moved, we reached the end of the stream. + // If Postgres is getting requesting reply, provide our LSN now. + if !stream.set_current_lsn(ka.wal_end) || ka.reply() { + slot.status_update(stream.status_update()).await?; } - Ok(Some(ReplicationData::CopyDone)) => (), - Ok(None) => { - slot.drop_slot().await?; - break; + debug!( + "origin at lsn {} [{}]", + Lsn::from_i64(ka.wal_end), + slot.server()?.addr() + ); + ka.wal_end + } else { + if let Some(status_update) = stream.handle(data).await? { + slot.status_update(status_update).await?; } - Err(err) => { - return Err(err); - } - } + stream.lsn() + }; + progress.update(stream.bytes_sharded(), lsn); + } + Ok(Some(ReplicationData::CopyDone)) => (), + Ok(None) => { + slot.drop_slot().await?; + break; + } + Err(err) => { + return Err(err); } } }