Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / lov / lov_request.c
index ba95f06..9e898e3 100644 (file)
@@ -59,6 +59,8 @@ static void lov_init_set(struct lov_request_set *set)
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
         atomic_set(&set->set_refcount, 1);
+        cfs_waitq_init(&set->set_waitq);
+        spin_lock_init(&set->set_lock);
 }
 
 static void lov_finish_set(struct lov_request_set *set)
@@ -93,6 +95,14 @@ static void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
+int lov_finished_set(struct lov_request_set *set)
+{
+        CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+               set->set_count);
+        return set->set_completes == set->set_count;
+}
+
+
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
@@ -102,6 +112,8 @@ void lov_update_set(struct lov_request_set *set,
         set->set_completes++;
         if (rc == 0)
                 set->set_success++;
+
+        cfs_waitq_signal(&set->set_waitq);
 }
 
 int lov_update_common_set(struct lov_request_set *set,
@@ -125,6 +137,7 @@ void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
         list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
+        req->rq_rqset = set;
 }
 
 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
@@ -313,8 +326,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
                         sizeof(struct lov_oinfo *);
 
-
-                req->rq_rqset = set;
                 /* Set lov request specific parameters. */
                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                 req->rq_oi.oi_cb_up = cb_update_enqueue;
@@ -566,9 +577,6 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
 
                         rc = qos_remedy_create(set, req);
                         lov_update_create_set(set, req, rc);
-
-                        if (rc)
-                                break;
                 }
         }
 
@@ -576,11 +584,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         if (set->set_success == 0)
                 GOTO(cleanup, rc);
 
-        /* If there was an explicit stripe set, fail.  Otherwise, we
-         * got some objects and that's not bad. */
         if (set->set_count != set->set_success) {
-                if (*lsmp)
-                        GOTO(cleanup, rc);
                 set->set_count = set->set_success;
                 qos_shrink_lsm(set);
         }
@@ -620,7 +624,8 @@ cleanup:
                         continue;
 
                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
-                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
+                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+                                  NULL);
                 if (err)
                         CERROR("Failed to uncreate objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
@@ -668,9 +673,6 @@ int lov_update_create_set(struct lov_request_set *set,
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
         ENTRY;
 
-        req->rq_stripe = set->set_success;
-        loi = lsm->lsm_oinfo[req->rq_stripe];
-
         if (rc && lov->lov_tgts[req->rq_idx] &&
             lov->lov_tgts[req->rq_idx]->ltd_active) {
                 CERROR("error creating fid "LPX64" sub-object"
@@ -682,15 +684,19 @@ int lov_update_create_set(struct lov_request_set *set,
                         rc = -EIO;
                 }
         }
-        lov_update_set(set, req, rc);
-        if (rc)
+
+        spin_lock(&set->set_lock);
+        req->rq_stripe = set->set_success;
+        loi = lsm->lsm_oinfo[req->rq_stripe];
+        if (rc) {
+                lov_update_set(set, req, rc);
+                spin_unlock(&set->set_lock);
                 RETURN(rc);
+        }
 
         loi->loi_id = req->rq_oi.oi_oa->o_id;
         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
         loi->loi_ost_idx = req->rq_idx;
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
         loi_init(loi);
 
         if (oti && set->set_cookies)
@@ -698,9 +704,24 @@ int lov_update_create_set(struct lov_request_set *set,
         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
                 set->set_cookie_sent++;
 
-        RETURN(0);
+        lov_update_set(set, req, rc);
+        spin_unlock(&set->set_lock);
+
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        RETURN(rc);
 }
 
+int cb_create_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
                         struct obd_trans_info *oti,
@@ -722,6 +743,8 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
         set->set_oti = oti;
 
         rc = qos_prep_create(exp, set);
+        /* qos_shrink_lsm() may have allocated a new lsm */
+        *lsmp = oinfo->oi_md;
         if (rc)
                 lov_fini_create_set(set, lsmp);
         else
@@ -954,7 +977,7 @@ int lov_fini_getattr_set(struct lov_request_set *set)
 }
 
 /* The callback for osc_getattr_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_getattr_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -1007,7 +1030,6 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_cb_up = cb_getattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1144,7 +1166,7 @@ int lov_update_setattr_set(struct lov_request_set *set,
 }
 
 /* The callback for osc_setattr_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_setattr_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -1197,12 +1219,14 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
-                                || req->rq_oi.oi_oa->o_gr>0);
+                LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
+                         CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
+                         "req->rq_oi.oi_oa->o_valid="LPX64" "
+                         "req->rq_oi.oi_oa->o_gr="LPU64"\n",
+                         req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
                         int off = lov_stripe_offset(oinfo->oi_md,
@@ -1277,7 +1301,7 @@ int lov_update_punch_set(struct lov_request_set *set,
 }
 
 /* The callback for osc_punch that finilizes a request info when a response
- * is recieved. */
+ * is received. */
 static int cb_update_punch(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -1308,18 +1332,18 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 struct lov_request *req;
                 obd_off rs, re;
 
-                if (!lov->lov_tgts[loi->loi_ost_idx] ||
-                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-                        continue;
-                }
-
                 if (!lov_stripe_intersects(oinfo->oi_md, i,
                                            oinfo->oi_policy.l_extent.start,
                                            oinfo->oi_policy.l_extent.end,
                                            &rs, &re))
                         continue;
 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
+                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        GOTO(out_set, rc = -EIO);
+                }
+
                 OBD_ALLOC(req, sizeof(*req));
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
@@ -1339,7 +1363,6 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_update_punch;
-                req->rq_rqset = set;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
@@ -1561,7 +1584,7 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
 }
 
 /* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_statfs_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
@@ -1580,15 +1603,15 @@ static int cb_statfs_update(void *cookie, int rc)
         lov_sfs = oinfo->oi_osfs;
 
         success = lovreq->rq_rqset->set_success;
-
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
         lov_update_set(lovreq->rq_rqset, lovreq, rc);
         if (rc) {
+                /* XXX ignore error for disconnected ost ? */
                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
                         rc = 0;
-                RETURN(rc);
+                GOTO(out, rc);
         }
 
         spin_lock(&obd->obd_osfs_lock);
@@ -1599,6 +1622,14 @@ static int cb_statfs_update(void *cookie, int rc)
 
         lov_update_statfs(osfs, lov_sfs, success);
         qos_update(lov);
+out:
+        if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lov_finished_set(lovreq->rq_rqset)) {
+               lov_statfs_interpret(NULL, lovreq->rq_rqset,
+                                    lovreq->rq_rqset->set_success !=
+                                                  lovreq->rq_rqset->set_count);
+               qos_statfs_done(lov);
+        }
 
         RETURN(0);
 }
@@ -1623,7 +1654,8 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 struct lov_request *req;
 
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
+                if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
+                                          && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
                 }
@@ -1641,7 +1673,6 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
                 req->rq_idx = i;
                 req->rq_oi.oi_cb_up = cb_statfs_update;
                 req->rq_oi.oi_flags = oinfo->oi_flags;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }