/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Light Super operations
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LLITE

#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <sys/types.h>
#include <sys/queue.h>

#include <sysio.h>
#include <fs.h>
#include <mount.h>
#include <inode.h>
#include <file.h>

#include "llite_lib.h"

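/* The page-cache eviction code and the old lock callback below are compiled
 * out with #if 0 and kept for reference only. */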
#if 0
void llu_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                               struct ldlm_lock *lock)
{
        clear_bit(LLI_F_HAVE_SIZE_LOCK, &(llu_i2info(inode)->lli_flags));
#if 0
        struct ldlm_extent *extent = &lock->l_extent;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        int ret;
        ENTRY;

        CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, extent->start, extent->end, inode->i_size);

        start = extent->start >> PAGE_CACHE_SHIFT;
        count = ~0;
        skip = 0;
        end = (extent->end >> PAGE_CACHE_SHIFT) + 1;
        if ((end << PAGE_CACHE_SHIFT) < extent->end)
                end = ~0;
        if (lsm->lsm_stripe_count > 1) {
                struct {
                        char name[16];
                        struct ldlm_lock *lock;
                        struct lov_stripe_md *lsm;
                } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
                __u32 stripe;
                __u32 vallen = sizeof(stripe);
                int rc;

                /* get our offset in the lov */
                rc = obd_get_info(ll_i2obdconn(inode), sizeof(key),
                                  &key, &vallen, &stripe);
                if (rc != 0) {
                        CERROR("obd_get_info: rc = %d\n", rc);
                        LBUG();
                }
                LASSERT(stripe < lsm->lsm_stripe_count);

                count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += (start/count * skip) + (stripe * count);
                if (end != ~0)
                        end += (end/count * skip) + (stripe * count);
        }

        i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT;
        if (end >= i)
                clear_bit(LLI_F_HAVE_SIZE_LOCK, &(ll_i2info(inode)->lli_flags));
        if (i < end)
                end = i;

        CDEBUG(D_INODE, "start: %lu j: %lu count: %lu skip: %lu end: %lu\n",
               start, start % count, count, skip, end);

        /* start writeback on dirty pages in the extent when it's PW */
        for (i = start, j = start % count;
                        lock->l_granted_mode == LCK_PW && i < end; j++, i++) {
                if (j == count) {
                        i += skip;
                        j = 0;
                }
                /* it's unlikely, but give us a chance to bail when we're out */
                PGCACHE_WRLOCK(inode->i_mapping);
                if (list_empty(&inode->i_mapping->dirty_pages)) {
                        CDEBUG(D_INODE, "dirty list empty\n");
                        PGCACHE_WRUNLOCK(inode->i_mapping);
                        break;
                }
                PGCACHE_WRUNLOCK(inode->i_mapping);

                if (need_resched())
                        schedule();

        /* always do a getattr for the first person to pop out of lock
         * acquisition.. the DID_GETATTR flag and semaphore serialize
         * this initial race.  we used to make a decision based on whether
         * the lock was matched or acquired, but the matcher could win the
         * waking race with the first issuer so that was no good..
         */
        if (test_bit(LLI_F_DID_GETATTR, &lli->lli_flags))
                RETURN(ELDLM_OK);

        down(&lli->lli_getattr_sem);

        if (!test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) {
                rc = ll_inode_getattr(inode, lsm);
                if (rc == 0) {
                        set_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
                } else {
                        unlock_page(page);
                }
                page_cache_release(page);

        }

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0);
        LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0);
        for (i = start, j = start % count ; i < end ; j++, i++) {
                if ( j == count ) {
                        i += skip;
                        j = 0;
                }
                PGCACHE_WRLOCK(inode->i_mapping);
                if (list_empty(&inode->i_mapping->dirty_pages) &&
                     list_empty(&inode->i_mapping->clean_pages) &&
                     list_empty(&inode->i_mapping->locked_pages)) {
                        CDEBUG(D_INODE, "nothing left\n");
                        PGCACHE_WRUNLOCK(inode->i_mapping);
                        break;
                }
                PGCACHE_WRUNLOCK(inode->i_mapping);
                if (need_resched())
                        schedule();
                page = find_get_page(inode->i_mapping, i);
                if (page == NULL)
                        continue;
                CDEBUG(D_INODE, "dropping page %p at %lu\n", page, page->index);
                lock_page(page);
                if (page->mapping) /* might have raced */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                        truncate_complete_page(page);
#else
                        truncate_complete_page(page->mapping, page);
#endif
                unlock_page(page);
                page_cache_release(page);
        }
        EXIT;
#endif
}

int llu_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                      void *data, int flag)
{
        struct inode *inode = data;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lustre_handle lockh = {0};
        int rc;
        ENTRY;

        if (inode == NULL)
                LBUG();

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                /* FIXME: we could be given 'canceling intents' so that we
                 * could know to write-back or simply throw away the pages
                 * based on whether the cancel comes from a desire to, say,
                 * read or truncate.. */
                llu_pgcache_remove_extent(inode, lli->lli_smd, lock);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
#endif

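/* Blocking/cancellation callback passed to obd_enqueue() for extent locks.
 * A blocking AST simply cancels the lock; on cancellation the inode
 * reference obtained from the lock is dropped. */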
static int llu_extent_lock_callback(struct ldlm_lock *lock,
                                    struct ldlm_lock_desc *new, void *data,
                                    int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode = llu_inode_from_lock(lock);
                struct llu_inode_info *lli;

                if (!inode)
                        RETURN(0);
                lli = llu_i2info(inode);
                if (!lli) {
                        I_RELE(inode);
                        RETURN(0);
                }
                if (!lli->lli_smd) {
                        I_RELE(inode);
                        RETURN(0);
                }

/*
                ll_pgcache_remove_extent(inode, lli->lli_smd, lock);
                iput(inode);
*/
                I_RELE(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}

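/* Enqueue an extent lock on [extent->start, extent->end] without fetching or
 * validating the file size from the OST. */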
int llu_extent_lock_no_validate(struct ll_file_data *fd,
                                struct inode *inode,
                                struct lov_stripe_md *lsm,
                                int mode,
                                struct ldlm_extent *extent,
                                struct lustre_handle *lockh,
                                int ast_flags)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        struct llu_inode_info *lli = llu_i2info(inode);
        int rc;
        ENTRY;

        LASSERT(lockh->cookie == 0);

#if 0
        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);
#endif

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               lli->lli_st_ino, extent->start, extent->end);

        rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
                         sizeof(extent), mode, &ast_flags,
                         llu_extent_lock_callback, inode, lockh);

        RETURN(rc);
}

/*
 * this grabs a lock and manually implements behaviour that makes it look like
 * the OST is returning the file size with each lock acquisition.
 */
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm, int mode,
                    struct ldlm_extent *extent, struct lustre_handle *lockh)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct obd_export *exp = llu_i2obdexp(inode);
        struct ldlm_extent size_lock;
        struct lustre_handle match_lockh = {0};
        int flags, rc, matched;
        ENTRY;

        rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
        if (rc != ELDLM_OK)
                RETURN(rc);

        if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
                RETURN(0);

        rc = llu_inode_getattr(inode, lsm);
        if (rc) {
                llu_extent_unlock(fd, inode, lsm, mode, lockh);
                RETURN(rc);
        }

        size_lock.start = lli->lli_st_size;
        size_lock.end = OBD_OBJECT_EOF;

        /* XXX I bet we should be checking the lock ignore flags.. */
        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
        matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
                            sizeof(size_lock), LCK_PR, &flags, inode,
                            &match_lockh);

        /* hey, alright, we hold a size lock that covers the size we
         * just found, it's not going to change for a while.. */
        if (matched == 1) {
                set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
                obd_cancel(exp, lsm, LCK_PR, &match_lockh);
        }

        RETURN(0);
}

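/* Release an extent lock taken by llu_extent_lock{,_no_validate}(). */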
int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
                      struct lov_stripe_md *lsm, int mode,
                      struct lustre_handle *lockh)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        int rc;
        ENTRY;
#if 0
        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);
#endif
        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);

        RETURN(rc);
}

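/* Per-page state handed to the obd async page API as an opaque cookie;
 * llap_magic lets llap_from_cookie() sanity-check cookies passed back to the
 * callbacks below. */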
#define LLAP_MAGIC 12346789

struct ll_async_page {
        int             llap_magic;
        void           *llap_cookie;
        int             llap_queued;
        struct page    *llap_page;
        struct inode   *llap_inode;
};

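/* Convert an opaque callback cookie back into its ll_async_page, refusing
 * anything without the expected magic. */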
static struct ll_async_page *llap_from_cookie(void *cookie)
{
        struct ll_async_page *llap = cookie;
        if (llap->llap_magic != LLAP_MAGIC)
                return ERR_PTR(-EINVAL);
        return llap;
}

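/* ->ap_fill_obdo callback: fill the obdo with the object id and the inode
 * attributes relevant to this read or write. */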
static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
{
        struct ll_async_page *llap;
        struct inode *inode;
        struct lov_stripe_md *lsm;
        obd_flag valid_flags;
        ENTRY;

        llap = llap_from_cookie(data);
        if (IS_ERR(llap)) {
                EXIT;
                return;
        }

        inode = llap->llap_inode;
        lsm = llu_i2info(inode)->lli_smd;

        oa->o_id = lsm->lsm_object_id;
        oa->o_valid = OBD_MD_FLID;
        valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
        if (cmd == OBD_BRW_WRITE)
                valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;

        obdo_from_inode(oa, inode, valid_flags);
        EXIT;
}

/* called for each page in a completed RPC. */
static void llu_ap_completion(void *data, int cmd, int rc)
{
        struct ll_async_page *llap;
        struct page *page;

        llap = llap_from_cookie(data);
        if (IS_ERR(llap)) {
                EXIT;
                return;
        }

        llap->llap_queued = 0;
        page = llap->llap_page;

        if (rc != 0) {
                if (cmd == OBD_BRW_WRITE)
                        CERROR("writeback error on page %p index %ld: %d\n",
                               page, page->index, rc);
        }
        EXIT;
}

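/* Async page callbacks registered via obd_prep_async_page(); make_ready and
 * refresh_count are not needed for liblustre's synchronous I/O path and are
 * left NULL. */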
static struct obd_async_page_ops llu_async_page_ops = {
        .ap_make_ready =        NULL,
        .ap_refresh_count =     NULL,
        .ap_fill_obdo =         llu_ap_fill_obdo,
        .ap_completion =        llu_ap_completion,
};

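/* Allocate a cookie large enough to track npages pages of async I/O, taking
 * a reference on the inode for the lifetime of the I/O. */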
static
struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int npages)
{
        struct llu_sysio_cookie *cookie;

        OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(npages));
        if (cookie) {
                I_REF(inode);
                cookie->lsc_inode = inode;
                cookie->lsc_npages = npages;
                cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
                cookie->lsc_pages = (struct page *)(cookie->lsc_llap + npages);

                osic_init(&cookie->lsc_osic);
        }

        return cookie;
}

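/* Tear down the async pages attached to a cookie, drop the inode reference
 * and free the cookie itself. */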
static
void put_sysio_cookie(struct llu_sysio_cookie *cookie)
{
        struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
        struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
        struct ll_async_page *llap = cookie->lsc_llap;
        int i;

        for (i = 0; i < cookie->lsc_npages; i++) {
                if (llap[i].llap_cookie)
                        obd_teardown_async_page(exp, lsm, NULL,
                                                llap[i].llap_cookie);
        }

        I_RELE(cookie->lsc_inode);

        OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_npages));
}

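/* Split a contiguous user buffer into page-sized chunks and queue each one
 * as sync I/O against the object, recording the llap cookies so they can be
 * torn down later. */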
static
int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
                      char *buf, loff_t pos, size_t count)
{
        struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
        struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
        struct page *pages = cookie->lsc_pages;
        struct ll_async_page *llap = cookie->lsc_llap;
        int i, rc, npages = 0;
        ENTRY;

        if (!exp)
                RETURN(-EINVAL);

        cookie->lsc_rwcount = count;

        /* prepare the pages array */
        do {
                unsigned long index, offset, bytes;

                offset = (pos & ~PAGE_CACHE_MASK);
                index = pos >> PAGE_CACHE_SHIFT;
                bytes = PAGE_CACHE_SIZE - offset;
                if (bytes > count)
                        bytes = count;

                /* prepare page for this index */
                pages[npages].index = index;
                pages[npages].addr = buf - offset;

                pages[npages]._offset = offset;
                pages[npages]._count = bytes;

                npages++;
                count -= bytes;
                pos += bytes;
                buf += bytes;
        } while (count);

        for (i = 0; i < npages; i++) {
                llap[i].llap_magic = LLAP_MAGIC;
                rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
                                         (obd_off)pages[i].index << PAGE_SHIFT,
                                         &llu_async_page_ops,
                                         &llap[i], &llap[i].llap_cookie);
                if (rc) {
                        llap[i].llap_cookie = NULL;
                        RETURN(rc);
                }
                CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
                       &llap[i], &pages[i], llap[i].llap_cookie,
                       (obd_off)pages[i].index << PAGE_SHIFT);
                pages[i].private = (unsigned long)&llap[i];
                llap[i].llap_page = &pages[i];
                llap[i].llap_inode = cookie->lsc_inode;

                rc = obd_queue_sync_io(exp, lsm, NULL, &cookie->lsc_osic,
                                       llap[i].llap_cookie, cmd,
                                       pages[i]._offset, pages[i]._count, 0);
                if (rc)
                        RETURN(rc);

                llap[i].llap_queued = 1;
        }

        RETURN(0);
}

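/* Kick off the I/O that llu_prep_async_io() queued on this cookie's osic
 * group. */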
static
int llu_start_async_io(struct llu_sysio_cookie *cookie)
{
        struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
        struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);

        return obd_trigger_sync_io(exp, lsm, NULL, &cookie->lsc_osic);
}

/*
 * read/write a contiguous buffer for an inode (zero-copy)
 */
struct llu_sysio_cookie*
llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
{
        struct llu_sysio_cookie *cookie;
        int max_pages, rc;
        ENTRY;

        max_pages = (count >> PAGE_SHIFT) + 2;

        cookie = get_sysio_cookie(inode, max_pages);
        if (!cookie)
                RETURN(ERR_PTR(-ENOMEM));

        rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
        if (rc)
                GOTO(out_cleanup, rc);

        rc = llu_start_async_io(cookie);
        if (rc)
                GOTO(out_cleanup, rc);

/*
        rc = osic_wait(&osic);
        if (rc) {
                CERROR("file i/o error!\n");
                rw_count = rc;
        }
*/
        RETURN(cookie);

out_cleanup:
        put_sysio_cookie(cookie);
        RETURN(ERR_PTR(rc));
}

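/* Write each iovec segment under a PW extent lock, collecting one async I/O
 * cookie per segment; completion is waited for in llu_iop_iodone(). */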
struct llu_sysio_callback_args*
llu_file_write(struct inode *inode, const struct iovec *iovec,
               size_t iovlen, loff_t pos)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct ll_file_data *fd = lli->lli_file_data;
        struct lustre_handle lockh = {0};
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct llu_sysio_callback_args *lsca;
        struct llu_sysio_cookie *cookie;
        struct ldlm_extent extent;
        ldlm_error_t err;
        int iovidx;
        ENTRY;

        /* XXX consider other types later */
        if (!S_ISREG(lli->lli_st_mode))
                LBUG();

        LASSERT(iovlen <= MAX_IOVEC);

        OBD_ALLOC(lsca, sizeof(*lsca));
        if (!lsca)
                RETURN(ERR_PTR(-ENOMEM));

        /* FIXME optimize the following extent locking */
        for (iovidx = 0; iovidx < iovlen; iovidx++) {
                char *buf = iovec[iovidx].iov_base;
                size_t count = iovec[iovidx].iov_len;

                if (count == 0)
                        continue;

                /* FIXME libsysio doesn't yet handle open flags such as
                 * O_APPEND */
#if 0
                if (!S_ISBLK(lli->lli_st_mode) && file->f_flags & O_APPEND) {
                        extent.start = 0;
                        extent.end = OBD_OBJECT_EOF;
                } else {
                        extent.start = *ppos;
                        extent.end = *ppos + count - 1;
                }
#else
                extent.start = pos;
                extent.end = pos + count - 1;
#endif

                err = llu_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
                if (err != ELDLM_OK)
                        GOTO(err_out, err = -ENOLCK);

                CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                       lli->lli_st_ino, count, pos);

                cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
                if (!IS_ERR(cookie)) {
                        /* save cookie */
                        lsca->cookies[lsca->ncookies++] = cookie;
                        pos += count;
                        /* the file size grows. XXX should this be done here? */
                        if (pos > lli->lli_st_size) {
                                lli->lli_st_size = pos;
                                set_bit(LLI_F_PREFER_EXTENDED_SIZE,
                                        &lli->lli_flags);
                        }
                } else {
                        llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
                        GOTO(err_out, err = PTR_ERR(cookie));
                }

                /* XXX errors? */
                err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
                if (err)
                        CERROR("extent unlock error %d\n", err);
        }

        RETURN(lsca);

err_out:
        /* teardown all async stuff */
        while (lsca->ncookies--) {
                put_sysio_cookie(lsca->cookies[lsca->ncookies]);
        }
        OBD_FREE(lsca, sizeof(*lsca));

        RETURN(ERR_PTR(err));
}

#if 0
static void llu_update_atime(struct inode *inode)
{
        struct llu_inode_info *lli = llu_i2info(inode);

#ifdef USE_ATIME
        struct iattr attr;

        attr.ia_atime = LTIME_S(CURRENT_TIME);
        attr.ia_valid = ATTR_ATIME;

        if (lli->lli_st_atime == attr.ia_atime) return;
        if (IS_RDONLY(inode)) return;
        if (IS_NOATIME(inode)) return;

        /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
        llu_inode_setattr(inode, &attr, 0);
#else
        /* update atime, but don't explicitly write it out just for this
         * change */
        inode->i_atime = CURRENT_TIME;
#endif
}
#endif

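/* Read each iovec segment under a PR extent lock, collecting one async I/O
 * cookie per segment, mirroring llu_file_write() above. */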
struct llu_sysio_callback_args*
llu_file_read(struct inode *inode, const struct iovec *iovec,
              size_t iovlen, loff_t pos)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct ll_file_data *fd = lli->lli_file_data;
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct lustre_handle lockh = { 0 };
        struct ldlm_extent extent;
        struct llu_sysio_callback_args *lsca;
        struct llu_sysio_cookie *cookie;
        int iovidx;
        ldlm_error_t err;
        ENTRY;

        OBD_ALLOC(lsca, sizeof(*lsca));
        if (!lsca)
                RETURN(ERR_PTR(-ENOMEM));

        for (iovidx = 0; iovidx < iovlen; iovidx++) {
                char *buf = iovec[iovidx].iov_base;
                size_t count = iovec[iovidx].iov_len;

                /* "If nbyte is 0, read() will return 0 and have no other
                 * results."  -- Single Unix Spec */
                if (count == 0)
                        continue;

                extent.start = pos;
                extent.end = pos + count - 1;

                err = llu_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
                if (err != ELDLM_OK)
                        GOTO(err_out, err = -ENOLCK);

                CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
                       lli->lli_st_ino, count, pos);

                cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
                if (!IS_ERR(cookie)) {
                        /* save cookie */
                        lsca->cookies[lsca->ncookies++] = cookie;
                        pos += count;
                } else {
                        llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
                        GOTO(err_out, err = PTR_ERR(cookie));
                }

                /* XXX errors? */
                err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
                if (err)
                        CERROR("extent_unlock fail: %d\n", err);
        }
#if 0
        if (readed > 0)
                llu_update_atime(inode);
#endif
        RETURN(lsca);

err_out:
        /* teardown all async stuff */
        while (lsca->ncookies--) {
                put_sysio_cookie(lsca->cookies[lsca->ncookies]);
        }
        OBD_FREE(lsca, sizeof(*lsca));

        RETURN(ERR_PTR(err));
}

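/* sysio iodone hook: wait on every queued cookie, accumulate the byte count
 * (or the first error) into the ioctx and release the cookies. */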
int llu_iop_iodone(struct ioctx *ioctxp)
{
        struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
        struct llu_sysio_cookie *cookie;
        int i, err = 0, rc = 0;
        ENTRY;

        /* write/read(fd, buf, 0) */
        if (!lsca)
                return 1;

        LASSERT(!IS_ERR(lsca));

        for (i = 0; i < lsca->ncookies; i++) {
                cookie = lsca->cookies[i];
                if (cookie) {
                        err = osic_wait(&cookie->lsc_osic);
                        if (err && !rc)
                                rc = err;
                        if (!rc)
                                ioctxp->ioctx_cc += cookie->lsc_rwcount;
                        put_sysio_cookie(cookie);
                }
        }

        if (rc)
                ioctxp->ioctx_cc = rc;

        OBD_FREE(lsca, sizeof(*lsca));
        ioctxp->ioctx_private = NULL;

        RETURN(1);
}