Skip to content

Commit 0dab102

Browse files
committed
Submit packages via bitcoind rpc
1 parent 1dd9827 commit 0dab102

File tree

1 file changed

+101
-5
lines changed

1 file changed

+101
-5
lines changed

src/chain/bitcoind.rs

Lines changed: 101 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,8 @@ impl BitcoindChainSource {
582582
}
583583

584584
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
585-
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
586-
// features, we should eventually switch to use `submitpackage` via the
587-
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
588-
// transactions.
589-
for tx in &package {
585+
if package.len() == 1 {
586+
let tx = &package[0];
590587
let txid = tx.compute_txid();
591588
let timeout_fut = tokio::time::timeout(
592589
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
@@ -621,6 +618,40 @@ impl BitcoindChainSource {
621618
);
622619
},
623620
}
621+
} else if package.len() > 1 {
622+
let txids: Vec<_> = package.iter().map(|tx| tx.compute_txid()).collect();
623+
let timeout_fut = tokio::time::timeout(
624+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
625+
self.api_client.submit_package(&package),
626+
);
627+
match timeout_fut.await {
628+
Ok(res) => match res {
629+
Ok(res) => {
630+
// TODO: We'd like to debug assert here the txids, but we sometimes
631+
// get 0 txids back
632+
log_trace!(self.logger, "Package broadcast result {}", res);
633+
},
634+
Err(e) => {
635+
log_error!(self.logger, "Failed to broadcast package {:?}: {}", txids, e);
636+
log_trace!(self.logger, "Failed broadcast package bytes:");
637+
for tx in package {
638+
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
639+
}
640+
},
641+
},
642+
Err(e) => {
643+
log_error!(
644+
self.logger,
645+
"Failed to broadcast package due to timeout {:?}: {}",
646+
txids,
647+
e
648+
);
649+
log_trace!(self.logger, "Failed broadcast package bytes:");
650+
for tx in package {
651+
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
652+
}
653+
},
654+
}
624655
}
625656
}
626657
}
@@ -717,6 +748,32 @@ impl BitcoindClient {
717748
rpc_client.call_method::<Txid>("sendrawtransaction", &[tx_json]).await
718749
}
719750

751+
/// Submits the provided package
752+
pub(crate) async fn submit_package(&self, package: &[Transaction]) -> std::io::Result<String> {
753+
match self {
754+
BitcoindClient::Rpc { rpc_client, .. } => {
755+
Self::submit_package_inner(Arc::clone(rpc_client), package).await
756+
},
757+
BitcoindClient::Rest { rpc_client, .. } => {
758+
// Bitcoin Core's REST interface does not support submitting packages
759+
// so we use the RPC client.
760+
Self::submit_package_inner(Arc::clone(rpc_client), package).await
761+
},
762+
}
763+
}
764+
765+
async fn submit_package_inner(
766+
rpc_client: Arc<RpcClient>, package: &[Transaction],
767+
) -> std::io::Result<String> {
768+
let package_serialized: Vec<_> =
769+
package.iter().map(|tx| bitcoin::consensus::encode::serialize_hex(tx)).collect();
770+
let package_json = serde_json::json!(package_serialized);
771+
rpc_client
772+
.call_method::<serde_json::Value>("submitpackage", &[package_json])
773+
.await
774+
.map(|value| value.to_string())
775+
}
776+
720777
/// Retrieve the fee estimate needed for a transaction to begin
721778
/// confirmation within the provided `num_blocks`.
722779
pub(crate) async fn get_fee_estimate_for_target(
@@ -1352,6 +1409,45 @@ impl TryInto<GetMempoolEntryResponse> for JsonResponse {
13521409
}
13531410
}
13541411

1412+
/*
1413+
pub struct SubmitPackageResponse {
1414+
package_msg: String,
1415+
txids: Vec<Txid>,
1416+
}
1417+
1418+
impl TryInto<SubmitPackageResponse> for JsonResponse {
1419+
type Error = std::io::Error;
1420+
fn try_into(self) -> std::io::Result<SubmitPackageResponse> {
1421+
let package_msg = self.0["package_msg"]
1422+
.as_str()
1423+
.ok_or(std::io::Error::new(
1424+
std::io::ErrorKind::Other,
1425+
"Failed to parse submitpackage response",
1426+
))?
1427+
.to_string();
1428+
let tx_results = self.0["tx-results"].as_object().ok_or(std::io::Error::new(
1429+
std::io::ErrorKind::Other,
1430+
"Failed to parse submitpackage response",
1431+
))?;
1432+
let mut txids = Vec::with_capacity(tx_results.len());
1433+
for tx_result in tx_results.values() {
1434+
let txid_string = tx_result["txid"].as_str().ok_or(std::io::Error::new(
1435+
std::io::ErrorKind::Other,
1436+
"Failed to parse submitpackage response",
1437+
))?;
1438+
let txid: Txid = txid_string.parse().map_err(|_| {
1439+
std::io::Error::new(
1440+
std::io::ErrorKind::Other,
1441+
"Failed to parse submitpackage response",
1442+
)
1443+
})?;
1444+
txids.push(txid);
1445+
}
1446+
Ok(SubmitPackageResponse { package_msg, txids })
1447+
}
1448+
}
1449+
*/
1450+
13551451
#[derive(Debug, Clone)]
13561452
pub(crate) struct MempoolEntry {
13571453
/// The transaction id

0 commit comments

Comments
 (0)