@@ -1044,6 +1044,86 @@ int fio_stat(char const* path, struct stat* st, bool follow_symlink, fio_locatio
10441044 }
10451045}
10461046
1047+ /*
1048+ * Calculate size of the file without trailing spaces in the end.
1049+ * Save result in statbuf->st_size.
1050+ *
1051+ * It is used to avoid sending trailing zeros in CFS map files.
1052+ */
1053+ static int getFileNonZeroSize (const char * path , struct stat * statbuf )
1054+ {
1055+ char buf [BLCKSZ ];
1056+ uint64 * word = (uint64 * )buf ;
1057+ pgoff_t size ;
1058+ int i ;
1059+ FILE * fp ;
1060+
1061+ stat (path , statbuf );
1062+ size = statbuf -> st_size ;
1063+
1064+ fp = fopen (path , PG_BINARY_R );
1065+
1066+ while (size > BLCKSZ && fseek (fp , size - BLCKSZ , SEEK_SET ) == 0 )
1067+ {
1068+ int rc = fread (buf , 1 , BLCKSZ , fp );
1069+
1070+ if (rc != BLCKSZ )
1071+ break ;
1072+
1073+ for (i = 0 ; i < BLCKSZ /8 ; i ++ )
1074+ {
1075+ if (word [i ] != 0 )
1076+ goto stop ;
1077+ }
1078+ size -= BLCKSZ ;
1079+ }
1080+ stop :
1081+
1082+ statbuf -> st_size = size ;
1083+ fclose (fp );
1084+
1085+ //TODO handle possible errors
1086+ return 0 ;
1087+ }
1088+
1089+ /*
1090+ * This function is wrapper for both local and remote calls.
1091+ *
1092+ * Calculate size of the file without trailing spaces in the end.
1093+ * Return result via statbuf->st_size.
1094+ */
1095+ int fio_find_non_zero_size (char const * path , struct stat * st , bool remote )
1096+ {
1097+ if (remote )
1098+ {
1099+ fio_header hdr ;
1100+ size_t path_len = strlen (path ) + 1 ;
1101+
1102+ hdr .cop = FIO_NON_ZERO_SIZE ;
1103+ hdr .arg = 0 ;
1104+ hdr .handle = -1 ;
1105+ hdr .size = path_len ;
1106+
1107+ IO_CHECK (fio_write_all (fio_stdout , & hdr , sizeof (hdr )), sizeof (hdr ));
1108+ IO_CHECK (fio_write_all (fio_stdout , path , path_len ), path_len );
1109+
1110+ IO_CHECK (fio_read_all (fio_stdin , & hdr , sizeof (hdr )), sizeof (hdr ));
1111+ Assert (hdr .cop == FIO_NON_ZERO_SIZE );
1112+ IO_CHECK (fio_read_all (fio_stdin , st , sizeof (* st )), sizeof (* st ));
1113+
1114+ if (hdr .arg != 0 )
1115+ {
1116+ errno = hdr .arg ;
1117+ return -1 ;
1118+ }
1119+ return 0 ;
1120+ }
1121+ else
1122+ {
1123+ return getFileNonZeroSize (path , st );
1124+ }
1125+ }
1126+
10471127/* Check presence of the file */
10481128int fio_access (char const * path , int mode , fio_location location )
10491129{
@@ -2085,6 +2165,7 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
20852165 z_stream * strm = NULL ;
20862166
20872167 hdr .cop = FIO_SEND_FILE ;
2168+ hdr .arg = 0 ; //read till EOF
20882169 hdr .size = path_len ;
20892170
20902171// elog(VERBOSE, "Thread [%d]: Attempting to open remote compressed WAL file '%s'",
@@ -2223,6 +2304,7 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
22232304 return exit_code ;
22242305}
22252306
2307+
22262308/* Receive chunks of data and write them to destination file.
22272309 * Return codes:
22282310 * SEND_OK (0)
@@ -2242,7 +2324,19 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
22422324 size_t path_len = strlen (from_fullpath ) + 1 ;
22432325 char * buf = pgut_malloc (CHUNK_SIZE ); /* buffer */
22442326
2327+ struct stat statbuf ;
2328+ ssize_t cfm_non_zero_size = 0 ;
2329+
2330+
2331+ if (file && file -> forkName == cfm )
2332+ {
2333+ if (fio_find_non_zero_size (from_fullpath , & statbuf , true) != 0 )
2334+ elog (ERROR , "fio_find_non_zero_size failed" );
2335+ cfm_non_zero_size = statbuf .st_size ;
2336+ }
2337+
22452338 hdr .cop = FIO_SEND_FILE ;
2339+ hdr .arg = cfm_non_zero_size ; //read till this length
22462340 hdr .size = path_len ;
22472341
22482342// elog(VERBOSE, "Thread [%d]: Attempting to open remote WAL file '%s'",
@@ -2314,13 +2408,19 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
23142408 * FIO_PAGE
23152409 * FIO_SEND_FILE_EOF
23162410 *
2411+ * If we only want to read a part of the file, pass len_wanted argument.
2412+ * Currently we do not differentiate exit codes and return with FIO_SEND_FILE_EOF
2413+ * when expected amount of bytes was send.
2414+ *
2415+ * len_wanted == 0 means read till the end of file.
23172416 */
2318- static void fio_send_file_impl (int out , char const * path )
2417+ static void fio_send_file_impl (int out , char const * path , size_t len_wanted )
23192418{
23202419 FILE * fp ;
23212420 fio_header hdr ;
23222421 char * buf = pgut_malloc (CHUNK_SIZE );
23232422 size_t read_len = 0 ;
2423+ size_t real_send_len = 0 ;
23242424 char * errormsg = NULL ;
23252425
23262426 /* open source file for read */
@@ -2389,6 +2489,10 @@ static void fio_send_file_impl(int out, char const* path)
23892489 IO_CHECK (fio_write_all (out , buf , read_len ), read_len );
23902490 }
23912491
2492+ real_send_len += read_len ;
2493+ if (len_wanted && real_send_len >= len_wanted )
2494+ break ; //TODO pass real_send_len somewhere
2495+
23922496 if (feof (fp ))
23932497 break ;
23942498 }
@@ -2543,6 +2647,7 @@ static void fio_list_dir_impl(int out, char* buf)
25432647 fio_file .linked_len = 0 ;
25442648
25452649 hdr .cop = FIO_SEND_FILE ;
2650+ hdr .arg = 0 ; //read till EOF
25462651 hdr .size = strlen (file -> rel_path ) + 1 ;
25472652
25482653 /* send rel_path first */
@@ -2931,7 +3036,7 @@ void fio_communicate(int in, int out)
29313036 fio_send_pages_impl (out , buf );
29323037 break ;
29333038 case FIO_SEND_FILE :
2934- fio_send_file_impl (out , buf );
3039+ fio_send_file_impl (out , buf , hdr . arg );
29353040 break ;
29363041 case FIO_SYNC :
29373042 /* open file and fsync it */
@@ -2980,6 +3085,14 @@ void fio_communicate(int in, int out)
29803085 case FIO_GET_ASYNC_ERROR :
29813086 fio_get_async_error_impl (out );
29823087 break ;
3088+ case FIO_NON_ZERO_SIZE : /* Get non-zero length of the file in specified path.
3089+ * Currently used for cfm optimization */
3090+ hdr .size = sizeof (st );
3091+ rc = getFileNonZeroSize (buf , & st );
3092+ hdr .arg = rc < 0 ? errno : 0 ;
3093+ IO_CHECK (fio_write_all (out , & hdr , sizeof (hdr )), sizeof (hdr ));
3094+ IO_CHECK (fio_write_all (out , & st , sizeof (st )), sizeof (st ));
3095+ break ;
29833096 default :
29843097 Assert (false);
29853098 }
0 commit comments