Add receive_last_vote and begin receive_enough_last_vote

This commit is contained in:
Julien CLEMENT 2020-05-05 23:25:20 +02:00
parent 01000023b6
commit 63797e0631
5 changed files with 74 additions and 21 deletions

@ -1,4 +1,5 @@
#include "ledger.hh" #include "ledger.hh"
#include "legislator.hh"
#include <fstream> #include <fstream>
#include <filesystem> #include <filesystem>
@ -27,14 +28,21 @@ namespace paxos
} }
int Ledger::prev_vote() Vote Ledger::prev_vote()
{ {
return read_file(base_path_ + "prev-vote.txt"); Vote v;
v.legislator = self->config_.name;
v.ballot_id = read_file(base_path_ + "prev-vote-id.txt");
Decree decree;
decree.decree = read_file(base_path_ + "prev-vote-decree.txt");
v.decree = decree;
return v;
} }
void Ledger::set_prev_vote(int v) void Ledger::set_prev_vote(Vote v)
{ {
write_file(base_path_ + "prev-vote.txt", v); write_file(base_path_ + "prev-vote-id.txt", v.ballot_id);
write_file(base_path_ + "prev-vote-decree.txt", v.decree.decree);
} }
int Ledger::next_bal() int Ledger::next_bal()

@ -3,6 +3,7 @@
#include <vector> #include <vector>
#include "law/decree.hh" #include "law/decree.hh"
#include "vote.hh"
namespace paxos namespace paxos
{ {
@ -16,8 +17,8 @@ namespace paxos
int last_tried(); int last_tried();
void set_last_tried(int b); void set_last_tried(int b);
int prev_vote(); Vote prev_vote();
void set_prev_vote(int v); void set_prev_vote(Vote v);
int next_bal(); int next_bal();
void set_next_bal(int b); void set_next_bal(int b);

@ -5,6 +5,7 @@
#include "message/message.hh" #include "message/message.hh"
#include "events/register.hh" #include "events/register.hh"
#include "events/send.hh" #include "events/send.hh"
#include "vote.hh"
namespace paxos namespace paxos
{ {
@ -16,6 +17,7 @@ namespace paxos
void Legislator::initiate_ballot() void Legislator::initiate_ballot()
{ {
quorum_previous_votes.clear();
int new_ballot_number = ledger.last_tried() + 1; int new_ballot_number = ledger.last_tried() + 1;
ledger.set_last_tried(new_ballot_number); ledger.set_last_tried(new_ballot_number);
log(config_.name + " is initiating ballot " + std::to_string(new_ballot_number), cyan); log(config_.name + " is initiating ballot " + std::to_string(new_ballot_number), cyan);
@ -55,20 +57,23 @@ namespace paxos
return; return;
} }
ledger.set_next_bal(ballot); ledger.set_next_bal(ballot);
int previous_vote = ledger.prev_vote(); Vote previous_vote = ledger.prev_vote();
previous_vote = previous_vote; previous_vote = previous_vote;
send_last_vote(ballot, previous_vote, sender); send_last_vote(ballot, previous_vote, sender);
} }
void Legislator::send_last_vote(int ballot, int previous_vote, void Legislator::send_last_vote(int ballot, Vote previous_vote,
std::string sender) std::string sender)
{ {
std::string ballot_string = std::to_string(ballot); std::string ballot_string = std::to_string(ballot);
std::string vote_string = std::to_string(previous_vote); std::string vote_ballot_id_string
= std::to_string(previous_vote.ballot_id);
std::string decree_string = std::to_string(previous_vote.decree.decree);
Message message; Message message;
message.set_method("LastVote"); message.set_method("LastVote");
message.add_header("ballot", ballot_string); message.add_header("ballot", ballot_string);
message.add_header("vote", vote_string); message.add_header("vote_ballot_id", vote_ballot_id_string);
message.add_header("decree", decree_string);
message.add_header("sender", self->config_.name); message.add_header("sender", self->config_.name);
SendEW::send_message(message, legislators[sender]); SendEW::send_message(message, legislators[sender]);
@ -76,22 +81,59 @@ namespace paxos
void Legislator::receive_last_vote(Message message) void Legislator::receive_last_vote(Message message)
{ {
message = message; std::string ballot_str = *message.get_header("ballot");
std::string vote_ballot_id_str = *message.get_header("vote");
std::string sender = *message.get_header("sender");
log(config_.name + " has received a LastVote("
+ ballot_str + ", " + vote_ballot_id_str
+ ") from " + sender, cyan);
int ballot = std::stoi(ballot_str);
int vote_ballot_id = std::stoi(vote_ballot_id_str);
int vote_decree = std::stoi(*message.get_header("decree"));
if (ballot != ledger.last_tried())
return;
Decree decree;
decree.decree = vote_decree;
Vote vote;
vote.decree = decree;
vote.legislator = sender;
vote.ballot_id = vote_ballot_id;
quorum_previous_votes.insert(std::pair<std::string, Vote>
(sender, vote));
int nb_legislators = legislators.size();
int quorum_size = quorum_previous_votes.size();
if (quorum_size > nb_legislators / 2)
receive_enough_last_vote();
} }
void Legislator::receive_enough_last_vote void Legislator::receive_enough_last_vote()
(std::unordered_map<std::string, int> quorum_last_votes)
{ {
quorum_last_votes = quorum_last_votes; Vote max_vote;
max_vote.ballot_id = -1;
for (auto legislator_vote_pair : quorum_previous_votes)
{
Vote current_vote = legislator_vote_pair.second;
if (current_vote.ballot_id > max_vote.ballot_id)
max_vote = current_vote;
}
//find d to satisfy B3 //find d to satisfy B3
//send BeginBallot to the quorum //send BeginBallot to the quorum
} }
void Legislator::receive_begin_ballot(int ballot, int decree) void Legislator::receive_begin_ballot(int ballot, int decree_id)
{ {
if (ballot != ledger.next_bal()) if (ballot != ledger.next_bal())
return; return;
ledger.set_prev_vote(ballot); Vote vote;
vote.ballot_id = ballot;
Decree decree;
decree.decree = decree_id;
vote.decree = decree;
ledger.set_prev_vote(vote);
decree = decree; decree = decree;
//XXX send Voted //XXX send Voted

@ -2,7 +2,9 @@
#include "config/config.hh" #include "config/config.hh"
#include "ledger.hh" #include "ledger.hh"
#include "message/message.hh" #include "message/message.hh"
#include "vote.hh"
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
namespace paxos namespace paxos
{ {
@ -18,16 +20,17 @@ namespace paxos
void receive_next_ballot(Message message); void receive_next_ballot(Message message);
void receive_next_ballot(int ballot, std::string sender); void receive_next_ballot(int ballot, std::string sender);
void send_last_vote(int ballot, int previous_vote, std::string sender); void send_last_vote(int ballot, Vote previous_vote, std::string sender);
void receive_last_vote(Message message); void receive_last_vote(Message message);
void receive_enough_last_vote void receive_enough_last_vote();
(std::unordered_map<std::string, int> quorum_last_votes);
void receive_begin_ballot(int ballot, int decree); void receive_begin_ballot(int ballot, int decree);
void handle_message(Message message); void handle_message(Message message);
Ledger ledger; Ledger ledger;
std::unordered_map<std::string, Vote> quorum_previous_votes;
}; };
using shared_legislator = std::shared_ptr<Legislator>; using shared_legislator = std::shared_ptr<Legislator>;

@ -1,12 +1,11 @@
#pragma once #pragma once
#include "legislator.hh"
#include "law/decree.hh" #include "law/decree.hh"
namespace paxos namespace paxos
{ {
struct Vote struct Vote
{ {
Legislator legislator; std::string legislator;
int ballot_id; int ballot_id;
Decree decree; Decree decree;
}; };