What is Redis?

Redis (Remote Dictionary Server) is an in-memory data structure store. It keeps data in RAM, making it incredibly fast - operations complete in microseconds rather than milliseconds.

Think of Redis like a super-fast notepad that your application can use to quickly store and retrieve information without going to the slower database every time.

Why Use Redis?

  • Speed: Reads/writes complete in microseconds — orders of magnitude faster than disk-based stores
  • Caching: Store frequently accessed data to reduce database load
  • Sessions: Store user session data for web apps
  • Real-time: Pub/sub messaging, leaderboards, counters
  • Rate limiting: Control API request rates
  • Queues: Simple job queues (with Celery)

Getting Started

# Install Redis
# Mac: brew install redis
# Ubuntu: sudo apt install redis-server
# Windows: Use Docker or WSL

# Start Redis
redis-server

# Or with Docker
docker run -d -p 6379:6379 redis:alpine

# Install Python client
pip install redis

# Connect from Python
import redis

# db=0 selects the default logical database (Redis ships with 0-15).
r = redis.Redis(host='localhost', port=6379, db=0)

# Test connection
r.ping()  # Returns True if connected; raises redis.ConnectionError otherwise

Basic Operations

import redis

# decode_responses=True makes replies come back as str instead of bytes.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Strings (most common)
r.set('name', 'John')           # Set a value
r.get('name')                    # Get a value: 'John'
r.set('count', 10)
r.incr('count')                  # Increment: 11
r.decr('count')                  # Decrement: 10
r.incrby('count', 5)             # Increment by 5: 15

# Set with expiration
r.setex('token', 3600, 'abc123')  # Expires in 1 hour
r.set('temp', 'value', ex=60)     # Alternative syntax
r.ttl('token')                    # Seconds to live (-1 = no expiry, -2 = key missing)

# Check if key exists
r.exists('name')                  # Returns 1 if exists

# Delete keys
r.delete('name')
r.delete('key1', 'key2', 'key3')  # Delete multiple

# Lists
r.lpush('queue', 'task1')         # Push to front
r.rpush('queue', 'task2')         # Push to back
r.lpop('queue')                   # Pop from front
r.rpop('queue')                   # Pop from back
r.lrange('queue', 0, -1)          # Get all items

# Sets (unique values)
r.sadd('tags', 'python', 'redis', 'web')
r.smembers('tags')                # Get all members
r.sismember('tags', 'python')     # Check membership

# Hashes (objects/dictionaries)
r.hset('user:1', 'name', 'John')
r.hset('user:1', 'email', 'john@example.com')
r.hget('user:1', 'name')          # Get single field
r.hgetall('user:1')               # Get all fields

# Sorted sets (with scores)
r.zadd('leaderboard', {'player1': 100, 'player2': 85})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zincrby('leaderboard', 10, 'player1')  # Add to score

Caching Pattern

The most common use of Redis - cache expensive operations:

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user_data(user_id: int):
    """Return the user as a dict, using Redis as a cache-aside layer.

    Cached entries expire after one hour. Returns None for an unknown
    user_id (misses are never cached).
    """
    # Check cache first
    cache_key = f"user:{user_id}"
    cached = r.get(cache_key)

    if cached:
        print("Cache hit!")
        return json.loads(cached)

    # Cache miss - get from database
    print("Cache miss - querying database")
    user = db.query(User).filter(User.id == user_id).first()

    if user is None:
        # Bug fix: the original called user.to_dict() unconditionally,
        # raising AttributeError whenever the user did not exist.
        return None

    data = user.to_dict()
    # Store in cache for 1 hour
    r.setex(cache_key, 3600, json.dumps(data))
    return data

# Invalidate cache when data changes
def update_user(user_id: int, data: dict):
    """Persist *data* for the user, then evict the now-stale cache entry."""
    cache_key = f"user:{user_id}"

    # Write-through: commit the database change first.
    db.query(User).filter(User.id == user_id).update(data)
    db.commit()

    # Invalidate after commit so the next read repopulates with fresh data.
    r.delete(cache_key)

Django with Redis

# Install
pip install django-redis

# settings.py
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        # ".../1" selects logical database 1, keeping Django's cache keys
        # separate from anything else using the same Redis server.
        'LOCATION': 'redis://localhost:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        }
    }
}

# Session storage
# NOTE(review): cache-only sessions are lost if Redis evicts/restarts;
# 'cached_db' is the durable alternative — confirm which fits your app.
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'

# Using cache in views
from django.core.cache import cache

def my_view(request):
    """Serve 'my_data' from the cache, computing and caching it on a miss."""
    # Fetch and test the cache entry in one step; None signals a miss.
    if (data := cache.get('my_data')) is None:
        data = expensive_operation()
        cache.set('my_data', data, timeout=3600)

    return JsonResponse(data)

# Cache decorator
from django.views.decorators.cache import cache_page

@cache_page(60 * 15)  # Cache for 15 minutes
def my_view(request):
    # The decorator caches the entire rendered response keyed by URL,
    # so this body only runs once per 15-minute window per URL.
    return render(request, 'template.html')

FastAPI with Redis

from fastapi import FastAPI, Depends
import redis
import json

app = FastAPI()

# Connection pool for better performance
# Connection pool for better performance.
# Bug fix: decode_responses is a *connection* option. Passing it to
# redis.Redis(connection_pool=...) is ignored because connection settings
# come from the pool — so set it on the pool itself.
pool = redis.ConnectionPool(
    host='localhost', port=6379, db=0, decode_responses=True
)

def get_redis():
    """FastAPI dependency: a client backed by the shared connection pool."""
    return redis.Redis(connection_pool=pool)

@app.get("/users/{user_id}")
async def get_user(user_id: int, r: redis.Redis = Depends(get_redis)):
    """Cache-aside read of one user, with a 1-hour cache TTL."""
    cache_key = f"user:{user_id}"

    # Try cache
    # NOTE(review): this is the synchronous redis-py client, so r.get/r.setex
    # block the event loop inside this async handler — consider redis.asyncio
    # for a fully non-blocking path.
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Get from database
    user = await fetch_user_from_db(user_id)

    # Cache for 1 hour
    # assumes `user` is JSON-serializable (plain dict) — TODO confirm
    r.setex(cache_key, 3600, json.dumps(user))
    return user

Rate Limiting

import redis
from datetime import datetime

r = redis.Redis(host='localhost', port=6379)

def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed-window rate limiter.

    Allow 'limit' requests per 'window' seconds; returns True once the
    caller has exceeded the limit for the current window.
    """
    # Bug fix: the original key embedded datetime.now().minute, which only
    # made sense for window=60 and reset on wall-clock minute boundaries
    # regardless of the 'window' argument. A single per-user key whose TTL
    # *is* the window honours any window length.
    key = f"rate_limit:{user_id}"

    current = r.incr(key)

    if current == 1:
        # First request of this window: start the expiry clock.
        # (INCR and EXPIRE are two round trips; use a pipeline or Lua
        # script if a crash in between must never leave an eternal key.)
        r.expire(key, window)

    return current > limit

# Usage — inside a request handler (the bare `return` below assumes function scope)
user_id = "user123"

if is_rate_limited(user_id, limit=10, window=60):
    return {"error": "Rate limit exceeded. Try again later."}

# Process request...

Pub/Sub (Real-time Messaging)

import redis

r = redis.Redis(host='localhost', port=6379)

# Publisher
def publish_message(channel, message):
    # PUBLISH is fire-and-forget: only subscribers connected right now
    # receive the message; nothing is stored for later.
    r.publish(channel, message)

publish_message('notifications', 'New order received!')

# Subscriber (run in separate process)
def subscribe_to_channel(channel):
    """Block forever, printing every message published on `channel`."""
    pubsub = r.pubsub()
    pubsub.subscribe(channel)

    # listen() is an infinite blocking generator; it also yields control
    # messages (e.g. the 'subscribe' confirmation), hence the type check.
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
# Use case: Real-time notifications, chat, live updates

Best Practices

  • Use connection pools: Reuse connections for better performance
  • Set TTL on keys: Prevent memory from filling up
  • Use namespaced keys: user:123:profile not just profile
  • Monitor memory: Redis is in-memory, watch usage
  • Use pipelines: Batch multiple commands for speed
  • Don't cache everything: Only cache expensive operations
# Pipeline for multiple operations
# Commands are buffered client-side and sent in a single round trip,
# cutting per-command network latency for batches.
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.incr('counter')
results = pipe.execute()  # All at once; returns one result per queued command

Master Redis with Expert Mentorship

Our Full Stack Python program covers Redis for caching and real-time features. Learn to build high-performance applications with personalized guidance.

Explore Full Stack Python Program

Related Articles