X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=995391d454c4e2b74e863023781dd1e84857912a;hb=51206e8cd42134400fa0b6259a92d7138f3dc984;hp=713b6a8711e4417142f6f883766aed2cb4f0b27a;hpb=0a859380c36ac24871f221b35042f76c56b04438;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 713b6a8..995391d 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -26,8 +26,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -63,12 +65,11 @@ #include #include "osc_internal.h" -static quota_interface_t *quota_interface = NULL; -extern quota_interface_t osc_quota_interface; - static void osc_release_ppga(struct brw_page **ppga, obd_count count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); +static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, + int ptlrpc); int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ @@ -109,6 +110,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmm, int lmm_bytes) { int lsm_size; + struct obd_import *imp = class_exp2cliimp(exp); ENTRY; if (lmm != NULL) { @@ -156,7 +158,11 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq); } - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + if (imp != NULL && + (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) + (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; + else + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; RETURN(lsm_size); } @@ -395,7 +401,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, /* do mds to ost setattr asynchronously */ if (!rqset) { /* Do not wait for response. */ - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); } else { req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; @@ -407,7 +413,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, sa->sa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); else ptlrpc_set_add_req(rqset, req); } @@ -546,7 +552,7 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, sa->sa_upcall = upcall; sa->sa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); else ptlrpc_set_add_req(rqset, req); @@ -564,16 +570,40 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up, oinfo, rqset); } -static int osc_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_size start, obd_size end, - void *capa) +static int osc_sync_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *arg, int rc) +{ + struct osc_async_args *aa = arg; + struct ost_body *body; + ENTRY; + + if (rc) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + CERROR ("can't unpack ost_body\n"); + GOTO(out, rc = -EPROTO); + } + + *aa->aa_oi->oi_oa = body->oa; +out: + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + RETURN(rc); +} + +static int osc_sync(struct obd_export *exp, struct obd_info *oinfo, + obd_size start, obd_size end, + struct ptlrpc_request_set *set) { struct ptlrpc_request *req; struct ost_body *body; + struct osc_async_args *aa; int rc; ENTRY; - if (!oa) { + if (!oinfo->oi_oa) { CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } @@ -582,7 +612,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, capa); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); if (rc) { ptlrpc_request_free(req); @@ -592,28 +622,21 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, /* overload the size and blocks fields in the oa with start/end */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); body->oa.o_size = start; body->oa.o_blocks = end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); - osc_pack_capa(req, body, capa); + osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); + req->rq_interpret_reply = osc_sync_interpret; - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); - - lustre_get_wire_obdo(oa, &body->oa); + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + ptlrpc_set_add_req(set, req); + RETURN (0); } /* Find and cancel locally locks matched by @mode in the resource found by @@ -743,7 +766,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, } /* Do not wait for response */ - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); RETURN(0); } @@ -837,7 +860,16 @@ static void osc_release_write_grant(struct client_obd *cli, cli->cl_dirty_transit -= CFS_PAGE_SIZE; } if (!sent) { - cli->cl_lost_grant += CFS_PAGE_SIZE; + /* Reclaim grant from truncated pages. This is used to solve + * write-truncate and grant all gone(to lost_grant) problem. + * For a vfs write this problem can be easily solved by a sync + * write, however, this is not an option for page_mkwrite() + * because grant has to be allocated before a page becomes + * dirty. */ + if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE) + cli->cl_avail_grant += CFS_PAGE_SIZE; + else + cli->cl_lost_grant += CFS_PAGE_SIZE; CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) { @@ -901,6 +933,9 @@ void osc_wake_cache_waiters(struct client_obd *cli) &ocw->ocw_oap->oap_brw_page); } + CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n", + ocw, ocw->ocw_oap, cli->cl_avail_grant); + cfs_waitq_signal(&ocw->ocw_waitq); } @@ -943,7 +978,7 @@ static int osc_shrink_grant_interpret(const struct lu_env *env, LASSERT(body); osc_update_grant(cli, body); out: - OBD_FREE_PTR(oa); + OBDO_FREE(oa); return rc; } @@ -953,6 +988,10 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) oa->o_grant = cli->cl_avail_grant / 4; cli->cl_avail_grant -= oa->o_grant; client_obd_list_unlock(&cli->cl_loi_list_lock); + if (!(oa->o_valid & OBD_MD_FLFLAGS)) { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags = 0; + } oa->o_flags |= OBD_FL_SHRINK_GRANT; osc_update_next_shrink(cli); } @@ -1003,6 +1042,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, long target) body->oa.o_grant = cli->cl_avail_grant - target; cli->cl_avail_grant = target; client_obd_list_unlock(&cli->cl_loi_list_lock); + if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { + body->oa.o_valid |= OBD_MD_FLFLAGS; + body->oa.o_flags = 0; + } body->oa.o_flags |= OBD_FL_SHRINK_GRANT; osc_update_next_shrink(cli); @@ -1020,6 +1063,11 @@ static int osc_should_shrink_grant(struct client_obd *client) { cfs_time_t time = cfs_time_current(); cfs_time_t next_shrink = client->cl_next_shrink_grant; + + if ((client->cl_import->imp_connect_data.ocd_connect_flags & + OBD_CONNECT_GRANT_SHRINK) == 0) + return 0; + if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { if (client->cl_import->imp_state == LUSTRE_IMP_FULL && client->cl_avail_grant > GRANT_SHRINK_LIMIT) @@ -1082,11 +1130,21 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant = ocd->ocd_grant; else cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; + + if (cli->cl_avail_grant < 0) { + CWARN("%s: available grant < 0, the OSS is probably not running" + " with patch from bug20278 (%ld) \n", + cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant); + /* workaround for 1.6 servers which do not have + * the patch from bug20278 */ + cli->cl_avail_grant = ocd->ocd_grant; + } + client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n", + CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n", + cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, cli->cl_lost_grant); - LASSERT(cli->cl_avail_grant >= 0); if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && cfs_list_empty(&cli->cl_grant_shrink_list)) @@ -1149,7 +1207,7 @@ static int check_write_rcs(struct ptlrpc_request *req, /* return error if any niobuf was in error */ for (i = 0; i < niocount; i++) { - if (remote_rcs[i] < 0) + if ((int)remote_rcs[i] < 0) return(remote_rcs[i]); if (remote_rcs[i] != 0) { @@ -1171,14 +1229,16 @@ static int check_write_rcs(struct ptlrpc_request *req, static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) { if (p1->flag != p2->flag) { - unsigned mask = ~(OBD_BRW_FROM_GRANT| - OBD_BRW_NOCACHE|OBD_BRW_SYNC); + unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE| + OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA); /* warn if we try to combine flags that we don't know to be * safe to combine */ - if ((p1->flag & mask) != (p2->flag & mask)) - CERROR("is it ok to have flags 0x%x and 0x%x in the " - "same brw?\n", p1->flag, p2->flag); + if (unlikely((p1->flag & mask) != (p2->flag & mask))) { + CWARN("Saw flags 0x%x and 0x%x in the same brw, please " + "report this at http://bugs.whamcloud.com/\n", + p1->flag, p2->flag); + } return 0; } @@ -1218,14 +1278,15 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) cksum++; - return cksum; + return fini_checksum(cksum, cksum_type); } static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, struct brw_page **pga, struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve) + struct obd_capa *ocapa, int reserve, + int resend) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1300,11 +1361,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; + int poff = pg->off & ~CFS_PAGE_MASK; LASSERT(pg->count > 0); - LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE, - "i: %d pg: %p off: "LPU64", count: %u\n", i, pg, - pg->off, pg->count); + /* make sure there is no gap in the middle of page array */ + LASSERTF(page_count == 1 || + (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) && + ergo(i > 0 && i < page_count - 1, + poff == 0 && pg->count == CFS_PAGE_SIZE) && + ergo(i == page_count - 1, poff == 0)), + "i: %d/%d pg: %p off: "LPU64", count: %u\n", + i, page_count, pg, pg->off, pg->count); #ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 @@ -1320,8 +1387,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK, - pg->count); + ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { @@ -1341,12 +1407,20 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount)); osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0); + if (resend) { + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { + body->oa.o_valid |= OBD_MD_FLFLAGS; + body->oa.o_flags = 0; + } + body->oa.o_flags |= OBD_FL_RECOV_RESEND; + } + if (osc_should_shrink_grant(cli)) osc_shrink_grant_local(cli, &body->oa); /* size[REQ_REC_OFF] still sizeof (*body) */ if (opc == OST_WRITE) { - if (unlikely(cli->cl_checksum) && + if (cli->cl_checksum && !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { /* store cl_cksum_type in a local variable since * it can be changed via lprocfs */ @@ -1377,7 +1451,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, sizeof(__u32) * niocount); } else { - if (unlikely(cli->cl_checksum) && + if (cli->cl_checksum && !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) body->oa.o_flags = 0; @@ -1422,11 +1496,8 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, return 0; } - if (oa->o_valid & OBD_MD_FLFLAGS) - cksum_type = cksum_type_unpack(oa->o_flags); - else - cksum_type = OBD_CKSUM_CRC32; - + cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? + oa->o_flags : 0); new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, cksum_type); @@ -1481,7 +1552,6 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) RETURN(-EPROTO); } -#ifdef HAVE_QUOTA_SUPPORT /* set/clear over quota flag for a uid/gid */ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) { @@ -1490,10 +1560,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n", body->oa.o_uid, body->oa.o_gid, body->oa.o_valid, body->oa.o_flags); - lquota_setdq(quota_interface, cli, qid, body->oa.o_valid, - body->oa.o_flags); + osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags); } -#endif + + osc_update_grant(cli, body); if (rc < 0) RETURN(rc); @@ -1501,8 +1571,6 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) client_cksum = aa->aa_oa->o_cksum; /* save for later */ - osc_update_grant(cli, body); - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { if (rc > 0) { CERROR("Unexpected +ve rc %d\n", rc); @@ -1554,10 +1622,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) char *router; cksum_type_t cksum_type; - if (body->oa.o_valid & OBD_MD_FLFLAGS) - cksum_type = cksum_type_unpack(body->oa.o_flags); - else - cksum_type = OBD_CKSUM_CRC32; + cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? + body->oa.o_flags : 0); client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga, OST_READ, cksum_type); @@ -1572,7 +1638,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (server_cksum == ~0 && rc > 0) { CERROR("Protocol error: server %s set the 'checksum' " "bit, but didn't send a checksum. Not fatal, " - "but please notify on http://bugzilla.lustre.org/\n", + "but please notify on http://bugs.whamcloud.com/\n", libcfs_nid2str(peer->nid)); } else if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " @@ -1630,19 +1696,25 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa, struct ptlrpc_request *req; int rc; cfs_waitq_t waitq; - int resends = 0; + int generation, resends = 0; struct l_wait_info lwi; ENTRY; cfs_waitq_init(&waitq); + generation = exp->exp_obd->u.cli.cl_import->imp_generation; restart_bulk: rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, - page_count, pga, &req, ocapa, 0); + page_count, pga, &req, ocapa, 0, resends); if (rc != 0) return (rc); + if (resends) { + req->rq_generation_set = 1; + req->rq_import_generation = generation; + } + rc = ptlrpc_queue_wait(req); if (rc == -ETIMEDOUT && req->rq_resend) { @@ -1654,19 +1726,34 @@ restart_bulk: rc = osc_brw_fini_request(req, rc); ptlrpc_req_finished(req); + /* When server return -EINPROGRESS, client should always retry + * regardless of the number of times the bulk was resent already.*/ if (osc_recoverable_error(rc)) { resends++; - if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) { - CERROR("too many resend retries, returning error\n"); - RETURN(-EIO); + if (rc != -EINPROGRESS && + !client_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("%s: too many resend retries for object: " + ""LPU64":"LPU64", rc = %d.\n", + exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc); + goto out; + } + if (generation != + exp->exp_obd->u.cli.cl_import->imp_generation) { + CDEBUG(D_HA, "%s: resend cross eviction for object: " + ""LPU64":"LPU64", rc = %d.\n", + exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc); + goto out; } - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, + NULL); l_wait_event(waitq, 0, &lwi); goto restart_bulk; } - +out: + if (rc == -EAGAIN || rc == -EINPROGRESS) + rc = -EIO; RETURN (rc); } @@ -1680,11 +1767,6 @@ int osc_brw_redo_request(struct ptlrpc_request *request, int rc = 0; ENTRY; - if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) { - CERROR("too many resend retries, returning error\n"); - RETURN(-EIO); - } - DEBUG_REQ(D_ERROR, request, "redo for recoverable error"); rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == @@ -1692,7 +1774,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request, aa->aa_cli, aa->aa_oa, NULL /* lsm unused by osc currently */, aa->aa_page_count, aa->aa_ppga, - &new_req, aa->aa_ocapa, 0); + &new_req, aa->aa_ocapa, 0, 1); if (rc) RETURN(rc); @@ -1716,6 +1798,8 @@ int osc_brw_redo_request(struct ptlrpc_request *request, new_req->rq_interpret_reply = request->rq_interpret_reply; new_req->rq_async_args = request->rq_async_args; new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; + new_req->rq_generation_set = 1; + new_req->rq_import_generation = request->rq_import_generation; new_aa = ptlrpc_req_async_args(new_req); @@ -1912,7 +1996,6 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, int cmd) { - int optimal; ENTRY; if (lop->lop_num_pending == 0) @@ -1933,23 +2016,17 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, CDEBUG(D_CACHE, "urgent request forcing RPC\n"); RETURN(1); } - /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */ - optimal = cli->cl_max_pages_per_rpc; + if (cmd & OBD_BRW_WRITE) { /* trigger a write rpc stream as long as there are dirtiers * waiting for space. as they're waiting, they're not going to - * create more pages to coallesce with what's waiting.. */ + * create more pages to coalesce with what's waiting.. */ if (!cfs_list_empty(&cli->cl_cache_waiters)) { CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); RETURN(1); } - /* +16 to avoid triggering rpcs that would want to include pages - * that are being queued but which can't be made ready until - * the queuer finishes with the page. this is a wart for - * llite::commit_write() */ - optimal += 16; } - if (lop->lop_num_pending >= optimal) + if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc) RETURN(1); RETURN(0); @@ -2140,9 +2217,9 @@ static void osc_ap_completion(const struct lu_env *env, rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data, oap->oap_cmd, oa, rc); - /* ll_ap_completion (from llite) drops PG_locked. so, a new - * I/O on the page could start, but OSC calls it under lock - * and thus we can add oap back to pending safely */ + /* cl_page_completion() drops PG_locked. so, a new I/O on the page could + * start, but OSC calls it under lock and thus we can add oap back to + * pending safely */ if (rc) /* upper layer wants to leave the page on pending queue */ osc_oap_to_pending(oap); @@ -2151,6 +2228,18 @@ static void osc_ap_completion(const struct lu_env *env, EXIT; } +static int brw_queue_work(const struct lu_env *env, void *data) +{ + struct client_obd *cli = data; + + CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); + + client_obd_list_lock(&cli->cl_loi_list_lock); + osc_check_rpcs0(env, cli, 1); + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(0); +} + static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { @@ -2161,10 +2250,29 @@ static int brw_interpret(const struct lu_env *env, rc = osc_brw_fini_request(req, rc); CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); + /* When server return -EINPROGRESS, client should always retry + * regardless of the number of times the bulk was resent already. */ if (osc_recoverable_error(rc)) { - rc = osc_brw_redo_request(req, aa); + if (req->rq_import_generation != + req->rq_import->imp_generation) { + CDEBUG(D_HA, "%s: resend cross eviction for object: " + ""LPU64":"LPU64", rc = %d.\n", + req->rq_import->imp_obd->obd_name, + aa->aa_oa->o_id, aa->aa_oa->o_seq, rc); + } else if (rc == -EINPROGRESS || + client_should_resend(aa->aa_resends, aa->aa_cli)) { + rc = osc_brw_redo_request(req, aa); + } else { + CERROR("%s: too many resent retries for object: " + ""LPU64":"LPU64", rc = %d.\n", + req->rq_import->imp_obd->obd_name, + aa->aa_oa->o_id, aa->aa_oa->o_seq, rc); + } + if (rc == 0) RETURN(0); + else if (rc == -EAGAIN || rc == -EINPROGRESS) + rc = -EIO; } if (aa->aa_ocapa) { @@ -2173,7 +2281,6 @@ static int brw_interpret(const struct lu_env *env, } cli = aa->aa_cli; - client_obd_list_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters @@ -2196,19 +2303,20 @@ static int brw_interpret(const struct lu_env *env, } OBDO_FREE(aa->aa_oa); } else { /* from async_internal() */ - int i; + obd_count i; for (i = 0; i < aa->aa_page_count; i++) osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); - - if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY) - OBDO_FREE(aa->aa_oa); } osc_wake_cache_waiters(cli); - osc_check_rpcs(env, cli); + osc_check_rpcs0(env, cli, 1); client_obd_list_unlock(&cli->cl_loi_list_lock); + if (!async) - cl_req_completion(env, aa->aa_clerq, rc); + cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : + req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + RETURN(rc); } @@ -2222,19 +2330,20 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, struct osc_brw_async_args *aa; struct obdo *oa = NULL; const struct obd_async_page_ops *ops = NULL; - void *caller_data = NULL; struct osc_async_page *oap; struct osc_async_page *tmp; - struct ost_body *body; struct cl_req *clerq = NULL; enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; struct ldlm_lock *lock = NULL; struct cl_req_attr crattr; - int i, rc; + int i, rc, mpflag = 0; ENTRY; LASSERT(!cfs_list_empty(rpc_list)); + if (cmd & OBD_BRW_MEMALLOC) + mpflag = cfs_memory_pressure_get_and_set(); + memset(&crattr, 0, sizeof crattr); OBD_ALLOC(pga, sizeof(*pga) * page_count); if (pga == NULL) @@ -2249,7 +2358,6 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, struct cl_page *page = osc_oap2cl_page(oap); if (ops == NULL) { ops = oap->oap_caller_ops; - caller_data = oap->oap_caller_data; clerq = cl_req_alloc(env, page, crt, 1 /* only 1-object rpcs for @@ -2284,18 +2392,20 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr.cra_capa, 1); + pga, &req, crattr.cra_capa, 1, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); GOTO(out, req = ERR_PTR(rc)); } + if (cmd & OBD_BRW_MEMALLOC) + req->rq_memalloc = 1; + /* Need to update the timestamps after the request is built in case * we race with setattr (locally or in queue at OST). If OST gets * later setattr before earlier BRW (as determined by the request xid), * the OST will not use BRW timestamps. Sadly, there is no obvious * way to do this in a single call. bug 10150 */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); cl_req_attr_set(env, clerq, &crattr, OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); @@ -2306,6 +2416,9 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, CFS_INIT_LIST_HEAD(rpc_list); aa->aa_clerq = clerq; out: + if (cmd & OBD_BRW_MEMALLOC) + cfs_memory_pressure_restore(mpflag); + capa_put(crattr.cra_capa); if (IS_ERR(req)) { if (oa) @@ -2340,13 +2453,14 @@ out: * \param cmd OBD_BRW_* macroses * \param lop pending pages * - * \return zero if pages successfully add to send queue. - * \return not zere if error occurring. + * \return zero if no page added to send queue. + * \return 1 if pages successfully added to send queue. + * \return negative on errors. */ static int osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, - struct lov_oinfo *loi, - int cmd, struct loi_oap_pages *lop) + struct lov_oinfo *loi, int cmd, + struct loi_oap_pages *lop, pdl_policy_t pol) { struct ptlrpc_request *req; obd_count page_count = 0; @@ -2354,26 +2468,26 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_brw_async_args *aa; const struct obd_async_page_ops *ops; CFS_LIST_HEAD(rpc_list); - CFS_LIST_HEAD(tmp_list); - unsigned int ending_offset; - unsigned starting_offset = 0; - int srvlock = 0; + int srvlock = 0, mem_tight = 0; struct cl_object *clob = NULL; + obd_off starting_offset = OBD_OBJECT_EOF; + unsigned int ending_offset; + int starting_page_off = 0; ENTRY; /* ASYNC_HP pages first. At present, when the lock the pages is * to be canceled, the pages covered by the lock will be sent out * with ASYNC_HP. We have to send out them as soon as possible. */ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) { - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &tmp_list); - else - cfs_list_move_tail(&oap->oap_pending_item, &tmp_list); + if (oap->oap_async_flags & ASYNC_HP) + cfs_list_move(&oap->oap_pending_item, &rpc_list); + else if (!(oap->oap_brw_flags & OBD_BRW_SYNC)) + /* only do this for writeback pages. */ + cfs_list_move_tail(&oap->oap_pending_item, &rpc_list); if (++page_count >= cli->cl_max_pages_per_rpc) break; } - - cfs_list_splice(&tmp_list, &lop->lop_pending); + cfs_list_splice_init(&rpc_list, &lop->lop_pending); page_count = 0; /* first we find the pages we're allowed to work with */ @@ -2402,14 +2516,20 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* If there is a gap at the start of this page, it can't merge * with any previous page, so we'll hand the network a * "fragmented" page array that it can't transfer in 1 RDMA */ - if (page_count != 0 && oap->oap_page_off != 0) + if (oap->oap_obj_off < starting_offset) { + if (starting_page_off != 0) + break; + + starting_page_off = oap->oap_page_off; + starting_offset = oap->oap_obj_off + starting_page_off; + } else if (oap->oap_page_off != 0) break; /* in llite being 'ready' equates to the page being locked * until completion unlocks it. commit_write submits a page * as not ready because its unlock will happen unconditionally * as the call returns. if we race with commit_write giving - * us that page we dont' want to create a hole in the page + * us that page we don't want to create a hole in the page * stream, so we stop and leave the rpc to be fired by * another dirtier or kupdated interval (the not ready page * will still be on the dirty list). we could call in @@ -2453,37 +2573,12 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, } if (oap == NULL) break; - /* - * Page submitted for IO has to be locked. Either by - * ->ap_make_ready() or by higher layers. - */ -#if defined(__KERNEL__) && defined(__linux__) - { - struct cl_page *page; - - page = osc_oap2cl_page(oap); - - if (page->cp_type == CPT_CACHEABLE && - !(PageLocked(oap->oap_page) && - (CheckWriteback(oap->oap_page, cmd)))) { - CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n", - oap->oap_page, - (long)oap->oap_page->flags, - oap->oap_async_flags); - LBUG(); - } - } -#endif /* take the page out of our book-keeping */ cfs_list_del_init(&oap->oap_pending_item); lop_update_pending(cli, lop, cmd, -1); cfs_list_del_init(&oap->oap_urgent_item); - if (page_count == 0) - starting_offset = (oap->oap_obj_off+oap->oap_page_off) & - (PTLRPC_MAX_BRW_SIZE - 1); - /* ask the caller for the size of the io as the rpc leaves. */ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { oap->oap_count = @@ -2501,18 +2596,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* now put the page back in our accounting */ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); - if (page_count == 0) + if (page_count++ == 0) srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); - if (++page_count >= cli->cl_max_pages_per_rpc) - break; + + if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) + mem_tight = 1; /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads * have the same alignment as the initial writes that allocated * extents on the server. */ - ending_offset = (oap->oap_obj_off + oap->oap_page_off + - oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1); - if (ending_offset == 0) + ending_offset = oap->oap_obj_off + oap->oap_page_off + + oap->oap_count; + if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1))) + break; + + if (page_count >= cli->cl_max_pages_per_rpc) break; /* If there is a gap at the end of this page, it can't merge @@ -2522,8 +2621,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, break; } - osc_wake_cache_waiters(cli); - loi_list_maint(cli, loi); client_obd_list_unlock(&cli->cl_loi_list_lock); @@ -2536,7 +2633,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, RETURN(0); } - req = osc_build_req(env, cli, &rpc_list, page_count, cmd); + req = osc_build_req(env, cli, &rpc_list, page_count, + mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd); if (IS_ERR(req)) { LASSERT(cfs_list_empty(&rpc_list)); loi_list_maint(cli, loi); @@ -2545,6 +2643,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, aa = ptlrpc_req_async_args(req); + starting_offset &= PTLRPC_MAX_BRW_SIZE - 1; if (cmd == OBD_BRW_READ) { lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); @@ -2557,7 +2656,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, (starting_offset >> CFS_PAGE_SHIFT) + 1); } - ptlrpc_lprocfs_brw(req, aa->aa_requested_nob); client_obd_list_lock(&cli->cl_loi_list_lock); @@ -2586,7 +2684,19 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); req->rq_interpret_reply = brw_interpret; - ptlrpcd_add_req(req, PSCOPE_BRW); + + /* XXX: Maybe the caller can check the RPC bulk descriptor to see which + * CPU/NUMA node the majority of pages were allocated on, and try + * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED) + * to reduce cross-CPU memory traffic. + * + * But on the other hand, we expect that multiple ptlrpcd threads + * and the initial write sponsor can run in parallel, especially + * when data checksum is enabled, which is CPU-bound operation and + * single ptlrpcd thread cannot process in time. So more ptlrpcd + * threads sharing BRW load (with PDL_POLICY_ROUND) seems better. + */ + ptlrpcd_add_req(req, pol, -1); RETURN(1); } @@ -2660,12 +2770,15 @@ static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi) } /* called with the loi list lock held */ -void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) +static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc) { struct lov_oinfo *loi; int rc = 0, race_counter = 0; + pdl_policy_t pol; ENTRY; + pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND; + while ((loi = osc_next_loi(cli)) != NULL) { LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); @@ -2680,7 +2793,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) * do io on writes while there are cache waiters */ if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE, - &loi->loi_write_lop); + &loi->loi_write_lop, pol); if (rc < 0) { CERROR("Write request failed with %d\n", rc); @@ -2705,22 +2818,22 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (rc > 0) race_counter = 0; - else + else if (rc == 0) race_counter++; } if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) { rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ, - &loi->loi_read_lop); + &loi->loi_read_lop, pol); if (rc < 0) CERROR("Read request failed with %d\n", rc); if (rc > 0) race_counter = 0; - else + else if (rc == 0) race_counter++; } - /* attempt some inter-object balancing by issueing rpcs + /* attempt some inter-object balancing by issuing rpcs * for each object in turn */ if (!cfs_list_empty(&loi->loi_hp_ready_item)) cfs_list_del_init(&loi->loi_hp_ready_item); @@ -2741,26 +2854,12 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (race_counter == 10) break; } - EXIT; } -/* we're trying to queue a page in the osc so we're subject to the - * 'cl_dirty_max' limit on the number of pages that can be queued in the osc. - * If the osc's queued pages are already at that limit, then we want to sleep - * until there is space in the osc's queue for us. We also may be waiting for - * write credits from the OST if there are RPCs in flight that may return some - * before we fall back to sync writes. - * - * We need this know our allocation was granted in the presence of signals */ -static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw) +void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) { - int rc; - ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); - rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); -}; + osc_check_rpcs0(env, cli, 0); +} /** * Non-blocking version of osc_enter_cache() that consumes grant only when it @@ -2791,8 +2890,8 @@ static int osc_enter_cache(const struct lu_env *env, struct osc_async_page *oap) { struct osc_cache_waiter ocw; - struct l_wait_info lwi = { 0 }; - + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + int rc = -EDQUOT; ENTRY; CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu " @@ -2802,8 +2901,9 @@ static int osc_enter_cache(const struct lu_env *env, /* force the caller to try sync io. this can jump the list * of queued writes and create a discontiguous rpc stream */ - if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync || - loi->loi_ar.ar_force_sync) + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || + cli->cl_dirty_max < CFS_PAGE_SIZE || + cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) RETURN(-EDQUOT); /* Hopefully normal case - cache space and write credits available */ @@ -2812,35 +2912,38 @@ static int osc_enter_cache(const struct lu_env *env, osc_enter_cache_try(env, cli, loi, oap, 0)) RETURN(0); - /* It is safe to block as a cache waiter as long as there is grant - * space available or the hope of additional grant being returned - * when an in flight write completes. Using the write back cache - * if possible is preferable to sending the data synchronously - * because write pages can then be merged in to large requests. - * The addition of this cache waiter will causing pending write - * pages to be sent immediately. */ - if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) { + /* We can get here for two reasons: too many dirty pages in cache, or + * run out of grants. In both cases we should write dirty pages out. + * Adding a cache waiter will trigger urgent write-out no matter what + * RPC size will be. + * The exiting condition is no avail grants and no dirty pages caching, + * that really means there is no space on the OST. */ + cfs_waitq_init(&ocw.ocw_waitq); + ocw.ocw_oap = oap; + while (cli->cl_dirty > 0) { cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); - cfs_waitq_init(&ocw.ocw_waitq); - ocw.ocw_oap = oap; ocw.ocw_rc = 0; loi_list_maint(cli, loi); osc_check_rpcs(env, cli); client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "sleeping for cache space\n"); - l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi); + CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n", + cli->cl_import->imp_obd->obd_name, &ocw, oap); + + rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi); client_obd_list_lock(&cli->cl_loi_list_lock); - if (!cfs_list_empty(&ocw.ocw_entry)) { - cfs_list_del(&ocw.ocw_entry); - RETURN(-EINTR); - } - RETURN(ocw.ocw_rc); + cfs_list_del_init(&ocw.ocw_entry); + if (rc < 0) + break; + + rc = ocw.ocw_rc; + if (rc != -EDQUOT) + break; } - RETURN(-EDQUOT); + RETURN(rc); } @@ -2883,28 +2986,17 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, RETURN(0); } -struct osc_async_page *oap_from_cookie(void *cookie) -{ - struct osc_async_page *oap = cookie; - if (oap->oap_magic != OAP_MAGIC) - return ERR_PTR(-EINVAL); - return oap; -}; - -int osc_queue_async_io(const struct lu_env *env, - struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie, - int cmd, obd_off off, int count, - obd_flag brw_flags, enum async_flags async_flags) +int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, + struct lov_stripe_md *lsm, struct lov_oinfo *loi, + struct osc_async_page *oap, int cmd, int off, + int count, obd_flag brw_flags, enum async_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; - struct osc_async_page *oap; int rc = 0; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + if (oap->oap_magic != OAP_MAGIC) + RETURN(-EINVAL); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -2929,7 +3021,7 @@ int osc_queue_async_io(const struct lu_env *env, qid[USRQUOTA] = attr.cat_uid; qid[GRPQUOTA] = attr.cat_gid; if (rc == 0 && - lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA) + osc_quota_chkdq(cli, qid) == NO_QUOTA) rc = -EDQUOT; if (rc) RETURN(rc); @@ -2946,7 +3038,7 @@ int osc_queue_async_io(const struct lu_env *env, oap->oap_count = count; oap->oap_brw_flags = brw_flags; /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (libcfs_memory_pressure_get()) + if (cfs_memory_pressure_get()) oap->oap_brw_flags |= OBD_BRW_MEMALLOC; cfs_spin_lock(&oap->oap_lock); oap->oap_async_flags = async_flags; @@ -2960,13 +3052,19 @@ int osc_queue_async_io(const struct lu_env *env, } } - osc_oap_to_pending(oap); - loi_list_maint(cli, loi); - LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page, cmd); - osc_check_rpcs(env, cli); + osc_oap_to_pending(oap); + loi_list_maint(cli, loi); + if (!osc_max_rpc_in_flight(cli, loi) && + lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { + LASSERT(cli->cl_writeback_work != NULL); + rc = ptlrpcd_queue_work(cli->cl_writeback_work); + + CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n", + cli, rc); + } client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(0); @@ -3016,19 +3114,16 @@ int osc_set_async_flags_base(struct client_obd *cli, RETURN(0); } -int osc_teardown_async_page(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie) +int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, + struct lov_oinfo *loi, struct osc_async_page *oap) { struct client_obd *cli = &exp->exp_obd->u.cli; struct loi_oap_pages *lop; - struct osc_async_page *oap; int rc = 0; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + if (oap->oap_magic != OAP_MAGIC) + RETURN(-EINVAL); if (loi == NULL) loi = lsm->lsm_oinfo[0]; @@ -3064,11 +3159,11 @@ out: RETURN(rc); } -static void osc_set_lock_data_with_check(struct ldlm_lock *lock, - struct ldlm_enqueue_info *einfo, - int flags) +static int osc_set_lock_data_with_check(struct ldlm_lock *lock, + struct ldlm_enqueue_info *einfo) { void *data = einfo->ei_cbdata; + int set = 0; LASSERT(lock != NULL); LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl); @@ -3078,24 +3173,31 @@ static void osc_set_lock_data_with_check(struct ldlm_lock *lock, lock_res_and_lock(lock); cfs_spin_lock(&osc_ast_guard); - LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data); - lock->l_ast_data = data; + + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; + if (lock->l_ast_data == data) + set = 1; + cfs_spin_unlock(&osc_ast_guard); unlock_res_and_lock(lock); + + return set; } -static void osc_set_data_with_check(struct lustre_handle *lockh, - struct ldlm_enqueue_info *einfo, - int flags) +static int osc_set_data_with_check(struct lustre_handle *lockh, + struct ldlm_enqueue_info *einfo) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); + int set = 0; if (lock != NULL) { - osc_set_lock_data_with_check(lock, einfo, flags); + set = osc_set_lock_data_with_check(lock, einfo); LDLM_LOCK_PUT(lock); } else CERROR("lockh %p, data %p - client evicted?\n", lockh, einfo->ei_cbdata); + return set; } static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, @@ -3131,7 +3233,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, obd_enqueue_update_f upcall, void *cookie, - int *flags, int rc) + int *flags, int agl, int rc) { int intent = *flags & LDLM_FL_HAS_INTENT; ENTRY; @@ -3149,7 +3251,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, } } - if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) { + if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || + (rc == 0)) { *flags |= LDLM_FL_LVB_READY; CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); @@ -3167,6 +3270,9 @@ static int osc_enqueue_interpret(const struct lu_env *env, struct ldlm_lock *lock; struct lustre_handle handle; __u32 mode; + struct ost_lvb *lvb; + __u32 lvb_len; + int *flags = aa->oa_flags; /* Make a local copy of a lock handle and a mode, because aa->oa_* * might be freed anytime after lock upcall has been called. */ @@ -3183,13 +3289,23 @@ static int osc_enqueue_interpret(const struct lu_env *env, * osc_enqueue_fini(). */ ldlm_lock_addref(&handle, mode); + /* Let CP AST to grant the lock first. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + + if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { + lvb = NULL; + lvb_len = 0; + } else { + lvb = aa->oa_lvb; + lvb_len = sizeof(*aa->oa_lvb); + } + /* Complete obtaining the lock procedure. */ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, aa->oa_flags, aa->oa_lvb, - sizeof(*aa->oa_lvb), &handle, rc); + mode, flags, lvb, lvb_len, &handle, rc); /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, - aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc); + rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, + flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); @@ -3213,8 +3329,9 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, struct lov_oinfo *loi, int flags, struct ost_lvb *lvb, __u32 mode, int rc) { + struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); + if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); __u64 tmp; LASSERT(lock != NULL); @@ -3235,13 +3352,21 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, lock->l_policy_data.l_extent.end); } ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) { + LASSERT(lock != NULL); loi->loi_lvb = *lvb; + ldlm_lock_allow_match(lock); CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); rc = ELDLM_OK; } + + if (lock != NULL) { + if (rc != ELDLM_OK) + ldlm_lock_fail_match(lock); + + LDLM_LOCK_PUT(lock); + } } EXPORT_SYMBOL(osc_update_enqueue); @@ -3260,11 +3385,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, obd_enqueue_update_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, - struct ptlrpc_request_set *rqset, int async) + struct ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; + int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); ldlm_mode_t mode; int rc; ENTRY; @@ -3298,19 +3424,24 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, mode = einfo->ei_mode; if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - mode = ldlm_lock_match(obd->obd_namespace, - *flags | LDLM_FL_LVB_READY, res_id, + mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, einfo->ei_type, policy, mode, lockh, 0); if (mode) { struct ldlm_lock *matched = ldlm_handle2lock(lockh); - if (matched->l_ast_data == NULL || - matched->l_ast_data == einfo->ei_cbdata) { + if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) { + /* For AGL, if enqueue RPC is sent but the lock is not + * granted, then skip to process this strpe. + * Return -ECANCELED to tell the caller. */ + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; /* addref the lock only if not async requests and PW * lock is matched whereas we asked for PR. */ if (!rqset && einfo->ei_mode != mode) ldlm_lock_addref(lockh, LCK_PR); - osc_set_lock_data_with_check(matched, einfo, *flags); if (intent) { /* I would like to be able to ASSERT here that * rss <= kms, but I can't, for reasons which @@ -3320,16 +3451,17 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, /* We already have a lock, and it's referenced */ (*upcall)(cookie, ELDLM_OK); - /* For async requests, decref the lock. */ if (einfo->ei_mode != mode) ldlm_lock_decref(lockh, LCK_PW); else if (rqset) + /* For async requests, decref the lock. */ ldlm_lock_decref(lockh, einfo->ei_mode); LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); - } else + } else { ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); + LDLM_LOCK_PUT(matched); + } } no_match: @@ -3341,8 +3473,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(-ENOMEM); rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0); - if (rc) + if (rc) { + ptlrpc_request_free(req); RETURN(rc); + } req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, sizeof *lvb); @@ -3366,11 +3500,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, aa->oa_cookie = cookie; aa->oa_lvb = lvb; aa->oa_lockh = lockh; + aa->oa_agl = !!agl; req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); else ptlrpc_set_add_req(rqset, req); } else if (intent) { @@ -3379,7 +3514,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc); + rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); if (intent) ptlrpc_req_finished(req); @@ -3401,7 +3536,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh, - rqset, rqset != NULL); + rqset, rqset != NULL, 0); RETURN(rc); } @@ -3433,8 +3568,13 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, rc = ldlm_lock_match(obd->obd_namespace, lflags, res_id, type, policy, rc, lockh, unref); if (rc) { - if (data != NULL) - osc_set_data_with_check(lockh, data, lflags); + if (data != NULL) { + if (!osc_set_data_with_check(lockh, data)) { + if (!(lflags & LDLM_FL_TEST_LOCK)) + ldlm_lock_decref(lockh, rc); + RETURN(0); + } + } if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) { ldlm_lock_addref(lockh, LCK_PR); ldlm_lock_decref(lockh, LCK_PW); @@ -3530,8 +3670,8 @@ static int osc_statfs_interpret(const struct lu_env *env, * avail < ~0.1% max max = avail + used * 1025 * avail < avail + used used = blocks - free * 1024 * avail < used - * 1024 * avail < blocks - free - * avail < ((blocks - free) >> 10) + * 1024 * avail < blocks - free + * avail < ((blocks - free) >> 10) * * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to * lose that amount of space so in those cases we report no space left @@ -3541,8 +3681,15 @@ static int osc_statfs_interpret(const struct lu_env *env, ((msfs->os_ffree < 32) || (msfs->os_bavail < used)))) cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC; else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) && - (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1)))) - cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC; + (msfs->os_ffree > 64) && + (msfs->os_bavail > (used << 1)))) { + cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC | + OSCC_FLAG_NOSPC_BLK); + } + + if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) && + (msfs->os_bavail < used))) + cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK; cfs_spin_unlock(&cli->cl_oscc.oscc_lock); @@ -3783,7 +3930,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, GOTO(out, err); case OBD_IOC_CLIENT_RECOVER: err = ptlrpc_recover_import(obd->u.cli.cl_import, - data->ioc_inlbuf1); + data->ioc_inlbuf1, 0); if (err > 0) err = 0; GOTO(out, err); @@ -3792,8 +3939,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, data->ioc_offset); GOTO(out, err); case OBD_IOC_POLL_QUOTACHECK: - err = lquota_poll_check(quota_interface, exp, - (struct if_quotacheck *)karg); + err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); GOTO(out, err); case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); @@ -4042,7 +4188,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); aa = ptlrpc_req_async_args(req); - OBD_ALLOC_PTR(oa); + OBDO_ALLOC(oa); if (!oa) { ptlrpc_req_finished(req); RETURN(-ENOMEM); @@ -4058,7 +4204,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, ptlrpc_set_add_req(set, req); ptlrpc_check_set(NULL, set); } else - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); RETURN(0); } @@ -4113,7 +4259,7 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, LASSERT(olg == &obd->obd_olg); - cfs_mutex_down(&olg->olg_cat_processing); + cfs_mutex_lock(&olg->olg_cat_processing); rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid); if (rc) { CERROR("rc: %d\n", rc); @@ -4137,7 +4283,7 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, } out: - cfs_mutex_up(&olg->olg_cat_processing); + cfs_mutex_unlock(&olg->olg_cat_processing); return rc; } @@ -4290,7 +4436,8 @@ static int osc_import_event(struct obd_device *obd, struct osc_creator *oscc = &obd->u.cli.cl_oscc; cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; + oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC | + OSCC_FLAG_NOSPC_BLK); cfs_spin_unlock(&oscc->oscc_lock); } rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); @@ -4309,6 +4456,14 @@ static int osc_import_event(struct obd_device *obd, rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); break; } + case IMP_EVENT_DEACTIVATE: { + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL); + break; + } + case IMP_EVENT_ACTIVATE: { + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL); + break; + } default: CERROR("Unknown import event %d\n", event); LBUG(); @@ -4316,8 +4471,35 @@ static int osc_import_event(struct obd_device *obd, RETURN(rc); } +/** + * Determine whether the lock can be canceled before replaying the lock + * during recovery, see bug16774 for detailed information. + * + * \retval zero the lock can't be canceled + * \retval other ok to cancel + */ +static int osc_cancel_for_recovery(struct ldlm_lock *lock) +{ + check_res_locked(lock->l_resource); + + /* + * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR. + * + * XXX as a future improvement, we can also cancel unused write lock + * if it doesn't have dirty data and active mmaps. + */ + if (lock->l_resource->lr_type == LDLM_EXTENT && + (lock->l_granted_mode == LCK_PR || + lock->l_granted_mode == LCK_CR) && + (osc_dlm_lock_pageref(lock) == 0)) + RETURN(1); + + RETURN(0); +} + int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { + struct client_obd *cli = &obd->u.cli; int rc; ENTRY; @@ -4327,11 +4509,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(rc); rc = client_obd_setup(obd, lcfg); - if (rc) { - ptlrpcd_decref(); - } else { + if (rc == 0) { + void *handler; + handler = ptlrpcd_alloc_work(cli->cl_import, + brw_queue_work, cli); + if (!IS_ERR(handler)) + cli->cl_writeback_work = handler; + else + rc = PTR_ERR(handler); + } + + if (rc == 0) { struct lprocfs_static_vars lvars = { 0 }; - struct client_obd *cli = &obd->u.cli; cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; lprocfs_osc_init_vars(&lvars); @@ -4353,9 +4542,12 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_add_rqs_to_pool); CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list); - cfs_sema_init(&cli->cl_grant_sem, 1); + + ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); } + if (rc) + ptlrpcd_decref(); RETURN(rc); } @@ -4377,23 +4569,24 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) break; } case OBD_CLEANUP_EXPORTS: { - /* If we set up but never connected, the - client import will not have been cleaned. */ - if (obd->u.cli.cl_import) { - struct obd_import *imp; - cfs_down_write(&obd->u.cli.cl_sem); - imp = obd->u.cli.cl_import; - CDEBUG(D_CONFIG, "%s: client import never connected\n", - obd->obd_name); - ptlrpc_invalidate_import(imp); - if (imp->imp_rq_pool) { - ptlrpc_free_rq_pool(imp->imp_rq_pool); - imp->imp_rq_pool = NULL; - } - class_destroy_import(imp); - cfs_up_write(&obd->u.cli.cl_sem); - obd->u.cli.cl_import = NULL; + struct client_obd *cli = &obd->u.cli; + /* LU-464 + * for echo client, export may be on zombie list, wait for + * zombie thread to cull it, because cli.cl_import will be + * cleared in client_disconnect_export(): + * class_export_destroy() -> obd_cleanup() -> + * echo_device_free() -> echo_client_cleanup() -> + * obd_disconnect() -> osc_disconnect() -> + * client_disconnect_export() + */ + obd_zombie_barrier(); + if (cli->cl_writeback_work) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; } + obd_cleanup_client_import(obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); @@ -4408,11 +4601,9 @@ int osc_cleanup(struct obd_device *obd) int rc; ENTRY; - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); /* free memory of osc quota cache */ - lquota_cleanup(quota_interface, obd); + osc_quota_cleanup(obd); rc = client_obd_cleanup(obd); @@ -4481,6 +4672,9 @@ struct obd_ops osc_obd_ops = { .o_llog_init = osc_llog_init, .o_llog_finish = osc_llog_finish, .o_process_config = osc_process_config, + .o_quotactl = osc_quotactl, + .o_quotacheck = osc_quotacheck, + .o_quota_adjust_qunit = osc_quota_adjust_qunit, }; extern struct lu_kmem_descr osc_caches[]; @@ -4496,22 +4690,16 @@ int __init osc_init(void) /* print an address of _any_ initialized kernel symbol from this * module, to allow debugging with gdb that doesn't support data * symbols from modules.*/ - CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches); + CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); rc = lu_kmem_init(osc_caches); lprocfs_osc_init_vars(&lvars); - cfs_request_module("lquota"); - quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface); - lquota_init(quota_interface); - init_obd_quota_ops(quota_interface, &osc_obd_ops); - + osc_quota_init(); rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, LUSTRE_OSC_NAME, &osc_device_type); if (rc) { - if (quota_interface) - PORTAL_SYMBOL_PUT(osc_quota_interface); lu_kmem_fini(osc_caches); RETURN(rc); } @@ -4533,10 +4721,7 @@ static void /*__exit*/ osc_exit(void) { lu_device_type_fini(&osc_device_type); - lquota_exit(quota_interface); - if (quota_interface) - PORTAL_SYMBOL_PUT(osc_quota_interface); - + osc_quota_exit(); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); }