-
Notifications
You must be signed in to change notification settings - Fork 4
Example Cluster
ANYKS edited this page Sep 1, 2025
·
12 revisions
#include <awh/core/cluster.hpp>
using namespace awh;
using namespace placeholders;
class Executor {
private:
log_t * _log;
public:
void ready(const pid_t pid, cluster::core_t * core){
const string message = "Hi!";
core->send(pid, message.c_str(), message.size());
}
void events(const cluster_t::family_t worker, [[maybe_unused]] const pid_t pid, const cluster_t::event_t event, cluster::core_t * core){
if(event == cluster_t::event_t::START){
switch(static_cast <uint8_t> (worker)){
case static_cast <uint8_t> (cluster_t::family_t::MASTER):
core->emplace();
break;
case static_cast <uint8_t> (cluster_t::family_t::CHILDREN): {
const string message = "Hello";
core->send(message.c_str(), message.size());
} break;
}
}
}
void message(const cluster_t::family_t worker, const pid_t pid, const char * buffer, const size_t size){
switch(static_cast <uint8_t> (worker)){
case static_cast <uint8_t> (cluster_t::family_t::MASTER):
this->_log->print("Message from children [%u]: %s", log_t::flag_t::INFO, pid, string(buffer, size).c_str());
break;
case static_cast <uint8_t> (cluster_t::family_t::CHILDREN):
this->_log->print("Message from master: %s [%u]", log_t::flag_t::INFO, string(buffer, size).c_str(), ::getpid());
break;
}
}
void launched(const awh::core_t::status_t status){
switch(static_cast <uint8_t> (status)){
case static_cast <uint8_t> (awh::core_t::status_t::START):
this->_log->print("%s", log_t::flag_t::INFO, "Start cluster");
break;
case static_cast <uint8_t> (awh::core_t::status_t::STOP):
this->_log->print("%s", log_t::flag_t::INFO, "Stop cluster");
break;
}
}
public:
Executor(log_t * log) : _log(log) {}
};
int32_t main(int32_t argc, char * argv[]){
fmk_t fmk;
log_t log(&fmk);
Executor executor(&log);
cluster::core_t core(&fmk, &log);
log.name("Cluster");
log.format("%H:%M:%S %d.%m.%Y");
core.size();
core.autoRestart(true);
// Setting the cluster name
core.name("ANYKS");
// Activating the mode of exchanging messages between processes via a unix socket
// core.transfer(cluster_t::transfer_t::IPC);
core.password("Password");
core.cipher(hash_t::cipher_t::AES256);
core.compressor(hash_t::method_t::ZSTD);
core.on <void (const pid_t)> ("ready", &Executor::ready, &executor, _1, &core);
core.on <void (const awh::core_t::status_t)> ("status", &Executor::launched, &executor, _1);
core.on <void (const cluster_t::family_t, const pid_t, const cluster_t::event_t)> ("events", &Executor::events, &executor, _1, _2, _3, &core);
core.on <void (const cluster_t::family_t, const pid_t, const char *, const size_t)> ("message", &Executor::message, &executor, _1, _2, _3, _4);
core.start();
return EXIT_SUCCESS;
}copyright © ANYKS