88 *
99 *
1010 * IDENTIFICATION
11- * $PostgreSQL: pgsql/src/backend/catalog/pg_operator.c,v 1.104 2008/06/19 00:46:04 alvherre Exp $
11+ * $PostgreSQL: pgsql/src/backend/catalog/pg_operator.c,v 1.105 2008/08/16 00:01:35 tgl Exp $
1212 *
1313 * NOTES
1414 * these routines moved here from commands/define.c and somewhat cleaned up.
2121#include "access/xact.h"
2222#include "catalog/dependency.h"
2323#include "catalog/indexing.h"
24+ #include "catalog/namespace.h"
2425#include "catalog/pg_namespace.h"
2526#include "catalog/pg_operator.h"
2627#include "catalog/pg_proc.h"
2728#include "catalog/pg_type.h"
2829#include "miscadmin.h"
29- #include "parser/parse_func.h"
3030#include "parser/parse_oper.h"
3131#include "utils/acl.h"
3232#include "utils/builtins.h"
@@ -273,6 +273,11 @@ OperatorShellMake(const char *operatorName,
273273
274274heap_freetuple (tup );
275275
276+ /*
277+ * Make sure the tuple is visible for subsequent lookups/updates.
278+ */
279+ CommandCounterIncrement ();
280+
276281/*
277282 * close the operator relation and return the oid.
278283 */
@@ -289,14 +294,19 @@ OperatorShellMake(const char *operatorName,
289294 *operatorNamespacenamespace for new operator
290295 *leftTypeIdX left type ID
291296 *rightTypeIdX right type ID
292- *procedureName procedure for operator
297+ *procedureId procedure ID for operator
293298 *commutatorNameX commutator operator
294299 *negatorNameX negator operator
295- *restrictionName X restrictionsel. procedure
296- *joinName X joinsel. procedure
300+ *restrictionId X restrictionselectivity procedure ID
301+ *joinId X joinselectivity procedure ID
297302 *canMergemerge join can be used with this operator
298303 *canHashhash join can be used with this operator
299304 *
305+ * The caller should have validated properties and permissions for the
306+ * objects passed as OID references. We must handle the commutator and
307+ * negator operator references specially, however, since those need not
308+ * exist beforehand.
309+ *
300310 * This routine gets complicated because it allows the user to
301311 * specify operators that do not exist. For example, if operator
302312 * "op" is being defined, the negator operator "negop" and the
@@ -310,59 +320,17 @@ OperatorShellMake(const char *operatorName,
310320 * information about the operator (just its name and types).
311321 * Forward declaration is used only for this purpose, it is
312322 * not available to the user as it is for type definition.
313- *
314- * Algorithm:
315- *
316- * check if operator already defined
317- * if so, but oprcode is null, save the Oid -- we are filling in a shell
318- * otherwise error
319- * get the attribute types from relation descriptor for pg_operator
320- * assign values to the fields of the operator:
321- * operatorName
322- * owner id (simply the user id of the caller)
323- * operator "kind" either "b" for binary or "l" for left unary
324- * canMerge boolean
325- * canHash boolean
326- * leftTypeObjectId -- type must already be defined
327- * rightTypeObjectId -- this is optional, enter ObjectId=0 if none specified
328- * resultType -- defer this, since it must be determined from
329- * the pg_procedure catalog
330- * commutatorObjectId -- if this is NULL, enter ObjectId=0
331- * else if this already exists, enter its ObjectId
332- * else if this does not yet exist, and is not
333- *the same as the main operatorName, then create
334- *a shell and enter the new ObjectId
335- * else if this does not exist but IS the same
336- *name & types as the main operator, set the ObjectId=0.
337- *(We are creating a self-commutating operator.)
338- *The link will be fixed later by OperatorUpd.
339- * negatorObjectId -- same as for commutatorObjectId
340- * operatorProcedure -- must access the pg_procedure catalog to get the
341- * ObjectId of the procedure that actually does the operator
342- * actions this is required. Do a lookup to find out the
343- * return type of the procedure
344- * restrictionProcedure -- must access the pg_procedure catalog to get
345- * the ObjectId but this is optional
346- * joinProcedure -- same as restrictionProcedure
347- * now either insert or replace the operator into the pg_operator catalog
348- * if the operator shell is being filled in
349- * access the catalog in order to get a valid buffer
350- * create a tuple using ModifyHeapTuple
351- * get the t_self from the modified tuple and call RelationReplaceHeapTuple
352- * else if a new operator is being created
353- * create a tuple using heap_formtuple
354- * call simple_heap_insert
355323 */
356324void
357325OperatorCreate (const char * operatorName ,
358326Oid operatorNamespace ,
359327Oid leftTypeId ,
360328Oid rightTypeId ,
361- List * procedureName ,
329+ Oid procedureId ,
362330List * commutatorName ,
363331List * negatorName ,
364- List * restrictionName ,
365- List * joinName ,
332+ Oid restrictionId ,
333+ Oid joinId ,
366334bool canMerge ,
367335bool canHash )
368336{
@@ -373,15 +341,10 @@ OperatorCreate(const char *operatorName,
373341Datum values [Natts_pg_operator ];
374342Oid operatorObjectId ;
375343bool operatorAlreadyDefined ;
376- Oid procOid ;
377344Oid operResultType ;
378345Oid commutatorId ,
379- negatorId ,
380- restOid ,
381- joinOid ;
346+ negatorId ;
382347bool selfCommutator = false;
383- Oid typeId [4 ];/* only need up to 4 args here */
384- int nargs ;
385348NameData oname ;
386349TupleDesc tupDesc ;
387350int i ;
@@ -395,19 +358,14 @@ OperatorCreate(const char *operatorName,
395358errmsg ("\"%s\" is not a valid operator name" ,
396359operatorName )));
397360
398- if (!OidIsValid (leftTypeId )&& !OidIsValid (rightTypeId ))
399- ereport (ERROR ,
400- (errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
401- errmsg ("at least one of leftarg or rightarg must be specified" )));
402-
403361if (!(OidIsValid (leftTypeId )&& OidIsValid (rightTypeId )))
404362{
405363/* If it's not a binary op, these things mustn't be set: */
406364if (commutatorName )
407365ereport (ERROR ,
408366(errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
409367errmsg ("only binary operators can have commutators" )));
410- if (joinName )
368+ if (OidIsValid ( joinId ) )
411369ereport (ERROR ,
412370(errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
413371errmsg ("only binary operators can have join selectivity" )));
@@ -421,6 +379,33 @@ OperatorCreate(const char *operatorName,
421379errmsg ("only binary operators can hash" )));
422380}
423381
382+ operResultType = get_func_rettype (procedureId );
383+
384+ if (operResultType != BOOLOID )
385+ {
386+ /* If it's not a boolean op, these things mustn't be set: */
387+ if (negatorName )
388+ ereport (ERROR ,
389+ (errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
390+ errmsg ("only boolean operators can have negators" )));
391+ if (OidIsValid (restrictionId ))
392+ ereport (ERROR ,
393+ (errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
394+ errmsg ("only boolean operators can have restriction selectivity" )));
395+ if (OidIsValid (joinId ))
396+ ereport (ERROR ,
397+ (errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
398+ errmsg ("only boolean operators can have join selectivity" )));
399+ if (canMerge )
400+ ereport (ERROR ,
401+ (errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
402+ errmsg ("only boolean operators can merge join" )));
403+ if (canHash )
404+ ereport (ERROR ,
405+ (errcode (ERRCODE_INVALID_FUNCTION_DEFINITION ),
406+ errmsg ("only boolean operators can hash" )));
407+ }
408+
424409operatorObjectId = OperatorGet (operatorName ,
425410operatorNamespace ,
426411leftTypeId ,
@@ -435,85 +420,13 @@ OperatorCreate(const char *operatorName,
435420
436421/*
437422 * At this point, if operatorObjectId is not InvalidOid then we are
438- * filling in a previously-created shell.
439- */
440-
441- /*
442- * Look up registered procedures -- find the return type of procedureName
443- * to place in "result" field. Do this before shells are created so we
444- * don't have to worry about deleting them later.
445- */
446- if (!OidIsValid (leftTypeId ))
447- {
448- typeId [0 ]= rightTypeId ;
449- nargs = 1 ;
450- }
451- else if (!OidIsValid (rightTypeId ))
452- {
453- typeId [0 ]= leftTypeId ;
454- nargs = 1 ;
455- }
456- else
457- {
458- typeId [0 ]= leftTypeId ;
459- typeId [1 ]= rightTypeId ;
460- nargs = 2 ;
461- }
462- procOid = LookupFuncName (procedureName ,nargs ,typeId , false);
463- operResultType = get_func_rettype (procOid );
464-
465- /*
466- * find restriction estimator
467- */
468- if (restrictionName )
469- {
470- typeId [0 ]= INTERNALOID ;/* Query */
471- typeId [1 ]= OIDOID ;/* operator OID */
472- typeId [2 ]= INTERNALOID ;/* args list */
473- typeId [3 ]= INT4OID ;/* varRelid */
474-
475- restOid = LookupFuncName (restrictionName ,4 ,typeId , false);
476- }
477- else
478- restOid = InvalidOid ;
479-
480- /*
481- * find join estimator
482- */
483- if (joinName )
484- {
485- typeId [0 ]= INTERNALOID ;/* Query */
486- typeId [1 ]= OIDOID ;/* operator OID */
487- typeId [2 ]= INTERNALOID ;/* args list */
488- typeId [3 ]= INT2OID ;/* jointype */
489-
490- joinOid = LookupFuncName (joinName ,4 ,typeId , false);
491- }
492- else
493- joinOid = InvalidOid ;
494-
495- /*
496- * set up values in the operator tuple
423+ * filling in a previously-created shell. Insist that the user own
424+ * any such shell.
497425 */
498-
499- for (i = 0 ;i < Natts_pg_operator ;++ i )
500- {
501- values [i ]= (Datum )NULL ;
502- replaces [i ]= 'r' ;
503- nulls [i ]= ' ' ;
504- }
505-
506- i = 0 ;
507- namestrcpy (& oname ,operatorName );
508- values [i ++ ]= NameGetDatum (& oname );/* oprname */
509- values [i ++ ]= ObjectIdGetDatum (operatorNamespace );/* oprnamespace */
510- values [i ++ ]= ObjectIdGetDatum (GetUserId ());/* oprowner */
511- values [i ++ ]= CharGetDatum (leftTypeId ? (rightTypeId ?'b' :'r' ) :'l' );/* oprkind */
512- values [i ++ ]= BoolGetDatum (canMerge );/* oprcanmerge */
513- values [i ++ ]= BoolGetDatum (canHash );/* oprcanhash */
514- values [i ++ ]= ObjectIdGetDatum (leftTypeId );/* oprleft */
515- values [i ++ ]= ObjectIdGetDatum (rightTypeId );/* oprright */
516- values [i ++ ]= ObjectIdGetDatum (operResultType );/* oprresult */
426+ if (OidIsValid (operatorObjectId )&&
427+ !pg_oper_ownercheck (operatorObjectId ,GetUserId ()))
428+ aclcheck_error (ACLCHECK_NOT_OWNER ,ACL_KIND_OPER ,
429+ operatorName );
517430
518431/*
519432 * Set up the other operators.If they do not currently exist, create
@@ -529,6 +442,12 @@ OperatorCreate(const char *operatorName,
529442leftTypeId ,rightTypeId ,
530443 true);
531444
445+ /* Permission check: must own other operator */
446+ if (OidIsValid (commutatorId )&&
447+ !pg_oper_ownercheck (commutatorId ,GetUserId ()))
448+ aclcheck_error (ACLCHECK_NOT_OWNER ,ACL_KIND_OPER ,
449+ NameListToString (commutatorName ));
450+
532451/*
533452 * self-linkage to this operator; will fix below. Note that only
534453 * self-linkage for commutation makes sense.
@@ -538,7 +457,6 @@ OperatorCreate(const char *operatorName,
538457}
539458else
540459commutatorId = InvalidOid ;
541- values [i ++ ]= ObjectIdGetDatum (commutatorId );/* oprcom */
542460
543461if (negatorName )
544462{
@@ -548,19 +466,48 @@ OperatorCreate(const char *operatorName,
548466operatorName ,operatorNamespace ,
549467leftTypeId ,rightTypeId ,
550468 false);
469+
470+ /* Permission check: must own other operator */
471+ if (OidIsValid (negatorId )&&
472+ !pg_oper_ownercheck (negatorId ,GetUserId ()))
473+ aclcheck_error (ACLCHECK_NOT_OWNER ,ACL_KIND_OPER ,
474+ NameListToString (negatorName ));
551475}
552476else
553477negatorId = InvalidOid ;
554- values [i ++ ]= ObjectIdGetDatum (negatorId );/* oprnegate */
555478
556- values [i ++ ]= ObjectIdGetDatum (procOid );/* oprcode */
557- values [i ++ ]= ObjectIdGetDatum (restOid );/* oprrest */
558- values [i ++ ]= ObjectIdGetDatum (joinOid );/* oprjoin */
479+ /*
480+ * set up values in the operator tuple
481+ */
482+
483+ for (i = 0 ;i < Natts_pg_operator ;++ i )
484+ {
485+ values [i ]= (Datum )NULL ;
486+ replaces [i ]= 'r' ;
487+ nulls [i ]= ' ' ;
488+ }
489+
490+ i = 0 ;
491+ namestrcpy (& oname ,operatorName );
492+ values [i ++ ]= NameGetDatum (& oname );/* oprname */
493+ values [i ++ ]= ObjectIdGetDatum (operatorNamespace );/* oprnamespace */
494+ values [i ++ ]= ObjectIdGetDatum (GetUserId ());/* oprowner */
495+ values [i ++ ]= CharGetDatum (leftTypeId ? (rightTypeId ?'b' :'r' ) :'l' );/* oprkind */
496+ values [i ++ ]= BoolGetDatum (canMerge );/* oprcanmerge */
497+ values [i ++ ]= BoolGetDatum (canHash );/* oprcanhash */
498+ values [i ++ ]= ObjectIdGetDatum (leftTypeId );/* oprleft */
499+ values [i ++ ]= ObjectIdGetDatum (rightTypeId );/* oprright */
500+ values [i ++ ]= ObjectIdGetDatum (operResultType );/* oprresult */
501+ values [i ++ ]= ObjectIdGetDatum (commutatorId );/* oprcom */
502+ values [i ++ ]= ObjectIdGetDatum (negatorId );/* oprnegate */
503+ values [i ++ ]= ObjectIdGetDatum (procedureId );/* oprcode */
504+ values [i ++ ]= ObjectIdGetDatum (restrictionId );/* oprrest */
505+ values [i ++ ]= ObjectIdGetDatum (joinId );/* oprjoin */
559506
560507pg_operator_desc = heap_open (OperatorRelationId ,RowExclusiveLock );
561508
562509/*
563- * If we areadding to an operator shell, update; else insert
510+ * If we arereplacing an operator shell, update; else insert
564511 */
565512if (operatorObjectId )
566513{
@@ -706,7 +653,8 @@ OperatorUpd(Oid baseId, Oid commId, Oid negId)
706653/*
707654 * check and update the commutator & negator, if necessary
708655 *
709- * First make sure we can see them...
656+ * We need a CommandCounterIncrement here in case of a self-commutator
657+ * operator: we'll need to update the tuple that we just inserted.
710658 */
711659CommandCounterIncrement ();
712660