Whamcloud - gitweb
Change LASSERTs to client eviction (i.e. abort client's recovery)
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
index 7780a7b..3ca99be 100644 (file)
@@ -581,7 +581,7 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
         spin_lock(&mdt->mdt_client_bitmap_lock);
         cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
         if (cl_idx >= LR_MAX_CLIENTS ||
-            MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
+            OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
                 CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
                        cl_idx);
                 spin_unlock(&mdt->mdt_client_bitmap_lock);
@@ -937,14 +937,15 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
         int rc = 0;
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
+                RETURN(-ENOENT);
 
         /* prepare transactions callbacks */
         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
         mdt->mdt_txn_cb.dtc_cookie = mdt;
-        INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
+        CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
 
         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
@@ -999,7 +1000,56 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
 }
 
 /* reconstruction code */
-extern void mds_steal_ack_locks(struct ptlrpc_request *req);
+static void mdt_steal_ack_locks(struct ptlrpc_request *req)
+{
+        struct obd_export         *exp = req->rq_export;
+        struct list_head          *tmp;
+        struct ptlrpc_reply_state *oldrep;
+        struct ptlrpc_service     *svc;
+        int                        i;
+
+        /* CAVEAT EMPTOR: spinlock order */
+        spin_lock(&exp->exp_lock);
+        list_for_each (tmp, &exp->exp_outstanding_replies) {
+                oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
+
+                if (oldrep->rs_xid != req->rq_xid)
+                        continue;
+
+                if (lustre_msg_get_opc(oldrep->rs_msg) !=
+                    lustre_msg_get_opc(req->rq_reqmsg))
+                        CERROR ("Resent req xid "LPX64" has mismatched opc: "
+                                "new %d old %d\n", req->rq_xid,
+                                lustre_msg_get_opc(req->rq_reqmsg),
+                                lustre_msg_get_opc(oldrep->rs_msg));
+
+                svc = oldrep->rs_service;
+                spin_lock (&svc->srv_lock);
+
+                list_del_init (&oldrep->rs_exp_list);
+
+                CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
+                      " o%d NID %s\n",
+                      oldrep->rs_nlocks, oldrep,
+                      oldrep->rs_xid, oldrep->rs_transno,
+                      lustre_msg_get_opc(oldrep->rs_msg),
+                      libcfs_nid2str(exp->exp_connection->c_peer.nid));
+
+                for (i = 0; i < oldrep->rs_nlocks; i++)
+                        ptlrpc_save_lock(req,
+                                         &oldrep->rs_locks[i],
+                                         oldrep->rs_modes[i]);
+                oldrep->rs_nlocks = 0;
+
+                DEBUG_REQ(D_HA, req, "stole locks for");
+                ptlrpc_schedule_difficult_reply (oldrep);
+
+                spin_unlock (&svc->srv_lock);
+                break;
+        }
+        spin_unlock(&exp->exp_lock);
+}
+
 void mdt_req_from_mcd(struct ptlrpc_request *req,
                       struct mdt_client_data *mcd)
 {
@@ -1018,11 +1068,11 @@ void mdt_req_from_mcd(struct ptlrpc_request *req,
                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
         }
-        mds_steal_ack_locks(req);
+        mdt_steal_ack_locks(req);
 }
 
-static void mdt_reconstruct_generic(struct mdt_thread_info *mti,
-                                    struct mdt_lock_handle *lhc)
+void mdt_reconstruct_generic(struct mdt_thread_info *mti,
+                             struct mdt_lock_handle *lhc)
 {
         struct ptlrpc_request *req = mdt_info_req(mti);
         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
@@ -1034,7 +1084,8 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti,
                                    struct mdt_lock_handle *lhc)
 {
         struct ptlrpc_request  *req = mdt_info_req(mti);
-        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+        struct obd_export *exp = req->rq_export;
+        struct mdt_export_data *med = &exp->exp_mdt_data;
         struct mdt_device *mdt = mti->mti_mdt;
         struct mdt_object *child;
         struct mdt_body *body;
@@ -1046,9 +1097,19 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti,
 
         /* if no error, so child was created with requested fid */
         child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2);
-        LASSERT(!IS_ERR(child));
+        if (IS_ERR(child)) {
+                rc = PTR_ERR(child);
+                LCONSOLE_WARN("Child "DFID" lookup error %d."
+                              " Evicting client %s with export %s.\n",
+                              PFID(mdt_object_fid(child)), rc,
+                              obd_uuid2str(&exp->exp_client_uuid),
+                              obd_export_nid2str(exp));
+                mdt_export_evict(exp);
+                EXIT;
+                return;
+        }
 
-        body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
+        body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
         rc = mo_attr_get(mti->mti_env, mdt_object_child(child), &mti->mti_attr);
         if (rc == -EREMOTE) {
                 /* object was created on remote server */
@@ -1063,7 +1124,8 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
                                     struct mdt_lock_handle *lhc)
 {
         struct ptlrpc_request  *req = mdt_info_req(mti);
-        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+        struct obd_export *exp = req->rq_export;
+        struct mdt_export_data *med = &exp->exp_mdt_data;
         struct mdt_device *mdt = mti->mti_mdt;
         struct mdt_object *obj;
         struct mdt_body *body;
@@ -1072,29 +1134,46 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
         if (req->rq_status)
                 return;
 
-        body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
+        body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
         obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1);
-        LASSERT(!IS_ERR(obj));
+        if (IS_ERR(obj)) {
+                int rc = PTR_ERR(obj);
+                LCONSOLE_WARN(""DFID" lookup error %d."
+                              " Evicting client %s with export %s.\n",
+                              PFID(mdt_object_fid(obj)), rc,
+                              obd_uuid2str(&exp->exp_client_uuid),
+                              obd_export_nid2str(exp));
+                mdt_export_evict(exp);
+                EXIT;
+                return;
+        }
         mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
-        mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr, mdt_object_fid(obj));
+        mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
+                           mdt_object_fid(obj));
+        if (mti->mti_epoch && (mti->mti_epoch->flags & MF_EPOCH_OPEN)) {
+                struct mdt_file_data *mfd;
+                struct mdt_body *repbody;
+
+                repbody = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
+                repbody->ioepoch = obj->mot_ioepoch;
+                spin_lock(&med->med_open_lock);
+                list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
+                        if (mfd->mfd_xid == req->rq_xid)
+                                break;
+                }
+                LASSERT(&mfd->mfd_list != &med->med_open_head);
+                spin_unlock(&med->med_open_lock);
+                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+        }
 
-        /* Don't return OST-specific attributes if we didn't just set them */
-/*
-        if (rec->ur_iattr.ia_valid & ATTR_SIZE)
-                body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-        if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
-                body->valid |= OBD_MD_FLMTIME;
-        if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
-                body->valid |= OBD_MD_FLATIME;
-*/
         mdt_object_put(mti->mti_env, obj);
 }
 
-static void mdt_reconstruct_with_shrink(struct mdt_thread_info *mti,
-                                        struct mdt_lock_handle *lhc)
+static void mdt_reconstruct_setxattr(struct mdt_thread_info *mti,
+                                     struct mdt_lock_handle *lhc)
 {
-        mdt_reconstruct_generic(mti, lhc);
-        mdt_shrink_reply(mti);
+        /* reply nothing */
+        req_capsule_shrink(mti->mti_pill, &RMF_EADATA, 0, RCL_SERVER);
 }
 
 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,
@@ -1104,9 +1183,10 @@ static mdt_reconstructor reconstructors[REINT_MAX] = {
         [REINT_SETATTR]  = mdt_reconstruct_setattr,
         [REINT_CREATE]   = mdt_reconstruct_create,
         [REINT_LINK]     = mdt_reconstruct_generic,
-        [REINT_UNLINK]   = mdt_reconstruct_with_shrink,
-        [REINT_RENAME]   = mdt_reconstruct_with_shrink,
-        [REINT_OPEN]     = mdt_reconstruct_open
+        [REINT_UNLINK]   = mdt_reconstruct_generic,
+        [REINT_RENAME]   = mdt_reconstruct_generic,
+        [REINT_OPEN]     = mdt_reconstruct_open,
+        [REINT_SETXATTR] = mdt_reconstruct_setxattr
 };
 
 void mdt_reconstruct(struct mdt_thread_info *mti,
@@ -1116,4 +1196,3 @@ void mdt_reconstruct(struct mdt_thread_info *mti,
         reconstructors[mti->mti_rr.rr_opcode](mti, lhc);
         EXIT;
 }
-