Easy Guide to Create a Custom Write Data Source in Apache Spark 3

Step by step guide for creating a custom transactional write data source in Apache Spark 3.0.x

Amar Gajbhiye
Level Up Coding



This is the second article in a series on writing custom data sources in Apache Spark 3.0.x. In the first article, we learned about the data source APIs in Apache Spark 3.0.x and their significance, and got an overview of the read APIs. We first created a simple custom read data source and then a slightly more complex location-aware, multi-partition read data source. In this article, we will learn to implement a custom write data source.

What will we learn?

  • We will learn about the data source write APIs.
  • We will see the significance of each interface and where it runs in a distributed execution: driver vs executor nodes.
  • We will create a simple write data source by implementing all the necessary interfaces.
  • After that, we will extend it with transaction support.
  • We will learn about task failures, the retry mechanism, speculative tasks and how to achieve at-most-once behaviour.

Write data source APIs

Apache Spark data sources have read and write capabilities. When we call dataset.write().format(output_format).save(), the write APIs are used to save the data frame in the specified output format. Out of the box, Spark supports CSV, Parquet, JDBC, ORC, etc. as write formats.

Write Interfaces

  • TableProvider
  • Table

These two are common to the read and write APIs. The following interfaces are write-only:

  • SupportsWrite
  • WriteBuilder
  • BatchWrite
  • LogicalWriteInfo
  • PhysicalWriteInfo
  • DataWriterFactory
  • DataWriter
  • WriterCommitMessage

Now, let’s understand the importance of each of these interfaces in detail and create a write data source along the way.

TableProvider

As we saw in the last article, the primary data source interface, DataSourceV2, was removed and a new interface, TableProvider, was introduced. It is the base interface for all custom data sources that don’t need to support DDL. An implementation of this interface must have a public 0-argument constructor; in Java, the implicit default constructor of a public class satisfies this.

Table and SupportsWrite

The Table interface defines a single logical entity that represents structured data. It could be a file or folder for a file-system-based data source, a topic for Kafka, or a table for a JDBC data source. It can be mixed with SupportsRead and SupportsWrite to add read and write capabilities respectively. The capabilities() method returns all the capabilities of the table. For our simple write implementation, let us return BATCH_WRITE. The SupportsWrite interface has a newWriteBuilder() method, which returns a WriteBuilder that defines the write behaviour of the data source.

WriteBuilder

Now, let’s implement this interface and return its instance from the JdbcTable implementation. WriteBuilder is used to create an instance of either BatchWrite, for batch writing, or StreamingWrite, for writing a stream of data to a streaming data source. We have returned BATCH_WRITE from the table capabilities, so we will implement buildForBatch().

By default, this interface provides the ability to append data to the given table through a BatchWrite implementation. It can be combined with other interfaces to add capabilities such as overwriting (SupportsOverwrite) or truncating existing data (SupportsTruncate).

We will learn all about creating a streaming write data source in a separate article.

BatchWrite

As the name suggests, this interface is used for writing data to the given data source in batch mode. It manages the write job and the tasks that are created for each partition. Job-level commit and abort are handled here. It also returns the DataWriterFactory instance. Instances of the three interfaces seen so far (the Table with SupportsWrite, WriteBuilder and BatchWrite) run on the driver node, where the write job is created when dataset.write().save() is called.

DataWriterFactory

As the name suggests, it is a factory that creates DataWriter instances. The BatchWrite instance runs on the driver node; it creates an instance of the DataWriterFactory implementation, which is then sent to the executor nodes. An important thing to note is that the factory, including every field it holds, must therefore be serializable.
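Why serializability matters can be tried out without a cluster. The sketch below is a plain-Java model, not Spark code: SketchWriterFactory is a hypothetical stand-in for a DataWriterFactory implementation, and shipToExecutor() imitates the driver-to-executor hop with standard Java serialization. The field names are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a DataWriterFactory implementation.
// It carries only plain, serializable configuration, never a live connection.
class SketchWriterFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    final String jdbcUrl;
    final String tableName;

    SketchWriterFactory(String jdbcUrl, String tableName) {
        this.jdbcUrl = jdbcUrl;
        this.tableName = tableName;
    }

    // On an executor, a real writer would open its own connection from these values.
    String describeWriter(int partitionId, long taskId) {
        return "writer[partition=" + partitionId + ", task=" + taskId
                + ", table=" + tableName + "]";
    }
}

class SerializationSketch {
    // Imitates the driver -> executor hop: serialize to bytes, then read back.
    static <T extends Serializable> T shipToExecutor(T driverSideObject) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(driverSideObject);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException e) {
            // A non-serializable field surfaces here as NotSerializableException.
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the factory held a field such as an open java.sql.Connection, the round trip would fail, which is why a factory should keep only the settings needed to open resources later on the executor.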

DataWriter

This is the main interface; it actually does the job of writing data to the destination system. For each data frame partition, a DataWriter instance is created on an executor node using the DataWriterFactory. write(record) is called for each internal row. When all the records of a partition have been written successfully, commit() is called. If an exception is thrown while writing these records, abort() is called. We can use these APIs to support partition-level transactions. We will see more about that in a bit.

Spark writes the partitions in parallel, but a separate DataWriter instance is created for each partition. So, a DataWriter implementation does not need to be thread-safe.
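The call order described above can be modelled in a few lines of plain Java. This is only a sketch of the contract, not Spark's task code: LifecycleWriter is a simplified stand-in for DataWriter (the real interface has a similar shape but uses checked exceptions and returns a WriterCommitMessage), and WriteTaskSketch and RecordingWriter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Spark's DataWriter<T>.
interface LifecycleWriter<T> {
    void write(T record);
    String commit();   // Spark returns a WriterCommitMessage here
    void abort();
    void close();
}

class WriteTaskSketch {
    // Models what one write task does for one partition:
    // write every record, commit on success, abort on failure, always close.
    static <T> String runTask(Iterable<T> partition, LifecycleWriter<T> writer) {
        try {
            for (T record : partition) {
                writer.write(record);
            }
            return writer.commit();
        } catch (RuntimeException e) {
            writer.abort();
            throw e;
        } finally {
            writer.close();
        }
    }
}

// Records the calls it receives so that their order can be inspected.
class RecordingWriter implements LifecycleWriter<String> {
    final List<String> calls = new ArrayList<>();
    private final String poisonRecord;   // writing this record fails; null means never fail

    RecordingWriter(String poisonRecord) {
        this.poisonRecord = poisonRecord;
    }

    @Override
    public void write(String record) {
        if (record.equals(poisonRecord)) {
            throw new IllegalStateException("cannot write " + record);
        }
        calls.add("write:" + record);
    }

    @Override
    public String commit() {
        calls.add("commit");
        return "committed";
    }

    @Override
    public void abort() {
        calls.add("abort");
    }

    @Override
    public void close() {
        calls.add("close");
    }
}
```

Running runTask() with a RecordingWriter that never fails records the writes followed by commit and close; with a poison record in the partition, the recorded calls end in abort and close instead.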

Other interfaces

  • LogicalWriteInfo: It carries the schema of the source data frame, the options needed to perform the write operation and the query id assigned to the job.
  • PhysicalWriteInfo: It carries the number of partitions of the source data frame.
  • WriterCommitMessage: This is a marker interface. It can be used to transfer information such as the partition id and task id from the data writer on an executor node to the driver node.
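As a small illustration of what a commit message might carry, here is a plain-Java sketch. PartitionCommitSummary and JobCommitSketch are hypothetical names; in a real data source the message class would also implement Spark's WriterCommitMessage, and the row count field is an assumption added for the example.

```java
import java.io.Serializable;

// Hypothetical commit message created by a data writer on an executor.
// It must be serializable because it travels back to the driver.
class PartitionCommitSummary implements Serializable {
    private static final long serialVersionUID = 1L;

    final int partitionId;
    final long taskId;
    final long rowsWritten;

    PartitionCommitSummary(int partitionId, long taskId, long rowsWritten) {
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.rowsWritten = rowsWritten;
    }
}

class JobCommitSketch {
    // Models what a driver-side job commit could do with the collected messages.
    static long totalRows(PartitionCommitSummary[] messages) {
        long total = 0;
        for (PartitionCommitSummary message : messages) {
            if (message != null) {   // defensive: skip partitions that reported nothing
                total += message.rowsWritten;
            }
        }
        return total;
    }
}
```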

That’s it. We have successfully created a custom write data source. It can now be used to write an existing data frame into a JDBC table by naming the data source in dataset.write().format(...).

Adding Transaction Support

With all these learnings in our kitty, let’s make this implementation more mature and let us see how transactional support can be added to this implementation.

As we have seen earlier, the DataWriter interface has commit() and abort() methods. These can be used to implement transaction support at the partition level. For JDBC-type data sources, the following can be done:

  • Turn off auto-commit on the connection.
  • Keep writing on that connection.
  • When the entire partition has been written successfully, DataWriter.commit() is called. We will call connection.commit() from this method.
  • If any exception is thrown from write(record), abort() is called. We can call connection.rollback() to revert the incomplete data.

For non-JDBC data sources, such as a file data source, data could be written to temporary storage like an in-memory buffer and then, in commit(), written out all at once, or cleared completely on abort().
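The four steps above can be sketched with nothing but java.sql. This is a minimal illustration rather than the article's actual writer: TransactionalJdbcWriter is a hypothetical class that is not wired to Spark's DataWriter, it assumes a two-column table (an integer id and a string name), and it wraps SQLException in an unchecked exception to keep the sketch short.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical partition writer for a two-column table; not tied to Spark types.
class TransactionalJdbcWriter {
    private final Connection connection;
    private final PreparedStatement insert;

    TransactionalJdbcWriter(Connection connection, String insertSql) {
        this.connection = connection;
        try {
            connection.setAutoCommit(false);                        // step 1: start a transaction
            this.insert = connection.prepareStatement(insertSql);
        } catch (SQLException e) {
            throw new IllegalStateException("could not start transaction", e);
        }
    }

    // Step 2: keep writing on the same connection; nothing is visible yet.
    void write(int id, String name) {
        try {
            insert.setInt(1, id);
            insert.setString(2, name);
            insert.executeUpdate();
        } catch (SQLException e) {
            throw new IllegalStateException("write failed", e);
        }
    }

    // Step 3: the whole partition was written, so publish it.
    void commit() {
        try {
            connection.commit();
        } catch (SQLException e) {
            throw new IllegalStateException("commit failed", e);
        }
    }

    // Step 4: something went wrong, so discard everything written so far.
    void abort() {
        try {
            connection.rollback();
        } catch (SQLException e) {
            throw new IllegalStateException("rollback failed", e);
        }
    }

    void close() {
        try {
            insert.close();
            connection.close();
        } catch (SQLException e) {
            throw new IllegalStateException("close failed", e);
        }
    }
}
```

A production writer would typically batch the inserts as well; that is left out here to keep the transaction boundaries easy to see.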

Job level transaction

A BatchWrite can be used to add job-level transaction support. This could be done using temporary storage. In our case, instead of passing the user-specified table name, we will pass a temporary table name. All the data writers for the partitions will write to that temporary table. When all partitions have been written successfully, BatchWrite.commit(WriterCommitMessage[] writerCommitMessages) will be called. In this method, we will rename the temporary table to the original name. On any exception, we will drop the temporary table in abort(WriterCommitMessage[] writerCommitMessages).
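The temporary-table idea can be sketched as follows. StagedTableJob is a hypothetical helper, not part of Spark; the SQL executor is passed in as a plain Consumer<String> so the sketch stays independent of any driver, and the staging-table naming scheme is an assumption. The RENAME and DROP statements follow a syntax that several databases accept, but the exact form depends on the database in use.

```java
import java.util.function.Consumer;

// Hypothetical driver-side helper: every partition writes to a staging table,
// which is renamed on job commit or dropped on job abort.
class StagedTableJob {
    private final String targetTable;
    private final String stagingTable;
    private final Consumer<String> sqlExecutor;

    StagedTableJob(String targetTable, String queryId, Consumer<String> sqlExecutor) {
        this.targetTable = targetTable;
        // Derive a per-job name so that concurrent jobs do not collide.
        this.stagingTable = targetTable + "_staging_"
                + queryId.replaceAll("[^A-Za-z0-9_]", "_");
        this.sqlExecutor = sqlExecutor;
    }

    // The table name handed to the per-partition writers instead of the target.
    String tableForWriters() {
        return stagingTable;
    }

    // To be called from the job-level commit, after every partition has committed.
    void commitJob() {
        sqlExecutor.accept("ALTER TABLE " + stagingTable + " RENAME TO " + targetTable);
    }

    // To be called from the job-level abort; removes whatever the partitions wrote.
    void abortJob() {
        sqlExecutor.accept("DROP TABLE IF EXISTS " + stagingTable);
    }
}
```

In this form the rename only succeeds if the target table does not exist yet; appending to an existing table would need a different final step, such as copying the staged rows across.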

Task failure

In distributed systems, failures are very common. They can happen because of network interruptions, node failures, etc. A robust and reliable system like Apache Spark surely knows how to handle such failures. For every Spark job, multiple tasks run on executor nodes. In our write job, a separate task is created for writing each partition. The failure of an individual task should not fail the entire write job. For this reason, Spark has a retry mechanism that re-executes a failed task, possibly on a different node. The number of failures tolerated for a task before the job is given up can be configured through spark.task.maxFailures.

Data consistency should be the main objective while performing a write operation. The retry mechanism poses a challenge for a data source implementor: if the data source is not implemented correctly, some of the data may be written multiple times.

In our effort to add partition-level transaction support, we write data on a connection with auto-commit turned off. We commit only when everything has been written successfully; otherwise, we discard everything using rollback. This makes sure that records are not written multiple times even when a task is retried because of a failure.
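The effect of a retry with and without a partition-level transaction can be simulated in plain Java. This is a toy model, not Spark's scheduler: RetrySketch and its methods are hypothetical, a list stands in for the destination table, and the failure is injected after a fixed number of rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

class RetrySketch {
    // Stand-in for the destination table.
    final List<Integer> destination = new ArrayList<>();

    // One task attempt without a transaction: rows reach the destination
    // immediately, so a failure leaves a partial write behind.
    void directAttempt(List<Integer> partition, int failAfterRows) {
        int written = 0;
        for (Integer row : partition) {
            if (written == failAfterRows) {
                throw new IllegalStateException("simulated task failure");
            }
            destination.add(row);
            written++;
        }
    }

    // One task attempt with a partition-level transaction: rows are staged and
    // published only at the end; a failure simply drops the staged rows.
    void transactionalAttempt(List<Integer> partition, int failAfterRows) {
        List<Integer> uncommitted = new ArrayList<>();
        int written = 0;
        for (Integer row : partition) {
            if (written == failAfterRows) {
                throw new IllegalStateException("simulated task failure");
            }
            uncommitted.add(row);
            written++;
        }
        destination.addAll(uncommitted);   // the "commit"
    }

    // Re-runs the attempt until it succeeds or maxFailures attempts have failed,
    // loosely mirroring what spark.task.maxFailures controls.
    static void runWithRetries(int maxFailures, IntConsumer attempt) {
        for (int attemptNumber = 0; ; attemptNumber++) {
            try {
                attempt.accept(attemptNumber);
                return;
            } catch (RuntimeException e) {
                if (attemptNumber + 1 >= maxFailures) {
                    throw e;
                }
            }
        }
    }
}
```

With a partition of three rows and a first attempt that fails after two of them, the direct variant ends up with five rows in the destination (two of them duplicated), while the transactional variant ends up with exactly three.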

Speculative tasks

Apache Spark has another amazing feature called speculation. When one of the running tasks is taking too long to finish, a speculative copy of that task is launched on another node. This means that there is now more than one task running simultaneously to write the same data. In the case of read-only tasks, this doesn’t matter. The result of the task that finishes first is used and the others are ignored.

But things are a little different in write jobs. If multiple tasks run in parallel trying to write the same data to the destination system, consistency would be compromised. To avoid this, Spark uses a commit coordinator to make sure that only one of the running tasks commits its data. A BatchWrite implementation controls the commit coordinator through the following method; the default implementation already returns true, and returning false turns the coordinator off.

public class JdbcBatchWrite implements BatchWrite {

    // createBatchWriterFactory(), commit() and abort() are omitted here.

    @Override
    public boolean useCommitCoordinator() {
        return true;
    }
}

But this works only when a partition-level transaction is implemented. Otherwise, duplicate entries will get written to the destination system. The speculation feature itself is switched on or off with spark.speculation.
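To make the idea concrete, here is a deliberately simplified model of a commit coordinator in plain Java. It is not Spark's implementation, which also has to deal with failed attempts and stage retries; CommitCoordinatorSketch and SpeculativeAttempt are hypothetical names. The rule modelled is simply that the first attempt to ask for a partition wins and every other attempt for that partition is denied.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of a driver-side commit coordinator.
class CommitCoordinatorSketch {
    // partition id -> attempt number that was authorized to commit
    private final ConcurrentMap<Integer, Integer> winnerByPartition =
            new ConcurrentHashMap<>();

    // Returns true only for the first attempt that asks for a given partition
    // (and for that same attempt if it asks again).
    boolean canCommit(int partitionId, int attemptNumber) {
        Integer winner = winnerByPartition.putIfAbsent(partitionId, attemptNumber);
        return winner == null || winner == attemptNumber;
    }
}

class SpeculativeAttempt {
    // Models the end of a task attempt: commit if authorized, otherwise abort.
    // Returns true if this attempt's data became visible.
    static boolean finish(CommitCoordinatorSketch coordinator, int partitionId,
                          int attemptNumber, Runnable commit, Runnable abort) {
        if (coordinator.canCommit(partitionId, attemptNumber)) {
            commit.run();
            return true;
        }
        abort.run();
        return false;
    }
}
```

This also shows why the partition-level transaction is needed: the denied attempt has already called write() for its records, and only a working abort() (a rollback in the JDBC case) keeps those records out of the destination.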

Summing it up

A custom write data source can be implemented for writing into a destination system. Compared to a read data source, it is slightly more complex, as transaction support needs to be added and incorrect handling could lead to data inconsistency. We need to be aware of the retry and speculative task mechanisms, and of how a partition-level transaction handles the data inconsistency issues that could arise because of them.

Complete source code for this example is shared here.

This example can be enhanced further by adding overwrite and truncate capabilities through the SupportsOverwrite and SupportsTruncate interfaces.
