@@ -715,7 +715,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
715715int encoding = -1 ;
716716bool dbistemplate = false;
717717bool dballowconnections = true;
718- int dbconnlimit = -1 ;
718+ int dbconnlimit = DATCONNLIMIT_UNLIMITED ;
719719char * dbcollversion = NULL ;
720720int notherbackends ;
721721int npreparedxacts ;
@@ -914,7 +914,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
914914if (dconnlimit && dconnlimit -> arg )
915915{
916916dbconnlimit = defGetInt32 (dconnlimit );
917- if (dbconnlimit < -1 )
917+ if (dbconnlimit < DATCONNLIMIT_UNLIMITED )
918918ereport (ERROR ,
919919(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
920920errmsg ("invalid connection limit: %d" ,dbconnlimit )));
@@ -965,6 +965,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
965965errmsg ("template database \"%s\" does not exist" ,
966966dbtemplate )));
967967
968+ /*
969+ * If the source database was in the process of being dropped, we can't
970+ * use it as a template.
971+ */
972+ if (database_is_invalid_oid (src_dboid ))
973+ ereport (ERROR ,
974+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
975+ errmsg ("cannot use invalid database \"%s\" as template" ,dbtemplate ),
976+ errhint ("Use DROP DATABASE to drop invalid databases." ));
977+
968978/*
969979 * Permission check: to copy a DB that's not marked datistemplate, you
970980 * must be superuser or the owner thereof.
@@ -1513,6 +1523,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
15131523bool db_istemplate ;
15141524Relation pgdbrel ;
15151525HeapTuple tup ;
1526+ Form_pg_database datform ;
15161527int notherbackends ;
15171528int npreparedxacts ;
15181529int nslots ,
@@ -1628,17 +1639,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
16281639dbname ),
16291640errdetail_busy_db (notherbackends ,npreparedxacts )));
16301641
1631- /*
1632- * Remove the database's tuple from pg_database.
1633- */
1634- tup = SearchSysCache1 (DATABASEOID ,ObjectIdGetDatum (db_id ));
1635- if (!HeapTupleIsValid (tup ))
1636- elog (ERROR ,"cache lookup failed for database %u" ,db_id );
1637-
1638- CatalogTupleDelete (pgdbrel ,& tup -> t_self );
1639-
1640- ReleaseSysCache (tup );
1641-
16421642/*
16431643 * Delete any comments or security labels associated with the database.
16441644 */
@@ -1655,6 +1655,37 @@ dropdb(const char *dbname, bool missing_ok, bool force)
16551655 */
16561656dropDatabaseDependencies (db_id );
16571657
1658+ /*
1659+ * Tell the cumulative stats system to forget it immediately, too.
1660+ */
1661+ pgstat_drop_database (db_id );
1662+
1663+ tup = SearchSysCacheCopy1 (DATABASEOID ,ObjectIdGetDatum (db_id ));
1664+ if (!HeapTupleIsValid (tup ))
1665+ elog (ERROR ,"cache lookup failed for database %u" ,db_id );
1666+ datform = (Form_pg_database )GETSTRUCT (tup );
1667+
1668+ /*
1669+ * Except for the deletion of the catalog row, subsequent actions are not
1670+ * transactional (consider DropDatabaseBuffers() discarding modified
1671+ * buffers). But we might crash or get interrupted below. To prevent
1672+ * accesses to a database with invalid contents, mark the database as
1673+ * invalid using an in-place update.
1674+ *
1675+ * We need to flush the WAL before continuing, to guarantee the
1676+ * modification is durable before performing irreversible filesystem
1677+ * operations.
1678+ */
1679+ datform -> datconnlimit = DATCONNLIMIT_INVALID_DB ;
1680+ heap_inplace_update (pgdbrel ,tup );
1681+ XLogFlush (XactLastRecEnd );
1682+
1683+ /*
1684+ * Also delete the tuple - transactionally. If this transaction commits,
1685+ * the row will be gone, but if we fail, dropdb() can be invoked again.
1686+ */
1687+ CatalogTupleDelete (pgdbrel ,& tup -> t_self );
1688+
16581689/*
16591690 * Drop db-specific replication slots.
16601691 */
@@ -1667,11 +1698,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
16671698 */
16681699DropDatabaseBuffers (db_id );
16691700
1670- /*
1671- * Tell the cumulative stats system to forget it immediately, too.
1672- */
1673- pgstat_drop_database (db_id );
1674-
16751701/*
16761702 * Tell checkpointer to forget any pending fsync and unlink requests for
16771703 * files in the database; else the fsyncs will fail at next checkpoint, or
@@ -2188,7 +2214,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
21882214ListCell * option ;
21892215bool dbistemplate = false;
21902216bool dballowconnections = true;
2191- int dbconnlimit = -1 ;
2217+ int dbconnlimit = DATCONNLIMIT_UNLIMITED ;
21922218DefElem * distemplate = NULL ;
21932219DefElem * dallowconnections = NULL ;
21942220DefElem * dconnlimit = NULL ;
@@ -2259,7 +2285,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
22592285if (dconnlimit && dconnlimit -> arg )
22602286{
22612287dbconnlimit = defGetInt32 (dconnlimit );
2262- if (dbconnlimit < -1 )
2288+ if (dbconnlimit < DATCONNLIMIT_UNLIMITED )
22632289ereport (ERROR ,
22642290(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
22652291errmsg ("invalid connection limit: %d" ,dbconnlimit )));
@@ -2286,6 +2312,14 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
22862312datform = (Form_pg_database )GETSTRUCT (tuple );
22872313dboid = datform -> oid ;
22882314
2315+ if (database_is_invalid_form (datform ))
2316+ {
2317+ ereport (FATAL ,
2318+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
2319+ errmsg ("cannot alter invalid database \"%s\"" ,stmt -> dbname ),
2320+ errhint ("Use DROP DATABASE to drop invalid databases." ));
2321+ }
2322+
22892323if (!pg_database_ownercheck (dboid ,GetUserId ()))
22902324aclcheck_error (ACLCHECK_NOT_OWNER ,OBJECT_DATABASE ,
22912325stmt -> dbname );
@@ -3007,6 +3041,42 @@ get_database_name(Oid dbid)
30073041return result ;
30083042}
30093043
3044+
3045+ /*
3046+ * While dropping a database the pg_database row is marked invalid, but the
3047+ * catalog contents still exist. Connections to such a database are not
3048+ * allowed.
3049+ */
3050+ bool
3051+ database_is_invalid_form (Form_pg_database datform )
3052+ {
3053+ return datform -> datconnlimit == DATCONNLIMIT_INVALID_DB ;
3054+ }
3055+
3056+
3057+ /*
3058+ * Convenience wrapper around database_is_invalid_form()
3059+ */
3060+ bool
3061+ database_is_invalid_oid (Oid dboid )
3062+ {
3063+ HeapTuple dbtup ;
3064+ Form_pg_database dbform ;
3065+ bool invalid ;
3066+
3067+ dbtup = SearchSysCache1 (DATABASEOID ,ObjectIdGetDatum (dboid ));
3068+ if (!HeapTupleIsValid (dbtup ))
3069+ elog (ERROR ,"cache lookup failed for database %u" ,dboid );
3070+ dbform = (Form_pg_database )GETSTRUCT (dbtup );
3071+
3072+ invalid = database_is_invalid_form (dbform );
3073+
3074+ ReleaseSysCache (dbtup );
3075+
3076+ return invalid ;
3077+ }
3078+
3079+
30103080/*
30113081 * recovery_create_dbdir()
30123082 *