Whamcloud - gitweb
3a563ba11a5a6d4d9b22fdb23338bd550cdb1930
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33 #include <asm/div64.h>
34
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
38
39 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
40 {
41         struct address_space *mapping = inode->i_mapping;
42         struct page *page;
43         unsigned long index = lnb->offset >> PAGE_SHIFT;
44         int rc;
45
46         page = grab_cache_page(mapping, index); /* locked page */
47         if (page == NULL)
48                 return lnb->rc = -ENOMEM;
49
50         LASSERT(page->mapping == mapping);
51
52         lnb->page = page;
53
54         if (inode->i_size < lnb->offset + lnb->len - 1)
55                 lnb->rc = inode->i_size - lnb->offset;
56         else
57                 lnb->rc = lnb->len;
58
59         if (PageUptodate(page)) {
60                 unlock_page(page);
61                 return 0;
62         }
63
64         rc = mapping->a_ops->readpage(NULL, page);
65         if (rc < 0) {
66                 CERROR("page index %lu, rc = %d\n", index, rc);
67                 lnb->page = NULL;
68                 page_cache_release(page);
69                 return lnb->rc = rc;
70         }
71
72         return 0;
73 }
74
75 static int filter_finish_page_read(struct niobuf_local *lnb)
76 {
77         if (lnb->page == NULL)
78                 return 0;
79
80         if (PageUptodate(lnb->page))
81                 return 0;
82
83         wait_on_page(lnb->page);
84         if (!PageUptodate(lnb->page)) {
85                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
86                        lnb->page->index, lnb->offset);
87                 GOTO(err_page, lnb->rc = -EIO);
88         }
89         if (PageError(lnb->page)) {
90                 CERROR("page index %lu/offset "LPX64" has error\n",
91                        lnb->page->index, lnb->offset);
92                 GOTO(err_page, lnb->rc = -EIO);
93         }
94
95         return 0;
96
97 err_page:
98         page_cache_release(lnb->page);
99         lnb->page = NULL;
100         return lnb->rc;
101 }
102
103 /* See if there are unallocated parts in given file region */
104 static int filter_inode_has_holes(struct inode *inode, obd_size start,
105                                   int len)
106 {
107         int j;
108 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
109         sector_t (*fs_bmap)(struct address_space *,
110                             sector_t);
111 #else
112         int (*fs_bmap)(struct address_space *, long);
113 #endif
114         fs_bmap = inode->i_mapping->a_ops->bmap;
115         if (fs_bmap) {
116                 for (j = 0; j <= len ; j++) {
117                         if (!fs_bmap(inode->i_mapping, start+j)) {
118                                 return 1;
119                         }
120                 }
121                 return 0;
122         } else {
123                 /* Return -1 in case that caller cares about bmap availability.
124                  */
125                 return -1;
126         }
127 }
128  
/* Grab the dirty and seen grant announcements from the incoming obdo.
 * We will later calculate the clients new grant and return it.
 * Side effects: updates per-export (fed_cached) and global
 * (fo_tot_cached, fo_tot_granted) accounting under obd_osfs_lock, and
 * retires a pending grant change once the client acknowledges it.
 * Clears the grant/blocks valid bits so they are not consumed twice. */
static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
{
        struct filter_export_data *fed;
        struct obd_device *obd = exp->exp_obd;
        obd_size client_cached;
        ENTRY;

        /* Both the cached amount (o_blocks) and the seen grant (o_grant)
         * must be present; otherwise ignore the announcement entirely. */
        if (!oa || (oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
                                  (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
                if (oa)
                        oa->o_valid &= ~OBD_MD_FLGRANT;
                EXIT;
                return;
        }

        client_cached = oa->o_blocks;
        fed = &exp->exp_filter_data;

        /* NOTE(review): this message says "client %s" but prints the
         * server-side obd_name, not the client uuid -- confirm intent. */
        if (client_cached > fed->fed_grant)
                CERROR("client %s claims "LPU64" granted, > "LPU64" granted\n",
                       obd->obd_name, client_cached, fed->fed_grant);

        spin_lock(&obd->obd_osfs_lock);
        /* update our accounting now so that statfs takes it into account */
        obd->u.filter.fo_tot_cached += client_cached - fed->fed_cached;
        fed->fed_cached = client_cached;

        /* Acknowledgement that the client has seen our published grant.
         * If the client has met our shrinking target we can reuse its
         * difference from the previous grant.  It is reasonable to announce
         * more dirty that cached as it tries to purge its previously granted
         * dirty data down to its newly received target. */
        if (fed->fed_grant_waiting && (oa->o_grant <= fed->fed_grant_sent)) {
                if (fed->fed_grant_sent < fed->fed_grant) {
                        /* a shrink is outstanding: only commit it once the
                         * client's cached data fits under the new target */
                        if (client_cached <= fed->fed_grant_sent) {
                                obd->u.filter.fo_tot_granted -=
                                        fed->fed_grant - oa->o_grant;
                                CDEBUG(D_SUPER, "reduced grant from "LPU64" to "
                                       LPU64", total grant now "LPU64"\n",
                                       fed->fed_grant, oa->o_grant,
                                       obd->u.filter.fo_tot_granted);
                                fed->fed_grant = oa->o_grant;
                                fed->fed_grant_waiting = 0;
                        }
                } else {
                        /* grant was growing: the ack alone retires the wait */
                        fed->fed_grant_waiting = 0;
                }
        }
        spin_unlock(&obd->obd_osfs_lock);
        /* announcement consumed; drop the bits from the obdo */
        oa->o_valid &= ~(OBD_MD_FLGRANT|OBD_MD_FLBLOCKS);
        EXIT;
}
183
/* Figure out how much space is available between what we've granted
 * and what remains in the filesystem.  Compensate for ext3 indirect
 * block overhead when computing how much free space is left ungranted.
 *
 * Returns the number of bytes still grantable (0 on statfs failure).
 * May loop once to refresh a stale statfs when space looks exhausted.
 *
 * Caller must hold obd_osfs_lock. */
obd_size filter_grant_space_left(struct obd_export *exp)
{
        obd_size left = 0;
        struct obd_device *obd = exp->exp_obd;
        int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
        /* XXX I disabled statfs caching as it only creates extra problems now.
          -- green*/
        unsigned long max_age = jiffies/* - HZ*/+1;
        struct filter_export_data *fed = &exp->exp_filter_data;
        int rc;

restat:
        rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, max_age);
        if (rc) /* N.B. statfs can't really fail, just for correctness */
                RETURN(0);

        /* available blocks -> bytes */
        left = obd->obd_osfs.os_bavail << blockbits;
        /* subtract an estimate of indirect + double-indirect block
         * overhead (1/4 of a block per block, and again per indirect) */
        left -= (left >> (blockbits - 2)) + (left >> (2 * blockbits - 2));
        /* We cannot afford having absolutely no space, we need some for
           llog stuff */
        if ( left >= PAGE_SIZE * 10)
                left -= PAGE_SIZE * 10;
        else
                left = 0;

        /* If fed->fed_grant_waiting is set, this means
           obd->u.filter.fo_tot_granted does not represent actual granted
           amount and client is supposedly actively shrinks its cache, so
           no point in printing this warning */
        if (left < obd->u.filter.fo_tot_granted && !fed->fed_grant_waiting)
                CERROR("granted space "LPU64" more than available "LPU64"\n",
                       obd->u.filter.fo_tot_granted, left);

        /* what remains after all outstanding grants are honored */
        left -= min(left, obd->u.filter.fo_tot_granted);
        if (left < FILTER_GRANT_CHUNK && time_after(jiffies,obd->obd_osfs_age)){
                /* cached statfs may be stale; force a fresh one and retry */
                CDEBUG(D_SUPER, "fs has no space left and statfs too old\n");
                max_age = jiffies;
                goto restat;
        }

        CDEBUG(D_SUPER, "free: "LPU64" avail: "LPU64" grant left: "LPU64"\n",
               obd->obd_osfs.os_bfree << blockbits,
               obd->obd_osfs.os_bavail << blockbits, left);

        return left;
}
235
/* When clients have dirtied as much space as they've been granted they
 * fall through to sync writes.  These sync writes haven't been expressed
 * in grants and need to error with ENOSPC when there isn't room in the
 * filesystem for them after grants are taken into account.  However,
 * writeback of the dirty data that was already granted space can write
 * right on through.  We have no need to stop writes that won't allocate
 * new space, so we bmap to calculate how much this io is going to consume.
 *
 * Returns 0 when every buffer fits (granted, newly grantable, or a
 * rewrite of already-allocated blocks), -ENOSPC otherwise.  *consumed
 * accumulates the granted bytes; *left is decremented as new space is
 * claimed.
 *
 * Caller must hold obd_osfs_lock (dropped/retaken around bmap below). */
static int filter_check_space(struct obd_export *exp, int objcount,
                              struct fsfilt_objinfo *fso, int niocount,
                              struct niobuf_remote *rnb,
                              struct niobuf_local *lnb, obd_size *left,
                              obd_size *consumed, struct inode *inode)
{
        int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
        obd_size bytes, ungranted = 0;
        int i, rc = -ENOSPC, obj, n = 0;

        *consumed = 0;

        for (obj = 0; obj < objcount; obj++) {
                for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
                        obd_size tmp;

                        /* round the buffer out to full blocks, since the
                         * filesystem allocates in block-sized units */
                        bytes = rnb[n].len;
                        tmp = rnb[n].offset & (blocksize - 1);
                        bytes += tmp;
                        tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
                        if (tmp)
                                bytes += blocksize - tmp;

                        /* space already granted to the client: always OK */
                        if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
                                *consumed += bytes;
                                rc = 0;
                                continue;
                        }
                        if (*left - *consumed >= bytes) {
                                /* if enough space, pretend it was granted */
                                exp->exp_obd->u.filter.fo_tot_granted += bytes;
                                exp->exp_filter_data.fed_grant += bytes;
                                *consumed += bytes;
                                *left -= bytes;
                                rc = 0;
                                continue;
                        } 
                        /* no space left: a rewrite of blocks that are
                         * already allocated needs no new space, so drop
                         * the lock and bmap the region to find out */
                        spin_unlock(&exp->exp_obd->obd_osfs_lock);
                        if (!filter_inode_has_holes(inode,
                                                   rnb[n].offset >>
                                                   inode->i_blkbits,
                                                   rnb[n].len >>
                                                   inode->i_blkbits)) {
                                rc = 0;
                        } else {
                                rc = lnb[n].rc = -ENOSPC;
                        }
                        spin_lock(&exp->exp_obd->obd_osfs_lock);
                        if (rc)
                                goto leave;
                }
        }

        /* NOTE(review): "ungranted" is initialized to 0 and never updated,
         * so this condition is always false and the message always goes
         * to D_SUPER with ungranted printed as 0 -- looks like dead code. */
        CDEBUG((*consumed != 0 && ungranted != 0) ? D_ERROR : D_SUPER,
               "consumed: "LPU64" ungranted: "LPU64"\n", *consumed, ungranted);

        if (*consumed > exp->exp_filter_data.fed_grant)
                CERROR("request sent from cache, but not enough grant ("LPU64
                       ","LPU64")\n", *consumed,
                       exp->exp_filter_data.fed_grant);
leave:
        return rc;
}
308
/* Calculate how much grant space to allocate to this client, based on how
 * much space is currently free and how much of that is already granted.
 *
 * The computed grant is published to the client via oa->o_grant /
 * OBD_MD_FLGRANT.  A grant increase takes effect immediately in the
 * totals; a decrease only marks fed_grant_waiting and is committed when
 * the client acknowledges it in filter_grant_incoming().
 *
 * Caller must hold obd_osfs_lock. */
static void filter_grant(struct obd_export *exp, struct obdo *oa,
                         obd_size left, obd_size from_grant)
{
        struct obd_device *obd = exp->exp_obd;
        struct filter_export_data *fed = &exp->exp_filter_data;
        obd_size grant, extra;
        int blockbits;

        blockbits = obd->u.filter.fo_sb->s_blocksize_bits;

        /* if things go wrong conservatively try to clamp them from
         * generating more dirty data until things are better on our end */
        grant = fed->fed_cached;

        /* headroom beyond what the client already caches, capped so a
         * single client cannot take more than half of what is left */
        extra = min(FILTER_GRANT_CHUNK, left / 2);

        if (grant > fed->fed_grant) {
                /* If client has screwed up, force basic grant until fixed */
                CERROR("client %s cached more "LPU64" than granted "LPU64"\n",
                       exp->exp_client_uuid.uuid, fed->fed_cached,
                       fed->fed_grant);
                grant = extra;
        } else if (fed->fed_grant_waiting) {
                /* KISS: only one grant change in flight at a time.  We
                 *       could move it in the "same direction" easily,
                 *       but changing directions (e.g. grow then shrink
                 *       before client ACKs) would be bad. */
                grant = fed->fed_grant_sent;
        } else {
                /* grant will shrink or grow as client cache/extra changes */
                grant = fed->fed_cached + extra;
        }

        /* If we've granted all we're willing, we have to revoke
         * the grant covering what the client just wrote. */
        if (left == 0) {
                grant -= min(from_grant, grant);
        }

        /* never publish a grant exceeding the space actually available
         * after this request's consumption is accounted for */
        if (!fed->fed_grant_waiting && grant + from_grant > left ) {
                if (from_grant < left)
                        grant = left - from_grant;
                else
                        grant = 0;
        }

        if (grant != fed->fed_grant) {
                fed->fed_grant_waiting = 1;
                fed->fed_grant_sent = grant;
                /* growth is applied immediately; shrink waits for the
                 * client ACK (see filter_grant_incoming) */
                if (grant > fed->fed_grant) {
                        obd->u.filter.fo_tot_granted += grant - fed->fed_grant;
                        fed->fed_grant = grant;
                }
        }

        CDEBUG(D_SUPER,"cli %s cache:"LPU64" grant:"LPU64", granting:"LPU64"\n",
                        exp->exp_connection->c_remote_uuid.uuid, oa->o_blocks,
                        oa->o_grant, grant);
        CDEBUG(D_SUPER, "fed sent:"LPU64" wt:%d grant:"LPU64"\n",
                        fed->fed_grant_sent, fed->fed_grant_waiting,
                        fed->fed_grant);
        CDEBUG(D_SUPER, "tot cached:"LPU64" granted:"LPU64" num_exports: %d\n",
                        obd->u.filter.fo_tot_cached,
                        obd->u.filter.fo_tot_granted, obd->obd_num_exports);

        oa->o_valid |= OBD_MD_FLGRANT;
        oa->o_grant = grant;
}
381
/* Prepare a bulk read: look up the object's dentry, publish a fresh
 * grant to the client, start pagecache reads for every buffer, then
 * wait for them all to complete.  On success each res[] entry holds a
 * referenced page and the count of valid bytes (lnb->rc); buffers past
 * EOF are left with page == NULL, rc == 0.  The dentry reference is
 * kept for filter_commitrw_read() to drop.
 * Returns 0 or a negative errno; on error all pages and the dentry
 * are released here (cleanup_phase 1). */
static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                              int objcount, struct obd_ioobj *obj,
                              int niocount, struct niobuf_remote *nb,
                              struct niobuf_local *res,
                              struct obd_trans_info *oti)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_run_ctxt saved;
        struct obd_ioobj *o;
        struct niobuf_remote *rnb;
        struct niobuf_local *lnb = NULL;
        struct fsfilt_objinfo *fso;
        struct dentry *dentry;
        struct inode *inode;
        int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
        unsigned long now = jiffies;
        ENTRY;

        /* We are currently not supporting multi-obj BRW_READ RPCS at all.
         * When we do this function's dentry cleanup will need to be fixed */
        LASSERT(objcount == 1);

        OBD_ALLOC(fso, objcount * sizeof(*fso));
        if (fso == NULL)
                RETURN(-ENOMEM);

        memset(res, 0, niocount * sizeof(*res));

        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
        for (i = 0, o = obj; i < objcount; i++, o++) {
                LASSERT(o->ioo_bufcnt);

                dentry = filter_oa2dentry(obd, oa);
                if (IS_ERR(dentry))
                        GOTO(cleanup, rc = PTR_ERR(dentry));

                if (dentry->d_inode == NULL) {
                        CERROR("trying to BRW to non-existent file "LPU64"\n",
                               o->ioo_id);
                        f_dput(dentry);
                        GOTO(cleanup, rc = -ENOENT);
                }

                fso[i].fso_dentry = dentry;
                fso[i].fso_bufcnt = o->ioo_bufcnt;
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
                       (jiffies - now));

        /* reads consume no space, so publish a grant with from_grant = 0 */
        if (oa) {
                spin_lock(&obd->obd_osfs_lock);
                filter_grant(exp, oa, filter_grant_space_left(exp), 0);
                spin_unlock(&obd->obd_osfs_lock);
        }

        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                dentry = fso[i].fso_dentry;
                inode = dentry->d_inode;

                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
                        lnb->dentry = dentry;
                        lnb->offset = rnb->offset;
                        lnb->len    = rnb->len;
                        lnb->flags  = rnb->flags;
                        lnb->start  = jiffies;

                        if (inode->i_size <= rnb->offset) {
                                /* If there's no more data, abort early.
                                 * lnb->page == NULL and lnb->rc == 0, so it's
                                 * easy to detect later. */
                                break;
                        } else {
                                rc = filter_start_page_read(inode, lnb);
                        }

                        if (rc) {
                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
                                       dentry, rc);
                                cleanup_phase = 1;
                                GOTO(cleanup, rc);
                        }

                        tot_bytes += lnb->rc;
                        if (lnb->rc < lnb->len) {
                                /* short read, be sure to wait on it */
                                lnb++;
                                break;
                        }
                }
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
                       (jiffies - now));

        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
        /* lnb points one past the last started buffer; walk backwards and
         * wait for every read that was actually issued */
        while (lnb-- > res) {
                rc = filter_finish_page_read(lnb);
                if (rc) {
                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
                        cleanup_phase = 1;
                        GOTO(cleanup, rc);
                }
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
                       (jiffies - now));

        filter_tally_read(&exp->exp_obd->u.filter, res, niocount);

        EXIT;

        /* fall-through cleanup: phase 1 (error) releases pages and the
         * dentry; phase 0 always frees fso and pops the run context */
 cleanup:
        switch (cleanup_phase) {
        case 1:
                for (lnb = res; lnb < (res + niocount); lnb++) {
                        if (lnb->page)
                                page_cache_release(lnb->page);
                }
                if (res->dentry != NULL)
                        f_dput(res->dentry);
                else
                        CERROR("NULL dentry in cleanup -- tell CFS\n");
        case 0:
                OBD_FREE(fso, objcount * sizeof(*fso));
                pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
        }
        return rc;
}
523
/* Allocate a poisoned temporary (highmem) page to receive write data
 * for the buffer described by lnb.  The page is not inserted into the
 * pagecache here; only its ->index is set for later use.
 * Returns 0, or lnb->rc = -ENOMEM on allocation failure.
 * NOTE(review): uses the RETURN() macro without a matching ENTRY. */
static int filter_start_page_write(struct inode *inode,
                                   struct niobuf_local *lnb)
{
        struct page *page = alloc_pages(GFP_HIGHUSER, 0);
        if (page == NULL) {
                CERROR("no memory for a temp page\n");
                RETURN(lnb->rc = -ENOMEM);
        }
        /* poison so stale data is detectable if not fully overwritten */
        POISON_PAGE(page, 0xf1);
        page->index = lnb->offset >> PAGE_SHIFT;
        lnb->page = page;

        return 0;
}
538
/* If we ever start to support multi-object BRW RPCs, we will need to get locks
 * on mulitple inodes.  That isn't all, because there still exists the
 * possibility of a truncate starting a new transaction while holding the ext3
 * rwsem = write while some writes (which have started their transactions here)
 * blocking on the ext3 rwsem = read => lock inversion.
 *
 * The handling gets very ugly when dealing with locked pages.  It may be easier
 * to just get rid of the locked page code (which has problems of its own) and
 * either discover we do not need it anymore (i.e. it was a symptom of another
 * bug) or ensure we get the page locks in an appropriate order. */
/* Prepare a bulk write: consume the client's incoming grant report,
 * check there is space for ungranted buffers (ENOSPC otherwise),
 * publish a new grant, and allocate a temporary page per buffer.
 * On success the dentry reference is kept for commitrw to release.
 * Returns 0 or a negative errno. */
static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
                               struct niobuf_local *res,
                               struct obd_trans_info *oti)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_run_ctxt saved;
        struct niobuf_remote *rnb = nb;
        struct niobuf_local *lnb = res;
        struct fsfilt_objinfo fso;
        struct dentry *dentry;
        int rc = 0, i, tot_bytes = 0;
        obd_size consumed = 0, left;
        unsigned long now = jiffies;
        ENTRY;
        LASSERT(objcount == 1);
        LASSERT(obj->ioo_bufcnt > 0);

        /* absorb the client's dirty/seen-grant announcement first */
        filter_grant_incoming(exp, oa);

        memset(res, 0, niocount * sizeof(*res));

        push_ctxt(&saved, &obd->obd_ctxt, NULL);
        dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr, obj->ioo_id);
        if (IS_ERR(dentry))
                GOTO(cleanup, rc = PTR_ERR(dentry));

        if (dentry->d_inode == NULL) {
                CERROR("trying to BRW to non-existent file "LPU64"\n",
                       obj->ioo_id);
                f_dput(dentry);
                GOTO(cleanup, rc = -ENOENT);
        }

        fso.fso_dentry = dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
                       (jiffies - now));

        /* space check and new grant are computed atomically under the
         * osfs lock so statfs-based accounting stays consistent */
        spin_lock(&obd->obd_osfs_lock);
        left = filter_grant_space_left(exp);

        rc = filter_check_space(exp, objcount, &fso, niocount, rnb, lnb,
                                &left, &consumed, dentry->d_inode);
        if (oa)
                filter_grant(exp, oa, left, consumed);

        spin_unlock(&obd->obd_osfs_lock);

        if (rc) {
                f_dput(dentry);
                GOTO(cleanup, rc);
        }

        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
             i++, lnb++, rnb++) {

                /* If there were any granting failures, we should not have
                   come here */
                LASSERT (lnb->rc == 0);

                lnb->dentry = dentry;
                lnb->offset = rnb->offset;
                lnb->len    = rnb->len;
                lnb->flags  = rnb->flags;
                lnb->start  = jiffies;

                rc = filter_start_page_write(dentry->d_inode, lnb);
                if (rc) {
                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
                               LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
                               i, obj->ioo_bufcnt, dentry, rc);
                        /* unwind: free every page allocated so far */
                        while (lnb-- > res)
                                __free_pages(lnb->page, 0);
                        f_dput(dentry);
                        GOTO(cleanup, rc);
                }
                tot_bytes += lnb->len;
        }

        if (time_after(jiffies, now + 15 * HZ))
                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
        else
                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
                       (jiffies - now));

        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes);
        EXIT;
cleanup:
        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
        return rc;
}
646
647 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
648                   int objcount, struct obd_ioobj *obj, int niocount,
649                   struct niobuf_remote *nb, struct niobuf_local *res,
650                   struct obd_trans_info *oti)
651 {
652         if (cmd == OBD_BRW_WRITE)
653                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
654                                            niocount, nb, res, oti);
655
656         if (cmd == OBD_BRW_READ)
657                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
658                                           niocount, nb, res, oti);
659
660         LBUG();
661         return -EPROTO;
662 }
663
664 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
665                                 int objcount, struct obd_ioobj *obj,
666                                 int niocount, struct niobuf_local *res,
667                                 struct obd_trans_info *oti)
668 {
669         struct obd_ioobj *o;
670         struct niobuf_local *lnb;
671         int i, j;
672         ENTRY;
673
674         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
675                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
676                         if (lnb->page != NULL)
677                                 page_cache_release(lnb->page);
678                 }
679         }
680         if (res->dentry != NULL)
681                 f_dput(res->dentry);
682         RETURN(0);
683 }
684
/* Evict any existing pagecache page at new_page->index for this inode
 * so the freshly-written temporary page becomes the authoritative copy.
 * The code to actually insert new_page into the pagecache is compiled
 * out (#if 0), so currently this only invalidates the old page and the
 * loop runs exactly once (rc is forced to 0). */
void flip_into_page_cache(struct inode *inode, struct page *new_page)
{
        struct page *old_page;
        int rc;

        do {
                /* the dlm is protecting us from read/write concurrency, so we
                 * expect this find_lock_page to return quickly.  even if we
                 * race with another writer it won't be doing much work with
                 * the page locked.  we do this 'cause t_c_p expects a 
                 * locked page, and it wants to grab the pagecache lock
                 * as well. */
                old_page = find_lock_page(inode->i_mapping, new_page->index);
                if (old_page) {
/* truncate_complete_page()'s signature changed between 2.4 and 2.6 */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                        truncate_complete_page(old_page);
#else
                        truncate_complete_page(old_page->mapping, old_page);
#endif
                        unlock_page(old_page);
                        page_cache_release(old_page);
                }

#if 0 /* this should be a /proc tunable someday */
                /* racing o_directs (no locking ioctl) could race adding
                 * their pages, so we repeat the page invalidation unless
                 * we successfully added our new page */
                rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
                                              new_page->index,
                                              page_hash(inode->i_mapping, 
                                                        new_page->index));
                if (rc == 0) {
                        /* add_to_page_cache clears uptodate|dirty and locks
                         * the page */
                        SetPageUptodate(new_page);
                        unlock_page(new_page);
                }
#else   
                rc = 0;
#endif
        } while (rc != 0);
}
727
728 /* XXX needs to trickle its oa down */
729 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
730                     int objcount, struct obd_ioobj *obj, int niocount,
731                     struct niobuf_local *res, struct obd_trans_info *oti)
732 {
733         if (cmd == OBD_BRW_WRITE)
734                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
735                                              res, oti);
736         if (cmd == OBD_BRW_READ)
737                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
738                                             res, oti);
739         LBUG();
740         return -EPROTO;
741 }
742
/* Perform a local (same-node) bulk read or write of oa_bufs pages
 * described by pga against the object named by oa: runs the full
 * preprw/commitrw cycle and copies data between the caller's pages
 * and the filter's pages in between.
 * Returns 0 or a negative errno. */
int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
               struct lov_stripe_md *lsm, obd_count oa_bufs,
               struct brw_page *pga, struct obd_trans_info *oti)
{
        struct obd_ioobj ioo;
        struct niobuf_local *lnb;
        struct niobuf_remote *rnb;
        obd_count i;
        int ret = 0;
        ENTRY;

        OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
        OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));

        if (lnb == NULL || rnb == NULL)
                GOTO(out, ret = -ENOMEM);

        /* translate the caller's brw_page array into remote niobufs */
        for (i = 0; i < oa_bufs; i++) {
                rnb[i].offset = pga[i].off;
                rnb[i].len = pga[i].count;
        }

        obdo_to_ioobj(oa, &ioo);
        ioo.ioo_bufcnt = oa_bufs;

        ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
        if (ret != 0)
                GOTO(out, ret);

        /* NOTE(review): assumes every lnb[i].page is non-NULL; for reads,
         * preprw_read leaves page == NULL for buffers past EOF -- confirm
         * callers never read past i_size here. */
        for (i = 0; i < oa_bufs; i++) {
                void *virt = kmap(pga[i].pg);
                obd_off off = pga[i].off & ~PAGE_MASK;
                void *addr = kmap(lnb[i].page);

                /* 2 kmaps == vanishingly small deadlock opportunity */

                if (cmd & OBD_BRW_WRITE)
                        memcpy(addr + off, virt + off, pga[i].count);
                else
                        memcpy(virt + off, addr + off, pga[i].count);

                kunmap(lnb[i].page);
                kunmap(pga[i].pg);
        }

        ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);

out:
        /* OBD_FREE is not NULL-safe in general; guard each buffer */
        if (lnb)
                OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
        if (rnb)
                OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
        RETURN(ret);
}