X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_trans.c;h=f3669515271c4669f8d03881a6bf130cd9e12f2e;hb=5c883ea2748ae9e430a9cd863a9b630b2a74440a;hp=ffcc382d917a35c7ba4aed6b32208e8a7cb6136e;hpb=0b1ad400c8f64575292a7ff54a8ce872a124b19e;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index ffcc382..f366951 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, 2015, Intel Corporation. + * Copyright (c) 2014, 2017, Intel Corporation. */ /* * lustre/osp/osp_trans.c @@ -77,6 +77,7 @@ struct osp_update_args { atomic_t *oaua_count; wait_queue_head_t *oaua_waitq; bool oaua_flow_control; + const struct lu_env *oaua_update_env; }; /** @@ -96,20 +97,6 @@ struct osp_update_callback { osp_update_interpreter_t ouc_interpreter; }; -static struct object_update_request *object_update_request_alloc(size_t size) -{ - struct object_update_request *ourq; - - OBD_ALLOC_LARGE(ourq, size); - if (ourq == NULL) - return ERR_PTR(-ENOMEM); - - ourq->ourq_magic = UPDATE_REQUEST_MAGIC; - ourq->ourq_count = 0; - - return ourq; -} - /** * Allocate new update request * @@ -125,21 +112,28 @@ int osp_object_update_request_create(struct osp_update_request *our, size_t size) { struct osp_update_request_sub *ours; + struct object_update_request *ourq; OBD_ALLOC_PTR(ours); if (ours == NULL) return -ENOMEM; - if (size < OUT_UPDATE_INIT_BUFFER_SIZE) - size = OUT_UPDATE_INIT_BUFFER_SIZE; - - ours->ours_req = object_update_request_alloc(size); - - if (IS_ERR(ours->ours_req)) { + /* The object update request will be added to an SG list for + * bulk transfer. Some IB HW cannot handle partial pages in SG + * lists (since they create gaps in memory regions) so we + * round the size up to the next multiple of PAGE_SIZE. See + * LU-9983. */ + LASSERT(size > 0); + size = round_up(size, PAGE_SIZE); + OBD_ALLOC_LARGE(ourq, size); + if (ourq == NULL) { OBD_FREE_PTR(ours); return -ENOMEM; } + ourq->ourq_magic = UPDATE_REQUEST_MAGIC; + ourq->ourq_count = 0; + ours->ours_req = ourq; ours->ours_req_size = size; INIT_LIST_HEAD(&ours->ours_list); list_add_tail(&ours->ours_list, &our->our_req_list); @@ -186,6 +180,7 @@ osp_current_object_update_request(struct osp_update_request *our) struct osp_update_request *osp_update_request_create(struct dt_device *dt) { struct osp_update_request *our; + int rc; OBD_ALLOC_PTR(our); if (our == NULL) @@ -194,13 +189,19 @@ struct osp_update_request *osp_update_request_create(struct dt_device *dt) INIT_LIST_HEAD(&our->our_req_list); INIT_LIST_HEAD(&our->our_cb_items); INIT_LIST_HEAD(&our->our_list); + INIT_LIST_HEAD(&our->our_invalidate_cb_list); spin_lock_init(&our->our_list_lock); - osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE); + rc = osp_object_update_request_create(our, PAGE_SIZE); + if (rc != 0) { + OBD_FREE_PTR(our); + return ERR_PTR(rc); + } return our; } -void osp_update_request_destroy(struct osp_update_request *our) +void osp_update_request_destroy(const struct lu_env *env, + struct osp_update_request *our) { struct osp_update_request_sub *ours; struct osp_update_request_sub *tmp; @@ -214,6 +215,31 @@ void osp_update_request_destroy(struct osp_update_request *our) OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size); OBD_FREE_PTR(ours); } + + if (!list_empty(&our->our_invalidate_cb_list)) { + struct lu_env lenv; + struct osp_object *obj; + struct osp_object *next; + + if (env == NULL) { + lu_env_init(&lenv, LCT_MD_THREAD | LCT_DT_THREAD); + env = &lenv; + } + + list_for_each_entry_safe(obj, next, + &our->our_invalidate_cb_list, + opo_invalidate_cb_list) { + spin_lock(&obj->opo_lock); + list_del_init(&obj->opo_invalidate_cb_list); + spin_unlock(&obj->opo_lock); + + dt_object_put(env, &obj->opo_obj); + } + + if (env == &lenv) + lu_env_fini(&lenv); + } + OBD_FREE_PTR(our); } @@ -231,7 +257,7 @@ object_update_request_dump(const struct object_update_request *ourq, update = object_update_request_get(ourq, i, &size); LASSERT(update != NULL); CDEBUG(mask, "i = %u fid = "DFID" op = %s " - "params = %d batchid = "LPU64" size = %zu repsize %u\n", + "params = %d batchid = %llu size = %zu repsize %u\n", i, PFID(&update->ou_fid), update_op_str(update->ou_type), update->ou_params_count, @@ -350,8 +376,8 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, buf_count++; } repsize += sizeof(*reply); - repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) & - ~(OUT_UPDATE_REPLY_SIZE - 1); + if (repsize < OUT_UPDATE_REPLY_SIZE) + repsize = OUT_UPDATE_REPLY_SIZE; LASSERT(buf_count > 0); req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE); @@ -451,10 +477,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, if (rc != 0) RETURN(rc); - /* This will only be called with read-only update, and these updates - * might be used to retrieve update log during recovery process, so - * it will be allowed to send during recovery process */ - req->rq_allow_replay = 1; + osp_set_req_replay(osp, req); req->rq_allow_intr = 1; /* Note: some dt index api might return non-zero result here, like @@ -478,48 +501,26 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, * \param[in] oth osp thandle. */ static void osp_thandle_invalidate_object(const struct lu_env *env, - struct osp_thandle *oth) + struct osp_thandle *oth, + int result) { struct osp_update_request *our = oth->ot_our; - struct osp_update_request_sub *ours; + struct osp_object *obj; + struct osp_object *next; if (our == NULL) return; - list_for_each_entry(ours, &our->our_req_list, ours_list) { - struct object_update_request *our_req = ours->ours_req; - unsigned int i; - struct lu_object *obj; - struct osp_object *osp_obj; - - for (i = 0; i < our_req->ourq_count; i++) { - struct object_update *update; - - update = object_update_request_get(our_req, i, NULL); - if (update == NULL) - break; - - if (update->ou_type != OUT_WRITE) - continue; - - if (!fid_is_sane(&update->ou_fid)) - continue; - - obj = lu_object_find_slice(env, - &oth->ot_super.th_dev->dd_lu_dev, - &update->ou_fid, NULL); - if (IS_ERR(obj)) - break; - - osp_obj = lu2osp_obj(obj); - if (osp_obj->opo_ooa != NULL) { - spin_lock(&osp_obj->opo_lock); - osp_obj->opo_ooa->ooa_attr.la_valid = 0; - osp_obj->opo_stale = 1; - spin_unlock(&osp_obj->opo_lock); - } - lu_object_put(env, obj); - } + list_for_each_entry_safe(obj, next, &our->our_invalidate_cb_list, + opo_invalidate_cb_list) { + if (result < 0) + osp_invalidate(env, &obj->opo_obj); + + spin_lock(&obj->opo_lock); + list_del_init(&obj->opo_invalidate_cb_list); + spin_unlock(&obj->opo_lock); + + dt_object_put(env, &obj->opo_obj); } } @@ -539,8 +540,7 @@ static void osp_trans_stop_cb(const struct lu_env *env, dcb->dcb_func(NULL, &oth->ot_super, dcb, result); } - if (result < 0) - osp_thandle_invalidate_object(env, oth); + osp_thandle_invalidate_object(env, oth, result); } /** @@ -602,23 +602,34 @@ static void osp_update_callback_fini(const struct lu_env *env, * \retval negative error number on failure */ static int osp_update_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *arg, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct object_update_reply *reply = NULL; - struct osp_update_args *oaua = arg; - struct osp_update_request *our = oaua->oaua_update; - struct osp_thandle *oth; - struct osp_update_callback *ouc; - struct osp_update_callback *next; - int count = 0; - int index = 0; - int rc1 = 0; + struct object_update_reply *reply = NULL; + struct osp_update_args *oaua = args; + struct osp_update_request *our = oaua->oaua_update; + struct osp_thandle *oth; + struct osp_update_callback *ouc; + struct osp_update_callback *next; + int count = 0; + int index = 0; + int rc1 = 0; ENTRY; if (our == NULL) RETURN(0); + /* Sigh env might be NULL in some cases, see + * this calling path. + * osp_send_update_thread() + * ptlrpc_set_wait() ----> null env. + * ptlrpc_check_set() + * osp_update_interpret() + * Let's use env in oaua for this case. + */ + if (env == NULL) + env = oaua->oaua_update_env; + oaua->oaua_update = NULL; oth = our->our_th; if (oaua->oaua_flow_control) { @@ -630,7 +641,7 @@ static int osp_update_interpret(const struct lu_env *env, } /* Unpack the results from the reply message. */ - if (req->rq_repmsg != NULL) { + if (req->rq_repmsg != NULL && req->rq_replied) { reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, OUT_UPDATE_REPLY_SIZE); @@ -680,9 +691,9 @@ static int osp_update_interpret(const struct lu_env *env, /* oth and osp_update_requests will be destoryed in * osp_thandle_put */ osp_trans_stop_cb(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); } else { - osp_update_request_destroy(our); + osp_update_request_destroy(env, our); } RETURN(rc); @@ -722,12 +733,16 @@ int osp_unplug_async_request(const struct lu_env *env, ouc->ouc_data, 0, rc); osp_update_callback_fini(env, ouc); } - osp_update_request_destroy(our); + osp_update_request_destroy(env, our); } else { - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->oaua_update = our; args->oaua_count = NULL; args->oaua_waitq = NULL; + /* Note: this is asynchronous call for the request, so the + * interrupte cb and current function will be different + * thread, so we need use different env */ + args->oaua_update_env = NULL; args->oaua_flow_control = false; req->rq_interpret_reply = osp_update_interpret; ptlrpcd_add_req(req); @@ -900,13 +915,14 @@ int osp_trans_update_request_create(struct thandle *th) return 0; } -void osp_thandle_destroy(struct osp_thandle *oth) +void osp_thandle_destroy(const struct lu_env *env, + struct osp_thandle *oth) { LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC); LASSERT(list_empty(&oth->ot_commit_dcb_list)); LASSERT(list_empty(&oth->ot_stop_dcb_list)); if (oth->ot_our != NULL) - osp_update_request_destroy(oth->ot_our); + osp_update_request_destroy(env, oth->ot_our); OBD_FREE_PTR(oth); } @@ -950,7 +966,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) oth->ot_magic = OSP_THANDLE_MAGIC; th = &oth->ot_super; th->th_dev = d; - th->th_tags = LCT_TX_HANDLE; atomic_set(&oth->ot_refcount, 1); INIT_LIST_HEAD(&oth->ot_commit_dcb_list); @@ -1022,7 +1037,7 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) last_committed_transno = req->rq_import->imp_peer_committed_transno; - CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n", + CDEBUG(D_HA, "trans no %llu committed transno %llu\n", req->rq_transno, last_committed_transno); /* If the transaction is not really committed, mark result = 1 */ @@ -1032,7 +1047,7 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) osp_trans_commit_cb(oth, result); req->rq_committed = 1; - osp_thandle_put(oth); + osp_thandle_put(NULL, oth); EXIT; } @@ -1091,7 +1106,6 @@ static int osp_send_update_req(const struct lu_env *env, { struct osp_update_args *args; struct ptlrpc_request *req; - struct lu_device *top_device; struct osp_thandle *oth = our->our_th; int rc = 0; ENTRY; @@ -1104,21 +1118,24 @@ static int osp_send_update_req(const struct lu_env *env, RETURN(rc); } - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->oaua_update = our; + /* set env to NULL, in case the interrupt cb and current function + * are in different thread */ + args->oaua_update_env = NULL; osp_thandle_get(oth); /* hold for update interpret */ req->rq_interpret_reply = osp_update_interpret; if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) { if (!osp->opd_imp_active || !osp->opd_imp_connected) { osp_trans_callback(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); GOTO(out, rc = -ENOTCONN); } rc = obd_get_request_slot(&osp->opd_obd->u.cli); if (rc != 0) { osp_trans_callback(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); GOTO(out, rc = -ENOTCONN); } args->oaua_flow_control = true; @@ -1126,7 +1143,7 @@ static int osp_send_update_req(const struct lu_env *env, if (!osp->opd_connect_mdt) { down_read(&osp->opd_async_updates_rwsem); args->oaua_count = &osp->opd_async_updates_count; - args->oaua_waitq = &osp->opd_syn_barrier_waitq; + args->oaua_waitq = &osp->opd_sync_barrier_waitq; up_read(&osp->opd_async_updates_rwsem); atomic_inc(args->oaua_count); } @@ -1145,28 +1162,33 @@ static int osp_send_update_req(const struct lu_env *env, * status, in case the other target is being recoveried * at the same time, and if we wait here for the import * to be recoveryed, it might cause deadlock */ - top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev; - if (top_device->ld_obd->obd_recovering) - req->rq_allow_replay = 1; + osp_set_req_replay(osp, req); + /* Because this req will be synchronus, i.e. it will be called + * in the same thread, so it will be safe to use current + * env */ + args->oaua_update_env = env; if (osp->opd_connect_mdt) osp_get_rpc_lock(osp); rc = ptlrpc_queue_wait(req); if (osp->opd_connect_mdt) osp_put_rpc_lock(osp); - if ((rc == -ENOMEM && req->rq_set == NULL) || + + /* We use rq_queued_time to distinguish between local + * and remote -ENOMEM. */ + if ((rc == -ENOMEM && req->rq_queued_time == 0) || (req->rq_transno == 0 && !req->rq_committed)) { if (args->oaua_update != NULL) { /* If osp_update_interpret is not being called, * release the osp_thandle */ args->oaua_update = NULL; - osp_thandle_put(oth); + osp_thandle_put(env, oth); } req->rq_cb_data = NULL; rc = rc == 0 ? req->rq_status : rc; osp_trans_callback(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); GOTO(out, rc); } } @@ -1181,7 +1203,7 @@ out: * Get local thandle for osp_thandle * * Get the local OSD thandle from the OSP thandle. Currently, there - * are a few OSP API (osp_object_create() and osp_sync_add()) needs + * are a few OSP API (osp_create() and osp_sync_add()) needs * to update the object on local OSD device. * * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then @@ -1231,7 +1253,7 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, * * Set the version for the transaction and add the request to * the sending list, then after transaction stop, the request - * will be picked in the order of version, by sending thread. + * will be sent in the order of version by the sending thread. * * \param [in] oth osp thandle to be set version. * @@ -1261,6 +1283,7 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth, /* Assign the version and add it to the sending list */ osp_thandle_get(oth); oth->ot_our->our_version = ou->ou_version++; + oth->ot_our->our_generation = ou->ou_generation; list_add_tail(&oth->ot_our->our_list, &osp->opd_update->ou_list); oth->ot_our->our_req_ready = 0; @@ -1268,8 +1291,8 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth, spin_unlock(&ou->ou_lock); LASSERT(oth->ot_super.th_wait_submit == 1); - CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n", - osp->opd_obd->obd_name, ou->ou_version, oth, + CDEBUG(D_INFO, "%s: version %llu gen %llu oth:version %p:%llu\n", + osp->opd_obd->obd_name, ou->ou_version, ou->ou_generation, oth, oth->ot_our->our_version); return 0; @@ -1298,7 +1321,7 @@ osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp) spin_lock(&ou->ou_lock); list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { LASSERT(our->our_th != NULL); - CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n", + CDEBUG(D_HA, "ou %p version %llu rpc_version %llu\n", ou, our->our_version, ou->ou_rpc_version); spin_lock(&our->our_list_lock); /* Find next osp_update_request in the list */ @@ -1345,11 +1368,14 @@ void osp_invalidate_request(struct osp_device *osp) if (rc < 0) { CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name, rc); + + spin_lock(&ou->ou_lock); + ou->ou_generation++; + spin_unlock(&ou->ou_lock); + return; } - INIT_LIST_HEAD(&list); - spin_lock(&ou->ou_lock); /* invalidate all of request in the sending list */ list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { @@ -1370,6 +1396,9 @@ void osp_invalidate_request(struct osp_device *osp) our); } + /* Increase the generation, then the update request with old generation + * will fail with -EIO. */ + ou->ou_generation++; spin_unlock(&ou->ou_lock); /* invalidate all of request in the sending list */ @@ -1379,7 +1408,7 @@ void osp_invalidate_request(struct osp_device *osp) spin_unlock(&our->our_list_lock); osp_trans_callback(&env, our->our_th, our->our_th->ot_super.th_result); - osp_thandle_put(our->our_th); + osp_thandle_put(&env, our->our_th); } lu_env_fini(&env); } @@ -1401,7 +1430,6 @@ int osp_send_update_thread(void *arg) { struct lu_env env; struct osp_device *osp = arg; - struct l_wait_info lwi = { 0 }; struct osp_updates *ou = osp->opd_update; struct ptlrpc_thread *thread = &osp->opd_update_thread; struct osp_update_request *our = NULL; @@ -1420,14 +1448,14 @@ int osp_send_update_thread(void *arg) wake_up(&thread->t_ctl_waitq); while (1) { our = NULL; - l_wait_event(ou->ou_waitq, - !osp_send_update_thread_running(osp) || - osp_get_next_request(ou, &our), &lwi); + wait_event_idle(ou->ou_waitq, + !osp_send_update_thread_running(osp) || + osp_get_next_request(ou, &our)); if (!osp_send_update_thread_running(osp)) { if (our != NULL) { osp_trans_callback(&env, our->our_th, -EINTR); - osp_thandle_put(our->our_th); + osp_thandle_put(&env, our->our_th); } break; } @@ -1437,7 +1465,8 @@ int osp_send_update_thread(void *arg) osp_trans_callback(&env, our->our_th, our->our_th->ot_super.th_result); rc = our->our_th->ot_super.th_result; - } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) { + } else if (ou->ou_generation != our->our_generation || + OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) { rc = -EIO; osp_trans_callback(&env, our->our_th, rc); } else { @@ -1458,7 +1487,7 @@ int osp_send_update_thread(void *arg) osp_invalidate_request(osp); /* Balanced for thandle_get in osp_check_and_set_rpc_version */ - osp_thandle_put(our->our_th); + osp_thandle_put(&env, our->our_th); } thread->t_flags = SVC_STOPPED; @@ -1550,7 +1579,7 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, GOTO(out, rc = -EIO); } - CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n", + CDEBUG(D_HA, "%s: add oth %p with version %llu\n", osp->opd_obd->obd_name, oth, our->our_version); LASSERT(our->our_req_ready == 0); @@ -1570,7 +1599,7 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, osp_trans_callback(env, oth, th->th_result); } out: - osp_thandle_put(oth); + osp_thandle_put(env, oth); RETURN(rc); }