Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

ChunkedIO.cc

Go to the documentation of this file.
00001 // $Id: ChunkedIO.cc,v 1.7 2005/09/09 22:41:42 vern Exp $
00002 
00003 #include <unistd.h>
00004 #include <fcntl.h>
00005 #include <errno.h>
00006 #include <signal.h>
00007 #include <sys/time.h>
00008 #include <netinet/in.h>
00009 #include <assert.h>
00010 
00011 #include "config.h"
00012 #include "ChunkedIO.h"
00013 #include "NetVar.h"
00014 
00015 
00016 ChunkedIO::ChunkedIO(int arg_error_fd)
00017         {
00018         pure = false;
00019         error_fd = arg_error_fd;
00020         }
00021 
00022 void ChunkedIO::Stats(char* buffer, int length)
00023         {
00024         safe_snprintf(buffer, length,
00025                       "bytes=%luK/%luK chunks=%lu/%lu io=%lu/%lu bytes/io=%.2fK/%.2fK",
00026                       stats.bytes_read / 1024, stats.bytes_written / 1024,
00027                       stats.chunks_read, stats.chunks_written,
00028                       stats.reads, stats.writes,
00029                       stats.bytes_read / (1024.0 * stats.reads),
00030                       stats.bytes_written / (1024.0 * stats.writes));
00031         }
00032 
00033 ChunkedIOFd::ChunkedIOFd(int arg_fd, const char* arg_tag, int error_fd)
00034 : ChunkedIO(error_fd)
00035         {
00036         int flags;
00037 
00038         tag = arg_tag;
00039         fd = arg_fd;
00040         eof = 0;
00041         last_flush = current_time();
00042         failed_reads = 0;
00043 
00044         if ( (flags = fcntl(fd, F_GETFL, 0)) < 0)
00045                 {
00046                 Log(fmt("can't obtain socket flags: %s", strerror(errno)));
00047                 exit(1);
00048                 }
00049 
00050         if ( fcntl(fd, F_SETFL, flags|O_NONBLOCK) < 0 )
00051                 {
00052                 Log(fmt("can't set fd to non-blocking: %s (%d)",
00053                           strerror(errno), getpid()));
00054                 exit(1);
00055                 }
00056 
00057         read_buffer = new char[BUFFER_SIZE];
00058         read_len = 0;
00059         read_pos = 0;
00060         partial = 0;
00061         write_buffer = new char[BUFFER_SIZE];
00062         write_len = 0;
00063         write_pos = 0;
00064 
00065         pending_head = 0;
00066         pending_tail = 0;
00067 
00068         parent = getppid();
00069         }
00070 
00071 ChunkedIOFd::~ChunkedIOFd()
00072         {
00073         Clear();
00074 
00075         delete [] read_buffer;
00076         delete [] write_buffer;
00077         close(fd);
00078 
00079         if ( partial )
00080                 {
00081                 delete [] partial->data;
00082                 delete partial;
00083                 }
00084         }
00085 
00086 bool ChunkedIOFd::Write(Chunk* chunk)
00087         {
00088 #ifdef DEBUG
00089         DBG_LOG(DBG_CHUNKEDIO, "write of size %d [%s]",
00090                 chunk->len, fmt_bytes(chunk->data, min(20, chunk->len)));
00091 #endif
00092 
00093         // Reject if our queue of pending chunks is way too large. Otherwise,
00094         // memory could fill up if the other side doesn't read.
00095         if ( stats.pending > MAX_BUFFERED_CHUNKS )
00096                 {
00097                 DBG_LOG(DBG_CHUNKEDIO, "write queue full");
00098                 errno = ENOSPC;
00099                 return false;
00100                 }
00101 
00102         if ( chunk->len <= BUFFER_SIZE - sizeof(uint32) )
00103                 return WriteChunk(chunk, false);
00104 
00105         // We have to split it up.
00106         char* p = chunk->data;
00107         unsigned long left = chunk->len;
00108 
00109         while ( left )
00110                 {
00111                 Chunk* part = new Chunk;
00112 
00113                 part->len = min(BUFFER_SIZE - sizeof(uint32), left);
00114                 part->data = new char[part->len];
00115                 short_memcpy(part->data, p, part->len);
00116                 left -= part->len;
00117                 p += part->len;
00118 
00119                 if ( ! WriteChunk(part, left != 0) )
00120                         return false;
00121                 }
00122 
00123         delete [] chunk->data;
00124         delete chunk;
00125 
00126         return true;
00127         }
00128 
00129 bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
00130         {
00131         assert(chunk->len <= BUFFER_SIZE - sizeof(uint32) );
00132 
00133         if ( chunk->len == 0 )
00134                 internal_error( "attempt to write 0 bytes chunk");
00135 
00136         if ( partial )
00137                 chunk->len |= FLAG_PARTIAL;
00138 
00139         ++stats.chunks_written;
00140 
00141         // If it fits into the buffer, we're done (but keep care not
00142         // to reorder chunks).
00143         if ( ! pending_head && PutIntoWriteBuffer(chunk) )
00144                 return true;
00145 
00146         // Otherwise queue it.
00147         ++stats.pending;
00148         ChunkQueue* q = new ChunkQueue;
00149         q->chunk = chunk;
00150         q->next = 0;
00151 
00152         if ( pending_tail )
00153                 {
00154                 pending_tail->next = q;
00155                 pending_tail = q;
00156                 }
00157         else
00158                 pending_head = pending_tail = q;
00159 
00160         if ( stats.pending && stats.pending % 10000 == 0 )
00161                 Log(fmt("ChunkedIO: %ld blocks pending in process %d (%s)",
00162                                 stats.pending, getpid(), tag));
00163 
00164         return Flush();
00165         }
00166 
00167 
00168 bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
00169         {
00170         uint32 len = chunk->len & ~FLAG_PARTIAL;
00171 
00172         if ( write_len + len + (IsPure() ? 0 : sizeof(len)) > BUFFER_SIZE )
00173                 return false;
00174 
00175         if ( ! IsPure() )
00176                 {
00177                 uint32 nlen = htonl(chunk->len);
00178                 short_memcpy(write_buffer + write_len, &nlen, sizeof(nlen));
00179                 write_len += sizeof(nlen);
00180                 }
00181 
00182         short_memcpy(write_buffer + write_len, chunk->data, len);
00183         write_len += len;
00184 
00185         delete [] chunk->data;
00186         delete chunk;
00187 
00188         if ( network_time - last_flush > 0.005 )
00189                 FlushWriteBuffer();
00190 
00191         return true;
00192         }
00193 
00194 bool ChunkedIOFd::FlushWriteBuffer()
00195         {
00196         last_flush = network_time;
00197 
00198         while ( write_pos != write_len )
00199                 {
00200                 uint32 len = write_len - write_pos;
00201 
00202                 int written = write(fd, write_buffer + write_pos, len);
00203 
00204                 if ( written < 0 )
00205                         {
00206                         if ( errno == EPIPE )
00207                                 eof = true;
00208 
00209                         // These errnos are equal on POSIX.
00210                         return errno == EWOULDBLOCK || errno == EAGAIN ||
00211                                 errno == EINTR;
00212                         }
00213 
00214                 stats.bytes_written += written;
00215                 if ( written > 0 )
00216                         ++stats.writes;
00217 
00218                 if ( unsigned(written) == len )
00219                         {
00220                         write_pos = write_len = 0;
00221                         return true;
00222                         }
00223 
00224                 if ( written == 0 )
00225                         internal_error("written==0");
00226 
00227                 // Short write.
00228                 write_pos += written;
00229                 }
00230 
00231         return true;
00232         }
00233 
00234 bool ChunkedIOFd::OptionalFlush()
00235         {
00236         // This threshhold is quite arbitrary.
00237 //      if ( current_time() - last_flush > 0.01 )
00238         return Flush();
00239         }
00240 
00241 bool ChunkedIOFd::Flush()
00242         {
00243         // Try to write data out.
00244         while ( pending_head )
00245                 {
00246                 if ( ! FlushWriteBuffer() )
00247                         return false;
00248 
00249                 // If we couldn't write the whole buffer, we stop here
00250                 // and try again next time.
00251                 if ( write_len > 0 )
00252                         return true;
00253 
00254                 // Put as many pending chunks into the buffer as possible.
00255                 while ( pending_head )
00256                         {
00257                         if ( ! PutIntoWriteBuffer(pending_head->chunk) )
00258                                 break;
00259 
00260                         ChunkQueue* q = pending_head;
00261                         pending_head = pending_head->next;
00262                         if ( ! pending_head )
00263                                 pending_tail = 0;
00264 
00265                         --stats.pending;
00266                         delete q;
00267                         }
00268                 }
00269 
00270         return FlushWriteBuffer();
00271         }
00272 
00273 uint32 ChunkedIOFd::ChunkAvailable()
00274         {
00275         int bytes_left = read_len - read_pos;
00276 
00277         if ( bytes_left < int(sizeof(uint32)) )
00278                 return 0;
00279 
00280         bytes_left -= sizeof(uint32);
00281 
00282         // We have to copy the value here as it may not be
00283         // aligned correctly in the data.
00284         uint32 len;
00285         short_memcpy(&len, read_buffer + read_pos, sizeof(len));
00286         len = ntohl(len);
00287 
00288         if ( uint32(bytes_left) < (len & ~FLAG_PARTIAL) )
00289                 return 0;
00290 
00291         assert(len & ~FLAG_PARTIAL);
00292 
00293         return len;
00294         }
00295 
00296 ChunkedIO::Chunk* ChunkedIOFd::ExtractChunk()
00297         {
00298         uint32 len = ChunkAvailable();
00299         uint32 real_len = len & ~FLAG_PARTIAL;
00300         if ( ! real_len )
00301                 return 0;
00302 
00303         read_pos += sizeof(uint32);
00304 
00305         Chunk* chunk = new Chunk;
00306         chunk->len = len;
00307         chunk->data = new char[real_len];
00308         short_memcpy(chunk->data, read_buffer + read_pos, real_len);
00309         read_pos += real_len;
00310 
00311         ++stats.chunks_read;
00312 
00313         return chunk;
00314         }
00315 
00316 ChunkedIO::Chunk* ChunkedIOFd::ConcatChunks(Chunk* c1, Chunk* c2)
00317         {
00318         Chunk* c = new Chunk;
00319 
00320         c->len = c1->len + c2->len;
00321         c->data = new char[c->len];
00322 
00323         short_memcpy(c->data, c1->data, c1->len);
00324         short_memcpy(c->data + c1->len, c2->data, c2->len);
00325 
00326         delete [] c1->data;
00327         delete c1;
00328         delete [] c2->data;
00329         delete c2;
00330 
00331         return c;
00332         }
00333 
00334 void ChunkedIO::Log(const char* str)
00335         {
00336         write(error_fd, str, strlen(str));
00337         write(error_fd, "\n", 1);
00338         }
00339 
00340 bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
00341         {
00342         // We will be called regularly. So take the opportunity
00343         // to flush the write buffer once in a while.
00344         OptionalFlush();
00345 
00346         if ( ! ReadChunk(chunk, may_block) )
00347                 return false;
00348 
00349         if ( ! *chunk )
00350                 return true;
00351 
00352 #ifdef DEBUG
00353         if ( *chunk )
00354                 DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
00355                                 (*chunk)->len & ~FLAG_PARTIAL,
00356                                 (*chunk)->len & FLAG_PARTIAL ? "(P) " : "",
00357                                 fmt_bytes((*chunk)->data,
00358                                                 min(20, (*chunk)->len)));
00359 #endif
00360 
00361         if ( ! ((*chunk)->len & FLAG_PARTIAL) )
00362                 {
00363                 if ( ! partial )
00364                         return true;
00365                 else
00366                         {
00367                         // This is the last chunk of an oversized one.
00368                         *chunk = ConcatChunks(partial, *chunk);
00369                         partial = 0;
00370 
00371 #ifdef DEBUG
00372                         if ( *chunk )
00373                                 DBG_LOG(DBG_CHUNKEDIO,
00374                                         "built virtual chunk of size %d [%s]",
00375                                         (*chunk)->len,
00376                                         fmt_bytes((*chunk)->data, 20));
00377 #endif
00378 
00379 
00380                         return true;
00381                         }
00382                 }
00383 
00384         // This chunk is the non-last part of an oversized.
00385         (*chunk)->len &= ~FLAG_PARTIAL;
00386 
00387         if ( ! partial )
00388                 // First part of oversized chunk.
00389                 partial = *chunk;
00390         else
00391                 partial = ConcatChunks(partial, *chunk);
00392 
00393         *chunk = 0;
00394         return true; // Read following part next time.
00395         }
00396 
00397 bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
00398         {
00399         // We will be called regularly. So take the opportunity
00400         // to flush the write buffer once in a while.
00401         OptionalFlush();
00402 
00403         *chunk = ExtractChunk();
00404         if ( *chunk )
00405                 return true;
00406 
00407         int bytes_left = read_len - read_pos;
00408 
00409         // If we have a partial chunk left, move this to the head of
00410         // the buffer.
00411         if ( bytes_left )
00412                 memmove(read_buffer, read_buffer + read_pos, bytes_left);
00413 
00414         read_pos = 0;
00415         read_len = bytes_left;
00416 
00417         // If allowed, wait a bit for something to read.
00418         if ( may_block )
00419                 {
00420                 fd_set fd_read, fd_write, fd_except;
00421 
00422                 FD_ZERO(&fd_read);
00423                 FD_ZERO(&fd_write);
00424                 FD_ZERO(&fd_except);
00425                 FD_SET(fd, &fd_read);
00426 
00427                 struct timeval small_timeout;
00428                 small_timeout.tv_sec = 0;
00429                 small_timeout.tv_usec = 50;
00430 
00431                 select(fd + 1, &fd_read, &fd_write, &fd_except, &small_timeout);
00432                 }
00433 
00434         // Make sure the parent is still runnning
00435         // (only checking for EPIPE after a read doesn't
00436         // seem to be sufficient).
00437         if ( kill(parent, 0) < 0 && errno != EPERM )
00438                 {
00439                 eof = true;
00440                 errno = EPIPE;
00441                 return false;
00442                 }
00443 
00444         // Try to fill the buffer.
00445         while ( true )
00446                 {
00447                 int len = BUFFER_SIZE - read_len;
00448                 int read = ::read(fd, read_buffer + read_len, len);
00449 
00450                 if ( read < 0 )
00451                         {
00452                         // These errno's are the same on POSIX.
00453                         if ( errno == EWOULDBLOCK || errno == EAGAIN ||
00454                              errno == EINTR )
00455                                 {
00456                                 // Let's see if we have a chunk now --
00457                                 // even if we time out, we may have read
00458                                 // just enough in previous iterations!
00459                                 *chunk = ExtractChunk();
00460                                 ++failed_reads;
00461                                 return true;
00462                                 }
00463 
00464                         if ( errno == EPIPE )
00465                                 eof = true;
00466 
00467                         return false;
00468                         }
00469 
00470                 failed_reads = 0;
00471 
00472                 if ( read == 0 )
00473                         {
00474                         *chunk = ExtractChunk();
00475                         if ( *chunk )
00476                                 return true;
00477 
00478                         eof = true;
00479                         return false;
00480                         }
00481 
00482                 read_len += read;
00483 
00484                 ++stats.reads;
00485                 stats.bytes_read += read;
00486 
00487                 if ( read == len )
00488                         break;
00489                 }
00490 
00491         // Let's see if we have a chunk now.
00492         *chunk = ExtractChunk();
00493 
00494         return true;
00495         }
00496 
00497 bool ChunkedIOFd::CanRead()
00498         {
00499         // We will be called regularly. So take the opportunity
00500         // to flush the write buffer once in a while.
00501         OptionalFlush();
00502 
00503         if ( ChunkAvailable() )
00504                 return true;
00505 
00506         fd_set fd_read;
00507         FD_ZERO(&fd_read);
00508         FD_SET(fd, &fd_read);
00509 
00510         struct timeval no_timeout;
00511         no_timeout.tv_sec = 0;
00512         no_timeout.tv_usec = 0;
00513 
00514         return select(fd + 1, &fd_read, 0, 0, &no_timeout) > 0;
00515         }
00516 
00517 bool ChunkedIOFd::CanWrite()
00518         {
00519         return pending_head != 0;
00520         }
00521 
00522 bool ChunkedIOFd::IsIdle()
00523         {
00524         if ( pending_head || ChunkAvailable() )
00525                 return false;
00526 
00527         if ( failed_reads > 0 )
00528                 return true;
00529 
00530         return false;
00531         }
00532 
00533 bool ChunkedIOFd::IsFillingUp()
00534         {
00535         return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
00536         }
00537 
00538 void ChunkedIOFd::Clear()
00539         {
00540         while ( pending_head )
00541                 {
00542                 ChunkQueue* next = pending_head->next;
00543                 delete [] pending_head->chunk->data;
00544                 delete pending_head->chunk;
00545                 delete pending_head;
00546                 pending_head = next;
00547                 }
00548 
00549         pending_head = pending_tail = 0;
00550         }
00551 
00552 const char* ChunkedIOFd::Error()
00553         {
00554         static char buffer[1024];
00555         safe_snprintf(buffer, sizeof(buffer), "%s [%d]", strerror(errno), errno);
00556 
00557         return buffer;
00558         }
00559 
00560 void ChunkedIOFd::Stats(char* buffer, int length)
00561         {
00562         int i = safe_snprintf(buffer, length, "pending=%d ", stats.pending);
00563         ChunkedIO::Stats(buffer + i, length - i);
00564         }
00565 
00566 
00567 #ifdef USE_OPENSSL
00568 
00569 #include <openssl/ssl.h>
00570 
00571 SSL_CTX* ChunkedIOSSL::ctx;
00572 
00573 ChunkedIOSSL::ChunkedIOSSL(int arg_socket, bool arg_server, int error_fd)
00574         : ChunkedIO(error_fd)
00575         {
00576         socket = arg_socket;
00577         eof = false;
00578         setup = false;
00579         server = arg_server;
00580         ssl = 0;
00581 
00582         write_state = LEN;
00583         write_head = 0;
00584         write_tail = 0;
00585 
00586         read_state = LEN;
00587         read_chunk = 0;
00588         read_ptr = 0;
00589         }
00590 
00591 ChunkedIOSSL::~ChunkedIOSSL()
00592         {
00593         if ( setup )
00594                 {
00595                 SSL_shutdown(ssl);
00596 
00597                 // We don't care if the other side closes properly.
00598                 setup = false;
00599                 }
00600 
00601         if ( ssl )
00602                 {
00603                 SSL_free(ssl);
00604                 ssl = 0;
00605                 }
00606 
00607         close(socket);
00608         }
00609 
00610 
00611 static int pem_passwd_cb(char* buf, int size, int rwflag, void* passphrase)
00612         {
00613         safe_strncpy(buf, (char*) passphrase, size);
00614         buf[size - 1] = '\0';
00615         return strlen(buf);
00616         }
00617 
00618 bool ChunkedIOSSL::Init()
00619         {
00620         // If the handshake doesn't succeed immediately we will
00621         // be called multiple times.
00622         if ( ! ctx )
00623                 {
00624                 SSL_load_error_strings();
00625 
00626                 ctx = SSL_CTX_new(SSLv3_method());
00627                 if ( ! ctx )
00628                         {
00629                         Log("can't create SSL context");
00630                         return false;
00631                         }
00632 
00633                 // We access global variables here. But as they are
00634                 // declared const and we don't modify them this should
00635                 // be fine.
00636                 const char* key = ssl_private_key->AsString()->CheckString();
00637 
00638                 if ( ! (key && *key &&
00639                         SSL_CTX_use_certificate_chain_file(ctx, key)) )
00640                         {
00641                         Log(fmt("can't read certificate from file %s", key));
00642                         return false;
00643                         }
00644 
00645                 const char* passphrase =
00646                         ssl_passphrase->AsString()->CheckString();
00647 
00648                 if ( passphrase && ! streq(passphrase, "<undefined>") )
00649                         {
00650                         SSL_CTX_set_default_passwd_cb(ctx, pem_passwd_cb);
00651                         SSL_CTX_set_default_passwd_cb_userdata(ctx,
00652                                                         (void*) passphrase);
00653                         }
00654 
00655                 if ( ! (key && *key &&
00656                         SSL_CTX_use_PrivateKey_file(ctx, key, SSL_FILETYPE_PEM)) )
00657                         {
00658                         Log(fmt("can't read private key from file %s", key));
00659                         return false;
00660                         }
00661 
00662                 const char* ca = ssl_ca_certificate->AsString()->CheckString();
00663                 if ( ! (ca && *ca && SSL_CTX_load_verify_locations(ctx, ca, 0)) )
00664                         {
00665                         Log(fmt("can't read CA certificate from file %s", ca));
00666                         return false;
00667                         }
00668 
00669                 // Only use real ciphers.
00670                 if ( ! SSL_CTX_set_cipher_list(ctx, "HIGH") )
00671                         {
00672                         Log("can't set cipher list");
00673                         return false;
00674                         }
00675 
00676                 // Require client certificate.
00677                 SSL_CTX_set_verify(ctx,
00678                         SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
00679                 }
00680 
00681         int flags;
00682 
00683         if ( (flags = fcntl(socket, F_GETFL, 0)) < 0)
00684                 {
00685                 Log(fmt("can't obtain socket flags: %s", strerror(errno)));
00686                 return false;
00687                 }
00688 
00689         if ( fcntl(socket, F_SETFL, flags|O_NONBLOCK) < 0 )
00690                 {
00691                 Log(fmt("can't set socket to non-blocking: %s",
00692                           strerror(errno)));
00693                 return false;
00694                 }
00695 
00696         if ( ! ssl )
00697                 {
00698                 ssl = SSL_new(ctx);
00699                 if ( ! ssl )
00700                         {
00701                         Log("can't create SSL object");
00702                         return false;
00703                         }
00704 
00705                 BIO* bio = BIO_new_socket(socket, BIO_NOCLOSE);
00706                 BIO_set_nbio(bio, 1);
00707                 SSL_set_bio(ssl, bio, bio);
00708                 }
00709 
00710         int success;
00711         if ( server )
00712                 success = last_ret = SSL_accept(ssl);
00713         else
00714                 success = last_ret = SSL_connect(ssl);
00715 
00716         if ( success > 0 )
00717                 { // handshake done
00718                 setup = true;
00719                 return true;
00720                 }
00721 
00722         int error = SSL_get_error(ssl, success);
00723 
00724         if ( success <= 0 &&
00725              (error == SSL_ERROR_WANT_WRITE || error == SSL_ERROR_WANT_READ) )
00726                 // Handshake not finished yet, but that's ok for now.
00727                 return true;
00728 
00729         // Some error.
00730         eof = true;
00731         return false;
00732         }
00733 
00734 bool ChunkedIOSSL::Write(Chunk* chunk)
00735         {
00736 #ifdef DEBUG
00737         DBG_LOG(DBG_CHUNKEDIO, "ssl write of size %d [%s]",
00738                 chunk->len, fmt_bytes(chunk->data, 20));
00739 #endif
00740 
00741         // Reject if our queue of pending chunks is way too large. Otherwise,
00742         // memory could fill up if the other side doesn't read.
00743         if ( stats.pending > MAX_BUFFERED_CHUNKS )
00744                 {
00745                 DBG_LOG(DBG_CHUNKEDIO, "write queue full");
00746                 errno = ENOSPC;
00747                 return false;
00748                 }
00749 
00750         // Queue it.
00751         ++stats.pending;
00752         Queue* q = new Queue;
00753         q->chunk = chunk;
00754         q->next = 0;
00755 
00756         // Temporarily convert len into network byte order.
00757         chunk->len = htonl(chunk->len);
00758 
00759         if ( write_tail )
00760                 {
00761                 write_tail->next = q;
00762                 write_tail = q;
00763                 }
00764         else
00765                 write_head = write_tail = q;
00766 
00767         Flush();
00768         return true;
00769         }
00770 
00771 bool ChunkedIOSSL::WriteData(char* p, uint32 len, bool* error)
00772         {
00773         *error = false;
00774 
00775         double t = current_time();
00776 
00777         int written = last_ret = SSL_write(ssl, p, len);
00778 
00779         switch ( SSL_get_error(ssl, written) ) {
00780                 case SSL_ERROR_NONE:
00781                         // SSL guarantees us that all bytes have been written.
00782                         // That's nice. :-)
00783                         return true;
00784 
00785                 case SSL_ERROR_WANT_READ:
00786                 case SSL_ERROR_WANT_WRITE:
00787                         // Would block.
00788                         DBG_LOG(DBG_CHUNKEDIO,
00789                                 "SSL_write: SSL_ERROR_WANT_READ [%d,%d]",
00790                                 read, SSL_get_error(ssl, written));
00791                         *error = false;
00792                         return false;
00793 
00794                 case SSL_ERROR_ZERO_RETURN:
00795                         // Regular remote connection shutdown.
00796                         DBG_LOG(DBG_CHUNKEDIO,
00797                                 "SSL_write: SSL_ZERO_RETURN [%d,%d]",
00798                                 read, SSL_get_error(ssl, written));
00799                         *error = eof = true;
00800                         return false;
00801 
00802                 case SSL_ERROR_SYSCALL:
00803                         DBG_LOG(DBG_CHUNKEDIO,
00804                                 "SSL_write: SSL_SYS_CALL [%d,%d]",
00805                                 read, SSL_get_error(ssl, written));
00806 
00807                         if ( read == 0 )
00808                                 {
00809                                 // Socket connection closed.
00810                                 *error = eof = true;
00811                                 return false;
00812                                 }
00813 
00814                         // Fall through.
00815 
00816                 default:
00817                         DBG_LOG(DBG_CHUNKEDIO,
00818                                 "SSL_write: fatal error [%d,%d]",
00819                                 read, SSL_get_error(ssl, written));
00820                         // Fatal SSL error.
00821                         *error = true;
00822                         return false;
00823         }
00824 
00825         internal_error("can't be reached");
00826         return false;
00827         }
00828 
00829 bool ChunkedIOSSL::Flush()
00830         {
00831         if ( ! setup )
00832                 {
00833                 // We may need to finish the handshake.
00834                 if ( ! Init() )
00835                         return false;
00836                 if ( ! setup )
00837                         return true;
00838                 }
00839 
00840         while ( write_head )
00841                 {
00842                 bool error;
00843 
00844                 Chunk* c = write_head->chunk;
00845 
00846                 if ( write_state == LEN )
00847                         {
00848                         if ( ! WriteData((char*)&c->len, sizeof(c->len), &error) )
00849                                 return ! error;
00850                         write_state = DATA;
00851 
00852                         // Convert back from network byte order.
00853                         c->len = ntohl(c->len);
00854                         }
00855 
00856                 if ( ! WriteData(c->data, c->len, &error) )
00857                         return ! error;
00858 
00859                 // Chunk written, throw away.
00860                 Queue* q = write_head;
00861                 write_head = write_head->next;
00862                 if ( ! write_head )
00863                         write_tail = 0;
00864                 --stats.pending;
00865                 delete q;
00866 
00867                 delete [] c->data;
00868                 delete c;
00869 
00870                 write_state = LEN;
00871                 }
00872 
00873         return true;
00874         }
00875 
00876 bool ChunkedIOSSL::ReadData(char* p, uint32 len, bool* error)
00877         {
00878         if ( ! read_ptr )
00879                 read_ptr = p;
00880 
00881         while ( true )
00882                 {
00883                 double t = current_time();
00884 
00885                 int read = last_ret =
00886                         SSL_read(ssl, read_ptr, len - (read_ptr - p));
00887 
00888                 switch ( SSL_get_error(ssl, read) ) {
00889                 case SSL_ERROR_NONE:
00890                         // We're fine.
00891                         read_ptr += read;
00892 
00893                         if ( unsigned(read_ptr - p) == len )
00894                                 {
00895                                 // We have read as much as requested..
00896                                 read_ptr = 0;
00897                                 *error = false;
00898                                 return true;
00899                                 }
00900 
00901                         break;
00902 
00903                 case SSL_ERROR_WANT_READ:
00904                 case SSL_ERROR_WANT_WRITE:
00905                         // Would block.
00906                         DBG_LOG(DBG_CHUNKEDIO,
00907                                 "SSL_read: SSL_ERROR_WANT_READ [%d,%d]",
00908                                 read, SSL_get_error(ssl, read));
00909                         *error = false;
00910                         return false;
00911 
00912                 case SSL_ERROR_ZERO_RETURN:
00913                         // Regular remote connection shutdown.
00914                         DBG_LOG(DBG_CHUNKEDIO,
00915                                 "SSL_read: SSL_ZERO_RETURN [%d,%d]",
00916                                 read, SSL_get_error(ssl, read));
00917                         *error = eof = true;
00918                         return false;
00919 
00920                 case SSL_ERROR_SYSCALL:
00921                         DBG_LOG(DBG_CHUNKEDIO, "SSL_read: SSL_SYS_CALL [%d,%d]",
00922                                 read, SSL_get_error(ssl, read));
00923 
00924                         if ( read == 0 )
00925                                 {
00926                                 // Socket connection closed.
00927                                 *error = eof = true;
00928                                 return false;
00929                                 }
00930 
00931                         // Fall through.
00932 
00933                 default:
00934                         DBG_LOG(DBG_CHUNKEDIO,
00935                                 "SSL_read: fatal error [%d,%d]",
00936                                 read, SSL_get_error(ssl, read));
00937 
00938                         // Fatal SSL error.
00939                         *error = true;
00940                         return false;
00941                 }
00942                 }
00943 
00944         // Can't be reached.
00945         internal_error("can't be reached");
00946         return false;
00947         }
00948 
00949 bool ChunkedIOSSL::Read(Chunk** chunk, bool mayblock)
00950         {
00951         *chunk = 0;
00952 
00953         if ( ! setup )
00954                 {
00955                 // We may need to finish the handshake.
00956                 if ( ! Init() )
00957                         return false;
00958                 if ( ! setup )
00959                         return true;
00960                 }
00961 
00962         bool error;
00963 
00964         Flush();
00965 
00966         if ( read_state == LEN )
00967                 {
00968                 if ( ! read_chunk )
00969                         {
00970                         read_chunk = new Chunk;
00971                         read_chunk->data = 0;
00972                         }
00973 
00974                 if ( ! ReadData((char*)&read_chunk->len,
00975                                 sizeof(read_chunk->len),
00976                                 &error) )
00977                         return ! error;
00978 
00979                 read_state = DATA;
00980                 read_chunk->len = ntohl(read_chunk->len);
00981                 }
00982 
00983         if ( ! read_chunk->data )
00984                 read_chunk->data = new char[read_chunk->len];
00985 
00986         if ( ! ReadData(read_chunk->data, read_chunk->len, &error) )
00987                 return ! error;
00988 
00989         // Chunk fully read. Pass it on.
00990         *chunk = read_chunk;
00991         read_chunk = 0;
00992         read_state = LEN;
00993 
00994 #ifdef DEBUG
00995         if ( *chunk )
00996                 DBG_LOG(DBG_CHUNKEDIO, "ssl read of size %d [%s]",
00997                         (*chunk)->len, fmt_bytes((*chunk)->data, 20));
00998 #endif
00999 
01000         return true;
01001         }
01002 
01003 bool ChunkedIOSSL::CanRead()
01004         {
01005         // We will be called regularly. So take the opportunity
01006         // to flush the write buffer.
01007         Flush();
01008 
01009         if ( SSL_pending(ssl) )
01010                 return true;
01011 
01012         fd_set fd_read;
01013         FD_ZERO(&fd_read);
01014         FD_SET(socket, &fd_read);
01015 
01016         struct timeval notimeout;
01017         notimeout.tv_sec = 0;
01018         notimeout.tv_usec = 0;
01019 
01020         return select(socket + 1, &fd_read, NULL, NULL, &notimeout) > 0;
01021         }
01022 
01023 bool ChunkedIOSSL::CanWrite()
01024         {
01025         return write_head != 0;
01026         }
01027 
01028 bool ChunkedIOSSL::IsIdle()
01029         {
01030         return ! (CanRead() || CanWrite());
01031         }
01032 
01033 bool ChunkedIOSSL::IsFillingUp()
01034         {
01035         // We don't really need this at the moment (since SSL is only used for
01036         // peer-to-peer communication). Thus, we always return false for now.
01037         return false;
01038         }
01039 
01040 void ChunkedIOSSL::Clear()
01041         {
01042         while ( write_head )
01043                 {
01044                 Queue* next = write_head->next;
01045                 delete [] write_head->chunk->data;
01046                 delete write_head->chunk;
01047                 delete write_head;
01048                 write_head = next;
01049                 }
01050         write_head = write_tail = 0;
01051         }
01052 
01053 const char* ChunkedIOSSL::Error()
01054         {
01055         const int BUFLEN = 512;
01056         static char buffer[BUFLEN];
01057 
01058         int sslcode = SSL_get_error(ssl, last_ret);
01059         int errcode = ERR_get_error();
01060 
01061         int count = safe_snprintf(buffer, BUFLEN, "[%d,%d,%d] SSL error: ",
01062                                         errcode, sslcode, last_ret);
01063 
01064         if ( errcode )
01065                 ERR_error_string_n(errcode, buffer + count, BUFLEN - count);
01066 
01067         else if ( sslcode == SSL_ERROR_SYSCALL )
01068                 {
01069                 if ( last_ret )
01070                         // Look at errno.
01071                         safe_snprintf(buffer + count, BUFLEN - count,
01072                                         "syscall: %s", strerror(errno));
01073                 else
01074                         // Errno is not valid in this case.
01075                         safe_strncpy(buffer + count,
01076                                         "syscall: unexpected end-of-file",
01077                                         BUFLEN - count);
01078                 }
01079         else
01080                 safe_strncpy(buffer + count, "unknown error", BUFLEN - count);
01081 
01082         return buffer;
01083         }
01084 
01085 void ChunkedIOSSL::Stats(char* buffer, int length)
01086         {
01087         int i = safe_snprintf(buffer, length, "pending=%ld ", stats.pending);
01088         ChunkedIO::Stats(buffer + i, length - i);
01089         }
01090 
01091 #ifdef HAVE_LIBZ
01092 
01093 bool CompressedChunkedIO::Init()
01094         {
01095         zin.zalloc = 0;
01096         zin.zfree = 0;
01097         zin.opaque = 0;
01098 
01099         zout.zalloc = 0;
01100         zout.zfree = 0;
01101         zout.opaque = 0;
01102 
01103         compress = uncompress = false;
01104         error = 0;
01105         uncompressed_bytes_read = 0;
01106         uncompressed_bytes_written = 0;
01107 
01108         return true;
01109         }
01110 
01111 bool CompressedChunkedIO::Read(Chunk** chunk, bool may_block)
01112         {
01113         if ( ! io->Read(chunk, may_block) )
01114                 return false;
01115 
01116         if ( ! uncompress )
01117                 return true;
01118 
01119         if ( ! *chunk )
01120                 return true;
01121 
01122         uint32 uncompressed_len =
01123                 *(uint32*)((*chunk)->data + (*chunk)->len - sizeof(uint32));
01124 
01125         if ( uncompressed_len == 0 )
01126                 {
01127                 // Not compressed.
01128                 DBG_LOG(DBG_CHUNKEDIO, "zlib read pass-through: size=%d",
01129                         (*chunk)->len);
01130                 return true;
01131                 }
01132 
01133         char* uncompressed = new char[uncompressed_len];
01134 
01135         DBG_LOG(DBG_CHUNKEDIO, "zlib read: size=%d uncompressed=%d",
01136                 (*chunk)->len, uncompressed_len);
01137 
01138         zin.next_in = (Bytef*) (*chunk)->data;
01139         zin.avail_in = (*chunk)->len - sizeof(uint32);
01140         zin.next_out = (Bytef*) uncompressed;
01141         zin.avail_out = uncompressed_len;
01142 
01143         if ( inflate(&zin, Z_SYNC_FLUSH) != Z_OK )
01144                 {
01145                 error = zin.msg;
01146                 return false;
01147                 }
01148 
01149         if ( zin.avail_in > 0 )
01150                 {
01151                 error = "compressed data longer than expected";
01152                 return false;
01153                 }
01154 
01155         delete [] (*chunk)->data;
01156 
01157         uncompressed_bytes_read += uncompressed_len;
01158 
01159         (*chunk)->len = uncompressed_len;
01160         (*chunk)->data = uncompressed;
01161 
01162         return true;
01163         }
01164 
01165 bool CompressedChunkedIO::Write(Chunk* chunk)
01166         {
01167         if ( (! compress) || IsPure() )
01168                 // No compression.
01169                 return io->Write(chunk);
01170 
01171         // We compress block-wise (rather than stream-wise) because:
01172         //
01173         // (1) it's significantly easier to implement due to our block-oriented
01174         // communication model (with a stream compression, we'd need to chop
01175         // the stream into blocks during decompression which would require
01176         // additional buffering and copying).
01177         //
01178         // (2) it ensures that we do not introduce any additional latencies (a
01179         // stream compression may decide to wait for the next chunk of data
01180         // before writing anything out).
01181         //
01182         // The block-wise compression comes at the cost of a smaller compression
01183         // factor.
01184         //
01185         // A compressed chunk's data looks like this:
01186         //   char[] compressed data
01187         //   uint32 uncompressed_length
01188         //
01189         // By including uncompressed_length, we again trade easier
01190         // decompression for a smaller reduction factor. If uncompressed_length
01191         // is zero, the data is *not* compressed.
01192 
01193         uncompressed_bytes_written += chunk->len;
01194         uint32 original_size = chunk->len;
01195 
01196         char* compressed = new char[chunk->len + sizeof(uint32)];
01197 
01198         if ( chunk->len < MIN_COMPRESS_SIZE )
01199                 {
01200                 // Too small; not worth any compression.
01201                 short_memcpy(compressed, chunk->data, chunk->len);
01202                 *(uint32*) (compressed + chunk->len) = 0; // uncompressed_length
01203 
01204                 delete [] chunk->data;
01205                 chunk->data = compressed;
01206                 chunk->len += 4;
01207 
01208                 DBG_LOG(DBG_CHUNKEDIO, "zlib write pass-through: size=%d", chunk->len);
01209                 }
01210         else
01211                 {
01212                 zout.next_in = (Bytef*) chunk->data;
01213                 zout.avail_in = chunk->len;
01214                 zout.next_out = (Bytef*) compressed;
01215                 zout.avail_out = chunk->len;
01216 
01217                 if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
01218                         {
01219                         error = zout.msg;
01220                         return false;
01221                         }
01222 
01223                 while ( zout.avail_out == 0 )
01224                         {
01225                         // D'oh! Not enough space, i.e., it hasn't got smaller.
01226                         char* old = compressed;
01227                         int old_size = (char*) zout.next_out - compressed;
01228                         int new_size = old_size * 2 + sizeof(uint32);
01229 
01230                         compressed = new char[new_size];
01231                         short_memcpy(compressed, old, old_size);
01232                         delete [] old;
01233 
01234                         zout.next_out = (Bytef*) (compressed + old_size);
01235                         zout.avail_out = old_size; // Sic! We doubled.
01236 
01237                         if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
01238                                 {
01239                                 error = zout.msg;
01240                                 return false;
01241                                 }
01242                         }
01243 
01244                 *(uint32*) zout.next_out = original_size; // uncompressed_length
01245 
01246                 delete [] chunk->data;
01247                 chunk->data = compressed;
01248                 chunk->len =
01249                         ((char*) zout.next_out - compressed) + sizeof(uint32);
01250 
01251                 DBG_LOG(DBG_CHUNKEDIO, "zlib write: size=%d compressed=%d",
01252                         original_size, chunk->len);
01253                 }
01254 
01255         return io->Write(chunk);
01256         }
01257 
01258 void CompressedChunkedIO::Stats(char* buffer, int length)
01259         {
01260         const Statistics* stats = io->Stats();
01261 
01262         int i = snprintf(buffer, length, "compression=%.2f/%.2f ",
01263                         uncompressed_bytes_read ? double(stats->bytes_read) / uncompressed_bytes_read : -1,
01264                         uncompressed_bytes_written ? double(stats->bytes_written) / uncompressed_bytes_written : -1 );
01265 
01266         io->Stats(buffer + i, length - i);
01267         buffer[length-1] = '\0';
01268         }
01269 
01270 #endif  /* HAVE_LIBZ */
01271 
01272 #endif  /* USE_OPENSSL */

Generated on Wed Sep 14 02:55:59 2005 for bro_docs by doxygen 1.3.5