#include <lustre_acl.h>
#include <lustre_param.h>
#include <lustre_quota.h>
-#include <lustre_linkea.h>
#include <lustre_lfsck.h>
mdl_mode_t mdt_mdl_lock_modes[] = {
RETURN(rc);
}
-static int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
- struct md_attr *ma, const char *name)
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+ struct md_attr *ma, const char *name)
{
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
ma_need |= MA_LOV;
}
} else {
+ /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
+ * client will enqueue the lock to the remote MDT */
+ if (mdt_object_remote(child))
+ child_bits &= ~MDS_INODELOCK_UPDATE;
rc = mdt_object_lock(info, child, lhc, child_bits,
MDT_CROSS_LOCK);
}
GOTO(out_ucred, rc = err_serious(rc));
if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
- rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+ DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+ rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
GOTO(out_ucred, rc);
}
rc = mdt_reint_rec(info, lhc);
[REINT_RENAME] = &RQF_MDS_REINT_RENAME,
[REINT_OPEN] = &RQF_MDS_REINT_OPEN,
[REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
- [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK
+ [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK,
+ [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME
};
ENTRY;
}
int mdt_remote_object_lock(struct mdt_thread_info *mti,
- struct mdt_object *o, struct lustre_handle *lh,
- ldlm_mode_t mode, __u64 ibits)
+ struct mdt_object *o, const struct lu_fid *fid,
+ struct lustre_handle *lh, ldlm_mode_t mode,
+ __u64 ibits)
{
struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
ldlm_policy_data_t *policy = &mti->mti_policy;
+ struct ldlm_res_id *res_id = &mti->mti_res_id;
int rc = 0;
ENTRY;
LASSERT(mdt_object_remote(o));
- LASSERT(ibits & MDS_INODELOCK_UPDATE);
+ LASSERT(ibits == MDS_INODELOCK_UPDATE);
+
+ fid_build_reg_res_name(fid, res_id);
memset(einfo, 0, sizeof(*einfo));
einfo->ei_type = LDLM_IBITS;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 0;
+ einfo->ei_res_id = res_id;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
RETURN(rc);
}
-static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits,
- bool nonblock, int locality)
+static int mdt_object_local_lock(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
ldlm_policy_data_t *policy = &info->mti_policy;
LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
LASSERT(lh->mlh_type != MDT_NUL_LOCK);
- if (mdt_object_remote(o)) {
- if (locality == MDT_CROSS_LOCK) {
- ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
- MDS_INODELOCK_LAYOUT);
- ibits |= MDS_INODELOCK_LOOKUP;
- } else {
- LASSERTF(!(ibits &
- (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
- MDS_INODELOCK_LAYOUT)),
- "%s: wrong bit "LPX64" for remote obj "DFID"\n",
- mdt_obd_name(info->mti_mdt), ibits,
- PFID(mdt_object_fid(o)));
- LASSERT(ibits & MDS_INODELOCK_LOOKUP);
- }
- /* No PDO lock on remote object */
- LASSERT(lh->mlh_type != MDT_PDO_LOCK);
- }
+ /* Only enqueue LOOKUP lock for remote object */
+ if (mdt_object_remote(o))
+ LASSERT(ibits == MDS_INODELOCK_LOOKUP);
if (lh->mlh_type == MDT_PDO_LOCK) {
/* check for exists after object is locked */
RETURN(rc);
}
+int mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
+{
+ int rc;
+ ENTRY;
+
+ if (!mdt_object_remote(o))
+ return mdt_object_local_lock(info, o, lh, ibits, nonblock,
+ locality);
+
+ if (locality == MDT_LOCAL_LOCK) {
+ CERROR("%s: try to get local lock for remote object"
+ DFID".\n", mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(o)));
+ RETURN(-EPROTO);
+ }
+
+ /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
+ ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
+ MDS_INODELOCK_XATTR);
+ if (ibits & MDS_INODELOCK_UPDATE) {
+ /* Sigh, PDO needs to enqueue 2 locks right now, but
+ * enqueue RPC can only request 1 lock, to avoid extra
+ * RPC, so it will instead enqueue EX lock for remote
+ * object anyway XXX*/
+ if (lh->mlh_type == MDT_PDO_LOCK &&
+ lh->mlh_pdo_hash != 0) {
+ CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to"
+ "EX lock.\n", mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(o)));
+ lh->mlh_pdo_hash = 0;
+ lh->mlh_rreg_mode = LCK_EX;
+ lh->mlh_type = MDT_REG_LOCK;
+ }
+ rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
+ &lh->mlh_rreg_lh,
+ lh->mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ if (rc != ELDLM_OK)
+ RETURN(rc);
+ }
+
+ /* Only enqueue LOOKUP lock for remote object */
+ if (ibits & MDS_INODELOCK_LOOKUP) {
+ rc = mdt_object_local_lock(info, o, lh,
+ MDS_INODELOCK_LOOKUP,
+ nonblock, locality);
+ if (rc != ELDLM_OK)
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+
int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits, int locality)
{
- return mdt_object_lock0(info, o, lh, ibits, false, locality);
+ return mdt_object_lock_internal(info, o, lh, ibits, false, locality);
}
int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle tmp = *lh;
int rc;
- rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
+ rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, locality);
if (rc == 0)
*lh = tmp;
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
+
+ info->mti_spec.u.sp_ea.eadata = NULL;
+ info->mti_spec.u.sp_ea.eadatalen = 0;
}
void mdt_thread_info_fini(struct mdt_thread_info *info)
site->ls_top_dev = &mdt->mdt_lu_dev;
mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev);
-
/* now connect to bottom OSD */
snprintf(name, MAX_OBD_NAME, "%s-osd", dev);
rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp);
mdt->mdt_bottom =
lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev);
-
rc = lu_env_refill((struct lu_env *)env);
if (rc != 0)
CERROR("Failure to refill session: '%d'\n", rc);
.tos_hs = mdt_sec_ctx_ops
},
{
- .tos_opc_start = UPDATE_OBJ,
- .tos_opc_end = UPDATE_LAST_OPC,
+ .tos_opc_start = OUT_UPDATE_FIRST_OPC,
+ .tos_opc_end = OUT_UPDATE_LAST_OPC,
.tos_hs = tgt_out_handlers
},
{
int pli_fidcount; /**< number of \a pli_fids */
};
-static int mdt_links_read(struct mdt_thread_info *info,
- struct mdt_object *mdt_obj, struct linkea_data *ldata)
+int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
+ struct linkea_data *ldata)
{
int rc;