The session API
This chapter describes the following: simple sessions, asynchronous sessions, reactive sessions, and session configuration.
Simple sessions
Available in: .NET, Go, Java, Python
Simple sessions provide a "classic" blocking style API for Cypher execution. In general, simple sessions provide the easiest programming style to work with since API calls are executed in a strictly sequential fashion.
Lifecycle
The session lifetime extends from session construction to session closure. In languages that support them, simple sessions are usually scoped within a context block; this ensures that they are properly closed and that any underlying connections are released and not leaked.
using (var session = driver.Session(...)) {
    // transactions go here
}

session := driver.NewSession(...)
defer session.Close()
// transactions go here

try (Session session = driver.session(...)) {
    // transactions go here
}

with driver.session(...) as session:
    # transactions go here
Sessions can be configured in a number of different ways. This is carried out by supplying configuration inside the session constructor. See Session configuration for more details.
Transaction functions
Transaction functions are used for containing transactional units of work. This form of transaction requires minimal boilerplate code and allows for a clear separation of database queries and application logic.
Transaction functions are also desirable since they encapsulate retry logic and allow for the greatest degree of flexibility when swapping out a single instance of server for a cluster.
Transaction functions can be called as either read or write operations. This choice will route the transaction to an appropriate server within a clustered environment. If you are operating in a single instance environment, this routing has no impact. It does give you flexibility if you choose to adopt a clustered environment later on.
Before writing a transaction function it is important to ensure that it is designed to be idempotent. This is because a function may be executed multiple times if initial runs fail.
Any query results obtained within a transaction function should be consumed within that function, as connection-bound resources cannot be managed correctly when out of scope. To that end, transaction functions can return values but these should be derived values rather than raw results.
Transaction functions are the recommended form for containing transactional units of work. When a transaction fails, the driver retry logic is invoked. For several failure cases, the transaction can be immediately retried against a different server. These cases include connection issues, server role changes (e.g. leadership elections) and transient errors.
public void AddPerson(string name)
{
    using (var session = Driver.Session())
    {
        session.WriteTransaction(tx => tx.Run("CREATE (a:Person {name: $name})", new {name}));
    }
}

func addPersonInTxFunc(driver neo4j.Driver, name string) error {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()
    _, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
        result, err := tx.Run("CREATE (a:Person {name: $name})", map[string]interface{}{"name": name})
        if err != nil {
            return nil, err
        }
        return result.Consume()
    })
    return err
}

public void addPerson( final String name )
{
    try ( Session session = driver.session() )
    {
        session.writeTransaction( tx -> {
            tx.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
            return 1;
        } );
    }
}

from neo4j import unit_of_work


@unit_of_work(timeout=5)
def create_person(tx, name):
    return tx.run("CREATE (a:Person {name: $name}) RETURN id(a)", name=name).single().value()


def add_person(driver, name):
    with driver.session() as session:
        return session.write_transaction(create_person, name)
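To illustrate why idempotency matters when a function may be executed more than once, the following is a minimal sketch of a retry loop. It is not the driver's actual retry implementation: `run_with_retry`, `make_flaky` and `TransientError` are hypothetical names, and the in-memory list and set merely stand in for side effects performed by a transaction function.

```python
class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as a dropped connection."""


def run_with_retry(work, max_attempts=3):
    """Call work(attempt) until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return work(attempt)
        except TransientError:
            if attempt == max_attempts:
                raise


def make_flaky(work_body):
    """Wrap work_body so that the first attempt applies its effect and then
    fails, mimicking a failure that occurs after the unit of work has run."""
    def work(attempt):
        work_body()
        if attempt == 1:
            raise TransientError("connection lost")
    return work


# Non-idempotent side effect: every execution appends another entry.
appended = []
run_with_retry(make_flaky(lambda: appended.append("Alice")))

# Idempotent side effect: repeated executions leave the same state.
merged = set()
run_with_retry(make_flaky(lambda: merged.add("Alice")))

print(appended)  # ['Alice', 'Alice']
print(merged)    # {'Alice'}
```

In this sketch the retried function runs twice in both cases, but only the idempotent variant ends in the same state as a single successful run.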
Auto-commit transactions
An auto-commit transaction is a basic but limited form of transaction. Such a transaction consists of only one Cypher query and is not automatically replayed on failure. Therefore any error scenarios must be handled by the client application itself.
Auto-commit transactions are intended to be used for simple use cases such as when learning Cypher or writing one-off scripts.
It is not recommended to use auto-commit transactions in production environments.
Some kinds of Cypher query can be executed from a driver only through auto-commit transactions. Please refer to the Cypher Manual for details.
public void AddPerson(string name)
{
    using (var session = Driver.Session())
    {
        session.Run("CREATE (a:Person {name: $name})", new {name});
    }
}

func addPersonInAutoCommitTx(driver neo4j.Driver, name string) error {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()
    result, err := session.Run("CREATE (a:Person {name: $name})", map[string]interface{}{"name": name})
    if err != nil {
        return err
    }
    if _, err = result.Consume(); err != nil {
        return err
    }
    return nil
}

public void addPerson( String name )
{
    try ( Session session = driver.session() )
    {
        session.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
    }
}

from neo4j import Query


def add_person(self, name):
    with self.driver.session() as session:
        session.run("CREATE (a:Person {name: $name})", name=name)


# Alternative implementation, with a one second timeout
def add_person_within_a_second(self, name):
    with self.driver.session() as session:
        session.run(Query("CREATE (a:Person {name: $name})", timeout=1.0), name=name)
Consuming results
Query results are typically consumed as a stream of records. The drivers provide a language-idiomatic way to iterate through that stream.
public List<string> GetPeople()
{
    using (var session = Driver.Session())
    {
        return session.ReadTransaction(tx =>
        {
            var result = tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name");
            return result.Select(record => record[0].As<string>()).ToList();
        });
    }
}

func getPeople(driver neo4j.Driver) ([]string, error) {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()
    people, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
        var list []string
        result, err := tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name", nil)
        if err != nil {
            return nil, err
        }
        for result.Next() {
            list = append(list, result.Record().Values[0].(string))
        }
        if err = result.Err(); err != nil {
            return nil, err
        }
        return list, nil
    })
    if err != nil {
        return nil, err
    }
    return people.([]string), nil
}

public List<String> getPeople()
{
    try ( Session session = driver.session() )
    {
        return session.readTransaction( tx -> {
            List<String> names = new ArrayList<>();
            Result result = tx.run( "MATCH (a:Person) RETURN a.name ORDER BY a.name" );
            while ( result.hasNext() )
            {
                names.add( result.next().get( 0 ).asString() );
            }
            return names;
        } );
    }
}

def match_person_nodes(tx):
    result = tx.run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
    return [record["a.name"] for record in result]


with driver.session() as session:
    people = session.read_transaction(match_person_nodes)
Retaining results
Within a session, only one result stream can be active at any one time. Therefore, if the result of one query is not fully consumed before another query is executed, the remainder of the first result will be automatically buffered within the result object.
This buffer provides a staging point for results, and divides result handling into fetching (moving from the network to the buffer) and consuming (moving from the buffer to the application).
For large results, the result buffer may require a significant amount of memory. For this reason, it is recommended to consume results in order wherever possible.
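The split between fetching and consuming can be pictured with a small model. This is only a sketch under stated assumptions, not how any driver is implemented: `BufferedResult` and `ModelSession` are hypothetical classes, and a plain Python list plays the role of the network stream.

```python
from collections import deque


class BufferedResult:
    """Hypothetical model of a result: records arrive from a source (the
    network) and pass through a buffer before the application sees them."""

    def __init__(self, source):
        self._source = iter(source)
        self._buffer = deque()

    def fetch_all(self):
        """Fetching: move every remaining record from the source into the buffer."""
        self._buffer.extend(self._source)

    def buffered(self):
        """Number of records currently held in the buffer."""
        return len(self._buffer)

    def __iter__(self):
        return self

    def __next__(self):
        """Consuming: hand the next record to the application."""
        if self._buffer:
            return self._buffer.popleft()
        return next(self._source)


class ModelSession:
    """Hypothetical session that allows only one active result stream."""

    def __init__(self):
        self._active = None

    def run(self, records):
        if self._active is not None:
            # Detach the previous stream by buffering whatever is left of it.
            self._active.fetch_all()
        self._active = BufferedResult(records)
        return self._active


session = ModelSession()
first = session.run(["a", "b", "c"])
head = next(first)                # consume one record straight from the stream
second = session.run(["x", "y"])  # forces the rest of `first` into its buffer
print(head, first.buffered(), list(first), list(second))
```

In this model the memory cost of an unconsumed result is the size of its buffer, which is why consuming results in order keeps the buffer small.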
Client applications can choose to take control of more advanced query patterns by explicitly retaining results. Such explicit retention may also be useful when a result needs to be saved for future processing. The drivers offer support for this process, as per the example below:
public int AddEmployees(string companyName)
{
    using (var session = Driver.Session())
    {
        var persons =
            session.ReadTransaction(tx => tx.Run("MATCH (a:Person) RETURN a.name AS name").ToList());
        return persons.Sum(person => session.WriteTransaction(tx =>
        {
            tx.Run("MATCH (emp:Person {name: $person_name}) " +
                   "MERGE (com:Company {name: $company_name}) " +
                   "MERGE (emp)-[:WORKS_FOR]->(com)",
                new {person_name = person["name"].As<string>(), company_name = companyName});
            return 1;
        }));
    }
}

func addPersonsAsEmployees(driver neo4j.Driver, companyName string) (int, error) {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()
    persons, err := neo4j.Collect(session.Run("MATCH (a:Person) RETURN a.name AS name", nil))
    if err != nil {
        return 0, err
    }
    employees := 0
    for _, person := range persons {
        _, err = session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
            return tx.Run("MATCH (emp:Person {name: $person_name}) "+
                "MERGE (com:Company {name: $company_name}) "+
                "MERGE (emp)-[:WORKS_FOR]->(com)", map[string]interface{}{"person_name": person.Values[0], "company_name": companyName})
        })
        if err != nil {
            return 0, err
        }
        employees++
    }
    return employees, nil
}

public int addEmployees( final String companyName )
{
    try ( Session session = driver.session() )
    {
        int employees = 0;
        List<Record> persons = session.readTransaction( new TransactionWork<List<Record>>()
        {
            @Override
            public List<Record> execute( Transaction tx )
            {
                return matchPersonNodes( tx );
            }
        } );
        for ( final Record person : persons )
        {
            employees += session.writeTransaction( new TransactionWork<Integer>()
            {
                @Override
                public Integer execute( Transaction tx )
                {
                    tx.run( "MATCH (emp:Person {name: $person_name}) " +
                            "MERGE (com:Company {name: $company_name}) " +
                            "MERGE (emp)-[:WORKS_FOR]->(com)",
                            parameters( "person_name", person.get( "name" ).asString(), "company_name",
                                    companyName ) );
                    return 1;
                }
            } );
        }
        return employees;
    }
}

private static List<Record> matchPersonNodes( Transaction tx )
{
    return tx.run( "MATCH (a:Person) RETURN a.name AS name" ).list();
}

def add_employee_to_company(tx, person, company_name):
    tx.run("MATCH (emp:Person {name: $person_name}) "
           "MERGE (com:Company {name: $company_name}) "
           "MERGE (emp)-[:WORKS_FOR]->(com)",
           person_name=person["name"], company_name=company_name)
    return 1


def match_person_nodes(tx):
    return list(tx.run("MATCH (a:Person) RETURN a.name AS name"))


def add_employees(company_name):
    employees = 0
    with driver.session() as session:
        persons = session.read_transaction(match_person_nodes)
        for person in persons:
            employees += session.write_transaction(add_employee_to_company, person, company_name)
    return employees
Asynchronous sessions
Available in: .NET, Java, JavaScript
Asynchronous sessions provide an API wherein function calls typically return awaitable objects such as futures. This allows client applications to work within asynchronous frameworks and take advantage of cooperative multitasking.
Lifecycle
Session lifetime begins with session construction. A session then exists until it is closed, which is typically set to occur after its contained query results have been consumed.
Sessions can be configured in a number of different ways. This is carried out by supplying configuration inside the session constructor. See Session configuration for more details.
Transaction functions
Transaction functions are the recommended form for containing transactional units of work. This form of transaction requires minimal boilerplate code and allows for a clear separation of database queries and application logic. Transaction functions are also desirable since they encapsulate retry logic and allow for the greatest degree of flexibility when swapping out a single instance of server for a cluster.
Functions can be called as either read or write operations. This choice will route the transaction to an appropriate server within a clustered environment. If you are in a single instance environment, this routing has no impact but it does give you the flexibility should you choose to later adopt a clustered environment.
Before writing a transaction function, ensure that any side effects it carries out are idempotent. This is because a function may be executed multiple times if initial runs fail.
Any query results obtained within a transaction function should be consumed within that function, as connection-bound resources cannot be managed correctly when out of scope. To that end, transaction functions can return values but these should be derived values rather than raw results.
When a transaction fails, the driver retry logic is invoked. For several failure cases, the transaction can be immediately retried against a different server. These cases include connection issues, server role changes (e.g. leadership elections) and transient errors. Retry logic can be configured when creating a session.
public async Task<List<string>> PrintAllProducts()
{
    List<string> result = null;
    var session = Driver.AsyncSession();
    try
    {
        // Wrap the whole operation in a managed transaction and
        // get the results back.
        result = await session.ReadTransactionAsync(async tx =>
        {
            var products = new List<string>();
            // Send cypher query to the database
            var reader = await tx.RunAsync(
                "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
                new {id = 0} // Parameters in the query, if any
            );
            // Loop through the records asynchronously
            while (await reader.FetchAsync())
            {
                // Each current read in buffer can be reached via Current
                products.Add(reader.Current[0].ToString());
            }
            return products;
        });
    }
    finally
    {
        // asynchronously close session
        await session.CloseAsync();
    }
    return result;
}

public CompletionStage<ResultSummary> printAllProducts()
{
    String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
    Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
    AsyncSession session = driver.asyncSession();
    return session.readTransactionAsync( tx ->
            tx.runAsync( query, parameters )
                    .thenCompose( cursor -> cursor.forEachAsync( record ->
                            // asynchronously print every record
                            System.out.println( record.get( 0 ).asString() ) ) )
    );
}

const session = driver.session()
const titles = []
try {
  const result = await session.readTransaction(tx =>
    tx.run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', { id: 0 })
  )
  const records = result.records
  for (let i = 0; i < records.length; i++) {
    const title = records[i].get(0)
    titles.push(title)
  }
} finally {
  await session.close()
}
Auto-commit transactions
An auto-commit transaction is a basic but limited form of transaction. Such a transaction consists of only one Cypher query and is not automatically replayed on failure. Therefore any error scenarios must be handled by the client application itself.
Auto-commit transactions are intended to be used for simple use cases such as when learning Cypher or writing one-off scripts.
It is not recommended to use auto-commit transactions in production environments.
Some kinds of Cypher query can be executed from a driver only through auto-commit transactions. Please refer to the Cypher Manual for details.
public async Task<List<string>> ReadProductTitles()
{
    var records = new List<string>();
    var session = Driver.AsyncSession();
    try
    {
        // Send cypher query to the database.
        // The existing IResult interface implements IEnumerable
        // and does not play well with asynchronous use cases. The replacement
        // IResultCursor interface is returned from the RunAsync
        // family of methods instead and provides async capable methods.
        var reader = await session.RunAsync(
            "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
            new {id = 0} // Parameters in the query, if any
        );
        // Loop through the records asynchronously
        while (await reader.FetchAsync())
        {
            // Each current read in buffer can be reached via Current
            records.Add(reader.Current[0].ToString());
        }
    }
    finally
    {
        // asynchronously close session
        await session.CloseAsync();
    }
    return records;
}

public CompletionStage<List<String>> readProductTitles()
{
    String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
    Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
    AsyncSession session = driver.asyncSession();
    return session.runAsync( query, parameters )
            .thenCompose( cursor -> cursor.listAsync( record -> record.get( 0 ).asString() ) )
            .exceptionally( error ->
            {
                // query execution failed, print error and fallback to empty list of titles
                error.printStackTrace();
                return Collections.emptyList();
            } )
            .thenCompose( titles -> session.closeAsync().thenApply( ignore -> titles ) );
}

async function readProductTitles () {
  const session = driver.session()
  try {
    const result = await session.run(
      'MATCH (p:Product) WHERE p.id = $id RETURN p.title',
      {
        id: 0
      }
    )
    const records = result.records
    const titles = []
    for (let i = 0; i < records.length; i++) {
      const title = records[i].get(0)
      titles.push(title)
    }
    return titles
  } finally {
    await session.close()
  }
}
Consuming results
The asynchronous session API provides language-idiomatic methods to aid integration with asynchronous applications and frameworks.
public async Task<List<string>> GetPeopleAsync()
{
    var session = Driver.AsyncSession();
    try
    {
        return await session.ReadTransactionAsync(async tx =>
        {
            var result = await tx.RunAsync("MATCH (a:Person) RETURN a.name ORDER BY a.name");
            return await result.ToListAsync(r => r[0].As<string>());
        });
    }
    finally
    {
        await session.CloseAsync();
    }
}

public CompletionStage<List<String>> getPeople()
{
    String query = "MATCH (a:Person) RETURN a.name ORDER BY a.name";
    AsyncSession session = driver.asyncSession();
    return session.readTransactionAsync( tx ->
            tx.runAsync( query )
                    .thenCompose( cursor -> cursor.listAsync( record ->
                            record.get( 0 ).asString() ) )
    );
}

const session = driver.session()
const result = session.run('MATCH (a:Person) RETURN a.name ORDER BY a.name')
const collectedNames = []
result.subscribe({
  onNext: record => {
    const name = record.get(0)
    collectedNames.push(name)
  },
  onCompleted: () => {
    session.close().then(() => {
      console.log('Names: ' + collectedNames.join(', '))
    })
  },
  onError: error => {
    console.log(error)
  }
})
Reactive sessions
Available in: .NET, Java, JavaScript
Starting with Neo4j 4.0, the reactive processing of queries is supported. This can be achieved through reactive sessions. Reactive sessions allow for dynamic management of the data that is being exchanged between the driver and the server.
Typical of reactive programming, consumers control the rate at which they consume records from queries and the driver in turn manages the rate at which records are requested from the server. Flow control is supported throughout the entire Neo4j stack, meaning that the query engine responds correctly to the flow control signals. This results in far more efficient resource handling and ensures that the receiving side is not forced to buffer arbitrary amounts of data.
Reactive sessions will typically be used in a client application that is already oriented towards the reactive style; it is expected that a reactive dependency or framework is in place. Refer to Get started for more information on recommended dependencies.
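The demand-driven flow control described above can be sketched without any reactive framework. The following is an illustrative model only: `RecordPublisher` and its `request` method are hypothetical and do not correspond to a driver API; they show the general idea that records are produced only in response to consumer demand.

```python
class RecordPublisher:
    """Hypothetical stand-in for the producing side of a reactive result."""

    def __init__(self, records):
        self._records = iter(records)
        self.produced = 0  # how many records have been handed out so far

    def request(self, n, on_next):
        """Deliver at most n further records to on_next.

        Returns False once the stream is exhausted, True otherwise.
        """
        for _ in range(n):
            try:
                record = next(self._records)
            except StopIteration:
                return False
            self.produced += 1
            on_next(record)
        return True


publisher = RecordPublisher(f"title-{i}" for i in range(10))
received = []

# The consumer asks for two records; nothing more is produced until it asks again.
publisher.request(2, received.append)
produced_after_first_request = publisher.produced

# Later, when it is ready for more, it requests another three.
publisher.request(3, received.append)

print(produced_after_first_request, publisher.produced, received)
```

Because production follows demand, the consumer in this sketch never has to hold more records than it asked for.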
Lifecycle
Session lifetime begins with session construction. A session then exists until it is closed, which is typically set to occur after its contained query results have been consumed.
Transaction functions
Transaction functions are the recommended form for containing transactional units of work. This form of transaction requires minimal boilerplate code and allows for a clear separation of database queries and application logic. Transaction functions are also desirable since they encapsulate retry logic and allow for the greatest degree of flexibility when swapping out a single instance of server for a cluster.
Functions can be called as either read or write operations. This choice will route the transaction to an appropriate server within a clustered environment. If you are in a single instance environment, this routing has no impact but it does give you the flexibility should you choose to later adopt a clustered environment.
Before writing a transaction function, ensure that any side effects it carries out are idempotent. This is because a function may be executed multiple times if initial runs fail.
Any query results obtained within a transaction function should be consumed within that function, as connection-bound resources cannot be managed correctly when out of scope. To that end, transaction functions can return values but these should be derived values rather than raw results.
When a transaction fails, the driver retry logic is invoked. For several failure cases, the transaction can be immediately retried against a different server. These cases include connection issues, server role changes (e.g. leadership elections) and transient errors. Retry logic can be configured when creating a session.
public IObservable<string> PrintAllProducts()
{
    var session = Driver.RxSession();
    return session.ReadTransaction(tx =>
    {
        return tx.Run(
                "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
                new {id = 0} // Parameters in the query, if any
            )
            .Records()
            .Select(record => record[0].ToString());
    }).OnErrorResumeNext(session.Close<string>());
}

public Flux<ResultSummary> printAllProducts()
{
    String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
    Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
    return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
            session -> session.readTransaction( tx -> {
                        RxResult result = tx.run( query, parameters );
                        return Flux.from( result.records() )
                                .doOnNext( record -> System.out.println( record.get( 0 ).asString() ) ).then( Mono.from( result.consume() ) );
                    }
            ), RxSession::close );
}

const session = driver.rxSession()
const result = session.readTransaction(tx =>
  tx
    .run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', { id: 0 })
    .records()
    .pipe(
      map(r => r.get(0)),
      materialize(),
      toArray()
    )
)
Sessions can be configured in a number of different ways. This is carried out by supplying configuration inside the session constructor. See Session configuration for more details.
Auto-commit transactions
An auto-commit transaction is a basic but limited form of transaction. Such a transaction consists of only one Cypher Query and is not automatically replayed on failure. Therefore any error scenarios will need to be handled by the client application itself.
Auto-commit transactions are intended to be used for simple use cases such as when learning Cypher or writing one-off scripts.
It is not recommended to use auto-commit transactions in production environments.
Some kinds of Cypher query can be executed from a driver only through auto-commit transactions. Please refer to the Cypher Manual for details.
public IObservable<string> ReadProductTitles()
{
    var session = Driver.RxSession();
    return session.Run(
            "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
            new {id = 0} // Parameters in the query, if any
        )
        .Records()
        .Select(record => record[0].ToString())
        .OnErrorResumeNext(session.Close<string>());
}

public Flux<String> readProductTitles()
{
    String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
    Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
    return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
            session -> Flux.from( session.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() ),
            RxSession::close );
}

function readProductTitles () {
  const session = driver.rxSession()
  return session
    .run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', {
      id: 0
    })
    .records()
    .pipe(
      map(r => r.get(0)),
      materialize(),
      toArray()
    )
}
Consuming results
To consume data from a query in a reactive session, a subscriber is required to handle the results that are being returned by the publisher.
Each transaction corresponds to a data flow which supplies the data from the server. Result processing begins when records are pulled from this flow. Only one subscriber may pull data from a given flow.
public IObservable<string> GetPeople()
{
    var session = Driver.RxSession();
    return session.ReadTransaction(tx =>
    {
        return tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
            .Records()
            .Select(record => record[0].As<string>());
    }).OnErrorResumeNext(session.Close<string>());
}

public Flux<String> getPeople()
{
    String query = "MATCH (a:Person) RETURN a.name ORDER BY a.name";
    return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
            session -> session.readTransaction( tx -> {
                        RxResult result = tx.run( query );
                        return Flux.from( result.records() )
                                .map( record -> record.get( 0 ).asString() );
                    }
            ), RxSession::close );
}

const session = driver.rxSession()
const result = session
  .run('MATCH (a:Person) RETURN a.name ORDER BY a.name')
  .records()
  .pipe(
    map(r => r.get(0)),
    materialize(),
    toArray()
  )
Session configuration
- Bookmarks
The mechanism which ensures causal consistency between transactions within a session. Bookmarks are implicitly passed between transactions within a single session to meet the causal consistency requirements. There may be scenarios where you might want to use the bookmark from one session in a different new session.
Default: None (Sessions will initially be created without a bookmark)
- Default Access Mode
A fallback for the access mode setting when transaction functions are not used. Typically, access mode is set per transaction by calling the appropriate transaction function method. In other cases, this setting is inherited. Note that transaction functions will ignore/override this setting.
Default: Write
- Database
The database with which the session will interact. When you are working with a database which is not the default (i.e. the system database or another database in Neo4j 4.0 Enterprise Edition), you can explicitly configure the database which the driver is executing transactions against. See Operations Manual → The default database for more information on databases.
Default: the default database as configured on the server.
- Fetch Size
The number of records to fetch in each batch from the server. Neo4j 4.0 introduces the ability to pull records in batches, allowing the client application to take control of data population and apply back pressure to the server. This FetchSize applies to simple sessions and async-sessions whereas reactive sessions can be controlled directly using the request method of the subscription.
Default: 1000 records
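As a hedged illustration of how these settings might be combined, the sketch below passes a bookmark from one session to another and sets the database and fetch size. It assumes the 4.x Python driver, consistent with the `write_transaction` calls used earlier in this chapter, where `driver.session()` accepts `database`, `bookmarks` and `fetch_size` and a session exposes `last_bookmark()`; names may differ in other driver versions. The function names and the database name "neo4j" are illustrative only.

```python
def create_person(tx, name):
    # Consume the result inside the transaction function.
    tx.run("CREATE (a:Person {name: $name})", name=name).consume()


def count_people(tx):
    # Return a derived value rather than the raw result.
    return tx.run("MATCH (a:Person) RETURN count(a) AS n").single()["n"]


def add_then_count(driver, name):
    # First session: write, then capture the bookmark of the last transaction.
    with driver.session(database="neo4j") as writer:
        writer.write_transaction(create_person, name)
        bookmark = writer.last_bookmark()

    # Second session: start from that bookmark so the read is causally
    # consistent with the write, and pull records in batches of 100.
    with driver.session(database="neo4j", bookmarks=[bookmark], fetch_size=100) as reader:
        return reader.read_transaction(count_people)
```

A caller would pass in an already constructed driver, for example `add_then_count(driver, "Alice")`.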