Redis Backend

Redis Settings

class progress_updater.backends.redis.RedisSettings(*, redis_host: str, redis_port: int = 6379, redis_db: int = 1, redis_password: str, redis_extras: Dict = None)

Redis settings. Defines the connection settings for the Redis backend.

backend() -> Type[RedisLog]

Returns the RedisLog class and sets the Redis backend settings on it.

Usage:
>>> from progress_updater.backends import RedisSettings
>>>
>>> settings = RedisSettings(
>>>     redis_host="redis",
>>>     redis_port=6379,
>>>     redis_db=1,
>>>     redis_password="pass"
>>> )
>>> RedisLog = settings.backend()  # type: Type[RedisLog]
>>> log = RedisLog(task_name="My task", description="A cool task")
>>> log.save()

Redis Log

class progress_updater.backends.redis.RedisLog(*, uuid: UUID = None, task_name: str, status: str = 'PENDING', log: str = '', description: str = None, start_time: datetime = None, end_time: datetime = None, created: datetime = None, updated: datetime = None)

RedisLog class. Defines the log model for the Redis backend.

Usage:

>>> from progress_updater.backends import RedisSettings
>>>
>>> settings = RedisSettings(
>>>     redis_host="redis",
>>>     redis_port=6379,
>>>     redis_db=1,
>>>     redis_password="pass"
>>> )
>>> RedisLog = settings.backend()  # type: Type[RedisLog]
>>> log = RedisLog(task_name="My task", description="A cool task")
>>> log.save()
>>>
>>> assert log.dict() == {"task_name": "My task", ...}
>>> assert log.json() == '{"task_name": "My task", ...}'
>>>
>>> log = RedisLog.get(uuid=UUID("<your-uuid>"))
>>> assert log.description == "A cool task"
>>>
>>> assert log.delete() == 1

delete() -> int

Deletes the object from the database and returns the number of deleted records.

Usage:

>>> ...
>>> assert log.delete() == 1  # count deleted 1
>>> assert log.delete() == 0  # count deleted 0
>>> ...

classmethod get(uuid: UUID) -> Optional[RedisLog]

Gets the object with the given UUID from the database.

Usage:

>>> ...
>>> log = RedisLog.get(uuid=UUID("<your-uuid>"))
>>> assert log.uuid == UUID("<your-uuid>")
>>>

save() -> RedisLog

Updates or creates the object in the database.

Usage:

>>> ...
>>> log = RedisLog(task_name="My Task")
>>> log.save()
>>> log.description = "A new description"
>>> log.save()
>>> ...