aboutsummaryrefslogtreecommitdiffstats
path: root/include
diff options
context:
space:
mode:
authoripkn <ipknhama@gmail.com>2014-08-17 05:35:21 -0400
committeripkn <ipknhama@gmail.com>2014-08-17 05:35:21 -0400
commitdaa3c820878f8e189120cf9359caf8b2359d61ca (patch)
tree2c8ea578483bc9ef16c539c76ac6f0e9b10df363 /include
parent79bbdfebfaff295f468bd9e2d46e8106693eb113 (diff)
downloadcrow-daa3c820878f8e189120cf9359caf8b2359d61ca.tar.gz
crow-daa3c820878f8e189120cf9359caf8b2359d61ca.zip
improve performance by 2x
change to io_service per CPU model
Diffstat (limited to 'include')
-rw-r--r--include/dumb_timer_queue.h12
-rw-r--r--include/http_connection.h9
-rw-r--r--include/http_server.h59
3 files changed, 55 insertions, 25 deletions
diff --git a/include/dumb_timer_queue.h b/include/dumb_timer_queue.h
index ebb8bb1..86ec7c1 100644
--- a/include/dumb_timer_queue.h
+++ b/include/dumb_timer_queue.h
@@ -29,19 +29,17 @@ namespace crow
k.first = nullptr;
if (!self)
return;
- self->mutex_.lock();
+
unsigned int index = (unsigned int)(k.second - self->step_);
if (index < self->dq_.size())
self->dq_[index].second = nullptr;
- self->mutex_.unlock();
}
key add(std::function<void()> f)
{
- mutex_.lock();
dq_.emplace_back(std::chrono::steady_clock::now(), std::move(f));
int ret = step_+dq_.size()-1;
- mutex_.unlock();
+
CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << ret ;
return {this, ret};
}
@@ -50,7 +48,7 @@ namespace crow
{
if (!io_service_)
return;
- mutex_.lock();
+
auto now = std::chrono::steady_clock::now();
while(!dq_.empty())
{
@@ -60,13 +58,12 @@ namespace crow
if (x.second)
{
CROW_LOG_DEBUG << "timer call: " << this << ' ' << step_;
- //io_service_->post(std::move(x.second));
+ // we know that timer handlers are very simple currenty; call here
x.second();
}
dq_.pop_front();
step_++;
}
- mutex_.unlock();
}
void set_io_service(boost::asio::io_service& io_service)
@@ -82,7 +79,6 @@ namespace crow
int tick{5};
boost::asio::io_service* io_service_{};
std::deque<std::pair<decltype(std::chrono::steady_clock::now()), std::function<void()>>> dq_;
- std::mutex mutex_;
int step_{};
};
}
diff --git a/include/http_connection.h b/include/http_connection.h
index 3232476..907485f 100644
--- a/include/http_connection.h
+++ b/include/http_connection.h
@@ -26,8 +26,8 @@ namespace crow
class Connection
{
public:
- Connection(tcp::socket&& socket, Handler* handler, const std::string& server_name)
- : socket_(std::move(socket)),
+ Connection(boost::asio::io_service& io_service, Handler* handler, const std::string& server_name)
+ : socket_(io_service),
handler_(handler),
parser_(this),
server_name_(server_name)
@@ -48,6 +48,11 @@ namespace crow
#endif
}
+ tcp::socket& socket()
+ {
+ return socket_;
+ }
+
void start()
{
//auto self = this->shared_from_this();
diff --git a/include/http_server.h b/include/http_server.h
index 73f62f0..6dc845a 100644
--- a/include/http_server.h
+++ b/include/http_server.h
@@ -1,5 +1,6 @@
#pragma once
+#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/asio.hpp>
#include <cstdint>
#include <atomic>
@@ -23,53 +24,80 @@ namespace crow
public:
Server(Handler* handler, uint16_t port, uint16_t concurrency = 1)
: acceptor_(io_service_, tcp::endpoint(asio::ip::address(), port)),
- socket_(io_service_),
signals_(io_service_, SIGINT, SIGTERM),
handler_(handler),
concurrency_(concurrency),
port_(port)
{
- do_accept();
}
void run()
{
+ if (concurrency_ < 0)
+ concurrency_ = 1;
+
+ for(int i = 0; i < concurrency_; i++)
+ io_service_pool_.emplace_back(new boost::asio::io_service());
+
std::vector<std::future<void>> v;
for(uint16_t i = 0; i < concurrency_; i ++)
v.push_back(
- std::async(std::launch::async, [this]{
+ std::async(std::launch::async, [this, i]{
auto& timer_queue = detail::dumb_timer_queue::get_current_dumb_timer_queue();
- timer_queue.set_io_service(io_service_);
- while(!io_service_.stopped())
- {
+ timer_queue.set_io_service(*io_service_pool_[i]);
+ boost::asio::deadline_timer timer(*io_service_pool_[i]);
+ timer.expires_from_now(boost::posix_time::seconds(1));
+ std::function<void(const boost::system::error_code& ec)> handler;
+ handler = [&](const boost::system::error_code& ec){
+ if (ec)
+ return;
timer_queue.process();
- io_service_.poll_one();
- }
+ timer.expires_from_now(boost::posix_time::seconds(1));
+ timer.async_wait(handler);
+ };
+ timer.async_wait(handler);
+ io_service_pool_[i]->run();
}));
-
CROW_LOG_INFO << server_name_ << " server is running, local port " << port_;
signals_.async_wait(
[&](const boost::system::error_code& error, int signal_number){
- io_service_.stop();
+ stop();
});
-
+
+ do_accept();
+
+ v.push_back(std::async(std::launch::async, [this]{
+ io_service_.run();
+ CROW_LOG_INFO << "Exiting.";
+ }));
}
void stop()
{
io_service_.stop();
+ for(auto& io_service:io_service_pool_)
+ io_service->stop();
}
private:
+ asio::io_service& pick_io_service()
+ {
+ // TODO load balancing
+ roundrobin_index_++;
+ if (roundrobin_index_ >= io_service_pool_.size())
+ roundrobin_index_ = 0;
+ return *io_service_pool_[roundrobin_index_];
+ }
+
void do_accept()
{
- acceptor_.async_accept(socket_,
- [this](boost::system::error_code ec)
+ auto p = new Connection<Handler>(pick_io_service(), handler_, server_name_);
+ acceptor_.async_accept(p->socket(),
+ [this, p](boost::system::error_code ec)
{
if (!ec)
{
- auto p = new Connection<Handler>(std::move(socket_), handler_, server_name_);
p->start();
}
do_accept();
@@ -78,13 +106,14 @@ namespace crow
private:
asio::io_service io_service_;
+ std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
tcp::acceptor acceptor_;
- tcp::socket socket_;
boost::asio::signal_set signals_;
Handler* handler_;
uint16_t concurrency_{1};
std::string server_name_ = "Crow/0.1";
uint16_t port_;
+ unsigned int roundrobin_index_{};
};
}