Package Bio :: Package config :: Module SeqDBRegistry
[hide private]
[frames | no frames]

Source Code for Module Bio.config.SeqDBRegistry

  1  # Copyright 2002 by Jeffrey Chang.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """This module handles the seqdatabase.INI file. 
  7   
  8  Classes: 
  9  SeqDBRegistry   Holds databases from seqdatabase.INI. 
 10   
 11  """ 
 12  import os 
 13   
 14  from Bio.config import DBRegistry 
 15   
 16  # Functions: 
 17  # _list_ini_paths    Return a list of standard places for INI files. 
 18  # _list_ini_files    Return a list of INI files that exist. 
 19  #  
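 20  # _load_registry_objects  Load an INI file into a list of RegisterableObjects. 
 21  # _make_flat_db           Make the registry object for a 'flat' stanza. 
 22  # _make_biofetch_db       Make the registry object for a 'biofetch' stanza. 
 23  # _make_biosql_db         Make the registry object for a 'biosql' stanza. 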
 24  # 
 25  # _openfu            Open a file or URL. 
 26  # _warn              Register a warning message. 
 27   
28 -class SeqDBRegistry(DBRegistry.DBRegistry):
29 """This object implements a dictionary-like interface to sequence 30 databases. To get a list of the databases available, do: 31 Bio.seqdb.keys() 32 33 Then, you can access the database using: 34 Bio.seqdb[DATABASE_NAME][SEQUENCE_ID] 35 36 """
37 - def __init__(self, name):
39
def _load(self, path):
    """Populate the registry from the first usable seqdatabase.ini.

    The `path` argument is accepted but not used (it is always None
    here).  Candidate files come from _list_ini_files(); the first one
    that yields any objects is registered and the rest are ignored.  A
    warning is issued when no candidate yields anything.
    """
    for candidate in _list_ini_files("seqdatabase.ini"):
        loaded = _load_registry_objects(candidate)
        if not loaded:
            # Unreadable, unsupported or empty file: try the next one.
            continue
        for entry in loaded:
            self.register(entry)
        # Use the first one that exists.
        return
    _warn("I could not load any seqdatabase files.")
51
def __getitem__(self, name):
    """Return the database registered under `name`.

    The lookup is delegated to DBRegistry.DBRegistry.__getitem__.

    Raises:
        KeyError: with an "Unknown database: ..." message when the
            parent lookup raises KeyError.
    """
    try:
        return DBRegistry.DBRegistry.__getitem__(self, name)
    except KeyError as x:
        # "except E as x" and "raise E(msg)" parse on Python 2.6+ and 3,
        # unlike the comma forms.
        raise KeyError("Unknown database: %s" % str(x))


# Module-level registry instance, created at import time under the
# name "seqdb".
seqdb = SeqDBRegistry(name="seqdb")

61 -def _warn(message):
62 import warnings 63 warnings.warn(message)
64
def _load_registry_objects(ini_file):
    """Parse a stanza-format INI file into registry objects.

    ini_file is a path or URL accepted by _openfu().

    Returns a list containing one object per usable stanza (built by
    the handler for its 'protocol'), followed by one serial
    DBRegistry.DBGroup per distinct 'fallback_group' name.  Returns
    None when the file has a syntax error or an unsupported version.

    Stanzas that lack 'protocol' or 'location', name an unhandled
    protocol, repeat an earlier (case-insensitive) section name, or
    make their handler raise KeyError/ValueError are skipped with a
    warning.
    """
    import _stanzaformat

    handle = _openfu(ini_file)
    try:
        try:
            stanzas = _stanzaformat.load(handle)
        except ValueError as x:
            _warn("Can't load seqdb. Syntax error in %s: %s" %
                  (ini_file, str(x)))
            return None

        # Make sure the file is the right version.
        # NOTE(review): this is a string comparison, not a numeric one.
        if stanzas.version > "1.00":
            _warn("I can't handle stanza files with version %s" %
                  stanzas.version)
            return None

        # list of (section name, section key, dict of tag->value);
        # the key is lower-cased so names compare case-insensitively.
        inidata = [(stanza.name, stanza.name.lower(), stanza.tag_value_dict)
                   for stanza in stanzas.stanzas]
    finally:
        # Everything needed has been copied out; release the file/URL.
        handle.close()

    protocol2handler = {
        'flat': _make_flat_db,
        'biofetch': _make_biofetch_db,
        'biosql': _make_biosql_db,
    }

    # Do some checking on each of the stanzas.  If there are errors,
    # then ignore them.
    usable = []    # list of (section name, dict of tag->value)
    seen = set()   # Which sections we have already seen.
    for section_name, section_key, tagvalue_dict in inidata:
        if "protocol" not in tagvalue_dict:
            _warn("%s stanza missing 'protocol'. Skipping" % section_name)
        elif "location" not in tagvalue_dict:
            _warn("%s stanza missing 'location'. Skipping" % section_name)
        elif tagvalue_dict['protocol'] not in protocol2handler:
            _warn("%s protocol not handled. Skipping" %
                  tagvalue_dict['protocol'])
        elif section_key in seen:
            _warn("%s stanza already exists. Skipping" % section_key)
        else:
            seen.add(section_key)
            usable.append((section_name, tagvalue_dict))

    # serial_groups is a list of fallback groups.  This is an
    # undocumented feature unsupported in the OBDA spec 1.00!
    registry_objects = []   # list of RegisterableObjects
    serial_groups = []      # list of (group_name, obj in group)
    for section_name, tagvalue_dict in usable:
        handler = protocol2handler[tagvalue_dict['protocol']]
        try:
            obj = handler(section_name, tagvalue_dict)
        except (KeyError, ValueError) as x:
            # A stanza missing a tag its handler needs, or carrying a
            # malformed value, must not abort the whole file.
            _warn("%s stanza is invalid: %s. Skipping" %
                  (section_name, str(x)))
            continue
        registry_objects.append(obj)
        if "fallback_group" in tagvalue_dict:
            serial_groups.append((tagvalue_dict['fallback_group'], obj))

    # Now make the group objects, remembering first-seen order so the
    # result does not depend on dict ordering.
    groups = {}        # name -> DBGroup object
    group_order = []   # group names in order of first appearance
    for group_name, obj in serial_groups:
        if group_name not in groups:
            groups[group_name] = DBRegistry.DBGroup(
                group_name, behavior="serial")
            group_order.append(group_name)
        groups[group_name].add(obj)
    registry_objects.extend([groups[group_name]
                             for group_name in group_order])
    return registry_objects
142
def _make_biofetch_db(name, tagvalue_dict):
    """Build the CGIDB registry object for a 'biofetch' stanza.

    name becomes the object's name.  tagvalue_dict must contain
    'location' (used as the CGI address) and may contain 'dbname'
    (defaults to "embl").  Returns the DBRegistry.CGIDB instance.
    """
    from Martel import Str

    dbname = tagvalue_dict.get("dbname", "embl")

    # Response markers that signal a failed fetch, with their meaning.
    error_texts = (
        ("ERROR 1", "Unknown database."),
        ("ERROR 2", "Unknown style."),
        ("ERROR 3", "Format not known for database."),
        ("ERROR 4", "ID not found in database."),
        ("ERROR 5", "Too many IDs."),
    )

    # All other tags in the stanza are ignored silently.
    return DBRegistry.CGIDB(
        name=name,
        cgi=tagvalue_dict['location'],
        params=[('style', 'raw'), ('db', dbname)],
        key='id',
        doc="Retrieve sequences from the %s database." % dbname,
        failure_cases=[(Str(marker), text) for marker, text in error_texts]
    )
166 167 ##def _make_biocorba_db(name, tagvalue_dict): 168 ## """Register a BioCorba style database defined in the registry.""" 169 ## params = {} 170 ## params['name'] = name 171 ## params['ior_ref'] = tagvalue_dict['location'] 172 ## return DBRegistry.BioCorbaDB(**params) 173
174 -def _make_biosql_db(name, tagvalue_dict):
175 """Register a BioSQL database defined in the registry.""" 176 import re 177 params = {} 178 params['name'] = name 179 180 # Make sure the location has the right format. 181 if not re.match(r"[a-zA-Z0-9_]+:\d+$", tagvalue_dict['location']): 182 _warn("Invalid location string: %s. I want <host:port>. Skipping" % 183 tagvalue_dict['location']) 184 host, port = tagvalue_dict['location'].split(":") 185 params['db_host'] = host 186 params['db_port'] = port 187 188 params['sql_db'] = tagvalue_dict['biodbname'] 189 params['db_type'] = tagvalue_dict.get('driver', 'mysql').lower() 190 params['db_user'] = tagvalue_dict.get('user', 'root') 191 params['db_passwd'] = tagvalue_dict.get('passwd', '') 192 params['namespace_db'] = tagvalue_dict['dbname'] 193 194 params["doc"] = "Retrieve %s sequences from BioSQL hosted at %s" % ( 195 tagvalue_dict['dbname'], host) 196 197 return DBRegistry.BioSQLDB(**params)
198
def _make_flat_db(name, tagvalue_dict):
    """Build the IndexedFileDB registry object for a 'flat' stanza.

    name becomes the object's name; tagvalue_dict must contain
    'dbname'.  Returns the DBRegistry.IndexedFileDB instance.
    """
    dbname = tagvalue_dict["dbname"]
    return DBRegistry.IndexedFileDB(
        name=name,
        dbname=dbname,
        doc="Retrieve %s sequences from a local database." % dbname
    )
207
208 -def _openfu(file_or_url):
209 """Guess whether this is a file or url and open it.""" 210 if file_or_url[:4].lower() == 'http': 211 import urllib 212 return urllib.urlopen(file_or_url) 213 # doesn't look like a URL, guess it's a file. 214 return open(file_or_url)
215
216 -def _list_ini_paths():
217 """_list_ini_paths() -> list of URL's or paths to search for files. 218 219 The default places to look for registry files are: 220 - ${HOME}/.bioinformatics 221 - /etc/bioinformatics 222 - http://www.open-bio.org/registry 223 224 The OBDA_SEARCH_PATH environment variable, if specified, overrides 225 the default. This should be a "+" separated list of paths or 226 URL's. 227 228 """ 229 if os.environ.has_key("OBDA_SEARCH_PATH"): 230 paths = os.environ["OBDA_SEARCH_PATH"].split("+") 231 else: 232 paths = [ 233 os.path.join(os.sep, "etc", "bioinformatics"), #/etc/bioinformatics 234 "http://www.open-bio.org/registry", 235 ] 236 # $HOME/.bioinformatics 237 if os.environ.has_key("HOME"): 238 p = os.path.join(os.environ["HOME"], ".bioinformatics") 239 paths.insert(0, p) 240 return paths
241
def _list_ini_files(filename, also_search=None):
    """_list_ini_files(filename) -> list of files to search (in order)

    filename is looked for in every location from _list_ini_paths(),
    followed by any extra locations given in also_search (a sequence of
    paths or URL's).  Only candidates that can actually be opened are
    returned; ones that raise IOError are left out.
    """
    files = []
    searchpath = _list_ini_paths() + list(also_search or [])
    for path in searchpath:
        if "://" in path:
            # URL's always use "/", whatever the platform separator is.
            fullname = path.rstrip("/") + "/" + filename
        else:
            fullname = os.path.join(path, filename)
        # Check to see if this name works.  If so, add it to the list.
        try:
            handle = _openfu(fullname)
        except IOError:
            continue
        # The open was only a probe; don't leak the handle.
        handle.close()
        files.append(fullname)
    return files
257 258 ##def _urlparamdecode(string): 259 ## """Return a list of (tag, value) from a URL's GET string""" 260 ## params = [] 261 ## pairs = string.split("&") 262 ## for tagvalue in pairs: 263 ## i = tagvalue.find("=") 264 ## if i >= 0: 265 ## tag, value = tagvalue[:i], tagvalue[i+1:] 266 ## else: 267 ## tag, value = "", tagvalue 268 ## tag, value = urllib.unquote(tag), urllib.unquote(value) 269 ## params.append((tag, value)) 270 ## return params 271