Whamcloud - gitweb
b=17310
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 8a0a9f0..d42e20f 100644 (file)
@@ -1,33 +1,47 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  linux/mdt/mdt_reint.c
- *  Lustre Metadata Target (mdt) reintegration routines
+ * GPL HEADER START
  *
- *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Andreas Dilger <adilger@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
- *   Author: Huang Hua <huanghua@clusterfs.com>
- *   Author: Yury Umanets <umka@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdt/mdt_reint.c
+ *
+ * Lustre Metadata Target (mdt) reintegration routines
+ *
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Huang Hua <huanghua@clusterfs.com>
+ * Author: Yury Umanets <umka@clusterfs.com>
  */
 
 #ifndef EXPORT_SYMTAB
 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
                                      struct md_attr *ma)
 {
-        ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+        ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
+        ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
                                                &RMF_MDT_MD, RCL_SERVER);
 
-        ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
+        ma->ma_cookie = req_capsule_server_get(info->mti_pill,
                                                &RMF_LOGCOOKIES);
-        ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
+        ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
                                                   &RMF_LOGCOOKIES,
                                                   RCL_SERVER);
 
@@ -69,7 +83,7 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa) {
                 struct lustre_capa *capa;
 
-                capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+                capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
                 LASSERT(capa);
                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
@@ -77,8 +91,6 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
                 if (rc == 0)
                         repbody->valid |= OBD_MD_FLMDSCAPA;
         }
-        if (!(repbody->valid & OBD_MD_FLMDSCAPA))
-                lustre_shrink_reply(mdt_info_req(info), REPLY_REC_OFF+1, 0, 1);
 
         RETURN(rc);
 }
@@ -99,7 +111,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
 
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
@@ -116,7 +128,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 ma->ma_need = MA_INODE;
                 ma->ma_valid = 0;
                 /* capa for cross-ref will be stored here */
-                ma->ma_capa = req_capsule_server_get(&info->mti_pill,
+                ma->ma_capa = req_capsule_server_get(info->mti_pill,
                                                      &RMF_CAPA1);
                 LASSERT(ma->ma_capa);
 
@@ -166,7 +178,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
         DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
                   PFID(info->mti_rr.rr_fid2));
 
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
         if (!IS_ERR(o)) {
@@ -296,8 +308,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
         if (info->mti_dlm_req)
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
-        
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+
+        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(mo))
                 GOTO(out, rc = PTR_ERR(mo));
@@ -334,6 +346,9 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
         }
 
+        if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE))
+                ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
+
         rc = mdt_attr_set(info, mo, rr->rr_flags);
         if (rc)
                 GOTO(out_put, rc);
@@ -377,7 +392,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
             (ma->ma_attr.la_valid & LA_SIZE)) {
                 struct lustre_capa *capa;
 
-                capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+                capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
                 LASSERT(capa);
                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
@@ -390,6 +405,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 out_put:
         mdt_object_put(info->mti_env, mo);
 out:
+        mdt_shrink_reply(info);
         return rc;
 }
 
@@ -400,7 +416,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
-                GOTO(out, rc = err_serious(-ESTALE));
+                RETURN(err_serious(-ESTALE));
 
         if (info->mti_dlm_req)
                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
@@ -427,9 +443,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         default:
                 rc = err_serious(-EOPNOTSUPP);
         }
-        EXIT;
-out:
-        return rc;
+        RETURN(rc);
 }
 
 static int mdt_reint_unlink(struct mdt_thread_info *info,
@@ -454,18 +468,28 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
-                GOTO(out, rc = err_serious(-ENOENT));
+                RETURN(err_serious(-ENOENT));
 
-        /* step 1: lock the parent */
+        /*
+         * step 1: lock the parent. Note, this may be child in case of
+         * remote operation denoted by ->mti_cross_ref flag.
+         */
         parent_lh = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
-                          rr->rr_namelen);
-
+        if (info->mti_cross_ref) {
+                /*
+                 * Init reg lock for cross ref case when we need to do only
+                 * ref del locally.
+                 */
+                mdt_lock_reg_init(parent_lh, LCK_PW);
+        } else {
+                mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+                                  rr->rr_namelen);
+        }
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
                                   MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp)) {
                 rc = PTR_ERR(mp);
-                /* errors are possible here in cross-ref cases, see below */ 
+                /* errors are possible here in cross-ref cases, see below */
                 if (info->mti_cross_ref)
                         rc = 0;
                 GOTO(out, rc);
@@ -486,7 +510,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
                         rc = mo_ref_del(info->mti_env,
                                         mdt_object_child(mp), ma);
-                        mdt_handle_last_unlink(info, mp, ma);
+                        if (rc == 0)
+                                mdt_handle_last_unlink(info, mp, ma);
                 } else
                         rc = 0;
                 GOTO(out_unlock_parent, rc);
@@ -532,7 +557,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, parent_lh, rc);
 out:
-        mdt_shrink_reply(info);
         return rc;
 }
 
@@ -554,7 +578,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
-                GOTO(out, rc = err_serious(-ENOENT));
+                RETURN(err_serious(-ENOENT));
 
         if (info->mti_dlm_req)
                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
@@ -566,18 +590,18 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
                                           MDS_INODELOCK_UPDATE);
                 if (IS_ERR(ms))
-                        GOTO(out, rc = PTR_ERR(ms));
+                        RETURN(PTR_ERR(ms));
 
                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
                 mdt_object_unlock_put(info, ms, lhs, rc);
-                GOTO(out, rc);
+                RETURN(rc);
         }
 
         /* Invalid case so return error immediately instead of
          * processing it */
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
-                GOTO(out, rc = -EPERM);
+                RETURN(-EPERM);
 
         /* step 1: find & lock the target parent dir */
         lhp = &info->mti_lh[MDT_LH_PARENT];
@@ -586,7 +610,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
                                   MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
-                GOTO(out, rc = PTR_ERR(mp));
+                RETURN(PTR_ERR(mp));
 
         /* step 2: find & lock the source */
         lhs = &info->mti_lh[MDT_LH_CHILD];
@@ -615,7 +639,6 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mdt_object_unlock_put(info, ms, lhs, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, lhp, rc);
-out:
         return rc;
 }
 
@@ -644,7 +667,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
-                GOTO(out, rc = PTR_ERR(mtgtdir));
+                RETURN(PTR_ERR(mtgtdir));
 
         /* step 2: find & lock the target object if exists. */
         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
@@ -691,8 +714,6 @@ out_unlock_tgt:
                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
 out_unlock_tgtdir:
         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
-out:
-        mdt_shrink_reply(info);
         return rc;
 }
 
@@ -702,25 +723,25 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
         struct ldlm_namespace *ns     = info->mti_mdt->mdt_namespace;
         ldlm_policy_data_t    *policy = &info->mti_policy;
         struct ldlm_res_id    *res_id = &info->mti_res_id;
-        struct lu_site        *ls;
+        struct md_site        *ms;
         int rc;
         ENTRY;
 
-        /* 
-         * Disable global rename BFL lock temporarily because 
-         * when a mds do rename recoverying, which might enqueue 
-         * BFL lock to the controller mds. and this req might be 
-         * replay req for controller mds. but we did not have 
+        /*
+         * Disable global rename BFL lock temporarily because
+         * when a mds do rename recoverying, which might enqueue
+         * BFL lock to the controller mds. and this req might be
+         * replay req for controller mds. but we did not have
          * such handling in controller mds. XXX
          */
         RETURN(0);
-        ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
+        ms = mdt_md_site(info->mti_mdt);
         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
 
         memset(policy, 0, sizeof *policy);
         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
-        if (ls->ls_control_exp == NULL) {
+        if (ms->ms_control_exp == NULL) {
                 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
 
                 /*
@@ -730,17 +751,20 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                             LCK_EX, &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL, NULL, 0,
-                                            NULL, lh);
+                                            NULL,
+                                            &info->mti_exp->exp_handle.h_cookie,
+                                            lh);
         } else {
                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
-                        ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL };
+                     ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
                 int flags = 0;
 
                 /*
                  * This is the case mdt0 is remote node, issue DLM lock like
                  * other clients.
                  */
-                rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, &einfo, res_id,
+                rc = ldlm_cli_enqueue(ms->ms_control_exp,
+                                      NULL, &einfo, res_id,
                                       policy, &flags, NULL, 0, NULL, lh, 0);
         }
 
@@ -831,7 +855,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         rc = mdt_rename_lock(info, &rename_lh);
         if (rc) {
                 CERROR("Can't lock FS for rename, rc %d\n", rc);
-                GOTO(out, rc);
+                RETURN(rc);
         }
 
         lh_newp = &info->mti_lh[MDT_LH_NEW];
@@ -960,8 +984,6 @@ out_unlock_source:
         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
 out_rename_lock:
         mdt_rename_unlock(&rename_lh);
-out:
-        mdt_shrink_reply(info);
         return rc;
 }
 
@@ -970,11 +992,12 @@ typedef int (*mdt_reinter)(struct mdt_thread_info *info,
 
 static mdt_reinter reinters[REINT_MAX] = {
         [REINT_SETATTR]  = mdt_reint_setattr,
-        [REINT_CREATE] = mdt_reint_create,
-        [REINT_LINK] = mdt_reint_link,
-        [REINT_UNLINK] = mdt_reint_unlink,
-        [REINT_RENAME] = mdt_reint_rename,
-        [REINT_OPEN] = mdt_reint_open
+        [REINT_CREATE]   = mdt_reint_create,
+        [REINT_LINK]     = mdt_reint_link,
+        [REINT_UNLINK]   = mdt_reint_unlink,
+        [REINT_RENAME]   = mdt_reint_rename,
+        [REINT_OPEN]     = mdt_reint_open,
+        [REINT_SETXATTR] = mdt_reint_setxattr
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,