Whamcloud - gitweb
b=22176 Add .sync_fs super block handler
[fs/lustre-release.git] / lustre / lov / lov_request.c
index aeb17a0..3ad8f6f 100644 (file)
@@ -1,25 +1,37 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ * GPL HEADER START
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #ifndef EXPORT_SYMTAB
@@ -46,19 +58,22 @@ static void lov_init_set(struct lov_request_set *set)
         set->set_success = 0;
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
-        atomic_set(&set->set_refcount, 1);
+        cfs_atomic_set(&set->set_refcount, 1);
+        cfs_waitq_init(&set->set_waitq);
+        cfs_spin_lock_init(&set->set_lock);
 }
 
-static void lov_finish_set(struct lov_request_set *set)
+void lov_finish_set(struct lov_request_set *set)
 {
-        struct list_head *pos, *n;
+        cfs_list_t *pos, *n;
         ENTRY;
 
         LASSERT(set);
-        list_for_each_safe(pos, n, &set->set_list) {
-                struct lov_request *req = list_entry(pos, struct lov_request,
-                                                     rq_link);
-                list_del_init(&req->rq_link);
+        cfs_list_for_each_safe(pos, n, &set->set_list) {
+                struct lov_request *req = cfs_list_entry(pos,
+                                                         struct lov_request,
+                                                         rq_link);
+                cfs_list_del_init(&req->rq_link);
 
                 if (req->rq_oi.oi_oa)
                         OBDO_FREE(req->rq_oi.oi_oa);
@@ -81,6 +96,13 @@ static void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
+int lov_finished_set(struct lov_request_set *set)
+{
+        CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+               set->set_count);
+        return set->set_completes == set->set_count;
+}
+
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
@@ -90,6 +112,8 @@ void lov_update_set(struct lov_request_set *set,
         set->set_completes++;
         if (rc == 0)
                 set->set_success++;
+
+        cfs_waitq_signal(&set->set_waitq);
 }
 
 int lov_update_common_set(struct lov_request_set *set,
@@ -101,7 +125,7 @@ int lov_update_common_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] && 
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active))
                 rc = 0;
 
@@ -111,22 +135,49 @@ int lov_update_common_set(struct lov_request_set *set,
 
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
-        list_add_tail(&req->rq_link, &set->set_list);
+        cfs_list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
+        req->rq_rqset = set;
+}
+
+extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+                               struct lov_oinfo *loi, int flags,
+                               struct ost_lvb *lvb, __u32 mode, int rc);
+
+static int lov_update_enqueue_lov(struct obd_export *exp,
+                                  struct lustre_handle *lov_lockhp,
+                                  struct lov_oinfo *loi, int flags, int idx,
+                                  __u64 oid, int rc)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+        if (rc != ELDLM_OK &&
+            !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
+                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+                if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
+                        /* -EUSERS used by OST to report file contention */
+                        if (rc != -EINTR && rc != -EUSERS)
+                                CERROR("enqueue objid "LPX64" subobj "
+                                       LPX64" on OST idx %d: rc %d\n",
+                                       oid, loi->loi_id, loi->loi_ost_idx, rc);
+                } else
+                        rc = ELDLM_OK;
+        }
+        return rc;
 }
 
 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
 {
         struct lov_request_set *set = req->rq_rqset;
         struct lustre_handle *lov_lockhp;
+        struct obd_info *oi = set->set_oi;
         struct lov_oinfo *loi;
         ENTRY;
 
-        LASSERT(set != NULL);
-        LASSERT(set->set_oi != NULL);
+        LASSERT(oi != NULL);
 
         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
-        loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
+        loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
 
         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
          * and that copy can be arbitrarily out of date.
@@ -134,65 +185,22 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
          * The LOV API is due for a serious rewriting anyways, and this
          * can be addressed then. */
 
-        if (rc == ELDLM_OK) {
-                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
-                __u64 tmp;
-
-                LASSERT(lock != NULL);
-                lov_stripe_lock(set->set_oi->oi_md);
-                loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
-                tmp = loi->loi_lvb.lvb_size;
-                /* Extend KMS up to the end of this lock and no further
-                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
-                if (tmp > lock->l_policy_data.l_extent.end)
-                        tmp = lock->l_policy_data.l_extent.end + 1;
-                if (tmp >= loi->loi_kms) {
-                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
-                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
-                        loi->loi_kms = tmp;
-                        loi->loi_kms_valid = 1;
-                } else {
-                        LDLM_DEBUG(lock, "lock acquired, setting rss="
-                                   LPU64"; leaving kms="LPU64", end="LPU64,
-                                   loi->loi_lvb.lvb_size, loi->loi_kms,
-                                   lock->l_policy_data.l_extent.end);
-                }
-                lov_stripe_unlock(set->set_oi->oi_md);
-                ldlm_lock_allow_match(lock);
-                LDLM_LOCK_PUT(lock);
-        } else if ((rc == ELDLM_LOCK_ABORTED) &&
-                   (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
-                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                lov_stripe_lock(set->set_oi->oi_md);
-                loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
-                lov_stripe_unlock(set->set_oi->oi_md);
-                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
-                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
-                rc = ELDLM_OK;
-        } else {
-                struct obd_export *exp = set->set_exp;
-                struct lov_obd *lov = &exp->exp_obd->u.lov;
-
-                memset(lov_lockhp, 0, sizeof(*lov_lockhp));
-                if (lov->lov_tgts[req->rq_idx] && 
-                    lov->lov_tgts[req->rq_idx]->ltd_active) {
-                        /* -EUSERS used by OST to report file contention */
-                        if (rc != -EINTR && rc != -EUSERS)
-                                CERROR("enqueue objid "LPX64" subobj "
-                                       LPX64" on OST idx %d: rc %d\n",
-                                       set->set_oi->oi_md->lsm_object_id,
-                                       loi->loi_id, loi->loi_ost_idx, rc);
-                } else {
-                        rc = ELDLM_OK;
-                }
-        }
+        lov_stripe_lock(oi->oi_md);
+        osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
+                           &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
+        if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
+                memset(lov_lockhp, 0, sizeof *lov_lockhp);
+        rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
+                                    req->rq_idx, oi->oi_md->lsm_object_id, rc);
+        lov_stripe_unlock(oi->oi_md);
         lov_update_set(set, req, rc);
         RETURN(rc);
 }
 
 /* The callback for osc_enqueue that updates lov info for every OSC request. */
-static int cb_update_enqueue(struct obd_info *oinfo, int rc)
+static int cb_update_enqueue(void *cookie, int rc)
 {
+        struct obd_info *oinfo = cookie;
         struct ldlm_enqueue_info *einfo;
         struct lov_request *lovreq;
 
@@ -213,7 +221,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
                 RETURN(0);
 
         /* cancel enqueued/matched locks */
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 struct lustre_handle *lov_lockhp;
 
                 if (!req->rq_complete || req->rq_rc)
@@ -256,8 +264,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
         } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc ? rc : ret);
 }
@@ -318,8 +325,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
                         sizeof(struct lov_oinfo *);
 
-
-                req->rq_rqset = set;
                 /* Set lov request specific parameters. */
                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                 req->rq_oi.oi_cb_up = cb_update_enqueue;
@@ -337,7 +342,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
                         loi->loi_kms_valid;
@@ -382,8 +387,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
             (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -448,7 +452,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
 
                 lov_set_add_req(req, set);
@@ -474,8 +478,7 @@ int lov_fini_cancel_set(struct lov_request_set *set)
         if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -511,7 +514,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 lov_lockhp = set->set_lockh->llh_handles + i;
                 if (!lustre_handle_is_used(lov_lockhp)) {
-                        CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
+                        CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
                                loi->loi_ost_idx, loi->loi_id);
                         continue;
                 }
@@ -532,7 +535,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
 
                 lov_set_add_req(req, set);
@@ -546,6 +549,56 @@ out_set:
         RETURN(rc);
 }
 
+static int lov_update_create_set(struct lov_request_set *set,
+                                 struct lov_request *req, int rc)
+{
+        struct obd_trans_info *oti = set->set_oti;
+        struct lov_stripe_md *lsm = set->set_oi->oi_md;
+        struct lov_oinfo *loi;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+
+        if (rc && lov->lov_tgts[req->rq_idx] &&
+            lov->lov_tgts[req->rq_idx]->ltd_active) {
+                CERROR("error creating fid "LPX64" sub-object"
+                       " on OST idx %d/%d: rc = %d\n",
+                       set->set_oi->oi_oa->o_id, req->rq_idx,
+                       lsm->lsm_stripe_count, rc);
+                if (rc > 0) {
+                        CERROR("obd_create returned invalid err %d\n", rc);
+                        rc = -EIO;
+                }
+        }
+
+        cfs_spin_lock(&set->set_lock);
+        req->rq_stripe = set->set_success;
+        loi = lsm->lsm_oinfo[req->rq_stripe];
+
+
+        if (rc) {
+                lov_update_set(set, req, rc);
+                cfs_spin_unlock(&set->set_lock);
+                RETURN(rc);
+        }
+
+        loi->loi_id = req->rq_oi.oi_oa->o_id;
+        loi->loi_seq = req->rq_oi.oi_oa->o_seq;
+        loi->loi_ost_idx = req->rq_idx;
+        loi_init(loi);
+
+        if (oti && set->set_cookies)
+                ++oti->oti_logcookies;
+        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookie_sent++;
+
+        lov_update_set(set, req, rc);
+        cfs_spin_unlock(&set->set_lock);
+
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        RETURN(rc);
+}
+
 static int create_done(struct obd_export *exp, struct lov_request_set *set,
                        struct lov_stripe_md **lsmp)
 {
@@ -562,7 +615,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         /* try alloc objects on other osts if osc_create fails for
          * exceptions: RPC failure, ENOSPC, etc */
         if (set->set_count != set->set_success) {
-                list_for_each_entry (req, &set->set_list, rq_link) {
+                cfs_list_for_each_entry (req, &set->set_list, rq_link) {
                         if (req->rq_rc == 0)
                                 continue;
 
@@ -571,9 +624,6 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
 
                         rc = qos_remedy_create(set, req);
                         lov_update_create_set(set, req, rc);
-
-                        if (rc)
-                                break;
                 }
         }
 
@@ -581,11 +631,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         if (set->set_success == 0)
                 GOTO(cleanup, rc);
 
-        /* If there was an explicit stripe set, fail.  Otherwise, we
-         * got some objects and that's not bad. */
         if (set->set_count != set->set_success) {
-                if (*lsmp)
-                        GOTO(cleanup, rc);
                 set->set_count = set->set_success;
                 qos_shrink_lsm(set);
         }
@@ -594,7 +640,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         if (ret_oa == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 if (!req->rq_complete || req->rq_rc)
                         continue;
                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
@@ -608,7 +654,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
                 LBUG();
         }
         ret_oa->o_id = src_oa->o_id;
-        ret_oa->o_gr = src_oa->o_gr;
+        ret_oa->o_seq = src_oa->o_seq;
         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
         memcpy(src_oa, ret_oa, sizeof(*src_oa));
         OBDO_FREE(ret_oa);
@@ -617,7 +663,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         GOTO(done, rc = 0);
 
 cleanup:
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 struct obd_export *sub_exp;
                 int err = 0;
 
@@ -625,7 +671,8 @@ cleanup:
                         continue;
 
                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
-                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
+                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+                                  NULL);
                 if (err)
                         CERROR("Failed to uncreate objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
@@ -658,52 +705,25 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
         if (set->set_completes)
                 rc = create_done(set->set_exp, set, lsmp);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
-
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
-int lov_update_create_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc)
+int cb_create_update(void *cookie, int rc)
 {
-        struct obd_trans_info *oti = set->set_oti;
-        struct lov_stripe_md *lsm = set->set_oi->oi_md;
-        struct lov_oinfo *loi;
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        ENTRY;
-
-        req->rq_stripe = set->set_success;
-        loi = lsm->lsm_oinfo[req->rq_stripe];
-
-        if (rc && lov->lov_tgts[req->rq_idx] &&
-            lov->lov_tgts[req->rq_idx]->ltd_active) {
-                CERROR("error creating fid "LPX64" sub-object"
-                       " on OST idx %d/%d: rc = %d\n",
-                       set->set_oi->oi_oa->o_id, req->rq_idx,
-                       lsm->lsm_stripe_count, rc);
-                if (rc > 0) {
-                        CERROR("obd_create returned invalid err %d\n", rc);
-                        rc = -EIO;
-                }
-        }
-        lov_update_set(set, req, rc);
-        if (rc)
-                RETURN(rc);
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
 
-        loi->loi_id = req->rq_oi.oi_oa->o_id;
-        loi->loi_gr = req->rq_oi.oi_oa->o_gr;
-        loi->loi_ost_idx = req->rq_idx;
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
-        loi_init(loi);
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
 
-        if (oti && set->set_cookies)
-                ++oti->oti_logcookies;
-        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
-                set->set_cookie_sent++;
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
+                if (lovreq->rq_idx == obd_fail_val)
+                        rc = -ENOTCONN;
 
-        RETURN(0);
+        rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+        if (lov_finished_set(lovreq->rq_rqset))
+                lov_put_reqset(lovreq->rq_rqset);
+        return rc;
 }
 
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
@@ -725,18 +745,23 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
         set->set_oi->oi_md = *lsmp;
         set->set_oi->oi_oa = src_oa;
         set->set_oti = oti;
+        lov_get_reqset(set);
 
         rc = qos_prep_create(exp, set);
-        if (rc)
+        /* qos_shrink_lsm() may have allocated a new lsm */
+        *lsmp = oinfo->oi_md;
+        if (rc) {
                 lov_fini_create_set(set, lsmp);
-        else
+                lov_put_reqset(set);
+        } else {
                 *reqset = set;
+        }
         RETURN(rc);
 }
 
 static int common_attr_done(struct lov_request_set *set)
 {
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         struct obdo *tmp_oa;
         int rc = 0, attrset = 0;
@@ -754,8 +779,8 @@ static int common_attr_done(struct lov_request_set *set)
         if (tmp_oa == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
@@ -769,6 +794,14 @@ static int common_attr_done(struct lov_request_set *set)
                 CERROR("No stripes had valid attrs\n");
                 rc = -EIO;
         }
+        if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
+            (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
+                /* When we take attributes of some epoch, we require all the
+                 * ost to be active. */
+                CERROR("Not all the stripes had valid attrs\n");
+                GOTO(out, rc = -EIO);
+        }
+
         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
 out:
@@ -782,12 +815,12 @@ static int brw_done(struct lov_request_set *set)
 {
         struct lov_stripe_md *lsm = set->set_oi->oi_md;
         struct lov_oinfo     *loi = NULL;
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         ENTRY;
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
@@ -813,8 +846,7 @@ int lov_fini_brw_set(struct lov_request_set *set)
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -865,9 +897,9 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 if (info[i].count == 0)
                         continue;
-                
+
                 loi = oinfo->oi_md->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out, rc = -EIO);
@@ -888,6 +920,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
                                sizeof(*req->rq_oi.oi_oa));
                 }
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
 
                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
@@ -903,7 +936,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oabufs = info[i].count;
                 req->rq_pgaidx = shift;
                 shift += req->rq_oabufs;
@@ -952,16 +985,16 @@ int lov_fini_getattr_set(struct lov_request_set *set)
         if (set->set_completes)
                 rc = common_attr_done(set);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
 /* The callback for osc_getattr_async that finilizes a request info when a
- * response is recieved. */
-static int cb_getattr_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_getattr_update(void *cookie, int rc)
 {
+        struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
@@ -991,6 +1024,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
+                                /* SOM requires all the OSTs to be active. */
+                                GOTO(out_set, rc = -EIO);
                         continue;
                 }
 
@@ -1009,9 +1045,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_cb_up = cb_getattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1035,8 +1071,7 @@ int lov_fini_destroy_set(struct lov_request_set *set)
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(0);
 }
@@ -1069,7 +1104,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
                 struct lov_request *req;
 
                 loi = lsm->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
@@ -1089,6 +1124,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
                 }
                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -1113,8 +1149,7 @@ int lov_fini_setattr_set(struct lov_request_set *set)
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
@@ -1128,7 +1163,7 @@ int lov_update_setattr_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] && 
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active))
                 rc = 0;
 
@@ -1148,9 +1183,10 @@ int lov_update_setattr_set(struct lov_request_set *set,
 }
 
 /* The callback for osc_setattr_async that finilizes a request info when a
- * response is recieved. */
-static int cb_setattr_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_setattr_update(void *cookie, int rc)
 {
+        struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
@@ -1200,12 +1236,10 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
-                                || req->rq_oi.oi_oa->o_gr>0);
+                req->rq_oi.oi_oa->o_seq= loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
                         int off = lov_stripe_offset(oinfo->oi_md,
@@ -1245,8 +1279,7 @@ int lov_fini_punch_set(struct lov_request_set *set)
                         rc = common_attr_done(set);
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -1280,9 +1313,10 @@ int lov_update_punch_set(struct lov_request_set *set,
 }
 
 /* The callback for osc_punch that finilizes a request info when a response
- * is recieved. */
-static int cb_update_punch(struct obd_info *oinfo, int rc)
+ * is received. */
+static int cb_update_punch(void *cookie, int rc)
 {
+        struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
@@ -1310,18 +1344,18 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (!lov->lov_tgts[loi->loi_ost_idx] ||
-                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-                        continue;
-                }
-
                 if (!lov_stripe_intersects(oinfo->oi_md, i,
                                            oinfo->oi_policy.l_extent.start,
                                            oinfo->oi_policy.l_extent.end,
                                            &rs, &re))
                         continue;
 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        GOTO(out_set, rc = -EIO);
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
@@ -1336,12 +1370,11 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                req->rq_oi.oi_oa->o_gr = loi->loi_gr;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
 
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_update_punch;
-                req->rq_rqset = set;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
@@ -1374,8 +1407,7 @@ int lov_fini_sync_set(struct lov_request_set *set)
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -1427,6 +1459,7 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                 }
                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
@@ -1465,10 +1498,10 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
                 if (osfs->os_ffree != LOV_U64_MAX)
                         do_div(osfs->os_ffree, expected_stripes);
 
-                spin_lock(&obd->obd_osfs_lock);
+                cfs_spin_lock(&obd->obd_osfs_lock);
                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
-                obd->obd_osfs_age = get_jiffies_64();
-                spin_unlock(&obd->obd_osfs_lock);
+                obd->obd_osfs_age = cfs_time_current_64();
+                cfs_spin_unlock(&obd->obd_osfs_lock);
                 RETURN(0);
         }
 
@@ -1487,22 +1520,26 @@ int lov_fini_statfs_set(struct lov_request_set *set)
                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
                                      set->set_success);
         }
+        lov_put_reqset(set);
+        RETURN(rc);
+}
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+int lov_fini_sync_fs_set(struct lov_request_set *set)
+{
+        int rc = 0;
+        ENTRY;
 
+        if (set == NULL)
+                RETURN(rc);
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
-void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                       struct obd_statfs *lov_sfs, int success)
+void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
+                       int success)
 {
         int shift = 0, quit = 0;
         __u64 tmp;
-        spin_lock(&obd->obd_osfs_lock);
-        memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
-        obd->obd_osfs_age = get_jiffies_64();
-        spin_unlock(&obd->obd_osfs_lock);
 
         if (success == 0) {
                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
@@ -1567,37 +1604,55 @@ void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
 }
 
 /* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
-static int cb_statfs_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_statfs_update(void *cookie, int rc)
 {
+        struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
         struct obd_statfs *osfs, *lov_sfs;
-        struct obd_device *obd;
         struct lov_obd *lov;
+        struct lov_tgt_desc *tgt;
+        struct obd_device *lovobd, *tgtobd;
         int success;
         ENTRY;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        lov = &lovreq->rq_rqset->set_obd->u.lov;
-        obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
-
+        lovobd = lovreq->rq_rqset->set_obd;
+        lov = &lovobd->u.lov;
         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
         lov_sfs = oinfo->oi_osfs;
-
         success = lovreq->rq_rqset->set_success;
-
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
         lov_update_set(lovreq->rq_rqset, lovreq, rc);
-        if (rc) {
-                if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
-                            lov->lov_tgts[lovreq->rq_idx]->ltd_active))
-                        rc = 0;
-                RETURN(rc);
-        }
-
-        lov_update_statfs(obd, osfs, lov_sfs, success);
+        if (rc)
+                GOTO(out, rc);
+
+        obd_getref(lovobd);
+        tgt = lov->lov_tgts[lovreq->rq_idx];
+        if (!tgt || !tgt->ltd_active)
+                GOTO(out_update, rc);
+
+        tgtobd = class_exp2obd(tgt->ltd_exp);
+        cfs_spin_lock(&tgtobd->obd_osfs_lock);
+        memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+        if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
+                tgtobd->obd_osfs_age = cfs_time_current_64();
+        cfs_spin_unlock(&tgtobd->obd_osfs_lock);
+
+out_update:
+        lov_update_statfs(osfs, lov_sfs, success);
         qos_update(lov);
+        obd_putref(lovobd);
+
+out:
+        if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lov_finished_set(lovreq->rq_rqset)) {
+               lov_statfs_interpret(NULL, lovreq->rq_rqset,
+                                    lovreq->rq_rqset->set_success !=
+                                                  lovreq->rq_rqset->set_count);
+               qos_statfs_done(lov);
+        }
 
         RETURN(0);
 }
@@ -1622,11 +1677,19 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 struct lov_request *req;
 
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
+                if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
+                                          && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
                 }
 
+                /* skip targets that have been explicitely disabled by the
+                 * administrator */
+                if (!lov->lov_tgts[i]->ltd_exp) {
+                        CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+                        continue;
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
@@ -1639,7 +1702,7 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
 
                 req->rq_idx = i;
                 req->rq_oi.oi_cb_up = cb_statfs_update;
-                req->rq_rqset = set;
+                req->rq_oi.oi_flags = oinfo->oi_flags;
 
                 lov_set_add_req(req, set);
         }
@@ -1651,3 +1714,59 @@ out_set:
         lov_fini_statfs_set(set);
         RETURN(rc);
 }
+
+int cb_sync_fs_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+        ENTRY;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        lov_update_set(lovreq->rq_rqset, lovreq, rc);
+
+        RETURN(rc);
+}
+
+int lov_prep_sync_fs_set(struct obd_device *obd, struct obd_info *oinfo,
+                         struct lov_request_set **request)
+{
+        struct lov_request_set *set;
+        struct lov_obd *lov = &obd->u.lov;
+        int rc = 0;
+        int i;
+
+        ENTRY;
+
+        OBD_ALLOC(set, sizeof(*set));
+        if (set == NULL)
+                RETURN(ENOMEM);
+        lov_init_set(set);
+        set->set_obd = obd;
+        set->set_oi = oinfo;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                struct lov_request *req;
+
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active ||
+                    !lov->lov_tgts[i]->ltd_exp) {
+                        CDEBUG(D_INFO, "lov idx %d inactive or disabled\n", i);
+                        continue;
+                }
+
+                OBD_ALLOC(req, sizeof(*req));
+                if (req == NULL)
+                        GOTO(out, rc = ENOMEM);
+
+                req->rq_idx = i;
+                req->rq_oi.oi_cb_up = cb_sync_fs_update;
+
+                lov_set_add_req(req, set);
+        }
+        if (!set->set_count)
+                GOTO(out, rc = -EIO);
+        *request = set;
+        RETURN(rc);
+out:
+        lov_fini_sync_fs_set(set);
+        RETURN(rc);
+}