])
])
+# 2.6.27 exported add_to_page_cache_lru.
+# Defines HAVE_ADD_TO_PAGE_CACHE_LRU when the kernel exports
+# add_to_page_cache_lru() from mm/filemap.c, so the client can add
+# pages to the page cache and the LRU in one call.
+AC_DEFUN([LC_EXPORT_ADD_TO_PAGE_CACHE_LRU],
+[LB_CHECK_SYMBOL_EXPORT([add_to_page_cache_lru],
+[mm/filemap.c],[
+ AC_DEFINE(HAVE_ADD_TO_PAGE_CACHE_LRU, 1,
+ [add_to_page_cache_lru functions are present])
+],[
+])
+])
+
# 2.6.31
# 2.6.31 replaces blk_queue_hardsect_size by blk_queue_logical_block_size function
LC_VFS_SYMLINK_5ARGS
LC_SB_ANY_QUOTA_ACTIVE
LC_SB_HAS_QUOTA_ACTIVE
+ LC_EXPORT_ADD_TO_PAGE_CACHE_LRU
# 2.6.31
LC_BLK_QUEUE_LOG_BLK_SIZE
])
])
+# 2.6.29 split file and anonymous page queues
+# Defines HAVE_PAGEVEC_LRU_ADD_FILE when the kernel provides
+# pagevec_lru_add_file(), i.e. file-backed pages have their own
+# LRU-add path separate from anonymous pages.
+AC_DEFUN([LC_PAGEVEC_LRU_ADD_FILE],
+[AC_MSG_CHECKING([if kernel has .pagevec_lru_add_file])
+LB_LINUX_TRY_COMPILE([
+ #include <linux/mm.h>
+ #include <linux/pagevec.h>
+],[
+ struct pagevec lru_pagevec;
+
+ pagevec_init(&lru_pagevec, 0);
+ pagevec_lru_add_file(&lru_pagevec);
+],[
+ AC_MSG_RESULT([yes])
+ AC_DEFINE(HAVE_PAGEVEC_LRU_ADD_FILE, 1,
+ [kernel has .pagevec_lru_add_file])
+],[
+ AC_MSG_RESULT([no])
+])
+])
+
#
# --enable-mpitest
#
LC_D_OBTAIN_ALIAS
LC_BLKDEV_PUT_2ARGS
LC_DENTRY_OPEN_4ARGS
+ LC_PAGEVEC_LRU_ADD_FILE
])
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
OBD_CONNECT_MDS_CAPA |
OBD_CONNECT_OSS_CAPA |
OBD_CONNECT_IBITS |
+ OBD_CONNECT_BRW_SIZE |
OBD_CONNECT_MDS_MDS |
OBD_CONNECT_FID |
OBD_CONNECT_AT |
OBD_CONNECT_FULL20 |
OBD_CONNECT_64BITHASH;
+ ocd->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
rc = obd_connect(env, &desc->cl_exp, mdc, &mdc->obd_uuid, ocd, NULL);
OBD_FREE_PTR(ocd);
if (rc) {
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
#define bio_hw_segments(q, bio) 0
#endif
+#ifndef HAVE_PAGEVEC_LRU_ADD_FILE
+#define pagevec_lru_add_file pagevec_lru_add
+#endif
+
+/* 2.6.27+ add_to_page_cache_lru() handles LRU insertion itself, so the
+ * ll_pagevec_* helpers become no-ops there.  On older kernels the caller
+ * must batch pages onto the LRU through a pagevec. */
+#ifdef HAVE_ADD_TO_PAGE_CACHE_LRU
+#define ll_add_to_page_cache_lru(pg, mapping, off, gfp) \
+ add_to_page_cache_lru(pg, mapping, off, gfp)
+#define ll_pagevec_init(pv, cold) do {} while (0)
+#define ll_pagevec_add(pv, pg) (0)
+#define ll_pagevec_lru_add_file(pv) do {} while (0)
+#else
+#define ll_add_to_page_cache_lru(pg, mapping, off, gfp) \
+ add_to_page_cache(pg, mapping, off, gfp)
+/* use the macro argument, not a hard-coded caller variable name, and no
+ * trailing semicolon so callers can write a normal statement */
+#define ll_pagevec_init(pv, cold) pagevec_init(pv, cold)
+#define ll_pagevec_add(pv, pg) pagevec_add(pv, pg)
+#define ll_pagevec_lru_add_file(pv) pagevec_lru_add_file(pv)
+#endif
+
#endif /* __KERNEL__ */
#endif /* _COMPAT25_H */
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
unsigned long count, void *data);
int lprocfs_obd_rd_mntdev(char *page, char **start, off_t off,
int count, int *eof, void *data);
+int lprocfs_obd_rd_max_pages_per_rpc(char *page, char **start, off_t off,
+ int count, int *eof, void *data);
+int lprocfs_obd_wr_max_pages_per_rpc(struct file *file, const char *buffer,
+ unsigned long count, void *data);
/* all quota proc functions */
extern int lprocfs_quota_rd_bunit(char *page, char **start,
off_t off, int count,
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
};
enum lu_dirpage_flags {
- LDF_EMPTY = 1 << 0
+ /**
+ * dirpage contains no entry.
+ */
+ LDF_EMPTY = 1 << 0,
+ /**
+ * last entry's lde_hash equals ldp_hash_end.
+ */
+ LDF_COLLIDE = 1 << 1
};
static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
#define MDS_DIR_END_OFF 0xfffffffffffffffeULL
+/**
+ * MDS_READPAGE page size
+ *
+ * This is the directory page size packed in the MDS_READPAGE RPC.
+ * It's different than CFS_PAGE_SIZE because the client needs to
+ * access the struct lu_dirpage header packed at the beginning of
+ * the "page", and without this there isn't any way to find the
+ * lu_dirpage header if client and server CFS_PAGE_SIZE differ.
+ */
+#define LU_PAGE_SHIFT 12
+#define LU_PAGE_SIZE (1UL << LU_PAGE_SHIFT)
+#define LU_PAGE_MASK (~(LU_PAGE_SIZE - 1))
+
+/* number of LU_PAGE_SIZE sub-pages per client CFS_PAGE_SIZE page;
+ * parenthesized so it is safe inside larger expressions */
+#define LU_PAGE_COUNT (1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT))
+
/** @} lu_dir */
struct lustre_handle {
OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
OBD_CONNECT_RMT_CLIENT | \
OBD_CONNECT_RMT_CLIENT_FORCE | \
- OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
- OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | \
- LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR | \
- OBD_CONNECT_LOV_V3 | OBD_CONNECT_SOM | \
- OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH)
+ OBD_CONNECT_BRW_SIZE | OBD_CONNECT_MDS_CAPA | \
+ OBD_CONNECT_OSS_CAPA | OBD_CONNECT_MDS_MDS | \
+ OBD_CONNECT_FID | LRU_RESIZE_CONNECT_FLAG | \
+ OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3 | \
+ OBD_CONNECT_SOM | OBD_CONNECT_FULL20 | \
+ OBD_CONNECT_64BITHASH)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
#define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */
+struct l_wait_info;
+
void target_cancel_recovery_timer(struct obd_device *obd);
void target_stop_recovery_thread(struct obd_device *obd);
void target_cleanup_recovery(struct obd_device *obd);
int target_queue_recovery_request(struct ptlrpc_request *req,
struct obd_device *obd);
void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id);
+int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
+ struct l_wait_info *lwi);
/* client.c */
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
int (*moo_xattr_del)(const struct lu_env *env, struct md_object *obj,
const char *name);
+ /** \retval number of bytes actually read upon success */
int (*moo_readpage)(const struct lu_env *env, struct md_object *obj,
const struct lu_rdpg *rdpg);
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
int (*m_sync)(struct obd_export *, const struct lu_fid *,
struct obd_capa *, struct ptlrpc_request **);
int (*m_readpage)(struct obd_export *, const struct lu_fid *,
- struct obd_capa *, __u64, struct page *,
- struct ptlrpc_request **);
+ struct obd_capa *, __u64, struct page **,
+ unsigned, struct ptlrpc_request **);
int (*m_unlink)(struct obd_export *, struct md_op_data *,
struct ptlrpc_request **);
extern void obdo_from_inode(struct obdo *dst, struct inode *src,
struct lu_fid *parent, obd_flag valid);
+/* Return 1 if the client should resend the request.
+ * cl_resends == 0 means "resend without limit"; otherwise allow at most
+ * cl_resends attempts (resend is the number of attempts so far). */
+static inline int client_should_resend(int resend, struct client_obd *cli)
+{
+ return cfs_atomic_read(&cli->cl_resends) ?
+ cfs_atomic_read(&cli->cl_resends) > resend : 1;
+}
+
#endif /* __OBD_H */
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
static inline int md_readpage(struct obd_export *exp, const struct lu_fid *fid,
struct obd_capa *oc, __u64 offset,
- struct page *page,
+ struct page **pages, unsigned npages,
struct ptlrpc_request **request)
{
int rc;
ENTRY;
EXP_CHECK_MD_OP(exp, readpage);
EXP_MD_COUNTER_INCREMENT(exp, readpage);
- rc = MDP(exp->exp_obd, readpage)(exp, fid, oc, offset, page, request);
+ rc = MDP(exp->exp_obd, readpage)(exp, fid, oc, offset, pages, npages,
+ request);
RETURN(rc);
}
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
cfs_spin_unlock(&exp->exp_locks_list_guard);
}
#endif
+
+/* Timeout callback for the bulk waits in target_bulk_io(); deliberately
+ * does not evict the export (see comment below). */
+static int target_bulk_timeout(void *data)
+{
+ ENTRY;
+ /* We don't fail the connection here, because having the export
+ * killed makes the (vital) call to commitrw very sad.
+ */
+ RETURN(1);
+}
+
+/* Human-readable direction of a bulk descriptor, for log messages. */
+static inline char *bulk2type(struct ptlrpc_bulk_desc *desc)
+{
+ return desc->bd_type == BULK_GET_SINK ? "GET" : "PUT";
+}
+
+
+/* Drive a server-side bulk transfer for \a desc and wait for it to finish.
+ *
+ * \param exp  export of the client the bulk is destined for
+ * \param desc bulk descriptor, already populated with pages
+ * \param lwi  caller-provided wait-info storage, reused for each wait
+ *
+ * \retval 0          bulk completed and (for GET) was unwrapped successfully
+ * \retval -ENOTCONN  client was evicted before or during the transfer
+ * \retval -ETIMEDOUT transfer timed out, was truncated, hit a network
+ *                    error, or the client reconnected mid-transfer
+ */
+int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
+ struct l_wait_info *lwi)
+{
+ struct ptlrpc_request *req = desc->bd_req;
+ int rc = 0;
+ ENTRY;
+
+ /* Check if there is eviction in progress, and if so, wait for
+ * it to finish */
+ if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+ *lwi = LWI_INTR(NULL, NULL);
+ rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+ !cfs_atomic_read(&exp->exp_obd->
+ obd_evict_inprogress),
+ lwi);
+ }
+
+ /* Check if client was evicted or tried to reconnect already */
+ if (exp->exp_failed || exp->exp_abort_active_req) {
+ rc = -ENOTCONN;
+ } else {
+ /* only PUT (server->client) data needs wrapping on send */
+ if (desc->bd_type == BULK_PUT_SINK)
+ rc = sptlrpc_svc_wrap_bulk(req, desc);
+ if (rc == 0)
+ rc = ptlrpc_start_bulk_transfer(desc);
+ }
+
+ if (rc == 0 && OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+ ptlrpc_abort_bulk(desc);
+ } else if (rc == 0) {
+ time_t start = cfs_time_current_sec();
+ /* wait in chunks; the deadline may be extended while we sleep,
+ * in which case the outer loop waits again */
+ do {
+ long timeoutl = req->rq_deadline - cfs_time_current_sec();
+ cfs_duration_t timeout = timeoutl <= 0 ?
+ CFS_TICK : cfs_time_seconds(timeoutl);
+ *lwi = LWI_TIMEOUT_INTERVAL(timeout,
+ cfs_time_seconds(1),
+ target_bulk_timeout,
+ desc);
+ rc = l_wait_event(desc->bd_waitq,
+ !ptlrpc_server_bulk_active(desc) ||
+ exp->exp_failed ||
+ exp->exp_abort_active_req,
+ lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ /* Wait again if we changed deadline */
+ } while ((rc == -ETIMEDOUT) &&
+ (req->rq_deadline > cfs_time_current_sec()));
+
+ if (rc == -ETIMEDOUT) {
+ DEBUG_REQ(D_ERROR, req,
+ "timeout on bulk %s after %ld%+lds",
+ bulk2type(desc),
+ req->rq_deadline - start,
+ cfs_time_current_sec() -
+ req->rq_deadline);
+ ptlrpc_abort_bulk(desc);
+ } else if (exp->exp_failed) {
+ DEBUG_REQ(D_ERROR, req, "Eviction on bulk %s",
+ bulk2type(desc));
+ rc = -ENOTCONN;
+ ptlrpc_abort_bulk(desc);
+ } else if (exp->exp_abort_active_req) {
+ DEBUG_REQ(D_ERROR, req, "Reconnect on bulk %s",
+ bulk2type(desc));
+ /* we don't reply anyway */
+ rc = -ETIMEDOUT;
+ ptlrpc_abort_bulk(desc);
+ } else if (!desc->bd_success ||
+ desc->bd_nob_transferred != desc->bd_nob) {
+ DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)",
+ desc->bd_success ?
+ "truncated" : "network error on",
+ bulk2type(desc),
+ desc->bd_nob_transferred,
+ desc->bd_nob);
+ /* XXX should this be a different errno? */
+ rc = -ETIMEDOUT;
+ } else if (desc->bd_type == BULK_GET_SINK) {
+ /* GET (client->server) data is unwrapped on receive */
+ rc = sptlrpc_svc_unwrap_bulk(req, desc);
+ }
+ } else {
+ DEBUG_REQ(D_ERROR, req, "bulk %s failed: rc %d",
+ bulk2type(desc), rc);
+ }
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(target_bulk_io);
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
offset = (__u64)hash_x_index(page->index, 0);
rc = md_readpage(sbi->ll_md_exp, &lli->lli_fid, NULL,
- offset, page, &request);
+ offset, &page, 1, &request);
if (!rc) {
body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
LASSERT(body != NULL); /* checked by md_readpage() */
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
#include <linux/smp_lock.h>
#include <asm/uaccess.h>
#include <linux/buffer_head.h> // for wait_on_buffer
+#include <linux/pagevec.h>
#define DEBUG_SUBSYSTEM S_LLITE
*
* page format
*
- *
- *
- *
+ * Pages in the MDS_READPAGE RPC are packed in LU_PAGE_SIZE units, and each
+ * unit contains a lu_dirpage header which describes the start/end hash, and
+ * whether this page is empty (contains no dir entry) or whether its hash
+ * collides with the next page.  After the client receives the reply, several
+ * units will be integrated into one dir page of CFS_PAGE_SIZE (if
+ * CFS_PAGE_SIZE is greater than LU_PAGE_SIZE), and the lu_dirpage header for
+ * this integrated page will be adjusted.
*
*/
/* returns the page unlocked, but with a reference */
-static int ll_dir_readpage(struct file *file, struct page *page)
+static int ll_dir_readpage(struct file *file, struct page *page0)
{
- struct inode *inode = page->mapping->host;
+ struct inode *inode = page0->mapping->host;
+ int hash64 = ll_i2sbi(inode)->ll_flags & LL_SBI_64BIT_HASH;
+ struct obd_export *exp = ll_i2sbi(inode)->ll_md_exp;
struct ptlrpc_request *request;
struct mdt_body *body;
struct obd_capa *oc;
__u64 hash;
+ struct page **page_pool;
+ struct page *page;
+#ifndef HAVE_ADD_TO_PAGE_CACHE_LRU
+ struct pagevec lru_pvec;
+#endif
+ struct lu_dirpage *dp;
+ int max_pages = ll_i2sbi(inode)->ll_md_brw_size >> CFS_PAGE_SHIFT;
+ int nrdpgs = 0; /* number of pages read actually */
+ int npages;
+ int i;
int rc;
ENTRY;
hash = lli->lli_sa_pos;
cfs_spin_unlock(&lli->lli_sa_lock);
}
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
- inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) hash "LPU64"\n",
+ inode->i_ino, inode->i_generation, inode, hash);
+
+ LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
+
+ OBD_ALLOC(page_pool, sizeof(page) * max_pages);
+ if (page_pool != NULL) {
+ page_pool[0] = page0;
+ } else {
+ page_pool = &page0;
+ max_pages = 1;
+ }
+ for (npages = 1; npages < max_pages; npages++) {
+ page = page_cache_alloc_cold(inode->i_mapping);
+ if (!page)
+ break;
+ page_pool[npages] = page;
+ }
oc = ll_mdscapa_get(inode);
- rc = md_readpage(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode),
- oc, hash, page, &request);
+ rc = md_readpage(exp, ll_inode2fid(inode), oc, hash, page_pool, npages,
+ &request);
capa_put(oc);
- if (!rc) {
+ if (rc == 0) {
body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
/* Checked by mdc_readpage() */
LASSERT(body != NULL);
if (body->valid & OBD_MD_FLSIZE)
cl_isize_write(inode, body->size);
- SetPageUptodate(page);
+
+ nrdpgs = (request->rq_bulk->bd_nob_transferred+CFS_PAGE_SIZE-1)
+ >> CFS_PAGE_SHIFT;
+ SetPageUptodate(page0);
}
+ unlock_page(page0);
ptlrpc_req_finished(request);
- unlock_page(page);
+ CDEBUG(D_VFSTRACE, "read %d/%d pages\n", nrdpgs, npages);
+
+ ll_pagevec_init(&lru_pvec, 0);
+ for (i = 1; i < npages; i++) {
+ unsigned long offset;
+ int ret;
+
+ page = page_pool[i];
+
+ if (rc < 0 || i >= nrdpgs) {
+ page_cache_release(page);
+ continue;
+ }
+
+ SetPageUptodate(page);
+
+ dp = cfs_kmap(page);
+ hash = le64_to_cpu(dp->ldp_hash_start);
+ cfs_kunmap(page);
+
+ offset = hash_x_index(hash, hash64);
+
+ prefetchw(&page->flags);
+ ret = ll_add_to_page_cache_lru(page, inode->i_mapping, offset,
+ GFP_KERNEL);
+ if (ret == 0) {
+ unlock_page(page);
+ page_cache_get(page);
+ if (ll_pagevec_add(&lru_pvec, page) == 0)
+ ll_pagevec_lru_add_file(&lru_pvec);
+ } else {
+ CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:"
+ " %d\n", offset, ret);
+ }
+ page_cache_release(page);
+ }
+ ll_pagevec_lru_add_file(&lru_pvec);
+
+ if (page_pool != &page0)
+ OBD_FREE(page_pool, sizeof(struct page *) * max_pages);
EXIT;
return rc;
}
*/
wait_on_page(page);
if (PageUptodate(page)) {
- dp = kmap(page);
+ dp = cfs_kmap(page);
if (BITS_PER_LONG == 32 && hash64) {
*start = le64_to_cpu(dp->ldp_hash_start) >> 32;
*end = le64_to_cpu(dp->ldp_hash_end) >> 32;
}
LASSERTF(*start <= *hash, "start = "LPX64",end = "
LPX64",hash = "LPX64"\n", *start, *end, *hash);
+ CDEBUG(D_VFSTRACE, "page %lu [%llu %llu], hash "LPU64"\n",
+ offset, *start, *end, *hash);
if (*hash > *end || (*end != *start && *hash == *end)) {
- ll_release_page(page, *hash, *start, *end);
+ /*
+ * upon hash collision, remove this page,
+ * otherwise put page reference, and
+ * ll_get_dir_page() will issue RPC to fetch
+ * the page we want.
+ */
+ if (dp->ldp_flags & cpu_to_le32(LDF_COLLIDE)) {
+ ll_release_page(page, *hash, *start, *end);
+ } else {
+ cfs_kunmap(page);
+ page_cache_release(page);
+ }
page = NULL;
}
} else {
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
/* =0 - hold lock over whole read/write
* >0 - max. chunk to be read/written w/o lock re-acquiring */
unsigned long ll_max_rw_chunk;
+ unsigned int ll_md_brw_size; /* used by readdir */
struct lu_site *ll_site;
struct cl_device *ll_cl;
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
/* indicate the features supported by this client */
data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH |
OBD_CONNECT_JOIN | OBD_CONNECT_ATTRFID |
- OBD_CONNECT_VERSION | OBD_CONNECT_MDS_CAPA |
- OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
- OBD_CONNECT_FID | OBD_CONNECT_AT |
- OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT |
- OBD_CONNECT_VBR | OBD_CONNECT_FULL20 |
- OBD_CONNECT_64BITHASH;
+ OBD_CONNECT_VERSION | OBD_CONNECT_BRW_SIZE |
+ OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
+ OBD_CONNECT_CANCELSET| OBD_CONNECT_FID |
+ OBD_CONNECT_AT | OBD_CONNECT_LOV_V3 |
+ OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR |
+ OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH;
if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
data->ocd_connect_flags |= OBD_CONNECT_SOM;
if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT_FORCE;
+ data->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
+
err = obd_connect(NULL, &sbi->ll_md_exp, obd, &sbi->ll_sb_uuid, data, NULL);
if (err == -EBUSY) {
LCONSOLE_ERROR_MSG(0x14f, "An MDT (md %s) is performing "
if (data->ocd_connect_flags & OBD_CONNECT_64BITHASH)
sbi->ll_flags |= LL_SBI_64BIT_HASH;
+ if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
+ sbi->ll_md_brw_size = data->ocd_brw_size;
+ else
+ sbi->ll_md_brw_size = CFS_PAGE_SIZE;
+
obd = class_name2obd(dt);
if (!obd) {
CERROR("DT %s: not setup or attached\n", dt);
obd->obd_upcall.onu_owner = &sbi->ll_lco;
obd->obd_upcall.onu_upcall = cl_ocd_update;
- data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
+
+ data->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
err = obd_connect(NULL, &sbi->ll_dt_exp, obd, &sbi->ll_sb_uuid, data, NULL);
if (err == -EBUSY) {
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
}
static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
- struct obd_capa *oc, __u64 offset64, struct page *page,
+ struct obd_capa *oc, __u64 offset64,
+ struct page **pages, unsigned npages,
struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
int tgt0_idx = 0;
int rc;
int nr = 0;
+ int i;
+ /* number of pages read, in CFS_PAGE_SIZE */
+ int nrdpgs;
+ /* number of pages transferred in LU_PAGE_SIZE */
+ int nlupgs;
struct lmv_stripe *los;
struct lmv_tgt_desc *tgt;
struct lu_dirpage *dp;
if (IS_ERR(tgt))
GOTO(cleanup, rc = PTR_ERR(tgt));
- rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request);
+ rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, pages, npages,
+ request);
if (rc)
GOTO(cleanup, rc);
- if (obj) {
- dp = cfs_kmap(page);
- lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
- lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
- LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
+ nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+ >> CFS_PAGE_SHIFT;
+ nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
+ LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
+ LASSERT(nrdpgs > 0 && nrdpgs <= npages);
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent))
- lmv_hash_adjust(&ent->lde_hash, hash_adj);
+ CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, npages);
- if (tgt0_idx != nr - 1) {
- __u64 end;
+ for (i = 0; i < nrdpgs; i++) {
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+ struct lu_dirpage *first;
+ __u64 hash_end = 0;
+ __u32 flags = 0;
+#endif
+ struct lu_dirent *tmp = NULL;
+
+ dp = cfs_kmap(pages[i]);
+ if (obj) {
+ lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
+ lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
+ LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
- end = le64_to_cpu(dp->ldp_hash_end);
- if (end == MDS_DIR_END_OFF) {
+ if ((tgt0_idx != nr - 1) &&
+ (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF))
+ {
dp->ldp_hash_end = cpu_to_le32(seg_size *
(tgt0_idx + 1));
CDEBUG(D_INODE,
""DFID" reset end "LPX64" tgt %d\n",
PFID(&rid),
- (__u64)le64_to_cpu(dp->ldp_hash_end), tgt_idx);
+ (__u64)le64_to_cpu(dp->ldp_hash_end),
+ tgt_idx);
}
}
- cfs_kunmap(page);
+
+ ent = lu_dirent_start(dp);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+ first = dp;
+ hash_end = dp->ldp_hash_end;
+repeat:
+#endif
+ nlupgs--;
+ for (tmp = ent; ent != NULL;
+ tmp = ent, ent = lu_dirent_next(ent)) {
+ if (obj)
+ lmv_hash_adjust(&ent->lde_hash, hash_adj);
+ }
+
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+ if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
+ ent = lu_dirent_start(dp);
+
+ if (obj) {
+ lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
+ if ((tgt0_idx != nr - 1) &&
+ (le64_to_cpu(dp->ldp_hash_end) ==
+ MDS_DIR_END_OFF)) {
+ hash_end = cpu_to_le32(seg_size *
+ (tgt0_idx + 1));
+ CDEBUG(D_INODE,
+ ""DFID" reset end "LPX64" tgt %d\n",
+ PFID(&rid),
+ (__u64)le64_to_cpu(hash_end),
+ tgt_idx);
+ }
+ }
+ hash_end = dp->ldp_hash_end;
+ flags = dp->ldp_flags;
+
+ if (tmp) {
+ /* enlarge the end entry lde_reclen from 0 to
+ * first entry of next lu_dirpage, in this way
+ * several lu_dirpages can be stored into one
+ * client page on client. */
+ tmp = ((void *)tmp) +
+ le16_to_cpu(tmp->lde_reclen);
+ tmp->lde_reclen =
+ cpu_to_le16((char *)(dp->ldp_entries) -
+ (char *)tmp);
+ goto repeat;
+ }
+ }
+ first->ldp_hash_end = hash_end;
+ first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+ first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+#endif
+ cfs_kunmap(pages[i]);
}
EXIT;
cleanup:
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
/*{ "filegroups", lprocfs_rd_filegroups, 0, 0 },*/
{ "mds_server_uuid", lprocfs_rd_server_uuid, 0, 0 },
{ "mds_conn_uuid", lprocfs_rd_conn_uuid, 0, 0 },
+ /*
+ * FIXME: the proc entry below is provided but not used; instead
+ * sbi->ll_md_brw_size is used. The per-obd variable should be used
+ * when CMD is enabled and dir pages are managed in the MDC layer.
+ * Remember to enable the proc write function at that point.
+ */
+ { "max_pages_per_rpc", lprocfs_obd_rd_max_pages_per_rpc,
+ /* lprocfs_obd_wr_max_pages_per_rpc */0, 0 },
{ "max_rpcs_in_flight", mdc_rd_max_rpcs_in_flight,
mdc_wr_max_rpcs_in_flight, 0 },
{ "timeouts", lprocfs_rd_timeouts, 0, 0 },
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
#endif
int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
- struct obd_capa *oc, __u64 offset, struct page *page,
- struct ptlrpc_request **request)
+ struct obd_capa *oc, __u64 offset, struct page **pages,
+ unsigned npages, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
+ int i;
+ cfs_waitq_t waitq;
+ int resends = 0;
+ struct l_wait_info lwi;
int rc;
ENTRY;
*request = NULL;
+ cfs_waitq_init(&waitq);
+
+restart_bulk:
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
if (req == NULL)
RETURN(-ENOMEM);
req->rq_request_portal = MDS_READPAGE_PORTAL;
ptlrpc_at_set_req_timeout(req);
- desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
+ MDS_BULK_PORTAL);
if (desc == NULL) {
ptlrpc_request_free(req);
RETURN(-ENOMEM);
}
/* NB req now owns desc and will free it when it gets freed */
- ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
- mdc_readdir_pack(req, offset, CFS_PAGE_SIZE, fid, oc);
+ for (i = 0; i < npages; i++)
+ ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
+
+ mdc_readdir_pack(req, offset, CFS_PAGE_SIZE * npages, fid, oc);
ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
if (rc) {
ptlrpc_req_finished(req);
- RETURN(rc);
+ if (rc != -ETIMEDOUT)
+ RETURN(rc);
+
+ resends++;
+ if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
+ CERROR("too many resend retries, returning error\n");
+ RETURN(-EIO);
+ }
+ lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+ l_wait_event(waitq, 0, &lwi);
+
+ goto restart_bulk;
}
rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
RETURN(rc);
}
- if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) {
+ if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
- req->rq_bulk->bd_nob_transferred, CFS_PAGE_SIZE);
+ req->rq_bulk->bd_nob_transferred,
+ CFS_PAGE_SIZE * npages);
ptlrpc_req_finished(req);
RETURN(-EPROTO);
}
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
}
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
- int first, void *area, int nob,
+ struct lu_dirpage *dp, int nob,
const struct dt_it_ops *iops, struct dt_it *it,
- __u64 *start, __u64 *end,
- struct lu_dirent **last, __u32 attr)
+ __u32 attr)
{
+ void *area = dp;
int result;
__u64 hash = 0;
struct lu_dirent *ent;
+ struct lu_dirent *last = NULL;
+ int first = 1;
- if (first) {
- memset(area, 0, sizeof (struct lu_dirpage));
- area += sizeof (struct lu_dirpage);
- nob -= sizeof (struct lu_dirpage);
- }
+ memset(area, 0, sizeof (*dp));
+ area += sizeof (*dp);
+ nob -= sizeof (*dp);
ent = area;
do {
hash = iops->store(env, it);
if (unlikely(first)) {
first = 0;
- *start = hash;
+ dp->ldp_hash_start = cpu_to_le64(hash);
}
/* calculate max space required for lu_dirent */
* so recheck rec length */
recsize = le16_to_cpu(ent->lde_reclen);
} else {
- /*
- * record doesn't fit into page, enlarge previous one.
- */
- if (*last) {
- (*last)->lde_reclen =
- cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
- nob);
- result = 0;
- } else
- result = -EINVAL;
-
+ result = (last != NULL) ? 0 :-EINVAL;
goto out;
}
- *last = ent;
+ last = ent;
ent = (void *)ent + recsize;
nob -= recsize;
} while (result == 0);
out:
- *end = hash;
+ dp->ldp_hash_end = cpu_to_le64(hash);
+ if (last != NULL) {
+ if (last->lde_hash == dp->ldp_hash_end)
+ dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+ last->lde_reclen = 0; /* end mark */
+ }
return result;
}
struct dt_object *next = mdd_object_child(obj);
const struct dt_it_ops *iops;
struct page *pg;
- struct lu_dirent *last = NULL;
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
int i;
+ int nlupgs = 0;
int rc;
int nob;
- __u64 hash_start;
- __u64 hash_end = 0;
LASSERT(rdpg->rp_pages != NULL);
LASSERT(next->do_index_ops != NULL);
rc = iops->load(env, it, rdpg->rp_hash);
- if (rc == 0){
+ if (rc == 0) {
/*
* Iterator didn't find record with exactly the key requested.
*
*/
for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
i++, nob -= CFS_PAGE_SIZE) {
+ struct lu_dirpage *dp;
+
LASSERT(i < rdpg->rp_npages);
pg = rdpg->rp_pages[i];
- rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
- min_t(int, nob, CFS_PAGE_SIZE), iops,
- it, &hash_start, &hash_end, &last,
- rdpg->rp_attrs);
- if (rc != 0 || i == rdpg->rp_npages - 1) {
- if (last)
- last->lde_reclen = 0;
+ dp = cfs_kmap(pg);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+repeat:
+#endif
+ rc = mdd_dir_page_build(env, mdd, dp,
+ min_t(int, nob, LU_PAGE_SIZE),
+ iops, it, rdpg->rp_attrs);
+ if (rc > 0) {
+ /*
+ * end of directory.
+ */
+ dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+ nlupgs++;
+ } else if (rc < 0) {
+ CWARN("build page failed: %d!\n", rc);
+ } else {
+ nlupgs++;
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+ if ((unsigned long)dp & ~CFS_PAGE_MASK)
+ goto repeat;
+#endif
}
cfs_kunmap(pg);
}
- if (rc > 0) {
- /*
- * end of directory.
- */
- hash_end = MDS_DIR_END_OFF;
- rc = 0;
- }
- if (rc == 0) {
+ if (rc >= 0) {
struct lu_dirpage *dp;
dp = cfs_kmap(rdpg->rp_pages[0]);
dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
- dp->ldp_hash_end = cpu_to_le64(hash_end);
- if (i == 0)
+ if (nlupgs == 0) {
/*
- * No pages were processed, mark this.
+ * No pages were processed, mark this for first page
+ * and send back.
*/
- dp->ldp_flags |= LDF_EMPTY;
-
- dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+ dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
+ nlupgs = 1;
+ }
cfs_kunmap(rdpg->rp_pages[0]);
+
+ rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
}
iops->put(env, it);
iops->fini(env, it);
memset(dp, 0 , sizeof(struct lu_dirpage));
dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
- dp->ldp_flags |= LDF_EMPTY;
- dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+ dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
cfs_kunmap(pg);
- GOTO(out_unlock, rc = 0);
+ GOTO(out_unlock, rc = LU_PAGE_SIZE);
}
rc = __mdd_readpage(env, mdd_obj, rdpg);
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
}
static int mdt_sendpage(struct mdt_thread_info *info,
- struct lu_rdpg *rdpg)
+ struct lu_rdpg *rdpg, int nob)
{
struct ptlrpc_request *req = mdt_info_req(info);
struct obd_export *exp = req->rq_export;
struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info;
int tmpcount;
int tmpsize;
- int timeout;
int i;
int rc;
ENTRY;
if (desc == NULL)
RETURN(-ENOMEM);
- for (i = 0, tmpcount = rdpg->rp_count;
- i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
+ for (i = 0, tmpcount = nob;
+ i < rdpg->rp_npages && tmpcount > 0; i++, tmpcount -= tmpsize) {
tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
}
- LASSERT(desc->bd_nob == rdpg->rp_count);
- rc = sptlrpc_svc_wrap_bulk(req, desc);
- if (rc)
- GOTO(free_desc, rc);
-
- rc = ptlrpc_start_bulk_transfer(desc);
- if (rc)
- GOTO(free_desc, rc);
-
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
- GOTO(abort_bulk, rc = 0);
-
- do {
- timeout = (int) req->rq_deadline - cfs_time_current_sec();
- if (timeout < 0)
- CERROR("Req deadline already passed %lu (now: %lu)\n",
- req->rq_deadline, cfs_time_current_sec());
- *lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(max(timeout, 1)),
- cfs_time_seconds(1), NULL, NULL);
- rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_server_bulk_active(desc) ||
- exp->exp_failed ||
- exp->exp_abort_active_req, lwi);
- LASSERT (rc == 0 || rc == -ETIMEDOUT);
- } while ((rc == -ETIMEDOUT) &&
- (req->rq_deadline > cfs_time_current_sec()));
-
- if (rc == 0) {
- if (desc->bd_success &&
- desc->bd_nob_transferred == rdpg->rp_count)
- GOTO(free_desc, rc);
-
- rc = -ETIMEDOUT;
- if (exp->exp_abort_active_req || exp->exp_failed)
- GOTO(abort_bulk, rc);
- }
-
- DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
- (rc == -ETIMEDOUT) ? "timeout" : "network error",
- desc->bd_nob_transferred, rdpg->rp_count,
- exp->exp_client_uuid.uuid,
- exp->exp_connection->c_remote_uuid.uuid);
-
- class_fail_export(exp);
-
- EXIT;
-abort_bulk:
- ptlrpc_abort_bulk(desc);
-free_desc:
+ LASSERT(desc->bd_nob == nob);
+ rc = target_bulk_io(exp, desc, lwi);
ptlrpc_free_bulk(desc);
- return rc;
+ RETURN(rc);
}
#ifdef HAVE_SPLIT_SUPPORT
rdpg->rp_attrs = reqbody->mode;
if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
rdpg->rp_attrs |= LUDA_64BITHASH;
- rdpg->rp_count = reqbody->nlink;
- rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
+ rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
+ PTLRPC_MAX_BRW_SIZE);
+ rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
+ CFS_PAGE_SHIFT;
OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
if (rdpg->rp_pages == NULL)
RETURN(-ENOMEM);
/* call lower layers to fill allocated pages with directory data */
rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
- if (rc)
+ if (rc < 0)
GOTO(free_rdpg, rc);
/* send pages to client */
- rc = mdt_sendpage(info, rdpg);
+ rc = mdt_sendpage(info, rdpg, rc);
EXIT;
free_rdpg:
if (!mdt->mdt_som_conf)
data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
+ if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+ data->ocd_brw_size = min(data->ocd_brw_size,
+ (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
+ if (data->ocd_brw_size == 0) {
+ CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
+ " ocd_version: %x ocd_grant: %d "
+ "ocd_index: %u ocd_brw_size is "
+ "unexpectedly zero, network data "
+ "corruption? Refusing connection of this"
+ " client\n",
+ exp->exp_obd->obd_name,
+ exp->exp_client_uuid.uuid,
+ exp, data->ocd_connect_flags, data->ocd_version,
+ data->ocd_grant, data->ocd_index);
+ return -EPROTO;
+ }
+ }
+
cfs_spin_lock(&exp->exp_lock);
exp->exp_connect_flags = data->ocd_connect_flags;
cfs_spin_unlock(&exp->exp_lock);
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
}
EXPORT_SYMBOL(lprocfs_obd_rd_mntdev);
+int lprocfs_obd_rd_max_pages_per_rpc(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *dev = data;
+ struct client_obd *cli = &dev->u.cli;
+ int rc;
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ rc = snprintf(page, count, "%d\n", cli->cl_max_pages_per_rpc);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ return rc;
+}
+EXPORT_SYMBOL(lprocfs_obd_rd_max_pages_per_rpc);
+
+int lprocfs_obd_wr_max_pages_per_rpc(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *dev = data;
+ struct client_obd *cli = &dev->u.cli;
+ struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data;
+ int val, rc;
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc)
+ return rc;
+
+ LPROCFS_CLIMP_CHECK(dev);
+ if (val < 1 || val > ocd->ocd_brw_size >> CFS_PAGE_SHIFT) {
+ LPROCFS_CLIMP_EXIT(dev);
+ return -ERANGE;
+ }
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ cli->cl_max_pages_per_rpc = val;
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ LPROCFS_CLIMP_EXIT(dev);
+ return count;
+}
+EXPORT_SYMBOL(lprocfs_obd_wr_max_pages_per_rpc);
+
EXPORT_SYMBOL(lprocfs_register);
EXPORT_SYMBOL(lprocfs_srch);
EXPORT_SYMBOL(lprocfs_remove);
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
return count;
}
-static int osc_rd_max_pages_per_rpc(char *page, char **start, off_t off,
- int count, int *eof, void *data)
-{
- struct obd_device *dev = data;
- struct client_obd *cli = &dev->u.cli;
- int rc;
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = snprintf(page, count, "%d\n", cli->cl_max_pages_per_rpc);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- return rc;
-}
-
-static int osc_wr_max_pages_per_rpc(struct file *file, const char *buffer,
- unsigned long count, void *data)
-{
- struct obd_device *dev = data;
- struct client_obd *cli = &dev->u.cli;
- struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data;
- int val, rc;
-
- rc = lprocfs_write_helper(buffer, count, &val);
- if (rc)
- return rc;
-
- LPROCFS_CLIMP_CHECK(dev);
- if (val < 1 || val > ocd->ocd_brw_size >> CFS_PAGE_SHIFT) {
- LPROCFS_CLIMP_EXIT(dev);
- return -ERANGE;
- }
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cli->cl_max_pages_per_rpc = val;
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- LPROCFS_CLIMP_EXIT(dev);
- return count;
-}
-
static int osc_rd_max_rpcs_in_flight(char *page, char **start, off_t off,
int count, int *eof, void *data)
{
{ "ost_conn_uuid", lprocfs_rd_conn_uuid, 0, 0 },
{ "active", osc_rd_active,
osc_wr_active, 0 },
- { "max_pages_per_rpc", osc_rd_max_pages_per_rpc,
- osc_wr_max_pages_per_rpc, 0 },
+ { "max_pages_per_rpc", lprocfs_obd_rd_max_pages_per_rpc,
+ lprocfs_obd_wr_max_pages_per_rpc, 0 },
{ "max_rpcs_in_flight", osc_rd_max_rpcs_in_flight,
osc_wr_max_rpcs_in_flight, 0 },
{ "destroys_in_flight", osc_rd_destroys_in_flight, 0, 0 },
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
}
-/* return 1 if osc should be resend request */
-static inline int osc_should_resend(int resend, struct client_obd *cli)
-{
- return cfs_atomic_read(&cli->cl_resends) ?
- cfs_atomic_read(&cli->cl_resends) > resend : 1;
-}
-
#ifndef min_t
#define min_t(type,x,y) \
({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
ptlrpc_req_finished(req);
if (osc_recoverable_error(rc)) {
resends++;
- if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+ if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
CERROR("too many resend retries, returning error\n");
RETURN(-EIO);
}
int rc = 0;
ENTRY;
- if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+ if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
CERROR("too many resent retries, returning error\n");
RETURN(-EIO);
}
RETURN(0);
}
-static int ost_bulk_timeout(void *data)
-{
- ENTRY;
- /* We don't fail the connection here, because having the export
- * killed makes the (vital) call to commitrw very sad.
- */
- RETURN(1);
-}
-
static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
cksum_type_t cksum_type)
{
/* Check if client was evicted while we were doing i/o before touching
network */
if (rc == 0) {
- /* Check if there is eviction in progress, and if so, wait for
- * it to finish */
- if (unlikely(cfs_atomic_read(&exp->exp_obd->
- obd_evict_inprogress))) {
- lwi = LWI_INTR(NULL, NULL);
- rc = l_wait_event(exp->exp_obd->
- obd_evict_inprogress_waitq,
- !cfs_atomic_read(&exp->exp_obd->
- obd_evict_inprogress),
- &lwi);
- }
- /* Check if client was evicted or tried to reconnect already */
- if (exp->exp_failed || exp->exp_abort_active_req)
- rc = -ENOTCONN;
- else {
- rc = sptlrpc_svc_wrap_bulk(req, desc);
- if (rc == 0)
- rc = ptlrpc_start_bulk_transfer(desc);
- }
-
- if (rc == 0) {
- time_t start = cfs_time_current_sec();
- do {
- long timeoutl = req->rq_deadline -
- cfs_time_current_sec();
- cfs_duration_t timeout = timeoutl <= 0 ?
- CFS_TICK : cfs_time_seconds(timeoutl);
- lwi = LWI_TIMEOUT_INTERVAL(timeout,
- cfs_time_seconds(1),
- ost_bulk_timeout,
- desc);
- rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_server_bulk_active(desc) ||
- exp->exp_failed ||
- exp->exp_abort_active_req,
- &lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- /* Wait again if we changed deadline */
- } while ((rc == -ETIMEDOUT) &&
- (req->rq_deadline > cfs_time_current_sec()));
-
- if (rc == -ETIMEDOUT) {
- DEBUG_REQ(D_ERROR, req,
- "timeout on bulk PUT after %ld%+lds",
- req->rq_deadline - start,
- cfs_time_current_sec() -
- req->rq_deadline);
- ptlrpc_abort_bulk(desc);
- } else if (exp->exp_failed) {
- DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
- rc = -ENOTCONN;
- ptlrpc_abort_bulk(desc);
- } else if (exp->exp_abort_active_req) {
- DEBUG_REQ(D_ERROR, req, "Reconnect on bulk PUT");
- /* we don't reply anyway */
- rc = -ETIMEDOUT;
- ptlrpc_abort_bulk(desc);
- } else if (!desc->bd_success ||
- desc->bd_nob_transferred != desc->bd_nob) {
- DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
- desc->bd_success ?
- "truncated" : "network error on",
- desc->bd_nob_transferred,
- desc->bd_nob);
- /* XXX should this be a different errno? */
- rc = -ETIMEDOUT;
- }
- } else {
- DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
- }
+ rc = target_bulk_io(exp, desc, &lwi);
no_reply = rc != 0;
}
/* pause before transaction has been started */
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
- /* Check if there is eviction in progress, and if so, wait for it to
- * finish */
- if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
- lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
- rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
- !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
- &lwi);
- }
- if (exp->exp_failed)
- GOTO(out, rc = -ENOTCONN);
-
/* ost_body, ioobj & noibuf_remote are verified and swabbed in
* ost_rw_hpreq_check(). */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (rc != 0)
GOTO(out_lock, rc);
- /* Check if client was evicted or tried to reconnect while we
- * were doing i/o before touching network */
- if (desc->bd_export->exp_failed ||
- desc->bd_export->exp_abort_active_req)
- rc = -ENOTCONN;
- else
- rc = ptlrpc_start_bulk_transfer(desc);
- if (rc == 0) {
- time_t start = cfs_time_current_sec();
- do {
- long timeoutl = req->rq_deadline -
- cfs_time_current_sec();
- cfs_duration_t timeout = timeoutl <= 0 ?
- CFS_TICK : cfs_time_seconds(timeoutl);
- lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
- ost_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_server_bulk_active(desc) ||
- desc->bd_export->exp_failed ||
- desc->bd_export->exp_abort_active_req,
- &lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- /* Wait again if we changed deadline */
- } while ((rc == -ETIMEDOUT) &&
- (req->rq_deadline > cfs_time_current_sec()));
-
- if (rc == -ETIMEDOUT) {
- DEBUG_REQ(D_ERROR, req,
- "timeout on bulk GET after %ld%+lds",
- req->rq_deadline - start,
- cfs_time_current_sec() -
- req->rq_deadline);
- ptlrpc_abort_bulk(desc);
- } else if (desc->bd_export->exp_failed) {
- DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
- rc = -ENOTCONN;
- ptlrpc_abort_bulk(desc);
- } else if (desc->bd_export->exp_abort_active_req) {
- DEBUG_REQ(D_ERROR, req, "Reconnect on bulk GET");
- /* we don't reply anyway */
- rc = -ETIMEDOUT;
- ptlrpc_abort_bulk(desc);
- } else if (!desc->bd_success) {
- DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
- /* XXX should this be a different errno? */
- rc = -ETIMEDOUT;
- } else {
- rc = sptlrpc_svc_unwrap_bulk(req, desc);
- }
- } else {
- DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
- }
+ rc = target_bulk_io(exp, desc, &lwi);
no_reply = rc != 0;
skip_transfer:
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
cli->cl_cksum_type = OBD_CKSUM_CRC32;
}
- if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+ if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
cli->cl_max_pages_per_rpc =
ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
- }
+ else if (imp->imp_connect_op == MDS_CONNECT ||
+ imp->imp_connect_op == MGS_CONNECT)
+ cli->cl_max_pages_per_rpc = 1;
/* Reset ns_connect_flags only for initial connect. It might be
* changed in while using FS and if we reset it in reconnect
}
run_test 24u "create stripe file"
+page_size() {
+ getconf PAGE_SIZE
+}
+
test_24v() {
local NRFILES=100000
local FREE_INODES=`lfs df -i|grep "filesystem summary" | awk '{print $5}'`
mkdir -p $DIR/d24v
createmany -m $DIR/d24v/$tfile $NRFILES
+
+ cancel_lru_locks mdc
+ lctl set_param mdc.*.stats clear
+
ls $DIR/d24v >/dev/null || error "error in listing large dir"
+ # LU-5 large readdir
+ # DIRENT_SIZE = 32 bytes for sizeof(struct lu_dirent) +
+ # 8 bytes for name (filename length is mostly 5 in this test) +
+ # 8 bytes for luda_type
+ # take into account the overhead in the lu_dirpage header and the end mark
+ # in each page, plus one extra RPC in the RPC_NUM calculation.
+ DIRENT_SIZE=48
+ RPC_SIZE=$(($(lctl get_param -n mdc.*.max_pages_per_rpc)*$(page_size)))
+ RPC_NUM=$(((NRFILES * DIRENT_SIZE + RPC_SIZE - 1) / RPC_SIZE + 1))
+ mds_readpage=`lctl get_param mdc.*.stats | \
+ awk '/^mds_readpage/ {print $2}'`
+ [ $mds_readpage -gt $RPC_NUM ] && \
+ error "large readdir doesn't take effect"
+
rm $DIR/d24v -rf
}
run_test 24v "list directory with large files (handle hash collision, bug: 17560)"
}
run_test 42d "test complete truncate of file with cached dirty data"
-page_size() {
- getconf PAGE_SIZE
-}
-
test_42e() { # bug22074
local TDIR=$DIR/${tdir}e
local pagesz=$(page_size)
CHECK_VALUE(MGS_TARGET_DEL);
CHECK_VALUE(MGS_SET_INFO);
+ CHECK_VALUE(LDF_EMPTY);
+ CHECK_VALUE(LDF_COLLIDE);
+ CHECK_VALUE(LU_PAGE_SIZE);
+
COMMENT("Sizes and Offsets");
BLANK_LINE();
CHECK_STRUCT(obd_uuid);