Add event handler

This commit is contained in:
Julien CLEMENT 2020-05-05 15:39:59 +02:00
parent b42378c261
commit f512de7313
20 changed files with 717 additions and 14 deletions

@ -54,7 +54,8 @@ MISC = \
src/misc/sys-wrapper.hh \ src/misc/sys-wrapper.hh \
src/misc/unistd.hh \ src/misc/unistd.hh \
src/misc/logger.cc \ src/misc/logger.cc \
src/misc/logger.hh src/misc/logger.hh \
src/misc/buffer.cc
EVENTS = \ EVENTS = \
src/events/event-loop.hh \ src/events/event-loop.hh \
@ -69,17 +70,7 @@ EVENTS = \
src/events/receive.hh \ src/events/receive.hh \
src/events/receive.cc \ src/events/receive.cc \
src/events/send.hh \ src/events/send.hh \
src/events/send.cc \ src/events/send.cc
src/events/send-file.hh \
src/events/send-file.cc \
src/events/send-request.hh \
src/events/send-request.cc \
src/events/receive-response.hh \
src/events/receive-response.cc \
src/events/file-sender.hh \
src/events/file-sender.cc \
src/events/reverse-proxy.cc \
src/events/reverse-proxy.hh
LEGISLATOR = \ LEGISLATOR = \
src/legislator/legislator.cc \ src/legislator/legislator.cc \
@ -88,10 +79,12 @@ LEGISLATOR = \
SOCKET = \ SOCKET = \
src/socket/default-socket.cc \ src/socket/default-socket.cc \
src/socket/default-socket.hh \ src/socket/default-socket.hh \
src/connection/connection.cc \
src/socket/socket.hh src/socket/socket.hh
SOURCES = \ SOURCES = \
$(CONFIG) \ $(CONFIG) \
$(EVENTS) \
$(LEGISLATOR) \ $(LEGISLATOR) \
$(MISC) \ $(MISC) \
$(SOCKET) \ $(SOCKET) \

@ -0,0 +1,54 @@
#include "connection.hh"
namespace paxos
{
Connection::Connection(shared_socket socket)
{
this->socket_ = socket;
servername_ = std::string();
}
Connection::~Connection()
{
}
const shared_socket& Connection::get_socket()
{
return socket_;
}
const misc::shared_fd Connection::get_socket_fd()
{
return socket_->fd_get();
}
misc::Buffer& Connection::get_buffer_in()
{
return buffer_in_;
}
misc::Buffer& Connection::get_buffer_out()
{
return buffer_out_;
}
const std::string Connection::get_origin_ip() const
{
return socket_->getsockname();
}
const std::string& Connection::get_origin_port() const
{
return socket_->get_listener_port();
}
void Connection::set_servername(const std::string& servername)
{
servername_ = servername;
}
const std::string& Connection::get_servername() const
{
return servername_;
}
}

@ -0,0 +1,76 @@
/**
* \file vhost/connection.hh
* \brief Connection declaration.
*/
#pragma once
#include <iostream>
#include <memory>
#include "misc/fd.hh"
#include "socket/socket.hh"
#include "misc/buffer.hh"
namespace paxos
{
/**
* \struct Connection
* \brief Value object representing a connection.
*
* We need to keep track of the state of each request while it has not
* been fully processed.
*/
struct Connection
{
friend class APM;
Connection(shared_socket socket);
Connection(const Connection& con) = default;
Connection& operator=(const Connection&) = default;
Connection(Connection&&) = default;
Connection& operator=(Connection&&) = default;
~Connection();
/* Add members to store the information relative to the
** connection.
*/
const shared_socket& get_socket();
const misc::shared_fd get_socket_fd();
/*
** \brief Getter of the input misc::Buffer
** \return misc::Buffer
*/
misc::Buffer& get_buffer_in();
/*
** \brief Getter of the output misc::Buffer
** \return misc::Buffer
*/
misc::Buffer& get_buffer_out();
/**
* \brief Returns the ip the client accessed while connecting
*/
const std::string get_origin_ip() const;
/**
* \brief Returns the port the client accessed while connecting
*/
const std::string& get_origin_port() const;
const std::string& get_servername() const;
void set_servername(const std::string& servername);
private:
shared_socket socket_;
std::string servername_;
misc::Buffer buffer_in_;
misc::Buffer buffer_out_;
};
using shared_connection = std::shared_ptr<Connection>;
} // namespace http

52
src/events/event-loop.cc Normal file

@ -0,0 +1,52 @@
#include "event-loop.hh"
namespace paxos
{
EventLoop::EventLoop()
{
/* Macro equivalent of ev_default_loop(int flags) */
loop = EV_DEFAULT;
}
EventLoop::EventLoop(struct ev_loop* loopev)
{
loop = loopev;
}
EventLoop::~EventLoop()
{
/* Maybe need to stop all watcher or easier, free them */
ev_loop_destroy(loop);
}
void EventLoop::register_watcher(EventWatcher* watcher)
{
ev_io_start(loop, &(watcher->watcher_get()));
}
void EventLoop::unregister_watcher(EventWatcher* watcher)
{
ev_io_stop(loop, &(watcher->watcher_get()));
}
void EventLoop::register_sigint_watcher(ev_signal* signal) const
{
ev_signal_start(loop, signal);
}
void EventLoop::register_timer_watcher(ev_timer* et) const
{
ev_timer_start(loop, et);
}
void EventLoop::unregister_timer_watcher(ev_timer* et) const
{
ev_timer_stop(loop, et);
}
void EventLoop::operator()() const
{
ev_run(loop, 0);
}
}

95
src/events/event-loop.hh Normal file

@ -0,0 +1,95 @@
/**
* \file events/event-loop.hh
* \brief EventLoop declaration.
*/
#pragma once
#include <ev.h>
#include "error/not-implemented.hh"
#include "events/events.hh"
namespace paxos
{
class EventWatcher;
/**
* \struct EventLoop
* \brief Value object wrapping libev's ev_loop.
*
* Event loop design pattern.
*/
struct EventLoop
{
/**
* \brief Create a default EventLoop based on a default ev_loop.
*/
EventLoop();
/**
* \brief Create an EventLoop from an existing ev_loop.
*
* \param loop ev_loop* custom ev_loop.
*/
explicit EventLoop(struct ev_loop*);
EventLoop(const EventLoop&) = default;
EventLoop& operator=(const EventLoop&) = default;
EventLoop(EventLoop&&) = default;
EventLoop& operator=(EventLoop&&) = default;
/**
* \brief Destroy the ev_loop.
*/
~EventLoop();
/**
* \brief Activate the given ev_io.
*
* Note that only activated watchers will receive events.
*
* \param watcher EventWatcher* to register in the loop.
*/
void register_watcher(EventWatcher*);
/**
* \brief Stop the given ev_io.
*
* \param watcher EventWatcher* to unregister in the loop.
*/
void unregister_watcher(EventWatcher*);
/**
* \brief Register SIGINT ev_signal.
*
* \param watcher ev_signal* to register in the loop.
*/
void register_sigint_watcher(ev_signal*) const;
/**
* \brief Start ev_timer.
*
* \param watcher ev_timer* to register in the loop.
*/
void register_timer_watcher(ev_timer* et) const;
/**
* \brief Stop ev_timer.
*
* \param watcher ev_timer* to unregister in the loop.
*/
void unregister_timer_watcher(ev_timer* et) const;
/**
* \brief Start waiting for events.
*/
void operator()() const;
/**
* \brief Libev's event loop.
*/
struct ev_loop* loop;
};
} // namespace http

20
src/events/events.cc Normal file

@ -0,0 +1,20 @@
#include "events/events.hh"
#include "events/register.hh"
namespace paxos
{
EventWatcher::EventWatcher(int fd, int flags)
{
ev_io_init(&watcher_, EventWatcher::event_callback, fd, flags);
watcher_.data = reinterpret_cast<void*>(this);
}
void EventWatcher::event_callback(struct ev_loop*, ev_io* w, int)
{
auto ew = reinterpret_cast<EventWatcher*>(w->data);
auto shared_ew = event_register.at(ew).value();
(*shared_ew)();
}
} // namespace http

75
src/events/events.hh Normal file

@ -0,0 +1,75 @@
/**
* \file events/events.hh
* \brief EvenWatcher declaration.
*/
#pragma once
#include <ev.h>
#include <memory>
#include <iostream>
#include "connection/connection.hh"
namespace paxos
{
/**
* \class EventWatcher
* \brief Abstract class representing event watchers.
*
* Its subclasses will act as functors and implement most of the server's
* workflow. Every potentially blocking operation should be represented by
* an EventWatcher.
*/
class EventWatcher
{
public:
/**
* \brief Initialize EventWatcher's ev_io.
*
* Initialize the ev_io held by the EventWatcher and store the
* EventWatcher's address in it.
*
* \param fd int socket fd.
* \param flags int initialization flags.
*/
EventWatcher(int fd, int flags);
EventWatcher(const EventWatcher&) = delete;
EventWatcher& operator=(const EventWatcher&) = delete;
EventWatcher(EventWatcher&&) = delete;
EventWatcher& operator=(EventWatcher&&) = delete;
virtual ~EventWatcher() = default;
/**
* \brief Virtual pure functor
*
* Each EventWatcher deriving from this class will need to implement
* its own workflow by overriding this functor.
*/
virtual void operator()() = 0;
ev_io& watcher_get() noexcept
{
return watcher_;
}
protected:
/**
* \brief Callback for event handling.
*
* Each time an event occurs in the loop, this callback will be called
* with its associated ev_io. This method is called by libev and uses
* dynamic dispatch to run the correct overload of the operator().
*
* \param watcher ev_io* which received an event.
*/
static void event_callback(struct ev_loop* loop, ev_io* w, int revents);
/**
* \brief Libev's io watcher.
*/
ev_io watcher_;
};
} // namespace http

41
src/events/listener.cc Normal file

@ -0,0 +1,41 @@
#include "events/listener.hh"
#include "events/register.hh"
#include "events/receive.hh"
#include "misc/socket.hh"
#include "misc/logger.hh"
namespace paxos
{
ListenerEW::ListenerEW(shared_socket socket) :
EventWatcher(socket->fd_get()->fd_, EV_READ)
{
/*struct sockaddr_in addr;
int addrlen = sizeof(addr);
sys::getsockname(socket->fd_get()->fd_, &addr, &addrlen);*/
sock_ = socket;
//port_ = addr.sin_port;
}
void ListenerEW::operator()()
{
/* accept client */
try
{
auto client_socket = sock_->accept(nullptr, nullptr);
client_socket->fcntl_set_O_NONBLOCK();
std::cout << "Listener: accepting client ... on " << getpid() << "\n";
/* Register client_socket */
auto e = event_register.register_event<RecvEW>(client_socket);
}
catch (const std::system_error& e)
{
std::cerr << "Listener: Could not accept client\n";
return;
}
}
}

39
src/events/listener.hh Normal file

@ -0,0 +1,39 @@
/**
* \file events/listener.hh
* \brief ListenerEW declaration.
*/
#pragma once
#include "events/events.hh"
#include "socket/socket.hh"
namespace paxos
{
/**
* \class ListenerEW
* \brief Workflow for listener socket.
*/
class ListenerEW : public EventWatcher
{
public:
/**
* \brief Create a ListenerEW from a listener socket.
*/
explicit ListenerEW(shared_socket socket);
/**
* \brief Start accepting connections on listener socket.
*/
void operator()() final;
private:
/**
* \brief Listener socket.
*/
shared_socket sock_;
/**
* \brief Port on which the socket is listening.
*/
uint16_t port_;
};
} // namespace http

34
src/events/receive.cc Normal file

@ -0,0 +1,34 @@
#include "error/parsing-error.hh"
#include "error/connection-closed.hh"
#include "events/receive.hh"
#include "events/register.hh"
#include "events/send.hh"
#include "misc/addrinfo/addrinfo.hh"
namespace paxos
{
RecvEW::RecvEW(int socket)
: EventWatcher(socket, EV_READ)
{
}
RecvEW::RecvEW(shared_socket socket)
: RecvEW(socket->fd_get()->fd_)
{
connection_ = std::make_shared<Connection>(socket);
}
RecvEW::RecvEW(shared_connection connection)
: RecvEW(connection->get_socket_fd()->fd_)
{
connection_ = connection;
}
void RecvEW::operator()()
{
}
}

29
src/events/receive.hh Normal file

@ -0,0 +1,29 @@
#pragma once
#include <memory>
#include <string>
#include "events/events.hh"
namespace paxos
{
class RecvEW : public EventWatcher
{
public:
/**
* \brief Create a RecvEW from a listener socket.
*/
explicit RecvEW(shared_socket socket);
explicit RecvEW(shared_connection connection);
/**
* \brief Read on socket.
*/
void operator()() final;
shared_connection& get_connection();
private:
explicit RecvEW(int socket);
shared_connection connection_;
};
}

20
src/events/register.cc Normal file

@ -0,0 +1,20 @@
#include "register.hh"
namespace paxos
{
std::optional<std::shared_ptr<EventWatcher>>
EventWatcherRegistry::at(EventWatcher* event)
{
return events_[event];
}
bool EventWatcherRegistry::unregister_ew(EventWatcher *ew)
{
if (events_.find(ew) == events_.end())
return false;
loop_.unregister_watcher(ew);
events_.erase(ew);
return true;
}
}

98
src/events/register.hh Normal file

@ -0,0 +1,98 @@
/**
* \file events/register.hh
* \brief EventWatcherRegistry declaration.
*/
#pragma once
#include <ev.h>
#include <memory>
#include <optional>
#include <unordered_map>
#include "events/event-loop.hh"
#include "events/events.hh"
namespace paxos
{
/**
* \class EventWatcherRegistry
* \brief Registry containing an ev_loop and its ev_io.
*
* This class is used to instantiate a global registry inside of each
* instance of the server. Each registry will contain an EventLoop and
* its associated EventWatchers.
* An ev_io stores a raw pointer that needs to be immutable. To ensure
* that, we allocate EventWatcher using smart pointers. The registry
* holds a map to translate raw pointer to the smart one.
*/
class EventWatcherRegistry
{
public:
EventWatcherRegistry() = default;
EventWatcherRegistry(const EventWatcherRegistry&) = delete;
EventWatcherRegistry& operator=(const EventWatcherRegistry&) = delete;
EventWatcherRegistry(EventWatcherRegistry&&) = delete;
EventWatcherRegistry& operator=(EventWatcherRegistry&&) = delete;
~EventWatcherRegistry() = default;
/**
* \brief Register a given event watcher.
*
* Associate the address stored in the ev_io data with a new
* EventWatcher instance inside of the registry's events_ map to
* operate on the ev_io as an EventWatcher object. Then, register this
* ev_io inside of the loop to start receiving its events.
*
* \param args Args variadic number of arguments of unknown types used
* to construct a specific EventWatcher with the proper parameters.
* \return A shared pointer on the constructed EventWatcher object.
*/
template <typename EventWatcher, typename... Args>
std::shared_ptr<EventWatcher> register_event(Args&&...);
/**
* \brief Unregister a given event watcher.
*
* \param watcher EventWatcher* to unregister.
* \return If the operation was successful.
*/
bool unregister_ew(EventWatcher*);
/**
* \brief Access EventWatcher in the map.
*
* \param watcher EventWatcher* key to the events_ map.
* \return The found EventWatcher otherwise std::nullopt.
*/
std::optional<std::shared_ptr<EventWatcher>> at(EventWatcher*);
EventLoop& get_event_loop()
{
return loop_;
}
private:
/**
* \brief Map used to store EventWatchers
*
* This map is used to associate an ev_io to its registered
* EventWatcher instance.
*/
std::unordered_map<EventWatcher*, std::shared_ptr<EventWatcher>>
events_;
/**
* \brief Registry's event loop.
*/
EventLoop loop_;
};
/**
* \brief Service object.
*/
extern EventWatcherRegistry event_register;
} // namespace http
#include "events/register.hxx"

18
src/events/register.hxx Normal file

@ -0,0 +1,18 @@
#pragma once
#include "events/register.hh"
namespace paxos
{
template <typename EventWatcher, typename... Args>
std::shared_ptr<EventWatcher>
EventWatcherRegistry::register_event(Args&&... args)
{
auto ew = std::make_shared<EventWatcher>(std::forward<Args>(args)...);
events_[ew.get()] = ew;
loop_.register_watcher(ew.get());
return ew;
}
} // namespace http

23
src/events/send.cc Normal file

@ -0,0 +1,23 @@
#include "events/send.hh"
#include "events/receive.hh"
#include "events/register.hh"
#include "error/connection-closed.hh"
#include "error/parsing-error.hh"
namespace paxos
{
SendEW::SendEW(shared_connection connection)
: EventWatcher(connection->get_socket_fd()->fd_, EV_WRITE)
, connection_(connection)
{
}
SendEW::~SendEW()
{
}
void SendEW::operator()()
{
}
}

31
src/events/send.hh Normal file

@ -0,0 +1,31 @@
#pragma once
#include <memory>
#include "events/events.hh"
#include "connection/connection.hh"
namespace paxos
{
class SendEW : public EventWatcher
{
public:
/**
* \brief Create a RecvEW from a listener socket.
*/
explicit SendEW(shared_connection connection);
~SendEW();
SendEW(const SendEW&) = delete;
SendEW& operator=(const SendEW&) = delete;
/**
* \brief Read on socket.
*/
void operator()() final;
private:
shared_connection connection_;
};
} // namespace http

@ -3,6 +3,8 @@
#include "legislator-factory.hh" #include "legislator-factory.hh"
#include "socket/default-socket.hh" #include "socket/default-socket.hh"
#include "misc/addrinfo/addrinfo.hh" #include "misc/addrinfo/addrinfo.hh"
#include "events/register.hh"
#include "events/listener.hh"
namespace paxos namespace paxos
{ {
@ -67,6 +69,7 @@ namespace paxos
if (config.is_self) if (config.is_self)
{ {
shared_socket socket = prepare_socket(config); shared_socket socket = prepare_socket(config);
event_register.register_event<ListenerEW>(socket);
} }
return shared; return shared;
} }

@ -2,7 +2,9 @@
#include "config/config.hh" #include "config/config.hh"
#include "legislator/legislator-factory.hh" #include "legislator/legislator-factory.hh"
#include "events/register.hh"
paxos::EventWatcherRegistry paxos::event_register;
int main(int, char **argv) int main(int, char **argv)
{ {

@ -39,7 +39,7 @@ namespace misc
} }
} }
size_t Buffer::receive(http::Socket& socket) size_t Buffer::receive(paxos::Socket& socket)
{ {
char buf[BUFF_SIZE]; char buf[BUFF_SIZE];
ssize_t n = socket.recv(buf, BUFF_SIZE); ssize_t n = socket.recv(buf, BUFF_SIZE);

@ -26,7 +26,7 @@ namespace misc
** \brief Call the socket recv method and append to the buffer ** \brief Call the socket recv method and append to the buffer
** \param socket The default socket ** \param socket The default socket
*/ */
size_t receive(http::Socket& socket); size_t receive(paxos::Socket& socket);
/** /**
** \brief Pop the string in front of data_ and returns it. ** \brief Pop the string in front of data_ and returns it.