Whamcloud - gitweb
3900ad15bc8b668f92f4490b98c97d30b1393099
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
39 {
40         struct address_space *mapping = inode->i_mapping;
41         struct page *page;
42         unsigned long index = lnb->offset >> PAGE_SHIFT;
43         int rc;
44
45         page = grab_cache_page(mapping, index); /* locked page */
46         if (page == NULL)
47                 return lnb->rc = -ENOMEM;
48
49         LASSERT(page->mapping == mapping);
50
51         lnb->page = page;
52
53         if (inode->i_size < lnb->offset + lnb->len - 1)
54                 lnb->rc = inode->i_size - lnb->offset;
55         else
56                 lnb->rc = lnb->len;
57
58         if (PageUptodate(page)) {
59                 unlock_page(page);
60                 return 0;
61         }
62
63         rc = mapping->a_ops->readpage(NULL, page);
64         if (rc < 0) {
65                 CERROR("page index %lu, rc = %d\n", index, rc);
66                 lnb->page = NULL;
67                 page_cache_release(page);
68                 return lnb->rc = rc;
69         }
70
71         return 0;
72 }
73
74 static int filter_finish_page_read(struct niobuf_local *lnb)
75 {
76         if (lnb->page == NULL)
77                 return 0;
78
79         if (PageUptodate(lnb->page))
80                 return 0;
81
82         wait_on_page(lnb->page);
83         if (!PageUptodate(lnb->page)) {
84                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
85                        lnb->page->index, lnb->offset);
86                 GOTO(err_page, lnb->rc = -EIO);
87         }
88         if (PageError(lnb->page)) {
89                 CERROR("page index %lu/offset "LPX64" has error\n",
90                        lnb->page->index, lnb->offset);
91                 GOTO(err_page, lnb->rc = -EIO);
92         }
93
94         return 0;
95
96 err_page:
97         page_cache_release(lnb->page);
98         lnb->page = NULL;
99         return lnb->rc;
100 }
101
102 /* Grab the dirty and seen grant announcements from the incoming obdo.
103  * We will later calculate the clients new grant and return it.
104  * Caller must hold osfs lock */
105 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
106 {
107         struct filter_export_data *fed;
108         struct obd_device *obd = exp->exp_obd;
109         ENTRY;
110
111         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
112
113         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
114                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
115                 oa->o_valid &= ~OBD_MD_FLGRANT;
116                 EXIT;
117                 return;
118         }
119
120         fed = &exp->exp_filter_data;
121
122         /* Add some margin, since there is a small race if other RPCs arrive
123          * out-or-order and have already consumed some grant.  We want to
124          * leave this here in case there is a large error in accounting. */
125         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ?
126                D_WARNING : D_CACHE,
127                "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
128                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
129                oa->o_dropped, fed->fed_grant);
130
131         /* Update our accounting now so that statfs takes it into account.
132          * Note that fed_dirty is only approximate and can become incorrect
133          * if RPCs arrive out-of-order.  No important calculations depend
134          * on fed_dirty however. */
135         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
136         if (fed->fed_grant < oa->o_dropped) {
137                 CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
138                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
139                        oa->o_dropped, fed->fed_grant);
140                 oa->o_dropped = 0;
141         }
142         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
143                 CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
144                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
145                        oa->o_dropped, obd->u.filter.fo_tot_granted);
146                 oa->o_dropped = 0;
147         }
148         obd->u.filter.fo_tot_granted -= oa->o_dropped;
149         fed->fed_grant -= oa->o_dropped;
150         fed->fed_dirty = oa->o_dirty;
151         EXIT;
152 }
153
154 #define GRANT_FOR_LLOG(obd) 16
155
156 /* Figure out how much space is available between what we've granted
157  * and what remains in the filesystem.  Compensate for ext3 indirect
158  * block overhead when computing how much free space is left ungranted.
159  *
160  * Caller must hold obd_osfs_lock. */
161 obd_size filter_grant_space_left(struct obd_export *exp)
162 {
163         struct obd_device *obd = exp->exp_obd;
164         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
165         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
166         int rc, statfs_done = 0;
167
168         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
169
170         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
171 restat:
172                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
173                 if (rc) /* N.B. statfs can't really fail */
174                         RETURN(0);
175                 statfs_done = 1;
176         }
177
178         avail = obd->obd_osfs.os_bavail;
179         left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
180         if (left > GRANT_FOR_LLOG(obd)) {
181                 left = (left - GRANT_FOR_LLOG(obd)) << blockbits;
182         } else {
183                 left = 0 /* << blockbits */;
184         }
185
186         if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
187                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
188                 goto restat;
189         }
190
191         if (left >= tot_granted) {
192                 left -= tot_granted;
193         } else {
194                 static unsigned long next;
195                 if (left < tot_granted - obd->u.filter.fo_tot_pending &&
196                     time_after(jiffies, next)) {
197                         spin_unlock(&obd->obd_osfs_lock);
198                         CERROR("%s: cli %s/%p grant "LPU64" > available "
199                                LPU64" and pending "LPU64"\n", obd->obd_name,
200                                exp->exp_client_uuid.uuid, exp, tot_granted,
201                                left, obd->u.filter.fo_tot_pending);
202                         if (next == 0)
203                                 portals_debug_dumplog();
204                         next = jiffies + 20 * HZ;
205                         spin_lock(&obd->obd_osfs_lock);
206                 }
207                 left = 0;
208         }
209
210         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
211                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
212                exp->exp_client_uuid.uuid, exp,
213                obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
214                tot_granted, left, obd->u.filter.fo_tot_pending);
215
216         return left;
217 }
218
219 /* Calculate how much grant space to allocate to this client, based on how
220  * much space is currently free and how much of that is already granted.
221  *
222  * Caller must hold obd_osfs_lock. */
223 long filter_grant(struct obd_export *exp, obd_size current_grant,
224                   obd_size want, obd_size fs_space_left)
225 {
226         struct obd_device *obd = exp->exp_obd;
227         struct filter_export_data *fed = &exp->exp_filter_data;
228         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
229         __u64 grant = 0;
230
231         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
232
233         /* Grant some fraction of the client's requested grant space so that
234          * they are not always waiting for write credits (not all of it to
235          * avoid overgranting in face of multiple RPCs in flight).  This
236          * essentially will be able to control the OSC_MAX_RIF for a client.
237          *
238          * If we do have a large disparity between what the client thinks it
239          * has and what we think it has, don't grant very much and let the
240          * client consume its grant first.  Either it just has lots of RPCs
241          * in flight, or it was evicted and its grants will soon be used up. */
242         if (current_grant < want &&
243             current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
244                 grant = min((want >> blockbits) / 2,
245                             (fs_space_left >> blockbits) / 8);
246                 grant <<= blockbits;
247
248                 if (grant) {
249                         if (grant > FILTER_GRANT_CHUNK)
250                                 grant = FILTER_GRANT_CHUNK;
251
252                         obd->u.filter.fo_tot_granted += grant;
253                         fed->fed_grant += grant;
254                 }
255         }
256
257         CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
258                obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
259         CDEBUG(D_CACHE,
260                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
261                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
262                exp, obd->u.filter.fo_tot_dirty,
263                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
264
265         return grant;
266 }
267
268 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
269                               int objcount, struct obd_ioobj *obj,
270                               int niocount, struct niobuf_remote *nb,
271                               struct niobuf_local *res,
272                               struct obd_trans_info *oti)
273 {
274         struct obd_device *obd = exp->exp_obd;
275         struct obd_run_ctxt saved;
276         struct obd_ioobj *o;
277         struct niobuf_remote *rnb;
278         struct niobuf_local *lnb = NULL;
279         struct fsfilt_objinfo *fso;
280         struct dentry *dentry;
281         struct inode *inode;
282         int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
283         unsigned long now = jiffies;
284         ENTRY;
285
286         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
287          * When we do this function's dentry cleanup will need to be fixed */
288         LASSERT(objcount == 1);
289         LASSERT(obj->ioo_bufcnt > 0);
290
291         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
292                 spin_lock(&obd->obd_osfs_lock);
293                 filter_grant_incoming(exp, oa);
294
295 #if 0
296                 /* Reads do not increase grants */
297                 oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
298                                            filter_grant_space_left(exp));
299 #else
300                 oa->o_grant = 0;
301 #endif
302                 spin_unlock(&obd->obd_osfs_lock);
303         }
304
305         OBD_ALLOC(fso, objcount * sizeof(*fso));
306         if (fso == NULL)
307                 RETURN(-ENOMEM);
308
309         memset(res, 0, niocount * sizeof(*res));
310
311         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
312         for (i = 0, o = obj; i < objcount; i++, o++) {
313                 LASSERT(o->ioo_bufcnt);
314
315                 dentry = filter_oa2dentry(obd, oa);
316                 if (IS_ERR(dentry))
317                         GOTO(cleanup, rc = PTR_ERR(dentry));
318
319                 if (dentry->d_inode == NULL) {
320                         CERROR("trying to BRW to non-existent file "LPU64"\n",
321                                o->ioo_id);
322                         f_dput(dentry);
323                         GOTO(cleanup, rc = -ENOENT);
324                 }
325
326                 fso[i].fso_dentry = dentry;
327                 fso[i].fso_bufcnt = o->ioo_bufcnt;
328         }
329
330         if (time_after(jiffies, now + 15 * HZ))
331                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
332         else
333                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
334                        (jiffies - now));
335
336         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
337                 dentry = fso[i].fso_dentry;
338                 inode = dentry->d_inode;
339
340                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
341                         lnb->dentry = dentry;
342                         lnb->offset = rnb->offset;
343                         lnb->len    = rnb->len;
344                         lnb->flags  = rnb->flags;
345
346                         if (inode->i_size <= rnb->offset) {
347                                 /* If there's no more data, abort early.
348                                  * lnb->page == NULL and lnb->rc == 0, so it's
349                                  * easy to detect later. */
350                                 break;
351                         } else {
352                                 rc = filter_start_page_read(inode, lnb);
353                         }
354
355                         if (rc) {
356                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
357                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
358                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
359                                        dentry, rc);
360                                 cleanup_phase = 1;
361                                 GOTO(cleanup, rc);
362                         }
363
364                         tot_bytes += lnb->rc;
365                         if (lnb->rc < lnb->len) {
366                                 /* short read, be sure to wait on it */
367                                 lnb++;
368                                 break;
369                         }
370                 }
371         }
372
373         if (time_after(jiffies, now + 15 * HZ))
374                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
375         else
376                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
377                        (jiffies - now));
378
379         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
380         while (lnb-- > res) {
381                 rc = filter_finish_page_read(lnb);
382                 if (rc) {
383                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
384                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
385                         cleanup_phase = 1;
386                         GOTO(cleanup, rc);
387                 }
388         }
389
390         if (time_after(jiffies, now + 15 * HZ))
391                 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
392         else
393                 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
394                        (jiffies - now));
395
396         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
397
398         EXIT;
399
400  cleanup:
401         switch (cleanup_phase) {
402         case 1:
403                 for (lnb = res; lnb < (res + niocount); lnb++) {
404                         if (lnb->page)
405                                 page_cache_release(lnb->page);
406                 }
407                 if (res->dentry != NULL)
408                         f_dput(res->dentry);
409                 else
410                         CERROR("NULL dentry in cleanup -- tell CFS\n");
411         case 0:
412                 OBD_FREE(fso, objcount * sizeof(*fso));
413                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
414         }
415         return rc;
416 }
417
418 /* When clients have dirtied as much space as they've been granted they
419  * fall through to sync writes.  These sync writes haven't been expressed
420  * in grants and need to error with ENOSPC when there isn't room in the
421  * filesystem for them after grants are taken into account.  However,
422  * writeback of the dirty data that was already granted space can write
423  * right on through.
424  *
425  * Caller must hold obd_osfs_lock. */
426 static int filter_grant_check(struct obd_export *exp, int objcount,
427                               struct fsfilt_objinfo *fso, int niocount,
428                               struct niobuf_remote *rnb,
429                               struct niobuf_local *lnb, obd_size *left,
430                               struct inode *inode)
431 {
432         struct filter_export_data *fed = &exp->exp_filter_data;
433         int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
434         unsigned long used = 0, ungranted = 0, using;
435         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
436
437         LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
438
439         for (obj = 0; obj < objcount; obj++) {
440                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
441                         int tmp, bytes;
442
443                         /* FIXME: this is calculated with PAGE_SIZE on client */
444                         bytes = rnb[n].len;
445                         bytes += rnb[n].offset & (blocksize - 1);
446                         tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
447                         if (tmp)
448                                 bytes += blocksize - tmp;
449
450                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
451                                 if (fed->fed_grant < used + bytes) {
452                                         CDEBUG(D_CACHE,
453                                                "%s: cli %s/%p claims %ld+%d "
454                                                "GRANT, real grant %lu idx %d\n",
455                                                exp->exp_obd->obd_name,
456                                                exp->exp_client_uuid.uuid, exp,
457                                                used, bytes, fed->fed_grant, n);
458                                         mask = D_ERROR;
459                                 } else {
460                                         used += bytes;
461                                         rnb[n].flags |= OBD_BRW_GRANTED;
462                                         lnb[n].lnb_grant_used = bytes;
463                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
464                                         rc = 0;
465                                         continue;
466                                 }
467                         }
468                         if (*left > ungranted) {
469                                 /* if enough space, pretend it was granted */
470                                 ungranted += bytes;
471                                 rnb[n].flags |= OBD_BRW_GRANTED;
472                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
473                                 rc = 0;
474                                 continue;
475                         }
476
477                         /* We can't check for already-mapped blocks here, as
478                          * it requires dropping the osfs lock to do the bmap.
479                          * Instead, we return ENOSPC and in that case we need
480                          * to go through and verify if all of the blocks not
481                          * marked BRW_GRANTED are already mapped and we can
482                          * ignore this error. */
483                         lnb[n].rc = -ENOSPC;
484                         rnb[n].flags &= OBD_BRW_GRANTED;
485                         CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
486                                exp->exp_obd->obd_name,
487                                exp->exp_client_uuid.uuid, exp, n, bytes);
488                 }
489         }
490
491         /* Now substract what client have used already.  We don't subtract
492          * this from the tot_granted yet, so that other client's can't grab
493          * that space before we have actually allocated our blocks.  That
494          * happens in filter_grant_commit() after the writes are done. */
495         *left -= ungranted;
496         fed->fed_grant -= used;
497         fed->fed_pending += used;
498         exp->exp_obd->u.filter.fo_tot_pending += used;
499
500         CDEBUG(mask,
501                "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
502                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
503                ungranted, fed->fed_grant, fed->fed_dirty);
504
505         /* Rough calc in case we don't refresh cached statfs data */
506         using = (used + ungranted + 1 ) >>
507                 exp->exp_obd->u.filter.fo_sb->s_blocksize_bits;
508         if (exp->exp_obd->obd_osfs.os_bavail > using)
509                 exp->exp_obd->obd_osfs.os_bavail -= using;
510         else
511                 exp->exp_obd->obd_osfs.os_bavail = 0;
512
513         if (fed->fed_dirty < used) {
514                 CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
515                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
516                        used, fed->fed_dirty);
517                 used = fed->fed_dirty;
518         }
519         exp->exp_obd->u.filter.fo_tot_dirty -= used;
520         fed->fed_dirty -= used;
521
522         return rc;
523 }
524
525 static int filter_start_page_write(struct inode *inode,
526                                    struct niobuf_local *lnb)
527 {
528         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
529         if (page == NULL) {
530                 CERROR("no memory for a temp page\n");
531                 RETURN(lnb->rc = -ENOMEM);
532         }
533         POISON_PAGE(page, 0xf1);
534         if (lnb->len != PAGE_SIZE) {
535                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
536                 kunmap(page);
537         }
538         page->index = lnb->offset >> PAGE_SHIFT;
539         lnb->page = page;
540
541         return 0;
542 }
543
544 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
545  * on mulitple inodes.  That isn't all, because there still exists the
546  * possibility of a truncate starting a new transaction while holding the ext3
547  * rwsem = write while some writes (which have started their transactions here)
548  * blocking on the ext3 rwsem = read => lock inversion.
549  *
550  * The handling gets very ugly when dealing with locked pages.  It may be easier
551  * to just get rid of the locked page code (which has problems of its own) and
552  * either discover we do not need it anymore (i.e. it was a symptom of another
553  * bug) or ensure we get the page locks in an appropriate order. */
554 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
555                                int objcount, struct obd_ioobj *obj,
556                                int niocount, struct niobuf_remote *nb,
557                                struct niobuf_local *res,
558                                struct obd_trans_info *oti)
559 {
560         struct obd_run_ctxt saved;
561         struct niobuf_remote *rnb;
562         struct niobuf_local *lnb;
563         struct fsfilt_objinfo fso;
564         struct dentry *dentry;
565         obd_size left;
566         unsigned long now = jiffies;
567         int rc = 0, i, tot_bytes = 0, cleanup_phase = 1;
568         ENTRY;
569         LASSERT(objcount == 1);
570         LASSERT(obj->ioo_bufcnt > 0);
571
572         memset(res, 0, niocount * sizeof(*res));
573
574         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
575         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
576                                    obj->ioo_id);
577         if (IS_ERR(dentry))
578                 GOTO(cleanup, rc = PTR_ERR(dentry));
579
580         if (dentry->d_inode == NULL) {
581                 CERROR("trying to BRW to non-existent file "LPU64"\n",
582                        obj->ioo_id);
583                 f_dput(dentry);
584                 GOTO(cleanup, rc = -ENOENT);
585         }
586
587         fso.fso_dentry = dentry;
588         fso.fso_bufcnt = obj->ioo_bufcnt;
589
590         if (time_after(jiffies, now + 15 * HZ))
591                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
592         else
593                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
594                        (jiffies - now));
595
596         spin_lock(&exp->exp_obd->obd_osfs_lock);
597         if (oa)
598                 filter_grant_incoming(exp, oa);
599         cleanup_phase = 0;
600
601         left = filter_grant_space_left(exp);
602
603         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
604                                 &left, dentry->d_inode);
605         if (oa && oa->o_valid & OBD_MD_FLGRANT)
606                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
607
608         spin_unlock(&exp->exp_obd->obd_osfs_lock);
609
610         if (rc) {
611                 f_dput(dentry);
612                 GOTO(cleanup, rc);
613         }
614
615         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
616              i++, lnb++, rnb++) {
617                 /* We still set up for ungranted pages so that granted pages
618                  * can be written to disk as they were promised, and portals
619                  * needs to keep the pages all aligned properly. */ 
620                 lnb->dentry = dentry;
621                 lnb->offset = rnb->offset;
622                 lnb->len    = rnb->len;
623                 lnb->flags  = rnb->flags;
624
625                 rc = filter_start_page_write(dentry->d_inode, lnb);
626                 if (rc) {
627                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
628                                lnb->len, lnb->offset,
629                                i, obj->ioo_bufcnt, dentry, rc);
630                         while (lnb-- > res)
631                                 __free_pages(lnb->page, 0);
632                         f_dput(dentry);
633                         GOTO(cleanup, rc);
634                 }
635                 if (lnb->rc == 0)
636                         tot_bytes += lnb->len;
637         }
638
639         if (time_after(jiffies, now + 15 * HZ))
640                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
641         else
642                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
643                        (jiffies - now));
644
645         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
646                             tot_bytes);
647         EXIT;
648 cleanup:
649         switch(cleanup_phase) {
650         case 1:
651                 spin_lock(&exp->exp_obd->obd_osfs_lock);
652                 if (oa)
653                         filter_grant_incoming(exp, oa);
654                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
655         default: ;
656         }
657         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
658         return rc;
659 }
660
661 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
662                   int objcount, struct obd_ioobj *obj, int niocount,
663                   struct niobuf_remote *nb, struct niobuf_local *res,
664                   struct obd_trans_info *oti)
665 {
666         if (cmd == OBD_BRW_WRITE)
667                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
668                                            niocount, nb, res, oti);
669
670         if (cmd == OBD_BRW_READ)
671                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
672                                           niocount, nb, res, oti);
673
674         LBUG();
675         return -EPROTO;
676 }
677
678 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
679                                 int objcount, struct obd_ioobj *obj,
680                                 int niocount, struct niobuf_local *res,
681                                 struct obd_trans_info *oti, int rc)
682 {
683         struct obd_ioobj *o;
684         struct niobuf_local *lnb;
685         int i, j, drop = 0;
686         ENTRY;
687
688         if (res->dentry != NULL)
689                 drop = (res->dentry->d_inode->i_size >
690                         exp->exp_obd->u.filter.fo_readcache_max_filesize);
691
692         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
693                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
694                         if (lnb->page == NULL)
695                                 continue;
696                         /* drop from cache like truncate_list_pages() */
697                         if (drop && !TryLockPage(lnb->page)) {
698                                 if (lnb->page->mapping)
699                                         ll_truncate_complete_page(lnb->page);
700                                 unlock_page(lnb->page);
701                         }
702                         page_cache_release(lnb->page);
703                 }
704         }
705
706         if (res->dentry != NULL)
707                 f_dput(res->dentry);
708         RETURN(rc);
709 }
710
711 void flip_into_page_cache(struct inode *inode, struct page *new_page)
712 {
713         struct page *old_page;
714         int rc;
715
716         do {
717                 /* the dlm is protecting us from read/write concurrency, so we
718                  * expect this find_lock_page to return quickly.  even if we
719                  * race with another writer it won't be doing much work with
720                  * the page locked.  we do this 'cause t_c_p expects a
721                  * locked page, and it wants to grab the pagecache lock
722                  * as well. */
723                 old_page = find_lock_page(inode->i_mapping, new_page->index);
724                 if (old_page) {
725                         ll_truncate_complete_page(old_page);
726                         unlock_page(old_page);
727                         page_cache_release(old_page);
728                 }
729
730 #if 0 /* this should be a /proc tunable someday */
731                 /* racing o_directs (no locking ioctl) could race adding
732                  * their pages, so we repeat the page invalidation unless
733                  * we successfully added our new page */
734                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
735                                               new_page->index,
736                                               page_hash(inode->i_mapping,
737                                                         new_page->index));
738                 if (rc == 0) {
739                         /* add_to_page_cache clears uptodate|dirty and locks
740                          * the page */
741                         SetPageUptodate(new_page);
742                         unlock_page(new_page);
743                 }
744 #else
745                 rc = 0;
746 #endif
747         } while (rc != 0);
748 }
749
750 void filter_grant_commit(struct obd_export *exp, int niocount,
751                          struct niobuf_local *res)
752 {
753         struct filter_obd *filter = &exp->exp_obd->u.filter;
754         struct niobuf_local *lnb = res;
755         unsigned long pending = 0;
756         int i;
757
758         spin_lock(&exp->exp_obd->obd_osfs_lock);
759         for (i = 0, lnb = res; i < niocount; i++, lnb++)
760                 pending += lnb->lnb_grant_used;
761
762         LASSERTF(exp->exp_filter_data.fed_pending >= pending,
763                  "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
764                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
765                  exp->exp_filter_data.fed_pending, pending);
766         exp->exp_filter_data.fed_pending -= pending;
767         LASSERTF(filter->fo_tot_granted >= pending,
768                  "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
769                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
770                  exp->exp_obd->u.filter.fo_tot_granted, pending);
771         filter->fo_tot_granted -= pending;
772         LASSERTF(filter->fo_tot_pending >= pending,
773                  "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
774                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
775                  filter->fo_tot_pending, pending);
776         filter->fo_tot_pending -= pending;
777
778         spin_unlock(&exp->exp_obd->obd_osfs_lock);
779 }
780
781 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
782                     int objcount, struct obd_ioobj *obj, int niocount,
783                     struct niobuf_local *res, struct obd_trans_info *oti,int rc)
784 {
785         if (cmd == OBD_BRW_WRITE)
786                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
787                                              res, oti, rc);
788         if (cmd == OBD_BRW_READ)
789                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
790                                             res, oti, rc);
791         LBUG();
792         return -EPROTO;
793 }
794
795 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
796                struct lov_stripe_md *lsm, obd_count oa_bufs,
797                struct brw_page *pga, struct obd_trans_info *oti)
798 {
799         struct obd_ioobj ioo;
800         struct niobuf_local *lnb;
801         struct niobuf_remote *rnb;
802         obd_count i;
803         int ret = 0;
804         ENTRY;
805
806         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
807         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
808
809         if (lnb == NULL || rnb == NULL)
810                 GOTO(out, ret = -ENOMEM);
811
812         for (i = 0; i < oa_bufs; i++) {
813                 rnb[i].offset = pga[i].off;
814                 rnb[i].len = pga[i].count;
815         }
816
817         obdo_to_ioobj(oa, &ioo);
818         ioo.ioo_bufcnt = oa_bufs;
819
820         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
821         if (ret != 0)
822                 GOTO(out, ret);
823
824         for (i = 0; i < oa_bufs; i++) {
825                 void *virt = kmap(pga[i].pg);
826                 obd_off off = pga[i].off & ~PAGE_MASK;
827                 void *addr = kmap(lnb[i].page);
828
829                 /* 2 kmaps == vanishingly small deadlock opportunity */
830
831                 if (cmd & OBD_BRW_WRITE)
832                         memcpy(addr + off, virt + off, pga[i].count);
833                 else
834                         memcpy(virt + off, addr + off, pga[i].count);
835
836                 kunmap(lnb[i].page);
837                 kunmap(pga[i].pg);
838         }
839
840         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
841
842 out:
843         if (lnb)
844                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
845         if (rnb)
846                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
847         RETURN(ret);
848 }