From 6266e3d08da8a80bba64db7c662ef80a1805f270 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 22:50:22 +0200 Subject: [PATCH 1/7] not needed, because of the raise --- DrQueue/client.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/DrQueue/client.py b/DrQueue/client.py index cbb5aeaa..2f693603 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -46,7 +46,6 @@ def job_run(self, job): # check job name if job['name'] in DrQueueJob.query_jobnames(): raise ValueError("Job name %s is already used!" % job['name']) - return False # save job in database job_id = DrQueueJob.store_db(job) @@ -72,21 +71,20 @@ def job_run(self, job): # check frame numbers if not (job['startframe'] >= 1): raise ValueError("Invalid value for startframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= 1): raise ValueError("Invalid value for endframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= job['startframe']): raise ValueError("Invalid value for endframe. Has be to equal or greater than startframe.") - return False + if job['endframe'] > job['startframe']: if not (job['endframe'] - job['startframe'] >= job['blocksize']): raise ValueError("Invalid value for blocksize. Has to be equal or lower than endframe-startframe.") - return False + if job['endframe'] == job['startframe']: if job['blocksize'] != 1: raise ValueError("Invalid value for blocksize. Has to be equal 1 if endframe equals startframe.") - return False task_frames = list(range(job['startframe'], job['endframe'] + 1, job['blocksize'])) ar = None From 23068d196592ae46d0fe9b81393c2b8b82c9fc67 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 22:55:43 +0200 Subject: [PATCH 2/7] use logging?!? --- DrQueue/client.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/DrQueue/client.py b/DrQueue/client.py index 2f693603..f910104c 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -15,6 +15,7 @@ import time import pickle import datetime +import logging from IPython.parallel import Client as IPClient from IPython.parallel.util import unpack_apply_message from IPython.parallel import dependent @@ -23,6 +24,9 @@ from .computer import Computer as DrQueueComputer +log = logging.getLogger(__name__) + + class Client(): """DrQueue client actions""" def __init__(self): @@ -240,22 +244,22 @@ def identify_computer(self, engine_id, cache_time, timeout=15): now = int(time.time()) # check existence and age of info if (engine != None) and (now <= engine['created_at'] + cache_time): - print("DEBUG: Engine %i was found in DB and info is up-to-date." % engine_id) + log.DEBUG("Engine %i was found in DB and info is up-to-date." % engine_id) return engine # store new info else: if engine != None: - print("DEBUG: Engine %i was found in DB, but info needs to be updated." % engine_id) + log.DEBUG("Engine %i was found in DB, but info needs to be updated." % engine_id) else: - print("DEBUG: Engine %i was not found in DB." % engine_id) + log.DEBUG("Engine %i was not found in DB." % engine_id) # run command only on specific computer try: dview = self.ip_client[engine_id] except IndexError: - print("DEBUG: Engine with id %i unknown." % engine_id) + log.DEBUG("Engine with id %i unknown." % engine_id) # delete old entry from database DrQueueComputer.delete_from_db_by_engine_id(engine_id) - print("DEBUG: Engine with id %i deleted from database." % engine_id) + log.DEBUG("Engine with id %i deleted from database." % engine_id) new_engine = None else: # run command in async mode @@ -267,10 +271,10 @@ def identify_computer(self, engine_id, cache_time, timeout=15): ar.get(timeout) except Exception: if engine != None: - print("DEBUG: Update request for engine %i timed out. Using old information from DB." % engine_id) + log.DEBUG("Update request for engine %i timed out. Using old information from DB." % engine_id) new_engine = engine else: - print("DEBUG: Information request for engine %i timed out." % engine_id) + log.DEBUG("Information request for engine %i timed out." % engine_id) new_engine = None else: # get computer dict from engine namespace @@ -296,7 +300,7 @@ def computer_set_pools(self, computer, pool_list): # update database entry computer['pools'] = pool_list DrQueueComputer.store_db(computer) - print("DEBUG: Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") + log.DEBUG("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") return computer From c0119bdd1d5122cb0826b3bdf83635e3b121f8e7 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:18:00 +0200 Subject: [PATCH 3/7] move imports to top --- DrQueue/computer_pool.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 45a1c867..1ed31fb9 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -12,6 +12,9 @@ import os import getpass +import pymongo +import bson + class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" @@ -32,7 +35,7 @@ def __init__(self, name, engine_names=[]): @staticmethod def store_db(pool): - import pymongo + """store pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -41,10 +44,9 @@ def store_db(pool): pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def update_db(pool): - import pymongo + """update pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -53,11 +55,9 @@ def update_db(pool): pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def query_db(pool_id): - import pymongo - import bson + """query pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -65,21 +65,18 @@ def query_db(pool_id): pool = pools.find_one({"_id": bson.ObjectId(pool_id)}) return pool - @staticmethod def delete_from_db(pool_id): - import pymongo - import bson + """delete pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] pools = db['drqueue_pools'] return pools.remove({"_id": bson.ObjectId(pool_id)}) - @staticmethod def query_poolnames(): - import pymongo + """query pool names from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -89,10 +86,9 @@ def query_poolnames(): names.append(pool['name']) return names - @staticmethod def query_pool_by_name(pool_name): - import pymongo + """query pool information from MongoDB by name""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -100,10 +96,9 @@ def query_pool_by_name(pool_name): pool = pools.find_one({"name": pool_name}) return pool - @staticmethod def query_pool_list(): - import pymongo + """query list of pools from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -112,11 +107,10 @@ def query_pool_list(): for pool in pools.find(): pool_arr.append(pool) return pool_arr - @staticmethod def query_pool_members(pool_name): - import pymongo + """query list of members of pool from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -126,4 +120,3 @@ def query_pool_members(pool_name): return None else: return list(pool['engine_names']) - From 7c1aa5dc5af9c407255e2af0d600d2ee8919601e Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:18:35 +0200 Subject: [PATCH 4/7] not needed --- DrQueue/computer_pool.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 1ed31fb9..c10785e9 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -10,7 +10,6 @@ """ import os -import getpass import pymongo import bson @@ -18,12 +17,12 @@ class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" + def __init__(self, name, engine_names=[]): dict.__init__(self) if type(engine_names).__name__ != 'list': raise ValueError("argument is not of type list") - return False # mandatory elements pool = { @@ -35,7 +34,6 @@ def __init__(self, name, engine_names=[]): @staticmethod def store_db(pool): - """store pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -46,7 +44,6 @@ def store_db(pool): @staticmethod def update_db(pool): - """update pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -57,7 +54,6 @@ def update_db(pool): @staticmethod def query_db(pool_id): - """query pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -67,7 +63,6 @@ def query_db(pool_id): @staticmethod def delete_from_db(pool_id): - """delete pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -76,7 +71,6 @@ def delete_from_db(pool_id): @staticmethod def query_poolnames(): - """query pool names from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -88,7 +82,6 @@ def query_poolnames(): @staticmethod def query_pool_by_name(pool_name): - """query pool information from MongoDB by name""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -98,7 +91,6 @@ def query_pool_by_name(pool_name): @staticmethod def query_pool_list(): - """query list of pools from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -110,7 +102,6 @@ def query_pool_list(): @staticmethod def query_pool_members(pool_name): - """query list of members of pool from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] From 1e2fab3071aaebf0e938b44486c17d9670f388a7 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:24:55 +0200 Subject: [PATCH 5/7] merge db connect code --- DrQueue/computer_pool.py | 40 +++++++++++++++------------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index c10785e9..119b7f89 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -15,6 +15,13 @@ import bson +def get_queue_pools(): + connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) + db = connection['ipythondb'] + pools = db['drqueue_pools'] + return pools + + class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" @@ -31,13 +38,10 @@ def __init__(self, name, engine_names=[]): } self.update(pool) - @staticmethod def store_db(pool): """store pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.insert(pool) pool['_id'] = str(pool['_id']) return pool_id @@ -45,9 +49,7 @@ def store_db(pool): @staticmethod def update_db(pool): """update pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.save(pool) pool['_id'] = str(pool['_id']) return pool_id @@ -55,26 +57,20 @@ def update_db(pool): @staticmethod def query_db(pool_id): """query pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"_id": bson.ObjectId(pool_id)}) return pool @staticmethod def delete_from_db(pool_id): """delete pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() return pools.remove({"_id": bson.ObjectId(pool_id)}) @staticmethod def query_poolnames(): """query pool names from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() names = [] for pool in pools.find(): names.append(pool['name']) @@ -83,18 +79,14 @@ def query_poolnames(): @staticmethod def query_pool_by_name(pool_name): """query pool information from MongoDB by name""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) return pool @staticmethod def query_pool_list(): """query list of pools from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_arr = [] for pool in pools.find(): pool_arr.append(pool) @@ -103,9 +95,7 @@ def query_pool_list(): @staticmethod def query_pool_members(pool_name): """query list of members of pool from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) if pool == None: return None From c5e8b126c2025ce3bb913c75bb8b908fec9409e3 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:25:05 +0200 Subject: [PATCH 6/7] better solution --- DrQueue/computer_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 119b7f89..11aa1a34 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -25,10 +25,10 @@ def get_queue_pools(): class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" - def __init__(self, name, engine_names=[]): + def __init__(self, name, engine_names=None): dict.__init__(self) - if type(engine_names).__name__ != 'list': + if isinstance(engine_names, list): raise ValueError("argument is not of type list") # mandatory elements From 2a21ad42a1ca3b435672094dd7aa1dae70ce6324 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Fri, 24 Jul 2015 17:28:35 +0200 Subject: [PATCH 7/7] made pymongo/bson import "optional" see: https://github.com/jedie/DrQueueIPython/commit/c0119bdd1d5122cb0826b3bdf83635e3b121f8e7#commitcomment-12308414 --- DrQueue/computer_pool.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 11aa1a34..cac2a5a4 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -10,12 +10,25 @@ """ import os +import logging + + +log = logging.getLogger(__name__) + + +try: + import pymongo + import bson +except ImportError as err: + log.debug("Can't import pymongo/bson: %s" % err) + pymongo = bson = None -import pymongo -import bson def get_queue_pools(): + if pymongo is None: + raise RuntimeError("pymongo is needed, please install it!") + connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] pools = db['drqueue_pools']