1 --- configure_orig.in 2009-03-01 13:50:30.000000000 +0800
2 +++ configure.in 2009-02-27 13:35:42.000000000 +0800
4 if test -n "$file_system_testfs"; then
5 AC_DEFINE(ROMIO_TESTFS,1,[Define for ROMIO with TESTFS])
8 +# Verify presence of lustre/lustre_user.h
10 if test -n "$file_system_lustre"; then
11 - AC_DEFINE(ROMIO_LUSTRE,1,[Define for ROMIO with LUSTRE])
12 + AC_CHECK_HEADERS(lustre/lustre_user.h,
13 + AC_DEFINE(ROMIO_LUSTRE,1,[Define for ROMIO with LUSTRE]),
14 + AC_MSG_ERROR([LUSTRE support requested but cannot find lustre/lustre_user.h header file])
18 if test -n "$file_system_xfs"; then
19 --- adio/include/adioi_orig.h 2009-03-01 14:00:48.000000000 +0800
20 +++ adio/include/adioi.h 2009-04-24 15:26:44.000000000 +0800
34 diff -ruN adio/ad_lustre_orig/ad_lustre_aggregate.c adio/ad_lustre/ad_lustre_aggregate.c
35 --- adio/ad_lustre_orig/ad_lustre_aggregate.c 1970-01-01 08:00:00.000000000 +0800
36 +++ adio/ad_lustre/ad_lustre_aggregate.c 2009-05-05 15:22:40.000000000 +0800
38 +/* -*- Mode: C; c-basic-offset:4 ; -*- */
40 + * Copyright (C) 1997 University of Chicago.
41 + * See COPYRIGHT notice in top-level directory.
43 + * Copyright (C) 2007 Oak Ridge National Laboratory
45 + * Copyright (C) 2008 Sun Microsystems, Lustre group
48 +#include "ad_lustre.h"
49 +#include "adio_extern.h"
53 +void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
56 + int *striping_info = NULL;
57 + /* get striping information:
58 + * striping_info[0]: stripe_size
59 + * striping_info[1]: stripe_count
60 + * striping_info[2]: avail_cb_nodes
62 + int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes, lflag;
63 + int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
64 + char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char));
66 + /* Get hints value */
68 + stripe_size = fd->hints->striping_unit;
70 + /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */
71 + stripe_count = fd->hints->striping_factor;
73 + /* Calculate the available number of I/O clients, that is
74 + * avail_cb_nodes=min(cb_nodes, stripe_count*CO), where
78 + /* for collective read,
79 + * if "CO" clients access the same OST simultaneously,
80 + * the OST disk seek time would be high. So, to avoid this,
81 + * it might be better if 1 client only accesses 1 OST.
82 + * So, we set CO = 1 to meet the above requirement.
85 + /* XXX: maybe there is a better way to handle collective reads */
87 + /* CO_max: the largest number of IO clients for each ost group */
88 + CO_max = (nprocs_for_coll - 1)/ stripe_count + 1;
89 + /* CO also has been validated in ADIOI_LUSTRE_Open(), >0 */
90 + CO = fd->hints->fs_hints.lustre.co_ratio;
91 + CO = ADIOI_MIN(CO_max, CO);
93 + /* Calculate how many IO clients we need */
94 + /* To avoid extent lock conflicts,
95 + * avail_cb_nodes should divide (stripe_count*CO) exactly,
96 + * so that each OST is always accessed by the same one or more clients. */
97 + CO_nodes = stripe_count * CO;
98 + avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, CO_nodes);
99 + if (avail_cb_nodes < CO_nodes) {
101 + /* find the divisor of CO_nodes */
105 + } while (CO_nodes % divisor);
106 + CO_nodes = CO_nodes / divisor;
107 + /* if stripe_count*CO is a prime number, change nothing */
108 + if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) {
109 + avail_cb_nodes = CO_nodes;
112 + } while (CO_nodes != 1);
115 + *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
116 + striping_info = *striping_info_ptr;
117 + striping_info[0] = stripe_size;
118 + striping_info[1] = stripe_count;
119 + striping_info[2] = avail_cb_nodes;
124 +int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
125 + ADIO_Offset *len, int *striping_info)
127 + int rank_index, rank;
128 + ADIO_Offset avail_bytes;
129 + int stripe_size = striping_info[0];
130 + int avail_cb_nodes = striping_info[2];
132 + /* Produce the stripe-contiguous pattern for Lustre */
133 + rank_index = (int)((off / stripe_size) % avail_cb_nodes);
135 + /* we index into fd_end with rank_index, and fd_end was allocated to be no
136 + * bigger than fd->hints->cb_nodes. If we ever violate that, we're
137 + * overrunning arrays. Obviously, we should never ever hit this abort
139 + if (rank_index >= fd->hints->cb_nodes)
140 + MPI_Abort(MPI_COMM_WORLD, 1);
142 + avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
143 + (ADIO_Offset)stripe_size - off;
144 + if (avail_bytes < *len) {
145 + /* this proc only has part of the requested contig. region */
146 + *len = avail_bytes;
148 + /* map our index to a rank */
149 + /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
150 + rank = fd->hints->ranklist[rank_index];
155 +/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
156 + * of this process are located in the file domains of various processes
157 + * (including this one)
159 +void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
160 + int *len_list, int contig_access_count,
161 + int *striping_info, int nprocs,
162 + int *count_my_req_procs_ptr,
163 + int **count_my_req_per_proc_ptr,
164 + ADIOI_Access **my_req_ptr,
167 + /* Nothing different from ADIOI_Calc_my_req(), except calling
168 + * ADIOI_LUSTRE_Calc_aggregator() instead of the old one */
169 + int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
171 + ADIO_Offset avail_len, rem_len, curr_idx, off;
172 + ADIOI_Access *my_req;
174 + *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
175 + count_my_req_per_proc = *count_my_req_per_proc_ptr;
176 + /* count_my_req_per_proc[i] gives the no. of contig. requests of this
177 + * process in process i's file domain. calloc initializes to zero.
178 + * I'm allocating memory of size nprocs, so that I can do an
179 + * MPI_Alltoall later on.
182 + buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
183 + /* buf_idx is relevant only if buftype_is_contig.
184 + * buf_idx[i] gives the index into user_buf where data received
185 + * from proc. i should be placed. This allows receives to be done
186 + * without extra buffer. This can't be done if buftype is not contig.
189 + /* initialize buf_idx to -1 */
190 + for (i = 0; i < nprocs; i++)
193 + /* one pass just to calculate how much space to allocate for my_req;
194 + * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
196 + for (i = 0; i < contig_access_count; i++) {
197 + /* short circuit offset/len processing if len == 0
198 + * (zero-byte read/write
200 + if (len_list[i] == 0)
202 + off = offset_list[i];
203 + avail_len = len_list[i];
204 + /* note: we set avail_len to be the total size of the access.
205 + * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return
206 + * the amount that was available.
208 + proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
209 + count_my_req_per_proc[proc]++;
211 + /* figure out how much data remains in the access;
212 + * we'll take care of this data (if there is any)
213 + * in the while loop below.
215 + rem_len = len_list[i] - avail_len;
217 + while (rem_len != 0) {
218 + off += avail_len; /* point to first remaining byte */
219 + avail_len = rem_len; /* save remaining size, pass to calc */
220 + proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
221 + count_my_req_per_proc[proc]++;
222 + rem_len -= avail_len; /* reduce remaining length by amount from fd */
226 + /* now allocate space for my_req, offset, and len */
227 + *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
228 + my_req = *my_req_ptr;
230 + count_my_req_procs = 0;
231 + for (i = 0; i < nprocs; i++) {
232 + if (count_my_req_per_proc[i]) {
233 + my_req[i].offsets = (ADIO_Offset *)
234 + ADIOI_Malloc(count_my_req_per_proc[i] *
235 + sizeof(ADIO_Offset));
236 + my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
238 + count_my_req_procs++;
240 + my_req[i].count = 0; /* will be incremented where needed later */
243 + /* now fill in my_req */
245 + for (i = 0; i < contig_access_count; i++) {
246 + /* short circuit offset/len processing if len == 0
247 + * (zero-byte read/write */
248 + if (len_list[i] == 0)
250 + off = offset_list[i];
251 + avail_len = len_list[i];
252 + proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
254 + /* for each separate contiguous access from this process */
255 + if (buf_idx[proc] == -1)
256 + buf_idx[proc] = (int) curr_idx;
258 + l = my_req[proc].count;
259 + curr_idx += (int) avail_len; /* NOTE: Why is curr_idx an int? Fix? */
261 + rem_len = len_list[i] - avail_len;
263 + /* store the proc, offset, and len information in an array
264 + * of structures, my_req. Each structure contains the
265 + * offsets and lengths located in that process's FD,
266 + * and the associated count.
268 + my_req[proc].offsets[l] = off;
269 + my_req[proc].lens[l] = (int) avail_len;
270 + my_req[proc].count++;
272 + while (rem_len != 0) {
274 + avail_len = rem_len;
275 + proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
277 + if (buf_idx[proc] == -1)
278 + buf_idx[proc] = (int) curr_idx;
280 + l = my_req[proc].count;
281 + curr_idx += avail_len;
282 + rem_len -= avail_len;
284 + my_req[proc].offsets[l] = off;
285 + my_req[proc].lens[l] = (int) avail_len;
286 + my_req[proc].count++;
291 + for (i = 0; i < nprocs; i++) {
292 + if (count_my_req_per_proc[i] > 0) {
293 + FPRINTF(stdout, "data needed from %d (count = %d):\n",
294 + i, my_req[i].count);
295 + for (l = 0; l < my_req[i].count; l++) {
296 + FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n",
297 + l, my_req[i].offsets[l], l, my_req[i].lens[l]);
302 + for (i = 0; i < nprocs; i++) {
303 + FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
308 + *count_my_req_procs_ptr = count_my_req_procs;
309 + *buf_idx_ptr = buf_idx;
312 +int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
313 + int *len_list, int nprocs)
315 + /* If the processes are non-interleaved, we will check the req_size.
316 + * if (avg_req_size > big_req_size) {
321 + int i, docollect = 1, lflag, big_req_size = 0;
322 + ADIO_Offset req_size = 0, total_req_size;
323 + int avg_req_size, total_access_count;
325 + /* calculate total_req_size and total_access_count */
326 + for (i = 0; i < contig_access_count; i++)
327 + req_size += len_list[i];
328 + MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
330 + MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
332 + /* estimate average req_size; if no accesses were counted at all (total_access_count == 0), use 0 instead of dividing by zero */
333 + avg_req_size = (total_access_count > 0) ? (int)(total_req_size / total_access_count) : 0;
334 + /* get hint of big_req_size */
335 + big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
336 + /* Don't perform collective I/O if there are big requests */
337 + if ((big_req_size > 0) && (avg_req_size > big_req_size))
342 diff -ruN adio/ad_lustre_orig/ad_lustre.c adio/ad_lustre/ad_lustre.c
343 --- adio/ad_lustre_orig/ad_lustre.c 2008-09-17 14:36:56.000000000 +0800
344 +++ adio/ad_lustre/ad_lustre.c 2008-10-17 17:03:42.000000000 +0800
346 /* -*- Mode: C; c-basic-offset:4 ; -*- */
348 - * Copyright (C) 2001 University of Chicago.
350 + * Copyright (C) 2001 University of Chicago.
351 * See COPYRIGHT notice in top-level directory.
353 * Copyright (C) 2007 Oak Ridge National Laboratory
355 + * Copyright (C) 2008 Sun Microsystems, Lustre group
358 #include "ad_lustre.h"
360 ADIOI_LUSTRE_ReadContig, /* ReadContig */
361 ADIOI_LUSTRE_WriteContig, /* WriteContig */
362 ADIOI_GEN_ReadStridedColl, /* ReadStridedColl */
363 - ADIOI_GEN_WriteStridedColl, /* WriteStridedColl */
364 + ADIOI_LUSTRE_WriteStridedColl, /* WriteStridedColl */
365 ADIOI_GEN_SeekIndividual, /* SeekIndividual */
366 ADIOI_GEN_Fcntl, /* Fcntl */
367 ADIOI_LUSTRE_SetInfo, /* SetInfo */
368 ADIOI_GEN_ReadStrided, /* ReadStrided */
369 - ADIOI_GEN_WriteStrided, /* WriteStrided */
370 + ADIOI_LUSTRE_WriteStrided, /* WriteStrided */
371 ADIOI_GEN_Close, /* Close */
372 #if defined(ROMIO_HAVE_WORKING_AIO) && !defined(CRAY_XT_LUSTRE)
373 ADIOI_GEN_IreadContig, /* IreadContig */
374 diff -ruN adio/ad_lustre_orig/ad_lustre.h adio/ad_lustre/ad_lustre.h
375 --- adio/ad_lustre_orig/ad_lustre.h 2008-09-17 14:36:56.000000000 +0800
376 +++ adio/ad_lustre/ad_lustre.h 2009-05-05 15:34:58.000000000 +0800
378 /* -*- Mode: C; c-basic-offset:4 ; -*- */
380 - * Copyright (C) 1997 University of Chicago.
382 + * Copyright (C) 1997 University of Chicago.
383 * See COPYRIGHT notice in top-level directory.
385 * Copyright (C) 2007 Oak Ridge National Laboratory
387 + * Copyright (C) 2008 Sun Microsystems, Lustre group
390 #ifndef AD_UNIX_INCLUDE
393 /*#include <fcntl.h>*/
394 #include <sys/ioctl.h>
395 -#include "lustre/lustre_user.h"
396 +#include <lustre/lustre_user.h>
398 /*#include "adioi.h"*/
402 void ADIOI_LUSTRE_Open(ADIO_File fd, int *error_code);
403 void ADIOI_LUSTRE_Close(ADIO_File fd, int *error_code);
404 -void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count,
405 - MPI_Datatype datatype, int file_ptr_type,
406 - ADIO_Offset offset, ADIO_Status *status, int
408 -void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count,
409 - MPI_Datatype datatype, int file_ptr_type,
410 - ADIO_Offset offset, ADIO_Status *status, int
412 +void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count,
413 + MPI_Datatype datatype, int file_ptr_type,
414 + ADIO_Offset offset, ADIO_Status *status,
416 +void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count,
417 + MPI_Datatype datatype, int file_ptr_type,
418 + ADIO_Offset offset, ADIO_Status *status,
420 +void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count,
421 + MPI_Datatype datatype, int file_ptr_type,
422 + ADIO_Offset offset, ADIO_Status *status,
424 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
425 - MPI_Datatype datatype, int file_ptr_type,
426 - ADIO_Offset offset, ADIO_Status *status, int
428 + MPI_Datatype datatype, int file_ptr_type,
429 + ADIO_Offset offset, ADIO_Status *status,
431 void ADIOI_LUSTRE_ReadStridedColl(ADIO_File fd, void *buf, int count,
432 - MPI_Datatype datatype, int file_ptr_type,
433 - ADIO_Offset offset, ADIO_Status *status, int
435 + MPI_Datatype datatype, int file_ptr_type,
436 + ADIO_Offset offset, ADIO_Status *status,
438 +void ADIOI_LUSTRE_ReadStrided(ADIO_File fd, void *buf, int count,
439 + MPI_Datatype datatype, int file_ptr_type,
440 + ADIO_Offset offset, ADIO_Status *status,
442 void ADIOI_LUSTRE_Fcntl(ADIO_File fd, int flag, ADIO_Fcntl_t *fcntl_struct,
444 void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code);
446 #endif /* End of AD_UNIX_INCLUDE */
447 diff -ruN adio/ad_lustre_orig/ad_lustre_hints.c adio/ad_lustre/ad_lustre_hints.c
448 --- adio/ad_lustre_orig/ad_lustre_hints.c 2008-09-17 14:36:56.000000000 +0800
449 +++ adio/ad_lustre/ad_lustre_hints.c 2009-04-24 15:35:05.000000000 +0800
451 /* -*- Mode: C; c-basic-offset:4 ; -*- */
453 - * Copyright (C) 1997 University of Chicago.
455 + * Copyright (C) 1997 University of Chicago.
456 * See COPYRIGHT notice in top-level directory.
458 * Copyright (C) 2007 Oak Ridge National Laboratory
460 + * Copyright (C) 2008 Sun Microsystems, Lustre group
463 #include "ad_lustre.h"
465 void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code)
467 char *value, *value_in_fd;
468 - int flag, tmp_val[3], str_factor=-1, str_unit=0, start_iodev=-1;
469 + int flag, stripe_val[3], str_factor = -1, str_unit=0, start_iodev=-1;
470 struct lov_user_md lum = { 0 };
471 int err, myrank, fd_sys, perm, amode, old_mask;
472 + int int_val, tmp_val;
473 + static char myname[] = "ADIOI_LUSTRE_SETINFO";
475 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
476 if ( (fd->info) == MPI_INFO_NULL) {
477 - /* This must be part of the open call. can set striping parameters
479 + /* This must be part of the open call. can set striping parameters
481 MPI_Info_create(&(fd->info));
483 MPI_Info_set(fd->info, "direct_read", "false");
484 MPI_Info_set(fd->info, "direct_write", "false");
485 fd->direct_read = fd->direct_write = 0;
487 - /* has user specified striping or server buffering parameters
488 + /* initialize lustre hints */
489 + MPI_Info_set(fd->info, "romio_lustre_co_ratio", "1");
490 + fd->hints->fs_hints.lustre.co_ratio = 1;
491 + MPI_Info_set(fd->info, "romio_lustre_coll_threshold", "0");
492 + fd->hints->fs_hints.lustre.coll_threshold = 0;
493 + MPI_Info_set(fd->info, "romio_lustre_ds_in_coll", "enable");
494 + fd->hints->fs_hints.lustre.ds_in_coll = ADIOI_HINT_ENABLE;
496 + /* has user specified striping or server buffering parameters
497 and do they have the same value on all processes? */
498 if (users_info != MPI_INFO_NULL) {
499 - MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL,
500 + /* striping information */
501 + MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL,
505 str_unit=atoi(value);
507 - MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL,
508 + MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL,
512 str_factor=atoi(value);
514 - MPI_Info_get(users_info, "start_iodevice", MPI_MAX_INFO_VAL,
517 + MPI_Info_get(users_info, "romio_lustre_start_iodevice",
518 + MPI_MAX_INFO_VAL, value, &flag);
520 start_iodev=atoi(value);
522 - MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL,
524 + /* direct read and write */
525 + MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL,
527 if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) {
528 MPI_Info_set(fd->info, "direct_read", "true");
532 - MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL,
533 + MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL,
535 if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) {
536 MPI_Info_set(fd->info, "direct_write", "true");
541 + /* set striping information with ioctl */
542 MPI_Comm_rank(fd->comm, &myrank);
544 - tmp_val[0] = str_factor;
545 - tmp_val[1] = str_unit;
546 - tmp_val[2] = start_iodev;
547 + stripe_val[0] = str_factor;
548 + stripe_val[1] = str_unit;
549 + stripe_val[2] = start_iodev;
551 - MPI_Bcast(tmp_val, 3, MPI_INT, 0, fd->comm);
552 + MPI_Bcast(stripe_val, 3, MPI_INT, 0, fd->comm);
554 - if (tmp_val[0] != str_factor
555 - || tmp_val[1] != str_unit
556 - || tmp_val[2] != start_iodev) {
557 + if (stripe_val[0] != str_factor
558 + || stripe_val[1] != str_unit
559 + || stripe_val[2] != start_iodev) {
560 FPRINTF(stderr, "ADIOI_LUSTRE_SetInfo: All keys"
561 "-striping_factor:striping_unit:start_iodevice "
562 "need to be identical across all processes\n");
563 MPI_Abort(MPI_COMM_WORLD, 1);
564 - } else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) {
565 + } else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) {
566 /* if user has specified striping info, process 0 tries to set it */
568 if (fd->perm == ADIO_PERM_NULL) {
570 amode = amode | O_LOV_DELAY_CREATE | O_CREAT;
572 fd_sys = open(fd->filename, amode, perm);
573 - if (fd_sys == -1) {
574 - if (errno != EEXIST)
576 + if (fd_sys == -1) {
577 + if (errno != EEXIST)
579 "Failure to open file %s %d %d\n",strerror(errno), amode, perm);
581 lum.lmm_magic = LOV_USER_MAGIC;
582 @@ -112,25 +125,73 @@
583 lum.lmm_stripe_offset = start_iodev;
585 err = ioctl(fd_sys, LL_IOC_LOV_SETSTRIPE, &lum);
586 - if (err == -1 && errno != EEXIST) {
587 + if (err == -1 && errno != EEXIST) {
588 fprintf(stderr, "Failure to set stripe info %s \n", strerror(errno));
592 } /* End of striping parameters validation */
595 MPI_Barrier(fd->comm);
596 - /* set the values for collective I/O and data sieving parameters */
597 - ADIOI_GEN_SetInfo(fd, users_info, error_code);
599 - /* The file has been opened previously and fd->fd_sys is a valid
600 - file descriptor. cannot set striping parameters now. */
602 - /* set the values for collective I/O and data sieving parameters */
603 - ADIOI_GEN_SetInfo(fd, users_info, error_code);
606 + /* get the other Lustre-specific hints */
607 + if (users_info != MPI_INFO_NULL) {
608 + /* CO: IO Clients/OST,
609 + * to keep the load balancing between clients and OSTs */
610 + MPI_Info_get(users_info, "romio_lustre_co_ratio", MPI_MAX_INFO_VAL, value,
612 + if (flag && (int_val = atoi(value)) > 0) {
614 + MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm);
615 + if (tmp_val != int_val) {
616 + MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
617 + "romio_lustre_co_ratio",
622 + MPI_Info_set(fd->info, "romio_lustre_co_ratio", value);
623 + fd->hints->fs_hints.lustre.co_ratio = atoi(value);
626 + * if the req size is bigger than this, collective IO may not be performed.
628 + MPI_Info_get(users_info, "romio_lustre_coll_threshold", MPI_MAX_INFO_VAL, value,
630 + if (flag && (int_val = atoi(value)) > 0) {
632 + MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm);
633 + if (tmp_val != int_val) {
634 + MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
635 + "romio_lustre_coll_threshold",
640 + MPI_Info_set(fd->info, "romio_lustre_coll_threshold", value);
641 + fd->hints->fs_hints.lustre.coll_threshold = atoi(value);
643 + /* ds_in_coll: disable data sieving in collective IO */
644 + MPI_Info_get(users_info, "romio_lustre_ds_in_coll", MPI_MAX_INFO_VAL,
646 + if (flag && (!strcmp(value, "disable") ||
647 + !strcmp(value, "DISABLE"))) {
648 + tmp_val = int_val = 2;
649 + MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); /* tmp_val is a single int, so the count must be 1 (matches the co_ratio and coll_threshold broadcasts) */
650 + if (tmp_val != int_val) {
651 + MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
652 + "romio_lustre_ds_in_coll",
657 + MPI_Info_set(fd->info, "romio_lustre_ds_in_coll", "disable");
658 + fd->hints->fs_hints.lustre.ds_in_coll = ADIOI_HINT_DISABLE;
661 + /* set the values for collective I/O and data sieving parameters */
662 + ADIOI_GEN_SetInfo(fd, users_info, error_code);
664 if (ADIOI_Direct_read) fd->direct_read = 1;
665 if (ADIOI_Direct_write) fd->direct_write = 1;
667 diff -ruN adio/ad_lustre_orig/ad_lustre_open.c adio/ad_lustre/ad_lustre_open.c
668 --- adio/ad_lustre_orig/ad_lustre_open.c 2008-09-17 14:36:56.000000000 +0800
669 +++ adio/ad_lustre/ad_lustre_open.c 2009-03-01 11:32:32.000000000 +0800
671 /* -*- Mode: C; c-basic-offset:4 ; -*- */
673 - * Copyright (C) 1997 University of Chicago.
675 + * Copyright (C) 1997 University of Chicago.
676 * See COPYRIGHT notice in top-level directory.
678 * Copyright (C) 2007 Oak Ridge National Laboratory
680 + * Copyright (C) 2008 Sun Microsystems, Lustre group
683 #include "ad_lustre.h"
685 err = ioctl(fd->fd_sys, LL_IOC_LOV_GETSTRIPE, (void *) &lum);
688 + fd->hints->striping_unit = lum.lmm_stripe_size;
689 sprintf(value, "%d", lum.lmm_stripe_size);
690 MPI_Info_set(fd->info, "striping_unit", value);
692 + fd->hints->striping_factor = lum.lmm_stripe_count;
693 sprintf(value, "%d", lum.lmm_stripe_count);
694 MPI_Info_set(fd->info, "striping_factor", value);
696 + fd->hints->fs_hints.lustre.start_iodevice = lum.lmm_stripe_offset;
697 sprintf(value, "%d", lum.lmm_stripe_offset);
698 - MPI_Info_set(fd->info, "start_iodevice", value);
699 + MPI_Info_set(fd->info, "romio_lustre_start_iodevice", value);
703 diff -ruN adio/ad_lustre_orig/ad_lustre_rwcontig.c adio/ad_lustre/ad_lustre_rwcontig.c
704 --- adio/ad_lustre_orig/ad_lustre_rwcontig.c 2008-09-17 14:36:56.000000000 +0800
705 +++ adio/ad_lustre/ad_lustre_rwcontig.c 2009-05-05 15:34:29.000000000 +0800
707 /* -*- Mode: C; c-basic-offset:4 ; -*- */
709 - * Copyright (C) 1997 University of Chicago.
711 + * Copyright (C) 1997 University of Chicago.
712 * See COPYRIGHT notice in top-level directory.
714 * Copyright (C) 2007 Oak Ridge National Laboratory
716 + * Copyright (C) 2008 Sun Microsystems, Lustre group
719 #define _XOPEN_SOURCE 600
720 @@ -136,10 +138,23 @@
721 if (err == -1) goto ioerr;
726 +#ifdef ADIOI_MPE_LOGGING
727 + MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
729 err = write(fd->fd_sys, buf, len);
731 +#ifdef ADIOI_MPE_LOGGING
732 + MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
735 +#ifdef ADIOI_MPE_LOGGING
736 + MPE_Log_event(ADIOI_MPE_read_a, 0, NULL);
738 err = read(fd->fd_sys, buf, len);
739 +#ifdef ADIOI_MPE_LOGGING
740 + MPE_Log_event(ADIOI_MPE_read_b, 0, NULL);
744 err = ADIOI_LUSTRE_Directio(fd, buf, len, offset, io_mode);
746 diff -ruN adio/ad_lustre_orig/ad_lustre_wrcoll.c adio/ad_lustre/ad_lustre_wrcoll.c
747 --- adio/ad_lustre_orig/ad_lustre_wrcoll.c 1970-01-01 08:00:00.000000000 +0800
748 +++ adio/ad_lustre/ad_lustre_wrcoll.c 2009-04-24 14:48:34.000000000 +0800
750 +/* -*- Mode: C; c-basic-offset:4 ; -*- */
752 + * Copyright (C) 1997 University of Chicago.
753 + * See COPYRIGHT notice in top-level directory.
755 + * Copyright (C) 2007 Oak Ridge National Laboratory
757 + * Copyright (C) 2008 Sun Microsystems, Lustre group
760 +#include "ad_lustre.h"
761 +#include "adio_extern.h"
763 +/* prototypes of functions used for collective writes only. */
764 +static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
765 + MPI_Datatype datatype, int nprocs,
767 + ADIOI_Access *others_req,
768 + ADIOI_Access *my_req,
769 + ADIO_Offset *offset_list,
771 + int contig_access_count,
772 + int *striping_info,
773 + int *buf_idx, int *error_code);
774 +static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
775 + ADIOI_Flatlist_node *flat_buf,
777 + ADIO_Offset *offset_list,
778 + int *len_list, int *send_size,
779 + MPI_Request *requests,
780 + int *sent_to_proc, int nprocs,
781 + int myrank, int contig_access_count,
782 + int *striping_info,
785 + int *done_to_proc, int iter,
786 + MPI_Aint buftype_extent);
787 +static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
789 + ADIOI_Flatlist_node *flat_buf,
790 + ADIO_Offset *offset_list,
791 + int *len_list, int *send_size,
792 + int *recv_size, ADIO_Offset off,
793 + int size, int *count,
794 + int *start_pos, int *partial_recv,
795 + int *sent_to_proc, int nprocs,
796 + int myrank, int buftype_is_contig,
797 + int contig_access_count,
798 + int *striping_info,
799 + ADIOI_Access *others_req,
802 + int *done_to_proc, int *hole,
803 + int iter, MPI_Aint buftype_extent,
804 + int *buf_idx, int *error_code);
805 +void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
806 + ADIO_Offset *srt_off, int *srt_len, int *start_pos,
807 + int nprocs, int nprocs_recv, int total_elements);
809 +void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
810 + MPI_Datatype datatype,
811 + int file_ptr_type, ADIO_Offset offset,
812 + ADIO_Status *status, int *error_code)
814 + /* Uses a generalized version of the extended two-phase method described
815 + * in "An Extended Two-Phase Method for Accessing Sections of
816 + * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
817 + * Scientific Programming, (5)4:301--317, Winter 1996.
818 + * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
821 + ADIOI_Access *my_req;
822 + /* array of nprocs access structures, one for each other process in
823 + whose file domain part of this process's request lies */
825 + ADIOI_Access *others_req;
826 + /* array of nprocs access structures, one for each other process
827 + whose request is written by this process. */
829 + int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
830 + int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
831 + int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
832 + ADIO_Offset orig_fp, start_offset, end_offset, off;
833 + ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
834 + int *buf_idx = NULL, *len_list = NULL, *striping_info = NULL;
835 + int old_error, tmp_error;
837 + MPI_Comm_size(fd->comm, &nprocs);
838 + MPI_Comm_rank(fd->comm, &myrank);
840 + orig_fp = fd->fp_ind;
842 + /* I/O pattern identification if cb_write isn't disabled */
843 + if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
844 + /* For this process's request, calculate the list of offsets and
845 + lengths in the file and determine the start and end offsets. */
847 + /* Note: end_offset points to the last byte-offset that will be accessed.
848 + * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99
851 + ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
852 + &offset_list, &len_list, &start_offset,
853 + &end_offset, &contig_access_count);
855 + /* each process communicates its start and end offsets to other
856 + * processes. The result is an array each of start and end offsets
857 + * stored in order of process rank.
859 + st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
860 + end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
861 + MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
862 + ADIO_OFFSET, fd->comm);
863 + MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
864 + ADIO_OFFSET, fd->comm);
865 + /* are the accesses of different processes interleaved? */
866 + for (i = 1; i < nprocs; i++)
867 + if ((st_offsets[i] < end_offsets[i-1]) &&
868 + (st_offsets[i] <= end_offsets[i]))
869 + interleave_count++;
870 + /* This is a rudimentary check for interleaving, but should suffice
873 + /* Two typical access patterns can benefit from collective write.
874 + * 1) the processes are interleaved, and
875 + * 2) the req size is small.
877 + if (interleave_count > 0) {
880 + do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
884 + ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
886 + /* Decide if collective I/O should be done */
887 + if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
888 + fd->hints->cb_write == ADIOI_HINT_DISABLE) {
890 + int filerange_is_contig = 0;
892 + /* use independent accesses */
893 + if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
894 + ADIOI_Free(offset_list);
895 + ADIOI_Free(len_list);
896 + ADIOI_Free(st_offsets);
897 + ADIOI_Free(end_offsets);
900 + fd->fp_ind = orig_fp;
901 + ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
902 + if (buftype_is_contig && filetype_is_contig) {
903 + if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
904 + off = fd->disp + (fd->etype_size) * offset;
905 + ADIO_WriteContig(fd, buf, count, datatype,
906 + ADIO_EXPLICIT_OFFSET,
907 + off, status, error_code);
909 + ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
910 + 0, status, error_code);
912 + ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
913 + offset, status, error_code);
918 + /* Get Lustre hints information */
919 + ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
921 + /* calculate what portions of the access requests of this process are
922 + * located in which process
924 + ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
925 + striping_info, nprocs, &count_my_req_procs,
926 + &count_my_req_per_proc, &my_req, &buf_idx);
928 + /* based on everyone's my_req, calculate what requests of other processes
929 + * will be accessed by this process.
930 + * count_others_req_procs = number of processes whose requests (including
931 + * this process itself) will be accessed by this process
932 + * count_others_req_per_proc[i] indicates how many separate contiguous
933 + * requests of proc. i will be accessed by this process.
936 + ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
937 + my_req, nprocs, myrank, &count_others_req_procs,
939 + ADIOI_Free(count_my_req_per_proc);
941 + /* exchange data and write in sizes of no more than stripe_size. */
942 + ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
943 + others_req, my_req,
944 + offset_list, len_list, contig_access_count,
945 + striping_info, buf_idx, error_code);
947 + /* If this collective write is followed by an independent write,
948 + * it's possible to have those subsequent writes on other processes
949 + * race ahead and sneak in before the read-modify-write completes.
950 + * We carry out a collective communication at the end here so no one
951 + * can start independent i/o before collective I/O completes.
953 + * need to do some gymnastics with the error codes so that if something
954 + * went wrong, all processes report error, but if a process has a more
955 + * specific error code, we can still have that process report the
956 + * additional information */
958 + old_error = *error_code;
959 + if (*error_code != MPI_SUCCESS)
960 + *error_code = MPI_ERR_IO;
962 + /* optimization: if only one process performing i/o, we can perform
963 + * a less-expensive Bcast */
964 +#ifdef ADIOI_MPE_LOGGING
965 + MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
967 + if (fd->hints->cb_nodes == 1)
968 + MPI_Bcast(error_code, 1, MPI_INT,
969 + fd->hints->ranklist[0], fd->comm);
971 + tmp_error = *error_code;
972 + MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
973 + MPI_MAX, fd->comm);
975 +#ifdef ADIOI_MPE_LOGGING
976 + MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
979 + if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
980 + *error_code = old_error;
983 + if (!buftype_is_contig)
984 + ADIOI_Delete_flattened(datatype);
986 + /* free all memory allocated for collective I/O */
987 + /* free others_req */
988 + for (i = 0; i < nprocs; i++) {
989 + if (others_req[i].count) {
990 + ADIOI_Free(others_req[i].offsets);
991 + ADIOI_Free(others_req[i].lens);
992 + ADIOI_Free(others_req[i].mem_ptrs);
995 + ADIOI_Free(others_req);
996 + /* free my_req here */
997 + for (i = 0; i < nprocs; i++) {
998 + if (my_req[i].count) {
999 + ADIOI_Free(my_req[i].offsets);
1000 + ADIOI_Free(my_req[i].lens);
1003 + ADIOI_Free(my_req);
1004 + ADIOI_Free(buf_idx);
1005 + ADIOI_Free(offset_list);
1006 + ADIOI_Free(len_list);
1007 + ADIOI_Free(st_offsets);
1008 + ADIOI_Free(end_offsets);
1009 + ADIOI_Free(striping_info);
1011 +#ifdef HAVE_STATUS_SET_BYTES
1013 + int bufsize, size;
1014 + /* Don't set status if it isn't needed */
1015 + MPI_Type_size(datatype, &size);
1016 + bufsize = size * count;
1017 + MPIR_Status_set_bytes(status, datatype, bufsize);
1019 + /* This is a temporary way of filling in status. The right way is to
1020 + * keep track of how much data was actually written during collective I/O.
1024 + fd->fp_sys_posn = -1; /* set it to null. */
1027 +/* If successful, error_code is set to MPI_SUCCESS. Otherwise an error
1028 + * code is created and returned in error_code.
1030 +static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
1031 + MPI_Datatype datatype, int nprocs,
1032 + int myrank, ADIOI_Access *others_req,
1033 + ADIOI_Access *my_req,
1034 + ADIO_Offset *offset_list,
1035 + int *len_list, int contig_access_count,
1036 + int *striping_info, int *buf_idx,
1039 + /* Send data to appropriate processes and write in sizes of no more
1040 + * than lustre stripe_size.
1041 + * The idea is to reduce the amount of extra memory required for
1042 + * collective I/O. If all data were written all at once, which is much
1043 + * easier, it would require temp space more than the size of user_buf,
1044 + * which is often unacceptable. For example, to write a distributed
1045 + * array to a file, where each local array is 8Mbytes, requiring
1046 + * at least another 8Mbytes of temp space is unacceptable.
1049 + int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
1050 + ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
1051 + ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
1052 + ADIO_Offset max_size, step_size = 0;
1053 + int real_size, req_len, send_len;
1054 + int *recv_curr_offlen_ptr, *recv_count, *recv_size;
1055 + int *send_curr_offlen_ptr, *send_size;
1056 + int *partial_recv, *sent_to_proc, *recv_start_pos;
1057 + int *send_buf_idx, *curr_to_proc, *done_to_proc;
1058 + char *write_buf = NULL;
1059 + MPI_Status status;
1060 + ADIOI_Flatlist_node *flat_buf = NULL;
1061 + MPI_Aint buftype_extent;
1062 + int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
1063 + int data_sieving = 0;
1065 + *error_code = MPI_SUCCESS; /* changed below if error */
1066 + /* only I/O errors are currently reported */
1068 + /* calculate the number of writes of stripe size to be done.
1069 + * That gives the no. of communication phases as well.
1071 + * Because we redistribute data in stripe-contiguous pattern for Lustre,
1072 + * each process has the same no. of communication phases.
1075 + for (i = 0; i < nprocs; i++) {
1076 + if (others_req[i].count) {
1077 + st_loc = others_req[i].offsets[0];
1078 + end_loc = others_req[i].offsets[0];
1082 + for (i = 0; i < nprocs; i++) {
1083 + for (j = 0; j < others_req[i].count; j++) {
1084 + st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
1085 + end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
1086 + others_req[i].lens[j] - 1));
1089 + /* this process does no writing. */
1090 + if ((st_loc == -1) && (end_loc == -1))
1092 + MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
1093 + /* avoid min_st_loc being -1 */
1095 + st_loc = max_end_loc;
1096 + MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
1097 + /* align downward */
1098 + min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
1100 + /* Each time, only avail_cb_nodes number of IO clients perform IO,
1101 + * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most,
1102 + * and ntimes=whole_file_portion/step_size
1104 + step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
1105 + max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1);
1107 + write_buf = (char *) ADIOI_Malloc(stripe_size);
1109 + /* calculate the start offset for each iteration */
1110 + off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
1111 + for (m = 0; m < max_ntimes; m ++)
1112 + off_list[m] = max_end_loc;
1113 + for (i = 0; i < nprocs; i++) {
1114 + for (j = 0; j < others_req[i].count; j ++) {
1115 + req_off = others_req[i].offsets[j];
1116 + m = (int)((req_off - min_st_loc) / step_size);
1117 + off_list[m] = ADIOI_MIN(off_list[m], req_off);
1121 + recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
1122 + send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
1123 + /* their use is explained below. calloc initializes to 0. */
1125 + recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1126 + /* to store count of how many off-len pairs per proc are satisfied
1127 + in an iteration. */
1129 + send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1130 + /* total size of data to be sent to each proc. in an iteration.
1131 + Of size nprocs so that I can use MPI_Alltoall later. */
1133 + recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1134 + /* total size of data to be recd. from each proc. in an iteration. */
1136 + sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
1137 + /* amount of data sent to each proc so far. Used in
1138 + ADIOI_LUSTRE_Fill_send_buffer. Initialized to 0 here. */
1140 + send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1141 + curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1142 + done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1143 + /* The above three are used in ADIOI_LUSTRE_Fill_send_buffer */
1145 + recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
1146 + /* used to store the starting value of recv_curr_offlen_ptr[i] in
1149 + ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
1150 + if (!buftype_is_contig) {
1151 + ADIOI_Flatten_datatype(datatype);
1152 + flat_buf = ADIOI_Flatlist;
1153 + while (flat_buf->type != datatype)
1154 + flat_buf = flat_buf->next;
1156 + MPI_Type_extent(datatype, &buftype_extent);
1157 + /* I need to check if there are any outstanding nonblocking writes to
1158 + * the file, which could potentially interfere with the writes taking
1159 + * place in this collective write call. Since this is not likely to be
1160 + * common, let me do the simplest thing possible here: Each process
1161 + * completes all pending nonblocking operations before completing.
1163 + /*ADIOI_Complete_async(error_code);
1164 + if (*error_code != MPI_SUCCESS) return;
1165 + MPI_Barrier(fd->comm);
1168 + iter_st_off = min_st_loc;
1170 + /* Although we have reorganized the data according to OST index,
1171 + * a read-modify-write will be done if there is a hole between the data.
1172 + * For example: if blocksize=60, xfersize=30 and stripe_size=100,
1173 + * then rank0 will collect data [0, 30] and [60, 90] then write. There
1174 + * is a hole in [30, 60], which will cause a read-modify-write in [0, 90].
1176 + * To reduce its impact on the performance, we can disable data sieving
1177 + * by hint "ds_in_coll".
1179 + /* check the hint for data sieving */
1180 + data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
1182 + for (m = 0; m < max_ntimes; m++) {
1183 + /* go through all others_req and my_req to check which will be received
1184 + * and sent in this iteration.
1187 + /* Note that MPI guarantees that displacements in filetypes are in
1188 + monotonically nondecreasing order and that, for writes, the
1189 + filetypes cannot specify overlapping regions in the file. This
1190 + simplifies implementation a bit compared to reads. */
1193 + off = start offset in the file for the data to be written in
1195 + iter_st_off = start offset of this iteration
1196 + real_size = size of data written (bytes) corresponding to off
1197 + max_size = possible maximum size of data written in this iteration
1198 + req_off = offset in the file for a particular contiguous request minus
1199 + what was satisfied in previous iteration
1200 + send_off = offset the request needed by other processes in this iteration
1201 + req_len = size corresponding to req_off
1202 + send_len = size corresponding to send_off
1205 + /* first calculate what should be communicated */
1206 + for (i = 0; i < nprocs; i++)
1207 + recv_count[i] = recv_size[i] = send_size[i] = 0;
1209 + off = off_list[m];
1210 + max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
1211 + real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off,
1212 + end_loc - off + 1);
1214 + for (i = 0; i < nprocs; i++) {
1215 + if (my_req[i].count) {
1216 + for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
1217 + send_off = my_req[i].offsets[j];
1218 + send_len = my_req[i].lens[j];
1219 + if (send_off < iter_st_off + max_size) {
1220 + send_size[i] += send_len;
1225 + send_curr_offlen_ptr[i] = j;
1227 + if (others_req[i].count) {
1228 + recv_start_pos[i] = recv_curr_offlen_ptr[i];
1229 + for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
1230 + req_off = others_req[i].offsets[j];
1231 + req_len = others_req[i].lens[j];
1232 + if (req_off < iter_st_off + max_size) {
1234 + MPI_Address(write_buf + req_off - off,
1235 + &(others_req[i].mem_ptrs[j]));
1236 + recv_size[i] += req_len;
1241 + recv_curr_offlen_ptr[i] = j;
1244 + /* use variable "hole" to pass data_sieving flag into W_Exchange_data */
1245 + hole = data_sieving;
1246 + ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
1247 + len_list, send_size, recv_size, off, real_size,
1248 + recv_count, recv_start_pos, partial_recv,
1249 + sent_to_proc, nprocs, myrank,
1250 + buftype_is_contig, contig_access_count,
1251 + striping_info, others_req, send_buf_idx,
1252 + curr_to_proc, done_to_proc, &hole, m,
1253 + buftype_extent, buf_idx, error_code);
1254 + if (*error_code != MPI_SUCCESS)
1258 + for (i = 0; i < nprocs; i++)
1259 + if (recv_count[i]) {
1264 + /* check whether to do data sieving */
1265 + if(data_sieving == ADIOI_HINT_ENABLE) {
1266 + ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
1267 + ADIO_EXPLICIT_OFFSET, off, &status,
1270 + /* if there is no hole, write the data in a single call;
1271 + * otherwise, write it piecewise in several calls */
1273 + ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
1274 + ADIO_EXPLICIT_OFFSET, off, &status,
1277 + for (i = 0; i < nprocs; i++) {
1278 + if (others_req[i].count) {
1279 + for (j = 0; j < others_req[i].count; j++) {
1280 + if (others_req[i].offsets[j] < off + real_size &&
1281 + others_req[i].offsets[j] >= off) {
1282 + ADIO_WriteContig(fd,
1283 + write_buf + others_req[i].offsets[j] - off,
1284 + others_req[i].lens[j],
1285 + MPI_BYTE, ADIO_EXPLICIT_OFFSET,
1286 + others_req[i].offsets[j], &status,
1288 + if (*error_code != MPI_SUCCESS)
1296 + if (*error_code != MPI_SUCCESS)
1299 + iter_st_off += max_size;
1303 + ADIOI_Free(write_buf);
1304 + ADIOI_Free(recv_curr_offlen_ptr);
1305 + ADIOI_Free(send_curr_offlen_ptr);
1306 + ADIOI_Free(recv_count);
1307 + ADIOI_Free(send_size);
1308 + ADIOI_Free(recv_size);
1309 + ADIOI_Free(sent_to_proc);
1310 + ADIOI_Free(recv_start_pos);
1311 + ADIOI_Free(send_buf_idx);
1312 + ADIOI_Free(curr_to_proc);
1313 + ADIOI_Free(done_to_proc);
1314 + ADIOI_Free(off_list);
1317 +/* Sets error_code to MPI_SUCCESS if successful, or creates an error code
1318 + * in the case of error.
1320 +static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
1322 + ADIOI_Flatlist_node *flat_buf,
1323 + ADIO_Offset *offset_list,
1324 + int *len_list, int *send_size,
1325 + int *recv_size, ADIO_Offset off,
1326 + int size, int *count,
1327 + int *start_pos, int *partial_recv,
1328 + int *sent_to_proc, int nprocs,
1329 + int myrank, int buftype_is_contig,
1330 + int contig_access_count,
1331 + int *striping_info,
1332 + ADIOI_Access *others_req,
1333 + int *send_buf_idx,
1334 + int *curr_to_proc, int *done_to_proc,
1335 + int *hole, int iter,
1336 + MPI_Aint buftype_extent,
1337 + int *buf_idx, int *error_code)
1339 + int i, j, nprocs_recv, nprocs_send, err;
1340 + char **send_buf = NULL;
1341 + MPI_Request *requests, *send_req;
1342 + MPI_Datatype *recv_types;
1343 + MPI_Status *statuses, status;
1344 + int *srt_len, sum, sum_recv;
1345 + ADIO_Offset *srt_off;
1346 + int data_sieving = *hole;
1347 + static char myname[] = "ADIOI_W_EXCHANGE_DATA";
1349 + /* create derived datatypes for recv */
1351 + for (i = 0; i < nprocs; i++)
1355 + recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
1356 + sizeof(MPI_Datatype));
1357 + /* +1 to avoid a 0-size malloc */
1360 + for (i = 0; i < nprocs; i++) {
1361 + if (recv_size[i]) {
1362 + MPI_Type_hindexed(count[i],
1363 + &(others_req[i].lens[start_pos[i]]),
1364 + &(others_req[i].mem_ptrs[start_pos[i]]),
1365 + MPI_BYTE, recv_types + j);
1366 + /* absolute displacements; use MPI_BOTTOM in recv */
1367 + MPI_Type_commit(recv_types + j);
1372 + /* To avoid a read-modify-write,
1373 + * check if there are holes in the data to be written.
1374 + * For this, merge the (sorted) offset lists others_req using a heap-merge.
1378 + for (i = 0; i < nprocs; i++)
1380 + srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset));
1381 + srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int));
1382 + /* +1 to avoid a 0-size malloc */
1384 + ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
1385 + nprocs, nprocs_recv, sum);
1387 + /* check if there are any holes */
1389 + for (i = 0; i < sum - 1; i++) {
1390 + if (srt_off[i] + srt_len[i] < srt_off[i + 1]) {
1395 + /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
1396 + * between aggregation, nominally contiguous regions, and cb_buffer_size
1397 + * should be handled with a read-modify-write (otherwise we will write out
1398 + * more data than we receive from everyone else (inclusive), so override
1403 + for (i = 0; i < nprocs; i++)
1404 + sum_recv += recv_size[i];
1405 + if (size > sum_recv)
1408 + /* check the hint for data sieving */
1409 + if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
1410 + ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
1411 + ADIO_EXPLICIT_OFFSET, off, &status, &err);
1412 + // --BEGIN ERROR HANDLING--
1413 + if (err != MPI_SUCCESS) {
1414 + *error_code = MPIO_Err_create_code(err,
1415 + MPIR_ERR_RECOVERABLE,
1418 + "**ioRMWrdwr", 0);
1419 + ADIOI_Free(recv_types);
1420 + ADIOI_Free(srt_off);
1421 + ADIOI_Free(srt_len);
1424 + // --END ERROR HANDLING--
1426 + ADIOI_Free(srt_off);
1427 + ADIOI_Free(srt_len);
1430 + for (i = 0; i < nprocs; i++)
1434 + if (fd->atomicity) {
1435 + /* bug fix from Wei-keng Liao and Kenin Coloma */
1436 + requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
1437 + sizeof(MPI_Request));
1438 + send_req = requests;
1440 + requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
1441 + sizeof(MPI_Request));
1442 + /* +1 to avoid a 0-size malloc */
1444 + /* post receives */
1446 + for (i = 0; i < nprocs; i++) {
1447 + if (recv_size[i]) {
1448 + MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
1449 + myrank + i + 100 * iter, fd->comm, requests + j);
1453 + send_req = requests + nprocs_recv;
1457 + * if buftype_is_contig, data can be directly sent from
1458 + * user buf at location given by buf_idx. else use send_buf.
1460 + if (buftype_is_contig) {
1462 + for (i = 0; i < nprocs; i++)
1463 + if (send_size[i]) {
1464 + MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
1465 + MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
1468 + buf_idx[i] += send_size[i];
1470 + } else if (nprocs_send) {
1471 + /* buftype is not contig */
1472 + send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
1473 + for (i = 0; i < nprocs; i++)
1475 + send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
1477 + ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
1478 + len_list, send_size, send_req,
1479 + sent_to_proc, nprocs, myrank,
1480 + contig_access_count, striping_info,
1481 + send_buf_idx, curr_to_proc, done_to_proc,
1482 + iter, buftype_extent);
1483 + /* the send is done in ADIOI_LUSTRE_Fill_send_buffer */
1486 + /* bug fix from Wei-keng Liao and Kenin Coloma */
1487 + if (fd->atomicity) {
1489 + for (i = 0; i < nprocs; i++) {
1490 + MPI_Status wkl_status;
1491 + if (recv_size[i]) {
1492 + MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
1493 + myrank + i + 100 * iter, fd->comm, &wkl_status);
1499 + for (i = 0; i < nprocs_recv; i++)
1500 + MPI_Type_free(recv_types + i);
1501 + ADIOI_Free(recv_types);
1503 + /* bug fix from Wei-keng Liao and Kenin Coloma */
1504 + /* +1 to avoid a 0-size malloc */
1505 + if (fd->atomicity) {
1506 + statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
1507 + sizeof(MPI_Status));
1509 + statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
1510 + sizeof(MPI_Status));
1513 +#ifdef NEEDS_MPI_TEST
1515 + if (fd->atomicity) {
1516 + /* bug fix from Wei-keng Liao and Kenin Coloma */
1518 + MPI_Testall(nprocs_send, send_req, &i, statuses);
1521 + MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
1524 + /* bug fix from Wei-keng Liao and Kenin Coloma */
1525 + if (fd->atomicity)
1526 + MPI_Waitall(nprocs_send, send_req, statuses);
1528 + MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
1530 + ADIOI_Free(statuses);
1531 + ADIOI_Free(requests);
1532 + if (!buftype_is_contig && nprocs_send) {
1533 + for (i = 0; i < nprocs; i++)
1535 + ADIOI_Free(send_buf[i]);
1536 + ADIOI_Free(send_buf);
1540 +#define ADIOI_BUF_INCR \
1542 + while (buf_incr) { \
1543 + size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
1544 + user_buf_idx += size_in_buf; \
1545 + flat_buf_sz -= size_in_buf; \
1546 + if (!flat_buf_sz) { \
1547 + if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1549 + flat_buf_idx = 0; \
1552 + user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1553 + n_buftypes*buftype_extent; \
1554 + flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1556 + buf_incr -= size_in_buf; \
1561 +#define ADIOI_BUF_COPY \
1564 + size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
1565 + memcpy(&(send_buf[p][send_buf_idx[p]]), \
1566 + ((char *) buf) + user_buf_idx, size_in_buf); \
1567 + send_buf_idx[p] += size_in_buf; \
1568 + user_buf_idx += size_in_buf; \
1569 + flat_buf_sz -= size_in_buf; \
1570 + if (!flat_buf_sz) { \
1571 + if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1573 + flat_buf_idx = 0; \
1576 + user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1577 + n_buftypes*buftype_extent; \
1578 + flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1580 + size -= size_in_buf; \
1581 + buf_incr -= size_in_buf; \
1586 +static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
1587 + ADIOI_Flatlist_node *flat_buf,
1589 + ADIO_Offset *offset_list,
1590 + int *len_list, int *send_size,
1591 + MPI_Request *requests,
1592 + int *sent_to_proc, int nprocs,
1594 + int contig_access_count,
1595 + int *striping_info,
1596 + int *send_buf_idx,
1597 + int *curr_to_proc,
1598 + int *done_to_proc, int iter,
1599 + MPI_Aint buftype_extent)
1601 + /* this function is only called if buftype is not contig */
1602 + int i, p, flat_buf_idx, size;
1603 + int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
1604 + ADIO_Offset off, len, rem_len, user_buf_idx;
1606 + /* curr_to_proc[p] = amount of data sent to proc. p that has already
1607 + * been accounted for so far
1608 + * done_to_proc[p] = amount of data already sent to proc. p in
1609 + * previous iterations
1610 + * user_buf_idx = current location in user buffer
1611 + * send_buf_idx[p] = current location in send_buf of proc. p
1614 + for (i = 0; i < nprocs; i++) {
1615 + send_buf_idx[i] = curr_to_proc[i] = 0;
1616 + done_to_proc[i] = sent_to_proc[i];
1620 + user_buf_idx = flat_buf->indices[0];
1623 + flat_buf_sz = flat_buf->blocklens[0];
1625 + /* flat_buf_idx = current index into flattened buftype
1626 + * flat_buf_sz = size of current contiguous component in flattened buf
1628 + for (i = 0; i < contig_access_count; i++) {
1629 + off = offset_list[i];
1630 + rem_len = (ADIO_Offset) len_list[i];
1632 + /* this request may span more than one process */
1633 + while (rem_len != 0) {
1635 + /* NOTE: len value is modified by ADIOI_LUSTRE_Calc_aggregator() to be no
1636 + * longer than the single region that processor "p" is responsible
1639 + p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
1641 + if (send_buf_idx[p] < send_size[p]) {
1642 + if (curr_to_proc[p] + len > done_to_proc[p]) {
1643 + if (done_to_proc[p] > curr_to_proc[p]) {
1644 + size = (int) ADIOI_MIN(curr_to_proc[p] + len -
1648 + buf_incr = done_to_proc[p] - curr_to_proc[p];
1650 + buf_incr = (int) (curr_to_proc[p] + len -
1652 + curr_to_proc[p] = done_to_proc[p] + size;
1655 + size = (int) ADIOI_MIN(len, send_size[p] -
1657 + buf_incr = (int) len;
1658 + curr_to_proc[p] += size;
1661 + if (send_buf_idx[p] == send_size[p]) {
1662 + MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
1663 + myrank + p + 100 * iter, fd->comm,
1668 + curr_to_proc[p] += (int) len;
1669 + buf_incr = (int) len;
1673 + buf_incr = (int) len;
1680 + for (i = 0; i < nprocs; i++)
1682 + sent_to_proc[i] = curr_to_proc[i];
1684 diff -ruN adio/ad_lustre_orig/ad_lustre_wrstr.c adio/ad_lustre/ad_lustre_wrstr.c
1685 --- adio/ad_lustre_orig/ad_lustre_wrstr.c 1970-01-01 08:00:00.000000000 +0800
1686 +++ adio/ad_lustre/ad_lustre_wrstr.c 2009-02-27 10:35:18.000000000 +0800
1688 +/* -*- Mode: C; c-basic-offset:4 ; -*- */
1690 + * Copyright (C) 1997 University of Chicago.
1691 + * See COPYRIGHT notice in top-level directory.
1693 + * Copyright (C) 2007 Oak Ridge National Laboratory
1695 + * Copyright (C) 2008 Sun Microsystems, Lustre group
1698 +#include "ad_lustre.h"
1699 +#include "adio_extern.h"
1701 +#define ADIOI_BUFFERED_WRITE \
1703 + if (req_off >= writebuf_off + writebuf_len) { \
1704 + if (writebuf_len) { \
1705 + ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
1706 + ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
1707 + if (!(fd->atomicity)) \
1708 + ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
1709 + if (*error_code != MPI_SUCCESS) { \
1710 + *error_code = MPIO_Err_create_code(*error_code, \
1711 + MPIR_ERR_RECOVERABLE, myname, \
1712 + __LINE__, MPI_ERR_IO, \
1714 + ADIOI_Free(writebuf); \
1718 + writebuf_off = req_off; \
1719 + /* stripe_size alignment */ \
1720 + writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
1721 + (writebuf_off / stripe_size + 1) * \
1722 + stripe_size - writebuf_off);\
1723 + if (!(fd->atomicity)) \
1724 + ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
1725 + ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
1726 + writebuf_off, &status1, error_code); \
1727 + if (*error_code != MPI_SUCCESS) { \
1728 + *error_code = MPIO_Err_create_code(*error_code, \
1729 + MPIR_ERR_RECOVERABLE, myname, \
1730 + __LINE__, MPI_ERR_IO, \
1732 + ADIOI_Free(writebuf); \
1736 + write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
1737 + memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
1738 + while (write_sz != req_len) {\
1739 + ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
1740 + ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
1741 + if (!(fd->atomicity)) \
1742 + ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
1743 + if (*error_code != MPI_SUCCESS) { \
1744 + *error_code = MPIO_Err_create_code(*error_code, \
1745 + MPIR_ERR_RECOVERABLE, myname, \
1746 + __LINE__, MPI_ERR_IO, \
1748 + ADIOI_Free(writebuf); \
1751 + req_len -= write_sz; \
1752 + userbuf_off += write_sz; \
1753 + writebuf_off += writebuf_len; \
1754 + /* stripe_size alignment */ \
1755 + writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
1756 + (writebuf_off / stripe_size + 1) * \
1757 + stripe_size - writebuf_off);\
1758 + if (!(fd->atomicity)) \
1759 + ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
1760 + ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
1761 + writebuf_off, &status1, error_code); \
1762 + if (*error_code != MPI_SUCCESS) { \
1763 + *error_code = MPIO_Err_create_code(*error_code, \
1764 + MPIR_ERR_RECOVERABLE, myname, \
1765 + __LINE__, MPI_ERR_IO, \
1767 + ADIOI_Free(writebuf); \
1770 + write_sz = ADIOI_MIN(req_len, writebuf_len); \
1771 + memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
1776 +/* This macro is used when filetype is contig and buftype is not contig.
1777 + It does not do a read-modify-write and does not lock. */
1778 +#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
1780 + if (req_off >= writebuf_off + writebuf_len) { \
1781 + writebuf_off = req_off; \
1782 + /* stripe_size alignment */ \
1783 + writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
1784 + (writebuf_off / stripe_size + 1) * \
1785 + stripe_size - writebuf_off);\
1787 + write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
1788 + memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
1789 + while (req_len) { \
1790 + ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
1791 + ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
1792 + if (*error_code != MPI_SUCCESS) { \
1793 + *error_code = MPIO_Err_create_code(*error_code, \
1794 + MPIR_ERR_RECOVERABLE, myname, \
1795 + __LINE__, MPI_ERR_IO, \
1797 + ADIOI_Free(writebuf); \
1800 + req_len -= write_sz; \
1801 + userbuf_off += write_sz; \
1802 + writebuf_off += writebuf_len; \
1803 + /* stripe_size alignment */ \
1804 + writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
1805 + (writebuf_off / stripe_size + 1) * \
1806 + stripe_size - writebuf_off);\
1807 + write_sz = ADIOI_MIN(req_len, writebuf_len); \
1808 + memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
1812 +void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count,
1813 + MPI_Datatype datatype, int file_ptr_type,
1814 + ADIO_Offset offset, ADIO_Status * status,
1817 + /* offset is in units of etype relative to the filetype. */
1818 + ADIOI_Flatlist_node *flat_buf, *flat_file;
1819 + int i, j, k, bwr_size, fwr_size = 0, st_index = 0;
1820 + int bufsize, num, size, sum, n_etypes_in_filetype, size_in_filetype;
1821 + int n_filetypes, etype_in_filetype;
1822 + ADIO_Offset abs_off_in_filetype = 0;
1823 + int filetype_size, etype_size, buftype_size, req_len;
1824 + MPI_Aint filetype_extent, buftype_extent;
1825 + int buf_count, buftype_is_contig, filetype_is_contig;
1826 + ADIO_Offset userbuf_off;
1827 + ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off;
1829 + int flag, st_fwr_size, st_n_filetypes, writebuf_len, write_sz;
1830 + ADIO_Status status1;
1831 + int new_bwr_size, new_fwr_size;
1833 + static char myname[] = "ADIOI_LUSTRE_WriteStrided";
1835 + MPI_Comm_rank(fd->comm, &myrank);
1837 + if (fd->hints->ds_write == ADIOI_HINT_DISABLE) {
1838 + /* if user has disabled data sieving on writes, use naive
1839 + * approach instead.
1841 + ADIOI_GEN_WriteStrided_naive(fd,
1846 + offset, status, error_code);
1850 + *error_code = MPI_SUCCESS; /* changed below if error */
1852 + ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
1853 + ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
1855 + MPI_Type_size(fd->filetype, &filetype_size);
1856 + if (!filetype_size) {
1857 + *error_code = MPI_SUCCESS;
1861 + MPI_Type_extent(fd->filetype, &filetype_extent);
1862 + MPI_Type_size(datatype, &buftype_size);
1863 + MPI_Type_extent(datatype, &buftype_extent);
1864 + etype_size = fd->etype_size;
1866 + bufsize = buftype_size * count;
1868 + /* get striping info */
1869 + stripe_size = fd->hints->striping_unit;
1871 + /* Different buftype to different filetype */
1872 + if (!buftype_is_contig && filetype_is_contig) {
1873 + /* noncontiguous in memory, contiguous in file. */
1874 + ADIOI_Flatten_datatype(datatype);
1875 + flat_buf = ADIOI_Flatlist;
1876 + while (flat_buf->type != datatype)
1877 + flat_buf = flat_buf->next;
1879 + off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
1880 + fd->disp + etype_size * offset;
1883 + end_offset = start_off + bufsize - 1;
1884 + writebuf_off = start_off;
1885 + /* write stripe size buffer each time */
1886 + writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
1887 + writebuf_len = (int) ADIOI_MIN(bufsize,
1888 + (writebuf_off / stripe_size + 1) *
1889 + stripe_size - writebuf_off);
1891 + /* if atomicity is true, lock the region to be accessed */
1892 + if (fd->atomicity)
1893 + ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);
1895 + for (j = 0; j < count; j++) {
1896 + for (i = 0; i < flat_buf->count; i++) {
1897 + userbuf_off = j * buftype_extent + flat_buf->indices[i];
1899 + req_len = flat_buf->blocklens[i];
1900 + ADIOI_BUFFERED_WRITE_WITHOUT_READ
1901 + off += flat_buf->blocklens[i];
1905 + /* write the buffer out finally */
1906 + ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
1907 + ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
1910 + if (fd->atomicity)
1911 + ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
1912 + if (*error_code != MPI_SUCCESS) {
1913 + ADIOI_Free(writebuf);
1916 + ADIOI_Free(writebuf);
1917 + if (file_ptr_type == ADIO_INDIVIDUAL)
1920 + /* noncontiguous in file */
1921 + /* filetype already flattened in ADIO_Open */
1922 + flat_file = ADIOI_Flatlist;
1923 + while (flat_file->type != fd->filetype)
1924 + flat_file = flat_file->next;
1927 + if (file_ptr_type == ADIO_INDIVIDUAL) {
1928 + offset = fd->fp_ind; /* in bytes */
1933 + for (i = 0; i < flat_file->count; i++) {
1934 + if (disp + flat_file->indices[i] +
1935 + (ADIO_Offset) n_filetypes * filetype_extent +
1936 + flat_file->blocklens[i] >= offset) {
1938 + fwr_size = (int) (disp + flat_file->indices[i] +
1939 + (ADIO_Offset) n_filetypes *
1941 + flat_file->blocklens[i] -
1949 + n_etypes_in_filetype = filetype_size / etype_size;
1950 + n_filetypes = (int) (offset / n_etypes_in_filetype);
1951 + etype_in_filetype = (int) (offset % n_etypes_in_filetype);
1952 + size_in_filetype = etype_in_filetype * etype_size;
1955 + for (i = 0; i < flat_file->count; i++) {
1956 + sum += flat_file->blocklens[i];
1957 + if (sum > size_in_filetype) {
1959 + fwr_size = sum - size_in_filetype;
1960 + abs_off_in_filetype = flat_file->indices[i] +
1961 + size_in_filetype - (sum - flat_file->blocklens[i]);
1966 + /* abs. offset in bytes in the file */
1967 + offset = disp + (ADIO_Offset) n_filetypes *filetype_extent +
1968 + abs_off_in_filetype;
1971 + start_off = offset;
1973 + /* If the file bytes are actually contiguous, we do not need data sieving at all */
1974 + if (bufsize <= fwr_size) {
1975 + req_off = start_off;
1976 + req_len = bufsize;
1977 + end_offset = start_off + bufsize - 1;
1978 + writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
1979 + memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size));
1983 + ADIOI_BUFFERED_WRITE_WITHOUT_READ
1985 + /* Calculate end_offset, the last byte-offset that will be accessed.
1986 + e.g., if start_offset=0 and 100 bytes are to be written, end_offset=99 */
1987 + st_fwr_size = fwr_size;
1988 + st_n_filetypes = n_filetypes;
1992 + fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
1993 + while (i < bufsize) {
1995 + end_offset = off + fwr_size - 1;
1997 + if (j < (flat_file->count - 1))
2004 + off = disp + flat_file->indices[j] +
2005 + (ADIO_Offset) n_filetypes * filetype_extent;
2006 + fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i);
2011 + writebuf = (char *) ADIOI_Malloc(stripe_size);
2012 + memset(writebuf, -1, stripe_size);
2013 + /* if atomicity is true, lock the region to be accessed */
2014 + if (fd->atomicity)
2015 + ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);
2017 + if (buftype_is_contig && !filetype_is_contig) {
2018 + /* contiguous in memory, noncontiguous in file. should be the most
2023 + n_filetypes = st_n_filetypes;
2024 + fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
2025 + while (i < bufsize) {
2027 + /* TYPE_UB and TYPE_LB can result in
2028 + fwr_size = 0. save system call in such cases */
2030 + lseek(fd->fd_sys, off, SEEK_SET);
2031 + err = write(fd->fd_sys, ((char *) buf) + i, fwr_size);
2034 + req_len = fwr_size;
2036 + ADIOI_BUFFERED_WRITE
2040 + if (off + fwr_size < disp + flat_file->indices[j] +
2041 + flat_file->blocklens[j] +
2042 + (ADIO_Offset) n_filetypes * filetype_extent)
2044 + /* did not reach end of contiguous block in filetype.
2045 + no more I/O needed. off is incremented by fwr_size. */
2047 + if (j < (flat_file->count - 1))
2053 + off = disp + flat_file->indices[j] +
2054 + (ADIO_Offset) n_filetypes * filetype_extent;
2055 + fwr_size = ADIOI_MIN(flat_file->blocklens[j],
2060 + /* noncontiguous in memory as well as in file */
2061 + ADIOI_Flatten_datatype(datatype);
2062 + flat_buf = ADIOI_Flatlist;
2063 + while (flat_buf->type != datatype)
2064 + flat_buf = flat_buf->next;
2066 + k = num = buf_count = 0;
2067 + i = (int) (flat_buf->indices[0]);
2070 + n_filetypes = st_n_filetypes;
2071 + fwr_size = st_fwr_size;
2072 + bwr_size = flat_buf->blocklens[0];
2074 + while (num < bufsize) {
2075 + size = ADIOI_MIN(fwr_size, bwr_size);
2078 + lseek(fd->fd_sys, off, SEEK_SET);
2079 + err = write(fd->fd_sys, ((char *) buf) + i, size);
2084 + ADIOI_BUFFERED_WRITE
2087 + new_fwr_size = fwr_size;
2088 + new_bwr_size = bwr_size;
2090 + if (size == fwr_size) {
2091 + /* reached end of contiguous block in file */
2092 + if (j < (flat_file->count - 1)) {
2098 + off = disp + flat_file->indices[j] +
2099 + (ADIO_Offset) n_filetypes * filetype_extent;
2100 + new_fwr_size = flat_file->blocklens[j];
2101 + if (size != bwr_size) {
2103 + new_bwr_size -= size;
2106 + if (size == bwr_size) {
2107 + /* reached end of contiguous block in memory */
2108 + k = (k + 1) % flat_buf->count;
2110 + i = (int) (buftype_extent *
2111 + (buf_count / flat_buf->count) +
2112 + flat_buf->indices[k]);
2113 + new_bwr_size = flat_buf->blocklens[k];
2114 + if (size != fwr_size) {
2116 + new_fwr_size -= size;
2120 + fwr_size = new_fwr_size;
2121 + bwr_size = new_bwr_size;
2125 + /* write the buffer out finally */
2126 + if (writebuf_len) {
2127 + ADIO_WriteContig(fd, writebuf, writebuf_len,
2128 + MPI_BYTE, ADIO_EXPLICIT_OFFSET,
2129 + writebuf_off, &status1, error_code);
2130 + if (!(fd->atomicity))
2131 + ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len);
2132 + if (*error_code != MPI_SUCCESS) {
2133 + ADIOI_Free(writebuf);
2137 + if (fd->atomicity)
2138 + ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
2140 + ADIOI_Free(writebuf);
2141 + if (file_ptr_type == ADIO_INDIVIDUAL)
2144 + fd->fp_sys_posn = -1; /* set it to null. */
2146 +#ifdef HAVE_STATUS_SET_BYTES
2147 + MPIR_Status_set_bytes(status, datatype, bufsize);
2148 + /* This is a temporary way of filling in status. The right way is to
2149 + keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */
2152 + if (!buftype_is_contig)
2153 + ADIOI_Delete_flattened(datatype);
2155 diff -ruN adio/ad_lustre_orig/Makefile.in adio/ad_lustre/Makefile.in
2156 --- adio/ad_lustre_orig/Makefile.in 2008-09-17 14:36:56.000000000 +0800
2157 +++ adio/ad_lustre/Makefile.in 2008-10-17 17:03:06.000000000 +0800
2161 AD_LUSTRE_OBJECTS = ad_lustre.o ad_lustre_open.o \
2162 - ad_lustre_rwcontig.o ad_lustre_hints.o
2163 + ad_lustre_rwcontig.o ad_lustre_wrcoll.o ad_lustre_wrstr.o \
2164 + ad_lustre_hints.o ad_lustre_aggregate.o
2168 @if [ "@ENABLE_SHLIB@" != "none" ] ; then \
2169 diff -ruN adio/ad_lustre_orig/README adio/ad_lustre/README
2170 --- adio/ad_lustre_orig/README 2008-09-17 14:36:56.000000000 +0800
2171 +++ adio/ad_lustre/README 2009-04-24 09:46:20.000000000 +0800
2173 o To post the code for ParColl (Partitioned collective IO)
2175 -----------------------------------------------------
2177 +-----------------------------------------------------
2178 +Improved data redistribution
2179 + o Improve I/O pattern identification. Besides checking interleaving,
2180 + if the requested I/O size is small, collective I/O will be performed.
2181 + The hint bigsize can be used to define the request size value.
2182 + o Provide hint CO for load balancing to control the number of
2183 + IO clients for each OST
2184 + o Produce stripe-contiguous I/O pattern that Lustre prefers
2185 + o Control read-modify-write in data sieving in collective IO
2186 + by hint ds_in_coll.
2187 + o Reduce extent lock conflicts by having each OST accessed by one or
2188 + more constant clients.
2190 +-----------------------------------------------------
2192 -----------------------------------------------------
2193 o Direct IO and Lockless IO support
2194 --- adio/common/ad_write_coll_orig.c 2009-02-27 22:06:46.000000000 +0800
2195 +++ adio/common/ad_write_coll.c 2008-10-15 11:25:38.000000000 +0800
2197 int *send_buf_idx, int *curr_to_proc,
2198 int *done_to_proc, int iter,
2199 MPI_Aint buftype_extent);
2200 -static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
2201 +void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
2202 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
2203 int nprocs, int nprocs_recv, int total_elements);
2209 -static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
2210 +void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
2211 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
2212 int nprocs, int nprocs_recv, int total_elements)