00001
00002
00003
00004
00005 #ifndef CHUNKEDIO_H
00006 #define CHUNKEDIO_H
00007
00008 #include "config.h"
00009 #include "List.h"
00010 #include "util.h"
00011
00012 class CompressedChunkedIO;
00013
00014
00015 class ChunkedIO {
00016 public:
00017 ChunkedIO(int error_fd = 0);
00018 virtual ~ChunkedIO() { }
00019
00020 typedef struct {
00021 char* data;
00022 uint32 len;
00023 } Chunk;
00024
00025
00026
00027 virtual bool Init() { return true; }
00028
00029
00030
00031
00032
00033
00034 virtual bool Read(Chunk** chunk, bool may_block = false) = 0;
00035
00036
00037
00038
00039 virtual bool Write(Chunk* chunk) = 0;
00040
00041
00042
00043 virtual bool Flush() = 0;
00044
00045
00046 virtual const char* Error() = 0;
00047
00048
00049
00050 virtual bool CanRead() = 0;
00051
00052
00053
00054 virtual bool CanWrite() = 0;
00055
00056
00057 virtual bool IsIdle() = 0;
00058
00059
00060 virtual bool IsFillingUp() = 0;
00061
00062
00063 virtual void Clear() = 0;
00064
00065
00066 virtual bool Eof() = 0;
00067
00068
00069 virtual int Fd() { return -1; }
00070
00071
00072
00073
00074 void MakePure() { pure = true; }
00075 bool IsPure() { return pure; }
00076
00077
00078 void Log(const char* str);
00079
00080 struct Statistics {
00081 Statistics()
00082 {
00083 bytes_read = 0;
00084 bytes_written = 0;
00085 chunks_read = 0;
00086 chunks_written = 0;
00087 reads = 0;
00088 writes = 0;
00089 pending = 0;
00090 }
00091
00092 unsigned long bytes_read;
00093 unsigned long bytes_written;
00094 unsigned long chunks_read;
00095 unsigned long chunks_written;
00096 unsigned long reads;
00097 unsigned long writes;
00098 unsigned long pending;
00099 };
00100
00101
00102 const Statistics* Stats() const { return &stats; }
00103
00104
00105 virtual void Stats(char* buffer, int length);
00106
00107 protected:
00108 Statistics stats;
00109 const char* tag;
00110
00111 private:
00112 bool pure;
00113 int error_fd;
00114 };
00115
00116
00117 class ChunkedIOFd : public ChunkedIO {
00118 public:
00119
00120 ChunkedIOFd(int fd, const char* tag, int error_fd);
00121 virtual ~ChunkedIOFd();
00122
00123 virtual bool Read(Chunk** chunk, bool may_block = false);
00124 virtual bool Write(Chunk* chunk);
00125 virtual bool Flush();
00126 virtual const char* Error();
00127 virtual bool CanRead();
00128 virtual bool CanWrite();
00129 virtual bool IsIdle();
00130 virtual bool IsFillingUp();
00131 virtual void Clear();
00132 virtual bool Eof() { return eof; }
00133 virtual int Fd() { return fd; }
00134 virtual void Stats(char* buffer, int length);
00135
00136 private:
00137
00138 bool PutIntoWriteBuffer(Chunk* chunk);
00139 bool FlushWriteBuffer();
00140 Chunk* ExtractChunk();
00141
00142
00143 uint32 ChunkAvailable();
00144
00145
00146 bool OptionalFlush();
00147
00148
00149
00150 Chunk* ConcatChunks(Chunk* c1, Chunk* c2);
00151
00152
00153 bool WriteChunk(Chunk* chunk, bool partial);
00154 bool ReadChunk(Chunk** chunk, bool may_block);
00155
00156 int fd;
00157 bool eof;
00158 double last_flush;
00159 int failed_reads;
00160
00161
00162
00163
00164 static const unsigned int BUFFER_SIZE = 1024 * 1024 * 5;
00165
00166
00167
00168
00169 static const uint32 FLAG_PARTIAL = 0x80000000;
00170
00171
00172
00173 static const uint32 MAX_BUFFERED_CHUNKS_SOFT = 400000;
00174
00175
00176 static const uint32 MAX_BUFFERED_CHUNKS = 500000;
00177
00178 char* read_buffer;
00179 uint32 read_len;
00180 uint32 read_pos;
00181 Chunk* partial;
00182
00183 char* write_buffer;
00184 uint32 write_len;
00185 uint32 write_pos;
00186
00187 struct ChunkQueue {
00188 Chunk* chunk;
00189 ChunkQueue* next;
00190 };
00191
00192
00193 ChunkQueue* pending_head;
00194 ChunkQueue* pending_tail;
00195
00196 pid_t parent;
00197 };
00198
00199 #ifdef USE_OPENSSL
00200
00201 #ifdef NEED_KRB5_H
00202 # include <krb5.h>
00203 #endif
00204
00205 #include <openssl/ssl.h>
00206 #include <openssl/err.h>
00207
00208
00209
00210 class ChunkedIOSSL : public ChunkedIO {
00211 public:
00212
00213
00214 ChunkedIOSSL(int socket, bool server, int error_fd);
00215 virtual ~ChunkedIOSSL();
00216
00217 virtual bool Init();
00218 virtual bool Read(Chunk** chunk, bool mayblock = false);
00219 virtual bool Write(Chunk* chunk);
00220 virtual bool Flush();
00221 virtual const char* Error();
00222 virtual bool CanRead();
00223 virtual bool CanWrite();
00224 virtual bool IsIdle();
00225 virtual bool IsFillingUp();
00226 virtual void Clear();
00227 virtual bool Eof() { return eof; }
00228 virtual int Fd() { return socket; }
00229 virtual void Stats(char* buffer, int length);
00230
00231 private:
00232
00233 static const uint32 MAX_BUFFERED_CHUNKS = 500000;
00234
00235
00236
00237
00238 bool ReadData(char* p, uint32 len, bool* error);
00239
00240 bool WriteData(char* p, uint32 len, bool* error);
00241
00242 int socket;
00243 int last_ret;
00244 bool eof;
00245
00246 bool server;
00247 bool setup;
00248
00249 SSL* ssl;
00250
00251
00252 struct Queue {
00253 Chunk* chunk;
00254 Queue* next;
00255 };
00256
00257
00258 enum State { LEN, DATA };
00259
00260 State write_state;
00261 Queue* write_head;
00262 Queue* write_tail;
00263
00264 State read_state;
00265 Chunk* read_chunk;
00266 char* read_ptr;
00267
00268
00269 static SSL_CTX* ctx;
00270 };
00271
00272 #endif
00273
00274 #ifdef HAVE_LIBZ
00275
00276 #include <zlib.h>
00277
00278
00279 class CompressedChunkedIO : public ChunkedIO {
00280 public:
00281 CompressedChunkedIO(ChunkedIO* arg_io)
00282 : io(arg_io) {}
00283 virtual ~CompressedChunkedIO() { delete io; }
00284
00285 virtual bool Init();
00286 virtual bool Read(Chunk** chunk, bool may_block = false);
00287 virtual bool Write(Chunk* chunk);
00288 virtual bool Flush() { return io->Flush(); }
00289 virtual const char* Error() { return error ? error : io->Error(); }
00290 virtual bool CanRead() { return io->CanRead(); }
00291 virtual bool CanWrite() { return io->CanWrite(); }
00292 virtual bool IsIdle() { return io->IsIdle(); }
00293 virtual bool IsFillingUp() { return io->IsFillingUp(); }
00294 virtual void Clear() { return io->Clear(); }
00295
00296 virtual bool Eof() { return io->Eof(); }
00297 virtual int Fd() { return io->Fd(); }
00298 virtual void Stats(char* buffer, int length);
00299
00300 void EnableCompression(int level)
00301 { deflateInit(&zout, level); compress = true; }
00302 void EnableDecompression()
00303 { inflateInit(&zin); uncompress = true; }
00304
00305 protected:
00306
00307 static const unsigned int MIN_COMPRESS_SIZE = 30;
00308
00309 ChunkedIO* io;
00310 z_stream zin;
00311 z_stream zout;
00312 const char* error;
00313
00314 bool compress;
00315 bool uncompress;
00316
00317
00318 unsigned long uncompressed_bytes_read;
00319 unsigned long uncompressed_bytes_written;
00320 };
00321
00322 #endif
00323
00324 #endif