Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index d70954a..e294986 100644 (file)
@@ -610,26 +610,10 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
         RETURN(0);
 }
 
-static void mdc_commit_open(struct ptlrpc_request *req)
-{
-        struct mdc_open_data *mod = req->rq_cb_data;
-        if (mod == NULL)
-                return;
-
-        if (mod->mod_close_req != NULL)
-                mod->mod_close_req->rq_cb_data = NULL;
-
-        if (mod->mod_och != NULL)
-                mod->mod_och->och_mod = NULL;
-
-        OBD_FREE(mod, sizeof(*mod));
-        req->rq_cb_data = NULL;
-}
-
 static void mdc_replay_open(struct ptlrpc_request *req)
 {
-        struct mdc_open_data *mod = req->rq_cb_data;
-        struct ptlrpc_request *close_req;
+        struct md_open_data *mod = req->rq_cb_data;
+        struct ptlrpc_request *cur, *tmp;
         struct obd_client_handle *och;
         struct lustre_handle old;
         struct mdt_body *body;
@@ -644,43 +628,100 @@ static void mdc_replay_open(struct ptlrpc_request *req)
 
         body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                   lustre_swab_mdt_body);
+        LASSERT(body != NULL);
 
         och = mod->mod_och;
         if (och != NULL) {
                 struct lustre_handle *file_fh;
 
                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
-                LASSERT(body != NULL);
 
                 file_fh = &och->och_fh;
                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
                        file_fh->cookie, body->handle.cookie);
-                memcpy(&old, file_fh, sizeof(old));
-                memcpy(file_fh, &body->handle, sizeof(*file_fh));
+                old = *file_fh;
+                *file_fh = body->handle;
         }
-        close_req = mod->mod_close_req;
-        if (close_req != NULL) {
-                struct mdt_epoch *epoch;
 
-                LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
-                LASSERT(body != NULL);
-
-                epoch = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
-                                       sizeof(*epoch));
-                LASSERT(epoch);
-                if (och != NULL)
-                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
-                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
-                memcpy(&epoch->handle, &body->handle, sizeof(epoch->handle));
+        list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list, rq_mod_list) {
+                int opc = lustre_msg_get_opc(cur->rq_reqmsg);
+                struct mdt_epoch *epoch = NULL;
+
+                if (opc == MDS_CLOSE || opc == MDS_DONE_WRITING) {
+                        epoch = lustre_msg_buf(cur->rq_reqmsg,
+                                               REQ_REC_OFF, sizeof(*epoch));
+                        LASSERT(epoch);
+                        DEBUG_REQ(D_HA, cur, "updating %s body with new fh",
+                                  opc == MDS_CLOSE ? "CLOSE" : "DONE_WRITING");
+                } else if (opc == MDS_REINT) {
+                        struct mdt_rec_setattr *rec;
+                        
+                        /* Check this is REINT_SETATTR. */
+                        rec = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+                                             sizeof (*rec));
+                        LASSERT(rec && rec->sa_opcode == REINT_SETATTR);
+
+                        epoch = lustre_msg_buf(cur->rq_reqmsg,
+                                               REQ_REC_OFF + 2, sizeof(*epoch));
+                        LASSERT(epoch);
+                        DEBUG_REQ(D_HA, cur, "updating REINT_SETATTR body "
+                                  "with new fh");
+                }
+                if (epoch) {
+                        if (och != NULL)
+                                LASSERT(!memcmp(&old, &epoch->handle,
+                                                sizeof(old)));
+                        epoch->handle = body->handle;
+                }
         }
         EXIT;
 }
 
+void mdc_commit_delayed(struct ptlrpc_request *req)
+{
+        struct md_open_data *mod = req->rq_cb_data;
+        struct ptlrpc_request *cur, *tmp;
+        
+        DEBUG_REQ(D_HA, req, "req committed");
+
+        if (mod == NULL)
+                return;
+
+        req->rq_cb_data = NULL;
+        req->rq_commit_cb = NULL;
+        list_del_init(&req->rq_mod_list);
+        if (req->rq_sequence) {
+                list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list,
+                                         rq_mod_list)
+                {
+                        LASSERT(cur != LP_POISON);
+                        LASSERT(cur->rq_type != LI_POISON);
+                        DEBUG_REQ(D_HA, cur, "req balanced");
+                        LASSERT(cur->rq_transno != 0);
+                        LASSERT(cur->rq_import == req->rq_import);
+
+                        list_del_init(&cur->rq_mod_list);
+                        /* We no longer want to preserve this for transno-
+                         * unconditional replay. */
+                        spin_lock(&cur->rq_lock);
+                        cur->rq_replay = 0;
+                        spin_unlock(&cur->rq_lock);
+                }
+        }
+
+        if (list_empty(&mod->mod_replay_list)) {
+                if (mod->mod_och != NULL)
+                        mod->mod_och->och_mod = NULL;
+
+                OBD_FREE_PTR(mod);
+        }
+}
+
 int mdc_set_open_replay_data(struct obd_export *exp,
                              struct obd_client_handle *och,
                              struct ptlrpc_request *open_req)
 {
-        struct mdc_open_data *mod;
+        struct md_open_data *mod;
         struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
                                                     DLM_INTENT_REC_OFF,
                                                     sizeof(*rec));
@@ -703,9 +744,10 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                 OBD_ALLOC(mod, sizeof(*mod));
                 if (mod == NULL) {
                         DEBUG_REQ(D_ERROR, open_req,
-                                  "Can't allocate mdc_open_data");
+                                  "Can't allocate md_open_data");
                         RETURN(0);
                 }
+                CFS_INIT_LIST_HEAD(&mod->mod_replay_list);
 
                 spin_lock(&open_req->rq_lock);
                 if (!open_req->rq_replay) {
@@ -717,8 +759,8 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                 och->och_mod = mod;
                 mod->mod_och = och;
                 open_req->rq_cb_data = mod;
-                mod->mod_open_req = open_req;
-                open_req->rq_commit_cb = mdc_commit_open;
+                list_add_tail(&open_req->rq_mod_list, &mod->mod_replay_list);
+                open_req->rq_commit_cb = mdc_commit_delayed;
                 spin_unlock(&open_req->rq_lock);
         }
 
@@ -739,12 +781,12 @@ int mdc_set_open_replay_data(struct obd_export *exp,
 int mdc_clear_open_replay_data(struct obd_export *exp,
                                struct obd_client_handle *och)
 {
-        struct mdc_open_data *mod = och->och_mod;
+        struct md_open_data *mod = och->och_mod;
         ENTRY;
 
         /*
-         * Don't free the structure now (it happens in mdc_commit_open(), after
-         * we're sure we won't need to fix up the close request in the future),
+         * Don't free the structure now (it happens in mdc_commit_delayed(),
+         * after the last request is removed from its replay list),
          * but make sure that replay doesn't poke at the och, which is about to
          * be freed.
          */
@@ -756,40 +798,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
         RETURN(0);
 }
 
-static void mdc_commit_close(struct ptlrpc_request *req)
-{
-        struct mdc_open_data *mod = req->rq_cb_data;
-        struct ptlrpc_request *open_req;
-        struct obd_import *imp = req->rq_import;
-
-        DEBUG_REQ(D_HA, req, "close req committed");
-        if (mod == NULL)
-                return;
-
-        mod->mod_close_req = NULL;
-        req->rq_cb_data = NULL;
-        req->rq_commit_cb = NULL;
-
-        open_req = mod->mod_open_req;
-        LASSERT(open_req != NULL);
-        LASSERT(open_req != LP_POISON);
-        LASSERT(open_req->rq_type != LI_POISON);
-
-        DEBUG_REQ(D_HA, open_req, "open req balanced");
-        LASSERT(open_req->rq_transno != 0);
-        LASSERT(open_req->rq_import == imp);
-
-        /*
-         * We no longer want to preserve this for transno-unconditional
-         * replay. Decref open req here as well.
-         */
-        spin_lock(&open_req->rq_lock);
-        open_req->rq_replay = 0;
-        spin_unlock(&open_req->rq_lock);
-}
-
 int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
-              struct obd_client_handle *och, struct ptlrpc_request **request)
+              struct md_open_data *mod, struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
         int reqsize[4] = { sizeof(struct ptlrpc_body),
@@ -800,7 +810,6 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                            obd->u.cli.cl_max_mds_easize,
                            obd->u.cli.cl_max_mds_cookiesize };
         struct ptlrpc_request *req;
-        struct mdc_open_data *mod;
         int rc;
         ENTRY;
 
@@ -818,26 +827,15 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
         req->rq_request_portal = MDS_READPAGE_PORTAL;
 
         /* Ensure that this close's handle is fixed up during replay. */
-        LASSERT(och != NULL);
-        LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
-        mod = och->och_mod;
-        if (likely(mod != NULL)) {
-                if (mod->mod_open_req->rq_type == LI_POISON) {
-                        CERROR("LBUG POISONED open %p!\n", mod->mod_open_req);
-                        LBUG();
-                        ptlrpc_req_finished(req);
-                        req = NULL;
-                        GOTO(out, rc = -EIO);
-                }
-                mod->mod_close_req = req;
-                DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
-        } else {
+        if (likely(mod != NULL))
+                list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+        else
                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
-        }
 
         mdc_close_pack(req, REQ_REC_OFF, op_data);
         ptlrpc_req_set_repsize(req, 4, repsize);
-        req->rq_commit_cb = mdc_commit_close;
+        req->rq_commit_cb = mdc_commit_delayed;
+        req->rq_replay = 1;
         LASSERT(req->rq_cb_data == NULL);
         req->rq_cb_data = mod;
 
@@ -859,7 +857,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                                 rc = -rc;
                 } else if (mod == NULL) {
                         if (req->rq_import->imp_replayable) 
-                                CERROR("Unexpected: can't find mdc_open_data," 
+                                CERROR("Unexpected: can't find md_open_data," 
                                        "but close succeeded with replayable imp"
                                        "Please tell CFS.\n");
                 }
@@ -881,7 +879,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 }
 
 int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
-                     struct obd_client_handle *och)
+                     struct md_open_data *mod)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct ptlrpc_request *req;
@@ -900,14 +898,29 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        /* XXX: add DONE_WRITING request to och -- when Size-on-MDS
-         * recovery will be ready. */
         mdc_close_pack(req, REQ_REC_OFF, op_data);
+        
+        req->rq_replay = 1;
+        req->rq_cb_data = mod;
+        req->rq_commit_cb = mdc_commit_delayed;
+        if (likely(mod != NULL))
+                list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+        else
+                CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
 
         ptlrpc_req_set_repsize(req, 2, repsize);
         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
         rc = ptlrpc_queue_wait(req);
         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
+
+        /* Close the open replay sequence if an error occured or no SOM
+         * attribute update is needed. */
+        if (rc != -EAGAIN)
+                ptlrpc_close_replay_seq(req);
+                
+        if (rc && rc != -EAGAIN && req->rq_commit_cb)
+                req->rq_commit_cb(req);
+
         ptlrpc_req_finished(req);
         RETURN(rc);
 }
@@ -1271,10 +1284,13 @@ static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid,
 
         OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
         if (handle->och_mod == NULL) {
-                DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
+                DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
                 RETURN(-ENOMEM);
         }
-        handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
+
+        /* will be dropped by unpin */
+        CFS_INIT_LIST_HEAD(&handle->och_mod->mod_replay_list);
+        list_add_tail(&req->rq_mod_list, &handle->och_mod->mod_replay_list);
 
         RETURN(rc);
 }
@@ -1308,7 +1324,14 @@ static int mdc_unpin(struct obd_export *exp,
                 CERROR("unpin failed: %d\n", rc);
 
         ptlrpc_req_finished(req);
-        ptlrpc_req_finished(handle->och_mod->mod_open_req);
+
+        LASSERT(!list_empty(&handle->och_mod->mod_replay_list));
+        req = list_entry(handle->och_mod->mod_replay_list.next,
+                         typeof(*req), rq_mod_list);
+        list_del_init(&req->rq_mod_list);
+        ptlrpc_req_finished(req);
+        LASSERT(list_empty(&handle->och_mod->mod_replay_list));
+
         OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
         RETURN(rc);
 }