From 496a03af47b7f7e1580131e8ed4f699f329dc920 Mon Sep 17 00:00:00 2001 From: Wukong Date: Sat, 10 May 2025 15:50:20 +0530 Subject: [PATCH 01/12] add basic workflow for the tracker --- Cargo.lock | 682 ++++++++++++++++++++++++++++++++- Cargo.toml | 8 +- src/db/db_manager.rs | 32 +- src/error.rs | 23 ++ src/handle_error.rs | 2 +- src/indexer/rpc.rs | 7 + src/indexer/tracker_indexer.rs | 28 +- src/main.rs | 151 ++++++-- src/server/mod.rs | 1 + src/server/tracker_monitor.rs | 73 ++++ src/server/tracker_server.rs | 156 +++++--- src/status.rs | 37 +- src/types.rs | 10 +- 13 files changed, 1075 insertions(+), 135 deletions(-) create mode 100644 src/server/tracker_monitor.rs diff --git a/Cargo.lock b/Cargo.lock index 913ae0d..e179a82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,107 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" +dependencies = [ + "anstyle", + "once_cell", + "windows-sys 0.59.0", +] + [[package]] name = "arrayvec" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + [[package]] name = "base58ck" version = "0.1.0" @@ -108,6 +203,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "cc" version = "1.2.20" @@ -123,6 +230,77 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.5.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eccb054f56cbd38340b380d4a8e69ef1f02f1af43db2f0cc817a4774d80ae071" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd9466fac8543255d3b1fcad4762c5e116ffe808c8a3043d4263cd4fd4862a2" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "generator" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +dependencies = [ + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -134,6 +312,18 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hex" version = "0.4.3" @@ -155,6 +345,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3011d1213f159867b13cfd6ac92d2cd5f1345762c63be3554e84092d85a50bbd" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itoa" version = "1.0.15" @@ -173,24 +369,74 @@ dependencies = [ "serde_json", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "pin-utils", + "scoped-tls", + "serde", + "serde_json", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "miniz_oxide" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" +dependencies = [ + "adler2", +] + [[package]] name = "minreq" version = "2.13.4" @@ -209,11 +455,76 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "log", "wasi", - "windows-sys", + "windows-sys 0.52.0", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", ] +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -271,12 +582,89 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "rustversion" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" + [[package]] name = "ryu" version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "secp256k1" version = "0.29.1" @@ -330,12 +718,61 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" + +[[package]] +name = "socket2" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.101" @@ -347,13 +784,145 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + +[[package]] +name = "tokio" +version = "1.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-graceful" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45740b38b48641855471cd402922e89156bdfbd97b69b45eeff170369cc18c7d" +dependencies = [ + "loom", + "pin-project-lite", + "slab", + "tokio", + "tracing", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-util" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "tracker" version = "0.1.0" dependencies = [ "bitcoincore-rpc", + "clap", "hex", - "mio", + "serde", + "tokio", + "tokio-graceful", + "tokio-util", + "tracing", + "tracing-subscriber", ] [[package]] @@ -362,12 +931,110 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core", + "windows-targets", +] + +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-strings", + "windows-targets", +] + +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -377,6 +1044,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-targets" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 6c98584..b8ddfe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,11 @@ edition = "2024" [dependencies] bitcoincore-rpc = "0.19.0" +clap = { version = "4.5.37", features = ["derive"] } hex = "0.4.3" -mio = { version = "1.0.3", features = ["net", "os-poll"] } +serde = { version = "1.0.219", features = ["derive"] } +tokio = { version = "1.45.0", features = ["full"] } +tokio-graceful = "0.2.2" +tokio-util = "0.7.15" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index f10e511..0e2450c 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -1,4 +1,6 @@ -use std::{collections::HashMap, sync::mpsc::Receiver}; +use std::collections::HashMap; +use tokio::sync::mpsc::Receiver; +use tracing::info; use crate::{ error::TrackerError, @@ -6,26 +8,40 @@ use crate::{ types::{DbRequest, ServerInfo}, }; -pub fn run(rx: Receiver, status_tx: status::Sender) { +pub async fn run(mut rx: Receiver, status_tx: status::Sender) { let mut servers: HashMap = HashMap::new(); - while let Ok(request) = rx.recv() { + info!("DB manager started"); + while let Some(request) = rx.recv().await { match request { DbRequest::Add(addr, info) => { + info!("Add request intercepted: address: {addr:?}, info: {info:?}"); servers.insert(addr, info); } DbRequest::Query(addr, resp_tx) => { + info!("Query request intecepted"); let result = servers.get(&addr).cloned(); let _ = resp_tx.send(result); } - DbRequest::Update(addr, update_fn) => { - if let Some(server) = servers.get_mut(&addr) { - update_fn(server) - } + DbRequest::Update(addr, server_info) => { + info!("Update request intercepted"); + servers.insert(addr, server_info); + } + DbRequest::QueryAll(resp_tx) => { + info!("Query all request intercepted"); + let response: Vec<(String, ServerInfo)> = servers.iter() + .map(|e| (e.0.clone(), e.1.clone())) + .collect(); + let _ = resp_tx.send(response).await; + } + DbRequest::QueryActive(resp_tx) => { + info!("Query active intercepted"); + let response: Vec<(String, ServerInfo)> = servers.iter().filter(|x| !x.1.stale).map(|e|(e.0.clone(), e.1.clone())).collect(); + let _ = resp_tx.send(response).await; } } } let _ = status_tx.send(Status { state: status::State::DBShutdown(TrackerError::DbManagerExited), - }); + }).await; } diff --git a/src/error.rs b/src/error.rs index f69ae98..ccbe22b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,9 +1,14 @@ +use std::error::Error; +use crate::DbRequest; + #[derive(Debug)] pub enum TrackerError { DbManagerExited, ServerError, MempoolIndexerError, Shutdown, + ParsingError, + SendError, IOError(std::io::Error), RPCError(bitcoincore_rpc::Error), } @@ -19,3 +24,21 @@ impl From for TrackerError { TrackerError::RPCError(value) } } + +impl Error for TrackerError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + None + } +} + +impl std::fmt::Display for TrackerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From> for TrackerError { + fn from(_: tokio::sync::mpsc::error::SendError) -> Self { + Self::SendError + } +} diff --git a/src/handle_error.rs b/src/handle_error.rs index dccc19a..894d026 100644 --- a/src/handle_error.rs +++ b/src/handle_error.rs @@ -4,7 +4,7 @@ macro_rules! handle_result { match $res { Ok(val) => val, Err(e) => { - let res = $crate::status::handle_error(&$sender, e.into()); + let res = $crate::status::handle_error(&$sender, e.into()).await; match res { $crate::handle_error::ErrorBranch::Break => break, $crate::handle_error::ErrorBranch::Continue => continue, diff --git a/src/indexer/rpc.rs b/src/indexer/rpc.rs index efff29c..1aa141a 100644 --- a/src/indexer/rpc.rs +++ b/src/indexer/rpc.rs @@ -42,3 +42,10 @@ impl BitcoinRpc { Ok(block) } } + + +impl From for BitcoinRpc { + fn from(value: Client) -> Self { + BitcoinRpc { client: value } + } +} \ No newline at end of file diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index 0b420ce..3a155bd 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -1,21 +1,19 @@ -use std::sync::mpsc::Sender; +use tokio::{sync::mpsc::Sender, time::Instant}; -use bitcoincore_rpc::bitcoin::absolute::{Height, LockTime}; +use bitcoincore_rpc::{bitcoin::absolute::{Height, LockTime}}; +use tracing::info; use super::rpc::BitcoinRpc; use crate::{ - handle_result, status, - types::{DbRequest, ServerInfo}, + handle_result, status, types::{DbRequest, ServerInfo} }; -pub fn run( - _db_tx: Sender, +pub async fn run( + db_tx: Sender, status_tx: status::Sender, - url: String, - username: String, - password: String, + client: BitcoinRpc ) { - let client = BitcoinRpc::new(url, username, password).unwrap(); + info!("Indexer started"); let mut last_tip = 0; loop { let blockchain_info = handle_result!(status_tx, client.get_blockchain_info()); @@ -41,10 +39,14 @@ pub fn run( if let Some(onion_address) = onion_address { let server_info = ServerInfo { onion_address: onion_address.clone(), - rate: 0.0, - uptime: 0, + cooldown: Instant::now(), + stale: false }; - let _ = DbRequest::Add(onion_address, server_info); + info!("New address found: {:?}", onion_address); + let db_request = DbRequest::Add(onion_address, server_info); + + handle_result!(status_tx, db_tx.send(db_request).await); + } } } diff --git a/src/main.rs b/src/main.rs index e1f1b3e..718f3fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,15 @@ #![allow(dead_code)] +use error::TrackerError; use status::{State, Status}; -use std::{ +use tokio::{ sync::mpsc::{self, Receiver, Sender}, - thread, + }; +use bitcoincore_rpc::Client; +use bitcoincore_rpc::Auth; +use tracing::{info, warn}; use types::DbRequest; +use clap::Parser; mod db; mod error; mod handle_error; @@ -13,60 +18,142 @@ mod server; mod status; mod types; -fn main() { - let (mut db_tx, db_rx) = mpsc::channel::(); - let (status_tx, status_rx) = mpsc::channel::(); - spawn_db_manager(db_rx, status_tx.clone()); - spawn_mempool_indexer(db_tx.clone(), status_tx.clone()); - spawn_server(db_tx.clone(), status_tx.clone()); +#[derive(Parser)] +#[clap(version = option_env ! ("CARGO_PKG_VERSION").unwrap_or("unknown"), +author = option_env ! ("CARGO_PKG_AUTHORS").unwrap_or(""))] +struct App { + #[clap( + name = "ADDRESS:PORT", + long, + short = 'r', + default_value = "127.0.0.1:48332" + )] + pub(crate) rpc: String, + #[clap( + name = "USER:PASSWORD", + short = 'a', + long, + value_parser = parse_proxy_auth, + default_value = "username:password", + )] + pub auth: (String, String), + #[clap( + name = "Server ADDRESS:PORT", + short = 's', + long, + default_value = "127.0.0.1:8080", + )] + pub address: String +} + +fn parse_proxy_auth(s: &str) -> Result<(String, String), TrackerError> { + let parts: Vec<_> = s.split(':').collect(); + if parts.len() != 2 { + return Err(TrackerError::ParsingError) + } + + let user = parts[0].to_string(); + let passwd = parts[1].to_string(); + + Ok((user, passwd)) +} + + +#[derive(Debug, Clone)] +pub struct RPCConfig { + pub url: String, + pub auth: Auth, +} + +const RPC_HOSTPORT: &str = "localhost:18443"; + +impl RPCConfig { + fn new(url: String, auth: Auth) -> Self { + RPCConfig { url, auth} + } +} + +impl Default for RPCConfig { + fn default() -> Self { + Self { + url: RPC_HOSTPORT.to_string(), + auth: Auth::UserPass("regtestrpcuser".to_string(), "regtestrpcpass".to_string()), + } + } +} + +impl From for Client { + fn from(value: RPCConfig) -> Self { + let rpc = Client::new(&value.url, value.auth.clone()).unwrap(); + rpc + } +} + + +#[tokio::main] +async fn main() { + + tracing_subscriber::fmt::init(); + + let args = App::parse(); + + let rpc_config = RPCConfig::new(args.rpc, Auth::UserPass(args.auth.0, args.auth.1)); + + + let (mut db_tx, db_rx) = mpsc::channel::(10); + let (status_tx, mut status_rx) = mpsc::channel::(10); + let server_address = args.address; + + spawn_db_manager(db_rx, status_tx.clone()).await; + spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), rpc_config.clone().into()).await; + spawn_server(db_tx.clone(), status_tx.clone(), server_address.clone()).await; + + info!("Tracker started"); - while let Ok(status) = status_rx.recv() { + while let Some(status) = status_rx.recv().await { match status.state { State::DBShutdown(e) => { - println!("DB Manager exited. Restarting..., {:?}", e); - let (db_tx_s, db_rx) = mpsc::channel::(); + warn!("DB Manager exited. Restarting..., {:?}", e); + let (db_tx_s, db_rx) = mpsc::channel::(10); db_tx = db_tx_s; - spawn_db_manager(db_rx, status_tx.clone()); + spawn_db_manager(db_rx, status_tx.clone()).await; } State::Healthy(e) => { - println!("All looks good: {:?}", e); + info!("All looks good: {:?}", e); } State::MempoolShutdown(e) => { - println!( + warn!( "Mempool Indexer encountered an error. Restarting...: {:?}", e ); - spawn_mempool_indexer(db_tx.clone(), status_tx.clone()); + let client: Client = rpc_config.clone().into(); + spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), client.into()).await; } State::ServerShutdown(e) => { - println!("Server encountered an error. Restarting...: {:?}", e); - spawn_server(db_tx.clone(), status_tx.clone()); + warn!("Server encountered an error. Restarting...: {:?}", e); + spawn_server(db_tx.clone(), status_tx.clone(), server_address.clone()).await; } } } } -fn spawn_db_manager(db_tx: Receiver, status_tx: Sender) { - thread::spawn(move || { - db::run(db_tx, status::Sender::DBManager(status_tx)); - }); +async fn spawn_db_manager(db_tx: Receiver, status_tx: Sender) { + info!("Spawning db manager"); + tokio::spawn(db::run(db_tx, status::Sender::DBManager(status_tx))); } -fn spawn_mempool_indexer(db_tx: Sender, status_tx: Sender) { - thread::spawn(move || { +async fn spawn_mempool_indexer(db_tx: Sender, status_tx: Sender, client: Client) { + info!("Spawning indexer"); + tokio::spawn( indexer::run( db_tx, status::Sender::Mempool(status_tx), - "user".to_string(), - "password".to_string(), - "Something".to_string(), - ); - }); + client.into() + )); } -fn spawn_server(db_tx: Sender, status_tx: Sender) { - thread::spawn(move || { - server::run(db_tx, status::Sender::Server(status_tx)); - }); +async fn spawn_server(db_tx: Sender, status_tx: Sender, address: String) { + info!("Spawning server instance"); + tokio::spawn(server::run(db_tx, status::Sender::Server(status_tx), address)); } diff --git a/src/server/mod.rs b/src/server/mod.rs index 413226d..f7ee069 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,2 +1,3 @@ mod tracker_server; +mod tracker_monitor; pub use tracker_server::run; diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs new file mode 100644 index 0000000..70173e2 --- /dev/null +++ b/src/server/tracker_monitor.rs @@ -0,0 +1,73 @@ +use std::error::Error; +use std::time::Duration; + +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::TcpStream, + sync::mpsc::Sender, + time::{sleep, Instant}, +}; +use tracing::{ info, error }; + +use crate::{handle_result, status, types::{DbRequest, ServerInfo}}; + +const COOLDOWN_PERIOD: u64 = 5 * 60; + +pub async fn monitor_systems( + db_tx: Sender, + status_tx: status::Sender, +) -> Result<(), Box> { + info!("Starting to monitor other maker services"); + loop { + + sleep(Duration::from_secs(1000)).await; + + let (response_tx, mut response_rx) = tokio::sync::mpsc::channel(1); + if db_tx.send(DbRequest::QueryAll(response_tx)).await.is_err() { + continue; + } + + if let Some(response) = response_rx.recv().await { + for (address, server_info) in response { + let cooldown_duration = Duration::from_secs(COOLDOWN_PERIOD); + if server_info.cooldown.elapsed() <= cooldown_duration { + continue; + } + + match TcpStream::connect(&address).await { + Ok(stream) => { + let (reader, writer) = stream.into_split(); + let mut buf_reader = BufReader::new(reader); + let mut buf_writer = BufWriter::new(writer); + + if let Err(e) = buf_writer.write_all(b"send").await { + error!("Error: {:?}", e); + continue; + } + + let mut response = String::new(); + let n = handle_result!(status_tx, buf_reader.read_to_string(&mut response).await); + info!("Number of bytes read: {:?}", n); + + let updated_info = ServerInfo { + onion_address: response, + cooldown: Instant::now(), + stale: false, + }; + + let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + } + Err(_) => { + if !server_info.stale { + let updated_info = ServerInfo { + stale: true, + ..server_info + }; + let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + } + } + } + } + } + } +} diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index bb490a9..7154950 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -1,67 +1,113 @@ -use mio::{Events, Interest, Poll, Token, net::TcpStream}; -use std::collections::HashMap; -use std::io::ErrorKind; -use std::io::Read; -use std::io::Write; -use std::sync::mpsc; -use std::sync::mpsc::Sender; - -use crate::handle_result; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpListener; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tracing::{info, error}; +use tokio::io::BufReader; +use tokio::io::BufWriter; +use crate::error::TrackerError; +use crate::server::tracker_monitor::monitor_systems; +use tokio::io::AsyncBufReadExt; use crate::status; use crate::types::DbRequest; -const SERVER: Token = Token(0); - -pub fn run(db_tx: Sender, status_tx: status::Sender) { - let addr = "127.0.0.1:8080".parse().unwrap(); - let mut events = Events::with_capacity(1024); - let mut poll = Poll::new().unwrap(); - let mut server = mio::net::TcpListener::bind(addr).expect("Port binding failed"); - poll.registry() - .register(&mut server, SERVER, Interest::READABLE) - .expect("Couldn't register the file descriptors"); - - let mut clients: HashMap = HashMap::new(); - let mut unique_token = 1; - - loop { - handle_result!(status_tx, poll.poll(&mut events, None)); - let _ = events.iter().map(|e| match e.token() { - SERVER => { - if let Ok((mut connection, _)) = server.accept() { - let token = Token(unique_token); - unique_token += 1; - poll.registry() - .register(&mut connection, token, Interest::READABLE) - .unwrap(); - clients.insert(token, connection); +pub async fn run(db_tx: Sender, status_tx: status::Sender, address: String) -> Result<(), Box>{ + + let server = TcpListener::bind(&address).await?; + + tokio::spawn(monitor_systems(db_tx.clone(), status_tx.clone())); + + info!("Tracker server listening on {}", address); + + while let Ok((stream, client_addr)) = server.accept().await { + info!("Accepted connection from {}", client_addr); + let status_tx_clone = status_tx.clone(); + let db_tx_clone = db_tx.clone(); + tokio::spawn(async move { + let (reader_part, writer_part) = stream.into_split(); + let mut reader = BufReader::new(reader_part); + let mut writer = BufWriter::new(writer_part); + let mut line = String::new(); + loop { + line.clear(); + let bytes_read = match reader.read_line(&mut line).await { + Ok(n) => n, + Err(e) => { + error!("Error reading from {}: {}", client_addr, e); + let _ = status_tx_clone.send(status::Status { + state: status::State::ServerShutdown(TrackerError::IOError(e)) + }).await; + return; + } + }; + + info!("Bytes read: {:?}", bytes_read); + + let request_message = line.trim(); + + if request_message.is_empty() { + continue; } - } - token => { - if let Some(client) = clients.get_mut(&token) { - let mut buf = [0; 1024]; - match client.read(&mut buf) { - Ok(0) => { - clients.remove(&token); - } - Ok(n) => { - let request = String::from_utf8_lossy(&buf[..n]); - if request.contains("GET") { - let (resp_tx, resp_rx) = mpsc::channel(); - let _ = db_tx.send(DbRequest::Query("asc".to_string(), resp_tx)); - if let Ok(server) = resp_rx.recv() { - let response = format!("{:?}", server); - let _ = client.write_all(response.as_bytes()); - } + + info!("Received message from {}: {:?}", client_addr, request_message); + + let response_message: String; + + if request_message.starts_with("QUERY ") { + + let (resp_tx, mut resp_rx) = mpsc::channel(1); + let db_request = DbRequest::QueryActive(resp_tx); + let send_res = db_tx_clone.send(db_request).await; + + if let Err(e) = send_res { + error!("Error sending DB query for {}: {}", client_addr, e); + let _ = status_tx_clone.send(status::Status { + state: status::State::ServerShutdown(TrackerError::DbManagerExited) // Example error + }).await; + response_message = "ERROR: DB query failed".to_string(); + } else { + match resp_rx.recv().await { + Some(server_info) => { + response_message = format!("OK: {:?}", server_info); + } + None => { + response_message = "NOT_FOUND".to_string(); } - } - Err(e) if e.kind() == ErrorKind::WouldBlock => {} - Err(_) => { - clients.remove(&token); } } + } else if request_message.starts_with("ADD ") { + response_message = "ERROR: Add command not implemented via server".to_string(); } + else { + response_message = format!("ERROR: Unknown command '{}'", request_message); // Custom protocol error + } + + let write_res = writer.write_all(response_message.as_bytes()).await; + if let Err(e) = write_res { + error!("Error writing to {}: {}", client_addr, e); + let _ = status_tx_clone.send(status::Status { + state: status::State::ServerShutdown(TrackerError::IOError(e)) + }).await; + return; + } + if let Err(e) = writer.write_all(b"\n").await { + error!("Error writing newline to {}: {}", client_addr, e); + let _ = status_tx_clone.send(status::Status { + state: status::State::ServerShutdown(TrackerError::IOError(e)) + }).await; + } + if let Err(e) = writer.flush().await { + error!("Error flushing writer for {}: {}", client_addr, e); + let _ = status_tx_clone.send(status::Status { + state: status::State::ServerShutdown(TrackerError::IOError(e)) + }).await; + return; + } + println!("Sent response to {}", client_addr); } }); } + + Ok(()) + } diff --git a/src/status.rs b/src/status.rs index 5793fc9..60ab428 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,5 +1,5 @@ use crate::{error::TrackerError, handle_error::ErrorBranch}; -use std::sync::mpsc::{self, SendError}; +use tokio::sync::mpsc::{self, error::SendError}; #[derive(Debug)] pub enum Sender { @@ -9,11 +9,11 @@ pub enum Sender { } impl Sender { - pub fn send(&self, status: Status) -> Result<(), SendError> { + pub async fn send(&self, status: Status) -> Result<(), SendError> { match self { - Self::Mempool(inner) => inner.send(status), - Self::Server(inner) => inner.send(status), - Self::DBManager(inner) => inner.send(status), + Self::Mempool(inner) => inner.send(status).await, + Self::Server(inner) => inner.send(status).await, + Self::DBManager(inner) => inner.send(status).await, } } } @@ -41,46 +41,47 @@ pub struct Status { pub state: State, } -fn send_status(sender: &Sender, e: TrackerError, outcome: ErrorBranch) -> ErrorBranch { +async fn send_status(sender: &Sender, e: TrackerError, outcome: ErrorBranch) -> ErrorBranch { match sender { Sender::Mempool(tx) => match e { TrackerError::MempoolIndexerError => { tx.send(Status { state: State::MempoolShutdown(e), }) - .unwrap_or(()); + .await.unwrap_or(()); } _ => { - // let string_err = e.to_string(); tx.send(Status { state: State::Healthy("error occured in mempool".to_string()), }) - .unwrap_or(()); + .await.unwrap_or(()); } }, Sender::Server(tx) => { tx.send(Status { state: State::ServerShutdown(e), }) - .unwrap_or(()); + .await.unwrap_or(()); } Sender::DBManager(tx) => { tx.send(Status { state: State::DBShutdown(e), }) - .unwrap_or(()); + .await.unwrap_or(()); } } outcome } -pub fn handle_error(sender: &Sender, e: TrackerError) -> ErrorBranch { +pub async fn handle_error(sender: &Sender, e: TrackerError) -> ErrorBranch { match e { - TrackerError::DbManagerExited => send_status(sender, e, ErrorBranch::Break), - TrackerError::MempoolIndexerError => send_status(sender, e, ErrorBranch::Break), - TrackerError::ServerError => send_status(sender, e, ErrorBranch::Break), - TrackerError::Shutdown => send_status(sender, e, ErrorBranch::Break), - TrackerError::IOError(_) => send_status(sender, e, ErrorBranch::Break), - TrackerError::RPCError(_) => send_status(sender, e, ErrorBranch::Break), + TrackerError::DbManagerExited => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::MempoolIndexerError => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::ServerError => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::Shutdown => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::IOError(_) => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::RPCError(_) => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::ParsingError => send_status(sender, e, ErrorBranch::Continue).await, + TrackerError::SendError => send_status(sender, e, ErrorBranch::Break).await } } diff --git a/src/types.rs b/src/types.rs index 2de246c..6eda768 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,14 +1,16 @@ -use std::sync::mpsc::Sender; +use tokio::{sync::mpsc::Sender, time::Instant}; #[derive(Debug, Clone)] pub struct ServerInfo { pub onion_address: String, - pub rate: f64, - pub uptime: u64, + pub cooldown: Instant, + pub stale: bool } pub enum DbRequest { Add(String, ServerInfo), Query(String, Sender>), - Update(String, Box), + Update(String, ServerInfo), + QueryAll(Sender>), + QueryActive(Sender>) } From c291ba68c0da9c22590da42710e9ab40f12d55a6 Mon Sep 17 00:00:00 2001 From: Wukong Date: Tue, 20 May 2025 11:51:59 +0530 Subject: [PATCH 02/12] add bidirectional messages and tor support --- Cargo.lock | 74 +++++++++++++++ Cargo.toml | 2 + src/db/db_manager.rs | 21 +++-- src/error.rs | 12 ++- src/indexer/rpc.rs | 3 +- src/indexer/tracker_indexer.rs | 14 ++- src/main.rs | 152 +++++++++++++++++++++---------- src/server/mod.rs | 2 +- src/server/tracker_monitor.rs | 58 ++++++------ src/server/tracker_server.rs | 144 ++++++++++------------------- src/status.rs | 16 ++-- src/tor.rs | 159 +++++++++++++++++++++++++++++++++ src/types.rs | 61 ++++++++++++- src/utils.rs | 34 +++++++ 14 files changed, 555 insertions(+), 197 deletions(-) create mode 100644 src/tor.rs create mode 100644 src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index e179a82..959d9dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "futures-core" version = "0.3.31" @@ -288,6 +294,24 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "generator" version = "0.8.4" @@ -318,6 +342,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "half" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" + [[package]] name = "heck" version = "0.5.0" @@ -695,6 +725,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_cbor" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.219" @@ -784,6 +824,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.8" @@ -836,6 +896,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-socks" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d4770b8024672c1101b3f6733eab95b18007dbe0847a8afe341fcf79e06043f" +dependencies = [ + "either", + "futures-util", + "thiserror", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -918,8 +990,10 @@ dependencies = [ "clap", "hex", "serde", + "serde_cbor", "tokio", "tokio-graceful", + "tokio-socks", "tokio-util", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index b8ddfe1..4b44e6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,10 @@ bitcoincore-rpc = "0.19.0" clap = { version = "4.5.37", features = ["derive"] } hex = "0.4.3" serde = { version = "1.0.219", features = ["derive"] } +serde_cbor = "0.11.2" tokio = { version = "1.45.0", features = ["full"] } tokio-graceful = "0.2.2" +tokio-socks = "0.5.2" tokio-util = "0.7.15" tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index 0e2450c..55ab9d7 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -20,7 +20,7 @@ pub async fn run(mut rx: Receiver, status_tx: status::Sender) { DbRequest::Query(addr, resp_tx) => { info!("Query request intecepted"); let result = servers.get(&addr).cloned(); - let _ = resp_tx.send(result); + let _ = resp_tx.send(result).await; } DbRequest::Update(addr, server_info) => { info!("Update request intercepted"); @@ -28,20 +28,25 @@ pub async fn run(mut rx: Receiver, status_tx: status::Sender) { } DbRequest::QueryAll(resp_tx) => { info!("Query all request intercepted"); - let response: Vec<(String, ServerInfo)> = servers.iter() - .map(|e| (e.0.clone(), e.1.clone())) - .collect(); + let response: Vec<(String, ServerInfo)> = + servers.iter().map(|e| (e.0.clone(), e.1.clone())).collect(); let _ = resp_tx.send(response).await; } DbRequest::QueryActive(resp_tx) => { info!("Query active intercepted"); - let response: Vec<(String, ServerInfo)> = servers.iter().filter(|x| !x.1.stale).map(|e|(e.0.clone(), e.1.clone())).collect(); + let response: Vec = servers + .iter() + .filter(|x| !x.1.stale) + .map(|e| e.0.clone()) + .collect(); let _ = resp_tx.send(response).await; } } } - let _ = status_tx.send(Status { - state: status::State::DBShutdown(TrackerError::DbManagerExited), - }).await; + let _ = status_tx + .send(Status { + state: status::State::DBShutdown(TrackerError::DbManagerExited), + }) + .await; } diff --git a/src/error.rs b/src/error.rs index ccbe22b..2b0acc1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,5 @@ -use std::error::Error; use crate::DbRequest; +use std::error::Error; #[derive(Debug)] pub enum TrackerError { @@ -11,6 +11,8 @@ pub enum TrackerError { SendError, IOError(std::io::Error), RPCError(bitcoincore_rpc::Error), + SerdeCbor(serde_cbor::Error), + General(String), } impl From for TrackerError { @@ -37,8 +39,14 @@ impl std::fmt::Display for TrackerError { } } -impl From> for TrackerError { +impl From> for TrackerError { fn from(_: tokio::sync::mpsc::error::SendError) -> Self { Self::SendError } } + +impl From for TrackerError { + fn from(value: serde_cbor::Error) -> Self { + Self::SerdeCbor(value) + } +} diff --git a/src/indexer/rpc.rs b/src/indexer/rpc.rs index 1aa141a..6267439 100644 --- a/src/indexer/rpc.rs +++ b/src/indexer/rpc.rs @@ -43,9 +43,8 @@ impl BitcoinRpc { } } - impl From for BitcoinRpc { fn from(value: Client) -> Self { BitcoinRpc { client: value } } -} \ No newline at end of file +} diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index 3a155bd..32c4b04 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -1,18 +1,15 @@ use tokio::{sync::mpsc::Sender, time::Instant}; -use bitcoincore_rpc::{bitcoin::absolute::{Height, LockTime}}; +use bitcoincore_rpc::bitcoin::absolute::{Height, LockTime}; use tracing::info; use super::rpc::BitcoinRpc; use crate::{ - handle_result, status, types::{DbRequest, ServerInfo} + handle_result, status, + types::{DbRequest, ServerInfo}, }; -pub async fn run( - db_tx: Sender, - status_tx: status::Sender, - client: BitcoinRpc -) { +pub async fn run(db_tx: Sender, status_tx: status::Sender, client: BitcoinRpc) { info!("Indexer started"); let mut last_tip = 0; loop { @@ -40,13 +37,12 @@ pub async fn run( let server_info = ServerInfo { onion_address: onion_address.clone(), cooldown: Instant::now(), - stale: false + stale: false, }; info!("New address found: {:?}", onion_address); let db_request = DbRequest::Add(onion_address, server_info); handle_result!(status_tx, db_tx.send(db_request).await); - } } } diff --git a/src/main.rs b/src/main.rs index 718f3fb..c6803ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,26 @@ #![allow(dead_code)] +use std::path::Path; + +use bitcoincore_rpc::Auth; +use bitcoincore_rpc::Client; +use clap::Parser; use error::TrackerError; use status::{State, Status}; -use tokio::{ - sync::mpsc::{self, Receiver, Sender}, - -}; -use bitcoincore_rpc::Client; -use bitcoincore_rpc::Auth; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tor::check_tor_status; +use tor::get_tor_hostname; +use tracing::error; use tracing::{info, warn}; use types::DbRequest; -use clap::Parser; mod db; mod error; mod handle_error; mod indexer; mod server; mod status; +mod tor; mod types; - +mod utils; #[derive(Parser)] #[clap(version = option_env ! ("CARGO_PKG_VERSION").unwrap_or("unknown"), @@ -42,15 +45,27 @@ struct App { name = "Server ADDRESS:PORT", short = 's', long, - default_value = "127.0.0.1:8080", + default_value = "127.0.0.1:8080" )] - pub address: String + pub address: String, + + #[clap(name = "control port PORT", short = 'c', long, default_value = "9051")] + pub control_port: u16, + + #[clap(name = "tor_auth_password", long, default_value = "")] + pub tor_auth_password: String, + + #[clap(name = "socks port PORT", long, default_value = "9050")] + pub socks_port: u16, + + #[clap(name = "datadir", long, default_value = ".tracker")] + pub datadir: String, } fn parse_proxy_auth(s: &str) -> Result<(String, String), TrackerError> { let parts: Vec<_> = s.split(':').collect(); if parts.len() != 2 { - return Err(TrackerError::ParsingError) + return Err(TrackerError::ParsingError); } let user = parts[0].to_string(); @@ -59,7 +74,6 @@ fn parse_proxy_auth(s: &str) -> Result<(String, String), TrackerError> { Ok((user, passwd)) } - #[derive(Debug, Clone)] pub struct RPCConfig { pub url: String, @@ -70,7 +84,7 @@ const RPC_HOSTPORT: &str = "localhost:18443"; impl RPCConfig { fn new(url: String, auth: Auth) -> Self { - RPCConfig { url, auth} + RPCConfig { url, auth } } } @@ -85,54 +99,87 @@ impl Default for RPCConfig { impl From for Client { fn from(value: RPCConfig) -> Self { - let rpc = Client::new(&value.url, value.auth.clone()).unwrap(); - rpc + Client::new(&value.url, value.auth.clone()).unwrap() } } - #[tokio::main] async fn main() { - tracing_subscriber::fmt::init(); let args = App::parse(); - let rpc_config = RPCConfig::new(args.rpc, Auth::UserPass(args.auth.0, args.auth.1)); + let rpc_config = RPCConfig::new(args.rpc, Auth::UserPass(args.auth.0, args.auth.1)); + + check_tor_status(args.control_port, &args.tor_auth_password) + .await + .expect("Failed to check Tor status"); + + let hostname = match args.address.split_once(':') { + Some((_, port)) => { + let port = port.parse::().expect("Invalid port in address"); + get_tor_hostname( + Path::new(&args.datadir), + args.control_port, + port, + &args.tor_auth_password, + ) + .await + .expect("Failed to retrieve Tor hostname") + } + None => { + error!("Invalid address format. Expected format: :"); + return; + } + }; + info!("Tracker is listening at {}", hostname); let (mut db_tx, db_rx) = mpsc::channel::(10); let (status_tx, mut status_rx) = mpsc::channel::(10); - let server_address = args.address; + + let server_address = args.address.clone(); spawn_db_manager(db_rx, status_tx.clone()).await; spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), rpc_config.clone().into()).await; - spawn_server(db_tx.clone(), status_tx.clone(), server_address.clone()).await; + spawn_server( + db_tx.clone(), + status_tx.clone(), + server_address.clone(), + args.socks_port, + ) + .await; info!("Tracker started"); while let Some(status) = status_rx.recv().await { match status.state { - State::DBShutdown(e) => { - warn!("DB Manager exited. Restarting..., {:?}", e); - let (db_tx_s, db_rx) = mpsc::channel::(10); - db_tx = db_tx_s; - spawn_db_manager(db_rx, status_tx.clone()).await; - } - State::Healthy(e) => { - info!("All looks good: {:?}", e); - } - State::MempoolShutdown(e) => { + State::DBShutdown(err) => { warn!( - "Mempool Indexer encountered an error. Restarting...: {:?}", - e + "DB Manager exited unexpectedly. Restarting... Error: {:?}", + err ); - let client: Client = rpc_config.clone().into(); - spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), client.into()).await; + let (new_db_tx, new_db_rx) = mpsc::channel::(10); + db_tx = new_db_tx; + spawn_db_manager(new_db_rx, status_tx.clone()).await; + } + State::Healthy(info) => { + info!("System healthy: {:?}", info); + } + State::MempoolShutdown(err) => { + warn!("Mempool Indexer crashed. Restarting... Error: {:?}", err); + let client: Client = rpc_config.clone().into(); + spawn_mempool_indexer(db_tx.clone(), status_tx.clone(), client).await; } - State::ServerShutdown(e) => { - warn!("Server encountered an error. Restarting...: {:?}", e); - spawn_server(db_tx.clone(), status_tx.clone(), server_address.clone()).await; + State::ServerShutdown(err) => { + warn!("Server crashed. Restarting... Error: {:?}", err); + spawn_server( + db_tx.clone(), + status_tx.clone(), + server_address.clone(), + args.socks_port, + ) + .await; } } } @@ -143,17 +190,30 @@ async fn spawn_db_manager(db_tx: Receiver, status_tx: Sender) tokio::spawn(db::run(db_tx, status::Sender::DBManager(status_tx))); } -async fn spawn_mempool_indexer(db_tx: Sender, status_tx: Sender, client: Client) { +async fn spawn_mempool_indexer( + db_tx: Sender, + status_tx: Sender, + client: Client, +) { info!("Spawning indexer"); - tokio::spawn( - indexer::run( - db_tx, - status::Sender::Mempool(status_tx), - client.into() - )); + tokio::spawn(indexer::run( + db_tx, + status::Sender::Mempool(status_tx), + client.into(), + )); } -async fn spawn_server(db_tx: Sender, status_tx: Sender, address: String) { +async fn spawn_server( + db_tx: Sender, + status_tx: Sender, + address: String, + socks_port: u16, +) { info!("Spawning server instance"); - tokio::spawn(server::run(db_tx, status::Sender::Server(status_tx), address)); + tokio::spawn(server::run( + db_tx, + status::Sender::Server(status_tx), + address, + socks_port, + )); } diff --git a/src/server/mod.rs b/src/server/mod.rs index f7ee069..27d3094 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,3 +1,3 @@ -mod tracker_server; mod tracker_monitor; +mod tracker_server; pub use tracker_server::run; diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs index 70173e2..1868d9f 100644 --- a/src/server/tracker_monitor.rs +++ b/src/server/tracker_monitor.rs @@ -1,25 +1,28 @@ -use std::error::Error; use std::time::Duration; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::TcpStream, sync::mpsc::Sender, - time::{sleep, Instant}, + time::{Instant, sleep}, }; -use tracing::{ info, error }; +use tokio_socks::tcp::Socks5Stream; +use tracing::info; -use crate::{handle_result, status, types::{DbRequest, ServerInfo}}; +use crate::{ + error::TrackerError, + handle_result, status, + types::{DbRequest, DnsRequest, DnsResponse, ServerInfo}, + utils::{read_message, send_message}, +}; const COOLDOWN_PERIOD: u64 = 5 * 60; pub async fn monitor_systems( db_tx: Sender, status_tx: status::Sender, -) -> Result<(), Box> { + socks_port: u16, +) -> Result<(), TrackerError> { info!("Starting to monitor other maker services"); loop { - sleep(Duration::from_secs(1000)).await; let (response_tx, mut response_rx) = tokio::sync::mpsc::channel(1); @@ -34,28 +37,29 @@ pub async fn monitor_systems( continue; } - match TcpStream::connect(&address).await { - Ok(stream) => { - let (reader, writer) = stream.into_split(); - let mut buf_reader = BufReader::new(reader); - let mut buf_writer = BufWriter::new(writer); - - if let Err(e) = buf_writer.write_all(b"send").await { - error!("Error: {:?}", e); - continue; - } + match Socks5Stream::connect( + format!("127.0.0.1:{:?}", socks_port).as_str(), + address.clone(), + ) + .await + { + Ok(mut stream) => { + let message = DnsResponse::Ping; + _ = send_message(&mut stream, &message).await; - let mut response = String::new(); - let n = handle_result!(status_tx, buf_reader.read_to_string(&mut response).await); - info!("Number of bytes read: {:?}", n); + let buffer = handle_result!(status_tx, read_message(&mut stream).await); + let response: DnsRequest = + handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); - let updated_info = ServerInfo { - onion_address: response, - cooldown: Instant::now(), - stale: false, - }; + if let DnsRequest::Pong { address } = response { + let updated_info = ServerInfo { + onion_address: address.clone(), + cooldown: Instant::now(), + stale: false, + }; - let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + } } Err(_) => { if !server_info.stale { diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 7154950..4da8143 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -1,21 +1,30 @@ -use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; -use tokio::sync::mpsc; -use tokio::sync::mpsc::Sender; -use tracing::{info, error}; -use tokio::io::BufReader; -use tokio::io::BufWriter; -use crate::error::TrackerError; +use crate::handle_result; use crate::server::tracker_monitor::monitor_systems; -use tokio::io::AsyncBufReadExt; use crate::status; use crate::types::DbRequest; - -pub async fn run(db_tx: Sender, status_tx: status::Sender, address: String) -> Result<(), Box>{ - +use crate::types::DnsRequest; +use crate::types::DnsResponse; +use crate::utils::read_message; +use crate::utils::send_message; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tracing::info; + +pub async fn run( + db_tx: Sender, + status_tx: status::Sender, + address: String, + socks_port: u16, +) -> Result<(), Box> { let server = TcpListener::bind(&address).await?; - tokio::spawn(monitor_systems(db_tx.clone(), status_tx.clone())); + tokio::spawn(monitor_systems( + db_tx.clone(), + status_tx.clone(), + socks_port, + )); info!("Tracker server listening on {}", address); @@ -23,91 +32,36 @@ pub async fn run(db_tx: Sender, status_tx: status::Sender, address: S info!("Accepted connection from {}", client_addr); let status_tx_clone = status_tx.clone(); let db_tx_clone = db_tx.clone(); - tokio::spawn(async move { - let (reader_part, writer_part) = stream.into_split(); - let mut reader = BufReader::new(reader_part); - let mut writer = BufWriter::new(writer_part); - let mut line = String::new(); - loop { - line.clear(); - let bytes_read = match reader.read_line(&mut line).await { - Ok(n) => n, - Err(e) => { - error!("Error reading from {}: {}", client_addr, e); - let _ = status_tx_clone.send(status::Status { - state: status::State::ServerShutdown(TrackerError::IOError(e)) - }).await; - return; - } - }; - - info!("Bytes read: {:?}", bytes_read); - - let request_message = line.trim(); - - if request_message.is_empty() { - continue; - } - - info!("Received message from {}: {:?}", client_addr, request_message); - - let response_message: String; - - if request_message.starts_with("QUERY ") { - - let (resp_tx, mut resp_rx) = mpsc::channel(1); - let db_request = DbRequest::QueryActive(resp_tx); - let send_res = db_tx_clone.send(db_request).await; + tokio::spawn(async move { handle_client(stream, status_tx_clone, db_tx_clone).await }); + } - if let Err(e) = send_res { - error!("Error sending DB query for {}: {}", client_addr, e); - let _ = status_tx_clone.send(status::Status { - state: status::State::ServerShutdown(TrackerError::DbManagerExited) // Example error - }).await; - response_message = "ERROR: DB query failed".to_string(); - } else { - match resp_rx.recv().await { - Some(server_info) => { - response_message = format!("OK: {:?}", server_info); - } - None => { - response_message = "NOT_FOUND".to_string(); - } - } - } - } else if request_message.starts_with("ADD ") { - response_message = "ERROR: Add command not implemented via server".to_string(); - } - else { - response_message = format!("ERROR: Unknown command '{}'", request_message); // Custom protocol error - } + Ok(()) +} - let write_res = writer.write_all(response_message.as_bytes()).await; - if let Err(e) = write_res { - error!("Error writing to {}: {}", client_addr, e); - let _ = status_tx_clone.send(status::Status { - state: status::State::ServerShutdown(TrackerError::IOError(e)) - }).await; - return; - } - if let Err(e) = writer.write_all(b"\n").await { - error!("Error writing newline to {}: {}", client_addr, e); - let _ = status_tx_clone.send(status::Status { - state: status::State::ServerShutdown(TrackerError::IOError(e)) - }).await; - } - if let Err(e) = writer.flush().await { - error!("Error flushing writer for {}: {}", client_addr, e); - let _ = status_tx_clone.send(status::Status { - state: status::State::ServerShutdown(TrackerError::IOError(e)) - }).await; - return; +async fn handle_client(mut stream: TcpStream, status_tx: status::Sender, db_tx: Sender) { + loop { + let buffer = handle_result!(status_tx, read_message(&mut stream).await); + let request: DnsRequest = + handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); + + match request { + DnsRequest::Get => { + info!("Received Get request taker"); + let (resp_tx, mut resp_rx) = mpsc::channel(1); + let db_request = DbRequest::QueryActive(resp_tx); + handle_result!(status_tx, db_tx.send(db_request).await); + let response = resp_rx.recv().await; + if let Some(addresses) = response { + let message = DnsResponse::Address { addresses }; + _ = send_message(&mut stream, &message).await; } - println!("Sent response to {}", client_addr); } - }); + DnsRequest::Post { metadata: _ } => { + todo!() + } + DnsRequest::Pong { address: _ } => { + todo!() + } + } } - - Ok(()) - } diff --git a/src/status.rs b/src/status.rs index 60ab428..12d0f17 100644 --- a/src/status.rs +++ b/src/status.rs @@ -48,26 +48,30 @@ async fn send_status(sender: &Sender, e: TrackerError, outcome: ErrorBranch) -> tx.send(Status { state: State::MempoolShutdown(e), }) - .await.unwrap_or(()); + .await + .unwrap_or(()); } _ => { tx.send(Status { state: State::Healthy("error occured in mempool".to_string()), }) - .await.unwrap_or(()); + .await + .unwrap_or(()); } }, Sender::Server(tx) => { tx.send(Status { state: State::ServerShutdown(e), }) - .await.unwrap_or(()); + .await + .unwrap_or(()); } Sender::DBManager(tx) => { tx.send(Status { state: State::DBShutdown(e), }) - .await.unwrap_or(()); + .await + .unwrap_or(()); } } outcome @@ -82,6 +86,8 @@ pub async fn handle_error(sender: &Sender, e: TrackerError) -> ErrorBranch { TrackerError::IOError(_) => send_status(sender, e, ErrorBranch::Break).await, TrackerError::RPCError(_) => send_status(sender, e, ErrorBranch::Break).await, TrackerError::ParsingError => send_status(sender, e, ErrorBranch::Continue).await, - TrackerError::SendError => send_status(sender, e, ErrorBranch::Break).await + TrackerError::SendError => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::SerdeCbor(_) => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::General(_) => send_status(sender, e, ErrorBranch::Break).await, } } diff --git a/src/tor.rs b/src/tor.rs new file mode 100644 index 0000000..cc2ba6a --- /dev/null +++ b/src/tor.rs @@ -0,0 +1,159 @@ +use std::path::Path; + +use tokio::{ + fs, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::TcpStream, +}; +use tracing::{error, info, warn}; + +use crate::error::TrackerError; + +pub(crate) async fn check_tor_status( + control_port: u16, + password: &str, +) -> Result<(), TrackerError> { + let (reader, mut writer) = TcpStream::connect(format!("127.0.0.1:{}", control_port)) + .await? + .into_split(); + let mut reader = BufReader::new(reader); + let auth_command = format!("AUTHENTICATE \"{}\"\r\n", password); + writer.write_all(auth_command.as_bytes()).await?; + let mut response = String::new(); + reader.read_line(&mut response).await?; + if !response.starts_with("250") { + error!( + "Tor authentication failed: {}, please provide correct password", + response + ); + return Err(TrackerError::General( + "Tor authentication failed".to_string(), + )); + } + writer + .write_all(b"GETINFO status/bootstrap-phase\r\n") + .await?; + response.clear(); + reader.read_line(&mut response).await?; + + if response.contains("PROGRESS=100") { + info!("Tor is fully started and operational!"); + } else { + warn!("Tor is still starting, try again later: {}", response); + } + Ok(()) +} + +pub(crate) async fn get_emphemeral_address( + control_port: u16, + target_port: u16, + password: &str, + private_key_data: Option<&str>, + service_id_data: Option<&str>, +) -> Result<(String, String), TrackerError> { + let (reader, mut writer) = TcpStream::connect(format!("127.0.0.1:{}", control_port)) + .await? + .into_split(); + let mut reader = BufReader::new(reader); + let mut response = String::new(); + let mut service_id = String::new(); + let mut private_key = String::new(); + let auth_command = format!("AUTHENTICATE \"{}\"\r\n", password); + writer.write_all(auth_command.as_bytes()).await?; + if let Some(service_id) = service_id_data { + let remove_command = format!("DEL_ONION {}\r\n", service_id); + writer.write_all(remove_command.as_bytes()).await?; + } + let mut add_onion_command = format!( + "ADD_ONION NEW:BEST Flags=Detach Port={},127.0.0.1:{}\r\n", + target_port, target_port + ); + if let Some(pk) = private_key_data { + add_onion_command = format!( + "ADD_ONION {} Flags=Detach Port={},127.0.0.1:{}\r\n", + pk, target_port, target_port + ); + private_key = pk.to_string(); + } + writer.write_all(add_onion_command.as_bytes()).await?; + + while reader.read_line(&mut response).await? > 0 { + if response.starts_with("250-ServiceID=") { + service_id = response + .trim_start_matches("250-ServiceID=") + .trim() + .to_string(); + if private_key_data.is_some() { + break; + } + } else if response.starts_with("250-PrivateKey=") { + private_key = response + .trim_start_matches("250-PrivateKey=") + .trim() + .to_string(); + break; + } + response.clear(); + } + + if service_id.is_empty() || private_key.is_empty() { + return Err(TrackerError::General( + "Failed to retrieve ephemeral onion service details".to_string(), + )); + } + Ok((format!("{}.onion", service_id), private_key)) +} + +pub(crate) async fn get_tor_hostname( + data_dir: &Path, + control_port: u16, + target_port: u16, + password: &str, +) -> Result { + let tor_config_path = data_dir.join("tor/hostname"); + + if tor_config_path.exists() { + if let Ok(tor_metadata) = fs::read(&tor_config_path).await { + let data: [&str; 2] = serde_cbor::de::from_slice(&tor_metadata)?; + + let hostname_data = data[1]; + let private_key_data = data[0]; + + let (hostname, private_key) = get_emphemeral_address( + control_port, + target_port, + password, + Some(private_key_data), + Some(hostname_data.replace(".onion", "").as_str()), + ) + .await?; + + assert_eq!(hostname, hostname_data); + assert_eq!(private_key, private_key_data); + + info!( + "Generated existing Tor Hidden Service Hostname: {}", + hostname + ); + + return Ok(hostname); + } + } + + let (hostname, private_key) = + get_emphemeral_address(control_port, target_port, password, None, None).await?; + + if let Some(parent) = tor_config_path.parent() { + fs::create_dir_all(parent).await?; + } + + fs::write( + &tor_config_path, + serde_cbor::ser::to_vec(&[private_key, hostname.clone()])?, + ) + .await?; + + info!("Generated new Tor Hidden Service Hostname: {}", hostname); + + Ok(hostname) +} diff --git a/src/types.rs b/src/types.rs index 6eda768..cd858e3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,10 +1,15 @@ +use bitcoincore_rpc::bitcoin::{ + Amount, OutPoint, PublicKey, absolute::LockTime, hashes::hash160::Hash, + secp256k1::ecdsa::Signature, +}; +use serde::{Deserialize, Serialize}; use tokio::{sync::mpsc::Sender, time::Instant}; #[derive(Debug, Clone)] pub struct ServerInfo { pub onion_address: String, pub cooldown: Instant, - pub stale: bool + pub stale: bool, } pub enum DbRequest { @@ -12,5 +17,57 @@ pub enum DbRequest { Query(String, Sender>), Update(String, ServerInfo), QueryAll(Sender>), - QueryActive(Sender>) + QueryActive(Sender>), +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Hash)] +pub struct FidelityBond { + pub(crate) outpoint: OutPoint, + /// Fidelity Amount + pub amount: Amount, + /// Fidelity Locktime + pub lock_time: LockTime, + pub(crate) pubkey: PublicKey, + // Height at which the bond was confirmed. + pub(crate) conf_height: Option, + // Cert expiry denoted in multiple of difficulty adjustment period (2016 blocks) + pub(crate) cert_expiry: Option, +} + +/// Contains proof data related to fidelity bond. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FidelityProof { + pub(crate) bond: FidelityBond, + pub(crate) cert_hash: Hash, + pub(crate) cert_sig: Signature, +} + +/// Metadata shared by the maker with the Directory Server for verifying authenticity. +#[derive(Serialize, Deserialize, Debug)] +#[allow(private_interfaces)] +pub struct DnsMetadata { + /// The maker's URL. + pub url: String, + /// Proof of the maker's fidelity bond funding. + pub proof: FidelityProof, +} + +#[derive(Serialize, Deserialize, Debug)] +#[allow(clippy::large_enum_variant)] +pub enum DnsRequest { + /// A request sent by the maker to register itself with the DNS server and authenticate. + Post { + /// Metadata containing the maker's URL and fidelity proof. + metadata: DnsMetadata, + }, + /// A request sent by the taker to fetch all valid maker addresses from the DNS server. + Get, + /// To gauge server activity + Pong { address: String }, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum DnsResponse { + Address { addresses: Vec }, + Ping, } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..d445eaf --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,34 @@ +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::TcpStream, +}; + +use crate::error::TrackerError; + +pub async fn read_message(reader: &mut TcpStream) -> Result, TrackerError> { + let mut reader = BufReader::new(reader); + // length of incoming data + let mut len_buff = [0u8; 4]; + reader.read_exact(&mut len_buff).await?; + let length = u32::from_be_bytes(len_buff); + let mut buffer = vec![0; length as usize]; + + _ = reader.read(&mut buffer[4..]).await?; + + Ok(buffer) +} + +pub async fn send_message( + socket_writer: &mut TcpStream, + message: &impl serde::Serialize, +) -> Result<(), TrackerError> { + let mut writer = BufWriter::new(socket_writer); + let msg_bytes = serde_cbor::ser::to_vec(message).unwrap(); + let msg_len = (msg_bytes.len() as u32).to_be_bytes(); + let mut to_send = Vec::with_capacity(msg_bytes.len() + msg_len.len()); + to_send.extend(msg_len); + to_send.extend(msg_bytes); + _ = writer.write_all(&to_send).await; + writer.flush().await?; + Ok(()) +} From 214ca265e5121de5e41ee493228fc246c4e10676 Mon Sep 17 00:00:00 2001 From: Wukong Date: Tue, 17 Jun 2025 11:49:32 +0530 Subject: [PATCH 03/12] improve error handling, remove buffer inaccuracy and add retry logic --- src/error.rs | 2 +- src/indexer/tracker_indexer.rs | 14 +++--- src/server/tracker_monitor.rs | 87 +++++++++++++++++++++------------- src/server/tracker_server.rs | 11 ++++- src/tor.rs | 70 +++++++++++++-------------- src/utils.rs | 12 ++--- 6 files changed, 110 insertions(+), 86 deletions(-) diff --git a/src/error.rs b/src/error.rs index 2b0acc1..9d049f9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,7 +35,7 @@ impl Error for TrackerError { impl std::fmt::Display for TrackerError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) + write!(f, "{self:?}") } } diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index 32c4b04..df38a52 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use tokio::{sync::mpsc::Sender, time::Instant}; use bitcoincore_rpc::bitcoin::absolute::{Height, LockTime}; @@ -13,6 +15,7 @@ pub async fn run(db_tx: Sender, status_tx: status::Sender, client: Bi info!("Indexer started"); let mut last_tip = 0; loop { + tokio::time::sleep(Duration::from_secs(10)).await; let blockchain_info = handle_result!(status_tx, client.get_blockchain_info()); let tip_height = blockchain_info.blocks; for height in last_tip..tip_height { @@ -25,13 +28,10 @@ pub async fn run(db_tx: Sender, status_tx: status::Sender, client: Bi if tx.output.len() != 2 { continue; } - let onion_address = tx - .output - .iter() - .filter_map(|txout| { - extract_onion_address_from_script(txout.script_pubkey.as_bytes()) - }) - .next(); + + let onion_address = tx.output.iter().find_map(|txout| { + extract_onion_address_from_script(txout.script_pubkey.as_bytes()) + }); if let Some(onion_address) = onion_address { let server_info = ServerInfo { diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs index 1868d9f..6d63143 100644 --- a/src/server/tracker_monitor.rs +++ b/src/server/tracker_monitor.rs @@ -1,11 +1,12 @@ use std::time::Duration; use tokio::{ + io::BufWriter, sync::mpsc::Sender, time::{Instant, sleep}, }; use tokio_socks::tcp::Socks5Stream; -use tracing::info; +use tracing::{info, warn}; use crate::{ error::TrackerError, @@ -14,14 +15,16 @@ use crate::{ utils::{read_message, send_message}, }; -const COOLDOWN_PERIOD: u64 = 5 * 60; +use tokio::io::BufReader; +const COOLDOWN_PERIOD: u64 = 5 * 60; pub async fn monitor_systems( db_tx: Sender, status_tx: status::Sender, socks_port: u16, ) -> Result<(), TrackerError> { info!("Starting to monitor other maker services"); + loop { sleep(Duration::from_secs(1000)).await; @@ -37,40 +40,60 @@ pub async fn monitor_systems( continue; } - match Socks5Stream::connect( - format!("127.0.0.1:{:?}", socks_port).as_str(), - address.clone(), - ) - .await - { - Ok(mut stream) => { - let message = DnsResponse::Ping; - _ = send_message(&mut stream, &message).await; - - let buffer = handle_result!(status_tx, read_message(&mut stream).await); - let response: DnsRequest = - handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); - - if let DnsRequest::Pong { address } = response { - let updated_info = ServerInfo { - onion_address: address.clone(), - cooldown: Instant::now(), - stale: false, - }; - - let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + let mut success = false; + for attempt in 1..=3 { + let connect_result = Socks5Stream::connect( + format!("127.0.0.1:{socks_port:?}").as_str(), + address.clone(), + ) + .await; + + match connect_result { + Ok(mut stream) => { + success = true; + + let (read_half, write_half) = stream.split(); + + let mut reader = BufReader::new(read_half); + + let mut writer = BufWriter::new(write_half); + + let message = DnsResponse::Ping; + _ = send_message(&mut writer, &message).await; + + let buffer = handle_result!(status_tx, read_message(&mut reader).await); + let response: DnsRequest = + handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); + + if let DnsRequest::Pong { address } = response { + let updated_info = ServerInfo { + onion_address: address.clone(), + cooldown: Instant::now(), + stale: false, + }; + let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + } + + break; } - } - Err(_) => { - if !server_info.stale { - let updated_info = ServerInfo { - stale: true, - ..server_info - }; - let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + + Err(e) => { + warn!( + "Failed to connect to {} (attempt {}/3): {}", + address, attempt, e + ); + sleep(Duration::from_secs(1)).await; } } } + + if !success && !server_info.stale { + let updated_info = ServerInfo { + stale: true, + ..server_info + }; + let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; + } } } } diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 4da8143..785e400 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -6,6 +6,8 @@ use crate::types::DnsRequest; use crate::types::DnsResponse; use crate::utils::read_message; use crate::utils::send_message; +use tokio::io::BufReader; +use tokio::io::BufWriter; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::mpsc; @@ -39,8 +41,13 @@ pub async fn run( } async fn handle_client(mut stream: TcpStream, status_tx: status::Sender, db_tx: Sender) { + let (read_half, write_half) = stream.split(); + + let mut reader = BufReader::new(read_half); + let mut writer = BufWriter::new(write_half); + loop { - let buffer = handle_result!(status_tx, read_message(&mut stream).await); + let buffer = handle_result!(status_tx, read_message(&mut reader).await); let request: DnsRequest = handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); @@ -53,7 +60,7 @@ async fn handle_client(mut stream: TcpStream, status_tx: status::Sender, db_tx: let response = resp_rx.recv().await; if let Some(addresses) = response { let message = DnsResponse::Address { addresses }; - _ = send_message(&mut stream, &message).await; + _ = send_message(&mut writer, &message).await; } } DnsRequest::Post { metadata: _ } => { diff --git a/src/tor.rs b/src/tor.rs index cc2ba6a..f9ce394 100644 --- a/src/tor.rs +++ b/src/tor.rs @@ -13,11 +13,11 @@ pub(crate) async fn check_tor_status( control_port: u16, password: &str, ) -> Result<(), TrackerError> { - let (reader, mut writer) = TcpStream::connect(format!("127.0.0.1:{}", control_port)) + let (reader, mut writer) = TcpStream::connect(format!("127.0.0.1:{control_port}")) .await? .into_split(); let mut reader = BufReader::new(reader); - let auth_command = format!("AUTHENTICATE \"{}\"\r\n", password); + let auth_command = format!("AUTHENTICATE \"{password}\"\r\n"); writer.write_all(auth_command.as_bytes()).await?; let mut response = String::new(); reader.read_line(&mut response).await?; @@ -51,28 +51,24 @@ pub(crate) async fn get_emphemeral_address( private_key_data: Option<&str>, service_id_data: Option<&str>, ) -> Result<(String, String), TrackerError> { - let (reader, mut writer) = TcpStream::connect(format!("127.0.0.1:{}", control_port)) + let (reader, mut writer) = TcpStream::connect(format!("127.0.0.1:{control_port}")) .await? .into_split(); let mut reader = BufReader::new(reader); let mut response = String::new(); let mut service_id = String::new(); let mut private_key = String::new(); - let auth_command = format!("AUTHENTICATE \"{}\"\r\n", password); + let auth_command = format!("AUTHENTICATE \"{password}\"\r\n"); writer.write_all(auth_command.as_bytes()).await?; if let Some(service_id) = service_id_data { - let remove_command = format!("DEL_ONION {}\r\n", service_id); + let remove_command = format!("DEL_ONION {service_id}\r\n"); writer.write_all(remove_command.as_bytes()).await?; } - let mut add_onion_command = format!( - "ADD_ONION NEW:BEST Flags=Detach Port={},127.0.0.1:{}\r\n", - target_port, target_port - ); + let mut add_onion_command = + format!("ADD_ONION NEW:BEST Flags=Detach Port={target_port},127.0.0.1:{target_port}\r\n"); if let Some(pk) = private_key_data { - add_onion_command = format!( - "ADD_ONION {} Flags=Detach Port={},127.0.0.1:{}\r\n", - pk, target_port, target_port - ); + add_onion_command = + format!("ADD_ONION {pk} Flags=Detach Port={target_port},127.0.0.1:{target_port}\r\n"); private_key = pk.to_string(); } writer.write_all(add_onion_command.as_bytes()).await?; @@ -101,7 +97,7 @@ pub(crate) async fn get_emphemeral_address( "Failed to retrieve ephemeral onion service details".to_string(), )); } - Ok((format!("{}.onion", service_id), private_key)) + Ok((format!("{service_id}.onion"), private_key)) } pub(crate) async fn get_tor_hostname( @@ -112,32 +108,32 @@ pub(crate) async fn get_tor_hostname( ) -> Result { let tor_config_path = data_dir.join("tor/hostname"); - if tor_config_path.exists() { - if let Ok(tor_metadata) = fs::read(&tor_config_path).await { - let data: [&str; 2] = serde_cbor::de::from_slice(&tor_metadata)?; - - let hostname_data = data[1]; - let private_key_data = data[0]; - - let (hostname, private_key) = get_emphemeral_address( - control_port, - target_port, - password, - Some(private_key_data), - Some(hostname_data.replace(".onion", "").as_str()), - ) - .await?; + if tor_config_path.exists() + && let Ok(tor_metadata) = fs::read(&tor_config_path).await + { + let data: [&str; 2] = serde_cbor::de::from_slice(&tor_metadata)?; + + let hostname_data = data[1]; + let private_key_data = data[0]; + + let (hostname, private_key) = get_emphemeral_address( + control_port, + target_port, + password, + Some(private_key_data), + Some(hostname_data.replace(".onion", "").as_str()), + ) + .await?; - assert_eq!(hostname, hostname_data); - assert_eq!(private_key, private_key_data); + assert_eq!(hostname, hostname_data); + assert_eq!(private_key, private_key_data); - info!( - "Generated existing Tor Hidden Service Hostname: {}", - hostname - ); + info!( + "Generated existing Tor Hidden Service Hostname: {}", + hostname + ); - return Ok(hostname); - } + return Ok(hostname); } let (hostname, private_key) = diff --git a/src/utils.rs b/src/utils.rs index d445eaf..d778d75 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,12 +1,11 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::TcpStream, + net::tcp::{ReadHalf, WriteHalf}, }; use crate::error::TrackerError; -pub async fn read_message(reader: &mut TcpStream) -> Result, TrackerError> { - let mut reader = BufReader::new(reader); +pub async fn read_message(reader: &mut BufReader>) -> Result, TrackerError> { // length of incoming data let mut len_buff = [0u8; 4]; reader.read_exact(&mut len_buff).await?; @@ -19,16 +18,15 @@ pub async fn read_message(reader: &mut TcpStream) -> Result, TrackerErro } pub async fn send_message( - socket_writer: &mut TcpStream, + writer: &mut BufWriter>, message: &impl serde::Serialize, ) -> Result<(), TrackerError> { - let mut writer = BufWriter::new(socket_writer); - let msg_bytes = serde_cbor::ser::to_vec(message).unwrap(); + let msg_bytes = serde_cbor::ser::to_vec(message)?; let msg_len = (msg_bytes.len() as u32).to_be_bytes(); let mut to_send = Vec::with_capacity(msg_bytes.len() + msg_len.len()); to_send.extend(msg_len); to_send.extend(msg_bytes); - _ = writer.write_all(&to_send).await; + writer.write_all(&to_send).await?; writer.flush().await?; Ok(()) } From b47531996b5cc4579fe09eb6ad9167afa6884c5d Mon Sep 17 00:00:00 2001 From: theivess Date: Mon, 23 Jun 2025 00:16:15 +0530 Subject: [PATCH 04/12] add basic ci workflow --- .github/workflows/ci.yml | 91 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..91dee3e --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,91 @@ +name: CI + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: "-Dwarnings" + +jobs: + check: + name: check + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: nightly + components: rustfmt, clippy + + - name: Check code compilation + run: cargo check --all-targets --all-features + + fmt: + name: rustfmt + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: nightly + components: rustfmt + + - name: Check code formatting + run: cargo fmt --all -- --check + + clippy: + name: clippy + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: nightly + components: clippy + + - name: Run clippy + run: cargo clippy --all-targets --all-features -- -D warnings + + test: + name: tests + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: nightly + + - name: Run tests + run: cargo test --all-features + + build: + name: build + runs-on: ubuntu-latest + needs: [check, fmt, clippy, test] + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: nightly + + - name: Build project + run: cargo build --release From c0983933dae52e004f59288f5bded81bba35672c Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 10:03:57 +0530 Subject: [PATCH 05/12] update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index ea8c4bf..e1d2205 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +.ra-target \ No newline at end of file From 07aa32cd9a6688998e7578ed8270787534ad9af5 Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 20:23:02 +0530 Subject: [PATCH 06/12] add kind to error module --- src/error.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/error.rs b/src/error.rs index 9d049f9..f5e5788 100644 --- a/src/error.rs +++ b/src/error.rs @@ -50,3 +50,27 @@ impl From for TrackerError { Self::SerdeCbor(value) } } + +impl TrackerError { + pub fn io_error_kind(&self) -> Option { + match self { + TrackerError::IOError(e) => Some(e.kind()), + _ => None, + } + } + + pub fn kind(&self) -> &'static str { + match self { + TrackerError::DbManagerExited => "DbManagerExited", + TrackerError::ServerError => "ServerError", + TrackerError::MempoolIndexerError => "MempoolIndexerError", + TrackerError::Shutdown => "Shutdown", + TrackerError::ParsingError => "ParsingError", + TrackerError::SendError => "SendError", + TrackerError::IOError(_) => "IOError", + TrackerError::RPCError(_) => "RPCError", + TrackerError::SerdeCbor(_) => "SerdeCbor", + TrackerError::General(_) => "General", + } + } +} From 7a098dc54f8e3c12a65b62a72f9ff9202bf92883 Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 20:23:25 +0530 Subject: [PATCH 07/12] update status module --- src/status.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/status.rs b/src/status.rs index 12d0f17..cbaac71 100644 --- a/src/status.rs +++ b/src/status.rs @@ -80,7 +80,7 @@ async fn send_status(sender: &Sender, e: TrackerError, outcome: ErrorBranch) -> pub async fn handle_error(sender: &Sender, e: TrackerError) -> ErrorBranch { match e { TrackerError::DbManagerExited => send_status(sender, e, ErrorBranch::Break).await, - TrackerError::MempoolIndexerError => send_status(sender, e, ErrorBranch::Break).await, + TrackerError::MempoolIndexerError => send_status(sender, e, ErrorBranch::Continue).await, TrackerError::ServerError => send_status(sender, e, ErrorBranch::Break).await, TrackerError::Shutdown => send_status(sender, e, ErrorBranch::Break).await, TrackerError::IOError(_) => send_status(sender, e, ErrorBranch::Break).await, From 2bf069944417f99b66ac8ffafbd02f5777d4a7d3 Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 20:23:42 +0530 Subject: [PATCH 08/12] change types name --- src/types.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/types.rs b/src/types.rs index cd858e3..9ce9060 100644 --- a/src/types.rs +++ b/src/types.rs @@ -54,7 +54,7 @@ pub struct DnsMetadata { #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] -pub enum DnsRequest { +pub enum TrackerRequest { /// A request sent by the maker to register itself with the DNS server and authenticate. Post { /// Metadata containing the maker's URL and fidelity proof. @@ -67,7 +67,7 @@ pub enum DnsRequest { } #[derive(Serialize, Deserialize, Debug)] -pub enum DnsResponse { +pub enum TrackerResponse { Address { addresses: Vec }, Ping, } From 54bd8253592476b911e059b6fac7203041c0b99a Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 20:23:56 +0530 Subject: [PATCH 09/12] fix the read message handler --- src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.rs b/src/utils.rs index d778d75..1d27a4a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -12,7 +12,7 @@ pub async fn read_message(reader: &mut BufReader>) -> Result Date: Thu, 26 Jun 2025 20:24:48 +0530 Subject: [PATCH 10/12] update the indexer --- src/indexer/tracker_indexer.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index df38a52..58b1541 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -15,9 +15,9 @@ pub async fn run(db_tx: Sender, status_tx: status::Sender, client: Bi info!("Indexer started"); let mut last_tip = 0; loop { - tokio::time::sleep(Duration::from_secs(10)).await; let blockchain_info = handle_result!(status_tx, client.get_blockchain_info()); - let tip_height = blockchain_info.blocks; + let tip_height = blockchain_info.blocks + 1; + for height in last_tip..tip_height { let block_hash = handle_result!(status_tx, client.get_block_hash(height)); let block = handle_result!(status_tx, client.get_block(block_hash)); @@ -25,7 +25,8 @@ pub async fn run(db_tx: Sender, status_tx: status::Sender, client: Bi if tx.lock_time == LockTime::Blocks(Height::ZERO) { continue; } - if tx.output.len() != 2 { + + if tx.output.len() < 2 { continue; } @@ -47,6 +48,7 @@ pub async fn run(db_tx: Sender, status_tx: status::Sender, client: Bi } } last_tip = tip_height; + tokio::time::sleep(Duration::from_secs(10)).await; } } From dd2e23e6b4e6b45095e49e2741b22242033a92a4 Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 20:25:16 +0530 Subject: [PATCH 11/12] improve error handling in tracker monitor --- src/server/tracker_monitor.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/server/tracker_monitor.rs b/src/server/tracker_monitor.rs index 6d63143..cbbf3a1 100644 --- a/src/server/tracker_monitor.rs +++ b/src/server/tracker_monitor.rs @@ -6,18 +6,20 @@ use tokio::{ time::{Instant, sleep}, }; use tokio_socks::tcp::Socks5Stream; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::{ error::TrackerError, - handle_result, status, - types::{DbRequest, DnsRequest, DnsResponse, ServerInfo}, - utils::{read_message, send_message}, + handle_result, + server::send_message_with_prefix, + status, + types::{DbRequest, ServerInfo, TrackerRequest, TrackerResponse}, + utils::read_message, }; use tokio::io::BufReader; -const COOLDOWN_PERIOD: u64 = 5 * 60; +const COOLDOWN_PERIOD: u64 = 5; pub async fn monitor_systems( db_tx: Sender, status_tx: status::Sender, @@ -26,8 +28,6 @@ pub async fn monitor_systems( info!("Starting to monitor other maker services"); loop { - sleep(Duration::from_secs(1000)).await; - let (response_tx, mut response_rx) = tokio::sync::mpsc::channel(1); if db_tx.send(DbRequest::QueryAll(response_tx)).await.is_err() { continue; @@ -39,6 +39,7 @@ pub async fn monitor_systems( if server_info.cooldown.elapsed() <= cooldown_duration { continue; } + info!("Address to query: {:?}", address); let mut success = false; for attempt in 1..=3 { @@ -58,14 +59,21 @@ pub async fn monitor_systems( let mut writer = BufWriter::new(write_half); - let message = DnsResponse::Ping; - _ = send_message(&mut writer, &message).await; + let message = TrackerResponse::Ping; + _ = send_message_with_prefix(&mut writer, &message).await; let buffer = handle_result!(status_tx, read_message(&mut reader).await); - let response: DnsRequest = - handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); + let response: TrackerRequest = + match serde_cbor::de::from_reader(&buffer[..]) { + Ok(resp) => resp, + Err(e) => { + error!("Deserialization error: {e:?}"); + sleep(Duration::from_secs(1)).await; + continue; + } + }; - if let DnsRequest::Pong { address } = response { + if let TrackerRequest::Pong { address } = response { let updated_info = ServerInfo { onion_address: address.clone(), cooldown: Instant::now(), @@ -95,6 +103,7 @@ pub async fn monitor_systems( let _ = db_tx.send(DbRequest::Update(address, updated_info)).await; } } + sleep(Duration::from_secs(4)).await; } } } From b503f7d701f9a6faeb6d493a6ed0bcb11e23fd80 Mon Sep 17 00:00:00 2001 From: Wukong Date: Thu, 26 Jun 2025 20:26:05 +0530 Subject: [PATCH 12/12] improve error handling in tracker server and add send_message_with prefix --- .gitignore | 3 +- src/server/mod.rs | 26 +++++++++++++++ src/server/tracker_server.rs | 61 ++++++++++++++++++++++++++---------- 3 files changed, 72 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index e1d2205..f687a5b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.ra-target \ No newline at end of file +.ra-target +.tracker \ No newline at end of file diff --git a/src/server/mod.rs b/src/server/mod.rs index 27d3094..0a5ebee 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,3 +1,29 @@ mod tracker_monitor; mod tracker_server; +use tokio::{ + io::{AsyncWriteExt, BufWriter}, + net::tcp::WriteHalf, +}; + pub use tracker_server::run; + +use crate::error::TrackerError; + +/// This method adds a prefix for +/// maker to identify if its +/// taker or not +pub async fn send_message_with_prefix( + writer: &mut BufWriter>, + message: &impl serde::Serialize, +) -> Result<(), TrackerError> { + let mut msg_bytes = Vec::new(); + msg_bytes.push(0x02); + msg_bytes.extend(serde_cbor::to_vec(message)?); + let msg_len = (msg_bytes.len() as u32).to_be_bytes(); + let mut to_send = Vec::with_capacity(msg_bytes.len() + msg_len.len()); + to_send.extend(msg_len); + to_send.extend(msg_bytes); + writer.write_all(&to_send).await?; + writer.flush().await?; + Ok(()) +} diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 785e400..367310c 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -1,9 +1,8 @@ -use crate::handle_result; use crate::server::tracker_monitor::monitor_systems; use crate::status; use crate::types::DbRequest; -use crate::types::DnsRequest; -use crate::types::DnsResponse; +use crate::types::TrackerRequest; +use crate::types::TrackerResponse; use crate::utils::read_message; use crate::utils::send_message; use tokio::io::BufReader; @@ -12,6 +11,7 @@ use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; +use tracing::error; use tracing::info; pub async fn run( @@ -32,43 +32,70 @@ pub async fn run( while let Ok((stream, client_addr)) = server.accept().await { info!("Accepted connection from {}", client_addr); - let status_tx_clone = status_tx.clone(); let db_tx_clone = db_tx.clone(); - tokio::spawn(async move { handle_client(stream, status_tx_clone, db_tx_clone).await }); + tokio::spawn(async move { handle_client(stream, db_tx_clone).await }); } Ok(()) } - -async fn handle_client(mut stream: TcpStream, status_tx: status::Sender, db_tx: Sender) { +async fn handle_client(mut stream: TcpStream, db_tx: Sender) { let (read_half, write_half) = stream.split(); - let mut reader = BufReader::new(read_half); let mut writer = BufWriter::new(write_half); loop { - let buffer = handle_result!(status_tx, read_message(&mut reader).await); - let request: DnsRequest = - handle_result!(status_tx, serde_cbor::de::from_reader(&buffer[..])); + let buffer = match read_message(&mut reader).await { + Ok(buf) => buf, + Err(e) if e.io_error_kind() == Some(std::io::ErrorKind::UnexpectedEof) => { + info!("Client disconnected."); + break; + } + Err(e) => { + error!("Failed to read message: {}", e); + break; + } + }; + + let request: TrackerRequest = match serde_cbor::de::from_reader(&buffer[..]) { + Ok(r) => r, + Err(e) => { + error!("Failed to deserialize client request: {e}"); + break; + } + }; match request { - DnsRequest::Get => { + TrackerRequest::Get => { info!("Received Get request taker"); let (resp_tx, mut resp_rx) = mpsc::channel(1); let db_request = DbRequest::QueryActive(resp_tx); - handle_result!(status_tx, db_tx.send(db_request).await); + + if let Err(e) = db_tx.send(db_request).await { + error!("Failed to send DB request: {e}"); + break; + } + let response = resp_rx.recv().await; + info!("Response: {:?}", response); + if let Some(addresses) = response { - let message = DnsResponse::Address { addresses }; - _ = send_message(&mut writer, &message).await; + let message = TrackerResponse::Address { addresses }; + if let Err(e) = send_message(&mut writer, &message).await { + error!("Failed to send response to client: {e}"); + break; + } } } - DnsRequest::Post { metadata: _ } => { + + TrackerRequest::Post { metadata: _ } => { todo!() } - DnsRequest::Pong { address: _ } => { + + TrackerRequest::Pong { address: _ } => { todo!() } } } + + info!("Connection handler exiting."); }