Skip to content
Petr Arsentev edited this page Dec 16, 2023 · 1 revision

В этом тестовой задании вам необходимо сделать микроброкер сообщений.

Брокер сообщений - это отдельное приложение, которое пересылает сообщения между другими приложениями.

Зачем они нужны и почему приложение не может напрямую передать данные в другое приложение?

Брокер сообщений позволяют убрать жесткую взять и стандартизировать механизмы общения.

Клиенты брокера могут быть двух типов: поставщики сообщений и потребители сообщений. В большинстве случаев одно и то же приложение сразу осуществляют роль поставщика и потребителя.

Отличительной чертой общение через брокер является асинхронность событий, то есть сообщение отправляется без ожидания подтверждения его доставки до потребителя.

Рассмотрим пример. вы звоните другу по телефону и ждете его ответ. Это синхронное действия. Вам и другу нужно быть у аппарата в одно и то же время.

Если у друга установлен автоответчик, то вы можете оставить сообщение автоответчику и положить трубку. Друг придет и прослушает сообщение, когда у него будет возможность. Это асинхронное действие. Вы и друг выполняют отправку и прием сообщения в разное время.

Этот пример демонстрируем смысл брокера сообщений, то есть разбить поставку на два этапа.

Брокер имеет два режима работы: очереди и темы.

Режим очередь

Рассмотри пример из трех клиентов: один поставщик, два потребителя.

Поставщик создает события, потребитель их получает.

Представим, что поставщик отправил два сообщения в очередь: ["message 1", "message 2"].

Если у нас два потребителя, то первый получит сообщение "message 1", а второй "message 2".

То есть в режиме очередь все сообщения равномерно распределяются между потребителями.

Режим темы

Рассмотри так же пример из трех клиентов: один поставщик, два потребителя.

Поставщик создает события, потребитель их получает.

Представим, что поставщик отправил два сообщения в очередь: ["message 1", "message 2"].

Если у нас два потребителя, то каждый из них должен получить все сообщения от поставщика,

То есть потребитель 1 получить сообщения: "message 1", "message 2".

и потребитель 2 тоже получить сообщения: "message 1", "message 2".

То есть в режиме темы все сообщения от поставщика должны быть доставлены каждому потребителю.

Исходный код проекта находится тут.

Скачайте проект. Проект использует систему сборки Maven.

Запустите класс PoohServer. Брокер Pooh используют socket для общение между клиентами.

Запустите класс ProducerClient.

Запустите класс ConsumerClient.

ProducerClient отправляет в PoohService сообщение вида queue;weather;text

queue - это режим работы

weather - имя контейнера

text - текст сообщения.

ConsumerClient отправляет сообщение intro;queue;weather. Это сообщение служит для регистрации клиента в качестве потребителя. После этого Consumer переходит в режим ожидания сообщений.

Опишем общую идею проекта.

PoohServer запускает два пула нитей, Первый пул используется для приема сообщений, а второй для отправки.

Общими ресурсами между этими пулами будут классы QueueService и TopicService.

Код класс QueueService уже реализован.

Рассмотрим QueueService по частям.

  1. Поля класса.
private final ConcurrentHashMap<String, CopyOnWriteArrayList<Receiver>> receivers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, BlockingQueue<String>> data = new ConcurrentHashMap<>();
private final Condition condition = new Condition();
receivers - этот контейнер служит для регистрации потребителей. 

data - этот контейнер используется для аккумуляции сообщений от поставщика.

condition - многопоточный блокирующий флаг. С помощью этого флага мы регулируем работу очередей в контейнере data.

  1. Аккумуляция сообщений.
@Override
public void addReceiver(Receiver receiver) {
    receivers.putIfAbsent(receiver.name(), new CopyOnWriteArrayList<>());
    receivers.get(receiver.name()).add(receiver);
    condition.on();
}

Основная структура для хранения сообщений - это карта. Ключ в карте - это имя контейнера, а значение - это многопоточная очередь.

  1. Рассылка сообщений.
@Override
public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        for (var queueKey : receivers.keySet()) {
            var queue = data.getOrDefault(queueKey, new LinkedBlockingQueue<>());
            var receiversByQueue = receivers.get(queueKey);
            var iterator = receiversByQueue.iterator();
            while (iterator.hasNext()) {
                var data = queue.poll();
                if (data != null) {
                    iterator.next().receive(data);
                }
                if (data == null) {
                    break;
                }
                if (!iterator.hasNext()) {
                    iterator = receiversByQueue.iterator();
                }
            }
        }
        condition.off();
        try {
            condition.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Наиболее сложный код находиться в методе run(). Метод run используются пулом нитей в классе PoohService.

Если сервис остановили, то нитям придет уведомление, что нить прервана и нужно выйти из цикла.

Цикл for перебирает всех зарегистрированный потребителей и отправляет им сообщение. Обратите внимание, что тут используется метод poll(), который не блокирует нить выполнения, а отправляет null, если очередь пустая.

Если все очереди пустые, то флаг переходить в режим false и нить выполнения блокируется до момента появления новых сообщений.

Обратите внимание, что после выставления флага в режим false идет дополнительная проверка в цикле while. Этот момент будет гарантировать ситуацию, что если после выставление флага в режим false в очередь добавили еще сообщения, то нить не будет блокироваться, а снова будет отправлять сообщения.

Задание

  1. Создайте репозиторий job4j_pooh. Перенесите каркас кода в этот репозиторий.

  2. Реализуйте код внутри класса TopicService. Код в остальных классах уже написан и их изменять или добавлять не надо.

  3. Для тестирования используйте класс TopicServiceTest. В нем находятся необходимые сценарии.

  4. Код в классе TopicService будет аналогичен коду в классе QueueService.

  5. Загрузите код. Оставьте ссылку на коммит. Переведите ответственного на Арсентьева Петра

Clone this wiki locally