Whamcloud - gitweb
add enough arguments for the printf format string.
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
/* NOTE(review): nothing in the visible part of this file reads or writes
 * this pointer; presumably it is shared with other obdfilter sources --
 * confirm before changing its type or linkage. */
int *obdfilter_created_scratchpad;
39
40 static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
41                                  struct niobuf_local *lnb)
42 {
43         struct page *page;
44
45         page = alloc_pages(GFP_HIGHUSER, 0);
46         if (page == NULL) {
47                 CERROR("no memory for a temp page\n");
48                 lnb->rc = -ENOMEM;
49                 RETURN(-ENOMEM);
50         }
51 #if 0
52         POISON_PAGE(page, 0xf1);
53         if (lnb->len != PAGE_SIZE) {
54                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
55                 kunmap(page);
56         }
57 #endif
58         page->index = lnb->offset >> PAGE_SHIFT;
59         lnb->page = page;
60
61         RETURN(0);
62 }
63
64 void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
65                            int niocount, struct niobuf_local *res)
66 {
67         int i, j;
68
69         for (i = 0; i < objcount; i++, obj++) {
70                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
71                         if (res->page != NULL) {
72                                 __free_page(res->page);
73                                 res->page = NULL;
74                         }
75                 }
76         }
77 }
78
79 /* Grab the dirty and seen grant announcements from the incoming obdo.
80  * We will later calculate the clients new grant and return it.
81  * Caller must hold osfs lock */
82 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
83 {
84         struct filter_export_data *fed;
85         struct obd_device *obd = exp->exp_obd;
86         static unsigned long last_msg;
87         static int last_count;
88         int mask = D_CACHE;
89         ENTRY;
90
91         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
92
93         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
94                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
95                 oa->o_valid &= ~OBD_MD_FLGRANT;
96                 EXIT;
97                 return;
98         }
99
100         fed = &exp->exp_filter_data;
101
102         /* Don't print this to the console the first time it happens, since
103          * it can happen legitimately on occasion, but only rarely. */
104         if (time_after(jiffies, last_msg + 60 * HZ)) {
105                 last_count = 0;
106                 last_msg = jiffies;
107         }
108         if ((last_count & (-last_count)) == last_count)
109                 mask = D_WARNING;
110         last_count++;
111
112         /* Add some margin, since there is a small race if other RPCs arrive
113          * out-or-order and have already consumed some grant.  We want to
114          * leave this here in case there is a large error in accounting. */
115         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? mask:D_CACHE,
116                "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
117                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
118                oa->o_dropped, fed->fed_grant);
119
120         /* Update our accounting now so that statfs takes it into account.
121          * Note that fed_dirty is only approximate and can become incorrect
122          * if RPCs arrive out-of-order.  No important calculations depend
123          * on fed_dirty however. */
124         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
125         if (fed->fed_grant < oa->o_dropped) {
126                 CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
127                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
128                        oa->o_dropped, fed->fed_grant);
129                 oa->o_dropped = 0;
130         }
131         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
132                 CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
133                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
134                        oa->o_dropped, obd->u.filter.fo_tot_granted);
135                 oa->o_dropped = 0;
136         }
137         obd->u.filter.fo_tot_granted -= oa->o_dropped;
138         fed->fed_grant -= oa->o_dropped;
139         fed->fed_dirty = oa->o_dirty;
140         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
141                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
142                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
143                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
144                 spin_unlock(&obd->obd_osfs_lock);
145                 LBUG();
146         }
147         EXIT;
148 }
149
150 #define GRANT_FOR_LLOG(obd) 16
151
152 /* Figure out how much space is available between what we've granted
153  * and what remains in the filesystem.  Compensate for ext3 indirect
154  * block overhead when computing how much free space is left ungranted.
155  *
156  * Caller must hold obd_osfs_lock. */
157 obd_size filter_grant_space_left(struct obd_export *exp)
158 {
159         struct obd_device *obd = exp->exp_obd;
160         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
161         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
162         int rc, statfs_done = 0;
163
164         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
165
166         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
167 restat:
168                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
169                 if (rc) /* N.B. statfs can't really fail */
170                         RETURN(0);
171                 statfs_done = 1;
172         }
173
174         avail = obd->obd_osfs.os_bavail;
175         left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
176         if (left > GRANT_FOR_LLOG(obd)) {
177                 left = (left - GRANT_FOR_LLOG(obd)) << blockbits;
178         } else {
179                 left = 0 /* << blockbits */;
180         }
181
182         if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
183                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
184                 goto restat;
185         }
186
187         if (left >= tot_granted) {
188                 left -= tot_granted;
189         } else {
190                 if (left < tot_granted - obd->u.filter.fo_tot_pending + 65536) {
191                         CERROR("%s: cli %s/%p grant "LPU64" > available "
192                                LPU64" and pending "LPU64"\n", obd->obd_name,
193                                exp->exp_client_uuid.uuid, exp, tot_granted,
194                                left, obd->u.filter.fo_tot_pending);
195                 }
196                 left = 0;
197         }
198
199         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
200                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
201                exp->exp_client_uuid.uuid, exp,
202                obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
203                tot_granted, left, obd->u.filter.fo_tot_pending);
204
205         return left;
206 }
207
208 /* Calculate how much grant space to allocate to this client, based on how
209  * much space is currently free and how much of that is already granted.
210  *
211  * Caller must hold obd_osfs_lock. */
212 long filter_grant(struct obd_export *exp, obd_size current_grant,
213                   obd_size want, obd_size fs_space_left)
214 {
215         struct obd_device *obd = exp->exp_obd;
216         struct filter_export_data *fed = &exp->exp_filter_data;
217         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
218         __u64 grant = 0;
219
220         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
221
222         /* Grant some fraction of the client's requested grant space so that
223          * they are not always waiting for write credits (not all of it to
224          * avoid overgranting in face of multiple RPCs in flight).  This
225          * essentially will be able to control the OSC_MAX_RIF for a client.
226          *
227          * If we do have a large disparity between what the client thinks it
228          * has and what we think it has, don't grant very much and let the
229          * client consume its grant first.  Either it just has lots of RPCs
230          * in flight, or it was evicted and its grants will soon be used up. */
231         if (want > 0x7fffffff) {
232                 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
233                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
234         } else if (current_grant < want &&
235                    current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
236                 grant = min((want >> blockbits) / 2,
237                             (fs_space_left >> blockbits) / 8);
238                 grant <<= blockbits;
239
240                 if (grant) {
241                         if (grant > FILTER_GRANT_CHUNK)
242                                 grant = FILTER_GRANT_CHUNK;
243
244                         obd->u.filter.fo_tot_granted += grant;
245                         fed->fed_grant += grant;
246                         if (fed->fed_grant < 0) {
247                                 CERROR("%s: cli %s/%p grant %ld want "LPU64
248                                        "current"LPU64"\n",
249                                        obd->obd_name, exp->exp_client_uuid.uuid,
250                                        exp, fed->fed_grant, want,current_grant);
251                                 spin_unlock(&obd->obd_osfs_lock);
252                                 LBUG();
253                         }
254                 }
255         }
256
257         CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
258                obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
259         CDEBUG(D_CACHE,
260                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
261                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
262                exp, obd->u.filter.fo_tot_dirty,
263                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
264
265         return grant;
266 }
267
268 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
269                               int objcount, struct obd_ioobj *obj,
270                               int niocount, struct niobuf_remote *nb,
271                               struct niobuf_local *res,
272                               struct obd_trans_info *oti)
273 {
274         struct obd_device *obd = exp->exp_obd;
275         struct obd_run_ctxt saved;
276         struct niobuf_remote *rnb;
277         struct niobuf_local *lnb;
278         struct dentry *dentry = NULL;
279         struct inode *inode;
280         void *iobuf = NULL;
281         int rc = 0, i, tot_bytes = 0;
282         unsigned long now = jiffies;
283         ENTRY;
284
285         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
286          * When we do this function's dentry cleanup will need to be fixed */
287         LASSERTF(objcount == 1, "%d\n", objcount);
288         LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
289
290         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
291                 spin_lock(&obd->obd_osfs_lock);
292                 filter_grant_incoming(exp, oa);
293
294                 oa->o_grant = 0;
295                 spin_unlock(&obd->obd_osfs_lock);
296         }
297
298         memset(res, 0, niocount * sizeof(*res));
299
300         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
301
302         rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_READ, obj->ioo_bufcnt,
303                                 &iobuf);
304         if (rc)
305                 GOTO(cleanup, rc);
306
307         dentry = filter_oa2dentry(obd, oa);
308         if (IS_ERR(dentry)) {
309                 rc = PTR_ERR(dentry);
310                 dentry = NULL;
311                 GOTO(cleanup, rc);
312         }
313
314         inode = dentry->d_inode;
315
316         if (oa)
317                 obdo_to_inode(inode, oa, OBD_MD_FLATIME);
318
319         fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
320
321         for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt; 
322              i++, rnb++, lnb++) {
323                 lnb->dentry = dentry;
324                 lnb->offset = rnb->offset;
325                 lnb->len    = rnb->len;
326                 lnb->flags  = rnb->flags;
327
328                 if (inode->i_size <= rnb->offset)
329                       /* If there's no more data, abort early.
330                       * lnb->page == NULL and lnb->rc == 0, so it's
331                       * easy to detect later. */
332                         break;
333                 else
334                         rc = filter_alloc_dio_page(obd, inode, lnb);
335
336                 if (rc) {
337                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
338                              "page err %u@"LPU64" %u/%u %p: rc %d\n",
339                               lnb->len, lnb->offset, i, obj->ioo_bufcnt,
340                               dentry, rc);
341                         GOTO(cleanup, rc);
342                 }
343
344                 if (inode->i_size < lnb->offset + lnb->len - 1)
345                         lnb->rc = inode->i_size - lnb->offset;
346                 else
347                         lnb->rc = lnb->len;
348
349                 tot_bytes += lnb->rc;
350
351                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
352         }
353
354         fsfilt_check_slow(now, obd_timeout, "start_page_read");
355
356         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
357                               NULL, NULL, NULL);
358         if (rc)
359                 GOTO(cleanup, rc);
360
361         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
362
363         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
364
365         EXIT;
366
367  cleanup:
368         if (rc != 0) {
369                 filter_free_dio_pages(objcount, obj, niocount, res);
370
371                 if (dentry != NULL)
372                         f_dput(dentry);
373         }
374
375         if (iobuf != NULL)
376                 filter_free_iobuf(iobuf);
377
378         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
379         if (rc)
380                 CERROR("io error %d\n", rc);
381
382         return rc;
383 }
384
385 /* When clients have dirtied as much space as they've been granted they
386  * fall through to sync writes.  These sync writes haven't been expressed
387  * in grants and need to error with ENOSPC when there isn't room in the
388  * filesystem for them after grants are taken into account.  However,
389  * writeback of the dirty data that was already granted space can write
390  * right on through.
391  *
392  * Caller must hold obd_osfs_lock. */
393 static int filter_grant_check(struct obd_export *exp, int objcount,
394                               struct fsfilt_objinfo *fso, int niocount,
395                               struct niobuf_remote *rnb,
396                               struct niobuf_local *lnb, obd_size *left,
397                               struct inode *inode)
398 {
399         struct filter_export_data *fed = &exp->exp_filter_data;
400         int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
401         unsigned long used = 0, ungranted = 0, using;
402         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
403
404         LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
405
406         for (obj = 0; obj < objcount; obj++) {
407                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
408                         int tmp, bytes;
409
410                         /* should match the code in osc_exit_cache */
411                         bytes = rnb[n].len;
412                         bytes += rnb[n].offset & (blocksize - 1);
413                         tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
414                         if (tmp)
415                                 bytes += blocksize - tmp;
416
417                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
418                                 if (fed->fed_grant < used + bytes) {
419                                         CDEBUG(D_CACHE,
420                                                "%s: cli %s/%p claims %ld+%d "
421                                                "GRANT, real grant %lu idx %d\n",
422                                                exp->exp_obd->obd_name,
423                                                exp->exp_client_uuid.uuid, exp,
424                                                used, bytes, fed->fed_grant, n);
425                                         mask = D_ERROR;
426                                 } else {
427                                         used += bytes;
428                                         rnb[n].flags |= OBD_BRW_GRANTED;
429                                         lnb[n].lnb_grant_used = bytes;
430                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
431                                         rc = 0;
432                                         continue;
433                                 }
434                         }
435                         if (*left > ungranted) {
436                                 /* if enough space, pretend it was granted */
437                                 ungranted += bytes;
438                                 rnb[n].flags |= OBD_BRW_GRANTED;
439                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
440                                 rc = 0;
441                                 continue;
442                         }
443
444                         /* We can't check for already-mapped blocks here, as
445                          * it requires dropping the osfs lock to do the bmap.
446                          * Instead, we return ENOSPC and in that case we need
447                          * to go through and verify if all of the blocks not
448                          * marked BRW_GRANTED are already mapped and we can
449                          * ignore this error. */
450                         lnb[n].rc = -ENOSPC;
451                         rnb[n].flags &= OBD_BRW_GRANTED;
452                         CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
453                                exp->exp_obd->obd_name,
454                                exp->exp_client_uuid.uuid, exp, n, bytes);
455                 }
456         }
457
458         /* Now substract what client have used already.  We don't subtract
459          * this from the tot_granted yet, so that other client's can't grab
460          * that space before we have actually allocated our blocks.  That
461          * happens in filter_grant_commit() after the writes are done. */
462         *left -= ungranted;
463         fed->fed_grant -= used;
464         fed->fed_pending += used;
465         exp->exp_obd->u.filter.fo_tot_pending += used;
466
467         CDEBUG(mask,
468                "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
469                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
470                ungranted, fed->fed_grant, fed->fed_dirty);
471
472         /* Rough calc in case we don't refresh cached statfs data */
473         using = (used + ungranted + 1 ) >>
474                 exp->exp_obd->u.filter.fo_sb->s_blocksize_bits;
475         if (exp->exp_obd->obd_osfs.os_bavail > using)
476                 exp->exp_obd->obd_osfs.os_bavail -= using;
477         else
478                 exp->exp_obd->obd_osfs.os_bavail = 0;
479
480         if (fed->fed_dirty < used) {
481                 CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
482                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
483                        used, fed->fed_dirty);
484                 used = fed->fed_dirty;
485         }
486         exp->exp_obd->u.filter.fo_tot_dirty -= used;
487         fed->fed_dirty -= used;
488
489         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
490                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
491                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
492                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
493                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
494                 LBUG();
495         }
496         return rc;
497 }
498
499 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
500  * on mulitple inodes.  That isn't all, because there still exists the
501  * possibility of a truncate starting a new transaction while holding the ext3
502  * rwsem = write while some writes (which have started their transactions here)
503  * blocking on the ext3 rwsem = read => lock inversion.
504  *
505  * The handling gets very ugly when dealing with locked pages.  It may be easier
506  * to just get rid of the locked page code (which has problems of its own) and
507  * either discover we do not need it anymore (i.e. it was a symptom of another
508  * bug) or ensure we get the page locks in an appropriate order. */
509 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
510                                int objcount, struct obd_ioobj *obj,
511                                int niocount, struct niobuf_remote *nb,
512                                struct niobuf_local *res,
513                                struct obd_trans_info *oti)
514 {
515         struct obd_run_ctxt saved;
516         struct niobuf_remote *rnb;
517         struct niobuf_local *lnb = res;
518         struct fsfilt_objinfo fso;
519         struct dentry *dentry = NULL;
520         void *iobuf;
521         obd_size left;
522         unsigned long now = jiffies;
523         int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
524         ENTRY;
525         LASSERT(objcount == 1);
526         LASSERT(obj->ioo_bufcnt > 0);
527
528         memset(res, 0, niocount * sizeof(*res));
529
530         rc = filter_alloc_iobuf(&exp->exp_obd->u.filter, OBD_BRW_READ,
531                                 obj->ioo_bufcnt, &iobuf);
532         if (rc)
533                 GOTO(cleanup, rc);
534         cleanup_phase = 1;
535
536         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
537         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
538                                    obj->ioo_id);
539         if (IS_ERR(dentry))
540                 GOTO(cleanup, rc = PTR_ERR(dentry));
541         cleanup_phase = 2;
542
543         if (dentry->d_inode == NULL) {
544                 CERROR("trying to BRW to non-existent file "LPU64"\n",
545                        obj->ioo_id);
546                 GOTO(cleanup, rc = -ENOENT);
547         }
548
549         fso.fso_dentry = dentry;
550         fso.fso_bufcnt = obj->ioo_bufcnt;
551
552         fsfilt_check_slow(now, obd_timeout, "preprw_write setup");
553
554         spin_lock(&exp->exp_obd->obd_osfs_lock);
555         if (oa) {
556                 filter_grant_incoming(exp, oa);
557                 obdo_to_inode(dentry->d_inode, oa,
558                               OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
559         }
560         cleanup_phase = 3;
561
562         left = filter_grant_space_left(exp);
563
564         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
565                                 &left, dentry->d_inode);
566
567         /* We're finishing using body->oa as an input variable, so reset
568          * o_valid here. */
569         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
570                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
571                 oa->o_valid = OBD_MD_FLGRANT;
572         } else if (oa) {
573                 oa->o_valid = 0;
574         }
575
576         spin_unlock(&exp->exp_obd->obd_osfs_lock);
577
578         if (rc)
579                 GOTO(cleanup, rc);
580
581         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
582              i++, lnb++, rnb++) {
583                 /* We still set up for ungranted pages so that granted pages
584                  * can be written to disk as they were promised, and portals
585                  * needs to keep the pages all aligned properly. */
586                 lnb->dentry = dentry;
587                 lnb->offset = rnb->offset;
588                 lnb->len    = rnb->len;
589                 lnb->flags  = rnb->flags;
590
591                 rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
592                 if (rc) {
593                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
594                                lnb->len, lnb->offset,
595                                i, obj->ioo_bufcnt, dentry, rc);
596                         GOTO(cleanup, rc);
597                 }
598                 cleanup_phase = 4;
599
600                 /* If the filter writes a partial page, then has the file
601                  * extended, the client will read in the whole page.  the
602                  * filter has to be careful to zero the rest of the partial
603                  * page on disk.  we do it by hand for partial extending
604                  * writes, send_bio() is responsible for zeroing pages when
605                  * asked to read unmapped blocks -- brw_kiovec() does this. */
606                 if (lnb->len != PAGE_SIZE) {
607                         __s64 maxidx;
608
609                         maxidx = ((dentry->d_inode->i_size + PAGE_SIZE - 1) >>
610                                  PAGE_SHIFT) - 1;
611                         if (maxidx >= lnb->page->index) {
612                                 LL_CDEBUG_PAGE(D_PAGE, lnb->page, "write %u @ "
613                                                LPU64" flg %x before EOF %llu\n",
614                                                lnb->len, lnb->offset,lnb->flags,
615                                                dentry->d_inode->i_size);
616                                 filter_iobuf_add_page(exp->exp_obd, iobuf,
617                                                       dentry->d_inode,
618                                                       lnb->page);
619                         } else {
620                                 long off;
621                                 char *p = kmap(lnb->page);
622
623                                 off = lnb->offset & ~PAGE_MASK;
624                                 if (off)
625                                         memset(p, 0, off);
626                                 off = (lnb->offset + lnb->len) & ~PAGE_MASK;
627                                 if (off)
628                                         memset(p + off, 0, PAGE_SIZE - off);
629                                 kunmap(lnb->page);
630                         }
631                 }
632
633                 if (lnb->rc == 0)
634                         tot_bytes += lnb->len;
635         }
636
637         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
638                               NULL, NULL, NULL);
639
640         fsfilt_check_slow(now, obd_timeout, "start_page_write");
641
642         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
643                             tot_bytes);
644         EXIT;
645 cleanup:
646         switch(cleanup_phase) {
647         case 4:
648                 if (rc)
649                         filter_free_dio_pages(objcount, obj, niocount, res);
650         case 3:
651                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
652                 filter_free_iobuf(iobuf);
653         case 2:
654                 if (rc)
655                         f_dput(dentry);
656                 break;
657         case 1:
658                 spin_lock(&exp->exp_obd->obd_osfs_lock);
659                 if (oa)
660                         filter_grant_incoming(exp, oa);
661                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
662                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
663                 filter_free_iobuf(iobuf);
664                 break;
665         default:;
666         }
667         return rc;
668 }
669
670 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
671                   int objcount, struct obd_ioobj *obj, int niocount,
672                   struct niobuf_remote *nb, struct niobuf_local *res,
673                   struct obd_trans_info *oti)
674 {
675         if (cmd == OBD_BRW_WRITE)
676                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
677                                            niocount, nb, res, oti);
678
679         if (cmd == OBD_BRW_READ)
680                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
681                                           niocount, nb, res, oti);
682
683         LBUG();
684         return -EPROTO;
685 }
686
687 void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
688                               struct page *page)
689 {
690         int drop = 0;
691
692         if (inode != NULL &&
693             (inode->i_size > filter->fo_readcache_max_filesize))
694                 drop = 1;
695
696         /* drop from cache like truncate_list_pages() */
697         if (drop && !TryLockPage(page)) {
698                 if (page->mapping)
699                         ll_truncate_complete_page(page);
700                 unlock_page(page);
701         }
702         page_cache_release(page);
703 }
704
705 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
706                                 int objcount, struct obd_ioobj *obj,
707                                 int niocount, struct niobuf_local *res,
708                                 struct obd_trans_info *oti, int rc)
709 {
710         struct inode *inode = NULL;
711         ENTRY;
712
713         if (res->dentry != NULL)
714                 inode = res->dentry->d_inode;
715
716         filter_free_dio_pages(objcount, obj, niocount, res);
717
718         if (res->dentry != NULL)
719                 f_dput(res->dentry);
720         RETURN(rc);
721 }
722
723 void flip_into_page_cache(struct inode *inode, struct page *new_page)
724 {
725         struct page *old_page;
726         int rc;
727
728         do {
729                 /* the dlm is protecting us from read/write concurrency, so we
730                  * expect this find_lock_page to return quickly.  even if we
731                  * race with another writer it won't be doing much work with
732                  * the page locked.  we do this 'cause t_c_p expects a
733                  * locked page, and it wants to grab the pagecache lock
734                  * as well. */
735                 old_page = find_lock_page(inode->i_mapping, new_page->index);
736                 if (old_page) {
737                         ll_truncate_complete_page(old_page);
738                         unlock_page(old_page);
739                         page_cache_release(old_page);
740                 }
741
742 #if 0 /* this should be a /proc tunable someday */
743                 /* racing o_directs (no locking ioctl) could race adding
744                  * their pages, so we repeat the page invalidation unless
745                  * we successfully added our new page */
746                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
747                                               new_page->index,
748                                               page_hash(inode->i_mapping,
749                                                         new_page->index));
750                 if (rc == 0) {
751                         /* add_to_page_cache clears uptodate|dirty and locks
752                          * the page */
753                         SetPageUptodate(new_page);
754                         unlock_page(new_page);
755                 }
756 #else
757                 rc = 0;
758 #endif
759         } while (rc != 0);
760 }
761
762 void filter_grant_commit(struct obd_export *exp, int niocount,
763                          struct niobuf_local *res)
764 {
765         struct filter_obd *filter = &exp->exp_obd->u.filter;
766         struct niobuf_local *lnb = res;
767         unsigned long pending = 0;
768         int i;
769
770         spin_lock(&exp->exp_obd->obd_osfs_lock);
771         for (i = 0, lnb = res; i < niocount; i++, lnb++)
772                 pending += lnb->lnb_grant_used;
773
774         LASSERTF(exp->exp_filter_data.fed_pending >= pending,
775                  "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
776                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
777                  exp->exp_filter_data.fed_pending, pending);
778         exp->exp_filter_data.fed_pending -= pending;
779         LASSERTF(filter->fo_tot_granted >= pending,
780                  "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
781                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
782                  exp->exp_obd->u.filter.fo_tot_granted, pending);
783         filter->fo_tot_granted -= pending;
784         LASSERTF(filter->fo_tot_pending >= pending,
785                  "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
786                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
787                  filter->fo_tot_pending, pending);
788         filter->fo_tot_pending -= pending;
789
790         spin_unlock(&exp->exp_obd->obd_osfs_lock);
791 }
792
793 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
794                     int objcount, struct obd_ioobj *obj, int niocount,
795                     struct niobuf_local *res, struct obd_trans_info *oti,int rc)
796 {
797         if (cmd == OBD_BRW_WRITE)
798                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
799                                              res, oti, rc);
800         if (cmd == OBD_BRW_READ)
801                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
802                                             res, oti, rc);
803         LBUG();
804         return -EPROTO;
805 }
806
807 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
808                struct lov_stripe_md *lsm, obd_count oa_bufs,
809                struct brw_page *pga, struct obd_trans_info *oti)
810 {
811         struct obd_ioobj ioo;
812         struct niobuf_local *lnb;
813         struct niobuf_remote *rnb;
814         obd_count i;
815         int ret = 0;
816         ENTRY;
817
818         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
819         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
820
821         if (lnb == NULL || rnb == NULL)
822                 GOTO(out, ret = -ENOMEM);
823
824         for (i = 0; i < oa_bufs; i++) {
825                 rnb[i].offset = pga[i].off;
826                 rnb[i].len = pga[i].count;
827         }
828
829         obdo_to_ioobj(oa, &ioo);
830         ioo.ioo_bufcnt = oa_bufs;
831
832         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
833         if (ret != 0)
834                 GOTO(out, ret);
835
836         for (i = 0; i < oa_bufs; i++) {
837                 void *virt;
838                 obd_off off;
839                 void *addr;
840
841                 if (lnb[i].page == NULL)
842                         break;
843
844                 off = pga[i].off & ~PAGE_MASK;
845                 virt = kmap(pga[i].pg);
846                 addr = kmap(lnb[i].page);
847
848                 /* 2 kmaps == vanishingly small deadlock opportunity */
849
850                 if (cmd & OBD_BRW_WRITE)
851                         memcpy(addr + off, virt + off, pga[i].count);
852                 else
853                         memcpy(virt + off, addr + off, pga[i].count);
854
855                 kunmap(lnb[i].page);
856                 kunmap(pga[i].pg);
857         }
858
859         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
860
861 out:
862         if (lnb)
863                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
864         if (rnb)
865                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
866         RETURN(ret);
867 }