X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=cb53cc692816638a997e188e3fe3e6c85fd6b4e7;hp=71948cbf7e4c3f46ae2ed2fc3117a0bf5acc9d0e;hb=eecf86131d099242d2e8c1f5d6be241ec1416c9a;hpb=c0246d8878097a618efa9296d90896ac2cc2e9e4 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 71948cb..cb53cc6 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -32,6 +32,9 @@ #define DEBUG_SUBSYSTEM S_OSC +#include +#include +#include #include #include #include @@ -40,11 +43,11 @@ #include #include #include -#include #include #include #include #include +#include #include "osc_internal.h" @@ -56,6 +59,9 @@ struct ptlrpc_request_pool *osc_rq_pool; static unsigned int osc_reqpool_mem_max = 5; module_param(osc_reqpool_mem_max, uint, 0444); +static int osc_idle_timeout = 20; +module_param(osc_idle_timeout, uint, 0644); + #define osc_grant_args osc_brw_async_args struct osc_setattr_args { @@ -176,24 +182,25 @@ out: } static int osc_setattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_setattr_args *sa, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct ost_body *body; - ENTRY; + struct osc_setattr_args *sa = args; + struct ost_body *body; - if (rc != 0) - GOTO(out, rc); + ENTRY; - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + if (rc != 0) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa, &body->oa); out: - rc = sa->sa_upcall(sa->sa_cookie, rc); - RETURN(rc); + rc = sa->sa_upcall(sa->sa_cookie, rc); + RETURN(rc); } int osc_setattr_async(struct obd_export *exp, struct obdo *oa, @@ -225,19 +232,14 @@ int osc_setattr_async(struct obd_export *exp, struct obdo *oa, /* Do not wait for response. */ ptlrpcd_add_req(req); } else { - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_setattr_interpret; + req->rq_interpret_reply = osc_setattr_interpret; - CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); + sa = ptlrpc_req_async_args(sa, req); sa->sa_oa = oa; sa->sa_upcall = upcall; sa->sa_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req); - else - ptlrpc_set_add_req(rqset, req); + ptlrpc_set_add_req(rqset, req); } RETURN(0); @@ -317,16 +319,12 @@ int osc_ladvise_base(struct obd_export *exp, struct obdo *oa, } req->rq_interpret_reply = osc_ladvise_interpret; - CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args)); - la = ptlrpc_req_async_args(req); + la = ptlrpc_req_async_args(la, req); la->la_oa = oa; la->la_upcall = upcall; la->la_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req); - else - ptlrpc_set_add_req(rqset, req); + ptlrpc_set_add_req(rqset, req); RETURN(0); } @@ -413,9 +411,8 @@ int osc_punch_send(struct obd_export *exp, struct obdo *oa, ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); + req->rq_interpret_reply = osc_setattr_interpret; + sa = ptlrpc_req_async_args(sa, req); sa->sa_oa = oa; sa->sa_upcall = upcall; sa->sa_cookie = cookie; @@ -426,15 +423,79 @@ int osc_punch_send(struct obd_export *exp, struct obdo *oa, } EXPORT_SYMBOL(osc_punch_send); +/** + * osc_fallocate_base() - Handles fallocate request. + * + * @exp: Export structure + * @oa: Attributes passed to OSS from client (obdo structure) + * @upcall: Primary & supplementary group information + * @cookie: Exclusive identifier + * @rqset: Request list. + * @mode: Operation done on given range. + * + * osc_fallocate_base() - Handles fallocate requests only. Only block + * allocation or standard preallocate operation is supported currently. + * Other mode flags is not supported yet. ftruncate(2) or truncate(2) + * is supported via SETATTR request. + * + * Return: Non-zero on failure and O on success. + */ +int osc_fallocate_base(struct obd_export *exp, struct obdo *oa, + obd_enqueue_update_f upcall, void *cookie, int mode) +{ + struct ptlrpc_request *req; + struct osc_setattr_args *sa; + struct ost_body *body; + struct obd_import *imp = class_exp2cliimp(exp); + int rc; + ENTRY; + + /* + * Only mode == 0 (which is standard prealloc) is supported now. + * Punch is not supported yet. + */ + if (mode & ~FALLOC_FL_KEEP_SIZE) + RETURN(-EOPNOTSUPP); + oa->o_falloc_mode = mode; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_OST_FALLOCATE); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + + lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa); + + ptlrpc_request_set_replen(req); + + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; + BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(sa, req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; + + ptlrpcd_add_req(req); + + RETURN(0); +} + static int osc_sync_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *arg, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct osc_fsync_args *fa = arg; - struct ost_body *body; - struct cl_attr *attr = &osc_env_info(env)->oti_attr; - unsigned long valid = 0; - struct cl_object *obj; + struct osc_fsync_args *fa = args; + struct ost_body *body; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + struct cl_object *obj; ENTRY; if (rc != 0) @@ -494,17 +555,13 @@ int osc_sync_base(struct osc_object *obj, struct obdo *oa, ptlrpc_request_set_replen(req); req->rq_interpret_reply = osc_sync_interpret; - CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); - fa = ptlrpc_req_async_args(req); + fa = ptlrpc_req_async_args(fa, req); fa->fa_obj = obj; fa->fa_oa = oa; fa->fa_upcall = upcall; fa->fa_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req); - else - ptlrpc_set_add_req(rqset, req); + ptlrpc_set_add_req(rqset, req); RETURN (0); } @@ -545,13 +602,13 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, } static int osc_destroy_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, - int rc) + struct ptlrpc_request *req, void *args, int rc) { struct client_obd *cli = &req->rq_import->imp_obd->u.cli; atomic_dec(&cli->cl_destroy_in_flight); wake_up(&cli->cl_destroy_waitq); + return 0; } @@ -579,7 +636,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; struct ost_body *body; - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); int rc, count; ENTRY; @@ -615,17 +672,16 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, req->rq_interpret_reply = osc_destroy_interpret; if (!osc_can_send_destroy(cli)) { - struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - /* * Wait until the number of on-going destroy RPCs drops * under max_rpc_in_flight */ - rc = l_wait_event_exclusive(cli->cl_destroy_waitq, - osc_can_send_destroy(cli), &lwi); + rc = l_wait_event_abortable_exclusive( + cli->cl_destroy_waitq, + osc_can_send_destroy(cli)); if (rc) { ptlrpc_req_finished(req); - RETURN(rc); + RETURN(-EINTR); } } @@ -647,21 +703,18 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_dirty = cli->cl_dirty_grant; else oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT; - if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > - cli->cl_dirty_max_pages)) { - CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty_pages, cli->cl_dirty_transit, + if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) { + CERROR("dirty %lu > dirty_max %lu\n", + cli->cl_dirty_pages, cli->cl_dirty_max_pages); oa->o_undirty = 0; - } else if (unlikely(atomic_long_read(&obd_dirty_pages) - - atomic_long_read(&obd_dirty_transit_pages) > + } else if (unlikely(atomic_long_read(&obd_dirty_pages) > (long)(obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n", + CERROR("%s: dirty %ld > system dirty_max %ld\n", cli_name(cli), atomic_long_read(&obd_dirty_pages), - atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages > @@ -690,8 +743,8 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, /* Do not ask for more than OBD_MAX_GRANT - a margin for server * to add extent tax, etc. */ - oa->o_undirty = min(undirty, OBD_MAX_GRANT - - (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL); + oa->o_undirty = min(undirty, OBD_MAX_GRANT & + ~(PTLRPC_MAX_BRW_SIZE * 4UL)); } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; @@ -725,25 +778,37 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) } } +/** + * grant thread data for shrinking space. + */ +struct grant_thread_data { + struct list_head gtd_clients; + struct mutex gtd_mutex; + unsigned long gtd_stopped:1; +}; +static struct grant_thread_data client_gtd; + static int osc_shrink_grant_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *aa, int rc) + struct ptlrpc_request *req, + void *args, int rc) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa; - struct ost_body *body; + struct osc_grant_args *aa = args; + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + struct ost_body *body; - if (rc != 0) { - __osc_update_grant(cli, oa->o_grant); - GOTO(out, rc); - } + if (rc != 0) { + __osc_update_grant(cli, aa->aa_oa->o_grant); + GOTO(out, rc); + } - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - LASSERT(body); - osc_update_grant(cli, body); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + osc_update_grant(cli, body); out: - OBDO_FREE(oa); - return rc; + OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem); + aa->aa_oa = NULL; + + return rc; } static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) @@ -803,6 +868,11 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) osc_announce_cached(cli, &body->oa, 0); spin_lock(&cli->cl_loi_list_lock); + if (target_bytes >= cli->cl_avail_grant) { + /* available grant has changed since target calculation */ + spin_unlock(&cli->cl_loi_list_lock); + GOTO(out_free, rc = 0); + } body->oa.o_grant = cli->cl_avail_grant - target_bytes; cli->cl_avail_grant = target_bytes; spin_unlock(&cli->cl_loi_list_lock); @@ -818,6 +888,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) sizeof(*body), body, NULL); if (rc != 0) __osc_update_grant(cli, body->oa.o_grant); +out_free: OBD_FREE_PTR(body); RETURN(rc); } @@ -826,9 +897,14 @@ static int osc_should_shrink_grant(struct client_obd *client) { time64_t next_shrink = client->cl_next_shrink_grant; - if ((client->cl_import->imp_connect_data.ocd_connect_flags & - OBD_CONNECT_GRANT_SHRINK) == 0) - return 0; + if (client->cl_import == NULL) + return 0; + + if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) || + client->cl_import->imp_grant_shrink_disabled) { + osc_update_next_shrink(client); + return 0; + } if (ktime_get_seconds() >= next_shrink - 5) { /* Get the current RPC size directly, instead of going via: @@ -845,38 +921,92 @@ static int osc_should_shrink_grant(struct client_obd *client) return 0; } -static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) +#define GRANT_SHRINK_RPC_BATCH 100 + +static struct delayed_work work; + +static void osc_grant_work_handler(struct work_struct *data) { - struct client_obd *client; + struct client_obd *cli; + int rpc_sent; + bool init_next_shrink = true; + time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL; + + rpc_sent = 0; + mutex_lock(&client_gtd.gtd_mutex); + list_for_each_entry(cli, &client_gtd.gtd_clients, + cl_grant_chain) { + if (rpc_sent < GRANT_SHRINK_RPC_BATCH && + osc_should_shrink_grant(cli)) { + osc_shrink_grant(cli); + rpc_sent++; + } - list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) { - if (osc_should_shrink_grant(client)) - osc_shrink_grant(client); + if (!init_next_shrink) { + if (cli->cl_next_shrink_grant < next_shrink && + cli->cl_next_shrink_grant > ktime_get_seconds()) + next_shrink = cli->cl_next_shrink_grant; + } else { + init_next_shrink = false; + next_shrink = cli->cl_next_shrink_grant; + } + } + mutex_unlock(&client_gtd.gtd_mutex); + + if (client_gtd.gtd_stopped == 1) + return; + + if (next_shrink > ktime_get_seconds()) { + time64_t delay = next_shrink - ktime_get_seconds(); + + schedule_delayed_work(&work, cfs_time_seconds(delay)); + } else { + schedule_work(&work.work); } - return 0; } -static int osc_add_shrink_grant(struct client_obd *client) +void osc_schedule_grant_work(void) { - int rc; + cancel_delayed_work_sync(&work); + schedule_work(&work.work); +} + +/** + * Start grant thread for returing grant to server for idle clients. + */ +static int osc_start_grant_work(void) +{ + client_gtd.gtd_stopped = 0; + mutex_init(&client_gtd.gtd_mutex); + INIT_LIST_HEAD(&client_gtd.gtd_clients); + + INIT_DELAYED_WORK(&work, osc_grant_work_handler); + schedule_work(&work.work); - rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, - TIMEOUT_GRANT, - osc_grant_shrink_grant_cb, NULL, - &client->cl_grant_shrink_list); - if (rc) { - CERROR("add grant client %s error %d\n", cli_name(client), rc); - return rc; - } - CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client)); - osc_update_next_shrink(client); return 0; } -static int osc_del_shrink_grant(struct client_obd *client) +static void osc_stop_grant_work(void) +{ + client_gtd.gtd_stopped = 1; + cancel_delayed_work_sync(&work); +} + +static void osc_add_grant_list(struct client_obd *client) +{ + mutex_lock(&client_gtd.gtd_mutex); + list_add(&client->cl_grant_chain, &client_gtd.gtd_clients); + mutex_unlock(&client_gtd.gtd_mutex); +} + +static void osc_del_grant_list(struct client_obd *client) { - return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list, - TIMEOUT_GRANT); + if (list_empty(&client->cl_grant_chain)) + return; + + mutex_lock(&client_gtd.gtd_mutex); + list_del_init(&client->cl_grant_chain); + mutex_unlock(&client_gtd.gtd_mutex); } void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) @@ -926,15 +1056,14 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) } spin_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." - "chunk bits: %d cl_max_extent_pages: %d\n", - cli_name(cli), - cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits, - cli->cl_max_extent_pages); + CDEBUG(D_CACHE, + "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n", + cli_name(cli), + cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits, + cli->cl_max_extent_pages); - if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && - list_empty(&cli->cl_grant_shrink_list)) - osc_add_shrink_grant(cli); + if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain)) + osc_add_grant_list(cli); } EXPORT_SYMBOL(osc_init_grant); @@ -994,8 +1123,11 @@ static int check_write_rcs(struct ptlrpc_request *req, /* return error if any niobuf was in error */ for (i = 0; i < niocount; i++) { - if ((int)remote_rcs[i] < 0) - return(remote_rcs[i]); + if ((int)remote_rcs[i] < 0) { + CDEBUG(D_INFO, "rc[%d]: %d req %p\n", + i, remote_rcs[i], req); + return remote_rcs[i]; + } if (remote_rcs[i] != 0) { CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n", @@ -1024,7 +1156,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) * safe to combine */ if (unlikely((p1->flag & mask) != (p2->flag & mask))) { CWARN("Saw flags 0x%x and 0x%x in the same brw, please " - "report this at https://jira.hpdd.intel.com/\n", + "report this at https://jira.whamcloud.com/\n", p1->flag, p2->flag); } return 0; @@ -1033,23 +1165,128 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return (p1->off + p1->count == p2->off); } +#if IS_ENABLED(CONFIG_CRC_T10DIF) +static int osc_checksum_bulk_t10pi(const char *obd_name, int nob, + size_t pg_count, struct brw_page **pga, + int opc, obd_dif_csum_fn *fn, + int sector_size, + u32 *check_sum) +{ + struct ahash_request *req; + /* Used Adler as the default checksum type on top of DIF tags */ + unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP); + struct page *__page; + unsigned char *buffer; + __u16 *guard_start; + unsigned int bufsize; + int guard_number; + int used_number = 0; + int used; + u32 cksum; + int rc = 0; + int i = 0; + + LASSERT(pg_count > 0); + + __page = alloc_page(GFP_KERNEL); + if (__page == NULL) + return -ENOMEM; + + req = cfs_crypto_hash_init(cfs_alg, NULL, 0); + if (IS_ERR(req)) { + rc = PTR_ERR(req); + CERROR("%s: unable to initialize checksum hash %s: rc = %d\n", + obd_name, cfs_crypto_hash_name(cfs_alg), rc); + GOTO(out, rc); + } + + buffer = kmap(__page); + guard_start = (__u16 *)buffer; + guard_number = PAGE_SIZE / sizeof(*guard_start); + while (nob > 0 && pg_count > 0) { + unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; + + /* corrupt the data before we compute the checksum, to + * simulate an OST->client data error */ + if (unlikely(i == 0 && opc == OST_READ && + OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) { + unsigned char *ptr = kmap(pga[i]->pg); + int off = pga[i]->off & ~PAGE_MASK; + + memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); + kunmap(pga[i]->pg); + } + + /* + * The left guard number should be able to hold checksums of a + * whole page + */ + rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, + pga[i]->off & ~PAGE_MASK, + count, + guard_start + used_number, + guard_number - used_number, + &used, sector_size, + fn); + if (rc) + break; + + used_number += used; + if (used_number == guard_number) { + cfs_crypto_hash_update_page(req, __page, 0, + used_number * sizeof(*guard_start)); + used_number = 0; + } + + nob -= pga[i]->count; + pg_count--; + i++; + } + kunmap(__page); + if (rc) + GOTO(out, rc); + + if (used_number != 0) + cfs_crypto_hash_update_page(req, __page, 0, + used_number * sizeof(*guard_start)); + + bufsize = sizeof(cksum); + cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize); + + /* For sending we only compute the wrong checksum instead + * of corrupting the data so it is still correct on a redo */ + if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) + cksum++; + + *check_sum = cksum; +out: + __free_page(__page); + return rc; +} +#else /* !CONFIG_CRC_T10DIF */ +#define obd_dif_ip_fn NULL +#define obd_dif_crc_fn NULL +#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \ + -EOPNOTSUPP +#endif /* CONFIG_CRC_T10DIF */ + static int osc_checksum_bulk(int nob, size_t pg_count, struct brw_page **pga, int opc, enum cksum_types cksum_type, u32 *cksum) { int i = 0; - struct cfs_crypto_hash_desc *hdesc; + struct ahash_request *req; unsigned int bufsize; unsigned char cfs_alg = cksum_obd2cfs(cksum_type); LASSERT(pg_count > 0); - hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0); - if (IS_ERR(hdesc)) { + req = cfs_crypto_hash_init(cfs_alg, NULL, 0); + if (IS_ERR(req)) { CERROR("Unable to initialize checksum hash %s\n", cfs_crypto_hash_name(cfs_alg)); - return PTR_ERR(hdesc); + return PTR_ERR(req); } while (nob > 0 && pg_count > 0) { @@ -1065,7 +1302,7 @@ static int osc_checksum_bulk(int nob, size_t pg_count, memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } - cfs_crypto_hash_update_page(hdesc, pga[i]->pg, + cfs_crypto_hash_update_page(req, pga[i]->pg, pga[i]->off & ~PAGE_MASK, count); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", @@ -1077,7 +1314,7 @@ static int osc_checksum_bulk(int nob, size_t pg_count, } bufsize = sizeof(*cksum); - cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize); + cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize); /* For sending we only compute the wrong checksum instead * of corrupting the data so it is still correct on a redo */ @@ -1087,27 +1324,73 @@ static int osc_checksum_bulk(int nob, size_t pg_count, return 0; } +static int osc_checksum_bulk_rw(const char *obd_name, + enum cksum_types cksum_type, + int nob, size_t pg_count, + struct brw_page **pga, int opc, + u32 *check_sum) +{ + obd_dif_csum_fn *fn = NULL; + int sector_size = 0; + int rc; + + ENTRY; + obd_t10_cksum2dif(cksum_type, &fn, §or_size); + + if (fn) + rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga, + opc, fn, sector_size, check_sum); + else + rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type, + check_sum); + + RETURN(rc); +} + +static inline void osc_release_bounce_pages(struct brw_page **pga, + u32 page_count) +{ +#ifdef HAVE_LUSTRE_CRYPTO + int i; + + for (i = 0; i < page_count; i++) { + if (pga[i]->pg->mapping) + /* bounce pages are unmapped */ + continue; + if (pga[i]->flag & OBD_BRW_SYNC) + /* sync transfer cannot have encrypted pages */ + continue; + llcrypt_finalize_bounce_page(&pga[i]->pg); + pga[i]->count -= pga[i]->bp_count_diff; + pga[i]->off += pga[i]->bp_off_diff; + } +#endif +} + static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, u32 page_count, struct brw_page **pga, struct ptlrpc_request **reqp, int resend) { - struct ptlrpc_request *req; - struct ptlrpc_bulk_desc *desc; - struct ost_body *body; - struct obd_ioobj *ioobj; - struct niobuf_remote *niobuf; + struct ptlrpc_request *req; + struct ptlrpc_bulk_desc *desc; + struct ost_body *body; + struct obd_ioobj *ioobj; + struct niobuf_remote *niobuf; int niocount, i, requested_nob, opc, rc, short_io_size = 0; - struct osc_brw_async_args *aa; - struct req_capsule *pill; - struct brw_page *pg_prev; + struct osc_brw_async_args *aa; + struct req_capsule *pill; + struct brw_page *pg_prev; void *short_io_buf; + const char *obd_name = cli->cl_import->imp_obd->obd_name; + struct inode *inode; - ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) - RETURN(-ENOMEM); /* Recoverable */ - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) - RETURN(-EINVAL); /* Fatal */ + ENTRY; + inode = page2inode(pga[0]->pg); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) + RETURN(-ENOMEM); /* Recoverable */ + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) + RETURN(-EINVAL); /* Fatal */ if ((cmd & OBD_BRW_WRITE) != 0) { opc = OST_WRITE; @@ -1121,6 +1404,51 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, if (req == NULL) RETURN(-ENOMEM); + if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) { + for (i = 0; i < page_count; i++) { + struct brw_page *pg = pga[i]; + struct page *data_page = NULL; + bool retried = false; + bool lockedbymyself; + +retry_encrypt: + /* The page can already be locked when we arrive here. + * This is possible when cl_page_assume/vvp_page_assume + * is stuck on wait_on_page_writeback with page lock + * held. In this case there is no risk for the lock to + * be released while we are doing our encryption + * processing, because writeback against that page will + * end in vvp_page_completion_write/cl_page_completion, + * which means only once the page is fully processed. + */ + lockedbymyself = trylock_page(pg->pg); + data_page = + llcrypt_encrypt_pagecache_blocks(pg->pg, + PAGE_SIZE, 0, + GFP_NOFS); + if (lockedbymyself) + unlock_page(pg->pg); + if (IS_ERR(data_page)) { + rc = PTR_ERR(data_page); + if (rc == -ENOMEM && !retried) { + retried = true; + rc = 0; + goto retry_encrypt; + } + ptlrpc_request_free(req); + RETURN(rc); + } + /* len is forced to PAGE_SIZE, and poff to 0 + * so store the old, clear text info + */ + pg->pg = data_page; + pg->bp_count_diff = PAGE_SIZE - pg->count; + pg->count = PAGE_SIZE; + pg->bp_off_diff = pg->off & ~PAGE_MASK; + pg->off = pg->off & PAGE_MASK; + } + } + for (niocount = i = 1; i < page_count; i++) { if (!can_merge_pages(pga[i - 1], pga[i])) niocount++; @@ -1135,9 +1463,9 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, for (i = 0; i < page_count; i++) short_io_size += pga[i]->count; - /* Check if we can do a short io. */ - if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 && - imp_connect_shortio(cli->cl_import))) + /* Check if read/write is small enough to be a short io. */ + if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 || + !imp_connect_shortio(cli->cl_import)) short_io_size = 0; req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT, @@ -1167,8 +1495,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, desc = ptlrpc_prep_bulk_imp(req, page_count, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE : - PTLRPC_BULK_PUT_SINK) | - PTLRPC_BULK_BUF_KIOV, + PTLRPC_BULK_PUT_SINK), OST_BULK_PORTAL, &ptlrpc_bulk_kiov_pin_ops); @@ -1244,13 +1571,13 @@ no_bulk: LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); if (short_io_size != 0 && opc == OST_WRITE) { - unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0); + unsigned char *ptr = kmap_atomic(pg->pg); LASSERT(short_io_size >= requested_nob + pg->count); memcpy(short_io_buf + requested_nob, ptr + poff, pg->count); - ll_kunmap_atomic(ptr, KM_USER0); + kunmap_atomic(ptr); } else if (short_io_size == 0) { desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count); @@ -1296,12 +1623,14 @@ no_bulk: if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) body->oa.o_flags = 0; - body->oa.o_flags |= cksum_type_pack(cksum_type); + body->oa.o_flags |= obd_cksum_type_pack(obd_name, + cksum_type); body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - rc = osc_checksum_bulk(requested_nob, page_count, - pga, OST_WRITE, cksum_type, - &body->oa.o_cksum); + rc = osc_checksum_bulk_rw(obd_name, cksum_type, + requested_nob, page_count, + pga, OST_WRITE, + &body->oa.o_cksum); if (rc < 0) { CDEBUG(D_PAGE, "failed to checksum, rc = %d\n", rc); @@ -1312,7 +1641,8 @@ no_bulk: /* save this in 'oa', too, for later checking */ oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - oa->o_flags |= cksum_type_pack(cksum_type); + oa->o_flags |= obd_cksum_type_pack(obd_name, + cksum_type); } else { /* clear out the checksum flag, in case this is a * resend but cl_checksum is no longer set. b=11238 */ @@ -1327,9 +1657,10 @@ no_bulk: !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) body->oa.o_flags = 0; - body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type); + body->oa.o_flags |= obd_cksum_type_pack(obd_name, + cli->cl_cksum_type); body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - } + } /* Client cksum has been already copied to wire obdo in previous * lustre_set_wire_obdo(), and in the case a bulk-read is being @@ -1338,8 +1669,7 @@ no_bulk: } ptlrpc_request_set_replen(req); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->aa_oa = oa; aa->aa_requested_nob = requested_nob; aa->aa_nio_count = niocount; @@ -1417,21 +1747,23 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count, kunmap(pga[i]->pg); } - rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1); + rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1); if (rc) CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc); filp_close(filp, NULL); - return; } static int check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer, - __u32 client_cksum, __u32 server_cksum, - struct osc_brw_async_args *aa) + __u32 client_cksum, __u32 server_cksum, + struct osc_brw_async_args *aa) { - __u32 new_cksum; - char *msg; + const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name; enum cksum_types cksum_type; + obd_dif_csum_fn *fn = NULL; + int sector_size = 0; + __u32 new_cksum; + char *msg; int rc; if (server_cksum == client_cksum) { @@ -1443,15 +1775,43 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer, dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga, server_cksum, client_cksum); - cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? - oa->o_flags : 0); - rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count, - aa->aa_ppga, OST_WRITE, cksum_type, - &new_cksum); + cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? + oa->o_flags : 0); + + switch (cksum_type) { + case OBD_CKSUM_T10IP512: + fn = obd_dif_ip_fn; + sector_size = 512; + break; + case OBD_CKSUM_T10IP4K: + fn = obd_dif_ip_fn; + sector_size = 4096; + break; + case OBD_CKSUM_T10CRC512: + fn = obd_dif_crc_fn; + sector_size = 512; + break; + case OBD_CKSUM_T10CRC4K: + fn = obd_dif_crc_fn; + sector_size = 4096; + break; + default: + break; + } + + if (fn) + rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob, + aa->aa_page_count, aa->aa_ppga, + OST_WRITE, fn, sector_size, + &new_cksum); + else + rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count, + aa->aa_ppga, OST_WRITE, cksum_type, + &new_cksum); if (rc < 0) msg = "failed to calculate the client write checksum"; - else if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags)) + else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags)) msg = "the server did not use the checksum type specified in " "the original request - likely a protocol problem"; else if (new_cksum == server_cksum) @@ -1467,15 +1827,15 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer, DFID " object "DOSTID" extent [%llu-%llu], original " "client csum %x (type %x), server csum %x (type %x)," " client csum now %x\n", - aa->aa_cli->cl_import->imp_obd->obd_name, - msg, libcfs_nid2str(peer->nid), + obd_name, msg, libcfs_nid2str(peer->nid), oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, POSTID(&oa->o_oi), aa->aa_ppga[0]->off, aa->aa_ppga[aa->aa_page_count - 1]->off + aa->aa_ppga[aa->aa_page_count-1]->count - 1, - client_cksum, cksum_type_unpack(aa->aa_oa->o_flags), + client_cksum, + obd_cksum_type_unpack(aa->aa_oa->o_flags), server_cksum, cksum_type, new_cksum); return 1; } @@ -1483,25 +1843,28 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer, /* Note rc enters this function as number of bytes transferred */ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) { - struct osc_brw_async_args *aa = (void *)&req->rq_async_args; + struct osc_brw_async_args *aa = (void *)&req->rq_async_args; + struct client_obd *cli = aa->aa_cli; + const char *obd_name = cli->cl_import->imp_obd->obd_name; const struct lnet_process_id *peer = - &req->rq_import->imp_connection->c_peer; - struct client_obd *cli = aa->aa_cli; - struct ost_body *body; + &req->rq_import->imp_connection->c_peer; + struct ost_body *body; u32 client_cksum = 0; - ENTRY; + struct inode *inode; - if (rc < 0 && rc != -EDQUOT) { - DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc); - RETURN(rc); - } + ENTRY; - LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) { - DEBUG_REQ(D_INFO, req, "Can't unpack body\n"); - RETURN(-EPROTO); - } + if (rc < 0 && rc != -EDQUOT) { + DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc); + RETURN(rc); + } + + LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + DEBUG_REQ(D_INFO, req, "cannot unpack body"); + RETURN(-EPROTO); + } /* set/clear over quota flag for a uid/gid/projid */ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && @@ -1509,42 +1872,45 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) unsigned qid[LL_MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid, body->oa.o_projid }; - CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n", + CDEBUG(D_QUOTA, + "setdq for [%u %u %u] with valid %#llx, flags %x\n", body->oa.o_uid, body->oa.o_gid, body->oa.o_projid, body->oa.o_valid, body->oa.o_flags); - osc_quota_setdq(cli, qid, body->oa.o_valid, + osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid, body->oa.o_flags); - } + } - osc_update_grant(cli, body); + osc_update_grant(cli, body); - if (rc < 0) - RETURN(rc); + if (rc < 0) + RETURN(rc); - if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) - client_cksum = aa->aa_oa->o_cksum; /* save for later */ + if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) + client_cksum = aa->aa_oa->o_cksum; /* save for later */ - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { - if (rc > 0) { - CERROR("Unexpected +ve rc %d\n", rc); - RETURN(-EPROTO); - } + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { + if (rc > 0) { + CERROR("%s: unexpected positive size %d\n", + obd_name, rc); + RETURN(-EPROTO); + } if (req->rq_bulk != NULL && sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) - RETURN(-EAGAIN); + RETURN(-EAGAIN); - if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && - check_write_checksum(&body->oa, peer, client_cksum, + if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && + check_write_checksum(&body->oa, peer, client_cksum, body->oa.o_cksum, aa)) - RETURN(-EAGAIN); + RETURN(-EAGAIN); - rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count, - aa->aa_page_count, aa->aa_ppga); - GOTO(out, rc); - } + rc = check_write_rcs(req, aa->aa_requested_nob, + aa->aa_nio_count, aa->aa_page_count, + aa->aa_ppga); + GOTO(out, rc); + } - /* The rest of this function executes only for OST_READs */ + /* The rest of this function executes only for OST_READs */ if (req->rq_bulk == NULL) { rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO, @@ -1554,20 +1920,20 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) /* if unwrap_bulk failed, return -EAGAIN to retry */ rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc); } - if (rc < 0) - GOTO(out, rc = -EAGAIN); + if (rc < 0) + GOTO(out, rc = -EAGAIN); - if (rc > aa->aa_requested_nob) { - CERROR("Unexpected rc %d (%d requested)\n", rc, - aa->aa_requested_nob); - RETURN(-EPROTO); - } + if (rc > aa->aa_requested_nob) { + CERROR("%s: unexpected size %d, requested %d\n", obd_name, + rc, aa->aa_requested_nob); + RETURN(-EPROTO); + } if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) { - CERROR ("Unexpected rc %d (%d transferred)\n", - rc, req->rq_bulk->bd_nob_transferred); - return (-EPROTO); - } + CERROR("%s: unexpected size %d, transferred %d\n", obd_name, + rc, req->rq_bulk->bd_nob_transferred); + RETURN(-EPROTO); + } if (req->rq_bulk == NULL) { /* short io */ @@ -1586,10 +1952,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) CDEBUG(D_CACHE, "page %p count %d\n", aa->aa_ppga[i]->pg, count); - ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0); + ptr = kmap_atomic(aa->aa_ppga[i]->pg); memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf, count); - ll_kunmap_atomic((void *) ptr, KM_USER0); + kunmap_atomic((void *) ptr); buf += count; nob -= count; @@ -1607,16 +1973,16 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) char *via = ""; char *router = ""; enum cksum_types cksum_type; - - cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? - body->oa.o_flags : 0); - rc = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga, - OST_READ, cksum_type, &client_cksum); - if (rc < 0) { - CDEBUG(D_PAGE, - "failed to calculate checksum, rc = %d\n", rc); + u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ? + body->oa.o_flags : 0; + + cksum_type = obd_cksum_type_unpack(o_flags); + rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc, + aa->aa_page_count, aa->aa_ppga, + OST_READ, &client_cksum); + if (rc < 0) GOTO(out, rc); - } + if (req->rq_bulk != NULL && peer->nid != req->rq_bulk->bd_sender) { via = " via "; @@ -1638,7 +2004,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) "%s%s%s inode "DFID" object "DOSTID " extent [%llu-%llu], client %x, " "server %x, cksum_type %x\n", - req->rq_import->imp_obd->obd_name, + obd_name, libcfs_nid2str(peer->nid), via, router, clbody->oa.o_valid & OBD_MD_FLFID ? @@ -1661,32 +2027,64 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); rc = 0; } - } else if (unlikely(client_cksum)) { - static int cksum_missed; + } else if (unlikely(client_cksum)) { + static int cksum_missed; + + cksum_missed++; + if ((cksum_missed & (-cksum_missed)) == cksum_missed) + CERROR("%s: checksum %u requested from %s but not sent\n", + obd_name, cksum_missed, + libcfs_nid2str(peer->nid)); + } else { + rc = 0; + } + + inode = page2inode(aa->aa_ppga[0]->pg); + if (inode && IS_ENCRYPTED(inode)) { + int idx; + + if (!llcrypt_has_encryption_key(inode)) { + CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino); + GOTO(out, rc); + } + for (idx = 0; idx < aa->aa_page_count; idx++) { + struct brw_page *pg = aa->aa_ppga[idx]; + __u64 *p, *q; + + /* do not decrypt if page is all 0s */ + p = q = page_address(pg->pg); + while (p - q < PAGE_SIZE / sizeof(*p)) { + if (*p != 0) + break; + p++; + } + if (p - q == PAGE_SIZE / sizeof(*p)) + continue; + + rc = llcrypt_decrypt_pagecache_blocks(pg->pg, + PAGE_SIZE, 0); + if (rc) + GOTO(out, rc); + } + } - cksum_missed++; - if ((cksum_missed & (-cksum_missed)) == cksum_missed) - CERROR("Checksum %u requested from %s but not sent\n", - cksum_missed, libcfs_nid2str(peer->nid)); - } else { - rc = 0; - } out: if (rc >= 0) lustre_get_wire_obdo(&req->rq_import->imp_connect_data, aa->aa_oa, &body->oa); - RETURN(rc); + RETURN(rc); } static int osc_brw_redo_request(struct ptlrpc_request *request, struct osc_brw_async_args *aa, int rc) { - struct ptlrpc_request *new_req; - struct osc_brw_async_args *new_aa; - struct osc_async_page *oap; - ENTRY; + struct ptlrpc_request *new_req; + struct osc_brw_async_args *new_aa; + struct osc_async_page *oap; + ENTRY; + /* The below message is checked in replay-ost-single.sh test_8ae*/ DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, "redo for recoverable error %d", rc); @@ -1698,21 +2096,19 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, RETURN(rc); list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - if (oap->oap_request != NULL) { - LASSERTF(request == oap->oap_request, - "request %p != oap_request %p\n", - request, oap->oap_request); - if (oap->oap_interrupted) { - ptlrpc_req_finished(new_req); - RETURN(-EINTR); - } - } - } - /* New request takes over pga and oaps from old request. - * Note that copying a list_head doesn't work, need to move it... */ - aa->aa_resends++; - new_req->rq_interpret_reply = request->rq_interpret_reply; - new_req->rq_async_args = request->rq_async_args; + if (oap->oap_request != NULL) { + LASSERTF(request == oap->oap_request, + "request %p != oap_request %p\n", + request, oap->oap_request); + } + } + /* + * New request takes over pga and oaps from old request. + * Note that copying a list_head doesn't work, need to move it... + */ + aa->aa_resends++; + new_req->rq_interpret_reply = request->rq_interpret_reply; + new_req->rq_async_args = request->rq_async_args; new_req->rq_commit_cb = request->rq_commit_cb; /* cap resend delay to the current request timeout, this is similar to * what ptlrpc does (see after_reply()) */ @@ -1723,7 +2119,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, new_req->rq_generation_set = 1; new_req->rq_import_generation = request->rq_import_generation; - new_aa = ptlrpc_req_async_args(new_req); + new_aa = ptlrpc_req_async_args(new_aa, new_req); INIT_LIST_HEAD(&new_aa->aa_oaps); list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); @@ -1781,24 +2177,31 @@ static void sort_brw_pages(struct brw_page **array, int num) static void osc_release_ppga(struct brw_page **ppga, size_t count) { - LASSERT(ppga != NULL); - OBD_FREE(ppga, sizeof(*ppga) * count); + LASSERT(ppga != NULL); + OBD_FREE_PTR_ARRAY(ppga, count); } static int brw_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct osc_brw_async_args *aa = data; + struct osc_brw_async_args *aa = args; struct osc_extent *ext; struct osc_extent *tmp; struct client_obd *cli = aa->aa_cli; - unsigned long transferred = 0; - ENTRY; + unsigned long transferred = 0; - rc = osc_brw_fini_request(req, rc); - CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); - /* When server return -EINPROGRESS, client should always retry - * regardless of the number of times the bulk was resent already. */ + ENTRY; + + rc = osc_brw_fini_request(req, rc); + CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); + + /* restore clear text pages */ + osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count); + + /* + * When server returns -EINPROGRESS, client should always retry + * regardless of the number of times the bulk was resent already. + */ if (osc_recoverable_error(rc) && !req->rq_no_delay) { if (req->rq_import_generation != req->rq_import->imp_generation) { @@ -1873,7 +2276,8 @@ static int brw_interpret(const struct lu_env *env, cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } - OBDO_FREE(aa->aa_oa); + OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem); + aa->aa_oa = NULL; if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) osc_inc_unstable_pages(req); @@ -1944,17 +2348,17 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, struct cl_req_attr *crattr = NULL; loff_t starting_offset = OBD_OBJECT_EOF; loff_t ending_offset = 0; - int mpflag = 0; + /* '1' for consistency with code that checks !mpflag to restore */ + int mpflag = 1; int mem_tight = 0; int page_count = 0; bool soft_sync = false; - bool interrupted = false; bool ndelay = false; int i; int grant = 0; int rc; __u32 layout_version = 0; - struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + LIST_HEAD(rpc_list); struct ost_body *body; ENTRY; LASSERT(!list_empty(ext_list)); @@ -1965,20 +2369,20 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, mem_tight |= ext->oe_memalloc; grant += ext->oe_grants; page_count += ext->oe_nr_pages; - layout_version = MAX(layout_version, ext->oe_layout_version); + layout_version = max(layout_version, ext->oe_layout_version); if (obj == NULL) obj = ext->oe_obj; } soft_sync = osc_over_unstable_soft_limit(cli); if (mem_tight) - mpflag = cfs_memory_pressure_get_and_set(); + mpflag = memalloc_noreclaim_save(); - OBD_ALLOC(pga, sizeof(*pga) * page_count); + OBD_ALLOC_PTR_ARRAY(pga, page_count); if (pga == NULL) GOTO(out, rc = -ENOMEM); - OBDO_ALLOC(oa); + OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS); if (oa == NULL) GOTO(out, rc = -ENOMEM); @@ -2005,8 +2409,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, else LASSERT(oap->oap_page_off + oap->oap_count == PAGE_SIZE); - if (oap->oap_interrupted) - interrupted = true; } if (ext->oe_ndelay) ndelay = true; @@ -2045,8 +2447,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, req->rq_interpret_reply = brw_interpret; req->rq_memalloc = mem_tight != 0; oap->oap_request = ptlrpc_request_addref(req); - if (interrupted && !req->rq_intr) - ptlrpc_mark_interrupted(req); if (ndelay) { req->rq_no_resend = req->rq_no_delay = 1; /* probably set a shorter timeout value. @@ -2065,8 +2465,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, cl_req_attr_set(env, osc2cl(obj), crattr); lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); INIT_LIST_HEAD(&aa->aa_oaps); list_splice_init(&rpc_list, &aa->aa_oaps); INIT_LIST_HEAD(&aa->aa_exts); @@ -2089,7 +2488,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, } spin_unlock(&cli->cl_loi_list_lock); - DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight", + DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight", page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val); @@ -2099,16 +2498,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, EXIT; out: - if (mem_tight != 0) - cfs_memory_pressure_restore(mpflag); + if (mem_tight) + memalloc_noreclaim_restore(mpflag); if (rc != 0) { LASSERT(req == NULL); if (oa) - OBDO_FREE(oa); - if (pga) - OBD_FREE(pga, sizeof(*pga) * page_count); + OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem); + if (pga) { + osc_release_bounce_pages(pga, page_count); + osc_release_ppga(pga, page_count); + } /* this should happen rarely and is pretty bad, it makes the * pending list not follow the dirty order */ while (!list_empty(ext_list)) { @@ -2178,8 +2579,9 @@ int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, } int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) + void *args, int rc) { + struct osc_enqueue_args *aa = args; struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; enum ldlm_mode mode = aa->oa_mode; @@ -2229,8 +2631,6 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, RETURN(rc); } -struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; - /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock * from the 2nd OSC before a lock from the 1st one. This does not deadlock with * other synchronous requests, however keeping some locks and trying to obtain @@ -2240,9 +2640,8 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, union ldlm_policy_data *policy, - struct ost_lvb *lvb, int kms_valid, - osc_enqueue_upcall_f upcall, void *cookie, - struct ldlm_enqueue_info *einfo, + struct ost_lvb *lvb, osc_enqueue_upcall_f upcall, + void *cookie, struct ldlm_enqueue_info *einfo, struct ptlrpc_request_set *rqset, int async, bool speculative) { @@ -2260,15 +2659,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; policy->l_extent.end |= ~PAGE_MASK; - /* - * kms is not valid when either object is completely fresh (so that no - * locks are cached), or object was evicted. In the latter case cached - * lock cannot be used, because it would prime inode state with - * potentially stale LVB. - */ - if (!kms_valid) - goto no_match; - /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ @@ -2331,7 +2721,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, } } -no_match: if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); @@ -2360,8 +2749,7 @@ no_match: if (async) { if (!rc) { struct osc_enqueue_args *aa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->oa_exp = exp; aa->oa_mode = einfo->ei_mode; aa->oa_type = einfo->ei_type; @@ -2380,12 +2768,8 @@ no_match: aa->oa_flags = NULL; } - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_enqueue_interpret; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req); - else - ptlrpc_set_add_req(rqset, req); + req->rq_interpret_reply = osc_enqueue_interpret; + ptlrpc_set_add_req(rqset, req); } else if (intent) { ptlrpc_req_finished(req); } @@ -2400,9 +2784,10 @@ no_match: RETURN(rc); } -int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, - enum ldlm_type type, union ldlm_policy_data *policy, - enum ldlm_mode mode, __u64 *flags, void *data, +int osc_match_base(const struct lu_env *env, struct obd_export *exp, + struct ldlm_res_id *res_id, enum ldlm_type type, + union ldlm_policy_data *policy, enum ldlm_mode mode, + __u64 *flags, struct osc_object *obj, struct lustre_handle *lockh, int unref) { struct obd_device *obd = exp->exp_obd; @@ -2430,11 +2815,19 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); - if (data != NULL) { + if (obj != NULL) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - if (!osc_set_lock_data(lock, data)) { + if (osc_set_lock_data(lock, obj)) { + lock_res_and_lock(lock); + if (!ldlm_is_lvb_cached(lock)) { + LASSERT(lock->l_ast_data == obj); + osc_lock_lvb_update(env, obj, lock, NULL); + ldlm_set_lvb_cached(lock); + } + unlock_res_and_lock(lock); + } else { ldlm_lock_decref(lockh, rc); rc = 0; } @@ -2444,36 +2837,36 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, } static int osc_statfs_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct obd_statfs *msfs; - ENTRY; + struct osc_async_args *aa = args; + struct obd_statfs *msfs; - if (rc == -EBADR) - /* The request has in fact never been sent - * due to issues at a higher level (LOV). - * Exit immediately since the caller is - * aware of the problem and takes care - * of the clean up */ - RETURN(rc); + ENTRY; + if (rc == -EBADR) + /* + * The request has in fact never been sent due to issues at + * a higher level (LOV). Exit immediately since the caller + * is aware of the problem and takes care of the clean up. + */ + RETURN(rc); - if ((rc == -ENOTCONN || rc == -EAGAIN) && - (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) - GOTO(out, rc = 0); + if ((rc == -ENOTCONN || rc == -EAGAIN) && + (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) + GOTO(out, rc = 0); - if (rc != 0) - GOTO(out, rc); + if (rc != 0) + GOTO(out, rc); - msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); - if (msfs == NULL) { + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) GOTO(out, rc = -EPROTO); - } - *aa->aa_oi->oi_osfs = *msfs; + *aa->aa_oi->oi_osfs = *msfs; out: - rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); - RETURN(rc); + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + + RETURN(rc); } static int osc_statfs_async(struct obd_export *exp, @@ -2483,9 +2876,25 @@ static int osc_statfs_async(struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; struct osc_async_args *aa; - int rc; + int rc; ENTRY; + if (obd->obd_osfs_age >= max_age) { + CDEBUG(D_SUPER, + "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n", + obd->obd_name, &obd->obd_osfs, + obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks, + obd->obd_osfs.os_ffree, obd->obd_osfs.os_files); + spin_lock(&obd->obd_osfs_lock); + memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs)); + spin_unlock(&obd->obd_osfs_lock); + oinfo->oi_flags |= OBD_STATFS_FROM_CACHE; + if (oinfo->oi_cb_up) + oinfo->oi_cb_up(oinfo, 0); + + RETURN(0); + } + /* We could possibly pass max_age in the request (as an absolute * timestamp or a "seconds.usec ago") so the target can avoid doing * extra calls into the filesystem if that isn't necessary (e.g. @@ -2501,19 +2910,18 @@ static int osc_statfs_async(struct obd_export *exp, ptlrpc_request_free(req); RETURN(rc); } - ptlrpc_request_set_replen(req); - req->rq_request_portal = OST_CREATE_PORTAL; - ptlrpc_at_set_req_timeout(req); + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); - if (oinfo->oi_flags & OBD_STATFS_NODELAY) { - /* procfs requests not want stat in wait for avoid deadlock */ - req->rq_no_resend = 1; - req->rq_no_delay = 1; - } + if (oinfo->oi_flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stat in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + req->rq_interpret_reply = osc_statfs_interpret; + aa = ptlrpc_req_async_args(aa, req); aa->aa_oi = oinfo; ptlrpc_set_add_req(rqset, req); @@ -2523,12 +2931,13 @@ static int osc_statfs_async(struct obd_export *exp, static int osc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, time64_t max_age, __u32 flags) { - struct obd_device *obd = class_exp2obd(exp); - struct obd_statfs *msfs; - struct ptlrpc_request *req; - struct obd_import *imp = NULL; - int rc; - ENTRY; + struct obd_device *obd = class_exp2obd(exp); + struct obd_statfs *msfs; + struct ptlrpc_request *req; + struct obd_import *imp = NULL; + int rc; + ENTRY; + /*Since the request might also come from lprocfs, so we need *sync this with client_disconnect_export Bug15684*/ @@ -2539,86 +2948,83 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, if (!imp) RETURN(-ENODEV); - /* We could possibly pass max_age in the request (as an absolute - * timestamp or a "seconds.usec ago") so the target can avoid doing - * extra calls into the filesystem if that isn't necessary (e.g. - * during mount that would help a bit). Having relative timestamps - * is not so great if request processing is slow, while absolute - * timestamps are not ideal because they need time synchronization. */ - req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); + /* We could possibly pass max_age in the request (as an absolute + * timestamp or a "seconds.usec ago") so the target can avoid doing + * extra calls into the filesystem if that isn't necessary (e.g. + * during mount that would help a bit). Having relative timestamps + * is not so great if request processing is slow, while absolute + * timestamps are not ideal because they need time synchronization. */ + req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); - class_import_put(imp); + class_import_put(imp); - if (req == NULL) - RETURN(-ENOMEM); + if (req == NULL) + RETURN(-ENOMEM); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - ptlrpc_request_set_replen(req); - req->rq_request_portal = OST_CREATE_PORTAL; - ptlrpc_at_set_req_timeout(req); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); - if (flags & OBD_STATFS_NODELAY) { - /* procfs requests not want stat in wait for avoid deadlock */ - req->rq_no_resend = 1; - req->rq_no_delay = 1; - } + if (flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stat in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); - if (msfs == NULL) { - GOTO(out, rc = -EPROTO); - } + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) + GOTO(out, rc = -EPROTO); - *osfs = *msfs; + *osfs = *msfs; - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + EXIT; +out: + ptlrpc_req_finished(req); + return rc; } static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obd = exp->exp_obd; - struct obd_ioctl_data *data = karg; - int err = 0; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct obd_ioctl_data *data = karg; + int rc = 0; + ENTRY; if (!try_module_get(THIS_MODULE)) { CERROR("%s: cannot get module '%s'\n", obd->obd_name, module_name(THIS_MODULE)); return -EINVAL; } - switch (cmd) { - case OBD_IOC_CLIENT_RECOVER: - err = ptlrpc_recover_import(obd->u.cli.cl_import, - data->ioc_inlbuf1, 0); - if (err > 0) - err = 0; - GOTO(out, err); - case IOC_OSC_SET_ACTIVE: - err = ptlrpc_set_import_active(obd->u.cli.cl_import, - data->ioc_offset); - GOTO(out, err); - case OBD_IOC_PING_TARGET: - err = ptlrpc_obd_ping(obd); - GOTO(out, err); + switch (cmd) { + case OBD_IOC_CLIENT_RECOVER: + rc = ptlrpc_recover_import(obd->u.cli.cl_import, + data->ioc_inlbuf1, 0); + if (rc > 0) + rc = 0; + break; + case IOC_OSC_SET_ACTIVE: + rc = ptlrpc_set_import_active(obd->u.cli.cl_import, + data->ioc_offset); + break; default: - CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", - cmd, current_comm()); - GOTO(out, err = -ENOTTY); + rc = -ENOTTY; + CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n", + obd->obd_name, cmd, current->comm, rc); + break; } -out: + module_put(THIS_MODULE); - return err; + return rc; } int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, @@ -2651,23 +3057,6 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(0); } - if (KEY_IS(KEY_CACHE_SET)) { - struct client_obd *cli = &obd->u.cli; - - LASSERT(cli->cl_cache == NULL); /* only once */ - cli->cl_cache = (struct cl_client_cache *)val; - cl_cache_incref(cli->cl_cache); - cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; - - /* add this osc into entity list */ - LASSERT(list_empty(&cli->cl_lru_osc)); - spin_lock(&cli->cl_cache->ccc_lru_lock); - list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); - spin_unlock(&cli->cl_cache->ccc_lru_lock); - - RETURN(0); - } - if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1; @@ -2716,9 +3105,8 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, struct osc_grant_args *aa; struct obdo *oa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - OBDO_ALLOC(oa); + aa = ptlrpc_req_async_args(aa, req); + OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS); if (!oa) { ptlrpc_req_finished(req); RETURN(-ENOMEM); @@ -2753,10 +3141,13 @@ int osc_reconnect(const struct lu_env *env, struct obd_export *exp, spin_lock(&cli->cl_loi_list_lock); grant = cli->cl_avail_grant + cli->cl_reserved_grant; - if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) + if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) { + /* restore ocd_grant_blkbits as client page bits */ + data->ocd_grant_blkbits = PAGE_SHIFT; grant += cli->cl_dirty_grant; - else + } else { grant += cli->cl_dirty_pages << PAGE_SHIFT; + } data->ocd_grant = grant ? : 2 * cli_brw_size(obd); lost_grant = cli->cl_lost_grant; cli->cl_lost_grant = 0; @@ -2776,27 +3167,24 @@ int osc_disconnect(struct obd_export *exp) struct obd_device *obd = class_exp2obd(exp); int rc; - rc = client_disconnect_export(exp); - /** - * Initially we put del_shrink_grant before disconnect_export, but it - * causes the following problem if setup (connect) and cleanup - * (disconnect) are tangled together. - * connect p1 disconnect p2 - * ptlrpc_connect_import - * ............... class_manual_cleanup - * osc_disconnect - * del_shrink_grant - * ptlrpc_connect_interrupt - * init_grant_shrink - * add this client to shrink list - * cleanup_osc - * Bang! pinger trigger the shrink. - * So the osc should be disconnected from the shrink list, after we - * are sure the import has been destroyed. BUG18662 - */ - if (obd->u.cli.cl_import == NULL) - osc_del_shrink_grant(&obd->u.cli); - return rc; + rc = client_disconnect_export(exp); + /** + * Initially we put del_shrink_grant before disconnect_export, but it + * causes the following problem if setup (connect) and cleanup + * (disconnect) are tangled together. + * connect p1 disconnect p2 + * ptlrpc_connect_import + * ............... class_manual_cleanup + * osc_disconnect + * del_shrink_grant + * ptlrpc_connect_interrupt + * osc_init_grant + * add this client to shrink list + * cleanup_osc + * Bang! grant shrink thread trigger the shrink. BUG18662 + */ + osc_del_grant_list(&obd->u.cli); + return rc; } EXPORT_SYMBOL(osc_disconnect); @@ -2921,7 +3309,7 @@ static int osc_cancel_weight(struct ldlm_lock *lock) * Cancel all unused and granted extent lock. */ if (lock->l_resource->lr_type == LDLM_EXTENT && - lock->l_granted_mode == lock->l_req_mode && + ldlm_is_granted(lock) && osc_ldlm_weigh_ast(lock) == 0) RETURN(1); @@ -2970,8 +3358,8 @@ int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg) GOTO(out_ptlrpcd_work, rc); cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; + osc_update_next_shrink(cli); - INIT_LIST_HEAD(&cli->cl_grant_shrink_list); RETURN(rc); out_ptlrpcd_work: @@ -2993,7 +3381,6 @@ EXPORT_SYMBOL(osc_setup_common); int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { struct client_obd *cli = &obd->u.cli; - struct obd_type *type; int adding; int added; int req_count; @@ -3005,36 +3392,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) if (rc < 0) RETURN(rc); -#ifdef CONFIG_PROC_FS - obd->obd_vars = lprocfs_osc_obd_vars; -#endif - /* If this is true then both client (osc) and server (osp) are on the - * same node. The osp layer if loaded first will register the osc proc - * directory. In that case this obd_device will be attached its proc - * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. - */ - type = class_search_type(LUSTRE_OSP_NAME); - if (type && type->typ_procsym) { - obd->obd_proc_entry = lprocfs_register(obd->obd_name, - type->typ_procsym, - obd->obd_vars, obd); - if (IS_ERR(obd->obd_proc_entry)) { - rc = PTR_ERR(obd->obd_proc_entry); - CERROR("error %d setting up lprocfs for %s\n", rc, - obd->obd_name); - obd->obd_proc_entry = NULL; - } - } - - rc = lprocfs_obd_setup(obd, false); - if (!rc) { - /* If the basic OSC proc tree construction succeeded then - * lets do the rest. - */ - lproc_osc_attach_seqstat(obd); - sptlrpc_lprocfs_cliobd_attach(obd); - ptlrpc_lprocfs_register_obd(obd); - } + rc = osc_tunables_init(obd); + if (rc) + RETURN(rc); /* * We try to control the total number of requests with a upper limit @@ -3051,12 +3411,13 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) atomic_add(added, &osc_pool_req_count); } - INIT_LIST_HEAD(&cli->cl_grant_shrink_list); ns_register_cancel(obd->obd_namespace, osc_cancel_weight); spin_lock(&osc_shrink_lock); list_add_tail(&cli->cl_shrink_list, &osc_shrink_list); spin_unlock(&osc_shrink_lock); + cli->cl_import->imp_idle_timeout = osc_idle_timeout; + cli->cl_import->imp_idle_debug = D_HA; RETURN(0); } @@ -3133,25 +3494,14 @@ int osc_cleanup_common(struct obd_device *obd) } EXPORT_SYMBOL(osc_cleanup_common); -int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) -{ - int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd); - return rc > 0 ? 0: rc; -} - -static int osc_process_config(struct obd_device *obd, size_t len, void *buf) -{ - return osc_process_config_base(obd, buf); -} - -static struct obd_ops osc_obd_ops = { +static const struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = osc_setup, .o_precleanup = osc_precleanup, .o_cleanup = osc_cleanup_common, .o_add_conn = client_import_add_conn, .o_del_conn = client_import_del_conn, - .o_connect = client_connect_import, + .o_connect = client_connect_import, .o_reconnect = osc_reconnect, .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, @@ -3163,12 +3513,11 @@ static struct obd_ops osc_obd_ops = { .o_iocontrol = osc_iocontrol, .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, - .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, }; static struct shrinker *osc_cache_shrinker; -struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list); +LIST_HEAD(osc_shrink_list); DEFINE_SPINLOCK(osc_shrink_lock); #ifndef HAVE_SHRINKER_COUNT @@ -3178,10 +3527,6 @@ static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) .nr_to_scan = shrink_param(sc, nr_to_scan), .gfp_mask = shrink_param(sc, gfp_mask) }; -#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL) - struct shrinker *shrinker = NULL; -#endif - (void)osc_cache_shrink_scan(shrinker, &scv); return osc_cache_shrink_count(shrinker, &scv); @@ -3190,8 +3535,6 @@ static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) static int __init osc_init(void) { - bool enable_proc = true; - struct obd_type *type; unsigned int reqpool_size; unsigned int reqsize; int rc; @@ -3208,11 +3551,7 @@ static int __init osc_init(void) if (rc) RETURN(rc); - type = class_search_type(LUSTRE_OSP_NAME); - if (type != NULL && type->typ_procsym != NULL) - enable_proc = false; - - rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, + rc = class_register_type(&osc_obd_ops, NULL, true, NULL, LUSTRE_OSC_NAME, &osc_device_type); if (rc) GOTO(out_kmem, rc); @@ -3241,19 +3580,28 @@ static int __init osc_init(void) osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE, ptlrpc_add_rqs_to_pool); - if (osc_rq_pool != NULL) - GOTO(out, rc); - rc = -ENOMEM; + if (osc_rq_pool == NULL) + GOTO(out_type, rc = -ENOMEM); + + rc = osc_start_grant_work(); + if (rc != 0) + GOTO(out_req_pool, rc); + + RETURN(rc); + +out_req_pool: + ptlrpc_free_rq_pool(osc_rq_pool); out_type: class_unregister_type(LUSTRE_OSC_NAME); out_kmem: lu_kmem_fini(osc_caches); -out: + RETURN(rc); } static void __exit osc_exit(void) { + osc_stop_grant_work(); remove_shrinker(osc_cache_shrinker); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches);