Skip to content

Commit 0295f2e

Browse files
committed
Polish up queuing
* Rework settings * Add mutex to file client to eliminate error message when multiple threads attempt to dequeue the same message. * Move GetCommandInstance into command enevelope instead of QueueAdapter - makes more sense there.
1 parent 0c65138 commit 0295f2e

File tree

11 files changed

+88
-51
lines changed

11 files changed

+88
-51
lines changed

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ namespace InEngine.Core.Queuing.Clients
1111
{
1212
public class FileClient : IQueueClient
1313
{
14+
static Mutex consumeLock = new Mutex();
15+
public static FileClientSettings ClientSettings { get; set; }
16+
1417
public ILog Log { get; set; } = LogManager.GetLogger<FileClient>();
1518
public int Id { get; set; } = 0;
1619
public string QueueBaseName { get; set; }
1720
public string QueueName { get; set; }
1821
public bool UseCompression { get; set; }
19-
public string QueuePath { get { return $"{QueueBaseName}_{QueueName}"; } }
22+
public string QueuePath { get { return Path.Combine(ClientSettings.BasePath, $"{QueueBaseName}_{QueueName}"); } }
2023
public string PendingQueuePath {
2124
get {
2225
var path = $"{QueuePath}_Pending";
@@ -25,6 +28,7 @@ public string PendingQueuePath {
2528
return path;
2629
}
2730
}
31+
2832
public string InProgressQueuePath
2933
{
3034
get
@@ -86,28 +90,26 @@ public void Consume(CancellationToken cancellationToken)
8690

8791
public ICommandEnvelope Consume()
8892
{
89-
var fileInfo = new DirectoryInfo(PendingQueuePath)
93+
FileInfo fileInfo;
94+
var inProgressFilePath = String.Empty;
95+
96+
consumeLock.WaitOne();
97+
fileInfo = new DirectoryInfo(PendingQueuePath)
9098
.GetFiles()
9199
.OrderBy(x => x.LastWriteTimeUtc)
92100
.FirstOrDefault();
93-
if (fileInfo == null)
94-
return null;
95-
var inProgressFilePath = Path.Combine(InProgressQueuePath, fileInfo.Name);
96-
97-
try
98-
{
101+
if (fileInfo != null) {
102+
inProgressFilePath = Path.Combine(InProgressQueuePath, fileInfo.Name);
99103
fileInfo.MoveTo(inProgressFilePath);
100104
}
101-
catch (FileNotFoundException exception)
102-
{
103-
// Another process probably consumed the file when it was read and moved.
104-
Log.Debug(exception);
105+
if (fileInfo == null) {
106+
consumeLock.ReleaseMutex();
105107
return null;
106108
}
107-
109+
consumeLock.ReleaseMutex();
110+
108111
var commandEnvelope = File.ReadAllText(inProgressFilePath).DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope;
109-
110-
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
112+
var command = commandEnvelope.GetCommandInstance();
111113
command.CommandLifeCycle.IncrementRetry();
112114
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
113115
try
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace InEngine.Core.Queuing.Clients
2+
{
3+
public class FileClientSettings
4+
{
5+
public string BasePath { get; set; }
6+
}
7+
}

src/InEngine.Core/Queuing/Clients/RabbitMQClient.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ namespace InEngine.Core.Queuing.Clients
1212
{
1313
public class RabbitMQClient : IQueueClient, IDisposable
1414
{
15+
public static RabbitMQClientSettings ClientSettings { get; set; }
16+
1517
public ILog Log { get; set; } = LogManager.GetLogger<SyncClient>();
1618
public int Id { get; set; } = 0;
1719
public string QueueBaseName { get; set; } = "InEngineQueue";
@@ -23,7 +25,10 @@ public class RabbitMQClient : IQueueClient, IDisposable
2325
public IConnection Connection { get {
2426
if (_connection == null)
2527
_connection = new ConnectionFactory() {
26-
HostName = "localhost",
28+
HostName = ClientSettings.Host,
29+
Port = ClientSettings.Port,
30+
UserName = ClientSettings.Username,
31+
Password = ClientSettings.Password,
2732
AutomaticRecoveryEnabled = true
2833
}.CreateConnection();
2934
return _connection;
@@ -51,10 +56,6 @@ public void Publish(AbstractCommand command)
5156

5257
Channel.ExchangeDeclare(QueueBaseName, ExchangeType.Direct);
5358
Channel.QueueDeclare(PendingQueueName, true, false, false);
54-
Channel.BasicReturn += (model, eb) =>
55-
{
56-
throw new Exception("returned");
57-
};
5859
var properties = Channel.CreateBasicProperties();
5960
properties.Persistent = true;
6061
Channel.QueueBind(PendingQueueName, QueueBaseName, QueueName, null);
@@ -80,7 +81,7 @@ public void Consume(CancellationToken cancellationToken)
8081
if (commandEnvelope == null)
8182
throw new CommandFailedException("Could not deserialize the command.");
8283

83-
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
84+
var command = commandEnvelope.GetCommandInstance();
8485
command.CommandLifeCycle.IncrementRetry();
8586
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
8687
try
@@ -116,7 +117,7 @@ public ICommandEnvelope Consume()
116117
if (commandEnvelope == null)
117118
throw new CommandFailedException("Could not deserialize the command.");
118119

119-
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
120+
var command = commandEnvelope.GetCommandInstance();
120121
command.CommandLifeCycle.IncrementRetry();
121122
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
122123
try
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace InEngine.Core.Queuing.Clients
2+
{
3+
public class RabbitMQClientSettings
4+
{
5+
public string Host { get; set; }
6+
public int Port { get; set; }
7+
public string Username { get; set; }
8+
public string Password { get; set; }
9+
}
10+
}

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ namespace InEngine.Core.Queuing.Clients
1212
{
1313
public class RedisClient : IQueueClient
1414
{
15+
public static RedisClientSettings ClientSettings { get; set; }
16+
1517
public ILog Log { get; set; } = LogManager.GetLogger<RedisClient>();
1618
public int Id { get; set; } = 0;
1719
public string QueueBaseName { get; set; } = "InEngineQueue";
@@ -21,19 +23,17 @@ public class RedisClient : IQueueClient
2123
public string InProgressQueueName { get { return QueueBaseName + $":{QueueName}:InProgress"; } }
2224
public string FailedQueueName { get { return QueueBaseName + $":{QueueName}:Failed"; } }
2325
public static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() => {
24-
var queueSettings = InEngineSettings.Make().Queue;
25-
var redisConfig = ConfigurationOptions.Parse($"{queueSettings.RedisHost}:{queueSettings.RedisPort}");
26-
redisConfig.Password = string.IsNullOrWhiteSpace(queueSettings.RedisPassword) ?
26+
var redisConfig = ConfigurationOptions.Parse($"{ClientSettings.Host}:{ClientSettings.Port}");
27+
redisConfig.Password = string.IsNullOrWhiteSpace(ClientSettings.Password) ?
2728
null :
28-
queueSettings.RedisPassword;
29+
ClientSettings.Password;
2930
redisConfig.AbortOnConnectFail = false;
3031
return ConnectionMultiplexer.Connect(redisConfig);
3132
});
3233
public static ConnectionMultiplexer Connection { get { return lazyConnection.Value; } }
3334
public ConnectionMultiplexer _connectionMultiplexer;
34-
public IDatabase Redis { get { return Connection.GetDatabase(RedisDb); } }
35+
public IDatabase Redis { get { return Connection.GetDatabase(ClientSettings.Database); } }
3536
public bool UseCompression { get; set; }
36-
public int RedisDb { get; set; }
3737

3838
public RedisChannel RedisChannel { get; set; }
3939

@@ -101,7 +101,7 @@ public ICommandEnvelope Consume()
101101
if (commandEnvelope == null)
102102
throw new CommandFailedException("Could not deserialize the command.");
103103

104-
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
104+
var command = commandEnvelope.GetCommandInstance();
105105
command.CommandLifeCycle.IncrementRetry();
106106
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
107107
try
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace InEngine.Core.Queuing.Clients
2+
{
3+
public class RedisClientSettings
4+
{
5+
public string Host { get; set; }
6+
public int Port { get; set; }
7+
public int Database { get; set; }
8+
public string Password { get; set; }
9+
}
10+
}

src/InEngine.Core/Queuing/Commands/Peek.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void PrintMessages(List<ICommandEnvelope> messages, string queueName)
6969
if (JsonFormat)
7070
Line(commandEnvelope.SerializeToJson());
7171
else
72-
konsoleForm.Write(QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope));
72+
konsoleForm.Write(commandEnvelope.GetCommandInstance());
7373
});
7474
}
7575
}

src/InEngine.Core/Queuing/Message/CommandEnvelope.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using InEngine.Core.Exceptions;
23

34
namespace InEngine.Core.Queuing.Message
45
{
@@ -10,5 +11,13 @@ public class CommandEnvelope : ICommandEnvelope
1011
public string SerializedCommand { get; set; }
1112
public DateTime QueuedAt { get; set; } = DateTime.UtcNow;
1213
public bool IsCompressed { get; set; }
14+
15+
public AbstractCommand GetCommandInstance()
16+
{
17+
var commandType = PluginAssembly.LoadFrom(PluginName).GetCommandType(CommandClassName);
18+
if (commandType == null)
19+
throw new CommandFailedException($"Could not locate command {CommandClassName}. Is the {PluginName} plugin registered in the settings file?");
20+
return SerializedCommand.DeserializeFromJson<AbstractCommand>(IsCompressed);
21+
}
1322
}
1423
}

src/InEngine.Core/Queuing/Message/ICommandEnvelope.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ public interface ICommandEnvelope
1010
string SerializedCommand { get; set; }
1111
DateTime QueuedAt { get; set; }
1212
bool IsCompressed { get; set; }
13+
AbstractCommand GetCommandInstance();
1314
}
1415
}

src/InEngine.Core/Queuing/QueueAdapter.cs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,27 @@ public static QueueAdapter Make(bool useSecondaryQueue = false, QueueSettings qu
2424
var queueDriverName = queueSettings.QueueDriver.ToLower();
2525
var queue = new QueueAdapter();
2626

27-
if (queueDriverName == "redis")
27+
if (queueDriverName == "redis") {
28+
RedisClient.ClientSettings = queueSettings.Redis;
2829
queue.QueueClient = new RedisClient() {
2930
QueueBaseName = queueSettings.QueueName,
3031
UseCompression = queueSettings.UseCompression,
31-
RedisDb = queueSettings.RedisDb
32-
};
33-
else if (queueDriverName == "rabbitmq")
32+
};
33+
}
34+
else if (queueDriverName == "rabbitmq") {
35+
RabbitMQClient.ClientSettings = queueSettings.RabbitMQ;
3436
queue.QueueClient = new RabbitMQClient() {
3537
QueueBaseName = queueSettings.QueueName,
36-
UseCompression = queueSettings.UseCompression,
38+
UseCompression = queueSettings.UseCompression
3739
};
38-
else if (queueDriverName == "file")
40+
}
41+
else if (queueDriverName == "file") {
42+
FileClient.ClientSettings = queueSettings.File;
3943
queue.QueueClient = new FileClient() {
4044
QueueBaseName = queueSettings.QueueName,
41-
UseCompression = queueSettings.UseCompression,
45+
UseCompression = queueSettings.UseCompression
4246
};
47+
}
4348
else if (queueDriverName == "sync")
4449
queue.QueueClient = new SyncClient();
4550
else
@@ -69,15 +74,6 @@ public void Recover()
6974
QueueClient.Recover();
7075
}
7176

72-
public static AbstractCommand ExtractCommandInstanceFromMessage(ICommandEnvelope commandEnvelope)
73-
{
74-
var commandType = PluginAssembly.LoadFrom(commandEnvelope.PluginName)
75-
.GetCommandType(commandEnvelope.CommandClassName);
76-
if (commandType == null)
77-
throw new CommandFailedException($"Could not locate command {commandEnvelope.CommandClassName}. Is the {commandEnvelope.PluginName} plugin registered in the settings file?");
78-
return commandEnvelope.SerializedCommand.DeserializeFromJson<AbstractCommand>(commandEnvelope.IsCompressed);
79-
}
80-
8177
public long GetPendingQueueLength()
8278
{
8379
return QueueClient.GetPendingQueueLength();

0 commit comments

Comments
 (0)