Skip to content

Commit 04881c5

Browse files
authored
Make datastore.__init__(mode='w') a cheap operation (#448)
* Make datastore.__init__(mode='w') cheap * Make datastore.__init__(mode='w') cheap
1 parent 35207e9 commit 04881c5

5 files changed

Lines changed: 42 additions & 18 deletions

File tree

metaflow/datastore/datastore.py

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,18 @@ def __init__(self,
323323
event_logger=None,
324324
monitor=None,
325325
data_obj=None,
326-
artifact_cache=None):
326+
artifact_cache=None,
327+
allow_unsuccessful=False):
327328
if run_id == 'data':
328329
raise DataException("Run ID 'data' is reserved. "
329330
"Try with a different --run-id.")
330331
if self.datastore_root is None:
331332
raise DataException("Datastore root not found. "
332333
"Specify with METAFLOW_DATASTORE_SYSROOT_%s "
333334
"environment variable." % self.TYPE.upper())
334-
335+
# NOTE: calling __init__(mode='w') should be a cheap operation:
336+
# no file system accesses are allowed. It is called frequently
337+
# e.g. to resolve log file location.
335338
self.event_logger = event_logger if event_logger else NullEventLogger()
336339
self.monitor = monitor if monitor else NullMonitor()
337340
self.metadata = metadata
@@ -356,14 +359,7 @@ def __init__(self,
356359
task_id)
357360

358361
self.attempt = attempt
359-
if mode == 'w':
360-
if run_id is not None:
361-
# run_id may be None when datastore is used to save
362-
# things not related to runs, e.g. the job package
363-
self.save_metadata('attempt', {'time': time.time()})
364-
self.objects = {}
365-
self.info = {}
366-
elif mode == 'r':
362+
if mode == 'r':
367363
if data_obj is None:
368364
# what is the latest attempt ID of this data store?
369365

@@ -389,25 +385,37 @@ def __init__(self,
389385
self.attempt = i
390386

391387
# was the latest attempt completed successfully?
392-
if not self.is_done():
388+
if self.is_done():
389+
# load the data from the latest attempt
390+
data_obj = self.load_metadata('data')
391+
elif allow_unsuccessful and self.attempt is not None:
392+
# this mode can be used to load_logs, for instance
393+
data_obj = None
394+
else:
393395
raise DataException("Data was not found or not finished at %s"\
394396
% self.root)
395397

396-
# load the data from the latest attempt
397-
data_obj = self.load_metadata('data')
398-
399-
self.origin = data_obj.get('origin')
400-
self.objects = data_obj['objects']
401-
self.info = data_obj.get('info', {})
398+
if data_obj:
399+
self.origin = data_obj.get('origin')
400+
self.objects = data_obj['objects']
401+
self.info = data_obj.get('info', {})
402402
elif mode == 'd':
403403
# Direct access mode used by the client. We effectively don't load any
404404
# objects and can only access things using the load_* functions
405405
self.origin = None
406406
self.objects = None
407407
self.info = None
408-
else:
408+
elif mode != 'w':
409409
raise DataException('Unknown datastore mode: %s' % mode)
410410

411+
def init_task(self):
    """Prepare a freshly opened datastore for task-related writes.

    Call this once after the datastore has been opened for writing task
    data. It records an 'attempt' metadata entry stamped with the current
    time and then resets the in-memory `objects` and `info` mappings to
    empty dicts.
    """
    attempt_started = time.time()
    # The attempt marker is persisted before the in-memory state is reset.
    self.save_metadata('attempt', {'time': attempt_started})
    self.objects, self.info = {}, {}
417+
418+
411419
@property
412420
def pathspec(self):
413421
return '%s/%s/%s' % (self.run_id, self.step_name, self.task_id)

metaflow/metaflow_config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ def from_conf(name, default=None):
170170
# increasing this limit has real performance implications for all tasks.
171171
# Decreasing this limit is very unsafe, as it can lead to wrong results
172172
# being read from old tasks.
173+
#
174+
# Note also that DataStoreSet resolves the latest attempt_id using
175+
# lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99.
173176
MAX_ATTEMPTS = 6
174177

175178

metaflow/runtime.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ def new_attempt(self):
529529
attempt=self.retries,
530530
event_logger=self.event_logger,
531531
monitor=self.monitor)
532+
self._ds.init_task()
532533

533534

534535
def log(self, msg, system_msg=False, pid=None):

metaflow/task.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def clone_only(self, step_name, run_id, task_id, clone_origin_task):
215215
attempt=0,
216216
event_logger=self.event_logger,
217217
monitor=self.monitor)
218+
output.init_task()
218219
origin_run_id, origin_step_name, origin_task_id =\
219220
clone_origin_task.split('/')
220221
# 2. initialize origin datastore
@@ -280,6 +281,7 @@ def run_step(self,
280281
attempt=retry_count,
281282
event_logger=self.event_logger,
282283
monitor=self.monitor)
284+
output.init_task()
283285

284286
if input_paths:
285287
# 2. initialize input datastores

metaflow/util.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,21 @@
2929
def unquote_bytes(x):
3030
return to_unicode(unquote(to_bytes(x)))
3131

32+
# this is used e.g. by datastore.save_logs to identify paths
33+
class Path(object):
    """Minimal wrapper that marks a string as a filesystem path.

    Used e.g. by datastore.save_logs to identify paths.
    NOTE(review): this appears to be the fallback for the branch where
    `pathlib.Path` is not imported -- confirm against the enclosing
    try/except.
    """

    def __init__(self, path):
        # `self` was missing from the signature, so Path('x') raised
        # TypeError and the class could never be instantiated.
        self.path = path

    def __str__(self):
        # Return the wrapped path unchanged.
        return self.path
40+
3241
except:
3342
# python3
3443
unicode_type = str
3544
bytes_type = bytes
3645
from urllib.parse import quote, unquote
46+
from pathlib import Path
3747

3848
def unquote_bytes(x):
3949
return unquote(to_unicode(x))

0 commit comments

Comments
 (0)