"""
databricks-sql-connector includes a SQLAlchemy dialect compatible with Databricks SQL. It aims
to be a drop-in replacement for the crflynn/sqlalchemy-databricks project, while implementing
more of the Databricks API, particularly around table reflection, Alembic usage, and data
ingestion with pandas.

Because of the extent of SQLAlchemy's capabilities, it isn't feasible to provide examples of
every usage in a single script, so we provide only a basic one here. More examples can be found
in our test suite at tests/e2e/sqlalchemy/test_basic.py and in the PR that implements this change:

https://github.com/databricks/databricks-sql-python/pull/57

# What's already supported

Most of the functionality is demonstrated in the e2e tests mentioned above. The list below is
derived from the names of those test methods:

  - Create and drop tables with SQLAlchemy Core
  - Create and drop tables with SQLAlchemy ORM
  - Read created tables via reflection
  - Modify column nullability
  - Insert records manually
  - Insert records with pandas.to_sql (note that this does not work for DataFrames with indexes)
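
For example, the pandas ingestion and reflection behaviours can be sketched as follows. An
in-memory SQLite engine keeps the snippet self-contained; with this dialect you would bind the
Databricks engine created below instead, and the table name here is illustrative only:

```python
import pandas as pd
from sqlalchemy import MetaData, Table, create_engine

engine = create_engine("sqlite://")  # stand-in for the databricks:// engine

# index=False matters: DataFrames with indexes are not supported.
df = pd.DataFrame({"name": ["Bim Adewunmi", "Miki Meek"], "episodes": [6, 12]})
df.to_sql("sample_table", engine, index=False)

# Read the created table back via reflection.
reflected = Table("sample_table", MetaData(), autoload_with=engine)
print([c.name for c in reflected.columns])  # ['name', 'episodes']
```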

This connector also aims to support Alembic for programmatic Delta table schema maintenance.
This behaviour is not yet backed by integration tests, which will follow in a subsequent PR as
we learn more about customer use cases. That said, the following behaviours have been tested
manually:

  - Autogenerate revisions with `alembic revision --autogenerate`
  - Upgrade and downgrade between revisions with `alembic upgrade <revision hash>` and
    `alembic downgrade <revision hash>`
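
Wiring Alembic to this dialect is mostly configuration. A sketch of the relevant fragment of
alembic/env.py, where `myapp.models` is a hypothetical module holding the declarative base:

```python
# Fragment of alembic/env.py; the rest of the standard Alembic scaffolding is unchanged.
# `myapp.models` is a hypothetical module name; substitute your own.
from myapp.models import base

# `alembic revision --autogenerate` diffs this metadata against the live schema.
target_metadata = base.metadata
```

In alembic.ini, `sqlalchemy.url` takes the same databricks:// URL that is passed to
create_engine below.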

# Known Gaps

  - MAP, ARRAY, and STRUCT types: this dialect can read these types out as strings, but you
    cannot define a SQLAlchemy model with (e.g.) databricks.sqlalchemy.dialect.types.DatabricksMap,
    because we haven't implemented them yet.
  - Constraints: with the addition of information_schema to Unity Catalog, Databricks SQL supports
    foreign key and primary key constraints. This dialect can write these constraints, but the
    ability for Alembic to reflect and modify them programmatically has not been tested.
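
As a sketch of what "write these constraints" means, primary and foreign keys are declared in
the usual SQLAlchemy Core style; the table and column names here are illustrative only:

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table

metadata = MetaData()

users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(255)),
)

# Emits FOREIGN KEY (user_id) REFERENCES users (id) in the generated DDL.
orders = Table(
    "orders",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("users.id")),
)

print([c.name for c in users.primary_key])  # ['id']
```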
"""

import os

from sqlalchemy.orm import declarative_base, Session
from sqlalchemy import Column, String, Integer, BOOLEAN, create_engine, select

host = os.getenv("DATABRICKS_SERVER_HOSTNAME")
http_path = os.getenv("DATABRICKS_HTTP_PATH")
access_token = os.getenv("DATABRICKS_TOKEN")
catalog = os.getenv("DATABRICKS_CATALOG")
schema = os.getenv("DATABRICKS_SCHEMA")


# Extra arguments are passed untouched to the driver.
# See thrift_backend.py for the complete list.
extra_connect_args = {
    "_tls_verify_hostname": True,
    "_user_agent_entry": "PySQL Example Script",
}

engine = create_engine(
    f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}",
    connect_args=extra_connect_args,
)
session = Session(bind=engine)
base = declarative_base(bind=engine)


class SampleObject(base):

    __tablename__ = "mySampleTable"

    name = Column(String(255), primary_key=True)
    episodes = Column(Integer)
    some_bool = Column(BOOLEAN)


# The base is bound to the engine, so create_all needs no arguments.
base.metadata.create_all()

sample_object_1 = SampleObject(name="Bim Adewunmi", episodes=6, some_bool=True)
sample_object_2 = SampleObject(name="Miki Meek", episodes=12, some_bool=False)

session.add(sample_object_1)
session.add(sample_object_2)

session.commit()

# Query the records back and check that both arrived.
stmt = select(SampleObject).where(SampleObject.name.in_(["Bim Adewunmi", "Miki Meek"]))

output = [i for i in session.scalars(stmt)]
assert len(output) == 2

base.metadata.drop_all()