Whamcloud - gitweb
- improved handling of errors returned from MDS intent operations.
authoradilger <adilger>
Thu, 17 Oct 2002 09:14:48 +0000 (09:14 +0000)
committeradilger <adilger>
Thu, 17 Oct 2002 09:14:48 +0000 (09:14 +0000)
- remove bogus setting of MDS reply body fields at client in ll_create_node
- always do deferred object creation with ll_create() for non-intent ops

lustre/llite/namei.c
lustre/mdc/mdc_reint.c
lustre/mds/mds_reint.c

index 33dacf6..0fa0fdc 100644 (file)
@@ -150,7 +150,7 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
         struct ll_read_inode2_cookie lic;
         struct lustre_handle lockh;
         struct lookup_intent lookup_it = { IT_LOOKUP };
-        int err, offset;
+        int rc, offset;
         obd_id ino = 0;
 
         ENTRY;
@@ -172,9 +172,9 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
         if (dentry->d_name.len > EXT2_NAME_LEN)
                 RETURN(ERR_PTR(-ENAMETOOLONG));
 
-        err = ll_lock(dir, dentry, it, &lockh);
-        if (err < 0)
-                RETURN(ERR_PTR(err));
+        rc = ll_lock(dir, dentry, it, &lockh);
+        if (rc < 0)
+                RETURN(ERR_PTR(rc));
         memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
 
         request = (struct ptlrpc_request *)it->it_data;
@@ -198,10 +198,10 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                         if (it->it_status)
                                 GOTO(neg_req, NULL);
                 } else if (it->it_op & (IT_RENAME | IT_LINK)) {
-                        /* For rename, we want the lookup to succeed */
+                        /* For rename, we want the source lookup to succeed */
                         if (it->it_status) {
                                 it->it_data = NULL;
-                                GOTO(neg_req, NULL);
+                                GOTO(drop_req, rc = it->it_status);
                         }
                         it->it_data = dentry;
                 } else if (it->it_op & (IT_UNLINK | IT_RMDIR)) {
@@ -215,12 +215,11 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                         it->it_data = NULL;
                         if (it->it_status && it->it_status != -EEXIST)
                                 GOTO(neg_req, NULL);
-                } else if (it->it_op & (IT_RENAME2|IT_LINK2)) {
+                } else if (it->it_op & (IT_RENAME2 | IT_LINK2)) {
                         struct mds_body *body =
                                 lustre_msg_buf(request->rq_repmsg, offset);
                         it->it_data = NULL;
-                        /* For rename2, this means the lookup is negative */
-                        /* For link2 also */
+                        /* This means the target lookup is negative */
                         if (body->valid == 0)
                                 GOTO(neg_req, NULL);
                         goto iget;
@@ -234,11 +233,11 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                 }
                 ptlrpc_req_finished(request);
                 request = NULL;
-                err = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
-                                  valid, symlen, &request);
-                if (err) {
-                        CERROR("failure %d inode %Ld\n", err, (long long)ino);
-                        GOTO(drop_req, err = -abs(err));
+                rc = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
+                                 valid, symlen, &request);
+                if (rc) {
+                        CERROR("failure %d inode "LPX64"\n", rc, ino);
+                        GOTO(drop_req, rc = -abs(rc));
                 }
                 offset = 0;
         } else {
@@ -252,14 +251,14 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                 if (!ino) {
                         CERROR("inode %*s not found by name\n",
                                dentry->d_name.len, dentry->d_name.name);
-                        GOTO(drop_lock, err = -ENOENT);
+                        GOTO(drop_lock, rc = -ENOENT);
                 }
 
-                err = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
-                                  OBD_MD_FLNOTOBD|OBD_MD_FLEASIZE, 0, &request);
-                if (err) {
-                        CERROR("failure %d inode %Ld\n", err, (long long)ino);
-                        GOTO(drop_req, err = -abs(err));
+                rc = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
+                                 OBD_MD_FLNOTOBD|OBD_MD_FLEASIZE, 0, &request);
+                if (rc) {
+                        CERROR("failure %d inode "LPX64"\n", rc, ino);
+                        GOTO(drop_req, rc = -abs(rc));
                 }
         }
 
@@ -303,7 +302,7 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
         ptlrpc_free_req(request);
  drop_lock:
 #warning FIXME: must release lock here
-        return ERR_PTR(err);
+        return ERR_PTR(rc);
 }
 
 static struct inode *ll_create_node(struct inode *dir, const char *name,
@@ -315,58 +314,58 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
         struct inode *inode;
         struct ptlrpc_request *request = NULL;
         struct mds_body *body;
-        int rc;
         time_t time = CURRENT_TIME;
         struct ll_sb_info *sbi = ll_i2sbi(dir);
-        int gid = current->fsgid;
         struct ll_read_inode2_cookie lic;
         struct lov_mds_md *lmm = NULL;
-        int mds_md_size = 0;
-
         ENTRY;
 
-        if (dir->i_mode & S_ISGID) {
-                gid = dir->i_gid;
-                if (S_ISDIR(mode))
-                        mode |= S_ISGID;
-        }
-
-        if (!it || !it->it_disposition) {
-                rc = mdc_create(&sbi->ll_mdc_conn, dir, name, namelen, tgt,
-                                 tgtlen, mode, current->fsuid,
-                                 gid, time, extra, lsm, &request);
+        if (it && it->it_disposition) {
+                int rc = it->it_status;
                 if (rc) {
-                        inode = ERR_PTR(rc);
-                        GOTO(out, rc);
+                        CERROR("error creating MDS inode for %*s: rc = %d\n",
+                               namelen, name, rc);
+                        RETURN(ERR_PTR(rc));
                 }
-                body = lustre_msg_buf(request->rq_repmsg, 0);
-                if (lsm != NULL) {
+                invalidate_inode_pages(dir);
+                request = it->it_data;
+                body = lustre_msg_buf(request->rq_repmsg, 1);
+                lic.lic_lmm = NULL;
+        } else {
+                int gid = current->fsgid;
+                int rc;
+
+                if (lsm) {
                         OBD_ALLOC(lmm, lsm->lsm_mds_easize);
+                        if (!lmm)
+                                RETURN(ERR_PTR(-ENOMEM));
                         lov_packmd(lmm, lsm);
                         lic.lic_lmm = lmm;
                 } else
                         lic.lic_lmm = NULL;
 
-        } else {
-                invalidate_inode_pages(dir);
-                request = it->it_data;
-                body = lustre_msg_buf(request->rq_repmsg, 1);
-                lic.lic_lmm = NULL;
-        }
-
-        body->valid = OBD_MD_FLNOTOBD;
+                if (dir->i_mode & S_ISGID) {
+                        gid = dir->i_gid;
+                        if (S_ISDIR(mode))
+                                mode |= S_ISGID;
+                }
 
-        body->nlink = 1;
-        body->atime = body->ctime = body->mtime = time;
-        body->uid = current->fsuid;
-        body->gid = gid;
-        body->mode = mode;
+                rc = mdc_create(&sbi->ll_mdc_conn, dir, name, namelen, tgt,
+                                tgtlen, mode, current->fsuid, gid,
+                                time, extra, lsm, &request);
+                if (rc) {
+                        inode = ERR_PTR(rc);
+                        GOTO(out, rc);
+                }
+                body = lustre_msg_buf(request->rq_repmsg, 0);
+        }
 
         lic.lic_body = body;
 
+        LASSERT(body->ino != 0);
         inode = iget4(dir->i_sb, body->ino, ll_find_inode, &lic);
         if (IS_ERR(inode)) {
-                rc = PTR_ERR(inode);
+                int rc = PTR_ERR(inode);
                 CERROR("new_inode -fatal: rc %d\n", rc);
                 LBUG();
                 GOTO(out, rc);
@@ -384,8 +383,8 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
 
         EXIT;
  out:
-        if (lmm)
-                OBD_FREE(lmm, mds_md_size);
+        if (lsm && lmm)
+                OBD_FREE(lmm, lsm->lsm_mds_easize);
         ptlrpc_req_finished(request);
         return inode;
 }
@@ -440,59 +439,35 @@ int ll_mdc_rename(struct inode *src, struct inode *tgt,
 }
 
 /*
- * By the time this is called, we already have created
- * the directory cache entry for the new file, but it
- * is so far negative - it has no inode.
+ * By the time this is called, we already have created the directory cache
+ * entry for the new file, but it is so far negative - it has no inode.
+ * We defer creating the OBD object(s) until open, to keep the intent and
+ * non-intent code paths similar, and also because we do not have the MDS
+ * inode number before calling ll_create_node() (which is needed for LOV),
+ * so we would need to do yet another RPC to the MDS to store the LOV EA
+ * data on the MDS.
  *
  * If the create succeeds, we fill in the inode information
  * with d_instantiate().
  */
-
 static int ll_create(struct inode *dir, struct dentry *dentry, int mode)
 {
-        int err, rc = 0;
+        struct lookup_intent *it = dentry->d_it;
         struct inode *inode;
-        struct lov_stripe_md *lsm = NULL;
-        struct ll_inode_info *lli = NULL;
+        int rc = 0;
         ENTRY;
 
         CHECK_MOUNT_EPOCH(dir);
 
-        if (dentry->d_it->it_disposition == 0) {
-                /* FIXME: we set the UID/GID fields to 0 for now, because it
-                 *        fixes a bug on the BA OSTs.  We should really set
-                 *        them properly, and this needs to be revisited when
-                 *        we do proper credentials checking on the OST, and
-                 *        set the attributes on the OST in ll_inode_setattr().
-                 *        See also ll_file_open() and ll_lov_setstripe().
-                gid_t gid = current->fsgid;
-
-                if (dir->i_mode & S_ISGID)
-                        gid = dir->i_gid;
-                rc = ll_create_objects(ll_i2obdconn(dir), 0, current->fsuid,
-                                       gid, &lsm);
-                */
-                rc = ll_create_objects(dir->i_sb, 0, 0, 0, &lsm);
-                CDEBUG(D_DENTRY, "name %*s mode %o: rc = %d\n",
-                       dentry->d_name.len, dentry->d_name.name, mode, rc);
-                if (rc)
-                        GOTO(out_free, rc);
-        }
-
         inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, mode, 0, dentry->d_it, lsm);
+                               NULL, 0, mode, 0, it, NULL);
 
-        if (IS_ERR(inode)) {
-                rc = PTR_ERR(inode);
-                CERROR("error creating MDS object for %*s: rc = %d\n",
-                       dentry->d_name.len, dentry->d_name.name, rc);
-                GOTO(out_destroy, rc);
-        }
+        if (IS_ERR(inode))
+                RETURN(PTR_ERR(inode));
 
-        if (dentry->d_it->it_disposition) {
-                lli = ll_i2info(inode);
-                memcpy(&lli->lli_intent_lock_handle,
-                       dentry->d_it->it_lock_handle,
+        if (it->it_disposition) {
+                struct ll_inode_info *lli = ll_i2info(inode);
+                memcpy(&lli->lli_intent_lock_handle, it->it_lock_handle,
                        sizeof(struct lustre_handle));
                 d_instantiate(dentry, inode);
         } else {
@@ -500,28 +475,7 @@ static int ll_create(struct inode *dir, struct dentry *dentry, int mode)
                 rc = ext2_add_nondir(dentry, inode);
         }
 
-out_free:
         RETURN(rc);
-
-out_destroy:
-        if (lsm) {
-                struct obdo *oa;
-                oa = obdo_alloc();
-                if (oa) {
-                        oa->o_easize = lsm->lsm_mds_easize;
-                        oa->o_id = lsm->lsm_object_id;
-                        oa->o_valid |= OBD_MD_FLID | OBD_MD_FLEASIZE;
-                        err = obd_destroy(ll_i2obdconn(dir), oa, lsm);
-                        obdo_free(oa);
-                } else
-                        err = -ENOMEM;
-                if (err)
-                        CERROR("error uncreating objid "LPX64": err %d\n",
-                               lsm->lsm_object_id, err);
-                OBD_FREE(lsm, ll_ost_easize(dir->i_sb));
-        }
-
-        goto out_free;
 }
 
 static int ll_mknod(struct inode *dir, struct dentry *dentry, int mode,
@@ -586,19 +540,19 @@ static int ll_symlink(struct inode *dir, struct dentry *dentry,
 static int ll_link(struct dentry *old_dentry, struct inode * dir,
                    struct dentry *dentry)
 {
-        int err;
         struct inode *inode = old_dentry->d_inode;
+        struct lookup_intent *it = dentry->d_it;
+        int rc;
 
-        if (dentry->d_it && dentry->d_it->it_disposition) { 
-                int err = dentry->d_it->it_status;
-                if (err) 
-                        RETURN(err);
+        if (it && it->it_disposition) {
+                if (it->it_status)
+                        RETURN(it->it_status);
                 inode->i_ctime = CURRENT_TIME;
                 ext2_inc_count(inode);
                 atomic_inc(&inode->i_count);
                 d_instantiate(dentry, inode);
                 invalidate_inode_pages(dir);
-                RETURN(err);
+                RETURN(0);
         }
 
         if (S_ISDIR(inode->i_mode))
@@ -607,10 +561,10 @@ static int ll_link(struct dentry *old_dentry, struct inode * dir,
         if (inode->i_nlink >= EXT2_LINK_MAX)
                 return -EMLINK;
 
-        err = ll_mdc_link(old_dentry, dir,
+        rc = ll_mdc_link(old_dentry, dir,
                           dentry->d_name.name, dentry->d_name.len);
-        if (err)
-                RETURN(err);
+        if (rc)
+                RETURN(rc);
 
         inode->i_ctime = CURRENT_TIME;
         ext2_inc_count(inode);
@@ -619,7 +573,7 @@ static int ll_link(struct dentry *old_dentry, struct inode * dir,
         return ext2_add_nondir(dentry, inode);
 }
 
-static int ll_mkdir(struct inode * dir, struct dentry * dentry, int mode)
+static int ll_mkdir(struct inode *dir, struct dentry *dentry, int mode)
 {
         struct inode * inode;
         int err = -EMLINK;
@@ -665,20 +619,22 @@ out_dir:
         goto out;
 }
 
-static int ll_common_unlink(struct inode *dir, struct dentry *dentry,
-                            __u32 mode)
+static int ll_common_unlink(struct inode *dir, struct dentry *dentry,__u32 mode)
 {
-        struct inode * inode = dentry->d_inode;
+        struct lookup_intent *it = dentry->d_it;
+        struct inode *inode = dentry->d_inode;
 #if 0
         struct ext2_dir_entry_2 * de;
         struct page * page;
 #endif
-        int err = -ENOENT;
+        int rc;
 
-        if (dentry->d_it && dentry->d_it->it_disposition) {
-                err = dentry->d_it->it_status;
+        if (it && it->it_disposition) {
+                rc = it->it_status;
                 invalidate_inode_pages(dir);
-                GOTO(out, err);
+                if (rc)
+                        GOTO(out, rc);
+                GOTO(out_dec, 0);
         }
 
 #if 0
@@ -686,22 +642,27 @@ static int ll_common_unlink(struct inode *dir, struct dentry *dentry,
         if (!de)
                 goto out;
 #endif
-        err = ll_mdc_unlink(dir, dentry->d_inode, mode,
+        rc = ll_mdc_unlink(dir, dentry->d_inode, mode,
                             dentry->d_name.name, dentry->d_name.len);
-        if (err)
-                goto out;
+        if (rc)
+                GOTO(out, rc);
 
 #if 0
         err = ext2_delete_entry(de, page);
         if (err)
                 goto out;
 #endif
+
+        /* AED: not sure if needed - directory lock revocation should do it
+         * in the case where the client has cached it for non-intent ops.
+         */
         invalidate_inode_pages(dir);
 
         inode->i_ctime = dir->i_ctime;
-out:
+out_dec:
         ext2_dec_count(inode);
-        return err;
+out:
+        return rc;
 }
 
 static int ll_unlink(struct inode *dir, struct dentry *dentry)
@@ -712,42 +673,43 @@ static int ll_unlink(struct inode *dir, struct dentry *dentry)
 static int ll_rmdir(struct inode *dir, struct dentry *dentry)
 {
         struct inode * inode = dentry->d_inode;
-        int err = 0;
+        struct lookup_intent *it = dentry->d_it;
+        int rc = 0;
         ENTRY;
 
-        if (!dentry->d_it || dentry->d_it->it_disposition == 0) {
-                if (!ext2_empty_dir(inode))
-                        RETURN(-ENOTEMPTY);
-                err = ll_common_unlink(dir, dentry, S_IFDIR);
-        } else
-                err = dentry->d_it->it_status;
-        if (err)
-                RETURN(err);
-        inode->i_size = 0;
-        ext2_dec_count(inode);
-        ext2_dec_count(dir);
-        RETURN(err);
+        if ((!it || !it->it_disposition) && !ext2_empty_dir(inode))
+                RETURN(-ENOTEMPTY);
+
+        rc = ll_common_unlink(dir, dentry, S_IFDIR);
+        if (!rc) {
+                inode->i_size = 0;
+                ext2_dec_count(inode);
+                ext2_dec_count(dir);
+        }
+
+        RETURN(rc);
 }
 
 static int ll_rename(struct inode * old_dir, struct dentry * old_dentry,
                      struct inode * new_dir, struct dentry * new_dentry)
 {
+        struct lookup_intent *it = new_dentry->d_it;
         struct inode * old_inode = old_dentry->d_inode;
         struct inode * tgt_inode = new_dentry->d_inode;
         struct page * dir_page = NULL;
         struct ext2_dir_entry_2 * dir_de = NULL;
         struct ext2_dir_entry_2 * old_de;
         struct page * old_page;
-        int err = -ENOENT;
+        int err;
 
-        if (new_dentry->d_it && new_dentry->d_it->it_disposition) { 
+        if (it && it->it_disposition) {
                if (tgt_inode) {
                        tgt_inode->i_ctime = CURRENT_TIME;
                        tgt_inode->i_nlink--;
                }
                 invalidate_inode_pages(old_dir);
                 invalidate_inode_pages(new_dir);
-                GOTO(out, err = new_dentry->d_it->it_status);
+                GOTO(out, err = it->it_status);
         }
 
         err = ll_mdc_rename(old_dir, new_dir, old_dentry, new_dentry);
index 4267d3c..324de95 100644 (file)
@@ -79,23 +79,12 @@ int mdc_create(struct lustre_handle *conn,
                __u32 gid, __u64 time, __u64 rdev, struct lov_stripe_md *lsm,
                struct ptlrpc_request **request)
 {
-        struct mds_rec_create *rec;
         struct ptlrpc_request *req;
         int rc, size[3] = {sizeof(struct mds_rec_create), namelen + 1, 0};
-        char *tmp;
         int level, bufcount = 2;
         ENTRY;
 
-        if (S_ISREG(mode)) {
-                if (!lsm) {
-                        CERROR("File create, but no stripe md (%ld, %*s)\n",
-                               dir->i_ino, namelen, name);
-                        LBUG();
-                }
-                // size[2] = mdc->cl_max_mds_easize; soon...
-                size[2] = lsm->lsm_mds_easize;
-                bufcount = 3;
-        } else if (S_ISLNK(mode)) {
+        if (S_ISLNK(mode)) {
                 size[2] = tgtlen + 1;
                 bufcount = 3;
         }
@@ -105,17 +94,10 @@ int mdc_create(struct lustre_handle *conn,
         if (!req)
                 RETURN(-ENOMEM);
 
-        /* mds_create_pack fills msg->bufs[1] with name */
-        rec = lustre_msg_buf(req->rq_reqmsg, 0);
+        /* mds_create_pack fills msg->bufs[1] with name
+         * and msg->bufs[2] with tgt, for symlinks */
         mds_create_pack(req, 0, dir, mode, rdev, uid, gid, time,
-                        name, namelen, NULL, 0);
-
-        if (S_ISREG(mode)) {
-                lov_packmd(lustre_msg_buf(req->rq_reqmsg, 2), lsm);
-        } else if (S_ISLNK(mode)) {
-                tmp = lustre_msg_buf(req->rq_reqmsg, 2);
-                LOGL0(tgt, tgtlen, tmp);
-        }
+                        name, namelen, tgt, tgtlen);
 
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
@@ -124,11 +106,12 @@ int mdc_create(struct lustre_handle *conn,
  resend:
         rc = mdc_reint(req, level);
         if (rc == -ERESTARTSYS) {
-                __u32 *opcode = lustre_msg_buf(req->rq_reqmsg, 0);
+                struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 0);
+
                 level = LUSTRE_CONN_RECOVD;
                 CERROR("Lost reply: re-create rep.\n");
                 req->rq_flags = 0;
-                *opcode = NTOH__u32(REINT_RECREATE);
+                rec->cr_opcode = NTOH__u32(REINT_RECREATE);
                 goto resend;
         }
 
index 381a67a..f00e93f 100644 (file)
@@ -225,8 +225,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         if (offset)
                 offset = 1;
 
-        if (strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds") != 0)
-                LBUG();
+        LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
 
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;