Flink on Zeppelin – Batch

本文讲述如何使用Flink SQL + UDF来做Batch ETL和BI数据分析的任务。

Flink Interpreter类型

    首先介绍下Zeppelin中的Flink Interpreter类型。Zeppelin的Flink Interpreter支持Flink的所有API (DataSet, DataStream, Table API )。语言方面支持Scala,Python,SQL。下图是Zeppelin中支持的不同场景下的Flink Interpreter。

Name Class Description
%flink FlinkInterpreter Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment
%flink.pyflink PyFlinkInterpreter Provides a python environment
%flink.ipyflink IPyFlinkInterpreter Provides an ipython environment
%flink.ssql FlinkStreamSqlInterpreter Provides a stream sql environment
%flink.bsql FlinkBatchSqlInterpreter Provides a batch sql environment
配置Flink Interpreter

       下图例举了所有重要的Flink配置信息,除此之外你还可以配置任意Flink的Configuration(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)

Property Default Description
FLINK_HOME Flink的安装目录
HADOOP_CONF_DIR Hadoop配置信息目录
HIVE_CONF_DIR Hive配置信息目录
flink.execution.mode local Execution mode of flink, e.g. local | yarn | remote
flink.execution.remote.host jobmanager hostname if it is remote mode
flink.execution.remote.port jobmanager port if it is remote mode
flink.jm.memory 1024 Total number of memory(mb) of JobManager
flink.tm.memory 1024 Total number of memory(mb) of TaskManager
flink.yarn.appName Zeppelin Flink Session Yarn app name
flink.yarn.queue queue name of yarn app
flink.execution.jars additional user jars (comma separated)
flink.execution.packages additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10
zeppelin.pyflink.python python python binary executable for PyFlink
table.exec.resource.default-parallelism 1 Default parallelism for flink sql job
zeppelin.flink.scala.color true whether display scala shell output in colorful format
zeppelin.flink.enableHive false whether enable hive
zeppelin.flink.printREPLOutput true Print REPL output
zeppelin.flink.maxResult 1 max number of rows returned by sql interpreter

StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment

Flink Interpreter (%flink) 为用户自动创建了下面6个变量作为Flink Scala程序的入口。

  • senv (StreamExecutionEnvironment),
  • benv (ExecutionEnvironment)
  • stenv (StreamTableEnvironment for blink planner)
  • btenv (BatchTableEnvironment for blink planner)
  • stenv_2 (StreamTableEnvironment for flink planner)
  • btenv_2 (BatchTableEnvironment for flink planner)

PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了6个python变量作为PyFlink程序的入口

  • s_env (StreamExecutionEnvironment),
  • b_env (ExecutionEnvironment)
  • st_env (StreamTableEnvironment for blink planner)
  • bt_env (BatchTableEnvironment for blink planner)
  • st_env_2 (StreamTableEnvironment for flink planner)
  • bt_env_2 (BatchTableEnvironment for flink planner)
Blink/Flink Planner

Flink 1.10中有2种table api的planner:flink & blink.

  • 如果你用DataSet api以及需要把DataSet转换成Table,那么就需要使用Flink planner的TableEnvironment (btenv_2 and stenv_2).
  • 其他场景下, 我们都会建议用户使用blink planner. 这也是Flink sql使用的planner(%flink.bsql & %flink.ssql)
使用Flink Batch SQL

      %flink.bsql 是用来执行Flink的batch sql. 运行 help 命令可以得到所有可用的命令

总的来说,Flink Batch SQL可以用来做2大任务:

  • 使用 insert into 语句来做 Batch ETL
  • 使用 select 语句来做BI 数据分析
基于Bank数据的Batch ETL

    下面我们基于Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做Batch ETL任务。首先用Flink Sql创建一个raw 数据的source table,以及清洗干净后的sink table。

然后再定义Table Function来parse raw data。

接下来就可以用insert into语句来进行数据转换(source table –> sink table)

用select语句来Preview最终数据,验证insert into语句的正确性

基于Bank数据的BI数据分析

        经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的SQL Select语句进行分析,也可以使用Zeppelin的dynamic forms来增加交互性(TextBox,Select,Checkbox)

使用Flink UDF

      SQL虽然强大,但表达能力毕竟有限。有时候就要借助于UDF来表达更复杂的逻辑。Flink Interpreter 支持2种UDF (Scala + Python)。下面是2个简单的例子。

       Scala UDF

%flink


class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}


btenv.registerFunction("scala_upper", new ScalaUpper())

        Python UDF

%flink.pyflink

class PythonUpper(ScalarFunction):
  def eval(self, s):
    return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

创建完UDF之后,你就可以在SQL里使用了。

对Hive数据的数据分析

除了可以分析Flink SQL创建的table之外,Flink也可以分析Hive上已有的table。如果要让Flink Interpreter使用Hive,那么需要做以下配置

  • 设置 zeppelin.flink.enableHive 为 true
  • Copy 下面这些 dependencies 到flink的 lib 目录
    • flink-connector-hive_{scala_version}-{flink.version}.jar
    • flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
    • flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
    • hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
  • 在Flink interpreter setting 里或者 zeppelin-env.sh里指定 HIVE_CONF_DIR
  • 在Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的Hive版本

下面就用一个简单的例子展示如何在Zeppelin中用Flink查询Hive table

1. 用Zeppelin的jdbc interpreter查询hive tables

2. 用Flink sql 查询 hive table的schema

3. 用Flink Sql 查询hive table

发表评论