X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=4092d93eb310423f5cb179d2520ba497a94300b8;hp=20482785127b9b8891137d792d4e17112f4f57b9;hb=2d5f51b92f6eb45e9fd6ae57f8d9ec349049ea14;hpb=72e1ce04a8b8f6887ff3df620f20755be0d244d8 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 2048278..4092d93 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -58,6 +58,7 @@ #include #include #include +#include #include "osc_internal.h" #include "osc_cl_internal.h" @@ -68,97 +69,95 @@ int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, - struct lov_stripe_md *lsm) + struct lov_stripe_md *lsm) { - int lmm_size; - ENTRY; + int lmm_size; + ENTRY; - lmm_size = sizeof(**lmmp); - if (!lmmp) - RETURN(lmm_size); + lmm_size = sizeof(**lmmp); + if (lmmp == NULL) + RETURN(lmm_size); - if (*lmmp && !lsm) { - OBD_FREE(*lmmp, lmm_size); - *lmmp = NULL; - RETURN(0); - } + if (*lmmp != NULL && lsm == NULL) { + OBD_FREE(*lmmp, lmm_size); + *lmmp = NULL; + RETURN(0); + } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) { + RETURN(-EBADF); + } - if (!*lmmp) { - OBD_ALLOC(*lmmp, lmm_size); - if (!*lmmp) - RETURN(-ENOMEM); - } + if (*lmmp == NULL) { + OBD_ALLOC(*lmmp, lmm_size); + if (*lmmp == NULL) + RETURN(-ENOMEM); + } - if (lsm) { - LASSERT(lsm->lsm_object_id); - LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq); - (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id); - (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq); - } + if (lsm) + ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi); - RETURN(lmm_size); + RETURN(lmm_size); } /* Unpack OSC object metadata from disk storage (LE byte order). */ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_bytes) + struct lov_mds_md *lmm, int lmm_bytes) { - int lsm_size; - struct obd_import *imp = class_exp2cliimp(exp); - ENTRY; + int lsm_size; + struct obd_import *imp = class_exp2cliimp(exp); + ENTRY; - if (lmm != NULL) { - if (lmm_bytes < sizeof (*lmm)) { - CERROR("lov_mds_md too small: %d, need %d\n", - lmm_bytes, (int)sizeof(*lmm)); - RETURN(-EINVAL); - } - /* XXX LOV_MAGIC etc check? */ + if (lmm != NULL) { + if (lmm_bytes < sizeof(*lmm)) { + CERROR("%s: lov_mds_md too small: %d, need %d\n", + exp->exp_obd->obd_name, lmm_bytes, + (int)sizeof(*lmm)); + RETURN(-EINVAL); + } + /* XXX LOV_MAGIC etc check? */ - if (lmm->lmm_object_id == 0) { - CERROR("lov_mds_md: zero lmm_object_id\n"); - RETURN(-EINVAL); - } - } + if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) { + CERROR("%s: zero lmm_object_id: rc = %d\n", + exp->exp_obd->obd_name, -EINVAL); + RETURN(-EINVAL); + } + } - lsm_size = lov_stripe_md_size(1); - if (lsmp == NULL) - RETURN(lsm_size); + lsm_size = lov_stripe_md_size(1); + if (lsmp == NULL) + RETURN(lsm_size); - if (*lsmp != NULL && lmm == NULL) { - OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - OBD_FREE(*lsmp, lsm_size); - *lsmp = NULL; - RETURN(0); - } + if (*lsmp != NULL && lmm == NULL) { + OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + OBD_FREE(*lsmp, lsm_size); + *lsmp = NULL; + RETURN(0); + } - if (*lsmp == NULL) { - OBD_ALLOC(*lsmp, lsm_size); - if (*lsmp == NULL) - RETURN(-ENOMEM); - OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - if ((*lsmp)->lsm_oinfo[0] == NULL) { - OBD_FREE(*lsmp, lsm_size); - RETURN(-ENOMEM); - } - loi_init((*lsmp)->lsm_oinfo[0]); - } + if (*lsmp == NULL) { + OBD_ALLOC(*lsmp, lsm_size); + if (unlikely(*lsmp == NULL)) + RETURN(-ENOMEM); + OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { + OBD_FREE(*lsmp, lsm_size); + RETURN(-ENOMEM); + } + loi_init((*lsmp)->lsm_oinfo[0]); + } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) { + RETURN(-EBADF); + } - if (lmm != NULL) { - /* XXX zero *lsmp? */ - (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id); - (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq); - LASSERT((*lsmp)->lsm_object_id); - LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq); - } + if (lmm != NULL) + /* XXX zero *lsmp? */ + ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi); - if (imp != NULL && - (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) - (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; - else - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + if (imp != NULL && + (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) + (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; + else + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; - RETURN(lsm_size); + RETURN(lsm_size); } static inline void osc_pack_capa(struct ptlrpc_request *req, @@ -215,9 +214,9 @@ static int osc_getattr_interpret(const struct lu_env *env, CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa); - /* This should really be sent by the OST */ - aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + /* This should really be sent by the OST */ + aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; + aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; } else { CDEBUG(D_INFO, "can't unpack ost_body\n"); rc = -EPROTO; @@ -294,9 +293,8 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp, CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); - /* This should really be sent by the OST */ - oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); + oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; EXIT; out: @@ -477,17 +475,15 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, lustre_get_wire_obdo(oa, &body->oa); - /* This should really be sent by the OST */ - oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - oa->o_valid |= OBD_MD_FLBLKSZ; + oa->o_blksize = cli_brw_size(exp->exp_obd); + oa->o_valid |= OBD_MD_FLBLKSZ; - /* XXX LOV STACKING: the lsm that is passed to us from LOV does not - * have valid lsm_oinfo data structs, so don't go touching that. - * This needs to be fixed in a big way. - */ - lsm->lsm_object_id = oa->o_id; - lsm->lsm_object_seq = oa->o_seq; - *ea = lsm; + /* XXX LOV STACKING: the lsm that is passed to us from LOV does not + * have valid lsm_oinfo data structs, so don't go touching that. + * This needs to be fixed in a big way. + */ + lsm->lsm_oi = oa->o_oi; + *ea = lsm; if (oti != NULL) { oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); @@ -662,10 +658,19 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, int count; ENTRY; - osc_build_res_name(oa->o_id, oa->o_seq, &res_id); - res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); - if (res == NULL) - RETURN(0); + /* Return, i.e. cancel nothing, only if ELC is supported (flag in + * export) but disabled through procfs (flag in NS). + * + * This distinguishes from a case when ELC is not supported originally, + * when we still want to cancel locks in advance and just cancel them + * locally, without sending any RPC. */ + if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns)) + RETURN(0); + + ostid_build_res_name(&oa->o_oi, &res_id); + res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); + if (res == NULL) + RETURN(0); LDLM_RESOURCE_ADDREF(res); count = ldlm_cancel_resource_local(res, cancels, NULL, mode, @@ -720,7 +725,7 @@ int osc_create(const struct lu_env *env, struct obd_export *exp, RETURN(osc_real_create(exp, oa, ea, oti)); } - if (!fid_seq_is_mdt(oa->o_seq)) + if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) RETURN(osc_real_create(exp, oa, ea, oti)); /* we should not get here anymore */ @@ -819,28 +824,33 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_valid |= bits; client_obd_list_lock(&cli->cl_loi_list_lock); oa->o_dirty = cli->cl_dirty; - if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) { - CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); - oa->o_undirty = 0; - } else if (cfs_atomic_read(&obd_dirty_pages) - - cfs_atomic_read(&obd_dirty_transit_pages) > - obd_max_dirty_pages + 1){ - /* The cfs_atomic_read() allowing the cfs_atomic_inc() are - * not covered by a lock thus they may safely race and trip - * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("dirty %d - %d > system dirty_max %d\n", - cfs_atomic_read(&obd_dirty_pages), - cfs_atomic_read(&obd_dirty_transit_pages), - obd_max_dirty_pages); - oa->o_undirty = 0; - } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) { - CERROR("dirty %lu - dirty_max %lu too big???\n", - cli->cl_dirty, cli->cl_dirty_max); - oa->o_undirty = 0; - } else { - long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)* - (cli->cl_max_rpcs_in_flight + 1); + if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > + cli->cl_dirty_max)) { + CERROR("dirty %lu - %lu > dirty_max %lu\n", + cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); + oa->o_undirty = 0; + } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) + + cfs_atomic_read(&obd_dirty_pages) - + cfs_atomic_read(&obd_dirty_transit_pages) > + (long)(obd_max_dirty_pages + 1))) { + /* The cfs_atomic_read() allowing the cfs_atomic_inc() are + * not covered by a lock thus they may safely race and trip + * this CERROR() unless we add in a small fudge factor (+1). */ + CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n", + cli->cl_import->imp_obd->obd_name, + cfs_atomic_read(&obd_unstable_pages), + cfs_atomic_read(&obd_dirty_pages), + cfs_atomic_read(&obd_dirty_transit_pages), + obd_max_dirty_pages); + oa->o_undirty = 0; + } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { + CERROR("dirty %lu - dirty_max %lu too big???\n", + cli->cl_dirty, cli->cl_dirty_max); + oa->o_undirty = 0; + } else { + long max_in_flight = (cli->cl_max_pages_per_rpc << + CFS_PAGE_SHIFT)* + (cli->cl_max_rpcs_in_flight + 1); oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; @@ -920,45 +930,45 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) * needed, and avoids shrinking the grant piecemeal. */ static int osc_shrink_grant(struct client_obd *cli) { - long target = (cli->cl_max_rpcs_in_flight + 1) * - cli->cl_max_pages_per_rpc; + __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * + (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT); - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_avail_grant <= target) - target = cli->cl_max_pages_per_rpc; - client_obd_list_unlock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); + if (cli->cl_avail_grant <= target_bytes) + target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + client_obd_list_unlock(&cli->cl_loi_list_lock); - return osc_shrink_grant_to_target(cli, target); + return osc_shrink_grant_to_target(cli, target_bytes); } -int osc_shrink_grant_to_target(struct client_obd *cli, long target) +int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) { - int rc = 0; - struct ost_body *body; - ENTRY; - - client_obd_list_lock(&cli->cl_loi_list_lock); - /* Don't shrink if we are already above or below the desired limit - * We don't want to shrink below a single RPC, as that will negatively - * impact block allocation and long-term performance. */ - if (target < cli->cl_max_pages_per_rpc) - target = cli->cl_max_pages_per_rpc; + int rc = 0; + struct ost_body *body; + ENTRY; - if (target >= cli->cl_avail_grant) { - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(0); - } - client_obd_list_unlock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); + /* Don't shrink if we are already above or below the desired limit + * We don't want to shrink below a single RPC, as that will negatively + * impact block allocation and long-term performance. */ + if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) + target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + + if (target_bytes >= cli->cl_avail_grant) { + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(0); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); - OBD_ALLOC_PTR(body); - if (!body) - RETURN(-ENOMEM); + OBD_ALLOC_PTR(body); + if (!body) + RETURN(-ENOMEM); - osc_announce_cached(cli, &body->oa, 0); + osc_announce_cached(cli, &body->oa, 0); - client_obd_list_lock(&cli->cl_loi_list_lock); - body->oa.o_grant = cli->cl_avail_grant - target; - cli->cl_avail_grant = target; + client_obd_list_lock(&cli->cl_loi_list_lock); + body->oa.o_grant = cli->cl_avail_grant - target_bytes; + cli->cl_avail_grant = target_bytes; client_obd_list_unlock(&cli->cl_loi_list_lock); if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { body->oa.o_valid |= OBD_MD_FLFLAGS; @@ -976,7 +986,6 @@ int osc_shrink_grant_to_target(struct client_obd *cli, long target) RETURN(rc); } -#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE static int osc_should_shrink_grant(struct client_obd *client) { cfs_time_t time = cfs_time_current(); @@ -986,13 +995,18 @@ static int osc_should_shrink_grant(struct client_obd *client) OBD_CONNECT_GRANT_SHRINK) == 0) return 0; - if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { - if (client->cl_import->imp_state == LUSTRE_IMP_FULL && - client->cl_avail_grant > GRANT_SHRINK_LIMIT) - return 1; - else - osc_update_next_shrink(client); - } + if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { + /* Get the current RPC size directly, instead of going via: + * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) + * Keep comment here so that it can be found by searching. */ + int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + + if (client->cl_import->imp_state == LUSTRE_IMP_FULL && + client->cl_avail_grant > brw_size) + return 1; + else + osc_update_next_shrink(client); + } return 0; } @@ -1050,12 +1064,12 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; if (cli->cl_avail_grant < 0) { - CWARN("%s: available grant < 0, the OSS is probably not running" - " with patch from bug20278 (%ld) \n", - cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant); - /* workaround for 1.6 servers which do not have - * the patch from bug20278 */ - cli->cl_avail_grant = ocd->ocd_grant; + CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", + cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, + ocd->ocd_grant, cli->cl_dirty); + /* workaround for servers which do not have the patch from + * LU-2679 */ + cli->cl_avail_grant = ocd->ocd_grant; } /* determine the appropriate chunk size used by osc_extent. */ @@ -1200,8 +1214,8 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, cfs_crypto_hash_update_page(hdesc, pga[i]->pg, pga[i]->off & ~CFS_PAGE_MASK, count); - LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n", - (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum); + LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", + (int)(pga[i]->off & ~CFS_PAGE_MASK)); nob -= pga[i]->count; pg_count--; @@ -1280,12 +1294,10 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, * retry logic */ req->rq_no_retry_einprogress = 1; - if (opc == OST_WRITE) - desc = ptlrpc_prep_bulk_imp(req, page_count, - BULK_GET_SOURCE, OST_BULK_PORTAL); - else - desc = ptlrpc_prep_bulk_imp(req, page_count, - BULK_PUT_SINK, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, + opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, + OST_BULK_PORTAL); if (desc == NULL) GOTO(out, rc = -ENOMEM); @@ -1298,11 +1310,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, lustre_set_wire_obdo(&body->oa, oa); - obdo_to_ioobj(oa, ioobj); - ioobj->ioo_bufcnt = niocount; - osc_pack_capa(req, body, ocapa); - LASSERT (page_count > 0); - pg_prev = pga[0]; + obdo_to_ioobj(oa, ioobj); + ioobj->ioo_bufcnt = niocount; + /* The high bits of ioo_max_brw tells server _maximum_ number of bulks + * that might be send for this request. The actual number is decided + * when the RPC is finally sent in ptlrpc_register_bulk(). It sends + * "max - 1" for old client compatibility sending "0", and also so the + * the actual maximum is a power-of-two number, not one less. LU-1431 */ + ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + osc_pack_capa(req, body, ocapa); + LASSERT(page_count > 0); + pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; int poff = pg->off & ~CFS_PAGE_MASK; @@ -1457,20 +1475,18 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, msg = "changed in transit AND doesn't match the original - " "likely false positive due to mmap IO (bug 11742)"; - LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID - " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n", - msg, libcfs_nid2str(peer->nid), - oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, - oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, - oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, - oa->o_id, - oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0, - pga[0]->off, - pga[page_count-1]->off + pga[page_count-1]->count - 1); - CERROR("original client csum %x (type %x), server csum %x (type %x), " - "client csum now %x\n", client_cksum, client_cksum_type, - server_cksum, cksum_type, new_cksum); - return 1; + LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID + " object "DOSTID" extent ["LPU64"-"LPU64"]\n", + msg, libcfs_nid2str(peer->nid), + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, + POSTID(&oa->o_oi), pga[0]->off, + pga[page_count-1]->off + pga[page_count-1]->count - 1); + CERROR("original client csum %x (type %x), server csum %x (type %x), " + "client csum now %x\n", client_cksum, client_cksum_type, + server_cksum, cksum_type, new_cksum); + return 1; } /* Note rc enters this function as number of bytes transferred */ @@ -1579,42 +1595,39 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) router = libcfs_nid2str(req->rq_bulk->bd_sender); } - if (server_cksum == ~0 && rc > 0) { - CERROR("Protocol error: server %s set the 'checksum' " - "bit, but didn't send a checksum. Not fatal, " - "but please notify on http://bugs.whamcloud.com/\n", - libcfs_nid2str(peer->nid)); - } else if (server_cksum != client_cksum) { - LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " - "%s%s%s inode "DFID" object " - LPU64"/"LPU64" extent " - "["LPU64"-"LPU64"]\n", - req->rq_import->imp_obd->obd_name, - libcfs_nid2str(peer->nid), - via, router, - body->oa.o_valid & OBD_MD_FLFID ? - body->oa.o_parent_seq : (__u64)0, - body->oa.o_valid & OBD_MD_FLFID ? - body->oa.o_parent_oid : 0, - body->oa.o_valid & OBD_MD_FLFID ? - body->oa.o_parent_ver : 0, - body->oa.o_id, - body->oa.o_valid & OBD_MD_FLGROUP ? - body->oa.o_seq : (__u64)0, - aa->aa_ppga[0]->off, - aa->aa_ppga[aa->aa_page_count-1]->off + - aa->aa_ppga[aa->aa_page_count-1]->count - - 1); - CERROR("client %x, server %x, cksum_type %x\n", - client_cksum, server_cksum, cksum_type); - cksum_counter = 0; - aa->aa_oa->o_cksum = client_cksum; - rc = -EAGAIN; - } else { - cksum_counter++; - CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); - rc = 0; - } + if (server_cksum == ~0 && rc > 0) { + CERROR("Protocol error: server %s set the 'checksum' " + "bit, but didn't send a checksum. Not fatal, " + "but please notify on http://bugs.whamcloud.com/\n", + libcfs_nid2str(peer->nid)); + } else if (server_cksum != client_cksum) { + LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " + "%s%s%s inode "DFID" object "DOSTID + " extent ["LPU64"-"LPU64"]\n", + req->rq_import->imp_obd->obd_name, + libcfs_nid2str(peer->nid), + via, router, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_parent_seq : (__u64)0, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_parent_oid : 0, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_parent_ver : 0, + POSTID(&body->oa.o_oi), + aa->aa_ppga[0]->off, + aa->aa_ppga[aa->aa_page_count-1]->off + + aa->aa_ppga[aa->aa_page_count-1]->count - + 1); + CERROR("client %x, server %x, cksum_type %x\n", + client_cksum, server_cksum, cksum_type); + cksum_counter = 0; + aa->aa_oa->o_cksum = client_cksum; + rc = -EAGAIN; + } else { + cksum_counter++; + CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); + rc = 0; + } } else if (unlikely(client_cksum)) { static int cksum_missed; @@ -1678,17 +1691,17 @@ restart_bulk: if (rc != -EINPROGRESS && !client_should_resend(resends, &exp->exp_obd->u.cli)) { CERROR("%s: too many resend retries for object: " - ""LPU64":"LPU64", rc = %d.\n", - exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc); - goto out; - } - if (generation != - exp->exp_obd->u.cli.cl_import->imp_generation) { - CDEBUG(D_HA, "%s: resend cross eviction for object: " - ""LPU64":"LPU64", rc = %d.\n", - exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc); + ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name, + POSTID(&oa->o_oi), rc); goto out; } + if (generation != + exp->exp_obd->u.cli.cl_import->imp_generation) { + CDEBUG(D_HA, "%s: resend cross eviction for object: " + ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name, + POSTID(&oa->o_oi), rc); + goto out; + } lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); @@ -1738,6 +1751,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, aa->aa_resends++; new_req->rq_interpret_reply = request->rq_interpret_reply; new_req->rq_async_args = request->rq_async_args; + new_req->rq_commit_cb = request->rq_commit_cb; /* cap resend delay to the current request timeout, this is similar to * what ptlrpc does (see after_reply()) */ if (aa->aa_resends > new_req->rq_timeout) @@ -1753,6 +1767,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); CFS_INIT_LIST_HEAD(&new_aa->aa_exts); cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts); + new_aa->aa_resends = aa->aa_resends; cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { if (oap->oap_request) { @@ -1939,28 +1954,28 @@ static int brw_interpret(const struct lu_env *env, CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); /* When server return -EINPROGRESS, client should always retry * regardless of the number of times the bulk was resent already. */ - if (osc_recoverable_error(rc)) { - if (req->rq_import_generation != - req->rq_import->imp_generation) { - CDEBUG(D_HA, "%s: resend cross eviction for object: " - ""LPU64":"LPU64", rc = %d.\n", - req->rq_import->imp_obd->obd_name, - aa->aa_oa->o_id, aa->aa_oa->o_seq, rc); - } else if (rc == -EINPROGRESS || - client_should_resend(aa->aa_resends, aa->aa_cli)) { - rc = osc_brw_redo_request(req, aa, rc); - } else { - CERROR("%s: too many resent retries for object: " - ""LPU64":"LPU64", rc = %d.\n", - req->rq_import->imp_obd->obd_name, - aa->aa_oa->o_id, aa->aa_oa->o_seq, rc); - } + if (osc_recoverable_error(rc)) { + if (req->rq_import_generation != + req->rq_import->imp_generation) { + CDEBUG(D_HA, "%s: resend cross eviction for object: " + ""DOSTID", rc = %d.\n", + req->rq_import->imp_obd->obd_name, + POSTID(&aa->aa_oa->o_oi), rc); + } else if (rc == -EINPROGRESS || + client_should_resend(aa->aa_resends, aa->aa_cli)) { + rc = osc_brw_redo_request(req, aa, rc); + } else { + CERROR("%s: too many resent retries for object: " + ""LPU64":"LPU64", rc = %d.\n", + req->rq_import->imp_obd->obd_name, + POSTID(&aa->aa_oa->o_oi), rc); + } - if (rc == 0) - RETURN(0); - else if (rc == -EAGAIN || rc == -EINPROGRESS) - rc = -EIO; - } + if (rc == 0) + RETURN(0); + else if (rc == -EAGAIN || rc == -EINPROGRESS) + rc = -EIO; + } if (aa->aa_ocapa) { capa_put(aa->aa_ocapa); @@ -2030,6 +2045,20 @@ static int brw_interpret(const struct lu_env *env, RETURN(rc); } +static void brw_commit(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_lock); + /* If osc_inc_unstable_pages (via osc_extent_finish) races with + * this called via the rq_commit_cb, I need to ensure + * osc_dec_unstable_pages is still called. Otherwise unstable + * pages may be leaked. */ + if (req->rq_unstable) + osc_dec_unstable_pages(req); + else + req->rq_committed = 1; + spin_unlock(&req->rq_lock); +} + /** * Build an RPC by the list of extent @ext_list. The caller must ensure * that the total pages in this list are NOT over max pages per RPC. @@ -2135,7 +2164,9 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc); } + req->rq_commit_cb = brw_commit; req->rq_interpret_reply = brw_interpret; + if (mem_tight != 0) req->rq_memalloc = 1; @@ -2249,17 +2280,17 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock, LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl); lock_res_and_lock(lock); - cfs_spin_lock(&osc_ast_guard); + spin_lock(&osc_ast_guard); - if (lock->l_ast_data == NULL) - lock->l_ast_data = data; - if (lock->l_ast_data == data) - set = 1; + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; + if (lock->l_ast_data == data) + set = 1; - cfs_spin_unlock(&osc_ast_guard); - unlock_res_and_lock(lock); + spin_unlock(&osc_ast_guard); + unlock_res_and_lock(lock); - return set; + return set; } static int osc_set_data_with_check(struct lustre_handle *lockh, @@ -2283,7 +2314,7 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, struct ldlm_res_id res_id; struct obd_device *obd = class_exp2obd(exp); - osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id); + ostid_build_res_name(&lsm->lsm_oi, &res_id); ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); return 0; } @@ -2299,7 +2330,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, struct obd_device *obd = class_exp2obd(exp); int rc = 0; - osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id); + ostid_build_res_name(&lsm->lsm_oi, &res_id); rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); if (rc == LDLM_ITER_STOP) return(1); @@ -2567,7 +2598,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), lockh, async); + sizeof(*lvb), LVB_T_OST, lockh, async); if (rqset) { if (!rc) { struct osc_enqueue_args *aa; @@ -2609,9 +2640,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, int rc; ENTRY; - osc_build_res_name(oinfo->oi_md->lsm_object_id, - oinfo->oi_md->lsm_object_seq, &res_id); - + ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id); rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, @@ -2691,10 +2720,10 @@ static int osc_cancel_unused(struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct ldlm_res_id res_id, *resp = NULL; - if (lsm != NULL) { - resp = osc_build_res_name(lsm->lsm_object_id, - lsm->lsm_object_seq, &res_id); - } + if (lsm != NULL) { + ostid_build_res_name(&lsm->lsm_oi, &res_id); + resp = &res_id; + } return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); } @@ -2788,10 +2817,10 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, /*Since the request might also come from lprocfs, so we need *sync this with client_disconnect_export Bug15684*/ - cfs_down_read(&obd->u.cli.cl_sem); + down_read(&obd->u.cli.cl_sem); if (obd->u.cli.cl_import) imp = class_import_get(obd->u.cli.cl_import); - cfs_up_read(&obd->u.cli.cl_sem); + up_read(&obd->u.cli.cl_sem); if (!imp) RETURN(-ENODEV); @@ -2880,18 +2909,18 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) if (!lumk) RETURN(-ENOMEM); - if (lum.lmm_magic == LOV_USER_MAGIC_V1) - lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]); - else - lmm_objects = &(lumk->lmm_objects[0]); - lmm_objects->l_object_id = lsm->lsm_object_id; - } else { - lum_size = lov_mds_md_size(0, lum.lmm_magic); - lumk = &lum; - } + if (lum.lmm_magic == LOV_USER_MAGIC_V1) + lmm_objects = + &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]); + else + lmm_objects = &(lumk->lmm_objects[0]); + lmm_objects->l_ost_oi = lsm->lsm_oi; + } else { + lum_size = lov_mds_md_size(0, lum.lmm_magic); + lumk = &lum; + } - lumk->lmm_object_id = lsm->lsm_object_id; - lumk->lmm_object_seq = lsm->lsm_object_seq; + lumk->lmm_oi = lsm->lsm_oi; lumk->lmm_stripe_count = 1; if (cfs_copy_to_user(lump, lumk, lum_size)) @@ -3127,9 +3156,9 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, /* add this osc into entity list */ LASSERT(cfs_list_empty(&cli->cl_lru_osc)); - cfs_spin_lock(&cli->cl_cache->ccc_lru_lock); + spin_lock(&cli->cl_cache->ccc_lru_lock); cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); - cfs_spin_unlock(&cli->cl_cache->ccc_lru_lock); + spin_unlock(&cli->cl_cache->ccc_lru_lock); RETURN(0); } @@ -3154,27 +3183,28 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, Even if something bad goes through, we'd get a -EINVAL from OST anyway. */ - if (KEY_IS(KEY_GRANT_SHRINK)) - req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); - else - req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO); - - if (req == NULL) - RETURN(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, - RCL_CLIENT, keylen); - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, - RCL_CLIENT, vallen); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ? + &RQF_OST_SET_GRANT_INFO : + &RQF_OBD_SET_INFO); + if (req == NULL) + RETURN(-ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, keylen); + if (!KEY_IS(KEY_GRANT_SHRINK)) + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, + RCL_CLIENT, vallen); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); - memcpy(tmp, key, keylen); - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ? + &RMF_OST_BODY : + &RMF_SETINFO_VAL); memcpy(tmp, val, vallen); if (KEY_IS(KEY_GRANT_SHRINK)) { @@ -3245,7 +3275,7 @@ static int osc_reconnect(const struct lu_env *env, client_obd_list_lock(&cli->cl_loi_list_lock); data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?: - 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + 2 * cli_brw_size(obd); lost_grant = cli->cl_lost_grant; cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); @@ -3478,9 +3508,9 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ ptlrpc_deactivate_import(imp); - cfs_spin_lock(&imp->imp_lock); - imp->imp_pingable = 0; - cfs_spin_unlock(&imp->imp_lock); + spin_lock(&imp->imp_lock); + imp->imp_pingable = 0; + spin_unlock(&imp->imp_lock); break; } case OBD_CLEANUP_EXPORTS: { @@ -3521,9 +3551,9 @@ int osc_cleanup(struct obd_device *obd) /* lru cleanup */ if (cli->cl_cache != NULL) { LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0); - cfs_spin_lock(&cli->cl_cache->ccc_lru_lock); + spin_lock(&cli->cl_cache->ccc_lru_lock); cfs_list_del_init(&cli->cl_lru_osc); - cfs_spin_unlock(&cli->cl_cache->ccc_lru_lock); + spin_unlock(&cli->cl_cache->ccc_lru_lock); cli->cl_lru_left = NULL; cfs_atomic_dec(&cli->cl_cache->ccc_users); cli->cl_cache = NULL; @@ -3602,8 +3632,8 @@ struct obd_ops osc_obd_ops = { }; extern struct lu_kmem_descr osc_caches[]; -extern cfs_spinlock_t osc_ast_guard; -extern cfs_lock_class_key_t osc_ast_guard_class; +extern spinlock_t osc_ast_guard; +extern struct lock_class_key osc_ast_guard_class; int __init osc_init(void) { @@ -3627,10 +3657,10 @@ int __init osc_init(void) RETURN(rc); } - cfs_spin_lock_init(&osc_ast_guard); - cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); + spin_lock_init(&osc_ast_guard); + lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); - RETURN(rc); + RETURN(rc); } #ifdef __KERNEL__