diff --git a/legislator1/last-tried.txt b/legislator1/last-tried.txt index f11c82a..bf0d87a 100644 --- a/legislator1/last-tried.txt +++ b/legislator1/last-tried.txt @@ -1 +1 @@ -9 \ No newline at end of file +4 \ No newline at end of file diff --git a/legislator2/last-tried.txt b/legislator2/last-tried.txt index c227083..e440e5c 100644 --- a/legislator2/last-tried.txt +++ b/legislator2/last-tried.txt @@ -1 +1 @@ -0 \ No newline at end of file +3 \ No newline at end of file diff --git a/src/events/receive.cc b/src/events/receive.cc index d5f413f..4278a98 100644 --- a/src/events/receive.cc +++ b/src/events/receive.cc @@ -4,6 +4,8 @@ #include "events/register.hh" #include "events/send.hh" #include "misc/addrinfo/addrinfo.hh" +#include "legislator/legislator.hh" +#include "misc/logger.hh" namespace paxos { @@ -27,6 +29,43 @@ namespace paxos void RecvEW::operator()() { + shared_socket sock = connection_->get_socket(); + + auto origin_ip = connection_->get_origin_ip(); + auto origin_port = connection_->get_origin_port(); + + std::cerr << "Ready to recv data on " << origin_ip << ":" + << origin_port << " fd: " << sock->fd_get()->fd_ << "\n"; + std::cout << "RECV on pid " << getpid() << "\n"; + + auto buf_in = connection_->get_buffer_in(); + /* Add try catch to handle session */ + try + { + buf_in.receive(*sock); + } + catch (const std::system_error& e) + { + log("Connection closed while receiving on " + origin_ip + ":" + + origin_port, red); + + event_register.unregister_ew(this); + return; + } + + Message message; + try + { + if (!message.parse(buf_in)) + return; + self->handle_message(message); + } + catch (const std::exception& e) + { + log("Bad message", red); + } + + event_register.unregister_ew(this); } diff --git a/src/legislator/legislator.cc b/src/legislator/legislator.cc index e1950e7..32e7b82 100644 --- a/src/legislator/legislator.cc +++ b/src/legislator/legislator.cc @@ -33,6 +33,12 @@ namespace paxos SendEW::send_message(message, legislator.second); } + void Legislator::receive_next_ballot(Message message) + { + message = message; + log("received NextBallot", green); + } + void Legislator::receive_next_ballot(int ballot, std::string sender) { log(config_.name + " has received a NextBallot(" @@ -52,6 +58,11 @@ namespace paxos //XXX send a LastVote to sender } + void Legislator::receive_last_vote(Message message) + { + message = message; + } + void Legislator::receive_enough_last_vote (std::unordered_map quorum_last_votes) { @@ -69,4 +80,13 @@ namespace paxos decree = decree; //XXX send Voted } + + void Legislator::handle_message(Message message) + { + std::string method = message.get_method(); + if (method == "NextBallot") + receive_next_ballot(message); + else if (method == "LastVote") + receive_last_vote(message); + } } diff --git a/src/legislator/legislator.hh b/src/legislator/legislator.hh index 7f6a1d8..f798cd4 100644 --- a/src/legislator/legislator.hh +++ b/src/legislator/legislator.hh @@ -1,6 +1,7 @@ #pragma once #include "config/config.hh" #include "ledger.hh" +#include "message/message.hh" #include namespace paxos @@ -15,13 +16,17 @@ namespace paxos void send_next_ballot(int ballot); + void receive_next_ballot(Message message); void receive_next_ballot(int ballot, std::string sender); + void receive_last_vote(Message message); void receive_enough_last_vote (std::unordered_map quorum_last_votes); void receive_begin_ballot(int ballot, int decree); + void handle_message(Message message); + Ledger ledger; }; using shared_legislator = std::shared_ptr;