How-to: Создаем приложение для обработки сложных событий на основе Apache Spark и Drools

Интеграция платформы CDH и системы управления бизнес-правилами может послужить хорошей основой для создания системы обработки сложных событий в контексте больших данных.

Обработка событий включает в себя отслеживание и анализ потоков данных, генерируемых событиями, с целью получения полезной информации и обеспечения эффективности принятия решений. Учитывая стремительное увеличение объемов данных и разнообразие их источников, при реализации этого процесса могут возникать существенные трудности.

Обработка сложных событий (ОСС, complex event processing, CEP) – это процесс обработки множества потоков данных из различных источников с целью выявления закономерностей и сложных взаимосвязей между различными событиями. Система ОСС позволяет обнаруживать возможности и угрозы и своевременно реагировать на них, благодаря оповещениям, генерируемым в режиме реального времени. В настоящее время ОСС находит широкое применение во многих сферах экономической деятельности:

  • Финансы: анализ торговли, выявление мошенничества.
  • Авиаперевозки: мониторинг операций.
  • Здравоохранение: обработка страховых требований, мониторинг пациентов.
  • Энергетика и телекоммуникации: выявление аварий на линиях.

Как и все процессы в мире аналитики, ОСС существенно осложняется в результате экспоненциального роста объемов данных. К счастью, CDH (открытая платформа компании Cloudera, в состав которой входят Apache Hadoop, Apache Spark [ссылка] и др.) может служить основой для системы ОСС в контексте больших данных посредством интеграции с открытой системой управления бизнес-правилами (СУБП, business rule management system, BRMS). В этой статье мы рассмотрим, как реализовать такую интеграцию.

Архитектура

На рисунке ниже представлен пример высокоуровневой архитектуры системы ОСС на базе CDH. В зависимости от решаемых задач компоненты могут отличаться.

Архитектура основана на следующих компонентах:

  • Прием событий. Apache Kafka или Apache Flume.
  • Хранилище. Apache HBase (в будущем, возможно, Kudu) для хранения и извлечения событий.
  • Оповещения. Оповещения посредством Kafka или с помощью прямой интеграции через API.
  • Потоковая обработка. Обработка событий выполняется с помощью Spark Streaming на основе микропакетов и включает в себя следующие этапы: синтаксический анализ, поиск, сохранение, формирование текущего состояния на основе последовательности исторических событий, применение пользовательской логики. Например, можно объединить потоки от различных RDD в интервале скользящего окна (sliding-window) и получать информацию о тенденциях почти в режиме реального времени. Пакетирование может выполняться каждую секунду, в результате чего общая задержка будет менее нескольких секунд.
  • Управление бизнес-процессами. Система управления бизнес-правилами дает пользователям различной специализации возможность разрабатывать сложную бизнес-логику. Бизнес-пользователи могут непосредственно участвовать в создании гибкого процесса автоматизации принятия решений, подчиняющегося сложным эволюционирующим бизнес-правилам. В этой статье мы рассмотрим интеграцию Spark и СУБП Drools [ссылка], являющейся одной из наиболее популярных систем такого рода.
  • Статистика. Информационная панель на основе СУБД для временных рядов (time series database, TSDB), например, Аналогичные возможности можно получить с помощью Cloudera Search + Hue.

Чтобы не усложнять пример, мы будем работать с данными, сгенерированными случайным образом. Входящие события и результаты анализа можно сохранять в HBase.

В примере мы будем анализировать события на основе критериев, позволяющих выявить у пациента сепсис различной тяжести. Данные критерии заимствованы со следующего ресурса и обслуживаются с помощью Drools.

Критерии для выявления у пациента синдрома системной воспалительной реакции организма (ССВР, SIRS), сепсиса (sepsis) и септического шока (septic shock). Позволяют определить степень тяжести сепсиса.

 

Как следует из рисунка выше, ССВР имеет место, если любые два показателя выходят за указанные пределы. В общем случае, значения показателей поступают в различное время, поэтому обычно ()е время, поэтому мых диапазонов.ериев, для выявления ()олучить необходимую каждый раз, когда приходит новое значение некоторого показателя, необходимо воссоздавать состояние пациента с помощью HBase и выполнять проход по правилам. В нашем примере давайте предположим, что мы получаем все показатели пациента в каждом событии. Соответственно, не будет необходимости воссоздавать состояние пациента.

Если у пациента выявлен ССВР, далее необходимо проверить наличие сепсиса, тяжелого сепсиса, септического шока и т.д. в указанном порядке. Ниже представлена схема выполнения правил:

Правила Drools могут быть заданы в файле .drl или в файле Excel. Чтобы учесть все критерии и обеспечить дружественность интерфейса для бизнес-пользователей, в примере мы будем использовать таблицы принятия решений (decision table), поддерживаемые Drools. Такой подход обеспечивает доступ к бизнес-логике для более широкого круга пользователей (включая бизнес-аналитиков), что более эффективно по сравнению со встраиванием бизнес-логики в код на Java/Scala или в пользовательский синтаксис.

Далее представлена таблица принятия решений, соответствующая данному сервису .

На рисунке выше:

  • Розовые ячейки содержат ссылки на код.
  • Оранжевые ячейки содержат значения, которые устанавливаются при выполнении данного правила.
  • Зеленые ячейки содержат значения или диапазоны значений, на основе которых проверяется соответствие входящих показателей критериям правил.
  • Синие ячейки содержат названия правил и их критериев.

Перечислим некоторые цели интеграции Spark и Drools:

  • Непосредственное выполнение правил из Spark/Streaming.
  • Для простоты можно использовать ту часть СУБП, которая не сохраняет состояния (stateless). Для хранения состояний, охватывающих определенный период времени, можно применять скользящие окна Spark.
  • Выполнение правил в последовательном или произвольном порядке, в зависимости от требований.
  • Направление результатов выполнения правил в Spark DataFrame для расчета статистики.

Код

Перечисленные выше цели реализуются посредством следующих шагов и соответствующих фрагментов кода. Весь код можно найти по ссылке https://github.com/mganta/sprue.

  1. Инициализация выполняется однократно для каждого раздела с целью повторного использования для всех последующих потоков DStream.
    KieContainer kContainer = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
    kContainer.newStatelessKieSession();
  2. Сохраним входные данные в HBase.
    //store incoming data in hbase
    hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertIncomingDataIntoHBase, true)
  3. Для каждого события в RDD выполним все правила и вернем RDD с результатами.
    //evaluate all the rules for the batch
        patientStream.foreachRDD(rdd => {
         val evaluatedPatients = rdd.mapPartitions(partitionOfRecords => {
            val ksession = KieSessionFactory.getKieSession(xlsFileName)
            val patients = partitionOfRecords.map(patient => {
              ksession.execute(patient)
              patient
            })
            patients
          })
  4. Преобразуем RDD в DataFrame и рассчитаем статистику.
    /convert to dataframe
            val patientdf = sqc.applySchema(evaluatedPatients, classOf[Patient])
            //compute statistics
            val countMatrix = patientdf.groupBy("location").agg(max("evaluationDate"), sum("sirsFlag"), sum("sepsisFlag"), sum("severeSepsisFlag"), sum("septicShockFlag"), sum("organDysfunctionSyndrome"))
           countMatrix.show()
  5. Сохраним изменения в HBase.
    hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertEvaluatedDataIntoHBase, true
  6. Вызовем REST API временного ряда и методом POST отправим статистику микропакета. Информационная панель способна читать эти данные. (Установка OpenTSDB описана здесь [ссылка].)
    //opentsdb update statistics
    countMatrix.foreach(row => {
    TSDBUpdater.loadPatientStats(row.getString(0), row.getLong(1), row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5), row.getLong(6))
      })

Все вышеперечисленные шаги объединены в коде Spark-драйвера здесь.

В нашем примере для генерации потоков случайных данных используется QueueStream. В реальном случае мы бы получали данные каждого события в виде сообщения в формате HL7. Запустив пример, мы видим, что правила выполняются для каждого входящего события, и для каждого штата выводятся сгруппированные статистические характеристики.

Total Patients in batch: 212
Patients with atleast one condition: 137
+--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
|location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
+--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
|      MS|      1443198199233|            3|              2|                    2|                   0|                            0|
|      NE|      1443198199233|            8|              4|                    4|                   1|                            0|
|      TX|      1443198199233|           10|              8|                    8|                   7|                            1|
|      NM|      1443198199232|            8|              6|                    6|                   3|                            0|
|      NY|      1443198199233|            6|              4|                    3|                   3|                            0|
|      OK|      1443198199233|            7|              5|                    3|                   1|                            0|
|      VA|      1443198199232|           16|             14|                   12|                   7|                            1|
|      IL|      1443198199233|            5|              3|                    3|                   1|                            0|
|      CA|      1443198199233|            7|              6|                    6|                   4|                            0|
|      KS|      1443198199233|           12|              8|                    8|                   6|                            0|
|      LA|      1443198199233|           14|              8|                    7|                   2|                            1|
|      SC|      1443198199233|           12|              9|                    6|                   4|                            0|
|      FL|      1443198199233|            7|              4|                    4|                   2|                            0|
|      MN|      1443198199233|            8|              5|                    5|                   2|                            0|
|      GA|      1443198199233|           14|             12|                   12|                   6|                            0|
+--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
 
 
 
Total Patients in batch: 247
Patients with atleast one condition: 170
+--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
|location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
+--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
|      MS|      1443198199242|            1|              1|                    1|                   1|                            0|
|      NE|      1443198199241|           14|             11|                   11|                   5|                            1|
|      TX|      1443198199242|           13|             10|                    9|                   6|                            0|
|      NM|      1443198199241|           11|              4|                    3|                   2|

Если настроить информационную панель OpenTSDB, мы увидим следующее:

Заключение

При создании сложных систем использование СУБП является хорошим решением. Разделение данных и бизнес-логики обеспечивает гибкость дизайна, в то время как специалисты из предметной области получают возможность лучше понять логику принятия решений. Как мы увидели на примере, интеграция CDH (Spark, HBase, Kafka) и системы управления бизнес-правилами позволяет реализовать сложную бизнес-логику и реагировать на события в режиме реального времени.

По материалам: Cloudera

Добавить комментарий

Ваш e-mail не будет опубликован.

закрыть

Поделиться

Отправить на почту
закрыть

Вход

закрыть

Регистрация

+ =