# sqli_lab.py — Source Code
#
# Back Download
# /labs/sqli_lab.py
#!/usr/bin/env python3
"""
SQL Injection Lab
The Hacker's Arsenal - Lesson 5

Practice SQL injection attacks against a SQLite database:
1. Login bypass (authentication bypass)
2. Product search (data extraction)
3. User lookup (UNION attacks)
4. Category filter (error-based injection)

Run this script and open http://localhost:8888 in your browser.

WARNING: This code is INTENTIONALLY VULNERABLE for educational purposes.
"""

from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import sqlite3
import os

# Path of the SQLite database file; init_db() deletes and rebuilds it each time it is called.
DB_FILE = 'sqli_lab.db'


def init_db():
    """Recreate the lab database from scratch and load its seed data.

    Any existing ``DB_FILE`` is deleted first so every run starts from the
    same known state.  Three tables are then created and populated:
    ``users``, ``products`` and ``secrets``.  A confirmation line is printed
    once the data has been committed.
    """
    # Start from a clean slate; a file that is already absent is not an error.
    try:
        os.remove(DB_FILE)
    except FileNotFoundError:
        pass

    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()

        # Users table
        c.execute('''CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        username TEXT,
        password TEXT,
        email TEXT,
        role TEXT
    )''')

        users = [
            (1, 'admin', 'sup3rs3cr3t!', 'admin@company.com', 'administrator'),
            (2, 'john', 'john123', 'john@company.com', 'user'),
            (3, 'jane', 'password1', 'jane@company.com', 'user'),
            (4, 'bob', 'bobspassword', 'bob@company.com', 'moderator'),
        ]
        c.executemany('INSERT INTO users VALUES (?,?,?,?,?)', users)

        # Products table
        c.execute('''CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        name TEXT,
        price REAL,
        category TEXT
    )''')

        products = [
            (1, 'Laptop', 999.99, 'Electronics'),
            (2, 'Phone', 599.99, 'Electronics'),
            (3, 'Desk', 199.99, 'Furniture'),
            (4, 'Chair', 149.99, 'Furniture'),
            (5, 'SECRET_FLAG', 0.00, 'hidden'),
        ]
        c.executemany('INSERT INTO products VALUES (?,?,?,?)', products)

        # Secrets table
        c.execute('''CREATE TABLE secrets (
        id INTEGER PRIMARY KEY,
        secret_key TEXT,
        secret_value TEXT
    )''')
        c.execute("INSERT INTO secrets VALUES (1, 'FLAG', 'CONGRATS_YOU_FOUND_THE_SECRET_TABLE!')")

        conn.commit()
    finally:
        # Release the connection even if one of the statements above fails.
        conn.close()

    print("[+] Database initialized with test data")


# Page skeleton rendered with str.format(): the {login_result}, {search_result},
# {user_result} and {category_result} placeholders receive pre-built HTML
# fragments. Literal CSS braces are doubled ({{ }}) so format() emits them as-is.
HTML_TEMPLATE = '''<!DOCTYPE html>
<html>
<head>
    <title>SQLi Lab</title>
    <style>
        body {{ font-family: Arial; max-width: 900px; margin: 50px auto; background: #1a1a1a; color: #e0e0e0; }}
        h1 {{ color: #6EFF24; }}
        h2 {{ color: #6EFF24; font-size: 1.3em; }}
        .section {{ background: #252525; padding: 20px; margin: 20px 0; border-radius: 8px; border-left: 3px solid #6EFF24; }}
        input {{ padding: 10px; margin: 5px; background: #333; border: 1px solid #6EFF24; color: white; }}
        button {{ background: #6EFF24; color: black; padding: 10px 20px; border: none; cursor: pointer; font-weight: bold; }}
        table {{ border-collapse: collapse; width: 100%; margin-top: 10px; }}
        th, td {{ border: 1px solid #444; padding: 10px; text-align: left; }}
        th {{ background: #333; color: #6EFF24; }}
        .error {{ color: #ff4444; background: #331111; padding: 10px; border-radius: 5px; }}
        .success {{ color: #44ff44; background: #113311; padding: 10px; border-radius: 5px; }}
        .hint {{ color: #888; font-size: 0.9em; margin-top: 10px; }}
        code {{ background: #333; padding: 2px 6px; color: #6EFF24; }}
    </style>
</head>
<body>
    <h1>SQL Injection Lab</h1>

    <div class="section">
        <h2>1. Login (Authentication Bypass)</h2>
        <form action="/login" method="POST">
            <input type="text" name="username" placeholder="Username">
            <input type="password" name="password" placeholder="Password">
            <button type="submit">Login</button>
        </form>
        {login_result}
        <p class="hint">Hint: Try to login as admin without knowing the password</p>
    </div>

    <div class="section">
        <h2>2. Product Search (Data Extraction)</h2>
        <form action="/search" method="GET">
            <input type="text" name="q" placeholder="Search products...">
            <button type="submit">Search</button>
        </form>
        {search_result}
        <p class="hint">Hint: What other tables might exist?</p>
    </div>

    <div class="section">
        <h2>3. User Lookup (UNION Attack)</h2>
        <form action="/user" method="GET">
            <input type="text" name="id" placeholder="User ID (1-4)">
            <button type="submit">Lookup</button>
        </form>
        {user_result}
        <p class="hint">Hint: UNION SELECT lets you combine results from different tables</p>
    </div>

    <div class="section">
        <h2>4. Product Category (Error-Based)</h2>
        <form action="/category" method="GET">
            <input type="text" name="cat" placeholder="Category">
            <button type="submit">Filter</button>
        </form>
        {category_result}
        <p class="hint">Hint: Error messages can reveal database structure</p>
    </div>

    <div class="section" style="border-left-color: #ff4444;">
        <h2>Challenge Mode</h2>
        <ol>
            <li>Login as admin without the password</li>
            <li>Extract all usernames and passwords</li>
            <li>Find the hidden "secrets" table</li>
            <li>Extract the FLAG from the secrets table</li>
        </ol>
    </div>
</body>
</html>'''


class SQLiHandler(BaseHTTPRequestHandler):
    """Request handler for the SQL injection lab page.

    GET serves the page and, for ``/search``, ``/user`` and ``/category``,
    runs the matching query.  POST ``/login`` runs the login query.  Any
    other path renders the page with empty result sections.

    The SQL statements are built by string interpolation ON PURPOSE: that is
    the vulnerability the lab teaches, so they must not be parameterized.

    NOTE(review): database values and error text are interpolated into the
    HTML without escaping.  Left unchanged to preserve the lab's behavior;
    confirm whether that is intended.
    """

    def do_GET(self):
        """Render the page, filling the section that matches the request path."""
        parsed = urlparse(self.path)
        params = parse_qs(parsed.query)

        results = {}
        if parsed.path == '/search':
            q = params.get('q', [''])[0]
            # VULNERABLE: Direct string concatenation
            query = f"SELECT name, price, category FROM products WHERE name LIKE '%{q}%'"
            results['search_result'] = self._query_section(
                query, ('Name', 'Price', 'Category'),
                '<p>No products found</p>', price_column=1)

        elif parsed.path == '/user':
            uid = params.get('id', [''])[0]
            # VULNERABLE: Direct string concatenation
            query = f"SELECT id, username, email, role FROM users WHERE id = {uid}"
            results['user_result'] = self._query_section(
                query, ('ID', 'Username', 'Email', 'Role'),
                '<p>User not found</p>')

        elif parsed.path == '/category':
            cat = params.get('cat', [''])[0]
            # VULNERABLE: Direct string concatenation
            query = f"SELECT name, price FROM products WHERE category = '{cat}'"
            results['category_result'] = self._query_section(
                query, ('Name', 'Price'),
                '<p>No products in this category</p>', price_column=1)

        self._send_page(**results)

    def do_POST(self):
        """Handle the login form; any other POST path just renders the page."""
        parsed = urlparse(self.path)
        params = self._read_form_params()

        login_result = ''

        if parsed.path == '/login':
            username = params.get('username', [''])[0]
            password = params.get('password', [''])[0]

            # VULNERABLE: Direct string concatenation
            query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
            try:
                rows = self._run_query(query)
                if rows:
                    # Only the first matching row is used for the greeting.
                    user = rows[0]
                    login_result = f'<p class="success">Welcome, {user[1]}! Role: {user[4]}</p>'
                else:
                    login_result = '<p class="error">Invalid credentials</p>'
            except Exception as e:
                # Broad on purpose: whatever the database layer raises is
                # shown on the page instead of aborting the request.
                login_result = f'<p class="error">Error: {str(e)}</p>'

        self._send_page(login_result=login_result)

    def log_message(self, format, *args):
        """Silence the default per-request access log."""
        pass

    def _read_form_params(self):
        """Read and parse the URL-encoded request body.

        A missing, malformed or non-positive ``Content-Length`` is treated as
        an empty body, and undecodable bytes are replaced rather than raising.

        Returns:
            The ``parse_qs`` mapping of field name to list of values.
        """
        try:
            length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            length = 0
        # A negative length would make read() wait for end-of-stream.
        body = self.rfile.read(length) if length > 0 else b''
        return parse_qs(body.decode('utf-8', errors='replace'))

    def _run_query(self, query):
        """Log ``query`` to the terminal, execute it and return all rows.

        The connection is opened per query and always closed, even when the
        statement raises.
        """
        print(f"[SQL] {query}")
        conn = sqlite3.connect(DB_FILE)
        try:
            return conn.execute(query).fetchall()
        finally:
            conn.close()

    def _query_section(self, query, headers, empty_html, price_column=None):
        """Run ``query`` and return the HTML fragment for one page section.

        Args:
            query: SQL text to execute as-is.
            headers: Column titles for the result table.
            empty_html: Fragment returned when the query yields no rows.
            price_column: Index of the column rendered with a ``$`` prefix,
                or ``None`` for no such column.

        Returns:
            A result table, ``empty_html``, or an error paragraph containing
            the exception text.
        """
        try:
            rows = self._run_query(query)
            if not rows:
                return empty_html
            return self._render_table(headers, rows, price_column)
        except Exception as e:
            # Broad on purpose: the error-based exercise relies on the raw
            # database error message being shown on the page.
            return f'<p class="error">Error: {str(e)}</p>'

    @staticmethod
    def _render_table(headers, rows, price_column=None):
        """Build an HTML table from ``headers`` and ``rows``.

        Cells in ``price_column`` (if given) are prefixed with ``$``.
        """
        head = ''.join(f'<th>{title}</th>' for title in headers)
        body_rows = []
        for row in rows:
            cells = ''.join(
                f'<td>${value}</td>' if index == price_column else f'<td>{value}</td>'
                for index, value in enumerate(row)
            )
            body_rows.append(f'<tr>{cells}</tr>')
        body = ''.join(body_rows)
        return f'<table><tr>{head}</tr>{body}</table>'

    def _send_page(self, login_result='', search_result='', user_result='',
                   category_result=''):
        """Send a 200 response with the page, filling in the given sections."""
        page = HTML_TEMPLATE.format(
            login_result=login_result,
            search_result=search_result,
            user_result=user_result,
            category_result=category_result
        )
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(page.encode())


if __name__ == '__main__':
    # Rebuild the database so every run starts from the same seed data.
    init_db()
    # Used as a context manager so the listening socket is closed on exit.
    with HTTPServer(('localhost', 8888), SQLiHandler) as server:
        print("=" * 50)
        print("  SQL INJECTION LAB RUNNING")
        print("  Open http://localhost:8888 in your browser")
        print("  SQL queries are logged to terminal")
        print("=" * 50)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the lab; exit without a traceback.
            print("\n[+] Shutting down")