Whamcloud - gitweb
Add flock support.
[fs/lustre-release.git] / lustre / mds / mds_fs.c
index db3a11b..6c8b722 100644 (file)
@@ -41,6 +41,7 @@
 #include <linux/lustre_fsfilt.h>
 #include <portals/list.h>
 
+#include <linux/lustre_smfs.h>
 #include "mds_internal.h"
 
 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
@@ -166,6 +167,12 @@ int mds_client_free(struct obd_export *exp, int clear_client)
                 LBUG();
         }
 
+
+        /* Make sure the server's last_transno is up to date. Do this
+         * after the client is freed so we know all the client's
+         * transactions have been committed. */
+        mds_update_server_data(exp->exp_obd, 1);
+
  free_and_out:
         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
 
@@ -308,9 +315,10 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                 /* These exports are cleaned up by mds_disconnect(), so they
                  * need to be set up like real exports as mds_connect() does.
                  */
-                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64"\n", mcd->mcd_uuid, cl_idx,
-                       last_transno, le64_to_cpu(msd->msd_last_transno));
+                CDEBUG(D_HA|D_WARNING,"RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
+                       last_transno, le64_to_cpu(msd->msd_last_transno),
+                       mcd->mcd_last_xid);
 
                 exp = class_new_export(obd);
                 if (exp == NULL)
@@ -326,6 +334,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                 spin_lock_init(&med->med_open_lock);
 
                 mcd = NULL;
+                exp->exp_replay_needed = 1;
                 obd->obd_recoverable_clients++;
                 obd->obd_max_recoverable_clients++;
                 class_export_put(exp);
@@ -343,7 +352,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                       "last_transno "LPU64"\n", obd->obd_name,
                       obd->obd_recoverable_clients, mds->mds_last_transno);
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
-                obd->obd_recovering = 1;
+                target_start_recovery_thread(obd, mds_handle);
         }
 
         if (mcd)
@@ -370,13 +379,14 @@ static int  mds_fs_post_setup(struct obd_device *obd)
         struct dentry *de = mds_fid2dentry(mds, &mds->mds_rootfid, NULL);
         int    rc = 0;
        
-        rc = fsfilt_post_setup(obd);
+        rc = fsfilt_post_setup(obd, de);
         if (rc)
                 GOTO(out, rc);
  
-        fsfilt_set_kml_flags(obd, de->d_inode);
-        fsfilt_set_kml_flags(obd, mds->mds_pending_dir->d_inode);
-        
+        fsfilt_set_fs_flags(obd, de->d_inode, 
+                              SM_DO_REC | SM_DO_COW);
+        fsfilt_set_fs_flags(obd, mds->mds_pending_dir->d_inode, 
+                              SM_DO_REC | SM_DO_COW);
         fsfilt_set_mds_flags(obd, mds->mds_sb);
 out:
         l_dput(de);
@@ -468,6 +478,14 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
         }
         mds->mds_fids_dir = dentry;
 
+        dentry = simple_mkdir(current->fs->pwd, "UNNAMED", 0777, 1);
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot create UNNAMED directory: rc = %d\n", rc);
+                GOTO(err_unnamed, rc);
+        }
+        mds->mds_unnamed_dir = dentry;
+
         /* open and test the last rcvd file */
         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
         if (IS_ERR(file)) {
@@ -518,6 +536,8 @@ err_client:
 err_last_rcvd:
         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
                 CERROR("can't close %s after error\n", LAST_RCVD);
+err_unnamed:
+        dput(mds->mds_unnamed_dir);
 err_fids:
         dput(mds->mds_fids_dir);
 err_objects:
@@ -564,6 +584,10 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
                 if (rc)
                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
         }
+        if (mds->mds_unnamed_dir != NULL) {
+                l_dput(mds->mds_unnamed_dir);
+                mds->mds_unnamed_dir = NULL;
+        }
         if (mds->mds_fids_dir != NULL) {
                 l_dput(mds->mds_fids_dir);
                 mds->mds_fids_dir = NULL;
@@ -597,9 +621,8 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
 {
         struct mds_obd *mds = &exp->exp_obd->u.mds;
         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
-        unsigned int tmpname = ll_insecure_random_int();
         struct file *filp;
-        struct dentry *new_child;
+        struct dentry *dchild;
         struct lvfs_run_ctxt saved;
         char fidname[LL_FID_NAMELEN];
         void *handle;
@@ -607,17 +630,51 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
         ENTRY;
 
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        
-        sprintf(fidname, "OBJECTS/%u", tmpname);
+        down(&parent_inode->i_sem);
+        if (oa->o_id) {
+                namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
+
+                dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+                if (IS_ERR(dchild))
+                        GOTO(out_pop, rc = PTR_ERR(dchild));
+
+                if (dchild->d_inode == NULL) {
+                        struct dentry_params dp;
+                        struct inode *inode;
+
+                        dchild->d_fsdata = (void *) &dp;
+                        dp.p_ptr = NULL;
+                        dp.p_inum = oa->o_id;
+                        rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL);
+                        if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id)
+                                dchild->d_fsdata = NULL;
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                dput(dchild);
+                                GOTO(out_pop, rc);
+                        }
+                        inode = dchild->d_inode;
+                        LASSERT(inode->i_ino == oa->o_id);
+                        inode->i_generation = oa->o_generation;
+                        CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+                               inode->i_ino, inode->i_generation);
+                        mark_inode_dirty(inode);
+                } else {
+                        CWARN("it should be here!\n");
+                }
+                GOTO(out_pop, rc);
+        }
+
+        sprintf(fidname, "OBJECTS/%u.%u",ll_insecure_random_int(),current->pid);
         filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
         if (IS_ERR(filp)) {
                 rc = PTR_ERR(filp);
                 if (rc == -EEXIST) {
-                        CERROR("impossible object name collision %u\n",
-                               tmpname);
+                        CERROR("impossible object name collision %s\n",
+                               fidname);
                         LBUG();
                 }
-                CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+                CERROR("error creating tmp object %s: rc %d\n", fidname, rc);
                 GOTO(out_pop, rc);
         }
 
@@ -627,14 +684,13 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
         oa->o_generation = filp->f_dentry->d_inode->i_generation;
         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
 
-        down(&parent_inode->i_sem);
-        new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+        dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
 
-        if (IS_ERR(new_child)) {
+        if (IS_ERR(dchild)) {
                 CERROR("getting neg dentry for obj rename: %d\n", rc);
-                GOTO(out_close, rc = PTR_ERR(new_child));
+                GOTO(out_close, rc = PTR_ERR(dchild));
         }
-        if (new_child->d_inode != NULL) {
+        if (dchild->d_inode != NULL) {
                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
                        oa->o_id, oa->o_generation);
                 LBUG();
@@ -647,31 +703,30 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
 
         lock_kernel();
         rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
-                        mds->mds_objects_dir->d_inode, new_child);
+                        mds->mds_objects_dir->d_inode, dchild);
         unlock_kernel();
         if (rc)
                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
                        oa->o_id, oa->o_generation, rc);
 
-        err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
-                            handle, 0);
+        err = fsfilt_commit(exp->exp_obd, mds->mds_sb, 
+                            mds->mds_objects_dir->d_inode, handle, 0);
         if (!err) {
                 oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
-        }
-        else if (!rc)
+        } else if (!rc)
                 rc = err;
 out_dput:
-        dput(new_child);
+        dput(dchild);
 out_close:
-        up(&parent_inode->i_sem);
         err = filp_close(filp, 0);
         if (err) {
-                CERROR("closing tmpfile %u: rc %d\n", tmpname, rc);
+                CERROR("closing tmpfile %s: rc %d\n", fidname, rc);
                 if (!rc)
                         rc = err;
         }
 out_pop:
+        up(&parent_inode->i_sem);
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         RETURN(rc);
 }
@@ -714,7 +769,8 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
                 CERROR("error destroying object "LPU64":%u: rc %d\n",
                        oa->o_id, oa->o_generation, rc);
         
-        err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
+        err = fsfilt_commit(obd, mds->mds_sb, mds->mds_objects_dir->d_inode, 
+                            handle, 0);
         if (err && !rc)
                 rc = err;
 out_dput: