Whamcloud - gitweb
b=1642
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 0885a80..c4754ae 100644 (file)
@@ -46,6 +46,8 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
         if (IS_ERR(page))
                 return lnb->rc = PTR_ERR(page);
 
+        LASSERT(page->mapping == mapping);
+
         lnb->page = page;
 
         if (inode->i_size < lnb->offset + lnb->len - 1)
@@ -242,11 +244,12 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
         struct inode *inode;
-        int rc = 0, i, j, tot_bytes = 0;
+        int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
         unsigned long now = jiffies;
         ENTRY;
 
-        /* We are currently not supporting multi-obj BRW_READ RPCS at all */
+        /* We are currently not supporting multi-obj BRW_READ RPCS at all.
+         * When we do this function's dentry cleanup will need to be fixed */
         LASSERT(objcount == 1);
 
         OBD_ALLOC(fso, objcount * sizeof(*fso));
@@ -262,13 +265,13 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 dentry = filter_oa2dentry(exp->exp_obd, oa);
                 if (IS_ERR(dentry))
-                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
+                        GOTO(cleanup, rc = PTR_ERR(dentry));
 
                 if (dentry->d_inode == NULL) {
                         CERROR("trying to BRW to non-existent file "LPU64"\n",
                                o->ioo_id);
                         f_dput(dentry);
-                        GOTO(out_objinfo, rc = -ENOENT);
+                        GOTO(cleanup, rc = -ENOENT);
                 }
 
                 fso[i].fso_dentry = dentry;
@@ -288,11 +291,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 inode = dentry->d_inode;
 
                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        if (j == 0)
-                                lnb->dentry = dentry;
-                        else
-                                lnb->dentry = dget(dentry);
-
+                        lnb->dentry = dentry;
                         lnb->offset = rnb->offset;
                         lnb->len    = rnb->len;
                         lnb->flags  = rnb->flags;
@@ -302,7 +301,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                                 /* If there's no more data, abort early.
                                  * lnb->page == NULL and lnb->rc == 0, so it's
                                  * easy to detect later. */
-                                f_dput(dentry);
                                 lnb->dentry = NULL;
                                 break;
                         } else {
@@ -314,13 +312,16 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
                                        dentry, rc);
-                                f_dput(dentry);
-                                GOTO(out_pages, rc);
+                                cleanup_phase = 1;
+                                GOTO(cleanup, rc);
                         }
 
                         tot_bytes += lnb->rc;
-                        if (lnb->rc < lnb->len)
-                                break; /* short read */
+                        if (lnb->rc < lnb->len) {
+                                /* short read, be sure to wait on it */
+                                lnb++;
+                                break;
+                        }
                 }
         }
 
@@ -334,8 +335,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 if (rc) {
                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
-                        f_dput(lnb->dentry);
-                        GOTO(out_pages, rc);
+                        cleanup_phase = 1;
+                        GOTO(cleanup, rc);
                 }
         }
 
@@ -343,24 +344,20 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
 
         EXIT;
-out:
-        OBD_FREE(fso, objcount * sizeof(*fso));
-        /* we saved the journal handle into oti->oti_handle instead */
-        current->journal_info = NULL;
-        pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
-        return rc;
 
-out_pages:
-        while (lnb-- > res) {
-                page_cache_release(lnb->page);
-                f_dput(lnb->dentry);
+ cleanup:
+        switch (cleanup_phase) {
+        case 1:
+                for (lnb = res; lnb < (res + niocount); lnb++) {
+                        if (lnb->page)
+                                page_cache_release(lnb->page);
+                }
+                f_dput(res->dentry);
+        case 0:
+                OBD_FREE(fso, objcount * sizeof(*fso));
+                pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
         }
-        goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
-        for (i = 0; i < objcount && fso[i].fso_dentry; i++)
-                f_dput(fso[i].fso_dentry);
-        goto out;
+        return rc;
 }
 
 /* We need to balance prepare_write() calls with commit_write() calls.
@@ -600,9 +597,30 @@ static int filter_write_locked_page(struct niobuf_local *lnb)
         RETURN(rc);
 }
 
-int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
-                    int objcount, struct obd_ioobj *obj, int niocount,
-                    struct niobuf_local *res, struct obd_trans_info *oti)
+static int filter_commitrw_read(struct obd_export *exp, int objcount,
+                                struct obd_ioobj *obj, int niocount,
+                                struct niobuf_local *res,
+                                struct obd_trans_info *oti)
+{
+        struct obd_ioobj *o;
+        struct niobuf_local *lnb;
+        int i, j;
+        ENTRY;
+
+        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
+                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+                        if (lnb->page != NULL)
+                                page_cache_release(lnb->page);
+                }
+        }
+        f_dput(res->dentry);
+        RETURN(0);
+}
+
+static int
+filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
+                      int objcount, struct obd_ioobj *obj, int niocount,
+                      struct niobuf_local *res, struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
         struct obd_ioobj *o;
@@ -739,6 +757,21 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
+/* XXX needs to trickle its oa down */
+int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
+                    int objcount, struct obd_ioobj *obj, int niocount,
+                    struct niobuf_local *res, struct obd_trans_info *oti)
+{
+        if (cmd == OBD_BRW_WRITE)
+                return filter_commitrw_write(cmd, exp, oa, objcount, obj,
+                                             niocount, res, oti);
+        if (cmd == OBD_BRW_READ)
+                return filter_commitrw_read(exp, objcount, obj, niocount,
+                                            res, oti);
+        LBUG();
+        return -EPROTO;
+}
+
 int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
                struct lov_stripe_md *lsm, obd_count oa_bufs,
                struct brw_page *pga, struct obd_trans_info *oti)