PostgreSQL® Extensions to the JDBC API
PostgreSQL® is an extensible database system. You can add your own functions to the server, which can then be called from queries, or even add your own data types. As these are facilities unique to PostgreSQL®, we support them from Java, with a set of extension APIs. Some features within the core of the standard driver actually use these extensions to implement Large Objects, etc.
Accessing the Extensions
To access some of the extensions, you need to use some extra methods in the `org.postgresql.PGConnection` class. In this case, you would need to unwrap the `Connection` returned by `DriverManager.getConnection()`. For example:
```java
Connection db = DriverManager.getConnection(url, username, password);

// ...
// later on
Fastpath fp = db.unwrap(org.postgresql.PGConnection.class).getFastpathAPI();
```
Timestamp Infinity
The driver uses the following values to represent negative and positive infinity:
Type | Negative infinity | Positive infinity |
---|---|---|
LocalDateTime | LocalDateTime.MIN | LocalDateTime.MAX |
OffsetDateTime | OffsetDateTime.MIN | OffsetDateTime.MAX |
java.sql.Timestamp | when the object’s millisecond value equals PGStatement.DATE_NEGATIVE_INFINITY | when the object’s millisecond value equals PGStatement.DATE_POSITIVE_INFINITY |
ResultSet example
```java
java.sql.Timestamp ts = myResultSet.getTimestamp("mycol");

if (ts.getTime() == PGStatement.DATE_NEGATIVE_INFINITY) {
    // The value in the database is '-infinity'
}

if (ts.getTime() == PGStatement.DATE_POSITIVE_INFINITY) {
    // The value in the database is 'infinity'
}
```
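The same constants can be used when writing infinite values. A minimal sketch, assuming a hypothetical table `mytable` with a timestamp column `mycol`:

```java
// Build a Timestamp whose millisecond value is the PGStatement infinity constant
java.sql.Timestamp infinity = new java.sql.Timestamp(PGStatement.DATE_POSITIVE_INFINITY);

try (PreparedStatement ps = conn.prepareStatement("INSERT INTO mytable(mycol) VALUES (?)")) {
    ps.setTimestamp(1, infinity); // stored as 'infinity' on the server
    ps.executeUpdate();
}
```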
Geometric Data Types
PostgreSQL® has a set of data types that can store geometric features in a table. These include single points, lines, and polygons. We support these types in Java with the org.postgresql.geometric package. Please consult the Javadoc mentioned in Further Reading for details of available classes and features.
Example 9.1. Using the CIRCLE datatype JDBC
```java
import java.sql.*;

import org.postgresql.geometric.PGpoint;
import org.postgresql.geometric.PGcircle;

public class GeometricTest {

    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/test";
        try (Connection conn = DriverManager.getConnection(url, "test", "")) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TEMP TABLE geomtest(mycirc circle)");
            }
            insertCircle(conn);
            retrieveCircle(conn);
        }
    }

    private static void insertCircle(Connection conn) throws SQLException {
        PGpoint center = new PGpoint(1, 2.5);
        double radius = 4;
        PGcircle circle = new PGcircle(center, radius);
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO geomtest(mycirc) VALUES (?)")) {
            ps.setObject(1, circle);
            ps.executeUpdate();
        }
    }

    private static void retrieveCircle(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            try (ResultSet rs = stmt.executeQuery("SELECT mycirc, area(mycirc) FROM geomtest")) {
                while (rs.next()) {
                    PGcircle circle = (PGcircle) rs.getObject(1);
                    double area = rs.getDouble(2);
                    System.out.println("Center (X, Y) = (" + circle.center.x + ", " + circle.center.y + ")");
                    System.out.println("Radius = " + circle.radius);
                    System.out.println("Area = " + area);
                }
            }
        }
    }
}
```
Large Objects
Large objects are supported in the standard JDBC specification. However, that interface is limited, and the API provided by PostgreSQL® allows for random access to the object's contents, as if it were a local file.

The org.postgresql.largeobject package provides to Java the libpq C interface's large object API. It consists of two classes: `LargeObjectManager`, which deals with creating, opening and deleting large objects, and `LargeObject`, which deals with an individual object. For an example usage of this API, please see Processing Binary Data in JDBC.
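The following is a minimal sketch of that API (not taken from Processing Binary Data in JDBC); it assumes an open `Connection conn`, and the data written is purely illustrative. Large objects must be accessed inside a transaction, hence autocommit is disabled first.

```java
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;

// ...

conn.setAutoCommit(false);
LargeObjectManager lobj = conn.unwrap(PGConnection.class).getLargeObjectAPI();

// create a new large object and write some bytes to it
long oid = lobj.createLO(LargeObjectManager.READ | LargeObjectManager.WRITE);
LargeObject obj = lobj.open(oid, LargeObjectManager.WRITE);
obj.write("some example bytes".getBytes());
obj.close();

// re-open it later and read from an arbitrary offset, as if it were a local file
obj = lobj.open(oid, LargeObjectManager.READ);
obj.seek(5);
byte[] tail = obj.read(100); // reads up to 100 bytes starting at offset 5
obj.close();

conn.commit();
```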
Listen / Notify
Listen and Notify provide a simple form of signal or interprocess communication mechanism for a collection of processes accessing the same PostgreSQL® database. For more information on notifications consult the main server documentation. This section only deals with the JDBC specific aspects of notifications.
Standard `LISTEN`, `NOTIFY`, and `UNLISTEN` commands are issued via the standard `Statement` interface. To retrieve and process notifications the `Connection` must be cast to the PostgreSQL® specific extension interface `PGConnection`. From there the `getNotifications()` method can be used to retrieve any outstanding notifications.
NOTE
A key limitation of the JDBC driver is that it cannot receive asynchronous notifications and must poll the backend to check if any notifications were issued. A timeout can be given to the poll function, but then the execution of statements from other threads will block.
Example 9.2. Receiving Notifications
```java
import java.sql.*;

public class NotificationTest {

    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://localhost:5432/test";

        // Create two distinct connections, one for the notifier
        // and another for the listener to show the communication
        // works across connections although this example would
        // work fine with just one connection.
        Connection lConn = DriverManager.getConnection(url, "test", "");
        Connection nConn = DriverManager.getConnection(url, "test", "");

        // Create two threads, one to issue notifications and
        // the other to receive them.
        Listener listener = new Listener(lConn);
        Notifier notifier = new Notifier(nConn);
        listener.start();
        notifier.start();
    }
}

class Listener extends Thread {

    private Connection conn;
    private org.postgresql.PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = conn.unwrap(org.postgresql.PGConnection.class);
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN mymessage");
        stmt.close();
    }

    public void run() {
        try {
            while (true) {
                org.postgresql.PGNotification notifications[] = pgconn.getNotifications();

                // If this thread is the only one that uses the connection, a timeout can be used to
                // receive notifications immediately:
                // org.postgresql.PGNotification notifications[] = pgconn.getNotifications(10000);

                for (int i = 0; i < notifications.length; i++) {
                    System.out.println("Got notification: " + notifications[i].getName());
                }

                // wait a while before checking again for new notifications
                Thread.sleep(500);
            }
        } catch (SQLException sqle) {
            sqle.printStackTrace();
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    }
}

class Notifier extends Thread {

    private Connection conn;

    public Notifier(Connection conn) {
        this.conn = conn;
    }

    public void run() {
        while (true) {
            try {
                Statement stmt = conn.createStatement();
                stmt.execute("NOTIFY mymessage");
                stmt.close();
                Thread.sleep(2000);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}
```
Server Prepared Statements
Motivation
The PostgreSQL® server allows clients to compile SQL statements that are expected to be reused, to avoid the overhead of parsing and planning the statement for every execution. This functionality is available at the SQL level via PREPARE and EXECUTE beginning with server version 7.3, and at the protocol level beginning with server version 7.4, but as Java developers we really just want to use the standard `PreparedStatement` interface.
NOTE
PostgreSQL® 9.2 release notes: prepared statements used to be optimized once, without any knowledge of the parameters' values. With 9.2, the planner will use specific plans based on the parameter values sent (the query will be planned at execution time), except if the query is executed several times and the planner decides that the generic plan is not too much more expensive than the specific plans.
Server-side prepared statements can improve execution speed because:

- The client sends just a statement handle (e.g. `S_1`) instead of the full SQL text
- They enable the use of binary transfer (e.g. binary int4, binary timestamps, etc.); the parameters and results are much faster to parse
- They enable reuse of the server-side execution plan
- The client can reuse the result set column definition, so it does not have to receive and parse metadata on each execution
Activation
Previous versions of the driver used PREPARE and EXECUTE to implement server-prepared statements.
This is supported on all server versions beginning with 7.3, but produced application-visible changes in query results, such as missing ResultSet metadata and row update counts. The current driver uses the V3 protocol-level equivalents which avoid these changes in query results. The Extended Query protocol prepares a temporary “unnamed statement”. See the Extended Query section (53.2.3) for details.
The driver uses the Extended Protocol by default when the `PreparedStatement` API is used. An internal counter keeps track of how many times the statement has been executed, and when it reaches the `prepareThreshold` (default 5) the driver will switch to creating a named statement and using `Prepare` and `Execute`.

It is generally a good idea to reuse the same `PreparedStatement` object for performance reasons, however the driver is able to server-prepare statements automatically across `connection.prepareStatement(...)` calls.
For instance:
```java
PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 42);
ps.executeQuery().close();
ps.close();

ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 43);
ps.executeQuery().close();
ps.close();
```
is less efficient than
```java
PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 42);
ps.executeQuery().close();
ps.setInt(1, 43);
ps.executeQuery().close();
```
however pgJDBC can use server side prepared statements in both cases.
Note
The `Statement` object is bound to a `Connection`, and it is not a good idea to access the same `Statement` and/or `Connection` from multiple concurrent threads (except for `cancel()`, `close()`, and similar cases). It might be safer to just `close()` the statement rather than trying to cache it somehow.
Server-prepared statements consume memory both on the client and the server, so pgJDBC limits the number of server-prepared statements per connection. It can be configured via `preparedStatementCacheQueries` (default `256`, the number of queries known to pgJDBC) and `preparedStatementCacheSizeMiB` (default `5`, the client-side cache size in megabytes per connection). Only a subset of the statement cache is server-prepared, as some statements might fail to reach `prepareThreshold`.
Deactivation
There might be cases when you would want to disable the use of server-prepared statements. For instance, if you route connections through a balancer that is incompatible with server-prepared statements, you have little choice.

You can disable usage of server-side prepared statements by setting `prepareThreshold=0`.
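A minimal sketch, assuming a local `test` database; the property can be set either in the connection URL or via `Properties`:

```java
// via the connection URL
String url = "jdbc:postgresql://localhost:5432/test?prepareThreshold=0";
Connection con = DriverManager.getConnection(url, "test", "");

// or via connection properties
Properties props = new Properties();
props.setProperty("user", "test");
props.setProperty("password", "");
props.setProperty("prepareThreshold", "0");
Connection con2 = DriverManager.getConnection("jdbc:postgresql://localhost:5432/test", props);
```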
Corner cases
DDL
The V3 protocol avoids sending column metadata on each execution, and the BIND message specifies the output column format. That creates a problem for cases like
```sql
SELECT * FROM mytable;
ALTER TABLE mytable ADD COLUMN ...;
SELECT * FROM mytable;
```
That results in a `cached plan must not change result type` error, and it causes the transaction to fail.
The recommendation is:
- Use explicit column names in the SELECT list
- Avoid altering column types
DEALLOCATE ALL, DISCARD ALL
There are explicit commands to deallocate all server-side prepared statements. Issuing them would result in the following server-side error message: `prepared statement name is invalid`. Of course, this could confuse pgJDBC, however there are cases when you need to discard statements (e.g. after lots of DDLs).
The recommendation is:
- Use simple `DEALLOCATE ALL` and/or `DISCARD ALL` commands; avoid nesting the commands into pl/pgsql or the like. The driver does understand top-level DEALLOCATE/DISCARD commands and invalidates the client-side cache as well (see the sketch below)
- Reconnect. The cache is per connection, so it would get invalidated if you reconnect
set search_path = …
PostgreSQL® allows you to customize `search_path`, and it provides great power to the developer. With great power the following case could happen:
```sql
set search_path = 'app_v1';
SELECT * FROM mytable;
set search_path = 'app_v2';
SELECT * FROM mytable; -- Does mytable mean app_v1.mytable or app_v2.mytable here?
```
Server-side prepared statements are linked to database object IDs, so a statement could fetch data from the “old” `app_v1.mytable` table. It is hard to tell which behaviour is expected, however pgJDBC tries to track `search_path` changes and invalidates the prepare cache accordingly.
The recommendation is:
- Avoid changing `search_path` often, as it invalidates server-side prepared statements
- Use simple `set search_path ...` commands (as sketched below); avoid nesting the commands into pl/pgsql or the like, otherwise pgJDBC won't be able to identify the `search_path` change
Re-execution of failed statements
It is a pity that a single `cached plan must not change result type` error could cause the whole transaction to fail. The driver can re-execute the statement automatically in certain cases.
- In case the transaction has not failed (e.g. the transaction did not exist before execution of the statement that caused the `cached plan...` error), then pgJDBC re-executes the statement automatically. This makes the application happy and avoids unnecessary errors.
- In case the transaction is in a failed state, there's nothing to do but roll it back. pgJDBC does have an “automatic savepoint” feature, and it can automatically roll back and retry the statement. The behaviour is controlled via the `autosave` property (default `never`). The value `conservative` will auto-rollback for errors related to invalid server-prepared statements (see the sketch after the note below).
Note
`autosave` might result in severe performance issues for long transactions, as the PostgreSQL® backend is not optimized for the case of long transactions and lots of savepoints.
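A minimal sketch of enabling the automatic savepoint feature, assuming a local `test` database; `autosave` can be set in the URL or via `Properties`:

```java
// auto-rollback and retry for errors caused by invalidated server-prepared statements
String url = "jdbc:postgresql://localhost:5432/test?autosave=conservative";
Connection con = DriverManager.getConnection(url, "test", "");
```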
Replication connection
A PostgreSQL® replication connection does not allow the use of server-side prepared statements, so pgJDBC uses simple queries when the `replication` connection property is activated.
Use of server-prepared statements for con.createStatement()
By default, pgJDBC uses server-prepared statements for `PreparedStatement` only, however you might want to activate server-side prepared statements for regular `Statement` as well. For instance, if you execute the same statement through `con.createStatement().executeQuery(...)`, then you might improve performance by caching the statement. Of course, it is better to use `PreparedStatement`s explicitly, however the driver has an option to cache simple statements as well.
You can do that by setting `preferQueryMode` to `extendedCacheEverything` (see the sketch after the note).
Note
This option is more of a diagnostic/debugging feature, so be careful how you use it.
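A minimal sketch, assuming a local `test` database:

```java
// cache and server-prepare statements issued via con.createStatement() as well
String url = "jdbc:postgresql://localhost:5432/test?preferQueryMode=extendedCacheEverything";
Connection con = DriverManager.getConnection(url, "test", "");

try (Statement st = con.createStatement();
     ResultSet rs = st.executeQuery("select 42")) {
    rs.next();
    System.out.println(rs.getInt(1));
}
```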
Bind placeholder datatypes
The database optimizes the execution plan for the given parameter types. Consider the case below:
```java
// DDL: create table rooms (id int4, name varchar);
//      create index name__rooms on rooms(name);

PreparedStatement ps = con.prepareStatement("select id from rooms where name=?");
ps.setString(1, "42");
```
It works as expected, however what would happen if one used `setInt` instead: `ps.setInt(1, 42)`?
Even though the result would be identical, the first variation (the `setString` case) enables the database to use the index `name__rooms`, and the latter does not. If the database gets `42` as an integer, it uses a plan like `where cast(name as int4) = ?`.
The plan has to be specific to the (SQL text; parameter types) combination, so the driver has to invalidate server-side prepared statements in case the statement is used with different parameter types.
This gets especially painful for batch operations as you don’t want to interrupt the batch by using alternating datatypes.
The most typical case is as follows (don’t ever use this in production):
```java
PreparedStatement ps = con.prepareStatement("select id from rooms where ...");
if (param instanceof String) {
    ps.setString(1, (String) param);
} else if (param instanceof Integer) {
    ps.setInt(1, ((Integer) param).intValue());
} else {
    // Does it really matter which type of NULL to use?
    // In fact, it does since data types specify which server-procedure to call
    ps.setNull(1, Types.INTEGER);
}
```
As you might guess, `setString` vs `setNull(..., Types.INTEGER)` results in alternating datatypes, and that forces the driver to invalidate and re-prepare the server-side statement.
The recommendation is to use a consistent datatype for each bind placeholder, and to use the same type for `setNull`. Check out the `org.postgresql.test.jdbc2.PreparedStatementTest.testAlternatingBindType` example for more details.
Debugging
If you run into `cached plan must not change result type` or `prepared statement "S_2" does not exist` errors, the following might be helpful for debugging.
- Client logging. If you add `loggerLevel=TRACE&loggerFile=pgjdbc-trace.log`, you will get a trace of the messages sent between the driver and the backend
- You might check `org.postgresql.test.jdbc2.AutoRollbackTest` as it verifies lots of combinations
Client Logging
Logging is now configured using `java.util.logging`. Create a `logging.properties` file in resources similar to:
```properties
handlers=java.util.logging.FileHandler
.level=INFO
java.util.logging.FileHandler.level=FINEST
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.FileHandler.pattern=/tmp/debug.log
java.util.logging.ConsoleHandler.level=INFO
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
org.postgresql.level=FINEST
```
Which can be loaded using:
```java
LogManager.getLogManager().readConfiguration(YourClass.class.getResourceAsStream("/logging.properties"));
```
Example 9.3. Using server side prepared statements
```java
import java.sql.*;

public class ServerSidePreparedStatement {

    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/test";
        try (Connection conn = DriverManager.getConnection(url, "test", "")) {
            try (PreparedStatement pstmt = conn.prepareStatement("SELECT ?")) {
                // cast to the pg extension interface
                org.postgresql.PGStatement pgstmt = pstmt.unwrap(org.postgresql.PGStatement.class);

                // on the third execution start using server side statements
                pgstmt.setPrepareThreshold(3);

                for (int i = 1; i <= 5; i++) {
                    pstmt.setInt(1, i);
                    boolean usingServerPrepare = pgstmt.isUseServerPrepare();
                    ResultSet rs = pstmt.executeQuery();
                    rs.next();
                    System.out.println("Execution: " + i + ", Used server side: " + usingServerPrepare
                            + ", Result: " + rs.getInt(1));
                    rs.close();
                }
            }
        }
    }
}
```
This produces the expected result of using server-side prepared statements upon the third execution.
Execution | Used server side | Result |
---|---|---|
1 | false | 1 |
2 | false | 2 |
3 | true | 3 |
4 | true | 4 |
5 | true | 5 |
The example shown above requires the programmer to use PostgreSQL® specific code in a supposedly portable API, which is not ideal. Also, it sets the threshold only for that particular statement, which is some extra typing if we wanted to use that threshold for every statement. Let's take a look at the other ways to set the threshold to enable server-side prepared statements. There is already a hierarchy in place above a `PreparedStatement`: the `Connection` it was created from, and above that the source of the connection, be it a `DataSource` or a URL. The server-side prepared statement threshold can be set at any of these levels such that the value will be the default for all of its children.
```java
// pg extension interfaces
org.postgresql.PGConnection pgconn;
org.postgresql.PGStatement pgstmt;

// set a prepared statement threshold for connections created from this url
String url = "jdbc:postgresql://localhost:5432/test?prepareThreshold=3";

// see that the connection has picked up the correct threshold from the url
Connection conn = DriverManager.getConnection(url, "test", "");
pgconn = conn.unwrap(org.postgresql.PGConnection.class);
System.out.println(pgconn.getPrepareThreshold()); // Should be 3

// see that the statement has picked up the correct threshold from the connection
PreparedStatement pstmt = conn.prepareStatement("SELECT ?");
pgstmt = pstmt.unwrap(org.postgresql.PGStatement.class);
System.out.println(pgstmt.getPrepareThreshold()); // Should be 3

// change the connection's threshold and ensure that new statements pick it up
pgconn.setPrepareThreshold(5);
pstmt = conn.prepareStatement("SELECT ?");
pgstmt = pstmt.unwrap(org.postgresql.PGStatement.class);
System.out.println(pgstmt.getPrepareThreshold()); // Should be 5
```
Parameter Status Messages
PostgreSQL® supports server parameters, also called server variables or, internally, Grand Unified Configuration (GUC) variables. These variables are manipulated by the `SET` command, `postgresql.conf`, `ALTER SYSTEM SET`, `ALTER USER SET`, `ALTER DATABASE SET`, the `set_config(...)` SQL-callable function, etc. See The PostgreSQL manual.
For a subset of these variables the server will automatically report changes to the value to the client driver and application. These variables are known internally as `GUC_REPORT` variables, after the name of the flag that enables the functionality.

The server keeps track of all the variable scopes and reports when a variable reverts to a prior value, so the client doesn't have to guess what the current value is and whether some server-side function could've changed it. Whenever the value changes, no matter why or how it changes, the server reports the new effective value in a Parameter Status protocol message to the client. pgJDBC uses many of these reports internally.
As of pgJDBC 42.2.6, the driver also exposes the parameter status information to user applications via the `PGConnection` extension interface.
Methods
Two methods on `org.postgresql.PGConnection` provide the client interface to reported parameters. Parameter names are case-insensitive and case-preserving.

- `Map PGConnection.getParameterStatuses()` - returns a map of all reported parameters and their values.
- `String PGConnection.getParameterStatus(String)` - shorthand to retrieve one value by name, or null if no value has been reported.
See the `PGConnection` JavaDoc for details.
Example
If you're working directly with a `java.sql.Connection` you can do the following:
```java
import org.postgresql.PGConnection;

void my_function(Connection conn) {
    System.out.println("My application name is " +
        ((PGConnection) conn).getParameterStatus("application_name"));
}
```
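To list every reported parameter, a minimal sketch using `getParameterStatuses()` (the `dump_parameters` helper is hypothetical):

```java
import java.util.Map;
import org.postgresql.PGConnection;

void dump_parameters(Connection conn) throws SQLException {
    // all parameters the server has reported so far, keyed by name
    Map<String, String> params = conn.unwrap(PGConnection.class).getParameterStatuses();
    for (Map.Entry<String, String> e : params.entrySet()) {
        System.out.println(e.getKey() + " = " + e.getValue());
    }
}
```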
Other client drivers
The `libpq` equivalent is the `PQparameterStatus(...)` API function.
Physical and Logical replication API
Postgres 9.4 (released in December 2014) introduced a new feature called logical replication. Logical replication allows changes from a database to be streamed in real-time to an external system. The difference between physical replication and logical replication is that logical replication sends data over in a logical format whereas physical replication sends data over in a binary format. Additionally, logical replication can send over a single table or database. Binary replication replicates the entire cluster in an all-or-nothing fashion, which is to say there is no way to get a specific table or database using binary replication.
Prior to logical replication, keeping an external system synchronized in real time was problematic. The application would have to update/invalidate the appropriate cache entries, reindex the data in your search engine, send it to your analytics system, and so on.

This suffers from race conditions and reliability problems. For example, if slightly different data gets written to two different datastores (perhaps due to a bug or a race condition), the contents of the datastores will gradually drift apart; they will become more and more inconsistent over time. Recovering from such gradual data corruption is difficult.

Logical decoding takes the database's write-ahead log (WAL) and gives us access to row-level change events: every time a row in a table is inserted, updated or deleted, that's an event. Those events are grouped by transaction and appear in the order in which they were committed to the database. Aborted/rolled-back transactions do not appear in the stream. Thus, if you apply the change events in the same order, you end up with an exact, transactionally consistent copy of the database. It looks like the Event Sourcing pattern that you previously implemented in your application, but now it's available out of the box from the PostgreSQL® database.

For access to real-time changes PostgreSQL® provides the streaming replication protocol. The replication protocol can be physical or logical. The physical replication protocol is used for Master/Secondary replication. The logical replication protocol can be used to stream changes to an external system.
Since the JDBC API does not include replication, `PGConnection` implements the PostgreSQL® replication API.
Configure database
Your database should be configured to enable logical or physical replication:
postgresql.conf
- Property `max_wal_senders` should be at least equal to the number of replication consumers
- Property `wal_keep_segments` should contain the number of WAL segments that cannot be removed from the database
- Property `wal_level` should be equal to `logical` for logical replication
- Property `max_replication_slots` should be greater than zero for logical replication, because logical replication cannot work without a replication slot
pg_hba.conf
Allow a user with replication privileges to connect to the replication stream.
```
local   replication     all                     trust
host    replication     all     127.0.0.1/32    md5
host    replication     all     ::1/128         md5
```
Configuration for examples
postgresql.conf
```
max_wal_senders=4           # max number of walsender processes
wal_keep_segments=4         # in logfile segments, 16MB each; 0 disables
wal_level=logical           # minimal, replica, or logical
max_replication_slots=4     # max number of replication slots
```
pg_hba.conf
```
# Allow replication connections from localhost, by a user with the
# replication privilege.
local   replication     all                     trust
host    replication     all     127.0.0.1/32    md5
host    replication     all     ::1/128         md5
```
Logical replication
Logical replication uses a replication slot to reserve WAL logs on the server and also defines which decoding plugin to use to decode the WAL logs to the required format; for example, you can decode changes as json, protobuf, etc. To demonstrate how to use the pgJDBC replication API we will use the `test_decoding` plugin that is included in the `postgresql-contrib` package, but you can use your own decoding plugin. There are a few on GitHub which can be used as examples.
In order to use the replication API, the connection has to be created in replication mode. In this mode the connection is not available for executing SQL commands and can only be used with the replication API. This is a restriction imposed by PostgreSQL®.
Example 9.4. Create replication connection.
Stringurl="jdbc:postgresql://localhost:5432/postgres";Propertiesprops=newProperties();PGProperty.USER.set(props,"postgres");PGProperty.PASSWORD.set(props,"postgres");PGProperty.ASSUME_MIN_SERVER_VERSION.set(props,"9.4");PGProperty.REPLICATION.set(props,"database");PGProperty.PREFER_QUERY_MODE.set(props,"simple");Connectioncon=DriverManager.getConnection(url,props);PGConnectionreplConnection=con.unwrap(PGConnection.class);
The entire replication API is grouped in `org.postgresql.replication.PGReplicationConnection` and is available via `org.postgresql.PGConnection#getReplicationAPI`.
Before you can start the replication protocol, you need to have a replication slot, which can also be created via the pgJDBC API.
Example 9.5. Create replication slot via pgJDBC API
```java
replConnection.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName("demo_logical_slot")
    .withOutputPlugin("test_decoding")
    .make();
```
Once we have the replication slot, we can create a ReplicationStream.
Example 9.6. Create logical replication stream.
```java
PGReplicationStream stream = replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("demo_logical_slot")
    .withSlotOption("include-xids", false)
    .withSlotOption("skip-empty-xacts", true)
    .start();
```
The replication stream will send all changes since the creation of the replication slot, or from the replication slot restart LSN if the slot was already used for replication. You can also start streaming changes from a particular LSN position; in that case the LSN position should be specified when you create the replication stream.
Example 9.7. Create logical replication stream from particular position.
```java
LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");

PGReplicationStream stream = replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("demo_logical_slot")
    .withSlotOption("include-xids", false)
    .withSlotOption("skip-empty-xacts", true)
    .withStartPosition(waitLSN)
    .start();
```
Via `withSlotOption` we can also specify options that will be sent to our output plugin; this allows the user to customize decoding. For example, I have my own output plugin that has a property `sensitive=true` which will include changes to sensitive columns in the change event.
Example 9.8. Example output with include-xids=true
```
BEGIN 105779
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'previous value'
COMMIT 105779
```
Example 9.9. Example output with include-xids=false
```
BEGIN
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'previous value'
COMMIT
```
During replication the database and consumer periodically exchange ping messages. When the database or client does not receive a ping message within the configured timeout, replication is deemed to have stopped, an exception will be thrown, and the database will free resources. In PostgreSQL® the ping timeout is configured by the property `wal_sender_timeout` (default = 60 seconds). The replication stream in pgJDBC can be configured to send feedback (ping) when required or at a time interval. It is recommended to send feedback (ping) to the database more often than the configured `wal_sender_timeout`. In production systems I use a value equal to `wal_sender_timeout / 3`. This avoids potential problems with networks and allows changes to be streamed without timeout disconnects. To specify the feedback interval use the `withStatusInterval` method.
Example 9.10. Replication stream with configured feedback interval equal to 20 sec
```java
PGReplicationStream stream = replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("demo_logical_slot")
    .withSlotOption("include-xids", false)
    .withSlotOption("skip-empty-xacts", true)
    .withStatusInterval(20, TimeUnit.SECONDS)
    .start();
```
After creating the `PGReplicationStream`, it's time to start receiving changes in real time.
Changes can be received from the stream in a blocking manner (`org.postgresql.replication.PGReplicationStream#read`) or a non-blocking manner (`org.postgresql.replication.PGReplicationStream#readPending`). Both methods receive changes as a `java.nio.ByteBuffer` with the payload sent by the output plugin. We can't receive part of a message, only the full message that was sent by the output plugin. The ByteBuffer contains the message in the format defined by the decoding output plugin; it can be a simple String, JSON, or whatever the plugin determines. That's why pgJDBC returns the raw ByteBuffer instead of making assumptions.
Example 9.11. Example of sending a message from an output plugin.
```c
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);
```
Example 9.12. Receive changes via replication stream.
```java
while (true) {
    // non-blocking receive message
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
    }

    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    System.out.println(new String(source, offset, length));
}
```
As mentioned previously, the replication stream should periodically send feedback to the database to prevent disconnects due to timeout. Feedback is automatically sent when `read` or `readPending` are called, if it's time to send feedback. Feedback can also be sent via `org.postgresql.replication.PGReplicationStream#forceUpdateStatus()` regardless of the timeout. Another important duty of feedback is to provide the server with the Log Sequence Number (LSN) that has been successfully received and applied by the consumer; this is necessary for monitoring and for truncating/archiving WAL files that are no longer needed. In the event that replication has been restarted, it will start from the last successfully processed LSN that was sent via feedback to the database.
The API provides the following feedback mechanisms to indicate the LSN successfully applied by the current consumer (LSNs before this point can be truncated or archived): `org.postgresql.replication.PGReplicationStream#setFlushedLSN` and `org.postgresql.replication.PGReplicationStream#setAppliedLSN`. You can always get the last received LSN via `org.postgresql.replication.PGReplicationStream#getLastReceiveLSN`.
Example 9.13. Add feedback indicating a successfully processed LSN
```java
while (true) {
    // Receive the last message successfully sent to the queue. LSN ordered.
    LogSequenceNumber successfullySendToQueue = getQueueFeedback();
    if (successfullySendToQueue != null) {
        stream.setAppliedLSN(successfullySendToQueue);
        stream.setFlushedLSN(successfullySendToQueue);
    }

    // non-blocking receive message
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
    }

    asyncSendToQueue(msg, stream.getLastReceiveLSN());
}
```
Example 9.14. Full example of logical replication
Stringurl="jdbc:postgresql://localhost:5432/test";Propertiesprops=newProperties();PGProperty.USER.set(props,"postgres");PGProperty.PASSWORD.set(props,"postgres");PGProperty.ASSUME_MIN_SERVER_VERSION.set(props,"9.4");PGProperty.REPLICATION.set(props,"database");PGProperty.PREFER_QUERY_MODE.set(props,"simple");Connectioncon=DriverManager.getConnection(url,props);PGConnectionreplConnection=con.unwrap(PGConnection.class);replConnection.getReplicationAPI().createReplicationSlot().logical().withSlotName("demo_logical_slot").withOutputPlugin("test_decoding").make();//some changes after create replication slot to demonstrate receive itsqlConnection.setAutoCommit(true);Statementst=sqlConnection.createStatement();st.execute("insert into test_logic_table(name) values('first tx changes')");st.close();st=sqlConnection.createStatement();st.execute("update test_logic_table set name = 'second tx change' where pk = 1");st.close();st=sqlConnection.createStatement();st.execute("delete from test_logic_table where pk = 1");st.close();PGReplicationStreamstream=replConnection.getReplicationAPI().replicationStream().logical().withSlotName("demo_logical_slot").withSlotOption("include-xids",false).withSlotOption("skip-empty-xacts",true).withStatusInterval(20,TimeUnit.SECONDS).start();while(true){//non blocking receive messageByteBuffermsg=stream.readPending();if(msg==null){TimeUnit.MILLISECONDS.sleep(10L);continue;}intoffset=msg.arrayOffset();byte[]source=msg.array();intlength=source.length-offset;System.out.println(newString(source,offset,length));//feedbackstream.setAppliedLSN(stream.getLastReceiveLSN());stream.setFlushedLSN(stream.getLastReceiveLSN());}
The output looks like this, where each line is a separate message:
```
BEGIN
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'first tx changes'
COMMIT
BEGIN
table public.test_logic_table: UPDATE: pk[integer]:1 name[character varying]:'second tx change'
COMMIT
BEGIN
table public.test_logic_table: DELETE: pk[integer]:1
COMMIT
```
Physical replication
The API for physical replication looks like the API for logical replication. Physical replication does not require a replication slot, and the ByteBuffer will contain the binary form of WAL records. The binary WAL format is a very low-level API and can change from version to version; that is why replication between different major PostgreSQL® versions is not possible. However, physical replication can contain important data that is not available via logical replication, which is why pgJDBC contains an implementation for both.
Example 9.15. Use physical replication
```java
LogSequenceNumber lsn = getCurrentLSN();

Statement st = sqlConnection.createStatement();
st.execute("insert into test_physic_table(name) values('previous value')");
st.close();

PGReplicationStream stream = pgConnection.getReplicationAPI()
    .replicationStream()
    .physical()
    .withStartPosition(lsn)
    .start();

ByteBuffer read = stream.read();
```
Arrays
PostgreSQL® provides robust support for array data types as column types, function arguments and criteria in where clauses. There are several ways to create arrays with pgJDBC.
The `java.sql.Connection.createArrayOf(String, Object[])` method can be used to create a `java.sql.Array` from `Object[]` instances (Note: this includes both primitive and object multi-dimensional arrays). A similar method, `org.postgresql.PGConnection.createArrayOf(String, Object)`, provides support for primitive array types. The `java.sql.Array` object returned from these methods can be used in other methods, such as `PreparedStatement.setArray(int, Array)`; a short example follows the table below.

The following types of arrays support binary representation in requests and can be used in `PreparedStatement.setObject`:
Java Type | Supported binary PostgreSQL® Types | Default PostgreSQL® Type |
---|---|---|
short[], Short[] | int2[] | int2[] |
int[], Integer[] | int4[] | int4[] |
long[], Long[] | int8[] | int8[] |
float[], Float[] | float4[] | float4[] |
double[], Double[] | float8[] | float8[] |
boolean[], Boolean[] | bool[] | bool[] |
String[] | varchar[], text[] | varchar[] |
byte[][] | bytea[] | bytea[] |
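A minimal sketch (not from the original text) that binds an `Integer[]` as an `int4[]` parameter; the `rooms` table is the hypothetical one used earlier in this chapter:

```java
Integer[] ids = {1, 2, 3};
Array idArray = con.createArrayOf("int4", ids);

try (PreparedStatement ps = con.prepareStatement("SELECT id, name FROM rooms WHERE id = ANY (?)")) {
    ps.setArray(1, idArray);
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            System.out.println(rs.getInt("id") + ": " + rs.getString("name"));
        }
    }
}
```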
CopyManager
The driver provides an extension for accessing `COPY`. COPY is an extension that PostgreSQL® provides; see Copy.
Example 9.16. Copying Data In
```java
/*
 * DDL for code below
 * create table copytest (stringvalue text, intvalue int, numvalue numeric(5,2));
 */
private static String[] origData = {
    "First Row\t1\t1.10\n",
    "Second Row\t2\t-22.20\n",
    "\\N\t\\N\t\\N\n",
    "\t4\t444.40\n"
};
private int dataRows = origData.length;
private String sql = "COPY copytest FROM STDIN";

try (Connection con = DriverManager.getConnection(url, "postgres", "somepassword")) {
    PGConnection pgConnection = con.unwrap(org.postgresql.PGConnection.class);
    CopyManager copyAPI = pgConnection.getCopyAPI();
    CopyIn cp = copyAPI.copyIn(sql);

    for (String anOrigData : origData) {
        byte[] buf = anOrigData.getBytes();
        cp.writeToCopy(buf, 0, buf.length);
    }

    long updatedRows = cp.endCopy();
    long handledRowCount = cp.getHandledRowCount();
    System.err.println(String.format("copy updated %d rows, and handled %d rows",
            updatedRows, handledRowCount));

    int rowCount = getCount(con);
    System.err.println(rowCount);
}
```
Example 9.17. Copying Data Out
Stringsql="COPY copytest TO STDOUT";try(Connectioncon=DriverManager.getConnection(url,"postgres","somepassword")){PGConnectionpgConnection=con.unwrap(org.postgresql.PGConnection.class);CopyManagercopyAPI=pgConnection.getCopyAPI();CopyOutcp=copyAPI.copyOut(sql);intcount=0;byte[]buf;// This is a relatively simple example. buf will contain rows from the databasewhile((buf=cp.readFromCopy())!=null){count++;}longrowCount=cp.getHandledRowCount();}
More examples can be found in the Copy Test Code.