Implementasi Sistem Upload & Proses Secara Asynchronous dengan Golang
Artikel ini merupakan lanjutan dari pembahasan konsep & arsitektur. Fokus di sini adalah contoh implementasi praktis menggunakan Golang, dengan dua pendekatan worker:
- CLI Worker (polling DB)
- AWS SQS Worker (event-driven)
Struktur High-Level Project
/cmd
/api -> HTTP API (upload, status)
/worker -> CLI worker (polling)
/internal
/handler -> HTTP handlers
/service -> Business logic
/repository -> DB access
/model -> Struct DB
/processor -> File processing logic
/storage -> S3 / object storage abstraction
Data Model (Golang Struct)
type UploadTask struct {
ID int64
UserID int64
FilePath string
Status string // PENDING, PROCESSING, DONE, FAILED
ErrorMessage *string
CreatedAt time.Time
UpdatedAt time.Time
}
API: Upload File
Flow
- Terima multipart upload
- Simpan file ke object storage
- Insert record ke DB (status = PENDING)
- (Opsional) Push message ke SQS
- Response ke user
Contoh Handler Sederhana
func UploadHandler(w http.ResponseWriter, r *http.Request) {
file, _, err := r.FormFile("file")
if err != nil {
http.Error(w, err.Error(), 400)
return
}
defer file.Close()
path, err := storage.Save(file)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
taskID, err := repo.CreateUploadTask(r.Context(), path)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// Jika pakai SQS
// sqs.Publish(taskID)
json.NewEncoder(w).Encode(map[string]interface{}{
"task_id": taskID,
"status": "PENDING",
})
}
API: Status & History
Endpoint:
GET /uploads→ list historyGET /uploads/{id}→ detail
Query langsung ke tabel upload_tasks.
Worker Opsi 1: CLI Worker (Polling DB)
Konsep
- Program Go berjalan terus
- Loop setiap 5 detik
- Ambil 1 task PENDING
- Update ke PROCESSING
- Proses file
- Update status akhir
Contoh Loop Worker
for {
task, err := repo.FetchPendingTask(ctx)
if err == sql.ErrNoRows {
time.Sleep(5 * time.Second)
continue
}
repo.MarkProcessing(ctx, task.ID)
err = processor.Process(task)
if err != nil {
repo.MarkFailed(ctx, task.ID, err.Error())
continue
}
repo.MarkDone(ctx, task.ID)
}
Catatan Penting
- Gunakan transaction + locking
- Hindari double processing
- Cocok untuk workload kecil-menengah
Worker Opsi 2: AWS SQS + Worker
Flow
- Upload → push message ke SQS
- SQS trigger Lambda / container worker
- Worker ambil task_id
- Proses file
- Update DB
Payload Message
{
"task_id": 123
}
Contoh SQS Handler (Conceptual)
func HandleMessage(ctx context.Context, taskID int64) error {
task, err := repo.GetTask(ctx, taskID)
if err != nil {
return err
}
repo.MarkProcessing(ctx, task.ID)
if err := processor.Process(task); err != nil {
repo.MarkFailed(ctx, task.ID, err.Error())
return err
}
repo.MarkDone(ctx, task.ID)
return nil
}
Konfigurasi Penting
- Visibility timeout > max processing time
- DLQ untuk task gagal
- Retry terbatas
File Processor
Logic berat sebaiknya dipisah:
func Process(task UploadTask) error {
file, err := storage.Download(task.FilePath)
if err != nil {
return err
}
records := parseExcel(file)
for _, r := range records {
// validasi & simpan
}
return nil
}
Observability & Monitoring
Best practice:
- Logging per task_id
- Metric: success / failed
- Timeout guard
- Context cancellation
Kapan Memilih Opsi Mana?
| Kondisi | Rekomendasi |
|---|---|
| Traffic kecil | CLI Worker |
| Burst traffic | SQS + Lambda |
| Burst traffic + very long running process | AWS Batch |
| Cost-sensitive | Serverless (SQS/Event Bridge + Lambda) |
| On-prem | CLI Worker |
Penutup
Pola ini sangat umum di sistem enterprise:
- Upload cepat
- Proses lama di belakang
- User tetap punya visibility
Dengan Golang, arsitektur ini relatif mudah diimplementasikan dan scalable.
Pendekatan SQS adalah versi paling mature, sedangkan CLI worker cocok untuk tahap awal atau sistem sederhana.
Contoh implementasi ini tidak mengimplementasi full pattern seperti service-repository pattern untuk kebutuhan simplifikasi.