What header file does the RDD type use in Scala?
1. Introduction: RDD

An RDD (Resilient Distributed Dataset) is a distributed collection of elements. In Spark, all data work amounts to creating RDDs, transforming existing RDDs, and calling actions on RDDs to compute results. Behind all this, Spark automatically distributes the data in an RDD across the cluster and parallelizes the operations performed on it.

An RDD in Spark is an immutable distributed collection of objects. Each RDD is split into multiple partitions, which run on different nodes in the cluster. An RDD can contain objects of any type in Python, Java, or Scala, including user-defined objects.

Users can create an RDD in two ways: by reading an external data set, or by distributing a collection of objects (such as a list or set) from the driver program.

RDD transformations are lazy: when we call a transformation on an RDD, the operation is not executed immediately. Instead, Spark internally records that the operation has been requested. Rather than thinking of an RDD as a container holding specific data, it is better to think of each RDD as a list of instructions, built up through transformations, that records how to compute the data. Reading data into an RDD is lazy as well: the data is read only when necessary. Both transformations and reads can be performed multiple times.
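To make the laziness concrete, here is a minimal sketch (assuming an existing SparkContext named sc and a hypothetical file path): the transformations below return immediately, and nothing is read or computed until the action at the end.

// Nothing is read from disk here; Spark only records the lineage.
val lines = sc.textFile("path/to/log.txt")
val errors = lines.filter(line => line.contains("error"))
// Only this action triggers reading the file and running the filter.
println(errors.count())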

2. Creating an RDD

(1) Reading an external data set:

val input = sc.textFile(inputFileDir)

(2) Parallelizing a collection of objects in the driver, taking a List as an example:

val lines = sc.parallelize(List("Hello World", "This is a test"))

3. RDD operations

(1) Transformation operations

To implement the filter transformation:

val lines = sc.parallelize(List("error:a", "error:b", "error:c", "test"))
val errors = lines.filter(line => line.contains("error"))
errors.collect().foreach(println)

Output:

error:a
error:b
error:c

As you can see, the items in the list that contain the word "error" have been correctly selected.

(2) Union operation

union() merges two RDDs into a single RDD.

Following on from the program example above:

val lines = sc.parallelize(List("error:a", "error:b", "error:c", "test", "warnings:a"))
val errors = lines.filter(line => line.contains("error"))
val warnings = lines.filter(line => line.contains("warnings"))
val unionLines = errors.union(warnings)
unionLines.collect().foreach(println)

Output:

error:a
error:b
error:c
warnings:a

As you can see, the error items and the warnings item from the original list have been selected.

(3) Retrieving some or all elements of an RDD

① Getting some elements of the RDD: take(num), which returns a list.

It returns the first num items of the RDD.

/**
 * Take the first num elements of the RDD. This scans the partitions one by one,
 * so it will be slow if many partitions are required. In that case, use
 * collect() to get the whole RDD instead.
 */
def take(num: Int): JList[T]

Program example (continuing from the union example above):

unionLines.take(2).foreach(println)

Output:

error:a
error:b

As you can see, the first two items of the RDD unionLines are printed.

② Getting all elements of the RDD: collect(), which returns a list.

Program example:

val all = unionLines.collect()
all.foreach(println)

This traverses and prints each item of the RDD unionLines.

4. Passing functions to Spark

In Scala, we can pass inline functions, references to methods, or static methods to Spark, just as with Scala's other functional APIs. There are some details to consider: the function we pass, and the data it references, must be serializable (implementing Java's Serializable interface). In addition, as in Python, passing a method or field of an object includes a reference to the whole object. Instead, we can copy the required field into a local variable and avoid pulling in the whole object that contains it.

class SearchFunctions(val query: String) {

  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
    // Problem: isMatch means this.isMatch, so the whole `this` gets passed along.
    rdd.filter(isMatch)
  }

  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
    // Problem: query means this.query, so the whole `this` gets passed along.
    rdd.flatMap(line => line.split(query))
  }

  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    // Safe: copy just the field we need into a local variable.
    val query1 = this.query
    rdd.flatMap(x => x.split(query1))
  }
}
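As a hedged usage sketch of the class above (the sample data and variable names are made up; an existing SparkContext sc is assumed), the safe variant can be called like this:

val sf = new SearchFunctions("error")
val lines = sc.parallelize(List("error:a", "error:b", "test"))
// Only the local copy of query is shipped to the executors, not `this`.
sf.getMatchesNoReference(lines).collect().foreach(println)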

5. Element-wise transformation operations

The transformation map() takes a function, applies that function to each element in the RDD, and uses the function's return value as the corresponding element of the result RDD. Keyword: mapping.

The transformation filter() takes a function and returns a new RDD containing the elements of the original RDD that satisfy the function. Keyword: filtering.


① map()

Compute the square of each value in the RDD:

val rdd = sc.parallelize(List(1, 2, 3, 4))
val result = rdd.map(value => value * value)
println(result.collect().mkString(","))

Output:

1,4,9,16

② filter()

Remove the elements whose value is 1 from the RDD:

val rdd = sc.parallelize(List(1, 2, 3, 4))
val result = rdd.filter(value => value != 1)
println(result.collect().mkString(","))

Results:

2,3,4

We can also pass a named function, like this:

Function:

def filterFunction(value: Int): Boolean = {
  value != 1
}

Use:

val rdd = sc.parallelize(List(1, 2, 3, 4))
val result = rdd.filter(filterFunction)
println(result.collect().mkString(","))

③ Sometimes we want to produce multiple output elements for each input element. The operation that does this is called flatMap(). As with map(), the function we provide to flatMap() is applied to every element of the input RDD, but instead of returning a single element it returns an iterator over a sequence of values. The output RDD is not made up of iterators; it is an RDD containing all the elements that each of those iterators yields. A simple use of flatMap() is splitting an input string into words, as shown below:

val rdd = sc.parallelize(List("hello world", "hello you", "world I love you"))
val result = rdd.flatMap(line => line.split(" "))
println(result.collect().mkString("\n"))

Output:

hello

world

hello

you

world

I

love

you

6. Set operations

Set operations on RDDs:

rdd1.distinct(): generates a new RDD containing only the distinct elements. Requires a data shuffle.

rdd1.union(rdd2): returns an RDD containing all the elements of both RDDs.

rdd1.intersection(rdd2): returns only the elements that exist in both RDDs.

rdd1.subtract(rdd2): returns an RDD of the elements that exist in the first RDD but not in the second. Requires a data shuffle.
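A minimal sketch of these four operations on made-up data (assuming an existing SparkContext sc; element order in the results may vary):

val rdd1 = sc.parallelize(List(1, 2, 3, 3))
val rdd2 = sc.parallelize(List(3, 4, 5))
println(rdd1.distinct().collect().mkString(","))         // 1,2,3
println(rdd1.union(rdd2).collect().mkString(","))        // 1,2,3,3,3,4,5
println(rdd1.intersection(rdd2).collect().mkString(",")) // 3
println(rdd1.subtract(rdd2).collect().mkString(","))     // 1,2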

Set operations also cover the Cartesian product:

rdd1.cartesian(rdd2): returns the Cartesian product of the two RDDs.

Program example: generate the Cartesian product of the RDDs {1, 2} and {1, 2}.

val rdd1 = sc.parallelize(List(1, 2))
val rdd2 = sc.parallelize(List(1, 2))
val rdd = rdd1.cartesian(rdd2)
println(rdd.collect().mkString("\n"))

Output:

(1,1)
(1,2)
(2,1)
(2,2)

7. Action operations

(1) reduce() operation

reduce() takes as its argument a function that operates on two elements of the RDD's element type and returns a new element of the same type. A simple example is the + function, which can be used to sum our RDD. With reduce(), we can easily compute the sum of all the elements in an RDD, the number of elements, and other kinds of aggregations.

The following program sums all the elements of an RDD:

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val result = rdd.reduce((x, y) => x + y)
println(result)

Output: 55

(2) fold() operation

fold() receives a function with the same signature as the one received by reduce(), plus an initial value that is used as the starting result of the first call on each partition. The initial value you provide should be the identity element of your operation; that is, applying your function to it repeatedly must not change the result (for example, 0 for +, 1 for *, or an empty list for concatenation).

Program example:

① Compute the sum of all elements in the RDD:

// When summing, the initial value is 0.
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val result = rdd.fold(0)((x, y) => x + y)
println(result)

② Compute the product of all elements in the RDD:

// When multiplying, the initial value is 1.
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val result = rdd.fold(1)((x, y) => x * y)
println(result)

(3) aggregate() operation

The return type of aggregate() does not have to be the same as the element type of the RDD being operated on.

Like fold(), aggregate() requires an initial value of the type we expect to return. A first function combines the elements of the RDD with the accumulator. Since each node accumulates locally, a second function must also be provided to merge the accumulators pairwise.

The following program computes the average of the elements:

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val result = rdd.aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val average = result._1 / result._2
println(average)

Output: 5

What aggregate() returns here is a Tuple2.

Table: basic actions on an RDD containing the data {1, 2, 3, 3}.

collect(): returns all elements of the RDD. Example: rdd.collect(). Result: {1, 2, 3, 3}

count(): returns the number of elements in the RDD. Example: rdd.count(). Result: 4

countByValue(): returns the number of times each element occurs in the RDD. Example: rdd.countByValue(). Result: {(1, 1), (2, 1), (3, 2)}

take(num): returns num elements from the RDD. Example: rdd.take(2). Result: {1, 2}

top(num): returns the top num elements of the RDD. Example: rdd.top(2). Result: {3, 3}

takeOrdered(num)(ordering): returns num elements from the RDD, based on the provided ordering. Example: rdd.takeOrdered(2)(myOrdering). Result: {3, 3}

takeSample(withReplacement, num, [seed]): returns num random elements from the RDD. Example: rdd.takeSample(false, 1). Result: nondeterministic

reduce(func): combines all the elements of the RDD in parallel. Example: rdd.reduce((x, y) => x + y). Result: 9

fold(zero)(func): same as reduce(), but with the given initial value. Example: rdd.fold(0)((x, y) => x + y). Result: 9

aggregate(zeroValue)(seqOp, combOp): similar to reduce(), but usually returns a value of a different type. Example: rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)). Result: (9, 4)

foreach(func): applies the given function to each element of the RDD. Example: rdd.foreach(func). Result: none
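As a hedged sketch, the following exercises a few of these actions on the same {1, 2, 3, 3} data (assuming an existing SparkContext sc):

val rdd = sc.parallelize(List(1, 2, 3, 3))
println(rdd.count())                 // 4
println(rdd.countByValue())          // Map(1 -> 1, 2 -> 1, 3 -> 2), order may vary
println(rdd.top(2).mkString(","))    // 3,3
println(rdd.reduce((x, y) => x + y)) // 9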

8. Persistence (caching)

Because Spark RDDs are evaluated lazily, we sometimes want to use the same RDD multiple times. If we simply call actions on an RDD, Spark recomputes the RDD and all of its dependencies each time. This is especially expensive in iterative algorithms, which often use the same set of data many times.

To avoid computing the same RDD multiple times, we can have Spark persist the data. When we ask Spark to persist an RDD, the nodes that compute it keep the partitions they have calculated.

We can choose among different persistence levels for an RDD according to our purpose. By default, persist() caches the data in the JVM heap space.

Storage levels corresponding to the different keywords:

MEMORY_ONLY: space used high, CPU time low, in memory yes, on disk no. Stored directly in memory.

MEMORY_ONLY_SER: space used low, CPU time high, in memory yes, on disk no. Stored in memory in serialized form.

MEMORY_AND_DISK: space used high, CPU time medium, in memory some, on disk some. Spills to disk if the data does not fit in memory.

MEMORY_AND_DISK_SER: space used low, CPU time high, in memory some, on disk some. Spills to disk if the data does not fit in memory; the data kept in memory is serialized.

DISK_ONLY: space used low, CPU time high, in memory no, on disk yes. Stored directly on disk.

Program example: persist an RDD in memory.

import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).persist(StorageLevel.MEMORY_ONLY)
println(rdd.count())
println(rdd.collect().mkString(","))

RDDs also have an unpersist() method, which can be called to manually remove a persisted RDD from the cache.
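A brief sketch of the full persist/unpersist cycle (assuming an existing SparkContext sc):

import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(List(1, 2, 3)).persist(StorageLevel.MEMORY_ONLY)
println(rdd.count()) // first action computes the RDD and caches its partitions
println(rdd.count()) // answered from the cache, no recomputation
rdd.unpersist()      // manually remove the cached partitions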

9. Different RDD types

In Scala, dispatching an RDD to type-specific functions (such as the numeric operations on RDD[Double]) is handled automatically by implicit conversions. These implicit conversions wrap an RDD in one of several wrapper classes, such as DoubleRDDFunctions (for RDDs of numeric data) and PairRDDFunctions (for RDDs of key-value pairs), which is how extra functions such as mean() and variance() become available.

Sample program:

val rdd = sc.parallelize(List(1.0, 2.0, 3.0, 4.0, 5.0))
println(rdd.mean())

In fact, RDD[T] has no mean() function; the implicit conversion automatically converts the RDD into DoubleRDDFunctions, which provides it.
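Other numeric helpers from DoubleRDDFunctions become available through the same implicit conversion; a small sketch using the rdd defined above:

println(rdd.sum())      // sum of the elements
println(rdd.variance()) // variance of the elements
println(rdd.stats())    // count, mean, stdev, max, min in a single pass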