Whamcloud - gitweb
LU-80 lov: large stripe count support
[fs/lustre-release.git] / lustre / lov / lov_request.c
index f99307c..d8c1fea 100644 (file)
@@ -1,22 +1,40 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #ifndef EXPORT_SYMTAB
 #define DEBUG_SUBSYSTEM S_LOV
 
 #ifdef __KERNEL__
-#include <asm/div64.h>
+#include <libcfs/libcfs.h>
 #else
 #include <liblustre.h>
 #endif
 
-#include <linux/obd_class.h>
-#include <linux/obd_lov.h>
-#include <linux/lustre_idl.h>
+#include <obd_class.h>
+#include <obd_lov.h>
+#include <lustre/lustre_idl.h>
 
 #include "lov_internal.h"
 
 static void lov_init_set(struct lov_request_set *set)
 {
+        /* Put a new request set into its empty state: no requests queued,
+         * completion/success counters and set_cookies at zero, an empty
+         * request list, a reference count of one, and an initialised
+         * wait queue and spinlock. */
         set->set_count = 0;
-        set->set_completes = 0;
-        set->set_success = 0;
-        INIT_LIST_HEAD(&set->set_list);
-        atomic_set(&set->set_refcount, 1);
+        cfs_atomic_set(&set->set_completes, 0);
+        cfs_atomic_set(&set->set_success, 0);
+        set->set_cookies = 0;
+        CFS_INIT_LIST_HEAD(&set->set_list);
+        cfs_atomic_set(&set->set_refcount, 1);
+        cfs_waitq_init(&set->set_waitq);
+        cfs_spin_lock_init(&set->set_lock);
 }
 
-static void lov_finish_set(struct lov_request_set *set)
+void lov_finish_set(struct lov_request_set *set)
 {
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        struct list_head *pos, *n;
+        cfs_list_t *pos, *n;
         ENTRY;
 
         LASSERT(set);
-        list_for_each_safe(pos, n, &set->set_list) {
-                struct lov_request *req;
-                struct lov_tgt_desc *tgt;
-
-                req = list_entry(pos, struct lov_request, rq_link);
-                LASSERT(req->rq_idx >= 0);
-
-                tgt = lov->tgts + req->rq_idx;
-                lov_tgt_decref(lov, tgt);
-
-                list_del_init(&req->rq_link);
-
-                if (req->rq_oa)
-                        obdo_free(req->rq_oa);
-                if (req->rq_md)
-                        OBD_FREE(req->rq_md, req->rq_buflen);
+        cfs_list_for_each_safe(pos, n, &set->set_list) {
+                struct lov_request *req = cfs_list_entry(pos,
+                                                         struct lov_request,
+                                                         rq_link);
+                cfs_list_del_init(&req->rq_link);
+
+                if (req->rq_oi.oi_oa)
+                        OBDO_FREE(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_md)
+                        OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
+                if (req->rq_oi.oi_osfs)
+                        OBD_FREE(req->rq_oi.oi_osfs,
+                                 sizeof(*req->rq_oi.oi_osfs));
                 OBD_FREE(req, sizeof(*req));
         }
 
         if (set->set_pga) {
                 int len = set->set_oabufs * sizeof(*set->set_pga);
-                OBD_FREE(set->set_pga, len);
+                OBD_FREE_LARGE(set->set_pga, len);
         }
         if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
@@ -82,15 +99,26 @@ static void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
-static void lov_update_set(struct lov_request_set *set,
-                           struct lov_request *req, int rc)
+/*
+ * Report whether every request queued on @set has completed.
+ *
+ * Returns non-zero when set_completes equals set_count and zero otherwise;
+ * the current completed/total pair is also logged at D_INFO.
+ */
+int lov_finished_set(struct lov_request_set *set)
+{
+        int completes = cfs_atomic_read(&set->set_completes);
+
+        CDEBUG(D_INFO, "check set %d/%d\n", completes,
+               set->set_count);
+        return completes == set->set_count;
+}
+
+/*
+ * Record the completion of @req with result @rc in @set.
+ *
+ * Marks the request complete and stores @rc in it, increments
+ * set_completes (and set_success when @rc is 0), then signals set_waitq.
+ */
+void lov_update_set(struct lov_request_set *set,
+                    struct lov_request *req, int rc)
 {
         req->rq_complete = 1;
         req->rq_rc = rc;
 
-        set->set_completes++;
+        cfs_atomic_inc(&set->set_completes);
         if (rc == 0)
-                set->set_success++;
+                cfs_atomic_inc(&set->set_success);
+
+        cfs_waitq_signal(&set->set_waitq);
 }
 
 int lov_update_common_set(struct lov_request_set *set,
@@ -102,14 +130,9 @@ int lov_update_common_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc) {
-                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
-
-                if (lov_tgt_active(lov, tgt, req->rq_gen))
-                        lov_tgt_decref(lov, tgt);
-                else
-                        rc = 0;
-        }
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
+                    lov->lov_tgts[req->rq_idx]->ltd_active))
+                rc = 0;
 
         /* FIXME in raid1 regime, should return 0 */
         RETURN(rc);
@@ -117,157 +140,148 @@ int lov_update_common_set(struct lov_request_set *set,
 
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
-        list_add_tail(&req->rq_link, &set->set_list);
+        /* Append @req to the tail of @set's request list, count it in
+         * set_count, and record @set as the request's owner (rq_rqset). */
+        cfs_list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
+        req->rq_rqset = set;
 }
 
-int lov_update_enqueue_set(struct lov_request_set *set,
-                           struct lov_request *req, int rc, int flags)
+extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+                               struct lov_oinfo *loi, int flags,
+                               struct ost_lvb *lvb, __u32 mode, int rc);
+
+/*
+ * LOV-side post-processing of one stripe's enqueue result.
+ *
+ * An @rc of ELDLM_OK, or of ELDLM_LOCK_ABORTED while LDLM_FL_HAS_INTENT is
+ * set in @flags, is returned unchanged.  For any other @rc the stripe's
+ * lock handle @lov_lockhp is zeroed; if the target at @idx exists and is
+ * active the error is returned (and logged unless it is -EINTR or
+ * -EUSERS), otherwise it is replaced by ELDLM_OK.
+ *
+ * @oid and @loi are only used to build the error message.
+ */
+static int lov_update_enqueue_lov(struct obd_export *exp,
+                                  struct lustre_handle *lov_lockhp,
+                                  struct lov_oinfo *loi, int flags, int idx,
+                                  __u64 oid, int rc)
 {
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+        if (rc != ELDLM_OK &&
+            !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
+                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+                if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
+                        /* -EUSERS used by OST to report file contention */
+                        if (rc != -EINTR && rc != -EUSERS)
+                                CERROR("enqueue objid "LPX64" subobj "
+                                       LPX64" on OST idx %d: rc %d\n",
+                                       oid, loi->loi_id, loi->loi_ost_idx, rc);
+                } else
+                        rc = ELDLM_OK;
+        }
+        return rc;
+}
+
+/*
+ * Fold the enqueue result @rc of one sub-request into its request set.
+ *
+ * The owning set is found through req->rq_rqset.  For the request's stripe
+ * the lock handle and lov_oinfo are looked up, and under lov_stripe_lock()
+ * osc_update_enqueue() is called with the LVB of the request's own lsm
+ * copy, followed by lov_update_enqueue_lov().  The rc that comes out of
+ * that is recorded with lov_update_set() and returned.
+ */
+int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
+{
+        struct lov_request_set *set = req->rq_rqset;
         struct lustre_handle *lov_lockhp;
+        struct obd_info *oi = set->set_oi;
         struct lov_oinfo *loi;
         ENTRY;
 
+        LASSERT(oi != NULL);
+
         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
-        loi = &set->set_md->lsm_oinfo[req->rq_stripe];
+        loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
 
-        /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
-         * It belongs in the OSC, except that the OSC doesn't have
-         * access to the real LOI -- it gets a copy, that we created
-         * above, and that copy can be arbitrarily out of date.
+        /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
+         * and that copy can be arbitrarily out of date.
          *
          * The LOV API is due for a serious rewriting anyways, and this
          * can be addressed then. */
-        if (rc == ELDLM_OK) {
-                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
-                __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
-
-                LASSERT(lock != NULL);
-                loi->loi_rss = tmp;
-                loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
-                loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
-                /* Extend KMS up to the end of this lock and no further
-                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
-                if (tmp > lock->l_policy_data.l_extent.end)
-                        tmp = lock->l_policy_data.l_extent.end + 1;
-                if (tmp >= loi->loi_kms) {
-                        CDEBUG(D_INODE, "lock acquired, setting rss="
-                               LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
-                        loi->loi_kms = tmp;
-                        loi->loi_kms_valid = 1;
-                } else {
-                        CDEBUG(D_INODE, "lock acquired, setting rss="
-                               LPU64"; leaving kms="LPU64", end="LPU64
-                               "\n", loi->loi_rss, loi->loi_kms,
-                               lock->l_policy_data.l_extent.end);
-                }
-                ldlm_lock_allow_match(lock);
-                LDLM_LOCK_PUT(lock);
-        } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
-                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
-                loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
-                loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
-                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
-                       " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
-                rc = ELDLM_OK;
-        } else {
-                struct obd_export *exp = set->set_exp;
-                struct lov_obd *lov = &exp->exp_obd->u.lov;
-                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
 
-                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
-                        lov_tgt_decref(lov, tgt);
-                        CERROR("error: enqueue objid "LPX64" subobj "
-                                LPX64" on OST idx %d: rc = %d\n",
-                                set->set_md->lsm_object_id, loi->loi_id,
-                                loi->loi_ost_idx, rc);
-                } else {
-                        CERROR("error: enqueue objid "LPX64" subobj "
-                                LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
-                                set->set_md->lsm_object_id, loi->loi_id,
-                                loi->loi_ost_idx, rc);
-                        rc = ELDLM_OK;
-                }
-        }
+        lov_stripe_lock(oi->oi_md);
+        osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
+                           &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
+        /* An aborted intent enqueue leaves no lock behind, so the stripe's
+         * handle is cleared here; lov_update_enqueue_lov() passes this rc
+         * through unchanged. */
+        if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
+                memset(lov_lockhp, 0, sizeof *lov_lockhp);
+        rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
+                                    req->rq_idx, oi->oi_md->lsm_object_id, rc);
+        lov_stripe_unlock(oi->oi_md);
         lov_update_set(set, req, rc);
         RETURN(rc);
 }
 
-static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
+/*
+ * The callback for osc_enqueue that updates lov info for every OSC request.
+ *
+ * @cookie is the obd_info embedded in a lov_request (its rq_oi member);
+ * the request is recovered with container_of() and the lock mode is taken
+ * from the owning set's set_ei.  Returns what lov_update_enqueue_set()
+ * returns for @rc.
+ */
+static int cb_update_enqueue(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct ldlm_enqueue_info *einfo;
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        einfo = lovreq->rq_rqset->set_ei;
+        return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
+}
+
+/*
+ * Finish an enqueue/match on @set.
+ *
+ * Returns 0 straight away when at least one request has completed and the
+ * completed count equals the success count.  Otherwise every request that
+ * completed with rc 0 and still has a used lock handle is cancelled with
+ * obd_cancel() in @mode, the set's lock-handle reference (if any) is
+ * dropped, and the rc of the last obd_cancel() call is returned (0 if no
+ * cancel was issued).
+ */
+static int enqueue_done(struct lov_request_set *set, __u32 mode)
 {
-        struct list_head *pos;
         struct lov_request *req;
-        struct lustre_handle *lov_lockhp = NULL;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        struct lov_tgt_desc *tgt;
+        int completes = cfs_atomic_read(&set->set_completes);
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_completes);
         /* enqueue/match success, just return */
-        if (set->set_completes == set->set_success) {
-                if (flags & LDLM_FL_TEST_LOCK)
-                        lov_llh_put(set->set_lockh);
+        if (completes && completes == cfs_atomic_read(&set->set_success))
                 RETURN(0);
-        }
 
         /* cancel enqueued/matched locks */
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
+                struct lustre_handle *lov_lockhp;
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
-                if (flags & LDLM_FL_TEST_LOCK)
-                        continue;
 
                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
                 LASSERT(lov_lockhp);
-                if (lov_lockhp->cookie == 0)
+                if (!lustre_handle_is_used(lov_lockhp))
                         continue;
 
-                tgt = lov->tgts + req->rq_idx;
-                rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
-                if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
-                        lov_tgt_decref(lov, tgt);
+                /* NOTE(review): lov_tgts[rq_idx] is dereferenced for the
+                 * obd_cancel() call but only NULL-checked in the condition
+                 * right after it -- confirm the target cannot be removed
+                 * while a request for it is still on the set. */
+                rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
+                                req->rq_oi.oi_md, mode, lov_lockhp);
+                if (rc && lov->lov_tgts[req->rq_idx] &&
+                    lov->lov_tgts[req->rq_idx]->ltd_active)
                         CERROR("cancelling obdjid "LPX64" on OST "
                                "idx %d error: rc = %d\n",
-                               req->rq_md->lsm_object_id, req->rq_idx, rc);
-                }
+                               req->rq_oi.oi_md->lsm_object_id,
+                               req->rq_idx, rc);
         }
-        lov_llh_put(set->set_lockh);
+        if (set->set_lockh)
+                lov_llh_put(set->set_lockh);
         RETURN(rc);
 }
 
-int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
+/*
+ * Tear down an enqueue request set.
+ *
+ * A NULL @set is a no-op returning 0.  Without an @rqset, enqueue_done()
+ * is run; if @rc is non-zero set_completes is first reset to 0 so that
+ * enqueue_done() does not take its "all succeeded" early return.  With an
+ * @rqset only the lock-handle reference (if any) is dropped.  In both
+ * cases the set is then released with lov_put_reqset().
+ *
+ * Returns @rc when it is non-zero, otherwise the result of enqueue_done()
+ * (0 when it was not called).
+ */
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
+                         struct ptlrpc_request_set *rqset)
 {
-        int rc = 0;
+        int ret = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes)
-                rc = enqueue_done(set, mode, 0);
-        else
+        LASSERT(set->set_exp);
+        /* Do enqueue_done only for sync requests and if any request
+         * succeeded. */
+        if (!rqset) {
+                if (rc)
+                        cfs_atomic_set(&set->set_completes, 0);
+                ret = enqueue_done(set, mode);
+        } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
-        RETURN(rc);
+        RETURN(rc ? rc : ret);
 }
 
-int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
-                         ldlm_policy_data_t *policy, __u32 mode,
-                         struct lustre_handle *lockh,
+int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
+                         struct ldlm_enqueue_info *einfo,
                          struct lov_request_set **reqset)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_request_set *set;
         int i, rc = 0;
-        struct lov_oinfo *loi;
         ENTRY;
 
         OBD_ALLOC(set, sizeof(*set));
@@ -276,59 +290,70 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_lockh = lov_llh_new(lsm);
+        set->set_oi = oinfo;
+        set->set_ei = einfo;
+        set->set_lockh = lov_llh_new(oinfo->oi_md);
         if (set->set_lockh == NULL)
                 GOTO(out_set, rc = -ENOMEM);
-        lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+        oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt;
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi;
                 struct lov_request *req;
                 obd_off start, end;
 
-                if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
-                                           policy->l_extent.end, &start, &end))
+                loi = oinfo->oi_md->lsm_oinfo[i];
+                if (!lov_stripe_intersects(oinfo->oi_md, i,
+                                           oinfo->oi_policy.l_extent.start,
+                                           oinfo->oi_policy.l_extent.end,
+                                           &start, &end))
                         continue;
 
-                tgt = lov->tgts + loi->loi_ost_idx;
-                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
 
-                req->rq_buflen = sizeof(*req->rq_md) +
+                req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
+                        sizeof(struct lov_oinfo *) +
                         sizeof(struct lov_oinfo);
-                OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL) {
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
+                if (req->rq_oi.oi_md == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
+                req->rq_oi.oi_md->lsm_oinfo[0] =
+                        ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
+                        sizeof(struct lov_oinfo *);
+
+                /* Set lov request specific parameters. */
+                req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
+                req->rq_oi.oi_cb_up = cb_update_enqueue;
+                req->rq_oi.oi_flags = oinfo->oi_flags;
 
-                req->rq_extent.start = start;
-                req->rq_extent.end = end;
+                LASSERT(req->rq_oi.oi_lockh);
+
+                req->rq_oi.oi_policy.l_extent.gid =
+                        oinfo->oi_policy.l_extent.gid;
+                req->rq_oi.oi_policy.l_extent.start = start;
+                req->rq_oi.oi_policy.l_extent.end = end;
 
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
-                req->rq_md->lsm_object_id = loi->loi_id;
-                req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
-                req->rq_md->lsm_stripe_count = 0;
-                req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
-                req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
-                req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
-                req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
-                loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
+                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
+                req->rq_oi.oi_md->lsm_stripe_count = 0;
+                req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
+                        loi->loi_kms_valid;
+                req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
+                req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
 
                 lov_set_add_req(req, set);
         }
@@ -337,15 +362,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
         *reqset = set;
         RETURN(0);
 out_set:
-        lov_fini_enqueue_set(set, mode);
-        RETURN(rc);
-}
-
-int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
-                         int rc)
-{
-        ENTRY;
-        lov_update_set(set, req, !rc);
+        lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
         RETURN(rc);
 }
 
@@ -354,29 +371,27 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes)
-                rc = enqueue_done(set, mode, flags);
-        else
+        LASSERT(set->set_exp);
+        rc = enqueue_done(set, mode);
+        if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
+            (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
-int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
-                       ldlm_policy_data_t *policy, __u32 mode,
-                       struct lustre_handle *lockh,
+int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
+                       struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
+                       __u32 mode, struct lustre_handle *lockh,
                        struct lov_request_set **reqset)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_request_set *set;
         int i, rc = 0;
-        struct lov_oinfo *loi;
         ENTRY;
 
         OBD_ALLOC(set, sizeof(*set));
@@ -385,54 +400,53 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
+        set->set_oi = oinfo;
+        set->set_oi->oi_md = lsm;
         set->set_lockh = lov_llh_new(lsm);
         if (set->set_lockh == NULL)
                 GOTO(out_set, rc = -ENOMEM);
         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt;
+        for (i = 0; i < lsm->lsm_stripe_count; i++){
+                struct lov_oinfo *loi;
                 struct lov_request *req;
                 obd_off start, end;
 
+                loi = lsm->lsm_oinfo[i];
                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
                                            policy->l_extent.end, &start, &end))
                         continue;
 
                 /* FIXME raid1 should grace this error */
-                tgt = lov->tgts + loi->loi_ost_idx;
-                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out_set, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
 
-                req->rq_buflen = sizeof(*req->rq_md);
-                OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL) {
+                req->rq_buflen = sizeof(*req->rq_oi.oi_md);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
+                if (req->rq_oi.oi_md == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
 
-                req->rq_extent.start = start;
-                req->rq_extent.end = end;
+                req->rq_oi.oi_policy.l_extent.start = start;
+                req->rq_oi.oi_policy.l_extent.end = end;
+                req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
 
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
-                req->rq_md->lsm_object_id = loi->loi_id;
-               req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
-                req->rq_md->lsm_stripe_count = 0;
+                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
+                req->rq_oi.oi_md->lsm_stripe_count = 0;
+
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -449,27 +463,25 @@ int lov_fini_cancel_set(struct lov_request_set *set)
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
 
+        LASSERT(set->set_exp);
         if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
-int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
-                        __u32 mode, struct lustre_handle *lockh,
+int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
+                        struct lov_stripe_md *lsm, __u32 mode,
+                        struct lustre_handle *lockh,
                         struct lov_request_set **reqset)
 {
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_request_set *set;
         int i, rc = 0;
-        struct lov_oinfo *loi;
         ENTRY;
 
         OBD_ALLOC(set, sizeof(*set));
@@ -478,7 +490,8 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
+        set->set_oi = oinfo;
+        set->set_oi->oi_md = lsm;
         set->set_lockh = lov_handle2llh(lockh);
         if (set->set_lockh == NULL) {
                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
@@ -486,48 +499,37 @@ int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
         }
         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt;
+        for (i = 0; i < lsm->lsm_stripe_count; i++){
                 struct lov_request *req;
                 struct lustre_handle *lov_lockhp;
+                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
 
                 lov_lockhp = set->set_lockh->llh_handles + i;
-                if (lov_lockhp->cookie == 0) {
-                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
-                               loi->loi_ost_idx, loi->loi_id);
-                        continue;
-                }
-
-                tgt = lov->tgts + loi->loi_ost_idx;
-                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
-                        CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
+                if (!lustre_handle_is_used(lov_lockhp)) {
+                        CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
                                loi->loi_ost_idx, loi->loi_id);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
 
-                req->rq_buflen = sizeof(*req->rq_md);
-                OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL) {
+                req->rq_buflen = sizeof(*req->rq_oi.oi_md);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
+                if (req->rq_oi.oi_md == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
 
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING: submd should be from the subobj */
-                req->rq_md->lsm_object_id = loi->loi_id;
-               req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
-                req->rq_md->lsm_stripe_count = 0;
+                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
+                req->rq_oi.oi_md->lsm_stripe_count = 0;
+
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -539,38 +541,104 @@ out_set:
         RETURN(rc);
 }
 
+static int lov_update_create_set(struct lov_request_set *set,
+                                 struct lov_request *req, int rc)
+{
+        struct obd_trans_info *oti = set->set_oti;
+        struct lov_stripe_md *lsm = set->set_oi->oi_md;
+        struct lov_oinfo *loi;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+        /* Record the result rc of the sub-object create req in set. */
+        if (rc && lov->lov_tgts[req->rq_idx] &&
+            lov->lov_tgts[req->rq_idx]->ltd_active) {
+                CERROR("error creating fid "LPX64" sub-object"
+                       " on OST idx %d/%d: rc = %d\n",
+                       set->set_oi->oi_oa->o_id, req->rq_idx,
+                       lsm->lsm_stripe_count, rc);
+                if (rc > 0) {   /* not a valid errno; -EIO is used */
+                        CERROR("obd_create returned invalid err %d\n", rc);
+                        rc = -EIO;
+                }
+        }
+        /* The stripe slot for req is the number of successes so far. */
+        cfs_spin_lock(&set->set_lock);
+        req->rq_stripe = cfs_atomic_read(&set->set_success);
+        loi = lsm->lsm_oinfo[req->rq_stripe];
+
+        /* Failure: only rc is recorded; loi is not written. */
+        if (rc) {
+                lov_update_set(set, req, rc);
+                cfs_spin_unlock(&set->set_lock);
+                RETURN(rc);
+        }
+        /* Success: store the new object's id, seq and OST index in loi. */
+        loi->loi_id = req->rq_oi.oi_oa->o_id;
+        loi->loi_seq = req->rq_oi.oi_oa->o_seq;
+        loi->loi_ost_idx = req->rq_idx;
+        loi_init(loi);
+        /* Step oti_logcookies; count obdos that carry OBD_MD_FLCOOKIE. */
+        if (oti && set->set_cookies)
+                ++oti->oti_logcookies;
+        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookie_sent++;
+
+        lov_update_set(set, req, rc);
+        cfs_spin_unlock(&set->set_lock);
+        /* loi_id is printed twice, once per format macro (LPX64, LPU64). */
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        RETURN(rc);   /* rc is 0 on this path */
+}
+
 static int create_done(struct obd_export *exp, struct lov_request_set *set,
-                       struct lov_stripe_md **ea)
+                       struct lov_stripe_md **lsmp)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct obd_trans_info *oti = set->set_oti;
-        struct obdo *src_oa = set->set_oa;
-        struct list_head *pos;
+        struct obdo *src_oa = set->set_oi->oi_oa;
         struct lov_request *req;
         struct obdo *ret_oa = NULL;
-        int attrset = 0, rc = 0;
+        int success, attrset = 0, rc = 0;
         ENTRY;
 
-        LASSERT(set->set_completes);
+        LASSERT(cfs_atomic_read(&set->set_completes));
 
-        if (!set->set_success)
-                GOTO(cleanup, rc = -EIO);
+        /* try alloc objects on other osts if osc_create fails for
+         * exceptions: RPC failure, ENOSPC, etc */
+        if (set->set_count != cfs_atomic_read(&set->set_success)) {
+                cfs_list_for_each_entry (req, &set->set_list, rq_link) {
+                        if (req->rq_rc == 0)
+                                continue;
 
-        if (*ea == NULL && set->set_count != set->set_success) {
-                set->set_count = set->set_success;
+                        cfs_atomic_dec(&set->set_completes);
+                        req->rq_complete = 0;
+
+                        rc = qos_remedy_create(set, req);
+                        lov_update_create_set(set, req, rc);
+                }
+        }
+
+        success = cfs_atomic_read(&set->set_success);
+        /* no successful creates */
+        if (success == 0)
+                GOTO(cleanup, rc);
+
+        if (set->set_count != success) {
+                set->set_count = success;
                 qos_shrink_lsm(set);
         }
 
-        ret_oa = obdo_alloc();
+        OBDO_ALLOC(ret_oa);
         if (ret_oa == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 if (!req->rq_complete || req->rq_rc)
                         continue;
-                lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
-                                set->set_md, req->rq_stripe, &attrset);
+                lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
+                                req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
+                                req->rq_stripe, &attrset);
         }
         if (src_oa->o_valid & OBD_MD_FLSIZE &&
             ret_oa->o_size != src_oa->o_size) {
@@ -579,43 +647,33 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
                 LBUG();
         }
         ret_oa->o_id = src_oa->o_id;
-        ret_oa->o_gr = src_oa->o_gr;
-        ret_oa->o_valid |= OBD_MD_FLGROUP;
+        ret_oa->o_seq = src_oa->o_seq;
+        ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
         memcpy(src_oa, ret_oa, sizeof(*src_oa));
-        obdo_free(ret_oa);
+        OBDO_FREE(ret_oa);
 
-        *ea = set->set_md;
+        *lsmp = set->set_oi->oi_md;
         GOTO(done, rc = 0);
 
-        EXIT;
 cleanup:
-        list_for_each (pos, &set->set_list) {
-                struct lov_tgt_desc *tgt;
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
+                struct obd_export *sub_exp;
                 int err = 0;
-                req = list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
 
-                tgt = lov->tgts + req->rq_idx;
-                if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
-                        CERROR("Failed to uncreate objid "LPX64" subobj "
-                               LPX64" on OST idx %d: osc inactive.\n",
-                               set->set_oa->o_id, req->rq_oa->o_id,
-                               req->rq_idx);
-                        continue;
-                }
-
-                err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
-                lov_tgt_decref(lov, tgt);
+                sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
+                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+                                  NULL);
                 if (err)
                         CERROR("Failed to uncreate objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
-                               set->set_oa->o_id, req->rq_oa->o_id,
+                               src_oa->o_id, req->rq_oi.oi_oa->o_id,
                                req->rq_idx, rc);
         }
-        if (*ea == NULL)
-                obd_free_memmd(exp, &set->set_md);
+        if (*lsmp == NULL)
+                obd_free_memmd(exp, &set->set_oi->oi_md);
 done:
         if (oti && set->set_cookies) {
                 oti->oti_logcookies = set->set_cookies;
@@ -626,85 +684,48 @@ done:
                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
                 }
         }
-        return rc;
+        RETURN(rc);
 }
 
-int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
+int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
 {
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes) {
-                rc = create_done(set->set_exp, set, ea);
-                /* FIXME update qos data here */
-        }
-
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes))
+                rc = create_done(set->set_exp, set, lsmp);
 
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
-int lov_update_create_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc)
+int cb_create_update(void *cookie, int rc)
 {
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        struct obd_trans_info *oti = set->set_oti;
-        struct lov_stripe_md *lsm = set->set_md;
-        struct lov_oinfo *loi;
-        struct lov_tgt_desc *tgt;
-        ENTRY;
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
 
-        req->rq_stripe = set->set_success;
-        loi = &lsm->lsm_oinfo[req->rq_stripe];
-        tgt = lov->tgts + req->rq_idx;
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
 
-        if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
-                lov_tgt_decref(lov, tgt);
-                CERROR("error creating objid "LPX64" sub-object"
-                       " on OST idx %d/%d: rc = %d\n",
-                       set->set_oa->o_id, req->rq_idx,
-                       lsm->lsm_stripe_count, rc);
-                if (rc > 0) {
-                        CERROR("obd_create returned invalid err %d\n", rc);
-                        rc = -EIO;
-                }
-        }
-        lov_update_set(set, req, rc);
-        if (rc)
-                RETURN(rc);
-
-        if (oti && oti->oti_objid)
-                oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
+        if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
+                if (lovreq->rq_idx == cfs_fail_val)
+                        rc = -ENOTCONN;
 
-        loi->loi_id = req->rq_oa->o_id;
-        loi->loi_gr = req->rq_oa->o_gr;
-        loi->loi_ost_idx = req->rq_idx;
-        loi->loi_ost_gen = req->rq_gen;
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
-               "idx %d gen %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id,
-               req->rq_idx, req->rq_gen);
-        loi_init(loi);
-
-        if (set->set_cookies)
-                ++oti->oti_logcookies;
-        if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
-                set->set_cookie_sent++;
-
-        RETURN(0);
+        rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+        if (lov_finished_set(lovreq->rq_rqset))
+                lov_put_reqset(lovreq->rq_rqset);
+        return rc;
 }
 
-int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
-                        struct obdo *src_oa, struct obd_trans_info *oti,
+int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
+                        struct lov_stripe_md **lsmp, struct obdo *src_oa,
+                        struct obd_trans_info *oti,
                         struct lov_request_set **reqset)
 {
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_request_set *set;
-        int rc = 0, newea = 0;
+        int rc = 0;
         ENTRY;
 
         OBD_ALLOC(set, sizeof(*set));
@@ -713,118 +734,96 @@ int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = *ea;
-        set->set_oa = src_oa;
+        set->set_oi = oinfo;
+        set->set_oi->oi_md = *lsmp;
+        set->set_oi->oi_oa = src_oa;
         set->set_oti = oti;
+        lov_get_reqset(set);
 
-        if (set->set_md == NULL) {
-                int stripes, stripe_cnt;
-                stripe_cnt = lov_get_stripecnt(lov, 0);
-
-                /* If the MDS file was truncated up to some size, stripe over
-                 * enough OSTs to allow the file to be created at that size. */
-                if (src_oa->o_valid & OBD_MD_FLSIZE) {
-                        stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
-                        do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
-
-                        if (stripes > lov->desc.ld_active_tgt_count)
-                                GOTO(out_set, rc = -EFBIG);
-                        if (stripes < stripe_cnt)
-                                stripes = stripe_cnt;
-                } else {
-                        stripes = stripe_cnt;
-                }
-
-                rc = lov_alloc_memmd(&set->set_md, stripes,
-                                     lov->desc.ld_pattern ?
-                                     lov->desc.ld_pattern : LOV_PATTERN_RAID0);
-                if (rc < 0)
-                        goto out_set;
-                newea = 1;
-        }
-
-        rc = qos_prep_create(lov, set, newea);
-        if (rc)
-                goto out_lsm;
-
-        if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
-                oti_alloc_cookies(oti, set->set_count);
-                if (!oti->oti_logcookies)
-                        goto out_lsm;
-                set->set_cookies = oti->oti_logcookies;
+        rc = qos_prep_create(exp, set);
+        /* qos_shrink_lsm() may have allocated a new lsm */
+        *lsmp = oinfo->oi_md;
+        if (rc) {
+                lov_fini_create_set(set, lsmp);
+                lov_put_reqset(set);
+        } else {
+                *reqset = set;
         }
-        *reqset = set;
-        RETURN(rc);
-
-out_lsm:
-        if (*ea == NULL)
-                obd_free_memmd(exp, &set->set_md);
-out_set:
-        lov_fini_create_set(set, ea);
         RETURN(rc);
 }
 
 static int common_attr_done(struct lov_request_set *set)
 {
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         struct obdo *tmp_oa;
         int rc = 0, attrset = 0;
         ENTRY;
 
-        if (set->set_oa == NULL)
+        LASSERT(set->set_oi != NULL);
+
+        if (set->set_oi->oi_oa == NULL)
                 RETURN(0);
 
-        if (!set->set_success)
+        if (!cfs_atomic_read(&set->set_success))
                 RETURN(-EIO);
 
-        tmp_oa = obdo_alloc();
+        OBDO_ALLOC(tmp_oa);
         if (tmp_oa == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
-                if (req->rq_oa->o_valid == 0)   /* inactive stripe */
+                if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
                         continue;
-                lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
-                                set->set_md, req->rq_stripe, &attrset);
+                lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
+                                req->rq_oi.oi_oa->o_valid,
+                                set->set_oi->oi_md, req->rq_stripe, &attrset);
         }
         if (!attrset) {
                 CERROR("No stripes had valid attrs\n");
                 rc = -EIO;
         }
-        tmp_oa->o_id = set->set_oa->o_id;
-        memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
+        if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
+            (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
+                /* When we take attributes of some epoch, we require all the
+                 * ost to be active. */
+                CERROR("Not all the stripes had valid attrs\n");
+                GOTO(out, rc = -EIO);
+        }
+
+        tmp_oa->o_id = set->set_oi->oi_oa->o_id;
+        memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
 out:
         if (tmp_oa)
-                obdo_free(tmp_oa);
+                OBDO_FREE(tmp_oa);
         RETURN(rc);
 
 }
 
 static int brw_done(struct lov_request_set *set)
 {
-        struct lov_stripe_md *lsm = set->set_md;
+        struct lov_stripe_md *lsm = set->set_oi->oi_md;
         struct lov_oinfo     *loi = NULL;
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         ENTRY;
-                                                                                                                             
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
-                                                                                                                             
+
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
+
                 if (!req->rq_complete || req->rq_rc)
                         continue;
-                                                                                                                             
-                loi = &lsm->lsm_oinfo[req->rq_stripe];
-                                                                                                                             
-                if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
-                        loi->loi_blocks = req->rq_oa->o_blocks;
+
+                loi = lsm->lsm_oinfo[req->rq_stripe];
+
+                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
+                        loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
         }
-                                                                                                                             
+
         RETURN(0);
 }
 
@@ -833,22 +832,21 @@ int lov_fini_brw_set(struct lov_request_set *set)
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes) {
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
-int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
-                     struct lov_stripe_md *lsm, obd_count oa_bufs,
-                     struct brw_page *pga, struct obd_trans_info *oti,
+int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
+                     obd_count oa_bufs, struct brw_page *pga,
+                     struct obd_trans_info *oti,
                      struct lov_request_set **reqset)
 {
         struct {
@@ -857,7 +855,6 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
                 obd_count       off;
         } *info = NULL;
         struct lov_request_set *set;
-        struct lov_oinfo *loi = NULL;
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         int rc = 0, i, shift;
         ENTRY;
@@ -868,78 +865,80 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_oa = src_oa;
         set->set_oti = oti;
+        set->set_oi = oinfo;
         set->set_oabufs = oa_bufs;
-        OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
+        OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
         if (!set->set_pga)
                 GOTO(out, rc = -ENOMEM);
 
-        OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
+        OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
         if (!info)
                 GOTO(out, rc = -ENOMEM);
 
         /* calculate the page count for each stripe */
         for (i = 0; i < oa_bufs; i++) {
-                int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
+                int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
                 info[stripe].count++;
         }
 
         /* alloc and initialize lov request */
-        loi = lsm->lsm_oinfo;
         shift = 0;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
+                struct lov_oinfo *loi = NULL;
                 struct lov_request *req;
-                struct lov_tgt_desc *tgt;
 
                 if (info[i].count == 0)
                         continue;
 
-                tgt = lov->tgts + loi->loi_ost_idx;
-                if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
+                loi = oinfo->oi_md->lsm_oinfo[i];
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out, rc = -ENOMEM);
-                }
 
-                req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL) {
+                OBDO_ALLOC(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_oa == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
                 }
 
-                if (src_oa)
-                        memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
-                req->rq_oa->o_id = loi->loi_id;
-                req->rq_buflen = sizeof(*req->rq_md);
-                OBD_ALLOC(req->rq_md, req->rq_buflen);
-                if (req->rq_md == NULL) {
-                        obdo_free(req->rq_oa);
+                if (oinfo->oi_oa) {
+                        memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
+                               sizeof(*req->rq_oi.oi_oa));
+                }
+                req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
+                req->rq_oi.oi_oa->o_stripe_idx = i;
+
+                req->rq_buflen = sizeof(*req->rq_oi.oi_md);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
+                if (req->rq_oi.oi_md == NULL) {
+                        OBDO_FREE(req->rq_oi.oi_oa);
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out, rc = -ENOMEM);
                 }
 
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
                 req->rq_stripe = i;
 
                 /* XXX LOV STACKING */
-                req->rq_md->lsm_object_id = loi->loi_id;
-                req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oabufs = info[i].count;
                 req->rq_pgaidx = shift;
                 shift += req->rq_oabufs;
 
                 /* remember the index for sort brw_page array */
                 info[i].index = req->rq_pgaidx;
+
+                req->rq_oi.oi_capa = oinfo->oi_capa;
+
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -947,18 +946,19 @@ int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
 
         /* rotate & sort the brw_page array */
         for (i = 0; i < oa_bufs; i++) {
-                int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
+                int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
 
                 shift = info[stripe].index + info[stripe].off;
                 LASSERT(shift < oa_bufs);
                 set->set_pga[shift] = pga[i];
-                lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
-                                  &set->set_pga[shift].disk_offset);
+                lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
+                                  &set->set_pga[shift].off);
                 info[stripe].off++;
         }
 out:
         if (info)
-                OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
+                OBD_FREE_LARGE(info,
+                               sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
 
         if (rc == 0)
                 *reqset = set;
@@ -968,34 +968,36 @@ out:
         RETURN(rc);
 }
 
-static int getattr_done(struct lov_request_set *set)
-{
-        return common_attr_done(set);
-}
-
 int lov_fini_getattr_set(struct lov_request_set *set)
 {
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes)
-                rc = getattr_done(set);
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes))
+                rc = common_attr_done(set);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
-int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
-                         struct lov_stripe_md *lsm,
+/* Completion callback for osc_getattr_async: maps the obd_info cookie back
+ * to its enclosing lov_request and hands rc to lov_update_common_set(). */
+static int cb_getattr_update(void *cookie, int rc)
+{
+        struct obd_info *sub_oi = cookie;
+        struct lov_request *sub_req = container_of(sub_oi, struct lov_request,
+                                                   rq_oi);
+        return lov_update_common_set(sub_req->rq_rqset, sub_req, rc);
+}
+
+int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                          struct lov_request_set **reqset)
 {
         struct lov_request_set *set;
-        struct lov_oinfo *loi = NULL;
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         int rc = 0, i;
         ENTRY;
@@ -1006,37 +1008,41 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_oa = src_oa;
+        set->set_oi = oinfo;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi;
                 struct lov_request *req;
 
-                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                loi = oinfo->oi_md->lsm_oinfo[i];
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
+                                /* SOM requires all the OSTs to be active. */
+                                GOTO(out_set, rc = -EIO);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
 
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
 
-                req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL) {
+                OBDO_ALLOC(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_oa == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
-                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
-                req->rq_oa->o_id = loi->loi_id;
+                memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
+                       sizeof(*req->rq_oi.oi_oa));
+                req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
+                req->rq_oi.oi_cb_up = cb_getattr_update;
+                req->rq_oi.oi_capa = oinfo->oi_capa;
+
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -1052,28 +1058,26 @@ int lov_fini_destroy_set(struct lov_request_set *set)
 {
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes) {
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes)) {
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(0);
 }
 
-int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
-                         struct lov_stripe_md *lsm,
+int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
+                         struct obdo *src_oa, struct lov_stripe_md *lsm,
                          struct obd_trans_info *oti,
                          struct lov_request_set **reqset)
 {
         struct lov_request_set *set;
-        struct lov_oinfo *loi = NULL;
         struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int rc = 0, cookie_set = 0, i;
+        int rc = 0, i;
         ENTRY;
 
         OBD_ALLOC(set, sizeof(*set));
@@ -1082,47 +1086,39 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_oa = src_oa;
+        set->set_oi = oinfo;
+        set->set_oi->oi_md = lsm;
+        set->set_oi->oi_oa = src_oa;
         set->set_oti = oti;
         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
                 set->set_cookies = oti->oti_logcookies;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi;
                 struct lov_request *req;
 
-                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                loi = lsm->lsm_oinfo[i];
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
 
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
 
-                req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL) {
+                OBDO_ALLOC(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_oa == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
-
-                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
-                req->rq_oa->o_id = loi->loi_id;
-
-                /* Setup the first request's cookie position */
-                if (!cookie_set && set->set_cookies) {
-                        oti->oti_logcookies = set->set_cookies + i;
-                        cookie_set = 1;
-                }
+                memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
+                req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -1134,35 +1130,67 @@ out_set:
         RETURN(rc);
 }
 
-static int setattr_done(struct lov_request_set *set)
-{
-        return common_attr_done(set);
-}
-
 int lov_fini_setattr_set(struct lov_request_set *set)
 {
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes) {
-                rc = setattr_done(set);
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes)) {
+                rc = common_attr_done(set);
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
-int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
-                         struct lov_stripe_md *lsm, struct obd_trans_info *oti,
+int lov_update_setattr_set(struct lov_request_set *set,
+                           struct lov_request *req, int rc)
+{
+        struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
+        struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
+        ENTRY;
+
+        lov_update_set(set, req, rc);
+
+        /* ignore errors from an inactive OST */
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
+                    lov->lov_tgts[req->rq_idx]->ltd_active))
+                rc = 0;
+
+        if (rc == 0) {
+                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
+                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
+                                req->rq_oi.oi_oa->o_ctime;
+                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
+                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
+                                req->rq_oi.oi_oa->o_mtime;
+                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
+                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
+                                req->rq_oi.oi_oa->o_atime;
+        }
+
+        RETURN(rc);
+}
+
+/* The callback for osc_setattr_async that finalizes a request info when a
+ * response is received. */
+static int cb_setattr_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
+                         struct obd_trans_info *oti,
                          struct lov_request_set **reqset)
 {
         struct lov_request_set *set;
-        struct lov_oinfo *loi = NULL;
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         int rc = 0, i;
         ENTRY;
@@ -1173,47 +1201,51 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_oa = src_oa;
+        set->set_oti = oti;
+        set->set_oi = oinfo;
+        if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookies = oti->oti_logcookies;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
                 struct lov_request *req;
 
-                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
-
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
 
-                req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL) {
+                OBDO_ALLOC(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_oa == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
+                memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
+                       sizeof(*req->rq_oi.oi_oa));
+                req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq= loi->loi_seq;
+                req->rq_oi.oi_oa->o_stripe_idx = i;
+                req->rq_oi.oi_cb_up = cb_setattr_update;
+                req->rq_oi.oi_capa = oinfo->oi_capa;
+
+                if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
+                        int off = lov_stripe_offset(oinfo->oi_md,
+                                                    oinfo->oi_oa->o_size, i,
+                                                    &req->rq_oi.oi_oa->o_size);
+
+                        if (off < 0 && req->rq_oi.oi_oa->o_size)
+                                req->rq_oi.oi_oa->o_size--;
 
-                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
-                req->rq_oa->o_id = loi->loi_id;
-                LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
-
-                if (src_oa->o_valid & OBD_MD_FLSIZE) {
-                        if (lov_stripe_offset(lsm, src_oa->o_size, i,
-                                              &req->rq_oa->o_size) < 0 &&
-                            req->rq_oa->o_size)
-                                req->rq_oa->o_size--;
                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
-                               i, req->rq_oa->o_size, src_oa->o_size);
+                               i, req->rq_oi.oi_oa->o_size,
+                               oinfo->oi_oa->o_size);
                 }
                 lov_set_add_req(req, set);
         }
@@ -1226,53 +1258,69 @@ out_set:
         RETURN(rc);
 }
 
-int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
-                         int rc)
+int lov_fini_punch_set(struct lov_request_set *set)
 {
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        int rc = 0;
         ENTRY;
 
-        lov_update_set(set, req, rc);
-        if (rc) {
-                struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
-
-                if (lov_tgt_active(lov, tgt, req->rq_gen))
-                        lov_tgt_decref(lov, tgt);
-                else
-                        rc = 0;
+        if (set == NULL)
+                RETURN(0);
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes)) {
+                rc = -EIO;
+                /* FIXME update qos data here */
+                if (cfs_atomic_read(&set->set_success))
+                        rc = common_attr_done(set);
         }
 
-        /* FIXME in raid1 regime, should return 0 */
+        lov_put_reqset(set);
+
         RETURN(rc);
 }
 
-int lov_fini_punch_set(struct lov_request_set *set)
+int lov_update_punch_set(struct lov_request_set *set,
+                         struct lov_request *req, int rc)
 {
-        int rc = 0;
+        struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
+        struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
         ENTRY;
 
-        LASSERT(set->set_exp);
-        if (set == NULL)
-                RETURN(0);
-        if (set->set_completes) {
-                if (!set->set_success)
-                        rc = -EIO;
-                /* FIXME update qos data here */
-        }
+        lov_update_set(set, req, rc);
+
+        /* ignore errors from an inactive OST */
+        if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
+                rc = 0;
+
+        if (rc == 0) {
+                lov_stripe_lock(lsm);
+                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
+                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
+                                req->rq_oi.oi_oa->o_blocks;
+                }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+                /* Do we need to update lvb_size here? No, because it has
+                 * already been done in ll_truncate(). -jay */
+                lov_stripe_unlock(lsm);
+        }
 
         RETURN(rc);
 }
 
-int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
-                       struct lov_stripe_md *lsm, obd_off start,
-                       obd_off end, struct obd_trans_info *oti,
+/* The callback for osc_punch that finalizes a request info when a response
+ * is received. */
+static int cb_update_punch(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
+                       struct obd_trans_info *oti,
                        struct lov_request_set **reqset)
 {
         struct lov_request_set *set;
-        struct lov_oinfo *loi = NULL;
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         int rc = 0, i;
         ENTRY;
@@ -1282,48 +1330,51 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
                 RETURN(-ENOMEM);
         lov_init_set(set);
 
+        set->set_oi = oinfo;
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_oa = src_oa;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_stripe_intersects(oinfo->oi_md, i,
+                                           oinfo->oi_policy.l_extent.start,
+                                           oinfo->oi_policy.l_extent.end,
+                                           &rs, &re))
                         continue;
 
-                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-                        continue;
+                        GOTO(out_set, rc = -EIO);
                 }
 
                 OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
+                if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
-                }
-
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
 
-                req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL) {
+                OBDO_ALLOC(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_oa == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
+                memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
+                       sizeof(*req->rq_oi.oi_oa));
+                req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
+                req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
 
-                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
-                req->rq_oa->o_id = loi->loi_id;
-                req->rq_oa->o_gr = loi->loi_gr;
-               req->rq_oa->o_valid |= OBD_MD_FLGROUP;
+                req->rq_oi.oi_oa->o_stripe_idx = i;
+                req->rq_oi.oi_cb_up = cb_update_punch;
 
-                req->rq_extent.start = rs;
-                req->rq_extent.end = re;
+                req->rq_oi.oi_policy.l_extent.start = rs;
+                req->rq_oi.oi_policy.l_extent.end = re;
+                req->rq_oi.oi_policy.l_extent.gid = -1;
+
+                req->rq_oi.oi_capa = oinfo->oi_capa;
 
                 lov_set_add_req(req, set);
         }
@@ -1341,74 +1392,84 @@ int lov_fini_sync_set(struct lov_request_set *set)
         int rc = 0;
         ENTRY;
 
-        LASSERT(set->set_exp);
         if (set == NULL)
                 RETURN(0);
-        if (set->set_completes) {
-                if (!set->set_success)
+        LASSERT(set->set_exp);
+        if (cfs_atomic_read(&set->set_completes)) {
+                if (!cfs_atomic_read(&set->set_success))
                         rc = -EIO;
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
-int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
-                      struct lov_stripe_md *lsm, obd_off start,
-                      obd_off end, struct lov_request_set **reqset)
+/* The callback for osc_sync that finalizes a request info when a
+ * response is received. */
+static int cb_sync_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
+                      obd_off start, obd_off end,
+                      struct lov_request_set **reqset)
 {
         struct lov_request_set *set;
-        struct lov_oinfo *loi = NULL;
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         int rc = 0, i;
         ENTRY;
 
-        OBD_ALLOC(set, sizeof(*set));
+        OBD_ALLOC_PTR(set);
         if (set == NULL)
                 RETURN(-ENOMEM);
         lov_init_set(set);
 
         set->set_exp = exp;
-        set->set_md = lsm;
-        set->set_oa = src_oa;
+        set->set_oi = oinfo;
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
-                        continue;
-
-                if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
-                OBD_ALLOC(req, sizeof(*req));
-                if (req == NULL) {
-                        lov_tgt_decref(lov, tgt);
-                        GOTO(out_set, rc = -ENOMEM);
-                }
+                if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
+                                           &re))
+                        continue;
 
+                OBD_ALLOC_PTR(req);
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
                 req->rq_stripe = i;
                 req->rq_idx = loi->loi_ost_idx;
-                req->rq_gen = loi->loi_ost_gen;
 
-                req->rq_oa = obdo_alloc();
-                if (req->rq_oa == NULL) {
+                OBDO_ALLOC(req->rq_oi.oi_oa);
+                if (req->rq_oi.oi_oa == NULL) {
                         OBD_FREE(req, sizeof(*req));
-                        lov_tgt_decref(lov, tgt);
                         GOTO(out_set, rc = -ENOMEM);
                 }
-                memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
-                req->rq_oa->o_id = loi->loi_id;
-                req->rq_extent.start = rs;
-                req->rq_extent.end = re;
+                *req->rq_oi.oi_oa = *oinfo->oi_oa;
+                req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
+                req->rq_oi.oi_oa->o_stripe_idx = i;
+
+                req->rq_oi.oi_policy.l_extent.start = rs;
+                req->rq_oi.oi_policy.l_extent.end = re;
+                req->rq_oi.oi_policy.l_extent.gid = -1;
+                req->rq_oi.oi_cb_up = cb_sync_update;
+
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -1419,3 +1480,232 @@ out_set:
         lov_fini_sync_set(set);
         RETURN(rc);
 }
+
+#define LOV_U64_MAX ((__u64)~0ULL)
+#define LOV_SUM_MAX(tot, add)                                           \
+        do {                                                            \
+                if ((tot) + (add) < (tot))                              \
+                        (tot) = LOV_U64_MAX;                            \
+                else                                                    \
+                        (tot) += (add);                                 \
+        } while(0)
+
+int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
+{
+        ENTRY;
+
+        if (success) {
+                __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
+                                                           LOV_MAGIC, 0);
+                if (osfs->os_files != LOV_U64_MAX)
+                        do_div(osfs->os_files, expected_stripes);
+                if (osfs->os_ffree != LOV_U64_MAX)
+                        do_div(osfs->os_ffree, expected_stripes);
+
+                cfs_spin_lock(&obd->obd_osfs_lock);
+                memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
+                obd->obd_osfs_age = cfs_time_current_64();
+                cfs_spin_unlock(&obd->obd_osfs_lock);
+                RETURN(0);
+        }
+
+        RETURN(-EIO);
+}
+
+int lov_fini_statfs_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
+
+        if (set == NULL)
+                RETURN(0);
+
+        if (cfs_atomic_read(&set->set_completes)) {
+                rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
+                                     cfs_atomic_read(&set->set_success));
+        }
+        lov_put_reqset(set);
+        RETURN(rc);
+}
+
+void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
+                       int success)
+{
+        int shift = 0, quit = 0;
+        __u64 tmp;
+
+        if (success == 0) {
+                memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
+        } else {
+                if (osfs->os_bsize != lov_sfs->os_bsize) {
+                        /* assume all block sizes are always powers of 2 */
+                        /* get the bits difference */
+                        tmp = osfs->os_bsize | lov_sfs->os_bsize;
+                        for (shift = 0; shift <= 64; ++shift) {
+                                if (tmp & 1) {
+                                        if (quit)
+                                                break;
+                                        else
+                                                quit = 1;
+                                        shift = 0;
+                                }
+                                tmp >>= 1;
+                        }
+                }
+
+                if (osfs->os_bsize < lov_sfs->os_bsize) {
+                        osfs->os_bsize = lov_sfs->os_bsize;
+
+                        osfs->os_bfree  >>= shift;
+                        osfs->os_bavail >>= shift;
+                        osfs->os_blocks >>= shift;
+                } else if (shift != 0) {
+                        lov_sfs->os_bfree  >>= shift;
+                        lov_sfs->os_bavail >>= shift;
+                        lov_sfs->os_blocks >>= shift;
+                }
+#ifdef MIN_DF
+                /* Sandia requested that df (and so, statfs) only
+                   return the minimal available space on
+                   a single OST, so people are guaranteed to be
+                   able to write this much data. */
+                if (osfs->os_bavail > lov_sfs->os_bavail) {
+                        /* Presumably if new bavail is smaller,
+                           new bfree is bigger as well */
+                        osfs->os_bfree = lov_sfs->os_bfree;
+                        osfs->os_bavail = lov_sfs->os_bavail;
+                }
+#else
+                osfs->os_bfree += lov_sfs->os_bfree;
+                osfs->os_bavail += lov_sfs->os_bavail;
+#endif
+                osfs->os_blocks += lov_sfs->os_blocks;
+                /* XXX not sure about this one - depends on policy.
+                 *   - could be minimum if we always stripe on all OBDs
+                 *     (but that would be wrong for any other policy,
+                 *     if one of the OBDs has no more objects left)
+                 *   - could be sum if we stripe whole objects
+                 *   - could be average, just to give a nice number
+                 *
+                 * To give a "reasonable" (if not wholly accurate)
+                 * number, we divide the total number of free objects
+                 * by expected stripe count (watch out for overflow).
+                 */
+                LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
+                LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
+        }
+}
+
+/* The callback for osc_statfs_async that finalizes a request info when a
+ * response is received. */
+static int cb_statfs_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+        struct lov_request_set *set;
+        struct obd_statfs *osfs, *lov_sfs;
+        struct lov_obd *lov;
+        struct lov_tgt_desc *tgt;
+        struct obd_device *lovobd, *tgtobd;
+        int success;
+        ENTRY;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        set = lovreq->rq_rqset;
+        lovobd = set->set_obd;
+        lov = &lovobd->u.lov;
+        osfs = set->set_oi->oi_osfs;
+        lov_sfs = oinfo->oi_osfs;
+        success = cfs_atomic_read(&set->set_success);
+        /* XXX: the same is done in lov_update_common_set, however
+           lovset->set_exp is not initialized. */
+        lov_update_set(set, lovreq, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        obd_getref(lovobd);
+        tgt = lov->lov_tgts[lovreq->rq_idx];
+        if (!tgt || !tgt->ltd_active)
+                GOTO(out_update, rc);
+
+        tgtobd = class_exp2obd(tgt->ltd_exp);
+        cfs_spin_lock(&tgtobd->obd_osfs_lock);
+        memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+        if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
+                tgtobd->obd_osfs_age = cfs_time_current_64();
+        cfs_spin_unlock(&tgtobd->obd_osfs_lock);
+
+out_update:
+        lov_update_statfs(osfs, lov_sfs, success);
+        qos_update(lov);
+        obd_putref(lovobd);
+
+out:
+        if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lov_finished_set(set)) {
+                lov_statfs_interpret(NULL, set, set->set_count !=
+                                     cfs_atomic_read(&set->set_success));
+                if (lov->lov_qos.lq_statfs_in_progress)
+                        qos_statfs_done(lov);
+        }
+
+        RETURN(0);
+}
+
+int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
+                        struct lov_request_set **reqset)
+{
+        struct lov_request_set *set;
+        struct lov_obd *lov = &obd->u.lov;
+        int rc = 0, i;
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(-ENOMEM);
+        lov_init_set(set);
+
+        set->set_obd = obd;
+        set->set_oi = oinfo;
+
+        /* We only get block data from the OBD */
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                struct lov_request *req;
+
+                if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
+                                          && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", i);
+                        continue;
+                }
+
+                /* skip targets that have been explicitly disabled by the
+                 * administrator */
+                if (!lov->lov_tgts[i]->ltd_exp) {
+                        CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out_set, rc = -ENOMEM);
+
+                OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
+                if (req->rq_oi.oi_osfs == NULL) {
+                        OBD_FREE(req, sizeof(*req));
+                        GOTO(out_set, rc = -ENOMEM);
+                }
+
+                req->rq_idx = i;
+                req->rq_oi.oi_cb_up = cb_statfs_update;
+                req->rq_oi.oi_flags = oinfo->oi_flags;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out_set, rc = -EIO);
+        *reqset = set;
+        RETURN(rc);
+out_set:
+        lov_fini_statfs_set(set);
+        RETURN(rc);
+}