Whamcloud - gitweb
b=21966 can not mount mdt after umount
[fs/lustre-release.git] / lustre / lov / lov_request.c
index 9e898e3..7c0e040 100644 (file)
@@ -58,21 +58,22 @@ static void lov_init_set(struct lov_request_set *set)
         set->set_success = 0;
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
-        atomic_set(&set->set_refcount, 1);
+        cfs_atomic_set(&set->set_refcount, 1);
         cfs_waitq_init(&set->set_waitq);
-        spin_lock_init(&set->set_lock);
+        cfs_spin_lock_init(&set->set_lock);
 }
 
-static void lov_finish_set(struct lov_request_set *set)
+void lov_finish_set(struct lov_request_set *set)
 {
-        struct list_head *pos, *n;
+        cfs_list_t *pos, *n;
         ENTRY;
 
         LASSERT(set);
-        list_for_each_safe(pos, n, &set->set_list) {
-                struct lov_request *req = list_entry(pos, struct lov_request,
-                                                     rq_link);
-                list_del_init(&req->rq_link);
+        cfs_list_for_each_safe(pos, n, &set->set_list) {
+                struct lov_request *req = cfs_list_entry(pos,
+                                                         struct lov_request,
+                                                         rq_link);
+                cfs_list_del_init(&req->rq_link);
 
                 if (req->rq_oi.oi_oa)
                         OBDO_FREE(req->rq_oi.oi_oa);
@@ -135,7 +136,7 @@ int lov_update_common_set(struct lov_request_set *set,
 
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
-        list_add_tail(&req->rq_link, &set->set_list);
+        cfs_list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
         req->rq_rqset = set;
 }
@@ -221,7 +222,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
                 RETURN(0);
 
         /* cancel enqueued/matched locks */
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 struct lustre_handle *lov_lockhp;
 
                 if (!req->rq_complete || req->rq_rc)
@@ -264,8 +265,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
         } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc ? rc : ret);
 }
@@ -388,8 +388,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
             (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -480,8 +479,7 @@ int lov_fini_cancel_set(struct lov_request_set *set)
         if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -517,7 +515,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 lov_lockhp = set->set_lockh->llh_handles + i;
                 if (!lustre_handle_is_used(lov_lockhp)) {
-                        CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
+                        CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
                                loi->loi_ost_idx, loi->loi_id);
                         continue;
                 }
@@ -568,7 +566,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         /* try alloc objects on other osts if osc_create fails for
          * exceptions: RPC failure, ENOSPC, etc */
         if (set->set_count != set->set_success) {
-                list_for_each_entry (req, &set->set_list, rq_link) {
+                cfs_list_for_each_entry (req, &set->set_list, rq_link) {
                         if (req->rq_rc == 0)
                                 continue;
 
@@ -593,7 +591,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         if (ret_oa == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 if (!req->rq_complete || req->rq_rc)
                         continue;
                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
@@ -616,7 +614,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         GOTO(done, rc = 0);
 
 cleanup:
-        list_for_each_entry(req, &set->set_list, rq_link) {
+        cfs_list_for_each_entry(req, &set->set_list, rq_link) {
                 struct obd_export *sub_exp;
                 int err = 0;
 
@@ -658,9 +656,7 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
         if (set->set_completes)
                 rc = create_done(set->set_exp, set, lsmp);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
-
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
@@ -685,12 +681,12 @@ int lov_update_create_set(struct lov_request_set *set,
                 }
         }
 
-        spin_lock(&set->set_lock);
+        cfs_spin_lock(&set->set_lock);
         req->rq_stripe = set->set_success;
         loi = lsm->lsm_oinfo[req->rq_stripe];
         if (rc) {
                 lov_update_set(set, req, rc);
-                spin_unlock(&set->set_lock);
+                cfs_spin_unlock(&set->set_lock);
                 RETURN(rc);
         }
 
@@ -705,7 +701,7 @@ int lov_update_create_set(struct lov_request_set *set,
                 set->set_cookie_sent++;
 
         lov_update_set(set, req, rc);
-        spin_unlock(&set->set_lock);
+        cfs_spin_unlock(&set->set_lock);
 
         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
@@ -718,7 +714,10 @@ int cb_create_update(void *cookie, int rc)
         struct lov_request *lovreq;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+        rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+        if (lov_finished_set(lovreq->rq_rqset))
+                lov_put_reqset(lovreq->rq_rqset);
+        return rc;
 }
 
 
@@ -741,20 +740,23 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
         set->set_oi->oi_md = *lsmp;
         set->set_oi->oi_oa = src_oa;
         set->set_oti = oti;
+        lov_get_reqset(set);
 
         rc = qos_prep_create(exp, set);
         /* qos_shrink_lsm() may have allocated a new lsm */
         *lsmp = oinfo->oi_md;
-        if (rc)
+        if (rc) {
                 lov_fini_create_set(set, lsmp);
-        else
+                lov_put_reqset(set);
+        } else {
                 *reqset = set;
+        }
         RETURN(rc);
 }
 
 static int common_attr_done(struct lov_request_set *set)
 {
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         struct obdo *tmp_oa;
         int rc = 0, attrset = 0;
@@ -772,8 +774,8 @@ static int common_attr_done(struct lov_request_set *set)
         if (tmp_oa == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
@@ -787,6 +789,14 @@ static int common_attr_done(struct lov_request_set *set)
                 CERROR("No stripes had valid attrs\n");
                 rc = -EIO;
         }
+        if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
+            (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
+                /* When we take attributes of some epoch, we require all the
+                 * ost to be active. */
+                CERROR("Not all the stripes had valid attrs\n");
+                GOTO(out, rc = -EIO);
+        }
+
         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
 out:
@@ -800,12 +810,12 @@ static int brw_done(struct lov_request_set *set)
 {
         struct lov_stripe_md *lsm = set->set_oi->oi_md;
         struct lov_oinfo     *loi = NULL;
-        struct list_head *pos;
+        cfs_list_t *pos;
         struct lov_request *req;
         ENTRY;
 
-        list_for_each (pos, &set->set_list) {
-                req = list_entry(pos, struct lov_request, rq_link);
+        cfs_list_for_each (pos, &set->set_list) {
+                req = cfs_list_entry(pos, struct lov_request, rq_link);
 
                 if (!req->rq_complete || req->rq_rc)
                         continue;
@@ -831,8 +841,7 @@ int lov_fini_brw_set(struct lov_request_set *set)
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -970,8 +979,7 @@ int lov_fini_getattr_set(struct lov_request_set *set)
         if (set->set_completes)
                 rc = common_attr_done(set);
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -1010,6 +1018,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
+                                /* SOM requires all the OSTs to be active. */
+                                GOTO(out_set, rc = -EIO);
                         continue;
                 }
 
@@ -1053,8 +1064,7 @@ int lov_fini_destroy_set(struct lov_request_set *set)
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(0);
 }
@@ -1131,8 +1141,7 @@ int lov_fini_setattr_set(struct lov_request_set *set)
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
@@ -1219,11 +1228,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
-                         CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
-                         "req->rq_oi.oi_oa->o_valid="LPX64" "
-                         "req->rq_oi.oi_oa->o_gr="LPU64"\n",
-                         req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
@@ -1266,8 +1270,7 @@ int lov_fini_punch_set(struct lov_request_set *set)
                         rc = common_attr_done(set);
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -1395,8 +1398,7 @@ int lov_fini_sync_set(struct lov_request_set *set)
                 /* FIXME update qos data here */
         }
 
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
+        lov_put_reqset(set);
 
         RETURN(rc);
 }
@@ -1486,10 +1488,10 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
                 if (osfs->os_ffree != LOV_U64_MAX)
                         do_div(osfs->os_ffree, expected_stripes);
 
-                spin_lock(&obd->obd_osfs_lock);
+                cfs_spin_lock(&obd->obd_osfs_lock);
                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
                 obd->obd_osfs_age = cfs_time_current_64();
-                spin_unlock(&obd->obd_osfs_lock);
+                cfs_spin_unlock(&obd->obd_osfs_lock);
                 RETURN(0);
         }
 
@@ -1508,10 +1510,7 @@ int lov_fini_statfs_set(struct lov_request_set *set)
                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
                                      set->set_success);
         }
-
-        if (atomic_dec_and_test(&set->set_refcount))
-                lov_finish_set(set);
-
+        lov_put_reqset(set);
         RETURN(rc);
 }
 
@@ -1590,38 +1589,41 @@ static int cb_statfs_update(void *cookie, int rc)
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
         struct obd_statfs *osfs, *lov_sfs;
-        struct obd_device *obd;
         struct lov_obd *lov;
+        struct lov_tgt_desc *tgt;
+        struct obd_device *lovobd, *tgtobd;
         int success;
         ENTRY;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        lov = &lovreq->rq_rqset->set_obd->u.lov;
-        obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
-
+        lovobd = lovreq->rq_rqset->set_obd;
+        lov = &lovobd->u.lov;
         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
         lov_sfs = oinfo->oi_osfs;
-
         success = lovreq->rq_rqset->set_success;
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
         lov_update_set(lovreq->rq_rqset, lovreq, rc);
-        if (rc) {
-                /* XXX ignore error for disconnected ost ? */
-                if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
-                            lov->lov_tgts[lovreq->rq_idx]->ltd_active))
-                        rc = 0;
+        if (rc)
                 GOTO(out, rc);
-        }
 
-        spin_lock(&obd->obd_osfs_lock);
-        memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+        obd_getref(lovobd);
+        tgt = lov->lov_tgts[lovreq->rq_idx];
+        if (!tgt || !tgt->ltd_active)
+                GOTO(out_update, rc);
+
+        tgtobd = class_exp2obd(tgt->ltd_exp);
+        cfs_spin_lock(&tgtobd->obd_osfs_lock);
+        memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
-                obd->obd_osfs_age = cfs_time_current_64();
-        spin_unlock(&obd->obd_osfs_lock);
+                tgtobd->obd_osfs_age = cfs_time_current_64();
+        cfs_spin_unlock(&tgtobd->obd_osfs_lock);
 
+out_update:
         lov_update_statfs(osfs, lov_sfs, success);
         qos_update(lov);
+        obd_putref(lovobd);
+
 out:
         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
             lov_finished_set(lovreq->rq_rqset)) {
@@ -1660,6 +1662,13 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
                         continue;
                 }
 
+                /* skip targets that have been explicitely disabled by the
+                 * administrator */
+                if (!lov->lov_tgts[i]->ltd_exp) {
+                        CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+                        continue;
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);