Whamcloud - gitweb
1)add more snap cow hook for dir ops.
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include <linux/lustre_smfs.h>
37 #include <linux/lustre_snap.h>
38 #include "filter_internal.h"
39
40 static int filter_start_page_read(struct obd_device *obd, struct inode *inode,
41                                   struct niobuf_local *lnb)
42 {
43         struct page *page;
44         unsigned long index = lnb->offset >> PAGE_SHIFT;
45
46         page = fsfilt_getpage(obd, inode, index);
47         if (IS_ERR(page)) {
48                 CERROR("page index %lu, rc = %ld\n", index, PTR_ERR(page));
49
50                 lnb->page = NULL;
51                 lnb->rc = PTR_ERR(page);
52                 return lnb->rc;
53         }
54
55         lnb->page = page;
56
57         return 0;
58 }
59
60 static int filter_finish_page_read(struct niobuf_local *lnb)
61 {
62         if (lnb->page == NULL)
63                 return 0;
64
65         if (PageUptodate(lnb->page))
66                 return 0;
67
68         wait_on_page(lnb->page);
69         if (!PageUptodate(lnb->page)) {
70                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
71                        lnb->page->index, lnb->offset);
72                 GOTO(err_page, lnb->rc = -EIO);
73         }
74         if (PageError(lnb->page)) {
75                 CERROR("page index %lu/offset "LPX64" has error\n",
76                        lnb->page->index, lnb->offset);
77                 GOTO(err_page, lnb->rc = -EIO);
78         }
79
80         return 0;
81
82 err_page:
83         page_cache_release(lnb->page);
84         lnb->page = NULL;
85         return lnb->rc;
86 }
87
88 /* Grab the dirty and seen grant announcements from the incoming obdo.
89  * We will later calculate the clients new grant and return it.
90  * Caller must hold osfs lock */
91 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
92 {
93         struct filter_export_data *fed;
94         struct obd_device *obd = exp->exp_obd;
95         static unsigned long last_msg;
96         static int last_count;
97         int mask = D_CACHE;
98         ENTRY;
99
100         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
101
102         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
103                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
104                 oa->o_valid &= ~OBD_MD_FLGRANT;
105                 EXIT;
106                 return;
107         }
108
109         fed = &exp->exp_filter_data;
110
111         /* Don't print this to the console the first time it happens, since
112          * it can happen legitimately on occasion, but only rarely. */
113         if (time_after(jiffies, last_msg + 60 * HZ)) {
114                 last_count = 0;
115                 last_msg = jiffies;
116         }
117         if ((last_count & (-last_count)) == last_count)
118                 mask = D_WARNING;
119         last_count++;
120
121         /* Add some margin, since there is a small race if other RPCs arrive
122          * out-or-order and have already consumed some grant.  We want to
123          * leave this here in case there is a large error in accounting. */
124         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? mask:D_CACHE,
125                "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
126                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
127                oa->o_dropped, fed->fed_grant);
128
129         /* Update our accounting now so that statfs takes it into account.
130          * Note that fed_dirty is only approximate and can become incorrect
131          * if RPCs arrive out-of-order.  No important calculations depend
132          * on fed_dirty however. */
133         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
134         if (fed->fed_grant < oa->o_dropped) {
135                 CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
136                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
137                        oa->o_dropped, fed->fed_grant);
138                 oa->o_dropped = 0;
139         }
140         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
141                 CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
142                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
143                        oa->o_dropped, obd->u.filter.fo_tot_granted);
144                 oa->o_dropped = 0;
145         }
146         obd->u.filter.fo_tot_granted -= oa->o_dropped;
147         fed->fed_grant -= oa->o_dropped;
148         fed->fed_dirty = oa->o_dirty;
149         EXIT;
150 }
151
152 #define GRANT_FOR_LLOG(obd) 16
153
154 /* Figure out how much space is available between what we've granted
155  * and what remains in the filesystem.  Compensate for ext3 indirect
156  * block overhead when computing how much free space is left ungranted.
157  *
158  * Caller must hold obd_osfs_lock. */
159 obd_size filter_grant_space_left(struct obd_export *exp)
160 {
161         struct obd_device *obd = exp->exp_obd;
162         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
163         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
164         int rc, statfs_done = 0;
165
166         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
167
168         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
169 restat:
170                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
171                 if (rc) /* N.B. statfs can't really fail */
172                         RETURN(0);
173                 statfs_done = 1;
174         }
175
176         avail = obd->obd_osfs.os_bavail;
177         left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
178         if (left > GRANT_FOR_LLOG(obd)) {
179                 left = (left - GRANT_FOR_LLOG(obd)) << blockbits;
180         } else {
181                 left = 0 /* << blockbits */;
182         }
183
184         if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
185                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
186                 goto restat;
187         }
188
189         if (left >= tot_granted) {
190                 left -= tot_granted;
191         } else {
192                 static unsigned long next;
193                 if (left < tot_granted - obd->u.filter.fo_tot_pending &&
194                     time_after(jiffies, next)) {
195                         spin_unlock(&obd->obd_osfs_lock);
196                         CERROR("%s: cli %s/%p grant "LPU64" > available "
197                                LPU64" and pending "LPU64"\n", obd->obd_name,
198                                exp->exp_client_uuid.uuid, exp, tot_granted,
199                                left, obd->u.filter.fo_tot_pending);
200                         if (next == 0)
201                                 portals_debug_dumplog();
202                         next = jiffies + 20 * HZ;
203                         spin_lock(&obd->obd_osfs_lock);
204                 }
205                 left = 0;
206         }
207
208         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
209                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
210                exp->exp_client_uuid.uuid, exp,
211                obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
212                tot_granted, left, obd->u.filter.fo_tot_pending);
213
214         return left;
215 }
216
217 /* Calculate how much grant space to allocate to this client, based on how
218  * much space is currently free and how much of that is already granted.
219  *
220  * Caller must hold obd_osfs_lock. */
221 long filter_grant(struct obd_export *exp, obd_size current_grant,
222                   obd_size want, obd_size fs_space_left)
223 {
224         struct obd_device *obd = exp->exp_obd;
225         struct filter_export_data *fed = &exp->exp_filter_data;
226         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
227         __u64 grant = 0;
228
229         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
230
231         /* Grant some fraction of the client's requested grant space so that
232          * they are not always waiting for write credits (not all of it to
233          * avoid overgranting in face of multiple RPCs in flight).  This
234          * essentially will be able to control the OSC_MAX_RIF for a client.
235          *
236          * If we do have a large disparity between what the client thinks it
237          * has and what we think it has, don't grant very much and let the
238          * client consume its grant first.  Either it just has lots of RPCs
239          * in flight, or it was evicted and its grants will soon be used up. */
240         if (current_grant < want &&
241             current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
242                 grant = min((want >> blockbits) / 2,
243                             (fs_space_left >> blockbits) / 8);
244                 grant <<= blockbits;
245
246                 if (grant) {
247                         if (grant > FILTER_GRANT_CHUNK)
248                                 grant = FILTER_GRANT_CHUNK;
249
250                         obd->u.filter.fo_tot_granted += grant;
251                         fed->fed_grant += grant;
252                 }
253         }
254
255         CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
256                obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
257         CDEBUG(D_CACHE,
258                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
259                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
260                exp, obd->u.filter.fo_tot_dirty,
261                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
262
263         return grant;
264 }
265
266 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
267                               int objcount, struct obd_ioobj *obj,
268                               int niocount, struct niobuf_remote *nb,
269                               struct niobuf_local *res,
270                               struct obd_trans_info *oti)
271 {
272         struct obd_device *obd = exp->exp_obd;
273         struct lvfs_run_ctxt saved;
274         struct obd_ioobj *o;
275         struct niobuf_remote *rnb;
276         struct niobuf_local *lnb = NULL;
277         struct fsfilt_objinfo *fso;
278         struct dentry *dentry;
279         struct inode *inode;
280         int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
281         unsigned long now = jiffies;
282         ENTRY;
283
284         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
285          * When we do this function's dentry cleanup will need to be fixed */
286         LASSERT(objcount == 1);
287         LASSERT(obj->ioo_bufcnt > 0);
288
289         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
290                 spin_lock(&obd->obd_osfs_lock);
291                 filter_grant_incoming(exp, oa);
292
293 #if 0
294                 /* Reads do not increase grants */
295                 oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
296                                            filter_grant_space_left(exp));
297 #else
298                 oa->o_grant = 0;
299 #endif
300                 spin_unlock(&obd->obd_osfs_lock);
301         }
302
303         OBD_ALLOC(fso, objcount * sizeof(*fso));
304         if (fso == NULL)
305                 RETURN(-ENOMEM);
306
307         memset(res, 0, niocount * sizeof(*res));
308
309         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
310         for (i = 0, o = obj; i < objcount; i++, o++) {
311                 LASSERT(o->ioo_bufcnt);
312
313                 dentry = filter_oa2dentry(obd, oa);
314                 if (IS_ERR(dentry))
315                         GOTO(cleanup, rc = PTR_ERR(dentry));
316
317                 if (dentry->d_inode == NULL) {
318                         CERROR("trying to BRW to non-existent file "LPU64"\n",
319                                o->ioo_id);
320                         f_dput(dentry);
321                         GOTO(cleanup, rc = -ENOENT);
322                 }
323
324                 fso[i].fso_dentry = dentry;
325                 fso[i].fso_bufcnt = o->ioo_bufcnt;
326         }
327
328         if (time_after(jiffies, now + 15 * HZ))
329                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
330         else
331                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
332                        (jiffies - now));
333
334         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
335                 dentry = fso[i].fso_dentry;
336                 inode = dentry->d_inode;
337
338                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
339                         lnb->dentry = dentry;
340                         lnb->offset = rnb->offset;
341                         lnb->len    = rnb->len;
342                         lnb->flags  = rnb->flags;
343
344                         if (inode->i_size <= rnb->offset) {
345                                 /* If there's no more data, abort early.
346                                  * lnb->page == NULL and lnb->rc == 0, so it's
347                                  * easy to detect later. */
348                                 break;
349                         } else {
350                                 rc = filter_start_page_read(obd, inode, lnb);
351                         }
352
353                         if (rc) {
354                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
355                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
356                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
357                                        dentry, rc);
358                                 cleanup_phase = 1;
359                                 GOTO(cleanup, rc);
360                         }
361
362                         if (inode->i_size < lnb->offset + lnb->len - 1)
363                                 lnb->rc = inode->i_size - lnb->offset;
364                         else
365                                 lnb->rc = lnb->len;
366
367                         tot_bytes += lnb->rc;
368                         if (lnb->rc < lnb->len) {
369                                 /* short read, be sure to wait on it */
370                                 lnb++;
371                                 break;
372                         }
373                 }
374         }
375
376         if (time_after(jiffies, now + 15 * HZ))
377                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
378         else
379                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
380                        (jiffies - now));
381
382         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
383         while (lnb-- > res) {
384                 rc = filter_finish_page_read(lnb);
385                 if (rc) {
386                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
387                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
388                         cleanup_phase = 1;
389                         GOTO(cleanup, rc);
390                 }
391         }
392
393         if (time_after(jiffies, now + 15 * HZ))
394                 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
395         else
396                 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
397                        (jiffies - now));
398
399         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
400
401         EXIT;
402
403  cleanup:
404         switch (cleanup_phase) {
405         case 1:
406                 for (lnb = res; lnb < (res + niocount); lnb++) {
407                         if (lnb->page)
408                                 page_cache_release(lnb->page);
409                 }
410                 if (res->dentry != NULL)
411                         f_dput(res->dentry);
412                 else
413                         CERROR("NULL dentry in cleanup -- tell CFS\n");
414         case 0:
415                 OBD_FREE(fso, objcount * sizeof(*fso));
416                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
417         }
418         return rc;
419 }
420
421 /* When clients have dirtied as much space as they've been granted they
422  * fall through to sync writes.  These sync writes haven't been expressed
423  * in grants and need to error with ENOSPC when there isn't room in the
424  * filesystem for them after grants are taken into account.  However,
425  * writeback of the dirty data that was already granted space can write
426  * right on through.
427  *
428  * Caller must hold obd_osfs_lock. */
429 static int filter_grant_check(struct obd_export *exp, int objcount,
430                               struct fsfilt_objinfo *fso, int niocount,
431                               struct niobuf_remote *rnb,
432                               struct niobuf_local *lnb, obd_size *left,
433                               struct inode *inode)
434 {
435         struct filter_export_data *fed = &exp->exp_filter_data;
436         int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
437         unsigned long used = 0, ungranted = 0, using;
438         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
439
440         LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
441
442         for (obj = 0; obj < objcount; obj++) {
443                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
444                         int tmp, bytes;
445
446                         /* FIXME: this is calculated with PAGE_SIZE on client */
447                         bytes = rnb[n].len;
448                         bytes += rnb[n].offset & (blocksize - 1);
449                         tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
450                         if (tmp)
451                                 bytes += blocksize - tmp;
452
453                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
454                                 if (fed->fed_grant < used + bytes) {
455                                         CDEBUG(D_CACHE,
456                                                "%s: cli %s/%p claims %ld+%d "
457                                                "GRANT, real grant %lu idx %d\n",
458                                                exp->exp_obd->obd_name,
459                                                exp->exp_client_uuid.uuid, exp,
460                                                used, bytes, fed->fed_grant, n);
461                                         mask = D_ERROR;
462                                 } else {
463                                         used += bytes;
464                                         rnb[n].flags |= OBD_BRW_GRANTED;
465                                         lnb[n].lnb_grant_used = bytes;
466                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
467                                         rc = 0;
468                                         continue;
469                                 }
470                         }
471                         if (*left > ungranted) {
472                                 /* if enough space, pretend it was granted */
473                                 ungranted += bytes;
474                                 rnb[n].flags |= OBD_BRW_GRANTED;
475                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
476                                 rc = 0;
477                                 continue;
478                         }
479
480                         /* We can't check for already-mapped blocks here, as
481                          * it requires dropping the osfs lock to do the bmap.
482                          * Instead, we return ENOSPC and in that case we need
483                          * to go through and verify if all of the blocks not
484                          * marked BRW_GRANTED are already mapped and we can
485                          * ignore this error. */
486                         lnb[n].rc = -ENOSPC;
487                         rnb[n].flags &= OBD_BRW_GRANTED;
488                         CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
489                                exp->exp_obd->obd_name,
490                                exp->exp_client_uuid.uuid, exp, n, bytes);
491                 }
492         }
493
494         /* Now substract what client have used already.  We don't subtract
495          * this from the tot_granted yet, so that other client's can't grab
496          * that space before we have actually allocated our blocks.  That
497          * happens in filter_grant_commit() after the writes are done. */
498         *left -= ungranted;
499         fed->fed_grant -= used;
500         fed->fed_pending += used;
501         exp->exp_obd->u.filter.fo_tot_pending += used;
502
503         CDEBUG(mask,
504                "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
505                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
506                ungranted, fed->fed_grant, fed->fed_dirty);
507
508         /* Rough calc in case we don't refresh cached statfs data */
509         using = (used + ungranted + 1 ) >>
510                 exp->exp_obd->u.filter.fo_sb->s_blocksize_bits;
511         if (exp->exp_obd->obd_osfs.os_bavail > using)
512                 exp->exp_obd->obd_osfs.os_bavail -= using;
513         else
514                 exp->exp_obd->obd_osfs.os_bavail = 0;
515
516         if (fed->fed_dirty < used) {
517                 CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
518                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
519                        used, fed->fed_dirty);
520                 used = fed->fed_dirty;
521         }
522         exp->exp_obd->u.filter.fo_tot_dirty -= used;
523         fed->fed_dirty -= used;
524
525         return rc;
526 }
527
528 static int filter_start_page_write(struct obd_device *obd, struct inode *inode,
529                                    struct niobuf_local *lnb)
530 {
531         struct page *page;
532
533         if (lnb->len != PAGE_SIZE)
534                 return filter_start_page_read(obd, inode, lnb);
535
536         page = alloc_pages(GFP_HIGHUSER, 0);
537         if (page == NULL) {
538                 CERROR("no memory for a temp page\n");
539                 RETURN(lnb->rc = -ENOMEM);
540         }
541 #if 0
542         POISON_PAGE(page, 0xf1);
543         if (lnb->len != PAGE_SIZE) {
544                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
545                 kunmap(page);
546         }
547 #endif
548         page->index = lnb->offset >> PAGE_SHIFT;
549         lnb->page = page;
550
551         return 0;
552 }
553
554 static void filter_abort_page_write(struct niobuf_local *lnb)
555 {
556         LASSERT(lnb->page != NULL);
557
558         if (lnb->len != PAGE_SIZE)
559                 page_cache_release(lnb->page);
560         else
561                 __free_pages(lnb->page, 0);
562 }
563
564 /* a helper for both the 2.4 and 2.6 commitrw paths which are both built
565  * up by our shared filter_preprw_write() */
566 void filter_release_write_page(struct filter_obd *filter, struct inode *inode,
567                                struct niobuf_local *lnb, int rc)
568 {
569         if (lnb->len != PAGE_SIZE)
570                 return filter_release_read_page(filter, inode, lnb->page);
571
572         if (rc == 0)
573                 flip_into_page_cache(inode, lnb->page);
574         __free_page(lnb->page);
575 }
576
577 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
578  * on mulitple inodes.  That isn't all, because there still exists the
579  * possibility of a truncate starting a new transaction while holding the ext3
580  * rwsem = write while some writes (which have started their transactions here)
581  * blocking on the ext3 rwsem = read => lock inversion.
582  *
583  * The handling gets very ugly when dealing with locked pages.  It may be easier
584  * to just get rid of the locked page code (which has problems of its own) and
585  * either discover we do not need it anymore (i.e. it was a symptom of another
586  * bug) or ensure we get the page locks in an appropriate order. */
587 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
588                                int objcount, struct obd_ioobj *obj,
589                                int niocount, struct niobuf_remote *nb,
590                                struct niobuf_local *res,
591                                struct obd_trans_info *oti)
592 {
593         struct lvfs_run_ctxt saved;
594         struct niobuf_remote *rnb;
595         struct niobuf_local *lnb;
596         struct fsfilt_objinfo fso;
597         struct dentry *dentry;
598         obd_size left;
599         unsigned long now = jiffies;
600         int rc = 0, i, tot_bytes = 0, cleanup_phase = 1;
601         ENTRY;
602         LASSERT(objcount == 1);
603         LASSERT(obj->ioo_bufcnt > 0);
604
605         memset(res, 0, niocount * sizeof(*res));
606
607         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
608         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
609                                    obj->ioo_id);
610         if (IS_ERR(dentry))
611                 GOTO(cleanup, rc = PTR_ERR(dentry));
612
613         if (dentry->d_inode == NULL) {
614                 CERROR("trying to BRW to non-existent file "LPU64"\n",
615                        obj->ioo_id);
616                 f_dput(dentry);
617                 GOTO(cleanup, rc = -ENOENT);
618         }
619
620         fso.fso_dentry = dentry;
621         fso.fso_bufcnt = obj->ioo_bufcnt;
622
623         if (time_after(jiffies, now + 15 * HZ))
624                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
625         else
626                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
627                        (jiffies - now));
628
629         spin_lock(&exp->exp_obd->obd_osfs_lock);
630         if (oa)
631                 filter_grant_incoming(exp, oa);
632         cleanup_phase = 0;
633
634         left = filter_grant_space_left(exp);
635
636         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
637                                 &left, dentry->d_inode);
638         if (oa && oa->o_valid & OBD_MD_FLGRANT)
639                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
640
641         spin_unlock(&exp->exp_obd->obd_osfs_lock);
642
643         if (rc) {
644                 f_dput(dentry);
645                 GOTO(cleanup, rc);
646         }
647
648         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
649              i++, lnb++, rnb++) {
650                 /* We still set up for ungranted pages so that granted pages
651                  * can be written to disk as they were promised, and portals
652                  * needs to keep the pages all aligned properly. */
653                 lnb->dentry = dentry;
654                 lnb->offset = rnb->offset;
655                 lnb->len    = rnb->len;
656                 lnb->flags  = rnb->flags;
657
658                 rc = filter_start_page_write(exp->exp_obd, dentry->d_inode,lnb);
659                 if (rc) {
660                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
661                                lnb->len, lnb->offset,
662                                i, obj->ioo_bufcnt, dentry, rc);
663                         while (lnb-- > res)
664                                 filter_abort_page_write(lnb);
665                         f_dput(dentry);
666                         GOTO(cleanup, rc);
667                 }
668                 if (lnb->rc == 0)
669                         tot_bytes += lnb->len;
670         }
671
672         while (lnb-- > res) {
673                 if (lnb->len == PAGE_SIZE)
674                         continue;
675                 rc = filter_finish_page_read(lnb);
676                 if (rc) {
677                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
678                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
679                         GOTO(cleanup, rc);
680                 }
681         }
682
683         if (time_after(jiffies, now + 15 * HZ))
684                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
685         else
686                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
687                        (jiffies - now));
688
689         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
690                             tot_bytes);
691         EXIT;
692 cleanup:
693         switch(cleanup_phase) {
694         case 1:
695                 spin_lock(&exp->exp_obd->obd_osfs_lock);
696                 if (oa)
697                         filter_grant_incoming(exp, oa);
698                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
699         default: ;
700         }
701         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
702         return rc;
703 }
704
705 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
706                   int objcount, struct obd_ioobj *obj, int niocount,
707                   struct niobuf_remote *nb, struct niobuf_local *res,
708                   struct obd_trans_info *oti)
709 {
710         if (cmd == OBD_BRW_WRITE)
711                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
712                                            niocount, nb, res, oti);
713
714         if (cmd == OBD_BRW_READ)
715                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
716                                           niocount, nb, res, oti);
717
718         LBUG();
719         return -EPROTO;
720 }
721
722 void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
723                               struct page *page)
724 {
725         int drop = 0;
726
727         if (inode != NULL &&
728             (inode->i_size > filter->fo_readcache_max_filesize))
729                 drop = 1;
730
731         /* drop from cache like truncate_list_pages() */
732         if (drop && !TryLockPage(page)) {
733                 if (page->mapping)
734                         ll_truncate_complete_page(page);
735                 unlock_page(page);
736         }
737         page_cache_release(page);
738 }
739
740 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
741                                 int objcount, struct obd_ioobj *obj,
742                                 int niocount, struct niobuf_local *res,
743                                 struct obd_trans_info *oti, int rc)
744 {
745         struct obd_ioobj *o;
746         struct niobuf_local *lnb;
747         int i, j;
748         struct inode *inode = NULL;
749         ENTRY;
750
751         if (res->dentry != NULL)
752                 inode = res->dentry->d_inode;
753
754         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
755                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
756                         if (lnb->page == NULL)
757                                 continue;
758                         filter_release_read_page(&exp->exp_obd->u.filter,
759                                                  inode, lnb->page);
760                 }
761         }
762
763         if (res->dentry != NULL)
764                 f_dput(res->dentry);
765         RETURN(rc);
766 }
767
768 void flip_into_page_cache(struct inode *inode, struct page *new_page)
769 {
770         struct page *old_page;
771         int rc;
772
773         do {
774                 /* the dlm is protecting us from read/write concurrency, so we
775                  * expect this find_lock_page to return quickly.  even if we
776                  * race with another writer it won't be doing much work with
777                  * the page locked.  we do this 'cause t_c_p expects a
778                  * locked page, and it wants to grab the pagecache lock
779                  * as well. */
780                 old_page = find_lock_page(inode->i_mapping, new_page->index);
781                 if (old_page) {
782                         ll_truncate_complete_page(old_page);
783                         unlock_page(old_page);
784                         page_cache_release(old_page);
785                 }
786
787 #if 0 /* this should be a /proc tunable someday */
788                 /* racing o_directs (no locking ioctl) could race adding
789                  * their pages, so we repeat the page invalidation unless
790                  * we successfully added our new page */
791                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
792                                               new_page->index,
793                                               page_hash(inode->i_mapping,
794                                                         new_page->index));
795                 if (rc == 0) {
796                         /* add_to_page_cache clears uptodate|dirty and locks
797                          * the page */
798                         SetPageUptodate(new_page);
799                         unlock_page(new_page);
800                 }
801 #else
802                 rc = 0;
803 #endif
804         } while (rc != 0);
805 }
806
807 void filter_grant_commit(struct obd_export *exp, int niocount,
808                          struct niobuf_local *res)
809 {
810         struct filter_obd *filter = &exp->exp_obd->u.filter;
811         struct niobuf_local *lnb = res;
812         unsigned long pending = 0;
813         int i;
814
815         spin_lock(&exp->exp_obd->obd_osfs_lock);
816         for (i = 0, lnb = res; i < niocount; i++, lnb++)
817                 pending += lnb->lnb_grant_used;
818
819         LASSERTF(exp->exp_filter_data.fed_pending >= pending,
820                  "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
821                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
822                  exp->exp_filter_data.fed_pending, pending);
823         exp->exp_filter_data.fed_pending -= pending;
824         LASSERTF(filter->fo_tot_granted >= pending,
825                  "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
826                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
827                  exp->exp_obd->u.filter.fo_tot_granted, pending);
828         filter->fo_tot_granted -= pending;
829         LASSERTF(filter->fo_tot_pending >= pending,
830                  "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
831                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
832                  filter->fo_tot_pending, pending);
833         filter->fo_tot_pending -= pending;
834
835         spin_unlock(&exp->exp_obd->obd_osfs_lock);
836 }
837 int filter_do_cow(struct obd_export *exp, struct obd_ioobj *obj,
838                   int nioo, struct niobuf_remote *rnb)
839 {
840         struct dentry *dentry;
841         struct lvfs_run_ctxt saved;
842         struct write_extents *extents = NULL;
843         int j, rc = 0, numexts = 0, flags = 0;
844
845         ENTRY;
846
847         LASSERT(nioo == 1);
848
849         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
850         
851         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
852                                    obj->ioo_id);
853         if (IS_ERR(dentry)) {
854                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
855                 RETURN (PTR_ERR(dentry));
856         }
857
858         if (dentry->d_inode == NULL) {
859                 CERROR("trying to write extents to non-existent file "LPU64"\n",
860                        obj->ioo_id);
861                 GOTO(cleanup, rc = -ENOENT);
862         }
863         
864         flags = fsfilt_get_fs_flags(exp->exp_obd, dentry);
865         if (!(flags & SM_DO_COW)) {
866                 GOTO(cleanup, rc);
867         }
868         OBD_ALLOC(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); 
869         if (!extents) {
870                 CERROR("No Memory\n");
871                 GOTO(cleanup, rc = -ENOMEM);
872         }
873         for (j = 0; j < obj->ioo_bufcnt; j++) {
874                 if (rnb[j].len != 0) {
875                         extents[numexts].w_count = rnb[j].len;
876                         extents[numexts].w_pos = rnb[j].offset;
877                         numexts++;
878                 } 
879         } 
880         rc = fsfilt_do_write_cow(exp->exp_obd, dentry, extents, numexts);
881         if (rc) {
882                 CERROR("Do cow error id "LPU64" rc:%d \n",
883                         obj->ioo_id, rc);
884                 GOTO(cleanup, rc); 
885         }
886         
887 cleanup:
888         if (extents) {
889                 OBD_FREE(extents, obj->ioo_bufcnt * sizeof(struct write_extents));
890         }
891         f_dput(dentry);
892         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
893         RETURN(rc);
894
895 }
896 int filter_write_extents(struct obd_export *exp, struct obd_ioobj *obj, int nobj,
897                          int niocount, struct niobuf_local *local, int rc)
898 {
899         struct lvfs_run_ctxt saved;
900         struct dentry *dentry;
901         struct niobuf_local *lnb;
902         __u64  offset = 0;
903         __u32  len = 0;
904         int    i, flags; 
905  
906         ENTRY;
907
908         LASSERT(nobj == 1);
909
910         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
911
912         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
913                                    obj->ioo_id);
914         if (IS_ERR(dentry)) {
915                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
916                 RETURN (PTR_ERR(dentry));
917         }
918
919         if (dentry->d_inode == NULL) {
920                 CERROR("trying to write extents to non-existent file "LPU64"\n",
921                        obj->ioo_id);
922                 GOTO(cleanup, rc = -ENOENT);
923         }
924         
925         flags = fsfilt_get_fs_flags(exp->exp_obd, dentry);
926         if (!(flags & SM_DO_REC)) {
927                 GOTO(cleanup, rc);
928         }
929
930         for (i = 0, lnb = local; i < obj->ioo_bufcnt; i++, lnb++) {
931                 if (len == 0) {
932                         offset = lnb->offset;
933                         len = lnb->len;
934                 } else if (lnb->offset == (offset + len)) {
935                         len += lnb->len;
936                 } else {
937                         rc = fsfilt_write_extents(exp->exp_obd, dentry, 
938                                                   offset, len);
939                         if (rc) {
940                                 CERROR("write exts off "LPU64" num %u rc:%d\n",
941                                         offset, len, rc);
942                                 GOTO(cleanup, rc);
943                         }
944                         offset = lnb->offset;
945                         len = lnb->len; 
946                 } 
947         }
948         if (len > 0) {
949                 rc = fsfilt_write_extents(exp->exp_obd, dentry, 
950                                           offset, len);
951                 if (rc) {
952                         CERROR("write exts off "LPU64" num %u rc:%d\n",
953                                 offset, len, rc);
954                         GOTO(cleanup, rc);
955                 }
956         }
957 cleanup:
958         f_dput(dentry);
959         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
960         RETURN(rc);
961 }
962
963 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
964                     int objcount, struct obd_ioobj *obj, int niocount,
965                     struct niobuf_local *res, struct obd_trans_info *oti,int rc)
966 {
967         if (cmd == OBD_BRW_WRITE)
968                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
969                                              res, oti, rc);
970         if (cmd == OBD_BRW_READ)
971                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
972                                             res, oti, rc);
973         LBUG();
974         return -EPROTO;
975 }
976
977 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
978                struct lov_stripe_md *lsm, obd_count oa_bufs,
979                struct brw_page *pga, struct obd_trans_info *oti)
980 {
981         struct obd_ioobj ioo;
982         struct niobuf_local *lnb;
983         struct niobuf_remote *rnb;
984         obd_count i;
985         int ret = 0;
986         ENTRY;
987
988         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
989         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
990
991         if (lnb == NULL || rnb == NULL)
992                 GOTO(out, ret = -ENOMEM);
993
994         for (i = 0; i < oa_bufs; i++) {
995                 rnb[i].offset = pga[i].off;
996                 rnb[i].len = pga[i].count;
997         }
998
999         obdo_to_ioobj(oa, &ioo);
1000         ioo.ioo_bufcnt = oa_bufs;
1001
1002         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
1003         if (ret != 0)
1004                 GOTO(out, ret);
1005
1006         for (i = 0; i < oa_bufs; i++) {
1007                 void *virt = kmap(pga[i].pg);
1008                 obd_off off = pga[i].off & ~PAGE_MASK;
1009                 void *addr = kmap(lnb[i].page);
1010
1011                 /* 2 kmaps == vanishingly small deadlock opportunity */
1012
1013                 if (cmd & OBD_BRW_WRITE)
1014                         memcpy(addr + off, virt + off, pga[i].count);
1015                 else
1016                         memcpy(virt + off, addr + off, pga[i].count);
1017
1018                 kunmap(lnb[i].page);
1019                 kunmap(pga[i].pg);
1020         }
1021
1022         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
1023
1024 out:
1025         if (lnb)
1026                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
1027         if (rnb)
1028                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
1029         RETURN(ret);
1030 }