Flinkmysql-流處理和關係型數據庫的完美結合

Flink是一個高性能、分布式的流處理框架,能夠處理大規模數據流處理的場景。而MySQL是一種關係型數據庫,是應用程序常用的數據庫之一。結合起來,flinkmysql能夠滿足海量數據的流式處理和數據存儲的需求。本文將從三個方面深入講解flinkmysql的應用,並提供完整的代碼示例。

一、FlinkmysqlSink——flink中與mysql的交互組件

Flink提供了與mysql數據庫進行交互的組件,即FlinkmysqlSink,能夠將數據流直接寫入mysql數據庫中。

首先,需要在pom文件中添加mysql-connector-java依賴:

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.32</version>
    </dependency>
</dependencies>

然後,創建一個sink對象,將數據寫入mysql中:

// 導入必要的包
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.DriverManager;
import java.sql.SQLException;

public class FlinkMysqlSinkExample {
    
    // 建立mysql連接
    private static Connection getConnection() throws Exception {
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/test";
        String username = "root";
        String password = "root";
        Class.forName(driver);
        Connection conn = DriverManager.getConnection(url, username, password);
        return conn;
    }
    
    public static class MySQLSink extends RichSinkFunction<String> {
        
        private PreparedStatement ps;
        private Connection connection;
        private String insertSql = "insert into users(name,age) values (?,?)";

        // 開始時建立連接
        @Override
        public void open() throws Exception {
            super.open();
            connection = getConnection();
            ps = this.connection.prepareStatement(insertSql);
            System.out.println("open");
        }
 
        // 每個元素的插入都要調用一次invoke方法
        public void invoke(String s, Context context) throws Exception {
            //獲取到要插入的數據
            String[] dataArray = s.split(",");
            String name = dataArray[0];
            String age = dataArray[1];
 
            //將數據插入MySQL
            ps.setString(1, name);
            ps.setInt(2, Integer.parseInt(age));
            ps.executeUpdate();    
        }
        
        // 結束時關閉連接
        @Override
        public void close() throws Exception {
            super.close();
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

在主程序中,需要如下代碼將寫入mysql的數據流與sink對象關聯:

//創建一個stream
DataStream<String> input = ...;
//將流寫入mysql中
input.addSink(new FlinkMysqlSinkExample.MySQLSink());

二、flinkmysql數據寫入慢及其原因分析

許多開發者通過FlinkmysqlSink將數據寫入MySQL時,會發現寫入速度非常慢。下面將分析造成寫入慢的原因,並給出相應的解決方案。

原因: 大多數情況下,寫入慢的原因是JDBC driver在向MySQL發送數據時,每次只發送一個SQL語句。當需要插入的數據量非常大時,這將導致發送大量的SQL語句,影響寫入的速度。

解決方案: 將數據寫入到緩存中並在達到一定量時批量插入MySQL。可以使用一個BatchPreparedStatementSetter對象,建立完整的SQL語句並設置一次預編譯語句中的所有參數,然後使用批量更新方法。

以下代碼演示如何進行批量插入:

public class BatchInsertUsers {
 
    public static void main(String[] args) throws Exception {

        //獲取到要插入的數據
        String[] dataArray = { "user1,20", "user2,21", "user3,22", "user4,23", "user5,24",
                "user6,25", "user7,26", "user8,27","user9,28", "user10,29","user11,30" };
 
        Connection conn = getConnection();
        String insertSql = "insert into users(name,age) values (?,?)";
 
        PreparedStatement ps = conn.prepareStatement(insertSql);
        int batchCount = 0;//統計批量的個數
        for (String rowData : dataArray) {
            String[] fields = rowData.split(",");
            String name = fields[0];
            int age = Integer.parseInt(fields[1]);
            ps.setString(1, name);
            ps.setInt(2, age);
            ps.addBatch();// 添加到batch

            if (++batchCount % 3 == 0) {// 批量提交
                ps.executeBatch();
            }
        }
        if (batchCount % 3 != 0) {//不足每個batch的數據時,且已經解析完了所有batch,就執行下面的代碼
            ps.executeBatch();
        }
        conn.commit();// 提交事務
        ps.close();
        conn.close();
    }
}

三、flinkmysqlSink jdbc選取

在Flink中,有多種方式與MySQL進行交互,其中最常見的方式是使用JDBC。此外,還可以使用一些第三方庫如MyBatis等。如果需要使用JDBC,請耐心看完本節。

1.導入mysqljdbc的依賴

在pom文件中增加如下dependency:

<dependencies>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
  </dependency>
</dependencies>

2.建立數據庫表

創建一個名為user的表:

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(32) NOT NULL COMMENT '用戶名',
  `password` varchar(32) NOT NULL COMMENT '密碼',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用戶表';

3.創建一個簡單的程序

在Java代碼中進行配置及相關操作(需要提前建好數據庫出表):

public static void main(String[] args) throws ClassNotFoundException, SQLException {
    String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
    String dbUser = "root";
    String dbPass = "";
    // 加載 mySQL 驅動
    Class.forName("com.mysql.cj.jdbc.Driver");
    // 獲得數據庫鏈接
    Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
    // 操作數據庫
    Statement stmt = conn.createStatement();
    String sql = "select * from user";
    ResultSet rs = stmt.executeQuery(sql);
    while (rs.next()) {
        System.out.println(rs.getString("username"));
    }
    //關閉連接
    rs.close();
    stmt.close();
    conn.close();
}

寫入數據到數據庫:

public static void main(String[] args) throws ClassNotFoundException, SQLException {
    String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
    String dbUser = "root";
    String dbPass = "";
    Class.forName("com.mysql.cj.jdbc.Driver");
    Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
    Statement stmt = conn.createStatement();
    String sql = "insert into user (username,password) values (?,?)";
    PreparedStatement pstmt = conn.prepareStatement(sql);
    // 填充佔位符
    pstmt.setString(1, "好好學Java");
    pstmt.setString(2, "123");
    pstmt.executeUpdate();
    pstmt.close();
    stmt.close();
    conn.close();
}

由此可見,通過jdbc操作MySQL數據庫也非常簡單。

總結

flinkmysql將流處理和關係型數據庫結合起來,為我們在海量數據的情況下進行流式處理以及數據存儲提供了一個可行的方案。通過使用FlinkmysqlSink,我們可以將數據流直接寫入mysql數據庫。解決了寫入慢的問題後,我們可以如上演示的使用BatchPreparedStatementSetter批量插入數據。 jdbc的使用也易上手,可根據需求選取合適的操作方式。

原創文章,作者:PAUSD,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/315672.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
PAUSD的頭像PAUSD
上一篇 2025-01-09 12:13
下一篇 2025-01-09 12:13

相關推薦

  • Python 常用數據庫有哪些?

    在Python編程中,數據庫是不可或缺的一部分。隨着互聯網應用的不斷擴大,處理海量數據已成為一種趨勢。Python有許多成熟的數據庫管理系統,接下來我們將從多個方面介紹Python…

    編程 2025-04-29
  • KeyDB Java:完美的分布式高速緩存方案

    本文將從以下幾個方面對KeyDB Java進行詳細闡述:KeyDB Java的特點、安裝和配置、使用示例、性能測試。 一、KeyDB Java的特點 KeyDB Java是KeyD…

    編程 2025-04-29
  • openeuler安裝數據庫方案

    本文將介紹在openeuler操作系統中安裝數據庫的方案,並提供代碼示例。 一、安裝MariaDB 下面介紹如何在openeuler中安裝MariaDB。 1、更新軟件源 sudo…

    編程 2025-04-29
  • 數據庫第三範式會有刪除插入異常

    如果沒有正確設計數據庫,第三範式可能導致刪除和插入異常。以下是詳細解釋: 一、什麼是第三範式和範式理論? 範式理論是關係數據庫中的一個規範化過程。第三範式是範式理論中的一種常見形式…

    編程 2025-04-29
  • leveldb和unqlite:兩個高性能的數據庫存儲引擎

    本文將介紹兩款高性能的數據庫存儲引擎:leveldb和unqlite,並從多個方面對它們進行詳細的闡述。 一、leveldb:輕量級的鍵值存儲引擎 1、leveldb概述: lev…

    編程 2025-04-28
  • Python怎麼導入數據庫

    Python是一種高級編程語言。它具有簡單、易讀的語法和廣泛的庫,讓它成為一個靈活和強大的工具。Python的數據庫連接類型可以多種多樣,其中包括MySQL、Oracle、Post…

    編程 2025-04-28
  • 依賴關係代碼的用法介紹

    依賴關係代碼在軟件開發中扮演着至關重要的角色。它們指定了項目中各個模塊之間的依賴關係。本文將從多個方面對依賴關係代碼進行詳細的闡述。 一、依賴關係代碼的作用 依賴關係代碼可以幫助開…

    編程 2025-04-28
  • Think-ORM數據模型及數據庫核心操作

    本文主要介紹Think-ORM數據模型建立和數據庫核心操作。 一、模型定義 Think-ORM是一個開源的ORM框架,用於簡化在PHP應用中(特別是ThinkPHP)與關係數據庫之…

    編程 2025-04-27
  • 如何使用Python將CSV文件導入到數據庫

    CSV(Comma Separated Values)是一種可讀性高、易於編輯與導入導出的文件格式,常用於存儲表格數據。在數據處理過程中,我們有時需要將CSV文件導入到數據庫中進行…

    編程 2025-04-27
  • Python批量導入數據庫

    本文將介紹Python中如何批量導入數據庫。首先,對於數據分析和挖掘領域,數據庫中批量導入數據是一個必不可少的過程。這種高效的導入方式可以極大地提高數據挖掘、機器學習等任務的效率。…

    編程 2025-04-27

發表回復

登錄後才能評論