Package cloud.google.com/go/pubsub/v2/pstest (v2.3.0) Stay organized with collections Save and categorize content based on your preferences.
Package pstest provides a fake Cloud PubSub service for testing. It implements asimplified form of the service, suitable for unit tests. It may behavedifferently from the actual service in ways in which the service isnon-deterministic or unspecified: timing, delivery order, etc.
This package is EXPERIMENTAL and is subject to change without notice.
See the example for usage.
Functions
func ResetMinAckDeadline
funcResetMinAckDeadline()ResetMinAckDeadline resets the minack deadline to the default.
func SetMinAckDeadline
SetMinAckDeadline changes the minack deadline to n. Must begreater than or equal to 1 second. Remember to reset this valueto the default after your test changes it. Example usage:
pstest.SetMinAckDeadlineSecs(1)defer pstest.ResetMinAckDeadlineSecs()func ValidateFilter
ValidateFilter validates if the filter string is parsable.
GServer
typeGServerstruct{pb.UnimplementedPublisherServerpb.UnimplementedSubscriberServerpb.UnimplementedSchemaServiceServer// contains filtered or unexported fields}GServer is the underlying service implementor. It is not intended to be useddirectly.
func (*GServer) Acknowledge
Acknowledge marks the message as acknowleged.
func (*GServer) CommitSchema
CommitSchema commits a new schema revision.
func (*GServer) CreateSchema
CreateSchema creates a new schema.
func (*GServer) CreateSubscription
func(s*GServer)CreateSubscription(_context.Context,ps*pb.Subscription)(*pb.Subscription,error)CreateSubscription creates a Pub/Sub subscription.
func (*GServer) CreateTopic
CreateTopic creates a topic.
func (*GServer) DeleteSchema
DeleteSchema deletes an existing schema.
func (*GServer) DeleteSchemaRevision
func(s*GServer)DeleteSchemaRevision(_context.Context,req*pb.DeleteSchemaRevisionRequest)(*pb.Schema,error)DeleteSchemaRevision deletes a schema revision.
func (*GServer) DeleteSubscription
func(s*GServer)DeleteSubscription(_context.Context,req*pb.DeleteSubscriptionRequest)(*emptypb.Empty,error)DeleteSubscription deletes the Pub/Sub subscription.
func (*GServer) DeleteTopic
DeleteTopic deletes the topic.
func (*GServer) DetachSubscription
func(s*GServer)DetachSubscription(_context.Context,req*pb.DetachSubscriptionRequest)(*pb.DetachSubscriptionResponse,error)DetachSubscription detaches the subscription from the topic.
func (*GServer) GetSchema
GetSchema gets an existing schema details.
func (*GServer) GetSubscription
func(s*GServer)GetSubscription(_context.Context,req*pb.GetSubscriptionRequest)(*pb.Subscription,error)GetSubscription fetches an existing Pub/Sub subscription details.
func (*GServer) GetTopic
GetTopic gets a Pub/Sub topic.
func (*GServer) ListSchemaRevisions
func(s*GServer)ListSchemaRevisions(_context.Context,req*pb.ListSchemaRevisionsRequest)(*pb.ListSchemaRevisionsResponse,error)ListSchemaRevisions lists the schema revisions.
func (*GServer) ListSchemas
func(s*GServer)ListSchemas(_context.Context,req*pb.ListSchemasRequest)(*pb.ListSchemasResponse,error)ListSchemas lists the available schemas in this server.
func (*GServer) ListSubscriptions
func(s*GServer)ListSubscriptions(_context.Context,req*pb.ListSubscriptionsRequest)(*pb.ListSubscriptionsResponse,error)ListSubscriptions lists the Pub/Sub subscriptions in this server.
func (*GServer) ListTopicSubscriptions
func(s*GServer)ListTopicSubscriptions(_context.Context,req*pb.ListTopicSubscriptionsRequest)(*pb.ListTopicSubscriptionsResponse,error)ListTopicSubscriptions lists the subscriptions associated with a topic.
func (*GServer) ListTopics
func(s*GServer)ListTopics(_context.Context,req*pb.ListTopicsRequest)(*pb.ListTopicsResponse,error)ListTopics lists the topics in this server.
func (*GServer) ModifyAckDeadline
func(s*GServer)ModifyAckDeadline(_context.Context,req*pb.ModifyAckDeadlineRequest)(*emptypb.Empty,error)ModifyAckDeadline modifies the ack deadline of the message.
func (*GServer) Publish
func(s*GServer)Publish(_context.Context,req*pb.PublishRequest)(*pb.PublishResponse,error)Publish sends a message to the topic.
func (*GServer) Pull
func(s*GServer)Pull(ctxcontext.Context,req*pb.PullRequest)(*pb.PullResponse,error)Pull returns a list of unacknowledged messages from a subscription.
func (*GServer) RollbackSchema
RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.
func (*GServer) Seek
func(s*GServer)Seek(ctxcontext.Context,req*pb.SeekRequest)(*pb.SeekResponse,error)Seek updates a subscription to a specific point in time or snapshot.
func (*GServer) StreamingPull
func(s*GServer)StreamingPull(spspb.Subscriber_StreamingPullServer)errorStreamingPull return a stream to pull messages from a subscription.
func (*GServer) UpdateSubscription
func(s*GServer)UpdateSubscription(_context.Context,req*pb.UpdateSubscriptionRequest)(*pb.Subscription,error)UpdateSubscription updates an existing Pub/Sub subscription.
func (*GServer) UpdateTopic
UpdateTopic updates the Pub/Sub topic.
func (*GServer) ValidateMessage
func(s*GServer)ValidateMessage(_context.Context,req*pb.ValidateMessageRequest)(*pb.ValidateMessageResponse,error)ValidateMessage mocks the ValidateMessage call but only checks that the schema definition to validate themessage against is not empty.
func (*GServer) ValidateSchema
func(s*GServer)ValidateSchema(_context.Context,req*pb.ValidateSchemaRequest)(*pb.ValidateSchemaResponse,error)ValidateSchema mocks the ValidateSchema call but only checks that the schema definition is not empty.
Message
typeMessagestruct{IDstringData[]byteAttributesmap[string]stringPublishTimetime.TimeDeliveriesint// number of times delivery of the message was attemptedAcksint// number of acks received from clientsModacks[]Modack// modacks received by server for this messageOrderingKeystringTopicstring// contains filtered or unexported fields}A Message is a message that was published to the server.
Modack
Modack represents a modack sent to the server.
Reactor
typeReactorinterface{// React handles the message types and returns results. If "handled" is false,// then the test server will ignore the results and continue to the next reactor// or the original handler.React(_interface{})(handledbool,retinterface{},errerror)}Reactor is an interface to allow reaction function to a certain call.
ReactorOptions
ReactorOptions is a map that Server uses to look up reactors.Key is the function name, value is array of reactor for the function.
Server
typeServerstruct{Addrstring// The address that the server is listening on.GServerGServer// Not intended to be used directly.// contains filtered or unexported fields}Server is a fake Pub/Sub server.
func NewServer
funcNewServer(opts...ServerReactorOption)*ServerNewServer creates a new fake server running in the current process.
Example
ctx:=context.Background()// Start a fake server running locally.srv:=pstest.NewServer()defersrv.Close()// Connect to the server without using TLS.conn,err:=grpc.Dial(srv.Addr,grpc.WithTransportCredentials(insecure.NewCredentials()))iferr!=nil{// TODO: Handle error.}deferconn.Close()// Use the connection when creating a pubsub client.client,err:=pubsub.NewClient(ctx,"project",option.WithGRPCConn(conn))iferr!=nil{// TODO: Handle error.}deferclient.Close()_=client// TODO: Use the client.func NewServerWithAddress
funcNewServerWithAddress(addressstring,opts...ServerReactorOption)*ServerNewServerWithAddress creates a new fake server running in the current processat the specified address (host and port).
func NewServerWithCallback
funcNewServerWithCallback(portint,callbackfunc(*grpc.Server),opts...ServerReactorOption)*ServerNewServerWithCallback creates new fake server running in the current processat the specified port. Before starting the server, the provided callback iscalled to allow caller to register additional fakes into grpc server.
func NewServerWithPort
funcNewServerWithPort(portint,opts...ServerReactorOption)*ServerNewServerWithPort creates a new fake server running in the current process atthe specified port.
Example
ctx:=context.Background()// Start a fake server running locally at 9001.srv:=pstest.NewServerWithPort(9001)defersrv.Close()// Connect to the server without using TLS.conn,err:=grpc.Dial(srv.Addr,grpc.WithTransportCredentials(insecure.NewCredentials()))iferr!=nil{// TODO: Handle error.}deferconn.Close()// Use the connection when creating a pubsub client.client,err:=pubsub.NewClient(ctx,"project",option.WithGRPCConn(conn))iferr!=nil{// TODO: Handle error.}deferclient.Close()_=client// TODO: Use the client.func (*Server) AddPublishResponse
func(s*Server)AddPublishResponse(pbr*pb.PublishResponse,errerror)AddPublishResponse adds a new publish response to the channel used forresponding to publish requests.
func (*Server) ClearMessages
func(s*Server)ClearMessages()ClearMessages removes all published messagesfrom internal containers.
func (*Server) Close
Close shuts down the server and releases all resources.
func (*Server) Message
Message returns the message with the given ID, or nil if no messagewith that ID was published.
func (*Server) Messages
Messages returns information about all messages ever published.
func (*Server) Publish
Publish behaves as if the Publish RPC was called with a message with the givendata and attrs. It returns the ID of the message.The topic will be created if it doesn't exist.
Publish panics if there is an error, which is appropriate for testing.
func (*Server) PublishOrdered
PublishOrdered behaves as if the Publish RPC was called with a message with the givendata, attrs and ordering key. It returns the ID of the message.The topic will be created if it doesn't exist.
PublishOrdered panics if there is an error, which is appropriate for testing.
func (*Server) ResetPublishResponses
ResetPublishResponses resets the buffered publishResponses channelwith a new buffered channel with the given size.
func (*Server) SetAutoPublishResponse
SetAutoPublishResponse controls whether to automatically respondto messages published or to use user-added responses from thepublishResponses channel.
func (*Server) SetStreamTimeout
SetStreamTimeout sets the amount of time a stream will be active before it shutsitself down. This mimics the real service's behavior of closing streams after 30minutes. If SetStreamTimeout is never called or is passed zero, streams never shutdown.
func (*Server) SetTimeNowFunc
SetTimeNowFunc registers f as a function tobe used instead of time.Now for this server.
func (*Server) Wait
func(s*Server)Wait()Wait blocks until all server activity has completed.
ServerReactorOption
ServerReactorOption is options passed to the server for reactor creation.
func WithErrorInjection
funcWithErrorInjection(funcNamestring,codecodes.Code,msgstring)ServerReactorOptionWithErrorInjection creates a ServerReactorOption that injects error with defined status code andmessage for a certain function.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-10-30 UTC.