--- configure_orig.in 2009-03-01 13:50:30.000000000 +0800 +++ configure.in 2009-02-27 13:35:42.000000000 +0800 @@ -1123,8 +1123,14 @@ if test -n "$file_system_testfs"; then AC_DEFINE(ROMIO_TESTFS,1,[Define for ROMIO with TESTFS]) fi +# +# Verify presence of lustre/lustre_user.h +# if test -n "$file_system_lustre"; then - AC_DEFINE(ROMIO_LUSTRE,1,[Define for ROMIO with LUSTRE]) + AC_CHECK_HEADERS(lustre/lustre_user.h, + AC_DEFINE(ROMIO_LUSTRE,1,[Define for ROMIO with LUSTRE]), + AC_MSG_ERROR([LUSTRE support requested but cannot find lustre/lustre_user.h header file]) + ) fi if test -n "$file_system_xfs"; then --- adio/include/adioi_orig.h 2009-03-01 14:00:48.000000000 +0800 +++ adio/include/adioi.h 2009-04-24 15:26:44.000000000 +0800 @@ -52,6 +52,12 @@ struct { int debugmask; } pvfs2; + struct { + int start_iodevice; + int co_ratio; + int coll_threshold; + int ds_in_coll; + } lustre; } fs_hints; }; diff -ruN adio/ad_lustre_orig/ad_lustre_aggregate.c adio/ad_lustre/ad_lustre_aggregate.c --- adio/ad_lustre_orig/ad_lustre_aggregate.c 1970-01-01 08:00:00.000000000 +0800 +++ adio/ad_lustre/ad_lustre_aggregate.c 2009-05-05 15:22:40.000000000 +0800 @@ -0,0 +1,304 @@ +/* -*- Mode: C; c-basic-offset:4 ; -*- */ +/* + * Copyright (C) 1997 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory + * + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. 
+ */
+
+#include "ad_lustre.h"
+#include "adio_extern.h"
+
+#undef AGG_DEBUG
+
+/* Compute the striping parameters used by the Lustre collective code and
+ * return them in a newly allocated 3-int array stored in *striping_info_ptr
+ * (the caller frees it).  mode == 0 selects the read policy, else write. */
+void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
+                                    int mode)
+{
+    int *striping_info = NULL;
+    /* layout of the returned array:
+     *  striping_info[0]: stripe_size
+     *  striping_info[1]: stripe_count
+     *  striping_info[2]: avail_cb_nodes
+     */
+    int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes;
+    int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
+
+    /* Get hints value */
+    /* stripe size */
+    stripe_size = fd->hints->striping_unit;
+    /* stripe count */
+    /* NOTE(review): both values are assumed positive here (set in ADIOI_LUSTRE_Open()) */
+    stripe_count = fd->hints->striping_factor;
+
+    /* Calculate the available number of I/O clients, that is
+     *  avail_cb_nodes=min(cb_nodes, stripe_count*CO), where
+     *  CO=1 by default
+     */
+    if (!mode) {
+        /* for collective read,
+         * if "CO" clients access the same OST simultaneously,
+         * the OST disk seek time would be much. So, to avoid this,
+         * it might be better if 1 client only accesses 1 OST.
+         * So, we set CO = 1 to meet the above requirement.
+         */
+        CO = 1;
+        /*XXX: maybe there are other better way for collective read */
+    } else {
+        /* CO_max: the largest number of IO clients for each ost group */
+        CO_max = (nprocs_for_coll - 1)/ stripe_count + 1;
+        /* CO is expected to be > 0 (ADIOI_LUSTRE_SetInfo() only stores positive values) */
+        CO = fd->hints->fs_hints.lustre.co_ratio;
+        CO = ADIOI_MIN(CO_max, CO);
+    }
+    /* Calculate how many IO clients we need */
+    /* To avoid extent lock conflicts,
+     * avail_cb_nodes should divide (stripe_count*CO) exactly,
+     * so that each OST is always accessed by the same fixed set of clients. */
+    CO_nodes = stripe_count * CO;
+    avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, CO_nodes);
+    if (avail_cb_nodes < CO_nodes) {
+        do {
+            /* strip the smallest divisor (> 1) off CO_nodes */
+            divisor = 1;
+            do {
+                divisor ++;
+            } while (CO_nodes % divisor);
+            CO_nodes = CO_nodes / divisor;
+            /* if stripe_count*CO is a prime number, change nothing */
+            if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) {
+                avail_cb_nodes = CO_nodes;
+                break;
+            }
+        } while (CO_nodes != 1);
+    }
+
+    *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
+    striping_info = *striping_info_ptr;
+    striping_info[0] = stripe_size;
+    striping_info[1] = stripe_count;
+    striping_info[2] = avail_cb_nodes;
+}
+
+int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
+                                 ADIO_Offset *len, int *striping_info)
+{
+    int rank_index, rank;
+    ADIO_Offset avail_bytes;
+    int stripe_size = striping_info[0];
+    int avail_cb_nodes = striping_info[2];
+
+    /* Stripes are dealt round-robin to the avail_cb_nodes aggregators */
+    rank_index = (int)((off / stripe_size) % avail_cb_nodes);
+
+    /* rank_index is used below to index fd->hints->ranklist, which is
+     * presumably fd->hints->cb_nodes entries long.  If we ever violate that,
+     * we're overrunning arrays.  Obviously, we should never ever hit this abort
+     */
+    if (rank_index >= fd->hints->cb_nodes)
+        MPI_Abort(MPI_COMM_WORLD, 1);
+
+    avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
+                  (ADIO_Offset)stripe_size - off;
+    if (avail_bytes < *len) {
+        /* the stripe holding off ends before off + *len: shrink *len to fit */
+        *len = avail_bytes;
+    }
+    /* map our index to a rank */
+    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
+    rank = fd->hints->ranklist[rank_index];
+
+    return rank;
+}
+
+/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
+ * of this process are located in the file domains of various processes
+ * (including this one)
+ */
+void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
+                              int *len_list, int contig_access_count,
+                              int *striping_info, int nprocs,
+                              int *count_my_req_procs_ptr,
+                              int **count_my_req_per_proc_ptr,
+                              ADIOI_Access **my_req_ptr,
+                              int **buf_idx_ptr)
+{
+    /* Nothing different from ADIOI_Calc_my_req(), except calling
+     * ADIOI_LUSTRE_Calc_aggregator() instead of the old one */
+    int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
+    int i, l, proc;
+    ADIO_Offset avail_len, rem_len, curr_idx, off;
+    ADIOI_Access *my_req;
+
+    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
+    count_my_req_per_proc = *count_my_req_per_proc_ptr;
+    /* count_my_req_per_proc[i] gives the no. of contig. requests of this
+     * process in process i's file domain. calloc initializes to zero.
+     * I'm allocating memory of size nprocs, so that I can do an
+     * MPI_Alltoall later on.
+     */
+
+    buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* buf_idx is relevant only if buftype_is_contig.
+     * buf_idx[i] gives the index into user_buf where data received
+     * from proc. i should be placed. This allows receives to be done
+     * without extra buffer. This can't be done if buftype is not contig.
+     */
+
+    /* initialize buf_idx to -1 */
+    for (i = 0; i < nprocs; i++)
+        buf_idx[i] = -1;
+
+    /* one pass just to calculate how much space to allocate for my_req;
+     * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
+     */
+    for (i = 0; i < contig_access_count; i++) {
+        /* short circuit offset/len processing if len == 0
+         * (zero-byte read/write)
+         */
+        if (len_list[i] == 0)
+            continue;
+        off = offset_list[i];
+        avail_len = len_list[i];
+        /* note: we set avail_len to be the total size of the access.
+         * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return
+         * the amount that was available.
+         */
+        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
+        count_my_req_per_proc[proc]++;
+
+        /* figure out how much data is remaining in the access
+         * we'll take care of this data (if there is any)
+         * in the while loop below.
+         */
+        rem_len = len_list[i] - avail_len;
+
+        while (rem_len != 0) {
+            off += avail_len; /* point to first remaining byte */
+            avail_len = rem_len; /* save remaining size, pass to calc */
+            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
+            count_my_req_per_proc[proc]++;
+            rem_len -= avail_len; /* reduce remaining length by amount from fd */
+        }
+    }
+
+    /* now allocate space for my_req, offset, and len */
+    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
+    my_req = *my_req_ptr;
+
+    count_my_req_procs = 0;
+    for (i = 0; i < nprocs; i++) {
+        if (count_my_req_per_proc[i]) {
+            my_req[i].offsets = (ADIO_Offset *)
+                                ADIOI_Malloc(count_my_req_per_proc[i] *
+                                             sizeof(ADIO_Offset));
+            my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
+                                                  sizeof(int));
+            count_my_req_procs++;
+        }
+        my_req[i].count = 0; /* will be incremented where needed later */
+    }
+
+    /* now fill in my_req */
+    curr_idx = 0;
+    for (i = 0; i < contig_access_count; i++) {
+        /* short circuit offset/len processing if len == 0
+         * (zero-byte read/write) */
+        if (len_list[i] == 0)
+            continue;
+        off = offset_list[i];
+        avail_len = len_list[i];
+        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
+
+        /* for each separate contiguous access from this process */
+        if (buf_idx[proc] == -1)
+            buf_idx[proc] = (int) curr_idx;
+
+        l = my_req[proc].count;
+        curr_idx += (int) avail_len; /* NOTE: curr_idx is an ADIO_Offset but buf_idx[] stores it as int */
+
+        rem_len = len_list[i] - avail_len;
+
+        /* store the proc, offset, and len information in an array
+         * of structures, my_req. Each structure contains the
+         * offsets and lengths located in that process's FD,
+         * and the associated count.
+         */
+        my_req[proc].offsets[l] = off;
+        my_req[proc].lens[l] = (int) avail_len;
+        my_req[proc].count++;
+
+        while (rem_len != 0) {
+            off += avail_len;
+            avail_len = rem_len;
+            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
+                                                striping_info);
+            if (buf_idx[proc] == -1)
+                buf_idx[proc] = (int) curr_idx;
+
+            l = my_req[proc].count;
+            curr_idx += avail_len;
+            rem_len -= avail_len;
+
+            my_req[proc].offsets[l] = off;
+            my_req[proc].lens[l] = (int) avail_len;
+            my_req[proc].count++;
+        }
+    }
+
+#ifdef AGG_DEBUG
+    for (i = 0; i < nprocs; i++) {
+        if (count_my_req_per_proc[i] > 0) {
+            FPRINTF(stdout, "data needed from %d (count = %d):\n",
+                            i, my_req[i].count);
+            for (l = 0; l < my_req[i].count; l++) {
+                FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n",
+                                l, my_req[i].offsets[l], l, my_req[i].lens[l]);
+            }
+        }
+    }
+#if 0
+    for (i = 0; i < nprocs; i++) {
+        FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
+    }
+#endif
+#endif
+
+    *count_my_req_procs_ptr = count_my_req_procs;
+    *buf_idx_ptr = buf_idx;
+}
+
+int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
+                           int *len_list, int nprocs)
+{
+    /* Decide whether collective I/O is worthwhile when the processes are not
+     * interleaved.  Returns 1 (do collective I/O) unless the
+     * romio_lustre_coll_threshold hint is positive and the average request
+     * size over all processes exceeds it, in which case it returns 0.
+     * Collective over fd->comm: every process must call it.  nprocs is unused. */
+
+    int i, docollect = 1, big_req_size = 0;
+    ADIO_Offset req_size = 0, total_req_size;
+    int avg_req_size, total_access_count;
+
+    /* calculate total_req_size and total_access_count */
+    for (i = 0; i < contig_access_count; i++)
+        req_size += len_list[i];
+    MPI_Allreduce(&req_size, &total_req_size, 1, ADIO_OFFSET, MPI_SUM,
+                  fd->comm);
+    MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
+                  fd->comm);
+    /* estimate average req_size; with no accesses anywhere there is nothing to divide */
+    avg_req_size = total_access_count ? (int)(total_req_size / total_access_count) : 0;
+    /* get hint of big_req_size */
+    big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
+    /* Don't perform collective I/O if there are big requests */
+    if ((big_req_size > 0) && (avg_req_size > big_req_size))
+        docollect = 0;
+
+    return docollect;
+}
diff -ruN adio/ad_lustre_orig/ad_lustre.c adio/ad_lustre/ad_lustre.c
--- adio/ad_lustre_orig/ad_lustre.c 2008-09-17 14:36:56.000000000 +0800
+++ adio/ad_lustre/ad_lustre.c 2008-10-17 17:03:42.000000000 +0800
@@ -1,9 +1,11 @@
 /* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * Copyright (C) 2001 University of Chicago.
+/*
+ * Copyright (C) 2001 University of Chicago.
  * See COPYRIGHT notice in top-level directory.
  *
  * Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
*/ #include "ad_lustre.h" @@ -13,12 +15,12 @@ ADIOI_LUSTRE_ReadContig, /* ReadContig */ ADIOI_LUSTRE_WriteContig, /* WriteContig */ ADIOI_GEN_ReadStridedColl, /* ReadStridedColl */ - ADIOI_GEN_WriteStridedColl, /* WriteStridedColl */ + ADIOI_LUSTRE_WriteStridedColl, /* WriteStridedColl */ ADIOI_GEN_SeekIndividual, /* SeekIndividual */ ADIOI_GEN_Fcntl, /* Fcntl */ ADIOI_LUSTRE_SetInfo, /* SetInfo */ ADIOI_GEN_ReadStrided, /* ReadStrided */ - ADIOI_GEN_WriteStrided, /* WriteStrided */ + ADIOI_LUSTRE_WriteStrided, /* WriteStrided */ ADIOI_GEN_Close, /* Close */ #if defined(ROMIO_HAVE_WORKING_AIO) && !defined(CRAY_XT_LUSTRE) ADIOI_GEN_IreadContig, /* IreadContig */ diff -ruN adio/ad_lustre_orig/ad_lustre.h adio/ad_lustre/ad_lustre.h --- adio/ad_lustre_orig/ad_lustre.h 2008-09-17 14:36:56.000000000 +0800 +++ adio/ad_lustre/ad_lustre.h 2009-05-05 15:34:58.000000000 +0800 @@ -1,9 +1,11 @@ /* -*- Mode: C; c-basic-offset:4 ; -*- */ -/* - * Copyright (C) 1997 University of Chicago. +/* + * Copyright (C) 1997 University of Chicago. * See COPYRIGHT notice in top-level directory. * * Copyright (C) 2007 Oak Ridge National Laboratory + * + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. 
*/ #ifndef AD_UNIX_INCLUDE @@ -24,7 +26,7 @@ /*#include */ #include -#include "lustre/lustre_user.h" +#include #include "adio.h" /*#include "adioi.h"*/ @@ -41,24 +43,31 @@ void ADIOI_LUSTRE_Open(ADIO_File fd, int *error_code); void ADIOI_LUSTRE_Close(ADIO_File fd, int *error_code); -void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count, - MPI_Datatype datatype, int file_ptr_type, - ADIO_Offset offset, ADIO_Status *status, int - *error_code); -void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count, - MPI_Datatype datatype, int file_ptr_type, - ADIO_Offset offset, ADIO_Status *status, int - *error_code); +void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count, + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status *status, + int *error_code); +void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count, + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status *status, + int *error_code); +void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count, + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status *status, + int *error_code); void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count, - MPI_Datatype datatype, int file_ptr_type, - ADIO_Offset offset, ADIO_Status *status, int - *error_code); + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status *status, + int *error_code); void ADIOI_LUSTRE_ReadStridedColl(ADIO_File fd, void *buf, int count, - MPI_Datatype datatype, int file_ptr_type, - ADIO_Offset offset, ADIO_Status *status, int - *error_code); + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status *status, + int *error_code); +void ADIOI_LUSTRE_ReadStrided(ADIO_File fd, void *buf, int count, + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status *status, + int *error_code); void ADIOI_LUSTRE_Fcntl(ADIO_File fd, int flag, ADIO_Fcntl_t *fcntl_struct, int *error_code); void 
ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code); - #endif /* End of AD_UNIX_INCLUDE */ diff -ruN adio/ad_lustre_orig/ad_lustre_hints.c adio/ad_lustre/ad_lustre_hints.c --- adio/ad_lustre_orig/ad_lustre_hints.c 2008-09-17 14:36:56.000000000 +0800 +++ adio/ad_lustre/ad_lustre_hints.c 2009-04-24 15:35:05.000000000 +0800 @@ -1,9 +1,11 @@ /* -*- Mode: C; c-basic-offset:4 ; -*- */ -/* - * Copyright (C) 1997 University of Chicago. +/* + * Copyright (C) 1997 University of Chicago. * See COPYRIGHT notice in top-level directory. * * Copyright (C) 2007 Oak Ridge National Laboratory + * + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. */ #include "ad_lustre.h" @@ -12,46 +14,56 @@ void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code) { char *value, *value_in_fd; - int flag, tmp_val[3], str_factor=-1, str_unit=0, start_iodev=-1; + int flag, stripe_val[3], str_factor = -1, str_unit=0, start_iodev=-1; struct lov_user_md lum = { 0 }; int err, myrank, fd_sys, perm, amode, old_mask; + int int_val, tmp_val; + static char myname[] = "ADIOI_LUSTRE_SETINFO"; value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char)); if ( (fd->info) == MPI_INFO_NULL) { - /* This must be part of the open call. can set striping parameters - if necessary. */ + /* This must be part of the open call. can set striping parameters + if necessary. 
*/ MPI_Info_create(&(fd->info)); MPI_Info_set(fd->info, "direct_read", "false"); MPI_Info_set(fd->info, "direct_write", "false"); fd->direct_read = fd->direct_write = 0; - - /* has user specified striping or server buffering parameters + /* initialize lustre hints */ + MPI_Info_set(fd->info, "romio_lustre_co_ratio", "1"); + fd->hints->fs_hints.lustre.co_ratio = 1; + MPI_Info_set(fd->info, "romio_lustre_coll_threshold", "0"); + fd->hints->fs_hints.lustre.coll_threshold = 0; + MPI_Info_set(fd->info, "romio_lustre_ds_in_coll", "enable"); + fd->hints->fs_hints.lustre.ds_in_coll = ADIOI_HINT_ENABLE; + + /* has user specified striping or server buffering parameters and do they have the same value on all processes? */ if (users_info != MPI_INFO_NULL) { - MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL, + /* striping information */ + MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL, value, &flag); - if (flag) + if (flag) str_unit=atoi(value); - MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL, + MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL, value, &flag); - if (flag) + if (flag) str_factor=atoi(value); - MPI_Info_get(users_info, "start_iodevice", MPI_MAX_INFO_VAL, - value, &flag); - if (flag) + MPI_Info_get(users_info, "romio_lustre_start_iodevice", + MPI_MAX_INFO_VAL, value, &flag); + if (flag) start_iodev=atoi(value); - MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL, - value, &flag); + /* direct read and write */ + MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL, + value, &flag); if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) { MPI_Info_set(fd->info, "direct_read", "true"); fd->direct_read = 1; } - - MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL, + MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL, value, &flag); if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) { MPI_Info_set(fd->info, "direct_write", "true"); @@ -59,22 +71,23 @@ } } + /* set striping 
information with ioctl */ MPI_Comm_rank(fd->comm, &myrank); if (myrank == 0) { - tmp_val[0] = str_factor; - tmp_val[1] = str_unit; - tmp_val[2] = start_iodev; + stripe_val[0] = str_factor; + stripe_val[1] = str_unit; + stripe_val[2] = start_iodev; } - MPI_Bcast(tmp_val, 3, MPI_INT, 0, fd->comm); + MPI_Bcast(stripe_val, 3, MPI_INT, 0, fd->comm); - if (tmp_val[0] != str_factor - || tmp_val[1] != str_unit - || tmp_val[2] != start_iodev) { + if (stripe_val[0] != str_factor + || stripe_val[1] != str_unit + || stripe_val[2] != start_iodev) { FPRINTF(stderr, "ADIOI_LUSTRE_SetInfo: All keys" "-striping_factor:striping_unit:start_iodevice " "need to be identical across all processes\n"); MPI_Abort(MPI_COMM_WORLD, 1); - } else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) { + } else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) { /* if user has specified striping info, process 0 tries to set it */ if (!myrank) { if (fd->perm == ADIO_PERM_NULL) { @@ -100,9 +113,9 @@ amode = amode | O_LOV_DELAY_CREATE | O_CREAT; fd_sys = open(fd->filename, amode, perm); - if (fd_sys == -1) { - if (errno != EEXIST) - fprintf(stderr, + if (fd_sys == -1) { + if (errno != EEXIST) + fprintf(stderr, "Failure to open file %s %d %d\n",strerror(errno), amode, perm); } else { lum.lmm_magic = LOV_USER_MAGIC; @@ -112,25 +125,73 @@ lum.lmm_stripe_offset = start_iodev; err = ioctl(fd_sys, LL_IOC_LOV_SETSTRIPE, &lum); - if (err == -1 && errno != EEXIST) { + if (err == -1 && errno != EEXIST) { fprintf(stderr, "Failure to set stripe info %s \n", strerror(errno)); } close(fd_sys); } } /* End of striping parameters validation */ } - MPI_Barrier(fd->comm); - /* set the values for collective I/O and data sieving parameters */ - ADIOI_GEN_SetInfo(fd, users_info, error_code); - } else { - /* The file has been opened previously and fd->fd_sys is a valid - file descriptor. cannot set striping parameters now. 
*/ - - /* set the values for collective I/O and data sieving parameters */ - ADIOI_GEN_SetInfo(fd, users_info, error_code); } - + /* get other hint */ + if (users_info != MPI_INFO_NULL) { + /* CO: IO Clients/OST, + * to keep the load balancing between clients and OSTs */ + MPI_Info_get(users_info, "romio_lustre_co_ratio", MPI_MAX_INFO_VAL, value, + &flag); + if (flag && (int_val = atoi(value)) > 0) { + tmp_val = int_val; + MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); + if (tmp_val != int_val) { + MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, + "romio_lustre_co_ratio", + error_code); + ADIOI_Free(value); + return; + } + MPI_Info_set(fd->info, "romio_lustre_co_ratio", value); + fd->hints->fs_hints.lustre.co_ratio = atoi(value); + } + /* coll_threshold: + * if the req size is bigger than this, collective IO may not be performed. + */ + MPI_Info_get(users_info, "romio_lustre_coll_threshold", MPI_MAX_INFO_VAL, value, + &flag); + if (flag && (int_val = atoi(value)) > 0) { + tmp_val = int_val; + MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); + if (tmp_val != int_val) { + MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, + "romio_lustre_coll_threshold", + error_code); + ADIOI_Free(value); + return; + } + MPI_Info_set(fd->info, "romio_lustre_coll_threshold", value); + fd->hints->fs_hints.lustre.coll_threshold = atoi(value); + } + /* ds_in_coll: disable data sieving in collective IO */ + MPI_Info_get(users_info, "romio_lustre_ds_in_coll", MPI_MAX_INFO_VAL, + value, &flag); + if (flag && (!strcmp(value, "disable") || + !strcmp(value, "DISABLE"))) { + tmp_val = int_val = 2; + MPI_Bcast(&tmp_val, 2, MPI_INT, 0, fd->comm); + if (tmp_val != int_val) { + MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, + "romio_lustre_ds_in_coll", + error_code); + ADIOI_Free(value); + return; + } + MPI_Info_set(fd->info, "romio_lustre_ds_in_coll", "disable"); + fd->hints->fs_hints.lustre.ds_in_coll = ADIOI_HINT_DISABLE; + } + } + /* set the values for collective I/O and data sieving parameters */ + 
ADIOI_GEN_SetInfo(fd, users_info, error_code); + if (ADIOI_Direct_read) fd->direct_read = 1; if (ADIOI_Direct_write) fd->direct_write = 1; diff -ruN adio/ad_lustre_orig/ad_lustre_open.c adio/ad_lustre/ad_lustre_open.c --- adio/ad_lustre_orig/ad_lustre_open.c 2008-09-17 14:36:56.000000000 +0800 +++ adio/ad_lustre/ad_lustre_open.c 2009-03-01 11:32:32.000000000 +0800 @@ -1,9 +1,11 @@ /* -*- Mode: C; c-basic-offset:4 ; -*- */ -/* - * Copyright (C) 1997 University of Chicago. +/* + * Copyright (C) 1997 University of Chicago. * See COPYRIGHT notice in top-level directory. * * Copyright (C) 2007 Oak Ridge National Laboratory + * + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. */ #include "ad_lustre.h" @@ -51,14 +53,17 @@ err = ioctl(fd->fd_sys, LL_IOC_LOV_GETSTRIPE, (void *) &lum); if (!err) { + fd->hints->striping_unit = lum.lmm_stripe_size; sprintf(value, "%d", lum.lmm_stripe_size); MPI_Info_set(fd->info, "striping_unit", value); + fd->hints->striping_factor = lum.lmm_stripe_count; sprintf(value, "%d", lum.lmm_stripe_count); MPI_Info_set(fd->info, "striping_factor", value); + fd->hints->fs_hints.lustre.start_iodevice = lum.lmm_stripe_offset; sprintf(value, "%d", lum.lmm_stripe_offset); - MPI_Info_set(fd->info, "start_iodevice", value); + MPI_Info_set(fd->info, "romio_lustre_start_iodevice", value); } ADIOI_Free(value); diff -ruN adio/ad_lustre_orig/ad_lustre_rwcontig.c adio/ad_lustre/ad_lustre_rwcontig.c --- adio/ad_lustre_orig/ad_lustre_rwcontig.c 2008-09-17 14:36:56.000000000 +0800 +++ adio/ad_lustre/ad_lustre_rwcontig.c 2009-05-05 15:34:29.000000000 +0800 @@ -1,9 +1,11 @@ /* -*- Mode: C; c-basic-offset:4 ; -*- */ -/* - * Copyright (C) 1997 University of Chicago. +/* + * Copyright (C) 1997 University of Chicago. * See COPYRIGHT notice in top-level directory. * * Copyright (C) 2007 Oak Ridge National Laboratory + * + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. 
*/ #define _XOPEN_SOURCE 600 @@ -136,10 +138,23 @@ if (err == -1) goto ioerr; } - if (io_mode) + if (io_mode) { +#ifdef ADIOI_MPE_LOGGING + MPE_Log_event(ADIOI_MPE_write_a, 0, NULL); +#endif err = write(fd->fd_sys, buf, len); - else +#ifdef ADIOI_MPE_LOGGING + MPE_Log_event(ADIOI_MPE_write_b, 0, NULL); +#endif + } else { +#ifdef ADIOI_MPE_LOGGING + MPE_Log_event(ADIOI_MPE_read_a, 0, NULL); +#endif err = read(fd->fd_sys, buf, len); +#ifdef ADIOI_MPE_LOGGING + MPE_Log_event(ADIOI_MPE_read_b, 0, NULL); +#endif + } } else { err = ADIOI_LUSTRE_Directio(fd, buf, len, offset, io_mode); } diff -ruN adio/ad_lustre_orig/ad_lustre_wrcoll.c adio/ad_lustre/ad_lustre_wrcoll.c --- adio/ad_lustre_orig/ad_lustre_wrcoll.c 1970-01-01 08:00:00.000000000 +0800 +++ adio/ad_lustre/ad_lustre_wrcoll.c 2009-04-24 14:48:34.000000000 +0800 @@ -0,0 +1,934 @@ +/* -*- Mode: C; c-basic-offset:4 ; -*- */ +/* + * Copyright (C) 1997 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory + * + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. + */ + +#include "ad_lustre.h" +#include "adio_extern.h" + +/* prototypes of functions used for collective writes only. 
*/ +static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf, + MPI_Datatype datatype, int nprocs, + int myrank, + ADIOI_Access *others_req, + ADIOI_Access *my_req, + ADIO_Offset *offset_list, + int *len_list, + int contig_access_count, + int *striping_info, + int *buf_idx, int *error_code); +static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf, + ADIOI_Flatlist_node *flat_buf, + char **send_buf, + ADIO_Offset *offset_list, + int *len_list, int *send_size, + MPI_Request *requests, + int *sent_to_proc, int nprocs, + int myrank, int contig_access_count, + int *striping_info, + int *send_buf_idx, + int *curr_to_proc, + int *done_to_proc, int iter, + MPI_Aint buftype_extent); +static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf, + char *write_buf, + ADIOI_Flatlist_node *flat_buf, + ADIO_Offset *offset_list, + int *len_list, int *send_size, + int *recv_size, ADIO_Offset off, + int size, int *count, + int *start_pos, int *partial_recv, + int *sent_to_proc, int nprocs, + int myrank, int buftype_is_contig, + int contig_access_count, + int *striping_info, + ADIOI_Access *others_req, + int *send_buf_idx, + int *curr_to_proc, + int *done_to_proc, int *hole, + int iter, MPI_Aint buftype_extent, + int *buf_idx, int *error_code); +void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, + ADIO_Offset *srt_off, int *srt_len, int *start_pos, + int nprocs, int nprocs_recv, int total_elements); + +void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count, + MPI_Datatype datatype, + int file_ptr_type, ADIO_Offset offset, + ADIO_Status *status, int *error_code) +{ + /* Uses a generalized version of the extended two-phase method described + * in "An Extended Two-Phase Method for Accessing Sections of + * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary, + * Scientific Programming, (5)4:301--317, Winter 1996. 
+ * http://www.mcs.anl.gov/home/thakur/ext2ph.ps + */ + + ADIOI_Access *my_req; + /* array of nprocs access structures, one for each other process has + this process's request */ + + ADIOI_Access *others_req; + /* array of nprocs access structures, one for each other process + whose request is written by this process. */ + + int i, filetype_is_contig, nprocs, myrank, do_collect = 0; + int contig_access_count = 0, buftype_is_contig, interleave_count = 0; + int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs; + ADIO_Offset orig_fp, start_offset, end_offset, off; + ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL; + int *buf_idx = NULL, *len_list = NULL, *striping_info = NULL; + int old_error, tmp_error; + + MPI_Comm_size(fd->comm, &nprocs); + MPI_Comm_rank(fd->comm, &myrank); + + orig_fp = fd->fp_ind; + + /* IO patten identification if cb_write isn't disabled */ + if (fd->hints->cb_write != ADIOI_HINT_DISABLE) { + /* For this process's request, calculate the list of offsets and + lengths in the file and determine the start and end offsets. */ + + /* Note: end_offset points to the last byte-offset that will be accessed. + * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99 + */ + + ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset, + &offset_list, &len_list, &start_offset, + &end_offset, &contig_access_count); + + /* each process communicates its start and end offsets to other + * processes. The result is an array each of start and end offsets + * stored in order of process rank. + */ + st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset)); + end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset)); + MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1, + ADIO_OFFSET, fd->comm); + MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1, + ADIO_OFFSET, fd->comm); + /* are the accesses of different processes interleaved? 
*/ + for (i = 1; i < nprocs; i++) + if ((st_offsets[i] < end_offsets[i-1]) && + (st_offsets[i] <= end_offsets[i])) + interleave_count++; + /* This is a rudimentary check for interleaving, but should suffice + for the moment. */ + + /* Two typical access patterns can benefit from collective write. + * 1) the processes are interleaved, and + * 2) the req size is small. + */ + if (interleave_count > 0) { + do_collect = 1; + } else { + do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count, + len_list, nprocs); + } + } + ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); + + /* Decide if collective I/O should be done */ + if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) || + fd->hints->cb_write == ADIOI_HINT_DISABLE) { + + int filerange_is_contig = 0; + + /* use independent accesses */ + if (fd->hints->cb_write != ADIOI_HINT_DISABLE) { + ADIOI_Free(offset_list); + ADIOI_Free(len_list); + ADIOI_Free(st_offsets); + ADIOI_Free(end_offsets); + } + + fd->fp_ind = orig_fp; + ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig); + if (buftype_is_contig && filetype_is_contig) { + if (file_ptr_type == ADIO_EXPLICIT_OFFSET) { + off = fd->disp + (fd->etype_size) * offset; + ADIO_WriteContig(fd, buf, count, datatype, + ADIO_EXPLICIT_OFFSET, + off, status, error_code); + } else + ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL, + 0, status, error_code); + } else { + ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, + offset, status, error_code); + } + return; + } + + /* Get Lustre hints information */ + ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1); + + /* calculate what portions of the access requests of this process are + * located in which process + */ + ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count, + striping_info, nprocs, &count_my_req_procs, + &count_my_req_per_proc, &my_req, &buf_idx); + + /* based on everyone's my_req, calculate what requests of other processes + * will be accessed by this 
process. + * count_others_req_procs = number of processes whose requests (including + * this process itself) will be accessed by this process + * count_others_req_per_proc[i] indicates how many separate contiguous + * requests of proc. i will be accessed by this process. + */ + + ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc, + my_req, nprocs, myrank, &count_others_req_procs, + &others_req); + ADIOI_Free(count_my_req_per_proc); + + /* exchange data and write in sizes of no more than stripe_size. */ + ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank, + others_req, my_req, + offset_list, len_list, contig_access_count, + striping_info, buf_idx, error_code); + + /* If this collective write is followed by an independent write, + * it's possible to have those subsequent writes on other processes + * race ahead and sneak in before the read-modify-write completes. + * We carry out a collective communication at the end here so no one + * can start independent i/o before collective I/O completes. 
+ * + * need to do some gymnastics with the error codes so that if something + * went wrong, all processes report error, but if a process has a more + * specific error code, we can still have that process report the + * additional information */ + + old_error = *error_code; + if (*error_code != MPI_SUCCESS) + *error_code = MPI_ERR_IO; + + /* optimization: if only one process performing i/o, we can perform + * a less-expensive Bcast */ +#ifdef ADIOI_MPE_LOGGING + MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL); +#endif + if (fd->hints->cb_nodes == 1) + MPI_Bcast(error_code, 1, MPI_INT, + fd->hints->ranklist[0], fd->comm); + else { + tmp_error = *error_code; + MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT, + MPI_MAX, fd->comm); + } +#ifdef ADIOI_MPE_LOGGING + MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL); +#endif + + if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO)) + *error_code = old_error; + + + if (!buftype_is_contig) + ADIOI_Delete_flattened(datatype); + + /* free all memory allocated for collective I/O */ + /* free others_req */ + for (i = 0; i < nprocs; i++) { + if (others_req[i].count) { + ADIOI_Free(others_req[i].offsets); + ADIOI_Free(others_req[i].lens); + ADIOI_Free(others_req[i].mem_ptrs); + } + } + ADIOI_Free(others_req); + /* free my_req here */ + for (i = 0; i < nprocs; i++) { + if (my_req[i].count) { + ADIOI_Free(my_req[i].offsets); + ADIOI_Free(my_req[i].lens); + } + } + ADIOI_Free(my_req); + ADIOI_Free(buf_idx); + ADIOI_Free(offset_list); + ADIOI_Free(len_list); + ADIOI_Free(st_offsets); + ADIOI_Free(end_offsets); + ADIOI_Free(striping_info); + +#ifdef HAVE_STATUS_SET_BYTES + if (status) { + int bufsize, size; + /* Don't set status if it isn't needed */ + MPI_Type_size(datatype, &size); + bufsize = size * count; + MPIR_Status_set_bytes(status, datatype, bufsize); + } + /* This is a temporary way of filling in status. The right way is to + * keep track of how much data was actually written during collective I/O. 
+ */ +#endif + + fd->fp_sys_posn = -1; /* set it to null. */ +} + +/* If successful, error_code is set to MPI_SUCCESS. Otherwise an error + * code is created and returned in error_code. + */ +static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf, + MPI_Datatype datatype, int nprocs, + int myrank, ADIOI_Access *others_req, + ADIOI_Access *my_req, + ADIO_Offset *offset_list, + int *len_list, int contig_access_count, + int *striping_info, int *buf_idx, + int *error_code) +{ + /* Send data to appropriate processes and write in sizes of no more + * than lustre stripe_size. + * The idea is to reduce the amount of extra memory required for + * collective I/O. If all data were written all at once, which is much + * easier, it would require temp space more than the size of user_buf, + * which is often unacceptable. For example, to write a distributed + * array to a file, where each local array is 8Mbytes, requiring + * at least another 8Mbytes of temp space is unacceptable. + */ + + int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig; + ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc; + ADIO_Offset off, req_off, send_off, iter_st_off, *off_list; + ADIO_Offset max_size, step_size = 0; + int real_size, req_len, send_len; + int *recv_curr_offlen_ptr, *recv_count, *recv_size; + int *send_curr_offlen_ptr, *send_size; + int *partial_recv, *sent_to_proc, *recv_start_pos; + int *send_buf_idx, *curr_to_proc, *done_to_proc; + char *write_buf = NULL; + MPI_Status status; + ADIOI_Flatlist_node *flat_buf = NULL; + MPI_Aint buftype_extent; + int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2]; + int data_sieving = 0; + + *error_code = MPI_SUCCESS; /* changed below if error */ + /* only I/O errors are currently reported */ + + /* calculate the number of writes of stripe size to be done. + * That gives the no. of communication phases as well. 
+ * Note: + * Because we redistribute data in stripe-contiguous pattern for Lustre, + * each process has the same no. of communication phases. + */ + + for (i = 0; i < nprocs; i++) { + if (others_req[i].count) { + st_loc = others_req[i].offsets[0]; + end_loc = others_req[i].offsets[0]; + break; + } + } + for (i = 0; i < nprocs; i++) { + for (j = 0; j < others_req[i].count; j++) { + st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]); + end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] + + others_req[i].lens[j] - 1)); + } + } + /* this process does no writing. */ + if ((st_loc == -1) && (end_loc == -1)) + ntimes = 0; + MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm); + /* avoid min_st_loc be -1 */ + if (st_loc == -1) + st_loc = max_end_loc; + MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm); + /* align downward */ + min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size; + + /* Each time, only avail_cb_nodes number of IO clients perform IO, + * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most, + * and ntimes=whole_file_portion/step_size + */ + step_size = (ADIO_Offset) avail_cb_nodes * stripe_size; + max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1); + if (ntimes) + write_buf = (char *) ADIOI_Malloc(stripe_size); + + /* calculate the start offset for each iteration */ + off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset)); + for (m = 0; m < max_ntimes; m ++) + off_list[m] = max_end_loc; + for (i = 0; i < nprocs; i++) { + for (j = 0; j < others_req[i].count; j ++) { + req_off = others_req[i].offsets[j]; + m = (int)((req_off - min_st_loc) / step_size); + off_list[m] = ADIOI_MIN(off_list[m], req_off); + } + } + + recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); + send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); + /* their use is explained below. calloc initializes to 0. 
*/ + + recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + /* to store count of how many off-len pairs per proc are satisfied + in an iteration. */ + + send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + /* total size of data to be sent to each proc. in an iteration. + Of size nprocs so that I can use MPI_Alltoall later. */ + + recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + /* total size of data to be recd. from each proc. in an iteration. */ + + sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int)); + /* amount of data sent to each proc so far. Used in + ADIOI_Fill_send_buffer. initialized to 0 here. */ + + send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + /* Above three are used in ADIOI_Fill_send_buffer */ + + recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int)); + /* used to store the starting value of recv_curr_offlen_ptr[i] in + this iteration */ + + ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); + if (!buftype_is_contig) { + ADIOI_Flatten_datatype(datatype); + flat_buf = ADIOI_Flatlist; + while (flat_buf->type != datatype) + flat_buf = flat_buf->next; + } + MPI_Type_extent(datatype, &buftype_extent); + /* I need to check if there are any outstanding nonblocking writes to + * the file, which could potentially interfere with the writes taking + * place in this collective write call. Since this is not likely to be + * common, let me do the simplest thing possible here: Each process + * completes all pending nonblocking operations before completing. + */ + /*ADIOI_Complete_async(error_code); + if (*error_code != MPI_SUCCESS) return; + MPI_Barrier(fd->comm); + */ + + iter_st_off = min_st_loc; + + /* Although we have recognized the data according to OST index, + * a read-modify-write will be done if there is a hole between the data. 
+ * For example: if blocksize=60, xfersize=30 and stripe_size=100, + * then rank0 will collect data [0, 30] and [60, 90] then write. There + * is a hole in [30, 60], which will cause a read-modify-write in [0, 90]. + * + * To reduce its impact on the performance, we can disable data sieving + * by hint "ds_in_coll". + */ + /* check the hint for data sieving */ + data_sieving = fd->hints->fs_hints.lustre.ds_in_coll; + + for (m = 0; m < max_ntimes; m++) { + /* go through all others_req and my_req to check which will be received + * and sent in this iteration. + */ + + /* Note that MPI guarantees that displacements in filetypes are in + monotonically nondecreasing order and that, for writes, the + filetypes cannot specify overlapping regions in the file. This + simplifies implementation a bit compared to reads. */ + + /* + off = start offset in the file for the data to be written in + this iteration + iter_st_off = start offset of this iteration + real_size = size of data written (bytes) corresponding to off + max_size = possible maximum size of data written in this iteration + req_off = offset in the file for a particular contiguous request minus + what was satisfied in previous iteration + send_off = offset the request needed by other processes in this iteration + req_len = size corresponding to req_off + send_len = size corresponding to send_off + */ + + /* first calculate what should be communicated */ + for (i = 0; i < nprocs; i++) + recv_count[i] = recv_size[i] = send_size[i] = 0; + + off = off_list[m]; + max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1); + real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off, + end_loc - off + 1); + + for (i = 0; i < nprocs; i++) { + if (my_req[i].count) { + for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) { + send_off = my_req[i].offsets[j]; + send_len = my_req[i].lens[j]; + if (send_off < iter_st_off + max_size) { + send_size[i] += send_len; + } else { + break; + } + } + 
send_curr_offlen_ptr[i] = j; + } + if (others_req[i].count) { + recv_start_pos[i] = recv_curr_offlen_ptr[i]; + for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) { + req_off = others_req[i].offsets[j]; + req_len = others_req[i].lens[j]; + if (req_off < iter_st_off + max_size) { + recv_count[i]++; + MPI_Address(write_buf + req_off - off, + &(others_req[i].mem_ptrs[j])); + recv_size[i] += req_len; + } else { + break; + } + } + recv_curr_offlen_ptr[i] = j; + } + } + /* use variable "hole" to pass data_sieving flag into W_Exchange_data */ + hole = data_sieving; + ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list, + len_list, send_size, recv_size, off, real_size, + recv_count, recv_start_pos, partial_recv, + sent_to_proc, nprocs, myrank, + buftype_is_contig, contig_access_count, + striping_info, others_req, send_buf_idx, + curr_to_proc, done_to_proc, &hole, m, + buftype_extent, buf_idx, error_code); + if (*error_code != MPI_SUCCESS) + goto over; + + flag = 0; + for (i = 0; i < nprocs; i++) + if (recv_count[i]) { + flag = 1; + break; + } + if (flag) { + /* check whether to do data sieving */ + if(data_sieving == ADIOI_HINT_ENABLE) { + ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE, + ADIO_EXPLICIT_OFFSET, off, &status, + error_code); + } else { + /* if there is no hole, write data in one time; + * otherwise, write data in several times */ + if (!hole) { + ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE, + ADIO_EXPLICIT_OFFSET, off, &status, + error_code); + } else { + for (i = 0; i < nprocs; i++) { + if (others_req[i].count) { + for (j = 0; j < others_req[i].count; j++) { + if (others_req[i].offsets[j] < off + real_size && + others_req[i].offsets[j] >= off) { + ADIO_WriteContig(fd, + write_buf + others_req[i].offsets[j] - off, + others_req[i].lens[j], + MPI_BYTE, ADIO_EXPLICIT_OFFSET, + others_req[i].offsets[j], &status, + error_code); + if (*error_code != MPI_SUCCESS) + goto over; + } + } + } + } + } + } + if (*error_code != 
MPI_SUCCESS) + goto over; + } + iter_st_off += max_size; + } +over: + if (ntimes) + ADIOI_Free(write_buf); + ADIOI_Free(recv_curr_offlen_ptr); + ADIOI_Free(send_curr_offlen_ptr); + ADIOI_Free(recv_count); + ADIOI_Free(send_size); + ADIOI_Free(recv_size); + ADIOI_Free(sent_to_proc); + ADIOI_Free(recv_start_pos); + ADIOI_Free(send_buf_idx); + ADIOI_Free(curr_to_proc); + ADIOI_Free(done_to_proc); + ADIOI_Free(off_list); +} + +/* Sets error_code to MPI_SUCCESS if successful, or creates an error code + * in the case of error. + */ +static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf, + char *write_buf, + ADIOI_Flatlist_node *flat_buf, + ADIO_Offset *offset_list, + int *len_list, int *send_size, + int *recv_size, ADIO_Offset off, + int size, int *count, + int *start_pos, int *partial_recv, + int *sent_to_proc, int nprocs, + int myrank, int buftype_is_contig, + int contig_access_count, + int *striping_info, + ADIOI_Access *others_req, + int *send_buf_idx, + int *curr_to_proc, int *done_to_proc, + int *hole, int iter, + MPI_Aint buftype_extent, + int *buf_idx, int *error_code) +{ + int i, j, nprocs_recv, nprocs_send, err; + char **send_buf = NULL; + MPI_Request *requests, *send_req; + MPI_Datatype *recv_types; + MPI_Status *statuses, status; + int *srt_len, sum, sum_recv; + ADIO_Offset *srt_off; + int data_sieving = *hole; + static char myname[] = "ADIOI_W_EXCHANGE_DATA"; + + /* create derived datatypes for recv */ + nprocs_recv = 0; + for (i = 0; i < nprocs; i++) + if (recv_size[i]) + nprocs_recv++; + + recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) * + sizeof(MPI_Datatype)); + /* +1 to avoid a 0-size malloc */ + + j = 0; + for (i = 0; i < nprocs; i++) { + if (recv_size[i]) { + MPI_Type_hindexed(count[i], + &(others_req[i].lens[start_pos[i]]), + &(others_req[i].mem_ptrs[start_pos[i]]), + MPI_BYTE, recv_types + j); + /* absolute displacements; use MPI_BOTTOM in recv */ + MPI_Type_commit(recv_types + j); + j++; + } + } + + /* To avoid a 
read-modify-write, + * check if there are holes in the data to be written. + * For this, merge the (sorted) offset lists others_req using a heap-merge. + */ + + sum = 0; + for (i = 0; i < nprocs; i++) + sum += count[i]; + srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset)); + srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int)); + /* +1 to avoid a 0-size malloc */ + + ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos, + nprocs, nprocs_recv, sum); + + /* check if there are any holes */ + *hole = 0; + for (i = 0; i < sum - 1; i++) { + if (srt_off[i] + srt_len[i] < srt_off[i + 1]) { + *hole = 1; + break; + } + } + /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction + * between aggregation, nominally contiguous regions, and cb_buffer_size + * should be handled with a read-modify-write (otherwise we will write out + * more data than we receive from everyone else (inclusive), so override + * hole detection + */ + if (*hole == 0) { + sum_recv = 0; + for (i = 0; i < nprocs; i++) + sum_recv += recv_size[i]; + if (size > sum_recv) + *hole = 1; + } + /* check the hint for data sieving */ + if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) { + ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, + ADIO_EXPLICIT_OFFSET, off, &status, &err); + // --BEGIN ERROR HANDLING-- + if (err != MPI_SUCCESS) { + *error_code = MPIO_Err_create_code(err, + MPIR_ERR_RECOVERABLE, + myname, __LINE__, + MPI_ERR_IO, + "**ioRMWrdwr", 0); + ADIOI_Free(recv_types); + ADIOI_Free(srt_off); + ADIOI_Free(srt_len); + return; + } + // --END ERROR HANDLING-- + } + ADIOI_Free(srt_off); + ADIOI_Free(srt_len); + + nprocs_send = 0; + for (i = 0; i < nprocs; i++) + if (send_size[i]) + nprocs_send++; + + if (fd->atomicity) { + /* bug fix from Wei-keng Liao and Kenin Coloma */ + requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) * + sizeof(MPI_Request)); + send_req = requests; + } else { + requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 
nprocs_recv + 1)* + sizeof(MPI_Request)); + /* +1 to avoid a 0-size malloc */ + + /* post receives */ + j = 0; + for (i = 0; i < nprocs; i++) { + if (recv_size[i]) { + MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, + myrank + i + 100 * iter, fd->comm, requests + j); + j++; + } + } + send_req = requests + nprocs_recv; + } + + /* post sends. + * if buftype_is_contig, data can be directly sent from + * user buf at location given by buf_idx. else use send_buf. + */ + if (buftype_is_contig) { + j = 0; + for (i = 0; i < nprocs; i++) + if (send_size[i]) { + MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], + MPI_BYTE, i, myrank + i + 100 * iter, fd->comm, + send_req + j); + j++; + buf_idx[i] += send_size[i]; + } + } else if (nprocs_send) { + /* buftype is not contig */ + send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *)); + for (i = 0; i < nprocs; i++) + if (send_size[i]) + send_buf[i] = (char *) ADIOI_Malloc(send_size[i]); + + ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list, + len_list, send_size, send_req, + sent_to_proc, nprocs, myrank, + contig_access_count, striping_info, + send_buf_idx, curr_to_proc, done_to_proc, + iter, buftype_extent); + /* the send is done in ADIOI_Fill_send_buffer */ + } + + /* bug fix from Wei-keng Liao and Kenin Coloma */ + if (fd->atomicity) { + j = 0; + for (i = 0; i < nprocs; i++) { + MPI_Status wkl_status; + if (recv_size[i]) { + MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, + myrank + i + 100 * iter, fd->comm, &wkl_status); + j++; + } + } + } + + for (i = 0; i < nprocs_recv; i++) + MPI_Type_free(recv_types + i); + ADIOI_Free(recv_types); + + /* bug fix from Wei-keng Liao and Kenin Coloma */ + /* +1 to avoid a 0-size malloc */ + if (fd->atomicity) { + statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) * + sizeof(MPI_Status)); + } else { + statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) * + sizeof(MPI_Status)); + } + +#ifdef NEEDS_MPI_TEST + i = 0; + if (fd->atomicity) { + /* bug fix 
from Wei-keng Liao and Kenin Coloma */ + while (!i) + MPI_Testall(nprocs_send, send_req, &i, statuses); + } else { + while (!i) + MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses); + } +#else + /* bug fix from Wei-keng Liao and Kenin Coloma */ + if (fd->atomicity) + MPI_Waitall(nprocs_send, send_req, statuses); + else + MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses); +#endif + ADIOI_Free(statuses); + ADIOI_Free(requests); + if (!buftype_is_contig && nprocs_send) { + for (i = 0; i < nprocs; i++) + if (send_size[i]) + ADIOI_Free(send_buf[i]); + ADIOI_Free(send_buf); + } +} + +#define ADIOI_BUF_INCR \ +{ \ + while (buf_incr) { \ + size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \ + user_buf_idx += size_in_buf; \ + flat_buf_sz -= size_in_buf; \ + if (!flat_buf_sz) { \ + if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ + else { \ + flat_buf_idx = 0; \ + n_buftypes++; \ + } \ + user_buf_idx = flat_buf->indices[flat_buf_idx] + \ + n_buftypes*buftype_extent; \ + flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ + } \ + buf_incr -= size_in_buf; \ + } \ +} + + +#define ADIOI_BUF_COPY \ +{ \ + while (size) { \ + size_in_buf = ADIOI_MIN(size, flat_buf_sz); \ + memcpy(&(send_buf[p][send_buf_idx[p]]), \ + ((char *) buf) + user_buf_idx, size_in_buf); \ + send_buf_idx[p] += size_in_buf; \ + user_buf_idx += size_in_buf; \ + flat_buf_sz -= size_in_buf; \ + if (!flat_buf_sz) { \ + if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ + else { \ + flat_buf_idx = 0; \ + n_buftypes++; \ + } \ + user_buf_idx = flat_buf->indices[flat_buf_idx] + \ + n_buftypes*buftype_extent; \ + flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ + } \ + size -= size_in_buf; \ + buf_incr -= size_in_buf; \ + } \ + ADIOI_BUF_INCR \ +} + +static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf, + ADIOI_Flatlist_node *flat_buf, + char **send_buf, + ADIO_Offset *offset_list, + int *len_list, int *send_size, + MPI_Request *requests, + int *sent_to_proc, int 
nprocs,
+                                          int myrank,
+                                          int contig_access_count,
+                                          int *striping_info,
+                                          int *send_buf_idx,
+                                          int *curr_to_proc,
+                                          int *done_to_proc, int iter,
+                                          MPI_Aint buftype_extent)
+{
+    /* Packs user data into send_buf[p] per destination p and posts an MPI_Isend to p once send_size[p] bytes are packed. This function is only called if buftype is not contig */
+    int i, p, flat_buf_idx, size;
+    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
+    ADIO_Offset off, len, rem_len, user_buf_idx;
+
+    /* curr_to_proc[p] = amount of data sent to proc. p that has already
+     * been accounted for so far
+     * done_to_proc[p] = amount of data already sent to proc. p in
+     * previous iterations
+     * user_buf_idx = current location in user buffer
+     * send_buf_idx[p] = current location in send_buf of proc. p
+     */
+
+    for (i = 0; i < nprocs; i++) {
+        send_buf_idx[i] = curr_to_proc[i] = 0;
+        done_to_proc[i] = sent_to_proc[i];
+    }
+    jj = 0;                     /* number of sends posted so far; index into requests */
+
+    user_buf_idx = flat_buf->indices[0];
+    flat_buf_idx = 0;
+    n_buftypes = 0;
+    flat_buf_sz = flat_buf->blocklens[0];
+
+    /* flat_buf_idx = current index into flattened buftype
+     * flat_buf_sz = size of current contiguous component in flattened buf
+     */
+    for (i = 0; i < contig_access_count; i++) {
+        off = offset_list[i];
+        rem_len = (ADIO_Offset) len_list[i];
+
+        /* this request may span more than one process */
+        while (rem_len != 0) {
+            len = rem_len;
+            /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
+             * longer than the single region that processor "p" is responsible
+             * for.
+             */
+            p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
+
+            if (send_buf_idx[p] < send_size[p]) {       /* p still needs data in this call */
+                if (curr_to_proc[p] + len > done_to_proc[p]) {
+                    if (done_to_proc[p] > curr_to_proc[p]) {    /* piece starts inside data already sent: skip that part, copy the rest */
+                        size = (int) ADIOI_MIN(curr_to_proc[p] + len -
+                                               done_to_proc[p],
+                                               send_size[p] -
+                                               send_buf_idx[p]);
+                        buf_incr = done_to_proc[p] - curr_to_proc[p];
+                        ADIOI_BUF_INCR
+                        buf_incr = (int) (curr_to_proc[p] + len -
+                                          done_to_proc[p]);
+                        curr_to_proc[p] = done_to_proc[p] + size;
+                        ADIOI_BUF_COPY
+                    } else {
+                        size = (int) ADIOI_MIN(len, send_size[p] -
+                                               send_buf_idx[p]);
+                        buf_incr = (int) len;
+                        curr_to_proc[p] += size;
+                        ADIOI_BUF_COPY
+                    }
+                    if (send_buf_idx[p] == send_size[p]) {      /* send buffer for p is complete */
+                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
+                                  myrank + p + 100 * iter, fd->comm,
+                                  requests + jj);
+                        jj++;
+                    }
+                } else {        /* whole piece was already sent earlier: only advance the cursors */
+                    curr_to_proc[p] += (int) len;
+                    buf_incr = (int) len;
+                    ADIOI_BUF_INCR
+                }
+            } else {            /* nothing (more) goes to p in this call: skip over the piece */
+                buf_incr = (int) len;
+                ADIOI_BUF_INCR
+            }
+            off += len;
+            rem_len -= len;
+        }
+    }
+    for (i = 0; i < nprocs; i++)        /* stored so the next call can read it back as done_to_proc */
+        if (send_size[i])
+            sent_to_proc[i] = curr_to_proc[i];
+}
diff -ruN adio/ad_lustre_orig/ad_lustre_wrstr.c adio/ad_lustre/ad_lustre_wrstr.c
--- adio/ad_lustre_orig/ad_lustre_wrstr.c 1970-01-01 08:00:00.000000000 +0800
+++ adio/ad_lustre/ad_lustre_wrstr.c 2009-02-27 10:35:18.000000000 +0800
@@ -0,0 +1,467 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * Copyright (C) 1997 University of Chicago.
+ * See COPYRIGHT notice in top-level directory.
+ *
+ * Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ */
+
+#include "ad_lustre.h"
+#include "adio_extern.h"
+/* Read-modify-write buffering of one request (req_off, req_len, userbuf_off) through writebuf, one window at a time; a window never crosses a stripe_size multiple. */
+#define ADIOI_BUFFERED_WRITE \
+{ /* uses fd, buf, writebuf, writebuf_off, writebuf_len, write_sz, end_offset, stripe_size, status1, myname, error_code of the expanding function */ \
+    if (req_off >= writebuf_off + writebuf_len) { /* request starts beyond the current window: flush it and load a new one */ \
+        if (writebuf_len) { \
+            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
+                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+            if (!(fd->atomicity)) \
+                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+            if (*error_code != MPI_SUCCESS) { \
+                *error_code = MPIO_Err_create_code(*error_code, \
+                                                   MPIR_ERR_RECOVERABLE, myname, \
+                                                   __LINE__, MPI_ERR_IO, \
+                                                   "**iowswc", 0); \
+                ADIOI_Free(writebuf); \
+                return; \
+            } \
+        } \
+        writebuf_off = req_off; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+        if (!(fd->atomicity)) \
+            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
+                        writebuf_off, &status1, error_code); \
+        if (*error_code != MPI_SUCCESS) { /* NOTE(review): returns without ADIOI_UNLOCK for the lock taken just above - confirm intended */ \
+            *error_code = MPIO_Err_create_code(*error_code, \
+                                               MPIR_ERR_RECOVERABLE, myname, \
+                                               __LINE__, MPI_ERR_IO, \
+                                               "**iowsrc", 0); \
+            ADIOI_Free(writebuf); \
+            return; \
+        } \
+    } \
+    write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
+    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
+    while (write_sz != req_len) { /* request continues past the window: flush, move to the next window, copy the next part */ \
+        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
+                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+        if (!(fd->atomicity)) \
+            ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+        if (*error_code != MPI_SUCCESS) { \
+            *error_code = MPIO_Err_create_code(*error_code, \
+                                               MPIR_ERR_RECOVERABLE, myname, \
+                                               __LINE__, MPI_ERR_IO, \
+                                               "**iowswc", 0); \
+            ADIOI_Free(writebuf); \
+            return; \
+        } \
+        req_len -= write_sz; \
+        userbuf_off += write_sz; \
+        writebuf_off += writebuf_len; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+        if (!(fd->atomicity)) \
+            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
+                        writebuf_off, &status1, error_code); \
+        if (*error_code != MPI_SUCCESS) { \
+            *error_code = MPIO_Err_create_code(*error_code, \
+                                               MPIR_ERR_RECOVERABLE, myname, \
+                                               __LINE__, MPI_ERR_IO, \
+                                               "**iowsrc", 0); \
+            ADIOI_Free(writebuf); \
+            return; \
+        } \
+        write_sz = ADIOI_MIN(req_len, writebuf_len); \
+        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
+    } \
+}
+
+/* NOTE(review): each pass of the while (req_len) loop below writes the full writebuf_len window, however many bytes this request copied into it - verify against the callers. */
+/* Buffered write that does not do a read-modify-write and does not lock. It is used when the buffer type is
+   not contiguous but the file region is, and when the whole request fits in one contiguous file block. */
+#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
+{ \
+    if (req_off >= writebuf_off + writebuf_len) { /* request starts beyond the current window: restart the window at req_off (nothing is flushed here) */ \
+        writebuf_off = req_off; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+    } \
+    write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
+    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
+    while (req_len) { /* runs until the whole request has been consumed */ \
+        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
+                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+        if (*error_code != MPI_SUCCESS) { \
+            *error_code = MPIO_Err_create_code(*error_code, \
+                                               MPIR_ERR_RECOVERABLE, myname, \
+                                               __LINE__, MPI_ERR_IO, \
+                                               "**iowswc", 0); \
+            ADIOI_Free(writebuf); \
+            return; \
+        } \
+        req_len -= write_sz; \
+        userbuf_off += write_sz; \
+        writebuf_off += writebuf_len; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+        write_sz = ADIOI_MIN(req_len, writebuf_len); \
+        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
+    } \
+}
+
+void
ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count, + MPI_Datatype datatype, int file_ptr_type, + ADIO_Offset offset, ADIO_Status * status, + int *error_code) +{ + /* offset is in units of etype relative to the filetype. */ + ADIOI_Flatlist_node *flat_buf, *flat_file; + int i, j, k, bwr_size, fwr_size = 0, st_index = 0; + int bufsize, num, size, sum, n_etypes_in_filetype, size_in_filetype; + int n_filetypes, etype_in_filetype; + ADIO_Offset abs_off_in_filetype = 0; + int filetype_size, etype_size, buftype_size, req_len; + MPI_Aint filetype_extent, buftype_extent; + int buf_count, buftype_is_contig, filetype_is_contig; + ADIO_Offset userbuf_off; + ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off; + char *writebuf; + int flag, st_fwr_size, st_n_filetypes, writebuf_len, write_sz; + ADIO_Status status1; + int new_bwr_size, new_fwr_size; + int stripe_size; + static char myname[] = "ADIOI_LUSTRE_WriteStrided"; + int myrank; + MPI_Comm_rank(fd->comm, &myrank); + + if (fd->hints->ds_write == ADIOI_HINT_DISABLE) { + /* if user has disabled data sieving on writes, use naive + * approach instead. + */ + ADIOI_GEN_WriteStrided_naive(fd, + buf, + count, + datatype, + file_ptr_type, + offset, status, error_code); + return; + } + + *error_code = MPI_SUCCESS; /* changed below if error */ + + ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); + ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig); + + MPI_Type_size(fd->filetype, &filetype_size); + if (!filetype_size) { + *error_code = MPI_SUCCESS; + return; + } + + MPI_Type_extent(fd->filetype, &filetype_extent); + MPI_Type_size(datatype, &buftype_size); + MPI_Type_extent(datatype, &buftype_extent); + etype_size = fd->etype_size; + + bufsize = buftype_size * count; + + /* get striping info */ + stripe_size = fd->hints->striping_unit; + + /* Different buftype to different filetype */ + if (!buftype_is_contig && filetype_is_contig) { + /* noncontiguous in memory, contiguous in file. 
*/ + ADIOI_Flatten_datatype(datatype); + flat_buf = ADIOI_Flatlist; + while (flat_buf->type != datatype) + flat_buf = flat_buf->next; + + off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : + fd->disp + etype_size * offset; + + start_off = off; + end_offset = start_off + bufsize - 1; + writebuf_off = start_off; + /* write stripe size buffer each time */ + writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size)); + writebuf_len = (int) ADIOI_MIN(bufsize, + (writebuf_off / stripe_size + 1) * + stripe_size - writebuf_off); + + /* if atomicity is true, lock the region to be accessed */ + if (fd->atomicity) + ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize); + + for (j = 0; j < count; j++) { + for (i = 0; i < flat_buf->count; i++) { + userbuf_off = j * buftype_extent + flat_buf->indices[i]; + req_off = off; + req_len = flat_buf->blocklens[i]; + ADIOI_BUFFERED_WRITE_WITHOUT_READ + off += flat_buf->blocklens[i]; + } + } + + /* write the buffer out finally */ + ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, + ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, + error_code); + + if (fd->atomicity) + ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize); + if (*error_code != MPI_SUCCESS) { + ADIOI_Free(writebuf); + return; + } + ADIOI_Free(writebuf); + if (file_ptr_type == ADIO_INDIVIDUAL) + fd->fp_ind = off; + } else { + /* noncontiguous in file */ + /* filetype already flattened in ADIO_Open */ + flat_file = ADIOI_Flatlist; + while (flat_file->type != fd->filetype) + flat_file = flat_file->next; + disp = fd->disp; + + if (file_ptr_type == ADIO_INDIVIDUAL) { + offset = fd->fp_ind; /* in bytes */ + n_filetypes = -1; + flag = 0; + while (!flag) { + n_filetypes++; + for (i = 0; i < flat_file->count; i++) { + if (disp + flat_file->indices[i] + + (ADIO_Offset) n_filetypes * filetype_extent + + flat_file->blocklens[i] >= offset) { + st_index = i; + fwr_size = (int) (disp + flat_file->indices[i] + + (ADIO_Offset) n_filetypes * + filetype_extent + + 
flat_file->blocklens[i] - + offset); + flag = 1; + break; + } + } + } + } else { + n_etypes_in_filetype = filetype_size / etype_size; + n_filetypes = (int) (offset / n_etypes_in_filetype); + etype_in_filetype = (int) (offset % n_etypes_in_filetype); + size_in_filetype = etype_in_filetype * etype_size; + + sum = 0; + for (i = 0; i < flat_file->count; i++) { + sum += flat_file->blocklens[i]; + if (sum > size_in_filetype) { + st_index = i; + fwr_size = sum - size_in_filetype; + abs_off_in_filetype = flat_file->indices[i] + + size_in_filetype - (sum - flat_file->blocklens[i]); + break; + } + } + + /* abs. offset in bytes in the file */ + offset = disp + (ADIO_Offset) n_filetypes *filetype_extent + + abs_off_in_filetype; + } + + start_off = offset; + + /* If the file bytes is actually contiguous, we do not need data sieve at all */ + if (bufsize <= fwr_size) { + req_off = start_off; + req_len = bufsize; + end_offset = start_off + bufsize - 1; + writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size)); + memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size)); + writebuf_off = 0; + writebuf_len = 0; + userbuf_off = 0; + ADIOI_BUFFERED_WRITE_WITHOUT_READ + } else { + /* Calculate end_offset, the last byte-offset that will be accessed. 
+ e.g., if start_offset=0 and 100 bytes to be write, end_offset=99 */ + st_fwr_size = fwr_size; + st_n_filetypes = n_filetypes; + i = 0; + j = st_index; + off = offset; + fwr_size = ADIOI_MIN(st_fwr_size, bufsize); + while (i < bufsize) { + i += fwr_size; + end_offset = off + fwr_size - 1; + + if (j < (flat_file->count - 1)) + j++; + else { + j = 0; + n_filetypes++; + } + + off = disp + flat_file->indices[j] + + (ADIO_Offset) n_filetypes * filetype_extent; + fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i); + } + + writebuf_off = 0; + writebuf_len = 0; + writebuf = (char *) ADIOI_Malloc(stripe_size); + memset(writebuf, -1, stripe_size); + /* if atomicity is true, lock the region to be accessed */ + if (fd->atomicity) + ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize); + + if (buftype_is_contig && !filetype_is_contig) { + /* contiguous in memory, noncontiguous in file. should be the most + common case. */ + i = 0; + j = st_index; + off = offset; + n_filetypes = st_n_filetypes; + fwr_size = ADIOI_MIN(st_fwr_size, bufsize); + while (i < bufsize) { + if (fwr_size) { + /* TYPE_UB and TYPE_LB can result in + fwr_size = 0. save system call in such cases */ + /* + lseek(fd->fd_sys, off, SEEK_SET); + err = write(fd->fd_sys, ((char *) buf) + i, fwr_size); + */ + req_off = off; + req_len = fwr_size; + userbuf_off = i; + ADIOI_BUFFERED_WRITE + } + i += fwr_size; + + if (off + fwr_size < disp + flat_file->indices[j] + + flat_file->blocklens[j] + + (ADIO_Offset) n_filetypes * filetype_extent) + off += fwr_size; + /* did not reach end of contiguous block in filetype. + no more I/O needed. off is incremented by fwr_size. 
*/ + else { + if (j < (flat_file->count - 1)) + j++; + else { + j = 0; + n_filetypes++; + } + off = disp + flat_file->indices[j] + + (ADIO_Offset) n_filetypes * filetype_extent; + fwr_size = ADIOI_MIN(flat_file->blocklens[j], + bufsize - i); + } + } + } else { + /* noncontiguous in memory as well as in file */ + ADIOI_Flatten_datatype(datatype); + flat_buf = ADIOI_Flatlist; + while (flat_buf->type != datatype) + flat_buf = flat_buf->next; + + k = num = buf_count = 0; + i = (int) (flat_buf->indices[0]); + j = st_index; + off = offset; + n_filetypes = st_n_filetypes; + fwr_size = st_fwr_size; + bwr_size = flat_buf->blocklens[0]; + + while (num < bufsize) { + size = ADIOI_MIN(fwr_size, bwr_size); + if (size) { + /* + lseek(fd->fd_sys, off, SEEK_SET); + err = write(fd->fd_sys, ((char *) buf) + i, size); + */ + req_off = off; + req_len = size; + userbuf_off = i; + ADIOI_BUFFERED_WRITE + } + + new_fwr_size = fwr_size; + new_bwr_size = bwr_size; + + if (size == fwr_size) { + /* reached end of contiguous block in file */ + if (j < (flat_file->count - 1)) { + j++; + } else { + j = 0; + n_filetypes++; + } + off = disp + flat_file->indices[j] + + (ADIO_Offset) n_filetypes * filetype_extent; + new_fwr_size = flat_file->blocklens[j]; + if (size != bwr_size) { + i += size; + new_bwr_size -= size; + } + } + if (size == bwr_size) { + /* reached end of contiguous block in memory */ + k = (k + 1) % flat_buf->count; + buf_count++; + i = (int) (buftype_extent * + (buf_count / flat_buf->count) + + flat_buf->indices[k]); + new_bwr_size = flat_buf->blocklens[k]; + if (size != fwr_size) { + off += size; + new_fwr_size -= size; + } + } + num += size; + fwr_size = new_fwr_size; + bwr_size = new_bwr_size; + } + } + + /* write the buffer out finally */ + if (writebuf_len) { + ADIO_WriteContig(fd, writebuf, writebuf_len, + MPI_BYTE, ADIO_EXPLICIT_OFFSET, + writebuf_off, &status1, error_code); + if (!(fd->atomicity)) + ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); + if (*error_code != 
MPI_SUCCESS) { + ADIOI_Free(writebuf); + return; + } + } + if (fd->atomicity) + ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize); + } + ADIOI_Free(writebuf); + if (file_ptr_type == ADIO_INDIVIDUAL) + fd->fp_ind = off; + } + fd->fp_sys_posn = -1; /* set it to null. */ + +#ifdef HAVE_STATUS_SET_BYTES + MPIR_Status_set_bytes(status, datatype, bufsize); + /* This is a temporary way of filling in status. The right way is to + keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */ +#endif + + if (!buftype_is_contig) + ADIOI_Delete_flattened(datatype); +} diff -ruN adio/ad_lustre_orig/Makefile.in adio/ad_lustre/Makefile.in --- adio/ad_lustre_orig/Makefile.in 2008-09-17 14:36:56.000000000 +0800 +++ adio/ad_lustre/Makefile.in 2008-10-17 17:03:06.000000000 +0800 @@ -16,7 +16,9 @@ @VPATH@ AD_LUSTRE_OBJECTS = ad_lustre.o ad_lustre_open.o \ - ad_lustre_rwcontig.o ad_lustre_hints.o + ad_lustre_rwcontig.o ad_lustre_wrcoll.o ad_lustre_wrstr.o \ + ad_lustre_hints.o ad_lustre_aggregate.o + default: $(LIBNAME) @if [ "@ENABLE_SHLIB@" != "none" ] ; then \ diff -ruN adio/ad_lustre_orig/README adio/ad_lustre/README --- adio/ad_lustre_orig/README 2008-09-17 14:36:56.000000000 +0800 +++ adio/ad_lustre/README 2009-04-24 09:46:20.000000000 +0800 @@ -5,6 +5,21 @@ o To post the code for ParColl (Partitioned collective IO) ----------------------------------------------------- +V05: +----------------------------------------------------- +Improved data redistribution + o Improve I/O pattern identification. Besides checking interleaving, + if request I/O size is small, collective I/O will be performed. + The hint bigsize can be used to define the req size value. + o Provide hint CO for load balancing to control the number of + IO clients for each OST + o Produce stripe-contiguous I/O pattern that Lustre prefers + o Control read-modify-write in data sieving in collective IO + by hint ds_in_coll. 
+ o Reduce extent lock conflicts by making each OST accessed by one or + more constant clients. + +----------------------------------------------------- V04: ----------------------------------------------------- o Direct IO and Lockless IO support --- adio/common/ad_write_coll_orig.c 2009-02-27 22:06:46.000000000 +0800 +++ adio/common/ad_write_coll.c 2008-10-15 11:25:38.000000000 +0800 @@ -42,7 +42,7 @@ int *send_buf_idx, int *curr_to_proc, int *done_to_proc, int iter, MPI_Aint buftype_extent); -static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, +void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, ADIO_Offset *srt_off, int *srt_len, int *start_pos, int nprocs, int nprocs_recv, int total_elements); @@ -921,7 +921,7 @@ -static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, +void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, ADIO_Offset *srt_off, int *srt_len, int *start_pos, int nprocs, int nprocs_recv, int total_elements) {