buf->lb_buf != info->mti_big_acl) {
if (info->mti_big_acl == NULL) {
info->mti_big_aclsize =
- MIN(mdt->mdt_max_ea_size,
- XATTR_SIZE_MAX);
+ min_t(unsigned int,
+ mdt->mdt_max_ea_size,
+ XATTR_SIZE_MAX);
OBD_ALLOC_LARGE(info->mti_big_acl,
info->mti_big_aclsize);
if (info->mti_big_acl == NULL) {
/**
* Handler of layout intent RPC requiring the layout modification
*
- * \param[in] info thread environment
- * \param[in] obj object
- * \param[in] layout layout change descriptor
+ * \param[in] info thread environment
+ * \param[in] obj object
+ * \param[out] lhc object ldlm lock handle
+ * \param[in] layout layout change descriptor
*
* \retval 0 on success
* \retval < 0 error code
*/
int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
struct md_layout_change *layout)
{
- struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
int rc;
+
ENTRY;
if (!mdt_object_exists(obj))
- GOTO(out, rc = -ENOENT);
+ RETURN(-ENOENT);
if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
MAY_WRITE);
if (rc)
- GOTO(out, rc);
+ RETURN(rc);
- /* take layout lock to prepare layout change */
- mdt_lock_reg_init(lh, LCK_EX);
- rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT);
- if (rc)
- GOTO(out, rc);
+ rc = mdt_check_resent_lock(info, obj, lhc);
+ if (rc < 0)
+ RETURN(rc);
+
+ if (rc > 0) {
+ /* not resent */
+ mdt_lock_handle_init(lhc);
+ mdt_lock_reg_init(lhc, LCK_EX);
+ rc = mdt_reint_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT,
+ false);
+ if (rc)
+ RETURN(rc);
+ }
mutex_lock(&obj->mot_som_mutex);
rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
mutex_unlock(&obj->mot_som_mutex);
- mdt_object_unlock(info, obj, lh, 1);
-out:
+
+ if (rc)
+ mdt_object_unlock(info, obj, lhc, 1);
+
RETURN(rc);
}
}
rc = mdt_getattr_internal(info, child, 0);
- if (unlikely(rc != 0))
+ if (unlikely(rc != 0)) {
mdt_object_unlock(info, child, lhc, 1);
+ RETURN(rc);
+ }
- mdt_pack_secctx_in_reply(info, child);
-
+ rc = mdt_pack_secctx_in_reply(info, child);
+ if (unlikely(rc))
+ mdt_object_unlock(info, child, lhc, 1);
RETURN(rc);
}
mdt_lock_reg_init(lhc, LCK_PR);
if (!(child_bits & MDS_INODELOCK_UPDATE) &&
- mdt_object_exists(child) && !mdt_object_remote(child)) {
+ !mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
ma->ma_valid = 0;
* lock and this might save us RPC on later STAT. For
* directories, it also let negative dentry cache start
* working for this dir. */
- if (ma->ma_valid & MA_INODE &&
- ma->ma_attr.la_valid & LA_CTIME &&
- info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
- ma->ma_attr.la_ctime < ktime_get_real_seconds())
- child_bits |= MDS_INODELOCK_UPDATE;
- }
+ if (ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_valid & LA_CTIME &&
+ info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
+ ma->ma_attr.la_ctime < ktime_get_real_seconds())
+ child_bits |= MDS_INODELOCK_UPDATE;
+ }
/* layout lock must be granted in a best-effort way
* for IT operations */
GOTO(out_child, rc);
}
- mdt_pack_secctx_in_reply(info, child);
+ rc = mdt_pack_secctx_in_reply(info, child);
+ if (unlikely(rc)) {
+ mdt_object_unlock(info, child, lhc, 1);
+ GOTO(out_child, rc);
+ }
lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
if (lock) {
PFID(mdt_object_fid(child)));
if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
- mdt_object_exists(child) && !mdt_object_remote(child) &&
- child != parent) {
+ !mdt_object_remote(child) && child != parent) {
mdt_object_put(info->mti_env, child);
rc = mdt_pack_size2body(info, child_fid,
&lhc->mlh_reg_lh);
* object anyway XXX*/
if (lh->mlh_type == MDT_PDO_LOCK &&
lh->mlh_pdo_hash != 0) {
- CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to"
- "EX lock.\n", mdt_obd_name(info->mti_mdt),
+ CDEBUG(D_INFO,
+ "%s: "DFID" convert PDO lock to EX lock.\n",
+ mdt_obd_name(info->mti_mdt),
PFID(mdt_object_fid(o)));
lh->mlh_pdo_hash = 0;
lh->mlh_rreg_mode = LCK_EX;
* If the xid matches, then we know this is a resent request, and allow
* it. (It's probably an OPEN, for which we don't send a lock.
*/
- if (req_can_reconstruct(req, NULL))
+ if (req_can_reconstruct(req, NULL) != 0)
return;
/*
struct ldlm_lock **lockp,
__u64 flags)
{
- struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT];
+ struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP };
struct layout_intent *intent;
+ struct ldlm_reply *ldlm_rep;
struct lu_fid *fid = &info->mti_tmp_fid2;
struct mdt_object *obj = NULL;
int layout_size = 0;
+ struct lu_buf *buf = &layout.mlc_buf;
int rc = 0;
+
ENTRY;
fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
case LAYOUT_INTENT_RESTORE:
CERROR("%s: Unsupported layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), intent->li_opc);
- rc = -ENOTSUPP;
- break;
+ RETURN(-ENOTSUPP);
default:
CERROR("%s: Unknown layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), intent->li_opc);
- rc = -EINVAL;
- break;
+ RETURN(-EINVAL);
}
- if (rc < 0)
- RETURN(rc);
-
- /* Get lock from request for possible resent case. */
- mdt_intent_fixup_resent(info, *lockp, lhc, flags);
obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
if (IS_ERR(obj))
- GOTO(out, rc = PTR_ERR(obj));
-
+ RETURN(PTR_ERR(obj));
if (mdt_object_exists(obj) && !mdt_object_remote(obj)) {
/* if layout is going to be changed don't use the current EA
} else {
layout_size = mdt_attr_get_eabuf_size(info, obj);
if (layout_size < 0)
- GOTO(out_obj, rc = layout_size);
+ GOTO(out, rc = layout_size);
if (layout_size > info->mti_mdt->mdt_max_mdsize)
info->mti_mdt->mdt_max_mdsize = layout_size;
* set reply buffer size, so that ldlm_handle_enqueue0()->
* ldlm_lvbo_fill() will fill the reply buffer with lovea.
*/
- (*lockp)->l_lvb_type = LVB_T_LAYOUT;
req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
layout_size);
rc = req_capsule_server_pack(info->mti_pill);
if (rc)
- GOTO(out_obj, rc);
+ GOTO(out, rc);
+ ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+ if (!ldlm_rep)
+ GOTO(out, rc = -EPROTO);
- if (layout.mlc_opc != MD_LAYOUT_NOP) {
- struct lu_buf *buf = &layout.mlc_buf;
+ mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
- /**
- * mdt_layout_change is a reint operation, when the request
- * is resent, layout write shouldn't reprocess it again.
- */
- rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
- if (rc)
- GOTO(out_obj, rc = rc < 0 ? rc : 0);
+ /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */
+ if (layout.mlc_opc == MD_LAYOUT_NOP)
+ GOTO(out, rc = 0);
- /**
- * There is another resent case: the client's job has been
- * done by another client, referring lod_declare_layout_change
- * -EALREADY case, and it became a operation w/o transaction,
- * so we should not do the layout change, otherwise
- * mdt_layout_change() will try to cancel the granted server
- * CR lock whose remote counterpart is still in hold on the
- * client, and a deadlock ensues.
- */
- rc = mdt_check_resent_lock(info, obj, lhc);
- if (rc <= 0)
- GOTO(out_obj, rc);
-
- buf->lb_buf = NULL;
- buf->lb_len = 0;
- if (unlikely(req_is_replay(mdt_info_req(info)))) {
- buf->lb_buf = req_capsule_client_get(info->mti_pill,
- &RMF_EADATA);
- buf->lb_len = req_capsule_get_size(info->mti_pill,
- &RMF_EADATA, RCL_CLIENT);
- /*
- * If it's a replay of layout write intent RPC, the
- * client has saved the extended lovea when
- * it get reply then.
- */
- if (buf->lb_len > 0)
- mdt_fix_lov_magic(info, buf->lb_buf);
- }
+ rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+ if (rc < 0)
+ GOTO(out, rc);
+ if (rc == 1) {
+ DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+ rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+ GOTO(out, rc);
+ }
+
+ buf->lb_buf = NULL;
+ buf->lb_len = 0;
+ if (unlikely(req_is_replay(mdt_info_req(info)))) {
+ buf->lb_buf = req_capsule_client_get(info->mti_pill,
+ &RMF_EADATA);
+ buf->lb_len = req_capsule_get_size(info->mti_pill,
+ &RMF_EADATA, RCL_CLIENT);
/*
- * Instantiate some layout components, if @buf contains
- * lovea, then it's a replay of the layout intent write
- * RPC.
+ * If it's a replay of layout write intent RPC, the client has
+ * saved the extended lovea when it get reply then.
*/
- rc = mdt_layout_change(info, obj, &layout);
- if (rc)
- GOTO(out_obj, rc);
+ if (buf->lb_len > 0)
+ mdt_fix_lov_magic(info, buf->lb_buf);
}
-out_obj:
- mdt_object_put(info->mti_env, obj);
- if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh))
+ /* Get lock from request for possible resent case. */
+ mdt_intent_fixup_resent(info, *lockp, lhc, flags);
+ (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+
+ /*
+ * Instantiate some layout components, if @buf contains lovea, then it's
+ * a replay of the layout intent write RPC.
+ */
+ rc = mdt_layout_change(info, obj, lhc, &layout);
+ ldlm_rep->lock_policy_res2 = clear_serious(rc);
+
+ if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+ if (rc == ELDLM_LOCK_REPLACED &&
+ (*lockp)->l_granted_mode == LCK_EX)
+ ldlm_lock_mode_downgrade(*lockp, LCK_CR);
+ }
+ EXIT;
out:
- lhc->mlh_reg_lh.cookie = 0;
-
- RETURN(rc);
+ mdt_object_put(info->mti_env, obj);
+ return rc;
}
static int mdt_intent_open(enum ldlm_intent_flags it_opc,
break;
case IT_GETATTR:
check_mdt_object = true;
+ /* fallthrough */
case IT_LOOKUP:
it_format = &RQF_LDLM_INTENT_GETATTR;
it_handler = &mdt_intent_getattr;
m->mdt_enable_striped_dir = 1;
m->mdt_enable_dir_migration = 1;
m->mdt_enable_remote_dir_gid = 0;
+ m->mdt_enable_chprojid_gid = 0;
m->mdt_enable_remote_rename = 1;
atomic_set(&m->mdt_mds_mds_conns, 0);
static int mdt_export_cleanup(struct obd_export *exp)
{
- struct list_head closing_list;
+ LIST_HEAD(closing_list);
struct mdt_export_data *med = &exp->exp_mdt_data;
struct obd_device *obd = exp->exp_obd;
struct mdt_device *mdt;
int rc = 0;
ENTRY;
- INIT_LIST_HEAD(&closing_list);
spin_lock(&med->med_open_lock);
while (!list_empty(&med->med_open_head)) {
struct list_head *tmp = med->med_open_head.next;
static inline void mdt_enable_slc(struct mdt_device *mdt)
{
- if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
- mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL;
+ if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
+ mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
}
static inline void mdt_disable_slc(struct mdt_device *mdt)
{
- if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL)
- mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+ if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
+ mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
}
static int mdt_obd_disconnect(struct obd_export *exp)
exp->exp_connecting = 1;
spin_unlock(&exp->exp_lock);
+ OBD_ALLOC(exp->exp_used_slots,
+ BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+ if (exp->exp_used_slots == NULL)
+ RETURN(-ENOMEM);
+
/* self-export doesn't need client data and ldlm initialization */
if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
&exp->exp_client_uuid)))
err_free:
tgt_client_free(exp);
err:
+ OBD_FREE(exp->exp_used_slots,
+ BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+ exp->exp_used_slots = NULL;
+
CERROR("%s: Failed to initialize export: rc = %d\n",
exp->exp_obd->obd_name, rc);
return rc;
ENTRY;
target_destroy_export(exp);
+ if (exp->exp_used_slots)
+ OBD_FREE(exp->exp_used_slots,
+ BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+
/* destroy can be called from failed obd_setup, so
* checking uuid is safer than obd_self_export */
if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
return rc;
}
-static struct obd_ops mdt_obd_device_ops = {
+static const struct obd_ops mdt_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_set_info_async = mdt_obd_set_info_async,
.o_connect = mdt_obd_connect,