SQLAlchemy
Hi in this post I will try to explain how implement sqlalchemy of async way.
Index
Requirements
- Docker
- docker compose
- Python >= 3.10
- Clone repository: learning-sqlalchemy
For this example I use Docker of this way you don’t need install postgres or another tools.
After all, I’m going to explain the environment structure for work:
/learning-sqlalchemy
├── /docker
│ ├── docker-compose.yml
│ └── Dockerfile
├── manage.sh # main file
├── Pipfile
├── Pipfile.lock
└── /src
└── /examples
└── connection.py
Of this way you don’t need to install any package in your system and all run in docker container.
For run the examples you need great permissions of execution to file manage.sh, for this, execute this command:
chmod +x manage.sh
And now you can execute for example the connection example:
./manage.sh connection
Maybe you don’t know how you can know which commands have this project, well for list the possible commands only need execute:
./manage.sh
Now let’s get started explaining the examples.
Exercises
Connection
At this point you need have cloned the repository learning-sqlalchemy, you can execute this example with this command:
./manage.sh connection
After execute this example you will get the next output in the terminal:
Attaching to db-postgres-1, exercise-1
db-postgres-1 |
db-postgres-1 | PostgreSQL Database directory appears to contain a database; Skipping initialization
db-postgres-1 |
db-postgres-1 | 2024-10-19 23:46:24.300 UTC [1] LOG: starting PostgreSQL 16.4 (Debian 16.4-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
db-postgres-1 | 2024-10-19 23:46:24.300 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
db-postgres-1 | 2024-10-19 23:46:24.300 UTC [1] LOG: listening on IPv6 address "::", port 5432
db-postgres-1 | 2024-10-19 23:46:24.302 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
db-postgres-1 | 2024-10-19 23:46:24.306 UTC [29] LOG: database system was shut down at 2024-10-19 23:38:45 UTC
db-postgres-1 | 2024-10-19 23:46:24.312 UTC [1] LOG: database system is ready to accept connections
exercise-1 | SELECT 1: 1
exercise-1 exited with code 0
Aborting on container exit...
[+] Stopping 2/2
✔ Container learning-sqlalchemy-exercise-1 Stopped 0.0s
✔ Container learning-sqlalchemy-db-postgres-1 Stopped
The more important in this output is the line exercise-1 | SELECT 1: 1 this is the result of a select 1, and 1 is the return of this query, now let my explain the code in the connection exercise:
async_engine = create_async_engine(
URL.create(
drivername="postgresql+asyncpg",
username="postgres",
password=os.getenv("DB_PASS"),
host=os.getenv("DB_HOST"),
port=5432,
database=os.getenv("DB_NAME"),
)
)
For async way we need to implement a async engine and for that sqlalchemy has create_async_engine1, for use this engine we need to set a url connection and a good practice is use URL2 of sqlalchemy because can parse any character for example the character # some time cause problem when is used in password without parsing, and parsing now scape the character and is manage automatically for sqlalchemy.
Once we have the engine need create a session, a way to manage the session is using async_scoped_session3 with this function the sessions is reused and we don’t need to do anything:
async_session = async_scoped_session(
async_sessionmaker(async_engine, expire_on_commit=False), scopefunc=current_task
)
This function only need that you pass a async_sessionmaker4 and in async_sessionmaker you can stablish if the session should be expire after commit or an autoflush behavior the argument scopefunc it’s important because this way the runtime know how implement the correct behavior in the correct task that is running whit the task of asyncio.
Now we need start the session with the next function:
async def db_psql_session() -> AsyncSession:
session = async_session()
return session
With this function we start a session and is returned to be used.
For use this function in this example we use for make a something equals to a PING the PING in SQL is a query of select(1):
async def check_connection():
session = await db_psql_session()
await session.begin()
try:
result = await session.execute(select(1))
print(f"SELECT 1: {result.scalar()}")
await session.commit()
except Exception as e:
print(f"Error to try check connection: {e}")
await session.rollback()
finally:
await session.close()
How you can see in this function with start the begin function of the sesssion this begin start a context for manage the sessin but we don’t use with statement for example how this:
with session.begin():
Maybe you know the behavior of with sentence, using this the context manager close automatically the session after exit of the with sentence, but when we don’t use the with sentence is necessary use explicitly the close and rollback; in this example we use scalar for extract the value of return [(1,)] if you use first or fetchall you will get a tuple or list object respectively.
And if all work fine we will have a print of SELECT: 1: 1 otherwise we will have and error and well be printed with the print of exception sentence, and after all we will close the session with await session.close().
Now a good practice working with session is close the connection after all executions, this is possible with the dispose5 function:
async def close_connection():
try:
await async_engine.dispose()
except Exception as e:
print(f"Error trying to close connection: {e}")
sys.exit(-1)
For this example we use the main function for manage the flow of this code, in python it’s a good practice when you create a script, use the if name == main6 sentence and of this way you can execute a custom execution flow for your functions. And for work a easy way with asyncio functions we use asyncio.run with this function is easy only pass you main async function and don’t need use a event_loops or other more complicated functions how Future or task.
async def main():
await check_connection()
await close_connection()
if __name__ == "__main__":
asyncio.run(main())
Models (Tables)
In this example we gonna cover the models topics, models are abstracts class of tables in database on this special example is using dataclass and attr7 use dataclass is a easy way of implement sqlalchemy but you need to follow the rules of dataclass8 for example for declarate an attribute it’s necessary set in the end the variables with default values, in this example also we gonna see some a little example of mixin9 pattern in sqlalchemy.
First we gonna to declarete a base class, this is a good practice because you can add more configuration to principal class and use inheritance:
class BaseModel(AsyncAttrs, DeclarativeBase, MappedAsDataclass):
pass
Of this way we are ready to use async, class models and with dataclass configuration. And now we have a base models, it’s time to declarate de mixin class:
class CommonMixin(MappedAsDataclass):
__abstract__ = True
class Status(str, Enum):
ENABLE = "1"
DISABLE = "0"
@declared_attr
def status(cls) -> Mapped[str]:
return mapped_column(String(2), default=cls.Status.ENABLE.value, nullable=False)
@declared_attr
def created_at(cls) -> Mapped[datetime]:
return mapped_column(
DateTime(timezone=True), default_factory=datetime.now, nullable=False
)
@declared_attr
def updated_at(cls) -> Mapped[datetime]:
return mapped_column(
DateTime(timezone=True),
onupdate=datetime.now,
nullable=False,
default_factory=datetime.now,
)
How you can see, all dataclass models the you wanna use it’s necessary inheritance the MappedAsDataclass, in this mixin class also is a good practice set the attribute abstract == True because we don’t want a table of mixin, we only want this class for declarate commons columns for example, status, created_at and updated_at, and now maybe you’re wondering use the decorator @declared_attr, this is because we need to follow the rules of dataclass, and if you use inheritance then we have a problem because the attribute with default value will be set at the beginning of all attributes and this brake the rule of dataclass, then for fix this we gonna use functions because the functions are established at the end, but for convert the functions to attributes it’s necessary use the decorator @declarad_attr.
Now for use this class see the next example:
def generate_uuid4() -> str:
return str(uuid4())
class ItemModel(BaseModel, CommonMixin, MappedAsDataclass):
__tablename__ = "items"
name: Mapped[str] = mapped_column(String(255), nullable=False)
id_item: Mapped[str] = mapped_column(
String(50), init=False, default_factory=generate_uuid4, primary_key=True
)
def __repr__(self) -> str:
return (
f"ItemModel(id_item={self.id_item!r}, name={self.name!r}, "
f"status={self.status!r}, created_at={self.created_at!r}, updated_at={self.updated_at!r})"
)
How you can see this table have a columns of:
- id_item
- name
- status
- created_at
- updated_at
We use a function for generate a uuid primary key and a function of repr for good practice an a more easy debuggin, maybe for you it’s strange declarate to finish the id_item but remember that rules of dataclass say the default values are set at the end.
A tip for manage the creations of the class is use sqlalchemy or some library special for that maybe alembic, in this example we gonna use sqlalchemy:
async def create_tables(connection: AsyncConnection):
try:
await connection.run_sync(BaseModel.metadata.create_all)
print("Databases created")
except Exception as e:
print(f"Error trying create tables: {e}")
It’s necessary use the run_sync function because this detect the dependencies between tables and are created, how we follow goods practice it’s easy use the BaseModel with the metadata.create_all function.
And now as in the previous example we gonna use the asyncio and the main function for use a custom flow, and the preview example of connection to database:
async def main():
async with async_engine.begin() as conn:
await create_tables(connection=conn)
session = await db_psql_session()
await session.begin()
result = await session.execute(select(ItemModel))
print("RESULT: ", result.all())
if __name__ == "__main__":
asyncio.run(main())