* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
/**
 * Run mdo_create_data() for \a o when the open flags call for it.
 *
 * Returns 0 immediately when md_should_create() rejects
 * spec->sp_cr_flags. Otherwise ma_need is set to MA_INODE | MA_LOV,
 * ma_valid is cleared and, under o->mot_lov_mutex and only while
 * MOF_LOV_CREATED is not yet set on \a o:
 *  - new version only: fails with -EPERM when \a p is non-NULL and its FID
 *    satisfies fid_is_obf() or fid_is_dot_lustre();
 *  - calls mdo_create_data() with the child of \a p (NULL if \a p is NULL);
 *  - new version only: on success re-reads the attributes through
 *    mdt_attr_get_complex();
 *  - sets MOF_LOV_CREATED when rc is 0 and MA_LOV is valid in ma_valid.
 *
 * \param info  thread info; mti_spec is read, mti_attr is modified
 * \param p     parent object, may be NULL
 * \param o     object the data is created for
 *
 * \retval 0    nothing had to be done, or every step succeeded
 * \retval rc   otherwise the rc of the step that failed (-EPERM for the
 *              parent FID check)
 */
static int mdt_create_data(struct mdt_thread_info *info,
struct mdt_object *p, struct mdt_object *o)
{
- struct md_op_spec *spec = &info->mti_spec;
- struct md_attr *ma = &info->mti_attr;
- int rc = 0;
- ENTRY;
-
- if (!md_should_create(spec->sp_cr_flags))
- RETURN(0);
-
- ma->ma_need = MA_INODE | MA_LOV;
- ma->ma_valid = 0;
- cfs_mutex_lock(&o->mot_lov_mutex);
- if (!(o->mot_flags & MOF_LOV_CREATED)) {
- rc = mdo_create_data(info->mti_env,
- p ? mdt_object_child(p) : NULL,
- mdt_object_child(o), spec, ma);
- if (rc == 0 && ma->ma_valid & MA_LOV)
- o->mot_flags |= MOF_LOV_CREATED;
- }
- cfs_mutex_unlock(&o->mot_lov_mutex);
- RETURN(rc);
+ struct md_op_spec *spec = &info->mti_spec;
+ struct md_attr *ma = &info->mti_attr;
+ int rc = 0;
+ ENTRY;
+
+ if (!md_should_create(spec->sp_cr_flags))
+ RETURN(0);
+
+ ma->ma_need = MA_INODE | MA_LOV;
+ ma->ma_valid = 0;
+ mutex_lock(&o->mot_lov_mutex);
+ if (!(o->mot_flags & MOF_LOV_CREATED)) {
+ if (p != NULL && (fid_is_obf(mdt_object_fid(p)) ||
+ fid_is_dot_lustre(mdt_object_fid(p))))
+ GOTO(unlock, rc = -EPERM);
+
+ rc = mdo_create_data(info->mti_env,
+ p ? mdt_object_child(p) : NULL,
+ mdt_object_child(o), spec, ma);
+ if (rc == 0)
+ rc = mdt_attr_get_complex(info, o, ma);
+
+ if (rc == 0 && ma->ma_valid & MA_LOV)
+ o->mot_flags |= MOF_LOV_CREATED;
+ }
+unlock:
+ mutex_unlock(&o->mot_lov_mutex);
+ RETURN(rc);
}
static int mdt_ioepoch_opened(struct mdt_object *mo)
!S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
RETURN(0);
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
- if (mdt_ioepoch_opened(o)) {
- /* Epoch continues even if there is no writers yet. */
- CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
- o->mot_ioepoch, PFID(mdt_object_fid(o)));
- } else {
- /* XXX: ->mdt_ioepoch is not initialized at the mount */
- cfs_spin_lock(&mdt->mdt_ioepoch_lock);
+ mutex_lock(&o->mot_ioepoch_mutex);
+ if (mdt_ioepoch_opened(o)) {
+ /* Epoch continues even if there is no writers yet. */
+ CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
+ o->mot_ioepoch, PFID(mdt_object_fid(o)));
+ } else {
+ /* XXX: ->mdt_ioepoch is not initialized at the mount */
+ spin_lock(&mdt->mdt_ioepoch_lock);
if (mdt->mdt_ioepoch < info->mti_replayepoch)
mdt->mdt_ioepoch = info->mti_replayepoch;
else
o->mot_ioepoch = mdt->mdt_ioepoch;
- cfs_spin_unlock(&mdt->mdt_ioepoch_lock);
+ spin_unlock(&mdt->mdt_ioepoch_lock);
- CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
- o->mot_ioepoch, PFID(mdt_object_fid(o)));
- if (created)
- o->mot_flags |= MOF_SOM_CREATED;
- cancel = 1;
- }
- o->mot_ioepoch_count++;
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
+ o->mot_ioepoch, PFID(mdt_object_fid(o)));
+ if (created)
+ o->mot_flags |= MOF_SOM_CREATED;
+ cancel = 1;
+ }
+ o->mot_ioepoch_count++;
+ mutex_unlock(&o->mot_ioepoch_mutex);
/* Cancel Size-on-MDS attributes cached on clients for the open case.
* In the truncate case, see mdt_reint_setattr(). */
/**
* Update SOM on-disk attributes.
* If enabling, write update inodes and lustre-ea with the proper IOEpoch,
- * mountid and attributes. If disabling, zero IOEpoch id in lustre-ea.
+ * mountid and attributes. If disabling, clean SOM xattr.
* Call under ->mot_ioepoch_mutex.
*
* New version: when \a enable is true, a struct som_attrs is zeroed in
* info->mti_xattr_buf, filled with \a ioepoch, the mount count of the
* target obd, and the size/blocks from info->mti_attr.ma_attr (each only if
* LA_SIZE / LA_BLOCKS is valid, otherwise left 0), passed through
* lustre_som_swab() (presumably byte-order conversion - confirm), and stored
* with mo_xattr_set() under XATTR_NAME_SOM. When \a enable is false the
* XATTR_NAME_SOM xattr is removed with mo_xattr_del().
*
* \param info     thread info; mti_buf and mti_xattr_buf are used as scratch
* \param obj      object whose child md_object receives the xattr operation
* \param ioepoch  epoch recorded in the SOM attributes (also logged)
* \param enable   true to write the SOM xattr, false to delete it
*
* \retval rc of mo_xattr_set() or mo_xattr_del()
*/
static int mdt_som_attr_set(struct mdt_thread_info *info,
- struct mdt_object *obj, __u64 ioepoch, int enable)
+ struct mdt_object *obj, __u64 ioepoch, bool enable)
{
- struct md_attr *ma = &info->mti_attr;
- int rc;
+ struct md_object *next = mdt_object_child(obj);
+ int rc;
ENTRY;
CDEBUG(D_INODE, "Size-on-MDS attribute %s for epoch "LPU64
" on "DFID".\n", enable ? "update" : "disabling",
ioepoch, PFID(mdt_object_fid(obj)));
- ma->ma_valid |= MA_SOM;
- ma->ma_som = &info->mti_u.som.data;
- if (enable) {
- struct mdt_device *mdt = info->mti_mdt;
- struct lu_attr *la = &ma->ma_attr;
-
- ma->ma_som->msd_ioepoch = ioepoch;
- ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0;
- ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ?
- la->la_blocks : 0;
- ma->ma_som->msd_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count;
- ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
- } else {
- ma->ma_som->msd_ioepoch = IOEPOCH_INVAL;
- ma->ma_attr.la_valid &= LA_ATIME;
- }
-
- /* Since we have opened the file, it is unnecessary
- * to check permission when close it. Between the "open"
- * and "close", maybe someone has changed the file mode
- * or flags, or the file created mode do not permit wirte,
- * and so on. Just set MDS_PERM_BYPASS for all the cases. */
- ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
+ if (enable) {
+ struct lu_buf *buf = &info->mti_buf;
+ struct som_attrs *attrs;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
+ struct obd_device *obd = info->mti_mdt->mdt_lut.lut_obd;
+
+ attrs = (struct som_attrs *)info->mti_xattr_buf;
+ CLASSERT(sizeof(info->mti_xattr_buf) >= sizeof(*attrs));
+
+ /* pack SOM attributes */
+ memset(attrs, 0, sizeof(*attrs));
+ attrs->som_ioepoch = ioepoch;
+ attrs->som_mountid = obd->u.obt.obt_mount_count;
+ if ((la->la_valid & LA_SIZE) != 0)
+ attrs->som_size = la->la_size;
+ if ((la->la_valid & LA_BLOCKS) != 0)
+ attrs->som_blocks = la->la_blocks;
+ lustre_som_swab(attrs);
+
+ /* update SOM attributes */
+ buf->lb_buf = attrs;
+ buf->lb_len = sizeof(*attrs);
+ rc = mo_xattr_set(info->mti_env, next, buf, XATTR_NAME_SOM, 0);
+ } else {
+ /* delete SOM attributes */
+ rc = mo_xattr_del(info->mti_env, next, XATTR_NAME_SOM);
+ }
- rc = mdt_attr_set(info, obj, ma, 0);
RETURN(rc);
}
{
int rc = 0;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
CDEBUG(D_INODE, "Eviction. Closing IOepoch "LPU64" on "DFID". "
"Count %d\n", o->mot_ioepoch, PFID(mdt_object_fid(o)),
o->mot_ioepoch_count);
rc = mdt_som_attr_set(info, o, o->mot_ioepoch, MDT_SOM_DISABLE);
mdt_object_som_enable(o, o->mot_ioepoch);
}
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
RETURN(rc);
}
int rc = MDT_IOEPOCH_CLOSED;
ENTRY;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
CDEBUG(D_INODE, "Replay. Closing epoch "LPU64" on "DFID". Count %d\n",
o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
o->mot_ioepoch_count--;
if (!mdt_ioepoch_opened(o))
mdt_object_som_enable(o, info->mti_ioepoch->ioepoch);
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
RETURN(rc);
}
la = &info->mti_attr.ma_attr;
achange = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
o->mot_ioepoch_count--;
tmp_ma = &info->mti_u.som.attr;
tmp_ma->ma_som = &info->mti_u.som.data;
tmp_ma->ma_need = MA_INODE | MA_LOV | MA_SOM;
tmp_ma->ma_valid = 0;
- rc = mo_attr_get(info->mti_env, mdt_object_child(o), tmp_ma);
+ rc = mdt_attr_get_complex(info, o, tmp_ma);
if (rc)
GOTO(error_up, rc);
mdt_object_som_enable(o, o->mot_ioepoch);
}
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
/* If recovery is needed, tell the client to perform GETATTR under
* the lock. */
if (ret == MDT_IOEPOCH_GETATTR && recovery) {
RETURN(rc ? : ret);
error_up:
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
return rc;
}
!(info->mti_attr.ma_attr.la_valid & LA_SIZE)))
act = MDT_SOM_DISABLE;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
/* Mark the object it is the recovery state if we failed to obtain
* SOM attributes. */
if (act == MDT_SOM_DISABLE)
rc = mdt_som_attr_set(info, o, ioepoch, act);
mdt_object_som_enable(o, ioepoch);
}
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
RETURN(rc);
}
{
int rc = 0;
ENTRY;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
rc = o->mot_writecount;
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
RETURN(rc);
}
{
int rc = 0;
ENTRY;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
if (o->mot_writecount < 0)
rc = -ETXTBSY;
else
o->mot_writecount++;
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
RETURN(rc);
}
/**
 * Decrement o->mot_writecount while holding o->mot_ioepoch_mutex.
 *
 * No range check is made here; the counter is decremented unconditionally.
 */
void mdt_write_put(struct mdt_object *o)
{
ENTRY;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
o->mot_writecount--;
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
EXIT;
}
{
int rc = 0;
ENTRY;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
if (o->mot_writecount > 0)
rc = -ETXTBSY;
else
o->mot_writecount--;
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
RETURN(rc);
}
/**
 * Increment o->mot_writecount while holding o->mot_ioepoch_mutex.
 *
 * NOTE(review): presumably this undoes an earlier write-deny that pushed
 * the counter below zero - confirm against mdt_write_deny().
 */
static void mdt_write_allow(struct mdt_object *o)
{
ENTRY;
- cfs_mutex_lock(&o->mot_ioepoch_mutex);
+ mutex_lock(&o->mot_ioepoch_mutex);
o->mot_writecount++;
- cfs_mutex_unlock(&o->mot_ioepoch_mutex);
+ mutex_unlock(&o->mot_ioepoch_mutex);
EXIT;
}
if (lustre_msg_get_transno(req->rq_repmsg) != 0)
RETURN_EXIT;
- cfs_spin_lock(&mdt->mdt_lut.lut_translock);
- if (info->mti_transno == 0) {
- info->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
- } else {
- /* should be replay */
- if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
- mdt->mdt_lut.lut_last_transno = info->mti_transno;
- }
- cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
-
- CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
- info->mti_transno,
- req->rq_export->exp_obd->obd_last_committed);
-
- req->rq_transno = info->mti_transno;
- lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
+ /* lut_translock protects lut_last_transno while a transno is
+ * assigned to this request or the replayed one is folded in. */
+ spin_lock(&mdt->mdt_lut.lut_translock);
+ if (rc != 0) {
+ if (info->mti_transno != 0) {
+ struct obd_export *exp = req->rq_export;
+
+ CERROR("%s: replay trans "LPU64" NID %s: rc = %d\n",
+ mdt_obd_name(mdt), info->mti_transno,
+ libcfs_nid2str(exp->exp_connection->c_peer.nid),
+ rc);
+ /* RETURN_EXIT leaves the function right here, so the
+ * spinlock must be released first; otherwise it stays
+ * held forever and the next taker deadlocks. */
+ spin_unlock(&mdt->mdt_lut.lut_translock);
+ RETURN_EXIT;
+ }
+ } else if (info->mti_transno == 0) {
+ info->mti_transno = ++mdt->mdt_lut.lut_last_transno;
+ } else {
+ /* should be replay */
+ if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
+ mdt->mdt_lut.lut_last_transno = info->mti_transno;
+ }
+ spin_unlock(&mdt->mdt_lut.lut_translock);
+
+ CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+ info->mti_transno,
+ req->rq_export->exp_obd->obd_last_committed);
+
+ req->rq_transno = info->mti_transno;
+ lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
+
+ /* update lcd in memory only for resent cases */
+ ted = &req->rq_export->exp_target_data;
+ LASSERT(ted);
+ mutex_lock(&ted->ted_lcd_lock);
+ lcd = ted->ted_lcd;
+ if (info->mti_transno < lcd->lcd_last_transno &&
+ info->mti_transno != 0) {
+ /* This should happen during replay. Do not update
+ * last rcvd info if replay req transno < last transno,
+ * otherwise the following resend(after replay) can not
+ * be checked correctly by xid */
+ mutex_unlock(&ted->ted_lcd_lock);
+ CDEBUG(D_HA, "%s: transno = "LPU64" < last_transno = "LPU64"\n",
+ mdt_obd_name(mdt), info->mti_transno,
+ lcd->lcd_last_transno);
+ RETURN_EXIT;
+ }
- /* update lcd in memory only for resent cases */
- ted = &req->rq_export->exp_target_data;
- LASSERT(ted);
- cfs_mutex_lock(&ted->ted_lcd_lock);
- lcd = ted->ted_lcd;
if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
- if (info->mti_transno != 0)
- lcd->lcd_last_close_transno = info->mti_transno;
+ if (info->mti_transno != 0)
+ lcd->lcd_last_close_transno = info->mti_transno;
lcd->lcd_last_close_xid = req->rq_xid;
lcd->lcd_last_close_result = rc;
} else {
lcd->lcd_pre_versions[2] = pre_versions[2];
lcd->lcd_pre_versions[3] = pre_versions[3];
}
- if (info->mti_transno != 0)
- lcd->lcd_last_transno = info->mti_transno;
- lcd->lcd_last_xid = req->rq_xid;
+ if (info->mti_transno != 0)
+ lcd->lcd_last_transno = info->mti_transno;
+
+ lcd->lcd_last_xid = req->rq_xid;
lcd->lcd_last_result = rc;
lcd->lcd_last_data = info->mti_opdata;
}
- cfs_mutex_unlock(&ted->ted_lcd_lock);
+ mutex_unlock(&ted->ted_lcd_lock);
EXIT;
}
repbody->ioepoch = o->mot_ioepoch;
}
} else if (flags & MDS_FMODE_EXEC) {
- rc = mdt_write_deny(o);
+ /* if file is released, we can't deny write because we must
+ * restore (write) it to access it.*/
+ if ((ma->ma_valid & MA_HSM) &&
+ (ma->ma_hsm.mh_flags & HS_RELEASED))
+ rc = 0;
+ else
+ rc = mdt_write_deny(o);
}
if (rc)
RETURN(rc);
"cookie=" LPX64"\n", mfd,
PFID(mdt_object_fid(mfd->mfd_object)),
info->mti_rr.rr_handle->cookie);
- cfs_spin_lock(&med->med_open_lock);
- class_handle_unhash(&old_mfd->mfd_handle);
- cfs_list_del_init(&old_mfd->mfd_list);
- cfs_spin_unlock(&med->med_open_lock);
+ spin_lock(&med->med_open_lock);
+ class_handle_unhash(&old_mfd->mfd_handle);
+ cfs_list_del_init(&old_mfd->mfd_list);
+ spin_unlock(&med->med_open_lock);
/* no attr update for that close */
la->la_valid = 0;
ma->ma_valid |= MA_FLAGS;
repbody->handle.cookie = mfd->mfd_handle.h_cookie;
if (req->rq_export->exp_disconnected) {
- cfs_spin_lock(&med->med_open_lock);
- class_handle_unhash(&mfd->mfd_handle);
- cfs_list_del_init(&mfd->mfd_list);
- cfs_spin_unlock(&med->med_open_lock);
- mdt_mfd_close(info, mfd);
- } else {
- cfs_spin_lock(&med->med_open_lock);
- cfs_list_add(&mfd->mfd_list, &med->med_open_head);
- cfs_spin_unlock(&med->med_open_lock);
+ spin_lock(&med->med_open_lock);
+ class_handle_unhash(&mfd->mfd_handle);
+ cfs_list_del_init(&mfd->mfd_list);
+ spin_unlock(&med->med_open_lock);
+ mdt_mfd_close(info, mfd);
+ } else {
+ spin_lock(&med->med_open_lock);
+ cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+ spin_unlock(&med->med_open_lock);
}
mdt_empty_transno(info, rc);
islnk = S_ISLNK(la->la_mode);
mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
+ /* LU-2275: simulate broken behaviour (esp. prevalent in
+ * pre-2.4 servers), where a very strange reply is sent on error
+ * that looks like it was almost successful and a failure at the
+ * same time. */
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_NEGATIVE_POSITIVE)) {
+ mdt_set_disposition(info, rep, DISP_OPEN_OPEN |
+ DISP_LOOKUP_NEG |
+ DISP_LOOKUP_POS);
+
+ if (flags & MDS_OPEN_LOCK)
+ mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+
+ RETURN(-ENOENT);
+ }
+
if (exp_connect_rmtclient(exp)) {
void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
}
}
#ifdef CONFIG_FS_POSIX_ACL
- else if (exp->exp_connect_flags & OBD_CONNECT_ACL) {
+ else if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
const struct lu_env *env = info->mti_env;
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
}
#endif
- if (info->mti_mdt->mdt_opts.mo_mds_capa &&
- exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
+ if (info->mti_mdt->mdt_opts.mo_mds_capa &&
+ exp_connect_flags(exp) & OBD_CONNECT_MDS_CAPA) {
struct lustre_capa *capa;
capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
RETURN(rc);
repbody->valid |= OBD_MD_FLMDSCAPA;
}
- if (info->mti_mdt->mdt_opts.mo_oss_capa &&
- exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
- S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
+ if (info->mti_mdt->mdt_opts.mo_oss_capa &&
+ exp_connect_flags(exp) & OBD_CONNECT_OSS_CAPA &&
+ S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
struct lustre_capa *capa;
capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
* If we are following a symlink, don't open; and do not return open
* handle for special nodes as client required.
*/
- if (islnk || (!isreg && !isdir &&
- (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
- lustre_msg_set_transno(req->rq_repmsg, 0);
- RETURN(0);
- }
-
- mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+ if (islnk || (!isreg && !isdir &&
+ (exp_connect_flags(req->rq_export) & OBD_CONNECT_NODEVOH))) {
+ lustre_msg_set_transno(req->rq_repmsg, 0);
+ RETURN(0);
+ }
/*
* We need to return the existing object's fid back, so it is done here,
mfd = NULL;
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
- cfs_spin_lock(&med->med_open_lock);
- cfs_list_for_each(t, &med->med_open_head) {
- mfd = cfs_list_entry(t, struct mdt_file_data, mfd_list);
- if (mfd->mfd_xid == req->rq_xid) {
- break;
- }
- mfd = NULL;
- }
- cfs_spin_unlock(&med->med_open_lock);
+ spin_lock(&med->med_open_lock);
+ cfs_list_for_each(t, &med->med_open_head) {
+ mfd = cfs_list_entry(t, struct mdt_file_data, mfd_list);
+ if (mfd->mfd_xid == req->rq_xid)
+ break;
+ mfd = NULL;
+ }
+ spin_unlock(&med->med_open_lock);
if (mfd != NULL) {
repbody->handle.cookie = mfd->mfd_handle.h_cookie;
else
repbody->valid |= OBD_MD_FLEASIZE;
}
+ mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
RETURN(0);
}
}
rc = mdt_mfd_open(info, p, o, flags, created);
+ if (!rc)
+ mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+
RETURN(rc);
}
ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
RCL_SERVER);
- ma->ma_need = MA_INODE;
+ ma->ma_need = MA_INODE | MA_HSM;
if (ma->ma_lmm_size > 0)
ma->ma_need |= MA_LOV;
mdt_export_evict(exp);
RETURN_EXIT;
}
- rc = mdt_object_exists(child);
- if (rc > 0) {
- struct md_object *next;
-
- mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
- next = mdt_object_child(child);
- rc = mo_attr_get(env, next, ma);
- if (rc == 0)
- rc = mdt_finish_open(info, parent, child,
- flags, 1, ldlm_rep);
- } else if (rc < 0) {
- /* the child object was created on remote server */
- repbody->fid1 = *rr->rr_fid2;
- repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
- rc = 0;
- } else if (rc == 0) {
- /* the child does not exist, we should do regular open */
- mdt_object_put(env, parent);
- mdt_object_put(env, child);
- GOTO(regular_open, 0);
- }
+
+ if (unlikely(mdt_object_remote(child))) {
+ /* the child object was created on remote server */
+ if (!mdt_is_dne_client(exp)) {
+ /* Return -EIO for old client */
+ mdt_object_put(env, parent);
+ mdt_object_put(env, child);
+ GOTO(out, rc = -EIO);
+ }
+ repbody->fid1 = *rr->rr_fid2;
+ repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+ rc = 0;
+ } else {
+ if (mdt_object_exists(child)) {
+ mdt_set_capainfo(info, 1, rr->rr_fid2,
+ BYPASS_CAPA);
+ rc = mdt_attr_get_complex(info, child, ma);
+ if (rc == 0)
+ rc = mdt_finish_open(info, parent,
+ child, flags,
+ 1, ldlm_rep);
+ } else {
+ /* the child does not exist, we should do
+ * regular open */
+ mdt_object_put(env, parent);
+ mdt_object_put(env, child);
+ GOTO(regular_open, 0);
+ }
+ }
mdt_object_put(env, parent);
mdt_object_put(env, child);
GOTO(out, rc);
int mdt_open_by_fid(struct mdt_thread_info* info,
struct ldlm_reply *rep)
{
- const struct lu_env *env = info->mti_env;
__u32 flags = info->mti_spec.sp_cr_flags;
struct mdt_reint_record *rr = &info->mti_rr;
struct md_attr *ma = &info->mti_attr;
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
- rc = mdt_object_exists(o);
- if (rc > 0) {
- mdt_set_disposition(info, rep, (DISP_IT_EXECD |
- DISP_LOOKUP_EXECD |
- DISP_LOOKUP_POS));
-
- rc = mo_attr_get(env, mdt_object_child(o), ma);
- if (rc == 0)
- rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
- } else if (rc == 0) {
- rc = -ENOENT;
- } else {
+ if (unlikely(mdt_object_remote(o))) {
/* the child object was created on remote server */
struct mdt_body *repbody;
+
+ mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+ DISP_LOOKUP_EXECD |
+ DISP_LOOKUP_POS));
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
repbody->fid1 = *rr->rr_fid2;
repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
rc = 0;
- }
+ } else {
+ if (mdt_object_exists(o)) {
+ mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+ DISP_LOOKUP_EXECD |
+ DISP_LOOKUP_POS));
+
+ rc = mdt_attr_get_complex(info, o, ma);
+ if (rc == 0)
+ rc = mdt_finish_open(info, NULL, o, flags, 0,
+ rep);
+ } else {
+ rc = -ENOENT;
+ }
+ }
mdt_object_put(info->mti_env, o);
RETURN(rc);
}
-int mdt_open_anon_by_fid(struct mdt_thread_info *info,
- struct ldlm_reply *rep,
- struct mdt_lock_handle *lhc)
+/* lock object for open
+ *
+ * Resets *ibits to 0, then:
+ *  - if MDS_OPEN_LOCK is set in info->mti_spec.sp_cr_flags, requests
+ *    MDS_INODELOCK_LOOKUP and MDS_INODELOCK_OPEN, with mode LCK_CW for
+ *    FMODE_WRITE, LCK_PR for MDS_FMODE_EXEC (unless the HSM attributes in
+ *    info->mti_attr carry HS_RELEASED), LCK_CR otherwise;
+ *  - for regular files, decides between creating a layout (MA_LOV needed
+ *    but not valid, and md_should_create() holds) and trying to return a
+ *    layout lock (exp_connect_layout() holds, no layout creation, MA_LOV
+ *    needed);
+ *  - when trying for the layout lock, adds the LAYOUT and LOOKUP bits and
+ *    uses mdt_object_lock_try(); if that fails both bits are cleared again
+ *    and any remaining bits are taken with mdt_object_lock();
+ *  - when a layout will be created and rc is still 0, additionally takes
+ *    an LCK_EX MDS_INODELOCK_LAYOUT lock into info->mti_lh[MDT_LH_LAYOUT]
+ *    with MDT_LOCAL_LOCK.
+ *
+ * info   thread info; mti_attr and mti_spec.sp_cr_flags are read
+ * obj    object being opened
+ * lhc    lock handle initialised here and used for the ibits lock
+ * ibits  out: inode bits that were finally requested on lhc (may be 0)
+ *
+ * Returns 0, or the rc of the mdt_object_lock() call that failed.
+ *
+ * NOTE(review): when the try-lock fails, LOOKUP is cleared even if it was
+ * requested via MDS_OPEN_LOCK, leaving only the OPEN bit - confirm this is
+ * intended.
+ * NOTE(review): if the EX layout lock fails, the lhc lock taken earlier is
+ * not released here; presumably the caller cleans up through
+ * mdt_object_open_unlock() - confirm.
+ */
+static int mdt_object_open_lock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
+ __u64 *ibits)
+{
+ struct md_attr *ma = &info->mti_attr;
+ __u64 open_flags = info->mti_spec.sp_cr_flags;
+ ldlm_mode_t lm = LCK_CR;
+ bool try_layout = false;
+ bool create_layout = false;
+ int rc = 0;
+ ENTRY;
+
+ *ibits = 0;
+ if (open_flags & MDS_OPEN_LOCK) {
+ if (open_flags & FMODE_WRITE)
+ lm = LCK_CW;
+ /* if file is released, we can't deny write because we must
+ * restore (write) it to access it. */
+ else if ((open_flags & MDS_FMODE_EXEC) &&
+ !((ma->ma_valid & MA_HSM) &&
+ (ma->ma_hsm.mh_flags & HS_RELEASED)))
+ lm = LCK_PR;
+ else
+ lm = LCK_CR;
+
+ *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+ }
+
+ if (S_ISREG(lu_object_attr(&obj->mot_obj.mo_lu))) {
+ if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
+ md_should_create(open_flags))
+ create_layout = true;
+ if (exp_connect_layout(info->mti_exp) && !create_layout &&
+ ma->ma_need & MA_LOV)
+ try_layout = true;
+ }
+
+ mdt_lock_handle_init(lhc);
+ mdt_lock_reg_init(lhc, lm);
+
+ /* one problem to return layout lock on open is that it may result
+ * in too many layout locks cached on the client side. */
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
+ /* return lookup lock to validate inode at the client side,
+ * this is pretty important otherwise mdt will return layout
+ * lock for each open.
+ * However this is a double-edged sword because changing
+ * permission will revoke huge # of LOOKUP locks. */
+ *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
+ if (!mdt_object_lock_try(info, obj, lhc, *ibits,
+ MDT_CROSS_LOCK)) {
+ *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
+ if (*ibits != 0)
+ rc = mdt_object_lock(info, obj, lhc, *ibits,
+ MDT_CROSS_LOCK);
+ }
+ } else if (*ibits != 0) {
+ rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+ }
+
+ CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
+ ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
+ PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+
+ /* will change layout, revoke layout locks by enqueuing EX lock. */
+ if (rc == 0 && create_layout) {
+ struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+ CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
+ ", open_flags = "LPO64"\n",
+ PFID(mdt_object_fid(obj)), open_flags);
+
+ LASSERT(!try_layout);
+ mdt_lock_handle_init(ll);
+ mdt_lock_reg_init(ll, LCK_EX);
+ rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Release the locks taken for an open once the open has been processed.
+ *
+ * The lock held in info->mti_lh[MDT_LH_LAYOUT], if any, is always dropped.
+ * The lock in \a lhc is left alone when \a ibits is 0 or \a rc is -EREMOTE;
+ * otherwise it is kept only if \a rc is 0 and either MDS_OPEN_LOCK was
+ * requested or MDS_INODELOCK_LAYOUT is among \a ibits. In every other case
+ * DISP_OPEN_LOCK is cleared in the DLM reply and \a lhc is unlocked.
+ *
+ * \param info   thread info (open flags, lock handles, reply capsule)
+ * \param obj    object that was locked
+ * \param lhc    handle of the ibits lock
+ * \param ibits  inode bits that were requested on \a lhc
+ * \param rc     result of the open so far
+ */
+static void mdt_object_open_unlock(struct mdt_thread_info *info,
+				   struct mdt_object *obj,
+				   struct mdt_lock_handle *lhc,
+				   __u64 ibits, int rc)
+{
+	struct mdt_lock_handle *layout_lh = &info->mti_lh[MDT_LH_LAYOUT];
+	struct ldlm_reply *reply;
+	bool keep_for_client;
+
+	/* The layout lock parked in MDT_LH_LAYOUT is local and never goes
+	 * back to the client side, so drop it whenever it is in use. */
+	if (lustre_handle_is_used(&layout_lh->mlh_reg_lh)) {
+		LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
+		mdt_object_unlock(info, obj, layout_lh, 1);
+	}
+
+	/* Nothing requested on lhc, or the cross-ref case in which the lock
+	 * should be returned to the client. */
+	if (ibits == 0 || rc == -EREMOTE)
+		return;
+
+	/* For the open request the lock only returns to the client if the
+	 * open succeeded and an open or layout lock is granted. */
+	keep_for_client = rc == 0 &&
+			  ((info->mti_spec.sp_cr_flags & MDS_OPEN_LOCK) ||
+			   (ibits & MDS_INODELOCK_LAYOUT));
+	if (keep_for_client)
+		return;
+
+	reply = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+	mdt_clear_disposition(info, reply, DISP_OPEN_LOCK);
+	mdt_object_unlock(info, obj, lhc, 1);
+}
+
+int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
+ struct mdt_lock_handle *lhc)
{
const struct lu_env *env = info->mti_env;
struct mdt_device *mdt = info->mti_mdt;
struct mdt_object *parent= NULL;
struct mdt_object *o;
int rc;
- ldlm_mode_t lm;
+ __u64 ibits = 0;
ENTRY;
- if (md_should_create(flags)) {
+ if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
if (!lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
parent = mdt_object_find(env, mdt, rr->rr_fid1);
if (IS_ERR(parent)) {
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
- rc = mdt_object_exists(o);
- if (rc == 0) {
- mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
- DISP_LOOKUP_NEG));
- GOTO(out, rc = -ENOENT);
- } else if (rc < 0) {
- CERROR("NFS remote open shouldn't happen.\n");
- GOTO(out, rc);
- }
- mdt_set_disposition(info, rep, (DISP_IT_EXECD |
- DISP_LOOKUP_EXECD |
- DISP_LOOKUP_POS));
-
- if (flags & FMODE_WRITE)
- lm = LCK_CW;
- else if (flags & MDS_FMODE_EXEC)
- lm = LCK_PR;
- else
- lm = LCK_CR;
-
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, lm);
- rc = mdt_object_lock(info, o, lhc,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
- MDT_CROSS_LOCK);
+ if (mdt_object_remote(o)) {
+ CDEBUG(D_INFO, "%s: "DFID" is on remote MDT.\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(rr->rr_fid2));
+ GOTO(out, rc = -EREMOTE);
+ } else if (!mdt_object_exists(o)) {
+ mdt_set_disposition(info, rep,
+ DISP_IT_EXECD |
+ DISP_LOOKUP_EXECD |
+ DISP_LOOKUP_NEG);
+ GOTO(out, rc = -ENOENT);
+ }
+
+ mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
+
+ rc = mdt_attr_get_complex(info, o, ma);
if (rc)
GOTO(out, rc);
- rc = mo_attr_get(env, mdt_object_child(o), ma);
+ rc = mdt_object_open_lock(info, o, lhc, &ibits);
if (rc)
GOTO(out, rc);
}
}
- if (flags & MDS_OPEN_LOCK)
- mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
rc = mdt_finish_open(info, parent, o, flags, 0, rep);
-
- if (!(flags & MDS_OPEN_LOCK) || rc)
- mdt_object_unlock(info, o, lhc, 1);
-
+ if (!rc) {
+ mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
+ if (flags & MDS_OPEN_LOCK)
+ mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+ }
GOTO(out, rc);
+
out:
+ mdt_object_open_unlock(info, o, lhc, ibits, rc);
mdt_object_put(env, o);
if (parent != NULL)
mdt_object_put(env, parent);
}
/* Cross-ref request. Currently it can only be a pure open (w/o create) */
-int mdt_cross_open(struct mdt_thread_info* info,
- const struct lu_fid *fid,
- struct ldlm_reply *rep, __u32 flags)
+static int mdt_cross_open(struct mdt_thread_info *info,
+ const struct lu_fid *parent_fid,
+ const struct lu_fid *fid,
+ struct ldlm_reply *rep, __u32 flags)
{
struct md_attr *ma = &info->mti_attr;
struct mdt_object *o;
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
- rc = mdt_object_exists(o);
- if (rc > 0) {
- /* Do permission check for cross-open. */
- rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
- NULL, flags | MDS_OPEN_CROSS);
- if (rc)
- goto out;
-
- mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
- rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
- if (rc == 0)
- rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
- } else if (rc == 0) {
- /*
- * Something is wrong here. lookup was positive but there is
- * no object!
- */
- CERROR("Cross-ref object doesn't exist!\n");
- rc = -EFAULT;
- } else {
- /* Something is wrong here, the object is on another MDS! */
- CERROR("The object isn't on this server! FLD error?\n");
- LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
- &o->mot_obj.mo_lu,
- "Object isn't on this server! FLD error?\n");
-
+ if (mdt_object_remote(o)) {
+ /* Something is wrong here, the object is on another MDS! */
+ CERROR("%s: "DFID" isn't on this server!: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+ LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
+ &o->mot_obj.mo_lu,
+ "Object isn't on this server! FLD error?\n");
rc = -EFAULT;
+ } else {
+ if (mdt_object_exists(o)) {
+ /* Do permission check for cross-open. */
+ rc = mo_permission(info->mti_env, NULL,
+ mdt_object_child(o),
+ NULL, flags | MDS_OPEN_CROSS);
+ if (rc)
+ goto out;
+
+ mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
+ rc = mdt_attr_get_complex(info, o, ma);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /* Do not create lov object if the fid is opened
+ * under OBF */
+ if (S_ISREG(ma->ma_attr.la_mode) &&
+ !(ma->ma_valid & MA_LOV) && (flags & FMODE_WRITE) &&
+ fid_is_obf(parent_fid))
+ GOTO(out, rc = -EPERM);
+
+ rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
+ } else {
+ /*
+ * Something is wrong here. lookup was positive but
+ * there is no object!
+ */
+ CERROR("%s: "DFID" doesn't exist!: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+ rc = -EFAULT;
+ }
}
-
out:
mdt_object_put(info->mti_env, o);
RETURN(rc);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct md_attr *ma = &info->mti_attr;
__u64 create_flags = info->mti_spec.sp_cr_flags;
+ __u64 ibits;
struct mdt_reint_record *rr = &info->mti_rr;
struct lu_name *lname;
int result, rc;
PFID(rr->rr_fid1), rr->rr_name,
PFID(rr->rr_fid2), create_flags,
ma->ma_attr.la_mode, msg_flags);
-
- if (req_is_replay(req) ||
- (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
- /* This is a replay request or from liblustre with ea. */
- result = mdt_open_by_fid(info, ldlm_rep);
-
- if (result != -ENOENT) {
- if (req->rq_export->exp_libclient &&
- create_flags & MDS_OPEN_HAS_EA)
- GOTO(out, result = 0);
- GOTO(out, result);
- }
- /*
- * We didn't find the correct object, so we need to re-create it
- * via a regular replay.
- */
- if (!(create_flags & MDS_OPEN_CREAT)) {
- DEBUG_REQ(D_ERROR, req,
- "OPEN & CREAT not in open replay.");
- GOTO(out, result = -EFAULT);
- }
- CDEBUG(D_INFO, "Open replay did find object, continue as "
- "regular open\n");
- } else if (rr->rr_namelen == 0 && !info->mti_cross_ref &&
- create_flags & MDS_OPEN_LOCK) {
- result = mdt_open_anon_by_fid(info, ldlm_rep, lhc);
- GOTO(out, result);
- }
+ if (info->mti_cross_ref) {
+ /* This is cross-ref open */
+ mdt_set_disposition(info, ldlm_rep,
+ (DISP_IT_EXECD | DISP_LOOKUP_EXECD |
+ DISP_LOOKUP_POS));
+ result = mdt_cross_open(info, rr->rr_fid2, rr->rr_fid1,
+ ldlm_rep, create_flags);
+ GOTO(out, result);
+ } else if (req_is_replay(req) ||
+ (req->rq_export->exp_libclient && create_flags & MDS_OPEN_HAS_EA)) {
+ /* This is a replay request or from liblustre with ea. */
+ result = mdt_open_by_fid(info, ldlm_rep);
+
+ if (result != -ENOENT) {
+ if (req->rq_export->exp_libclient &&
+ create_flags & MDS_OPEN_HAS_EA)
+ GOTO(out, result = 0);
+ GOTO(out, result);
+ }
+ /* We didn't find the correct object, so we need to re-create it
+ * via a regular replay. */
+ if (!(create_flags & MDS_OPEN_CREAT)) {
+ DEBUG_REQ(D_ERROR, req,
+ "OPEN & CREAT not in open replay/by_fid.");
+ GOTO(out, result = -EFAULT);
+ }
+ CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
+ } else if ((rr->rr_namelen == 0 && create_flags & MDS_OPEN_LOCK) ||
+ (create_flags & MDS_OPEN_BY_FID)) {
+ result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
+ /* If result is 0 then open by FID has found the file
+ * and there is nothing left for us to do here. More
+ * generally if it is anything other than -ENOENT or
+ * -EREMOTE then we return that now. If -ENOENT and
+ * MDS_OPEN_CREAT is set then we must create the file
+ * below. If -EREMOTE then we need to return a LOOKUP
+ * lock to the client, which we do below. Hence this
+ * odd looking condition. See LU-2523. */
+ if (!(result == -ENOENT && (create_flags & MDS_OPEN_CREAT)) &&
+ result != -EREMOTE)
+ GOTO(out, result);
+
+ if (unlikely(rr->rr_namelen == 0))
+ GOTO(out, result = -EINVAL);
+
+ CDEBUG(D_INFO, "No object(2), continue as regular open.\n");
+ }
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
GOTO(out, result = err_serious(-ENOMEM));
mdt_set_disposition(info, ldlm_rep,
(DISP_IT_EXECD | DISP_LOOKUP_EXECD));
- if (info->mti_cross_ref) {
- /* This is cross-ref open */
- mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
- result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
- create_flags);
- GOTO(out, result);
- }
-
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
*child_fid = *info->mti_rr.rr_fid2;
LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
PFID(child_fid));
- } else {
- /*
- * Check for O_EXCL is moved to the mdt_finish_open(), we need to
- * return FID back in that case.
- */
- mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
- }
-
- child = mdt_object_find(info->mti_env, mdt, child_fid);
+ /* In the function below, .hs_keycmp resolves to
+ * lu_obj_hop_keycmp() */
+ /* coverity[overrun-buffer-val] */
+ child = mdt_object_new(info->mti_env, mdt, child_fid);
+ } else {
+ /*
+ * Check for O_EXCL is moved to the mdt_finish_open(), we need to
+ * return FID back in that case.
+ */
+ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+ child = mdt_object_find(info->mti_env, mdt, child_fid);
+ }
if (IS_ERR(child))
GOTO(out_parent, result = PTR_ERR(child));
mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
if (result == -ENOENT) {
- if (mdt_object_obf(parent))
- GOTO(out_child, result = -EPERM);
+ /* Create under OBF and .lustre is not permitted */
+ if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+ GOTO(out_child, result = -EPERM);
/* save versions in reply */
mdt_version_get_save(info, parent, 0);
mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
GOTO(out_child, result);
} else {
+
+ /* XXX: we should call this only once, see a few lines below */
+ if (result == 0)
+ result = mdt_attr_get_complex(info, child, ma);
+
if (result != 0)
GOTO(out_child, result);
}
created = 1;
} else {
- /* We have to get attr & lov ea for this object */
- result = mo_attr_get(info->mti_env, mdt_object_child(child),
- ma);
/*
* The object is on remote node, return its FID for remote open.
*/
- if (result == -EREMOTE) {
+ if (mdt_object_remote(child)) {
/*
* Check if this lock already was sent to client and
* this is resent case. For resent case do not take lock
repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
if (rc != 0)
result = rc;
+ else
+ result = -EREMOTE;
GOTO(out_child, result);
- }
+ } else {
+ if (mdt_object_exists(child)) {
+ /* We have to get attr & LOV EA & HSM for this
+ * object */
+ ma->ma_need |= MA_HSM;
+ result = mdt_attr_get_complex(info, child, ma);
+ } else {
+ /* the object does not exist! */
+ LBUG();
+ }
+ }
}
LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
- /* get openlock if this is not replay and if a client requested it */
- if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
- ldlm_mode_t lm;
-
- if (create_flags & FMODE_WRITE)
- lm = LCK_CW;
- else if (create_flags & MDS_FMODE_EXEC)
- lm = LCK_PR;
- else
- lm = LCK_CR;
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, lm);
- rc = mdt_object_lock(info, child, lhc,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
- MDT_CROSS_LOCK);
- if (rc) {
- result = rc;
- GOTO(out_child, result);
- } else {
- result = -EREMOTE;
- mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
- }
- }
-
- /* Try to open it now. */
- rc = mdt_finish_open(info, parent, child, create_flags,
- created, ldlm_rep);
- if (rc) {
- result = rc;
- if (lustre_handle_is_used(&lhc->mlh_reg_lh))
- /* openlock was acquired and mdt_finish_open failed -
- drop the openlock */
- mdt_object_unlock(info, child, lhc, 1);
- if (created) {
- ma->ma_need = 0;
- ma->ma_valid = 0;
- ma->ma_cookie_size = 0;
- info->mti_no_need_trans = 1;
- rc = mdo_unlink(info->mti_env,
- mdt_object_child(parent),
- mdt_object_child(child),
- lname,
- &info->mti_attr);
- if (rc != 0)
- CERROR("Error in cleanup of open\n");
- }
- }
+ /* get openlock if this is not replay and if a client requested it */
+ if (!req_is_replay(req)) {
+ rc = mdt_object_open_lock(info, child, lhc, &ibits);
+ if (rc != 0)
+ GOTO(out_child, result = rc);
+ else if (create_flags & MDS_OPEN_LOCK)
+ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+ }
+
+ /* Try to open it now. */
+ rc = mdt_finish_open(info, parent, child, create_flags,
+ created, ldlm_rep);
+ if (rc) {
+ result = rc;
+ /* openlock will be released if mdt_finish_open failed */
+ mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+ if (created) {
+ ma->ma_need = 0;
+ ma->ma_valid = 0;
+ ma->ma_cookie_size = 0;
+ rc = mdo_unlink(info->mti_env,
+ mdt_object_child(parent),
+ mdt_object_child(child),
+ lname,
+ &info->mti_attr, 0);
+ if (rc != 0)
+ CERROR("%s: "DFID" cleanup of open: rc = %d\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(child)), rc);
+ mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
+ }
+ }
EXIT;
out_child:
+ mdt_object_open_unlock(info, child, lhc, ibits, result);
mdt_object_put(info->mti_env, child);
out_parent:
mdt_object_unlock_put(info, parent, lh, result || !created);
out:
- if (result && result != -EREMOTE)
- lustre_msg_set_transno(req->rq_repmsg, 0);
- return result;
+ if (result)
+ lustre_msg_set_transno(req->rq_repmsg, 0);
+ return result;
}
#define MFD_CLOSED(mode) (((mode) & ~(MDS_FMODE_EPOCH | MDS_FMODE_SOM | \
rc = mo_attr_set(info->mti_env, next, ma);
}
+ /* If file data is modified, add the dirty flag.
+ *
+ * If MDS_CLOSE_CLEANUP is set, this file is being closed due to an
+ * eviction; the file could have been modified and may now be dirty
+ * with respect to the HSM archive, so check for this.
+ * The logic here is to mark a file dirty if there's a chance it was
+ * dirtied before the client was evicted, so that we don't have to wait
+ * for a release attempt before finding out the file was actually dirty
+ * and fail the release. Aggressively marking it dirty here will cause
+ * the policy engine to attempt to re-archive it; when rearchiving, we
+ * can compare the current version to the LMA data_version and make the
+ * archive request into a noop if it's not actually dirty.
+ */
+ if ((ma->ma_attr_flags & MDS_DATA_MODIFIED) ||
+ ((ma->ma_attr_flags & MDS_CLOSE_CLEANUP) &&
+ (mode & (FMODE_WRITE|MDS_FMODE_TRUNC))))
+ rc = mdt_add_dirty_flag(info, o, ma);
+
ma->ma_need |= MA_INODE;
ma->ma_valid &= ~MA_INODE;
LASSERT(mdt_info_req(info));
med = &mdt_info_req(info)->rq_export->exp_mdt_data;
- cfs_spin_lock(&med->med_open_lock);
- cfs_list_add(&mfd->mfd_list, &med->med_open_head);
- class_handle_hash_back(&mfd->mfd_handle);
- cfs_spin_unlock(&med->med_open_lock);
+ spin_lock(&med->med_open_lock);
+ cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+ class_handle_hash_back(&mfd->mfd_handle);
+ spin_unlock(&med->med_open_lock);
if (ret == MDT_IOEPOCH_OPENED) {
ret = 0;
mdt_client_compatibility(info);
if (rc == 0)
mdt_fix_reply(info);
+ mdt_exit_ucred(info);
RETURN(lustre_msg_get_status(req->rq_repmsg));
}
}
med = &req->rq_export->exp_mdt_data;
- cfs_spin_lock(&med->med_open_lock);
- mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
- if (mdt_mfd_closed(mfd)) {
- cfs_spin_unlock(&med->med_open_lock);
- CDEBUG(D_INODE, "no handle for file close: fid = "DFID
- ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
- info->mti_ioepoch->handle.cookie);
- /** not serious error since bug 3633 */
- rc = -ESTALE;
- } else {
- class_handle_unhash(&mfd->mfd_handle);
- cfs_list_del_init(&mfd->mfd_list);
- cfs_spin_unlock(&med->med_open_lock);
+ spin_lock(&med->med_open_lock);
+ mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
+ if (mdt_mfd_closed(mfd)) {
+ spin_unlock(&med->med_open_lock);
+ CDEBUG(D_INODE, "no handle for file close: fid = "DFID
+ ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
+ info->mti_ioepoch->handle.cookie);
+ /** not serious error since bug 3633 */
+ rc = -ESTALE;
+ } else {
+ class_handle_unhash(&mfd->mfd_handle);
+ cfs_list_del_init(&mfd->mfd_list);
+ spin_unlock(&med->med_open_lock);
/* Do not lose object before last unlink. */
o = mfd->mfd_object;
rc = mdt_fix_reply(info);
}
+ mdt_exit_ucred(info);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
RETURN(err_serious(-ENOMEM));
if (rc)
RETURN(err_serious(rc));
- if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
- RETURN(lustre_msg_get_status(req->rq_repmsg));
+ if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
+ mdt_exit_ucred(info);
+ RETURN(lustre_msg_get_status(req->rq_repmsg));
+ }
med = &info->mti_exp->exp_mdt_data;
- cfs_spin_lock(&med->med_open_lock);
- mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
- if (mfd == NULL) {
- cfs_spin_unlock(&med->med_open_lock);
+ spin_lock(&med->med_open_lock);
+ mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
+ if (mfd == NULL) {
+ spin_unlock(&med->med_open_lock);
CDEBUG(D_INODE, "no handle for done write: fid = "DFID
": cookie = "LPX64" ioepoch = "LPU64"\n",
PFID(info->mti_rr.rr_fid1),
rc = info->mti_ioepoch->flags & MF_SOM_AU ?
-EAGAIN : 0;
mdt_empty_transno(info, rc);
- RETURN(rc);
- }
- RETURN(-ESTALE);
+ } else
+ rc = -ESTALE;
+ GOTO(error_ucred, rc);
}
LASSERT(mfd->mfd_mode == MDS_FMODE_EPOCH ||
mfd->mfd_mode == MDS_FMODE_TRUNC);
class_handle_unhash(&mfd->mfd_handle);
cfs_list_del_init(&mfd->mfd_list);
- cfs_spin_unlock(&med->med_open_lock);
+ spin_unlock(&med->med_open_lock);
/* Set EPOCH CLOSE flag if not set by client. */
info->mti_ioepoch->flags |= MF_EPOCH_CLOSE;
info->mti_attr.ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
OBD_ALLOC_LARGE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
- if (info->mti_attr.ma_lmm == NULL)
- RETURN(-ENOMEM);
+ if (info->mti_attr.ma_lmm == NULL)
+ GOTO(error_ucred, rc = -ENOMEM);
rc = mdt_mfd_close(info, mfd);
OBD_FREE_LARGE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
mdt_empty_transno(info, rc);
- RETURN(rc);
+error_ucred:
+ mdt_exit_ucred(info);
+ RETURN(rc);
}