// 多线程accumulate实现 (a multithreaded implementation of accumulate)

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <numeric>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using namespace std;

/// Owns a std::thread and joins it when the owner goes out of scope.
///
/// The wrapper is neither copyable nor assignable; it holds exactly the
/// thread it was constructed with until destruction.
class scoped_thread {
public:
  /// Takes ownership of `t_`.
  ///
  /// @param t_ the thread to own; it must be joinable.
  /// @throws std::logic_error if `t_` is not joinable.
  explicit scoped_thread(std::thread t_): t(std::move(t_)) {
    // A bare `throw;` with no exception in flight calls std::terminate,
    // so report the misuse with a real exception object instead.
    if(!t.joinable()) throw std::logic_error("scoped_thread: thread is not joinable");
  }

  /// Blocks until the owned thread has finished.
  ~scoped_thread() {
    if( t.joinable() ) {
      t.join();
    }
  }

  scoped_thread(const scoped_thread&) = delete;
  scoped_thread& operator=(const scoped_thread&) = delete;
private:
  std::thread t;
};


/// Function object that folds the range [first, last) onto `result`.
///
/// On entry `result` holds the starting value; on return it holds that
/// starting value plus every element of the range, added left to right.
template<typename Iterator, typename T>
struct accumulate_block {
  void operator()(Iterator first, Iterator last, T& result) {
    // Sum into a local and write through the reference only once at the end.
    T running = result;
    for( Iterator it = first; it != last; ++it ) {
      running = running + *it;
    }
    result = running;
  }
};



/// Sums the range [first, last) onto `init`, splitting the work across threads.
///
/// Ranges shorter than 100 elements are summed serially on the calling
/// thread. Longer ranges are cut into equally sized blocks (the last block
/// also takes the remainder); every block but the last is summed on its own
/// thread, the last one on the calling thread, and the per-block sums are
/// finally added onto `init`.
///
/// @param first start of the range
/// @param last  one past the end of the range
/// @param init  value the total is added onto; its type is the accumulator type
/// @return `init` plus the sum of all elements in the range
///
/// T must be default-constructible: each block starts from a value-initialized T.
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  // Joins every still-joinable worker on scope exit, so a failure while
  // spawning does not destroy a running std::thread (which would terminate).
  struct join_guard {
    explicit join_guard(std::vector<std::thread>& pool_): pool(pool_) {}
    ~join_guard() {
      for( auto& entry: pool ) {
        if( entry.joinable() ) entry.join();
      }
    }
    join_guard(const join_guard&) = delete;
    join_guard& operator=(const join_guard&) = delete;
    std::vector<std::thread>& pool;
  };

  unsigned long const min_parallel_length = 100;
  unsigned long const length = static_cast<unsigned long>(std::distance(first, last));
  if( length < min_parallel_length ) {
    // Return the computed sum; the value returned by std::accumulate must not be dropped.
    return std::accumulate(first, last, init);
  }

  // hardware_concurrency() may report 0 when the count is unknown; fall back
  // to 2 so the block count is never zero. Capping at `length` keeps every
  // block non-empty.
  unsigned long const hardware_threads = std::thread::hardware_concurrency();
  unsigned long const block_count = std::min(hardware_threads != 0 ? hardware_threads : 2UL, length);
  unsigned long const block_size = length / block_count;

  std::vector<T> partial(block_count);
  auto const sum_block = [](Iterator block_first, Iterator block_last, T& out) {
    out = std::accumulate(block_first, block_last, out);
  };

  Iterator block_start = first;
  {
    std::vector<std::thread> workers;
    workers.reserve(block_count - 1);
    join_guard guard(workers);

    for( unsigned long i = 0; i + 1 < block_count; ++i ) {
      Iterator block_end = block_start;
      std::advance(block_end, block_size);
      workers.emplace_back(sum_block, block_start, block_end, std::ref(partial[i]));
      block_start = block_end;
    }

    // The calling thread handles the final block, including the remainder.
    sum_block(block_start, last, partial[block_count - 1]);
  } // guard joins all workers here, before the partial sums are read

  return std::accumulate(partial.begin(), partial.end(), init);
}

/// Fills `data` with `count` pseudo-random values in [0, 20] and prints how
/// long the fill took.
///
/// Any previous contents of `data` are overwritten. The engine uses a fixed
/// seed, so repeated calls with the same `count` produce the same values.
///
/// @param data  vector that receives the generated values
/// @param count number of values to generate (defaults to 1'000'000'000)
void readyDataSet(std::vector<unsigned long long>& data, std::size_t count = 1000000000) {
  std::cout << __PRETTY_FUNCTION__ << ": " << std::endl;
  // steady_clock is monotonic, so the measured interval cannot be skewed by
  // wall-clock adjustments.
  auto const tb = std::chrono::steady_clock::now();
  std::default_random_engine dre(12932);
  std::uniform_int_distribution<unsigned long long> dis(0,20);
  // Size the vector once and overwrite in place; resizing and then appending
  // would leave `count` zeros in front of the generated values.
  data.resize(count);
  for( auto& value: data ) {
    value = dis(dre);
  }
  auto const te = std::chrono::steady_clock::now();
  std::chrono::duration<double> const diff = te - tb;
  std::cout << "Time spend: " <<  diff.count() << std::endl;
}

/// Times parallel_accumulate over `data` and prints the elapsed seconds and
/// the resulting sum.
///
/// @param data values to sum; not modified
void test_paralle(std::vector<unsigned long long>& data) {
  std::cout << __PRETTY_FUNCTION__ << ": " << std::endl;
  auto const tb = std::chrono::steady_clock::now();
  // The initial value must be unsigned long long: a plain `0` would make the
  // accumulator an int and truncate the total.
  unsigned long long const re = parallel_accumulate(data.begin(), data.end(), 0ULL);
  auto const te = std::chrono::steady_clock::now();
  std::chrono::duration<double> const diff = te - tb;
  std::cout << "Time spend: " <<  diff.count() << std::endl;
  // Printing the sum lets it be compared against the serial run.
  std::cout << "Sum: " << re << std::endl;

}

/// Times a single-threaded std::accumulate over `data` and prints the elapsed
/// seconds and the resulting sum.
///
/// @param data values to sum; not modified
void test_noneparalle(std::vector<unsigned long long>& data) {
  std::cout << __PRETTY_FUNCTION__ << ": " << std::endl;
  std::cout << "Begin testing... " << std::endl;
  auto const tb = std::chrono::steady_clock::now();
  // The initial value must be unsigned long long: a plain `0` would make the
  // accumulator an int and truncate the total.
  unsigned long long const re = std::accumulate(data.begin(), data.end(), 0ULL);
  auto const te = std::chrono::steady_clock::now();
  std::chrono::duration<double> const diff = te - tb;
  std::cout << "Time spend: " <<  diff.count() << std::endl;
  // Using the result keeps the optimizer from discarding the timed work.
  std::cout << "Sum: " << re << std::endl;

}

/// Entry point: prints the reported hardware thread count, builds the data
/// set, then runs the parallel and the serial timing tests on it in that order.
int main() {
  std::vector<unsigned long long> samples;

  std::cout << "Hardware supporting core: "
            << std::thread::hardware_concurrency()
            << std::endl;

  readyDataSet(samples);
  test_paralle(samples);
  test_noneparalle(samples);
}

// 上一篇:20191324第七、八章读书笔记


// 下一篇:Pycharm开发Django模版结构优化