Flink是一個高性能、分佈式的流處理框架,能夠處理大規模數據流處理的場景。而MySQL是一種關係型數據庫,是應用程序常用的數據庫之一。結合起來,flinkmysql能夠滿足海量數據的流式處理和數據存儲的需求。本文將從三個方面深入講解flinkmysql的應用,並提供完整的代碼示例。
一、FlinkmysqlSink——flink中與mysql的交互組件
Flink提供了與mysql數據庫進行交互的組件,即FlinkmysqlSink,能夠將數據流直接寫入mysql數據庫中。
首先,需要在pom文件中添加mysql-connector-java依賴:
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.32</version>
</dependency>
</dependencies>
然後,創建一個sink對象,將數據寫入mysql中:
// 導入必要的包
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.DriverManager;
import java.sql.SQLException;
public class FlinkMysqlSinkExample {
// 建立mysql連接
private static Connection getConnection() throws Exception {
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "root";
Class.forName(driver);
Connection conn = DriverManager.getConnection(url, username, password);
return conn;
}
public static class MySQLSink extends RichSinkFunction<String> {
private PreparedStatement ps;
private Connection connection;
private String insertSql = "insert into users(name,age) values (?,?)";
// 開始時建立連接
@Override
public void open() throws Exception {
super.open();
connection = getConnection();
ps = this.connection.prepareStatement(insertSql);
System.out.println("open");
}
// 每個元素的插入都要調用一次invoke方法
public void invoke(String s, Context context) throws Exception {
//獲取到要插入的數據
String[] dataArray = s.split(",");
String name = dataArray[0];
String age = dataArray[1];
//將數據插入MySQL
ps.setString(1, name);
ps.setInt(2, Integer.parseInt(age));
ps.executeUpdate();
}
// 結束時關閉連接
@Override
public void close() throws Exception {
super.close();
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
}
}
}
在主程序中,需要如下代碼將寫入mysql的數據流與sink對象關聯:
//創建一個stream
DataStream<String> input = ...;
//將流寫入mysql中
input.addSink(new FlinkMysqlSinkExample.MySQLSink());
二、flinkmysql數據寫入慢及其原因分析
許多開發者通過FlinkmysqlSink將數據寫入MySQL時,會發現寫入速度非常慢。下面將分析造成寫入慢的原因,並給出相應的解決方案。
原因: 大多數情況下,寫入慢的原因是JDBC driver在向MySQL發送數據時,每次只發送一個SQL語句。當需要插入的數據量非常大時,這將導致發送大量的SQL語句,影響寫入的速度。
解決方案: 將數據寫入到緩存中並在達到一定量時批量插入MySQL。可以使用一個BatchPreparedStatementSetter對象,建立完整的SQL語句並設置一次預編譯語句中的所有參數,然後使用批量更新方法。
以下代碼演示如何進行批量插入:
public class BatchInsertUsers {
public static void main(String[] args) throws Exception {
//獲取到要插入的數據
String[] dataArray = { "user1,20", "user2,21", "user3,22", "user4,23", "user5,24",
"user6,25", "user7,26", "user8,27","user9,28", "user10,29","user11,30" };
Connection conn = getConnection();
String insertSql = "insert into users(name,age) values (?,?)";
PreparedStatement ps = conn.prepareStatement(insertSql);
int batchCount = 0;//統計批量的個數
for (String rowData : dataArray) {
String[] fields = rowData.split(",");
String name = fields[0];
int age = Integer.parseInt(fields[1]);
ps.setString(1, name);
ps.setInt(2, age);
ps.addBatch();// 添加到batch
if (++batchCount % 3 == 0) {// 批量提交
ps.executeBatch();
}
}
if (batchCount % 3 != 0) {//不足每個batch的數據時,且已經解析完了所有batch,就執行下面的代碼
ps.executeBatch();
}
conn.commit();// 提交事務
ps.close();
conn.close();
}
}
三、flinkmysqlSink jdbc選取
在Flink中,有多種方式與MySQL進行交互,其中最常見的方式是使用JDBC。此外,還可以使用一些第三方庫如MyBatis等。如果需要使用JDBC,請耐心看完本節。
1.導入mysqljdbc的依賴
在pom文件中增加如下dependency:
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
</dependencies>
2.建立數據庫表
創建一個名為user的表:
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`username` varchar(32) NOT NULL COMMENT '用戶名',
`password` varchar(32) NOT NULL COMMENT '密碼',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用戶表';
3.創建一個簡單的程序
在Java代碼中進行配置及相關操作(需要提前建好數據庫出表):
public static void main(String[] args) throws ClassNotFoundException, SQLException {
String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
String dbUser = "root";
String dbPass = "";
// 加載 mySQL 驅動
Class.forName("com.mysql.cj.jdbc.Driver");
// 獲得數據庫鏈接
Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
// 操作數據庫
Statement stmt = conn.createStatement();
String sql = "select * from user";
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("username"));
}
//關閉連接
rs.close();
stmt.close();
conn.close();
}
寫入數據到數據庫:
public static void main(String[] args) throws ClassNotFoundException, SQLException {
String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
String dbUser = "root";
String dbPass = "";
Class.forName("com.mysql.cj.jdbc.Driver");
Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
Statement stmt = conn.createStatement();
String sql = "insert into user (username,password) values (?,?)";
PreparedStatement pstmt = conn.prepareStatement(sql);
// 填充佔位符
pstmt.setString(1, "好好學Java");
pstmt.setString(2, "123");
pstmt.executeUpdate();
pstmt.close();
stmt.close();
conn.close();
}
由此可見,通過jdbc操作MySQL數據庫也非常簡單。
總結
flinkmysql將流處理和關係型數據庫結合起來,為我們在海量數據的情況下進行流式處理以及數據存儲提供了一個可行的方案。通過使用FlinkmysqlSink,我們可以將數據流直接寫入mysql數據庫。解決了寫入慢的問題後,我們可以如上演示的使用BatchPreparedStatementSetter批量插入數據。 jdbc的使用也易上手,可根據需求選取合適的操作方式。
原創文章,作者:PAUSD,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/315672.html