Whamcloud - gitweb
6818ace08da9dacea7a8a6b336b2b48741c23d58
[fs/lustre-release.git] / lustre / llite / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Lite I/O Page Cache
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <linux/config.h>
25 #include <linux/kernel.h>
26 #include <linux/mm.h>
27 #include <linux/string.h>
28 #include <linux/stat.h>
29 #include <linux/errno.h>
30 #include <linux/smp_lock.h>
31 #include <linux/unistd.h>
32 #include <linux/version.h>
33 #include <asm/system.h>
34 #include <asm/uaccess.h>
35
36 #include <linux/fs.h>
37 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
38 #include <linux/buffer_head.h>
39 #else
40 #include <linux/iobuf.h>
41 #endif
42 #include <linux/stat.h>
43 #include <asm/uaccess.h>
44 #include <asm/segment.h>
45 #include <linux/mm.h>
46 #include <linux/pagemap.h>
47 #include <linux/smp_lock.h>
48
49 #define DEBUG_SUBSYSTEM S_LLITE
50
51 #include <linux/lustre_mds.h>
52 #include <linux/lustre_lite.h>
53 #include <linux/lustre_lib.h>
54
55 /*
56  * Remove page from dirty list
57  */
58 static void __set_page_clean(struct page *page)
59 {
60         struct address_space *mapping = page->mapping;
61         struct inode *inode;
62
63         if (!mapping)
64                 return;
65
66 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
67         spin_lock(&pagecache_lock);
68 #endif
69
70         list_del(&page->list);
71         list_add(&page->list, &mapping->clean_pages);
72
73         inode = mapping->host;
74         if (list_empty(&mapping->dirty_pages)) {
75                 CDEBUG(D_INODE, "inode clean\n");
76                 inode->i_state &= ~I_DIRTY_PAGES;
77         }
78 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
79         spin_unlock(&pagecache_lock);
80 #endif
81         EXIT;
82 }
83
84 inline void set_page_clean(struct page *page)
85 {
86         if (PageDirty(page)) {
87                 ClearPageDirty(page);
88                 __set_page_clean(page);
89         }
90 }
91
92 /* SYNCHRONOUS I/O to object storage for an inode */
93 static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
94 {
95         struct ll_inode_info *lli = ll_i2info(inode);
96         struct lov_stripe_md *lsm = lli->lli_smd;
97         struct obd_brw_set *set;
98         struct brw_page pg;
99         int rc;
100         ENTRY;
101
102         set = obd_brw_set_new();
103         if (set == NULL)
104                 RETURN(-ENOMEM);
105
106         pg.pg = page;
107         pg.off = ((obd_off)page->index) << PAGE_SHIFT;
108
109         if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > inode->i_size))
110                 pg.count = inode->i_size % PAGE_SIZE;
111         else
112                 pg.count = PAGE_SIZE;
113
114         CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
115               cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
116               pg.off, pg.off);
117         if (pg.count == 0) {
118                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
119                        LPU64"\n",
120                        inode->i_ino, inode, inode->i_size, page->mapping->host,
121                        page->mapping->host->i_size, page->index, pg.off);
122         }
123
124         pg.flag = create ? OBD_BRW_CREATE : 0;
125
126         set->brw_callback = ll_brw_sync_wait;
127         rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, set, NULL);
128         if (rc) {
129                 if (rc != -EIO)
130                         CERROR("error from obd_brw: rc = %d\n", rc);
131         } else {
132                 rc = ll_brw_sync_wait(set, CB_PHASE_START);
133                 if (rc)
134                         CERROR("error from callback: rc = %d\n", rc);
135         }
136         obd_brw_set_free(set);
137
138         RETURN(rc);
139 }
140
141 /* returns the page unlocked, but with a reference */
142 static int ll_readpage(struct file *file, struct page *page)
143 {
144         struct inode *inode = page->mapping->host;
145         obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
146         int rc = 0;
147         ENTRY;
148
149         if (!PageLocked(page))
150                 LBUG();
151
152         if (inode->i_size <= offset) {
153                 CERROR("reading beyond EOF\n");
154                 memset(kmap(page), 0, PAGE_SIZE);
155                 kunmap(page);
156                 GOTO(readpage_out, rc);
157         }
158
159         /* XXX Workaround for BA OSTs returning short reads at EOF.  The linux
160          *     OST will return the full page, zero-filled at the end, which
161          *     will just overwrite the data we set here.
162          *     Bug 593 relates to fixing this properly.
163          */
164         if (inode->i_size < offset + PAGE_SIZE) {
165                 int count = inode->i_size - offset;
166                 void *addr = kmap(page);
167                 //POISON(addr, 0x7c, count);
168                 memset(addr + count, 0, PAGE_SIZE - count);
169                 kunmap(page);
170         }
171
172         if (PageUptodate(page)) {
173                 CERROR("Explain this please?\n");
174                 GOTO(readpage_out, rc);
175         }
176
177         CDEBUG(D_VFSTRACE, "VFS Op\n");
178         rc = ll_brw(OBD_BRW_READ, inode, page, 0);
179         EXIT;
180
181  readpage_out:
182         if (!rc)
183                 SetPageUptodate(page);
184         unlock_page(page);
185         return 0;
186 } /* ll_readpage */
187
188 void ll_truncate(struct inode *inode)
189 {
190         struct obdo oa = {0};
191         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
192         struct lustre_handle lockh = { 0, 0 };
193         int err;
194         ENTRY;
195
196         if (!lsm) {
197                 /* object not yet allocated */
198                 inode->i_mtime = inode->i_ctime = CURRENT_TIME;
199                 return;
200         }
201
202         oa.o_id = lsm->lsm_object_id;
203         oa.o_mode = inode->i_mode;
204         oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
205
206         CDEBUG(D_VFSTRACE, "VFS Op\n");
207         CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
208                oa.o_id, inode->i_size);
209
210         err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh);
211         if (err) {
212                 CERROR("ll_size_lock failed: %d\n", err);
213                 return;
214         }
215
216         /* truncate == punch from new size to absolute end of file */
217         err = obd_punch(ll_i2obdconn(inode), &oa, lsm, inode->i_size,
218                         OBD_OBJECT_EOF, NULL);
219         if (err)
220                 CERROR("obd_truncate fails (%d) ino %lu\n", err, inode->i_ino);
221         else
222                 obdo_to_inode(inode, &oa, oa.o_valid);
223
224         err = ll_size_unlock(inode, lsm, LCK_PW, &lockh);
225         if (err)
226                 CERROR("ll_size_unlock failed: %d\n", err);
227
228         EXIT;
229         return;
230 } /* ll_truncate */
231
232 //#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
233
234 static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
235                             unsigned to)
236 {
237         struct inode *inode = page->mapping->host;
238         obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
239         int rc = 0;
240         char *addr;
241         ENTRY;
242
243         addr = kmap(page);
244         LASSERT(PageLocked(page));
245
246         if (PageUptodate(page))
247                 RETURN(0);
248
249         //POISON(addr + from, 0xca, to - from);
250
251         /* We're completely overwriting an existing page, so _don't_ set it up
252          * to date until commit_write */
253         if (from == 0 && to == PAGE_SIZE)
254                 RETURN(0);
255         CDEBUG(D_VFSTRACE, "VFS Op\n");
256
257         /* If are writing to a new page, no need to read old data.  If we
258          * haven't already gotten the file size in ll_file_write() since
259          * we got our extent lock, we need to verify it here before we
260          * overwrite some other node's write (bug 445).
261          */
262         if (inode->i_size <= offset) {
263                 if (!S_ISBLK(inode->i_mode) && !(file->f_flags & O_APPEND)) {
264                         struct ll_file_data *fd = file->private_data;
265                         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
266
267                         rc = ll_file_size(inode, lsm, fd->fd_ostdata);
268                         if (rc)
269                                 GOTO(prepare_done, rc);
270                 }
271                 if (inode->i_size <= offset) {
272                         memset(addr, 0, PAGE_SIZE);
273                         GOTO(prepare_done, rc=0);
274                 }
275         }
276
277         rc = ll_brw(OBD_BRW_READ, inode, page, 0);
278
279         EXIT;
280  prepare_done:
281         if (!rc)
282                 SetPageUptodate(page);
283         else
284                 kunmap (page);
285
286         return rc;
287 }
288
289 /* Write a page from kupdated or kswapd.
290  *
291  * We unlock the page even in the face of an error, otherwise dirty
292  * pages could OOM the system if they cannot be written.  Also, there
293  * is nobody to return an error code to from here - the application
294  * may not even be running anymore.
295  *
296  * Returns the page unlocked, but with a reference.
297  */
298 static int ll_writepage(struct page *page) {
299         struct inode *inode = page->mapping->host;
300         int err;
301         ENTRY;
302
303         LASSERT(PageLocked(page));
304
305         /* XXX need to make sure we have LDLM lock on this page */
306         CDEBUG(D_VFSTRACE, "VFS Op\n");
307         err = ll_brw(OBD_BRW_WRITE, inode, page, 1);
308         if (err)
309                 CERROR("ll_brw failure %d\n", err);
310         else
311                 set_page_clean(page);
312
313         unlock_page(page);
314         RETURN(err);
315 }
316
317
318 /* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
319  * too */
320 static int ll_commit_write(struct file *file, struct page *page,
321                            unsigned from, unsigned to)
322 {
323         struct inode *inode = page->mapping->host;
324         struct ll_inode_info *lli = ll_i2info(inode);
325         struct lov_stripe_md *md = lli->lli_smd;
326         struct brw_page pg;
327         struct obd_brw_set *set;
328         int rc, create = 1;
329         loff_t size;
330         ENTRY;
331
332         pg.pg = page;
333         pg.count = to;
334         /* XXX make the starting offset "from" */
335         pg.off = (((obd_off)page->index) << PAGE_SHIFT);
336         pg.flag = create ? OBD_BRW_CREATE : 0;
337
338         set = obd_brw_set_new();
339         if (set == NULL)
340                 RETURN(-ENOMEM);
341
342         SetPageUptodate(page);
343
344         if (!PageLocked(page))
345                 LBUG();
346
347         CDEBUG(D_VFSTRACE, "VFS Op\n");
348         CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n",
349                pg.off, pg.count);
350
351         set->brw_callback = ll_brw_sync_wait;
352         rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 1, &pg, set, NULL);
353         if (rc)
354                 CERROR("error from obd_brw: rc = %d\n", rc);
355         else {
356                 rc = ll_brw_sync_wait(set, CB_PHASE_START);
357                 if (rc)
358                         CERROR("error from callback: rc = %d\n", rc);
359         }
360         obd_brw_set_free(set);
361         kunmap(page);
362
363         size = pg.off + pg.count;
364         /* do NOT truncate when writing in the middle of a file */
365         if (size > inode->i_size)
366                 inode->i_size = size;
367
368         RETURN(rc);
369 } /* ll_commit_write */
370
371 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
372 static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
373                         unsigned long blocknr, int blocksize)
374 {
375         struct ll_inode_info *lli = ll_i2info(inode);
376         struct lov_stripe_md *lsm = lli->lli_smd;
377         struct brw_page *pga;
378         struct obd_brw_set *set;
379         loff_t offset;
380         int length, i, flags, rc = 0;
381         ENTRY;
382
383         CDEBUG(D_VFSTRACE, "VFS Op\n");
384         if (!lsm || !lsm->lsm_object_id)
385                 RETURN(-ENOMEM);
386
387         /* XXX Keep here until we find ia64 problem, it crashes otherwise */
388         if (blocksize != PAGE_SIZE) {
389                 CERROR("direct_IO blocksize != PAGE_SIZE\n");
390                 RETURN(-EINVAL);
391         }
392
393         set = obd_brw_set_new();
394         if (set == NULL)
395                 RETURN(-ENOMEM);
396
397         OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
398         if (!pga) {
399                 obd_brw_set_free(set);
400                 RETURN(-ENOMEM);
401         }
402
403         CDEBUG(D_PAGE, "blocksize %u, blocknr %lu, iobuf %p: nr_pages %u, "
404                        "array_len %u, offset %u, length %u\n",
405                blocksize, blocknr, iobuf, iobuf->nr_pages,
406                iobuf->array_len, iobuf->offset, iobuf->length);
407
408         flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
409         offset = (blocknr << inode->i_blkbits) /* + iobuf->offset? */;
410         length = iobuf->length;
411
412         for (i = 0, length = iobuf->length; length > 0;
413              length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/
414                 pga[i].pg = iobuf->maplist[i];
415                 pga[i].off = offset;
416                 /* To the end of the page, or the length, whatever is less */
417                 pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
418                                      length);
419                 pga[i].flag = flags;
420                 CDEBUG(D_PAGE, "page %d (%p), offset "LPU64", count %u\n",
421                        i, pga[i].pg, pga[i].off, pga[i].count);
422                 if (rw == READ) {
423                         //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
424                         //kunmap(iobuf->maplist[i]);
425                 }
426         }
427
428         set->brw_callback = ll_brw_sync_wait;
429         rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
430                      ll_i2obdconn(inode), lsm, iobuf->nr_pages, pga, set, NULL);
431         if (rc) {
432                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
433                        "error from obd_brw: rc = %d\n", rc);
434         } else {
435                 rc = ll_brw_sync_wait(set, CB_PHASE_START);
436                 if (rc)
437                         CERROR("error from callback: rc = %d\n", rc);
438         }
439         obd_brw_set_free(set);
440         if (rc == 0)
441                 rc = iobuf->length;
442
443         OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages);
444         RETURN(rc);
445 }
446 #endif
447
448 int ll_flush_inode_pages(struct inode * inode)
449 {
450         obd_count        bufs_per_obdo = 0;
451         obd_size         *count = NULL;
452         obd_off          *offset = NULL;
453         obd_flag         *flags = NULL;
454         int              err = 0;
455
456         ENTRY;
457
458 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
459         spin_lock(&pagecache_lock);
460
461         spin_unlock(&pagecache_lock);
462 #endif
463
464
465         OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
466         OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
467         OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
468         if (!count || !offset || !flags)
469                 GOTO(out, err=-ENOMEM);
470
471 #if 0
472         for (i = 0 ; i < bufs_per_obdo ; i++) {
473                 count[i] = PAGE_SIZE;
474                 offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
475                 flags[i] = OBD_BRW_CREATE;
476         }
477
478         err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
479                       ll_i2info(inode)->lli_smd, bufs_per_obdo,
480                       iobuf->maplist, count, offset, flags, NULL, NULL);
481         if (err == 0)
482                 err = bufs_per_obdo * 4096;
483 #endif
484  out:
485         OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
486         OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
487         OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
488         RETURN(err);
489 }
490
491 //#endif
492
493
494 struct address_space_operations ll_aops = {
495         readpage: ll_readpage,
496 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
497         direct_IO: ll_direct_IO,
498 #endif
499         writepage: ll_writepage,
500         sync_page: block_sync_page,
501         prepare_write: ll_prepare_write,
502         commit_write: ll_commit_write,
503         bmap: NULL
504 //#endif
505 };