Whamcloud - gitweb
Print client recovery info to the console for debugging.
[fs/lustre-release.git] / lustre / mds / mds_fs.c
index 0c74ec0..79a4cb5 100644 (file)
@@ -41,6 +41,7 @@
 #include <linux/lustre_fsfilt.h>
 #include <portals/list.h>
 
+#include <linux/lustre_smfs.h>
 #include "mds_internal.h"
 
 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
@@ -99,15 +100,15 @@ int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
                 (cl_idx * le16_to_cpu(mds->mds_server_data->msd_client_size));
 
         if (new_client) {
-                struct obd_run_ctxt saved;
+                struct lvfs_run_ctxt saved;
                 loff_t off = med->med_off;
                 struct file *file = mds->mds_rcvd_filp;
                 int rc;
 
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_write_record(obd, file, med->med_mcd,
                                          sizeof(*med->med_mcd), &off, 1);
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 if (rc)
                         return rc;
@@ -124,7 +125,7 @@ int mds_client_free(struct obd_export *exp, int clear_client)
         struct mds_obd *mds = &exp->exp_obd->u.mds;
         struct obd_device *obd = exp->exp_obd;
         struct mds_client_data zero_mcd;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         int rc;
         unsigned long *bitmap = mds->mds_client_bitmap;
 
@@ -150,10 +151,10 @@ int mds_client_free(struct obd_export *exp, int clear_client)
 
         if (clear_client) {
                 memset(&zero_mcd, 0, sizeof zero_mcd);
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
                                          sizeof(zero_mcd), &med->med_off, 1);
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
                        "zeroing out client %s idx %u in %s rc %d\n",
@@ -166,6 +167,12 @@ int mds_client_free(struct obd_export *exp, int clear_client)
                 LBUG();
         }
 
+
+        /* Make sure the server's last_transno is up to date. Do this
+         * after the client is freed so we know all the client's
+         * transactions have been committed. */
+        mds_update_server_data(exp->exp_obd, 1);
+
  free_and_out:
         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
 
@@ -308,9 +315,10 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                 /* These exports are cleaned up by mds_disconnect(), so they
                  * need to be set up like real exports as mds_connect() does.
                  */
-                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64"\n", mcd->mcd_uuid, cl_idx,
-                       last_transno, le64_to_cpu(msd->msd_last_transno));
+                CDEBUG(D_HA|D_WARNING,"RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
+                       last_transno, le64_to_cpu(msd->msd_last_transno),
+                       mcd->mcd_last_xid);
 
                 exp = class_new_export(obd);
                 if (exp == NULL)
@@ -326,6 +334,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                 spin_lock_init(&med->med_open_lock);
 
                 mcd = NULL;
+                exp->exp_replay_needed = 1;
                 obd->obd_recoverable_clients++;
                 obd->obd_max_recoverable_clients++;
                 class_export_put(exp);
@@ -343,7 +352,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                       "last_transno "LPU64"\n", obd->obd_name,
                       obd->obd_recoverable_clients, mds->mds_last_transno);
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
-                obd->obd_recovering = 1;
+                target_start_recovery_thread(obd, mds_handle);
         }
 
         if (mcd)
@@ -364,10 +373,30 @@ err_msd:
         RETURN(rc);
 }
 
+static int  mds_fs_post_setup(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct dentry *de = mds_fid2dentry(mds, &mds->mds_rootfid, NULL);
+        int    rc = 0;
+       
+        rc = fsfilt_post_setup(obd, de);
+        if (rc)
+                GOTO(out, rc);
+        fsfilt_set_fs_flags(obd, de->d_inode, 
+                              SM_DO_REC | SM_DO_COW);
+        fsfilt_set_fs_flags(obd, mds->mds_pending_dir->d_inode, 
+                              SM_DO_REC | SM_DO_COW);
+        fsfilt_set_mds_flags(obd, mds->mds_sb);
+out:
+        l_dput(de);
+        return rc; 
+}
+
 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct dentry *dentry;
         struct file *file;
         int rc;
@@ -382,14 +411,14 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
 
         fsfilt_setup(obd, mds->mds_sb);
 
-        OBD_SET_CTXT_MAGIC(&obd->obd_ctxt);
-        obd->obd_ctxt.pwdmnt = mnt;
-        obd->obd_ctxt.pwd = mnt->mnt_root;
-        obd->obd_ctxt.fs = get_ds();
-        obd->obd_ctxt.cb_ops = mds_lvfs_ops;
+        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
+        obd->obd_lvfs_ctxt.pwdmnt = mnt;
+        obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
+        obd->obd_lvfs_ctxt.fs = get_ds();
+        obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
 
         /* setup the directory tree */
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
@@ -424,7 +453,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
                 GOTO(err_fid, rc);
         }
         mds->mds_pending_dir = dentry;
-
+      
         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
@@ -441,6 +470,22 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
         }
         mds->mds_objects_dir = dentry;
 
+        dentry = simple_mkdir(current->fs->pwd, "FIDS", 0777, 1);
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot create FIDS directory: rc = %d\n", rc);
+                GOTO(err_fids, rc);
+        }
+        mds->mds_fids_dir = dentry;
+
+        dentry = simple_mkdir(current->fs->pwd, "UNNAMED", 0777, 1);
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot create UNNAMED directory: rc = %d\n", rc);
+                GOTO(err_unnamed, rc);
+        }
+        mds->mds_unnamed_dir = dentry;
+
         /* open and test the last rcvd file */
         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
         if (IS_ERR(file)) {
@@ -475,8 +520,12 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
                 GOTO(err_lov_objid, rc = -ENOENT);
         }
 err_pop:
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
-
+        if (!rc) {
+                rc = mds_fs_post_setup(obd);
+                if (rc)
+                        CERROR("can not post setup fsfilt\n");        
+        }
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         return rc;
 
 err_lov_objid:
@@ -487,6 +536,10 @@ err_client:
 err_last_rcvd:
         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
                 CERROR("can't close %s after error\n", LAST_RCVD);
+err_unnamed:
+        dput(mds->mds_unnamed_dir);
+err_fids:
+        dput(mds->mds_fids_dir);
 err_objects:
         dput(mds->mds_objects_dir);
 err_logs:
@@ -498,11 +551,17 @@ err_fid:
         goto err_pop;
 }
 
+static int  mds_fs_post_cleanup(struct obd_device *obd)
+{
+        int    rc = 0;
+        rc = fsfilt_post_cleanup(obd);
+        return rc; 
+}
 
 int mds_fs_cleanup(struct obd_device *obd, int flags)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         int rc = 0;
 
         if (flags & OBD_OPT_FAILOVER)
@@ -512,7 +571,7 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
         class_disconnect_exports(obd, flags); /* cleans up client info too */
         mds_server_free_data(mds);
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (mds->mds_rcvd_filp) {
                 rc = filp_close(mds->mds_rcvd_filp, 0);
                 mds->mds_rcvd_filp = NULL;
@@ -525,6 +584,14 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
                 if (rc)
                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
         }
+        if (mds->mds_unnamed_dir != NULL) {
+                l_dput(mds->mds_unnamed_dir);
+                mds->mds_unnamed_dir = NULL;
+        }
+        if (mds->mds_fids_dir != NULL) {
+                l_dput(mds->mds_fids_dir);
+                mds->mds_fids_dir = NULL;
+        }
         if (mds->mds_objects_dir != NULL) {
                 l_dput(mds->mds_objects_dir);
                 mds->mds_objects_dir = NULL;
@@ -537,7 +604,9 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
                 l_dput(mds->mds_pending_dir);
                 mds->mds_pending_dir = NULL;
         }
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        rc = mds_fs_post_cleanup(obd);
+        
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         shrink_dcache_parent(mds->mds_fid_de);
         dput(mds->mds_fid_de);
 
@@ -548,31 +617,64 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
  * performance sensitive, it is accomplished by creating a file, checking the
  * fid, and renaming it. */
 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
-                      struct lov_stripe_md **ea, struct obd_trans_info *oti)
+                   struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct mds_obd *mds = &exp->exp_obd->u.mds;
         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
-        unsigned int tmpname = ll_insecure_random_int();
         struct file *filp;
-        struct dentry *new_child;
-        struct obd_run_ctxt saved;
+        struct dentry *dchild;
+        struct lvfs_run_ctxt saved;
         char fidname[LL_FID_NAMELEN];
         void *handle;
         int rc = 0, err, namelen;
         ENTRY;
 
-        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
-        
-        sprintf(fidname, "OBJECTS/%u", tmpname);
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        down(&parent_inode->i_sem);
+        if (oa->o_id) {
+                namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
+                dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+                if (IS_ERR(dchild))
+                        GOTO(out_pop, rc = PTR_ERR(dchild));
+
+                if (dchild->d_inode == NULL) {
+                        struct dentry_params dp;
+                        struct inode *inode;
+
+                        dchild->d_fsdata = (void *) &dp;
+                        dp.p_ptr = NULL;
+                        dp.p_inum = oa->o_id;
+                        rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL);
+                        if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id)
+                                dchild->d_fsdata = NULL;
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                dput(dchild);
+                                GOTO(out_pop, rc);
+                        }
+                        inode = dchild->d_inode;
+                        LASSERT(inode->i_ino == oa->o_id);
+                        inode->i_generation = oa->o_generation;
+                        CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+                               inode->i_ino, inode->i_generation);
+                        mark_inode_dirty(inode);
+                } else {
+                        CWARN("it should be here!\n");
+                }
+                GOTO(out_pop, rc);
+        }
+
+        sprintf(fidname, "OBJECTS/%u.%u",ll_insecure_random_int(),current->pid);
         filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
         if (IS_ERR(filp)) {
                 rc = PTR_ERR(filp);
                 if (rc == -EEXIST) {
-                        CERROR("impossible object name collision %u\n",
-                               tmpname);
+                        CERROR("impossible object name collision %s\n",
+                               fidname);
                         LBUG();
                 }
-                CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+                CERROR("error creating tmp object %s: rc %d\n", fidname, rc);
                 GOTO(out_pop, rc);
         }
 
@@ -582,14 +684,13 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
         oa->o_generation = filp->f_dentry->d_inode->i_generation;
         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
 
-        down(&parent_inode->i_sem);
-        new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+        dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
 
-        if (IS_ERR(new_child)) {
+        if (IS_ERR(dchild)) {
                 CERROR("getting neg dentry for obj rename: %d\n", rc);
-                GOTO(out_close, rc = PTR_ERR(new_child));
+                GOTO(out_close, rc = PTR_ERR(dchild));
         }
-        if (new_child->d_inode != NULL) {
+        if (dchild->d_inode != NULL) {
                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
                        oa->o_id, oa->o_generation);
                 LBUG();
@@ -602,30 +703,31 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
 
         lock_kernel();
         rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
-                        mds->mds_objects_dir->d_inode, new_child);
+                        mds->mds_objects_dir->d_inode, dchild);
         unlock_kernel();
         if (rc)
                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
                        oa->o_id, oa->o_generation, rc);
 
-        err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
-                            handle, 0);
-        if (!err)
-                oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-        else if (!rc)
+        err = fsfilt_commit(exp->exp_obd, mds->mds_sb, 
+                            mds->mds_objects_dir->d_inode, handle, 0);
+        if (!err) {
+                oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+                oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
+        } else if (!rc)
                 rc = err;
 out_dput:
-        dput(new_child);
+        dput(dchild);
 out_close:
-        up(&parent_inode->i_sem);
         err = filp_close(filp, 0);
         if (err) {
-                CERROR("closing tmpfile %u: rc %d\n", tmpname, rc);
+                CERROR("closing tmpfile %s: rc %d\n", fidname, rc);
                 if (!rc)
                         rc = err;
         }
 out_pop:
-        pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+        up(&parent_inode->i_sem);
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         RETURN(rc);
 }
 
@@ -635,14 +737,14 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
         struct mds_obd *mds = &exp->exp_obd->u.mds;
         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
         struct obd_device *obd = exp->exp_obd;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         char fidname[LL_FID_NAMELEN];
         struct dentry *de;
         void *handle;
         int err, namelen, rc = 0;
         ENTRY;
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
 
@@ -658,22 +760,23 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
            that is unlinked, not spanned across multiple OSTs */
         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
                                   FSFILT_OP_UNLINK, oti, 1);
-        if (IS_ERR(handle)) {
+
+        if (IS_ERR(handle))
                 GOTO(out_dput, rc = PTR_ERR(handle));
-        }
         
         rc = vfs_unlink(mds->mds_objects_dir->d_inode, de);
         if (rc) 
                 CERROR("error destroying object "LPU64":%u: rc %d\n",
                        oa->o_id, oa->o_generation, rc);
         
-        err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
+        err = fsfilt_commit(obd, mds->mds_sb, mds->mds_objects_dir->d_inode, 
+                            handle, 0);
         if (err && !rc)
                 rc = err;
 out_dput:
         if (de != NULL)
                 l_dput(de);
         up(&parent_inode->i_sem);
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         RETURN(rc);
 }