Whamcloud - gitweb
Land b_smallfix onto HEAD (20040223_1817)
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32
33 #include <sysio.h>
34 #include <fs.h>
35 #include <mount.h>
36 #include <inode.h>
37 #include <file.h>
38
39 #undef LIST_HEAD
40
41 #include "llite_lib.h"
42
43 static int llu_extent_lock_callback(struct ldlm_lock *lock,
44                                     struct ldlm_lock_desc *new, void *data,
45                                     int flag)
46 {
47         struct lustre_handle lockh = { 0 };
48         int rc;
49         ENTRY;
50         
51
52         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
53                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
54                 LBUG();
55         }
56         
57         switch (flag) {
58         case LDLM_CB_BLOCKING:
59                 ldlm_lock2handle(lock, &lockh);
60                 rc = ldlm_cli_cancel(&lockh);
61                 if (rc != ELDLM_OK)
62                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
63                 break;
64         case LDLM_CB_CANCELING: {
65                 struct inode *inode = llu_inode_from_lock(lock);
66                 struct llu_inode_info *lli;
67                 
68                 if (!inode)
69                         RETURN(0);
70                 lli= llu_i2info(inode);
71                 if (!lli) {
72                         I_RELE(inode);
73                         RETURN(0);
74                 }
75                 if (!lli->lli_smd) {
76                         I_RELE(inode);
77                         RETURN(0);
78                 }
79
80 /*
81                 ll_pgcache_remove_extent(inode, lli->lli_smd, lock);
82                 iput(inode);
83 */
84                 I_RELE(inode);
85                 break;
86         }
87         default:
88                 LBUG();
89         }
90         
91         RETURN(0);
92 }
93
94 int llu_extent_lock_no_validate(struct ll_file_data *fd,
95                                 struct inode *inode,
96                                 struct lov_stripe_md *lsm,
97                                 int mode,
98                                 struct ldlm_extent *extent,
99                                 struct lustre_handle *lockh,
100                                 int ast_flags)
101 {
102         struct llu_sb_info *sbi = llu_i2sbi(inode);
103         struct llu_inode_info *lli = llu_i2info(inode);
104         int rc;
105         ENTRY;
106
107         LASSERT(lockh->cookie == 0);
108
109         /* XXX phil: can we do this?  won't it screw the file size up? */
110         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
111             (sbi->ll_flags & LL_SBI_NOLCK))
112                 RETURN(0);
113
114         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
115                lli->lli_st_ino, extent->start, extent->end);
116
117         rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
118                          sizeof(extent), mode, &ast_flags,
119                          llu_extent_lock_callback, inode, lockh);
120
121         RETURN(rc);
122 }
123
124 /*
125  * this grabs a lock and manually implements behaviour that makes it look like
126  * the OST is returning the file size with each lock acquisition.
127  */
128 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
129                     struct lov_stripe_md *lsm, int mode,
130                     struct ldlm_extent *extent, struct lustre_handle *lockh)
131 {
132         struct llu_inode_info *lli = llu_i2info(inode);
133         struct obd_export *exp = llu_i2obdexp(inode);
134         struct ldlm_extent size_lock;
135         struct lustre_handle match_lockh = {0};
136         int flags, rc, matched;
137         ENTRY;
138
139         rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
140         if (rc != ELDLM_OK)
141                 RETURN(rc);
142
143         if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
144                 RETURN(0);
145
146         rc = llu_inode_getattr(inode, lsm);
147         if (rc) {
148                 llu_extent_unlock(fd, inode, lsm, mode, lockh);
149                 RETURN(rc);
150         }
151
152         size_lock.start = lli->lli_st_size;
153         size_lock.end = OBD_OBJECT_EOF;
154
155         /* XXX I bet we should be checking the lock ignore flags.. */
156         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
157         matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
158                             sizeof(size_lock), LCK_PR, &flags, inode,
159                             &match_lockh);
160
161         /* hey, alright, we hold a size lock that covers the size we 
162          * just found, its not going to change for a while.. */
163         if (matched == 1) {
164                 set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
165                 obd_cancel(exp, lsm, LCK_PR, &match_lockh);
166         } 
167
168         RETURN(0);
169 }
170
171 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
172                 struct lov_stripe_md *lsm, int mode,
173                 struct lustre_handle *lockh)
174 {
175         struct llu_sb_info *sbi = llu_i2sbi(inode);
176         int rc;
177         ENTRY;
178 #if 0
179         /* XXX phil: can we do this?  won't it screw the file size up? */
180         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
181             (sbi->ll_flags & LL_SBI_NOLCK))
182                 RETURN(0);
183 #endif
184         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
185
186         RETURN(rc);
187 }
188
/* value stored in ll_async_page::llap_magic to recognise a valid descriptor */
#define LLAP_MAGIC 12346789

/*
 * Per-page descriptor handed to the OBD async page API as callback data.
 */
struct ll_async_page {
        /* set to LLAP_MAGIC before the descriptor is registered */
        int llap_magic;
        /* cookie filled in by obd_prep_async_page(), NULL when unset */
        void *llap_cookie;
        /* 1 once queued for group i/o, reset to 0 on completion */
        int llap_queued;
        /* page this descriptor belongs to */
        struct page *llap_page;
        /* inode the i/o is performed on */
        struct inode *llap_inode;
};
198
199 static struct ll_async_page *llap_from_cookie(void *cookie)
200 {
201         struct ll_async_page *llap = cookie;
202         if (llap->llap_magic != LLAP_MAGIC)
203                 return ERR_PTR(-EINVAL);
204         return llap;
205 };
206
207 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
208 {
209         struct ll_async_page *llap;
210         struct inode *inode;
211         struct lov_stripe_md *lsm;
212         obd_flag valid_flags;
213         ENTRY;
214
215         llap = llap_from_cookie(data);
216         if (IS_ERR(llap)) {
217                 EXIT;
218                 return;
219         }
220
221         inode = llap->llap_inode;
222         lsm = llu_i2info(inode)->lli_smd;
223
224         oa->o_id = lsm->lsm_object_id;
225         oa->o_valid = OBD_MD_FLID;
226         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
227         if (cmd == OBD_BRW_WRITE)
228                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
229
230         obdo_from_inode(oa, inode, valid_flags);
231         EXIT;
232 }
233
234 /* called for each page in a completed rpc.*/
235 static void llu_ap_completion(void *data, int cmd, int rc)
236 {
237         struct ll_async_page *llap;
238         struct page *page;
239
240         llap = llap_from_cookie(data);
241         if (IS_ERR(llap)) {
242                 EXIT;
243                 return;
244         }
245
246         llap->llap_queued = 0;
247         page = llap->llap_page;
248
249         if (rc != 0) {
250                 if (cmd == OBD_BRW_WRITE)
251                         CERROR("writeback error on page %p index %ld: %d\n", 
252                                page, page->index, rc);
253         }
254         EXIT;
255 }
256
257 static struct obd_async_page_ops llu_async_page_ops = {
258         .ap_make_ready =        NULL,
259         .ap_refresh_count =     NULL,
260         .ap_fill_obdo =         llu_ap_fill_obdo,
261         .ap_completion =        llu_ap_completion,
262 };
263
264 static
265 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
266 {
267         struct llu_sysio_cookie *cookie;
268         int rc;
269
270         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
271         if (cookie == NULL)
272                 goto out;
273
274         I_REF(inode);
275         cookie->lsc_inode = inode;
276         cookie->lsc_maxpages = maxpages;
277         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
278         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
279
280         rc = oig_init(&cookie->lsc_oig);
281         if (rc) {
282                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
283                 cookie = NULL;
284         }
285
286 out:
287         return cookie;
288 }
289
290 static
291 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
292 {
293         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
294         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
295         struct ll_async_page *llap = cookie->lsc_llap;
296 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
297         struct page *pages = cookie->lsc_pages;
298 #endif
299         int i;
300
301         for (i = 0; i< cookie->lsc_maxpages; i++) {
302                 if (llap[i].llap_cookie)
303                         obd_teardown_async_page(exp, lsm, NULL,
304                                                 llap[i].llap_cookie);
305 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
306                 if (pages[i]._managed) {
307                         free(pages[i].addr);
308                         pages[i]._managed = 0;
309                 }
310 #endif
311         }
312
313         I_RELE(cookie->lsc_inode);
314
315         oig_release(cookie->lsc_oig);
316         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
317 }
318
319 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
320 /* Note: these code should be removed finally, don't need
321  * more cleanup
322  */
323 static
324 int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
325 {
326         struct inode *inode = cookie->lsc_inode;
327         struct llu_inode_info *lli = llu_i2info(inode);
328         struct lov_stripe_md *lsm = lli->lli_smd;
329         struct obdo oa;
330         struct page *pages = cookie->lsc_pages;
331         int i, pgidx[2] = {0, cookie->lsc_npages-1};
332         int rc;
333         ENTRY;
334
335         for (i = 0; i < 2; i++) {
336                 struct page *oldpage = &pages[pgidx[i]];
337                 struct page newpage;
338                 struct brw_page pg;
339                 char *newbuf;
340
341                 if (i == 0 && pgidx[0] == pgidx[1])
342                         continue;
343
344                 LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);
345
346                 if (oldpage->_count == PAGE_CACHE_SIZE)
347                         continue;
348
349                 if (oldpage->index << PAGE_CACHE_SHIFT >=
350                     lli->lli_st_size)
351                         continue;
352
353                 newbuf = malloc(PAGE_CACHE_SIZE);
354                 if (!newbuf)
355                         return -ENOMEM;
356
357                 newpage.index = oldpage->index;
358                 newpage.addr = newbuf;
359
360                 pg.pg = &newpage;
361                 pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
362                 if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
363                         pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
364                 else
365                         pg.count = PAGE_CACHE_SIZE;
366                 pg.flag = 0;
367
368                 oa.o_id = lsm->lsm_object_id;
369                 oa.o_mode = lli->lli_st_mode;
370                 oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
371
372                 /* issue read */
373                 rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
374                 if (rc) {
375                         free(newbuf);
376                         RETURN(rc);
377                 }
378
379                 /* copy page content, and reset page params */
380                 memcpy(newbuf + oldpage->_offset,
381                        (char*)oldpage->addr + oldpage->_offset,
382                        oldpage->_count);
383
384                 oldpage->addr = newbuf;
385                 if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
386                     oldpage->_offset + oldpage->_count) > lli->lli_st_size)
387                         oldpage->_count += oldpage->_offset;
388                 else
389                         oldpage->_count = PAGE_CACHE_SIZE;
390                 oldpage->_offset = 0;
391                 oldpage->_managed = 1;
392         }
393
394         RETURN(0);
395 }
396 #endif
397
398 static
399 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
400                       char *buf, loff_t pos, size_t count)
401 {
402         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
403         struct lov_stripe_md *lsm = lli->lli_smd;
404         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
405         struct page *pages = cookie->lsc_pages;
406         struct ll_async_page *llap = cookie->lsc_llap;
407         int i, rc, npages = 0;
408         ENTRY;
409
410         if (!exp)
411                 RETURN(-EINVAL);
412
413         /* prepare the pages array */
414         do {
415                 unsigned long index, offset, bytes;
416
417                 offset = (pos & ~PAGE_CACHE_MASK);
418                 index = pos >> PAGE_CACHE_SHIFT;
419                 bytes = PAGE_CACHE_SIZE - offset;
420                 if (bytes > count)
421                         bytes = count;
422
423                 /* prevent read beyond file range */
424                 if ((cmd == OBD_BRW_READ) &&
425                     (pos + bytes) >= lli->lli_st_size) {
426                         if (pos >= lli->lli_st_size)
427                                 break;
428                         bytes = lli->lli_st_size - pos;
429                 }
430
431                 /* prepare page for this index */
432                 pages[npages].index = index;
433                 pages[npages].addr = buf - offset;
434
435                 pages[npages]._offset = offset;
436                 pages[npages]._count = bytes;
437
438                 npages++;
439                 count -= bytes;
440                 pos += bytes;
441                 buf += bytes;
442
443                 cookie->lsc_rwcount += bytes;
444         } while (count);
445
446         cookie->lsc_npages = npages;
447
448 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
449         if (cmd == OBD_BRW_WRITE) {
450                 rc = prepare_unaligned_write(cookie);
451                 if (rc)
452                         RETURN(rc);
453         }
454 #endif
455
456         for (i = 0; i < npages; i++) {
457                 llap[i].llap_magic = LLAP_MAGIC;
458                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
459                                          (obd_off)pages[i].index << PAGE_SHIFT,
460                                          &llu_async_page_ops,
461                                          &llap[i], &llap[i].llap_cookie);
462                 if (rc) {
463                         llap[i].llap_cookie = NULL;
464                         RETURN(rc);
465                 }
466                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
467                        &llap[i], &pages[i], llap[i].llap_cookie,
468                        (obd_off)pages[i].index << PAGE_SHIFT);
469                 pages[i].private = (unsigned long)&llap[i];
470                 llap[i].llap_page = &pages[i];
471                 llap[i].llap_inode = cookie->lsc_inode;
472
473                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
474                                         llap[i].llap_cookie, cmd,
475                                         pages[i]._offset, pages[i]._count, 0,
476                                         ASYNC_READY | ASYNC_URGENT |
477                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
478                 if (rc)
479                         RETURN(rc);
480
481                 llap[i].llap_queued = 1;
482         }
483
484         RETURN(0);
485 }
486
487 static
488 int llu_start_async_io(struct llu_sysio_cookie *cookie)
489 {
490         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
491         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
492
493         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
494 }
495
496 /*
497  * read/write a continuous buffer for an inode (zero-copy)
498  */
499 struct llu_sysio_cookie*
500 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
501 {
502         struct llu_sysio_cookie *cookie;
503         int max_pages, rc;
504         ENTRY;
505
506         max_pages = (count >> PAGE_SHIFT) + 2;
507
508         cookie = get_sysio_cookie(inode, max_pages);
509         if (!cookie)
510                 RETURN(ERR_PTR(-ENOMEM));
511
512         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
513         if (rc)
514                 GOTO(out_cleanup, rc);
515
516         rc = llu_start_async_io(cookie);
517         if (rc)
518                 GOTO(out_cleanup, rc);
519
520 /*
521         rc = oig_wait(&oig);
522         if (rc) {
523                 CERROR("file i/o error!\n");
524                 rw_count = rc;
525         }
526 */
527         RETURN(cookie);
528
529 out_cleanup:
530         put_sysio_cookie(cookie);
531         RETURN(ERR_PTR(rc));
532 }
533
534 struct llu_sysio_callback_args*
535 llu_file_write(struct inode *inode, const struct iovec *iovec,
536                size_t iovlen, loff_t pos)
537 {
538         struct llu_inode_info *lli = llu_i2info(inode);
539         struct ll_file_data *fd = lli->lli_file_data;
540         struct lustre_handle lockh = {0};
541         struct lov_stripe_md *lsm = lli->lli_smd;
542         struct llu_sysio_callback_args *lsca;
543         struct llu_sysio_cookie *cookie;
544         struct ldlm_extent extent;
545         ldlm_error_t err;
546         int iovidx;
547         ENTRY;
548
549         /* XXX consider other types later */
550         if (!S_ISREG(lli->lli_st_mode))
551                 LBUG();
552
553         LASSERT(iovlen <= MAX_IOVEC);
554
555         OBD_ALLOC(lsca, sizeof(*lsca));
556         if (!lsca)
557                 RETURN(ERR_PTR(-ENOMEM));
558
559         /* FIXME optimize the following extent locking */
560         for (iovidx = 0; iovidx < iovlen; iovidx++) {
561                 char *buf = (char*)iovec[iovidx].iov_base;
562                 size_t count = iovec[iovidx].iov_len;
563
564                 if (count == 0)
565                         continue;
566
567                 /* FIXME libsysio haven't handle O_APPEND */
568                 extent.start = pos;
569                 extent.end = pos + count - 1;
570
571 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
572                 if ((pos & ~PAGE_CACHE_MASK) == 0 &&
573                     (count & ~PAGE_CACHE_MASK) == 0)
574                         err = llu_extent_lock_no_validate(fd, inode, lsm,
575                                                 LCK_PW, &extent, &lockh, 0);
576                 else
577                         err = llu_extent_lock(fd, inode, lsm, LCK_PW,
578                                                 &extent, &lockh);
579 #else
580                 /* server will handle partial write, so we don't
581                  * care for file size here */
582                 err = llu_extent_lock_no_validate(fd, inode, lsm, LCK_PW,
583                                                 &extent, &lockh, 0);
584 #endif
585                 if (err != ELDLM_OK)
586                         GOTO(err_out, err = -ENOLCK);
587
588                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
589                        lli->lli_st_ino, count, pos);
590
591                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
592                 if (!IS_ERR(cookie)) {
593                         /* save cookie */
594                         lsca->cookies[lsca->ncookies++] = cookie;
595                         pos += count;
596                         /* file size grow. XXX should be done here? */
597                         if (pos > lli->lli_st_size) {
598                                 lli->lli_st_size = pos;
599                                 set_bit(LLI_F_PREFER_EXTENDED_SIZE,
600                                         &lli->lli_flags);
601                         }
602                 } else {
603                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
604                         GOTO(err_out, err = PTR_ERR(cookie));
605                 }
606
607                 /* XXX errors? */
608                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
609                 if (err)
610                         CERROR("extent unlock error %d\n", err);
611         }
612
613         RETURN(lsca);
614
615 err_out:
616         /* teardown all async stuff */
617         while (lsca->ncookies--) {
618                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
619         }
620         OBD_FREE(lsca, sizeof(*lsca));
621
622         RETURN(ERR_PTR(err));
623 }
624
625 #if 0
626 static void llu_update_atime(struct inode *inode)
627 {
628         struct llu_inode_info *lli = llu_i2info(inode);
629
630 #ifdef USE_ATIME
631         struct iattr attr;
632
633         attr.ia_atime = LTIME_S(CURRENT_TIME);
634         attr.ia_valid = ATTR_ATIME;
635
636         if (lli->lli_st_atime == attr.ia_atime) return;
637         if (IS_RDONLY(inode)) return;
638         if (IS_NOATIME(inode)) return;
639
640         /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
641         llu_inode_setattr(inode, &attr, 0);
642 #else
643         /* update atime, but don't explicitly write it out just this change */
644         inode->i_atime = CURRENT_TIME;
645 #endif
646 }
647 #endif
648
649 struct llu_sysio_callback_args*
650 llu_file_read(struct inode *inode, const struct iovec *iovec,
651                        size_t iovlen, loff_t pos)
652 {
653         struct llu_inode_info *lli = llu_i2info(inode);
654         struct ll_file_data *fd = lli->lli_file_data;
655         struct lov_stripe_md *lsm = lli->lli_smd;
656         struct lustre_handle lockh = { 0 };
657         struct ldlm_extent extent;
658         struct llu_sysio_callback_args *lsca;
659         struct llu_sysio_cookie *cookie;
660         int iovidx;
661
662         ldlm_error_t err;
663         ENTRY;
664
665         OBD_ALLOC(lsca, sizeof(*lsca));
666         if (!lsca)
667                 RETURN(ERR_PTR(-ENOMEM));
668
669         for (iovidx = 0; iovidx < iovlen; iovidx++) {
670                 char *buf = iovec[iovidx].iov_base;
671                 size_t count = iovec[iovidx].iov_len;
672
673                 /* "If nbyte is 0, read() will return 0 and have no other results."
674                  *                      -- Single Unix Spec */
675                 if (count == 0)
676                         continue;
677
678                 extent.start = pos;
679                 extent.end = pos + count - 1;
680
681                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
682                 if (err != ELDLM_OK)
683                         GOTO(err_out, err = -ENOLCK);
684
685                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
686                        lli->lli_st_ino, count, pos);
687
688                 if (pos >= lli->lli_st_size) {
689                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
690                         break;
691                 }
692
693                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
694                 if (!IS_ERR(cookie)) {
695                         /* save cookie */
696                         lsca->cookies[lsca->ncookies++] = cookie;
697                         pos += count;
698                 } else {
699                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
700                         GOTO(err_out, err = PTR_ERR(cookie));
701                 }
702
703                 /* XXX errors? */
704                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
705                 if (err)
706                         CERROR("extent_unlock fail: %d\n", err);
707         }
708 #if 0
709         if (readed > 0)
710                 llu_update_atime(inode);
711 #endif
712         RETURN(lsca);
713
714 err_out:
715         /* teardown all async stuff */
716         while (lsca->ncookies--) {
717                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
718         }
719         OBD_FREE(lsca, sizeof(*lsca));
720
721         RETURN(ERR_PTR(err));
722 }
723
724 int llu_iop_iodone(struct ioctx *ioctxp)
725 {
726         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
727         struct llu_sysio_cookie *cookie;
728         int i, err = 0, rc = 0;
729         ENTRY;
730
731         /* write/read(fd, buf, 0) */
732         if (!lsca) {
733                 ioctxp->ioctx_cc = 0;
734                 RETURN(1);
735         }
736
737         LASSERT(!IS_ERR(lsca));
738
739         for (i = 0; i < lsca->ncookies; i++) {
740                 cookie = lsca->cookies[i];
741                 if (cookie) {
742                         err = oig_wait(cookie->lsc_oig);
743                         if (err && !rc)
744                                 rc = err;
745                         if (!rc)
746                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
747                         put_sysio_cookie(cookie);
748                 }
749         }
750
751         if (rc) {
752                 LASSERT(rc < 0);
753                 ioctxp->ioctx_cc = -1;
754                 ioctxp->ioctx_errno = -rc;
755         }
756
757         OBD_FREE(lsca, sizeof(*lsca));
758         ioctxp->ioctx_private = NULL;
759
760         RETURN(1);
761 }