
[SPARK-50849][Connect] Add example project to demonstrate Spark Connect Server Libraries #49604

Open · wants to merge 4 commits into master

Conversation

@vicennial (Contributor) commented on Jan 22, 2025

What changes were proposed in this pull request?

This PR adds a sample project, server-library-example (under a new connect-examples directory), to demonstrate how to build and use Spark Connect Server Libraries (see #48922 for context).
The sample project contains several modules (common, server and client) to showcase how a user may choose to extend the Spark Connect protocol with custom functionality.

Why are the changes needed?

Currently, there are limited resources and documentation to aid a user in building their own Spark Connect Server Libraries. This PR aims to bridge this gap by providing a skeleton project to work with.

Does this PR introduce any user-facing change?

No

How was this patch tested?

N/A

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Copilot

-------------------- Render of README.md below ----------------

Spark Server Library Example - Custom Datasource Handler

This example demonstrates a modular Maven-based project architecture with separate client, server
and common components. It leverages the extensibility of Spark Connect to create a server library
that can be attached to a Spark Connect server to extend its functionality. Below is a detailed overview of the setup and functionality.

Project Structure

├── common/                # Shared protobuf/utilities/classes
├── client/                # Sample client implementation 
│   ├── src/               # Source code for client functionality
│   ├── pom.xml            # Maven configuration for the client
├── server/                # Server-side plugin extension
│   ├── src/               # Source code for server functionality
│   ├── pom.xml            # Maven configuration for the server
├── resources/             # Static resources
├── pom.xml                # Parent Maven configuration

Functionality Overview

To demonstrate the extensibility of Spark Connect, a custom datasource handler, CustomTable, is
implemented in the server module. The class handles reading, writing and processing data stored in
a custom format; here we simply use the .custom extension, which is itself a wrapper over .csv
files.

First and foremost, the client and the server must be able to communicate with each other through
custom messages that 'understand' our custom data format. This is achieved by defining custom
protobuf messages in the common module. The client and server modules both depend on the common
module to access these messages.

  • common/src/main/protobuf/base.proto: Defines the base CustomTable which is simply represented
    by a path and a name.
message CustomTable {
  string path = 1;
  string name = 2;
}
  • common/src/main/protobuf/commands.proto: Defines the custom commands that the client can send
    to the server. These commands are typically operations that the server can perform, such as cloning
    an existing custom table.
message CustomCommand {
  oneof command_type {
    CreateTable create_table = 1;
    CloneTable clone_table = 2;
  }
}
  • common/src/main/protobuf/relations.proto: Defines custom relations. A relation is a mechanism through which an
    optional input dataset is transformed into an output dataset, such as a Scan over a CustomTable.
message Scan {
  CustomTable table = 1;
}
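
These messages do not replace the Spark Connect protocol; they ride inside it. Spark Connect's Relation and Command messages expose a generic google.protobuf.Any extension field, which the client populates with the custom messages and the server-side plugins unpack. A minimal client-side packing sketch (the builder calls below are illustrative, not the exact helper code in this example):

import com.google.protobuf.Any
import org.apache.spark.connect.{proto => sparkProto}
import org.example.proto

// Build the custom Scan relation and pack it into the generic `extension`
// field of Spark Connect's Relation message.
val scan = proto.Scan.newBuilder()
  .setTable(proto.CustomTable.newBuilder()
    .setName("sample_table")
    .setPath("resources/dummy_data.custom"))
  .build()

val relation = sparkProto.Relation.newBuilder()
  .setExtension(Any.pack(scan))
  .build()

On the server, the plugins registered via spark.connect.extensions.relation.classes and spark.connect.extensions.command.classes (see the run instructions below) receive this extension and translate it into a logical plan or an executed action.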

On the client side, the CustomTable class mimics the style of Spark's Dataset API, allowing the
user to perform and chain operations on a CustomTable object.
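
For illustration, a client-side program might chain operations roughly as follows (the method names from, explain and clone are hypothetical placeholders for this sketch; the actual entry point lives in org.example.Main in the client module):

// Hypothetical client-side flow, mirroring the output shown in step 7 of the
// run instructions: read a custom table, explain it, clone it, explain the clone.
val table = CustomTable.from("../resources/dummy_data.custom", "sample_table")
table.explain()

val cloned = table.clone("../resources/cloned_data.custom", "cloned_table")
cloned.explain()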

On the server side, a similar CustomTable class is implemented to handle the core functionality of
reading, writing and processing data in the custom format. The plugins (CustomCommandPlugin and
CustomRelationPlugin) are responsible for processing the custom protobuf messages sent from the client
(those defined in the common module) and delegating the appropriate actions to the CustomTable.
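
As a rough sketch of the dispatch inside such a command plugin (only the generated protobuf accessors are standard here; processCreateTable and processCloneTable are hypothetical helpers that would delegate to the server-side CustomTable):

import org.example.proto

// Dispatch on the `oneof command_type` of the custom command message.
def process(command: proto.CustomCommand): Unit = {
  command.getCommandTypeCase match {
    case proto.CustomCommand.CommandTypeCase.CREATE_TABLE =>
      processCreateTable(command.getCreateTable)
    case proto.CustomCommand.CommandTypeCase.CLONE_TABLE =>
      processCloneTable(command.getCloneTable)
    case other =>
      throw new UnsupportedOperationException(s"Unsupported command type: $other")
  }
}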

Build and Run Instructions

  1. Navigate to the sample project from SPARK_HOME:

    cd connect-examples/server-library-example
  2. Build and package the modules:

    mvn clean package
  3. Download and unpack the Spark 4.0.0-preview2 release to use as the Spark Connect Server.

  4. Copy relevant JARs to the root of the unpacked Spark distribution:

     cp \
     <SPARK_HOME>/connect-examples/server-library-example/resources/spark-daria_2.13-1.2.3.jar \
     <SPARK_HOME>/connect-examples/server-library-example/common/target/spark-server-library-example-common-1.0-SNAPSHOT.jar \
     <SPARK_HOME>/connect-examples/server-library-example/server/target/spark-server-library-example-server-extension-1.0-SNAPSHOT.jar \
     .
  5. Start the Spark Connect Server with the relevant JARs:

     bin/spark-connect-shell \
       --jars spark-server-library-example-server-extension-1.0-SNAPSHOT.jar,spark-server-library-example-common-1.0-SNAPSHOT.jar,spark-daria_2.13-1.2.3.jar \
       --conf spark.connect.extensions.relation.classes=org.example.CustomRelationPlugin \
       --conf spark.connect.extensions.command.classes=org.example.CustomCommandPlugin
  6. In a different terminal, navigate back to the root of the sample project and start the client:

    java -cp client/target/spark-server-library-client-package-scala-1.0-SNAPSHOT.jar org.example.Main
  7. Notice the printed output in the client terminal as well as the creation of the cloned table:

Explaining plan for custom table: sample_table with path: <SPARK_HOME>/spark/connect-examples/server-library-example/client/../resources/dummy_data.custom
== Parsed Logical Plan ==
Relation [id#2,name#3] csv
== Analyzed Logical Plan ==
id: int, name: string
Relation [id#2,name#3] csv
== Optimized Logical Plan ==
Relation [id#2,name#3] csv
== Physical Plan ==
FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
Explaining plan for custom table: cloned_table with path: <SPARK_HOME>/connect-examples/server-library-example/client/../resources/cloned_data.custom
== Parsed Logical Plan ==
Relation [id#2,name#3] csv
== Analyzed Logical Plan ==
id: int, name: string
Relation [id#2,name#3] csv
== Optimized Logical Plan ==
Relation [id#2,name#3] csv
== Physical Plan ==
FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>

Contributor:

What does this do?

Contributor Author:

Answered in #49604 (comment)

   */
  def flush(): Unit = {
    // Write dataset to disk as a CSV file
    DariaWriters.writeSingleFile(
Contributor:

How is this better than using dt.write?

Contributor Author:

The regular write operation creates a folder and files in the format mydata.csv/part-00000. For simplicity, I figured writing a single '.csv' would work best (but comes at the cost of requiring the spark-daria jar here)
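
To make the trade-off concrete, a sketch of the two approaches (the DariaWriters call follows the spark-daria documentation and its argument names may differ slightly between versions; df is the Dataset being flushed and spark the active session):

import com.github.mrpowers.spark.daria.sql.DariaWriters

// Plain Spark write: always produces a directory of part files, even with one partition.
df.coalesce(1).write.option("header", "true").csv("mydata.custom")
// -> mydata.custom/part-00000-....csv, _SUCCESS, ...

// spark-daria: stages the part file and renames it so a single output file remains.
DariaWriters.writeSingleFile(
  df = df,
  format = "csv",
  sc = spark.sparkContext,
  tmpFolder = "/tmp/custom-table-staging",
  filename = "mydata.custom")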

import org.example.{CustomPluginBase, CustomTable}
import org.example.proto

class CustomRelationPlugin extends RelationPlugin with CustomPluginBase {
Contributor:

Add a bit of doc as to why this is needed.

Contributor Author:

Added!


import scala.collection.JavaConverters._

class CustomCommandPlugin extends CommandPlugin with CustomPluginBase {
Contributor:

Add a bit of doc here.

Contributor Author:

Added!

@hvanhovell (Contributor) left a comment:

Looks good!

A couple of comments.

Can you make sure this is actually built as part of CI?

@vicennial (Contributor Author):

Thanks for the review @hvanhovell!

> Can you make sure this is actually built as part of CI?

Do you mean to have CI compile this project? I explicitly had this unlinked to the parent Spark POM as it is meant to be 'standalone'

@@ -0,0 +1,133 @@
# Spark Server Library Example - Custom Datasource Handler
Member:

Spark Server -> Spark Connect Server?

* limitations under the License.
*/

package org.example
Member:

It's a little weird to have org.example in an ASF repository.

Technically, we always use the org.apache. prefix.

$ git grep '^package org' | grep -v org.apache | awk '{print $NF}' | sort | uniq -c

import org.apache.spark.connect.proto.Command
import org.example.proto
import org.example.proto.CreateTable.Column.{DataType => ProtoDataType}
import org.apache.spark.sql.{functions, Column, DataFrame, Dataset, Row, SparkSession}
Member:

Apache Spark has an ordering rule for import statements.
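
For reference, Spark's convention groups imports into blocks ordered java/javax, scala, third-party, then org.apache.spark, each block sorted alphabetically. Under that rule the four imports shown above would be regrouped roughly as:

import org.example.proto
import org.example.proto.CreateTable.Column.{DataType => ProtoDataType}

import org.apache.spark.connect.proto.Command
import org.apache.spark.sql.{functions, Column, DataFrame, Dataset, Row, SparkSession}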

import com.google.protobuf.Any
import org.apache.spark.connect.proto.Command

object Main {
Member:

Shall we have some meaningful name instead of org.example.Main?

  <parent>
    <groupId>org.example</groupId>
    <artifactId>spark-server-library-example</artifactId>
    <version>1.0-SNAPSHOT</version>
Member:

?

2,Jane Smith
3,Bob Johnson
4,Alice Williams
5,Charlie Brown
Member:

Shall we use simply data.csv as a file name because this seems to be technically CSV format?

@@ -139,3 +139,4 @@ core/src/main/resources/org/apache/spark/ui/static/package.json
testCommitLog
.*\.har
.nojekyll
dummy_data.custom
Member:

FYI, if you use .csv or .data extension for this new file, we don't need to touch this file.

@dongjoon-hyun (Member) left a comment:

This looks like an integration test suite to me, but I understand this will be used as documentation. I left a few initial comments from the ASF code perspective.
