Source code for neural.utils

''' This module contains generic helper functions, not related to imaging specifically'''
import neural as nl
import os,subprocess
import datetime,random,string
import hashlib
import zlib, base64
import tempfile,shutil,re,glob,time,random
import chardet
from threading import Thread,Event
import json,time,re
import numpy as np


#! A list of archives this library understands
archive_formats = {
    'zip': {'suffix':'zip', 'command':lambda output,filename: ['unzip','-o','-d',output,filename]},
    'tarball': {'suffix':('tar.gz','tgz'), 'command':lambda output,filename: ['tar','zx','-C',output,'-f',filename]},
    'tarball-bzip': {'suffix':('tar.bz2','tbz'), 'command':lambda output,filename: ['tar','jx','-C',output,'-f',filename]},
    '7zip': {'suffix':'7z', 'command':lambda output,filename: ['7z','x','-y','-o%s' % output, filename]}
}

[docs]def is_archive(filename): '''returns boolean of whether this filename looks like an archive''' for archive in archive_formats: if filename.endswith(archive_formats[archive]['suffix']): return True return False
[docs]def archive_basename(filename): '''returns the basename (name without extension) of a recognized archive file''' for archive in archive_formats: if filename.endswith(archive_formats[archive]['suffix']): return filename.rstrip('.' + archive_formats[archive]['suffix']) return False
[docs]def unarchive(filename,output_dir='.'): '''unpacks the given archive into ``output_dir``''' if not os.path.exists(output_dir): os.makedirs(output_dir) for archive in archive_formats: if filename.endswith(archive_formats[archive]['suffix']): return subprocess.call(archive_formats[archive]['command'](output_dir,filename))==0 return False
[docs]def flatten(nested_list): '''converts a list-of-lists to a single flat list''' return_list = [] for i in nested_list: if isinstance(i,list): return_list += flatten(i) else: return_list.append(i) return return_list
[docs]class run_in: '''temporarily changes into another directory If the directory name you pass doesn't exist, but matches the current directory you are in, it will silently ignore your silliness. Otherwise, it will raise an ``OSError``. If the argument ``create`` is True, it will create the directory instead of raising an error. Example:: with run_in('another_directory'): do_some_stuff_there() ''' def __init__(self,working_directory,create=False): self.working_directory = working_directory self.create = create def __enter__(self): self.old_cwd = os.getcwd() if self.working_directory and self.working_directory!='.': if os.path.exists(self.working_directory): os.chdir(self.working_directory) elif self.old_cwd.endswith(self.working_directory.rstrip('/')): # Silly, we're already in that directory pass else: if self.create: os.makedirs(self.working_directory) os.chdir(self.working_directory) else: raise IOError('Attempting to run_in the non-existent directory "%s"' % self.working_directory) def __exit__(self, type, value, traceback): os.chdir(self.old_cwd)
[docs]class RunResult: '''result of calling :meth:`run` when used as a string, will try to reasonably return the filename of the primary output of the command (if known)''' def __init__(self,output=None,return_code=None,output_filename=None): self.output_filename = output_filename self.output = output self.return_code = return_code def __str__(self): return self.output_filename
[docs]def run(command,products=None,working_directory='.',force_local=False,stderr=True,quiet=False): '''wrapper to run external programs :command: list containing command and parameters (formatted the same as subprocess; must contain only strings) :products: string or list of files that are the products of this command if all products exist, the command will not be run, and False returned :working_directory: will chdir to this directory :force_local: when used with `neural.scheduler`, setting to ``True`` will disable all job distribution functions :stderr: forward ``stderr`` into the output ``True`` will combine ``stderr`` and ``stdout`` ``False`` will return ``stdout`` and let ``stderr`` print to the console ``None`` will return ``stdout`` and suppress ``stderr`` :quiet: ``False`` (default) will print friendly messages ``True`` will suppress everything but errors ``None`` will suppress all output Returns result in form of :class:`RunResult` ''' with run_in(working_directory): if products: if isinstance(products,basestring): products = [products] if all([os.path.exists(x) for x in products]): return False command = flatten(command) command = [str(x) for x in command] quiet_option = False if quiet==False else True with nl.notify('Running %s...' % command[0],level=nl.level.debug,quiet=quiet_option): out = None returncode = 0 try: if stderr: # include STDERR in STDOUT output out = subprocess.check_output(command,stderr=subprocess.STDOUT) elif stderr==None: # dump STDERR into nothing out = subprocess.check_output(command,stderr=subprocess.PIPE) else: # let STDERR show through to the console out = subprocess.check_output(command) except subprocess.CalledProcessError, e: if quiet!=None: nl.notify('''ERROR: %s returned a non-zero status ----COMMAND------------ %s ----------------------- ----OUTPUT------------- %s ----------------------- Return code: %d ''' % (command[0],' '.join(command),e.output,e.returncode),level=nl.level.error) out = e.output returncode = e.returncode result = RunResult(out,returncode) if products and returncode==0: result.output_filename = products[0] return result
[docs]def log(fname,msg): ''' generic logging function ''' with open(fname,'a') as f: f.write(datetime.datetime.now().strftime('%m-%d-%Y %H:%M:\n') + msg + '\n')
[docs]def hash(filename): '''returns string of MD5 hash of given filename''' buffer_size = 10*1024*1024 m = hashlib.md5() with open(filename) as f: buff = f.read(buffer_size) while len(buff)>0: m.update(buff) buff = f.read(buffer_size) dig = m.digest() return ''.join(['%x' % ord(x) for x in dig])
[docs]def hash_str(string): '''returns string of MD5 hash of given string''' m = hashlib.md5() m.update(string) dig = m.digest() return ''.join(['%x' % ord(x) for x in dig])
[docs]class simple_timer: '''a simple way to time a single run of a function Example:: with simple_timer(): do_stuff() ''' def __init__(self): self.start_time = None def __enter__(self): self.start_time = datetime.datetime.now() print 'timer start time: %s' % self.start_time.strftime('%Y-%m-%d %H:%M:%S') def __exit__(self, type, value, traceback): self.end_time = datetime.datetime.now() print 'timer end time: %s' % self.end_time.strftime('%Y-%m-%d %H:%M:%S') print 'time elapsed: %s' % str(self.end_time-self.start_time)
[docs]def factor(n): '''return set of all prime factors for a number''' return set(reduce(list.__add__, ([i, n//i] for i in range(1, int(n**0.5) + 1) if n % i == 0)))
[docs]def compress(data, level=9): ''' return compressed, ASCII-encoded string ''' return base64.encodestring(zlib.compress(data,9))
[docs]def decompress(data): ''' return uncompressed string ''' return zlib.decompress(base64.decodestring(data))
[docs]def which(program): '''returns full path to program name or ``None`` if not found taken from: http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python''' def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
[docs]def find(file): '''tries to find ``file`` using OS-specific searches and some guessing''' # Try MacOS Spotlight: mdfind = which('mdfind') if mdfind: out = run([mdfind,'-name',file],stderr=None,quiet=None) if out.return_code==0 and out.output: for fname in out.output.split('\n'): if os.path.basename(fname)==file: return fname # Try UNIX locate: locate = which('locate') if locate: out = run([locate,file],stderr=None,quiet=None) if out.return_code==0 and out.output: for fname in out.output.split('\n'): if os.path.basename(fname)==file: return fname # Try to look through the PATH, and some guesses: path_search = os.environ["PATH"].split(os.pathsep) path_search += ['/usr/local/afni','/usr/local/afni/atlases','/usr/local/share','/usr/local/share/afni','/usr/local/share/afni/atlases'] afni_path = which('afni') if afni_path: path_search.append(os.path.dirname(afni_path)) if nl.wrappers.fsl.bet2: path_search.append(os.path.dirname(nl.wrappers.fsl.bet2)) for path in path_search: path = path.strip('"') try: if file in os.listdir(path): return os.path.join(path,file) except: pass
[docs]class run_in_tmp: '''creates a temporary directory to run the code block in''' def __init__(self,inputs=[],products=[]): self.tmp_dir = tempfile.mkdtemp() self.cwd = None self.inputs = inputs self.products = products def __enter__(self): if not isinstance(self.inputs,list): self.inputs = [self.inputs] self.cwd = os.getcwd() for file in self.inputs: try: nl.dset_copy(file,self.tmp_dir) except (OSError,IOError): pass os.chdir(self.tmp_dir) return self def __exit__(self, type, value, traceback): if not isinstance(self.products,list): self.products = [self.products] for file in self.products: try: nl.dset_copy(file,self.cwd) except (OSError,IOError): pass os.chdir(self.cwd) shutil.rmtree(self.tmp_dir,True)
[docs]def random_string(length): '''Returns random string of letters''' return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(length))
[docs]def universal_read(fname): '''Will open and read a file with universal line endings, trying to decode whatever format it's in (e.g., utf8 or utf16)''' with open(fname,'rU') as f: data = f.read() enc_guess = chardet.detect(data) return data.decode(enc_guess['encoding'])
[docs]class ThreadSafe(object): '''wrapper class to handle starting and stopping for the :meth:`thread_safe` method of :class:`Beacon`''' def __init__(self,beacon): self.beacon = beacon def __enter__(self): self.beacon.write_packet() self.beacon.start() def __exit__(self, type, value, traceback): self.beacon.stop()
[docs]class Beacon(Thread): '''Class to easily handle running multiple threads simultaneously. Communicates through a lockfile in an arbitrary file path, so communicating across different computers that have shared file systems is relatively easy (just choose a file path on the shared drive). Options: :app_name: Arbitrary name of the script. Will use script filename if ``None`` :instance_name: Arbitrary name of this instance (e.g., subject #) :packet_path: Path to put the lock file in (defaults to the system temp directory) :poll_time: How often (in seconds) to ping the lock file Example of usage:: b = Beacon('my_analysis','subject_4') if not b.exists(): # there are no "my_analysis" scripts running "subject_4" right now with b.thread_safe(): # lock this, so other scripts will fail when they run b.exists() on this subject # do the analysis here... ''' def __init__(self,app_name=None,instance_name=None,packet_path=None,poll_time=0.5): Thread.__init__(self) self.stop_event = Event() self.app_name = app_name if app_name==None: self.app_name = os.path.basename(__file__) self.instance_name = instance_name self.filename = self.app_name if instance_name: self.filename += '_' + instance_name self.filename += '.lock' self.packet_path = packet_path if packet_path==None: self.packet_path = tempfile.gettempdir() self.poll_time = poll_time
[docs] def exists(self): '''Returns a ``bool`` of whether this analysis is already running somewhere else''' return not self.check_packet()
[docs] def thread_safe(self): '''Use in a ``with`` statement to run code within a thread-safe context. When the ``with`` statement enters, this will create the lock file, and it will automatically stop and delete it when the ``with`` block finishes''' return ThreadSafe(self)
def packet(self): return { 'app_name':self.app_name, 'instance_name':self.instance_name, 'poll_time':self.poll_time, 'last_time':time.time() } def packet_file(self): return os.path.join(self.packet_path,self.filename) def write_packet(self): with open(self.packet_file(),'w') as f: f.write(json.dumps(self.packet()))
[docs] def check_packet(self): '''is there a valid packet (from another thread) for this app/instance?''' if not os.path.exists(self.packet_file()): # No packet file, we're good return True else: # There's already a file, but is it still running? try: with open(self.packet_file()) as f: packet = json.loads(f.read()) if time.time() - packet['last_time'] > 3.0*packet['poll_time']: # We haven't heard a ping in too long. It's probably dead return True else: # Still getting pings.. probably still a live process return False except: # Failed to read file... try again in a second time.sleep(random.random()*2) return self.check_packet()
[docs] def run(self): while not self.stop_event.wait(self.poll_time): self.write_packet() if os.path.exists(self.packet_file()): try: os.remove(self.packet_file()) except: pass
def stop(self): self.stop_event.set()
[docs]def strip_rows(array,invalid=None): '''takes a ``list`` of ``list``s and removes corresponding indices containing the invalid value (default ``None``). ''' array = np.array(array) none_indices = np.where(np.any(np.equal(array,invalid),axis=0)) return tuple(np.delete(array,none_indices,axis=1))
[docs]def numberize(string): '''Turns a string into a number (``int`` or ``float``) if it's only a number (ignoring spaces), otherwise returns the string. For example, ``"5 "`` becomes ``5`` and ``"2 ton"`` remains ``"2 ton"``''' if not isinstance(string,basestring): return string just_int = r'^\s*[-+]?\d+\s*$' just_float = r'^\s*[-+]?\d+\.(\d+)?\s*$' if re.match(just_int,string): return int(string) if re.match(just_float,string): return float(string) return string