Whamcloud - gitweb
LU-1626 lov: fix lov request set finish check race
authorBobi Jam <bobijam@whamcloud.com>
Mon, 16 Jul 2012 11:07:10 +0000 (19:07 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 2 Aug 2012 23:01:15 +0000 (19:01 -0400)
When several lov_request callbacks are called, if one of them is
the last lov_request in the set, lov_finished_set() checks for
all of them will return true, while the following action is supposed
be called only once for the set, in this case the assumption is broke
and the lov request set's refcount is wrong.

This patch fixed another glitch, in qos_remedy_create(), when we use
OST pool, the ost_idx value does not initialied correctly.

Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: Id3ff1777b2146630b2d693e046038fcc6f465309
Reviewed-on: http://review.whamcloud.com/3401
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: wangdi <di.wang@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c

index a2528a0..c2f1e15 100644 (file)
@@ -65,25 +65,26 @@ struct lov_request {
 };
 
 struct lov_request_set {
-        struct ldlm_enqueue_info*set_ei;
-        struct obd_info         *set_oi;
-        cfs_atomic_t             set_refcount;
-        struct obd_export       *set_exp;
-        /* XXX: There is @set_exp already, however obd_statfs gets obd_device
-           only. */
-        struct obd_device       *set_obd;
-        int                      set_count;
-        cfs_atomic_t             set_completes;
-        cfs_atomic_t             set_success;
-        struct llog_cookie      *set_cookies;
-        int                      set_cookie_sent;
-        struct obd_trans_info   *set_oti;
-        obd_count                set_oabufs;
-        struct brw_page         *set_pga;
-        struct lov_lock_handles *set_lockh;
-        cfs_list_t               set_list;
-        cfs_waitq_t              set_waitq;
-        cfs_spinlock_t           set_lock;
+       struct ldlm_enqueue_info        *set_ei;
+       struct obd_info                 *set_oi;
+       cfs_atomic_t                    set_refcount;
+       struct obd_export               *set_exp;
+       /* XXX: There is @set_exp already, however obd_statfs gets obd_device
+          only. */
+       struct obd_device               *set_obd;
+       int                             set_count;
+       cfs_atomic_t                    set_completes;
+       cfs_atomic_t                    set_success;
+       cfs_atomic_t                    set_finish_checked;
+       struct llog_cookie              *set_cookies;
+       int                             set_cookie_sent;
+       struct obd_trans_info           *set_oti;
+       obd_count                       set_oabufs;
+       struct brw_page                 *set_pga;
+       struct lov_lock_handles         *set_lockh;
+       cfs_list_t                      set_list;
+       cfs_waitq_t                     set_waitq;
+       cfs_spinlock_t                  set_lock;
 };
 
 extern cfs_mem_cache_t *lov_oinfo_slab;
@@ -168,7 +169,7 @@ int qos_remedy_create(struct lov_request_set *set, struct lov_request *req);
 
 /* lov_request.c */
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
-int lov_finished_set(struct lov_request_set *set);
+int lov_set_finished(struct lov_request_set *set, int idempotent);
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc);
 int lov_update_common_set(struct lov_request_set *set,
index effb91c..ea72c2e 100644 (file)
@@ -1169,7 +1169,7 @@ static int lov_create(const struct lu_env *env, struct obd_export *exp,
 
         /* osc_create have timeout equ obd_timeout/2 so waiting don't be
          * longer then this */
-        l_wait_event(set->set_waitq, lov_finished_set(set), &lwi);
+       l_wait_event(set->set_waitq, lov_set_finished(set, 1), &lwi);
 
         /* we not have ptlrpc set for assign set->interpret and should
          * be call interpret function himself. calling from cb_create_update
index 527620a..2b22ea4 100644 (file)
@@ -529,30 +529,33 @@ static int lov_check_and_create_object(struct lov_obd *lov, int ost_idx,
 
 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
 {
-        struct lov_stripe_md *lsm = set->set_oi->oi_md;
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        unsigned ost_idx = 0, ost_count;
-        struct pool_desc *pool;
-        struct ost_pool *osts = NULL;
-        int i, rc = -EIO;
-        ENTRY;
-
-        /* First check whether we can create the objects on the pool */
-        pool = lov_find_pool(lov, lsm->lsm_pool_name);
-        if (pool != NULL) {
-                cfs_down_read(&pool_tgt_rw_sem(pool));
-                osts = &(pool->pool_obds);
-                ost_count = osts->op_count;
-                for (i = 0; i < ost_count; i++, ost_idx = osts->op_array[i]) {
-                        rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
-                                                         set->set_oti);
-                        if (rc == 0)
-                                break;
-                }
-                cfs_up_read(&pool_tgt_rw_sem(pool));
-                lov_pool_putref(pool);
-                RETURN(rc);
-        }
+       struct lov_stripe_md    *lsm = set->set_oi->oi_md;
+       struct lov_obd          *lov = &set->set_exp->exp_obd->u.lov;
+       unsigned                ost_idx;
+       unsigned                ost_count;
+       struct pool_desc        *pool;
+       struct ost_pool         *osts = NULL;
+       int                     i;
+       int                     rc = -EIO;
+       ENTRY;
+
+       /* First check whether we can create the objects on the pool */
+       pool = lov_find_pool(lov, lsm->lsm_pool_name);
+       if (pool != NULL) {
+               cfs_down_read(&pool_tgt_rw_sem(pool));
+               osts = &(pool->pool_obds);
+               ost_count = osts->op_count;
+               for (i = 0, ost_idx = osts->op_array[0]; i < ost_count;
+                    i++, ost_idx = osts->op_array[i]) {
+                       rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+                                                        set->set_oti);
+                       if (rc == 0)
+                               break;
+               }
+               cfs_up_read(&pool_tgt_rw_sem(pool));
+               lov_pool_putref(pool);
+               RETURN(rc);
+       }
 
         ost_count = lov->desc.ld_tgt_count;
         /* Then check whether we can create the objects on other OSTs */
index 672eca2..205c27f 100644 (file)
 
 static void lov_init_set(struct lov_request_set *set)
 {
-        set->set_count = 0;
-        cfs_atomic_set(&set->set_completes, 0);
-        cfs_atomic_set(&set->set_success, 0);
-        set->set_cookies = 0;
-        CFS_INIT_LIST_HEAD(&set->set_list);
-        cfs_atomic_set(&set->set_refcount, 1);
-        cfs_waitq_init(&set->set_waitq);
-        cfs_spin_lock_init(&set->set_lock);
+       set->set_count = 0;
+       cfs_atomic_set(&set->set_completes, 0);
+       cfs_atomic_set(&set->set_success, 0);
+       cfs_atomic_set(&set->set_finish_checked, 0);
+       set->set_cookies = 0;
+       CFS_INIT_LIST_HEAD(&set->set_list);
+       cfs_atomic_set(&set->set_refcount, 1);
+       cfs_waitq_init(&set->set_waitq);
+       cfs_spin_lock_init(&set->set_lock);
 }
 
 void lov_finish_set(struct lov_request_set *set)
@@ -93,13 +94,19 @@ void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
-int lov_finished_set(struct lov_request_set *set)
+int lov_set_finished(struct lov_request_set *set, int idempotent)
 {
-        int completes = cfs_atomic_read(&set->set_completes);
+       int completes = cfs_atomic_read(&set->set_completes);
+
+       CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
 
-        CDEBUG(D_INFO, "check set %d/%d\n", completes,
-               set->set_count);
-        return completes == set->set_count;
+       if (completes == set->set_count) {
+               if (idempotent)
+                       return 1;
+               if (cfs_atomic_inc_return(&set->set_finish_checked) == 1)
+                       return 1;
+       }
+       return 0;
 }
 
 void lov_update_set(struct lov_request_set *set,
@@ -741,10 +748,10 @@ int cb_create_update(void *cookie, int rc)
                 if (lovreq->rq_idx == cfs_fail_val)
                         rc = -ENOTCONN;
 
-        rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
-        if (lov_finished_set(lovreq->rq_rqset))
-                lov_put_reqset(lovreq->rq_rqset);
-        return rc;
+       rc = lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+       if (lov_set_finished(lovreq->rq_rqset, 0))
+               lov_put_reqset(lovreq->rq_rqset);
+       return rc;
 }
 
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
@@ -1669,15 +1676,15 @@ out_update:
         obd_putref(lovobd);
 
 out:
-        if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
-            lov_finished_set(set)) {
-                lov_statfs_interpret(NULL, set, set->set_count !=
-                                     cfs_atomic_read(&set->set_success));
-                if (lov->lov_qos.lq_statfs_in_progress)
-                        qos_statfs_done(lov);
-        }
-
-        RETURN(0);
+       if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+           lov_set_finished(set, 0)) {
+               lov_statfs_interpret(NULL, set, set->set_count !=
+                                    cfs_atomic_read(&set->set_success));
+               if (lov->lov_qos.lq_statfs_in_progress)
+                       qos_statfs_done(lov);
+       }
+
+       RETURN(0);
 }
 
 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,