Whamcloud - gitweb
a59dba0d21c182aaba3dffa5494ced2fba2052ed
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light block IO
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32 #include <fcntl.h>
33
34 #include <sysio.h>
35 #include <fs.h>
36 #include <mount.h>
37 #include <inode.h>
38 #include <file.h>
39
40 #undef LIST_HEAD
41
42 #include "llite_lib.h"
43
/* Size in bytes of one per-page async cookie.  It starts out as 0 and is
 * filled in lazily by get_sysio_cookie() from the return value of an
 * all-NULL obd_prep_async_page() call; llu_prep_async_io() uses it as the
 * stride between consecutive cookies in the preallocated cookie area. */
size_t llap_cookie_size;
45
46 static int llu_lock_to_stripe_offset(struct inode *inode,struct ldlm_lock *lock)
47 {
48         struct llu_inode_info *lli = llu_i2info(inode);
49         struct lov_stripe_md *lsm = lli->lli_smd;
50         struct obd_export *exp = llu_i2obdexp(inode);
51         struct {
52                 char name[16];
53                 struct ldlm_lock *lock;
54                 struct lov_stripe_md *lsm;
55         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
56         __u32 stripe, vallen = sizeof(stripe);
57         int rc;
58         ENTRY;
59
60         if (lsm->lsm_stripe_count == 1)
61                 RETURN(0);
62
63         /* get our offset in the lov */
64         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
65         if (rc != 0) {
66                 CERROR("obd_get_info: rc = %d\n", rc);
67                 LBUG();
68         }
69         LASSERT(stripe < lsm->lsm_stripe_count);
70         RETURN(stripe);
71 }
72
73 static int llu_extent_lock_callback(struct ldlm_lock *lock,
74                                     struct ldlm_lock_desc *new, void *data,
75                                     int flag)
76 {
77         struct lustre_handle lockh = { 0 };
78         int rc;
79         ENTRY;
80
81         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
82                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
83                 LBUG();
84         }
85
86         switch (flag) {
87         case LDLM_CB_BLOCKING:
88                 ldlm_lock2handle(lock, &lockh);
89                 rc = ldlm_cli_cancel(&lockh);
90                 if (rc != ELDLM_OK)
91                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
92                 break;
93         case LDLM_CB_CANCELING: {
94                 struct inode *inode;
95                 struct llu_inode_info *lli;
96                 struct lov_stripe_md *lsm;
97                 __u32 stripe;
98                 __u64 kms;
99
100                 /* This lock wasn't granted, don't try to evict pages */
101                 if (lock->l_req_mode != lock->l_granted_mode)
102                         RETURN(0);
103
104                 inode = llu_inode_from_lock(lock);
105                 if (!inode)
106                         RETURN(0);
107                 lli= llu_i2info(inode);
108                 if (!lli)
109                         goto iput;
110                 if (!lli->lli_smd)
111                         goto iput;
112                 lsm = lli->lli_smd;
113
114                 stripe = llu_lock_to_stripe_offset(inode, lock);
115                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
116                 kms = ldlm_extent_shift_kms(lock,
117                                             lsm->lsm_oinfo[stripe].loi_kms);
118                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
119                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
120                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
121                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
122                 lsm->lsm_oinfo[stripe].loi_kms = kms;
123 iput:
124                 I_RELE(inode);
125                 break;
126         }
127         default:
128                 LBUG();
129         }
130
131         RETURN(0);
132 }
133
134 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
135 {
136         struct ptlrpc_request *req = reqp;
137         struct inode *inode = llu_inode_from_lock(lock);
138         struct llu_inode_info *lli;
139         struct ost_lvb *lvb;
140         int rc, size = sizeof(*lvb), stripe = 0;
141         ENTRY;
142
143         if (inode == NULL)
144                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
145         lli = llu_i2info(inode);
146         if (lli == NULL)
147                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
148         if (lli->lli_smd == NULL)
149                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
150
151         /* First, find out which stripe index this lock corresponds to. */
152         if (lli->lli_smd->lsm_stripe_count > 1)
153                 stripe = llu_lock_to_stripe_offset(inode, lock);
154
155         rc = lustre_pack_reply(req, 1, &size, NULL);
156         if (rc) {
157                 CERROR("lustre_pack_reply: %d\n", rc);
158                 GOTO(iput, rc);
159         }
160
161         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
162         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
163
164         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
165                    lli->lli_st_size, stripe, lvb->lvb_size);
166  iput:
167         I_RELE(inode);
168  out:
169         /* These errors are normal races, so we don't want to fill the console
170          * with messages by calling ptlrpc_error() */
171         if (rc == -ELDLM_NO_LOCK_DATA)
172                 lustre_pack_reply(req, 0, NULL, NULL);
173
174         req->rq_status = rc;
175         return rc;
176 }
177
/* Attribute-merging helpers used below.  They are declared here directly
 * and are not defined in the visible part of this file (presumably they
 * live in the lov code -- confirm against the build). */
__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
__u64 lov_merge_blocks(struct lov_stripe_md *lsm);
__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
181
182 /* NB: lov_merge_size will prefer locally cached writes if they extend the
183  * file (because it prefers KMS over RSS when larger) */
184 int llu_glimpse_size(struct inode *inode)
185 {
186         struct llu_inode_info *lli = llu_i2info(inode);
187         struct llu_sb_info *sbi = llu_i2sbi(inode);
188         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
189         struct lustre_handle lockh = { 0 };
190         int rc, flags = LDLM_FL_HAS_INTENT;
191         ENTRY;
192
193         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
194
195         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
196                          LCK_PR, &flags, llu_extent_lock_callback,
197                          ldlm_completion_ast, llu_glimpse_callback, inode,
198                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
199         if (rc > 0)
200                 RETURN(-EIO);
201
202         lli->lli_st_size = lov_merge_size(lli->lli_smd, 0);
203         lli->lli_st_blocks = lov_merge_blocks(lli->lli_smd);
204         lli->lli_st_mtime = lov_merge_mtime(lli->lli_smd, lli->lli_st_mtime);
205
206         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
207                lli->lli_st_size, lli->lli_st_blocks);
208
209
210         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
211
212         RETURN(rc);
213 }
214
215 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
216                     struct lov_stripe_md *lsm, int mode,
217                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
218                     int ast_flags)
219 {
220         struct llu_sb_info *sbi = llu_i2sbi(inode);
221         struct llu_inode_info *lli = llu_i2info(inode);
222         int rc;
223         ENTRY;
224
225         LASSERT(lockh->cookie == 0);
226
227         /* XXX phil: can we do this?  won't it screw the file size up? */
228         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
229             (sbi->ll_flags & LL_SBI_NOLCK))
230                 RETURN(0);
231
232         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
233                lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
234
235         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
236                          &ast_flags, llu_extent_lock_callback,
237                          ldlm_completion_ast, llu_glimpse_callback, inode,
238                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
239         if (rc > 0)
240                 rc = -EIO;
241
242         if (policy->l_extent.start == 0 &&
243             policy->l_extent.end == OBD_OBJECT_EOF)
244                 lli->lli_st_size = lov_merge_size(lsm, 1);
245
246         RETURN(rc);
247 }
248
/*
 * Compiled out: an older pair of extent-lock helpers, llu_extent_lock_no_validate()
 * and a previous llu_extent_lock() built on top of it.  They use a different
 * obd_enqueue() argument list than the live llu_extent_lock() in this file
 * and are kept for reference only.
 */
#if 0
int llu_extent_lock_no_validate(struct ll_file_data *fd,
                                struct inode *inode,
                                struct lov_stripe_md *lsm,
                                int mode,
                                struct ldlm_extent *extent,
                                struct lustre_handle *lockh,
                                int ast_flags)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        struct llu_inode_info *lli = llu_i2info(inode);
        int rc;
        ENTRY;

        LASSERT(lockh->cookie == 0);

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               lli->lli_st_ino, extent->start, extent->end);

        rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
                         sizeof(extent), mode, &ast_flags,
                         llu_extent_lock_callback, inode, lockh);

        RETURN(rc);
}

/*
 * This grabs a lock and manually implements behaviour that makes it look like
 * the OST is returning the file size with each lock acquisition.
 */
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm, int mode,
                    struct ldlm_extent *extent, struct lustre_handle *lockh)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct obd_export *exp = llu_i2obdexp(inode);
        struct ldlm_extent size_lock;
        struct lustre_handle match_lockh = {0};
        int flags, rc, matched;
        ENTRY;

        rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
        if (rc != ELDLM_OK)
                RETURN(rc);

        if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
                RETURN(0);

        rc = llu_inode_getattr(inode, lsm);
        if (rc) {
                llu_extent_unlock(fd, inode, lsm, mode, lockh);
                RETURN(rc);
        }

        size_lock.start = lli->lli_st_size;
        size_lock.end = OBD_OBJECT_EOF;

        /* XXX I bet we should be checking the lock ignore flags.. */
        /* FIXME use LDLM_FL_TEST_LOCK instead */
        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
        matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
                            sizeof(size_lock), LCK_PR, &flags, inode,
                            &match_lockh);

        /* hey, alright, we hold a size lock that covers the size we
         * just found, it's not going to change for a while.. */
        if (matched == 1) {
                set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
                obd_cancel(exp, lsm, LCK_PR, &match_lockh);
        }

        RETURN(0);
}
#endif
328
329 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
330                 struct lov_stripe_md *lsm, int mode,
331                 struct lustre_handle *lockh)
332 {
333         struct llu_sb_info *sbi = llu_i2sbi(inode);
334         int rc;
335         ENTRY;
336
337         /* XXX phil: can we do this?  won't it screw the file size up? */
338         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
339             (sbi->ll_flags & LL_SBI_NOLCK))
340                 RETURN(0);
341
342         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
343
344         RETURN(rc);
345 }
346
/* Sanity marker stored in every ll_async_page; checked by llap_from_cookie() */
#define LLAP_MAGIC 12346789

/*
 * Per-page bookkeeping for one page of an asynchronous group I/O.  A pointer
 * to this structure is what the obd layer hands back to the llu_ap_*
 * callbacks as their opaque "data" argument.
 */
struct ll_async_page {
        int             llap_magic;     /* set to LLAP_MAGIC when prepared */
        void           *llap_cookie;    /* obd-side async page cookie; NULL
                                         * means nothing to tear down */
        int             llap_queued;    /* 1 once queued for group I/O,
                                         * reset to 0 on completion */
        struct page    *llap_page;      /* the page descriptor being moved */
        struct inode   *llap_inode;     /* inode the I/O belongs to */
};
356
357 static struct ll_async_page *llap_from_cookie(void *cookie)
358 {
359         struct ll_async_page *llap = cookie;
360         if (llap->llap_magic != LLAP_MAGIC)
361                 return ERR_PTR(-EINVAL);
362         return llap;
363 };
364
365 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
366 {
367         struct ll_async_page *llap;
368         struct inode *inode;
369         struct lov_stripe_md *lsm;
370         obd_flag valid_flags;
371         ENTRY;
372
373         llap = llap_from_cookie(data);
374         if (IS_ERR(llap)) {
375                 EXIT;
376                 return;
377         }
378
379         inode = llap->llap_inode;
380         lsm = llu_i2info(inode)->lli_smd;
381
382         oa->o_id = lsm->lsm_object_id;
383         oa->o_valid = OBD_MD_FLID;
384         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
385         if (cmd == OBD_BRW_WRITE)
386                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
387
388         obdo_from_inode(oa, inode, valid_flags);
389         EXIT;
390 }
391
392 /* called for each page in a completed rpc.*/
393 static void llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
394 {
395         struct ll_async_page *llap;
396         struct page *page;
397
398         llap = llap_from_cookie(data);
399         if (IS_ERR(llap)) {
400                 EXIT;
401                 return;
402         }
403
404         llap->llap_queued = 0;
405         page = llap->llap_page;
406
407         if (rc != 0) {
408                 if (cmd == OBD_BRW_WRITE)
409                         CERROR("writeback error on page %p index %ld: %d\n", 
410                                page, page->index, rc);
411         }
412         EXIT;
413 }
414
/*
 * Callback table registered with every page in llu_prep_async_io().  Only
 * the obdo fill and completion hooks are provided; ap_make_ready and
 * ap_refresh_count are left NULL (pages are queued with ASYNC_READY and
 * ASYNC_COUNT_STABLE, so presumably those hooks are never needed -- confirm
 * against the osc code).
 */
static struct obd_async_page_ops llu_async_page_ops = {
        .ap_make_ready =        NULL,
        .ap_refresh_count =     NULL,
        .ap_fill_obdo =         llu_ap_fill_obdo,
        .ap_completion =        llu_ap_completion,
};
421
422 static
423 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode,
424                                           struct obd_export *exp, int maxpages)
425 {
426         struct llu_sysio_cookie *cookie;
427         int rc;
428
429         if (!llap_cookie_size)
430                 llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
431                                                        NULL, NULL, NULL, 0,
432                                                        NULL, NULL, NULL);
433         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(exp, maxpages));
434         if (cookie == NULL)
435                 goto out;
436
437         I_REF(inode);
438         cookie->lsc_inode = inode;
439         cookie->lsc_maxpages = maxpages;
440         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
441         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
442         cookie->lsc_llap_cookie = (void *)(cookie->lsc_pages + maxpages);
443
444         rc = oig_init(&cookie->lsc_oig);
445         if (rc) {
446                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(exp, maxpages));
447                 cookie = NULL;
448         }
449
450 out:
451         return cookie;
452 }
453
454 static
455 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
456 {
457         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
458         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
459         struct ll_async_page *llap = cookie->lsc_llap;
460 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
461         struct page *pages = cookie->lsc_pages;
462 #endif
463         int i;
464
465         for (i = 0; i< cookie->lsc_maxpages; i++) {
466                 if (llap[i].llap_cookie)
467                         obd_teardown_async_page(exp, lsm, NULL,
468                                                 llap[i].llap_cookie);
469 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
470                 if (pages[i]._managed) {
471                         free(pages[i].addr);
472                         pages[i]._managed = 0;
473                 }
474 #endif
475         }
476
477         I_RELE(cookie->lsc_inode);
478
479         oig_release(cookie->lsc_oig);
480         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(exp, cookie->lsc_maxpages));
481 }
482
#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
/*
 * Note: this code is meant to be removed eventually, so it is not being
 * cleaned up any further.
 *
 * Turn a partial first and/or last page of a write into a page starting at
 * offset 0: the existing page content is read from the object into a
 * malloc'ed bounce buffer, the caller's bytes are copied on top, and the
 * page descriptor is redirected to the bounce buffer.  Such pages are
 * flagged _managed so that put_sysio_cookie() frees the buffer.
 *
 * Pages that are already full, or that start at or beyond the current file
 * size, are left alone.
 *
 * \retval 0        success
 * \retval -ENOMEM  no memory for a bounce buffer
 * \retval other    error from obd_brw()
 */
static
int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
{
        struct inode *inode = cookie->lsc_inode;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obdo oa;
        struct page *pages = cookie->lsc_pages;
        int i, pgidx[2] = {0, cookie->lsc_npages-1};
        int rc;
        ENTRY;

        for (i = 0; i < 2; i++) {
                struct page *oldpage = &pages[pgidx[i]];
                struct page newpage;
                struct brw_page pg;
                char *newbuf;

                /* single-page request: handle it once, on the second pass */
                if (i == 0 && pgidx[0] == pgidx[1])
                        continue;

                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);

                if (oldpage->_count == PAGE_CACHE_SIZE)
                        continue;

                /* widen before shifting, as done for pg.off below, so the
                 * byte offset cannot wrap where index is only 32 bits */
                if (((obd_off)oldpage->index << PAGE_CACHE_SHIFT) >=
                    lli->lli_st_size)
                        continue;

                newbuf = malloc(PAGE_CACHE_SIZE);
                if (!newbuf)
                        RETURN(-ENOMEM);

                newpage.index = oldpage->index;
                newpage.addr = newbuf;

                pg.pg = &newpage;
                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
                /* do not read past the end of the file */
                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
                else
                        pg.count = PAGE_CACHE_SIZE;
                pg.flag = 0;

                oa.o_id = lsm->lsm_object_id;
                oa.o_mode = lli->lli_st_mode;
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

                /* issue read */
                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
                if (rc) {
                        free(newbuf);
                        RETURN(rc);
                }

                /* copy page content, and reset page params */
                memcpy(newbuf + oldpage->_offset,
                       (char*)oldpage->addr + oldpage->_offset,
                       oldpage->_count);

                oldpage->addr = newbuf;
                /* a write reaching past EOF ends where the new data ends;
                 * otherwise the whole page is written back */
                if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
                    oldpage->_offset + oldpage->_count) > lli->lli_st_size)
                        oldpage->_count += oldpage->_offset;
                else
                        oldpage->_count = PAGE_CACHE_SIZE;
                oldpage->_offset = 0;
                oldpage->_managed = 1;
        }

        RETURN(0);
}
#endif
561
562 static
563 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
564                       char *buf, loff_t pos, size_t count)
565 {
566         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
567         struct lov_stripe_md *lsm = lli->lli_smd;
568         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
569         struct page *pages = cookie->lsc_pages;
570         struct ll_async_page *llap = cookie->lsc_llap;
571         void *llap_cookie = cookie->lsc_llap_cookie;
572         int i, rc, npages = 0;
573         ENTRY;
574
575         if (!exp)
576                 RETURN(-EINVAL);
577
578         /* prepare the pages array */
579         do {
580                 unsigned long index, offset, bytes;
581
582                 offset = (pos & ~PAGE_CACHE_MASK);
583                 index = pos >> PAGE_CACHE_SHIFT;
584                 bytes = PAGE_CACHE_SIZE - offset;
585                 if (bytes > count)
586                         bytes = count;
587
588                 /* prevent read beyond file range */
589                 if ((cmd == OBD_BRW_READ) &&
590                     (pos + bytes) >= lli->lli_st_size) {
591                         if (pos >= lli->lli_st_size)
592                                 break;
593                         bytes = lli->lli_st_size - pos;
594                 }
595
596                 /* prepare page for this index */
597                 pages[npages].index = index;
598                 pages[npages].addr = buf - offset;
599
600                 pages[npages]._offset = offset;
601                 pages[npages]._count = bytes;
602
603                 npages++;
604                 count -= bytes;
605                 pos += bytes;
606                 buf += bytes;
607
608                 cookie->lsc_rwcount += bytes;
609         } while (count);
610
611         cookie->lsc_npages = npages;
612
613 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
614         if (cmd == OBD_BRW_WRITE) {
615                 rc = prepare_unaligned_write(cookie);
616                 if (rc)
617                         RETURN(rc);
618         }
619 #endif
620
621         for (i = 0; i < npages; i++) {
622                 llap[i].llap_magic = LLAP_MAGIC;
623                 llap[i].llap_cookie = llap_cookie + i * llap_cookie_size;
624                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
625                                          (obd_off)pages[i].index << PAGE_SHIFT,
626                                          &llu_async_page_ops,
627                                          &llap[i], &llap[i].llap_cookie);
628                 if (rc) {
629                         llap[i].llap_cookie = NULL;
630                         RETURN(rc);
631                 }
632                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
633                        &llap[i], &pages[i], llap[i].llap_cookie,
634                        (obd_off)pages[i].index << PAGE_SHIFT);
635                 pages[i].private = (unsigned long)&llap[i];
636                 llap[i].llap_page = &pages[i];
637                 llap[i].llap_inode = cookie->lsc_inode;
638
639                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
640                                         llap[i].llap_cookie, cmd,
641                                         pages[i]._offset, pages[i]._count, 0,
642                                         ASYNC_READY | ASYNC_URGENT |
643                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
644                 if (rc)
645                         RETURN(rc);
646
647                 llap[i].llap_queued = 1;
648         }
649
650         RETURN(0);
651 }
652
653 static
654 int llu_start_async_io(struct llu_sysio_cookie *cookie)
655 {
656         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
657         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
658
659         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
660 }
661
662 /*
663  * read/write a continuous buffer for an inode (zero-copy)
664  */
665 struct llu_sysio_cookie*
666 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
667 {
668         struct llu_sysio_cookie *cookie;
669         int max_pages, rc;
670         ENTRY;
671
672         max_pages = (count >> PAGE_SHIFT) + 2;
673
674         cookie = get_sysio_cookie(inode, llu_i2obdexp(inode), max_pages);
675         if (!cookie)
676                 RETURN(ERR_PTR(-ENOMEM));
677
678         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
679         if (rc)
680                 GOTO(out_cleanup, rc);
681
682         rc = llu_start_async_io(cookie);
683         if (rc)
684                 GOTO(out_cleanup, rc);
685
686 /*
687         rc = oig_wait(&oig);
688         if (rc) {
689                 CERROR("file i/o error!\n");
690                 rw_count = rc;
691         }
692 */
693         RETURN(cookie);
694
695 out_cleanup:
696         put_sysio_cookie(cookie);
697         RETURN(ERR_PTR(rc));
698 }
699
/* Declared here directly; not defined in the visible part of this file.
 * llu_file_write() calls it with the end offset of each queued segment. */
void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
                      obd_off size);
702
703 struct llu_sysio_callback_args*
704 llu_file_write(struct inode *inode, const struct iovec *iovec,
705                size_t iovlen, loff_t pos)
706 {
707         struct llu_inode_info *lli = llu_i2info(inode);
708         struct ll_file_data *fd = lli->lli_file_data;
709         struct lustre_handle lockh = {0};
710         struct lov_stripe_md *lsm = lli->lli_smd;
711         struct obd_export *exp = NULL;
712         ldlm_policy_data_t policy;
713         struct llu_sysio_callback_args *lsca;
714         struct llu_sysio_cookie *cookie;
715         ldlm_error_t err;
716         int iovidx;
717         ENTRY;
718
719         /* XXX consider other types later */
720         if (!S_ISREG(lli->lli_st_mode))
721                 LBUG();
722
723         LASSERT(iovlen <= MAX_IOVEC);
724
725         exp = llu_i2obdexp(inode);
726         if (exp == NULL)
727                 RETURN(ERR_PTR(-EINVAL));
728
729         OBD_ALLOC(lsca, sizeof(*lsca));
730         if (!lsca)
731                 RETURN(ERR_PTR(-ENOMEM));
732
733         /* FIXME optimize the following extent locking */
734         for (iovidx = 0; iovidx < iovlen; iovidx++) {
735                 char *buf = (char*)iovec[iovidx].iov_base;
736                 size_t count = iovec[iovidx].iov_len;
737
738                 if (count == 0)
739                         continue;
740
741                 if (pos + count > lli->lli_maxbytes)
742                         GOTO(err_out, err = -ERANGE);
743
744                 /* FIXME libsysio haven't handle O_APPEND?? */
745                 policy.l_extent.start = pos;
746                 policy.l_extent.end = pos + count - 1;
747
748                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
749                                       &lockh, 0);
750                 if (err != ELDLM_OK)
751                         GOTO(err_out, err = -ENOLCK);
752
753                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
754                        lli->lli_st_ino, count, pos);
755
756                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
757                 if (!IS_ERR(cookie)) {
758                         /* save cookie */
759                         lsca->cookies[lsca->ncookies++] = cookie;
760                         pos += count;
761                         lov_increase_kms(exp, lsm, pos);
762                         /* file size grow */
763                         if (pos > lli->lli_st_size)
764                                 lli->lli_st_size = pos;
765                 } else {
766                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
767                         GOTO(err_out, err = PTR_ERR(cookie));
768                 }
769
770                 /* XXX errors? */
771                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
772                 if (err)
773                         CERROR("extent unlock error %d\n", err);
774         }
775
776         RETURN(lsca);
777
778 err_out:
779         /* teardown all async stuff */
780         while (lsca->ncookies--) {
781                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
782         }
783         OBD_FREE(lsca, sizeof(*lsca));
784
785         RETURN(ERR_PTR(err));
786 }
787
/*
 * Compiled out: access-time update helper.  Its only call site, in
 * llu_file_read(), is disabled with "#if 0" as well.
 */
#if 0
static void llu_update_atime(struct inode *inode)
{
        struct llu_inode_info *lli = llu_i2info(inode);

#ifdef USE_ATIME
        struct iattr attr;

        attr.ia_atime = LTIME_S(CURRENT_TIME);
        attr.ia_valid = ATTR_ATIME;

        /* nothing to do if atime is current or must not be changed */
        if (lli->lli_st_atime == attr.ia_atime) return;
        if (IS_RDONLY(inode)) return;
        if (IS_NOATIME(inode)) return;

        /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
        llu_inode_setattr(inode, &attr, 0);
#else
        /* update atime, but don't explicitly write it out just this change */
        inode->i_atime = CURRENT_TIME;
#endif
}
#endif
811
812 struct llu_sysio_callback_args*
813 llu_file_read(struct inode *inode, const struct iovec *iovec,
814               size_t iovlen, loff_t pos)
815 {
816         struct llu_inode_info *lli = llu_i2info(inode);
817         struct ll_file_data *fd = lli->lli_file_data;
818         struct lov_stripe_md *lsm = lli->lli_smd;
819         struct lustre_handle lockh = { 0 };
820         ldlm_policy_data_t policy;
821         struct llu_sysio_callback_args *lsca;
822         struct llu_sysio_cookie *cookie;
823         __u64 kms;
824         int iovidx;
825
826         ldlm_error_t err;
827         ENTRY;
828
829         OBD_ALLOC(lsca, sizeof(*lsca));
830         if (!lsca)
831                 RETURN(ERR_PTR(-ENOMEM));
832
833         for (iovidx = 0; iovidx < iovlen; iovidx++) {
834                 char *buf = iovec[iovidx].iov_base;
835                 size_t count = iovec[iovidx].iov_len;
836
837                 /* "If nbyte is 0, read() will return 0 and have no other results."
838                  *                      -- Single Unix Spec */
839                 if (count == 0)
840                         continue;
841
842                 policy.l_extent.start = pos;
843                 policy.l_extent.end = pos + count - 1;
844
845                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
846                 if (err != ELDLM_OK)
847                         GOTO(err_out, err = -ENOLCK);
848
849                 kms = lov_merge_size(lsm, 1);
850                 if (policy.l_extent.end > kms) {
851                         /* A glimpse is necessary to determine whether we
852                          * return a short read or some zeroes at the end of
853                          * the buffer */
854                         if (llu_glimpse_size(inode)) {
855                                 llu_extent_unlock(fd, inode, lsm,LCK_PR,&lockh);
856                                 GOTO(err_out, err = -ENOLCK);
857                         }
858                 } else {
859                         lli->lli_st_size = kms;
860                 }
861
862                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
863                        "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
864                        lli->lli_st_size);
865
866                 if (pos >= lli->lli_st_size) {
867                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
868                         break;
869                 }
870
871                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
872                 if (!IS_ERR(cookie)) {
873                         /* save cookie */
874                         lsca->cookies[lsca->ncookies++] = cookie;
875                         pos += count;
876                 } else {
877                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
878                         GOTO(err_out, err = PTR_ERR(cookie));
879                 }
880
881                 /* XXX errors? */
882                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
883                 if (err)
884                         CERROR("extent_unlock fail: %d\n", err);
885         }
886 #if 0
887         if (readed > 0)
888                 llu_update_atime(inode);
889 #endif
890         RETURN(lsca);
891
892 err_out:
893         /* teardown all async stuff */
894         while (lsca->ncookies--) {
895                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
896         }
897         OBD_FREE(lsca, sizeof(*lsca));
898
899         RETURN(ERR_PTR(err));
900 }
901
902 int llu_iop_iodone(struct ioctx *ioctxp)
903 {
904         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
905         struct llu_sysio_cookie *cookie;
906         int i, err = 0, rc = 0;
907         ENTRY;
908
909         /* write/read(fd, buf, 0) */
910         if (!lsca) {
911                 ioctxp->ioctx_cc = 0;
912                 RETURN(1);
913         }
914
915         LASSERT(!IS_ERR(lsca));
916
917         for (i = 0; i < lsca->ncookies; i++) {
918                 cookie = lsca->cookies[i];
919                 if (cookie) {
920                         err = oig_wait(cookie->lsc_oig);
921                         if (err && !rc)
922                                 rc = err;
923                         if (!rc)
924                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
925                         put_sysio_cookie(cookie);
926                 }
927         }
928
929         if (rc) {
930                 LASSERT(rc < 0);
931                 ioctxp->ioctx_cc = -1;
932                 ioctxp->ioctx_errno = -rc;
933         }
934
935         OBD_FREE(lsca, sizeof(*lsca));
936         ioctxp->ioctx_private = NULL;
937
938         RETURN(1);
939 }