GOTO(out_ids, rc = -ENOMEM);
oa->o_mode = S_IFREG | 0600;
oa->o_id = inode->i_ino;
+ oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
oa->o_generation = inode->i_generation;
oa->o_uid = 0; /* must have 0 uid / gid on OST */
oa->o_gid = 0;
oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE |
- OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
+ OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
oa->o_size = 0;
obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME|
if (rc)
GOTO(out_oa, rc);
}
+ LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
rc = obd_create(mds->mds_osc_exp, oa, &lsm, &oti);
if (rc) {
int level = D_ERROR;
GOTO(out_oa, rc);
}
lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_object_gr = oa->o_gr;
}
if (inode->i_size) {
oa->o_size = inode->i_size;
/* get lock (write for O_CREAT, read otherwise) */
- mds_pack_inode2fid(&body->fid1, child->d_inode);
- mds_pack_inode2body(body, child->d_inode);
+ mds_pack_inode2fid(obd, &body->fid1, child->d_inode);
+ mds_pack_inode2body(obd, body, child->d_inode);
if (S_ISREG(child->d_inode->i_mode)) {
rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
child->d_inode, 1);
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_file_data *mfd = NULL;
obd_id *ids = NULL; /* object IDs created */
+ unsigned mode;
int rc = 0;
ENTRY;
/* atomically create objects if necessary */
down(&dchild->d_inode->i_sem);
- if (S_ISREG(dchild->d_inode->i_mode) &&
- !(body->valid & OBD_MD_FLEASIZE)) {
+ mode = dchild->d_inode->i_mode;
+ if ((S_ISREG(mode) || S_ISDIR(mode)) &&
+ !(body->valid & OBD_MD_FLEASIZE)) {
rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
dchild->d_inode, 0);
if (rc) {
if (dchild->d_inode != NULL) {
up(&pending_dir->i_sem);
mds_inode_set_orphan(dchild->d_inode);
- mds_pack_inode2fid(&body->fid1, dchild->d_inode);
- mds_pack_inode2body(body, dchild->d_inode);
+ mds_pack_inode2fid(req2obd(req), &body->fid1, dchild->d_inode);
+ mds_pack_inode2body(req2obd(req), body, dchild->d_inode);
intent_set_disposition(rep, DISP_LOOKUP_EXECD);
intent_set_disposition(rep, DISP_LOOKUP_POS);
CWARN("Orphan %s found and opened in PENDING directory\n",
if (IS_ERR(dchild))
RETURN(PTR_ERR(dchild));
- mds_pack_inode2fid(&body->fid1, dchild->d_inode);
- mds_pack_inode2body(body, dchild->d_inode);
+ mds_pack_inode2fid(req2obd(req), &body->fid1, dchild->d_inode);
+ mds_pack_inode2body(req2obd(req), body, dchild->d_inode);
intent_set_disposition(rep, DISP_LOOKUP_EXECD);
intent_set_disposition(rep, DISP_LOOKUP_POS);
struct mds_body *body = NULL;
struct dentry *dchild = NULL, *dparent = NULL;
struct mds_export_data *med;
- struct lustre_handle parent_lockh;
+ struct lustre_handle parent_lockh[2];
int rc = 0, cleanup_phase = 0, acc_mode, created = 0;
int parent_mode = LCK_PR;
void *handle = NULL;
struct dentry_params dp;
+ struct mea *mea = NULL;
+ int mea_size;
ENTRY;
+ parent_lockh[0].cookie = 0;
+ parent_lockh[1].cookie = 0;
+
if (offset == 2) { /* intent */
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
acc_mode = accmode(rec->ur_flags);
/* Step 1: Find and lock the parent */
- if (rec->ur_flags & O_CREAT)
+ if (rec->ur_flags & O_CREAT) {
+ /* XXX Well, in fact we only need this lock mode change if
+ in addition to O_CREAT, the file does not exist.
+ But we do not know if it exists or not yet */
parent_mode = LCK_PW;
+ }
+
+ if (rec->ur_namelen == 1) {
+ /* client (LMV) wants to open the file by fid */
+ CDEBUG(D_OTHER, "OPEN by fid %u/%u/%u\n",
+ (unsigned) rec->ur_fid1->mds,
+ (unsigned) rec->ur_fid1->id,
+ (unsigned) rec->ur_fid1->generation);
+ dchild = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+ if (IS_ERR(dchild)) {
+ rc = PTR_ERR(dparent);
+ CERROR("child lookup by a fid error %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+ goto got_child;
+ }
+
dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
- &parent_lockh, rec->ur_name,
- rec->ur_namelen - 1);
+ parent_lockh, rec->ur_name,
+ rec->ur_namelen - 1,
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(dparent)) {
rc = PTR_ERR(dparent);
CERROR("parent lookup error %d\n", rc);
cleanup_phase = 1; /* parent dentry and lock */
+ /* try to retrieve MEA data for this dir */
+ rc = mds_get_lmv_attr(obd, dparent->d_inode, &mea, &mea_size);
+
+ if (mea != NULL) {
+ /* dir is already splitted, check is requested filename
+ * should live at this MDS or at another one */
+ int i;
+ i = mea_name2idx(mea, rec->ur_name, rec->ur_namelen - 1);
+ if (mea->mea_master != i) {
+ CERROR("inapropriate MDS(%d) for %s. should be %d\n",
+ mea->mea_master, rec->ur_name, i);
+ GOTO(cleanup, rc = -ESTALE);
+ }
+ }
+
/* Step 2: Lookup the child */
dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
if (IS_ERR(dchild)) {
GOTO(cleanup, rc);
}
+got_child:
cleanup_phase = 2; /* child dentry */
intent_set_disposition(rep, DISP_LOOKUP_EXECD);
+
+ if (dchild->d_flags & DCACHE_CROSS_REF) {
+ struct ldlm_res_id res_id = { . name = {0} };
+ ldlm_policy_data_t policy;
+ int flags = 0;
+ CDEBUG(D_OTHER, "cross reference: %lu/%lu/%lu\n",
+ (unsigned long) dchild->d_mdsnum,
+ (unsigned long) dchild->d_inum,
+ (unsigned long) dchild->d_generation);
+ body->valid |= OBD_MD_FLID | OBD_MD_MDS;
+ body->fid1.id = dchild->d_inum;
+ body->fid1.mds = dchild->d_mdsnum;
+ body->fid1.generation = dchild->d_generation;
+ intent_set_disposition(rep, DISP_LOOKUP_POS);
+ res_id.name[0] = dchild->d_inum;
+ res_id.name[1] = dchild->d_generation;
+ policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS, &policy,
+ LCK_PR, &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, child_lockh);
+#ifdef S_PDIROPS
+ if (parent_lockh[1].cookie != 0)
+ ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+#endif
+ ldlm_lock_decref(parent_lockh, parent_mode);
+ if (mea)
+ OBD_FREE(mea, mea_size);
+ l_dput(dchild);
+ l_dput(dparent);
+ RETURN(rc);
+ }
+
if (dchild->d_inode)
intent_set_disposition(rep, DISP_LOOKUP_POS);
else
struct iattr iattr;
struct inode *inode;
+ if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) {
+ if (rc > 0) {
+ /* dir got splitted */
+ GOTO(cleanup, rc = -ESTALE);
+ } else {
+ /* error happened during spitting */
+ GOTO(cleanup, rc);
+ }
+ }
+
if (!(rec->ur_flags & MDS_OPEN_CREAT)) {
/* It's negative and we weren't supposed to create it */
GOTO(cleanup, rc = -ENOENT);
LASSERT(!mds_inode_is_orphan(dchild->d_inode));
- mds_pack_inode2fid(&body->fid1, dchild->d_inode);
- mds_pack_inode2body(body, dchild->d_inode);
+ mds_pack_inode2fid(obd, &body->fid1, dchild->d_inode);
+ mds_pack_inode2body(obd, body, dchild->d_inode);
if (S_ISREG(dchild->d_inode->i_mode)) {
/* Check permissions etc */
}
}
+ if (rc == 0) {
+ struct ldlm_res_id res_id = { . name = {0} };
+ ldlm_policy_data_t policy;
+ int flags = 0;
+ res_id.name[0] = dchild->d_inode->i_ino;
+ res_id.name[1] = dchild->d_inode->i_generation;
+ policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS, &policy,
+ LCK_PR, &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, child_lockh);
+ if (rc == 0)
+ cleanup_phase = 3;
+ }
+
/* Step 5: mds_open it */
rc = mds_finish_open(req, dchild, body, rec->ur_flags, &handle, rec,
rep);
rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
req, rc, rep ? rep->lock_policy_res1 : 0);
+
switch (cleanup_phase) {
+ case 3:
+ if (rc)
+ ldlm_lock_decref(child_lockh, LCK_PR);
case 2:
if (rc && created) {
int err = vfs_unlink(dparent->d_inode, dchild);
break;
l_dput(dparent);
+#ifdef S_PDIROPS
+ if (parent_lockh[1].cookie != 0)
+ ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+#endif
if (rc)
- ldlm_lock_decref(&parent_lockh, parent_mode);
+ ldlm_lock_decref(parent_lockh, parent_mode);
else
- ptlrpc_save_lock (req, &parent_lockh, parent_mode);
+ ptlrpc_save_lock (req, parent_lockh, parent_mode);
}
if (rc == 0)
atomic_inc(&mds->mds_open_count);
+ if (mea)
+ OBD_FREE(mea, mea_size);
+ if ((cleanup_phase != 3) && !rc)
+ rc = ENOLCK;
RETURN(rc);
}
body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
LASSERT(body != NULL);
- mds_pack_inode2fid(&body->fid1, inode);
- mds_pack_inode2body(body, inode);
+ mds_pack_inode2fid(obd, &body->fid1, inode);
+ mds_pack_inode2body(obd, body, inode);
mds_pack_md(obd, req->rq_repmsg, 1, body, inode, 1);
}
spin_lock(&med->med_open_lock);