Skip to content

Commit 046fd81

Browse files
committed
[SPARK-56284] Adding UDF worker specification protobuf definition
1 parent 15ffa54 commit 046fd81

File tree

2 files changed

+253
-0
lines changed

2 files changed

+253
-0
lines changed

udf/worker/proto/common.proto

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
syntax = "proto3";
19+
20+
package org.apache.spark.udf.worker;
21+
22+
option java_package = "org.apache.spark.udf.worker";
23+
24+
option java_multiple_files = true;
25+
26+
// The UDF input & output data format.
enum UDFWorkerDataFormat {
  // Default value; never a valid format. Receivers treat it as "not set".
  UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0;

  // The worker accepts and produces Apache Arrow batches.
  // Renamed from `ARROW` to carry the enum-type prefix: enum values are
  // siblings of the enum type in C++ scoping, so unprefixed values collide
  // across enums in the same package. Wire value (1) is unchanged, and this
  // enum is newly added in this commit, so no published API breaks.
  UDF_WORKER_DATA_FORMAT_ARROW = 1;
}
33+
34+
// The UDF execution type/shape.
//
// Wraps an engine-specific shape taxonomy in a oneof so additional
// engines can be added later as new oneof branches without breaking
// existing readers.
message UDFShape {
  oneof shape {
    // Shape taxonomy for Spark UDFs.
    SparkUDFShapes spark = 1;
  }
}
40+
41+
// The shapes a Spark UDF can take.
//
// NOTE(review): the type name is plural while the value prefix is singular
// (SPARK_UDF_SHAPE_*). Consider renaming the type to `SparkUDFShape` before
// the schema is published — it is referenced by UDFShape, so that rename
// must be coordinated; deferred here to keep this change self-contained.
enum SparkUDFShapes {
  // Default value; never a valid shape.
  SPARK_UDF_SHAPE_UNSPECIFIED = 0;

  // UDF receives a row with 0+ columns as input
  // and produces a single, scalar value as output.
  // Renamed from `EXPRESSION` to carry the enum-type prefix and avoid
  // C++-scope collisions; wire value (1) unchanged, enum newly added.
  SPARK_UDF_SHAPE_EXPRESSION = 1;

  // UDF receives an iterator to a batch of rows as input and
  // produces an iterator to a batch of rows as output.
  // Renamed from `MAP_PARTITIONS` for the same prefixing reason.
  SPARK_UDF_SHAPE_MAP_PARTITIONS = 2;
}

udf/worker/proto/worker_spec.proto

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
syntax = "proto3";

package org.apache.spark.udf.worker;

// NOTE(review): the sibling file added in this same commit lives at
// udf/worker/proto/common.proto, but this path points under
// src/main/protobuf/ — confirm it matches the build's --proto_path setup,
// otherwise this file will not compile.
// (Moved below `package` to follow the conventional file-element order:
// syntax, package, imports, options.)
import "udf/worker/proto/src/main/protobuf/common.proto";

option java_package = "org.apache.spark.udf.worker";
option java_multiple_files = true;
26+
27+
///
/// Worker specification
///

// Top-level specification describing a UDF worker: how its environment is
// prepared, what it can do (for query planning), and how instances are
// created.
message UDFWorkerSpecification {
  // Lifecycle hooks for environment setup, verification, and cleanup.
  WorkerEnvironment environment = 1;

  // Planning-time capabilities advertised by this worker.
  WorkerCapabilities capabilities = 2;

  // How to create new workers.
  // At the moment, only direct creation is supported.
  // This can be extended with indirect/provisioned creation in the future.
  oneof worker {
    DirectWorker direct = 3;
  }
}
41+
42+
// Lifecycle hooks for the environment a UDF worker runs in.
// All three hooks use explicit presence (`optional`); behavior when a hook
// is absent is presumably a no-op — confirm against the engine
// implementation.
message WorkerEnvironment {
  // Callable that is responsible for environment setup.
  optional ProcessCallable installation = 1;
  // Callable, which is called to verify that an environment
  // is suitable to start the UDF worker. This callable
  // needs to verify that
  // - The worker code itself is present
  // - Any needed dependencies are present
  optional ProcessCallable environment_verification = 2;
  // Callable, which is invoked after the worker has been terminated.
  // This can be used to cleanup dependencies or temporary resources.
  // Be careful not to cleanup resources that could be used by
  // other workers running in parallel.
  optional ProcessCallable environment_cleanup = 3;
}
57+
58+
// Capabilities used for query planning.
message WorkerCapabilities {
  // The data formats that the worker supports for UDF data in- & output.
  // Every worker MUST at least support ARROW.
  //
  // It is expected that for each UDF execution, the input format
  // always matches the output format.
  //
  // If a worker supports multiple data formats, the engine will select
  // the most suitable one for each UDF invocation. Which format was chosen
  // is reported by the engine as part of the UDF protocol's init message.
  repeated UDFWorkerDataFormat supported_data_formats = 1;

  // Which types of UDFs this worker supports.
  // This should list all supported Shapes.
  // Of which shape a specific UDF is will be communicated
  // in the initial message of the UDF protocol.
  //
  // If an execution for an unsupported UDF type is requested,
  // the query will fail during query planning.
  repeated UDFShape supported_udf_shapes = 2;

  // Whether multiple, concurrent UDF
  // connections are supported by this worker
  // (for example via multi-threading).
  //
  // In the first implementation of the engine-side
  // worker specification, this property will not be used.
  //
  // Usage of this property can be enabled in the future if the
  // engine implements more advanced resource management (TBD).
  bool supports_concurrent_udfs = 3;

  // Whether compatible workers may be reused.
  // If this is not supported, the worker is
  // terminated after every single UDF invocation.
  bool supports_reuse = 4;

  // To be extended with UDF chaining, ...
}
99+
100+
// The worker that will be created to process UDFs.
message DirectWorker {
  // Blocking callable that is terminated by SIGTERM/SIGKILL.
  ProcessCallable runner = 1;

  // Runtime properties (timeouts, connection transport) for this worker.
  UDFWorkerProperties properties = 2;
}
107+
108+
// Runtime properties of a UDF worker process.
//
// NOTE(review): the *_ms timeouts could use google.protobuf.Duration
// instead of raw int32 milliseconds — consider before this schema is
// published; kept as-is here since it is an API design decision.
message UDFWorkerProperties {
  // Maximum amount of time to wait until the worker can accept connections.
  //
  // The engine will use this timeout, if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  optional int32 initialization_timeout_ms = 1;

  // Used for graceful engine-initiated termination signaled via SIGTERM.
  // After this time, the worker process should have terminated itself.
  // Otherwise, the process will be forcefully killed using SIGKILL.
  //
  // The engine will use this timeout, if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  optional int32 graceful_termination_timeout_ms = 2;

  // The connection this [[UDFWorker]] supports. Note that a single
  // connection is sufficient to run multiple UDFs and (gRPC) services.
  //
  // On [[UDFWorker]] creation, connection information
  // is passed to the callable as a string parameter.
  // The string format depends on the [[WorkerConnection]]:
  //
  // For example, when using TCP, the callable argument will be:
  //   --connection PORT
  // Here is a concrete example:
  //   --connection 8080
  //
  // For the format of each specific transport type, see the comments below.
  WorkerConnection connection = 3;
}
138+
139+
// The transport used for engine <-> worker communication.
// Exactly one transport is set; see the per-transport messages for the
// format of the connection parameter passed to the worker.
message WorkerConnection {
  oneof transport {
    // Connect via a UNIX domain socket path.
    UnixDomainSocket unix_domain_socket = 1;
    // Connect via a localhost TCP port.
    TcpConnection tcp = 2;
  }
}
145+
146+
// Communication between the engine and worker
// is done using a UNIX domain socket.
//
// On [[UDFWorker]] creation, a path to a socket
// to listen on is passed as an argument.
// Examples:
//   /tmp/channel-uuid.sock
//   /some/system/path/channel-1234.sock
//
// Intentionally empty: the message's presence in the oneof selects the
// transport; configuration may be added as fields later.
message UnixDomainSocket {}
155+
156+
// Communication between the engine and worker
// is done using a localhost TCP connection.
//
// On [[UDFWorker]] creation, a PORT
// is passed as a connection parameter.
//
// It is expected that the worker binds to this
// port on both IPv4 and IPv6 localhost interfaces.
// E.g. the worker server should be reachable via
// 127.0.0.1:PORT and [::1]:PORT.
//
// Examples:
//   8080
//   1234
//
// Intentionally empty: the message's presence in the oneof selects the
// transport; configuration may be added as fields later.
message TcpConnection {}
171+
172+
// A process the engine can spawn (worker runner or environment hook).
message ProcessCallable {
  // Executable to invoke.
  // Examples:
  //   ["python3", "-m"]
  //   ["worker.bin"]
  //   ["java", "worker.java"]
  //   ["/bin/bash", "-c"]
  // This executable should be blocking, until the task is finished.
  // Termination is requested via a SIGTERM signal.
  //
  // Success/Failure can be indicated via exit codes:
  //   Exit code 0  -> Success
  //   Exit code != 0 -> Failure
  repeated string command = 1;

  // Arguments passed directly to the executable.
  // Examples:
  //   ["udf_worker.py"]
  //   [""]
  //   ["--max_concurrency", "5"]
  //   ["\"echo 'Test'\""]
  //
  // Every executable will ALWAYS receive a
  // --id argument. This argument CANNOT be part of the below list of arguments.
  // The value of the id argument is a string with the engine-assigned
  // id of this UDF Worker. This can be used in logs and other state information.
  repeated string arguments = 2;

  // Environment variables for the invoked process.
  // NOTE(review): whether these replace or extend the engine's own
  // environment is not specified here — confirm and document.
  map<string, string> environment_variables = 3;
}

0 commit comments

Comments
 (0)