Разработка сайта для Вашего бизнеса. Веб дизайн. Дизайн логотипа, фирменного стиля, рекламная фотография . Комплексный рекламный креатив.

Ralex. We do the work.
На рынке с 1999го года. Средняя ценовая категория. Ориентация на эффективность решений.
Ознакомтесь с нашим портфолио
Узнайте больше о услугах
Свяжитесь с нами:
E-mail: [email protected]
Tel: (044) 587 - 84 - 78
Custom web design & дизайн и разработка сайта "под ключ"
Креативный, эффективный дизайн. Система управления сайтом (СУС).
Custom flexible разработка систем электронной коммерции
Система e-commerce разрабатывается под индивидуальные потребности. Гибкая функциональность.
Search Engine Optimzation & оптимизация под поисковые системы (SEO)
Постоянная оптимизация и мониторинг сайта в поисковых системах. Достигаем результата быстро и эффективно
Custom logo design & дизайн логотипа и фирменного стиля
Многолетний опыт. Огромное портфолио. Уникальное предложение и цена.
профессиональная рекламная фотография
креативно, смело, качественно
Custom logo design & рекламный креатив. дизайн рекламы
Многолетний опыт. Огромное портфолио. Уникальное предложение и цена.

Подача актуальних даних з баз даних в додатки InfoSphere Streams

  1. огляд
  2. Малюнок 1. Рух даних з бази даних до споживачів
  3. Визначення таблиць і кортежів
  4. Таблиця 1. RATED_CDR
  5. Таблиця 2. CUST_THRESHOLD
  6. Лістинг 1. rated_cdrT
  7. Лістинг 2. cust_thresholdT
  8. попередні вимоги
  9. варіанти архітектури
  10. Примітки
  11. Сценарій 1: Проміжна запис в текстові файли
  12. Малюнок 2. Проміжна запис в текстові файли
  13. Малюнок 3. Властивості InfoSphere DataStage
  14. попередження
  15. аспекти реалізації
  16. Передача в TCP / IP-
  17. Міркування щодо реалізації
  18. Передача в іменованій канал
  19. Міркування щодо реалізації
  20. Конфігурація програми користувальницького виходу
  21. Розгортання програми користувальницького виходу
  22. Конфігурація властивостей програми користувальницького виходу
  23. Налаштування реплікації
  24. Малюнок 6. Передплатити
  25. Малюнок 7. Зіставлення таблиць - вибір типу зіставлення
  26. Малюнок 8. Зіставлення таблиць - вибір вихідної таблиці
  27. Малюнок 9. Зіставлення таблиць - вибір цільової таблиці
  28. Малюнок 10. Створення цільової таблиці
  29. Малюнок 11. Визначення стовпців
  30. Малюнок 12. SQL-оператор створення таблиці
  31. Малюнок 13. Вибір цільової таблиці
  32. Малюнок 14. Визначення ключів
  33. Малюнок 16. Зіставлення таблиця
  34. Малюнок 17. Визначення користувацького виходу на рівні підписки
  35. Малюнок 19. Визначення користувацького виходу на рівні рядків
  36. Тестування реплікації і програми для користувача виходу
  37. Тестування приймача TCP / IP
  38. Лістинг 3. Запуск слухача
  39. Малюнок 20. Журнал подій приймача підписки для з'єднання TCP / IP
  40. Лістинг 4. Список отриманих кортежів
  41. Тестування приймача іменованого каналу
  42. Лістинг 5. cat <named pipe filename>
  43. Приклади додатків InfoSphere Streams
  44. Отримання даних текстових файлів
  45. Отримання даних з програми користувача виходу
  46. Обробка кортежів
  47. Виконання програми InfoSphere Streams і отримання кортежів
  48. Лістинг 6. Зміна вихідних таблиць
  49. Висновок
  50. Ресурси для скачування

Наша взаимовыгодная связь https://banwar.org/

Використання InfoSphere Data Replication CDC як джерела для аналітичної обробки даних в реальному часі

огляд

Аналітичні системи поступово переходять від переважно пакетної обробки до обробки в реальному часі. У деяких випадках відкладена аналітична обробка даних є неприйнятною, наприклад, якщо компанії потрібно моніторинг конкретних подій. У сьогоднішньому взаємопов'язаному і технічно оснащеному світі деякі дані, які використовуються для аналізу, можуть надходити з таких пристроїв, як камери, мікрофони і датчики. Однак багато компаній генерують значні обсяги даних з використанням додатків для оперативної обробки транзакцій (online transactional processing, OLTP) і зберігають їх в транзакційних базах даних. Часто потрібне аналітичне опрацювання таких транзакційних даних з можливістю виконувати за результатами аналізу певні дії, розсилати попередження або відправляти дані на інформаційну панель.

Реплікація даних є найбільш автоматизованим і ефективним методом доставки інформації практично в реальному часі з транзакційних баз даних в аналітичні процеси. Реплікація дозволяє доставляти не весь набір даних, а дані, які зазнали змін у вихідній системі. Рішення InfoSphere Data Replication CDC дозволяє виявляти зміни, що вносяться і фіксуються транзакційними додатками, по журналам вихідних баз даних. Коли механізм захоплення виявляє в джерелі змінилися дані, він відправляє зміни в механізм застосування. Для обробки змін існують різні варіанти, такі як запис в таблицю іншої бази даних або в текстовий файл, відправка зміни як XML-повідомлення в JMS-чергу або виклик програми користувальницького виходу для спеціальної обробки. Змінені дані витягуються з журналу вихідної бази даних, передаються по мережі і обробляються прийомним механізмом без проходження через який-небудь проміжний механізм дискового зберігання.

У цьому посібнику описано підключення CDC як джерела даних майже реального часу для InfoSphere Streams, вивчаються різні варіанти інтеграції, а також пропонуються для завантаження вихідні коди програм для користувача виходу, які можна налаштувати відповідно до потреб.

На малюнку нижче показана загальна схема руху даних від журналу бази даних до споживачів даних. Зміни, що вносяться до таблиці додатками, записуються в журнал бази даних і зчитуються механізмом джерела CDC. Після фіксації транзакції механізм джерела CDC відправляє зміни в механізм приймача, записує ці зміни в таблицю бази даних, текстовий файл або чергу повідомлень. Крім того, можна викликати програму для користувача виходу для виконання спеціальної обробки.

Малюнок 1. Рух даних з бази даних до споживачів
Використання InfoSphere Data Replication CDC як джерела для аналітичної обробки даних в реальному часі   огляд   Аналітичні системи поступово переходять від переважно пакетної обробки до обробки в реальному часі

Приклад сценарію: моніторинг витрат на мобільний зв'язок

Оператор зв'язку хоче запропонувати новий сервіс, надавши абонентам можливість вказувати порогове значення для місячних витрат на мобільний зв'язок. Якщо поточний баланс перевищить встановлений максимум, то клієнт отримує повідомлення по електронній пошті або SMS. Впровадження нового сервісу має дві мети:

  • Клієнти зможуть краще контролювати використання пристроїв.
  • Компанія зможе зменшити кількість випадків несплати за рахунками.

В цілому новий сервіс повинен забезпечити підвищення якості обслуговування клієнтів.

Для впровадження цих нових функцій компанія спочатку розглянула можливість зміни додатки для тарифікації. Однак виявилося, що для реалізації такого варіанту будуть потрібні місяці. Щоб прискорити виведення нового сервісу на ринок, компанія вибрала підхід, який базується на вилученні інформації про витрати безпосередньо з таблиць бази даних. Зазвичай інформація про витрати зберігається у вигляді тарифікованих записів даних про виклики (call data record, CDR). На жаль, застосування стандартних методів виконання запитів надавало б негативний вплив на продуктивність, тому таке рішення виявилося неприйнятним.

Оскільки додаток для тарифікації працює зі стандартною РСУБД, у компанії виникла ідея використовувати журнал бази даних як джерело для виявлення подій. CDC може зчитувати зміни в таблицях в міру їх появи і запускати додаток для обробки подій. Щоб забезпечити гнучкість і масштабованість, компанія вирішила реалізувати доставку даних про зміни в додаток InfoSphere Streams з використанням CDC.

Цей сценарій легко реалізувати, хоча фактично він не використовує потужні аналітичні можливості InfoSphere Streams. Більш складний приклад може включати моніторинг CDR-записів для кожного абонента і повідомлення центру обслуговування клієнтів про відхилення в поведінці абонентів мобільного зв'язку. Наприклад, повідомлення може включати відомості про абонента, який раптово почав дзвонити в інші країни, чого раніше ніколи не робив, або про те, що поточний тарифний план абонента більше не є для нього оптимальним.

Визначення таблиць і кортежів

Для виявлення наближення балансу до граничного значення використовуються дві таблиці основної бази даних - таблиця тарифікованих CDR-записів (RATED_CDR), подібна таблиці 1, і допоміжна таблиця (CUST_THRESHOLD), в якій зберігаються персональні порогові значення для кожного абонента, як в таблиці 2. Ми навмисно спростили технічні уявлення таблиць бази даних для цього прикладу.

Таблиця 1. RATED_CDR

Тип MSISDNA varchar (20) MSISDNB varchar (20) START_TIME timestamp (3) END_TIME timestamp (3) DURATION_SEC decimal (9,0) COST decimal (9,2)

Таблиця 2. CUST_THRESHOLD

Тип MSISDN varchar (20) NAME varchar (50) MAX_MONTHLY_CHARGE decimal (15,2) THRESHOLD_PERCENTAGE decimal (3,0)

Для таблиці RATED_CDR кортеж InfoSphere Streams, одержуваний оператором джерела, буде приблизно таким, як в лістингу 1. Кортежі згенеровані з використанням прикладу програми генерування кортежів, посилання на яку є в розділі Завантаження .

Лістинг 1. rated_cdrT

TelcoRated_cdrT = tuple <rstring aud_commit_timestamp, rstring aud_transaction_id, rstring aud_entry_type, rstring aud_user, rstring b_msisdna, rstring b_msisdnb, rstring b_start_time, rstring b_end_time, float64 b_duration_sec, rstring b_cost, rstring msisdna, rstring msisdnb, rstring start_time, rstring end_time, float64 duration_sec, rstring cost>;

Для таблиці CUST_THRESHOLD визначення кортежів показано в лістингу 2.

Лістинг 2. cust_thresholdT

TelcoCust_thresholdT = tuple <rstring aud_commit_timestamp, rstring aud_transaction_id, rstring aud_entry_type, rstring aud_user, rstring b_msisdn, rstring b_name, float64 b_max_monthly_charge, float64 b_threshold_percentage, rstring msisdn, rstring name, float64 max_monthly_charge, float64 threshold_percentage>;

Ви можете використовувати ці визначення кортежів при обробці текстових файлів CDC і при відправці записів в додаток InfoSphere Streams через програму для користувача виходу.

попередні вимоги

Для виконання дій, наведених в прикладах, будуть потрібні наступні компоненти:

Залежно вибраного варіанта реплікації - через текстові файли або через програму для користувача виходу - вам буде потрібно один з наступних механізмів джерела InfoSphere Data Replication CDC:

  • Текстові файли - InfoSphere Data Replication 10.2 CDC for InfoSphere DataStage
  • Призначений для користувача вихід - InfoSphere Data Replication 10.2 CDC Event Server

    CDC Event Server призначений для черг Java ™ Message Service (JMS), але його можна використовувати також як приймач загального призначення і викликати програму для користувача виходу на базі подій реплікації. Встановивши CDC Event Server, створіть екземпляр, який в кінцевому підсумку буде виконувати функції механізму приймача. При створенні екземпляра необхідно вказати бібліотеку постачальника JMS. Єдиною обов'язковою бібліотекою є jms.jar, яку можна завантажити безкоштовно в складі пакету IBM MessageSight JMS Client Pack .

Якщо ж в якості механізму джерела використовується один з Java-механізмів (Oracle, DB2® для Linux®, UNIX® і Windows®, SQL Server, Informix® і т.д.), то можна використовувати цей механізм як приймач і конфігурувати зворотний реплікацію .

варіанти архітектури

Залежно від середовища і вимог до затримки є два варіанти інтеграції InfoSphere CDC з додатком InfoSphere Streams:

  • Використання механізму CDC for DataStage для проміжної запису транзакцій в текстові файли, після чого додаток InfoSphere Streams може їх обробляти в пакетному режимі ( сценарій 1 ).
  • Розгортання програми користувальницького виходу механізму приймача CDC для доставки даних безпосередньо в додаток InfoSphere Streams через TCP / IP-або іменований канал ( сценарій 2 ).

У цій статті обговорюються переваги та недоліки обох методів інтеграції та пропонуються для ілюстрації приклади додатків.

Незалежно від обраного варіанту механізм джерела CDC зчитує зміни в журналі бази даних і після фіксації транзакції її операції відправляються в механізм приймача CDC для застосування.

Примітки

Доставка в додаток InfoSphere Streams даних, що надходять з транзакционной системи, природним чином підходить для обробки подій (як це обговорювалося в сценарії використання ). Однак обробку даних з такої системи не слід плутати з транзакционной обробкою додатком InfoSphere Streams. Транзакція, також звана одиницею роботи, зберігається в базі даних, коли виконана фіксація. Якщо транзакції реплицируются в механізм бази даних CDC і застосовуються до цільової базі даних, то CDC буде підтримувати узгодженість транзакцій і виконувати операцію фіксації в базі даних на стороні приймача. Якщо стороною приймача є додаток InfoSphere Streams, то CDC не може виконувати таку двосторонню фіксацію. Як тільки операції передані в додаток InfoSphere Streams через призначену для користувача програму виходу, CDC більше не знає, чи були вони повністю оброблені.

Слід уникати використання InfoSphere Streams, якщо ви не можете допустити або обробити втрату або дублювання зміни запису або навіть транзакції в цілому. Для таких додатків більше підходять рішення Enterprise Service Bus (ESB) та Extract Transform and Load (ETL). Додатки InfoSphere Streams мають перевагу у випадках, коли потрібно швидкість аналізу і обробки подій для великих обсягів переданих даних.

Сценарій 1: Проміжна запис в текстові файли

InfoSphere Data Replication підтримує реплікацію в текстові файли з роздільниками (CSV), готові до негайного використання через механізм приймача CDC for DataStage (див. Рисунок 2). Для кожної реплицируемой таблиці можна призначити каталог, в якому будуть створюватися текстові файли.

Малюнок 2. Проміжна запис в текстові файли

До текстових файлів буде застосовуватися угода про іменування <table_name> x <date> T <time>, де x може приймати значення D для зафіксованого текстового файлу і @ для текстового файлу, запис в який ще виконується. У властивостях DataStage підписки CDC є два параметри, які контролюють частоту фіксації файлів, - кількість рядків і кількість секунд (див. Малюнок 3). При досягненні будь-якого з цих порогових значень на кордоні транзакції для будь-якої таблиці, яка реплікується підпискою, всі відкриті файли (з символом @ в іменах) закриваються і перейменовуються в <table> D <date> T <time>. Це забезпечує можливість обробки змін у всіх реплицируемой таблицях до конкретного моменту часу.

Малюнок 3. Властивості InfoSphere DataStage

Додаток InfoSphere Streams має використовувати оператор DirectoryScan для безперервного сканування вихідного каталогу, заданого в зіставленні таблиць CDC, на наявність нових файлів. Коли механізм CDC for DataStage записує новий файл, записи в цьому файлі можуть бути оброблені оператором FileSource. Приклад програми InfoSphere Streams CDCFlatFile.spl (див. Розділ Завантаження ) Демонструє, як сканувати каталог на наявність записаних текстових файлів і обробляти вміст цих файлів.

попередження

Не використовуйте параметр hotfile оператора FileSource в InfoSphere Streams для читання файлу CDC, в який в даний момент записуються зміни (активного файлу). При досягненні одного з порогових значень, сконфигурированних в CDC, поточний активний файл буде закритий і отримає ім'я зафіксованого файлу. Тому додаток InfoSphere Streams не зможе звернутися до файлу в ході операції читання, і вона завершиться помилкою.

Оскільки додаток InfoSphere Streams може залежати тільки від змін в зафіксованих файлах, досягти дуже малих затримок між генеруванням транзакції в джерелі і завершенням її обробки додатком InfoSphere Streams неможливо - зазвичай така затримка обчислюється хвилинами, а не секундами. Одним з методів скорочення затримки є зниження порогових значень розміру пакетів, проте врахуйте, що це може привести до того, що CDC буде генерувати велику кількість текстових файлів в залежності від обсягу транзакцій.

Головною перевагою використання текстових файлів є те, що CDC підтримує їх без будь-яких модифікацій, що забезпечує простоту реалізації.

Ваша програма InfoSphere Streams має зчитувати текстові файли у форматі CSV, враховуючи, що кожне з полів оточене лапками ( "). Перші чотири поля в кожному рядку містять наступну інформацію:

  • Мітка часу в форматі ISO
  • ідентифікатор транзакції
  • Тип операції (I для вставки, U для поновлення, D для видалення)
  • Користувач, який виконав операцію в джерелі.

За цими чотирма фіксованими полями слід вихідна запис оновленої або віддаленої рядка, а потім новий запис вставленої або видаленої рядки. У разі вставки у вихідний запис буде порожній, але поля для неї будуть присутні. У разі видалення будуть порожніми поля для нового запису.

аспекти реалізації

Рекомендується ізолювати механізм CDC for DataStage від кластера InfoSphere Streams, щоб уникнути конкуренції за оперативну пам'ять і процесори. Крім того, можна розглянути варіант архітектури додатку, в якому обробляє елемент (processing element, PE) InfoSphere Streams обробляє текстові файли тільки на кластерному вузлі, який також виконує InfoSphere CDC.

Якщо дані, реплицируемой з вихідної системи, можуть включати символи лапок, то необхідно обробляти вихідні текстові файли CDC for DataStage спеціальною програмою форматування, що дозволяє використовувати інший роздільник стовпчиків, наприклад символ вертикальної риси |. В розділі Завантаження представлений приклад програми для користувача виходу для спецформатування з використанням символу вертикальної риски як роздільник стовпчиків.

Якщо час між генеруванням транзакції в джерелі і її обробкою додатком InfoSphere Streams має бути невеликим (не більше декількох секунд, хоча в добре налаштованих середовищах може бути і менше секунди), то проміжна запис даних в текстові файли неприйнятна. Використовуючи програму для користувача виходу на стороні приймача підписки CDC, можна передавати зміни записів, одержувані від джерела, безпосередньо в додаток InfoSphere Streams.

Програма призначеного для користувача виходу викликається для кожної операції вставки, оновлення та видалення, переданої разом з даними про зміни з боку джерела, і потім форматує вихідну запис, передану в додаток InfoSphere Streams.

У цьому посібнику описано два методи доставки даних безпосередньо в InfoSphere Streams:

  • Використання сокета TCP / IP, де програма призначеного для користувача виходу є клієнтом, а додаток InfoSphere Streams сервером
  • Використання іменованих каналів Linux / UNIX для передачі даних через взаємодії між процесами

Використовуючи сокет TCP / IP або механізм взаємодії між процесами, можна домогтися низької затримки між генеруванням транзакції в джерелі і повної її обробкою додатком InfoSphere Streams. Ці методи також забезпечують масштабованість, оскільки відсутня проміжне збереження записів про зміни на диск.

У цьому розділі ми докладніше розглянемо обидва методи і обговоримо приклад програми для користувача виходу. Ви можете завантажити вихідний код, а також інструкції щодо збирання і розгортання для прикладу програми для користувача виходу.

Передача в TCP / IP-

Додатки InfoSphere Streams можуть отримувати дані безпосередньо з гнізд TCP / IP. Програма призначеного для користувача виходу CDC може з'єднуватися зі слухачем TCP / IP, що виконуються на кластері InfoSphere Streams, і передавати дані в додаток через цей канал (див. Малюнок 4).

Малюнок 4)

Потім додаток InfoSphere Streams, налаштоване як сервер, бере дані з сокета за допомогою оператора TCPSource. Поки додаток InfoSphere Streams активно, CDC має можливість записувати дані в сокет. Ви можете завантажити приклад програми InfoSphere Streams CDCTCPSocket.spl, який демонструє, як зчитувати дані з сокета TCP / IP і обробляти вміст.

Міркування щодо реалізації

У даній реалізації механізм приймача CDC може (і повинен) бути встановлений за межами кластера InfoSphere Streams. Він з'єднується з додатком InfoSphere Streams тільки по мережі і передає всі дані про зміни по мережі.

Передача в іменованій канал

Іменованій канал - це Механізм взаємодії между процесами в Linux / UNIX (див. Малюнок 5). В операційній системі він представлений спеціальним типом об'єкта (fifo-файл), створюваним з використанням команди mkfifo. Процес джерела може записувати дані в канал, а на іншій стороні каналу є процес, який зчитує дані.

Процес джерела може записувати дані в канал, а на іншій стороні каналу є процес, який зчитує дані

Якщо програма призначеного для користувача виходу налаштована для запису в іменований канал, то додаток InfoSphere Streams бере дані з іменованого каналу з використанням оператора FileSource. Поки додаток InfoSphere Streams активно, CDC матиме можливість записувати дані в іменований канал.

У розділі Завантаження представлений приклад програми InfoSphere Streams CDCNamedPipe.spl, який демонструє, як зчитувати дані з іменованого каналу і обробляти вміст.

Міркування щодо реалізації

Більшість мережевих файлових систем не підтримують іменовані канали, тому необхідно встановити механізм приймача CDC на одному з вузлів кластера InfoSphere Streams. Можна зменшити вплив і запобігти конкуренції за ресурси, виділивши один з вузлів елементу InfoSphere Streams PE, який зчитує дані з іменованого каналу, і механізму приймача CDC.

Конфігурація програми користувальницького виходу

Програма призначеного для користувача виходу спрощує реплікацію змін, лічених з використанням InfoSphere CDC, в додаток InfoSphere Streams. Зазвичай програма призначеного для користувача виходу конфигурируется для підписки, виконуваної з бази даних в CDC Event Server, тобто не вимагає бази даних приймача. Однак вона також працює для цільових баз даних з Java-механізмом CDC і навіть може бути реалізована як зворотна реплікація.

Можна вибрати передачу реплікованих подій або в TCP / IP-, або в іменований канал (fifo-файл). Форматом переданої інформації, званим кортежем, є значення з роздільниками; при цьому роздільник може бути визначений в файлі властивостей. У переданої записи є кілька фіксованих полів, за якими слід інформація, що ідентифікує зміна. Кортеж має такий вигляд:

table_schema.table_name

Схема таблиці і ім'я таблиці, об'єднані в одному полі і включаються в кожен реплицируемой кортеж. Оскільки все реплицируемой рядки записуються в один і той же фізичний приймач (TCP / IP-, іменований канал), необхідна можливість ідентифікувати таблицю, в якій рядок була змінена. commit_timestamp Метка часу в форматі ISO (yyyy-mm-dd hh24: mi: ss: ffffff), яка вказує, коли транзакція була зафіксована в джерелі. Вона схожа на контрольне поле журналу & TIMSTAMP. transaction_id Ідентифікатор одиниці роботи. Всі операції в межах однієї транзакції бази даних мають один і той же ідентифікатор транзакції. entry_type Стовпець, який містить тип операції (I = вставка, U = оновлення, D = видалення), виконаної в джерелі. user Користувач вихідної бази даних, який виконав транзакцію. before_columns По одному полю на кожен стовпець вихідної бази даних, обраний для реплікації. Ці поля містять вихідну запис оновленої або віддаленої рядка. У разі вставки ці поля порожні, але будуть присутні у вихідному форматі. after_columns По одному полю на кожен стовпець вихідної бази даних, обраний для реплікації. Ці поля містять новий запис оновленої або вставленого рядка. У разі видалення ці поля порожні, але будуть присутні у вихідному форматі.

Всі поля розділені символом роздільника стовпців, сконфігурованим в файлі властивостей; за замовчуванням це символ вертикальної риски (|). Всі кортежі закінчуються символом нового рядка Linux / UNIX, що забезпечує коректну ідентифікацію кінця кортежу в InfoSphere Streams.

Єдина відмінність між даними, одержуваними з текстового файлу CDC, і даними, одержуваними з програми користувача виходу, полягає в тому, що програма призначеного для користувача виходу вставляє в початок запису повне ім'я таблиці. Це необхідно, оскільки зміни з усіх таблиць відправляються в один і той же слухач або іменований канал. Додаток InfoSphere Streams має спочатку витягти з отриманої записи ім'я таблиці, а потім відправити її на відповідний вихідний порт для подальшої обробки.

Розгортання програми користувальницького виходу

Для реалізації призначеного для користувача виходу необхідно виконати наступне:

  1. Скомпілювати вихідний Java-код у відповідності зі специфікаціями в файлі compile_readme.txt
  2. Скопіювати всі файли .class в каталог <cdc_home> / lib механізму одержувача
  3. Скопіювати файл CDCStreams.properties в каталог <cdc_home> механізму одержувача.

Конфігурація властивостей програми користувальницького виходу

Перед тим як конфігурувати реплікацію, перевірте файл CDCStreams.properties і вкажіть метод, який хочете використовувати для передачі змін в додаток InfoSphere Streams. Передачу можна виконувати через TCP / IP (outputType = tcp) або іменований канал (outputType = namedpipe). Для реплікації в слухач TCP / IP необхідно вказати ім'я хоста або IP-адреса і порт, на який повинні відправлятися сокети (атрибут tcpHostPort). Крім того, програма призначеного для користувача виходу може передавати дані в іменований канал; в такому випадку для атрибута namedPipe повинен бути заданий fifo-файл, локальний для механізму приймача.

Якщо слухач TCP / IP недоступний для CDC, то виконання підписки завершиться помилкою після закінчення зазначеного часу очікування з'єднання TCP (за замовчуванням 2 хвилини). Якщо при запуску підписки іменований канал не існує, то її виконання завершиться помилкою негайно.

Можна налаштувати інші властивості, наприклад роздільник полів (атрибут separator), щоб поведінка програми для користувача виходу відповідало вимогам вашого застосування InfoSphere Streams.

Налаштування реплікації

Зіставлення таблиць в разі, коли приймачем є InfoSphere Streams, мало відрізняється від настройки реплікації між таблицями вихідної і цільової баз даних. Однак в цьому випадку необхідно зіставити вихідну таблицю з фіктивною таблицею CDC Event Server. Призначенням цієї таблиці є визначення структури записів, а не зберігання будь-яких даних. Програма призначеного для користувача виходу забезпечує перехоплення будь-якої операції вставки, оновлення або видалення, переданої в InfoSphere Streams, виключаючи виконання операцій в фіктивної цільової таблиці.

Спочатку створюється підписка в консолі CDC Management Console. Наші вихідні таблиці знаходяться в базі даних DB2 LUW і, оскільки ми маємо намір використовувати як приймач тільки додаток InfoSphere Streams, цільовим сховищем даних CDC є CDC Event Server (див. Рисунок 6).

Малюнок 6. Передплатити

Створивши підписку, визначте для неї зіставлення таблиць. Замість того щоб вказувати призначення повідомлення, використовуйте реплікацію типу Standard (див. Малюнок 7), яка дозволяє вибрати або створити цільову таблицю. Потім можна вибрати вихідну таблицю для реплікації.

Малюнок 7. Зіставлення таблиць - вибір типу зіставлення

Виберіть вихідну таблицю для реплікації, в даному прикладі це таблиця TELCO.CUST_THRESHOLD DB2.

Малюнок 8. Зіставлення таблиць - вибір вихідної таблиці

Цільовий таблиці немає, тому ми створимо фіктивну цільову таблицю, вибравши схему STAGE і натиснувши кнопку Create Table .... STAGE в CDC Event Server - це тільки схема, в якій фіктивна цільова таблиця може бути створена.

Малюнок 9. Зіставлення таблиць - вибір цільової таблиці

Для зручності ми створимо цільову таблицю з тим же ім'ям, що і вихідна таблиця. Це ім'я таблиці, яке буде передаватися програмі призначеного для користувача виходу.

Малюнок 10. Створення цільової таблиці

Як показано нижче, вкажіть стовпці, що утворюють цільову таблицю. За замовчуванням використовуються всі стовпці, присутні у вихідній таблиці.

Малюнок 11. Визначення стовпців

Буде показано кілька проміжних вікон, а потім буде виведено вікно оператора CREATE TABLE SQL.

Малюнок 12. SQL-оператор створення таблиці

Створену фіктивну таблицю можна вибрати як цільову.

Малюнок 13. Вибір цільової таблиці

Можна вказати ключі, які використовуються для оновлення або видалення записів. Для даного прикладу вони не важливі, оскільки ми не будемо виконувати ніяких операцій над цільової таблицею, а будемо тільки використовувати записи, передані механізмом джерела CDC.

Малюнок 14. Визначення ключів

В якості методу реплікації зіставлених таблиць слід вибрати Mirror (Change Data Capture), оскільки ми хочемо відправляти зміни в міру їх появи.

В якості методу реплікації зіставлених таблиць слід вибрати Mirror (Change Data Capture), оскільки ми хочемо відправляти зміни в міру їх появи

Виконавши зіставлення таблиць, завершите конфігурація програми для користувача виходу, як на рівні підписки, так і на рівні таблиці, щоб додаток InfoSphere Streams було визначено як приймач. Для продовження відкрийте зіставлення таблиць в консолі CDC Management Console.

Малюнок 16. Зіставлення таблиця

Необхідно визначити програму для користувача виходу на рівні підписки, оскільки вона забезпечує ініціалізацію і зчитування конфігурації з файлу властивостей. В даному прикладі ми задамо для призначеного для користувача виходу Java-клас CDCStreams (.class), який доступний в розділі Завантаження . Програми для користувача виходу на рівні підписки і на рівні рядків були консолідовані в один клас. Якщо ви не визначите для користувача вихід на рівні підписки, то реплікація завершиться помилкою при реплікації першої операції.

Малюнок 17. Визначення користувацького виходу на рівні підписки
Малюнок 18. Визначення імені класу

Необхідно також визначити цей же користувальницький вихід на рівні рядків для всіх зіставлених таблиць як мінімум для однієї операції (вихідна запис-вставка, вихідна запис-оновлення або зміни у вихідний запис-видалення). Якщо користувальницький вихід конфигурируется для однієї з операцій, він буде автоматично активуватися для іншої операції. Щоб передати той факт, що програма призначеного для користувача виходу викликається до будь-яких операцій вставки, оновлення та видалення, ми рекомендуємо поставити відмітки в усіх трьох полях.

Малюнок 19. Визначення користувацького виходу на рівні рядків

Збережіть зіставлення таблиць і продовжите зіставлення залишилися таблиць, не забуваючи про те, що для кожної таблиці повинен бути налаштований для користувача вихід на рівні рядків.

Тестування реплікації і програми для користувача виходу

Якщо ви вперше виконуєте реплікацію змін з CDC в свій додаток InfoSphere Streams, рекомендуємо перевірити результат програми користувальницького виходу. У цьому розділі описується метод настройки приймача для вашої підписки, що дозволяє перевірити кортежі, які будуть відправлятися в додаток InfoSphere Streams.

Тестування приймача TCP / IP

На сервері, що виконує оператор TCPSource вашого застосування InfoSphere Streams, виконайте команду nc -l <port> в командному рядку Linux.

Лістинг 3. Запуск слухача

[Streamsadmin @ streams-server ~] # nc -l 12345

Ця команда запускає слухач (сервер) на порту 12345 і виводить всю отриману інформацію. Запустивши підписку, ви побачите деякі події в журналі подій приймача підписки.

Малюнок 20. Журнал подій приймача підписки для з'єднання TCP / IP

Вставте кілька записів у вихідну таблицю, і команда слухача покаже отримані кортежі.

Лістинг 4. Список отриманих кортежів

[Streamsadmin @ streams-server] # nc -l 12345 TELCO.CUST_THRESHOLD | I | 2014-05-23 16: 57: 07.000000000000 ||||| 31653111222 | Isaac Stroming | 200.00 | 70 TELCO.CUST_THRESHOLD | I | 2014-05 -23 16: 59: 26.000000000000 ||||| 31651444333 | Chris D. Cosi | 350.00 | 45 TELCO.CUST_THRESHOLD | I | 2014-05-23 17: 00: 28.000000000000 ||||| 33612939415 | Iris Bonne Maman | 80.00 | 90

Тестування приймача іменованого каналу

Створіть іменований канал, як зазначено в файлі CDCStreams.properties, використовуючи команду mkfifo <named pipe filename>. Після створення іменованого каналу запустіть реплікацію. Ви можете побачити кортежі, виконавши команду cat <named pipe filename> в командному рядку Linux.

Лістинг 5. cat <named pipe filename>

[Streamsadmin @ streams-server] cat CDCNamedPipe.csv TELCO.CUST_THRESHOLD | 2014-06-25 12: 35: 59.000000000000 | 149363 | U | DB2INST1 | 33612939415 | Iris Bonne Maman | 80.00 | 85 | 33612939415 | Iris Bonne Maman | 80.00 | 86 TELCO.CUST_THRESHOLD | 2014-06-25 12: 36: 44.000000000000 | 149364 | U | DB2INST1 | 44722333444 | River Flenix | 56.00 | 100 | 44722333444 | River Flenix | 65.00 | 100

Приклади додатків InfoSphere Streams

У прикладах додатків InfoSphere Streams ми використовуємо оператори Export і Import, тому можемо багаторазово використовувати компоненти програми і забезпечувати гнучкість передачі в нього кортежів.

У прикладах додатків InfoSphere Streams ми використовуємо оператори Export і Import, тому можемо багаторазово використовувати компоненти програми і забезпечувати гнучкість передачі в нього кортежів

У прикладі програми можна виділити дві частини. У першій частині зміни захоплюються і форматуються для обробки в другій частині.

Отримання даних текстових файлів

Складовою оператор CDCReceiveFlatFile.spl сканує каталог / tmp / telco на наявність файлів, що починаються з CUST_THRESHOLD.D або RATED_CDR.D. Такі файли є зафіксованими, і додаток InfoSphere Streams може безпечно зчитувати їх, оскільки вони більше не будуть змінюватися. Файли, що записуються в сканований каталог, вже містять ім'я таблиці, тому зіставлення записів файлів з відповідними типами кортежів виконується легко і не вимагає додаткового синтаксичного розбору. Після того як вхідні записи перетворені в кортежі, вони експортуються для подальшої обробки складовим оператором CDCTupleProcess.spl.

Отримання даних з програми користувача виходу

Складові оператори CDCReceiveTCP.spl і CDCReceiveNamedPipe.spl зчитують записи про зміни з сокета TCP / IP і з іменованого каналу відповідно. Зміни надходять від CDC майже в реальному часі.

Оскільки в програмі користувача виходу можна конфігурувати тільки один порт або один іменований канал, зміни з усіх таблиць, зіставлених в підписці, будуть зчитуватися оператором TCPSource або FileSource. Кожна із записів про зміни має на початку повне ім'я вихідної таблиці, тому можна розпізнавати тип кортежу. Оскільки обидва типи джерела для входу надають один і той же тип інформації, ми відокремили синтаксичний розбір запису від отримання; всі записи про зміни негайно експортуються для обробки складовим оператором CDCTupleSplit.spl.

У першій частині складеного оператора CDCTupleSplit.spl запис рядка розбивається для вилучення імені таблиці і актуальних даних, які повинні бути розібрані для створення кортежу. Оскільки оператор InfoSphere Streams Parse очікує дані типу blob, дані перетворюються до цього типу, і знову додається символ нового рядка, що прибирається оператором джерела. В подальшому операторі Split записи направляються в вихідний порт, який веде до оператора Parse для відповідної таблиці.

Після перетворення вхідних записів в кортежі вони експортуються для подальшої обробки складовим оператором CDCTupleProcess.spl.

Обробка кортежів

Приклад складеного оператора CDCProcess.spl імпортує кортежі, що надходять від складових операторів CDCReceiveFlatFile.spl і CDCTupleSplit.spl, і записує їх у вихідні файли в каталозі / tmp / telco / output.

Виконання програми InfoSphere Streams і отримання кортежів

Нарешті, запустимо приклад програми InfoSphere Streams і внесемо в вихідні таблиці зміни, які будуть відображені в вихідних файлах.

Лістинг 6. Зміна вихідних таблиць

[Streamsadmin @ streams-server] cat /tmp/telco/output/custthreshold_output.txt "2014-06-25 18: 19: 24.000000000000", "152151", "U", "DB2INST1", "33612939415", "Iris Bonne Maman "," 80.00 "," 86 "," 33612939415 "," Iris Bonne Maman "," 80.00 "," 85 "" 2014-06-25 18: 19: 25.000000000000 "," 152211 "," U "," DB2INST1 "," 44722333444 "," River Flenix "," 65.00 "," 100 "," 44722333444 "," River Flenix "," 66.00 "," 100 "

Висновок

У цій статті описано використання InfoSphere Data Replication CDC для зчитування змін до вихідних транзакційних базах даних і їх реплікації в додаток InfoSphere Streams для аналітичної обробки в реальному часі. Ви можете використовувати файли в розділі Завантаження , Щоб приступити до інтеграції цих двох продуктів. У розділі ресурси наведені посилання на додаткову інформацію про CDC і InfoSphere Streams.

Ресурси для скачування

Схожі тими

Підпішіть мене на ПОВІДОМЛЕННЯ до коментарів

Категории
  • Биология
  • Математика
  • Краеведению
  • Лечебная
  • Наука
  • Физике
  • Природоведение
  • Информатика
  • Новости

  • Новости
    https://banwar.org/
    Наша взаимовыгодная связь https://banwar.org/. Запустив новый сайт, "Пари Матч" обещает своим клиентам незабываемый опыт и возможность выиграть крупные суммы.


    Наши клиенты
    Клиенты

    Быстрая связь

    Тел.: (044) 587-84-78
    E-mail: [email protected]

    Имя:
    E-mail:
    Телефон:
    Вопрос\Комментарий: