Whamcloud - gitweb
LU-80 lov: large stripe count support
[fs/lustre-release.git] / lustre / lov / lov_request.c
index ba95f06..d8c1fea 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 static void lov_init_set(struct lov_request_set *set)
 {
         set->set_count = 0;
-        set->set_completes = 0;
-        set->set_success = 0;
+        cfs_atomic_set(&set->set_completes, 0);
+        cfs_atomic_set(&set->set_success, 0);
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
-        atomic_set(&set->set_refcount, 1);
+        cfs_atomic_set(&set->set_refcount, 1);
+        cfs_waitq_init(&set->set_waitq);
+        cfs_spin_lock_init(&set->set_lock);
 }
 
-static void lov_finish_set(struct lov_request_set *set)
+void lov_finish_set(struct lov_request_set *set)
 {
-        struct list_head *pos, *n;
+        cfs_list_t *pos, *n;
         ENTRY;
 
         LASSERT(set);
-        list_for_each_safe(pos, n, &set->set_list) {
-                struct lov_request *req = list_entry(pos, struct lov_request,
-                                                     rq_link);
-                list_del_init(&req->rq_link);
+        cfs_list_for_each_safe(pos, n, &set->set_list) {
+                struct lov_request *req = cfs_list_entry(pos,
+                                                         struct lov_request,
+                                                         rq_link);
+                cfs_list_del_init(&req->rq_link);
 
                 if (req->rq_oi.oi_oa)
                         OBDO_FREE(req->rq_oi.oi_oa);
                 if (req->rq_oi.oi_md)
-                        OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
+                        OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
                 if (req->rq_oi.oi_osfs)
                         OBD_FREE(req->rq_oi.oi_osfs,
                                  sizeof(*req->rq_oi.oi_osfs));
@@ -84,7 +90,7 @@ static void lov_finish_set(struct lov_request_set *set)
 
         if (set->set_pga) {
                 int len = set->set_oabufs * sizeof(*set->set_pga);
-                OBD_FREE(set->set_pga, len);
+                OBD_FREE_LARGE(set->set_pga, len);
         }
         if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
@@ -93,15 +99,26 @@ static void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
+int lov_finished_set(struct lov_request_set *set)
+{
+        int completes = cfs_atomic_read(&set->set_completes);
+
+        CDEBUG(D_INFO, "check set %d/%d\n", completes,
+               set->set_count);
+        return completes == set->set_count;
+}
+
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
         req->rq_complete = 1;
         req->rq_rc = rc;
 
-        set->set_completes++;
+        cfs_atomic_inc(&set->set_completes);
         if (rc == 0)
-                set->set_success++;
+                cfs_atomic_inc(&set->set_success);
+
+        cfs_waitq_signal(&set->set_waitq);
 }
 
 int lov_update_common_set(struct lov_request_set *set,
@@ -123,8 +140,9 @@ int lov_update_common_set(struct lov_request_set *set,
 
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
-        list_add_tail(&req->rq_link, &set->set_list);
+        cfs_list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
+        req->rq_rqset = set;
 }
 
 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
@@ -200,15 +218,16 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
 {
         struct lov_request *req;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        int completes = cfs_atomic_read(&set->set_completes);
         int rc = 0;
         ENTRY;
 
         /* enqueue/match success, just return */
-        if (set->set_completes && set->set_completes == set->set_success)
+        if (completes && completes == cfs_atomic_read(&set->set_success))
                 RETURN(0);
 
         /* cancel enqueued/matched locks */
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 struct lustre_handle *lov_lockhp;
 
                 if (!req->rq_complete || req->rq_rc)
@@ -246,13 +265,12 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
          * succeeded. */
         if (!rqset) {
                 if (rc)
-                        set->set_completes = 0;
+                        cfs_atomic_set(&set->set_completes, 0);
                 ret = enqueue_done(set, mode);
         } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc ? rc : ret);
 }
@@ -304,7 +322,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
                         sizeof(struct lov_oinfo *) +
                         sizeof(struct lov_oinfo);
-                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
                 if (req->rq_oi.oi_md == NULL) {
                         OBD_FREE(req, sizeof(*req));
                         GOTO(out_set, rc = -ENOMEM);
@@ -313,8 +331,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
                         sizeof(struct lov_oinfo *);
 
-
-                req->rq_rqset = set;
                 /* Set lov request specific parameters. */
                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                 req->rq_oi.oi_cb_up = cb_update_enqueue;
@@ -332,7 +348,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
                         loi->loi_kms_valid;
@@ -350,20 +366,6 @@ out_set:
         RETURN(rc);
 }
 
-int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
-                         int rc)
-{
-        int ret = rc;
-        ENTRY;
-
-        if (rc > 0)
-                ret = 0;
-        else if (rc == 0)
-                ret = 1;
-        lov_update_set(set, req, ret);
-        RETURN(rc);
-}
-
 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
 {
         int rc = 0;
@@ -373,12 +375,11 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
                 RETURN(0);
         LASSERT(set->set_exp);
         rc = enqueue_done(set, mode);
-        if ((set->set_count == set->set_success) &&
+        if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
             (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -428,7 +429,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
                         GOTO(out_set, rc = -ENOMEM);
 
                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
-                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
                 if (req->rq_oi.oi_md == NULL) {
                         OBD_FREE(req, sizeof(*req));
                         GOTO(out_set, rc = -ENOMEM);
@@ -443,7 +444,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
 
                 lov_set_add_req(req, set);
@@ -469,8 +470,7 @@ int lov_fini_cancel_set(struct lov_request_set *set)
         if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -506,7 +506,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 lov_lockhp = set->set_lockh->llh_handles + i;
                 if (!lustre_handle_is_used(lov_lockhp)) {
-                        CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
+                        CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
                                loi->loi_ost_idx, loi->loi_id);
                         continue;
                 }
@@ -516,7 +516,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
                         GOTO(out_set, rc = -ENOMEM);
 
                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
-                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
                 if (req->rq_oi.oi_md == NULL) {
                         OBD_FREE(req, sizeof(*req));
                         GOTO(out_set, rc = -ENOMEM);
@@ -527,7 +527,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
 
                 lov_set_add_req(req, set);
@@ -541,6 +541,56 @@ out_set:
         RETURN(rc);
 }
 
+static int lov_update_create_set(struct lov_request_set *set,
+                                 struct lov_request *req, int rc)
+{
+        struct obd_trans_info *oti = set->set_oti;
+        struct lov_stripe_md *lsm = set->set_oi->oi_md;
+        struct lov_oinfo *loi;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+
+        if (rc && lov->lov_tgts[req->rq_idx] &&
+            lov->lov_tgts[req->rq_idx]->ltd_active) {
+                CERROR("error creating fid "LPX64" sub-object"
+                       " on OST idx %d/%d: rc = %d\n",
+                       set->set_oi->oi_oa->o_id, req->rq_idx,
+                       lsm->lsm_stripe_count, rc);
+                if (rc > 0) {
+                        CERROR("obd_create returned invalid err %d\n", rc);
+                        rc = -EIO;
+                }
+        }
+
+        cfs_spin_lock(&set->set_lock);
+        req->rq_stripe = cfs_atomic_read(&set->set_success);
+        loi = lsm->lsm_oinfo[req->rq_stripe];
+
+
+        if (rc) {
+                lov_update_set(set, req, rc);
+                cfs_spin_unlock(&set->set_lock);
+                RETURN(rc);
+        }
+
+        loi->loi_id = req->rq_oi.oi_oa->o_id;
+        loi->loi_seq = req->rq_oi.oi_oa->o_seq;
+        loi->loi_ost_idx = req->rq_idx;
+        loi_init(loi);
+
+        if (oti && set->set_cookies)
+                ++oti->oti_logcookies;
+        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookie_sent++;
+
+        lov_update_set(set, req, rc);
+        cfs_spin_unlock(&set->set_lock);
+
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        RETURN(rc);
+}
+
 static int create_done(struct obd_export *exp, struct lov_request_set *set,
                        struct lov_stripe_md **lsmp)
 {
@@ -549,39 +599,33 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         struct obdo *src_oa = set->set_oi->oi_oa;
         struct lov_request *req;
         struct obdo *ret_oa = NULL;
-        int attrset = 0, rc = 0;
+        int success, attrset = 0, rc = 0;
         ENTRY;
 
-        LASSERT(set->set_completes);
+        LASSERT(cfs_atomic_read(&set->set_completes));
 
         /* try alloc objects on other osts if osc_create fails for
          * exceptions: RPC failure, ENOSPC, etc */
-        if (set->set_count != set->set_success) {
-                list_for_each_entry (req, &set->set_list, rq_link) {
+        if (set->set_count != cfs_atomic_read(&set->set_success)) {
+                cfs_list_for_each_entry (req, &set->set_list, rq_link) {
                         if (req->rq_rc == 0)
                                 continue;
 
-                        set->set_completes--;
+                        cfs_atomic_dec(&set->set_completes);
                         req->rq_complete = 0;
 
                         rc = qos_remedy_create(set, req);
                         lov_update_create_set(set, req, rc);
-
-                        if (rc)
-                                break;
                 }
         }
 
+        success = cfs_atomic_read(&set->set_success);
         /* no successful creates */
-        if (set->set_success == 0)
+        if (success == 0)
                 GOTO(cleanup, rc);
 
-        /* If there was an explicit stripe set, fail.  Otherwise, we
-         * got some objects and that's not bad. */
-        if (set->set_count != set->set_success) {
-                if (*lsmp)
-                        GOTO(cleanup, rc);
-                set->set_count = set->set_success;
+        if (set->set_count != success) {
+                set->set_count = success;
                 qos_shrink_lsm(set);
         }
 
@@ -589,7 +633,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         if (ret_oa == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 if (!req->rq_complete || req->rq_rc)
                         continue;
                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
@@ -603,7 +647,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
                 LBUG();
         }
         ret_oa->o_id = src_oa->o_id;
-        ret_oa->o_gr = src_oa->o_gr;
+        ret_oa->o_seq = src_oa->o_seq;
         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
         memcpy(src_oa, ret_oa, sizeof(*src_oa));
         OBDO_FREE(ret_oa);
@@ -612,7 +656,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         GOTO(done, rc = 0);
 
 cleanup:
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 struct obd_export *sub_exp;
                 int err = 0;
 
@@ -620,7 +664,8 @@ cleanup:
                         continue;
 
                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
-                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
+                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+                                  NULL);
                 if (err)
                         CERROR("Failed to uncreate objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
@@ -650,55 +695,28 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes)
+        if (cfs_atomic_read(&set->set_completes))
                 rc = create_done(set->set_exp, set, lsmp);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
-
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
-int lov_update_create_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc)
+int cb_create_update(void *cookie, int rc)
 {
-        struct obd_trans_info *oti = set->set_oti;
-        struct lov_stripe_md *lsm = set->set_oi->oi_md;
-        struct lov_oinfo *loi;
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        ENTRY;
-
-        req->rq_stripe = set->set_success;
-        loi = lsm->lsm_oinfo[req->rq_stripe];
-
-        if (rc && lov->lov_tgts[req->rq_idx] &&
-            lov->lov_tgts[req->rq_idx]->ltd_active) {
-                CERROR("error creating fid "LPX64" sub-object"
-                       " on OST idx %d/%d: rc = %d\n",
-                       set->set_oi->oi_oa->o_id, req->rq_idx,
-                       lsm->lsm_stripe_count, rc);
-                if (rc > 0) {
-                        CERROR("obd_create returned invalid err %d\n", rc);
-                        rc = -EIO;
-                }
-        }
-        lov_update_set(set, req, rc);
-        if (rc)
-                RETURN(rc);
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
 
-        loi->loi_id = req->rq_oi.oi_oa->o_id;
-        loi->loi_gr = req->rq_oi.oi_oa->o_gr;
-        loi->loi_ost_idx = req->rq_idx;
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
-        loi_init(loi);
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
 
-        if (oti && set->set_cookies)
-                ++oti->oti_logcookies;
-        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
-                set->set_cookie_sent++;
+        if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
+                if (lovreq->rq_idx == cfs_fail_val)
+                        rc = -ENOTCONN;
 
-        RETURN(0);
+        rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+        if (lov_finished_set(lovreq->rq_rqset))
+                lov_put_reqset(lovreq->rq_rqset);
+        return rc;
 }
 
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
@@ -720,18 +738,23 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
         set->set_oi->oi_md = *lsmp;
         set->set_oi->oi_oa = src_oa;
         set->set_oti = oti;
+        lov_get_reqset(set);
 
         rc = qos_prep_create(exp, set);
-        if (rc)
+        /* qos_shrink_lsm() may have allocated a new lsm */
+        *lsmp = oinfo->oi_md;
+        if (rc) {
                 lov_fini_create_set(set, lsmp);
-        else
+                lov_put_reqset(set);
+        } else {
                 *reqset = set;
+        }
         RETURN(rc);
 }
 
 static int common_attr_done(struct lov_request_set *set)
 {
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         struct obdo *tmp_oa;
         int rc = 0, attrset = 0;
@@ -742,15 +765,15 @@ static int common_attr_done(struct lov_request_set *set)
         if (set->set_oi->oi_oa == NULL)
                 RETURN(0);
 
-        if (!set->set_success)
+        if (!cfs_atomic_read(&set->set_success))
                 RETURN(-EIO);
 
         OBDO_ALLOC(tmp_oa);
         if (tmp_oa == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
@@ -764,6 +787,14 @@ static int common_attr_done(struct lov_request_set *set)
                 CERROR("No stripes had valid attrs\n");
                 rc = -EIO;
         }
+        if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
+            (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
+                /* When we take attributes of some epoch, we require all the
+                 * ost to be active. */
+                CERROR("Not all the stripes had valid attrs\n");
+                GOTO(out, rc = -EIO);
+        }
+
         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
 out:
@@ -777,12 +808,12 @@ static int brw_done(struct lov_request_set *set)
 {
         struct lov_stripe_md *lsm = set->set_oi->oi_md;
         struct lov_oinfo     *loi = NULL;
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         ENTRY;
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
@@ -804,12 +835,11 @@ int lov_fini_brw_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -838,11 +868,11 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
         set->set_oti = oti;
         set->set_oi = oinfo;
         set->set_oabufs = oa_bufs;
-        OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
+        OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
         if (!set->set_pga)
                 GOTO(out, rc = -ENOMEM);
 
-        OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
+        OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
         if (!info)
                 GOTO(out, rc = -ENOMEM);
 
@@ -883,10 +913,11 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
                                sizeof(*req->rq_oi.oi_oa));
                 }
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
 
                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
-                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
+                OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
                 if (req->rq_oi.oi_md == NULL) {
                         OBDO_FREE(req->rq_oi.oi_oa);
                         OBD_FREE(req, sizeof(*req));
@@ -898,7 +929,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
-                req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
+                req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
                 req->rq_oabufs = info[i].count;
                 req->rq_pgaidx = shift;
                 shift += req->rq_oabufs;
@@ -926,7 +957,8 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
         }
 out:
         if (info)
-                OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
+                OBD_FREE_LARGE(info,
+                               sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
 
         if (rc == 0)
                 *reqset = set;
@@ -944,17 +976,16 @@ int lov_fini_getattr_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes)
+        if (cfs_atomic_read(&set->set_completes))
                 rc = common_attr_done(set);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
 /* The callback for osc_getattr_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_getattr_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -987,6 +1018,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
+                                /* SOM requires all the OSTs to be active. */
+                                GOTO(out_set, rc = -EIO);
                         continue;
                 }
 
@@ -1005,9 +1039,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_cb_up = cb_getattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1027,12 +1061,11 @@ int lov_fini_destroy_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(0);
 }
@@ -1085,6 +1118,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
                 }
                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 lov_set_add_req(req, set);
         }
         if (!set->set_count)
@@ -1104,13 +1138,12 @@ int lov_fini_setattr_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = common_attr_done(set);
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
@@ -1144,7 +1177,7 @@ int lov_update_setattr_set(struct lov_request_set *set,
 }
 
 /* The callback for osc_setattr_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_setattr_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -1197,12 +1230,10 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
-                                || req->rq_oi.oi_oa->o_gr>0);
+                req->rq_oi.oi_oa->o_seq= loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
                         int off = lov_stripe_offset(oinfo->oi_md,
@@ -1235,15 +1266,14 @@ int lov_fini_punch_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = -EIO;
                 /* FIXME update qos data here */
-                if (set->set_success)
+                if (cfs_atomic_read(&set->set_success))
                         rc = common_attr_done(set);
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -1277,7 +1307,7 @@ int lov_update_punch_set(struct lov_request_set *set,
 }
 
 /* The callback for osc_punch that finilizes a request info when a response
- * is recieved. */
+ * is received. */
 static int cb_update_punch(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -1308,18 +1338,18 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (!lov->lov_tgts[loi->loi_ost_idx] ||
-                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-                        continue;
-                }
-
                 if (!lov_stripe_intersects(oinfo->oi_md, i,
                                            oinfo->oi_policy.l_extent.start,
                                            oinfo->oi_policy.l_extent.end,
                                            &rs, &re))
                         continue;
 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        GOTO(out_set, rc = -EIO);
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
@@ -1334,12 +1364,11 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                req->rq_oi.oi_oa->o_gr = loi->loi_gr;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
 
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_update_punch;
-                req->rq_rqset = set;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
@@ -1366,20 +1395,29 @@ int lov_fini_sync_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
-                if (!set->set_success)
+        if (cfs_atomic_read(&set->set_completes)) {
+                if (!cfs_atomic_read(&set->set_success))
                         rc = -EIO;
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
 
+/* The callback for osc_sync that finilizes a request info when a
+ * response is recieved. */
+static int cb_sync_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
+}
+
 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
-                      struct obdo *src_oa, struct lov_stripe_md *lsm,
                       obd_off start, obd_off end,
                       struct lov_request_set **reqset)
 {
@@ -1388,18 +1426,16 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
         int rc = 0, i;
         ENTRY;
 
-        OBD_ALLOC(set, sizeof(*set));
+        OBD_ALLOC_PTR(set);
         if (set == NULL)
                 RETURN(-ENOMEM);
         lov_init_set(set);
 
         set->set_exp = exp;
         set->set_oi = oinfo;
-        set->set_oi->oi_md = lsm;
-        set->set_oi->oi_oa = src_oa;
 
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
                 struct lov_request *req;
                 obd_off rs, re;
 
@@ -1409,10 +1445,11 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                         continue;
                 }
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
+                                           &re))
                         continue;
 
-                OBD_ALLOC(req, sizeof(*req));
+                OBD_ALLOC_PTR(req);
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
                 req->rq_stripe = i;
@@ -1423,13 +1460,15 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                         OBD_FREE(req, sizeof(*req));
                         GOTO(out_set, rc = -ENOMEM);
                 }
-                memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
+                *req->rq_oi.oi_oa = *oinfo->oi_oa;
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
                 req->rq_oi.oi_policy.l_extent.gid = -1;
+                req->rq_oi.oi_cb_up = cb_sync_update;
 
                 lov_set_add_req(req, set);
         }
@@ -1456,17 +1495,17 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
         ENTRY;
 
         if (success) {
-                __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
-
+                __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
+                                                           LOV_MAGIC, 0);
                 if (osfs->os_files != LOV_U64_MAX)
                         do_div(osfs->os_files, expected_stripes);
                 if (osfs->os_ffree != LOV_U64_MAX)
                         do_div(osfs->os_ffree, expected_stripes);
 
-                spin_lock(&obd->obd_osfs_lock);
+                cfs_spin_lock(&obd->obd_osfs_lock);
                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
                 obd->obd_osfs_age = cfs_time_current_64();
-                spin_unlock(&obd->obd_osfs_lock);
+                cfs_spin_unlock(&obd->obd_osfs_lock);
                 RETURN(0);
         }
 
@@ -1481,14 +1520,11 @@ int lov_fini_statfs_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
 
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
-                                     set->set_success);
+                                     cfs_atomic_read(&set->set_success));
         }
-
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
-
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
@@ -1561,44 +1597,57 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
 }
 
 /* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_statfs_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
+        struct lov_request_set *set;
         struct obd_statfs *osfs, *lov_sfs;
-        struct obd_device *obd;
         struct lov_obd *lov;
+        struct lov_tgt_desc *tgt;
+        struct obd_device *lovobd, *tgtobd;
         int success;
         ENTRY;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        lov = &lovreq->rq_rqset->set_obd->u.lov;
-        obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
-
-        osfs = lovreq->rq_rqset->set_oi->oi_osfs;
+        set = lovreq->rq_rqset;
+        lovobd = set->set_obd;
+        lov = &lovobd->u.lov;
+        osfs = set->set_oi->oi_osfs;
         lov_sfs = oinfo->oi_osfs;
-
-        success = lovreq->rq_rqset->set_success;
-
+        success = cfs_atomic_read(&set->set_success);
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
-        lov_update_set(lovreq->rq_rqset, lovreq, rc);
-        if (rc) {
-                if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
-                            lov->lov_tgts[lovreq->rq_idx]->ltd_active))
-                        rc = 0;
-                RETURN(rc);
-        }
+        lov_update_set(set, lovreq, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        obd_getref(lovobd);
+        tgt = lov->lov_tgts[lovreq->rq_idx];
+        if (!tgt || !tgt->ltd_active)
+                GOTO(out_update, rc);
 
-        spin_lock(&obd->obd_osfs_lock);
-        memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+        tgtobd = class_exp2obd(tgt->ltd_exp);
+        cfs_spin_lock(&tgtobd->obd_osfs_lock);
+        memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
-                obd->obd_osfs_age = cfs_time_current_64();
-        spin_unlock(&obd->obd_osfs_lock);
+                tgtobd->obd_osfs_age = cfs_time_current_64();
+        cfs_spin_unlock(&tgtobd->obd_osfs_lock);
 
+out_update:
         lov_update_statfs(osfs, lov_sfs, success);
         qos_update(lov);
+        obd_putref(lovobd);
+
+out:
+        if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lov_finished_set(set)) {
+                lov_statfs_interpret(NULL, set, set->set_count !=
+                                     cfs_atomic_read(&set->set_success));
+                if (lov->lov_qos.lq_statfs_in_progress)
+                        qos_statfs_done(lov);
+        }
 
         RETURN(0);
 }
@@ -1623,11 +1672,19 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 struct lov_request *req;
 
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
+                if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
+                                          && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
                 }
 
+                /* skip targets that have been explicitely disabled by the
+                 * administrator */
+                if (!lov->lov_tgts[i]->ltd_exp) {
+                        CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+                        continue;
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
@@ -1641,7 +1698,6 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
                 req->rq_idx = i;
                 req->rq_oi.oi_cb_up = cb_statfs_update;
                 req->rq_oi.oi_flags = oinfo->oi_flags;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }