diff --git a/src/archiver.c b/src/archiver.c index 929b7d2..f920ebf 100644 --- a/src/archiver.c +++ b/src/archiver.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #endif @@ -955,10 +956,95 @@ int read_fd_to_out_fd(FILE *in_fd, FILE *out_fd, char *read_buf, return SDAS_SUCCESS; } +int try_write_to_decomp(int *to_dec_pipe, uint64_t *chunk_remaining, FILE *in_f, + char *buf, const size_t buf_size) { + if (*to_dec_pipe >= 0) { + uint_fast32_t loop_count = 0; + if (*chunk_remaining > 0) { + if (*chunk_remaining > buf_size) { + size_t fread_ret = fread(buf, 1, 1024, in_f); + if (fread_ret == 0) { + goto TRY_WRITE_TO_DECOMP_END; + } else { + ssize_t write_ret; + TRY_WRITE_TO_DECOMP_AGAIN_0: + write_ret = write(*to_dec_pipe, buf, fread_ret); + if (write_ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Non-blocking write. +#if SIMPLE_ARCHIVER_PLATFORM == SIMPLE_ARCHIVER_PLATFORM_COSMOPOLITAN || \ + SIMPLE_ARCHIVER_PLATFORM == SIMPLE_ARCHIVER_PLATFORM_MAC || \ + SIMPLE_ARCHIVER_PLATFORM == SIMPLE_ARCHIVER_PLATFORM_LINUX + struct timespec sleep_time; + sleep_time.tv_sec = 0; + sleep_time.tv_nsec = 100000000; + nanosleep(&sleep_time, NULL); +#endif + if (++loop_count > 10) { + return SDAS_INTERNAL_ERROR; + } + goto TRY_WRITE_TO_DECOMP_AGAIN_0; + } else { + return SDAS_INTERNAL_ERROR; + } + } else if (write_ret == 0) { + return SDAS_INTERNAL_ERROR; + } else { + *chunk_remaining -= (size_t)write_ret; + } + } + } else { + size_t fread_ret = fread(buf, 1, *chunk_remaining, in_f); + if (fread_ret == 0) { + goto TRY_WRITE_TO_DECOMP_END; + } else { + ssize_t write_ret; + TRY_WRITE_TO_DECOMP_AGAIN_1: + write_ret = write(*to_dec_pipe, buf, fread_ret); + if (write_ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Non-blocking write. +#if SIMPLE_ARCHIVER_PLATFORM == SIMPLE_ARCHIVER_PLATFORM_COSMOPOLITAN || \ + SIMPLE_ARCHIVER_PLATFORM == SIMPLE_ARCHIVER_PLATFORM_MAC || \ + SIMPLE_ARCHIVER_PLATFORM == SIMPLE_ARCHIVER_PLATFORM_LINUX + struct timespec sleep_time; + sleep_time.tv_sec = 0; + sleep_time.tv_nsec = 100000000; + nanosleep(&sleep_time, NULL); +#endif + if (++loop_count > 10) { + return SDAS_INTERNAL_ERROR; + } + goto TRY_WRITE_TO_DECOMP_AGAIN_1; + } else { + return SDAS_INTERNAL_ERROR; + } + } else if (write_ret == 0) { + return SDAS_INTERNAL_ERROR; + } else if ((size_t)write_ret <= *chunk_remaining) { + *chunk_remaining -= (size_t)write_ret; + } else { + return SDAS_INTERNAL_ERROR; + } + } + } + } + } + +TRY_WRITE_TO_DECOMP_END: + if (*to_dec_pipe >= 0 && *chunk_remaining == 0) { + close(*to_dec_pipe); + *to_dec_pipe = -1; + } + + return SDAS_SUCCESS; +} + /// Returns SDAS_SUCCESS on success. int read_decomp_to_out_file(const char *out_filename, int in_pipe, char *read_buf, const size_t read_buf_size, - const uint64_t file_size) { + const uint64_t file_size, int *to_dec_pipe, + uint64_t *chunk_remaining, FILE *in_f) { __attribute__((cleanup(simple_archiver_helper_cleanup_FILE))) FILE *out_fd = NULL; if (out_filename) { @@ -974,6 +1060,8 @@ int read_decomp_to_out_file(const char *out_filename, int in_pipe, ssize_t read_ret; size_t fwrite_ret; while (written_amt < file_size) { + try_write_to_decomp(to_dec_pipe, chunk_remaining, in_f, read_buf, + read_buf_size); if (file_size - written_amt >= read_buf_size) { read_ret = read(in_pipe, read_buf, read_buf_size); if (read_ret > 0) { @@ -1003,10 +1091,15 @@ int read_decomp_to_out_file(const char *out_filename, int in_pipe, break; } } else { - // Error. - fprintf(stderr, "ERROR Failed to read from decompressor! (%lu)\n", - read_ret); - return SDAS_INTERNAL_ERROR; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Non-blocking read from pipe. + continue; + } else { + // Error. + fprintf(stderr, "ERROR Failed to read from decompressor! (%lu)\n", + read_ret); + return SDAS_INTERNAL_ERROR; + } } } else { read_ret = read(in_pipe, read_buf, file_size - written_amt); @@ -1037,10 +1130,15 @@ int read_decomp_to_out_file(const char *out_filename, int in_pipe, break; } } else { - // Error. - fprintf(stderr, "ERROR Failed to read from decompressor! (%d)\n", - errno); - return SDAS_INTERNAL_ERROR; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Non-blocking read from pipe. + continue; + } else { + // Error. + fprintf(stderr, "ERROR Failed to read from decompressor! (%d)\n", + errno); + return SDAS_INTERNAL_ERROR; + } } } } @@ -1323,6 +1421,7 @@ void simple_archiver_internal_cleanup_decomp(pid_t *decomp_pid) { "WARNING: Exec failed (exec exit code unknown)! Invalid " "decompressor cmd?\n"); } + *decomp_pid = -1; } } #endif @@ -3543,6 +3642,7 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, simple_archiver_helper_64_bit_be(&u64); const uint64_t chunk_size = u64; + uint64_t chunk_remaining = chunk_size; uint64_t chunk_idx = 0; SDArchiverLLNode *node = file_info_list->head; @@ -3555,6 +3655,7 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, // Start the decompressing process and read into files. // Handle SIGPIPE. + is_sig_pipe_occurred = 0; signal(SIGPIPE, handle_sig_pipe); int pipe_into_cmd[2]; @@ -3569,6 +3670,20 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, close(pipe_into_cmd[0]); close(pipe_into_cmd[1]); return SDAS_INTERNAL_ERROR; + } else if (fcntl(pipe_into_cmd[1], F_SETFL, O_NONBLOCK) != 0) { + // Unable to set non-blocking on into-write-pipe. + close(pipe_into_cmd[0]); + close(pipe_into_cmd[1]); + close(pipe_outof_cmd[0]); + close(pipe_outof_cmd[1]); + return SDAS_INTERNAL_ERROR; + } else if (fcntl(pipe_outof_cmd[0], F_SETFL, O_NONBLOCK) != 0) { + // Unable to set non-blocking on outof-read-pipe. + close(pipe_into_cmd[0]); + close(pipe_into_cmd[1]); + close(pipe_outof_cmd[0]); + close(pipe_outof_cmd[1]); + return SDAS_INTERNAL_ERROR; } if (state && state->parsed && state->parsed->decompressor) { @@ -3599,12 +3714,12 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, close(pipe_into_cmd[0]); close(pipe_outof_cmd[1]); - __attribute__((cleanup( - simple_archiver_internal_cleanup_int_fd))) int pipe_into_write = - pipe_into_cmd[1]; __attribute__((cleanup( simple_archiver_internal_cleanup_int_fd))) int pipe_outof_read = pipe_outof_cmd[0]; + __attribute__((cleanup( + simple_archiver_internal_cleanup_int_fd))) int pipe_into_write = + pipe_into_cmd[1]; int decompressor_status; int decompressor_return_val; @@ -3630,63 +3745,6 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, return SDAS_INTERNAL_ERROR; } - // Write all of chunk into decompressor. - uint64_t chunk_written = 0; - while (chunk_written < chunk_size) { - if (is_sig_pipe_occurred) { - fprintf(stderr, - "WARNING: Failed to write to decompressor (SIGPIPE)! Invalid " - "decompressor cmd?\n"); - return SDAS_INTERNAL_ERROR; - } else if (chunk_size - chunk_written >= 1024) { - if (fread(buf, 1, 1024, in_f) != 1024) { - fprintf(stderr, "ERROR Failed to read chunk for decompressing!\n"); - return SDAS_INTERNAL_ERROR; - } - ssize_t write_ret = write(pipe_into_cmd[1], buf, 1024); - if (write_ret > 0 && (size_t)write_ret == 1024) { - // Successful write. - } else if (write_ret == -1) { - fprintf(stderr, - "WARNING: Failed to write chunk data into decompressor! " - "Invalid decompressor cmd? (errno %d)\n", - errno); - return SDAS_INTERNAL_ERROR; - } else { - fprintf(stderr, - "WARNING: Failed to write chunk data into decompressor! " - "Invalid decompressor cmd?\n"); - return SDAS_INTERNAL_ERROR; - } - chunk_written += 1024; - } else { - if (fread(buf, 1, chunk_size - chunk_written, in_f) != - chunk_size - chunk_written) { - fprintf(stderr, "ERROR Failed to read chunk for decompressing!\n"); - return SDAS_INTERNAL_ERROR; - } - ssize_t write_ret = - write(pipe_into_cmd[1], buf, chunk_size - chunk_written); - if (write_ret > 0 && - (size_t)write_ret == chunk_size - chunk_written) { - // Successful write. - } else if (write_ret == -1) { - fprintf(stderr, - "WARNING: Failed to write chunk data into decompressor! " - "Invalid decompressor cmd?\n"); - return SDAS_INTERNAL_ERROR; - } else { - fprintf(stderr, - "WARNING: Failed to write chunk data into decompressor! " - "Invalid decompressor cmd?\n"); - return SDAS_INTERNAL_ERROR; - } - chunk_written = chunk_size; - } - } - - simple_archiver_internal_cleanup_int_fd(&pipe_into_write); - while (node->next != file_info_list->tail) { node = node->next; const SDArchiverInternalFileInfo *file_info = node->data; @@ -3719,16 +3777,17 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, fprintf(stderr, " WARNING: File already exists and " "\"--overwrite-extract\" is not specified, skipping!\n"); - read_decomp_to_out_file(NULL, pipe_outof_cmd[0], (char *)buf, - 1024, file_info->file_size); + read_decomp_to_out_file(NULL, pipe_outof_read, (char *)buf, 1024, + file_info->file_size, &pipe_into_write, + &chunk_remaining, in_f); continue; } } simple_archiver_helper_make_dirs(file_info->filename); - int ret = - read_decomp_to_out_file(file_info->filename, pipe_outof_cmd[0], - (char *)buf, 1024, file_info->file_size); + int ret = read_decomp_to_out_file( + file_info->filename, pipe_outof_read, (char *)buf, 1024, + file_info->file_size, &pipe_into_write, &chunk_remaining, in_f); if (ret != SDAS_SUCCESS) { return ret; } @@ -3758,13 +3817,15 @@ int simple_archiver_parse_archive_version_1(FILE *in_f, int_fast8_t do_extract, fprintf(stderr, " File size: %lu\n", file_info->file_size); } int ret = read_decomp_to_out_file( - NULL, pipe_outof_cmd[0], (char *)buf, 1024, file_info->file_size); + NULL, pipe_outof_read, (char *)buf, 1024, file_info->file_size, + &pipe_into_write, &chunk_remaining, in_f); if (ret != SDAS_SUCCESS) { return ret; } } else { int ret = read_decomp_to_out_file( - NULL, pipe_outof_cmd[0], (char *)buf, 1024, file_info->file_size); + NULL, pipe_outof_cmd[0], (char *)buf, 1024, file_info->file_size, + &pipe_into_write, &chunk_remaining, in_f); if (ret != SDAS_SUCCESS) { return ret; }