Apache Spark Custom Data Source
In today’s big data computing world, Apache Spark is the most popular distributed execution engine. Fairly justifying its popularity, Apache Spark can connect to multiple data sources natively. Different data sources that Spark supports are Parquet, CSV, Text, JDBC, AVRO, ORC, HIVE, Kafka, Azure Cosmos, Amazon S3, Redshift, etc.
Parquet is the default format for Spark unless specified otherwise.
Apache Spark also supports Kafka through external integration. Apache Ignite has also provided its integration with Apache Spark which can be used to support Ignite cache as a data source natively through IgniteDataFrame.
Most of the time, existing data source connectors will solve our purpose. But, some of the time, when we need to work with legacy datastores, we will prefer implementing a custom data source to have more control. If this is your case, you are in the right place. In this article, we will discuss how to implement a custom data source and its advantages.
Apache Spark SQL has made integration with third-party data sources possible with DataSourceV1 interfaces like RelationProvider, DataSourceRegister, and BaseRelation. Over a period of time, Spark Datasource has evolved. With version 2.3.0, Spark has introduced a new cleaner way, DataSourceV2, to integrate any legacy data store with Spark SQL and removed the limitations that were there with DataSourceV1.
Suppose, we are working on a legacy data store and want to read data to perform a set of operations using Spark. We have two ways to do that.
- Fetch data from the datastore and create a dataset from it and perform operations on it.
- Define a custom spark data source for that legacy datastore, fetch data through the dataset interface directly and perform operations on it.
Now, let’s see how we can define a custom spark data source for the legacy data store. For time being, let’s assume CSV is our legacy data store.
The first interface that we need to implement is DataSourceV2. This is just a marker interface that identifies the implementation as a Spark Datasource of version v2.
/**
* Custom spark data source
*/
public class CSV implements DataSourceV2 { public CSV() { }
}
As mentioned in the documentation of this interface, an implementation must have a public, 0-argument constructor.
This is a dummy implementation without reading or writing support.
Read support
To add read support to this custom data source, we need to implement the ReadSupport interface.
public class CSV implements DataSourceV2, ReadSupport { public CSV() { } @Override
public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
return null;
} @Override
public DataSourceReader createReader(DataSourceOptions options){
return null;
}
}
DataSourceReader instance needs to be created for the given schema (if specified) with options. DataSourceReader gathers optimizations and delegates those to InputPartitionReader of each partition which actually fetches the data.
public class CSVDataSourceReader implements DataSourceReader{
@Override
public StructType readSchema() {
return null;
}@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
return Arrays.asList(new CSVInputPartition());
}
}public class CSVInputPartition implements InputPartition<InternalRow> {
@Override
public InputPartitionReader<InternalRow> createPartitionReader({
return new InputPartitionReader<InternalRow>() {
@Override
public boolean next() throws IOException {
return false;
}@Override
public InternalRow get() {
return null;
}@Override
public void close() throws IOException {}
};
}
}
Different optimizations that can be applied to DataSourceReader are SupportsPushDownFilters, SupportsPushDownRequiredColumns, etc. After applying these optimizations, DataSourceReader would look like as follows
public class CSVDataSourceReader implements DataSourceReader, SupportsPushDownFilters,SupportsPushDownRequiredColumns {
@Override
public StructType readSchema() {
return null;
}@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
return Arrays.asList(new CSVInputPartition());
}@Override
public Filter[] pushFilters(Filter[] filters) {
return new Filter[0];
}@Override
public Filter[] pushedFilters() {
return new Filter[0];
}@Override
public void pruneColumns(StructType requiredSchema) {
}
}
Write support
To add write support to a custom data source, the WriteSupport interface needs to be implemented.
public class CSV implements DataSourceV2, ReadSupport, WriteSupport{public CSV() {}@Override
public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
return null;
}@Override
public DataSourceReader createReader(DataSourceOptions options){
return null;
}@Override
public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
return Optional.empty();
}
}
Now, let’s compare the timings of the following use-case for both the approaches.
Read a CSV file with 1 million records, perform group by using country column, perform sum aggregation on a few columns and order by records using one of the column and print records.
Dataset<Row> rowDataset = dataset.groupBy(dataset.col("Country"))
.agg(
functions.sum("Units Sold").as("Total_Units_Sold"),
functions.sum("TotalRevenue").as("Total_Revenue_Per_Country"),
functions.sum("Total Profit").as("Total_Profit_Per_Country")
)
.orderBy(functions.col("Total_Profit_Per_Country").desc());
Approach 1:
It took 49 seconds on an average over 5 readings.
Approach 2:
It took 8 seconds on an average over 5 readings.
The idea behind taking these performance numbers was not benchmarking but to get the general idea about the performance differences between two approaches. Thus, I haven’t used any benchmarking tools.
Clearly, a custom data source approach outperforms the first approach.
When we read data using the first approach, the in-memory structure, in this case, List<String[]>, becomes a data source, thus, the amount of data that can be read is limited to memory size. This can be viewed using explain() on a dataset.
LocalTableScan [Country#1, Units Sold#8, Total Revenue#11, Total Profit#13]
On the contrary, when data is read through a custom data source, if it doesn’t fit in the memory, spark spills it to the disk. So, memory constraint is not an issue. Thus, using a custom data source we can load 4 GB files in 2 GB executor memory and perform operations on them like join, groupBy, etc.
ScanV2 CSV[Region#0, Country#1, Item Type#2, Sales Channel#3, Order Priority#4, Order Date#5, Order ID#6, Ship Date#7, Units Sold#8, Unit Price#9, Unit Cost#10, Total Revenue#11, Total Cost#12, Total Profit#13] (Options: [filepath=/home/amar/git repository/big-data-projects/spark/CustomSparkSource/src/main/resources/…)
To conclude, it is observed that when a custom data source is used for legacy datastores, it shows multi-fold performance gains over the first approach.
The complete source code for this example can be found here https://www.bugdbug.com/post/speed-up-apache-spark-operations-using-a-custom-data-source
If you like this article, check out similar articles here https://www.bugdbug.com
Feel free to share your thoughts, comments.
If you find this article helpful, share it with a friend!