Source code for dbcollection.core.api.process

"""
Process API class.
"""


from __future__ import print_function
import os

from dbcollection.core.manager import CacheManager

from .metadata import MetadataConstructor


[docs]def process(name, task='default', verbose=True): """Process a dataset's metadata and stores it to file. The data is stored a a HSF5 file for each task composing the dataset's tasks. Parameters ---------- name : str Name of the dataset. task : str, optional Name of the task to process. verbose : bool, optional Displays text information (if true). Raises ------ KeyError If a task does not exist for a dataset. Examples -------- >>> import dbcollection as dbc Download the CIFAR10 dataset to disk. >>> dbc.process('cifar10', task='classification', verbose=False) """ assert name, 'Must input a valid dataset name.' processer = ProcessAPI(name=name, task=task, verbose=verbose) processer.run()
[docs]class ProcessAPI(object): """Dataset metadata process API class. This class contains methods to correctly process the dataset's data files and convert their metadata to disk. Parameters ---------- name : str Name of the dataset. task : str Name of the task to process. verbose : bool Displays text information (if true). Attributes ---------- name : str Name of the dataset. task : str Name of the task to process. verbose : bool Displays text information (if true). extract_data : bool Flag to extract data (if True). cache_manager : CacheManager Cache manager object. Raises ------ KeyError If a task does not exist for a dataset. """ def __init__(self, name, task, verbose): """Initialize class.""" assert isinstance(name, str), 'Must input a valid dataset name.' assert isinstance(task, str), 'Must input a valid task name.' assert isinstance(verbose, bool), "Must input a valid boolean for verbose." self.name = name self.task = task self.verbose = verbose self.extract_data = False self.cache_manager = self.get_cache_manager() def get_cache_manager(self): return CacheManager()
[docs] def run(self): """Main method.""" if self.verbose: print('==> Setup directories to store the data files.') self.set_dirs_processed_metadata() if self.verbose: print('==> Process \'{}\' metadata to disk...'.format(self.name)) task_info = self.process_dataset() if self.verbose: print('==> Updating the cache manager') self.update_cache(task_info) if self.verbose: print('==> Dataset processing complete.')
def set_dirs_processed_metadata(self): cache_dir = self.get_dataset_cache_dir_path() self.create_dir(cache_dir) def get_dataset_cache_dir_path(self): cache_dir = self.get_cache_dir_path_from_cache() return os.path.join(cache_dir, self.name) def get_cache_dir_path_from_cache(self): return self.cache_manager.info.cache_dir
[docs] def create_dir(self, path): """Create a directory in the disk.""" if not os.path.exists(path): os.makedirs(path)
[docs] def process_dataset(self): """Process the dataset's metadata.""" data_dir = self.get_dataset_data_dir_path() cache_dir = self.get_dataset_cache_dir_path() task = self.parse_task_name(self.task) self.check_if_task_exists_in_database(task) return self.process_dataset_metadata(data_dir, cache_dir, task)
def get_dataset_constructor(self): db_metadata = self.get_dataset_metadata_obj(self.name) constructor = db_metadata.get_constructor() return constructor def get_dataset_metadata_obj(self, name): return MetadataConstructor(name) def get_dataset_data_dir_path(self): dataset_metadata = self.get_dataset_metadata_from_cache() data_dir = dataset_metadata['data_dir'] if not self.name == os.path.basename(data_dir): data_dir = os.path.join(data_dir, self.name) return data_dir def get_dataset_metadata_from_cache(self): return self.cache_manager.dataset.get(self.name)
[docs] def parse_task_name(self, task): """Parse the input task string.""" if task == '' or task == 'default': task_parsed = self.get_default_task() else: task_parsed = task return task_parsed
[docs] def get_default_task(self): """Returns the default task for this dataset.""" db_metadata = self.get_dataset_metadata_obj(self.name) default_task = db_metadata.get_default_task() return default_task
[docs] def check_if_task_exists_in_database(self, task): """Check if task exists in the list of available tasks for processing.""" if not self.exists_task(task): raise KeyError('The task \'{}\' does not exists for loading/processing.' .format(self.task))
[docs] def exists_task(self, task): """Checks if a task exists for a dataset.""" db_metadata = self.get_dataset_metadata_obj(self.name) return task in db_metadata.get_tasks()
def process_dataset_metadata(self, data_dir, cache_dir, task): constructor = self.get_dataset_constructor() db = constructor(data_path=data_dir, cache_path=cache_dir, extract_data=self.extract_data, verbose=self.verbose) task_info = db.process(task) return task_info
[docs] def update_cache(self, task_info): """Update the cache manager information for this dataset.""" cache_dir = self.get_dataset_cache_dir_path() self.cache_manager.dataset.update(name=self.name, cache_dir=cache_dir, tasks=task_info)