Skip to content

Commit 7574c9d

Browse files
committed
Add RabbitMQ Client
1 parent 265d99b commit 7574c9d

File tree

5 files changed

+213
-2
lines changed

5 files changed

+213
-2
lines changed

src/InEngine.Core/InEngine.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<PackageReference Include="RestSharp" Version="106.1.0" />
3030
<PackageReference Include="MailKit" Version="1.22.0" />
3131
<PackageReference Include="Serialize.Linq" Version="1.4.0" />
32+
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" />
3233
</ItemGroup>
3334
<ItemGroup>
3435
<Folder Include="Scheduling\" />
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using Common.Logging;
6+
using InEngine.Core.Exceptions;
7+
using InEngine.Core.Queuing.Message;
8+
using RabbitMQ.Client;
9+
using RabbitMQ.Client.Events;
10+
11+
namespace InEngine.Core.Queuing.Clients
12+
{
13+
public class RabbitMQClient : IQueueClient, IDisposable
14+
{
15+
public ILog Log { get; set; } = LogManager.GetLogger<SyncClient>();
16+
public int Id { get; set; } = 0;
17+
public string QueueBaseName { get; set; } = "InEngineQueue";
18+
public string QueueName { get; set; } = "Primary";
19+
public string PendingQueueName { get { return QueueBaseName + $":{QueueName}:Pending"; } }
20+
public string FailedQueueName { get { return QueueBaseName + $":{QueueName}:Failed"; } }
21+
public bool UseCompression { get; set; }
22+
IConnection _connection;
23+
public IConnection Connection { get {
24+
if (_connection == null)
25+
_connection = new ConnectionFactory() {
26+
HostName = "localhost",
27+
AutomaticRecoveryEnabled = true
28+
}.CreateConnection();
29+
return _connection;
30+
}
31+
}
32+
IModel _channel;
33+
public IModel Channel {
34+
get {
35+
if (_channel == null)
36+
_channel = Connection.CreateModel();
37+
return _channel;
38+
}
39+
}
40+
41+
public RabbitMQClient()
42+
{
43+
}
44+
45+
public void Publish(AbstractCommand command)
46+
{
47+
48+
var body = Encoding.UTF8.GetBytes(new CommandEnvelope()
49+
{
50+
IsCompressed = UseCompression,
51+
CommandClassName = command.GetType().FullName,
52+
PluginName = command.GetType().Assembly.GetName().Name,
53+
SerializedCommand = command.SerializeToJson(UseCompression)
54+
}.SerializeToJson());
55+
56+
Channel.ExchangeDeclare(QueueBaseName, ExchangeType.Direct);
57+
Channel.QueueDeclare(PendingQueueName, true, false, false);
58+
Channel.BasicReturn += (model, eb) =>
59+
{
60+
throw new Exception("returned");
61+
};
62+
var properties = Channel.CreateBasicProperties();
63+
properties.Persistent = true;
64+
Channel.QueueBind(PendingQueueName, QueueBaseName, QueueName, null);
65+
Channel.BasicPublish(exchange: QueueBaseName,
66+
routingKey: QueueName,
67+
basicProperties: properties,
68+
mandatory: true,
69+
body: body);
70+
}
71+
72+
public void Recover()
73+
{ }
74+
75+
public void Consume(CancellationToken cancellationToken)
76+
{
77+
Channel.QueueDeclare(PendingQueueName, true, false, false);
78+
var consumer = new EventingBasicConsumer(Channel);
79+
consumer.Received += (model, result) => {
80+
var eventingConsumer = (EventingBasicConsumer)model;
81+
82+
var serializedMessage = Encoding.UTF8.GetString(result.Body);
83+
var commandEnvelope = serializedMessage.DeserializeFromJson<CommandEnvelope>();
84+
if (commandEnvelope == null)
85+
throw new CommandFailedException("Could not deserialize the command.");
86+
87+
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
88+
command.CommandLifeCycle.IncrementRetry();
89+
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
90+
try
91+
{
92+
command.WriteSummaryToConsole();
93+
command.RunWithLifeCycle();
94+
}
95+
catch (Exception exception)
96+
{
97+
Log.Error(exception);
98+
if (command.CommandLifeCycle.ShouldRetry())
99+
eventingConsumer.Model.BasicNack(result.DeliveryTag, false, true);
100+
else
101+
{
102+
eventingConsumer.Model.BasicNack(result.DeliveryTag, false, false);
103+
throw new CommandFailedException("Failed to run consumed command.", exception);
104+
}
105+
}
106+
Log.Debug("Acknowledging message...");
107+
eventingConsumer.Model.BasicAck(result.DeliveryTag, false);
108+
};
109+
Channel.BasicConsume(queue: PendingQueueName, autoAck: false, consumer: consumer);
110+
}
111+
112+
public ICommandEnvelope Consume()
113+
{
114+
var result = Channel.BasicGet(PendingQueueName, false);
115+
if (result == null)
116+
return null;
117+
118+
var serializedMessage = Encoding.UTF8.GetString(result.Body);
119+
var commandEnvelope = serializedMessage.DeserializeFromJson<CommandEnvelope>();
120+
if (commandEnvelope == null)
121+
throw new CommandFailedException("Could not deserialize the command.");
122+
123+
var command = QueueAdapter.ExtractCommandInstanceFromMessage(commandEnvelope);
124+
command.CommandLifeCycle.IncrementRetry();
125+
commandEnvelope.SerializedCommand = command.SerializeToJson(UseCompression);
126+
try
127+
{
128+
command.WriteSummaryToConsole();
129+
command.RunWithLifeCycle();
130+
}
131+
catch (Exception exception)
132+
{
133+
Log.Error(exception);
134+
if (command.CommandLifeCycle.ShouldRetry())
135+
Channel.BasicNack(result.DeliveryTag, false, true);
136+
else
137+
{
138+
Channel.BasicNack(result.DeliveryTag, false, false);
139+
throw new CommandFailedException("Failed to run consumed command.", exception);
140+
}
141+
}
142+
Channel.BasicAck(result.DeliveryTag, false);
143+
return commandEnvelope;
144+
}
145+
146+
public bool ClearFailedQueue()
147+
{
148+
throw new NotImplementedException();
149+
}
150+
151+
public bool ClearInProgressQueue()
152+
{
153+
throw new NotImplementedException();
154+
}
155+
156+
public bool ClearPendingQueue()
157+
{
158+
throw new NotImplementedException();
159+
}
160+
161+
public long GetFailedQueueLength()
162+
{
163+
throw new NotImplementedException();
164+
}
165+
166+
public long GetInProgressQueueLength()
167+
{
168+
throw new NotImplementedException();
169+
}
170+
171+
public long GetPendingQueueLength()
172+
{
173+
throw new NotImplementedException();
174+
}
175+
176+
public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
177+
{
178+
throw new NotImplementedException();
179+
}
180+
181+
public List<ICommandEnvelope> PeekInProgressMessages(long from, long to)
182+
{
183+
throw new NotImplementedException();
184+
}
185+
186+
public List<ICommandEnvelope> PeekPendingMessages(long from, long to)
187+
{
188+
throw new NotImplementedException();
189+
}
190+
191+
public void RepublishFailedMessages()
192+
{
193+
throw new NotImplementedException();
194+
}
195+
196+
public void Dispose()
197+
{
198+
Connection.Close();
199+
}
200+
}
201+
}

src/InEngine.Core/Queuing/Dequeue.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ IList<Task> MakeTasks(bool useSecondaryQueue = false, int numberOfTasks = 0)
5252

5353
public void Dispose()
5454
{
55+
queueAdapters.ToList().ForEach(x => {
56+
if (x is IDisposable)
57+
(x as IDisposable).Dispose();
58+
});
5559
CancellationTokenSource.Cancel();
5660
}
5761
}

src/InEngine.Core/Queuing/QueueAdapter.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,16 @@ public static QueueAdapter Make(bool useSecondaryQueue = false, QueueSettings qu
2525
var queue = new QueueAdapter();
2626

2727
if (queueDriverName == "redis")
28-
queue.QueueClient = new RedisClient()
29-
{
28+
queue.QueueClient = new RedisClient() {
3029
QueueBaseName = queueSettings.QueueName,
3130
UseCompression = queueSettings.UseCompression,
3231
RedisDb = queueSettings.RedisDb
3332
};
33+
else if (queueDriverName == "rabbitmq")
34+
queue.QueueClient = new RabbitMQClient() {
35+
QueueBaseName = queueSettings.QueueName,
36+
UseCompression = queueSettings.UseCompression,
37+
};
3438
else if (queueDriverName == "file")
3539
queue.QueueClient = new FileClient() {
3640
QueueBaseName = queueSettings.QueueName,

src/InEngine.Core/ServerHost.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public async void StartDequeueAsync()
3030
public void Dispose()
3131
{
3232
SuperScheduler?.Shutdown();
33+
Dequeue.Dispose();
3334
}
3435
}
3536
}

0 commit comments

Comments
 (0)