Program Listing for File churn.cpp¶
↰ Return to documentation for file (churn/churn.cpp
)
/*
* This file is part of UMAP. For copyright information see the COPYRIGHT file
* in the top level directory, or at
* https://github.com/LLNL/umap/blob/master/COPYRIGHT This program is free
* software; you can redistribute it and/or modify it under the terms of the
* GNU Lesser General Public License (as published by the Free Software
* Foundation) version 2.1 dated February 1999. This program is distributed in
* the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* IMPLIED WARRANTY OF MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the terms and conditions of the GNU Lesser General Public License for
* more details. You should have received a copy of the GNU Lesser General
* Public License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
The idea is that we have a single "Load Page" and a set
of N "Churn Pages" as follows:
+==================================================+
| |
| Read Load Page(s) |
| |
+--------------------------------------------------+
| |
| Write{/Read} Load Page(s) |
| |
+--------------------------------------------------+
| |
| Churn Page #1 |
| |
+--------------------------------------------------+
| |
| Churn Page #... |
| |
+--------------------------------------------------+
| |
| Churn Page #N |
| |
+==================================================+
We then have a smaller page_buffer_size that these pages will be faulted
into and madvised out of via umap().
The LoadPage will have a set of num_load_reader and num_load_writer threads
focussed exclusively on making reads and writes to locations constrained to
the Load Page.
The the Churn Pages will have num_churn_reader threads performing random
byte read accesses across all of the Churn Pages effectively causing the
Load Page to be paged in and out of the smaller Page_Buffer.
*/
#include <iostream>
#include <cassert>
#include <cstdint>
#include <chrono>
#include <thread>
#include <mutex>
#include <vector>
#include <random>
#include <algorithm>
#include <utmpx.h> // sched_getcpu()
#include <omp.h>
#include "umap/umap.h"
#include "options.h"
#include "../utility/commandline.hpp"
#include "../utility/umap_file.hpp"
uint64_t g_count = 0;
using namespace std;
using namespace chrono;
class pageiotest {
public:
pageiotest(int _ac, char** _av): time_to_stop{false}, pagesize{utility::umt_getpagesize()} {
getoptions(options, _ac, _av);
umt_options.usemmap = options.usemmap;
umt_options.filename = options.fn;
umt_options.noinit = options.noinit;
umt_options.initonly = options.initonly;
num_rw_load_pages = num_read_load_pages = options.num_load_pages;
num_churn_pages = options.num_churn_pages;
base_addr = utility::map_in_file(options.fn, options.initonly,
options.noinit, options.usemmap,
(num_churn_pages + num_rw_load_pages + num_read_load_pages) * pagesize);
if ( base_addr == nullptr ) {
exit(1);
}
assert(base_addr != NULL);
read_load_pages = base_addr;
rw_load_pages = (void*)((uint64_t)base_addr + (num_read_load_pages*pagesize));
churn_pages = (void*)((uint64_t)base_addr + ( (num_read_load_pages + num_rw_load_pages) * pagesize));
if ( ! options.noinit ) {
init();
}
cout << "Churn Test:\n\t"
<< options.page_buffer_size << " Pages in page buffer\n\t"
<< num_read_load_pages << " Read Load (focus) pages from " << hex << read_load_pages << " - " << (void*)((char*)read_load_pages+((num_read_load_pages*pagesize)-1)) << dec << "\n\t"
<< num_rw_load_pages << " RW Load (focus) pages from " << hex << rw_load_pages << " - " << (void*)((char*)rw_load_pages+((num_rw_load_pages*pagesize)-1)) << dec << "\n\t"
<< options.num_churn_pages << " Churn pages from " << hex << churn_pages << " - " << (void*)((char*)churn_pages+((options.num_churn_pages*pagesize)-1)) << dec << "\n\t"
<< options.num_churn_threads << " Churn threads\n\t"
<< options.num_load_reader_threads << " Load Reader threads\n\t"
<< options.num_load_writer_threads << " Load Writer threads\n\t"
<< options.fn << " Backing file\n\t"
<< options.testduration << " seconds for test duration.\n\n";
check();
}
~pageiotest( void ) {
utility::unmap_file(umt_options.usemmap,
(options.num_churn_pages + num_rw_load_pages
+ num_read_load_pages) * pagesize, base_addr);
}
void start( void ) {
if (options.initonly)
return;
for (uint64_t t = 0; t < options.num_load_reader_threads; ++t)
load_readers.push_back(new thread{&pageiotest::load_read, this, t});
for (uint64_t t = 0; t < options.num_load_reader_threads; ++t)
load_rw_readers.push_back(new thread{&pageiotest::load_rw_read, this, t});
for (uint64_t t = 0; t < options.num_load_writer_threads && t < 1; ++t)
load_rw_writers.push_back(new thread{&pageiotest::load_rw_write, this});
for (uint64_t t = 0; t < options.num_churn_threads; ++t)
churn_readers.push_back(new thread{&pageiotest::churn, this, t});
}
void run( void ) {
if (options.initonly)
return;
this_thread::sleep_for(seconds(options.testduration));
}
void stop( void ) {
time_to_stop = true;
for (auto i : load_readers)
i->join();
for (auto i : load_rw_readers)
i->join();
for (auto i : load_rw_writers)
i->join();
for (auto i : churn_readers)
i->join();
}
private:
bool time_to_stop;
utility::umt_optstruct_t umt_options;
options_t options;
long pagesize;
void* base_addr;
void* read_load_pages;
void* rw_load_pages;
void* churn_pages;
vector<thread*> load_readers;
vector<thread*> load_rw_readers;
vector<thread*> load_rw_writers;
vector<thread*> churn_readers;
uint64_t num_churn_pages;
uint64_t num_read_load_pages;
uint64_t num_rw_load_pages;
void check() {
cout << "Checking churn load pages\n";
{
uint64_t* p = (uint64_t*)churn_pages;
for (uint64_t i = 0; i < num_churn_pages * (pagesize/sizeof(*p)); i += (pagesize/sizeof(*p)))
if (p[i] != i) {
cerr << "check(CHURN): *(uint64_t*)" << &p[i] << "=" << p[i] << " != " << (unsigned long long)i << endl;
exit(1);
}
}
cout << "Checking read load pages\n";
{
uint64_t* p = (uint64_t*)(uint64_t)read_load_pages;
for (uint64_t i = 0; i < num_read_load_pages * (pagesize/sizeof(*p)); ++i)
if (p[i] != i) {
cerr << "check(READER): *(uint64_t*)" << &p[i] << "=" << p[i] << " != " << (unsigned long long)i << endl;
exit(1);
}
}
cerr << "Check Complete\n";
}
void init() {
cout << "Initializing churn pages\n";
{
uint64_t* p = (uint64_t*)churn_pages;
#pragma omp parallel for
for (uint64_t i = 0; i < num_churn_pages * (pagesize/sizeof(*p)); ++i)
p[i] = i;
}
cout << "Initializing read load pages pages\n";
{
uint64_t* p = (uint64_t*)(uint64_t)read_load_pages;
#pragma omp parallel for
for (uint64_t i = 0; i < num_read_load_pages * (pagesize/sizeof(*p)); ++i)
p[i] = i;
}
cout << "Initializing rw load pages pages\n";
{
uint64_t* p = (uint64_t*)(uint64_t)rw_load_pages;
#pragma omp parallel for
for (uint64_t i = 0; i < num_rw_load_pages * (pagesize/sizeof(*p)); ++i)
p[i] = i;
}
cerr << "Initialization Complete\n";
}
mutex lock;
uint64_t churn( int tnum ) {
uint64_t cnt = 0;
uint64_t idx;
uint64_t* p = (uint64_t*)churn_pages;
mt19937 gen(tnum);
uniform_int_distribution<uint64_t> rnd_int(0, ((num_churn_pages*(pagesize/sizeof(*p)))-1));
while ( !time_to_stop ) {
idx = rnd_int(gen);
if (p[idx] != idx) {
lock.lock();
cerr << hex << "churn() " << p[idx] << " != " << idx << " at address " << &p[idx] << endl;
lock.unlock();
break;
}
}
return cnt;
}
void load_read(int tnum) {
uint64_t* p = (uint64_t*)read_load_pages;
tnum = tnum + 2048;
mt19937 gen(tnum);
uniform_int_distribution<uint64_t> rnd_int(0, ((num_read_load_pages*(pagesize/sizeof(*p)))-1));
while ( !time_to_stop ) {
uint64_t idx = rnd_int(gen);
if (p[idx] != idx) {
lock.lock();
cerr << "load_read *(uint64_t*)" << &p[idx] << "=" << p[idx] << " != " << idx << endl;
lock.unlock();
break;
}
}
}
// Have a reader going nuts on the write page for fun. No data validation since the writer is changing it from underneath us.
void load_rw_read(int tnum) {
uint64_t* p = (uint64_t*)rw_load_pages;
tnum = tnum + tnum * 53;
mt19937 gen(tnum);
uniform_int_distribution<uint64_t> rnd_int(0, ((num_rw_load_pages*(pagesize/sizeof(*p)))-1));
while ( !time_to_stop ) {
uint64_t idx = rnd_int(gen);
g_count += p[idx];
}
}
void load_rw_write() {
uint64_t cnt = 0;
uint64_t* p = (uint64_t*)((uint64_t)rw_load_pages);
const int num_entries = num_rw_load_pages * pagesize/sizeof(*p);
omp_set_num_threads(options.num_load_writer_threads);
while ( !time_to_stop ) {
uint64_t cnt_base = cnt;
#pragma omp parallel for
for (int i = 0; i < num_entries; ++i) {
p[i] = cnt_base + i;
}
#pragma omp parallel for
for (int i = 0; i < num_entries; ++i) {
if (p[i] != (cnt_base + i)) {
#pragma omp critical
{
lock.lock();
cerr << "load_rw_write *(uint64_t*)" << &p[i] << "=" << p[i] << " != " << (cnt_base+i) << ": (" << cnt_base << "+" << i << "=" << (cnt_base+i) << ") - " << p[i] << " = " << ((cnt_base+i)-p[i]) << endl;
if (i != 0)
cerr << "load_rw_write *(uint64_t*)" << &p[0] << "=" << p[0] << " , " << (cnt_base+0) << ": (" << cnt_base << "+" << 0 << "=" << (cnt_base+0) << ") - " << p[0] << " = " << ((cnt_base+0)-p[0]) << endl;
lock.unlock();
}
exit(1);
}
}
cnt += pagesize/sizeof(*p);
}
}
};
int main(int argc, char **argv)
{
pageiotest test{argc, argv};
test.start();
test.run();
test.stop();
return 0;
}