Whamcloud - gitweb
419b2a040117f20a9d6141a1d8955c390741aaa1
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33 #include <asm/div64.h>
34
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
38
39 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
40 {
41         struct address_space *mapping = inode->i_mapping;
42         struct page *page;
43         unsigned long index = lnb->offset >> PAGE_SHIFT;
44         int rc;
45
46         page = grab_cache_page(mapping, index); /* locked page */
47         if (page == NULL)
48                 return lnb->rc = -ENOMEM;
49
50         LASSERT(page->mapping == mapping);
51
52         lnb->page = page;
53
54         if (inode->i_size < lnb->offset + lnb->len - 1)
55                 lnb->rc = inode->i_size - lnb->offset;
56         else
57                 lnb->rc = lnb->len;
58
59         if (PageUptodate(page)) {
60                 unlock_page(page);
61                 return 0;
62         }
63
64         rc = mapping->a_ops->readpage(NULL, page);
65         if (rc < 0) {
66                 CERROR("page index %lu, rc = %d\n", index, rc);
67                 lnb->page = NULL;
68                 page_cache_release(page);
69                 return lnb->rc = rc;
70         }
71
72         return 0;
73 }
74
75 static int filter_finish_page_read(struct niobuf_local *lnb)
76 {
77         if (lnb->page == NULL)
78                 return 0;
79
80         if (PageUptodate(lnb->page))
81                 return 0;
82
83         wait_on_page(lnb->page);
84         if (!PageUptodate(lnb->page)) {
85                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
86                        lnb->page->index, lnb->offset);
87                 GOTO(err_page, lnb->rc = -EIO);
88         }
89         if (PageError(lnb->page)) {
90                 CERROR("page index %lu/offset "LPX64" has error\n",
91                        lnb->page->index, lnb->offset);
92                 GOTO(err_page, lnb->rc = -EIO);
93         }
94
95         return 0;
96
97 err_page:
98         page_cache_release(lnb->page);
99         lnb->page = NULL;
100         return lnb->rc;
101 }
102
103 /* See if there are unallocated parts in given file region */
104 static int filter_inode_has_holes(struct inode *inode, obd_size start,
105                                   int len)
106 {
107         int j;
108 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
109         sector_t (*fs_bmap)(struct address_space *,
110                             sector_t);
111 #else
112         int (*fs_bmap)(struct address_space *, long);
113 #endif
114         fs_bmap = inode->i_mapping->a_ops->bmap;
115         if (fs_bmap) {
116                 for (j = 0; j <= len ; j++) {
117                         if (!fs_bmap(inode->i_mapping, start+j)) {
118                                 return 1;
119                         }
120                 }
121                 return 0;
122         } else {
123                 /* Return -1 in case that caller cares about bmap availability.
124                  */
125                 return -1;
126         }
127 }
128  
/* Grab the dirty and seen grant announcements from the incoming obdo.
 * We will later calculate the clients new grant and return it.
 *
 * o_blocks carries the client's current cached-dirty total and o_grant
 * the grant value the client has most recently seen; both are consumed
 * here and the corresponding o_valid bits cleared. */
static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
{
        struct filter_export_data *fed;
        struct obd_device *obd = exp->exp_obd;
        obd_size client_cached;
        ENTRY;

        /* Both FLBLOCKS and FLGRANT must be present to act on the
         * announcement; otherwise drop any half-set grant flag. */
        if (!oa || (oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
                                  (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
                if (oa)
                        oa->o_valid &= ~OBD_MD_FLGRANT;
                EXIT;
                return;
        }

        client_cached = oa->o_blocks;
        fed = &exp->exp_filter_data;

        /* a client may never legitimately cache more than it was granted */
        if (client_cached > fed->fed_grant)
                CERROR("client %s claims "LPU64" granted, > "LPU64" granted\n",
                       obd->obd_name, client_cached, fed->fed_grant);

        spin_lock(&obd->obd_osfs_lock);
        /* update our accounting now so that statfs takes it into account */
        obd->u.filter.fo_tot_cached += client_cached - fed->fed_cached;
        fed->fed_cached = client_cached;

        /* Acknowledgement that the client has seen our published grant.
         * If the client has met our shrinking target we can reuse its
         * difference from the previous grant.  It is reasonable to announce
         * more dirty that cached as it tries to purge its previously granted
         * dirty data down to its newly received target. */
        if (fed->fed_grant_waiting && (oa->o_grant <= fed->fed_grant_sent)) {
                if (fed->fed_grant_sent < fed->fed_grant) {
                        /* we asked the client to shrink its grant ... */
                        if (client_cached <= fed->fed_grant_sent) {
                                /* ... and it has purged down to the target,
                                 * so reclaim the difference from the total */
                                obd->u.filter.fo_tot_granted -=
                                        fed->fed_grant - oa->o_grant;
                                CDEBUG(D_SUPER, "reduced grant from "LPU64" to "
                                       LPU64", total grant now "LPU64"\n",
                                       fed->fed_grant, oa->o_grant,
                                       obd->u.filter.fo_tot_granted);
                                fed->fed_grant = oa->o_grant;
                                fed->fed_grant_waiting = 0;
                        }
                } else {
                        /* the grant was growing and the client has seen
                         * it; nothing more to wait for */
                        fed->fed_grant_waiting = 0;
                }
        }
        spin_unlock(&obd->obd_osfs_lock);
        /* consumed here; don't let stale values leak further down */
        oa->o_valid &= ~(OBD_MD_FLGRANT|OBD_MD_FLBLOCKS);
        EXIT;
}
183
/* Figure out how much space is available between what we've granted
 * and what remains in the filesystem.  Compensate for ext3 indirect
 * block overhead when computing how much free space is left ungranted.
 *
 * Returns the number of ungranted bytes (0 on statfs failure or when
 * everything available has already been promised).
 *
 * Caller must hold obd_osfs_lock. */
obd_size filter_grant_space_left(struct obd_export *exp)
{
        obd_size left = 0;
        struct obd_device *obd = exp->exp_obd;
        int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
        /* XXX I disabled statfs caching as it only creates extra problems now.
          -- green*/
        unsigned long max_age = jiffies/* - HZ*/+1;
        struct filter_export_data *fed = &exp->exp_filter_data;
        int rc;

restat:
        rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, max_age);
        if (rc) /* N.B. statfs can't really fail, just for correctness */
                RETURN(0);

        /* bytes available to ordinary users ... */
        left = obd->obd_osfs.os_bavail << blockbits;
        /* ... less an estimate of indirect-block overhead: roughly one
         * 4-byte block pointer per data block plus the double-indirect
         * pointers on top of those */
        left -= (left >> (blockbits - 2)) + (left >> (2 * blockbits - 2));
        /* We cannot afford having absolutely no space, we need some for
           llog stuff */
        if ( left >= PAGE_SIZE * 10)
                left -= PAGE_SIZE * 10;
        else
                left = 0;

        /* If fed->fed_grant_waiting is set, this means
           obd->u.filter.fo_tot_granted does not represent actual granted
           amount and client is supposedly actively shrinks its cache, so
           no point in printing this warning */
        if (left < obd->u.filter.fo_tot_granted && !fed->fed_grant_waiting)
                CERROR("granted space "LPU64" more than available "LPU64"\n",
                       obd->u.filter.fo_tot_granted, left);

        /* ungranted space = available space minus outstanding promises */
        left -= min(left, obd->u.filter.fo_tot_granted);
        /* nearly out of space and the statfs data may be stale: refresh
         * once with max_age = now and recompute */
        if (left < FILTER_GRANT_CHUNK && time_after(jiffies,obd->obd_osfs_age)){
                CDEBUG(D_SUPER, "fs has no space left and statfs too old\n");
                max_age = jiffies;
                goto restat;
        }

        CDEBUG(D_SUPER, "free: "LPU64" avail: "LPU64" grant left: "LPU64"\n",
               obd->obd_osfs.os_bfree << blockbits,
               obd->obd_osfs.os_bavail << blockbits, left);

        return left;
}
235
/* When clients have dirtied as much space as they've been granted they
 * fall through to sync writes.  These sync writes haven't been expressed
 * in grants and need to error with ENOSPC when there isn't room in the
 * filesystem for them after grants are taken into account.  However,
 * writeback of the dirty data that was already granted space can write
 * right on through.  We have no need to stop writes that won't allocate
 * new space, so we bmap to calculate how much this io is going to consume.
 *
 * On return *consumed is the block-rounded byte total this request
 * accounted against grant, and *left has been reduced by the newly
 * "pretend-granted" amount.  Returns 0 if all buffers may proceed,
 * -ENOSPC otherwise (with lnb[n].rc set on the failing buffer).
 *
 * Caller must hold obd_osfs_lock. */
static int filter_check_space(struct obd_export *exp, int objcount,
                              struct fsfilt_objinfo *fso, int niocount,
                              struct niobuf_remote *rnb,
                              struct niobuf_local *lnb, obd_size *left,
                              obd_size *consumed, struct inode *inode)
{
        int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
        /* NOTE(review): 'ungranted' is initialized but never updated
         * anywhere below, so the D_ERROR arm of the final CDEBUG can
         * never fire -- looks like dead accounting; confirm intent. */
        obd_size bytes, ungranted = 0;
        int i, rc = -ENOSPC, obj, n = 0;

        *consumed = 0;

        for (obj = 0; obj < objcount; obj++) {
                for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
                        obd_size tmp;

                        /* round the I/O range out to whole filesystem
                         * blocks, since allocation happens per block */
                        bytes = rnb[n].len;
                        tmp = rnb[n].offset & (blocksize - 1);
                        bytes += tmp;
                        tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
                        if (tmp)
                                bytes += blocksize - tmp;

                        /* writeback of already-granted dirty data always
                         * passes; just account what it consumes */
                        if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
                                *consumed += bytes;
                                rc = 0;
                                continue;
                        }
                        if (*left - *consumed >= bytes) {
                                /* if enough space, pretend it was granted */
                                exp->exp_obd->u.filter.fo_tot_granted += bytes;
                                exp->exp_filter_data.fed_grant += bytes;
                                *consumed += bytes;
                                *left -= bytes;
                                rc = 0;
                                continue;
                        } 
                        /* out of ungranted space: the write is still OK if
                         * it only overwrites already-allocated blocks.
                         * Drop the osfs lock around the bmap scan. */
                        spin_unlock(&exp->exp_obd->obd_osfs_lock);
                        if (!filter_inode_has_holes(inode,
                                                   rnb[n].offset >>
                                                   inode->i_blkbits,
                                                   rnb[n].len >>
                                                   inode->i_blkbits)) {
                                rc = 0;
                        } else {
                                rc = lnb[n].rc = -ENOSPC;
                        }
                        spin_lock(&exp->exp_obd->obd_osfs_lock);
                        if (rc)
                                goto leave;
                }
        }

        CDEBUG((*consumed != 0 && ungranted != 0) ? D_ERROR : D_SUPER,
               "consumed: "LPU64" ungranted: "LPU64"\n", *consumed, ungranted);

        /* a FROM_GRANT request should never exceed the client's grant */
        if (*consumed > exp->exp_filter_data.fed_grant)
                CERROR("request sent from cache, but not enough grant ("LPU64
                       ","LPU64")\n", *consumed,
                       exp->exp_filter_data.fed_grant);
leave:
        return rc;
}
308
309 /* Calculate how much grant space to allocate to this client, based on how
310  * much space is currently free and how much of that is already granted.
311  *
312  * Caller must hold obd_osfs_lock. */
313 static void filter_grant(struct obd_export *exp, struct obdo *oa,
314                          obd_size left, obd_size from_grant)
315 {
316         struct obd_device *obd = exp->exp_obd;
317         struct filter_export_data *fed = &exp->exp_filter_data;
318         obd_size grant, extra;
319         int blockbits;
320
321         blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
322
323         /* if things go wrong conservatively try to clamp them from
324          * generating more dirty data until things are better on our end */
325         grant = fed->fed_cached;
326
327         extra = min(FILTER_GRANT_CHUNK, left / 2);
328
329         if (grant > fed->fed_grant) {
330                 /* If client has screwed up, force basic grant until fixed */
331                 CERROR("client %s cached more "LPU64" than granted "LPU64"\n",
332                        exp->exp_client_uuid.uuid, fed->fed_cached,
333                        fed->fed_grant);
334                 grant = extra;
335         } else if (fed->fed_grant_waiting) {
336                 /* KISS: only one grant change in flight at a time.  We
337                  *       could move it in the "same direction" easily,
338                  *       but changing directions (e.g. grow then shrink
339                  *       before client ACKs) would be bad. */
340                 grant = fed->fed_grant_sent;
341         } else {
342                 /* grant will shrink or grow as client cache/extra changes */
343                 grant = fed->fed_cached + extra;
344         }
345
346         /* If we've granted all we're willing, we have to revoke
347          * the grant covering what the client just wrote. */
348         if (left == 0) {
349                 grant -= min(from_grant, grant);
350         }
351
352         if (!fed->fed_grant_waiting && grant + from_grant > left ) {
353                 if (from_grant < left)
354                         grant = left - from_grant;
355                 else
356                         grant = 0;
357         }
358
359         if (grant != fed->fed_grant) {
360                 fed->fed_grant_waiting = 1;
361                 fed->fed_grant_sent = grant;
362                 if (grant > fed->fed_grant) {
363                         obd->u.filter.fo_tot_granted += grant - fed->fed_grant;
364                         fed->fed_grant = grant;
365                 }
366         }
367
368         CDEBUG(D_SUPER,"cli %s cache:"LPU64" grant:"LPU64", granting:"LPU64"\n",
369                         exp->exp_connection->c_remote_uuid.uuid, oa->o_blocks,
370                         oa->o_grant, grant);
371         CDEBUG(D_SUPER, "fed sent:"LPU64" wt:%d grant:"LPU64"\n",
372                         fed->fed_grant_sent, fed->fed_grant_waiting,
373                         fed->fed_grant);
374         CDEBUG(D_SUPER, "tot cached:"LPU64" granted:"LPU64" num_exports: %d\n",
375                         obd->u.filter.fo_tot_cached,
376                         obd->u.filter.fo_tot_granted, obd->obd_num_exports);
377
378         oa->o_valid |= OBD_MD_FLGRANT;
379         oa->o_grant = grant;
380 }
381
/* Prepare a bulk read: map each remote niobuf onto a referenced
 * pagecache page with read I/O started (filter_start_page_read), then
 * wait for all the I/O to complete (filter_finish_page_read).
 *
 * On success the pages are held in res[]; filter_commitrw_read()
 * releases them and the dentry reference.  Buffers wholly past EOF are
 * left with page == NULL and rc == 0.  A fresh grant is also published
 * in @oa while we're here.  Returns 0 or negative errno; on error all
 * pages/dentry taken so far are released via the cleanup phases. */
static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                              int objcount, struct obd_ioobj *obj,
                              int niocount, struct niobuf_remote *nb,
                              struct niobuf_local *res,
                              struct obd_trans_info *oti)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_run_ctxt saved;
        struct obd_ioobj *o;
        struct niobuf_remote *rnb;
        struct niobuf_local *lnb = NULL;
        struct fsfilt_objinfo *fso;
        struct dentry *dentry;
        struct inode *inode;
        int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
        unsigned long now = jiffies;   /* for slow-path timing warnings */
        ENTRY;

        /* We are currently not supporting multi-obj BRW_READ RPCS at all.
         * When we do this function's dentry cleanup will need to be fixed */
        LASSERT(objcount == 1);

        OBD_ALLOC(fso, objcount * sizeof(*fso));
        if (fso == NULL)
                RETURN(-ENOMEM);

        memset(res, 0, niocount * sizeof(*res));

        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
        /* resolve each object to a dentry and record its buffer count */
        for (i = 0, o = obj; i < objcount; i++, o++) {
                LASSERT(o->ioo_bufcnt);

                dentry = filter_oa2dentry(obd, oa);
                if (IS_ERR(dentry))
                        GOTO(cleanup, rc = PTR_ERR(dentry));

                if (dentry->d_inode == NULL) {
                        CERROR("trying to BRW to non-existent file "LPU64"\n",
                               o->ioo_id);
                        f_dput(dentry);
                        GOTO(cleanup, rc = -ENOENT);
                }

                fso[i].fso_dentry = dentry;
                fso[i].fso_bufcnt = o->ioo_bufcnt;
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
                       (jiffies - now));

        /* piggyback a grant update on the read reply */
        if (oa) {
                spin_lock(&obd->obd_osfs_lock);
                filter_grant(exp, oa, filter_grant_space_left(exp), 0);
                spin_unlock(&obd->obd_osfs_lock);
        }

        /* start page reads for every buffer, stopping early at EOF */
        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                dentry = fso[i].fso_dentry;
                inode = dentry->d_inode;

                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
                        lnb->dentry = dentry;
                        lnb->offset = rnb->offset;
                        lnb->len    = rnb->len;
                        lnb->flags  = rnb->flags;
                        lnb->start  = jiffies;

                        if (inode->i_size <= rnb->offset) {
                                /* If there's no more data, abort early.
                                 * lnb->page == NULL and lnb->rc == 0, so it's
                                 * easy to detect later. */
                                break;
                        } else {
                                rc = filter_start_page_read(inode, lnb);
                        }

                        if (rc) {
                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
                                       dentry, rc);
                                cleanup_phase = 1;
                                GOTO(cleanup, rc);
                        }

                        tot_bytes += lnb->rc;
                        if (lnb->rc < lnb->len) {
                                /* short read, be sure to wait on it */
                                lnb++;
                                break;
                        }
                }
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
                       (jiffies - now));

        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
        /* lnb points one past the last buffer we started; walk backwards
         * waiting for each page's I/O to finish */
        while (lnb-- > res) {
                rc = filter_finish_page_read(lnb);
                if (rc) {
                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
                        cleanup_phase = 1;
                        GOTO(cleanup, rc);
                }
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
                       (jiffies - now));

        EXIT;

        /* cleanup phases fall through: 1 releases pages + dentry, 0 the
         * fso array and the pushed context (also the success path) */
 cleanup:
        switch (cleanup_phase) {
        case 1:
                for (lnb = res; lnb < (res + niocount); lnb++) {
                        if (lnb->page)
                                page_cache_release(lnb->page);
                }
                if (res->dentry != NULL)
                        f_dput(res->dentry);
                else
                        CERROR("NULL dentry in cleanup -- tell CFS\n");
        case 0:
                OBD_FREE(fso, objcount * sizeof(*fso));
                pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
        }
        return rc;
}
521
522 static int filter_start_page_write(struct inode *inode,
523                                    struct niobuf_local *lnb)
524 {
525         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
526         if (page == NULL) {
527                 CERROR("no memory for a temp page\n");
528                 RETURN(lnb->rc = -ENOMEM);
529         }
530         POISON_PAGE(page, 0xf1);
531         page->index = lnb->offset >> PAGE_SHIFT;
532         lnb->page = page;
533
534         return 0;
535 }
536
/* If we ever start to support multi-object BRW RPCs, we will need to get locks
 * on mulitple inodes.  That isn't all, because there still exists the
 * possibility of a truncate starting a new transaction while holding the ext3
 * rwsem = write while some writes (which have started their transactions here)
 * blocking on the ext3 rwsem = read => lock inversion.
 *
 * The handling gets very ugly when dealing with locked pages.  It may be easier
 * to just get rid of the locked page code (which has problems of its own) and
 * either discover we do not need it anymore (i.e. it was a symptom of another
 * bug) or ensure we get the page locks in an appropriate order. */
/* Prepare a bulk write: absorb the client's grant announcement, check
 * that there is space for ungranted buffers (filter_check_space),
 * publish a new grant in @oa, and allocate one temporary page per
 * buffer (filter_start_page_write).  Returns 0 or negative errno; on
 * success the dentry reference and pages are owned by res[] until
 * commitrw. */
static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
                               struct niobuf_local *res,
                               struct obd_trans_info *oti)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_run_ctxt saved;
        struct niobuf_remote *rnb = nb;
        struct niobuf_local *lnb = res;
        struct fsfilt_objinfo fso;
        struct dentry *dentry;
        int rc = 0, i, tot_bytes = 0;
        obd_size consumed = 0, left;
        unsigned long now = jiffies;   /* for slow-path timing warnings */
        ENTRY;
        LASSERT(objcount == 1);
        LASSERT(obj->ioo_bufcnt > 0);

        /* pull the client's cached/seen-grant numbers out of oa */
        filter_grant_incoming(exp, oa);

        memset(res, 0, niocount * sizeof(*res));

        push_ctxt(&saved, &obd->obd_ctxt, NULL);
        dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr, obj->ioo_id);
        if (IS_ERR(dentry))
                GOTO(cleanup, rc = PTR_ERR(dentry));

        if (dentry->d_inode == NULL) {
                CERROR("trying to BRW to non-existent file "LPU64"\n",
                       obj->ioo_id);
                f_dput(dentry);
                GOTO(cleanup, rc = -ENOENT);
        }

        fso.fso_dentry = dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
                       (jiffies - now));

        /* space check and grant calculation are done atomically under
         * the osfs lock so statfs/grant accounting stay consistent */
        spin_lock(&obd->obd_osfs_lock);
        left = filter_grant_space_left(exp);

        rc = filter_check_space(exp, objcount, &fso, niocount, rnb, lnb,
                                &left, &consumed, dentry->d_inode);
        /* publish a new grant even if the space check failed */
        if (oa)
                filter_grant(exp, oa, left, consumed);

        spin_unlock(&obd->obd_osfs_lock);

        if (rc) {
                f_dput(dentry);
                GOTO(cleanup, rc);
        }

        /* allocate a temp destination page for every buffer */
        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
             i++, lnb++, rnb++) {

                /* If there were any granting failures, we should not have
                   come here */
                LASSERT (lnb->rc == 0);

                lnb->dentry = dentry;
                lnb->offset = rnb->offset;
                lnb->len    = rnb->len;
                lnb->flags  = rnb->flags;
                lnb->start  = jiffies;

                rc = filter_start_page_write(dentry->d_inode, lnb);
                if (rc) {
                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
                               LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
                               i, obj->ioo_bufcnt, dentry, rc);
                        /* free the pages allocated so far, then bail */
                        while (lnb-- > res)
                                __free_pages(lnb->page, 0);
                        f_dput(dentry);
                        GOTO(cleanup, rc);
                }
                tot_bytes += lnb->len;
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
                       (jiffies - now));

        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes);
        EXIT;
cleanup:
        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
        return rc;
}
644
645 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
646                   int objcount, struct obd_ioobj *obj, int niocount,
647                   struct niobuf_remote *nb, struct niobuf_local *res,
648                   struct obd_trans_info *oti)
649 {
650         if (cmd == OBD_BRW_WRITE)
651                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
652                                            niocount, nb, res, oti);
653
654         if (cmd == OBD_BRW_READ)
655                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
656                                           niocount, nb, res, oti);
657
658         LBUG();
659         return -EPROTO;
660 }
661
662 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
663                                 int objcount, struct obd_ioobj *obj,
664                                 int niocount, struct niobuf_local *res,
665                                 struct obd_trans_info *oti)
666 {
667         struct obd_ioobj *o;
668         struct niobuf_local *lnb;
669         int i, j;
670         ENTRY;
671
672         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
673                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
674                         if (lnb->page != NULL)
675                                 page_cache_release(lnb->page);
676                 }
677         }
678         if (res->dentry != NULL)
679                 f_dput(res->dentry);
680         RETURN(0);
681 }
682
/* Evict any stale pagecache page at new_page->index so later reads
 * don't see pre-write data.  (The half that would insert @new_page into
 * the pagecache is compiled out below under "#if 0", so today this only
 * invalidates the old page and always makes a single pass.) */
void flip_into_page_cache(struct inode *inode, struct page *new_page)
{
        struct page *old_page;
        int rc;

        do {
                /* the dlm is protecting us from read/write concurrency, so we
                 * expect this find_lock_page to return quickly.  even if we
                 * race with another writer it won't be doing much work with
                 * the page locked.  we do this 'cause t_c_p expects a 
                 * locked page, and it wants to grab the pagecache lock
                 * as well. */
                old_page = find_lock_page(inode->i_mapping, new_page->index);
                if (old_page) {
                        /* truncate_complete_page() grew a mapping argument
                         * in 2.5; handle both kernel generations */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                        truncate_complete_page(old_page);
#else
                        truncate_complete_page(old_page->mapping, old_page);
#endif
                        unlock_page(old_page);
                        page_cache_release(old_page);
                }

#if 0 /* this should be a /proc tunable someday */
                /* racing o_directs (no locking ioctl) could race adding
                 * their pages, so we repeat the page invalidation unless
                 * we successfully added our new page */
                rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
                                              new_page->index,
                                              page_hash(inode->i_mapping, 
                                                        new_page->index));
                if (rc == 0) {
                        /* add_to_page_cache clears uptodate|dirty and locks
                         * the page */
                        SetPageUptodate(new_page);
                        unlock_page(new_page);
                }
#else   
                rc = 0;
#endif
        } while (rc != 0);
}
725
726 /* XXX needs to trickle its oa down */
727 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
728                     int objcount, struct obd_ioobj *obj, int niocount,
729                     struct niobuf_local *res, struct obd_trans_info *oti)
730 {
731         if (cmd == OBD_BRW_WRITE)
732                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
733                                              res, oti);
734         if (cmd == OBD_BRW_READ)
735                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
736                                             res, oti);
737         LBUG();
738         return -EPROTO;
739 }
740
741 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
742                struct lov_stripe_md *lsm, obd_count oa_bufs,
743                struct brw_page *pga, struct obd_trans_info *oti)
744 {
745         struct obd_ioobj ioo;
746         struct niobuf_local *lnb;
747         struct niobuf_remote *rnb;
748         obd_count i;
749         int ret = 0;
750         ENTRY;
751
752         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
753         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
754
755         if (lnb == NULL || rnb == NULL)
756                 GOTO(out, ret = -ENOMEM);
757
758         for (i = 0; i < oa_bufs; i++) {
759                 rnb[i].offset = pga[i].off;
760                 rnb[i].len = pga[i].count;
761         }
762
763         obdo_to_ioobj(oa, &ioo);
764         ioo.ioo_bufcnt = oa_bufs;
765
766         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
767         if (ret != 0)
768                 GOTO(out, ret);
769
770         for (i = 0; i < oa_bufs; i++) {
771                 void *virt = kmap(pga[i].pg);
772                 obd_off off = pga[i].off & ~PAGE_MASK;
773                 void *addr = kmap(lnb[i].page);
774
775                 /* 2 kmaps == vanishingly small deadlock opportunity */
776
777                 if (cmd & OBD_BRW_WRITE)
778                         memcpy(addr + off, virt + off, pga[i].count);
779                 else
780                         memcpy(virt + off, addr + off, pga[i].count);
781
782                 kunmap(lnb[i].page);
783                 kunmap(pga[i].pg);
784         }
785
786         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
787
788 out:
789         if (lnb)
790                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
791         if (rnb)
792                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
793         RETURN(ret);
794 }