*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/mdt/mdt_open.c
*
struct mdt_object *obj,
struct md_attr *ma)
{
- LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+ if (ma->ma_lmv || ma->ma_lmm) {
+ CDEBUG(D_INFO, DFID " %s already set.\n",
+ PFID(mdt_object_fid(obj)),
+ ma->ma_lmv ? (ma->ma_lmm ? "ma_lmv and ma_lmm"
+ : "ma_lmv")
+ : "ma_lmm");
+ return;
+ }
+
if (S_ISDIR(obj->mot_header.loh_attr)) {
ma->ma_lmv = req_capsule_server_get(info->mti_pill,
&RMF_MDT_MD);
mdt_set_disposition(info, rep, (DISP_IT_EXECD |
DISP_LOOKUP_EXECD |
DISP_LOOKUP_POS));
+ if ((open_flags & MDS_OPEN_EXCL) &&
+ (open_flags & MDS_OPEN_CREAT))
+ mdt_set_disposition(info, rep,
+ DISP_OPEN_CREATE);
+
mdt_prep_ma_buf_from_rep(info, o, ma);
rc = mdt_attr_get_complex(info, o, ma);
if (rc == 0)
bool try_layout = false;
bool create_layout = false;
int rc = 0;
- int dom_stripes = LMM_NO_DOM;
- bool dom_lock = false;
+ __u32 dom_stripe = 0;
+ unsigned int dom_only = 0;
+ unsigned int dom_lock = 0;
ENTRY;
ma->ma_need & MA_LOV)
try_layout = true;
- /* DoM files can have just MDT stripe or combined MDT + OST
- * stripes.
- * - In the first case the open for read/write will do IO to
- * the MDT stripe and it makes sense to take IO lock in
- * advance along with OPEN even if it is blocking lock.
- * - In the second case it is just size of MDT stripe and it
- * is quite unlikely that client will write into it, though
- * it may read it. So IO lock will be taken optionally if it
- * is non-blocking one.
+ /* DoM files can take IO lock at OPEN when it makes sense,
+ * check if file has DoM stripe and ask for lock if client
+ * no lock on that resource yet.
*/
if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)
- dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm);
-
- if (dom_stripes == LMM_DOM_ONLY &&
- info->mti_mdt->mdt_opts.mo_dom_lock > 0 &&
+ dom_stripe = mdt_lmm_dom_entry_check(ma->ma_lmm,
+ &dom_only);
+ /* If only DOM stripe is being used then we can expect IO
+ * to it after OPEN and will return corresponding DOM ibit
+ * using default strategy from mdt_opts.mo_dom_lock.
+ * Otherwise trylock mode is used always and DOM ibit will
+ * be returned optionally.
+ */
+ if (dom_stripe &&
!mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
- dom_lock = true;
+ dom_lock = !dom_only ? TRYLOCK_DOM_ON_OPEN :
+ info->mti_mdt->mdt_opts.mo_dom_lock;
}
if (acq_lease) {
GOTO(out, rc = -EPROTO);
}
- /* XXX: only exclusive open is supported. */
- lm = LCK_EX;
+ /* should conflict with new opens for write/execute */
+ lm = LCK_PW;
*ibits = MDS_INODELOCK_OPEN;
/* never grant LCK_EX layout lock to client */
lhc = &info->mti_lh[MDT_LH_LOCAL];
} else if (dom_lock) {
lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR;
- if (info->mti_mdt->mdt_opts.mo_dom_lock ==
- TRYLOCK_DOM_ON_OPEN) {
- trybits |= MDS_INODELOCK_DOM |
- MDS_INODELOCK_LAYOUT;
- } else {
- /* mo_dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
- *ibits = MDS_INODELOCK_DOM;
- if (info->mti_mdt->mdt_opts.mo_dom_read_open) {
- trybits |= MDS_INODELOCK_LAYOUT;
- }
- }
+ trybits |= MDS_INODELOCK_DOM | MDS_INODELOCK_LAYOUT;
}
CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
PFID(mdt_object_fid(obj)),
atomic_read(&obj->mot_open_count), open_count);
- if (atomic_read(&obj->mot_open_count) > open_count)
- GOTO(out, rc = -EBUSY);
+ if (atomic_read(&obj->mot_open_count) > open_count) {
+ /* fail if anyone *else* has opened file for write */
+ if (mdt_write_read(obj) > 1)
+ GOTO(out, rc = -EBUSY);
+ }
}
GOTO(out, rc);
if (unlikely(rc))
GOTO(out, rc);
+ rc = mdt_pack_encctx_in_reply(info, o);
+ if (unlikely(rc))
+ GOTO(out, rc);
+
rc = mdt_finish_open(info, NULL, o, open_flags, 0, rep);
} else {
/*
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_object *parent;
struct mdt_object *child;
- struct mdt_lock_handle *lh;
+ struct mdt_lock_handle *lh = NULL;
struct ldlm_reply *ldlm_rep;
struct mdt_body *repbody;
struct lu_fid *child_fid = &info->mti_tmp_fid1;
int result, rc;
int created = 0;
int object_locked = 0;
+ enum ldlm_mode lock_mode = LCK_PR;
u32 msg_flags;
+ ktime_t kstart = ktime_get();
ENTRY;
OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
(obd_timeout + 1) / 4);
- mdt_counter_incr(req, LPROC_MDT_OPEN);
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
ma->ma_need = MA_INODE;
if (result < 0)
GOTO(out, result);
-again:
- lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lh, (open_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
- &rr->rr_name);
-
parent = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
if (IS_ERR(parent))
GOTO(out, result = PTR_ERR(parent));
- result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
- if (result != 0) {
+ /* get and check version of parent */
+ result = mdt_version_get_check(info, parent, 0);
+ if (result) {
mdt_object_put(info->mti_env, parent);
GOTO(out, result);
}
- /* get and check version of parent */
- result = mdt_version_get_check(info, parent, 0);
- if (result)
- GOTO(out_parent, result);
-
+ OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
+again_pw:
fid_zero(child_fid);
- result = -ENOENT;
- if ((open_flags & MDS_OPEN_VOLATILE) == 0)
+ if (open_flags & MDS_OPEN_VOLATILE) {
+ lh = NULL;
+ result = -ENOENT;
+ } else {
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
+ result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+ if (result != 0) {
+ mdt_object_put(info->mti_env, parent);
+ GOTO(out, result);
+ }
+
result = mdo_lookup(info->mti_env, mdt_object_child(parent),
&rr->rr_name, child_fid, &info->mti_spec);
+ }
LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
"looking for "DFID"/"DNAME", found FID = "DFID"\n",
PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
PFID(child_fid));
- if (result != 0 && result != -ENOENT && result != -ESTALE)
+ if (result != 0 && result != -ENOENT)
GOTO(out_parent, result);
- if (result == -ENOENT || result == -ESTALE) {
- /* If the object is dead, let's check if the object
- * is being migrated to a new object */
- if (result == -ESTALE) {
- struct lu_buf lmv_buf;
-
- lmv_buf.lb_buf = info->mti_xattr_buf;
- lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
- rc = mo_xattr_get(info->mti_env,
- mdt_object_child(parent),
- &lmv_buf, XATTR_NAME_LMV);
- if (rc > 0) {
- struct lmv_mds_md_v1 *lmv;
-
- lmv = lmv_buf.lb_buf;
- if (le32_to_cpu(lmv->lmv_hash_type) &
- LMV_HASH_FLAG_MIGRATION) {
- /* Get the new parent FID and retry */
- mdt_object_unlock_put(info, parent,
- lh, 1);
- mdt_lock_handle_init(lh);
- fid_le_to_cpu(
- (struct lu_fid *)rr->rr_fid1,
- &lmv->lmv_stripe_fids[1]);
- goto again;
- }
- }
- }
+ OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
+ if (result == -ENOENT) {
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
- if (result == -ESTALE) {
- /*
- * -ESTALE means the parent is a dead(unlinked) dir, so
- * it should return -ENOENT to in accordance with the
- * original mds implementaion.
- */
- GOTO(out_parent, result = -ENOENT);
- }
-
if (!(open_flags & MDS_OPEN_CREAT))
GOTO(out_parent, result);
if (mdt_rdonly(req->rq_export))
GOTO(out_parent, result = -EROFS);
+
+ LASSERT(equi(lh == NULL, open_flags & MDS_OPEN_VOLATILE));
+
+ if (lh != NULL && lock_mode == LCK_PR) {
+ /* first pass: get write lock and restart */
+ mdt_object_unlock(info, parent, lh, 1);
+ mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+ mdt_lock_handle_init(lh);
+ lock_mode = LCK_PW;
+ goto again_pw;
+ }
+
*child_fid = *info->mti_rr.rr_fid2;
LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
PFID(child_fid));
/* Not found and with MDS_OPEN_CREAT: let's create it. */
mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
- /* Let lower layers know what is lock mode on directory. */
- info->mti_spec.sp_cr_mode =
- mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
-
/* Don't do lookup sanity check. We know name doesn't exist. */
info->mti_spec.sp_cr_lookup = 0;
info->mti_spec.sp_feat = &dt_directory_features;
GOTO(out_child, result);
}
created = 1;
- mdt_counter_incr(req, LPROC_MDT_MKNOD);
+ mdt_counter_incr(req, LPROC_MDT_MKNOD,
+ ktime_us_delta(ktime_get(), kstart));
} else {
/*
* The object is on remote node, return its FID for remote open.
result = -MDT_EREMOTE_OPEN;
GOTO(out_child, result);
} else if (mdt_object_exists(child)) {
+ /* Check early for MDS_OPEN_DIRECTORY/O_DIRECTORY to
+ * avoid opening regular files from lfs getstripe
+ * since doing so breaks the leases used by lfs
+ * mirror. See LU-13693. */
+ if (open_flags & MDS_OPEN_DIRECTORY &&
+ S_ISREG(lu_object_attr(&child->mot_obj)))
+ GOTO(out_child, result = -ENOTDIR);
+
/* We have to get attr & LOV EA & HSM for this
* object. */
mdt_prep_ma_buf_from_rep(info, child, ma);
}
}
+ repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
+ repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
+
rc = mdt_pack_secctx_in_reply(info, child);
if (unlikely(rc))
GOTO(out_child, result = rc);
+ rc = mdt_pack_encctx_in_reply(info, child);
+ if (unlikely(rc))
+ GOTO(out_child, result = rc);
+
rc = mdt_check_resent_lock(info, child, lhc);
if (rc < 0) {
GOTO(out_child, result = rc);
mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
}
}
+
+ mdt_counter_incr(req, LPROC_MDT_OPEN,
+ ktime_us_delta(ktime_get(), kstart));
+
EXIT;
out_child_unlock:
if (object_locked)
if (result == 0)
mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh);
out_parent:
- mdt_object_unlock_put(info, parent, lh, result || !created);
+ if (lh != NULL)
+ mdt_object_unlock(info, parent, lh, result || !created);
+
+ mdt_object_put(info->mti_env, parent);
out:
if (result)
lustre_msg_set_transno(req->rq_repmsg, 0);
.ln_namelen = sizeof("i_am_nobody") - 1,
};
struct lu_ucred *uc;
- cfs_cap_t uc_cap_save;
+ kernel_cap_t uc_cap_save;
int rc;
ENTRY;
spec->sp_cr_lookup = 0;
spec->sp_feat = &dt_directory_features;
- spec->sp_cr_mode = MDL_MINMODE; /* no lock */
spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
if (attr->ma_valid & MA_LOV) {
spec->u.sp_ea.eadata = attr->ma_lmm;
uc = lu_ucred(env);
uc_cap_save = uc->uc_cap;
- uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
+ cap_raise(uc->uc_cap, CAP_DAC_OVERRIDE);
rc = mdo_create(env, mdt_object_child(local_root), &lname,
mdt_object_child(obj), spec, attr);
uc->uc_cap = uc_cap_save;
struct mdt_object *orphan;
struct md_attr *orp_ma;
struct lu_buf *buf;
- cfs_cap_t cap;
+ kernel_cap_t cap;
bool lease_broken;
int rc;
int rc2;
/* The orphan has root ownership so we need to raise
* CAP_FOWNER to set the HSM attributes. */
cap = uc->uc_cap;
- uc->uc_cap |= MD_CAP_TO_MASK(CFS_CAP_FOWNER);
+ cap_raise(uc->uc_cap, CAP_FOWNER);
rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
XATTR_NAME_HSM, 0);
uc->uc_cap = cap;
}
out_reprocess:
- ldlm_reprocess_all(lease->l_resource, lease);
+ ldlm_reprocess_all(lease->l_resource,
+ lease->l_policy_data.l_inodebits.bits);
LDLM_LOCK_PUT(lease);
ma->ma_valid = 0;
struct mdt_lock_handle *lh2 = &info->mti_lh[MDT_LH_OLD];
struct close_data *data;
struct ldlm_lock *lease;
- struct mdt_object *o1 = o, *o2;
+ struct mdt_object *o1 = o, *o2 = NULL;
bool lease_broken;
- bool swap_objects;
+ bool swap_objects = false;
int rc;
ENTRY;
RETURN(-EINVAL);
rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
- if (unlikely(rc == 0))
- RETURN(-EINVAL);
+ if (rc == 0) {
+ /**
+ * only MDS_CLOSE_LAYOUT_SPLIT use the same fid to indicate
+ * mirror deletion, so we'd zero cd_fid, and keeps o2 be NULL.
+ */
+ if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT))
+ RETURN(-EINVAL);
- /* Exchange o1 and o2, to enforce locking order */
- swap_objects = (rc < 0);
+ /* zero cd_fid to keeps o2 be NULL */
+ fid_zero(&data->cd_fid);
+ } else if (rc < 0) {
+ /* Exchange o1 and o2, to enforce locking order */
+ swap_objects = true;
+ }
lease = ldlm_handle2lock(&data->cd_handle);
if (lease == NULL)
RETURN(-ESTALE);
- o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
- if (IS_ERR(o2))
- GOTO(out_lease, rc = PTR_ERR(o2));
+ if (!fid_is_zero(&data->cd_fid)) {
+ o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+ &data->cd_fid);
+ if (IS_ERR(o2))
+ GOTO(out_lease, rc = PTR_ERR(o2));
- if (!S_ISREG(lu_object_attr(&o2->mot_obj))) {
- swap_objects = false; /* not swapped yet */
- GOTO(out_obj, rc = -EINVAL);
- }
+ if (!mdt_object_exists(o2))
+ GOTO(out_obj, rc = -ENOENT);
- if (swap_objects)
- swap(o1, o2);
+ if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
+ GOTO(out_obj, rc = -EINVAL);
+
+ if (swap_objects)
+ swap(o1, o2);
+ }
rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
MAY_WRITE);
if (rc < 0)
GOTO(out_obj, rc);
- rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
- MAY_WRITE);
- if (rc < 0)
- GOTO(out_obj, rc);
+ if (o2) {
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2),
+ NULL, MAY_WRITE);
+ if (rc < 0)
+ GOTO(out_obj, rc);
+ }
/* try to hold open_sem so that nobody else can open the file */
if (!down_write_trylock(&o->mot_open_sem)) {
if (rc < 0)
GOTO(out_unlock_sem, rc);
- mdt_lock_reg_init(lh2, LCK_EX);
- rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
- if (rc < 0)
- GOTO(out_unlock1, rc);
+ if (o2) {
+ mdt_lock_reg_init(lh2, LCK_EX);
+ rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
+ MDS_INODELOCK_XATTR);
+ if (rc < 0)
+ GOTO(out_unlock1, rc);
+ }
/* Swap layout with orphan object */
if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
struct lu_buf *buf = &info->mti_buf;
struct md_rejig_data mrd;
- mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
- if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
+ if (o2) {
+ mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+ } else {
+ if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)) {
+ /* paranoid check again */
+ CERROR(DFID
+ ":only mirror split support NULL o2 object\n",
+ PFID(mdt_object_fid(o)));
+ GOTO(out_unlock1, rc = -EINVAL);
+ }
+
+ /* set NULL mrd_obj for deleting mirror objects */
+ mrd.mrd_obj = NULL;
+ }
+
+ if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT) {
mrd.mrd_mirror_id = data->cd_mirror_id;
+ /* set a small enough blocks in the SoM */
+ ma->ma_attr.la_blocks >>= 1;
+ }
buf->lb_len = sizeof(mrd);
buf->lb_buf = &mrd;
XATTR_LUSTRE_LOV,
ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
LU_XATTR_SPLIT : LU_XATTR_MERGE);
- if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS)) {
+ if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS |
+ LA_LSIZE | LA_LBLOCKS)) {
int rc2;
+ enum lustre_som_flags lsf;
+
+ if (ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS))
+ lsf = SOM_FL_STRICT;
+ else
+ lsf = SOM_FL_LAZY;
mutex_lock(&o->mot_som_mutex);
- rc2 = mdt_set_som(info, o, SOM_FL_STRICT,
+ rc2 = mdt_set_som(info, o, lsf,
ma->ma_attr.la_size,
ma->ma_attr.la_blocks);
mutex_unlock(&o->mot_som_mutex);
out_unlock2:
/* Release exclusive LL */
- mdt_object_unlock(info, o2, lh2, 1);
+ if (o2)
+ mdt_object_unlock(info, o2, lh2, 1);
out_unlock1:
mdt_object_unlock(info, o1, lh1, 1);
}
out_obj:
- mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
+ if (o1 != o)
+ /* the 2nd object has been used, and swapped to o1 */
+ mdt_object_put(info->mti_env, o1);
+ else if (o2)
+ /* the 2nd object has been used, and not swapped */
+ mdt_object_put(info->mti_env, o2);
- ldlm_reprocess_all(lease->l_resource, lease);
+ ldlm_reprocess_all(lease->l_resource,
+ lease->l_policy_data.l_inodebits.bits);
out_lease:
LDLM_LOCK_PUT(lease);
if (data == NULL)
RETURN(-EPROTO);
- if (ptlrpc_req_need_swab(mdt_info_req(info)))
+ if (req_capsule_req_need_swab(info->mti_pill))
lustre_swab_close_data_resync_done(&data->cd_resync);
if (!fid_is_zero(&data->cd_fid))
RCL_CLIENT))
GOTO(out_unlock, rc = -EPROTO);
- OBD_ALLOC(resync_ids, resync_count * sizeof(__u32));
+ OBD_ALLOC_PTR_ARRAY(resync_ids, resync_count);
if (!resync_ids)
GOTO(out_unlock, rc = -ENOMEM);
}
if (resync_ids)
- OBD_FREE(resync_ids, resync_count * sizeof(__u32));
+ OBD_FREE_PTR_ARRAY(resync_ids, resync_count);
out_reprocess:
- ldlm_reprocess_all(lease->l_resource, lease);
+ ldlm_reprocess_all(lease->l_resource,
+ lease->l_policy_data.l_inodebits.bits);
LDLM_LOCK_PUT(lease);
ma->ma_valid = 0;
ma->ma_valid = MA_INODE;
ma->ma_attr_flags |= MDS_CLOSE_UPDATE_TIMES;
ma->ma_attr.la_valid &= (LA_ATIME | LA_MTIME | LA_CTIME);
+
+ if (ma->ma_attr.la_valid & LA_MTIME) {
+ rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
+ if (!rc)
+ ma->ma_valid |= MA_PFID;
+ }
+
rc = mo_attr_set(info->mti_env, next, ma);
}
struct ptlrpc_request *req = tgt_ses_req(tsi);
struct md_attr *ma = &info->mti_attr;
struct mdt_body *repbody = NULL;
+ ktime_t kstart = ktime_get();
int rc, ret = 0;
ENTRY;
- mdt_counter_incr(req, LPROC_MDT_CLOSE);
/* Close may come with the Size-on-MDS update. Unpack it. */
rc = mdt_close_unpack(info);
if (rc)
tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
out:
mdt_thread_info_fini(info);
+ if (rc == 0)
+ mdt_counter_incr(req, LPROC_MDT_CLOSE,
+ ktime_us_delta(ktime_get(), kstart));
RETURN(rc ? rc : ret);
}