mrr->rr_eadatalen, ma);
}
+
+/*The following four functions are copied from MDS */
+
+/* Write access to a file: executors cause a negative count,
+ * writers a positive count. The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the d_fsdata and the
+ * MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * FIXME and TODO : handle the epoch!
+ * epoch argument is nonzero during recovery */
+static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
+ __u64 epoch)
+{
+ int rc = 0;
+ ENTRY;
+
+ spin_lock(&mdt->mdt_epoch_lock);
+
+ if (atomic_read(&o->mot_writecount) < 0) {
+ rc = -ETXTBSY;
+ } else {
+ if (o->mot_io_epoch != 0) {
+ CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
+ o->mot_io_epoch, PFID3(mdt_object_fid(o)));
+ } else {
+ if (epoch > mdt->mdt_io_epoch)
+ mdt->mdt_io_epoch = epoch;
+ else
+ mdt->mdt_io_epoch++;
+ o->mot_io_epoch = mdt->mdt_io_epoch;
+ CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
+ mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
+ }
+ atomic_inc(&o->mot_writecount);
+ }
+ spin_unlock(&mdt->mdt_epoch_lock);
+ RETURN(rc);
+}
+
+static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+ int rc;
+ ENTRY;
+
+ spin_lock(&mdt->mdt_epoch_lock);
+ atomic_dec(&o->mot_writecount);
+ rc = atomic_read(&o->mot_writecount);
+ if (rc == 0)
+ o->mot_io_epoch = 0;
+ spin_unlock(&mdt->mdt_epoch_lock);
+ RETURN(rc);
+}
+
+static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+ int rc = 0;
+ ENTRY;
+ spin_lock(&mdt->mdt_epoch_lock);
+ if (atomic_read(&o->mot_writecount) > 0) {
+ rc = -ETXTBSY;
+ } else
+ atomic_dec(&o->mot_writecount);
+ spin_unlock(&mdt->mdt_epoch_lock);
+ RETURN(rc);
+}
+
+static void mdt_allow_write_access(struct mdt_object *o)
+{
+ ENTRY;
+ atomic_inc(&o->mot_writecount);
+ EXIT;
+}
+
+int mdt_query_write_access(struct mdt_object *o)
+{
+ ENTRY;
+ RETURN(atomic_read(&o->mot_writecount));
+}
+
static int mdt_mfd_open(struct mdt_thread_info *info,
struct mdt_object *p,
struct mdt_object *o,
{
struct mdt_export_data *med;
struct mdt_file_data *mfd;
+ struct mdt_device *mdt = info->mti_mdt;
struct mdt_body *repbody;
struct md_attr *ma = &info->mti_attr;
struct lu_attr *la = &ma->ma_attr;
int isreg, isdir, islnk;
ENTRY;
- med = &req->rq_export->exp_mdt_data;
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
if (!created) {
if (ma->ma_valid & MA_INODE)
mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
- /* if we are following a symlink, don't open */
+ /* if we are following a symlink, don't open
+ * do not return open handle for special nodes as client required
+ */
if (islnk || (!isreg && !isdir &&
(req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
info->mti_trans_flags |= MDT_NONEED_TANSNO;
} else if (flags & MDS_OPEN_DIRECTORY)
RETURN(-ENOTDIR);
-
- if (!isreg && !isdir &&
- (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
- /* If client supports this, do not return open handle
- * for special nodes */
- RETURN(0);
-
if ((isreg) && !(ma->ma_valid & MA_LOV)) {
/*No EA, check whether it is will set regEA and dirEA
*since in above attr get, these size might be zero,
&RMF_MDT_MD,
RCL_SERVER);
LASSERT(p != NULL);
+ /*XXX: Tom, do we need this?
rc = mdt_create_data_obj(info, p, o);
if (rc)
RETURN(rc);
+ */
}
- CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64"\n", ma->ma_valid);
- CDEBUG(D_INODE, "after open, lmm_size = %d\n", ma->ma_lmm_size);
+ CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
+ ma->ma_valid, ma->ma_lmm_size);
repbody->eadatasize = 0;
repbody->aclsize = 0;
else
repbody->valid |= OBD_MD_FLEASIZE;
}
-
- ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
-
- intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
/*FIXME: should determine the offset dynamicly,
*did not get ACL before shrink*/
lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
0);
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+
if (flags & FMODE_WRITE) {
- /*mds_get_write_access*/
+ /* FIXME: in recovery, need to pass old epoch here */
+ rc = mdt_get_write_access(mdt, o, 0);
+ if (rc == 0)
+ repbody->io_epoch = o->mot_io_epoch;
} else if (flags & MDS_FMODE_EXEC) {
- /*mds_deny_write_access*/
+ rc = mdt_deny_write_access(mdt, o);
}
+ if (rc)
+ RETURN(rc);
/* (1) client wants transno when open to keep a ref count for replay;
* see after_reply() and mdc_close_commit();
mfd->mfd_object = o;
mfd->mfd_xid = mdt_info_req(info)->rq_xid;
+ med = &req->rq_export->exp_mdt_data;
spin_lock(&med->med_open_lock);
list_add(&mfd->mfd_list, &med->med_open_head);
spin_unlock(&med->med_open_lock);
if (la->la_flags & MDS_OPEN_CREAT) {
rc = mo_object_create(info->mti_ctxt,
mdt_object_child(o),
+ &info->mti_spec,
&info->mti_attr);
if (rc == 0)
rc = mdt_mfd_open(info, NULL, o, flags, 1);
RETURN(result);
ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
- ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
- &RMF_MDT_MD, RCL_SERVER);
+ ma->ma_lmm_size = mdt->mdt_max_mdsize;
if (rr->rr_name[0] == 0) {
/* reint partial remote open */
/*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
- ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
+ PFID3(rr->rr_fid1), PFID3(rr->rr_fid2),
+ rr->rr_name, la->la_flags);
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
lh = &info->mti_lh[MDT_LH_PARENT];
if (!(la->la_flags & MDS_OPEN_CREAT))
lh->mlh_mode = LCK_CR;
mdt_object_child(parent),
rr->rr_name,
mdt_object_child(child),
- rr->rr_tgt, rr->rr_eadata,
- rr->rr_eadatalen, &info->mti_attr);
+ &info->mti_spec,
+ /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
+ &info->mti_attr);
intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
if (result != 0)
GOTO(out_child, result);
return result;
}
-void mdt_mfd_close(const struct lu_context *ctxt,
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
struct mdt_file_data *mfd)
{
+ struct mdt_object *o = mfd->mfd_object;
ENTRY;
if (mfd->mfd_mode & FMODE_WRITE) {
- /*mdt_put_write_access*/
+ mdt_put_write_access(mdt, o);
} else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
- /*mdt_allow_write_access*/
+ mdt_allow_write_access(o);
}
/* release reference on this object.
if (rc == 0)
rc = mdt_handle_last_unlink(info, o, ma);
- mdt_mfd_close(info->mti_ctxt, mfd);
+ mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
}
mdt_shrink_reply(info);
RETURN(rc);