Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

ChunkedIO.h

Go to the documentation of this file.
00001 // $Id: ChunkedIO.h,v 1.7 2005/09/13 17:25:27 vern Exp $
00002 //
00003 // Implements non-blocking chunk-wise I/O.
00004 
00005 #ifndef CHUNKEDIO_H
00006 #define CHUNKEDIO_H
00007 
00008 #include "config.h"
00009 #include "List.h"
00010 #include "util.h"
00011 
00012 class CompressedChunkedIO;
00013 
00014 // Abstract base class of the chunk-wise I/O implementations declared in this file.
00015 class ChunkedIO {
00016 public:
00017         ChunkedIO(int error_fd = 0);    // presumably stored in the 'error_fd' member used by Log() -- confirm
00018         virtual ~ChunkedIO()    { }
00019 
00020         typedef struct {
00021                 char* data;     // data carried by the chunk
00022                 uint32 len;     // presumably the number of bytes in 'data' -- confirm
00023         } Chunk;
00024 
00025         // Initialization before any I/O operation is performed. Returns false
00026         // on any form of error. The default implementation does nothing and
00027         // returns true.
00028 
00029         // Tries to read the next chunk of data. If it can be read completely,
00030         // a pointer to it is returned in 'chunk' (ownership of chunk is
00031         // passed).  If not, 'chunk' is set to nil.  Returns false if any
00032         // I/O error occurred (use Eof() to see if it's an end-of-file).
00033         // If 'may_block' is true, we explicitly allow blocking.
00034         virtual bool Read(Chunk** chunk, bool may_block = false) = 0;
00035 
00036         // Puts the chunk into the write queue and writes as much data
00037         // as possible (takes ownership of chunk).
00038         // Returns false on any I/O error.
00039         virtual bool Write(Chunk* chunk) = 0;
00040 
00041         // Tries to write as much as currently possible.
00042         // Returns false on any I/O error.
00043         virtual bool Flush() = 0;
00044 
00045         // If an I/O error has been encountered, returns a string describing it.
00046         virtual const char* Error() = 0;
00047 
00048         // Returns true if there is currently at least one chunk available
00049         // for reading.
00050         virtual bool CanRead() = 0;
00051 
00052         // Returns true if there is currently at least one chunk waiting to be
00053         // written.
00054         virtual bool CanWrite() = 0;
00055 
00056         // Returns true if source believes that there won't be much data soon.
00057         virtual bool IsIdle() = 0;
00058 
00059         // Returns true if internal write buffers are about to fill up.
00060         virtual bool IsFillingUp() = 0;
00061 
00062         // Throws away buffered data.
00063         virtual void Clear() = 0;
00064 
00065         // Returns true if end-of-file has been reached.
00066         virtual bool Eof() = 0;
00067 
00068         // Returns underlying fd if available, -1 otherwise.
00069         virtual int Fd()        { return -1; }
00070 
00071         // Makes sure that no additional protocol data is written into
00072         // the output stream.  If this is activated, the output cannot
00073         // be read again by any of these classes!
00074         void MakePure() { pure = true; }
00075         bool IsPure()   { return pure; }
00076 
00077         // Writes a log message to the error_fd.
00078         void Log(const char* str);
00079 
00080         struct Statistics {
00081                 Statistics()
00082                         {
00083                         bytes_read = 0;
00084                         bytes_written = 0;
00085                         chunks_read = 0;
00086                         chunks_written = 0;
00087                         reads = 0;
00088                         writes = 0;
00089                         pending = 0;
00090                         }
00091 
00092                 unsigned long bytes_read;
00093                 unsigned long bytes_written;
00094                 unsigned long chunks_read;
00095                 unsigned long chunks_written;
00096                 unsigned long reads;    // # calls which transferred > 0 bytes
00097                 unsigned long writes;   // presumably the write-side counterpart of 'reads' -- confirm
00098                 unsigned long pending;  // presumably # chunks still waiting to be written -- confirm
00099                 };
00100 
00101         // Returns raw statistics.
00102         const Statistics* Stats() const         { return &stats; }
00103 
00104         // Puts a formatted string containing statistics into buffer.
00105         virtual void Stats(char* buffer, int length);
00106 
00107 protected:
00108         Statistics stats;       // counters exposed through Stats()
00109         const char* tag;        // NOTE(review): not set anywhere in this header; presumably assigned by subclasses -- confirm
00110 
00111 private:
00112         bool pure;      // set by MakePure(), queried by IsPure()
00113         int error_fd;   // descriptor Log() writes to
00114 };
00115 
00116 // Chunked I/O using a file descriptor.
00117 class ChunkedIOFd : public ChunkedIO {
00118 public:
00119         // Argument is an open bi-directional file descriptor.
00120         ChunkedIOFd(int fd, const char* tag, int error_fd);
00121         virtual ~ChunkedIOFd();
00122 
00123         virtual bool Read(Chunk** chunk, bool may_block = false);
00124         virtual bool Write(Chunk* chunk);
00125         virtual bool Flush();
00126         virtual const char* Error();
00127         virtual bool CanRead();
00128         virtual bool CanWrite();
00129         virtual bool IsIdle();
00130         virtual bool IsFillingUp();
00131         virtual void Clear();
00132         virtual bool Eof()      { return eof; }
00133         virtual int Fd()        { return fd; }
00134         virtual void Stats(char* buffer, int length);
00135 
00136 private:
00137 
00138         bool PutIntoWriteBuffer(Chunk* chunk);
00139         bool FlushWriteBuffer();
00140         Chunk* ExtractChunk();
00141 
00142         // Returns size of next chunk in buffer or 0 if none.
00143         uint32 ChunkAvailable();
00144 
00145         // Flushes if it thinks it is time to.
00146         bool OptionalFlush();
00147 
00148         // Concatenates the data of the two chunks, forming a new one.
00149         // The old chunks are deleted.
00150         Chunk* ConcatChunks(Chunk* c1, Chunk* c2);
00151 
00152         // Reads/writes one chunk of up to BUFFER_SIZE bytes.
00153         bool WriteChunk(Chunk* chunk, bool partial);
00154         bool ReadChunk(Chunk** chunk, bool may_block);
00155 
00156         int fd;         // descriptor returned by Fd()
00157         bool eof;       // value returned by Eof()
00158         double last_flush;      // presumably the time of the last flush -- confirm
00159         int failed_reads;
00160 
00161         // Optimally, this should match the file descriptor's
00162         // buffer size (for sockets, it may be helpful to
00163         // increase the send/receive buffers).
00164         static const unsigned int BUFFER_SIZE = 1024 * 1024 * 5;        // 5 MB
00165 
00166         // We 'or' this to the length of a data chunk to mark
00167         // that it's part of a larger one. This has to be larger
00168         // than BUFFER_SIZE.
00169         static const uint32 FLAG_PARTIAL = 0x80000000;
00170 
00171         // We report that we're filling up when there are more than this number
00172         // of pending chunks.
00173         static const uint32 MAX_BUFFERED_CHUNKS_SOFT = 400000;
00174 
00175         // Maximum number of chunks we store in memory before rejecting writes.
00176         static const uint32 MAX_BUFFERED_CHUNKS = 500000;
00177 
00178         char* read_buffer;
00179         uint32 read_len;
00180         uint32 read_pos;
00181         Chunk* partial; // when we read an oversized chunk, we store it here
00182 
00183         char* write_buffer;
00184         uint32 write_len;
00185         uint32 write_pos;
00186 
00187         struct ChunkQueue {
00188                 Chunk* chunk;
00189                 ChunkQueue* next;       // singly-linked: each node points to the next one
00190         };
00191 
00192         // Chunks that don't fit into our write buffer.
00193         ChunkQueue* pending_head;
00194         ChunkQueue* pending_tail;
00195 
00196         pid_t parent;   // NOTE(review): purpose not evident from this header
00197 };
00198 
00199 #ifdef USE_OPENSSL
00200 
00201 #ifdef NEED_KRB5_H
00202 # include <krb5.h>
00203 #endif 
00204 
00205 #include <openssl/ssl.h>
00206 #include <openssl/err.h>
00207 
00208 // Chunked I/O using an SSL connection.
00209 
00210 class ChunkedIOSSL : public ChunkedIO {
00211 public:
00212         // Argument is an open socket and a flag indicating whether we are the
00213         // server side of the connection.
00214         ChunkedIOSSL(int socket, bool server, int error_fd);
00215         virtual ~ChunkedIOSSL();
00216 
00217         virtual bool Init();
00218         virtual bool Read(Chunk** chunk, bool mayblock = false);        // NOTE(review): spelled 'may_block' in the other classes of this file
00219         virtual bool Write(Chunk* chunk);
00220         virtual bool Flush();
00221         virtual const char* Error();
00222         virtual bool CanRead();
00223         virtual bool CanWrite();
00224         virtual bool IsIdle();
00225         virtual bool IsFillingUp();
00226         virtual void Clear();
00227         virtual bool Eof()      { return eof; }
00228         virtual int Fd()        { return socket; }
00229         virtual void Stats(char* buffer, int length);
00230 
00231 private:
00232         // Maximum number of chunks we store in memory before rejecting writes.
00233         static const uint32 MAX_BUFFERED_CHUNKS = 500000;
00234 
00235         // Only returns true if all data has been read. If not, call
00236         // it again with the same parameters as long as error is not
00237         // set to true.
00238         bool ReadData(char* p, uint32 len, bool* error);
00239         // Same for writing.
00240         bool WriteData(char* p, uint32 len, bool* error);
00241 
00242         int socket;     // descriptor returned by Fd()
00243         int last_ret;   // last error code
00244         bool eof;       // value returned by Eof()
00245 
00246         bool server;    // are we the server?
00247         bool setup;     // has the connection been set up successfully?
00248 
00249         SSL* ssl;
00250 
00251         // Write queue.
00252         struct Queue {
00253                 Chunk* chunk;
00254                 Queue* next;
00255         };
00256 
00257         // The part of a chunk we are currently reading/writing.
00258         enum State { LEN, DATA };
00259 
00260         State write_state;
00261         Queue* write_head;
00262         Queue* write_tail;
00263 
00264         State read_state;
00265         Chunk* read_chunk;
00266         char* read_ptr;
00267 
00268         // One SSL context shared by all connections (static member).
00269         static SSL_CTX* ctx;
00270 };
00271 
00272 #endif  /* USE_OPENSSL */
00273 
00274 #ifdef HAVE_LIBZ
00275 
00276 #include <zlib.h>
00277 
00278 // Wrapper class around another ChunkedIO which (un-)compresses the data.
00279 class CompressedChunkedIO : public ChunkedIO {
00280 public:
              // NOTE(review): only 'io' is initialized by this constructor; the
              // other members are presumably set up in Init() -- confirm.
00281         CompressedChunkedIO(ChunkedIO* arg_io)
00282                 : io(arg_io) {} // takes ownership
              // NOTE(review): only deletes the wrapped object; no deflateEnd()/
              // inflateEnd() call is visible in this header even though the
              // Enable*() methods below call deflateInit()/inflateInit() --
              // confirm the zlib state is released elsewhere.
00283         virtual ~CompressedChunkedIO()  { delete io; }
00284 
00285         virtual bool Init(); // does *not* call arg_io->Init()
00286         virtual bool Read(Chunk** chunk, bool may_block = false);
00287         virtual bool Write(Chunk* chunk);
              // The following calls are forwarded unchanged to the wrapped object.
00288         virtual bool Flush()    { return io->Flush(); }
00289         virtual const char* Error()     { return error ? error : io->Error(); } // our own error, if set, takes precedence
00290         virtual bool CanRead()  { return io->CanRead(); }
00291         virtual bool CanWrite() { return io->CanWrite(); }
00292         virtual bool IsIdle()   { return io->IsIdle(); }
00293         virtual bool IsFillingUp()      { return io->IsFillingUp(); }
00294         virtual void Clear()    { return io->Clear(); }
00295 
00296         virtual bool Eof()      { return io->Eof(); }
00297         virtual int Fd()        { return io->Fd(); }
00298         virtual void Stats(char* buffer, int length);
00299 
              // Initialize the zlib stream for the respective direction and set
              // the corresponding flag.
              // NOTE(review): the return values of deflateInit()/inflateInit()
              // are ignored, so the flag is set even if initialization failed.
00300         void EnableCompression(int level)
00301                 { deflateInit(&zout, level); compress = true; }
00302         void EnableDecompression()
00303                 { inflateInit(&zin); uncompress = true; }
00304 
00305 protected:
00306         // Only compress blocks with size >= this.
00307         static const unsigned int MIN_COMPRESS_SIZE = 30;
00308 
00309         ChunkedIO* io;  // wrapped object; owned, deleted in the destructor
00310         z_stream zin;   // zlib stream passed to inflateInit()
00311         z_stream zout;  // zlib stream passed to deflateInit()
00312         const char* error;      // if non-null, returned by Error() instead of io->Error()
00313 
00314         bool compress;          // set by EnableCompression()
00315         bool uncompress;        // set by EnableDecompression()
00316 
00317         // Keep some statistics.
00318         unsigned long uncompressed_bytes_read;
00319         unsigned long uncompressed_bytes_written;
00320 };
00321 
00322 #endif  /* HAVE_LIBZ */
00323 
00324 #endif

Generated on Wed Sep 14 02:55:59 2005 for bro_docs by doxygen 1.3.5