Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / mds / mds_reint.c
index 8dc5409..57f7c99 100644 (file)
@@ -38,6 +38,7 @@
 #include <linux/lustre_idl.h>
 #include <linux/lustre_mds.h>
 #include <linux/lustre_dlm.h>
+#include <linux/lustre_log.h>
 #include <linux/lustre_fsfilt.h>
 
 #include "mds_internal.h"
@@ -80,9 +81,9 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                 ///* XXX 0 normally, SENDNOW for debug */);
                 ctxt = llog_get_context(&obd->obd_llogs,
                                         mlcd->mlcd_cookies[0].lgc_subsys + 1);
-                rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen /
-                                                sizeof(*mlcd->mlcd_cookies),
-                                 mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
+                rc = llog_cancel(ctxt, mlcd->mlcd_cookielen /
+                                 sizeof(*mlcd->mlcd_cookies),
+                                 mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW, lsm);
                 if (rc)
                         CERROR("error cancelling %d log cookies: rc %d\n",
                                (int)(mlcd->mlcd_cookielen /
@@ -147,8 +148,10 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
         mcd->mcd_last_result = cpu_to_le32(rc);
         mcd->mcd_last_data = cpu_to_le32(op_data);
 
-        fsfilt_add_journal_cb(req->rq_export->exp_obd, transno, handle,
-                              mds_commit_cb, NULL);
+
+        fsfilt_add_journal_cb(req->rq_export->exp_obd, mds->mds_sb,
+                              transno, handle, mds_commit_cb, NULL);
+        
         err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
                                   &off, 0);
 
@@ -210,7 +213,7 @@ int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
 
         /* times */
         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
-                if (rec->ur_fsuid != inode->i_uid &&
+                if (rec->_ur_fsuid != inode->i_uid &&
                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
                         RETURN(error);
         }
@@ -279,6 +282,7 @@ void mds_steal_ack_locks(struct ptlrpc_request *req)
         struct ptlrpc_reply_state *oldrep;
         struct ptlrpc_service     *svc;
         unsigned long              flags;
+        char                       str[PTL_NALFMT_SIZE];
         int                        i;
 
         /* CAVEAT EMPTOR: spinlock order */
@@ -300,13 +304,13 @@ void mds_steal_ack_locks(struct ptlrpc_request *req)
                 list_del_init (&oldrep->rs_exp_list);
 
                 CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
-                      " o%d NID"LPX64"\n",
-                      oldrep->rs_nlocks, oldrep, 
+                      " o%d NID %s\n",
+                      oldrep->rs_nlocks, oldrep,
                       oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
-                      exp->exp_connection->c_peer.peer_nid);
+                      ptlrpc_peernid2str(&exp->exp_connection->c_peer, str));
 
                 for (i = 0; i < oldrep->rs_nlocks; i++)
-                        ptlrpc_save_lock(req, 
+                        ptlrpc_save_lock(req,
                                          &oldrep->rs_locks[i],
                                          oldrep->rs_modes[i]);
                 oldrep->rs_nlocks = 0;
@@ -376,7 +380,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         struct mds_body *body;
         struct dentry *de;
         struct inode *inode = NULL;
-        struct lustre_handle lockh[2];
+        struct lustre_handle lockh[2] = {{0}, {0}};
         void *handle = NULL;
         struct mds_logcancel_data *mlcd = NULL;
         int rc = 0, cleanup_phase = 0, err, locked = 0;
@@ -408,7 +412,8 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 
         inode = de->d_inode;
         LASSERT(inode);
-        if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
+        if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
+            rec->ur_eadata != NULL)
                 down(&inode->i_sem);
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
@@ -431,9 +436,25 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         else                                            /* setattr */
                 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
 
-        if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
-                rc = fsfilt_set_md(obd, inode, handle,
-                                   rec->ur_eadata, rec->ur_eadatalen);
+        if (rc == 0 && (S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
+            rec->ur_eadata != NULL) {
+                struct lov_stripe_md *lsm = NULL;
+
+                rc = ll_permission(inode, MAY_WRITE, NULL);
+                if (rc < 0)
+                        GOTO(cleanup, rc);
+
+                rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
+                                   mds->mds_osc_exp, 0, &lsm, rec->ur_eadata);
+                if (rc)
+                        GOTO(cleanup, rc);
+
+                obd_free_memmd(mds->mds_osc_exp, &lsm);
+
+                rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
+                                   rec->ur_eadatalen);
+                if (rc)
+                        GOTO(cleanup, rc);
         }
 
         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
@@ -469,12 +490,13 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         EXIT;
  cleanup:
         if (mlcd != NULL)
-                fsfilt_add_journal_cb(req->rq_export->exp_obd, 0, handle,
-                                      mds_cancel_cookies_cb, mlcd);
+                fsfilt_add_journal_cb(req->rq_export->exp_obd, mds->mds_sb, 0,
+                                      handle, mds_cancel_cookies_cb, mlcd);
         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
         switch (cleanup_phase) {
         case 1:
-                if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
+                if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
+                    rec->ur_eadata != NULL)
                         up(&inode->i_sem);
                 l_dput(de);
                 if (locked) {
@@ -534,7 +556,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         struct dentry *dchild = NULL;
         struct inode *dir = NULL;
         void *handle = NULL;
-        struct lustre_handle lockh[2];
+        struct lustre_handle lockh[2] = {{0}, {0}};
         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
         int created = 0;
         struct dentry_params dp;
@@ -570,6 +592,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
 
         /* try to retrieve MEA data for this dir */
         rc = mds_get_lmv_attr(obd, dparent->d_inode, &mea, &mea_size);
+        if (rc)
+                GOTO(cleanup, rc);
+
         if (mea != NULL) {
                 /* dir is already splitted, check is requested filename
                  * should live at this MDS or at another one */
@@ -631,7 +656,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                  * over the whole cluster in order to distribute namespace
                  * processing load. first, we calculate which MDS to use to
                  * put new directory's inode in */
-                i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1);
+                i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1, 
+                                      rec->ur_flags);
                 if (i == mds->mds_num) {
                         /* inode will be created locally */
 
@@ -675,12 +701,17 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         }
 
                         rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
-                       LASSERT(rc == 0);
+                        if (rc) {
+                               obdo_free(oa);
+                                GOTO(cleanup, rc);
+                        }
                         
                         /* now, add new dir entry for it */
                         handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
-                        if (IS_ERR(handle))
+                        if (IS_ERR(handle)) {
+                               obdo_free(oa);
                                 GOTO(cleanup, rc = PTR_ERR(handle));
+                        }
                         rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name,
                                                   rec->ur_namelen - 1,
                                                   oa->o_id, oa->o_generation,
@@ -747,11 +778,11 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 LTIME_S(iattr.ia_atime) = rec->ur_time;
                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
-                iattr.ia_uid = rec->ur_fsuid;
+                iattr.ia_uid = rec->_ur_fsuid;
                 if (dir->i_mode & S_ISGID)
                         iattr.ia_gid = dir->i_gid;
                 else
-                        iattr.ia_gid = rec->ur_fsgid;
+                        iattr.ia_gid = rec->_ur_fsgid;
                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
                         ATTR_MTIME | ATTR_CTIME;
 
@@ -1170,9 +1201,11 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 
         /* Step 1: Lookup parent */
         *dparentp = mds_fid2dentry(mds, fid, NULL);
-        if (IS_ERR(*dparentp))
-                RETURN(rc = PTR_ERR(*dparentp));
-        LASSERT((*dparentp)->d_inode);
+        if (IS_ERR(*dparentp)) {
+                rc = PTR_ERR(*dparentp);
+                *dparentp = NULL;
+                RETURN(rc);
+        }
 
         CDEBUG(D_INODE, "parent ino %lu, name %s\n",
                (*dparentp)->d_inode->i_ino, name);
@@ -1304,7 +1337,7 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         struct dentry *new_child = NULL;
         char *fidname = rec->ur_name;
         struct dentry *child = NULL;
-        struct lustre_handle lockh;
+        struct lustre_handle lockh[2] = {{0}, {0}};
         void *handle;
         ENTRY;
 
@@ -1338,7 +1371,7 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         d_drop(new_child);
 
         child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX,
-                                      &lockh, NULL, 0, MDS_INODELOCK_UPDATE);
+                                      lockh, NULL, 0, MDS_INODELOCK_UPDATE);
         if (IS_ERR(child)) {
                 CERROR("can't get victim\n");
                 GOTO(cleanup, rc = PTR_ERR(child));
@@ -1372,7 +1405,7 @@ int mds_create_local_dentry(struct mds_update_record *rec,
 cleanup:
         switch(cleanup_phase) {
                 case 2:
-                        ldlm_lock_decref(&lockh, LCK_EX);
+                        ldlm_lock_decref(lockh, LCK_EX);
                         dput(child);
                 case 1:
                         dput(new_child);
@@ -1404,7 +1437,8 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
         memcpy(body2, body, sizeof(*body));
         body2->valid &= ~OBD_MD_FLCOOKIE;
 
-        if (!(body->valid & OBD_MD_FLEASIZE))
+        if (!(body->valid & OBD_MD_FLEASIZE) &&
+            !(body->valid & OBD_MD_FLDIREA))
                 RETURN(0);
 
         if (body->eadatasize == 0) {
@@ -1498,7 +1532,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_body *body = NULL;
         struct inode *child_inode;
-        struct lustre_handle parent_lockh[2], child_lockh, child_reuse_lockh;
+        struct lustre_handle parent_lockh[2] = {{0}, {0}}; 
+        struct lustre_handle child_lockh = {0}, child_reuse_lockh = {0};
         char fidname[LL_FID_NAMELEN];
         void *handle = NULL;
         int rc = 0, log_unlink = 0, cleanup_phase = 0;
@@ -1602,13 +1637,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         GOTO(cleanup, rc = -EISDIR);
         }
 
-        if (child_inode->i_nlink == (S_ISDIR(child_inode->i_mode) ? 2 : 1) &&
-            mds_open_orphan_count(child_inode) > 0) {
-                rc = mds_open_unlink_rename(rec, obd, dparent, dchild, &handle);
-                cleanup_phase = 4; /* transaction */
-                GOTO(cleanup, rc);
-        }
-
         /* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */
         switch (child_inode->i_mode & S_IFMT) {
         case S_IFDIR:
@@ -1670,8 +1698,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 int err;
 
                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
-                LTIME_S(iattr.ia_mtime) = rec->ur_time;
-                LTIME_S(iattr.ia_ctime) = rec->ur_time;
+                LTIME_S(iattr.ia_mtime) = LTIME_S(rec->ur_time);
+                LTIME_S(iattr.ia_ctime) = LTIME_S(rec->ur_time);
 
                 err = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
                 if (err)
@@ -1680,6 +1708,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         switch(cleanup_phase) {
         case 4:
+                LASSERT(dchild != NULL && dchild->d_inode != NULL);
+                LASSERT(atomic_read(&dchild->d_inode->i_count) > 0);
+                if (rc == 0 && dchild->d_inode->i_nlink == 0 &&
+                                mds_open_orphan_count(dchild->d_inode) > 0) {
+                        /* filesystem is really going to destroy an inode
+                         * we have to delay this till inode is opened -bzzz */
+                        mds_open_unlink_rename(rec, obd, dparent, dchild, NULL);
+                }
                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
                                         rc, 0);
                 if (!rc)
@@ -1727,7 +1763,7 @@ static int mds_reint_link_acquire(struct mds_update_record *rec,
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct ldlm_res_id src_res_id = { .name = {0} };
-        struct lustre_handle *handle = NULL, src_lockh;
+        struct lustre_handle *handle = NULL, src_lockh = {0};
         struct mds_obd *mds = mds_req2mds(req);
         int rc = 0, cleanup_phase = 0;
         struct dentry *de_src = NULL;
@@ -1800,7 +1836,7 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
                                     int offset, struct ptlrpc_request *req,
                                     struct lustre_handle *lh)
 {
-        struct lustre_handle *handle = NULL, tgt_dir_lockh[2];
+        struct lustre_handle *handle = NULL, tgt_dir_lockh[2] = {{0}, {0}};
         struct obd_device *obd = req->rq_export->exp_obd;
         struct dentry *de_tgt_dir = NULL;
         struct mds_obd *mds = mds_req2mds(req);
@@ -1831,7 +1867,9 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
         op_data.namelen = 0;
         op_data.name = NULL;
         rc = md_link(mds->mds_lmv_exp, &op_data, &request);
-        LASSERT(rc == 0);
+        if (rc)
+                GOTO(cleanup, rc);
+
         cleanup_phase = 2;
         if (request)
                 ptlrpc_req_finished(request);
@@ -1892,7 +1930,8 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         struct dentry *de_tgt_dir = NULL;
         struct dentry *dchild = NULL;
         struct mds_obd *mds = mds_req2mds(req);
-        struct lustre_handle *handle = NULL, tgt_dir_lockh[2], src_lockh;
+        struct lustre_handle *handle = NULL;
+        struct lustre_handle tgt_dir_lockh[2] = {{0}, {0}}, src_lockh = {0};
         struct ldlm_res_id src_res_id = { .name = {0} };
         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
         ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}};
@@ -1909,7 +1948,8 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
-
+        
+//      memset(tgt_dir_lockh, 0, 2*sizeof(tgt_dir_lockh[0]));
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                 GOTO(cleanup, rc = -ENOENT);
 
@@ -1931,8 +1971,11 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         cleanup_phase = 1; /* source dentry */
 
         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
-        if (IS_ERR(de_tgt_dir))
-                GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
+        if (IS_ERR(de_tgt_dir)) {
+                rc = PTR_ERR(de_tgt_dir);
+                de_tgt_dir = NULL;
+                GOTO(cleanup, rc);
+        }
 
         cleanup_phase = 2; /* target directory dentry */
 
@@ -2157,8 +2200,11 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
                 *de_tgtdirp = dget(*de_srcdirp);
         } else {
                 *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
-                if (IS_ERR(*de_tgtdirp))
-                        GOTO(cleanup, rc = PTR_ERR(*de_tgtdirp));
+                if (IS_ERR(*de_tgtdirp)) {
+                        rc = PTR_ERR(*de_tgtdirp);
+                        *de_tgtdirp = NULL;
+                        GOTO(cleanup, rc);
+                }
         }
 
         cleanup_phase = 2; /* target directory dentry */
@@ -2326,8 +2372,8 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
         struct dentry *de_srcdir = NULL;
         struct dentry *de_new = NULL;
         struct mds_obd *mds = mds_req2mds(req);
-        struct lustre_handle parent_lockh[2];
-        struct lustre_handle child_lockh;
+        struct lustre_handle parent_lockh[2] = {{0}, {0}};
+        struct lustre_handle child_lockh = {0};
         int cleanup_phase = 0;
         void *handle = NULL;
         int rc = 0;
@@ -2428,10 +2474,9 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         struct dentry *de_srcdir = NULL;
         struct dentry *de_old = NULL;
         struct mds_obd *mds = mds_req2mds(req);
-        struct lustre_handle parent_lockh[2];
-        struct lustre_handle child_lockh;
+        struct lustre_handle parent_lockh[2] = {{0}, {0}};
+        struct lustre_handle child_lockh = {0};
         struct mdc_op_data opdata;
-        int cleanup_phase = 0;
         void *handle = NULL;
         int rc = 0;
         ENTRY;
@@ -2518,7 +2563,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         struct dentry *de_old = NULL;
         struct dentry *de_new = NULL;
         struct mds_obd *mds = mds_req2mds(req);
-        struct lustre_handle dlm_handles[7];
+        struct lustre_handle dlm_handles[7] = {{0},{0},{0},{0},{0},{0},{0}};
         struct mds_body *body = NULL;
         int rc = 0, lock_count = 3;
         int cleanup_phase = 0;
@@ -2568,9 +2613,8 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
              de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
                 GOTO(cleanup, rc = -EINVAL);
 
-        if (de_old->d_inode == de_new->d_inode) {
+        if (de_old->d_inode == de_new->d_inode)
                 GOTO(cleanup, rc = 0);
-        }
 
         /* if we are about to remove the target at first, pass the EA of
          * that inode to client to perform and cleanup on OST */
@@ -2672,16 +2716,16 @@ int mds_reint_rec(struct mds_update_record *rec, int offset,
                   struct ptlrpc_request *req, struct lustre_handle *lockh)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         int rc;
 
         /* checked by unpacker */
         LASSERT(rec->ur_opcode <= REINT_MAX &&
                 reinters[rec->ur_opcode] != NULL);
 
-        push_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, &rec->ur_uc);
         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
-        pop_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &rec->ur_uc);
 
         return rc;
 }