Whamcloud - gitweb
liblustre:
[fs/lustre-release.git] / lustre / liblustre / rw.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/queue.h>
32
33 #include <sysio.h>
34 #include <fs.h>
35 #include <mount.h>
36 #include <inode.h>
37 #include <file.h>
38
39 #undef LIST_HEAD
40
41 #include "llite_lib.h"
42
43 static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
44 {
45         struct llu_inode_info *lli = llu_i2info(inode);
46         struct lov_stripe_md *lsm = lli->lli_smd;
47         struct obd_export *exp = llu_i2obdexp(inode);
48         struct {
49                 char name[16];
50                 struct ldlm_lock *lock;
51                 struct lov_stripe_md *lsm;
52         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
53         __u32 stripe, vallen = sizeof(stripe);
54         int rc;
55         ENTRY;
56
57         if (lsm->lsm_stripe_count == 1)
58                 RETURN(0);
59
60         /* get our offset in the lov */
61         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
62         if (rc != 0) {
63                 CERROR("obd_get_info: rc = %d\n", rc);
64                 LBUG();
65         }
66         LASSERT(stripe < lsm->lsm_stripe_count);
67         RETURN(stripe);
68 }
69
70 static int llu_extent_lock_callback(struct ldlm_lock *lock,
71                                     struct ldlm_lock_desc *new, void *data,
72                                     int flag)
73 {
74         struct lustre_handle lockh = { 0 };
75         int rc;
76         ENTRY;
77         
78
79         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
80                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
81                 LBUG();
82         }
83         
84         switch (flag) {
85         case LDLM_CB_BLOCKING:
86                 ldlm_lock2handle(lock, &lockh);
87                 rc = ldlm_cli_cancel(&lockh);
88                 if (rc != ELDLM_OK)
89                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
90                 break;
91         case LDLM_CB_CANCELING: {
92                 struct inode *inode = llu_inode_from_lock(lock);
93                 struct llu_inode_info *lli;
94                 struct lov_stripe_md *lsm;
95                 __u32 stripe;
96                 __u64 kms;
97                 
98                 if (!inode)
99                         RETURN(0);
100                 lli= llu_i2info(inode);
101                 if (!lli)
102                         goto iput;
103                 if (!lli->lli_smd)
104                         goto iput;
105                 lsm = lli->lli_smd;
106
107                 stripe = llu_lock_to_stripe_offset(inode, lock);
108                 kms = ldlm_extent_shift_kms(lock,
109                                             lsm->lsm_oinfo[stripe].loi_kms);
110                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
111                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
112                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
113                 lsm->lsm_oinfo[stripe].loi_kms = kms;
114 iput:
115                 I_RELE(inode);
116                 break;
117         }
118         default:
119                 LBUG();
120         }
121         
122         RETURN(0);
123 }
124
125 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
126 {
127         struct ptlrpc_request *req = reqp;
128         struct inode *inode = llu_inode_from_lock(lock);
129         struct obd_export *exp;
130         struct llu_inode_info *lli;
131         struct ost_lvb *lvb;
132         struct {
133                 int stripe_number;
134                 __u64 size;
135                 struct lov_stripe_md *lsm;
136         } data;
137         __u32 vallen = sizeof(data);
138         int rc, size = sizeof(*lvb);
139         ENTRY;
140
141         if (inode == NULL)
142                 RETURN(0);
143         lli = llu_i2info(inode);
144         if (lli == NULL)
145                 goto iput;
146         if (lli->lli_smd == NULL)
147                 goto iput;
148         exp = llu_i2obdexp(inode);
149
150         /* First, find out which stripe index this lock corresponds to. */
151         if (lli->lli_smd->lsm_stripe_count > 1)
152                 data.stripe_number = llu_lock_to_stripe_offset(inode, lock);
153         else
154                 data.stripe_number = 0;
155
156         data.size = lli->lli_st_size;
157         data.lsm = lli->lli_smd;
158
159         rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe",
160                           &vallen, &data);
161         if (rc != 0) {
162                 CERROR("obd_get_info: rc = %d\n", rc);
163                 LBUG();
164         }
165
166         LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
167                    lli->lli_st_size, data.stripe_number, data.size);
168
169         rc = lustre_pack_reply(req, 1, &size, NULL);
170         if (rc) {
171                 CERROR("lustre_pack_reply: %d\n", rc);
172                 goto iput;
173         }
174
175         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
176         lvb->lvb_size = data.size;
177         ptlrpc_reply(req);
178
179  iput:
180         I_RELE(inode);
181         RETURN(0);
182 }
183
184 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
185 __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
186
187 /* NB: lov_merge_size will prefer locally cached writes if they extend the
188  * file (because it prefers KMS over RSS when larger) */
189 int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
190 {
191         struct llu_inode_info *lli = llu_i2info(inode);
192         struct llu_sb_info *sbi = llu_i2sbi(inode);
193         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
194         struct lustre_handle lockh;
195         int rc, flags = LDLM_FL_HAS_INTENT;
196         ENTRY;
197
198         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
199
200         rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
201                          LCK_PR, &flags, llu_extent_lock_callback,
202                          ldlm_completion_ast, llu_glimpse_callback, inode,
203                          sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
204         if (rc > 0)
205                 RETURN(-EIO);
206
207         lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
208         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
209
210         CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
211
212         obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
213
214         RETURN(rc);
215 }
216
217 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
218                     struct lov_stripe_md *lsm, int mode,
219                     ldlm_policy_data_t *policy, struct lustre_handle *lockh,
220                     int ast_flags)
221 {
222         struct llu_sb_info *sbi = llu_i2sbi(inode);
223         struct llu_inode_info *lli = llu_i2info(inode);
224         int rc;
225         ENTRY;
226
227         LASSERT(lockh->cookie == 0);
228
229         /* XXX phil: can we do this?  won't it screw the file size up? */
230         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
231             (sbi->ll_flags & LL_SBI_NOLCK))
232                 RETURN(0);
233
234         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
235                lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
236
237         rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
238                          &ast_flags, llu_extent_lock_callback,
239                          ldlm_completion_ast, llu_glimpse_callback, inode,
240                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
241         if (rc > 0)
242                 rc = -EIO;
243
244         if (policy->l_extent.start == 0 &&
245             policy->l_extent.end == OBD_OBJECT_EOF)
246                 lli->lli_st_size = lov_merge_size(lsm, 1);
247
248         //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
249
250         RETURN(rc);
251 }
252
#if 0
/*
 * Compiled out: earlier versions of the extent locking helpers, written
 * against a different obd_enqueue()/obd_match() calling convention.  The
 * llu_extent_lock() defined here has the same name as the live function in
 * this file, so the two cannot be enabled together.
 */

/*
 * Enqueue an extent lock without refreshing the file size afterwards.
 * Returns 0 without locking when lock-ignore flags are set.
 */
int llu_extent_lock_no_validate(struct ll_file_data *fd,
                                struct inode *inode,
                                struct lov_stripe_md *lsm,
                                int mode,
                                struct ldlm_extent *extent,
                                struct lustre_handle *lockh,
                                int ast_flags)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        struct llu_inode_info *lli = llu_i2info(inode);
        int rc;
        ENTRY;

        LASSERT(lockh->cookie == 0);

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               lli->lli_st_ino, extent->start, extent->end);

        /* pass the size of the extent itself, not of the pointer to it */
        rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
                         sizeof(*extent), mode, &ast_flags,
                         llu_extent_lock_callback, inode, lockh);

        RETURN(rc);
}

/*
 * this grabs a lock and manually implements behaviour that makes it look like
 * the OST is returning the file size with each lock acquisition.
 */
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm, int mode,
                    struct ldlm_extent *extent, struct lustre_handle *lockh)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct obd_export *exp = llu_i2obdexp(inode);
        struct ldlm_extent size_lock;
        struct lustre_handle match_lockh = {0};
        int flags, rc, matched;
        ENTRY;

        rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
        if (rc != ELDLM_OK)
                RETURN(rc);

        /* a size lock is already known to be held: nothing to refresh */
        if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
                RETURN(0);

        rc = llu_inode_getattr(inode, lsm);
        if (rc) {
                llu_extent_unlock(fd, inode, lsm, mode, lockh);
                RETURN(rc);
        }

        size_lock.start = lli->lli_st_size;
        size_lock.end = OBD_OBJECT_EOF;

        /* XXX I bet we should be checking the lock ignore flags.. */
        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
        matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
                            sizeof(size_lock), LCK_PR, &flags, inode,
                            &match_lockh);

        /* hey, alright, we hold a size lock that covers the size we
         * just found, its not going to change for a while.. */
        if (matched == 1) {
                set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
                obd_cancel(exp, lsm, LCK_PR, &match_lockh);
        }

        RETURN(0);
}
#endif
331
332 int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
333                 struct lov_stripe_md *lsm, int mode,
334                 struct lustre_handle *lockh)
335 {
336         struct llu_sb_info *sbi = llu_i2sbi(inode);
337         int rc;
338         ENTRY;
339
340         /* XXX phil: can we do this?  won't it screw the file size up? */
341         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
342             (sbi->ll_flags & LL_SBI_NOLCK))
343                 RETURN(0);
344
345         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
346
347         RETURN(rc);
348 }
349
/* sanity marker stored in every ll_async_page; see llap_from_cookie() */
#define LLAP_MAGIC 12346789

/*
 * Per-page bookkeeping handed to the OBD async page API as callback data.
 */
struct ll_async_page {
        /* set to LLAP_MAGIC when the page is prepared for I/O */
        int             llap_magic;
        /* opaque handle filled in by obd_prep_async_page() */
        void           *llap_cookie;
        /* 1 once the page is queued for group I/O, 0 after completion */
        int             llap_queued;
        /* the page descriptor this entry tracks */
        struct page    *llap_page;
        /* inode the I/O is performed on */
        struct inode   *llap_inode;
};
359
360 static struct ll_async_page *llap_from_cookie(void *cookie)
361 {
362         struct ll_async_page *llap = cookie;
363         if (llap->llap_magic != LLAP_MAGIC)
364                 return ERR_PTR(-EINVAL);
365         return llap;
366 };
367
368 static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
369 {
370         struct ll_async_page *llap;
371         struct inode *inode;
372         struct lov_stripe_md *lsm;
373         obd_flag valid_flags;
374         ENTRY;
375
376         llap = llap_from_cookie(data);
377         if (IS_ERR(llap)) {
378                 EXIT;
379                 return;
380         }
381
382         inode = llap->llap_inode;
383         lsm = llu_i2info(inode)->lli_smd;
384
385         oa->o_id = lsm->lsm_object_id;
386         oa->o_valid = OBD_MD_FLID;
387         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
388         if (cmd == OBD_BRW_WRITE)
389                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
390
391         obdo_from_inode(oa, inode, valid_flags);
392         EXIT;
393 }
394
395 /* called for each page in a completed rpc.*/
396 static void llu_ap_completion(void *data, int cmd, int rc)
397 {
398         struct ll_async_page *llap;
399         struct page *page;
400
401         llap = llap_from_cookie(data);
402         if (IS_ERR(llap)) {
403                 EXIT;
404                 return;
405         }
406
407         llap->llap_queued = 0;
408         page = llap->llap_page;
409
410         if (rc != 0) {
411                 if (cmd == OBD_BRW_WRITE)
412                         CERROR("writeback error on page %p index %ld: %d\n", 
413                                page, page->index, rc);
414         }
415         EXIT;
416 }
417
418 static struct obd_async_page_ops llu_async_page_ops = {
419         .ap_make_ready =        NULL,
420         .ap_refresh_count =     NULL,
421         .ap_fill_obdo =         llu_ap_fill_obdo,
422         .ap_completion =        llu_ap_completion,
423 };
424
425 static
426 struct llu_sysio_cookie* get_sysio_cookie(struct inode *inode, int maxpages)
427 {
428         struct llu_sysio_cookie *cookie;
429         int rc;
430
431         OBD_ALLOC(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
432         if (cookie == NULL)
433                 goto out;
434
435         I_REF(inode);
436         cookie->lsc_inode = inode;
437         cookie->lsc_maxpages = maxpages;
438         cookie->lsc_llap = (struct ll_async_page *)(cookie + 1);
439         cookie->lsc_pages = (struct page *) (cookie->lsc_llap + maxpages);
440
441         rc = oig_init(&cookie->lsc_oig);
442         if (rc) {
443                 OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(maxpages));
444                 cookie = NULL;
445         }
446
447 out:
448         return cookie;
449 }
450
451 static
452 void put_sysio_cookie(struct llu_sysio_cookie *cookie)
453 {
454         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
455         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
456         struct ll_async_page *llap = cookie->lsc_llap;
457 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
458         struct page *pages = cookie->lsc_pages;
459 #endif
460         int i;
461
462         for (i = 0; i< cookie->lsc_maxpages; i++) {
463                 if (llap[i].llap_cookie)
464                         obd_teardown_async_page(exp, lsm, NULL,
465                                                 llap[i].llap_cookie);
466 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
467                 if (pages[i]._managed) {
468                         free(pages[i].addr);
469                         pages[i]._managed = 0;
470                 }
471 #endif
472         }
473
474         I_RELE(cookie->lsc_inode);
475
476         oig_release(cookie->lsc_oig);
477         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
478 }
479
#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
/* Note: these code should be removed finally, don't need
 * more cleanup
 */

/*
 * Turn partial first/last pages of a write into page-aligned writes.
 *
 * For the first and the last page of the request (handled once if they are
 * the same page), a partial page that starts inside the current file size
 * is replaced by a freshly allocated page-sized buffer: the existing page
 * content is read from the object, the caller's bytes are copied on top,
 * and the page descriptor is rewritten to start at offset 0.  The buffer is
 * flagged _managed so that put_sysio_cookie() frees it.
 *
 * Returns 0 on success, -ENOMEM if a buffer cannot be allocated, or the
 * error from obd_brw().
 */
static
int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
{
        struct inode *inode = cookie->lsc_inode;
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obdo oa;
        struct page *pages = cookie->lsc_pages;
        int i, pgidx[2] = {0, cookie->lsc_npages-1};
        int rc;
        ENTRY;

        /* with no pages there is no first/last page to look at */
        if (cookie->lsc_npages <= 0)
                RETURN(0);

        for (i = 0; i < 2; i++) {
                struct page *oldpage = &pages[pgidx[i]];
                struct page newpage;
                struct brw_page pg;
                obd_off page_start;
                char *newbuf;

                /* single page request: handle it in the second round only */
                if (i == 0 && pgidx[0] == pgidx[1])
                        continue;

                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);

                /* full pages need no read-modify-write */
                if (oldpage->_count == PAGE_CACHE_SIZE)
                        continue;

                /* widen before shifting so large indices do not overflow */
                page_start = (obd_off)oldpage->index << PAGE_CACHE_SHIFT;

                /* page lies entirely beyond the file: nothing to preserve */
                if (page_start >= lli->lli_st_size)
                        continue;

                /* zero-filled so that bytes between the old end of file and
                 * the start of the new data are written as zeroes rather
                 * than as uninitialised heap content */
                newbuf = calloc(1, PAGE_CACHE_SIZE);
                if (!newbuf)
                        RETURN(-ENOMEM);

                newpage.index = oldpage->index;
                newpage.addr = newbuf;

                pg.pg = &newpage;
                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
                /* only read up to the end of file on the last file page */
                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
                else
                        pg.count = PAGE_CACHE_SIZE;
                pg.flag = 0;

                oa.o_id = lsm->lsm_object_id;
                oa.o_mode = lli->lli_st_mode;
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

                /* issue read */
                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
                if (rc) {
                        free(newbuf);
                        RETURN(rc);
                }

                /* copy page content, and reset page params */
                memcpy(newbuf + oldpage->_offset,
                       (char*)oldpage->addr + oldpage->_offset,
                       oldpage->_count);

                oldpage->addr = newbuf;
                /* a write reaching past the end of file stops at the end of
                 * the new data; otherwise the whole page is written back */
                if ((page_start + oldpage->_offset + oldpage->_count) >
                    lli->lli_st_size)
                        oldpage->_count += oldpage->_offset;
                else
                        oldpage->_count = PAGE_CACHE_SIZE;
                oldpage->_offset = 0;
                oldpage->_managed = 1;
        }

        RETURN(0);
}
#endif
558
559 static
560 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
561                       char *buf, loff_t pos, size_t count)
562 {
563         struct llu_inode_info *lli = llu_i2info(cookie->lsc_inode);
564         struct lov_stripe_md *lsm = lli->lli_smd;
565         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
566         struct page *pages = cookie->lsc_pages;
567         struct ll_async_page *llap = cookie->lsc_llap;
568         int i, rc, npages = 0;
569         ENTRY;
570
571         if (!exp)
572                 RETURN(-EINVAL);
573
574         /* prepare the pages array */
575         do {
576                 unsigned long index, offset, bytes;
577
578                 offset = (pos & ~PAGE_CACHE_MASK);
579                 index = pos >> PAGE_CACHE_SHIFT;
580                 bytes = PAGE_CACHE_SIZE - offset;
581                 if (bytes > count)
582                         bytes = count;
583
584                 /* prevent read beyond file range */
585                 if ((cmd == OBD_BRW_READ) &&
586                     (pos + bytes) >= lli->lli_st_size) {
587                         if (pos >= lli->lli_st_size)
588                                 break;
589                         bytes = lli->lli_st_size - pos;
590                 }
591
592                 /* prepare page for this index */
593                 pages[npages].index = index;
594                 pages[npages].addr = buf - offset;
595
596                 pages[npages]._offset = offset;
597                 pages[npages]._count = bytes;
598
599                 npages++;
600                 count -= bytes;
601                 pos += bytes;
602                 buf += bytes;
603
604                 cookie->lsc_rwcount += bytes;
605         } while (count);
606
607         cookie->lsc_npages = npages;
608
609 #ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
610         if (cmd == OBD_BRW_WRITE) {
611                 rc = prepare_unaligned_write(cookie);
612                 if (rc)
613                         RETURN(rc);
614         }
615 #endif
616
617         for (i = 0; i < npages; i++) {
618                 llap[i].llap_magic = LLAP_MAGIC;
619                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
620                                          (obd_off)pages[i].index << PAGE_SHIFT,
621                                          &llu_async_page_ops,
622                                          &llap[i], &llap[i].llap_cookie);
623                 if (rc) {
624                         llap[i].llap_cookie = NULL;
625                         RETURN(rc);
626                 }
627                 CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n",
628                        &llap[i], &pages[i], llap[i].llap_cookie,
629                        (obd_off)pages[i].index << PAGE_SHIFT);
630                 pages[i].private = (unsigned long)&llap[i];
631                 llap[i].llap_page = &pages[i];
632                 llap[i].llap_inode = cookie->lsc_inode;
633
634                 rc = obd_queue_group_io(exp, lsm, NULL, cookie->lsc_oig,
635                                         llap[i].llap_cookie, cmd,
636                                         pages[i]._offset, pages[i]._count, 0,
637                                         ASYNC_READY | ASYNC_URGENT |
638                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
639                 if (rc)
640                         RETURN(rc);
641
642                 llap[i].llap_queued = 1;
643         }
644
645         RETURN(0);
646 }
647
648 static
649 int llu_start_async_io(struct llu_sysio_cookie *cookie)
650 {
651         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
652         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
653
654         return obd_trigger_group_io(exp, lsm, NULL, cookie->lsc_oig);
655 }
656
657 /*
658  * read/write a continuous buffer for an inode (zero-copy)
659  */
660 struct llu_sysio_cookie*
661 llu_rw(int cmd, struct inode *inode, char *buf, size_t count, loff_t pos)
662 {
663         struct llu_sysio_cookie *cookie;
664         int max_pages, rc;
665         ENTRY;
666
667         max_pages = (count >> PAGE_SHIFT) + 2;
668
669         cookie = get_sysio_cookie(inode, max_pages);
670         if (!cookie)
671                 RETURN(ERR_PTR(-ENOMEM));
672
673         rc = llu_prep_async_io(cookie, cmd, buf, pos, count);
674         if (rc)
675                 GOTO(out_cleanup, rc);
676
677         rc = llu_start_async_io(cookie);
678         if (rc)
679                 GOTO(out_cleanup, rc);
680
681 /*
682         rc = oig_wait(&oig);
683         if (rc) {
684                 CERROR("file i/o error!\n");
685                 rw_count = rc;
686         }
687 */
688         RETURN(cookie);
689
690 out_cleanup:
691         put_sysio_cookie(cookie);
692         RETURN(ERR_PTR(rc));
693 }
694
695 void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
696                       obd_off size);
697
698 struct llu_sysio_callback_args*
699 llu_file_write(struct inode *inode, const struct iovec *iovec,
700                size_t iovlen, loff_t pos)
701 {
702         struct llu_inode_info *lli = llu_i2info(inode);
703         struct ll_file_data *fd = lli->lli_file_data;
704         struct lustre_handle lockh = {0};
705         struct lov_stripe_md *lsm = lli->lli_smd;
706         struct obd_export *exp = NULL;
707         ldlm_policy_data_t policy;
708         struct llu_sysio_callback_args *lsca;
709         struct llu_sysio_cookie *cookie;
710         ldlm_error_t err;
711         int iovidx;
712         ENTRY;
713
714         /* XXX consider other types later */
715         if (!S_ISREG(lli->lli_st_mode))
716                 LBUG();
717
718         LASSERT(iovlen <= MAX_IOVEC);
719
720         exp = llu_i2obdexp(inode);
721         if (exp == NULL)
722                 RETURN(ERR_PTR(-EINVAL));
723
724         OBD_ALLOC(lsca, sizeof(*lsca));
725         if (!lsca)
726                 RETURN(ERR_PTR(-ENOMEM));
727
728         /* FIXME optimize the following extent locking */
729         for (iovidx = 0; iovidx < iovlen; iovidx++) {
730                 char *buf = (char*)iovec[iovidx].iov_base;
731                 size_t count = iovec[iovidx].iov_len;
732
733                 if (count == 0)
734                         continue;
735
736                 if (pos + count > lli->lli_maxbytes)
737                         GOTO(err_out, err = -ERANGE);
738
739                 /* FIXME libsysio haven't handle O_APPEND?? */
740                 policy.l_extent.start = pos;
741                 policy.l_extent.end = pos + count - 1;
742
743                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
744                                       &lockh, 0);
745                 if (err != ELDLM_OK)
746                         GOTO(err_out, err = -ENOLCK);
747
748                 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
749                        lli->lli_st_ino, count, pos);
750
751                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
752                 if (!IS_ERR(cookie)) {
753                         /* save cookie */
754                         lsca->cookies[lsca->ncookies++] = cookie;
755                         pos += count;
756                         lov_increase_kms(exp, lsm, pos);
757                         /* file size grow */
758                         if (pos > lli->lli_st_size)
759                                 lli->lli_st_size = pos;
760                 } else {
761                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
762                         GOTO(err_out, err = PTR_ERR(cookie));
763                 }
764
765                 /* XXX errors? */
766                 err = llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
767                 if (err)
768                         CERROR("extent unlock error %d\n", err);
769         }
770
771         RETURN(lsca);
772
773 err_out:
774         /* teardown all async stuff */
775         while (lsca->ncookies--) {
776                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
777         }
778         OBD_FREE(lsca, sizeof(*lsca));
779
780         RETURN(ERR_PTR(err));
781 }
782
#if 0
/*
 * Compiled out.  Would refresh the access time of @inode: with USE_ATIME
 * through llu_inode_setattr(), otherwise by storing CURRENT_TIME in the
 * inode directly.
 */
static void llu_update_atime(struct inode *inode)
{
#ifdef USE_ATIME
        struct llu_inode_info *lli = llu_i2info(inode);
        struct iattr attr;

        attr.ia_atime = LTIME_S(CURRENT_TIME);
        attr.ia_valid = ATTR_ATIME;

        /* nothing to do if unchanged, read-only or noatime */
        if (lli->lli_st_atime == attr.ia_atime ||
            IS_RDONLY(inode) ||
            IS_NOATIME(inode))
                return;

        /* ll_inode_setattr() sets inode->i_atime from attr.ia_atime */
        llu_inode_setattr(inode, &attr, 0);
#else
        /* update atime, but don't explicitly write it out just this change */
        inode->i_atime = CURRENT_TIME;
#endif
}
#endif
806
807 struct llu_sysio_callback_args*
808 llu_file_read(struct inode *inode, const struct iovec *iovec,
809               size_t iovlen, loff_t pos)
810 {
811         struct llu_inode_info *lli = llu_i2info(inode);
812         struct ll_file_data *fd = lli->lli_file_data;
813         struct lov_stripe_md *lsm = lli->lli_smd;
814         struct lustre_handle lockh = { 0 };
815         ldlm_policy_data_t policy;
816         struct llu_sysio_callback_args *lsca;
817         struct llu_sysio_cookie *cookie;
818         __u64 kms;
819         int iovidx;
820
821         ldlm_error_t err;
822         ENTRY;
823
824         OBD_ALLOC(lsca, sizeof(*lsca));
825         if (!lsca)
826                 RETURN(ERR_PTR(-ENOMEM));
827
828         for (iovidx = 0; iovidx < iovlen; iovidx++) {
829                 char *buf = iovec[iovidx].iov_base;
830                 size_t count = iovec[iovidx].iov_len;
831
832                 /* "If nbyte is 0, read() will return 0 and have no other results."
833                  *                      -- Single Unix Spec */
834                 if (count == 0)
835                         continue;
836
837                 policy.l_extent.start = pos;
838                 policy.l_extent.end = pos + count - 1;
839
840                 err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
841                 if (err != ELDLM_OK)
842                         GOTO(err_out, err = -ENOLCK);
843
844                 kms = lov_merge_size(lsm, 1);
845                 if (policy.l_extent.end > kms) {
846                         /* A glimpse is necessary to determine whether we
847                          * return a short read or some zeroes at the end of
848                          * the buffer */
849                         struct ost_lvb lvb;
850                         if (llu_glimpse_size(inode, &lvb)) {
851                                 llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
852                                 GOTO(err_out, err = -ENOLCK);
853                         }
854                         lli->lli_st_size = lvb.lvb_size;
855                 } else {
856                         lli->lli_st_size = kms;
857                 }
858
859                 CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
860                        "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
861                        lli->lli_st_size);
862
863                 if (pos >= lli->lli_st_size) {
864                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
865                         break;
866                 }
867
868                 cookie = llu_rw(OBD_BRW_READ, inode, buf, count, pos);
869                 if (!IS_ERR(cookie)) {
870                         /* save cookie */
871                         lsca->cookies[lsca->ncookies++] = cookie;
872                         pos += count;
873                 } else {
874                         llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
875                         GOTO(err_out, err = PTR_ERR(cookie));
876                 }
877
878                 /* XXX errors? */
879                 err = llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
880                 if (err)
881                         CERROR("extent_unlock fail: %d\n", err);
882         }
883 #if 0
884         if (readed > 0)
885                 llu_update_atime(inode);
886 #endif
887         RETURN(lsca);
888
889 err_out:
890         /* teardown all async stuff */
891         while (lsca->ncookies--) {
892                 put_sysio_cookie(lsca->cookies[lsca->ncookies]);
893         }
894         OBD_FREE(lsca, sizeof(*lsca));
895
896         RETURN(ERR_PTR(err));
897 }
898
899 int llu_iop_iodone(struct ioctx *ioctxp)
900 {
901         struct llu_sysio_callback_args *lsca = ioctxp->ioctx_private;
902         struct llu_sysio_cookie *cookie;
903         int i, err = 0, rc = 0;
904         ENTRY;
905
906         /* write/read(fd, buf, 0) */
907         if (!lsca) {
908                 ioctxp->ioctx_cc = 0;
909                 RETURN(1);
910         }
911
912         LASSERT(!IS_ERR(lsca));
913
914         for (i = 0; i < lsca->ncookies; i++) {
915                 cookie = lsca->cookies[i];
916                 if (cookie) {
917                         err = oig_wait(cookie->lsc_oig);
918                         if (err && !rc)
919                                 rc = err;
920                         if (!rc)
921                                 ioctxp->ioctx_cc += cookie->lsc_rwcount;
922                         put_sysio_cookie(cookie);
923                 }
924         }
925
926         if (rc) {
927                 LASSERT(rc < 0);
928                 ioctxp->ioctx_cc = -1;
929                 ioctxp->ioctx_errno = -rc;
930         }
931
932         OBD_FREE(lsca, sizeof(*lsca));
933         ioctxp->ioctx_private = NULL;
934
935         RETURN(1);
936 }