X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=64d5e3ceee45f5afad3419c5f1df0175b03e8ab4;hb=1f12a4c87114106ff7e8eff06b3883146f8664ba;hp=b8353c530d50603b44a58c92ee6d32b0bcf565e0;hpb=84a3fd67356c8073a917ea6abd63928055e38156;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index b8353c5..64d5e3c 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -58,6 +58,7 @@ #include #include #include +#include #include "osc_internal.h" #include "osc_cl_internal.h" @@ -68,97 +69,99 @@ int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, - struct lov_stripe_md *lsm) + struct lov_stripe_md *lsm) { - int lmm_size; - ENTRY; + int lmm_size; + ENTRY; - lmm_size = sizeof(**lmmp); - if (!lmmp) - RETURN(lmm_size); + lmm_size = sizeof(**lmmp); + if (lmmp == NULL) + RETURN(lmm_size); - if (*lmmp && !lsm) { - OBD_FREE(*lmmp, lmm_size); - *lmmp = NULL; - RETURN(0); - } + if (*lmmp != NULL && lsm == NULL) { + OBD_FREE(*lmmp, lmm_size); + *lmmp = NULL; + RETURN(0); + } else if (unlikely(lsm != NULL && lsm->lsm_object_id == 0)) { + RETURN(-EBADF); + } - if (!*lmmp) { - OBD_ALLOC(*lmmp, lmm_size); - if (!*lmmp) - RETURN(-ENOMEM); - } + if (*lmmp == NULL) { + OBD_ALLOC(*lmmp, lmm_size); + if (*lmmp == NULL) + RETURN(-ENOMEM); + } - if (lsm) { - LASSERT(lsm->lsm_object_id); - LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq); - (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id); - (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq); - } + if (lsm != NULL) { + (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id); + (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq); + } - RETURN(lmm_size); + RETURN(lmm_size); } /* Unpack OSC object metadata from disk storage (LE byte order). */ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_bytes) + struct lov_mds_md *lmm, int lmm_bytes) { - int lsm_size; - struct obd_import *imp = class_exp2cliimp(exp); - ENTRY; + int lsm_size; + struct obd_import *imp = class_exp2cliimp(exp); + ENTRY; - if (lmm != NULL) { - if (lmm_bytes < sizeof (*lmm)) { - CERROR("lov_mds_md too small: %d, need %d\n", - lmm_bytes, (int)sizeof(*lmm)); - RETURN(-EINVAL); - } - /* XXX LOV_MAGIC etc check? */ + if (lmm != NULL) { + if (lmm_bytes < sizeof(*lmm)) { + CERROR("%s: lov_mds_md too small: %d, need %d\n", + exp->exp_obd->obd_name, lmm_bytes, + (int)sizeof(*lmm)); + RETURN(-EINVAL); + } + /* XXX LOV_MAGIC etc check? */ - if (lmm->lmm_object_id == 0) { - CERROR("lov_mds_md: zero lmm_object_id\n"); - RETURN(-EINVAL); - } - } + if (unlikely(lmm->lmm_object_id == 0)) { + CERROR("%s: zero lmm_object_id\n", + exp->exp_obd->obd_name); + RETURN(-EINVAL); + } + } - lsm_size = lov_stripe_md_size(1); - if (lsmp == NULL) - RETURN(lsm_size); + lsm_size = lov_stripe_md_size(1); + if (lsmp == NULL) + RETURN(lsm_size); - if (*lsmp != NULL && lmm == NULL) { - OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - OBD_FREE(*lsmp, lsm_size); - *lsmp = NULL; - RETURN(0); - } + if (*lsmp != NULL && lmm == NULL) { + OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + OBD_FREE(*lsmp, lsm_size); + *lsmp = NULL; + RETURN(0); + } - if (*lsmp == NULL) { - OBD_ALLOC(*lsmp, lsm_size); - if (*lsmp == NULL) - RETURN(-ENOMEM); - OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - if ((*lsmp)->lsm_oinfo[0] == NULL) { - OBD_FREE(*lsmp, lsm_size); - RETURN(-ENOMEM); - } - loi_init((*lsmp)->lsm_oinfo[0]); - } + if (*lsmp == NULL) { + OBD_ALLOC(*lsmp, lsm_size); + if (unlikely(*lsmp == NULL)) + RETURN(-ENOMEM); + OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { + OBD_FREE(*lsmp, lsm_size); + RETURN(-ENOMEM); + } + loi_init((*lsmp)->lsm_oinfo[0]); + } else if (unlikely((*lsmp)->lsm_object_id == 0)) { + RETURN(-EBADF); + } - if (lmm != NULL) { - /* XXX zero *lsmp? */ - (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id); - (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq); - LASSERT((*lsmp)->lsm_object_id); - LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq); - } + if (lmm != NULL) { + /* XXX zero *lsmp? */ + (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id); + (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq); + } - if (imp != NULL && - (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) - (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; - else - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + if (imp != NULL && + (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) + (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; + else + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; - RETURN(lsm_size); + RETURN(lsm_size); } static inline void osc_pack_capa(struct ptlrpc_request *req, @@ -215,9 +218,9 @@ static int osc_getattr_interpret(const struct lu_env *env, CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa); - /* This should really be sent by the OST */ - aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + /* This should really be sent by the OST */ + aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; + aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; } else { CDEBUG(D_INFO, "can't unpack ost_body\n"); rc = -EPROTO; @@ -294,9 +297,8 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp, CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); lustre_get_wire_obdo(oinfo->oi_oa, &body->oa); - /* This should really be sent by the OST */ - oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); + oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; EXIT; out: @@ -477,9 +479,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, lustre_get_wire_obdo(oa, &body->oa); - /* This should really be sent by the OST */ - oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - oa->o_valid |= OBD_MD_FLBLKSZ; + oa->o_blksize = cli_brw_size(exp->exp_obd); + oa->o_valid |= OBD_MD_FLBLKSZ; /* XXX LOV STACKING: the lsm that is passed to us from LOV does not * have valid lsm_oinfo data structs, so don't go touching that. @@ -662,10 +663,19 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, int count; ENTRY; - osc_build_res_name(oa->o_id, oa->o_seq, &res_id); - res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); - if (res == NULL) - RETURN(0); + /* Return, i.e. cancel nothing, only if ELC is supported (flag in + * export) but disabled through procfs (flag in NS). + * + * This distinguishes from a case when ELC is not supported originally, + * when we still want to cancel locks in advance and just cancel them + * locally, without sending any RPC. */ + if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns)) + RETURN(0); + + ostid_build_res_name(&oa->o_oi, &res_id); + res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); + if (res == NULL) + RETURN(0); LDLM_RESOURCE_ADDREF(res); count = ldlm_cancel_resource_local(res, cancels, NULL, mode, @@ -704,6 +714,31 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } +int osc_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md **ea, + struct obd_trans_info *oti) +{ + int rc = 0; + ENTRY; + + LASSERT(oa); + LASSERT(ea); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_RECREATE_OBJS) { + RETURN(osc_real_create(exp, oa, ea, oti)); + } + + if (!fid_seq_is_mdt(oa->o_seq)) + RETURN(osc_real_create(exp, oa, ea, oti)); + + /* we should not get here anymore */ + LBUG(); + + RETURN(rc); +} + /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. @@ -760,8 +795,11 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); - /* don't throttle destroy RPCs for the MDT */ - if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) { + /* If osc_destory is for destroying the unlink orphan, + * sent from MDT to OST, which should not be blocked here, + * because the process might be triggered by ptlrpcd, and + * it is not good to block ptlrpcd thread (b=16006)*/ + if (!(oa->o_flags & OBD_FL_DELORPHAN)) { req->rq_interpret_reply = osc_destroy_interpret; if (!osc_can_send_destroy(cli)) { struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, @@ -791,31 +829,33 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_valid |= bits; client_obd_list_lock(&cli->cl_loi_list_lock); oa->o_dirty = cli->cl_dirty; - if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) { - CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); - oa->o_undirty = 0; - } else if (cfs_atomic_read(&obd_dirty_pages) - - cfs_atomic_read(&obd_dirty_transit_pages) > - obd_max_dirty_pages + 1){ - /* The cfs_atomic_read() allowing the cfs_atomic_inc() are - * not covered by a lock thus they may safely race and trip - * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("dirty %d - %d > system dirty_max %d\n", - cfs_atomic_read(&obd_dirty_pages), - cfs_atomic_read(&obd_dirty_transit_pages), - obd_max_dirty_pages); - oa->o_undirty = 0; - } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) { - CERROR("dirty %lu - dirty_max %lu too big???\n", - cli->cl_dirty, cli->cl_dirty_max); - oa->o_undirty = 0; - } else { - long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)* - (cli->cl_max_rpcs_in_flight + 1); + if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > + cli->cl_dirty_max)) { + CERROR("dirty %lu - %lu > dirty_max %lu\n", + cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); + oa->o_undirty = 0; + } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) - + cfs_atomic_read(&obd_dirty_transit_pages) > + (long)(obd_max_dirty_pages + 1))) { + /* The cfs_atomic_read() allowing the cfs_atomic_inc() are + * not covered by a lock thus they may safely race and trip + * this CERROR() unless we add in a small fudge factor (+1). */ + CERROR("dirty %d - %d > system dirty_max %d\n", + cfs_atomic_read(&obd_dirty_pages), + cfs_atomic_read(&obd_dirty_transit_pages), + obd_max_dirty_pages); + oa->o_undirty = 0; + } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { + CERROR("dirty %lu - dirty_max %lu too big???\n", + cli->cl_dirty, cli->cl_dirty_max); + oa->o_undirty = 0; + } else { + long max_in_flight = (cli->cl_max_pages_per_rpc << + CFS_PAGE_SHIFT)* + (cli->cl_max_rpcs_in_flight + 1); oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); } - oa->o_grant = cli->cl_avail_grant; + oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); @@ -892,45 +932,45 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) * needed, and avoids shrinking the grant piecemeal. */ static int osc_shrink_grant(struct client_obd *cli) { - long target = (cli->cl_max_rpcs_in_flight + 1) * - cli->cl_max_pages_per_rpc; + __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * + (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT); - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_avail_grant <= target) - target = cli->cl_max_pages_per_rpc; - client_obd_list_unlock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); + if (cli->cl_avail_grant <= target_bytes) + target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + client_obd_list_unlock(&cli->cl_loi_list_lock); - return osc_shrink_grant_to_target(cli, target); + return osc_shrink_grant_to_target(cli, target_bytes); } -int osc_shrink_grant_to_target(struct client_obd *cli, long target) +int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) { - int rc = 0; - struct ost_body *body; - ENTRY; - - client_obd_list_lock(&cli->cl_loi_list_lock); - /* Don't shrink if we are already above or below the desired limit - * We don't want to shrink below a single RPC, as that will negatively - * impact block allocation and long-term performance. */ - if (target < cli->cl_max_pages_per_rpc) - target = cli->cl_max_pages_per_rpc; + int rc = 0; + struct ost_body *body; + ENTRY; - if (target >= cli->cl_avail_grant) { - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(0); - } - client_obd_list_unlock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); + /* Don't shrink if we are already above or below the desired limit + * We don't want to shrink below a single RPC, as that will negatively + * impact block allocation and long-term performance. */ + if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) + target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + + if (target_bytes >= cli->cl_avail_grant) { + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(0); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); - OBD_ALLOC_PTR(body); - if (!body) - RETURN(-ENOMEM); + OBD_ALLOC_PTR(body); + if (!body) + RETURN(-ENOMEM); - osc_announce_cached(cli, &body->oa, 0); + osc_announce_cached(cli, &body->oa, 0); - client_obd_list_lock(&cli->cl_loi_list_lock); - body->oa.o_grant = cli->cl_avail_grant - target; - cli->cl_avail_grant = target; + client_obd_list_lock(&cli->cl_loi_list_lock); + body->oa.o_grant = cli->cl_avail_grant - target_bytes; + cli->cl_avail_grant = target_bytes; client_obd_list_unlock(&cli->cl_loi_list_lock); if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { body->oa.o_valid |= OBD_MD_FLFLAGS; @@ -948,7 +988,6 @@ int osc_shrink_grant_to_target(struct client_obd *cli, long target) RETURN(rc); } -#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE static int osc_should_shrink_grant(struct client_obd *client) { cfs_time_t time = cfs_time_current(); @@ -958,13 +997,18 @@ static int osc_should_shrink_grant(struct client_obd *client) OBD_CONNECT_GRANT_SHRINK) == 0) return 0; - if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { - if (client->cl_import->imp_state == LUSTRE_IMP_FULL && - client->cl_avail_grant > GRANT_SHRINK_LIMIT) - return 1; - else - osc_update_next_shrink(client); - } + if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { + /* Get the current RPC size directly, instead of going via: + * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) + * Keep comment here so that it can be found by searching. */ + int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + + if (client->cl_import->imp_state == LUSTRE_IMP_FULL && + client->cl_avail_grant > brw_size) + return 1; + else + osc_update_next_shrink(client); + } return 0; } @@ -1030,15 +1074,17 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant = ocd->ocd_grant; } - client_obd_list_unlock(&cli->cl_loi_list_lock); + /* determine the appropriate chunk size used by osc_extent. */ + cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize); + client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n", - cli->cl_import->imp_obd->obd_name, - cli->cl_avail_grant, cli->cl_lost_grant); + CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." + "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, + cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); - if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && - cfs_list_empty(&cli->cl_grant_shrink_list)) - osc_add_shrink_grant(cli); + if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && + cfs_list_empty(&cli->cl_grant_shrink_list)) + osc_add_shrink_grant(cli); } /* We assume that the reason this OSC got a short read is because it read @@ -1246,13 +1292,14 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); + /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own + * retry logic */ + req->rq_no_retry_einprogress = 1; - if (opc == OST_WRITE) - desc = ptlrpc_prep_bulk_imp(req, page_count, - BULK_GET_SOURCE, OST_BULK_PORTAL); - else - desc = ptlrpc_prep_bulk_imp(req, page_count, - BULK_PUT_SINK, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, + opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, + OST_BULK_PORTAL); if (desc == NULL) GOTO(out, rc = -ENOMEM); @@ -1265,11 +1312,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, lustre_set_wire_obdo(&body->oa, oa); - obdo_to_ioobj(oa, ioobj); - ioobj->ioo_bufcnt = niocount; - osc_pack_capa(req, body, ocapa); - LASSERT (page_count > 0); - pg_prev = pga[0]; + obdo_to_ioobj(oa, ioobj); + ioobj->ioo_bufcnt = niocount; + /* The high bits of ioo_max_brw tells server _maximum_ number of bulks + * that might be send for this request. The actual number is decided + * when the RPC is finally sent in ptlrpc_register_bulk(). It sends + * "max - 1" for old client compatibility sending "0", and also so the + * the actual maximum is a power-of-two number, not one less. LU-1431 */ + ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + osc_pack_capa(req, body, ocapa); + LASSERT(page_count > 0); + pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; int poff = pg->off & ~CFS_PAGE_MASK; @@ -1298,7 +1351,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count); + ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { @@ -1669,17 +1722,16 @@ out: RETURN (rc); } -int osc_brw_redo_request(struct ptlrpc_request *request, - struct osc_brw_async_args *aa) +static int osc_brw_redo_request(struct ptlrpc_request *request, + struct osc_brw_async_args *aa, int rc) { struct ptlrpc_request *new_req; - struct ptlrpc_request_set *set = request->rq_set; struct osc_brw_async_args *new_aa; struct osc_async_page *oap; - int rc = 0; ENTRY; - DEBUG_REQ(D_ERROR, request, "redo for recoverable error"); + DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, + "redo for recoverable error %d", rc); rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, @@ -1690,15 +1742,12 @@ int osc_brw_redo_request(struct ptlrpc_request *request, if (rc) RETURN(rc); - client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock); - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { if (oap->oap_request != NULL) { LASSERTF(request == oap->oap_request, "request %p != oap_request %p\n", request, oap->oap_request); if (oap->oap_interrupted) { - client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); ptlrpc_req_finished(new_req); RETURN(-EINTR); } @@ -1709,15 +1758,22 @@ int osc_brw_redo_request(struct ptlrpc_request *request, aa->aa_resends++; new_req->rq_interpret_reply = request->rq_interpret_reply; new_req->rq_async_args = request->rq_async_args; - new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; + /* cap resend delay to the current request timeout, this is similar to + * what ptlrpc does (see after_reply()) */ + if (aa->aa_resends > new_req->rq_timeout) + new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout; + else + new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; new_req->rq_generation_set = 1; new_req->rq_import_generation = request->rq_import_generation; new_aa = ptlrpc_req_async_args(new_req); CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); - cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); + cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); + CFS_INIT_LIST_HEAD(&new_aa->aa_exts); + cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts); + new_aa->aa_resends = aa->aa_resends; cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { if (oap->oap_request) { @@ -1729,16 +1785,14 @@ int osc_brw_redo_request(struct ptlrpc_request *request, new_aa->aa_ocapa = aa->aa_ocapa; aa->aa_ocapa = NULL; - /* use ptlrpc_set_add_req is safe because interpret functions work - * in check_set context. only one way exist with access to request - * from different thread got -EINTR - this way protected with - * cl_loi_list_lock */ - ptlrpc_set_add_req(set, new_req); - - client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); + /* XXX: This code will run into problem if we're going to support + * to add a series of BRW RPCs into a self-defined ptlrpc_request_set + * and wait for all of them to be finished. We should inherit request + * set from old request. */ + ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); - DEBUG_REQ(D_INFO, new_req, "new request"); - RETURN(0); + DEBUG_REQ(D_INFO, new_req, "new request"); + RETURN(0); } /* @@ -1895,9 +1949,11 @@ out: static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { - struct osc_brw_async_args *aa = data; - struct osc_async_page *oap, *tmp; - struct client_obd *cli; + struct osc_brw_async_args *aa = data; + struct osc_extent *ext; + struct osc_extent *tmp; + struct cl_object *obj = NULL; + struct client_obd *cli = aa->aa_cli; ENTRY; rc = osc_brw_fini_request(req, rc); @@ -1913,7 +1969,7 @@ static int brw_interpret(const struct lu_env *env, aa->aa_oa->o_id, aa->aa_oa->o_seq, rc); } else if (rc == -EINPROGRESS || client_should_resend(aa->aa_resends, aa->aa_cli)) { - rc = osc_brw_redo_request(req, aa); + rc = osc_brw_redo_request(req, aa, rc); } else { CERROR("%s: too many resent retries for object: " ""LPU64":"LPU64", rc = %d.\n", @@ -1932,46 +1988,80 @@ static int brw_interpret(const struct lu_env *env, aa->aa_ocapa = NULL; } - cli = aa->aa_cli; - client_obd_list_lock(&cli->cl_loi_list_lock); + cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { + if (obj == NULL && rc == 0) { + obj = osc2cl(ext->oe_obj); + cl_object_get(obj); + } - /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters - * is called so we know whether to go to sync BRWs or wait for more - * RPCs to complete */ - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) - cli->cl_w_in_flight--; - else - cli->cl_r_in_flight--; - - /* the caller may re-use the oap after the completion call so - * we need to clean it up a little */ - cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, - oap_rpc_item) { - cfs_list_del_init(&oap->oap_rpc_item); - osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc); + cfs_list_del_init(&ext->oe_link); + osc_extent_finish(env, ext, 1, rc); + } + LASSERT(cfs_list_empty(&aa->aa_exts)); + LASSERT(cfs_list_empty(&aa->aa_oaps)); + + if (obj != NULL) { + struct obdo *oa = aa->aa_oa; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + + LASSERT(rc == 0); + if (oa->o_valid & OBD_MD_FLBLOCKS) { + attr->cat_blocks = oa->o_blocks; + valid |= CAT_BLOCKS; + } + if (oa->o_valid & OBD_MD_FLMTIME) { + attr->cat_mtime = oa->o_mtime; + valid |= CAT_MTIME; + } + if (oa->o_valid & OBD_MD_FLATIME) { + attr->cat_atime = oa->o_atime; + valid |= CAT_ATIME; + } + if (oa->o_valid & OBD_MD_FLCTIME) { + attr->cat_ctime = oa->o_ctime; + valid |= CAT_CTIME; + } + if (valid != 0) { + cl_object_attr_lock(obj); + cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_unlock(obj); + } + cl_object_put(env, obj); } OBDO_FREE(aa->aa_oa); - osc_wake_cache_waiters(cli); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); - client_obd_list_unlock(&cli->cl_loi_list_lock); - cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + client_obd_list_lock(&cli->cl_loi_list_lock); + /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters + * is called so we know whether to go to sync BRWs or wait for more + * RPCs to complete */ + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) + cli->cl_w_in_flight--; + else + cli->cl_r_in_flight--; + osc_wake_cache_waiters(cli); + client_obd_list_unlock(&cli->cl_loi_list_lock); + + osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); RETURN(rc); } -/* The most tricky part of this function is that it will return with - * cli->cli_loi_list_lock held. +/** + * Build an RPC by the list of extent @ext_list. The caller must ensure + * that the total pages in this list are NOT over max pages per RPC. + * Extents in the list must be in OES_RPC state. */ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - cfs_list_t *rpc_list, int page_count, int cmd, - pdl_policy_t pol) + cfs_list_t *ext_list, int cmd, pdl_policy_t pol) { struct ptlrpc_request *req = NULL; + struct osc_extent *ext; + CFS_LIST_HEAD(rpc_list); struct brw_page **pga = NULL; struct osc_brw_async_args *aa = NULL; struct obdo *oa = NULL; @@ -1981,17 +2071,39 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; struct ldlm_lock *lock = NULL; struct cl_req_attr crattr; - int i, rc, mpflag = 0; + obd_off starting_offset = OBD_OBJECT_EOF; + obd_off ending_offset = 0; + int i, rc, mpflag = 0, mem_tight = 0, page_count = 0; - ENTRY; - LASSERT(!cfs_list_empty(rpc_list)); + ENTRY; + LASSERT(!cfs_list_empty(ext_list)); + + /* add pages into rpc_list to build BRW rpc */ + cfs_list_for_each_entry(ext, ext_list, oe_link) { + LASSERT(ext->oe_state == OES_RPC); + mem_tight |= ext->oe_memalloc; + cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + ++page_count; + cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (starting_offset > oap->oap_obj_off) + starting_offset = oap->oap_obj_off; + else + LASSERT(oap->oap_page_off == 0); + if (ending_offset < oap->oap_obj_off + oap->oap_count) + ending_offset = oap->oap_obj_off + + oap->oap_count; + else + LASSERT(oap->oap_page_off + oap->oap_count == + CFS_PAGE_SIZE); + } + } - if (cmd & OBD_BRW_MEMALLOC) - mpflag = cfs_memory_pressure_get_and_set(); + if (mem_tight) + mpflag = cfs_memory_pressure_get_and_set(); - memset(&crattr, 0, sizeof crattr); - OBD_ALLOC(pga, sizeof(*pga) * page_count); - if (pga == NULL) + memset(&crattr, 0, sizeof crattr); + OBD_ALLOC(pga, sizeof(*pga) * page_count); + if (pga == NULL) GOTO(out, rc = -ENOMEM); OBDO_ALLOC(oa); @@ -1999,16 +2111,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc = -ENOMEM); i = 0; - cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) { - struct cl_page *page = osc_oap2cl_page(oap); + cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) { + struct cl_page *page = oap2cl_page(oap); if (clerq == NULL) { clerq = cl_req_alloc(env, page, crt, 1 /* only 1-object rpcs for * now */); if (IS_ERR(clerq)) GOTO(out, rc = PTR_ERR(clerq)); - lock = oap->oap_ldlm_lock; - } + lock = oap->oap_ldlm_lock; + } + if (mem_tight) + oap->oap_brw_flags |= OBD_BRW_MEMALLOC; pga[i] = &oap->oap_brw_page; pga[i]->off = oap->oap_obj_off + oap->oap_page_off; CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", @@ -2019,8 +2133,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, /* always get the data for the obdo for the rpc */ LASSERT(clerq != NULL); - crattr.cra_oa = oa; - crattr.cra_capa = NULL; + crattr.cra_oa = oa; + crattr.cra_capa = NULL; memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE); cl_req_attr_set(env, clerq, &crattr, ~0ULL); if (lock) { @@ -2032,18 +2146,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (rc != 0) { CERROR("cl_req_prep failed: %d\n", rc); GOTO(out, rc); - } + } - sort_brw_pages(pga, page_count); - rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr.cra_capa, 1, 0); - if (rc != 0) { - CERROR("prep_req failed: %d\n", rc); + sort_brw_pages(pga, page_count); + rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, + pga, &req, crattr.cra_capa, 1, 0); + if (rc != 0) { + CERROR("prep_req failed: %d\n", rc); GOTO(out, rc); } req->rq_interpret_reply = brw_interpret; - if (cmd & OBD_BRW_MEMALLOC) + if (mem_tight != 0) req->rq_memalloc = 1; /* Need to update the timestamps after the request is built in case @@ -2056,17 +2170,72 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); - cfs_list_splice(rpc_list, &aa->aa_oaps); - CFS_INIT_LIST_HEAD(rpc_list); - aa->aa_clerq = clerq; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); + cfs_list_splice_init(&rpc_list, &aa->aa_oaps); + CFS_INIT_LIST_HEAD(&aa->aa_exts); + cfs_list_splice_init(ext_list, &aa->aa_exts); + aa->aa_clerq = clerq; + + /* queued sync pages can be torn down while the pages + * were between the pending list and the rpc */ + tmp = NULL; + cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + /* only one oap gets a request reference */ + if (tmp == NULL) + tmp = oap; + if (oap->oap_interrupted && !req->rq_intr) { + CDEBUG(D_INODE, "oap %p in req %p interrupted\n", + oap, req); + ptlrpc_mark_interrupted(req); + } + } + if (tmp != NULL) + tmp->oap_request = ptlrpc_request_addref(req); + + client_obd_list_lock(&cli->cl_loi_list_lock); + starting_offset >>= CFS_PAGE_SHIFT; + if (cmd == OBD_BRW_READ) { + cli->cl_r_in_flight++; + lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); + lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, + starting_offset + 1); + } else { + cli->cl_w_in_flight++; + lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight); + lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, + starting_offset + 1); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); + + DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", + page_count, aa, cli->cl_r_in_flight, + cli->cl_w_in_flight); + + /* XXX: Maybe the caller can check the RPC bulk descriptor to + * see which CPU/NUMA node the majority of pages were allocated + * on, and try to assign the async RPC to the CPU core + * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. + * + * But on the other hand, we expect that multiple ptlrpcd + * threads and the initial write sponsor can run in parallel, + * especially when data checksum is enabled, which is CPU-bound + * operation and single ptlrpcd thread cannot process in time. + * So more ptlrpcd threads sharing BRW load + * (with PDL_POLICY_ROUND) seems better. + */ + ptlrpcd_add_req(req, pol, -1); + rc = 0; + EXIT; + out: - if (cmd & OBD_BRW_MEMALLOC) - cfs_memory_pressure_restore(mpflag); + if (mem_tight != 0) + cfs_memory_pressure_restore(mpflag); - capa_put(crattr.cra_capa); + capa_put(crattr.cra_capa); if (rc != 0) { LASSERT(req == NULL); @@ -2076,59 +2245,14 @@ out: OBD_FREE(pga, sizeof(*pga) * page_count); /* this should happen rarely and is pretty bad, it makes the * pending list not follow the dirty order */ - client_obd_list_lock(&cli->cl_loi_list_lock); - cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) { - cfs_list_del_init(&oap->oap_rpc_item); - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - if (oap->oap_interrupted) { - CDEBUG(D_INODE, "oap %p interrupted\n", oap); - osc_ap_completion(env, cli, NULL, oap, 0, - oap->oap_count); - continue; - } - osc_ap_completion(env, cli, NULL, oap, 0, rc); + while (!cfs_list_empty(ext_list)) { + ext = cfs_list_entry(ext_list->next, struct osc_extent, + oe_link); + cfs_list_del_init(&ext->oe_link); + osc_extent_finish(env, ext, 0, rc); } if (clerq && !IS_ERR(clerq)) cl_req_completion(env, clerq, rc); - } else { - struct osc_async_page *tmp = NULL; - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - LASSERT(aa != NULL); - client_obd_list_lock(&cli->cl_loi_list_lock); - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - /* only one oap gets a request reference */ - if (tmp == NULL) - tmp = oap; - if (oap->oap_interrupted && !req->rq_intr) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", - oap, req); - ptlrpc_mark_interrupted(req); - } - } - if (tmp != NULL) - tmp->oap_request = ptlrpc_request_addref(req); - - DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight", - page_count, aa, cli->cl_r_in_flight, - cli->cl_w_in_flight); - - /* XXX: Maybe the caller can check the RPC bulk descriptor to - * see which CPU/NUMA node the majority of pages were allocated - * on, and try to assign the async RPC to the CPU core - * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd - * threads and the initial write sponsor can run in parallel, - * especially when data checksum is enabled, which is CPU-bound - * operation and single ptlrpcd thread cannot process in time. - * So more ptlrpcd threads sharing BRW load - * (with PDL_POLICY_ROUND) seems better. - */ - ptlrpcd_add_req(req, pol, -1); } RETURN(rc); } @@ -2146,17 +2270,17 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock, LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl); lock_res_and_lock(lock); - cfs_spin_lock(&osc_ast_guard); + spin_lock(&osc_ast_guard); - if (lock->l_ast_data == NULL) - lock->l_ast_data = data; - if (lock->l_ast_data == data) - set = 1; + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; + if (lock->l_ast_data == data) + set = 1; - cfs_spin_unlock(&osc_ast_guard); - unlock_res_and_lock(lock); + spin_unlock(&osc_ast_guard); + unlock_res_and_lock(lock); - return set; + return set; } static int osc_set_data_with_check(struct lustre_handle *lockh, @@ -2180,7 +2304,7 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, struct ldlm_res_id res_id; struct obd_device *obd = class_exp2obd(exp); - osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id); + ostid_build_res_name(&lsm->lsm_object_oid, &res_id); ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); return 0; } @@ -2196,7 +2320,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, struct obd_device *obd = class_exp2obd(exp); int rc = 0; - osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id); + ostid_build_res_name(&lsm->lsm_object_oid, &res_id); rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); if (rc == LDLM_ITER_STOP) return(1); @@ -2207,7 +2331,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, obd_enqueue_update_f upcall, void *cookie, - int *flags, int agl, int rc) + __u64 *flags, int agl, int rc) { int intent = *flags & LDLM_FL_HAS_INTENT; ENTRY; @@ -2246,7 +2370,7 @@ static int osc_enqueue_interpret(const struct lu_env *env, __u32 mode; struct ost_lvb *lvb; __u32 lvb_len; - int *flags = aa->oa_flags; + __u64 *flags = aa->oa_flags; /* Make a local copy of a lock handle and a mode, because aa->oa_* * might be freed anytime after lock upcall has been called. */ @@ -2354,7 +2478,7 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * is excluded from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, - int *flags, ldlm_policy_data_t *policy, + __u64 *flags, ldlm_policy_data_t *policy, struct ost_lvb *lvb, int kms_valid, obd_enqueue_update_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, @@ -2422,7 +2546,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, * are explained in lov_enqueue() */ } - /* We already have a lock, and it's referenced */ + /* We already have a lock, and it's referenced. + * + * At this point, the cl_lock::cll_state is CLS_QUEUING, + * AGL upcall may change it to CLS_HELD directly. */ (*upcall)(cookie, ELDLM_OK); if (einfo->ei_mode != mode) @@ -2461,7 +2588,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), lockh, async); + sizeof(*lvb), LVB_T_OST, lockh, async); if (rqset) { if (!rc) { struct osc_enqueue_args *aa; @@ -2503,9 +2630,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, int rc; ENTRY; - osc_build_res_name(oinfo->oi_md->lsm_object_id, - oinfo->oi_md->lsm_object_seq, &res_id); - + ostid_build_res_name(&oinfo->oi_md->lsm_object_oid, &res_id); rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, @@ -2585,10 +2710,10 @@ static int osc_cancel_unused(struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct ldlm_res_id res_id, *resp = NULL; - if (lsm != NULL) { - resp = osc_build_res_name(lsm->lsm_object_id, - lsm->lsm_object_seq, &res_id); - } + if (lsm != NULL) { + ostid_build_res_name(&lsm->lsm_object_oid, &res_id); + resp = &res_id; + } return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); } @@ -2597,9 +2722,7 @@ static int osc_statfs_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_async_args *aa, int rc) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; struct obd_statfs *msfs; - __u64 used; ENTRY; if (rc == -EBADR) @@ -2622,51 +2745,6 @@ static int osc_statfs_interpret(const struct lu_env *env, GOTO(out, rc = -EPROTO); } - /* Reinitialize the RDONLY and DEGRADED flags at the client - * on each statfs, so they don't stay set permanently. */ - cfs_spin_lock(&cli->cl_oscc.oscc_lock); - - if (unlikely(msfs->os_state & OS_STATE_DEGRADED)) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED; - else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED)) - cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED; - - if (unlikely(msfs->os_state & OS_STATE_READONLY)) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY; - else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY)) - cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY; - - /* Add a bit of hysteresis so this flag isn't continually flapping, - * and ensure that new files don't get extremely fragmented due to - * only a small amount of available space in the filesystem. - * We want to set the NOSPC flag when there is less than ~0.1% free - * and clear it when there is at least ~0.2% free space, so: - * avail < ~0.1% max max = avail + used - * 1025 * avail < avail + used used = blocks - free - * 1024 * avail < used - * 1024 * avail < blocks - free - * avail < ((blocks - free) >> 10) - * - * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to - * lose that amount of space so in those cases we report no space left - * if their is less than 1 GB left. */ - used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30); - if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) && - ((msfs->os_ffree < 32) || (msfs->os_bavail < used)))) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC; - else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) && - (msfs->os_ffree > 64) && - (msfs->os_bavail > (used << 1)))) { - cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC | - OSCC_FLAG_NOSPC_BLK); - } - - if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) && - (msfs->os_bavail < used))) - cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK; - - cfs_spin_unlock(&cli->cl_oscc.oscc_lock); - *aa->aa_oi->oi_osfs = *msfs; out: rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); @@ -2729,10 +2807,10 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, /*Since the request might also come from lprocfs, so we need *sync this with client_disconnect_export Bug15684*/ - cfs_down_read(&obd->u.cli.cl_sem); + down_read(&obd->u.cli.cl_sem); if (obd->u.cli.cl_import) imp = class_import_get(obd->u.cli.cl_import); - cfs_up_read(&obd->u.cli.cl_sem); + up_read(&obd->u.cli.cl_sem); if (!imp) RETURN(-ENODEV); @@ -3028,40 +3106,6 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } -static int osc_setinfo_mds_connect_import(struct obd_import *imp) -{ - struct llog_ctxt *ctxt; - int rc = 0; - ENTRY; - - ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT); - if (ctxt) { - rc = llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); - } else { - /* XXX return an error? skip setting below flags? */ - } - - cfs_spin_lock(&imp->imp_lock); - imp->imp_server_timeout = 1; - imp->imp_pingable = 1; - cfs_spin_unlock(&imp->imp_lock); - CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd)); - - RETURN(rc); -} - -static int osc_setinfo_mds_conn_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *aa, int rc) -{ - ENTRY; - if (rc != 0) - RETURN(rc); - - RETURN(osc_setinfo_mds_connect_import(req->rq_import)); -} - static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, obd_count keylen, void *key, obd_count vallen, void *val, struct ptlrpc_request_set *set) @@ -3075,32 +3119,6 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); - if (KEY_IS(KEY_NEXT_ID)) { - obd_id new_val; - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - if (vallen != sizeof(obd_id)) - RETURN(-ERANGE); - if (val == NULL) - RETURN(-EINVAL); - - if (vallen != sizeof(obd_id)) - RETURN(-EINVAL); - - /* avoid race between allocate new object and set next id - * from ll_sync thread */ - cfs_spin_lock(&oscc->oscc_lock); - new_val = *((obd_id*)val) + 1; - if (new_val > oscc->oscc_next_id) - oscc->oscc_next_id = new_val; - cfs_spin_unlock(&oscc->oscc_lock); - CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n", - exp->exp_obd->obd_name, - obd->u.cli.cl_oscc.oscc_next_id); - - RETURN(0); - } - if (KEY_IS(KEY_CHECKSUM)) { if (vallen != sizeof(int)) RETURN(-EINVAL); @@ -3118,6 +3136,33 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(0); } + if (KEY_IS(KEY_CACHE_SET)) { + struct client_obd *cli = &obd->u.cli; + + LASSERT(cli->cl_cache == NULL); /* only once */ + cli->cl_cache = (struct cl_client_cache *)val; + cfs_atomic_inc(&cli->cl_cache->ccc_users); + cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; + + /* add this osc into entity list */ + LASSERT(cfs_list_empty(&cli->cl_lru_osc)); + spin_lock(&cli->cl_cache->ccc_lru_lock); + cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); + spin_unlock(&cli->cl_cache->ccc_lru_lock); + + RETURN(0); + } + + if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { + struct client_obd *cli = &obd->u.cli; + int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1; + int target = *(int *)val; + + nr = osc_lru_shrink(cli, min(nr, target)); + *(int *)val -= nr; + RETURN(0); + } + if (!set && !KEY_IS(KEY_GRANT_SHRINK)) RETURN(-EINVAL); @@ -3128,38 +3173,31 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, Even if something bad goes through, we'd get a -EINVAL from OST anyway. */ - if (KEY_IS(KEY_GRANT_SHRINK)) - req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); - else - req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO); - - if (req == NULL) - RETURN(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, - RCL_CLIENT, keylen); - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, - RCL_CLIENT, vallen); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ? + &RQF_OST_SET_GRANT_INFO : + &RQF_OBD_SET_INFO); + if (req == NULL) + RETURN(-ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, keylen); + if (!KEY_IS(KEY_GRANT_SHRINK)) + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, + RCL_CLIENT, vallen); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); - memcpy(tmp, key, keylen); - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ? + &RMF_OST_BODY : + &RMF_SETINFO_VAL); memcpy(tmp, val, vallen); - if (KEY_IS(KEY_MDS_CONN)) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - oscc->oscc_oa.o_seq = (*(__u32 *)val); - oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP; - LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq); - req->rq_no_delay = req->rq_no_resend = 1; - req->rq_interpret_reply = osc_setinfo_mds_conn_interpret; - } else if (KEY_IS(KEY_GRANT_SHRINK)) { + if (KEY_IS(KEY_GRANT_SHRINK)) { struct osc_grant_args *aa; struct obdo *oa; @@ -3187,101 +3225,31 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, } -static struct llog_operations osc_size_repl_logops = { - lop_cancel: llog_obd_repl_cancel -}; - -static struct llog_operations osc_mds_ost_orig_logops; - -static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt, struct llog_catid *catid) -{ - int rc; - ENTRY; - - rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1, - &catid->lci_logid, &osc_mds_ost_orig_logops); - if (rc) { - CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n"); - GOTO(out, rc); - } - - rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1, - NULL, &osc_size_repl_logops); - if (rc) { - struct llog_ctxt *ctxt = - llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); - if (ctxt) - llog_cleanup(ctxt); - CERROR("failed LLOG_SIZE_REPL_CTXT\n"); - } - GOTO(out, rc); -out: - if (rc) { - CERROR("osc '%s' tgt '%s' catid %p rc=%d\n", - obd->obd_name, tgt->obd_name, catid, rc); - CERROR("logid "LPX64":0x%x\n", - catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen); - } - return rc; -} - static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *disk_obd, int *index) { - struct llog_catid catid; - static char name[32] = CATLIST; - int rc; - ENTRY; - - LASSERT(olg == &obd->obd_olg); - - cfs_mutex_lock(&olg->olg_cat_processing); - rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid); - if (rc) { - CERROR("rc: %d\n", rc); - GOTO(out, rc); - } - - CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n", - obd->obd_name, *index, catid.lci_logid.lgl_oid, - catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen); - - rc = __osc_llog_init(obd, olg, disk_obd, &catid); - if (rc) { - CERROR("rc: %d\n", rc); - GOTO(out, rc); - } - - rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid); - if (rc) { - CERROR("rc: %d\n", rc); - GOTO(out, rc); - } - - out: - cfs_mutex_unlock(&olg->olg_cat_processing); - - return rc; + /* this code is not supposed to be used with LOD/OSP + * to be removed soon */ + LBUG(); + return 0; } static int osc_llog_finish(struct obd_device *obd, int count) { - struct llog_ctxt *ctxt; - int rc = 0, rc2 = 0; - ENTRY; + struct llog_ctxt *ctxt; - ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); - if (ctxt) - rc = llog_cleanup(ctxt); + ENTRY; - ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); - if (ctxt) - rc2 = llog_cleanup(ctxt); - if (!rc) - rc = rc2; + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + if (ctxt) { + llog_cat_close(NULL, ctxt->loc_handle); + llog_cleanup(NULL, ctxt); + } - RETURN(rc); + ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); + if (ctxt) + llog_cleanup(NULL, ctxt); + RETURN(0); } static int osc_reconnect(const struct lu_env *env, @@ -3297,20 +3265,17 @@ static int osc_reconnect(const struct lu_env *env, client_obd_list_lock(&cli->cl_loi_list_lock); data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?: - 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + 2 * cli_brw_size(obd); lost_grant = cli->cl_lost_grant; cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld " - "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant, - cli->cl_avail_grant, cli->cl_dirty, lost_grant); CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" - " ocd_grant: %d\n", data->ocd_connect_flags, - data->ocd_version, data->ocd_grant); - } + " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, + data->ocd_version, data->ocd_grant, lost_grant); + } - RETURN(0); + RETURN(0); } static int osc_disconnect(struct obd_export *exp) @@ -3324,7 +3289,7 @@ static int osc_disconnect(struct obd_export *exp) if (obd->u.cli.cl_conn_count == 1) { /* Flush any remaining cancel messages out to the * target */ - llog_sync(ctxt, exp); + llog_sync(ctxt, exp, 0); } llog_ctxt_put(ctxt); } else { @@ -3367,14 +3332,6 @@ static int osc_import_event(struct obd_device *obd, switch (event) { case IMP_EVENT_DISCON: { - /* Only do this on the MDS OSC's */ - if (imp->imp_server_timeout) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - cfs_spin_unlock(&oscc->oscc_lock); - } cli = &obd->u.cli; client_obd_list_lock(&cli->cl_loi_list_lock); cli->cl_avail_grant = 0; @@ -3395,11 +3352,9 @@ static int osc_import_event(struct obd_device *obd, if (!IS_ERR(env)) { /* Reset grants */ cli = &obd->u.cli; - client_obd_list_lock(&cli->cl_loi_list_lock); /* all pages go to failing rpcs due to the invalid * import */ osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); - client_obd_list_unlock(&cli->cl_loi_list_lock); ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); cl_env_put(env, &refcheck); @@ -3408,15 +3363,6 @@ static int osc_import_event(struct obd_device *obd, break; } case IMP_EVENT_ACTIVE: { - /* Only do this on the MDS OSC's */ - if (imp->imp_server_timeout) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - - cfs_spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC | - OSCC_FLAG_NOSPC_BLK); - cfs_spin_unlock(&oscc->oscc_lock); - } rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); break; } @@ -3480,64 +3426,64 @@ static int brw_queue_work(const struct lu_env *env, void *data) CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - client_obd_list_lock(&cli->cl_loi_list_lock); osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); - client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(0); } int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { - struct client_obd *cli = &obd->u.cli; - int rc; - ENTRY; - - ENTRY; - rc = ptlrpcd_addref(); - if (rc) - RETURN(rc); - - rc = client_obd_setup(obd, lcfg); - if (rc == 0) { - void *handler; - handler = ptlrpcd_alloc_work(cli->cl_import, - brw_queue_work, cli); - if (!IS_ERR(handler)) - cli->cl_writeback_work = handler; - else - rc = PTR_ERR(handler); - } - - if (rc == 0) { - struct lprocfs_static_vars lvars = { 0 }; - - cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; - lprocfs_osc_init_vars(&lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { - lproc_osc_attach_seqstat(obd); - sptlrpc_lprocfs_cliobd_attach(obd); - ptlrpc_lprocfs_register_obd(obd); - } - - oscc_init(obd); - /* We need to allocate a few requests more, because - brw_interpret tries to create new requests before freeing - previous ones. Ideally we want to have 2x max_rpcs_in_flight - reserved, but I afraid that might be too much wasted RAM - in fact, so 2 is just my guess and still should work. */ - cli->cl_import->imp_rq_pool = - ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, - OST_MAXREQSIZE, - ptlrpc_add_rqs_to_pool); + struct lprocfs_static_vars lvars = { 0 }; + struct client_obd *cli = &obd->u.cli; + void *handler; + int rc; + ENTRY; - CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list); + rc = ptlrpcd_addref(); + if (rc) + RETURN(rc); + + rc = client_obd_setup(obd, lcfg); + if (rc) + GOTO(out_ptlrpcd, rc); + + handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli); + if (IS_ERR(handler)) + GOTO(out_client_setup, rc = PTR_ERR(handler)); + cli->cl_writeback_work = handler; + + rc = osc_quota_setup(obd); + if (rc) + GOTO(out_ptlrpcd_work, rc); + + cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; + lprocfs_osc_init_vars(&lvars); + if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { + lproc_osc_attach_seqstat(obd); + sptlrpc_lprocfs_cliobd_attach(obd); + ptlrpc_lprocfs_register_obd(obd); + } - ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); - } + /* We need to allocate a few requests more, because + * brw_interpret tries to create new requests before freeing + * previous ones, Ideally we want to have 2x max_rpcs_in_flight + * reserved, but I'm afraid that might be too much wasted RAM + * in fact, so 2 is just my guess and still should work. */ + cli->cl_import->imp_rq_pool = + ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, + OST_MAXREQSIZE, + ptlrpc_add_rqs_to_pool); + + CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list); + ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); + RETURN(rc); - if (rc) - ptlrpcd_decref(); - RETURN(rc); +out_ptlrpcd_work: + ptlrpcd_destroy_work(handler); +out_client_setup: + client_obd_cleanup(obd); +out_ptlrpcd: + ptlrpcd_decref(); + RETURN(rc); } static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -3552,9 +3498,9 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ ptlrpc_deactivate_import(imp); - cfs_spin_lock(&imp->imp_lock); - imp->imp_pingable = 0; - cfs_spin_unlock(&imp->imp_lock); + spin_lock(&imp->imp_lock); + imp->imp_pingable = 0; + spin_unlock(&imp->imp_lock); break; } case OBD_CLEANUP_EXPORTS: { @@ -3587,9 +3533,21 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) int osc_cleanup(struct obd_device *obd) { - int rc; + struct client_obd *cli = &obd->u.cli; + int rc; - ENTRY; + ENTRY; + + /* lru cleanup */ + if (cli->cl_cache != NULL) { + LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0); + spin_lock(&cli->cl_cache->ccc_lru_lock); + cfs_list_del_init(&cli->cl_lru_osc); + spin_unlock(&cli->cl_cache->ccc_lru_lock); + cli->cl_lru_left = NULL; + cfs_atomic_dec(&cli->cl_cache->ccc_users); + cli->cl_cache = NULL; + } /* free memory of osc quota cache */ osc_quota_cleanup(obd); @@ -3638,9 +3596,7 @@ struct obd_ops osc_obd_ops = { .o_statfs_async = osc_statfs_async, .o_packmd = osc_packmd, .o_unpackmd = osc_unpackmd, - .o_precreate = osc_precreate, .o_create = osc_create, - .o_create_async = osc_create_async, .o_destroy = osc_destroy, .o_getattr = osc_getattr, .o_getattr_async = osc_getattr_async, @@ -3663,12 +3619,11 @@ struct obd_ops osc_obd_ops = { .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, .o_quotacheck = osc_quotacheck, - .o_quota_adjust_qunit = osc_quota_adjust_qunit, }; extern struct lu_kmem_descr osc_caches[]; -extern cfs_spinlock_t osc_ast_guard; -extern cfs_lock_class_key_t osc_ast_guard_class; +extern spinlock_t osc_ast_guard; +extern struct lock_class_key osc_ast_guard_class; int __init osc_init(void) { @@ -3685,7 +3640,6 @@ int __init osc_init(void) lprocfs_osc_init_vars(&lvars); - osc_quota_init(); rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, LUSTRE_OSC_NAME, &osc_device_type); if (rc) { @@ -3693,22 +3647,15 @@ int __init osc_init(void) RETURN(rc); } - cfs_spin_lock_init(&osc_ast_guard); - cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); - - osc_mds_ost_orig_logops = llog_lvfs_ops; - osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup; - osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup; - osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add; - osc_mds_ost_orig_logops.lop_connect = llog_origin_connect; + spin_lock_init(&osc_ast_guard); + lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); - RETURN(rc); + RETURN(rc); } #ifdef __KERNEL__ static void /*__exit*/ osc_exit(void) { - osc_quota_exit(); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); }