Source: ../../libxorp/asyncio.hh
|
|
|
|
// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-
// vim:set sts=4 ts=8:
// Copyright (c) 2001-2009 XORP, Inc.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License, Version
// 2.1, June 1999 as published by the Free Software Foundation.
// Redistribution and/or modification of this program under the terms of
// any other version of the GNU Lesser General Public License is not
// permitted.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For more details,
// see the GNU Lesser General Public License, Version 2.1, a copy of
// which can be found in the XORP LICENSE.lgpl file.
//
// XORP, Inc, 2953 Bunker Hill Lane, Suite 204, Santa Clara, CA 95054, USA;
// http://xorp.net
// $XORP: xorp/libxorp/asyncio.hh,v 1.36 2009/01/05 18:30:57 jtc Exp $
#ifndef __LIBXORP_ASYNCIO_HH__
#define __LIBXORP_ASYNCIO_HH__
#include "libxorp/xorp.h"
#include <list>
#include <vector>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "libxorp/xorpfd.hh"
#include "libxorp/callback.hh"
#include "libxorp/eventloop.hh"
#include "libxorp/ipvx.hh"
struct iovec;
// Asynchronous file transfer classes. These utilize XORP EventLoop
// and the IoEvent framework to read / write files asynchronously. The
// user creates an AsyncFile{Reader,Writer} and adds a buffer for
// reading / writing with add_buffer(). The callback provided with
// each buffer is called every time I/O happens on the buffer.
//
// Note that in case of AsyncFileWriter the user can use add_data() to
// add the data to write/send, and that data will be stored/buffered
// internally by AsyncFileWriter itself.
//
// Reading/Writing only begins when start() is called, and normally
// continues until there are no buffers left.
// ----------------------------------------------------------------------------
// AsyncFileOperator - Abstract base class for asynchronous file operators.
/**
* @short Base class for asynchronous file transfer operations.
*
* Asynchronous file transfer operations allow data to be transferred
* to or from user supplied buffers. A callback is invoked on each
* transfer. Transfer stops when the available buffers are exhausted.
*/
class AsyncFileOperator {
public:
    /**
     * Event codes handed to the per-buffer callback.  The values are
     * distinct powers of two.
     */
    enum Event {
        DATA = 1,           // I/O occurred
        FLUSHING = 2,       // Buffer is being flushed
        OS_ERROR = 4,       // I/O Error has occurred, check error()
        END_OF_FILE = 8,    // End of file reached (applies to read only)
        WOULDBLOCK = 16     // I/O would block the current thread
    };

    /**
     * Callback type the user provides when adding buffers to sub-classes
     * of AsyncFileOperator.  Callbacks are on a per buffer basis and are
     * invoked any time some I/O is performed.  The offset field
     * refers to the offset of the last byte read, or written, from
     * the start of the buffer.
     *
     * Callback has arguments:
     *          Event           e,
     *          const uint8_t*  buffer,
     *          size_t          buffer_bytes,
     *          size_t          offset
     */
    typedef XorpCallback4<void, Event, const uint8_t*, size_t, size_t>::RefPtr Callback;

public:
    /**
     * @return the number of buffers available.
     */
    virtual size_t buffers_remaining() const = 0;

    /**
     * Stop asynchronous operation and clear list of buffers.
     */
    virtual void flush_buffers() = 0;

    /**
     * Start asynchronous operation.
     *
     * @return true on success, false if no buffers are available.
     */
    virtual bool start() = 0;

    /**
     * Stop asynchronous operation.
     */
    virtual void stop() = 0;

    /**
     * Resume stopped asynchronous operation.  This simply forwards to
     * start().
     *
     * @return true on success, false if no buffers are available.
     */
    bool resume() { return start(); }

    /**
     * @return true if asynchronous I/O is started.
     */
    bool running() const { return _running; }

    /**
     * @return file descriptor undergoing asynchronous operation.
     */
    XorpFd fd() const { return _fd; }

    /**
     * @return the last error code returned by the underlying OS.
     */
    int error() const { return _last_error; }

protected:
    /**
     * Construct the common state shared by the reader and the writer.
     * The object starts out not running and with no recorded error.
     *
     * On non-Windows hosts the descriptor is expected to be in
     * non-blocking mode already; this is verified with an assertion.
     *
     * @param e EventLoop that object should associate itself with.
     * @param fd the file descriptor to operate on.
     * @param priority the priority value to store (defaults to
     * XorpTask::PRIORITY_DEFAULT).
     */
    AsyncFileOperator(EventLoop& e, XorpFd fd,
                      int priority = XorpTask::PRIORITY_DEFAULT)
        : _eventloop(e), _fd(fd), _running(false),
          _last_error(0), _priority(priority)
    {
#ifndef HOST_OS_WINDOWS
        int fl = fcntl(fd, F_GETFL);
        // fcntl() reports failure with -1, and (-1 & O_NONBLOCK) is
        // non-zero, so the failure case must be rejected explicitly or
        // an invalid descriptor would slip through the check.
        assert(fl != -1 && (fl & O_NONBLOCK) != 0);
        // Keep builds with NDEBUG (assert() compiled out) warning free.
        static_cast<void>(fl);
#endif
    }

    virtual ~AsyncFileOperator();

    EventLoop&  _eventloop;     // Event loop given at construction
    XorpFd      _fd;            // Descriptor returned by fd()
    bool        _running;       // Value returned by running()
    int         _last_error;    // Value returned by error()
    int         _priority;      // Priority given at construction
};
/**
* @short Read asynchronously from a file.
*/
class AsyncFileReader : public AsyncFileOperator {
public:
    /**
     * @param e EventLoop that object should associate itself with.
     * @param fd a file descriptor to read from.
     * @param priority the priority value to use (defaults to
     * XorpTask::PRIORITY_DEFAULT).
     */
    AsyncFileReader(EventLoop& e, XorpFd fd,
                    int priority = XorpTask::PRIORITY_DEFAULT);

    ~AsyncFileReader();

    /**
     * Add an additional buffer for reading to.
     *
     * Note that the buffer with the data is managed by the user.
     *
     * @param buffer pointer to buffer.
     * @param buffer_bytes size of buffer in bytes.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_buffer(uint8_t* buffer, size_t buffer_bytes, const Callback& cb);

    /**
     * Add an additional buffer for reading to.
     *
     * Note that the buffer with the data is managed by the user.
     *
     * @param buffer pointer to buffer.
     * @param buffer_bytes size of buffer in bytes.
     * @param offset starting point for read operation.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_buffer_with_offset(uint8_t* buffer, size_t buffer_bytes,
                                size_t offset, const Callback& cb);

    /**
     * Start asynchronous operation.
     *
     * @return true on success, false if no buffers are available.
     */
    bool start();

    /**
     * Stop asynchronous operation.
     */
    void stop();

    /**
     * @return the number of buffers available.
     */
    size_t buffers_remaining() const { return _buffers.size(); }

    /**
     * Stop asynchronous operation and clear list of buffers.
     */
    void flush_buffers();

protected:
    /**
     * Book-keeping record for one queued read buffer: the user supplied
     * memory, its size, the current offset into it and the callback to
     * notify.  Instances are not copyable.
     */
    class BufferInfo {
    public:
        // The callback is taken by const reference so that the only
        // copy of the ref-counted handle is the one stored in _cb.

        /** Describe a buffer whose offset starts at zero. */
        BufferInfo(uint8_t* b, size_t bb, const Callback& cb)
            : _buffer(b), _buffer_bytes(bb), _offset(0), _cb(cb) {}

        /** Describe a buffer whose offset starts at off. */
        BufferInfo(uint8_t* b, size_t bb, size_t off, const Callback& cb)
            : _buffer(b), _buffer_bytes(bb), _offset(off), _cb(cb) {}

        /**
         * Invoke the user callback with the given event together with
         * the buffer pointer, the buffer size and the current offset.
         */
        void dispatch_callback(AsyncFileOperator::Event e) {
            _cb->dispatch(e, _buffer, _buffer_bytes, _offset);
        }

        uint8_t* buffer() const         { return (_buffer); }
        size_t buffer_bytes() const     { return (_buffer_bytes); }
        size_t offset() const           { return (_offset); }

        /** Advance the offset by the given number of bytes. */
        void incr_offset(size_t done)   { _offset += done; }

    private:
        BufferInfo();                                   // Not implemented
        BufferInfo(const BufferInfo&);                  // Not implemented
        BufferInfo& operator=(const BufferInfo&);       // Not implemented

        uint8_t*        _buffer;        // User managed memory
        size_t          _buffer_bytes;  // Size of _buffer in bytes
        size_t          _offset;        // Current offset into _buffer
        Callback        _cb;            // Callback to notify
    };

    // Internal handlers, defined out of line.
    void read(XorpFd fd, IoEventType type);
    void complete_transfer(int err, ssize_t done);

    list<BufferInfo *> _buffers;        // Queued buffers

#ifdef HOST_OS_WINDOWS
    // Members that exist on Windows hosts only.
    void disconnect(XorpFd fd, IoEventType type);

    XorpTask    _deferred_io_task;
    bool        _disconnect_added;
#endif
};
/**
* @short Write asynchronously to non-blocking file.
*/
class AsyncFileWriter : public AsyncFileOperator {
public:
    /**
     * @param e EventLoop that object should associate itself with.
     * @param fd a file descriptor marked as non-blocking to write to.
     * @param coalesce the number of buffers to coalesce for each write()
     * system call.
     * @param priority the priority value to use (defaults to
     * XorpTask::PRIORITY_DEFAULT).
     */
    AsyncFileWriter(EventLoop& e, XorpFd fd, uint32_t coalesce = 1,
                    int priority = XorpTask::PRIORITY_DEFAULT);

    ~AsyncFileWriter();

    /**
     * Add an additional buffer for writing from.
     *
     * Note that the buffer with the data is managed by the user.
     *
     * @param buffer pointer to buffer.
     * @param buffer_bytes size of buffer in bytes.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_buffer(const uint8_t* buffer,
                    size_t buffer_bytes,
                    const Callback& cb);

    /**
     * Add an additional buffer for writing from by using sendto(2).
     *
     * Note that sendto()-buffers are never coalesced with other buffers.
     *
     * @param buffer pointer to buffer.
     * @param buffer_bytes size of buffer in bytes.
     * @param dst_addr the destination address to send the data to.
     * @param dst_port the destination port (in host order) to send the
     * data to.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_buffer_sendto(const uint8_t* buffer,
                           size_t buffer_bytes,
                           const IPvX& dst_addr,
                           uint16_t dst_port,
                           const Callback& cb);

    /**
     * Add an additional buffer for writing from.
     *
     * @param buffer pointer to buffer.
     * @param buffer_bytes size of buffer in bytes.
     * @param offset the starting point to write from in the buffer.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_buffer_with_offset(const uint8_t* buffer,
                                size_t buffer_bytes,
                                size_t offset,
                                const Callback& cb);

    /**
     * Add additional data for writing from.
     *
     * Note that the data to write is stored internally by
     * AsyncFileWriter.
     *
     * @param data the data to write.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_data(const vector<uint8_t>& data,
                  const Callback& cb);

    /**
     * Add additional data for writing from by using sendto(2).
     *
     * Note that the data to write is stored internally by
     * AsyncFileWriter.
     * Note that sendto()-buffers are never coalesced with other buffers.
     *
     * @param data the data to send.
     * @param dst_addr the destination address to send the data to.
     * @param dst_port the destination port (in host order) to send the
     * data to.
     * @param cb Callback object to invoke when I/O is performed.
     */
    void add_data_sendto(const vector<uint8_t>& data,
                         const IPvX& dst_addr,
                         uint16_t dst_port,
                         const Callback& cb);

    /**
     * Start asynchronous operation.
     *
     * @return true on success, false if no buffers are available.
     */
    bool start();

    /**
     * Stop asynchronous operation.
     */
    void stop();

    /**
     * @return the number of buffers available.
     */
    size_t buffers_remaining() const { return _buffers.size(); }

    /**
     * Stop asynchronous operation and clear list of buffers.
     */
    void flush_buffers();

private:
    AsyncFileWriter();                                  // Not implemented
    AsyncFileWriter(const AsyncFileWriter&);            // Not implemented
    AsyncFileWriter& operator=(const AsyncFileWriter&); // Not implemented

protected:
    /**
     * Book-keeping record for one queued write buffer.
     *
     * The bytes either live in user managed memory (the constructors
     * taking a pointer) or in a private copy held in _data (the
     * constructors taking a vector).  In both cases _buffer and
     * _buffer_bytes describe the bytes and _offset the current position
     * within them.  Instances are not copyable.
     */
    class BufferInfo {
    public:
        /** User managed buffer, offset starting at zero. */
        BufferInfo(const uint8_t* b, size_t bb, const Callback& cb)
            : _buffer(b), _buffer_bytes(bb), _offset(0), _dst_port(0),
              _cb(cb), _is_sendto(false) {}

        /** User managed buffer flagged as a sendto() buffer. */
        BufferInfo(const uint8_t* b, size_t bb, const IPvX& dst_addr,
                   uint16_t dst_port, const Callback& cb)
            : _buffer(b), _buffer_bytes(bb), _offset(0), _dst_addr(dst_addr),
              _dst_port(dst_port), _cb(cb), _is_sendto(true) {}

        /** User managed buffer, offset starting at off. */
        BufferInfo(const uint8_t* b, size_t bb, size_t off, const Callback& cb)
            : _buffer(b), _buffer_bytes(bb), _offset(off), _dst_port(0),
              _cb(cb), _is_sendto(false) {}

        /**
         * Internally stored copy of data.  _data is declared before
         * _buffer, so it is already constructed when _buffer is pointed
         * at its storage.
         */
        BufferInfo(const vector<uint8_t>& data, const Callback& cb)
            : _data(data), _buffer(first_byte(_data)),
              _buffer_bytes(_data.size()),
              _offset(0), _dst_port(0), _cb(cb), _is_sendto(false) {}

        /** Internally stored copy of data flagged as a sendto() buffer. */
        BufferInfo(const vector<uint8_t>& data, const IPvX& dst_addr,
                   uint16_t dst_port, const Callback& cb)
            : _data(data), _buffer(first_byte(_data)),
              _buffer_bytes(_data.size()),
              _offset(0), _dst_addr(dst_addr), _dst_port(dst_port),
              _cb(cb), _is_sendto(true) {}

        /**
         * Invoke the user callback with the given event together with
         * the buffer pointer, the buffer size and the current offset.
         */
        void dispatch_callback(AsyncFileOperator::Event e) {
            _cb->dispatch(e, _buffer, _buffer_bytes, _offset);
        }

        const uint8_t* buffer() const   { return (_buffer); }
        size_t buffer_bytes() const     { return (_buffer_bytes); }
        size_t offset() const           { return (_offset); }

        /** Advance the offset by the given number of bytes. */
        void incr_offset(size_t done)   { _offset += done; }

        const IPvX& dst_addr() const    { return (_dst_addr); }
        uint16_t dst_port() const       { return (_dst_port); }
        bool is_sendto() const          { return (_is_sendto); }

    private:
        BufferInfo();                                   // Not implemented
        BufferInfo(const BufferInfo&);                  // Not implemented
        BufferInfo& operator=(const BufferInfo&);       // Not implemented

        /**
         * @return a pointer to the first byte of v, or a null pointer
         * when v is empty.  Indexing element 0 of an empty vector is
         * undefined behaviour, so the empty case must not use v[0].
         */
        static const uint8_t* first_byte(const vector<uint8_t>& v) {
            return v.empty() ? 0 : &v[0];
        }

        const vector<uint8_t>   _data;          // Local copy of the data
        const uint8_t*          _buffer;        // Start of the bytes
        size_t                  _buffer_bytes;  // Number of bytes
        size_t                  _offset;        // Current offset
        const IPvX              _dst_addr;      // sendto() destination
        const uint16_t          _dst_port;      // sendto() port
        Callback                _cb;            // Callback to notify
        bool                    _is_sendto;     // Set by sendto() ctors
    };

    // Internal handlers, defined out of line.
    void write(XorpFd, IoEventType);
    void complete_transfer(ssize_t done);

    uint32_t            _coalesce;
    struct iovec*       _iov;
    ref_ptr<int>        _dtoken;
    list<BufferInfo *>  _buffers;       // Queued buffers

#ifdef HOST_OS_WINDOWS
    // Members that exist on Windows hosts only.
    void disconnect(XorpFd fd, IoEventType type);

    XorpTask    _deferred_io_task;
    bool        _disconnect_added;
#endif
};
#endif // __LIBXORP_ASYNCIO_HH__
Generated by: pavlin on kobe.xorp.net on Wed Jan 7 19:10:36 2009, using kdoc 2.0a54+XORP.