From 8ac38988273c2b49a494c5d1c2631728d9f7da93 Mon Sep 17 00:00:00 2001 From: adilger Date: Mon, 25 Oct 2004 06:27:28 +0000 Subject: [PATCH] Branch: b1_4 - reorganization of lov code - fixups for compiling on 2.6 --- lustre/ChangeLog | 6 + lustre/configure.in | 2 +- lustre/include/linux/lustre_net.h | 2 +- lustre/include/linux/lvfs_linux.h | 3 - lustre/include/linux/obd_lov.h | 15 - lustre/include/lustre/lustre_user.h | 9 + .../patches/export-zap-page-range.patch | 12 + lustre/liblustre/rw.c | 2 - lustre/llite/file.c | 10 +- lustre/llite/llite_lib.c | 6 - lustre/lov/Makefile.in | 2 +- lustre/lov/Makefile.mk | 2 +- lustre/lov/autoMakefile.am | 2 +- lustre/lov/lov_internal.h | 169 +- lustre/lov/lov_merge.c | 142 ++ lustre/lov/lov_obd.c | 1814 ++++---------------- lustre/lov/lov_offset.c | 240 +++ lustre/lov/lov_pack.c | 2 +- lustre/lov/lov_qos.c | 186 ++ lustre/lov/lov_request.c | 1293 ++++++++++++++ lustre/ptlrpc/client.c | 2 +- 21 files changed, 2447 insertions(+), 1474 deletions(-) create mode 100644 lustre/kernel_patches/patches/export-zap-page-range.patch create mode 100644 lustre/lov/lov_merge.c create mode 100644 lustre/lov/lov_offset.c create mode 100644 lustre/lov/lov_qos.c create mode 100644 lustre/lov/lov_request.c diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 642f790..7aed4ee 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1,4 +1,10 @@ tbd Cluster File Systems, Inc. + * version 1.3.4 + * bug fixes + * miscellania + - reorganization of lov code + +tbd Cluster File Systems, Inc. 
* version 1.2.8 * bug fixes - allocate qswnal tx descriptors singly to avoid fragmentation (4504) diff --git a/lustre/configure.in b/lustre/configure.in index 5fbcd65..bcf65df 100644 --- a/lustre/configure.in +++ b/lustre/configure.in @@ -5,7 +5,7 @@ AC_INIT AC_CANONICAL_SYSTEM -AM_INIT_AUTOMAKE(lustre, 1.2.7.4) +AM_INIT_AUTOMAKE(lustre, 1.3.4) # AM_MAINTAINER_MODE # Four main targets: lustre kernel modules, utilities, tests, and liblustre diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 3b6617d..7e612eb 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -201,7 +201,7 @@ struct ptlrpc_request_set { wait_queue_head_t *set_wakeup_ptr; struct list_head set_requests; set_interpreter_func set_interpret; /* completion callback */ - union ptlrpc_async_args set_args; /* completion context */ + void *set_arg; /* completion context */ /* locked so that any old caller can communicate requests to * the set holder who can then fold them into the lock-free set */ spinlock_t set_new_req_lock; diff --git a/lustre/include/linux/lvfs_linux.h b/lustre/include/linux/lvfs_linux.h index 71fc431..cf73d47 100644 --- a/lustre/include/linux/lvfs_linux.h +++ b/lustre/include/linux/lvfs_linux.h @@ -8,11 +8,8 @@ #include #include #include -#include #include -/* we have made EXT3_IOC_SETFLAGS a Lustre constant */ -#include #define l_file file #define l_dentry dentry diff --git a/lustre/include/linux/obd_lov.h b/lustre/include/linux/obd_lov.h index cf3ccec..9692a9b 100644 --- a/lustre/include/linux/obd_lov.h +++ b/lustre/include/linux/obd_lov.h @@ -7,21 +7,6 @@ #define OBD_LOV_DEVICENAME "lov" -struct lov_brw_async_args { - struct lov_stripe_md *aa_lsm; - struct obdo *aa_obdos; - struct obdo *aa_oa; - struct brw_page *aa_ioarr; - obd_count aa_oa_bufs; -}; - -struct lov_getattr_async_args { - struct lov_stripe_md *aa_lsm; - struct obdo *aa_oa; - struct obdo *aa_obdos; - struct lov_obd *aa_lov; -}; - static inline int 
lov_stripe_md_size(int stripes) { return sizeof(struct lov_stripe_md) + stripes*sizeof(struct lov_oinfo); diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index 38ea2f2..0286b49 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -34,6 +34,15 @@ /* for statfs() */ #define LL_SUPER_MAGIC 0x0BD00BD0 +#ifndef EXT3_IOC_GETFLAGS +#define EXT3_IOC_GETFLAGS _IOR('f', 1, long) +#define EXT3_IOC_SETFLAGS _IOW('f', 2, long) +#define EXT3_IOC_GETVERSION _IOR('f', 3, long) +#define EXT3_IOC_SETVERSION _IOW('f', 4, long) +#define EXT3_IOC_GETVERSION_OLD _IOR('v', 1, long) +#define EXT3_IOC_SETVERSION_OLD _IOW('v', 2, long) +#endif + #define LL_IOC_GETFLAGS _IOR ('f', 151, long) #define LL_IOC_SETFLAGS _IOW ('f', 152, long) #define LL_IOC_CLRFLAGS _IOW ('f', 153, long) diff --git a/lustre/kernel_patches/patches/export-zap-page-range.patch b/lustre/kernel_patches/patches/export-zap-page-range.patch new file mode 100644 index 0000000..9b9d48f --- /dev/null +++ b/lustre/kernel_patches/patches/export-zap-page-range.patch @@ -0,0 +1,12 @@ +Index: linux-2.4.24-l36mmap/mm/memory.c +=================================================================== +--- linux-2.4.24-l36mmap.orig/mm/memory.c 2004-05-27 17:44:13.000000000 -0700 ++++ linux-2.4.24-l36mmap/mm/memory.c 2004-05-27 17:45:07.000000000 -0700 +@@ -411,6 +411,7 @@ + mm->rss = 0; + spin_unlock(&mm->page_table_lock); + } ++EXPORT_SYMBOL_GPL(zap_page_range); + + /* + * Do a quick page-table lookup for a single page. 
diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index 3104ff0..43e75c5 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -241,8 +241,6 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, policy->l_extent.end == OBD_OBJECT_EOF) lli->lli_st_size = lov_merge_size(lsm, 1); - //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime); - RETURN(rc); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index ef1f0a5..b0e16c3 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -361,7 +361,7 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT) end = ~0; - i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT; + i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0; if (i < end) end = i; @@ -650,7 +650,8 @@ int ll_glimpse_size(struct inode *inode) inode->i_size = lov_merge_size(lli->lli_smd, 0); inode->i_blocks = lov_merge_blocks(lli->lli_smd); - inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime); + LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd, + LTIME_S(inode->i_mtime)); CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n", inode->i_size, inode->i_blocks); @@ -700,8 +701,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, inode->i_size = lov_merge_size(lsm, 1); up(&inode->i_sem); } - - //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime); + if (rc > 0) + LTIME_S(inode->i_mtime) = + lov_merge_mtime(lsm, LTIME_S(inode->i_mtime)); RETURN(rc); } diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index b61b721..57a5a2d 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -150,12 +150,6 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) } sbi->ll_osc_exp = class_conn2export(&osc_conn); - /* need to do a statfs to initialize the per-OSC osfs cache - - * that is used by the OSC IO code to know the blocksize */ - err = 
obd_statfs(obd, &osfs, jiffies - HZ); - if (err) - GOTO(out_mdc, err); - err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); diff --git a/lustre/lov/Makefile.in b/lustre/lov/Makefile.in index 52dba75..aebee3e 100644 --- a/lustre/lov/Makefile.in +++ b/lustre/lov/Makefile.in @@ -1,4 +1,4 @@ MODULES := lov -lov-objs := lov_log.o lov_obd.o lov_pack.o lproc_lov.o +lov-objs := lov_log.o lov_obd.o lov_pack.o lproc_lov.o lov_offset.o lov_merge.o lov_request.o lov_qos.o @INCLUDE_RULES@ diff --git a/lustre/lov/Makefile.mk b/lustre/lov/Makefile.mk index 48c14bb..bdef333 100644 --- a/lustre/lov/Makefile.mk +++ b/lustre/lov/Makefile.mk @@ -6,4 +6,4 @@ include $(src)/../portals/Kernelenv obj-y += lov.o -lov-objs := lov_obd.o lov_pack.o lproc_lov.o lov_log.o +lov-objs := lov_obd.o lov_pack.o lproc_lov.o lov_log.o lov_offset.o lov_merge.o lov_request.o lov_qos.o diff --git a/lustre/lov/autoMakefile.am b/lustre/lov/autoMakefile.am index 2847d56..e6854ec 100644 --- a/lustre/lov/autoMakefile.am +++ b/lustre/lov/autoMakefile.am @@ -5,7 +5,7 @@ if LIBLUSTRE noinst_LIBRARIES = liblov.a -liblov_a_SOURCES = lov_log.c lov_obd.c lov_pack.c lov_internal.h +liblov_a_SOURCES = lov_log.c lov_obd.c lov_pack.c lov_request.c lov_offset.c lov_qos.c lov_merge.c lov_internal.h liblov_a_CPPFLAGS = $(LLCPPFLAGS) liblov_a_CFLAGS = $(LLCFLAGS) endif diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index cefe534..e6ecc5e 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -12,6 +12,44 @@ #include +struct lov_lock_handles { + struct portals_handle llh_handle; + atomic_t llh_refcount; + int llh_stripe_count; + struct lustre_handle llh_handles[0]; +}; + +struct lov_request { + struct list_head rq_link; + struct ldlm_extent rq_extent; + int rq_idx; /* index in lov->tgts array */ + int rq_stripe; /* stripe number */ + int rq_complete; + int rq_rc; + int rq_buflen; /* length of sub_md */ + struct obdo *rq_oa; + struct 
lov_stripe_md *rq_md; + obd_count rq_oabufs; + obd_count rq_pgaidx; +}; + +struct lov_request_set { + atomic_t set_refcount; + struct obd_export *set_exp; + int set_count; + int set_completes; + int set_success; + struct llog_cookie *set_cookies; + int set_cookie_sent; + struct lov_stripe_md *set_md; + struct obdo *set_oa; + struct obd_trans_info *set_oti; + obd_count set_oabufs; + struct brw_page *set_pga; + struct lov_lock_handles *set_lockh; + struct list_head set_list; +}; + #define LAP_MAGIC 8200 struct lov_async_page { @@ -20,10 +58,139 @@ struct lov_async_page { obd_off lap_sub_offset; void *lap_sub_cookie; struct obd_async_page_ops *lap_caller_ops; - struct obd_async_page_ops *lap_caller_data; + void *lap_caller_data; obd_id lap_loi_id; }; +#define LAP_FROM_COOKIE(c) \ + (LASSERT(((struct lov_async_page *)(c))->lap_magic == LAP_MAGIC), \ + (struct lov_async_page *)(c)) + +static inline void lov_llh_addref(void *llhp) +{ + struct lov_lock_handles *llh = llhp; + atomic_inc(&llh->llh_refcount); + CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh, + atomic_read(&llh->llh_refcount)); +} + +static inline struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm) +{ + struct lov_lock_handles *llh; + + OBD_ALLOC(llh, sizeof *llh + + sizeof(*llh->llh_handles) * lsm->lsm_stripe_count); + if (llh == NULL) + return NULL; + atomic_set(&llh->llh_refcount, 2); + llh->llh_stripe_count = lsm->lsm_stripe_count; + INIT_LIST_HEAD(&llh->llh_handle.h_link); + class_handle_hash(&llh->llh_handle, lov_llh_addref); + return llh; +} + +static inline struct lov_lock_handles * +lov_handle2llh(struct lustre_handle *handle) +{ + LASSERT(handle != NULL); + return(class_handle2object(handle->cookie)); +} + +static inline void lov_llh_put(struct lov_lock_handles *llh) +{ + CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh, + atomic_read(&llh->llh_refcount) - 1); + LASSERT(atomic_read(&llh->llh_refcount) > 0 && + atomic_read(&llh->llh_refcount) < 0x5a5a); + if 
(atomic_dec_and_test(&llh->llh_refcount)) { + class_handle_unhash(&llh->llh_handle); + LASSERT(list_empty(&llh->llh_handle.h_link)); + OBD_FREE(llh, sizeof *llh + + sizeof(*llh->llh_handles) * llh->llh_stripe_count); + } +} + +/* lov_merge.c */ +void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid, + struct lov_stripe_md *lsm, int stripeno, int *set); + +/* lov_offset.c */ +obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size, + int stripeno); +int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, + int stripeno, obd_off *obd_off); +obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size, + int stripeno); +int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, + obd_off start, obd_off end, + obd_off *obd_start, obd_off *obd_end); +int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off); + +/* lov_qos.c */ +void qos_shrink_lsm(struct lov_request_set *set); +int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, + int newea); + +/* lov_request.c */ +void lov_set_add_req(struct lov_request *req, struct lov_request_set *set); +int lov_update_common_set(struct lov_request_set *set, + struct lov_request *req, int rc); +int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea, + struct obdo *src_oa, struct obd_trans_info *oti, + struct lov_request_set **reqset); +int lov_update_create_set(struct lov_request_set *set, + struct lov_request *req, int rc); +int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea); +int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, obd_count oa_bufs, + struct brw_page *pga, struct obd_trans_info *oti, + struct lov_request_set **reqset); +int lov_fini_brw_set(struct lov_request_set *set); +int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, + struct lov_request_set **reqset); +int lov_fini_getattr_set(struct 
lov_request_set *set); +int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, + struct obd_trans_info *oti, + struct lov_request_set **reqset); +int lov_update_destroy_set(struct lov_request_set *set, + struct lov_request *req, int rc); +int lov_fini_destroy_set(struct lov_request_set *set); +int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, struct obd_trans_info *oti, + struct lov_request_set **reqset); +int lov_fini_setattr_set(struct lov_request_set *set); +int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, obd_off start, + obd_off end, struct obd_trans_info *oti, + struct lov_request_set **reqset); +int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req, + int rc); +int lov_fini_punch_set(struct lov_request_set *set); +int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, obd_off start, + obd_off end, struct lov_request_set **reqset); +int lov_fini_sync_set(struct lov_request_set *set); +int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, + ldlm_policy_data_t *policy, __u32 mode, + struct lustre_handle *lockh, + struct lov_request_set **reqset); +int lov_update_enqueue_set(struct lov_request_set *set, + struct lov_request *req, int rc, int flags); +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode); +int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, + ldlm_policy_data_t *policy, __u32 mode, + struct lustre_handle *lockh, + struct lov_request_set **reqset); +int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, + int rc); +int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags); +int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm, + __u32 mode, struct lustre_handle *lockh, + struct lov_request_set **reqset); +int 
lov_fini_cancel_set(struct lov_request_set *set); + /* lov_obd.c */ int lov_get_stripecnt(struct lov_obd *lov, int stripe_count); int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count, int pattern); diff --git a/lustre/lov/lov_merge.c b/lustre/lov/lov_merge.c new file mode 100644 index 0000000..e4e40bd --- /dev/null +++ b/lustre/lov/lov_merge.c @@ -0,0 +1,142 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#include +#else +#include +#endif + +#include +#include + +#include "lov_internal.h" + +/* Merge rss if kms == 0 + * + * Even when merging RSS, we will take the KMS value if it's larger. + * This prevents getattr from stomping on dirty cached pages which + * extend the file size. 
*/ +__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms) +{ + struct lov_oinfo *loi; + __u64 size = 0; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++) { + obd_size lov_size, tmpsize; + + tmpsize = loi->loi_kms; + if (kms == 0 && loi->loi_rss > tmpsize) + tmpsize = loi->loi_rss; + + lov_size = lov_stripe_size(lsm, tmpsize, i); + if (lov_size > size) + size = lov_size; + } + + return size; +} +EXPORT_SYMBOL(lov_merge_size); + +/* Merge blocks */ +__u64 lov_merge_blocks(struct lov_stripe_md *lsm) +{ + struct lov_oinfo *loi; + __u64 blocks = 0; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) + blocks += loi->loi_blocks; + return blocks; +} +EXPORT_SYMBOL(lov_merge_blocks); + +__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time) +{ + struct lov_oinfo *loi; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) + if (loi->loi_mtime > current_time) + current_time = loi->loi_mtime; + return current_time; +} +EXPORT_SYMBOL(lov_merge_mtime); + +int lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm, + obd_off size) +{ + struct lov_oinfo *loi; + int stripe = 0; + __u64 kms; + ENTRY; + + if (size > 0) + stripe = lov_stripe_number(lsm, size - 1); + kms = lov_size_to_stripe(lsm, size, stripe); + loi = &(lsm->lsm_oinfo[stripe]); + + CDEBUG(D_INODE, "stripe %d KMS %sincreasing "LPU64"->"LPU64"\n", + stripe, kms > loi->loi_kms ? 
"" : "not ", loi->loi_kms, kms); + if (kms > loi->loi_kms) + loi->loi_kms = kms; + + RETURN(0); +} +EXPORT_SYMBOL(lov_increase_kms); + +void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid, + struct lov_stripe_md *lsm, int stripeno, int *set) +{ + valid &= src->o_valid; + + if (*set) { + if (valid & OBD_MD_FLSIZE) { + /* this handles sparse files properly */ + obd_size lov_size; + + lov_size = lov_stripe_size(lsm, src->o_size, stripeno); + if (lov_size > tgt->o_size) + tgt->o_size = lov_size; + } + if (valid & OBD_MD_FLBLOCKS) + tgt->o_blocks += src->o_blocks; + if (valid & OBD_MD_FLBLKSZ) + tgt->o_blksize += src->o_blksize; + if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime) + tgt->o_ctime = src->o_ctime; + if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime) + tgt->o_mtime = src->o_mtime; + } else { + memcpy(tgt, src, sizeof(*tgt)); + tgt->o_id = lsm->lsm_object_id; + if (valid & OBD_MD_FLSIZE) + tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno); + *set = 1; + } +} diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 977ac2c..5ca8bf3 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -51,68 +51,6 @@ #include "lov_internal.h" -static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, - int stripeno, obd_off *obd_off); - -struct lov_lock_handles { - struct portals_handle llh_handle; - atomic_t llh_refcount; - int llh_stripe_count; - struct lustre_handle llh_handles[0]; -}; - -static void lov_llh_addref(void *llhp) -{ - struct lov_lock_handles *llh = llhp; - - atomic_inc(&llh->llh_refcount); - CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh, - atomic_read(&llh->llh_refcount)); -} - -static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm) -{ - struct lov_lock_handles *llh; - - OBD_ALLOC(llh, sizeof *llh + - sizeof(*llh->llh_handles) * lsm->lsm_stripe_count); - if (llh == NULL) { - CERROR("out of memory\n"); - return NULL; - } - atomic_set(&llh->llh_refcount, 2); - 
llh->llh_stripe_count = lsm->lsm_stripe_count; - INIT_LIST_HEAD(&llh->llh_handle.h_link); - class_handle_hash(&llh->llh_handle, lov_llh_addref); - return llh; -} - -static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle) -{ - ENTRY; - LASSERT(handle != NULL); - RETURN(class_handle2object(handle->cookie)); -} - -static void lov_llh_put(struct lov_lock_handles *llh) -{ - CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh, - atomic_read(&llh->llh_refcount) - 1); - LASSERT(atomic_read(&llh->llh_refcount) > 0 && - atomic_read(&llh->llh_refcount) < 0x5a5a); - if (atomic_dec_and_test(&llh->llh_refcount)) { - LASSERT(list_empty(&llh->llh_handle.h_link)); - OBD_FREE(llh, sizeof *llh + - sizeof(*llh->llh_handles) * llh->llh_stripe_count); - } -} - -static void lov_llh_destroy(struct lov_lock_handles *llh) -{ - class_handle_unhash(&llh->llh_handle); - lov_llh_put(llh); -} - /* obd methods */ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid) @@ -391,6 +329,14 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) RETURN(-EINVAL); } + if (desc->ld_default_stripe_size < PTLRPC_MAX_BRW_SIZE) { + CWARN("Increasing default_stripe_size "LPU64" to %u\n", + desc->ld_default_stripe_size, PTLRPC_MAX_BRW_SIZE); + CWARN("Please update config and run --write-conf on MDS\n"); + + desc->ld_default_stripe_size = PTLRPC_MAX_BRW_SIZE; + } + /* Because of 64-bit divide/mod operations only work with a 32-bit * divisor in a 32-bit kernel, we cannot support a stripe width * of 4GB or larger on 32-bit CPUs. 
@@ -450,60 +396,6 @@ static int lov_cleanup(struct obd_device *obd, int flags) RETURN(0); } - -/* compute object size given "stripeno" and the ost size */ -static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size, - int stripeno) -{ - unsigned long ssize = lsm->lsm_stripe_size; - unsigned long swidth = ssize * lsm->lsm_stripe_count; - unsigned long stripe_size; - obd_size lov_size; - - if (ost_size == 0) - return 0; - - /* do_div(a, b) returns a % b, and a = a / b */ - stripe_size = do_div(ost_size, ssize); - if (stripe_size) - lov_size = ost_size * swidth + stripeno * ssize + stripe_size; - else - lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize; - - return lov_size; -} - -static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid, - struct lov_stripe_md *lsm, int stripeno, int *set) -{ - valid &= src->o_valid; - - if (*set) { - if (valid & OBD_MD_FLSIZE) { - /* this handles sparse files properly */ - obd_size lov_size; - - lov_size = lov_stripe_size(lsm, src->o_size, stripeno); - if (lov_size > tgt->o_size) - tgt->o_size = lov_size; - } - if (valid & OBD_MD_FLBLOCKS) - tgt->o_blocks += src->o_blocks; - if (valid & OBD_MD_FLBLKSZ) - tgt->o_blksize += src->o_blksize; - if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime) - tgt->o_ctime = src->o_ctime; - if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime) - tgt->o_mtime = src->o_mtime; - } else { - memcpy(tgt, src, sizeof(*tgt)); - tgt->o_id = lsm->lsm_object_id; - if (valid & OBD_MD_FLSIZE) - tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno); - *set = 1; - } -} - #ifndef log2 #define log2(n) ffz(~(n)) #endif @@ -565,23 +457,58 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, RETURN(rc); } -#define LOV_CREATE_RESEED_INTERVAL 1000 +static int lov_recreate(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) +{ + struct lov_stripe_md *obj_mdp, *lsm; + struct lov_obd *lov 
= &exp->exp_obd->u.lov; + unsigned ost_idx; + int rc, i; + ENTRY; + + LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS && + src_oa->o_flags & OBD_FL_RECREATE_OBJS); + + OBD_ALLOC(obj_mdp, sizeof(*obj_mdp)); + if (obj_mdp == NULL) + RETURN(-ENOMEM); + + ost_idx = src_oa->o_nlink; + lsm = *ea; + if (lsm == NULL) + GOTO(out, rc = -EINVAL); + if (ost_idx >= lov->desc.ld_tgt_count) + GOTO(out, rc = -EINVAL); + + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) { + if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id) + GOTO(out, rc = -EINVAL); + break; + } + } + if (i == lsm->lsm_stripe_count) + GOTO(out, rc = -EINVAL); + + rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti); +out: + OBD_FREE(obj_mdp, sizeof(*obj_mdp)); + RETURN(rc); +} /* the LOV expects oa->o_id to be set to the LOV object id */ static int lov_create(struct obd_export *exp, struct obdo *src_oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { - static int ost_start_idx, ost_start_count; struct lov_obd *lov; - struct lov_stripe_md *lsm; - struct lov_oinfo *loi = NULL; - struct obdo *tmp_oa, *ret_oa; - struct llog_cookie *cookies = NULL; - unsigned ost_count, ost_idx; - int set = 0, obj_alloc = 0, cookie_sent = 0, rc = 0, i; + struct lov_request_set *set = NULL; + struct list_head *pos; + int rc = 0; ENTRY; LASSERT(ea != NULL); + if (exp == NULL) + RETURN(-EINVAL); if ((src_oa->o_valid & OBD_MD_FLFLAGS) && src_oa->o_flags == OBD_FL_DELORPHAN) { @@ -589,425 +516,142 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, RETURN(rc); } - if (exp == NULL) - RETURN(-EINVAL); - lov = &exp->exp_obd->u.lov; - if (!lov->desc.ld_active_tgt_count) RETURN(-EIO); /* Recreate a specific object id at the given OST index */ - if (src_oa->o_valid & OBD_MD_FLFLAGS && src_oa->o_flags & - OBD_FL_RECREATE_OBJS) { - struct lov_stripe_md obj_md; - struct lov_stripe_md *obj_mdp = &obj_md; - - ost_idx = src_oa->o_nlink; - lsm = *ea; - if (lsm == NULL) - 
RETURN(-EINVAL); - if (ost_idx >= lov->desc.ld_tgt_count) - RETURN(-EINVAL); - for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) { - if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id) - RETURN(-EINVAL); - break; - } - } - if (i == lsm->lsm_stripe_count) - RETURN(-EINVAL); - - rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, - &obj_mdp, oti); + if ((src_oa->o_valid & OBD_MD_FLFLAGS) && + (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) { + rc = lov_recreate(exp, src_oa, ea, oti); RETURN(rc); } - ret_oa = obdo_alloc(); - if (!ret_oa) - RETURN(-ENOMEM); - - tmp_oa = obdo_alloc(); - if (!tmp_oa) - GOTO(out_oa, rc = -ENOMEM); - - lsm = *ea; - if (lsm == NULL) { - int stripes; - ost_count = lov_get_stripecnt(lov, 0); - - /* If the MDS file was truncated up to some size, stripe over - * enough OSTs to allow the file to be created at that size. */ - if (src_oa->o_valid & OBD_MD_FLSIZE) { - stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1; - do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12)); - - if (stripes > lov->desc.ld_active_tgt_count) - RETURN(-EFBIG); - if (stripes < ost_count) - stripes = ost_count; - } else { - stripes = ost_count; - } - - rc = lov_alloc_memmd(&lsm, stripes, lov->desc.ld_pattern ? - lov->desc.ld_pattern : LOV_PATTERN_RAID0); - if (rc < 0) - GOTO(out_tmp, rc); - - rc = 0; - } - - ost_count = lov->desc.ld_tgt_count; - - LASSERT(src_oa->o_valid & OBD_MD_FLID); - lsm->lsm_object_id = src_oa->o_id; - if (!lsm->lsm_stripe_size) - lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size; - if (!lsm->lsm_pattern) { - lsm->lsm_pattern = lov->desc.ld_pattern ? 
- lov->desc.ld_pattern : LOV_PATTERN_RAID0; - } - - if (*ea == NULL || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) { - if (--ost_start_count <= 0) { - ost_start_idx = ll_insecure_random_int(); - ost_start_count = LOV_CREATE_RESEED_INTERVAL; - } else if (lsm->lsm_stripe_count >= - lov->desc.ld_active_tgt_count) { - /* If we allocate from all of the stripes, make the - * next file start on the next OST. */ - ++ost_start_idx; - } - ost_idx = ost_start_idx % ost_count; - } else { - ost_idx = lsm->lsm_oinfo[0].loi_ost_idx; - } - - CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n", - lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx); - - /* XXX LOV STACKING: need to figure out how many real OSCs */ - if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) { - oti_alloc_cookies(oti, lsm->lsm_stripe_count); - if (!oti->oti_logcookies) - GOTO(out_cleanup, rc = -ENOMEM); - cookies = oti->oti_logcookies; - } - - loi = lsm->lsm_oinfo; - for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) { - struct lov_stripe_md obj_md; - struct lov_stripe_md *obj_mdp = &obj_md; - int err; - - ++ost_start_idx; - if (lov->tgts[ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx); - continue; - } - - /* create data objects with "parent" OA */ - memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); - - /* XXX When we start creating objects on demand, we need to - * make sure that we always create the object on the - * stripe which holds the existing file size. 
- */ - if (src_oa->o_valid & OBD_MD_FLSIZE) { - if (lov_stripe_offset(lsm, src_oa->o_size, i, - &tmp_oa->o_size) < 0 && - tmp_oa->o_size) - tmp_oa->o_size--; - - CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", - i, tmp_oa->o_size, src_oa->o_size); - } + rc = lov_prep_create_set(exp, ea, src_oa, oti, &set); + if (rc) + RETURN(rc); + list_for_each (pos, &set->set_list) { + struct lov_request *req = + list_entry(pos, struct lov_request, rq_link); /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ - err = obd_create(lov->tgts[ost_idx].ltd_exp, tmp_oa, &obj_mdp, - oti); - if (err) { - if (lov->tgts[ost_idx].active) { - CERROR("error creating objid "LPX64" sub-object" - " on OST idx %d/%d: rc = %d\n", - src_oa->o_id, ost_idx, - lsm->lsm_stripe_count, err); - if (err > 0) { - CERROR("obd_create returned invalid " - "err %d\n", err); - err = -EIO; - } - } - if (!rc) - rc = err; - continue; - } - if (oti->oti_objid) - oti->oti_objid[ost_idx] = tmp_oa->o_id; - loi->loi_id = tmp_oa->o_id; - loi->loi_ost_idx = ost_idx; - CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n", - lsm->lsm_object_id, loi->loi_id, ost_idx); - - lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, - obj_alloc, &set); - loi_init(loi); - - if (cookies) - ++oti->oti_logcookies; - if (tmp_oa->o_valid & OBD_MD_FLCOOKIE) - ++cookie_sent; - ++obj_alloc; - ++loi; - - /* If we have allocated enough objects, we are OK */ - if (obj_alloc == lsm->lsm_stripe_count) - GOTO(out_done, rc = 0); - } - - if (obj_alloc == 0) { - if (rc == 0) - rc = -EIO; - GOTO(out_cleanup, rc); - } - - /* If we were passed specific striping params, then a failure to - * meet those requirements is an error, since we can't reallocate - * that memory (it might be part of a larger array or something). - * - * We can only get here if lsm_stripe_count was originally > 1. 
- */ - if (*ea != NULL) { - CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n", - lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc); - if (rc == 0) - rc = -EFBIG; - GOTO(out_cleanup, rc); - } else { - struct lov_stripe_md *lsm_new; - /* XXX LOV STACKING call into osc for sizes */ - unsigned oldsize, newsize; - - if (oti && cookies && cookie_sent) { - oldsize = lsm->lsm_stripe_count * sizeof(*cookies); - newsize = obj_alloc * sizeof(*cookies); - - oti_alloc_cookies(oti, obj_alloc); - if (oti->oti_logcookies) { - memcpy(oti->oti_logcookies, cookies, newsize); - OBD_FREE(cookies, oldsize); - cookies = oti->oti_logcookies; - } else { - CWARN("'leaking' %d bytes\n", oldsize-newsize); - } - } - - CWARN("using fewer stripes for object "LPX64": old %u new %u\n", - lsm->lsm_object_id, lsm->lsm_stripe_count, obj_alloc); - oldsize = lov_stripe_md_size(lsm->lsm_stripe_count); - newsize = lov_stripe_md_size(obj_alloc); - OBD_ALLOC(lsm_new, newsize); - if (lsm_new != NULL) { - memcpy(lsm_new, lsm, newsize); - lsm_new->lsm_stripe_count = obj_alloc; - OBD_FREE(lsm, oldsize); - lsm = lsm_new; - } else { - CWARN("'leaking' %d bytes\n", oldsize - newsize); - } - rc = 0; - } - EXIT; - out_done: - *ea = lsm; - if (src_oa->o_valid & OBD_MD_FLSIZE && - ret_oa->o_size != src_oa->o_size) { - CERROR("original size "LPU64" isn't new object size "LPU64"\n", - src_oa->o_size, ret_oa->o_size); - LBUG(); - } - ret_oa->o_id = src_oa->o_id; - memcpy(src_oa, ret_oa, sizeof(*src_oa)); - - out_tmp: - obdo_free(tmp_oa); - out_oa: - obdo_free(ret_oa); - if (oti && cookies) { - oti->oti_logcookies = cookies; - if (!cookie_sent) { - oti_free_cookies(oti); - src_oa->o_valid &= ~OBD_MD_FLCOOKIE; - } else { - src_oa->o_valid |= OBD_MD_FLCOOKIE; - } + rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, + req->rq_oa, &req->rq_md, oti); + lov_update_create_set(set, req, rc); } + rc = lov_fini_create_set(set, ea); RETURN(rc); - - out_cleanup: - while (obj_alloc-- > 0) { - struct obd_export 
*sub_exp; - int err; - - --loi; - sub_exp = lov->tgts[loi->loi_ost_idx].ltd_exp; - /* destroy already created objects here */ - memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); - tmp_oa->o_id = loi->loi_id; - - err = obd_destroy(sub_exp, tmp_oa, NULL, oti); - if (err) - CERROR("Failed to uncreate objid "LPX64" subobj "LPX64 - " on OST idx %d: rc = %d\n", src_oa->o_id, - loi->loi_id, loi->loi_ost_idx, err); - } - if (*ea == NULL) - obd_free_memmd(exp, &lsm); - goto out_tmp; } -#define lsm_bad_magic(LSMP) \ -({ \ - struct lov_stripe_md *_lsm__ = (LSMP); \ - int _ret__ = 0; \ - if (!_lsm__) { \ - CERROR("LOV requires striping ea\n"); \ - _ret__ = 1; \ - } else if (_lsm__->lsm_magic != LOV_MAGIC) { \ - CERROR("LOV striping magic bad %#x != %#x\n", \ - _lsm__->lsm_magic, LOV_MAGIC); \ - _ret__ = 1; \ - } \ - _ret__; \ -}) +#define ASSERT_LSM_MAGIC(lsmp) \ +do { \ + LASSERT((lsmp) != NULL); \ + LASSERTF((lsmp)->lsm_magic == LOV_MAGIC, "%p, %x", \ + (lsmp), (lsmp)->lsm_magic); \ +} while (0) static int lov_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, struct obd_trans_info *oti) { - struct obdo tmp; + struct lov_request_set *set; + struct lov_request *req; + struct list_head *pos; struct lov_obd *lov; - struct lov_oinfo *loi; - int rc = 0, i; + int rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); lov = &exp->exp_obd->u.lov; - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set); + if (rc) + RETURN(rc); + + list_for_each (pos, &set->set_list) { int err; - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - /* Orphan clean up will (someday) fix this up. 
*/ - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - oti->oti_logcookies++; - continue; - } + req = list_entry(pos, struct lov_request, rq_link); - memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = loi->loi_id; - err = obd_destroy(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp, - NULL, oti); - if (err && lov->tgts[loi->loi_ost_idx].active) { + /* XXX update the cookie position */ + oti->oti_logcookies = set->set_cookies + req->rq_stripe; + rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, + NULL, oti); + err = lov_update_common_set(set, req, rc); + if (rc) { CERROR("error: destroying objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - oa->o_id, loi->loi_id, loi->loi_ost_idx, err); + LPX64" on OST idx %d: rc = %d\n", + set->set_oa->o_id, req->rq_oa->o_id, + req->rq_idx, rc); if (!rc) rc = err; } } + lov_fini_destroy_set(set); RETURN(rc); } static int lov_getattr(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm) { - struct obdo tmp; + struct lov_request_set *set; + struct lov_request *req; + struct list_head *pos; struct lov_obd *lov; - struct lov_oinfo *loi; - int i, rc = 0, set = 0; + int err = 0, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); lov = &exp->exp_obd->u.lov; + + rc = lov_prep_getattr_set(exp, oa, lsm, &set); + if (rc) + RETURN(rc); - CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n", - lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size); - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - int err; - - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " - "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx); - /* create data objects with "parent" OA */ - memcpy(&tmp, oa, 
sizeof(tmp)); - tmp.o_id = loi->loi_id; + "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, + req->rq_idx); - err = obd_getattr(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp, - NULL); + rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, + req->rq_oa, NULL); + err = lov_update_common_set(set, req, rc); if (err) { - if (lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: getattr objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - oa->o_id, loi->loi_id, loi->loi_ost_idx, - err); - RETURN(err); - } - } else { - lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set); + CERROR("error: getattr objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + set->set_oa->o_id, req->rq_oa->o_id, + req->rq_idx, err); + break; } } - if (!set) - rc = -EIO; + + rc = lov_fini_getattr_set(set); + if (err) + rc = err; RETURN(rc); } static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc) { - struct lov_getattr_async_args *aa = data; - struct lov_stripe_md *lsm = aa->aa_lsm; - struct obdo *oa = aa->aa_oa; - struct obdo *obdos = aa->aa_obdos; - struct lov_oinfo *loi; - int i; - int set = 0; + struct lov_request_set *lovset = (struct lov_request_set *)data; ENTRY; - if (rc == 0) { - /* NB all stripe requests succeeded to get here */ - - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; - i++, loi++) { - if (obdos[i].o_valid == 0) /* inactive stripe */ - continue; - - lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm, - i, &set); - } - - if (!set) { - CERROR ("No stripes had valid attrs\n"); - rc = -EIO; - } + /* don't do attribute merge if this aysnc op failed */ + if (rc) { + lovset->set_completes = 0; + lov_fini_getattr_set(lovset); + } else { + rc = lov_fini_getattr_set(lovset); } - - OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos)); RETURN (rc); } @@ -1015,84 +659,67 @@ static int lov_getattr_async(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, struct ptlrpc_request_set *rqset) { - struct obdo *obdos; + 
struct lov_request_set *lovset; struct lov_obd *lov; - struct lov_oinfo *loi; - struct lov_getattr_async_args *aa; - int i, rc = 0, set = 0; + struct list_head *pos; + struct lov_request *req; + int rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); lov = &exp->exp_obd->u.lov; - OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos)); - if (obdos == NULL) - RETURN(-ENOMEM); + rc = lov_prep_getattr_set(exp, oa, lsm, &lovset); + if (rc) + RETURN(rc); CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n", lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size); - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - int err; - - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - /* leaves obdos[i].obd_valid unset */ - continue; - } + list_for_each (pos, &lovset->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " - "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx); - /* create data objects with "parent" OA */ - memcpy(&obdos[i], oa, sizeof(obdos[i])); - obdos[i].o_id = loi->loi_id; - - err = obd_getattr_async(lov->tgts[loi->loi_ost_idx].ltd_exp, - &obdos[i], NULL, rqset); - if (err) { + "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, + req->rq_idx); + rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp, + req->rq_oa, NULL, rqset); + if (rc) { CERROR("error: getattr objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", - oa->o_id, loi->loi_id, loi->loi_ost_idx, - err); - GOTO(out_obdos, rc = err); + lovset->set_oa->o_id, req->rq_oa->o_id, + req->rq_idx, rc); + GOTO(out, rc); } - set = 1; + lov_update_common_set(lovset, req, rc); } - if (!set) - GOTO (out_obdos, rc = -EIO); - + + LASSERT(rc == 0); LASSERT (rqset->set_interpret == NULL); rqset->set_interpret = lov_getattr_interpret; - LASSERT (sizeof (rqset->set_args) 
>= sizeof (*aa)); - aa = (struct lov_getattr_async_args *)&rqset->set_args; - aa->aa_lsm = lsm; - aa->aa_oa = oa; - aa->aa_obdos = obdos; - aa->aa_lov = lov; - GOTO(out, rc = 0); - -out_obdos: - OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos)); + rqset->set_arg = (void *)lovset; + RETURN(rc); out: + LASSERT(rc); + lov_fini_getattr_set(lovset); RETURN(rc); } - static int lov_setattr(struct obd_export *exp, struct obdo *src_oa, struct lov_stripe_md *lsm, struct obd_trans_info *oti) { - struct obdo *tmp_oa, *ret_oa; + struct lov_request_set *set; struct lov_obd *lov; - struct lov_oinfo *loi; - int rc = 0, i, set = 0; + struct list_head *pos; + struct lov_request *req; + int err = 0, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -1102,245 +729,30 @@ static int lov_setattr(struct obd_export *exp, struct obdo *src_oa, OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLFLAGS | OBD_MD_FLSIZE))); - ret_oa = obdo_alloc(); - if (!ret_oa) - RETURN(-ENOMEM); - - tmp_oa = obdo_alloc(); - if (!tmp_oa) - GOTO(out_oa, rc = -ENOMEM); - lov = &exp->exp_obd->u.lov; - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - int err; - - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - - memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); - tmp_oa->o_id = loi->loi_id; - - if (src_oa->o_valid & OBD_MD_FLSIZE) { - if (lov_stripe_offset(lsm, src_oa->o_size, i, - &tmp_oa->o_size) < 0 && - tmp_oa->o_size) - tmp_oa->o_size--; - - CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", - i, tmp_oa->o_size, src_oa->o_size); - } - if (src_oa->o_valid & OBD_MD_FLMTIME) - loi->loi_mtime = src_oa->o_mtime; + rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set); + if (rc) + RETURN(rc); - err = obd_setattr(lov->tgts[loi->loi_ost_idx].ltd_exp, tmp_oa, - NULL, NULL); + list_for_each (pos, &set->set_list) { + req = 
list_entry(pos, struct lov_request, rq_link); + + rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, + NULL, NULL); + err = lov_update_common_set(set, req, rc); if (err) { - if (lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: setattr objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - src_oa->o_id, loi->loi_id, - loi->loi_ost_idx, err); - if (!rc) - rc = err; - } - continue; - } - lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set); - } - if (!set && !rc) - rc = -EIO; - - ret_oa->o_id = src_oa->o_id; - memcpy(src_oa, ret_oa, sizeof(*src_oa)); - GOTO(out_tmp, rc); -out_tmp: - obdo_free(tmp_oa); -out_oa: - obdo_free(ret_oa); - return rc; -} - -/* we have an offset in file backed by an lov and want to find out where - * that offset lands in our given stripe of the file. for the easy - * case where the offset is within the stripe, we just have to scale the - * offset down to make it relative to the stripe instead of the lov. - * - * the harder case is what to do when the offset doesn't intersect the - * stripe. callers will want start offsets clamped ahead to the start - * of the nearest stripe in the file. end offsets similarly clamped to the - * nearest ending byte of a stripe in the file: - * - * all this function does is move offsets to the nearest region of the - * stripe, and it does its work "mod" the full length of all the stripes. 
- * consider a file with 3 stripes: - * - * S E - * --------------------------------------------------------------------- - * | 0 | 1 | 2 | 0 | 1 | 2 | - * --------------------------------------------------------------------- - * - * to find stripe 1's offsets for S and E, it divides by the full stripe - * width and does its math in the context of a single set of stripes: - * - * S E - * ----------------------------------- - * | 0 | 1 | 2 | - * ----------------------------------- - * - * it'll notice that E is outside stripe 1 and clamp it to the end of the - * stripe, then multiply it back out by lov_off to give the real offsets in - * the stripe: - * - * S E - * --------------------------------------------------------------------- - * | 1 | 1 | 1 | 1 | 1 | 1 | - * --------------------------------------------------------------------- - * - * it would have done similarly and pulled S forward to the start of a 1 - * stripe if, say, S had landed in a 0 stripe. - * - * this rounding isn't always correct. consider an E lov offset that lands - * on a 0 stripe, the "mod stripe width" math will pull it forward to the - * start of a 1 stripe, when in fact it wanted to be rounded back to the end - * of a previous 1 stripe. this logic is handled by callers and this is why: - * - * this function returns < 0 when the offset was "before" the stripe and - * was moved forward to the start of the stripe in question; 0 when it - * falls in the stripe and no shifting was done; > 0 when the offset - * was outside the stripe and was pulled back to its final byte. 
*/ -static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, - int stripeno, obd_off *obd_off) -{ - unsigned long ssize = lsm->lsm_stripe_size; - unsigned long swidth = ssize * lsm->lsm_stripe_count; - unsigned long stripe_off, this_stripe; - int ret = 0; - - if (lov_off == OBD_OBJECT_EOF) { - *obd_off = OBD_OBJECT_EOF; - return 0; - } - - /* do_div(a, b) returns a % b, and a = a / b */ - stripe_off = do_div(lov_off, swidth); - - this_stripe = stripeno * ssize; - if (stripe_off < this_stripe) { - stripe_off = 0; - ret = -1; - } else { - stripe_off -= this_stripe; - - if (stripe_off >= ssize) { - stripe_off = ssize; - ret = 1; - } - } - - *obd_off = lov_off * ssize + stripe_off; - return ret; -} - -/* Given a whole-file size and a stripe number, give the file size which - * corresponds to the individual object of that stripe. - * - * This behaves basically in the same was as lov_stripe_offset, except that - * file sizes falling before the beginning of a stripe are clamped to the end - * of the previous stripe, not the beginning of the next: - * - * S - * --------------------------------------------------------------------- - * | 0 | 1 | 2 | 0 | 1 | 2 | - * --------------------------------------------------------------------- - * - * if clamped to stripe 2 becomes: - * - * S - * --------------------------------------------------------------------- - * | 0 | 1 | 2 | 0 | 1 | 2 | - * --------------------------------------------------------------------- - */ -static obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size, - int stripeno) -{ - unsigned long ssize = lsm->lsm_stripe_size; - unsigned long swidth = ssize * lsm->lsm_stripe_count; - unsigned long stripe_off, this_stripe; - - if (file_size == OBD_OBJECT_EOF) - return OBD_OBJECT_EOF; - - /* do_div(a, b) returns a % b, and a = a / b */ - stripe_off = do_div(file_size, swidth); - - this_stripe = stripeno * ssize; - if (stripe_off < this_stripe) { - /* Move to end of previous stripe, 
or zero */ - if (file_size > 0) { - file_size--; - stripe_off = ssize; - } else { - stripe_off = 0; - } - } else { - stripe_off -= this_stripe; - - if (stripe_off >= ssize) { - /* Clamp to end of this stripe */ - stripe_off = ssize; + CERROR("error: setattr objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + set->set_oa->o_id, req->rq_oa->o_id, + req->rq_idx, err); + if (!rc) + rc = err; } } - - return (file_size * ssize + stripe_off); -} - -/* given an extent in an lov and a stripe, calculate the extent of the stripe - * that is contained within the lov extent. this returns true if the given - * stripe does intersect with the lov extent. */ -static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, - obd_off start, obd_off end, - obd_off *obd_start, obd_off *obd_end) -{ - int start_side, end_side; - - start_side = lov_stripe_offset(lsm, start, stripeno, obd_start); - end_side = lov_stripe_offset(lsm, end, stripeno, obd_end); - - CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n", - start, end, start_side, *obd_start, *obd_end, end_side); - - /* this stripe doesn't intersect the file extent when neither - * start or the end intersected the stripe and obd_start and - * obd_end got rounded up to the save value. */ - if (start_side != 0 && end_side != 0 && *obd_start == *obd_end) - return 0; - - /* as mentioned in the lov_stripe_offset commentary, end - * might have been shifted in the wrong direction. This - * happens when an end offset is before the stripe when viewed - * through the "mod stripe size" math. we detect it being shifted - * in the wrong direction and touch it up. - * interestingly, this can't underflow since end must be > start - * if we passed through the previous check. - * (should we assert for that somewhere?) 
*/ - if (end_side != 0) - (*obd_end)--; - - return 1; -} - -/* compute which stripe number "lov_off" will be written into */ -static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off) -{ - unsigned long ssize = lsm->lsm_stripe_size; - unsigned long swidth = ssize * lsm->lsm_stripe_count; - unsigned long stripe_off; - - stripe_off = do_div(lov_off, swidth); - - return stripe_off / ssize; + err = lov_fini_setattr_set(set); + if (!rc) + rc = err; + RETURN(rc); } /* FIXME: maybe we'll just make one node the authoritative attribute node, then @@ -1350,101 +762,81 @@ static int lov_punch(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, obd_off start, obd_off end, struct obd_trans_info *oti) { - struct obdo tmp; + struct lov_request_set *set; struct lov_obd *lov; - struct lov_oinfo *loi; - int rc = 0, i; + struct list_head *pos; + struct lov_request *req; + int err = 0, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); lov = &exp->exp_obd->u.lov; - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - obd_off starti, endi; - int err; - - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - - if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi)) - continue; + rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set); + if (rc) + RETURN(rc); - /* create data objects with "parent" OA */ - memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = loi->loi_id; + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); - err = obd_punch(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp, NULL, - starti, endi, NULL); + rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, + NULL, req->rq_extent.start, + req->rq_extent.end, NULL); + err = lov_update_punch_set(set, req, rc); if (err) { - if (lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: 
punch objid "LPX64" subobj "LPX64 - " on OST idx %d: rc = %d\n", oa->o_id, - loi->loi_id, loi->loi_ost_idx, err); - } + CERROR("error: punch objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", set->set_oa->o_id, + req->rq_oa->o_id, req->rq_idx, rc); if (!rc) rc = err; - } else { - loi->loi_kms = loi->loi_rss = starti; } } + err = lov_fini_punch_set(set); + if (!rc) + rc = err; RETURN(rc); } static int lov_sync(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, obd_off start, obd_off end) { - struct obdo *tmp; + struct lov_request_set *set; struct lov_obd *lov; - struct lov_oinfo *loi; - int rc = 0, i; + struct list_head *pos; + struct lov_request *req; + int err = 0, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp->exp_obd) RETURN(-ENODEV); - tmp = obdo_alloc(); - if (!tmp) - RETURN(-ENOMEM); - - lov = &exp->exp_obd->u.lov; - for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - obd_off starti, endi; - int err; - - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - - if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi)) - continue; + lov = &exp->exp_obd->u.lov; + rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set); + if (rc) + RETURN(rc); - memcpy(tmp, oa, sizeof(*tmp)); - tmp->o_id = loi->loi_id; + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); - err = obd_sync(lov->tgts[loi->loi_ost_idx].ltd_exp, tmp, NULL, - starti, endi); + rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, + NULL, req->rq_extent.start, req->rq_extent.end); + err = lov_update_common_set(set, req, rc); if (err) { - if (lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: fsync objid "LPX64" subobj "LPX64 - " on OST idx %d: rc = %d\n", oa->o_id, - loi->loi_id, loi->loi_ost_idx, err); - } + CERROR("error: fsync objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", 
set->set_oa->o_id, + req->rq_oa->o_id, req->rq_idx, rc); if (!rc) rc = err; } } - - obdo_free(tmp); + err = lov_fini_sync_set(set); + if (!rc) + rc = err; RETURN(rc); } @@ -1459,12 +851,11 @@ static int lov_brw_check(struct lov_obd *lov, struct obdo *oa, for (i = 0; i < oa_bufs; i++) { int stripe = lov_stripe_number(lsm, pga[i].off); int ost = lsm->lsm_oinfo[stripe].loi_ost_idx; - struct ldlm_extent ext, subext; - ext.start = pga[i].off; - ext.end = pga[i].off + pga[i].count; + obd_off start, end; - if (!lov_stripe_intersects(lsm, i, ext.start, ext.end, - &subext.start, &subext.end)) + if (!lov_stripe_intersects(lsm, i, pga[i].off, + pga[i].off + pga[i].count, + &start, &end)) continue; if (lov->tgts[ost].active == 0) { @@ -1483,137 +874,57 @@ static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa, struct lov_stripe_md *lsm, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *oti) { - struct { - int bufct; - int index; - int subcount; - struct lov_stripe_md lsm; - int ost_idx; - } *stripeinfo, *si, *si_last; - struct obdo *ret_oa = NULL, *tmp_oa = NULL; - struct lov_obd *lov; - struct brw_page *ioarr; - struct lov_oinfo *loi; - int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count, set = 0; + struct lov_request_set *set; + struct lov_request *req; + struct list_head *pos; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int err, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); - - lov = &exp->exp_obd->u.lov; + ASSERT_LSM_MAGIC(lsm); if (cmd == OBD_BRW_CHECK) { rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga); RETURN(rc); } - OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo)); - if (!stripeinfo) - RETURN(-ENOMEM); - - OBD_ALLOC(where, sizeof(*where) * oa_bufs); - if (!where) - GOTO(out_sinfo, rc = -ENOMEM); - - OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs); - if (!ioarr) - GOTO(out_where, rc = -ENOMEM); - - if (src_oa) { - ret_oa = obdo_alloc(); - if (!ret_oa) - GOTO(out_ioarr, rc = -ENOMEM); - - tmp_oa = 
obdo_alloc(); - if (!tmp_oa) - GOTO(out_oa, rc = -ENOMEM); - } - - for (i = 0; i < oa_bufs; i++) { - where[i] = lov_stripe_number(lsm, pga[i].off); - stripeinfo[where[i]].bufct++; - } - - for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo; - i < stripe_count; i++, loi++, si_last = si, si++) { - if (i > 0) - si->index = si_last->index + si_last->bufct; - si->lsm.lsm_object_id = loi->loi_id; - si->ost_idx = loi->loi_ost_idx; - } - - for (i = 0; i < oa_bufs; i++) { - int which = where[i]; - int shift; - - shift = stripeinfo[which].index + stripeinfo[which].subcount; - LASSERT(shift < oa_bufs); - ioarr[shift] = pga[i]; - lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off); - stripeinfo[which].subcount++; - } - - for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) { - int shift = si->index; - - if (lov->tgts[si->ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx); - GOTO(out_oa, rc = -EIO); - } - - if (si->bufct) { - LASSERT(shift < oa_bufs); - if (src_oa) - memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); - - tmp_oa->o_id = si->lsm.lsm_object_id; - rc = obd_brw(cmd, lov->tgts[si->ost_idx].ltd_exp, - tmp_oa, &si->lsm, si->bufct, - &ioarr[shift], oti); - if (rc) - GOTO(out_oa, rc); + rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set); + if (rc) + RETURN(rc); - lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, - i, &set); - } + list_for_each (pos, &set->set_list) { + struct obd_export *sub_exp; + struct brw_page *sub_pga; + req = list_entry(pos, struct lov_request, rq_link); + + sub_exp = lov->tgts[req->rq_idx].ltd_exp; + sub_pga = set->set_pga + req->rq_pgaidx; + rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, + req->rq_oabufs, sub_pga, oti); + if (rc) + break; + lov_update_common_set(set, req, rc); } - ret_oa->o_id = src_oa->o_id; - memcpy(src_oa, ret_oa, sizeof(*src_oa)); - - GOTO(out_oa, rc); - out_oa: - if (tmp_oa) - obdo_free(tmp_oa); - if (ret_oa) - obdo_free(ret_oa); - out_ioarr: - OBD_FREE(ioarr, 
sizeof(*ioarr) * oa_bufs); - out_where: - OBD_FREE(where, sizeof(*where) * oa_bufs); - out_sinfo: - OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); - return rc; + err = lov_fini_brw_set(set); + if (!rc) + rc = err; + RETURN(rc); } -static int lov_brw_interpret(struct ptlrpc_request_set *set, void *data, int rc) +static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data, + int rc) { - struct lov_brw_async_args *aa = data; - struct lov_stripe_md *lsm = aa->aa_lsm; - struct obdo *obdos = aa->aa_obdos; - struct lov_oinfo *loi; - int i = 0; + struct lov_request_set *lovset = (struct lov_request_set *)data; ENTRY; - - for (loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) { - if (obdos[i].o_valid & OBD_MD_FLBLOCKS) - loi->loi_blocks = obdos[i].o_blocks; - if (obdos[i].o_valid & OBD_MD_FLMTIME) - loi->loi_mtime = obdos[i].o_mtime; + + if (rc) { + lovset->set_completes = 0; + lov_fini_brw_set(lovset); + } else { + rc = lov_fini_brw_set(lovset); } - - OBD_FREE(obdos, lsm->lsm_stripe_count * sizeof(*obdos)); - OBD_FREE(aa->aa_ioarr, sizeof(*aa->aa_ioarr) * aa->aa_oa_bufs); + RETURN(rc); } @@ -1622,154 +933,64 @@ static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa, struct brw_page *pga, struct ptlrpc_request_set *set, struct obd_trans_info *oti) { - struct { - int bufct; - int index; - int subcount; - struct lov_stripe_md lsm; - int ost_idx; - } *stripeinfo, *si, *si_last; - struct lov_obd *lov; - struct brw_page *ioarr; - struct obdo *obdos = NULL; - struct lov_oinfo *loi; - struct lov_brw_async_args *aa; - int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count; + struct lov_request_set *lovset; + struct lov_request *req; + struct list_head *pos; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); - - lov = &exp->exp_obd->u.lov; + ASSERT_LSM_MAGIC(lsm); if (cmd == OBD_BRW_CHECK) { rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga); RETURN(rc); } - 
OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo)); - if (!stripeinfo) - RETURN(-ENOMEM); - - OBD_ALLOC(where, sizeof(*where) * oa_bufs); - if (!where) - GOTO(out_sinfo, rc = -ENOMEM); - - if (oa) { - OBD_ALLOC(obdos, sizeof(*obdos) * stripe_count); - if (!obdos) - GOTO(out_where, rc = -ENOMEM); - } - - OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs); - if (!ioarr) - GOTO(out_obdos, rc = -ENOMEM); - - for (i = 0; i < oa_bufs; i++) { - where[i] = lov_stripe_number(lsm, pga[i].off); - stripeinfo[where[i]].bufct++; - } - - for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo; - i < stripe_count; i++, loi++, si_last = si, si++) { - if (i > 0) - si->index = si_last->index + si_last->bufct; - si->lsm.lsm_object_id = loi->loi_id; - si->ost_idx = loi->loi_ost_idx; - - if (oa) { - memcpy(&obdos[i], oa, sizeof(*obdos)); - obdos[i].o_id = si->lsm.lsm_object_id; - } - } - - for (i = 0; i < oa_bufs; i++) { - int which = where[i]; - int shift; - - shift = stripeinfo[which].index + stripeinfo[which].subcount; - LASSERT(shift < oa_bufs); - ioarr[shift] = pga[i]; - lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off); - stripeinfo[which].subcount++; - } - - for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) { - int shift = si->index; - - if (si->bufct == 0) - continue; - - if (lov->tgts[si->ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx); - GOTO(out_ioarr, rc = -EIO); - } - - LASSERT(shift < oa_bufs); + rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset); + if (rc) + RETURN(rc); - rc = obd_brw_async(cmd, lov->tgts[si->ost_idx].ltd_exp, - &obdos[i], &si->lsm, si->bufct, - &ioarr[shift], set, oti); + list_for_each (pos, &lovset->set_list) { + struct obd_export *sub_exp; + struct brw_page *sub_pga; + req = list_entry(pos, struct lov_request, rq_link); + + sub_exp = lov->tgts[req->rq_idx].ltd_exp; + sub_pga = lovset->set_pga + req->rq_pgaidx; + rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md, + req->rq_oabufs, 
sub_pga, set, oti); if (rc) - GOTO(out_ioarr, rc); + GOTO(out, rc); + lov_update_common_set(lovset, req, rc); } LASSERT(rc == 0); LASSERT(set->set_interpret == NULL); set->set_interpret = (set_interpreter_func)lov_brw_interpret; - LASSERT(sizeof(set->set_args) >= sizeof(struct lov_brw_async_args)); - aa = (struct lov_brw_async_args *)&set->set_args; - aa->aa_lsm = lsm; - aa->aa_obdos = obdos; - aa->aa_oa = oa; - aa->aa_ioarr = ioarr; - aa->aa_oa_bufs = oa_bufs; - - /* Don't free ioarr or obdos - that's done in lov_brw_interpret */ - GOTO(out_where, rc); - - out_ioarr: - OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs); - out_obdos: - OBD_FREE(obdos, stripe_count * sizeof(*obdos)); - out_where: - OBD_FREE(where, sizeof(*where) * oa_bufs); - out_sinfo: - OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); - return rc; + set->set_arg = (void *)lovset; + + RETURN(rc); +out: + lov_fini_brw_set(lovset); + RETURN(rc); } -struct lov_async_page *lap_from_cookie(void *cookie) -{ - struct lov_async_page *lap = cookie; - if (lap->lap_magic != LAP_MAGIC) - return ERR_PTR(-EINVAL); - return lap; -}; - static int lov_ap_make_ready(void *data, int cmd) { - struct lov_async_page *lap = lap_from_cookie(data); - /* XXX should these assert? */ - if (IS_ERR(lap)) - return -EINVAL; + struct lov_async_page *lap = LAP_FROM_COOKIE(data); return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd); } static int lov_ap_refresh_count(void *data, int cmd) { - struct lov_async_page *lap = lap_from_cookie(data); - if (IS_ERR(lap)) - return -EINVAL; + struct lov_async_page *lap = LAP_FROM_COOKIE(data); return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data, cmd); } static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa) { - struct lov_async_page *lap = lap_from_cookie(data); - /* XXX should these assert? 
*/ - if (IS_ERR(lap)) - return; + struct lov_async_page *lap = LAP_FROM_COOKIE(data); lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa); /* XXX woah, shouldn't we be altering more here? size? */ @@ -1778,9 +999,7 @@ static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa) static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc) { - struct lov_async_page *lap = lap_from_cookie(data); - if (IS_ERR(lap)) - return; + struct lov_async_page *lap = LAP_FROM_COOKIE(data); /* in a raid1 regime this would down a count of many ios * in flight, onl calling the caller_ops completion when all @@ -1805,8 +1024,7 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, int rc; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); LASSERT(loi == NULL); OBD_ALLOC(lap, sizeof(*lap)); @@ -1851,12 +1069,9 @@ static int lov_queue_async_io(struct obd_export *exp, LASSERT(loi == NULL); - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); - lap = lap_from_cookie(cookie); - if (IS_ERR(lap)) - RETURN(PTR_ERR(lap)); + lap = LAP_FROM_COOKIE(cookie); loi = &lsm->lsm_oinfo[lap->lap_stripe]; rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, @@ -1876,12 +1091,9 @@ static int lov_set_async_flags(struct obd_export *exp, LASSERT(loi == NULL); - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); - lap = lap_from_cookie(cookie); - if (IS_ERR(lap)) - RETURN(PTR_ERR(lap)); + lap = LAP_FROM_COOKIE(cookie); loi = &lsm->lsm_oinfo[lap->lap_stripe]; rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp, @@ -1902,12 +1114,9 @@ static int lov_queue_group_io(struct obd_export *exp, LASSERT(loi == NULL); - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); - lap = lap_from_cookie(cookie); - if (IS_ERR(lap)) - RETURN(PTR_ERR(lap)); + lap = LAP_FROM_COOKIE(cookie); loi = &lsm->lsm_oinfo[lap->lap_stripe]; rc = 
obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi, @@ -1929,8 +1138,7 @@ static int lov_trigger_group_io(struct obd_export *exp, LASSERT(loi == NULL); - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) { @@ -1952,12 +1160,9 @@ static int lov_teardown_async_page(struct obd_export *exp, LASSERT(loi == NULL); - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); - lap = lap_from_cookie(cookie); - if (IS_ERR(lap)) - RETURN(PTR_ERR(lap)); + lap = LAP_FROM_COOKIE(cookie); loi = &lsm->lsm_oinfo[lap->lap_stripe]; rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp, @@ -1977,18 +1182,16 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, void *data,__u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh) { - struct lov_lock_handles *lov_lockh = NULL; + struct lov_request_set *set; + struct lov_request *req; + struct list_head *pos; struct lustre_handle *lov_lockhp; struct lov_obd *lov; - struct lov_oinfo *loi; - char submd_buf[sizeof(struct lov_stripe_md) + sizeof(struct lov_oinfo)]; - struct lov_stripe_md *submd = (void *)submd_buf; ldlm_error_t rc; - int i, save_flags = *flags, all_skipped = 1; + int save_flags = *flags; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); /* we should never be asked to replay a lock this way. 
*/ LASSERT((*flags & LDLM_FL_REPLAY) == 0); @@ -1996,229 +1199,74 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, if (!exp || !exp->exp_obd) RETURN(-ENODEV); - if (lsm->lsm_stripe_count > 1) { - lov_lockh = lov_llh_new(lsm); - if (lov_lockh == NULL) - RETURN(-ENOMEM); - - lockh->cookie = lov_lockh->llh_handle.h_cookie; - lov_lockhp = lov_lockh->llh_handles; - } else { - lov_lockhp = lockh; - } - lov = &exp->exp_obd->u.lov; - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; - i++, loi++, lov_lockhp++) { - ldlm_policy_data_t sub_ext; - - if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, - policy->l_extent.end, - &sub_ext.l_extent.start, - &sub_ext.l_extent.end)) - continue; + rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set); + if (rc) + RETURN(rc); - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } + list_for_each (pos, &set->set_list) { + ldlm_policy_data_t sub_policy; + req = list_entry(pos, struct lov_request, rq_link); + lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; + LASSERT(lov_lockhp); - all_skipped = 0; - - /* XXX LOV STACKING: submd should be from the subobj */ - submd->lsm_object_id = loi->loi_id; - submd->lsm_stripe_count = 0; - submd->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid; - submd->lsm_oinfo->loi_rss = loi->loi_rss; - submd->lsm_oinfo->loi_kms = loi->loi_kms; - submd->lsm_oinfo->loi_blocks = loi->loi_blocks; - submd->lsm_oinfo->loi_mtime = loi->loi_mtime; - /* XXX submd is not fully initialized here */ *flags = save_flags; - rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, submd, - type, &sub_ext, mode, flags, bl_cb, cp_cb, - gl_cb, data, lvb_len, lvb_swabber, lov_lockhp); - - /* XXX FIXME: This unpleasantness doesn't belong here at *all*. 
- * It belongs in the OSC, except that the OSC doesn't have - * access to the real LOI -- it gets a copy, that we created - * above, and that copy can be arbitrarily out of date. - * - * The LOV API is due for a serious rewriting anyways, and this - * can be addressed then. */ - if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); - __u64 tmp = submd->lsm_oinfo->loi_rss; - - LASSERT(lock != NULL); - loi->loi_rss = tmp; - loi->loi_mtime = submd->lsm_oinfo->loi_mtime; - loi->loi_blocks = submd->lsm_oinfo->loi_blocks; - /* Extend KMS up to the end of this lock and no further - * A lock on [x,y] means a KMS of up to y + 1 bytes! */ - if (tmp > lock->l_policy_data.l_extent.end) - tmp = lock->l_policy_data.l_extent.end + 1; - if (tmp >= loi->loi_kms) { - LDLM_DEBUG(lock, "acquired set stripe %d rss=" - LPU64", kms="LPU64"\n", i, - loi->loi_rss, tmp); - loi->loi_kms = tmp; - loi->loi_kms_valid = 1; - } else { - LDLM_DEBUG(lock, "acquired, set stripe %d rss=" - LPU64"; leaving kms="LPU64", end=" - LPU64"\n", i, loi->loi_rss, - loi->loi_kms, - lock->l_policy_data.l_extent.end); - } - ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); - } else if (rc == ELDLM_LOCK_ABORTED && - save_flags & LDLM_FL_HAS_INTENT) { - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - loi->loi_rss = submd->lsm_oinfo->loi_rss; - loi->loi_mtime = submd->lsm_oinfo->loi_mtime; - loi->loi_blocks = submd->lsm_oinfo->loi_blocks; - CDEBUG(D_DLMTRACE, "glimpsed, set stripe %d rss="LPU64 - "; leaving kms="LPU64"\n", i, loi->loi_rss, - loi->loi_kms); - } else { - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - if (lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: enqueue objid "LPX64" subobj " - LPX64" stripe %d idx %d: rc = %d\n", - lsm->lsm_object_id, loi->loi_id, - i, loi->loi_ost_idx, rc); - GOTO(out_locks, rc); - } - } - } - if (all_skipped) - GOTO(out_lockh, rc = -EIO); - - if (lsm->lsm_stripe_count > 1) - lov_llh_put(lov_lockh); - RETURN(ELDLM_OK); - - out_locks: - while 
(loi--, lov_lockhp--, i-- > 0) { - struct lov_stripe_md submd; - int err; - - if (lov_lockhp->cookie == 0) - continue; - - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - mode, lov_lockhp); - if (err && lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: cancelling objid "LPX64" on OST " - "idx %d after enqueue error: rc = %d\n", - loi->loi_id, loi->loi_ost_idx, err); - } + sub_policy.l_extent.start = req->rq_extent.start; + sub_policy.l_extent.end = req->rq_extent.end; + + rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, + type, &sub_policy, mode, flags, bl_cb, + cp_cb, gl_cb, data, lvb_len, lvb_swabber, + lov_lockhp); + rc = lov_update_enqueue_set(set, req, rc, save_flags); + if (rc != ELDLM_OK) + break; } -out_lockh: - if (lsm->lsm_stripe_count > 1) { - lov_llh_destroy(lov_lockh); - lov_llh_put(lov_lockh); - } - return rc; + lov_fini_enqueue_set(set, mode); + RETURN(rc); } static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { - struct lov_lock_handles *lov_lockh = NULL; + struct lov_request_set *set; + struct lov_request *req; + struct list_head *pos; + struct lov_obd *lov = &exp->exp_obd->u.lov; struct lustre_handle *lov_lockhp; - struct lov_obd *lov; - struct lov_oinfo *loi; - struct lov_stripe_md submd; - ldlm_error_t rc = 0; - int i; + int lov_flags, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); - if (lsm->lsm_stripe_count > 1) { - lov_lockh = lov_llh_new(lsm); - if (lov_lockh == NULL) - RETURN(-ENOMEM); - - lockh->cookie = lov_lockh->llh_handle.h_cookie; - lov_lockhp = lov_lockh->llh_handles; - } else { - lov_lockhp = lockh; - } - lov = &exp->exp_obd->u.lov; - for (i = 0, loi = lsm->lsm_oinfo; i < 
lsm->lsm_stripe_count; - i++, loi++, lov_lockhp++) { - ldlm_policy_data_t sub_ext; - int lov_flags; - - if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, - policy->l_extent.end, - &sub_ext.l_extent.start, - &sub_ext.l_extent.end)) - continue; + rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set); + if (rc) + RETURN(rc); - if (lov->tgts[loi->loi_ost_idx].active == 0) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - rc = -EIO; - break; - } + list_for_each (pos, &set->set_list) { + ldlm_policy_data_t sub_policy; + req = list_entry(pos, struct lov_request, rq_link); + lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; + LASSERT(lov_lockhp); - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; + sub_policy.l_extent.start = req->rq_extent.start; + sub_policy.l_extent.end = req->rq_extent.end; lov_flags = *flags; - /* XXX submd is not fully initialized here */ - rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - type, &sub_ext, mode, &lov_flags, data, + + rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, + type, &sub_policy, mode, &lov_flags, data, lov_lockhp); + rc = lov_update_match_set(set, req, rc); if (rc != 1) break; } - if (rc == 1) { - if (lsm->lsm_stripe_count > 1) { - if (*flags & LDLM_FL_TEST_LOCK) - lov_llh_destroy(lov_lockh); - lov_llh_put(lov_lockh); - } - RETURN(1); - } - - while (loi--, lov_lockhp--, i-- > 0) { - struct lov_stripe_md submd; - int err; - - if (lov_lockhp->cookie == 0) - continue; - - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - mode, lov_lockhp); - if (err && lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: cancelling objid "LPX64" on OST " - "idx %d after match failure: rc = %d\n", - loi->loi_id, loi->loi_ost_idx, err); - } - } - - if (lsm->lsm_stripe_count > 1) 
{ - lov_llh_destroy(lov_lockh); - lov_llh_put(lov_lockh); - } + lov_fini_match_set(set, mode, *flags); RETURN(rc); } @@ -2231,8 +1279,7 @@ static int lov_change_cbdata(struct obd_export *exp, int rc = 0, i; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -2254,66 +1301,43 @@ static int lov_change_cbdata(struct obd_export *exp, static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 mode, struct lustre_handle *lockh) { - struct lov_lock_handles *lov_lockh = NULL; + struct lov_request_set *set; + struct lov_request *req; + struct list_head *pos; + struct lov_obd *lov = &exp->exp_obd->u.lov; struct lustre_handle *lov_lockhp; - struct lov_obd *lov; - struct lov_oinfo *loi; - int rc = 0, i; + int err = 0, rc = 0; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); LASSERT(lockh); - if (lsm->lsm_stripe_count > 1) { - lov_lockh = lov_handle2llh(lockh); - if (!lov_lockh) { - CERROR("LOV: invalid lov lock handle %p\n", lockh); - RETURN(-EINVAL); - } - - lov_lockhp = lov_lockh->llh_handles; - } else { - lov_lockhp = lockh; - } - lov = &exp->exp_obd->u.lov; - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; - i++, loi++, lov_lockhp++) { - struct lov_stripe_md submd; - int err; + rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set); + if (rc) + RETURN(rc); - if (lov_lockhp->cookie == 0) { - CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n", - loi->loi_ost_idx, loi->loi_id); - continue; - } + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; - /* XXX LOV STACKING: submd should be from the subobj */ - submd.lsm_object_id = loi->loi_id; - submd.lsm_stripe_count = 0; - err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, - mode, lov_lockhp); - if (err) { - if 
(lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: cancel objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - lsm->lsm_object_id, - loi->loi_id, loi->loi_ost_idx, err); - if (!rc) - rc = err; - } + rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, + mode, lov_lockhp); + rc = lov_update_common_set(set, req, rc); + if (rc) { + CERROR("error: cancel objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + lsm->lsm_object_id, + req->rq_md->lsm_object_id, req->rq_idx, rc); + err = rc; } + } - - if (lsm->lsm_stripe_count > 1) - lov_llh_destroy(lov_lockh); - if (lov_lockh != NULL) - lov_llh_put(lov_lockh); - RETURN(rc); + lov_fini_cancel_set(set); + RETURN(err); } static int lov_cancel_unused(struct obd_export *exp, @@ -2324,8 +1348,7 @@ static int lov_cancel_unused(struct obd_export *exp, int rc = 0, i; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -2574,8 +1597,8 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen, for (i = 0; i < lov->desc.ld_tgt_count; i++) { if (!lov->tgts[i].active) continue; - rc = obd_get_info(lov->tgts[i].ltd_exp, keylen, key, - &size, &(ids[i])); + rc = obd_get_info(lov->tgts[i].ltd_exp, + keylen, key, &size, &(ids[i])); if (rc != 0) RETURN(rc); } @@ -2595,7 +1618,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, { struct obd_device *obddev = class_exp2obd(exp); struct lov_obd *lov = &obddev->u.lov; - int i, rc = 0; + int i, rc = 0, err; ENTRY; #define KEY_IS(str) \ @@ -2605,22 +1628,18 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, if (vallen != lov->desc.ld_tgt_count) RETURN(-EINVAL); for (i = 0; i < lov->desc.ld_tgt_count; i++) { - int er; - /* initialize all OSCs, even inactive ones */ - er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, - sizeof(obd_id), ((obd_id*)val) + i); + err = obd_set_info(lov->tgts[i].ltd_exp, + keylen, key, sizeof(obd_id), + ((obd_id*)val) + i); if (!rc) - 
rc = er; + rc = err; } RETURN(rc); } - if (KEY_IS("growth_count")) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - } else if (KEY_IS("mds_conn") || KEY_IS("unlinked")) { + if (KEY_IS("mds_conn") || KEY_IS("unlinked")) { if (vallen != 0) RETURN(-EINVAL); } else { @@ -2628,78 +1647,20 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, } for (i = 0; i < lov->desc.ld_tgt_count; i++) { - int er; - if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid)) continue; if (!val && !lov->tgts[i].active) continue; - er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen, - val); + err = obd_set_info(lov->tgts[i].ltd_exp, + keylen, key, vallen, val); if (!rc) - rc = er; + rc = err; } RETURN(rc); #undef KEY_IS - -} - -/* Merge rss if @kms_only == 0 - * - * Even when merging RSS, we will take the KMS value if it's larger. - * This prevents getattr from stomping on dirty cached pages which - * extend the file size. */ -__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms_only) -{ - struct lov_oinfo *loi; - __u64 size = 0; - int i; - - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; - i++, loi++) { - obd_size lov_size, tmpsize; - - tmpsize = loi->loi_kms; - if (kms_only == 0 && loi->loi_rss > tmpsize) - tmpsize = loi->loi_rss; - - lov_size = lov_stripe_size(lsm, tmpsize, i); - if (lov_size > size) - size = lov_size; - } - - return size; -} -EXPORT_SYMBOL(lov_merge_size); - -/* Merge blocks */ -__u64 lov_merge_blocks(struct lov_stripe_md *lsm) -{ - struct lov_oinfo *loi; - __u64 blocks = 0; - int i; - - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){ - blocks += loi->loi_blocks; - } - return blocks; -} -EXPORT_SYMBOL(lov_merge_blocks); - -__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time) -{ - struct lov_oinfo *loi; - int i; - - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){ - if (loi->loi_mtime > current_time) - current_time = loi->loi_mtime; - } - return current_time; } 
-EXPORT_SYMBOL(lov_merge_mtime); #if 0 struct lov_multi_wait { @@ -2720,8 +1681,7 @@ int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm, int rc = 0, i; ENTRY; - if (lsm_bad_magic(lsm)) - RETURN(-EINVAL); + ASSERT_LSM_MAGIC(lsm); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -2792,25 +1752,7 @@ int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm, #endif int lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm, - obd_off size) -{ - struct lov_oinfo *loi; - int stripe = 0; - __u64 kms; - ENTRY; - - if (size > 0) - stripe = lov_stripe_number(lsm, size - 1); - kms = lov_size_to_stripe(lsm, size, stripe); - loi = &(lsm->lsm_oinfo[stripe]); - - CDEBUG(D_INODE, "stripe %d KMS %sincreasing "LPU64"->"LPU64"\n", - stripe, kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms); - if (kms > loi->loi_kms) - loi->loi_kms = kms; - RETURN(0); -} -EXPORT_SYMBOL(lov_increase_kms); + obd_off size); struct obd_ops lov_obd_ops = { .o_owner = THIS_MODULE, diff --git a/lustre/lov/lov_offset.c b/lustre/lov/lov_offset.c new file mode 100644 index 0000000..66fad27 --- /dev/null +++ b/lustre/lov/lov_offset.c @@ -0,0 +1,240 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#include +#else +#include +#endif + +#include +#include + +#include "lov_internal.h" + +/* compute object size given "stripeno" and the ost size */ +obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size, + int stripeno) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_size; + obd_size lov_size; + ENTRY; + + if (ost_size == 0) + RETURN(0); + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_size = do_div(ost_size, ssize); + if (stripe_size) + lov_size = ost_size * swidth + stripeno * ssize + stripe_size; + else + lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize; + + RETURN(lov_size); +} + +/* we have an offset in file backed by an lov and want to find out where + * that offset lands in our given stripe of the file. for the easy + * case where the offset is within the stripe, we just have to scale the + * offset down to make it relative to the stripe instead of the lov. + * + * the harder case is what to do when the offset doesn't intersect the + * stripe. callers will want start offsets clamped ahead to the start + * of the nearest stripe in the file. end offsets similarly clamped to the + * nearest ending byte of a stripe in the file: + * + * all this function does is move offsets to the nearest region of the + * stripe, and it does its work "mod" the full length of all the stripes. 
+ * consider a file with 3 stripes: + * + * S E + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + * + * to find stripe 1's offsets for S and E, it divides by the full stripe + * width and does its math in the context of a single set of stripes: + * + * S E + * ----------------------------------- + * | 0 | 1 | 2 | + * ----------------------------------- + * + * it'll notice that E is outside stripe 1 and clamp it to the end of the + * stripe, then multiply it back out by lov_off to give the real offsets in + * the stripe: + * + * S E + * --------------------------------------------------------------------- + * | 1 | 1 | 1 | 1 | 1 | 1 | + * --------------------------------------------------------------------- + * + * it would have done similarly and pulled S forward to the start of a 1 + * stripe if, say, S had landed in a 0 stripe. + * + * this rounding isn't always correct. consider an E lov offset that lands + * on a 0 stripe, the "mod stripe width" math will pull it forward to the + * start of a 1 stripe, when in fact it wanted to be rounded back to the end + * of a previous 1 stripe. this logic is handled by callers and this is why: + * + * this function returns < 0 when the offset was "before" the stripe and + * was moved forward to the start of the stripe in question; 0 when it + * falls in the stripe and no shifting was done; > 0 when the offset + * was outside the stripe and was pulled back to its final byte. 
*/ +int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, + int stripeno, obd_off *obd_off) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off, this_stripe; + int ret = 0; + + if (lov_off == OBD_OBJECT_EOF) { + *obd_off = OBD_OBJECT_EOF; + return 0; + } + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_off = do_div(lov_off, swidth); + + this_stripe = stripeno * ssize; + if (stripe_off < this_stripe) { + stripe_off = 0; + ret = -1; + } else { + stripe_off -= this_stripe; + + if (stripe_off >= ssize) { + stripe_off = ssize; + ret = 1; + } + } + + *obd_off = lov_off * ssize + stripe_off; + return ret; +} + +/* Given a whole-file size and a stripe number, give the file size which + * corresponds to the individual object of that stripe. + * + * This behaves basically in the same way as lov_stripe_offset, except that + * file sizes falling before the beginning of a stripe are clamped to the end + * of the previous stripe, not the beginning of the next: + * + * S + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + * + * if clamped to stripe 2 becomes: + * + * S + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + */ +obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size, + int stripeno) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off, this_stripe; + + if (file_size == OBD_OBJECT_EOF) + return OBD_OBJECT_EOF; + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_off = do_div(file_size, swidth); + + this_stripe = stripeno * ssize; + if (stripe_off < this_stripe) { + /* Move to end of previous stripe, or zero */ + 
if (file_size > 0) { + file_size--; + stripe_off = ssize; + } else { + stripe_off = 0; + } + } else { + stripe_off -= this_stripe; + + if (stripe_off >= ssize) { + /* Clamp to end of this stripe */ + stripe_off = ssize; + } + } + + return (file_size * ssize + stripe_off); +} + +/* given an extent in an lov and a stripe, calculate the extent of the stripe + * that is contained within the lov extent. this returns true if the given + * stripe does intersect with the lov extent. */ +int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, + obd_off start, obd_off end, + obd_off *obd_start, obd_off *obd_end) +{ + int start_side, end_side; + + start_side = lov_stripe_offset(lsm, start, stripeno, obd_start); + end_side = lov_stripe_offset(lsm, end, stripeno, obd_end); + + CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n", + start, end, start_side, *obd_start, *obd_end, end_side); + + /* this stripe doesn't intersect the file extent when neither + * the start nor the end intersected the stripe and obd_start and + * obd_end got rounded up to the same value. */ + if (start_side != 0 && end_side != 0 && *obd_start == *obd_end) + return 0; + + /* as mentioned in the lov_stripe_offset commentary, end + * might have been shifted in the wrong direction. This + * happens when an end offset is before the stripe when viewed + * through the "mod stripe size" math. we detect it being shifted + * in the wrong direction and touch it up. + * interestingly, this can't underflow since end must be > start + * if we passed through the previous check. + * (should we assert for that somewhere?) 
*/ + if (end_side != 0) + (*obd_end)--; + + return 1; +} + +/* compute which stripe number "lov_off" will be written into */ +int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off; + + stripe_off = do_div(lov_off, swidth); + + return stripe_off / ssize; +} diff --git a/lustre/lov/lov_pack.c b/lustre/lov/lov_pack.c index 419f3ba..164bcec 100644 --- a/lustre/lov/lov_pack.c +++ b/lustre/lov/lov_pack.c @@ -295,7 +295,7 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count, int pattern) (*lsmp)->lsm_magic = LOV_MAGIC; (*lsmp)->lsm_stripe_count = stripe_count; (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES * stripe_count; - (*lsmp)->lsm_xfersize = PTLRPC_MTU * stripe_count; + (*lsmp)->lsm_xfersize = PTLRPC_MAX_BRW_SIZE * stripe_count; (*lsmp)->lsm_pattern = pattern; (*lsmp)->lsm_oinfo[0].loi_ost_idx = ~0; diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c new file mode 100644 index 0000000..0f6f012 --- /dev/null +++ b/lustre/lov/lov_qos.c @@ -0,0 +1,186 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#else +#include +#endif + +#include +#include + +#include "lov_internal.h" + +void qos_shrink_lsm(struct lov_request_set *set) +{ + struct lov_stripe_md *lsm = set->set_md; + struct lov_stripe_md *lsm_new; + /* XXX LOV STACKING call into osc for sizes */ + unsigned oldsize, newsize; + + if (set->set_oti && set->set_cookies && set->set_cookie_sent) { + struct llog_cookie *cookies; + oldsize = lsm->lsm_stripe_count * sizeof(*cookies); + newsize = set->set_count * sizeof(*cookies); + + cookies = set->set_cookies; + oti_alloc_cookies(set->set_oti, set->set_count); + if (set->set_oti->oti_logcookies) { + memcpy(set->set_oti->oti_logcookies, cookies, newsize); + OBD_FREE(cookies, oldsize); + set->set_cookies = set->set_oti->oti_logcookies; + } else { + CWARN("'leaking' %d bytes\n", oldsize - newsize); + } + } + + CWARN("using fewer stripes for object "LPX64": old %u new %u\n", + lsm->lsm_object_id, lsm->lsm_stripe_count, set->set_count); + + oldsize = lov_stripe_md_size(lsm->lsm_stripe_count); + newsize = lov_stripe_md_size(set->set_count); + OBD_ALLOC(lsm_new, newsize); + if (lsm_new != NULL) { + memcpy(lsm_new, lsm, newsize); + lsm_new->lsm_stripe_count = set->set_count; + OBD_FREE(lsm, oldsize); + set->set_md = lsm_new; + } else { + CWARN("'leaking' %d bytes\n", oldsize - newsize); + } +} + +#define LOV_CREATE_RESEED_INTERVAL 1000 +/* FIXME use real qos data to prepare the lov create request */ +int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea) +{ + static int ost_start_idx, ost_start_count; + unsigned ost_idx, ost_count = lov->desc.ld_tgt_count; + struct lov_stripe_md *lsm = set->set_md; + struct obdo *src_oa = set->set_oa; + int i, rc = 0; + ENTRY; + + LASSERT(src_oa->o_valid & OBD_MD_FLID); + + lsm->lsm_object_id = src_oa->o_id; + if (!lsm->lsm_stripe_size) + lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size; + if 
(!lsm->lsm_pattern) { + lsm->lsm_pattern = lov->desc.ld_pattern ? + lov->desc.ld_pattern : LOV_PATTERN_RAID0; + } + + if (newea || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) { + if (--ost_start_count <= 0) { + ost_start_idx = ll_insecure_random_int(); + ost_start_count = LOV_CREATE_RESEED_INTERVAL; + } else if (lsm->lsm_stripe_count >= + lov->desc.ld_active_tgt_count) { + /* If we allocate from all of the stripes, make the + * next file start on the next OST. */ + ++ost_start_idx; + } + ost_idx = ost_start_idx % ost_count; + } else { + ost_idx = lsm->lsm_oinfo[0].loi_ost_idx; + } + + CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n", + lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx); + + for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) { + struct lov_request *req; + + ++ost_start_idx; + if (lov->tgts[ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_buflen = sizeof(*req->rq_md); + OBD_ALLOC(req->rq_md, req->rq_buflen); + if (req->rq_md == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_idx = ost_idx; + req->rq_stripe = i; + /* create data objects with "parent" OA */ + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + + /* XXX When we start creating objects on demand, we need to + * make sure that we always create the object on the + * stripe which holds the existing file size. 
+ */ + if (src_oa->o_valid & OBD_MD_FLSIZE) { + if (lov_stripe_offset(lsm, src_oa->o_size, i, + &req->rq_oa->o_size) < 0 && + req->rq_oa->o_size) + req->rq_oa->o_size--; + + CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", + i, req->rq_oa->o_size, src_oa->o_size); + } + + lov_set_add_req(req, set); + + /* If we have allocated enough objects, we are OK */ + if (set->set_count == lsm->lsm_stripe_count) + GOTO(out, rc = 0); + } + + if (set->set_count == 0) + GOTO(out, rc = -EIO); + + /* If we were passed specific striping params, then a failure to + * meet those requirements is an error, since we can't reallocate + * that memory (it might be part of a larger array or something). + * + * We can only get here if lsm_stripe_count was originally > 1. + */ + if (!newea) { + CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n", + lsm->lsm_object_id, set->set_count, + lsm->lsm_stripe_count, rc); + rc = rc ? rc : -EFBIG; + } else { + qos_shrink_lsm(set); + rc = 0; + } +out: + RETURN(rc); +} + + + diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c new file mode 100644 index 0000000..0f59335 --- /dev/null +++ b/lustre/lov/lov_request.c @@ -0,0 +1,1293 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LOV
+
+/* NOTE(review): the header names of the bare #include directives below were
+ * lost when this patch was extracted; restore them from the upstream tree. */
+#ifdef __KERNEL__
+#include
+#else
+#include
+#endif
+
+#include
+#include
+#include
+
+#include "lov_internal.h"
+
+/* Reset the counters of a freshly allocated set, initialise its request list
+ * and give it its initial reference. */
+static void lov_init_set(struct lov_request_set *set)
+{
+        set->set_count = 0;
+        set->set_completes = 0;
+        set->set_success = 0;
+        INIT_LIST_HEAD(&set->set_list);
+        atomic_set(&set->set_refcount, 1);
+}
+
+/* Free one request together with the obdo and sub-md hanging off it.
+ * Unset rq_oa / rq_md pointers must be NULL; the request does not have to be
+ * linked onto a set, so this is also usable on prep-time error paths. */
+static void lov_free_req(struct lov_request *req)
+{
+        if (req->rq_oa)
+                obdo_free(req->rq_oa);
+        if (req->rq_md)
+                OBD_FREE(req->rq_md, req->rq_buflen);
+        OBD_FREE(req, sizeof(*req));
+}
+
+/* Tear down a set: free every request on set_list, the brw page array, drop
+ * the lock handle reference held through set_lockh, then free the set. */
+static void lov_finish_set(struct lov_request_set *set)
+{
+        struct list_head *pos, *n;
+        ENTRY;
+
+        LASSERT(set);
+        list_for_each_safe(pos, n, &set->set_list) {
+                struct lov_request *req = list_entry(pos, struct lov_request,
+                                                     rq_link);
+                list_del_init(&req->rq_link);
+                lov_free_req(req);
+        }
+
+        if (set->set_pga) {
+                int len = set->set_oabufs * sizeof(*set->set_pga);
+                OBD_FREE(set->set_pga, len);
+        }
+        if (set->set_lockh)
+                lov_llh_put(set->set_lockh);
+
+        OBD_FREE(set, sizeof(*set));
+        EXIT;
+}
+
+/* Record the completion status of one request and update the set counters. */
+static void lov_update_set(struct lov_request_set *set,
+                           struct lov_request *req, int rc)
+{
+        req->rq_complete = 1;
+        req->rq_rc = rc;
+
+        set->set_completes++;
+        if (rc == 0)
+                set->set_success++;
+}
+
+/* Generic completion callback: account the request in the set and return rc,
+ * except that an error from a target marked inactive is turned into 0. */
+int lov_update_common_set(struct lov_request_set *set,
+                          struct lov_request *req, int rc)
+{
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+
+        lov_update_set(set, req, rc);
+
+        /* grace error on inactive ost */
+        if (rc && !lov->tgts[req->rq_idx].active)
+                rc = 0;
+
+        /* FIXME in raid1 regime, should return 0 */
+        RETURN(rc);
+}
+
+/* Link a request at the tail of the set's list and count it. */
+void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
+{
+        list_add_tail(&req->rq_link, &set->set_list);
+        set->set_count++;
+}
+
+/* Completion callback for one per-stripe enqueue.  Copies rss/blocks from the
+ * request's private sub-md back into the real lov_oinfo, extends the KMS on
+ * success, and clears the per-stripe lock handle when no lock was granted.
+ * The (possibly rewritten) rc is recorded in the set and returned. */
+int lov_update_enqueue_set(struct lov_request_set *set,
+                           struct lov_request *req, int rc, int flags)
+{
+        struct lustre_handle *lov_lockhp;
+        struct lov_oinfo *loi;
+        ENTRY;
+
+        lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
+        loi = &set->set_md->lsm_oinfo[req->rq_stripe];
+
+        /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
+         * It belongs in the OSC, except that the OSC doesn't have
+         * access to the real LOI -- it gets a copy, that we created
+         * above, and that copy can be arbitrarily out of date.
+         *
+         * The LOV API is due for a serious rewriting anyways, and this
+         * can be addressed then. */
+        if (rc == ELDLM_OK) {
+                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+                __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
+
+                LASSERT(lock != NULL);
+                loi->loi_rss = tmp;
+                loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
+                /* Extend KMS up to the end of this lock and no further
+                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
+                if (tmp > lock->l_policy_data.l_extent.end)
+                        tmp = lock->l_policy_data.l_extent.end + 1;
+                if (tmp >= loi->loi_kms) {
+                        CDEBUG(D_INODE, "lock acquired, setting rss="
+                               LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
+                        loi->loi_kms = tmp;
+                        loi->loi_kms_valid = 1;
+                } else {
+                        CDEBUG(D_INODE, "lock acquired, setting rss="
+                               LPU64"; leaving kms="LPU64", end="LPU64
+                               "\n", loi->loi_rss, loi->loi_kms,
+                               lock->l_policy_data.l_extent.end);
+                }
+                ldlm_lock_allow_match(lock);
+                LDLM_LOCK_PUT(lock);
+        } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
+                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+                loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
+                loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
+                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
+                       " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
+                rc = ELDLM_OK;
+        } else {
+                struct obd_export *exp = set->set_exp;
+                struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+                if (lov->tgts[req->rq_idx].active) {
+                        CERROR("error: enqueue objid "LPX64" subobj "
+                               LPX64" on OST idx %d: rc = %d\n",
+                               set->set_md->lsm_object_id, loi->loi_id,
+                               loi->loi_ost_idx, rc);
+                } else {
+                        rc = ELDLM_OK;
+                }
+        }
+        lov_update_set(set, req, rc);
+        RETURN(rc);
+}
+
+/* Finish an enqueue/match set that had at least one completion.  If every
+ * completed request succeeded nothing is done; otherwise the locks that were
+ * obtained are cancelled and the lock handle reference is dropped.  Returns
+ * the result of the last obd_cancel() issued (0 if none). */
+static int enqueue_done(struct lov_request_set *set, __u32 mode)
+{
+        struct list_head *pos;
+        struct lov_request *req;
+        struct lustre_handle *lov_lockhp = NULL;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(set->set_completes);
+        /* enqueue/match success, just return */
+        if (set->set_completes == set->set_success)
+                RETURN(0);
+
+        /* cancel enqueued/matched locks */
+        list_for_each (pos, &set->set_list) {
+                req = list_entry(pos, struct lov_request, rq_link);
+
+                if (!req->rq_complete || req->rq_rc)
+                        continue;
+
+                lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
+                LASSERT(lov_lockhp);
+                if (lov_lockhp->cookie == 0)
+                        continue;
+
+                rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
+                                mode, lov_lockhp);
+                if (rc && lov->tgts[req->rq_idx].active)
+                        CERROR("cancelling obdjid "LPX64" on OST "
+                               "idx %d error: rc = %d\n",
+                               req->rq_md->lsm_object_id, req->rq_idx, rc);
+        }
+        lov_llh_put(set->set_lockh);
+        RETURN(rc);
+}
+
+/* Release an enqueue set.  A NULL set is accepted and yields 0. */
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes)
+                rc = enqueue_done(set, mode);
+        else if (set->set_lockh)
+                /* set_lockh is NULL when prep failed in lov_llh_new() */
+                lov_llh_put(set->set_lockh);
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build the set of per-stripe enqueue requests for the extent in \a policy.
+ * Stripes that do not intersect the extent or whose target is inactive are
+ * skipped.  On success *reqset is set and 0 returned; on failure the set is
+ * released and a negative errno returned (-EIO if no request was created). */
+int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
+                         ldlm_policy_data_t *policy, __u32 mode,
+                         struct lustre_handle *lockh,
+                         struct lov_request_set **reqset)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_request_set *set;
+        int i, rc = 0;
+        struct lov_oinfo *loi;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_lockh = lov_llh_new(lsm);
+        if (set->set_lockh == NULL)
+                GOTO(out_set, rc = -ENOMEM);
+        lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+                obd_off start, end;
+
+                if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
+                                           policy->l_extent.end, &start, &end))
+                        continue;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+
+                req->rq_buflen = sizeof(*req->rq_md) +
+                        sizeof(struct lov_oinfo);
+                OBD_ALLOC(req->rq_md, req->rq_buflen);
+                if (req->rq_md == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+
+                req->rq_extent.start = start;
+                req->rq_extent.end = end;
+
+                req->rq_idx = loi->loi_ost_idx;
+                req->rq_stripe = i;
+
+                /* XXX LOV STACKING: submd should be from the subobj */
+                req->rq_md->lsm_object_id = loi->loi_id;
+                req->rq_md->lsm_stripe_count = 0;
+                req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
+                req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
+                req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
+                req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(0);
+out_set:
+        lov_fini_enqueue_set(set, mode);
+        RETURN(rc);
+}
+
+/* Completion callback for one per-stripe match.  rc == 1 is accounted as a
+ * success in the set; the caller still gets the unmodified rc back. */
+int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
+                         int rc)
+{
+        int ret = rc;
+        ENTRY;
+
+        if (rc == 1)
+                ret = 0;
+        lov_update_set(set, req, ret);
+        RETURN(rc);
+}
+
+/* Release a match set.  A NULL set is accepted and yields 0. */
+int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                if (set->set_count == set->set_success &&
+                    flags & LDLM_FL_TEST_LOCK)
+                        lov_llh_put(set->set_lockh);
+                rc = enqueue_done(set, mode);
+        } else if (set->set_lockh) {
+                /* set_lockh is NULL when prep failed in lov_llh_new() */
+                lov_llh_put(set->set_lockh);
+        }
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build the set of per-stripe match requests for the extent in \a policy.
+ * Unlike enqueue, an inactive target on an intersecting stripe fails the
+ * whole operation with -EIO. */
+int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
+                       ldlm_policy_data_t *policy, __u32 mode,
+                       struct lustre_handle *lockh,
+                       struct lov_request_set **reqset)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_request_set *set;
+        int i, rc = 0;
+        struct lov_oinfo *loi;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_lockh = lov_llh_new(lsm);
+        if (set->set_lockh == NULL)
+                GOTO(out_set, rc = -ENOMEM);
+        lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+                obd_off start, end;
+
+                if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
+                                           policy->l_extent.end, &start, &end))
+                        continue;
+
+                /* FIXME raid1 should grace this error */
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        GOTO(out_set, rc = -EIO);
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+
+                req->rq_buflen = sizeof(*req->rq_md);
+                OBD_ALLOC(req->rq_md, req->rq_buflen);
+                if (req->rq_md == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+
+                req->rq_extent.start = start;
+                req->rq_extent.end = end;
+
+                req->rq_idx = loi->loi_ost_idx;
+                req->rq_stripe = i;
+
+                /* XXX LOV STACKING: submd should be from the subobj */
+                req->rq_md->lsm_object_id = loi->loi_id;
+                req->rq_md->lsm_stripe_count = 0;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_match_set(set, mode, 0);
+        RETURN(rc);
+}
+
+/* Release a cancel set.  A NULL set is accepted; always returns 0. */
+int lov_fini_cancel_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+
+        if (set->set_lockh)
+                lov_llh_put(set->set_lockh);
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build one cancel request for every stripe that has a non-zero per-stripe
+ * lock cookie under the LOV lock handle \a lockh. */
+int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
+                        __u32 mode, struct lustre_handle *lockh,
+                        struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        int i, rc = 0;
+        struct lov_oinfo *loi;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_lockh = lov_handle2llh(lockh);
+        if (set->set_lockh == NULL) {
+                CERROR("LOV: invalid lov lock handle %p\n", lockh);
+                GOTO(out_set, rc = -EINVAL);
+        }
+        lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+                struct lustre_handle *lov_lockhp;
+
+                lov_lockhp = set->set_lockh->llh_handles + i;
+                if (lov_lockhp->cookie == 0) {
+                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
+                               loi->loi_ost_idx, loi->loi_id);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+
+                req->rq_buflen = sizeof(*req->rq_md);
+                OBD_ALLOC(req->rq_md, req->rq_buflen);
+                if (req->rq_md == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+
+                req->rq_idx = loi->loi_ost_idx;
+                req->rq_stripe = i;
+
+                /* XXX LOV STACKING: submd should be from the subobj */
+                req->rq_md->lsm_object_id = loi->loi_id;
+                req->rq_md->lsm_stripe_count = 0;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_cancel_set(set);
+        RETURN(rc);
+}
+
+/* Finish a create set that had at least one completion.  On success the
+ * attributes of the created sub-objects are merged into set_oa and *ea is
+ * pointed at the set's md; on failure every sub-object that was created is
+ * destroyed again.  Log cookies in the oti are fixed up in both cases. */
+static int create_done(struct obd_export *exp, struct lov_request_set *set,
+                       struct lov_stripe_md **ea)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct obd_trans_info *oti = set->set_oti;
+        struct obdo *src_oa = set->set_oa;
+        struct list_head *pos;
+        struct lov_request *req;
+        struct obdo *ret_oa = NULL;
+        int attrset = 0, rc = 0;
+        ENTRY;
+
+        LASSERT(set->set_completes);
+
+        if (!set->set_success)
+                GOTO(cleanup, rc = -EIO);
+        if (*ea == NULL && set->set_count != set->set_success) {
+                set->set_count = set->set_success;
+                qos_shrink_lsm(set);
+        }
+
+        ret_oa = obdo_alloc();
+        if (ret_oa == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+
+        list_for_each (pos, &set->set_list) {
+                req = list_entry(pos, struct lov_request, rq_link);
+                if (!req->rq_complete || req->rq_rc)
+                        continue;
+                lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
+                                set->set_md, req->rq_stripe, &attrset);
+        }
+        if (src_oa->o_valid & OBD_MD_FLSIZE &&
+            ret_oa->o_size != src_oa->o_size) {
+                CERROR("original size "LPU64" isn't new object size "LPU64"\n",
+                       src_oa->o_size, ret_oa->o_size);
+                LBUG();
+        }
+        ret_oa->o_id = src_oa->o_id;
+        memcpy(src_oa, ret_oa, sizeof(*src_oa));
+        obdo_free(ret_oa);
+
+        *ea = set->set_md;
+        GOTO(done, rc = 0);
+
+cleanup:
+        list_for_each (pos, &set->set_list) {
+                struct obd_export *sub_exp;
+                int err = 0;
+                req = list_entry(pos, struct lov_request, rq_link);
+
+                if (!req->rq_complete || req->rq_rc)
+                        continue;
+
+                sub_exp = lov->tgts[req->rq_idx].ltd_exp;
+                err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
+                /* report the destroy result, not the create failure in rc */
+                if (err)
+                        CERROR("Failed to uncreate objid "LPX64" subobj "
+                               LPX64" on OST idx %d: rc = %d\n",
+                               set->set_oa->o_id, req->rq_oa->o_id,
+                               req->rq_idx, err);
+        }
+        if (*ea == NULL)
+                obd_free_memmd(exp, &set->set_md);
+done:
+        if (oti && set->set_cookies) {
+                oti->oti_logcookies = set->set_cookies;
+                if (!set->set_cookie_sent) {
+                        oti_free_cookies(oti);
+                        src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
+                } else {
+                        src_oa->o_valid |= OBD_MD_FLCOOKIE;
+                }
+        }
+        RETURN(rc);
+}
+
+/* Release a create set.  A NULL set is accepted and yields 0. */
+int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                rc = create_done(set->set_exp, set, ea);
+                /* FIXME update qos data here */
+        }
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Completion callback for one sub-object create.  Successful creates are
+ * packed into consecutive lsm_oinfo slots (slot index == number of successes
+ * so far); a positive rc from an active target is mapped to -EIO. */
+int lov_update_create_set(struct lov_request_set *set,
+                          struct lov_request *req, int rc)
+{
+        struct obd_trans_info *oti = set->set_oti;
+        struct lov_stripe_md *lsm = set->set_md;
+        struct lov_oinfo *loi;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+
+        req->rq_stripe = set->set_success;
+        loi = &lsm->lsm_oinfo[req->rq_stripe];
+
+        if (rc && lov->tgts[req->rq_idx].active) {
+                CERROR("error creating objid "LPX64" sub-object"
+                       " on OST idx %d/%d: rc = %d\n",
+                       set->set_oa->o_id, req->rq_idx,
+                       lsm->lsm_stripe_count, rc);
+                if (rc > 0) {
+                        CERROR("obd_create returned invalid err %d\n", rc);
+                        rc = -EIO;
+                }
+        }
+        lov_update_set(set, req, rc);
+        if (rc)
+                RETURN(rc);
+
+        if (oti && oti->oti_objid)
+                oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
+
+        loi->loi_id = req->rq_oa->o_id;
+        loi->loi_ost_idx = req->rq_idx;
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        loi_init(loi);
+
+        if (set->set_cookies)
+                ++oti->oti_logcookies;
+        if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookie_sent++;
+
+        RETURN(0);
+}
+
+/* Build a create set.  When *ea is NULL a new striping md is allocated; the
+ * requests themselves are added by qos_prep_create().  On success *reqset is
+ * set and 0 returned; on failure the set is released and a negative errno is
+ * returned. */
+int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
+                        struct obdo *src_oa, struct obd_trans_info *oti,
+                        struct lov_request_set **reqset)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_request_set *set;
+        int rc = 0, newea = 0;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = *ea;
+        set->set_oa = src_oa;
+        set->set_oti = oti;
+
+        if (set->set_md == NULL) {
+                int stripes, stripe_cnt;
+                stripe_cnt = lov_get_stripecnt(lov, 0);
+
+                /* If the MDS file was truncated up to some size, stripe over
+                 * enough OSTs to allow the file to be created at that size. */
+                if (src_oa->o_valid & OBD_MD_FLSIZE) {
+                        stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
+                        do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
+
+                        if (stripes > lov->desc.ld_active_tgt_count)
+                                GOTO(out_set, rc = -EFBIG);
+                        if (stripes < stripe_cnt)
+                                stripes = stripe_cnt;
+                } else {
+                        stripes = stripe_cnt;
+                }
+
+                rc = lov_alloc_memmd(&set->set_md, stripes,
+                                     lov->desc.ld_pattern ?
+                                     lov->desc.ld_pattern : LOV_PATTERN_RAID0);
+                if (rc < 0)
+                        goto out_set;
+                newea = 1;
+        }
+
+        rc = qos_prep_create(lov, set, newea);
+        if (rc)
+                goto out_lsm;
+
+        if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
+                oti_alloc_cookies(oti, set->set_count);
+                /* rc is 0 here: set an error so the caller does not see
+                 * success with *reqset left unassigned */
+                if (!oti->oti_logcookies)
+                        GOTO(out_lsm, rc = -ENOMEM);
+                set->set_cookies = oti->oti_logcookies;
+        }
+        *reqset = set;
+        RETURN(rc);
+
+out_lsm:
+        if (*ea == NULL)
+                obd_free_memmd(exp, &set->set_md);
+out_set:
+        lov_fini_create_set(set, ea);
+        RETURN(rc);
+}
+
+/* Merge the attributes returned by every successful request into set_oa.
+ * Returns 0 when the set has no set_oa, -EIO when no request succeeded or no
+ * stripe delivered valid attributes, -ENOMEM on allocation failure. */
+static int common_attr_done(struct lov_request_set *set)
+{
+        struct list_head *pos;
+        struct lov_request *req;
+        struct obdo *tmp_oa;
+        int rc = 0, attrset = 0;
+        ENTRY;
+
+        if (set->set_oa == NULL)
+                RETURN(0);
+
+        if (!set->set_success)
+                RETURN(-EIO);
+
+        tmp_oa = obdo_alloc();
+        if (tmp_oa == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        list_for_each (pos, &set->set_list) {
+                req = list_entry(pos, struct lov_request, rq_link);
+
+                if (!req->rq_complete || req->rq_rc)
+                        continue;
+                if (req->rq_oa->o_valid == 0)   /* inactive stripe */
+                        continue;
+                lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
+                                set->set_md, req->rq_stripe, &attrset);
+        }
+        if (!attrset) {
+                CERROR("No stripes had valid attrs\n");
+                rc = -EIO;
+        }
+        tmp_oa->o_id = set->set_oa->o_id;
+        memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
+out:
+        if (tmp_oa)
+                obdo_free(tmp_oa);
+        RETURN(rc);
+
+}
+
+/* After a brw, copy the block count reported by each successful request into
+ * the matching lov_oinfo.  Always returns 0. */
+static int brw_done(struct lov_request_set *set)
+{
+        struct lov_stripe_md *lsm = set->set_md;
+        struct lov_oinfo *loi = NULL;
+        struct list_head *pos;
+        struct lov_request *req;
+        ENTRY;
+
+        list_for_each (pos, &set->set_list) {
+                req = list_entry(pos, struct lov_request, rq_link);
+
+                if (!req->rq_complete || req->rq_rc)
+                        continue;
+
+                loi = &lsm->lsm_oinfo[req->rq_stripe];
+
+                if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
+                        loi->loi_blocks = req->rq_oa->o_blocks;
+        }
+
+        RETURN(0);
+}
+
+/* Release a brw set.  A NULL set is accepted and yields 0. */
+int lov_fini_brw_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                rc = brw_done(set);
+                /* FIXME update qos data here */
+        }
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build a brw set: one request per stripe touched by \a pga, plus a copy of
+ * the page array (set_pga) regrouped so that each request's pages are
+ * contiguous and carry stripe-relative offsets.  An inactive target on a
+ * touched stripe fails the whole operation with -EIO. */
+int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
+                     struct lov_stripe_md *lsm, obd_count oa_bufs,
+                     struct brw_page *pga, struct obd_trans_info *oti,
+                     struct lov_request_set **reqset)
+{
+        struct {
+                obd_count       index;
+                obd_count       count;
+                obd_count       off;
+        } *info = NULL;
+        struct lov_request_set *set;
+        struct lov_oinfo *loi = NULL;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int rc = 0, i, shift;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_oa = src_oa;
+        set->set_oti = oti;
+        set->set_oabufs = oa_bufs;
+        OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
+        if (!set->set_pga)
+                GOTO(out, rc = -ENOMEM);
+
+        OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
+        if (!info)
+                GOTO(out, rc = -ENOMEM);
+
+        /* calculate the page count for each stripe */
+        for (i = 0; i < oa_bufs; i++) {
+                int stripe = lov_stripe_number(lsm, pga[i].off);
+                info[stripe].count++;
+        }
+
+        /* alloc and initialize lov request */
+        loi = lsm->lsm_oinfo;
+        shift = 0;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+
+                if (info[i].count == 0)
+                        continue;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        GOTO(out, rc = -EIO);
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out, rc = -ENOMEM);
+
+                req->rq_oa = obdo_alloc();
+                if (req->rq_oa == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out, rc = -ENOMEM);
+                }
+
+                if (src_oa)
+                        memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+                req->rq_oa->o_id = loi->loi_id;
+
+                req->rq_buflen = sizeof(*req->rq_md);
+                OBD_ALLOC(req->rq_md, req->rq_buflen);
+                if (req->rq_md == NULL) {
+                        /* frees rq_oa as well */
+                        lov_free_req(req);
+                        GOTO(out, rc = -ENOMEM);
+                }
+
+                req->rq_idx = loi->loi_ost_idx;
+                req->rq_stripe = i;
+
+                /* XXX LOV STACKING */
+                req->rq_md->lsm_object_id = loi->loi_id;
+                req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
+                req->rq_oabufs = info[i].count;
+                req->rq_pgaidx = shift;
+                shift += req->rq_oabufs;
+
+                /* remember the index for sort brw_page array */
+                info[i].index = req->rq_pgaidx;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out, rc = -EIO);
+
+        /* rotate & sort the brw_page array */
+        for (i = 0; i < oa_bufs; i++) {
+                int stripe = lov_stripe_number(lsm, pga[i].off);
+
+                shift = info[stripe].index + info[stripe].off;
+                LASSERT(shift < oa_bufs);
+                set->set_pga[shift] = pga[i];
+                lov_stripe_offset(lsm, pga[i].off, stripe,
+                                  &set->set_pga[shift].off);
+                info[stripe].off++;
+        }
+out:
+        if (info)
+                OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
+
+        if (rc == 0)
+                *reqset = set;
+        else
+                lov_fini_brw_set(set);
+
+        RETURN(rc);
+}
+
+/* getattr completion is plain attribute merging. */
+static int getattr_done(struct lov_request_set *set)
+{
+        return common_attr_done(set);
+}
+
+/* Release a getattr set.  A NULL set is accepted and yields 0. */
+int lov_fini_getattr_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes)
+                rc = getattr_done(set);
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build one getattr request, carrying a copy of \a src_oa with the
+ * sub-object id filled in, for every stripe on an active target. */
+int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
+                         struct lov_stripe_md *lsm,
+                         struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        struct lov_oinfo *loi = NULL;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int rc = 0, i;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_oa = src_oa;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+
+                req->rq_stripe = i;
+                req->rq_idx = loi->loi_ost_idx;
+
+                req->rq_oa = obdo_alloc();
+                if (req->rq_oa == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+                req->rq_oa->o_id = loi->loi_id;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_getattr_set(set);
+        RETURN(rc);
+}
+
+/* Release a destroy set.  A NULL set is accepted; always returns 0. */
+int lov_fini_destroy_set(struct lov_request_set *set)
+{
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                /* FIXME update qos data here */
+        }
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(0);
+}
+
+/* Build one destroy request for every stripe on an active target.  When the
+ * caller passed log cookies, oti_logcookies is advanced to the cookie slot of
+ * the first stripe that actually gets a request. */
+int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
+                         struct lov_stripe_md *lsm,
+                         struct obd_trans_info *oti,
+                         struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        struct lov_oinfo *loi = NULL;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int rc = 0, cookie_set = 0, i;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_oa = src_oa;
+        set->set_oti = oti;
+        if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookies = oti->oti_logcookies;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+
+                req->rq_stripe = i;
+                req->rq_idx = loi->loi_ost_idx;
+
+                req->rq_oa = obdo_alloc();
+                if (req->rq_oa == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+                req->rq_oa->o_id = loi->loi_id;
+
+                /* Setup the first request's cookie position */
+                if (!cookie_set && set->set_cookies) {
+                        oti->oti_logcookies = set->set_cookies + i;
+                        cookie_set = 1;
+                }
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_destroy_set(set);
+        RETURN(rc);
+}
+
+/* setattr completion is plain attribute merging. */
+static int setattr_done(struct lov_request_set *set)
+{
+        return common_attr_done(set);
+}
+
+/* Release a setattr set.  A NULL set is accepted and yields 0. */
+int lov_fini_setattr_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                rc = setattr_done(set);
+                /* FIXME update qos data here */
+        }
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+        RETURN(rc);
+}
+
+/* Build one setattr request for every stripe on an active target.  When a
+ * size is being set, the file size is translated into the per-stripe object
+ * size via lov_stripe_offset(). */
+int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
+                         struct lov_stripe_md *lsm, struct obd_trans_info *oti,
+                         struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        struct lov_oinfo *loi = NULL;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int rc = 0, i;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_oa = src_oa;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+                req->rq_stripe = i;
+                req->rq_idx = loi->loi_ost_idx;
+
+                req->rq_oa = obdo_alloc();
+                if (req->rq_oa == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+                req->rq_oa->o_id = loi->loi_id;
+
+                if (src_oa->o_valid & OBD_MD_FLSIZE) {
+                        if (lov_stripe_offset(lsm, src_oa->o_size, i,
+                                              &req->rq_oa->o_size) < 0 &&
+                            req->rq_oa->o_size)
+                                req->rq_oa->o_size--;
+                        CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
+                               i, req->rq_oa->o_size, src_oa->o_size);
+                }
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_setattr_set(set);
+        RETURN(rc);
+}
+
+/* Completion callback for one per-stripe punch.  On success the stripe's kms
+ * and rss are both set to the start of the punched extent; an error from an
+ * inactive target is turned into 0. */
+int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
+                         int rc)
+{
+        struct lov_stripe_md *lsm = set->set_md;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+
+        lov_update_set(set, req, rc);
+        if (rc == 0) {
+                struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe];
+                loi->loi_kms = loi->loi_rss = req->rq_extent.start;
+        }
+        if (rc && !lov->tgts[req->rq_idx].active)
+                rc = 0;
+        /* FIXME in raid1 regime, should return 0 */
+        RETURN(rc);
+}
+
+/* Release a punch set; -EIO if requests completed but none succeeded.
+ * A NULL set is accepted and yields 0. */
+int lov_fini_punch_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                if (!set->set_success)
+                        rc = -EIO;
+                /* FIXME update qos data here */
+        }
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build one punch request for every stripe that is on an active target and
+ * intersects [start, end]; the stripe-relative extent is stored in the
+ * request. */
+int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
+                       struct lov_stripe_md *lsm, obd_off start,
+                       obd_off end, struct obd_trans_info *oti,
+                       struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        struct lov_oinfo *loi = NULL;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int rc = 0, i;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_oa = src_oa;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+                obd_off rs, re;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                        continue;
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+                req->rq_stripe = i;
+                req->rq_idx = loi->loi_ost_idx;
+
+                req->rq_oa = obdo_alloc();
+                if (req->rq_oa == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+                req->rq_oa->o_id = loi->loi_id;
+
+                req->rq_extent.start = rs;
+                req->rq_extent.end = re;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_punch_set(set);
+        RETURN(rc);
+}
+
+/* Release a sync set; -EIO if requests completed but none succeeded.
+ * A NULL set is accepted and yields 0. */
+int lov_fini_sync_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        /* test for NULL before any member of the set is touched */
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (set->set_completes) {
+                if (!set->set_success)
+                        rc = -EIO;
+                /* FIXME update qos data here */
+        }
+
+        if (atomic_dec_and_test(&set->set_refcount))
+                lov_finish_set(set);
+
+        RETURN(rc);
+}
+
+/* Build one sync request for every stripe that is on an active target and
+ * intersects [start, end]; the stripe-relative extent is stored in the
+ * request. */
+int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
+                      struct lov_stripe_md *lsm, obd_off start,
+                      obd_off end, struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        struct lov_oinfo *loi = NULL;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int rc = 0, i;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_exp = exp;
+        set->set_md = lsm;
+        set->set_oa = src_oa;
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                struct lov_request *req;
+                obd_off rs, re;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                        continue;
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+                req->rq_stripe = i;
+                req->rq_idx = loi->loi_ost_idx;
+
+                req->rq_oa = obdo_alloc();
+                if (req->rq_oa == NULL) {
+                        /* not on set_list yet, so free it here */
+                        lov_free_req(req);
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+                req->rq_oa->o_id = loi->loi_id;
+
+                req->rq_extent.start = rs;
+                req->rq_extent.end = re;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_sync_set(set);
+        RETURN(rc);
+}
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index bcaa30a..8321e73 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -1005,7 +1005,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         if (set->set_interpret != NULL) {
                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                         set->set_interpret;
-                rc = interpreter (set, &set->set_args, rc);
+                rc = interpreter (set, set->set_arg, rc);
         }
 
         RETURN(rc);
-- 
1.8.3.1