-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_client_tests
More file actions
executable file
·112 lines (96 loc) · 3.29 KB
/
run_client_tests
File metadata and controls
executable file
·112 lines (96 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
from crossflow import filehandling, tasks, clients
import tempfile
from pathlib import Path
def test_function_test(myclient, tmpdir):
    """Submit an identity FunctionTask with a loaded file handle.

    A small text file is written under ``tmpdir``, loaded through a
    default ``filehandling.FileHandler`` and submitted to ``myclient``.
    The test passes when the task's result is a
    ``filehandling.FileHandle``; otherwise the result is printed and the
    ``AssertionError`` is re-raised.
    """
    def testit(f):
        return f

    identity_task = tasks.FunctionTask(testit)
    identity_task.set_inputs(['f'])
    identity_task.set_outputs(['x'])

    source_path = tmpdir / "hello.txt"
    source_path.write_text("content")

    handler = filehandling.FileHandler()
    loaded = handler.load(source_path)

    future = myclient.submit(identity_task, loaded)
    try:
        assert isinstance(future.result(), filehandling.FileHandle)
    except AssertionError:
        # Show what actually came back before propagating the failure.
        print('Error: result.result() = {}'.format(future.result()))
        raise
def test_function_test_no_filehandler(myclient, tmpdir):
    """Submit an identity FunctionTask with a plain path as its input.

    Unlike ``test_function_test``, the file written under ``tmpdir`` is
    passed to ``myclient.submit`` directly, without loading it through a
    ``FileHandler`` first. The result is still expected to be a
    ``filehandling.FileHandle``; on mismatch the result is printed and
    the ``AssertionError`` is re-raised.
    """
    def testit(f):
        return f

    identity_task = tasks.FunctionTask(testit)
    identity_task.set_inputs(['f'])
    identity_task.set_outputs(['x'])

    source_path = tmpdir / "hello.txt"
    source_path.write_text("content")

    future = myclient.submit(identity_task, source_path)
    try:
        assert isinstance(future.result(), filehandling.FileHandle)
    except AssertionError:
        # Show what actually came back before propagating the failure.
        print('Error: result.result() = {}'.format(future.result()))
        raise
def test_subprocess_test_data(myclient, tmpdir):
    """Run ``cat file.txt`` on an uploaded handle from a default FileHandler.

    A file containing ``"content"`` is written under ``tmpdir``, loaded
    with ``filehandling.FileHandler()`` (no arguments), uploaded via
    ``myclient.upload`` and passed to a ``SubprocessTask`` whose only
    output is ``tasks.STDOUT``. The captured output must equal
    ``'content'``.

    Raises:
        AssertionError: if the task output differs from ``'content'``.
    """
    sk = tasks.SubprocessTask('cat file.txt')
    sk.set_inputs(['file.txt'])
    sk.set_outputs([tasks.STDOUT])

    p = tmpdir / "hello.txt"
    p.write_text("content")

    fh = filehandling.FileHandler()
    pf = fh.load(p)
    ll = myclient.upload(pf)

    result = myclient.submit(sk, ll)
    try:
        assert result.result() == 'content'
    except AssertionError:
        print('Error: result.result() = {}'.format(result.result()))
        # Re-raise so a mismatch fails the test instead of being
        # reduced to a printed message.
        raise
def test_subprocess_test_no_filehandler(myclient, tmpdir):
    """Run ``cat file.txt`` with a plain path as the task input.

    A file containing ``"content"`` is written under ``tmpdir`` and its
    path is submitted directly to ``myclient`` together with a
    ``SubprocessTask`` whose only output is ``tasks.STDOUT``. The
    captured output must equal ``'content'``.

    Raises:
        AssertionError: if the task output differs from ``'content'``.
    """
    sk = tasks.SubprocessTask('cat file.txt')
    sk.set_inputs(['file.txt'])
    sk.set_outputs([tasks.STDOUT])

    p = tmpdir / "hello.txt"
    p.write_text("content")

    result = myclient.submit(sk, p)
    try:
        assert result.result() == 'content'
    except AssertionError:
        print('Error: result.result() = {}'.format(result.result()))
        # Re-raise so a mismatch fails the test instead of being
        # reduced to a printed message.
        raise
def test_subprocess_test_file(myclient, tmpdir):
    """Run ``cat file.txt`` on a handle from a FileHandler given ``tmpdir``.

    Same flow as the default-FileHandler variant, except the handler is
    constructed as ``filehandling.FileHandler(tmpdir)``.
    NOTE(review): presumably this makes the handler stage files under
    ``tmpdir`` -- confirm against the crossflow FileHandler API.

    Raises:
        AssertionError: if the task output differs from ``'content'``.
    """
    sk = tasks.SubprocessTask('cat file.txt')
    sk.set_inputs(['file.txt'])
    sk.set_outputs([tasks.STDOUT])

    p = tmpdir / "hello.txt"
    p.write_text("content")

    fh = filehandling.FileHandler(tmpdir)
    pf = fh.load(p)
    ll = myclient.upload(pf)

    result = myclient.submit(sk, ll)
    try:
        assert result.result() == 'content'
    except AssertionError:
        print('Error: result.result() = {}'.format(result.result()))
        # Re-raise so a mismatch fails the test instead of being
        # reduced to a printed message.
        raise
def test_subprocess_test_s3(myclient, tmpdir):
    """Run ``cat file.txt`` on a handle from an ``s3://`` FileHandler.

    Same flow as the other subprocess tests, except the handler is
    constructed as ``filehandling.FileHandler('s3://bucket_name')``.
    NOTE(review): ``bucket_name`` looks like a placeholder; presumably
    this test needs a real, accessible bucket to pass -- confirm before
    enabling it in an automated run.

    Raises:
        AssertionError: if the task output differs from ``'content'``.
    """
    sk = tasks.SubprocessTask('cat file.txt')
    sk.set_inputs(['file.txt'])
    sk.set_outputs([tasks.STDOUT])

    p = tmpdir / "hello.txt"
    p.write_text("content")

    fh = filehandling.FileHandler('s3://bucket_name')
    fp = fh.load(p)
    ll = myclient.upload(fp)

    result = myclient.submit(sk, ll)
    try:
        assert result.result() == 'content'
    except AssertionError:
        print('Error: result.result() = {}'.format(result.result()))
        # Re-raise so a mismatch fails the test instead of being
        # reduced to a printed message.
        raise
if __name__ == '__main__':
    # Script entry point: run the client tests in sequence against a
    # single Client and a single scratch directory.
    import shutil

    tmpdir = Path(tempfile.mkdtemp())
    try:
        myclient = clients.Client()
        try:
            test_function_test(myclient, tmpdir)
            test_function_test_no_filehandler(myclient, tmpdir)
            test_subprocess_test_data(myclient, tmpdir)
            test_subprocess_test_no_filehandler(myclient, tmpdir)
            test_subprocess_test_file(myclient, tmpdir)
            # test_subprocess_test_s3 is not run here.
            # NOTE(review): presumably because it needs a real S3
            # bucket -- confirm.
        finally:
            # Close the client even when a test raises.
            myclient.close()
    finally:
        # mkdtemp() leaves deletion to the caller; remove the scratch
        # directory on a best-effort basis so cleanup problems never
        # mask a test failure.
        shutil.rmtree(tmpdir, ignore_errors=True)