How to make Spark versions compatible
In Spark 1.6, most of the classes related to machine learning still use this Vector:

org.apache.spark.mllib.linalg.Vector

From 2.0 onwards, this has basically been changed to

org.apache.spark.ml.linalg.Vector

The corresponding Vectors object moved in the same way. This causes real difficulty: code like the following cannot be written once and compiled against both Spark versions.
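A minimal sketch of the kind of code that breaks (the column logic is illustrative, not taken from StreamingPro):

// Compiles against Spark 2.0 only: the package org.apache.spark.ml.linalg
// does not exist in 1.6, where the import would have to be
// org.apache.spark.mllib.linalg.{Vector, Vectors} instead.
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// Build a dense vector from an array column.
val toVector = udf((features: Seq[Double]) => Vectors.dense(features.toArray))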

No matter how you write it, it cannot run on both Spark 1.6 and Spark 2.0; one of the two will always fail, because the package names of classes such as Vector and Vectors have changed.

You can get the running Spark version through org.apache.spark.SPARK_VERSION.

We define a class:
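A minimal sketch of such a class, assuming it only needs to parse SPARK_VERSION (the name SparkCompatibility is illustrative, not necessarily the one used in StreamingPro):

object SparkCompatibility {
  // e.g. "1.6.3" or "2.0.2"
  val version: String = org.apache.spark.SPARK_VERSION

  // true when running on Spark 2.x
  val isSpark2: Boolean = version.startsWith("2")
}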

Dynamic compilation

Scala makes it easy to compile source code dynamically, and the code is very simple, roughly as follows:
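A minimal sketch using Scala's ToolBox, the standard way to compile source at runtime (the function name compileCode matches the text; the sample source string is illustrative):

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

// Compile a piece of Scala source at runtime and return the value of its
// last expression, much like the Spark Shell does internally.
def compileCode(src: String): Any = {
  val toolBox = currentMirror.mkToolBox()
  toolBox.eval(toolBox.parse(src))
}

val ref = compileCode("Seq(1, 2, 3).map(_ * 2)")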

The effect is similar to what happens in the Spark Shell, where ref is the value of the compiled code. Normally you could write the code twice, once for Spark 1.6 and once for Spark 2.0, and decide at runtime which version to compile. This approach has one drawback, though, and it matters especially in Spark: if the value ref returned by compileCode needs to be serialized to the executors, deserialization will fail, because some of the anonymous classes generated during compilation do not exist on the executors. Apart from that drawback, this would be the most effective way to achieve compatibility.

At first I considered using Spark's internal CodeGen machinery, roughly as follows:
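A rough sketch of that direction, written against Spark 2.0's internal codegen API; since this is an internal API, treat the class and method shapes here as assumptions rather than a working recipe:

// Internal API: org.apache.spark.sql.catalyst.expressions.codegen
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}

// The body is Java source that gets compiled into a class whose
// generate(Object[]) method hands back a version-specific helper.
val codeBody =
  """
    |public java.lang.Object generate(Object[] references) {
    |  // build and return the version-specific object here
    |  return null;
    |}
  """.stripMargin

val generatedClass = CodeGenerator.compile(new CodeAndComment(codeBody, Map.empty))
val instance = generatedClass.generate(Array.empty[Any])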

Unfortunately, that API itself changes between versions; CodeAndComment, for example, only exists in 2.0.

Separate adapter projects

The idea is to split out the parts of the API that changed. For the Vector-related API mentioned above, 1.6 and 2.0 differ, so we could extract two small projects, each built against the corresponding Spark version, publish them as jars, and pull in the right adapter package via the Maven Profile mechanism depending on the Spark version. This works, but it is cumbersome.

Reflection (the approach StreamingPro uses)

The following is the kind of code I used to deal with the Vector package rename:
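A hedged sketch of the idea (the object and method names here are illustrative, not the actual StreamingPro code):

object VectorCompatibility {

  // Pick the package that exists on the running Spark version.
  val vectorsClassName: String =
    if (org.apache.spark.SPARK_VERSION.startsWith("2")) "org.apache.spark.ml.linalg.Vectors"
    else "org.apache.spark.mllib.linalg.Vectors"

  // Call Vectors.dense reflectively; the compiler only ever sees Any,
  // so nothing here depends on either package at compile time.
  def dense(values: Array[Double]): Any = {
    val clazz = Class.forName(vectorsClassName + "$")   // class of the Scala object
    val module = clazz.getField("MODULE$").get(null)    // the Vectors singleton
    clazz.getMethod("dense", classOf[Array[Double]]).invoke(module, values)
  }
}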

We dynamically load the class that matches the running version and invoke its methods through reflection, which avoids errors at compile time. With reflection, however, you can no longer write code like this:
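For example (again illustrative, reusing the hypothetical VectorCompatibility helper above):

import org.apache.spark.sql.functions.udf

// Does not work: the function's result type is Any, and Spark cannot map
// Any to a SQL data type, so the udf's result schema cannot be determined.
val toVector = udf((features: Seq[Double]) => VectorCompatibility.dense(features.toArray))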

The udf function needs to be able to deduce the input and return types. With reflection, the return type (possibly org.apache.spark.ml.linalg.Vector or org.apache.spark.mllib.linalg.Vector) cannot be determined, so that style of code no longer works. We therefore rewrote the udf implementation ourselves, but that ran into its own setback: the UserDefinedFunction class it relies on also lives in different packages across versions, and again we solved it with reflection:
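A hedged sketch of the approach, with the constructor handling simplified; the exact parameter list of UserDefinedFunction differs slightly between 1.6 and 2.0, so treat the last argument here as an assumption:

import org.apache.spark.sql.types.DataType

def createUserDefinedFunction(f: AnyRef, dataType: DataType): Any = {
  val isSpark2 = org.apache.spark.SPARK_VERSION.startsWith("2")
  // 1.6: org.apache.spark.sql.UserDefinedFunction
  // 2.0: org.apache.spark.sql.expressions.UserDefinedFunction
  val className =
    if (isSpark2) "org.apache.spark.sql.expressions.UserDefinedFunction"
    else "org.apache.spark.sql.UserDefinedFunction"
  val constructor = Class.forName(className).getDeclaredConstructors.head
  constructor.setAccessible(true)
  // "No declared input types" is expressed differently in the two versions.
  val noInputTypes: AnyRef = if (isSpark2) None else Nil
  constructor.newInstance(f, dataType, noInputTypes)
}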

Ugly, isn't it? And there is one more problem. Although our udf always returns a UserDefinedFunction object, the two versions' classes are incompatible, which means we cannot let the compiler pin down the return type. We used another Scala language feature, a structural type, as follows:
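A sketch of that wrapper, building on the createUserDefinedFunction sketch above (the name compatUdf is illustrative):

import scala.language.reflectiveCalls
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.DataType

// The structural type is the whole trick: the compiler only needs to know
// that the result has an apply(Column*): Column method, which both versions'
// UserDefinedFunction classes provide.
def compatUdf(f: AnyRef, dataType: DataType): { def apply(exprs: Column*): Column } = {
  createUserDefinedFunction(f, dataType)
    .asInstanceOf[{ def apply(exprs: Column*): Column }]
}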

The core is the last line: we declare that the returned object conforms to this signature:

{def apply(exprs: Column*): Column}

At this point, you can use it directly:
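A usage sketch of the compatUdf helper above; the function and column name are deliberately simple, just to show the call site:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// compatUdf returns the structural type, so it can be applied to columns
// like any ordinary udf, on either Spark version.
val doubled = compatUdf((x: Double) => x * 2, DoubleType)
val expr: Column = doubled(col("value"))   // usable in df.select(...)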

Summary

The internal APIs changed a lot between Spark 1.6 and 2.0, while the public API for ordinary users stayed fairly compatible. Even so, maintaining version compatibility is not easy. For the machine learning parts of StreamingPro I therefore only support Spark 1.6 and 2.0 and dropped 1.5, while for ordinary ETL and stream computing all three versions are supported.