UDF
There are 4 ways you can define a UDF in Zeppelin:
- Write a Scala UDF
- Write a PyFlink UDF
- Create a UDF via SQL
- Configure a UDF jar via `flink.udf.jars`
```scala
%flink

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())
```
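Once registered, the UDF can be called from Flink SQL like any built-in function. A minimal sketch, assuming a hypothetical table `people` with a STRING column `name`:

```sql
%flink.bsql

select scala_upper(name) from people
```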
It is very straightforward to define a Scala UDF, almost the same as what you do in an IDE. After creating the UDF class, you need to register it via `btenv`. You can also register it via `stenv`, which shares the same Catalog with `btenv`.

```python
%flink.pyflink

class PythonUpper(ScalarFunction):
  def eval(self, s):
    return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
```
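Because the UDF body is plain Python, you can sanity-check the eval logic outside Flink before registering it. A minimal sketch, with `ScalarFunction` stubbed out locally (in a real note it comes from pyflink):

```python
# Local stub standing in for pyflink's ScalarFunction base class,
# only so the eval() logic can be tested without a Flink cluster.
class ScalarFunction:
    pass

class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

print(PythonUpper().eval("flink"))  # FLINK
```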
It is very straightforward to define a Python UDF too, almost the same as what you do in an IDE. After creating the UDF class, you need to register it via `bt_env`. You can also register it via `st_env`, which shares the same Catalog with `bt_env`.

Simple UDFs can be written directly in Zeppelin. But if the UDF logic is very complicated, it is better to write it in an IDE and then register it in Zeppelin as follows:
```sql
%flink.ssql

CREATE FUNCTION myupper AS 'org.apache.zeppelin.flink.udf.JavaUpper';
```

But this approach requires the UDF jar to be on the `CLASSPATH`, so you need to configure `flink.execution.jars` to include the UDF jar on the `CLASSPATH`, such as the following:

```text
flink.execution.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar
```
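To verify that the function was registered in the catalog, you can list the available functions; `myupper` should appear in the output:

```sql
%flink.ssql

show functions
```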
The above 3 approaches all have some limitations:
- Writing simple Scala or Python UDFs in Zeppelin is fine, but writing very complicated UDFs in a notebook is not, because a notebook doesn't provide the more advanced features of an IDE, such as package management, code navigation, etc.
- You have to run the paragraph that defines the UDFs each time you want to use them, and it is not easy to share the UDFs between notes or users.
So when you have many UDFs, or the UDF logic is complicated, and you don't want to register them yourself every time, you can use `flink.udf.jars`:
- Step 1. Create a UDF project in your IDE and write your UDFs there. Here I have a sample UDF project.
- Step 2. Set `flink.udf.jars` to point to the UDF jar built from your UDF project:

```text
%flink.conf

flink.udf.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar
```
Zeppelin scans this whole jar, finds all the UDF classes, and registers them automatically for you. The UDF name is the class name. For example, here's the output of `show functions` after specifying the above UDF jar in `flink.udf.jars`:

By default, Zeppelin scans all the classes in this jar, so it can be pretty slow if your jar is very big, especially when your UDF has other dependencies. In that case it is recommended to set `flink.udf.jars.packages` to specify the packages to scan, which makes UDF detection much faster.
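A sketch of how the two settings could be combined, assuming (hypothetically) that your UDF classes live under the package `org.apache.zeppelin.flink.udf`:

```text
%flink.conf

flink.udf.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar
flink.udf.jars.packages org.apache.zeppelin.flink.udf
```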