diff --git a/csharp/Platform.Data.Tests/MultithreadedStorage/MapReduceCombinedLinksStorageTests.cs b/csharp/Platform.Data.Tests/MultithreadedStorage/MapReduceCombinedLinksStorageTests.cs new file mode 100644 index 0000000..fff26cb --- /dev/null +++ b/csharp/Platform.Data.Tests/MultithreadedStorage/MapReduceCombinedLinksStorageTests.cs @@ -0,0 +1,308 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; +using Platform.Data.MultithreadedStorage; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.Tests.MultithreadedStorage +{ + /// + /// + /// Tests for the MapReduceCombinedLinksStorage implementation. + /// + /// + /// + public class MapReduceCombinedLinksStorageTests : IDisposable + { + private readonly LinksConstants _constants; + private readonly MapReduceCombinedLinksStorage> _storage; + + public MapReduceCombinedLinksStorageTests() + { + _constants = new LinksConstants(enableExternalReferencesSupport: false); + var config = StorageConfiguration.CreateDefault(); + config.NumberOfSections = 2; // Use 2 sections for testing + config.MaxSectionCapacity = 1000; // Small capacity for testing + _storage = new MapReduceCombinedLinksStorage>(_constants, config); + } + + [Fact] + public void Constructor_SetsPropertiesCorrectly() + { + // Assert + Assert.Equal(_constants, _storage.Constants); + Assert.Equal(2, _storage.SectionCount); + Assert.Equal(1000, _storage.MaxSectionCapacity); + } + + [Fact] + public void Count_EmptyStorage_ReturnsZero() + { + // Act + var count = _storage.Count(null); + + // Assert + Assert.Equal(0UL, count); + } + + [Fact] + public void Create_SingleLink_ReturnsValidAddress() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + + // Act + var result = _storage.Create(substitution, null); + + // Assert + Assert.NotEqual(_constants.Null, result); + Assert.True(result > 0UL); + } + + [Fact] + public void Create_MultipleLinks_DistributesAcrossSections() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + var createdLinks = new List(); + + // Act - Create multiple links + for (int i = 0; i < 10; i++) + { + var result = _storage.Create(substitution, null); + Assert.NotEqual(_constants.Null, result); + createdLinks.Add(result); + } + + // Assert + Assert.Equal(10, createdLinks.Count); + Assert.Equal(createdLinks.Count, createdLinks.Distinct().Count()); // All links should be unique + } + + [Fact] + public void Count_AfterCreatingLinks_ReturnsCorrectCount() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + var linksToCreate = 5; + + // Act + for (int i = 0; i < linksToCreate; i++) + { + _storage.Create(substitution, null); + } + var count = _storage.Count(null); + + // Assert + Assert.Equal((ulong)linksToCreate, count); + } + + [Fact] + public void Each_EmptyStorage_CallsHandlerZeroTimes() + { + // Arrange + var callCount = 0; + ulong handler(IList link) + { + callCount++; + return _constants.Continue; + } + + // Act + var result = _storage.Each(null, handler); + + // Assert + Assert.Equal(_constants.Continue, result); + Assert.Equal(0, callCount); + } + + [Fact] + public void Each_WithLinks_CallsHandlerForEachLink() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + var linksToCreate = 3; + var handlerCalls = new List>(); + + // Create some links first + for (int i = 0; i < linksToCreate; i++) + { + _storage.Create(substitution, null); + } + + ulong handler(IList link) + { + handlerCalls.Add(new List(link)); + return _constants.Continue; + } + + // Act + var result = _storage.Each(null, handler); + + // Assert + Assert.Equal(_constants.Continue, result); + Assert.Equal(linksToCreate, handlerCalls.Count); + } + + [Fact] + public void Each_WithBreak_StopsIteration() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + var linksToCreate = 5; + var handlerCallCount = 0; + + // Create some links first + for (int i = 0; i < linksToCreate; i++) + { + _storage.Create(substitution, null); + } + + ulong handler(IList link) + { + handlerCallCount++; + return handlerCallCount >= 2 ? _constants.Break : _constants.Continue; + } + + // Act + var result = _storage.Each(null, handler); + + // Assert + Assert.Equal(_constants.Break, result); + Assert.True(handlerCallCount >= 2); + Assert.True(handlerCallCount <= linksToCreate); + } + + [Fact] + public void Update_ExistingLink_UpdatesSuccessfully() + { + // Arrange + var initialSubstitution = new List { 100UL, 200UL }; + var newSubstitution = new List { 300UL, 400UL }; + + var createdLink = _storage.Create(initialSubstitution, null); + Assert.NotEqual(_constants.Null, createdLink); + + var restriction = new List { createdLink }; + + // Act + var updateResult = _storage.Update(restriction, newSubstitution, null); + + // Assert + Assert.Equal(_constants.Continue, updateResult); + } + + [Fact] + public void Delete_ExistingLink_DeletesSuccessfully() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + var createdLink = _storage.Create(substitution, null); + Assert.NotEqual(_constants.Null, createdLink); + + var initialCount = _storage.Count(null); + var restriction = new List { createdLink }; + + // Act + var deleteResult = _storage.Delete(restriction, null); + var finalCount = _storage.Count(null); + + // Assert + Assert.Equal(_constants.Continue, deleteResult); + Assert.Equal(initialCount - 1, finalCount); + } + + [Fact] + public async Task ConcurrentOperations_MultipleThreads_HandledCorrectly() + { + // Arrange + var substitution = new List { _constants.Null, _constants.Null }; + var tasks = new List>(); + var numberOfOperations = 20; + + // Act - Create multiple links concurrently + for (int i = 0; i < numberOfOperations; i++) + { + tasks.Add(Task.Run(() => _storage.Create(substitution, null))); + } + + var results = await Task.WhenAll(tasks); + + // Assert + Assert.Equal(numberOfOperations, results.Length); + Assert.All(results, result => Assert.NotEqual(_constants.Null, result)); + Assert.Equal(results.Length, results.Distinct().Count()); // All results should be unique + + var finalCount = _storage.Count(null); + Assert.Equal((ulong)numberOfOperations, finalCount); + } + + [Fact] + public void StorageConfiguration_DefaultValues_AreValid() + { + // Act + var config = StorageConfiguration.CreateDefault(); + + // Assert + Assert.True(config.MaxSectionCapacity > 0); + Assert.True(config.EffectiveNumberOfSections > 0); + Assert.Equal(SectionAllocationMode.Heap, config.AllocationMode); + Assert.True(config.RequestTimeoutMs > 0); + Assert.True(config.TargetCpuUtilization > 0 && config.TargetCpuUtilization <= 1.0); + + // Should not throw + config.Validate(); + } + + [Fact] + public void StorageConfiguration_HighThroughput_HasCorrectSettings() + { + // Act + var config = StorageConfiguration.CreateHighThroughput(); + + // Assert + Assert.True(config.MaxSectionCapacity > StorageConfiguration.CreateDefault().MaxSectionCapacity); + Assert.True(config.EffectiveNumberOfSections >= Environment.ProcessorCount); + Assert.Equal(SectionAllocationMode.Heap, config.AllocationMode); + Assert.True(config.EnableAutoExpansion); + Assert.True(config.EnablePerformanceMonitoring); + + // Should not throw + config.Validate(); + } + + [Fact] + public void StorageConfiguration_Validation_ThrowsForInvalidValues() + { + // Arrange + var config = StorageConfiguration.CreateDefault(); + + // Act & Assert + config.MaxSectionCapacity = 0; + Assert.Throws(() => config.Validate()); + + config.MaxSectionCapacity = 1000; + config.NumberOfSections = 0; + Assert.Throws(() => config.Validate()); + + config.NumberOfSections = 2; + config.RequestTimeoutMs = 0; + Assert.Throws(() => config.Validate()); + + config.RequestTimeoutMs = 1000; + config.TargetCpuUtilization = 0; + Assert.Throws(() => config.Validate()); + + config.TargetCpuUtilization = 1.5; + Assert.Throws(() => config.Validate()); + } + + public void Dispose() + { + _storage?.Dispose(); + } + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/IRequestQueue.cs b/csharp/Platform.Data/MultithreadedStorage/IRequestQueue.cs new file mode 100644 index 0000000..a1ffd2b --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/IRequestQueue.cs @@ -0,0 +1,170 @@ +using System; +using System.Collections.Generic; +using System.Numerics; +using System.Threading; +using System.Threading.Tasks; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// Represents a lock-free queue for handling storage requests between threads. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public interface IRequestQueue : IDisposable + where TLinkAddress : IUnsignedNumber, IComparable + { + /// + /// Gets the number of pending requests in the queue. + /// + /// + int Count { get; } + + /// + /// Gets whether the queue is empty. + /// + /// + bool IsEmpty { get; } + + /// + /// Enqueues a request for processing. + /// + /// + /// + /// The request to enqueue. + /// + /// + /// + /// True if the request was successfully enqueued, false otherwise. + /// + /// + bool TryEnqueue(StorageRequest request); + + /// + /// Dequeues a request for processing. + /// + /// + /// + /// The dequeued request, if any. + /// + /// + /// + /// True if a request was successfully dequeued, false otherwise. + /// + /// + bool TryDequeue(out StorageRequest? request); + + /// + /// Waits for a request to become available and dequeues it. + /// + /// + /// + /// The cancellation token. + /// + /// + /// + /// A task that completes with the dequeued request, or null if cancelled. + /// + /// + Task?> WaitForRequestAsync(CancellationToken cancellationToken = default); + + /// + /// Signals that new requests are available for processing. + /// + /// + void SignalNewRequest(); + + /// + /// Shuts down the queue and prevents new requests from being enqueued. + /// + /// + void Shutdown(); + } + + /// + /// + /// Represents a lock-free queue for handling storage results between threads. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public interface IResultQueue : IDisposable + where TLinkAddress : IUnsignedNumber, IComparable + { + /// + /// Gets the number of pending results in the queue. + /// + /// + int Count { get; } + + /// + /// Gets whether the queue is empty. + /// + /// + bool IsEmpty { get; } + + /// + /// Enqueues a result from processing. + /// + /// + /// + /// The result to enqueue. + /// + /// + /// + /// True if the result was successfully enqueued, false otherwise. + /// + /// + bool TryEnqueue(StorageResult result); + + /// + /// Dequeues a result from processing. + /// + /// + /// + /// The dequeued result, if any. + /// + /// + /// + /// True if a result was successfully dequeued, false otherwise. + /// + /// + bool TryDequeue(out StorageResult? result); + + /// + /// Waits for a result to become available and dequeues it. + /// + /// + /// + /// The cancellation token. + /// + /// + /// + /// A task that completes with the dequeued result, or null if cancelled. + /// + /// + Task?> WaitForResultAsync(CancellationToken cancellationToken = default); + + /// + /// Signals that new results are available for processing. + /// + /// + void SignalNewResult(); + + /// + /// Shuts down the queue and prevents new results from being enqueued. + /// + /// + void Shutdown(); + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/IStorageSection.cs b/csharp/Platform.Data/MultithreadedStorage/IStorageSection.cs new file mode 100644 index 0000000..f9d0826 --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/IStorageSection.cs @@ -0,0 +1,151 @@ +using System; +using System.Collections.Generic; +using System.Numerics; +using Platform.Delegates; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// Represents a single section of links storage that can be managed by a dedicated thread. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public interface IStorageSection : IDisposable + where TLinkAddress : IUnsignedNumber, IComparable + { + /// + /// Gets the range of link addresses managed by this section. + /// + /// + (TLinkAddress Min, TLinkAddress Max) AddressRange { get; } + + /// + /// Gets the thread ID that manages this section. + /// + /// + int ThreadId { get; } + + /// + /// Gets the maximum capacity of this section. + /// + /// + TLinkAddress Capacity { get; } + + /// + /// Gets the current number of links in this section. + /// + /// + TLinkAddress Count { get; } + + /// + /// Determines if a link address belongs to this section. + /// + /// + /// + /// The link address to check. + /// + /// + /// + /// True if the address belongs to this section, false otherwise. + /// + /// + bool ContainsAddress(TLinkAddress address); + + /// + /// Counts links that match the specified restriction within this section. + /// + /// + /// + /// The restriction to apply. + /// + /// + /// + /// The number of matching links in this section. + /// + /// + TLinkAddress CountLinks(IList? restriction); + + /// + /// Iterates through links that match the specified restriction within this section. + /// + /// + /// + /// The restriction to apply. + /// + /// + /// + /// The handler to call for each matching link. + /// + /// + /// + /// The result of the iteration. + /// + /// + TLinkAddress EachLink(IList? restriction, ReadHandler? handler); + + /// + /// Creates a new link in this section. + /// + /// + /// + /// The content of the new link. + /// + /// + /// + /// The handler to call for write operations. + /// + /// + /// + /// The result of the create operation. + /// + /// + TLinkAddress CreateLink(IList? substitution, WriteHandler? handler); + + /// + /// Updates a link in this section. + /// + /// + /// + /// The restriction to identify the link to update. + /// + /// + /// + /// The new content for the link. + /// + /// + /// + /// The handler to call for write operations. + /// + /// + /// + /// The result of the update operation. + /// + /// + TLinkAddress UpdateLink(IList? restriction, IList? substitution, WriteHandler? handler); + + /// + /// Deletes links in this section. + /// + /// + /// + /// The restriction to identify links to delete. + /// + /// + /// + /// The handler to call for write operations. + /// + /// + /// + /// The result of the delete operation. + /// + /// + TLinkAddress DeleteLink(IList? restriction, WriteHandler? handler); + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/InMemoryStorageSection.cs b/csharp/Platform.Data/MultithreadedStorage/InMemoryStorageSection.cs new file mode 100644 index 0000000..433b871 --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/InMemoryStorageSection.cs @@ -0,0 +1,333 @@ +using System; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Linq; +using System.Numerics; +using System.Runtime.CompilerServices; +using Platform.Delegates; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// In-memory implementation of a storage section that manages a specific range of link addresses. + /// Each section operates independently and can be managed by a dedicated thread. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public class InMemoryStorageSection : IStorageSection + where TLinkAddress : IUnsignedNumber, IComparable + { + private readonly LinksConstants _constants; + private readonly ConcurrentDictionary _links; + private readonly (TLinkAddress Min, TLinkAddress Max) _addressRange; + private readonly int _threadId; + private readonly TLinkAddress _capacity; + private TLinkAddress _nextAvailableAddress; + private volatile bool _disposed; + + /// + /// Gets the range of link addresses managed by this section. + /// + /// + public (TLinkAddress Min, TLinkAddress Max) AddressRange => _addressRange; + + /// + /// Gets the thread ID that manages this section. + /// + /// + public int ThreadId => _threadId; + + /// + /// Gets the maximum capacity of this section. + /// + /// + public TLinkAddress Capacity => _capacity; + + /// + /// Gets the current number of links in this section. + /// + /// + public TLinkAddress Count => TLinkAddress.CreateTruncating((ulong)_links.Count); + + /// + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// The thread ID that will manage this section. + /// + /// + /// + /// The range of addresses this section manages. + /// + /// + /// + /// The constants for link operations. + /// + /// + public InMemoryStorageSection(int threadId, (TLinkAddress Min, TLinkAddress Max) addressRange, LinksConstants constants) + { + _threadId = threadId; + _addressRange = addressRange; + _constants = constants ?? throw new ArgumentNullException(nameof(constants)); + _capacity = addressRange.Max - addressRange.Min + TLinkAddress.One; + _nextAvailableAddress = addressRange.Min; + _links = new ConcurrentDictionary(); + _disposed = false; + } + + /// + /// Determines if a link address belongs to this section. + /// + /// + /// + /// The link address to check. + /// + /// + /// + /// True if the address belongs to this section, false otherwise. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool ContainsAddress(TLinkAddress address) + { + return address.CompareTo(_addressRange.Min) >= 0 && address.CompareTo(_addressRange.Max) <= 0; + } + + /// + /// Counts links that match the specified restriction within this section. + /// + /// + /// + /// The restriction to apply. + /// + /// + /// + /// The number of matching links in this section. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress CountLinks(IList? restriction) + { + if (_disposed) + return TLinkAddress.Zero; + + if (restriction == null || restriction.Count == 0) + return Count; + + var count = TLinkAddress.Zero; + foreach (var link in _links.Values) + { + if (MatchesRestriction(link, restriction)) + count++; + } + return count; + } + + /// + /// Iterates through links that match the specified restriction within this section. + /// + /// + /// + /// The restriction to apply. + /// + /// + /// + /// The handler to call for each matching link. + /// + /// + /// + /// The result of the iteration. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress EachLink(IList? restriction, ReadHandler? handler) + { + if (_disposed || handler == null) + return _constants.Continue; + + foreach (var kvp in _links) + { + var link = kvp.Value; + if (restriction == null || restriction.Count == 0 || MatchesRestriction(link, restriction)) + { + var result = handler(link); + if (EqualityComparer.Default.Equals(result, _constants.Break)) + return _constants.Break; + } + } + return _constants.Continue; + } + + /// + /// Creates a new link in this section. + /// + /// + /// + /// The content of the new link. + /// + /// + /// + /// The handler to call for write operations. + /// + /// + /// + /// The result of the create operation. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress CreateLink(IList? substitution, WriteHandler? handler) + { + if (_disposed) + return _constants.Null; + + if (_nextAvailableAddress.CompareTo(_addressRange.Max) > 0) + return _constants.Null; // Section is full + + var newAddress = _nextAvailableAddress++; + var linkContent = CreateLinkContent(newAddress, substitution); + + if (_links.TryAdd(newAddress, linkContent)) + { + handler?.Invoke(Array.Empty(), linkContent); + return newAddress; + } + + return _constants.Null; + } + + /// + /// Updates a link in this section. + /// + /// + /// + /// The restriction to identify the link to update. + /// + /// + /// + /// The new content for the link. + /// + /// + /// + /// The handler to call for write operations. + /// + /// + /// + /// The result of the update operation. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress UpdateLink(IList? restriction, IList? substitution, WriteHandler? handler) + { + if (_disposed || restriction == null || restriction.Count == 0) + return _constants.Continue; + + var linkAddress = restriction[_constants.IndexPart]; + if (!ContainsAddress(linkAddress)) + return _constants.Continue; + + if (_links.TryGetValue(linkAddress, out var oldLink)) + { + var newLinkContent = CreateLinkContent(linkAddress, substitution); + if (_links.TryUpdate(linkAddress, newLinkContent, oldLink)) + { + handler?.Invoke(oldLink, newLinkContent); + return _constants.Continue; + } + } + + return _constants.Continue; + } + + /// + /// Deletes links in this section. + /// + /// + /// + /// The restriction to identify links to delete. + /// + /// + /// + /// The handler to call for write operations. + /// + /// + /// + /// The result of the delete operation. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress DeleteLink(IList? restriction, WriteHandler? handler) + { + if (_disposed || restriction == null || restriction.Count == 0) + return _constants.Continue; + + var linkAddress = restriction[_constants.IndexPart]; + if (!ContainsAddress(linkAddress)) + return _constants.Continue; + + if (_links.TryRemove(linkAddress, out var deletedLink)) + { + var emptyLink = new TLinkAddress[deletedLink.Length]; + handler?.Invoke(deletedLink, emptyLink); + return _constants.Continue; + } + + return _constants.Continue; + } + + private TLinkAddress[] CreateLinkContent(TLinkAddress address, IList? substitution) + { + if (substitution == null || substitution.Count == 0) + { + // Create a basic triplet: [index, null, null] + return new TLinkAddress[] { address, _constants.Null, _constants.Null }; + } + + var content = new TLinkAddress[Math.Max(3, substitution.Count + 1)]; + content[_constants.IndexPart] = address; + + for (int i = 0; i < substitution.Count; i++) + { + content[i + 1] = substitution[i]; + } + + return content; + } + + private bool MatchesRestriction(TLinkAddress[] link, IList restriction) + { + for (int i = 0; i < Math.Min(link.Length, restriction.Count); i++) + { + var restrictionValue = restriction[i]; + if (!EqualityComparer.Default.Equals(restrictionValue, _constants.Any) && + !EqualityComparer.Default.Equals(restrictionValue, link[i])) + { + return false; + } + } + return true; + } + + /// + /// Disposes the section and releases its resources. + /// + /// + public void Dispose() + { + if (_disposed) + return; + + _disposed = true; + _links.Clear(); + } + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/LockFreeQueue.cs b/csharp/Platform.Data/MultithreadedStorage/LockFreeQueue.cs new file mode 100644 index 0000000..99c64f5 --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/LockFreeQueue.cs @@ -0,0 +1,302 @@ +using System; +using System.Collections.Concurrent; +using System.Numerics; +using System.Threading; +using System.Threading.Tasks; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// Lock-free implementation of request queue using ConcurrentQueue. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public class LockFreeRequestQueue : IRequestQueue + where TLinkAddress : IUnsignedNumber, IComparable + { + private readonly ConcurrentQueue> _queue; + private readonly SemaphoreSlim _semaphore; + private volatile bool _isShutdown; + + /// + /// Gets the number of pending requests in the queue. + /// + /// + public int Count => _queue.Count; + + /// + /// Gets whether the queue is empty. + /// + /// + public bool IsEmpty => _queue.IsEmpty; + + /// + /// + /// Initializes a new instance of the class. + /// + /// + /// + public LockFreeRequestQueue() + { + _queue = new ConcurrentQueue>(); + _semaphore = new SemaphoreSlim(0); + _isShutdown = false; + } + + /// + /// Enqueues a request for processing. + /// + /// + /// + /// The request to enqueue. + /// + /// + /// + /// True if the request was successfully enqueued, false otherwise. + /// + /// + public bool TryEnqueue(StorageRequest request) + { + if (_isShutdown) + return false; + + _queue.Enqueue(request); + _semaphore.Release(); + return true; + } + + /// + /// Dequeues a request for processing. + /// + /// + /// + /// The dequeued request, if any. + /// + /// + /// + /// True if a request was successfully dequeued, false otherwise. + /// + /// + public bool TryDequeue(out StorageRequest? request) + { + return _queue.TryDequeue(out request); + } + + /// + /// Waits for a request to become available and dequeues it. + /// + /// + /// + /// The cancellation token. + /// + /// + /// + /// A task that completes with the dequeued request, or null if cancelled. + /// + /// + public async Task?> WaitForRequestAsync(CancellationToken cancellationToken = default) + { + if (_isShutdown) + return null; + + try + { + await _semaphore.WaitAsync(cancellationToken); + if (_isShutdown || cancellationToken.IsCancellationRequested) + return null; + + if (TryDequeue(out var request)) + return request; + + return null; + } + catch (OperationCanceledException) + { + return null; + } + } + + /// + /// Signals that new requests are available for processing. + /// + /// + public void SignalNewRequest() + { + if (!_isShutdown) + _semaphore.Release(); + } + + /// + /// Shuts down the queue and prevents new requests from being enqueued. + /// + /// + public void Shutdown() + { + _isShutdown = true; + _semaphore.Release(100); // Release enough to wake up all waiting threads + } + + /// + /// Disposes the queue resources. + /// + /// + public void Dispose() + { + Shutdown(); + _semaphore.Dispose(); + } + } + + /// + /// + /// Lock-free implementation of result queue using ConcurrentQueue. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public class LockFreeResultQueue : IResultQueue + where TLinkAddress : IUnsignedNumber, IComparable + { + private readonly ConcurrentQueue> _queue; + private readonly SemaphoreSlim _semaphore; + private volatile bool _isShutdown; + + /// + /// Gets the number of pending results in the queue. + /// + /// + public int Count => _queue.Count; + + /// + /// Gets whether the queue is empty. + /// + /// + public bool IsEmpty => _queue.IsEmpty; + + /// + /// + /// Initializes a new instance of the class. + /// + /// + /// + public LockFreeResultQueue() + { + _queue = new ConcurrentQueue>(); + _semaphore = new SemaphoreSlim(0); + _isShutdown = false; + } + + /// + /// Enqueues a result from processing. + /// + /// + /// + /// The result to enqueue. + /// + /// + /// + /// True if the result was successfully enqueued, false otherwise. + /// + /// + public bool TryEnqueue(StorageResult result) + { + if (_isShutdown) + return false; + + _queue.Enqueue(result); + _semaphore.Release(); + return true; + } + + /// + /// Dequeues a result from processing. + /// + /// + /// + /// The dequeued result, if any. + /// + /// + /// + /// True if a result was successfully dequeued, false otherwise. + /// + /// + public bool TryDequeue(out StorageResult? result) + { + return _queue.TryDequeue(out result); + } + + /// + /// Waits for a result to become available and dequeues it. + /// + /// + /// + /// The cancellation token. + /// + /// + /// + /// A task that completes with the dequeued result, or null if cancelled. + /// + /// + public async Task?> WaitForResultAsync(CancellationToken cancellationToken = default) + { + if (_isShutdown) + return null; + + try + { + await _semaphore.WaitAsync(cancellationToken); + if (_isShutdown || cancellationToken.IsCancellationRequested) + return null; + + if (TryDequeue(out var result)) + return result; + + return null; + } + catch (OperationCanceledException) + { + return null; + } + } + + /// + /// Signals that new results are available for processing. + /// + /// + public void SignalNewResult() + { + if (!_isShutdown) + _semaphore.Release(); + } + + /// + /// Shuts down the queue and prevents new results from being enqueued. + /// + /// + public void Shutdown() + { + _isShutdown = true; + _semaphore.Release(100); // Release enough to wake up all waiting threads + } + + /// + /// Disposes the queue resources. + /// + /// + public void Dispose() + { + Shutdown(); + _semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/MapReduceCombinedLinksStorage.cs b/csharp/Platform.Data/MultithreadedStorage/MapReduceCombinedLinksStorage.cs new file mode 100644 index 0000000..f3d1cc0 --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/MapReduceCombinedLinksStorage.cs @@ -0,0 +1,426 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Numerics; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Platform.Delegates; +using Platform.Ranges; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// MapReduce multithread combined storage implementation that distributes links across multiple sections, + /// each managed by a dedicated thread. Implements lock-free queues for request/response streaming. + /// + /// + /// + /// + /// The type of link address. + /// + /// + /// + /// The type of constants. + /// + /// + public class MapReduceCombinedLinksStorage : ILinks, IDisposable + where TLinkAddress : IUnsignedNumber, IComparable + where TConstants : LinksConstants + { + private readonly TConstants _constants; + private readonly List> _sections; + private readonly List> _requestQueues; + private readonly IResultQueue _resultQueue; + private readonly List _sectionTasks; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly int _maxSectionCapacity; + private readonly int _numberOfSections; + private readonly object _sectionLock; + private volatile bool _disposed; + + /// + /// Gets the constants for this storage instance. + /// + /// + public TConstants Constants + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => _constants; + } + + /// + /// Gets the number of active sections. + /// + /// + public int SectionCount => _sections.Count; + + /// + /// Gets the maximum capacity per section. + /// + /// + public int MaxSectionCapacity => _maxSectionCapacity; + + /// + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// The constants for this storage instance. + /// + /// + /// + /// The configuration for this storage instance. + /// + /// + public MapReduceCombinedLinksStorage(TConstants constants, StorageConfiguration? configuration = null) + { + _constants = constants ?? throw new ArgumentNullException(nameof(constants)); + var config = configuration ?? StorageConfiguration.CreateDefault(); + config.Validate(); + _maxSectionCapacity = config.MaxSectionCapacity; + _numberOfSections = config.EffectiveNumberOfSections; + _sections = new List>(); + _requestQueues = new List>(); + _resultQueue = new LockFreeResultQueue(); + _sectionTasks = new List(); + _cancellationTokenSource = new CancellationTokenSource(); + _sectionLock = new object(); + _disposed = false; + + InitializeSections(); + } + + private void InitializeSections() + { + for (int i = 0; i < _numberOfSections; i++) + { + CreateNewSection(); + } + } + + private void CreateNewSection() + { + var sectionId = _sections.Count; + var addressRangeStart = TLinkAddress.CreateTruncating((ulong)sectionId * (ulong)_maxSectionCapacity + 1); + var addressRangeEnd = TLinkAddress.CreateTruncating((ulong)(sectionId + 1) * (ulong)_maxSectionCapacity); + + // Create a basic in-memory section implementation + var section = new InMemoryStorageSection(sectionId, (addressRangeStart, addressRangeEnd), _constants); + var requestQueue = new LockFreeRequestQueue(); + + _sections.Add(section); + _requestQueues.Add(requestQueue); + + // Start section processing task + var task = Task.Run(() => ProcessSectionRequests(section, requestQueue, _cancellationTokenSource.Token)); + _sectionTasks.Add(task); + } + + private async Task ProcessSectionRequests(IStorageSection section, IRequestQueue requestQueue, CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + var request = await requestQueue.WaitForRequestAsync(cancellationToken); + if (request == null) + continue; + + var result = ProcessRequest(section, request); + _resultQueue.TryEnqueue(result); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + // Log error and continue processing + Console.WriteLine($"Error processing request in section {section.ThreadId}: {ex.Message}"); + } + } + } + + private StorageResult ProcessRequest(IStorageSection section, StorageRequest request) + { + try + { + TLinkAddress result = request.OperationType switch + { + StorageOperationType.Count => section.CountLinks(request.Restriction), + StorageOperationType.Each => section.EachLink(request.Restriction, request.ReadHandler), + StorageOperationType.Create => section.CreateLink(request.Substitution, request.WriteHandler), + StorageOperationType.Update => section.UpdateLink(request.Restriction, request.Substitution, request.WriteHandler), + StorageOperationType.Delete => section.DeleteLink(request.Restriction, request.WriteHandler), + _ => throw new ArgumentException($"Unknown operation type: {request.OperationType}") + }; + + return new StorageResult(request.Id, section.ThreadId, result); + } + catch (Exception ex) + { + return new StorageResult(request.Id, section.ThreadId, default, ex); + } + } + + /// + /// Maps a request to all relevant sections and reduces the results. + /// + /// + /// + /// The request to process. + /// + /// + /// + /// The reduced result from all sections. + /// + /// + private async Task MapReduceRequest(StorageRequest request) + { + var relevantSections = GetRelevantSections(request); + var tasks = new List>(); + + // Map: send request to all relevant sections + foreach (var (section, queue) in relevantSections) + { + var sectionRequest = new StorageRequest( + request.OperationType, + request.Restriction, + request.Substitution, + request.ReadHandler, + request.WriteHandler); + + queue.TryEnqueue(sectionRequest); + tasks.Add(sectionRequest.CompletionSource.Task); + } + + // Wait for results from result queue + var results = new List>(); + var expectedResults = tasks.Count; + var resultTasks = tasks.Select(async task => + { + while (true) + { + var result = await _resultQueue.WaitForResultAsync(_cancellationTokenSource.Token); + if (result != null) + { + results.Add(result); + + // Find and complete the corresponding task + var matchingTask = tasks.FirstOrDefault(t => !t.IsCompleted); + if (matchingTask != null && result.IsSuccess) + { + try + { + ((TaskCompletionSource)matchingTask.AsyncState!)?.SetResult(result.Value); + } + catch { } + } + + if (results.Count >= expectedResults) + break; + } + } + }); + + await Task.WhenAll(resultTasks); + + // Reduce: combine results based on operation type + return ReduceResults(request.OperationType, results); + } + + private List<(IStorageSection section, IRequestQueue queue)> GetRelevantSections(StorageRequest request) + { + var relevantSections = new List<(IStorageSection, IRequestQueue)>(); + + // For now, send to all sections. In a more sophisticated implementation, + // we would analyze the restriction to determine which sections are relevant + for (int i = 0; i < _sections.Count; i++) + { + relevantSections.Add((_sections[i], _requestQueues[i])); + } + + return relevantSections; + } + + private TLinkAddress ReduceResults(StorageOperationType operationType, List> results) + { + var successfulResults = results.Where(r => r.IsSuccess).ToList(); + + return operationType switch + { + StorageOperationType.Count => successfulResults.Aggregate(TLinkAddress.Zero, (sum, result) => sum + result.Value), + StorageOperationType.Each => successfulResults.LastOrDefault()?.Value ?? _constants.Continue, + StorageOperationType.Create => successfulResults.FirstOrDefault()?.Value ?? _constants.Null, + StorageOperationType.Update => successfulResults.FirstOrDefault()?.Value ?? _constants.Continue, + StorageOperationType.Delete => successfulResults.FirstOrDefault()?.Value ?? _constants.Continue, + _ => throw new ArgumentException($"Unknown operation type: {operationType}") + }; + } + + #region ILinks Implementation + + /// + /// Counts and returns the total number of links in the storage that meet the specified restriction. + /// + /// + /// + /// Restriction on the contents of links. + /// + /// + /// + /// The total number of links in the storage that meet the specified restriction. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress Count(IList? restriction) + { + var request = new StorageRequest(StorageOperationType.Count, restriction); + return MapReduceRequest(request).GetAwaiter().GetResult(); + } + + /// + /// Passes through all the links matching the pattern, invoking a handler for each matching link. + /// + /// + /// + /// Restriction on the contents of links. + /// + /// + /// + /// A handler for each matching link. + /// + /// + /// + /// Constants.Continue, if the pass through the links was not interrupted, and Constants.Break otherwise. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress Each(IList? restriction, ReadHandler? handler) + { + var request = new StorageRequest(StorageOperationType.Each, restriction, readHandler: handler); + return MapReduceRequest(request).GetAwaiter().GetResult(); + } + + /// + /// Creates a link. + /// + /// + /// + /// The content of a new link. + /// + /// + /// + /// A function to handle each executed change. + /// + /// + /// + /// Constants.Continue if all executed changes are handled. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress Create(IList? substitution, WriteHandler? handler) + { + var request = new StorageRequest(StorageOperationType.Create, substitution: substitution, writeHandler: handler); + return MapReduceRequest(request).GetAwaiter().GetResult(); + } + + /// + /// Updates a link. + /// + /// + /// + /// Restriction on the content of links. + /// + /// + /// + /// The new content for the link. + /// + /// + /// + /// A function to handle each executed change. + /// + /// + /// + /// Constants.Continue if all executed changes are handled. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress Update(IList? restriction, IList? substitution, WriteHandler? handler) + { + var request = new StorageRequest(StorageOperationType.Update, restriction, substitution, writeHandler: handler); + return MapReduceRequest(request).GetAwaiter().GetResult(); + } + + /// + /// Deletes links that match the specified restriction. + /// + /// + /// + /// Restriction on the content of a link. + /// + /// + /// + /// A function to handle each executed change. + /// + /// + /// + /// Constants.Continue if all executed changes are handled. + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TLinkAddress Delete(IList? restriction, WriteHandler? handler) + { + var request = new StorageRequest(StorageOperationType.Delete, restriction, writeHandler: handler); + return MapReduceRequest(request).GetAwaiter().GetResult(); + } + + #endregion + + #region IDisposable Implementation + + /// + /// Disposes the storage and all its resources. + /// + /// + public void Dispose() + { + if (_disposed) + return; + + _disposed = true; + _cancellationTokenSource.Cancel(); + + // Shutdown all queues + foreach (var queue in _requestQueues) + { + queue.Shutdown(); + } + _resultQueue.Shutdown(); + + // Wait for all tasks to complete + Task.WaitAll(_sectionTasks.ToArray(), TimeSpan.FromSeconds(5)); + + // Dispose all resources + foreach (var section in _sections) + { + section.Dispose(); + } + foreach (var queue in _requestQueues) + { + queue.Dispose(); + } + _resultQueue.Dispose(); + _cancellationTokenSource.Dispose(); + } + + #endregion + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/StorageConfiguration.cs b/csharp/Platform.Data/MultithreadedStorage/StorageConfiguration.cs new file mode 100644 index 0000000..9e3fe64 --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/StorageConfiguration.cs @@ -0,0 +1,247 @@ +using System; +using System.Collections.Generic; +using System.Numerics; +using Platform.Ranges; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// Specifies the type of memory allocation for storage sections. + /// + /// + /// + public enum SectionAllocationMode + { + /// + /// Allocate sections in heap memory blocks. + /// + /// + Heap, + + /// + /// Allocate sections using memory-mapped files (mmap). + /// + /// + MemoryMapped, + + /// + /// Allocate sections in separate files. + /// + /// + SeparateFiles + } + + /// + /// + /// Configuration options for the MapReduce combined links storage. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public class StorageConfiguration + where TLinkAddress : IUnsignedNumber, IComparable + { + /// + /// Gets or sets the maximum capacity per section. Default is 1MB worth of links. + /// + /// + public int MaxSectionCapacity { get; set; } = 1024 * 1024; + + /// + /// Gets or sets the number of sections to create initially. If null, uses CPU core count + 1. + /// + /// + public int? NumberOfSections { get; set; } + + /// + /// Gets or sets the allocation mode for storage sections. + /// + /// + public SectionAllocationMode AllocationMode { get; set; } = SectionAllocationMode.Heap; + + /// + /// Gets or sets the base directory for file-based allocations. + /// + /// + public string? BaseDirectory { get; set; } + + /// + /// Gets or sets the minimum internal references range override. + /// When set, overrides the default internal references range minimum value. + /// + /// + public TLinkAddress MinInternalReference { get; set; } + + /// + /// Gets or sets the maximum internal references range override. + /// When set, overrides the default internal references range maximum value. + /// + /// + public TLinkAddress MaxInternalReference { get; set; } + + /// + /// Gets or sets whether to enable external references support. + /// + /// + public bool EnableExternalReferencesSupport { get; set; } = false; + + /// + /// Gets or sets the timeout for request processing in milliseconds. + /// + /// + public int RequestTimeoutMs { get; set; } = 30000; // 30 seconds + + /// + /// Gets or sets whether to enable automatic section expansion when capacity is reached. + /// + /// + public bool EnableAutoExpansion { get; set; } = true; + + /// + /// Gets or sets the target CPU utilization percentage for determining optimal thread count. + /// + /// + public double TargetCpuUtilization { get; set; } = 0.8; // 80% + + /// + /// Gets or sets whether to enable performance monitoring and statistics collection. + /// + /// + public bool EnablePerformanceMonitoring { get; set; } = false; + + /// + /// + /// Gets the effective number of sections to use. + /// + /// + /// + public int EffectiveNumberOfSections => NumberOfSections ?? Environment.ProcessorCount + 1; + + /// + /// + /// Gets the internal references range based on configuration. + /// + /// + /// + /// + /// The links constants to use as a base. + /// + /// + /// + /// The configured internal references range. + /// + /// + public Range GetInternalReferencesRange(LinksConstants constants) + { + var min = EqualityComparer.Default.Equals(MinInternalReference, default) ? constants.InternalReferencesRange.Minimum : MinInternalReference; + var max = EqualityComparer.Default.Equals(MaxInternalReference, default) ? constants.InternalReferencesRange.Maximum : MaxInternalReference; + return new Range(min, max); + } + + /// + /// + /// Validates the configuration and throws an exception if invalid. + /// + /// + /// + public void Validate() + { + if (MaxSectionCapacity <= 0) + throw new ArgumentException("MaxSectionCapacity must be positive", nameof(MaxSectionCapacity)); + + if (NumberOfSections.HasValue && NumberOfSections.Value <= 0) + throw new ArgumentException("NumberOfSections must be positive when specified", nameof(NumberOfSections)); + + if (RequestTimeoutMs <= 0) + throw new ArgumentException("RequestTimeoutMs must be positive", nameof(RequestTimeoutMs)); + + if (TargetCpuUtilization <= 0 || TargetCpuUtilization > 1.0) + throw new ArgumentException("TargetCpuUtilization must be between 0 and 1", nameof(TargetCpuUtilization)); + + if ((AllocationMode == SectionAllocationMode.MemoryMapped || AllocationMode == SectionAllocationMode.SeparateFiles) + && string.IsNullOrEmpty(BaseDirectory)) + { + throw new ArgumentException("BaseDirectory must be specified for file-based allocation modes", nameof(BaseDirectory)); + } + + if (!EqualityComparer.Default.Equals(MinInternalReference, default) && + !EqualityComparer.Default.Equals(MaxInternalReference, default) && + MinInternalReference.CompareTo(MaxInternalReference) >= 0) + { + throw new ArgumentException("MinInternalReference must be less than MaxInternalReference", nameof(MinInternalReference)); + } + } + + /// + /// + /// Creates a default configuration instance. + /// + /// + /// + /// + /// A new configuration instance with default values. + /// + /// + public static StorageConfiguration CreateDefault() + { + return new StorageConfiguration(); + } + + /// + /// + /// Creates a configuration optimized for high-throughput scenarios. + /// + /// + /// + /// + /// A new configuration instance optimized for high throughput. + /// + /// + public static StorageConfiguration CreateHighThroughput() + { + return new StorageConfiguration + { + MaxSectionCapacity = 16 * 1024 * 1024, // 16MB sections + NumberOfSections = Environment.ProcessorCount * 2, + AllocationMode = SectionAllocationMode.Heap, + EnableAutoExpansion = true, + TargetCpuUtilization = 0.9, + EnablePerformanceMonitoring = true + }; + } + + /// + /// + /// Creates a configuration optimized for memory-efficient scenarios. + /// + /// + /// + /// + /// The base directory for file storage. + /// + /// + /// + /// A new configuration instance optimized for memory efficiency. + /// + /// + public static StorageConfiguration CreateMemoryEfficient(string baseDirectory) + { + return new StorageConfiguration + { + MaxSectionCapacity = 256 * 1024, // 256KB sections + NumberOfSections = Environment.ProcessorCount, + AllocationMode = SectionAllocationMode.MemoryMapped, + BaseDirectory = baseDirectory, + EnableAutoExpansion = false, + TargetCpuUtilization = 0.7, + EnablePerformanceMonitoring = false + }; + } + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/MultithreadedStorage/StorageRequest.cs b/csharp/Platform.Data/MultithreadedStorage/StorageRequest.cs new file mode 100644 index 0000000..298fedf --- /dev/null +++ b/csharp/Platform.Data/MultithreadedStorage/StorageRequest.cs @@ -0,0 +1,211 @@ +using System; +using System.Collections.Generic; +using System.Numerics; +using System.Threading.Tasks; +using Platform.Delegates; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +namespace Platform.Data.MultithreadedStorage +{ + /// + /// + /// Represents the type of storage operation. + /// + /// + /// + public enum StorageOperationType + { + Count, + Each, + Create, + Update, + Delete + } + + /// + /// + /// Represents a request for a storage operation that can be processed by multiple sections. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public class StorageRequest + where TLinkAddress : IUnsignedNumber, IComparable + { + /// + /// Gets the unique identifier for this request. + /// + /// + public Guid Id { get; } + + /// + /// Gets the type of storage operation. + /// + /// + public StorageOperationType OperationType { get; } + + /// + /// Gets the restriction for the operation. + /// + /// + public IList? Restriction { get; } + + /// + /// Gets the substitution for create/update operations. + /// + /// + public IList? Substitution { get; } + + /// + /// Gets the read handler for each operations. + /// + /// + public ReadHandler? ReadHandler { get; } + + /// + /// Gets the write handler for create/update/delete operations. + /// + /// + public WriteHandler? WriteHandler { get; } + + /// + /// Gets the task completion source for returning results. + /// + /// + public TaskCompletionSource CompletionSource { get; } + + /// + /// Gets the timestamp when this request was created. + /// + /// + public DateTime CreatedAt { get; } + + /// + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// The type of storage operation. + /// + /// + /// + /// The restriction for the operation. + /// + /// + /// + /// The substitution for create/update operations. + /// + /// + /// + /// The read handler for each operations. + /// + /// + /// + /// The write handler for create/update/delete operations. + /// + /// + public StorageRequest( + StorageOperationType operationType, + IList? restriction = null, + IList? substitution = null, + ReadHandler? readHandler = null, + WriteHandler? writeHandler = null) + { + Id = Guid.NewGuid(); + OperationType = operationType; + Restriction = restriction; + Substitution = substitution; + ReadHandler = readHandler; + WriteHandler = writeHandler; + CompletionSource = new TaskCompletionSource(); + CreatedAt = DateTime.UtcNow; + } + } + + /// + /// + /// Represents the result of processing a storage request by a section. + /// + /// + /// + /// + /// The type of link address. + /// + /// + public class StorageResult + where TLinkAddress : IUnsignedNumber, IComparable + { + /// + /// Gets the request ID this result corresponds to. + /// + /// + public Guid RequestId { get; } + + /// + /// Gets the section ID that processed this request. + /// + /// + public int SectionId { get; } + + /// + /// Gets the result value. + /// + /// + public TLinkAddress Value { get; } + + /// + /// Gets any exception that occurred during processing. + /// + /// + public Exception? Exception { get; } + + /// + /// Gets whether the operation was successful. + /// + /// + public bool IsSuccess => Exception == null; + + /// + /// Gets the timestamp when this result was created. + /// + /// + public DateTime CreatedAt { get; } + + /// + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// The request ID this result corresponds to. + /// + /// + /// + /// The section ID that processed this request. + /// + /// + /// + /// The result value. + /// + /// + /// + /// Any exception that occurred during processing. + /// + /// + public StorageResult(Guid requestId, int sectionId, TLinkAddress value, Exception? exception = null) + { + RequestId = requestId; + SectionId = sectionId; + Value = value; + Exception = exception; + CreatedAt = DateTime.UtcNow; + } + } +} \ No newline at end of file diff --git a/csharp/Platform.Data/Platform.Data.csproj b/csharp/Platform.Data/Platform.Data.csproj index 929fb16..99305fe 100644 --- a/csharp/Platform.Data/Platform.Data.csproj +++ b/csharp/Platform.Data/Platform.Data.csproj @@ -4,12 +4,12 @@ LinksPlatform's Platform.Data Class Library konard, FreePhoenix888 Platform.Data - 0.16.1 + 0.17.0 konard, FreePhoenix888 net8 Platform.Data Platform.Data - LinksPlatform;Data;ArgumentLinkDoesNotExistsException;ArgumentLinkHasDependenciesException;LinksLimitReachedException;LinksLimitReachedExceptionBase;LinkWithSameValueAlreadyExistsException;AddressToRawNumberConverter;RawNumberToAddressConverter;ISequenceAppender;ISequenceWalker;SequenceWalker;StopableSequenceWalker;Hybrid;ILinks;ILinksExtensions;ISynchronizedLinks;LinkAddress;LinksConstants;LinksConstantsBase;LinksConstantsExtensions;Point + LinksPlatform;Data;ArgumentLinkDoesNotExistsException;ArgumentLinkHasDependenciesException;LinksLimitReachedException;LinksLimitReachedExceptionBase;LinkWithSameValueAlreadyExistsException;AddressToRawNumberConverter;RawNumberToAddressConverter;ISequenceAppender;ISequenceWalker;SequenceWalker;StopableSequenceWalker;Hybrid;ILinks;ILinksExtensions;ISynchronizedLinks;LinkAddress;LinksConstants;LinksConstantsBase;LinksConstantsExtensions;Point;MapReduceCombinedLinksStorage;MultithreadedStorage;LockFreeQueue;StorageSection;DistributedStorage https://raw.githubusercontent.com/linksplatform/Documentation/18469f4d033ee9a5b7b84caab9c585acab2ac519/doc/Avatar-rainbow-icon-64x64.png https://linksplatform.github.io/Data Unlicense @@ -23,7 +23,7 @@ true snupkg latest - Update target framework from net7 to net8. + Add MapReduce multithread combined storage for lock-free distributed operations across multiple sections. Implements issue #77 with configurable section capacity, thread-per-section architecture, and seamless MapReduce request/response streaming. enable