Whamcloud - gitweb
- add mballoc to ldiskfs series
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 10cc687..1c9cd4d 100644 (file)
 #include <linux/lustre_snap.h>
 #include "filter_internal.h"
 
-static int filter_start_page_read(struct obd_device *obd, struct inode *inode,
-                                  struct niobuf_local *lnb)
+int *obdfilter_created_scratchpad;
+
+static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
+                                 struct niobuf_local *lnb)
+
 {
         struct page *page;
-        unsigned long index = lnb->offset >> PAGE_SHIFT;
-
-        page = fsfilt_getpage(obd, inode, index);
-        if (IS_ERR(page)) {
-                CERROR("page index %lu, rc = %ld\n", index, PTR_ERR(page));
+        ENTRY;
+        page = alloc_pages(GFP_HIGHUSER, 0);
+        if (page == NULL) {
+                CERROR("no memory for a temp page\n");
+                lnb->rc = -ENOMEM;
+                RETURN(-ENOMEM);
+        }
 
-                lnb->page = NULL;
-                lnb->rc = PTR_ERR(page);
-                return lnb->rc;
+#if 0
+        POISON_PAGE(page, 0xf1);
+        if (lnb->len != PAGE_SIZE) {
+                memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
+                kunmap(page);
         }
+#endif
+        page->index = lnb->offset >> PAGE_SHIFT;
 
         lnb->page = page;
 
-        return 0;
+        RETURN(0);
 }
 
-static int filter_finish_page_read(struct niobuf_local *lnb)
+void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
+                           int niocount, struct niobuf_local *res)
 {
-        if (lnb->page == NULL)
-                return 0;
-
-        if (PageUptodate(lnb->page))
-                return 0;
+        int i, j;
 
-        wait_on_page(lnb->page);
-        if (!PageUptodate(lnb->page)) {
-                CERROR("page index %lu/offset "LPX64" not uptodate\n",
-                       lnb->page->index, lnb->offset);
-                GOTO(err_page, lnb->rc = -EIO);
-        }
-        if (PageError(lnb->page)) {
-                CERROR("page index %lu/offset "LPX64" has error\n",
-                       lnb->page->index, lnb->offset);
-                GOTO(err_page, lnb->rc = -EIO);
+        for (i = 0; i < objcount; i++, obj++) {
+                for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
+                        if (res->page != NULL) {
+                                __free_page(res->page);
+                                res->page = NULL;
+                        }
+                }
         }
-
-        return 0;
-
-err_page:
-        page_cache_release(lnb->page);
-        lnb->page = NULL;
-        return lnb->rc;
 }
 
 /* Grab the dirty and seen grant announcements from the incoming obdo.
@@ -271,150 +268,109 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lvfs_run_ctxt saved;
-        struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb = NULL;
-        struct fsfilt_objinfo *fso;
-        struct dentry *dentry;
+        struct niobuf_local *lnb;
+        struct dentry *dentry = NULL;
         struct inode *inode;
-        int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
+        void *iobuf = NULL;
+        int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
         ENTRY;
 
         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
          * When we do this function's dentry cleanup will need to be fixed */
-        LASSERT(objcount == 1);
-        LASSERT(obj->ioo_bufcnt > 0);
+        LASSERTF(objcount == 1, "%d\n", objcount);
+        LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
 
         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
                 spin_lock(&obd->obd_osfs_lock);
                 filter_grant_incoming(exp, oa);
 
-#if 0
-                /* Reads do not increase grants */
-                oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
-                                           filter_grant_space_left(exp));
-#else
                 oa->o_grant = 0;
-#endif
                 spin_unlock(&obd->obd_osfs_lock);
         }
 
-        OBD_ALLOC(fso, objcount * sizeof(*fso));
-        if (fso == NULL)
-                RETURN(-ENOMEM);
-
         memset(res, 0, niocount * sizeof(*res));
 
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        for (i = 0, o = obj; i < objcount; i++, o++) {
-                LASSERT(o->ioo_bufcnt);
+        rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
+        if (rc)
+                GOTO(cleanup, rc);
 
-                dentry = filter_oa2dentry(obd, oa);
-                if (IS_ERR(dentry))
-                        GOTO(cleanup, rc = PTR_ERR(dentry));
+        dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id);
+        if (IS_ERR(dentry))
+                GOTO(cleanup, rc = PTR_ERR(dentry));
 
-                if (dentry->d_inode == NULL) {
-                        CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               o->ioo_id);
-                        f_dput(dentry);
-                        GOTO(cleanup, rc = -ENOENT);
-                }
+        inode = dentry->d_inode; 
 
-                fso[i].fso_dentry = dentry;
-                fso[i].fso_bufcnt = o->ioo_bufcnt;
-        }
+        fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
-        else
-                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
-                       (jiffies - now));
-
-        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
-                dentry = fso[i].fso_dentry;
-                inode = dentry->d_inode;
-
-                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        lnb->dentry = dentry;
-                        lnb->offset = rnb->offset;
-                        lnb->len    = rnb->len;
-                        lnb->flags  = rnb->flags;
-
-                        if (inode->i_size <= rnb->offset) {
-                                /* If there's no more data, abort early.
-                                 * lnb->page == NULL and lnb->rc == 0, so it's
-                                 * easy to detect later. */
-                                break;
-                        } else {
-                                rc = filter_start_page_read(obd, inode, lnb);
-                        }
+        for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
+             i++, rnb++, lnb++) {
+                lnb->dentry = dentry;
+                lnb->offset = rnb->offset;
+                lnb->len    = rnb->len;
+                lnb->flags  = rnb->flags;
 
-                        if (rc) {
-                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
-                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
-                                       dentry, rc);
-                                cleanup_phase = 1;
-                                GOTO(cleanup, rc);
-                        }
+                if ((inode && inode->i_size <= rnb->offset) || inode == NULL)
+                        /*
+                         * if there's no more data, abort early.  lnb->page == *
+                         * NULL and lnb->rc == 0, so it's easy to detect later.
+                         */
+                        break;
+                
+                rc = filter_alloc_dio_page(obd, inode, lnb);
+                if (rc) {
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                             "page err %u@"LPU64" %u/%u %p: rc %d\n",
+                              lnb->len, lnb->offset, i, obj->ioo_bufcnt,
+                              dentry, rc);
+                        GOTO(cleanup, rc);
+                }
 
-                        if (inode->i_size < lnb->offset + lnb->len - 1)
-                                lnb->rc = inode->i_size - lnb->offset;
-                        else
-                                lnb->rc = lnb->len;
+                if (inode->i_size < lnb->offset + lnb->len - 1)
+                        lnb->rc = inode->i_size - lnb->offset;
+                else
+                        lnb->rc = lnb->len;
 
-                        tot_bytes += lnb->rc;
-                        if (lnb->rc < lnb->len) {
-                                /* short read, be sure to wait on it */
-                                lnb++;
-                                break;
-                        }
-                }
+                tot_bytes += lnb->rc;
+
+                filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
         }
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
-        else
-                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
-                       (jiffies - now));
+        fsfilt_check_slow(now, obd_timeout, "start_page_read");
 
-        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
-        while (lnb-- > res) {
-                rc = filter_finish_page_read(lnb);
-                if (rc) {
-                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
-                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
-                        cleanup_phase = 1;
+        if (inode != NULL) {
+                rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+                                      exp, NULL, NULL, NULL);
+                if (rc)
                         GOTO(cleanup, rc);
-                }
         }
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
-        else
-                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
-                       (jiffies - now));
-
+        lprocfs_counter_add(obd->obd_stats,
+                            LPROC_FILTER_READ_BYTES, tot_bytes);
         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
 
         EXIT;
-
- cleanup:
-        switch (cleanup_phase) {
-        case 1:
-                for (lnb = res; lnb < (res + niocount); lnb++) {
-                        if (lnb->page)
-                                page_cache_release(lnb->page);
-                }
-                if (res->dentry != NULL)
-                        f_dput(res->dentry);
-                else
-                        CERROR("NULL dentry in cleanup -- tell CFS\n");
-        case 0:
-                OBD_FREE(fso, objcount * sizeof(*fso));
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+cleanup:
+        if (rc) {
+                filter_free_dio_pages(objcount, obj,
+                                      niocount, res);
+                /*
+                 * in other cases (no errors) dentry is released in
+                 * filter_commitrw_read().
+                 */
+                f_dput(dentry);
         }
+
+        if (iobuf != NULL)
+                filter_free_iobuf(iobuf);
+
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+        if (rc)
+                CERROR("io error %d\n", rc);
+        
         return rc;
 }
 
@@ -525,55 +481,6 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
         return rc;
 }
 
-static int filter_start_page_write(struct obd_device *obd, struct inode *inode,
-                                   struct niobuf_local *lnb)
-{
-        struct page *page;
-
-        if (lnb->len != PAGE_SIZE)
-                return filter_start_page_read(obd, inode, lnb);
-
-        page = alloc_pages(GFP_HIGHUSER, 0);
-        if (page == NULL) {
-                CERROR("no memory for a temp page\n");
-                RETURN(lnb->rc = -ENOMEM);
-        }
-#if 0
-        POISON_PAGE(page, 0xf1);
-        if (lnb->len != PAGE_SIZE) {
-                memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
-                kunmap(page);
-        }
-#endif
-        page->index = lnb->offset >> PAGE_SHIFT;
-        lnb->page = page;
-
-        return 0;
-}
-
-static void filter_abort_page_write(struct niobuf_local *lnb)
-{
-        LASSERT(lnb->page != NULL);
-
-        if (lnb->len != PAGE_SIZE)
-                page_cache_release(lnb->page);
-        else
-                __free_pages(lnb->page, 0);
-}
-
-/* a helper for both the 2.4 and 2.6 commitrw paths which are both built
- * up by our shared filter_preprw_write() */
-void filter_release_write_page(struct filter_obd *filter, struct inode *inode,
-                               struct niobuf_local *lnb, int rc)
-{
-        if (lnb->len != PAGE_SIZE)
-                return filter_release_read_page(filter, inode, lnb->page);
-
-        if (rc == 0)
-                flip_into_page_cache(inode, lnb->page);
-        __free_page(lnb->page);
-}
-
 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
  * on mulitple inodes.  That isn't all, because there still exists the
  * possibility of a truncate starting a new transaction while holding the ext3
@@ -590,46 +497,62 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct niobuf_local *res,
                                struct obd_trans_info *oti)
 {
+        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        struct niobuf_local *lnb = res;
+        struct dentry *dentry = NULL;
+        unsigned long now = jiffies;
         struct lvfs_run_ctxt saved;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
         struct fsfilt_objinfo fso;
-        struct dentry *dentry;
+        struct obd_device *obd;
         obd_size left;
-        unsigned long now = jiffies;
-        int rc = 0, i, tot_bytes = 0, cleanup_phase = 1;
+        void *iobuf; 
+        
         ENTRY;
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
         memset(res, 0, niocount * sizeof(*res));
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
-                                   obj->ioo_id);
+        rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
+        if (rc)
+                GOTO(cleanup, rc);
+        cleanup_phase = 1;
+
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        /* make sure that object is already allocated */
+        dentry = filter_crow_object(obd, obj->ioo_gr,
+                                    obj->ioo_id);
+
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
 
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                       obj->ioo_id);
-                f_dput(dentry);
-                GOTO(cleanup, rc = -ENOENT);
-        }
+        cleanup_phase = 2;
 
+        /* 
+         * setting attrs passed along with write requests (owner/group). We
+         * goind it here as object should not exist with wrong owner/group as
+         * this may break quotas. --umka
+         */
+        rc = filter_setattr_internal(exp, dentry, oa, NULL);
+        if (rc) {
+                CERROR("cannot set attrs on write, err %d\n",
+                       rc);
+                GOTO(cleanup, rc);
+        }
+        
         fso.fso_dentry = dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
-        else
-                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
-                       (jiffies - now));
+        fsfilt_check_slow(now, obd_timeout, "preprw_write setup");
 
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
+        spin_lock(&obd->obd_osfs_lock);
         if (oa)
                 filter_grant_incoming(exp, oa);
-        cleanup_phase = 0;
+        
+        cleanup_phase = 3;
 
         left = filter_grant_space_left(exp);
 
@@ -638,12 +561,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         if (oa && oa->o_valid & OBD_MD_FLGRANT)
                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
 
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
+        /* We're finishing using body->oa as an input variable, so reset
+         * o_valid here. */
+        oa->o_valid = 0;
 
-        if (rc) {
-                f_dput(dentry);
+        spin_unlock(&obd->obd_osfs_lock);
+
+        if (rc) 
                 GOTO(cleanup, rc);
-        }
 
         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
              i++, lnb++, rnb++) {
@@ -655,51 +580,67 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                rc = filter_start_page_write(exp->exp_obd, dentry->d_inode,lnb);
+                rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb);
                 if (rc) {
                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
                                lnb->len, lnb->offset,
                                i, obj->ioo_bufcnt, dentry, rc);
-                        while (lnb-- > res)
-                                filter_abort_page_write(lnb);
-                        f_dput(dentry);
                         GOTO(cleanup, rc);
                 }
+                cleanup_phase = 4;
+
+                /* If the filter writes a partial page, then has the file
+                 * extended, the client will read in the whole page.  the
+                 * filter has to be careful to zero the rest of the partial
+                 * page on disk.  we do it by hand for partial extending
+                 * writes, send_bio() is responsible for zeroing pages when
+                 * asked to read unmapped blocks -- brw_kiovec() does this. */
+                if (lnb->len != PAGE_SIZE) {
+                        if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
+                                filter_iobuf_add_page(obd, iobuf, dentry->d_inode,
+                                                      lnb->page);
+                        } else {
+                                memset(kmap(lnb->page) + lnb->len, 0,
+                                       PAGE_SIZE - lnb->len);
+                                kunmap(lnb->page);
+                        }
+                }
                 if (lnb->rc == 0)
                         tot_bytes += lnb->len;
         }
 
-        while (lnb-- > res) {
-                if (lnb->len == PAGE_SIZE)
-                        continue;
-                rc = filter_finish_page_read(lnb);
-                if (rc) {
-                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
-                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
-                        GOTO(cleanup, rc);
-                }
-        }
-
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
-        else
-                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
-                       (jiffies - now));
+        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
+                              NULL, NULL, NULL);
+        
+        fsfilt_check_slow(now, obd_timeout, "start_page_write");
 
-        lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
         EXIT;
 cleanup:
         switch(cleanup_phase) {
+        case 4:
+                if (rc)
+                        filter_free_dio_pages(objcount, obj, niocount, res);
+        case 3:
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                filter_free_iobuf(iobuf);
+        case 2:
+                if (rc && dentry && !IS_ERR(dentry))
+                        f_dput(dentry);
+                break;
         case 1:
-                spin_lock(&exp->exp_obd->obd_osfs_lock);
+                spin_lock(&obd->obd_osfs_lock);
                 if (oa)
                         filter_grant_incoming(exp, oa);
-                spin_unlock(&exp->exp_obd->obd_osfs_lock);
-        default: ;
+                spin_unlock(&obd->obd_osfs_lock);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                filter_free_iobuf(iobuf);
+                break;
+        default:;
+        
         }
-        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        return rc;
+        RETURN(rc);
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -742,24 +683,14 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                                 int niocount, struct niobuf_local *res,
                                 struct obd_trans_info *oti, int rc)
 {
-        struct obd_ioobj *o;
-        struct niobuf_local *lnb;
-        int i, j;
         struct inode *inode = NULL;
         ENTRY;
 
         if (res->dentry != NULL)
                 inode = res->dentry->d_inode;
 
-        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        if (lnb->page == NULL)
-                                continue;
-                        filter_release_read_page(&exp->exp_obd->u.filter,
-                                                 inode, lnb->page);
-                }
-        }
-
+        filter_free_dio_pages(objcount, obj, niocount, res);
+        
         if (res->dentry != NULL)
                 f_dput(res->dentry);
         RETURN(rc);
@@ -848,8 +779,8 @@ int filter_do_cow(struct obd_export *exp, struct obd_ioobj *obj,
 
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         
-        dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
-                                   obj->ioo_id);
+        dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
+                                  obj->ioo_id);
         if (IS_ERR(dentry)) {
                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
                 RETURN (PTR_ERR(dentry));
@@ -908,9 +839,8 @@ int filter_write_extents(struct obd_export *exp, struct obd_ioobj *obj, int nobj
         LASSERT(nobj == 1);
 
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
-        dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
-                                   obj->ioo_id);
+        dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
+                                  obj->ioo_id);
         if (IS_ERR(dentry)) {
                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
                 RETURN (PTR_ERR(dentry));
@@ -992,7 +922,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(out, ret = -ENOMEM);
 
         for (i = 0; i < oa_bufs; i++) {
-                rnb[i].offset = pga[i].off;
+                rnb[i].offset = pga[i].disk_offset;
                 rnb[i].len = pga[i].count;
         }
 
@@ -1004,9 +934,16 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(out, ret);
 
         for (i = 0; i < oa_bufs; i++) {
-                void *virt = kmap(pga[i].pg);
-                obd_off off = pga[i].off & ~PAGE_MASK;
-                void *addr = kmap(lnb[i].page);
+                void *virt;
+                obd_off off;
+                void *addr;
+
+                if (lnb[i].page == NULL)
+                        break;
+
+                off = pga[i].disk_offset & ~PAGE_MASK;
+                virt = kmap(pga[i].pg);
+                addr = kmap(lnb[i].page);
 
                 /* 2 kmaps == vanishingly small deadlock opportunity */