X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=cf3136ecea9838d0d602b2ebfe79ec85c2ab8641;hp=aafe470908c1c8af60af9f42ad90a1049cd85927;hb=4c4c327b25f3414f20a9ae600e7311f1aa3a866d;hpb=80a818b80373bebd1438a74aeebda102b4885e53 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index aafe470..cf3136e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -36,24 +32,20 @@ #define DEBUG_SUBSYSTEM S_OSC -#include - -#include - +#include #include #include #include #include #include -#include +#include #include #include -#include #include #include #include +#include -#include "osc_cl_internal.h" #include "osc_internal.h" atomic_t osc_pool_req_count; @@ -64,17 +56,8 @@ struct ptlrpc_request_pool *osc_rq_pool; static unsigned int osc_reqpool_mem_max = 5; module_param(osc_reqpool_mem_max, uint, 0444); -struct osc_brw_async_args { - struct obdo *aa_oa; - int aa_requested_nob; - int aa_nio_count; - u32 aa_page_count; - int aa_resends; - struct brw_page **aa_ppga; - struct client_obd *aa_cli; - struct list_head aa_oaps; - struct list_head aa_exts; -}; +static int osc_idle_timeout = 20; +module_param(osc_idle_timeout, uint, 0644); #define osc_grant_args osc_brw_async_args @@ -97,18 +80,6 @@ struct osc_ladvise_args { void *la_cookie; }; -struct osc_enqueue_args { - struct obd_export *oa_exp; - enum ldlm_type oa_type; - enum ldlm_mode oa_mode; - __u64 *oa_flags; - osc_enqueue_upcall_f oa_upcall; - void *oa_cookie; - struct ost_lvb *oa_lvb; - struct lustre_handle oa_lockh; - unsigned int oa_agl:1; -}; - static void osc_release_ppga(struct brw_page **ppga, size_t count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); @@ -208,24 +179,25 @@ out: } static int osc_setattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_setattr_args *sa, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct ost_body *body; - ENTRY; + struct osc_setattr_args *sa = args; + struct ost_body *body; - if (rc != 0) - GOTO(out, rc); + ENTRY; - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + if (rc != 0) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa, &body->oa); out: - rc = sa->sa_upcall(sa->sa_cookie, rc); - RETURN(rc); + rc = sa->sa_upcall(sa->sa_cookie, rc); + RETURN(rc); } int osc_setattr_async(struct obd_export *exp, struct obdo *oa, @@ -257,11 +229,9 @@ int osc_setattr_async(struct obd_export *exp, struct obdo *oa, /* Do not wait for response. */ ptlrpcd_add_req(req); } else { - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_setattr_interpret; + req->rq_interpret_reply = osc_setattr_interpret; - CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); + sa = ptlrpc_req_async_args(sa, req); sa->sa_oa = oa; sa->sa_upcall = upcall; sa->sa_cookie = cookie; @@ -349,8 +319,7 @@ int osc_ladvise_base(struct obd_export *exp, struct obdo *oa, } req->rq_interpret_reply = osc_ladvise_interpret; - CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args)); - la = ptlrpc_req_async_args(req); + la = ptlrpc_req_async_args(la, req); la->la_oa = oa; la->la_upcall = upcall; la->la_cookie = cookie; @@ -414,57 +383,57 @@ out: RETURN(rc); } -int osc_punch_base(struct obd_export *exp, struct obdo *oa, - obd_enqueue_update_f upcall, void *cookie, - struct ptlrpc_request_set *rqset) +int osc_punch_send(struct obd_export *exp, struct obdo *oa, + obd_enqueue_update_f upcall, void *cookie) { - struct ptlrpc_request *req; - struct osc_setattr_args *sa; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct osc_setattr_args *sa; + struct obd_import *imp = class_exp2cliimp(exp); + struct ost_body *body; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ - ptlrpc_at_set_req_timeout(req); + req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_set_io_portal(req); + + ptlrpc_at_set_req_timeout(req); body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); + + lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa); ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); + req->rq_interpret_reply = osc_setattr_interpret; + sa = ptlrpc_req_async_args(sa, req); sa->sa_oa = oa; sa->sa_upcall = upcall; sa->sa_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req); - else - ptlrpc_set_add_req(rqset, req); + + ptlrpcd_add_req(req); RETURN(0); } +EXPORT_SYMBOL(osc_punch_send); static int osc_sync_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *arg, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct osc_fsync_args *fa = arg; - struct ost_body *body; - struct cl_attr *attr = &osc_env_info(env)->oti_attr; - unsigned long valid = 0; - struct cl_object *obj; + struct osc_fsync_args *fa = args; + struct ost_body *body; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + struct cl_object *obj; ENTRY; if (rc != 0) @@ -524,8 +493,7 @@ int osc_sync_base(struct osc_object *obj, struct obdo *oa, ptlrpc_request_set_replen(req); req->rq_interpret_reply = osc_sync_interpret; - CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); - fa = ptlrpc_req_async_args(req); + fa = ptlrpc_req_async_args(fa, req); fa->fa_obj = obj; fa->fa_oa = oa; fa->fa_upcall = upcall; @@ -575,13 +543,13 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, } static int osc_destroy_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, - int rc) + struct ptlrpc_request *req, void *args, int rc) { struct client_obd *cli = &req->rq_import->imp_obd->u.cli; atomic_dec(&cli->cl_destroy_in_flight); wake_up(&cli->cl_destroy_waitq); + return 0; } @@ -609,7 +577,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; struct ost_body *body; - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); int rc, count; ENTRY; @@ -651,8 +619,12 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, * Wait until the number of on-going destroy RPCs drops * under max_rpc_in_flight */ - l_wait_event_exclusive(cli->cl_destroy_waitq, - osc_can_send_destroy(cli), &lwi); + rc = l_wait_event_exclusive(cli->cl_destroy_waitq, + osc_can_send_destroy(cli), &lwi); + if (rc) { + ptlrpc_req_finished(req); + RETURN(rc); + } } /* Do not wait for response */ @@ -672,22 +644,19 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM)) oa->o_dirty = cli->cl_dirty_grant; else - oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT; - if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > - cli->cl_dirty_max_pages)) { - CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty_pages, cli->cl_dirty_transit, + oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT; + if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) { + CERROR("dirty %lu > dirty_max %lu\n", + cli->cl_dirty_pages, cli->cl_dirty_max_pages); oa->o_undirty = 0; - } else if (unlikely(atomic_long_read(&obd_dirty_pages) - - atomic_long_read(&obd_dirty_transit_pages) > + } else if (unlikely(atomic_long_read(&obd_dirty_pages) > (long)(obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n", + CERROR("%s: dirty %ld > system dirty_max %ld\n", cli_name(cli), atomic_long_read(&obd_dirty_pages), - atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages > @@ -697,11 +666,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_undirty = 0; } else { unsigned long nrpages; + unsigned long undirty; nrpages = cli->cl_max_pages_per_rpc; nrpages *= cli->cl_max_rpcs_in_flight + 1; nrpages = max(nrpages, cli->cl_dirty_max_pages); - oa->o_undirty = nrpages << PAGE_CACHE_SHIFT; + undirty = nrpages << PAGE_SHIFT; if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM)) { int nrextents; @@ -710,8 +680,13 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, * grant space */ nrextents = (nrpages + cli->cl_max_extent_pages - 1) / cli->cl_max_extent_pages; - oa->o_undirty += nrextents * cli->cl_grant_extent_tax; + undirty += nrextents * cli->cl_grant_extent_tax; } + /* Do not ask for more than OBD_MAX_GRANT - a margin for server + * to add extent tax, etc. + */ + oa->o_undirty = min(undirty, OBD_MAX_GRANT & + ~(PTLRPC_MAX_BRW_SIZE * 4UL)); } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; @@ -723,10 +698,11 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, void osc_update_next_shrink(struct client_obd *cli) { - cli->cl_next_shrink_grant = - cfs_time_shift(cli->cl_grant_shrink_interval); - CDEBUG(D_CACHE, "next time %ld to shrink grant \n", - cli->cl_next_shrink_grant); + cli->cl_next_shrink_grant = ktime_get_seconds() + + cli->cl_grant_shrink_interval; + + CDEBUG(D_CACHE, "next time %lld to shrink grant\n", + cli->cl_next_shrink_grant); } static void __osc_update_grant(struct client_obd *cli, u64 grant) @@ -744,30 +720,37 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) } } -static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - u32 keylen, void *key, - u32 vallen, void *val, - struct ptlrpc_request_set *set); +/** + * grant thread data for shrinking space. + */ +struct grant_thread_data { + struct list_head gtd_clients; + struct mutex gtd_mutex; + unsigned long gtd_stopped:1; +}; +static struct grant_thread_data client_gtd; static int osc_shrink_grant_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *aa, int rc) + struct ptlrpc_request *req, + void *args, int rc) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa; - struct ost_body *body; + struct osc_grant_args *aa = args; + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + struct ost_body *body; - if (rc != 0) { - __osc_update_grant(cli, oa->o_grant); - GOTO(out, rc); - } + if (rc != 0) { + __osc_update_grant(cli, aa->aa_oa->o_grant); + GOTO(out, rc); + } - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - osc_update_grant(cli, body); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + osc_update_grant(cli, body); out: - OBDO_FREE(oa); - return rc; + OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem); + aa->aa_oa = NULL; + + return rc; } static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) @@ -791,11 +774,11 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) static int osc_shrink_grant(struct client_obd *cli) { __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * - (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); + (cli->cl_max_pages_per_rpc << PAGE_SHIFT); spin_lock(&cli->cl_loi_list_lock); if (cli->cl_avail_grant <= target_bytes) - target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; + target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT; spin_unlock(&cli->cl_loi_list_lock); return osc_shrink_grant_to_target(cli, target_bytes); @@ -811,8 +794,8 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) /* Don't shrink if we are already above or below the desired limit * We don't want to shrink below a single RPC, as that will negatively * impact block allocation and long-term performance. */ - if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT) - target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; + if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT) + target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT; if (target_bytes >= cli->cl_avail_grant) { spin_unlock(&cli->cl_loi_list_lock); @@ -827,6 +810,11 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) osc_announce_cached(cli, &body->oa, 0); spin_lock(&cli->cl_loi_list_lock); + if (target_bytes >= cli->cl_avail_grant) { + /* available grant has changed since target calculation */ + spin_unlock(&cli->cl_loi_list_lock); + GOTO(out_free, rc = 0); + } body->oa.o_grant = cli->cl_avail_grant - target_bytes; cli->cl_avail_grant = target_bytes; spin_unlock(&cli->cl_loi_list_lock); @@ -842,24 +830,29 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) sizeof(*body), body, NULL); if (rc != 0) __osc_update_grant(cli, body->oa.o_grant); +out_free: OBD_FREE_PTR(body); RETURN(rc); } static int osc_should_shrink_grant(struct client_obd *client) { - cfs_time_t time = cfs_time_current(); - cfs_time_t next_shrink = client->cl_next_shrink_grant; + time64_t next_shrink = client->cl_next_shrink_grant; - if ((client->cl_import->imp_connect_data.ocd_connect_flags & - OBD_CONNECT_GRANT_SHRINK) == 0) - return 0; + if (client->cl_import == NULL) + return 0; - if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { + if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) || + client->cl_import->imp_grant_shrink_disabled) { + osc_update_next_shrink(client); + return 0; + } + + if (ktime_get_seconds() >= next_shrink - 5) { /* Get the current RPC size directly, instead of going via: * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) * Keep comment here so that it can be found by searching. */ - int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; + int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT; if (client->cl_import->imp_state == LUSTRE_IMP_FULL && client->cl_avail_grant > brw_size) @@ -870,41 +863,95 @@ static int osc_should_shrink_grant(struct client_obd *client) return 0; } -static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) +#define GRANT_SHRINK_RPC_BATCH 100 + +static struct delayed_work work; + +static void osc_grant_work_handler(struct work_struct *data) { - struct client_obd *client; + struct client_obd *cli; + int rpc_sent; + bool init_next_shrink = true; + time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL; + + rpc_sent = 0; + mutex_lock(&client_gtd.gtd_mutex); + list_for_each_entry(cli, &client_gtd.gtd_clients, + cl_grant_chain) { + if (rpc_sent < GRANT_SHRINK_RPC_BATCH && + osc_should_shrink_grant(cli)) { + osc_shrink_grant(cli); + rpc_sent++; + } + + if (!init_next_shrink) { + if (cli->cl_next_shrink_grant < next_shrink && + cli->cl_next_shrink_grant > ktime_get_seconds()) + next_shrink = cli->cl_next_shrink_grant; + } else { + init_next_shrink = false; + next_shrink = cli->cl_next_shrink_grant; + } + } + mutex_unlock(&client_gtd.gtd_mutex); + + if (client_gtd.gtd_stopped == 1) + return; - list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) { - if (osc_should_shrink_grant(client)) - osc_shrink_grant(client); + if (next_shrink > ktime_get_seconds()) { + time64_t delay = next_shrink - ktime_get_seconds(); + + schedule_delayed_work(&work, cfs_time_seconds(delay)); + } else { + schedule_work(&work.work); } - return 0; } -static int osc_add_shrink_grant(struct client_obd *client) +void osc_schedule_grant_work(void) { - int rc; + cancel_delayed_work_sync(&work); + schedule_work(&work.work); +} + +/** + * Start grant thread for returing grant to server for idle clients. + */ +static int osc_start_grant_work(void) +{ + client_gtd.gtd_stopped = 0; + mutex_init(&client_gtd.gtd_mutex); + INIT_LIST_HEAD(&client_gtd.gtd_clients); + + INIT_DELAYED_WORK(&work, osc_grant_work_handler); + schedule_work(&work.work); - rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, - TIMEOUT_GRANT, - osc_grant_shrink_grant_cb, NULL, - &client->cl_grant_shrink_list); - if (rc) { - CERROR("add grant client %s error %d\n", cli_name(client), rc); - return rc; - } - CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client)); - osc_update_next_shrink(client); return 0; } -static int osc_del_shrink_grant(struct client_obd *client) +static void osc_stop_grant_work(void) +{ + client_gtd.gtd_stopped = 1; + cancel_delayed_work_sync(&work); +} + +static void osc_add_grant_list(struct client_obd *client) { - return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list, - TIMEOUT_GRANT); + mutex_lock(&client_gtd.gtd_mutex); + list_add(&client->cl_grant_chain, &client_gtd.gtd_clients); + mutex_unlock(&client_gtd.gtd_mutex); +} + +static void osc_del_grant_list(struct client_obd *client) +{ + if (list_empty(&client->cl_grant_chain)) + return; + + mutex_lock(&client_gtd.gtd_mutex); + list_del_init(&client->cl_grant_chain); + mutex_unlock(&client_gtd.gtd_mutex); } -static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) +void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) { /* * ocd_grant is the total grant amount we're expect to hold: if we've @@ -923,18 +970,9 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant -= cli->cl_dirty_grant; else cli->cl_avail_grant -= - cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + cli->cl_dirty_pages << PAGE_SHIFT; } - if (cli->cl_avail_grant < 0) { - CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", - cli_name(cli), cli->cl_avail_grant, - ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT); - /* workaround for servers which do not have the patch from - * LU-2679 */ - cli->cl_avail_grant = ocd->ocd_grant; - } - if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) { u64 size; int chunk_mask; @@ -966,10 +1004,10 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits, cli->cl_max_extent_pages); - if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && - list_empty(&cli->cl_grant_shrink_list)) - osc_add_shrink_grant(cli); + if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain)) + osc_add_grant_list(cli); } +EXPORT_SYMBOL(osc_init_grant); /* We assume that the reason this OSC got a short read is because it read * beyond the end of a stripe file; i.e. lustre is reading a sparse file @@ -1027,8 +1065,11 @@ static int check_write_rcs(struct ptlrpc_request *req, /* return error if any niobuf was in error */ for (i = 0; i < niocount; i++) { - if ((int)remote_rcs[i] < 0) - return(remote_rcs[i]); + if ((int)remote_rcs[i] < 0) { + CDEBUG(D_INFO, "rc[%d]: %d req %p\n", + i, remote_rcs[i], req); + return remote_rcs[i]; + } if (remote_rcs[i] != 0) { CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n", @@ -1036,8 +1077,8 @@ static int check_write_rcs(struct ptlrpc_request *req, return(-EPROTO); } } - - if (req->rq_bulk->bd_nob_transferred != requested_nob) { + if (req->rq_bulk != NULL && + req->rq_bulk->bd_nob_transferred != requested_nob) { CERROR("Unexpected # bytes transferred: %d (requested %d)\n", req->rq_bulk->bd_nob_transferred, requested_nob); return(-EPROTO); @@ -1057,7 +1098,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) * safe to combine */ if (unlikely((p1->flag & mask) != (p2->flag & mask))) { CWARN("Saw flags 0x%x and 0x%x in the same brw, please " - "report this at https://jira.hpdd.intel.com/\n", + "report this at https://jira.whamcloud.com/\n", p1->flag, p2->flag); } return 0; @@ -1066,24 +1107,128 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return (p1->off + p1->count == p2->off); } -static u32 osc_checksum_bulk(int nob, size_t pg_count, +#if IS_ENABLED(CONFIG_CRC_T10DIF) +static int osc_checksum_bulk_t10pi(const char *obd_name, int nob, + size_t pg_count, struct brw_page **pga, + int opc, obd_dif_csum_fn *fn, + int sector_size, + u32 *check_sum) +{ + struct ahash_request *req; + /* Used Adler as the default checksum type on top of DIF tags */ + unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP); + struct page *__page; + unsigned char *buffer; + __u16 *guard_start; + unsigned int bufsize; + int guard_number; + int used_number = 0; + int used; + u32 cksum; + int rc = 0; + int i = 0; + + LASSERT(pg_count > 0); + + __page = alloc_page(GFP_KERNEL); + if (__page == NULL) + return -ENOMEM; + + req = cfs_crypto_hash_init(cfs_alg, NULL, 0); + if (IS_ERR(req)) { + rc = PTR_ERR(req); + CERROR("%s: unable to initialize checksum hash %s: rc = %d\n", + obd_name, cfs_crypto_hash_name(cfs_alg), rc); + GOTO(out, rc); + } + + buffer = kmap(__page); + guard_start = (__u16 *)buffer; + guard_number = PAGE_SIZE / sizeof(*guard_start); + while (nob > 0 && pg_count > 0) { + unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; + + /* corrupt the data before we compute the checksum, to + * simulate an OST->client data error */ + if (unlikely(i == 0 && opc == OST_READ && + OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) { + unsigned char *ptr = kmap(pga[i]->pg); + int off = pga[i]->off & ~PAGE_MASK; + + memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); + kunmap(pga[i]->pg); + } + + /* + * The left guard number should be able to hold checksums of a + * whole page + */ + rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, + pga[i]->off & ~PAGE_MASK, + count, + guard_start + used_number, + guard_number - used_number, + &used, sector_size, + fn); + if (rc) + break; + + used_number += used; + if (used_number == guard_number) { + cfs_crypto_hash_update_page(req, __page, 0, + used_number * sizeof(*guard_start)); + used_number = 0; + } + + nob -= pga[i]->count; + pg_count--; + i++; + } + kunmap(__page); + if (rc) + GOTO(out, rc); + + if (used_number != 0) + cfs_crypto_hash_update_page(req, __page, 0, + used_number * sizeof(*guard_start)); + + bufsize = sizeof(cksum); + cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize); + + /* For sending we only compute the wrong checksum instead + * of corrupting the data so it is still correct on a redo */ + if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) + cksum++; + + *check_sum = cksum; +out: + __free_page(__page); + return rc; +} +#else /* !CONFIG_CRC_T10DIF */ +#define obd_dif_ip_fn NULL +#define obd_dif_crc_fn NULL +#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \ + -EOPNOTSUPP +#endif /* CONFIG_CRC_T10DIF */ + +static int osc_checksum_bulk(int nob, size_t pg_count, struct brw_page **pga, int opc, - cksum_type_t cksum_type) + enum cksum_types cksum_type, + u32 *cksum) { - u32 cksum; int i = 0; - struct cfs_crypto_hash_desc *hdesc; + struct ahash_request *req; unsigned int bufsize; - int err; unsigned char cfs_alg = cksum_obd2cfs(cksum_type); LASSERT(pg_count > 0); - hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0); - if (IS_ERR(hdesc)) { + req = cfs_crypto_hash_init(cfs_alg, NULL, 0); + if (IS_ERR(req)) { CERROR("Unable to initialize checksum hash %s\n", cfs_crypto_hash_name(cfs_alg)); - return PTR_ERR(hdesc); + return PTR_ERR(req); } while (nob > 0 && pg_count > 0) { @@ -1099,7 +1244,7 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count, memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } - cfs_crypto_hash_update_page(hdesc, pga[i]->pg, + cfs_crypto_hash_update_page(req, pga[i]->pg, pga[i]->off & ~PAGE_MASK, count); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", @@ -1110,15 +1255,38 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count, i++; } - bufsize = sizeof(cksum); - err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize); + bufsize = sizeof(*cksum); + cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize); /* For sending we only compute the wrong checksum instead * of corrupting the data so it is still correct on a redo */ if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) - cksum++; + (*cksum)++; + + return 0; +} + +static int osc_checksum_bulk_rw(const char *obd_name, + enum cksum_types cksum_type, + int nob, size_t pg_count, + struct brw_page **pga, int opc, + u32 *check_sum) +{ + obd_dif_csum_fn *fn = NULL; + int sector_size = 0; + int rc; + + ENTRY; + obd_t10_cksum2dif(cksum_type, &fn, §or_size); + + if (fn) + rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga, + opc, fn, sector_size, check_sum); + else + rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type, + check_sum); - return cksum; + RETURN(rc); } static int @@ -1131,10 +1299,12 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, struct ost_body *body; struct obd_ioobj *ioobj; struct niobuf_remote *niobuf; - int niocount, i, requested_nob, opc, rc; + int niocount, i, requested_nob, opc, rc, short_io_size = 0; struct osc_brw_async_args *aa; struct req_capsule *pill; struct brw_page *pg_prev; + void *short_io_buf; + const char *obd_name = cli->cl_import->imp_obd->obd_name; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) @@ -1165,17 +1335,38 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); + for (i = 0; i < page_count; i++) + short_io_size += pga[i]->count; + + /* Check if read/write is small enough to be a short io. */ + if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 || + !imp_connect_shortio(cli->cl_import)) + short_io_size = 0; + + req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT, + opc == OST_READ ? 0 : short_io_size); + if (opc == OST_READ) + req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER, + short_io_size); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); if (rc) { ptlrpc_request_free(req); RETURN(rc); } - req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ - ptlrpc_at_set_req_timeout(req); + osc_set_io_portal(req); + + ptlrpc_at_set_req_timeout(req); /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own * retry logic */ req->rq_no_retry_einprogress = 1; + if (short_io_size != 0) { + desc = NULL; + short_io_buf = NULL; + goto no_bulk; + } + desc = ptlrpc_prep_bulk_imp(req, page_count, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE : @@ -1187,7 +1378,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, if (desc == NULL) GOTO(out, rc = -ENOMEM); /* NB request now owns desc and will free it when it gets freed */ - +no_bulk: body = req_capsule_client_get(pill, &RMF_OST_BODY); ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ); niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); @@ -1195,6 +1386,15 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); + /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid() + * and from_kgid(), because they are asynchronous. Fortunately, variable + * oa contains valid o_uid and o_gid in these two operations. + * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658. + * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking + * other process logic */ + body->oa.o_uid = oa->o_uid; + body->oa.o_gid = oa->o_gid; + obdo_to_ioobj(oa, ioobj); ioobj->ioo_bufcnt = niocount; /* The high bits of ioo_max_brw tells server _maximum_ number of bulks @@ -1202,7 +1402,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, * when the RPC is finally sent in ptlrpc_register_bulk(). It sends * "max - 1" for old client compatibility sending "0", and also so the * the actual maximum is a power-of-two number, not one less. LU-1431 */ - ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + if (desc != NULL) + ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + else /* short io */ + ioobj_max_brw_set(ioobj, 0); + + if (short_io_size != 0) { + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { + body->oa.o_valid |= OBD_MD_FLFLAGS; + body->oa.o_flags = 0; + } + body->oa.o_flags |= OBD_FL_SHORT_IO; + CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n", + short_io_size); + if (opc == OST_WRITE) { + short_io_buf = req_capsule_client_get(pill, + &RMF_SHORT_IO); + LASSERT(short_io_buf != NULL); + } + } + LASSERT(page_count > 0); pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { @@ -1212,9 +1431,9 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, LASSERT(pg->count > 0); /* make sure there is no gap in the middle of page array */ LASSERTF(page_count == 1 || - (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) && + (ergo(i == 0, poff + pg->count == PAGE_SIZE) && ergo(i > 0 && i < page_count - 1, - poff == 0 && pg->count == PAGE_CACHE_SIZE) && + poff == 0 && pg->count == PAGE_SIZE) && ergo(i == page_count - 1, poff == 0)), "i: %d/%d pg: %p off: %llu, count: %u\n", i, page_count, pg, pg->off, pg->count); @@ -1227,9 +1446,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, pg_prev->pg->index, pg_prev->off); LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - - desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count); - requested_nob += pg->count; + if (short_io_size != 0 && opc == OST_WRITE) { + unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0); + + LASSERT(short_io_size >= requested_nob + pg->count); + memcpy(short_io_buf + requested_nob, + ptr + poff, + pg->count); + ll_kunmap_atomic(ptr, KM_USER0); + } else if (short_io_size == 0) { + desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, + pg->count); + } + requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; @@ -1265,23 +1494,31 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { /* store cl_cksum_type in a local variable since * it can be changed via lprocfs */ - cksum_type_t cksum_type = cli->cl_cksum_type; + enum cksum_types cksum_type = cli->cl_cksum_type; - if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { - oa->o_flags &= OBD_FL_LOCAL_MASK; + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) body->oa.o_flags = 0; - } - body->oa.o_flags |= cksum_type_pack(cksum_type); + + body->oa.o_flags |= obd_cksum_type_pack(obd_name, + cksum_type); body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - body->oa.o_cksum = osc_checksum_bulk(requested_nob, - page_count, pga, - OST_WRITE, - cksum_type); + + rc = osc_checksum_bulk_rw(obd_name, cksum_type, + requested_nob, page_count, + pga, OST_WRITE, + &body->oa.o_cksum); + if (rc < 0) { + CDEBUG(D_PAGE, "failed to checksum, rc = %d\n", + rc); + GOTO(out, rc); + } CDEBUG(D_PAGE, "checksum at write origin: %x\n", body->oa.o_cksum); + /* save this in 'oa', too, for later checking */ oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - oa->o_flags |= cksum_type_pack(cksum_type); + oa->o_flags |= obd_cksum_type_pack(obd_name, + cksum_type); } else { /* clear out the checksum flag, in case this is a * resend but cl_checksum is no longer set. b=11238 */ @@ -1296,21 +1533,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) body->oa.o_flags = 0; - body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type); + body->oa.o_flags |= obd_cksum_type_pack(obd_name, + cli->cl_cksum_type); body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - } - } - ptlrpc_request_set_replen(req); + } - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->aa_oa = oa; - aa->aa_requested_nob = requested_nob; - aa->aa_nio_count = niocount; - aa->aa_page_count = page_count; - aa->aa_resends = 0; - aa->aa_ppga = pga; - aa->aa_cli = cli; + /* Client cksum has been already copied to wire obdo in previous + * lustre_set_wire_obdo(), and in the case a bulk-read is being + * resent due to cksum error, this will allow Server to + * check+dump pages on its side */ + } + ptlrpc_request_set_replen(req); + + aa = ptlrpc_req_async_args(aa, req); + aa->aa_oa = oa; + aa->aa_requested_nob = requested_nob; + aa->aa_nio_count = niocount; + aa->aa_page_count = page_count; + aa->aa_resends = 0; + aa->aa_ppga = pga; + aa->aa_cli = cli; INIT_LIST_HEAD(&aa->aa_oaps); *reqp = req; @@ -1325,26 +1567,127 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, RETURN(rc); } -static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, - __u32 client_cksum, __u32 server_cksum, int nob, - size_t page_count, struct brw_page **pga, - cksum_type_t client_cksum_type) -{ - __u32 new_cksum; - char *msg; - cksum_type_t cksum_type; +char dbgcksum_file_name[PATH_MAX]; + +static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count, + struct brw_page **pga, __u32 server_cksum, + __u32 client_cksum) +{ + struct file *filp; + int rc, i; + unsigned int len; + char *buf; + + /* will only keep dump of pages on first error for the same range in + * file/fid, not during the resends/retries. */ + snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name), + "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x", + (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ? + libcfs_debug_file_path_arr : + LIBCFS_DEBUG_FILE_PATH_DEFAULT), + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL, + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, + pga[0]->off, + pga[page_count-1]->off + pga[page_count-1]->count - 1, + client_cksum, server_cksum); + filp = filp_open(dbgcksum_file_name, + O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600); + if (IS_ERR(filp)) { + rc = PTR_ERR(filp); + if (rc == -EEXIST) + CDEBUG(D_INFO, "%s: can't open to dump pages with " + "checksum error: rc = %d\n", dbgcksum_file_name, + rc); + else + CERROR("%s: can't open to dump pages with checksum " + "error: rc = %d\n", dbgcksum_file_name, rc); + return; + } + + for (i = 0; i < page_count; i++) { + len = pga[i]->count; + buf = kmap(pga[i]->pg); + while (len != 0) { + rc = cfs_kernel_write(filp, buf, len, &filp->f_pos); + if (rc < 0) { + CERROR("%s: wanted to write %u but got %d " + "error\n", dbgcksum_file_name, len, rc); + break; + } + len -= rc; + buf += rc; + CDEBUG(D_INFO, "%s: wrote %d bytes\n", + dbgcksum_file_name, rc); + } + kunmap(pga[i]->pg); + } + + rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1); + if (rc) + CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc); + filp_close(filp, NULL); +} + +static int +check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer, + __u32 client_cksum, __u32 server_cksum, + struct osc_brw_async_args *aa) +{ + const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name; + enum cksum_types cksum_type; + obd_dif_csum_fn *fn = NULL; + int sector_size = 0; + __u32 new_cksum; + char *msg; + int rc; if (server_cksum == client_cksum) { CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); return 0; } - cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? - oa->o_flags : 0); - new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, - cksum_type); + if (aa->aa_cli->cl_checksum_dump) + dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga, + server_cksum, client_cksum); + + cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? + oa->o_flags : 0); + + switch (cksum_type) { + case OBD_CKSUM_T10IP512: + fn = obd_dif_ip_fn; + sector_size = 512; + break; + case OBD_CKSUM_T10IP4K: + fn = obd_dif_ip_fn; + sector_size = 4096; + break; + case OBD_CKSUM_T10CRC512: + fn = obd_dif_crc_fn; + sector_size = 512; + break; + case OBD_CKSUM_T10CRC4K: + fn = obd_dif_crc_fn; + sector_size = 4096; + break; + default: + break; + } + + if (fn) + rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob, + aa->aa_page_count, aa->aa_ppga, + OST_WRITE, fn, sector_size, + &new_cksum); + else + rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count, + aa->aa_ppga, OST_WRITE, cksum_type, + &new_cksum); - if (cksum_type != client_cksum_type) + if (rc < 0) + msg = "failed to calculate the client write checksum"; + else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags)) msg = "the server did not use the checksum type specified in " "the original request - likely a protocol problem"; else if (new_cksum == server_cksum) @@ -1356,103 +1699,145 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, msg = "changed in transit AND doesn't match the original - " "likely false positive due to mmap IO (bug 11742)"; - LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID - " object "DOSTID" extent [%llu-%llu]\n", - msg, libcfs_nid2str(peer->nid), + LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode " + DFID " object "DOSTID" extent [%llu-%llu], original " + "client csum %x (type %x), server csum %x (type %x)," + " client csum now %x\n", + obd_name, msg, libcfs_nid2str(peer->nid), oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, - POSTID(&oa->o_oi), pga[0]->off, - pga[page_count-1]->off + pga[page_count-1]->count - 1); - CERROR("original client csum %x (type %x), server csum %x (type %x), " - "client csum now %x\n", client_cksum, client_cksum_type, - server_cksum, cksum_type, new_cksum); + POSTID(&oa->o_oi), aa->aa_ppga[0]->off, + aa->aa_ppga[aa->aa_page_count - 1]->off + + aa->aa_ppga[aa->aa_page_count-1]->count - 1, + client_cksum, + obd_cksum_type_unpack(aa->aa_oa->o_flags), + server_cksum, cksum_type, new_cksum); return 1; } /* Note rc enters this function as number of bytes transferred */ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) { - struct osc_brw_async_args *aa = (void *)&req->rq_async_args; - const lnet_process_id_t *peer = - &req->rq_import->imp_connection->c_peer; - struct client_obd *cli = aa->aa_cli; - struct ost_body *body; + struct osc_brw_async_args *aa = (void *)&req->rq_async_args; + struct client_obd *cli = aa->aa_cli; + const char *obd_name = cli->cl_import->imp_obd->obd_name; + const struct lnet_process_id *peer = + &req->rq_import->imp_connection->c_peer; + struct ost_body *body; u32 client_cksum = 0; - ENTRY; - if (rc < 0 && rc != -EDQUOT) { - DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc); - RETURN(rc); - } + ENTRY; - LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) { - DEBUG_REQ(D_INFO, req, "Can't unpack body\n"); - RETURN(-EPROTO); - } + if (rc < 0 && rc != -EDQUOT) { + DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc); + RETURN(rc); + } - /* set/clear over quota flag for a uid/gid */ - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && - body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) { - unsigned int qid[LL_MAXQUOTAS] = - {body->oa.o_uid, body->oa.o_gid}; + LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + DEBUG_REQ(D_INFO, req, "cannot unpack body"); + RETURN(-EPROTO); + } - CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n", - body->oa.o_uid, body->oa.o_gid, body->oa.o_valid, - body->oa.o_flags); - osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags); - } + /* set/clear over quota flag for a uid/gid/projid */ + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && + body->oa.o_valid & (OBD_MD_FLALLQUOTA)) { + unsigned qid[LL_MAXQUOTAS] = { + body->oa.o_uid, body->oa.o_gid, + body->oa.o_projid }; + CDEBUG(D_QUOTA, + "setdq for [%u %u %u] with valid %#llx, flags %x\n", + body->oa.o_uid, body->oa.o_gid, body->oa.o_projid, + body->oa.o_valid, body->oa.o_flags); + osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid, + body->oa.o_flags); + } - osc_update_grant(cli, body); + osc_update_grant(cli, body); - if (rc < 0) - RETURN(rc); + if (rc < 0) + RETURN(rc); - if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) - client_cksum = aa->aa_oa->o_cksum; /* save for later */ + if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) + client_cksum = aa->aa_oa->o_cksum; /* save for later */ - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { - if (rc > 0) { - CERROR("Unexpected +ve rc %d\n", rc); - RETURN(-EPROTO); - } - LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob); + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { + if (rc > 0) { + CERROR("%s: unexpected positive size %d\n", + obd_name, rc); + RETURN(-EPROTO); + } - if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) - RETURN(-EAGAIN); + if (req->rq_bulk != NULL && + sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) + RETURN(-EAGAIN); - if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && - check_write_checksum(&body->oa, peer, client_cksum, - body->oa.o_cksum, aa->aa_requested_nob, - aa->aa_page_count, aa->aa_ppga, - cksum_type_unpack(aa->aa_oa->o_flags))) - RETURN(-EAGAIN); + if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && + check_write_checksum(&body->oa, peer, client_cksum, + body->oa.o_cksum, aa)) + RETURN(-EAGAIN); - rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count, - aa->aa_page_count, aa->aa_ppga); - GOTO(out, rc); - } + rc = check_write_rcs(req, aa->aa_requested_nob, + aa->aa_nio_count, aa->aa_page_count, + aa->aa_ppga); + GOTO(out, rc); + } - /* The rest of this function executes only for OST_READs */ + /* The rest of this function executes only for OST_READs */ - /* if unwrap_bulk failed, return -EAGAIN to retry */ - rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc); - if (rc < 0) - GOTO(out, rc = -EAGAIN); + if (req->rq_bulk == NULL) { + rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO, + RCL_SERVER); + LASSERT(rc == req->rq_status); + } else { + /* if unwrap_bulk failed, return -EAGAIN to retry */ + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc); + } + if (rc < 0) + GOTO(out, rc = -EAGAIN); - if (rc > aa->aa_requested_nob) { - CERROR("Unexpected rc %d (%d requested)\n", rc, - aa->aa_requested_nob); - RETURN(-EPROTO); - } + if (rc > aa->aa_requested_nob) { + CERROR("%s: unexpected size %d, requested %d\n", obd_name, + rc, aa->aa_requested_nob); + RETURN(-EPROTO); + } - if (rc != req->rq_bulk->bd_nob_transferred) { - CERROR ("Unexpected rc %d (%d transferred)\n", - rc, req->rq_bulk->bd_nob_transferred); - return (-EPROTO); - } + if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) { + CERROR("%s: unexpected size %d, transferred %d\n", obd_name, + rc, req->rq_bulk->bd_nob_transferred); + RETURN(-EPROTO); + } + + if (req->rq_bulk == NULL) { + /* short io */ + int nob, pg_count, i = 0; + unsigned char *buf; + + CDEBUG(D_CACHE, "Using short io read, size %d\n", rc); + pg_count = aa->aa_page_count; + buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO, + rc); + nob = rc; + while (nob > 0 && pg_count > 0) { + unsigned char *ptr; + int count = aa->aa_ppga[i]->count > nob ? + nob : aa->aa_ppga[i]->count; + + CDEBUG(D_CACHE, "page %p count %d\n", + aa->aa_ppga[i]->pg, count); + ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0); + memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf, + count); + ll_kunmap_atomic((void *) ptr, KM_USER0); + + buf += count; + nob -= count; + i++; + pg_count--; + } + } if (rc < aa->aa_requested_nob) handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); @@ -1462,39 +1847,53 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) u32 server_cksum = body->oa.o_cksum; char *via = ""; char *router = ""; - cksum_type_t cksum_type; - - cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? - body->oa.o_flags : 0); - client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, - aa->aa_ppga, OST_READ, - cksum_type); - - if (peer->nid != req->rq_bulk->bd_sender) { + enum cksum_types cksum_type; + u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ? + body->oa.o_flags : 0; + + cksum_type = obd_cksum_type_unpack(o_flags); + rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc, + aa->aa_page_count, aa->aa_ppga, + OST_READ, &client_cksum); + if (rc < 0) + GOTO(out, rc); + + if (req->rq_bulk != NULL && + peer->nid != req->rq_bulk->bd_sender) { via = " via "; router = libcfs_nid2str(req->rq_bulk->bd_sender); } if (server_cksum != client_cksum) { + struct ost_body *clbody; + u32 page_count = aa->aa_page_count; + + clbody = req_capsule_client_get(&req->rq_pill, + &RMF_OST_BODY); + if (cli->cl_checksum_dump) + dump_all_bulk_pages(&clbody->oa, page_count, + aa->aa_ppga, server_cksum, + client_cksum); + LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " "%s%s%s inode "DFID" object "DOSTID - " extent [%llu-%llu]\n", - req->rq_import->imp_obd->obd_name, + " extent [%llu-%llu], client %x, " + "server %x, cksum_type %x\n", + obd_name, libcfs_nid2str(peer->nid), via, router, - body->oa.o_valid & OBD_MD_FLFID ? - body->oa.o_parent_seq : (__u64)0, - body->oa.o_valid & OBD_MD_FLFID ? - body->oa.o_parent_oid : 0, - body->oa.o_valid & OBD_MD_FLFID ? - body->oa.o_parent_ver : 0, + clbody->oa.o_valid & OBD_MD_FLFID ? + clbody->oa.o_parent_seq : 0ULL, + clbody->oa.o_valid & OBD_MD_FLFID ? + clbody->oa.o_parent_oid : 0, + clbody->oa.o_valid & OBD_MD_FLFID ? + clbody->oa.o_parent_ver : 0, POSTID(&body->oa.o_oi), aa->aa_ppga[0]->off, - aa->aa_ppga[aa->aa_page_count-1]->off + - aa->aa_ppga[aa->aa_page_count-1]->count - - 1); - CERROR("client %x, server %x, cksum_type %x\n", - client_cksum, server_cksum, cksum_type); + aa->aa_ppga[page_count-1]->off + + aa->aa_ppga[page_count-1]->count - 1, + client_cksum, server_cksum, + cksum_type); cksum_counter = 0; aa->aa_oa->o_cksum = client_cksum; rc = -EAGAIN; @@ -1503,32 +1902,34 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); rc = 0; } - } else if (unlikely(client_cksum)) { - static int cksum_missed; - - cksum_missed++; - if ((cksum_missed & (-cksum_missed)) == cksum_missed) - CERROR("Checksum %u requested from %s but not sent\n", - cksum_missed, libcfs_nid2str(peer->nid)); - } else { - rc = 0; - } + } else if (unlikely(client_cksum)) { + static int cksum_missed; + + cksum_missed++; + if ((cksum_missed & (-cksum_missed)) == cksum_missed) + CERROR("%s: checksum %u requested from %s but not sent\n", + obd_name, cksum_missed, + libcfs_nid2str(peer->nid)); + } else { + rc = 0; + } out: if (rc >= 0) lustre_get_wire_obdo(&req->rq_import->imp_connect_data, aa->aa_oa, &body->oa); - RETURN(rc); + RETURN(rc); } static int osc_brw_redo_request(struct ptlrpc_request *request, struct osc_brw_async_args *aa, int rc) { - struct ptlrpc_request *new_req; - struct osc_brw_async_args *new_aa; - struct osc_async_page *oap; - ENTRY; + struct ptlrpc_request *new_req; + struct osc_brw_async_args *new_aa; + struct osc_async_page *oap; + ENTRY; + /* The below message is checked in replay-ost-single.sh test_8ae*/ DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, "redo for recoverable error %d", rc); @@ -1550,22 +1951,24 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, } } } - /* New request takes over pga and oaps from old request. - * Note that copying a list_head doesn't work, need to move it... */ - aa->aa_resends++; - new_req->rq_interpret_reply = request->rq_interpret_reply; - new_req->rq_async_args = request->rq_async_args; + /* + * New request takes over pga and oaps from old request. + * Note that copying a list_head doesn't work, need to move it... + */ + aa->aa_resends++; + new_req->rq_interpret_reply = request->rq_interpret_reply; + new_req->rq_async_args = request->rq_async_args; new_req->rq_commit_cb = request->rq_commit_cb; /* cap resend delay to the current request timeout, this is similar to * what ptlrpc does (see after_reply()) */ if (aa->aa_resends > new_req->rq_timeout) - new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout; + new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout; else - new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; + new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends; new_req->rq_generation_set = 1; new_req->rq_import_generation = request->rq_import_generation; - new_aa = ptlrpc_req_async_args(new_req); + new_aa = ptlrpc_req_async_args(new_aa, new_req); INIT_LIST_HEAD(&new_aa->aa_oaps); list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); @@ -1628,19 +2031,23 @@ static void osc_release_ppga(struct brw_page **ppga, size_t count) } static int brw_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct osc_brw_async_args *aa = data; + struct osc_brw_async_args *aa = args; struct osc_extent *ext; struct osc_extent *tmp; struct client_obd *cli = aa->aa_cli; - ENTRY; + unsigned long transferred = 0; + + ENTRY; - rc = osc_brw_fini_request(req, rc); - CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); - /* When server return -EINPROGRESS, client should always retry - * regardless of the number of times the bulk was resent already. */ - if (osc_recoverable_error(rc)) { + rc = osc_brw_fini_request(req, rc); + CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); + /* + * When server returns -EINPROGRESS, client should always retry + * regardless of the number of times the bulk was resent already. + */ + if (osc_recoverable_error(rc) && !req->rq_no_delay) { if (req->rq_import_generation != req->rq_import->imp_generation) { CDEBUG(D_HA, "%s: resend cross eviction for object: " @@ -1714,20 +2121,26 @@ static int brw_interpret(const struct lu_env *env, cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } - OBDO_FREE(aa->aa_oa); + OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem); + aa->aa_oa = NULL; if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) osc_inc_unstable_pages(req); list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { list_del_init(&ext->oe_link); - osc_extent_finish(env, ext, 1, rc); + osc_extent_finish(env, ext, 1, + rc && req->rq_no_delay ? -EWOULDBLOCK : rc); } LASSERT(list_empty(&aa->aa_exts)); LASSERT(list_empty(&aa->aa_oaps)); + transferred = (req->rq_bulk == NULL ? /* short io */ + aa->aa_requested_nob : + req->rq_bulk->bd_nob_transferred); + osc_release_ppga(aa->aa_ppga, aa->aa_page_count); - ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + ptlrpc_lprocfs_brw(req, transferred); spin_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters @@ -1785,10 +2198,12 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, int page_count = 0; bool soft_sync = false; bool interrupted = false; + bool ndelay = false; int i; int grant = 0; int rc; - struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + __u32 layout_version = 0; + LIST_HEAD(rpc_list); struct ost_body *body; ENTRY; LASSERT(!list_empty(ext_list)); @@ -1799,6 +2214,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, mem_tight |= ext->oe_memalloc; grant += ext->oe_grants; page_count += ext->oe_nr_pages; + layout_version = max(layout_version, ext->oe_layout_version); if (obj == NULL) obj = ext->oe_obj; } @@ -1811,7 +2227,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (pga == NULL) GOTO(out, rc = -ENOMEM); - OBDO_ALLOC(oa); + OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS); if (oa == NULL) GOTO(out, rc = -ENOMEM); @@ -1837,10 +2253,12 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, oap->oap_count; else LASSERT(oap->oap_page_off + oap->oap_count == - PAGE_CACHE_SIZE); + PAGE_SIZE); if (oap->oap_interrupted) interrupted = true; } + if (ext->oe_ndelay) + ndelay = true; } /* first page in the list */ @@ -1854,8 +2272,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, crattr->cra_oa = oa; cl_req_attr_set(env, osc2cl(obj), crattr); - if (cmd == OBD_BRW_WRITE) + if (cmd == OBD_BRW_WRITE) { oa->o_grant_used = grant; + if (layout_version > 0) { + CDEBUG(D_LAYOUT, DFID": write with layout version %u\n", + PFID(&oa->o_oi.oi_fid), layout_version); + + oa->o_layout_version = layout_version; + oa->o_valid |= OBD_MD_LAYOUT_VERSION; + } + } sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0); @@ -1870,6 +2296,12 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, oap->oap_request = ptlrpc_request_addref(req); if (interrupted && !req->rq_intr) ptlrpc_mark_interrupted(req); + if (ndelay) { + req->rq_no_resend = req->rq_no_delay = 1; + /* probably set a shorter timeout value. + * to handle ETIMEDOUT in brw_interpret() correctly. */ + /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */ + } /* Need to update the timestamps after the request is built in case * we race with setattr (locally or in queue at OST). If OST gets @@ -1878,19 +2310,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, * way to do this in a single call. bug 10150 */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); crattr->cra_oa = &body->oa; - crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME; + crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME; cl_req_attr_set(env, osc2cl(obj), crattr); lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); INIT_LIST_HEAD(&aa->aa_oaps); list_splice_init(&rpc_list, &aa->aa_oaps); INIT_LIST_HEAD(&aa->aa_exts); list_splice_init(ext_list, &aa->aa_exts); spin_lock(&cli->cl_loi_list_lock); - starting_offset >>= PAGE_CACHE_SHIFT; + starting_offset >>= PAGE_SHIFT; if (cmd == OBD_BRW_READ) { cli->cl_r_in_flight++; lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); @@ -1906,7 +2337,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, } spin_unlock(&cli->cl_loi_list_lock); - DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight", + DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight", page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val); @@ -1923,7 +2354,7 @@ out: LASSERT(req == NULL); if (oa) - OBDO_FREE(oa); + OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem); if (pga) OBD_FREE(pga, sizeof(*pga) * page_count); /* this should happen rarely and is pretty bad, it makes the @@ -1956,10 +2387,10 @@ static int osc_set_lock_data(struct ldlm_lock *lock, void *data) return set; } -static int osc_enqueue_fini(struct ptlrpc_request *req, - osc_enqueue_upcall_f upcall, void *cookie, - struct lustre_handle *lockh, enum ldlm_mode mode, - __u64 *flags, int agl, int errcode) +int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, + void *cookie, struct lustre_handle *lockh, + enum ldlm_mode mode, __u64 *flags, bool speculative, + int errcode) { bool intent = *flags & LDLM_FL_HAS_INTENT; int rc; @@ -1976,7 +2407,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, ptlrpc_status_ntoh(rep->lock_policy_res1); if (rep->lock_policy_res1) errcode = rep->lock_policy_res1; - if (!agl) + if (!speculative) *flags |= LDLM_FL_LVB_READY; } else if (errcode == ELDLM_OK) { *flags |= LDLM_FL_LVB_READY; @@ -1991,13 +2422,13 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, if (errcode == ELDLM_OK && lustre_handle_is_used(lockh)) ldlm_lock_decref(lockh, mode); - RETURN(rc); + RETURN(rc); } -static int osc_enqueue_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) +int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, + void *args, int rc) { + struct osc_enqueue_args *aa = args; struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; enum ldlm_mode mode = aa->oa_mode; @@ -2026,7 +2457,7 @@ static int osc_enqueue_interpret(const struct lu_env *env, /* Let CP AST to grant the lock first. */ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); - if (aa->oa_agl) { + if (aa->oa_speculative) { LASSERT(aa->oa_lvb == NULL); LASSERT(aa->oa_flags == NULL); aa->oa_flags = &flags; @@ -2038,9 +2469,9 @@ static int osc_enqueue_interpret(const struct lu_env *env, lockh, rc); /* Complete osc stuff. */ rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, - aa->oa_flags, aa->oa_agl, rc); + aa->oa_flags, aa->oa_speculative, rc); - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); ldlm_lock_decref(lockh, mode); LDLM_LOCK_PUT(lock); @@ -2058,10 +2489,10 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, union ldlm_policy_data *policy, - struct ost_lvb *lvb, int kms_valid, - osc_enqueue_upcall_f upcall, void *cookie, - struct ldlm_enqueue_info *einfo, - struct ptlrpc_request_set *rqset, int async, int agl) + struct ost_lvb *lvb, osc_enqueue_upcall_f upcall, + void *cookie, struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset, int async, + bool speculative) { struct obd_device *obd = exp->exp_obd; struct lustre_handle lockh = { 0 }; @@ -2077,15 +2508,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; policy->l_extent.end |= ~PAGE_MASK; - /* - * kms is not valid when either object is completely fresh (so that no - * locks are cached), or object was evicted. In the latter case cached - * lock cannot be used, because it would prime inode state with - * potentially stale LVB. - */ - if (!kms_valid) - goto no_match; - /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ @@ -2101,7 +2523,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, mode = einfo->ei_mode; if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - if (agl == 0) + /* Normal lock requests must wait for the LVB to be ready before + * matching a lock; speculative lock requests do not need to, + * because they will not actually use the lock. */ + if (!speculative) match_flags |= LDLM_FL_LVB_READY; if (intent != 0) match_flags |= LDLM_FL_BLOCK_GRANTED; @@ -2114,13 +2539,22 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(ELDLM_OK); matched = ldlm_handle2lock(&lockh); - if (agl) { - /* AGL enqueues DLM locks speculatively. Therefore if - * it already exists a DLM lock, it wll just inform the - * caller to cancel the AGL process for this stripe. */ + if (speculative) { + /* This DLM lock request is speculative, and does not + * have an associated IO request. Therefore if there + * is already a DLM lock, it wll just inform the + * caller to cancel the request for this stripe.*/ + lock_res_and_lock(matched); + if (ldlm_extent_equal(&policy->l_extent, + &matched->l_policy_data.l_extent)) + rc = -EEXIST; + else + rc = -ECANCELED; + unlock_res_and_lock(matched); + ldlm_lock_decref(&lockh, mode); LDLM_LOCK_PUT(matched); - RETURN(-ECANCELED); + RETURN(rc); } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) { *flags |= LDLM_FL_LVB_READY; @@ -2136,7 +2570,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, } } -no_match: if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); @@ -2165,28 +2598,26 @@ no_match: if (async) { if (!rc) { struct osc_enqueue_args *aa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->oa_exp = exp; - aa->oa_mode = einfo->ei_mode; - aa->oa_type = einfo->ei_type; + aa = ptlrpc_req_async_args(aa, req); + aa->oa_exp = exp; + aa->oa_mode = einfo->ei_mode; + aa->oa_type = einfo->ei_type; lustre_handle_copy(&aa->oa_lockh, &lockh); - aa->oa_upcall = upcall; - aa->oa_cookie = cookie; - aa->oa_agl = !!agl; - if (!agl) { + aa->oa_upcall = upcall; + aa->oa_cookie = cookie; + aa->oa_speculative = speculative; + if (!speculative) { aa->oa_flags = flags; aa->oa_lvb = lvb; } else { - /* AGL is essentially to enqueue an DLM lock - * in advance, so we don't care about the - * result of AGL enqueue. */ + /* speculative locks are essentially to enqueue + * a DLM lock in advance, so we don't care + * about the result of the enqueue. */ aa->oa_lvb = NULL; aa->oa_flags = NULL; } - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_enqueue_interpret; + req->rq_interpret_reply = osc_enqueue_interpret; if (rqset == PTLRPCD_SET) ptlrpcd_add_req(req); else @@ -2198,16 +2629,17 @@ no_match: } rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, - flags, agl, rc); + flags, speculative, rc); if (intent) ptlrpc_req_finished(req); RETURN(rc); } -int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, - enum ldlm_type type, union ldlm_policy_data *policy, - enum ldlm_mode mode, __u64 *flags, void *data, +int osc_match_base(const struct lu_env *env, struct obd_export *exp, + struct ldlm_res_id *res_id, enum ldlm_type type, + union ldlm_policy_data *policy, enum ldlm_mode mode, + __u64 *flags, struct osc_object *obj, struct lustre_handle *lockh, int unref) { struct obd_device *obd = exp->exp_obd; @@ -2235,11 +2667,19 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); - if (data != NULL) { + if (obj != NULL) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - if (!osc_set_lock_data(lock, data)) { + if (osc_set_lock_data(lock, obj)) { + lock_res_and_lock(lock); + if (!ldlm_is_lvb_cached(lock)) { + LASSERT(lock->l_ast_data == obj); + osc_lock_lvb_update(env, obj, lock, NULL); + ldlm_set_lvb_cached(lock); + } + unlock_res_and_lock(lock); + } else { ldlm_lock_decref(lockh, rc); rc = 0; } @@ -2249,48 +2689,64 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, } static int osc_statfs_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct obd_statfs *msfs; - ENTRY; + struct osc_async_args *aa = args; + struct obd_statfs *msfs; - if (rc == -EBADR) - /* The request has in fact never been sent - * due to issues at a higher level (LOV). - * Exit immediately since the caller is - * aware of the problem and takes care - * of the clean up */ - RETURN(rc); + ENTRY; + if (rc == -EBADR) + /* + * The request has in fact never been sent due to issues at + * a higher level (LOV). Exit immediately since the caller + * is aware of the problem and takes care of the clean up. + */ + RETURN(rc); - if ((rc == -ENOTCONN || rc == -EAGAIN) && - (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) - GOTO(out, rc = 0); + if ((rc == -ENOTCONN || rc == -EAGAIN) && + (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) + GOTO(out, rc = 0); - if (rc != 0) - GOTO(out, rc); + if (rc != 0) + GOTO(out, rc); - msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); - if (msfs == NULL) { + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) GOTO(out, rc = -EPROTO); - } - *aa->aa_oi->oi_osfs = *msfs; + *aa->aa_oi->oi_osfs = *msfs; out: - rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); - RETURN(rc); + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + + RETURN(rc); } static int osc_statfs_async(struct obd_export *exp, - struct obd_info *oinfo, __u64 max_age, + struct obd_info *oinfo, time64_t max_age, struct ptlrpc_request_set *rqset) { struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; struct osc_async_args *aa; - int rc; + int rc; ENTRY; + if (obd->obd_osfs_age >= max_age) { + CDEBUG(D_SUPER, + "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n", + obd->obd_name, &obd->obd_osfs, + obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks, + obd->obd_osfs.os_ffree, obd->obd_osfs.os_files); + spin_lock(&obd->obd_osfs_lock); + memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs)); + spin_unlock(&obd->obd_osfs_lock); + oinfo->oi_flags |= OBD_STATFS_FROM_CACHE; + if (oinfo->oi_cb_up) + oinfo->oi_cb_up(oinfo, 0); + + RETURN(0); + } + /* We could possibly pass max_age in the request (as an absolute * timestamp or a "seconds.usec ago") so the target can avoid doing * extra calls into the filesystem if that isn't necessary (e.g. @@ -2306,34 +2762,34 @@ static int osc_statfs_async(struct obd_export *exp, ptlrpc_request_free(req); RETURN(rc); } - ptlrpc_request_set_replen(req); - req->rq_request_portal = OST_CREATE_PORTAL; - ptlrpc_at_set_req_timeout(req); + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); - if (oinfo->oi_flags & OBD_STATFS_NODELAY) { - /* procfs requests not want stat in wait for avoid deadlock */ - req->rq_no_resend = 1; - req->rq_no_delay = 1; - } + if (oinfo->oi_flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stat in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret; - CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->aa_oi = oinfo; + req->rq_interpret_reply = osc_statfs_interpret; + aa = ptlrpc_req_async_args(aa, req); + aa->aa_oi = oinfo; - ptlrpc_set_add_req(rqset, req); - RETURN(0); + ptlrpc_set_add_req(rqset, req); + RETURN(0); } static int osc_statfs(const struct lu_env *env, struct obd_export *exp, - struct obd_statfs *osfs, __u64 max_age, __u32 flags) + struct obd_statfs *osfs, time64_t max_age, __u32 flags) { - struct obd_device *obd = class_exp2obd(exp); - struct obd_statfs *msfs; - struct ptlrpc_request *req; - struct obd_import *imp = NULL; - int rc; - ENTRY; + struct obd_device *obd = class_exp2obd(exp); + struct obd_statfs *msfs; + struct ptlrpc_request *req; + struct obd_import *imp = NULL; + int rc; + ENTRY; + /*Since the request might also come from lprocfs, so we need *sync this with client_disconnect_export Bug15684*/ @@ -2344,92 +2800,88 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, if (!imp) RETURN(-ENODEV); - /* We could possibly pass max_age in the request (as an absolute - * timestamp or a "seconds.usec ago") so the target can avoid doing - * extra calls into the filesystem if that isn't necessary (e.g. - * during mount that would help a bit). Having relative timestamps - * is not so great if request processing is slow, while absolute - * timestamps are not ideal because they need time synchronization. */ - req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); + /* We could possibly pass max_age in the request (as an absolute + * timestamp or a "seconds.usec ago") so the target can avoid doing + * extra calls into the filesystem if that isn't necessary (e.g. + * during mount that would help a bit). Having relative timestamps + * is not so great if request processing is slow, while absolute + * timestamps are not ideal because they need time synchronization. */ + req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); - class_import_put(imp); + class_import_put(imp); - if (req == NULL) - RETURN(-ENOMEM); + if (req == NULL) + RETURN(-ENOMEM); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - ptlrpc_request_set_replen(req); - req->rq_request_portal = OST_CREATE_PORTAL; - ptlrpc_at_set_req_timeout(req); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); - if (flags & OBD_STATFS_NODELAY) { - /* procfs requests not want stat in wait for avoid deadlock */ - req->rq_no_resend = 1; - req->rq_no_delay = 1; - } + if (flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stat in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); - if (msfs == NULL) { - GOTO(out, rc = -EPROTO); - } + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) + GOTO(out, rc = -EPROTO); - *osfs = *msfs; + *osfs = *msfs; - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + EXIT; +out: + ptlrpc_req_finished(req); + return rc; } static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obd = exp->exp_obd; - struct obd_ioctl_data *data = karg; - int err = 0; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct obd_ioctl_data *data = karg; + int rc = 0; + ENTRY; if (!try_module_get(THIS_MODULE)) { CERROR("%s: cannot get module '%s'\n", obd->obd_name, module_name(THIS_MODULE)); return -EINVAL; } - switch (cmd) { - case OBD_IOC_CLIENT_RECOVER: - err = ptlrpc_recover_import(obd->u.cli.cl_import, - data->ioc_inlbuf1, 0); - if (err > 0) - err = 0; - GOTO(out, err); - case IOC_OSC_SET_ACTIVE: - err = ptlrpc_set_import_active(obd->u.cli.cl_import, - data->ioc_offset); - GOTO(out, err); - case OBD_IOC_PING_TARGET: - err = ptlrpc_obd_ping(obd); - GOTO(out, err); + switch (cmd) { + case OBD_IOC_CLIENT_RECOVER: + rc = ptlrpc_recover_import(obd->u.cli.cl_import, + data->ioc_inlbuf1, 0); + if (rc > 0) + rc = 0; + break; + case IOC_OSC_SET_ACTIVE: + rc = ptlrpc_set_import_active(obd->u.cli.cl_import, + data->ioc_offset); + break; default: - CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", - cmd, current_comm()); - GOTO(out, err = -ENOTTY); + rc = -ENOTTY; + CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n", + obd->obd_name, cmd, current_comm(), rc); + break; } -out: + module_put(THIS_MODULE); - return err; + return rc; } -static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - u32 keylen, void *key, - u32 vallen, void *val, - struct ptlrpc_request_set *set) +int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, + u32 keylen, void *key, u32 vallen, void *val, + struct ptlrpc_request_set *set) { struct ptlrpc_request *req; struct obd_device *obd = exp->exp_obd; @@ -2457,23 +2909,6 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(0); } - if (KEY_IS(KEY_CACHE_SET)) { - struct client_obd *cli = &obd->u.cli; - - LASSERT(cli->cl_cache == NULL); /* only once */ - cli->cl_cache = (struct cl_client_cache *)val; - cl_cache_incref(cli->cl_cache); - cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; - - /* add this osc into entity list */ - LASSERT(list_empty(&cli->cl_lru_osc)); - spin_lock(&cli->cl_cache->ccc_lru_lock); - list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); - spin_unlock(&cli->cl_cache->ccc_lru_lock); - - RETURN(0); - } - if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1; @@ -2516,23 +2951,22 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ? &RMF_OST_BODY : &RMF_SETINFO_VAL); - memcpy(tmp, val, vallen); + memcpy(tmp, val, vallen); if (KEY_IS(KEY_GRANT_SHRINK)) { - struct osc_grant_args *aa; - struct obdo *oa; - - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - OBDO_ALLOC(oa); - if (!oa) { - ptlrpc_req_finished(req); - RETURN(-ENOMEM); - } - *oa = ((struct ost_body *)val)->oa; - aa->aa_oa = oa; - req->rq_interpret_reply = osc_shrink_grant_interpret; - } + struct osc_grant_args *aa; + struct obdo *oa; + + aa = ptlrpc_req_async_args(aa, req); + OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS); + if (!oa) { + ptlrpc_req_finished(req); + RETURN(-ENOMEM); + } + *oa = ((struct ost_body *)val)->oa; + aa->aa_oa = oa; + req->rq_interpret_reply = osc_shrink_grant_interpret; + } ptlrpc_request_set_replen(req); if (!KEY_IS(KEY_GRANT_SHRINK)) { @@ -2545,25 +2979,27 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(0); } +EXPORT_SYMBOL(osc_set_info_async); -static int osc_reconnect(const struct lu_env *env, - struct obd_export *exp, struct obd_device *obd, - struct obd_uuid *cluuid, - struct obd_connect_data *data, - void *localdata) +int osc_reconnect(const struct lu_env *env, struct obd_export *exp, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, void *localdata) { - struct client_obd *cli = &obd->u.cli; + struct client_obd *cli = &obd->u.cli; - if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { - long lost_grant; + if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { + long lost_grant; long grant; spin_lock(&cli->cl_loi_list_lock); grant = cli->cl_avail_grant + cli->cl_reserved_grant; - if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) + if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) { + /* restore ocd_grant_blkbits as client page bits */ + data->ocd_grant_blkbits = PAGE_SHIFT; grant += cli->cl_dirty_grant; - else - grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + } else { + grant += cli->cl_dirty_pages << PAGE_SHIFT; + } data->ocd_grant = grant ? : 2 * cli_brw_size(obd); lost_grant = cli->cl_lost_grant; cli->cl_lost_grant = 0; @@ -2576,37 +3012,36 @@ static int osc_reconnect(const struct lu_env *env, RETURN(0); } +EXPORT_SYMBOL(osc_reconnect); -static int osc_disconnect(struct obd_export *exp) +int osc_disconnect(struct obd_export *exp) { struct obd_device *obd = class_exp2obd(exp); int rc; - rc = client_disconnect_export(exp); - /** - * Initially we put del_shrink_grant before disconnect_export, but it - * causes the following problem if setup (connect) and cleanup - * (disconnect) are tangled together. - * connect p1 disconnect p2 - * ptlrpc_connect_import - * ............... class_manual_cleanup - * osc_disconnect - * del_shrink_grant - * ptlrpc_connect_interrupt - * init_grant_shrink - * add this client to shrink list - * cleanup_osc - * Bang! pinger trigger the shrink. - * So the osc should be disconnected from the shrink list, after we - * are sure the import has been destroyed. BUG18662 - */ - if (obd->u.cli.cl_import == NULL) - osc_del_shrink_grant(&obd->u.cli); - return rc; -} - -static int osc_ldlm_resource_invalidate(struct cfs_hash *hs, - struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg) + rc = client_disconnect_export(exp); + /** + * Initially we put del_shrink_grant before disconnect_export, but it + * causes the following problem if setup (connect) and cleanup + * (disconnect) are tangled together. + * connect p1 disconnect p2 + * ptlrpc_connect_import + * ............... class_manual_cleanup + * osc_disconnect + * del_shrink_grant + * ptlrpc_connect_interrupt + * osc_init_grant + * add this client to shrink list + * cleanup_osc + * Bang! grant shrink thread trigger the shrink. BUG18662 + */ + osc_del_grant_list(&obd->u.cli); + return rc; +} +EXPORT_SYMBOL(osc_disconnect); + +int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd, + struct hlist_node *hnode, void *arg) { struct lu_env *env = arg; struct ldlm_resource *res = cfs_hash_object(hs, hnode); @@ -2635,6 +3070,7 @@ static int osc_ldlm_resource_invalidate(struct cfs_hash *hs, RETURN(0); } +EXPORT_SYMBOL(osc_ldlm_resource_invalidate); static int osc_import_event(struct obd_device *obd, struct obd_import *imp, @@ -2656,7 +3092,7 @@ static int osc_import_event(struct obd_device *obd, break; } case IMP_EVENT_INACTIVE: { - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE); break; } case IMP_EVENT_INVALIDATE: { @@ -2681,7 +3117,7 @@ static int osc_import_event(struct obd_device *obd, break; } case IMP_EVENT_ACTIVE: { - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE); break; } case IMP_EVENT_OCD: { @@ -2694,15 +3130,15 @@ static int osc_import_event(struct obd_device *obd, if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL) imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL; - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD); break; } case IMP_EVENT_DEACTIVATE: { - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE); break; } case IMP_EVENT_ACTIVATE: { - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE); break; } default: @@ -2725,7 +3161,7 @@ static int osc_cancel_weight(struct ldlm_lock *lock) * Cancel all unused and granted extent lock. */ if (lock->l_resource->lr_type == LDLM_EXTENT && - lock->l_granted_mode == lock->l_req_mode && + ldlm_is_granted(lock) && osc_ldlm_weigh_ast(lock) == 0) RETURN(1); @@ -2742,15 +3178,12 @@ static int brw_queue_work(const struct lu_env *env, void *data) RETURN(0); } -int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) +int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg) { struct client_obd *cli = &obd->u.cli; - struct obd_type *type; - void *handler; - int rc; - int adding; - int added; - int req_count; + void *handler; + int rc; + ENTRY; rc = ptlrpcd_addref(); @@ -2761,9 +3194,10 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) if (rc) GOTO(out_ptlrpcd, rc); + handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli); if (IS_ERR(handler)) - GOTO(out_client_setup, rc = PTR_ERR(handler)); + GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler)); cli->cl_writeback_work = handler; handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli); @@ -2776,36 +3210,43 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) GOTO(out_ptlrpcd_work, rc); cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; + osc_update_next_shrink(cli); -#ifdef CONFIG_PROC_FS - obd->obd_vars = lprocfs_osc_obd_vars; -#endif - /* If this is true then both client (osc) and server (osp) are on the - * same node. The osp layer if loaded first will register the osc proc - * directory. In that case this obd_device will be attached its proc - * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */ - type = class_search_type(LUSTRE_OSP_NAME); - if (type && type->typ_procsym) { - obd->obd_proc_entry = lprocfs_register(obd->obd_name, - type->typ_procsym, - obd->obd_vars, obd); - if (IS_ERR(obd->obd_proc_entry)) { - rc = PTR_ERR(obd->obd_proc_entry); - CERROR("error %d setting up lprocfs for %s\n", rc, - obd->obd_name); - obd->obd_proc_entry = NULL; - } - } else { - rc = lprocfs_obd_setup(obd); - } + RETURN(rc); - /* If the basic OSC proc tree construction succeeded then - * lets do the rest. */ - if (rc == 0) { - lproc_osc_attach_seqstat(obd); - sptlrpc_lprocfs_cliobd_attach(obd); - ptlrpc_lprocfs_register_obd(obd); +out_ptlrpcd_work: + if (cli->cl_writeback_work != NULL) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; } + if (cli->cl_lru_work != NULL) { + ptlrpcd_destroy_work(cli->cl_lru_work); + cli->cl_lru_work = NULL; + } + client_obd_cleanup(obd); +out_ptlrpcd: + ptlrpcd_decref(); + RETURN(rc); +} +EXPORT_SYMBOL(osc_setup_common); + +int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) +{ + struct client_obd *cli = &obd->u.cli; + int adding; + int added; + int req_count; + int rc; + + ENTRY; + + rc = osc_setup_common(obd, lcfg); + if (rc < 0) + RETURN(rc); + + rc = osc_tunables_init(obd); + if (rc) + RETURN(rc); /* * We try to control the total number of requests with a upper limit @@ -2822,32 +3263,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) atomic_add(added, &osc_pool_req_count); } - INIT_LIST_HEAD(&cli->cl_grant_shrink_list); ns_register_cancel(obd->obd_namespace, osc_cancel_weight); spin_lock(&osc_shrink_lock); list_add_tail(&cli->cl_shrink_list, &osc_shrink_list); spin_unlock(&osc_shrink_lock); + cli->cl_import->imp_idle_timeout = osc_idle_timeout; + cli->cl_import->imp_idle_debug = D_HA; RETURN(0); - -out_ptlrpcd_work: - if (cli->cl_writeback_work != NULL) { - ptlrpcd_destroy_work(cli->cl_writeback_work); - cli->cl_writeback_work = NULL; - } - if (cli->cl_lru_work != NULL) { - ptlrpcd_destroy_work(cli->cl_lru_work); - cli->cl_lru_work = NULL; - } -out_client_setup: - client_obd_cleanup(obd); -out_ptlrpcd: - ptlrpcd_decref(); - RETURN(rc); } -static int osc_precleanup(struct obd_device *obd) +int osc_precleanup_common(struct obd_device *obd) { struct client_obd *cli = &obd->u.cli; ENTRY; @@ -2873,12 +3300,21 @@ static int osc_precleanup(struct obd_device *obd) } obd_cleanup_client_import(obd); + RETURN(0); +} +EXPORT_SYMBOL(osc_precleanup_common); + +static int osc_precleanup(struct obd_device *obd) +{ + ENTRY; + + osc_precleanup_common(obd); + ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); RETURN(0); } -int osc_cleanup(struct obd_device *obd) +int osc_cleanup_common(struct obd_device *obd) { struct client_obd *cli = &obd->u.cli; int rc; @@ -2908,26 +3344,16 @@ int osc_cleanup(struct obd_device *obd) ptlrpcd_decref(); RETURN(rc); } +EXPORT_SYMBOL(osc_cleanup_common); -int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) -{ - int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd); - return rc > 0 ? 0: rc; -} - -static int osc_process_config(struct obd_device *obd, size_t len, void *buf) -{ - return osc_process_config_base(obd, buf); -} - -static struct obd_ops osc_obd_ops = { +static const struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = osc_setup, .o_precleanup = osc_precleanup, - .o_cleanup = osc_cleanup, + .o_cleanup = osc_cleanup_common, .o_add_conn = client_import_add_conn, .o_del_conn = client_import_del_conn, - .o_connect = client_connect_import, + .o_connect = client_connect_import, .o_reconnect = osc_reconnect, .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, @@ -2939,12 +3365,11 @@ static struct obd_ops osc_obd_ops = { .o_iocontrol = osc_iocontrol, .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, - .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, }; static struct shrinker *osc_cache_shrinker; -struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list); +LIST_HEAD(osc_shrink_list); DEFINE_SPINLOCK(osc_shrink_lock); #ifndef HAVE_SHRINKER_COUNT @@ -2954,10 +3379,6 @@ static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) .nr_to_scan = shrink_param(sc, nr_to_scan), .gfp_mask = shrink_param(sc, gfp_mask) }; -#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL) - struct shrinker *shrinker = NULL; -#endif - (void)osc_cache_shrink_scan(shrinker, &scv); return osc_cache_shrink_count(shrinker, &scv); @@ -2966,8 +3387,6 @@ static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) static int __init osc_init(void) { - bool enable_proc = true; - struct obd_type *type; unsigned int reqpool_size; unsigned int reqsize; int rc; @@ -2984,11 +3403,7 @@ static int __init osc_init(void) if (rc) RETURN(rc); - type = class_search_type(LUSTRE_OSP_NAME); - if (type != NULL && type->typ_procsym != NULL) - enable_proc = false; - - rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, + rc = class_register_type(&osc_obd_ops, NULL, true, NULL, LUSTRE_OSC_NAME, &osc_device_type); if (rc) GOTO(out_kmem, rc); @@ -3017,19 +3432,28 @@ static int __init osc_init(void) osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE, ptlrpc_add_rqs_to_pool); - if (osc_rq_pool != NULL) - GOTO(out, rc); - rc = -ENOMEM; + if (osc_rq_pool == NULL) + GOTO(out_type, rc = -ENOMEM); + + rc = osc_start_grant_work(); + if (rc != 0) + GOTO(out_req_pool, rc); + + RETURN(rc); + +out_req_pool: + ptlrpc_free_rq_pool(osc_rq_pool); out_type: class_unregister_type(LUSTRE_OSC_NAME); out_kmem: lu_kmem_fini(osc_caches); -out: + RETURN(rc); } static void __exit osc_exit(void) { + osc_stop_grant_work(); remove_shrinker(osc_cache_shrinker); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches);