博客
关于我
在数据量小的情况下,flink基于事件驱动,处理更新、删除、追加历史数据
阅读量:798 次
发布时间:2023-04-16

本文共 7329 字,大约阅读时间需要 24 分钟。

实现思想,每来一条数据,按照数据 更新、删除、追加三种情况直接用JDBC的方式更新数据库,然后在flatmap函数中,用最新的所有数据通过JDBC的处理,得到最新的结果,样例如下

package com.baiimport java.sqlimport java.sql.{DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}import com.alibaba.fastjson.{JSON, JSONObject}import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorobject JDBCtest {  var conn: sql.Connection = _  var selectStmt: PreparedStatement = _  var insertStmt: PreparedStatement = _  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()    val ncStream = env.socketTextStream("hadoop104", 7777)    ncStream.print("before")    val value1: DataStream[String] = ncStream.filter(_.matches("(\\d)+,(.)*"))    value1.print("after")    val value2: DataStream[Student] = value1.flatMap(new RichFlatMapFunction[String, Student] {      override def flatMap(value: String, out: Collector[Student]) = {        val arr: Array[String] = value.split(",", 2)        conn = DriverManager.getConnection("jdbc:mysql://hadoop103:3306/test", "root", "123456")        println(arr.toList)        insertStmt = conn.prepareStatement("insert into input (id, name) values (?,?)")        insertStmt.setInt(1, arr(0).toInt)        insertStmt.setString(2, arr(1))        insertStmt.execute()        selectStmt = conn.prepareStatement("select * from input")        val data: ResultSetMetaData = selectStmt.getMetaData        val resultSet: ResultSet = selectStmt.executeQuery()        while (resultSet.next()) {          val map = new java.util.HashMap[String, String]()          val jSONObject = new JSONObject()          for (i <- 1 to data.getColumnCount) {            val key: String = data.getColumnName(i)            val value: String = resultSet.getString(i)            map.put(key, value)            jSONObject.put(key, value)          }          val a: Student = JSON.parseObject(jSONObject.toString(), classOf[Student])          out.collect(a)        }        selectStmt.close()        conn.close()      }    })    value2.print("dd")    env.execute()  }}case class Student(id: Int, name: String)

maven 依赖如下

UTF-8
1.11.0
2.11
2.11.12
2.12.1
2.7.3
mysql
mysql-connector-java
5.1.44
org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}
org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}
org.apache.flink
flink-table-api-scala-bridge_${scala.binary.version}
${flink.version}
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
org.apache.flink
flink-table-common
${flink.version}
org.apache.flink
flink-runtime-web_2.11
${flink.version}
org.scala-lang
scala-library
${scala.version}
org.apache.flink
flink-statebackend-rocksdb_${scala.binary.version}
${flink.version}
org.apache.flink
flink-connector-kafka_${scala.binary.version}
${flink.version}
org.apache.hbase
hbase-server
2.2.4
provided
com.google.guava
guava
29.0-jre
com.alibaba
fastjson
1.2.68
org.apache.logging.log4j
log4j-slf4j-impl
${log4j.version}
runtime
org.apache.logging.log4j
log4j-api
${log4j.version}
runtime
org.apache.logging.log4j
log4j-core
${log4j.version}
runtime
org.apache.hadoop
hadoop-common
${hadoop.version}
provided
org.slf4j
slf4j-log4j12
log4j
log4j
org.slf4j
slf4j-api
commons-logging
commons-logging
org.apache.hadoop
hadoop-hdfs
${hadoop.version}
provided
log4j
log4j
org.apache.hadoop
hadoop-client
${hadoop.version}
provided
jdk.tools
jdk.tools
1.8
system
${JAVA_HOME}/lib/tools.jar
org.apache.maven.plugins
maven-compiler-plugin
3.6.1
1.8
1.8
org.scala-tools
maven-scala-plugin
2.15.1
compile-scala
add-source
compile
test-compile-scala
add-source
testCompile
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
make-assembly
package
single
net.alchim31.maven
scala-maven-plugin
3.2.2
compile
testCompile

转载地址:http://kigfk.baihongyu.com/

你可能感兴趣的文章
mysqli
查看>>
MySQLIntegrityConstraintViolationException异常处理
查看>>
mysqlreport分析工具详解
查看>>
MySQLSyntaxErrorException: Unknown error 1146和SQLSyntaxErrorException: Unknown error 1146
查看>>
Mysql_Postgresql中_geometry数据操作_st_astext_GeomFromEWKT函数_在java中转换geometry的16进制数据---PostgreSQL工作笔记007
查看>>
mysql_real_connect 参数注意
查看>>
mysql_secure_installation初始化数据库报Access denied
查看>>
MySQL_西安11月销售昨日未上架的产品_20161212
查看>>
Mysql——深入浅出InnoDB底层原理
查看>>
MySQL“被动”性能优化汇总
查看>>
MySQL、HBase 和 Elasticsearch:特点与区别详解
查看>>
MySQL、Redis高频面试题汇总
查看>>
MYSQL、SQL Server、Oracle数据库排序空值null问题及其解决办法
查看>>
mysql一个字段为空时使用另一个字段排序
查看>>
MySQL一个表A中多个字段关联了表B的ID,如何关联查询?
查看>>
MYSQL一直显示正在启动
查看>>
MySQL一站到底!华为首发MySQL进阶宝典,基础+优化+源码+架构+实战五飞
查看>>
MySQL万字总结!超详细!
查看>>
Mysql下载以及安装(新手入门,超详细)
查看>>
MySQL不会性能调优?看看这份清华架构师编写的MySQL性能优化手册吧
查看>>