[SPARK-56284] Adding UDF worker specification protobuf definition#55165

Open
sven-weber-db wants to merge 1 commit into apache:master from sven-weber-db:spark-56324

Conversation

@sven-weber-db

@sven-weber-db sven-weber-db commented Apr 2, 2026

What changes were proposed in this pull request?

This PR introduces the protobuf definitions for the UDF worker specification described in SPIP SPARK-55278 and this design document.

Overall, two new .proto files are introduced:

  • common.proto - Shared types and messages between the worker specification & the new UDF protocol (to be introduced)
  • worker_spec.proto - UDF worker specification

Why are the changes needed?

This is the first step toward a language-agnostic UDF protocol for Spark that enables UDF workers written in any language to communicate with the Spark engine through a well-defined specification and API boundary. The abstractions introduced here establish the core contract that concrete implementations (e.g., process-based or gRPC-based workers) will build on.

The worker specification introduced in this PR captures all the information Spark needs to:

  • Plan UDF execution (concurrency, supported UDF types, etc.)
  • Provision a UDF worker and connect to it for UDF invocation

Does this PR introduce any user-facing change?

No. All new APIs are marked @experimental, and there are no behavioral changes to existing code.

How was this patch tested?

  • Compilation of the proto files verified via both Maven and SBT.

Was this patch authored or co-authored using generative AI tooling?

Yes, in an assistive manner and for reviews.

@sven-weber-db sven-weber-db changed the title [WIP][SPARK-56284] Adding UDF worker specification protobuf definition [SPARK-56284] Adding UDF worker specification protobuf definition Apr 7, 2026
Contributor

@cloud-fan cloud-fan left a comment


Summary

This PR fills in the previously placeholder UDFWorkerSpecification protobuf message with the full worker specification schema per SPIP SPARK-55278.

Design approach: Two proto files define a layered worker specification:

  • common.proto — shared types for reuse by both the worker spec and the forthcoming UDF protocol: UDFWorkerDataFormat (data serialization format), UDFShape/SparkUDFShapes (UDF execution shapes).
  • worker_spec.proto — the full specification: UDFWorkerSpecification composes WorkerEnvironment (lifecycle callables), WorkerCapabilities (data formats, UDF shapes, concurrency/reuse flags), and a DirectWorker (process callable + connection + timeout properties). Transport is abstracted via WorkerConnection (oneof of Unix domain socket or TCP).
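Pieced together from the summary above, the layered layout might look roughly like this (a hypothetical sketch only — field numbers, field names, and message contents are illustrative, not copied from the PR):

```proto
// worker_spec.proto -- illustrative sketch, not the actual file
syntax = "proto3";

message UDFWorkerSpecification {
  WorkerEnvironment environment = 1;    // lifecycle callables
  WorkerCapabilities capabilities = 2;  // data formats, UDF shapes, flags

  // Extension point for future worker provisioning strategies.
  oneof worker {
    DirectWorker direct_worker = 3;
  }
}

// Transport is abstracted behind a oneof so that new transport
// types can be added later without breaking existing consumers.
message WorkerConnection {
  oneof transport {
    string unix_domain_socket_path = 1;
    uint32 tcp_port = 2;
  }
}
```

Unknown fields in a oneof are simply unset on old readers, which is what makes both oneofs safe extension points.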

Key design decisions:

  • ProcessCallable separates command (executable prefix) from arguments, with the engine injecting --id and --connection at invocation time.
  • oneof worker in UDFWorkerSpecification and oneof transport in WorkerConnection provide extension points for future worker provisioning strategies and transport types.
  • WorkerCapabilities.supports_concurrent_udfs is defined but explicitly deferred for future use.
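The command/arguments split described in the first bullet could be sketched as follows (field names are assumed from the review summary, not verified against the actual file):

```proto
// Illustrative sketch of the command/arguments separation.
message ProcessCallable {
  // Executable prefix, e.g. ["python3", "-m"] or ["/bin/bash", "-c"].
  repeated string command = 1;

  // Static user-supplied arguments. The engine injects --id and
  // --connection at invocation time, so neither may appear here.
  repeated string arguments = 2;
}
```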

General comments:

  • udf/worker/README.md (line 23) still says "UDFWorkerSpecification -- currently a placeholder" — should be updated now that the specification is filled in.
  • Spark Connect protos use (Required) / (Optional) annotations on field comments to clarify the application-level contract. For fields like supported_data_formats (where the comment says "Every worker MUST at least support ARROW"), such annotations would make the requirement immediately visible to proto consumers.
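As a concrete illustration of the second point, the Spark Connect annotation style would read roughly like this on such a field (sketch only; the field name is taken from the comment above):

```proto
// (Required) Data serialization formats this worker supports.
// Every worker MUST at least support ARROW.
repeated UDFWorkerDataFormat supported_data_formats = 1;
```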

// engine-configurable maximum time (e.g. 30 seconds).
optional int32 graceful_termination_timeout_ms = 2;

// The connection this [[UDFWorker]] supports. Note that a single
Contributor

[[UDFWorker]] is not defined anywhere — no proto message, no Scala/Java class. The same dangling reference appears at lines 149 and 159. The closest entity is DirectWorker (line 101). Should these reference DirectWorker, or is UDFWorker a planned type not yet introduced?

Author

Good catch! [[UDFWorker]] was the name we previously used for DirectWorker. It was renamed before raising this PR, and I forgot to update all references to the old name in the comments. This should be fixed now. Thank you!

// ["\"echo 'Test'\""]
//
// Every executable will ALWAYS receive a
// --id argument. This argument CANNOT be part of the below list of arguments.
Contributor

The --id argument is explicitly reserved here ("CANNOT be part of the below list of arguments"), but --connection (injected by the engine per lines 130–134) has no such restriction documented. A user including --connection in their arguments would conflict with the engine-injected value. Consider adding the same reservation for --connection.

Author

Yes, very good point. I have updated the description to a list of restricted values including both --id and --connection. Thank you!

}
}

enum SparkUDFShapes {
Contributor

SparkUDFShapes uses plural naming, while UDFWorkerDataFormat in the same file uses singular. Proto convention recommends singular enum names — consider SparkUDFShape.

Author

Good catch, thank you! Fixed.

Comment on lines +49 to +50
// produces iterator to a batch of rows as output.
MAP_PARTITIONS = 2;
Contributor

Grammar — "a iterator" and missing article:

Suggested change
// produces iterator to a batch of rows as output.
MAP_PARTITIONS = 2;
// UDF receives an iterator to a batch of rows as input and
// produces an iterator to a batch of rows as output.

Author

Missed this - thank you!


// Which types of UDFs this worker supports.
// This should list all supported Shapes.
// Of which shape a specific UDF is will be communicated
Contributor

Awkward phrasing:

Suggested change
// Of which shape a specific UDF is will be communicated
// The shape of a specific UDF will be communicated

// Maximum amount of time to wait until the worker can accept connections.
//
// The engine will use this timeout, if it does not exceed a
// engine-configurable maximum time (e.g. 30 seconds).
Contributor

Same issue at line 119.

Suggested change
// engine-configurable maximum time (e.g. 30 seconds).
// The engine will use this timeout, if it does not exceed an

// After this time, the worker process should have terminated itself.
// Otherwise, the process will be forcefully killed using SIGKILL.
//
// The engine will use this timeout, if it does not exceed a
Contributor

Suggested change
// The engine will use this timeout, if it does not exceed a
// The engine will use this timeout, if it does not exceed an

}
}

// Communication between the engine and worker
Contributor

Leading space before // — inconsistent with all other message-level comments:

Suggested change
 // Communication between the engine and worker
// Communication between the engine and worker

// is done using a UNIX domain socket.
//
// On [[UDFWorker]] creation, a path to a socket
// to listen on is passed as a argument.
Contributor

Suggested change
// to listen on is passed as a argument.
// to listen on is passed as an argument.

// ["python3", "-m"]
// ["worker.bin"]
// ["java", "worker.java"]
// ["bin/bash", "-c"]
Contributor

Missing leading /:

Suggested change
// ["bin/bash", "-c"]
// ["/bin/bash", "-c"]

Author

@sven-weber-db sven-weber-db left a comment

Adjusted according to review comments

