Flink是一个高性能、分布式的流处理框架,能够处理大规模数据流处理的场景。而MySQL是一种关系型数据库,是应用程序常用的数据库之一。结合起来,flinkmysql能够满足海量数据的流式处理和数据存储的需求。本文将从三个方面深入讲解flinkmysql的应用,并提供完整的代码示例。
一、FlinkmysqlSink——flink中与mysql的交互组件
Flink提供了与mysql数据库进行交互的组件,即FlinkmysqlSink,能够将数据流直接写入mysql数据库中。
首先,需要在pom文件中添加mysql-connector-java依赖:
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.32</version>
</dependency>
</dependencies>
然后,创建一个sink对象,将数据写入mysql中:
// 导入必要的包
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.DriverManager;
import java.sql.SQLException;
public class FlinkMysqlSinkExample {
// 建立mysql连接
private static Connection getConnection() throws Exception {
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "root";
Class.forName(driver);
Connection conn = DriverManager.getConnection(url, username, password);
return conn;
}
public static class MySQLSink extends RichSinkFunction<String> {
private PreparedStatement ps;
private Connection connection;
private String insertSql = "insert into users(name,age) values (?,?)";
// 开始时建立连接
@Override
public void open() throws Exception {
super.open();
connection = getConnection();
ps = this.connection.prepareStatement(insertSql);
System.out.println("open");
}
// 每个元素的插入都要调用一次invoke方法
public void invoke(String s, Context context) throws Exception {
//获取到要插入的数据
String[] dataArray = s.split(",");
String name = dataArray[0];
String age = dataArray[1];
//将数据插入MySQL
ps.setString(1, name);
ps.setInt(2, Integer.parseInt(age));
ps.executeUpdate();
}
// 结束时关闭连接
@Override
public void close() throws Exception {
super.close();
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
}
}
}
在主程序中,需要如下代码将写入mysql的数据流与sink对象关联:
//创建一个stream
DataStream<String> input = ...;
//将流写入mysql中
input.addSink(new FlinkMysqlSinkExample.MySQLSink());
二、flinkmysql数据写入慢及其原因分析
许多开发者通过FlinkmysqlSink将数据写入MySQL时,会发现写入速度非常慢。下面将分析造成写入慢的原因,并给出相应的解决方案。
原因: 大多数情况下,写入慢的原因是JDBC driver在向MySQL发送数据时,每次只发送一个SQL语句。当需要插入的数据量非常大时,这将导致发送大量的SQL语句,影响写入的速度。
解决方案: 将数据写入到缓存中并在达到一定量时批量插入MySQL。可以使用一个BatchPreparedStatementSetter对象,建立完整的SQL语句并设置一次预编译语句中的所有参数,然后使用批量更新方法。
以下代码演示如何进行批量插入:
public class BatchInsertUsers {
public static void main(String[] args) throws Exception {
//获取到要插入的数据
String[] dataArray = { "user1,20", "user2,21", "user3,22", "user4,23", "user5,24",
"user6,25", "user7,26", "user8,27","user9,28", "user10,29","user11,30" };
Connection conn = getConnection();
String insertSql = "insert into users(name,age) values (?,?)";
PreparedStatement ps = conn.prepareStatement(insertSql);
int batchCount = 0;//统计批量的个数
for (String rowData : dataArray) {
String[] fields = rowData.split(",");
String name = fields[0];
int age = Integer.parseInt(fields[1]);
ps.setString(1, name);
ps.setInt(2, age);
ps.addBatch();// 添加到batch
if (++batchCount % 3 == 0) {// 批量提交
ps.executeBatch();
}
}
if (batchCount % 3 != 0) {//不足每个batch的数据时,且已经解析完了所有batch,就执行下面的代码
ps.executeBatch();
}
conn.commit();// 提交事务
ps.close();
conn.close();
}
}
三、flinkmysqlSink jdbc选取
在Flink中,有多种方式与MySQL进行交互,其中最常见的方式是使用JDBC。此外,还可以使用一些第三方库如MyBatis等。如果需要使用JDBC,请耐心看完本节。
1.导入mysqljdbc的依赖
在pom文件中增加如下dependency:
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
</dependencies>
2.建立数据库表
创建一个名为user的表:
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`username` varchar(32) NOT NULL COMMENT '用户名',
`password` varchar(32) NOT NULL COMMENT '密码',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用户表';
3.创建一个简单的程序
在Java代码中进行配置及相关操作(需要提前建好数据库出表):
public static void main(String[] args) throws ClassNotFoundException, SQLException {
String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
String dbUser = "root";
String dbPass = "";
// 加载 mySQL 驱动
Class.forName("com.mysql.cj.jdbc.Driver");
// 获得数据库链接
Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
// 操作数据库
Statement stmt = conn.createStatement();
String sql = "select * from user";
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("username"));
}
//关闭连接
rs.close();
stmt.close();
conn.close();
}
写入数据到数据库:
public static void main(String[] args) throws ClassNotFoundException, SQLException {
String dbURL = "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
String dbUser = "root";
String dbPass = "";
Class.forName("com.mysql.cj.jdbc.Driver");
Connection conn = DriverManager.getConnection(dbURL, dbUser, dbPass);
Statement stmt = conn.createStatement();
String sql = "insert into user (username,password) values (?,?)";
PreparedStatement pstmt = conn.prepareStatement(sql);
// 填充占位符
pstmt.setString(1, "好好学Java");
pstmt.setString(2, "123");
pstmt.executeUpdate();
pstmt.close();
stmt.close();
conn.close();
}
由此可见,通过jdbc操作MySQL数据库也非常简单。
总结
flinkmysql将流处理和关系型数据库结合起来,为我们在海量数据的情况下进行流式处理以及数据存储提供了一个可行的方案。通过使用FlinkmysqlSink,我们可以将数据流直接写入mysql数据库。解决了写入慢的问题后,我们可以如上演示的使用BatchPreparedStatementSetter批量插入数据。 jdbc的使用也易上手,可根据需求选取合适的操作方式。
原创文章,作者:PAUSD,如若转载,请注明出处:https://www.506064.com/n/315672.html