Secondary development of Spark: enabling JDBC to read Hive data from a remote tenant cluster and implement Hive2Hive data integration with this cluster's Hive [Java]

Background

SQL-only developers may only know that after PySpark constructs the SparkSession object [with enableHiveSupport, of course], they write a SQL statement:

spark.sql("write a SQL string here");

Spark then carries out the operations in that SQL: selecting, querying data, and inserting or overwriting data into the result table. For SQL-only developers this is enough; after all, those who work on data warehouses and ETL may know only SQL and only receive SQL tasks.

However, this level of capability is obviously not enough for platform development. For example, the most common scenarios of data integration and data lake ingestion certainly involve cross-cluster, multi-cluster, and cross-Kerberos-realm issues. When dealing with heterogeneous data sources, SQL alone is still very weak: I cannot write a single SQL statement that processes HDFS file blocks and writes the result to an FTP server.

Normal data integration generally means that Spark reads this cluster's Hive data and pushes it to the Hive of a remote tenant cluster. However, the CDP version of Spark automatically reads data according to the cluster's configuration files. This "convenience" causes a lot of inconvenience when you want to read Hive data from a remote cluster. This article solves that problem.

Principle analysis

Don't assume that Java or PySpark can build multiple SparkSession objects to connect to multiple Hives just because IDEA reports no error before compilation.

A SparkSession object can be bound to only one Thrift Server address for its whole life cycle. In other words, a SparkSession can connect to only one Hive at a time and cannot read and write Hive across clusters.
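A minimal Java sketch of the naive attempt (the remote metastore URI below is a placeholder): the second builder compiles, but getOrCreate() simply hands back the session that already exists, so the overriding Hive setting never takes effect.

import org.apache.spark.sql.SparkSession;

public class TwoSessionsDoNotWork {
	public static void main(String[] args) {
		SparkSession first = SparkSession.builder()
				.appName("firstHive")
				.master("local[*]")
				.enableHiveSupport()
				.getOrCreate();

		// Compiles fine, but the Hive metastore client was already created with the
		// first session's configuration, so the "remote" URI below is ignored.
		SparkSession second = SparkSession.builder()
				.config("hive.metastore.uris", "thrift://remote-host:9083") // placeholder
				.enableHiveSupport()
				.getOrCreate();

		System.out.println(first == second); // prints true: it is the very same session
	}
}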

Considering that JDBC is already the standard way to access databases such as MySQL, I tried reading and writing Hive via JDBC as well, but ran into some problems. Digging into the source code, this is found in JdbcDialects.scala:

package org.apache.spark.sql.jdbc

// Near line 222

/**
 * :: DeveloperApi ::
 * Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`.
 *
 * If multiple matching dialects are registered then all matching ones will be
 * tried in reverse order. A user-added dialect will thus be applied first,
 * overwriting the defaults.
 *
 * @note All new dialects are applied to new jdbc DataFrames only. Make
 * sure to register your dialects first.
 */
@DeveloperApi
@InterfaceStability.Evolving
object JdbcDialects {

  /**
   * Register a dialect for use on all new matching jdbc `org.apache.spark.sql.DataFrame`.
   * Reading an existing dialect will cause a move-to-front.
   *
   * @param dialect The new dialect.
   */
  def registerDialect(dialect: JdbcDialect) : Unit = {
    dialects = dialect :: dialects.filterNot(_ == dialect)
  }

  /**
   * Unregister a dialect. Does nothing if the dialect is not registered.
   *
   * @param dialect The jdbc dialect.
   */
  def unregisterDialect(dialect : JdbcDialect) : Unit = {
    dialects = dialects.filterNot(_ == dialect)
  }

  private[this] var dialects = List[JdbcDialect]()

  registerDialect(MySQLDialect)
  registerDialect(PostgresDialect)
  registerDialect(DB2Dialect)
  registerDialect(MsSqlServerDialect)
  registerDialect(DerbyDialect)
  registerDialect(OracleDialect)
  registerDialect(TeradataDialect)

  /**
   * Fetch the JdbcDialect class corresponding to a given database url.
   */
  def get(url: String): JdbcDialect = {
    val matchingDialects = dialects.filter(_.canHandle(url))
    matchingDialects.length match {
      case 0 => NoopDialect
      case 1 => matchingDialects.head
      case _ => new AggregatedDialect(matchingDialects)
    }
  }
}

Spark's JDBC natively supports seven databases such as MySQL and SQL Server, but it does not support Hive! So if you want Spark to support JDBC access to Hive, Kylin, Druid, Doris, ClickHouse and other components, you have to extend it and implement the relevant functionality yourself.

Secondary development of Spark

Scala's solution:

case object HiveSqlDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = {
    colName.split('.').map(part => s"`$part`").mkString(".")
  }
}

class RegisterHiveSqlDialect {
  def register(): Unit = {
    JdbcDialects.registerDialect(HiveSqlDialect)
  }
}

However, while Scala may still have been popular in 2018, not many people use it in 2020. To avoid problems, rewrite it in Java:

package com.zhiyong.Hive2Hive;

import org.apache.spark.sql.jdbc.JdbcDialect;

public class HiveDialect extends JdbcDialect {

	@Override
	public boolean canHandle(String url) {
		return url.startsWith("jdbc:hive2");
	}

	@Override
	public String quoteIdentifier(String colName) {
		// Java equivalent of the Scala one-liner:
		// colName.split('.').map(part => s"`$part`").mkString(".")
		String[] split1 = colName.split("\\."); // split the string first
		String[] split2 = new String[split1.length];
		StringBuilder strb = new StringBuilder();
		int index = 0;

		// Equivalent of map: wrap each part in backticks
		for (String part : split1) {
			split2[index] = "`" + part + "`";
			index++;
		}

		// Equivalent of mkString("."): join the parts with "."
		for (int i = 0; i < split2.length; i++) {
			if (i != 0) {
				strb.append(".");
			}
			strb.append(split2[i]);
		}

		return strb.toString();
	}
}
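A quick, hypothetical sanity check of the rewritten methods (assumes the HiveDialect class above is on the classpath):

import com.zhiyong.Hive2Hive.HiveDialect;
import org.apache.spark.sql.jdbc.JdbcDialect;

public class HiveDialectCheck {
	public static void main(String[] args) {
		JdbcDialect dialect = new HiveDialect();
		System.out.println(dialect.canHandle("jdbc:hive2://192.168.88.11:10000")); // true
		System.out.println(dialect.quoteIdentifier("aaa.test1")); // `aaa`.`test1`
		System.out.println(dialect.quoteIdentifier("sid"));       // `sid`
	}
}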

After this secondary development, it can be used in the main class:

JdbcDialect hiveDialect = new HiveDialect();
JdbcDialects.registerDialect(hiveDialect); // register the custom Hive dialect

Map<String, String> sourceMap = new LinkedHashMap<>();
sourceMap.put("url", "jdbc:hive2://192.168.88.11:10000");
sourceMap.put("driver", "org.apache.hive.jdbc.HiveDriver");
sourceMap.put("user", "root");
sourceMap.put("password", "123456");
sourceMap.put("dbtable", "aaa.test1");

SparkSession sc = SparkSession.builder()
	.appName("aaaaa")
	.master("local[*]")
	.enableHiveSupport() // this only connects to this cluster's Hive
	.getOrCreate();

Dataset<Row> jdbcReadDF = sc.read()
	.format("jdbc")
	.options(sourceMap)
	.load(); // read the remote Hive data via JDBC

You can read Hive data through JDBC.

Test results

After testing, this approach also passes Kerberos authentication and retrieves data on the cloud desktop. However, the header of the DataFrame obtained this way has a problem: each column name carries the table name as a prefix:

The header obtained by standard Spark SQL (or the DSL) looks like this:
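The difference can also be checked in code; a small hedged sketch using the standard Dataset#columns() API (aaa.test4 is the target table used later in the demo and is assumed to have the same structure):

// Column names from the JDBC read carry a table-name prefix.
System.out.println(String.join(", ", jdbcReadDF.columns()));

// Column names from a normal read through the same Hive-enabled session are plain.
Dataset<Row> normalDF = sc.sql("select * from aaa.test4 limit 1");
System.out.println(String.join(", ", normalDF.columns()));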

Solving the header problem

Of course, the data cannot be written if the headers differ.

Fortunately, Spark has a way to replace the schema:

Map<String, String> targetSource = new LinkedHashMap<>(); 

targetSource.put("url", "jdbc:hive2://192.168.88.11:10000"); 
targetSource.put("driver", "org.apache.hive.jdbc.HiveDriver"); 
targetSource.put("user", "root"); 
targetSource.put("password", "123456"); 
targetSource.put("dbtable", "aaa.test4"); 

Dataset<Row> jdbcReadDF2 = sc.read()
	.format("jdbc")
	.options(targetSource)
	.load();//Use JDBC to read the data of the target hive (for obtaining the header) 
Dataset<Row> proDF2 = sc
	.createDataFrame(jdbcReadDF.rdd(), jdbcReadDF2.schema()); // replace the schema

Read the target table through JDBC again. Since the two tables have the same structure, the schema obtained this way can directly replace the old DataFrame's. Alternatively, spark.sql("select * from db1.tb1 limit 1") can also provide the header. This problem is solved.
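A hedged sketch of that alternative, which is presumably where the proDF used further below comes from (table names follow the demo):

// Take a clean schema from a normal read of the target table in this cluster's Hive,
// then re-wrap the rows that were read over JDBC with it.
Dataset<Row> headerDF = sc.sql("select * from aaa.test4 limit 1");
Dataset<Row> proDF = sc.createDataFrame(jdbcReadDF.rdd(), headerDF.schema());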

JDBC write failure

However, even after replacing the header, the data still cannot be written via JDBC:

jdbcReadDF.write()
	.format("jdbc")
	.options(targetSource)
	//.mode(SaveMode.Append) // this line is not allowed; it fails with "Method not supported"
	.save();

As we all know, there are four save modes. If no mode is set, the default (ErrorIfExists) reports an error when the table already exists (which is the case here), and Ignore simply skips the write; neither of these is useful here. However, after switching to Append or Overwrite mode, errors are still reported; in the Append case it is directly "Method not supported". It's weird.
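For reference, the four values of org.apache.spark.sql.SaveMode:

// org.apache.spark.sql.SaveMode
//   ErrorIfExists - the default: throw an error if the target table already exists
//   Ignore        - silently skip the write if the target table already exists
//   Append        - append the DataFrame's rows to the existing table
//   Overwrite     - replace the existing data with the DataFrame's contents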

When no mode is set:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view 'aaa.test4' already exists. SaveMode: ErrorIfExists.;

In JdbcRelationProvider.scala:

package org.apache.spark.sql.execution.datasources.jdbc

// Near line 71
case SaveMode.ErrorIfExists =>
            throw new AnalysisException(
              s"Table or view '${options.table}' already exists. " +
                s"SaveMode: ErrorIfExists.")

You can't use the default mode.

Using .mode(SaveMode.Overwrite), the target table does get dropped (as can be verified in beeline), but then an error is reported:

Exception in thread "main" org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED: ParseException line 1:31 cannot recognize input near '.' 'sid' 'INTEGER' in column type
	at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:264)
	at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:250)
	at org.apache.hive.jdbc.HiveStatement.runAsyncOnServer(HiveStatement.java:309)
	at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:250)
	at org.apache.hive.jdbc.HiveStatement.executeUpdate(HiveStatement.java:448)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:863)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at com.zhiyong.Hive2Hive.Hive2HiveJDBCDemo.main(Hive2HiveJDBCDemo.java:133)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED: ParseException line 1:31 cannot recognize input near '.' 'sid' 'INTEGER' in column type
	at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:387)
	at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:186)
	at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:269)
	at org.apache.hive.service.cli.operation.Operation.run(Operation.java:324)
	at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:460)
	at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:447)
	at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
	at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
	at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
	at com.sun.proxy.$Proxy33.executeStatementAsync(Unknown Source)
	at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:294)
	at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:497)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1437)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1422)
	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.parse.ParseException:line 1:31 cannot recognize input near '.' 'sid' 'INTEGER' in column type
	at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:207)
	at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:404)
	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:329)
	at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1158)
	at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1145)
	at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:184)
	... 26 more

Process finished with exit code 1

After using .mode(SaveMode.Append), Spark first warns that transactions are unsupported:

21/10/07 14:26:23 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported

so the write is adjusted accordingly:

proDF2.write().format("jdbc")
        .mode(SaveMode.Append)
        .options(targetSource)
        .option("isolationLevel", "NONE") // work around the unsupported-transaction warning above
        .option("numPartitions", "1")     // likewise
        .save();

It still fails, because around line 75 of HivePreparedStatement.java the Hive JDBC driver simply throws:

  /*
   * (non-Javadoc)
   *
   * @see java.sql.PreparedStatement#addBatch()
   */
  public void addBatch() throws SQLException {
    // TODO Auto-generated method stub
    throw new SQLException("Method not supported");
  }

The resulting error:

21/10/07 14:26:23 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 11)
java.sql.SQLException: Method not supported
	at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
21/10/07 14:26:23 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): java.sql.SQLException: Method not supported
	at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

21/10/07 14:26:23 ERROR TaskSetManager: Task 0 in stage 11.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): java.sql.SQLException: Method not supported
	at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at com.zhiyong.Hive2Hive.Hive2HiveJDBCDemo.main(Hive2HiveJDBCDemo.java:133)
Caused by: java.sql.SQLException: Method not supported
	at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1

If you follow the stack trace, you can see that when going through JDBC, Spark ultimately calls Hive's JDBC driver to write the data, but the driver's own addBatch method simply throws: Method not supported.

This proves that reading Hive via JDBC is fine, but writing this way does not work well.

If the header is replaced with the one obtained by reading the table normally, the DataFrame API can write the data to the target table smoothly:

proDF.write()
	.format("Hive")
	.mode(SaveMode.Append)
	.saveAsTable("aaa.test4"); // write to the target Hive table in the normal way

So far, the cross-cluster Hive2Hive function has been realized. The demo was verified both on the development machine (a self-built Apache VM cluster) and on the cloud desktop (Eclipse + CDP 7.1.6.0); the cloud desktop run passed Kerberos and wrote the data in one go. This proves the method is feasible.

Architecture change

This method differs from normal data integration: the source-side Hive is effectively the remote one, and the target-side Hive is effectively the local one.

In normal data integration (pushing data out), the source-side Hive should be local [the host cluster] and the target side remote [the tenant cluster]. For example, for Hive → FTP fixed-length files, the source-side Hive is local, the FTP server is remote, and the jar also runs in the local YARN cluster.

The Hive2Hive implemented this way can read and write across clusters, but when it is used to push data, the jar has to run in the remote YARN cluster. Illustrated with a diagram:


Became:

In short, when this approach is used to push data, Spark's job (packaged as a jar) has to run in the YARN cluster on the target side. The method looks more like the tenant cluster actively pulling the host cluster's data.

However, the need to read and write Hive across clusters can be realized in this way.

New problems introduced

Clearly, this method solves the problem of reading and writing two Hives across clusters, but it also creates new problems. When used to push data, it breaks the original architecture and causes architectural confusion; if the original architecture is kept, remote data pushing cannot be realized. Therefore, this method is best suited for pulling Hive data from the remote cluster into this cluster's Hive [such Hive → Hive integration is a special case of various data sources → this cluster's Hive lake ingestion], rather than for pushing data. There are other, better-suited ways to push data across clusters to tenant clusters.

When Hive reads data it runs MapReduce [or Tez, or Spark], which will certainly consume the cluster's YARN resources, and of course this method also consumes the remote cluster's YARN resources. Still, a single JDBC connection-string parameter is much easier to manage than a big pile of core-site.xml, hdfs-site.xml and yarn-site.xml configuration files, and the cost is affordable.
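For the Kerberos case, the connection string only grows a principal parameter; a hedged sketch (host, database and realm are placeholders, and the JVM still needs a valid Kerberos login first):

sourceMap.put("url",
	"jdbc:hive2://remote-host:10000/default;principal=hive/_HOST@EXAMPLE.COM");
// Obtain a ticket beforehand, e.g. kinit on the machine, or in code:
// UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab");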
