/**
* Handler of layout intent RPC requiring the layout modification
*
- * \param[in] info thread environment
- * \param[in] obj object
- * \param[in] layout layout change descriptor
+ * \param[in] info thread environment
+ * \param[in] obj object
+ * \param[out] lhc object ldlm lock handle
+ * \param[in] layout layout change descriptor
*
* \retval 0 on success
* \retval < 0 error code
*/
int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
struct md_layout_change *layout)
{
- struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
int rc;
+
ENTRY;
if (!mdt_object_exists(obj))
- GOTO(out, rc = -ENOENT);
+ RETURN(-ENOENT);
if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
MAY_WRITE);
if (rc)
- GOTO(out, rc);
+ RETURN(rc);
- /* take layout lock to prepare layout change */
- mdt_lock_reg_init(lh, LCK_EX);
- rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT);
- if (rc)
- GOTO(out, rc);
+ rc = mdt_check_resent_lock(info, obj, lhc);
+ if (rc < 0)
+ RETURN(rc);
+
+ if (rc > 0) {
+ /* not resent */
+ mdt_lock_handle_init(lhc);
+ mdt_lock_reg_init(lhc, LCK_EX);
+ rc = mdt_reint_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT,
+ false);
+ if (rc)
+ RETURN(rc);
+ }
mutex_lock(&obj->mot_som_mutex);
rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
mutex_unlock(&obj->mot_som_mutex);
- mdt_object_unlock(info, obj, lh, 1);
-out:
+
+ if (rc)
+ mdt_object_unlock(info, obj, lhc, 1);
+
RETURN(rc);
}
struct ldlm_lock **lockp,
__u64 flags)
{
- struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT];
+ struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP };
struct layout_intent *intent;
+ struct ldlm_reply *ldlm_rep;
struct lu_fid *fid = &info->mti_tmp_fid2;
struct mdt_object *obj = NULL;
int layout_size = 0;
+ struct lu_buf *buf = &layout.mlc_buf;
int rc = 0;
+
ENTRY;
fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
case LAYOUT_INTENT_RESTORE:
CERROR("%s: Unsupported layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), intent->li_opc);
- rc = -ENOTSUPP;
- break;
+ RETURN(-ENOTSUPP);
default:
CERROR("%s: Unknown layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), intent->li_opc);
- rc = -EINVAL;
- break;
+ RETURN(-EINVAL);
}
- if (rc < 0)
- RETURN(rc);
-
- /* Get lock from request for possible resent case. */
- mdt_intent_fixup_resent(info, *lockp, lhc, flags);
obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
if (IS_ERR(obj))
- GOTO(out, rc = PTR_ERR(obj));
-
+ RETURN(PTR_ERR(obj));
if (mdt_object_exists(obj) && !mdt_object_remote(obj)) {
/* if layout is going to be changed don't use the current EA
} else {
layout_size = mdt_attr_get_eabuf_size(info, obj);
if (layout_size < 0)
- GOTO(out_obj, rc = layout_size);
+ GOTO(out, rc = layout_size);
if (layout_size > info->mti_mdt->mdt_max_mdsize)
info->mti_mdt->mdt_max_mdsize = layout_size;
* set reply buffer size, so that ldlm_handle_enqueue0()->
* ldlm_lvbo_fill() will fill the reply buffer with lovea.
*/
- (*lockp)->l_lvb_type = LVB_T_LAYOUT;
req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
layout_size);
rc = req_capsule_server_pack(info->mti_pill);
if (rc)
- GOTO(out_obj, rc);
+ GOTO(out, rc);
+ ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+ if (!ldlm_rep)
+ GOTO(out, rc = -EPROTO);
- if (layout.mlc_opc != MD_LAYOUT_NOP) {
- struct lu_buf *buf = &layout.mlc_buf;
+ mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
- /**
- * mdt_layout_change is a reint operation, when the request
- * is resent, layout write shouldn't reprocess it again.
- */
- rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
- if (rc)
- GOTO(out_obj, rc = rc < 0 ? rc : 0);
+ /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */
+ if (layout.mlc_opc == MD_LAYOUT_NOP)
+ GOTO(out, rc = 0);
- /**
- * There is another resent case: the client's job has been
- * done by another client, referring lod_declare_layout_change
- * -EALREADY case, and it became a operation w/o transaction,
- * so we should not do the layout change, otherwise
- * mdt_layout_change() will try to cancel the granted server
- * CR lock whose remote counterpart is still in hold on the
- * client, and a deadlock ensues.
- */
- rc = mdt_check_resent_lock(info, obj, lhc);
- if (rc <= 0)
- GOTO(out_obj, rc);
-
- buf->lb_buf = NULL;
- buf->lb_len = 0;
- if (unlikely(req_is_replay(mdt_info_req(info)))) {
- buf->lb_buf = req_capsule_client_get(info->mti_pill,
- &RMF_EADATA);
- buf->lb_len = req_capsule_get_size(info->mti_pill,
- &RMF_EADATA, RCL_CLIENT);
- /*
- * If it's a replay of layout write intent RPC, the
- * client has saved the extended lovea when
- * it get reply then.
- */
- if (buf->lb_len > 0)
- mdt_fix_lov_magic(info, buf->lb_buf);
- }
+ rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+ if (rc < 0)
+ GOTO(out, rc);
+ if (rc == 1) {
+ DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+ rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+ GOTO(out, rc);
+ }
+
+ buf->lb_buf = NULL;
+ buf->lb_len = 0;
+ if (unlikely(req_is_replay(mdt_info_req(info)))) {
+ buf->lb_buf = req_capsule_client_get(info->mti_pill,
+ &RMF_EADATA);
+ buf->lb_len = req_capsule_get_size(info->mti_pill,
+ &RMF_EADATA, RCL_CLIENT);
/*
- * Instantiate some layout components, if @buf contains
- * lovea, then it's a replay of the layout intent write
- * RPC.
+ * If it's a replay of layout write intent RPC, the client has
+ * saved the extended lovea when it get reply then.
*/
- rc = mdt_layout_change(info, obj, &layout);
- if (rc)
- GOTO(out_obj, rc);
+ if (buf->lb_len > 0)
+ mdt_fix_lov_magic(info, buf->lb_buf);
}
-out_obj:
- mdt_object_put(info->mti_env, obj);
- if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh))
+ /* Get lock from request for possible resent case. */
+ mdt_intent_fixup_resent(info, *lockp, lhc, flags);
+ (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+
+ /*
+ * Instantiate some layout components, if @buf contains lovea, then it's
+ * a replay of the layout intent write RPC.
+ */
+ rc = mdt_layout_change(info, obj, lhc, &layout);
+ ldlm_rep->lock_policy_res2 = clear_serious(rc);
+
+ if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+ if (rc == ELDLM_LOCK_REPLACED &&
+ (*lockp)->l_granted_mode == LCK_EX)
+ ldlm_lock_mode_downgrade(*lockp, LCK_CR);
+ }
+ EXIT;
out:
- lhc->mlh_reg_lh.cookie = 0;
-
- RETURN(rc);
+ mdt_object_put(info->mti_env, obj);
+ return rc;
}
static int mdt_intent_open(enum ldlm_intent_flags it_opc,