X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=4b5f2c36c59271e1a92510013ebd2d6a10ec4eea;hb=421d8e98f309870475aadc05aa81a60e9acd8e65;hp=2b5f8a8ea8fe9e294e001b681090e2c7867898b2;hpb=860404143dafbf696b4d38bb9135eabfd471cc36;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 2b5f8a8..4b5f2c3 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -30,6 +30,9 @@ * Use is subject to license terms. */ /* + * Copyright (c) 2011 Whamcloud, Inc. + */ +/* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ @@ -109,6 +112,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmm, int lmm_bytes) { int lsm_size; + struct obd_import *imp = class_exp2cliimp(exp); ENTRY; if (lmm != NULL) { @@ -156,7 +160,11 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq); } - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + if (imp != NULL && + (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) + (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; + else + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; RETURN(lsm_size); } @@ -564,16 +572,40 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up, oinfo, rqset); } -static int osc_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_size start, obd_size end, - void *capa) +static int osc_sync_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *arg, int rc) +{ + struct osc_async_args *aa = arg; + struct ost_body *body; + ENTRY; + + if (rc) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + CERROR ("can't unpack ost_body\n"); + GOTO(out, rc = -EPROTO); + } + + *aa->aa_oi->oi_oa = body->oa; +out: + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + RETURN(rc); +} + +static int osc_sync(struct obd_export *exp, struct obd_info *oinfo, + obd_size start, obd_size end, + struct ptlrpc_request_set *set) { struct ptlrpc_request *req; struct ost_body *body; + struct osc_async_args *aa; int rc; ENTRY; - if (!oa) { + if (!oinfo->oi_oa) { CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } @@ -582,7 +614,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, capa); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); if (rc) { ptlrpc_request_free(req); @@ -592,28 +624,21 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, /* overload the size and blocks fields in the oa with start/end */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&body->oa, oa); + lustre_set_wire_obdo(&body->oa, oinfo->oi_oa); body->oa.o_size = start; body->oa.o_blocks = end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); - osc_pack_capa(req, body, capa); + osc_pack_capa(req, body, oinfo->oi_capa); ptlrpc_request_set_replen(req); + req->rq_interpret_reply = osc_sync_interpret; - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); - - lustre_get_wire_obdo(oa, &body->oa); + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + ptlrpc_set_add_req(set, req); + RETURN (0); } /* Find and cancel locally locks matched by @mode in the resource found by @@ -864,17 +889,6 @@ static unsigned long rpcs_in_flight(struct client_obd *cli) return cli->cl_r_in_flight + cli->cl_w_in_flight; } -int osc_wake_sync_fs(struct client_obd *cli) -{ - ENTRY; - if (cfs_list_empty(&cli->cl_loi_sync_fs_list) && - cli->cl_sf_wait.started) { - cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0); - cli->cl_sf_wait.started = 0; - } - RETURN(0); -} - /* caller must hold loi_list_lock */ void osc_wake_cache_waiters(struct client_obd *cli) { @@ -1111,7 +1125,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) CWARN("%s: available grant < 0, the OSS is probably not running" " with patch from bug20278 (%ld) \n", cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant); - /* workaround for 1.6 servers which do not have + /* workaround for 1.6 servers which do not have * the patch from bug20278 */ cli->cl_avail_grant = ocd->ocd_grant; } @@ -1183,7 +1197,7 @@ static int check_write_rcs(struct ptlrpc_request *req, /* return error if any niobuf was in error */ for (i = 0; i < niocount; i++) { - if (remote_rcs[i] < 0) + if ((int)remote_rcs[i] < 0) return(remote_rcs[i]); if (remote_rcs[i] != 0) { @@ -1259,7 +1273,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, struct brw_page **pga, struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve) + struct obd_capa *ocapa, int reserve, + int resend) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1334,11 +1349,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; + int poff = pg->off & ~CFS_PAGE_MASK; LASSERT(pg->count > 0); - LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE, - "i: %d pg: %p off: "LPU64", count: %u\n", i, pg, - pg->off, pg->count); + /* make sure there is no gap in the middle of page array */ + LASSERTF(page_count == 1 || + (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) && + ergo(i > 0 && i < page_count - 1, + poff == 0 && pg->count == CFS_PAGE_SIZE) && + ergo(i == page_count - 1, poff == 0)), + "i: %d/%d pg: %p off: "LPU64", count: %u\n", + i, page_count, pg, pg->off, pg->count); #ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 @@ -1354,8 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK, - pg->count); + ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { @@ -1375,6 +1395,14 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount)); osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0); + if (resend) { + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { + body->oa.o_valid |= OBD_MD_FLFLAGS; + body->oa.o_flags = 0; + } + body->oa.o_flags |= OBD_FL_RECOV_RESEND; + } + if (osc_should_shrink_grant(cli)) osc_shrink_grant_local(cli, &body->oa); @@ -1610,7 +1638,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (server_cksum == ~0 && rc > 0) { CERROR("Protocol error: server %s set the 'checksum' " "bit, but didn't send a checksum. Not fatal, " - "but please notify on http://bugzilla.lustre.org/\n", + "but please notify on http://bugs.whamcloud.com/\n", libcfs_nid2str(peer->nid)); } else if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " @@ -1677,7 +1705,7 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa, restart_bulk: rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, - page_count, pga, &req, ocapa, 0); + page_count, pga, &req, ocapa, 0, resends); if (rc != 0) return (rc); @@ -1694,7 +1722,7 @@ restart_bulk: ptlrpc_req_finished(req); if (osc_recoverable_error(rc)) { resends++; - if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) { + if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { CERROR("too many resend retries, returning error\n"); RETURN(-EIO); } @@ -1718,7 +1746,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request, int rc = 0; ENTRY; - if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) { + if (!client_should_resend(aa->aa_resends, aa->aa_cli)) { CERROR("too many resent retries, returning error\n"); RETURN(-EIO); } @@ -1730,7 +1758,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request, aa->aa_cli, aa->aa_oa, NULL /* lsm unused by osc currently */, aa->aa_page_count, aa->aa_ppga, - &new_req, aa->aa_ocapa, 0); + &new_req, aa->aa_ocapa, 0, 1); if (rc) RETURN(rc); @@ -1943,24 +1971,6 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, osc_release_write_grant(cli, &oap->oap_brw_page, sent); } -static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop) -{ - struct osc_async_page *oap; - ENTRY; - - if (cfs_list_empty(&lop->lop_urgent)) - RETURN(0); - - oap = cfs_list_entry(lop->lop_urgent.next, - struct osc_async_page, oap_urgent_item); - - if (oap->oap_async_flags & ASYNC_SYNCFS) { - CDEBUG(D_CACHE, "syncfs request forcing RPC\n"); - RETURN(1); - } - - RETURN(0); -} /* This maintains the lists of pending pages to read/write for a given object * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint() @@ -2049,19 +2059,10 @@ void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi) on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0); on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1); } else { - if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) { - on_list(&loi->loi_sync_fs_item, - &cli->cl_loi_sync_fs_list, - loi->loi_write_lop.lop_num_pending); - } else { - on_list(&loi->loi_hp_ready_item, - &cli->cl_loi_hp_ready_list, 0); - on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, - lop_makes_rpc(cli, &loi->loi_write_lop, - OBD_BRW_WRITE)|| - lop_makes_rpc(cli, &loi->loi_read_lop, - OBD_BRW_READ)); - } + on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0); + on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, + lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)|| + lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)); } on_list(&loi->loi_write_item, &cli->cl_loi_write_list, @@ -2149,34 +2150,6 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, ar->ar_force_sync = 0; } -static int osc_add_to_lop_urgent(struct loi_oap_pages *lop, - struct osc_async_page *oap, - obd_flag async_flags) -{ - - /* If true, then already present in lop urgent */ - if (!cfs_list_empty(&oap->oap_urgent_item)) { - CWARN("Request to add duplicate oap_urgent for flag = %d\n", - oap->oap_async_flags); - return 1; - } - - /* item from sync_fs, to avoid duplicates check the existing flags */ - if (async_flags & ASYNC_SYNCFS) { - cfs_list_add_tail(&oap->oap_urgent_item, - &lop->lop_urgent); - return 0; - } - - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent); - else if (oap->oap_async_flags & ASYNC_URGENT || - async_flags & ASYNC_URGENT) - cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent); - - return 0; -} - void osc_oap_to_pending(struct osc_async_page *oap) { struct loi_oap_pages *lop; @@ -2186,7 +2159,10 @@ void osc_oap_to_pending(struct osc_async_page *oap) else lop = &oap->oap_loi->loi_read_lop; - osc_add_to_lop_urgent(lop, oap, 0); + if (oap->oap_async_flags & ASYNC_HP) + cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent); + else if (oap->oap_async_flags & ASYNC_URGENT) + cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent); cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending); lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1); } @@ -2302,7 +2278,6 @@ static int brw_interpret(const struct lu_env *env, osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); } osc_wake_cache_waiters(cli); - osc_wake_sync_fs(cli); osc_check_rpcs(env, cli); client_obd_list_unlock(&cli->cl_loi_list_lock); if (!async) @@ -2387,7 +2362,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr.cra_capa, 1); + pga, &req, crattr.cra_capa, 1, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); GOTO(out, req = ERR_PTR(rc)); @@ -2464,26 +2439,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_brw_async_args *aa; const struct obd_async_page_ops *ops; CFS_LIST_HEAD(rpc_list); - CFS_LIST_HEAD(tmp_list); - unsigned int ending_offset; - unsigned starting_offset = 0; int srvlock = 0, mem_tight = 0; struct cl_object *clob = NULL; + obd_off starting_offset = OBD_OBJECT_EOF; + unsigned int ending_offset; + int starting_page_off = 0; ENTRY; /* ASYNC_HP pages first. At present, when the lock the pages is * to be canceled, the pages covered by the lock will be sent out * with ASYNC_HP. We have to send out them as soon as possible. */ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) { - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &tmp_list); - else - cfs_list_move_tail(&oap->oap_pending_item, &tmp_list); + if (oap->oap_async_flags & ASYNC_HP) + cfs_list_move(&oap->oap_pending_item, &lop->lop_pending); if (++page_count >= cli->cl_max_pages_per_rpc) break; } - - cfs_list_splice(&tmp_list, &lop->lop_pending); page_count = 0; /* first we find the pages we're allowed to work with */ @@ -2512,7 +2483,13 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* If there is a gap at the start of this page, it can't merge * with any previous page, so we'll hand the network a * "fragmented" page array that it can't transfer in 1 RDMA */ - if (page_count != 0 && oap->oap_page_off != 0) + if (oap->oap_obj_off < starting_offset) { + if (starting_page_off != 0) + break; + + starting_page_off = oap->oap_page_off; + starting_offset = oap->oap_obj_off + starting_page_off; + } else if (oap->oap_page_off != 0) break; /* in llite being 'ready' equates to the page being locked @@ -2590,10 +2567,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, lop_update_pending(cli, lop, cmd, -1); cfs_list_del_init(&oap->oap_urgent_item); - if (page_count == 0) - starting_offset = (oap->oap_obj_off+oap->oap_page_off) & - (PTLRPC_MAX_BRW_SIZE - 1); - /* ask the caller for the size of the io as the rpc leaves. */ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { oap->oap_count = @@ -2611,20 +2584,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* now put the page back in our accounting */ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (page_count++ == 0) + srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); + if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) mem_tight = 1; - if (page_count == 0) - srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); - if (++page_count >= cli->cl_max_pages_per_rpc) - break; /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads * have the same alignment as the initial writes that allocated * extents on the server. */ - ending_offset = (oap->oap_obj_off + oap->oap_page_off + - oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1); - if (ending_offset == 0) + ending_offset = oap->oap_obj_off + oap->oap_page_off + + oap->oap_count; + if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1))) + break; + + if (page_count >= cli->cl_max_pages_per_rpc) break; /* If there is a gap at the end of this page, it can't merge @@ -2635,7 +2610,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, } osc_wake_cache_waiters(cli); - osc_wake_sync_fs(cli); + loi_list_maint(cli, loi); client_obd_list_unlock(&cli->cl_loi_list_lock); @@ -2658,6 +2633,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, aa = ptlrpc_req_async_args(req); + starting_offset &= PTLRPC_MAX_BRW_SIZE - 1; if (cmd == OBD_BRW_READ) { lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); @@ -2728,9 +2704,6 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli) if (!cfs_list_empty(&cli->cl_loi_ready_list)) RETURN(cfs_list_entry(cli->cl_loi_ready_list.next, struct lov_oinfo, loi_ready_item)); - if (!cfs_list_empty(&cli->cl_loi_sync_fs_list)) - RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next, - struct lov_oinfo, loi_sync_fs_item)); /* then if we have cache waiters, return all objects with queued * writes. This is especially important when many small files @@ -2821,7 +2794,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (rc > 0) race_counter = 0; - else + else if (rc == 0) race_counter++; } if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) { @@ -2832,7 +2805,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (rc > 0) race_counter = 0; - else + else if (rc == 0) race_counter++; } @@ -2846,8 +2819,6 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) cfs_list_del_init(&loi->loi_write_item); if (!cfs_list_empty(&loi->loi_read_item)) cfs_list_del_init(&loi->loi_read_item); - if (!cfs_list_empty(&loi->loi_sync_fs_item)) - cfs_list_del_init(&loi->loi_sync_fs_item); loi_list_maint(cli, loi); @@ -2909,7 +2880,7 @@ static int osc_enter_cache(const struct lu_env *env, struct osc_async_page *oap) { struct osc_cache_waiter ocw; - struct l_wait_info lwi = { 0 }; + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); ENTRY; @@ -2920,8 +2891,9 @@ static int osc_enter_cache(const struct lu_env *env, /* force the caller to try sync io. this can jump the list * of queued writes and create a discontiguous rpc stream */ - if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync || - loi->loi_ar.ar_force_sync) + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || + cli->cl_dirty_max < CFS_PAGE_SIZE || + cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) RETURN(-EDQUOT); /* Hopefully normal case - cache space and write credits available */ @@ -3001,28 +2973,17 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, RETURN(0); } -struct osc_async_page *oap_from_cookie(void *cookie) -{ - struct osc_async_page *oap = cookie; - if (oap->oap_magic != OAP_MAGIC) - return ERR_PTR(-EINVAL); - return oap; -}; - -int osc_queue_async_io(const struct lu_env *env, - struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie, - int cmd, obd_off off, int count, - obd_flag brw_flags, enum async_flags async_flags) +int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, + struct lov_stripe_md *lsm, struct lov_oinfo *loi, + struct osc_async_page *oap, int cmd, int off, + int count, obd_flag brw_flags, enum async_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; - struct osc_async_page *oap; int rc = 0; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + if (oap->oap_magic != OAP_MAGIC) + RETURN(-EINVAL); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -3112,27 +3073,16 @@ int osc_set_async_flags_base(struct client_obd *cli, if ((oap->oap_async_flags & async_flags) == async_flags) RETURN(0); - /* XXX: This introduces a tiny insignificant race for the case if this - * loi already had other urgent items. - */ - if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) && - cfs_list_empty(&oap->oap_rpc_item) && - cfs_list_empty(&oap->oap_urgent_item)) { - osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS); - flags |= ASYNC_SYNCFS; - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= flags; - cfs_spin_unlock(&oap->oap_lock); - loi_list_maint(cli, loi); - RETURN(0); - } - if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY)) flags |= ASYNC_READY; if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) && cfs_list_empty(&oap->oap_rpc_item)) { - osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT); + if (oap->oap_async_flags & ASYNC_HP) + cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent); + else + cfs_list_add_tail(&oap->oap_urgent_item, + &lop->lop_urgent); flags |= ASYNC_URGENT; loi_list_maint(cli, loi); } @@ -3145,19 +3095,16 @@ int osc_set_async_flags_base(struct client_obd *cli, RETURN(0); } -int osc_teardown_async_page(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie) +int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, + struct lov_oinfo *loi, struct osc_async_page *oap) { struct client_obd *cli = &exp->exp_obd->u.cli; struct loi_oap_pages *lop; - struct osc_async_page *oap; int rc = 0; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + if (oap->oap_magic != OAP_MAGIC) + RETURN(-EINVAL); if (loi == NULL) loi = lsm->lsm_oinfo[0]; @@ -3179,8 +3126,7 @@ int osc_teardown_async_page(struct obd_export *exp, if (!cfs_list_empty(&oap->oap_urgent_item)) { cfs_list_del_init(&oap->oap_urgent_item); cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP | - ASYNC_SYNCFS); + oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP); cfs_spin_unlock(&oap->oap_lock); } if (!cfs_list_empty(&oap->oap_pending_item)) { @@ -3194,11 +3140,11 @@ out: RETURN(rc); } -static void osc_set_lock_data_with_check(struct ldlm_lock *lock, - struct ldlm_enqueue_info *einfo, - int flags) +static int osc_set_lock_data_with_check(struct ldlm_lock *lock, + struct ldlm_enqueue_info *einfo) { void *data = einfo->ei_cbdata; + int set = 0; LASSERT(lock != NULL); LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl); @@ -3208,24 +3154,31 @@ static void osc_set_lock_data_with_check(struct ldlm_lock *lock, lock_res_and_lock(lock); cfs_spin_lock(&osc_ast_guard); - LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data); - lock->l_ast_data = data; + + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; + if (lock->l_ast_data == data) + set = 1; + cfs_spin_unlock(&osc_ast_guard); unlock_res_and_lock(lock); + + return set; } -static void osc_set_data_with_check(struct lustre_handle *lockh, - struct ldlm_enqueue_info *einfo, - int flags) +static int osc_set_data_with_check(struct lustre_handle *lockh, + struct ldlm_enqueue_info *einfo) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); + int set = 0; if (lock != NULL) { - osc_set_lock_data_with_check(lock, einfo, flags); + set = osc_set_lock_data_with_check(lock, einfo); LDLM_LOCK_PUT(lock); } else CERROR("lockh %p, data %p - client evicted?\n", lockh, einfo->ei_cbdata); + return set; } static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, @@ -3437,13 +3390,11 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (mode) { struct ldlm_lock *matched = ldlm_handle2lock(lockh); - if (matched->l_ast_data == NULL || - matched->l_ast_data == einfo->ei_cbdata) { + if (osc_set_lock_data_with_check(matched, einfo)) { /* addref the lock only if not async requests and PW * lock is matched whereas we asked for PR. */ if (!rqset && einfo->ei_mode != mode) ldlm_lock_addref(lockh, LCK_PR); - osc_set_lock_data_with_check(matched, einfo, *flags); if (intent) { /* I would like to be able to ASSERT here that * rss <= kms, but I can't, for reasons which @@ -3568,8 +3519,13 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, rc = ldlm_lock_match(obd->obd_namespace, lflags, res_id, type, policy, rc, lockh, unref); if (rc) { - if (data != NULL) - osc_set_data_with_check(lockh, data, lflags); + if (data != NULL) { + if (!osc_set_data_with_check(lockh, data)) { + if (!(lflags & LDLM_FL_TEST_LOCK)) + ldlm_lock_decref(lockh, rc); + RETURN(0); + } + } if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) { ldlm_lock_addref(lockh, LCK_PR); ldlm_lock_decref(lockh, LCK_PW); @@ -3665,8 +3621,8 @@ static int osc_statfs_interpret(const struct lu_env *env, * avail < ~0.1% max max = avail + used * 1025 * avail < avail + used used = blocks - free * 1024 * avail < used - * 1024 * avail < blocks - free - * avail < ((blocks - free) >> 10) + * 1024 * avail < blocks - free + * avail < ((blocks - free) >> 10) * * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to * lose that amount of space so in those cases we report no space left @@ -4444,6 +4400,14 @@ static int osc_import_event(struct obd_device *obd, rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); break; } + case IMP_EVENT_DEACTIVATE: { + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL); + break; + } + case IMP_EVENT_ACTIVATE: { + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL); + break; + } default: CERROR("Unknown import event %d\n", event); LBUG(); @@ -4540,23 +4504,17 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) break; } case OBD_CLEANUP_EXPORTS: { - /* If we set up but never connected, the - client import will not have been cleaned. */ - if (obd->u.cli.cl_import) { - struct obd_import *imp; - cfs_down_write(&obd->u.cli.cl_sem); - imp = obd->u.cli.cl_import; - CDEBUG(D_CONFIG, "%s: client import never connected\n", - obd->obd_name); - ptlrpc_invalidate_import(imp); - if (imp->imp_rq_pool) { - ptlrpc_free_rq_pool(imp->imp_rq_pool); - imp->imp_rq_pool = NULL; - } - class_destroy_import(imp); - cfs_up_write(&obd->u.cli.cl_sem); - obd->u.cli.cl_import = NULL; - } + /* LU-464 + * for echo client, export may be on zombie list, wait for + * zombie thread to cull it, because cli.cl_import will be + * cleared in client_disconnect_export(): + * class_export_destroy() -> obd_cleanup() -> + * echo_device_free() -> echo_client_cleanup() -> + * obd_disconnect() -> osc_disconnect() -> + * client_disconnect_export() + */ + obd_zombie_barrier(); + obd_cleanup_client_import(obd); rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); @@ -4602,46 +4560,6 @@ int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) return(rc); } -static int osc_sync_fs(struct obd_export *exp, struct obd_info *oinfo, - int wait) -{ - struct obd_device *obd = class_exp2obd(exp); - struct client_obd *cli; - struct lov_oinfo *loi; - struct lov_oinfo *tloi; - struct osc_async_page *oap; - struct osc_async_page *toap; - struct loi_oap_pages *lop; - struct lu_env *env; - int refcheck; - int rc = 0; - ENTRY; - - env = cl_env_get(&refcheck); - if (IS_ERR(env)) - RETURN(PTR_ERR(env)); - - cli = &obd->u.cli; - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_sf_wait.sfw_oi = oinfo; - cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up; - cli->cl_sf_wait.started = 1; - /* creating cl_loi_sync_fs list */ - cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list, - loi_write_item) { - lop = &loi->loi_write_lop; - cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending, - oap_pending_item) - osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS); - } - osc_check_rpcs(env, cli); - osc_wake_sync_fs(cli); - client_obd_list_unlock(&cli->cl_loi_list_lock); - cl_env_put(env, &refcheck); - - RETURN(rc); -} - static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) { return osc_process_config_base(obd, buf); @@ -4684,7 +4602,6 @@ struct obd_ops osc_obd_ops = { .o_llog_init = osc_llog_init, .o_llog_finish = osc_llog_finish, .o_process_config = osc_process_config, - .o_sync_fs = osc_sync_fs, }; extern struct lu_kmem_descr osc_caches[];