X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_trans.c;h=7bb3bea5207a48aca13c1bed31d57b5f9767a1ca;hb=f28cc25929c4e8a111e96b2205a0433542b35e84;hp=b055d33b9fcebf289f20b29424b6bc41d61d9586;hpb=12d6356a48de70922975e38451059211c753252e;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index b055d33..7bb3bea 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -77,6 +77,7 @@ struct osp_update_args { atomic_t *oaua_count; wait_queue_head_t *oaua_waitq; bool oaua_flow_control; + const struct lu_env *oaua_update_env; }; /** @@ -194,6 +195,7 @@ struct osp_update_request *osp_update_request_create(struct dt_device *dt) INIT_LIST_HEAD(&our->our_req_list); INIT_LIST_HEAD(&our->our_cb_items); INIT_LIST_HEAD(&our->our_list); + spin_lock_init(&our->our_list_lock); osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE); return our; @@ -210,7 +212,7 @@ void osp_update_request_destroy(struct osp_update_request *our) list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) { list_del(&ours->ours_list); if (ours->ours_req != NULL) - OBD_FREE(ours->ours_req, ours->ours_req_size); + OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size); OBD_FREE_PTR(ours); } OBD_FREE_PTR(our); @@ -454,6 +456,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, * might be used to retrieve update log during recovery process, so * it will be allowed to send during recovery process */ req->rq_allow_replay = 1; + req->rq_allow_intr = 1; /* Note: some dt index api might return non-zero result here, like * osd_index_ea_lookup, so we should only check rc < 0 here */ @@ -467,7 +470,62 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, RETURN(rc); } -static void osp_trans_stop_cb(struct osp_thandle *oth, int result) +/** + * Invalidate all objects in the osp thandle + * + * invalidate all of objects in the update request, which will be called + * when the transaction is aborted. + * + * \param[in] oth osp thandle. + */ +static void osp_thandle_invalidate_object(const struct lu_env *env, + struct osp_thandle *oth) +{ + struct osp_update_request *our = oth->ot_our; + struct osp_update_request_sub *ours; + + if (our == NULL) + return; + + list_for_each_entry(ours, &our->our_req_list, ours_list) { + struct object_update_request *our_req = ours->ours_req; + unsigned int i; + struct lu_object *obj; + struct osp_object *osp_obj; + + for (i = 0; i < our_req->ourq_count; i++) { + struct object_update *update; + + update = object_update_request_get(our_req, i, NULL); + if (update == NULL) + break; + + if (update->ou_type != OUT_WRITE) + continue; + + if (!fid_is_sane(&update->ou_fid)) + continue; + + obj = lu_object_find_slice(env, + &oth->ot_super.th_dev->dd_lu_dev, + &update->ou_fid, NULL); + if (IS_ERR(obj)) + break; + + osp_obj = lu2osp_obj(obj); + if (osp_obj->opo_ooa != NULL) { + spin_lock(&osp_obj->opo_lock); + osp_obj->opo_ooa->ooa_attr.la_valid = 0; + osp_obj->opo_stale = 1; + spin_unlock(&osp_obj->opo_lock); + } + lu_object_put(env, obj); + } + } +} + +static void osp_trans_stop_cb(const struct lu_env *env, + struct osp_thandle *oth, int result) { struct dt_txn_commit_cb *dcb; struct dt_txn_commit_cb *tmp; @@ -481,6 +539,9 @@ static void osp_trans_stop_cb(struct osp_thandle *oth, int result) list_del_init(&dcb->dcb_linkage); dcb->dcb_func(NULL, &oth->ot_super, dcb, result); } + + if (result < 0) + osp_thandle_invalidate_object(env, oth); } /** @@ -559,6 +620,17 @@ static int osp_update_interpret(const struct lu_env *env, if (our == NULL) RETURN(0); + /* Sigh env might be NULL in some cases, see + * this calling path. + * osp_send_update_thread() + * ptlrpc_set_wait() ----> null env. + * ptlrpc_check_set() + * osp_update_interpret() + * Let's use env in oaua for this case. + */ + if (env == NULL) + env = oaua->oaua_update_env; + oaua->oaua_update = NULL; oth = our->our_th; if (oaua->oaua_flow_control) { @@ -570,16 +642,16 @@ static int osp_update_interpret(const struct lu_env *env, } /* Unpack the results from the reply message. */ - if (req->rq_repmsg != NULL) { + if (req->rq_repmsg != NULL && req->rq_replied) { reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, OUT_UPDATE_REPLY_SIZE); - if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) - rc1 = -EPROTO; - else + if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) { + if (rc == 0) + rc = -EPROTO; + } else { count = reply->ourp_count; - } else { - rc1 = rc; + } } list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) { @@ -588,18 +660,21 @@ static int osp_update_interpret(const struct lu_env *env, /* The peer may only have handled some requests (indicated * by the 'count') in the packaged OUT RPC, we can only get * results for the handled part. */ - if (index < count && reply->ourp_lens[index] > 0) { + if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) { struct object_update_result *result; result = object_update_result_get(reply, index, NULL); if (result == NULL) - rc1 = -EPROTO; + rc1 = rc = -EPROTO; else - rc1 = result->our_rc; - } else { - rc1 = rc; - if (unlikely(rc1 == 0)) + rc1 = rc = result->our_rc; + } else if (rc1 >= 0) { + /* The peer did not handle these request, let's return + * -EINVAL to update interpret for now */ + if (rc >= 0) rc1 = -EINVAL; + else + rc1 = rc; } if (ouc->ouc_interpreter != NULL) @@ -616,13 +691,13 @@ static int osp_update_interpret(const struct lu_env *env, if (oth != NULL) { /* oth and osp_update_requests will be destoryed in * osp_thandle_put */ - osp_trans_stop_cb(oth, rc); + osp_trans_stop_cb(env, oth, rc); osp_thandle_put(oth); } else { osp_update_request_destroy(our); } - RETURN(0); + RETURN(rc); } /** @@ -665,6 +740,10 @@ int osp_unplug_async_request(const struct lu_env *env, args->oaua_update = our; args->oaua_count = NULL; args->oaua_waitq = NULL; + /* Note: this is asynchronous call for the request, so the + * interrupte cb and current function will be different + * thread, so we need use different env */ + args->oaua_update_env = NULL; args->oaua_flow_control = false; req->rq_interpret_reply = osp_update_interpret; ptlrpcd_add_req(req); @@ -949,7 +1028,8 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) RETURN_EXIT; oth = thandle_to_osp_thandle(th); - if (lustre_msg_get_last_committed(req->rq_repmsg)) + if (req->rq_repmsg != NULL && + lustre_msg_get_last_committed(req->rq_repmsg)) last_committed_transno = lustre_msg_get_last_committed(req->rq_repmsg); @@ -1001,7 +1081,7 @@ void osp_trans_callback(const struct lu_env *env, osp_update_callback_fini(env, ouc); } } - osp_trans_stop_cb(oth, rc); + osp_trans_stop_cb(env, oth, rc); osp_trans_commit_cb(oth, rc); } @@ -1033,7 +1113,6 @@ static int osp_send_update_req(const struct lu_env *env, ENTRY; LASSERT(oth != NULL); - LASSERT(our->our_req_sent == 0); rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, our, &req); if (rc != 0) { @@ -1043,6 +1122,9 @@ static int osp_send_update_req(const struct lu_env *env, args = ptlrpc_req_async_args(req); args->oaua_update = our; + /* set env to NULL, in case the interrupt cb and current function + * are in different thread */ + args->oaua_update_env = NULL; osp_thandle_get(oth); /* hold for update interpret */ req->rq_interpret_reply = osp_update_interpret; if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) { @@ -1086,6 +1168,10 @@ static int osp_send_update_req(const struct lu_env *env, if (top_device->ld_obd->obd_recovering) req->rq_allow_replay = 1; + /* Because this req will be synchronus, i.e. it will be called + * in the same thread, so it will be safe to use current + * env */ + args->oaua_update_env = env; if (osp->opd_connect_mdt) osp_get_rpc_lock(osp); rc = ptlrpc_queue_wait(req); @@ -1166,16 +1252,17 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, /** * Set version for the transaction * - * Set the version for the transaction, then the osp RPC will be - * sent in the order of version, i.e. the transaction with lower - * version will be sent first. + * Set the version for the transaction and add the request to + * the sending list, then after transaction stop, the request + * will be picked in the order of version, by sending thread. * * \param [in] oth osp thandle to be set version. * * \retval 0 if set version succeeds * negative errno if set version fails. */ -int osp_check_and_set_rpc_version(struct osp_thandle *oth) +int osp_check_and_set_rpc_version(struct osp_thandle *oth, + struct osp_object *obj) { struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev); struct osp_updates *ou = osp->opd_update; @@ -1183,15 +1270,30 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth) if (ou == NULL) return -EIO; - if (oth->ot_version != 0) + if (oth->ot_our->our_version != 0) return 0; spin_lock(&ou->ou_lock); - oth->ot_version = ou->ou_version++; + spin_lock(&oth->ot_our->our_list_lock); + if (obj->opo_stale) { + spin_unlock(&oth->ot_our->our_list_lock); + spin_unlock(&ou->ou_lock); + return -ESTALE; + } + + /* Assign the version and add it to the sending list */ + osp_thandle_get(oth); + oth->ot_our->our_version = ou->ou_version++; + list_add_tail(&oth->ot_our->our_list, + &osp->opd_update->ou_list); + oth->ot_our->our_req_ready = 0; + spin_unlock(&oth->ot_our->our_list_lock); spin_unlock(&ou->ou_lock); + LASSERT(oth->ot_super.th_wait_submit == 1); CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n", - osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version); + osp->opd_obd->obd_name, ou->ou_version, oth, + oth->ot_our->our_version); return 0; } @@ -1219,38 +1321,90 @@ osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp) spin_lock(&ou->ou_lock); list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { LASSERT(our->our_th != NULL); - CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n", - our, our->our_th->ot_version, ou->ou_rpc_version); - if (our->our_th->ot_version == 0) { - list_del_init(&our->our_list); - *ourp = our; - got_req = true; - break; - } - + CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n", + ou, our->our_version, ou->ou_rpc_version); + spin_lock(&our->our_list_lock); /* Find next osp_update_request in the list */ - if (our->our_th->ot_version == ou->ou_rpc_version) { + if (our->our_version == ou->ou_rpc_version && + our->our_req_ready) { list_del_init(&our->our_list); + spin_unlock(&our->our_list_lock); *ourp = our; got_req = true; break; } + spin_unlock(&our->our_list_lock); } spin_unlock(&ou->ou_lock); return got_req; } -static void osp_update_rpc_version(struct osp_updates *ou, - struct osp_thandle *oth) +/** + * Invalidate update request + * + * Invalidate update request in the OSP sending list, so all of + * requests in the sending list will return error, which happens + * when it finds one update (with writing llog) requests fails or + * the OSP is evicted by remote target. see osp_send_update_thread(). + * + * \param[in] osp OSP device whose update requests will be + * invalidated. + **/ +void osp_invalidate_request(struct osp_device *osp) { - if (oth->ot_version == 0) + struct lu_env env; + struct osp_updates *ou = osp->opd_update; + struct osp_update_request *our; + struct osp_update_request *tmp; + LIST_HEAD(list); + int rc; + ENTRY; + + if (ou == NULL) + return; + + rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags); + if (rc < 0) { + CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name, + rc); return; + } + + INIT_LIST_HEAD(&list); - LASSERT(oth->ot_version == ou->ou_rpc_version); spin_lock(&ou->ou_lock); - ou->ou_rpc_version++; + /* invalidate all of request in the sending list */ + list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { + spin_lock(&our->our_list_lock); + if (our->our_req_ready) + list_move(&our->our_list, &list); + else + list_del_init(&our->our_list); + + if (our->our_th->ot_super.th_result == 0) + our->our_th->ot_super.th_result = -EIO; + + if (our->our_version >= ou->ou_rpc_version) + ou->ou_rpc_version = our->our_version + 1; + spin_unlock(&our->our_list_lock); + + CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name, + our); + } + spin_unlock(&ou->ou_lock); + + /* invalidate all of request in the sending list */ + list_for_each_entry_safe(our, tmp, &list, our_list) { + spin_lock(&our->our_list_lock); + list_del_init(&our->our_list); + spin_unlock(&our->our_list_lock); + osp_trans_callback(&env, our->our_th, + our->our_th->ot_super.th_result); + osp_thandle_put(our->our_th); + } + lu_env_fini(&env); } /** @@ -1291,32 +1445,43 @@ int osp_send_update_thread(void *arg) our = NULL; l_wait_event(ou->ou_waitq, !osp_send_update_thread_running(osp) || - osp_get_next_request(ou, &our), - &lwi); + osp_get_next_request(ou, &our), &lwi); if (!osp_send_update_thread_running(osp)) { - if (our != NULL && our->our_th != NULL) { + if (our != NULL) { osp_trans_callback(&env, our->our_th, -EINTR); osp_thandle_put(our->our_th); } break; } - if (our->our_req_sent == 0) { - if (our->our_th != NULL && - our->our_th->ot_super.th_result != 0) - osp_trans_callback(&env, our->our_th, - our->our_th->ot_super.th_result); - else - rc = osp_send_update_req(&env, osp, our); + LASSERT(our->our_th != NULL); + if (our->our_th->ot_super.th_result != 0) { + osp_trans_callback(&env, our->our_th, + our->our_th->ot_super.th_result); + rc = our->our_th->ot_super.th_result; + } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) { + rc = -EIO; + osp_trans_callback(&env, our->our_th, rc); + } else { + rc = osp_send_update_req(&env, osp, our); } - if (our->our_th != NULL) { - /* Update the rpc version */ - osp_update_rpc_version(ou, our->our_th); - /* Balanced for thandle_get in osp_trans_trigger() */ - osp_thandle_put(our->our_th); - } + /* Update the rpc version */ + spin_lock(&ou->ou_lock); + if (our->our_version == ou->ou_rpc_version) + ou->ou_rpc_version++; + spin_unlock(&ou->ou_lock); + + /* If one update request fails, let's fail all of the requests + * in the sending list, because the request in the sending + * list are dependent on either other, continue sending these + * request might cause llog or filesystem corruption */ + if (rc < 0) + osp_invalidate_request(osp); + + /* Balanced for thandle_get in osp_check_and_set_rpc_version */ + osp_thandle_put(our->our_th); } thread->t_flags = SVC_STOPPED; @@ -1327,36 +1492,6 @@ int osp_send_update_thread(void *arg) } /** - * Trigger the request for remote updates. - * - * Add the request to the sending list, and wake up osp update - * sending thread. - * - * \param[in] env pointer to the thread context - * \param[in] osp pointer to the OSP device - * \param[in] oth pointer to the transaction handler - * - */ -static void osp_trans_trigger(const struct lu_env *env, - struct osp_device *osp, - struct osp_thandle *oth) -{ - - CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n", - osp->opd_obd->obd_name, oth, oth->ot_version); - - LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC); - osp_thandle_get(oth); - LASSERT(oth->ot_our != NULL); - spin_lock(&osp->opd_update->ou_lock); - list_add_tail(&oth->ot_our->our_list, - &osp->opd_update->ou_list); - spin_unlock(&osp->opd_update->ou_lock); - - wake_up(&osp->opd_update->ou_waitq); -} - -/** * The OSP layer dt_device_operations::dt_trans_start() interface * to start the transaction. * @@ -1390,7 +1525,7 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt, * to stop the transaction. * * If the transaction is a remote transaction, related remote - * updates will be triggered here via osp_trans_trigger(). + * updates will be triggered at the end of this function. * * For synchronous mode update or any failed update, the request * will be destroyed explicitly when the osp_trans_stop(). @@ -1438,17 +1573,25 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, GOTO(out, rc = -EIO); } - if (th->th_sync) { - /* if th_sync is set, then it needs to be sent - * right away. Note: even thought the RPC has been - * sent, it still needs to be added to the sending - * list (see osp_trans_trigger()), so ou_rpc_version - * can be updated correctly. */ + CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n", + osp->opd_obd->obd_name, oth, our->our_version); + + LASSERT(our->our_req_ready == 0); + spin_lock(&our->our_list_lock); + if (likely(!list_empty(&our->our_list))) { + /* notify sending thread */ + our->our_req_ready = 1; + wake_up(&osp->opd_update->ou_waitq); + spin_unlock(&our->our_list_lock); + } else if (th->th_result == 0) { + /* if the request does not needs to be serialized, + * read-only request etc, let's send it right away */ + spin_unlock(&our->our_list_lock); rc = osp_send_update_req(env, osp, our); - our->our_req_sent = 1; + } else { + spin_unlock(&our->our_list_lock); + osp_trans_callback(env, oth, th->th_result); } - - osp_trans_trigger(env, osp, oth); out: osp_thandle_put(oth);