Классика баз данных - статьи

       

Доступ к данным Hadoop из SQL с использованием табличной UDF


В этом разделе мы опишем, как можно обеспечить прямой доступ к данным Hadoop через SQL-запросы и использовать эти данные совместно с реляционными данными Teradata EDW для выполнения интегрированного анализа данных. Мы обеспечиваем табличную UDF (User Defined Function – функцию, определяемую пользователями), называемую HDFSUDF, которая "вытягивает" данные из Hadoop в Teradata EDW. Например, в следующем SQL-запросе вызывается HDFSUDF для загрузки данных из файла Hadoop с именем mydfsfile.txt в таблицу Tab1 в Teradata EDW:

INSERT INTO Tab1 SELECT * FROM TABLE(HDFSUDF (‘mydfsfile.txt’)) AS T1;

Заметим, что после создания табличной UDF HDFSUDF и предоставления ее пользователям она вызывается подобно любой другой UDF. Для пользователей этой табличной UDF несущественно, каким образом данные перемещаются из Hadoop в Teradata EDW. Обычно табличная UDF HDFSUDF пишется таким образом, чтобы при ее вызове из SQL-запроса она выполнялась в каждом AMP. Однако ее можно написать и таким образом, чтобы при вызове из SQL-запроса она выполнялась в каком-либо одном AMP или в какой-либо группе AMP. Каждый экземпляр HDFSUDF, выполняемый в некотором AMP, отвечает за извлечение некоторой части файла Hadoop. Табличная функция HDFSUDF может также производить фильтрацию и преобразование данных по мере того, как эта функция доставляет строки в процессор SQL. Примерный код HDFSUDF и другие подробности доступны на Web-сайте Teradata Developer Exchange [1]. Когда в некотором AMP запускается экземпляр UDF, этот экземпляр связывается с NameNode в Hadoop, который заведует метаданными относительно mydfsfile.txt. Метаданные Hadoop NameNode включают информацию о том, какие блоки файла Hadoop сохраняются, и в каких узлах они реплицируются. В нашем примере каждый экземпляр UDF обращается к NameNode и обнаруживает общий размер S файла mydfsfile.txt. Затем табличная UDF запрашивает у Teradata EDW номер своего собственного AMP и общее число AMP. На основе этих фактов каждый экземпляр UDF вычисляет смещение в файле mydfsfile.txt, от которого он начнет читать данные из Hadoop.


Для любого запроса от экземпляров UDF к системе Hadoop NameNode устанавливает, какие DataNode в Hadoop отвечают на возврат требуемых данных. Экземпляр табличной UDF, выполяемый на некотором AMP, получает данные непосредственно от тех DataNode, которые сохраняют требуемые блоки данных. Заметим, что никакие данные из файла Hadoop никогда не маршрутизируются через NameNote. Все это делается напрямую от одного узла другому узлу. В нашей примерной реализации [1] мы просто вынуждаем N-ый AMP в системе загружать N-ую порцию файла Hadoop. В зависимости от потребностей приложений можно обеспечить другие типы отображений.

При принятии решения о том, какую часть файла следует загружать каждому AMD с использованием табличной UDF, нужно убедиться, что, в конечном счете, все экземпляры UDF прочитают все байты файла Hadoop, и каждый байт будет прочитан только один раз. Поскольку каждый AMP запрашивает данные из Hadoop, посылая в своем запросе смещение в байтах до позиции файла, с которого должно начаться чтение, нам требуется гаантировать, что последняя строка, прочитанная каждым AMP, является полной, а не частичной строкой (если экземпляры UDF обрабатывают входной файл в режиме "строка за строкой"). В нашей примерной реализации [1] у файла Hadoop, который требуется загрузить, строки имеют фиксированный размер; поэтому мы можем простым образом вычислить начальное и конечное смещение в байтах требуемой порции данных для любого AMP. В зависимости от формата входного файла и потребностей приложений назначению каждому AMP соответствующей порции файла может потребоваться уделять более серьезное внимание.

После загрузки данных Hadoop в Teradata мы можем анализировать набор данных Hadoop точно так же как любые другие данные, сохраняемые в EDW. Более интересно то, что мы можем выполнять интегрированный анализ данных над реляционными данными, хранимыми в Teradata EDW, и внешними данными, исходно сохранявшимися в Hadoop, без потребности в создании новой таблицы и загрузке в нее данных Hadoop.


Это демонстрируется в следующем примере. Предположим, что у некоторой телекоммуникационной компании имеется файл Hadoop packets.txt, в котором сохраняется информация о сетевых пакетах, и строки которого имеют формат <source-id, dest-id, timestamp>. Поля source-id и dest-id используются для обнаружения спамеров и хакеров. Их значения говорят нам, кто и куда послал запрос. Допустим теперь, что в Teradata EDW имеется таблица watch-list ("список отслеживания"), в которой сохраняется список source-id, которые отслеживаются и используются для анализа тенденций изменения. В следующем SQL-запросе соединяются файл Hadoop packets.txt и таблица watch-list для нахождения списка source-id в таблице watch-list, из которых рассылались пакеты в более чем миллион уникальных dest-id.

SELECT watchlist.source-id, count(distinct(T.dest-id)) as Total FROM watchlist, TABLE(HDFSUDF(’packets.txt’)) AS T WHERE watchlist.source-id=T.source-id GROUP BY watchlist.source-id HAVING Total > 1000000

Приведенный пример показывает, что мы можем использовать подход табличной UDF для обеспечения простой возможности выполнения сложного анализа с применением процессора SQL над данными Hadoop и реляционными данными. В настоящее время мы работаем над более развитой версией HDFSUDF [1], позволяющей пользователям SQL объявлять отображение схем между файлами Hadoop и таблицами SQL, а также фильтровать и трансформировать данные с применением высокоуровневых конструкций SQL без потребности в написании кода на языке Java.


Содержание раздела