Whamcloud - gitweb
LU-12037 mdt: add option for cross-MDT rename
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index c2bb878..2ce7a68 100644 (file)
@@ -1622,9 +1622,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        struct lu_name         *lname = NULL;
        struct mdt_lock_handle *lhp = NULL;
        struct ldlm_lock       *lock;
+       struct req_capsule *pill = info->mti_pill;
        __u64 try_bits = 0;
        bool is_resent;
        int ma_need = 0;
+       bool deal_with_dom = false;
        int rc;
 
        ENTRY;
@@ -1681,18 +1683,20 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                if (unlikely(rc != 0))
                        mdt_object_unlock(info, child, lhc, 1);
 
-                RETURN(rc);
-        }
+               mdt_pack_secctx_in_reply(info, child);
+
+               RETURN(rc);
+       }
 
        lname = &info->mti_name;
-       mdt_name_unpack(info->mti_pill, &RMF_NAME, lname, MNF_FIX_ANON);
+       mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
 
        if (lu_name_is_valid(lname)) {
                CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
                       "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
                       PNAME(lname), ldlm_rep);
        } else {
-               reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+               reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
                if (unlikely(reqbody == NULL))
                        RETURN(err_serious(-EPROTO));
 
@@ -1845,16 +1849,17 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                child_bits &= ~MDS_INODELOCK_UPDATE;
                        rc = mdt_object_lock(info, child, lhc, child_bits);
                }
-                if (unlikely(rc != 0))
-                        GOTO(out_child, rc);
-        }
+               if (unlikely(rc != 0))
+                       GOTO(out_child, rc);
+       }
 
-        lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+       lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
 
-        /* finally, we can get attr for child. */
-        rc = mdt_getattr_internal(info, child, ma_need);
-        if (unlikely(rc != 0)) {
-                mdt_object_unlock(info, child, lhc, 1);
+       /* finally, we can get attr for child. */
+       rc = mdt_getattr_internal(info, child, ma_need);
+       if (unlikely(rc != 0)) {
+               mdt_object_unlock(info, child, lhc, 1);
+               GOTO(out_lock, rc);
        } else if (lock) {
                /* Debugging code. */
                LDLM_DEBUG(lock, "Returning lock to client");
@@ -1866,31 +1871,32 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
                if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
                    mdt_object_exists(child) && !mdt_object_remote(child) &&
-                   child != parent) {
-                       LDLM_LOCK_PUT(lock);
-                       mdt_object_put(info->mti_env, child);
-                       /* NB: call the mdt_pack_size2body always after
-                        * mdt_object_put(), that is why this special
-                        * exit path is used. */
-                       rc = mdt_pack_size2body(info, child_fid,
-                                               &lhc->mlh_reg_lh);
-                       if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
-                               /* DOM lock was taken in advance but this is
-                                * not DoM file. Drop the lock. */
-                               lock_res_and_lock(lock);
-                               ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
-                               unlock_res_and_lock(lock);
-                       }
-
-                       GOTO(out_parent, rc = 0);
-               }
+                   child != parent)
+                       deal_with_dom = true;
        }
+
+       mdt_pack_secctx_in_reply(info, child);
+
+out_lock:
        if (lock)
                LDLM_LOCK_PUT(lock);
 
        EXIT;
 out_child:
        mdt_object_put(info->mti_env, child);
+       if (deal_with_dom) {
+               rc = mdt_pack_size2body(info, child_fid,
+                                       &lhc->mlh_reg_lh);
+               if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
+                       /* DOM lock was taken in advance but this is
+                        * not DoM file. Drop the lock.
+                        */
+                       lock_res_and_lock(lock);
+                       ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
+                       unlock_res_and_lock(lock);
+               }
+               rc = 0;
+       }
 out_parent:
        if (lhp)
                mdt_object_unlock(info, parent, lhp, 1);
@@ -2096,6 +2102,28 @@ static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
             !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
 }
 
+static void mdt_preset_secctx_size(struct mdt_thread_info *info)
+{
+       struct req_capsule *pill = info->mti_pill;
+
+       if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
+                                 RCL_SERVER) &&
+           req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
+                                 RCL_CLIENT)) {
+               if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
+                                        RCL_CLIENT) != 0) {
+                       /* pre-set size in server part with max size */
+                       req_capsule_set_size(pill, &RMF_FILE_SECCTX,
+                                            RCL_SERVER,
+                                            info->mti_mdt->mdt_max_ea_size);
+               } else {
+                       req_capsule_set_size(pill, &RMF_FILE_SECCTX,
+                                            RCL_SERVER, 0);
+               }
+       }
+
+}
+
 static int mdt_reint_internal(struct mdt_thread_info *info,
                               struct mdt_lock_handle *lhc,
                               __u32 op)
@@ -2124,7 +2152,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                                     DEF_REP_MD_SIZE);
 
        /* llog cookies are always 0, the field is kept for compatibility */
-        if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
+       if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
 
        /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
@@ -2134,33 +2162,35 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
                                     LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
-        rc = req_capsule_server_pack(pill);
-        if (rc != 0) {
-                CERROR("Can't pack response, rc %d\n", rc);
-                RETURN(err_serious(rc));
-        }
+       mdt_preset_secctx_size(info);
 
-        if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
-                repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
-                LASSERT(repbody);
+       rc = req_capsule_server_pack(pill);
+       if (rc != 0) {
+               CERROR("Can't pack response, rc %d\n", rc);
+               RETURN(err_serious(rc));
+       }
+
+       if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
+               repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
+               LASSERT(repbody);
                repbody->mbo_eadatasize = 0;
                repbody->mbo_aclsize = 0;
-        }
+       }
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
 
-        /* for replay no cookkie / lmm need, because client have this already */
-        if (info->mti_spec.no_create)
-                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
-                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
+       /* for replay no cookkie / lmm need, because client have this already */
+       if (info->mti_spec.no_create)
+               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
 
-        rc = mdt_init_ucred_reint(info);
-        if (rc)
-                GOTO(out_shrink, rc);
+       rc = mdt_init_ucred_reint(info);
+       if (rc)
+               GOTO(out_shrink, rc);
 
-        rc = mdt_fix_attr_ucred(info, op);
-        if (rc != 0)
-                GOTO(out_ucred, rc = err_serious(rc));
+       rc = mdt_fix_attr_ucred(info, op);
+       if (rc != 0)
+               GOTO(out_ucred, rc = err_serious(rc));
 
        rc = mdt_check_resent(info, mdt_reconstruct, lhc);
        if (rc < 0) {
@@ -2168,12 +2198,12 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
        } else if (rc == 1) {
                DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
                rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
-                GOTO(out_ucred, rc);
-        }
-        rc = mdt_reint_rec(info, lhc);
-        EXIT;
+               GOTO(out_ucred, rc);
+       }
+       rc = mdt_reint_rec(info, lhc);
+       EXIT;
 out_ucred:
-        mdt_exit_ucred(info);
+       mdt_exit_ucred(info);
 out_shrink:
        mdt_client_compatibility(info);
 
@@ -3291,7 +3321,7 @@ void mdt_object_unlock_put(struct mdt_thread_info * info,
  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
  *  @info;
  *
- *  - if HABEO_CORPUS flag is set for this request type check whether object
+ *  - if HAS_BODY flag is set for this request type check whether object
  *  actually exists on storage (lu_object_exists()).
  *
  */
@@ -3322,7 +3352,7 @@ static int mdt_body_unpack(struct mdt_thread_info *info,
 
        obj = mdt_object_find(env, info->mti_mdt, &body->mbo_fid1);
        if (!IS_ERR(obj)) {
-               if ((flags & HABEO_CORPUS) && !mdt_object_exists(obj)) {
+               if ((flags & HAS_BODY) && !mdt_object_exists(obj)) {
                        mdt_object_put(env, obj);
                        rc = -ENOENT;
                 } else {
@@ -3338,21 +3368,22 @@ static int mdt_body_unpack(struct mdt_thread_info *info,
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
                                   enum tgt_handler_flags flags)
 {
-        struct req_capsule *pill = info->mti_pill;
-        int rc;
-        ENTRY;
+       struct req_capsule *pill = info->mti_pill;
+       int rc;
 
-        if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
-                rc = mdt_body_unpack(info, flags);
-        else
-                rc = 0;
+       ENTRY;
+
+       if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
+               rc = mdt_body_unpack(info, flags);
+       else
+               rc = 0;
 
-        if (rc == 0 && (flags & HABEO_REFERO)) {
-                /* Pack reply. */
-                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
-                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
+       if (rc == 0 && (flags & HAS_REPLY)) {
+               /* Pack reply. */
+               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
                                             DEF_REP_MD_SIZE);
-                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
+               if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);
 
@@ -3363,9 +3394,14 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
                        req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
                                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
-                rc = req_capsule_server_pack(pill);
-        }
-        RETURN(rc);
+               mdt_preset_secctx_size(info);
+
+               rc = req_capsule_server_pack(pill);
+               if (rc)
+                       CWARN("%s: cannot pack response: rc = %d\n",
+                                     mdt_obd_name(info->mti_mdt), rc);
+       }
+       RETURN(rc);
 }
 
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
@@ -3666,7 +3702,10 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
        if (ldlm_rep == NULL ||
            OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) {
                mdt_object_unlock(info,  info->mti_object, lhc, 1);
-               RETURN(err_serious(-EFAULT));
+               if (is_serious(rc))
+                       RETURN(rc);
+               else
+                       RETURN(err_serious(-EFAULT));
        }
 
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
@@ -3929,11 +3968,15 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
 
         rc = mdt_reint_internal(info, lhc, opc);
 
-        /* Check whether the reply has been packed successfully. */
-        if (mdt_info_req(info)->rq_repmsg != NULL)
-                rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
-        if (rep == NULL)
-                RETURN(err_serious(-EFAULT));
+       /* Check whether the reply has been packed successfully. */
+       if (mdt_info_req(info)->rq_repmsg != NULL)
+               rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+       if (rep == NULL) {
+               if (is_serious(rc))
+                       RETURN(rc);
+               else
+                       RETURN(err_serious(-EFAULT));
+       }
 
         /* MDC expects this in any case */
         if (rc != 0)
@@ -3994,7 +4037,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        case IT_OPEN:
        case IT_OPEN|IT_CREAT:
                /*
-                * OCREAT is not a MUTABOR request since the file may
+                * OCREAT is not a IS_MUTABLE request since the file may
                 * already exist. We do the extra check of
                 * OBD_CONNECT_RDONLY in mdt_reint_open() when we
                 * really need to create the object.
@@ -4006,12 +4049,12 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        case IT_LOOKUP:
                it_format = &RQF_LDLM_INTENT_GETATTR;
                it_handler = &mdt_intent_getattr;
-               it_handler_flags = HABEO_REFERO;
+               it_handler_flags = HAS_REPLY;
                break;
        case IT_GETXATTR:
                it_format = &RQF_LDLM_INTENT_GETXATTR;
                it_handler = &mdt_intent_getxattr;
-               it_handler_flags = HABEO_CORPUS;
+               it_handler_flags = HAS_BODY;
                break;
        case IT_LAYOUT:
                it_format = &RQF_LDLM_INTENT_LAYOUT;
@@ -4054,7 +4097,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        if (rc < 0)
                RETURN(rc);
 
-       if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export))
+       if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10);
@@ -4832,47 +4875,47 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC,
                0,                      MDS_DISCONNECT, tgt_disconnect,
                &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
 TGT_RPC_HANDLER(MDS_FIRST_OPC,
-               HABEO_REFERO,           MDS_SET_INFO,   mdt_set_info,
+               HAS_REPLY,              MDS_SET_INFO,   mdt_set_info,
                &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION),
 TGT_MDT_HDL(0,                         MDS_GET_INFO,   mdt_get_info),
-TGT_MDT_HDL(0          | HABEO_REFERO, MDS_GET_ROOT,   mdt_get_root),
-TGT_MDT_HDL(HABEO_CORPUS,              MDS_GETATTR,    mdt_getattr),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_GETATTR_NAME,
+TGT_MDT_HDL(HAS_REPLY,         MDS_GET_ROOT,   mdt_get_root),
+TGT_MDT_HDL(HAS_BODY,          MDS_GETATTR,    mdt_getattr),
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY,      MDS_GETATTR_NAME,
                                                        mdt_getattr_name),
-TGT_MDT_HDL(HABEO_CORPUS,              MDS_GETXATTR,   mdt_tgt_getxattr),
-TGT_MDT_HDL(0          | HABEO_REFERO, MDS_STATFS,     mdt_statfs),
-TGT_MDT_HDL(0          | MUTABOR,      MDS_REINT,      mdt_reint),
-TGT_MDT_HDL(HABEO_CORPUS,              MDS_CLOSE,      mdt_close),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_READPAGE,   mdt_readpage),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_SYNC,       mdt_sync),
+TGT_MDT_HDL(HAS_BODY,          MDS_GETXATTR,   mdt_tgt_getxattr),
+TGT_MDT_HDL(HAS_REPLY,         MDS_STATFS,     mdt_statfs),
+TGT_MDT_HDL(IS_MUTABLE,                MDS_REINT,      mdt_reint),
+TGT_MDT_HDL(HAS_BODY,          MDS_CLOSE,      mdt_close),
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY,      MDS_READPAGE,   mdt_readpage),
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY,      MDS_SYNC,       mdt_sync),
 TGT_MDT_HDL(0,                         MDS_QUOTACTL,   mdt_quotactl),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_PROGRESS,
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_PROGRESS,
                                                        mdt_hsm_progress),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_REGISTER,
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_REGISTER,
                                                        mdt_hsm_ct_register),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_UNREGISTER,
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_UNREGISTER,
                                                        mdt_hsm_ct_unregister),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_STATE_GET,
                                                        mdt_hsm_state_get),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_STATE_SET,
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_STATE_SET,
                                                        mdt_hsm_state_set),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION,        mdt_hsm_action),
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST,
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_ACTION,      mdt_hsm_action),
+TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST,
                                                        mdt_hsm_request),
-TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR,
+TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE,
            MDS_SWAP_LAYOUTS,
            mdt_swap_layouts),
 };
 
 static struct tgt_handler mdt_io_ops[] = {
-TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read,
+TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY, OST_BRW_READ, tgt_brw_read,
                                                        mdt_hp_brw),
-TGT_OST_HDL_HP(HABEO_CORPUS | MUTABOR,  OST_BRW_WRITE, tgt_brw_write,
+TGT_OST_HDL_HP(HAS_BODY | IS_MUTABLE,   OST_BRW_WRITE, tgt_brw_write,
                                                        mdt_hp_brw),
-TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO | MUTABOR,
+TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE,
                                         OST_PUNCH,     mdt_punch_hdl,
                                                        mdt_hp_punch),
-TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC,     mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC,    mdt_data_sync),
 };
 
 static struct tgt_handler mdt_sec_ctx_ops[] = {
@@ -4882,7 +4925,7 @@ TGT_SEC_HDL_VAR(0,                        SEC_CTX_FINI,     mdt_sec_ctx_handle)
 };
 
 static struct tgt_handler mdt_quota_ops[] = {
-TGT_QUOTA_HDL(HABEO_REFERO,            QUOTA_DQACQ,      mdt_quota_dqacq),
+TGT_QUOTA_HDL(HAS_REPLY,               QUOTA_DQACQ,      mdt_quota_dqacq),
 };
 
 static struct tgt_opc_slice mdt_common_slice[] = {
@@ -5101,6 +5144,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_striped_dir = 1;
        m->mdt_enable_dir_migration = 1;
        m->mdt_enable_remote_dir_gid = 0;
+       m->mdt_enable_remote_rename = 1;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);