X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fliblustre%2Frw.c;h=53bda1f4c21bae0c80cca2f0e680aa0f71789a5a;hb=d8fc975a264d92375c0da12664937abe01ef3bf5;hp=b716544be8e97c4e090ed32abc463c118d094409;hpb=d2d56f38da01001c92a09afc6b52b5acbd9bc13c;p=fs%2Flustre-release.git diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index b716544..53bda1f 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -1,24 +1,41 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Lustre Light block IO + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Copyright (c) 2002-2004 Cluster File Systems, Inc. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * This file is part of Lustre, http://www.lustre.org. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * lustre/liblustre/rw.c * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Lustre Light block IO */ #define DEBUG_SUBSYSTEM S_LLITE @@ -33,10 +50,10 @@ #include #include +#include #ifdef HAVE_XTIO_H #include #endif -#include #include #include #include @@ -57,14 +74,14 @@ struct llu_io_group int lig_npages; __u64 lig_rwcount; struct ll_async_page *lig_llaps; - struct page *lig_pages; + cfs_page_t *lig_pages; void *lig_llap_cookies; }; #define LLU_IO_GROUP_SIZE(x) \ (sizeof(struct llu_io_group) + \ (sizeof(struct ll_async_page) + \ - sizeof(struct page) + \ + sizeof(cfs_page_t) + \ llap_cookie_size) * (x)) struct llu_io_session @@ -93,8 +110,7 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock struct { char name[16]; struct ldlm_lock *lock; - struct lov_stripe_md *lsm; - } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm }; + } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock }; __u32 stripe, vallen = sizeof(stripe); int rc; ENTRY; @@ -103,7 +119,7 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock RETURN(0); /* get our offset in the lov */ - rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe); + rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm); if (rc != 0) { CERROR("obd_get_info: rc = %d\n", rc); LBUG(); @@ -112,9 +128,9 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock RETURN(stripe); } -static int llu_extent_lock_callback(struct ldlm_lock *lock, - struct ldlm_lock_desc *new, void *data, - int flag) +int llu_extent_lock_cancel_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag) { struct lustre_handle lockh = { 0 }; int rc; @@ -179,7 +195,6 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp) struct inode *inode = llu_inode_from_lock(lock); struct llu_inode_info *lli; struct ost_lvb *lvb; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) }; int rc, stripe = 0; ENTRY; @@ -195,17 +210,20 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp) if (lli->lli_smd->lsm_stripe_count > 1) stripe = llu_lock_to_stripe_offset(inode, lock); - rc = lustre_pack_reply(req, 2, size, NULL); + req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(*lvb)); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) { - CERROR("lustre_pack_reply: %d\n", rc); + CERROR("failed pack reply: %d\n", rc); GOTO(iput, rc); } - lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb)); + lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms; - LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64, - (long long)llu_i2stat(inode)->st_size, stripe,lvb->lvb_size); + LDLM_DEBUG(lock, "i_size: "LPU64" -> stripe number %u -> kms "LPU64, + (__u64)llu_i2stat(inode)->st_size, stripe,lvb->lvb_size); iput: I_RELE(inode); out: @@ -218,22 +236,27 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp) return rc; } -static void llu_merge_lvb(struct inode *inode) +static int llu_merge_lvb(struct inode *inode) { struct llu_inode_info *lli = llu_i2info(inode); struct llu_sb_info *sbi = llu_i2sbi(inode); struct intnl_stat *st = llu_i2stat(inode); struct ost_lvb lvb; + int rc; ENTRY; inode_init_lvb(inode, &lvb); - obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0); + rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0); st->st_size = lvb.lvb_size; st->st_blocks = lvb.lvb_blocks; + /* handle st_blocks overflow gracefully */ + if (st->st_blocks < lvb.lvb_blocks) + st->st_blocks = ~0UL; st->st_mtime = lvb.lvb_mtime; st->st_atime = lvb.lvb_atime; st->st_ctime = lvb.lvb_ctime; - EXIT; + + RETURN(rc); } int llu_local_size(struct inode *inode) @@ -250,15 +273,15 @@ int llu_local_size(struct inode *inode) RETURN(0); rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, - &policy, LCK_PR | LCK_PW, &flags, inode, &lockh); + &policy, LCK_PR, &flags, inode, &lockh); if (rc < 0) RETURN(rc); else if (rc == 0) RETURN(-ENODATA); - llu_merge_lvb(inode); + rc = llu_merge_lvb(inode); obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh); - RETURN(0); + RETURN(rc); } /* NB: lov_merge_size will prefer locally cached writes if they extend the @@ -269,7 +292,7 @@ int llu_glimpse_size(struct inode *inode) struct intnl_stat *st = llu_i2stat(inode); struct llu_sb_info *sbi = llu_i2sbi(inode); struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; @@ -278,18 +301,17 @@ int llu_glimpse_size(struct inode *inode) if (lli->lli_flags & LLIF_MDS_SIZE_LOCK) RETURN(0); - CDEBUG(D_DLMTRACE, "Glimpsing inode %llu\n", (long long)st->st_ino); + CDEBUG(D_DLMTRACE, "Glimpsing inode "LPU64"\n", (__u64)st->st_ino); if (!lli->lli_smd) { - CDEBUG(D_DLMTRACE, "No objects for inode %llu\n", - (long long)st->st_ino); + CDEBUG(D_DLMTRACE, "No objects for inode "LPU64"\n", + (__u64)st->st_ino); RETURN(0); } einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = LDLM_FL_HAS_INTENT; - einfo.ei_cb_bl = llu_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; einfo.ei_cbdata = inode; @@ -297,6 +319,7 @@ int llu_glimpse_size(struct inode *inode) oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lli->lli_smd; + oinfo.oi_flags = LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo); if (rc) { @@ -304,9 +327,9 @@ int llu_glimpse_size(struct inode *inode) RETURN(rc > 0 ? -EIO : rc); } - llu_merge_lvb(inode); - CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n", - (long long)st->st_size, (long long)st->st_blocks); + rc = llu_merge_lvb(inode); + CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n", + (__u64)st->st_size, (__u64)st->st_blocks); RETURN(rc); } @@ -318,7 +341,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, { struct llu_sb_info *sbi = llu_i2sbi(inode); struct intnl_stat *st = llu_i2stat(inode); - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -333,13 +356,12 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, RETURN(0); CDEBUG(D_DLMTRACE, "Locking inode %llu, start "LPU64" end "LPU64"\n", - (long long)st->st_ino, policy->l_extent.start, + (unsigned long long)st->st_ino, policy->l_extent.start, policy->l_extent.end); einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_flags = ast_flags; - einfo.ei_cb_bl = llu_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; einfo.ei_cbdata = inode; @@ -347,8 +369,9 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, oinfo.oi_policy = *policy; oinfo.oi_lockh = lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = ast_flags; - rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo); + rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL); *policy = oinfo.oi_policy; if (rc > 0) rc = -EIO; @@ -394,10 +417,17 @@ struct ll_async_page { int llap_magic; void *llap_cookie; int llap_queued; - struct page *llap_page; + cfs_page_t *llap_page; struct inode *llap_inode; }; +static inline struct ll_async_page *llap_from_cookie(void *ptr) +{ + struct ll_async_page *ap = ptr; + LASSERT(ap->llap_magic == LLAP_MAGIC); + return ap; +} + static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa) { struct ll_async_page *llap; @@ -406,7 +436,7 @@ static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa) obd_flag valid_flags; ENTRY; - llap = LLAP_FROM_COOKIE(data); + llap = llap_from_cookie(data); inode = llap->llap_inode; lsm = llu_i2info(inode)->lli_smd; @@ -428,7 +458,7 @@ static void llu_ap_update_obdo(void *data, int cmd, struct obdo *oa, struct ll_async_page *llap; ENTRY; - llap = LLAP_FROM_COOKIE(data); + llap = llap_from_cookie(data); obdo_from_inode(oa, llap->llap_inode, valid); EXIT; @@ -438,10 +468,10 @@ static void llu_ap_update_obdo(void *data, int cmd, struct obdo *oa, static int llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc) { struct ll_async_page *llap; - struct page *page; + cfs_page_t *page; ENTRY; - llap = LLAP_FROM_COOKIE(data); + llap = llap_from_cookie(data); llap->llap_queued = 0; page = llap->llap_page; @@ -453,12 +483,18 @@ static int llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc) RETURN(0); } +static struct obd_capa * llu_ap_lookup_capa(void *data, int cmd) +{ + return NULL; +} + static struct obd_async_page_ops llu_async_page_ops = { .ap_make_ready = NULL, .ap_refresh_count = NULL, .ap_fill_obdo = llu_ap_fill_obdo, .ap_update_obdo = llu_ap_update_obdo, .ap_completion = llu_ap_completion, + .ap_lookup_capa = llu_ap_lookup_capa, }; static int llu_queue_pio(int cmd, struct llu_io_group *group, @@ -468,7 +504,7 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group, struct intnl_stat *st = llu_i2stat(group->lig_inode); struct lov_stripe_md *lsm = lli->lli_smd; struct obd_export *exp = llu_i2obdexp(group->lig_inode); - struct page *pages = &group->lig_pages[group->lig_npages],*page = pages; + cfs_page_t *pages = &group->lig_pages[group->lig_npages],*page = pages; struct ll_async_page *llap = &group->lig_llaps[group->lig_npages]; void *llap_cookie = group->lig_llap_cookies + llap_cookie_size * group->lig_npages; @@ -524,12 +560,15 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group, rc = obd_prep_async_page(exp, lsm, NULL, page, (obd_off)page->index << CFS_PAGE_SHIFT, &llu_async_page_ops, - llap, &llap->llap_cookie); + llap, &llap->llap_cookie, + 1 /* no cache in liblustre at all */, + NULL); if (rc) { LASSERT(rc < 0); llap->llap_cookie = NULL; RETURN(rc); } + CDEBUG(D_CACHE, "llap %p page %p group %p obj off "LPU64"\n", llap, page, llap->llap_cookie, (obd_off)pages->index << CFS_PAGE_SHIFT); @@ -595,7 +634,8 @@ struct llu_io_group * get_io_group(struct inode *inode, int maxpages, if (!llap_cookie_size) llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode), NULL, NULL, NULL, 0, - NULL, NULL, NULL); + NULL, NULL, NULL, 0, + NULL); OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages)); if (!group) @@ -606,7 +646,7 @@ struct llu_io_group * get_io_group(struct inode *inode, int maxpages, group->lig_maxpages = maxpages; group->lig_params = params; group->lig_llaps = (struct ll_async_page *)(group + 1); - group->lig_pages = (struct page *)(&group->lig_llaps[maxpages]); + group->lig_pages = (cfs_page_t *)(&group->lig_llaps[maxpages]); group->lig_llap_cookies = (void *)(&group->lig_pages[maxpages]); rc = oig_init(&group->lig_oig); @@ -774,7 +814,7 @@ ssize_t llu_file_prwv(const struct iovec *iovec, int iovlen, err = oig_wait(iogroup->lig_oig); if (err) { - CERROR("sync error %d, data corruption possible\n", err); + CERROR("%s error: %s\n", is_read ? "read" : "write", strerror(-err)); GOTO(err_unlock, err); }