Whamcloud - gitweb
Branch HEAD
authortappro <tappro>
Wed, 3 Jun 2009 22:17:45 +0000 (22:17 +0000)
committertappro <tappro>
Wed, 3 Jun 2009 22:17:45 +0000 (22:17 +0000)
b=18734, 19528
i=rread
i=zam

Cleanup mfd list upon export disconnect, take export reference for commit
callback and check that new mfd can't be added for disconnected export

lustre/include/obd_support.h
lustre/mdd/mdd_lov.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/obdfilter/filter.c
lustre/ptlrpc/service.c
lustre/ptlrpc/target.c
lustre/tests/replay-single.sh

index 82d43a0..3ee66aa 100644 (file)
@@ -205,6 +205,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_ALLOW_COMMON_EA_SETTING   0x13f
 #define OBD_FAIL_MDS_LOV_PREP_CREATE     0x141
 #define OBD_FAIL_MDS_REINT_DELAY         0x142
+#define OBD_FAIL_MDS_OPEN_WAIT_CREATE    0x143
 
 /* CMD */
 #define OBD_FAIL_MDS_IS_SUBDIR_NET       0x180
index 6044429..4dd8dbf 100644 (file)
@@ -457,6 +457,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                                 GOTO(out_oti, rc);
                 }
 
+                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OPEN_WAIT_CREATE, 10);
                 rc = obd_create(lov_exp, oa, &lsm, oti);
                 if (rc) {
                         if (rc > 0) {
index 54b7ddc..fb53bf6 100644 (file)
@@ -5106,6 +5106,81 @@ static int mdt_obd_reconnect(const struct lu_env *env,
 
         RETURN(rc);
 }
+static int mdt_mfd_cleanup(struct obd_export *exp)
+{
+        struct mdt_export_data *med = &exp->exp_mdt_data;
+        struct obd_device      *obd = exp->exp_obd;
+        struct mdt_device      *mdt;
+        struct mdt_thread_info *info;
+        struct lu_env           env;
+        CFS_LIST_HEAD(closing_list);
+        struct mdt_file_data *mfd, *n;
+        int rc = 0;
+        ENTRY;
+
+        spin_lock(&med->med_open_lock);
+        while (!list_empty(&med->med_open_head)) {
+                struct list_head *tmp = med->med_open_head.next;
+                mfd = list_entry(tmp, struct mdt_file_data, mfd_list);
+
+                /* Remove mfd handle so it can't be found again.
+                 * We are consuming the mfd_list reference here. */
+                class_handle_unhash(&mfd->mfd_handle);
+                list_move_tail(&mfd->mfd_list, &closing_list);
+        }
+        spin_unlock(&med->med_open_lock);
+        mdt = mdt_dev(obd->obd_lu_dev);
+        LASSERT(mdt != NULL);
+
+        rc = lu_env_init(&env, LCT_MD_THREAD);
+        if (rc)
+                RETURN(rc);
+
+        info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+        LASSERT(info != NULL);
+        memset(info, 0, sizeof *info);
+        info->mti_env = &env;
+        info->mti_mdt = mdt;
+        info->mti_exp = exp;
+
+        if (!list_empty(&closing_list)) {
+                struct md_attr *ma = &info->mti_attr;
+                int lmm_size;
+                int cookie_size;
+
+                lmm_size = mdt->mdt_max_mdsize;
+                cookie_size = mdt->mdt_max_cookiesize;
+                OBD_ALLOC(ma->ma_lmm, lmm_size);
+                if (ma->ma_lmm == NULL)
+                        GOTO(out_lmm, rc = -ENOMEM);
+                OBD_ALLOC(ma->ma_cookie, cookie_size);
+                if (ma->ma_cookie == NULL)
+                        GOTO(out_cookie, rc = -ENOMEM);
+
+                /* Close any open files (which may also cause orphan unlinking). */
+                list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
+                        list_del_init(&mfd->mfd_list);
+                        /* TODO: if we close the unlinked file,
+                         * we need to remove its objects from OST */
+                        memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
+                        ma->ma_lmm_size = lmm_size;
+                        ma->ma_cookie_size = cookie_size;
+                        ma->ma_need = MA_LOV | MA_COOKIE;
+                        ma->ma_valid = 0;
+                        mdt_mfd_close(info, mfd);
+                }
+                info->mti_mdt = NULL;
+                OBD_FREE(ma->ma_cookie, cookie_size);
+                ma->ma_cookie = NULL;
+out_cookie:
+                OBD_FREE(ma->ma_lmm, lmm_size);
+                ma->ma_lmm = NULL;
+        }
+out_lmm:
+        lu_env_fini(&env);
+
+        RETURN(rc);
+}
 
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
@@ -5140,7 +5215,7 @@ static int mdt_obd_disconnect(struct obd_export *exp)
                 spin_unlock(&svc->srv_lock);
         }
         spin_unlock(&exp->exp_lock);
-
+        rc = mdt_mfd_cleanup(exp);
         class_export_put(exp);
         RETURN(rc);
 }
@@ -5172,11 +5247,6 @@ static int mdt_destroy_export(struct obd_export *export)
         struct mdt_device      *mdt;
         struct mdt_thread_info *info;
         struct lu_env           env;
-        struct md_attr         *ma;
-        int lmm_size;
-        int cookie_size;
-        CFS_LIST_HEAD(closing_list);
-        struct mdt_file_data *mfd, *n;
         int rc = 0;
         ENTRY;
 
@@ -5187,6 +5257,8 @@ static int mdt_destroy_export(struct obd_export *export)
         target_destroy_export(export);
         ldlm_destroy_export(export);
 
+        LASSERT(list_empty(&export->exp_outstanding_replies));
+        LASSERT(list_empty(&med->med_open_head));
         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
                 RETURN(0);
 
@@ -5201,62 +5273,12 @@ static int mdt_destroy_export(struct obd_export *export)
         LASSERT(info != NULL);
         memset(info, 0, sizeof *info);
         info->mti_env = &env;
-        info->mti_mdt = mdt;
         info->mti_exp = export;
-
-        ma = &info->mti_attr;
-        lmm_size = ma->ma_lmm_size = mdt->mdt_max_mdsize;
-        cookie_size = ma->ma_cookie_size = mdt->mdt_max_cookiesize;
-        OBD_ALLOC(ma->ma_lmm, lmm_size);
-        OBD_ALLOC(ma->ma_cookie, cookie_size);
-
-        if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
-                GOTO(out, rc = -ENOMEM);
-        ma->ma_need = MA_LOV | MA_COOKIE;
-        ma->ma_valid = 0;
-        /* Close any open files (which may also cause orphan unlinking). */
-        spin_lock(&med->med_open_lock);
-        while (!list_empty(&med->med_open_head)) {
-                struct list_head *tmp = med->med_open_head.next;
-                mfd = list_entry(tmp, struct mdt_file_data, mfd_list);
-
-                /* Remove mfd handle so it can't be found again.
-                 * We are consuming the mfd_list reference here. */
-                class_handle_unhash(&mfd->mfd_handle);
-                list_move_tail(&mfd->mfd_list, &closing_list);
-        }
-        spin_unlock(&med->med_open_lock);
-
-        list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
-                list_del_init(&mfd->mfd_list);
-                mdt_mfd_close(info, mfd);
-                /* TODO: if we close the unlinked file,
-                 * we need to remove its objects from OST */
-                memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
-                spin_lock(&med->med_open_lock);
-                ma->ma_lmm_size = lmm_size;
-                ma->ma_cookie_size = cookie_size;
-                ma->ma_need = MA_LOV | MA_COOKIE;
-                ma->ma_valid = 0;
-                spin_unlock(&med->med_open_lock);
-        }
-
         info->mti_mdt = NULL;
         mdt_client_del(&env, mdt);
 
-        EXIT;
-out:
-        if (lmm_size) {
-                OBD_FREE(ma->ma_lmm, lmm_size);
-                ma->ma_lmm = NULL;
-        }
-        if (cookie_size) {
-                OBD_FREE(ma->ma_cookie, cookie_size);
-                ma->ma_cookie = NULL;
-        }
         lu_env_fini(&env);
-
-        return rc;
+        RETURN(rc);
 }
 
 static void mdt_allow_cli(struct mdt_device *m, unsigned int flag)
index e2bea57..f33e0ac 100644 (file)
@@ -486,11 +486,20 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                         mfd->mfd_old_handle.cookie =
                                                 info->mti_rr.rr_handle->cookie;
                 }
-                spin_lock(&med->med_open_lock);
-                list_add(&mfd->mfd_list, &med->med_open_head);
-                spin_unlock(&med->med_open_lock);
-
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+
+                if (req->rq_export->exp_disconnected) {
+                        spin_lock(&med->med_open_lock);
+                        class_handle_unhash(&mfd->mfd_handle);
+                        list_del_init(&mfd->mfd_list);
+                        spin_unlock(&med->med_open_lock);
+                        mdt_mfd_close(info, mfd);
+                } else {
+                        spin_lock(&med->med_open_lock);
+                        list_add(&mfd->mfd_list, &med->med_open_head);
+                        spin_unlock(&med->med_open_lock);
+                }
+
                 mdt_empty_transno(info);
         } else
                 rc = -ENOMEM;
index 4e03c7f..8b8ae00 100644 (file)
@@ -913,7 +913,8 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
 
         /* add separate commit callback for transaction handling because we need
          * export as parameter */
-        mdt_trans_add_cb(txn, lut_cb_last_committed, mti->mti_exp);
+        mdt_trans_add_cb(txn, lut_cb_last_committed,
+                         class_export_get(mti->mti_exp));
 
         return mdt_last_rcvd_update(mti, txn);
 }
index 1ab6b0a..8e4d34c 100644 (file)
@@ -88,7 +88,9 @@ static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
 {
         struct obd_export *exp = cb_data;
+        LASSERT(exp->exp_obd == obd);
         obd_transno_commit_cb(obd, transno, exp, error);
+        class_export_put(exp);
 }
 
 int filter_version_get_check(struct obd_export *exp,
@@ -164,6 +166,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
                        fed->fed_lr_idx, fed->fed_lr_off);
                 err = -EINVAL;
         } else {
+                class_export_get(exp); /* released when the cb is called */
                 if (!force_sync)
                         force_sync = fsfilt_add_journal_cb(exp->exp_obd,
                                                            last_rcvd,
index d3fcc9d..3eb247f 100644 (file)
@@ -162,8 +162,8 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
 }
 
 void
-ptlrpc_save_lock (struct ptlrpc_request *req,
-                  struct lustre_handle *lock, int mode, int no_ack)
+ptlrpc_save_lock(struct ptlrpc_request *req,
+                 struct lustre_handle *lock, int mode, int no_ack)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
         int                        idx;
@@ -171,11 +171,15 @@ ptlrpc_save_lock (struct ptlrpc_request *req,
         LASSERT(rs != NULL);
         LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
 
-        idx = rs->rs_nlocks++;
-        rs->rs_locks[idx] = *lock;
-        rs->rs_modes[idx] = mode;
-        rs->rs_difficult = 1;
-        rs->rs_no_ack = !!no_ack;
+        if (req->rq_export->exp_disconnected) {
+                ldlm_lock_decref(lock, mode);
+        } else {
+                idx = rs->rs_nlocks++;
+                rs->rs_locks[idx] = *lock;
+                rs->rs_modes[idx] = mode;
+                rs->rs_difficult = 1;
+                rs->rs_no_ack = !!no_ack;
+        }
 }
 
 #ifdef __KERNEL__
index 92ab5a9..3dc9675 100644 (file)
@@ -296,20 +296,20 @@ void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
                            void *data, int err)
 {
         struct obd_export *exp = data;
-
+        LASSERT(exp->exp_obd == lut->lut_obd);
         spin_lock(&lut->lut_translock);
         if (transno > lut->lut_obd->obd_last_committed)
                 lut->lut_obd->obd_last_committed = transno;
 
         LASSERT(exp);
-        if (!lut->lut_obd->obd_stopping &&
-            transno > exp->exp_last_committed) {
+        if (transno > exp->exp_last_committed) {
                 exp->exp_last_committed = transno;
                 spin_unlock(&lut->lut_translock);
                 ptlrpc_commit_replies(exp);
         } else {
                 spin_unlock(&lut->lut_translock);
         }
+        class_export_put(exp);
         if (transno)
                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
                        lut->lut_obd->obd_name, transno);
index 394ef8d..aab4ef6 100755 (executable)
@@ -1989,6 +1989,17 @@ test_82b() {
 }
 run_test 82b "CMD: mkdir cross-node dir (fail mds with name)"
 
+test_84() {
+#define OBD_FAIL_MDS_OPEN_WAIT_CREATE  0x143
+    do_facet mds "lctl set_param fail_loc=0x80000143"
+    createmany -o $DIR/$tfile- 1 &
+    PID=$!
+    mds_evict_client
+    wait $PID
+    df -P $DIR || df -P $DIR || true    # reconnect
+}
+run_test 84 "stale open during export disconnect"
+
 equals_msg `basename $0`: test complete, cleaning up
 check_and_cleanup_lustre
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true