/******************************************************************************!
* \file messagejob.cpp
* \author Sebastien Beaugrand
* \sa http://beaugrand.chez.com/
* \copyright CeCILL 2.1 Free Software license
******************************************************************************/
#include <unistd.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include "messagejob.h"
#include "mqexception.h"
namespace mq {
/******************************************************************************!
* \fn open
******************************************************************************/
void
MessageJob::open()
{
mConn = amqp_new_connection();
auto sock = amqp_tcp_socket_new(mConn);
if (sock == NULL) {
throw Exception("amqp_tcp_socket_new");
}
auto status = amqp_socket_open(sock, hostname.c_str(), port);
if (status != AMQP_STATUS_OK) {
throw Exception("amqp_socket_open", status);
}
auto res = amqp_login(mConn, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
amqp_channel_open(mConn, 1);
res = amqp_get_rpc_reply(mConn);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
amqp_queue_declare(mConn, 1, amqp_cstring_bytes(mQueueName.c_str()),
0, // passive
0, // durable
0, // exclusive
1, // auto_delete
amqp_empty_table);
res = amqp_get_rpc_reply(mConn);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
amqp_basic_consume(mConn, 1, amqp_cstring_bytes(mQueueName.c_str()),
amqp_empty_bytes,
0, // no_local
1, // no_ack
0, // exclusive
amqp_empty_table);
res = amqp_get_rpc_reply(mConn);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
amqp_maybe_release_buffers(mConn);
}
/******************************************************************************!
* \fn run
******************************************************************************/
void
MessageJob::run()
{
while (mLoop) {
try {
this->loop();
} catch (const Exception& e) {
std::cerr << "error: " << e.what() << std::endl;
sleep(5);
try {
this->destroy();
} catch (const Exception&) {
}
mConn = nullptr;
}
}
try {
this->destroy();
} catch (const Exception& e) {
std::cerr << "error: " << e.what() << std::endl;
}
}
/******************************************************************************!
* \fn loop
******************************************************************************/
void
MessageJob::loop()
{
amqp_envelope_t envelope;
struct timeval timeout = { 1, 0 };
if (mConn == nullptr) {
this->open();
}
auto res = amqp_consume_message(mConn, &envelope, &timeout, 0);
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION &&
res.library_error == AMQP_STATUS_TIMEOUT) {
return;
}
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
std::string str(static_cast<const char*>(envelope.message.body.bytes),
envelope.message.body.len);
/* */ if (str == "status") {
this->reply("status: ok", envelope);
} else if (str == "click") {
this->reply("click: ok", envelope);
emit clickSignal();
} else if (str == "quit") {
this->reply("quit: ok", envelope);
emit closeSignal();
}
amqp_destroy_envelope(&envelope);
}
/******************************************************************************!
* \fn reply
******************************************************************************/
void
MessageJob::reply(const std::string& msg,
const amqp_envelope_t& envelope) const
{
amqp_basic_properties_t props;
props._flags =
AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 1;
props.correlation_id = envelope.message.properties.correlation_id;
auto status = amqp_basic_publish(mConn, 1,
amqp_cstring_bytes(""),
envelope.message.properties.reply_to,
1, // mandatory
0, // immediate
&props,
amqp_cstring_bytes(msg.c_str()));
if (status != AMQP_STATUS_OK) {
throw Exception("amqp_basic_publish", status);
}
}
/******************************************************************************!
* \fn destroy
******************************************************************************/
void
MessageJob::destroy()
{
if (mConn == nullptr) {
return;
}
auto res = amqp_channel_close(mConn, 1, AMQP_REPLY_SUCCESS);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
res = amqp_connection_close(mConn, AMQP_REPLY_SUCCESS);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw Exception(res);
}
auto status = amqp_destroy_connection(mConn);
if (status != AMQP_STATUS_OK) {
throw Exception("amqp_destroy_connection", status);
}
mConn = nullptr;
}
}