Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32
33 #include <sysio.h>
34 #include <fs.h>
35 #include <mount.h>
36 #include <inode.h>
37 #include <file.h>
38
39 #undef LIST_HEAD
40
41 #include "llite_lib.h"
42
43 static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
44 {
45         struct llu_inode_info *lli = llu_i2info(inode);
46         struct lov_stripe_md *lsm = lli->lli_smd;
47         struct obd_export *exp = llu_i2obdexp(inode);
48         struct {
49                 char name[16];
50                 struct ldlm_lock *lock;
51                 struct lov_stripe_md *lsm;
52         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
53         __u32 stripe, vallen = sizeof(stripe);
54         int rc;
55         ENTRY;
56
57         if (lsm->lsm_stripe_count == 1)
58                 RETURN(0);
59
60         /* get our offset in the lov */
61         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
62         if (rc != 0) {
63                 CERROR("obd_get_info: rc = %d\n", rc);
64                 LBUG();
65         }
66         LASSERT(stripe < lsm->lsm_stripe_count);
67         RETURN(stripe);
68 }
69
70 static int llu_extent_lock_callback(struct ldlm_lock *lock,
71                                     struct ldlm_lock_desc *new, void *data,
72                                     int flag)
73 {
74         struct lustre_handle lockh = { 0 };
75         int rc;
76         ENTRY;
77         
78
79         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
80                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
81                 LBUG();
82         }
83         
84         switch (flag) {
85         case LDLM_CB_BLOCKING:
86                 ldlm_lock2handle(lock, &lockh);
87                 rc = ldlm_cli_cancel(&lockh);
88                 if (rc != ELDLM_OK)
89                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
90                 break;
91         case LDLM_CB_CANCELING: {
92                 struct inode *inode = llu_inode_from_lock(lock);
93                 struct llu_inode_info *lli;
94                 struct lov_stripe_md *lsm;
95                 __u32 stripe;
96                 __u64 kms;
97                 
98                 if (!inode)
99                         RETURN(0);
100                 lli= llu_i2info(inode);
101                 if (!lli)
102                         goto iput;
103                 if (!lli->lli_smd)
104                         goto iput;
105                 lsm = lli->lli_smd;
106
107                 stripe = llu_lock_to_stripe_offset(inode, lock);
108                 kms = ldlm_extent_shift_kms(lock,
109                                             lsm->lsm_oinfo[stripe].loi_kms);
110                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
111                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
112                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
113                 lsm->lsm_oinfo[stripe].loi_kms = kms;
114 iput:
115                 I_RELE(inode);
116                 break;
117         }
118         default:
119                 LBUG();
120         }
121         
122         RETURN(0);
123 }
124
125 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
126 {
127         struct ptlrpc_request *req = reqp;
128         struct inode *inode = llu_inode_from_lock(lock);
129         struct obd_export *exp;
130         struct llu_inode_info *lli;
131         struct ost_lvb *lvb;
132         struct {
133                 int stripe_number;
134                 __u64 size;
135                 struct lov_stripe_md *lsm;
136         } data;
137         __u32 vallen = sizeof(data);
138         int rc, size = sizeof(*lvb);
139         ENTRY;
140
141         if (inode == NULL)
142                 RETURN(0);
143         lli = llu_i2info(inode);
144         if (lli == NULL)
145                 goto iput;
146         if (lli->lli_smd == NULL)
147                 goto iput;
148         exp = llu_i2obdexp(inode);
149
150         /* First, find out which stripe index this lock corresponds to. */
151         if (lli->lli_smd->lsm_stripe_count > 1)
152                 data.stripe_number = llu_lock_to_stripe_offset(inode, lock);
153         else
154                 data.stripe_number = 0;
155
156         data.size = lli->lli_st_size;
157         data.lsm = lli->lli_smd;
158
159         rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe",
160                           &vallen, &data);
161         if (rc != 0) {
162                 CERROR("obd_get_info: rc = %d\n", rc);
163                 LBUG();
164         }
165
166         LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
167                    lli->lli_st_size, data.stripe_number, data.size);
168
169         rc = lustre_pack_reply(req, 1, &size, NULL);
170         if (rc) {
171                 CERROR("lustre_pack_reply: %d\n", rc);
172                 goto iput;
173         }
174
175         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
176         lvb->lvb_size = data.size;
177         ptlrpc_reply(req);
178
179  iput:
180         I_RELE(inode);
181         RETURN(0);
182 }
183
184 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
185 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
186
187 /* NB: lov_merge_size will prefer locally cached writes if they extend the
188  * file (because it prefers KMS over RSS when larger) */
189 int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
190 {
191         struct llu_inode_info *lli = llu_i2info(inode);
192         struct llu_sb_info *sbi = llu_i2sbi(inode);
193         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
194         struct lustre_handle lockh;
195         int rc, flags = LDLM_FL_HAS_INTENT;
196         ENTRY;
197
198         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
199
200         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
201                          LCK_PR, &flags, llu_extent_lock_callback,
202                          ldlm_completion_ast, llu_glimpse_callback, inode,
203                          sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
204         if (rc > 0)
205                 RETURN(-EIO);
206
207         lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
208         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
209
210         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
211
212         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
213
214         RETURN(rc);
215 }
216
217 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
218                     struct lov_stripe_md *lsm, int mode,
219                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
220                     int ast_flags)
221 {
222         struct llu_sb_info *sbi = llu_i2sbi(inode);
223         struct llu_inode_info *lli = llu_i2info(inode);
224         int rc;
225         ENTRY;
226
227         LASSERT(lockh->cookie == 0);
228
229         /* XXX phil: can we do this?  won't it screw the file size up? */
230         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
231             (sbi->ll_flags & LL_SBI_NOLCK))
232                 RETURN(0);
233
234         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
235                lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
236
237         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
238                          &ast_flags, llu_extent_lock_callback,
239                          ldlm_completion_ast, llu_glimpse_callback, inode,
240                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
241         if (rc > 0)
242                 rc = -EIO;
243
244         if (policy->l_extent.start == 0 &&
245             policy->l_extent.end == OBD_OBJECT_EOF)
246                 lli->lli_st_size = lov_merge_size(lsm, 1);
247
248         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
249
250         RETURN(rc);
251 }
252
#if 0
/*
 * Disabled code: an earlier locking path, compiled out and kept for
 * reference only.  It uses a different obd_enqueue() argument list than
 * the live llu_extent_lock().
 */
int llu_extent_lock_no_validate(struct ll_file_data *fd,
                                struct inode *inode,
                                struct lov_stripe_md *lsm,
                                int mode,
                                struct ldlm_extent *extent,
                                struct lustre_handle *lockh,
                                int ast_flags)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        struct llu_inode_info *lli = llu_i2info(inode);
        int skip_lock, rc;
        ENTRY;

        LASSERT(lockh->cookie == 0);

        /* XXX phil: can we do this?  won't it screw the file size up? */
        skip_lock = (fd != NULL && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
                    (sbi->ll_flags & LL_SBI_NOLCK);
        if (skip_lock)
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               lli->lli_st_ino, extent->start, extent->end);

        /* NOTE(review): sizeof(extent) is the size of the pointer, not of
         * struct ldlm_extent -- check before ever re-enabling this. */
        rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
                         sizeof(extent), mode, &ast_flags,
                         llu_extent_lock_callback, inode, lockh);

        RETURN(rc);
}

/*
 * this grabs a lock and manually implements behaviour that makes it look like
 * the OST is returning the file size with each lock acquisition.
 */
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm, int mode,
                    struct ldlm_extent *extent, struct lustre_handle *lockh)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct obd_export *exp = llu_i2obdexp(inode);
        struct ldlm_extent size_lock;
        struct lustre_handle match_lockh = {0};
        int flags, rc, matched;
        ENTRY;

        rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
        if (rc != ELDLM_OK)
                RETURN(rc);

        /* size already known to be covered by a lock: nothing more to do */
        if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
                RETURN(0);

        rc = llu_inode_getattr(inode, lsm);
        if (rc) {
                llu_extent_unlock(fd, inode, lsm, mode, lockh);
                RETURN(rc);
        }

        size_lock.start = lli->lli_st_size;
        size_lock.end = OBD_OBJECT_EOF;

        /* XXX I bet we should be checking the lock ignore flags.. */
        /* FIXME use LDLM_FL_TEST_LOCK instead */
        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
        matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
                            sizeof(size_lock), LCK_PR, &flags, inode,
                            &match_lockh);

        /* hey, alright, we hold a size lock that covers the size we
         * just found, its not going to change for a while.. */
        if (matched == 1) {
                set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
                obd_cancel(exp, lsm, LCK_PR, &match_lockh);
        }

        RETURN(0);
}
#endif
332
333 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
334                 struct lov_stripe_md *lsm, int mode,
335                 struct lustre_handle *lockh)
336 {
337         struct llu_sb_info *sbi = llu_i2sbi(inode);
338         int rc;
339         ENTRY;
340
341         /* XXX phil: can we do this?  won't it screw the file size up? */
342         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
343             (sbi->ll_flags & LL_SBI_NOLCK))
344                 RETURN(0);
345
346         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
347
348         RETURN(rc);
349 }
350
/* Sanity tag stored in every ll_async_page; checked by llap_from_cookie(). */
#define LLAP_MAGIC 12346789

/*
 * Per-page bookkeeping for asynchronous I/O.  One of these is handed to the
 * OBD layer as callback data for each page.
 */
struct ll_async_page {
        /* must equal LLAP_MAGIC for the structure to be trusted */
        int             llap_magic;
        /* handle filled in by obd_prep_async_page(); NULL if never prepared */
        void           *llap_cookie;
        /* set once the page is queued for group I/O, cleared on completion */
        int             llap_queued;
        /* the page this entry describes */
        struct page    *llap_page;
        /* inode the page belongs to */
        struct inode   *llap_inode;
};
360
361 static struct ll_async_page *llap_from_cookie(void *cookie)
362 {
363         struct ll_async_page *llap = cookie;
364         if (llap->llap_magic != LLAP_MAGIC)
365                 return ERR_PTR(-EINVAL);
366         return llap;
367 };
368
369 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
370 {
371         struct ll_async_page *llap;
372         struct inode *inode;
373         struct lov_stripe_md *lsm;
374         obd_flag valid_flags;
375         ENTRY;
376
377         llap = llap_from_cookie(data);
378         if (IS_ERR(llap)) {
379                 EXIT;
380                 return;
381         }
382
383         inode = llap->llap_inode;
384         lsm = llu_i2info(inode)->lli_smd;
385
386         oa->o_id = lsm->lsm_object_id;
387         oa->o_valid = OBD_MD_FLID;
388         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
389         if (cmd == OBD_BRW_WRITE)
390                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
391
392         obdo_from_inode(oa, inode, valid_flags);
393         EXIT;
394 }
395
396 /* called for each page in a completed rpc.*/
397 static void llu_ap_completion(void *data, int cmd, int rc)
398 {
399         struct ll_async_page *llap;
400         struct page *page;
401
402         llap = llap_from_cookie(data);
403         if (IS_ERR(llap)) {
404                 EXIT;
405                 return;
406         }
407
408         llap->llap_queued = 0;
409         page = llap->llap_page;
410
411         if (rc != 0) {
412                 if (cmd == OBD_BRW_WRITE)
413                         CERROR("writeback error on page %p index %ld: %d\n", 
414                                page, page->index, rc);
415         }
416         EXIT;
417 }
418
419 static struct obd_async_page_ops llu_async_page_ops = {
420         .ap_make_ready =        NULL,
421         .ap_refresh_count =     NULL,
422         .ap_fill_obdo =         llu_ap_fill_obdo,
423         .ap_completion =        llu_ap_completion,
424 };
425
426 static
427 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
428 {
429         struct llu_sysio_cookie *cookie;
430         int rc;
431
432         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
433         if (cookie == NULL)
434                 goto out;
435
436         I_REF(inode);
437         cookie->lsc_inode = inode;
438         cookie->lsc_maxpages = maxpages;
439         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
440         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
441
442         rc = oig_init(&cookie->lsc_oig);
443         if (rc) {
444                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
445                 cookie = NULL;
446         }
447
448 out:
449         return cookie;
450 }
451
452 static
453 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
454 {
455         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
456         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
457         struct ll_async_page *llap = cookie->lsc_llap;
458 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
459         struct page *pages = cookie->lsc_pages;
460 #endif
461         int i;
462
463         for (i = 0; i< cookie->lsc_maxpages; i++) {
464                 if (llap[i].llap_cookie)
465                         obd_teardown_async_page(exp, lsm, NULL,
466                                                 llap[i].llap_cookie);
467 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
468                 if (pages[i]._managed) {
469                         free(pages[i].addr);
470                         pages[i]._managed = 0;
471                 }
472 #endif
473         }
474
475         I_RELE(cookie->lsc_inode);
476
477         oig_release(cookie->lsc_oig);
478         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
479 }
480
#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
/* Note: these code should be removed finally, don't need
 * more cleanup
 */
/*
 * Turn partial first/last pages of a write into read-modify-write pages.
 *
 * For each of the two edge pages that is not a full page and lies inside the
 * current file size, a page-sized bounce buffer is allocated, the existing
 * page content is read from the object, the caller's bytes are copied over
 * it, and the page is re-pointed at the bounce buffer (marked _managed so
 * put_sysio_cookie() frees it).
 *
 * Returns 0 on success, -ENOMEM if a bounce buffer cannot be allocated, or
 * the obd_brw() error code.
 */
static
int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
{
        struct inode *inode = cookie->lsc_inode;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obdo oa;
        struct page *pages = cookie->lsc_pages;
        int i, pgidx[2] = {0, cookie->lsc_npages-1};
        int rc;
        ENTRY;

        /* no pages means no edges; also keeps pgidx[1] from being -1 */
        if (cookie->lsc_npages <= 0)
                RETURN(0);

        for (i = 0; i < 2; i++) {
                struct page *oldpage = &pages[pgidx[i]];
                struct page newpage;
                struct brw_page pg;
                char *newbuf;

                /* single page: handle it once, on the second pass */
                if (i == 0 && pgidx[0] == pgidx[1])
                        continue;

                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);

                if (oldpage->_count == PAGE_CACHE_SIZE)
                        continue;

                /* widen before shifting, as the other offset computations
                 * in this function do, so large indices do not overflow */
                if ((obd_off)oldpage->index << PAGE_CACHE_SHIFT >=
                    lli->lli_st_size)
                        continue;

                /* Zero-filled: the read below may return less than a page,
                 * and the gap between EOF and _offset is written back. */
                newbuf = calloc(1, PAGE_CACHE_SIZE);
                if (!newbuf)
                        RETURN(-ENOMEM);

                newpage.index = oldpage->index;
                newpage.addr = newbuf;

                pg.pg = &newpage;
                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
                else
                        pg.count = PAGE_CACHE_SIZE;
                pg.flag = 0;

                oa.o_id = lsm->lsm_object_id;
                oa.o_mode = lli->lli_st_mode;
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

                /* issue read */
                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
                if (rc) {
                        free(newbuf);
                        RETURN(rc);
                }

                /* copy page content, and reset page params */
                memcpy(newbuf + oldpage->_offset,
                       (char*)oldpage->addr + oldpage->_offset,
                       oldpage->_count);

                oldpage->addr = newbuf;
                if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
                    oldpage->_offset + oldpage->_count) > lli->lli_st_size)
                        oldpage->_count += oldpage->_offset;
                else
                        oldpage->_count = PAGE_CACHE_SIZE;
                oldpage->_offset = 0;
                oldpage->_managed = 1;
        }

        RETURN(0);
}
#endif
559
560 static
561 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
562                       char *buf, loff_t pos, size_t count)
563 {
564         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
565         struct lov_stripe_md *lsm = lli->lli_smd;
566         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
567         struct page *pages = cookie->lsc_pages;
568         struct ll_async_page *llap = cookie->lsc_llap;
569         int i, rc, npages = 0;
570         ENTRY;
571
572         if (!exp)
573                 RETURN(-EINVAL);
574
575         /* prepare the pages array */
576         do {
577                 unsigned long index, offset, bytes;
578
579                 offset = (pos & ~PAGE_CACHE_MASK);
580                 index = pos >> PAGE_CACHE_SHIFT;
581                 bytes = PAGE_CACHE_SIZE - offset;
582                 if (bytes > count)
583                         bytes = count;
584
585                 /* prevent read beyond file range */
586                 if ((cmd == OBD_BRW_READ) &&
587                     (pos + bytes) >= lli->lli_st_size) {
588                         if (pos >= lli->lli_st_size)
589                                 break;
590                         bytes = lli->lli_st_size - pos;
591                 }
592
593                 /* prepare page for this index */
594                 pages[npages].index = index;
595                 pages[npages].addr = buf - offset;
596
597                 pages[npages]._offset = offset;
598                 pages[npages]._count = bytes;
599
600                 npages++;
601                 count -= bytes;
602                 pos += bytes;
603                 buf += bytes;
604
605                 cookie->lsc_rwcount += bytes;
606         } while (count);
607
608         cookie->lsc_npages = npages;
609
610 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
611         if (cmd == OBD_BRW_WRITE) {
612                 rc = prepare_unaligned_write(cookie);
613                 if (rc)
614                         RETURN(rc);
615         }
616 #endif
617
618         for (i = 0; i < npages; i++) {
619                 llap[i].llap_magic = LLAP_MAGIC;
620                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
621                                          (obd_off)pages[i].index << PAGE_SHIFT,
622                                          &llu_async_page_ops,
623                                          &llap[i], &llap[i].llap_cookie);
624                 if (rc) {
625                         llap[i].llap_cookie = NULL;
626                         RETURN(rc);
627                 }
628                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
629                        &llap[i], &pages[i], llap[i].llap_cookie,
630                        (obd_off)pages[i].index << PAGE_SHIFT);
631                 pages[i].private = (unsigned long)&llap[i];
632                 llap[i].llap_page = &pages[i];
633                 llap[i].llap_inode = cookie->lsc_inode;
634
635                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
636                                         llap[i].llap_cookie, cmd,
637                                         pages[i]._offset, pages[i]._count, 0,
638                                         ASYNC_READY | ASYNC_URGENT |
639                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
640                 if (rc)
641                         RETURN(rc);
642
643                 llap[i].llap_queued = 1;
644         }
645
646         RETURN(0);
647 }
648
649 static
650 int llu_start_async_io(struct llu_sysio_cookie *cookie)
651 {
652         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
653         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
654
655         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
656 }
657
658 /*
659  * read/write a continuous buffer for an inode (zero-copy)
660  */
661 struct llu_sysio_cookie*
662 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
663 {
664         struct llu_sysio_cookie *cookie;
665         int max_pages, rc;
666         ENTRY;
667
668         max_pages = (count >> PAGE_SHIFT) + 2;
669
670         cookie = get_sysio_cookie(inode, max_pages);
671         if (!cookie)
672                 RETURN(ERR_PTR(-ENOMEM));
673
674         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
675         if (rc)
676                 GOTO(out_cleanup, rc);
677
678         rc = llu_start_async_io(cookie);
679         if (rc)
680                 GOTO(out_cleanup, rc);
681
682 /*
683         rc = oig_wait(&oig);
684         if (rc) {
685                 CERROR("file i/o error!\n");
686                 rw_count = rc;
687         }
688 */
689         RETURN(cookie);
690
691 out_cleanup:
692         put_sysio_cookie(cookie);
693         RETURN(ERR_PTR(rc));
694 }
695
696 void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
697                       obd_off size);
698
699 struct llu_sysio_callback_args*
700 llu_file_write(struct inode *inode, const struct iovec *iovec,
701                size_t iovlen, loff_t pos)
702 {
703         struct llu_inode_info *lli = llu_i2info(inode);
704         struct ll_file_data *fd = lli->lli_file_data;
705         struct lustre_handle lockh = {0};
706         struct lov_stripe_md *lsm = lli->lli_smd;
707         struct obd_export *exp = NULL;
708         ldlm_policy_data_t policy;
709         struct llu_sysio_callback_args *lsca;
710         struct llu_sysio_cookie *cookie;
711         ldlm_error_t err;
712         int iovidx;
713         ENTRY;
714
715         /* XXX consider other types later */
716         if (!S_ISREG(lli->lli_st_mode))
717                 LBUG();
718
719         LASSERT(iovlen <= MAX_IOVEC);
720
721         exp = llu_i2obdexp(inode);
722         if (exp == NULL)
723                 RETURN(ERR_PTR(-EINVAL));
724
725         OBD_ALLOC(lsca, sizeof(*lsca));
726         if (!lsca)
727                 RETURN(ERR_PTR(-ENOMEM));
728
729         /* FIXME optimize the following extent locking */
730         for (iovidx = 0; iovidx < iovlen; iovidx++) {
731                 char *buf = (char*)iovec[iovidx].iov_base;
732                 size_t count = iovec[iovidx].iov_len;
733
734                 if (count == 0)
735                         continue;
736
737                 if (pos + count > lli->lli_maxbytes)
738                         GOTO(err_out, err = -ERANGE);
739
740                 /* FIXME libsysio haven't handle O_APPEND?? */
741                 policy.l_extent.start = pos;
742                 policy.l_extent.end = pos + count - 1;
743
744                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
745                                       &lockh, 0);
746                 if (err != ELDLM_OK)
747                         GOTO(err_out, err = -ENOLCK);
748
749                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
750                        lli->lli_st_ino, count, pos);
751
752                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
753                 if (!IS_ERR(cookie)) {
754                         /* save cookie */
755                         lsca->cookies[lsca->ncookies++] = cookie;
756                         pos += count;
757                         lov_increase_kms(exp, lsm, pos);
758                         /* file size grow */
759                         if (pos > lli->lli_st_size)
760                                 lli->lli_st_size = pos;
761                 } else {
762                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
763                         GOTO(err_out, err = PTR_ERR(cookie));
764                 }
765
766                 /* XXX errors? */
767                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
768                 if (err)
769                         CERROR("extent unlock error %d\n", err);
770         }
771
772         RETURN(lsca);
773
774 err_out:
775         /* teardown all async stuff */
776         while (lsca->ncookies--) {
777                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
778         }
779         OBD_FREE(lsca, sizeof(*lsca));
780
781         RETURN(ERR_PTR(err));
782 }
783
#if 0
/*
 * Disabled code: atime maintenance after a read.  With USE_ATIME the new
 * atime is pushed through llu_inode_setattr(); otherwise only the in-memory
 * value is touched.
 */
static void llu_update_atime(struct inode *inode)
{
        struct llu_inode_info *lli = llu_i2info(inode);

#ifdef USE_ATIME
        struct iattr attr;

        attr.ia_atime = LTIME_S(CURRENT_TIME);
        attr.ia_valid = ATTR_ATIME;

        /* nothing to do if unchanged, read-only or noatime */
        if (lli->lli_st_atime == attr.ia_atime)
                return;
        if (IS_RDONLY(inode))
                return;
        if (IS_NOATIME(inode))
                return;

        /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
        llu_inode_setattr(inode, &attr, 0);
#else
        /* update atime, but don't explicitly write it out just this change */
        inode->i_atime = CURRENT_TIME;
#endif
}
#endif
807
808 struct llu_sysio_callback_args*
809 llu_file_read(struct inode *inode, const struct iovec *iovec,
810               size_t iovlen, loff_t pos)
811 {
812         struct llu_inode_info *lli = llu_i2info(inode);
813         struct ll_file_data *fd = lli->lli_file_data;
814         struct lov_stripe_md *lsm = lli->lli_smd;
815         struct lustre_handle lockh = { 0 };
816         ldlm_policy_data_t policy;
817         struct llu_sysio_callback_args *lsca;
818         struct llu_sysio_cookie *cookie;
819         __u64 kms;
820         int iovidx;
821
822         ldlm_error_t err;
823         ENTRY;
824
825         OBD_ALLOC(lsca, sizeof(*lsca));
826         if (!lsca)
827                 RETURN(ERR_PTR(-ENOMEM));
828
829         for (iovidx = 0; iovidx < iovlen; iovidx++) {
830                 char *buf = iovec[iovidx].iov_base;
831                 size_t count = iovec[iovidx].iov_len;
832
833                 /* "If nbyte is 0, read() will return 0 and have no other results."
834                  *                      -- Single Unix Spec */
835                 if (count == 0)
836                         continue;
837
838                 policy.l_extent.start = pos;
839                 policy.l_extent.end = pos + count - 1;
840
841                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
842                 if (err != ELDLM_OK)
843                         GOTO(err_out, err = -ENOLCK);
844
845                 kms = lov_merge_size(lsm, 1);
846                 if (policy.l_extent.end > kms) {
847                         /* A glimpse is necessary to determine whether we
848                          * return a short read or some zeroes at the end of
849                          * the buffer */
850                         struct ost_lvb lvb;
851                         if (llu_glimpse_size(inode, &lvb)) {
852                                 llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
853                                 GOTO(err_out, err = -ENOLCK);
854                         }
855                         lli->lli_st_size = lvb.lvb_size;
856                 } else {
857                         lli->lli_st_size = kms;
858                 }
859
860                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
861                        "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
862                        lli->lli_st_size);
863
864                 if (pos >= lli->lli_st_size) {
865                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
866                         break;
867                 }
868
869                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
870                 if (!IS_ERR(cookie)) {
871                         /* save cookie */
872                         lsca->cookies[lsca->ncookies++] = cookie;
873                         pos += count;
874                 } else {
875                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
876                         GOTO(err_out, err = PTR_ERR(cookie));
877                 }
878
879                 /* XXX errors? */
880                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
881                 if (err)
882                         CERROR("extent_unlock fail: %d\n", err);
883         }
884 #if 0
885         if (readed > 0)
886                 llu_update_atime(inode);
887 #endif
888         RETURN(lsca);
889
890 err_out:
891         /* teardown all async stuff */
892         while (lsca->ncookies--) {
893                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
894         }
895         OBD_FREE(lsca, sizeof(*lsca));
896
897         RETURN(ERR_PTR(err));
898 }
899
900 int llu_iop_iodone(struct ioctx *ioctxp)
901 {
902         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
903         struct llu_sysio_cookie *cookie;
904         int i, err = 0, rc = 0;
905         ENTRY;
906
907         /* write/read(fd, buf, 0) */
908         if (!lsca) {
909                 ioctxp->ioctx_cc = 0;
910                 RETURN(1);
911         }
912
913         LASSERT(!IS_ERR(lsca));
914
915         for (i = 0; i < lsca->ncookies; i++) {
916                 cookie = lsca->cookies[i];
917                 if (cookie) {
918                         err = oig_wait(cookie->lsc_oig);
919                         if (err && !rc)
920                                 rc = err;
921                         if (!rc)
922                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
923                         put_sysio_cookie(cookie);
924                 }
925         }
926
927         if (rc) {
928                 LASSERT(rc < 0);
929                 ioctxp->ioctx_cc = -1;
930                 ioctxp->ioctx_errno = -rc;
931         }
932
933         OBD_FREE(lsca, sizeof(*lsca));
934         ioctxp->ioctx_private = NULL;
935
936         RETURN(1);
937 }