Skip to content

Instantly share code, notes, and snippets.

@flyinactor91
Created February 7, 2018 15:47
Show Gist options
  • Save flyinactor91/033cee7b317bb12b0a8c9a7e501eb0f9 to your computer and use it in GitHub Desktop.
Save flyinactor91/033cee7b317bb12b0a8c9a7e501eb0f9 to your computer and use it in GitHub Desktop.
I needed to replace a memory-bound queue with something that could be shared between programs running on different machines. I didn't need something like RabbitMQ or a task-based queue like RQ, so I implemented a single-item, queue-like class using Redis.
from typing import Optional, Union

import redis
class TinyQueue(object):
    """
    A single-item queue to share data between sandboxed projects using Redis

    Instances of this class with the same 'tag' are connected: they all read
    and write the single Redis key 'queue-<tag>'.

    'overwrite' decides what push does while an item is still waiting:
        overwrite=False -> the waiting item is kept and the push is rejected
        overwrite=True  -> the waiting item is replaced by the new data

    Any extra keyword arguments are handed to redis.StrictRedis unchanged
    (host, port, db, decode_responses, ...).
    """

    def __init__(self, tag: str, overwrite: bool = False, **redis_args):
        self.__conn = redis.StrictRedis(**redis_args)
        # Talk to the server once up front so that connection problems
        # surface at construction time instead of on the first push/pop
        self.__conn.ping()
        self._key = f'queue-{tag}'
        # When True, push uses SET ... NX and never replaces a waiting item
        self._lock = not overwrite

    def __repr__(self):
        return f'<TinyQueue as "{self._key}">'

    @property
    def empty(self) -> bool:
        """
        Returns True if the queue is empty
        """
        return not self.__conn.exists(self._key)

    def push(self, data: str) -> bool:
        """
        Add stringified data to the queue

        Returns True if the data was stored, or False if it was rejected
        because an item is already waiting and overwrite is disabled
        """
        return bool(self.__conn.set(self._key, data, nx=self._lock))

    def pop(self, delete: bool = True) -> Optional[Union[str, bytes]]:
        """
        Return stringified data from the queue, or None if the queue is empty

        'delete' will remove the data from the queue

        The value is whatever the Redis client returns for GET: bytes by
        default, str when the connection was created with decode_responses=True
        """
        if not delete:
            return self.__conn.get(self._key)
        # Read and remove inside one MULTI/EXEC transaction. With a separate
        # GET followed by DEL, two consumers could both receive the same item,
        # or an item pushed in between could be deleted without being read.
        with self.__conn.pipeline(transaction=True) as pipe:
            pipe.get(self._key)
            pipe.delete(self._key)
            value, _removed = pipe.execute()
        return value

    def clear(self):
        """
        Clears the queue
        """
        self.__conn.delete(self._key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment