/******************************************************************************!
* \file client.cpp
* \author Sebastien Beaugrand
* \sa http://beaugrand.chez.com/
* \copyright CeCILL 2.1 Free Software license
******************************************************************************/
#include <iostream>
#include <thread>
#include <mutex>
#include <string.h>
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
using namespace std::chrono_literals;
/******************************************************************************!
* \class RecvThread
******************************************************************************/
class RecvThread
{
public:
explicit RecvThread(nng_socket sock) {
mThread = std::thread(RecvThread::run, this, sock);
}
~RecvThread() {
loop = false;
mThread.join();
}
std::string getMessage() {
std::lock_guard<std::mutex> guard(mutex);
empty = true;
return message;
}
std::atomic_bool loop = true;
std::atomic_bool empty = true;
std::mutex mutex;
std::string message;
private:
static void run(RecvThread* self, nng_socket sock) {
char* buf = NULL;
size_t sz;
int rv;
while (self->loop) {
std::cout << "thread: recv ..." << std::endl;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
std::cerr << "thread: nng_recv "
<< nng_strerror(rv) << std::endl;
std::this_thread::sleep_for(500ms);
continue;
}
if (strcmp(buf, "alert") == 0) {
std::cout << "thread: alert" << std::endl;
} else {
std::cout << "thread: recv "
<< buf << std::endl;
while (! self->empty) {
std::this_thread::sleep_for(100ms);
}
std::lock_guard<std::mutex> guard(self->mutex);
self->message = buf;
self->empty = false;
}
nng_free(buf, sz);
}
}
std::thread mThread;
};
/******************************************************************************!
* \fn rpc
******************************************************************************/
void
rpc(RecvThread& recvThread, nng_socket sock, const char* method)
{
int rv;
std::cout << "client: send "
<< method << std::endl;
rv = nng_send(sock, const_cast<char*>(method), strlen(method) + 1, 0);
if (rv != 0) {
std::cerr << "client: nng_send "
<< nng_strerror(rv) << std::endl;
}
while (recvThread.empty) {
std::this_thread::sleep_for(100ms);
}
std::cout << "client: recv "
<< recvThread.getMessage() << std::endl;
}
/******************************************************************************!
* \fn main
******************************************************************************/
int
main(int argc, char** argv)
{
if (argc != 2) {
return 1;
}
const char* url = argv[1];
nng_socket sock;
int rv;
if ((rv = nng_pair0_open(&sock)) != 0) {
std::cerr << "client: nng_pair0_open "
<< nng_strerror(rv) << std::endl;
}
if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
std::cerr << "client: nng_dial "
<< nng_strerror(rv) << std::endl;
}
if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {
std::cerr << "client: nng_setopt_ms "
<< nng_strerror(rv) << std::endl;
}
{
RecvThread recvThread(sock);
rpc(recvThread, sock, "status");
rpc(recvThread, sock, "emit alert");
rpc(recvThread, sock, "quit");
}
nng_close(sock);
return 0;
}