diff --git a/csharp/Platform.Data.Tests/MultithreadedStorage/MapReduceCombinedLinksStorageTests.cs b/csharp/Platform.Data.Tests/MultithreadedStorage/MapReduceCombinedLinksStorageTests.cs
new file mode 100644
index 0000000..fff26cb
--- /dev/null
+++ b/csharp/Platform.Data.Tests/MultithreadedStorage/MapReduceCombinedLinksStorageTests.cs
@@ -0,0 +1,308 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Xunit;
+using Platform.Data.MultithreadedStorage;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.Tests.MultithreadedStorage
+{
+ ///
+ ///
+ /// Tests for the MapReduceCombinedLinksStorage implementation.
+ ///
+ ///
+ ///
+ public class MapReduceCombinedLinksStorageTests : IDisposable
+ {
+ private readonly LinksConstants _constants;
+ private readonly MapReduceCombinedLinksStorage> _storage;
+
+ public MapReduceCombinedLinksStorageTests()
+ {
+ _constants = new LinksConstants(enableExternalReferencesSupport: false);
+ var config = StorageConfiguration.CreateDefault();
+ config.NumberOfSections = 2; // Use 2 sections for testing
+ config.MaxSectionCapacity = 1000; // Small capacity for testing
+ _storage = new MapReduceCombinedLinksStorage>(_constants, config);
+ }
+
+ [Fact]
+ public void Constructor_SetsPropertiesCorrectly()
+ {
+ // Assert
+ Assert.Equal(_constants, _storage.Constants);
+ Assert.Equal(2, _storage.SectionCount);
+ Assert.Equal(1000, _storage.MaxSectionCapacity);
+ }
+
+ [Fact]
+ public void Count_EmptyStorage_ReturnsZero()
+ {
+ // Act
+ var count = _storage.Count(null);
+
+ // Assert
+ Assert.Equal(0UL, count);
+ }
+
+ [Fact]
+ public void Create_SingleLink_ReturnsValidAddress()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+
+ // Act
+ var result = _storage.Create(substitution, null);
+
+ // Assert
+ Assert.NotEqual(_constants.Null, result);
+ Assert.True(result > 0UL);
+ }
+
+ [Fact]
+ public void Create_MultipleLinks_DistributesAcrossSections()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+ var createdLinks = new List();
+
+ // Act - Create multiple links
+ for (int i = 0; i < 10; i++)
+ {
+ var result = _storage.Create(substitution, null);
+ Assert.NotEqual(_constants.Null, result);
+ createdLinks.Add(result);
+ }
+
+ // Assert
+ Assert.Equal(10, createdLinks.Count);
+ Assert.Equal(createdLinks.Count, createdLinks.Distinct().Count()); // All links should be unique
+ }
+
+ [Fact]
+ public void Count_AfterCreatingLinks_ReturnsCorrectCount()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+ var linksToCreate = 5;
+
+ // Act
+ for (int i = 0; i < linksToCreate; i++)
+ {
+ _storage.Create(substitution, null);
+ }
+ var count = _storage.Count(null);
+
+ // Assert
+ Assert.Equal((ulong)linksToCreate, count);
+ }
+
+ [Fact]
+ public void Each_EmptyStorage_CallsHandlerZeroTimes()
+ {
+ // Arrange
+ var callCount = 0;
+ ulong handler(IList link)
+ {
+ callCount++;
+ return _constants.Continue;
+ }
+
+ // Act
+ var result = _storage.Each(null, handler);
+
+ // Assert
+ Assert.Equal(_constants.Continue, result);
+ Assert.Equal(0, callCount);
+ }
+
+ [Fact]
+ public void Each_WithLinks_CallsHandlerForEachLink()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+ var linksToCreate = 3;
+ var handlerCalls = new List>();
+
+ // Create some links first
+ for (int i = 0; i < linksToCreate; i++)
+ {
+ _storage.Create(substitution, null);
+ }
+
+ ulong handler(IList link)
+ {
+ handlerCalls.Add(new List(link));
+ return _constants.Continue;
+ }
+
+ // Act
+ var result = _storage.Each(null, handler);
+
+ // Assert
+ Assert.Equal(_constants.Continue, result);
+ Assert.Equal(linksToCreate, handlerCalls.Count);
+ }
+
+ [Fact]
+ public void Each_WithBreak_StopsIteration()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+ var linksToCreate = 5;
+ var handlerCallCount = 0;
+
+ // Create some links first
+ for (int i = 0; i < linksToCreate; i++)
+ {
+ _storage.Create(substitution, null);
+ }
+
+ ulong handler(IList link)
+ {
+ handlerCallCount++;
+ return handlerCallCount >= 2 ? _constants.Break : _constants.Continue;
+ }
+
+ // Act
+ var result = _storage.Each(null, handler);
+
+ // Assert
+ Assert.Equal(_constants.Break, result);
+ Assert.True(handlerCallCount >= 2);
+ Assert.True(handlerCallCount <= linksToCreate);
+ }
+
+ [Fact]
+ public void Update_ExistingLink_UpdatesSuccessfully()
+ {
+ // Arrange
+ var initialSubstitution = new List { 100UL, 200UL };
+ var newSubstitution = new List { 300UL, 400UL };
+
+ var createdLink = _storage.Create(initialSubstitution, null);
+ Assert.NotEqual(_constants.Null, createdLink);
+
+ var restriction = new List { createdLink };
+
+ // Act
+ var updateResult = _storage.Update(restriction, newSubstitution, null);
+
+ // Assert
+ Assert.Equal(_constants.Continue, updateResult);
+ }
+
+ [Fact]
+ public void Delete_ExistingLink_DeletesSuccessfully()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+ var createdLink = _storage.Create(substitution, null);
+ Assert.NotEqual(_constants.Null, createdLink);
+
+ var initialCount = _storage.Count(null);
+ var restriction = new List { createdLink };
+
+ // Act
+ var deleteResult = _storage.Delete(restriction, null);
+ var finalCount = _storage.Count(null);
+
+ // Assert
+ Assert.Equal(_constants.Continue, deleteResult);
+ Assert.Equal(initialCount - 1, finalCount);
+ }
+
+ [Fact]
+ public async Task ConcurrentOperations_MultipleThreads_HandledCorrectly()
+ {
+ // Arrange
+ var substitution = new List { _constants.Null, _constants.Null };
+ var tasks = new List>();
+ var numberOfOperations = 20;
+
+ // Act - Create multiple links concurrently
+ for (int i = 0; i < numberOfOperations; i++)
+ {
+ tasks.Add(Task.Run(() => _storage.Create(substitution, null)));
+ }
+
+ var results = await Task.WhenAll(tasks);
+
+ // Assert
+ Assert.Equal(numberOfOperations, results.Length);
+ Assert.All(results, result => Assert.NotEqual(_constants.Null, result));
+ Assert.Equal(results.Length, results.Distinct().Count()); // All results should be unique
+
+ var finalCount = _storage.Count(null);
+ Assert.Equal((ulong)numberOfOperations, finalCount);
+ }
+
+ [Fact]
+ public void StorageConfiguration_DefaultValues_AreValid()
+ {
+ // Act
+ var config = StorageConfiguration.CreateDefault();
+
+ // Assert
+ Assert.True(config.MaxSectionCapacity > 0);
+ Assert.True(config.EffectiveNumberOfSections > 0);
+ Assert.Equal(SectionAllocationMode.Heap, config.AllocationMode);
+ Assert.True(config.RequestTimeoutMs > 0);
+ Assert.True(config.TargetCpuUtilization > 0 && config.TargetCpuUtilization <= 1.0);
+
+ // Should not throw
+ config.Validate();
+ }
+
+ [Fact]
+ public void StorageConfiguration_HighThroughput_HasCorrectSettings()
+ {
+ // Act
+ var config = StorageConfiguration.CreateHighThroughput();
+
+ // Assert
+ Assert.True(config.MaxSectionCapacity > StorageConfiguration.CreateDefault().MaxSectionCapacity);
+ Assert.True(config.EffectiveNumberOfSections >= Environment.ProcessorCount);
+ Assert.Equal(SectionAllocationMode.Heap, config.AllocationMode);
+ Assert.True(config.EnableAutoExpansion);
+ Assert.True(config.EnablePerformanceMonitoring);
+
+ // Should not throw
+ config.Validate();
+ }
+
+ [Fact]
+ public void StorageConfiguration_Validation_ThrowsForInvalidValues()
+ {
+ // Arrange
+ var config = StorageConfiguration.CreateDefault();
+
+ // Act & Assert
+ config.MaxSectionCapacity = 0;
+ Assert.Throws(() => config.Validate());
+
+ config.MaxSectionCapacity = 1000;
+ config.NumberOfSections = 0;
+ Assert.Throws(() => config.Validate());
+
+ config.NumberOfSections = 2;
+ config.RequestTimeoutMs = 0;
+ Assert.Throws(() => config.Validate());
+
+ config.RequestTimeoutMs = 1000;
+ config.TargetCpuUtilization = 0;
+ Assert.Throws(() => config.Validate());
+
+ config.TargetCpuUtilization = 1.5;
+ Assert.Throws(() => config.Validate());
+ }
+
+ public void Dispose()
+ {
+ _storage?.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/IRequestQueue.cs b/csharp/Platform.Data/MultithreadedStorage/IRequestQueue.cs
new file mode 100644
index 0000000..a1ffd2b
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/IRequestQueue.cs
@@ -0,0 +1,170 @@
+using System;
+using System.Collections.Generic;
+using System.Numerics;
+using System.Threading;
+using System.Threading.Tasks;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// Represents a lock-free queue for handling storage requests between threads.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public interface IRequestQueue : IDisposable
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ ///
+ /// Gets the number of pending requests in the queue.
+ ///
+ ///
+ int Count { get; }
+
+ ///
+ /// Gets whether the queue is empty.
+ ///
+ ///
+ bool IsEmpty { get; }
+
+ ///
+ /// Enqueues a request for processing.
+ ///
+ ///
+ ///
+ /// The request to enqueue.
+ ///
+ ///
+ ///
+ /// True if the request was successfully enqueued, false otherwise.
+ ///
+ ///
+ bool TryEnqueue(StorageRequest request);
+
+ ///
+ /// Dequeues a request for processing.
+ ///
+ ///
+ ///
+ /// The dequeued request, if any.
+ ///
+ ///
+ ///
+ /// True if a request was successfully dequeued, false otherwise.
+ ///
+ ///
+ bool TryDequeue(out StorageRequest? request);
+
+ ///
+ /// Waits for a request to become available and dequeues it.
+ ///
+ ///
+ ///
+ /// The cancellation token.
+ ///
+ ///
+ ///
+ /// A task that completes with the dequeued request, or null if cancelled.
+ ///
+ ///
+ Task?> WaitForRequestAsync(CancellationToken cancellationToken = default);
+
+ ///
+ /// Signals that new requests are available for processing.
+ ///
+ ///
+ void SignalNewRequest();
+
+ ///
+ /// Shuts down the queue and prevents new requests from being enqueued.
+ ///
+ ///
+ void Shutdown();
+ }
+
+ ///
+ ///
+ /// Represents a lock-free queue for handling storage results between threads.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public interface IResultQueue : IDisposable
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ ///
+ /// Gets the number of pending results in the queue.
+ ///
+ ///
+ int Count { get; }
+
+ ///
+ /// Gets whether the queue is empty.
+ ///
+ ///
+ bool IsEmpty { get; }
+
+ ///
+ /// Enqueues a result from processing.
+ ///
+ ///
+ ///
+ /// The result to enqueue.
+ ///
+ ///
+ ///
+ /// True if the result was successfully enqueued, false otherwise.
+ ///
+ ///
+ bool TryEnqueue(StorageResult result);
+
+ ///
+ /// Dequeues a result from processing.
+ ///
+ ///
+ ///
+ /// The dequeued result, if any.
+ ///
+ ///
+ ///
+ /// True if a result was successfully dequeued, false otherwise.
+ ///
+ ///
+ bool TryDequeue(out StorageResult? result);
+
+ ///
+ /// Waits for a result to become available and dequeues it.
+ ///
+ ///
+ ///
+ /// The cancellation token.
+ ///
+ ///
+ ///
+ /// A task that completes with the dequeued result, or null if cancelled.
+ ///
+ ///
+ Task?> WaitForResultAsync(CancellationToken cancellationToken = default);
+
+ ///
+ /// Signals that new results are available for processing.
+ ///
+ ///
+ void SignalNewResult();
+
+ ///
+ /// Shuts down the queue and prevents new results from being enqueued.
+ ///
+ ///
+ void Shutdown();
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/IStorageSection.cs b/csharp/Platform.Data/MultithreadedStorage/IStorageSection.cs
new file mode 100644
index 0000000..f9d0826
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/IStorageSection.cs
@@ -0,0 +1,151 @@
+using System;
+using System.Collections.Generic;
+using System.Numerics;
+using Platform.Delegates;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// Represents a single section of links storage that can be managed by a dedicated thread.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public interface IStorageSection : IDisposable
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ ///
+ /// Gets the range of link addresses managed by this section.
+ ///
+ ///
+ (TLinkAddress Min, TLinkAddress Max) AddressRange { get; }
+
+ ///
+ /// Gets the thread ID that manages this section.
+ ///
+ ///
+ int ThreadId { get; }
+
+ ///
+ /// Gets the maximum capacity of this section.
+ ///
+ ///
+ TLinkAddress Capacity { get; }
+
+ ///
+ /// Gets the current number of links in this section.
+ ///
+ ///
+ TLinkAddress Count { get; }
+
+ ///
+ /// Determines if a link address belongs to this section.
+ ///
+ ///
+ ///
+ /// The link address to check.
+ ///
+ ///
+ ///
+ /// True if the address belongs to this section, false otherwise.
+ ///
+ ///
+ bool ContainsAddress(TLinkAddress address);
+
+ ///
+ /// Counts links that match the specified restriction within this section.
+ ///
+ ///
+ ///
+ /// The restriction to apply.
+ ///
+ ///
+ ///
+ /// The number of matching links in this section.
+ ///
+ ///
+ TLinkAddress CountLinks(IList? restriction);
+
+ ///
+ /// Iterates through links that match the specified restriction within this section.
+ ///
+ ///
+ ///
+ /// The restriction to apply.
+ ///
+ ///
+ ///
+ /// The handler to call for each matching link.
+ ///
+ ///
+ ///
+ /// The result of the iteration.
+ ///
+ ///
+ TLinkAddress EachLink(IList? restriction, ReadHandler? handler);
+
+ ///
+ /// Creates a new link in this section.
+ ///
+ ///
+ ///
+ /// The content of the new link.
+ ///
+ ///
+ ///
+ /// The handler to call for write operations.
+ ///
+ ///
+ ///
+ /// The result of the create operation.
+ ///
+ ///
+ TLinkAddress CreateLink(IList? substitution, WriteHandler? handler);
+
+ ///
+ /// Updates a link in this section.
+ ///
+ ///
+ ///
+ /// The restriction to identify the link to update.
+ ///
+ ///
+ ///
+ /// The new content for the link.
+ ///
+ ///
+ ///
+ /// The handler to call for write operations.
+ ///
+ ///
+ ///
+ /// The result of the update operation.
+ ///
+ ///
+ TLinkAddress UpdateLink(IList? restriction, IList? substitution, WriteHandler? handler);
+
+ ///
+ /// Deletes links in this section.
+ ///
+ ///
+ ///
+ /// The restriction to identify links to delete.
+ ///
+ ///
+ ///
+ /// The handler to call for write operations.
+ ///
+ ///
+ ///
+ /// The result of the delete operation.
+ ///
+ ///
+ TLinkAddress DeleteLink(IList? restriction, WriteHandler? handler);
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/InMemoryStorageSection.cs b/csharp/Platform.Data/MultithreadedStorage/InMemoryStorageSection.cs
new file mode 100644
index 0000000..433b871
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/InMemoryStorageSection.cs
@@ -0,0 +1,333 @@
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Numerics;
+using System.Runtime.CompilerServices;
+using Platform.Delegates;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// In-memory implementation of a storage section that manages a specific range of link addresses.
+ /// Each section operates independently and can be managed by a dedicated thread.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public class InMemoryStorageSection : IStorageSection
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ private readonly LinksConstants _constants;
+ private readonly ConcurrentDictionary _links;
+ private readonly (TLinkAddress Min, TLinkAddress Max) _addressRange;
+ private readonly int _threadId;
+ private readonly TLinkAddress _capacity;
+ private TLinkAddress _nextAvailableAddress;
+ private volatile bool _disposed;
+
+ ///
+ /// Gets the range of link addresses managed by this section.
+ ///
+ ///
+ public (TLinkAddress Min, TLinkAddress Max) AddressRange => _addressRange;
+
+ ///
+ /// Gets the thread ID that manages this section.
+ ///
+ ///
+ public int ThreadId => _threadId;
+
+ ///
+ /// Gets the maximum capacity of this section.
+ ///
+ ///
+ public TLinkAddress Capacity => _capacity;
+
+ ///
+ /// Gets the current number of links in this section.
+ ///
+ ///
+ public TLinkAddress Count => TLinkAddress.CreateTruncating((ulong)_links.Count);
+
+ ///
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ ///
+ /// The thread ID that will manage this section.
+ ///
+ ///
+ ///
+ /// The range of addresses this section manages.
+ ///
+ ///
+ ///
+ /// The constants for link operations.
+ ///
+ ///
+ public InMemoryStorageSection(int threadId, (TLinkAddress Min, TLinkAddress Max) addressRange, LinksConstants constants)
+ {
+ _threadId = threadId;
+ _addressRange = addressRange;
+ _constants = constants ?? throw new ArgumentNullException(nameof(constants));
+ _capacity = addressRange.Max - addressRange.Min + TLinkAddress.One;
+ _nextAvailableAddress = addressRange.Min;
+ _links = new ConcurrentDictionary();
+ _disposed = false;
+ }
+
+ ///
+ /// Determines if a link address belongs to this section.
+ ///
+ ///
+ ///
+ /// The link address to check.
+ ///
+ ///
+ ///
+ /// True if the address belongs to this section, false otherwise.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool ContainsAddress(TLinkAddress address)
+ {
+ return address.CompareTo(_addressRange.Min) >= 0 && address.CompareTo(_addressRange.Max) <= 0;
+ }
+
+ ///
+ /// Counts links that match the specified restriction within this section.
+ ///
+ ///
+ ///
+ /// The restriction to apply.
+ ///
+ ///
+ ///
+ /// The number of matching links in this section.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress CountLinks(IList? restriction)
+ {
+ if (_disposed)
+ return TLinkAddress.Zero;
+
+ if (restriction == null || restriction.Count == 0)
+ return Count;
+
+ var count = TLinkAddress.Zero;
+ foreach (var link in _links.Values)
+ {
+ if (MatchesRestriction(link, restriction))
+ count++;
+ }
+ return count;
+ }
+
+ ///
+ /// Iterates through links that match the specified restriction within this section.
+ ///
+ ///
+ ///
+ /// The restriction to apply.
+ ///
+ ///
+ ///
+ /// The handler to call for each matching link.
+ ///
+ ///
+ ///
+ /// The result of the iteration.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress EachLink(IList? restriction, ReadHandler? handler)
+ {
+ if (_disposed || handler == null)
+ return _constants.Continue;
+
+ foreach (var kvp in _links)
+ {
+ var link = kvp.Value;
+ if (restriction == null || restriction.Count == 0 || MatchesRestriction(link, restriction))
+ {
+ var result = handler(link);
+ if (EqualityComparer.Default.Equals(result, _constants.Break))
+ return _constants.Break;
+ }
+ }
+ return _constants.Continue;
+ }
+
+ ///
+ /// Creates a new link in this section.
+ ///
+ ///
+ ///
+ /// The content of the new link.
+ ///
+ ///
+ ///
+ /// The handler to call for write operations.
+ ///
+ ///
+ ///
+ /// The result of the create operation.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress CreateLink(IList? substitution, WriteHandler? handler)
+ {
+ if (_disposed)
+ return _constants.Null;
+
+ if (_nextAvailableAddress.CompareTo(_addressRange.Max) > 0)
+ return _constants.Null; // Section is full
+
+ var newAddress = _nextAvailableAddress++;
+ var linkContent = CreateLinkContent(newAddress, substitution);
+
+ if (_links.TryAdd(newAddress, linkContent))
+ {
+ handler?.Invoke(Array.Empty(), linkContent);
+ return newAddress;
+ }
+
+ return _constants.Null;
+ }
+
+ ///
+ /// Updates a link in this section.
+ ///
+ ///
+ ///
+ /// The restriction to identify the link to update.
+ ///
+ ///
+ ///
+ /// The new content for the link.
+ ///
+ ///
+ ///
+ /// The handler to call for write operations.
+ ///
+ ///
+ ///
+ /// The result of the update operation.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress UpdateLink(IList? restriction, IList? substitution, WriteHandler? handler)
+ {
+ if (_disposed || restriction == null || restriction.Count == 0)
+ return _constants.Continue;
+
+ var linkAddress = restriction[_constants.IndexPart];
+ if (!ContainsAddress(linkAddress))
+ return _constants.Continue;
+
+ if (_links.TryGetValue(linkAddress, out var oldLink))
+ {
+ var newLinkContent = CreateLinkContent(linkAddress, substitution);
+ if (_links.TryUpdate(linkAddress, newLinkContent, oldLink))
+ {
+ handler?.Invoke(oldLink, newLinkContent);
+ return _constants.Continue;
+ }
+ }
+
+ return _constants.Continue;
+ }
+
+ ///
+ /// Deletes links in this section.
+ ///
+ ///
+ ///
+ /// The restriction to identify links to delete.
+ ///
+ ///
+ ///
+ /// The handler to call for write operations.
+ ///
+ ///
+ ///
+ /// The result of the delete operation.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress DeleteLink(IList? restriction, WriteHandler? handler)
+ {
+ if (_disposed || restriction == null || restriction.Count == 0)
+ return _constants.Continue;
+
+ var linkAddress = restriction[_constants.IndexPart];
+ if (!ContainsAddress(linkAddress))
+ return _constants.Continue;
+
+ if (_links.TryRemove(linkAddress, out var deletedLink))
+ {
+ var emptyLink = new TLinkAddress[deletedLink.Length];
+ handler?.Invoke(deletedLink, emptyLink);
+ return _constants.Continue;
+ }
+
+ return _constants.Continue;
+ }
+
+ private TLinkAddress[] CreateLinkContent(TLinkAddress address, IList? substitution)
+ {
+ if (substitution == null || substitution.Count == 0)
+ {
+ // Create a basic triplet: [index, null, null]
+ return new TLinkAddress[] { address, _constants.Null, _constants.Null };
+ }
+
+ var content = new TLinkAddress[Math.Max(3, substitution.Count + 1)];
+ content[_constants.IndexPart] = address;
+
+ for (int i = 0; i < substitution.Count; i++)
+ {
+ content[i + 1] = substitution[i];
+ }
+
+ return content;
+ }
+
+ private bool MatchesRestriction(TLinkAddress[] link, IList restriction)
+ {
+ for (int i = 0; i < Math.Min(link.Length, restriction.Count); i++)
+ {
+ var restrictionValue = restriction[i];
+ if (!EqualityComparer.Default.Equals(restrictionValue, _constants.Any) &&
+ !EqualityComparer.Default.Equals(restrictionValue, link[i]))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ ///
+ /// Disposes the section and releases its resources.
+ ///
+ ///
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ _disposed = true;
+ _links.Clear();
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/LockFreeQueue.cs b/csharp/Platform.Data/MultithreadedStorage/LockFreeQueue.cs
new file mode 100644
index 0000000..99c64f5
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/LockFreeQueue.cs
@@ -0,0 +1,302 @@
+using System;
+using System.Collections.Concurrent;
+using System.Numerics;
+using System.Threading;
+using System.Threading.Tasks;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// Lock-free implementation of request queue using ConcurrentQueue.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public class LockFreeRequestQueue : IRequestQueue
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ private readonly ConcurrentQueue> _queue;
+ private readonly SemaphoreSlim _semaphore;
+ private volatile bool _isShutdown;
+
+ ///
+ /// Gets the number of pending requests in the queue.
+ ///
+ ///
+ public int Count => _queue.Count;
+
+ ///
+ /// Gets whether the queue is empty.
+ ///
+ ///
+ public bool IsEmpty => _queue.IsEmpty;
+
+ ///
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ public LockFreeRequestQueue()
+ {
+ _queue = new ConcurrentQueue>();
+ _semaphore = new SemaphoreSlim(0);
+ _isShutdown = false;
+ }
+
+ ///
+ /// Enqueues a request for processing.
+ ///
+ ///
+ ///
+ /// The request to enqueue.
+ ///
+ ///
+ ///
+ /// True if the request was successfully enqueued, false otherwise.
+ ///
+ ///
+ public bool TryEnqueue(StorageRequest request)
+ {
+ if (_isShutdown)
+ return false;
+
+ _queue.Enqueue(request);
+ _semaphore.Release();
+ return true;
+ }
+
+ ///
+ /// Dequeues a request for processing.
+ ///
+ ///
+ ///
+ /// The dequeued request, if any.
+ ///
+ ///
+ ///
+ /// True if a request was successfully dequeued, false otherwise.
+ ///
+ ///
+ public bool TryDequeue(out StorageRequest? request)
+ {
+ return _queue.TryDequeue(out request);
+ }
+
+ ///
+ /// Waits for a request to become available and dequeues it.
+ ///
+ ///
+ ///
+ /// The cancellation token.
+ ///
+ ///
+ ///
+ /// A task that completes with the dequeued request, or null if cancelled.
+ ///
+ ///
+ public async Task?> WaitForRequestAsync(CancellationToken cancellationToken = default)
+ {
+ if (_isShutdown)
+ return null;
+
+ try
+ {
+ await _semaphore.WaitAsync(cancellationToken);
+ if (_isShutdown || cancellationToken.IsCancellationRequested)
+ return null;
+
+ if (TryDequeue(out var request))
+ return request;
+
+ return null;
+ }
+ catch (OperationCanceledException)
+ {
+ return null;
+ }
+ }
+
+ ///
+ /// Signals that new requests are available for processing.
+ ///
+ ///
+ public void SignalNewRequest()
+ {
+ if (!_isShutdown)
+ _semaphore.Release();
+ }
+
+ ///
+ /// Shuts down the queue and prevents new requests from being enqueued.
+ ///
+ ///
+ public void Shutdown()
+ {
+ _isShutdown = true;
+ _semaphore.Release(100); // Release enough to wake up all waiting threads
+ }
+
+ ///
+ /// Disposes the queue resources.
+ ///
+ ///
+ public void Dispose()
+ {
+ Shutdown();
+ _semaphore.Dispose();
+ }
+ }
+
+ ///
+ ///
+ /// Lock-free implementation of result queue using ConcurrentQueue.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public class LockFreeResultQueue : IResultQueue
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ private readonly ConcurrentQueue> _queue;
+ private readonly SemaphoreSlim _semaphore;
+ private volatile bool _isShutdown;
+
+ ///
+ /// Gets the number of pending results in the queue.
+ ///
+ ///
+ public int Count => _queue.Count;
+
+ ///
+ /// Gets whether the queue is empty.
+ ///
+ ///
+ public bool IsEmpty => _queue.IsEmpty;
+
+ ///
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ public LockFreeResultQueue()
+ {
+ _queue = new ConcurrentQueue>();
+ _semaphore = new SemaphoreSlim(0);
+ _isShutdown = false;
+ }
+
+ ///
+ /// Enqueues a result from processing.
+ ///
+ ///
+ ///
+ /// The result to enqueue.
+ ///
+ ///
+ ///
+ /// True if the result was successfully enqueued, false otherwise.
+ ///
+ ///
+ public bool TryEnqueue(StorageResult result)
+ {
+ if (_isShutdown)
+ return false;
+
+ _queue.Enqueue(result);
+ _semaphore.Release();
+ return true;
+ }
+
+ ///
+ /// Dequeues a result from processing.
+ ///
+ ///
+ ///
+ /// The dequeued result, if any.
+ ///
+ ///
+ ///
+ /// True if a result was successfully dequeued, false otherwise.
+ ///
+ ///
+ public bool TryDequeue(out StorageResult? result)
+ {
+ return _queue.TryDequeue(out result);
+ }
+
+ ///
+ /// Waits for a result to become available and dequeues it.
+ ///
+ ///
+ ///
+ /// The cancellation token.
+ ///
+ ///
+ ///
+ /// A task that completes with the dequeued result, or null if cancelled.
+ ///
+ ///
+ public async Task?> WaitForResultAsync(CancellationToken cancellationToken = default)
+ {
+ if (_isShutdown)
+ return null;
+
+ try
+ {
+ await _semaphore.WaitAsync(cancellationToken);
+ if (_isShutdown || cancellationToken.IsCancellationRequested)
+ return null;
+
+ if (TryDequeue(out var result))
+ return result;
+
+ return null;
+ }
+ catch (OperationCanceledException)
+ {
+ return null;
+ }
+ }
+
+ ///
+ /// Signals that new results are available for processing.
+ ///
+ ///
+ public void SignalNewResult()
+ {
+ if (!_isShutdown)
+ _semaphore.Release();
+ }
+
+ ///
+ /// Shuts down the queue and prevents new results from being enqueued.
+ ///
+ ///
+ public void Shutdown()
+ {
+ _isShutdown = true;
+ _semaphore.Release(100); // Release enough to wake up all waiting threads
+ }
+
+ ///
+ /// Disposes the queue resources.
+ ///
+ ///
+ public void Dispose()
+ {
+ Shutdown();
+ _semaphore.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/MapReduceCombinedLinksStorage.cs b/csharp/Platform.Data/MultithreadedStorage/MapReduceCombinedLinksStorage.cs
new file mode 100644
index 0000000..f3d1cc0
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/MapReduceCombinedLinksStorage.cs
@@ -0,0 +1,426 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Numerics;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Platform.Delegates;
+using Platform.Ranges;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// MapReduce multithread combined storage implementation that distributes links across multiple sections,
+ /// each managed by a dedicated thread. Implements lock-free queues for request/response streaming.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ ///
+ /// The type of constants.
+ ///
+ ///
+ public class MapReduceCombinedLinksStorage : ILinks, IDisposable
+ where TLinkAddress : IUnsignedNumber, IComparable
+ where TConstants : LinksConstants
+ {
+ private readonly TConstants _constants;
+ private readonly List> _sections;
+ private readonly List> _requestQueues;
+ private readonly IResultQueue _resultQueue;
+ private readonly List _sectionTasks;
+ private readonly CancellationTokenSource _cancellationTokenSource;
+ private readonly int _maxSectionCapacity;
+ private readonly int _numberOfSections;
+ private readonly object _sectionLock;
+ private volatile bool _disposed;
+
+ ///
+ /// Gets the constants for this storage instance.
+ ///
+ ///
+ public TConstants Constants
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => _constants;
+ }
+
+ ///
+ /// Gets the number of active sections.
+ ///
+ ///
+ public int SectionCount => _sections.Count;
+
+ ///
+ /// Gets the maximum capacity per section.
+ ///
+ ///
+ public int MaxSectionCapacity => _maxSectionCapacity;
+
+ ///
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ ///
+ /// The constants for this storage instance.
+ ///
+ ///
+ ///
+ /// The configuration for this storage instance.
+ ///
+ ///
+ public MapReduceCombinedLinksStorage(TConstants constants, StorageConfiguration? configuration = null)
+ {
+ _constants = constants ?? throw new ArgumentNullException(nameof(constants));
+ var config = configuration ?? StorageConfiguration.CreateDefault();
+ config.Validate();
+ _maxSectionCapacity = config.MaxSectionCapacity;
+ _numberOfSections = config.EffectiveNumberOfSections;
+ _sections = new List>();
+ _requestQueues = new List>();
+ _resultQueue = new LockFreeResultQueue();
+ _sectionTasks = new List();
+ _cancellationTokenSource = new CancellationTokenSource();
+ _sectionLock = new object();
+ _disposed = false;
+
+ InitializeSections();
+ }
+
+ private void InitializeSections()
+ {
+ for (int i = 0; i < _numberOfSections; i++)
+ {
+ CreateNewSection();
+ }
+ }
+
+ private void CreateNewSection()
+ {
+ var sectionId = _sections.Count;
+ var addressRangeStart = TLinkAddress.CreateTruncating((ulong)sectionId * (ulong)_maxSectionCapacity + 1);
+ var addressRangeEnd = TLinkAddress.CreateTruncating((ulong)(sectionId + 1) * (ulong)_maxSectionCapacity);
+
+ // Create a basic in-memory section implementation
+ var section = new InMemoryStorageSection(sectionId, (addressRangeStart, addressRangeEnd), _constants);
+ var requestQueue = new LockFreeRequestQueue();
+
+ _sections.Add(section);
+ _requestQueues.Add(requestQueue);
+
+ // Start section processing task
+ var task = Task.Run(() => ProcessSectionRequests(section, requestQueue, _cancellationTokenSource.Token));
+ _sectionTasks.Add(task);
+ }
+
+ private async Task ProcessSectionRequests(IStorageSection section, IRequestQueue requestQueue, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ var request = await requestQueue.WaitForRequestAsync(cancellationToken);
+ if (request == null)
+ continue;
+
+ var result = ProcessRequest(section, request);
+ _resultQueue.TryEnqueue(result);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ // Log error and continue processing
+ Console.WriteLine($"Error processing request in section {section.ThreadId}: {ex.Message}");
+ }
+ }
+ }
+
+ private StorageResult ProcessRequest(IStorageSection section, StorageRequest request)
+ {
+ try
+ {
+ TLinkAddress result = request.OperationType switch
+ {
+ StorageOperationType.Count => section.CountLinks(request.Restriction),
+ StorageOperationType.Each => section.EachLink(request.Restriction, request.ReadHandler),
+ StorageOperationType.Create => section.CreateLink(request.Substitution, request.WriteHandler),
+ StorageOperationType.Update => section.UpdateLink(request.Restriction, request.Substitution, request.WriteHandler),
+ StorageOperationType.Delete => section.DeleteLink(request.Restriction, request.WriteHandler),
+ _ => throw new ArgumentException($"Unknown operation type: {request.OperationType}")
+ };
+
+ return new StorageResult(request.Id, section.ThreadId, result);
+ }
+ catch (Exception ex)
+ {
+ return new StorageResult(request.Id, section.ThreadId, default, ex);
+ }
+ }
+
+ ///
+ /// Maps a request to all relevant sections and reduces the results.
+ ///
+ ///
+ ///
+ /// The request to process.
+ ///
+ ///
+ ///
+ /// The reduced result from all sections.
+ ///
+ ///
+ private async Task MapReduceRequest(StorageRequest request)
+ {
+ var relevantSections = GetRelevantSections(request);
+ var tasks = new List>();
+
+ // Map: send request to all relevant sections
+ foreach (var (section, queue) in relevantSections)
+ {
+ var sectionRequest = new StorageRequest(
+ request.OperationType,
+ request.Restriction,
+ request.Substitution,
+ request.ReadHandler,
+ request.WriteHandler);
+
+ queue.TryEnqueue(sectionRequest);
+ tasks.Add(sectionRequest.CompletionSource.Task);
+ }
+
+ // Wait for results from result queue
+ var results = new List>();
+ var expectedResults = tasks.Count;
+ var resultTasks = tasks.Select(async task =>
+ {
+ while (true)
+ {
+ var result = await _resultQueue.WaitForResultAsync(_cancellationTokenSource.Token);
+ if (result != null)
+ {
+ results.Add(result);
+
+ // Find and complete the corresponding task
+ var matchingTask = tasks.FirstOrDefault(t => !t.IsCompleted);
+ if (matchingTask != null && result.IsSuccess)
+ {
+ try
+ {
+ ((TaskCompletionSource)matchingTask.AsyncState!)?.SetResult(result.Value);
+ }
+ catch { }
+ }
+
+ if (results.Count >= expectedResults)
+ break;
+ }
+ }
+ });
+
+ await Task.WhenAll(resultTasks);
+
+ // Reduce: combine results based on operation type
+ return ReduceResults(request.OperationType, results);
+ }
+
+ private List<(IStorageSection section, IRequestQueue queue)> GetRelevantSections(StorageRequest request)
+ {
+ var relevantSections = new List<(IStorageSection, IRequestQueue)>();
+
+ // For now, send to all sections. In a more sophisticated implementation,
+ // we would analyze the restriction to determine which sections are relevant
+ for (int i = 0; i < _sections.Count; i++)
+ {
+ relevantSections.Add((_sections[i], _requestQueues[i]));
+ }
+
+ return relevantSections;
+ }
+
+ private TLinkAddress ReduceResults(StorageOperationType operationType, List> results)
+ {
+ var successfulResults = results.Where(r => r.IsSuccess).ToList();
+
+ return operationType switch
+ {
+ StorageOperationType.Count => successfulResults.Aggregate(TLinkAddress.Zero, (sum, result) => sum + result.Value),
+ StorageOperationType.Each => successfulResults.LastOrDefault()?.Value ?? _constants.Continue,
+ StorageOperationType.Create => successfulResults.FirstOrDefault()?.Value ?? _constants.Null,
+ StorageOperationType.Update => successfulResults.FirstOrDefault()?.Value ?? _constants.Continue,
+ StorageOperationType.Delete => successfulResults.FirstOrDefault()?.Value ?? _constants.Continue,
+ _ => throw new ArgumentException($"Unknown operation type: {operationType}")
+ };
+ }
+
+ #region ILinks Implementation
+
+ ///
+ /// Counts and returns the total number of links in the storage that meet the specified restriction.
+ ///
+ ///
+ ///
+ /// Restriction on the contents of links.
+ ///
+ ///
+ ///
+ /// The total number of links in the storage that meet the specified restriction.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress Count(IList? restriction)
+ {
+ var request = new StorageRequest(StorageOperationType.Count, restriction);
+ return MapReduceRequest(request).GetAwaiter().GetResult();
+ }
+
+ ///
+ /// Passes through all the links matching the pattern, invoking a handler for each matching link.
+ ///
+ ///
+ ///
+ /// Restriction on the contents of links.
+ ///
+ ///
+ ///
+ /// A handler for each matching link.
+ ///
+ ///
+ ///
+ /// Constants.Continue, if the pass through the links was not interrupted, and Constants.Break otherwise.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress Each(IList? restriction, ReadHandler? handler)
+ {
+ var request = new StorageRequest(StorageOperationType.Each, restriction, readHandler: handler);
+ return MapReduceRequest(request).GetAwaiter().GetResult();
+ }
+
+ ///
+ /// Creates a link.
+ ///
+ ///
+ ///
+ /// The content of a new link.
+ ///
+ ///
+ ///
+ /// A function to handle each executed change.
+ ///
+ ///
+ ///
+ /// Constants.Continue if all executed changes are handled.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress Create(IList? substitution, WriteHandler? handler)
+ {
+ var request = new StorageRequest(StorageOperationType.Create, substitution: substitution, writeHandler: handler);
+ return MapReduceRequest(request).GetAwaiter().GetResult();
+ }
+
+ ///
+ /// Updates a link.
+ ///
+ ///
+ ///
+ /// Restriction on the content of links.
+ ///
+ ///
+ ///
+ /// The new content for the link.
+ ///
+ ///
+ ///
+ /// A function to handle each executed change.
+ ///
+ ///
+ ///
+ /// Constants.Continue if all executed changes are handled.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress Update(IList? restriction, IList? substitution, WriteHandler? handler)
+ {
+ var request = new StorageRequest(StorageOperationType.Update, restriction, substitution, writeHandler: handler);
+ return MapReduceRequest(request).GetAwaiter().GetResult();
+ }
+
+ ///
+ /// Deletes links that match the specified restriction.
+ ///
+ ///
+ ///
+ /// Restriction on the content of a link.
+ ///
+ ///
+ ///
+ /// A function to handle each executed change.
+ ///
+ ///
+ ///
+ /// Constants.Continue if all executed changes are handled.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public TLinkAddress Delete(IList? restriction, WriteHandler? handler)
+ {
+ var request = new StorageRequest(StorageOperationType.Delete, restriction, writeHandler: handler);
+ return MapReduceRequest(request).GetAwaiter().GetResult();
+ }
+
+ #endregion
+
+ #region IDisposable Implementation
+
+ ///
+ /// Disposes the storage and all its resources.
+ ///
+ ///
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ _disposed = true;
+ _cancellationTokenSource.Cancel();
+
+ // Shutdown all queues
+ foreach (var queue in _requestQueues)
+ {
+ queue.Shutdown();
+ }
+ _resultQueue.Shutdown();
+
+ // Wait for all tasks to complete
+ Task.WaitAll(_sectionTasks.ToArray(), TimeSpan.FromSeconds(5));
+
+ // Dispose all resources
+ foreach (var section in _sections)
+ {
+ section.Dispose();
+ }
+ foreach (var queue in _requestQueues)
+ {
+ queue.Dispose();
+ }
+ _resultQueue.Dispose();
+ _cancellationTokenSource.Dispose();
+ }
+
+ #endregion
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/StorageConfiguration.cs b/csharp/Platform.Data/MultithreadedStorage/StorageConfiguration.cs
new file mode 100644
index 0000000..9e3fe64
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/StorageConfiguration.cs
@@ -0,0 +1,247 @@
+using System;
+using System.Collections.Generic;
+using System.Numerics;
+using Platform.Ranges;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// Specifies the type of memory allocation for storage sections.
+ ///
+ ///
+ ///
+ public enum SectionAllocationMode
+ {
+ ///
+ /// Allocate sections in heap memory blocks.
+ ///
+ ///
+ Heap,
+
+ ///
+ /// Allocate sections using memory-mapped files (mmap).
+ ///
+ ///
+ MemoryMapped,
+
+ ///
+ /// Allocate sections in separate files.
+ ///
+ ///
+ SeparateFiles
+ }
+
+ ///
+ ///
+ /// Configuration options for the MapReduce combined links storage.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public class StorageConfiguration
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ ///
+ /// Gets or sets the maximum capacity per section. Default is 1MB worth of links.
+ ///
+ ///
+ public int MaxSectionCapacity { get; set; } = 1024 * 1024;
+
+ ///
+ /// Gets or sets the number of sections to create initially. If null, uses CPU core count + 1.
+ ///
+ ///
+ public int? NumberOfSections { get; set; }
+
+ ///
+ /// Gets or sets the allocation mode for storage sections.
+ ///
+ ///
+ public SectionAllocationMode AllocationMode { get; set; } = SectionAllocationMode.Heap;
+
+ ///
+ /// Gets or sets the base directory for file-based allocations.
+ ///
+ ///
+ public string? BaseDirectory { get; set; }
+
+ ///
+ /// Gets or sets the minimum internal references range override.
+ /// When set, overrides the default internal references range minimum value.
+ ///
+ ///
+ public TLinkAddress MinInternalReference { get; set; }
+
+ ///
+ /// Gets or sets the maximum internal references range override.
+ /// When set, overrides the default internal references range maximum value.
+ ///
+ ///
+ public TLinkAddress MaxInternalReference { get; set; }
+
+ ///
+ /// Gets or sets whether to enable external references support.
+ ///
+ ///
+ public bool EnableExternalReferencesSupport { get; set; } = false;
+
+ ///
+ /// Gets or sets the timeout for request processing in milliseconds.
+ ///
+ ///
+ public int RequestTimeoutMs { get; set; } = 30000; // 30 seconds
+
+ ///
+ /// Gets or sets whether to enable automatic section expansion when capacity is reached.
+ ///
+ ///
+ public bool EnableAutoExpansion { get; set; } = true;
+
+ ///
+ /// Gets or sets the target CPU utilization percentage for determining optimal thread count.
+ ///
+ ///
+ public double TargetCpuUtilization { get; set; } = 0.8; // 80%
+
+ ///
+ /// Gets or sets whether to enable performance monitoring and statistics collection.
+ ///
+ ///
+ public bool EnablePerformanceMonitoring { get; set; } = false;
+
+ ///
+ ///
+ /// Gets the effective number of sections to use.
+ ///
+ ///
+ ///
+ public int EffectiveNumberOfSections => NumberOfSections ?? Environment.ProcessorCount + 1;
+
+ ///
+ ///
+ /// Gets the internal references range based on configuration.
+ ///
+ ///
+ ///
+ ///
+ /// The links constants to use as a base.
+ ///
+ ///
+ ///
+ /// The configured internal references range.
+ ///
+ ///
+ public Range GetInternalReferencesRange(LinksConstants constants)
+ {
+ var min = EqualityComparer.Default.Equals(MinInternalReference, default) ? constants.InternalReferencesRange.Minimum : MinInternalReference;
+ var max = EqualityComparer.Default.Equals(MaxInternalReference, default) ? constants.InternalReferencesRange.Maximum : MaxInternalReference;
+ return new Range(min, max);
+ }
+
+ ///
+ ///
+ /// Validates the configuration and throws an exception if invalid.
+ ///
+ ///
+ ///
+ public void Validate()
+ {
+ if (MaxSectionCapacity <= 0)
+ throw new ArgumentException("MaxSectionCapacity must be positive", nameof(MaxSectionCapacity));
+
+ if (NumberOfSections.HasValue && NumberOfSections.Value <= 0)
+ throw new ArgumentException("NumberOfSections must be positive when specified", nameof(NumberOfSections));
+
+ if (RequestTimeoutMs <= 0)
+ throw new ArgumentException("RequestTimeoutMs must be positive", nameof(RequestTimeoutMs));
+
+ if (TargetCpuUtilization <= 0 || TargetCpuUtilization > 1.0)
+ throw new ArgumentException("TargetCpuUtilization must be between 0 and 1", nameof(TargetCpuUtilization));
+
+ if ((AllocationMode == SectionAllocationMode.MemoryMapped || AllocationMode == SectionAllocationMode.SeparateFiles)
+ && string.IsNullOrEmpty(BaseDirectory))
+ {
+ throw new ArgumentException("BaseDirectory must be specified for file-based allocation modes", nameof(BaseDirectory));
+ }
+
+ if (!EqualityComparer.Default.Equals(MinInternalReference, default) &&
+ !EqualityComparer.Default.Equals(MaxInternalReference, default) &&
+ MinInternalReference.CompareTo(MaxInternalReference) >= 0)
+ {
+ throw new ArgumentException("MinInternalReference must be less than MaxInternalReference", nameof(MinInternalReference));
+ }
+ }
+
+ ///
+ ///
+ /// Creates a default configuration instance.
+ ///
+ ///
+ ///
+ ///
+ /// A new configuration instance with default values.
+ ///
+ ///
+ public static StorageConfiguration CreateDefault()
+ {
+ return new StorageConfiguration();
+ }
+
+ ///
+ ///
+ /// Creates a configuration optimized for high-throughput scenarios.
+ ///
+ ///
+ ///
+ ///
+ /// A new configuration instance optimized for high throughput.
+ ///
+ ///
+ public static StorageConfiguration CreateHighThroughput()
+ {
+ return new StorageConfiguration
+ {
+ MaxSectionCapacity = 16 * 1024 * 1024, // 16MB sections
+ NumberOfSections = Environment.ProcessorCount * 2,
+ AllocationMode = SectionAllocationMode.Heap,
+ EnableAutoExpansion = true,
+ TargetCpuUtilization = 0.9,
+ EnablePerformanceMonitoring = true
+ };
+ }
+
+ ///
+ ///
+ /// Creates a configuration optimized for memory-efficient scenarios.
+ ///
+ ///
+ ///
+ ///
+ /// The base directory for file storage.
+ ///
+ ///
+ ///
+ /// A new configuration instance optimized for memory efficiency.
+ ///
+ ///
+ public static StorageConfiguration CreateMemoryEfficient(string baseDirectory)
+ {
+ return new StorageConfiguration
+ {
+ MaxSectionCapacity = 256 * 1024, // 256KB sections
+ NumberOfSections = Environment.ProcessorCount,
+ AllocationMode = SectionAllocationMode.MemoryMapped,
+ BaseDirectory = baseDirectory,
+ EnableAutoExpansion = false,
+ TargetCpuUtilization = 0.7,
+ EnablePerformanceMonitoring = false
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/MultithreadedStorage/StorageRequest.cs b/csharp/Platform.Data/MultithreadedStorage/StorageRequest.cs
new file mode 100644
index 0000000..298fedf
--- /dev/null
+++ b/csharp/Platform.Data/MultithreadedStorage/StorageRequest.cs
@@ -0,0 +1,211 @@
+using System;
+using System.Collections.Generic;
+using System.Numerics;
+using System.Threading.Tasks;
+using Platform.Delegates;
+
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+namespace Platform.Data.MultithreadedStorage
+{
+ ///
+ ///
+ /// Represents the type of storage operation.
+ ///
+ ///
+ ///
+ public enum StorageOperationType
+ {
+ Count,
+ Each,
+ Create,
+ Update,
+ Delete
+ }
+
+ ///
+ ///
+ /// Represents a request for a storage operation that can be processed by multiple sections.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public class StorageRequest
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ ///
+ /// Gets the unique identifier for this request.
+ ///
+ ///
+ public Guid Id { get; }
+
+ ///
+ /// Gets the type of storage operation.
+ ///
+ ///
+ public StorageOperationType OperationType { get; }
+
+ ///
+ /// Gets the restriction for the operation.
+ ///
+ ///
+ public IList? Restriction { get; }
+
+ ///
+ /// Gets the substitution for create/update operations.
+ ///
+ ///
+ public IList? Substitution { get; }
+
+ ///
+ /// Gets the read handler for each operations.
+ ///
+ ///
+ public ReadHandler? ReadHandler { get; }
+
+ ///
+ /// Gets the write handler for create/update/delete operations.
+ ///
+ ///
+ public WriteHandler? WriteHandler { get; }
+
+ ///
+ /// Gets the task completion source for returning results.
+ ///
+ ///
+ public TaskCompletionSource CompletionSource { get; }
+
+ ///
+ /// Gets the timestamp when this request was created.
+ ///
+ ///
+ public DateTime CreatedAt { get; }
+
+ ///
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ ///
+ /// The type of storage operation.
+ ///
+ ///
+ ///
+ /// The restriction for the operation.
+ ///
+ ///
+ ///
+ /// The substitution for create/update operations.
+ ///
+ ///
+ ///
+ /// The read handler for each operations.
+ ///
+ ///
+ ///
+ /// The write handler for create/update/delete operations.
+ ///
+ ///
+ public StorageRequest(
+ StorageOperationType operationType,
+ IList? restriction = null,
+ IList? substitution = null,
+ ReadHandler? readHandler = null,
+ WriteHandler? writeHandler = null)
+ {
+ Id = Guid.NewGuid();
+ OperationType = operationType;
+ Restriction = restriction;
+ Substitution = substitution;
+ ReadHandler = readHandler;
+ WriteHandler = writeHandler;
+ CompletionSource = new TaskCompletionSource();
+ CreatedAt = DateTime.UtcNow;
+ }
+ }
+
+ ///
+ ///
+ /// Represents the result of processing a storage request by a section.
+ ///
+ ///
+ ///
+ ///
+ /// The type of link address.
+ ///
+ ///
+ public class StorageResult
+ where TLinkAddress : IUnsignedNumber, IComparable
+ {
+ ///
+ /// Gets the request ID this result corresponds to.
+ ///
+ ///
+ public Guid RequestId { get; }
+
+ ///
+ /// Gets the section ID that processed this request.
+ ///
+ ///
+ public int SectionId { get; }
+
+ ///
+ /// Gets the result value.
+ ///
+ ///
+ public TLinkAddress Value { get; }
+
+ ///
+ /// Gets any exception that occurred during processing.
+ ///
+ ///
+ public Exception? Exception { get; }
+
+ ///
+ /// Gets whether the operation was successful.
+ ///
+ ///
+ public bool IsSuccess => Exception == null;
+
+ ///
+ /// Gets the timestamp when this result was created.
+ ///
+ ///
+ public DateTime CreatedAt { get; }
+
+ ///
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ ///
+ /// The request ID this result corresponds to.
+ ///
+ ///
+ ///
+ /// The section ID that processed this request.
+ ///
+ ///
+ ///
+ /// The result value.
+ ///
+ ///
+ ///
+ /// Any exception that occurred during processing.
+ ///
+ ///
+ public StorageResult(Guid requestId, int sectionId, TLinkAddress value, Exception? exception = null)
+ {
+ RequestId = requestId;
+ SectionId = sectionId;
+ Value = value;
+ Exception = exception;
+ CreatedAt = DateTime.UtcNow;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/Platform.Data/Platform.Data.csproj b/csharp/Platform.Data/Platform.Data.csproj
index 929fb16..99305fe 100644
--- a/csharp/Platform.Data/Platform.Data.csproj
+++ b/csharp/Platform.Data/Platform.Data.csproj
@@ -4,12 +4,12 @@
LinksPlatform's Platform.Data Class Library
konard, FreePhoenix888
Platform.Data
- 0.16.1
+ 0.17.0
konard, FreePhoenix888
net8
Platform.Data
Platform.Data
- LinksPlatform;Data;ArgumentLinkDoesNotExistsException;ArgumentLinkHasDependenciesException;LinksLimitReachedException;LinksLimitReachedExceptionBase;LinkWithSameValueAlreadyExistsException;AddressToRawNumberConverter;RawNumberToAddressConverter;ISequenceAppender;ISequenceWalker;SequenceWalker;StopableSequenceWalker;Hybrid;ILinks;ILinksExtensions;ISynchronizedLinks;LinkAddress;LinksConstants;LinksConstantsBase;LinksConstantsExtensions;Point
+ LinksPlatform;Data;ArgumentLinkDoesNotExistsException;ArgumentLinkHasDependenciesException;LinksLimitReachedException;LinksLimitReachedExceptionBase;LinkWithSameValueAlreadyExistsException;AddressToRawNumberConverter;RawNumberToAddressConverter;ISequenceAppender;ISequenceWalker;SequenceWalker;StopableSequenceWalker;Hybrid;ILinks;ILinksExtensions;ISynchronizedLinks;LinkAddress;LinksConstants;LinksConstantsBase;LinksConstantsExtensions;Point;MapReduceCombinedLinksStorage;MultithreadedStorage;LockFreeQueue;StorageSection;DistributedStorage
https://raw.githubusercontent.com/linksplatform/Documentation/18469f4d033ee9a5b7b84caab9c585acab2ac519/doc/Avatar-rainbow-icon-64x64.png
https://linksplatform.github.io/Data
Unlicense
@@ -23,7 +23,7 @@
true
snupkg
latest
- Update target framework from net7 to net8.
+ Add MapReduce multithread combined storage for lock-free distributed operations across multiple sections. Implements issue #77 with configurable section capacity, thread-per-section architecture, and seamless MapReduce request/response streaming.
enable