Whamcloud - gitweb
LU-80 lov: large stripe count support
[fs/lustre-release.git] / lustre / lov / lov_request.c
index ac52991..d8c1fea 100644 (file)
@@ -57,8 +57,8 @@
 static void lov_init_set(struct lov_request_set *set)
 {
         set->set_count = 0;
-        set->set_completes = 0;
-        set->set_success = 0;
+        cfs_atomic_set(&set->set_completes, 0);
+        cfs_atomic_set(&set->set_success, 0);
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
         cfs_atomic_set(&set->set_refcount, 1);
@@ -101,9 +101,11 @@ void lov_finish_set(struct lov_request_set *set)
 
 int lov_finished_set(struct lov_request_set *set)
 {
-        CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+        int completes = cfs_atomic_read(&set->set_completes);
+
+        CDEBUG(D_INFO, "check set %d/%d\n", completes,
                set->set_count);
-        return set->set_completes == set->set_count;
+        return completes == set->set_count;
 }
 
 void lov_update_set(struct lov_request_set *set,
@@ -112,9 +114,9 @@ void lov_update_set(struct lov_request_set *set,
         req->rq_complete = 1;
         req->rq_rc = rc;
 
-        set->set_completes++;
+        cfs_atomic_inc(&set->set_completes);
         if (rc == 0)
-                set->set_success++;
+                cfs_atomic_inc(&set->set_success);
 
         cfs_waitq_signal(&set->set_waitq);
 }
@@ -216,11 +218,12 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
 {
         struct lov_request *req;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        int completes = cfs_atomic_read(&set->set_completes);
         int rc = 0;
         ENTRY;
 
         /* enqueue/match success, just return */
-        if (set->set_completes && set->set_completes == set->set_success)
+        if (completes && completes == cfs_atomic_read(&set->set_success))
                 RETURN(0);
 
         /* cancel enqueued/matched locks */
@@ -262,7 +265,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
          * succeeded. */
         if (!rqset) {
                 if (rc)
-                        set->set_completes = 0;
+                        cfs_atomic_set(&set->set_completes, 0);
                 ret = enqueue_done(set, mode);
         } else if (set->set_lockh)
                 lov_llh_put(set->set_lockh);
@@ -363,20 +366,6 @@ out_set:
         RETURN(rc);
 }
 
-int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
-                         int rc)
-{
-        int ret = rc;
-        ENTRY;
-
-        if (rc > 0)
-                ret = 0;
-        else if (rc == 0)
-                ret = 1;
-        lov_update_set(set, req, ret);
-        RETURN(rc);
-}
-
 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
 {
         int rc = 0;
@@ -386,7 +375,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
                 RETURN(0);
         LASSERT(set->set_exp);
         rc = enqueue_done(set, mode);
-        if ((set->set_count == set->set_success) &&
+        if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
             (flags & LDLM_FL_TEST_LOCK))
                 lov_llh_put(set->set_lockh);
 
@@ -574,7 +563,7 @@ static int lov_update_create_set(struct lov_request_set *set,
         }
 
         cfs_spin_lock(&set->set_lock);
-        req->rq_stripe = set->set_success;
+        req->rq_stripe = cfs_atomic_read(&set->set_success);
         loi = lsm->lsm_oinfo[req->rq_stripe];
 
 
@@ -610,19 +599,19 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
         struct obdo *src_oa = set->set_oi->oi_oa;
         struct lov_request *req;
         struct obdo *ret_oa = NULL;
-        int attrset = 0, rc = 0;
+        int success, attrset = 0, rc = 0;
         ENTRY;
 
-        LASSERT(set->set_completes);
+        LASSERT(cfs_atomic_read(&set->set_completes));
 
         /* try alloc objects on other osts if osc_create fails for
          * exceptions: RPC failure, ENOSPC, etc */
-        if (set->set_count != set->set_success) {
+        if (set->set_count != cfs_atomic_read(&set->set_success)) {
                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
                         if (req->rq_rc == 0)
                                 continue;
 
-                        set->set_completes--;
+                        cfs_atomic_dec(&set->set_completes);
                         req->rq_complete = 0;
 
                         rc = qos_remedy_create(set, req);
@@ -630,12 +619,13 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
                 }
         }
 
+        success = cfs_atomic_read(&set->set_success);
         /* no successful creates */
-        if (set->set_success == 0)
+        if (success == 0)
                 GOTO(cleanup, rc);
 
-        if (set->set_count != set->set_success) {
-                set->set_count = set->set_success;
+        if (set->set_count != success) {
+                set->set_count = success;
                 qos_shrink_lsm(set);
         }
 
@@ -705,7 +695,7 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes)
+        if (cfs_atomic_read(&set->set_completes))
                 rc = create_done(set->set_exp, set, lsmp);
 
         lov_put_reqset(set);
@@ -775,7 +765,7 @@ static int common_attr_done(struct lov_request_set *set)
         if (set->set_oi->oi_oa == NULL)
                 RETURN(0);
 
-        if (!set->set_success)
+        if (!cfs_atomic_read(&set->set_success))
                 RETURN(-EIO);
 
         OBDO_ALLOC(tmp_oa);
@@ -845,7 +835,7 @@ int lov_fini_brw_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = brw_done(set);
                 /* FIXME update qos data here */
         }
@@ -986,7 +976,7 @@ int lov_fini_getattr_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes)
+        if (cfs_atomic_read(&set->set_completes))
                 rc = common_attr_done(set);
 
         lov_put_reqset(set);
@@ -1071,7 +1061,7 @@ int lov_fini_destroy_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 /* FIXME update qos data here */
         }
 
@@ -1148,7 +1138,7 @@ int lov_fini_setattr_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = common_attr_done(set);
                 /* FIXME update qos data here */
         }
@@ -1276,10 +1266,10 @@ int lov_fini_punch_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = -EIO;
                 /* FIXME update qos data here */
-                if (set->set_success)
+                if (cfs_atomic_read(&set->set_success))
                         rc = common_attr_done(set);
         }
 
@@ -1405,8 +1395,8 @@ int lov_fini_sync_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
         LASSERT(set->set_exp);
-        if (set->set_completes) {
-                if (!set->set_success)
+        if (cfs_atomic_read(&set->set_completes)) {
+                if (!cfs_atomic_read(&set->set_success))
                         rc = -EIO;
                 /* FIXME update qos data here */
         }
@@ -1505,8 +1495,8 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
         ENTRY;
 
         if (success) {
-                __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
-
+                __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
+                                                           LOV_MAGIC, 0);
                 if (osfs->os_files != LOV_U64_MAX)
                         do_div(osfs->os_files, expected_stripes);
                 if (osfs->os_ffree != LOV_U64_MAX)
@@ -1530,9 +1520,9 @@ int lov_fini_statfs_set(struct lov_request_set *set)
         if (set == NULL)
                 RETURN(0);
 
-        if (set->set_completes) {
+        if (cfs_atomic_read(&set->set_completes)) {
                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
-                                     set->set_success);
+                                     cfs_atomic_read(&set->set_success));
         }
         lov_put_reqset(set);
         RETURN(rc);
@@ -1612,6 +1602,7 @@ static int cb_statfs_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
+        struct lov_request_set *set;
         struct obd_statfs *osfs, *lov_sfs;
         struct lov_obd *lov;
         struct lov_tgt_desc *tgt;
@@ -1620,14 +1611,15 @@ static int cb_statfs_update(void *cookie, int rc)
         ENTRY;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
-        lovobd = lovreq->rq_rqset->set_obd;
+        set = lovreq->rq_rqset;
+        lovobd = set->set_obd;
         lov = &lovobd->u.lov;
-        osfs = lovreq->rq_rqset->set_oi->oi_osfs;
+        osfs = set->set_oi->oi_osfs;
         lov_sfs = oinfo->oi_osfs;
-        success = lovreq->rq_rqset->set_success;
+        success = cfs_atomic_read(&set->set_success);
         /* XXX: the same is done in lov_update_common_set, however
            lovset->set_exp is not initialized. */
-        lov_update_set(lovreq->rq_rqset, lovreq, rc);
+        lov_update_set(set, lovreq, rc);
         if (rc)
                 GOTO(out, rc);
 
@@ -1649,12 +1641,12 @@ out_update:
         obd_putref(lovobd);
 
 out:
-        if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
-            lov_finished_set(lovreq->rq_rqset)) {
-               lov_statfs_interpret(NULL, lovreq->rq_rqset,
-                                    lovreq->rq_rqset->set_success !=
-                                                  lovreq->rq_rqset->set_count);
-               qos_statfs_done(lov);
+        if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lov_finished_set(set)) {
+                lov_statfs_interpret(NULL, set, set->set_count !=
+                                     cfs_atomic_read(&set->set_success));
+                if (lov->lov_qos.lq_statfs_in_progress)
+                        qos_statfs_done(lov);
         }
 
         RETURN(0);