#endif
#define DEBUG_SUBSYSTEM S_MDS
+#include <linux/lustre_acl.h>
+#include <lustre_mds.h>
#include "mdt_internal.h"
/* we do nothing because we do not have refcount now */
{
}
-/* Create a new mdt_file_data struct, initialize it,
- * and insert it to global hash table */
+/* Create a new mdt_file_data struct, initialize it,
+ * and insert it to global hash table */
static struct mdt_file_data *mdt_mfd_new(void)
{
struct mdt_file_data *mfd;
INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
INIT_LIST_HEAD(&mfd->mfd_list);
class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
- } else
- CERROR("mdt: out of memory\n");
-
+ }
RETURN(mfd);
}
+/* Free an mdt_file_data. Asserts that both of its list heads (the handle
+ * link and mfd_list) are empty before the memory is released. */
static void mdt_mfd_free(struct mdt_file_data *mfd)
{
LASSERT(list_empty(&mfd->mfd_handle.h_link));
+ LASSERT(list_empty(&mfd->mfd_list));
OBD_FREE_PTR(mfd);
}
+/* Thin wrapper around mdo_create_data(): hands it the child objects of
+ * \a p and \a o, the EA buffer and length taken from the thread's reint
+ * record, and the thread's md_attr. Returns that call's result unchanged. */
+static int mdt_create_data_obj(struct mdt_thread_info *info,
+                               struct mdt_object *p, struct mdt_object *o)
+{
+        struct mdt_reint_record *rec = &info->mti_rr;
+
+        return mdo_create_data(info->mti_ctxt,
+                               mdt_object_child(p),
+                               mdt_object_child(o),
+                               rec->rr_eadata,
+                               rec->rr_eadatalen,
+                               &info->mti_attr);
+}
+
+
+/*The following four functions are copied from MDS */
+
+/* Write access to a file: executors cause a negative count,
+ * writers a positive count. The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the d_fsdata and the
+ * MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * FIXME and TODO : handle the epoch!
+ * epoch argument is nonzero during recovery */
+static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
+                                __u64 epoch)
+{
+        /* Assume refusal; cleared once the write count has been taken. */
+        int rc = -ETXTBSY;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        /* A negative write count is refused with -ETXTBSY. */
+        if (atomic_read(&o->mot_writecount) >= 0) {
+                if (o->mot_io_epoch == 0) {
+                        /* The object has no epoch yet: adopt the caller's
+                         * epoch when it is ahead of the device-wide one,
+                         * otherwise advance the device-wide epoch by one,
+                         * then stamp the object with the result. */
+                        if (epoch > mdt->mdt_io_epoch)
+                                mdt->mdt_io_epoch = epoch;
+                        else
+                                mdt->mdt_io_epoch++;
+                        o->mot_io_epoch = mdt->mdt_io_epoch;
+                        CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
+                               mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
+                } else {
+                        /* An epoch is already running on this object. */
+                        CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
+                               o->mot_io_epoch, PFID3(mdt_object_fid(o)));
+                }
+                atomic_inc(&o->mot_writecount);
+                rc = 0;
+        }
+
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+/* Drop one write reference on \a o under mdt_epoch_lock. When the write
+ * count reaches zero the object's io epoch is reset to 0.
+ * Returns the write count observed after the decrement. */
+static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int remaining;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        atomic_dec(&o->mot_writecount);
+        remaining = atomic_read(&o->mot_writecount);
+        if (!remaining)
+                o->mot_io_epoch = 0;
+
+        spin_unlock(&mdt->mdt_epoch_lock);
+
+        RETURN(remaining);
+}
+
+/* Record one write denial on \a o by decrementing its write count, under
+ * mdt_epoch_lock. Refused with -ETXTBSY while the count is positive;
+ * returns 0 once the decrement has been done. */
+static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc = -ETXTBSY;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+        if (atomic_read(&o->mot_writecount) <= 0) {
+                atomic_dec(&o->mot_writecount);
+                rc = 0;
+        }
+        spin_unlock(&mdt->mdt_epoch_lock);
+
+        RETURN(rc);
+}
+
+/* Increment the object's write count by one. This is the counterpart of
+ * the decrement done by a successful mdt_deny_write_access(). */
+static void mdt_allow_write_access(struct mdt_object *o)
+{
+ ENTRY;
+ atomic_inc(&o->mot_writecount);
+ EXIT;
+}
+
+/* Report the current value of the object's write count without changing
+ * it. No lock is taken, so the value is only a snapshot. */
+int mdt_query_write_access(struct mdt_object *o)
+{
+        int count;
+        ENTRY;
+
+        count = atomic_read(&o->mot_writecount);
+        RETURN(count);
+}
+
static int mdt_mfd_open(struct mdt_thread_info *info,
- struct mdt_object *o,
- int flags)
+ struct mdt_object *p,
+ struct mdt_object *o,
+ int flags, int created)
{
struct mdt_export_data *med;
struct mdt_file_data *mfd;
+ struct mdt_device *mdt = info->mti_mdt;
struct mdt_body *repbody;
- struct lov_mds_md *lmm = NULL;
- struct lu_attr *attr = &info->mti_attr;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
struct ptlrpc_request *req = mdt_info_req(info);
+ struct ldlm_reply *ldlm_rep;
int rc = 0;
+ int isreg, isdir, islnk;
ENTRY;
- med = &req->rq_export->exp_mdt_data;
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
- if (req_capsule_has_field(&info->mti_pill, &RMF_MDT_MD))
- lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
- rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), attr);
- if (rc == 0) {
- if (!S_ISREG(attr->la_mode) &&
- !S_ISDIR(attr->la_mode) &&
- (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
- /* If client supports this, do not return open handle
- * for special device nodes */
- RETURN(0);
-
- /* FIXME:maybe this can be done earlier? */
- if (S_ISDIR(attr->la_mode)) {
- if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
- /* we are trying to create or
- * write an existing dir. */
- rc = -EISDIR;
- }
- } else if (flags & MDS_OPEN_DIRECTORY)
- rc = -ENOTDIR;
+ if (!created) {
+ /* we have to get attr & lov ea for this object*/
+ rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
+ if (rc)
+ RETURN(rc);
+ }
+ isreg = S_ISREG(la->la_mode);
+ isdir = S_ISDIR(la->la_mode);
+ islnk = S_ISLNK(la->la_mode);
+ if (ma->ma_valid & MA_INODE)
+ mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
+
+ /* If the target is a symlink, do not open it. Likewise, do not
+ * return an open handle for special nodes when the client has
+ * asked for that (OBD_CONNECT_NODEVOH).
+ */
+ if (islnk || (!isreg && !isdir &&
+ (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
+ info->mti_trans_flags |= MDT_NONEED_TANSNO;
+ RETURN(0);
}
- if (rc != 0) {
- if (rc == -EREMOTE) {
- repbody->fid1 = *mdt_object_fid(o);
- repbody->valid |= OBD_MD_FLID;
+ /* FIXME:maybe this can be done earlier? */
+ if (isdir) {
+ if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
+ /* we are trying to create or
+ * write an existing dir. */
+ RETURN(-EISDIR);
}
- RETURN(rc);
+ } else if (flags & MDS_OPEN_DIRECTORY)
+ RETURN(-ENOTDIR);
+
+ if ((isreg) && !(ma->ma_valid & MA_LOV)) {
+ /* No EA: check whether regEA and dirEA will be set.
+ * The attr get above may have left this size at zero,
+ * so reset it in order to retrieve the MD after the object is created. */
+ ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+ &RMF_MDT_MD,
+ RCL_SERVER);
+ LASSERT(p != NULL);
+ /*XXX: Tom, do we need this?
+ rc = mdt_create_data_obj(info, p, o);
+ if (rc)
+ RETURN(rc);
+ */
}
- mdt_pack_attr2body(repbody, attr, mdt_object_fid(o));
+ CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
+ ma->ma_valid, ma->ma_lmm_size);
+ repbody->eadatasize = 0;
+ repbody->aclsize = 0;
-/*
- if (lmm) {
- rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
- lmm, info->mti_mdt->mdt_max_mdsize,
- XATTR_NAME_LOV);
- if (rc < 0)
- RETURN(-EINVAL);
- if (S_ISDIR(attr->la_mode))
+ if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
+ repbody->eadatasize = ma->ma_lmm_size;
+ if (isdir)
repbody->valid |= OBD_MD_FLDIREA;
else
repbody->valid |= OBD_MD_FLEASIZE;
- repbody->eadatasize = rc;
- rc = 0;
}
-*/
+ /* FIXME: the offset should be determined dynamically;
+ * the ACL has not been fetched before this shrink. */
+ lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
+ lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
+ 0);
+
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+
if (flags & FMODE_WRITE) {
- /*mds_get_write_access*/
+ /* FIXME: in recovery, need to pass old epoch here */
+ rc = mdt_get_write_access(mdt, o, 0);
+ if (rc == 0)
+ repbody->io_epoch = o->mot_io_epoch;
} else if (flags & MDS_FMODE_EXEC) {
- /*mds_deny_write_access*/
+ rc = mdt_deny_write_access(mdt, o);
+ }
+ if (rc)
+ RETURN(rc);
+
+ /* (1) the client wants a transno on open to keep a ref count for replay;
+ * see after_reply() and mdc_close_commit();
+ * (2) we need to record the transaction-related stuff on disk;
+ * But the question is: when doing a read-only open, do we still need
+ * a transno?
+ */
+ if (!created) {
+ struct txn_param txn;
+ struct thandle *th;
+ struct dt_device *dt = info->mti_mdt->mdt_bottom;
+ txn.tp_credits = 1;
+
+ LASSERT(dt);
+ th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
+ if (!IS_ERR(th))
+ dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
+ else
+ RETURN(PTR_ERR(th));
}
mfd = mdt_mfd_new();
mfd->mfd_object = o;
mfd->mfd_xid = mdt_info_req(info)->rq_xid;
+ med = &req->rq_export->exp_mdt_data;
spin_lock(&med->med_open_lock);
list_add(&mfd->mfd_list, &med->med_open_head);
spin_unlock(&med->med_open_lock);
repbody->handle.cookie = mfd->mfd_handle.h_cookie;
- } else
+ } else
rc = -ENOMEM;
RETURN(rc);
__u32 flags)
{
struct mdt_object *o;
- int rc;
+ struct lu_attr *la = &info->mti_attr.ma_attr;
+ int rc;
ENTRY;
o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
if (!IS_ERR(o)) {
- if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
- rc = mdt_mfd_open(info, o, flags);
+ if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
+ if (la->la_flags & MDS_OPEN_EXCL &&
+ la->la_flags & MDS_OPEN_CREAT)
+ rc = -EEXIST;
+ else
+ rc = mdt_mfd_open(info, NULL, o, flags, 0);
} else {
rc = -ENOENT;
+ if (la->la_flags & MDS_OPEN_CREAT) {
+ rc = mo_object_create(info->mti_ctxt,
+ mdt_object_child(o),
+ &info->mti_spec,
+ &info->mti_attr);
+ if (rc == 0)
+ rc = mdt_mfd_open(info, NULL, o, flags, 1);
+ }
}
mdt_object_put(info->mti_ctxt, o);
} else
struct mdt_body *body;
int rc;
ENTRY;
-
+
rc = req_capsule_pack(&info->mti_pill);
if (rc == 0) {
body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
RETURN(rc);
}
-/* Get an internal lock on the inode number (but not generation) to sync
- * new inode creation with inode unlink (bug 2029). If child_lockh is NULL
- * we just get the lock as a barrier to wait for other holders of this lock,
- * and drop it right away again. */
-int mdt_lock_new_child(struct mdt_thread_info *info,
- struct mdt_object *o,
- struct mdt_lock_handle *child_lockh)
-{
- struct mdt_lock_handle lockh;
- int rc;
- ENTRY;
-
- if (child_lockh == NULL)
- child_lockh = &lockh;
-
- mdt_lock_handle_init(&lockh);
- lockh.mlh_mode = LCK_EX;
- rc = mdt_object_lock(info, o, &lockh, MDS_INODELOCK_UPDATE);
-
- if (rc != ELDLM_OK)
- CERROR("can not mdt_object_lock: %d\n", rc);
- else if (child_lockh == &lockh)
- mdt_object_unlock(info, o, &lockh);
-
- RETURN(rc);
-}
-
int mdt_reint_open(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
struct mdt_object *child;
struct mdt_lock_handle *lh;
struct ldlm_reply *ldlm_rep;
- struct ptlrpc_request *req = mdt_info_req(info);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
int result;
int created = 0;
struct mdt_reint_record *rr = &info->mti_rr;
ENTRY;
- if (strlen(rr->rr_name) == 0) {
+ req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
+ mdt->mdt_max_mdsize);
+
+ result = req_capsule_pack(&info->mti_pill);
+ if (result)
+ RETURN(result);
+
+ ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+ ma->ma_lmm_size = mdt->mdt_max_mdsize;
+
+ if (rr->rr_name[0] == 0) {
/* reint partial remote open */
- RETURN(mdt_open_by_fid(info, rr->rr_fid1,
- info->mti_attr.la_flags));
+ RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
}
/* we now have no resent message, so it must be an intent */
/*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
- ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
+ PFID3(rr->rr_fid1), PFID3(rr->rr_fid2),
+ rr->rr_name, la->la_flags);
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_mode = LCK_PW;
- parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
+ if (!(la->la_flags & MDS_OPEN_CREAT))
+ lh->mlh_mode = LCK_CR;
+ else
+ lh->mlh_mode = LCK_EX;
+ parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
MDS_INODELOCK_UPDATE);
- if (IS_ERR(parent)) {
- /* just simulate child not existing */
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
+ if (IS_ERR(parent))
GOTO(out, result = PTR_ERR(parent));
- }
result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
rr->rr_name, child_fid);
- if (result != 0 && result != -ENOENT) {
+ if (result != 0 && result != -ENOENT)
GOTO(out_parent, result);
- }
if (result == -ENOENT) {
intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
- if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
+ if (!(la->la_flags & MDS_OPEN_CREAT))
GOTO(out_parent, result);
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- GOTO(out_parent, result = -EROFS);
*child_fid = *info->mti_rr.rr_fid2;
+ /* new object will be created. see the following */
} else {
intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
- if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
- info->mti_attr.la_flags & MDS_OPEN_CREAT)
+ if ((la->la_flags & MDS_OPEN_EXCL &&
+ la->la_flags & MDS_OPEN_CREAT))
GOTO(out_parent, result = -EEXIST);
- /* child_fid is filled by mdo_lookup(). */
- LASSERT(lu_fid_eq(child_fid, info->mti_rr.rr_fid2));
}
child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
mdt_object_child(parent),
rr->rr_name,
mdt_object_child(child),
- rr->rr_tgt,
+ &info->mti_spec,
+ /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
&info->mti_attr);
intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
if (result != 0)
}
/* Open it now. */
- result = mdt_mfd_open(info, child, info->mti_attr.la_flags);
- intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+ result = mdt_mfd_open(info, parent, child, la->la_flags, created);
GOTO(finish_open, result);
finish_open:
- if (result != 0 && result != -EREMOTE && created) {
- mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
- mdt_object_child(child), rr->rr_name);
- }
+ if (result != 0 && created) {
+ int rc2 = mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
+ mdt_object_child(child), rr->rr_name,
+ &info->mti_attr);
+ if (rc2 != 0)
+ CERROR("error in cleanup of open");
+ }
out_child:
mdt_object_put(info->mti_ctxt, child);
out_parent:
- mdt_object_unlock_put(info, parent, lh);
+ mdt_object_unlock_put(info, parent, lh, result);
out:
return result;
}
-int mdt_mfd_close(const struct lu_context *ctxt,
- struct mdt_file_data *mfd)
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
+ struct mdt_file_data *mfd)
{
+ struct mdt_object *o = mfd->mfd_object;
ENTRY;
if (mfd->mfd_mode & FMODE_WRITE) {
- /*mdt_put_write_access*/
+ mdt_put_write_access(mdt, o);
} else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
- /*mdt_allow_write_access*/
+ mdt_allow_write_access(o);
}
/* release reference on this object.
mdt_object_put(ctxt, mfd->mfd_object);
mdt_mfd_free(mfd);
- RETURN(0);
+ EXIT;
}
int mdt_close(struct mdt_thread_info *info)
{
+ struct md_attr *ma = &info->mti_attr;
struct mdt_export_data *med;
struct mdt_file_data *mfd;
+ struct mdt_object *o;
int rc;
ENTRY;
if (mfd == NULL) {
spin_unlock(&med->med_open_lock);
CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
- ": cookie = "LPX64, PFID3(&info->mti_body->fid1),
+ ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
info->mti_body->handle.cookie);
rc = -ESTALE;
} else {
class_handle_unhash(&mfd->mfd_handle);
list_del_init(&mfd->mfd_list);
spin_unlock(&med->med_open_lock);
-
- rc = mdt_handle_last_unlink(info, mfd->mfd_object,
- &RQF_MDS_CLOSE_LAST);
- rc = mdt_mfd_close(info->mti_ctxt, mfd);
+ o = mfd->mfd_object;
+ ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
+ &RMF_MDT_MD);
+ ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+ &RMF_MDT_MD, RCL_SERVER);
+ rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
+ if (rc == 0)
+ rc = mdt_handle_last_unlink(info, o, ma);
+
+ mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
}
+ mdt_shrink_reply(info);
RETURN(rc);
}
+/* Handle a DONE_WRITING request. Selects the RQF_MDS_DONE_WRITING format on
+ * the request capsule and packs the reply.
+ * Returns 0 on success, or the error from req_capsule_pack(); the result
+ * used to be stored in rc and then dropped by an unconditional RETURN(0). */
int mdt_done_writing(struct mdt_thread_info *info)
{
+        int rc;
ENTRY;
+        req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
+        rc = req_capsule_pack(&info->mti_pill);
+
-RETURN(0);
+        /* Propagate a packing failure, as mdt_reint_open() does. */
+        RETURN(rc);
}