Whamcloud - gitweb
- unland b_fid to HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include <linux/lustre_smfs.h>
37 #include <linux/lustre_snap.h>
38 #include "filter_internal.h"
39
40 static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
41                                  struct niobuf_local *lnb)
42
43 {
44         struct page *page;
45         ENTRY;
46  
47         page = alloc_pages(GFP_HIGHUSER, 0);
48         if (page == NULL) {
49                 CERROR("no memory for a temp page\n");
50                 lnb->rc = -ENOMEM;
51                 RETURN(-ENOMEM);
52         }
53
54 #if 0
55         POISON_PAGE(page, 0xf1);
56         if (lnb->len != PAGE_SIZE) {
57                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
58                 kunmap(page);
59         }
60 #endif
61         page->index = lnb->offset >> PAGE_SHIFT;
62
63         lnb->page = page;
64
65         RETURN(0);
66 }
67
68 void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
69                            int niocount, struct niobuf_local *res)
70 {
71         int i, j;
72
73         for (i = 0; i < objcount; i++, obj++) {
74                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
75                         if (res->page != NULL) {
76                                 __free_page(res->page);
77                                 res->page = NULL;
78                         }
79                 }
80         }
81 }
82
83 /* Grab the dirty and seen grant announcements from the incoming obdo.
84  * We will later calculate the clients new grant and return it.
85  * Caller must hold osfs lock */
86 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
87 {
88         struct filter_export_data *fed;
89         struct obd_device *obd = exp->exp_obd;
90         static unsigned long last_msg;
91         static int last_count;
92         int mask = D_CACHE;
93         ENTRY;
94
95         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
96
97         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
98                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
99                 oa->o_valid &= ~OBD_MD_FLGRANT;
100                 EXIT;
101                 return;
102         }
103
104         fed = &exp->exp_filter_data;
105
106         /* Don't print this to the console the first time it happens, since
107          * it can happen legitimately on occasion, but only rarely. */
108         if (time_after(jiffies, last_msg + 60 * HZ)) {
109                 last_count = 0;
110                 last_msg = jiffies;
111         }
112         if ((last_count & (-last_count)) == last_count)
113                 mask = D_WARNING;
114         last_count++;
115
116         /* Add some margin, since there is a small race if other RPCs arrive
117          * out-or-order and have already consumed some grant.  We want to
118          * leave this here in case there is a large error in accounting. */
119         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? mask:D_CACHE,
120                "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
121                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
122                oa->o_dropped, fed->fed_grant);
123
124         /* Update our accounting now so that statfs takes it into account.
125          * Note that fed_dirty is only approximate and can become incorrect
126          * if RPCs arrive out-of-order.  No important calculations depend
127          * on fed_dirty however. */
128         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
129         if (fed->fed_grant < oa->o_dropped) {
130                 CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
131                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
132                        oa->o_dropped, fed->fed_grant);
133                 oa->o_dropped = 0;
134         }
135         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
136                 CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
137                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
138                        oa->o_dropped, obd->u.filter.fo_tot_granted);
139                 oa->o_dropped = 0;
140         }
141         obd->u.filter.fo_tot_granted -= oa->o_dropped;
142         fed->fed_grant -= oa->o_dropped;
143         fed->fed_dirty = oa->o_dirty;
144         EXIT;
145 }
146
147 #define GRANT_FOR_LLOG(obd) 16
148
149 /* Figure out how much space is available between what we've granted
150  * and what remains in the filesystem.  Compensate for ext3 indirect
151  * block overhead when computing how much free space is left ungranted.
152  *
153  * Caller must hold obd_osfs_lock. */
154 obd_size filter_grant_space_left(struct obd_export *exp)
155 {
156         struct obd_device *obd = exp->exp_obd;
157         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
158         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
159         int rc, statfs_done = 0;
160
161         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
162
163         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
164 restat:
165                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
166                 if (rc) /* N.B. statfs can't really fail */
167                         RETURN(0);
168                 statfs_done = 1;
169         }
170
171         avail = obd->obd_osfs.os_bavail;
172         left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
173         if (left > GRANT_FOR_LLOG(obd)) {
174                 left = (left - GRANT_FOR_LLOG(obd)) << blockbits;
175         } else {
176                 left = 0 /* << blockbits */;
177         }
178
179         if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
180                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
181                 goto restat;
182         }
183
184         if (left >= tot_granted) {
185                 left -= tot_granted;
186         } else {
187                 static unsigned long next;
188                 if (left < tot_granted - obd->u.filter.fo_tot_pending &&
189                     time_after(jiffies, next)) {
190                         spin_unlock(&obd->obd_osfs_lock);
191                         CERROR("%s: cli %s/%p grant "LPU64" > available "
192                                LPU64" and pending "LPU64"\n", obd->obd_name,
193                                exp->exp_client_uuid.uuid, exp, tot_granted,
194                                left, obd->u.filter.fo_tot_pending);
195                         if (next == 0)
196                                 portals_debug_dumplog();
197                         next = jiffies + 20 * HZ;
198                         spin_lock(&obd->obd_osfs_lock);
199                 }
200                 left = 0;
201         }
202
203         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
204                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
205                exp->exp_client_uuid.uuid, exp,
206                obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
207                tot_granted, left, obd->u.filter.fo_tot_pending);
208
209         return left;
210 }
211
212 /* Calculate how much grant space to allocate to this client, based on how
213  * much space is currently free and how much of that is already granted.
214  *
215  * Caller must hold obd_osfs_lock. */
216 long filter_grant(struct obd_export *exp, obd_size current_grant,
217                   obd_size want, obd_size fs_space_left)
218 {
219         struct obd_device *obd = exp->exp_obd;
220         struct filter_export_data *fed = &exp->exp_filter_data;
221         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
222         __u64 grant = 0;
223
224         LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
225
226         /* Grant some fraction of the client's requested grant space so that
227          * they are not always waiting for write credits (not all of it to
228          * avoid overgranting in face of multiple RPCs in flight).  This
229          * essentially will be able to control the OSC_MAX_RIF for a client.
230          *
231          * If we do have a large disparity between what the client thinks it
232          * has and what we think it has, don't grant very much and let the
233          * client consume its grant first.  Either it just has lots of RPCs
234          * in flight, or it was evicted and its grants will soon be used up. */
235         if (current_grant < want &&
236             current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
237                 grant = min((want >> blockbits) / 2,
238                             (fs_space_left >> blockbits) / 8);
239                 grant <<= blockbits;
240
241                 if (grant) {
242                         if (grant > FILTER_GRANT_CHUNK)
243                                 grant = FILTER_GRANT_CHUNK;
244
245                         obd->u.filter.fo_tot_granted += grant;
246                         fed->fed_grant += grant;
247                 }
248         }
249
250         CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
251                obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
252         CDEBUG(D_CACHE,
253                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
254                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
255                exp, obd->u.filter.fo_tot_dirty,
256                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
257
258         return grant;
259 }
260
261
262 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
263                               int objcount, struct obd_ioobj *obj,
264                               int niocount, struct niobuf_remote *nb,
265                               struct niobuf_local *res,
266                               struct obd_trans_info *oti)
267 {
268         struct obd_device *obd = exp->exp_obd;
269         struct lvfs_run_ctxt saved;
270         struct niobuf_remote *rnb;
271         struct niobuf_local *lnb;
272         struct dentry *dentry = NULL;
273         struct inode *inode;
274         void *iobuf = NULL; 
275         int rc = 0, i, tot_bytes = 0;
276         unsigned long now = jiffies;
277         ENTRY;
278
279         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
280          * When we do this function's dentry cleanup will need to be fixed */
281         LASSERTF(objcount == 1, "%d\n", objcount);
282         LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
283
284         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
285                 spin_lock(&obd->obd_osfs_lock);
286                 filter_grant_incoming(exp, oa);
287
288                 oa->o_grant = 0;
289                 
290                 spin_unlock(&obd->obd_osfs_lock);
291         }
292
293         memset(res, 0, niocount * sizeof(*res));
294
295         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
296         rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
297         if (rc)
298                 GOTO(cleanup, rc);
299
300         dentry = filter_oa2dentry(obd, oa);
301         if (IS_ERR(dentry))
302                 GOTO(cleanup, rc = PTR_ERR(dentry));
303
304         if (dentry->d_inode == NULL) {
305                 CERROR("trying to BRW to non-existent file "LPU64"\n",
306                                obj->ioo_id);
307                 GOTO(cleanup, rc = -ENOENT);
308         }
309
310         inode = dentry->d_inode; 
311
312         if (time_after(jiffies, now + 15 * HZ))
313                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
314         else
315                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
316                        (jiffies - now));
317
318         for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
319              i++, rnb++, lnb++) {
320                 lnb->dentry = dentry;
321                 lnb->offset = rnb->offset;
322                 lnb->len    = rnb->len;
323                 lnb->flags  = rnb->flags;
324
325                 if (inode->i_size <= rnb->offset)
326                       /* If there's no more data, abort early.
327                       * lnb->page == NULL and lnb->rc == 0, so it's
328                       * easy to detect later. */
329                         break;
330                 else
331                         rc = filter_alloc_dio_page(obd, inode, lnb);
332                 if (rc) {
333                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
334                              "page err %u@"LPU64" %u/%u %p: rc %d\n",
335                               lnb->len, lnb->offset, i, obj->ioo_bufcnt,
336                               dentry, rc);
337                         GOTO(cleanup, rc);
338                 }
339
340                 if (inode->i_size < lnb->offset + lnb->len - 1)
341                         lnb->rc = inode->i_size - lnb->offset;
342                 else
343                         lnb->rc = lnb->len;
344
345                 tot_bytes += lnb->rc;
346
347                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
348         }
349
350         if (time_after(jiffies, now + 15 * HZ))
351                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
352         else
353                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
354                        (jiffies - now));
355
356         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
357                               NULL, NULL, NULL);
358         if (rc)
359                 GOTO(cleanup, rc);
360
361         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
362
363         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
364
365         EXIT;
366
367 cleanup:
368         if (rc != 0) {
369                 filter_free_dio_pages(objcount, obj, niocount, res);
370
371                 if (dentry != NULL)
372                         f_dput(dentry);
373                 else
374                         CERROR("NULL dentry in cleanup -- tell CFS\n");
375         }
376
377         if (iobuf != NULL)
378                 filter_free_iobuf(iobuf);
379
380         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
381         if (rc)
382                 CERROR("io error %d\n", rc);
383         return rc;
384 }
385
386 /* When clients have dirtied as much space as they've been granted they
387  * fall through to sync writes.  These sync writes haven't been expressed
388  * in grants and need to error with ENOSPC when there isn't room in the
389  * filesystem for them after grants are taken into account.  However,
390  * writeback of the dirty data that was already granted space can write
391  * right on through.
392  *
393  * Caller must hold obd_osfs_lock. */
394 static int filter_grant_check(struct obd_export *exp, int objcount,
395                               struct fsfilt_objinfo *fso, int niocount,
396                               struct niobuf_remote *rnb,
397                               struct niobuf_local *lnb, obd_size *left,
398                               struct inode *inode)
399 {
400         struct filter_export_data *fed = &exp->exp_filter_data;
401         int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
402         unsigned long used = 0, ungranted = 0, using;
403         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
404
405         LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
406
407         for (obj = 0; obj < objcount; obj++) {
408                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
409                         int tmp, bytes;
410
411                         /* FIXME: this is calculated with PAGE_SIZE on client */
412                         bytes = rnb[n].len;
413                         bytes += rnb[n].offset & (blocksize - 1);
414                         tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
415                         if (tmp)
416                                 bytes += blocksize - tmp;
417
418                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
419                                 if (fed->fed_grant < used + bytes) {
420                                         CDEBUG(D_CACHE,
421                                                "%s: cli %s/%p claims %ld+%d "
422                                                "GRANT, real grant %lu idx %d\n",
423                                                exp->exp_obd->obd_name,
424                                                exp->exp_client_uuid.uuid, exp,
425                                                used, bytes, fed->fed_grant, n);
426                                         mask = D_ERROR;
427                                 } else {
428                                         used += bytes;
429                                         rnb[n].flags |= OBD_BRW_GRANTED;
430                                         lnb[n].lnb_grant_used = bytes;
431                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
432                                         rc = 0;
433                                         continue;
434                                 }
435                         }
436                         if (*left > ungranted) {
437                                 /* if enough space, pretend it was granted */
438                                 ungranted += bytes;
439                                 rnb[n].flags |= OBD_BRW_GRANTED;
440                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
441                                 rc = 0;
442                                 continue;
443                         }
444
445                         /* We can't check for already-mapped blocks here, as
446                          * it requires dropping the osfs lock to do the bmap.
447                          * Instead, we return ENOSPC and in that case we need
448                          * to go through and verify if all of the blocks not
449                          * marked BRW_GRANTED are already mapped and we can
450                          * ignore this error. */
451                         lnb[n].rc = -ENOSPC;
452                         rnb[n].flags &= OBD_BRW_GRANTED;
453                         CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
454                                exp->exp_obd->obd_name,
455                                exp->exp_client_uuid.uuid, exp, n, bytes);
456                 }
457         }
458
459         /* Now substract what client have used already.  We don't subtract
460          * this from the tot_granted yet, so that other client's can't grab
461          * that space before we have actually allocated our blocks.  That
462          * happens in filter_grant_commit() after the writes are done. */
463         *left -= ungranted;
464         fed->fed_grant -= used;
465         fed->fed_pending += used;
466         exp->exp_obd->u.filter.fo_tot_pending += used;
467
468         CDEBUG(mask,
469                "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
470                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
471                ungranted, fed->fed_grant, fed->fed_dirty);
472
473         /* Rough calc in case we don't refresh cached statfs data */
474         using = (used + ungranted + 1 ) >>
475                 exp->exp_obd->u.filter.fo_sb->s_blocksize_bits;
476         if (exp->exp_obd->obd_osfs.os_bavail > using)
477                 exp->exp_obd->obd_osfs.os_bavail -= using;
478         else
479                 exp->exp_obd->obd_osfs.os_bavail = 0;
480
481         if (fed->fed_dirty < used) {
482                 CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
483                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
484                        used, fed->fed_dirty);
485                 used = fed->fed_dirty;
486         }
487         exp->exp_obd->u.filter.fo_tot_dirty -= used;
488         fed->fed_dirty -= used;
489
490         return rc;
491 }
492
493 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
494  * on mulitple inodes.  That isn't all, because there still exists the
495  * possibility of a truncate starting a new transaction while holding the ext3
496  * rwsem = write while some writes (which have started their transactions here)
497  * blocking on the ext3 rwsem = read => lock inversion.
498  *
499  * The handling gets very ugly when dealing with locked pages.  It may be easier
500  * to just get rid of the locked page code (which has problems of its own) and
501  * either discover we do not need it anymore (i.e. it was a symptom of another
502  * bug) or ensure we get the page locks in an appropriate order. */
503 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
504                                int objcount, struct obd_ioobj *obj,
505                                int niocount, struct niobuf_remote *nb,
506                                struct niobuf_local *res,
507                                struct obd_trans_info *oti)
508 {
509         struct lvfs_run_ctxt saved;
510         struct niobuf_remote *rnb;
511         struct niobuf_local *lnb = res;
512         struct fsfilt_objinfo fso;
513         struct dentry *dentry = NULL;
514         void *iobuf; 
515         obd_size left;
516         unsigned long now = jiffies;
517         int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
518         ENTRY;
519         LASSERT(objcount == 1);
520         LASSERT(obj->ioo_bufcnt > 0);
521
522         memset(res, 0, niocount * sizeof(*res));
523
524         rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
525         if (rc)
526                 GOTO(cleanup, rc);
527         cleanup_phase = 1;
528
529         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
530         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
531                                    obj->ioo_id);
532         if (IS_ERR(dentry))
533                 GOTO(cleanup, rc = PTR_ERR(dentry));
534         
535         cleanup_phase = 2;
536         
537         if (dentry->d_inode == NULL) {
538                 CERROR("trying to BRW to non-existent file "LPU64"\n",
539                        obj->ioo_id);
540                 f_dput(dentry);
541                 GOTO(cleanup, rc = -ENOENT);
542         }
543
544         fso.fso_dentry = dentry;
545         fso.fso_bufcnt = obj->ioo_bufcnt;
546
547         if (time_after(jiffies, now + 15 * HZ))
548                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
549         else
550                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
551                        (jiffies - now));
552
553         spin_lock(&exp->exp_obd->obd_osfs_lock);
554         if (oa)
555                 filter_grant_incoming(exp, oa);
556         
557         cleanup_phase = 3;
558
559         left = filter_grant_space_left(exp);
560
561         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
562                                 &left, dentry->d_inode);
563         if (oa && oa->o_valid & OBD_MD_FLGRANT)
564                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
565
566         spin_unlock(&exp->exp_obd->obd_osfs_lock);
567
568         if (rc) 
569                 GOTO(cleanup, rc);
570
571         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
572              i++, lnb++, rnb++) {
573                 /* We still set up for ungranted pages so that granted pages
574                  * can be written to disk as they were promised, and portals
575                  * needs to keep the pages all aligned properly. */
576                 lnb->dentry = dentry;
577                 lnb->offset = rnb->offset;
578                 lnb->len    = rnb->len;
579                 lnb->flags  = rnb->flags;
580
581                 rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
582                 if (rc) {
583                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
584                                lnb->len, lnb->offset,
585                                i, obj->ioo_bufcnt, dentry, rc);
586                         GOTO(cleanup, rc);
587                 }
588                 cleanup_phase = 4;
589
590                 /* If the filter writes a partial page, then has the file
591                  * extended, the client will read in the whole page.  the
592                  * filter has to be careful to zero the rest of the partial
593                  * page on disk.  we do it by hand for partial extending
594                  * writes, send_bio() is responsible for zeroing pages when
595                  * asked to read unmapped blocks -- brw_kiovec() does this. */
596                 if (lnb->len != PAGE_SIZE) {
597                         if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
598                                 filter_iobuf_add_page(exp->exp_obd, iobuf,
599                                                       dentry->d_inode,
600                                                       lnb->page);
601                         } else {
602                                 memset(kmap(lnb->page) + lnb->len, 0,
603                                        PAGE_SIZE - lnb->len);
604                                 kunmap(lnb->page);
605                         }
606                 }
607                 if (lnb->rc == 0)
608                         tot_bytes += lnb->len;
609         }
610
611         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
612                               NULL, NULL, NULL);
613         
614         if (time_after(jiffies, now + 15 * HZ))
615                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
616         else
617                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
618                        (jiffies - now));
619
620         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
621                             tot_bytes);
622         EXIT;
623 cleanup:
624         switch(cleanup_phase) {
625         case 4:
626                 if (rc)
627                         filter_free_dio_pages(objcount, obj, niocount, res);
628         case 3:
629                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
630                 filter_free_iobuf(iobuf);
631         case 2:
632                 if (rc)
633                         f_dput(dentry);
634                 break;
635         case 1:
636                 spin_lock(&exp->exp_obd->obd_osfs_lock);
637                 if (oa)
638                         filter_grant_incoming(exp, oa);
639                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
640                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
641                 filter_free_iobuf(iobuf);
642                 break;
643         default:;
644         
645         }
646         RETURN(rc);
647 }
648
649 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
650                   int objcount, struct obd_ioobj *obj, int niocount,
651                   struct niobuf_remote *nb, struct niobuf_local *res,
652                   struct obd_trans_info *oti)
653 {
654         if (cmd == OBD_BRW_WRITE)
655                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
656                                            niocount, nb, res, oti);
657
658         if (cmd == OBD_BRW_READ)
659                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
660                                           niocount, nb, res, oti);
661
662         LBUG();
663         return -EPROTO;
664 }
665
666 void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
667                               struct page *page)
668 {
669         int drop = 0;
670
671         if (inode != NULL &&
672             (inode->i_size > filter->fo_readcache_max_filesize))
673                 drop = 1;
674
675         /* drop from cache like truncate_list_pages() */
676         if (drop && !TryLockPage(page)) {
677                 if (page->mapping)
678                         ll_truncate_complete_page(page);
679                 unlock_page(page);
680         }
681         page_cache_release(page);
682 }
683
684 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
685                                 int objcount, struct obd_ioobj *obj,
686                                 int niocount, struct niobuf_local *res,
687                                 struct obd_trans_info *oti, int rc)
688 {
689         struct inode *inode = NULL;
690         ENTRY;
691
692         if (res->dentry != NULL)
693                 inode = res->dentry->d_inode;
694
695         filter_free_dio_pages(objcount, obj, niocount, res);
696         
697         if (res->dentry != NULL)
698                 f_dput(res->dentry);
699         RETURN(rc);
700 }
701
702 void flip_into_page_cache(struct inode *inode, struct page *new_page)
703 {
704         struct page *old_page;
705         int rc;
706
707         do {
708                 /* the dlm is protecting us from read/write concurrency, so we
709                  * expect this find_lock_page to return quickly.  even if we
710                  * race with another writer it won't be doing much work with
711                  * the page locked.  we do this 'cause t_c_p expects a
712                  * locked page, and it wants to grab the pagecache lock
713                  * as well. */
714                 old_page = find_lock_page(inode->i_mapping, new_page->index);
715                 if (old_page) {
716                         ll_truncate_complete_page(old_page);
717                         unlock_page(old_page);
718                         page_cache_release(old_page);
719                 }
720
721 #if 0 /* this should be a /proc tunable someday */
722                 /* racing o_directs (no locking ioctl) could race adding
723                  * their pages, so we repeat the page invalidation unless
724                  * we successfully added our new page */
725                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
726                                               new_page->index,
727                                               page_hash(inode->i_mapping,
728                                                         new_page->index));
729                 if (rc == 0) {
730                         /* add_to_page_cache clears uptodate|dirty and locks
731                          * the page */
732                         SetPageUptodate(new_page);
733                         unlock_page(new_page);
734                 }
735 #else
736                 rc = 0;
737 #endif
738         } while (rc != 0);
739 }
740
741 void filter_grant_commit(struct obd_export *exp, int niocount,
742                          struct niobuf_local *res)
743 {
744         struct filter_obd *filter = &exp->exp_obd->u.filter;
745         struct niobuf_local *lnb = res;
746         unsigned long pending = 0;
747         int i;
748
749         spin_lock(&exp->exp_obd->obd_osfs_lock);
750         for (i = 0, lnb = res; i < niocount; i++, lnb++)
751                 pending += lnb->lnb_grant_used;
752
753         LASSERTF(exp->exp_filter_data.fed_pending >= pending,
754                  "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
755                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
756                  exp->exp_filter_data.fed_pending, pending);
757         exp->exp_filter_data.fed_pending -= pending;
758         LASSERTF(filter->fo_tot_granted >= pending,
759                  "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
760                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
761                  exp->exp_obd->u.filter.fo_tot_granted, pending);
762         filter->fo_tot_granted -= pending;
763         LASSERTF(filter->fo_tot_pending >= pending,
764                  "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
765                  exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
766                  filter->fo_tot_pending, pending);
767         filter->fo_tot_pending -= pending;
768
769         spin_unlock(&exp->exp_obd->obd_osfs_lock);
770 }
771 int filter_do_cow(struct obd_export *exp, struct obd_ioobj *obj,
772                   int nioo, struct niobuf_remote *rnb)
773 {
774         struct dentry *dentry;
775         struct lvfs_run_ctxt saved;
776         struct write_extents *extents = NULL;
777         int j, rc = 0, numexts = 0, flags = 0;
778
779         ENTRY;
780
781         LASSERT(nioo == 1);
782
783         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
784         
785         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
786                                    obj->ioo_id);
787         if (IS_ERR(dentry)) {
788                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
789                 RETURN (PTR_ERR(dentry));
790         }
791
792         if (dentry->d_inode == NULL) {
793                 CERROR("trying to write extents to non-existent file "LPU64"\n",
794                        obj->ioo_id);
795                 GOTO(cleanup, rc = -ENOENT);
796         }
797         
798         flags = fsfilt_get_fs_flags(exp->exp_obd, dentry);
799         if (!(flags & SM_DO_COW)) {
800                 GOTO(cleanup, rc);
801         }
802         OBD_ALLOC(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); 
803         if (!extents) {
804                 CERROR("No Memory\n");
805                 GOTO(cleanup, rc = -ENOMEM);
806         }
807         for (j = 0; j < obj->ioo_bufcnt; j++) {
808                 if (rnb[j].len != 0) {
809                         extents[numexts].w_count = rnb[j].len;
810                         extents[numexts].w_pos = rnb[j].offset;
811                         numexts++;
812                 } 
813         } 
814         rc = fsfilt_do_write_cow(exp->exp_obd, dentry, extents, numexts);
815         if (rc) {
816                 CERROR("Do cow error id "LPU64" rc:%d \n",
817                         obj->ioo_id, rc);
818                 GOTO(cleanup, rc); 
819         }
820         
821 cleanup:
822         if (extents) {
823                 OBD_FREE(extents, obj->ioo_bufcnt * sizeof(struct write_extents));
824         }
825         f_dput(dentry);
826         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
827         RETURN(rc);
828
829 }
830 int filter_write_extents(struct obd_export *exp, struct obd_ioobj *obj, int nobj,
831                          int niocount, struct niobuf_local *local, int rc)
832 {
833         struct lvfs_run_ctxt saved;
834         struct dentry *dentry;
835         struct niobuf_local *lnb;
836         __u64  offset = 0;
837         __u32  len = 0;
838         int    i, flags; 
839  
840         ENTRY;
841
842         LASSERT(nobj == 1);
843
844         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
845
846         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
847                                    obj->ioo_id);
848         if (IS_ERR(dentry)) {
849                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
850                 RETURN (PTR_ERR(dentry));
851         }
852
853         if (dentry->d_inode == NULL) {
854                 CERROR("trying to write extents to non-existent file "LPU64"\n",
855                        obj->ioo_id);
856                 GOTO(cleanup, rc = -ENOENT);
857         }
858         
859         flags = fsfilt_get_fs_flags(exp->exp_obd, dentry);
860         if (!(flags & SM_DO_REC)) {
861                 GOTO(cleanup, rc);
862         }
863
864         for (i = 0, lnb = local; i < obj->ioo_bufcnt; i++, lnb++) {
865                 if (len == 0) {
866                         offset = lnb->offset;
867                         len = lnb->len;
868                 } else if (lnb->offset == (offset + len)) {
869                         len += lnb->len;
870                 } else {
871                         rc = fsfilt_write_extents(exp->exp_obd, dentry, 
872                                                   offset, len);
873                         if (rc) {
874                                 CERROR("write exts off "LPU64" num %u rc:%d\n",
875                                         offset, len, rc);
876                                 GOTO(cleanup, rc);
877                         }
878                         offset = lnb->offset;
879                         len = lnb->len; 
880                 } 
881         }
882         if (len > 0) {
883                 rc = fsfilt_write_extents(exp->exp_obd, dentry, 
884                                           offset, len);
885                 if (rc) {
886                         CERROR("write exts off "LPU64" num %u rc:%d\n",
887                                 offset, len, rc);
888                         GOTO(cleanup, rc);
889                 }
890         }
891 cleanup:
892         f_dput(dentry);
893         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
894         RETURN(rc);
895 }
896
897 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
898                     int objcount, struct obd_ioobj *obj, int niocount,
899                     struct niobuf_local *res, struct obd_trans_info *oti,int rc)
900 {
901         if (cmd == OBD_BRW_WRITE)
902                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
903                                              res, oti, rc);
904         if (cmd == OBD_BRW_READ)
905                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
906                                             res, oti, rc);
907         LBUG();
908         return -EPROTO;
909 }
910
911 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
912                struct lov_stripe_md *lsm, obd_count oa_bufs,
913                struct brw_page *pga, struct obd_trans_info *oti)
914 {
915         struct obd_ioobj ioo;
916         struct niobuf_local *lnb;
917         struct niobuf_remote *rnb;
918         obd_count i;
919         int ret = 0;
920         ENTRY;
921
922         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
923         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
924
925         if (lnb == NULL || rnb == NULL)
926                 GOTO(out, ret = -ENOMEM);
927
928         for (i = 0; i < oa_bufs; i++) {
929                 rnb[i].offset = pga[i].disk_offset;
930                 rnb[i].len = pga[i].count;
931         }
932
933         obdo_to_ioobj(oa, &ioo);
934         ioo.ioo_bufcnt = oa_bufs;
935
936         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
937         if (ret != 0)
938                 GOTO(out, ret);
939
940         for (i = 0; i < oa_bufs; i++) {
941                 void *virt;
942                 obd_off off;
943                 void *addr;
944
945                 if (lnb[i].page == NULL)
946                         break;
947
948                 off = pga[i].disk_offset & ~PAGE_MASK;
949                 virt = kmap(pga[i].pg);
950                 addr = kmap(lnb[i].page);
951
952                 /* 2 kmaps == vanishingly small deadlock opportunity */
953
954                 if (cmd & OBD_BRW_WRITE)
955                         memcpy(addr + off, virt + off, pga[i].count);
956                 else
957                         memcpy(virt + off, addr + off, pga[i].count);
958
959                 kunmap(lnb[i].page);
960                 kunmap(pga[i].pg);
961         }
962
963         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
964
965 out:
966         if (lnb)
967                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
968         if (rnb)
969                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
970         RETURN(ret);
971 }