Fixed FM demod and http server.

master
Hannes Matuschek 11 years ago
parent 589f2e5d7c
commit 7cfba39a23

@ -3,6 +3,6 @@
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR}) INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR})
link_resources(sdr_cmd_resources shared/index.html) link_resources(sdr_cmd_resources shared/index.html)
add_executable(sdr_cmd main.cc) add_executable(sdr_cmd main.cc aprsapplication.cc)
add_dependencies(sdr_cmd sdr_cmd_resources) add_dependencies(sdr_cmd sdr_cmd_resources)
target_link_libraries(sdr_cmd ${LIBS} libsdr ) target_link_libraries(sdr_cmd ${LIBS} libsdr )

@ -0,0 +1,87 @@
#include "aprsapplication.hh"
// This file will be generated at build time
// and contains the static content served.
#include "sdr_cmd_resources.hh"
using namespace sdr;
APRSApplication::APRSApplication(http::Server &server)
: APRS(), _server(server), _messages()
{
// Register callbacks
server.addStatic("/", std::string(index_html, index_html_size), "text/html");
server.addJSON("/spots", this, &APRSApplication::spots);
server.addHandler("/update", this, &APRSApplication::update);
}
APRSApplication::~APRSApplication() {
// pass...
}
bool
APRSApplication::spots(const http::JSON &request, http::JSON &response) {
std::list<http::JSON> msg_list;
for (std::list<Message>::iterator msg = _messages.begin(); msg != _messages.end(); msg++) {
std::map<std::string, http::JSON> message;
message["call"] = http::JSON(msg->from().call());
if (msg->hasLocation()) {
message["lat"] = http::JSON(msg->latitude());
message["lon"] = http::JSON(msg->longitude());
}
time_t time = msg->time();
message["time"] = http::JSON(ctime(&time));
msg_list.push_back(message);
}
response = http::JSON(msg_list);
return true;
}
void
APRSApplication::handleAPRSMessage(const Message &message) {
_messages.push_back(message);
// Serialize JSON message
std::string json_text;
if (_clients.size()) {
std::map<std::string, http::JSON> msg;
msg["call"] = http::JSON(message.from().call());
if (message.hasLocation()) {
msg["lat"] = http::JSON(message.latitude());
msg["lon"] = http::JSON(message.longitude());
}
time_t time = message.time();
msg["time"] = http::JSON(ctime(&time));
http::JSON(msg).serialize(json_text);
}
// a list collecting the closed
// signal all clients connected
std::list<http::Connection>::iterator client = _clients.begin();
while (client != _clients.end()) {
if (client->isClosed()) {
// remove client from list
client = _clients.erase(client);
} else {
// send event
client->send("data: ");
client->send(json_text);
client->send("\n\n");
}
}
}
void
APRSApplication::update(const http::Request &request, http::Response &response) {
// This call back implements a server side event stream, means the response will
// be blocked until the connection is closed
response.setHeader("Content-Type", "text/event-stream");
response.setHeader("Cache-Control", "no-cache");
response.setStatus(http::Response::STATUS_OK);
response.sendHeaders();
// Signal connection thread to exit without closing the connection
response.connection().setProtocolUpgrade();
// Store connection
_clients.push_back(response.connection());
}

@ -0,0 +1,29 @@
#ifndef __SDR_APRS_APRSAPPLICATION_HH__
#define __SDR_APRS_APRSAPPLICATION_HH__
#include "http.hh"
#include "aprs.hh"
namespace sdr {
class APRSApplication: public APRS
{
public:
APRSApplication(http::Server &server);
~APRSApplication();
bool spots(const http::JSON &request, http::JSON &response);
void update(const http::Request &request, http::Response &response);
void handleAPRSMessage(const Message &message);
protected:
http::Server &_server;
std::list<Message> _messages;
std::list<http::Connection> _clients;
};
}
#endif // APRSAPPLICATION_HH

@ -1,36 +1,26 @@
#include <stdio.h> #include <stdio.h>
#include "queue.hh" #include "queue.hh"
#include "http.hh"
#include "logger.hh" #include "logger.hh"
#include <iostream> #include <iostream>
#include <csignal> #include <csignal>
#include "sdr_cmd_resources.hh"
#include "aprsapplication.hh"
using namespace sdr; using namespace sdr;
class Application
{
public:
Application() {}
bool echo(const http::JSON &request, http::JSON &response) {
response = request;
return true;
}
};
static http::Server *server = 0; static http::Server *server = 0;
static void __sigint_handler(int signo) { static void __sigint_handler(int signo) {
server->stop(true); if (server) { server->stop(true); }
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
Application app;
server = new http::Server(8080); server = new http::Server(8080);
APRSApplication app(*server);
// Install log handler // Install log handler
sdr::Logger::get().addHandler( sdr::Logger::get().addHandler(
@ -39,9 +29,6 @@ int main(int argc, char *argv[]) {
// Register signal handler: // Register signal handler:
signal(SIGINT, __sigint_handler); signal(SIGINT, __sigint_handler);
// Register callbacks
server->addStatic("/", std::string(index_html, index_html_size), "text/html");
server->addJSON("/echo", &app, &Application::echo);
// start server // start server
server->start(true); server->start(true);

@ -13,50 +13,53 @@
<script src="https://maps.googleapis.com/maps/api/js?v=3.exp"></script> <script src="https://maps.googleapis.com/maps/api/js?v=3.exp"></script>
<script> <script>
var map; var map = null;
var connections = {}; var spots = {};
var qth = null; var source = null;
function onSetQTH (lon, lat) { function onAddSpot (callsign, lon, lat) {
// Center map at QTH
map.setCenter(new google.maps.LatLng(lat, lon));
// Add or update the marker at QTH
if (null == qth) {
qth = new google.maps.Marker({
position: new google.maps.LatLng(lat, lon),
title: "QTH", map: map});
} else {
qth.setPosition(new google.maps.LatLng(lat, lon));
}
}
function onAddConnection (callsign, lon, lat, snr) {
// Check if connection exists // Check if connection exists
if (callsign in connections) { if (callsign in spots) {
// If connection exists, update SNR // If connection exists, update SNR
connections[callsign].line.setOptions(options={ connections[spots].marker.setPosition(new google.maps.LatLng(lat, lon));
clickable: false, draggable: false, geodesic: true,
path: [qth.getPosition(), new google.maps.LatLng(lat, lon)],
strokeColor: '#0000FF', strokeOpacity: (1.0 + Math.min(0, snr/40)), strokeWeight: 2 } );
connections[callsign].marker.setPosition(new google.maps.LatLng(lat, lon));
} else { } else {
// otherwise, add marker at location with label callsign and line from QTH to location // otherwise, add marker at location with label callsign
var marker = new google.maps.Marker({ var marker = new google.maps.Marker({
position: new google.maps.LatLng(lat, lon), title: callsign, map: map}); position: new google.maps.LatLng(lat, lon), title: callsign, map: map});
var line = new google.maps.Polyline({ spots[callsign] = { marker: marker };
clickable: false, draggable: false, geodesic: true,
path: [qth.getPosition(), new google.maps.LatLng(lat, lon)],
strokeColor: '#0000FF', strokeOpacity: (1.0 + Math.min(0, snr/40)), strokeWeight: 2 });
line.setMap(map);
connections[callsign] = { marker: marker, line: line };
} }
} }
function updateHandler(e) {
var msg = JSON.parse(e.data);
console.log("RX: " + msg)
onAddSpot(msg.call, msg.lon, msg.lat);
}
function initialize() { function initialize() {
// Create Map object // Create Map object
map = new google.maps.Map( map = new google.maps.Map(
document.getElementById('map-canvas'), document.getElementById('map-canvas'),
{zoom:2, center: new google.maps.LatLng(0,0), streetViewControl:false}); {zoom:2, center: new google.maps.LatLng(0,0), streetViewControl:false});
// Request all spots
var xmlhttp = new XMLHttpRequest();
var url = "spots";
xmlhttp.onreadystatechange = function() {
if (xmlhttp.readyState != 4 && xmlhttp.status != 200) { return; }
var spots = JSON.parse(xmlhttp.responseText);
for (spot in spots) { onAddSpot(spot); }
}
xmlhttp.open("GET", url, true);
xmlhttp.send();
// Now, connect to event source for updates
if (!!window.EventSource) {
source = new window.EventSource("update");
source.addEventListener('message', updateHandler);
} else {
alert("Your browser does not support the EventSource -> no live update.")
}
} }
google.maps.event.addDomListener(window, 'load', initialize); google.maps.event.addDomListener(window, 'load', initialize);

@ -149,6 +149,14 @@ APRS::Message::Message(const AX25::Message &msg)
} }
} }
APRS::Message::Message(const Message &msg)
: AX25::Message(msg), _hasLocation(msg._hasLocation), _latitude(msg._latitude),
_longitude(msg._longitude), _symbol(msg._symbol), _hasTime(msg._hasTime),
_time(msg._hasTime)
{
// pass...
}
bool bool
APRS::Message::_readLocation(size_t &offset) { APRS::Message::_readLocation(size_t &offset) {

@ -241,26 +241,12 @@ protected:
/** The actual demodulation. */ /** The actual demodulation. */
void _process(const Buffer< std::complex<iScalar> > &in, const Buffer<oScalar> &out) void _process(const Buffer< std::complex<iScalar> > &in, const Buffer<oScalar> &out)
{ {
// calc first value
SScalar a = (SScalar(in[0].real())*SScalar(_last_value.real()))/2
+ (SScalar(in[0].imag())*SScalar(_last_value.imag()))/2;
SScalar b = (SScalar(in[0].imag())*SScalar(_last_value.real()))/2
- (SScalar(in[0].real())*SScalar(_last_value.imag()))/2;
a >>= Traits<iScalar>::shift; b >>= Traits<iScalar>::shift;
// update last value
_last_value = in[0];
// calc output (prob. overwriting the last value)
out[0] = fast_atan2<iScalar, oScalar>(a, b);
// Calc remaining values // Calc remaining values
for (size_t i=1; i<in.size(); i++) { for (size_t i=1; i<in.size(); i++) {
a = (SScalar(in[i].real())*SScalar(_last_value.real()))/2 oScalar phi = fast_atan2<iScalar, oScalar>(in[i].real(), in[i].imag())/2;
+ (SScalar(in[i].imag())*SScalar(_last_value.imag()))/2; // dphi
b = (SScalar(in[i].imag())*SScalar(_last_value.real()))/2 out[i] = (_last_value - phi);
- (SScalar(in[i].real())*SScalar(_last_value.imag()))/2; _last_value = phi;
a >>= Traits<iScalar>::shift; b >>= Traits<iScalar>::shift;
_last_value = in[i];
out[i] = fast_atan2<iScalar,oScalar>(a, b);
} }
// propergate result // propergate result
@ -271,8 +257,8 @@ protected:
protected: protected:
/** Output rescaling. */ /** Output rescaling. */
int _shift; int _shift;
/** The last input value. */ /** The last angle. */
std::complex<iScalar> _last_value; oScalar _last_value;
/** If true, in-place demodulation is poissible. */ /** If true, in-place demodulation is poissible. */
bool _can_overwrite; bool _can_overwrite;
/** The output buffer, unused if demodulation is performed in-place. */ /** The output buffer, unused if demodulation is performed in-place. */

@ -98,20 +98,13 @@ inline http::Version to_version(const std::string &version) {
* Implementation of Server * Implementation of Server
* ********************************************************************************************* */ * ********************************************************************************************* */
Server::Server(uint port) Server::Server(uint port)
: _port(port), _socket(-1) : _port(port), _socket(-1), _queue()
{ {
// pass... pthread_mutex_init(&_queue_lock, 0);
} }
Server::~Server() { Server::~Server() {
_is_running = false; _is_running = false;
// Close all connections
std::set<Connection *>::iterator con = _connections.begin();
for (; con != _connections.end(); con++) {
// Wait for the connection to close
(*con)->close(true);
delete *con;
}
// Free all handler // Free all handler
std::list<Handler *>::iterator item = _handler.begin(); std::list<Handler *>::iterator item = _handler.begin();
for (; item != _handler.end(); item++) { delete *item; } for (; item != _handler.end(); item++) { delete *item; }
@ -157,49 +150,70 @@ Server::start(bool wait) {
void void
Server::stop(bool wait) { Server::stop(bool wait) {
_is_running = false; _is_running = false;
// Close all open connections
std::set<Connection *>::iterator con = _connections.begin();
for (; con != _connections.end(); con++) {
(*con)->close(wait);
}
// Close the socket we listen on // Close the socket we listen on
::close(_socket); _socket = -1; ::close(_socket); _socket = -1;
// wait for server to join
if (wait) { this->wait(); }
} }
void void
Server::wait() { Server::wait() {
void *ret=0; void *ret=0;
pthread_join(_thread, &ret); pthread_join(_thread, &ret);
// wait for all handlers to join
while (_threads.size()) {
void *ret = 0;
pthread_join(*_threads.begin(), &ret);
}
} }
void * void *
Server::_listen_main(void *ctx) { Server::_listen_main(void *ctx)
{
// Get server instance
Server *self = (Server *)ctx; Server *self = (Server *)ctx;
while (self->_is_running) { // Whil server is running
while (self->_is_running)
{
// Wait for incomming connections
listen(self->_socket, 5); listen(self->_socket, 5);
// Create socket to client
struct sockaddr_in cli_addr; struct sockaddr_in cli_addr;
socklen_t clilen = sizeof(cli_addr); socklen_t clilen = sizeof(cli_addr);
int socket = accept(self->_socket, (struct sockaddr *) &cli_addr, &clilen); int socket = accept(self->_socket, (struct sockaddr *) &cli_addr, &clilen);
if (socket < 0) { continue; } if (socket < 0) { continue; }
try { self->_connections.insert(new Connection(self, socket)); }
catch (...) { } // Construct connection object from socket
// Free closed connections try {
std::list<Connection *> closed_connections; pthread_mutex_lock(&self->_queue_lock);
std::set<Connection *>::iterator item = self->_connections.begin(); self->_queue.push_back(Connection(self, socket));
for (; item != self->_connections.end(); item++) { pthread_t thread;
if ((*item)->isClosed()) { closed_connections.push_back(*item); } pthread_create(&thread, 0, &Server::_connection_main, self);
} self->_threads.insert(thread);
std::list<Connection *>::iterator citem = closed_connections.begin(); pthread_mutex_unlock(&self->_queue_lock);
for (; citem != closed_connections.end(); citem++) { } catch (...) {
self->_connections.erase(*citem); pthread_mutex_unlock(&self->_queue_lock);
delete *citem;
} }
} }
return 0; return 0;
} }
void *
Server::_connection_main(void *ctx) {
Server *self = (Server *)ctx;
pthread_mutex_lock(&self->_queue_lock);
Connection con = self->_queue.front(); self->_queue.pop_front();
pthread_mutex_unlock(&self->_queue_lock);
con.main();
// Remove thread from list of running threads
self->_threads.erase(pthread_self());
return 0;
}
void void
Server::dispatch(const Request &request, Response &response) { Server::dispatch(const Request &request, Response &response)
{
LogMessage msg(LOG_DEBUG); LogMessage msg(LOG_DEBUG);
msg << "httpd: "; msg << "httpd: ";
switch (request.method()) { switch (request.method()) {
@ -230,70 +244,120 @@ Server::addHandler(Handler *handler) {
/* ********************************************************************************************* * /* ********************************************************************************************* *
* Implementation of HTTPD::Connection * Implementation of http::Connection & http::ConnectionObj
* ********************************************************************************************* */ * ********************************************************************************************* */
Connection::Connection(Server *server, int socket) ConnectionObj::ConnectionObj(Server *server, int cli_socket)
: _server(server), _socket(socket) : server(server), socket(cli_socket), refcount(1)
{ {
// Start new thread to parse requests // pass...
if (pthread_create(&_thread, 0, &Connection::_main, (void *)this)) {
::close(_socket); _socket = -1;
RuntimeError err;
err << "httpd: Can not create thread for connection.";
throw err;
} }
ConnectionObj::~ConnectionObj() {
::close(socket); socket=-1;
}
ConnectionObj *
ConnectionObj::ref() {
refcount++; return this;
}
void
ConnectionObj::unref() {
refcount--;
if (0 == refcount) {
delete this;
}
}
Connection::Connection()
: _object(0)
{
// pass...
}
Connection::Connection(Server *server, int socket)
: _object(new ConnectionObj(server, socket))
{
// pass...
}
Connection::Connection(const Connection &other)
: _object(other._object)
{
if (_object) { _object->ref(); }
} }
Connection::~Connection() { Connection::~Connection() {
// Close the socket // Close the socket
this->close(false); if (_object) { _object->unref(); }
} }
Connection &
Connection::operator =(const Connection &other) {
if (_object) { _object->unref(); }
_object = other._object;
if (_object) { _object->ref(); }
return *this;
}
void void
Connection::close(bool wait) { Connection::close(bool wait) {
if (-1 != _socket) { if (0 == _object) { return; }
int socket = _socket; _socket = -1; if (-1 != _object->socket) {
int socket = _object->socket; _object->socket = -1;
LogMessage msg(LOG_DEBUG); LogMessage msg(LOG_DEBUG);
msg << "httpd: Close connection " << socket << "."; msg << "httpd: Close connection " << socket << ".";
Logger::get().log(msg); Logger::get().log(msg);
::close(socket); ::close(socket);
} }
if (wait) {
// Wait for the thread to exit.
void *ret = 0;
pthread_join(_thread, &ret);
}
} }
bool bool
Connection::isClosed() const { Connection::isClosed() const {
return (-1 == _socket); if (0 == _object) { return true; }
return (-1 == _object->socket);
} }
void * bool
Connection::_main(void *ctx) Connection::send(const std::string &data) const {
const char *ptr = data.c_str();
size_t count = data.size();
while (count) {
int c = this->write(ptr, count);
if (c < 0) { return false; }
count -= c; ptr += c;
}
return true;
}
void
Connection::main()
{ {
Connection *self = (Connection *)ctx;
// While socket is open
int error = 0; socklen_t errorlen = sizeof(error); int error = 0; socklen_t errorlen = sizeof(error);
while (0 == getsockopt(self->_socket, SOL_SOCKET, SO_ERROR, &error, &errorlen)) { // While socket is open
Request request(self->_socket); while (0 == getsockopt(_object->socket, SOL_SOCKET, SO_ERROR, &error, &errorlen)) {
Response response(self->_socket); // Contstruct request & reponse instances
// Parse request Request request(*this);
Response response(*this);
// try to parse request
if (! request.parse()) { if (! request.parse()) {
// On parser error or no keep-alive -> exit // On parser error close connection -> exit
self->close(); return 0; this->close(false); return;
} }
// on success -> dispatch request by server // on success -> dispatch request by server
self->_server->dispatch(request, response); _object->server->dispatch(request, response);
// If the connection is kept alive -> continue // on protocol update
if ((! request.isKeepAlive()) || response.closeConnection()) { if (this->protocolUpgrade()) { return; }
// Signal server to close connection // If the connection is keep-alive -> continue
self->close(self); return 0; if (! request.isKeepAlive()) {
// close connection -> exit
this->close(false); return;
} }
} }
// Signal server to close connection // Close connection
self->close(self); return 0; this->close(false);
} }
@ -449,8 +513,8 @@ typedef enum {
} HttpRequestParserState; } HttpRequestParserState;
Request::Request(int socket) Request::Request(const Connection &connection)
: _socket(socket), _method(HTTP_UNKNOWN) : _connection(connection), _method(HTTP_UNKNOWN)
{ {
// pass... // pass...
} }
@ -463,7 +527,7 @@ Request::parse() {
HttpRequestParserState state = READ_METHOD; HttpRequestParserState state = READ_METHOD;
// while getting a char from stream // while getting a char from stream
while (::read(_socket, &c, 1)) { while (_connection.read(&c, 1)) {
switch (state) { switch (state) {
case READ_METHOD: case READ_METHOD:
if (is_space(c)) { if (is_space(c)) {
@ -626,7 +690,7 @@ Request::readBody(std::string &body) const {
size_t N = contentLength(); body.reserve(N); size_t N = contentLength(); body.reserve(N);
char buffer[65536]; char buffer[65536];
while (N>0) { while (N>0) {
int res = ::read(_socket, buffer, std::min(N, size_t(65536))); int res = _connection.read(buffer, std::min(N, size_t(65536)));
if (res>=0) { body.append(buffer, size_t(res)); N -= res; } if (res>=0) { body.append(buffer, size_t(res)); N -= res; }
else { return false; } else { return false; }
} }
@ -637,8 +701,8 @@ Request::readBody(std::string &body) const {
/* ********************************************************************************************* * /* ********************************************************************************************* *
* Implementation of HTTPD::Response * Implementation of HTTPD::Response
* ********************************************************************************************* */ * ********************************************************************************************* */
Response::Response(int socket) Response::Response(const Connection &connection)
: _socket(socket), _status(STATUS_SERVER_ERROR), _close_connection(false) : _connection(connection), _status(STATUS_SERVER_ERROR), _close_connection(false)
{ {
// pass... // pass...
} }
@ -670,18 +734,6 @@ Response::setContentLength(size_t length) {
setHeader("Content-Length", buffer.str()); setHeader("Content-Length", buffer.str());
} }
bool
Response::send(const std::string &data) const {
const char *ptr = data.c_str();
size_t count = data.size();
while (count) {
int c = ::write(_socket, ptr, count);
if (c < 0) { return false; }
count -= c; ptr += c;
}
return true;
}
bool bool
Response::sendHeaders() const { Response::sendHeaders() const {
std::stringstream buffer; std::stringstream buffer;
@ -699,7 +751,7 @@ Response::sendHeaders() const {
buffer << item->first << ": " << item->second << "\r\n"; buffer << item->first << ": " << item->second << "\r\n";
} }
buffer << "\r\n"; buffer << "\r\n";
return send(buffer.str()); return _connection.send(buffer.str());
} }
@ -742,7 +794,7 @@ StaticHandler::handle(const Request &request, Response &response) {
} }
response.setContentLength(_text.size()); response.setContentLength(_text.size());
response.sendHeaders(); response.sendHeaders();
response.send(_text); response.connection().send(_text);
} }
@ -785,7 +837,7 @@ JSONHandler::handle(const http::Request &request, http::Response &response) {
result.serialize(result_string); result.serialize(result_string);
response.setContentLength(result_string.size()); response.setContentLength(result_string.size());
response.sendHeaders(); response.sendHeaders();
response.send(result_string); response.connection().send(result_string);
} else { } else {
response.setStatus(http::Response::STATUS_BAD_REQUEST); response.setStatus(http::Response::STATUS_BAD_REQUEST);
response.setContentLength(0); response.setContentLength(0);

@ -15,7 +15,7 @@
* *
* using namespace sdr; * using namespace sdr;
* *
* // Implements an application, a collection of methods being called from the http::Server. * // Implements an application, a collection of methods being called by the http::Server.
* class Application { * class Application {
* public: * public:
* // contstructor. * // contstructor.
@ -251,17 +251,90 @@ protected:
}; };
struct ConnectionObj
{
public:
ConnectionObj(Server *server, int cli_socket);
~ConnectionObj();
ConnectionObj *ref();
void unref();
public:
/** A weak reference to the server instance. */
Server *server;
/** The connection socket. */
int socket;
/** If @c true (i.e. set by a handler), the http parser thread will exit without closing
* the connection. This allows to "take-over" the tcp connection to the client by the request
* handler. */
bool protocol_upgrade;
/** Reference counter. */
size_t refcount;
};
/** Implements a HTTP connection to a client.
* @c ingroup http */
class Connection
{
public:
/** Empty constructor. */
Connection();
/** Constructor. */
Connection(Server *server, int socket);
/** Copy constructor. Implements reference counting. */
Connection(const Connection &other);
/** Destructor. */
~Connection();
Connection &operator=(const Connection &other);
/** Closes the connection.
* If @c wait is @c true, the method will wait until the thread listening for incomming
* request joined. */
void close(bool wait=false);
/** Returns @c true if the connection is closed. */
bool isClosed() const;
/** Sets the protocol-update flag. */
inline void setProtocolUpgrade() const { _object->protocol_upgrade = true; }
inline bool protocolUpgrade() const { return _object->protocol_upgrade; }
inline ssize_t write(const void *data, size_t n) const {
if (0 == _object) { return -1; }
return ::write(_object->socket, data, n);
}
inline ssize_t read(void *data, size_t n) const {
if (0 == _object) { return -1; }
return ::read(_object->socket, data, n);
}
bool send(const std::string &data) const;
/** Main loop for incomming requests. */
void main();
protected:
ConnectionObj *_object;
};
/** Represents a HTTP request. /** Represents a HTTP request.
* @ingroup http */ * @ingroup http */
class Request class Request
{ {
public: public:
/** Constructor. */ /** Constructor. */
Request(int socket); Request(const Connection &connection);
/** Parses the HTTP request header, returns @c true on success. */ /** Parses the HTTP request header, returns @c true on success. */
bool parse(); bool parse();
/** Return the connection to the client. */
inline const Connection &connection() const { return _connection; }
/** Returns @c true if the connection to the client is kept alive after the response /** Returns @c true if the connection to the client is kept alive after the response
* has been send. */ * has been send. */
bool isKeepAlive() const; bool isKeepAlive() const;
@ -280,17 +353,13 @@ public:
inline Method method() const { return _method; } inline Method method() const { return _method; }
/** Returns the request URL. */ /** Returns the request URL. */
inline const URL &url() const { return _url; } inline const URL &url() const { return _url; }
/** Allows to read data from the connection (i.e. the request body). */
inline int read(const char *data, size_t size) {
return ::read(_socket, (void *)data, size);
}
/** Reads the complete body (if Content-Length header is present). /** Reads the complete body (if Content-Length header is present).
* Retruns @c true on success.*/ * Retruns @c true on success.*/
bool readBody(std::string &body) const; bool readBody(std::string &body) const;
protected: protected:
/** The connection socket. */ /** The connection socket. */
int _socket; Connection _connection;
/** The request method. */ /** The request method. */
Method _method; Method _method;
/** The HTTP version. */ /** The HTTP version. */
@ -315,9 +384,11 @@ public:
} Status; } Status;
public: public:
/** Constructor. /** Constructor. */
* @param socket Specifies the socket over which the response will be send.*/ Response(const Connection &connnection);
Response(int socket);
/** Return the connection to the client. */
inline const Connection &connection() const { return _connection; }
/** Specifies the response code. */ /** Specifies the response code. */
void setStatus(Status status); void setStatus(Status status);
@ -333,19 +404,9 @@ public:
/** Sends the response code and all defined headers. */ /** Sends the response code and all defined headers. */
bool sendHeaders() const; bool sendHeaders() const;
/** Sends some data through the socket. To send the response body, call
* @c sendHeaders() first. */
bool send(const std::string &data) const;
/** Returns @c true if the connection will be closed after the response has been send.
* @c I.e. if there was an error. */
inline bool closeConnection() const { return _close_connection; }
/** Marks that the connection will be closed after the response has been send. */
inline void setCloseConnection() { _close_connection = true; }
protected: protected:
/** The socket over which the response will be send. */ /** The socket over which the response will be send. */
int _socket; Connection _connection;
/** The response code. */ /** The response code. */
Status _status; Status _status;
/** The response headers. */ /** The response headers. */
@ -355,37 +416,6 @@ protected:
}; };
/** Implements a HTTP connection to a client.
* @c ingroup http */
class Connection
{
public:
/** Constructor. */
Connection(Server *server, int socket);
/** Destructor. */
~Connection();
/** Closes the connection.
* If @c wait is @c true, the method will wait until the thread listening for incomming
* request joined. */
void close(bool wait=false);
/** Returns @c true if the connection is closed. */
bool isClosed() const;
protected:
/** Main loop for incomming requests. */
static void *_main(void *ctx);
protected:
/** A weak reference to the server instance. */
Server *_server;
/** The connection socket. */
int _socket;
/** The thread processing requests for this connection. */
pthread_t _thread;
};
/** Base class of all HTTP request handlers. /** Base class of all HTTP request handlers.
* @ingroup http */ * @ingroup http */
class Handler class Handler
@ -434,7 +464,7 @@ protected:
}; };
/** Utility function to provide a handler as a delegate. /** Utility class to provide a handler as a delegate.
* @ingroup http */ * @ingroup http */
template <class T> template <class T>
class DelegateHandler: public Handler class DelegateHandler: public Handler
@ -530,9 +560,11 @@ public:
/** Starts the server. /** Starts the server.
* If @c wait is @c true, the call to this method will bock until the server thread stops. */ * If @c wait is @c true, the call to this method will bock until the server thread stops. */
void start(bool wait=false); void start(bool wait=false);
/** Stops the server. /** Stops the server.
* If @c wait is @c true, the call will block until the server thread stopped. */ * If @c wait is @c true, the call will block until the server thread stopped. */
void stop(bool wait=false); void stop(bool wait=false);
/** Wait for the server thread to join. */ /** Wait for the server thread to join. */
void wait(); void wait();
@ -564,6 +596,8 @@ protected:
void dispatch(const Request &request, Response &response); void dispatch(const Request &request, Response &response);
/** The thread waiting for incomming connections. */ /** The thread waiting for incomming connections. */
static void *_listen_main(void *ctx); static void *_listen_main(void *ctx);
/** The thread handling connections. */
static void *_connection_main(void *ctx);
protected: protected:
/** Port to bind to. */ /** Port to bind to. */
@ -576,8 +610,12 @@ protected:
pthread_t _thread; pthread_t _thread;
/** All registered handler. */ /** All registered handler. */
std::list<Handler *> _handler; std::list<Handler *> _handler;
/** All open connections. */ /** The connection queue. */
std::set<Connection *> _connections; std::list<Connection> _queue;
/** The queue lock. */
pthread_mutex_t _queue_lock;
/** The set of handler threads. */
std::set<pthread_t> _threads;
/* Allow Connection to access dispatch(). */ /* Allow Connection to access dispatch(). */
friend class Connection; friend class Connection;

Loading…
Cancel
Save