Commit 36731433 authored by mradmin's avatar mradmin
Browse files

close the iRODS session between upload batches

parent 547d1c45
......@@ -47,8 +47,10 @@ global mrdata_irods_sesh
# How long to wait to assume the file transfer is complete
time_to_file_whole_seconds = 15.0
# Cached dicom values used to reduce the number of iRODS API calls per file.
mrdata_irods_sesh = None
cret_file_list = None
# Cached dicom values used to reduce the number of iRODS API calls per file.
cache_experiment_id = None
cache_experiment_echtdata_ipath = None
cache_mrdata_scan_ipath = None
......@@ -426,6 +428,21 @@ def ScanArrivingFiles():
if file_age < time_to_file_whole_seconds:
time.sleep( time_to_file_whole_seconds - file_age )
global mrdata_irods_sesh
if mrdata_irods_sesh == None:
logging.debug("Creating mrdata_irods_sesh for this batch of files.")
try:
mrdata_irods_sesh = iRODSSession( host=cred_file_list["host"],
port=cred_file_list["port"],
user=cred_file_list["user"],
zone=cred_file_list['zone'],
password=cred_file_list['password'] )
except Exception as e:
logging.error("Exception opening irods session")
logging.error( str(e) )
raise
ProcessFile( file )
processing = True
......@@ -466,6 +483,7 @@ def run():
cred_file_name = "mrdata_irods_credential_file.yml"
global cred_file_list
try:
with open( cred_file_name, "r" ) as cf:
cred_file_list = yaml.load( cf, Loader=yaml.FullLoader )
......@@ -473,6 +491,7 @@ def run():
logging.error("Failed to open credential file " + cred_file_name + " " + str(e) )
raise
global mrdata_irods_sesh
try:
#ToDo Needs to be centralized
mrdata_irods_sesh = iRODSSession( host=cred_file_list["host"],
......@@ -480,20 +499,19 @@ def run():
user=cred_file_list["user"],
zone=cred_file_list['zone'],
password=cred_file_list['password'] )
# The following is a problem because the credentials seem local host specific
#mrdata_irods_sesh = iRODSSession(irods_env_file=env_file)
except Exception as e:
print("Exception opening irods session")
logging.error( e )
logging.error("Exception opening irods session")
logging.error( str(e) )
raise
irods_user = str( mrdata_irods_sesh.username )
irods_zone = str( mrdata_irods_sesh.zone )
coll = mrdata_irods_sesh.collections.get( miconf.mrdata_base_ipath )
print( "iRODS returns " + str(coll.id) )
logging.info( "iRODS working. collections.get() returns: " + str(coll.id) )
mrdata_irods_sesh.cleanup()
mrdata_irods_sesh = None
global path
path = os.path.abspath( sys.argv[1] if len(sys.argv) > 1 else '.' )
......@@ -505,9 +523,18 @@ def run():
#observer.start()
try:
while True:
logging.debug( "Scanning files.")
ScanArrivingFiles()
# Hackish ... but close the seshion if it was opened by ScanArrivingFiles.
# This is because iRODS python has trouble with long running sessions
if mrdata_irods_sesh != None:
logging.debug("Clean up mrdata_irods_sesh which was created in ScanArrivingFiles.")
mrdata_irods_sesh.cleanup()
mrdata_irods_sesh = None
time.sleep(10)
except KeyboardInterrupt:
logging.info( "Got KeyboardInterrupt" )
observer.stop()
observer.join()
logging.info( "Finished ")
......
......@@ -35,8 +35,8 @@ global mrdata_irods_sesh
# How long to wait to assume the file transfer is complete
time_to_file_whole_seconds = 15.0
# Cached dicom values used to reduce the number of iRODS API calls per file.
mrdata_irods_sesh = None
cret_file_list = None
import subprocess
import re
......@@ -284,6 +284,23 @@ def ScanArrivingFiles():
global path
logging.debug( path )
def get_iRODS_sesh():
global mrdata_irods_sesh
if mrdata_irods_sesh == None:
logging.debug("Creating mrdata_irods_sesh for this batch of files.")
try:
mrdata_irods_sesh = iRODSSession( host=cred_file_list["host"],
port=cred_file_list["port"],
user=cred_file_list["user"],
zone=cred_file_list['zone'],
password=cred_file_list['password'] )
except Exception as e:
logging.error("Exception opening irods session")
logging.error( str(e) )
raise
# Find the flagged directories.
glob_patient_name_pattern = "*_*-*" # could it be more accurate ?
finished_suffix = "_FINISHED.TXT"
......@@ -294,6 +311,7 @@ def ScanArrivingFiles():
logging.error("Failed to get a list of pathname and/or sort them. error: " + str(ex) )
return
# Find the marked finished directories.
for pathname in pathnames:
logging.debug( "Starting on finished suffix pathname: " + pathname )
......@@ -301,6 +319,8 @@ def ScanArrivingFiles():
if not pathname.endswith( finished_suffix ):
continue
get_iRODS_sesh()
finished_suffix_index = pathname.find( finished_suffix )
exp_dir_pathname = pathname[:-len(finished_suffix)]
......@@ -342,6 +362,9 @@ def ScanArrivingFiles():
logging.debug( "pathname: " + pathname + " pathname_age: " + str(pathname_age) )
if pathname_age > 24*60*60 :
get_iRODS_sesh()
ProcessExperimentDataDir( pathname )
......@@ -358,6 +381,7 @@ def run():
cred_file_name = "mrdata_irods_credential_file.yml"
global cred_file_list
try:
with open( cred_file_name, "r" ) as cf:
cred_file_list = yaml.load( cf, Loader=yaml.FullLoader )
......@@ -365,26 +389,26 @@ def run():
logging.error("Failed to open credential file " + cred_file_name + " " + str(e) )
raise
global mrdata_irods_sesh
try:
#ToDo Needs to be centralized
mrdata_irods_sesh = iRODSSession( host=cred_file_list["host"],
port=cred_file_list["port"],
user=cred_file_list["user"],
zone=cred_file_list['zone'],
password=cred_file_list['password'] )
# The following is a problem because the credentials seem local host specific
#mrdata_irods_sesh = iRODSSession(irods_env_file=env_file)
except Exception as e:
print("Exception opening irods session")
logging.error( e )
logging.error("Exception opening irods session")
logging.error( str(e) )
raise
irods_user = str( mrdata_irods_sesh.username )
irods_zone = str( mrdata_irods_sesh.zone )
coll = mrdata_irods_sesh.collections.get( miconf.mrdata_base_ipath )
print( "iRODS returns " + str(coll.id) )
logging.info( "iRODS working. collections.get() returns: " + str(coll.id) )
mrdata_irods_sesh.cleanup()
mrdata_irods_sesh = None
global path
path = miconf.experiment_data_path
......@@ -402,9 +426,18 @@ def run():
try:
while True:
logging.debug( "Scanning files.")
ScanArrivingFiles()
# Hackish ... but close the seshion if it was opened by ScanArrivingFiles.
# This is because iRODS python has trouble with long running sessions
if mrdata_irods_sesh != None:
logging.debug("Clean up mrdata_irods_sesh which was created in ScanArrivingFiles.")
mrdata_irods_sesh.cleanup()
mrdata_irods_sesh = None
time.sleep(60)
except KeyboardInterrupt:
logging.info( "Got KeyboardInterrupt" )
observer.stop()
observer.join()
logging.info( "Finished ")
......
......@@ -24,6 +24,7 @@ from irods.session import iRODSSession
from irods.meta import iRODSMeta, AVUOperation
import irods.keywords as kw
global cred_file_list
global mrdata_irods_sesh
sys.path.append('/home/mradmin/mrdata-common-public/pymods/mrdata_irods_config')
......@@ -51,6 +52,7 @@ import twixtools # Switch to Philip's library
time_to_file_whole_seconds = 60.0
mrdata_irods_sesh = None
cret_file_list = None
last_patient_name = None
cache_patient_name = None
......@@ -430,12 +432,28 @@ def ScanArrivingFiles():
if file_age < time_to_file_whole_seconds:
time.sleep( time_to_file_whole_seconds - file_age )
global mrdata_irods_sesh
if mrdata_irods_sesh == None:
logging.debug("Creating mrdata_irods_sesh for this batch of files.")
try:
mrdata_irods_sesh = iRODSSession( host=cred_file_list["host"],
port=cred_file_list["port"],
user=cred_file_list["user"],
zone=cred_file_list['zone'],
password=cred_file_list['password'] )
except Exception as e:
logging.error("Exception opening irods session")
logging.error( str(e) )
raise
logging.info("Calling ProcessFile on file: " + file )
ProcessFile( file )
logging.info("Done ProcessFile on file: " + file )
processing = True
class EventHandler(FileSystemEventHandler):
# def dispatch(self, event):
......@@ -472,6 +490,7 @@ def run():
cred_file_name = "mrdata_irods_credential_file.yml"
global cred_file_list
try:
with open( cred_file_name, "r" ) as cf:
cred_file_list = yaml.load( cf, Loader=yaml.FullLoader )
......@@ -479,6 +498,7 @@ def run():
logging.error("Failed to open credential file " + cred_file_name + " " + str(e) )
raise
global mrdata_irods_sesh
try:
#ToDo Needs to be centralized
mrdata_irods_sesh = iRODSSession( host=cred_file_list["host"],
......@@ -486,9 +506,6 @@ def run():
user=cred_file_list["user"],
zone=cred_file_list['zone'],
password=cred_file_list['password'] )
# The following is a problem because the credentials seem local host specific
#mrdata_irods_sesh = iRODSSession(irods_env_file=env_file)
except Exception as e:
logging.error("Exception opening irods session")
logging.error( str(e) )
......@@ -500,6 +517,9 @@ def run():
coll = mrdata_irods_sesh.collections.get( miconf.mrdata_base_ipath )
logging.info( "iRODS working. collections.get() returns: " + str(coll.id) )
mrdata_irods_sesh.cleanup()
mrdata_irods_sesh = None
global path
path = os.path.abspath( sys.argv[1] if len(sys.argv) > 1 else '.' )
logging.info( path )
......@@ -510,9 +530,16 @@ def run():
#observer.start()
try:
while True:
logging.debug( "Scanning files.")
ScanArrivingFiles()
# Hackish ... but close the seshion if it was opened by ScanArrivingFiles.
# This is because iRODS python has trouble with long running sessions
if mrdata_irods_sesh != None:
logging.debug("Clean up mrdata_irods_sesh which was created in ScanArrivingFiles.")
mrdata_irods_sesh.cleanup()
mrdata_irods_sesh = None
time.sleep(60)
logging.info("waking")
except KeyboardInterrupt:
logging.info( "Got KeyboardInterrupt" )
observer.stop()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment