Package cloud.google.com/go/pubsub/v2/pstest (v2.3.0)

Note: To get more information about this package, such as access to older versions, view this package on pkg.go.dev.

Package pstest provides a fake Cloud PubSub service for testing. It implements a simplified form of the service, suitable for unit tests. It may behave differently from the actual service in ways in which the service is non-deterministic or unspecified: timing, delivery order, etc.

This package is EXPERIMENTAL and is subject to change without notice.

See the example for usage.

Functions

func ResetMinAckDeadline

func ResetMinAckDeadline()

ResetMinAckDeadline resets the minimum ack deadline to the default.

func SetMinAckDeadline

func SetMinAckDeadline(n time.Duration)

SetMinAckDeadline changes the minimum ack deadline to n, which must be greater than or equal to 1 second. Remember to reset this value to the default after your test changes it. Example usage:

	pstest.SetMinAckDeadline(1 * time.Second)
	defer pstest.ResetMinAckDeadline()

func ValidateFilter

func ValidateFilter(filter string) error

ValidateFilter checks whether the filter string is parsable.

GServer

GServer is the underlying service implementor. It is not intended to be used directly.

func (*GServer) Acknowledge

Acknowledge marks the message as acknowledged.

func (*GServer) CommitSchema

CommitSchema commits a new schema revision.

func (*GServer) CreateSchema

CreateSchema creates a new schema.

func (*GServer) CreateSubscription

func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*pb.Subscription, error)

CreateSubscription creates a Pub/Sub subscription.

func (*GServer) CreateTopic

func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)

CreateTopic creates a topic.

func (*GServer) DeleteSchema

DeleteSchema deletes an existing schema.

func (*GServer) DeleteSchemaRevision

func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error)

DeleteSchemaRevision deletes a schema revision.

func (*GServer) DeleteSubscription

DeleteSubscription deletes the Pub/Sub subscription.

func (*GServer) DeleteTopic

DeleteTopic deletes the topic.

func (*GServer) DetachSubscription

DetachSubscription detaches the subscription from the topic.

func (*GServer) GetSchema

GetSchema gets the details of an existing schema.

func (*GServer) GetSubscription

GetSubscription fetches the details of an existing Pub/Sub subscription.

func (*GServer) GetTopic

GetTopic gets a Pub/Sub topic.

func (*GServer) ListSchemaRevisions

ListSchemaRevisions lists the schema revisions.

func (*GServer) ListSchemas

ListSchemas lists the available schemas in this server.

func (*GServer) ListSubscriptions

ListSubscriptions lists the Pub/Sub subscriptions in this server.

func (*GServer) ListTopicSubscriptions

ListTopicSubscriptions lists the subscriptions associated with a topic.

func (*GServer) ListTopics

ListTopics lists the topics in this server.

func (*GServer) ModifyAckDeadline

ModifyAckDeadline modifies the ack deadline of the message.

func (*GServer) Publish

Publish sends a message to the topic.

func (*GServer) Pull

Pull returns a list of unacknowledged messages from a subscription.

func (*GServer) RollbackSchema

RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.

func (*GServer) Seek

Seek updates a subscription to a specific point in time or snapshot.

func (*GServer) StreamingPull

StreamingPull returns a stream to pull messages from a subscription.

func (*GServer) UpdateSubscription

UpdateSubscription updates an existing Pub/Sub subscription.

func (*GServer) UpdateTopic

UpdateTopic updates the Pub/Sub topic.

func (*GServer) ValidateMessage

ValidateMessage mocks the ValidateMessage call but only checks that the schema definition to validate the message against is not empty.

func (*GServer) ValidateSchema

ValidateSchema mocks the ValidateSchema call but only checks that the schema definition is not empty.

Message

type Message struct {
	ID          string
	Data        []byte
	Attributes  map[string]string
	PublishTime time.Time
	Deliveries  int      // number of times delivery of the message was attempted
	Acks        int      // number of acks received from clients
	Modacks     []Modack // modacks received by server for this message
	OrderingKey string
	Topic       string
	// contains filtered or unexported fields
}

A Message is a message that was published to the server.

Modack

type Modack struct {
	AckID       string
	AckDeadline int32
	ReceivedAt  time.Time
}

Modack represents a modack sent to the server.

Reactor

type Reactor interface {
	// React handles the message types and returns results. If "handled" is false,
	// then the test server will ignore the results and continue to the next reactor
	// or the original handler.
	React(_ interface{}) (handled bool, ret interface{}, err error)
}

Reactor is an interface that lets a test supply a custom reaction to a particular call.
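As a rough illustration of the contract described in the React comment, the sketch below implements a reactor that fails a fixed number of calls and then steps aside. The Reactor interface is redeclared locally so the snippet compiles without the module, and failFirstN is a hypothetical type, not part of pstest.

```go
package main

import (
	"errors"
	"fmt"
)

// Reactor is a local copy of the pstest.Reactor method set (an assumption
// made so this sketch is self-contained).
type Reactor interface {
	React(_ interface{}) (handled bool, ret interface{}, err error)
}

// failFirstN is a hypothetical reactor that fails the first n calls it sees
// and declines to handle every call after that.
type failFirstN struct {
	remaining int
}

// React reports handled=true with an error while failures remain. Afterwards
// it reports handled=false, which signals that the next reactor or the
// original handler should take over.
func (f *failFirstN) React(_ interface{}) (bool, interface{}, error) {
	if f.remaining > 0 {
		f.remaining--
		return true, nil, errors.New("injected failure")
	}
	return false, nil, nil
}

func main() {
	var r Reactor = &failFirstN{remaining: 1}
	for i := 0; i < 2; i++ {
		handled, _, err := r.React("request")
		fmt.Println(handled, err)
	}
}
```

A type like this could be wrapped in a ServerReactorOption to make a call fail once and then succeed, which is one way to exercise client retry logic.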

ReactorOptions

type ReactorOptions map[string][]Reactor

ReactorOptions is a map that Server uses to look up reactors. The key is the function name; the value is the slice of reactors for that function.
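The lookup described here can be pictured with a small sketch. It is not the server's actual code: Reactor and ReactorOptions are redeclared locally, and reactorFunc and dispatch are hypothetical helpers that model "try each reactor for the function name in order, then fall back to the original handler".

```go
package main

import "fmt"

// Local copies of the pstest types (an assumption for self-containment).
type Reactor interface {
	React(_ interface{}) (handled bool, ret interface{}, err error)
}

type ReactorOptions map[string][]Reactor

// reactorFunc is a hypothetical adapter from a plain function to Reactor.
type reactorFunc func(req interface{}) (bool, interface{}, error)

func (f reactorFunc) React(req interface{}) (bool, interface{}, error) { return f(req) }

// dispatch is a hypothetical stand-in for the server's lookup. It tries the
// reactors registered under funcName in order, returns the result of the
// first one that reports handled, and otherwise calls fallback.
func dispatch(opts ReactorOptions, funcName string, req interface{},
	fallback func(interface{}) (interface{}, error)) (interface{}, error) {
	for _, r := range opts[funcName] {
		if handled, ret, err := r.React(req); handled {
			return ret, err
		}
	}
	return fallback(req)
}

func main() {
	opts := ReactorOptions{
		"CreateTopic": {
			// Declines, so the next reactor is consulted.
			reactorFunc(func(interface{}) (bool, interface{}, error) { return false, nil, nil }),
			// Handles the call and supplies the result.
			reactorFunc(func(interface{}) (bool, interface{}, error) { return true, "from reactor", nil }),
		},
	}
	original := func(interface{}) (interface{}, error) { return "from original handler", nil }

	ret, _ := dispatch(opts, "CreateTopic", nil, original)
	fmt.Println(ret)
	ret, _ = dispatch(opts, "DeleteTopic", nil, original) // no reactors registered
	fmt.Println(ret)
}
```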

Server

type Server struct {
	Addr    string  // The address that the server is listening on.
	GServer GServer // Not intended to be used directly.
	// contains filtered or unexported fields
}

Server is a fake Pub/Sub server.

func NewServer

func NewServer(opts ...ServerReactorOption) *Server

NewServer creates a new fake server running in the current process.

Example

ctx := context.Background()
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
// Connect to the server without using TLS.
conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	// TODO: Handle error.
}
defer conn.Close()
// Use the connection when creating a pubsub client.
client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
if err != nil {
	// TODO: Handle error.
}
defer client.Close()
_ = client // TODO: Use the client.

func NewServerWithAddress

func NewServerWithAddress(address string, opts ...ServerReactorOption) *Server

NewServerWithAddress creates a new fake server running in the current process at the specified address (host and port).

func NewServerWithCallback

func NewServerWithCallback(port int, callback func(*grpc.Server), opts ...ServerReactorOption) *Server

NewServerWithCallback creates a new fake server running in the current process at the specified port. Before starting the server, the provided callback is called to allow the caller to register additional fakes with the gRPC server.

func NewServerWithPort

func NewServerWithPort(port int, opts ...ServerReactorOption) *Server

NewServerWithPort creates a new fake server running in the current process at the specified port.

Example

ctx := context.Background()
// Start a fake server running locally at 9001.
srv := pstest.NewServerWithPort(9001)
defer srv.Close()
// Connect to the server without using TLS.
conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	// TODO: Handle error.
}
defer conn.Close()
// Use the connection when creating a pubsub client.
client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
if err != nil {
	// TODO: Handle error.
}
defer client.Close()
_ = client // TODO: Use the client.

func (*Server) AddPublishResponse

func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error)

AddPublishResponse adds a new publish response to the channel used for responding to publish requests.

func (*Server) ClearMessages

func (s *Server) ClearMessages()

ClearMessages removes all published messages from internal containers.

func (*Server) Close

func (s *Server) Close() error

Close shuts down the server and releases all resources.

func (*Server) Message

func (s *Server) Message(id string) *Message

Message returns the message with the given ID, or nil if no message with that ID was published.

func (*Server) Messages

func (s *Server) Messages() []*Message

Messages returns information about all messages ever published.

func (*Server) Publish

func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string

Publish behaves as if the Publish RPC was called with a message with the given data and attrs. It returns the ID of the message. The topic will be created if it doesn't exist.

Publish panics if there is an error, which is appropriate for testing.

func (*Server) PublishOrdered

func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]string, orderingKey string) string

PublishOrdered behaves as if the Publish RPC was called with a message with the given data, attrs and ordering key. It returns the ID of the message. The topic will be created if it doesn't exist.

PublishOrdered panics if there is an error, which is appropriate for testing.

func (*Server) ResetPublishResponses

func (s *Server) ResetPublishResponses(size int)

ResetPublishResponses replaces the buffered publishResponses channel with a new buffered channel of the given size.

func (*Server) SetAutoPublishResponse

func (s *Server) SetAutoPublishResponse(autoPublishResponse bool)

SetAutoPublishResponse controls whether to automatically respond to messages published or to use user-added responses from the publishResponses channel.

func (*Server) SetStreamTimeout

func (s *Server) SetStreamTimeout(d time.Duration)

SetStreamTimeout sets the amount of time a stream will be active before it shuts itself down. This mimics the real service's behavior of closing streams after 30 minutes. If SetStreamTimeout is never called or is passed zero, streams never shut down.

func (*Server) SetTimeNowFunc

func (s *Server) SetTimeNowFunc(f func() time.Time)

SetTimeNowFunc registers f as a function to be used instead of time.Now for this server.

func (*Server) Wait

func (s *Server) Wait()

Wait blocks until all server activity has completed.

ServerReactorOption

type ServerReactorOption struct {
	FuncName string
	Reactor  Reactor
}

ServerReactorOption holds the options passed to the server for reactor creation.

func WithErrorInjection

func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReactorOption

WithErrorInjection creates a ServerReactorOption that injects an error with the given status code and message for a given function.


Last updated 2025-10-30 UTC.