case OBD_PING:
case MDS_REINT:
case LDLM_ENQUEUE:
+ case OST_CREATE:
*process = target_queue_recovery_request(req, obd);
RETURN(0);
OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME|\
OBD_MD_FLID)
+static void reconstruct_create(struct ptlrpc_request *req)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct mds_client_data *mcd = med->med_mcd;
+ struct ost_body *body;
+
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+ /* copy rc, transno and disp; steal locks */
+ mds_req_from_mcd(req, mcd);
+ CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
+}
+
static int mdt_obj_create(struct ptlrpc_request *req)
{
+ struct obd_device *obd = req->rq_export->exp_obd;
struct ldlm_res_id res_id = { .name = {0} };
- struct obd_export *exp = req->rq_export;
- struct obd_device *obd = exp->exp_obd;
struct mds_obd *mds = &obd->u.mds;
struct ost_body *body, *repbody;
int rc, size = sizeof(*repbody);
struct lustre_handle lockh;
struct obd_run_ctxt saved;
ldlm_policy_data_t policy;
+ struct dentry_params dp;
int mealen, flags = 0;
unsigned int tmpname;
struct obd_ucred uc;
void *handle;
ENTRY;
+ DEBUG_REQ(D_HA, req, "create remote object");
+
parent_inode = mds->mds_objects_dir->d_inode;
- body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
+ body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+ lustre_swab_ost_body);
if (body == NULL)
RETURN(-EFAULT);
+ MDS_CHECK_RESENT(req, reconstruct_create(req));
+
uc.ouc_fsuid = body->oa.o_uid;
uc.ouc_fsgid = body->oa.o_gid;
repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
-repeat:
+
+ down(&parent_inode->i_sem);
handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
LASSERT(!IS_ERR(handle));
+repeat:
tmpname = ll_insecure_random_int();
- sprintf(fidname, "%u", tmpname);
- new = simple_mkdir(mds->mds_objects_dir, fidname,
- body->oa.o_mode, 1);
+ rc = sprintf(fidname, "%u", tmpname);
+ new = lookup_one_len(fidname, mds->mds_objects_dir, rc);
if (IS_ERR(new)) {
- CERROR("%s: can't create new inode %s) for mkdir: %d\n",
+ CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
obd->obd_name, fidname, (int) PTR_ERR(new));
- if (PTR_ERR(new) == -EEXIST) {
- fsfilt_commit(obd, parent_inode, handle, 0);
- goto repeat;
+ fsfilt_commit(obd, new->d_inode, handle, 0);
+ up(&parent_inode->i_sem);
+ RETURN(PTR_ERR(new));
+ } else if (new->d_inode) {
+ CERROR("%s: name exists. repeat\n", obd->obd_name);
+ goto repeat;
+ }
+
+ new->d_fsdata = (void *) &dp;
+ dp.p_inum = 0;
+ dp.p_ptr = req;
+
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
+ (unsigned long) body->oa.o_id,
+ (unsigned long) body->oa.o_generation);
+ dp.p_inum = body->oa.o_id;
+ dp.p_generation = body->oa.o_generation;
+ }
+ rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
+ if (rc == 0) {
+ obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+ repbody->oa.o_id = new->d_inode->i_ino;
+ repbody->oa.o_generation = new->d_inode->i_generation;
+ repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+ rc = fsfilt_del_dir_entry(obd, new);
+ up(&parent_inode->i_sem);
+
+ if (rc) {
+ CERROR("can't remove name for object: %d\n", rc);
+ GOTO(cleanup, rc);
}
+
+ /* this lock should be taken to serialize MDS modifications
+ * in failure case */
+ res_id.name[0] = new->d_inode->i_ino;
+ res_id.name[1] = new->d_inode->i_generation;
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS, &policy,
+ LCK_EX, &flags, mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, &lockh);
+ if (rc != ELDLM_OK)
+ GOTO(cleanup, rc);
+
+ CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+ (unsigned long) new->d_inode->i_ino,
+ (unsigned long) new->d_inode->i_generation,
+ (unsigned) new->d_inode->i_mode);
+ } else {
+ up(&parent_inode->i_sem);
+ CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
}
- LASSERT(!IS_ERR(new));
- LASSERT(new->d_inode != NULL);
- if (body->oa.o_valid & OBD_MD_FLID) {
+ if (rc == 0 && body->oa.o_valid & OBD_MD_FLID) {
/* this is new object for splitted dir. we have to
* prevent recursive splitting on it -bzzz */
mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
OBD_ALLOC(mea, mealen);
- LASSERT(mea != NULL);
+ if (mea == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
mea->mea_count = 0;
down(&new->d_inode->i_sem);
- handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
- LASSERT(!IS_ERR(handle));
rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
- LASSERT(rc == 0);
- fsfilt_commit(obd, new->d_inode, handle, 0);
- LASSERT(rc == 0);
up(&new->d_inode->i_sem);
OBD_FREE(mea, mealen);
}
- obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
- repbody->oa.o_id = new->d_inode->i_ino;
- repbody->oa.o_generation = new->d_inode->i_generation;
- repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-
- down(&parent_inode->i_sem);
- rc = fsfilt_del_dir_entry(obd, new);
- up(&parent_inode->i_sem);
- LASSERT(rc == 0);
+cleanup:
rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
- LASSERT(rc == 0);
-
- res_id.name[0] = new->d_inode->i_ino;
- res_id.name[1] = new->d_inode->i_generation;
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- res_id, LDLM_IBITS, &policy,
- LCK_EX, &flags, mds_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, &lockh);
- LASSERT(rc == ELDLM_OK);
-
- CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
- (unsigned long) new->d_inode->i_ino,
- (unsigned long) new->d_inode->i_generation,
- (unsigned) new->d_inode->i_mode);
-
+ if (rc == 0)
+ ptlrpc_save_lock(req, &lockh, LCK_EX);
+ else
+ ldlm_lock_decref(&lockh, LCK_EX);
l_dput(new);
pop_ctxt(&saved, &obd->obd_ctxt, &uc);
- ptlrpc_save_lock(req, &lockh, LCK_EX);
- RETURN(0);
+ RETURN(rc);
}
static int mds_get_info(struct obd_export *exp, __u32 keylen,