
CREATE SOURCE: PostgreSQL

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

Materialize supports PostgreSQL (11+) as a data source. To connect to a PostgreSQL instance, you first need to create a connection that specifies access and authentication parameters. Once created, a connection is reusable across multiple CREATE SOURCE statements.

WARNING! Before creating a PostgreSQL source, you must set up logical replication in the upstream database. For step-by-step instructions, see the integration guide for your PostgreSQL service: AlloyDB, Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.

Syntax

CREATE SOURCE [IF NOT EXISTS] src_name
    [IN CLUSTER cluster_name]
    FROM POSTGRES CONNECTION connection_name (
        PUBLICATION publication_name
        [, TEXT COLUMNS ( column_name [, ...] )]
    )
    { FOR ALL TABLES
    | FOR TABLES ( table_name [AS subsrc_name] [, ...] )
    | FOR SCHEMAS ( schema_name [, ...] ) }
    [EXPOSE PROGRESS AS progress_subsource_name]
    [with_options]

with_options:

WITH ( RETAIN HISTORY = FOR retention_period )
| Field | Use |
|---|---|
| src_name | The name for the source. |
| IF NOT EXISTS | Do nothing (except issuing a notice) if a source with the same name already exists. If not specified, an error is raised instead (default). |
| IN CLUSTER cluster_name | The cluster to maintain this source. |
| CONNECTION connection_name | The name of the PostgreSQL connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page. |
| FOR ALL TABLES | Create subsources for all tables in the publication. |
| FOR SCHEMAS (schema_list) | Create subsources for specific schemas in the publication. |
| FOR TABLES (table_list) | Create subsources for specific tables in the publication. |
| EXPOSE PROGRESS AS progress_subsource_name | The name of the progress collection for the source. If this is not specified, the progress collection will be named <src_name>_progress. For more information, see Monitoring source progress. |
| RETAIN HISTORY FOR retention_period | Private preview. This option has known performance or stability issues and is under active development. Duration for which Materialize retains historical data, which is useful to implement durable subscriptions. Accepts positive interval values (e.g. '1hr'). Default: 1s. |
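As a sketch of how the clauses above combine, the following statement uses every optional clause from the syntax. The cluster name ingest_cluster and the progress subsource name mz_source_lsn are hypothetical. Because RETAIN HISTORY is in private preview, it may not be enabled in your environment.

```sql
-- Hypothetical names: ingest_cluster (an existing cluster) and mz_source_lsn.
CREATE SOURCE IF NOT EXISTS mz_source
    IN CLUSTER ingest_cluster
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES
    -- Name the progress subsource instead of the default mz_source_progress.
    EXPOSE PROGRESS AS mz_source_lsn
    -- Private preview: keep one hour of history instead of the 1s default.
    WITH (RETAIN HISTORY = FOR '1hr');
```

If you drop the WITH clause, the statement falls back to the default retention of 1s.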

CONNECTION options

| Field | Value | Description |
|---|---|---|
| PUBLICATION | text | Required. The PostgreSQL publication (the replication data set containing the tables to be streamed to Materialize). |
| TEXT COLUMNS | A list of names | Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. |

Features

Change data capture

This source uses PostgreSQL’s native replication protocol to continually ingest changes resulting from INSERT, UPDATE and DELETE operations in the upstream database, a process also known as change data capture.

For this reason, you must configure the upstream PostgreSQL database to support logical replication before creating a source in Materialize. For step-by-step instructions, see the integration guide for your PostgreSQL service: AlloyDB, Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.
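The exact steps differ by service, so treat the following as a rough outline of the upstream side for a self-hosted instance, not as a replacement for the guides. It assumes two hypothetical tables, table_1 and table_2, and reuses the publication name mz_source from the examples on this page. The REPLICA IDENTITY FULL step reflects what the integration guides typically ask for. Confirm it against the guide for your service.

```sql
-- Run these statements in the upstream PostgreSQL database, not in Materialize.

-- Logical replication requires wal_level = logical. Changing it needs a server
-- restart. Managed services often expose it as a provider-specific parameter.
SHOW wal_level;

-- Assumption (check your integration guide): emit full row images so that
-- UPDATE and DELETE events carry every column of the hypothetical tables.
ALTER TABLE table_1 REPLICA IDENTITY FULL;
ALTER TABLE table_2 REPLICA IDENTITY FULL;

-- The publication that the source's PUBLICATION option refers to.
CREATE PUBLICATION mz_source FOR TABLE table_1, table_2;
```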

Creating a source

To avoid creating multiple replication slots in the upstream PostgreSQL database and to minimize the required bandwidth, Materialize ingests the raw replication stream data for the specific set of tables from your publication that you select with the FOR ALL TABLES, FOR SCHEMAS, or FOR TABLES clause.

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES;

When you define a source, Materialize will automatically:

  1. Create a replication slot in the upstream PostgreSQL database (see PostgreSQL replication slots).

    The name of the replication slot created by Materialize is prefixed with materialize_ for easy identification, and can be looked up in mz_internal.mz_postgres_sources.

    SELECT id, replication_slot FROM mz_internal.mz_postgres_sources;

     id | replication_slot
    ----+----------------------------------------------
     u8 | materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71

  2. Create a subsource for each original table in the publication.

    SHOW SOURCES;

             name         |   type
    ----------------------+-----------
     mz_source            | postgres
     mz_source_progress   | progress
     table_1              | subsource
     table_2              | subsource

    It then performs an initial, snapshot-based sync of the tables in the publication before it starts ingesting change events.

  3. Incrementally update any materialized or indexed views that depend on the source as change events stream in, as a result of INSERT, UPDATE and DELETE operations in the upstream PostgreSQL database.

Note that the schema metadata is captured when the source is initially created, and is validated against the upstream schema upon restart. If you create new tables upstream after creating a PostgreSQL source and want to replicate them to Materialize, the source must be dropped and recreated.

PostgreSQL replication slots

Each source ingests the raw replication stream data for all tables in the specified publication using a single replication slot. This minimizes the performance impact on the upstream database and lets you reuse the same source across multiple materializations.

WARNING! Make sure to delete any replication slots if you stop using Materialize, or if either the Materialize or PostgreSQL instances crash. To look up the name of the replication slot created for each source, use mz_internal.mz_postgres_sources.

If you delete all objects that depend on a source without also dropping the source, the upstream replication slot will linger and continue to accumulate data so that the source can resume in the future. To avoid unbounded disk space usage, make sure to use DROP SOURCE or manually delete the replication slot.
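To find and remove a lingering slot by hand, you can query PostgreSQL's pg_replication_slots view in the upstream database. The following is a minimal sketch. The slot name is the example value from the mz_internal.mz_postgres_sources output above, so substitute your own.

```sql
-- Run in the upstream PostgreSQL database.
-- List the slots Materialize created (starts_with requires PostgreSQL 11+).
SELECT slot_name, active, confirmed_flush_lsn
FROM pg_replication_slots
WHERE starts_with(slot_name, 'materialize_');

-- Drop a slot that is no longer needed. PostgreSQL refuses to drop a slot
-- while it is active, i.e. while a consumer is still attached to it.
SELECT pg_drop_replication_slot('materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71');
```

Only drop slots whose sources you no longer need. A source that relied on a dropped slot will likely have to be recreated.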

For PostgreSQL 13+, it is recommended that you set a reasonable value for max_slot_wal_keep_size to limit the amount of storage used by replication slots.
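On a self-hosted PostgreSQL 13+ server, you can change this setting without a restart, as sketched below. The 50GB value is purely illustrative. On managed services, ALTER SYSTEM may not be permitted, so set the parameter through your provider's configuration mechanism instead.

```sql
-- Run in the upstream PostgreSQL database (13+) as a superuser.
-- '50GB' is an illustrative value; size it to your WAL volume and free disk.
ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';

-- This parameter is applied on configuration reload; no restart is needed.
SELECT pg_reload_conf();
```

If a slot falls further behind than this limit, PostgreSQL invalidates it and reports its wal_status in pg_replication_slots as lost. Choose a value that leaves headroom for periods when Materialize is not consuming.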

PostgreSQL schemas

CREATE SOURCE will attempt to create each upstream table in the same schema as the source. This may lead to naming collisions if, for example, you are replicating schema1.table_1 and schema2.table_1. In such cases, use the FOR TABLES clause to provide an alias for each upstream table, or to specify an alternative destination schema in Materialize.

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (schema1.table_1 AS s1_table_1, schema2.table_1 AS s2_table_1);
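To route the subsources into separate Materialize schemas instead of renaming them, you can schema-qualify the aliases, as the paragraph above allows. This sketch assumes two hypothetical Materialize schemas, s1 and s2, which must exist before the source is created.

```sql
-- Hypothetical destination schemas in Materialize.
CREATE SCHEMA s1;
CREATE SCHEMA s2;

-- Each upstream table keeps its name but lands in its own schema.
CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (schema1.table_1 AS s1.table_1, schema2.table_1 AS s2.table_1);
```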

Monitoring source progress

By default, PostgreSQL sources expose progress metadata as a subsource that you can use to monitor source ingestion progress. The name of the progress subsource can be specified when creating a source using the EXPOSE PROGRESS AS clause; otherwise, it will be named <src_name>_progress.

The following metadata is available for each source as a progress subsource:

| Field | Type | Meaning |
|---|---|---|
| lsn | uint8 | The last Log Sequence Number (LSN) consumed from the upstream PostgreSQL replication stream. |

It can be queried using:

SELECT lsn FROM <src_name>_progress;

The reported LSN should increase as Materialize consumes new WAL records from the upstream PostgreSQL database. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.
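To gauge how far behind the source is, you can compare the reported LSN with the upstream write position. PostgreSQL displays LSNs in the X/Y pg_lsn format, so this sketch subtracts '0/0'::pg_lsn to turn them into numeric byte offsets. It assumes that the lsn column in Materialize holds the same 64-bit value, and it uses the default progress subsource name mz_source_progress and the example slot name from above.

```sql
-- In Materialize: the last LSN consumed by the source.
SELECT lsn FROM mz_source_progress;

-- In the upstream PostgreSQL database: the current WAL position and the
-- position confirmed by Materialize's slot, both as byte offsets.
SELECT
    pg_current_wal_lsn() - '0/0'::pg_lsn AS current_wal_lsn,
    confirmed_flush_lsn - '0/0'::pg_lsn AS slot_confirmed_lsn
FROM pg_replication_slots
WHERE slot_name = 'materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71';
```

A gap between current_wal_lsn and the Materialize value that keeps growing suggests the source is not keeping up.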

Known limitations

Schema changes

NOTE: Work to more smoothly support DDL changes to upstream tables is currently in progress. The work introduces the ability to re-ingest the same upstream table under a new schema and switch over without downtime.

Materialize supports schema changes in the upstream database as follows:

Compatible schema changes

  • Adding columns to tables. Materialize will not ingest new columns added upstream unless you use DROP SOURCE to first drop the affected subsource, and then add the table back to the source using ALTER SOURCE...ADD SUBSOURCE.

  • Dropping columns that were added after the source was created. These columns are never ingested, so you can drop them without issue.

  • Adding or removing NOT NULL constraints on tables that were nullable when the source was created.

Incompatible schema changes

All other schema changes to upstream tables will set the corresponding subsourceinto an error state, which prevents you from reading from the source.

To handle incompatible schema changes, use DROP SOURCE and ALTER SOURCE...ADD SUBSOURCE to first drop the affected subsource, and then add the table back to the source. When you add the subsource, it will have the updated schema from the corresponding upstream table.

Publication membership

PostgreSQL’s logical replication API does not provide a signal when users remove tables from publications. Because of this, Materialize relies on periodic checks to determine if a table has been removed from a publication. When it detects a removal, it generates an irrevocable error that prevents any values from being read from the table.

However, it is possible to remove a table from a publication and then re-add it before Materialize notices that the table was removed. In this case, Materialize can no longer provide any consistency guarantees about the data it presents from the table and, unfortunately, is wholly unaware that this occurred.

To mitigate this issue, if you need to drop and re-add a table to a publication, first remove the table's subsource from the source using the DROP SOURCE command, and only then re-add the table to the publication.
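In practice, a safe re-add follows the order sketched below. It assumes the source mz_source, its publication mz_source, and a subsource table_1 from the earlier examples. The first and last statements run in Materialize, and the ALTER PUBLICATION statements run upstream.

```sql
-- 1. In Materialize: drop the subsource before touching the publication.
DROP SOURCE table_1;

-- 2. In the upstream PostgreSQL database: remove and re-add the table.
ALTER PUBLICATION mz_source DROP TABLE table_1;
ALTER PUBLICATION mz_source ADD TABLE table_1;

-- 3. In Materialize: add the table back; it is snapshotted from scratch.
ALTER SOURCE mz_source ADD SUBSOURCE table_1;
```

If other objects depend on table_1, step 1 needs DROP SOURCE table_1 CASCADE. That statement also drops those dependent objects.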

Supported types

Materialize natively supports the following PostgreSQL types (including the array type for each of the types):

  • bool
  • bpchar
  • bytea
  • char
  • date
  • daterange
  • float4
  • float8
  • int2
  • int2vector
  • int4
  • int4range
  • int8
  • int8range
  • interval
  • json
  • jsonb
  • numeric
  • numrange
  • oid
  • text
  • time
  • timestamp
  • timestamptz
  • tsrange
  • tstzrange
  • uuid
  • varchar

Replicating tables that contain unsupported data types is possible via the TEXT COLUMNS option. The specified columns will be treated as text, and will thus not offer the expected PostgreSQL type features. For example:

  • enum: the implicit ordering of the original PostgreSQL enum type is not preserved, as Materialize will sort values as text.

  • money: the resulting text value cannot be cast back to e.g. numeric, since PostgreSQL adds typical currency formatting to the output.
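For the enum case, one workaround is to restore the declared order explicitly at query time. In the sketch below, the mood type and the people table are hypothetical, and the table is assumed to be part of the mz_source publication.

```sql
-- Upstream (PostgreSQL): a hypothetical table with an enum column.
CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');
CREATE TABLE people (name text, current_mood mood);

-- In Materialize: ingest the enum column as text.
CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (
        PUBLICATION 'mz_source',
        TEXT COLUMNS (people.current_mood)
    )
    FOR TABLES (people);

-- ORDER BY current_mood alone would sort alphabetically: happy, ok, sad.
-- Map each label to its position in the original enum definition instead.
SELECT name, current_mood
FROM people
ORDER BY CASE current_mood
    WHEN 'sad' THEN 1
    WHEN 'ok' THEN 2
    WHEN 'happy' THEN 3
END;
```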

Truncation

Upstream tables replicated into Materialize should not be truncated. If an upstream table is truncated while replicated, the whole source becomes inaccessible and will not produce any data until it is recreated. Instead of truncating, you can use an unqualified DELETE to remove all rows from the table:

DELETE FROM t;

Inherited tables

When using PostgreSQL table inheritance, PostgreSQL serves data from SELECTs as if the inheriting tables’ data is also present in the inherited table. However, both PostgreSQL’s logical replication and COPY only present data written to the tables themselves, i.e. the inheriting data is not treated as part of the inherited table.

PostgreSQL sources use logical replication and COPY to ingest table data, so inheriting tables’ data will only be ingested as part of the inheriting table, i.e. in Materialize, the data will not be returned when serving SELECTs from the inherited table.

You can mimic PostgreSQL’s SELECT behavior with inherited tables by creating a materialized view that unions data from the inherited and inheriting tables (using UNION ALL). However, if new tables inherit from the table, data from those inheriting tables will not be available in the view. You will need to add the inheriting tables via ADD SUBSOURCE and create a new view (materialized or not) that unions the new table.
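For example, take the hypothetical cities and capitals tables below, where capitals inherits from cities. A view like the following restores PostgreSQL's SELECT semantics for the columns the two tables share. It assumes both tables are in the publication and have already been ingested as subsources.

```sql
-- Upstream (PostgreSQL): hypothetical parent and child tables.
CREATE TABLE cities (name text, population int4);
CREATE TABLE capitals (state text) INHERITS (cities);

-- In Materialize: union the parent's rows with the child's rows, selecting
-- only the columns defined on the parent, as SELECT on cities would upstream.
CREATE MATERIALIZED VIEW all_cities AS
    SELECT name, population FROM cities
    UNION ALL
    SELECT name, population FROM capitals;
```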

Examples

Important: Before creating a PostgreSQL source, you must set up logical replication in the upstream database. For step-by-step instructions, see the integration guide for your PostgreSQL service: AlloyDB, Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.

Creating a connection

A connection describes how to connect and authenticate to an external system you want Materialize to read data from.

Once created, a connection is reusable across multiple CREATE SOURCE statements. For more details on creating connections, check the CREATE CONNECTION documentation page.

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE 'postgres'
);
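Before creating any sources, you can check that the connection works. This is a minimal sketch that assumes your Materialize version supports the VALIDATE CONNECTION command.

```sql
-- Check that Materialize can reach and authenticate to the upstream database
-- using pg_connection, before any source depends on it.
VALIDATE CONNECTION pg_connection;
```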

If your PostgreSQL server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host.

AWS PrivateLink:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    AWS PRIVATELINK privatelink_svc,
    DATABASE 'postgres'
);

For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.

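SSH tunnel:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize'
);

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    SSH TUNNEL ssh_connection,
    DATABASE 'postgres'
);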

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.
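Materialize generates a key pair for each SSH tunnel connection, and the bastion host must trust the public key. One way to retrieve the keys is sketched below. It assumes that your Materialize version has the mz_ssh_tunnel_connections catalog table. The linked guide describes the full setup.

```sql
-- Look up the public keys generated for the SSH tunnel connection above.
-- Add them to the authorized_keys file of the 'materialize' user on the bastion host.
SELECT
    mz_connections.name,
    mz_ssh_tunnel_connections.*
FROM mz_connections
JOIN mz_ssh_tunnel_connections USING (id)
WHERE mz_connections.name = 'ssh_connection';
```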

Creating a source

Create subsources for all tables included in the PostgreSQL publication

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES;

Create subsources for all tables from specific schemas included in the PostgreSQL publication

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR SCHEMAS (public, project);

Create subsources for specific tables included in the PostgreSQL publication

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (table_1, table_2 AS alias_table_2);

Handling unsupported types

If the publication contains tables that use data types unsupported by Materialize, use the TEXT COLUMNS option to decode data as text for the affected columns. This option expects the upstream names of the replicated table and column (i.e. as defined in your PostgreSQL database).

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (
        PUBLICATION 'mz_source',
        TEXT COLUMNS (upstream_table_name.column_of_unsupported_type)
    )
    FOR ALL TABLES;

Handling errors and schema changes


To handle upstream schema changes or errored subsources, use the DROP SOURCE syntax to drop the affected subsource, and then ALTER SOURCE...ADD SUBSOURCE to add the subsource back to the source.

-- List all subsources in mz_source
SHOW SUBSOURCES ON mz_source;

-- Get rid of an outdated or errored subsource
DROP SOURCE table_1;

-- Start ingesting the table with the updated schema or fix
ALTER SOURCE mz_source ADD SUBSOURCE table_1;

Adding subsources

When adding subsources to a PostgreSQL source, Materialize opens a temporary replication slot to snapshot the new subsources’ current states. After completing the snapshot, the table will be kept up to date, like all other tables in the publication.
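For instance, suppose a source was created for only part of a publication. You could later start ingesting another table that is already in that publication, as sketched below. The table names are hypothetical.

```sql
-- A source created for only some of the tables in the publication.
CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (table_1, table_2);

-- Later: start ingesting table_3, which is already in the publication.
-- Materialize snapshots it through a temporary slot, then streams its changes.
ALTER SOURCE mz_source ADD SUBSOURCE table_3;

-- Confirm that the new subsource exists.
SHOW SUBSOURCES ON mz_source;
```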

Dropping subsources

Dropping a subsource prevents Materialize from ingesting any data from it, in addition to dropping any state that Materialize previously had for the table.
