Flinkmysql-流处理和关系型数据库的完美结合

Flink是一个高性能、分布式的流处理框架,能够处理大规模数据流处理的场景。而MySQL是一种关系型数据库,是应用程序常用的数据库之一。结合起来,flinkmysql能够满足海量数据的流式处理和数据存储的需求。本文将从三个方面深入讲解flinkmysql的应用,并提供完整的代码示例。

一、FlinkmysqlSink——flink中与mysql的交互组件

Flink提供了与mysql数据库进行交互的组件,即FlinkmysqlSink,能够将数据流直接写入mysql数据库中。

首先,需要在pom文件中添加mysql-connector-java依赖:

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.32</version>
    </dependency>
</dependencies>

然后,创建一个sink对象,将数据写入mysql中:

// 导入必要的包
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.DriverManager;
import java.sql.SQLException;

public class FlinkMysqlSinkExample {
    
    // 建立mysql连接
    private static Connection getConnection() throws Exception {
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/test";
        String username = "root";
        String password = "root";
        Class.forName(driver);
        Connection conn = DriverManager.getConnection(url, username, password);
        return conn;
    }
    
    public static class MySQLSink extends RichSinkFunction<String> {
        
        private PreparedStatement ps;
        private Connection connection;
        private String insertSql = "insert into users(name,age) values (?,?)";

        // 开始时建立连接
        @Override
        public void open() throws Exception {
            super.open();
            connection = getConnection();
            ps = this.connection.prepareStatement(insertSql);
            System.out.println("open");
        }
 
        // 每个元素的插入都要调用一次invoke方法
        public void invoke(String s, Context context) throws Exception {
            //获取到要插入的数据
            String[] dataArray = s.split(",");
            String name = dataArray[0];
            String age = dataArray[1];
 
            //将数据插入MySQL
            ps.setString(1, name);
            ps.setInt(2, Integer.parseInt(age));
            ps.executeUpdate();    
        }
        
        // 结束时关闭连接
        @Override
        public void close() throws Exception {
            super.close();
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

在主程序中,需要如下代码将写入mysql的数据流与sink对象关联:

//创建一个stream
DataStream<String> input = ...;
//将流写入mysql中
input.addSink(new FlinkMysqlSinkExample.MySQLSink());

二、flinkmysql数据写入慢及其原因分析

许多开发者通过FlinkmysqlSink将数据写入MySQL时,会发现写入速度非常慢。下面将分析造成写入慢的原因,并给出相应的解决方案。

原因: 大多数情况下,写入慢的原因是JDBC driver在向MySQL发送数据时,每次只发送一个SQL语句。当需要插入的数据量非常大时,这将导致发送大量的SQL语句,影响写入的速度。

解决方案: 将数据写入到缓存中并在达到一定量时批量插入MySQL。可以使用一个BatchPreparedStatementSetter对象,建立完整的SQL语句并设置一次预编译语句中的所有参数,然后使用批量更新方法。

以下代码演示如何进行批量插入:

public class BatchInsertUsers {
 
    public static void main(String[] args) throws Exception {

        //获取到要插入的数据
        String[] dataArray = { "user1,20", "user2,21", "user3,22", "user4,23", "user5,24",
                "user6,25", "user7,26", "user8,27","user9,28", "user10,29","user11,30" };
 
        Connection conn = getConnection();
        String insertSql = "insert into users(name,age) values (?,?)";
 
        PreparedStatement ps = conn.prepareStatement(insertSql);
        int batchCount = 0;//统计批量的个数
        for (String rowData : dataArray) {
            String[] fields = rowData.split(",");
            String name = fields[0];
            int age = Integer.parseInt(fields[1]);
            ps.setString(1, name);
            ps.setInt(2, age);
            ps.addBatch();// 添加到batch

            if (++batchCount % 3 == 0) {// 批量提交
                ps.executeBatch();
            }
        }
        if (batchCount % 3 != 0) {//不足每个batch的数据时,且已经解析完了所有batch,就执行下面的代码
            ps.executeBatch();
        }
        conn.commit();// 提交事务
        ps.close();
        conn.close();
    }
}

三、flinkmysqlSink jdbc选取

在Flink中,有多种方式与MySQL进行交互,其中最常见的方式是使用JDBC。此外,还可以使用一些第三方库如MyBatis等。如果需要使用JDBC,请耐心看完本节。

1.导入mysqljdbc的依赖

在pom文件中增加如下dependency:

<dependencies>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
  </dependency>
</dependencies>

2.建立数据库表

创建一个名为user的表:

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(32) NOT NULL COMMENT '用户名',
  `password` varchar(32) NOT NULL COMMENT '密码',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用户表';

3.创建一个简单的程序

在Java代码中进行配置及相关操作(需要提前建好数据库出表):

public static void main(String[] args) throws ClassNotFoundException, SQLException {
    String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
    String dbUser = "root";
    String dbPass = "";
    // 加载 mySQL 驱动
    Class.forName("com.mysql.cj.jdbc.Driver");
    // 获得数据库链接
    Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
    // 操作数据库
    Statement stmt = conn.createStatement();
    String sql = "select * from user";
    ResultSet rs = stmt.executeQuery(sql);
    while (rs.next()) {
        System.out.println(rs.getString("username"));
    }
    //关闭连接
    rs.close();
    stmt.close();
    conn.close();
}

写入数据到数据库:

public static void main(String[] args) throws ClassNotFoundException, SQLException {
    String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
    String dbUser = "root";
    String dbPass = "";
    Class.forName("com.mysql.cj.jdbc.Driver");
    Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
    Statement stmt = conn.createStatement();
    String sql = "insert into user (username,password) values (?,?)";
    PreparedStatement pstmt = conn.prepareStatement(sql);
    // 填充占位符
    pstmt.setString(1, "好好学Java");
    pstmt.setString(2, "123");
    pstmt.executeUpdate();
    pstmt.close();
    stmt.close();
    conn.close();
}

由此可见,通过jdbc操作MySQL数据库也非常简单。

总结

flinkmysql将流处理和关系型数据库结合起来,为我们在海量数据的情况下进行流式处理以及数据存储提供了一个可行的方案。通过使用FlinkmysqlSink,我们可以将数据流直接写入mysql数据库。解决了写入慢的问题后,我们可以如上演示的使用BatchPreparedStatementSetter批量插入数据。 jdbc的使用也易上手,可根据需求选取合适的操作方式。

原创文章,作者:PAUSD,如若转载,请注明出处:https://www.506064.com/n/315672.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
PAUSDPAUSD
上一篇 2025-01-09 12:13
下一篇 2025-01-09 12:13

相关推荐

  • Python 常用数据库有哪些?

    在Python编程中,数据库是不可或缺的一部分。随着互联网应用的不断扩大,处理海量数据已成为一种趋势。Python有许多成熟的数据库管理系统,接下来我们将从多个方面介绍Python…

    编程 2025-04-29
  • KeyDB Java:完美的分布式高速缓存方案

    本文将从以下几个方面对KeyDB Java进行详细阐述:KeyDB Java的特点、安装和配置、使用示例、性能测试。 一、KeyDB Java的特点 KeyDB Java是KeyD…

    编程 2025-04-29
  • openeuler安装数据库方案

    本文将介绍在openeuler操作系统中安装数据库的方案,并提供代码示例。 一、安装MariaDB 下面介绍如何在openeuler中安装MariaDB。 1、更新软件源 sudo…

    编程 2025-04-29
  • 数据库第三范式会有删除插入异常

    如果没有正确设计数据库,第三范式可能导致删除和插入异常。以下是详细解释: 一、什么是第三范式和范式理论? 范式理论是关系数据库中的一个规范化过程。第三范式是范式理论中的一种常见形式…

    编程 2025-04-29
  • leveldb和unqlite:两个高性能的数据库存储引擎

    本文将介绍两款高性能的数据库存储引擎:leveldb和unqlite,并从多个方面对它们进行详细的阐述。 一、leveldb:轻量级的键值存储引擎 1、leveldb概述: lev…

    编程 2025-04-28
  • Python怎么导入数据库

    Python是一种高级编程语言。它具有简单、易读的语法和广泛的库,让它成为一个灵活和强大的工具。Python的数据库连接类型可以多种多样,其中包括MySQL、Oracle、Post…

    编程 2025-04-28
  • 依赖关系代码的用法介绍

    依赖关系代码在软件开发中扮演着至关重要的角色。它们指定了项目中各个模块之间的依赖关系。本文将从多个方面对依赖关系代码进行详细的阐述。 一、依赖关系代码的作用 依赖关系代码可以帮助开…

    编程 2025-04-28
  • Think-ORM数据模型及数据库核心操作

    本文主要介绍Think-ORM数据模型建立和数据库核心操作。 一、模型定义 Think-ORM是一个开源的ORM框架,用于简化在PHP应用中(特别是ThinkPHP)与关系数据库之…

    编程 2025-04-27
  • 如何使用Python将CSV文件导入到数据库

    CSV(Comma Separated Values)是一种可读性高、易于编辑与导入导出的文件格式,常用于存储表格数据。在数据处理过程中,我们有时需要将CSV文件导入到数据库中进行…

    编程 2025-04-27
  • Python批量导入数据库

    本文将介绍Python中如何批量导入数据库。首先,对于数据分析和挖掘领域,数据库中批量导入数据是一个必不可少的过程。这种高效的导入方式可以极大地提高数据挖掘、机器学习等任务的效率。…

    编程 2025-04-27

发表回复

登录后才能评论