df.foreachPartition(iter => {
  val conn = ds.getConnection
  val sql =
    """
      |INSERT INTO test_table (uid,a,b,c,d,e)
      |VALUES (?,?,?,?,?,?)
      |ON DUPLICATE KEY
      |UPDATE c = ?, d = ?
      |""".stripMargin
  val ps = conn.prepareStatement(sql)
  iter.foreach(row => {
    // uid is taken from the DataFrame's pid column
    val uid = row.getAs[Long]("pid")
    val a = row.getAs[Long]("a")
    val b = row.getAs[String]("b")
    val c = row.getAs[java.math.BigDecimal]("c")
    val d = row.getAs[java.math.BigDecimal]("d")
    val e = row.getAs[Byte]("e")
    ps.setLong(1, uid)
    ps.setLong(2, a)
    ps.setString(3, b)
    ps.setBigDecimal(4, c)
    ps.setBigDecimal(5, d)
    ps.setByte(6, e)
    // placeholders 7 and 8 feed the ON DUPLICATE KEY UPDATE clause
    ps.setBigDecimal(7, c)
    ps.setBigDecimal(8, d)
    ps.executeUpdate()
  })
  ps.close()
  DbUtil.close(conn)
})
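Calling executeUpdate once per row means one round trip to MySQL per record, which can be slow for large partitions. Below is a minimal sketch of a batched variant, reusing the same ds DataSource and DbUtil helper from the example above; the batch size of 1000 is an arbitrary choice. Note that with MySQL Connector/J the batch is only rewritten into a single multi-row statement if rewriteBatchedStatements=true is set on the JDBC URL.

df.foreachPartition(iter => {
  val conn = ds.getConnection
  val sql =
    """
      |INSERT INTO test_table (uid,a,b,c,d,e)
      |VALUES (?,?,?,?,?,?)
      |ON DUPLICATE KEY
      |UPDATE c = ?, d = ?
      |""".stripMargin
  val ps = conn.prepareStatement(sql)
  var pending = 0
  iter.foreach(row => {
    ps.setLong(1, row.getAs[Long]("pid"))
    ps.setLong(2, row.getAs[Long]("a"))
    ps.setString(3, row.getAs[String]("b"))
    ps.setBigDecimal(4, row.getAs[java.math.BigDecimal]("c"))
    ps.setBigDecimal(5, row.getAs[java.math.BigDecimal]("d"))
    ps.setByte(6, row.getAs[Byte]("e"))
    // placeholders 7 and 8 feed the UPDATE clause
    ps.setBigDecimal(7, row.getAs[java.math.BigDecimal]("c"))
    ps.setBigDecimal(8, row.getAs[java.math.BigDecimal]("d"))
    ps.addBatch()
    pending += 1
    if (pending == 1000) { ps.executeBatch(); pending = 0 } // flush every 1000 rows
  })
  if (pending > 0) ps.executeBatch() // flush the remainder
  ps.close()
  DbUtil.close(conn)
})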
Code Encapsulation Example
Based on the introduction above, we can wrap Spark's MySQL reads and writes in a simple helper, which makes them more convenient to use:
/**
 * Read a MySQL table; for parallel reads, id is fixed as the partition column.
 *
 * @param spark        SparkSession
 * @param table        table name
 * @param partitionNum number of partitions (0 = single connection, no parallelism)
 * @param filterKey    filter column
 * @param filterMin    minimum value of the filter condition
 * @param filterMax    maximum value of the filter condition
 * @param jdbcUrl      url
 * @param jdbcUser     user
 * @param jdbcPass     password
 * @return
 */
def readMysqlPartById(spark: SparkSession,
                      table: String,
                      partitionNum: Int = 0,
                      filterKey: String = "",
                      filterMin: String = "",
                      filterMax: String = "",
                      jdbcUrl: String = url,
                      jdbcUser: String = user,
                      jdbcPass: String = pass): DataFrame = {
  val conn = spark.read
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("user", jdbcUser)
    .option("password", jdbcPass)
    .option("driver", "com.mysql.jdbc.Driver")
  if (partitionNum == 0) {
    conn.option("dbtable", table).load()
  } else {
    if ("".equals(filterKey)) {
      // query the min and max id
      val ids = conn.option("dbtable", s"(select min(id),max(id) from $table) tmp")
        .load()
        .first()
      if (ids.isNullAt(0)) {
        spark.emptyDataFrame
      } else {
        val minId = String.valueOf(ids.get(0)).toLong
        val maxId = String.valueOf(ids.get(1)).toLong
        conn.option("dbtable", table)
          .option("numPartitions", partitionNum)
          .option("partitionColumn", "id")
          .option("lowerBound", minId)
          .option("upperBound", maxId)
          .load()
      }
    } else {
      // leading space matters: the filter is concatenated directly after the table name
      val filter = s" where $filterKey between '$filterMin' and '$filterMax'"
      // query the min and max id within the filter range
      val ids = conn.option("dbtable", s"(select min(id),max(id) from $table$filter) tmp")
        .load()
        .first()
      if (ids.isNullAt(0)) {
        spark.emptyDataFrame
      } else {
        val minId = String.valueOf(ids.get(0)).toLong
        val maxId = String.valueOf(ids.get(1)).toLong
        conn.option("dbtable", s"(select * from $table$filter) tmp")
          .option("numPartitions", partitionNum)
          .option("partitionColumn", "id")
          .option("lowerBound", minId)
          .option("upperBound", maxId)
          .load()
      }
    }
  }
}
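For example, assuming the url, user, and pass defaults are already configured, calls might look like the sketch below; the create_time filter column is made up for illustration:

// Full-table read with 8 parallel partitions, split on id
val dfAll = readMysqlPartById(spark, "test_table", partitionNum = 8)

// Read only rows whose create_time falls in a given range, still partitioned by id
val dfRange = readMysqlPartById(spark, "test_table", partitionNum = 8,
  filterKey = "create_time", filterMin = "2021-01-01", filterMax = "2021-01-02")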