Package acis_xmlrpc :: Module xmlrpc
[hide private]
[frames] | no frames]

Source Code for Module acis_xmlrpc.xmlrpc

  1  """ 
  2  ACIS XML-RPC Server. 
  3  @since: May 2007 
  4  @author: Grant Kelly 
  5  """ 
  6  import re 
  7  import time 
  8  from mx import DateTime 
  9  import simplejson 
 10  from cStringIO import StringIO 
 11  from zope.interface import Interface, implements 
 12  from twisted.application import internet, service 
 13  from twisted.internet import protocol, reactor, defer, threads 
 14  from twisted.protocols.basic import LineReceiver 
 15  from twisted.python import components 
 16  from twisted.web import server, xmlrpc 
 17   
 18  # Local imports 
 19  from acis_xmlrpc.responses import ACISErrorResponse, ACISDataResponse, ACISMetaDataResponse 
 20  from acis_xmlrpc.exceptions import * 
 21  from acis_xmlrpc.options import * 
 22  from acis_xmlrpc.util import * 
 23  from acis_xmlrpc.config import * 
 24  from acis_xmlrpc.var_table import global_var_dict 
 25   
 26  # Start of ACIS BoilerPlate 
 27  import ucanCallMethods 
 28  from string import * 
 29  from omniORB import CORBA 
 30  import Meta 
 31  from acisTools import getMetaDictionary 
 32  NameAny = Meta.MetaQuery.NameAnyPair 
 33  any = CORBA.Any 
 34  tc = CORBA.TypeCode 
 35  tc_short = CORBA.TC_short 
 36  tc_string = CORBA.TC_string 
 37  tc_shortseq = tc(Meta.ShortSeq) 
 38  tc_doubleseq = tc(Meta.DoubleSeq) 
 39  # End of BoilerPlate 
 40   
 41  # 
 42  # ACIS Service 
 43  # 
class IACISService(Interface):
    """
    Interface for objects that can answer ACIS station requests.

    Both methods take the parsed request C{options}; this class only
    declares them.
    """

    def getMetaForStn(options):
        """ Declaration only: station metadata lookup. """

    def getDailyData(options):
        """ Declaration only: daily data lookup. """
49
class ACISService(service.Service):
    """
    Twisted service that answers ACIS requests through the UCAN handle
    returned by C{ucanCallMethods.general_ucan}.

    L{getMetaForStn} and L{getDailyData} run their work through
    C{threads.deferToThread} and return the resulting Deferred.
    """
    implements(IACISService)

    # A WRCC summary line that begins with MM/DD/YYYY is treated as a data row.
    _WRCC_DATE_LINE = re.compile(r"^(\d\d)/(\d\d)/(\d\d\d\d)")
    # Order of the values found after the first four fields of a WRCC data row.
    _WRCC_ELEMENT_ORDER = ('srad', 'wavg', 'wdir', 'pkws', 'tavg', 'tmax',
                           'tmin', 'avrh', 'mxrh', 'mnrh', 'prcp')

    def __init__(self):
        self.ucan = None
        self.query = None
        self.data = None
        self._get_ucan()

    def _get_ucan(self):
        """ Return the UCAN handle, creating it on first use. """
        if not self.ucan:
            self.ucan = ucanCallMethods.general_ucan(GENERAL_UCAN_NAME)
        return self.ucan

    def _get_data(self):
        """ Return C{get_data()} of the UCAN handle. """
        return self._get_ucan().get_data()

    def _get_query(self):
        """ Return C{get_query()} of the UCAN handle. """
        return self._get_ucan().get_query()

    def getUcanIdsFromOptions(self, options):
        """
        Lookup ucan_id for every (stn,stntype) given in options.
        If a station cannot be resolved, its ucan_id is 0.

        @param options: Options from the option parser; C{options.stn} and
                        C{options.stntype} are read index by index.
        @return: list of C{(ucan_id, stn, stntype)} tuples, one per station
        """
        resolved = []
        query = self._get_query()
        try:
            for i, stn in enumerate(options.stn):
                stntype = options.stntype[i]
                try:
                    if stntype == 'ucan':
                        # The station id already is the ucan_id.
                        ucan_id = int(stn)
                    else:
                        info = query.getUcanFromIdAsSeq(stn, stntype)
                        ucan_id = info[-1].ucan_id
                except Exception:
                    ucan_id = 0  # ucan_id not found (or not numeric)
                resolved.append((ucan_id, stn, stntype))
        finally:
            # Release the query object even when a lookup blows up.
            query.release()
            self.query = None
        return resolved

    def _var_stats(self, data, var, ucan_id, date_fmt):
        """
        Build the C{<var>_*} validity entries for one variable of one station.

        @param data: object returned by L{_get_data}
        @param var: variable name, a key of C{global_var_dict}
        @param ucan_id: resolved station id
        @param date_fmt: C{strftime} format used for the two date entries
        @return: dict of entries; placeholder values (date 9999-12-31 and
                 C{"-9999"}) when the statistics cannot be obtained
        """
        def fmt(ymd):
            return "%s" % DateTime.Date(ymd[0], ymd[1], ymd[2]).strftime(date_fmt)

        var_major = global_var_dict[var]['var_major']
        tsvar = None
        try:
            tsvar = data.newTSVar(var_major, 0, ucan_id)
            valid_sD, valid_eD = tsvar.getValidDateRange()
            tsvar.setDateRange(valid_sD, valid_eD)
            stats = valid_stats(tsvar.getValidFlagSeq())
            return {
                var + '_valid_start': fmt(valid_sD),
                var + '_valid_end': fmt(valid_eD),
                var + '_total_count': stats[0],
                var + '_valid_count': stats[1],
                var + '_invalid_count': stats[2],
                var + '_largest_valid': stats[3],
                var + '_largest_invalid': stats[4],
            }
        except Exception:
            # Valid info not found, so set some unknowns
            unknown_date = fmt((9999, 12, 31))
            return {
                var + '_valid_start': unknown_date,
                var + '_valid_end': unknown_date,
                var + '_total_count': "-9999",
                var + '_valid_count': "-9999",
                var + '_invalid_count': "-9999",
                var + '_largest_valid': "-9999",
                var + '_largest_invalid': "-9999",
            }
        finally:
            # getDailyData releases its TSVar objects; do the same here.
            if tsvar is not None:
                tsvar.release()

    def getMetaForStn(self, options):
        """
        Get metadata associated with given station(s).

        Stations whose ucan_id could not be resolved are left out of the
        result.  When C{options.varstats} is set, validity statistics for
        every C{options.var} are merged into each station's entry.

        @param options: Options from the option parser.
        @type options: parser options
        @return: Deferred object firing with a dict keyed by ucan_id.
        @todo: Populate more meta information, possibly by extending getInfoForUcanIdAsSeq()
               or another CORBA-level function.
        """
        def _work():
            stn_dict = {}
            # Resolve the ids before taking our own query object:
            # getUcanIdsFromOptions() acquires and releases one itself.
            # NOTE(review): whether get_query() hands back a shared object
            # is not visible here -- this ordering is safe either way.
            stations = self.getUcanIdsFromOptions(options)
            query = self._get_query()
            try:
                for ucan_id, stn, stntype in stations:
                    if not ucan_id:
                        continue
                    info = query.getInfoForUcanIdAsSeq(ucan_id, ())
                    meta = ucanCallMethods.NameAny_to_dict(info[-1].fields)
                    stn_dict[ucan_id] = meta
                    if options.varstats:
                        data = self._get_data()
                        try:
                            for var in options.var:
                                meta.update(self._var_stats(
                                    data, var, ucan_id, options.dateformat[1]))
                        finally:
                            data.release()
                            self.data = None
            finally:
                query.release()
                self.query = None
            return stn_dict
        return threads.deferToThread(_work)

    def getWRCCDailyData(self, stn, options):
        """
        Get daily data for a WRCC station.

        @param stn: 4-character WRCC station ID
        @type stn: string
        @param options: Options from the option parser.
        @type options: parser options
        @return: Data dictionary.  Its C{code} is C{INTERNAL_SERVER_ERROR}
                 when the summary cannot be fetched or a data row does not
                 have the expected number of columns.
        """
        from wrcc_http import getWRCCDailySummary
        element_order = self._WRCC_ELEMENT_ORDER
        data_dict = {'stntype': "wrcc"}
        try:
            # WRCC interface includes last day; ACIS does not.
            # Make WRCC consistent w/ ACIS by subtracting a day here
            oneday = DateTime.DateTimeDeltaFromDays(1)
            data = getWRCCDailySummary(stn, options.start, options.end - oneday,
                                       allow_incomplete=options.allow_incomplete)
            data_dict['code'] = CODE_NO_DATA
        except Exception:
            # Must stay a string: the loop below calls data.split().
            data = ""
            data_dict['code'] = INTERNAL_SERVER_ERROR

        for line in data.split("\n"):
            line = line.strip()
            if not self._WRCC_DATE_LINE.match(line):
                continue
            # If a line starts with a date, then it should be a data line
            data_dict['code'] = CODE_STN_HAS_DATA
            data_parts = line.split()[4:]
            if len(data_parts) != len(element_order):
                print("Did WRCC data page change?")
                return {'code': INTERNAL_SERVER_ERROR}
            for i, var in enumerate(element_order):
                if var not in options.var:
                    continue
                if var not in data_dict:
                    data_dict[var] = []
                    data_dict[var + '_valid_flag'] = []
                    data_dict[var + '_flag'] = []
                try:  # convert val to float, otherwise set as missing
                    data_dict[var].append(float(data_parts[i]))
                    data_dict[var + '_valid_flag'].append(1)
                    data_dict[var + '_flag'].append('')
                    data_dict[var + '_code'] = CODE_STN_HAS_DATA
                except ValueError:
                    data_dict[var].append('')
                    data_dict[var + '_valid_flag'].append(0)
                    data_dict[var + '_flag'].append('M')
        return data_dict

    def _fill_var(self, data, data_dict, var, ucan_id, options, dates):
        """
        Fill C{data_dict} with the values, flags and status code of one
        variable for one station.

        @param data: object returned by L{_get_data}
        @param data_dict: per-station dict, updated in place
        @param var: variable name, a key of C{global_var_dict}
        @param ucan_id: resolved station id
        @param options: Options from the option parser.
        @param dates: the request's date array (C{stn_dict['dates']})
        """
        sD = [options.start.year, options.start.month, options.start.day]
        eD = [options.end.year, options.end.month, options.end.day]

        # Setup missing arrays of requested size
        data_dict[var] = new_missing_array(sD, eD)
        data_dict[var + '_valid_flag'] = new_missing_array(sD, eD, 0)
        data_dict[var + '_flag'] = new_missing_array(sD, eD, '')

        # Get var_major ID and init TSVar
        var_major = global_var_dict[var]['var_major']
        try:
            tsvar = data.newTSVar(var_major, 0, ucan_id)
        except Exception:
            data_dict[var + '_code'] = CODE_NO_DATA
            return

        try:
            # Clamp the requested range to the range the variable is valid for.
            try:
                valid_sD, valid_eD = tsvar.getValidDateRange()
                # Compare like with like: sD/eD are lists.
                # NOTE(review): the sequence type returned by
                # getValidDateRange() is not visible here.
                valid_sD, valid_eD = list(valid_sD), list(valid_eD)
                if eD > valid_eD:
                    eD = valid_eD
                if sD < valid_sD:
                    sD = valid_sD
                if not sD < eD:
                    data_dict[var + '_code'] = CODE_INVALID_DATES
                    return
                tsvar.setDateRange(sD, eD)
            except Exception:
                data_dict[var + '_code'] = CODE_INVALID_DATES
                return

            # Get actual data.
            # Since the date requested could be larger than the valid dates,
            # figure out how to slice the data into the correct spot to
            # correspond with the dates array.
            data_seq = tsvar.getDataSeqAsFloat()
            date_seq = tsvar.getDateArraySeq()
            vflag_seq = tsvar.getValidFlagSeq()
            flag_seq = getFilteredFlagSeq(tsvar.getFlagSeqAsShort())

            # First requested date that also appears in the returned dates.
            returned_days = set([tuple(day) for day in date_seq])
            s_rec = None
            for i, day in enumerate(dates):
                if tuple(day) in returned_days:
                    s_rec = i
                    break
            if s_rec is None:
                # Nothing returned lines up with the request; keep the
                # missing-value arrays rather than failing on None + int.
                data_dict[var + '_code'] = CODE_NO_DATA
                return

            e_rec = s_rec + len(data_seq)
            data_dict[var][s_rec:e_rec] = data_seq
            data_dict[var + '_valid_flag'][s_rec:e_rec] = vflag_seq
            data_dict[var + '_flag'][s_rec:e_rec] = flag_seq
            data_dict[var + '_code'] = CODE_STN_HAS_DATA
        finally:
            # Released on every path, including the early returns above.
            tsvar.release()

    def getDailyData(self, options):
        """
        Get daily data for the given station(s).

        The result maps C{'dates'} to the request's date array and each
        station id to its data dictionary.  Stations of type C{wrcc} are
        served by L{getWRCCDailyData}.

        @param options: Options from the option parser.
        @type options: parser options
        @return: Deferred object
        """
        def _work():  # Done by a thread
            stn_dict = {}
            data = self._get_data()
            try:
                sD = [options.start.year, options.start.month, options.start.day]
                eD = [options.end.year, options.end.month, options.end.day]
                dates = new_date_array(sD, eD)
                stn_dict['dates'] = dates
                for ucan_id, stn, stntype in self.getUcanIdsFromOptions(options):
                    if stntype == 'wrcc':
                        stn_dict[stn] = self.getWRCCDailyData(stn, options)
                        continue
                    data_dict = {'stntype': stntype}
                    if not ucan_id:  # stn not found
                        data_dict['code'] = CODE_STN_NOT_FOUND
                    else:  # build a data dict for this stn
                        data_dict['code'] = CODE_STN_HAS_DATA
                        for var in options.var:
                            self._fill_var(data, data_dict, var, ucan_id,
                                           options, dates)
                    stn_dict[stn] = data_dict
            finally:
                data.release()
                self.data = None
            return stn_dict
        return threads.deferToThread(_work)
class IACISFactory(Interface):
    """
    Interface for factories that expose the ACIS request methods.

    Both methods take the parsed request C{options}; this class only
    declares them.
    """

    def getMetaForStn(options):
        """ Factory declaration only: station metadata lookup. """

    def getDailyData(options):
        """ Factory declaration only: daily data lookup. """
283
class ACISFactoryFromService(protocol.ServerFactory):
    """
    Server factory that forwards every L{IACISFactory} call to the wrapped
    ACIS service object.
    """
    implements(IACISFactory)
    protocol = LineReceiver

    def __init__(self, acis_service):
        # Keep the service; every request below is delegated to it.
        self.acis_service = acis_service

    def getMetaForStn(self, options):
        """ Delegate to the wrapped service's C{getMetaForStn}. """
        backend = self.acis_service
        return backend.getMetaForStn(options)

    def getDailyData(self, options):
        """ Delegate to the wrapped service's C{getDailyData}. """
        backend = self.acis_service
        return backend.getDailyData(options)


# Register this factory as the adapter from IACISService to IACISFactory.
components.registerAdapter(
    ACISFactoryFromService,
    IACISService,
    IACISFactory
)


#
# XMLRPC Server
#
class ACISXMLRPC(xmlrpc.XMLRPC):
    """
    XML-RPC resource exposing the ACIS service.

    Each C{xmlrpc_*} method takes an option string (or list), parses it
    with the shared option parser and either answers directly (help text,
    error response) or returns the Deferred produced by the service.
    """

    def __init__(self, acis_service):
        xmlrpc.XMLRPC.__init__(self)
        self.acis_service = acis_service
        self.option_parser = build_parser()
        if KEEP_PARSER_LOG:
            # Append-mode log of every raw input and its parsed options.
            self.logfile = open(PARSER_LOG, 'a')

    def parse_input(self, input, required=None, defaults=None):
        """
        Parse the given C{input}, using C{defaults} if not specified.
        Transforms C{date} options to date objects and C{dateformat} to
        C{(name, format string)}.
        Transforms C{csv} options to lists.

        Raises L{OptionRequired} if any C{required} not given in C{input}.
        Raises L{ReturnHelpText} if the help option was given.

        @raises OptionRequired: L{OptionRequired}
        @param input: Input string or list.
        @param required: Sequence of required option names (may be None).
        @param defaults: Dictionary of default values (may be None); it is
                         copied, never modified.
        @return: C{(options, args)} from C{option_parser.parse_args()}
        @rtype: tuple

        @todo: Put this in a more general location since it's not specific to XMLRPC.
        """
        _date_attrs = ('start', 'end')
        _csv_attrs = ('stn', 'var', 'stntype')

        # Private copy: the default dates are merged in below, and that must
        # not leak into the caller's dict or into later calls.
        defaults = dict(defaults or {})

        if isinstance(input, str):
            input = input.split()

        if KEEP_PARSER_LOG:
            stamp = str(DateTime.DateTimeFromTicks(time.time()))
            self.logfile.write("%s|%s\n" % (stamp, " ".join(input)))
        options, args = self.option_parser.parse_args(input)
        if KEEP_PARSER_LOG:
            self.logfile.write("--> " + str(options) + "\n")
            self.logfile.flush()

        if options.help:
            raise ReturnHelpText

        # Check for required options
        for req in required or ():
            if getattr(options, req) is None:
                raise OptionRequired(req)

        # Set default output format
        if hasattr(options, 'format') and options.format not in DELIMITED_FORMATS:
            options.format = 'json'  # default json format

        # Set default date format
        dateformat = options.dateformat
        if dateformat not in DATE_FORMATS:
            dateformat = 'YYYYMMDD'
        options.dateformat = (dateformat, DATE_FORMATS[dateformat])

        # Change dates from strings to date objects
        for attr in _date_attrs:
            val = getattr(options, attr, None)
            if isinstance(val, str):
                setattr(options, attr, date_from_string(val))

        # Derive whichever end of the date range was left out
        start_date = getattr(options, 'start', None)
        end_date = getattr(options, 'end', None)
        n_days = DateTime.DateTimeDeltaFromDays(DEFAULT_N_DAYS)
        if start_date and not end_date:
            options.end = start_date + n_days
        elif end_date and not start_date:
            options.start = end_date - n_days
        elif not start_date and not end_date:
            defaults.update(dates_for_last_n_days(DEFAULT_N_DAYS))

        # Use default values if specified
        for attr in defaults:
            if getattr(options, attr, None) is None:
                setattr(options, attr, defaults[attr])

        # Change csv to lists
        for attr in _csv_attrs:
            if not hasattr(options, attr):
                continue
            val = getattr(options, attr)
            if hasattr(val, 'split'):
                setattr(options, attr, val.split(','))
            elif val is not None:
                setattr(options, attr, [str(val)])

        # Drop unrecognized vars, then duplicate vars
        if getattr(options, 'var', None) is not None:
            known = [v for v in options.var if v in global_var_dict]
            options.var = unique_list(known)

        # Do some logical checks
        # Swap start and end if reversed
        if hasattr(options, 'start') and hasattr(options, 'end'):
            try:
                if options.start > options.end:
                    options.start, options.end = options.end, options.start
            except Exception:
                # Best effort: leave the dates alone if they cannot be compared.
                pass

        # Set end date to a max of today + 1
        if hasattr(options, 'end'):
            tomorrow = DateTime.today() + DateTime.oneDay
            if options.end > tomorrow:
                options.end = tomorrow

        # Set stntype to lowercase and give every stn a stntype,
        # using the last stntype by default
        if getattr(options, 'stntype', None) is not None:
            stntypes = [str(s).lower() for s in options.stntype]
            stns = getattr(options, 'stn', None)
            if stntypes and stns is not None:
                n_stn = len(stns)
                stntypes = (stntypes + [stntypes[-1]] * n_stn)[:n_stn]
            options.stntype = stntypes

        return options, args

    def _parse_request(self, input, required=None, defaults=None):
        """
        Run L{parse_input} and turn its exceptions into replies.

        @return: C{(options, None)} on success, otherwise C{(None, reply)}
                 where C{reply} is the help text or a JSON error response.
        """
        try:
            options, args = self.parse_input(input, required, defaults)
        except ReturnHelpText:
            return None, self.option_parser.format_help()
        except (OptionRequired, DateError) as msg:
            return None, ACISErrorResponse(msg).as_json()
        return options, None

    def xmlrpc_getMetaForStn(self, input):
        """
        Get metadata associated with given station(s).

        @requires: C{stn}, C{stntype}
        """
        required = ('stn', 'stntype')
        defaults = {
            'var': 'tmax,tmin,prcp',
        }
        options, reply = self._parse_request(input, required, defaults)
        if options is None:
            return reply

        d = self.acis_service.getMetaForStn(options)
        d.addCallback(ACISMetaDataResponse, options)
        d.addCallback(lambda response: response.as_format(options.format))
        return d

    def xmlrpc_getDailyData(self, input):
        """
        Get daily data for a day range for the given station(s).
        When no dates are given, the range covers the last
        C{DEFAULT_N_DAYS} days.
        Stations that fail to resolve to a ucan_id will have no data.

        @requires: C{stn}, C{stntype}, C{var}
        """
        required = ('stn', 'stntype', 'var')
        options, reply = self._parse_request(input, required)
        if options is None:
            return reply

        d = self.acis_service.getDailyData(options)
        d.addCallback(ACISDataResponse, options)
        d.addCallback(lambda response: response.as_format(options.format))
        return d

    def xmlrpc_echoOptions(self, input):
        """
        Echo the input options after parsing.  For testing purposes only.
        """
        options, reply = self._parse_request(input)
        if options is None:
            return reply
        return str(options)
476