Message receiving

This commit is contained in:
Julien CLEMENT 2020-05-05 21:04:03 +02:00
parent 0354ce0bea
commit 6795dd7127
5 changed files with 66 additions and 2 deletions

@ -1 +1 @@
9 4

@ -1 +1 @@
0 3

@ -4,6 +4,8 @@
#include "events/register.hh" #include "events/register.hh"
#include "events/send.hh" #include "events/send.hh"
#include "misc/addrinfo/addrinfo.hh" #include "misc/addrinfo/addrinfo.hh"
#include "legislator/legislator.hh"
#include "misc/logger.hh"
namespace paxos namespace paxos
{ {
@ -27,6 +29,43 @@ namespace paxos
void RecvEW::operator()() void RecvEW::operator()()
{ {
shared_socket sock = connection_->get_socket();
auto origin_ip = connection_->get_origin_ip();
auto origin_port = connection_->get_origin_port();
std::cerr << "Ready to recv data on " << origin_ip << ":"
<< origin_port << " fd: " << sock->fd_get()->fd_ << "\n";
std::cout << "RECV on pid " << getpid() << "\n";
auto buf_in = connection_->get_buffer_in();
/* Add try catch to handle session */
try
{
buf_in.receive(*sock);
}
catch (const std::system_error& e)
{
log("Connection closed while receiving on " + origin_ip + ":"
+ origin_port, red);
event_register.unregister_ew(this);
return;
}
Message message;
try
{
if (!message.parse(buf_in))
return;
self->handle_message(message);
}
catch (const std::exception& e)
{
log("Bad message", red);
}
event_register.unregister_ew(this);
} }

@ -33,6 +33,12 @@ namespace paxos
SendEW::send_message(message, legislator.second); SendEW::send_message(message, legislator.second);
} }
void Legislator::receive_next_ballot(Message message)
{
message = message;
log("received NextBallot", green);
}
void Legislator::receive_next_ballot(int ballot, std::string sender) void Legislator::receive_next_ballot(int ballot, std::string sender)
{ {
log(config_.name + " has received a NextBallot(" log(config_.name + " has received a NextBallot("
@ -52,6 +58,11 @@ namespace paxos
//XXX send a LastVote to sender //XXX send a LastVote to sender
} }
void Legislator::receive_last_vote(Message message)
{
message = message;
}
void Legislator::receive_enough_last_vote void Legislator::receive_enough_last_vote
(std::unordered_map<std::string, int> quorum_last_votes) (std::unordered_map<std::string, int> quorum_last_votes)
{ {
@ -69,4 +80,13 @@ namespace paxos
decree = decree; decree = decree;
//XXX send Voted //XXX send Voted
} }
void Legislator::handle_message(Message message)
{
std::string method = message.get_method();
if (method == "NextBallot")
receive_next_ballot(message);
else if (method == "LastVote")
receive_last_vote(message);
}
} }

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "config/config.hh" #include "config/config.hh"
#include "ledger.hh" #include "ledger.hh"
#include "message/message.hh"
#include <unordered_map> #include <unordered_map>
namespace paxos namespace paxos
@ -15,13 +16,17 @@ namespace paxos
void send_next_ballot(int ballot); void send_next_ballot(int ballot);
void receive_next_ballot(Message message);
void receive_next_ballot(int ballot, std::string sender); void receive_next_ballot(int ballot, std::string sender);
void receive_last_vote(Message message);
void receive_enough_last_vote void receive_enough_last_vote
(std::unordered_map<std::string, int> quorum_last_votes); (std::unordered_map<std::string, int> quorum_last_votes);
void receive_begin_ballot(int ballot, int decree); void receive_begin_ballot(int ballot, int decree);
void handle_message(Message message);
Ledger ledger; Ledger ledger;
}; };
using shared_legislator = std::shared_ptr<Legislator>; using shared_legislator = std::shared_ptr<Legislator>;