Package acis_xmlrpc :: Module util
[hide private]
[frames] | no frames]

Source Code for Module acis_xmlrpc.util

  1  """ 
  2  Utility functions 
  3  @since: May 2007 
  4  @author: Grant Kelly 
  5  """ 
  6   
  7  import datetime 
  8  from mx import DateTime 
  9   
10 -def dates_for_last_n_days(n):
11 """ 12 Return the start and end dates for the last n days. 13 """ 14 today = DateTime.today() 15 return { 'start':today - DateTime.DateTimeDeltaFromDays(n), 16 'end': today }
17
18 -def date_from_string(s):
19 """ 20 Return a DateTime.Date object for a given string 21 """ 22 if len(s) > 8: 23 if not s[2].isdigit(): 24 sep = s[2] 25 elif not s[4].isdigit(): 26 sep = s[4] 27 else: return None 28 ss = s.replace(sep,'') 29 else: 30 ss = s 31 # If given a yearmonth, add first day of month 32 if len(ss) == 6: 33 ss = ss + '01' 34 35 if len(ss) == 8: 36 y = ss[:4] 37 m = ss[4:6] 38 d = ss[6:8] 39 return DateTime.Date(int(y),int(m),int(d))
40
41 -def new_missing_array(sDate, eDate, missing=-999.0):
42 """ 43 Return a list of missing values of the size of interval between sDate and eDate. 44 """ 45 sD = datetime.datetime(*sDate) 46 eD = datetime.datetime(*eDate) 47 num_days = (eD - sD).days 48 return [missing]*num_days
49
50 -def new_date_array(sDate, eDate):
51 one_day = datetime.timedelta(days=1) 52 sD = datetime.datetime(*sDate) 53 eD = datetime.datetime(*eDate) 54 num_days = (eD - sD).days 55 dates = [None,]*num_days 56 for i in range(num_days): 57 dates[i] = (sD + one_day * i).timetuple()[:3] 58 return dates
59 60
61 -def unique_list(l, idfunc=lambda x: x):
62 """ 63 Order-preserving uniqify sequence C{l} according to C{idfunc} function. 64 """ 65 seen = {} 66 result = [] 67 for i in l: 68 mark = idfunc(i) 69 if mark in seen: continue 70 seen[mark] = 1 71 result.append(i) 72 return result
73 74
75 -def valid_stats(l, fun=bool):
76 valid_count = 0 77 invalid_count = 0 78 largest_valid = 0 79 largest_invalid = 0 80 curr_largest_valid = 0 81 curr_largest_invalid = 0 82 83 for i in range(len(l)): 84 if fun(l[i]): 85 valid_count += 1 86 if curr_largest_valid == 0: 87 curr_largest_valid = 1 88 else: 89 try: 90 if fun(l[i-1]): 91 curr_largest_valid += 1 92 else: 93 curr_largest_valid = 1 94 except: pass 95 else: 96 invalid_count += 1 97 if curr_largest_invalid == 0: 98 curr_largest_invalid = 1 99 else: 100 try: 101 if (not fun(l[i-1])): 102 curr_largest_invalid += 1 103 else: 104 curr_largest_invalid = 1 105 except: pass 106 107 if curr_largest_valid > largest_valid: 108 largest_valid = curr_largest_valid 109 if curr_largest_invalid > largest_invalid: 110 largest_invalid = curr_largest_invalid 111 112 return (len(l), valid_count, invalid_count, largest_valid, largest_invalid)
113 114 115 ## 116 # This really should be done lower-level, 117 # instead of per application. 118 invalid_mask = 0x0800 119 flag_mask = 0x07c0 120 flag_code = { 121 0x0040: 'A', # accumulated 122 0x0080: 'B', # accumulated includes, estimated values 123 0x00C0: 'E', # estimated 124 0x0100: 'J', # value has been manually validated 125 0x0140: 'S', # subsequent 126 0x0180: 'T', # trace 127 0x01C0: '(', # expert system edited value, NOT validated 128 0x0200: ')', # expert system approved edited value 129 0x0240: 'D', # derived value 130 0x0280: 'I', # based on incomplete time series 131 0x0400: 'val', 132 0x0440: 'val T', 133 0x07C0: 'M', # missing 134 } 135
136 -def getFilteredFlagSeq(flags):
137 for i in xrange(len(flags)): 138 if flags[i] & invalid_mask != 0: 139 meas_flag = 'M' 140 else: 141 try: 142 meas_flag = flag_code[flags[i] & flag_mask] 143 except KeyError: 144 meas_flag = '' 145 flags[i] = meas_flag 146 return flags
147 148 ## 149