00001
00002
00003 #include <unistd.h>
00004 #include <fcntl.h>
00005 #include <errno.h>
00006 #include <signal.h>
00007 #include <sys/time.h>
00008 #include <netinet/in.h>
00009 #include <assert.h>
00010
00011 #include "config.h"
00012 #include "ChunkedIO.h"
00013 #include "NetVar.h"
00014
00015
00016 ChunkedIO::ChunkedIO(int arg_error_fd)
00017 {
00018 pure = false;
00019 error_fd = arg_error_fd;
00020 }
00021
00022 void ChunkedIO::Stats(char* buffer, int length)
00023 {
00024 safe_snprintf(buffer, length,
00025 "bytes=%luK/%luK chunks=%lu/%lu io=%lu/%lu bytes/io=%.2fK/%.2fK",
00026 stats.bytes_read / 1024, stats.bytes_written / 1024,
00027 stats.chunks_read, stats.chunks_written,
00028 stats.reads, stats.writes,
00029 stats.bytes_read / (1024.0 * stats.reads),
00030 stats.bytes_written / (1024.0 * stats.writes));
00031 }
00032
00033 ChunkedIOFd::ChunkedIOFd(int arg_fd, const char* arg_tag, int error_fd)
00034 : ChunkedIO(error_fd)
00035 {
00036 int flags;
00037
00038 tag = arg_tag;
00039 fd = arg_fd;
00040 eof = 0;
00041 last_flush = current_time();
00042 failed_reads = 0;
00043
00044 if ( (flags = fcntl(fd, F_GETFL, 0)) < 0)
00045 {
00046 Log(fmt("can't obtain socket flags: %s", strerror(errno)));
00047 exit(1);
00048 }
00049
00050 if ( fcntl(fd, F_SETFL, flags|O_NONBLOCK) < 0 )
00051 {
00052 Log(fmt("can't set fd to non-blocking: %s (%d)",
00053 strerror(errno), getpid()));
00054 exit(1);
00055 }
00056
00057 read_buffer = new char[BUFFER_SIZE];
00058 read_len = 0;
00059 read_pos = 0;
00060 partial = 0;
00061 write_buffer = new char[BUFFER_SIZE];
00062 write_len = 0;
00063 write_pos = 0;
00064
00065 pending_head = 0;
00066 pending_tail = 0;
00067
00068 parent = getppid();
00069 }
00070
00071 ChunkedIOFd::~ChunkedIOFd()
00072 {
00073 Clear();
00074
00075 delete [] read_buffer;
00076 delete [] write_buffer;
00077 close(fd);
00078
00079 if ( partial )
00080 {
00081 delete [] partial->data;
00082 delete partial;
00083 }
00084 }
00085
00086 bool ChunkedIOFd::Write(Chunk* chunk)
00087 {
00088 #ifdef DEBUG
00089 DBG_LOG(DBG_CHUNKEDIO, "write of size %d [%s]",
00090 chunk->len, fmt_bytes(chunk->data, min(20, chunk->len)));
00091 #endif
00092
00093
00094
00095 if ( stats.pending > MAX_BUFFERED_CHUNKS )
00096 {
00097 DBG_LOG(DBG_CHUNKEDIO, "write queue full");
00098 errno = ENOSPC;
00099 return false;
00100 }
00101
00102 if ( chunk->len <= BUFFER_SIZE - sizeof(uint32) )
00103 return WriteChunk(chunk, false);
00104
00105
00106 char* p = chunk->data;
00107 unsigned long left = chunk->len;
00108
00109 while ( left )
00110 {
00111 Chunk* part = new Chunk;
00112
00113 part->len = min(BUFFER_SIZE - sizeof(uint32), left);
00114 part->data = new char[part->len];
00115 short_memcpy(part->data, p, part->len);
00116 left -= part->len;
00117 p += part->len;
00118
00119 if ( ! WriteChunk(part, left != 0) )
00120 return false;
00121 }
00122
00123 delete [] chunk->data;
00124 delete chunk;
00125
00126 return true;
00127 }
00128
00129 bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
00130 {
00131 assert(chunk->len <= BUFFER_SIZE - sizeof(uint32) );
00132
00133 if ( chunk->len == 0 )
00134 internal_error( "attempt to write 0 bytes chunk");
00135
00136 if ( partial )
00137 chunk->len |= FLAG_PARTIAL;
00138
00139 ++stats.chunks_written;
00140
00141
00142
00143 if ( ! pending_head && PutIntoWriteBuffer(chunk) )
00144 return true;
00145
00146
00147 ++stats.pending;
00148 ChunkQueue* q = new ChunkQueue;
00149 q->chunk = chunk;
00150 q->next = 0;
00151
00152 if ( pending_tail )
00153 {
00154 pending_tail->next = q;
00155 pending_tail = q;
00156 }
00157 else
00158 pending_head = pending_tail = q;
00159
00160 if ( stats.pending && stats.pending % 10000 == 0 )
00161 Log(fmt("ChunkedIO: %ld blocks pending in process %d (%s)",
00162 stats.pending, getpid(), tag));
00163
00164 return Flush();
00165 }
00166
00167
00168 bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
00169 {
00170 uint32 len = chunk->len & ~FLAG_PARTIAL;
00171
00172 if ( write_len + len + (IsPure() ? 0 : sizeof(len)) > BUFFER_SIZE )
00173 return false;
00174
00175 if ( ! IsPure() )
00176 {
00177 uint32 nlen = htonl(chunk->len);
00178 short_memcpy(write_buffer + write_len, &nlen, sizeof(nlen));
00179 write_len += sizeof(nlen);
00180 }
00181
00182 short_memcpy(write_buffer + write_len, chunk->data, len);
00183 write_len += len;
00184
00185 delete [] chunk->data;
00186 delete chunk;
00187
00188 if ( network_time - last_flush > 0.005 )
00189 FlushWriteBuffer();
00190
00191 return true;
00192 }
00193
00194 bool ChunkedIOFd::FlushWriteBuffer()
00195 {
00196 last_flush = network_time;
00197
00198 while ( write_pos != write_len )
00199 {
00200 uint32 len = write_len - write_pos;
00201
00202 int written = write(fd, write_buffer + write_pos, len);
00203
00204 if ( written < 0 )
00205 {
00206 if ( errno == EPIPE )
00207 eof = true;
00208
00209
00210 return errno == EWOULDBLOCK || errno == EAGAIN ||
00211 errno == EINTR;
00212 }
00213
00214 stats.bytes_written += written;
00215 if ( written > 0 )
00216 ++stats.writes;
00217
00218 if ( unsigned(written) == len )
00219 {
00220 write_pos = write_len = 0;
00221 return true;
00222 }
00223
00224 if ( written == 0 )
00225 internal_error("written==0");
00226
00227
00228 write_pos += written;
00229 }
00230
00231 return true;
00232 }
00233
00234 bool ChunkedIOFd::OptionalFlush()
00235 {
00236
00237
00238 return Flush();
00239 }
00240
00241 bool ChunkedIOFd::Flush()
00242 {
00243
00244 while ( pending_head )
00245 {
00246 if ( ! FlushWriteBuffer() )
00247 return false;
00248
00249
00250
00251 if ( write_len > 0 )
00252 return true;
00253
00254
00255 while ( pending_head )
00256 {
00257 if ( ! PutIntoWriteBuffer(pending_head->chunk) )
00258 break;
00259
00260 ChunkQueue* q = pending_head;
00261 pending_head = pending_head->next;
00262 if ( ! pending_head )
00263 pending_tail = 0;
00264
00265 --stats.pending;
00266 delete q;
00267 }
00268 }
00269
00270 return FlushWriteBuffer();
00271 }
00272
00273 uint32 ChunkedIOFd::ChunkAvailable()
00274 {
00275 int bytes_left = read_len - read_pos;
00276
00277 if ( bytes_left < int(sizeof(uint32)) )
00278 return 0;
00279
00280 bytes_left -= sizeof(uint32);
00281
00282
00283
00284 uint32 len;
00285 short_memcpy(&len, read_buffer + read_pos, sizeof(len));
00286 len = ntohl(len);
00287
00288 if ( uint32(bytes_left) < (len & ~FLAG_PARTIAL) )
00289 return 0;
00290
00291 assert(len & ~FLAG_PARTIAL);
00292
00293 return len;
00294 }
00295
00296 ChunkedIO::Chunk* ChunkedIOFd::ExtractChunk()
00297 {
00298 uint32 len = ChunkAvailable();
00299 uint32 real_len = len & ~FLAG_PARTIAL;
00300 if ( ! real_len )
00301 return 0;
00302
00303 read_pos += sizeof(uint32);
00304
00305 Chunk* chunk = new Chunk;
00306 chunk->len = len;
00307 chunk->data = new char[real_len];
00308 short_memcpy(chunk->data, read_buffer + read_pos, real_len);
00309 read_pos += real_len;
00310
00311 ++stats.chunks_read;
00312
00313 return chunk;
00314 }
00315
00316 ChunkedIO::Chunk* ChunkedIOFd::ConcatChunks(Chunk* c1, Chunk* c2)
00317 {
00318 Chunk* c = new Chunk;
00319
00320 c->len = c1->len + c2->len;
00321 c->data = new char[c->len];
00322
00323 short_memcpy(c->data, c1->data, c1->len);
00324 short_memcpy(c->data + c1->len, c2->data, c2->len);
00325
00326 delete [] c1->data;
00327 delete c1;
00328 delete [] c2->data;
00329 delete c2;
00330
00331 return c;
00332 }
00333
00334 void ChunkedIO::Log(const char* str)
00335 {
00336 write(error_fd, str, strlen(str));
00337 write(error_fd, "\n", 1);
00338 }
00339
00340 bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
00341 {
00342
00343
00344 OptionalFlush();
00345
00346 if ( ! ReadChunk(chunk, may_block) )
00347 return false;
00348
00349 if ( ! *chunk )
00350 return true;
00351
00352 #ifdef DEBUG
00353 if ( *chunk )
00354 DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
00355 (*chunk)->len & ~FLAG_PARTIAL,
00356 (*chunk)->len & FLAG_PARTIAL ? "(P) " : "",
00357 fmt_bytes((*chunk)->data,
00358 min(20, (*chunk)->len)));
00359 #endif
00360
00361 if ( ! ((*chunk)->len & FLAG_PARTIAL) )
00362 {
00363 if ( ! partial )
00364 return true;
00365 else
00366 {
00367
00368 *chunk = ConcatChunks(partial, *chunk);
00369 partial = 0;
00370
00371 #ifdef DEBUG
00372 if ( *chunk )
00373 DBG_LOG(DBG_CHUNKEDIO,
00374 "built virtual chunk of size %d [%s]",
00375 (*chunk)->len,
00376 fmt_bytes((*chunk)->data, 20));
00377 #endif
00378
00379
00380 return true;
00381 }
00382 }
00383
00384
00385 (*chunk)->len &= ~FLAG_PARTIAL;
00386
00387 if ( ! partial )
00388
00389 partial = *chunk;
00390 else
00391 partial = ConcatChunks(partial, *chunk);
00392
00393 *chunk = 0;
00394 return true;
00395 }
00396
00397 bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
00398 {
00399
00400
00401 OptionalFlush();
00402
00403 *chunk = ExtractChunk();
00404 if ( *chunk )
00405 return true;
00406
00407 int bytes_left = read_len - read_pos;
00408
00409
00410
00411 if ( bytes_left )
00412 memmove(read_buffer, read_buffer + read_pos, bytes_left);
00413
00414 read_pos = 0;
00415 read_len = bytes_left;
00416
00417
00418 if ( may_block )
00419 {
00420 fd_set fd_read, fd_write, fd_except;
00421
00422 FD_ZERO(&fd_read);
00423 FD_ZERO(&fd_write);
00424 FD_ZERO(&fd_except);
00425 FD_SET(fd, &fd_read);
00426
00427 struct timeval small_timeout;
00428 small_timeout.tv_sec = 0;
00429 small_timeout.tv_usec = 50;
00430
00431 select(fd + 1, &fd_read, &fd_write, &fd_except, &small_timeout);
00432 }
00433
00434
00435
00436
00437 if ( kill(parent, 0) < 0 && errno != EPERM )
00438 {
00439 eof = true;
00440 errno = EPIPE;
00441 return false;
00442 }
00443
00444
00445 while ( true )
00446 {
00447 int len = BUFFER_SIZE - read_len;
00448 int read = ::read(fd, read_buffer + read_len, len);
00449
00450 if ( read < 0 )
00451 {
00452
00453 if ( errno == EWOULDBLOCK || errno == EAGAIN ||
00454 errno == EINTR )
00455 {
00456
00457
00458
00459 *chunk = ExtractChunk();
00460 ++failed_reads;
00461 return true;
00462 }
00463
00464 if ( errno == EPIPE )
00465 eof = true;
00466
00467 return false;
00468 }
00469
00470 failed_reads = 0;
00471
00472 if ( read == 0 )
00473 {
00474 *chunk = ExtractChunk();
00475 if ( *chunk )
00476 return true;
00477
00478 eof = true;
00479 return false;
00480 }
00481
00482 read_len += read;
00483
00484 ++stats.reads;
00485 stats.bytes_read += read;
00486
00487 if ( read == len )
00488 break;
00489 }
00490
00491
00492 *chunk = ExtractChunk();
00493
00494 return true;
00495 }
00496
00497 bool ChunkedIOFd::CanRead()
00498 {
00499
00500
00501 OptionalFlush();
00502
00503 if ( ChunkAvailable() )
00504 return true;
00505
00506 fd_set fd_read;
00507 FD_ZERO(&fd_read);
00508 FD_SET(fd, &fd_read);
00509
00510 struct timeval no_timeout;
00511 no_timeout.tv_sec = 0;
00512 no_timeout.tv_usec = 0;
00513
00514 return select(fd + 1, &fd_read, 0, 0, &no_timeout) > 0;
00515 }
00516
00517 bool ChunkedIOFd::CanWrite()
00518 {
00519 return pending_head != 0;
00520 }
00521
00522 bool ChunkedIOFd::IsIdle()
00523 {
00524 if ( pending_head || ChunkAvailable() )
00525 return false;
00526
00527 if ( failed_reads > 0 )
00528 return true;
00529
00530 return false;
00531 }
00532
00533 bool ChunkedIOFd::IsFillingUp()
00534 {
00535 return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
00536 }
00537
00538 void ChunkedIOFd::Clear()
00539 {
00540 while ( pending_head )
00541 {
00542 ChunkQueue* next = pending_head->next;
00543 delete [] pending_head->chunk->data;
00544 delete pending_head->chunk;
00545 delete pending_head;
00546 pending_head = next;
00547 }
00548
00549 pending_head = pending_tail = 0;
00550 }
00551
00552 const char* ChunkedIOFd::Error()
00553 {
00554 static char buffer[1024];
00555 safe_snprintf(buffer, sizeof(buffer), "%s [%d]", strerror(errno), errno);
00556
00557 return buffer;
00558 }
00559
00560 void ChunkedIOFd::Stats(char* buffer, int length)
00561 {
00562 int i = safe_snprintf(buffer, length, "pending=%d ", stats.pending);
00563 ChunkedIO::Stats(buffer + i, length - i);
00564 }
00565
00566
00567 #ifdef USE_OPENSSL
00568
00569 #include <openssl/ssl.h>
00570
00571 SSL_CTX* ChunkedIOSSL::ctx;
00572
00573 ChunkedIOSSL::ChunkedIOSSL(int arg_socket, bool arg_server, int error_fd)
00574 : ChunkedIO(error_fd)
00575 {
00576 socket = arg_socket;
00577 eof = false;
00578 setup = false;
00579 server = arg_server;
00580 ssl = 0;
00581
00582 write_state = LEN;
00583 write_head = 0;
00584 write_tail = 0;
00585
00586 read_state = LEN;
00587 read_chunk = 0;
00588 read_ptr = 0;
00589 }
00590
00591 ChunkedIOSSL::~ChunkedIOSSL()
00592 {
00593 if ( setup )
00594 {
00595 SSL_shutdown(ssl);
00596
00597
00598 setup = false;
00599 }
00600
00601 if ( ssl )
00602 {
00603 SSL_free(ssl);
00604 ssl = 0;
00605 }
00606
00607 close(socket);
00608 }
00609
00610
00611 static int pem_passwd_cb(char* buf, int size, int rwflag, void* passphrase)
00612 {
00613 safe_strncpy(buf, (char*) passphrase, size);
00614 buf[size - 1] = '\0';
00615 return strlen(buf);
00616 }
00617
00618 bool ChunkedIOSSL::Init()
00619 {
00620
00621
00622 if ( ! ctx )
00623 {
00624 SSL_load_error_strings();
00625
00626 ctx = SSL_CTX_new(SSLv3_method());
00627 if ( ! ctx )
00628 {
00629 Log("can't create SSL context");
00630 return false;
00631 }
00632
00633
00634
00635
00636 const char* key = ssl_private_key->AsString()->CheckString();
00637
00638 if ( ! (key && *key &&
00639 SSL_CTX_use_certificate_chain_file(ctx, key)) )
00640 {
00641 Log(fmt("can't read certificate from file %s", key));
00642 return false;
00643 }
00644
00645 const char* passphrase =
00646 ssl_passphrase->AsString()->CheckString();
00647
00648 if ( passphrase && ! streq(passphrase, "<undefined>") )
00649 {
00650 SSL_CTX_set_default_passwd_cb(ctx, pem_passwd_cb);
00651 SSL_CTX_set_default_passwd_cb_userdata(ctx,
00652 (void*) passphrase);
00653 }
00654
00655 if ( ! (key && *key &&
00656 SSL_CTX_use_PrivateKey_file(ctx, key, SSL_FILETYPE_PEM)) )
00657 {
00658 Log(fmt("can't read private key from file %s", key));
00659 return false;
00660 }
00661
00662 const char* ca = ssl_ca_certificate->AsString()->CheckString();
00663 if ( ! (ca && *ca && SSL_CTX_load_verify_locations(ctx, ca, 0)) )
00664 {
00665 Log(fmt("can't read CA certificate from file %s", ca));
00666 return false;
00667 }
00668
00669
00670 if ( ! SSL_CTX_set_cipher_list(ctx, "HIGH") )
00671 {
00672 Log("can't set cipher list");
00673 return false;
00674 }
00675
00676
00677 SSL_CTX_set_verify(ctx,
00678 SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
00679 }
00680
00681 int flags;
00682
00683 if ( (flags = fcntl(socket, F_GETFL, 0)) < 0)
00684 {
00685 Log(fmt("can't obtain socket flags: %s", strerror(errno)));
00686 return false;
00687 }
00688
00689 if ( fcntl(socket, F_SETFL, flags|O_NONBLOCK) < 0 )
00690 {
00691 Log(fmt("can't set socket to non-blocking: %s",
00692 strerror(errno)));
00693 return false;
00694 }
00695
00696 if ( ! ssl )
00697 {
00698 ssl = SSL_new(ctx);
00699 if ( ! ssl )
00700 {
00701 Log("can't create SSL object");
00702 return false;
00703 }
00704
00705 BIO* bio = BIO_new_socket(socket, BIO_NOCLOSE);
00706 BIO_set_nbio(bio, 1);
00707 SSL_set_bio(ssl, bio, bio);
00708 }
00709
00710 int success;
00711 if ( server )
00712 success = last_ret = SSL_accept(ssl);
00713 else
00714 success = last_ret = SSL_connect(ssl);
00715
00716 if ( success > 0 )
00717 {
00718 setup = true;
00719 return true;
00720 }
00721
00722 int error = SSL_get_error(ssl, success);
00723
00724 if ( success <= 0 &&
00725 (error == SSL_ERROR_WANT_WRITE || error == SSL_ERROR_WANT_READ) )
00726
00727 return true;
00728
00729
00730 eof = true;
00731 return false;
00732 }
00733
00734 bool ChunkedIOSSL::Write(Chunk* chunk)
00735 {
00736 #ifdef DEBUG
00737 DBG_LOG(DBG_CHUNKEDIO, "ssl write of size %d [%s]",
00738 chunk->len, fmt_bytes(chunk->data, 20));
00739 #endif
00740
00741
00742
00743 if ( stats.pending > MAX_BUFFERED_CHUNKS )
00744 {
00745 DBG_LOG(DBG_CHUNKEDIO, "write queue full");
00746 errno = ENOSPC;
00747 return false;
00748 }
00749
00750
00751 ++stats.pending;
00752 Queue* q = new Queue;
00753 q->chunk = chunk;
00754 q->next = 0;
00755
00756
00757 chunk->len = htonl(chunk->len);
00758
00759 if ( write_tail )
00760 {
00761 write_tail->next = q;
00762 write_tail = q;
00763 }
00764 else
00765 write_head = write_tail = q;
00766
00767 Flush();
00768 return true;
00769 }
00770
00771 bool ChunkedIOSSL::WriteData(char* p, uint32 len, bool* error)
00772 {
00773 *error = false;
00774
00775 double t = current_time();
00776
00777 int written = last_ret = SSL_write(ssl, p, len);
00778
00779 switch ( SSL_get_error(ssl, written) ) {
00780 case SSL_ERROR_NONE:
00781
00782
00783 return true;
00784
00785 case SSL_ERROR_WANT_READ:
00786 case SSL_ERROR_WANT_WRITE:
00787
00788 DBG_LOG(DBG_CHUNKEDIO,
00789 "SSL_write: SSL_ERROR_WANT_READ [%d,%d]",
00790 read, SSL_get_error(ssl, written));
00791 *error = false;
00792 return false;
00793
00794 case SSL_ERROR_ZERO_RETURN:
00795
00796 DBG_LOG(DBG_CHUNKEDIO,
00797 "SSL_write: SSL_ZERO_RETURN [%d,%d]",
00798 read, SSL_get_error(ssl, written));
00799 *error = eof = true;
00800 return false;
00801
00802 case SSL_ERROR_SYSCALL:
00803 DBG_LOG(DBG_CHUNKEDIO,
00804 "SSL_write: SSL_SYS_CALL [%d,%d]",
00805 read, SSL_get_error(ssl, written));
00806
00807 if ( read == 0 )
00808 {
00809
00810 *error = eof = true;
00811 return false;
00812 }
00813
00814
00815
00816 default:
00817 DBG_LOG(DBG_CHUNKEDIO,
00818 "SSL_write: fatal error [%d,%d]",
00819 read, SSL_get_error(ssl, written));
00820
00821 *error = true;
00822 return false;
00823 }
00824
00825 internal_error("can't be reached");
00826 return false;
00827 }
00828
00829 bool ChunkedIOSSL::Flush()
00830 {
00831 if ( ! setup )
00832 {
00833
00834 if ( ! Init() )
00835 return false;
00836 if ( ! setup )
00837 return true;
00838 }
00839
00840 while ( write_head )
00841 {
00842 bool error;
00843
00844 Chunk* c = write_head->chunk;
00845
00846 if ( write_state == LEN )
00847 {
00848 if ( ! WriteData((char*)&c->len, sizeof(c->len), &error) )
00849 return ! error;
00850 write_state = DATA;
00851
00852
00853 c->len = ntohl(c->len);
00854 }
00855
00856 if ( ! WriteData(c->data, c->len, &error) )
00857 return ! error;
00858
00859
00860 Queue* q = write_head;
00861 write_head = write_head->next;
00862 if ( ! write_head )
00863 write_tail = 0;
00864 --stats.pending;
00865 delete q;
00866
00867 delete [] c->data;
00868 delete c;
00869
00870 write_state = LEN;
00871 }
00872
00873 return true;
00874 }
00875
00876 bool ChunkedIOSSL::ReadData(char* p, uint32 len, bool* error)
00877 {
00878 if ( ! read_ptr )
00879 read_ptr = p;
00880
00881 while ( true )
00882 {
00883 double t = current_time();
00884
00885 int read = last_ret =
00886 SSL_read(ssl, read_ptr, len - (read_ptr - p));
00887
00888 switch ( SSL_get_error(ssl, read) ) {
00889 case SSL_ERROR_NONE:
00890
00891 read_ptr += read;
00892
00893 if ( unsigned(read_ptr - p) == len )
00894 {
00895
00896 read_ptr = 0;
00897 *error = false;
00898 return true;
00899 }
00900
00901 break;
00902
00903 case SSL_ERROR_WANT_READ:
00904 case SSL_ERROR_WANT_WRITE:
00905
00906 DBG_LOG(DBG_CHUNKEDIO,
00907 "SSL_read: SSL_ERROR_WANT_READ [%d,%d]",
00908 read, SSL_get_error(ssl, read));
00909 *error = false;
00910 return false;
00911
00912 case SSL_ERROR_ZERO_RETURN:
00913
00914 DBG_LOG(DBG_CHUNKEDIO,
00915 "SSL_read: SSL_ZERO_RETURN [%d,%d]",
00916 read, SSL_get_error(ssl, read));
00917 *error = eof = true;
00918 return false;
00919
00920 case SSL_ERROR_SYSCALL:
00921 DBG_LOG(DBG_CHUNKEDIO, "SSL_read: SSL_SYS_CALL [%d,%d]",
00922 read, SSL_get_error(ssl, read));
00923
00924 if ( read == 0 )
00925 {
00926
00927 *error = eof = true;
00928 return false;
00929 }
00930
00931
00932
00933 default:
00934 DBG_LOG(DBG_CHUNKEDIO,
00935 "SSL_read: fatal error [%d,%d]",
00936 read, SSL_get_error(ssl, read));
00937
00938
00939 *error = true;
00940 return false;
00941 }
00942 }
00943
00944
00945 internal_error("can't be reached");
00946 return false;
00947 }
00948
00949 bool ChunkedIOSSL::Read(Chunk** chunk, bool mayblock)
00950 {
00951 *chunk = 0;
00952
00953 if ( ! setup )
00954 {
00955
00956 if ( ! Init() )
00957 return false;
00958 if ( ! setup )
00959 return true;
00960 }
00961
00962 bool error;
00963
00964 Flush();
00965
00966 if ( read_state == LEN )
00967 {
00968 if ( ! read_chunk )
00969 {
00970 read_chunk = new Chunk;
00971 read_chunk->data = 0;
00972 }
00973
00974 if ( ! ReadData((char*)&read_chunk->len,
00975 sizeof(read_chunk->len),
00976 &error) )
00977 return ! error;
00978
00979 read_state = DATA;
00980 read_chunk->len = ntohl(read_chunk->len);
00981 }
00982
00983 if ( ! read_chunk->data )
00984 read_chunk->data = new char[read_chunk->len];
00985
00986 if ( ! ReadData(read_chunk->data, read_chunk->len, &error) )
00987 return ! error;
00988
00989
00990 *chunk = read_chunk;
00991 read_chunk = 0;
00992 read_state = LEN;
00993
00994 #ifdef DEBUG
00995 if ( *chunk )
00996 DBG_LOG(DBG_CHUNKEDIO, "ssl read of size %d [%s]",
00997 (*chunk)->len, fmt_bytes((*chunk)->data, 20));
00998 #endif
00999
01000 return true;
01001 }
01002
01003 bool ChunkedIOSSL::CanRead()
01004 {
01005
01006
01007 Flush();
01008
01009 if ( SSL_pending(ssl) )
01010 return true;
01011
01012 fd_set fd_read;
01013 FD_ZERO(&fd_read);
01014 FD_SET(socket, &fd_read);
01015
01016 struct timeval notimeout;
01017 notimeout.tv_sec = 0;
01018 notimeout.tv_usec = 0;
01019
01020 return select(socket + 1, &fd_read, NULL, NULL, ¬imeout) > 0;
01021 }
01022
01023 bool ChunkedIOSSL::CanWrite()
01024 {
01025 return write_head != 0;
01026 }
01027
01028 bool ChunkedIOSSL::IsIdle()
01029 {
01030 return ! (CanRead() || CanWrite());
01031 }
01032
01033 bool ChunkedIOSSL::IsFillingUp()
01034 {
01035
01036
01037 return false;
01038 }
01039
01040 void ChunkedIOSSL::Clear()
01041 {
01042 while ( write_head )
01043 {
01044 Queue* next = write_head->next;
01045 delete [] write_head->chunk->data;
01046 delete write_head->chunk;
01047 delete write_head;
01048 write_head = next;
01049 }
01050 write_head = write_tail = 0;
01051 }
01052
01053 const char* ChunkedIOSSL::Error()
01054 {
01055 const int BUFLEN = 512;
01056 static char buffer[BUFLEN];
01057
01058 int sslcode = SSL_get_error(ssl, last_ret);
01059 int errcode = ERR_get_error();
01060
01061 int count = safe_snprintf(buffer, BUFLEN, "[%d,%d,%d] SSL error: ",
01062 errcode, sslcode, last_ret);
01063
01064 if ( errcode )
01065 ERR_error_string_n(errcode, buffer + count, BUFLEN - count);
01066
01067 else if ( sslcode == SSL_ERROR_SYSCALL )
01068 {
01069 if ( last_ret )
01070
01071 safe_snprintf(buffer + count, BUFLEN - count,
01072 "syscall: %s", strerror(errno));
01073 else
01074
01075 safe_strncpy(buffer + count,
01076 "syscall: unexpected end-of-file",
01077 BUFLEN - count);
01078 }
01079 else
01080 safe_strncpy(buffer + count, "unknown error", BUFLEN - count);
01081
01082 return buffer;
01083 }
01084
01085 void ChunkedIOSSL::Stats(char* buffer, int length)
01086 {
01087 int i = safe_snprintf(buffer, length, "pending=%ld ", stats.pending);
01088 ChunkedIO::Stats(buffer + i, length - i);
01089 }
01090
01091 #ifdef HAVE_LIBZ
01092
01093 bool CompressedChunkedIO::Init()
01094 {
01095 zin.zalloc = 0;
01096 zin.zfree = 0;
01097 zin.opaque = 0;
01098
01099 zout.zalloc = 0;
01100 zout.zfree = 0;
01101 zout.opaque = 0;
01102
01103 compress = uncompress = false;
01104 error = 0;
01105 uncompressed_bytes_read = 0;
01106 uncompressed_bytes_written = 0;
01107
01108 return true;
01109 }
01110
01111 bool CompressedChunkedIO::Read(Chunk** chunk, bool may_block)
01112 {
01113 if ( ! io->Read(chunk, may_block) )
01114 return false;
01115
01116 if ( ! uncompress )
01117 return true;
01118
01119 if ( ! *chunk )
01120 return true;
01121
01122 uint32 uncompressed_len =
01123 *(uint32*)((*chunk)->data + (*chunk)->len - sizeof(uint32));
01124
01125 if ( uncompressed_len == 0 )
01126 {
01127
01128 DBG_LOG(DBG_CHUNKEDIO, "zlib read pass-through: size=%d",
01129 (*chunk)->len);
01130 return true;
01131 }
01132
01133 char* uncompressed = new char[uncompressed_len];
01134
01135 DBG_LOG(DBG_CHUNKEDIO, "zlib read: size=%d uncompressed=%d",
01136 (*chunk)->len, uncompressed_len);
01137
01138 zin.next_in = (Bytef*) (*chunk)->data;
01139 zin.avail_in = (*chunk)->len - sizeof(uint32);
01140 zin.next_out = (Bytef*) uncompressed;
01141 zin.avail_out = uncompressed_len;
01142
01143 if ( inflate(&zin, Z_SYNC_FLUSH) != Z_OK )
01144 {
01145 error = zin.msg;
01146 return false;
01147 }
01148
01149 if ( zin.avail_in > 0 )
01150 {
01151 error = "compressed data longer than expected";
01152 return false;
01153 }
01154
01155 delete [] (*chunk)->data;
01156
01157 uncompressed_bytes_read += uncompressed_len;
01158
01159 (*chunk)->len = uncompressed_len;
01160 (*chunk)->data = uncompressed;
01161
01162 return true;
01163 }
01164
01165 bool CompressedChunkedIO::Write(Chunk* chunk)
01166 {
01167 if ( (! compress) || IsPure() )
01168
01169 return io->Write(chunk);
01170
01171
01172
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189
01190
01191
01192
01193 uncompressed_bytes_written += chunk->len;
01194 uint32 original_size = chunk->len;
01195
01196 char* compressed = new char[chunk->len + sizeof(uint32)];
01197
01198 if ( chunk->len < MIN_COMPRESS_SIZE )
01199 {
01200
01201 short_memcpy(compressed, chunk->data, chunk->len);
01202 *(uint32*) (compressed + chunk->len) = 0;
01203
01204 delete [] chunk->data;
01205 chunk->data = compressed;
01206 chunk->len += 4;
01207
01208 DBG_LOG(DBG_CHUNKEDIO, "zlib write pass-through: size=%d", chunk->len);
01209 }
01210 else
01211 {
01212 zout.next_in = (Bytef*) chunk->data;
01213 zout.avail_in = chunk->len;
01214 zout.next_out = (Bytef*) compressed;
01215 zout.avail_out = chunk->len;
01216
01217 if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
01218 {
01219 error = zout.msg;
01220 return false;
01221 }
01222
01223 while ( zout.avail_out == 0 )
01224 {
01225
01226 char* old = compressed;
01227 int old_size = (char*) zout.next_out - compressed;
01228 int new_size = old_size * 2 + sizeof(uint32);
01229
01230 compressed = new char[new_size];
01231 short_memcpy(compressed, old, old_size);
01232 delete [] old;
01233
01234 zout.next_out = (Bytef*) (compressed + old_size);
01235 zout.avail_out = old_size;
01236
01237 if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
01238 {
01239 error = zout.msg;
01240 return false;
01241 }
01242 }
01243
01244 *(uint32*) zout.next_out = original_size;
01245
01246 delete [] chunk->data;
01247 chunk->data = compressed;
01248 chunk->len =
01249 ((char*) zout.next_out - compressed) + sizeof(uint32);
01250
01251 DBG_LOG(DBG_CHUNKEDIO, "zlib write: size=%d compressed=%d",
01252 original_size, chunk->len);
01253 }
01254
01255 return io->Write(chunk);
01256 }
01257
01258 void CompressedChunkedIO::Stats(char* buffer, int length)
01259 {
01260 const Statistics* stats = io->Stats();
01261
01262 int i = snprintf(buffer, length, "compression=%.2f/%.2f ",
01263 uncompressed_bytes_read ? double(stats->bytes_read) / uncompressed_bytes_read : -1,
01264 uncompressed_bytes_written ? double(stats->bytes_written) / uncompressed_bytes_written : -1 );
01265
01266 io->Stats(buffer + i, length - i);
01267 buffer[length-1] = '\0';
01268 }
01269
01270 #endif
01271
01272 #endif