Whamcloud - gitweb
Land b_smallfix onto HEAD (20040330_2312)
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32 #include <fcntl.h>
33
34 #include <sysio.h>
35 #include <fs.h>
36 #include <mount.h>
37 #include <inode.h>
38 #include <file.h>
39
40 #undef LIST_HEAD
41
42 #include "llite_lib.h"
43
44 static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
45 {
46         struct llu_inode_info *lli = llu_i2info(inode);
47         struct lov_stripe_md *lsm = lli->lli_smd;
48         struct obd_export *exp = llu_i2obdexp(inode);
49         struct {
50                 char name[16];
51                 struct ldlm_lock *lock;
52                 struct lov_stripe_md *lsm;
53         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
54         __u32 stripe, vallen = sizeof(stripe);
55         int rc;
56         ENTRY;
57
58         if (lsm->lsm_stripe_count == 1)
59                 RETURN(0);
60
61         /* get our offset in the lov */
62         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
63         if (rc != 0) {
64                 CERROR("obd_get_info: rc = %d\n", rc);
65                 LBUG();
66         }
67         LASSERT(stripe < lsm->lsm_stripe_count);
68         RETURN(stripe);
69 }
70
71 static int llu_extent_lock_callback(struct ldlm_lock *lock,
72                                     struct ldlm_lock_desc *new, void *data,
73                                     int flag)
74 {
75         struct lustre_handle lockh = { 0 };
76         int rc;
77         ENTRY;
78         
79
80         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
81                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
82                 LBUG();
83         }
84         
85         switch (flag) {
86         case LDLM_CB_BLOCKING:
87                 ldlm_lock2handle(lock, &lockh);
88                 rc = ldlm_cli_cancel(&lockh);
89                 if (rc != ELDLM_OK)
90                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
91                 break;
92         case LDLM_CB_CANCELING: {
93                 struct inode *inode;
94                 struct llu_inode_info *lli;
95                 struct lov_stripe_md *lsm;
96                 __u32 stripe;
97                 __u64 kms;
98                 
99                 /* This lock wasn't granted, don't try to evict pages */
100                 if (lock->l_req_mode != lock->l_granted_mode)
101                         RETURN(0);
102
103                 inode = llu_inode_from_lock(lock);
104                 if (!inode)
105                         RETURN(0);
106                 lli= llu_i2info(inode);
107                 if (!lli)
108                         goto iput;
109                 if (!lli->lli_smd)
110                         goto iput;
111                 lsm = lli->lli_smd;
112
113                 stripe = llu_lock_to_stripe_offset(inode, lock);
114                 kms = ldlm_extent_shift_kms(lock,
115                                             lsm->lsm_oinfo[stripe].loi_kms);
116                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
117                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
118                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
119                 lsm->lsm_oinfo[stripe].loi_kms = kms;
120 iput:
121                 I_RELE(inode);
122                 break;
123         }
124         default:
125                 LBUG();
126         }
127         
128         RETURN(0);
129 }
130
131 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
132 {
133         struct ptlrpc_request *req = reqp;
134         struct inode *inode = llu_inode_from_lock(lock);
135         struct llu_inode_info *lli;
136         struct ost_lvb *lvb;
137         int rc, size = sizeof(*lvb), stripe = 0;
138         ENTRY;
139
140         if (inode == NULL)
141                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
142         lli = llu_i2info(inode);
143         if (lli == NULL)
144                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
145         if (lli->lli_smd == NULL)
146                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
147
148         /* First, find out which stripe index this lock corresponds to. */
149         if (lli->lli_smd->lsm_stripe_count > 1)
150                 stripe = llu_lock_to_stripe_offset(inode, lock);
151
152         rc = lustre_pack_reply(req, 1, &size, NULL);
153         if (rc) {
154                 CERROR("lustre_pack_reply: %d\n", rc);
155                 GOTO(iput, rc);
156         }
157
158         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
159         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
160
161         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
162                    lli->lli_st_size, stripe, lvb->lvb_size);
163  iput:
164         I_RELE(inode);
165  out:
166         /* These errors are normal races, so we don't want to fill the console
167          * with messages by calling ptlrpc_error() */
168         if (rc == -ELDLM_NO_LOCK_DATA)
169                 lustre_pack_reply(req, 0, NULL, NULL);
170
171         req->rq_status = rc;
172         return rc;
173 }
174
175 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
176 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
177
178 /* NB: lov_merge_size will prefer locally cached writes if they extend the
179  * file (because it prefers KMS over RSS when larger) */
180 int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
181 {
182         struct llu_inode_info *lli = llu_i2info(inode);
183         struct llu_sb_info *sbi = llu_i2sbi(inode);
184         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
185         struct lustre_handle lockh;
186         int rc, flags = LDLM_FL_HAS_INTENT;
187         ENTRY;
188
189         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
190
191         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
192                          LCK_PR, &flags, llu_extent_lock_callback,
193                          ldlm_completion_ast, llu_glimpse_callback, inode,
194                          sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
195         if (rc > 0)
196                 RETURN(-EIO);
197
198         lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
199         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
200
201         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
202
203         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
204
205         RETURN(rc);
206 }
207
208 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
209                     struct lov_stripe_md *lsm, int mode,
210                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
211                     int ast_flags)
212 {
213         struct llu_sb_info *sbi = llu_i2sbi(inode);
214         struct llu_inode_info *lli = llu_i2info(inode);
215         int rc;
216         ENTRY;
217
218         LASSERT(lockh->cookie == 0);
219
220         /* XXX phil: can we do this?  won't it screw the file size up? */
221         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
222             (sbi->ll_flags & LL_SBI_NOLCK))
223                 RETURN(0);
224
225         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
226                lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
227
228         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
229                          &ast_flags, llu_extent_lock_callback,
230                          ldlm_completion_ast, llu_glimpse_callback, inode,
231                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
232         if (rc > 0)
233                 rc = -EIO;
234
235         if (policy->l_extent.start == 0 &&
236             policy->l_extent.end == OBD_OBJECT_EOF)
237                 lli->lli_st_size = lov_merge_size(lsm, 1);
238
239         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
240
241         RETURN(rc);
242 }
243
#if 0
/*
 * Everything inside this #if 0 region is excluded from compilation.
 */

/* Enqueue an extent lock without refreshing the cached file size. */
int llu_extent_lock_no_validate(struct ll_file_data *fd,
                                struct inode *inode,
                                struct lov_stripe_md *lsm,
                                int mode,
                                struct ldlm_extent *extent,
                                struct lustre_handle *lockh,
                                int ast_flags)
{
        struct llu_sb_info *sb = llu_i2sbi(inode);
        struct llu_inode_info *info = llu_i2info(inode);
        int status;
        ENTRY;

        LASSERT(lockh->cookie == 0);

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
                RETURN(0);
        if (sb->ll_flags & LL_SBI_NOLCK)
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               info->lli_st_ino, extent->start, extent->end);

        /* NB: sizeof(extent) is the size of the pointer, as written */
        status = obd_enqueue(sb->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
                             sizeof(extent), mode, &ast_flags,
                             llu_extent_lock_callback, inode, lockh);

        RETURN(status);
}

/*
 * this grabs a lock and manually implements behaviour that makes it look like
 * the OST is returning the file size with each lock acquisition.
 */
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm, int mode,
                    struct ldlm_extent *extent, struct lustre_handle *lockh,
                    int nonblock)
{
        struct llu_inode_info *info = llu_i2info(inode);
        struct obd_export *export = llu_i2obdexp(inode);
        struct lustre_handle size_lockh = {0};
        struct ldlm_extent tail;
        int ast = nonblock ? LDLM_FL_BLOCK_NOWAIT : 0;
        int match_flags, found, status;
        ENTRY;

        status = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent,
                                             lockh, ast);
        if (status != ELDLM_OK)
                RETURN(status);

        if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &info->lli_flags))
                RETURN(0);

        status = llu_inode_getattr(inode, lsm);
        if (status) {
                llu_extent_unlock(fd, inode, lsm, mode, lockh);
                RETURN(status);
        }

        /* the region from the current size to EOF */
        tail.start = info->lli_st_size;
        tail.end = OBD_OBJECT_EOF;

        /* XXX I bet we should be checking the lock ignore flags.. */
        /* FIXME use LDLM_FL_TEST_LOCK instead */
        match_flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
        found = obd_match(export, lsm, LDLM_EXTENT, &tail,
                          sizeof(tail), LCK_PR, &match_flags, inode,
                          &size_lockh);

        /* hey, alright, we hold a size lock that covers the size we
         * just found, its not going to change for a while.. */
        if (found == 1) {
                set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &info->lli_flags);
                obd_cancel(export, lsm, LCK_PR, &size_lockh);
        }

        RETURN(0);
}
#endif
326
327 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
328                 struct lov_stripe_md *lsm, int mode,
329                 struct lustre_handle *lockh)
330 {
331         struct llu_sb_info *sbi = llu_i2sbi(inode);
332         int rc;
333         ENTRY;
334
335         /* XXX phil: can we do this?  won't it screw the file size up? */
336         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
337             (sbi->ll_flags & LL_SBI_NOLCK))
338                 RETURN(0);
339
340         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
341
342         RETURN(rc);
343 }
344
/* Sentinel stored in ll_async_page::llap_magic and verified by
 * llap_from_cookie() before a callback cookie is trusted. */
#define LLAP_MAGIC 12346789

/* Per-page bookkeeping for one page of a group I/O. */
struct ll_async_page {
        int             llap_magic;     /* LLAP_MAGIC once prepared */
        void           *llap_cookie;    /* filled in by obd_prep_async_page();
                                         * NULL means nothing to tear down */
        int             llap_queued;    /* 1 after obd_queue_group_io()
                                         * succeeds, 0 again on completion */
        struct page    *llap_page;      /* page described by this entry */
        struct inode   *llap_inode;     /* inode the I/O belongs to */
};
354
355 static struct ll_async_page *llap_from_cookie(void *cookie)
356 {
357         struct ll_async_page *llap = cookie;
358         if (llap->llap_magic != LLAP_MAGIC)
359                 return ERR_PTR(-EINVAL);
360         return llap;
361 };
362
363 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
364 {
365         struct ll_async_page *llap;
366         struct inode *inode;
367         struct lov_stripe_md *lsm;
368         obd_flag valid_flags;
369         ENTRY;
370
371         llap = llap_from_cookie(data);
372         if (IS_ERR(llap)) {
373                 EXIT;
374                 return;
375         }
376
377         inode = llap->llap_inode;
378         lsm = llu_i2info(inode)->lli_smd;
379
380         oa->o_id = lsm->lsm_object_id;
381         oa->o_valid = OBD_MD_FLID;
382         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
383         if (cmd == OBD_BRW_WRITE)
384                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
385
386         obdo_from_inode(oa, inode, valid_flags);
387         EXIT;
388 }
389
390 /* called for each page in a completed rpc.*/
391 static void llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
392 {
393         struct ll_async_page *llap;
394         struct page *page;
395
396         llap = llap_from_cookie(data);
397         if (IS_ERR(llap)) {
398                 EXIT;
399                 return;
400         }
401
402         llap->llap_queued = 0;
403         page = llap->llap_page;
404
405         if (rc != 0) {
406                 if (cmd == OBD_BRW_WRITE)
407                         CERROR("writeback error on page %p index %ld: %d\n", 
408                                page, page->index, rc);
409         }
410         EXIT;
411 }
412
413 static struct obd_async_page_ops llu_async_page_ops = {
414         .ap_make_ready =        NULL,
415         .ap_refresh_count =     NULL,
416         .ap_fill_obdo =         llu_ap_fill_obdo,
417         .ap_completion =        llu_ap_completion,
418 };
419
420 static
421 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
422 {
423         struct llu_sysio_cookie *cookie;
424         int rc;
425
426         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
427         if (cookie == NULL)
428                 goto out;
429
430         I_REF(inode);
431         cookie->lsc_inode = inode;
432         cookie->lsc_maxpages = maxpages;
433         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
434         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
435
436         rc = oig_init(&cookie->lsc_oig);
437         if (rc) {
438                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
439                 cookie = NULL;
440         }
441
442 out:
443         return cookie;
444 }
445
446 static
447 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
448 {
449         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
450         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
451         struct ll_async_page *llap = cookie->lsc_llap;
452 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
453         struct page *pages = cookie->lsc_pages;
454 #endif
455         int i;
456
457         for (i = 0; i< cookie->lsc_maxpages; i++) {
458                 if (llap[i].llap_cookie)
459                         obd_teardown_async_page(exp, lsm, NULL,
460                                                 llap[i].llap_cookie);
461 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
462                 if (pages[i]._managed) {
463                         free(pages[i].addr);
464                         pages[i]._managed = 0;
465                 }
466 #endif
467         }
468
469         I_RELE(cookie->lsc_inode);
470
471         oig_release(cookie->lsc_oig);
472         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
473 }
474
#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
/* Note: this code is expected to be removed eventually, so it is not
 * worth cleaning up further.
 */

/*
 * Turn partial first/last pages of a write into page-start-aligned ones.
 *
 * For each boundary page that is not a full page and that overlaps existing
 * file data, a bounce buffer is allocated, the current page content is read
 * from the server into it, and the caller's bytes are copied on top.  The
 * page is then redirected to the bounce buffer and flagged _managed so that
 * put_sysio_cookie() frees it.
 *
 * Returns 0 on success, -ENOMEM if a bounce buffer cannot be allocated, or
 * the error from obd_brw().
 */
static
int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
{
        struct inode *inode = cookie->lsc_inode;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obdo oa;
        struct page *pages = cookie->lsc_pages;
        int i, pgidx[2] = {0, cookie->lsc_npages-1};
        int rc;
        ENTRY;

        /* nothing prepared: pgidx[1] would index before the array */
        if (cookie->lsc_npages <= 0)
                RETURN(0);

        for (i = 0; i < 2; i++) {
                struct page *oldpage = &pages[pgidx[i]];
                struct page newpage;
                struct brw_page pg;
                char *newbuf;

                /* single page: handle it once, on the second pass */
                if (i == 0 && pgidx[0] == pgidx[1])
                        continue;

                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);

                if (oldpage->_count == PAGE_CACHE_SIZE)
                        continue;

                /* page starts at or beyond EOF: nothing to merge with.
                 * Widen before shifting, as every other offset computation
                 * in this file does. */
                if (((obd_off)oldpage->index << PAGE_CACHE_SHIFT) >=
                    lli->lli_st_size)
                        continue;

                /* Zero-filled, so that bytes between the old EOF and the
                 * start of the new data are written as zeroes rather than
                 * as uninitialised heap content. */
                newbuf = calloc(1, PAGE_CACHE_SIZE);
                if (!newbuf)
                        RETURN(-ENOMEM);

                newpage.index = oldpage->index;
                newpage.addr = newbuf;

                pg.pg = &newpage;
                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
                else
                        pg.count = PAGE_CACHE_SIZE;
                pg.flag = 0;

                oa.o_id = lsm->lsm_object_id;
                oa.o_mode = lli->lli_st_mode;
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

                /* issue read */
                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
                if (rc) {
                        free(newbuf);
                        RETURN(rc);
                }

                /* copy page content, and reset page params */
                memcpy(newbuf + oldpage->_offset,
                       (char*)oldpage->addr + oldpage->_offset,
                       oldpage->_count);

                oldpage->addr = newbuf;
                /* write extends the file: send from page start up to the
                 * end of the new data; otherwise send the whole page */
                if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
                    oldpage->_offset + oldpage->_count) > lli->lli_st_size)
                        oldpage->_count += oldpage->_offset;
                else
                        oldpage->_count = PAGE_CACHE_SIZE;
                oldpage->_offset = 0;
                oldpage->_managed = 1;
        }

        RETURN(0);
}
#endif
553
554 static
555 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
556                       char *buf, loff_t pos, size_t count)
557 {
558         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
559         struct lov_stripe_md *lsm = lli->lli_smd;
560         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
561         struct page *pages = cookie->lsc_pages;
562         struct ll_async_page *llap = cookie->lsc_llap;
563         int i, rc, npages = 0;
564         ENTRY;
565
566         if (!exp)
567                 RETURN(-EINVAL);
568
569         /* prepare the pages array */
570         do {
571                 unsigned long index, offset, bytes;
572
573                 offset = (pos & ~PAGE_CACHE_MASK);
574                 index = pos >> PAGE_CACHE_SHIFT;
575                 bytes = PAGE_CACHE_SIZE - offset;
576                 if (bytes > count)
577                         bytes = count;
578
579                 /* prevent read beyond file range */
580                 if ((cmd == OBD_BRW_READ) &&
581                     (pos + bytes) >= lli->lli_st_size) {
582                         if (pos >= lli->lli_st_size)
583                                 break;
584                         bytes = lli->lli_st_size - pos;
585                 }
586
587                 /* prepare page for this index */
588                 pages[npages].index = index;
589                 pages[npages].addr = buf - offset;
590
591                 pages[npages]._offset = offset;
592                 pages[npages]._count = bytes;
593
594                 npages++;
595                 count -= bytes;
596                 pos += bytes;
597                 buf += bytes;
598
599                 cookie->lsc_rwcount += bytes;
600         } while (count);
601
602         cookie->lsc_npages = npages;
603
604 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
605         if (cmd == OBD_BRW_WRITE) {
606                 rc = prepare_unaligned_write(cookie);
607                 if (rc)
608                         RETURN(rc);
609         }
610 #endif
611
612         for (i = 0; i < npages; i++) {
613                 llap[i].llap_magic = LLAP_MAGIC;
614                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
615                                          (obd_off)pages[i].index << PAGE_SHIFT,
616                                          &llu_async_page_ops,
617                                          &llap[i], &llap[i].llap_cookie);
618                 if (rc) {
619                         llap[i].llap_cookie = NULL;
620                         RETURN(rc);
621                 }
622                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
623                        &llap[i], &pages[i], llap[i].llap_cookie,
624                        (obd_off)pages[i].index << PAGE_SHIFT);
625                 pages[i].private = (unsigned long)&llap[i];
626                 llap[i].llap_page = &pages[i];
627                 llap[i].llap_inode = cookie->lsc_inode;
628
629                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
630                                         llap[i].llap_cookie, cmd,
631                                         pages[i]._offset, pages[i]._count, 0,
632                                         ASYNC_READY | ASYNC_URGENT |
633                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
634                 if (rc)
635                         RETURN(rc);
636
637                 llap[i].llap_queued = 1;
638         }
639
640         RETURN(0);
641 }
642
643 static
644 int llu_start_async_io(struct llu_sysio_cookie *cookie)
645 {
646         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
647         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
648
649         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
650 }
651
652 /*
653  * read/write a continuous buffer for an inode (zero-copy)
654  */
655 struct llu_sysio_cookie*
656 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
657 {
658         struct llu_sysio_cookie *cookie;
659         int max_pages, rc;
660         ENTRY;
661
662         max_pages = (count >> PAGE_SHIFT) + 2;
663
664         cookie = get_sysio_cookie(inode, max_pages);
665         if (!cookie)
666                 RETURN(ERR_PTR(-ENOMEM));
667
668         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
669         if (rc)
670                 GOTO(out_cleanup, rc);
671
672         rc = llu_start_async_io(cookie);
673         if (rc)
674                 GOTO(out_cleanup, rc);
675
676 /*
677         rc = oig_wait(&oig);
678         if (rc) {
679                 CERROR("file i/o error!\n");
680                 rw_count = rc;
681         }
682 */
683         RETURN(cookie);
684
685 out_cleanup:
686         put_sysio_cookie(cookie);
687         RETURN(ERR_PTR(rc));
688 }
689
690 void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
691                       obd_off size);
692
693 struct llu_sysio_callback_args*
694 llu_file_write(struct inode *inode, const struct iovec *iovec,
695                size_t iovlen, loff_t pos)
696 {
697         struct llu_inode_info *lli = llu_i2info(inode);
698         struct ll_file_data *fd = lli->lli_file_data;
699         struct lustre_handle lockh = {0};
700         struct lov_stripe_md *lsm = lli->lli_smd;
701         struct obd_export *exp = NULL;
702         ldlm_policy_data_t policy;
703         struct llu_sysio_callback_args *lsca;
704         struct llu_sysio_cookie *cookie;
705         int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
706                        LDLM_FL_BLOCK_NOWAIT : 0;
707         ldlm_error_t err;
708         int iovidx;
709         ENTRY;
710
711         /* XXX consider other types later */
712         if (!S_ISREG(lli->lli_st_mode))
713                 LBUG();
714
715         LASSERT(iovlen <= MAX_IOVEC);
716
717         exp = llu_i2obdexp(inode);
718         if (exp == NULL)
719                 RETURN(ERR_PTR(-EINVAL));
720
721         OBD_ALLOC(lsca, sizeof(*lsca));
722         if (!lsca)
723                 RETURN(ERR_PTR(-ENOMEM));
724
725         /* FIXME optimize the following extent locking */
726         for (iovidx = 0; iovidx < iovlen; iovidx++) {
727                 char *buf = (char*)iovec[iovidx].iov_base;
728                 size_t count = iovec[iovidx].iov_len;
729
730                 if (count == 0)
731                         continue;
732
733                 if (pos + count > lli->lli_maxbytes)
734                         GOTO(err_out, err = -ERANGE);
735
736                 /* FIXME libsysio haven't handle O_APPEND?? */
737                 policy.l_extent.start = pos;
738                 policy.l_extent.end = pos + count - 1;
739
740                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
741                                       &lockh, astflag);
742                 if (err != ELDLM_OK)
743                         GOTO(err_out, err = -ENOLCK);
744
745                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
746                        lli->lli_st_ino, count, pos);
747
748                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
749                 if (!IS_ERR(cookie)) {
750                         /* save cookie */
751                         lsca->cookies[lsca->ncookies++] = cookie;
752                         pos += count;
753                         lov_increase_kms(exp, lsm, pos);
754                         /* file size grow */
755                         if (pos > lli->lli_st_size)
756                                 lli->lli_st_size = pos;
757                 } else {
758                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
759                         GOTO(err_out, err = PTR_ERR(cookie));
760                 }
761
762                 /* XXX errors? */
763                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
764                 if (err)
765                         CERROR("extent unlock error %d\n", err);
766         }
767
768         RETURN(lsca);
769
770 err_out:
771         /* teardown all async stuff */
772         while (lsca->ncookies--) {
773                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
774         }
775         OBD_FREE(lsca, sizeof(*lsca));
776
777         RETURN(ERR_PTR(err));
778 }
779
#if 0
/*
 * Excluded from compilation.  Refreshes the inode's access time: with
 * USE_ATIME the new time is pushed through llu_inode_setattr(), otherwise
 * only the in-memory i_atime is touched.
 */
static void llu_update_atime(struct inode *inode)
{
        struct llu_inode_info *info = llu_i2info(inode);

#ifdef USE_ATIME
        struct iattr change;

        change.ia_atime = LTIME_S(CURRENT_TIME);
        change.ia_valid = ATTR_ATIME;

        /* nothing to do if unchanged, read-only or noatime */
        if (info->lli_st_atime == change.ia_atime ||
            IS_RDONLY(inode) ||
            IS_NOATIME(inode))
                return;

        /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
        llu_inode_setattr(inode, &change, 0);
#else
        /* update atime, but don't explicitly write it out just this change */
        inode->i_atime = CURRENT_TIME;
#endif
}
#endif
803
804 struct llu_sysio_callback_args*
805 llu_file_read(struct inode *inode, const struct iovec *iovec,
806               size_t iovlen, loff_t pos)
807 {
808         struct llu_inode_info *lli = llu_i2info(inode);
809         struct ll_file_data *fd = lli->lli_file_data;
810         struct lov_stripe_md *lsm = lli->lli_smd;
811         struct lustre_handle lockh = { 0 };
812         ldlm_policy_data_t policy;
813         struct llu_sysio_callback_args *lsca;
814         struct llu_sysio_cookie *cookie;
815         int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
816                        LDLM_FL_BLOCK_NOWAIT : 0;
817         __u64 kms;
818         int iovidx;
819
820         ldlm_error_t err;
821         ENTRY;
822
823         OBD_ALLOC(lsca, sizeof(*lsca));
824         if (!lsca)
825                 RETURN(ERR_PTR(-ENOMEM));
826
827         for (iovidx = 0; iovidx < iovlen; iovidx++) {
828                 char *buf = iovec[iovidx].iov_base;
829                 size_t count = iovec[iovidx].iov_len;
830
831                 /* "If nbyte is 0, read() will return 0 and have no other results."
832                  *                      -- Single Unix Spec */
833                 if (count == 0)
834                         continue;
835
836                 policy.l_extent.start = pos;
837                 policy.l_extent.end = pos + count - 1;
838
839                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy,
840                                       &lockh, astflag);
841                 if (err != ELDLM_OK)
842                         GOTO(err_out, err = -ENOLCK);
843
844                 kms = lov_merge_size(lsm, 1);
845                 if (policy.l_extent.end > kms) {
846                         /* A glimpse is necessary to determine whether we
847                          * return a short read or some zeroes at the end of
848                          * the buffer */
849                         struct ost_lvb lvb;
850                         if (llu_glimpse_size(inode, &lvb)) {
851                                 llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
852                                 GOTO(err_out, err = -ENOLCK);
853                         }
854                         lli->lli_st_size = lvb.lvb_size;
855                 } else {
856                         lli->lli_st_size = kms;
857                 }
858
859                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
860                        "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
861                        lli->lli_st_size);
862
863                 if (pos >= lli->lli_st_size) {
864                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
865                         break;
866                 }
867
868                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
869                 if (!IS_ERR(cookie)) {
870                         /* save cookie */
871                         lsca->cookies[lsca->ncookies++] = cookie;
872                         pos += count;
873                 } else {
874                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
875                         GOTO(err_out, err = PTR_ERR(cookie));
876                 }
877
878                 /* XXX errors? */
879                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
880                 if (err)
881                         CERROR("extent_unlock fail: %d\n", err);
882         }
883 #if 0
884         if (readed > 0)
885                 llu_update_atime(inode);
886 #endif
887         RETURN(lsca);
888
889 err_out:
890         /* teardown all async stuff */
891         while (lsca->ncookies--) {
892                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
893         }
894         OBD_FREE(lsca, sizeof(*lsca));
895
896         RETURN(ERR_PTR(err));
897 }
898
899 int llu_iop_iodone(struct ioctx *ioctxp)
900 {
901         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
902         struct llu_sysio_cookie *cookie;
903         int i, err = 0, rc = 0;
904         ENTRY;
905
906         /* write/read(fd, buf, 0) */
907         if (!lsca) {
908                 ioctxp->ioctx_cc = 0;
909                 RETURN(1);
910         }
911
912         LASSERT(!IS_ERR(lsca));
913
914         for (i = 0; i < lsca->ncookies; i++) {
915                 cookie = lsca->cookies[i];
916                 if (cookie) {
917                         err = oig_wait(cookie->lsc_oig);
918                         if (err && !rc)
919                                 rc = err;
920                         if (!rc)
921                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
922                         put_sysio_cookie(cookie);
923                 }
924         }
925
926         if (rc) {
927                 LASSERT(rc < 0);
928                 ioctxp->ioctx_cc = -1;
929                 ioctxp->ioctx_errno = -rc;
930         }
931
932         OBD_FREE(lsca, sizeof(*lsca));
933         ioctxp->ioctx_private = NULL;
934
935         RETURN(1);
936 }