RETURN(0);
LASSERT(info->mti_mos[index]);
- LASSERT(mdt_object_exists(info->mti_mos[index]));
+ if (mdt_object_exists(info->mti_mos[index]) == 0)
+ RETURN(-ESTALE);
mo = mdt_object_child(info->mti_mos[index]);
curr_version = mo_version_get(info->mti_env, mo);
/** Sanity check for malformed buffers */
if (pre_versions == NULL) {
CERROR("No versions in request buffer\n");
- spin_lock(&req->rq_export->exp_lock);
+ cfs_spin_lock(&req->rq_export->exp_lock);
req->rq_export->exp_vbr_failed = 1;
- spin_unlock(&req->rq_export->exp_lock);
+ cfs_spin_unlock(&req->rq_export->exp_lock);
RETURN(-EOVERFLOW);
} else if (pre_versions[index] != curr_version) {
CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
pre_versions[index], curr_version);
- spin_lock(&req->rq_export->exp_lock);
+ cfs_spin_lock(&req->rq_export->exp_lock);
req->rq_export->exp_vbr_failed = 1;
- spin_unlock(&req->rq_export->exp_lock);
+ cfs_spin_unlock(&req->rq_export->exp_lock);
RETURN(-EOVERFLOW);
}
}
* the client holds a lock already.
* We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
* mdt_setattr_unpack()) flag to tell these cases apart. */
-int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
+int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
+ struct md_attr *ma, int flags)
{
- struct md_attr *ma = &info->mti_attr;
struct mdt_lock_handle *lh;
- int som_update = 0;
int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
int rc;
ENTRY;
/* attr shouldn't be set on remote object */
LASSERT(mdt_object_exists(mo) >= 0);
- if (exp_connect_som(info->mti_exp) && info->mti_ioepoch)
- som_update = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
-
- /* Try to avoid object_lock if another epoch has been started
- * already. */
- if (som_update && (info->mti_ioepoch->ioepoch != mo->mot_ioepoch))
- RETURN(0);
-
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_reg_init(lh, LCK_PW);
RETURN(rc);
}
- /* Setattrs are syncronized through dlm lock taken above. If another
- * epoch started, its attributes may be already flushed on disk,
- * skip setattr. */
- if (som_update && (info->mti_ioepoch->ioepoch != mo->mot_ioepoch))
- GOTO(out_unlock, rc = 0);
-
if (mdt_object_exists(mo) == 0)
GOTO(out_unlock, rc = -ENOENT);
if (rc != 0)
GOTO(out_unlock, rc);
- /* Re-enable SIZEONMDS. */
- if (som_update) {
- CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID" size "LPU64
- ". Count %d\n", mo->mot_ioepoch,
- PFID(mdt_object_fid(mo)), ma->ma_attr.la_size,
- mo->mot_ioepoch_count);
- mdt_object_som_enable(info, mo);
- }
-
EXIT;
out_unlock:
mdt_object_unlock(info, mo, lh, rc);
struct mdt_object *mo;
struct md_object *next;
struct mdt_body *repbody;
- int rc;
+ int som_au, rc;
ENTRY;
DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
if ((ma->ma_attr.la_valid & LA_SIZE) ||
(rr->rr_flags & MRF_SETATTR_LOCKED)) {
/* Check write access for the O_TRUNC case */
- if (mdt_write_read(info->mti_mdt, mo) < 0)
+ if (mdt_write_read(mo) < 0)
GOTO(out_put, rc = -ETXTBSY);
}
} else if (info->mti_ioepoch &&
(info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
- /* Truncate case. */
- rc = mdt_write_get(info->mti_mdt, mo);
+ /* Truncate case. IOEpoch is opened. */
+ rc = mdt_write_get(mo);
if (rc)
GOTO(out_put, rc);
mfd = mdt_mfd_new();
- if (mfd == NULL)
+ if (mfd == NULL) {
+ mdt_write_put(mo);
GOTO(out_put, rc = -ENOMEM);
+ }
- mdt_ioepoch_open(info, mo);
+ mdt_ioepoch_open(info, mo, 0);
repbody->ioepoch = mo->mot_ioepoch;
mdt_object_get(info->mti_env, mo);
mfd->mfd_object = mo;
mfd->mfd_xid = req->rq_xid;
- spin_lock(&med->med_open_lock);
- list_add(&mfd->mfd_list, &med->med_open_head);
- spin_unlock(&med->med_open_lock);
+ cfs_spin_lock(&med->med_open_lock);
+ cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+ cfs_spin_unlock(&med->med_open_lock);
repbody->handle.cookie = mfd->mfd_handle.h_cookie;
}
- if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_SOM_CHANGE))
- ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
-
- rc = mdt_attr_set(info, mo, rr->rr_flags);
- if (rc)
- GOTO(out_put, rc);
-
- if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_SOM_CHANGE)) {
+ som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
+ if (som_au) {
+ /* SOM Attribute update case. Find the proper mfd and update
+ * SOM attributes on the proper object. */
LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
LASSERT(info->mti_ioepoch);
- spin_lock(&med->med_open_lock);
- /* Size-on-MDS Update. Find and free mfd. */
+ cfs_spin_lock(&med->med_open_lock);
mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
if (mfd == NULL) {
- spin_unlock(&med->med_open_lock);
- CDEBUG(D_INODE | D_ERROR, "no handle for file close: "
- "fid = "DFID": cookie = "LPX64"\n",
- PFID(info->mti_rr.rr_fid1),
- info->mti_ioepoch->handle.cookie);
+ cfs_spin_unlock(&med->med_open_lock);
+ CDEBUG(D_INODE, "no handle for file close: "
+ "fid = "DFID": cookie = "LPX64"\n",
+ PFID(info->mti_rr.rr_fid1),
+ info->mti_ioepoch->handle.cookie);
GOTO(out_put, rc = -ESTALE);
}
LASSERT(mfd->mfd_mode == FMODE_SOM);
LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
class_handle_unhash(&mfd->mfd_handle);
- list_del_init(&mfd->mfd_list);
- spin_unlock(&med->med_open_lock);
+ cfs_list_del_init(&mfd->mfd_list);
+ cfs_spin_unlock(&med->med_open_lock);
+
+ /* Close the found mfd, update attributes. */
+ ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
+ OBD_ALLOC(ma->ma_lmm, ma->ma_lmm_size);
+ if (ma->ma_lmm == NULL)
+ GOTO(out_put, rc = -ENOMEM);
mdt_mfd_close(info, mfd);
+
+ OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
+ } else {
+ rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
+ if (rc)
+ GOTO(out_put, rc);
}
ma->ma_need = MA_INODE;
if (info->mti_mdt->mdt_opts.mo_oss_capa &&
info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
- (ma->ma_attr.la_valid & LA_SIZE)) {
+ (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
struct lustre_capa *capa;
capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
mdt_object_unlock_put(info, mp, lhp, rc);
return rc;
}
+/**
+ * Lock the portion of a directory that corresponds to one name hash
+ * (lh->mlh_pdo_hash) under parallel directory operations (PDO) locking,
+ * so that updates to different hash ranges of the same directory do not
+ * all serialize on a single whole-directory lock.
+ *
+ * \param info  per-request thread context; supplies mti_res_id,
+ *              mti_policy and the MDT namespace used for the enqueue
+ * \param lh    lock handle: mlh_pdo_hash must already be non-zero and
+ *              mlh_reg_mode set by the caller; the acquired lock is
+ *              stored in mlh_reg_lh
+ * \param obj   directory object whose hash range is being locked
+ * \param ibits inodebits to request (e.g. MDS_INODELOCK_UPDATE)
+ *
+ * \retval 0 on success; negative value from mdt_fid_lock() on failure
+ */
+static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
+                              struct mdt_lock_handle *lh,
+                              struct mdt_object *obj, __u64 ibits)
+{
+        struct ldlm_res_id *res_id = &info->mti_res_id;
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+        ldlm_policy_data_t *policy = &info->mti_policy;
+        int rc;
+
+        /*
+         * Finish initializing res_id by marking it with the name hash of
+         * the part of the directory that is being modified.
+         */
+        LASSERT(lh->mlh_pdo_hash != 0);
+        fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
+        memset(policy, 0, sizeof(*policy));
+        policy->l_inodebits.bits = ibits;
+        /*
+         * Use LDLM_FL_LOCAL_ONLY for this lock.  We do not know yet if it
+         * is going to be sent to a client; if it is, the mdt_intent_policy()
+         * path will fix it up and turn the FL_LOCAL flag off.
+         */
+        rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
+                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          &info->mti_exp->exp_handle.h_cookie);
+        return rc;
+}
/* partial operation for rename */
static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
mdt_object_get(info->mti_env, msrcdir);
mtgtdir = msrcdir;
+ if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
+ rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
+ MDS_INODELOCK_UPDATE);
+ if (rc)
+ GOTO(out_unlock_source, rc);
+ }
} else {
mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
rr->rr_fid2);