JdbcSink 简析

news/2024/7/7 21:44:26

文章目录

  • 1、JdbcSink
    • 1.1、参数
    • 1.2、返回
  • 2、JdbcBatchingOutputFormat
    • 2.1、参数
    • 2.2、open方法
      • 2.2.1、连接数据库
      • 2.2.2、JdbcExec
      • 2.2.3、scheduler
    • 2.3、writeRecord方法
      • 2.3.1、缓存数据
      • 2.3.2、flush


1、JdbcSink

  用于DataStream增加Jdbc的Sink输出,主要两个接口:sink()和exactlyOnceSink()。其中exactlyOnceSink()是13版本新增的支持事务性的接口,本次主要介绍sink()接口。

public static <T> SinkFunction<T> sink(
        String sql,
        JdbcStatementBuilder<T> statementBuilder,
        JdbcExecutionOptions executionOptions,
        JdbcConnectionOptions connectionOptions) {
    return new GenericJdbcSinkFunction<>(
            new JdbcBatchingOutputFormat<>(
                    new SimpleJdbcConnectionProvider(connectionOptions),
                    executionOptions,
                    context -> {
                        Preconditions.checkState(
                                !context.getExecutionConfig().isObjectReuseEnabled(),
                                "objects can not be reused with JDBC sink function");
                        return JdbcBatchStatementExecutor.simple(
                                sql, statementBuilder, Function.identity());
                    },
                    JdbcBatchingOutputFormat.RecordExtractor.identity()));
}

1.1、参数

  接口有四个参数,其中第三个参数executionOptions可以省略使用默认值,具体样例参看1、JdbcSink方式

  • sql

    String类型,一个SQL语句模板,就是通常使用的PreparedStatement那种形式,例如:insert into wordcount (wordcl, countcl) values (?,?)

  • statementBuilder

    JdbcStatementBuilder类型,作用是完成流数据与SQL具体列的对应,基于上一个参数的PreparedStatement形式,完成对应关系

  • executionOptions

    Flink Jdbc输出的执行规则,主要设置执行触发机制,主要设置三个参数:数据量触发阈值、时间触发阈值、最大重试次数。其中,数据量触发默认为5000,时间触发默认为0,即关闭时间触发。注意触发阈值不要设置的过低,否则可能造成数据库的阻塞。

  • connectionOptions

    JdbcConnectionOptions类型,用于设置数据库连接属性,包括Url、Driver、Username、Password等

1.2、返回

  接口返回的是一个基于SinkFunction实现的GenericJdbcSinkFunction类,其核心是参数JdbcBatchingOutputFormat。

  GenericJdbcSinkFunction的结果核心方法如下,都是基于JdbcBatchingOutputFormat的操作。

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    RuntimeContext ctx = getRuntimeContext();
    outputFormat.setRuntimeContext(ctx);
    outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}

@Override
public void invoke(T value, Context context) throws IOException {
    outputFormat.writeRecord(value);
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    outputFormat.flush();
}

2、JdbcBatchingOutputFormat

  JdbcBatchingOutputFormat是进行Jdbc交互的实现类,在向Jdbc输出前进行数据聚合

2.1、参数

  接口有四个参数

  • JdbcConnectionProvider

    提供Jdbc连接

  • JdbcExecutionOptions

    执行参数

  • StatementExecutorFactory

    Statement执行工厂,也就是流数据与数据库字段对应关系的处理

  • RecordExtractor

    数据提取的执行类

2.2、open方法

  Open方法是进行数据库连接初始化及前期准备的接口,存在调用关系

Task.doRun()
    ->invokable.invoke()->DataSinkTask.invoke()
        ->format.open()->JdbcBatchingOutputFormat.open()

2.2.1、连接数据库

  Open方法的第一步是连接数据库,调用上层方法AbstractJdbcOutputFormat的open方法,之后调用JdbcConnectionProvider的实现类SimpleJdbcConnectionProvider的getOrEstablishConnection()方法建立连接,getOrEstablishConnection的具体操作如下

public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
    if (connection != null) {
        return connection;
    }
    if (jdbcOptions.getDriverName() == null) {
        connection =
                DriverManager.getConnection(
                        jdbcOptions.getDbURL(),
                        jdbcOptions.getUsername().orElse(null),
                        jdbcOptions.getPassword().orElse(null));
    } else {
        Driver driver = getLoadedDriver();
        Properties info = new Properties();
        jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
        jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
        connection = driver.connect(jdbcOptions.getDbURL(), info);
        if (connection == null) {
            // Throw same exception as DriverManager.getConnection when no driver found to match
            // caller expectation.
            throw new SQLException(
                    "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
        }
    }
    return connection;
}

  此处根据有没有设置Drive有两种处理。如果没有设置,会根据设置的URL自动解析,用到了Java的DriverManager,这个类用于管理Jdbc驱动。DriverManager会自动识别classpath里的Driver,然后可以根据URL自动解析配对。如果设置了Driver,那就直接加载这个Driver来进行连接处理。

2.2.2、JdbcExec

  这个是基于StatementExecutorFactory创建的,此处最后使用的实现类是JdbcBatchStatementExecutor,在sink()接口中设定。这一步实际的操作就是做一个prepareStatements

@Override
public void prepareStatements(Connection connection) throws SQLException {
    this.st = connection.prepareStatement(sql);
}

2.2.3、scheduler

  数据库性能有限,所以Flink写数据库通常采用批的方式,此处就是设置时间调度的,具体参数可以参看第一章。需要注意的是两个特殊配置值:时间为0或者条数为1则不创建这个调度器。

if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {

  此处创建的调度线程池只包含一个线程

this.scheduler =
        Executors.newScheduledThreadPool(
                1, new ExecutorThreadFactory("jdbc-upsert-output-format"))

  调度器最终执行的操作就是整个类最大的一点,flush数据到数据库

synchronized (JdbcBatchingOutputFormat.this) {
    if (!closed) {
        try {
            flush();
        } catch (Exception e) {
            flushException = e;
        }
    }
}

2.3、writeRecord方法

  writeRecord是类的核心方法,进行数据的写入。主要进行两个操作,将数据加入列表,达到条件时flush到数据库中。

try {
    addToBatch(record, jdbcRecordExtractor.apply(record));
    batchCount++;
    if (executionOptions.getBatchSize() > 0
            && batchCount >= executionOptions.getBatchSize()) {
        flush();
    }
} catch (Exception e) {
    throw new IOException("Writing records to JDBC failed.", e);
}

2.3.1、缓存数据

  缓存数据使用的就是一个简单的ArrayList,其定义在SimpleBatchStatementExecutor

SimpleBatchStatementExecutor(
        String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
    this.sql = sql;
    this.parameterSetter = statementBuilder;
    this.valueTransformer = valueTransformer;
    this.batch = new ArrayList<>();
}

  如上,batch就是用于缓存数据的,添加数据操作如下。

@Override
public void addToBatch(T record) {
    batch.add(valueTransformer.apply(record));
}

  其中valueTransformer的作用就是返回输入,在sink初始时定义:

return JdbcBatchStatementExecutor.simple(
        sql, statementBuilder, Function.identity());

/**
 * Returns a function that always returns its input argument.
 *
 * @param <T> the type of the input and output objects to the function
 * @return a function that always returns its input argument
 */
static <T> Function<T, T> identity() {
    return t -> t;
}

2.3.2、flush

  flush就是把缓存数据向数据库刷出,最终调用的是SimpleBatchStatementExecutor的executeBatch方法

@Override
public void executeBatch() throws SQLException {
    if (!batch.isEmpty()) {
        for (V r : batch) {
            parameterSetter.accept(st, r);
            st.addBatch();
        }
        st.executeBatch();
        batch.clear();
    }
}

http://www.niftyadmin.cn/n/3018935.html

相关文章

机器学习(1)_R与神经网络之Neuralnet包

本篇博客将会介绍R中的一个神经网络算法包&#xff1a;Neuralnet&#xff0c;通过模拟一组数据&#xff0c;展现其在R中是如何使用&#xff0c;以及如何训练和预测。在介绍Neuranet之前&#xff0c;我们先简单介绍一下神经网络算法。 人工神经网络(ANN)&#xff0c;简称神经网络…

C# List.ForEach 方法

C#中List.ForEach 方法是对 List 的每个元素执行指定操作。 示例&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace AppExample {class Program{static void Main(string[] args){…

Adaptive调度器

文章目录1.前言2.测试3.配置启用4.其他配置参数4.1.主要配置4.2.其他可能相关的配置5.调用流程6.配置Adaptive调度器7.DefaultDeclarativeSlotPool7.1.NewSlotsListener7.2.offerSlots7.3.freeReservedSlot7.4.缩容触发8.AdaptiveScheduler8.1.使用条件8.2.计算并行度信息8.2.1…

jQuery触屏插件:Tap 代码

jQuery触屏插件&#xff1a;Tap&#xff0c;使用方法非常简单&#xff0c;例&#xff1a;$("#domid").tap(function(){alert("You tapped me! -- by"this.innerText);});依赖jquery 1.701$.fn.tap function(fn){02var collection this,03isTouch "…

e开头的java编辑器叫什么意思_java处理百度编辑器ueditor上传的图片等多媒体文件...

java处理百度编辑器ueditor上传的图片等多媒体文件开发项目过程中&#xff0c;一般会涉及到采用富文本编辑器处理“内容”之类的业务&#xff0c;而这内容中&#xff0c;难免会上传各种图片、视频等。而一般采用的富文本编辑器常见的有ueditor百度编辑器、widgEditor等等。我一…

Flink语法扩展--SqlRichExplain为例

文章目录1.修改文件列表2.SqlRichExplain3.parserImpls.ftl4.Parser.tdd5.ExplainOperation6.SqlToOperationConverter7.FlinkPlannerImpl8.Parser.jj9.FlinkSqlParserImpl1.修改文件列表 一些原始的修改&#xff0c;后续的部分增强没有再加入说明 flink-sql-client&#xff1…

ajax查询数据库时数据无法更新的问题

楼主问题&#xff1a;做这个留言本都郁闷得我想杀遍整个东南亚了,莫名问题层出不穷!目前这个问题是这样的:我使用ajax的http.open(get,url)方法,在url后附加参数传到后台查询留言本数据库的内容.但每次查询后,如果你再插入新的数据,再使用ajax传递同样的参数过去,显示的还是第一…

java awt 实例_java:图形化界面awt基本实例

/**创建一个图形化界面在TextField中输入目录&#xff0c;就会再TextField中显示该目录下的文件*/import java.io.*;import java.awt.*;import java.awt.event.*;class TestFileFrame{public static void main(String[] args) throws Exception{//创建窗体Frame f new Frame(&…