Concurrency dan Race Condition di Rust: Panduan Lengkap dari Ownership hingga Async Tokio
20 min read

Concurrency dan Race Condition di Rust: Panduan Lengkap dari Ownership hingga Async Tokio

Rust mengambil pendekatan yang fundamentally berbeda dari semua bahasa lain dalam menangani concurrency. Di Go, race condition dicegah oleh konvensi dan race detector yang berjalan saat runtime. Di Java dan Kotlin, developer bertanggung jawab menggunakan synchronized, volatile, atau lock dengan benar — dan jika lupa, bug baru muncul saat production. Di Rust, race condition tidak mungkin terjadi — bukan karena runtime yang memantaunya, bukan karena konvensi yang harus dipatuhi, tetapi karena compiler menolak mengkompilasi kode yang mengandung data race.

Klaim ini terdengar terlalu bagus untuk menjadi kenyataan, tapi inilah yang disebut Rust sebagai “fearless concurrency”. Sistem ownership dan borrowing Rust, yang sudah menjamin memory safety tanpa garbage collector, ternyata juga secara alami menjamin thread safety. Trait Send dan Sync memungkinkan compiler memverifikasi secara statik apakah sebuah tipe aman untuk dikirim lintas thread atau diakses dari beberapa thread bersamaan. Jika kode kamu melanggar jaminan ini, program tidak akan bisa dikompilasi — race condition dideteksi pada compile time, bukan runtime.

Ini bukan berarti concurrency di Rust mudah. Justru karena compiler sangat ketat, kurva belajarnya cukup curam — terutama saat pertama kali berhadapan dengan pesan error tentang lifetime, borrow checker, dan trait bounds. Tapi saat kode akhirnya berhasil dikompilasi, kamu bisa yakin bahwa tidak ada data race di dalamnya. Artikel ini membahas seluruh spektrum concurrency di Rust: dari threading primitif dan synchronization primitives, hingga async/await dan Tokio (versi stabil terkini 1.52.x) yang menjadi standar de-facto untuk async I/O di ekosistem Rust.

Ownership dan Borrowing: Fondasi Thread Safety di Rust

Untuk memahami kenapa Rust bisa menjamin thread safety, kita perlu memahami dua aturan inti Rust terlebih dahulu.

Aturan 1 — Ownership: Setiap nilai di Rust hanya dimiliki oleh satu variabel (owner) pada satu waktu. Saat owner keluar dari scope, nilai tersebut di-drop (memory dibebaskan). Nilai bisa dipindah (move) ke owner baru, tapi owner lama tidak bisa diakses lagi.

Aturan 2 — Borrowing: Kamu bisa meminjam referensi ke nilai tanpa mengambil ownership. Pada satu waktu, sebuah nilai hanya boleh memiliki salah satu dari:

  • Satu atau lebih immutable reference (&T) — banyak boleh baca bersamaan
  • Tepat satu mutable reference (&mut T) — hanya satu yang boleh tulis, tidak ada yang bisa baca bersamaan

Aturan borrowing ini terasa sangat familiar jika dipandang dari sudut concurrency: “banyak reader atau satu writer” adalah prinsip yang sama dengan RwLock. Bedanya, di Rust ini dijamin oleh compiler secara statik tanpa overhead runtime.

fn main() {
    let mut data = vec![1, 2, 3];

    // OK: banyak immutable reference bersamaan
    let r1 = &data;
    let r2 = &data;
    println!("{:?} {:?}", r1, r2);

    // ERROR saat kompilasi: tidak bisa punya mutable dan immutable reference bersamaan
    let r3 = &mut data;
    // error[E0502]: cannot borrow `data` as mutable because it is also borrowed as immutable
    println!("{:?}", r1); // r1 masih aktif di sini
}

Jika kode yang sama dijalankan dari dua thread tanpa sinkronisasi, inilah tepatnya race condition yang terjadi — satu thread membaca sementara thread lain menulis. Rust mencegah ini bukan dengan runtime check, melainkan dengan menolak kompilasi kode yang memiliki potensi tersebut.


Thread Dasar: std::thread

Rust menyediakan threading primitif di standard library melalui std::thread:

use std::thread;
use std::time::Duration;

fn main() {
    // Spawn thread baru — mengembalikan JoinHandle
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("Thread anak: {}", i);
            thread::sleep(Duration::from_millis(10));
        }
    });

    // Thread utama juga berjalan bersamaan
    for i in 1..=3 {
        println!("Thread utama: {}", i);
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().unwrap(); // tunggu thread anak selesai
}

Move Closure: Memindah Ownership ke Thread

Ketika sebuah thread mengakses data dari luar closure-nya, Rust memaksa kamu untuk memindah ownership data tersebut ke thread — bukan meminjam referensinya. Ini mencegah situasi di mana thread masih menggunakan data sementara owner aslinya sudah mati (dangling pointer).

use std::thread;

fn main() {
    let data = vec![1, 2, 3];

    // ANTI-PATTERN: thread mungkin hidup lebih lama dari `data`
    // let handle = thread::spawn(|| {
    //     println!("{:?}", data);
    //     // error[E0373]: closure may outlive the current function, but it borrows `data`
    // });

    // BENAR: gunakan `move` untuk memindah ownership ke thread
    let handle = thread::spawn(move || {
        println!("{:?}", data); // `data` sekarang dimiliki thread ini
    });

    // println!("{:?}", data); // ERROR: data sudah di-move ke thread

    handle.join().unwrap();
}

Konfigurasi Thread

use std::thread;

let handle = thread::Builder::new()
    .name("worker-1".to_string())
    .stack_size(4 * 1024 * 1024) // 4 MB stack
    .spawn(|| {
        println!("Nama thread: {}", thread::current().name().unwrap());
    })
    .expect("Gagal membuat thread");

handle.join().unwrap();

Send dan Sync: Trait Penjaga Thread Safety

Dua trait paling penting untuk concurrency di Rust adalah Send dan Sync — keduanya adalah marker trait tanpa metode, hanya menandai properti sebuah tipe:

  • Send: Tipe ini aman untuk dipindah ke thread lain. Hampir semua tipe di Rust adalah Send secara default.
  • Sync: Tipe ini aman untuk diakses dari beberapa thread melalui shared reference. T adalah Sync jika dan hanya jika &T adalah Send.
flowchart TD
    A[Tipe T] --> B{Implements Send?}
    B -- Ya --> C[Aman di-move ke thread lain]
    B -- Tidak --> D[Hanya di thread pembuatnya<br/>contoh: Rc T, raw pointer]
    A --> E{Implements Sync?}
    E -- Ya --> F[Aman diakses via shared ref<br/>dari banyak thread]
    E -- Tidak --> G[Butuh wrapper seperti<br/>Mutex atau RwLock]

Mengapa Rc<T> Tidak Send

Rc<T> (Reference Counted) menggunakan counter non-atomik karena diasumsikan tidak ada concurrency — lebih cepat dari Arc<T>, tapi tidak aman lintas thread:

use std::rc::Rc;
use std::thread;

fn main() {
    let rc = Rc::new(42);

    thread::spawn(move || {
        println!("{}", rc);
        // error[E0277]: `Rc<i32>` cannot be sent between threads safely
        // the trait `Send` is not implemented for `Rc<i32>`
    });
}

Compiler langsung menolak ini — tidak ada runtime crash, tidak ada undefined behavior. Hanya pesan error yang jelas.


Arc: Shared Ownership Lintas Thread

Arc<T> (Atomically Reference Counted) adalah versi thread-safe dari Rc<T> yang menggunakan atomic operations untuk mengelola reference count:

use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    let mut handles = vec![];

    for i in 0..3 {
        let data_clone = Arc::clone(&data); // clone pointer, bukan data
        let handle = thread::spawn(move || {
            println!("Thread {}: {:?}", i, data_clone);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Arc<T> sendiri hanya memberikan shared immutable access. Untuk menulis dari beberapa thread, Arc dikombinasikan dengan Mutex atau RwLock.


Mutex dan RwLock

Mutex<T>: RAII-Based Locking

Mutex<T> di Rust menyimpan data di dalam Mutex. Untuk mengakses data, kamu harus mengambil lock — hasilnya adalah MutexGuard yang menyediakan akses ke data. Saat MutexGuard keluar dari scope, lock otomatis dilepas via RAII. Tidak ada risiko lupa memanggil unlock().

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0_i32));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                let mut guard = counter.lock().unwrap(); // acquire lock
                *guard += 1;
                // guard keluar scope: lock otomatis dilepas (RAII)
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Counter: {}", *counter.lock().unwrap()); // selalu 10000
}

Pola Arc<Mutex<T>> adalah idiom paling umum di Rust untuk shared mutable state lintas thread.

Mutex Poisoning

Jika thread yang memegang lock mengalami panic, Mutex menjadi poisoned untuk mencegah thread lain mengakses data yang mungkin dalam keadaan tidak konsisten:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let mutex = Arc::new(Mutex::new(0_i32));
    let m = Arc::clone(&mutex);

    let _ = thread::spawn(move || {
        let _guard = m.lock().unwrap();
        panic!("panic saat memegang lock!"); // mutex menjadi poisoned
    }).join();

    match mutex.lock() {
        Ok(val) => println!("Normal: {}", *val),
        Err(poisoned) => {
            // bisa paksa akses dengan into_inner()
            println!("Poisoned, tapi data: {}", *poisoned.into_inner());
        }
    }
}

RwLock<T>: Multiple Reader atau Single Writer

use std::sync::{Arc, RwLock};
use std::thread;
use std::collections::HashMap;

fn main() {
    let cache = Arc::new(RwLock::new(HashMap::<String, String>::new()));

    // Writer
    {
        let mut map = cache.write().unwrap(); // eksklusif
        map.insert("key".to_string(), "value".to_string());
    }

    // Banyak reader concurrent
    let mut handles = vec![];
    for i in 0..5 {
        let cache = Arc::clone(&cache);
        let h = thread::spawn(move || {
            let map = cache.read().unwrap(); // banyak reader OK bersamaan
            println!("Reader {}: {:?}", i, map.get("key"));
        });
        handles.push(h);
    }

    for h in handles { h.join().unwrap(); }
}
Mutex<T>RwLock<T>
Read concurrentTidakYa
WriteSatu, eksklusifSatu, eksklusif
OverheadRendahSedikit lebih tinggi
Cocok untukWrite-heavy / mixedRead-heavy
PoisoningYaYa

Channel: Komunikasi Antar Thread

Rust standard library menyediakan std::sync::mpsc (multiple producer, single consumer):

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    let tx1 = tx.clone();
    let tx2 = tx.clone();

    thread::spawn(move || {
        tx1.send("pesan dari thread 1".to_string()).unwrap();
        tx1.send("pesan lagi dari thread 1".to_string()).unwrap();
    });

    thread::spawn(move || {
        tx2.send("pesan dari thread 2".to_string()).unwrap();
    });

    drop(tx); // drop original agar rx tahu semua pengirim selesai

    for received in rx {
        println!("Diterima: {}", received);
    }
}

Bounded Channel dengan sync_channel

use std::sync::mpsc;

// Channel dengan kapasitas 10: pengirim blok saat buffer penuh
let (tx, rx) = mpsc::sync_channel::<i32>(10);

thread::spawn(move || {
    for i in 0..20 {
        tx.send(i).unwrap(); // blok saat buffer penuh — natural backpressure
    }
});

for val in rx {
    println!("{}", val);
}

Atomic Types: Operasi Lock-Free

use std::sync::Arc;
use std::sync::atomic::{AtomicI32, AtomicBool, Ordering};
use std::thread;

fn main() {
    let counter = Arc::new(AtomicI32::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        });
        handles.push(handle);
    }

    for handle in handles { handle.join().unwrap(); }
    println!("Counter: {}", counter.load(Ordering::SeqCst)); // selalu 10000
}

Memory Ordering

use std::sync::atomic::Ordering;

// Relaxed: hanya atomicity, tidak ada jaminan urutan relatif
// Cocok untuk counter murni yang tidak dipakai untuk sinkronisasi
counter.fetch_add(1, Ordering::Relaxed);

// Acquire/Release: membangun happens-before relationship
// Release: semua operasi sebelumnya tidak bisa di-reorder setelah store
// Acquire: semua operasi sesudahnya tidak bisa di-reorder sebelum load
flag.store(true, Ordering::Release);       // pengirim: publish data, lalu set flag
if flag.load(Ordering::Acquire) { ... }   // penerima: baca flag, baru baca data

// SeqCst: jaminan terkuat, total order global semua operasi atomik
// Paling aman, paling mudah dipahami, sedikit lebih lambat
counter.load(Ordering::SeqCst);
Memilih Ordering yang salah tidak menyebabkan error kompilasi, tapi bisa menyebabkan subtle bug pada CPU dengan weak memory model seperti ARM dan PowerPC. Untuk kode yang tidak performance-critical, gunakan SeqCst sebagai default yang aman. Optimasi ke Relaxed atau Acquire/Release hanya jika kamu benar-benar memahami memory ordering model dan sudah mem-profile bahwa ini adalah bottleneck.

Deadlock di Rust: Masih Bisa Terjadi

Rust mencegah data race, tapi tidak mencegah deadlock. Deadlock adalah masalah logika, bukan masalah memory safety, sehingga di luar jangkauan yang bisa diverifikasi compiler secara statik.

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let lock_a = Arc::new(Mutex::new(0));
    let lock_b = Arc::new(Mutex::new(0));
    let (la1, lb1) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
    let (la2, lb2) = (Arc::clone(&lock_a), Arc::clone(&lock_b));

    // ANTI-PATTERN: urutan lock berbeda di dua thread — DEADLOCK
    let t1 = thread::spawn(move || {
        let _a = la1.lock().unwrap();   // ambil A
        thread::sleep(Duration::from_millis(10));
        let _b = lb1.lock().unwrap();   // tunggu B — DEADLOCK
    });

    let t2 = thread::spawn(move || {
        let _b = lb2.lock().unwrap();   // ambil B
        thread::sleep(Duration::from_millis(10));
        let _a = la2.lock().unwrap();   // tunggu A — DEADLOCK
    });

    t1.join().unwrap(); // hang selamanya
    t2.join().unwrap();
}
flowchart LR
    T1[Thread 1<br/>pegang lock_a] -->|menunggu| LB[lock_b]
    T2[Thread 2<br/>pegang lock_b] -->|menunggu| LA[lock_a]
    LA -->|dipegang| T1
    LB -->|dipegang| T2
// BENAR: selalu acquire lock dalam urutan yang sama di semua thread
let t1 = thread::spawn(move || {
    let _a = la1.lock().unwrap(); // A dulu
    let _b = lb1.lock().unwrap(); // B kedua
});

let t2 = thread::spawn(move || {
    let _a = la2.lock().unwrap(); // A dulu (urutan konsisten!)
    let _b = lb2.lock().unwrap(); // B kedua
});

Async/Await di Rust: Model Berbasis Future

Threading berbasis OS bekerja baik untuk CPU-bound work, tapi untuk I/O-bound concurrency dengan ribuan koneksi concurrent, overhead thread OS menjadi bottleneck. Solusinya adalah async/await dengan runtime asynchronous.

Future: Unit Dasar Async Rust

Future adalah trait yang merepresentasikan komputasi yang belum selesai. Berbeda dengan Promise di JavaScript atau Coroutine di Kotlin yang langsung berjalan saat dibuat, Future di Rust bersifat lazy — ia tidak melakukan apa pun sampai di-poll oleh runtime:

// async fn adalah syntactic sugar untuk fungsi yang mengembalikan impl Future
async fn fetch_data(url: &str) -> String {
    // `await` mem-poll Future sampai selesai
    // selama menunggu, task di-suspend, thread bisa mengerjakan task lain
    reqwest::get(url).await.unwrap().text().await.unwrap()
}

// Ekuivalen tanpa sugar syntax:
fn fetch_data_manual(url: String) -> impl std::future::Future<Output = String> {
    async move {
        reqwest::get(&url).await.unwrap().text().await.unwrap()
    }
}

Tokio: Async Runtime De-facto

Rust menyediakan Future trait dan async/await syntax, tapi tidak menyediakan runtime bawaan. Runtime bertugas mem-poll future secara efisien dan mengelola thread pool. Tokio adalah runtime paling populer di ekosistem Rust:

# Cargo.toml
[dependencies]
tokio = { version = "1.52", features = ["full"] }
// #[tokio::main] setup Tokio runtime dan jalankan async main
#[tokio::main]
async fn main() {
    let result = fetch_data_local().await;
    println!("Hasil: {}", result);
}

async fn fetch_data_local() -> String {
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    "data selesai".to_string()
}

Single-Thread vs Multi-Thread Runtime

// Multi-thread (default): work-stealing scheduler, thread = jumlah core CPU
#[tokio::main]
async fn main() { /* ... */ }

// Eksplisit multi-thread dengan 4 worker
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() { /* ... */ }

// Single-thread: semua task di satu thread, tidak ada parallelism
#[tokio::main(flavor = "current_thread")]
async fn main() { /* ... */ }

Task Tokio: Goroutine-nya Rust

tokio::task::spawn meluncurkan async task yang berjalan concurrent di Tokio runtime:

use tokio::task;

#[tokio::main]
async fn main() {
    // Spawn task — berjalan concurrent
    let handle = task::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        42_i32
    });

    let result = handle.await.unwrap();
    println!("Hasil: {}", result);

    // Spawn banyak task paralel dan kumpulkan hasilnya
    let handles: Vec<_> = (0..10)
        .map(|i| task::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            i * 2
        }))
        .collect();

    for handle in handles {
        println!("Hasil: {}", handle.await.unwrap());
    }
}

spawn_blocking: Kode Blocking di Async Context

Kode blocking tidak boleh dijalankan langsung di async task — ia akan memblokir worker thread dan mencegah task lain berjalan:

use tokio::task;

#[tokio::main]
async fn main() {
    // ANTI-PATTERN: blocking call di async task
    // std::thread::sleep(Duration::from_secs(1)); // membekukan worker thread!
    // std::fs::read_to_string("file.txt");        // blocking I/O!

    // BENAR: pindahkan ke blocking thread pool
    let result = task::spawn_blocking(|| {
        std::thread::sleep(std::time::Duration::from_millis(100));
        heavy_computation()
    }).await.unwrap();

    println!("Hasil: {}", result);

    // BENAR: gunakan tokio::fs untuk file I/O async
    let content = tokio::fs::read_to_string("data.txt").await;
}

fn heavy_computation() -> i64 {
    (1..=1_000_000_i64).sum()
}
flowchart TD
    subgraph "Tokio Runtime"
        subgraph "Async Worker Threads"
            W1[Worker 1]
            W2[Worker 2]
        end
        subgraph "Blocking Thread Pool"
            B1[Blocking Thread 1]
            B2[Blocking Thread 2]
        end
    end
    AT1[Async Task 1] --> W1
    AT2[Async Task 2] --> W2
    BT1[Blocking Task via spawn_blocking] --> B1
    W1 -.suspend saat await.- W1

Race Condition di Async Rust

Meski Rust mencegah data race di level memory, logical race condition — terutama pola check-then-act — masih bisa terjadi di async code:

use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;

// ANTI-PATTERN: check-then-act tidak atomik
async fn get_or_insert(
    cache: Arc<Mutex<HashMap<String, String>>>,
    key: String,
) -> String {
    {
        let map = cache.lock().await;
        if let Some(val) = map.get(&key) {
            return val.clone();
        }
    } // lock dilepas di sini

    // GAP: task lain bisa insert key yang sama di sini!
    let new_val = fetch_expensive(&key).await;

    let mut map = cache.lock().await;
    map.insert(key, new_val.clone()); // mungkin overwrite hasil task lain
    new_val
}

// BENAR: gunakan entry API yang atomik, atau tahan lock sepanjang operasi
async fn get_or_insert_safe(
    cache: Arc<Mutex<HashMap<String, String>>>,
    key: String,
) -> String {
    let mut map = cache.lock().await;
    if let Some(val) = map.get(&key) {
        return val.clone(); // sudah ada, return langsung
    }
    // Lock masih dipegang — tidak ada task lain yang bisa masuk
    let new_val = format!("value-for-{}", key); // operasi sync saja di dalam lock
    map.insert(key, new_val.clone());
    new_val
}

async fn fetch_expensive(key: &str) -> String {
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    format!("value-for-{}", key)
}

Tokio Sync Primitives

tokio::sync menyediakan versi async-aware dari semua primitif sinkronisasi. Penting: jangan gunakan std::sync::Mutex saat memegang lock melewati titik await — ini bisa menyebabkan deadlock karena thread bisa berpindah mengerjakan task lain sementara lock masih dipegang:

use tokio::sync::{Mutex, RwLock, Semaphore, Notify};
use std::sync::Arc;

// Mutex async: aman dipegang saat await
let mutex = Arc::new(Mutex::new(0_i32));
{
    let mut guard = mutex.lock().await;
    *guard += 1;
    some_async_fn().await; // aman: tokio Mutex suspend coroutine, tidak blokir thread
} // guard drop: lock dilepas

// RwLock async
let rwlock = Arc::new(RwLock::new(vec![1, 2, 3]));
let read = rwlock.read().await;   // banyak reader bersamaan
let write = rwlock.write().await; // writer eksklusif

// Semaphore: batasi concurrent operations
let semaphore = Arc::new(Semaphore::new(10));
let permit = semaphore.acquire().await.unwrap();
do_limited_work().await;
drop(permit); // kembalikan slot

// Notify: sinyal antar task
let notify = Arc::new(Notify::new());
let n = Arc::clone(&notify);
tokio::spawn(async move { n.notified().await; println!("Diberitahu!"); });
notify.notify_one();

async fn some_async_fn() {}
async fn do_limited_work() {}

Kapan Pakai std::sync::Mutex vs tokio::sync::Mutex

// Pakai std::sync::Mutex jika:
// lock dipegang sangat singkat, tidak ada await di dalamnya
use std::sync::Mutex;
let fast = Mutex::new(0_i32);
{ *fast.lock().unwrap() += 1; } // tidak ada await — aman dan lebih cepat

// Pakai tokio::sync::Mutex jika:
// perlu await saat lock sedang dipegang
use tokio::sync::Mutex as AsyncMutex;
let slow = Arc::new(AsyncMutex::new(vec![]));
let mut g = slow.lock().await;
let data = fetch_async().await; // await di dalam lock — harus tokio Mutex
g.push(data);

async fn fetch_async() -> String { "data".to_string() }

Channel Async di Tokio

Tokio menyediakan empat jenis channel untuk komunikasi antar async task:

use tokio::sync::{mpsc, oneshot, broadcast, watch};

// ── mpsc: Multiple Producer, Single Consumer ──────────────────────────────
let (tx, mut rx) = mpsc::channel::<String>(100); // bounded

for i in 0..5 {
    let tx = tx.clone();
    tokio::spawn(async move {
        tx.send(format!("Pesan {}", i)).await.unwrap();
    });
}
drop(tx);
while let Some(msg) = rx.recv().await { println!("{}", msg); }

// ── oneshot: Satu pesan, satu kali ────────────────────────────────────────
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
    tx.send("hasil".to_string()).unwrap();
});
let result = rx.await.unwrap();

// ── broadcast: Satu ke banyak ─────────────────────────────────────────────
let (tx, _) = broadcast::channel::<String>(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("event".to_string()).unwrap();
// rx1 dan rx2 masing-masing menerima "event"

// ── watch: State propagation ──────────────────────────────────────────────
let (tx, rx) = watch::channel("initial");
let mut rx1 = rx.clone();
tokio::spawn(async move {
    rx1.changed().await.unwrap();
    println!("State baru: {}", *rx1.borrow());
});
tx.send("updated").unwrap();
ChannelProducerConsumerUse Case
mpscBanyakSatuTask worker, pipeline
oneshotSatuSatuRequest-response
broadcastSatuBanyakEvent broadcast, pub-sub
watchSatuBanyakState propagation, config

select!: Menunggu Beberapa Future Sekaligus

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<&str>(1);
    let (tx2, mut rx2) = mpsc::channel::<&str>(1);

    tokio::spawn(async move {
        sleep(Duration::from_millis(200)).await;
        tx1.send("dari channel 1").await.unwrap();
    });
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        tx2.send("dari channel 2").await.unwrap(); // ini duluan
    });

    loop {
        tokio::select! {
            Some(msg) = rx1.recv() => { println!("rx1: {}", msg); break; }
            Some(msg) = rx2.recv() => { println!("rx2: {}", msg); break; }
            _ = sleep(Duration::from_secs(5)) => { println!("Timeout!"); break; }
        }
    }
}

select! sangat berguna untuk timeout, cancellation via done-channel, dan racing beberapa sumber data alternatif.


Cancellation di Tokio

Tokio menggunakan drop-based cancellation: saat JoinHandle di-drop atau abort() dipanggil, task dibatalkan pada titik await berikutnya:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // di-cancel di sini
        println!("tidak akan tercetak");
    });

    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    handle.abort();

    match handle.await {
        Ok(_) => println!("Selesai normal"),
        Err(e) if e.is_cancelled() => println!("Di-cancel"),
        Err(e) => println!("Panic: {}", e),
    }
}

Cancellation Safety

Karena cancellation terjadi di setiap titik await, kode yang tidak cancellation-safe bisa menyebabkan state setengah-selesai:

// ANTI-PATTERN: state bisa setengah-selesai jika di-cancel
async fn update(state: &mut Vec<String>, key: String) {
    let val = fetch(&key).await; // di-cancel DI SINI: insert tidak pernah terjadi
    state.push(val);
}

// BENAR: fetch dulu, baru modifikasi state (tidak ada await setelah modifikasi)
async fn update_safe(state: &mut Vec<String>, key: String) {
    let val = fetch(&key).await; // cancel di sini OK: state belum berubah
    state.push(val);             // tidak ada await: tidak bisa di-cancel di sini
}

async fn fetch(_key: &str) -> String { "value".to_string() }

Pola Concurrency Produksi

Worker Pool dengan Semaphore

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(5)); // maksimal 5 concurrent
    let mut handles = vec![];

    for i in 0..20 {
        let sem = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await.unwrap(); // tunggu slot
            println!("Task {} mulai", i);
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            println!("Task {} selesai", i);
            // permit di-drop: slot tersedia untuk task berikutnya
        }));
    }

    for h in handles { h.await.unwrap(); }
}

Pipeline Async

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = mpsc::channel(100);
    let (tx2, rx2) = mpsc::channel(100);

    tokio::join!(
        produce(tx1, vec!["a", "b", "c"]),
        transform(rx1, tx2),
        consume(rx2),
    );
}

async fn produce(tx: mpsc::Sender<String>, items: Vec<&str>) {
    for item in items { tx.send(item.to_string()).await.unwrap(); }
}

async fn transform(mut rx: mpsc::Receiver<String>, tx: mpsc::Sender<String>) {
    while let Some(item) = rx.recv().await {
        tx.send(item.to_uppercase()).await.unwrap();
    }
}

async fn consume(mut rx: mpsc::Receiver<String>) {
    while let Some(item) = rx.recv().await { println!("Output: {}", item); }
}

Graceful Shutdown

use tokio::sync::broadcast;
use tokio::signal;

#[tokio::main]
async fn main() {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    for i in 0..3 {
        let mut rx = shutdown_tx.subscribe();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = rx.recv() => { println!("Worker {} shutdown", i); break; }
                    _ = tokio::time::sleep(tokio::time::Duration::from_millis(500)) => {
                        println!("Worker {} bekerja...", i);
                    }
                }
            }
        });
    }

    signal::ctrl_c().await.unwrap();
    println!("Ctrl+C — graceful shutdown...");
    shutdown_tx.send(()).unwrap();
    tokio::time::sleep(tokio::time::Duration::from_millis(600)).await;
}

Anti-Pattern Umum

Anti-Pattern 1: Blocking di Async Context

// ANTI-PATTERN
async fn bad() {
    std::thread::sleep(std::time::Duration::from_secs(1)); // membekukan worker thread!
    let _ = std::fs::read_to_string("file.txt");           // blocking I/O!
}

// BENAR
async fn good() {
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    let _ = tokio::fs::read_to_string("file.txt").await;
    // atau untuk CPU-heavy:
    tokio::task::spawn_blocking(|| heavy()).await.unwrap();
}

fn heavy() -> i64 { (1..=1_000_000_i64).sum() }

Anti-Pattern 2: std Mutex Melewati Await

use std::sync::Mutex;

// ANTI-PATTERN: std Mutex dipegang saat await — potensial deadlock
async fn bad(m: Arc<Mutex<Vec<String>>>) {
    let mut g = m.lock().unwrap();
    let data = fetch().await; // await saat lock dipegang — BAHAYA
    g.push(data);
}

// BENAR opsi A: fetch dulu, baru lock
async fn good_a(m: Arc<Mutex<Vec<String>>>) {
    let data = fetch().await;       // fetch tanpa lock
    m.lock().unwrap().push(data);   // lock singkat, tanpa await
}

// BENAR opsi B: pakai tokio::sync::Mutex
async fn good_b(m: Arc<tokio::sync::Mutex<Vec<String>>>) {
    let mut g = m.lock().await;
    let data = fetch().await; // aman dengan tokio Mutex
    g.push(data);
}

async fn fetch() -> String { "data".to_string() }

Anti-Pattern 3: Task Tanpa Batas

// ANTI-PATTERN: spawn tak terbatas — OOM saat load spike
async fn bad(items: Vec<String>) {
    let handles: Vec<_> = items.into_iter()
        .map(|item| tokio::spawn(async move { process(item).await }))
        .collect(); // 100.000 item = 100.000 task sekaligus!
    for h in handles { h.await.unwrap(); }
}

// BENAR: batasi dengan Semaphore
async fn good(items: Vec<String>) {
    let sem = Arc::new(Semaphore::new(50));
    let mut set = tokio::task::JoinSet::new();
    for item in items {
        let sem = Arc::clone(&sem);
        set.spawn(async move {
            let _p = sem.acquire().await.unwrap();
            process(item).await;
        });
    }
    while let Some(r) = set.join_next().await { r.unwrap(); }
}

async fn process(_item: String) {}

Anti-Pattern 4: Menelan CancellationError

// ANTI-PATTERN: menangkap semua error termasuk task cancellation
async fn bad_wrapper() {
    match tokio::spawn(async { some_work().await }).await {
        Ok(_) => {}
        Err(_) => {} // menelan JoinError — tidak tahu apakah panic atau cancel!
    }
}

// BENAR: periksa jenis error
async fn good_wrapper() {
    match tokio::spawn(async { some_work().await }).await {
        Ok(_) => println!("Selesai"),
        Err(e) if e.is_cancelled() => println!("Di-cancel"),
        Err(e) => std::panic::resume_unwind(e.into_panic()), // re-panic
    }
}

async fn some_work() {}

Unsafe dan Raw Concurrency

Semua yang dibahas di atas adalah safe Rust. Rust juga menyediakan unsafe block untuk kode yang membutuhkan kontrol paling rendah, di mana programmer mengambil alih tanggung jawab keamanan secara penuh:

use std::sync::atomic::{AtomicPtr, Ordering};

// Contoh lock-free stack sederhana menggunakan raw pointer
struct Node<T> {
    data: T,
    next: *mut Node<T>,
}

struct LockFreeStack<T> {
    head: AtomicPtr<Node<T>>,
}

impl<T> LockFreeStack<T> {
    fn new() -> Self {
        Self { head: AtomicPtr::new(std::ptr::null_mut()) }
    }

    fn push(&self, data: T) {
        let node = Box::into_raw(Box::new(Node {
            data,
            next: std::ptr::null_mut(),
        }));

        loop {
            let head = self.head.load(Ordering::Acquire);
            unsafe { (*node).next = head; }
            if self.head.compare_exchange(
                head, node,
                Ordering::Release,
                Ordering::Relaxed,
            ).is_ok() { break; }
        }
    }
}
unsafe adalah escape hatch dari semua jaminan safety Rust. Kode di dalamnya bisa menyebabkan data race, dangling pointer, dan undefined behavior — persis seperti C. Gunakan hanya jika benar-benar diperlukan, selalu dokumentasikan mengapa kode ini aman, dan batasi area unsafe sesempit mungkin.

Checklist Review Kode Concurrent Rust

OWNERSHIP DAN SEND/SYNC:
  □ Apakah data yang di-share lintas thread dibungkus Arc?
  □ Apakah tipe yang dikirim ke thread mengimplementasikan Send?
  □ Apakah tipe yang di-share via referensi mengimplementasikan Sync?
  □ Apakah move closure digunakan saat thread butuh akses ke data dari luar?

MUTEX DAN RWLOCK:
  □ Apakah std::sync::Mutex tidak dipegang saat await? (gunakan tokio::sync::Mutex)
  □ Apakah scope lock sesempit mungkin — tidak ada operasi berat di dalamnya?
  □ Apakah urutan akuisisi lock konsisten di seluruh codebase (cegah deadlock)?
  □ Apakah Mutex poisoning ditangani dengan benar?
  □ Apakah RwLock digunakan untuk workload read-heavy?

ATOMIC:
  □ Apakah Ordering dipilih dengan tepat (SeqCst untuk keamanan, Relaxed/Acq-Rel untuk performa)?
  □ Apakah compound operation menggunakan CAS, bukan dua operasi terpisah?

ASYNC DAN TOKIO:
  □ Apakah tidak ada std::thread::sleep atau blocking I/O di async task?
  □ Apakah kode blocking/CPU-heavy dijalankan via spawn_blocking?
  □ Apakah jumlah concurrent task dibatasi (Semaphore, JoinSet)?
  □ Apakah JoinError dari task diperiksa jenisnya (cancelled vs panic)?
  □ Apakah select! branch sudah cancellation-safe?

CHANNEL:
  □ Apakah jenis channel yang tepat digunakan (mpsc/oneshot/broadcast/watch)?
  □ Apakah bounded channel digunakan untuk backpressure alami?
  □ Apakah sender di-drop saat tidak dipakai agar receiver tahu channel selesai?

UNSAFE:
  □ Apakah unsafe benar-benar diperlukan dan tidak ada alternatif safe?
  □ Apakah semua invariant safety dijaga secara manual?
  □ Apakah ada komentar yang menjelaskan mengapa unsafe ini benar-benar aman?

Ringkasan

  • Rust mencegah data race pada compile time melalui sistem ownership dan borrowing — jika dikompilasi, tidak ada race condition di level memori.
  • Trait Send (aman di-move ke thread lain) dan Sync (aman diakses via shared reference dari banyak thread) diverifikasi compiler secara statik.
  • Arc<T> untuk shared ownership lintas thread; Rc<T> untuk single-thread saja — Rc tidak Send, compiler menolak pengiriman ke thread lain.
  • Arc<Mutex<T>> adalah idiom paling umum untuk shared mutable state — lock otomatis dilepas via RAII saat MutexGuard keluar scope.
  • RwLock<T> untuk read-heavy workload: banyak reader concurrent, satu writer eksklusif.
  • Atomic types untuk operasi lock-free pada primitif — pilih Ordering yang tepat; SeqCst untuk default aman, Relaxed/Acquire/Release untuk optimasi.
  • Rust mencegah data race, tapi tidak mencegah deadlock — konsistensi urutan akuisisi lock tetap tanggung jawab programmer.
  • Future di Rust bersifat lazy — tidak berjalan sampai di-poll runtime. Berbeda dari Promise atau Coroutine yang mulai berjalan saat dibuat.
  • Tokio (v1.52.x) adalah async runtime de-facto — menyediakan multi-thread work-stealing scheduler, async I/O, dan sync primitives lengkap.
  • tokio::task::spawn untuk async task concurrent; spawn_blocking untuk kode blocking/CPU-heavy agar tidak membekukan worker thread.
  • Jangan gunakan std::sync::Mutex melewati titik await — gunakan tokio::sync::Mutex yang suspend task tanpa memblokir thread.
  • Empat channel Tokio: mpsc (many-to-one), oneshot (satu pesan), broadcast (one-to-many), watch (state terkini selalu tersedia).
  • tokio::select! untuk menunggu dari beberapa future/channel — berguna untuk timeout, cancellation, dan racing alternatif.
  • Cancellation di Tokio bersifat drop-based, terjadi di setiap titik await — pastikan kode cancellation-safe agar tidak ada state setengah-selesai.
  • unsafe adalah escape hatch dari semua jaminan Rust — gunakan hanya jika mutlak diperlukan, dokumentasikan invariant, dan batasi areanya sesempit mungkin.

Portofolio