From e810e2a3c3ed08feb08f323f60a7262242d8a1da Mon Sep 17 00:00:00 2001 From: adilger Date: Tue, 7 Sep 2004 18:17:11 +0000 Subject: [PATCH] Merge b1_4_smallfix from b1_4 (20040903_1451) - added kernel patch for /dev/sd I/O stats (4385) - lovcleanup - 2.6 direct read fixes, memory shortage fixes --- lustre/lov/lov_merge.c | 145 ++++++ lustre/lov/lov_offset.c | 240 +++++++++ lustre/lov/lov_qos.c | 186 +++++++ lustre/lov/lov_request.c | 1276 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1847 insertions(+) create mode 100644 lustre/lov/lov_merge.c create mode 100644 lustre/lov/lov_offset.c create mode 100644 lustre/lov/lov_qos.c create mode 100644 lustre/lov/lov_request.c diff --git a/lustre/lov/lov_merge.c b/lustre/lov/lov_merge.c new file mode 100644 index 0000000..3da1e10 --- /dev/null +++ b/lustre/lov/lov_merge.c @@ -0,0 +1,145 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#include +#else +#include +#endif + +#include +#include + +#include "lov_internal.h" + +/* Merge rss if kms == 0 + * + * Even when merging RSS, we will take the KMS value if it's larger. + * This prevents getattr from stomping on dirty cached pages which + * extend the file size. */ +__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms) +{ + struct lov_oinfo *loi; + __u64 size = 0; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++) { + obd_size lov_size, tmpsize; + + tmpsize = loi->loi_kms; + if (kms == 0 && loi->loi_rss > tmpsize) + tmpsize = loi->loi_rss; + + lov_size = lov_stripe_size(lsm, tmpsize, i); + if (lov_size > size) + size = lov_size; + } + + return size; +} +EXPORT_SYMBOL(lov_merge_size); + +/* Merge blocks */ +__u64 lov_merge_blocks(struct lov_stripe_md *lsm) +{ + struct lov_oinfo *loi; + __u64 blocks = 0; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++) { + blocks += loi->loi_blocks; + } + return blocks; +} +EXPORT_SYMBOL(lov_merge_blocks); + +__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time) +{ + struct lov_oinfo *loi; + int i; + + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + i++, loi++) { + if (loi->loi_mtime > current_time) + current_time = loi->loi_mtime; + } + return current_time; +} +EXPORT_SYMBOL(lov_merge_mtime); + +void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm, + obd_off size) +{ + struct lov_oinfo *loi; + int stripe = 0; + __u64 kms; + ENTRY; + + if (size > 0) + stripe = lov_stripe_number(lsm, size - 1); + kms = lov_size_to_stripe(lsm, size, stripe); + loi = &(lsm->lsm_oinfo[stripe]); + + CDEBUG(D_INODE, "stripe %d KMS %sincreasing "LPU64"->"LPU64"\n", + stripe, kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms); + if (kms > loi->loi_kms) + loi->loi_kms = kms; + EXIT; +} +EXPORT_SYMBOL(lov_increase_kms); + +void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid, + struct lov_stripe_md *lsm, int stripeno, int *set) +{ + valid &= src->o_valid; + + if (*set) { + if (valid & OBD_MD_FLSIZE) { + /* this handles sparse files properly */ + obd_size lov_size; + + lov_size = lov_stripe_size(lsm, src->o_size, stripeno); + if (lov_size > tgt->o_size) + tgt->o_size = lov_size; + } + if (valid & OBD_MD_FLBLOCKS) + tgt->o_blocks += src->o_blocks; + if (valid & OBD_MD_FLBLKSZ) + tgt->o_blksize += src->o_blksize; + if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime) + tgt->o_ctime = src->o_ctime; + if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime) + tgt->o_mtime = src->o_mtime; + } else { + memcpy(tgt, src, sizeof(*tgt)); + tgt->o_id = lsm->lsm_object_id; + if (valid & OBD_MD_FLSIZE) + tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno); + *set = 1; + } +} diff --git a/lustre/lov/lov_offset.c b/lustre/lov/lov_offset.c new file mode 100644 index 0000000..66fad27 --- /dev/null +++ b/lustre/lov/lov_offset.c @@ -0,0 +1,240 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#include +#else +#include +#endif + +#include +#include + +#include "lov_internal.h" + +/* compute object size given "stripeno" and the ost size */ +obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size, + int stripeno) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_size; + obd_size lov_size; + ENTRY; + + if (ost_size == 0) + RETURN(0); + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_size = do_div(ost_size, ssize); + if (stripe_size) + lov_size = ost_size * swidth + stripeno * ssize + stripe_size; + else + lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize; + + RETURN(lov_size); +} + +/* we have an offset in file backed by an lov and want to find out where + * that offset lands in our given stripe of the file. for the easy + * case where the offset is within the stripe, we just have to scale the + * offset down to make it relative to the stripe instead of the lov. + * + * the harder case is what to do when the offset doesn't intersect the + * stripe. callers will want start offsets clamped ahead to the start + * of the nearest stripe in the file. end offsets similarly clamped to the + * nearest ending byte of a stripe in the file: + * + * all this function does is move offsets to the nearest region of the + * stripe, and it does its work "mod" the full length of all the stripes. + * consider a file with 3 stripes: + * + * S E + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + * + * to find stripe 1's offsets for S and E, it divides by the full stripe + * width and does its math in the context of a single set of stripes: + * + * S E + * ----------------------------------- + * | 0 | 1 | 2 | + * ----------------------------------- + * + * it'll notice that E is outside stripe 1 and clamp it to the end of the + * stripe, then multiply it back out by lov_off to give the real offsets in + * the stripe: + * + * S E + * --------------------------------------------------------------------- + * | 1 | 1 | 1 | 1 | 1 | 1 | + * --------------------------------------------------------------------- + * + * it would have done similarly and pulled S forward to the start of a 1 + * stripe if, say, S had landed in a 0 stripe. + * + * this rounding isn't always correct. consider an E lov offset that lands + * on a 0 stripe, the "mod stripe width" math will pull it forward to the + * start of a 1 stripe, when in fact it wanted to be rounded back to the end + * of a previous 1 stripe. this logic is handled by callers and this is why: + * + * this function returns < 0 when the offset was "before" the stripe and + * was moved forward to the start of the stripe in question; 0 when it + * falls in the stripe and no shifting was done; > 0 when the offset + * was outside the stripe and was pulled back to its final byte. */ +int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, + int stripeno, obd_off *obd_off) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off, this_stripe; + int ret = 0; + + if (lov_off == OBD_OBJECT_EOF) { + *obd_off = OBD_OBJECT_EOF; + return 0; + } + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_off = do_div(lov_off, swidth); + + this_stripe = stripeno * ssize; + if (stripe_off < this_stripe) { + stripe_off = 0; + ret = -1; + } else { + stripe_off -= this_stripe; + + if (stripe_off >= ssize) { + stripe_off = ssize; + ret = 1; + } + } + + *obd_off = lov_off * ssize + stripe_off; + return ret; +} + +/* Given a whole-file size and a stripe number, give the file size which + * corresponds to the individual object of that stripe. + * + * This behaves basically in the same was as lov_stripe_offset, except that + * file sizes falling before the beginning of a stripe are clamped to the end + * of the previous stripe, not the beginning of the next: + * + * S + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + * + * if clamped to stripe 2 becomes: + * + * S + * --------------------------------------------------------------------- + * | 0 | 1 | 2 | 0 | 1 | 2 | + * --------------------------------------------------------------------- + */ +obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size, + int stripeno) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off, this_stripe; + + if (file_size == OBD_OBJECT_EOF) + return OBD_OBJECT_EOF; + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_off = do_div(file_size, swidth); + + this_stripe = stripeno * ssize; + if (stripe_off < this_stripe) { + /* Move to end of previous stripe, or zero */ + if (file_size > 0) { + file_size--; + stripe_off = ssize; + } else { + stripe_off = 0; + } + } else { + stripe_off -= this_stripe; + + if (stripe_off >= ssize) { + /* Clamp to end of this stripe */ + stripe_off = ssize; + } + } + + return (file_size * ssize + stripe_off); +} + +/* given an extent in an lov and a stripe, calculate the extent of the stripe + * that is contained within the lov extent. this returns true if the given + * stripe does intersect with the lov extent. */ +int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, + obd_off start, obd_off end, + obd_off *obd_start, obd_off *obd_end) +{ + int start_side, end_side; + + start_side = lov_stripe_offset(lsm, start, stripeno, obd_start); + end_side = lov_stripe_offset(lsm, end, stripeno, obd_end); + + CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n", + start, end, start_side, *obd_start, *obd_end, end_side); + + /* this stripe doesn't intersect the file extent when neither + * start or the end intersected the stripe and obd_start and + * obd_end got rounded up to the save value. */ + if (start_side != 0 && end_side != 0 && *obd_start == *obd_end) + return 0; + + /* as mentioned in the lov_stripe_offset commentary, end + * might have been shifted in the wrong direction. This + * happens when an end offset is before the stripe when viewed + * through the "mod stripe size" math. we detect it being shifted + * in the wrong direction and touch it up. + * interestingly, this can't underflow since end must be > start + * if we passed through the previous check. + * (should we assert for that somewhere?) */ + if (end_side != 0) + (*obd_end)--; + + return 1; +} + +/* compute which stripe number "lov_off" will be written into */ +int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off; + + stripe_off = do_div(lov_off, swidth); + + return stripe_off / ssize; +} diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c new file mode 100644 index 0000000..a2b1643 --- /dev/null +++ b/lustre/lov/lov_qos.c @@ -0,0 +1,186 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#else +#include +#endif + +#include +#include + +#include "lov_internal.h" + +void qos_shrink_lsm(struct lov_request_set *set) +{ + struct lov_stripe_md *lsm = set->set_md; + struct lov_stripe_md *lsm_new; + /* XXX LOV STACKING call into osc for sizes */ + unsigned oldsize, newsize; + + if (set->set_oti && set->set_cookies && set->set_cookie_sent) { + struct llog_cookie *cookies; + oldsize = lsm->lsm_stripe_count * sizeof(*cookies); + newsize = set->set_count * sizeof(*cookies); + + cookies = set->set_cookies; + oti_alloc_cookies(set->set_oti, set->set_count); + if (set->set_oti->oti_logcookies) { + memcpy(set->set_oti->oti_logcookies, cookies, newsize); + OBD_FREE(cookies, oldsize); + set->set_cookies = set->set_oti->oti_logcookies; + } else { + CWARN("'leaking' %d bytes\n", oldsize - newsize); + } + } + + CWARN("using fewer stripes for object "LPX64": old %u new %u\n", + lsm->lsm_object_id, lsm->lsm_stripe_count, set->set_count); + + oldsize = lov_stripe_md_size(lsm->lsm_stripe_count); + newsize = lov_stripe_md_size(set->set_count); + OBD_ALLOC(lsm_new, newsize); + if (lsm_new != NULL) { + memcpy(lsm_new, lsm, newsize); + lsm_new->lsm_stripe_count = set->set_count; + OBD_FREE(lsm, oldsize); + set->set_md = lsm_new; + } else { + CWARN("'leaking' %d bytes\n", oldsize - newsize); + } +} + +#define LOV_CREATE_RESEED_INTERVAL 1000 +/* FIXME use real qos data to prepare the lov create request */ +int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea) +{ + static int ost_start_idx, ost_start_count; + unsigned ost_idx, ost_count = lov->desc.ld_tgt_count; + struct lov_stripe_md *lsm = set->set_md; + struct obdo *src_oa = set->set_oa; + int i, rc = 0; + ENTRY; + + LASSERT(src_oa->o_valid & OBD_MD_FLID); + + lsm->lsm_object_id = src_oa->o_id; + if (!lsm->lsm_stripe_size) + lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size; + if (!lsm->lsm_pattern) { + lsm->lsm_pattern = lov->desc.ld_pattern ? + lov->desc.ld_pattern : LOV_PATTERN_RAID0; + } + + if (newea || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) { + if (--ost_start_count <= 0) { + ost_start_idx = ll_insecure_random_int(); + ost_start_count = LOV_CREATE_RESEED_INTERVAL; + } else if (lsm->lsm_stripe_count >= + lov->desc.ld_active_tgt_count) { + /* If we allocate from all of the stripes, make the + * next file start on the next OST. */ + ++ost_start_idx; + } + ost_idx = ost_start_idx % ost_count; + } else { + ost_idx = lsm->lsm_oinfo[0].loi_ost_idx; + } + + CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n", + lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx); + + for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) { + struct lov_request *req; + + ++ost_start_idx; + if (lov->tgts[ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_buflen = sizeof(*req->rq_md); + OBD_ALLOC(req->rq_md, req->rq_buflen); + if (req->rq_md == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_idx = ost_idx; + req->rq_stripe = i; + /* create data objects with "parent" OA */ + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + + /* XXX When we start creating objects on demand, we need to + * make sure that we always create the object on the + * stripe which holds the existing file size. + */ + if (src_oa->o_valid & OBD_MD_FLSIZE) { + if (lov_stripe_offset(lsm, src_oa->o_size, i, + &req->rq_oa->o_size) < 0 && + req->rq_oa->o_size) + req->rq_oa->o_size--; + + CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", + i, req->rq_oa->o_size, src_oa->o_size); + } + + lov_set_add_req(req, set); + + /* If we have allocated enough objects, we are OK */ + if (set->set_count == lsm->lsm_stripe_count) + GOTO(out, rc = 0); + } + + if (set->set_count == 0) + GOTO(out, rc = -EIO); + + /* If we were passed specific striping params, then a failure to + * meet those requirements is an error, since we can't reallocate + * that memory (it might be part of a larger array or something). + * + * We can only get here if lsm_stripe_count was originally > 1. + */ + if (!newea) { + CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n", + lsm->lsm_object_id, set->set_count, + lsm->lsm_stripe_count, rc); + rc = rc ? rc : -EFBIG; + } else { + qos_shrink_lsm(set); + rc = 0; + } +out: + RETURN(rc); +} + + + diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c new file mode 100644 index 0000000..da1f5a6 --- /dev/null +++ b/lustre/lov/lov_request.c @@ -0,0 +1,1276 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_LOV + +#ifdef __KERNEL__ +#include +#else +#include +#endif + +#include +#include +#include + +#include "lov_internal.h" + +static void lov_init_set(struct lov_request_set *set) +{ + set->set_count = 0; + set->set_completes = 0; + set->set_success = 0; + INIT_LIST_HEAD(&set->set_list); + atomic_set(&set->set_refcount, 1); +} + +static void lov_finish_set(struct lov_request_set *set) +{ + struct list_head *pos, *n; + ENTRY; + + LASSERT(set); + list_for_each_safe(pos, n, &set->set_list) { + struct lov_request *req = list_entry(pos, struct lov_request, + rq_link); + list_del_init(&req->rq_link); + + if (req->rq_oa) + obdo_free(req->rq_oa); + if (req->rq_md) + OBD_FREE(req->rq_md, req->rq_buflen); + OBD_FREE(req, sizeof(*req)); + } + + if (set->set_pga) { + int len = set->set_oabufs * sizeof(*set->set_pga); + OBD_FREE(set->set_pga, len); + } + if (set->set_lockh) + lov_llh_put(set->set_lockh); + + OBD_FREE(set, sizeof(*set)); + EXIT; +} + +static void lov_update_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + req->rq_complete = 1; + req->rq_rc = rc; + + set->set_completes++; + if (rc == 0) + set->set_success++; +} + +int lov_update_common_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + ENTRY; + + lov_update_set(set, req, rc); + + /* grace error on inactive ost */ + if (rc && !lov->tgts[req->rq_idx].active) + rc = 0; + + /* FIXME in raid1 regime, should return 0 */ + RETURN(rc); +} + +void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) +{ + list_add_tail(&req->rq_link, &set->set_list); + set->set_count++; +} + +int lov_update_enqueue_set(struct lov_request_set *set, + struct lov_request *req, int rc, int flags) +{ + struct lustre_handle *lov_lockhp; + struct lov_oinfo *loi; + ENTRY; + + lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; + loi = &set->set_md->lsm_oinfo[req->rq_stripe]; + + /* XXX FIXME: This unpleasantness doesn't belong here at *all*. + * It belongs in the OSC, except that the OSC doesn't have + * access to the real LOI -- it gets a copy, that we created + * above, and that copy can be arbitrarily out of date. + * + * The LOV API is due for a serious rewriting anyways, and this + * can be addressed then. */ + if (rc == ELDLM_OK) { + struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); + __u64 tmp = req->rq_md->lsm_oinfo->loi_rss; + + LASSERT(lock != NULL); + loi->loi_rss = tmp; + loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks; + /* Extend KMS up to the end of this lock and no further + * A lock on [x,y] means a KMS of up to y + 1 bytes! */ + if (tmp > lock->l_policy_data.l_extent.end) + tmp = lock->l_policy_data.l_extent.end + 1; + if (tmp >= loi->loi_kms) { + CDEBUG(D_INODE, "lock acquired, setting rss=" + LPU64", kms="LPU64"\n", loi->loi_rss, tmp); + loi->loi_kms = tmp; + loi->loi_kms_valid = 1; + } else { + CDEBUG(D_INODE, "lock acquired, setting rss=" + LPU64"; leaving kms="LPU64", end="LPU64 + "\n", loi->loi_rss, loi->loi_kms, + lock->l_policy_data.l_extent.end); + } + ldlm_lock_allow_match(lock); + LDLM_LOCK_PUT(lock); + + } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) { + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss; + loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks; + CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" + " kms="LPU64"\n", loi->loi_rss, loi->loi_kms); + rc = ELDLM_OK; + } else { + struct obd_export *exp = set->set_exp; + struct lov_obd *lov = &exp->exp_obd->u.lov; + + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + if (lov->tgts[req->rq_idx].active) { + CERROR("error: enqueue objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + set->set_md->lsm_object_id, loi->loi_id, + loi->loi_ost_idx, rc); + } else { + rc = ELDLM_OK; + } + } + lov_update_set(set, req, rc); + RETURN(rc); +} + +static int enqueue_done(struct lov_request_set *set, __u32 mode) +{ + struct list_head *pos; + struct lov_request *req; + struct lustre_handle *lov_lockhp = NULL; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + int rc = 0; + ENTRY; + + LASSERT(set->set_completes); + /* enqueue/match success, just return */ + if (set->set_completes == set->set_success) + RETURN(0); + + /* cancel enqueued/matched locks */ + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + + if (!req->rq_complete || req->rq_rc) + continue; + + lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; + LASSERT(lov_lockhp); + if (lov_lockhp->cookie == 0) + continue; + + rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, + mode, lov_lockhp); + if (rc && lov->tgts[req->rq_idx].active) + CERROR("cancelling obdjid "LPX64" on OST " + "idx %d error: rc = %d\n", + req->rq_md->lsm_object_id, req->rq_idx, rc); + } + lov_llh_put(set->set_lockh); + RETURN(rc); +} + +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) + rc = enqueue_done(set, mode); + else + lov_llh_put(set->set_lockh); + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, + ldlm_policy_data_t *policy, __u32 mode, + struct lustre_handle *lockh, + struct lov_request_set **reqset) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + struct lov_request_set *set; + int i, rc = 0; + struct lov_oinfo *loi; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_lockh = lov_llh_new(lsm); + if (set->set_lockh == NULL) + GOTO(out_set, rc = -ENOMEM); + lockh->cookie = set->set_lockh->llh_handle.h_cookie; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + obd_off start, end; + + if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, + policy->l_extent.end, &start, &end)) + continue; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_buflen = sizeof(*req->rq_md) + + sizeof(struct lov_oinfo); + OBD_ALLOC(req->rq_md, req->rq_buflen); + if (req->rq_md == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_extent.start = start; + req->rq_extent.end = end; + + req->rq_idx = loi->loi_ost_idx; + req->rq_stripe = i; + + /* XXX LOV STACKING: submd should be from the subobj */ + req->rq_md->lsm_object_id = loi->loi_id; + req->rq_md->lsm_stripe_count = 0; + req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid; + req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss; + req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms; + req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(0); +out_set: + lov_fini_enqueue_set(set, mode); + RETURN(rc); +} + +int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, + int rc) +{ + int ret = rc; + ENTRY; + + if (rc == 1) + ret = 0; + lov_update_set(set, req, ret); + RETURN(rc); +} + +int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + if (set->set_count == set->set_success && + flags & LDLM_FL_TEST_LOCK) + lov_llh_put(set->set_lockh); + rc = enqueue_done(set, mode); + } else { + lov_llh_put(set->set_lockh); + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, + ldlm_policy_data_t *policy, __u32 mode, + struct lustre_handle *lockh, + struct lov_request_set **reqset) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + struct lov_request_set *set; + int i, rc = 0; + struct lov_oinfo *loi; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_lockh = lov_llh_new(lsm); + if (set->set_lockh == NULL) + GOTO(out_set, rc = -ENOMEM); + lockh->cookie = set->set_lockh->llh_handle.h_cookie; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + obd_off start, end; + + if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, + policy->l_extent.end, &start, &end)) + continue; + + /* FIXME raid1 should grace this error */ + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + GOTO(out_set, rc = -EIO); + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_buflen = sizeof(*req->rq_md); + OBD_ALLOC(req->rq_md, req->rq_buflen); + if (req->rq_md == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_extent.start = start; + req->rq_extent.end = end; + + req->rq_idx = loi->loi_ost_idx; + req->rq_stripe = i; + + /* XXX LOV STACKING: submd should be from the subobj */ + req->rq_md->lsm_object_id = loi->loi_id; + req->rq_md->lsm_stripe_count = 0; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_match_set(set, mode, 0); + RETURN(rc); +} + +int lov_fini_cancel_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + + if (set->set_lockh) + lov_llh_put(set->set_lockh); + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm, + __u32 mode, struct lustre_handle *lockh, + struct lov_request_set **reqset) +{ + struct lov_request_set *set; + int i, rc = 0; + struct lov_oinfo *loi; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_lockh = lov_handle2llh(lockh); + if (set->set_lockh == NULL) { + CERROR("LOV: invalid lov lock handle %p\n", lockh); + GOTO(out_set, rc = -EINVAL); + } + lockh->cookie = set->set_lockh->llh_handle.h_cookie; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + struct lustre_handle *lov_lockhp; + + lov_lockhp = set->set_lockh->llh_handles + i; + if (lov_lockhp->cookie == 0) { + CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n", + loi->loi_ost_idx, loi->loi_id); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_buflen = sizeof(*req->rq_md); + OBD_ALLOC(req->rq_md, req->rq_buflen); + if (req->rq_md == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_idx = loi->loi_ost_idx; + req->rq_stripe = i; + + /* XXX LOV STACKING: submd should be from the subobj */ + req->rq_md->lsm_object_id = loi->loi_id; + req->rq_md->lsm_stripe_count = 0; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_cancel_set(set); + RETURN(rc); +} + +static int create_done(struct obd_export *exp, struct lov_request_set *set, + struct lov_stripe_md **ea) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + struct obd_trans_info *oti = set->set_oti; + struct obdo *src_oa = set->set_oa; + struct list_head *pos; + struct lov_request *req; + struct obdo *ret_oa = NULL; + int attrset = 0, rc = 0; + ENTRY; + + LASSERT(set->set_completes); + + if (!set->set_success) + GOTO(cleanup, rc = -EIO); + if (*ea == NULL && set->set_count != set->set_success) { + set->set_count = set->set_success; + qos_shrink_lsm(set); + } + + ret_oa = obdo_alloc(); + if (ret_oa == NULL) + GOTO(cleanup, rc = -ENOMEM); + + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + if (!req->rq_complete || req->rq_rc) + continue; + lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid, + set->set_md, req->rq_stripe, &attrset); + } + if (src_oa->o_valid & OBD_MD_FLSIZE && + ret_oa->o_size != src_oa->o_size) { + CERROR("original size "LPU64" isn't new object size "LPU64"\n", + src_oa->o_size, ret_oa->o_size); + LBUG(); + } + ret_oa->o_id = src_oa->o_id; + memcpy(src_oa, ret_oa, sizeof(*src_oa)); + obdo_free(ret_oa); + + *ea = set->set_md; + GOTO(done, rc = 0); + +cleanup: + list_for_each (pos, &set->set_list) { + struct obd_export *sub_exp; + int err = 0; + req = list_entry(pos, struct lov_request, rq_link); + + if (!req->rq_complete || req->rq_rc) + continue; + + sub_exp = lov->tgts[req->rq_idx].ltd_exp, + err = obd_destroy(sub_exp, req->rq_oa, NULL, oti); + if (err) + CERROR("Failed to uncreate objid "LPX64" subobj " + ""LPX64" on OST idx %d: rc = %d\n", + set->set_oa->o_id, req->rq_oa->o_id, + req->rq_idx, rc); + } + if (*ea == NULL) + obd_free_memmd(exp, &set->set_md); +done: + if (oti && set->set_cookies) { + oti->oti_logcookies = set->set_cookies; + if (!set->set_cookie_sent) { + oti_free_cookies(oti); + src_oa->o_valid &= ~OBD_MD_FLCOOKIE; + } else { + src_oa->o_valid |= OBD_MD_FLCOOKIE; + } + } + RETURN(rc); +} + + +int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + rc = create_done(set->set_exp, set, ea); + /* FIXME update qos data here */ + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_update_create_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct obd_trans_info *oti = set->set_oti; + struct lov_stripe_md *lsm = set->set_md; + struct lov_oinfo *loi; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + ENTRY; + + req->rq_stripe = set->set_success; + loi = &lsm->lsm_oinfo[req->rq_stripe]; + + if (rc && lov->tgts[req->rq_idx].active) { + CERROR("error creating objid "LPX64" sub-object" + " on OST idx %d/%d: rc = %d\n", + set->set_oa->o_id, req->rq_idx, + lsm->lsm_stripe_count, rc); + if (rc > 0) { + CERROR("obd_create returned invalid err %d\n", rc); + rc = -EIO; + } + } + lov_update_set(set, req, rc); + if (rc) + RETURN(rc); + + if (oti && oti->oti_objid) + oti->oti_objid[req->rq_idx] = req->rq_oa->o_id; + + loi->loi_id = req->rq_oa->o_id; + loi->loi_ost_idx = req->rq_idx; + CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n", + lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); + loi_init(loi); + + if (set->set_cookies) + ++oti->oti_logcookies; + if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE) + set->set_cookie_sent++; + + RETURN(0); +} + +int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea, + struct obdo *src_oa, struct obd_trans_info *oti, + struct lov_request_set **reqset) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + struct lov_request_set *set; + int rc = 0, newea = 0; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = *ea; + set->set_oa = src_oa; + set->set_oti = oti; + + if (set->set_md == NULL) { + int stripes, stripe_cnt; + stripe_cnt = lov_get_stripecnt(lov, 0); + + /* If the MDS file was truncated up to some size, stripe over + * enough OSTs to allow the file to be created at that size. */ + if (src_oa->o_valid & OBD_MD_FLSIZE) { + stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1; + do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12)); + + if (stripes > lov->desc.ld_active_tgt_count) + GOTO(out_set, rc = -EFBIG); + if (stripes < stripe_cnt) + stripes = stripe_cnt; + } else { + stripes = stripe_cnt; + } + + rc = lov_alloc_memmd(&set->set_md, stripes, + lov->desc.ld_pattern ? + lov->desc.ld_pattern : LOV_PATTERN_RAID0); + if (rc < 0) + goto out_set; + newea = 1; + } + + rc = qos_prep_create(lov, set, newea); + if (rc) + goto out_lsm; + + if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) { + oti_alloc_cookies(oti, set->set_count); + if (!oti->oti_logcookies) + goto out_lsm; + set->set_cookies = oti->oti_logcookies; + } + *reqset = set; + RETURN(rc); + +out_lsm: + if (*ea == NULL) + obd_free_memmd(exp, &set->set_md); +out_set: + lov_fini_create_set(set, ea); + RETURN(rc); +} + +static int common_attr_done(struct lov_request_set *set) +{ + struct list_head *pos; + struct lov_request *req; + struct obdo *tmp_oa; + int rc = 0, attrset = 0; + ENTRY; + + if (set->set_oa == NULL) + RETURN(0); + + if (!set->set_success) + RETURN(-EIO); + + tmp_oa = obdo_alloc(); + if (tmp_oa == NULL) + GOTO(out, rc = -ENOMEM); + + list_for_each (pos, &set->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + + if (!req->rq_complete || req->rq_rc) + continue; + if (req->rq_oa->o_valid == 0) /* inactive stripe */ + continue; + lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid, + set->set_md, req->rq_stripe, &attrset); + } + if (!attrset) { + CERROR("No stripes had valid attrs\n"); + rc = -EIO; + } + tmp_oa->o_id = set->set_oa->o_id; + memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa)); +out: + obdo_free(tmp_oa); + RETURN(rc); + +} + +static int brw_done(struct lov_request_set *set) +{ + return common_attr_done(set); +} + +int lov_fini_brw_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + rc = brw_done(set); + /* FIXME update qos data here */ + } + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, obd_count oa_bufs, + struct brw_page *pga, struct obd_trans_info *oti, + struct lov_request_set **reqset) +{ + struct { + obd_count index; + obd_count count; + obd_count off; + } *info = NULL; + struct lov_request_set *set; + struct lov_oinfo *loi = NULL; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0, i, shift; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_oa = src_oa; + set->set_oti = oti; + set->set_oabufs = oa_bufs; + OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga)); + if (!set->set_pga) + GOTO(out, rc = -ENOMEM); + + OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count); + if (!info) + GOTO(out, rc = -ENOMEM); + + /* calculate the page count for each stripe */ + for (i = 0; i < oa_bufs; i++) { + int stripe = lov_stripe_number(lsm, pga[i].disk_offset); + info[stripe].count++; + } + + /* alloc and initialize lov request */ + loi = lsm->lsm_oinfo; + shift = 0; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + + if (info[i].count == 0) + continue; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + GOTO(out, rc = -EIO); + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out, rc = -ENOMEM); + + if (src_oa) + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + req->rq_oa->o_id = loi->loi_id; + + req->rq_buflen = sizeof(*req->rq_md); + OBD_ALLOC(req->rq_md, req->rq_buflen); + if (req->rq_md == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_idx = loi->loi_ost_idx; + req->rq_stripe = i; + + /* XXX LOV STACKING */ + req->rq_md->lsm_object_id = loi->loi_id; + req->rq_md->lsm_object_gr = lsm->lsm_object_gr; + req->rq_oabufs = info[i].count; + req->rq_pgaidx = shift; + shift += req->rq_oabufs; + + /* remember the index for sort brw_page array */ + info[i].index = req->rq_pgaidx; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out, rc = -EIO); + + /* rotate & sort the brw_page array */ + for (i = 0; i < oa_bufs; i++) { + int stripe = lov_stripe_number(lsm, pga[i].disk_offset); + + shift = info[stripe].index + info[stripe].off; + LASSERT(shift < oa_bufs); + set->set_pga[shift] = pga[i]; + lov_stripe_offset(lsm, pga[i].disk_offset, stripe, + &set->set_pga[shift].disk_offset); + info[stripe].off++; + } +out: + if (info) + OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count); + + if (rc == 0) + *reqset = set; + else + lov_fini_brw_set(set); + + RETURN(rc); +} + +static int getattr_done(struct lov_request_set *set) +{ + return common_attr_done(set); +} + +int lov_fini_getattr_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) + rc = getattr_done(set); + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, + struct lov_request_set **reqset) +{ + struct lov_request_set *set; + struct lov_oinfo *loi = NULL; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0, i; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_oa = src_oa; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_stripe = i; + req->rq_idx = loi->loi_ost_idx; + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out_set, rc = -ENOMEM); + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + req->rq_oa->o_id = loi->loi_id; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_getattr_set(set); + RETURN(rc); +} + +int lov_fini_destroy_set(struct lov_request_set *set) +{ + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + /* FIXME update qos data here */ + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(0); +} + +int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, + struct obd_trans_info *oti, + struct lov_request_set **reqset) +{ + struct lov_request_set *set; + struct lov_oinfo *loi = NULL; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0, cookie_set = 0, i; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_oa = src_oa; + set->set_oti = oti; + if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE) + set->set_cookies = oti->oti_logcookies; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + + req->rq_stripe = i; + req->rq_idx = loi->loi_ost_idx; + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out_set, rc = -ENOMEM); + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + req->rq_oa->o_id = loi->loi_id; + + /* Setup the first request's cookie position */ + if (!cookie_set && set->set_cookies) { + oti->oti_logcookies = set->set_cookies + i; + cookie_set = 1; + } + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_destroy_set(set); + RETURN(rc); +} + +static int setattr_done(struct lov_request_set *set) +{ + return common_attr_done(set); +} + +int lov_fini_setattr_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + rc = setattr_done(set); + /* FIXME update qos data here */ + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + RETURN(rc); +} + +int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, struct obd_trans_info *oti, + struct lov_request_set **reqset) +{ + struct lov_request_set *set; + struct lov_oinfo *loi = NULL; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0, i; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_oa = src_oa; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + req->rq_stripe = i; + req->rq_idx = loi->loi_ost_idx; + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out_set, rc = -ENOMEM); + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + req->rq_oa->o_id = loi->loi_id; + + if (src_oa->o_valid & OBD_MD_FLSIZE) { + if (lov_stripe_offset(lsm, src_oa->o_size, i, + &req->rq_oa->o_size) < 0 && + req->rq_oa->o_size) + req->rq_oa->o_size--; + CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n", + i, req->rq_oa->o_size, src_oa->o_size); + } + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_setattr_set(set); + RETURN(rc); +} + +int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req, + int rc) +{ + struct lov_stripe_md *lsm = set->set_md; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + ENTRY; + + lov_update_set(set, req, rc); + if (rc == 0) { + struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe]; + loi->loi_kms = loi->loi_rss = req->rq_extent.start; + } + if (rc && !lov->tgts[req->rq_idx].active) + rc = 0; + /* FIXME in raid1 regime, should return 0 */ + RETURN(rc); +} + +int lov_fini_punch_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + if (!set->set_success) + rc = -EIO; + /* FIXME update qos data here */ + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, obd_off start, + obd_off end, struct obd_trans_info *oti, + struct lov_request_set **reqset) +{ + struct lov_request_set *set; + struct lov_oinfo *loi = NULL; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0, i; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_oa = src_oa; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + obd_off rs, re; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re)) + continue; + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + req->rq_stripe = i; + req->rq_idx = loi->loi_ost_idx; + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out_set, rc = -ENOMEM); + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + req->rq_oa->o_id = loi->loi_id; + + req->rq_extent.start = rs; + req->rq_extent.end = re; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_punch_set(set); + RETURN(rc); +} + +int lov_fini_sync_set(struct lov_request_set *set) +{ + int rc = 0; + ENTRY; + + LASSERT(set->set_exp); + if (set == NULL) + RETURN(0); + if (set->set_completes) { + if (!set->set_success) + rc = -EIO; + /* FIXME update qos data here */ + } + + if (atomic_dec_and_test(&set->set_refcount)) + lov_finish_set(set); + + RETURN(rc); +} + +int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md *lsm, obd_off start, + obd_off end, struct lov_request_set **reqset) +{ + struct lov_request_set *set; + struct lov_oinfo *loi = NULL; + struct lov_obd *lov = &exp->exp_obd->u.lov; + int rc = 0, i; + ENTRY; + + OBD_ALLOC(set, sizeof(*set)); + if (set == NULL) + RETURN(-ENOMEM); + lov_init_set(set); + + set->set_exp = exp; + set->set_md = lsm; + set->set_oa = src_oa; + + loi = lsm->lsm_oinfo; + for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) { + struct lov_request *req; + obd_off rs, re; + + if (lov->tgts[loi->loi_ost_idx].active == 0) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + continue; + } + + if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re)) + continue; + + OBD_ALLOC(req, sizeof(*req)); + if (req == NULL) + GOTO(out_set, rc = -ENOMEM); + req->rq_stripe = i; + req->rq_idx = loi->loi_ost_idx; + + req->rq_oa = obdo_alloc(); + if (req->rq_oa == NULL) + GOTO(out_set, rc = -ENOMEM); + memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa)); + req->rq_oa->o_id = loi->loi_id; + + req->rq_extent.start = rs; + req->rq_extent.end = re; + + lov_set_add_req(req, set); + } + if (!set->set_count) + GOTO(out_set, rc = -EIO); + *reqset = set; + RETURN(rc); +out_set: + lov_fini_sync_set(set); + RETURN(rc); +} -- 1.8.3.1