# Logo Search packages:
# Sourcecode: suds version File versions  Download package
#
# sxbasic.py

# This program is free software; you can redistribute it and/or modify
# it under the terms of the (LGPL) GNU Lesser General Public License as
# published by the Free Software Foundation; either version 3 of the 
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library Lesser General Public License for more details at
# ( http://www.gnu.org/licenses/lgpl.html ).
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# written by: Jeff Ortel ( jortel@redhat.com )

"""
The I{sxbasic} module provides classes that represent
I{basic} schema objects.
"""

from logging import getLogger
from suds import *
from suds.xsd import *
from suds.xsd.sxbase import *
from suds.xsd.query import *
from suds.sax import splitPrefix, Namespace
from suds.transport import TransportError
from suds.reader import DocumentReader
from urlparse import urljoin


# Module-level logger, named after this module.
log = getLogger(__name__)


class RestrictionMatcher:
    """
    A matcher for use with L{NodeFinder} that accepts only
    L{Restriction} nodes.
    """

    def match(self, n):
        """
        Test whether a node is a restriction.
        @param n: The node to test.
        @type n: object
        @return: True when I{n} is an instance of L{Restriction}.
        @rtype: boolean
        """
        is_restriction = isinstance(n, Restriction)
        return is_restriction
    

class TypedContent(Content):
    """
    Represents any I{typed} content.
    """

    def resolve(self, nobuiltin=False):
        """
        Resolve the type referenced by this node.
        The outcome is cached in I{self.cache}, one entry per value of
        I{nobuiltin}, so repeated calls return the same object.
        @param nobuiltin: When True, a reference to a builtin type
            resolves to I{self} instead of the builtin type.
        @type nobuiltin: boolean
        @return: I{self} when there is no type reference (or when the
            reference is builtin and I{nobuiltin} is set), otherwise
            the resolved type.
        @raise TypeNotFound: When the referenced type is not found
            by the L{TypeQuery}.
        """
        qref = self.qref()
        if qref is None:
            return self
        key = 'resolved:nb=%s' % nobuiltin
        cached = self.cache.get(key)
        if cached is not None:
            return cached
        query = TypeQuery(qref)
        query.history = [self]
        log.debug('%s, resolving: %s\n using:%s', self.id, qref, query)
        resolved = query.execute(self.schema)
        if resolved is None:
            log.debug(self.schema)
            raise TypeNotFound(qref)
        # Provisional entry: a nested resolve() that comes back to this
        # node still gets a cache hit, exactly as it did before.
        self.cache[key] = resolved
        if resolved.builtin():
            if nobuiltin:
                result = self
            else:
                result = resolved
        else:
            result = resolved.resolve(nobuiltin)
        # Cache what is actually returned.  Caching only the intermediate
        # object made the first call and later calls disagree (e.g. self
        # on the first call, the builtin type afterwards).
        self.cache[key] = result
        return result

    def qref(self):
        """
        Get the I{type} qualified reference to the referenced xsd type.
        This method takes into account simple types defined through
        restriction, which are detected by determining that self is
        simple (len=0) and by finding a restriction child.
        @return: The I{type} qualified reference.
        @rtype: qref
        """
        qref = self.type
        if qref is None and len(self) == 0:
            ls = []
            m = RestrictionMatcher()
            finder = NodeFinder(m, 1)
            finder.find(self, ls)
            if len(ls):
                # use the I{ref} of the first restriction found.
                return ls[0].ref
        return qref


class Complex(SchemaObject):
    """
    Represents an (xsd) schema <xs:complexType/> node.
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        L{Factory.build} uses the result as its tag filter when
        building the children of this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        tags = (
            'attribute',
            'attributeGroup',
            'sequence',
            'all',
            'choice',
            'complexContent',
            'simpleContent',
            'any',
            'group',
        )
        return tags

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name',)

    def extension(self):
        """
        Get whether any of the raw children is an extension.
        @return: True as soon as one raw child answers true to
            extension(), else False.
        @rtype: boolean
        """
        for child in self.rawchildren:
            if not child.extension():
                continue
            return True
        return False


class Group(SchemaObject):
    """
    Represents an (xsd) schema <xs:group/> node.
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('sequence', 'all', 'choice')

    def dependencies(self):
        """
        Get the objects this group depends on.
        When I{ref} is set, the referenced group is looked up in the
        schema using a L{GroupQuery}.
        @return: A tuple (midx, deps): (0, [group]) when I{ref} is
            set, otherwise (None, []).
        @rtype: (int, [L{SchemaObject},...])
        @raise TypeNotFound: When the referenced group is not found.
        """
        if self.ref is None:
            return (None, [])
        referenced = GroupQuery(self.ref).execute(self.schema)
        if referenced is None:
            log.debug(self.schema)
            raise TypeNotFound(self.ref)
        return (0, [referenced])

    def merge(self, other):
        """
        Merge I{other} into this object.
        Performs the base L{SchemaObject} merge and then adopts the
        raw children list of I{other} (the same list, not a copy).
        @param other: The object to merge from.
        @type other: L{SchemaObject}
        """
        SchemaObject.merge(self, other)
        self.rawchildren = other.rawchildren

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name', 'ref')
    

class AttributeGroup(SchemaObject):
    """
    Represents an (xsd) schema <xs:attributeGroup/> node.
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('attribute', 'attributeGroup')

    def dependencies(self):
        """
        Get the objects this attribute group depends on.
        When I{ref} is set, the referenced attribute group is looked
        up in the schema using an L{AttrGroupQuery}.
        @return: A tuple (midx, deps): (0, [group]) when I{ref} is
            set, otherwise (None, []).
        @rtype: (int, [L{SchemaObject},...])
        @raise TypeNotFound: When the referenced group is not found.
        """
        if self.ref is None:
            return (None, [])
        referenced = AttrGroupQuery(self.ref).execute(self.schema)
        if referenced is None:
            log.debug(self.schema)
            raise TypeNotFound(self.ref)
        return (0, [referenced])

    def merge(self, other):
        """
        Merge I{other} into this object.
        Performs the base L{SchemaObject} merge and then adopts the
        raw children list of I{other} (the same list, not a copy).
        @param other: The object to merge from.
        @type other: L{SchemaObject}
        """
        SchemaObject.merge(self, other)
        self.rawchildren = other.rawchildren

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name', 'ref')
    

class Simple(SchemaObject):
    """
    Represents an (xsd) schema <xs:simpleType/> node
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('restriction', 'any', 'list')

    def enum(self):
        """
        Get whether this type contains an enumeration.
        @return: True when any child yielded by children() is an
            L{Enumeration}, else False.
        @rtype: boolean
        """
        for node, _ancestry in self.children():
            if not isinstance(node, Enumeration):
                continue
            return True
        return False

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name',)

    def extension(self):
        """
        Get whether any of the raw children is an extension.
        @return: True as soon as one raw child answers true to
            extension(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.extension():
                continue
            return True
        return False

    def restriction(self):
        """
        Get whether any of the raw children is a restriction.
        @return: True as soon as one raw child answers true to
            restriction(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.restriction():
                continue
            return True
        return False
    

class List(SchemaObject):
    """
    Represents an (xsd) schema <xs:list/> node
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: An empty tuple; no children are built for a list.
        @rtype: (I{str},...)
        """
        no_tags = ()
        return no_tags

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name',)

    def xslist(self):
        """
        Get whether this object is an xsd list.
        @return: Always True.
        @rtype: boolean
        """
        return True

   
class Restriction(SchemaObject):
    """
    Represents an (xsd) schema <xs:restriction/> node
    """

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        SchemaObject.__init__(self, schema, root)
        # the restricted (base) type is what this node references.
        self.ref = root.get('base')

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('enumeration', 'attribute', 'attributeGroup')

    def dependencies(self):
        """
        Get the objects this restriction depends on.
        When I{ref} is set, the base type is looked up in the schema
        using a L{TypeQuery}.  Builtin base types are not reported
        as dependencies.
        @return: A tuple (midx, deps): (0, [base]) when I{ref} names
            a non-builtin type, otherwise (None, []).
        @rtype: (int, [L{SchemaObject},...])
        @raise TypeNotFound: When the base type is not found.
        """
        if self.ref is None:
            return (None, [])
        base = TypeQuery(self.ref).execute(self.schema)
        if base is None:
            log.debug(self.schema)
            raise TypeNotFound(self.ref)
        if base.builtin():
            return (None, [])
        return (0, [base])

    def restriction(self):
        """
        Get whether this object is a restriction.
        @return: Always True.
        @rtype: boolean
        """
        return True

    def merge(self, other):
        """
        Merge I{other} into this object.
        Performs the base L{SchemaObject} merge and then prepends
        the raw children of I{other} to the raw children of this
        object, passing a L{Filter} built from the current raw
        children.
        @param other: The object to merge from.
        @type other: L{SchemaObject}
        """
        SchemaObject.merge(self, other)
        child_filter = Filter(False, self.rawchildren)
        self.prepend(self.rawchildren, other.rawchildren, child_filter)

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('ref',)
    
    
class Collection(SchemaObject):
    """
    Base for the (xsd) schema collection nodes:
        - sequence
        - choice
        - all
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        tags = (
            'element',
            'sequence',
            'all',
            'choice',
            'any',
            'group',
        )
        return tags


class Sequence(Collection):
    """
    Represents an (xsd) schema <xs:sequence/> node.
    """

    def sequence(self):
        """
        Get whether this object is a sequence.
        @return: Always True.
        @rtype: boolean
        """
        return True


class All(Collection):
    """
    Represents an (xsd) schema <xs:all/> node.
    """

    def all(self):
        """
        Get whether this object is an xsd I{all} collection.
        @return: Always True.
        @rtype: boolean
        """
        return True

class Choice(Collection):
    """
    Represents an (xsd) schema <xs:choice/> node.
    """

    def choice(self):
        """
        Get whether this object is an xsd I{choice} collection.
        @return: Always True.
        @rtype: boolean
        """
        return True


class ComplexContent(SchemaObject):
    """
    Represents an (xsd) schema <xs:complexContent/> node.
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('attribute', 'attributeGroup', 'extension', 'restriction')

    def extension(self):
        """
        Get whether any of the raw children is an extension.
        @return: True as soon as one raw child answers true to
            extension(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.extension():
                continue
            return True
        return False

    def restriction(self):
        """
        Get whether any of the raw children is a restriction.
        @return: True as soon as one raw child answers true to
            restriction(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.restriction():
                continue
            return True
        return False


class SimpleContent(SchemaObject):
    """
    Represents an (xsd) schema <xs:simpleContent/> node.
    """

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('extension', 'restriction')

    def extension(self):
        """
        Get whether any of the raw children is an extension.
        @return: True as soon as one raw child answers true to
            extension(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.extension():
                continue
            return True
        return False

    def restriction(self):
        """
        Get whether any of the raw children is a restriction.
        @return: True as soon as one raw child answers true to
            restriction(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.restriction():
                continue
            return True
        return False


class Enumeration(Content):
    """
    Represents an (xsd) schema <xs:enumeration/> node
    """

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        Content.__init__(self, schema, root)
        # an enumeration is named by its I{value} attribute.
        value = root.get('value')
        self.name = value

    def enum(self):
        """
        Get whether this object is an enumeration.
        @return: Always True.
        @rtype: boolean
        """
        return True

    
class Element(TypedContent):
    """
    Represents an (xsd) schema <xs:element/> node.
    """

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        TypedContent.__init__(self, schema, root)
        # only override the inherited settings when the attribute
        # is actually present on the node.
        form = root.get('form')
        if form is not None:
            self.form_qualified = (form == 'qualified')
        nillable = self.root.get('nillable')
        if nillable is not None:
            self.nillable = (nillable in ('1', 'true'))
        self.implany()

    def implany(self):
        """
        Set the type as any when implicit.
        An implicit <xs:any/> is when an element has no
        body and no type defined.
        @return: self
        @rtype: L{Element}
        """
        if self.type is not None or self.ref is not None:
            return self
        if self.root.isempty():
            self.type = self.anytype()
        return self

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('attribute', 'simpleType', 'complexType', 'any')

    def extension(self):
        """
        Get whether any of the raw children is an extension.
        @return: True as soon as one raw child answers true to
            extension(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.extension():
                continue
            return True
        return False

    def restriction(self):
        """
        Get whether any of the raw children is a restriction.
        @return: True as soon as one raw child answers true to
            restriction(), else False.
        @rtype: boolean
        """
        for node in self.rawchildren:
            if not node.restriction():
                continue
            return True
        return False

    def dependencies(self):
        """
        Get the objects this element depends on.
        When I{ref} is set, the referenced element is looked up in
        the schema using an L{ElementQuery}.
        @return: A tuple (midx, deps): (0, [element]) when I{ref} is
            set, otherwise (None, []).
        @rtype: (int, [L{SchemaObject},...])
        @raise TypeNotFound: When the referenced element is not found.
        """
        if self.ref is None:
            return (None, [])
        referenced = ElementQuery(self.ref).execute(self.schema)
        if referenced is None:
            log.debug(self.schema)
            raise TypeNotFound(self.ref)
        return (0, [referenced])

    def merge(self, other):
        """
        Merge I{other} into this object.
        Performs the base L{SchemaObject} merge and then adopts the
        raw children list of I{other} (the same list, not a copy).
        @param other: The object to merge from.
        @type other: L{SchemaObject}
        """
        SchemaObject.merge(self, other)
        self.rawchildren = other.rawchildren

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name', 'ref', 'type')

    def anytype(self):
        """
        Create an xsd:anyType reference.
        The prefix already mapped to the xsd namespace on the root
        node is used; when there is none, the default xsd prefix is
        added to the root node and used instead.
        @return: The reference in the form I{prefix}:anyType.
        @rtype: str
        """
        default_prefix, uri = Namespace.xsdns
        prefix = self.root.findPrefix(uri)
        if prefix is None:
            self.root.addPrefix(default_prefix, uri)
            prefix = default_prefix
        return ':'.join((prefix, 'anyType'))


class Extension(SchemaObject):
    """
    Represents an (xsd) schema <xs:extension/> node.
    """

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        SchemaObject.__init__(self, schema, root)
        # the extended (base) type is what this node references.
        self.ref = root.get('base')

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        tags = (
            'attribute',
            'attributeGroup',
            'sequence',
            'all',
            'choice',
            'group',
        )
        return tags

    def dependencies(self):
        """
        Get the objects this extension depends on.
        When I{ref} is set, the base type is looked up in the schema
        using a L{TypeQuery}.  Builtin base types are not reported
        as dependencies.
        @return: A tuple (midx, deps): (0, [base]) when I{ref} names
            a non-builtin type, otherwise (None, []).
        @rtype: (int, [L{SchemaObject},...])
        @raise TypeNotFound: When the base type is not found.
        """
        if self.ref is None:
            return (None, [])
        base = TypeQuery(self.ref).execute(self.schema)
        if base is None:
            log.debug(self.schema)
            raise TypeNotFound(self.ref)
        if base.builtin():
            return (None, [])
        return (0, [base])

    def merge(self, other):
        """
        Merge I{other} into this object.
        Performs the base L{SchemaObject} merge and then prepends
        the raw children of I{other} to the raw children of this
        object, passing a L{Filter} built from the current raw
        children.
        @param other: The object to merge from.
        @type other: L{SchemaObject}
        """
        SchemaObject.merge(self, other)
        child_filter = Filter(False, self.rawchildren)
        self.prepend(self.rawchildren, other.rawchildren, child_filter)

    def extension(self):
        """
        Get whether this object extends a base type.
        @return: True when a I{base} reference is set, else False.
        @rtype: boolean
        """
        if self.ref is None:
            return False
        return True

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('ref',)


class Import(SchemaObject):
    """
    Represents an (xsd) schema <xs:import/> node
    @cvar locations: A dictionary of namespace locations.
    @type locations: dict
    @ivar ns: The imported namespace.
    @type ns: str
    @ivar location: The (optional) location.
    @type location: namespace-uri
    @ivar opened: Opened and I{imported} flag.
    @type opened: boolean
    """

    locations = {}

    @classmethod
    def bind(cls, ns, location=None):
        """
        Bind a namespace to a schema location (URI).
        The binding is consulted for imports that do not specify
        a schemaLocation.
        @param ns: A namespace-uri.
        @type ns: str
        @param location: The (optional) schema location for the
            namespace.  (default=ns).
        @type location: str
        """
        target = location
        if target is None:
            target = ns
        cls.locations[ns] = target

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        SchemaObject.__init__(self, schema, root)
        namespace = root.get('namespace')
        self.ns = (None, namespace)
        location = root.get('schemaLocation')
        if location is None:
            # fall back on a location bound using bind().
            location = self.locations.get(namespace)
        self.location = location
        self.opened = False

    def open(self, options):
        """
        Open and import the referenced schema.
        The import is attempted once only; subsequent calls
        return None.
        @param options: An options dictionary.
        @type options: L{options.Options}
        @return: The referenced schema, or None when it could be
            neither located nor downloaded.
        @rtype: L{Schema}
        """
        if self.opened:
            return None
        self.opened = True
        log.debug('%s, importing ns="%s", location="%s"', self.id, self.ns[1], self.location)
        imported = self.locate()
        if imported is None and self.location is not None:
            imported = self.download(options)
        elif imported is None:
            log.debug('imported schema (%s) not-found', self.ns[1])
        log.debug('imported:\n%s', imported)
        return imported

    def locate(self):
        """
        Find the schema locally.
        @return: The result of locating the imported namespace through
            the containing schema, or None when the imported namespace
            is the target namespace of the containing schema.
        """
        if self.ns[1] != self.schema.tns[1]:
            return self.schema.locate(self.ns)
        return None

    def download(self, options):
        """
        Download the schema.
        A location without a scheme (no '://') is joined to the
        base url of the containing schema.
        @param options: An options dictionary.
        @type options: L{options.Options}
        @return: The schema instance created from the document.
        @raise Exception: When the transport fails.
        """
        url = self.location
        try:
            if '://' not in url:
                url = urljoin(self.schema.baseurl, url)
            document = DocumentReader(options).open(url)
            schema_root = document.root()
            schema_root.set('url', url)
            return self.schema.instance(schema_root, url, options)
        except TransportError:
            msg = 'imported schema (%s) at (%s), failed' % (self.ns[1], url)
            log.error('%s, %s', self.id, msg, exc_info=True)
            raise Exception(msg)

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('ns', 'location')
    

class Include(SchemaObject):
    """
    Represents an (xsd) schema <xs:include/> node
    @cvar locations: A dictionary of fallback locations
        (empty unless populated elsewhere).
    @type locations: dict
    @ivar location: The (optional) location.
    @type location: namespace-uri
    @ivar opened: Opened and I{imported} flag.
    @type opened: boolean
    """

    locations = {}

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        SchemaObject.__init__(self, schema, root)
        self.location = root.get('schemaLocation')
        if self.location is None:
            # NOTE(review): I{ns} is not assigned anywhere in this
            # class; unless the base class provides it, this lookup
            # raises AttributeError when schemaLocation is missing.
            # Confirm against sxbase before relying on this fallback.
            self.location = self.locations.get(self.ns[1])
        self.opened = False

    def open(self, options):
        """
        Open and include the referenced schema.
        The include is attempted once only; subsequent calls
        return None.
        @param options: An options dictionary.
        @type options: L{options.Options}
        @return: The referenced schema.
        @rtype: L{Schema}
        """
        if self.opened:
            return
        self.opened = True
        log.debug('%s, including location="%s"', self.id, self.location)
        result = self.download(options)
        log.debug('included:\n%s', result)
        return result

    def download(self, options):
        """
        Download the schema.
        A location without a scheme (no '://') is joined to the
        base url of the containing schema.
        @param options: An options dictionary.
        @type options: L{options.Options}
        @return: The schema instance created from the document.
        @raise Exception: When the transport fails or the target
            namespace of the included schema does not match.
        """
        url = self.location
        try:
            if '://' not in url:
                url = urljoin(self.schema.baseurl, url)
            reader = DocumentReader(options)
            d = reader.open(url)
            root = d.root()
            root.set('url', url)
            self.__applytns(root)
            return self.schema.instance(root, url, options)
        except TransportError:
            msg = 'include schema at (%s), failed' % url
            log.error('%s, %s', self.id, msg, exc_info=True)
            raise Exception(msg)

    def __applytns(self, root):
        """
        Make sure the included schema has the same target namespace.
        A root without a targetNamespace is given the one of the
        including schema.
        @param root: The root node of the included schema.
        @type root: L{sax.element.Element}
        @raise Exception: When the root declares a different
            targetNamespace.
        """
        TNS = 'targetNamespace'
        tns = root.get(TNS)
        if tns is None:
            tns = self.schema.tns[1]
            root.set(TNS, tns)
        else:
            if self.schema.tns[1] != tns:
                # call form: valid on both Python 2 and Python 3.
                raise Exception('%s mismatch' % TNS)

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        # trailing comma required: ('location') is just a string.
        return ('location',)

   
class Attribute(TypedContent):
    """
    Represents an (xsd) <attribute/> node
    """

    def __init__(self, schema, root):
        """
        @param schema: The containing schema.
        @type schema: L{schema.Schema}
        @param root: The xml root node.
        @type root: L{sax.element.Element}
        """
        TypedContent.__init__(self, schema, root)
        # I{use} is an empty string when not specified.
        self.use = root.get('use', default='')

    def childtags(self):
        """
        Get the names of the child nodes handled for this node.
        @return: The valid child node names.
        @rtype: (I{str},...)
        """
        return ('restriction',)

    def isattr(self):
        """
        Get whether this object is an attribute.
        @return: Always True.
        @rtype: boolean
        """
        return True

    def get_default(self):
        """
        Gets the <xs:attribute default=""/> attribute value.
        @return: The default value for the attribute; an empty
            string when none is specified.
        @rtype: str
        """
        default_value = self.root.get('default', default='')
        return default_value

    def optional(self):
        """
        Get whether the attribute is optional.
        @return: False only when I{use} is 'required'.
        @rtype: boolean
        """
        if self.use != 'required':
            return True
        return False

    def dependencies(self):
        """
        Get the objects this attribute depends on.
        When I{ref} is set, the referenced attribute is looked up in
        the schema using an L{AttrQuery}.
        @return: A tuple (midx, deps): (0, [attribute]) when I{ref}
            is set, otherwise (None, []).
        @rtype: (int, [L{SchemaObject},...])
        @raise TypeNotFound: When the referenced attribute is not found.
        """
        if self.ref is None:
            return (None, [])
        referenced = AttrQuery(self.ref).execute(self.schema)
        if referenced is None:
            log.debug(self.schema)
            raise TypeNotFound(self.ref)
        return (0, [referenced])

    def description(self):
        """
        Get the names of the attributes that describe this object.
        @return: A tuple of attribute names.
        @rtype: (I{str},...)
        """
        return ('name', 'ref', 'type')


class Any(Content):
    """
    Represents an (xsd) <any/> node
    """

    def get_child(self, name):
        """
        Get a synthesized child.
        The requested name is not used; every request is answered
        with a new L{Any} built on a clone of this node's root.
        @param name: The name of the requested child (ignored).
        @type name: str
        @return: A tuple of the synthesized child and an empty
            ancestry list.
        @rtype: (L{Any}, [])
        """
        cloned = self.root.clone()
        cloned.set('note', 'synthesized (any) child')
        return (Any(self.schema, cloned), [])

    def get_attribute(self, name):
        """
        Get a synthesized attribute.
        The requested name is not used; every request is answered
        with a new L{Any} built on a clone of this node's root.
        @param name: The name of the requested attribute (ignored).
        @type name: str
        @return: A tuple of the synthesized attribute and an empty
            ancestry list.
        @rtype: (L{Any}, [])
        """
        cloned = self.root.clone()
        cloned.set('note', 'synthesized (any) attribute')
        return (Any(self.schema, cloned), [])

    def any(self):
        """
        Get whether this object is an xsd I{any}.
        @return: Always True.
        @rtype: boolean
        """
        return True
    
    
class Factory:
    """
    Creates schema objects from xml nodes.
    @cvar tags: The mapping of xsd tag name to the class (or
        function) used to create the object for that tag.
    @type tags: {tag:fn,}
    """

    tags = {
        'import': Import,
        'include': Include,
        'complexType': Complex,
        'group': Group,
        'attributeGroup': AttributeGroup,
        'simpleType': Simple,
        'list': List,
        'element': Element,
        'attribute': Attribute,
        'sequence': Sequence,
        'all': All,
        'choice': Choice,
        'complexContent': ComplexContent,
        'simpleContent': SimpleContent,
        'restriction': Restriction,
        'enumeration': Enumeration,
        'extension': Extension,
        'any': Any,
    }

    @classmethod
    def maptag(cls, tag, fn):
        """
        Map (override) tag => I{class} mapping.
        @param tag: An xsd tag name.
        @type tag: str
        @param fn: A function or class.
        @type fn: fn|class.
        """
        cls.tags[tag] = fn

    @classmethod
    def create(cls, root, schema):
        """
        Create an object based on the root tag name.
        @param root: An XML root element.
        @type root: L{Element}
        @param schema: A schema object.
        @type schema: L{schema.Schema}
        @return: The created object, or None when the tag name
            is not mapped.
        @rtype: L{SchemaObject}
        """
        maker = cls.tags.get(root.name)
        if maker is None:
            return None
        return maker(schema, root)

    @classmethod
    def build(cls, root, schema, filter=('*',)):
        """
        Build an xsobject representation.
        Children of I{root} in the xsd namespace are created when
        their tag passes the filter, and their own children are
        built recursively using the tags each new object reports
        through childtags().
        @param root: An schema XML root.
        @type root: L{sax.element.Element}
        @param schema: A schema object.
        @type schema: L{schema.Schema}
        @param filter: A tag filter; '*' accepts every tag.
        @type filter: [str,...]
        @return: A schema object graph.
        @rtype: L{sxbase.SchemaObject}
        """
        built = []
        for node in root.getChildren(ns=Namespace.xsdns):
            if '*' not in filter and node.name not in filter:
                continue
            obj = cls.create(node, schema)
            if obj is None:
                # unmapped tag: nothing to build.
                continue
            built.append(obj)
            obj.rawchildren = cls.build(node, schema, obj.childtags())
        return built

    @classmethod
    def collate(cls, children):
        """
        Sort the children into collections by kind.
        Imports and includes are collected into a list and removed
        from I{children} (which is modified in place); every other
        child is indexed by qname in the dictionary for its kind.
        Anything that is not an attribute, element, group or
        attribute group is treated as a type.
        @param children: A list of schema objects.
        @type children: [L{SchemaObject},...]
        @return: A tuple of (children, imports, attributes,
            elements, types, groups, agrps).
        @rtype: tuple
        """
        imports = []
        elements = {}
        attributes = {}
        types = {}
        groups = {}
        agrps = {}
        # checked in this order; the first matching class wins.
        buckets = (
            (Attribute, attributes),
            (Element, elements),
            (Group, groups),
            (AttributeGroup, agrps),
        )
        for child in children:
            if isinstance(child, (Import, Include)):
                imports.append(child)
                continue
            for klass, bucket in buckets:
                if isinstance(child, klass):
                    bucket[child.qname] = child
                    break
            else:
                types[child.qname] = child
        for imported in imports:
            children.remove(imported)
        return (children, imports, attributes, elements, types, groups, agrps)

    


#######################################################
# Static Import Bindings :-(
# Default locations used by Import when an <xs:import/>
# for one of these namespaces has no schemaLocation.
#######################################################
Import.bind(
    ns='http://schemas.xmlsoap.org/soap/encoding/',
    location='suds://schemas.xmlsoap.org/soap/encoding/')
Import.bind(
    ns='http://www.w3.org/XML/1998/namespace',
    location='http://www.w3.org/2001/xml.xsd')
Import.bind(
    ns='http://www.w3.org/2001/XMLSchema',
    location='http://www.w3.org/2001/XMLSchema.xsd')

# Generated by  Doxygen 1.6.0   Back to index