Whamcloud - gitweb
b=3920
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
index 84a4090..c11328c 100644 (file)
@@ -60,6 +60,7 @@ int mds_open_unlink_rename(struct mds_update_record *rec,
         struct dentry *pending_child;
         char fidname[LL_FID_NAMELEN];
         int fidlen = 0, rc;
+        unsigned mode;
         ENTRY;
 
         LASSERT(!mds_inode_is_orphan(dchild->d_inode));
@@ -68,9 +69,8 @@ int mds_open_unlink_rename(struct mds_update_record *rec,
         fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
                             dchild->d_inode->i_generation);
 
-        CWARN("pending destroy of %dx open file %s = %s\n",
-              mds_open_orphan_count(dchild->d_inode),
-              rec->ur_name, fidname);
+        CDEBUG(D_HA, "pending destroy of %dx open file %s = %s\n",
+               mds_open_orphan_count(dchild->d_inode), rec->ur_name, fidname);
 
         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
         if (IS_ERR(pending_child))
@@ -82,18 +82,26 @@ int mds_open_unlink_rename(struct mds_update_record *rec,
                 GOTO(out_dput, rc = 0);
         }
 
-        *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
-        if (IS_ERR(*handle))
-                GOTO(out_dput, rc = PTR_ERR(*handle));
-
-        lock_kernel();
-        rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
-        unlock_kernel();
+        /* link() is semantically wrong for S_IFDIR, so we temporarily set
+         * S_IFREG for linking and restore the real mode afterwards -bzzz */
+        mode = dchild->d_inode->i_mode;
+        dchild->d_inode->i_mode = S_IFREG;
+        rc = vfs_link(dchild, pending_dir, pending_child);
         if (rc)
-                CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
-                       dparent->d_inode->i_ino, rec->ur_name, rc);
+                CERROR("error linking orphan %s to PENDING: rc = %d\n",
+                       rec->ur_name, rc);
         else
                 mds_inode_set_orphan(dchild->d_inode);
+
+        /* restore the mode and fix up i_nlink if the inode is a directory */
+        LASSERT(dchild->d_inode->i_nlink == 1);
+        dchild->d_inode->i_mode = mode;
+        if ((mode & S_IFMT) == S_IFDIR) {
+                dchild->d_inode->i_nlink++;
+                pending_dir->i_nlink++;
+        }
+        mark_inode_dirty(dchild->d_inode);
+
 out_dput:
         dput(pending_child);
 out_lock:
@@ -101,29 +109,23 @@ out_lock:
         RETURN(rc);
 }
 
-static int mds_osc_destroy_orphan(struct mds_obd *mds, 
-                                  struct ptlrpc_request *request)
+static int mds_osc_destroy_orphan(struct mds_obd *mds,
+                                  struct inode *inode,
+                                  struct lov_mds_md *lmm,
+                                  int lmm_size,
+                                  struct llog_cookie *logcookies,
+                                  int log_unlink)
 {
-        struct mds_body *body;
-        struct lov_mds_md *lmm = NULL;
         struct lov_stripe_md *lsm = NULL;
         struct obd_trans_info oti = { 0 };
         struct obdo *oa;
         int rc;
         ENTRY;
 
-        body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
-        if (!(body->valid & OBD_MD_FLEASIZE))
+        if (lmm_size == 0)
                 RETURN(0);
-        if (body->eadatasize == 0) {
-                CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
-                RETURN(rc = -EPROTO); 
-        }
 
-        lmm = lustre_msg_buf(request->rq_repmsg, 1, body->eadatasize);
-        LASSERT(lmm != NULL);
-
-        rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize);
+        rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
         if (rc < 0) {
                 CERROR("Error unpack md %p\n", lmm);
                 RETURN(rc);
@@ -136,24 +138,19 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
         if (oa == NULL)
                 GOTO(out_free_memmd, rc = -ENOMEM);
         oa->o_id = lsm->lsm_object_id;
-        oa->o_mode = body->mode & S_IFMT;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
+        oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+        oa->o_mode = inode->i_mode & S_IFMT;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
 
-        if (body->valid & OBD_MD_FLCOOKIE) {
+        if (log_unlink && logcookies) {
                 oa->o_valid |= OBD_MD_FLCOOKIE;
-                oti.oti_logcookies = 
-                        lustre_msg_buf(request->rq_repmsg, 2,
-                                       sizeof(struct llog_cookie) *
-                                       lsm->lsm_stripe_count);
-                if (oti.oti_logcookies == NULL)
-                        oa->o_valid &= ~OBD_MD_FLCOOKIE;
-                        body->valid &= ~OBD_MD_FLCOOKIE;
+                oti.oti_logcookies = logcookies;
         }
 
         rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
         obdo_free(oa);
-        if (rc) 
-                CERROR("destroy orphan objid 0x"LPX64" on ost error "
+        if (rc)
+                CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
                        "%d\n", lsm->lsm_object_id, rc);
 out_free_memmd:
         obd_free_memmd(mds->mds_osc_exp, &lsm);
@@ -164,76 +161,96 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
                              struct inode *inode, struct inode *pending_dir)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct mds_body *body;
+        struct lov_mds_md *lmm = NULL;
+        struct llog_cookie *logcookies = NULL;
+        int lmm_size = 0, log_unlink = 0;
         void *handle = NULL;
-        struct ptlrpc_request *req;
-        int lengths[3] = {sizeof(struct mds_body),
-                          mds->mds_max_mdsize,
-                          mds->mds_max_cookiesize};
-        int rc;
+        int rc, err;
         ENTRY;
 
         LASSERT(mds->mds_osc_obd != NULL);
-        OBD_ALLOC(req, sizeof(*req));
-        if (!req) {
-                CERROR("request allocation out of memory\n");
-                GOTO(err_alloc_req, rc = -ENOMEM);
-        }
-        rc = lustre_pack_reply(req, 3, lengths, NULL);
-        if (rc) {
-                CERROR("cannot pack request %d\n", rc);
-                GOTO(out_free_req, rc);
-        }
-        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
-        LASSERT(body != NULL);
 
-        mds_pack_inode2body(body, inode);
-        mds_pack_md(obd, req->rq_repmsg, 1, body, inode, 1);
+        OBD_ALLOC(lmm, mds->mds_max_mdsize);
+        if (lmm == NULL)
+                RETURN(-ENOMEM);
+
+        down(&inode->i_sem);
+        rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
+        up(&inode->i_sem);
+
+        if (rc < 0) {
+                CERROR("Error %d reading eadata for ino %lu\n",
+                       rc, inode->i_ino);
+                GOTO(out_free_lmm, rc);
+        } else if (rc > 0) {
+                lmm_size = rc;
+                rc = mds_convert_lov_ea(obd, inode, lmm, lmm_size);
+                if (rc > 0)
+                        lmm_size = rc;
+                rc = 0;
+        }
 
-        handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK_LOG, NULL);
+        handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
+                                  le32_to_cpu(lmm->lmm_stripe_count));
         if (IS_ERR(handle)) {
                 rc = PTR_ERR(handle);
                 CERROR("error fsfilt_start: %d\n", rc);
                 handle = NULL;
-                GOTO(out_free_msg, rc);
+                GOTO(out_free_lmm, rc);
+        }
+
+        down(&inode->i_sem);
+        rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
+        up(&inode->i_sem);
+
+        if (rc < 0) {
+                CERROR("Error %d reading eadata for ino %lu\n",
+                       rc, inode->i_ino);
+                GOTO(out_free_lmm, rc);
+        } else if (rc > 0) {
+                lmm_size = rc;
+                rc = 0;
         }
 
-        if (S_ISDIR(inode->i_mode)) {
+        if (S_ISDIR(inode->i_mode))
                 rc = vfs_rmdir(pending_dir, dchild);
-        } else {
+        else
                 rc = vfs_unlink(pending_dir, dchild);
-        }
-        if (rc) 
+
+        if (rc)
                 CERROR("error %d unlinking orphan %*s from PENDING directory\n",
                        rc, dchild->d_name.len, dchild->d_name.name);
 
-        if ((body->valid & OBD_MD_FLEASIZE)) {
-                if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0)
-                        body->valid |= OBD_MD_FLCOOKIE;
+        if (!rc && lmm_size) {
+                OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
+                if (logcookies == NULL)
+                        rc = -ENOMEM;
+                else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
+                                           mds->mds_max_cookiesize) > 0)
+                        log_unlink = 1;
         }
-
-        if (handle) {
-                int err = fsfilt_commit(obd, pending_dir, handle, 0);
-                if (err) {
-                        CERROR("error committing orphan unlink: %d\n", err);
+        err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
+        if (err) {
+                CERROR("error committing orphan unlink: %d\n", err);
+                if (!rc)
                         rc = err;
-                        GOTO(out_free_msg, rc);
-                }
         }
-        rc = mds_osc_destroy_orphan(mds, req);
-out_free_msg:
-        OBD_FREE(req->rq_repmsg, req->rq_replen);
-        req->rq_repmsg = NULL;
-out_free_req:
-        OBD_FREE(req, sizeof(*req));
-err_alloc_req:
+        if (!rc) {
+                rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
+                                            logcookies, log_unlink);
+        }
+
+        if (logcookies != NULL)
+                OBD_FREE(logcookies, mds->mds_max_cookiesize);
+out_free_lmm:
+        OBD_FREE(lmm, mds->mds_max_mdsize);
         RETURN(rc);
 }
 
 int mds_cleanup_orphans(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct file *file;
         struct dentry *dchild, *dentry;
         struct vfsmount *mnt;
@@ -245,7 +262,7 @@ int mds_cleanup_orphans(struct obd_device *obd)
         int rc = 0, item = 0, namlen;
         ENTRY;
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         dentry = dget(mds->mds_pending_dir);
         if (IS_ERR(dentry))
                 GOTO(err_pop, rc = PTR_ERR(dentry));
@@ -275,7 +292,7 @@ int mds_cleanup_orphans(struct obd_device *obd)
 
                 CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
                        i, d_name);
-                
+
                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
                     ((namlen == 2) && !strcmp(d_name, ".."))) {
                         continue;
@@ -304,7 +321,7 @@ int mds_cleanup_orphans(struct obd_device *obd)
                         item ++;
                         CWARN("removed orphan %s from MDS and OST\n", d_name);
                 } else {
-                        CERROR("removed orphan %s from MDS and OST failed,"
+                        CDEBUG(D_INODE, "removed orphan %s from MDS/OST failed,"
                                " rc = %d\n", d_name, rc);
                         rc = 0;
                 }
@@ -318,7 +335,7 @@ err_out:
                 OBD_FREE(dirent, sizeof(*dirent));
         }
 err_pop:
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc == 0)
                 rc = item;
         RETURN(rc);