Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32
33 #include <sysio.h>
34 #include <fs.h>
35 #include <mount.h>
36 #include <inode.h>
37 #include <file.h>
38
39 #undef LIST_HEAD
40
41 #include "llite_lib.h"
42
43 static int llu_extent_lock_callback(struct ldlm_lock *lock,
44                                     struct ldlm_lock_desc *new, void *data,
45                                     int flag)
46 {
47         struct lustre_handle lockh = { 0 };
48         int rc;
49         ENTRY;
50         
51
52         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
53                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
54                 LBUG();
55         }
56         
57         switch (flag) {
58         case LDLM_CB_BLOCKING:
59                 ldlm_lock2handle(lock, &lockh);
60                 rc = ldlm_cli_cancel(&lockh);
61                 if (rc != ELDLM_OK)
62                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
63                 break;
64         case LDLM_CB_CANCELING: {
65                 struct inode *inode = llu_inode_from_lock(lock);
66                 struct llu_inode_info *lli;
67                 
68                 if (!inode)
69                         RETURN(0);
70                 lli= llu_i2info(inode);
71                 if (!lli) {
72                         I_RELE(inode);
73                         RETURN(0);
74                 }
75                 if (!lli->lli_smd) {
76                         I_RELE(inode);
77                         RETURN(0);
78                 }
79
80 /*
81                 ll_pgcache_remove_extent(inode, lli->lli_smd, lock);
82                 iput(inode);
83 */
84                 I_RELE(inode);
85                 break;
86         }
87         default:
88                 LBUG();
89         }
90         
91         RETURN(0);
92 }
93
94 int llu_extent_lock_no_validate(struct ll_file_data *fd,
95                                 struct inode *inode,
96                                 struct lov_stripe_md *lsm,
97                                 int mode,
98                                 struct ldlm_extent *extent,
99                                 struct lustre_handle *lockh,
100                                 int ast_flags)
101 {
102         struct llu_sb_info *sbi = llu_i2sbi(inode);
103         struct llu_inode_info *lli = llu_i2info(inode);
104         int rc;
105         ENTRY;
106
107         LASSERT(lockh->cookie == 0);
108
109         /* XXX phil: can we do this?  won't it screw the file size up? */
110         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
111             (sbi->ll_flags & LL_SBI_NOLCK))
112                 RETURN(0);
113
114         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
115                lli->lli_st_ino, extent->start, extent->end);
116
117         rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
118                          sizeof(extent), mode, &ast_flags,
119                          llu_extent_lock_callback, inode, lockh);
120
121         RETURN(rc);
122 }
123
124 /*
125  * this grabs a lock and manually implements behaviour that makes it look like
126  * the OST is returning the file size with each lock acquisition.
127  */
128 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
129                     struct lov_stripe_md *lsm, int mode,
130                     struct ldlm_extent *extent, struct lustre_handle *lockh)
131 {
132         struct llu_inode_info *lli = llu_i2info(inode);
133         struct obd_export *exp = llu_i2obdexp(inode);
134         struct ldlm_extent size_lock;
135         struct lustre_handle match_lockh = {0};
136         int flags, rc, matched;
137         ENTRY;
138
139         rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
140         if (rc != ELDLM_OK)
141                 RETURN(rc);
142
143         if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
144                 RETURN(0);
145
146         rc = llu_inode_getattr(inode, lsm);
147         if (rc) {
148                 llu_extent_unlock(fd, inode, lsm, mode, lockh);
149                 RETURN(rc);
150         }
151
152         size_lock.start = lli->lli_st_size;
153         size_lock.end = OBD_OBJECT_EOF;
154
155         /* XXX I bet we should be checking the lock ignore flags.. */
156         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
157         matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
158                             sizeof(size_lock), LCK_PR, &flags, inode,
159                             &match_lockh);
160
161         /* hey, alright, we hold a size lock that covers the size we 
162          * just found, its not going to change for a while.. */
163         if (matched == 1) {
164                 set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
165                 obd_cancel(exp, lsm, LCK_PR, &match_lockh);
166         } 
167
168         RETURN(0);
169 }
170
171 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
172                 struct lov_stripe_md *lsm, int mode,
173                 struct lustre_handle *lockh)
174 {
175         struct llu_sb_info *sbi = llu_i2sbi(inode);
176         int rc;
177         ENTRY;
178 #if 0
179         /* XXX phil: can we do this?  won't it screw the file size up? */
180         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
181             (sbi->ll_flags & LL_SBI_NOLCK))
182                 RETURN(0);
183 #endif
184         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
185
186         RETURN(rc);
187 }
188
/* tag stored in every ll_async_page, verified by llap_from_cookie() */
#define LLAP_MAGIC 12346789

/*
 * Per-page bookkeeping handed to the OBD async page interface as callback
 * data.
 */
struct ll_async_page {
        /* must equal LLAP_MAGIC for the descriptor to be accepted */
        int             llap_magic;
        /* handle filled in by obd_prep_async_page(), NULL if prep failed */
        void           *llap_cookie;
        /* set to 1 once the page is queued, cleared on completion */
        int             llap_queued;
        /* page descriptor this entry belongs to */
        struct page    *llap_page;
        /* inode the I/O is issued for */
        struct inode   *llap_inode;
};
198
199 static struct ll_async_page *llap_from_cookie(void *cookie)
200 {
201         struct ll_async_page *llap = cookie;
202         if (llap->llap_magic != LLAP_MAGIC)
203                 return ERR_PTR(-EINVAL);
204         return llap;
205 };
206
207 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
208 {
209         struct ll_async_page *llap;
210         struct inode *inode;
211         struct lov_stripe_md *lsm;
212         obd_flag valid_flags;
213         ENTRY;
214
215         llap = llap_from_cookie(data);
216         if (IS_ERR(llap)) {
217                 EXIT;
218                 return;
219         }
220
221         inode = llap->llap_inode;
222         lsm = llu_i2info(inode)->lli_smd;
223
224         oa->o_id = lsm->lsm_object_id;
225         oa->o_valid = OBD_MD_FLID;
226         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
227         if (cmd == OBD_BRW_WRITE)
228                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
229
230         obdo_from_inode(oa, inode, valid_flags);
231         EXIT;
232 }
233
234 /* called for each page in a completed rpc.*/
235 static void llu_ap_completion(void *data, int cmd, int rc)
236 {
237         struct ll_async_page *llap;
238         struct page *page;
239
240         llap = llap_from_cookie(data);
241         if (IS_ERR(llap)) {
242                 EXIT;
243                 return;
244         }
245
246         llap->llap_queued = 0;
247         page = llap->llap_page;
248
249         if (rc != 0) {
250                 if (cmd == OBD_BRW_WRITE)
251                         CERROR("writeback error on page %p index %ld: %d\n", 
252                                page, page->index, rc);
253         }
254         EXIT;
255 }
256
257 static struct obd_async_page_ops llu_async_page_ops = {
258         .ap_make_ready =        NULL,
259         .ap_refresh_count =     NULL,
260         .ap_fill_obdo =         llu_ap_fill_obdo,
261         .ap_completion =        llu_ap_completion,
262 };
263
264 static
265 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
266 {
267         struct llu_sysio_cookie *cookie;
268
269         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
270         if (cookie) {
271                 I_REF(inode);
272                 cookie->lsc_inode = inode;
273                 cookie->lsc_maxpages = maxpages;
274                 cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
275                 cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
276
277                 osic_init(&cookie->lsc_osic);
278         }
279
280         return cookie;
281 }
282
283 static
284 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
285 {
286         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
287         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
288         struct ll_async_page *llap = cookie->lsc_llap;
289 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
290         struct page *pages = cookie->lsc_pages;
291 #endif
292         int i;
293
294         for (i = 0; i< cookie->lsc_maxpages; i++) {
295                 if (llap[i].llap_cookie)
296                         obd_teardown_async_page(exp, lsm, NULL,
297                                                 llap[i].llap_cookie);
298 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
299                 if (pages[i]._managed) {
300                         free(pages[i].addr);
301                         pages[i]._managed = 0;
302                 }
303 #endif
304         }
305
306         I_RELE(cookie->lsc_inode);
307
308         osic_release(cookie->lsc_osic);
309         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
310 }
311
#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
/* Note: this code should be removed eventually, so it does not need
 * more cleanup.
 */
/*
 * Turn a partially covered first and/or last page of a write into a page
 * that starts at offset 0.
 *
 * For each of the two boundary pages that is not fully covered and that
 * starts below the current file size, the existing page content is read
 * from the server into a malloc'ed buffer, the caller's bytes are copied
 * on top, and the page descriptor is redirected to that buffer. The page
 * is marked _managed so put_sysio_cookie() frees the buffer.
 *
 * Returns 0, -ENOMEM, or the error from obd_brw().
 */
static
int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
{
        struct inode *inode = cookie->lsc_inode;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obdo oa;
        struct page *pages = cookie->lsc_pages;
        int i, pgidx[2] = {0, cookie->lsc_npages-1};
        int rc;
        ENTRY;

        for (i = 0; i < 2; i++) {
                struct page *oldpage = &pages[pgidx[i]];
                struct page newpage;
                struct brw_page pg;
                char *newbuf;

                /* single page: handle it once, in the second iteration */
                if (i == 0 && pgidx[0] == pgidx[1])
                        continue;

                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);

                /* a fully covered page needs no read-modify-write */
                if (oldpage->_count == PAGE_CACHE_SIZE)
                        continue;

                /* page starts at or beyond EOF: nothing on disk to merge.
                 * Widen before shifting, as the other offset computations
                 * in this function do. */
                if (((obd_off)oldpage->index << PAGE_CACHE_SHIFT) >=
                    lli->lli_st_size)
                        continue;

                newbuf = malloc(PAGE_CACHE_SIZE);
                if (!newbuf)
                        RETURN(-ENOMEM);

                newpage.index = oldpage->index;
                newpage.addr = newbuf;

                pg.pg = &newpage;
                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
                /* do not read past the end of the file */
                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
                else
                        pg.count = PAGE_CACHE_SIZE;
                pg.flag = 0;

                oa.o_id = lsm->lsm_object_id;
                oa.o_mode = lli->lli_st_mode;
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

                /* issue read */
                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
                if (rc) {
                        free(newbuf);
                        RETURN(rc);
                }

                /* copy page content, and reset page params */
                memcpy(newbuf + oldpage->_offset,
                       (char*)oldpage->addr + oldpage->_offset,
                       oldpage->_count);

                oldpage->addr = newbuf;
                /* write ends beyond EOF: send from 0 up to the end of the
                 * new data; otherwise send the whole merged page */
                if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
                    oldpage->_offset + oldpage->_count) > lli->lli_st_size)
                        oldpage->_count += oldpage->_offset;
                else
                        oldpage->_count = PAGE_CACHE_SIZE;
                oldpage->_offset = 0;
                oldpage->_managed = 1;
        }

        RETURN(0);
}
#endif
390
391 static
392 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
393                       char *buf, loff_t pos, size_t count)
394 {
395         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
396         struct lov_stripe_md *lsm = lli->lli_smd;
397         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
398         struct page *pages = cookie->lsc_pages;
399         struct ll_async_page *llap = cookie->lsc_llap;
400         int i, rc, npages = 0;
401         ENTRY;
402
403         if (!exp)
404                 RETURN(-EINVAL);
405
406         /* prepare the pages array */
407         do {
408                 unsigned long index, offset, bytes;
409
410                 offset = (pos & ~PAGE_CACHE_MASK);
411                 index = pos >> PAGE_CACHE_SHIFT;
412                 bytes = PAGE_CACHE_SIZE - offset;
413                 if (bytes > count)
414                         bytes = count;
415
416                 /* prevent read beyond file range */
417                 if ((cmd == OBD_BRW_READ) &&
418                     (pos + bytes) >= lli->lli_st_size) {
419                         if (pos >= lli->lli_st_size)
420                                 break;
421                         bytes = lli->lli_st_size - pos;
422                 }
423
424                 /* prepare page for this index */
425                 pages[npages].index = index;
426                 pages[npages].addr = buf - offset;
427
428                 pages[npages]._offset = offset;
429                 pages[npages]._count = bytes;
430
431                 npages++;
432                 count -= bytes;
433                 pos += bytes;
434                 buf += bytes;
435
436                 cookie->lsc_rwcount += bytes;
437         } while (count);
438
439         cookie->lsc_npages = npages;
440
441 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
442         if (cmd == OBD_BRW_WRITE) {
443                 rc = prepare_unaligned_write(cookie);
444                 if (rc)
445                         RETURN(rc);
446         }
447 #endif
448
449         for (i = 0; i < npages; i++) {
450                 llap[i].llap_magic = LLAP_MAGIC;
451                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
452                                          (obd_off)pages[i].index << PAGE_SHIFT,
453                                          &llu_async_page_ops,
454                                          &llap[i], &llap[i].llap_cookie);
455                 if (rc) {
456                         llap[i].llap_cookie = NULL;
457                         RETURN(rc);
458                 }
459                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
460                        &llap[i], &pages[i], llap[i].llap_cookie,
461                        (obd_off)pages[i].index << PAGE_SHIFT);
462                 pages[i].private = (unsigned long)&llap[i];
463                 llap[i].llap_page = &pages[i];
464                 llap[i].llap_inode = cookie->lsc_inode;
465
466                 rc = obd_queue_sync_io(exp, lsm, NULL, cookie->lsc_osic,
467                                        llap[i].llap_cookie, cmd,
468                                        pages[i]._offset, pages[i]._count, 0);
469                 if (rc)
470                         RETURN(rc);
471
472                 llap[i].llap_queued = 1;
473         }
474
475         RETURN(0);
476 }
477
478 static
479 int llu_start_async_io(struct llu_sysio_cookie *cookie)
480 {
481         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
482         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
483
484         return obd_trigger_sync_io(exp, lsm, NULL, cookie->lsc_osic);
485 }
486
487 /*
488  * read/write a continuous buffer for an inode (zero-copy)
489  */
490 struct llu_sysio_cookie*
491 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
492 {
493         struct llu_sysio_cookie *cookie;
494         int max_pages, rc;
495         ENTRY;
496
497         max_pages = (count >> PAGE_SHIFT) + 2;
498
499         cookie = get_sysio_cookie(inode, max_pages);
500         if (!cookie)
501                 RETURN(ERR_PTR(-ENOMEM));
502
503         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
504         if (rc)
505                 GOTO(out_cleanup, rc);
506
507         rc = llu_start_async_io(cookie);
508         if (rc)
509                 GOTO(out_cleanup, rc);
510
511 /*
512         rc = osic_wait(&osic);
513         if (rc) {
514                 CERROR("file i/o error!\n");
515                 rw_count = rc;
516         }
517 */
518         RETURN(cookie);
519
520 out_cleanup:
521         put_sysio_cookie(cookie);
522         RETURN(ERR_PTR(rc));
523 }
524
525 struct llu_sysio_callback_args*
526 llu_file_write(struct inode *inode, const struct iovec *iovec,
527                size_t iovlen, loff_t pos)
528 {
529         struct llu_inode_info *lli = llu_i2info(inode);
530         struct ll_file_data *fd = lli->lli_file_data;
531         struct lustre_handle lockh = {0};
532         struct lov_stripe_md *lsm = lli->lli_smd;
533         struct llu_sysio_callback_args *lsca;
534         struct llu_sysio_cookie *cookie;
535         struct ldlm_extent extent;
536         ldlm_error_t err;
537         int iovidx;
538         ENTRY;
539
540         /* XXX consider other types later */
541         if (!S_ISREG(lli->lli_st_mode))
542                 LBUG();
543
544         LASSERT(iovlen <= MAX_IOVEC);
545
546         OBD_ALLOC(lsca, sizeof(*lsca));
547         if (!lsca)
548                 RETURN(ERR_PTR(-ENOMEM));
549
550         /* FIXME optimize the following extent locking */
551         for (iovidx = 0; iovidx < iovlen; iovidx++) {
552                 char *buf = (char*)iovec[iovidx].iov_base;
553                 size_t count = iovec[iovidx].iov_len;
554
555                 if (count == 0)
556                         continue;
557
558                 /* FIXME libsysio haven't handle O_APPEND */
559                 extent.start = pos;
560                 extent.end = pos + count - 1;
561
562 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
563                 if ((pos & ~PAGE_CACHE_MASK) == 0 &&
564                     (count & ~PAGE_CACHE_MASK) == 0)
565                         err = llu_extent_lock_no_validate(fd, inode, lsm,
566                                                 LCK_PW, &extent, &lockh, 0);
567                 else
568                         err = llu_extent_lock(fd, inode, lsm, LCK_PW,
569                                                 &extent, &lockh);
570 #else
571                 /* server will handle partial write, so we don't
572                  * care for file size here */
573                 err = llu_extent_lock_no_validate(fd, inode, lsm, LCK_PW,
574                                                 &extent, &lockh, 0);
575 #endif
576                 if (err != ELDLM_OK)
577                         GOTO(err_out, err = -ENOLCK);
578
579                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
580                        lli->lli_st_ino, count, pos);
581
582                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
583                 if (!IS_ERR(cookie)) {
584                         /* save cookie */
585                         lsca->cookies[lsca->ncookies++] = cookie;
586                         pos += count;
587                         /* file size grow. XXX should be done here? */
588                         if (pos > lli->lli_st_size) {
589                                 lli->lli_st_size = pos;
590                                 set_bit(LLI_F_PREFER_EXTENDED_SIZE,
591                                         &lli->lli_flags);
592                         }
593                 } else {
594                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
595                         GOTO(err_out, err = PTR_ERR(cookie));
596                 }
597
598                 /* XXX errors? */
599                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
600                 if (err)
601                         CERROR("extent unlock error %d\n", err);
602         }
603
604         RETURN(lsca);
605
606 err_out:
607         /* teardown all async stuff */
608         while (lsca->ncookies--) {
609                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
610         }
611         OBD_FREE(lsca, sizeof(*lsca));
612
613         RETURN(ERR_PTR(err));
614 }
615
#if 0
/*
 * Compiled out. With USE_ATIME the access time is pushed through
 * llu_inode_setattr() unless it is unchanged or the inode is read-only or
 * marked no-atime; without USE_ATIME only the in-memory field is updated.
 */
static void llu_update_atime(struct inode *inode)
{
        struct llu_inode_info *lli = llu_i2info(inode);

#ifdef USE_ATIME
        struct iattr attr;

        attr.ia_atime = LTIME_S(CURRENT_TIME);
        attr.ia_valid = ATTR_ATIME;

        /* nothing to do: same timestamp, or atime updates not allowed */
        if (lli->lli_st_atime == attr.ia_atime ||
            IS_RDONLY(inode) ||
            IS_NOATIME(inode))
                return;

        /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
        llu_inode_setattr(inode, &attr, 0);
#else
        /* update atime, but don't explicitly write it out just this change */
        inode->i_atime = CURRENT_TIME;
#endif
}
#endif
639
640 struct llu_sysio_callback_args*
641 llu_file_read(struct inode *inode, const struct iovec *iovec,
642                        size_t iovlen, loff_t pos)
643 {
644         struct llu_inode_info *lli = llu_i2info(inode);
645         struct ll_file_data *fd = lli->lli_file_data;
646         struct lov_stripe_md *lsm = lli->lli_smd;
647         struct lustre_handle lockh = { 0 };
648         struct ldlm_extent extent;
649         struct llu_sysio_callback_args *lsca;
650         struct llu_sysio_cookie *cookie;
651         int iovidx;
652
653         ldlm_error_t err;
654         ENTRY;
655
656         OBD_ALLOC(lsca, sizeof(*lsca));
657         if (!lsca)
658                 RETURN(ERR_PTR(-ENOMEM));
659
660         for (iovidx = 0; iovidx < iovlen; iovidx++) {
661                 char *buf = iovec[iovidx].iov_base;
662                 size_t count = iovec[iovidx].iov_len;
663
664                 /* "If nbyte is 0, read() will return 0 and have no other results."
665                  *                      -- Single Unix Spec */
666                 if (count == 0)
667                         continue;
668
669                 extent.start = pos;
670                 extent.end = pos + count - 1;
671
672                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
673                 if (err != ELDLM_OK)
674                         GOTO(err_out, err = -ENOLCK);
675
676                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
677                        lli->lli_st_ino, count, pos);
678
679                 if (pos >= lli->lli_st_size) {
680                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
681                         break;
682                 }
683
684                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
685                 if (!IS_ERR(cookie)) {
686                         /* save cookie */
687                         lsca->cookies[lsca->ncookies++] = cookie;
688                         pos += count;
689                 } else {
690                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
691                         GOTO(err_out, err = PTR_ERR(cookie));
692                 }
693
694                 /* XXX errors? */
695                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
696                 if (err)
697                         CERROR("extent_unlock fail: %d\n", err);
698         }
699 #if 0
700         if (readed > 0)
701                 llu_update_atime(inode);
702 #endif
703         RETURN(lsca);
704
705 err_out:
706         /* teardown all async stuff */
707         while (lsca->ncookies--) {
708                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
709         }
710         OBD_FREE(lsca, sizeof(*lsca));
711
712         RETURN(ERR_PTR(err));
713 }
714
715 int llu_iop_iodone(struct ioctx *ioctxp)
716 {
717         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
718         struct llu_sysio_cookie *cookie;
719         int i, err = 0, rc = 0;
720         ENTRY;
721
722         /* write/read(fd, buf, 0) */
723         if (!lsca) {
724                 ioctxp->ioctx_cc = 0;
725                 RETURN(1);
726         }
727
728         LASSERT(!IS_ERR(lsca));
729
730         for (i = 0; i < lsca->ncookies; i++) {
731                 cookie = lsca->cookies[i];
732                 if (cookie) {
733                         err = osic_wait(cookie->lsc_osic);
734                         if (err && !rc)
735                                 rc = err;
736                         if (!rc)
737                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
738                         put_sysio_cookie(cookie);
739                 }
740         }
741
742         if (rc) {
743                 LASSERT(rc < 0);
744                 ioctxp->ioctx_cc = -1;
745                 ioctxp->ioctx_errno = -rc;
746         }
747
748         OBD_FREE(lsca, sizeof(*lsca));
749         ioctxp->ioctx_private = NULL;
750
751         RETURN(1);
752 }