diff --git a/DrQueue/client.py b/DrQueue/client.py index cbb5aeaa..f910104c 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -15,6 +15,7 @@ import time import pickle import datetime +import logging from IPython.parallel import Client as IPClient from IPython.parallel.util import unpack_apply_message from IPython.parallel import dependent @@ -23,6 +24,9 @@ from .computer import Computer as DrQueueComputer +log = logging.getLogger(__name__) + + class Client(): """DrQueue client actions""" def __init__(self): @@ -46,7 +50,6 @@ def job_run(self, job): # check job name if job['name'] in DrQueueJob.query_jobnames(): raise ValueError("Job name %s is already used!" % job['name']) - return False # save job in database job_id = DrQueueJob.store_db(job) @@ -72,21 +75,20 @@ def job_run(self, job): # check frame numbers if not (job['startframe'] >= 1): raise ValueError("Invalid value for startframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= 1): raise ValueError("Invalid value for endframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= job['startframe']): raise ValueError("Invalid value for endframe. Has be to equal or greater than startframe.") - return False + if job['endframe'] > job['startframe']: if not (job['endframe'] - job['startframe'] >= job['blocksize']): raise ValueError("Invalid value for blocksize. Has to be equal or lower than endframe-startframe.") - return False + if job['endframe'] == job['startframe']: if job['blocksize'] != 1: raise ValueError("Invalid value for blocksize. Has to be equal 1 if endframe equals startframe.") - return False task_frames = list(range(job['startframe'], job['endframe'] + 1, job['blocksize'])) ar = None @@ -242,22 +244,22 @@ def identify_computer(self, engine_id, cache_time, timeout=15): now = int(time.time()) # check existence and age of info if (engine != None) and (now <= engine['created_at'] + cache_time): - print("DEBUG: Engine %i was found in DB and info is up-to-date." % engine_id) + log.DEBUG("Engine %i was found in DB and info is up-to-date." % engine_id) return engine # store new info else: if engine != None: - print("DEBUG: Engine %i was found in DB, but info needs to be updated." % engine_id) + log.DEBUG("Engine %i was found in DB, but info needs to be updated." % engine_id) else: - print("DEBUG: Engine %i was not found in DB." % engine_id) + log.DEBUG("Engine %i was not found in DB." % engine_id) # run command only on specific computer try: dview = self.ip_client[engine_id] except IndexError: - print("DEBUG: Engine with id %i unknown." % engine_id) + log.DEBUG("Engine with id %i unknown." % engine_id) # delete old entry from database DrQueueComputer.delete_from_db_by_engine_id(engine_id) - print("DEBUG: Engine with id %i deleted from database." % engine_id) + log.DEBUG("Engine with id %i deleted from database." % engine_id) new_engine = None else: # run command in async mode @@ -269,10 +271,10 @@ def identify_computer(self, engine_id, cache_time, timeout=15): ar.get(timeout) except Exception: if engine != None: - print("DEBUG: Update request for engine %i timed out. Using old information from DB." % engine_id) + log.DEBUG("Update request for engine %i timed out. Using old information from DB." % engine_id) new_engine = engine else: - print("DEBUG: Information request for engine %i timed out." % engine_id) + log.DEBUG("Information request for engine %i timed out." % engine_id) new_engine = None else: # get computer dict from engine namespace @@ -298,7 +300,7 @@ def computer_set_pools(self, computer, pool_list): # update database entry computer['pools'] = pool_list DrQueueComputer.store_db(computer) - print("DEBUG: Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") + log.DEBUG("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") return computer diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 45a1c867..cac2a5a4 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -10,17 +10,39 @@ """ import os -import getpass +import logging + + +log = logging.getLogger(__name__) + + +try: + import pymongo + import bson +except ImportError as err: + log.debug("Can't import pymongo/bson: %s" % err) + pymongo = bson = None + + + +def get_queue_pools(): + if pymongo is None: + raise RuntimeError("pymongo is needed, please install it!") + + connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) + db = connection['ipythondb'] + pools = db['drqueue_pools'] + return pools class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" - def __init__(self, name, engine_names=[]): + + def __init__(self, name, engine_names=None): dict.__init__(self) - if type(engine_names).__name__ != 'list': + if isinstance(engine_names, list): raise ValueError("argument is not of type list") - return False # mandatory elements pool = { @@ -29,101 +51,66 @@ def __init__(self, name, engine_names=[]): } self.update(pool) - @staticmethod def store_db(pool): - import pymongo """store pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.insert(pool) pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def update_db(pool): - import pymongo """update pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.save(pool) pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def query_db(pool_id): - import pymongo - import bson """query pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"_id": bson.ObjectId(pool_id)}) return pool - @staticmethod def delete_from_db(pool_id): - import pymongo - import bson """delete pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() return pools.remove({"_id": bson.ObjectId(pool_id)}) - @staticmethod def query_poolnames(): - import pymongo """query pool names from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() names = [] for pool in pools.find(): names.append(pool['name']) return names - @staticmethod def query_pool_by_name(pool_name): - import pymongo """query pool information from MongoDB by name""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) return pool - @staticmethod def query_pool_list(): - import pymongo """query list of pools from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_arr = [] for pool in pools.find(): pool_arr.append(pool) return pool_arr - @staticmethod def query_pool_members(pool_name): - import pymongo """query list of members of pool from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) if pool == None: return None else: return list(pool['engine_names']) -