diff --git a/AdsLib/AdsLib.h b/AdsLib/AdsLib.h index a9ecb732..de0f30a0 100644 --- a/AdsLib/AdsLib.h +++ b/AdsLib/AdsLib.h @@ -128,6 +128,35 @@ long AdsSyncAddDeviceNotificationReqEx(long port, const AmsAddr *pAddr, long AdsSyncDelDeviceNotificationReqEx(long port, const AmsAddr *pAddr, uint32_t hNotification); +/** + * A specific type of notification about the internal state of the library. + * When a state change as defined by nType parameter occurs the callback function is invoked + * @param[in] port port number of an Ads port that had previously been opened with AdsPortOpenEx(). + * @param[in] pAddr Structure with NetId and port number of the ADS server. + * @param[in] nType The notification type. + * @param[in] pFunc Pointer to the structure describing the callback function. + * @param[in] hUser 32-bit value that is passed to the callback function. + * @param[out] pNotification Address of the variable that will receive the handle of the notification. + * @return [ADS Return + * Code](https://infosys.beckhoff.com/content/1031/tcadscommon/html/ads_returncodes.htm?id=1666172286265530469) + */ +long AdsAddSyntheticDeviceNotificationReqEx(long port, const AmsAddr *pAddr, + uint32_t nType, + PAdsSyntheticNotificationFuncEx pFunc, + uint32_t hUser, + uint32_t *pNotification); + +/** + * A notification defined previously is deleted from an ADS server. + * @param[in] port port number of an Ads port that had previously been opened with AdsPortOpenEx(). + * @param[in] pAddr Structure with NetId and port number of the ADS server. + * @param[in] hNotification Address of the variable that contains the handle of the notification. + * @return [ADS Return + * Code](https://infosys.beckhoff.com/content/1031/tcadscommon/html/ads_returncodes.htm?id=1666172286265530469) + */ +long AdsDelSyntheticDeviceNotificationReqEx(long port, const AmsAddr *pAddr, + uint32_t hNotification); + /** * Read the configured timeout for the ADS functions. The standard value is 5000 ms. * @param[in] port port number of an Ads port that had previously been opened with AdsPortOpenEx(). diff --git a/AdsLib/AdsSyntheticNotification.h b/AdsLib/AdsSyntheticNotification.h new file mode 100644 index 00000000..663a3185 --- /dev/null +++ b/AdsLib/AdsSyntheticNotification.h @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: MIT + +#pragma once + +#include "AdsDef.h" + +using VirtualConnection = std::pair; + +struct SyntheticNotification { + const VirtualConnection connection; + const uint32_t type; + + SyntheticNotification(PAdsSyntheticNotificationFuncEx __func, + uint32_t __hUser, AmsAddr __amsAddr, + uint16_t __port, uint32_t __type) + : connection({ __port, __amsAddr }) + , type(__type) + , callback(__func) + , hNotification(0) + , hUser(__hUser) + { + } + + void Notify() + { + callback(&connection.second, hNotification, hUser); + } + + void hNotify(uint32_t value) + { + hNotification = value; + } + + private: + const PAdsSyntheticNotificationFuncEx callback; + uint32_t hNotification; + const uint32_t hUser; +}; diff --git a/AdsLib/AmsConnection.h b/AdsLib/AmsConnection.h index ec0d54c1..dd92dad8 100644 --- a/AdsLib/AmsConnection.h +++ b/AdsLib/AmsConnection.h @@ -74,6 +74,9 @@ struct AmsConnection { SharedDispatcher CreateNotifyMapping(uint32_t hNotify, std::shared_ptr notification); + SharedDispatcher + CreateSyntheticNotifyMapping(uint32_t hNotify, + std::shared_ptr notification); long DeleteNotification(const AmsAddr &amsAddr, uint32_t hNotify, uint32_t tmms, uint16_t port); long AdsRequest(AmsRequest &request, uint32_t timeout); diff --git a/AdsLib/AmsPort.h b/AdsLib/AmsPort.h index 21b89977..b3ff4cec 100644 --- a/AdsLib/AmsPort.h +++ b/AdsLib/AmsPort.h @@ -19,9 +19,14 @@ struct AmsPort { SharedDispatcher dispatcher); long DelNotification(AmsAddr ams, uint32_t hNotify); + void AddSyntheticNotification(AmsAddr ams, uint32_t hNotify, + SharedDispatcher dispatcher); + long DelSyntheticNotification(AmsAddr ams, uint32_t hNotify); + private: using NotifyUUID = std::pair; static const uint32_t DEFAULT_TIMEOUT = 5000; std::map dispatcherList; + std::map syntheticDispatcherList; std::mutex mutex; }; diff --git a/AdsLib/AmsRouter.h b/AdsLib/AmsRouter.h index 37da047a..965677cc 100644 --- a/AdsLib/AmsRouter.h +++ b/AdsLib/AmsRouter.h @@ -6,6 +6,7 @@ #pragma once #include "AmsConnection.h" +#include #include struct AmsRouter : Router { @@ -21,6 +22,11 @@ struct AmsRouter : Router { std::shared_ptr notify); long DelNotification(uint16_t port, const AmsAddr *pAddr, uint32_t hNotification); + long AddSyntheticNotification(uint16_t port, const AmsAddr &addr, uint32_t *pNotification, + std::shared_ptr notify); + long DelSyntheticNotification(uint16_t port, const AmsAddr *pAddr, + uint32_t hNotification); + uint32_t AllocateNotifyId(); [[deprecated]] long AddRoute(AmsNetId ams, const IpV4 &ip); @@ -43,4 +49,6 @@ struct AmsRouter : Router { void DeleteIfLastConnection(const AmsConnection *conn); std::array ports; + + std::atomic nextNotifyId; }; diff --git a/AdsLib/NotificationDispatcher.h b/AdsLib/NotificationDispatcher.h index 44b01347..9bcc04a3 100644 --- a/AdsLib/NotificationDispatcher.h +++ b/AdsLib/NotificationDispatcher.h @@ -6,6 +6,7 @@ #pragma once #include "AdsNotification.h" +#include "AdsSyntheticNotification.h" #include "AmsHeader.h" #include "Semaphore.h" @@ -13,29 +14,38 @@ #include #include #include +#include using DeleteNotificationCallback = std::function; struct NotificationDispatcher { - NotificationDispatcher(DeleteNotificationCallback callback); + NotificationDispatcher(VirtualConnection connection, + DeleteNotificationCallback callback); ~NotificationDispatcher(); void Emplace(uint32_t hNotify, std::shared_ptr notification); long Erase(uint32_t hNotify, uint32_t tmms); + void EmplaceSynthetic(uint32_t hNotify, + std::shared_ptr notification); + long EraseSynthetic(uint32_t hNotify); void Notify(); void Run(); const DeleteNotificationCallback deleteNotification; RingBuffer ring; - private: + private: + const VirtualConnection connection; std::map > notifications; + std::map > syntheticNotifications; std::recursive_mutex mutex; Semaphore sem; std::atomic stopExecution; std::thread thread; std::shared_ptr Find(uint32_t hNotify); + std::vector > + FindSynthetic(uint32_t type); }; using SharedDispatcher = std::shared_ptr; diff --git a/AdsLib/RingBuffer.h b/AdsLib/RingBuffer.h index a6678f8a..c08781a4 100644 --- a/AdsLib/RingBuffer.h +++ b/AdsLib/RingBuffer.h @@ -10,6 +10,8 @@ #include struct RingBuffer { + friend struct RingBufferTransaction; + RingBuffer(size_t N) : dataSize(N + 1) , data(new uint8_t[N + 1]) diff --git a/AdsLib/RingBufferTransaction.h b/AdsLib/RingBufferTransaction.h new file mode 100644 index 00000000..c1ca2cbd --- /dev/null +++ b/AdsLib/RingBufferTransaction.h @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: MIT + +#pragma once + +#include "RingBuffer.h" + +#include +#include + +struct RingBufferTransaction { + RingBufferTransaction(RingBuffer &target) + : buffer(target) + , write(target.write) + { + } + + size_t BytesFree() const + { + return (write < buffer.read) ? + buffer.read - write - 1 : + buffer.dataSize - 1 - (write - buffer.read); + } + + size_t WriteChunk() const + { + return (write < buffer.read) ? + buffer.read - write - 1 : + buffer.data.get() + buffer.dataSize - write - + (buffer.data.get() == buffer.read); + } + + void Write(size_t n) + { + assert(n <= BytesFree()); + write = buffer.Increment(write, n); + } + + void Commit() + { + buffer.write = write; + } + + RingBuffer &buffer; + uint8_t *write; +}; diff --git a/AdsLib/standalone/AdsDef.h b/AdsLib/standalone/AdsDef.h index 76507176..bdd93a28 100644 --- a/AdsLib/standalone/AdsDef.h +++ b/AdsLib/standalone/AdsDef.h @@ -279,6 +279,11 @@ enum AMSPORT : uint16_t { #define ADSERR_CLIENT_SYNCPORTLOCKED \ (0x55 + ERR_ADSERRS) /**< sync port is locked */ +//////////////////////////////////////////////////////////////////////////////// +// Synthetic notification types +#define NOTIFY_NOTIFICATION_RCV 0x02 /**< notification received notification */ +#define NOTIFY_CONNECTION_LOST 0x01 /**< connection lost notification */ + #pragma pack(push, 1) /** @@ -427,6 +432,16 @@ typedef void (*PAdsNotificationFuncEx)( const AmsAddr *pAddr, const AdsNotificationHeader *pNotification, uint32_t hUser); +/** + * @brief Type definition of the callback function required by the AdsSyncAddSyntheticDeviceNotificationReqEx() function. + * @param[in] pAddr Structure with NetId and port number of the ADS server. + * @param[in] hNotification Handle for the notification. Is specified when the notification is defined. + * @param[in] hUser custom handle pass to AdsSyncAddSyntheticDeviceNotificationReqEx() during registration + */ +typedef void (*PAdsSyntheticNotificationFuncEx)( + const AmsAddr *pAddr, uint32_t hNotification, + uint32_t hUser); + #define ADSSYMBOLFLAG_PERSISTENT ((uint32_t)(1 << 0)) #define ADSSYMBOLFLAG_BITVALUE ((uint32_t)(1 << 1)) #define ADSSYMBOLFLAG_REFERENCETO ((uint32_t)(1 << 2)) diff --git a/AdsLib/standalone/AdsLib.cpp b/AdsLib/standalone/AdsLib.cpp index 6f80f4c9..c8f17b34 100644 --- a/AdsLib/standalone/AdsLib.cpp +++ b/AdsLib/standalone/AdsLib.cpp @@ -272,6 +272,39 @@ long AdsSyncDelDeviceNotificationReqEx(long port, const AmsAddr *pAddr, hNotification); } +long AdsAddSyntheticDeviceNotificationReqEx(long port, const AmsAddr *pAddr, + uint32_t nType, + PAdsSyntheticNotificationFuncEx pFunc, + uint32_t hUser, uint32_t *pNotification) +{ + ASSERT_PORT_AND_AMSADDR(port, pAddr); + if (!pFunc || !pNotification) { + return ADSERR_CLIENT_INVALIDPARM; + } + + if (nType != NOTIFY_NOTIFICATION_RCV && + nType != NOTIFY_CONNECTION_LOST) { + return ADSERR_CLIENT_INVALIDPARM; + } + + try { + auto notify = std::make_shared( + pFunc, hUser, *pAddr, (uint16_t)port, nType); + return GetRouter().AddSyntheticNotification( + (uint16_t)port, *pAddr, pNotification, notify); + } catch (const std::bad_alloc &) { + return GLOBALERR_NO_MEMORY; + } +} + +long AdsDelSyntheticDeviceNotificationReqEx(long port, const AmsAddr *pAddr, + uint32_t hNotification) +{ + ASSERT_PORT_AND_AMSADDR(port, pAddr); + return GetRouter().DelSyntheticNotification((uint16_t)port, pAddr, + hNotification); +} + long AdsSyncGetTimeoutEx(long port, uint32_t *timeout) { ASSERT_PORT(port); diff --git a/AdsLib/standalone/AmsConnection.cpp b/AdsLib/standalone/AmsConnection.cpp index d09e41e9..6baebec8 100644 --- a/AdsLib/standalone/AmsConnection.cpp +++ b/AdsLib/standalone/AmsConnection.cpp @@ -5,6 +5,7 @@ #include "AmsConnection.h" #include "Log.h" +#include "RingBufferTransaction.h" AmsResponse::AmsResponse() : request(nullptr) @@ -47,10 +48,13 @@ AmsConnection::DispatcherListAdd(const VirtualConnection &connection) std::lock_guard lock(dispatcherListMutex); return dispatcherList .emplace(connection, - std::make_shared(std::bind( - &AmsConnection::DeleteNotification, this, - connection.second, std::placeholders::_1, - std::placeholders::_2, connection.first))) + std::make_shared( + connection, + std::bind(&AmsConnection::DeleteNotification, + this, connection.second, + std::placeholders::_1, + std::placeholders::_2, + connection.first))) .first->second; } @@ -93,6 +97,16 @@ AmsConnection::CreateNotifyMapping(uint32_t hNotify, return dispatcher; } +SharedDispatcher +AmsConnection::CreateSyntheticNotifyMapping(uint32_t hNotify, + std::shared_ptr notification) +{ + auto dispatcher = DispatcherListAdd(notification->connection); + notification->hNotify(hNotify); + dispatcher->EmplaceSynthetic(hNotify, notification); + return dispatcher; +} + long AmsConnection::DeleteNotification(const AmsAddr &amsAddr, uint32_t hNotify, uint32_t tmms, uint16_t port) { @@ -291,7 +305,7 @@ bool AmsConnection::ReceiveNotification(const AoEHeader &header) return false; } - auto &ring = dispatcher->ring; + auto ring = RingBufferTransaction(dispatcher->ring); auto bytesLeft = header.length(); if (bytesLeft + sizeof(bytesLeft) > ring.BytesFree()) { ReceiveJunk(bytesLeft); @@ -316,6 +330,8 @@ bool AmsConnection::ReceiveNotification(const AoEHeader &header) } Receive(ring.write, bytesLeft); ring.Write(bytesLeft); + + ring.Commit(); dispatcher->Notify(); return true; } @@ -325,6 +341,9 @@ void AmsConnection::TryRecv() try { Recv(); } catch (const std::runtime_error &e) { + for (auto &dispatcher : dispatcherList) { + dispatcher.second->Notify(); + } LOG_INFO(e.what()); } } diff --git a/AdsLib/standalone/AmsPort.cpp b/AdsLib/standalone/AmsPort.cpp index 622615c3..97ccc630 100644 --- a/AdsLib/standalone/AmsPort.cpp +++ b/AdsLib/standalone/AmsPort.cpp @@ -26,6 +26,13 @@ void AmsPort::AddNotification(const AmsAddr ams, const uint32_t hNotify, dispatcherList.emplace(NotifyUUID{ ams, hNotify }, dispatcher); } +void AmsPort::AddSyntheticNotification(const AmsAddr ams, const uint32_t hNotify, + SharedDispatcher dispatcher) +{ + std::lock_guard lock(mutex); + syntheticDispatcherList.emplace(NotifyUUID { ams, hNotify }, dispatcher); +} + void AmsPort::Close() { std::lock_guard lock(mutex); @@ -34,6 +41,12 @@ void AmsPort::Close() d.second->Erase(d.first.second, tmms); } dispatcherList.clear(); + + for (auto &d : syntheticDispatcherList) { + d.second->EraseSynthetic(d.first.second); + } + syntheticDispatcherList.clear(); + tmms = DEFAULT_TIMEOUT; port = 0; } @@ -50,6 +63,18 @@ long AmsPort::DelNotification(const AmsAddr ams, uint32_t hNotify) return ADSERR_CLIENT_REMOVEHASH; } +long AmsPort::DelSyntheticNotification(AmsAddr ams, uint32_t hNotify) +{ + std::lock_guard lock(mutex); + auto it = syntheticDispatcherList.find({ ams, hNotify }); + if (it != syntheticDispatcherList.end()) { + const auto status = it->second->EraseSynthetic(hNotify); + syntheticDispatcherList.erase(it); + return status; + } + return ADSERR_CLIENT_REMOVEHASH; +} + bool AmsPort::IsOpen() const { return !!port; diff --git a/AdsLib/standalone/AmsRouter.cpp b/AdsLib/standalone/AmsRouter.cpp index 97c2b96e..f765f48e 100644 --- a/AdsLib/standalone/AmsRouter.cpp +++ b/AdsLib/standalone/AmsRouter.cpp @@ -9,7 +9,7 @@ #include AmsRouter::AmsRouter(AmsNetId netId) - : localAddr(netId) + : localAddr(netId), nextNotifyId(1) { } @@ -236,6 +236,22 @@ long AmsRouter::AddNotification(AmsRequest &request, uint32_t *pNotification, return status; } +long AmsRouter::AddSyntheticNotification(uint16_t port, const AmsAddr &addr, uint32_t *pNotification, + std::shared_ptr notify) +{ + auto ads = GetConnection(addr.netId); + if (!ads) { + return GLOBALERR_MISSING_ROUTE; + } + + *pNotification = AllocateNotifyId(); + auto dispatcher = ads->CreateSyntheticNotifyMapping(*pNotification, notify); + auto& amsPort = ports[port - Router::PORT_BASE]; + amsPort.AddSyntheticNotification(addr, *pNotification, dispatcher); + + return 0; +} + long AmsRouter::DelNotification(uint16_t port, const AmsAddr *pAddr, uint32_t hNotification) { @@ -243,6 +259,18 @@ long AmsRouter::DelNotification(uint16_t port, const AmsAddr *pAddr, return p.DelNotification(*pAddr, hNotification); } +long AmsRouter::DelSyntheticNotification(uint16_t port, const AmsAddr *pAddr, + uint32_t hNotification) +{ + auto &p = ports[port - Router::PORT_BASE]; + return p.DelSyntheticNotification(*pAddr, hNotification); +} + +uint32_t AmsRouter::AllocateNotifyId() +{ + return nextNotifyId.fetch_add(1, std::memory_order_acq_rel); +} + void AmsRouter::AwaitConnectionAttempts( const AmsNetId &ams, std::unique_lock &lock) { diff --git a/AdsLib/standalone/NotificationDispatcher.cpp b/AdsLib/standalone/NotificationDispatcher.cpp index 8d3998c7..06e7b940 100644 --- a/AdsLib/standalone/NotificationDispatcher.cpp +++ b/AdsLib/standalone/NotificationDispatcher.cpp @@ -8,9 +8,10 @@ #include NotificationDispatcher::NotificationDispatcher( - DeleteNotificationCallback callback) + VirtualConnection connection, DeleteNotificationCallback callback) : deleteNotification(callback) , ring(4 * 1024 * 1024) + , connection(connection) , stopExecution(false) , thread(&NotificationDispatcher::Run, this) { @@ -30,6 +31,13 @@ void NotificationDispatcher::Emplace(uint32_t hNotify, notifications.emplace(hNotify, notification); } +void NotificationDispatcher::EmplaceSynthetic(uint32_t hNotify, + std::shared_ptr notification) +{ + std::lock_guard lock(mutex); + syntheticNotifications.emplace(hNotify, notification); +} + long NotificationDispatcher::Erase(uint32_t hNotify, uint32_t tmms) { const auto status = deleteNotification(hNotify, tmms); @@ -38,6 +46,13 @@ long NotificationDispatcher::Erase(uint32_t hNotify, uint32_t tmms) return status; } +long NotificationDispatcher::EraseSynthetic(uint32_t hNotify) +{ + std::lock_guard lock(mutex); + notifications.erase(hNotify); + return ADSERR_NOERR; +} + std::shared_ptr NotificationDispatcher::Find(uint32_t hNotify) { std::lock_guard lock(mutex); @@ -48,6 +63,21 @@ std::shared_ptr NotificationDispatcher::Find(uint32_t hNotify) return {}; } +std::vector > +NotificationDispatcher::FindSynthetic(uint32_t type) +{ + std::vector > found; + + std::lock_guard lock(mutex); + for (auto ¬ification : syntheticNotifications) { + if (notification.second->type == type) { + found.push_back(notification.second); + } + } + + return found; +} + void NotificationDispatcher::Notify() { sem.release(); @@ -60,6 +90,13 @@ void NotificationDispatcher::Run() if (stopExecution) { return; } + if (ring.BytesAvailable() == 0) { + for (auto ¬ification : FindSynthetic(NOTIFY_CONNECTION_LOST)) { + notification->Notify(); + } + continue; + } + auto fullLength = ring.ReadFromLittleEndian(); const auto length = ring.ReadFromLittleEndian(); (void)length; @@ -97,5 +134,9 @@ void NotificationDispatcher::Run() } cleanup: ring.Read(fullLength); + + for (auto ¬ification : FindSynthetic(NOTIFY_NOTIFICATION_RCV)) { + notification->Notify(); + } } }