#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Copyright 2011-2021, Nigel Small
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = [
"Property",
"Label",
"Related",
"RelatedTo",
"RelatedFrom",
"RelatedObjects",
"ModelType",
"Model", "Model",
"ModelMatch",
"ModelMatcher",
"Repository",
]
from collections import OrderedDict
from interchange.collections import PropertyDict
from interchange.text import Words
from py2neo.compat import metaclass, deprecated
from py2neo.cypher import cypher_escape
from py2neo.data import Node
from py2neo.database import Graph
from py2neo.matching import NodeMatch, NodeMatcher
OUTGOING = 1
UNDIRECTED = 0
INCOMING = -1
[docs]
class Property(object):
""" Property definition for a :class:`.Model`.
Attributes:
key: The name of the node property within the database.
default: The default value for the property, if it would
otherwise be :const:`None`.
"""
def __init__(self, key=None, default=None):
""" Initialise a property definition.
Args:
key: The name of the node property within the database. If
omitted, the name of the class attribute is used.
default: The default value for the property, if it would
otherwise be :const:`None`.
"""
self.key = key
self.default = default
def __get__(self, instance, owner):
value = instance.__node__[self.key]
if value is None:
value = self.default
return value
def __set__(self, instance, value):
instance.__node__[self.key] = value
def __repr__(self):
args = OrderedDict()
if self.key is not None:
args["key"] = self.key
if self.default is not None:
args["default"] = self.default
return "%s(%s)" % (self.__class__.__name__,
", ".join("%s=%r" % arg for arg in args.items()))
[docs]
class Label(object):
""" Label definition for a :class:`.Model`.
Labels are toggleable tags applied to an object that can be used as type
information or other forms of classification.
"""
def __init__(self, name=None):
self.name = name
def __get__(self, instance, owner):
return instance.__node__.has_label(self.name)
def __set__(self, instance, value):
if value:
instance.__node__.add_label(self.name)
else:
instance.__node__.remove_label(self.name)
def __repr__(self):
args = OrderedDict()
if self.name is not None:
args["name"] = self.name
return "%s(%s)" % (self.__class__.__name__,
", ".join("%s=%r" % arg for arg in args.items()))
def _resolve_class(model, current_module_name):
if isinstance(model, type):
return model
module_name, _, class_name = model.rpartition(".")
if not module_name:
module_name = current_module_name
module = __import__(module_name, fromlist=".")
return getattr(module, class_name)
class OGM(object):
def __init__(self, subject, node):
self.subject = subject
self.node = node
self._related = {}
def all_related(self):
""" Return an iterator through all :class:`.RelatedObjects`.
"""
return iter(self._related.values())
def related(self, direction, relationship_type, related_class):
""" Return :class:`.RelatedObjects` for given criteria.
"""
key = (direction, relationship_type)
if key not in self._related:
self._related[key] = RelatedObjects(self.subject, self.node, direction,
relationship_type, related_class)
return self._related[key]
class ModelType(type):
def __new__(mcs, name, bases, attributes):
for attr_name, attr in list(attributes.items()):
if isinstance(attr, Property):
if attr.key is None:
attr.key = attr_name
if attr.__doc__ is attr.__class__.__doc__:
attr.__doc__ = repr(attr)
elif isinstance(attr, Label):
if attr.name is None:
attr.name = Words(attr_name).camel(upper_first=True)
if attr.__doc__ is attr.__class__.__doc__:
attr.__doc__ = repr(attr)
elif isinstance(attr, Related):
if attr.relationship_type is None:
attr.relationship_type = Words(attr_name).upper("_")
if attr.__doc__ is attr.__class__.__doc__:
def related_repr(obj):
try:
args = ":class:`%s`" % obj.related_class.__qualname__
except AttributeError:
args = ":class:`.%s`" % obj.related_class
if obj.relationship_type is not None:
args += ", relationship_type=%r" % obj.relationship_type
return "%s(%s)" % (obj.__class__.__name__, args)
attr.__doc__ = related_repr(attr)
attributes.setdefault("__primarylabel__", name)
primary_key = attributes.get("__primarykey__")
if primary_key is None:
for base in bases:
if primary_key is None and hasattr(base, "__primarykey__"):
primary_key = getattr(base, "__primarykey__")
break
else:
primary_key = "__id__"
attributes["__primarykey__"] = primary_key
return super(ModelType, mcs).__new__(mcs, name, bases, attributes)
[docs]
@metaclass(ModelType)
class Model(object):
""" Base class for all OGM object classes.
This class provides a default constructor, allowing initial
property values to be provided. Positional arguments are mapped,
in order, to primary key fields; keywords arguments are mapped
directly by name.
*Changed in 2020.0: this used to be called GraphObject, but was
renamed to avoid ambiguity. The old name is still available as an
alias.*
*Changed in 2021.2: added default constructor.*
"""
__primarylabel__ = None
__primarykey__ = None
__ogm = None
def __init__(self, *values, **properties):
p = {}
if self.__primarykey__ == "__id__":
primary_key = ()
elif isinstance(self.__primarykey__, tuple):
primary_key = self.__primarykey__
else:
primary_key = (self.__primarykey__,)
for i, value in enumerate(values):
try:
key = primary_key[i]
except IndexError:
raise IndexError("Model constructor value at index %d does not correspond to "
"any part of primary key %r" % (i, self.__primarykey__))
else:
p[key] = value
p.update(properties)
for key, value in p.items():
try:
setattr(self, key, value)
except AttributeError:
raise ValueError("No such property %r" % key)
def __eq__(self, other):
if self is other:
return True
try:
self_node = self.__node__
other_node = other.__node__
if any(x is None for x in [self_node.graph, other_node.graph, self_node.identity, other_node.identity]):
if self.__primarylabel__ != other.__primarylabel__:
return False
if (self.__primarykey__ == other.__primarykey__ and
self.__primaryvalue__ == other.__primaryvalue__):
if self.__primarykey__ == "__id__" and self.__primaryvalue__ is None:
# If __id__ is the primary key but the value
# isn't yet set, assume the objects are
# not equal.
#
# See https://github.com/technige/py2neo/issues/839
#
return False
else:
return True
else:
return False
return (type(self) is type(other) and
self_node.graph == other_node.graph and
self_node.identity == other_node.identity)
except (AttributeError, TypeError):
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def __ogm__(self):
if self.__ogm is None:
if isinstance(self.__primarylabel__, tuple):
node = Node(*self.__primarylabel__)
else:
node = Node(self.__primarylabel__)
node.__model__ = self.__class__
self.__ogm = OGM(self, node)
return self.__ogm
[docs]
@classmethod
def wrap(cls, node):
""" Convert a :class:`.Node` into a :class:`.Model`.
:param node:
:return:
"""
if node is None:
return None
node.__model__ = cls
obj = Model()
obj.__ogm = OGM(obj, node)
obj.__class__ = cls
return obj
[docs]
@classmethod
def match(cls, repository, primary_value=None):
""" Select one or more nodes from the database, wrapped as instances of this class.
:param repository: the :class:`.Repository` in which to match
:param primary_value: value of the primary property (optional)
:rtype: :class:`.ModelMatch`
"""
return ModelMatcher(cls, repository).match(primary_value)
def __repr__(self):
return "<%s %s=%r>" % (self.__class__.__name__, self.__primarykey__, self.__primaryvalue__)
@property
def __primaryvalue__(self):
node = self.__node__
primary_key = self.__primarykey__
if primary_key == "__id__":
return node.identity
elif isinstance(primary_key, tuple):
return tuple(node[key] for key in primary_key)
else:
return node[primary_key]
@property
def __node__(self):
""" The :class:`.Node` wrapped by this :class:`.Model`.
"""
return self.__ogm__.node
def __db_create__(self, tx):
self.__db_merge__(tx)
def __db_delete__(self, tx):
ogm = self.__ogm__
tx.delete(ogm.node)
for related_objects in ogm.all_related():
related_objects.clear()
def __db_exists__(self, tx):
return tx.exists(self.__node__)
def __db_merge__(self, tx, primary_label=None, primary_key=None):
ogm = self.__ogm__
node = ogm.node
if primary_label is None:
primary_label = self.__primarylabel__
if primary_key is None:
primary_key = self.__primarykey__ or "__id__"
if node.graph is None:
if primary_key == "__id__":
node.add_label(primary_label)
tx.create(node)
else:
tx.merge(node, primary_label, primary_key)
for related_objects in ogm.all_related():
related_objects.__db_push__(tx)
def __db_pull__(self, tx):
ogm = self.__ogm__
if ogm.node.graph is None:
matcher = ModelMatcher(self.__class__, tx.graph)
matcher._match_class = NodeMatch
ogm.node = matcher.match(self.__primaryvalue__).first()
tx.pull(ogm.node)
for related_objects in ogm.all_related():
related_objects.__db_pull__(tx)
def __db_push__(self, tx):
ogm = self.__ogm__
node = ogm.node
if node.graph is not None:
tx.push(node)
else:
primary_key = self.__primarykey__ or "__id__"
if primary_key == "__id__":
tx.create(node)
else:
tx.merge(node)
for related_objects in ogm.all_related():
related_objects.__db_push__(tx)
# Alias for backward compatibility
GraphObject = Model
[docs]
class ModelMatch(NodeMatch):
""" A selection of :class:`.Model` instances that match a
given set of criteria.
"""
_object_class = Model
[docs]
def __iter__(self):
""" Iterate through items drawn from the underlying repository
that match the given criteria.
"""
wrap = self._object_class.wrap
for node in super(ModelMatch, self).__iter__():
yield wrap(node)
[docs]
def first(self):
""" Return the first item that matches the given criteria.
"""
return self._object_class.wrap(super(ModelMatch, self).first())
class ModelMatcher(NodeMatcher):
_match_class = ModelMatch
@classmethod
def _coerce_to_graph(cls, obj):
if isinstance(obj, Repository):
return obj.graph
elif isinstance(obj, Graph):
return obj
else:
raise TypeError("Cannot coerce object %r to Graph" % obj)
def __init__(self, object_class, repository):
NodeMatcher.__init__(self, self._coerce_to_graph(repository))
self._object_class = object_class
self._match_class = type("%sMatch" % self._object_class.__name__,
(ModelMatch,), {"_object_class": object_class})
def match(self, primary_value=None):
cls = self._object_class
if cls.__primarykey__ == "__id__":
match = NodeMatcher.match(self, cls.__primarylabel__)
if primary_value is not None:
match = match.where("id(_) = %d" % primary_value)
elif primary_value is None:
match = NodeMatcher.match(self, cls.__primarylabel__)
else:
match = NodeMatcher.match(self, cls.__primarylabel__).where(**{cls.__primarykey__: primary_value})
return match
[docs]
class Repository(object):
""" Storage container for :class:`.Model` instances.
The constructor for this class has an identical signature to that
for the :class:`.Graph` class. For example::
>>> from py2neo.ogm import Repository
>>> from py2neo.ogm.models.movies import Movie
>>> repo = Repository("bolt://neo4j@localhost:7687", password="password")
>>> repo.match(Movie, "The Matrix").first()
<Movie title='The Matrix'>
*New in version 2020.0. In earlier versions, a :class:`.Graph` was
required to co-ordinate all reads and writes to the remote
database. This class completely replaces that, removing the need
to import from any other packages when using OGM.*
"""
[docs]
@classmethod
def wrap(cls, graph):
""" Wrap an existing :class:`.Graph` object as a
:class:`.Repository`.
"""
obj = object.__new__(Repository)
obj.graph = graph
return obj
def __init__(self, profile=None, name=None, **settings):
self.graph = Graph(profile, name=name, **settings)
def __repr__(self):
return "<Repository profile=%r>" % (self.graph.service.profile,)
[docs]
def reload(self, obj):
""" Reload data from the remote graph into the local object.
"""
self.graph.pull(obj)
[docs]
def save(self, *objects):
""" Save data from the local object into the remote graph.
Each object in `objects` can be a :class:`Model` object or an
iterable of such objects.
:param objects: :class:`Model` objects to save.
"""
def push_all(tx, iterable):
for obj in iterable:
if hasattr(obj, "__db_push__"):
tx.push(obj)
elif hasattr(obj, "__iter__"):
push_all(tx, obj)
else:
raise ValueError("Object %r is neither savable "
"nor iterable" % obj)
self.graph.update(lambda tx: push_all(tx, objects))
[docs]
def delete(self, obj):
""" Delete the object in the remote graph.
"""
self.graph.delete(obj)
[docs]
def exists(self, obj):
""" Check whether the object exists in the remote graph.
"""
return self.graph.exists(obj)
[docs]
def match(self, model, primary_value=None):
""" Select one or more objects from the remote graph.
:param model: the :class:`.Model` subclass to match
:param primary_value: value of the primary property (optional)
:rtype: :class:`.ModelMatch`
"""
return ModelMatcher(model, self).match(primary_value)
[docs]
def get(self, model, primary_value=None):
""" Match and return a single object from the remote graph.
:param model: the :class:`.Model` subclass to match
:param primary_value: value of the primary property (optional)
:rtype: :class:`.Model`
"""
return self.match(model, primary_value).first()
@deprecated("Repository.create is a compatibility alias, "
"please use Repository.save instead")
def create(self, obj):
self.graph.create(obj)
@deprecated("Repository.merge is a compatibility alias, "
"please use Repository.save instead")
def merge(self, obj):
self.graph.merge(obj)
@deprecated("Repository.pull is a compatibility alias, "
"please use Repository.load instead")
def pull(self, obj):
self.graph.pull(obj)
@deprecated("Repository.push is a compatibility alias, "
"please use Repository.save instead")
def push(self, obj):
self.graph.push(obj)