EHS Embedded HTTP Server  1.5.1.173
samples/wsendpoint.h
00001 /*
00002  * This file has been partially derived from the WebSockets++ project at
00003  * https://github.com/zaphoyd/websocketpp which is licensed under a BSD-license.
00004  */
00005 
00006 #ifndef WSENDPOINT_H
00007 #define WSENDPOINT_H
00008 
00009 #include <vector>
00010 #include <sstream>
00011 #include <iostream>
00012 #include <string>
00013 #include <boost/thread.hpp>
00014 #include "wsframe.h"
00015 
00016 #ifndef HAVE_BOOST_LOCK_GUARD
00017 #include <pthread.h>
00021 class MutexHelper {
00022     public:
00029         MutexHelper(pthread_mutex_t *mutex, bool locknow = true) :
00030             m_pMutex(mutex), m_bLocked(false)
00031         {
00032             if (locknow)
00033                 Lock();
00034         }
00035 
00039         ~MutexHelper()
00040         {
00041             if (m_bLocked)
00042                 pthread_mutex_unlock(m_pMutex);
00043         }
00044 
00048         void Lock()
00049         {
00050             pthread_mutex_lock(m_pMutex);
00051             m_bLocked = true;
00052         }
00053 
00057         void Unlock()
00058         {
00059             m_bLocked = false;
00060             pthread_mutex_unlock(m_pMutex);
00061         }
00062     private:
00063         pthread_mutex_t *m_pMutex;
00064         bool m_bLocked;
00065 
00066         MutexHelper(const MutexHelper &);
00067         MutexHelper & operator=(const MutexHelper &);
00068 };
00069 #endif
00070 
00071 namespace wspp {
00072     class wsendpoint;
00073 
00083     class wshandler {
00084         public:
00089             void send_text(const std::string & data) {
00090                 send(data, frame::opcode::TEXT);
00091             }
00092 
00097             void send_binary(const std::string & data) {
00098                 send(data, frame::opcode::BINARY);
00099             }
00100 
00102             wshandler() : m_endpoint(0) {}
00103 
00105             virtual ~wshandler() {}
00106 
00107         private:
00108             virtual void on_message(std::string header, std::string data) = 0;
00109             virtual void on_close() = 0;
00110             virtual bool on_ping(const std::string & data) = 0;
00111             virtual void on_pong(const std::string & data) = 0;
00112             virtual void do_response(const std::string & data) = 0;
00113 
00114             void send(const std::string& payload, frame::opcode::value op);
00115 
00116             // Non-copyable
00117             wshandler(const wshandler&);
00118             wshandler& operator=(const wshandler&);
00119 
00120             wsendpoint *m_endpoint;
00121             friend class wsendpoint;
00122     };
00123 
00127     class wsendpoint {
00128         private:
00129             // Non-copyable
00130             wsendpoint(const wsendpoint&);
00131             wsendpoint& operator=(const wsendpoint&);
00132 
00133         public:
00138             wsendpoint(wshandler *h)
00139                 : m_rng(simple_rng())
00140                   , m_parser(frame::parser<simple_rng>(m_rng))
00141                   , m_state(session::state::OPEN)
00142                   , m_lock()
00143                   , m_handler(h)
00144         {
00145 #ifndef HAVE_BOOST_LOCK_GUARD
00146             pthread_mutexattr_t mattr;
00147             pthread_mutexattr_init(&mattr);
00148             pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE);
00149             pthread_mutex_init(&m_lock, &mattr);
00150             pthread_mutexattr_destroy(&mattr);
00151 #endif
00152             m_handler->m_endpoint = this;
00153         }
00154 
00155 #ifndef HAVE_BOOST_LOCK_GUARD
00156             ~wsendpoint() { pthread_mutex_destroy(&m_lock); }
00157 #endif
00158 
00170             void AddRxData(std::string data)
00171             {
00172                 std::istringstream s(data);
00173                 while (m_state != session::state::CLOSED && s.rdbuf()->in_avail()) {
00174                     try {
00175                         m_parser.consume(s);
00176                         if (m_parser.ready()) {
00177                             if (m_parser.is_control()) {
00178                                 process_control();
00179                             } else {
00180                                 process_data();
00181                             }                   
00182                             m_parser.reset();
00183                         }
00184                     } catch (const tracing::wserror & e) {
00185                         if (m_parser.ready()) {
00186                             m_parser.reset();
00187                         }
00188                         switch(e.code()) {
00189                             case tracing::wserror::PROTOCOL_VIOLATION:
00190                                 send_close(close::status::PROTOCOL_ERROR,e.what());
00191                                 break;
00192                             case tracing::wserror::PAYLOAD_VIOLATION:
00193                                 send_close(close::status::INVALID_PAYLOAD,e.what());
00194                                 break;
00195                             case tracing::wserror::INTERNAL_ENDPOINT_ERROR:
00196                                 send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.what());
00197                                 break;
00198                             case tracing::wserror::SOFT_ERROR:
00199                                 continue;
00200                             case tracing::wserror::MESSAGE_TOO_BIG:
00201                                 send_close(close::status::MESSAGE_TOO_BIG,e.what());
00202                                 break;
00203                             case tracing::wserror::OUT_OF_MESSAGES:
00204                                 // we need to wait for a message to be returned by the
00205                                 // client. We exit the read loop. handle_read_frame
00206                                 // will be restarted by recycle()
00207                                 //m_read_state = WAITING;
00208                                 //m_endpoint.wait(type::shared_from_this());
00209                                 return;
00210                             default:
00211                                 // Fatal error, forcibly end connection immediately.
00212                                 std::cerr
00213                                     << "Dropping TCP due to unrecoverable exception: " << e.code()
00214                                     << " (" << e.what() << ")" << std::endl;
00215                                 shutdown();
00216                         }
00217                         break;
00218                     }
00219                 }
00220             }
00221 
00229             void send(const std::string& payload, frame::opcode::value op) {
00230                 frame::parser<simple_rng> control(m_rng);
00231                 control.set_opcode(op);
00232                 control.set_fin(true);
00233                 control.set_masked(false);
00234                 control.set_payload(payload);
00235 
00236                 std::string tmp(control.get_header_str());
00237                 tmp.append(control.get_payload_str());
00238                 m_handler->do_response(tmp);
00239             }
00240 
00241         private:
00242             void process_data() {
00243                 m_handler->on_message(m_parser.get_header_str(), m_parser.get_payload_str());
00244             }
00245 
00247 
00257             void shutdown() {
00258 #ifdef HAVE_BOOST_LOCK_GUARD
00259                 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
00260 #else
00261                 // cppcheck-suppress unusedScopedObject
00262                 MutexHelper((pthread_mutex_t *)&m_lock);
00263 #endif
00264 
00265                 if (m_state == session::state::CLOSED) {return;}
00266 
00267                 m_state = session::state::CLOSED;
00268                 m_handler->on_close();
00269             }
00270 
00283             void pong(const std::vector<unsigned char> & payload) {
00284 #ifdef HAVE_BOOST_LOCK_GUARD
00285                 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
00286 #else
00287                 // cppcheck-suppress unusedScopedObject
00288                 MutexHelper((pthread_mutex_t *)&m_lock);
00289 #endif
00290 
00291                 if (m_state != session::state::OPEN) {return;}
00292                 // if (m_detached) {return;}
00293 
00294                 // TODO: optimize control messages and handle case where 
00295                 // endpoint is out of messages
00296                 frame::parser<simple_rng> control(m_rng);
00297                 control.set_opcode(frame::opcode::PONG);
00298                 control.set_fin(true);
00299                 control.set_masked(false);
00300                 control.set_payload(payload);
00301 
00302                 std::string tmp(control.get_header_str());
00303                 tmp.append(control.get_payload_str());
00304                 m_handler->do_response(tmp);
00305             }
00306 
00308 
00319             void send_close(close::status::value code, const std::string& reason) {
00320 #ifdef HAVE_BOOST_LOCK_GUARD
00321                 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
00322 #else
00323                 // cppcheck-suppress unusedScopedObject
00324                 MutexHelper((pthread_mutex_t *)&m_lock);
00325 #endif
00326 
00327                 // if (m_detached) {return;}
00328 
00329                 if (m_state != session::state::OPEN) {
00330                     std::cerr << "Tried to disconnect a session that wasn't open" << std::endl;
00331                     return;
00332                 }
00333 
00334                 if (close::status::invalid(code)) {
00335                     std::cerr << "Tried to close a connection with invalid close code: " 
00336                         << code << std::endl;
00337                     return;
00338                 } else if (close::status::reserved(code)) {
00339                     std::cerr << "Tried to close a connection with reserved close code: " 
00340                         << code << std::endl;
00341                     return;
00342                 }
00343 
00344                 m_state = session::state::CLOSING;
00345 
00346                 frame::parser<simple_rng> control(m_rng);
00347                 control.set_opcode(frame::opcode::CLOSE);
00348                 control.set_fin(true);
00349                 control.set_masked(false);
00350                 if (code != close::status::NO_STATUS) {
00351                     const uint16_t payload = htons(code);
00352                     std::string pl(reinterpret_cast<const char*>(&payload), 2);
00353                     pl.append(reason);
00354                     control.set_payload(pl);
00355                 }
00356 
00357                 std::string tmp(control.get_header_str());
00358                 tmp.append(control.get_payload_str());
00359                 m_handler->do_response(tmp);
00360             }
00361 
00363 
00368             void send_close_ack(close::status::value remote_close_code, std::string remote_close_reason) {
00369                 close::status::value local_close_code;
00370                 std::string local_close_reason;
00371                 // echo close value unless there is a good reason not to.
00372                 if (remote_close_code == close::status::NO_STATUS) {
00373                     local_close_code = close::status::NORMAL;
00374                     local_close_reason = "";
00375                 } else if (remote_close_code == close::status::ABNORMAL_CLOSE) {
00376                     // TODO: can we possibly get here? This means send_close_ack was
00377                     //       called after a connection ended without getting a close
00378                     //       frame
00379                     throw "shouldn't be here";
00380                 } else if (close::status::invalid(remote_close_code)) {
00381                     // TODO: shouldn't be able to get here now either
00382                     local_close_code = close::status::PROTOCOL_ERROR;
00383                     local_close_reason = "Status code is invalid";
00384                 } else if (close::status::reserved(remote_close_code)) {
00385                     // TODO: shouldn't be able to get here now either
00386                     local_close_code = close::status::PROTOCOL_ERROR;
00387                     local_close_reason = "Status code is reserved";
00388                 } else {
00389                     local_close_code = remote_close_code;
00390                     local_close_reason = remote_close_reason;
00391                 }
00392 
00393                 // TODO: check whether we should cancel the current in flight write.
00394                 //       if not canceled the close message will be sent as soon as the
00395                 //       current write completes.
00396 
00397 
00398                 frame::parser<simple_rng> control(m_rng);
00399                 control.set_opcode(frame::opcode::CLOSE);
00400                 control.set_fin(true);
00401                 control.set_masked(false);
00402                 if (local_close_code != close::status::NO_STATUS) {
00403                     const uint16_t payload = htons(local_close_code);
00404                     std::string pl(reinterpret_cast<const char*>(&payload), 2);
00405                     pl.append(local_close_reason);
00406                     control.set_payload(pl);
00407                 }
00408 
00409                 std::string tmp(control.get_header_str());
00410                 tmp.append(control.get_payload_str());
00411                 m_handler->do_response(tmp);
00412                 shutdown();
00413             }
00414 
00415             void process_control() {
00416                 switch (m_parser.get_opcode()) {
00417                     case frame::opcode::PING:
00418                         if (m_handler->on_ping(m_parser.get_payload_str())) {
00419                             pong(m_parser.get_payload());
00420                         }
00421                         break;
00422                     case frame::opcode::PONG:
00423                         m_handler->on_pong(m_parser.get_payload_str());
00424                         break;
00425                     case frame::opcode::CLOSE:
00426                         // check that the codes we got over the wire are valid
00427                         if (m_state == session::state::OPEN) {
00428                             // other end is initiating
00429                             std::cerr << "sending close ack" << std::endl;
00430 
00431                             // TODO:
00432                             send_close_ack(m_parser.get_close_code(), m_parser.get_close_reason());
00433                         } else if (m_state == session::state::CLOSING) {
00434                             // ack of our close
00435                             std::cerr << "got close ack" << std::endl;
00436                             shutdown();
00437                         }
00438                         break;
00439                     default:
00440                         throw tracing::wserror("Invalid Opcode",
00441                                 tracing::wserror::PROTOCOL_VIOLATION);
00442                         break;
00443                 }
00444             }
00445 
00446         private:
00447             simple_rng m_rng;
00448             frame::parser<simple_rng> m_parser;
00449             session::state::value m_state;
00450 #ifdef HAVE_BOOST_LOCK_GUARD
00451             mutable boost::recursive_mutex m_lock;
00452 #else
00453             mutable pthread_mutex_t m_lock;
00454 #endif
00455             wshandler *m_handler;
00456     };
00457 
00458     void wshandler::send(const std::string& payload, frame::opcode::value op)
00459     {
00460         if (m_endpoint) {
00461             m_endpoint->send(payload, op);
00462         }
00463     }
00464 
00465 }
00466 
00467 #endif
 All Classes Functions Variables Enumerations