PostgreSQL Driver

The qail-pg crate provides a native PostgreSQL driver with AST-native wire protocol encoding. It communicates directly with Postgres at the wire level — no libpq, no ORM, and no app-side SQL interpolation on the AST path.

Features

  • AST-Native — Direct AST to wire protocol on the primary query path
  • Zero-Alloc — Reusable buffers, no heap allocation per query
  • LRU Statement Cache — Bounded cache (100 max), auto-evicts
  • SSL/TLS — Full TLS with mutual TLS (mTLS) support
  • Password Auth Modes — Supports SCRAM-SHA-256, MD5, and cleartext server flows
  • Enterprise Auth Policy — Configure allowed auth mechanisms and SCRAM channel binding mode
  • Kerberos/GSS/SSPI Hooks — Protocol-level support with pluggable token providers (legacy + stateful)
  • Built-in Linux Kerberos Provider — Optional enterprise-gssapi feature for native krb5/GSS flow
  • Protocol Negotiation — Requests startup protocol 3.2 by default with one-shot fallback to 3.0 on explicit server version rejection
  • Cancel-Key Compatibility — Native bytes cancel-key APIs with legacy i32 wrappers retained for 4-byte keys
  • Connection Pooling — Efficient resource management with RLS-safe checkout
  • COPY Protocol — Bulk insert for high throughput (1.63M rows/sec)
  • Pipeline Execution — Multiple queries per round-trip
  • Cursors — Stream large result sets
  • Transactions — BEGIN/COMMIT/ROLLBACK
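The protocol negotiation bullet can be illustrated with a small sketch. It does not show qail-pg internals: `protocol_code`, `encode_startup`, `StartupError`, and `negotiate` are hypothetical names, and only the packet layout (self-inclusive Int32 length, Int32 version with the major number in the high 16 bits, NUL-terminated key/value pairs, trailing NUL) comes from the PostgreSQL wire protocol.

```rust
/// Protocol version as sent in the startup packet: major in the high 16 bits.
fn protocol_code(major: u16, minor: u16) -> u32 {
    ((major as u32) << 16) | minor as u32
}

/// Encode a StartupMessage: Int32 length (self-inclusive), Int32 version,
/// NUL-terminated key/value pairs, then a final NUL byte.
fn encode_startup(major: u16, minor: u16, params: &[(&str, &str)]) -> Vec<u8> {
    let mut body = Vec::new();
    body.extend_from_slice(&protocol_code(major, minor).to_be_bytes());
    for (key, value) in params {
        body.extend_from_slice(key.as_bytes());
        body.push(0);
        body.extend_from_slice(value.as_bytes());
        body.push(0);
    }
    body.push(0);

    let mut msg = Vec::with_capacity(body.len() + 4);
    msg.extend_from_slice(&((body.len() + 4) as u32).to_be_bytes());
    msg.extend_from_slice(&body);
    msg
}

/// Hypothetical error type for one startup attempt.
#[derive(Debug, PartialEq)]
enum StartupError {
    VersionRejected,
    Other(String),
}

/// Try 3.2 first; fall back to 3.0 exactly once, and only when the
/// server explicitly rejected the requested version.
fn negotiate(
    mut attempt: impl FnMut(u16, u16) -> Result<(), StartupError>,
) -> Result<(u16, u16), StartupError> {
    match attempt(3, 2) {
        Ok(()) => Ok((3, 2)),
        Err(StartupError::VersionRejected) => attempt(3, 0).map(|()| (3, 0)),
        Err(e) => Err(e),
    }
}
```

Any other failure (network error, auth failure) is returned without a retry, which matches the "one-shot fallback on explicit rejection" wording above.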

Architecture

Understanding which type to use is the most important concept:

PgPool (manages N connections, handles checkout/return)
  └── PooledConnection (call `release().await` for deterministic pool return)
        └── PgConnection (raw TCP/TLS stream, wire protocol I/O)
              └── PgDriver (convenience wrapper over PgConnection)

| Type | Use When |
| --- | --- |
| PgDriver | Quick scripts, benchmarks, single-connection use cases |
| PgConnection | You need raw control (TLS, mTLS, Unix sockets, manual lifecycle) |
| PgPool | Production code: multi-connection, concurrent workloads |
| PooledConnection | You called pool.acquire*(); call release().await when done |

Rule of thumb: If you’re building a server, use PgPool. Everything else is for specialized cases.


Connection Methods

Choose based on your deployment:

| Scenario | Method | Notes |
| --- | --- | --- |
| Local dev (pg_hba.conf = trust) | PgDriver::connect() | No password required |
| Password auth (most common) | PgDriver::connect_with_password() | Auto cleartext / MD5 / SCRAM-SHA-256 |
| Cloud DB (RDS, Cloud SQL, Supabase) | PgConnection::connect_tls() | Server-side TLS |
| Zero-trust / mTLS | PgConnection::connect_mtls() | Client certificate |
| Enterprise policy (TLS/auth/channel binding) | PgDriver::connect_with_options() | Explicit TLS mode + auth controls |
| Unix socket (same host) | PgConnection::connect_unix() | Lowest latency |
| .env / DATABASE_URL | PgDriver::connect_env() | Parses URL format |
| Custom config | PgDriver::builder() | Builder pattern for full control |

Basic Connection

use qail_pg::PgDriver;

// Trust mode (no password)
let driver = PgDriver::connect("localhost", 5432, "user", "db").await?;

// With password (handles cleartext, MD5, or SCRAM-SHA-256, whichever the server requests)
let driver = PgDriver::connect_with_password(
    "localhost", 5432, "user", "db", "password"
).await?;

// From the DATABASE_URL environment variable
let driver = PgDriver::connect_env().await?;
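To show what "parses URL format" typically involves for a DATABASE_URL, here is a minimal sketch. It is not the parser behind connect_env(): `DbUrl` and `parse_database_url` are hypothetical, query parameters are ignored, and percent-decoding is omitted.

```rust
#[derive(Debug, PartialEq)]
struct DbUrl {
    user: String,
    password: Option<String>,
    host: String,
    port: u16,
    database: String,
}

/// Parse `postgres://user[:password]@host[:port]/database[?params]`.
/// Assumes no percent-encoding; a missing port defaults to 5432.
fn parse_database_url(url: &str) -> Result<DbUrl, String> {
    let rest = url
        .strip_prefix("postgres://")
        .or_else(|| url.strip_prefix("postgresql://"))
        .ok_or("expected postgres:// or postgresql:// scheme")?;

    // Drop any ?query parameters; this sketch ignores them.
    let rest = rest.split('?').next().unwrap_or(rest);

    let (authority, database) = rest.split_once('/').ok_or("missing /database")?;

    // Split on the last '@' so an '@' inside the password is tolerated.
    let (creds, hostport) = authority.rsplit_once('@').ok_or("missing user@host")?;

    let (user, password) = match creds.split_once(':') {
        Some((u, p)) => (u, Some(p.to_string())),
        None => (creds, None),
    };

    let (host, port) = match hostport.rsplit_once(':') {
        Some((h, p)) => (h, p.parse::<u16>().map_err(|e| e.to_string())?),
        None => (hostport, 5432),
    };

    if user.is_empty() || host.is_empty() || database.is_empty() {
        return Err("user, host, and database must be non-empty".to_string());
    }

    Ok(DbUrl {
        user: user.to_string(),
        password,
        host: host.to_string(),
        port,
        database: database.to_string(),
    })
}
```

For example, `postgres://app:secret@db.internal:6543/app_db` yields user `app`, port 6543, and database `app_db`.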

SSL/TLS

use qail_pg::PgConnection;

// Standard TLS: verifies the server certificate
let conn = PgConnection::connect_tls("localhost", 5432, "user", "db").await?;

Mutual TLS (Client Certificates)

use qail_pg::{PgConnection, TlsConfig};

// cert_bytes, key_bytes, and ca_bytes are PEM-encoded bytes loaded by the caller
let config = TlsConfig {
    client_cert_pem: cert_bytes,
    client_key_pem: key_bytes,
    ca_cert_pem: Some(ca_bytes),
};

let conn = PgConnection::connect_mtls("localhost", 5432, "user", "db", config).await?;

Enterprise Auth/TLS Policy

use qail_pg::{AuthSettings, ConnectOptions, PgDriver, ScramChannelBindingMode, TlsMode};

let options = ConnectOptions {
    tls_mode: TlsMode::Require,
    auth: AuthSettings {
        allow_cleartext_password: false,
        allow_md5_password: false,
        allow_scram_sha_256: true,
        channel_binding: ScramChannelBindingMode::Require,
        ..AuthSettings::default()
    },
    ..Default::default()
};

let driver = PgDriver::connect_with_options(
    "db.internal",
    5432,
    "app_user",
    "app_db",
    Some("secret"),
    options,
)
.await?;
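The effect of the allow_* flags can be sketched as a check run when the server sends its authentication request. This is an illustration only, not qail-pg's implementation: `AuthRequest`, `Policy`, and `check_policy` are hypothetical, while the numeric codes (3, 5, 10) are the PostgreSQL protocol's authentication request codes.

```rust
/// Password-style authentication requests a server can send.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AuthRequest {
    CleartextPassword, // AuthenticationCleartextPassword, code 3
    Md5Password,       // AuthenticationMD5Password, code 5
    Sasl,              // AuthenticationSASL, code 10 (carries SCRAM-SHA-256)
}

fn auth_request_from_code(code: u32) -> Option<AuthRequest> {
    match code {
        3 => Some(AuthRequest::CleartextPassword),
        5 => Some(AuthRequest::Md5Password),
        10 => Some(AuthRequest::Sasl),
        _ => None,
    }
}

/// Hypothetical mirror of the allow_* flags shown above.
struct Policy {
    allow_cleartext_password: bool,
    allow_md5_password: bool,
    allow_scram_sha_256: bool,
}

/// Reject the handshake before any credential is sent if the server
/// asks for a mechanism the policy does not allow.
fn check_policy(policy: &Policy, request: AuthRequest) -> Result<(), String> {
    let allowed = match request {
        AuthRequest::CleartextPassword => policy.allow_cleartext_password,
        AuthRequest::Md5Password => policy.allow_md5_password,
        AuthRequest::Sasl => policy.allow_scram_sha_256,
    };
    if allowed {
        Ok(())
    } else {
        Err(format!("server requested {:?}, which policy forbids", request))
    }
}
```

The point of checking before replying is that a downgraded or misconfigured server never receives a cleartext or MD5-hashed password from a SCRAM-only client.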

Kerberos / GSSAPI Token Hook

use qail_pg::{
    AuthSettings, ConnectOptions, EnterpriseAuthMechanism, PgDriver,
};

fn gss_provider(
    mech: EnterpriseAuthMechanism,
    challenge: Option<&[u8]>,
) -> Result<Vec<u8>, String> {
    // Plug your krb5/gssapi token generation here.
    // Return initial token when challenge=None, then continue tokens per challenge.
    let _ = (mech, challenge);
    Err("not wired yet".to_string())
}

let options = ConnectOptions {
    auth: AuthSettings::gssapi_only(),
    gss_token_provider: Some(gss_provider),
    ..Default::default()
};

let _driver = PgDriver::connect_with_options(
    "db.internal", 5432, "app_user", "app_db", None, options
).await?;

Stateful GSS Provider (Per-Session Context)

use std::sync::Arc;
use qail_pg::{AuthSettings, ConnectOptions, GssTokenRequest, PgDriver};

let provider = Arc::new(move |req: GssTokenRequest<'_>| -> Result<Vec<u8>, String> {
    // req.session_id is stable for one auth handshake.
    // Keep per-session context in your own map if your GSS stack requires it.
    let _ = req;
    Err("wire your stateful provider".to_string())
});

let options = ConnectOptions {
    auth: AuthSettings::gssapi_only(),
    gss_token_provider_ex: Some(provider),
    ..Default::default()
};

let _driver = PgDriver::connect_with_options(
    "db.internal", 5432, "app_user", "app_db", None, options
).await?;

Built-in Linux Kerberos Provider (Feature-Gated)

#[cfg(all(feature = "enterprise-gssapi", target_os = "linux"))]
{
    use qail_pg::{
        AuthSettings, ConnectOptions, LinuxKrb5ProviderConfig, PgDriver,
        linux_krb5_preflight, linux_krb5_token_provider,
    };

    let gss_cfg = LinuxKrb5ProviderConfig {
        service: "postgres".to_string(),
        host: "db.internal".to_string(),
        target_name: None, // optional override, e.g. Some("postgres@db.internal".into())
    };

    let report = linux_krb5_preflight(&gss_cfg)?;
    for warning in &report.warnings {
        eprintln!("Kerberos preflight warning: {}", warning);
    }

    let provider = linux_krb5_token_provider(gss_cfg)?;

    let options = ConnectOptions {
        auth: AuthSettings::gssapi_only(),
        gss_token_provider_ex: Some(provider),
        ..Default::default()
    };

    let _driver = PgDriver::connect_with_options(
        "db.internal", 5432, "app_user", "app_db", None, options
    ).await?;
}

Unix Socket

use qail_pg::PgConnection;

let conn = PgConnection::connect_unix(
    "/var/run/postgresql",  // socket directory
    "user",
    "db"
).await?;

AST-Native Queries

All queries are constructed through the typed AST — no raw SQL strings.

use qail_core::Qail;

let cmd = Qail::get("users").select_all().limit(10);

// Fetch all rows
let rows = driver.fetch_all(&cmd).await?;

// Fetch one row
let row = driver.fetch_one(&cmd).await?;

// Execute a mutation (returns affected rows); pass a mutation command here rather than the read above
let affected = driver.execute(&cmd).await?;

Statement Cache (LRU)

Prepared statements are cached automatically. The AST is hashed by structure, so identical query shapes reuse the same prepared statement.

Cached execution includes one-shot self-heal for common server-side invalidation cases (prepared statement does not exist, cached plan must be replanned): local cache state is cleared and the statement is retried once automatically.
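The one-shot self-heal described above follows a simple retry-once pattern. A minimal sketch, assuming a hypothetical `ExecError` type and closure-based hooks rather than the driver's real internals:

```rust
/// Hypothetical error type: `StaleStatement` stands for the server-side
/// invalidation cases (statement missing, cached plan must be replanned).
#[derive(Debug, PartialEq)]
enum ExecError {
    StaleStatement,
    Other(String),
}

/// Run `exec`. On a stale-statement error, clear local cache state and
/// retry exactly once; a second failure is returned as-is.
fn execute_with_self_heal<T>(
    mut exec: impl FnMut() -> Result<T, ExecError>,
    mut clear_cache: impl FnMut(),
) -> Result<T, ExecError> {
    match exec() {
        Err(ExecError::StaleStatement) => {
            clear_cache();
            exec()
        }
        other => other,
    }
}
```

Limiting the retry to one attempt keeps a persistently failing statement from looping, while still hiding the common case where the server dropped a prepared statement behind the client's back.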

// Cache is bounded (default: 100 statements)
// Auto-evicts least recently used when full

let (size, capacity) = driver.cache_stats();  // e.g. (42, 100)

// Manual clear if needed
driver.clear_cache();

| Method | Description |
| --- | --- |
| fetch_all() | Uses cache (~25,000 q/s) |
| fetch_all_with_format(cmd, ResultFormat::Binary) | Cached fetch with binary column format |
| fetch_all_uncached() | Skips cache |
| fetch_all_uncached_with_format(...) | Uncached fetch with text/binary result format |
| cache_stats() | Returns (current, max) |
| clear_cache() | Frees all cached statements |
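To make "hashed by structure" and "bounded LRU" concrete, here is a small standalone sketch. It is not the driver's cache: `QueryShape`, `shape_hash`, and `StatementCache` are hypothetical, and the linear-time recency update is chosen for brevity, not speed.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a query AST. The shape excludes parameter
/// values, so the same query with different values hashes identically.
#[derive(Hash)]
struct QueryShape<'a> {
    table: &'a str,
    columns: &'a [&'a str],
    has_limit: bool,
}

fn shape_hash(shape: &QueryShape) -> u64 {
    let mut hasher = DefaultHasher::new();
    shape.hash(&mut hasher);
    hasher.finish()
}

struct StatementCache {
    capacity: usize,
    names: HashMap<u64, String>, // shape hash -> prepared statement name
    order: VecDeque<u64>,        // front = least recently used
    next_id: u64,
}

impl StatementCache {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        StatementCache { capacity, names: HashMap::new(), order: VecDeque::new(), next_id: 0 }
    }

    /// Returns (statement name, was_cached).
    fn get_or_insert(&mut self, key: u64) -> (String, bool) {
        if let Some(name) = self.names.get(&key).cloned() {
            // Move the key to the most-recently-used position.
            self.order.retain(|k| *k != key);
            self.order.push_back(key);
            return (name, true);
        }
        if self.names.len() >= self.capacity {
            // Evict the least recently used entry.
            if let Some(oldest) = self.order.pop_front() {
                self.names.remove(&oldest);
            }
        }
        let name = format!("s{}", self.next_id);
        self.next_id += 1;
        self.names.insert(key, name.clone());
        self.order.push_back(key);
        (name, false)
    }

    /// (current size, capacity), analogous to cache_stats() above.
    fn stats(&self) -> (usize, usize) {
        (self.names.len(), self.capacity)
    }
}
```

A real cache would also close the evicted statement on the server; that step is left out here.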

Pipeline Methods

Pipelining sends multiple queries in a single network round-trip. This is the key to high throughput. Choose based on your needs:

Do you need return values from each query?
├── Yes
│   └── pipeline_ast()              — full parse/bind/execute per query, returns rows
│
└── No (fire-and-forget mutations)
    ├── Repeating the same query shape with different params?
    │   ├── pipeline_ast_cached()   — hash-based statement reuse
    │   └── pipeline_prepared_fast()— named prepared statement reuse
    │
    └── Different query shapes?
        ├── pipeline_ast_fast()     — parse+bind+execute, discard results
        ├── pipeline_simple_fast()  — simple query protocol (no params)
        └── pipeline_bytes_fast()   — pre-encoded buffers (fastest possible)
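At the wire level, pipelining means queuing several protocol messages in one buffer and writing them in a single call. The sketch below shows only that framing for the simple query protocol. It is not the qail-pg API (which builds messages from the AST rather than from SQL text); the function names are hypothetical and the SQL strings are placeholders.

```rust
/// Encode one simple-protocol Query message: tag 'Q', Int32 length
/// (counts itself and the NUL-terminated string, not the tag byte).
fn encode_query(sql: &str, out: &mut Vec<u8>) {
    out.push(b'Q');
    out.extend_from_slice(&((4 + sql.len() + 1) as u32).to_be_bytes());
    out.extend_from_slice(sql.as_bytes());
    out.push(0);
}

/// Append every statement to one buffer so the whole batch can go out
/// in a single socket write instead of one write-and-wait per query.
fn encode_pipeline(statements: &[&str]) -> Vec<u8> {
    let mut buf = Vec::new();
    for sql in statements {
        encode_query(sql, &mut buf);
    }
    buf
}

/// Count complete messages in a buffer by walking tag + length headers.
fn count_messages(mut buf: &[u8]) -> usize {
    let mut count = 0;
    while buf.len() >= 5 {
        let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
        if buf.len() < 1 + len {
            break; // trailing partial message
        }
        buf = &buf[1 + len..];
        count += 1;
    }
    count
}
```

The throughput gain comes from paying network latency once per batch; the server still executes the statements in order.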

Quick Reference

| Method | Returns Rows? | Statement Caching | Relative Speed |
| --- | --- | --- | --- |
| pipeline_ast() | Yes | Hash-based | ★★★ |
| pipeline_ast_fast() | No | None | ★★★★ |
| pipeline_ast_cached() | No | Hash + LRU | ★★★★★ |
| pipeline_bytes_fast() | No | Pre-encoded | ★★★★★ |
| pipeline_prepared_fast() | No | Named | ★★★★★ |
| pipeline_prepared_zerocopy() | | Named + zero-copy | ★★★★★ |
| pipeline_prepared_ultra() | | Named + ultra | ★★★★★+ |

Example: Pipelined Inserts

// `users` is any collection of records with `name` and `email` fields
let commands: Vec<QailCmd> = users.iter().map(|u| {
    Qail::add("users")
        .set("name", &u.name)
        .set("email", &u.email)
        .build()
}).collect();

// Fire-and-forget: fastest for bulk mutations
let affected = driver.pipeline_ast_fast(&commands).await?;

// With results: slower, but returns the inserted rows
let rows = driver.pipeline_ast(&commands).await?;

Connection Pooling

use qail_pg::{PgPool, PoolConfig};
use qail_core::Qail;

let config = PoolConfig::new("localhost", 5432, "user", "db")
    .password("secret")
    .tls_mode(qail_pg::TlsMode::Require)
    .auth_settings(qail_pg::AuthSettings::scram_only())
    .max_connections(20)
    .min_connections(5);

let pool = PgPool::connect(config).await?;

// Acquire connection (return deterministically with release())
let mut conn = pool.acquire().await?;
let probe = Qail::get("users").columns(["id"]).limit(1);
let _ = conn.fetch_all(&probe).await?;
conn.release().await;

// Check idle count
let idle = pool.idle_count().await;

Pool with RLS (Multi-Tenant)

use qail_core::RlsContext;

let ctx = RlsContext {
    operator_id: "tenant-123".into(),
    agent_id: Some("agent-456".into()),
    is_super_admin: false,
};

// Acquire + set RLS context in one call
// Call release() after query work to reset context and return to pool
let mut conn = pool.acquire_with_rls(&ctx).await?;
conn.release().await;

Important: When using acquire_with_rls(), the RLS context is automatically cleared when the connection is returned to the pool. This prevents cross-tenant data leakage — a connection used by Tenant A will never carry Tenant A’s context when checked out by Tenant B.
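The guarantee above amounts to "reset session state before the connection becomes visible to anyone else". A minimal synchronous sketch of that ordering, using hypothetical `Conn` and `Pool` types rather than the real PgPool:

```rust
#[derive(Debug)]
struct Conn {
    id: u32,
    tenant: Option<String>, // stands in for session-level RLS settings
}

struct Pool {
    idle: Vec<Conn>,
}

impl Pool {
    fn new(size: u32) -> Self {
        Pool { idle: (0..size).map(|id| Conn { id, tenant: None }).collect() }
    }

    /// Check out a connection and set the tenant context in one step.
    /// Returns None when no idle connection is available.
    fn acquire_with_rls(&mut self, tenant: &str) -> Option<Conn> {
        let mut conn = self.idle.pop()?;
        conn.tenant = Some(tenant.to_string());
        Some(conn)
    }

    /// Clear the context first, then put the connection back, so the
    /// next checkout can never observe the previous tenant.
    fn release(&mut self, mut conn: Conn) {
        conn.tenant = None;
        self.idle.push(conn);
    }
}
```

In a real pool the reset is a round-trip to the server, which is one reason an explicit `release().await` is preferable to relying on drop.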

Pool Configuration

use std::time::Duration;
use qail_pg::PoolConfig;

let config = PoolConfig::new("localhost", 5432, "user", "db")
    .idle_timeout(Duration::from_secs(600))    // 10 min
    .acquire_timeout(Duration::from_secs(30))  // 30 sec
    .connect_timeout(Duration::from_secs(10)); // 10 sec

| Option | Default | Description |
| --- | --- | --- |
| max_connections | 10 | Maximum pool size |
| min_connections | 1 | Minimum idle connections |
| idle_timeout | 10 min | Stale connections auto-discarded |
| acquire_timeout | 30 sec | Max wait for connection |
| connect_timeout | 10 sec | Max time to establish new connection |
| max_lifetime | 30 min | Max age of any connection |
| test_on_acquire | true | Ping connection before returning |

Bulk Insert (COPY Protocol)

High-performance bulk insert using PostgreSQL’s COPY protocol. Benchmarked at 1.63M rows/sec for 100M rows.

use qail_core::Qail;
use qail_core::ast::Value;

let cmd = Qail::add("users").columns(&["name", "email"]);

let rows = vec![
    vec![Value::Text("Alice".into()), Value::Text("a@x.com".into())],
    vec![Value::Text("Bob".into()), Value::Text("b@x.com".into())],
];

let count = driver.copy_bulk(&cmd, &rows).await?;
// count = 2
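For readers unfamiliar with COPY, the sketch below shows how rows can be serialized in PostgreSQL's COPY text format and framed as a CopyData message. It illustrates the protocol only: whether copy_bulk uses the text or binary format is not stated here, and both function names are hypothetical.

```rust
/// Encode one row in COPY text format: tab-separated fields, newline
/// terminated, NULL written as \N, with backslash, tab, newline and
/// carriage return escaped.
fn encode_copy_row(fields: &[Option<&str>], out: &mut Vec<u8>) {
    for (i, field) in fields.iter().enumerate() {
        if i > 0 {
            out.push(b'\t');
        }
        match field {
            None => out.extend_from_slice(b"\\N"),
            Some(text) => {
                for byte in text.bytes() {
                    match byte {
                        b'\\' => out.extend_from_slice(b"\\\\"),
                        b'\t' => out.extend_from_slice(b"\\t"),
                        b'\n' => out.extend_from_slice(b"\\n"),
                        b'\r' => out.extend_from_slice(b"\\r"),
                        _ => out.push(byte),
                    }
                }
            }
        }
    }
    out.push(b'\n');
}

/// Wrap a payload in a CopyData message: tag 'd', Int32 length
/// (self-inclusive, excluding the tag), then the raw bytes.
fn copy_data_message(payload: &[u8]) -> Vec<u8> {
    let mut msg = Vec::with_capacity(payload.len() + 5);
    msg.push(b'd');
    msg.extend_from_slice(&((payload.len() + 4) as u32).to_be_bytes());
    msg.extend_from_slice(payload);
    msg
}
```

Many rows fit in one CopyData message, so the per-row overhead is a few bytes of framing rather than a full statement round-trip.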

Performance Comparison

| Operation | Rows/sec | Notes |
| --- | --- | --- |
| COPY bulk insert | 1.63M | Native COPY protocol |
| Pipelined INSERT | 180K | Extended Query |
| Single INSERT | 22K | Per-statement |

Cursor Streaming

Stream large result sets in batches:

let cmd = Qail::get("logs").select_all();

let batches = driver.stream_cmd(&cmd, 1000).await?;
for batch in batches {
    for row in batch {
        // Process row
    }
}

Transactions

let mut conn = pool.acquire().await?;

conn.begin_transaction().await?;
// ... queries ...
conn.commit().await?;

// If a query fails, roll back instead of committing:
// conn.rollback().await?;

// Return the connection to the pool
conn.release().await;

Row Decoding

By Index

let name = row.get_string(0);
let age = row.get_i32(1);

By Name

// Safer: column order changes don't break code
let name = row.get_string_by_name("name");
let age = row.get_i32_by_name("age");
let email = row.get_string_by_name("email");

// Check if NULL
if row.is_null_by_name("deleted_at") {
    // column is NULL
}

Available get_by_name methods:

  • get_string_by_name, get_i32_by_name, get_i64_by_name
  • get_f64_by_name, get_bool_by_name
  • get_uuid_by_name, get_json_by_name
  • is_null_by_name, column_index
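The by-name accessors can be understood as a name-to-index lookup followed by the usual index-based decode. A minimal sketch with a hypothetical `Row` that stores text-format values; the return types here are assumptions and may differ from the crate's actual signatures.

```rust
struct Row {
    columns: Vec<String>,         // column names from the row description
    values: Vec<Option<Vec<u8>>>, // text-format values; None = SQL NULL
}

impl Row {
    /// Position of a column by name, if present.
    fn column_index(&self, name: &str) -> Option<usize> {
        self.columns.iter().position(|c| c == name)
    }

    /// None when the column is missing, NULL, or not valid UTF-8.
    fn get_string_by_name(&self, name: &str) -> Option<String> {
        let bytes = self.values.get(self.column_index(name)?)?.as_ref()?;
        String::from_utf8(bytes.clone()).ok()
    }

    /// Parses the text representation; None on any failure.
    fn get_i32_by_name(&self, name: &str) -> Option<i32> {
        self.get_string_by_name(name)?.parse().ok()
    }

    /// True only when the column exists and holds NULL.
    fn is_null_by_name(&self, name: &str) -> bool {
        self.column_index(name)
            .and_then(|i| self.values.get(i))
            .map_or(false, |v| v.is_none())
    }
}
```

Looking up by name costs a short scan per access; caching the result of `column_index` is the usual way to avoid that in a hot loop.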

Supported Types

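| Rust Type | PostgreSQL Type |
| --- | --- |
| i16/i32/i64 | INT2/INT4/INT8 |
| f32/f64 | FLOAT4/FLOAT8 |
| bool | BOOLEAN |
| String | TEXT/VARCHAR |
| Vec<u8> | BYTEA |
| Uuid | UUID |
| Timestamp | TIMESTAMPTZ |
| Date | DATE |
| Time | TIME |
| Json | JSONB |
| Inet | INET |
| Cidr | CIDR |
| MacAddr | MACADDR |
| Numeric | NUMERIC/DECIMAL |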
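For the fixed-width types in the table, PostgreSQL's binary result format is plain network byte order. The decoders below are a sketch of that mapping with hypothetical function names; they are not the crate's own decoding routines.

```rust
/// Copy a slice into a fixed-size array, or None on a length mismatch.
fn be_array<const N: usize>(bytes: &[u8]) -> Option<[u8; N]> {
    if bytes.len() != N {
        return None;
    }
    let mut out = [0u8; N];
    out.copy_from_slice(bytes);
    Some(out)
}

/// INT4: 4 bytes, big-endian two's complement.
fn decode_int4(bytes: &[u8]) -> Option<i32> {
    Some(i32::from_be_bytes(be_array(bytes)?))
}

/// INT8: 8 bytes, big-endian two's complement.
fn decode_int8(bytes: &[u8]) -> Option<i64> {
    Some(i64::from_be_bytes(be_array(bytes)?))
}

/// FLOAT8: 8 bytes, IEEE 754 double in big-endian order.
fn decode_float8(bytes: &[u8]) -> Option<f64> {
    Some(f64::from_be_bytes(be_array(bytes)?))
}

/// BOOLEAN: a single byte, 0 or 1.
fn decode_bool(bytes: &[u8]) -> Option<bool> {
    match bytes {
        [0] => Some(false),
        [1] => Some(true),
        _ => None,
    }
}
```

Returning `None` on a length mismatch keeps a malformed or mistyped column from panicking the decoder.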

✅ AST-Only Driver

Raw SQL helper APIs (execute_raw, fetch_raw) were removed.

// ✅ Use AST-native transaction APIs
driver.begin().await?;
// ... execute QAIL commands ...
driver.commit().await?;