-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathreusable.py
More file actions
72 lines (57 loc) · 2.15 KB
/
reusable.py
File metadata and controls
72 lines (57 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import asyncio
from typing import List
import flyte
# PATH_TO_FASTTASK_WORKER = pathlib.Path("../../../private/flyte/fasttask/worker-v2")
#
# actor_image = (
# flyte.Image.from_debian_base(install_flyte=False)
# .with_apt_packages("curl", "build-essential", "ca-certificates", "pkg-config", "libssl-dev")
# .with_commands(["sh -c 'curl https://sh.rustup.rs -sSf | sh -s -- -y'"])
# .with_env_vars({"PATH": "/root/.cargo/bin:${PATH}"})
# .with_source_file(pathlib.Path(".dockerignore"))
# .with_source_folder(PATH_TO_FASTTASK_WORKER, "/root/fasttask")
# .with_pip_packages("uv")
# .with_workdir("/root/fasttask")
# .with_commands(["uv sync --reinstall --active"])
# .with_local_v2()
# )
# Container image for the reusable workers: the default Debian base image
# with the "unionai-reuse" pip package added.
actor_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse")

# Task environment named "reusable"; tasks registered on it use the image
# above, the resources below, and the given reuse policy.
env = flyte.TaskEnvironment(
    name="reusable",
    image=actor_image,
    resources=flyte.Resources(cpu=1, memory="500Mi"),
    reusable=flyte.ReusePolicy(
        # NOTE(review): the previous comment on this line read "Min of 2
        # replicas are needed to ensure no-starvation of tasks", but the
        # lower bound passed here is 1 -- confirm which one is intended.
        replicas=(1, 4),
        idle_ttl=300,
    ),
)
@env.task
async def square(x: int) -> int:
    """Return x raised to the second power."""
    return pow(x, 2)
@env.task
async def cube(x: int) -> int:
    """Return x raised to the third power."""
    return pow(x, 3)
# Clone `env` under the name "nonreusable", overriding `reusable` with None
# (no reuse policy) and listing the original `env` in `depends_on`.
cloned_env = env.clone_with(
    name="nonreusable",
    reusable=None,
    depends_on=[env],
)
# This task is registered on the cloned environment, i.e. without reuse.
@cloned_env.task
async def main(n: int) -> List[int]:
    """
    Fan out the square and cube tasks over range(n) and collect the results.

    Every call is awaited through a single asyncio.gather, so the square and
    cube calls are all in flight together instead of running as two
    sequential batches.

    Args:
        n: Number of inputs; each task is called once for every x in range(n).

    Returns:
        The n squares (ordered by x) followed by the n cubes (ordered by x).
    """
    square_coros = [square(x) for x in range(n)]
    cube_coros = [cube(x) for x in range(n)]
    # gather returns results in argument order, so passing the squares first
    # and the cubes second yields the same list as concatenating two
    # separate gathers. Awaiting everything in one call also means no
    # coroutine is left un-awaited if an earlier one fails.
    return await asyncio.gather(*square_coros, *cube_coros)
if __name__ == "__main__":
    # Establish the remote connection from within this script.
    flyte.init_from_config()

    # Run `main` remotely with n=30, passing the input data inline.
    run = flyte.run(main, n=30)
    print(run.url)

    # Wait for the run to finish.
    run.wait()
# # print various attributes of the run.
# print(run.name)
# print(run.url)
#
# run.wait() # stream the logs from the root action to the terminal.