UDF

Introduction

There are four ways to define a UDF in Zeppelin:

  • Write a Scala UDF

  • Write a PyFlink UDF

  • Create a UDF via SQL

  • Configure UDF jars via flink.udf.jars

Scala UDF

%flink

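import org.apache.flink.table.functions.ScalarFunction

// A scalar UDF that upper-cases its input string
class ScalaUpper extends ScalarFunction {
  def eval(str: String): String = str.toUpperCase
}

// Register it under the name scala_upper so that SQL can call it
btenv.registerFunction("scala_upper", new ScalaUpper())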

Defining a Scala UDF is straightforward and almost the same as writing one in an IDE. After creating the UDF class, register it via btenv. You can also register it via stenv, which shares the same Catalog with btenv.

Python UDF

%flink.pyflink

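from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

# A scalar UDF that upper-cases its input string
class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

# Wrap it with udf() to declare input/result types, then register it as python_upper
bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))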

Defining a Python UDF is just as straightforward and almost the same as writing one in an IDE. After creating the UDF class, register it via bt_env. You can also register it via st_env, which shares the same Catalog with bt_env.

UDF via SQL

Simple UDFs can be written directly in Zeppelin. If the UDF logic is complicated, however, it is better to write it in an IDE and then register it in Zeppelin as follows:

%flink.ssql

CREATE FUNCTION myupper AS 'org.apache.zeppelin.flink.udf.JavaUpper';

However, this approach requires the UDF jar to be on the CLASSPATH, so you need to configure flink.execution.jars to include the UDF jar, for example:

flink.execution.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar

flink.udf.jars

The three approaches above all have some limitations:

  • Zeppelin is suitable for writing simple Scala or Python UDFs, but not very complicated ones, because a notebook lacks advanced IDE features such as package management and code navigation.

  • You have to run the paragraph that defines the UDFs each time you want to use them, which makes it hard to share UDFs between notes or users.

So when you have many UDFs or complicated UDF logic, and you don't want to register them yourself every time, you can use flink.udf.jars:

  • Step 1. Create a UDF project in your IDE and write your UDFs there. Here is a sample UDF project.

  • Step 2. Set flink.udf.jars to point to the UDF jar built from your UDF project.

%flink.conf

flink.udf.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar

Zeppelin scans the whole jar, finds all the UDF classes and registers them for you automatically. The UDF name is the class name. For example, after the above jar is specified in flink.udf.jars, the output of show functions includes the UDFs defined in it.
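Since a jar is a zip archive, the naming rule ("the UDF name is the class name") can be illustrated by reading the jar's entry list. The Python sketch below is only an illustration, not Zeppelin's implementation: Zeppelin runs on the JVM and also checks whether each class really is a UDF, which this sketch does not do. The helper name `candidate_udf_names` is hypothetical, and skipping nested classes (names containing `$`) is an assumption made to keep the example simple.

```python
import zipfile


def candidate_udf_names(jar_path):
    """List simple class names of the top-level classes in a jar.

    Hypothetical helper for illustration only. It reads the jar's zip index
    and does NOT check whether a class really is a Flink UDF (for example a
    ScalarFunction subclass); that check needs a JVM.
    """
    names = []
    with zipfile.ZipFile(jar_path) as jar:
        for entry in jar.namelist():
            # Keep compiled classes only; skip nested/anonymous classes such as Outer$1.class
            if not entry.endswith(".class") or "$" in entry:
                continue
            # "org/apache/zeppelin/flink/udf/JavaUpper.class" -> "JavaUpper"
            simple_name = entry[: -len(".class")].rsplit("/", 1)[-1]
            names.append(simple_name)
    return sorted(names)


if __name__ == "__main__":
    import os
    import tempfile

    # Build a tiny fake jar so the example runs without a real UDF project
    with tempfile.TemporaryDirectory() as tmp:
        jar_path = os.path.join(tmp, "flink-udf-demo.jar")
        with zipfile.ZipFile(jar_path, "w") as jar:
            jar.writestr("META-INF/MANIFEST.MF", "Manifest-Version: 1.0\n")
            jar.writestr("org/apache/zeppelin/flink/udf/JavaUpper.class", b"")
            jar.writestr("org/apache/zeppelin/flink/udf/JavaUpper$1.class", b"")
        print(candidate_udf_names(jar_path))  # ['JavaUpper']
```

Every class in the jar is a candidate here, which is why a real scanner must inspect each one; that cost is what the next paragraph is about.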

By default, Zeppelin scans all the classes in the jar, so detection can be slow if the jar is large, especially when your UDFs have other dependencies packaged into it. In that case, use flink.udf.jars.packages to specify the packages to scan; this makes UDF detection much faster.
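To show why restricting the packages helps, here is a rough Python sketch of a package filter over jar entry names. It is an illustration under assumptions, not Zeppelin's code: the function `classes_to_scan` is hypothetical, the comma-separated format of the `packages` value is assumed, and the sample entry list is made up to mimic a jar that bundles dependency classes.

```python
def classes_to_scan(entries, packages=""):
    """Return the .class entries a UDF scanner would need to inspect.

    Hypothetical sketch: `entries` are jar entry names and `packages` mimics a
    flink.udf.jars.packages value, assumed here to be comma-separated package
    names such as "com.example.udf". An empty value means "scan everything".
    """
    classes = [e for e in entries if e.endswith(".class")]
    # "com.example.udf" -> "com/example/udf/" (trailing slash avoids matching "com/example/udfx/")
    prefixes = [p.strip().replace(".", "/") + "/" for p in packages.split(",") if p.strip()]
    if not prefixes:
        return classes
    # A prefix match also keeps classes in subpackages of the listed packages
    return [c for c in classes if any(c.startswith(p) for p in prefixes)]


entries = [
    "META-INF/MANIFEST.MF",
    "com/example/udf/MyUpper.class",
    "com/example/udf/MyLower.class",
    # Bundled dependency classes, which make an unrestricted scan slower
    "com/google/common/base/Strings.class",
    "org/apache/commons/lang3/StringUtils.class",
]

print(len(classes_to_scan(entries)))                     # 4
print(len(classes_to_scan(entries, "com.example.udf")))  # 2
```

In a real fat jar the dependency classes usually far outnumber the UDF classes, so limiting the scan to the UDF package skips most of the work.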


