fix(writer): spark 38811 insert alter table add columns #3479
Shekharrajak wants to merge 5 commits into apache:main
Conversation
    }

    // Refresh the catalog table cache so subsequent reads see the new data
    catalogTable.foreach { ct =>
This was a different issue: while running the test, I realised the table needs to be refreshed to get the new data.
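For illustration, a minimal sketch of the kind of refresh the snippet above performs, assuming a `SparkSession` named `spark` and an `Option[CatalogTable]` named `catalogTable` (the helper name is mine, not the PR's code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical helper: after writing files for a catalog table, invalidate the
// cached relation so subsequent reads pick up the newly written data.
def refreshAfterWrite(spark: SparkSession, catalogTable: Option[CatalogTable]): Unit = {
  catalogTable.foreach { ct =>
    // refreshTable invalidates cached metadata and data for the given table name.
    spark.catalog.refreshTable(ct.identifier.quotedString)
  }
}
```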
      .setOutputPath(outputPath)
      .setCompression(codec)
-     .addAllColumnNames(cmd.query.output.map(_.name).asJava)
+     .addAllColumnNames(cmd.outputColumnNames.asJava)
Operator {
  plan_id: 42
  parquet_writer: ParquetWriter {
    output_path: "file:/.../spark-warehouse/t"
    compression: SNAPPY
    column_names: ["i", "s"]
  }
......
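For context, a hypothetical repro of the insert-after-ALTER-TABLE-ADD-COLUMNS scenario this change targets, assuming a `SparkSession` named `spark`; the table and column names are mine, chosen to mirror the `["i", "s"]` columns in the serialized operator above (the real test lives in CometParquetWriterSuite):

```scala
// Sketch only: after ADD COLUMNS, the insert query's output attribute names
// ("a", "b") differ from the table's column names ("i", "s"), so the writer
// must take its column names from cmd.outputColumnNames, not the query output.
spark.sql("CREATE TABLE t (i INT) USING parquet")
spark.sql("ALTER TABLE t ADD COLUMNS (s STRING)")
spark.sql("INSERT INTO t SELECT 1 AS a, 'x' AS b")
spark.table("t").show()
```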
spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
andygrove left a comment:
Thanks @Shekharrajak, this is looking good overall. Would it be possible to add assertions to the new tests to verify that the plan (or the key part of the plan) is actually using Comet operators?
Force-pushed from 1354593 to 788bcab
788bcab: assertCometNativeWrite helps validate the expected execution plan. Please have a look.
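For illustration only, a rough sketch of the kind of plan assertion being discussed, assuming the `capturedPlan` captured by the listener shown below and a simple class-name check; the actual assertCometNativeWrite helper in the test suite may look different:

```scala
import org.apache.spark.sql.execution.QueryExecution

// Sketch only: fail the test unless a Comet operator appears in the write plan.
def assertCometNativeWriteSketch(capturedPlan: Option[QueryExecution]): Unit = {
  assert(capturedPlan.isDefined, "No write command plan was captured")
  val plan = capturedPlan.get.executedPlan
  // Look for an operator whose class name indicates a Comet (native) operator.
  assert(
    plan.exists(_.getClass.getSimpleName.contains("Comet")),
    s"Expected a Comet operator in the write plan:\n$plan")
}
```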
    val listener = new org.apache.spark.sql.util.QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        if (funcName == "command") {
Could we also use this directly?

if (qe.executedPlan.exists(_.isInstanceOf[DataWritingCommandExec])) {
  capturedPlan = Some(qe)
}
This did not work, since we require stripAQEPlan to get past the AQE wrapper.
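A minimal sketch of the variant being discussed, with the AQE unwrapping that stripAQEPlan performs inlined for clarity; the `capturedPlan` variable and listener registration are assumptions mirroring the snippet above, not the PR's exact code:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.util.QueryExecutionListener

var capturedPlan: Option[QueryExecution] = None

val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Equivalent of stripAQEPlan: peel off the adaptive wrapper if present.
    val plan = qe.executedPlan match {
      case a: AdaptiveSparkPlanExec => a.executedPlan
      case p => p
    }
    // Capture the query execution for the write command only.
    if (plan.exists(_.isInstanceOf[DataWritingCommandExec])) {
      capturedPlan = Some(qe)
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
// Registered via spark.listenerManager.register(listener) in the test.
```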
    val maxWaitTimeMs = 5000
    val checkIntervalMs = 50
    var iterations = 0
    while (capturedPlan.isEmpty && iterations < maxWaitTimeMs / checkIntervalMs) {
Wait for some time to make sure the query plan has completed.
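A minimal sketch of the full polling loop being discussed, assuming `capturedPlan` is the `Option[QueryExecution]` set by the listener above; the loop body and final assertion are my completion of the snippet, not the PR's exact code:

```scala
// Sketch only: poll until the listener has captured the write plan, or time out.
val maxWaitTimeMs = 5000
val checkIntervalMs = 50
var iterations = 0
while (capturedPlan.isEmpty && iterations < maxWaitTimeMs / checkIntervalMs) {
  Thread.sleep(checkIntervalMs)
  iterations += 1
}
assert(capturedPlan.isDefined, s"Plan was not captured within ${maxWaitTimeMs}ms")
```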
Which issue does this PR close?
Closes #3422
Rationale for this change
Comet bypasses Spark's logicalPlanOutputWithNames() entirely. It must explicitly use cmd.outputColumnNames (the table's actual column names) to achieve the same result.
What changes are included in this PR?
Renames attributes in the logical plan (outputColumns) before passing them to FileFormatWriter, as sketched below.
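For illustration, a rough sketch of the renaming idea; the function name and signature are mine, and it mirrors what Spark's DataWritingCommand.logicalPlanOutputWithNames does rather than the PR's exact code:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Sketch only: rename the query's output attributes to the table's column names
// (cmd.outputColumnNames) so the written files use the table schema's names.
def renameToTableColumns(
    queryOutput: Seq[Attribute],
    outputColumnNames: Seq[String]): Seq[Attribute] = {
  queryOutput.zip(outputColumnNames).map {
    case (attr, name) => if (attr.name == name) attr else attr.withName(name)
  }
}
```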
How are these changes tested?
Unit tests.