Whamcloud - gitweb
Land b_smallfix onto HEAD (20040512_1806)
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light block IO
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32 #include <fcntl.h>
33
34 #include <sysio.h>
35 #include <fs.h>
36 #include <mount.h>
37 #include <inode.h>
38 #include <file.h>
39
40 #undef LIST_HEAD
41
42 #include "llite_lib.h"
43
44 static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
45 {
46         struct llu_inode_info *lli = llu_i2info(inode);
47         struct lov_stripe_md *lsm = lli->lli_smd;
48         struct obd_export *exp = llu_i2obdexp(inode);
49         struct {
50                 char name[16];
51                 struct ldlm_lock *lock;
52                 struct lov_stripe_md *lsm;
53         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
54         __u32 stripe, vallen = sizeof(stripe);
55         int rc;
56         ENTRY;
57
58         if (lsm->lsm_stripe_count == 1)
59                 RETURN(0);
60
61         /* get our offset in the lov */
62         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
63         if (rc != 0) {
64                 CERROR("obd_get_info: rc = %d\n", rc);
65                 LBUG();
66         }
67         LASSERT(stripe < lsm->lsm_stripe_count);
68         RETURN(stripe);
69 }
70
71 static int llu_extent_lock_callback(struct ldlm_lock *lock,
72                                     struct ldlm_lock_desc *new, void *data,
73                                     int flag)
74 {
75         struct lustre_handle lockh = { 0 };
76         int rc;
77         ENTRY;
78
79         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
80                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
81                 LBUG();
82         }
83
84         switch (flag) {
85         case LDLM_CB_BLOCKING:
86                 ldlm_lock2handle(lock, &lockh);
87                 rc = ldlm_cli_cancel(&lockh);
88                 if (rc != ELDLM_OK)
89                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
90                 break;
91         case LDLM_CB_CANCELING: {
92                 struct inode *inode;
93                 struct llu_inode_info *lli;
94                 struct lov_stripe_md *lsm;
95                 __u32 stripe;
96                 __u64 kms;
97
98                 /* This lock wasn't granted, don't try to evict pages */
99                 if (lock->l_req_mode != lock->l_granted_mode)
100                         RETURN(0);
101
102                 inode = llu_inode_from_lock(lock);
103                 if (!inode)
104                         RETURN(0);
105                 lli= llu_i2info(inode);
106                 if (!lli)
107                         goto iput;
108                 if (!lli->lli_smd)
109                         goto iput;
110                 lsm = lli->lli_smd;
111
112                 stripe = llu_lock_to_stripe_offset(inode, lock);
113                 kms = ldlm_extent_shift_kms(lock,
114                                             lsm->lsm_oinfo[stripe].loi_kms);
115                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
116                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
117                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
118                 lsm->lsm_oinfo[stripe].loi_kms = kms;
119 iput:
120                 I_RELE(inode);
121                 break;
122         }
123         default:
124                 LBUG();
125         }
126
127         RETURN(0);
128 }
129
130 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
131 {
132         struct ptlrpc_request *req = reqp;
133         struct inode *inode = llu_inode_from_lock(lock);
134         struct llu_inode_info *lli;
135         struct ost_lvb *lvb;
136         int rc, size = sizeof(*lvb), stripe = 0;
137         ENTRY;
138
139         if (inode == NULL)
140                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
141         lli = llu_i2info(inode);
142         if (lli == NULL)
143                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
144         if (lli->lli_smd == NULL)
145                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
146
147         /* First, find out which stripe index this lock corresponds to. */
148         if (lli->lli_smd->lsm_stripe_count > 1)
149                 stripe = llu_lock_to_stripe_offset(inode, lock);
150
151         rc = lustre_pack_reply(req, 1, &size, NULL);
152         if (rc) {
153                 CERROR("lustre_pack_reply: %d\n", rc);
154                 GOTO(iput, rc);
155         }
156
157         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
158         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
159
160         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
161                    lli->lli_st_size, stripe, lvb->lvb_size);
162  iput:
163         I_RELE(inode);
164  out:
165         /* These errors are normal races, so we don't want to fill the console
166          * with messages by calling ptlrpc_error() */
167         if (rc == -ELDLM_NO_LOCK_DATA)
168                 lustre_pack_reply(req, 0, NULL, NULL);
169
170         req->rq_status = rc;
171         return rc;
172 }
173
174 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
175 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
176
177 /* NB: lov_merge_size will prefer locally cached writes if they extend the
178  * file (because it prefers KMS over RSS when larger) */
179 int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
180 {
181         struct llu_inode_info *lli = llu_i2info(inode);
182         struct llu_sb_info *sbi = llu_i2sbi(inode);
183         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
184         struct lustre_handle lockh = { 0 };
185         int rc, flags = LDLM_FL_HAS_INTENT;
186         ENTRY;
187
188         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
189
190         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
191                          LCK_PR, &flags, llu_extent_lock_callback,
192                          ldlm_completion_ast, llu_glimpse_callback, inode,
193                          sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
194         if (rc > 0)
195                 RETURN(-EIO);
196
197         lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
198         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
199
200         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
201
202         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
203
204         RETURN(rc);
205 }
206
207 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
208                     struct lov_stripe_md *lsm, int mode,
209                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
210                     int ast_flags)
211 {
212         struct llu_sb_info *sbi = llu_i2sbi(inode);
213         struct llu_inode_info *lli = llu_i2info(inode);
214         int rc;
215         ENTRY;
216
217         LASSERT(lockh->cookie == 0);
218
219         /* XXX phil: can we do this?  won't it screw the file size up? */
220         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
221             (sbi->ll_flags & LL_SBI_NOLCK))
222                 RETURN(0);
223
224         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
225                lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
226
227         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
228                          &ast_flags, llu_extent_lock_callback,
229                          ldlm_completion_ast, llu_glimpse_callback, inode,
230                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
231         if (rc > 0)
232                 rc = -EIO;
233
234         if (policy->l_extent.start == 0 &&
235             policy->l_extent.end == OBD_OBJECT_EOF)
236                 lli->lli_st_size = lov_merge_size(lsm, 1);
237
238         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
239
240         RETURN(rc);
241 }
242
243 #if 0
244 int llu_extent_lock_no_validate(struct ll_file_data *fd,
245                                 struct inode *inode,
246                                 struct lov_stripe_md *lsm,
247                                 int mode,
248                                 struct ldlm_extent *extent,
249                                 struct lustre_handle *lockh,
250                                 int ast_flags)
251 {
252         struct llu_sb_info *sbi = llu_i2sbi(inode);
253         struct llu_inode_info *lli = llu_i2info(inode);
254         int rc;
255         ENTRY;
256
257         LASSERT(lockh->cookie == 0);
258
259         /* XXX phil: can we do this?  won't it screw the file size up? */
260         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
261             (sbi->ll_flags & LL_SBI_NOLCK))
262                 RETURN(0);
263
264         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
265                lli->lli_st_ino, extent->start, extent->end);
266
267         rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
268                          sizeof(extent), mode, &ast_flags,
269                          llu_extent_lock_callback, inode, lockh);
270
271         RETURN(rc);
272 }
273
274 /*
275  * this grabs a lock and manually implements behaviour that makes it look like
276  * the OST is returning the file size with each lock acquisition.
277  */
278 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
279                     struct lov_stripe_md *lsm, int mode,
280                     struct ldlm_extent *extent, struct lustre_handle *lockh,
281                     int nonblock)
282 {
283         struct llu_inode_info *lli = llu_i2info(inode);
284         struct obd_export *exp = llu_i2obdexp(inode);
285         struct ldlm_extent size_lock;
286         struct lustre_handle match_lockh = {0};
287         int flags, rc, matched;
288         int astflags = nonblock ? LDLM_FL_BLOCK_NOWAIT : 0;
289         ENTRY;
290
291         rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent,
292                                          lockh, astflags);
293         if (rc != ELDLM_OK)
294                 RETURN(rc);
295
296         if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
297                 RETURN(0);
298
299         rc = llu_inode_getattr(inode, lsm);
300         if (rc) {
301                 llu_extent_unlock(fd, inode, lsm, mode, lockh);
302                 RETURN(rc);
303         }
304
305         size_lock.start = lli->lli_st_size;
306         size_lock.end = OBD_OBJECT_EOF;
307
308         /* XXX I bet we should be checking the lock ignore flags.. */
309         /* FIXME use LDLM_FL_TEST_LOCK instead */
310         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
311         matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
312                             sizeof(size_lock), LCK_PR, &flags, inode,
313                             &match_lockh);
314
315         /* hey, alright, we hold a size lock that covers the size we
316          * just found, its not going to change for a while.. */
317         if (matched == 1) {
318                 set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
319                 obd_cancel(exp, lsm, LCK_PR, &match_lockh);
320         }
321
322         RETURN(0);
323 }
324 #endif
325
326 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
327                 struct lov_stripe_md *lsm, int mode,
328                 struct lustre_handle *lockh)
329 {
330         struct llu_sb_info *sbi = llu_i2sbi(inode);
331         int rc;
332         ENTRY;
333
334         /* XXX phil: can we do this?  won't it screw the file size up? */
335         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
336             (sbi->ll_flags & LL_SBI_NOLCK))
337                 RETURN(0);
338
339         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
340
341         RETURN(rc);
342 }
343
344 #define LLAP_MAGIC 12346789
345
346 struct ll_async_page {
347         int             llap_magic;
348         void           *llap_cookie;
349         int             llap_queued;
350         struct page    *llap_page;
351         struct inode   *llap_inode;
352 };
353
354 static struct ll_async_page *llap_from_cookie(void *cookie)
355 {
356         struct ll_async_page *llap = cookie;
357         if (llap->llap_magic != LLAP_MAGIC)
358                 return ERR_PTR(-EINVAL);
359         return llap;
360 };
361
362 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
363 {
364         struct ll_async_page *llap;
365         struct inode *inode;
366         struct lov_stripe_md *lsm;
367         obd_flag valid_flags;
368         ENTRY;
369
370         llap = llap_from_cookie(data);
371         if (IS_ERR(llap)) {
372                 EXIT;
373                 return;
374         }
375
376         inode = llap->llap_inode;
377         lsm = llu_i2info(inode)->lli_smd;
378
379         oa->o_id = lsm->lsm_object_id;
380         oa->o_valid = OBD_MD_FLID;
381         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
382         if (cmd == OBD_BRW_WRITE)
383                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
384
385         obdo_from_inode(oa, inode, valid_flags);
386         EXIT;
387 }
388
389 /* called for each page in a completed rpc.*/
390 static void llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
391 {
392         struct ll_async_page *llap;
393         struct page *page;
394
395         llap = llap_from_cookie(data);
396         if (IS_ERR(llap)) {
397                 EXIT;
398                 return;
399         }
400
401         llap->llap_queued = 0;
402         page = llap->llap_page;
403
404         if (rc != 0) {
405                 if (cmd == OBD_BRW_WRITE)
406                         CERROR("writeback error on page %p index %ld: %d\n", 
407                                page, page->index, rc);
408         }
409         EXIT;
410 }
411
412 static struct obd_async_page_ops llu_async_page_ops = {
413         .ap_make_ready =        NULL,
414         .ap_refresh_count =     NULL,
415         .ap_fill_obdo =         llu_ap_fill_obdo,
416         .ap_completion =        llu_ap_completion,
417 };
418
419 static
420 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
421 {
422         struct llu_sysio_cookie *cookie;
423         int rc;
424
425         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
426         if (cookie == NULL)
427                 goto out;
428
429         I_REF(inode);
430         cookie->lsc_inode = inode;
431         cookie->lsc_maxpages = maxpages;
432         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
433         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
434
435         rc = oig_init(&cookie->lsc_oig);
436         if (rc) {
437                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
438                 cookie = NULL;
439         }
440
441 out:
442         return cookie;
443 }
444
445 static
446 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
447 {
448         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
449         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
450         struct ll_async_page *llap = cookie->lsc_llap;
451         int i;
452
453         for (i = 0; i< cookie->lsc_maxpages; i++) {
454                 if (llap[i].llap_cookie)
455                         obd_teardown_async_page(exp, lsm, NULL,
456                                                 llap[i].llap_cookie);
457         }
458
459         I_RELE(cookie->lsc_inode);
460
461         oig_release(cookie->lsc_oig);
462         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
463 }
464
465 static
466 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
467                       char *buf, loff_t pos, size_t count)
468 {
469         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
470         struct lov_stripe_md *lsm = lli->lli_smd;
471         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
472         struct page *pages = cookie->lsc_pages;
473         struct ll_async_page *llap = cookie->lsc_llap;
474         int i, rc, npages = 0;
475         ENTRY;
476
477         if (!exp)
478                 RETURN(-EINVAL);
479
480         /* prepare the pages array */
481         do {
482                 unsigned long index, offset, bytes;
483
484                 offset = (pos & ~PAGE_CACHE_MASK);
485                 index = pos >> PAGE_CACHE_SHIFT;
486                 bytes = PAGE_CACHE_SIZE - offset;
487                 if (bytes > count)
488                         bytes = count;
489
490                 /* prevent read beyond file range */
491                 if ((cmd == OBD_BRW_READ) &&
492                     (pos + bytes) >= lli->lli_st_size) {
493                         if (pos >= lli->lli_st_size)
494                                 break;
495                         bytes = lli->lli_st_size - pos;
496                 }
497
498                 /* prepare page for this index */
499                 pages[npages].index = index;
500                 pages[npages].addr = buf - offset;
501
502                 pages[npages]._offset = offset;
503                 pages[npages]._count = bytes;
504
505                 npages++;
506                 count -= bytes;
507                 pos += bytes;
508                 buf += bytes;
509
510                 cookie->lsc_rwcount += bytes;
511         } while (count);
512
513         cookie->lsc_npages = npages;
514
515         for (i = 0; i < npages; i++) {
516                 llap[i].llap_magic = LLAP_MAGIC;
517                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
518                                          (obd_off)pages[i].index << PAGE_SHIFT,
519                                          &llu_async_page_ops,
520                                          &llap[i], &llap[i].llap_cookie);
521                 if (rc) {
522                         llap[i].llap_cookie = NULL;
523                         RETURN(rc);
524                 }
525                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
526                        &llap[i], &pages[i], llap[i].llap_cookie,
527                        (obd_off)pages[i].index << PAGE_SHIFT);
528                 pages[i].private = (unsigned long)&llap[i];
529                 llap[i].llap_page = &pages[i];
530                 llap[i].llap_inode = cookie->lsc_inode;
531
532                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
533                                         llap[i].llap_cookie, cmd,
534                                         pages[i]._offset, pages[i]._count, 0,
535                                         ASYNC_READY | ASYNC_URGENT |
536                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
537                 if (rc)
538                         RETURN(rc);
539
540                 llap[i].llap_queued = 1;
541         }
542
543         RETURN(0);
544 }
545
546 static
547 int llu_start_async_io(struct llu_sysio_cookie *cookie)
548 {
549         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
550         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
551
552         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
553 }
554
555 /*
556  * read/write a continuous buffer for an inode (zero-copy)
557  */
558 struct llu_sysio_cookie*
559 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
560 {
561         struct llu_sysio_cookie *cookie;
562         int max_pages, rc;
563         ENTRY;
564
565         max_pages = (count >> PAGE_SHIFT) + 2;
566
567         cookie = get_sysio_cookie(inode, max_pages);
568         if (!cookie)
569                 RETURN(ERR_PTR(-ENOMEM));
570
571         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
572         if (rc)
573                 GOTO(out_cleanup, rc);
574
575         rc = llu_start_async_io(cookie);
576         if (rc)
577                 GOTO(out_cleanup, rc);
578
579 /*
580         rc = oig_wait(&oig);
581         if (rc) {
582                 CERROR("file i/o error!\n");
583                 rw_count = rc;
584         }
585 */
586         RETURN(cookie);
587
588 out_cleanup:
589         put_sysio_cookie(cookie);
590         RETURN(ERR_PTR(rc));
591 }
592
593 void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
594                       obd_off size);
595
596 struct llu_sysio_callback_args*
597 llu_file_write(struct inode *inode, const struct iovec *iovec,
598                size_t iovlen, loff_t pos)
599 {
600         struct llu_inode_info *lli = llu_i2info(inode);
601         struct ll_file_data *fd = lli->lli_file_data;
602         struct lustre_handle lockh = {0};
603         struct lov_stripe_md *lsm = lli->lli_smd;
604         struct obd_export *exp = NULL;
605         ldlm_policy_data_t policy;
606         struct llu_sysio_callback_args *lsca;
607         struct llu_sysio_cookie *cookie;
608         int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
609                        LDLM_FL_BLOCK_NOWAIT : 0;
610         ldlm_error_t err;
611         int iovidx;
612         ENTRY;
613
614         /* XXX consider other types later */
615         if (!S_ISREG(lli->lli_st_mode))
616                 LBUG();
617
618         LASSERT(iovlen <= MAX_IOVEC);
619
620         exp = llu_i2obdexp(inode);
621         if (exp == NULL)
622                 RETURN(ERR_PTR(-EINVAL));
623
624         OBD_ALLOC(lsca, sizeof(*lsca));
625         if (!lsca)
626                 RETURN(ERR_PTR(-ENOMEM));
627
628         /* FIXME optimize the following extent locking */
629         for (iovidx = 0; iovidx < iovlen; iovidx++) {
630                 char *buf = (char*)iovec[iovidx].iov_base;
631                 size_t count = iovec[iovidx].iov_len;
632
633                 if (count == 0)
634                         continue;
635
636                 if (pos + count > lli->lli_maxbytes)
637                         GOTO(err_out, err = -ERANGE);
638
639                 /* FIXME libsysio haven't handle O_APPEND?? */
640                 policy.l_extent.start = pos;
641                 policy.l_extent.end = pos + count - 1;
642
643                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
644                                       &lockh, astflag);
645                 if (err != ELDLM_OK)
646                         GOTO(err_out, err = -ENOLCK);
647
648                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %llu\n",
649                        lli->lli_st_ino, count, pos);
650
651                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
652                 if (!IS_ERR(cookie)) {
653                         /* save cookie */
654                         lsca->cookies[lsca->ncookies++] = cookie;
655                         pos += count;
656                         lov_increase_kms(exp, lsm, pos);
657                         /* file size grow */
658                         if (pos > lli->lli_st_size)
659                                 lli->lli_st_size = pos;
660                 } else {
661                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
662                         GOTO(err_out, err = PTR_ERR(cookie));
663                 }
664
665                 /* XXX errors? */
666                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
667                 if (err)
668                         CERROR("extent unlock error %d\n", err);
669         }
670
671         RETURN(lsca);
672
673 err_out:
674         /* teardown all async stuff */
675         while (lsca->ncookies--) {
676                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
677         }
678         OBD_FREE(lsca, sizeof(*lsca));
679
680         RETURN(ERR_PTR(err));
681 }
682
683 #if 0
684 static void llu_update_atime(struct inode *inode)
685 {
686         struct llu_inode_info *lli = llu_i2info(inode);
687
688 #ifdef USE_ATIME
689         struct iattr attr;
690
691         attr.ia_atime = LTIME_S(CURRENT_TIME);
692         attr.ia_valid = ATTR_ATIME;
693
694         if (lli->lli_st_atime == attr.ia_atime) return;
695         if (IS_RDONLY(inode)) return;
696         if (IS_NOATIME(inode)) return;
697
698         /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
699         llu_inode_setattr(inode, &attr, 0);
700 #else
701         /* update atime, but don't explicitly write it out just this change */
702         inode->i_atime = CURRENT_TIME;
703 #endif
704 }
705 #endif
706
707 struct llu_sysio_callback_args*
708 llu_file_read(struct inode *inode, const struct iovec *iovec,
709               size_t iovlen, loff_t pos)
710 {
711         struct llu_inode_info *lli = llu_i2info(inode);
712         struct ll_file_data *fd = lli->lli_file_data;
713         struct lov_stripe_md *lsm = lli->lli_smd;
714         struct lustre_handle lockh = { 0 };
715         ldlm_policy_data_t policy;
716         struct llu_sysio_callback_args *lsca;
717         struct llu_sysio_cookie *cookie;
718         int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
719                        LDLM_FL_BLOCK_NOWAIT : 0;
720         __u64 kms;
721         int iovidx;
722
723         ldlm_error_t err;
724         ENTRY;
725
726         OBD_ALLOC(lsca, sizeof(*lsca));
727         if (!lsca)
728                 RETURN(ERR_PTR(-ENOMEM));
729
730         for (iovidx = 0; iovidx < iovlen; iovidx++) {
731                 char *buf = iovec[iovidx].iov_base;
732                 size_t count = iovec[iovidx].iov_len;
733
734                 /* "If nbyte is 0, read() will return 0 and have no other results."
735                  *                      -- Single Unix Spec */
736                 if (count == 0)
737                         continue;
738
739                 policy.l_extent.start = pos;
740                 policy.l_extent.end = pos + count - 1;
741
742                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy,
743                                       &lockh, astflag);
744                 if (err != ELDLM_OK)
745                         GOTO(err_out, err = -ENOLCK);
746
747                 kms = lov_merge_size(lsm, 1);
748                 if (policy.l_extent.end > kms) {
749                         /* A glimpse is necessary to determine whether we
750                          * return a short read or some zeroes at the end of
751                          * the buffer */
752                         struct ost_lvb lvb;
753                         if (llu_glimpse_size(inode, &lvb)) {
754                                 llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
755                                 GOTO(err_out, err = -ENOLCK);
756                         }
757                         lli->lli_st_size = lvb.lvb_size;
758                 } else {
759                         lli->lli_st_size = kms;
760                 }
761
762                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
763                        "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
764                        lli->lli_st_size);
765
766                 if (pos >= lli->lli_st_size) {
767                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
768                         break;
769                 }
770
771                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
772                 if (!IS_ERR(cookie)) {
773                         /* save cookie */
774                         lsca->cookies[lsca->ncookies++] = cookie;
775                         pos += count;
776                 } else {
777                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
778                         GOTO(err_out, err = PTR_ERR(cookie));
779                 }
780
781                 /* XXX errors? */
782                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
783                 if (err)
784                         CERROR("extent_unlock fail: %d\n", err);
785         }
786 #if 0
787         if (readed > 0)
788                 llu_update_atime(inode);
789 #endif
790         RETURN(lsca);
791
792 err_out:
793         /* teardown all async stuff */
794         while (lsca->ncookies--) {
795                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
796         }
797         OBD_FREE(lsca, sizeof(*lsca));
798
799         RETURN(ERR_PTR(err));
800 }
801
802 int llu_iop_iodone(struct ioctx *ioctxp)
803 {
804         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
805         struct llu_sysio_cookie *cookie;
806         int i, err = 0, rc = 0;
807         ENTRY;
808
809         /* write/read(fd, buf, 0) */
810         if (!lsca) {
811                 ioctxp->ioctx_cc = 0;
812                 RETURN(1);
813         }
814
815         LASSERT(!IS_ERR(lsca));
816
817         for (i = 0; i < lsca->ncookies; i++) {
818                 cookie = lsca->cookies[i];
819                 if (cookie) {
820                         err = oig_wait(cookie->lsc_oig);
821                         if (err && !rc)
822                                 rc = err;
823                         if (!rc)
824                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
825                         put_sysio_cookie(cookie);
826                 }
827         }
828
829         if (rc) {
830                 LASSERT(rc < 0);
831                 ioctxp->ioctx_cc = -1;
832                 ioctxp->ioctx_errno = -rc;
833         }
834
835         OBD_FREE(lsca, sizeof(*lsca));
836         ioctxp->ioctx_private = NULL;
837
838         RETURN(1);
839 }