并行计算

利用cpu多核计算子集,然后合并返回结果;例如在快排中,本身计算privot左边,交给子线程计算privot右边

倘若我们要实现一套通用的规则而不依赖于存储容器,那就可以使用函数式编程

程序由一系列函数组成,每一步计算就是函数求值,并且不改变状态和数据(那样拷贝数据返回新值挺费内存吧)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<class T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if(input.empty()) return input;
std::list<T> result;
// 转移,前插
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
基于privot拆分链表
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });
std::list<T> lower_part;
// 后插
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);
// ①因为lower_part是副本,所以并行操作不会引发逻辑错误,这里可以启动future做排序
std::future<std::list<T>> new_lower(
std::async(&parallel_quick_sort<T>, std::move(lower_part)));

// ②
auto new_higher(
parallel_quick_sort(std::move(input)));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

async调用是人性化,假若资源不够async通过std::launch::deffered启动

两种并发模型

Actor模型

不同于共享内存,每个线程可以称作actor,每个actor实体通过消息传递来进行交互,每个实体实现了自己的邮箱,其他actor投递邮件,该实体一次只能同步处理一分邮件,这个过程像队列一样,因此Actor模型无需加锁(接收应该同取出一样)

actor强调在消息传递中的角色,各种不同的actor通过消息形参流水线

CSP模型

CSP模型中,通道(channel)是第一类对象,它不关注发送消息的实体,而关注发送消息时使用的通道。与Actor模型相比,CSP模型中的发送方和接收方是解耦的,并不需要知道对方的存在

csp强调在消息传递中的作用

两者区别:Actor模型使用异步消息传递,而CSP模型使用同步通道通信。这意味着在Actor模型中,消息发送后发送者可以立即继续执行,而在CSP模型中,发送者必须等待接收者准备好接收消息。

simple use

CAF

spawn函数接收函数指针或actor对象来创建一个actor对象,

创建mirror actor进行反转字符串

创建hello_world actor接收一个actor参数向其发送字符串

request基于future和promise机制的异步操作,then设置事件完成回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <string>

#include <caf/io/all.hpp>
#include <caf/all.hpp>

// 镜像反转actor
caf::behavior mirror(caf::event_based_actor* self) {
return {
[=](const std::string& what) -> std::string {
// prints "Hello World!" via aout (thread-safe cout wrapper)
aout(self) << what << std::endl;
// reply "!dlroW olleH"
return std::string{what.rbegin(), what.rend()};
},
};
}

// helloworld Actor
void hello_world(event_based_actor* self, const actor& buddy) {
// send "Hello World!" to our buddy ...
self->request(buddy, std::chrono::seconds(10), "Hello World!")
.then(
// ... wait up to 10s for a response ...
[=](const std::string& what) {
// ... and print it
aout(self) << what << std::endl;
});
}

void caf_main(actor_system& sys) {
auto mirror_actor = sys.spawn(mirror);
sys.spawn(hello_world, mirror_actor);
// the system will wait until both actors are done before exiting the program
}
// main函数宏定义
CAF_MAIN()

C++实现类似go的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template<class T>
class Channel {
private:
std::queue<T> queue_;
std::mutex mtx_;
std::condition_variable producer_
bool closed_ = false;

public:
Channel()=defult;
bool send(T value){
std::unique_lock<std::mutex> lock(mtx_);
producer.wait(lock,[this]{return (queue_.empty() || closed_);});
if(closed_) return false;
queue_.push(value), consumer_.notify_one();
return true;
}

bool reeceive(T* val) {
std::unique_lock<std::mutex> lock(myx_);
consumer_wait(lock,[this]{return !queue() || closed_;});
if(close_ && queue_.empty()) return false;

val = queue.front(),q.pop(),producer.notify_one();
return true;
}

void close(){
std::unique_lock<std::mutex> lock(mtx_);
closed_ = true;
producer.notify_all(),consumer_.notify_all();
}
};