Whamcloud - gitweb
- landing b_fid.
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include <linux/lustre_smfs.h>
37 #include <linux/lustre_snap.h>
38 #include "filter_internal.h"
39
40 static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
41                                  struct niobuf_local *lnb)
42
43 {
44         struct page *page;
45         ENTRY;
46  
47         page = alloc_pages(GFP_HIGHUSER, 0);
48         if (page == NULL) {
49                 CERROR("no memory for a temp page\n");
50                 lnb->rc = -ENOMEM;
51                 RETURN(-ENOMEM);
52         }
53
54 #if 0
55         POISON_PAGE(page, 0xf1);
56         if (lnb->len != PAGE_SIZE) {
57                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
58                 kunmap(page);
59         }
60 #endif
61         page->index = lnb->offset >> PAGE_SHIFT;
62
63         lnb->page = page;
64
65         RETURN(0);
66 }
67
68 void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
69                            int niocount, struct niobuf_local *res)
70 {
71         int i, j;
72
73         for (i = 0; i < objcount; i++, obj++) {
74                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
75                         if (res->page != NULL) {
76                                 __free_page(res->page);
77                                 res->page = NULL;
78                         }
79                 }
80         }
81 }
82
83 /* Grab the dirty and seen grant announcements from the incoming obdo.
84  * We will later calculate the clients new grant and return it.
85  * Caller must hold osfs lock */
86 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
87 {
88         struct filter_export_data *fed;
89         struct obd_device *obd = exp->exp_obd;
90         static unsigned long last_msg;
91         static int last_count;
92         int mask = D_CACHE;
93         ENTRY;
94
95         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
96
97         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
98                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
99                 oa->o_valid &= ~OBD_MD_FLGRANT;
100                 EXIT;
101                 return;
102         }
103
104         fed = &exp->exp_filter_data;
105
106         /* Don't print this to the console the first time it happens, since
107          * it can happen legitimately on occasion, but only rarely. */
108         if (time_after(jiffies, last_msg + 60 * HZ)) {
109                 last_count = 0;
110                 last_msg = jiffies;
111         }
112         if ((last_count & (-last_count)) == last_count)
113                 mask = D_WARNING;
114         last_count++;
115
116         /* Add some margin, since there is a small race if other RPCs arrive
117          * out-or-order and have already consumed some grant.  We want to
118          * leave this here in case there is a large error in accounting. */
119         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? mask:D_CACHE,
120                "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
121                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
122                oa->o_dropped, fed->fed_grant);
123
124         /* Update our accounting now so that statfs takes it into account.
125          * Note that fed_dirty is only approximate and can become incorrect
126          * if RPCs arrive out-of-order.  No important calculations depend
127          * on fed_dirty however. */
128         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
129         if (fed->fed_grant < oa->o_dropped) {
130                 CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
131                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
132                        oa->o_dropped, fed->fed_grant);
133                 oa->o_dropped = 0;
134         }
135         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
136                 CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
137                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
138                        oa->o_dropped, obd->u.filter.fo_tot_granted);
139                 oa->o_dropped = 0;
140         }
141         obd->u.filter.fo_tot_granted -= oa->o_dropped;
142         fed->fed_grant -= oa->o_dropped;
143         fed->fed_dirty = oa->o_dirty;
144         EXIT;
145 }
146
147 #define GRANT_FOR_LLOG(obd) 16
148
149 /* Figure out how much space is available between what we've granted
150  * and what remains in the filesystem.  Compensate for ext3 indirect
151  * block overhead when computing how much free space is left ungranted.
152  *
153  * Caller must hold obd_osfs_lock. */
154 obd_size filter_grant_space_left(struct obd_export *exp)
155 {
156         struct obd_device *obd = exp->exp_obd;
157         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
158         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
159         int rc, statfs_done = 0;
160
161         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
162
163         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
164 restat:
165                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
166                 if (rc) /* N.B. statfs can't really fail */
167                         RETURN(0);
168                 statfs_done = 1;
169         }
170
171         avail = obd->obd_osfs.os_bavail;
172         left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
173         if (left > GRANT_FOR_LLOG(obd)) {
174                 left = (left - GRANT_FOR_LLOG(obd)) << blockbits;
175         } else {
176                 left = 0 /* << blockbits */;
177         }
178
179         if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
180                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
181                 goto restat;
182         }
183
184         if (left >= tot_granted) {
185                 left -= tot_granted;
186         } else {
187                 static unsigned long next;
188                 if (left < tot_granted - obd->u.filter.fo_tot_pending &&
189                     time_after(jiffies, next)) {
190                         spin_unlock(&obd->obd_osfs_lock);
191                         CERROR("%s: cli %s/%p grant "LPU64" > available "
192                                LPU64" and pending "LPU64"\n", obd->obd_name,
193                                exp->exp_client_uuid.uuid, exp, tot_granted,
194                                left, obd->u.filter.fo_tot_pending);
195                         if (next == 0)
196                                 portals_debug_dumplog();
197                         next = jiffies + 20 * HZ;
198                         spin_lock(&obd->obd_osfs_lock);
199                 }
200                 left = 0;
201         }
202
203         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
204                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
205                exp->exp_client_uuid.uuid, exp,
206                obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
207                tot_granted, left, obd->u.filter.fo_tot_pending);
208
209         return left;
210 }
211
212 /* Calculate how much grant space to allocate to this client, based on how
213  * much space is currently free and how much of that is already granted.
214  *
215  * Caller must hold obd_osfs_lock. */
216 long filter_grant(struct obd_export *exp, obd_size current_grant,
217                   obd_size want, obd_size fs_space_left)
218 {
219         struct obd_device *obd = exp->exp_obd;
220         struct filter_export_data *fed = &exp->exp_filter_data;
221         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
222         __u64 grant = 0;
223
224         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
225
226         /* Grant some fraction of the client's requested grant space so that
227          * they are not always waiting for write credits (not all of it to
228          * avoid overgranting in face of multiple RPCs in flight).  This
229          * essentially will be able to control the OSC_MAX_RIF for a client.
230          *
231          * If we do have a large disparity between what the client thinks it
232          * has and what we think it has, don't grant very much and let the
233          * client consume its grant first.  Either it just has lots of RPCs
234          * in flight, or it was evicted and its grants will soon be used up. */
235         if (current_grant < want &&
236             current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
237                 grant = min((want >> blockbits) / 2,
238                             (fs_space_left >> blockbits) / 8);
239                 grant <<= blockbits;
240
241                 if (grant) {
242                         if (grant > FILTER_GRANT_CHUNK)
243                                 grant = FILTER_GRANT_CHUNK;
244
245                         obd->u.filter.fo_tot_granted += grant;
246                         fed->fed_grant += grant;
247                 }
248         }
249
250         CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
251                obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
252         CDEBUG(D_CACHE,
253                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
254                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
255                exp, obd->u.filter.fo_tot_dirty,
256                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
257
258         return grant;
259 }
260
261
262 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
263                               int objcount, struct obd_ioobj *obj,
264                               int niocount, struct niobuf_remote *nb,
265                               struct niobuf_local *res,
266                               struct obd_trans_info *oti)
267 {
268         struct obd_device *obd = exp->exp_obd;
269         struct lvfs_run_ctxt saved;
270         struct niobuf_remote *rnb;
271         struct niobuf_local *lnb;
272         struct dentry *dentry = NULL;
273         struct inode *inode;
274         void *iobuf = NULL; 
275         int rc = 0, i, tot_bytes = 0;
276         unsigned long now = jiffies;
277         ENTRY;
278
279         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
280          * When we do this function's dentry cleanup will need to be fixed */
281         LASSERTF(objcount == 1, "%d\n", objcount);
282         LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
283
284         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
285                 spin_lock(&obd->obd_osfs_lock);
286                 filter_grant_incoming(exp, oa);
287
288                 oa->o_grant = 0;
289                 
290                 spin_unlock(&obd->obd_osfs_lock);
291         }
292
293         memset(res, 0, niocount * sizeof(*res));
294
295         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
296         rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
297         if (rc)
298                 GOTO(cleanup, rc);
299
300         dentry = filter_oa2dentry(obd, oa);
301         if (IS_ERR(dentry))
302                 GOTO(cleanup, rc = PTR_ERR(dentry));
303
304         if (dentry->d_inode == NULL) {
305                 CERROR("trying to BRW to non-existent file "LPU64"\n",
306                                obj->ioo_id);
307                 GOTO(cleanup, rc = -ENOENT);
308         }
309
310         inode = dentry->d_inode; 
311
312         if (time_after(jiffies, now + 15 * HZ))
313                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
314         else
315                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
316                        (jiffies - now));
317
318         for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
319              i++, rnb++, lnb++) {
320                 lnb->dentry = dentry;
321                 lnb->offset = rnb->offset;
322                 lnb->len    = rnb->len;
323                 lnb->flags  = rnb->flags;
324
325                 if (inode->i_size <= rnb->offset)
326                       /* If there's no more data, abort early.
327                       * lnb->page == NULL and lnb->rc == 0, so it's
328                       * easy to detect later. */
329                         break;
330                 else
331                         rc = filter_alloc_dio_page(obd, inode, lnb);
332                 if (rc) {
333                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
334                              "page err %u@"LPU64" %u/%u %p: rc %d\n",
335                               lnb->len, lnb->offset, i, obj->ioo_bufcnt,
336                               dentry, rc);
337                         GOTO(cleanup, rc);
338                 }
339
340                 if (inode->i_size < lnb->offset + lnb->len - 1)
341                         lnb->rc = inode->i_size - lnb->offset;
342                 else
343                         lnb->rc = lnb->len;
344
345                 tot_bytes += lnb->rc;
346
347                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
348         }
349
350         if (time_after(jiffies, now + 15 * HZ))
351                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
352         else
353                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
354                        (jiffies - now));
355
356         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
357                               NULL, NULL, NULL);
358         if (rc)
359                 GOTO(cleanup, rc);
360
361         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
362
363         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
364
365         EXIT;
366
367 cleanup:
368         if (rc != 0) {
369                 filter_free_dio_pages(objcount, obj, niocount, res);
370
371                 if (dentry != NULL)
372                         f_dput(dentry);
373                 else
374                         CERROR("NULL dentry in cleanup -- tell CFS\n");
375         }
376
377         if (iobuf != NULL)
378                 filter_free_iobuf(iobuf);
379
380         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
381         if (rc)
382                 CERROR("io error %d\n", rc);
383         return rc;
384 }
385
386 /* When clients have dirtied as much space as they've been granted they
387  * fall through to sync writes.  These sync writes haven't been expressed
388  * in grants and need to error with ENOSPC when there isn't room in the
389  * filesystem for them after grants are taken into account.  However,
390  * writeback of the dirty data that was already granted space can write
391  * right on through.
392  *
393  * Caller must hold obd_osfs_lock. */
394 static int filter_grant_check(struct obd_export *exp, int objcount,
395                               struct fsfilt_objinfo *fso, int niocount,
396                               struct niobuf_remote *rnb,
397                               struct niobuf_local *lnb, obd_size *left,
398                               struct inode *inode)
399 {
400         struct filter_export_data *fed = &exp->exp_filter_data;
401         int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
402         unsigned long used = 0, ungranted = 0, using;
403         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
404
405         LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
406
407         for (obj = 0; obj < objcount; obj++) {
408                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
409                         int tmp, bytes;
410
411                         /* FIXME: this is calculated with PAGE_SIZE on client */
412                         bytes = rnb[n].len;
413                         bytes += rnb[n].offset & (blocksize - 1);
414                         tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
415                         if (tmp)
416                                 bytes += blocksize - tmp;
417
418                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
419                                 if (fed->fed_grant < used + bytes) {
420                                         CDEBUG(D_CACHE,
421                                                "%s: cli %s/%p claims %ld+%d "
422                                                "GRANT, real grant %lu idx %d\n",
423                                                exp->exp_obd->obd_name,
424                                                exp->exp_client_uuid.uuid, exp,
425                                                used, bytes, fed->fed_grant, n);
426                                         mask = D_ERROR;
427                                 } else {
428                                         used += bytes;
429                                         rnb[n].flags |= OBD_BRW_GRANTED;
430                                         lnb[n].lnb_grant_used = bytes;
431                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
432                                         rc = 0;
433                                         continue;
434                                 }
435                         }
436                         if (*left > ungranted) {
437                                 /* if enough space, pretend it was granted */
438                                 ungranted += bytes;
439                                 rnb[n].flags |= OBD_BRW_GRANTED;
440                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
441                                 rc = 0;
442                                 continue;
443                         }
444
445                         /* We can't check for already-mapped blocks here, as
446                          * it requires dropping the osfs lock to do the bmap.
447                          * Instead, we return ENOSPC and in that case we need
448                          * to go through and verify if all of the blocks not
449                          * marked BRW_GRANTED are already mapped and we can
450                          * ignore this error. */
451                         lnb[n].rc = -ENOSPC;
452                         rnb[n].flags &= OBD_BRW_GRANTED;
453                         CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
454                                exp->exp_obd->obd_name,
455                                exp->exp_client_uuid.uuid, exp, n, bytes);
456                 }
457         }
458
459         /* Now substract what client have used already.  We don't subtract
460          * this from the tot_granted yet, so that other client's can't grab
461          * that space before we have actually allocated our blocks.  That
462          * happens in filter_grant_commit() after the writes are done. */
463         *left -= ungranted;
464         fed->fed_grant -= used;
465         fed->fed_pending += used;
466         exp->exp_obd->u.filter.fo_tot_pending += used;
467
468         CDEBUG(mask,
469                "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
470                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
471                ungranted, fed->fed_grant, fed->fed_dirty);
472
473         /* Rough calc in case we don't refresh cached statfs data */
474         using = (used + ungranted + 1 ) >>
475                 exp->exp_obd->u.filter.fo_sb->s_blocksize_bits;
476         if (exp->exp_obd->obd_osfs.os_bavail > using)
477                 exp->exp_obd->obd_osfs.os_bavail -= using;
478         else
479                 exp->exp_obd->obd_osfs.os_bavail = 0;
480
481         if (fed->fed_dirty < used) {
482                 CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
483                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
484                        used, fed->fed_dirty);
485                 used = fed->fed_dirty;
486         }
487         exp->exp_obd->u.filter.fo_tot_dirty -= used;
488         fed->fed_dirty -= used;
489
490         return rc;
491 }
492
493 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
494  * on mulitple inodes.  That isn't all, because there still exists the
495  * possibility of a truncate starting a new transaction while holding the ext3
496  * rwsem = write while some writes (which have started their transactions here)
497  * blocking on the ext3 rwsem = read => lock inversion.
498  *
499  * The handling gets very ugly when dealing with locked pages.  It may be easier
500  * to just get rid of the locked page code (which has problems of its own) and
501  * either discover we do not need it anymore (i.e. it was a symptom of another
502  * bug) or ensure we get the page locks in an appropriate order. */
503 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
504                                int objcount, struct obd_ioobj *obj,
505                                int niocount, struct niobuf_remote *nb,
506                                struct niobuf_local *res,
507                                struct obd_trans_info *oti)
508 {
509         struct lvfs_run_ctxt saved;
510         struct niobuf_remote *rnb;
511         struct niobuf_local *lnb = res;
512         struct fsfilt_objinfo fso;
513         struct dentry *dentry = NULL;
514         void *iobuf; 
515         obd_size left;
516         unsigned long now = jiffies;
517         int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
518         ENTRY;
519         LASSERT(objcount == 1);
520         LASSERT(obj->ioo_bufcnt > 0);
521
522         memset(res, 0, niocount * sizeof(*res));
523
524         rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
525         if (rc)
526                 GOTO(cleanup, rc);
527         cleanup_phase = 1;
528
529         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
530         dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
531                                   obj->ioo_id);
532         if (IS_ERR(dentry))
533                 GOTO(cleanup, rc = PTR_ERR(dentry));
534         
535         cleanup_phase = 2;
536         
537         if (dentry->d_inode == NULL) {
538                 CERROR("trying to BRW to non-existent file "LPU64"\n",
539                        obj->ioo_id);
540                 GOTO(cleanup, rc = -ENOENT);
541         }
542
543         fso.fso_dentry = dentry;
544         fso.fso_bufcnt = obj->ioo_bufcnt;
545
546         if (time_after(jiffies, now + 15 * HZ))
547                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
548         else
549                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
550                        (jiffies - now));
551
552         spin_lock(&exp->exp_obd->obd_osfs_lock);
553         if (oa)
554                 filter_grant_incoming(exp, oa);
555         
556         cleanup_phase = 3;
557
558         left = filter_grant_space_left(exp);
559
560         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
561                                 &left, dentry->d_inode);
562         if (oa && oa->o_valid & OBD_MD_FLGRANT)
563                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
564
565         spin_unlock(&exp->exp_obd->obd_osfs_lock);
566
567         if (rc) 
568                 GOTO(cleanup, rc);
569
570         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
571              i++, lnb++, rnb++) {
572                 /* We still set up for ungranted pages so that granted pages
573                  * can be written to disk as they were promised, and portals
574                  * needs to keep the pages all aligned properly. */
575                 lnb->dentry = dentry;
576                 lnb->offset = rnb->offset;
577                 lnb->len    = rnb->len;
578                 lnb->flags  = rnb->flags;
579
580                 rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
581                 if (rc) {
582                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
583                                lnb->len, lnb->offset,
584                                i, obj->ioo_bufcnt, dentry, rc);
585                         GOTO(cleanup, rc);
586                 }
587                 cleanup_phase = 4;
588
589                 /* If the filter writes a partial page, then has the file
590                  * extended, the client will read in the whole page.  the
591                  * filter has to be careful to zero the rest of the partial
592                  * page on disk.  we do it by hand for partial extending
593                  * writes, send_bio() is responsible for zeroing pages when
594                  * asked to read unmapped blocks -- brw_kiovec() does this. */
595                 if (lnb->len != PAGE_SIZE) {
596                         if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
597                                 filter_iobuf_add_page(exp->exp_obd, iobuf,
598                                                       dentry->d_inode,
599                                                       lnb->page);
600                         } else {
601                                 memset(kmap(lnb->page) + lnb->len, 0,
602                                        PAGE_SIZE - lnb->len);
603                                 kunmap(lnb->page);
604                         }
605                 }
606                 if (lnb->rc == 0)
607                         tot_bytes += lnb->len;
608         }
609
610         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
611                               NULL, NULL, NULL);
612         
613         if (time_after(jiffies, now + 15 * HZ))
614                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
615         else
616                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
617                        (jiffies - now));
618
619         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
620                             tot_bytes);
621         EXIT;
622 cleanup:
623         switch(cleanup_phase) {
624         case 4:
625                 if (rc)
626                         filter_free_dio_pages(objcount, obj, niocount, res);
627         case 3:
628                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
629                 filter_free_iobuf(iobuf);
630         case 2:
631                 if (rc)
632                         f_dput(dentry);
633                 break;
634         case 1:
635                 spin_lock(&exp->exp_obd->obd_osfs_lock);
636                 if (oa)
637                         filter_grant_incoming(exp, oa);
638                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
639                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
640                 filter_free_iobuf(iobuf);
641                 break;
642         default:;
643         
644         }
645         RETURN(rc);
646 }
647
648 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
649                   int objcount, struct obd_ioobj *obj, int niocount,
650                   struct niobuf_remote *nb, struct niobuf_local *res,
651                   struct obd_trans_info *oti)
652 {
653         if (cmd == OBD_BRW_WRITE)
654                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
655                                            niocount, nb, res, oti);
656
657         if (cmd == OBD_BRW_READ)
658                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
659                                           niocount, nb, res, oti);
660
661         LBUG();
662         return -EPROTO;
663 }
664
665 void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
666                               struct page *page)
667 {
668         int drop = 0;
669
670         if (inode != NULL &&
671             (inode->i_size > filter->fo_readcache_max_filesize))
672                 drop = 1;
673
674         /* drop from cache like truncate_list_pages() */
675         if (drop && !TryLockPage(page)) {
676                 if (page->mapping)
677                         ll_truncate_complete_page(page);
678                 unlock_page(page);
679         }
680         page_cache_release(page);
681 }
682
683 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
684                                 int objcount, struct obd_ioobj *obj,
685                                 int niocount, struct niobuf_local *res,
686                                 struct obd_trans_info *oti, int rc)
687 {
688         struct inode *inode = NULL;
689         ENTRY;
690
691         if (res->dentry != NULL)
692                 inode = res->dentry->d_inode;
693
694         filter_free_dio_pages(objcount, obj, niocount, res);
695         
696         if (res->dentry != NULL)
697                 f_dput(res->dentry);
698         RETURN(rc);
699 }
700
701 void flip_into_page_cache(struct inode *inode, struct page *new_page)
702 {
703         struct page *old_page;
704         int rc;
705
706         do {
707                 /* the dlm is protecting us from read/write concurrency, so we
708                  * expect this find_lock_page to return quickly.  even if we
709                  * race with another writer it won't be doing much work with
710                  * the page locked.  we do this 'cause t_c_p expects a
711                  * locked page, and it wants to grab the pagecache lock
712                  * as well. */
713                 old_page = find_lock_page(inode->i_mapping, new_page->index);
714                 if (old_page) {
715                         ll_truncate_complete_page(old_page);
716                         unlock_page(old_page);
717                         page_cache_release(old_page);
718                 }
719
720 #if 0 /* this should be a /proc tunable someday */
721                 /* racing o_directs (no locking ioctl) could race adding
722                  * their pages, so we repeat the page invalidation unless
723                  * we successfully added our new page */
724                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
725                                               new_page->index,
726                                               page_hash(inode->i_mapping,
727                                                         new_page->index));
728                 if (rc == 0) {
729                         /* add_to_page_cache clears uptodate|dirty and locks
730                          * the page */
731                         SetPageUptodate(new_page);
732                         unlock_page(new_page);
733                 }
734 #else
735                 rc = 0;
736 #endif
737         } while (rc != 0);
738 }
739
740 void filter_grant_commit(struct obd_export *exp, int niocount,
741                          struct niobuf_local *res)
742 {
743         struct filter_obd *filter = &exp->exp_obd->u.filter;
744         struct niobuf_local *lnb = res;
745         unsigned long pending = 0;
746         int i;
747
748         spin_lock(&exp->exp_obd->obd_osfs_lock);
749         for (i = 0, lnb = res; i < niocount; i++, lnb++)
750                 pending += lnb->lnb_grant_used;
751
752         LASSERTF(exp->exp_filter_data.fed_pending >= pending,
753                  "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
754                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
755                  exp->exp_filter_data.fed_pending, pending);
756         exp->exp_filter_data.fed_pending -= pending;
757         LASSERTF(filter->fo_tot_granted >= pending,
758                  "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
759                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
760                  exp->exp_obd->u.filter.fo_tot_granted, pending);
761         filter->fo_tot_granted -= pending;
762         LASSERTF(filter->fo_tot_pending >= pending,
763                  "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
764                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
765                  filter->fo_tot_pending, pending);
766         filter->fo_tot_pending -= pending;
767
768         spin_unlock(&exp->exp_obd->obd_osfs_lock);
769 }
770 int filter_do_cow(struct obd_export *exp, struct obd_ioobj *obj,
771                   int nioo, struct niobuf_remote *rnb)
772 {
773         struct dentry *dentry;
774         struct lvfs_run_ctxt saved;
775         struct write_extents *extents = NULL;
776         int j, rc = 0, numexts = 0, flags = 0;
777
778         ENTRY;
779
780         LASSERT(nioo == 1);
781
782         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
783         
784         dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
785                                   obj->ioo_id);
786         if (IS_ERR(dentry)) {
787                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
788                 RETURN (PTR_ERR(dentry));
789         }
790
791         if (dentry->d_inode == NULL) {
792                 CERROR("trying to write extents to non-existent file "LPU64"\n",
793                        obj->ioo_id);
794                 GOTO(cleanup, rc = -ENOENT);
795         }
796         
797         flags = fsfilt_get_fs_flags(exp->exp_obd, dentry);
798         if (!(flags & SM_DO_COW)) {
799                 GOTO(cleanup, rc);
800         }
801         OBD_ALLOC(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); 
802         if (!extents) {
803                 CERROR("No Memory\n");
804                 GOTO(cleanup, rc = -ENOMEM);
805         }
806         for (j = 0; j < obj->ioo_bufcnt; j++) {
807                 if (rnb[j].len != 0) {
808                         extents[numexts].w_count = rnb[j].len;
809                         extents[numexts].w_pos = rnb[j].offset;
810                         numexts++;
811                 } 
812         } 
813         rc = fsfilt_do_write_cow(exp->exp_obd, dentry, extents, numexts);
814         if (rc) {
815                 CERROR("Do cow error id "LPU64" rc:%d \n",
816                         obj->ioo_id, rc);
817                 GOTO(cleanup, rc); 
818         }
819         
820 cleanup:
821         if (extents) {
822                 OBD_FREE(extents, obj->ioo_bufcnt * sizeof(struct write_extents));
823         }
824         f_dput(dentry);
825         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
826         RETURN(rc);
827
828 }
829 int filter_write_extents(struct obd_export *exp, struct obd_ioobj *obj, int nobj,
830                          int niocount, struct niobuf_local *local, int rc)
831 {
832         struct lvfs_run_ctxt saved;
833         struct dentry *dentry;
834         struct niobuf_local *lnb;
835         __u64  offset = 0;
836         __u32  len = 0;
837         int    i, flags; 
838  
839         ENTRY;
840
841         LASSERT(nobj == 1);
842
843         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
844         dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
845                                   obj->ioo_id);
846         if (IS_ERR(dentry)) {
847                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
848                 RETURN (PTR_ERR(dentry));
849         }
850
851         if (dentry->d_inode == NULL) {
852                 CERROR("trying to write extents to non-existent file "LPU64"\n",
853                        obj->ioo_id);
854                 GOTO(cleanup, rc = -ENOENT);
855         }
856         
857         flags = fsfilt_get_fs_flags(exp->exp_obd, dentry);
858         if (!(flags & SM_DO_REC)) {
859                 GOTO(cleanup, rc);
860         }
861
862         for (i = 0, lnb = local; i < obj->ioo_bufcnt; i++, lnb++) {
863                 if (len == 0) {
864                         offset = lnb->offset;
865                         len = lnb->len;
866                 } else if (lnb->offset == (offset + len)) {
867                         len += lnb->len;
868                 } else {
869                         rc = fsfilt_write_extents(exp->exp_obd, dentry, 
870                                                   offset, len);
871                         if (rc) {
872                                 CERROR("write exts off "LPU64" num %u rc:%d\n",
873                                         offset, len, rc);
874                                 GOTO(cleanup, rc);
875                         }
876                         offset = lnb->offset;
877                         len = lnb->len; 
878                 } 
879         }
880         if (len > 0) {
881                 rc = fsfilt_write_extents(exp->exp_obd, dentry, 
882                                           offset, len);
883                 if (rc) {
884                         CERROR("write exts off "LPU64" num %u rc:%d\n",
885                                 offset, len, rc);
886                         GOTO(cleanup, rc);
887                 }
888         }
889 cleanup:
890         f_dput(dentry);
891         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
892         RETURN(rc);
893 }
894
895 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
896                     int objcount, struct obd_ioobj *obj, int niocount,
897                     struct niobuf_local *res, struct obd_trans_info *oti,int rc)
898 {
899         if (cmd == OBD_BRW_WRITE)
900                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
901                                              res, oti, rc);
902         if (cmd == OBD_BRW_READ)
903                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
904                                             res, oti, rc);
905         LBUG();
906         return -EPROTO;
907 }
908
909 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
910                struct lov_stripe_md *lsm, obd_count oa_bufs,
911                struct brw_page *pga, struct obd_trans_info *oti)
912 {
913         struct obd_ioobj ioo;
914         struct niobuf_local *lnb;
915         struct niobuf_remote *rnb;
916         obd_count i;
917         int ret = 0;
918         ENTRY;
919
920         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
921         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
922
923         if (lnb == NULL || rnb == NULL)
924                 GOTO(out, ret = -ENOMEM);
925
926         for (i = 0; i < oa_bufs; i++) {
927                 rnb[i].offset = pga[i].disk_offset;
928                 rnb[i].len = pga[i].count;
929         }
930
931         obdo_to_ioobj(oa, &ioo);
932         ioo.ioo_bufcnt = oa_bufs;
933
934         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
935         if (ret != 0)
936                 GOTO(out, ret);
937
938         for (i = 0; i < oa_bufs; i++) {
939                 void *virt;
940                 obd_off off;
941                 void *addr;
942
943                 if (lnb[i].page == NULL)
944                         break;
945
946                 off = pga[i].disk_offset & ~PAGE_MASK;
947                 virt = kmap(pga[i].pg);
948                 addr = kmap(lnb[i].page);
949
950                 /* 2 kmaps == vanishingly small deadlock opportunity */
951
952                 if (cmd & OBD_BRW_WRITE)
953                         memcpy(addr + off, virt + off, pga[i].count);
954                 else
955                         memcpy(virt + off, addr + off, pga[i].count);
956
957                 kunmap(lnb[i].page);
958                 kunmap(pga[i].pg);
959         }
960
961         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
962
963 out:
964         if (lnb)
965                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
966         if (rnb)
967                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
968         RETURN(ret);
969 }