// EHS Embedded HTTP Server 1.5.1.173
00001 /* 00002 * This file has been partially derived from the WebSockets++ project at 00003 * https://github.com/zaphoyd/websocketpp which is licensed under a BSD-license. 00004 */ 00005 00006 #ifndef WSENDPOINT_H 00007 #define WSENDPOINT_H 00008 00009 #include <vector> 00010 #include <sstream> 00011 #include <iostream> 00012 #include <string> 00013 #include <boost/thread.hpp> 00014 #include "wsframe.h" 00015 00016 #ifndef HAVE_BOOST_LOCK_GUARD 00017 #include <pthread.h> 00021 class MutexHelper { 00022 public: 00029 MutexHelper(pthread_mutex_t *mutex, bool locknow = true) : 00030 m_pMutex(mutex), m_bLocked(false) 00031 { 00032 if (locknow) 00033 Lock(); 00034 } 00035 00039 ~MutexHelper() 00040 { 00041 if (m_bLocked) 00042 pthread_mutex_unlock(m_pMutex); 00043 } 00044 00048 void Lock() 00049 { 00050 pthread_mutex_lock(m_pMutex); 00051 m_bLocked = true; 00052 } 00053 00057 void Unlock() 00058 { 00059 m_bLocked = false; 00060 pthread_mutex_unlock(m_pMutex); 00061 } 00062 private: 00063 pthread_mutex_t *m_pMutex; 00064 bool m_bLocked; 00065 00066 MutexHelper(const MutexHelper &); 00067 MutexHelper & operator=(const MutexHelper &); 00068 }; 00069 #endif 00070 00071 namespace wspp { 00072 class wsendpoint; 00073 00083 class wshandler { 00084 public: 00089 void send_text(const std::string & data) { 00090 send(data, frame::opcode::TEXT); 00091 } 00092 00097 void send_binary(const std::string & data) { 00098 send(data, frame::opcode::BINARY); 00099 } 00100 00102 wshandler() : m_endpoint(0) {} 00103 00105 virtual ~wshandler() {} 00106 00107 private: 00108 virtual void on_message(std::string header, std::string data) = 0; 00109 virtual void on_close() = 0; 00110 virtual bool on_ping(const std::string & data) = 0; 00111 virtual void on_pong(const std::string & data) = 0; 00112 virtual void do_response(const std::string & data) = 0; 00113 00114 void send(const std::string& payload, frame::opcode::value op); 00115 00116 // Non-copyable 00117 wshandler(const wshandler&); 00118 
wshandler& operator=(const wshandler&); 00119 00120 wsendpoint *m_endpoint; 00121 friend class wsendpoint; 00122 }; 00123 00127 class wsendpoint { 00128 private: 00129 // Non-copyable 00130 wsendpoint(const wsendpoint&); 00131 wsendpoint& operator=(const wsendpoint&); 00132 00133 public: 00138 wsendpoint(wshandler *h) 00139 : m_rng(simple_rng()) 00140 , m_parser(frame::parser<simple_rng>(m_rng)) 00141 , m_state(session::state::OPEN) 00142 , m_lock() 00143 , m_handler(h) 00144 { 00145 #ifndef HAVE_BOOST_LOCK_GUARD 00146 pthread_mutexattr_t mattr; 00147 pthread_mutexattr_init(&mattr); 00148 pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE); 00149 pthread_mutex_init(&m_lock, &mattr); 00150 pthread_mutexattr_destroy(&mattr); 00151 #endif 00152 m_handler->m_endpoint = this; 00153 } 00154 00155 #ifndef HAVE_BOOST_LOCK_GUARD 00156 ~wsendpoint() { pthread_mutex_destroy(&m_lock); } 00157 #endif 00158 00170 void AddRxData(std::string data) 00171 { 00172 std::istringstream s(data); 00173 while (m_state != session::state::CLOSED && s.rdbuf()->in_avail()) { 00174 try { 00175 m_parser.consume(s); 00176 if (m_parser.ready()) { 00177 if (m_parser.is_control()) { 00178 process_control(); 00179 } else { 00180 process_data(); 00181 } 00182 m_parser.reset(); 00183 } 00184 } catch (const tracing::wserror & e) { 00185 if (m_parser.ready()) { 00186 m_parser.reset(); 00187 } 00188 switch(e.code()) { 00189 case tracing::wserror::PROTOCOL_VIOLATION: 00190 send_close(close::status::PROTOCOL_ERROR,e.what()); 00191 break; 00192 case tracing::wserror::PAYLOAD_VIOLATION: 00193 send_close(close::status::INVALID_PAYLOAD,e.what()); 00194 break; 00195 case tracing::wserror::INTERNAL_ENDPOINT_ERROR: 00196 send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.what()); 00197 break; 00198 case tracing::wserror::SOFT_ERROR: 00199 continue; 00200 case tracing::wserror::MESSAGE_TOO_BIG: 00201 send_close(close::status::MESSAGE_TOO_BIG,e.what()); 00202 break; 00203 case 
tracing::wserror::OUT_OF_MESSAGES: 00204 // we need to wait for a message to be returned by the 00205 // client. We exit the read loop. handle_read_frame 00206 // will be restarted by recycle() 00207 //m_read_state = WAITING; 00208 //m_endpoint.wait(type::shared_from_this()); 00209 return; 00210 default: 00211 // Fatal error, forcibly end connection immediately. 00212 std::cerr 00213 << "Dropping TCP due to unrecoverable exception: " << e.code() 00214 << " (" << e.what() << ")" << std::endl; 00215 shutdown(); 00216 } 00217 break; 00218 } 00219 } 00220 } 00221 00229 void send(const std::string& payload, frame::opcode::value op) { 00230 frame::parser<simple_rng> control(m_rng); 00231 control.set_opcode(op); 00232 control.set_fin(true); 00233 control.set_masked(false); 00234 control.set_payload(payload); 00235 00236 std::string tmp(control.get_header_str()); 00237 tmp.append(control.get_payload_str()); 00238 m_handler->do_response(tmp); 00239 } 00240 00241 private: 00242 void process_data() { 00243 m_handler->on_message(m_parser.get_header_str(), m_parser.get_payload_str()); 00244 } 00245 00247 00257 void shutdown() { 00258 #ifdef HAVE_BOOST_LOCK_GUARD 00259 boost::lock_guard<boost::recursive_mutex> lock(m_lock); 00260 #else 00261 // cppcheck-suppress unusedScopedObject 00262 MutexHelper((pthread_mutex_t *)&m_lock); 00263 #endif 00264 00265 if (m_state == session::state::CLOSED) {return;} 00266 00267 m_state = session::state::CLOSED; 00268 m_handler->on_close(); 00269 } 00270 00283 void pong(const std::vector<unsigned char> & payload) { 00284 #ifdef HAVE_BOOST_LOCK_GUARD 00285 boost::lock_guard<boost::recursive_mutex> lock(m_lock); 00286 #else 00287 // cppcheck-suppress unusedScopedObject 00288 MutexHelper((pthread_mutex_t *)&m_lock); 00289 #endif 00290 00291 if (m_state != session::state::OPEN) {return;} 00292 // if (m_detached) {return;} 00293 00294 // TODO: optimize control messages and handle case where 00295 // endpoint is out of messages 00296 
frame::parser<simple_rng> control(m_rng); 00297 control.set_opcode(frame::opcode::PONG); 00298 control.set_fin(true); 00299 control.set_masked(false); 00300 control.set_payload(payload); 00301 00302 std::string tmp(control.get_header_str()); 00303 tmp.append(control.get_payload_str()); 00304 m_handler->do_response(tmp); 00305 } 00306 00308 00319 void send_close(close::status::value code, const std::string& reason) { 00320 #ifdef HAVE_BOOST_LOCK_GUARD 00321 boost::lock_guard<boost::recursive_mutex> lock(m_lock); 00322 #else 00323 // cppcheck-suppress unusedScopedObject 00324 MutexHelper((pthread_mutex_t *)&m_lock); 00325 #endif 00326 00327 // if (m_detached) {return;} 00328 00329 if (m_state != session::state::OPEN) { 00330 std::cerr << "Tried to disconnect a session that wasn't open" << std::endl; 00331 return; 00332 } 00333 00334 if (close::status::invalid(code)) { 00335 std::cerr << "Tried to close a connection with invalid close code: " 00336 << code << std::endl; 00337 return; 00338 } else if (close::status::reserved(code)) { 00339 std::cerr << "Tried to close a connection with reserved close code: " 00340 << code << std::endl; 00341 return; 00342 } 00343 00344 m_state = session::state::CLOSING; 00345 00346 frame::parser<simple_rng> control(m_rng); 00347 control.set_opcode(frame::opcode::CLOSE); 00348 control.set_fin(true); 00349 control.set_masked(false); 00350 if (code != close::status::NO_STATUS) { 00351 const uint16_t payload = htons(code); 00352 std::string pl(reinterpret_cast<const char*>(&payload), 2); 00353 pl.append(reason); 00354 control.set_payload(pl); 00355 } 00356 00357 std::string tmp(control.get_header_str()); 00358 tmp.append(control.get_payload_str()); 00359 m_handler->do_response(tmp); 00360 } 00361 00363 00368 void send_close_ack(close::status::value remote_close_code, std::string remote_close_reason) { 00369 close::status::value local_close_code; 00370 std::string local_close_reason; 00371 // echo close value unless there is a good reason 
not to. 00372 if (remote_close_code == close::status::NO_STATUS) { 00373 local_close_code = close::status::NORMAL; 00374 local_close_reason = ""; 00375 } else if (remote_close_code == close::status::ABNORMAL_CLOSE) { 00376 // TODO: can we possibly get here? This means send_close_ack was 00377 // called after a connection ended without getting a close 00378 // frame 00379 throw "shouldn't be here"; 00380 } else if (close::status::invalid(remote_close_code)) { 00381 // TODO: shouldn't be able to get here now either 00382 local_close_code = close::status::PROTOCOL_ERROR; 00383 local_close_reason = "Status code is invalid"; 00384 } else if (close::status::reserved(remote_close_code)) { 00385 // TODO: shouldn't be able to get here now either 00386 local_close_code = close::status::PROTOCOL_ERROR; 00387 local_close_reason = "Status code is reserved"; 00388 } else { 00389 local_close_code = remote_close_code; 00390 local_close_reason = remote_close_reason; 00391 } 00392 00393 // TODO: check whether we should cancel the current in flight write. 00394 // if not canceled the close message will be sent as soon as the 00395 // current write completes. 
00396 00397 00398 frame::parser<simple_rng> control(m_rng); 00399 control.set_opcode(frame::opcode::CLOSE); 00400 control.set_fin(true); 00401 control.set_masked(false); 00402 if (local_close_code != close::status::NO_STATUS) { 00403 const uint16_t payload = htons(local_close_code); 00404 std::string pl(reinterpret_cast<const char*>(&payload), 2); 00405 pl.append(local_close_reason); 00406 control.set_payload(pl); 00407 } 00408 00409 std::string tmp(control.get_header_str()); 00410 tmp.append(control.get_payload_str()); 00411 m_handler->do_response(tmp); 00412 shutdown(); 00413 } 00414 00415 void process_control() { 00416 switch (m_parser.get_opcode()) { 00417 case frame::opcode::PING: 00418 if (m_handler->on_ping(m_parser.get_payload_str())) { 00419 pong(m_parser.get_payload()); 00420 } 00421 break; 00422 case frame::opcode::PONG: 00423 m_handler->on_pong(m_parser.get_payload_str()); 00424 break; 00425 case frame::opcode::CLOSE: 00426 // check that the codes we got over the wire are valid 00427 if (m_state == session::state::OPEN) { 00428 // other end is initiating 00429 std::cerr << "sending close ack" << std::endl; 00430 00431 // TODO: 00432 send_close_ack(m_parser.get_close_code(), m_parser.get_close_reason()); 00433 } else if (m_state == session::state::CLOSING) { 00434 // ack of our close 00435 std::cerr << "got close ack" << std::endl; 00436 shutdown(); 00437 } 00438 break; 00439 default: 00440 throw tracing::wserror("Invalid Opcode", 00441 tracing::wserror::PROTOCOL_VIOLATION); 00442 break; 00443 } 00444 } 00445 00446 private: 00447 simple_rng m_rng; 00448 frame::parser<simple_rng> m_parser; 00449 session::state::value m_state; 00450 #ifdef HAVE_BOOST_LOCK_GUARD 00451 mutable boost::recursive_mutex m_lock; 00452 #else 00453 mutable pthread_mutex_t m_lock; 00454 #endif 00455 wshandler *m_handler; 00456 }; 00457 00458 void wshandler::send(const std::string& payload, frame::opcode::value op) 00459 { 00460 if (m_endpoint) { 00461 m_endpoint->send(payload, op); 
00462 } 00463 } 00464 00465 } 00466 00467 #endif