Schemas¶
First, we'll go over the basics and examine the Schema class. Then, we'll continue to build our example. You can also skip ahead to the example.
The Schema class¶
You can import Schema like so:
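```python
from dbdeclare.entities import Schema
```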
Schema is a representation of a Postgres schema. It is a database-wide entity, meaning each schema in your database (not cluster) must have a unique name.

The __init__ args correspond to the SQL CREATE SCHEMA arguments found in the Postgres documentation.

Take a look at the class docstrings for more detail, such as an explanation of the __init__ args, the various methods declared, what classes it inherits from, and more.
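For instance, a minimal sketch might look like the following (assuming a database named dev has already been declared, as in the earlier sections):

```python
from dbdeclare.entities import Database, Schema

# declare a database and a schema that belongs to it;
# nothing is pushed to the cluster yet - these are just declarations
db = Database(name="dev")
log_schema = Schema(name="log", database=db)
```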
Example¶
Let's keep building our example. In addition to the databases and roles we've declared, we need a log schema for two tables that don't belong in the default schema. We need this for each database.

Since each database corresponds to a stage, we can simply create the new schema as part of our declare_stage function:
```python
from dbdeclare.entities import Database, Role, Schema


def declare_stage(stage: str) -> None:
    db = Database(name=stage)
    # "groups" aka non-login roles
    etl_writer = Role(name=f"{stage}_etl_writer")
    ml_writer = Role(name=f"{stage}_ml_writer")
    reader = Role(name=f"{stage}_reader")
    # "users" aka login roles
    Role(name=f"{stage}_etl", login=True, password="fake", in_role=[etl_writer, reader])
    Role(name=f"{stage}_ml", login=True, password="fake", in_role=[ml_writer, reader])
    # create extra schemas
    Schema(name="log", database=db)
    # omitted code below
```
Note that we now assign the output of our Database call to a variable (db) so that we can refer to it explicitly when we create our log Schema. We don't need to create the default public schema, as that already exists whenever a new database is created.
Entire file now:
```python
from dbdeclare.entities import Database, Role, Schema


def declare_stage(stage: str) -> None:
    db = Database(name=stage)
    # "groups" aka non-login roles
    etl_writer = Role(name=f"{stage}_etl_writer")
    ml_writer = Role(name=f"{stage}_ml_writer")
    reader = Role(name=f"{stage}_reader")
    # "users" aka login roles
    Role(name=f"{stage}_etl", login=True, password="fake", in_role=[etl_writer, reader])
    Role(name=f"{stage}_ml", login=True, password="fake", in_role=[ml_writer, reader])
    # create extra schemas
    Schema(name="log", database=db)
    # create log role and grant privileges if not test stage
    if stage != "test":
        log_role = Role(name=f"{stage}_logger")
        Role(name=f"{stage}_api", login=True, password="fake", in_role=[log_role, reader])


def main() -> None:
    stages = ["test", "dev", "prod"]
    for stage in stages:
        declare_stage(stage)


if __name__ == "__main__":
    main()
```
This code now declares the databases, roles, and schemas we need. All that's left is to declare the tables and columns, grant privileges, and then actually push all of this to the cluster. Let's add some tables.