Whamcloud - gitweb
Add dcount assertions in obdfilter to catch any funny business (still
authoradilger <adilger>
Thu, 29 Aug 2002 00:59:08 +0000 (00:59 +0000)
committeradilger <adilger>
Thu, 29 Aug 2002 00:59:08 +0000 (00:59 +0000)
open-unlink problems).

Only try to get a cache page if we already have a locked page.  This
is not Peter's "ignore page locks" code yet, but it's what I have in
my tree already and want to flush so I can test at LLNL.

lustre/obdfilter/filter.c

index 29fd3ad..1dfa901 100644 (file)
@@ -50,6 +50,14 @@ static int filter_id(char *buf, obd_id id, obd_mode mode)
                        (unsigned long long)id);
 }
 
+static inline void f_dput(struct dentry *dentry)
+{
+        CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
+               dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
+        LASSERT(atomic_read(&dentry->d_count) > 0);
+        dput(dentry);
+}
+
 /* setup the object store with correct subdirectories */
 static int filter_prep(struct obd_device *obddev)
 {
@@ -81,7 +89,7 @@ static int filter_prep(struct obd_device *obddev)
         }
         CDEBUG(D_INODE, "putting P: %p, count = %d\n", dentry,
                atomic_read(&dentry->d_count) - 1);
-        dput(dentry);
+        f_dput(dentry);
         dentry = simple_mkdir(current->fs->pwd, "D", 0700);
         CDEBUG(D_INODE, "got/created D: %p\n", dentry);
         if (IS_ERR(dentry)) {
@@ -91,7 +99,7 @@ static int filter_prep(struct obd_device *obddev)
         }
         CDEBUG(D_INODE, "putting D: %p, count = %d\n", dentry,
                atomic_read(&dentry->d_count) - 1);
-        dput(dentry);
+        f_dput(dentry);
 
         /*
          * Create directories and/or get dentries for each object type.
@@ -173,17 +181,12 @@ out_O_mode:
         while (mode-- > 0) {
                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
                 if (dentry) {
-                        CDEBUG(D_INODE, "putting O/%s: %p, count = %d\n",
-                               obd_type_by_mode[mode], dentry,
-                               atomic_read(&dentry->d_count) - 1);
-                        dput(dentry);
+                        f_dput(dentry);
                         filter->fo_dentry_O_mode[mode] = NULL;
                 }
         }
 out_O:
-        CDEBUG(D_INODE, "putting O: %p, count = %d\n", filter->fo_dentry_O,
-               atomic_read(&filter->fo_dentry_O->d_count) - 1);
-        dput(filter->fo_dentry_O);
+        f_dput(filter->fo_dentry_O);
         filter->fo_dentry_O = NULL;
         goto out;
 }
@@ -219,16 +222,11 @@ static void filter_post(struct obd_device *obddev)
         for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
                 struct dentry *dentry = filter->fo_dentry_O_mode[mode];
                 if (dentry) {
-                        CDEBUG(D_INODE, "putting O/%s: %p, count = %d\n",
-                               obd_type_by_mode[mode], dentry,
-                               atomic_read(&dentry->d_count) - 1);
-                        dput(dentry);
+                        f_dput(dentry);
                         filter->fo_dentry_O_mode[mode] = NULL;
                 }
         }
-        CDEBUG(D_INODE, "putting O: %p, count = %d\n", filter->fo_dentry_O,
-               atomic_read(&filter->fo_dentry_O->d_count) - 1);
-        dput(filter->fo_dentry_O);
+        f_dput(filter->fo_dentry_O);
 out:
         pop_ctxt(&saved);
 }
@@ -236,11 +234,10 @@ out:
 
 static __u64 filter_next_id(struct obd_device *obddev)
 {
-        __u64 id;
+        obd_id id;
 
         spin_lock(&obddev->u.filter.fo_lock);
-        obddev->u.filter.fo_lastino++;
-        id = obddev->u.filter.fo_lastino;
+        id = ++obddev->u.filter.fo_lastino;
         spin_unlock(&obddev->u.filter.fo_lock);
 
         /* FIXME: write the lastino to disk here */
@@ -280,15 +277,17 @@ static struct dentry *filter_fid2dentry(struct obd_device *obddev,
         CDEBUG(D_INODE, "opening object O/%s/%s\n", obd_mode_to_type(type),
                name);
         dchild = lookup_one_len(name, dparent, len);
-        CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n",
-               obd_mode_to_type(type), name, dchild,
-               atomic_read(&dchild->d_count));
-
         if (IS_ERR(dchild)) {
                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
                 RETURN(dchild);
         }
 
+        CDEBUG(D_INODE, "got child obj O/%s/%s: %p, count = %d\n",
+               obd_mode_to_type(type), name, dchild,
+               atomic_read(&dchild->d_count));
+
+        LASSERT(atomic_read(&dchild->d_count) > 0);
+
         RETURN(dchild);
 }
 
@@ -478,7 +477,7 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode,
         EXIT;
 }
 
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa, 
+static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
                           struct lov_stripe_md *md)
 {
         struct obd_device *obddev = class_conn2obd(conn);
@@ -496,9 +495,9 @@ static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
 
-        filter_from_inode(oa, dentry->d_inode, oa->o_valid & ~OBD_MD_FLID);
+        filter_from_inode(oa, dentry->d_inode, oa->o_valid);
 
-        dput(dentry);
+        f_dput(dentry);
         RETURN(0);
 }
 
@@ -537,9 +536,7 @@ static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
         }
         unlock_kernel();
 
-        CDEBUG(D_INODE, "put dentry %p, count = %d\n", inode,
-               atomic_read(&dentry->d_count) - 1);
-        dput(dentry);
+        f_dput(dentry);
         RETURN(rc);
 }
 
@@ -583,17 +580,14 @@ static int filter_close(struct lustre_handle *conn, struct obdo *oa,
                                    oa->o_id, oa->o_mode);
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
+        LASSERT(atomic_read(&dentry->d_count) > 1);
 
-        CDEBUG(D_INODE, "put dentry %p, count = %d\n", dentry,
-               atomic_read(&dentry->d_count) - 1);
-        dput(dentry);  /* for the close */
-        CDEBUG(D_INODE, "put dentry %p, count = %d\n", dentry,
-               atomic_read(&dentry->d_count) - 1);
-        dput(dentry);  /* for this call */
+        f_dput(dentry);  /* for the close */
+        f_dput(dentry);  /* for this call */
         return 0;
 } /* filter_close */
 
-static int filter_create(struct lustre_handle* conn, struct obdo *oa, 
+static int filter_create(struct lustre_handle* conn, struct obdo *oa,
                          struct lov_stripe_md **ea)
 {
         char name[64];
@@ -628,8 +622,7 @@ static int filter_create(struct lustre_handle* conn, struct obdo *oa,
 
         /* Set flags for fields we have set in the inode struct */
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
-                 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME |
-                 OBD_MD_FLUID | OBD_MD_FLGID;
+                 OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
         filter_from_inode(oa, file->f_dentry->d_inode, oa->o_valid);
         filp_close(file, 0);
 
@@ -681,9 +674,7 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
 
         rc = vfs_unlink(dir_dentry->d_inode, object_dentry);
         pop_ctxt(&saved);
-        CDEBUG(D_INODE, "put child %p, count = %d\n", object_dentry,
-               atomic_read(&object_dentry->d_count) - 1);
-        dput(object_dentry);
+        f_dput(object_dentry);
 
         EXIT;
 out:
@@ -739,18 +730,17 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
 
         /* count doubles as retval */
         for (pg = 0; pg < oa_bufs; pg++) {
-                CDEBUG(D_INODE, "OP %d obdo pgno: (%d) (%ld,%ld) "
-                       "off count (%Ld,%Ld)\n",
+                CDEBUG(D_INODE, "OP %d obdo pgno: (%d) (%ld,"LPU64
+                       ") off count ("LPU64",%d)\n",
                        cmd, pnum, file->f_dentry->d_inode->i_ino,
-                       (unsigned long)pga[pnum].off >> PAGE_CACHE_SHIFT,
-                       (unsigned long long)pga[pnum].off,
-                       (unsigned long long)pga[pnum].count);
+                       pga[pnum].off >> PAGE_CACHE_SHIFT, pga[pnum].off,
+                       (int)pga[pnum].count);
                 if (cmd & OBD_BRW_WRITE) {
                         loff_t off;
                         char *buffer;
                         off = pga[pnum].off;
                         buffer = kmap(pga[pnum].pg);
-                        retval = file->f_op->write(file, buffer, 
+                        retval = file->f_op->write(file, buffer,
                                                    pga[pnum].count,
                                                    &off);
                         kunmap(pga[pnum].pg);
@@ -961,14 +951,18 @@ static int filter_journal_stop(void *journal_save, struct filter_obd *filter,
 }
 
 struct page *filter_get_page_write(struct inode *inode, unsigned long index,
-                                   struct niobuf_local *lnb)
+                                   struct niobuf_local *lnb, int *pglocked)
 {
         struct address_space *mapping = inode->i_mapping;
         struct page *page;
         int rc;
 
         //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
-        page = grab_cache_page_nowait(mapping, index); /* locked page */
+        if (*pglocked)
+                page = grab_cache_page_nowait(mapping, index); /* locked page */
+        else
+                page = grab_cache_page(mapping, index); /* locked page */
+
 
         /* This page is currently locked, so get a temporary page instead. */
         /* XXX I believe this is a very dangerous thing to do - consider if
@@ -981,10 +975,7 @@ struct page *filter_get_page_write(struct inode *inode, unsigned long index,
          */
         if (!page) {
                 unsigned long addr;
-                /* debugging: just seeing if this ever happens
                 CDEBUG(D_PAGE, "ino %ld page %ld locked\n", inode->i_ino,index);
-                */
-                CERROR("writing ino %ld page %ld locked\n", inode->i_ino,index);
                 addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
                 if (!addr) {
                         CERROR("no memory for a temp page\n");
@@ -996,11 +987,13 @@ struct page *filter_get_page_write(struct inode *inode, unsigned long index,
                 page->index = index;
                 lnb->flags |= N_LOCAL_TEMP_PAGE;
         } else if (!IS_ERR(page)) {
+                (*pglocked)++;
+                kmap(page);
+
                 /* Note: Called with "O" and "PAGE_SIZE" this is essentially
                  * a no-op for most filesystems, because we write the whole
                  * page.  For partial-page I/O this will read in the page.
                  */
-                kmap(page);
                 rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
                 if (rc) {
                         CERROR("page index %lu, rc = %d\n", index, rc);
@@ -1071,6 +1064,7 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
         struct niobuf_remote *b = nb;
         struct niobuf_local *r = res;
         void *journal_save = NULL;
+        int pglocked = 0;
         int rc = 0;
         int i;
         ENTRY;
@@ -1102,7 +1096,7 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                 if (!inode) {
                         CERROR("trying to BRW to non-existent file %Ld\n",
                                (unsigned long long)o->ioo_id);
-                        dput(dentry);
+                        f_dput(dentry);
                         GOTO(out_clean, rc = -ENOENT);
                 }
 
@@ -1116,12 +1110,13 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                                 r->dentry = dget(dentry);
 
                         if (cmd & OBD_BRW_WRITE)
-                                page = filter_get_page_write(inode, index, r);
+                                page = filter_get_page_write(inode, index, r,
+                                                             &pglocked);
                         else
                                 page = lustre_get_page_read(inode, index);
 
                         if (IS_ERR(page)) {
-                                dput(dentry);
+                                f_dput(dentry);
                                 GOTO(out_clean, rc = PTR_ERR(page));
                         }
 
@@ -1144,7 +1139,8 @@ out_ctxt:
         RETURN(rc);
 out_clean:
         while (r-- > res) {
-                dput(r->dentry);
+                CERROR("error cleanup on brw\n");
+                f_dput(r->dentry);
                 if (cmd & OBD_BRW_WRITE)
                         filter_commit_write(r->page, 0, PAGE_SIZE, rc);
                 else
@@ -1234,7 +1230,7 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
                         } else
                                 lustre_put_page(page);
 
-                        dput(r->dentry);
+                        f_dput(r->dentry);
                 }
         }
         current->journal_info = journal_save;
@@ -1252,7 +1248,7 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
                         err = filter_write_locked_page(r);
                         if (!rc)
                                 rc = err;
-                        dput(r->dentry);
+                        f_dput(r->dentry);
                 }
         }
 
@@ -1385,8 +1381,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
         }
         dst->o_size = src->o_size;
         dst->o_blocks = src->o_blocks;
-        dst->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
-        UnlockPage(page);
+        dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        unlock_page(page);
         __free_page(page);
 
         RETURN(err);