From f5ae60838166d69f0b34607b02c0512855e41efe Mon Sep 17 00:00:00 2001 From: liuy Date: Wed, 5 Nov 2008 02:22:38 +0000 Subject: [PATCH] Branch b1_8_gate b=12521 A new Lustre ADIO driver for MPICH2-1.0.7 --- lustre/contrib/adio_driver_mpich2-1.0.7.patch | 2588 +++++++++++++++++++++++++ 1 file changed, 2588 insertions(+) create mode 100644 lustre/contrib/adio_driver_mpich2-1.0.7.patch diff --git a/lustre/contrib/adio_driver_mpich2-1.0.7.patch b/lustre/contrib/adio_driver_mpich2-1.0.7.patch new file mode 100644 index 0000000..6b33872 --- /dev/null +++ b/lustre/contrib/adio_driver_mpich2-1.0.7.patch @@ -0,0 +1,2588 @@ +diff -ruN ad_lustre_orig/ad_lustre_aggregate.c ad_lustre/ad_lustre_aggregate.c +--- ad_lustre_orig/ad_lustre_aggregate.c 1970-01-01 08:00:00.000000000 +0800 ++++ ad_lustre/ad_lustre_aggregate.c 2008-10-17 17:30:00.000000000 +0800 +@@ -0,0 +1,502 @@ ++/* -*- Mode: C; c-basic-offset:4 ; -*- */ ++/* ++ * Copyright (C) 1997 University of Chicago. ++ * See COPYRIGHT notice in top-level directory. ++ * ++ * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group ++ */ ++ ++#include "ad_lustre.h" ++#include "adio_extern.h" ++ ++void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int ** striping_info_ptr, ++ int mode) ++{ ++ int *striping_info = NULL; ++ /* get striping information: ++ * striping_info[0]: stripe_size ++ * striping_info[1]: stripe_count ++ * striping_info[2]: avail_cb_nodes ++ */ ++ int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes, lflag; ++ int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes; ++ char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); ++ ++ /* Get hints value */ ++ /* stripe size */ ++ MPI_Info_get(fd->info, "striping_unit", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag) ++ stripe_size = atoi(value); ++ /* stripe count */ ++ /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */ ++ MPI_Info_get(fd->info, "striping_factor", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag) ++ stripe_count = atoi(value); ++ ++ /* Calculate the available number of I/O clients, that is ++ * avail_cb_nodes=min(cb_nodes, stripe_count*CO), where ++ * CO=1 by default ++ */ ++ if (!mode) { ++ /* for collective read, ++ * if "CO" clients access the same OST simultaneously, ++ * the OST disk seek time would be much. So, to avoid this, ++ * it might be better if 1 client only accesses 1 OST. ++ * So, we set CO = 1 to meet the above requirement. ++ */ ++ CO = 1; ++ /*XXX: maybe there are other better way for collective read */ ++ } else { ++ /* CO_max: the largest number of IO clients for each ost group */ ++ CO_max = (nprocs_for_coll - 1)/ stripe_count + 1; ++ /* CO also has been validated in ADIOI_LUSTRE_Open(), >0 */ ++ MPI_Info_get(fd->info, "CO", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag) ++ CO = atoi(value); ++ CO = ADIOI_MIN(CO_max, CO); ++ } ++ /* Calculate how many IO clients we need */ ++ /* To avoid extent lock conflicts, ++ * avail_cb_nodes should divide (stripe_count*CO) exactly, ++ * so that each OST is accessed by only one or more constant clients. */ ++ avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, stripe_count * CO); ++ if (avail_cb_nodes == nprocs_for_coll) { ++ CO_nodes = stripe_count * CO; ++ do { ++ /* find the divisor of CO_nodes */ ++ divisor = 1; ++ do { ++ divisor ++; ++ } while (CO_nodes % divisor); ++ CO_nodes = CO_nodes / divisor; ++ /* if stripe_count*CO is a prime number, change nothing */ ++ if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) { ++ avail_cb_nodes = CO_nodes; ++ break; ++ } ++ } while (CO_nodes != 1); ++ } ++ ++ *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int)); ++ striping_info = *striping_info_ptr; ++ striping_info[0] = stripe_size; ++ striping_info[1] = stripe_count; ++ striping_info[2] = avail_cb_nodes; ++ ++ ADIOI_Free(value); ++} ++ ++int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off, ++ ADIO_Offset *len, int *striping_info) ++{ ++ int rank_index, rank; ++ ADIO_Offset avail_bytes; ++ int stripe_size = striping_info[0]; ++ int avail_cb_nodes = striping_info[2]; ++ ++ /* Produce the stripe-contiguous pattern for Lustre */ ++ rank_index = (int)((off / stripe_size) % avail_cb_nodes); ++ ++ avail_bytes = (off / (ADIO_Offset)stripe_size + 1) * ++ (ADIO_Offset)stripe_size - off; ++ if (avail_bytes < *len) { ++ /* this proc only has part of the requested contig. region */ ++ *len = avail_bytes; ++ } ++ /* map our index to a rank */ ++ /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */ ++ rank = fd->hints->ranklist[rank_index]; ++ ++ return rank; ++} ++ ++void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ++ int *len_list, int contig_access_count, ++ int *striping_info, int nprocs, ++ int *count_my_req_procs_ptr, ++ int **count_my_req_per_proc_ptr, ++ ADIOI_Access ** my_req_ptr, ++ int **buf_idx_ptr) ++{ ++ /* Nothing different from ADIOI_Calc_my_req(), except calling ++ * ADIOI_Lustre_Calc_aggregator() instead of the old one */ ++ int *count_my_req_per_proc, count_my_req_procs, *buf_idx; ++ int i, l, proc; ++ ADIO_Offset avail_len, rem_len, curr_idx, off; ++ ADIOI_Access *my_req; ++ ++ *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); ++ count_my_req_per_proc = *count_my_req_per_proc_ptr; ++ ++ /* buf_idx is relevant only if buftype_is_contig. ++ * buf_idx[i] gives the index into user_buf where data received ++ * from proc. i should be placed. This allows receives to be done ++ * without extra buffer. This can't be done if buftype is not contig. ++ */ ++ buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ /* initialize buf_idx to -1 */ ++ for (i = 0; i < nprocs; i++) ++ buf_idx[i] = -1; ++ ++ /* one pass just to calculate how much space to allocate for my_req; ++ * contig_access_count was calculated way back in ADIOI_Calc_my_off_len() ++ */ ++ for (i = 0; i < contig_access_count; i++) { ++ /* short circuit offset/len processing if len == 0 ++ * (zero-byte read/write ++ */ ++ if (len_list[i] == 0) ++ continue; ++ off = offset_list[i]; ++ avail_len = len_list[i]; ++ /* we set avail_len to be the total size of the access. ++ * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return ++ * the amount that was available. ++ */ ++ proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info); ++ count_my_req_per_proc[proc]++; ++ /* figure out how many data is remaining in the access ++ * we'll take care of this data (if there is any) ++ * in the while loop below. ++ */ ++ rem_len = len_list[i] - avail_len; ++ ++ while (rem_len != 0) { ++ off += avail_len; /* point to first remaining byte */ ++ avail_len = rem_len; /* save remaining size, pass to calc */ ++ proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info); ++ count_my_req_per_proc[proc]++; ++ rem_len -= avail_len; /* reduce remaining length by amount from fd */ ++ } ++ } ++ ++ *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access)); ++ my_req = *my_req_ptr; ++ ++ count_my_req_procs = 0; ++ for (i = 0; i < nprocs; i++) { ++ if (count_my_req_per_proc[i]) { ++ my_req[i].offsets = (ADIO_Offset *) ++ ADIOI_Malloc(count_my_req_per_proc[i] * ++ sizeof(ADIO_Offset)); ++ my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] * ++ sizeof(int)); ++ count_my_req_procs++; ++ } ++ my_req[i].count = 0; /* will be incremented where needed later */ ++ } ++ ++ /* now fill in my_req */ ++ curr_idx = 0; ++ for (i = 0; i < contig_access_count; i++) { ++ if (len_list[i] == 0) ++ continue; ++ off = offset_list[i]; ++ avail_len = len_list[i]; ++ proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info); ++ ++ /* for each separate contiguous access from this process */ ++ if (buf_idx[proc] == -1) ++ buf_idx[proc] = (int) curr_idx; ++ ++ l = my_req[proc].count; ++ curr_idx += (int) avail_len; /* NOTE: Why is curr_idx an int? Fix? */ ++ ++ rem_len = len_list[i] - avail_len; ++ ++ /* store the proc, offset, and len information in an array ++ * of structures, my_req. Each structure contains the ++ * offsets and lengths located in that process's FD, ++ * and the associated count. ++ */ ++ my_req[proc].offsets[l] = off; ++ my_req[proc].lens[l] = (int) avail_len; ++ my_req[proc].count++; ++ ++ while (rem_len != 0) { ++ off += avail_len; ++ avail_len = rem_len; ++ proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, ++ striping_info); ++ if (buf_idx[proc] == -1) ++ buf_idx[proc] = (int) curr_idx; ++ ++ l = my_req[proc].count; ++ curr_idx += avail_len; ++ rem_len -= avail_len; ++ ++ my_req[proc].offsets[l] = off; ++ my_req[proc].lens[l] = (int) avail_len; ++ my_req[proc].count++; ++ } ++ } ++ ++#ifdef AGG_DEBUG ++ for (i = 0; i < nprocs; i++) { ++ if (count_my_req_per_proc[i] > 0) { ++ FPRINTF(stdout, "data needed from %d (count = %d):\n", ++ i, my_req[i].count); ++ for (l = 0; l < my_req[i].count; l++) { ++ FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n", ++ l, my_req[i].offsets[l], l, my_req[i].lens[l]); ++ } ++ } ++ } ++#endif ++#if 0 ++ for (i = 0; i < nprocs; i++) { ++ FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]); ++ } ++#endif ++ ++ *count_my_req_procs_ptr = count_my_req_procs; ++ *buf_idx_ptr = buf_idx; ++} ++ ++int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count, ++ int *len_list, int nprocs) ++{ ++ /* If the processes are non-interleaved, we will check the req_size. ++ * if (avg_req_size > big_req_size) { ++ * docollect = 0; ++ * } ++ */ ++ ++ int i, docollect = 1, lflag, big_req_size = 0; ++ ADIO_Offset req_size = 0, total_req_size; ++ int avg_req_size, total_access_count; ++ char *value = NULL; ++ ++ /* calculate total_req_size and total_access_count */ ++ for (i = 0; i < contig_access_count; i++) ++ req_size += len_list[i]; ++ MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM, ++ fd->comm); ++ MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM, ++ fd->comm); ++ /* estimate average req_size */ ++ avg_req_size = (int)(total_req_size / total_access_count); ++ ++ /* get hint of big_req_size */ ++ value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); ++ MPI_Info_get(fd->info, "big_req_size", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag) ++ big_req_size = atoi(value); ++ /* Don't perform collective I/O if there are big requests */ ++ if ((big_req_size > 0) && (avg_req_size > big_req_size)) ++ docollect = 0; ++ ++ ADIOI_Free(value); ++ ++ return docollect; ++} ++ ++void ADIOI_LUSTRE_Calc_others_req(ADIO_File fd, int count_my_req_procs, ++ int *count_my_req_per_proc, ++ ADIOI_Access * my_req, ++ int nprocs, int myrank, ++ ADIO_Offset start_offset, ++ ADIO_Offset end_offset, ++ int *striping_info, ++ int *count_others_req_procs_ptr, ++ ADIOI_Access ** others_req_ptr) ++{ ++ /* what requests of other processes will be written by this process */ ++ ++ int *count_others_req_per_proc, count_others_req_procs, proc; ++ int i, j, lflag, samesize = 0, contiguous = 0; ++ int avail_cb_nodes = striping_info[2]; ++ MPI_Request *send_requests, *recv_requests; ++ MPI_Status *statuses; ++ ADIOI_Access *others_req; ++ char *value = NULL; ++ ADIO_Offset min_st_offset, off, req_len, avail_len, rem_len, *all_lens; ++ ++ /* There are two hints, which could reduce some MPI communication overhead, ++ * if the users knows the I/O pattern and set them correctly. */ ++ /* They are ++ * contiguous_data: if the data are contiguous, ++ * we don't need to do MPI_Alltoall(). ++ * same_io_size: And if the data req size is same, ++ * we can calculate the offset directly ++ */ ++ value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); ++ /* hint of contiguous data */ ++ MPI_Info_get(fd->info, "contiguous_data", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag && !strcmp(value, "yes")) ++ contiguous = 1; ++ /* hint of same io size */ ++ MPI_Info_get(fd->info, "same_io_size", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag && !strcmp(value, "yes")) ++ samesize = 1; ++ ADIOI_Free(value); ++ ++ *others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * ++ sizeof(ADIOI_Access)); ++ others_req = *others_req_ptr; ++ ++ /* if the data are contiguous, we can calulate the offset and length ++ * of the other requests simply, instead of MPI_Alltoall() */ ++ if (contiguous) { ++ for (i = 0; i < nprocs; i++) { ++ others_req[i].count = 0; ++ } ++ req_len = end_offset - start_offset + 1; ++ all_lens = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset)); ++ ++ /* same req size ? */ ++ if (samesize == 0) { ++ /* calculate the min_st_offset */ ++ MPI_Allreduce(&start_offset, &min_st_offset, 1, MPI_LONG_LONG, ++ MPI_MIN, fd->comm); ++ /* exchange request length */ ++ MPI_Allgather(&req_len, 1, ADIO_OFFSET, all_lens, 1, ADIO_OFFSET, ++ fd->comm); ++ } else { /* same request size */ ++ /* calculate the 1st request's offset */ ++ min_st_offset = start_offset - myrank * req_len; ++ /* assign request length to all_lens[] */ ++ for (i = 0; i < nprocs; i ++) ++ all_lens[i] = req_len; ++ } ++ if (myrank < avail_cb_nodes) { ++ /* This is a IO client and it will receive data from others */ ++ off = min_st_offset; ++ /* calcaulte other_req[i].count */ ++ for (i = 0; i < nprocs; i++) { ++ avail_len = all_lens[i]; ++ rem_len = avail_len; ++ while (rem_len > 0) { ++ proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, ++ striping_info); ++ if (proc == myrank) { ++ others_req[i].count ++; ++ } ++ off += avail_len; ++ rem_len -= avail_len; ++ avail_len = rem_len; ++ } ++ } ++ /* calculate offset and len for each request */ ++ off = min_st_offset; ++ for (i = 0; i < nprocs; i++) { ++ if (others_req[i].count) { ++ others_req[i].offsets = (ADIO_Offset *) ++ ADIOI_Malloc(others_req[i].count * ++ sizeof(ADIO_Offset)); ++ others_req[i].lens = (int *) ++ ADIOI_Malloc(others_req[i].count * ++ sizeof(int)); ++ others_req[i].mem_ptrs = (MPI_Aint *) ++ ADIOI_Malloc(others_req[i].count * ++ sizeof(MPI_Aint)); ++ } ++ j = 0; ++ avail_len = all_lens[i]; ++ rem_len = avail_len; ++ while (rem_len > 0) { ++ proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, ++ striping_info); ++ if (proc == myrank) { ++ others_req[i].offsets[j] = off; ++ others_req[i].lens[j] = (int)avail_len; ++ j ++; ++ } ++ off += avail_len; ++ rem_len -= avail_len; ++ avail_len = rem_len; ++ } ++ } ++ } ++ ADIOI_Free(all_lens); ++ } else { ++ /* multiple non-contiguous requests */ ++ /* first find out how much to send/recv and from/to whom */ ++ ++ /* ++ * count_others_req_procs: ++ * number of processes whose requests will be written by ++ * this process (including this process itself) ++ * count_others_req_per_proc[i]: ++ * how many separate contiguous requests of proc[i] will be ++ * written by this process. ++ */ ++ ++ count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ ++ MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT, ++ count_others_req_per_proc, 1, MPI_INT, fd->comm); ++ ++ count_others_req_procs = 0; ++ for (i = 0; i < nprocs; i++) { ++ if (count_others_req_per_proc[i]) { ++ others_req[i].count = count_others_req_per_proc[i]; ++ others_req[i].offsets = (ADIO_Offset *) ++ ADIOI_Malloc(others_req[i].count * ++ sizeof(ADIO_Offset)); ++ others_req[i].lens = (int *) ++ ADIOI_Malloc(others_req[i].count * ++ sizeof(int)); ++ others_req[i].mem_ptrs = (MPI_Aint *) ++ ADIOI_Malloc(others_req[i].count * ++ sizeof(MPI_Aint)); ++ count_others_req_procs++; ++ } else ++ others_req[i].count = 0; ++ } ++ ++ /* now send the calculated offsets and lengths to respective processes */ ++ ++ send_requests = (MPI_Request *) ADIOI_Malloc(2 * (count_my_req_procs + 1) * ++ sizeof(MPI_Request)); ++ recv_requests = (MPI_Request *) ADIOI_Malloc(2 * (count_others_req_procs+1)* ++ sizeof(MPI_Request)); ++ /* +1 to avoid a 0-size malloc */ ++ ++ j = 0; ++ for (i = 0; i < nprocs; i++) { ++ if (others_req[i].count) { ++ MPI_Irecv(others_req[i].offsets, others_req[i].count, ++ ADIO_OFFSET, i, i + myrank, fd->comm, ++ &recv_requests[j]); ++ j++; ++ MPI_Irecv(others_req[i].lens, others_req[i].count, ++ MPI_INT, i, i + myrank + 1, fd->comm, ++ &recv_requests[j]); ++ j++; ++ } ++ } ++ ++ j = 0; ++ for (i = 0; i < nprocs; i++) { ++ if (my_req[i].count) { ++ MPI_Isend(my_req[i].offsets, my_req[i].count, ++ ADIO_OFFSET, i, i + myrank, fd->comm, ++ &send_requests[j]); ++ j++; ++ MPI_Isend(my_req[i].lens, my_req[i].count, ++ MPI_INT, i, i + myrank + 1, fd->comm, ++ &send_requests[j]); ++ j++; ++ } ++ } ++ ++ statuses = (MPI_Status *) ++ ADIOI_Malloc((1 + 2 * ADIOI_MAX(count_my_req_procs, ++ count_others_req_procs)) * ++ sizeof(MPI_Status)); ++ /* +1 to avoid a 0-size malloc */ ++ ++ MPI_Waitall(2 * count_my_req_procs, send_requests, statuses); ++ MPI_Waitall(2 * count_others_req_procs, recv_requests, statuses); ++ ++ ADIOI_Free(send_requests); ++ ADIOI_Free(recv_requests); ++ ADIOI_Free(statuses); ++ ADIOI_Free(count_others_req_per_proc); ++ ++ *count_others_req_procs_ptr = count_others_req_procs; ++ } ++} +diff -ruN ad_lustre_orig/ad_lustre.c ad_lustre/ad_lustre.c +--- ad_lustre_orig/ad_lustre.c 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/ad_lustre.c 2008-10-17 17:03:42.000000000 +0800 +@@ -1,9 +1,11 @@ + /* -*- Mode: C; c-basic-offset:4 ; -*- */ +-/* +- * Copyright (C) 2001 University of Chicago. ++/* ++ * Copyright (C) 2001 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group + */ + + #include "ad_lustre.h" +@@ -13,12 +15,12 @@ + ADIOI_LUSTRE_ReadContig, /* ReadContig */ + ADIOI_LUSTRE_WriteContig, /* WriteContig */ + ADIOI_GEN_ReadStridedColl, /* ReadStridedColl */ +- ADIOI_GEN_WriteStridedColl, /* WriteStridedColl */ ++ ADIOI_LUSTRE_WriteStridedColl, /* WriteStridedColl */ + ADIOI_GEN_SeekIndividual, /* SeekIndividual */ + ADIOI_GEN_Fcntl, /* Fcntl */ + ADIOI_LUSTRE_SetInfo, /* SetInfo */ + ADIOI_GEN_ReadStrided, /* ReadStrided */ +- ADIOI_GEN_WriteStrided, /* WriteStrided */ ++ ADIOI_LUSTRE_WriteStrided, /* WriteStrided */ + ADIOI_GEN_Close, /* Close */ + #if defined(ROMIO_HAVE_WORKING_AIO) && !defined(CRAY_XT_LUSTRE) + ADIOI_GEN_IreadContig, /* IreadContig */ +diff -ruN ad_lustre_orig/ad_lustre.h ad_lustre/ad_lustre.h +--- ad_lustre_orig/ad_lustre.h 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/ad_lustre.h 2008-10-17 17:11:11.000000000 +0800 +@@ -1,9 +1,11 @@ + /* -*- Mode: C; c-basic-offset:4 ; -*- */ +-/* +- * Copyright (C) 1997 University of Chicago. ++/* ++ * Copyright (C) 1997 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group + */ + + #ifndef AD_UNIX_INCLUDE +@@ -24,7 +26,32 @@ + + /*#include */ + #include ++#ifdef WITH_LUSTRE + #include "lustre/lustre_user.h" ++#else ++/* copy something from lustre_user.h here */ ++# define LOV_USER_MAGIC 0x0BD10BD0 ++# define LL_IOC_LOV_SETSTRIPE _IOW ('f', 154, long) ++# define LL_IOC_LOV_GETSTRIPE _IOW ('f', 155, long) ++# define lov_user_ost_data lov_user_ost_data_v1 ++struct lov_user_ost_data_v1 { /* per-stripe data structure */ ++ __u64 l_object_id; /* OST object ID */ ++ __u64 l_object_gr; /* OST object group (creating MDS number) */ ++ __u32 l_ost_gen; /* generation of this OST index */ ++ __u32 l_ost_idx; /* OST index in LOV */ ++} __attribute__((packed)); ++#define lov_user_md lov_user_md_v1 ++struct lov_user_md_v1 { /* LOV EA user data (host-endian) */ ++ __u32 lmm_magic; /* magic number = LOV_USER_MAGIC_V1 */ ++ __u32 lmm_pattern; /* LOV_PATTERN_RAID0, LOV_PATTERN_RAID1 */ ++ __u64 lmm_object_id; /* LOV object ID */ ++ __u64 lmm_object_gr; /* LOV object group */ ++ __u32 lmm_stripe_size; /* size of stripe in bytes */ ++ __u16 lmm_stripe_count; /* num stripes in use for this object */ ++ __u16 lmm_stripe_offset; /* starting stripe offset in lmm_objects */ ++ struct lov_user_ost_data_v1 lmm_objects[0]; /* per-stripe data */ ++} __attribute__((packed)); ++#endif + #include "adio.h" + /*#include "adioi.h"*/ + +@@ -41,24 +68,31 @@ + + void ADIOI_LUSTRE_Open(ADIO_File fd, int *error_code); + void ADIOI_LUSTRE_Close(ADIO_File fd, int *error_code); +-void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count, +- MPI_Datatype datatype, int file_ptr_type, +- ADIO_Offset offset, ADIO_Status *status, int +- *error_code); +-void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count, +- MPI_Datatype datatype, int file_ptr_type, +- ADIO_Offset offset, ADIO_Status *status, int +- *error_code); ++void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count, ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status *status, ++ int *error_code); ++void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count, ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status *status, ++ int *error_code); ++void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count, ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status *status, ++ int *error_code); + void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count, +- MPI_Datatype datatype, int file_ptr_type, +- ADIO_Offset offset, ADIO_Status *status, int +- *error_code); ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status *status, ++ int *error_code); + void ADIOI_LUSTRE_ReadStridedColl(ADIO_File fd, void *buf, int count, +- MPI_Datatype datatype, int file_ptr_type, +- ADIO_Offset offset, ADIO_Status *status, int +- *error_code); ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status *status, ++ int *error_code); ++void ADIOI_LUSTRE_ReadStrided(ADIO_File fd, void *buf, int count, ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status *status, ++ int *error_code); + void ADIOI_LUSTRE_Fcntl(ADIO_File fd, int flag, ADIO_Fcntl_t *fcntl_struct, + int *error_code); + void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code); +- + #endif /* End of AD_UNIX_INCLUDE */ +diff -ruN ad_lustre_orig/ad_lustre_hints.c ad_lustre/ad_lustre_hints.c +--- ad_lustre_orig/ad_lustre_hints.c 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/ad_lustre_hints.c 2008-10-20 14:36:48.000000000 +0800 +@@ -1,9 +1,11 @@ + /* -*- Mode: C; c-basic-offset:4 ; -*- */ +-/* +- * Copyright (C) 1997 University of Chicago. ++/* ++ * Copyright (C) 1997 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group + */ + + #include "ad_lustre.h" +@@ -11,130 +13,173 @@ + + void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code) + { +- char *value, *value_in_fd; +- int flag, tmp_val[3], str_factor=-1, str_unit=0, start_iodev=-1; +- struct lov_user_md lum = { 0 }; +- int err, myrank, fd_sys, perm, amode, old_mask; ++ char *value = NULL; ++ int flag, tmp_val, int_val, str_factor, str_unit, start_iodev; ++ static char myname[] = "ADIOI_LUSTRE_SETINFO"; + ++ *error_code = MPI_SUCCESS; + value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char)); ++ + if ( (fd->info) == MPI_INFO_NULL) { +- /* This must be part of the open call. can set striping parameters +- if necessary. */ ++ /* This must be part of the open call. can set striping parameters ++ if necessary. */ + MPI_Info_create(&(fd->info)); + + MPI_Info_set(fd->info, "direct_read", "false"); + MPI_Info_set(fd->info, "direct_write", "false"); + fd->direct_read = fd->direct_write = 0; +- +- /* has user specified striping or server buffering parameters ++ ++ /* has user specified striping or server buffering parameters + and do they have the same value on all processes? */ + if (users_info != MPI_INFO_NULL) { +- MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL, +- value, &flag); +- if (flag) +- str_unit=atoi(value); +- +- MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL, +- value, &flag); +- if (flag) +- str_factor=atoi(value); +- +- MPI_Info_get(users_info, "start_iodevice", MPI_MAX_INFO_VAL, ++ /* direct read and write */ ++ MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL, + value, &flag); +- if (flag) +- start_iodev=atoi(value); +- +- MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL, +- value, &flag); + if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) { + MPI_Info_set(fd->info, "direct_read", "true"); + fd->direct_read = 1; + } +- +- MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL, ++ MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL, + value, &flag); + if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) { + MPI_Info_set(fd->info, "direct_write", "true"); + fd->direct_write = 1; + } ++ /* stripe size */ ++ MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL, ++ value, &flag); ++ if (flag && (str_unit = atoi(value))) { ++ tmp_val = str_unit; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != str_unit) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "striping_unit", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "striping_unit", value); ++ } ++ /* stripe count */ ++ MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL, ++ value, &flag); ++ if (flag && (str_factor = atoi(value))) { ++ tmp_val = str_factor; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != str_factor) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "striping_factor", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "striping_factor", value); ++ } ++ /* stripe offset */ ++ MPI_Info_get(users_info, "start_iodevice", MPI_MAX_INFO_VAL, ++ value, &flag); ++ if (flag && ((start_iodev = atoi(value)) >= 0)) { ++ tmp_val = start_iodev; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != start_iodev) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "start_iodevice", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "start_iodevice", value); ++ } + } +- +- MPI_Comm_rank(fd->comm, &myrank); +- if (myrank == 0) { +- tmp_val[0] = str_factor; +- tmp_val[1] = str_unit; +- tmp_val[2] = start_iodev; ++ } ++ if (users_info != MPI_INFO_NULL) { ++ /* CO: IO Clients/OST, ++ * to keep the load balancing between clients and OSTs */ ++ MPI_Info_get(users_info, "CO", MPI_MAX_INFO_VAL, value, ++ &flag); ++ if (flag && (int_val = atoi(value)) > 0) { ++ tmp_val = int_val; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != int_val) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "CO", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "CO", value); + } +- MPI_Bcast(tmp_val, 3, MPI_INT, 0, fd->comm); +- +- if (tmp_val[0] != str_factor +- || tmp_val[1] != str_unit +- || tmp_val[2] != start_iodev) { +- FPRINTF(stderr, "ADIOI_LUSTRE_SetInfo: All keys" +- "-striping_factor:striping_unit:start_iodevice " +- "need to be identical across all processes\n"); +- MPI_Abort(MPI_COMM_WORLD, 1); +- } else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) { +- /* if user has specified striping info, process 0 tries to set it */ +- if (!myrank) { +- if (fd->perm == ADIO_PERM_NULL) { +- old_mask = umask(022); +- umask(old_mask); +- perm = old_mask ^ 0666; +- } +- else perm = fd->perm; +- +- amode = 0; +- if (fd->access_mode & ADIO_CREATE) +- amode = amode | O_CREAT; +- if (fd->access_mode & ADIO_RDONLY) +- amode = amode | O_RDONLY; +- if (fd->access_mode & ADIO_WRONLY) +- amode = amode | O_WRONLY; +- if (fd->access_mode & ADIO_RDWR) +- amode = amode | O_RDWR; +- if (fd->access_mode & ADIO_EXCL) +- amode = amode | O_EXCL; +- +- /* we need to create file so ensure this is set */ +- amode = amode | O_LOV_DELAY_CREATE | O_CREAT; +- +- fd_sys = open(fd->filename, amode, perm); +- if (fd_sys == -1) { +- if (errno != EEXIST) +- fprintf(stderr, +- "Failure to open file %s %d %d\n",strerror(errno), amode, perm); +- } else { +- lum.lmm_magic = LOV_USER_MAGIC; +- lum.lmm_pattern = 0; +- lum.lmm_stripe_size = str_unit; +- lum.lmm_stripe_count = str_factor; +- lum.lmm_stripe_offset = start_iodev; +- +- err = ioctl(fd_sys, LL_IOC_LOV_SETSTRIPE, &lum); +- if (err == -1 && errno != EEXIST) { +- fprintf(stderr, "Failure to set stripe info %s \n", strerror(errno)); +- } +- close(fd_sys); +- } +- } /* End of striping parameters validation */ ++ /* big_req_size: ++ * if the req size is bigger than this, ++ * collective IO may not be performed. ++ */ ++ MPI_Info_get(users_info, "big_req_size", MPI_MAX_INFO_VAL, value, ++ &flag); ++ if (flag && (int_val = atoi(value)) > 0) { ++ tmp_val = int_val; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != int_val) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "big_req_size", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "big_req_size", value); ++ } ++ /* ds_in_coll: disable data sieving in collective IO */ ++ MPI_Info_get(users_info, "ds_in_coll", MPI_MAX_INFO_VAL, ++ value, &flag); ++ if (flag && (!strcmp(value, "enable") || ++ !strcmp(value, "ENABLE"))) { ++ tmp_val = int_val = 1; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != int_val) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "ds_in_coll", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "ds_in_coll", "enable"); ++ } ++ /* contiguous_data: whether the data are contiguous */ ++ MPI_Info_get(users_info, "contiguous_data", MPI_MAX_INFO_VAL, ++ value, &flag); ++ if (flag && (!strcmp(value, "yes") || ++ !strcmp(value, "YES"))) { ++ tmp_val = int_val = 1; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != int_val) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "contiguous_data", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "contiguous_data", "yes"); ++ } ++ /* same_io_size: whether the req size is same */ ++ MPI_Info_get(users_info, "same_io_size", MPI_MAX_INFO_VAL, ++ value, &flag); ++ if (flag && (!strcmp(value, "yes") || ++ !strcmp(value, "YES"))) { ++ tmp_val = int_val = 1; ++ MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm); ++ if (tmp_val != int_val) { ++ MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname, ++ "same_io_size", ++ error_code); ++ ADIOI_Free(value); ++ return; ++ } ++ MPI_Info_set(fd->info, "same_io_size", "yes"); + } +- +- MPI_Barrier(fd->comm); +- /* set the values for collective I/O and data sieving parameters */ +- ADIOI_GEN_SetInfo(fd, users_info, error_code); +- } else { +- /* The file has been opened previously and fd->fd_sys is a valid +- file descriptor. cannot set striping parameters now. */ +- +- /* set the values for collective I/O and data sieving parameters */ +- ADIOI_GEN_SetInfo(fd, users_info, error_code); + } +- +- if (ADIOI_Direct_read) fd->direct_read = 1; +- if (ADIOI_Direct_write) fd->direct_write = 1; +- + ADIOI_Free(value); ++ /* set the values for collective I/O and data sieving parameters */ ++ ADIOI_GEN_SetInfo(fd, users_info, error_code); + +- *error_code = MPI_SUCCESS; ++ if (ADIOI_Direct_read) fd->direct_read = 1; ++ if (ADIOI_Direct_write) fd->direct_write = 1; + } +diff -ruN ad_lustre_orig/ad_lustre_open.c ad_lustre/ad_lustre_open.c +--- ad_lustre_orig/ad_lustre_open.c 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/ad_lustre_open.c 2008-09-17 18:55:50.000000000 +0800 +@@ -1,18 +1,21 @@ + /* -*- Mode: C; c-basic-offset:4 ; -*- */ +-/* +- * Copyright (C) 1997 University of Chicago. ++/* ++ * Copyright (C) 1997 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group + */ + + #include "ad_lustre.h" + + void ADIOI_LUSTRE_Open(ADIO_File fd, int *error_code) + { +- int perm, old_mask, amode, amode_direct; ++ int perm, old_mask, amode = 0, amode_direct = 0, flag = 0, err, myrank; ++ int stripe_size = 0, stripe_count = 0, stripe_offset = -1; + struct lov_user_md lum = { 0 }; +- char *value; ++ char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); + + #if defined(MPICH2) || !defined(PRINT_ERR_MSG) + static char myname[] = "ADIOI_LUSTRE_OPEN"; +@@ -22,12 +25,57 @@ + old_mask = umask(022); + umask(old_mask); + perm = old_mask ^ 0666; +- } +- else perm = fd->perm; ++ } else ++ perm = fd->perm; + +- amode = 0; +- if (fd->access_mode & ADIO_CREATE) ++ if (fd->access_mode & ADIO_CREATE) { + amode = amode | O_CREAT; ++ /* Check striping info ++ * if already set by SetInfo(), set them to lum; otherwise, set by lum ++ */ ++ MPI_Info_get(fd->info, "striping_unit", MPI_MAX_INFO_VAL, value, ++ &flag); ++ if (flag) ++ stripe_size = atoi(value); ++ ++ MPI_Info_get(fd->info, "striping_factor", MPI_MAX_INFO_VAL, value, ++ &flag); ++ if (flag) ++ stripe_count = atoi(value); ++ ++ MPI_Info_get(fd->info, "start_iodevice", MPI_MAX_INFO_VAL, value, ++ &flag); ++ if (flag) ++ stripe_offset = atoi(value); ++ ++ /* if user has specified striping info, ++ * process 0 will try to check and set it. ++ */ ++ if ((stripe_size > 0) || (stripe_count > 0) || (stripe_offset >= 0)) { ++ MPI_Comm_rank(fd->comm, &myrank); ++ if (myrank == 0) { ++ int fd_sys = open(fd->filename, amode, perm); ++ if (fd_sys == -1) { ++ if (errno != EEXIST) ++ FPRINTF(stderr, "Failure to open file %s %d %d\n", ++ strerror(errno), amode, perm); ++ } else { ++ lum.lmm_magic = LOV_USER_MAGIC; ++ lum.lmm_pattern = 1; ++ lum.lmm_stripe_size = stripe_size; ++ lum.lmm_stripe_count = stripe_count; ++ lum.lmm_stripe_offset = stripe_offset; ++ ++ if (ioctl(fd_sys, LL_IOC_LOV_SETSTRIPE, &lum)) ++ FPRINTF(stderr, ++ "Failure to set striping info to Lustre!\n"); ++ close(fd_sys); ++ } ++ } ++ MPI_Barrier(fd->comm); ++ } ++ } ++ + if (fd->access_mode & ADIO_RDONLY) + amode = amode | O_RDONLY; + if (fd->access_mode & ADIO_WRONLY) +@@ -42,32 +90,36 @@ + fd->fd_sys = open(fd->filename, amode|O_CREAT, perm); + + if (fd->fd_sys != -1) { +- int err; +- +- value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char)); +- + /* get file striping information and set it in info */ +- lum.lmm_magic = LOV_USER_MAGIC; +- err = ioctl(fd->fd_sys, LL_IOC_LOV_GETSTRIPE, (void *) &lum); +- +- if (!err) { +- sprintf(value, "%d", lum.lmm_stripe_size); +- MPI_Info_set(fd->info, "striping_unit", value); +- +- sprintf(value, "%d", lum.lmm_stripe_count); +- MPI_Info_set(fd->info, "striping_factor", value); +- +- sprintf(value, "%d", lum.lmm_stripe_offset); +- MPI_Info_set(fd->info, "start_iodevice", value); +- } +- ADIOI_Free(value); ++ lum.lmm_magic = LOV_USER_MAGIC; ++ err = ioctl(fd->fd_sys, LL_IOC_LOV_GETSTRIPE, (void *) &lum); + ++ if (!err) { ++ if (lum.lmm_stripe_size && lum.lmm_stripe_count && ++ (lum.lmm_stripe_offset >= 0)) { ++ sprintf(value, "%d", lum.lmm_stripe_size); ++ MPI_Info_set(fd->info, "striping_unit", value); ++ ++ sprintf(value, "%d", lum.lmm_stripe_count); ++ MPI_Info_set(fd->info, "striping_factor", value); ++ ++ sprintf(value, "%d", lum.lmm_stripe_offset); ++ MPI_Info_set(fd->info, "start_iodevice", value); ++ } else { ++ FPRINTF(stderr, "Striping info is invalid!\n"); ++ ADIOI_Free(value); ++ MPI_Abort(MPI_COMM_WORLD, 1); ++ } ++ } else { ++ FPRINTF(stderr, "Failed to get striping info from Lustre!\n"); ++ ADIOI_Free(value); ++ MPI_Abort(MPI_COMM_WORLD, 1); ++ } + if (fd->access_mode & ADIO_APPEND) + fd->fp_ind = fd->fp_sys_posn = lseek(fd->fd_sys, 0, SEEK_END); +- } +- ++ } + if ((fd->fd_sys != -1) && (fd->access_mode & ADIO_APPEND)) +- fd->fp_ind = fd->fp_sys_posn = lseek(fd->fd_sys, 0, SEEK_END); ++ fd->fp_ind = fd->fp_sys_posn = lseek(fd->fd_sys, 0, SEEK_END); + + fd->fd_direct = -1; + if (fd->direct_write || fd->direct_read) { +@@ -81,20 +133,22 @@ + } + + /* --BEGIN ERROR HANDLING-- */ +- if (fd->fd_sys == -1 || ((fd->fd_direct == -1) && +- (fd->direct_write || fd->direct_read))) { ++ if (fd->fd_sys == -1 || ((fd->fd_direct == -1) && ++ (fd->direct_write || fd->direct_read))) { + if (errno == ENAMETOOLONG) + *error_code = MPIO_Err_create_code(MPI_SUCCESS, +- MPIR_ERR_RECOVERABLE, myname, +- __LINE__, MPI_ERR_BAD_FILE, ++ MPIR_ERR_RECOVERABLE, ++ myname, __LINE__, ++ MPI_ERR_BAD_FILE, + "**filenamelong", + "**filenamelong %s %d", + fd->filename, + strlen(fd->filename)); + else if (errno == ENOENT) + *error_code = MPIO_Err_create_code(MPI_SUCCESS, +- MPIR_ERR_RECOVERABLE, myname, +- __LINE__, MPI_ERR_NO_SUCH_FILE, ++ MPIR_ERR_RECOVERABLE, ++ myname, __LINE__, ++ MPI_ERR_NO_SUCH_FILE, + "**filenoexist", + "**filenoexist %s", + fd->filename); +@@ -108,27 +162,30 @@ + fd->filename); + else if (errno == EACCES) { + *error_code = MPIO_Err_create_code(MPI_SUCCESS, +- MPIR_ERR_RECOVERABLE, myname, +- __LINE__, MPI_ERR_ACCESS, ++ MPIR_ERR_RECOVERABLE, ++ myname, __LINE__, ++ MPI_ERR_ACCESS, + "**fileaccess", +- "**fileaccess %s", +- fd->filename ); +- } +- else if (errno == EROFS) { ++ "**fileaccess %s", ++ fd->filename); ++ } else if (errno == EROFS) { + /* Read only file or file system and write access requested */ + *error_code = MPIO_Err_create_code(MPI_SUCCESS, +- MPIR_ERR_RECOVERABLE, myname, +- __LINE__, MPI_ERR_READ_ONLY, +- "**ioneedrd", 0 ); +- } +- else { ++ MPIR_ERR_RECOVERABLE, ++ myname, __LINE__, ++ MPI_ERR_READ_ONLY, ++ "**ioneedrd", 0); ++ } else { + *error_code = MPIO_Err_create_code(MPI_SUCCESS, +- MPIR_ERR_RECOVERABLE, myname, +- __LINE__, MPI_ERR_IO, "**io", ++ MPIR_ERR_RECOVERABLE, ++ myname, __LINE__, ++ MPI_ERR_IO, "**io", + "**io %s", strerror(errno)); + } +- } ++ } else { + /* --END ERROR HANDLING-- */ +- else *error_code = MPI_SUCCESS; ++ *error_code = MPI_SUCCESS; ++ } + ++ ADIOI_Free(value); + } +diff -ruN ad_lustre_orig/ad_lustre_rwcontig.c ad_lustre/ad_lustre_rwcontig.c +--- ad_lustre_orig/ad_lustre_rwcontig.c 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/ad_lustre_rwcontig.c 2008-10-15 22:44:35.000000000 +0800 +@@ -1,9 +1,11 @@ + /* -*- Mode: C; c-basic-offset:4 ; -*- */ +-/* +- * Copyright (C) 1997 University of Chicago. ++/* ++ * Copyright (C) 1997 University of Chicago. + * See COPYRIGHT notice in top-level directory. + * + * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group + */ + + #define _XOPEN_SOURCE 600 +diff -ruN ad_lustre_orig/ad_lustre_wrcoll.c ad_lustre/ad_lustre_wrcoll.c +--- ad_lustre_orig/ad_lustre_wrcoll.c 1970-01-01 08:00:00.000000000 +0800 ++++ ad_lustre/ad_lustre_wrcoll.c 2008-10-17 16:34:36.000000000 +0800 +@@ -0,0 +1,880 @@ ++/* -*- Mode: C; c-basic-offset:4 ; -*- */ ++/* ++ * Copyright (C) 1997 University of Chicago. ++ * See COPYRIGHT notice in top-level directory. ++ * ++ * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group ++ */ ++ ++#include "ad_lustre.h" ++#include "adio_extern.h" ++ ++/* prototypes of functions used for collective writes only. */ ++static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf, ++ MPI_Datatype datatype, int nprocs, ++ int myrank, ++ ADIOI_Access *others_req, ++ ADIOI_Access *my_req, ++ ADIO_Offset *offset_list, ++ int *len_list, ++ int contig_access_count, ++ int * striping_info, ++ int *buf_idx, int *error_code); ++static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf, ++ ADIOI_Flatlist_node * flat_buf, ++ char **send_buf, ++ ADIO_Offset * offset_list, ++ int *len_list, int *send_size, ++ MPI_Request * requests, ++ int *sent_to_proc, int nprocs, ++ int myrank, int contig_access_count, ++ int * striping_info, ++ int *send_buf_idx, ++ int *curr_to_proc, ++ int *done_to_proc, int iter, ++ MPI_Aint buftype_extent); ++static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf, ++ char *write_buf, ++ ADIOI_Flatlist_node * flat_buf, ++ ADIO_Offset * offset_list, ++ int *len_list, int *send_size, ++ int *recv_size, ADIO_Offset off, ++ int size, int *count, ++ int *start_pos, int *partial_recv, ++ int *sent_to_proc, int nprocs, ++ int myrank, int buftype_is_contig, ++ int contig_access_count, ++ int * striping_info, ++ ADIOI_Access * others_req, ++ int *send_buf_idx, ++ int *curr_to_proc, ++ int *done_to_proc, int *hole, ++ int iter, MPI_Aint buftype_extent, ++ int *buf_idx, int *error_code); ++void ADIOI_Heap_merge(ADIOI_Access * others_req, int *count, ++ ADIO_Offset * srt_off, int *srt_len, int *start_pos, ++ int nprocs, int nprocs_recv, int total_elements); ++ ++void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count, ++ MPI_Datatype datatype, ++ int file_ptr_type, ADIO_Offset offset, ++ ADIO_Status * status, int *error_code) ++{ ++ ADIOI_Access *my_req; ++ /* array of nprocs access structures, one for each other process has ++ this process's request */ ++ ++ ADIOI_Access *others_req; ++ /* array of nprocs access structures, one for each other process ++ whose request is written by this process. */ ++ ++ int i, filetype_is_contig, nprocs, myrank, do_collect = 0; ++ int contig_access_count = 0, buftype_is_contig, interleave_count = 0; ++ int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs; ++ ADIO_Offset orig_fp, start_offset, end_offset, off; ++ ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL; ++ int *buf_idx = NULL, *len_list = NULL, *striping_info = NULL; ++ int old_error, tmp_error; ++ ++ MPI_Comm_size(fd->comm, &nprocs); ++ MPI_Comm_rank(fd->comm, &myrank); ++ ++ orig_fp = fd->fp_ind; ++ ++ /* IO patten identification if cb_write isn't disabled */ ++ if (fd->hints->cb_write != ADIOI_HINT_DISABLE) { ++ /* For this process's request, calculate the list of offsets and ++ lengths in the file and determine the start and end offsets. */ ++ ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset, ++ &offset_list, &len_list, &start_offset, ++ &end_offset, &contig_access_count); ++ ++ /* each process communicates its start and end offsets to other ++ processes. The result is an array each of start and end offsets stored ++ in order of process rank. */ ++ st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset)); ++ end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset)); ++ MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1, ++ ADIO_OFFSET, fd->comm); ++ MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1, ++ ADIO_OFFSET, fd->comm); ++ /* are the accesses of different processes interleaved? */ ++ for (i = 1; i < nprocs; i++) ++ if ((st_offsets[i] < end_offsets[i-1]) && ++ (st_offsets[i] <= end_offsets[i])) ++ interleave_count++; ++ /* This is a rudimentary check for interleaving, but should suffice ++ for the moment. */ ++ ++ /* Two typical access patterns can benefit from collective write. ++ * 1) the processes are interleaved, and ++ * 2) the req size is small. ++ */ ++ if (interleave_count > 0) { ++ do_collect = 1; ++ } else { ++ do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count, ++ len_list, nprocs); ++ } ++ } ++ ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); ++ ++ /* Decide if collective I/O should be done */ ++ if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) || ++ fd->hints->cb_write == ADIOI_HINT_DISABLE) { ++ ++ int filerange_is_contig = 0; ++ ++ /* use independent accesses */ ++ if (fd->hints->cb_write != ADIOI_HINT_DISABLE) { ++ ADIOI_Free(offset_list); ++ ADIOI_Free(len_list); ++ ADIOI_Free(st_offsets); ++ ADIOI_Free(end_offsets); ++ } ++ ++ fd->fp_ind = orig_fp; ++ ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig); ++ if (buftype_is_contig && filetype_is_contig) { ++ if (file_ptr_type == ADIO_EXPLICIT_OFFSET) { ++ off = fd->disp + (fd->etype_size) * offset; ++ ADIO_WriteContig(fd, buf, count, datatype, ++ ADIO_EXPLICIT_OFFSET, ++ off, status, error_code); ++ } else ++ ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL, ++ 0, status, error_code); ++ } else { ++ ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, ++ offset, status, error_code); ++ } ++ return; ++ } ++ ++ /* Get Lustre hints information */ ++ ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1); ++ /* calculate what portions of the access requests of this process are ++ * located in which process ++ */ ++ ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count, ++ striping_info, nprocs, &count_my_req_procs, ++ &count_my_req_per_proc, &my_req, &buf_idx); ++ /* calculate what process's requests will be written by this process */ ++ ADIOI_LUSTRE_Calc_others_req(fd, count_my_req_procs, ++ count_my_req_per_proc, ++ my_req, nprocs, myrank, ++ start_offset, end_offset, striping_info, ++ &count_others_req_procs, &others_req); ++ ADIOI_Free(count_my_req_per_proc); ++ ++ /* exchange data and write in sizes of no more than stripe_size. */ ++ ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank, ++ others_req, my_req, ++ offset_list, len_list, contig_access_count, ++ striping_info, buf_idx, error_code); ++ ++ old_error = *error_code; ++ if (*error_code != MPI_SUCCESS) ++ *error_code = MPI_ERR_IO; ++ ++ /* optimization: if only one process performing i/o, we can perform ++ * a less-expensive Bcast */ ++#ifdef ADIOI_MPE_LOGGING ++ MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL); ++#endif ++ if (fd->hints->cb_nodes == 1) ++ MPI_Bcast(error_code, 1, MPI_INT, ++ fd->hints->ranklist[0], fd->comm); ++ else { ++ tmp_error = *error_code; ++ MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT, ++ MPI_MAX, fd->comm); ++ } ++#ifdef ADIOI_MPE_LOGGING ++ MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL); ++#endif ++ ++ if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO)) ++ *error_code = old_error; ++ ++ ++ if (!buftype_is_contig) ++ ADIOI_Delete_flattened(datatype); ++ ++ /* free all memory allocated for collective I/O */ ++ /* free others_req */ ++ for (i = 0; i < nprocs; i++) { ++ if (others_req[i].count) { ++ ADIOI_Free(others_req[i].offsets); ++ ADIOI_Free(others_req[i].lens); ++ ADIOI_Free(others_req[i].mem_ptrs); ++ } ++ } ++ ADIOI_Free(others_req); ++ /* free my_req here */ ++ for (i = 0; i < nprocs; i++) { ++ if (my_req[i].count) { ++ ADIOI_Free(my_req[i].offsets); ++ ADIOI_Free(my_req[i].lens); ++ } ++ } ++ ADIOI_Free(my_req); ++ ADIOI_Free(buf_idx); ++ ADIOI_Free(offset_list); ++ ADIOI_Free(len_list); ++ ADIOI_Free(st_offsets); ++ ADIOI_Free(end_offsets); ++ ADIOI_Free(striping_info); ++ ++#ifdef HAVE_STATUS_SET_BYTES ++ if (status) { ++ int bufsize, size; ++ /* Don't set status if it isn't needed */ ++ MPI_Type_size(datatype, &size); ++ bufsize = size * count; ++ MPIR_Status_set_bytes(status, datatype, bufsize); ++ } ++ /* This is a temporary way of filling in status. The right way is to ++ * keep track of how much data was actually written during collective I/O. ++ */ ++#endif ++ ++ fd->fp_sys_posn = -1; /* set it to null. */ ++} ++ ++static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf, ++ MPI_Datatype datatype, int nprocs, ++ int myrank, ADIOI_Access *others_req, ++ ADIOI_Access *my_req, ++ ADIO_Offset *offset_list, ++ int *len_list, int contig_access_count, ++ int *striping_info, int *buf_idx, ++ int *error_code) ++{ ++ int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig; ++ ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc; ++ ADIO_Offset off, req_off, send_off, iter_st_off, *off_list; ++ ADIO_Offset max_size, step_size = 0; ++ int real_size, req_len, send_len; ++ int *recv_curr_offlen_ptr, *recv_count, *recv_size; ++ int *send_curr_offlen_ptr, *send_size; ++ int *partial_recv, *sent_to_proc, *recv_start_pos; ++ int *send_buf_idx, *curr_to_proc, *done_to_proc; ++ char *write_buf = NULL, *value; ++ MPI_Status status; ++ ADIOI_Flatlist_node *flat_buf = NULL; ++ MPI_Aint buftype_extent; ++ int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2]; ++ int lflag, data_sieving = 0; ++ ++ *error_code = MPI_SUCCESS; /* changed below if error */ ++ ++ /* calculate the number of writes of stripe size to be done. ++ * That gives the no. of communication phases as well. ++ * Note: ++ * Because we redistribute data in stripe-contiguous pattern for Lustre, ++ * each process has the same no. of communication phases. ++ */ ++ ++ for (i = 0; i < nprocs; i++) { ++ if (others_req[i].count) { ++ st_loc = others_req[i].offsets[0]; ++ end_loc = others_req[i].offsets[0]; ++ break; ++ } ++ } ++ for (i = 0; i < nprocs; i++) { ++ for (j = 0; j < others_req[i].count; j++) { ++ st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]); ++ end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] + ++ others_req[i].lens[j] - 1)); ++ } ++ } ++ /* this process does no writing. */ ++ if ((st_loc == -1) && (end_loc == -1)) ++ ntimes = 0; ++ MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm); ++ /* avoid min_st_loc be -1 */ ++ if (st_loc == -1) ++ st_loc = max_end_loc; ++ MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm); ++ /* align downward */ ++ min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size; ++ ++ /* Each time, only avail_cb_nodes number of IO clients perform IO, ++ * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most, ++ * and ntimes=whole_file_portion/step_size ++ */ ++ step_size = (ADIO_Offset) avail_cb_nodes * stripe_size; ++ max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1); ++ if (ntimes) ++ write_buf = (char *) ADIOI_Malloc(stripe_size); ++ ++ /* calculate the start offset for each iteration */ ++ off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset)); ++ for (m = 0; m < max_ntimes; m ++) ++ off_list[m] = max_end_loc; ++ for (i = 0; i < nprocs; i++) { ++ for (j = 0; j < others_req[i].count; j ++) { ++ req_off = others_req[i].offsets[j]; ++ m = (int)((req_off - min_st_loc) / step_size); ++ off_list[m] = ADIOI_MIN(off_list[m], req_off); ++ } ++ } ++ ++ recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); ++ send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); ++ /* their use is explained below. calloc initializes to 0. */ ++ ++ recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ /* to store count of how many off-len pairs per proc are satisfied ++ in an iteration. */ ++ ++ send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ /* total size of data to be sent to each proc. in an iteration. ++ Of size nprocs so that I can use MPI_Alltoall later. */ ++ ++ recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ /* total size of data to be recd. from each proc. in an iteration. */ ++ ++ sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int)); ++ /* amount of data sent to each proc so far. Used in ++ ADIOI_Fill_send_buffer. initialized to 0 here. */ ++ ++ send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ /* Above three are used in ADIOI_Fill_send_buffer */ ++ ++ recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int)); ++ /* used to store the starting value of recv_curr_offlen_ptr[i] in ++ this iteration */ ++ ++ ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); ++ if (!buftype_is_contig) { ++ ADIOI_Flatten_datatype(datatype); ++ flat_buf = ADIOI_Flatlist; ++ while (flat_buf->type != datatype) ++ flat_buf = flat_buf->next; ++ } ++ MPI_Type_extent(datatype, &buftype_extent); ++ ++ iter_st_off = min_st_loc; ++ ++ /* Although we have recognized the data according to OST index, ++ * a read-modify-write will be done if there is a hole between the data. ++ * For example: if blocksize=60, xfersize=30 and stripe_size=100, ++ * then rank0 will collect data [0, 30] and [60, 90] then write. There ++ * is a hole in [30, 60], which will cause a read-modify-write in [0, 90]. ++ * ++ * To reduce its impact on the performance, we disable data sieving ++ * by default, unless the hint "ds_in_coll" is enabled. ++ */ ++ /* check the hint for data sieving */ ++ value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); ++ MPI_Info_get(fd->info, "ds_in_coll", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag && !strcmp(value, "enable")) ++ data_sieving = 1; ++ ADIOI_Free(value); ++ ++ for (m = 0; m < max_ntimes; m++) { ++ /* go through all others_req and my_req to check which will be received ++ * and sent in this iteration. ++ */ ++ ++ /* Note that MPI guarantees that displacements in filetypes are in ++ monotonically nondecreasing order and that, for writes, the ++ filetypes cannot specify overlapping regions in the file. This ++ simplifies implementation a bit compared to reads. */ ++ ++ /* ++ off = start offset in the file for the data to be written in ++ this iteration ++ iter_st_off = start offset of this iteration ++ real_size = size of data written (bytes) corresponding to off ++ max_size = possible maximum size of data written in this iteration ++ req_off = offset in the file for a particular contiguous request minus ++ what was satisfied in previous iteration ++ send_off = offset the request needed by other processes in this iteration ++ req_len = size corresponding to req_off ++ send_len = size corresponding to send_off ++ */ ++ ++ /* first calculate what should be communicated */ ++ for (i = 0; i < nprocs; i++) ++ recv_count[i] = recv_size[i] = send_size[i] = 0; ++ ++ off = off_list[m]; ++ max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1); ++ real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off, ++ end_loc - off + 1); ++ ++ for (i = 0; i < nprocs; i++) { ++ if (my_req[i].count) { ++ for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) { ++ send_off = my_req[i].offsets[j]; ++ send_len = my_req[i].lens[j]; ++ if (send_off < iter_st_off + max_size) { ++ send_size[i] += send_len; ++ } else { ++ break; ++ } ++ } ++ send_curr_offlen_ptr[i] = j; ++ } ++ if (others_req[i].count) { ++ recv_start_pos[i] = recv_curr_offlen_ptr[i]; ++ for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) { ++ req_off = others_req[i].offsets[j]; ++ req_len = others_req[i].lens[j]; ++ if (req_off < iter_st_off + max_size) { ++ recv_count[i]++; ++ MPI_Address(write_buf + req_off - off, ++ &(others_req[i].mem_ptrs[j])); ++ recv_size[i] += req_len; ++ } else { ++ break; ++ } ++ } ++ recv_curr_offlen_ptr[i] = j; ++ } ++ } ++ /* use variable "hole" to pass data_sieving flag into W_Exchange_data */ ++ hole = data_sieving; ++ ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list, ++ len_list, send_size, recv_size, off, real_size, ++ recv_count, recv_start_pos, partial_recv, ++ sent_to_proc, nprocs, myrank, ++ buftype_is_contig, contig_access_count, ++ striping_info, others_req, send_buf_idx, ++ curr_to_proc, done_to_proc, &hole, m, ++ buftype_extent, buf_idx, error_code); ++ if (*error_code != MPI_SUCCESS) ++ goto over; ++ ++ flag = 0; ++ for (i = 0; i < nprocs; i++) ++ if (recv_count[i]) { ++ flag = 1; ++ break; ++ } ++ if (flag) { ++ /* check whether to do data sieving */ ++ if(data_sieving) { ++ ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE, ++ ADIO_EXPLICIT_OFFSET, off, &status, ++ error_code); ++ } else { ++ /* if there is no hole, write data in one time; ++ * otherwise, write data in several times */ ++ if (!hole) { ++ ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE, ++ ADIO_EXPLICIT_OFFSET, off, &status, ++ error_code); ++ } else { ++ for (i = 0; i < nprocs; i++) { ++ if (others_req[i].count) { ++ for (j = 0; j < others_req[i].count; j++) { ++ if (others_req[i].offsets[j] < off + real_size && ++ others_req[i].offsets[j] >= off) { ++ ADIO_WriteContig(fd, ++ write_buf + others_req[i].offsets[j] - off, ++ others_req[i].lens[j], ++ MPI_BYTE, ADIO_EXPLICIT_OFFSET, ++ others_req[i].offsets[j], &status, ++ error_code); ++ if (*error_code != MPI_SUCCESS) ++ goto over; ++ } ++ } ++ } ++ } ++ } ++ } ++ if (*error_code != MPI_SUCCESS) ++ goto over; ++ } ++ iter_st_off += max_size; ++ } ++over: ++ if (ntimes) ++ ADIOI_Free(write_buf); ++ ADIOI_Free(recv_curr_offlen_ptr); ++ ADIOI_Free(send_curr_offlen_ptr); ++ ADIOI_Free(recv_count); ++ ADIOI_Free(send_size); ++ ADIOI_Free(recv_size); ++ ADIOI_Free(sent_to_proc); ++ ADIOI_Free(recv_start_pos); ++ ADIOI_Free(send_buf_idx); ++ ADIOI_Free(curr_to_proc); ++ ADIOI_Free(done_to_proc); ++ ADIOI_Free(off_list); ++} ++ ++static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf, ++ char *write_buf, ++ ADIOI_Flatlist_node * flat_buf, ++ ADIO_Offset * offset_list, ++ int *len_list, int *send_size, ++ int *recv_size, ADIO_Offset off, ++ int size, int *count, ++ int *start_pos, int *partial_recv, ++ int *sent_to_proc, int nprocs, ++ int myrank, int buftype_is_contig, ++ int contig_access_count, ++ int * striping_info, ++ ADIOI_Access * others_req, ++ int *send_buf_idx, ++ int *curr_to_proc, int *done_to_proc, ++ int *hole, int iter, ++ MPI_Aint buftype_extent, ++ int *buf_idx, int *error_code) ++{ ++ int i, j, nprocs_recv, nprocs_send, err; ++ char **send_buf = NULL; ++ MPI_Request *requests, *send_req; ++ MPI_Datatype *recv_types; ++ MPI_Status *statuses, status; ++ int *srt_len, sum, sum_recv; ++ ADIO_Offset *srt_off; ++ int data_sieving = *hole; ++ static char myname[] = "ADIOI_W_EXCHANGE_DATA"; ++ ++ /* create derived datatypes for recv */ ++ nprocs_recv = 0; ++ for (i = 0; i < nprocs; i++) ++ if (recv_size[i]) ++ nprocs_recv++; ++ ++ recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) * ++ sizeof(MPI_Datatype)); ++ /* +1 to avoid a 0-size malloc */ ++ ++ j = 0; ++ for (i = 0; i < nprocs; i++) { ++ if (recv_size[i]) { ++ MPI_Type_hindexed(count[i], ++ &(others_req[i].lens[start_pos[i]]), ++ &(others_req[i].mem_ptrs[start_pos[i]]), ++ MPI_BYTE, recv_types + j); ++ /* absolute displacements; use MPI_BOTTOM in recv */ ++ MPI_Type_commit(recv_types + j); ++ j++; ++ } ++ } ++ ++ /* To avoid a read-modify-write, ++ * check if there are holes in the data to be written. ++ * For this, merge the (sorted) offset lists others_req using a heap-merge. ++ */ ++ ++ sum = 0; ++ for (i = 0; i < nprocs; i++) ++ sum += count[i]; ++ srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset)); ++ srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int)); ++ /* +1 to avoid a 0-size malloc */ ++ ++ ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos, ++ nprocs, nprocs_recv, sum); ++ ++ /* check if there are any holes */ ++ *hole = 0; ++ for (i = 0; i < sum - 1; i++) { ++ if (srt_off[i] + srt_len[i] < srt_off[i + 1]) { ++ *hole = 1; ++ break; ++ } ++ } ++ /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction ++ * between aggregation, nominally contiguous regions, and cb_buffer_size ++ * should be handled with a read-modify-write (otherwise we will write out ++ * more data than we receive from everyone else (inclusive), so override ++ * hole detection ++ */ ++ if (*hole == 0) { ++ sum_recv = 0; ++ for (i = 0; i < nprocs; i++) ++ sum_recv += recv_size[i]; ++ if (size > sum_recv) ++ *hole = 1; ++ } ++ /* check the hint for data sieving */ ++ if (data_sieving && nprocs_recv && *hole) { ++ ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, ++ ADIO_EXPLICIT_OFFSET, off, &status, &err); ++ // --BEGIN ERROR HANDLING-- ++ if (err != MPI_SUCCESS) { ++ *error_code = MPIO_Err_create_code(err, ++ MPIR_ERR_RECOVERABLE, ++ myname, __LINE__, ++ MPI_ERR_IO, ++ "**ioRMWrdwr", 0); ++ ADIOI_Free(recv_types); ++ ADIOI_Free(srt_off); ++ ADIOI_Free(srt_len); ++ return; ++ } ++ // --END ERROR HANDLING-- ++ } ++ ADIOI_Free(srt_off); ++ ADIOI_Free(srt_len); ++ ++ nprocs_send = 0; ++ for (i = 0; i < nprocs; i++) ++ if (send_size[i]) ++ nprocs_send++; ++ ++ if (fd->atomicity) { ++ /* bug fix from Wei-keng Liao and Kenin Coloma */ ++ requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) * ++ sizeof(MPI_Request)); ++ send_req = requests; ++ } else { ++ requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)* ++ sizeof(MPI_Request)); ++ /* +1 to avoid a 0-size malloc */ ++ ++ /* post receives */ ++ j = 0; ++ for (i = 0; i < nprocs; i++) { ++ if (recv_size[i]) { ++ MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, ++ myrank + i + 100 * iter, fd->comm, requests + j); ++ j++; ++ } ++ } ++ send_req = requests + nprocs_recv; ++ } ++ ++ /* post sends. ++ * if buftype_is_contig, data can be directly sent from ++ * user buf at location given by buf_idx. else use send_buf. ++ */ ++ if (buftype_is_contig) { ++ j = 0; ++ for (i = 0; i < nprocs; i++) ++ if (send_size[i]) { ++ MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], ++ MPI_BYTE, i, myrank + i + 100 * iter, fd->comm, ++ send_req + j); ++ j++; ++ buf_idx[i] += send_size[i]; ++ } ++ } else if (nprocs_send) { ++ /* buftype is not contig */ ++ send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *)); ++ for (i = 0; i < nprocs; i++) ++ if (send_size[i]) ++ send_buf[i] = (char *) ADIOI_Malloc(send_size[i]); ++ ++ ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list, ++ len_list, send_size, send_req, ++ sent_to_proc, nprocs, myrank, ++ contig_access_count, striping_info, ++ send_buf_idx, curr_to_proc, done_to_proc, ++ iter, buftype_extent); ++ /* the send is done in ADIOI_Fill_send_buffer */ ++ } ++ ++ /* bug fix from Wei-keng Liao and Kenin Coloma */ ++ if (fd->atomicity) { ++ j = 0; ++ for (i = 0; i < nprocs; i++) { ++ MPI_Status wkl_status; ++ if (recv_size[i]) { ++ MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, ++ myrank + i + 100 * iter, fd->comm, &wkl_status); ++ j++; ++ } ++ } ++ } ++ ++ for (i = 0; i < nprocs_recv; i++) ++ MPI_Type_free(recv_types + i); ++ ADIOI_Free(recv_types); ++ ++ /* bug fix from Wei-keng Liao and Kenin Coloma */ ++ /* +1 to avoid a 0-size malloc */ ++ if (fd->atomicity) { ++ statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) * ++ sizeof(MPI_Status)); ++ } else { ++ statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) * ++ sizeof(MPI_Status)); ++ } ++ ++#ifdef NEEDS_MPI_TEST ++ i = 0; ++ if (fd->atomicity) { ++ /* bug fix from Wei-keng Liao and Kenin Coloma */ ++ while (!i) ++ MPI_Testall(nprocs_send, send_req, &i, statuses); ++ } else { ++ while (!i) ++ MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses); ++ } ++#else ++ /* bug fix from Wei-keng Liao and Kenin Coloma */ ++ if (fd->atomicity) ++ MPI_Waitall(nprocs_send, send_req, statuses); ++ else ++ MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses); ++#endif ++ ADIOI_Free(statuses); ++ ADIOI_Free(requests); ++ if (!buftype_is_contig && nprocs_send) { ++ for (i = 0; i < nprocs; i++) ++ if (send_size[i]) ++ ADIOI_Free(send_buf[i]); ++ ADIOI_Free(send_buf); ++ } ++} ++ ++#define ADIOI_BUF_INCR \ ++{ \ ++ while (buf_incr) { \ ++ size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \ ++ user_buf_idx += size_in_buf; \ ++ flat_buf_sz -= size_in_buf; \ ++ if (!flat_buf_sz) { \ ++ if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ ++ else { \ ++ flat_buf_idx = 0; \ ++ n_buftypes++; \ ++ } \ ++ user_buf_idx = flat_buf->indices[flat_buf_idx] + \ ++ n_buftypes*buftype_extent; \ ++ flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ ++ } \ ++ buf_incr -= size_in_buf; \ ++ } \ ++} ++ ++ ++#define ADIOI_BUF_COPY \ ++{ \ ++ while (size) { \ ++ size_in_buf = ADIOI_MIN(size, flat_buf_sz); \ ++ memcpy(&(send_buf[p][send_buf_idx[p]]), \ ++ ((char *) buf) + user_buf_idx, size_in_buf); \ ++ send_buf_idx[p] += size_in_buf; \ ++ user_buf_idx += size_in_buf; \ ++ flat_buf_sz -= size_in_buf; \ ++ if (!flat_buf_sz) { \ ++ if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ ++ else { \ ++ flat_buf_idx = 0; \ ++ n_buftypes++; \ ++ } \ ++ user_buf_idx = flat_buf->indices[flat_buf_idx] + \ ++ n_buftypes*buftype_extent; \ ++ flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ ++ } \ ++ size -= size_in_buf; \ ++ buf_incr -= size_in_buf; \ ++ } \ ++ ADIOI_BUF_INCR \ ++} ++ ++static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf, ++ ADIOI_Flatlist_node * flat_buf, ++ char **send_buf, ++ ADIO_Offset * offset_list, ++ int *len_list, int *send_size, ++ MPI_Request * requests, ++ int *sent_to_proc, int nprocs, ++ int myrank, ++ int contig_access_count, ++ int * striping_info, ++ int *send_buf_idx, ++ int *curr_to_proc, ++ int *done_to_proc, int iter, ++ MPI_Aint buftype_extent) ++{ ++ /* this function is only called if buftype is not contig */ ++ int i, p, flat_buf_idx, size; ++ int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes; ++ ADIO_Offset off, len, rem_len, user_buf_idx; ++ ++ /* curr_to_proc[p] = amount of data sent to proc. p that has already ++ * been accounted for so far ++ * done_to_proc[p] = amount of data already sent to proc. p in ++ * previous iterations ++ * user_buf_idx = current location in user buffer ++ * send_buf_idx[p] = current location in send_buf of proc. p ++ */ ++ ++ for (i = 0; i < nprocs; i++) { ++ send_buf_idx[i] = curr_to_proc[i] = 0; ++ done_to_proc[i] = sent_to_proc[i]; ++ } ++ jj = 0; ++ ++ user_buf_idx = flat_buf->indices[0]; ++ flat_buf_idx = 0; ++ n_buftypes = 0; ++ flat_buf_sz = flat_buf->blocklens[0]; ++ ++ /* flat_buf_idx = current index into flattened buftype ++ * flat_buf_sz = size of current contiguous component in flattened buf ++ */ ++ for (i = 0; i < contig_access_count; i++) { ++ off = offset_list[i]; ++ rem_len = (ADIO_Offset) len_list[i]; ++ ++ /*this request may span to more than one process */ ++ while (rem_len != 0) { ++ len = rem_len; ++ /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no ++ * longer than the single region that processor "p" is responsible ++ * for. ++ */ ++ p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info); ++ ++ if (send_buf_idx[p] < send_size[p]) { ++ if (curr_to_proc[p] + len > done_to_proc[p]) { ++ if (done_to_proc[p] > curr_to_proc[p]) { ++ size = (int) ADIOI_MIN(curr_to_proc[p] + len - ++ done_to_proc[p], ++ send_size[p] - ++ send_buf_idx[p]); ++ buf_incr = done_to_proc[p] - curr_to_proc[p]; ++ ADIOI_BUF_INCR ++ buf_incr = (int) (curr_to_proc[p] + len - ++ done_to_proc[p]); ++ curr_to_proc[p] = done_to_proc[p] + size; ++ ADIOI_BUF_COPY ++ } else { ++ size = (int) ADIOI_MIN(len, send_size[p] - ++ send_buf_idx[p]); ++ buf_incr = (int) len; ++ curr_to_proc[p] += size; ++ ADIOI_BUF_COPY ++ } ++ if (send_buf_idx[p] == send_size[p]) { ++ MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, ++ myrank + p + 100 * iter, fd->comm, ++ requests + jj); ++ jj++; ++ } ++ } else { ++ curr_to_proc[p] += (int) len; ++ buf_incr = (int) len; ++ ADIOI_BUF_INCR ++ } ++ } else { ++ buf_incr = (int) len; ++ ADIOI_BUF_INCR ++ } ++ off += len; ++ rem_len -= len; ++ } ++ } ++ for (i = 0; i < nprocs; i++) ++ if (send_size[i]) ++ sent_to_proc[i] = curr_to_proc[i]; ++} +diff -ruN ad_lustre_orig/ad_lustre_wrstr.c ad_lustre/ad_lustre_wrstr.c +--- ad_lustre_orig/ad_lustre_wrstr.c 1970-01-01 08:00:00.000000000 +0800 ++++ ad_lustre/ad_lustre_wrstr.c 2008-10-13 15:34:53.000000000 +0800 +@@ -0,0 +1,472 @@ ++/* -*- Mode: C; c-basic-offset:4 ; -*- */ ++/* ++ * Copyright (C) 1997 University of Chicago. ++ * See COPYRIGHT notice in top-level directory. ++ * ++ * Copyright (C) 2007 Oak Ridge National Laboratory ++ * ++ * Copyright (C) 2008 Sun Microsystems, Lustre group ++ */ ++ ++#include "ad_lustre.h" ++#include "adio_extern.h" ++ ++#define ADIOI_BUFFERED_WRITE \ ++{ \ ++ if (req_off >= writebuf_off + writebuf_len) { \ ++ if (writebuf_len) { \ ++ ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \ ++ ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \ ++ if (!(fd->atomicity)) \ ++ ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \ ++ if (*error_code != MPI_SUCCESS) { \ ++ *error_code = MPIO_Err_create_code(*error_code, \ ++ MPIR_ERR_RECOVERABLE, myname, \ ++ __LINE__, MPI_ERR_IO, \ ++ "**iowswc", 0); \ ++ ADIOI_Free(writebuf); \ ++ return; \ ++ } \ ++ } \ ++ writebuf_off = req_off; \ ++ /* stripe_size alignment */ \ ++ writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \ ++ (writebuf_off / stripe_size + 1) * \ ++ stripe_size - writebuf_off);\ ++ if (!(fd->atomicity)) \ ++ ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \ ++ ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\ ++ writebuf_off, &status1, error_code); \ ++ if (*error_code != MPI_SUCCESS) { \ ++ *error_code = MPIO_Err_create_code(*error_code, \ ++ MPIR_ERR_RECOVERABLE, myname, \ ++ __LINE__, MPI_ERR_IO, \ ++ "**iowsrc", 0); \ ++ ADIOI_Free(writebuf); \ ++ return; \ ++ } \ ++ } \ ++ write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \ ++ memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\ ++ while (write_sz != req_len) {\ ++ ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \ ++ ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \ ++ if (!(fd->atomicity)) \ ++ ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \ ++ if (*error_code != MPI_SUCCESS) { \ ++ *error_code = MPIO_Err_create_code(*error_code, \ ++ MPIR_ERR_RECOVERABLE, myname, \ ++ __LINE__, MPI_ERR_IO, \ ++ "**iowswc", 0); \ ++ ADIOI_Free(writebuf); \ ++ return; \ ++ } \ ++ req_len -= write_sz; \ ++ userbuf_off += write_sz; \ ++ writebuf_off += writebuf_len; \ ++ /* stripe_size alignment */ \ ++ writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \ ++ (writebuf_off / stripe_size + 1) * \ ++ stripe_size - writebuf_off);\ ++ if (!(fd->atomicity)) \ ++ ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \ ++ ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\ ++ writebuf_off, &status1, error_code); \ ++ if (*error_code != MPI_SUCCESS) { \ ++ *error_code = MPIO_Err_create_code(*error_code, \ ++ MPIR_ERR_RECOVERABLE, myname, \ ++ __LINE__, MPI_ERR_IO, \ ++ "**iowsrc", 0); \ ++ ADIOI_Free(writebuf); \ ++ return; \ ++ } \ ++ write_sz = ADIOI_MIN(req_len, writebuf_len); \ ++ memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\ ++ } \ ++} ++ ++ ++/* this macro is used when filetype is contig and buftype is not contig. ++ it does not do a read-modify-write and does not lock*/ ++#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \ ++{ \ ++ if (req_off >= writebuf_off + writebuf_len) { \ ++ writebuf_off = req_off; \ ++ /* stripe_size alignment */ \ ++ writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \ ++ (writebuf_off / stripe_size + 1) * \ ++ stripe_size - writebuf_off);\ ++ } \ ++ write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \ ++ memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\ ++ while (req_len) { \ ++ ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \ ++ ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \ ++ if (*error_code != MPI_SUCCESS) { \ ++ *error_code = MPIO_Err_create_code(*error_code, \ ++ MPIR_ERR_RECOVERABLE, myname, \ ++ __LINE__, MPI_ERR_IO, \ ++ "**iowswc", 0); \ ++ ADIOI_Free(writebuf); \ ++ return; \ ++ } \ ++ req_len -= write_sz; \ ++ userbuf_off += write_sz; \ ++ writebuf_off += writebuf_len; \ ++ /* stripe_size alignment */ \ ++ writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \ ++ (writebuf_off / stripe_size + 1) * \ ++ stripe_size - writebuf_off);\ ++ write_sz = ADIOI_MIN(req_len, writebuf_len); \ ++ memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\ ++ } \ ++} ++ ++void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count, ++ MPI_Datatype datatype, int file_ptr_type, ++ ADIO_Offset offset, ADIO_Status * status, ++ int *error_code) ++{ ++ /* offset is in units of etype relative to the filetype. */ ++ ADIOI_Flatlist_node *flat_buf, *flat_file; ++ int i, j, k, bwr_size, fwr_size = 0, st_index = 0; ++ int bufsize, num, size, sum, n_etypes_in_filetype, size_in_filetype; ++ int n_filetypes, etype_in_filetype; ++ ADIO_Offset abs_off_in_filetype = 0; ++ int filetype_size, etype_size, buftype_size, req_len; ++ MPI_Aint filetype_extent, buftype_extent; ++ int buf_count, buftype_is_contig, filetype_is_contig; ++ ADIO_Offset userbuf_off; ++ ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off; ++ char *writebuf; ++ int flag, st_fwr_size, st_n_filetypes, writebuf_len, write_sz; ++ ADIO_Status status1; ++ int new_bwr_size, new_fwr_size; ++ char * value; ++ int stripe_size, lflag = 0; ++ static char myname[] = "ADIOI_LUSTRE_WriteStrided"; ++ int myrank; ++ MPI_Comm_rank(fd->comm, &myrank); ++ ++ if (fd->hints->ds_write == ADIOI_HINT_DISABLE) { ++ /* if user has disabled data sieving on writes, use naive ++ * approach instead. ++ */ ++ ADIOI_GEN_WriteStrided_naive(fd, ++ buf, ++ count, ++ datatype, ++ file_ptr_type, ++ offset, status, error_code); ++ return; ++ } ++ ++ *error_code = MPI_SUCCESS; /* changed below if error */ ++ ++ ADIOI_Datatype_iscontig(datatype, &buftype_is_contig); ++ ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig); ++ ++ MPI_Type_size(fd->filetype, &filetype_size); ++ if (!filetype_size) { ++ *error_code = MPI_SUCCESS; ++ return; ++ } ++ ++ MPI_Type_extent(fd->filetype, &filetype_extent); ++ MPI_Type_size(datatype, &buftype_size); ++ MPI_Type_extent(datatype, &buftype_extent); ++ etype_size = fd->etype_size; ++ ++ bufsize = buftype_size * count; ++ ++ /* get striping info */ ++ value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); ++ MPI_Info_get(fd->info, "striping_unit", MPI_MAX_INFO_VAL, value, &lflag); ++ if (lflag) ++ stripe_size = atoi(value); ++ ADIOI_Free(value); ++ ++ /* Different buftype to different filetype */ ++ if (!buftype_is_contig && filetype_is_contig) { ++ /* noncontiguous in memory, contiguous in file. */ ++ ADIOI_Flatten_datatype(datatype); ++ flat_buf = ADIOI_Flatlist; ++ while (flat_buf->type != datatype) ++ flat_buf = flat_buf->next; ++ ++ off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : ++ fd->disp + etype_size * offset; ++ ++ start_off = off; ++ end_offset = start_off + bufsize - 1; ++ writebuf_off = start_off; ++ /* write stripe size buffer each time */ ++ writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size)); ++ writebuf_len = (int) ADIOI_MIN(bufsize, ++ (writebuf_off / stripe_size + 1) * ++ stripe_size - writebuf_off); ++ ++ /* if atomicity is true, lock the region to be accessed */ ++ if (fd->atomicity) ++ ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize); ++ ++ for (j = 0; j < count; j++) { ++ for (i = 0; i < flat_buf->count; i++) { ++ userbuf_off = j * buftype_extent + flat_buf->indices[i]; ++ req_off = off; ++ req_len = flat_buf->blocklens[i]; ++ ADIOI_BUFFERED_WRITE_WITHOUT_READ ++ off += flat_buf->blocklens[i]; ++ } ++ } ++ ++ /* write the buffer out finally */ ++ ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, ++ ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, ++ error_code); ++ ++ if (fd->atomicity) ++ ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize); ++ if (*error_code != MPI_SUCCESS) { ++ ADIOI_Free(writebuf); ++ return; ++ } ++ ADIOI_Free(writebuf); ++ if (file_ptr_type == ADIO_INDIVIDUAL) ++ fd->fp_ind = off; ++ } else { ++ /* noncontiguous in file */ ++ /* filetype already flattened in ADIO_Open */ ++ flat_file = ADIOI_Flatlist; ++ while (flat_file->type != fd->filetype) ++ flat_file = flat_file->next; ++ disp = fd->disp; ++ ++ if (file_ptr_type == ADIO_INDIVIDUAL) { ++ offset = fd->fp_ind; /* in bytes */ ++ n_filetypes = -1; ++ flag = 0; ++ while (!flag) { ++ n_filetypes++; ++ for (i = 0; i < flat_file->count; i++) { ++ if (disp + flat_file->indices[i] + ++ (ADIO_Offset) n_filetypes * filetype_extent + ++ flat_file->blocklens[i] >= offset) { ++ st_index = i; ++ fwr_size = (int) (disp + flat_file->indices[i] + ++ (ADIO_Offset) n_filetypes * ++ filetype_extent + ++ flat_file->blocklens[i] - ++ offset); ++ flag = 1; ++ break; ++ } ++ } ++ } ++ } else { ++ n_etypes_in_filetype = filetype_size / etype_size; ++ n_filetypes = (int) (offset / n_etypes_in_filetype); ++ etype_in_filetype = (int) (offset % n_etypes_in_filetype); ++ size_in_filetype = etype_in_filetype * etype_size; ++ ++ sum = 0; ++ for (i = 0; i < flat_file->count; i++) { ++ sum += flat_file->blocklens[i]; ++ if (sum > size_in_filetype) { ++ st_index = i; ++ fwr_size = sum - size_in_filetype; ++ abs_off_in_filetype = flat_file->indices[i] + ++ size_in_filetype - (sum - flat_file->blocklens[i]); ++ break; ++ } ++ } ++ ++ /* abs. offset in bytes in the file */ ++ offset = disp + (ADIO_Offset) n_filetypes *filetype_extent + ++ abs_off_in_filetype; ++ } ++ ++ start_off = offset; ++ ++ /* If the file bytes is actually contiguous, we do not need data sieve at all */ ++ if (bufsize <= fwr_size) { ++ req_off = start_off; ++ req_len = bufsize; ++ end_offset = start_off + bufsize - 1; ++ writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size)); ++ memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size)); ++ writebuf_off = 0; ++ writebuf_len = 0; ++ userbuf_off = 0; ++ ADIOI_BUFFERED_WRITE_WITHOUT_READ ++ } else { ++ /* Calculate end_offset, the last byte-offset that will be accessed. ++ e.g., if start_offset=0 and 100 bytes to be write, end_offset=99 */ ++ st_fwr_size = fwr_size; ++ st_n_filetypes = n_filetypes; ++ i = 0; ++ j = st_index; ++ off = offset; ++ fwr_size = ADIOI_MIN(st_fwr_size, bufsize); ++ while (i < bufsize) { ++ i += fwr_size; ++ end_offset = off + fwr_size - 1; ++ ++ if (j < (flat_file->count - 1)) ++ j++; ++ else { ++ j = 0; ++ n_filetypes++; ++ } ++ ++ off = disp + flat_file->indices[j] + ++ (ADIO_Offset) n_filetypes * filetype_extent; ++ fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i); ++ } ++ ++ writebuf_off = 0; ++ writebuf_len = 0; ++ writebuf = (char *) ADIOI_Malloc(stripe_size); ++ memset(writebuf, -1, stripe_size); ++ /* if atomicity is true, lock the region to be accessed */ ++ if (fd->atomicity) ++ ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize); ++ ++ if (buftype_is_contig && !filetype_is_contig) { ++ /* contiguous in memory, noncontiguous in file. should be the most ++ common case. */ ++ i = 0; ++ j = st_index; ++ off = offset; ++ n_filetypes = st_n_filetypes; ++ fwr_size = ADIOI_MIN(st_fwr_size, bufsize); ++ while (i < bufsize) { ++ if (fwr_size) { ++ /* TYPE_UB and TYPE_LB can result in ++ fwr_size = 0. save system call in such cases */ ++ /* ++ lseek(fd->fd_sys, off, SEEK_SET); ++ err = write(fd->fd_sys, ((char *) buf) + i, fwr_size); ++ */ ++ req_off = off; ++ req_len = fwr_size; ++ userbuf_off = i; ++ ADIOI_BUFFERED_WRITE ++ } ++ i += fwr_size; ++ ++ if (off + fwr_size < disp + flat_file->indices[j] + ++ flat_file->blocklens[j] + ++ (ADIO_Offset) n_filetypes * filetype_extent) ++ off += fwr_size; ++ /* did not reach end of contiguous block in filetype. ++ no more I/O needed. off is incremented by fwr_size. */ ++ else { ++ if (j < (flat_file->count - 1)) ++ j++; ++ else { ++ j = 0; ++ n_filetypes++; ++ } ++ off = disp + flat_file->indices[j] + ++ (ADIO_Offset) n_filetypes * filetype_extent; ++ fwr_size = ADIOI_MIN(flat_file->blocklens[j], ++ bufsize - i); ++ } ++ } ++ } else { ++ /* noncontiguous in memory as well as in file */ ++ ADIOI_Flatten_datatype(datatype); ++ flat_buf = ADIOI_Flatlist; ++ while (flat_buf->type != datatype) ++ flat_buf = flat_buf->next; ++ ++ k = num = buf_count = 0; ++ i = (int) (flat_buf->indices[0]); ++ j = st_index; ++ off = offset; ++ n_filetypes = st_n_filetypes; ++ fwr_size = st_fwr_size; ++ bwr_size = flat_buf->blocklens[0]; ++ ++ while (num < bufsize) { ++ size = ADIOI_MIN(fwr_size, bwr_size); ++ if (size) { ++ /* ++ lseek(fd->fd_sys, off, SEEK_SET); ++ err = write(fd->fd_sys, ((char *) buf) + i, size); ++ */ ++ req_off = off; ++ req_len = size; ++ userbuf_off = i; ++ ADIOI_BUFFERED_WRITE ++ } ++ ++ new_fwr_size = fwr_size; ++ new_bwr_size = bwr_size; ++ ++ if (size == fwr_size) { ++ /* reached end of contiguous block in file */ ++ if (j < (flat_file->count - 1)) { ++ j++; ++ } else { ++ j = 0; ++ n_filetypes++; ++ } ++ off = disp + flat_file->indices[j] + ++ (ADIO_Offset) n_filetypes * filetype_extent; ++ new_fwr_size = flat_file->blocklens[j]; ++ if (size != bwr_size) { ++ i += size; ++ new_bwr_size -= size; ++ } ++ } ++ if (size == bwr_size) { ++ /* reached end of contiguous block in memory */ ++ k = (k + 1) % flat_buf->count; ++ buf_count++; ++ i = (int) (buftype_extent * ++ (buf_count / flat_buf->count) + ++ flat_buf->indices[k]); ++ new_bwr_size = flat_buf->blocklens[k]; ++ if (size != fwr_size) { ++ off += size; ++ new_fwr_size -= size; ++ } ++ } ++ num += size; ++ fwr_size = new_fwr_size; ++ bwr_size = new_bwr_size; ++ } ++ } ++ ++ /* write the buffer out finally */ ++ if (writebuf_len) { ++ ADIO_WriteContig(fd, writebuf, writebuf_len, ++ MPI_BYTE, ADIO_EXPLICIT_OFFSET, ++ writebuf_off, &status1, error_code); ++ if (!(fd->atomicity)) ++ ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); ++ if (*error_code != MPI_SUCCESS) { ++ ADIOI_Free(writebuf); ++ return; ++ } ++ } ++ if (fd->atomicity) ++ ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize); ++ } ++ ADIOI_Free(writebuf); ++ if (file_ptr_type == ADIO_INDIVIDUAL) ++ fd->fp_ind = off; ++ } ++ fd->fp_sys_posn = -1; /* set it to null. */ ++ ++#ifdef HAVE_STATUS_SET_BYTES ++ MPIR_Status_set_bytes(status, datatype, bufsize); ++ /* This is a temporary way of filling in status. The right way is to ++ keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */ ++#endif ++ ++ if (!buftype_is_contig) ++ ADIOI_Delete_flattened(datatype); ++} +diff -ruN ad_lustre_orig/Makefile.in ad_lustre/Makefile.in +--- ad_lustre_orig/Makefile.in 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/Makefile.in 2008-10-17 17:03:06.000000000 +0800 +@@ -16,7 +16,9 @@ + @VPATH@ + + AD_LUSTRE_OBJECTS = ad_lustre.o ad_lustre_open.o \ +- ad_lustre_rwcontig.o ad_lustre_hints.o ++ ad_lustre_rwcontig.o ad_lustre_wrcoll.o ad_lustre_wrstr.o \ ++ ad_lustre_hints.o ad_lustre_aggregate.o ++ + + default: $(LIBNAME) + @if [ "@ENABLE_SHLIB@" != "none" ] ; then \ +diff -ruN ad_lustre_orig/README ad_lustre/README +--- ad_lustre_orig/README 2008-09-17 14:36:57.000000000 +0800 ++++ ad_lustre/README 2008-10-17 16:50:15.000000000 +0800 +@@ -5,6 +5,23 @@ + o To post the code for ParColl (Partitioned collective IO) + + ----------------------------------------------------- ++V05: ++----------------------------------------------------- ++Improved data redistribution ++ o Improve I/O pattern identification. Besides checking interleaving, ++ if request I/O size is small, collective I/O will be performed. ++ The hint big_req_size can be used to define the req size value. ++ o Provide hint CO for load balancing to control the number of ++ IO clients for each OST ++ o Produce stripe-contiguous I/O pattern that Lustre prefers ++ o Reduce the collective overhead by hints contiguous_data and ++ same_io_size to remove unnecessary MPI_Alltoall() ++ o Control read-modify-write in data sieving in collective IO ++ by hint ds_in_coll. ++ o Reduce extent lock conflicts by make each OST accessed by one or ++ more constant clients. ++ ++----------------------------------------------------- + V04: + ----------------------------------------------------- + o Direct IO and Lockless IO support +--- common/ad_write_coll_orig.c 2008-10-15 11:24:31.000000000 +0800 ++++ common/ad_write_coll.c 2008-10-15 11:25:39.000000000 +0800 +@@ -42,7 +42,7 @@ + int *send_buf_idx, int *curr_to_proc, + int *done_to_proc, int iter, + MPI_Aint buftype_extent); +-static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, ++void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, + ADIO_Offset *srt_off, int *srt_len, int *start_pos, + int nprocs, int nprocs_recv, int total_elements); + +@@ -921,7 +921,7 @@ + + + +-static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, ++void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, + ADIO_Offset *srt_off, int *srt_len, int *start_pos, + int nprocs, int nprocs_recv, int total_elements) + { -- 1.8.3.1