Schemas

First, we'll go over basics and examine the Schema class. Then, we'll continue to build our example. You can also skip ahead to the example.

The Schema class

You can import Schema like so:

from dbdeclare.entities import Schema

Schema represents a Postgres schema. It is a database-wide entity: schema names must be unique within a database, but the same name can be reused in other databases in the same cluster. The __init__ args correspond to the SQL CREATE SCHEMA arguments found in the Postgres documentation.
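To make "database-wide" concrete, here is a minimal sketch in plain Python that does not use dbdeclare. The SchemaRegistry class and its declare method are hypothetical names invented for this illustration; they only model the uniqueness rule and are not how dbdeclare tracks entities internally.

```python
class SchemaRegistry:
    """Hypothetical model: schema names are unique per database, not per cluster."""

    def __init__(self) -> None:
        # Set of (database name, schema name) pairs declared so far.
        self._seen = set()

    def declare(self, database: str, name: str) -> None:
        key = (database, name)
        if key in self._seen:
            raise ValueError(f"schema {name!r} already exists in database {database!r}")
        self._seen.add(key)


registry = SchemaRegistry()
registry.declare("dev", "log")
registry.declare("prod", "log")  # fine: same name, different database

try:
    registry.declare("dev", "log")  # duplicate within one database
except ValueError as err:
    duplicate_error = str(err)
```

The registry is keyed on the (database, name) pair, which is why the second "log" is accepted and the third is rejected.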

Take a look at the class docstrings for more detail, like an explanation of the __init__ args, the various methods declared, what classes it inherits from, and more.

Example

Let's keep building our example. In addition to the databases and roles we've declared, we need a log schema in each database, to hold two tables that don't belong in the default schema. Because each database corresponds to one stage, we can simply declare the new schema as part of our declare_stage function:

from dbdeclare.entities import Database, Role, Schema


def declare_stage(stage: str) -> None:
    db = Database(name=stage)

    # "groups" aka non-login roles
    etl_writer = Role(name=f"{stage}_etl_writer")
    ml_writer = Role(name=f"{stage}_ml_writer")
    reader = Role(name=f"{stage}_reader")

    # "users" aka login roles
    Role(name=f"{stage}_etl", login=True, password="fake", in_role=[etl_writer, reader])
    Role(name=f"{stage}_ml", login=True, password="fake", in_role=[ml_writer, reader])

    # create extra schemas
    Schema(name="log", database=db)

# omitted code below

Note that we now assign the result of our call to Database to a variable (db) so that we can refer to it explicitly when we create our log Schema. We don't need to declare the default schema (public), since Postgres normally creates it along with every new database.
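For orientation, declaring the log schema once per stage roughly corresponds to running CREATE SCHEMA while connected to each stage's database. The sketch below only builds the statement strings. The helper name schema_statements is hypothetical, and the SQL that dbdeclare actually emits may differ.

```python
def schema_statements(stages: list[str], schema: str = "log") -> dict[str, str]:
    """Map each stage's database name to a statement creating the schema in it.

    Illustration only: this is an assumption about the equivalent SQL,
    not dbdeclare's own output.
    """
    return {stage: f"CREATE SCHEMA {schema};" for stage in stages}


statements = schema_statements(["test", "dev", "prod"])
for database, sql in statements.items():
    print(f"-- while connected to database {database}")
    print(sql)
```

The same statement appears three times because a schema belongs to a single database, so each stage's database needs its own log schema.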

The entire file now looks like this:

from dbdeclare.entities import Database, Role, Schema


def declare_stage(stage: str) -> None:
    db = Database(name=stage)

    # "groups" aka non-login roles
    etl_writer = Role(name=f"{stage}_etl_writer")
    ml_writer = Role(name=f"{stage}_ml_writer")
    reader = Role(name=f"{stage}_reader")

    # "users" aka login roles
    Role(name=f"{stage}_etl", login=True, password="fake", in_role=[etl_writer, reader])
    Role(name=f"{stage}_ml", login=True, password="fake", in_role=[ml_writer, reader])

    # create extra schemas
    Schema(name="log", database=db)

    # create log role and api user if not test stage (privileges are granted later)
    if stage != "test":
        log_role = Role(name=f"{stage}_logger")
        Role(name=f"{stage}_api", login=True, password="fake", in_role=[log_role, reader])


def main() -> None:
    stages = ["test", "dev", "prod"]
    for stage in stages:
        declare_stage(stage)


if __name__ == "__main__":
    main()

This code now declares the databases, roles, and schemas we need. All that's left is to declare the tables and columns, grant privileges, and then push all of this to the cluster. Let's add some tables.