本文共 7329 字,大约阅读时间需要 24 分钟。
实现思想:每来一条数据,按照更新、删除、追加三种情况,直接用 JDBC 的方式更新数据库;然后在 flatMap 函数中,通过 JDBC 重新读取全部最新数据,得到最新的结果。样例如下:
package com.bai

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Demo job: each socket line of the form "id,name" is INSERTed into the
 * MySQL table `input` via JDBC, then the whole table is re-read and emitted
 * downstream as [[Student]] records, so the stream always reflects the
 * latest full contents of the table.
 */
object JDBCtest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

    val ncStream = env.socketTextStream("hadoop104", 7777)
    ncStream.print("before")

    // Keep only lines shaped like "<digits>,<anything>".
    val filtered: DataStream[String] = ncStream.filter(_.matches("(\\d)+,(.)*"))
    filtered.print("after")

    val students: DataStream[Student] = filtered.flatMap(new MySQLSyncFunction)
    students.print("dd")

    env.execute()
  }

  /**
   * RichFlatMapFunction that acquires one JDBC connection and its prepared
   * statements per task in open() and releases them in close(), instead of
   * reconnecting and re-preparing on every incoming record.
   */
  private class MySQLSyncFunction extends RichFlatMapFunction[String, Student] {
    private var conn: Connection = _
    private var insertStmt: PreparedStatement = _
    private var selectStmt: PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://hadoop103:3306/test", "root", "123456")
      insertStmt = conn.prepareStatement("insert into input (id, name) values (?,?)")
      selectStmt = conn.prepareStatement("select * from input")
    }

    /**
     * Inserts the incoming "id,name" record, then re-reads the whole table
     * and collects every row as a [[Student]].
     */
    override def flatMap(value: String, out: Collector[Student]): Unit = {
      // Split on the first comma only, so the name part may itself contain commas.
      val arr = value.split(",", 2)
      println(arr.toList)

      insertStmt.setInt(1, arr(0).toInt)
      insertStmt.setString(2, arr(1))
      insertStmt.execute()

      // Re-query the full table and emit its current contents.
      val rs: ResultSet = selectStmt.executeQuery()
      try {
        while (rs.next()) {
          // Build the Student straight from the row; the original JSON
          // round-trip (HashMap -> JSONObject -> JSON.parseObject) was
          // unnecessary allocation per row.
          out.collect(Student(rs.getInt("id"), rs.getString("name")))
        }
      } finally {
        rs.close()
      }
    }

    override def close(): Unit = {
      // Release in reverse order of acquisition; null-guard in case open() failed early.
      if (insertStmt != null) insertStmt.close()
      if (selectStmt != null) selectStmt.close()
      if (conn != null) conn.close()
    }
  }
}

/** Row of the `input` table: integer primary key `id` plus a free-form `name`. */
case class Student(id: Int, name: String)
UTF-8 1.11.0 2.11 2.11.12 2.12.1 2.7.3 mysql mysql-connector-java 5.1.44 org.apache.flink flink-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-table-api-scala-bridge_${scala.binary.version} ${flink.version} org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} org.apache.flink flink-runtime-web_2.11 ${flink.version} org.scala-lang scala-library ${scala.version} org.apache.flink flink-statebackend-rocksdb_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.hbase hbase-server 2.2.4 provided com.google.guava guava 29.0-jre com.alibaba fastjson 1.2.68 org.apache.logging.log4j log4j-slf4j-impl ${log4j.version} runtime org.apache.logging.log4j log4j-api ${log4j.version} runtime org.apache.logging.log4j log4j-core ${log4j.version} runtime org.apache.hadoop hadoop-common ${hadoop.version} provided org.slf4j slf4j-log4j12 log4j log4j org.slf4j slf4j-api commons-logging commons-logging org.apache.hadoop hadoop-hdfs ${hadoop.version} provided log4j log4j org.apache.hadoop hadoop-client ${hadoop.version} provided jdk.tools jdk.tools 1.8 system ${JAVA_HOME}/lib/tools.jar org.apache.maven.plugins maven-compiler-plugin 3.6.1 1.8 1.8 org.scala-tools maven-scala-plugin 2.15.1 compile-scala add-source compile test-compile-scala add-source testCompile org.apache.maven.plugins maven-assembly-plugin 3.0.0 make-assembly package single net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile
转载地址:http://kigfk.baihongyu.com/