Source code for py2neo.database

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Copyright 2011-2021, Nigel Small
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import absolute_import, print_function


__all__ = [
    "GraphService",
    "Graph",
    "SystemGraph",
    "Schema",
    "Transaction",
]


from inspect import isgenerator
from time import sleep

from py2neo.compat import (deprecated,
                           Sequence,
                           Mapping)
from py2neo.cypher import Cursor, cypher_escape
from py2neo.cypher.proc import ProcedureLibrary
from py2neo.errors import (Neo4jError,
                           ConnectionUnavailable,
                           ConnectionBroken,
                           ConnectionLimit,
                           ServiceUnavailable,
                           WriteServiceUnavailable)
from py2neo.matching import NodeMatcher, RelationshipMatcher


[docs] class GraphService(object): """ The :class:`.GraphService` class is the top-level accessor for an entire Neo4j graph database management system (DBMS). Within the py2neo object hierarchy, a :class:`.GraphService` contains one or more :class:`.Graph` objects in which data storage and retrieval activity chiefly occurs. An explicit URI can be passed to the constructor:: >>> from py2neo import GraphService >>> gs = GraphService("bolt://camelot.example.com:7687") Alternatively, the default value of ``bolt://localhost:7687`` is used:: >>> default_gs = GraphService() >>> default_gs <GraphService uri='bolt://localhost:7687'> .. note:: Some attributes of this class available in earlier versions of py2neo are no longer available, specifically ``kernel_start_time``, ``primitive_counts``, ``store_creation_time``, ``store_file_sizes`` and ``store_id``, along with the ``query_jmx`` method. This is due to a change in Neo4j 4.0 relating to how certain system metadata is exposed. Replacement functionality may be reintroduced in a future py2neo release. *Changed in 2020.0: this class was formerly known as 'Database', but was renamed to avoid confusion with the concept of the same name introduced with the multi-database feature of Neo4j 4.0.* .. describe:: iter(graph_service) Yield all named graphs. For Neo4j 4.0 and above, this yields the names returned by a ``SHOW DATABASES`` query. For earlier versions, this yields no entries, since the one and only graph in these versions is not named. *New in version 2020.0.* .. describe:: graph_service[name] Access a :class:`.Graph` by name. *New in version 2020.0.* """ _connector = None _graphs = None def __init__(self, profile=None, **settings): from py2neo import ServiceProfile from py2neo.client import Connector connector_settings = { "user_agent": settings.pop("user_agent", None), "init_size": settings.pop("init_size", None), "max_size": settings.pop("max_size", None), "max_age": settings.pop("max_age", None), "routing_refresh_ttl": settings.pop("routing_refresh_ttl", None), } profile = ServiceProfile(profile, **settings) if connector_settings["init_size"] is None and not profile.routing: # Ensures credentials are checked on construction connector_settings["init_size"] = 1 self._connector = Connector(profile, **connector_settings) self._graphs = {} def __repr__(self): class_name = self.__class__.__name__ profile = self._connector.profile return "<%s uri=%r secure=%r user_agent=%r>" % ( class_name, profile.uri, profile.secure, self._connector.user_agent) def __eq__(self, other): try: return self.uri == other.uri except AttributeError: return False def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash(self._connector) def __getitem__(self, graph_name): if graph_name is None: graph_name = self._connector.default_graph_name() elif graph_name not in self._connector.graph_names(): raise KeyError("Graph {!r} does not exist for " "service {!r}".format(graph_name, self._connector.profile.uri)) if graph_name not in self._graphs: graph_class = SystemGraph if graph_name == "system" else Graph self._graphs[graph_name] = graph_class(self.profile, name=graph_name) return self._graphs[graph_name] def __iter__(self): return iter(self._connector.graph_names()) @property def connector(self): """ The :class:`.Connector` providing communication for this graph service. *New in version 2020.0.* """ return self._connector @property def profile(self): """ The :class:`.ConnectionProfile` for which this graph service is configured. This attribute is simply a shortcut for ``connector.profile``. *New in version 2020.0.* """ return self.connector.profile @property def uri(self): """ The URI to which this graph service is connected. This attribute is simply a shortcut for ``connector.profile.uri``. """ return self.profile.uri @property def default_graph(self): """ The default :class:`.Graph` exposed by this graph service. """ return self[None] @property def system_graph(self): """ The :class:`.SystemGraph` exposed by this graph service. *New in version 2020.0.* """ return self["system"]
[docs] def keys(self): """ Return a list of all :class:`.Graph` names exposed by this graph service. *New in version 2020.0.* """ return list(self)
@property def kernel_version(self): """ The :class:`~packaging.version.Version` of Neo4j running. """ from packaging.version import Version components = self.default_graph.call("dbms.components").data() kernel_component = [component for component in components if component["name"] == "Neo4j Kernel"][0] version_string = kernel_component["versions"][0] return Version(version_string) @property def product(self): """ The product name. """ record = next(self.default_graph.call("dbms.components")) return "%s %s (%s)" % (record[0], " ".join(record[1]), record[2].title()) @property def config(self): """ A dictionary of the configuration parameters used to configure Neo4j. >>> gs.config['dbms.connectors.default_advertised_address'] 'localhost' """ return {record["name"]: record["value"] for record in self.default_graph.call("dbms.listConfig")}
[docs] class Graph(object): """ The `Graph` class provides a handle to an individual named graph database exposed by a Neo4j graph database service. Connection details are provided using either a URI or a :class:`.ConnectionProfile`, plus individual settings, if required. The `name` argument allows selection of a graph database by name. When working with Neo4j 4.0 and above, this can be any name defined in the system catalogue, a full list of which can be obtained through the Cypher ``SHOW DATABASES`` command. Passing `None` here will select the default database, as defined on the server. For earlier versions of Neo4j, the `name` must be set to `None`. >>> from py2neo import Graph >>> sales = Graph("bolt+s://g.example.com:7687", name="sales") >>> sales.run("MATCH (c:Customer) RETURN c.name") c.name --------------- John Smith Amy Pond Rory Williams The `system graph`, which is available in all 4.x+ product editions, can also be accessed via the :class:`.SystemGraph` class. >>> from py2neo import SystemGraph >>> sg = SystemGraph("bolt+s://g.example.com:7687") >>> sg.call("dbms.security.listUsers") username | roles | flags ----------|-------|------- neo4j | null | [] In addition to the core `connection details <#getting-connected>`_ that can be passed to the constructor, the :class:`.Graph` class can accept several other settings: =================== ======================================================== ============== ========================= Keyword Description Type Default =================== ======================================================== ============== ========================= ``user_agent`` User agent to send for all connections str `(depends on URI scheme)` ``max_connections`` The maximum number of simultaneous connections permitted int 40 =================== ======================================================== ============== ========================= Once obtained, the `Graph` instance provides direct or indirect access to most of the functionality available within py2neo. """ #: The :class:`.GraphService` to which this :class:`.Graph` belongs. service = None #: The :class:`.Schema` resource for this :class:`.Graph`. schema = None def __init__(self, profile=None, name=None, **settings): self.service = GraphService(profile, **settings) self.__name__ = name self.schema = Schema(self) self._procedures = ProcedureLibrary(self) def __repr__(self): if self.name is None: return "%s(%r)" % (self.__class__.__name__, self.service.uri) else: return "%s(%r, name=%r)" % (self.__class__.__name__, self.service.uri, self.name) def __eq__(self, other): try: return self.service == other.service and self.__name__ == other.__name__ except (AttributeError, TypeError): return False def __ne__(self, other): return not self.__eq__(other) def __len__(self): return len(self.relationships) def __bool__(self): return True __nonzero__ = __bool__ @property def name(self): """ The name of this graph. *New in version 2020.0.* """ return self.__name__ # TRANSACTION MANAGEMENT #
[docs] def auto(self, readonly=False, # after=None, metadata=None, timeout=None ): """ Create a new auto-commit :class:`~py2neo.Transaction`. :param readonly: if :py:const:`True`, will begin a readonly transaction, otherwise will begin as read-write *New in version 2020.0.* """ return Transaction(self, autocommit=True, readonly=readonly, # after, metadata, timeout )
[docs] def begin(self, readonly=False, # after=None, metadata=None, timeout=None ): """ Begin a new :class:`~py2neo.Transaction`. :param readonly: if :py:const:`True`, will begin a readonly transaction, otherwise will begin as read-write *Changed in version 2021.1: the 'autocommit' argument has been removed. Use the 'auto' method instead.* """ return Transaction(self, autocommit=False, readonly=readonly, # after, metadata, timeout )
[docs] def commit(self, tx): """ Commit a transaction. *New in version 2021.1.* """ if tx is None: return if not isinstance(tx, Transaction): raise TypeError("Bad transaction %r" % tx) if tx.closed: raise TypeError("Cannot commit closed transaction") try: summary = self.service.connector.commit(tx.ref) tx._bookmark = summary["bookmark"] tx._profile = summary["profile"] tx._time = summary["time"] finally: tx._closed = True
[docs] def rollback(self, tx): """ Rollback a transaction. *New in version 2021.1.* """ if tx is None or tx.closed: return if not isinstance(tx, Transaction): raise TypeError("Bad transaction %r" % tx) try: summary = self.service.connector.rollback(tx.ref) tx._bookmark = summary["bookmark"] tx._profile = summary["profile"] tx._time = summary["time"] except (ConnectionUnavailable, ConnectionBroken): pass finally: tx._closed = True
# CYPHER EXECUTION #
[docs] def run(self, cypher, parameters=None, **kwparameters): """ Run a single read/write query within an auto-commit :class:`~py2neo.Transaction`. :param cypher: Cypher statement :param parameters: dictionary of parameters :param kwparameters: extra parameters supplied as keyword arguments :return: """ return self.auto().run(cypher, parameters, **kwparameters)
[docs] def evaluate(self, cypher, parameters=None, **kwparameters): """ Run a :meth:`~py2neo.Transaction.evaluate` operation within an auto-commit :class:`~py2neo.Transaction`. :param cypher: Cypher statement :param parameters: dictionary of parameters :return: first value from the first record returned or :py:const:`None`. """ return self.run(cypher, parameters, **kwparameters).evaluate()
[docs] def update(self, cypher, parameters=None, timeout=None): """ Execute a transactional unit of work that carries out write operations but does not return a result. The `cypher` argument can be either a single string statement or a callable that accepts a :class:`~py2neo.Transaction` object as its first argument. Additional arguments can be passed though the `args` and `kwargs` arguments of this method. The unit of work may be called multiple times if earlier attempts fail due to connectivity or other transient errors. As such, the function should have no non-idempotent side effects. Note that hybrid read-write queries (i.e. those which carry out updates *and* return a result should use the non-retrying :class:`.run` method instead. :param cypher: cypher string or transaction function containing a unit of work :param parameters: cypher parameter map or function arguments :param timeout: :raises WriteServiceUnavailable: if the update does not successfully complete """ if callable(cypher): if parameters is None: self._update(cypher, timeout=timeout) elif (isinstance(parameters, tuple) and len(parameters) == 2 and isinstance(parameters[0], Sequence) and isinstance(parameters[1], Mapping)): self._update(lambda tx: cypher(tx, *parameters[0], **parameters[1]), timeout=timeout) elif isinstance(parameters, Sequence): self._update(lambda tx: cypher(tx, *parameters), timeout=timeout) elif isinstance(parameters, Mapping): self._update(lambda tx: cypher(tx, **parameters), timeout=timeout) else: raise TypeError("Unrecognised parameter type") else: self._update(lambda tx: tx.update(cypher, parameters), timeout=timeout)
def _update(self, f, timeout=None): from py2neo.timing import Timer # TODO: logging n = 0 for _ in Timer.repeat(at_least=3, timeout=timeout): n += 1 tx = None try: tx = self.begin( # after=after, metadata=metadata, timeout=timeout ) value = f(tx) if isgenerator(value): _ = list(value) # exhaust the generator self.commit(tx) except (ConnectionUnavailable, ConnectionBroken, ConnectionLimit): self.rollback(tx) continue except Neo4jError as error: self.rollback(tx) if error.should_invalidate_routing_table(): self.service.connector.invalidate_routing_table(self.name) if error.should_retry(): continue else: raise except Exception: self.rollback(tx) raise else: return raise WriteServiceUnavailable("Failed to execute update after %r tries" % n)
[docs] def query(self, cypher, parameters=None, timeout=None): """ Run a single query within a readonly auto-commit :class:`~py2neo.Transaction`. Should query execution fail due to connection failure or other transient error, retries will be attempted. There will be a minimum of three attempts, and retries will continue until the timeout passes. :param cypher: Cypher statement :param parameters: dictionary of parameters :param timeout: :returns: result :class:`~.cypher.Cursor` object :raises TypeError: if the underlying connection profile does not support readonly transactions :raises ServiceUnavailable: if the query does not successfully complete *Refactored from read to query in version 2021.1* """ from py2neo.timing import Timer # TODO: logging n = 0 for _ in Timer.repeat(at_least=3, timeout=timeout): n += 1 try: result = self.auto(readonly=True).run(cypher, parameters) except (ConnectionUnavailable, ConnectionBroken, ConnectionLimit): continue except Neo4jError as error: if error.should_invalidate_routing_table(): self.service.connector.invalidate_routing_table(self.name) if error.should_retry(): continue else: raise else: return result raise ServiceUnavailable("Failed to execute query after %r tries" % n)
@property def call(self): """ Accessor for listing and calling procedures. This property contains a :class:`.ProcedureLibrary` object tied to this graph, which provides links to Cypher procedures in the underlying implementation. Calling a procedure requires only the regular Python function call syntax:: >>> g = Graph() >>> g.call.dbms.components() name | versions | edition --------------|------------|----------- Neo4j Kernel | ['3.5.12'] | community The object returned from the call is a :class:`~py2neo.Cursor` object, identical to that obtained from running a normal Cypher query, and can therefore be consumed in a similar way. Procedure names can alternatively be supplied as a string:: >>> g.call["dbms.components"]() name | versions | edition --------------|------------|----------- Neo4j Kernel | ['3.5.12'] | community Using :func:`dir` or :func:`iter` on the `call` attribute will yield a list of available procedure names. *New in version 2020.0.* """ return self._procedures
[docs] def delete_all(self): """ Delete all nodes and relationships from this :class:`.Graph`. .. warning:: This method will permanently remove **all** nodes and relationships from the graph and cannot be undone. """ self.run("MATCH (a) DETACH DELETE a")
@deprecated("The graph.read(...) method is deprecated, " "use graph.query(...) instead") def read(self, cypher, parameters=None, **kwparameters): return self.query(cypher, dict(parameters or {}, **kwparameters)) # SUBGRAPH OPERATIONS #
[docs] def create(self, subgraph): """ Run a :meth:`~py2neo.Transaction.create` operation within a :class:`~py2neo.Transaction`. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` """ self.update(lambda tx: tx.create(subgraph))
[docs] def delete(self, subgraph): """ Run a :meth:`~py2neo.Transaction.delete` operation within an auto-commit :class:`~py2neo.Transaction`. To delete only the relationships, use the :meth:`.separate` method. Note that only entities which are bound to corresponding remote entities though the ``graph`` and ``identity`` attributes will trigger a deletion. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` object """ self.update(lambda tx: tx.delete(subgraph))
[docs] def exists(self, subgraph): """ Run a :meth:`~py2neo.Transaction.exists` operation within an auto-commit :class:`~py2neo.Transaction`. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` object :return: """ return self.auto(readonly=True).exists(subgraph)
[docs] def match(self, nodes=None, r_type=None, limit=None): """ Match and return all relationships with specific criteria. For example, to find all of Alice's friends:: for rel in graph.match((alice, ), r_type="FRIEND"): print(rel.end_node["name"]) :param nodes: Sequence or Set of start and end nodes (:const:`None` means any node); a Set implies a match in any direction :param r_type: type of relationships to match (:const:`None` means any type) :param limit: maximum number of relationships to match (:const:`None` means unlimited) """ return RelationshipMatcher(self).match(nodes=nodes, r_type=r_type).limit(limit)
[docs] def match_one(self, nodes=None, r_type=None): """ Match and return one relationship with specific criteria. :param nodes: Sequence or Set of start and end nodes (:const:`None` means any node); a Set implies a match in any direction :param r_type: type of relationships to match (:const:`None` means any type) """ matches = self.match(nodes=nodes, r_type=r_type, limit=1) rels = list(matches) if rels: return rels[0] else: return None
[docs] def merge(self, subgraph, label=None, *property_keys): """ Run a :meth:`~py2neo.Transaction.merge` operation within an auto-commit :class:`~py2neo.Transaction`. The example code below shows a simple merge for a new relationship between two new nodes: >>> from py2neo import Graph, Node, Relationship >>> g = Graph() >>> a = Node("Person", name="Alice", age=33) >>> b = Node("Person", name="Bob", age=44) >>> KNOWS = Relationship.type("KNOWS") >>> g.merge(KNOWS(a, b), "Person", "name") Following on, we then create a third node (of a different type) to which both the original nodes connect: >>> c = Node("Company", name="ACME") >>> c.__primarylabel__ = "Company" >>> c.__primarykey__ = "name" >>> WORKS_FOR = Relationship.type("WORKS_FOR") >>> g.merge(WORKS_FOR(a, c) | WORKS_FOR(b, c)) For details of how the merge algorithm works, see the :meth:`~py2neo.Transaction.merge` method. Note that this is different to a Cypher MERGE. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` object :param label: label on which to match any existing nodes :param property_keys: property keys on which to match any existing nodes """ self.update(lambda tx: tx.merge(subgraph, label, *property_keys))
@property def nodes(self): """ A :class:`.NodeMatcher` for this graph. This can be used to find nodes that match given criteria: >>> graph = Graph() >>> graph.nodes[1234] (_1234:Person {name: 'Alice'}) >>> graph.nodes.get(1234) (_1234:Person {name: 'Alice'}) >>> graph.nodes.match("Person", name="Alice").first() (_1234:Person {name: 'Alice'}) Nodes can also be efficiently counted using this attribute: >>> len(graph.nodes) 55691 >>> len(graph.nodes.match("Person", age=33)) 12 """ return NodeMatcher(self)
[docs] def pull(self, subgraph): """ Pull data to one or more entities from their remote counterparts. :param subgraph: the collection of nodes and relationships to pull """ self.update(lambda tx: tx.pull(subgraph))
[docs] def push(self, subgraph): """ Push data from one or more entities to their remote counterparts. :param subgraph: the collection of nodes and relationships to push """ self.update(lambda tx: tx.push(subgraph))
@property def relationships(self): """ A :class:`.RelationshipMatcher` for this graph. This can be used to find relationships that match given criteria as well as efficiently count relationships. """ return RelationshipMatcher(self)
[docs] def separate(self, subgraph): """ Run a :meth:`~py2neo.Transaction.separate` operation within an auto-commit :class:`~py2neo.Transaction`. Note that only relationships which are bound to corresponding remote relationships though the ``graph`` and ``identity`` attributes will trigger a deletion. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` """ self.update(lambda tx: tx.separate(subgraph))
[docs] class SystemGraph(Graph): """ A subclass of :class:`.Graph` that provides access to the system database for the remote DBMS. This is only available in Neo4j 4.0 and above. *New in version 2020.0.* """ def __init__(self, profile=None, **settings): settings["name"] = "system" super(SystemGraph, self).__init__(profile, **settings) def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self.service.uri)
[docs] class Schema(object): """ The schema resource attached to a `Graph` instance. """ def __init__(self, graph): self.graph = graph @property def node_labels(self): """ The set of node labels currently defined within the graph. """ return frozenset(record[0] for record in self.graph.run("CALL db.labels")) @property def relationship_types(self): """ The set of relationship types currently defined within the graph. """ return frozenset(record[0] for record in self.graph.run("CALL db.relationshipTypes"))
[docs] def create_index(self, label, *property_keys): """ Create a schema index for a label and property key combination. """ cypher = "CREATE INDEX ON :{}({})".format( cypher_escape(label), ", ".join(map(cypher_escape, property_keys))) self.graph.update(cypher) while property_keys not in self.get_indexes(label): sleep(0.1)
[docs] def create_uniqueness_constraint(self, label, property_key): """ Create a node uniqueness constraint for a given label and property key. While indexes support the use of composite keys, unique constraints may only be tied to a single property key. """ cypher = "CREATE CONSTRAINT ON (_:{}) ASSERT _.{} IS UNIQUE".format( cypher_escape(label), cypher_escape(property_key)) self.graph.update(cypher) while property_key not in self.get_uniqueness_constraints(label): sleep(0.1)
[docs] def drop_index(self, label, *property_keys): """ Remove label index for a given property key. """ cypher = "DROP INDEX ON :{}({})".format( cypher_escape(label), ", ".join(map(cypher_escape, property_keys))) self.graph.update(cypher)
[docs] def drop_uniqueness_constraint(self, label, property_key): """ Remove the node uniqueness constraint for a given label and property key. """ cypher = "DROP CONSTRAINT ON (_:{}) ASSERT _.{} IS UNIQUE".format( cypher_escape(label), cypher_escape(property_key)) self.graph.update(cypher)
def _get_indexes(self, label, unique_only=False): indexes = [] result = self.graph.run("CALL db.indexes") for record in result: properties = [] # The code branches here depending on the format of the response # from the `db.indexes` procedure, which has varied enormously # since 3.0. if len(record) == 10: if "labelsOrTypes" in result.keys(): # 4.0.0 # ['id', 'name', 'state', 'populationPercent', # 'uniqueness', 'type', 'entityType', 'labelsOrTypes', # 'properties', 'provider'] (id_, name, state, population_percent, uniqueness, type_, entity_type, token_names, properties, provider) = record description = None # The 'type' field has randomly changed its meaning in 4.0, # holding for example 'BTREE' instead of for example # 'node_unique_property'. To check for uniqueness, we now # need to look at the new 'uniqueness' field. is_unique = uniqueness == "UNIQUE" else: # 3.5.3 # ['description', 'indexName', 'tokenNames', 'properties', # 'state', 'type', 'progress', 'provider', 'id', # 'failureMessage'] (description, index_name, token_names, properties, state, type_, progress, provider, id_, failure_message) = record is_unique = type_ == "node_unique_property" elif len(record) == 7: # 3.4.10 (description, lbl, properties, state, type_, provider, failure_message) = record is_unique = type_ == "node_unique_property" token_names = [lbl] elif len(record) == 6: # 3.4.7 description, lbl, properties, state, type_, provider = record is_unique = type_ == "node_unique_property" token_names = [lbl] elif len(record) == 3: # 3.0.10 description, state, type_ = record is_unique = type_ == "node_unique_property" token_names = [] else: raise RuntimeError("Unexpected response from procedure " "db.indexes (%d fields)" % len(record)) if state not in (u"ONLINE", u"online"): continue if unique_only and not is_unique: continue if not token_names or not properties: if description: from py2neo.cypher.lexer import CypherLexer from pygments.token import Token tokens = list(CypherLexer().get_tokens(description)) for token_type, token_value in tokens: if token_type is Token.Name.Label: token_names.append(token_value.strip("`")) elif token_type is Token.Name.Variable: properties.append(token_value.strip("`")) if not token_names or not properties: continue if label in token_names: indexes.append(tuple(properties)) return indexes
[docs] def get_indexes(self, label): """ Fetch a list of indexed property keys for a label. """ return self._get_indexes(label)
[docs] def get_uniqueness_constraints(self, label): """ Fetch a list of unique constraints for a label. Each constraint is the name of a single property key. """ return [k[0] for k in self._get_indexes(label, unique_only=True)]
[docs] class Transaction(object): """ Logical context for one or more graph operations. Transaction objects are typically constructed by the :meth:`.Graph.auto` and :meth:`.Graph.begin` methods. Likewise, the :meth:`.Graph.commit` and :meth:`.Graph.rollback` methods can be used to finish a transaction. """ def __init__(self, graph, autocommit=False, readonly=False, # after=None, metadata=None, timeout=None ): self._graph = graph self._autocommit = autocommit self._connector = self.graph.service.connector if autocommit: self._ref = None else: self._ref = self._connector.begin(self.graph.name, readonly=readonly, # after, metadata, timeout ) self._readonly = readonly self._closed = False self._bookmark = None self._profile = None self._time = None @property def graph(self): """ Graph to which this transaction belongs. """ return self._graph @property def ref(self): """ Transaction reference. """ return self._ref @property def readonly(self): """ :py:const:`True` if this is a readonly transaction, :py:const:`False` otherwise. """ return self._readonly @property def closed(self): """ :py:const:`True` if this transaction is closed, :py:const:`False` otherwise. """ return self._closed @property def bookmark(self): """ The closing bookmark for this transaction, populated on commit. """ return self._bookmark @property def profile(self): """ The connection profile under which this transaction was carried out. """ return self._profile @property def time(self): """ The total time taken to carry out this transaction """ return self._time
[docs] def run(self, cypher, parameters=None, **kwparameters): """ Send a Cypher query to the server for execution and return a :py:class:`~.cypher.Cursor` for navigating its result. :param cypher: Cypher query :param parameters: dictionary of parameters :returns: :py:class:`~.cypher.Cursor` object """ from py2neo.client import Connection if self.closed: raise TypeError("Cannot run query in closed transaction") try: hydrant = Connection.default_hydrant(self._connector.profile, self.graph) parameters = dict(parameters or {}, **kwparameters) if self.ref: result = self._connector.run(self.ref, cypher, parameters) else: result = self._connector.auto_run(cypher, parameters, graph_name=self.graph.name, readonly=self.readonly) self._connector.pull(result, -1) return Cursor(result, hydrant) finally: if not self.ref: self._closed = True
[docs] def evaluate(self, cypher, parameters=None, **kwparameters): """ Execute a single Cypher query and return the value from the first column of the first record. :param cypher: Cypher statement :param parameters: dictionary of parameters :returns: single return value or :const:`None` """ return self.run(cypher, parameters, **kwparameters).evaluate(0)
[docs] def update(self, cypher, parameters=None, **kwparameters): """ Execute a single Cypher statement and discard any result returned. :param cypher: Cypher statement :param parameters: dictionary of parameters """ self.run(cypher, parameters, **kwparameters)
@deprecated("The transaction.commit() method is deprecated, " "use graph.commit(transaction) instead") def commit(self): """ Commit the transaction. """ return self.graph.commit(self) @deprecated("The transaction.rollback() method is deprecated, " "use graph.rollback(transaction) instead") def rollback(self): """ Roll back the current transaction, undoing all actions previously taken. """ return self.graph.rollback(self)
[docs] def create(self, subgraph): """ Create remote nodes and relationships that correspond to those in a local subgraph. Any entities in *subgraph* that are already bound to remote entities will remain unchanged, those which are not will become bound to their newly-created counterparts. For example:: >>> from py2neo import Graph, Node, Relationship >>> g = Graph() >>> tx = g.begin() >>> a = Node("Person", name="Alice") >>> tx.create(a) >>> b = Node("Person", name="Bob") >>> ab = Relationship(a, "KNOWS", b) >>> tx.create(ab) >>> tx.commit() >>> g.exists(ab) True :param subgraph: a :class:`.Node`, :class:`.Relationship` or other creatable object """ if self._autocommit: raise TypeError("Create operations are not supported inside " "auto-commit transactions") try: create = subgraph.__db_create__ except AttributeError: raise TypeError("No method defined to create object %r" % subgraph) else: create(self)
[docs] def delete(self, subgraph): """ Delete the remote nodes and relationships that correspond to those in a local subgraph. To delete only the relationships, use the :meth:`.separate` method. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` """ try: delete = subgraph.__db_delete__ except AttributeError: raise TypeError("No method defined to delete object %r" % subgraph) else: delete(self)
[docs] def exists(self, subgraph): """ Determine whether one or more entities all exist within the graph. Note that if any nodes or relationships in *subgraph* are not bound to remote counterparts, this method will return ``False``. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` :returns: ``True`` if all entities exist remotely, ``False`` otherwise """ try: exists = subgraph.__db_exists__ except AttributeError: raise TypeError("No method defined to check existence of object %r" % subgraph) else: return exists(self)
[docs] def merge(self, subgraph, primary_label=None, primary_key=None): """ Create or update the nodes and relationships of a local subgraph in the remote database. Note that the functionality of this operation is not strictly identical to the Cypher MERGE clause, although there is some overlap. Each node and relationship in the local subgraph is merged independently, with nodes merged first and relationships merged second. For each node, the merge is carried out by comparing that node with a potential remote equivalent on the basis of a single label and property value. If no remote match is found, a new node is created; if a match is found, the labels and properties of the remote node are updated. The label and property used for comparison are determined by the `primary_label` and `primary_key` arguments but may be overridden for individual nodes by the of `__primarylabel__` and `__primarykey__` attributes on the node itself. For each relationship, the merge is carried out by comparing that relationship with a potential remote equivalent on the basis of matching start and end nodes plus relationship type. If no remote match is found, a new relationship is created; if a match is found, the properties of the remote relationship are updated. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` object :param primary_label: label on which to match any existing nodes :param primary_key: property key(s) on which to match any existing nodes """ try: merge = subgraph.__db_merge__ except AttributeError: raise TypeError("No method defined to merge object %r" % subgraph) else: merge(self, primary_label, primary_key)
[docs] def pull(self, subgraph): """ Update local entities from their remote counterparts. For any nodes and relationships that exist in both the local :class:`.Subgraph` and the remote :class:`.Graph`, pull properties and node labels into the local copies. This operation does not create or delete any entities. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` """ try: pull = subgraph.__db_pull__ except AttributeError: raise TypeError("No method defined to pull object %r" % subgraph) else: return pull(self)
[docs] def push(self, subgraph): """ Update remote entities from their local counterparts. For any nodes and relationships that exist in both the local :class:`.Subgraph` and the remote :class:`.Graph`, push properties and node labels into the remote copies. This operation does not create or delete any entities. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` """ try: push = subgraph.__db_push__ except AttributeError: raise TypeError("No method defined to push object %r" % subgraph) else: return push(self)
[docs] def separate(self, subgraph): """ Delete the remote relationships that correspond to those in a local subgraph. This leaves any nodes untouched. :param subgraph: a :class:`.Node`, :class:`.Relationship` or other :class:`.Subgraph` """ try: separate = subgraph.__db_separate__ except AttributeError: raise TypeError("No method defined to separate object %r" % subgraph) else: separate(self)