Whamcloud - gitweb
b77a4015ee3662c83b0a07f8071570f03c7fe8e0
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
39 {
40         struct address_space *mapping = inode->i_mapping;
41         struct page *page;
42         unsigned long index = lnb->offset >> PAGE_SHIFT;
43         int rc;
44
45         page = grab_cache_page(mapping, index); /* locked page */
46         if (page == NULL)
47                 return lnb->rc = -ENOMEM;
48
49         LASSERT(page->mapping == mapping);
50
51         lnb->page = page;
52
53         if (inode->i_size < lnb->offset + lnb->len - 1)
54                 lnb->rc = inode->i_size - lnb->offset;
55         else
56                 lnb->rc = lnb->len;
57
58         if (PageUptodate(page)) {
59                 unlock_page(page);
60                 return 0;
61         }
62
63         rc = mapping->a_ops->readpage(NULL, page);
64         if (rc < 0) {
65                 CERROR("page index %lu, rc = %d\n", index, rc);
66                 lnb->page = NULL;
67                 page_cache_release(page);
68                 return lnb->rc = rc;
69         }
70
71         return 0;
72 }
73
74 static int filter_finish_page_read(struct niobuf_local *lnb)
75 {
76         if (lnb->page == NULL)
77                 return 0;
78
79         if (PageUptodate(lnb->page))
80                 return 0;
81
82         wait_on_page(lnb->page);
83         if (!PageUptodate(lnb->page)) {
84                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
85                        lnb->page->index, lnb->offset);
86                 GOTO(err_page, lnb->rc = -EIO);
87         }
88         if (PageError(lnb->page)) {
89                 CERROR("page index %lu/offset "LPX64" has error\n",
90                        lnb->page->index, lnb->offset);
91                 GOTO(err_page, lnb->rc = -EIO);
92         }
93
94         return 0;
95
96 err_page:
97         page_cache_release(lnb->page);
98         lnb->page = NULL;
99         return lnb->rc;
100 }
101
102 /* Grab the dirty and seen grant announcements from the incoming obdo.
103  * We will later calculate the clients new grant and return it.
104  * Caller must hold osfs lock */
105 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
106 {
107         struct filter_export_data *fed;
108         struct obd_device *obd = exp->exp_obd;
109         ENTRY;
110
111         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
112                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
113                 oa->o_valid &= ~OBD_MD_FLGRANT;
114                 EXIT;
115                 return;
116         }
117
118         fed = &exp->exp_filter_data;
119
120         /* Add some margin, since there is a small race if other RPCs arrive
121          * out-or-order and have already consumed some grant.  We want to
122          * leave this here in case there is a large error in accounting. */
123         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ?
124                D_ERROR : D_CACHE,
125                "%s: cli %s reports granted: "LPU64" dropped: %u, local: %lu\n",
126                obd->obd_name, exp->exp_client_uuid.uuid, oa->o_grant,
127                oa->o_dropped, fed->fed_grant);
128
129         /* Update our accounting now so that statfs takes it into account.
130          * Note that fed_dirty is only approximate and can become incorrect
131          * if RPCs arrive out-of-order.  No important calculations depend
132          * on fed_dirty however. */
133         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
134         if (fed->fed_grant < oa->o_dropped) {
135                 CERROR("%s: cli %s reports %u dropped > fed_grant %lu\n",
136                        obd->obd_name, exp->exp_client_uuid.uuid,
137                        oa->o_dropped, fed->fed_grant);
138                 oa->o_dropped = 0;
139         }
140         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
141                 CERROR("%s: cli %s reports %u dropped > tot_granted "LPU64"\n",
142                        obd->obd_name, exp->exp_client_uuid.uuid,
143                        oa->o_dropped, obd->u.filter.fo_tot_granted);
144                 oa->o_dropped = 0;
145         }
146         obd->u.filter.fo_tot_granted -= oa->o_dropped;
147         fed->fed_grant -= oa->o_dropped;
148         fed->fed_dirty = oa->o_dirty;
149         EXIT;
150 }
151
152 #define GRANT_FOR_LLOG 16
153
154 /* Figure out how much space is available between what we've granted
155  * and what remains in the filesystem.  Compensate for ext3 indirect
156  * block overhead when computing how much free space is left ungranted.
157  *
158  * Caller must hold obd_osfs_lock. */
159 obd_size filter_grant_space_left(struct obd_export *exp)
160 {
161         struct obd_device *obd = exp->exp_obd;
162         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
163         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
164         int rc, statfs_done = 0;
165
166         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
167 restat:
168                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
169                 if (rc) /* N.B. statfs can't really fail */
170                         RETURN(0);
171                 statfs_done = 1;
172         }
173
174         avail = obd->obd_osfs.os_bavail;
175         left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
176         if (left > GRANT_FOR_LLOG) {
177                 left = (left - GRANT_FOR_LLOG) << blockbits;
178         } else {
179                 left = 0 /* << blockbits */;
180         }
181
182         if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
183                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
184                 goto restat;
185         }
186
187         if (left >= tot_granted) {
188                 left -= tot_granted;
189         } else {
190                 static unsigned long next;
191                 if (left < tot_granted - obd->u.filter.fo_tot_pending &&
192                     time_after(jiffies, next)) {
193                         spin_unlock(&obd->obd_osfs_lock);
194                         CERROR("%s: cli %s granted "LPU64" more than available "
195                                LPU64" and pending "LPU64"\n", obd->obd_name,
196                                exp->exp_client_uuid.uuid, tot_granted, left,
197                                obd->u.filter.fo_tot_pending);
198                         if (next == 0)
199                                 portals_debug_dumplog();
200                         next = jiffies + 20 * HZ;
201                         spin_lock(&obd->obd_osfs_lock);
202                 }
203                 left = 0;
204         }
205
206         CDEBUG(D_CACHE, "%s: cli %s free: "LPU64" avail: "LPU64" grant "LPU64
207                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
208                exp->exp_client_uuid.uuid, obd->obd_osfs.os_bfree << blockbits,
209                avail << blockbits, tot_granted, left,
210                obd->u.filter.fo_tot_pending);
211
212         return left;
213 }
214
215 /* Calculate how much grant space to allocate to this client, based on how
216  * much space is currently free and how much of that is already granted.
217  *
218  * Caller must hold obd_osfs_lock. */
219 long filter_grant(struct obd_export *exp, obd_size current_grant,
220                   obd_size want, obd_size fs_space_left)
221 {
222         struct obd_device *obd = exp->exp_obd;
223         struct filter_export_data *fed = &exp->exp_filter_data;
224         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
225         __u64 grant = 0;
226
227         /* Grant some fraction of the client's requested grant space so that
228          * they are not always waiting for write credits (not all of it to
229          * avoid overgranting in face of multiple RPCs in flight).  This
230          * essentially will be able to control the OSC_MAX_RIF for a client.
231          *
232          * If we do have a large disparity and multiple RPCs in flight we
233          * might grant "too much" but that's OK because it means we are
234          * dirtying a lot on the client and will likely use it up quickly. */
235         if (current_grant < want) {
236                 grant = min((want >> blockbits) / 2,
237                             (fs_space_left >> blockbits) / 8);
238                 grant <<= blockbits;
239
240                 if (grant) {
241                         if (grant > FILTER_GRANT_CHUNK)
242                                 grant = FILTER_GRANT_CHUNK;
243
244                         obd->u.filter.fo_tot_granted += grant;
245                         fed->fed_grant += grant;
246                 }
247         }
248
249         CDEBUG(D_CACHE,"%s: cli %s wants: "LPU64" granting: "LPU64"\n",
250                obd->obd_name, exp->exp_client_uuid.uuid, want, grant);
251         CDEBUG(D_CACHE,
252                "%s: cli %s tot cached:"LPU64" granted:"LPU64
253                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
254                obd->u.filter.fo_tot_dirty,
255                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
256
257         return grant;
258 }
259
260 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
261                               int objcount, struct obd_ioobj *obj,
262                               int niocount, struct niobuf_remote *nb,
263                               struct niobuf_local *res,
264                               struct obd_trans_info *oti)
265 {
266         struct obd_device *obd = exp->exp_obd;
267         struct obd_run_ctxt saved;
268         struct obd_ioobj *o;
269         struct niobuf_remote *rnb;
270         struct niobuf_local *lnb = NULL;
271         struct fsfilt_objinfo *fso;
272         struct dentry *dentry;
273         struct inode *inode;
274         int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
275         unsigned long now = jiffies;
276         ENTRY;
277
278         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
279          * When we do this function's dentry cleanup will need to be fixed */
280         LASSERT(objcount == 1);
281         LASSERT(obj->ioo_bufcnt > 0);
282
283         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
284                 spin_lock(&obd->obd_osfs_lock);
285                 filter_grant_incoming(exp, oa);
286
287 #if 0
288                 /* Reads do not increase grants */
289                 oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
290                                            filter_grant_space_left(exp));
291 #else
292                 oa->o_grant = 0;
293 #endif
294                 spin_unlock(&obd->obd_osfs_lock);
295         }
296
297         OBD_ALLOC(fso, objcount * sizeof(*fso));
298         if (fso == NULL)
299                 RETURN(-ENOMEM);
300
301         memset(res, 0, niocount * sizeof(*res));
302
303         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
304         for (i = 0, o = obj; i < objcount; i++, o++) {
305                 LASSERT(o->ioo_bufcnt);
306
307                 dentry = filter_oa2dentry(obd, oa);
308                 if (IS_ERR(dentry))
309                         GOTO(cleanup, rc = PTR_ERR(dentry));
310
311                 if (dentry->d_inode == NULL) {
312                         CERROR("trying to BRW to non-existent file "LPU64"\n",
313                                o->ioo_id);
314                         f_dput(dentry);
315                         GOTO(cleanup, rc = -ENOENT);
316                 }
317
318                 fso[i].fso_dentry = dentry;
319                 fso[i].fso_bufcnt = o->ioo_bufcnt;
320         }
321
322         if (time_after(jiffies, now + 15 * HZ))
323                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
324         else
325                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
326                        (jiffies - now));
327
328         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
329                 dentry = fso[i].fso_dentry;
330                 inode = dentry->d_inode;
331
332                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
333                         lnb->dentry = dentry;
334                         lnb->offset = rnb->offset;
335                         lnb->len    = rnb->len;
336                         lnb->flags  = rnb->flags;
337
338                         if (inode->i_size <= rnb->offset) {
339                                 /* If there's no more data, abort early.
340                                  * lnb->page == NULL and lnb->rc == 0, so it's
341                                  * easy to detect later. */
342                                 break;
343                         } else {
344                                 rc = filter_start_page_read(inode, lnb);
345                         }
346
347                         if (rc) {
348                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
349                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
350                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
351                                        dentry, rc);
352                                 cleanup_phase = 1;
353                                 GOTO(cleanup, rc);
354                         }
355
356                         tot_bytes += lnb->rc;
357                         if (lnb->rc < lnb->len) {
358                                 /* short read, be sure to wait on it */
359                                 lnb++;
360                                 break;
361                         }
362                 }
363         }
364
365         if (time_after(jiffies, now + 15 * HZ))
366                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
367         else
368                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
369                        (jiffies - now));
370
371         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
372         while (lnb-- > res) {
373                 rc = filter_finish_page_read(lnb);
374                 if (rc) {
375                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
376                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
377                         cleanup_phase = 1;
378                         GOTO(cleanup, rc);
379                 }
380         }
381
382         if (time_after(jiffies, now + 15 * HZ))
383                 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
384         else
385                 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
386                        (jiffies - now));
387
388         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
389
390         EXIT;
391
392  cleanup:
393         switch (cleanup_phase) {
394         case 1:
395                 for (lnb = res; lnb < (res + niocount); lnb++) {
396                         if (lnb->page)
397                                 page_cache_release(lnb->page);
398                 }
399                 if (res->dentry != NULL)
400                         f_dput(res->dentry);
401                 else
402                         CERROR("NULL dentry in cleanup -- tell CFS\n");
403         case 0:
404                 OBD_FREE(fso, objcount * sizeof(*fso));
405                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
406         }
407         return rc;
408 }
409
410 /* When clients have dirtied as much space as they've been granted they
411  * fall through to sync writes.  These sync writes haven't been expressed
412  * in grants and need to error with ENOSPC when there isn't room in the
413  * filesystem for them after grants are taken into account.  However,
414  * writeback of the dirty data that was already granted space can write
415  * right on through.
416  *
417  * Caller must hold obd_osfs_lock. */
418 static int filter_grant_check(struct obd_export *exp, int objcount,
419                               struct fsfilt_objinfo *fso, int niocount,
420                               struct niobuf_remote *rnb,
421                               struct niobuf_local *lnb, obd_size *left,
422                               struct inode *inode)
423 {
424         struct filter_export_data *fed = &exp->exp_filter_data;
425         int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
426         unsigned long used = 0, ungranted = 0, using;
427         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
428
429         for (obj = 0; obj < objcount; obj++) {
430                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
431                         int tmp, bytes;
432
433                         /* FIXME: this is calculated with PAGE_SIZE on client */
434                         bytes = rnb[n].len;
435                         bytes += rnb[n].offset & (blocksize - 1);
436                         tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
437                         if (tmp)
438                                 bytes += blocksize - tmp;
439
440                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
441                                 if (fed->fed_grant < used + bytes) {
442                                         CDEBUG(D_CACHE,
443                                                "%s: cli %s claims %ld+%d GRANT,"
444                                                " no such grant %lu, idx %d\n",
445                                                exp->exp_obd->obd_name,
446                                                exp->exp_client_uuid.uuid,
447                                                used, bytes, fed->fed_grant, n);
448                                         mask = D_ERROR;
449                                 } else {
450                                         used += bytes;
451                                         rnb[n].flags |= OBD_BRW_GRANTED;
452                                         lnb[n].lnb_grant_used = bytes;
453                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
454                                         rc = 0;
455                                         continue;
456                                 }
457                         }
458                         if (*left > ungranted) {
459                                 /* if enough space, pretend it was granted */
460                                 ungranted += bytes;
461                                 rnb[n].flags |= OBD_BRW_GRANTED;
462                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
463                                 rc = 0;
464                                 continue;
465                         }
466
467                         /* We can't check for already-mapped blocks here, as
468                          * it requires dropping the osfs lock to do the bmap.
469                          * Instead, we return ENOSPC and in that case we need
470                          * to go through and verify if all of the blocks not
471                          * marked BRW_GRANTED are already mapped and we can
472                          * ignore this error. */
473                         lnb[n].rc = -ENOSPC;
474                         rnb[n].flags &= OBD_BRW_GRANTED;
475                         CDEBUG(D_CACHE, "%s: cli %s idx %d no space for %d\n",
476                                exp->exp_obd->obd_name,
477                                exp->exp_client_uuid.uuid, n, bytes);
478                 }
479         }
480
481         /* Now substract what client have used already.  We don't subtract
482          * this from the tot_granted yet, so that other client's can't grab
483          * that space before we have actually allocated our blocks.  That
484          * happens in filter_grant_commit() after the writes are done. */
485         *left -= ungranted;
486         fed->fed_grant -= used;
487         fed->fed_pending += used;
488         exp->exp_obd->u.filter.fo_tot_pending += used;
489
490         CDEBUG(mask,
491                "%s: cli %s used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
492                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, used,
493                ungranted, fed->fed_grant, fed->fed_dirty);
494
495         /* Rough calc in case we don't refresh cached statfs data */
496         using = (used + ungranted + 1 ) >>
497                 exp->exp_obd->u.filter.fo_sb->s_blocksize_bits;
498         if (exp->exp_obd->obd_osfs.os_bavail > using)
499                 exp->exp_obd->obd_osfs.os_bavail -= using;
500         else
501                 exp->exp_obd->obd_osfs.os_bavail = 0;
502
503         if (fed->fed_dirty < used) {
504                 CERROR("%s: cli %s claims used %lu > fed_dirty %lu\n",
505                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
506                        used, fed->fed_dirty);
507                 used = fed->fed_dirty;
508         }
509         exp->exp_obd->u.filter.fo_tot_dirty -= used;
510         fed->fed_dirty -= used;
511
512         return rc;
513 }
514
515 static int filter_start_page_write(struct inode *inode,
516                                    struct niobuf_local *lnb)
517 {
518         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
519         if (page == NULL) {
520                 CERROR("no memory for a temp page\n");
521                 RETURN(lnb->rc = -ENOMEM);
522         }
523         POISON_PAGE(page, 0xf1);
524         if (lnb->len != PAGE_SIZE) {
525                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
526                 kunmap(page);
527         }
528         page->index = lnb->offset >> PAGE_SHIFT;
529         lnb->page = page;
530
531         return 0;
532 }
533
534 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
535  * on mulitple inodes.  That isn't all, because there still exists the
536  * possibility of a truncate starting a new transaction while holding the ext3
537  * rwsem = write while some writes (which have started their transactions here)
538  * blocking on the ext3 rwsem = read => lock inversion.
539  *
540  * The handling gets very ugly when dealing with locked pages.  It may be easier
541  * to just get rid of the locked page code (which has problems of its own) and
542  * either discover we do not need it anymore (i.e. it was a symptom of another
543  * bug) or ensure we get the page locks in an appropriate order. */
544 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
545                                int objcount, struct obd_ioobj *obj,
546                                int niocount, struct niobuf_remote *nb,
547                                struct niobuf_local *res,
548                                struct obd_trans_info *oti)
549 {
550         struct obd_run_ctxt saved;
551         struct niobuf_remote *rnb;
552         struct niobuf_local *lnb;
553         struct fsfilt_objinfo fso;
554         struct dentry *dentry;
555         obd_size left;
556         unsigned long now = jiffies;
557         int rc = 0, i, tot_bytes = 0, cleanup_phase = 1;
558         ENTRY;
559         LASSERT(objcount == 1);
560         LASSERT(obj->ioo_bufcnt > 0);
561
562         memset(res, 0, niocount * sizeof(*res));
563
564         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
565         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
566                                    obj->ioo_id);
567         if (IS_ERR(dentry))
568                 GOTO(cleanup, rc = PTR_ERR(dentry));
569
570         if (dentry->d_inode == NULL) {
571                 CERROR("trying to BRW to non-existent file "LPU64"\n",
572                        obj->ioo_id);
573                 f_dput(dentry);
574                 GOTO(cleanup, rc = -ENOENT);
575         }
576
577         fso.fso_dentry = dentry;
578         fso.fso_bufcnt = obj->ioo_bufcnt;
579
580         if (time_after(jiffies, now + 15 * HZ))
581                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
582         else
583                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
584                        (jiffies - now));
585
586         spin_lock(&exp->exp_obd->obd_osfs_lock);
587         if (oa)
588                 filter_grant_incoming(exp, oa);
589         cleanup_phase = 0;
590
591         left = filter_grant_space_left(exp);
592
593         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
594                                 &left, dentry->d_inode);
595         if (oa && oa->o_valid & OBD_MD_FLGRANT)
596                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
597
598         spin_unlock(&exp->exp_obd->obd_osfs_lock);
599
600         if (rc) {
601                 f_dput(dentry);
602                 GOTO(cleanup, rc);
603         }
604
605         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
606              i++, lnb++, rnb++) {
607                 /* We still set up for ungranted pages so that granted pages
608                  * can be written to disk as they were promised, and portals
609                  * needs to keep the pages all aligned properly. */ 
610                 lnb->dentry = dentry;
611                 lnb->offset = rnb->offset;
612                 lnb->len    = rnb->len;
613                 lnb->flags  = rnb->flags;
614
615                 rc = filter_start_page_write(dentry->d_inode, lnb);
616                 if (rc) {
617                         CDEBUG(D_ERROR, "page err %u@"LPU64" %u/%u %p: rc %d\n",
618                                lnb->len, lnb->offset,
619                                i, obj->ioo_bufcnt, dentry, rc);
620                         while (lnb-- > res)
621                                 __free_pages(lnb->page, 0);
622                         f_dput(dentry);
623                         GOTO(cleanup, rc);
624                 }
625                 if (lnb->rc == 0)
626                         tot_bytes += lnb->len;
627         }
628
629         if (time_after(jiffies, now + 15 * HZ))
630                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
631         else
632                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
633                        (jiffies - now));
634
635         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
636                             tot_bytes);
637         EXIT;
638 cleanup:
639         switch(cleanup_phase) {
640         case 1:
641                 spin_lock(&exp->exp_obd->obd_osfs_lock);
642                 if (oa)
643                         filter_grant_incoming(exp, oa);
644                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
645         default: ;
646         }
647         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
648         return rc;
649 }
650
651 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
652                   int objcount, struct obd_ioobj *obj, int niocount,
653                   struct niobuf_remote *nb, struct niobuf_local *res,
654                   struct obd_trans_info *oti)
655 {
656         if (cmd == OBD_BRW_WRITE)
657                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
658                                            niocount, nb, res, oti);
659
660         if (cmd == OBD_BRW_READ)
661                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
662                                           niocount, nb, res, oti);
663
664         LBUG();
665         return -EPROTO;
666 }
667
668 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
669                                 int objcount, struct obd_ioobj *obj,
670                                 int niocount, struct niobuf_local *res,
671                                 struct obd_trans_info *oti)
672 {
673         struct obd_ioobj *o;
674         struct niobuf_local *lnb;
675         int i, j, drop = 0;
676         ENTRY;
677
678         if (res->dentry != NULL)
679                 drop = (res->dentry->d_inode->i_size >
680                         exp->exp_obd->u.filter.fo_readcache_max_filesize);
681
682         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
683                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
684                         if (lnb->page == NULL)
685                                 continue;
686                         /* drop from cache like truncate_list_pages() */
687                         if (drop && !TryLockPage(lnb->page)) {
688                                 if (lnb->page->mapping) {
689 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
690                                         truncate_complete_page(lnb->page);
691 #else
692                                         truncate_complete_page(lnb->page->mapping, lnb->page);
693 #endif
694                                 }
695                                 unlock_page(lnb->page);
696                         }
697                         page_cache_release(lnb->page);
698                 }
699         }
700         if (res->dentry != NULL)
701                 f_dput(res->dentry);
702         RETURN(0);
703 }
704
705 void flip_into_page_cache(struct inode *inode, struct page *new_page)
706 {
707         struct page *old_page;
708         int rc;
709
710         do {
711                 /* the dlm is protecting us from read/write concurrency, so we
712                  * expect this find_lock_page to return quickly.  even if we
713                  * race with another writer it won't be doing much work with
714                  * the page locked.  we do this 'cause t_c_p expects a
715                  * locked page, and it wants to grab the pagecache lock
716                  * as well. */
717                 old_page = find_lock_page(inode->i_mapping, new_page->index);
718                 if (old_page) {
719 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
720                         truncate_complete_page(old_page);
721 #else
722                         truncate_complete_page(old_page->mapping, old_page);
723 #endif
724                         unlock_page(old_page);
725                         page_cache_release(old_page);
726                 }
727
728 #if 0 /* this should be a /proc tunable someday */
729                 /* racing o_directs (no locking ioctl) could race adding
730                  * their pages, so we repeat the page invalidation unless
731                  * we successfully added our new page */
732                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
733                                               new_page->index,
734                                               page_hash(inode->i_mapping,
735                                                         new_page->index));
736                 if (rc == 0) {
737                         /* add_to_page_cache clears uptodate|dirty and locks
738                          * the page */
739                         SetPageUptodate(new_page);
740                         unlock_page(new_page);
741                 }
742 #else
743                 rc = 0;
744 #endif
745         } while (rc != 0);
746 }
747
748 void filter_grant_commit(struct obd_export *exp, int niocount,
749                          struct niobuf_local *res)
750 {
751         struct filter_obd *filter = &exp->exp_obd->u.filter;
752         struct niobuf_local *lnb = res;
753         unsigned long pending = 0;
754         int i;
755
756         spin_lock(&exp->exp_obd->obd_osfs_lock);
757         for (i = 0, lnb = res; i < niocount; i++, lnb++)
758                 pending += lnb->lnb_grant_used;
759
760         LASSERTF(exp->exp_filter_data.fed_pending >= pending,
761                  "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
762                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
763                  exp->exp_filter_data.fed_pending, pending);
764         exp->exp_filter_data.fed_pending -= pending;
765         LASSERTF(filter->fo_tot_granted >= pending,
766                  "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
767                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
768                  exp->exp_obd->u.filter.fo_tot_granted, pending);
769         filter->fo_tot_granted -= pending;
770         LASSERTF(filter->fo_tot_pending >= pending,
771                  "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
772                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
773                  filter->fo_tot_pending, pending);
774         filter->fo_tot_pending -= pending;
775
776         spin_unlock(&exp->exp_obd->obd_osfs_lock);
777 }
778
779 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
780                     int objcount, struct obd_ioobj *obj, int niocount,
781                     struct niobuf_local *res, struct obd_trans_info *oti)
782 {
783         if (cmd == OBD_BRW_WRITE)
784                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
785                                              res, oti);
786         if (cmd == OBD_BRW_READ)
787                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
788                                             res, oti);
789         LBUG();
790         return -EPROTO;
791 }
792
793 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
794                struct lov_stripe_md *lsm, obd_count oa_bufs,
795                struct brw_page *pga, struct obd_trans_info *oti)
796 {
797         struct obd_ioobj ioo;
798         struct niobuf_local *lnb;
799         struct niobuf_remote *rnb;
800         obd_count i;
801         int ret = 0;
802         ENTRY;
803
804         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
805         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
806
807         if (lnb == NULL || rnb == NULL)
808                 GOTO(out, ret = -ENOMEM);
809
810         for (i = 0; i < oa_bufs; i++) {
811                 rnb[i].offset = pga[i].off;
812                 rnb[i].len = pga[i].count;
813         }
814
815         obdo_to_ioobj(oa, &ioo);
816         ioo.ioo_bufcnt = oa_bufs;
817
818         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
819         if (ret != 0)
820                 GOTO(out, ret);
821
822         for (i = 0; i < oa_bufs; i++) {
823                 void *virt = kmap(pga[i].pg);
824                 obd_off off = pga[i].off & ~PAGE_MASK;
825                 void *addr = kmap(lnb[i].page);
826
827                 /* 2 kmaps == vanishingly small deadlock opportunity */
828
829                 if (cmd & OBD_BRW_WRITE)
830                         memcpy(addr + off, virt + off, pga[i].count);
831                 else
832                         memcpy(virt + off, addr + off, pga[i].count);
833
834                 kunmap(lnb[i].page);
835                 kunmap(pga[i].pg);
836         }
837
838         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
839
840 out:
841         if (lnb)
842                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
843         if (rnb)
844                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
845         RETURN(ret);
846 }