Schemas¶
First, we'll go over basics and examine the Schema class. Then, we'll continue
to build our example. You can also skip ahead to the example.
The Schema class¶
You can import Schema like so:
Schema is a representation of a Postgres schema.
It is a database-wide entity. This means each schema in your database (not cluster) must have a unique name.
The __init__ args correspond to the SQL CREATE SCHEMA arguments found in the Postgres documentation.
Take a look at the class docstrings for more detail, like an explanation of the __init__ args, the various
methods declared, what classes it inherits from, and more.
Example¶
Let's keep building our example. In addition to the databases and roles we've declared, we need a log schema
for two tables that don't go in the default schema. We need this for each database.
Since each database corresponds to each stage, we can simply create a new schema as part
of our declare_stage function:
from dbdeclare.entities import Database, Role, Schema
def declare_stage(stage: str) -> None:
    db = Database(name=stage)
    # "groups" aka non-login roles
    etl_writer = Role(name=f"{stage}_etl_writer")
    ml_writer = Role(name=f"{stage}_ml_writer")
    reader = Role(name=f"{stage}_reader")
    # "users" aka login roles
    Role(name=f"{stage}_etl", login=True, password="fake", in_role=[etl_writer, reader])
    Role(name=f"{stage}_ml", login=True, password="fake", in_role=[ml_writer, reader])
    # create extra schemas
    Schema(name="log", database=db)
# omitted code below
Note that we assign the output of our call to Database to a variable now (db) so that we
can explicitly refer to it when we create our log Schema. We don't need to create the default
schema, as that will already exist when a new database is created.
Entire file now:
from dbdeclare.entities import Database, Role, Schema
def declare_stage(stage: str) -> None:
    db = Database(name=stage)
    # "groups" aka non-login roles
    etl_writer = Role(name=f"{stage}_etl_writer")
    ml_writer = Role(name=f"{stage}_ml_writer")
    reader = Role(name=f"{stage}_reader")
    # "users" aka login roles
    Role(name=f"{stage}_etl", login=True, password="fake", in_role=[etl_writer, reader])
    Role(name=f"{stage}_ml", login=True, password="fake", in_role=[ml_writer, reader])
    # create extra schemas
    Schema(name="log", database=db)
    # create log role and grant privileges if not test stage
    if stage != "test":
        log_role = Role(name=f"{stage}_logger")
        Role(name=f"{stage}_api", login=True, password="fake", in_role=[log_role, reader])
def main() -> None:
    stages = ["test", "dev", "prod"]
    for stage in stages:
        declare_stage(stage)
if __name__ == "__main__":
    main()
This code now declares the databases, roles, and schemas we need. All that's left is to declare the tables + columns, grant privileges, then actually push all this to the cluster. Let's add some tables.