Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 6ca08a3..8eb065d 100644 (file)
@@ -188,7 +188,7 @@ static int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
         CDEBUG(D_NET, "mode: %o\n", body->mode);
 
         offset = REPLY_REC_OFF + 1;
-        LASSERT_REPSWAB(req, offset);
+        lustre_set_rep_swabbed(req, offset);
         if (body->eadatasize != 0) {
                 /* reply indicates presence of eadata; check it's there... */
                 eadata = lustre_msg_buf(req->rq_repmsg, offset++,
@@ -499,52 +499,76 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
 
         md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body));
         LASSERT (md->body != NULL);
-        LASSERT_REPSWABBED(req, offset);
+        LASSERT(lustre_rep_swabbed(req, offset));
         offset++;
 
         if (md->body->valid & OBD_MD_FLEASIZE) {
                 int lmmsize;
                 struct lov_mds_md *lmm;
 
-                LASSERT(S_ISREG(md->body->mode));
+                if (!S_ISREG(md->body->mode)) {
+                        CERROR("OBD_MD_FLEASIZE set, should be a regular file, "
+                               "but is not\n");
+                        GOTO(out, rc = -EPROTO);
+                }
 
                 if (md->body->eadatasize == 0) {
                         CERROR("OBD_MD_FLEASIZE set, but eadatasize 0\n");
-                        RETURN(-EPROTO);
+                        GOTO(out, rc = -EPROTO);
                 }
                 lmmsize = md->body->eadatasize;
                 lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize);
-                LASSERT (lmm != NULL);
-                LASSERT_REPSWABBED(req, offset);
+                if (!lmm) {
+                        CERROR ("incorrect message: lmm == 0\n");
+                        GOTO(out, rc = -EPROTO);
+                }
+                LASSERT(lustre_rep_swabbed(req, offset));
 
                 rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
                 if (rc < 0)
-                        RETURN(rc);
+                        GOTO(out, rc);
+
+                if (rc < sizeof(*md->lsm)) {
+                        CERROR ("lsm size too small:  rc < sizeof (*md->lsm) "
+                                "(%d < %d)\n", rc, sizeof(*md->lsm));
+                        GOTO(out, rc = -EPROTO);
+                }
 
-                LASSERT (rc >= sizeof (*md->lsm));
                 offset++;
         } else if (md->body->valid & OBD_MD_FLDIREA) {
                 int lmvsize;
                 struct lov_mds_md *lmv;
-                
-                LASSERT(S_ISDIR(md->body->mode));
+
+                if(!S_ISDIR(md->body->mode)) {
+                        CERROR("OBD_MD_FLDIREA set, should be a directory, but "
+                               "is not\n");
+                        GOTO(out, rc = -EPROTO);
+                }
 
                 if (md->body->eadatasize == 0) {
-                        CERROR("OBD_MD_FLEASIZE is set, but eadatasize 0\n");
+                        CERROR("OBD_MD_FLDIREA is set, but eadatasize 0\n");
                         RETURN(-EPROTO);
                 }
                 if (md->body->valid & OBD_MD_MEA) {
                         lmvsize = md->body->eadatasize;
                         lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize);
-                        LASSERT (lmv != NULL);
-                        LASSERT_REPSWABBED(req, offset);
+                        if (!lmv) {
+                                CERROR ("incorrect message: lmv == 0\n");
+                                GOTO(out, rc = -EPROTO);
+                        }
+
+                        LASSERT(lustre_rep_swabbed(req, offset));
 
                         rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
                                           lmvsize);
                         if (rc < 0)
-                                RETURN(rc);
+                                GOTO(out, rc);
 
-                        LASSERT (rc >= sizeof (*md->mea));
+                        if (rc < sizeof(*md->mea)) {
+                                CERROR ("size too small:  rc < sizeof(*md->mea) "
+                                        "(%d < %d)\n", rc, sizeof(*md->mea));
+                                GOTO(out, rc = -EPROTO);
+                        }
                 }
                 offset++;
         }
@@ -554,7 +578,10 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
         if (md->body->valid & OBD_MD_FLRMTPERM) {
                 md->remote_perm = lustre_msg_buf(req->rq_repmsg, offset++,
                                                 sizeof(struct mdt_remote_perm));
-                LASSERT(md->remote_perm);
+                if (!md->remote_perm) {
+                        CERROR ("incorrect message: remote_perm == 0\n");
+                        GOTO(out, rc = -EPROTO);
+                }
         }
 
         /* for ACL, it's possible that FLACL is set but aclsize is zero.  only
@@ -610,26 +637,10 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
         RETURN(0);
 }
 
-static void mdc_commit_open(struct ptlrpc_request *req)
-{
-        struct mdc_open_data *mod = req->rq_cb_data;
-        if (mod == NULL)
-                return;
-
-        if (mod->mod_close_req != NULL)
-                mod->mod_close_req->rq_cb_data = NULL;
-
-        if (mod->mod_och != NULL)
-                mod->mod_och->och_mod = NULL;
-
-        OBD_FREE(mod, sizeof(*mod));
-        req->rq_cb_data = NULL;
-}
-
 static void mdc_replay_open(struct ptlrpc_request *req)
 {
-        struct mdc_open_data *mod = req->rq_cb_data;
-        struct ptlrpc_request *close_req;
+        struct md_open_data *mod = req->rq_cb_data;
+        struct ptlrpc_request *cur, *tmp;
         struct obd_client_handle *och;
         struct lustre_handle old;
         struct mdt_body *body;
@@ -644,43 +655,98 @@ static void mdc_replay_open(struct ptlrpc_request *req)
 
         body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                   lustre_swab_mdt_body);
+        LASSERT(body != NULL);
 
         och = mod->mod_och;
         if (och != NULL) {
                 struct lustre_handle *file_fh;
 
                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
-                LASSERT(body != NULL);
 
                 file_fh = &och->och_fh;
                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
                        file_fh->cookie, body->handle.cookie);
-                memcpy(&old, file_fh, sizeof(old));
-                memcpy(file_fh, &body->handle, sizeof(*file_fh));
+                old = *file_fh;
+                *file_fh = body->handle;
         }
-        close_req = mod->mod_close_req;
-        if (close_req != NULL) {
-                struct mdt_epoch *epoch;
-
-                LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
-                LASSERT(body != NULL);
 
-                epoch = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
-                                       sizeof(*epoch));
-                LASSERT(epoch);
-                if (och != NULL)
-                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
-                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
-                memcpy(&epoch->handle, &body->handle, sizeof(epoch->handle));
+        list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list, rq_mod_list) {
+                int opc = lustre_msg_get_opc(cur->rq_reqmsg);
+                struct mdt_epoch *epoch = NULL;
+
+                if (opc == MDS_CLOSE || opc == MDS_DONE_WRITING) {
+                        epoch = lustre_msg_buf(cur->rq_reqmsg,
+                                               REQ_REC_OFF, sizeof(*epoch));
+                        LASSERT(epoch);
+                        DEBUG_REQ(D_HA, cur, "updating %s body with new fh",
+                                  opc == MDS_CLOSE ? "CLOSE" : "DONE_WRITING");
+                } else if (opc == MDS_REINT) {
+                        struct mdt_rec_setattr *rec;
+                        
+                        /* Check this is REINT_SETATTR. */
+                        rec = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+                                             sizeof (*rec));
+                        LASSERT(rec && rec->sa_opcode == REINT_SETATTR);
+
+                        epoch = lustre_msg_buf(cur->rq_reqmsg,
+                                               REQ_REC_OFF + 2, sizeof(*epoch));
+                        LASSERT(epoch);
+                        DEBUG_REQ(D_HA, cur, "updating REINT_SETATTR body "
+                                  "with new fh");
+                }
+                if (epoch) {
+                        if (och != NULL)
+                                LASSERT(!memcmp(&old, &epoch->handle,
+                                                sizeof(old)));
+                        epoch->handle = body->handle;
+                }
         }
         EXIT;
 }
 
+void mdc_commit_delayed(struct ptlrpc_request *req)
+{
+        struct md_open_data *mod = req->rq_cb_data;
+        struct ptlrpc_request *cur, *tmp;
+        /* Commit callback: unlink req from its md_open_data replay list. */
+        DEBUG_REQ(D_HA, req, "req committed");
+        /* No replay data attached to this request: nothing to do. */
+        if (mod == NULL)
+                return;
+        /* Detach req from mod first; a repeated call then returns early. */
+        req->rq_cb_data = NULL;
+        req->rq_commit_cb = NULL;
+        list_del_init(&req->rq_mod_list);
+        if (req->rq_sequence) { /* TODO confirm: marks end of open sequence? */
+                list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list,
+                                         rq_mod_list) {
+                        LASSERT(cur != LP_POISON);
+                        LASSERT(cur->rq_type != LI_POISON);
+                        DEBUG_REQ(D_HA, cur, "req balanced");
+                        LASSERT(cur->rq_transno != 0);
+                        LASSERT(cur->rq_import == req->rq_import);
+
+                        /* We no longer want to preserve this for transno-
+                         * unconditional replay. */
+                        spin_lock(&cur->rq_lock);
+                        cur->rq_replay = 0;
+                        spin_unlock(&cur->rq_lock);
+                }
+        }
+        /* Last request gone: unhook the och, then free the replay data. */
+        if (list_empty(&mod->mod_replay_list)) {
+                if (mod->mod_och != NULL)
+                        mod->mod_och->och_mod = NULL;
+
+                OBD_FREE_PTR(mod);
+        }
+}
+
 int mdc_set_open_replay_data(struct obd_export *exp,
                              struct obd_client_handle *och,
                              struct ptlrpc_request *open_req)
 {
-        struct mdc_open_data *mod;
+        struct md_open_data *mod;
         struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
                                                     DLM_INTENT_REC_OFF,
                                                     sizeof(*rec));
@@ -693,7 +759,7 @@ int mdc_set_open_replay_data(struct obd_export *exp,
         LASSERT(rec != NULL);
 
         /* Incoming message in my byte order (it's been swabbed). */
-        LASSERT_REPSWABBED(open_req, DLM_REPLY_REC_OFF);
+        LASSERT(lustre_rep_swabbed(open_req, DLM_REPLY_REC_OFF));
 
         /* Outgoing messages always in my byte order. */
         LASSERT(body != NULL);
@@ -703,9 +769,10 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                 OBD_ALLOC(mod, sizeof(*mod));
                 if (mod == NULL) {
                         DEBUG_REQ(D_ERROR, open_req,
-                                  "Can't allocate mdc_open_data");
+                                  "Can't allocate md_open_data");
                         RETURN(0);
                 }
+                CFS_INIT_LIST_HEAD(&mod->mod_replay_list);
 
                 spin_lock(&open_req->rq_lock);
                 if (!open_req->rq_replay) {
@@ -717,8 +784,8 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                 och->och_mod = mod;
                 mod->mod_och = och;
                 open_req->rq_cb_data = mod;
-                mod->mod_open_req = open_req;
-                open_req->rq_commit_cb = mdc_commit_open;
+                list_add_tail(&open_req->rq_mod_list, &mod->mod_replay_list);
+                open_req->rq_commit_cb = mdc_commit_delayed;
                 spin_unlock(&open_req->rq_lock);
         }
 
@@ -732,19 +799,19 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                 LBUG();
         }
 
-        DEBUG_REQ(D_HA, open_req, "Set up open replay data");
+        DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
         RETURN(0);
 }
 
 int mdc_clear_open_replay_data(struct obd_export *exp,
                                struct obd_client_handle *och)
 {
-        struct mdc_open_data *mod = och->och_mod;
+        struct md_open_data *mod = och->och_mod;
         ENTRY;
 
         /*
-         * Don't free the structure now (it happens in mdc_commit_open(), after
-         * we're sure we won't need to fix up the close request in the future),
+         * Don't free the structure now (it happens in mdc_commit_delayed(),
+         * after the last request is removed from its replay list),
          * but make sure that replay doesn't poke at the och, which is about to
          * be freed.
          */
@@ -756,40 +823,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
         RETURN(0);
 }
 
-static void mdc_commit_close(struct ptlrpc_request *req)
-{
-        struct mdc_open_data *mod = req->rq_cb_data;
-        struct ptlrpc_request *open_req;
-        struct obd_import *imp = req->rq_import;
-
-        DEBUG_REQ(D_HA, req, "close req committed");
-        if (mod == NULL)
-                return;
-
-        mod->mod_close_req = NULL;
-        req->rq_cb_data = NULL;
-        req->rq_commit_cb = NULL;
-
-        open_req = mod->mod_open_req;
-        LASSERT(open_req != NULL);
-        LASSERT(open_req != LP_POISON);
-        LASSERT(open_req->rq_type != LI_POISON);
-
-        DEBUG_REQ(D_HA, open_req, "open req balanced");
-        LASSERT(open_req->rq_transno != 0);
-        LASSERT(open_req->rq_import == imp);
-
-        /*
-         * We no longer want to preserve this for transno-unconditional
-         * replay. Decref open req here as well.
-         */
-        spin_lock(&open_req->rq_lock);
-        open_req->rq_replay = 0;
-        spin_unlock(&open_req->rq_lock);
-}
-
 int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
-              struct obd_client_handle *och, struct ptlrpc_request **request)
+              struct md_open_data *mod, struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
         int reqsize[4] = { sizeof(struct ptlrpc_body),
@@ -800,7 +835,6 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                            obd->u.cli.cl_max_mds_easize,
                            obd->u.cli.cl_max_mds_cookiesize };
         struct ptlrpc_request *req;
-        struct mdc_open_data *mod;
         int rc;
         ENTRY;
 
@@ -818,26 +852,15 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
         req->rq_request_portal = MDS_READPAGE_PORTAL;
 
         /* Ensure that this close's handle is fixed up during replay. */
-        LASSERT(och != NULL);
-        LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
-        mod = och->och_mod;
-        if (likely(mod != NULL)) {
-                if (mod->mod_open_req->rq_type == LI_POISON) {
-                        CERROR("LBUG POISONED open %p!\n", mod->mod_open_req);
-                        LBUG();
-                        ptlrpc_req_finished(req);
-                        req = NULL;
-                        GOTO(out, rc = -EIO);
-                }
-                mod->mod_close_req = req;
-                DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
-        } else {
+        if (likely(mod != NULL))
+                list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+        else
                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
-        }
 
         mdc_close_pack(req, REQ_REC_OFF, op_data);
         ptlrpc_req_set_repsize(req, 4, repsize);
-        req->rq_commit_cb = mdc_commit_close;
+        req->rq_commit_cb = mdc_commit_delayed;
+        req->rq_replay = 1;
         LASSERT(req->rq_cb_data == NULL);
         req->rq_cb_data = mod;
 
@@ -846,7 +869,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
 
         if (req->rq_repmsg == NULL) {
-                CDEBUG(D_HA, "request failed to send: %p, %d\n", req,
+                CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
                        req->rq_status);
                 if (rc == 0)
                         rc = req->rq_status ? req->rq_status : -EIO;
@@ -859,7 +882,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                                 rc = -rc;
                 } else if (mod == NULL) {
                         if (req->rq_import->imp_replayable) 
-                                CERROR("Unexpected: can't find mdc_open_data," 
+                                CERROR("Unexpected: can't find md_open_data," 
                                        "but close succeeded with replayable imp"
                                        "Please tell CFS.\n");
                 }
@@ -881,7 +904,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 }
 
 int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
-                     struct obd_client_handle *och)
+                     struct md_open_data *mod)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct ptlrpc_request *req;
@@ -900,14 +923,29 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        /* XXX: add DONE_WRITING request to och -- when Size-on-MDS
-         * recovery will be ready. */
         mdc_close_pack(req, REQ_REC_OFF, op_data);
+        
+        req->rq_replay = 1;
+        req->rq_cb_data = mod;
+        req->rq_commit_cb = mdc_commit_delayed;
+        if (likely(mod != NULL))
+                list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+        else
+                CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
 
         ptlrpc_req_set_repsize(req, 2, repsize);
         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
         rc = ptlrpc_queue_wait(req);
         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
+
+        /* Close the open replay sequence if an error occurred or no SOM
+         * attribute update is needed. */
+        if (rc != -EAGAIN)
+                ptlrpc_close_replay_seq(req);
+                
+        if (rc && rc != -EAGAIN && req->rq_commit_cb)
+                req->rq_commit_cb(req);
+
         ptlrpc_req_finished(req);
         RETURN(rc);
 }
@@ -1016,14 +1054,10 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         int rc;
         ENTRY;
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        MOD_INC_USE_COUNT;
-#else
         if (!try_module_get(THIS_MODULE)) {
                 CERROR("Can't get module. Is it alive?");
                 return -EINVAL;
         }
-#endif
         switch (cmd) {
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
@@ -1056,11 +1090,7 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 GOTO(out, rc = -ENOTTY);
         }
 out:
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        MOD_DEC_USE_COUNT;
-#else
         module_put(THIS_MODULE);
-#endif
 
         return rc;
 }
@@ -1271,10 +1301,13 @@ static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid,
 
         OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
         if (handle->och_mod == NULL) {
-                DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
+                DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
                 RETURN(-ENOMEM);
         }
-        handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
+
+        /* will be dropped by unpin */
+        CFS_INIT_LIST_HEAD(&handle->och_mod->mod_replay_list);
+        list_add_tail(&req->rq_mod_list, &handle->och_mod->mod_replay_list);
 
         RETURN(rc);
 }
@@ -1308,7 +1341,14 @@ static int mdc_unpin(struct obd_export *exp,
                 CERROR("unpin failed: %d\n", rc);
 
         ptlrpc_req_finished(req);
-        ptlrpc_req_finished(handle->och_mod->mod_open_req);
+
+        LASSERT(!list_empty(&handle->och_mod->mod_replay_list));
+        req = list_entry(handle->och_mod->mod_replay_list.next,
+                         typeof(*req), rq_mod_list);
+        list_del_init(&req->rq_mod_list);
+        ptlrpc_req_finished(req);
+        LASSERT(list_empty(&handle->och_mod->mod_replay_list));
+
         OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
         RETURN(rc);
 }
@@ -1568,6 +1608,7 @@ static int mdc_cleanup(struct obd_device *obd)
         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
 
+        ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
         ptlrpcd_decref();