diff --git a/.gitmodules b/.gitmodules index 7d0f8a37f7b..17149e7e362 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,6 @@ [submodule "libraries/chainbase"] path = libraries/chainbase - url = https://github.com/eosio/chainbase + url = https://github.com/mmcs85/chainbase ignore = dirty [submodule "libraries/appbase"] path = libraries/appbase diff --git a/libraries/chain/apply_context.cpp b/libraries/chain/apply_context.cpp index de1450013d8..ceed31ae52c 100644 --- a/libraries/chain/apply_context.cpp +++ b/libraries/chain/apply_context.cpp @@ -29,7 +29,33 @@ static inline void print_debug(account_name receiver, const action_trace& ar) { } } -void apply_context::exec_one( action_trace& trace ) +/** + * Plugins / observers listening to signals emited (such as accepted_transaction) might trigger + * errors and throw exceptions. Unless those exceptions are caught it could impact consensus and/or + * cause a node to fork. + * + * If it is ever desirable to let a signal handler bubble an exception out of this method + * a full audit of its uses needs to be undertaken. + * + */ +template +void emit( const Signal& s, Arg&& a ) { + try { + s(std::forward(a)); + } catch (boost::interprocess::bad_alloc& e) { + wlog( "bad alloc" ); + throw e; + } catch ( controller_emit_signal_exception& e ) { + wlog( "${details}", ("details", e.to_detail_string()) ); + throw e; + } catch ( fc::exception& e ) { + wlog( "${details}", ("details", e.to_detail_string()) ); + } catch ( ... ) { + wlog( "signal handler threw exception" ); + } +} + +void apply_context::exec_one( action_trace& trace, bool inline_action ) { auto start = fc::time_point::now(); @@ -113,11 +139,15 @@ void apply_context::finalize_trace( action_trace& trace, const fc::time_point& s void apply_context::exec( action_trace& trace ) { _notified.push_back(receiver); - exec_one( trace ); + + exec_one( trace, false ); + + emit(control.applied_action, trace); + for( uint32_t i = 1; i < _notified.size(); ++i ) { receiver = _notified[i]; trace.inline_traces.emplace_back( ); - exec_one( trace.inline_traces.back() ); + exec_one( trace.inline_traces.back(), true ); } if( _cfa_inline_actions.size() > 0 || _inline_actions.size() > 0 ) { diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 9a6c4d0f958..daf2be79b68 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -145,6 +145,7 @@ struct controller_impl { map unapplied_transactions; void pop_block() { + auto local_head = head; auto prev = fork_db.get_block( head->header.previous ); EOS_ASSERT( prev, block_validate_exception, "attempt to pop beyond last irreversible block" ); @@ -158,9 +159,13 @@ struct controller_impl { for( const auto& t : head->trxs ) unapplied_transactions[t->signed_id] = t; } + + emit( self.pre_undo_block, local_head ); + head = prev; db.undo(); + emit( self.post_undo_block, local_head ); } diff --git a/libraries/chain/include/eosio/chain/apply_context.hpp b/libraries/chain/include/eosio/chain/apply_context.hpp index a253d950358..cafb8ba6870 100644 --- a/libraries/chain/include/eosio/chain/apply_context.hpp +++ b/libraries/chain/include/eosio/chain/apply_context.hpp @@ -472,7 +472,7 @@ class apply_context { /// Execution methods: public: - void exec_one( action_trace& trace ); + void exec_one( action_trace& trace, bool inline_action ); void exec( action_trace& trace ); void execute_inline( action&& a ); void execute_context_free_inline( action&& a ); diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index ec7b53fafc0..1b79bc7f618 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -253,6 +253,10 @@ namespace eosio { namespace chain { signal accepted_confirmation; signal bad_alloc; + signal applied_action; + signal pre_undo_block; + signal post_undo_block; + /* signal pre_apply_block; signal post_apply_block; diff --git a/libraries/chainbase b/libraries/chainbase index 8ca96ad6b18..dbc55dfbfc5 160000 --- a/libraries/chainbase +++ b/libraries/chainbase @@ -1 +1 @@ -Subproject commit 8ca96ad6b18709d65a7d1f67f8893978f25babcf +Subproject commit dbc55dfbfc566968147bd1eca0613298197d4dcb diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 9b0b17b9d0a..23c1cc13eb7 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(chain_plugin) add_subdirectory(chain_api_plugin) add_subdirectory(producer_plugin) add_subdirectory(producer_api_plugin) +add_subdirectory(statetrack_plugin) add_subdirectory(history_plugin) add_subdirectory(history_api_plugin) diff --git a/plugins/statetrack_plugin/CMakeLists.txt b/plugins/statetrack_plugin/CMakeLists.txt new file mode 100644 index 00000000000..b22ef7d0fb1 --- /dev/null +++ b/plugins/statetrack_plugin/CMakeLists.txt @@ -0,0 +1,29 @@ +list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMakeModules") + +## load in pkg-config support +find_package(PkgConfig REQUIRED) +## use pkg-config to get hints for 0mq locations +pkg_check_modules(PC_ZeroMQ REQUIRED libzmq) + +## use the hint from above to find where 'zmq.hpp' is located +find_path(ZeroMQ_INCLUDE_DIR + NAMES zmq.hpp + PATHS ${PC_ZeroMQ_INCLUDE_DIRS} +) + +## use the hint from about to find the location of libzmq +find_library(ZeroMQ_LIBRARY + NAMES zmq + PATHS ${PC_ZeroMQ_LIBRARY_DIRS} + ) + +message(STATUS "[Additional Plugin] EOSIO ZeroMQ plugin enabled") + +file(GLOB HEADERS "include/eosio/statetrack_plugin/*.hpp") +add_library( statetrack_plugin + statetrack_plugin.cpp + statetrack_plugin_impl.cpp + ${HEADERS} ) + +target_link_libraries( statetrack_plugin ${ZeroMQ_LIBRARY} chain_plugin appbase fc ) +target_include_directories( statetrack_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) diff --git a/plugins/statetrack_plugin/README.md b/plugins/statetrack_plugin/README.md new file mode 100644 index 00000000000..cbecd852ff3 --- /dev/null +++ b/plugins/statetrack_plugin/README.md @@ -0,0 +1,116 @@ +# statetrack_plugin + +This plugin adds hooks on chainbase operations emplace, modify, remove, and undo, +and routes the objects and revision numbers to a ZMQ socket. +This approach has different purposes: + +- Allowing developers to get fine-grained access to the state DB operations. + - To mirror the state DB into a traditional database system such as MongoDB + or Postgres without them having to implement any further logic or duplicate + the DB write logic of their smart contracts. + - To execute side-effects associated to specific database operations rather than + specific contract actions. + - To propagate database operations to the dapp frontends through WebSockets + for enabling real-time updates to user interfaces. +- Seamlessly handling any forks and transaction failures by leveraging nodeos' own + behavior in maintaining the consistency of its own databases through + undo and commit operations. +- Minimizing the work that happens inside nodeos and letting receivers such as + [statemirror](https://github.com/andresberrios/statemirror) or a demux-js reader + handle the more complex logic and more performance intensive work. + +The plugin can hook to the operations that happen on the state DBs of all contracts +deployed to the chain, and it can use filters to limit the operations +that should be sent to only the ones happening on the contracts, scopes, and tables +that the user is interested in tracking, thus limiting the performance impact to a minimum. + +The plugin will also send operations happening on accounts and account permissions. +It will also send a stream of applied actions. + +This is a very early stage of the implementation, and more testing is required. +This should not be considered production ready yet. + +## Configuration + +The following configuration statements in `config.ini` are recognized: + +- `plugin = eosio::statetrack_plugin` -- Enables the state track plugin +- `st-zmq-sender-bind = ENDPOINT` -- Specifies the PUSH socket connection endpoint. + Default value: tcp://127.0.0.1:3000. +- `st-filter-on = code:scope:table` -- Track DB operations which match code:scope:table. +- `st-filter-out = code:scope:table` -- Do not track DB operations which match code:scope:table. + +## Important notes + +- Accounts are sent as DB operations with `(code, scope, table)` being + `(system, system, accounts)`. + - Disable tracking of accounts using `st-filter-out = system:system:accounts`. +- Account permissions are sent as DB operations with `(code, scope, table)` + being `(system, system, permissions)`. + - Disable tracking of account permissions using `st-filter-out = system:system:permissions`. +- Actions are sent as DB operations with `(code, scope, table)` being + `(system, system, actions)`. + - Disable tracking of actions using `st-filter-out = system:system:actions`. +- Filters are not tested yet, but the implementation is taken from + the history plugin, so it should work fine. +- We're adding separate filters for actions to make it possible to + filter by specific actions of specific contracts. + +## Open questions + +- In Chainbase, we are emitting the operations inside an undo just before the + operation is actually executed and checked for success + ([See here](https://github.com/EOSIO/chainbase/compare/master...mmcs85:master#diff-298563b6c76ef92100c2ea27c06cb08bR390)). + - We would like advice on the best way (without degrading performance) to get + the item after it is given to the `modify` and `emplace` functions using `std::move`. + - We don't like this inconsistency since it is emitting the event before being + sure that the operation actually took place, but we think it might not + be an issue since those operations are undoing previous ones and it + would not be an expected scenario for them to fail unless there was a bug + or a database inconsistency (maybe caused by bad locking mechanisms when + using multiple threads?). +- Performance + - Serialization options + - JSON serializing on the plugin + - Sending binary and decoding to JSON in the receiver + - Would need to fetch ABIs, which would make it slower overall but put less load on nodeos + - Place operations in a queue during transaction processing + - Process queue, serialize each message to JSON and send to receiver + - After transaction is processed (prevents failed transactions from sending do and undo operations) + - During transaction processing but in a separate thread + - Risk of making transaction processing take longer and hit CPU time limit + - Only for transactions sent directly to this node? + - Does it affect validated transactions/blocks received from the network? +- Consensus + - Forks + - How to induce and test recovery + - Exceptions + - Other potential disruptions +- Socket communications + - In the current implementation, sending a message blocks until it is received (ZMQ PUSH socket) + - This is useful in case receiver goes down, since nodeos will wait for it to come back up + - Instead of maintaining a local in-memory queue of messages that can build up too much if it carries on + - However, it will make nodeos unavailable for API calls, and maybe other side-effects + - Lost messages + - When receiver pulls a message from the socket but then crashes + - Implement message acknowledgement signal? + - Implement an intermediate receiver that only manages the queue? + - Implement a very safe persistent received-messages queue in receiver from which to recover if it crashes? + +## Building + +The plugin needs to be built using the ZeroMQ C++ headers. + +### Adding ZMQ library on Mac + +``` +brew install zmq +brew tap jmuncaster/homebrew-header-only +brew install jmuncaster/header-only/cppzmq +``` + +### Adding ZMQ library on Ubuntu + +``` +apt-get install -y pkg-config libzmq5-dev +``` diff --git a/plugins/statetrack_plugin/include/eosio/statetrack_plugin/statetrack_plugin.hpp b/plugins/statetrack_plugin/include/eosio/statetrack_plugin/statetrack_plugin.hpp new file mode 100644 index 00000000000..ead0fcada78 --- /dev/null +++ b/plugins/statetrack_plugin/include/eosio/statetrack_plugin/statetrack_plugin.hpp @@ -0,0 +1,35 @@ +/** + * @file + * @copyright defined in eos/LICENSE.txt + */ +#pragma once + +#include +#include + +namespace eosio { + +using namespace appbase; + +class statetrack_plugin : public plugin { +public: + APPBASE_PLUGIN_REQUIRES((chain_plugin)) + + statetrack_plugin(); + statetrack_plugin(const statetrack_plugin&) = delete; + statetrack_plugin(statetrack_plugin&&) = delete; + statetrack_plugin& operator=(const statetrack_plugin&) = delete; + statetrack_plugin& operator=(statetrack_plugin&&) = delete; + virtual ~statetrack_plugin() override = default; + + virtual void set_program_options(options_description& cli, options_description& cfg) override; + void plugin_initialize(const variables_map& options); + void plugin_startup(); + void plugin_shutdown(); + +private: + std::unique_ptr my; +}; + +} + diff --git a/plugins/statetrack_plugin/include/eosio/statetrack_plugin/statetrack_plugin_impl.hpp b/plugins/statetrack_plugin/include/eosio/statetrack_plugin/statetrack_plugin_impl.hpp new file mode 100644 index 00000000000..4b92a56d2bf --- /dev/null +++ b/plugins/statetrack_plugin/include/eosio/statetrack_plugin/statetrack_plugin_impl.hpp @@ -0,0 +1,267 @@ +/** + * @file + * @copyright defined in eos/LICENSE.txt + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace eosio +{ + +using namespace chain; +using namespace chainbase; +using namespace bmi; +using boost::signals2::connection; +using bmi::random_access; + +typedef typename get_index_type::type co_index_type; +typedef typename get_index_type::type po_index_type; +typedef typename get_index_type::type plo_index_type; +typedef typename get_index_type::type ti_index_type; +typedef typename get_index_type::type kv_index_type; + +enum op_type_enum +{ + TABLE_REMOVE = 0, + ROW_CREATE = 1, + ROW_MODIFY = 2, + ROW_REMOVE = 3 +}; + +struct db_account +{ + account_name name; + uint8_t vm_type = 0; + uint8_t vm_version = 0; + bool privileged = false; + + time_point last_code_update; + digest_type code_version; + block_timestamp_type creation_date; +}; + +struct db_op +{ + int64_t oid; + uint64_t id; + block_num_type block_num; + uint64_t actionid; + + op_type_enum op_type; + account_name code; + fc::string scope; + table_name table; + account_name payer; + + fc::variant value; +}; + +struct db_action +{ + uint64_t actionid; + transaction_id_type trx_id; + block_num_type block_num; + fc::variant trace; + std::vector ops; +}; + +struct db_transaction { + transaction_id_type trx_id; + std::vector actions; +}; + +struct db_block { + block_num_type block_num; + block_id_type block_id; + bool irreversible; +}; + +struct db_undo_block : db_block { + std::vector ops; +}; + +struct filter_entry +{ + name code; + name scope; + name table; + + std::tuple key() const + { + return std::make_tuple(code, scope, table); + } + + friend bool operator<(const filter_entry &a, const filter_entry &b) + { + return a.key() < b.key(); + } +}; + +struct IndexByOId {}; +struct IndexByActionId {}; +struct IndexByTrxId {}; +struct IndexByBlockNum {}; + +typedef multi_index_container< + db_op, + indexed_by< + random_access<>, + ordered_unique, member >, + ordered_non_unique, member >, + ordered_non_unique, member > + > +> db_op_container; + +typedef multi_index_container< + db_action, + indexed_by< + random_access<>, + ordered_non_unique, member >, + ordered_non_unique, member > + > +> db_action_container; + +class statetrack_plugin_impl +{ + public: + void send_zmq_msg(const fc::string content); + bool filter(const account_name &code, const fc::string &scope, const table_name &table); + + void build_blacklist(); + + void init_current_block(); + + const table_id_object *get_kvo_tio(const database &db, const key_value_object &kvo); + const account_object &get_tio_co(const database &db, const table_id_object &tio); + const abi_def &get_co_abi(const account_object &co); + fc::variant get_kvo_row(const database &db, const table_id_object &tio, const key_value_object &kvo, bool json = true); + + db_op get_db_op(const database &db, const account_object &co, op_type_enum op_type); + db_op get_db_op(const database &db, const permission_object &po, op_type_enum op_type); + db_op get_db_op(const database &db, const permission_link_object &plo, op_type_enum op_type); + db_op get_db_op(const database &db, const table_id_object &tio, const key_value_object &kvo, op_type_enum op_type, bool json = true); + + //generic_index state tables + void on_applied_table(const database &db, const table_id_object &tio, op_type_enum op_type); + //generic_index op + void on_applied_op(const database &db, const account_object &co, op_type_enum op_type); + void on_applied_op(const database &db, const permission_object &po, op_type_enum op_type); + void on_applied_op(const database &db, const permission_link_object &plo, op_type_enum op_type); + void on_applied_op(const database &db, const key_value_object &kvo, op_type_enum op_type); + //generic_index undo + void on_applied_undo(const int64_t revision); + //blocks and transactions + void on_applied_transaction(const transaction_trace_ptr &ttp); + void on_accepted_block(const block_state_ptr &bsp); + void on_irreversible_block(const block_state_ptr &bsp); + void on_applied_action(action_trace& trace); + void on_pre_undo_block(const block_state_ptr& bsp); + void on_post_undo_block(const block_state_ptr& bsp); + + template + void create_index_events(const database &db) { + auto &index = db.get_index(); + + connections.emplace_back( + fc::optional(index.applied_emplace.connect([&](const typename MultiIndexType::value_type &v) { + on_applied_op(db, v, op_type_enum::ROW_CREATE); + }))); + + connections.emplace_back( + fc::optional(index.applied_modify.connect([&](const typename MultiIndexType::value_type &v) { + on_applied_op(db, v, op_type_enum::ROW_MODIFY); + }))); + + connections.emplace_back( + fc::optional(index.applied_remove.connect([&](const typename MultiIndexType::value_type &v) { + on_applied_op(db, v, op_type_enum::ROW_REMOVE); + }))); + + connections.emplace_back( + fc::optional(index.applied_undo.connect([&](const int64_t revision) { + on_applied_undo(revision); + }))); + } + + static void copy_inline_row(const key_value_object &obj, vector &data) + { + data.resize(obj.value.size()); + memcpy(data.data(), obj.value.data(), obj.value.size()); + } + + static fc::string scope_sym_to_string(scope_name sym_code) + { + fc::string scope = sym_code.to_string(); + if (scope.length() > 0 && scope[0] == '.') + { + uint64_t scope_int = sym_code; + vector v; + for (int i = 0; i < 7; ++i) + { + char c = (char)(scope_int & 0xff); + if (!c) + break; + v.emplace_back(c); + scope_int >>= 8; + } + return fc::string(v.begin(), v.end()); + } + return scope; + } + + chain_plugin *chain_plug = nullptr; + fc::string socket_bind_str; + zmq::context_t *context; + zmq::socket_t *sender_socket; + + bool is_undo_state; + + uint64_t current_action_index = 0; + db_action current_action; + + block_num_type current_undo_block_num = 0; + db_undo_block current_undo_block; + + db_action_container reversible_actions; + db_op_container reversible_ops; + + std::map> blacklist_actions; + + std::set filter_on; + std::set filter_out; + fc::microseconds abi_serializer_max_time; + + std::vector> connections; + + private: + bool shorten_abi_errors = true; +}; + +} // namespace eosio + +FC_REFLECT_ENUM(eosio::op_type_enum, (TABLE_REMOVE)(ROW_CREATE)(ROW_MODIFY)(ROW_REMOVE)) +FC_REFLECT(eosio::db_account, (name)(vm_type)(vm_version)(privileged)(last_code_update)(code_version)(creation_date)) +FC_REFLECT(eosio::db_op, (oid)(id)(op_type)(code)(scope)(table)(payer)(block_num)(actionid)(value)) +FC_REFLECT(eosio::db_action, (trx_id)(trace)(ops)) +FC_REFLECT(eosio::db_transaction, (trx_id)(actions)) +FC_REFLECT(eosio::db_block, (block_num)(block_id)(irreversible)) +FC_REFLECT_DERIVED(eosio::db_undo_block, (eosio::db_block), (ops)) diff --git a/plugins/statetrack_plugin/statetrack_plugin.cpp b/plugins/statetrack_plugin/statetrack_plugin.cpp new file mode 100644 index 00000000000..f583ae387a5 --- /dev/null +++ b/plugins/statetrack_plugin/statetrack_plugin.cpp @@ -0,0 +1,168 @@ +/** + * @file + * @copyright defined in eos/LICENSE.txt + */ +#include +#include + +namespace +{ +const char *SENDER_BIND = "st-zmq-sender-bind"; +const char *SENDER_BIND_DEFAULT = "tcp://127.0.0.1:3000"; +} // namespace + +namespace eosio +{ + +using namespace chain; +using namespace chainbase; + +static appbase::abstract_plugin &_statetrack_plugin = app().register_plugin(); + +// Plugin implementation + +statetrack_plugin::statetrack_plugin() : my(new statetrack_plugin_impl()) {} + +void statetrack_plugin::set_program_options(options_description &, options_description &cfg) +{ + cfg.add_options()(SENDER_BIND, bpo::value()->default_value(SENDER_BIND_DEFAULT), + "ZMQ Sender Socket binding"); + cfg.add_options()("st-filter-on,f", bpo::value>()->composing(), + "Track tables which match code:scope:table."); + cfg.add_options()("st-filter-out,F", bpo::value>()->composing(), + "Do not track tables which match code:scope:table."); +} + +void statetrack_plugin::plugin_initialize(const variables_map &options) +{ + ilog("initializing statetrack plugin"); + + try + { + if (options.count("st-filter-on")) + { + auto fo = options.at("st-filter-on").as>(); + for (auto &s : fo) + { + std::vector v; + boost::split(v, s, boost::is_any_of(":")); + EOS_ASSERT(v.size() == 3, fc::invalid_arg_exception, "Invalid value ${s} for --filter-on", ("s", s)); + filter_entry fe{v[0], v[1], v[2]}; + my->filter_on.insert(fe); + } + } + + if (options.count("st-filter-out")) + { + auto fo = options.at("st-filter-out").as>(); + for (auto &s : fo) + { + std::vector v; + boost::split(v, s, boost::is_any_of(":")); + EOS_ASSERT(v.size() == 3, fc::invalid_arg_exception, "Invalid value ${s} for --filter-out", ("s", s)); + filter_entry fe{v[0], v[1], v[2]}; + my->filter_out.insert(fe); + } + } + + my->socket_bind_str = options.at(SENDER_BIND).as(); + if (my->socket_bind_str.empty()) + { + wlog("zmq-sender-bind not specified => eosio::statetrack_plugin disabled."); + return; + } + + my->context = new zmq::context_t(1); + my->sender_socket = new zmq::socket_t(*my->context, ZMQ_PUSH); + my->chain_plug = app().find_plugin(); + + ilog("Bind to ZMQ PUSH socket ${u}", ("u", my->socket_bind_str)); + my->sender_socket->bind(my->socket_bind_str); + + ilog("Bind to ZMQ PUSH socket successful"); + + auto &chain = my->chain_plug->chain(); + const database &db = chain.db(); + + my->abi_serializer_max_time = my->chain_plug->get_abi_serializer_max_time(); + + my->init_current_block(); + + ilog("Binding database events"); + + // account_object events + my->create_index_events(db); + // permisson_object events + my->create_index_events(db); + // permisson_link_object events + my->create_index_events(db); + // key_value_object events + my->create_index_events(db); + + // table_id_object events + auto &ti_index = db.get_index(); + + my->connections.emplace_back( + fc::optional(ti_index.applied_remove.connect([&](const table_id_object &tio) { + my->on_applied_table(db, tio, op_type_enum::TABLE_REMOVE); + }))); + + // transaction and block events + + my->connections.emplace_back( + fc::optional(chain.applied_transaction.connect([&](const transaction_trace_ptr &ttp) { + my->on_applied_transaction(ttp); + }))); + + my->connections.emplace_back( + fc::optional(chain.accepted_block.connect([&](const block_state_ptr &bsp) { + my->on_accepted_block(bsp); + }))); + + my->connections.emplace_back( + fc::optional(chain.irreversible_block.connect([&](const block_state_ptr &bsp) { + my->on_irreversible_block(bsp); + }))); + + my->connections.emplace_back( + fc::optional(chain.applied_action.connect([&](action_trace& trace){ + my->on_applied_action(trace); + }))); + + my->connections.emplace_back( + fc::optional(chain.pre_undo_block.connect([&](const block_state_ptr& bsp){ + my->on_pre_undo_block(bsp); + }))); + + my->connections.emplace_back( + fc::optional(chain.post_undo_block.connect([&](const block_state_ptr& bsp){ + my->on_post_undo_block(bsp); + }))); + } + FC_LOG_AND_RETHROW() +} + +void statetrack_plugin::plugin_startup() +{ + +} + +void statetrack_plugin::plugin_shutdown() +{ + ilog("statetrack plugin shutdown"); + if (!my->socket_bind_str.empty()) + { + my->sender_socket->disconnect(my->socket_bind_str); + my->sender_socket->close(); + delete my->sender_socket; + delete my->context; + my->sender_socket = nullptr; + + for(auto conn : my->connections) { + conn->disconnect(); + conn.reset(); + } + } +} + +} // namespace eosio diff --git a/plugins/statetrack_plugin/statetrack_plugin_impl.cpp b/plugins/statetrack_plugin/statetrack_plugin_impl.cpp new file mode 100644 index 00000000000..f35ac201bcf --- /dev/null +++ b/plugins/statetrack_plugin/statetrack_plugin_impl.cpp @@ -0,0 +1,471 @@ +/** + * @file + * @copyright defined in eos/LICENSE.txt + */ +#include + +namespace eosio +{ + +using namespace chain; +using namespace chainbase; + +// statetrack plugin implementation + +void statetrack_plugin_impl::send_zmq_msg(const fc::string content) +{ + if (sender_socket == nullptr) + return; + + zmq::message_t message(content.length()); + memcpy(message.data(), content.c_str(), content.length()); + sender_socket->send(message); +} + +bool statetrack_plugin_impl::filter(const account_name &code, const fc::string &scope, const table_name &table) +{ + if (filter_on.size() > 0) + { + bool pass_on = false; + if (filter_on.find({code, 0, 0}) != filter_on.end()) + { + pass_on = true; + } + if (filter_on.find({code, scope, 0}) != filter_on.end()) + { + pass_on = true; + } + if (filter_on.find({code, scope, table}) != filter_on.end()) + { + pass_on = true; + } + if (!pass_on) + { + return false; + } + } + + if (filter_out.size() > 0) + { + if (filter_out.find({code, 0, 0}) != filter_out.end()) + { + return false; + } + if (filter_out.find({code, scope, 0}) != filter_out.end()) + { + return false; + } + if (filter_out.find({code, scope, table}) != filter_out.end()) + { + return false; + } + } + + return true; +} + +void statetrack_plugin_impl::build_blacklist() { + blacklist_actions.emplace(std::make_pair(config::system_account_name, + std::set{N(onblock)})); + blacklist_actions.emplace(std::make_pair(N(blocktwitter), + std::set{N(tweet)})); +} + +void statetrack_plugin_impl::init_current_block() { + is_undo_state = false; +} + +const table_id_object *statetrack_plugin_impl::get_kvo_tio(const database &db, const key_value_object &kvo) +{ + return db.find(kvo.t_id); +} + +const account_object &statetrack_plugin_impl::get_tio_co(const database &db, const table_id_object &tio) +{ + const account_object *co = db.find(tio.code); + EOS_ASSERT(co != nullptr, account_query_exception, "Fail to retrieve account for ${code}", ("code", tio.code)); + return *co; +} + +const abi_def &statetrack_plugin_impl::get_co_abi(const account_object &co) +{ + abi_def *abi = new abi_def(); + abi_serializer::to_abi(co.abi, *abi); + return *abi; +} + +fc::variant statetrack_plugin_impl::get_kvo_row(const database &db, const table_id_object &tio, const key_value_object &kvo, bool json) +{ + vector data; + copy_inline_row(kvo, data); + + if (json) + { + const account_object &co = get_tio_co(db, tio); + const abi_def &abi = get_co_abi(co); + + abi_serializer abis; + abis.set_abi(abi, abi_serializer_max_time); + + return abis.binary_to_variant(abis.get_table_type(tio.table), data, abi_serializer_max_time, shorten_abi_errors); + } + else + { + return fc::variant(data); + } +} + +db_op statetrack_plugin_impl::get_db_op(const database &db, const account_object &co, op_type_enum op_type) +{ + db_op op; + + name system = N(system); + + op.oid = co.id._id; + op.op_type = op_type; + + if(is_undo_state) { + op.block_num = current_undo_block_num; + } + else { + op.actionid = current_action_index; + } + + op.code = system; + op.scope = system.to_string(); + op.table = N(accounts); + + db_account account; + account.name = co.name; + account.vm_type = co.vm_type; + account.vm_version = co.vm_version; + account.privileged = co.privileged; + account.last_code_update = co.last_code_update; + account.code_version = co.code_version; + account.creation_date = co.creation_date; + + op.value = fc::variant(account); + + return op; +} + +db_op statetrack_plugin_impl::get_db_op(const database &db, const permission_object &po, op_type_enum op_type) +{ + db_op op; + + name system = N(system); + + op.oid = po.id._id; + op.op_type = op_type; + + if(is_undo_state) { + op.block_num = current_undo_block_num; + } + else { + op.actionid = current_action_index; + } + + op.code = system; + op.scope = system.to_string(); + op.table = N(permissions); + op.value = fc::variant(po); + + return op; +} + +db_op statetrack_plugin_impl::get_db_op(const database &db, const permission_link_object &plo, op_type_enum op_type) +{ + db_op op; + + name system = N(system); + + op.oid = plo.id._id; + op.op_type = op_type; + + if(is_undo_state) { + op.block_num = current_undo_block_num; + } + else { + op.actionid = current_action_index; + } + + op.code = system; + op.scope = system.to_string(); + op.table = N(permission_links); + op.value = fc::variant(plo); + + return op; +} + +db_op statetrack_plugin_impl::get_db_op(const database &db, const table_id_object &tio, const key_value_object &kvo, op_type_enum op_type, bool json) +{ + db_op op; + + op.oid = kvo.id._id; + op.id = kvo.primary_key; + op.op_type = op_type; + + if(is_undo_state) { + op.block_num = current_undo_block_num; + } + else { + op.actionid = current_action_index; + } + + op.code = tio.code; + op.scope = scope_sym_to_string(tio.scope); + op.table = tio.table; + + if (op_type == op_type_enum::ROW_CREATE || + op_type == op_type_enum::ROW_MODIFY) + { + op.value = get_kvo_row(db, tio, kvo, json); + } + + return op; +} + +void statetrack_plugin_impl::on_applied_table(const database &db, const table_id_object &tio, op_type_enum op_type) +{ + if (sender_socket == nullptr) + return; + + auto code = tio.code; + auto scope = scope_sym_to_string(tio.scope); + auto table = tio.table; + + if (filter(code, scope, table)) + { + db_op op; + + op.oid = tio.id._id; + op.op_type = op_type; + + if(is_undo_state) { + op.block_num = current_undo_block_num; + } + else { + op.actionid = current_action_index; + } + + op.code = tio.code; + op.scope = scope_sym_to_string(tio.scope); + op.table = tio.table; + + current_action.ops.emplace_back(op); + + fc::string data = fc::json::to_string(op); + //ilog("STATETRACK table_id_object ${op_type}: ${data}", ("op_type", op_type)("data", data)); + } +} + +void statetrack_plugin_impl::on_applied_op(const database &db, const account_object &co, op_type_enum op_type) +{ + if (sender_socket == nullptr) + return; + + name system = N(system); + auto code = system; + auto scope = system.to_string(); + auto table = N(accounts); + + if (filter(code, scope, table)) + { + auto op = get_db_op(db, co, op_type); + + current_action.ops.emplace_back(op); + + fc::string data = fc::json::to_string(op); + //ilog("STATETRACK account_object ${op_type}: ${data}", ("op_type", op_type)("data", data)); + } +} + +void statetrack_plugin_impl::on_applied_op(const database &db, const permission_object &po, op_type_enum op_type) +{ + if (sender_socket == nullptr) + return; + + name system = N(system); + auto code = system; + auto scope = system.to_string(); + auto table = N(permission); + + if (filter(code, scope, table)) + { + auto op = get_db_op(db, po, op_type); + + current_action.ops.emplace_back(op); + + fc::string data = fc::json::to_string(op); + //ilog("STATETRACK permission_object ${op_type}: ${data}", ("op_type", op_type)("data", data)); + } +} + +void statetrack_plugin_impl::on_applied_op(const database &db, const permission_link_object &plo, op_type_enum op_type) +{ + if (sender_socket == nullptr) + return; + + name system = N(system); + auto code = system; + auto scope = system.to_string(); + auto table = N(permission_links); + + if (filter(code, scope, table)) + { + auto op = get_db_op(db, plo, op_type); + + current_action.ops.emplace_back(op); + + fc::string data = fc::json::to_string(op); + //ilog("STATETRACK permission_link_object ${op_type}: ${data}", ("op_type", op_type)("data", data)); + } +} + +void statetrack_plugin_impl::on_applied_op(const database &db, const key_value_object &kvo, op_type_enum op_type) +{ + if (sender_socket == nullptr) + return; + + const table_id_object *tio_ptr = get_kvo_tio(db, kvo); + + if (tio_ptr == nullptr) + return; + + const table_id_object &tio = *tio_ptr; + + auto code = tio.code; + auto scope = scope_sym_to_string(tio.scope); + auto table = tio.table; + + if (filter(code, scope, table)) + { + auto op = get_db_op(db, *tio_ptr, kvo, op_type); + + current_action.ops.emplace_back(op); + + fc::string data = fc::json::to_string(op); + //ilog("STATETRACK key_value_object ${op_type}: ${data}", ("op_type", op_type)("data", data)); + } +} + +void statetrack_plugin_impl::on_applied_undo(const int64_t revision) +{ + is_undo_state = true; +} + +void statetrack_plugin_impl::on_applied_transaction(const transaction_trace_ptr &ttp) +{ + if (sender_socket == nullptr || !ttp->receipt) + return; + + db_transaction trx; + + trx.trx_id = ttp->id; + + auto trx_action_range = reversible_actions.get().equal_range(trx.trx_id); + for (auto& action : boost::make_iterator_range(trx_action_range)) { + + trx.actions.emplace_back(action); + auto newact = trx.actions.back(); + + auto action_op_range = reversible_ops.get().equal_range(action.actionid); + for (auto& op : boost::make_iterator_range(action_op_range)) { + newact.ops.emplace_back(op); + } + } + + fc::string data = fc::json::to_string(trx); + ilog("STATETRACK applied transaction: ${trx}", ("trx", data)); + + send_zmq_msg(data); +} + +void statetrack_plugin_impl::on_accepted_block(const block_state_ptr &bsp) +{ + if (sender_socket == nullptr) + return; + + db_block block; + block.block_num = bsp->block_num; + block.block_id = bsp->id; + + fc::string data = fc::json::to_string(block); + ilog("STATETRACK accepted block: ${data}", ("data", data)); + + send_zmq_msg(data); + + auto block_action_range = reversible_actions.get().equal_range(bsp->block_num); + for (auto& action : boost::make_iterator_range(block_action_range)) { + auto action_op_range = reversible_ops.get().equal_range(action.actionid); + reversible_ops.get().erase(action_op_range.first, action_op_range.second); + } + reversible_actions.get().erase(block_action_range.first, block_action_range.second); +} + +void statetrack_plugin_impl::on_irreversible_block(const block_state_ptr &bsp) +{ + if (sender_socket == nullptr) + return; + + db_block block; + block.block_num = bsp->block_num; + block.block_id = bsp->id; + block.irreversible = true; + + fc::string data = fc::json::to_string(block); + ilog("STATETRACK irreversible block: ${data}", ("data", data)); + + send_zmq_msg(data); + + auto block_action_range = reversible_actions.get().equal_range(bsp->block_num); + for (auto& action : boost::make_iterator_range(block_action_range)) { + auto action_op_range = reversible_ops.get().equal_range(action.actionid); + reversible_ops.get().erase(action_op_range.first, action_op_range.second); + } + reversible_actions.get().erase(block_action_range.first, block_action_range.second); +} + +void statetrack_plugin_impl::on_applied_action(action_trace& trace) +{ + current_action.trx_id = trace.trx_id; + current_action.block_num = trace.block_num; + current_action.trace = chain_plug->chain().to_variant_with_abi(trace, abi_serializer_max_time); + current_action_index++; + reversible_actions.emplace_back(current_action); + current_action.ops.clear(); + + //ilog("on_applied_action: ${da}", ("da", *current_action)); +} + +void statetrack_plugin_impl::on_pre_undo_block(const block_state_ptr& bsp) +{ + is_undo_state = true; + current_undo_block.block_num = bsp->block_num; + current_undo_block.block_id = bsp->id; + + auto block_ops_range = reversible_ops.get().equal_range(current_undo_block_num); + reversible_ops.get().erase(block_ops_range.first, block_ops_range.second); +} + +void statetrack_plugin_impl::on_post_undo_block(const block_state_ptr&) +{ + if (sender_socket == nullptr) + return; + + is_undo_state = false; + + auto block_ops_range = reversible_ops.get().equal_range(current_undo_block_num); + for (auto& op : boost::make_iterator_range(block_ops_range)) { + current_undo_block.ops.emplace_back(op); + } + + fc::string data = fc::json::to_string(current_undo_block); + ilog("STATETRACK on_post_undo_block: ${block}", ("block", data)); + + send_zmq_msg(data); + + current_undo_block.ops.clear(); +} + +} // namespace eosio \ No newline at end of file diff --git a/programs/nodeos/CMakeLists.txt b/programs/nodeos/CMakeLists.txt index 9e1481c23c3..e525a5b3caa 100644 --- a/programs/nodeos/CMakeLists.txt +++ b/programs/nodeos/CMakeLists.txt @@ -60,6 +60,7 @@ target_link_libraries( ${NODE_EXECUTABLE_NAME} PRIVATE -Wl,${whole_archive_flag} txn_test_gen_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} db_size_api_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} producer_api_plugin -Wl,${no_whole_archive_flag} + PRIVATE -Wl,${whole_archive_flag} statetrack_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} test_control_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} test_control_api_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${build_id_flag}