Whamcloud - gitweb
Land b_smallfix onto HEAD (20040416_1638) (more 2.6 build fixes)
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light block IO
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32 #include <fcntl.h>
33
34 #include <sysio.h>
35 #include <fs.h>
36 #include <mount.h>
37 #include <inode.h>
38 #include <file.h>
39
40 #undef LIST_HEAD
41
42 #include "llite_lib.h"
43
44 static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
45 {
46         struct llu_inode_info *lli = llu_i2info(inode);
47         struct lov_stripe_md *lsm = lli->lli_smd;
48         struct obd_export *exp = llu_i2obdexp(inode);
49         struct {
50                 char name[16];
51                 struct ldlm_lock *lock;
52                 struct lov_stripe_md *lsm;
53         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
54         __u32 stripe, vallen = sizeof(stripe);
55         int rc;
56         ENTRY;
57
58         if (lsm->lsm_stripe_count == 1)
59                 RETURN(0);
60
61         /* get our offset in the lov */
62         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
63         if (rc != 0) {
64                 CERROR("obd_get_info: rc = %d\n", rc);
65                 LBUG();
66         }
67         LASSERT(stripe < lsm->lsm_stripe_count);
68         RETURN(stripe);
69 }
70
71 static int llu_extent_lock_callback(struct ldlm_lock *lock,
72                                     struct ldlm_lock_desc *new, void *data,
73                                     int flag)
74 {
75         struct lustre_handle lockh = { 0 };
76         int rc;
77         ENTRY;
78
79         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
80                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
81                 LBUG();
82         }
83
84         switch (flag) {
85         case LDLM_CB_BLOCKING:
86                 ldlm_lock2handle(lock, &lockh);
87                 rc = ldlm_cli_cancel(&lockh);
88                 if (rc != ELDLM_OK)
89                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
90                 break;
91         case LDLM_CB_CANCELING: {
92                 struct inode *inode;
93                 struct llu_inode_info *lli;
94                 struct lov_stripe_md *lsm;
95                 __u32 stripe;
96                 __u64 kms;
97
98                 /* This lock wasn't granted, don't try to evict pages */
99                 if (lock->l_req_mode != lock->l_granted_mode)
100                         RETURN(0);
101
102                 inode = llu_inode_from_lock(lock);
103                 if (!inode)
104                         RETURN(0);
105                 lli= llu_i2info(inode);
106                 if (!lli)
107                         goto iput;
108                 if (!lli->lli_smd)
109                         goto iput;
110                 lsm = lli->lli_smd;
111
112                 stripe = llu_lock_to_stripe_offset(inode, lock);
113                 kms = ldlm_extent_shift_kms(lock,
114                                             lsm->lsm_oinfo[stripe].loi_kms);
115                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
116                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
117                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
118                 lsm->lsm_oinfo[stripe].loi_kms = kms;
119 iput:
120                 I_RELE(inode);
121                 break;
122         }
123         default:
124                 LBUG();
125         }
126
127         RETURN(0);
128 }
129
130 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
131 {
132         struct ptlrpc_request *req = reqp;
133         struct inode *inode = llu_inode_from_lock(lock);
134         struct llu_inode_info *lli;
135         struct ost_lvb *lvb;
136         int rc, size = sizeof(*lvb), stripe = 0;
137         ENTRY;
138
139         if (inode == NULL)
140                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
141         lli = llu_i2info(inode);
142         if (lli == NULL)
143                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
144         if (lli->lli_smd == NULL)
145                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
146
147         /* First, find out which stripe index this lock corresponds to. */
148         if (lli->lli_smd->lsm_stripe_count > 1)
149                 stripe = llu_lock_to_stripe_offset(inode, lock);
150
151         rc = lustre_pack_reply(req, 1, &size, NULL);
152         if (rc) {
153                 CERROR("lustre_pack_reply: %d\n", rc);
154                 GOTO(iput, rc);
155         }
156
157         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
158         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
159
160         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
161                    lli->lli_st_size, stripe, lvb->lvb_size);
162  iput:
163         I_RELE(inode);
164  out:
165         /* These errors are normal races, so we don't want to fill the console
166          * with messages by calling ptlrpc_error() */
167         if (rc == -ELDLM_NO_LOCK_DATA)
168                 lustre_pack_reply(req, 0, NULL, NULL);
169
170         req->rq_status = rc;
171         return rc;
172 }
173
174 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
175 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
176
177 /* NB: lov_merge_size will prefer locally cached writes if they extend the
178  * file (because it prefers KMS over RSS when larger) */
179 int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
180 {
181         struct llu_inode_info *lli = llu_i2info(inode);
182         struct llu_sb_info *sbi = llu_i2sbi(inode);
183         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
184         struct lustre_handle lockh = { 0 };
185         int rc, flags = LDLM_FL_HAS_INTENT;
186         ENTRY;
187
188         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
189
190         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
191                          LCK_PR, &flags, llu_extent_lock_callback,
192                          ldlm_completion_ast, llu_glimpse_callback, inode,
193                          sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
194         if (rc > 0)
195                 RETURN(-EIO);
196
197         lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
198         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
199
200         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
201
202         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
203
204         RETURN(rc);
205 }
206
207 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
208                     struct lov_stripe_md *lsm, int mode,
209                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
210                     int ast_flags)
211 {
212         struct llu_sb_info *sbi = llu_i2sbi(inode);
213         struct llu_inode_info *lli = llu_i2info(inode);
214         int rc;
215         ENTRY;
216
217         LASSERT(lockh->cookie == 0);
218
219         /* XXX phil: can we do this?  won't it screw the file size up? */
220         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
221             (sbi->ll_flags & LL_SBI_NOLCK))
222                 RETURN(0);
223
224         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
225                lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
226
227         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
228                          &ast_flags, llu_extent_lock_callback,
229                          ldlm_completion_ast, llu_glimpse_callback, inode,
230                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
231         if (rc > 0)
232                 rc = -EIO;
233
234         if (policy->l_extent.start == 0 &&
235             policy->l_extent.end == OBD_OBJECT_EOF)
236                 lli->lli_st_size = lov_merge_size(lsm, 1);
237
238         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
239
240         RETURN(rc);
241 }
242
243 #if 0
244 int llu_extent_lock_no_validate(struct ll_file_data *fd,
245                                 struct inode *inode,
246                                 struct lov_stripe_md *lsm,
247                                 int mode,
248                                 struct ldlm_extent *extent,
249                                 struct lustre_handle *lockh,
250                                 int ast_flags)
251 {
252         struct llu_sb_info *sbi = llu_i2sbi(inode);
253         struct llu_inode_info *lli = llu_i2info(inode);
254         int rc;
255         ENTRY;
256
257         LASSERT(lockh->cookie == 0);
258
259         /* XXX phil: can we do this?  won't it screw the file size up? */
260         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
261             (sbi->ll_flags & LL_SBI_NOLCK))
262                 RETURN(0);
263
264         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
265                lli->lli_st_ino, extent->start, extent->end);
266
267         rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
268                          sizeof(extent), mode, &ast_flags,
269                          llu_extent_lock_callback, inode, lockh);
270
271         RETURN(rc);
272 }
273
274 /*
275  * this grabs a lock and manually implements behaviour that makes it look like
276  * the OST is returning the file size with each lock acquisition.
277  */
278 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
279                     struct lov_stripe_md *lsm, int mode,
280                     struct ldlm_extent *extent, struct lustre_handle *lockh,
281                     int nonblock)
282 {
283         struct llu_inode_info *lli = llu_i2info(inode);
284         struct obd_export *exp = llu_i2obdexp(inode);
285         struct ldlm_extent size_lock;
286         struct lustre_handle match_lockh = {0};
287         int flags, rc, matched;
288         int astflags = nonblock ? LDLM_FL_BLOCK_NOWAIT : 0;
289         ENTRY;
290
291         rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent,
292                                          lockh, astflags);
293         if (rc != ELDLM_OK)
294                 RETURN(rc);
295
296         if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
297                 RETURN(0);
298
299         rc = llu_inode_getattr(inode, lsm);
300         if (rc) {
301                 llu_extent_unlock(fd, inode, lsm, mode, lockh);
302                 RETURN(rc);
303         }
304
305         size_lock.start = lli->lli_st_size;
306         size_lock.end = OBD_OBJECT_EOF;
307
308         /* XXX I bet we should be checking the lock ignore flags.. */
309         /* FIXME use LDLM_FL_TEST_LOCK instead */
310         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
311         matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
312                             sizeof(size_lock), LCK_PR, &flags, inode,
313                             &match_lockh);
314
315         /* hey, alright, we hold a size lock that covers the size we
316          * just found, its not going to change for a while.. */
317         if (matched == 1) {
318                 set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
319                 obd_cancel(exp, lsm, LCK_PR, &match_lockh);
320         }
321
322         RETURN(0);
323 }
324 #endif
325
326 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
327                 struct lov_stripe_md *lsm, int mode,
328                 struct lustre_handle *lockh)
329 {
330         struct llu_sb_info *sbi = llu_i2sbi(inode);
331         int rc;
332         ENTRY;
333
334         /* XXX phil: can we do this?  won't it screw the file size up? */
335         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
336             (sbi->ll_flags & LL_SBI_NOLCK))
337                 RETURN(0);
338
339         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
340
341         RETURN(rc);
342 }
343
/* value stored in ll_async_page::llap_magic, checked by llap_from_cookie() */
#define LLAP_MAGIC 12346789

/* Per-page bookkeeping handed to the obd async page interface. */
struct ll_async_page {
        /* LLAP_MAGIC once the entry has been prepared */
        int             llap_magic;
        /* cookie filled in by obd_prep_async_page(); NULL if none */
        void           *llap_cookie;
        /* 1 after the page was queued, reset to 0 on completion */
        int             llap_queued;
        /* the page this entry describes */
        struct page    *llap_page;
        /* inode the I/O is performed on */
        struct inode   *llap_inode;
};
353
354 static struct ll_async_page *llap_from_cookie(void *cookie)
355 {
356         struct ll_async_page *llap = cookie;
357         if (llap->llap_magic != LLAP_MAGIC)
358                 return ERR_PTR(-EINVAL);
359         return llap;
360 };
361
362 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
363 {
364         struct ll_async_page *llap;
365         struct inode *inode;
366         struct lov_stripe_md *lsm;
367         obd_flag valid_flags;
368         ENTRY;
369
370         llap = llap_from_cookie(data);
371         if (IS_ERR(llap)) {
372                 EXIT;
373                 return;
374         }
375
376         inode = llap->llap_inode;
377         lsm = llu_i2info(inode)->lli_smd;
378
379         oa->o_id = lsm->lsm_object_id;
380         oa->o_valid = OBD_MD_FLID;
381         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
382         if (cmd == OBD_BRW_WRITE)
383                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
384
385         obdo_from_inode(oa, inode, valid_flags);
386         EXIT;
387 }
388
389 /* called for each page in a completed rpc.*/
390 static void llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
391 {
392         struct ll_async_page *llap;
393         struct page *page;
394
395         llap = llap_from_cookie(data);
396         if (IS_ERR(llap)) {
397                 EXIT;
398                 return;
399         }
400
401         llap->llap_queued = 0;
402         page = llap->llap_page;
403
404         if (rc != 0) {
405                 if (cmd == OBD_BRW_WRITE)
406                         CERROR("writeback error on page %p index %ld: %d\n", 
407                                page, page->index, rc);
408         }
409         EXIT;
410 }
411
412 static struct obd_async_page_ops llu_async_page_ops = {
413         .ap_make_ready =        NULL,
414         .ap_refresh_count =     NULL,
415         .ap_fill_obdo =         llu_ap_fill_obdo,
416         .ap_completion =        llu_ap_completion,
417 };
418
419 static
420 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
421 {
422         struct llu_sysio_cookie *cookie;
423         int rc;
424
425         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
426         if (cookie == NULL)
427                 goto out;
428
429         I_REF(inode);
430         cookie->lsc_inode = inode;
431         cookie->lsc_maxpages = maxpages;
432         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
433         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
434
435         rc = oig_init(&cookie->lsc_oig);
436         if (rc) {
437                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
438                 cookie = NULL;
439         }
440
441 out:
442         return cookie;
443 }
444
445 static
446 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
447 {
448         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
449         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
450         struct ll_async_page *llap = cookie->lsc_llap;
451 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
452         struct page *pages = cookie->lsc_pages;
453 #endif
454         int i;
455
456         for (i = 0; i< cookie->lsc_maxpages; i++) {
457                 if (llap[i].llap_cookie)
458                         obd_teardown_async_page(exp, lsm, NULL,
459                                                 llap[i].llap_cookie);
460 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
461                 if (pages[i]._managed) {
462                         free(pages[i].addr);
463                         pages[i]._managed = 0;
464                 }
465 #endif
466         }
467
468         I_RELE(cookie->lsc_inode);
469
470         oig_release(cookie->lsc_oig);
471         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
472 }
473
#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
/* Note: these code should be removed finally, don't need
 * more cleanup
 */
/*
 * Turn a partial first and/or last page of a write into a full-page write.
 *
 * For each of the two boundary pages that is partial and starts below the
 * current file size, the existing page content is read into a freshly
 * malloc()ed buffer, the caller's bytes are copied on top at their
 * in-page offset, and the page descriptor is redirected to that buffer.
 * Such pages are flagged _managed so put_sysio_cookie() frees the buffer.
 *
 * Returns 0 on success, -ENOMEM when a buffer cannot be allocated, or the
 * obd_brw() read error.
 */
static
int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
{
        struct inode *inode = cookie->lsc_inode;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obdo oa;
        struct page *pages = cookie->lsc_pages;
        int i, pgidx[2] = {0, cookie->lsc_npages-1};
        int rc;
        ENTRY;

        /* nothing to fix up, and pgidx[1] would index before the array */
        if (cookie->lsc_npages <= 0)
                RETURN(0);

        for (i = 0; i < 2; i++) {
                struct page *oldpage = &pages[pgidx[i]];
                struct page newpage;
                struct brw_page pg;
                char *newbuf;

                /* single page: handle it once, on the second pass */
                if (i == 0 && pgidx[0] == pgidx[1])
                        continue;

                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);

                if (oldpage->_count == PAGE_CACHE_SIZE)
                        continue;

                /* widen before shifting, as done for the other offset
                 * computations in this function */
                if (((obd_off)oldpage->index << PAGE_CACHE_SHIFT) >=
                    lli->lli_st_size)
                        continue;

                newbuf = malloc(PAGE_CACHE_SIZE);
                if (!newbuf)
                        RETURN(-ENOMEM);

                /* do not hand uninitialised fields to obd_brw() */
                memset(&newpage, 0, sizeof(newpage));
                memset(&pg, 0, sizeof(pg));

                newpage.index = oldpage->index;
                newpage.addr = newbuf;

                pg.pg = &newpage;
                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
                else
                        pg.count = PAGE_CACHE_SIZE;
                pg.flag = 0;

                oa.o_id = lsm->lsm_object_id;
                oa.o_mode = lli->lli_st_mode;
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

                /* issue read */
                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
                if (rc) {
                        free(newbuf);
                        RETURN(rc);
                }

                /* copy page content, and reset page params */
                memcpy(newbuf + oldpage->_offset,
                       (char*)oldpage->addr + oldpage->_offset,
                       oldpage->_count);

                oldpage->addr = newbuf;
                if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
                    oldpage->_offset + oldpage->_count) > lli->lli_st_size)
                        oldpage->_count += oldpage->_offset;
                else
                        oldpage->_count = PAGE_CACHE_SIZE;
                oldpage->_offset = 0;
                oldpage->_managed = 1;
        }

        RETURN(0);
}
#endif
552
553 static
554 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
555                       char *buf, loff_t pos, size_t count)
556 {
557         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
558         struct lov_stripe_md *lsm = lli->lli_smd;
559         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
560         struct page *pages = cookie->lsc_pages;
561         struct ll_async_page *llap = cookie->lsc_llap;
562         int i, rc, npages = 0;
563         ENTRY;
564
565         if (!exp)
566                 RETURN(-EINVAL);
567
568         /* prepare the pages array */
569         do {
570                 unsigned long index, offset, bytes;
571
572                 offset = (pos & ~PAGE_CACHE_MASK);
573                 index = pos >> PAGE_CACHE_SHIFT;
574                 bytes = PAGE_CACHE_SIZE - offset;
575                 if (bytes > count)
576                         bytes = count;
577
578                 /* prevent read beyond file range */
579                 if ((cmd == OBD_BRW_READ) &&
580                     (pos + bytes) >= lli->lli_st_size) {
581                         if (pos >= lli->lli_st_size)
582                                 break;
583                         bytes = lli->lli_st_size - pos;
584                 }
585
586                 /* prepare page for this index */
587                 pages[npages].index = index;
588                 pages[npages].addr = buf - offset;
589
590                 pages[npages]._offset = offset;
591                 pages[npages]._count = bytes;
592
593                 npages++;
594                 count -= bytes;
595                 pos += bytes;
596                 buf += bytes;
597
598                 cookie->lsc_rwcount += bytes;
599         } while (count);
600
601         cookie->lsc_npages = npages;
602
603 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
604         if (cmd == OBD_BRW_WRITE) {
605                 rc = prepare_unaligned_write(cookie);
606                 if (rc)
607                         RETURN(rc);
608         }
609 #endif
610
611         for (i = 0; i < npages; i++) {
612                 llap[i].llap_magic = LLAP_MAGIC;
613                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
614                                          (obd_off)pages[i].index << PAGE_SHIFT,
615                                          &llu_async_page_ops,
616                                          &llap[i], &llap[i].llap_cookie);
617                 if (rc) {
618                         llap[i].llap_cookie = NULL;
619                         RETURN(rc);
620                 }
621                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
622                        &llap[i], &pages[i], llap[i].llap_cookie,
623                        (obd_off)pages[i].index << PAGE_SHIFT);
624                 pages[i].private = (unsigned long)&llap[i];
625                 llap[i].llap_page = &pages[i];
626                 llap[i].llap_inode = cookie->lsc_inode;
627
628                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
629                                         llap[i].llap_cookie, cmd,
630                                         pages[i]._offset, pages[i]._count, 0,
631                                         ASYNC_READY | ASYNC_URGENT |
632                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
633                 if (rc)
634                         RETURN(rc);
635
636                 llap[i].llap_queued = 1;
637         }
638
639         RETURN(0);
640 }
641
642 static
643 int llu_start_async_io(struct llu_sysio_cookie *cookie)
644 {
645         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
646         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
647
648         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
649 }
650
651 /*
652  * read/write a continuous buffer for an inode (zero-copy)
653  */
654 struct llu_sysio_cookie*
655 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
656 {
657         struct llu_sysio_cookie *cookie;
658         int max_pages, rc;
659         ENTRY;
660
661         max_pages = (count >> PAGE_SHIFT) + 2;
662
663         cookie = get_sysio_cookie(inode, max_pages);
664         if (!cookie)
665                 RETURN(ERR_PTR(-ENOMEM));
666
667         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
668         if (rc)
669                 GOTO(out_cleanup, rc);
670
671         rc = llu_start_async_io(cookie);
672         if (rc)
673                 GOTO(out_cleanup, rc);
674
675 /*
676         rc = oig_wait(&oig);
677         if (rc) {
678                 CERROR("file i/o error!\n");
679                 rw_count = rc;
680         }
681 */
682         RETURN(cookie);
683
684 out_cleanup:
685         put_sysio_cookie(cookie);
686         RETURN(ERR_PTR(rc));
687 }
688
689 void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
690                       obd_off size);
691
692 struct llu_sysio_callback_args*
693 llu_file_write(struct inode *inode, const struct iovec *iovec,
694                size_t iovlen, loff_t pos)
695 {
696         struct llu_inode_info *lli = llu_i2info(inode);
697         struct ll_file_data *fd = lli->lli_file_data;
698         struct lustre_handle lockh = {0};
699         struct lov_stripe_md *lsm = lli->lli_smd;
700         struct obd_export *exp = NULL;
701         ldlm_policy_data_t policy;
702         struct llu_sysio_callback_args *lsca;
703         struct llu_sysio_cookie *cookie;
704         int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
705                        LDLM_FL_BLOCK_NOWAIT : 0;
706         ldlm_error_t err;
707         int iovidx;
708         ENTRY;
709
710         /* XXX consider other types later */
711         if (!S_ISREG(lli->lli_st_mode))
712                 LBUG();
713
714         LASSERT(iovlen <= MAX_IOVEC);
715
716         exp = llu_i2obdexp(inode);
717         if (exp == NULL)
718                 RETURN(ERR_PTR(-EINVAL));
719
720         OBD_ALLOC(lsca, sizeof(*lsca));
721         if (!lsca)
722                 RETURN(ERR_PTR(-ENOMEM));
723
724         /* FIXME optimize the following extent locking */
725         for (iovidx = 0; iovidx < iovlen; iovidx++) {
726                 char *buf = (char*)iovec[iovidx].iov_base;
727                 size_t count = iovec[iovidx].iov_len;
728
729                 if (count == 0)
730                         continue;
731
732                 if (pos + count > lli->lli_maxbytes)
733                         GOTO(err_out, err = -ERANGE);
734
735                 /* FIXME libsysio haven't handle O_APPEND?? */
736                 policy.l_extent.start = pos;
737                 policy.l_extent.end = pos + count - 1;
738
739                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
740                                       &lockh, astflag);
741                 if (err != ELDLM_OK)
742                         GOTO(err_out, err = -ENOLCK);
743
744                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
745                        lli->lli_st_ino, count, pos);
746
747                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
748                 if (!IS_ERR(cookie)) {
749                         /* save cookie */
750                         lsca->cookies[lsca->ncookies++] = cookie;
751                         pos += count;
752                         lov_increase_kms(exp, lsm, pos);
753                         /* file size grow */
754                         if (pos > lli->lli_st_size)
755                                 lli->lli_st_size = pos;
756                 } else {
757                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
758                         GOTO(err_out, err = PTR_ERR(cookie));
759                 }
760
761                 /* XXX errors? */
762                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
763                 if (err)
764                         CERROR("extent unlock error %d\n", err);
765         }
766
767         RETURN(lsca);
768
769 err_out:
770         /* teardown all async stuff */
771         while (lsca->ncookies--) {
772                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
773         }
774         OBD_FREE(lsca, sizeof(*lsca));
775
776         RETURN(ERR_PTR(err));
777 }
778
779 #if 0
780 static void llu_update_atime(struct inode *inode)
781 {
782         struct llu_inode_info *lli = llu_i2info(inode);
783
784 #ifdef USE_ATIME
785         struct iattr attr;
786
787         attr.ia_atime = LTIME_S(CURRENT_TIME);
788         attr.ia_valid = ATTR_ATIME;
789
790         if (lli->lli_st_atime == attr.ia_atime) return;
791         if (IS_RDONLY(inode)) return;
792         if (IS_NOATIME(inode)) return;
793
794         /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
795         llu_inode_setattr(inode, &attr, 0);
796 #else
797         /* update atime, but don't explicitly write it out just this change */
798         inode->i_atime = CURRENT_TIME;
799 #endif
800 }
801 #endif
802
803 struct llu_sysio_callback_args*
804 llu_file_read(struct inode *inode, const struct iovec *iovec,
805               size_t iovlen, loff_t pos)
806 {
807         struct llu_inode_info *lli = llu_i2info(inode);
808         struct ll_file_data *fd = lli->lli_file_data;
809         struct lov_stripe_md *lsm = lli->lli_smd;
810         struct lustre_handle lockh = { 0 };
811         ldlm_policy_data_t policy;
812         struct llu_sysio_callback_args *lsca;
813         struct llu_sysio_cookie *cookie;
814         int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
815                        LDLM_FL_BLOCK_NOWAIT : 0;
816         __u64 kms;
817         int iovidx;
818
819         ldlm_error_t err;
820         ENTRY;
821
822         OBD_ALLOC(lsca, sizeof(*lsca));
823         if (!lsca)
824                 RETURN(ERR_PTR(-ENOMEM));
825
826         for (iovidx = 0; iovidx < iovlen; iovidx++) {
827                 char *buf = iovec[iovidx].iov_base;
828                 size_t count = iovec[iovidx].iov_len;
829
830                 /* "If nbyte is 0, read() will return 0 and have no other results."
831                  *                      -- Single Unix Spec */
832                 if (count == 0)
833                         continue;
834
835                 policy.l_extent.start = pos;
836                 policy.l_extent.end = pos + count - 1;
837
838                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy,
839                                       &lockh, astflag);
840                 if (err != ELDLM_OK)
841                         GOTO(err_out, err = -ENOLCK);
842
843                 kms = lov_merge_size(lsm, 1);
844                 if (policy.l_extent.end > kms) {
845                         /* A glimpse is necessary to determine whether we
846                          * return a short read or some zeroes at the end of
847                          * the buffer */
848                         struct ost_lvb lvb;
849                         if (llu_glimpse_size(inode, &lvb)) {
850                                 llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
851                                 GOTO(err_out, err = -ENOLCK);
852                         }
853                         lli->lli_st_size = lvb.lvb_size;
854                 } else {
855                         lli->lli_st_size = kms;
856                 }
857
858                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
859                        "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
860                        lli->lli_st_size);
861
862                 if (pos >= lli->lli_st_size) {
863                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
864                         break;
865                 }
866
867                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
868                 if (!IS_ERR(cookie)) {
869                         /* save cookie */
870                         lsca->cookies[lsca->ncookies++] = cookie;
871                         pos += count;
872                 } else {
873                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
874                         GOTO(err_out, err = PTR_ERR(cookie));
875                 }
876
877                 /* XXX errors? */
878                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
879                 if (err)
880                         CERROR("extent_unlock fail: %d\n", err);
881         }
882 #if 0
883         if (readed > 0)
884                 llu_update_atime(inode);
885 #endif
886         RETURN(lsca);
887
888 err_out:
889         /* teardown all async stuff */
890         while (lsca->ncookies--) {
891                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
892         }
893         OBD_FREE(lsca, sizeof(*lsca));
894
895         RETURN(ERR_PTR(err));
896 }
897
898 int llu_iop_iodone(struct ioctx *ioctxp)
899 {
900         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
901         struct llu_sysio_cookie *cookie;
902         int i, err = 0, rc = 0;
903         ENTRY;
904
905         /* write/read(fd, buf, 0) */
906         if (!lsca) {
907                 ioctxp->ioctx_cc = 0;
908                 RETURN(1);
909         }
910
911         LASSERT(!IS_ERR(lsca));
912
913         for (i = 0; i < lsca->ncookies; i++) {
914                 cookie = lsca->cookies[i];
915                 if (cookie) {
916                         err = oig_wait(cookie->lsc_oig);
917                         if (err && !rc)
918                                 rc = err;
919                         if (!rc)
920                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
921                         put_sysio_cookie(cookie);
922                 }
923         }
924
925         if (rc) {
926                 LASSERT(rc < 0);
927                 ioctxp->ioctx_cc = -1;
928                 ioctxp->ioctx_errno = -rc;
929         }
930
931         OBD_FREE(lsca, sizeof(*lsca));
932         ioctxp->ioctx_private = NULL;
933
934         RETURN(1);
935 }