Redis Backend
Redis Settings
- class progress_updater.backends.redis.RedisSettings(*, redis_host: str, redis_port: int = 6379, redis_db: int = 1, redis_password: str, redis_extras: Dict = None)
Defines the connection settings for the Redis backend.
- backend() -> Type[RedisLog]
Returns the RedisLog class, configured with these Redis backend settings.
- Usage:
>>> from progress_updater.backends import RedisSettings
>>>
>>> settings = RedisSettings(
...     redis_host="redis",
...     redis_port=6379,
...     redis_db=1,
...     redis_password="pass",
... )
>>> RedisLog = settings.backend()  # type: Type[RedisLog]
>>> log = RedisLog(task_name="My task", description="A cool task")
>>> log.save()
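The usage above relies on a settings object whose backend() method hands back a log class that is ready to use. The library's internals are not shown in this section, so the following is only a minimal sketch of that pattern, assuming the settings are attached to the class as a shared attribute. FakeSettings, FakeLog, and the connection attribute are hypothetical names, and no real Redis connection is opened.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Type


class FakeLog:
    """Hypothetical stand-in for RedisLog; not the library's class."""

    # Connection parameters shared by every instance; filled in by backend().
    connection: Optional[Dict] = None

    def __init__(self, task_name: str):
        self.task_name = task_name


@dataclass
class FakeSettings:
    """Hypothetical stand-in for RedisSettings, mirroring its documented fields."""

    redis_host: str
    redis_password: str
    redis_port: int = 6379
    redis_db: int = 1
    redis_extras: Dict = field(default_factory=dict)

    def backend(self) -> Type[FakeLog]:
        # Attach the connection parameters to the class, then return the class.
        FakeLog.connection = {
            "host": self.redis_host,
            "port": self.redis_port,
            "db": self.redis_db,
            "password": self.redis_password,
            **self.redis_extras,
        }
        return FakeLog


settings = FakeSettings(redis_host="redis", redis_password="pass")
Log = settings.backend()
log = Log(task_name="My task")
```

In this sketch the settings are stored on the class rather than on each instance, so every log created afterwards shares the same configuration.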
Redis Log
- class progress_updater.backends.redis.RedisLog(*, uuid: UUID = None, task_name: str, status: str = 'PENDING', log: str = '', description: str = None, start_time: datetime = None, end_time: datetime = None, created: datetime = None, updated: datetime = None)
Defines the log record stored by the Redis backend.
Usage:
>>> from uuid import UUID
>>> from progress_updater.backends import RedisSettings
>>>
>>> settings = RedisSettings(
...     redis_host="redis",
...     redis_port=6379,
...     redis_db=1,
...     redis_password="pass",
... )
>>> RedisLog = settings.backend()  # type: Type[RedisLog]
>>> log = RedisLog(task_name="My task", description="A cool task")
>>> log.save()
>>>
>>> assert log.dict() == {"task_name": "My task", ...}
>>> assert log.json() == '{"task_name": "My task", ...}'
>>>
>>> log = RedisLog.get(uuid=UUID("<your-uuid>"))
>>> assert log.description == "A cool task"
>>>
>>> assert log.delete() == 1
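The save(), get(), dict(), and json() calls above describe a round trip: a record is serialized, stored under its UUID, and rebuilt on lookup. A minimal sketch of that round trip follows, assuming JSON serialization and a plain dictionary in place of Redis. SketchLog and its _store attribute are hypothetical and carry only a subset of the documented fields.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import ClassVar, Dict, Optional
from uuid import UUID, uuid4


@dataclass
class SketchLog:
    """Hypothetical stand-in for RedisLog, backed by a dict instead of Redis."""

    task_name: str
    description: Optional[str] = None
    status: str = "PENDING"
    uuid: UUID = field(default_factory=uuid4)

    # In-memory replacement for the Redis keyspace (assumption).
    _store: ClassVar[Dict[str, str]] = {}

    def dict(self) -> Dict:
        return asdict(self)

    def json(self) -> str:
        # UUID is not JSON serializable by default, so fall back to str().
        return json.dumps(self.dict(), default=str)

    def save(self) -> None:
        # Store the serialized record under its UUID.
        self._store[str(self.uuid)] = self.json()

    @classmethod
    def get(cls, uuid: UUID) -> "SketchLog":
        # Load the stored JSON and restore the UUID type before rebuilding.
        data = json.loads(cls._store[str(uuid)])
        data["uuid"] = UUID(data["uuid"])
        return cls(**data)


log = SketchLog(task_name="My task", description="A cool task")
log.save()
loaded = SketchLog.get(uuid=log.uuid)
```

Keying the store on the UUID is what lets get() find a record again from nothing but its identifier.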
- delete() -> int
Deletes the object from the database and returns the number of records deleted.
Usage:
>>> ...
>>> assert log.delete() == 1  # one record deleted
>>> assert log.delete() == 0  # already deleted, nothing left to remove
>>> ...
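The return value behaves like a count of removed records, which matches how the Redis DEL command reports the number of keys it removed. Whether RedisLog.delete() calls DEL directly is an assumption. The sketch below only illustrates the 1-then-0 behaviour, using a dictionary as a stand-in for the Redis keyspace and hypothetical save and delete helpers.

```python
from typing import Dict
from uuid import UUID, uuid4

# In-memory stand-in for a Redis keyspace (assumption: no real server is used).
store: Dict[str, str] = {}


def save(uuid: UUID, payload: str) -> None:
    """Store the payload under the record's UUID."""
    store[str(uuid)] = payload


def delete(uuid: UUID) -> int:
    """Remove the record and report how many records were removed (0 or 1)."""
    return 1 if store.pop(str(uuid), None) is not None else 0


log_id = uuid4()
save(log_id, '{"task_name": "My task"}')
first = delete(log_id)   # the record existed, so one record is removed
second = delete(log_id)  # the record is already gone, so nothing is removed
```

Returning a count instead of raising on a missing record makes repeated deletes safe to call.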