From 6266e3d08da8a80bba64db7c662ef80a1805f270 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 22:50:22 +0200 Subject: [PATCH 01/15] not needed, because of the raise --- DrQueue/client.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/DrQueue/client.py b/DrQueue/client.py index cbb5aeaa..2f693603 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -46,7 +46,6 @@ def job_run(self, job): # check job name if job['name'] in DrQueueJob.query_jobnames(): raise ValueError("Job name %s is already used!" % job['name']) - return False # save job in database job_id = DrQueueJob.store_db(job) @@ -72,21 +71,20 @@ def job_run(self, job): # check frame numbers if not (job['startframe'] >= 1): raise ValueError("Invalid value for startframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= 1): raise ValueError("Invalid value for endframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= job['startframe']): raise ValueError("Invalid value for endframe. Has be to equal or greater than startframe.") - return False + if job['endframe'] > job['startframe']: if not (job['endframe'] - job['startframe'] >= job['blocksize']): raise ValueError("Invalid value for blocksize. Has to be equal or lower than endframe-startframe.") - return False + if job['endframe'] == job['startframe']: if job['blocksize'] != 1: raise ValueError("Invalid value for blocksize. Has to be equal 1 if endframe equals startframe.") - return False task_frames = list(range(job['startframe'], job['endframe'] + 1, job['blocksize'])) ar = None From 23068d196592ae46d0fe9b81393c2b8b82c9fc67 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 22:55:43 +0200 Subject: [PATCH 02/15] use logging?!? --- DrQueue/client.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/DrQueue/client.py b/DrQueue/client.py index 2f693603..f910104c 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -15,6 +15,7 @@ import time import pickle import datetime +import logging from IPython.parallel import Client as IPClient from IPython.parallel.util import unpack_apply_message from IPython.parallel import dependent @@ -23,6 +24,9 @@ from .computer import Computer as DrQueueComputer +log = logging.getLogger(__name__) + + class Client(): """DrQueue client actions""" def __init__(self): @@ -240,22 +244,22 @@ def identify_computer(self, engine_id, cache_time, timeout=15): now = int(time.time()) # check existence and age of info if (engine != None) and (now <= engine['created_at'] + cache_time): - print("DEBUG: Engine %i was found in DB and info is up-to-date." % engine_id) + log.DEBUG("Engine %i was found in DB and info is up-to-date." % engine_id) return engine # store new info else: if engine != None: - print("DEBUG: Engine %i was found in DB, but info needs to be updated." % engine_id) + log.DEBUG("Engine %i was found in DB, but info needs to be updated." % engine_id) else: - print("DEBUG: Engine %i was not found in DB." % engine_id) + log.DEBUG("Engine %i was not found in DB." % engine_id) # run command only on specific computer try: dview = self.ip_client[engine_id] except IndexError: - print("DEBUG: Engine with id %i unknown." % engine_id) + log.DEBUG("Engine with id %i unknown." % engine_id) # delete old entry from database DrQueueComputer.delete_from_db_by_engine_id(engine_id) - print("DEBUG: Engine with id %i deleted from database." % engine_id) + log.DEBUG("Engine with id %i deleted from database." % engine_id) new_engine = None else: # run command in async mode @@ -267,10 +271,10 @@ def identify_computer(self, engine_id, cache_time, timeout=15): ar.get(timeout) except Exception: if engine != None: - print("DEBUG: Update request for engine %i timed out. Using old information from DB." % engine_id) + log.DEBUG("Update request for engine %i timed out. Using old information from DB." % engine_id) new_engine = engine else: - print("DEBUG: Information request for engine %i timed out." % engine_id) + log.DEBUG("Information request for engine %i timed out." % engine_id) new_engine = None else: # get computer dict from engine namespace @@ -296,7 +300,7 @@ def computer_set_pools(self, computer, pool_list): # update database entry computer['pools'] = pool_list DrQueueComputer.store_db(computer) - print("DEBUG: Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") + log.DEBUG("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") return computer From c0119bdd1d5122cb0826b3bdf83635e3b121f8e7 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:18:00 +0200 Subject: [PATCH 03/15] move imports to top --- DrQueue/computer_pool.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 45a1c867..1ed31fb9 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -12,6 +12,9 @@ import os import getpass +import pymongo +import bson + class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" @@ -32,7 +35,7 @@ def __init__(self, name, engine_names=[]): @staticmethod def store_db(pool): - import pymongo + """store pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -41,10 +44,9 @@ def store_db(pool): pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def update_db(pool): - import pymongo + """update pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -53,11 +55,9 @@ def update_db(pool): pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def query_db(pool_id): - import pymongo - import bson + """query pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -65,21 +65,18 @@ def query_db(pool_id): pool = pools.find_one({"_id": bson.ObjectId(pool_id)}) return pool - @staticmethod def delete_from_db(pool_id): - import pymongo - import bson + """delete pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] pools = db['drqueue_pools'] return pools.remove({"_id": bson.ObjectId(pool_id)}) - @staticmethod def query_poolnames(): - import pymongo + """query pool names from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -89,10 +86,9 @@ def query_poolnames(): names.append(pool['name']) return names - @staticmethod def query_pool_by_name(pool_name): - import pymongo + """query pool information from MongoDB by name""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -100,10 +96,9 @@ def query_pool_by_name(pool_name): pool = pools.find_one({"name": pool_name}) return pool - @staticmethod def query_pool_list(): - import pymongo + """query list of pools from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -112,11 +107,10 @@ def query_pool_list(): for pool in pools.find(): pool_arr.append(pool) return pool_arr - @staticmethod def query_pool_members(pool_name): - import pymongo + """query list of members of pool from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -126,4 +120,3 @@ def query_pool_members(pool_name): return None else: return list(pool['engine_names']) - From 7c1aa5dc5af9c407255e2af0d600d2ee8919601e Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:18:35 +0200 Subject: [PATCH 04/15] not needed --- DrQueue/computer_pool.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 1ed31fb9..c10785e9 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -10,7 +10,6 @@ """ import os -import getpass import pymongo import bson @@ -18,12 +17,12 @@ class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" + def __init__(self, name, engine_names=[]): dict.__init__(self) if type(engine_names).__name__ != 'list': raise ValueError("argument is not of type list") - return False # mandatory elements pool = { @@ -35,7 +34,6 @@ def __init__(self, name, engine_names=[]): @staticmethod def store_db(pool): - """store pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -46,7 +44,6 @@ def store_db(pool): @staticmethod def update_db(pool): - """update pool information in MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -57,7 +54,6 @@ def update_db(pool): @staticmethod def query_db(pool_id): - """query pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -67,7 +63,6 @@ def query_db(pool_id): @staticmethod def delete_from_db(pool_id): - """delete pool information from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -76,7 +71,6 @@ def delete_from_db(pool_id): @staticmethod def query_poolnames(): - """query pool names from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -88,7 +82,6 @@ def query_poolnames(): @staticmethod def query_pool_by_name(pool_name): - """query pool information from MongoDB by name""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -98,7 +91,6 @@ def query_pool_by_name(pool_name): @staticmethod def query_pool_list(): - """query list of pools from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] @@ -110,7 +102,6 @@ def query_pool_list(): @staticmethod def query_pool_members(pool_name): - """query list of members of pool from MongoDB""" connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] From 1e2fab3071aaebf0e938b44486c17d9670f388a7 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:24:55 +0200 Subject: [PATCH 05/15] merge db connect code --- DrQueue/computer_pool.py | 40 +++++++++++++++------------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index c10785e9..119b7f89 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -15,6 +15,13 @@ import bson +def get_queue_pools(): + connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) + db = connection['ipythondb'] + pools = db['drqueue_pools'] + return pools + + class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" @@ -31,13 +38,10 @@ def __init__(self, name, engine_names=[]): } self.update(pool) - @staticmethod def store_db(pool): """store pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.insert(pool) pool['_id'] = str(pool['_id']) return pool_id @@ -45,9 +49,7 @@ def store_db(pool): @staticmethod def update_db(pool): """update pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.save(pool) pool['_id'] = str(pool['_id']) return pool_id @@ -55,26 +57,20 @@ def update_db(pool): @staticmethod def query_db(pool_id): """query pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"_id": bson.ObjectId(pool_id)}) return pool @staticmethod def delete_from_db(pool_id): """delete pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() return pools.remove({"_id": bson.ObjectId(pool_id)}) @staticmethod def query_poolnames(): """query pool names from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() names = [] for pool in pools.find(): names.append(pool['name']) @@ -83,18 +79,14 @@ def query_poolnames(): @staticmethod def query_pool_by_name(pool_name): """query pool information from MongoDB by name""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) return pool @staticmethod def query_pool_list(): """query list of pools from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_arr = [] for pool in pools.find(): pool_arr.append(pool) @@ -103,9 +95,7 @@ def query_pool_list(): @staticmethod def query_pool_members(pool_name): """query list of members of pool from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) if pool == None: return None From c5e8b126c2025ce3bb913c75bb8b908fec9409e3 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Wed, 22 Jul 2015 23:25:05 +0200 Subject: [PATCH 06/15] better solution --- DrQueue/computer_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 119b7f89..11aa1a34 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -25,10 +25,10 @@ def get_queue_pools(): class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" - def __init__(self, name, engine_names=[]): + def __init__(self, name, engine_names=None): dict.__init__(self) - if type(engine_names).__name__ != 'list': + if isinstance(engine_names, list): raise ValueError("argument is not of type list") # mandatory elements From df03816b8e2ddec3942b7576865e3b1b96a539ce Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Fri, 24 Jul 2015 17:04:39 +0200 Subject: [PATCH 07/15] Quick fix for #7 TODO: move the command into seperate .py file (IMHO) --- setup.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index d9ccded0..fa5a2db5 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,15 @@ -import os, glob, shutil, sys, pwd, grp +import os, glob, shutil, sys from setuptools import setup from distutils.core import setup, Command +try: + import pwd, grp +except ImportError: + # e.g. under windows + pwd = grp = None + + def read(fname): """Read file contents.""" return open(os.path.join(os.path.dirname(__file__), fname)).read() @@ -97,10 +104,10 @@ def run(self): shutil.copy(template, drqueue_etc) # set to user-supplied user / group - if self.owner != None: + if self.owner != None and pwd is not None: uid = pwd.getpwnam(self.owner)[2] recursive_chown(drqueue_root, uid, -1) - if self.group != None: + if self.group != None and grp is not None: gid = grp.getgrnam(self.group)[2] recursive_chown(drqueue_root, -1, gid) From 2a21ad42a1ca3b435672094dd7aa1dae70ce6324 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Fri, 24 Jul 2015 17:28:35 +0200 Subject: [PATCH 08/15] made pymongo/bson import "optional" see: https://github.com/jedie/DrQueueIPython/commit/c0119bdd1d5122cb0826b3bdf83635e3b121f8e7#commitcomment-12308414 --- DrQueue/computer_pool.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 11aa1a34..cac2a5a4 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -10,12 +10,25 @@ """ import os +import logging + + +log = logging.getLogger(__name__) + + +try: + import pymongo + import bson +except ImportError as err: + log.debug("Can't import pymongo/bson: %s" % err) + pymongo = bson = None -import pymongo -import bson def get_queue_pools(): + if pymongo is None: + raise RuntimeError("pymongo is needed, please install it!") + connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) db = connection['ipythondb'] pools = db['drqueue_pools'] From f8e6e175e7d8b867e9377ead5fe0aea79cb066c8 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 10:06:42 +0200 Subject: [PATCH 09/15] add python shebang --- bin/drqueue | 1 + setup.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/bin/drqueue b/bin/drqueue index 6e3dc09e..11230b17 100644 --- a/bin/drqueue +++ b/bin/drqueue @@ -1,3 +1,4 @@ +#!/usr/bin/env python # -*- coding: utf-8 -*- """ diff --git a/setup.py b/setup.py index ef230857..14508b9d 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + import os, glob, shutil, sys, pwd, grp from setuptools import setup from distutils.core import setup, Command From e08d954add6c67ed0e1feb834a24f46019dcd990 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 11:45:15 +0200 Subject: [PATCH 10/15] bugfix: -log.DEBUG +log.debug --- DrQueue/client.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/DrQueue/client.py b/DrQueue/client.py index f910104c..837ac0ce 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -244,22 +244,22 @@ def identify_computer(self, engine_id, cache_time, timeout=15): now = int(time.time()) # check existence and age of info if (engine != None) and (now <= engine['created_at'] + cache_time): - log.DEBUG("Engine %i was found in DB and info is up-to-date." % engine_id) + log.debug("Engine %i was found in DB and info is up-to-date." % engine_id) return engine # store new info else: if engine != None: - log.DEBUG("Engine %i was found in DB, but info needs to be updated." % engine_id) + log.debug("Engine %i was found in DB, but info needs to be updated." % engine_id) else: - log.DEBUG("Engine %i was not found in DB." % engine_id) + log.debug("Engine %i was not found in DB." % engine_id) # run command only on specific computer try: dview = self.ip_client[engine_id] except IndexError: - log.DEBUG("Engine with id %i unknown." % engine_id) + log.debug("Engine with id %i unknown." % engine_id) # delete old entry from database DrQueueComputer.delete_from_db_by_engine_id(engine_id) - log.DEBUG("Engine with id %i deleted from database." % engine_id) + log.debug("Engine with id %i deleted from database." % engine_id) new_engine = None else: # run command in async mode @@ -271,10 +271,10 @@ def identify_computer(self, engine_id, cache_time, timeout=15): ar.get(timeout) except Exception: if engine != None: - log.DEBUG("Update request for engine %i timed out. Using old information from DB." % engine_id) + log.debug("Update request for engine %i timed out. Using old information from DB." % engine_id) new_engine = engine else: - log.DEBUG("Information request for engine %i timed out." % engine_id) + log.debug("Information request for engine %i timed out." % engine_id) new_engine = None else: # get computer dict from engine namespace @@ -300,7 +300,7 @@ def computer_set_pools(self, computer, pool_list): # update database entry computer['pools'] = pool_list DrQueueComputer.store_db(computer) - log.DEBUG("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") + log.debug("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") return computer From 4975166a7604f5ddfcb175da370e61b81e59e2e2 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 14:15:09 +0200 Subject: [PATCH 11/15] startframe can be <1 in 3dsmax it is also possible to render from -20 to -10 ;) * return after rase is useless --- DrQueue/job.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/DrQueue/job.py b/DrQueue/job.py index 16f1f7b2..972e9400 100644 --- a/DrQueue/job.py +++ b/DrQueue/job.py @@ -33,21 +33,18 @@ def __init__(self, name, startframe, endframe, blocksize, renderer, scenefile, r 'enabled' : True, 'limits' : {} } + if name == "": raise ValueError("No name of job given!") - return False - if not (endframe >= startframe >= 1): - raise ValueError("Startframe and endframe need to be at least 1!") - return False + if not (endframe > startframe): + raise ValueError("Endframe must be bigger than startframe!") if blocksize < 1: raise ValueError("Blocksize needs to be at least 1!") - return False if DrQueue.check_renderer_support(renderer) == False: raise ValueError("Render called \"%s\" not supported!" % renderer) - return False if scenefile == "": raise ValueError("No scenefile given!") - return False + # optional elements if 'renderdir' in options: jb['renderdir'] = options['renderdir'] From 69cbb6148ff988b963e5f02f034847584a33d753 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 14:37:28 +0200 Subject: [PATCH 12/15] bugfix for windows --- bin/drqueue | 8 +++++--- bin/drqueue_slave.py | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/bin/drqueue b/bin/drqueue index d913e8c5..e803651e 100644 --- a/bin/drqueue +++ b/bin/drqueue @@ -970,14 +970,16 @@ def slave_daemon(args): print("IPython engine started with PID " + str(IPENGINE_PID) + ". Logging to " + ipengine_logpath + ".") # wait for process to exit - os.waitpid(IPENGINE_PID, 0) + returncode = ipengine_daemon.wait() + print("IPython was shut down with returncode: %s" % returncode) # run only once if option given if args.no_restart == True: break else: - print("IPython was shut down. Restarting ...") - time.sleep(5) + sec = 5 + print("Restarting IPython after %s sec..." % sec) + time.sleep(sec) def process_exists(process_name): diff --git a/bin/drqueue_slave.py b/bin/drqueue_slave.py index 62d9a76a..3f189cb0 100644 --- a/bin/drqueue_slave.py +++ b/bin/drqueue_slave.py @@ -111,10 +111,12 @@ def main(): print("IPython engine started with PID " + str(IPENGINE_PID) + ". Logging to " + IPENGINE_LOGPATH + ".") # wait for process to exit - os.waitpid(IPENGINE_PID, 0) + returncode = ipengine_daemon.wait() + print("IPython was shut down with returncode: %s" % returncode) - print("IPython was shut down. Restarting ...") - time.sleep(5) + sec = 5 + print("Restarting IPython after %s sec..." % sec) + time.sleep(sec) if __name__== "__main__": From 779c294f0ff3f8b743c11f7ff8a8af0cfc7d67a2 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 14:40:02 +0200 Subject: [PATCH 13/15] get_slave_information.py is in PATH, see #30 --- bin/drqueue | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bin/drqueue b/bin/drqueue index e803651e..03aeaca6 100644 --- a/bin/drqueue +++ b/bin/drqueue @@ -943,8 +943,6 @@ def slave_daemon(args): ipengine_logpath = os.path.join(os.environ["DRQUEUE_ROOT"], "logs", "ipengine_" + slave_ip + ".log") ipengine_logfile = open(ipengine_logpath, "ab") - dist_egg = pkg_resources.get_distribution("DrQueueIPython") - startup_script = dist_egg.get_resource_filename(__name__, "EGG-INFO/scripts/get_slave_information.py") # register signal handler for SIGINT & SIGTERM signal.signal(signal.SIGTERM, slave_sig_handler) @@ -957,7 +955,7 @@ def slave_daemon(args): while True: # start IPython engine along with startup script print("Connecting to DrQueue master at " + master_ip + ".") - command = "ipengine -s " + startup_script + command = "ipengine -s get_slave_information.py" if (args.no_ssh == True) or LOCALHOST: command += " --ssh=" From 12ffc6aa2df50de00c806a5f083bc4e4853df064 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 14:45:17 +0200 Subject: [PATCH 14/15] get_slave_information.py is in PATH, see #30 --- bin/drqueue_slave.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bin/drqueue_slave.py b/bin/drqueue_slave.py index 3f189cb0..2c08fe05 100644 --- a/bin/drqueue_slave.py +++ b/bin/drqueue_slave.py @@ -39,8 +39,6 @@ IPENGINE_PID = None IPENGINE_LOGPATH = os.path.join(os.environ["DRQUEUE_ROOT"], "logs", "ipengine_" + SLAVE_IP + ".log") IPENGINE_LOGFILE = open(IPENGINE_LOGPATH, "ab") -dist_egg = pkg_resources.get_distribution("DrQueueIPython") -STARTUP_SCRIPT = dist_egg.get_resource_filename(__name__, "EGG-INFO/scripts/get_slave_information.py") def sig_handler(signum, frame): @@ -82,7 +80,7 @@ def run_command(command): message = "OSError({0}) while executing command: {1}\n".format(errno, strerror) IPENGINE_LOGFILE.write(message) raise OSError(message) - return False + return p @@ -105,7 +103,7 @@ def main(): # restart ipengine if it was shut down by IPython while True: # start IPython engine along with startup script - command = "ipengine --url tcp://" + MASTER_IP + ":10101 -s " + STARTUP_SCRIPT + command = "ipengine --url tcp://" + MASTER_IP + ":10101 -s get_slave_information.py" ipengine_daemon = run_command(command) IPENGINE_PID = ipengine_daemon.pid print("IPython engine started with PID " + str(IPENGINE_PID) + ". Logging to " + IPENGINE_LOGPATH + ".") From 4f1387b6036f62f0f5bda294a64812fff9df64d1 Mon Sep 17 00:00:00 2001 From: JensDiemer Date: Tue, 4 Aug 2015 15:29:05 +0200 Subject: [PATCH 15/15] simplification and more info on connection error --- DrQueue/job.py | 67 +++++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/DrQueue/job.py b/DrQueue/job.py index 972e9400..71813eee 100644 --- a/DrQueue/job.py +++ b/DrQueue/job.py @@ -9,11 +9,39 @@ Licensed under GNU General Public License version 3. See LICENSE for details. """ -import os, datetime +import os +import datetime import getpass + +import pymongo +import bson + import DrQueue +def connect_db(): + """ + :return: MongoDB connection object + """ + host=os.getenv('DRQUEUE_MONGODB', None) + if not host: + raise RuntimeError("Error: DRQUEUE_MONGODB not set!") + + print("Connect MongoDB on %s" % host) + connection = pymongo.Connection(host) + db = connection['ipythondb'] + return db + + +def get_jobs(): + """ + :return: return 'drqueue_jobs' + """ + db = connect_db() + jobs = db['drqueue_jobs'] + return jobs + + class Job(dict): """Subclass of dict for collecting Job attribute values.""" def __init__(self, name, startframe, endframe, blocksize, renderer, scenefile, retries=1, owner=getpass.getuser(), options={}, created_with=None, limits={}): @@ -101,11 +129,8 @@ def __init__(self, name, startframe, endframe, blocksize, renderer, scenefile, r @staticmethod def store_db(job): - import pymongo """store job information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() job_id = jobs.insert(job) job['_id'] = str(job['_id']) return job_id @@ -113,11 +138,8 @@ def store_db(job): @staticmethod def update_db(job): - import pymongo """update job information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() job_id = jobs.save(job) job['_id'] = str(job['_id']) return job_id @@ -125,12 +147,8 @@ def update_db(job): @staticmethod def query_db(job_id): - import pymongo - import bson """query job information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() try: job = jobs.find_one({"_id": bson.ObjectId(job_id)}) except bson.errors.InvalidId: @@ -141,22 +159,15 @@ def query_db(job_id): @staticmethod def delete_from_db(job_id): - import pymongo - import bson """query job information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() return jobs.remove({"_id": bson.ObjectId(job_id)}) @staticmethod def query_jobnames(): - import pymongo """query job names from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() names = [] for job in jobs.find(): names.append(job['name']) @@ -165,21 +176,15 @@ def query_jobnames(): @staticmethod def query_job_by_name(job_name): - import pymongo """query job information from MongoDB by name""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() job = jobs.find_one({"name": job_name}) return job @staticmethod def query_job_list(): - import pymongo """query list of jobs from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() return list(jobs.find())