Skip to content

Example Cluster

ANYKS edited this page Sep 1, 2025 · 12 revisions

Example Cluster

#include <awh/core/cluster.hpp>

using namespace awh;
using namespace placeholders;

/**
 * Executor Collection of callbacks for the cluster example.
 *
 * Each method is bound to a named cluster event in main()
 * ("ready", "events", "message", "status"). The class stores only a
 * non-owning pointer to the logger, so the logger must outlive the
 * Executor instance.
 */
class Executor {
	private:
		// Non-owning pointer to the logger used by message() and launched()
		log_t * _log;
	public:

		/**
		 * ready Callback bound to the "ready" event
		 * Sends the greeting "Hi!" to the process identified by pid.
		 * @param pid  identifier of the process the greeting is addressed to
		 * @param core cluster core used to send the message
		 */
		void ready(const pid_t pid, cluster::core_t * core){
			const string message = "Hi!";
			core->send(pid, message.c_str(), message.size());
		}

		/**
		 * events Callback bound to the "events" event
		 * Reacts only to the START event; every other event is ignored.
		 * @param worker family of the process reporting the event (MASTER or CHILDREN)
		 * @param pid    identifier of the process (unused here)
		 * @param event  event that has occurred
		 * @param core   cluster core used to react to the event
		 */
		void events(const cluster_t::family_t worker, [[maybe_unused]] const pid_t pid, const cluster_t::event_t event, cluster::core_t * core){
			if(event == cluster_t::event_t::START){
				switch(static_cast <uint8_t> (worker)){
					case static_cast <uint8_t> (cluster_t::family_t::MASTER):
						// NOTE(review): presumably spawns one more child process — confirm against the AWH cluster API
						core->emplace();
					break;
					case static_cast <uint8_t> (cluster_t::family_t::CHILDREN): {
						// No pid is passed here; presumably the message goes to the master process — confirm
						const string message = "Hello";
						core->send(message.c_str(), message.size());
					} break;
				}
			}
		}

		/**
		 * message Callback bound to the "message" event
		 * Logs the received payload; the wording depends on the family
		 * passed in worker.
		 * @param worker family selecting which log line is written (MASTER or CHILDREN)
		 * @param pid    process identifier printed in the MASTER branch
		 * @param buffer pointer to the received bytes (not assumed to be null-terminated)
		 * @param size   number of bytes available in buffer
		 */
		void message(const cluster_t::family_t worker, const pid_t pid, const char * buffer, const size_t size){
			switch(static_cast <uint8_t> (worker)){
				case static_cast <uint8_t> (cluster_t::family_t::MASTER):
					// The payload is copied into a string so that it is null-terminated for "%s"
					this->_log->print("Message from children [%u]: %s", log_t::flag_t::INFO, pid, string(buffer, size).c_str());
				break;
				case static_cast <uint8_t> (cluster_t::family_t::CHILDREN):
					// In this branch the identifier of the current process is printed instead of pid
					this->_log->print("Message from master: %s [%u]", log_t::flag_t::INFO, string(buffer, size).c_str(), ::getpid());
				break;
			}
		}

		/**
		 * launched Callback bound to the "status" event
		 * Logs "Start cluster" or "Stop cluster" depending on the status.
		 * @param status status reported by the core (START or STOP)
		 */
		void launched(const awh::core_t::status_t status){
			switch(static_cast <uint8_t> (status)){
				case static_cast <uint8_t> (awh::core_t::status_t::START):
					this->_log->print("%s", log_t::flag_t::INFO, "Start cluster");
				break;
				case static_cast <uint8_t> (awh::core_t::status_t::STOP):
					this->_log->print("%s", log_t::flag_t::INFO, "Stop cluster");
				break;
			}
		}
	public:
		/**
		 * Executor Constructor
		 * Declared explicit so that a bare log_t pointer is never
		 * silently converted into an Executor.
		 * @param log logger used for all output; stored as-is, not owned
		 */
		explicit Executor(log_t * log) : _log(log) {}
};

/**
 * main Entry point of the cluster example
 * Creates the framework, logger and cluster core objects, configures
 * the core, subscribes the Executor callbacks and starts the cluster.
 * @param argc number of command-line arguments (unused)
 * @param argv command-line arguments (unused)
 * @return     EXIT_SUCCESS once core.start() has returned
 */
int main([[maybe_unused]] int argc, [[maybe_unused]] char * argv[]){
	fmk_t fmk;
	log_t log(&fmk);

	// The executor and the core only borrow the logger/framework created above
	Executor executor(&log);
	cluster::core_t core(&fmk, &log);

	// Logger name and date/time format of the log records
	log.name("Cluster");
	log.format("%H:%M:%S %d.%m.%Y");

	// Called without arguments, so the library default applies
	// NOTE(review): presumably this selects the number of worker processes — confirm against the AWH docs
	core.size();
	// NOTE(review): presumably re-launches a child process when it terminates — confirm
	core.autoRestart(true);

	// Setting the cluster name
	core.name("ANYKS");

	// Activating the mode of exchanging messages between processes via a unix socket
	// core.transfer(cluster_t::transfer_t::IPC);

	// Password, cipher and compression method set on the cluster core
	// NOTE(review): presumably applied to the messages exchanged between processes — confirm
	core.password("Password");
	core.cipher(hash_t::cipher_t::AES256);
	core.compressor(hash_t::method_t::ZSTD);

	// Subscribe the Executor methods to the cluster events; where a callback
	// needs the core, a pointer to it is bound as the trailing argument
	core.on <void (const pid_t)> ("ready", &Executor::ready, &executor, _1, &core);
	core.on <void (const awh::core_t::status_t)> ("status", &Executor::launched, &executor, _1);
	core.on <void (const cluster_t::family_t, const pid_t, const cluster_t::event_t)> ("events", &Executor::events, &executor, _1, _2, _3, &core);
	core.on <void (const cluster_t::family_t, const pid_t, const char *, const size_t)> ("message", &Executor::message, &executor, _1, _2, _3, _4);

	// NOTE(review): presumably blocks until the cluster is stopped — confirm
	core.start();

	return EXIT_SUCCESS;
}
Clone this wiki locally