Whamcloud - gitweb
- extN-wantedi accepts generation as well as ino to create an inode with
[fs/lustre-release.git] / lustre / mds / handler.c
index 4279f54..d42bbc4 100644 (file)
@@ -1143,6 +1143,7 @@ static int mds_filter_recovery_request(struct ptlrpc_request *req,
         case OBD_PING:
         case MDS_REINT:
         case LDLM_ENQUEUE:
+        case OST_CREATE:
                 *process = target_queue_recovery_request(req, obd);
                 RETURN(0);
 
@@ -1169,11 +1170,23 @@ static char *reint_names[] = {
                             OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME|\
                             OBD_MD_FLID) 
 
+static void reconstruct_create(struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+        struct ost_body *body;
+
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+        /* copy rc, transno and disp; steal locks */
+        mds_req_from_mcd(req, mcd);
+        CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
+}
+
 static int mdt_obj_create(struct ptlrpc_request *req)
 {
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct ldlm_res_id res_id = { .name = {0} };
-        struct obd_export *exp = req->rq_export;
-        struct obd_device *obd = exp->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*repbody);
@@ -1182,6 +1195,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         struct lustre_handle lockh;
         struct obd_run_ctxt saved;
         ldlm_policy_data_t policy;
+        struct dentry_params dp;
         int mealen, flags = 0;
         unsigned int tmpname;
         struct obd_ucred uc;
@@ -1190,12 +1204,17 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         void *handle;
         ENTRY;
        
+        DEBUG_REQ(D_HA, req, "create remote object");
+
         parent_inode = mds->mds_objects_dir->d_inode;
 
-        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+                                  lustre_swab_ost_body);
         if (body == NULL)
                 RETURN(-EFAULT);
 
+        MDS_CHECK_RESENT(req, reconstruct_create(req));
+
         uc.ouc_fsuid = body->oa.o_uid;
         uc.ouc_fsgid = body->oa.o_gid;
 
@@ -1207,74 +1226,97 @@ static int mdt_obj_create(struct ptlrpc_request *req)
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
 
-repeat:
+
+        down(&parent_inode->i_sem);
         handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
         LASSERT(!IS_ERR(handle));
 
+repeat:
         tmpname = ll_insecure_random_int();
-        sprintf(fidname, "%u", tmpname);
-        new = simple_mkdir(mds->mds_objects_dir, fidname,
-                        body->oa.o_mode, 1);
+        rc = sprintf(fidname, "%u", tmpname);
+        new = lookup_one_len(fidname, mds->mds_objects_dir, rc);
         if (IS_ERR(new)) {
-                CERROR("%s: can't create new inode %s) for mkdir: %d\n",
+                CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
                        obd->obd_name, fidname, (int) PTR_ERR(new));
-                if (PTR_ERR(new) == -EEXIST) {
-                        fsfilt_commit(obd, parent_inode, handle, 0);
-                        goto repeat;
+                fsfilt_commit(obd, new->d_inode, handle, 0);
+                up(&parent_inode->i_sem);
+                RETURN(PTR_ERR(new));
+        } else if (new->d_inode) {
+                CERROR("%s: name exists. repeat\n", obd->obd_name);
+                goto repeat;
+        }
+
+        new->d_fsdata = (void *) &dp;
+        dp.p_inum = 0;
+        dp.p_ptr = req;
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
+                          (unsigned long) body->oa.o_id,
+                          (unsigned long) body->oa.o_generation);
+                dp.p_inum = body->oa.o_id;
+                dp.p_generation = body->oa.o_generation;
+        }
+        rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
+        if (rc == 0) {
+                obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+                repbody->oa.o_id = new->d_inode->i_ino;
+                repbody->oa.o_generation = new->d_inode->i_generation;
+                repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+                rc = fsfilt_del_dir_entry(obd, new);
+                up(&parent_inode->i_sem);
+
+                if (rc) {
+                        CERROR("can't remove name for object: %d\n", rc);
+                        GOTO(cleanup, rc);
                 }
+                        
+                /* this lock should be taken to serialize MDS modifications
+                 * in failure case */
+                res_id.name[0] = new->d_inode->i_ino;
+                res_id.name[1] = new->d_inode->i_generation;
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                res_id, LDLM_IBITS, &policy,
+                                LCK_EX, &flags, mds_blocking_ast,
+                                ldlm_completion_ast, NULL, NULL,
+                                NULL, 0, NULL, &lockh);
+                if (rc != ELDLM_OK)
+                        GOTO(cleanup, rc);
+
+                CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+                                (unsigned long) new->d_inode->i_ino,
+                                (unsigned long) new->d_inode->i_generation,
+                                (unsigned) new->d_inode->i_mode);
+        } else {
+                up(&parent_inode->i_sem);
+                CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
         }
-        LASSERT(!IS_ERR(new));
-        LASSERT(new->d_inode != NULL);
 
-        if (body->oa.o_valid & OBD_MD_FLID) {
+        if (rc == 0 && body->oa.o_valid & OBD_MD_FLID) {
                 /* this is new object for splitted dir. we have to
                  * prevent recursive splitting on it -bzzz */
                 mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
                 OBD_ALLOC(mea, mealen);
-                LASSERT(mea != NULL);
+                if (mea == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
                 mea->mea_count = 0;
                 down(&new->d_inode->i_sem);
-                handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
-                LASSERT(!IS_ERR(handle));
                 rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
-                LASSERT(rc == 0);
-                fsfilt_commit(obd, new->d_inode, handle, 0);
-                LASSERT(rc == 0);
                 up(&new->d_inode->i_sem);
                 OBD_FREE(mea, mealen);
         }
-        obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
-        repbody->oa.o_id = new->d_inode->i_ino;
-        repbody->oa.o_generation = new->d_inode->i_generation;
-        repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-
-        down(&parent_inode->i_sem);
-        rc = fsfilt_del_dir_entry(obd, new);
-        up(&parent_inode->i_sem);
-        LASSERT(rc == 0);
 
+cleanup:
         rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
-        LASSERT(rc == 0);
-
-        res_id.name[0] = new->d_inode->i_ino;
-        res_id.name[1] = new->d_inode->i_generation;
-        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                        res_id, LDLM_IBITS, &policy,
-                        LCK_EX, &flags, mds_blocking_ast,
-                        ldlm_completion_ast, NULL, NULL,
-                        NULL, 0, NULL, &lockh);
-        LASSERT(rc == ELDLM_OK);
-
-        CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
-                        (unsigned long) new->d_inode->i_ino,
-                        (unsigned long) new->d_inode->i_generation,
-                        (unsigned) new->d_inode->i_mode);
-
+        if (rc == 0)
+                ptlrpc_save_lock(req, &lockh, LCK_EX);
+        else
+                ldlm_lock_decref(&lockh, LCK_EX);
         l_dput(new);
         pop_ctxt(&saved, &obd->obd_ctxt, &uc);
-        ptlrpc_save_lock(req, &lockh, LCK_EX);
-        RETURN(0);
+        RETURN(rc);
 }
 
 static int mds_get_info(struct obd_export *exp, __u32 keylen,