2 puan yazan GN⁺ 2024-07-07 | 1 yorum | WhatsApp'ta paylaş

Eşzamanlı veri yapılarının doğru şekilde test edilmesi

Bir, iki, üç, iki

  • Rust kütüphanesi loom kullanılarak lock-free veri yapıları kapsamlı biçimde test edilebilir
  • Basit bir eşzamanlı sayaç örnek kodu sunuluyor
  • Koddaki hata, artırma işleminin atomik olmaması
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

#[derive(Default)]
pub struct Counter {
    value: AtomicU32,
}

impl Counter {
    pub fn increment(&self) {
        let value = self.value.load(SeqCst);
        self.value.store(value + 1, SeqCst);
    }

    pub fn get(&self) -> u32 {
        self.value.load(SeqCst)
    }
}

Basit test

  • Birden çok thread'in aynı sayacı tekrar tekrar artırdığı ve sonucun kontrol edildiği bir test
  • Test gerçekten başarısız olabiliyor, ancak zamanlamaya bağlı olduğundan yeniden üretmesi zor
#[test]
fn threaded_test() {
    let counter = Counter::default();
    let thread_count = 100;
    let increment_count = 100;

    std::thread::scope(|scope| {
        for _ in 0..thread_count {
            scope.spawn(|| {
                for _ in 0..increment_count {
                    counter.increment()
                }
            });
        }
    });

    assert_eq!(counter.get(), thread_count * increment_count);
}

Özellik tabanlı test (PBT)

  • Durum makinelerini test etmeye uygun özellik tabanlı test yaklaşımı uygulanmaya çalışılıyor
  • Thread'ler elle adım adım çalıştırılabilse, başka bir thread'in load&store arasına girmek kolay olurdu
#[test]
fn state_machine_test() {
    arbtest::arbtest(|rng| {
        let mut state: i32 = 0;
        let step_count: usize = rng.int_in_range(0..=100)?;

        for _ in 0..step_count {
            match *rng.choose(&["inc", "dec"])? {
                "inc" => state += 1,
                "dec" => state -= 1,
                _ => unreachable!(),
            }
        }
        Ok(())
    });
}

Basit enstrümantasyon

  • Bir thread'in atomik işlemler arasında "duraklatılabilmesini" sağlayan yöntem
pub fn increment(&self) {
    pause();
    let value = self.value.load(SeqCst);
    pause();
    self.value.store(value + 1, SeqCst);
    pause();
}

fn pause() {
    // ¯\_(ツ)_/¯
}

Yönetilen thread API'si

  • API tasarımında bir kural, önce tek kullanıcıyla başlayıp API'nin hissini anlamak, ardından gerçek implementasyona geçmektir
  • Yönetilen thread'ler kullanılarak özellik tabanlı test yazılıyor
let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);

while !rng.is_empty() {
    let coin_flip: bool = rng.arbitrary()?;
    if t1.is_paused() {
        if coin_flip {
            t1.unpause();
        }
    } else if t2.is_paused() {
        if coin_flip {
            t2.unpause();
        }
    }
}

Yönetilen thread implementasyonu

  • Kontrol thread'i ile yönetilen thread arasında iletişim gerekiyor
  • Durumu koruyan bir mutex ve condition variable kullanılarak implementasyon yapılıyor
struct SharedContext {
    state: Mutex<State>,
    cv: Condvar,
}

#[derive(PartialEq, Eq, Default)]
enum State {
    #[default]
    Running,
    Paused,
}

impl SharedContext {
    fn pause(&self) {
        let mut guard = self.state.lock().unwrap();
        assert_eq!(*guard, State::Running);
        *guard = State::Paused;
        self.cv.notify_all();
        guard = self.cv.wait_while(guard, |state| *state == State::Paused).unwrap();
        assert_eq!(*guard, State::Running);
    }
}

Tüm kodun birleştirilmesi

  • Yönetilen thread'ler ile test kodunun entegrasyonu
#[test]
fn test_counter() {
    arbtest::arbtest(|rng| {
        eprintln!("begin trace");
        let counter = Counter::default();
        let mut counter_model: u32 = 0;

        std::thread::scope(|scope| {
            let t1 = managed_thread::spawn(scope, &counter);
            let t2 = managed_thread::spawn(scope, &counter);
            let mut threads = [t1, t2];

            while !rng.is_empty() {
                for (tid, t) in threads.iter_mut().enumerate() {
                    if rng.arbitrary()? {
                        if t.is_paused() {
                            eprintln!("{tid}: unpause");
                            t.unpause()
                        } else {
                            eprintln!("{tid}: increment");
                            t.submit(|c| c.increment());
                            counter_model += 1;
                        }
                    }
                }
            }

            for t in threads {
                t.join();
            }
            assert_eq!(counter_model, counter.get());

            Ok(())
        })
    });
}

GN⁺ özeti

  • Bu yazı, eşzamanlı veri yapılarını test etme yöntemlerini açıklıyor
  • Rust'ın loom kütüphanesiyle atomik olmayan işlemlerin nasıl test edilebileceğini inceliyor
  • Yönetilen thread'lerle eşzamanlılık sorunlarını yeniden üretilebilir ve debug edilebilir biçimde test ediyor
  • Eşzamanlı programlamayla ilgilenen geliştiriciler için faydalı olacaktır
  • Benzer işlevlere sahip bir proje olarak Java'nın JCStress'i bulunuyor

1 yorum

 
GN⁺ 2024-07-07
Hacker News görüşü
  • Rust ile Temper adlı bir kütüphane geliştiriyorum ve Rust bellek modelinin karmaşık kısımlarını ele almak için çok çaba gerekiyor

    • C++/Rust bellek modeline ait en büyük test vakası koleksiyonunu içeriyor
    • Loom, mutex ve kuyruk gibi yüksek seviyeli yapıları kapsamlı biçimde test etmeyi sağlayan daha eksiksiz bir kütüphane
    • Foundation DB test sunumundan ilham aldım ve WebAssembly'nin bu alanda önemli bir rol oynayacağına inanıyorum
  • Rust'ta paylaşımlı bellek için atomik snapshot uyguladım ve otomatik testleri çok önemli görüyorum

    • Başta Loom kullandım ama daha sonra Shuttle'a geçtim
    • Shuttle, rastgeleleştirilmiş bir yaklaşım kullanıyor ve hata bulma konusunda olasılıksal güvence sağlıyor
    • Shuttle daha hızlı ve daha karmaşık test senaryolarına iyi ölçekleniyor
    • Başarısız olan testleri yeniden üretme özelliği çok önemli
  • Bu yaklaşımın dezavantajı, test koduna uydurmak için kodun kendisini değiştirmeniz gerekmesi

    • Bunu, iki thread çalıştırıp ptrace ile tek adım üzerinden komut yürütmeyi rastgele karıştırarak yapmak da mümkün olabilir
  • JetBrains'in Lincheck'i Kotlin/Java dünyasında iyi bir kütüphane

    • Bildirimsel olması ve linearization sonucunu yazdırma biçimi hoşuma gidiyor
  • C++ için "Loom" benzeri bir kütüphane olup olmadığını merak ediyorum

    • lock-free veri yapılarını test etmek istiyorum
  • Bu yaklaşımın soft progress garantileri açısından sınırlamaları olabilir

    • cmpxchg döngüsünde gerçek donanım ve scheduler altında kesintiye uğrama olasılığı çok düşüktür
    • Ancak bu test yaklaşımında ilerleme olasılığı iş sayısına ve duraklatma sayısına göre değişir
  • Pratik bilgi gerekiyor ve gerçek thread'ler oluşturmak lazım

    • async runtime kullanılıp kullanılamayacağını merak ediyorum
  • ptrace kullanarak thread'leri tek adım çalıştırıp komut seviyesi farklı interleaving'ler oluşturabilirsiniz

    • black-box test için bir alternatif olup olmadığını merak ediyorum
  • Loom kullanmak için koşullu derleme kullanmak gerekiyor, bu da biraz istilacı

    • Diğer dillerin kendi scheduler'larını kullanma konusunda daha iyi olup olmadığını merak ediyorum
  • Python'da aynı işi nasıl yapabileceğimi bilmek istiyorum

    • Buna benzer testlere izin veren thread sınıfları oluşturulup oluşturulamayacağını merak ediyorum