Очереди Windows Azure
1) Очереди используются для хранения сообщения
2) Используют FIFO, однонаправленные
3) Имя очереди должно быть именем в нижнем регистре и URL-friendly
.
Сообщения:
Ссылка на очередь выглядит стандартно для именования сущностей в сервисах хранилища Windows Azure:
http://<account>.queue.core.windows.net/<QueueName>
К основным операциям над очередями с использованием клиента облачных очередей CloudQueueClient можно отнести получение ссылки на очередь (GetQueueReference, возвращает объект CloudQueue) и получение списка очередей (ListQueues).
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queueClient.GetQueueReference("messages");
У объекта CloudQueue доступно несколько операций по управлению очередью:
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); queue.CreateIfNotExist(); queue.Metadata.Add(new NameValueCollection() { {"Name","Ahriman"}. {"DateOfMsgCreation", DateTime.UtcNow.ToString()} }); queue.SetMetadata();
Асинхронное создание очереди:
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); if (!queue.Exists)){ queue.BeginCreate(EndCreateQueue, queue); } protected static void EndCreateQueue(IAsyncResult result) { var queue = result.AsyncState as CloudQueue; queue.EndCreate(result); }
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); queue.Delete();
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); if (!queue.Exists()) { queue.Create(); }
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); //создание объекта заранее определенного класса, помеченного как Serialize var msg = new Message {id = Guid.NewGuid().ToString(), name = "Ahriman"}; //создание объекта сообщения облачной очереди и передача в качестве аргумента созданного //сериализованного (с помощью какого-либо реализова��ного метода) объекта msg var queueMessage = new CloudQueueMessage(msg.SerializeToBinary()); //добавление сообщения в очередь. Первый аргумент - сообщение, второй - время жизни сообщения //(максимум - 7 дней, нельзя забывать про Garbage Collector), третий, необязательный аргумент - //время "невидимости" сообщения для обработчика после добавления сообщения в очередь queue.AddMessage(queueMessage,TimeSpan.FromMinutes(5)); //”подсматривание”, но не забор сообщения из очереди var peekedMessage = queue.PeekMessage(); //получение сообщения из начала очереди var retrievedMessage = queue.GetMessage(); retrievedMessage.SetMessageContent("Some update."); //сообщение в очереди обновляется и становится сразу же видимым для обработчиков queue.UpdateMessage(retrievedMessage,TimeSpan.FromSeconds(0.0), MessageUpdateFields.Content | MessageUpdateFields.Visibility); //печать значения свойства Id возвращенного объекта Message. System.Console.WriteLine(retrievedMessage.FromMessage<Message>().Id); //удаление сообщения из очереди queue.DeleteMessage(retrievedMessage);
RetrieveApproximateMessageCount:
Возвращает примерно количество сообщений в очереди. Примерное – потому, что сообщение могут быть добавлены или удалены после того, как вы отправите запрос на количество сообщений очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient(); var queue = queue.Client.GetQueueReference("messages"); int msgCount = queue.RetrieveApproximateMessageCount(); int cachedMsgCount = queue.ApproximateMessageCount;
Опрос очереди на наличие нового сообщения в обработчике принято выполнять в бесконечном цикле.
while (true) { var msg = queue.GetMessage(); if (msg != null) { string retrievedMsg = msg.AsString; //логика query.DeleteMessage(msg); } else { Thread.Sleep(1000); }
Обратите внимание, что использование Thread.Sleep необосновано с экономической точки зрения, поэтому рекомендуется использовать Timer.
Усеченный экспоненциальный алгоритм отката
Одним из подходов к опросу очереди и облегчению нагрузки на очередь является использование усеченного экспоненциального алгоритма отката. Суть заключается в том, что каждый “пустой” опрос увеличивает интервал опроса вдвое, не пустой же опрос ставит интервал обратно в 1.
while (true) { var msg = queue.GetMessage(); if (msg != null) { string retrievedMsg = msg.AsString; //логика query.DeleteMessage(msg); if (gradualDecrease) if (curInterval > intervalFloor) curInterval = curInterval /2; else curInterval = intervalFloor; else curInterval = intervalFloor; } else { if (curInterval <intervalCeiling) curInterval = curInterval * 2; }
Долгие очереди
Очередь, собирающая сообщения в период отключённого потребителя, называется долгой. В этом случае потребитель очереди может выходить в онлайн раз в день и забирать все сообщения, затем снова отключаться. Полезно для обмена сообщениями с внешними вендорами, а также для сокращения оплаты за время выполнения сервиса (в том случае, если потребитель является, например, Worker-ролью в Windows Azure).
Рекомендация
В качестве рекомендаций можно упомянуть разбиение сложного процесса на состояние, при этом каждое состояние является отдельным процессом-потребителем, опрашивающим определенную очередь.
Неоптимальная архитектура – один большой потребитель обрабатывает одну очередь. Является негибким подходом.
Архитектура, являющаяся более оптимальной с позиции гибкости – разделенная на различных потребителей и очередей, каждый из которых занимается собственной задачей.