Posted in Khác

SPARK

1.Khởi tạo Spark trong Java

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

 

SparkConf conf = new SparkConf().setMaster(“local”).setAppName(“Filtering”);

JavaSparkContext sc = new JavaSparkContext(conf);

Hai tham số của SparkConf đó là :

  • Cluster URL , local có nghĩa là Spark sẽ chạy 1 thread trong máy local , không kết nối với các cluster khác
  • Tên của application , Nó sẽ xác định tên của applicating của bạn trong cluster manager’s UI nếu bạn kết nối tới một clusster
  • Và một số các tham số khác để configuring ứng dụng của bạn sẽ hoạt động ntn ….

Để shut down Spark , có thể gọi stop() method , hoặc thoát application như : System.exit(0) hoặc sys.exit()

2.Programming with RDDs

Spark’s core abstraction for working with data , the resilient distributed dataset(RDD). RDD là tập hợp rời rạc của các elements . Spark tự động phân tán dữ liệu trong RDDs trong các cluster và song song thực hiện chúng

Mỗi RDD trong Spark đơn giản là một tập phân phối bất biến của các objects . Mỗi RDD được chia ra thành các partitions , có thể được tính toán ở các nodes khác nhau của các cluster . RDD có thể chưas tất các các type Python , Java , Scala object , kể cả user-defined classes .

Users create RDDs in two ways : by loading an external dataset , or distributing a collection of objects ( list or set ) in their driver program .Sau khi khởi tạo , RDDs cung cấp 2 dạng operations : transformations actions . Transformations xây dựng RDD mới từ precious one . Ví dụ , tạo RDD mới mà chỉ bao gồm các string mà bao gôm từ Python . Actions thì tính toán kết quả dựa vào một RDD , và trả về kết quả cho driver program hoặc lưu nó vào external storage system ( như HDFS) . Ví dụ first() sẽ trả về elements đầu tiên của một RDD.

Transformations và actions khác nhau theo cách mà Spark xử lý RDDs đó . Chúng ta có thể định nghĩa RDDs mới bất kì lúc nào , nhưng Spark chỉ tính toán nó theo a lazy fashion , đó là thực hiện khi RDDs này trong action đầu tiên . Điều này có thể là khó hiểu , nhưng lại hợp lý khi bạn làm việc với Big Data . Ví dụ , trong phương thức first() , Spark sẽ scan file để tìm ra first matching line ; nó sẽ không đọc hết cả file .

Sparks’s RDDs mặc định tính toán lại mỗi lần bạn chạy một action trong nó . Nếu bạn muốn dùng một RDD trong nhiều actions , bạn có thể yêu cầu Spark persit , sử dụng RDD.persit(). Điều này là hợp lý bởi vì , nếu bạn không sử dụng lại RDD , chả có lý do gì để tốn bộ nhớ để lưu.

CREATING RDDs

Users create RDDs in two ways : by loading an external dataset , or distributing a collection of objects ( list or set ) in their driver program .Cách đơn giản nhất để tạo ra RDDs là sử dụng một collection có sẵn và pass it to SparkContext’s parallelize() method (đòi hỏi bạn phải có dataset trong memory ở một machine )

JavaRDD<String> lines = sc.parallelize(Arrays.asList(“pandas”,”i like pandas”));

Một cách phổ biến khác để tạo ra RDDs đó là load data from external storage . Chúng ta có thể load rất nhiều các định dạng file khác nhau như text,json , csv,…Với file text :

JavaRDD<String> lines = sc.textFile(“README.txt”);

RDD Operations

RDDs hỗ trợ 2 kiểu operations : transformations actions . Transformations là một operations trong RDDs mà sẽ trả lại một RDD mới , như map() hay filter() . Actions là một operations mà sẽ trả về kết quả cho driver program hoặc write it to storage và tính toán như count() first()

Transformations

Giả dụng chúng ta có một logfile , log.txt , có một số các messages , chúng ta muốn chọn ra các error message . Chúng ta có thể sử dụng filter() transformation :

JavaRDD<String> inputRDD = sc.textFile(“README.txt”);

JavaRDD<String> errorsRDD = inputRDD.filter(

new Function<String,Boolean>(){

public Boolean call(String x){return x.contains(“error”);}

}

);

filter() sẽ không thay đổi inputRDD mà sẽ trỏ đến một RDD hoàn toàn mới . Chúng ta thu được RDDs mới từ các RDD sử dụng transformations . Spark sẽ lưu trữ các phụ thuộc giữa các RDDs khác nhau , được gọi là lineage graph . Nó sẽ được sử dụng để tính toán RDD và recover lost data nếu một phần của persistent RDD bị mất mát :

ACTIONS

Ở ví dụ trước , chúng ta có thể muốn in ra thông tin về errorsRDD . Chúng ta có thể sử dụng các actions như , count() sẽ trả về số lượng , và take() sẽ thu thập một số lượng elements từ RDD

System.out.println(“Input had ” + badLinesRDD.count() + ” concerning lines”)

System.out.println(“Here are 10 examples:”)

for (String line: badLinesRDD.take(10)) {

System.out.println(line);

}

RDD cũng có collect() function trả về toàn bộ RDD (Phải đảm bảo là toàn bộ dataset phải fit in memory on a single machine , không nên sử dụng với datasets quá lớn ). Trong phần lớn RDDs không thể sử dụng collect bởi vì nó quá lớn . Chúng ta sẽ viết data ra các hệ lưu trữ dữ liệu phân tán như là HDFS hoặc AMAZON S3 . Chúng ta có thể lưu contents của một RDD sử dụng saveAsTextFile() action , saveAsSequenceFile() ,

LAZY EVALUATION

Spark sẽ không thực thi cho đến khi nó nhìn thấy một action . Ví dụ nếu ta gọi một transformation ở một RDD ( ví dụ map() ) , operation sẽ không thực thi ngay tức thì . Thay vào đó , Spark internally records metadata để chỉ ra rằng operation này đã được yêu cầu . Thay vì nghĩ RDD chứa dữ liệu cụ thể , chúng ta có thể nghĩ rằng mỗi RDD sẽ bao chứa các chỉ dẫn rằng các data sẽ được xử lý như thế nào . Trong một hệ thống như Hadoop MapReduce , các lập trình viên thường xuyên dành nhiều thời gian để nghĩ ra cách gộp các operations lại với nhau để minimize the number of MapReduce passes . Trong Spark , không có ích lợi gì nếu việt một single complex map thay vì dàn trải thành nhiều simple operations

Passing Functions to Spark

Trong Java,các functions được quy định như các objects mà implement một trong những Spark’s function interfacces từ org.apache.spark.api.java.function package. Có rất nhiều các interfaces dựa vào kiểu trả về của function .

Function name Method to implement Usage
Function<T,R> R call(T) 1 đàu vào , 1 đầu ra , sử dụng với operations như map() filter()
Function2<T1,T2,R> R call(T1,T2) 2 đầu vào , 1 đầu ra , sử dụng với operations như aggregate() fold()
FlatMapFunction<T,R> Iterable<R> call(T) 1 đầu vào , không hoặc nhiều outputs , sử dụng với operations như flatMap()

 

Java function passing with anonymous inner class

 

RDD<String> errors = lines.filter(new Function<String, Boolean>() {

public Boolean call(String x) { return x.contains(“error”); }

});

Java function passing with named class

class ContainsError implements Function() {

public Boolean call(String x) {

return x.contains(“error”); } }

RDD errors = lines.filter(new ContainsError());

Java function class with parameters

class Contains implements Function() {

private String query;

public Contains(String query) { this.query = query; }

public Boolean call(String x) { return x.contains(query); } }

RDD errors = lines.filter(new Contains(“error”));

Trong Java 8 , chúng ta có thể sử dụng lambda expressions để implement ngắn gọn function interfaces :

Java function passing with lambda expression in Java 8

RDD errors = lines.filter(s -> s.contains(“error”));

Common Transformations and Actions

Element-wise transformations

Hai transformations phổ biến nhất mà thường xuyên sử dụng đó là map() filter() . The map() transformations sẽ cần một function và áp dụng nó vào từng element trong RDD với kết quả của function sẽ trở thành giá trị mới của từng element trong RDD kết quả . The filter() transformation sẽ cần một function và trả về một RDD mà chỉ chứa các phần từ mà vượt qua được filter() function.

Nên nhớ rằng , map()’s sẽ trả về kiểu có thể không giống như input type . Ví dụ về một map() mà bình phương các số trong một RDD

Java squaring the values in an RDD

JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

JavaRDD result = rdd.map(new Function() {

public Integer call(Integer x) { return x*x; }

});

System.out.println(StringUtils.join(result.collect(), “,”));

Đôi khi chúng ta muốn sinh ra multiple output elements từ mỗi input element . Operation này tên là flatMap() . Như với map() , function flatMap() sẽ được gọi riêng biệt cho từng element của input RDD . Nhưng thay vì chỉ trả về một element , chúng ta trả về một iterator với giá trị trả về . Một flatMap() đơn giản để tách các string thành các word :

flatMap() in Java, splitting lines into multiple words

JavaRDD lines = sc.parallelize(Arrays.asList(“hello world”, “hi”)); JavaRDD words = lines.flatMap(new FlatMapFunction() {

public Iterable call(String line) {

return Arrays.asList(line.split(” “));

}

});

words.first(); // returns “hello”

Pseudo set operations

RDDs hỗ trợ rất nhiều các operations toán , như union intersection . Các operations này đòi hỏi các RDD cùng loại

Tập các elements của chúng ta có thể có trùng lặp , nếu muốn lấy các phần tử riêng biệt , chúng ta có thể sử dụng RDD.distinct() transformation . distinct() khá tốn kém , vì nó phải load tất cả các data .

Toán tử đơn giản nhất là union(other), nó sẽ trả về RDD bao hồm data tử cả 2 RDD input . Spark’s union() có thể chứa dữ liệu trùng lặp

Toán tử intersection(other)  trả về phần tử mà nằm ở cả hai RDD , intersection() thì loại bỏ trùng lặp .

Toán tử subtract(other) sẽ trả về RDD mà năm trong RDD đầu tiên và không nằm trong RDD thứ hai

Toán tử cartesian(other) transformation trả về kết quả tất cả các cặp (a,b) với a năm trong source RDD và b trong RDD còn lại .

ACTIONS

Action phổ biến nhất là reduce() , nó sẽ takes a function triển trên 2 element của RDD của bạn và trả về một element mới cùng kiểu .

reduce() in Java

Integer sum = rdd.reduce(new Function2() {

public Integer call(Integer x, Integer y) { return x + y; }

});

fold() aggregate() …………

Operation đơn giản và phổ biến nhất để trả về dữ liệu cho driver program đó chính là collect() . collect() thường được sử dụng trong unit tests khi mà toàn bộ contents của RDD có thể fit in memory .

take(n) trả về n phần tử từ RDD và cố gắng tối thiểu số partitions mà nó truy cập . Nếu mà tồn tại thứ tự trong data của chúng ta , chúng ta có thể lấy về top elements từ một RDD sử dụng top() . top() sẽ sử dụng sắp xếp mặc định trong data của chúng ta , nhưng chúng ta có thể sử dụng hàm so sánh của chúng ta để lấy về top elements .

Đôi khi chúng ta cần lấy mẫu trong dữ liệu chúng ta . The takeSample(withReplecement, num, seed) function cho phép chúng ta lấy mẫu trong dữ liệu của chúng ta with or without replacement

Actions foreach() sẽ cho chúng ta tính toán trên từng element trong RDD

Converting Between RDD Types

Một số functions chỉ thích hợp cho một vài kiểu RDDs , như mean() variance() trên numeric RDDs hoặc join() trên key/value pair RDDs .

Thông thường , có một vài classes đặc biẹt tên là JavaDoubleRDD JavaPairRDD với một vài phương thức thêm vào cho những kiểu dữ liệu data này .

Creating DoubleRDD in Java

JavaDoubleRDD result = rdd.mapToDouble(

new DoubleFunction() {

public double call(Integer x) {

return (double) x * x;

}

});

System.out.println(result.mean());

Persistence (Caching)

Đôi khi chúng ta muốn sử dụng một RDD nhiều lần . Nếu chúng ta ngây thơ , thì Spark sẽ tính toàn lại RDD vào mỗi lần chúng ta gọi một action trên RDD . Để loại trừ tính toán RDD rất nhiều lần , chúng ta có thể yêu cầu Spark persist data . Khi chúng ta yêu cầu Spark persist một RDD , nodes mà tính toán RDD sẽ lưu lại partitions của nó . Nếu một node chứa data persisted mà fails , Spark sẽ tính toán lại lost partitions của data khi cần thiết . Spark có rất nhiều cấp độ persistence để chọn dựa vào mục đích cũng chúng ta , persist() mặc định sẽ lưu trữ dữ liệu trong JVM heap như một unserialized objects . Khi chúng ta viết dữ liệu ra disk hay off-heap storage , dữ liệu luôn luôn được serizlized .

Nếu bạn thử cache quá nhiều dữ liệu fit in memory . Spark sẽ tự động thu hồi old partitions sử dụng Least Recently Used(LRU) cache policy.Với memory-only storage levels , nó sẽ tính toán lại các partitions vào lần kết tiếp access , còn với memory-and-disk , nó sẽ ghi ra disk . Hay có nghĩa là , bạn sẽ không phải quá lo lắng khi yêu cầu Spark to cache quá nhiều data . Nhưng cũng k nên caching unnecessary data dẫn đến phải tính toán lại useful data và thêm thời gian tính toán lại

Cuối cùng , RDDs cung cấp một method tên là unpersist() cho phép remove các RDDs từ cache

3.Working with Key/Value Pairs

Spark cung cấp một operations đặc biệt bao gồm key/value pairs . Những RDDs này được gọi là pair RDDs . Pair RDDs cho phép chúng ta act on each key song song hoặc regroup dữ liệu thông qua mạng . Ví dụ , pair RDDs có một phương thức reduceByKey() có thể tổng hợp data riêng cho từng key , và một phương thức join() có thể gộp 2 RDDs lại với nhau bằng cách nhóm các elements với cùng key .

Creating Pair RDDs

Có một số cách để get pair trong Spark . Rất nhiều cách đọc file mà trả về pair RDDs . Trong trường hợp chúng ta có một RDD thông thường mà chúng ta muốn biến đổi thành pair RDD . Chúng ta có thể chạy map() function .

 

 

 

 

Creating a pair RDD using the first word as the key in Java

PairFunction keyData =

new PairFunction() {

public Tuple2 call(String x) {

return new Tuple2(x.split(” “)[0], x);

}

};

JavaPairRDD pairs = lines.mapToPair(keyData);

Để tạo ra pair RDD từ in-memory collection trong Java , chúng ta sử dụng SparkContext.parallelizePairs()

Transformations on Pair RDDs

Pair RDDs cho phép sử dụng tất các các transformations của một RDDs thông thường . Pair RDDs gồm các tuples , chúng ta cần pass functions mà operate on tuples hơn là các phần tử riêng biệt .

Aggregations

Khi datasets được mô tả theo key/value pairs , là điều bình thường nếu muốn aggregate statistics acroos tất cả các elements với cùng key .

reduceByKey() khá là giống với reduce(); cả hai đều take a function và sử dụng để gộp các values . reduceByKey() chạy song song các reduce operations , mỗi operations cho một key trong dataset

foldByKey()cũng khá là tương tự fold(), both use a zero value ò the same type of the data in our RDD and combination function . As with fold() , the provided zẻo value for foldByKey() should have no impact ưhen added ưith your combination function to another element.

Gọi reduceByKey() foldByKey() sẽ tự động thực thi kết hợp locally trên mỗi máy trước khi tính toán global cho từng key . combineByKey() interface sẽ cho phép bạn customize combining behavior

Word count in Java

JavaRDD input = sc.textFile(“s3://…”)

JavaRDD words = rdd.flatMap(new FlatMapFunction() {

public Iterable call(String x) { return Arrays.asList(x.split(” “)); }

});

JavaPairRDD result = words.mapToPair(

new PairFunction() { public Tuple2 call(String x) { return new Tuple2(x, 1); }

}).reduceByKey(

new Function2() {

public Integer call(Integer a, Integer b) { return a + b; }

});

Chúng ta sẽ thực thi word count nhanh hơn sử dụng countByValue() function ở RDD đầu tiên :

input.flatMap(x => x.split(” “)).countByValue()

combineByKey() is the mose general of the per-key aggregation functions . Các combiners per-key khác thường implemented nó . Như aggregate() , combineByKey() cho phép người dùng trả về giá trị không cùng kiểu với input data .

combineByKey() sẽ đi qua các elements trong các partition , mỗi element sẽ có một key mà lần đầu xuất hiện hoặc

 

Advertisements

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s