@@ -596,11 +596,8 @@ impl BitcoindChainSource {
596596 }
597597
598598 pub ( crate ) async fn process_broadcast_package ( & self , package : Vec < Transaction > ) {
599- // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
600- // features, we should eventually switch to use `submitpackage` via the
601- // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
602- // transactions.
603- for tx in & package {
599+ if package. len ( ) == 1 {
600+ let tx = & package[ 0 ] ;
604601 let txid = tx. compute_txid ( ) ;
605602 let timeout_fut = tokio:: time:: timeout (
606603 Duration :: from_secs ( TX_BROADCAST_TIMEOUT_SECS ) ,
@@ -635,6 +632,40 @@ impl BitcoindChainSource {
635632 ) ;
636633 } ,
637634 }
635+ } else if package. len ( ) > 1 {
636+ let txids: Vec < _ > = package. iter ( ) . map ( |tx| tx. compute_txid ( ) ) . collect ( ) ;
637+ let timeout_fut = tokio:: time:: timeout (
638+ Duration :: from_secs ( TX_BROADCAST_TIMEOUT_SECS ) ,
639+ self . api_client . submit_package ( & package) ,
640+ ) ;
641+ match timeout_fut. await {
642+ Ok ( res) => match res {
643+ Ok ( res) => {
644+ // TODO: We'd like to debug assert here the txids, but we sometimes
645+ // get 0 txids back
646+ log_trace ! ( self . logger, "Package broadcast result {}" , res) ;
647+ } ,
648+ Err ( e) => {
649+ log_error ! ( self . logger, "Failed to broadcast package {:?}: {}" , txids, e) ;
650+ log_trace ! ( self . logger, "Failed broadcast package bytes:" ) ;
651+ for tx in package {
652+ log_trace ! ( self . logger, "{}" , log_bytes!( tx. encode( ) ) ) ;
653+ }
654+ } ,
655+ } ,
656+ Err ( e) => {
657+ log_error ! (
658+ self . logger,
659+ "Failed to broadcast package due to timeout {:?}: {}" ,
660+ txids,
661+ e
662+ ) ;
663+ log_trace ! ( self . logger, "Failed broadcast package bytes:" ) ;
664+ for tx in package {
665+ log_trace ! ( self . logger, "{}" , log_bytes!( tx. encode( ) ) ) ;
666+ }
667+ } ,
668+ }
638669 }
639670 }
640671}
@@ -731,6 +762,32 @@ impl BitcoindClient {
731762 rpc_client. call_method :: < Txid > ( "sendrawtransaction" , & [ tx_json] ) . await
732763 }
733764
765+ /// Submits the provided package
766+ pub ( crate ) async fn submit_package ( & self , package : & [ Transaction ] ) -> std:: io:: Result < String > {
767+ match self {
768+ BitcoindClient :: Rpc { rpc_client, .. } => {
769+ Self :: submit_package_inner ( Arc :: clone ( rpc_client) , package) . await
770+ } ,
771+ BitcoindClient :: Rest { rpc_client, .. } => {
772+ // Bitcoin Core's REST interface does not support submitting packages
773+ // so we use the RPC client.
774+ Self :: submit_package_inner ( Arc :: clone ( rpc_client) , package) . await
775+ } ,
776+ }
777+ }
778+
779+ async fn submit_package_inner (
780+ rpc_client : Arc < RpcClient > , package : & [ Transaction ] ,
781+ ) -> std:: io:: Result < String > {
782+ let package_serialized: Vec < _ > =
783+ package. iter ( ) . map ( |tx| bitcoin:: consensus:: encode:: serialize_hex ( tx) ) . collect ( ) ;
784+ let package_json = serde_json:: json!( package_serialized) ;
785+ rpc_client
786+ . call_method :: < serde_json:: Value > ( "submitpackage" , & [ package_json] )
787+ . await
788+ . map ( |value| value. to_string ( ) )
789+ }
790+
734791 /// Retrieve the fee estimate needed for a transaction to begin
735792 /// confirmation within the provided `num_blocks`.
736793 pub ( crate ) async fn get_fee_estimate_for_target (
@@ -1366,6 +1423,45 @@ impl TryInto<GetMempoolEntryResponse> for JsonResponse {
13661423 }
13671424}
13681425
1426+ /*
1427+ pub struct SubmitPackageResponse {
1428+ package_msg: String,
1429+ txids: Vec<Txid>,
1430+ }
1431+
1432+ impl TryInto<SubmitPackageResponse> for JsonResponse {
1433+ type Error = std::io::Error;
1434+ fn try_into(self) -> std::io::Result<SubmitPackageResponse> {
1435+ let package_msg = self.0["package_msg"]
1436+ .as_str()
1437+ .ok_or(std::io::Error::new(
1438+ std::io::ErrorKind::Other,
1439+ "Failed to parse submitpackage response",
1440+ ))?
1441+ .to_string();
1442+ let tx_results = self.0["tx-results"].as_object().ok_or(std::io::Error::new(
1443+ std::io::ErrorKind::Other,
1444+ "Failed to parse submitpackage response",
1445+ ))?;
1446+ let mut txids = Vec::with_capacity(tx_results.len());
1447+ for tx_result in tx_results.values() {
1448+ let txid_string = tx_result["txid"].as_str().ok_or(std::io::Error::new(
1449+ std::io::ErrorKind::Other,
1450+ "Failed to parse submitpackage response",
1451+ ))?;
1452+ let txid: Txid = txid_string.parse().map_err(|_| {
1453+ std::io::Error::new(
1454+ std::io::ErrorKind::Other,
1455+ "Failed to parse submitpackage response",
1456+ )
1457+ })?;
1458+ txids.push(txid);
1459+ }
1460+ Ok(SubmitPackageResponse { package_msg, txids })
1461+ }
1462+ }
1463+ */
1464+
13691465#[ derive( Debug , Clone ) ]
13701466pub ( crate ) struct MempoolEntry {
13711467 /// The transaction id
0 commit comments