что такое реактивное программирование java

Реактивное программирование со Spring, часть 1 Введение

Это первая часть серии заметок о реактивном программировании, в которой представлен обзор различных концепций реактивного программирования и их истории.

1. Почему реактивное программирование

Реактивное программирование существует уже некоторое время, но за последние пару лет оно стало вызывать гораздо больший интерес. Причина этого связана с тем фактом, что традиционное императивное программирование имеет некоторые ограничения, когда дело доходит до удовлетворения требований сегодняшнего дня, когда приложения должны иметь высокую доступность и обеспечивать низкое время отклика даже при высокой нагрузке.

1.1 Модель поток на запрос

Если приложение разработано в соответствии с архитектурой на основе микросервисов, у нас есть лучшие возможности для масштабирования в зависимости от нагрузки, но за высокое использование памяти по-прежнему приходится платить. Таким образом, модель потока на запрос может стать довольно дорогостоящей для приложений с большим количеством одновременных запросов.

Важной характеристикой архитектур на основе микросервисов является то, что приложение распределено, выполняется большое количество отдельных процессов, обычно на нескольких серверах. Использование традиционного императивного программирования с синхронными вызовами запроса/ответа для взаимодействия между службами означает, что потоки часто блокируются в ожидании ответа от другой службы, что приводит к огромной трате ресурсов.

1.2 Ожидание операций ввода/вывода

Такой же тип потерь также возникает при ожидании завершения других типов операций ввода-вывода, таких как вызов базы данных или чтение из файла. Во всех этих ситуациях поток, выполняющий запрос ввода-вывода, будет заблокирован и будет ожидать, пока операция ввода-вывода не будет завершена, это называется блокирующим вводом-выводом. Такие ситуации, когда выполняющийся поток блокируется, просто ожидая ответа, означают потерю потоков и, следовательно, потерю памяти.

1.3 Время ответа

Другой проблемой традиционного императивного программирования является время отклика, когда службе необходимо выполнить более одного запроса ввода-вывода. Например, службе A может потребоваться вызвать службы B и C, а также выполнить поиск в базе данных, а затем вернуть в результате некоторые агрегированные данные. Это будет означать, что время ответа службы A, помимо времени ее обработки, будет суммой следующих значений:

время отклика услуги B (задержка сети + обработка)

время отклика службы C (задержка сети + обработка)

время ответа на запрос к базе данных (сетевая задержка + обработка)

Если нет никакой реальной логической причины выполнять эти вызовы последовательно, то, безусловно, если эти вызовы будут выполняться параллельно, это очень положительно повлияет на время отклика службы А. Несмотря на то, что существует поддержка выполнения асинхронных вызовов в Java с использованием CompletableFutures и регистрации обратных вызовов, широкое использование такого подхода в приложении сделало бы код более сложным и трудным для чтения и поддержки.

1.4 Перегрузка клиента

1.5 Резюме

отходим от модели поток на запрос и можем обрабатывать больше запросов с небольшим количеством потоков

предотвращаем блокировку потоков при ожидании завершения операций ввода-вывода

упрощаем параллельные вызовы

поддерживаем «обратное давление», давая клиенту возможность сообщить серверу, с какой нагрузкой он может справиться

2. Что такое реактивное программирование

2.1 Определение

В документации Spring дано следующее краткое определение реактивного программирования:

2.2 Объяснение

Так как же всего этого достичь?

Вкратце: программированием с использованием асинхронных потоков данных. Допустим, служба A хочет получить некоторые данные из службы B. При подходе в стиле реактивного программирования служба A отправит запрос службе B, которая немедленно вернет управление (неблокирующий и асинхронный запрос). Затем запрошенные данные будут доступны службе A в виде потока данных, где служба B будет публиковать событие onNext для каждого элемента данных один за другим. Когда все данные будут опубликованы, об этом просигнализирует событие onComplete. В случае ошибки будет опубликовано событие onError, и больше никаких элементов не будет.

Реактивное программирование использует подход функционального стиля (аналогично Streams API), который дает возможность выполнять различные виды преобразований в потоках. Один поток можно использовать как вход для другого. Потоки можно объединять, отображать и фильтровать (операции merge, map и filter).

2.3 Реактивные системы

Создание реактивной системы означает решение таких вопросов, как разделение ответственности, согласованность данных, управление отказами, выбор реализации обмена сообщениями и т. д. Реактивное программирование может использоваться в качестве метода реализации систем, гарантирующего, что отдельные службы используют асинхронную неблокирующую модель. Но чтобы спроектировать систему реактивную в целом, также требуется архитектура, которая обеспечивает все эти аспекты.

3. История вопроса

3.1 REACTIVEX

ReactiveX использует сочетание шаблона Iterator и шаблона Observer из Gang of Four. Разница в том, что используется модель push по сравнению с обычным поведением итераторов на основе pull. Помимо наблюдения за изменениями, подписчику также сообщается о завершении и ошибках.

3.2 Спецификация реактивных потоков

Спецификация Reactive Streams включает следующие интерфейсы:

Publisher:

Он представляет производителя данных/источник данных и имеет один метод, который позволяет подписчику зарегистрироваться на издателе.

Subscriber:
Он представляет потребителя и имеет следующие методы:

onSubscribe должны вызываться Publisher перед началом обработки и использоватся для передачи на Subscription объекта от Publisher до Subscriber

onNext используется для того, чтобы сигнализировать о том, что был отправлен новый элемент

onError используется для того, чтобы сигнализировать о том, что произошел сбой Publisher и больше никаких элементов не будет

onComplete используется для того, чтобы сигнализировать, что все элементы были успешно отправлены

Subscription:
Subscription содержат методы, которые позволяют клиенту управлять выдачей элементов Publisher (т.е. обеспечивать поддержку противодавления).

request позволяет Subscriber сообщить, Publisher сколько дополнительных элементов будет опубликовано

cancel позволяет подписчику отменить дальнейшую отправку элементов Publisher

3.3 PROJECT REACTOR

Spring Framework поддерживает реактивное программирование, начиная с версии 5. Эта поддержка построена на основе Project Reactor.

Источник

Введение в реактивное программирование

Здравствуйте. В этой статье я пробегусь галопом по Европам, а именно — расскажу, что понимают под реактивным программированием, познакомлю с акторами, реактивными потоками, и наконец, при помощи реактивных потоков мы сделаем распознавание мышиных жестов, как в старой Opera и её духовном наследнике — Vivaldi.

Цель — познакомить с основными концепциями реактивного программирования и показать, что не всё так сложно и страшно, как может показаться на первый взгляд.


Источник

Что такое реактивное программирование?

Чтобы ответить на этот вопрос, обратимся к сайту. На нём есть красивая картинка, на которой показаны 4 основных критерия, которым должны соответствовать реактивные приложения.

Приложение должно быть быстрым, отказоустойчивым и хорошо масштабироваться.
Выглядит как «мы за всё хорошее против всего плохого», верно?

Что подразумевается под этими словами:

Модель акторов

Основные вехи развития:

Что умеют акторы?

Акторы — это те же объекты, но:

Актор А хочет отправить сообщение актору Б. Всё, что у него есть — ActorRef (некий адрес). Актор Б может находиться где угодно.
Актор А отправляет письмо Б через систему (ActorSystem). Система кладёт письмо в почтовый ящик актора Б и «будит» актор Б. Актор Б берёт письмо из ящика и что-то делает.

По сравнению с вызовом методов у другого объекта, выглядит излишне сложно, но модель акторов прекрасно ложится на реальный мир, если представить, что акторы — это люди, которые обучены что-то делать в ответ на определённые раздражители.

Представим себе отца и сына:

Отец шлёт сыну СМСку «Убери в комнате» и продолжает заниматься своими делами. Сын читает СМСку и начинает уборку. Отец тем временем играет в покер. Сын заканчивает уборку и шлёт СМС «Готово». Выглядит просто, верно?

Теперь представим, что отец и сын не акторы, а обычные объекты, которые могут дёргать методы друг у друга. Отец дёргает сына за метод «убери в комнате» и следует за ним по пятам, ожидая, пока сын не закончит уборку и не передаст управление обратно отцу. Играть в покер в это время отец не может. В этом контексте модель акторов становится более привлекательной.

Akka.NET

Всё, что написано ниже, справедливо и для оригинального Akka для JVM, но для меня C# ближе, чем Java, поэтому я буду рассказывать на примере Akka.NET.

Итак, какие преимущества есть у Akka?

Обработка ошибок

У акторов есть иерархия — её можно представить в виде дерева. У каждого актора есть родитель и могут быть «дети».


Akka.NET documentation Copyright 2013-2018 Akka.NET project

Для каждого актора можно установить Supervision strategy — что делать, если у «детей» что-то пошло не так. Например, «прибить» актор, у которого возникли проблемы, а затем создать новый актор того же типа и поручить ему ту же работу.

Для примера я сделал на Akka.net CRUD приложение, в котором слой «бизнес-логики» реализован на акторах. Задачей этого проекта было узнать, стоит ли использовать акторы в немасштабируемых системах — сделают ли они жизнь лучше или добавят ещё боли.

Как может помочь встроенная обработка ошибок в Akka:

И немного кода. Так выглядит инициализация системы акторов в IoC контейнере:

EchoActor — самый простой актор, который возвращает значение отправителю:

Для связи акторов с «обычным» кодом используется команда Ask:

Итого

Похимичив с акторами, могу сказать:

Часть 2: Реактивные потоки

А теперь перейдём к более популярной и полезной теме — реактивным потокам. Если с акторами в процессе работы можно никогда не повстречаться, то Rx потоки обязательно пригодятся как во фронтенде, так и в бэкенде. Их реализация есть почти во всех современных языках программирования. Я буду приводить примеры на RxJs, так как в наше время даже бэкенд программистам порой приходится что-то делать на JavaScript.


Rx-потоки есть для всех популярных языков программирования

Чтобы объяснить, что такое реактивный поток, я начну с Pull и Push коллекций.

Single return value Multiple return values
Pull
Synchronous
Interactive
T IEnumerable
Push
Asynchronous
Reactive
Task IObservable

Pull коллекции — это то, к чему мы все привыкли в программировании. Самый яркий пример — массив.

В нём уже есть данные, сам он эти данные не поменяет, но может отдать по запросу.

Также перед тем, как что-то делать с данными, можно их как-то обработать.

А теперь давайте представим, что изначально в коллекции нет данных, но она обязательно сообщит о том, что они появились (Push). И в то же время мы всё так же можем к этой коллекции применять нужные трансформации.

Когда в source появится значение, например, 1, console.log выведет “my number is 1”.

Появляется новая сущность — Subject (или Observable):

Это и есть push-коллекция, которая будет рассылать уведомления об изменении своего состояния.

В данном случае в ней сразу появятся числа 1, 2 и 3, через секунду 4, а затем коллекция «завершится». Это такой особый тип события.

Вторая сущность — это Observer. Он может подписаться на события Subject’a и что-то сделать с полученными данными. Например:

Тут видно, что у одного Subject может быть много подписчиков.

Выглядит несложно, но пока не совсем понятно, для чего это нужно. Я дам ещё 2 определения, которые нужно знать, работая с реактивными потоками, а затем покажу на практике, как они работают и в каких ситуациях раскрывается весь их потенциал.

Cold observables

Hot observables

В каких ситуациях использовать реактивные потоки?

Когда есть поток данных, распределённый во времени. Например, пользовательский ввод. Или логи из какого-либо сервиса. В одном из проектов я видел самописный логгер, который собирал события за N секунд, а затем единовременно записывал всю пачку. Код аккумулятора занимал страницу. Если бы использовались Rx потоки, то это было бы намного проще:


“RxJs Reference / Observable, documentation licensed under CC BY 4.0.
(тут много примеров и картинки, поясняющие, что делают различные операции с реактивными потоками)

И, наконец, пример использования.

Распознавание мышиных жестов при помощи Rx потоков

В старой Опере или её духовном наследнике — Vivaldi — было управление браузером при помощи мышиных жестов.

То есть нужно распознавать движения мышью вверх/вниз, вправо/влево и их комбинации. Это можно написать без Rx потоков, но код будет сложным и трудноподдерживаемым.

А вот как это выглядит с Rx потоками:

Начну с конца — задам, какие данные и в каком формате я буду искать в исходной последовательности:

Это единичные векторы и их комбинации.

Далее нужно преобразовать события мыши в Rx потоки. Во всех Rx библиотеках есть встроенные инструменты для превращения стандартных ивентов в Observables.

Далее, я группирую координаты мыши по 2 и нахожу их разницу, получая смещение мыши.

И группирую эти движения, используя события ‘mousedown’ и ‘mouseup’.

Функция concat вырезает слишком короткие движения и группирует движения, примерно совпадающие по направлению.

Если движение по оси X или Y слишком короткое, оно обнуляется. А затем от полученных координат смещения остается только знак. Таким образом, получаются единичные векторы, которые мы искали.

При помощи sequenceEqual можно сравнить полученные движения с исходными и, если есть совпадение, выполнить определённое действие.

Обратите внимание, что, кроме распознавания жестов, здесь есть ещё отрисовка как изначальных, так и нормализованных движений мыши на HTML canvas. Читаемость кода от этого не страдает.

Из чего следует ещё одно преимущество — функционал, написанный при помощи Rx потоков, может быть легко дополнен и расширен.

Источник

Реактивное программирование с JAX-RS

Последний в этом году курс «Разработчик Java Enterprise» успешно запущен и у нас остался последний материал по данной теме, которым мы хотим поделиться с вами, где разбирается использование асинхронного подхода и стейджинга для разработки отзывчивых реактивных приложений.

Реактивное программирование сперва звучит, как название зарождающейся парадигмы, но на самом деле, относится к методу программирования, в котором для работы с асинхронными потоками данных используется событийно-ориентированный подход. Основываясь на постоянно текущих данных, реактивные системы реагируют на них путем выполнения ряда событий.
Реактивное программирование следует шаблону проектирования “Наблюдатель”, который можно определить следующим образом: если в одном объекте происходит изменение состояния, то все прочие объекты оповещаются и обновляются соответствующим образом. Поэтому, вместо того, чтобы опрашивать события на предмет изменений, события пушатся асинхронно, чтобы наблюдатели могли их обработать. В этом примере, наблюдатели — функции, которые исполняются, когда событие отправлено. А упомянутый поток данных — фактический наблюдаемый.

Почти все языки и фреймворки используют этот подход в своей экосистеме, и последние версии Java — не исключение. В этой статье я объясню как можно применить реактивное программирование, используя последнюю версию JAX-RS в Java EE 8 и функционал Java 8.

В Реактивном Манифесте перечислены четыре фундаментальных аспекта, необходимых приложению, чтобы быть более гибким, слабо связанным и простым для масштабирования, а следовательно и способным быть реактивным. В нем говорится, что приложение должно быть отзывчивым, гибким (а значит и масштабируемым), устойчивым и message-driven.

Основополагающая цель — действительно отзывчивое приложение. Предположим, есть приложение, в котором обработкой запросов пользователей занимается один большой поток, и после выполнения работы этот поток отправляет ответы обратно оригинальным запрашивателям. Когда приложение получает больше запросов, чем может обработать, этот поток становится bottleneck’ом, и приложение теряет свою былую отзывчивость. Чтобы сохранить отзывчивость, приложение должно быть масштабируемым и устойчивым. Устойчивым можно считать приложение, в котором есть функционал для авто-восстановления. По опыту большинства разработчиков, только message-driven архитектура позволяет приложению быть масштабируемым, устойчивым и отзывчивым.

JAX-RS 2.1 Reactive Client API

Посмотрим, как реактивное программирование может использоваться в приложениях Java EE 8. Чтобы разобраться в процессе, нужны базовые знания API Java EE.

JAX-RS 2.1 представил новый способ создания REST клиента с поддержкой реактивного программирования. Дефолтная реализация invoker, предлагаемая в JAX-RS — синхронная, это значит, что создаваемый клиент отправит блокирующий вызов точке назначения (endpoint) сервера. Пример реализации представлен в Listing 1.

Добавление реактивности в точку endpoint REST

Реактивный подход не ограничивается клиентской стороной в JAX-RS; его можно использовать и на стороне сервера. Для примера, сперва я создам простой сценарий, где смогу запросить список местоположений одной точки назначения. Для каждого положения я сделаю отдельный вызов с данными местоположения до другой точки, чтобы получить значения температуры. Взаимодействие точек назначения будет таким, как показано на Figure 1.


Figure 1. Взаимодействие между точками назначения

Для обертки списка прогнозов, класс ServiceResponse имплементирован в Listing 6.

Сначала я покажу реализацию синхронного ForecastResource (смотрите Listing 9), который выдает все местоположения. Затем, для каждого положения он вызывает температурный сервис, чтобы получить значения в градусах по Цельсию.

Пока все идет по плану. Настало время ввести реактивное программирование на серверной стороне, где вызовы к каждой локации могут выполняться параллельно после получения всех местоположений. Это явно может улучшить синхронный поток, показанный ранее. Это выполняется в Listing 11, где показано определение реактивной версии сервиса прогнозов.

Конечно, реактивное программирование заставляет только серверную сторону выполняться асинхронно; клиентская сторона будет заблокирована до тех пор, пока сервер не отправит ответ обратно запрашивающему. Чтобы преодолеть эту проблему, Server Sent Events (SSEs) может быть использован для частичной отправки ответа, как только он окажется доступен, чтобы температурные значения для каждой локации передавались клиенту одно за другим. Вывод ForecastReactiveResource будет похож на тот, что представлен в Listing 12. Как показано в выводе, время обработки составляет 515 мс, что является идеальным временем выполнения для получения температурных значений из одной локации.

Реактивное программирование — это больше, чем просто реализация асинхронной модели из синхронной; оно также упрощает работу с такими концепциями, как nesting stage. Чем больше оно используется, тем проще будет управлять сложными сценариями в параллельном программировании.

Спасибо за внимание. Как всегда ждём ваши комментарии и вопросы.

Источник

Осваиваем реактивное программирование на Java

Асинхронный ввод/вывод уже какое-то время используется в обиходе. При этом разные языки реализуют его по-разному, но все предоставляют способ уменьшить количество потоков, давая вроде бы полную конкурентность. JavaScript занимался этим с самого начала. При использовании всего одного потока будет мало хорошего, отправь вы в продакшн блокирующий вызов.

Несмотря на то, что реактивный Java все больше привлекает интерес разработчиков, большинство знакомых мне программистов по-прежнему живут в многопоточной парадигме. Почему? Принцип потоков относительно легко усвоить. Реактивное же программирование требует переосмысления многих привычных нам принципов программирования. Попытка объяснить, почему асинхронный ввод/вывод является лучшей альтернативой, подобна попытке объяснить сферичность Земли тому, кто всегда верил в ее плоскую природу.

Я предпочитаю обучаться через игру и эксперименты, создавая “игрушечные” системы, которые затем при необходимости можно использовать в качестве основы для больших систем. Здесь я представлю одну такую базовую систему, которая продемонстрирует основы реактивного Java, используя Project Reactor. А поскольку она мала (меньше тысячи строк в девяти файлах), то и понять ее будет несложно.

Это система псевдо-микросервисов. То есть несмотря на то, что все заключено в один исполняемый файл, каждый “сервис” находится в отдельном классе, который не содержит состояния, а сами классы могут взаимодействовать друг с другом только с помощью очереди сообщений и базы данных. Их все запускает один основной класс, при этом они программируются на завершение по истечению установленного периода времени.

Моя система смоделирована на основе схемы приема подержанных транспортных средств (ТС) и начинается с заказа на покупку, который в нашем случае будет случайно генерироваться и добавляться в очередь сообщений. Другой сервис получает заказ на покупку из этой очереди, добавляет его в базу данных и отправляет информацию о типе ТС в одну из трех очередей сообщений, в зависимости от того легковой это автомобиль, грузовик или мотоцикл. В завершении есть еще три сервиса, которые считывают эти три очереди, проверяют наличие заказа в базе данных и определяют автомобилю парковочное место для транспортировки.

Функция main выступает больше в качестве функции тестирования, поскольку в настоящей системе микросервисов каждый из них должен иметь собственную функцию main или быть частью такого фреймворка, как Spring. Однако нам нужно лишь увидеть, что все согласованно работает в одной JVM. Настоящий тест этой системы заключается в наблюдении работоспособности всех сервисов по-отдельности, но я предположу, что это сработает и так либо потребует минимальных корректировок. Вот суть функции main :

Мы стартуем всех потребителей, после чего запускаем генератор заказа на покупку, который создает заказы и поочередно помещает их в очередь в течение 10 секунд. Поскольку ни у одного из классов нет состояния, все их методы могут быть статическими.

Далее метод delayElements добавляет задержку между каждым элементом. Я делаю это для симуляции появления случайных заказов с разумной скоростью. Можете без проблем его удалить, но тогда вместо выполнения системы в течение десяти секунд и завершения, она будет заполняться очередями сообщений о заказах на покупку, для обработки которых потребуется уже более двух минут. Метод take позволит Flux выполнять обработку на протяжении десяти секунд и затем останавливаться. Я так делаю, потому что этот генератор служит исключительно для тестирования. В реальной же системе заказы на покупку наверняка будут поступать из API, который будет добавлять их в очередь сообщений.

У вас голова еще не взорвалась? На это можно посмотреть так. Внутри многозадачной операционной системы нечто наблюдает за ожидающими процессами и решает, какой из них очередным получит внимание ЦПУ. Планировщик работает аналогичным образом, только ему не нужно беспокоиться о прерываниях. Все задачи, с которыми работает планировщик, представляют очень мелкие фрагменты кода. “Блокирующие” задачи вроде delayElements разбиваются на две части. Первая часть получает элемент из потока, устанавливает таймер и возвращает управление планировщику. Далее планировщик может выполнять какую-нибудь другую задачу. Как только таймер метода delayElements истекает, вызывается вторая часть и сохраненный элемент получает возможность перемещения в следующий метод цепочки.

Я рассматриваю этот процесс как две непрерывные конвейерные ленты. Первая перемещает элементы по направлению ко второй, но вторая движется намного медленнее. В итоге элементы задерживаются на первой ленте, в то время как вторая их постепенно с нее снимает. По мере накопления задерживающихся элементов возникает так называемое “обратное давление”, и система понимает, что нужно просто остановить первую ленту, пока вторая ее немного не догонит.

Осталось проговорить последний метод цепочки, остальные же аналогичны предыдущим. Если бы речь шла о реальном сервисе, то метод timeout даже не должен был бы находиться в потоке. У нас он присутствует только, чтобы отменять поток, когда в рамках заданного временного промежутка не происходит получение элемента. Это позволяет нашей небольшой игрушечной системе все закрывать и в конечном итоге завершаться.

В реальном сервисе такого бы у вас наверняка не было. Единственное исключение возможно, если сервис ожидает определенное количество элементов в секунду, и в течение нескольких минут не получает ни одного. Тогда вы можете заподозрить, что было потеряно соединение с очередью, и захотите, чтобы служба оркестровки, например Kubernetes, перезапустила ваш сервис.

Еще один вариант применения таймаута — это тестирование кода. Если ваш тест ожидает элемент из очереди сообщений, то вы можете установить таймаут так, чтобы тест не ждал бесконечно.

Я создал три класса сервисов, по одному для каждого типа; Car (легковое авто), Truck (грузовик) или Motorcycle (мотоцикл). В примере текущего кода я мог сделать один класс с рядом параметров и запустить метод получения три раза, по одному для каждого типа. Но я оставил их отдельными, так как предвидел, что для каждого типа потребуется своя логика, кардинальность типов окажется очень низка, а базовый код короче сотни строк. Если бы у сервисов было много общей логики, я бы не пожалел лишнего времени на их объединение. Но мой прагматизм иногда превосходит перфекционизм, и мне это нравится.

Чтобы все проверить, пришлось запустить CoucheBase и RabbitMQ. В обоих случаях я просто создал экземпляр Docker:

Сборка прошла успешно! Если вы соберете и запустите систему, то советую внимательно проследить за логами, все из которых выводят текущий поток. Вскоре вы заметите паттерн маленьких конвейерных лент (некоторые будут выполняться параллельно), о котором я говорил.

Источник

Понравилась статья? Поделиться с друзьями:

Не пропустите наши новые статьи:

  • что такое реакт в программирование
  • что такое расширение файла программного кода c
  • что такое распарсить в программировании
  • Что такое раскладка клавиатуры windows 10
  • что такое рантайм в программировании

  • Операционные системы и программное обеспечение
    0 0 голоса
    Рейтинг статьи
    Подписаться
    Уведомить о
    guest
    0 комментариев
    Старые
    Новые Популярные
    Межтекстовые Отзывы
    Посмотреть все комментарии