* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
struct lov_mds_md *lmm, int lmm_bytes)
{
int lsm_size;
+ struct obd_import *imp = class_exp2cliimp(exp);
ENTRY;
if (lmm != NULL) {
LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
}
- (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+ if (imp != NULL &&
+ (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
+ (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
+ else
+ (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
RETURN(lsm_size);
}
/* do mds to ost setattr asynchronously */
if (!rqset) {
/* Do not wait for response. */
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
} else {
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_setattr_interpret;
sa->sa_cookie = cookie;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
else
ptlrpc_set_add_req(rqset, req);
}
sa->sa_upcall = upcall;
sa->sa_cookie = cookie;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
else
ptlrpc_set_add_req(rqset, req);
}
/* Do not wait for response */
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
RETURN(0);
}
CWARN("%s: available grant < 0, the OSS is probably not running"
" with patch from bug20278 (%ld) \n",
cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
- /* workaround for 1.6 servers which do not have
+ /* workaround for 1.6 servers which do not have
* the patch from bug20278 */
cli->cl_avail_grant = ocd->ocd_grant;
}
/* return error if any niobuf was in error */
for (i = 0; i < niocount; i++) {
- if (remote_rcs[i] < 0)
+ if ((int)remote_rcs[i] < 0)
return(remote_rcs[i]);
if (remote_rcs[i] != 0) {
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
if (p1->flag != p2->flag) {
- unsigned mask = ~(OBD_BRW_FROM_GRANT|
- OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
+ unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
+ OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
/* warn if we try to combine flags that we don't know to be
* safe to combine */
- if ((p1->flag & mask) != (p2->flag & mask))
- CERROR("is it ok to have flags 0x%x and 0x%x in the "
- "same brw?\n", p1->flag, p2->flag);
+ if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
+ CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
+ "report this at http://bugs.whamcloud.com/\n",
+ p1->flag, p2->flag);
+ }
return 0;
}
pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
struct brw_page *pg = pga[i];
+ int poff = pg->off & ~CFS_PAGE_MASK;
LASSERT(pg->count > 0);
- LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
- "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
- pg->off, pg->count);
+ /* make sure there is no gap in the middle of page array */
+ LASSERTF(page_count == 1 ||
+ (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
+ ergo(i > 0 && i < page_count - 1,
+ poff == 0 && pg->count == CFS_PAGE_SIZE) &&
+ ergo(i == page_count - 1, poff == 0)),
+ "i: %d/%d pg: %p off: "LPU64", count: %u\n",
+ i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
LASSERTF(i == 0 || pg->off > pg_prev->off,
"i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
- pg->count);
+ ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
return 1;
- if (oa->o_valid & OBD_MD_FLFLAGS)
- cksum_type = cksum_type_unpack(oa->o_flags);
- else
- cksum_type = OBD_CKSUM_CRC32;
-
+ cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
+ oa->o_flags : 0);
new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
cksum_type);
char *router;
cksum_type_t cksum_type;
- if (body->oa.o_valid & OBD_MD_FLFLAGS)
- cksum_type = cksum_type_unpack(body->oa.o_flags);
- else
- cksum_type = OBD_CKSUM_CRC32;
+ cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
+ body->oa.o_flags : 0);
client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
aa->aa_ppga, OST_READ,
cksum_type);
osc_check_rpcs(env, cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
if (!async)
- cl_req_completion(env, aa->aa_clerq, rc);
+ cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
+ req->rq_bulk->bd_nob_transferred);
osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+ ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
RETURN(rc);
}
struct osc_brw_async_args *aa;
const struct obd_async_page_ops *ops;
CFS_LIST_HEAD(rpc_list);
- CFS_LIST_HEAD(tmp_list);
- unsigned int ending_offset;
- unsigned starting_offset = 0;
int srvlock = 0, mem_tight = 0;
struct cl_object *clob = NULL;
+ obd_off starting_offset = OBD_OBJECT_EOF;
+ unsigned int ending_offset;
+ int starting_page_off = 0;
ENTRY;
/* ASYNC_HP pages first. At present, when the lock the pages is
* to be canceled, the pages covered by the lock will be sent out
* with ASYNC_HP. We have to send out them as soon as possible. */
cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_move(&oap->oap_pending_item, &tmp_list);
- else
- cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
if (++page_count >= cli->cl_max_pages_per_rpc)
break;
}
-
- cfs_list_splice(&tmp_list, &lop->lop_pending);
page_count = 0;
/* first we find the pages we're allowed to work with */
/* If there is a gap at the start of this page, it can't merge
* with any previous page, so we'll hand the network a
* "fragmented" page array that it can't transfer in 1 RDMA */
- if (page_count != 0 && oap->oap_page_off != 0)
+ if (oap->oap_obj_off < starting_offset) {
+ if (starting_page_off != 0)
+ break;
+
+ starting_page_off = oap->oap_page_off;
+ starting_offset = oap->oap_obj_off + starting_page_off;
+ } else if (oap->oap_page_off != 0)
break;
/* in llite being 'ready' equates to the page being locked
lop_update_pending(cli, lop, cmd, -1);
cfs_list_del_init(&oap->oap_urgent_item);
- if (page_count == 0)
- starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
- (PTLRPC_MAX_BRW_SIZE - 1);
-
/* ask the caller for the size of the io as the rpc leaves. */
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
oap->oap_count =
/* now put the page back in our accounting */
cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (page_count++ == 0)
+ srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+
if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
mem_tight = 1;
- if (page_count == 0)
- srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
- if (++page_count >= cli->cl_max_pages_per_rpc)
- break;
/* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
* RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
* have the same alignment as the initial writes that allocated
* extents on the server. */
- ending_offset = (oap->oap_obj_off + oap->oap_page_off +
- oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
- if (ending_offset == 0)
+ ending_offset = oap->oap_obj_off + oap->oap_page_off +
+ oap->oap_count;
+ if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+ break;
+
+ if (page_count >= cli->cl_max_pages_per_rpc)
break;
/* If there is a gap at the end of this page, it can't merge
aa = ptlrpc_req_async_args(req);
+ starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
if (cmd == OBD_BRW_READ) {
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
(starting_offset >> CFS_PAGE_SHIFT) + 1);
}
- ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
client_obd_list_lock(&cli->cl_loi_list_lock);
page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
req->rq_interpret_reply = brw_interpret;
- ptlrpcd_add_req(req, PSCOPE_BRW);
+
+ /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
+ * CPU/NUMA node the majority of pages were allocated on, and try
+ * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
+ * to reduce cross-CPU memory traffic.
+ *
+ * But on the other hand, we expect that multiple ptlrpcd threads
+ * and the initial write sponsor can run in parallel, especially
+ * when data checksum is enabled, which is CPU-bound operation and
+ * single ptlrpcd thread cannot process in time. So more ptlrpcd
+ * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
+ */
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
RETURN(1);
}
if (rc > 0)
race_counter = 0;
- else
+ else if (rc == 0)
race_counter++;
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
if (rc > 0)
race_counter = 0;
- else
+ else if (rc == 0)
race_counter++;
}
/* force the caller to try sync io. this can jump the list
* of queued writes and create a discontiguous rpc stream */
- if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
- loi->loi_ar.ar_force_sync)
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+ cli->cl_dirty_max < CFS_PAGE_SIZE ||
+ cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
RETURN(-EDQUOT);
/* Hopefully normal case - cache space and write credits available */
int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
struct lov_stripe_md *lsm, struct lov_oinfo *loi,
- struct osc_async_page *oap, int cmd, obd_off off,
- int count, obd_flag brw_flags,
- enum async_flags async_flags)
+ struct osc_async_page *oap, int cmd, int off,
+ int count, obd_flag brw_flags, enum async_flags async_flags)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
int rc = 0;
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_enqueue_interpret;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
else
ptlrpc_set_add_req(rqset, req);
} else if (intent) {
* avail < ~0.1% max max = avail + used
* 1025 * avail < avail + used used = blocks - free
* 1024 * avail < used
- * 1024 * avail < blocks - free
- * avail < ((blocks - free) >> 10)
+ * 1024 * avail < blocks - free
+ * avail < ((blocks - free) >> 10)
*
* On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
* lose that amount of space so in those cases we report no space left
((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
- (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
- cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
+ (msfs->os_ffree > 64) &&
+ (msfs->os_bavail > (used << 1)))) {
+ cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
+ OSCC_FLAG_NOSPC_BLK);
+ }
+
+ if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
+ (msfs->os_bavail < used)))
+ cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
GOTO(out, err);
case OBD_IOC_CLIENT_RECOVER:
err = ptlrpc_recover_import(obd->u.cli.cl_import,
- data->ioc_inlbuf1);
+ data->ioc_inlbuf1, 0);
if (err > 0)
err = 0;
GOTO(out, err);
ptlrpc_set_add_req(set, req);
ptlrpc_check_set(NULL, set);
} else
- ptlrpcd_add_req(req, PSCOPE_OTHER);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
RETURN(0);
}
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
cfs_spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
+ oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
+ OSCC_FLAG_NOSPC_BLK);
cfs_spin_unlock(&oscc->oscc_lock);
}
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
* client_disconnect_export()
*/
obd_zombie_barrier();
- /* If we set up but never connected, the
- client import will not have been cleaned. */
- if (obd->u.cli.cl_import) {
- struct obd_import *imp;
- cfs_down_write(&obd->u.cli.cl_sem);
- imp = obd->u.cli.cl_import;
- CDEBUG(D_CONFIG, "%s: client import never connected\n",
- obd->obd_name);
- ptlrpc_invalidate_import(imp);
- if (imp->imp_rq_pool) {
- ptlrpc_free_rq_pool(imp->imp_rq_pool);
- imp->imp_rq_pool = NULL;
- }
- class_destroy_import(imp);
- cfs_up_write(&obd->u.cli.cl_sem);
- obd->u.cli.cl_import = NULL;
- }
+ obd_cleanup_client_import(obd);
rc = obd_llog_finish(obd, 0);
if (rc != 0)
CERROR("failed to cleanup llogging subsystems\n");