Показаны сообщения с ярлыком bigdata. Показать все сообщения
Показаны сообщения с ярлыком bigdata. Показать все сообщения
воскресенье, 7 мая 2023 г.
четверг, 13 января 2022 г.
Книга: Высоконагруженные приложения. Программирование, масштабирование, поддержка
Метки:
2cp,
avro,
bigdata,
book,
cap,
consistance,
distributed,
replication,
serialize,
sharding
Это не пересказ всей книги, а краткие заметки, особо запомнившихся разделов.
среда, 28 июля 2021 г.
Azure: Databricks vs HDInsight
Метки:
azure,
bigdata,
databricks,
HDInsight,
spark
В облаке MS Azure есть 2 простых способа организовать кластер Spark - это Databricks и HDInsight.
вторник, 5 января 2021 г.
Оптимизация Spark Scala UDF
- Проблемы Scala UDF
- Создание Native Scala UDF для Catalyst
- Использование
- Тестовые данные
- Сравнение планов, Codegen и производительности
- Использование в SQL
Проблемы Scala UDF
1. UDF - черный ящик для Codegen: представляет из себя вызов java функции и не встраивается в wholestagegen2. Нет возможности not null оптимизации
3. UDF не может быть спущена на уровень файлов (predicate pushdown)
4. Конвертация UTF-8 строк Spark в UTF-16 строки JVM при передаче параметров и получении результата.
пятница, 21 августа 2020 г.
BigData анализ с помощью Spark и Scala
В этой статье я хотел бы охватить основные аспекты работы с фреймворком Spark
- RDD
- RDD Key-Value
- DataFrame и Spark SQL
- DataSet
- Управление распределением данных
- Shuffle
- Дополнительные возможности
- Spark Streaming
- Настройка и отладка
- Оптимизация
пятница, 27 марта 2020 г.
Oracle DataMiner ML в сравнении с Python sklearn
В этой статье хочу посмотреть на ML опцию, встроенную в Oracle 12.
среда, 26 февраля 2020 г.
Подключение к Kafka через Spark Structure Streaming
Памятка по чтению данных из Kafka топика средствами Spark Structure Streaming
- Подключение к Kafka
- Описание схемы топика
- Системные данные
- Преобразование json в плоскую таблицу
- Запуск стрима
- Указание окна забора
- Запуск batch процессинга
- События на стрим
- Запуск из консоли
- Использование стрима в HiveQL
суббота, 25 января 2020 г.
SQL заметки за 2019
Продолжение цикла заметок и статьи 2016 года.
Хочу зафиксировать моменты Oracle и SQL в общем, которые достаточно интересны, но малы для отдельной статьи.
Хочу зафиксировать моменты Oracle и SQL в общем, которые достаточно интересны, но малы для отдельной статьи.
-
Трансформация запросов
- Виды преобразований запросов
- Результрующий запрос после всех преобразований оптимизатора
- Ручная трансформаиця 1 запроса в другой
-
Статистика
- Устаревание статистики
- Инкрементальный сбор статистики в партицированных таблицах
- Селективность колонки с 12.2
- Хинт для задания статистики колонки
- статистика по использованию сегмента
- Ассоциация статистики к функции
- Колонки-кандидаты для гистограммы
- Просмотр данных гистограммы
- Join cardinality по гистограмме
- Определение селективности, если на обоих столбцах соединения есть гистограмма
-
PLSQL
- Автономная транзакция
- Иключение при поиске элемента по ключу
- plsql redefinition
- Консистентность функций
- Параллелизация pipeline функций
-
PLSQL коллекции
- Varrays - обычный массив
- Hash table - Associative array над связанным списком
- Nested tables
-
Анализ производительности запросов
- индекс - кандидат на удаление
- forall - в статистике (ash/awr)
- Выявление skew через oem monitor
- Пометка запроса для awr
- Чтение плана
- Монотонный рост значений в индексе
- Долгий вызов plsql в запросе
- Undo/redo при вставке
- Параллельное последовательное чтение индекса
- Result_cache
- Вставка игнорируя consraint, но с сохранением ошибок
- Пометка блока горячим
-
Оптимизация хранения
- Создание not null поля с default
- Index coalesce
- Вставка в новую таблицу
- Вставка в длинную таблицу
- Include Индекс
- Дополнительные параметры таблиц в Exadata
- Отрицательная эффективность Exadata
- Структура Lob
-
Партицирование
- Системное партицирование
- Reference partitions
- Глобальные индексы
- INDEXING OFF|On
- Тепловая карта партиций
-
Настройки бд
- Виды репликаций
- Exadata 12.2
- Особенности только в exadata
-
Разные SQL алгоритмы
- Пагинация на ключах
- start_of_group - нумерация групп по разрывам
- Забор таблицы частями без fullscan, индексов и партиций
- Поиск одного пропуска
- Вставка данных больше размера varchar
- partition join
- Округление через to int
- Удаление из обновляемого представления
- Выражение на месте join
- DBMS_HS_PASSTHROUGH - Полное выполнение запроса на удаленной бд
- Top уникальных строк в группе
понедельник, 19 августа 2019 г.
Оптимизация хранения данных в Orc для Hive
воскресенье, 30 июня 2019 г.
Машинное обучение через решающие деревья
Это краткий пересказ курса: Введение в Data Science и машинное обучение с дополнением 2 тем: градиентный бустинг и решающие деревья в Spark.
четверг, 29 ноября 2018 г.
Введение в Scala и параллельную разработку
Метки:
bigdata,
functional,
parallel,
scala
В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка до параллельного программирования.
-
Введение в Scala
- Переменные
- Операторы
- Условия
- Вывод в консоль
- Циклы
- Функции
- Процедуры
- Ленивые переменные
- Исключения
- Массивы
- Ассоциативные массивы
- Кортежи (tuples)
- Классы
- Объекты
- Пакеты и импортирование
- Наследование
- Файлы и регулярные выражения
- Трейты (интерфейсы)
- Операторы
- Функции высшего порядка
- Коллекции
- Сопоставление с образцом
- Аннотации
- Обработка XML
- Обобщенные типы
- Дополнительные типы
- Неявные преобразования
- Динамическое программирование
- Конкурентное программирование Scala
- Молниеносный анализ Spark
четверг, 24 мая 2018 г.
LSM дерево: быстрый доступ по ключу в условиях интенсивной вставки
В условиях интенсивной вставки для быстрого доступа к данным обычные btree индексы не подходят.
Во многих базах (Bigtable, HBase, LevelDB, MongoDB, SQLite4, Tarantool, RocksDB, WiredTiger, Apache Cassandra, и InfluxDB ) используется LSM-дерево (Log-structured merge-tree — журнально-структурированное дерево со слиянием)
LSM дерево служит для хранения ключ-значения.
В данной статье рассмотрим двухуровневое дерево. Первый уровень целиком находится в памяти, вторая половина на диске.
Вставка идет всегда в первую часть в памяти, а поиск из всех частей. Когда размер данных в памяти превышает определенный порог, то она записывается на диск. После чего блоки на диске объединяются в один.
Рассмотрим алгоритм по частям:
1. Все записи в дереве маркируются порядковым номером glsn , при каждой вставке этот счетчик увеличивается
Структура для хранения ключа-значения и порядкового номера:
2. Структуры объядиняются в одну таблицу в памяти.
В моем алгоритме используется хэш таблица.
В реальности чаще всего используется отсортированная по ключу структура - это дает преимущества, что для поиска и слияния используется последовательное чтение, вместо рандомного.
3. Т.к. размер памяти ограничен, то при превышении определенного порога, данные должны быть скинуты на диск.
В моей реализации - это простой вариант без фоновых заданий и асинхронных вызовов
При достижении лимита будет фриз, т.к. таблица скидывается на диск.
В реальных системах используется упреждающая запись: данные на диск скидываются асинхронно и заранее, до достижения жесткого лимита памяти.
4. Получается, что половина всех данных у нас хранится в хэше в памяти, а остальное в виде файлов на диске.
Чтобы не перебирать все файлы на диске создается дополнительная индексная структура, которая будет хранить метаинформацию о файлах в памяти:
5. Управляющий объект всего дерева хранит ссылку на таблицу с данными в памяти и на мета-индексы данных на диске:
6. Когда все объекты описаны, давайте посмотрим как происходит добавление нового элемента.
Добавление всегда идет в таблицу в памяти, что гарантирует быструю вставку:
* Увеличиваем глобальный счетчик элементов LSMTree.glsn
* Если число элментов превысило порог, то таблица скидывается на диск и очищается
В реальности это конечно не число элементов, а объем в байтах.
* Создается новый элемент SSItem
* И доблавляется в хэш массив
Причем, если элемент до этого уже существовал, то он перезаписывается.
В реальности хранятся все записи, чтобы была возможность поддержки транзакционности.
* Кроме этого в индексе обновляются метаданные indx.add
7. С вставкой разобрались, теперь обратная операция - поиск элемента в дереве:
* Сперва мы проверяем наличие ключа в хэш таблице в памяти. Если элемент нашелся, то на этом все.
* Если элемента нет, то мы пробегамся по всем метаиндексам в поиске нашего ключа.
* проверяем, что ключ входит в диапазон indx.min_key - indx.max_key индекса
* И для полной точности проверяем наличие ключа в хэше ключей indx.keys.containsKey
* Если элемент нашелся, то это еще не конец, цикл продолжается, пока мы не переберем все индексы.
Т.к. ключ может добавляться несколько раз в разные промежутки времени.
* Из всех совпадений выбираем индекс с максимальным счетчиком вставки - это последнее изменение ключа
** Открываем нужный файл
В реальности, скорей всего, файы постоянно держатся открытыми для экономии времени.
** Следуя смещенями из индекса переходим к нужной точке файла file.seek
** И считываем значение file.read
8. Т.к. ключ может вставляться неограниченное число раз, то со временем он может находится в нескольких файлах на диске одновременно.
Также, если бы LSM дерево поддерживало транзакционность, то в файлах также хранились бы разные версии одного ключа (изменения или удаления).
Для оптимизации последующего поиска применяется операция слияния нескольких файлов в один:
* Сортируем индексы по убыванию lsn
* Последовательно считываем элементы из первого индекса и записываем в новый
* Если элемент уже присутствует в объединенном индексе, то он пропускается
* Создается новый единственный файл и индекс со всеми ключами и значениями
* Старые файлы и индексы удаляются
В реальности данные хранят в отсотированной структуре, что позволят выполнять слияние 2 отсортированных массивов в один за линейное время используя только быстрое последовательное чтение.
Полный код класса LSM дерева можно посмотреть на github
Во многих базах (Bigtable, HBase, LevelDB, MongoDB, SQLite4, Tarantool, RocksDB, WiredTiger, Apache Cassandra, и InfluxDB ) используется LSM-дерево (Log-structured merge-tree — журнально-структурированное дерево со слиянием)
LSM дерево служит для хранения ключ-значения.
В данной статье рассмотрим двухуровневое дерево. Первый уровень целиком находится в памяти, вторая половина на диске.
Вставка идет всегда в первую часть в памяти, а поиск из всех частей. Когда размер данных в памяти превышает определенный порог, то она записывается на диск. После чего блоки на диске объединяются в один.
Рассмотрим алгоритм по частям:
1. Все записи в дереве маркируются порядковым номером glsn , при каждой вставке этот счетчик увеличивается
Структура для хранения ключа-значения и порядкового номера:
//запись с ключ-значение class SSItem { Integer key; String value; //+ идентификатор последовательности вставки Integer lsn; SSItem(Integer key, String value, Integer lsn) { this.key = key; this.value = value; this.lsn = lsn; }
2. Структуры объядиняются в одну таблицу в памяти.
В моем алгоритме используется хэш таблица.
В реальности чаще всего используется отсортированная по ключу структура - это дает преимущества, что для поиска и слияния используется последовательное чтение, вместо рандомного.
class MemSSTable { //из-за хэша нет поиска по диапазону HashMap<Integer, SSItem> itms = new HashMap<Integer, SSItem>();
3. Т.к. размер памяти ограничен, то при превышении определенного порога, данные должны быть скинуты на диск.
В моей реализации - это простой вариант без фоновых заданий и асинхронных вызовов
При достижении лимита будет фриз, т.к. таблица скидывается на диск.
В реальных системах используется упреждающая запись: данные на диск скидываются асинхронно и заранее, до достижения жесткого лимита памяти.
void SaveToDisk() throws FileNotFoundException, UnsupportedEncodingException { indx.path = "sstable_" + indx.max_lsn + ".dat"; PrintWriter writer = new PrintWriter(indx.path, "UTF-8"); Integer pad = 0; //последовательно пишем 10 байт с длиной значения и само значение for(Entry<Integer, SSItem> entry : itms.entrySet()) { SSItem itm = entry.getValue(); String val = itm.get(); writer.print( val ); //регистрируем в индексе смещения в файле indx.keys.put(itm.key, pad); pad = pad + val.length(); } writer.close(); LSMTree.indexes.add(indx); } //SaveToDisk
4. Получается, что половина всех данных у нас хранится в хэше в памяти, а остальное в виде файлов на диске.
Чтобы не перебирать все файлы на диске создается дополнительная индексная структура, которая будет хранить метаинформацию о файлах в памяти:
//индекс над таблицей с даными class SSTableIndex { //метаданные: //минимальный и максимальный ключи в таблице Integer min_key; Integer max_key; //минимальный и максимальный порядковый lsn Integer min_lsn; Integer max_lsn; //если таблица на диске, то путь до файла String path; //ключ - смещение ключа в файле HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>(); //добавить ключ в индекс void add(Integer k) { //также обновляем метаданные max_lsn = LSMTree.glsn; if(min_lsn == null) min_lsn = max_lsn; if(min_key == null || k < min_key) min_key = k; if(max_key == null || k > max_key) max_key = k; //добавление идет в память в хэш таблицу, на первом этапе смещения в файле нет keys.put(k, 0); }
5. Управляющий объект всего дерева хранит ссылку на таблицу с данными в памяти и на мета-индексы данных на диске:
public class LSMTree { static int glsn = 0; //максимальный размер, после которого таблица должна быть скинута на диск //для упрощения алгоритма = числу записей, а не размеру в байтах final static int max_sstable_size = 10; //текущая таблица в памяти, куда вставляем данные MemSSTable MemTable; //все индексы, даже для таблиц на диске, хранятся в памяти static LinkedList<SSTableIndex> indexes = new LinkedList<SSTableIndex>();
6. Когда все объекты описаны, давайте посмотрим как происходит добавление нового элемента.
Добавление всегда идет в таблицу в памяти, что гарантирует быструю вставку:
public class LSMTree { //.... //добавить запись public void add(Integer k, String v) { MemTable.add(k, v); }Подробней логика добавления в таблицу в памяти:
* Увеличиваем глобальный счетчик элементов LSMTree.glsn
* Если число элментов превысило порог, то таблица скидывается на диск и очищается
В реальности это конечно не число элементов, а объем в байтах.
* Создается новый элемент SSItem
* И доблавляется в хэш массив
Причем, если элемент до этого уже существовал, то он перезаписывается.
В реальности хранятся все записи, чтобы была возможность поддержки транзакционности.
* Кроме этого в индексе обновляются метаданные indx.add
class MemSSTable { //..... //добавить новый элемент void add(Integer k, String v) { //увеличиваем глобальный счетчик операций glsn LSMTree.glsn++; //если размер превышает, if(itms.size() >= LSMTree.max_sstable_size) { try { //то сохраняем таблицу на диск //в реальных движках используется упреждающая фоновая запись //когда папять заполнена на N% (n<100), то данные скидываются на диск заранее, чтобы избежать фриза при сбросе памяти и записи на диск SaveToDisk(); } catch (FileNotFoundException | UnsupportedEncodingException e) { e.printStackTrace(); return; } //очищаем данные под новые значения indx = new SSTableIndex(); itms = new HashMap<Integer, SSItem>(); } //обновляем медаданные в индексе indx.add(k); SSItem itm = new SSItem(k, v, indx.max_lsn); //в моей реализации, при повторе ключ перезаписывается //т.е. транзакционность и многоверсионность тут не поддерживается itms.put(k, itm); } //addЕсли после вставки нового элемента старые данные оказались на диске, то в метаданных индекса прописывается ссылка на точное смещение в файле:
class SSTableIndex { //... //если таблица на диске, то путь до файла String path; //ключ - смещение HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
7. С вставкой разобрались, теперь обратная операция - поиск элемента в дереве:
* Сперва мы проверяем наличие ключа в хэш таблице в памяти. Если элемент нашелся, то на этом все.
* Если элемента нет, то мы пробегамся по всем метаиндексам в поиске нашего ключа.
* проверяем, что ключ входит в диапазон indx.min_key - indx.max_key индекса
* И для полной точности проверяем наличие ключа в хэше ключей indx.keys.containsKey
* Если элемент нашелся, то это еще не конец, цикл продолжается, пока мы не переберем все индексы.
Т.к. ключ может добавляться несколько раз в разные промежутки времени.
* Из всех совпадений выбираем индекс с максимальным счетчиком вставки - это последнее изменение ключа
public class LSMTree { //..... //получить значение по ключу String getKey(Integer key) { //сперва смотрим в памяти String val = MemTable.getKey(key); if(val == null) { SSTableIndex indx_with_max_lsn = null; //потом таблицу по индексам, которая содержит наш ключ //если содержится в нескольких, то берем с максимальным lsn, т.к. это последнее изменение for (SSTableIndex indx : indexes) { Integer max_lsn = 0; if(key >= indx.min_key && key <= indx.max_key && max_lsn < indx.max_lsn ) { if(indx.keys.containsKey(key)) { max_lsn = indx.max_lsn; indx_with_max_lsn = indx; } } } //читаем из таблицы с диска if(indx_with_max_lsn != null) { try { return indx_with_max_lsn.getByPath(key); } catch (IOException e) { e.printStackTrace(); } } } return val; }* Определим нужный индекс обращаемся по нему к таблице на диске:
** Открываем нужный файл
В реальности, скорей всего, файы постоянно держатся открытыми для экономии времени.
** Следуя смещенями из индекса переходим к нужной точке файла file.seek
** И считываем значение file.read
class SSTableIndex { //... //получить значение ключа из открытого файла String getByPath(Integer key, RandomAccessFile file) throws IOException { //получаем смещение в файле для ключа из индекса Integer offset = keys.get(key); //смещаемся в файле file.seek(offset); //резервируем 10 байт под переменную с длинной значения byte[] lenb = new byte[10]; file.read(lenb, 0, 10); Integer len = Integer.parseInt(new String(lenb, StandardCharsets.UTF_8)); file.seek(offset + 10); //считываем значение byte[] valb = new byte[len]; file.read(valb, 0, len); return new String(valb, StandardCharsets.UTF_8); } //getByPath //получить значение ключа с диска String getByPath(Integer key) throws IOException { if(!keys.containsKey(key)) return null; RandomAccessFile file = new RandomAccessFile(path, "r"); String val = getByPath(key, file); file.close(); return val; }
8. Т.к. ключ может вставляться неограниченное число раз, то со временем он может находится в нескольких файлах на диске одновременно.
Также, если бы LSM дерево поддерживало транзакционность, то в файлах также хранились бы разные версии одного ключа (изменения или удаления).
Для оптимизации последующего поиска применяется операция слияния нескольких файлов в один:
* Сортируем индексы по убыванию lsn
* Последовательно считываем элементы из первого индекса и записываем в новый
* Если элемент уже присутствует в объединенном индексе, то он пропускается
* Создается новый единственный файл и индекс со всеми ключами и значениями
* Старые файлы и индексы удаляются
public class LSMTree { //.... //объединить несколько таблиц на диске в 1 большую void merge() throws IOException { Integer min_key = null; Integer max_key = null; Integer min_lsn = null; Integer max_lsn = null; //сортируем таблицы по убыванию lsn //чтобы вначале были самые свежие ключи Collections.sort(indexes, new Comparator<SSTableIndex>() { @Override public int compare(SSTableIndex o1, SSTableIndex o2) { if(o1.max_lsn > o2.max_lsn) { return -1; } else if(o1.max_lsn < o2.max_lsn) { return 1; } return 0; } }); SSTableIndex merge_indx = new SSTableIndex(); Integer pad = 0; merge_indx.path = "sstable_merge.dat"; PrintWriter writer = new PrintWriter(merge_indx.path, "UTF-8"); //пробегаемся по всем индексам, чтобы слить в 1 for (SSTableIndex indx : indexes) { if(min_lsn == null || indx.min_lsn < min_lsn) min_lsn = indx.min_lsn; if(min_key == null || indx.min_key < min_key) min_key = indx.min_key; if(max_key == null || indx.max_key > max_key) max_key = indx.max_key; if(max_lsn == null || indx.max_lsn > max_lsn) max_lsn = indx.max_lsn; RandomAccessFile file = new RandomAccessFile(indx.path, "r"); //т.к. данные в таблицах не упорядочены, это приводит к рандомным чтениям с диска //в реальности делают упорядочнный по ключу массив, чтобы делать быстрые последовательные чтения for(Entry<Integer, Integer> entry : indx.keys.entrySet()) { //оставляем запись только с максимальным lsn Integer key = entry.getKey(); if(!merge_indx.keys.containsKey(key)) { String val = indx.getByPath(key, file); SSItem itm = new SSItem(key, val, 0); String itmval = itm.get(); writer.print( itmval ); merge_indx.keys.put(key, pad); pad = pad + itmval.length(); } } //записываем и удаляем старые файлы file.close(); //delete File fd = new File(indx.path); fd.delete(); } merge_indx.min_lsn = min_lsn; merge_indx.min_key = min_key; merge_indx.max_key = max_key; merge_indx.max_lsn = max_lsn; writer.close(); //переименовываем к обычному имени File fd = new File(merge_indx.path); merge_indx.path = "sstable_" + merge_indx.max_lsn + ".dat"; File fdn = new File(merge_indx.path); fd.renameTo(fdn); indexes = new LinkedList<SSTableIndex>(); indexes.add(merge_indx); } //mergeВ моей реализации используется хэш массив, что дает большое число случайных чтений с диска.
В реальности данные хранят в отсотированной структуре, что позволят выполнять слияние 2 отсортированных массивов в один за линейное время используя только быстрое последовательное чтение.
Полный код класса LSM дерева можно посмотреть на github
пятница, 5 января 2018 г.
Hive: авторизация и аутентификация
Сборка Cloudera Hadoop не содержит никаких настроек авторизации и аутентификации, что конечно плохо, т.к. не дает возможности разделить права среди пользователей.
Опишу один из вариантов настройки авторизации и аутентификации.
2. Создаем пользователя в hive
3. Создаем java класс, через который будет происходить проверка пользователя:
user1;passwd
4. Компилируем класс в объект и собираем jar файл:
компилировать нужно именно той java под которой работает hadoop:
5. Включаем CUSTOM аутентификацию в настройках Cloudera Manager:
прописываем наш аутентификатор в настройках:
* Ищем все настройки по паттерну "hive-site.xml" в настройках Cloudera.
Делать нужно именно через cloudera, а не напрямую в файле "hive-site.xml", т.к. менеджер переписывает файл из памяти процесса.
Нажимаем view as xml и во всех окнах и вводим
Также ставим флажки suppress parameter validation
Иначе настройки не попадут в итоговый XML файл
* Для работы также нужна включенная опция hive.server2.enable.doAs = true
Она означает, что все запросы будут идти из под пользователя, который прошел аутентификацию.
5. Задаем путь до кастомной папки с jar файлами:
Настройка: «HIVE_AUX_JARS_PATH»
Значение «/u01/jar»,
Также ставим suppress parameter validation
6. Все готово, перезапускам HIVE из консоли Cloudera
7. Теперь при аутентификации нужно указывать способ:
После того как все настроили, мы уверены, что пользователь прошел аутентификацию.
Войти без пароля больше не получится.
Дальше настраиваем авторизацию - выдачу прав пользователям на конкретные объекты.
Проще всего это сделать через расширенные права linux:
1. Включаем ACL права через Cloudera Manager
настройка "dfs.namenode.acls.enabled" = true
2. После этого можно распределять права на папки:
Сброс всех прав:
Даем пользователю user1 права на создание таблиц в своей схеме
Как победить эту проблему я не понял.
Так же стоит не забывать, что таким образом мы защитили только HIVE, возможность читать данные через HDFS по прежнему остались.
(Как читать данные из HDFS и HIVE через java можно посмотреть в моем github: https://github.com/pihel/java/blob/master/bigdata/Hdfs2Hive.java )
Чтобы закрыть доступ чтения через HDFS мы используем закрытие всех портов на сервере, кроме 10000 , который используется службой HIVE.
Опишу один из вариантов настройки авторизации и аутентификации.
Custom аутентификация:
1. Создаем linux пользователя на сервере hadoop:sudo useradd user1 #создаем домашнюю директорию в hdfs sudo -u hdfs hadoop fs -mkdir /user/user1 #задаем пользователя владельцем sudo -u hdfs hadoop fs -chown user1 /user/user1 sudo passwd user1 #лочим пользователя в linux sudo passwd -l user1
2. Создаем пользователя в hive
sudo -u hive hive CREATE SCHEMA user1 LOCATION "/user/user1";
3. Создаем java класс, через который будет происходить проверка пользователя:
package hive.lenta; import java.util.Hashtable; import javax.security.sasl.AuthenticationException; import org.apache.hive.service.auth.PasswdAuthenticationProvider; import java.io.*; public class LentaAuthenticator implements PasswdAuthenticationProvider { Hashtable<String, String> store = null; public LentaAuthenticator () { store = new Hashtable<String, String>(); try { readPwd(); } catch (IOException e) { e.printStackTrace(); } } //LentaAuthenticator public void readPwd () throws IOException { BufferedReader br = new BufferedReader(new FileReader("/u01/jar/pwd.ini")); String line; while ((line = br.readLine()) != null) { String login[] = line.split(";"); if(login.length == 2) { store.put(login[0].trim(), login[1].trim()); } } br.close(); } //readPwd @Override public void Authenticate(String user, String password) throws AuthenticationException { String storedPasswd = store.get(user); if (storedPasswd != null && storedPasswd.equals(password)) return; throw new AuthenticationException("LentaAuthenticator: Error validating user"); } }В файле /u01/jar/pwd.ini лежит логин и пароль, разделенные точкой с запятой:
user1;passwd
4. Компилируем класс в объект и собираем jar файл:
компилировать нужно именно той java под которой работает hadoop:
/usr/java/jdk1.7.0_67-cloudera/bin/javac -classpath /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/hive-service.jar -d /home/askahin/auth /home/askahin/auth/LentaAuthenticator.java /usr/java/jdk1.7.0_67-cloudera/bin/jar -cf /home/askahin/auth/jar/lentauth.jar ./hive sudo cp jar/lentauth.jar /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/ sudo chmod 777 /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/lentauth.jar sudo cp jar/lentauth.jar /u01/jar/lentauth.jar sudo chmod 777 /u01/jar/lentauth.jar
5. Включаем CUSTOM аутентификацию в настройках Cloudera Manager:
прописываем наш аутентификатор в настройках:
* Ищем все настройки по паттерну "hive-site.xml" в настройках Cloudera.
Делать нужно именно через cloudera, а не напрямую в файле "hive-site.xml", т.к. менеджер переписывает файл из памяти процесса.
Нажимаем view as xml и во всех окнах и вводим
<property> <name>hive.server2.authentication</name> <value>CUSTOM</value> </property> <property> <name>hive.server2.custom.authentication.class</name> <value>hive.lenta.LentaAuthenticator</value> </property>
Также ставим флажки suppress parameter validation
Иначе настройки не попадут в итоговый XML файл
* Для работы также нужна включенная опция hive.server2.enable.doAs = true
Она означает, что все запросы будут идти из под пользователя, который прошел аутентификацию.
5. Задаем путь до кастомной папки с jar файлами:
Настройка: «HIVE_AUX_JARS_PATH»
Значение «/u01/jar»,
Также ставим suppress parameter validation
6. Все готово, перезапускам HIVE из консоли Cloudera
7. Теперь при аутентификации нужно указывать способ:
AuthMech=3
После того как все настроили, мы уверены, что пользователь прошел аутентификацию.
Войти без пароля больше не получится.
Дальше настраиваем авторизацию - выдачу прав пользователям на конкретные объекты.
Проще всего это сделать через расширенные права linux:
ACL авторизация
1. Включаем ACL права через Cloudera Manager
настройка "dfs.namenode.acls.enabled" = true
2. После этого можно распределять права на папки:
Сброс всех прав:
sudo -u hdfs hdfs dfs -setfacl -R -b /
Даем пользователю user1 права на создание таблиц в своей схеме
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:rwx,group::r-x,other::--- /user/user1к остальному хранилищу hive даем права только на чтение:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:r-x,group::r-x,other::--- /user/hive/warehouseк какой-то из таблиц закрываем доступ:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:---,group::r-x,other::--- /user/hive/warehouse/table1Один минус у ACL - это остается возможность выполнить DROP table, даже если нет прав доступа на чтение или запись.
Как победить эту проблему я не понял.
Так же стоит не забывать, что таким образом мы защитили только HIVE, возможность читать данные через HDFS по прежнему остались.
(Как читать данные из HDFS и HIVE через java можно посмотреть в моем github: https://github.com/pihel/java/blob/master/bigdata/Hdfs2Hive.java )
Чтобы закрыть доступ чтения через HDFS мы используем закрытие всех портов на сервере, кроме 10000 , который используется службой HIVE.
вторник, 15 августа 2017 г.
HIVE: Своя быстрая функция замен встроенной
Сегодня расскажу об одном способе ускорения запросов с аналитическими функциями в субд HIVE, работающей поверх Hadoop.
Один из вариантов ускорить HIVEQL запрос - это переписать встроенную аналитическую функцию на свой упрощенный вариант.
К примеру функция ROW_NUMBER(OVER PARTITION BY c1 ORDER BY c2) имеет достаточно сложную реализацию (github) только для того чтобы посчитать номер строки в группе.
Пример запроса с row_number:
Можно реализовать значительно упрощенную версию подсчета номера строки в группе.
На вход функции Rank.evaluate подаем значение группы key (то что было в partition by) и инкрементируем значение счетчика counter.
Если приходит новая группа, то счетчик сбрасывается на 0, а в переменную группы "this.last_key" записывается значение новой группы:
Пример запроса с собственной функцией:
За счет параллельности мы опять же ускорим сортировку.
Чтобы создать такую функцию в HIVE нужно скомпилировать ее из java исходников:
запускаем hive и регистрируем наш jar , как функцию с произвольным названием:
Такое небольшое изменение ускорит выполнение запроса на 10-20%.
Один из вариантов ускорить HIVEQL запрос - это переписать встроенную аналитическую функцию на свой упрощенный вариант.
К примеру функция ROW_NUMBER(OVER PARTITION BY c1 ORDER BY c2) имеет достаточно сложную реализацию (github) только для того чтобы посчитать номер строки в группе.
Пример запроса с row_number:
create table tmp_table stored as orc as select v.material, v.client_id, row_number() over (partition by v.client_id order by v.clientsum desc, v.checkcount desc) as rn from pos_rec_itm_tst v;
Можно реализовать значительно упрощенную версию подсчета номера строки в группе.
На вход функции Rank.evaluate подаем значение группы key (то что было в partition by) и инкрементируем значение счетчика counter.
Если приходит новая группа, то счетчик сбрасывается на 0, а в переменную группы "this.last_key" записывается значение новой группы:
package com.example.hive.udf; import org.apache.hadoop.hive.ql.exec.UDF; public final class Rank extends UDF{ private int counter; private String last_key; public int evaluate(final String key){ if ( !key.equals(this.last_key) ) { this.counter = 0; this.last_key = key; } return (++this.counter); } }Понятно, что для правильной работы этой функции набор данных нужно предварительно отсортировать по группе партицирования "partition by", а потом по остальным полям "order by".
Пример запроса с собственной функцией:
create table tmp_table stored as orc as select v.material, v.client_id, myrank(v.client_id) as rn from ( SELECT client_id, clientsum, checkcount, material FROM pos_rec_itm_tst DISTRIBUTE BY client_id SORT BY clientsum desc, checkcount desc ) v;Дополнительно сортировку в HIVE можно ускорить, если распараллелить мапперы по полю партицирования "partition by", а внутри этих групп сортировать по полям из "order by".
За счет параллельности мы опять же ускорим сортировку.
Чтобы создать такую функцию в HIVE нужно скомпилировать ее из java исходников:
$> /путь_до_java_который_используется_в_hive/bin/javac -classpath /путь_до_hive/lib/hive/lib/hive-serde-1.7.jar:/путь_до_hive/lib/hive/lib/hive-exec.jar:/путь_до_hadoop/lib/hadoop/client-0.20/hadoop-core.jar -d /путь_куда_компилим /путь_до_программы.java $> /путь_до_java_который_используется_в_hive/bin/jar -cf название_jar_программы.jar com/example/hive/udf/название_класса.class
запускаем hive и регистрируем наш jar , как функцию с произвольным названием:
hive> add jar Rank.jar; hive> create temporary function myrank as 'com.example.hive.udf.Rank';
Такое небольшое изменение ускорит выполнение запроса на 10-20%.
воскресенье, 26 марта 2017 г.
Решение bigdata задач на Hadoop mapreduce
Hadoop
Набор утилит, библиотек и фреймворк для разработки и выполнения распределённых программ, работающих на кластерах из сотен и тысяч узлов.Ссылка на установку: http://www.cloudera.com/content/www/en-us/downloads.html
Hadoop состоит из 2 основных частей hdfs и map reduce. Рассмотрим подробней.
Hdfs
- распределенная файловая система. Это значит, что данные файла распределены по множеству серверов.* hdfs лучше работает с небольшим числом больших файлов/блоков
* один раз записали, много раз считали
* можно только целиком считать, целиком очистить или дописать в конец (нельзя с середины)
* файлы бьются на блоки split (к примеру 64мб)
** размер блока выбирается так, чтобы нивилировать время передачи блока по сети (если сеть быстрая, то блок можно поменьше, если медленная, то больше)
* все блоки реплицируются с фактором 3 поумолчанию (хранятся в 3 копиях на разных серверах)
** это обеспечивает высокую пропускную способность, но не скорость реакции
Hdfs java api
Пример программы копирующей один файл в другой:import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.InputStream; Path source_path = new Path(args[0]); Path target_path = new Path(args[1]); Configuration conf = new Configuration(); FileSystem source_fs = FileSystem.get(source_path.toUri(), conf); FileSystem target_fs = FileSystem.get(target_path.toUri(), conf); //доступ к нескольким файлам по маскам //FileStatus [] files = fs.globStatus(glob); //?, *, [abc], [^a], {ab, cd} FSDataOutputStream output = target_fs.create(target_path); InputStream input = null; try{ input = source_fs.open(source_path); IOUtils.copyBytes(input, output, conf); } finally { IOUtils.closeStream(input); IOUtils.closeStream(output); }
Парадигма map reduce
1. input file: split 1 + split 2 + split 3 (блоки файла). Каждый сплит хранится на своем сервере кластера.2. mapper:
* на каждый split N создается worker для обработки
* worker выполяется на сервере, где находится блок
* workerы не могут обмениваются между собой данными
** т.е. mapper подходит для независимых данных, которые легко побить на части.
зависимые данные (типа архива zip) нельзя побить на части в mapper:
все блоки (split) будут переданы в один mapper
* в случае ошибки процесс рестратует на другой копии блока hdfs
* combine - (необязательный шаг) - reducer внутри mappera агрегирующие данные в меньшее число данных (ключ менять нельзя)
** агрегирует только данные одного маппера
* partition - (необязательный шаг) - определение куда отправится ключ (поумолчанию hash(k) mod N , где N - число reducer )
3. результат mapper записывается в виде ключ значение {k->v} в циклический буфер в памяти, конец списка пишется на локальный диск сервера с worker
** это сделано в целях производительности, чтобы не реплицировать
** но в случае ошибки, данные нельзя будет восстановить и потребует рестартовать mapper
4. данные из {k->v} расходятся на сервера reduccer на основе ключа.
Т.е. данные с разных mapperов но с одним key попадут в один reducer
5. Результат reducer складываются в hdfs. Число файлов = число reducer
Hadoop streaming
Инструмент для быстрого прототипирования map reduce задач.можно писать python программы заготовки, общение маппера с редусером проиходит чреез stdin/out.
Данные передаются как текст разделенный табом
word count
#!/usr/bin/python import sys for line in sys.stdin: for token in line.strip().split(" "): if token: print(token + '\t1') #!/usr/bin/python import sys (lastKey, sum)=(None, 0) for line in sys.stdin: (key, value) = line.strip().split("\t") if lastKey and lastKey != key: print (lastKey + '\t' + str(sum)) (lastKey, sum) = (key, int(value)) else: (lastKey, sum) = (key, sum + int(value)) if lastKey: print (lastKey + '\t' + str(sum))
Запуск:
hadoop jar $HADOOP_HOME/hadoop/hadoop-streaming.jar \ -D mapred.job.name="WordCount Job via Streaming" \ -files countMap.py, countReduce.py \ -input text.txt \ -output /tmp/wordCount/ \ -mapper countMap.py \ -combiner countReduce.py \ -reducer countReduce.py
Java source code example
Программа подсчитывающая число слов в hdfs файле.word count
public class WordCountJob extends Configured implements Tool { static public class WordCountMapper extends Mapper < LongWritable, Text, Text, IntWritable > { private final static IntWritable one = new IntWritable(1); private final Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //разбиваем строку (key) на слова и передаем пару: слово, 1 StringTokenizertokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { text.set(tokenizer.nextToken()); context.write(text, one); } } //map } //WordCountMapper static public class WordCountReducer extends Reducer < Text, IntWritable, Text, IntWritable > { @Override protected void reduce(Text key, Iterable < IntWritable > values, Context context) throws IOException, InterruptedException { //в редусер приходят все единицы от одного слова intsum = 0; for (IntWritable value: values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } //reduce } //WordCountReducer @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), "WordCount"); //класс обработчик job.setJarByClass(getClass()); //путь до файла TextInputFormat.addInputPath(job, new Path(args[0])); //формат входных данных (можно отнаследовать и сделать собственный) job.setInputFormatClass(TextInputFormat.class); //классы мапперов, редусееров, комбайнеров и т.д. job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setCombinerClass(WordCountReducer.class); //выходной файл TextOutputFormat.setOutputPath(job, new Path(args[1])); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true) ? 0 : 1; } //run public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new WordCountJob(), args); System.exit(exitCode); } //main } //WordCountJob
Такая программа будет тратить все время на передачу данных от маппера к редусеру.
Её можно усовершенствовать несколькими способами:
* Сделать комбайнер - чтобы данные из одного маппера были сагрегированы там.
- не обязательно вызывается
+/- агрегирует данные только одного вызова маппера (одной строки)
* хэш аггегировать внутри маппера:
+ обязательно выполнится
+/- агрегирует данные только одного вызова маппера (одной строки)
- нужно следить за размером хэш массива
* хэш массив сверху маппера, а внутри маппера обрабатывать
+ обязательно выполнится
+ агрегирует все вызовы маппера (сплита)
- нужно следить за размером хэш массива
Sql операции
Разность (minus) и другое по аналогии// на вход подаются элементы из двух множеств A и B class Mapper method Map(rowkey key, value t) Emit(value t, string t.SetName) // t.SetName либо ‘A‘ либо ‘B' class Reducer // массив n может быть [‘A'], [‘B'], [‘A' ‘B‘] или [‘B', ‘A'] method Reduce(value t, array n) if n.size() = 1 and n[1] = 'A' Emit(value t, null) * hash join class Mapper method Initialize H = new AssociativeArray : join_key -> tuple from A //хэшируем меньшую таблицу целиком в память A = load() for all [ join_key k, tuple [a1, a2,...] ] in A H{k} = H{k}.append( [a1, a2,...] ) method Map(join_key k, tuple B) for all tuple a in H{k} //ищем в A соответствие ключа из B Emit(null, tuple [k a B] ) //если нашли, то записываем
Операции на графах
оптимальное представление в виде списка смежности, т.к. матрица смежности занимает слишком много места* поиск кратчайшего пути
** обычный алгоритм - дейкстра
** поиск в ширину (bfs)
на невзвешенном графе (нет связей назад):
- Останавливаемся, как только расстояния до каждой вершины стало известно
- Останавливаемся, когда пройдет число итераций, равное диаметру графа
на взвешенном графе (есть обратные связи):
- Останавливаемся, после того, как расстояния перестают меняться
* PR = a (1/N) + (1-a)* SUM( PR(i) / C(i) )
** N - начальное число точек в графе
** C - число исходящих точек
** a - вероятность случайного перехода
** pr решается только приблизительно:
- останавливаем, когда значения перестают меняться
- фиксированное число итерации
- когда изменение значение меньше определенного числа
* проблемы
** необходимость передавать весь граф между всеми задачами
** итеративный режим, т.е. переход на следующий этап, когда все параллельные задачи выполнятся
* оптимизации:
** inmemory combining - агрегируем сообщения в общем массиве
** партицирование - смежные точки оказались на одном маппере
** структуру графа читать из hdfs, а не передавать + партицирование
Высокоуровневые языки поверх hdfs или hadoop:
Pig
высокоуровневый язык + компилятор в mapreduceиспользуется для быстрого написания типовых map-reduce задач
основные составляющие:
* field - поле
* tuple - кортеж (1, "a", 2)
* bag - коллекция кортежей {(), ()}
** коллекции могут содержать разное кол-во полей в кортеже (также они могут быть разных типов)
word count:
input_lines = LOAD 'file-with-text' AS (line:chararray); words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word; filtered_words = FILTER words BY word MATCHES '\\w+'; word_groups = GROUP filtered_words BY word; word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count, group AS word; ordered_word_count = ORDER word_count BY count DESC; STORE ordered_word_count INTO ‘number-of-words-file';
join
//Загрузить записи в bag #1 posts = LOAD 'data/user-posts.txt' USING PigStorage(',') AS (user:chararray,post:chararray,date:long); //Загрузить записи в bag #2 likes = LOAD 'data/user-likes.txt' USING PigStorage(',') AS (user:chararray,likes:int,date:long); userInfo = JOIN posts BY user, likes BY user; DUMP userInfo;
Hive
sql подобный язык hivesql поверх hadoopосновные составляющие - как в бд
метаданные могут храниться в mysql /derby
* создание таблицы
hive> CREATE TABLE posts (user STRING, post STRING, time BIGINT) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY ',' > STORED AS TEXTFILE;
* при нарушении схемы ошибки не будет, вставится NULL
join
hive> INSERT OVERWRITE TABLE posts_likes > SELECT p.user, p.post, l.count > FROM posts p JOIN likes l ON (p.user = l.user);
word count
CREATE TABLE docs (line STRING); LOAD DATA INPATH 'docs' OVERWRITE INTO TABLE docs; CREATE TABLE word_counts AS SELECT word, count(1) AS count FROM (SELECT explode(split(line, '\s')) AS word FROM docs) w GROUP BY word ORDER BY word;
Nosql
* Масштабирование:** мастер-слейв: распределение нагрузки cpu, io остается общим
** шардинг: деление данных таблицы по серверам: распределение io, cpu Общее
* cap theorem - достигается только 2 из 3
** consistency - непротиворечивость
Чаще всего жертвует строгой целостностью в текущий момент, на целостность в конечном счете
** aviability - доступность
** partioning - разделение данных на части
* типы:
** ключ-значение
** колоночная
** документоориентировання
** графовая
Hbase
- поколоночная база** строки в таблицах шардируются на сервера
*** строки хранятся в отстортированном виде, чтобы можно было искать по серверам
*** список шардов хранится в HMaster
через него определяется сервер для чтений, а дальше чтение идет уже напряму с него миную hmaster
! возможная точка отказа
** столбцы объединяются в column family
*** каждая такая группа сохраняется также в отдельный файл
*** для всей группы задается степень сжатия и другие физические настройки
** изменение/удаление происходит через вставку нового значения в колонку с TS
*** при select выбираются последние 3 изменения (если в колонке не было изменений, то берется предыдущая максимальная версия)
** промежуточные данные хранятся в memstore сервера и логе изменений
*** периодически данные из памяти скидываются на hdfs
** hfile - мапится в файл hdfs, который также splitit'ится на разные сервера
** сжатие файлов хранилища:
*** объединение файлов CF в один
*** очистка от удаленных записей
Массовость - деление данных на сервера по:
* по семейству колонок
* по диапазону ключей
* по версии строки
* сам файл разбивается в hdfs на сплиты
Cassandra
отличия:** есть sql, а не только java api
** CF имеют иерархичность и могут также объединяться в группы по какомуто признаку
** настраевая consistency от самой строгой - ожидание отклика всех серверов, то записи без проверок
** нет единой точки отказа hmaster, данные делятся по диапазону
** но усложняет деление и поиск данных
* greenplum - классическая колоночная бд, с шардирование данных на основе pgsql
** есть поддержка sql
Spark
Альтернативный вариант реализации hadoop.* Преимущества перед hadoop:
1. Необязательно сохранять промежуточные результаты в HDFS при итеративной разработке. Данные можно хранить в памяти.
2. Необязательно чередовать map-reduce, можно делать несколько reducer подряд
3. Операции в памяти (вкллючая синхронизацию между серверами), а не через hdfs
4. Возможность загрузить данные в распределенную память, а потом делать обработку (каждому mapper уже не надо считывать данные из hdfs)
* В основе может лежать любой вид хранилища данных
* Архитектура, как в hadoop: name node (driver) + worker
* Основная составляющая вещь - RDD:
** объект партицируется
** каждая партиция имеет произвольное хранение: память, диск, hdfs и т.д.
*** Для хранения данных в памяти используется LRU буфер.
Старые, редко используемые данны вытесняются в медленное хранилище
** объект неизменяем (изменение создает новый объект)
** rdd распределена по серверам
* Программа состоит из однонаправленного графа операций без циклов
Виды зависимостей в графе:
** Narrow (узкая) - переход 1 партииции в 1 другую
** wide (широкая) - переход 1 (N) партиций в N (1)
* Восстановление в случае падения:
- нет репликации как в hdfs
+ для воостановления используется перезапуск
** если потерялись данные из одной партиции в Narrow связи, то восстанавливается только она из предыдущей партиции
** если в wide то перезапускаются все партиции (т.к. связ не 1 к 1).
Результат wide поумолчанию сохраняется на диск.
** Если есть шанс потерять данные, а цепочка длинная, то лучше самим самостоятельно сохранять промежуточные результаты на диск.
* Типы операции состоят из:
** Трансформация - не запускают работу
map, filer, group, union, join ....
** Действий - запускают работу (все трансформации в графе до этого графа)
count, collect, save, reduce
* spark поддерживает:
** java
** scala
** python
Join на scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ val sc = new SparkContext(master, appName, [sparkHome], [jars]) //transform //или из текстового файла sc.textFile("file.txt") val visits = sc.parallelize( Seq(("index.html", "1.2.3.4"), ("about.html", "3.4.5.6"), ("index.html", "1.3.3.1"))) val pageNames = sc.parallelize( Seq(("index.html", "Home"), ("about.html", "About"))) visits.join(pageNames) //action visits.saveAsTextFile("hdfs://file.txt") // ("index.html", ("1.2.3.4", "Home")) // ("index.html", ("1.3.3.1", "Home")) // ("about.html", ("3.4.5.6", "About"))word count
val file = sc.textFile("hdfs://...") val sics = file.filter(_.contains("MAIL")) val cached = sics.cache() val ones = cached.map(_ => 1) val count = ones.reduce(_+_) val file = sc.textFile("hdfs://...") val count = file.filter(_.contains(“MAIL")).count()или
val file = spark.textFile("hdfs://...") val counts = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...")
* Переменные для обмена данными между серверами
** broadcast variables - переменная только для чтения
val broadcastVar = sc.broadcast(Array(1, 2, 3)) ... broadcastVar.value** Accumulators - счетчики
val accum = sc.accumulator(0) accum += x accum.value
Yarn
планировщик ресурсов, которые избавляет от недостатков классического MR:* жесткое разделенеи ресурсов: можно освободить ресурсы от mapper после их полного выполнения и отдать все ресурсы reducer
* возможность разделять ресурсы не только между MR Задачами, но и другими процессами
* разделение происходит в понятии: озу, cpu, диск, сеть и т.д.
* он же производит запуск задач (не только MR) и восстанавливает в случае падения
* для работы нужны вспомогательные процессы:
** appserver - получение комманд от клиента
** resource server - север для выделяющий ресурсы
** node server - сервер для запуска задач и отслеживания за ними (на каждом сервере)
Подписаться на:
Сообщения (Atom)