- Notifications
You must be signed in to change notification settings - Fork187
pgjdbc/r2dbc-postgresql
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
This project contains thePostgreSQL implementation of theR2DBC SPI. This implementation is not intended to be used directly, but rather to be used as the backing implementation for a humane client library to delegate to.
This driver provides the following features:
- Implements R2DBC 1.0
- Login with username/password (MD5, SASL/SCRAM) or implicit trust
- Supports credential rotation by providing
Supplier<String>orPublisher<String> - SCRAM authentication
- Unix Domain Socket transport
- Connection Fail-over supporting multiple hosts
- TLS
- Explicit transactions
- Notifications
- Logical Decode
- Binary data transfer
- Execution of prepared statements with bindings
- Execution of batch statements without bindings
- Read and write support for a majority of data types (seeData Type Mapping for details)
- Fetching of
REFCURSORusingio.r2dbc.postgresql.api.RefCursor - Extension points to register
Codecs to handle additional PostgreSQL data types
Next steps:
- Multi-dimensional arrays
This project is governed by theCode of Conduct. By participating, you are expected to uphold this code of conduct. Please report unacceptable behavior tor2dbc@googlegroups.com.
Here is a quick teaser of how to use R2DBC PostgreSQL in Java:
URL Connection Factory Discovery
ConnectionFactoryconnectionFactory =ConnectionFactories.get("r2dbc:postgresql://<host>:5432/<database>");Publisher<?extendsConnection>connectionPublisher =connectionFactory.create();
Programmatic Connection Factory Discovery
Map<String,String>options =newHashMap<>();options.put("lock_timeout","10s");options.put("statement_timeout","5m");ConnectionFactoryconnectionFactory =ConnectionFactories.get(ConnectionFactoryOptions.builder() .option(DRIVER,"postgresql") .option(HOST,"...") .option(PORT,5432)// optional, defaults to 5432 .option(USER,"...") .option(PASSWORD,"...") .option(DATABASE,"...")// optional .option(OPTIONS,options)// optional .build());Publisher<?extendsConnection>connectionPublisher =connectionFactory.create();// Alternative: Creating a Mono using Project ReactorMono<Connection>connectionMono =Mono.from(connectionFactory.create());
Supported ConnectionFactory Discovery Options
| Option | Description |
|---|---|
ssl | Enables SSL usage (SSLMode.VERIFY_FULL). |
driver | Must bepostgresql. |
protocol | Protocol specifier. Empty to use single-host operations. Supported:failover for multi-server failover operations.(Optional) |
host | Server hostname to connect to. May contain a comma-separated list of hosts with ports when using thefailover protocol. |
port | Server port to connect to. Defaults to5432.(Optional) |
socket | Unix Domain Socket path to connect to as alternative to TCP.(Optional) |
username | Login username. Can be a plainString,Supplier<String>, orPublisher<String>. |
password | Login password. Can be a plainCharSequence,Supplier<CharSequence>, orPublisher<CharSequence>.(Optional when using TLS Certificate authentication) |
database | Database to select.(Optional) |
applicationName | The name of the application connecting to the database. Defaults tor2dbc-postgresql.(Optional) |
autodetectExtensions | Whether to auto-detect and registerExtensions from the class path. Defaults totrue.(Optional) |
compatibilityMode | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults tofalse.(Optional) |
errorResponseLogLevel | Log level for error responses. Any ofOFF,DEBUG,INFO,WARN orERROR Defaults toDEBUG.(Optional) |
extensions | Collection ofExtension to provide additional extensions when creating a connection factory. Defaults to empty.(Optional) |
fetchSize | The default number of rows to return when fetching results. Defaults to0 for unlimited.(Optional) |
forceBinary | Whether to force binary transfer. Defaults tofalse.(Optional) |
hostRecheckTime | Host status recheck time when using multi-server operations. Defaults to10 seconds.(Optional) |
loadBalanceHosts | Whether to shuffle the list of given hostnames before connect when using multi-server operations. Defaults totrue.(Optional) |
loopResources | TCP/Socket LoopResources (depends on the endpoint connection type).(Optional) |
lockWaitTimeout | Lock wait timeout.(Optional) |
noticeLogLevel | Log level for error responses. Any ofOFF,DEBUG,INFO,WARN orERROR Defaults toDEBUG.(Optional) |
preferAttachedBuffers | Configure whether codecs should prefer attached data buffers. The default isfalse, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such asJson to avoid memory leaks. |
preparedStatementCacheQueries | Determine the number of queries that are cached in each connection. The default is-1, meaning there's no limit. The value of0 disables the cache. Any other value specifies the cache size. |
options | AMap<String, String> of connection parameters. These are applied to each database connection created by theConnectionFactory. Useful for setting genericPostgreSQL connection parameters.(Optional) |
schema | The search path to set.(Optional) |
sslMode | SSL mode to use, seeSSLMode enum. Supported values:DISABLE,ALLOW,PREFER,REQUIRE,VERIFY_CA,VERIFY_FULL.(Optional) |
sslNegotiation | SSL negotiation to use, seeSSLNegotiation enum. Supported values:POSTGRES,DIRECT,TUNNEL.(Optional) |
sslRootCert | Path to SSL CA certificate in PEM format. Can be also a resource path.(Optional) |
sslKey | Path to SSL key for TLS authentication in PEM format. Can be also a resource path.(Optional) |
sslCert | Path to SSL certificate for TLS authentication in PEM format. Can be also a resource path.(Optional) |
sslPassword | Key password to decrypt SSL key.(Optional) |
sslHostnameVerifier | javax.net.ssl.HostnameVerifier implementation.(Optional) |
sslSni | Enable/disable SNI to send the configuredhost name during the SSL handshake. Defaults totrue.(Optional) |
statementTimeout | Statement timeout.(Optional) |
targetServerType | Type of server to use when using multi-host operations. Supported values:ANY,PRIMARY,SECONDARY,PREFER_SECONDARY. Defaults toANY.(Optional) |
tcpNoDelay | Enable/disable TCP NoDelay. Enabled by default.(Optional) |
tcpKeepAlive | Enable/disable TCP KeepAlive. Disabled by default.(Optional) |
timeZone | Configure the session timezone to control conversion of local temporal representations. Defaults toTimeZone.getDefault()(Optional) |
Programmatic Configuration
Map<String,String>options =newHashMap<>();options.put("lock_timeout","10s");PostgresqlConnectionFactoryconnectionFactory =newPostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder() .host("...") .port(5432)// optional, defaults to 5432 .username("...") .password("...") .database("...")// optional .options(options)// optional .build());Mono<Connection>mono =connectionFactory.create();
PostgreSQL uses index parameters that are prefixed with$. The following SQL statement makes use of parameters:
INSERT INTO person (id, first_name, last_name)VALUES ($1, $2, $3)
Parameters are referenced using the same identifiers when binding these:
mono.flatMapMany(connection ->connection .createStatement("INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)") .bind("$1",1) .bind("$2","Walter") .bind("$3","White") .execute());
Binding also allowed positional index (zero-based) references. The parameter index is derived from the parameter discovery order when parsing the query.
Artifacts can be found onMaven Central.
<dependency> <groupId>org.postgresql</groupId> <artifactId>r2dbc-postgresql</artifactId> <version>${version}</version></dependency>
If you'd rather like the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
<dependency> <groupId>org.postgresql</groupId> <artifactId>r2dbc-postgresql</artifactId> <version>${version}.BUILD-SNAPSHOT</version></dependency><repository> <id>central-portal-snapshots</id> <name>Central Portal Snapshots</name> <url>https://central.sonatype.com/repository/maven-snapshots/</url></repository>
To support simple connection fail-over it is possible to define multiple endpoints (host and port pairs) in the connection url separated by commas. The driver will try once to connect to each of themin order until the connection succeeds. If none succeeds a normal connection exception is thrown. Make sure to specify thefailover protocol.
The syntax for the connection url is:
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3For example an application can create two connection pools. One data source is for writes, another for reads. The write pool limits connections only to a primary node:
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=primaryR2DBC Postgres supports both, thesimpleandextended message flow.
Cursored fetching is activated by configuring afetchSize. Postgres cursors are valid for the duration of a transaction. R2DBC can use cursors in auto-commit mode (Execute andFlush) to notrequire an explicit transaction (BEGIN…COMMIT/ROLLBACK). Newer pgpool versions don't support this feature. To work around this limitation, either use explicit transactions when configuring a fetchsize or enable compatibility mode. Compatibility mode avoids cursors in auto-commit mode (Execute with no limit +Sync). Cursors in a transaction useExecute (with fetch size as limit) +Syncas message flow.
Listen and Notify provide a simple form of signal or inter-process communication mechanism for processes accessing the same PostgreSQL database. For Listen/Notify, two actors are involved: Thesender (notify) and the receiver (listen). The following example uses two connections to illustrate how they work together:
PostgresqlConnectionsender = …;PostgresqlConnectionreceiver = …;Flux<Notification>listen =receiver.createStatement("LISTEN mymessage") .execute() .flatMap(PostgresqlResult::getRowsUpdated) .thenMany(receiver.getNotifications());Mono<Void>notify =sender.createStatement("NOTIFY mymessage, 'Hello World'") .execute() .flatMap(PostgresqlResult::getRowsUpdated) .then();
Upon subscription, the first connection enters listen mode and publishes incomingNotifications asFlux. The second connection broadcasts a notification to themymessage channel uponsubscription.
Postgres supports additional options when starting a transaction. In particular, the following options can be specified:
- Isolation Level (
isolationLevel) (reset after the transaction to previous value) - Transaction Mutability (
readOnly) - Deferrable Mode (
deferrable)
These options can be specified upon transaction begin to start the transaction and apply options in a single command roundtrip:
PostgresqlConnectionconnection= …;connection.beginTransaction(PostgresTransactionDefinition.from(IsolationLevel.SERIALIZABLE).readOnly().notDeferrable());
See also:https://www.postgresql.org/docs/current/sql-begin.html
PostgreSQL supports JSON by storing values inJSON/JSONB columns. These values can be consumed and written using the regular R2DBC SPI and by using driver-specific extensions withtheio.r2dbc.postgresql.codec.Json type.
You can choose from two approaches:
- Native JSONB encoding using the
Jsonwrapper type. - Using scalar types.
The difference between theJson type and scalar types is thatJson values are written encoded asJSONB to the database.byte[] andString types are represented asBYTEA respectiveVARCHAR and require casting ($1::JSON) when used with parameterized statements.
The following code showsINSERT andSELECT cases for JSON interaction:
CREATETABLEmy_table (my_json JSON);
Write JSON
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1)") .bind("$1",Json.of("{\"hello\":\"world\"}")).execute();
Consume JSON
connection.createStatement("SELECT my_json FROM my_table") .execute() .flatMap(it ->it.map((row,rowMetadata) ->row.get("my_json",Json.class))) .map(Json::asString);
Write JSON using casting
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1::JSON)") .bind("$1","{\"hello\":\"world\"}").execute();
Consume JSON as scalar type
connection.createStatement("SELECT my_json FROM my_table") .execute() .flatMap(it ->it.map((row,rowMetadata) ->row.get("my_json",String.class)));
The following types are supported for JSON exchange:
io.r2dbc.postgresql.codec.JsonByteBuf(must be released after usage to avoid memory leaks)ByteBufferbyte[]StringInputStream(must be closed after usage to avoid memory leaks)
CITEXT is a built-in extension to support case-insensitivetext columns. By default, the driver sends all string values asVARCHAR that cannot be used directly withCITEXT (without casting or converting values in your SQL).
If you cast input, then you can send parameters to the server without further customization of the driver:
CREATETABLEtest (ci CITEXT);SELECT ciFROM testWHERE ci= $1::citext;
If you want to send individualString-values in a CITEXT-compatible way, then useParameters.in(…):
connection.createStatement("SELECT ci FROM test WHERE ci = $1") .bind("$1",Parameters.in(PostgresqlObjectId.UNSPECIFIED,"Hello")) .execute();
If you do not have control over the created SQL or you want to send allString values in a CITEXT-compatible way, then you can customize the driver configuration by registering aStringCodec to sendString values with theUNSPECIFIED OID to let Postgres infer the value type from the provided values:
Builderbuilder =PostgresqlConnectionConfiguration.builder();builder.codecRegistrar((connection,allocator,registry) -> {registry.addFirst(newStringCodec(allocator,PostgresqlObjectId.UNSPECIFIED,PostgresqlObjectId.VARCHAR_ARRAY));returnMono.empty();});
You can register also theCodecRegistrar asExtension so that it gets auto-detected duringConnectionFactory creation.
The driver can consume cursors that were created by PL/pgSQL asrefcursor.Cursors are represented asRefCursor objects. Cursors obtained fromResult can be used to fetch the cursor directly.Since cursors are stateful, they must be closed once they are no longer in use.
connection.createStatement("SELECT show_cities_multiple()").execute() .flatMap(result ->result.map((row,rowMetadata) ->row.get(0,RefCursor.class))) .flatMap(cursor -> {Mono<PostgresResult>data =cursor.fetch() .flatMap(…) .then(rc.close());returndata; });
PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data.In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
Consuming the replication stream is a four-step process:
- Obtain a replication connection via
PostgresqlConnectionFactory.replication(). - Create a replication slot (physical/logical).
- Initiate replication using the replication slot.
- Once the replication stream is set up, you can consume and map the binary data using
ReplicationStream.map(…).
On application shutdown,close() theReplicationStream.
Note that a connection is busy once the replication is active and a connection can have at most one active replication stream.
Mono<PostgresqlReplicationConnection>replicationMono =connectionFactory.replication();// later:ReplicationSlotRequestrequest =ReplicationSlotRequest.logical() .slotName("my_slot") .outputPlugin("test_decoding") .temporary() .build();Mono<ReplicationSlot>createSlot =replicationConnection.createSlot(request);ReplicationRequestreplicationRequest =ReplicationRequest.logical() .slotName("my_slot") .startPosition(LogSequenceNumber.valueOf(0)) .slotOption("skip-empty-xacts",true) .slotOption("include-xids",false) .build();Flux<T>replicationStream =replicationConnection.startReplication(replicationRequest).flatMapMany(it -> {returnit.map(byteBuf -> …) .doOnError(t ->it.close().subscribe());});
Applications may make use of Postgres enumerated types by usingEnumCodec to map custom types to Javaenum types.EnumCodec requires the Postgres OID and the Java to map enum values to the Postgres protocol and to materialize Enum instances from Postgres results.You can configure aCodecRegistrar throughEnumCodec.builder() for one or more enumeration type mappings. Make sure to use different Java enum types otherwise the driver is not able to distinguish between Postgres OIDs.
Example:
SQL:
CREATETYPEmy_enumAS ENUM ('FIRST','SECOND');
Java Model:
enumMyEnumType {FIRST,SECOND;}
Codec Registration:
PostgresqlConnectionConfiguration.builder() .codecRegistrar(EnumCodec.builder().withEnum("my_enum",MyEnumType.class).build());
When available, the driver registers also an array variant of the codec.
This reference table shows the type mapping betweenPostgreSQL and Java data types:
Note onIntegeroid usage: Postgres OIDs are unsigned 32 bit integers. Make sure to consider how Java represents unsigned integers when working with large OID values or convert the value tolong throughInteger.toUnsignedLong(int).
Types inbold indicate the native (default) Java type.
Support for the following single-dimensional arrays (read and write):
This driver accepts the following extensions:
CodecRegistrarto contributeCodecs for PostgreSQL ObjectIDs.
Extensions can be registered programmatically usingPostgresConnectionConfiguration or discovered using Java'sServiceLoader mechanism (fromMETA-INF/services/io.r2dbc.postgresql.extension.Extension).
The driver ships with built-in dynamic codecs (e.g.hstore, PostGISgeometry) that are registered during the connection handshake depending on their availability while connecting. Note that Postgres extensions registered after a connection was established require a reconnect to initialize the codec.
If SL4J is on the classpath, it will be used. Otherwise, there are two possible fallbacks: Console orjava.util.logging.Logger). By default, the Console fallback is used. To use the JDK loggers, set thereactor.logging.fallback System property toJDK.
Logging facilities:
- Driver Logging (
io.r2dbc.postgresql) - Query Logging (
io.r2dbc.postgresql.QUERYonDEBUGlevel) - Connection Context (
io.r2dbc.postgresql.client.ConnectionContext)DEBUGlevel enables connection and process identifiers in log messages and exceptions ([cid: 0x1][pid: 109])TRACElevel enables socket information (remote and local addresses) to the connection context ([cid: 0x1][pid: 109][id: 0x79dfc4d4, L:/127.0.0.1:49391 - R:localhost/127.0.0.1:49366])
- Parameters' values Logging (
io.r2dbc.postgresql.PARAMonDEBUGlevel) - Transport Logging (
io.r2dbc.postgresql.client)DEBUGenablesMessageexchange loggingTRACEenables traffic logging
Logging that is associated with a connection reports the logical connection id (cid) which is a driver-local connection counter and the Postgres Process Id (pid) once the connection handshake finishes.
Having trouble with R2DBC? We'd love to help!
- Check thespec documentation, andJavadoc.
- If you are upgrading, check out thechangelog for "new and noteworthy" features.
- Ask a question - we monitorstackoverflow.com for questionstagged with
r2dbc.You can also chat with the community onGitter. - Report bugs with R2DBC PostgreSQL atgithub.com/pgjdbc/r2dbc-postgresql/issues.
R2DBC uses GitHub as issue tracking system to record bugs and feature requests.If you want to raise an issue, please follow the recommendations below:
- Before you log a bug, please search theissue tracker to see if someone has already reported the problem.
- If the issue doesn't already exist,create a new issue.
- Please provide as much information as possible with the issue report, we like to know the version of R2DBC PostgreSQL that you are using and JVM version.
- If you need to paste code, or include a stack trace use Markdown ``` escapes before and after your text.
- If possible try to create a test-case or project that replicates the issue.Attach a link to your code or a compressed file containing your code.
You don't need to build from source to use R2DBC PostgreSQL (binaries in Maven Central), but if you want to try out the latest and greatest, R2DBC PostgreSQL can be easily built with themaven wrapper. You also need JDK 1.8 and Docker to run integration tests.
$ ./mvnw clean install
If you want to build with the regularmvn command, you will needMaven v3.9.0 or above.
Also seeCONTRIBUTING.adoc if you wish to submit pull requests.
Running the JMH benchmarks builds and runs the benchmarks without running tests.
$ ./mvnw clean install -Pjmh
This project is released under version 2.0 of theApache License.
About
Postgresql R2DBC Driver
Topics
Resources
License
Contributing
Security policy
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.