X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_trans.c;h=f0cad0222cb7872508cba0279b067bcf0a0688fb;hb=8ed5937d25e436add1d8a137c71e76303152b801;hp=545fefd7d4a5a874705ca6ef4636e999321a0f2c;hpb=d10200a80770f0029d1d665af954187b9ad883df;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 545fefd..f0cad02 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, 2016, Intel Corporation. + * Copyright (c) 2014, 2017, Intel Corporation. */ /* * lustre/osp/osp_trans.c @@ -97,20 +97,6 @@ struct osp_update_callback { osp_update_interpreter_t ouc_interpreter; }; -static struct object_update_request *object_update_request_alloc(size_t size) -{ - struct object_update_request *ourq; - - OBD_ALLOC_LARGE(ourq, size); - if (ourq == NULL) - return ERR_PTR(-ENOMEM); - - ourq->ourq_magic = UPDATE_REQUEST_MAGIC; - ourq->ourq_count = 0; - - return ourq; -} - /** * Allocate new update request * @@ -126,21 +112,28 @@ int osp_object_update_request_create(struct osp_update_request *our, size_t size) { struct osp_update_request_sub *ours; + struct object_update_request *ourq; OBD_ALLOC_PTR(ours); if (ours == NULL) return -ENOMEM; - if (size < OUT_UPDATE_INIT_BUFFER_SIZE) - size = OUT_UPDATE_INIT_BUFFER_SIZE; - - ours->ours_req = object_update_request_alloc(size); - - if (IS_ERR(ours->ours_req)) { + /* The object update request will be added to an SG list for + * bulk transfer. Some IB HW cannot handle partial pages in SG + * lists (since they create gaps in memory regions) so we + * round the size up to the next multiple of PAGE_SIZE. See + * LU-9983. */ + LASSERT(size > 0); + size = round_up(size, PAGE_SIZE); + OBD_ALLOC_LARGE(ourq, size); + if (ourq == NULL) { OBD_FREE_PTR(ours); return -ENOMEM; } + ourq->ourq_magic = UPDATE_REQUEST_MAGIC; + ourq->ourq_count = 0; + ours->ours_req = ourq; ours->ours_req_size = size; INIT_LIST_HEAD(&ours->ours_list); list_add_tail(&ours->ours_list, &our->our_req_list); @@ -199,7 +192,7 @@ struct osp_update_request *osp_update_request_create(struct dt_device *dt) INIT_LIST_HEAD(&our->our_invalidate_cb_list); spin_lock_init(&our->our_list_lock); - rc = osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE); + rc = osp_object_update_request_create(our, PAGE_SIZE); if (rc != 0) { OBD_FREE_PTR(our); return ERR_PTR(rc); @@ -240,7 +233,7 @@ void osp_update_request_destroy(const struct lu_env *env, list_del_init(&obj->opo_invalidate_cb_list); spin_unlock(&obj->opo_lock); - lu_object_put(env, &obj->opo_obj.do_lu); + dt_object_put(env, &obj->opo_obj); } if (env == &lenv) @@ -354,6 +347,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, struct out_update_header *ouh; struct out_update_buffer *oub; __u32 buf_count = 0; + int page_count = 0; int repsize = 0; struct object_update_reply *reply; int rc, i; @@ -383,8 +377,8 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, buf_count++; } repsize += sizeof(*reply); - repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) & - ~(OUT_UPDATE_REPLY_SIZE - 1); + if (repsize < OUT_UPDATE_REPLY_SIZE) + repsize = OUT_UPDATE_REPLY_SIZE; LASSERT(buf_count > 0); req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE); @@ -425,13 +419,15 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, list_for_each_entry(ours, &our->our_req_list, ours_list) { oub->oub_size = ours->ours_req_size; oub++; + /* First *and* last might be partial pages, hence +1 */ + page_count += DIV_ROUND_UP(ours->ours_req_size, PAGE_SIZE) + 1; } req->rq_bulk_write = 1; - desc = ptlrpc_prep_bulk_imp(req, buf_count, + desc = ptlrpc_prep_bulk_imp(req, page_count, MD_MAX_BRW_SIZE >> LNET_MTU_BITS, - PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC, - MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops); + PTLRPC_BULK_GET_SOURCE, + MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_nopin_ops); if (desc == NULL) GOTO(out_req, rc = -ENOMEM); @@ -527,7 +523,7 @@ static void osp_thandle_invalidate_object(const struct lu_env *env, list_del_init(&obj->opo_invalidate_cb_list); spin_unlock(&obj->opo_lock); - lu_object_put(env, &obj->opo_obj.do_lu); + dt_object_put(env, &obj->opo_obj); } } @@ -609,17 +605,17 @@ static void osp_update_callback_fini(const struct lu_env *env, * \retval negative error number on failure */ static int osp_update_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *arg, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct object_update_reply *reply = NULL; - struct osp_update_args *oaua = arg; - struct osp_update_request *our = oaua->oaua_update; - struct osp_thandle *oth; - struct osp_update_callback *ouc; - struct osp_update_callback *next; - int count = 0; - int index = 0; - int rc1 = 0; + struct object_update_reply *reply = NULL; + struct osp_update_args *oaua = args; + struct osp_update_request *our = oaua->oaua_update; + struct osp_thandle *oth; + struct osp_update_callback *ouc; + struct osp_update_callback *next; + int count = 0; + int index = 0; + int rc1 = 0; ENTRY; @@ -742,7 +738,7 @@ int osp_unplug_async_request(const struct lu_env *env, } osp_update_request_destroy(env, our); } else { - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->oaua_update = our; args->oaua_count = NULL; args->oaua_waitq = NULL; @@ -973,7 +969,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) oth->ot_magic = OSP_THANDLE_MAGIC; th = &oth->ot_super; th->th_dev = d; - th->th_tags = LCT_TX_HANDLE; atomic_set(&oth->ot_refcount, 1); INIT_LIST_HEAD(&oth->ot_commit_dcb_list); @@ -1126,7 +1121,7 @@ static int osp_send_update_req(const struct lu_env *env, RETURN(rc); } - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->oaua_update = our; /* set env to NULL, in case the interrupt cb and current function * are in different thread */ @@ -1151,7 +1146,7 @@ static int osp_send_update_req(const struct lu_env *env, if (!osp->opd_connect_mdt) { down_read(&osp->opd_async_updates_rwsem); args->oaua_count = &osp->opd_async_updates_count; - args->oaua_waitq = &osp->opd_syn_barrier_waitq; + args->oaua_waitq = &osp->opd_sync_barrier_waitq; up_read(&osp->opd_async_updates_rwsem); atomic_inc(args->oaua_count); } @@ -1181,7 +1176,10 @@ static int osp_send_update_req(const struct lu_env *env, rc = ptlrpc_queue_wait(req); if (osp->opd_connect_mdt) osp_put_rpc_lock(osp); - if ((rc == -ENOMEM && req->rq_set == NULL) || + + /* We use rq_queued_time to distinguish between local + * and remote -ENOMEM. */ + if ((rc == -ENOMEM && req->rq_queued_time == 0) || (req->rq_transno == 0 && !req->rq_committed)) { if (args->oaua_update != NULL) { /* If osp_update_interpret is not being called, @@ -1208,7 +1206,7 @@ out: * Get local thandle for osp_thandle * * Get the local OSD thandle from the OSP thandle. Currently, there - * are a few OSP API (osp_object_create() and osp_sync_add()) needs + * are a few OSP API (osp_create() and osp_sync_add()) needs * to update the object on local OSD device. * * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then @@ -1258,7 +1256,7 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, * * Set the version for the transaction and add the request to * the sending list, then after transaction stop, the request - * will be picked in the order of version, by sending thread. + * will be sent in the order of version by the sending thread. * * \param [in] oth osp thandle to be set version. * @@ -1288,6 +1286,7 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth, /* Assign the version and add it to the sending list */ osp_thandle_get(oth); oth->ot_our->our_version = ou->ou_version++; + oth->ot_our->our_generation = ou->ou_generation; list_add_tail(&oth->ot_our->our_list, &osp->opd_update->ou_list); oth->ot_our->our_req_ready = 0; @@ -1295,8 +1294,8 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth, spin_unlock(&ou->ou_lock); LASSERT(oth->ot_super.th_wait_submit == 1); - CDEBUG(D_INFO, "%s: version %llu oth:version %p:%llu\n", - osp->opd_obd->obd_name, ou->ou_version, oth, + CDEBUG(D_INFO, "%s: version %llu gen %llu oth:version %p:%llu\n", + osp->opd_obd->obd_name, ou->ou_version, ou->ou_generation, oth, oth->ot_our->our_version); return 0; @@ -1372,11 +1371,14 @@ void osp_invalidate_request(struct osp_device *osp) if (rc < 0) { CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name, rc); + + spin_lock(&ou->ou_lock); + ou->ou_generation++; + spin_unlock(&ou->ou_lock); + return; } - INIT_LIST_HEAD(&list); - spin_lock(&ou->ou_lock); /* invalidate all of request in the sending list */ list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { @@ -1397,6 +1399,9 @@ void osp_invalidate_request(struct osp_device *osp) our); } + /* Increase the generation, then the update request with old generation + * will fail with -EIO. */ + ou->ou_generation++; spin_unlock(&ou->ou_lock); /* invalidate all of request in the sending list */ @@ -1426,49 +1431,41 @@ void osp_invalidate_request(struct osp_device *osp) */ int osp_send_update_thread(void *arg) { - struct lu_env env; + struct lu_env *env; struct osp_device *osp = arg; - struct l_wait_info lwi = { 0 }; struct osp_updates *ou = osp->opd_update; - struct ptlrpc_thread *thread = &osp->opd_update_thread; struct osp_update_request *our = NULL; int rc; ENTRY; LASSERT(ou != NULL); - rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags); - if (rc < 0) { - CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name, - rc); - RETURN(rc); - } + env = &ou->ou_env; - thread->t_flags = SVC_RUNNING; - wake_up(&thread->t_ctl_waitq); while (1) { our = NULL; - l_wait_event(ou->ou_waitq, - !osp_send_update_thread_running(osp) || - osp_get_next_request(ou, &our), &lwi); + wait_event_idle(ou->ou_waitq, + kthread_should_stop() || + osp_get_next_request(ou, &our)); - if (!osp_send_update_thread_running(osp)) { + if (kthread_should_stop()) { if (our != NULL) { - osp_trans_callback(&env, our->our_th, -EINTR); - osp_thandle_put(&env, our->our_th); + osp_trans_callback(env, our->our_th, -EINTR); + osp_thandle_put(env, our->our_th); } break; } LASSERT(our->our_th != NULL); if (our->our_th->ot_super.th_result != 0) { - osp_trans_callback(&env, our->our_th, + osp_trans_callback(env, our->our_th, our->our_th->ot_super.th_result); rc = our->our_th->ot_super.th_result; - } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) { + } else if (ou->ou_generation != our->our_generation || + OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) { rc = -EIO; - osp_trans_callback(&env, our->our_th, rc); + osp_trans_callback(env, our->our_th, rc); } else { - rc = osp_send_update_req(&env, osp, our); + rc = osp_send_update_req(env, osp, our); } /* Update the rpc version */ @@ -1485,13 +1482,9 @@ int osp_send_update_thread(void *arg) osp_invalidate_request(osp); /* Balanced for thandle_get in osp_check_and_set_rpc_version */ - osp_thandle_put(&env, our->our_th); + osp_thandle_put(env, our->our_th); } - thread->t_flags = SVC_STOPPED; - lu_env_fini(&env); - wake_up(&thread->t_ctl_waitq); - RETURN(0); } @@ -1571,8 +1564,7 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, GOTO(out, rc); } - if (osp->opd_update == NULL || - !osp_send_update_thread_running(osp)) { + if (osp->opd_update == NULL) { osp_trans_callback(env, oth, -EIO); GOTO(out, rc = -EIO); }