5. Database

This section covers the database architecture of the DTEM system, including design, optimization, backup, monitoring, and data-management best practices.

5.1. Database Architecture

5.1.1. General Architecture

Primary Database (PostgreSQL)

┌─────────────────────────────────────────────────────────────┐
│                    POSTGRESQL CLUSTER                       │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │
│  │   Primary   │  │   Replica 1 │  │   Replica 2 │        │
│  │   (Master)  │  │  (Read-only)│  │  (Read-only)│        │
│  │             │  │             │  │             │        │
│  │ - Read/Write│  │ - Read only │  │ - Read only │        │
│  │ - Leader    │  │ - Standby   │  │ - Standby   │        │
│  └─────────────┘  └─────────────┘  └─────────────┘        │
│         │                │                │               │
│         └────────────────┼────────────────┘               │
│                          │                                 │
│              ┌─────────────────────┐                       │
│              │   Connection Pool    │                       │
│              │   (PgBouncer)        │                       │
│              └─────────────────────┘                       │
└─────────────────────────────────────────────────────────────┘
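
The application can take advantage of this topology by sending writes to the primary (through PgBouncer) and reads to the replicas. A minimal sketch in Node.js, where the host names are illustrative assumptions, not the real deployment endpoints:

// read-write-split.js: route writes to the primary, reads to the replicas.
const { Pool } = require('pg');

// Writes go through PgBouncer in front of the primary (assumed host name).
const writePool = new Pool({ host: 'pgbouncer.internal', port: 6432, database: 'dtem_prod' });

// Reads go to a load-balanced endpoint for the read-only replicas (assumed host name).
const readPool = new Pool({ host: 'postgres-replicas.internal', port: 5432, database: 'dtem_prod' });

// Callers declare their intent; anything that mutates state uses the primary.
function getPool(isWrite) {
    return isWrite ? writePool : readPool;
}

// Usage:
// const { rows } = await getPool(false).query('SELECT ... FROM dte_documents WHERE ...');
// await getPool(true).query('UPDATE cafs SET status = $1 WHERE id = $2', ['expired', cafId]);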

Cache Layer (Redis)

┌─────────────────────────────────────────────────────────────┐
│                      REDIS CLUSTER                          │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │
│  │   Master 1  │  │   Master 2  │  │   Master 3  │        │
│  │             │  │             │  │             │        │
│  │ - Sessions  │  │ - Cache     │  │ - Queue     │        │
│  │ - User Data │  │ - DTE Cache │  │ - Jobs      │        │
│  └─────────────┘  └─────────────┘  └─────────────┘        │
│         │                │                │               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │
│  │  Replica 1  │  │  Replica 2  │  │  Replica 3  │        │
│  │             │  │             │  │             │        │
│  │ - Failover  │  │ - Failover  │  │ - Failover  │        │
│  └─────────────┘  └─────────────┘  └─────────────┘        │
└─────────────────────────────────────────────────────────────┘
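
The "DTE Cache" master lends itself to a cache-aside pattern: read Redis first, fall back to PostgreSQL on a miss, and populate the cache with a short TTL. A sketch assuming the node-redis v4 client; the key scheme and TTL are illustrative:

// dte-cache.js: cache-aside reads for DTE documents.
const { createClient } = require('redis');

const redis = createClient({ url: process.env.REDIS_URL });
redis.on('error', (err) => console.error('Redis error', err));
// redis.connect() must be awaited once at startup before any command.

async function getDte(pool, dteId) {
    const key = `dte:${dteId}`;                  // hypothetical key scheme
    const cached = await redis.get(key);
    if (cached) return JSON.parse(cached);       // cache hit

    // Cache miss: read PostgreSQL, then populate Redis with a 5-minute TTL.
    const { rows } = await pool.query('SELECT * FROM dte_documents WHERE id = $1', [dteId]);
    if (rows[0]) await redis.set(key, JSON.stringify(rows[0]), { EX: 300 });
    return rows[0];
}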

5.1.2. Database Schema

Main Tables

-- Users and Authentication
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    rut VARCHAR(12) UNIQUE,
    phone VARCHAR(20),
    status VARCHAR(20) DEFAULT 'active',
    email_verified BOOLEAN DEFAULT false,
    mfa_enabled BOOLEAN DEFAULT false,
    mfa_secret VARCHAR(32),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_login TIMESTAMP WITH TIME ZONE
);

-- Profiles and Permissions
CREATE TABLE profiles (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) NOT NULL,
    description TEXT,
    permissions JSONB NOT NULL DEFAULT '[]',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE TABLE user_profiles (
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    profile_id UUID REFERENCES profiles(id) ON DELETE CASCADE,
    assigned_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    assigned_by UUID REFERENCES users(id),
    PRIMARY KEY (user_id, profile_id)
);

-- Companies
CREATE TABLE companies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    rut VARCHAR(12) UNIQUE NOT NULL,
    business_name VARCHAR(255) NOT NULL,
    commercial_name VARCHAR(255),
    activity_code VARCHAR(10),
    address TEXT,
    city VARCHAR(100),
    region VARCHAR(100),
    country VARCHAR(50) DEFAULT 'Chile',
    phone VARCHAR(20),
    email VARCHAR(255),
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- CAF (Códigos de Autorización de Folios, folio authorization codes)
CREATE TABLE cafs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id UUID REFERENCES companies(id) ON DELETE CASCADE,
    document_type VARCHAR(10) NOT NULL,
    start_folio INTEGER NOT NULL,
    end_folio INTEGER NOT NULL,
    current_folio INTEGER NOT NULL DEFAULT 0,  -- last folio assigned; 0 = none yet
    -- clamp to start_folio - 1 so an unused CAF reports its full range
    available_folios INTEGER GENERATED ALWAYS AS (end_folio - GREATEST(current_folio, start_folio - 1)) STORED,
    xml_content TEXT NOT NULL,
    signature_data JSONB,
    status VARCHAR(20) DEFAULT 'active',
    uploaded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE,
    created_by UUID REFERENCES users(id)
);

-- DTE Documents
CREATE TABLE dte_documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id UUID REFERENCES companies(id) ON DELETE CASCADE,
    document_type VARCHAR(10) NOT NULL,
    folio INTEGER NOT NULL,
    rut_emisor VARCHAR(12) NOT NULL,
    rut_receptor VARCHAR(12) NOT NULL,
    fecha_emision DATE NOT NULL,
    monto_total NUMERIC(15,2),
    monto_exento NUMERIC(15,2) DEFAULT 0,
    monto_iva NUMERIC(15,2) DEFAULT 0,
    xml_content TEXT NOT NULL,
    pdf_path VARCHAR(500),
    signature_data JSONB,
    status VARCHAR(20) DEFAULT 'draft',
    sii_status VARCHAR(20),
    sii_track_id VARCHAR(50),
    sent_at TIMESTAMP WITH TIME ZONE,
    sii_response_at TIMESTAMP WITH TIME ZONE,
    created_by UUID REFERENCES users(id),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    
    UNIQUE(company_id, document_type, folio)
);

-- DTE Items
CREATE TABLE dte_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    dte_id UUID REFERENCES dte_documents(id) ON DELETE CASCADE,
    line_number INTEGER NOT NULL,
    sku VARCHAR(50),
    description TEXT NOT NULL,
    quantity NUMERIC(12,4) NOT NULL,
    unit_price NUMERIC(15,6) NOT NULL,
    discount_pct NUMERIC(5,2) DEFAULT 0,
    total_amount NUMERIC(15,2) NOT NULL,
    vat_rate NUMERIC(5,2) DEFAULT 19.00,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
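
How the per-line amounts in dte_items roll up into the header totals of dte_documents is implied by the columns above. A sketch of that arithmetic, under the assumptions (not confirmed business rules) that line totals are net amounts and IVA (19% by default, matching vat_rate) applies only to non-exempt lines:

// line-totals.js: roll dte_items up into monto_exento / monto_iva / monto_total.
function lineTotal(item) {
    const gross = item.quantity * item.unit_price;
    const net = gross * (1 - (item.discount_pct || 0) / 100);
    return Math.round(net * 100) / 100;          // matches NUMERIC(15,2)
}

function documentTotals(items) {
    let exempt = 0, affected = 0;
    for (const item of items) {
        const total = lineTotal(item);
        if (item.vat_rate > 0) affected += total; else exempt += total;
    }
    const iva = Math.round(affected * 0.19 * 100) / 100;   // vat_rate default 19.00
    return { monto_exento: exempt, monto_iva: iva, monto_total: exempt + affected + iva };
}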

-- Audit Logs
CREATE TABLE audit_logs (
    id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    user_id UUID REFERENCES users(id),
    company_id UUID REFERENCES companies(id),
    action VARCHAR(100) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_id VARCHAR(100),
    old_values JSONB,
    new_values JSONB,
    ip_address INET,
    user_agent TEXT,
    session_id UUID,
    success BOOLEAN NOT NULL DEFAULT true,
    error_message TEXT,
    metadata JSONB
);

-- System Configuration
CREATE TABLE system_config (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    key VARCHAR(100) UNIQUE NOT NULL,
    value JSONB NOT NULL,
    description TEXT,
    category VARCHAR(50),
    is_encrypted BOOLEAN DEFAULT false,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_by UUID REFERENCES users(id)
);

Optimized Indexes

-- Performance Indexes
-- (users.email, users.rut and companies.rut already get unique indexes from
--  their UNIQUE constraints, so separate indexes on those columns are redundant)
CREATE INDEX idx_users_status ON users(status);

CREATE INDEX idx_companies_status ON companies(status);

CREATE INDEX idx_cafs_company_type ON cafs(company_id, document_type);
CREATE INDEX idx_cafs_status ON cafs(status);
CREATE INDEX idx_cafs_expires ON cafs(expires_at);

CREATE INDEX idx_dte_company_type ON dte_documents(company_id, document_type);
CREATE INDEX idx_dte_status ON dte_documents(status);
CREATE INDEX idx_dte_emision ON dte_documents(fecha_emision);
CREATE INDEX idx_dte_receptor ON dte_documents(rut_receptor);
CREATE INDEX idx_dte_sii_status ON dte_documents(sii_status);

CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp DESC);
CREATE INDEX idx_audit_user ON audit_logs(user_id);
CREATE INDEX idx_audit_action ON audit_logs(action);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);

-- Full Text Search Indexes
CREATE INDEX idx_dte_search ON dte_documents USING gin(
    to_tsvector('spanish', 
        COALESCE(rut_receptor, '') || ' ' || 
        COALESCE(folio::text, '') || ' ' ||
        COALESCE(document_type, '')
    )
);
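
To hit this index, a query has to reproduce the exact expression it was built on. A sketch of the matching search from the application, where plainto_tsquery keeps user input safe to pass as a bind parameter:

// fts-search.js: the WHERE clause must rebuild the same to_tsvector expression
// as idx_dte_search, otherwise the planner falls back to a sequential scan.
const searchSql = `
    SELECT id, document_type, folio, rut_receptor
    FROM dte_documents
    WHERE to_tsvector('spanish',
              COALESCE(rut_receptor, '') || ' ' ||
              COALESCE(folio::text, '') || ' ' ||
              COALESCE(document_type, '')
          ) @@ plainto_tsquery('spanish', $1)`;
// const { rows } = await pool.query(searchSql, ['12345678-9']);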

5.2. Performance Optimization

5.2.1. Query Optimization

Query Analysis and Tuning

-- Slow Query Analysis (requires the pg_stat_statements extension; the column
-- names below are for PostgreSQL 13+, older versions use total_time/mean_time)
SELECT 
    query,
    calls,
    total_exec_time,
    mean_exec_time,
    rows,
    100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;

-- Index Candidates (heuristic: high-cardinality columns on the hottest tables)
SELECT 
    schemaname,
    tablename,
    attname,
    n_distinct,
    correlation
FROM pg_stats
WHERE schemaname = 'public'
AND tablename IN ('dte_documents', 'cafs', 'users')
AND n_distinct > 100
ORDER BY n_distinct DESC;

-- Index Usage Analysis
SELECT 
    schemaname,
    relname AS tablename,
    indexrelname AS indexname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC;

Optimized Queries

-- Efficient DTE Search with Pagination
WITH filtered_dtes AS (
    SELECT 
        id,
        document_type,
        folio,
        rut_receptor,
        fecha_emision,
        monto_total,
        status,
        created_at
    FROM dte_documents 
    WHERE company_id = $1
        AND ($2::text IS NULL OR document_type = $2)
        AND ($3::date IS NULL OR fecha_emision >= $3)
        AND ($4::date IS NULL OR fecha_emision <= $4)
        AND ($5::text IS NULL OR rut_receptor ILIKE '%' || $5 || '%')
)
SELECT 
    *,
    COUNT(*) OVER() as total_count
FROM filtered_dtes
ORDER BY created_at DESC  -- ordering belongs on the outer query; row order from a CTE is not guaranteed
LIMIT $6 OFFSET $7;
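
A sketch of invoking the statement above from the application, where dteSearchSql is assumed to hold that SQL; optional filters are passed as NULL so the "$n IS NULL OR ..." guards disable them:

// search-dtes.js: parameter order follows $1..$7 in the query above.
async function searchDtes(pool, companyId, { documentType, from, to, receptor, page = 1, pageSize = 50 } = {}) {
    const params = [
        companyId,
        documentType ?? null,
        from ?? null,
        to ?? null,
        receptor ?? null,
        pageSize,
        (page - 1) * pageSize,
    ];
    const { rows } = await pool.query(dteSearchSql, params);
    return { rows, total: rows[0]?.total_count ?? 0 };
}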

-- Optimized CAF Availability Check
SELECT 
    id,
    start_folio,
    end_folio,
    current_folio,
    available_folios
FROM cafs 
WHERE company_id = $1 
    AND document_type = $2 
    AND status = 'active'
    AND available_folios > 0
    AND (expires_at IS NULL OR expires_at > NOW())
ORDER BY uploaded_at ASC  -- consume the oldest CAF first (cafs has uploaded_at, not created_at)
LIMIT 1 FOR UPDATE;
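
The FOR UPDATE lock above is what makes folio assignment safe under concurrency: the CAF row stays locked until the transaction that consumed the folio commits. A sketch of the full sequence, assuming cafAvailabilitySql holds the query above:

// folio-assignment.js: lock the CAF row, advance the counter, commit.
async function assignFolio(pool, companyId, documentType) {
    const client = await pool.connect();
    try {
        await client.query('BEGIN');
        const caf = await client.query(cafAvailabilitySql, [companyId, documentType]);
        if (caf.rows.length === 0) throw new Error('No folios available');

        const row = caf.rows[0];
        // First assignment starts at start_folio; afterwards, current_folio + 1.
        const folio = Math.max(row.current_folio + 1, row.start_folio);
        await client.query('UPDATE cafs SET current_folio = $1 WHERE id = $2', [folio, row.id]);
        // ...INSERT INTO dte_documents with this folio goes here, same transaction...
        await client.query('COMMIT');
        return folio;
    } catch (err) {
        await client.query('ROLLBACK');
        throw err;
    } finally {
        client.release();
    }
}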

-- Efficient Audit Log Query
SELECT 
    timestamp,
    user_id,
    action,
    resource_type,
    resource_id,
    success
FROM audit_logs 
WHERE timestamp >= NOW() - INTERVAL '24 hours'
    AND ($1::uuid IS NULL OR user_id = $1)
    AND ($2::text IS NULL OR action = $2)
ORDER BY timestamp DESC
LIMIT 100;

5.2.2. Connection Pooling

PgBouncer Configuration

# pgbouncer.ini
[databases]
dtem_prod = host=postgres.internal port=5432 dbname=dtem_prod

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
logfile = /var/log/pgbouncer/pgbouncer.log
pidfile = /var/run/pgbouncer/pgbouncer.pid
admin_users = postgres
stats_users = stats, postgres

# Pool settings
pool_mode = transaction
max_client_conn = 200
default_pool_size = 20
min_pool_size = 5
reserve_pool_size = 5
reserve_pool_timeout = 5
max_db_connections = 50
max_user_connections = 50

# Timeouts
server_reset_query = DISCARD ALL
server_check_delay = 30
server_check_query = select 1
server_lifetime = 3600
server_idle_timeout = 600

# Logging
log_connections = 1
log_disconnections = 1
log_pooler_errors = 1
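
One caveat with pool_mode = transaction: session state (named prepared statements, session-level SET, advisory locks) does not survive across transactions, because consecutive statements from one client may land on different server connections. With node-postgres this mainly means avoiding named statements; an illustrative contrast:

// pgbouncer-safe.js: what transaction pooling does and does not allow.
async function findUser(pool, email) {
    // Safe: unnamed prepared statement, fully contained in one round trip.
    return pool.query('SELECT * FROM users WHERE email = $1', [email]);
}

async function findUserNamed(pool, email) {
    // NOT safe under pool_mode = transaction: the named statement is prepared
    // on one server connection but may later be executed on another.
    return pool.query({ name: 'get-user', text: 'SELECT * FROM users WHERE email = $1', values: [email] });
}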

Application Connection Pool

// PostgreSQL Connection Pool Configuration
const fs = require('fs');
const { Pool } = require('pg');

const pool = new Pool({
    host: process.env.DB_HOST,
    port: process.env.DB_PORT,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
    
    // Pool configuration
    max: 20, // Maximum number of clients in the pool
    min: 5,  // Minimum number of clients in the pool
    idleTimeoutMillis: 30000, // How long a client is allowed to remain idle
    connectionTimeoutMillis: 2000, // How long to wait for connection
    
    // SSL configuration
    ssl: process.env.NODE_ENV === 'production' ? {
        rejectUnauthorized: true,
        ca: fs.readFileSync('/app/certs/ca.crt').toString()
    } : false
});

// Connection monitoring
pool.on('connect', (client) => {
    console.log('New connection established');
});

pool.on('error', (err, client) => {
    console.error('Unexpected error on idle client', err);
});

// Query wrapper that logs slow statements; it only measures, it does not
// cancel (see the statement_timeout sketch after this block for cancellation)
async function queryWithSlowLog(text, params, slowMs = 5000) {
    const start = Date.now();
    
    try {
        const result = await pool.query(text, params);
        const duration = Date.now() - start;
        
        if (duration > slowMs) {
            console.warn(`Slow query detected: ${duration}ms`);
        }
        
        return result;
    } catch (error) {
        console.error('Query error:', error);
        throw error;
    }
}
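
To actually cancel a runaway statement server-side, statement_timeout can be set per transaction. A sketch (the function name is illustrative); SET LOCAL scopes the setting to this transaction only, which also makes it safe behind PgBouncer's transaction pooling:

// Enforce a real server-side timeout for a single statement.
async function queryWithStatementTimeout(text, params, timeoutMs = 5000) {
    const client = await pool.connect();
    try {
        await client.query('BEGIN');
        await client.query(`SET LOCAL statement_timeout = ${Number(timeoutMs)}`);
        const result = await client.query(text, params);
        await client.query('COMMIT');
        return result;
    } catch (error) {
        await client.query('ROLLBACK'); // also fires on timeout (SQLSTATE 57014)
        throw error;
    } finally {
        client.release();
    }
}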

5.3. Backup and Recovery

5.3.1. Backup Strategy

Automated Backup Scripts

#!/bin/bash
# backup-database.sh

set -euo pipefail

# Configuration
DB_HOST="localhost"
DB_PORT="5432"
DB_NAME="dtem_prod"
DB_USER="postgres"
BACKUP_DIR="/backups/postgresql"
RETENTION_DAYS=30
S3_BUCKET="s3://dtem-backups/database"

# Create backup directory
mkdir -p "$BACKUP_DIR"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/dtem_backup_$TIMESTAMP.dump"

# Perform backup
echo "Starting database backup..."
pg_dump -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" \
    --verbose \
    --clean \
    --if-exists \
    --format=custom \
    --compress=9 \
    --file="$BACKUP_FILE"

# Verify backup
echo "Verifying backup integrity..."
pg_restore --list "$BACKUP_FILE" > /dev/null

# Note: custom-format dumps are already compressed (--compress=9), so no extra gzip pass is needed

# Upload to S3
echo "Uploading backup to S3..."
aws s3 cp "$BACKUP_FILE" "$S3_BUCKET/$(basename "$BACKUP_FILE")"

# Clean old backups
echo "Cleaning old backups..."
find "$BACKUP_DIR" -name "*.dump" -mtime +$RETENTION_DAYS -delete

# Clean S3 old backups
aws s3 ls "$S3_BUCKET/" | while read -r line; do
    createDate=$(echo "$line" | awk '{print $1" "$2}')
    createDate=$(date -d "$createDate" +%s)
    olderThan=$(date -d "$RETENTION_DAYS days ago" +%s)
    
    if [[ $createDate -lt $olderThan ]]; then
        fileName=$(echo "$line" | awk '{print $4}')
        if [[ $fileName != "" ]]; then
            aws s3 rm "$S3_BUCKET/$fileName"
        fi
    fi
done

echo "Backup completed successfully: $BACKUP_FILE"

Point-in-Time Recovery (PITR)

#!/bin/bash
# pitr-restore.sh

set -euo pipefail

# Configuration
TARGET_TIME="$1"  # Format: "2023-12-01 15:30:00"
RECOVERY_DIR="/tmp/recovery"
DB_NAME="dtem_prod"
DB_USER="postgres"

# Create recovery directory
mkdir -p "$RECOVERY_DIR"

# Extract base backup
echo "Extracting base backup..."
aws s3 cp "s3://dtem-backups/database/latest_base.tar.gz" - | \
    tar -xzf - -C "$RECOVERY_DIR"

# Configure recovery (PostgreSQL 12+: recovery settings go in
# postgresql.auto.conf plus an empty recovery.signal file; on 11 and
# older, put the same settings in a recovery.conf instead)
cat >> "$RECOVERY_DIR/postgresql.auto.conf" << EOF
restore_command = 'aws s3 cp s3://dtem-backups/wal/%f %p'
recovery_target_time = '$TARGET_TIME'
recovery_target_inclusive = true
recovery_target_action = 'pause'
EOF
touch "$RECOVERY_DIR/recovery.signal"

# Start PostgreSQL in recovery mode
echo "Starting PostgreSQL recovery..."
export PGDATA="$RECOVERY_DIR"
pg_ctl start -D "$RECOVERY_DIR" -l "$RECOVERY_DIR/recovery.log"

# Wait for recovery to complete
while ! pg_isready -q; do
    echo "Waiting for recovery to complete..."
    sleep 5
done

# Promote to primary
echo "Promoting to primary..."
pg_ctl promote -D "$RECOVERY_DIR"

echo "Recovery completed successfully"

5.3.2. Backup Verification

Automated Backup Testing

#!/bin/bash
# test-backup.sh

set -euo pipefail

# Configuration
TEST_DB="dtem_test_restore"
TEMP_DIR="/tmp/backup_test_$$"

# Create temporary directory
mkdir -p "$TEMP_DIR"

# Download latest backup
echo "Downloading backup..."
aws s3 cp "s3://dtem-backups/database/latest/" "$TEMP_DIR/" --recursive

# Find latest backup file
LATEST_BACKUP=$(ls -t "$TEMP_DIR"/*.dump | head -n1)

# Create test database
echo "Creating test database..."
createdb -U postgres "$TEST_DB"

# Restore backup (custom-format dump, so pg_restore rather than psql)
echo "Restoring backup..."
pg_restore -U postgres -d "$TEST_DB" "$LATEST_BACKUP"

# Verify data integrity
echo "Verifying data integrity..."
PSQL_RESULT=$(psql -U postgres -d "$TEST_DB" -t -c "
    SELECT 
        (SELECT COUNT(*) FROM users) as users_count,
        (SELECT COUNT(*) FROM companies) as companies_count,
        (SELECT COUNT(*) FROM dte_documents) as dte_count,
        (SELECT COUNT(*) FROM cafs) as cafs_count;
")

echo "Data verification results:"
echo "$PSQL_RESULT"

# Check for data consistency
CONSISTENCY_CHECK=$(psql -U postgres -d "$TEST_DB" -t -c "
    SELECT 
        CASE 
            WHEN COUNT(*) = 0 THEN 'PASS'
            ELSE 'FAIL'
        END as consistency_status
    FROM dte_documents 
    WHERE folio IS NULL OR company_id IS NULL;
")

if [[ "$CONSISTENCY_CHECK" == *"PASS"* ]]; then
    echo "✅ Backup verification PASSED"
    EXIT_CODE=0
else
    echo "❌ Backup verification FAILED"
    EXIT_CODE=1
fi

# Cleanup
echo "Cleaning up..."
dropdb -U postgres "$TEST_DB"
rm -rf "$TEMP_DIR"

exit $EXIT_CODE

5.4. Database Monitoring

5.4.1. Performance Monitoring

PostgreSQL Metrics Collection

-- Performance monitoring queries

-- Connection monitoring
SELECT 
    state,
    COUNT(*) as connection_count,
    AVG(EXTRACT(EPOCH FROM (now() - query_start))) as avg_query_time
FROM pg_stat_activity 
WHERE datname = current_database()
GROUP BY state;

-- Table size monitoring
SELECT 
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
    pg_total_relation_size(schemaname||'.'||tablename) as size_bytes
FROM pg_tables 
WHERE schemaname = 'public'
ORDER BY size_bytes DESC;

-- Index usage monitoring
SELECT 
    schemaname,
    relname AS tablename,
    indexrelname AS indexname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch,
    pg_size_pretty(pg_relation_size(indexrelid)) as index_size
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC;

-- Lock monitoring
SELECT 
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS current_statement_in_blocking_process
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
    AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
    AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
    AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
    AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
    AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
    AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

Prometheus Metrics Exporter

# postgres_exporter configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-exporter-config
  namespace: monitoring
data:
  queries.yml: |
    pg_stat_user_tables:
      query: |
        SELECT 
          schemaname,
          relname AS tablename,
          seq_scan,
          seq_tup_read,
          idx_scan,
          idx_tup_fetch,
          n_tup_ins,
          n_tup_upd,
          n_tup_del,
          n_live_tup,
          n_dead_tup
        FROM pg_stat_user_tables
      metrics:
        - schemaname:
            usage: "LABEL"
            description: "Schema name"
        - tablename:
            usage: "LABEL"
            description: "Table name"
        - seq_scan:
            usage: "COUNTER"
            description: "Number of sequential scans"
        - idx_scan:
            usage: "COUNTER"
            description: "Number of index scans"
        - n_live_tup:
            usage: "GAUGE"
            description: "Number of live tuples"
        - n_dead_tup:
            usage: "GAUGE"
            description: "Number of dead tuples"

5.4.2. Alerting Rules

Database Alerting

# database-alerts.yml
groups:
  - name: postgresql
    rules:
      - alert: PostgreSQLDown
        expr: up{job="postgres"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "PostgreSQL is down"
          description: "PostgreSQL has been down for more than 1 minute"

      - alert: PostgreSQLTooManyConnections
        expr: pg_stat_activity_count > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many PostgreSQL connections"
          description: "PostgreSQL has more than 80 connections"

      - alert: PostgreSQLSlowQueries
        expr: pg_stat_statements_mean_time_seconds > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL slow queries detected"
          description: "PostgreSQL average query time is {{ $value }} seconds"

      - alert: PostgreSQLHighDiskUsage
        expr: pg_database_size_bytes / 1024 / 1024 / 1024 > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL high disk usage"
          description: "PostgreSQL database size is {{ $value }}GB"

      - alert: PostgreSQLReplicationLag
        expr: pg_replication_lag_seconds > 60
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL replication lag"
          description: "PostgreSQL replication lag is {{ $value }} seconds"

5.5. Database Maintenance

5.5.1. Routine Maintenance

Automated Maintenance Scripts

#!/bin/bash
# db-maintenance.sh

set -euo pipefail

DB_NAME="dtem_prod"
DB_USER="postgres"

echo "Starting database maintenance..."

# Update table statistics
echo "Updating table statistics..."
psql -U "$DB_USER" -d "$DB_NAME" -c "ANALYZE;"

# Reindex fragmented indexes
echo "Reindexing fragmented indexes..."
psql -U "$DB_USER" -d "$DB_NAME" -At -F' ' -c "
    SELECT 
        schemaname,
        relname,
        indexrelname
    FROM pg_stat_user_indexes 
    WHERE idx_scan > 1000 
        AND pg_relation_size(indexrelid) > 100 * 1024 * 1024
        AND (schemaname, relname) IN (
            SELECT schemaname, relname 
            FROM pg_stat_user_tables 
            WHERE n_dead_tup > n_live_tup * 0.1
        );
" | while read -r schema table index; do
    echo "Reindexing $schema.$table.$index"
    psql -U "$DB_USER" -d "$DB_NAME" -c "REINDEX INDEX CONCURRENTLY $schema.$index;"
done

# Vacuum and analyze tables
echo "Vacuuming and analyzing tables..."
psql -U "$DB_USER" -d "$DB_NAME" -At -F' ' -c "
    SELECT 
        schemaname,
        relname
    FROM pg_stat_user_tables 
    WHERE n_dead_tup > 1000 
        OR n_mod_since_analyze > 1000;
" | while read -r schema table; do
    echo "Vacuuming $schema.$table"
    psql -U "$DB_USER" -d "$DB_NAME" -c "VACUUM ANALYZE $schema.$table;"
done

# Clean up old audit logs
echo "Cleaning up old audit logs..."
psql -U "$DB_USER" -d "$DB_NAME" -c "
    DELETE FROM audit_logs 
    WHERE timestamp < NOW() - INTERVAL '2 years';
"

# Update table statistics again
echo "Final statistics update..."
psql -U "$DB_USER" -d "$DB_NAME" -c "ANALYZE;"

echo "Database maintenance completed successfully"

5.5.2. Partitioning Strategy

Time-based Partitioning

-- Partitioned audit_logs table. A primary key on a partitioned table must
-- include the partition key, so the key becomes (id, timestamp).
CREATE TABLE audit_logs_partitioned (
    LIKE audit_logs INCLUDING DEFAULTS,
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE audit_logs_2023_01 PARTITION OF audit_logs_partitioned
    FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');

CREATE TABLE audit_logs_2023_02 PARTITION OF audit_logs_partitioned
    FOR VALUES FROM ('2023-02-01') TO ('2023-03-01');

-- Automated partition creation function
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
    partition_name text;
    end_date date;
BEGIN
    partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
    end_date := start_date + interval '1 month';
    
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF %I
                    FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, start_date, end_date);
    
    EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON %I (timestamp)',
                   partition_name || '_timestamp', partition_name);
END;
$$ LANGUAGE plpgsql;

-- Automated partition management
CREATE OR REPLACE FUNCTION manage_partitions()
RETURNS void AS $$
DECLARE
    current_month date;
    next_month date;
    old_partition text;
BEGIN
    current_month := date_trunc('month', CURRENT_DATE);
    next_month := current_month + interval '1 month';
    
    -- Create next month's partition
    PERFORM create_monthly_partition('audit_logs_partitioned', next_month);
    
    -- Drop partitions older than 2 years
    FOR old_partition IN 
        SELECT tablename 
        FROM pg_tables 
        WHERE tablename LIKE 'audit_logs_%' 
          AND tablename < 'audit_logs_' || to_char(CURRENT_DATE - INTERVAL '2 years', 'YYYY_MM')
    LOOP
        EXECUTE format('DROP TABLE IF EXISTS %I', old_partition);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
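
manage_partitions() still has to run on a schedule. A sketch that calls it monthly from the application layer, assuming the node-cron package (pg_cron or a system crontab entry would serve the same purpose):

// schedule-partitions.js: create next month's partition and prune old ones.
const cron = require('node-cron');

// At 03:00 on the 1st of every month.
cron.schedule('0 3 1 * *', async () => {
    try {
        await pool.query('SELECT manage_partitions()');
        console.log('Partition maintenance completed');
    } catch (err) {
        console.error('Partition maintenance failed', err);
    }
});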

5.6. Data Migration

5.6.1. Schema Migration

Version-controlled Migrations

// migration-manager.js
class MigrationManager {
    constructor(pool) {
        this.pool = pool;
    }
    
    async initialize() {
        await this.pool.query(`
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version VARCHAR(255) PRIMARY KEY,
                applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
        `);
    }
    
    async migrate() {
        const migrations = await this.loadMigrations();
        const applied = await this.getAppliedMigrations();
        
        for (const migration of migrations) {
            if (!applied.includes(migration.version)) {
                console.log(`Applying migration: ${migration.version}`);
                await this.applyMigration(migration);
            }
        }
    }
    
    async loadMigrations() {
        const fs = require('fs');
        const path = require('path');
        const migrationsDir = path.join(__dirname, 'migrations');
        
        const files = fs.readdirSync(migrationsDir)
            .filter(f => f.endsWith('.sql'))
            .sort();
            
        return files.map(file => ({
            version: file.replace('.sql', ''),
            filename: file,
            sql: fs.readFileSync(path.join(migrationsDir, file), 'utf8')
        }));
    }
    
    async getAppliedMigrations() {
        const result = await this.pool.query(
            'SELECT version FROM schema_migrations ORDER BY version'
        );
        return result.rows.map(row => row.version);
    }
    
    async applyMigration(migration) {
        const client = await this.pool.connect();
        
        try {
            await client.query('BEGIN');
            await client.query(migration.sql);
            await client.query(
                'INSERT INTO schema_migrations (version) VALUES ($1)',
                [migration.version]
            );
            await client.query('COMMIT');
        } catch (error) {
            await client.query('ROLLBACK');
            throw error;
        } finally {
            client.release();
        }
    }
}
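
Typical usage at application startup:

// Run all pending migrations before the server starts accepting traffic.
const manager = new MigrationManager(pool);

(async () => {
    await manager.initialize();
    await manager.migrate();
    console.log('Schema is up to date');
})().catch((err) => {
    console.error('Migration failed', err);
    process.exit(1);
});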

Next section: 6. Technical Integrations