INIT_LIST_HEAD(&our->our_req_list);
INIT_LIST_HEAD(&our->our_cb_items);
INIT_LIST_HEAD(&our->our_list);
+ spin_lock_init(&our->our_list_lock);
osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
return our;
RETURN(rc);
}
-static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
+/**
+ * Invalidate all objects in the osp thandle
+ *
+ * invalidate all of objects in the update request, which will be called
+ * when the transaction is aborted.
+ *
+ * \param[in] oth osp thandle.
+ */
+static void osp_thandle_invalidate_object(const struct lu_env *env,
+ struct osp_thandle *oth)
+{
+ struct osp_update_request *our = oth->ot_our;
+ struct osp_update_request_sub *ours;
+
+ if (our == NULL)
+ return;
+
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ struct object_update_request *our_req = ours->ours_req;
+ unsigned int i;
+ struct lu_object *obj;
+ struct osp_object *osp_obj;
+
+ for (i = 0; i < our_req->ourq_count; i++) {
+ struct object_update *update;
+
+ update = object_update_request_get(our_req, i, NULL);
+ if (update == NULL)
+ break;
+
+ if (update->ou_type != OUT_WRITE)
+ continue;
+
+ if (!fid_is_sane(&update->ou_fid))
+ continue;
+
+ obj = lu_object_find_slice(env,
+ &oth->ot_super.th_dev->dd_lu_dev,
+ &update->ou_fid, NULL);
+ if (IS_ERR(obj))
+ break;
+
+ osp_obj = lu2osp_obj(obj);
+ if (osp_obj->opo_ooa != NULL) {
+ spin_lock(&osp_obj->opo_lock);
+ osp_obj->opo_ooa->ooa_attr.la_valid = 0;
+ osp_obj->opo_stale = 1;
+ spin_unlock(&osp_obj->opo_lock);
+ }
+ lu_object_put(env, obj);
+ }
+ }
+}
+
+static void osp_trans_stop_cb(const struct lu_env *env,
+ struct osp_thandle *oth, int result)
{
struct dt_txn_commit_cb *dcb;
struct dt_txn_commit_cb *tmp;
list_del_init(&dcb->dcb_linkage);
dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
}
+
+ if (result < 0)
+ osp_thandle_invalidate_object(env, oth);
}
/**
reply = req_capsule_server_sized_get(&req->rq_pill,
&RMF_OUT_UPDATE_REPLY,
OUT_UPDATE_REPLY_SIZE);
- if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
- rc1 = -EPROTO;
- else
+ if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+ if (rc == 0)
+ rc = -EPROTO;
+ } else {
count = reply->ourp_count;
- } else {
- rc1 = rc;
+ }
}
list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
/* The peer may only have handled some requests (indicated
* by the 'count') in the packaged OUT RPC, we can only get
* results for the handled part. */
- if (index < count && reply->ourp_lens[index] > 0) {
+ if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) {
struct object_update_result *result;
result = object_update_result_get(reply, index, NULL);
if (result == NULL)
- rc1 = -EPROTO;
+ rc1 = rc = -EPROTO;
else
- rc1 = result->our_rc;
- } else {
- rc1 = rc;
- if (unlikely(rc1 == 0))
+ rc1 = rc = result->our_rc;
+ } else if (rc1 >= 0) {
+ /* The peer did not handle these request, let's return
+ * -EINVAL to update interpret for now */
+ if (rc >= 0)
rc1 = -EINVAL;
+ else
+ rc1 = rc;
}
if (ouc->ouc_interpreter != NULL)
if (oth != NULL) {
/* oth and osp_update_requests will be destoryed in
* osp_thandle_put */
- osp_trans_stop_cb(oth, rc);
+ osp_trans_stop_cb(env, oth, rc);
osp_thandle_put(oth);
} else {
osp_update_request_destroy(our);
}
- RETURN(0);
+ RETURN(rc);
}
/**
osp_update_callback_fini(env, ouc);
}
}
- osp_trans_stop_cb(oth, rc);
+ osp_trans_stop_cb(env, oth, rc);
osp_trans_commit_cb(oth, rc);
}
ENTRY;
LASSERT(oth != NULL);
- LASSERT(our->our_req_sent == 0);
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
our, &req);
if (rc != 0) {
/**
* Set version for the transaction
*
- * Set the version for the transaction, then the osp RPC will be
- * sent in the order of version, i.e. the transaction with lower
- * version will be sent first.
+ * Set the version for the transaction and add the request to
+ * the sending list, then after transaction stop, the request
+ * will be picked in the order of version, by sending thread.
*
* \param [in] oth osp thandle to be set version.
*
* \retval 0 if set version succeeds
* negative errno if set version fails.
*/
-int osp_check_and_set_rpc_version(struct osp_thandle *oth)
+int osp_check_and_set_rpc_version(struct osp_thandle *oth,
+ struct osp_object *obj)
{
struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
struct osp_updates *ou = osp->opd_update;
if (ou == NULL)
return -EIO;
- if (oth->ot_version != 0)
+ if (oth->ot_our->our_version != 0)
return 0;
spin_lock(&ou->ou_lock);
- oth->ot_version = ou->ou_version++;
+ spin_lock(&oth->ot_our->our_list_lock);
+ if (obj->opo_stale) {
+ spin_unlock(&oth->ot_our->our_list_lock);
+ spin_unlock(&ou->ou_lock);
+ return -ESTALE;
+ }
+
+ /* Assign the version and add it to the sending list */
+ osp_thandle_get(oth);
+ oth->ot_our->our_version = ou->ou_version++;
+ list_add_tail(&oth->ot_our->our_list,
+ &osp->opd_update->ou_list);
+ oth->ot_our->our_req_ready = 0;
+ spin_unlock(&oth->ot_our->our_list_lock);
spin_unlock(&ou->ou_lock);
+ LASSERT(oth->ot_super.th_wait_submit == 1);
CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
- osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version);
+ osp->opd_obd->obd_name, ou->ou_version, oth,
+ oth->ot_our->our_version);
return 0;
}
spin_lock(&ou->ou_lock);
list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
LASSERT(our->our_th != NULL);
- CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
- our, our->our_th->ot_version, ou->ou_rpc_version);
- if (our->our_th->ot_version == 0) {
- list_del_init(&our->our_list);
- *ourp = our;
- got_req = true;
- break;
- }
-
+ CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n",
+ ou, our->our_version, ou->ou_rpc_version);
+ spin_lock(&our->our_list_lock);
/* Find next osp_update_request in the list */
- if (our->our_th->ot_version == ou->ou_rpc_version) {
+ if (our->our_version == ou->ou_rpc_version &&
+ our->our_req_ready) {
list_del_init(&our->our_list);
+ spin_unlock(&our->our_list_lock);
*ourp = our;
got_req = true;
break;
}
+ spin_unlock(&our->our_list_lock);
}
spin_unlock(&ou->ou_lock);
return got_req;
}
-static void osp_update_rpc_version(struct osp_updates *ou,
- struct osp_thandle *oth)
+/**
+ * Invalidate update request
+ *
+ * Invalidate update request in the OSP sending list, so all of
+ * requests in the sending list will return error, which happens
+ * when it finds one update (with writing llog) requests fails or
+ * the OSP is evicted by remote target. see osp_send_update_thread().
+ *
+ * \param[in] osp OSP device whose update requests will be
+ * invalidated.
+ **/
+void osp_invalidate_request(struct osp_device *osp)
{
- if (oth->ot_version == 0)
+ struct lu_env env;
+ struct osp_updates *ou = osp->opd_update;
+ struct osp_update_request *our;
+ struct osp_update_request *tmp;
+ LIST_HEAD(list);
+ int rc;
+ ENTRY;
+
+ if (ou == NULL)
+ return;
+
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc < 0) {
+ CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+ rc);
return;
+ }
+
+ INIT_LIST_HEAD(&list);
- LASSERT(oth->ot_version == ou->ou_rpc_version);
spin_lock(&ou->ou_lock);
- ou->ou_rpc_version++;
+ /* invalidate all of request in the sending list */
+ list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
+ spin_lock(&our->our_list_lock);
+ if (our->our_req_ready)
+ list_move(&our->our_list, &list);
+ else
+ list_del_init(&our->our_list);
+
+ if (our->our_th->ot_super.th_result == 0)
+ our->our_th->ot_super.th_result = -EIO;
+
+ if (our->our_version >= ou->ou_rpc_version)
+ ou->ou_rpc_version = our->our_version + 1;
+ spin_unlock(&our->our_list_lock);
+
+ CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
+ our);
+ }
+
spin_unlock(&ou->ou_lock);
+
+ /* invalidate all of request in the sending list */
+ list_for_each_entry_safe(our, tmp, &list, our_list) {
+ spin_lock(&our->our_list_lock);
+ list_del_init(&our->our_list);
+ spin_unlock(&our->our_list_lock);
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ osp_thandle_put(our->our_th);
+ }
+ lu_env_fini(&env);
}
/**
our = NULL;
l_wait_event(ou->ou_waitq,
!osp_send_update_thread_running(osp) ||
- osp_get_next_request(ou, &our),
- &lwi);
+ osp_get_next_request(ou, &our), &lwi);
if (!osp_send_update_thread_running(osp)) {
- if (our != NULL && our->our_th != NULL) {
+ if (our != NULL) {
osp_trans_callback(&env, our->our_th, -EINTR);
osp_thandle_put(our->our_th);
}
break;
}
- if (our->our_req_sent == 0) {
- if (our->our_th != NULL &&
- our->our_th->ot_super.th_result != 0)
- osp_trans_callback(&env, our->our_th,
- our->our_th->ot_super.th_result);
- else
- rc = osp_send_update_req(&env, osp, our);
+ LASSERT(our->our_th != NULL);
+ if (our->our_th->ot_super.th_result != 0) {
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ rc = our->our_th->ot_super.th_result;
+ } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
+ rc = -EIO;
+ osp_trans_callback(&env, our->our_th, rc);
+ } else {
+ rc = osp_send_update_req(&env, osp, our);
}
- if (our->our_th != NULL) {
- /* Update the rpc version */
- osp_update_rpc_version(ou, our->our_th);
- /* Balanced for thandle_get in osp_trans_trigger() */
- osp_thandle_put(our->our_th);
- }
+ /* Update the rpc version */
+ spin_lock(&ou->ou_lock);
+ if (our->our_version == ou->ou_rpc_version)
+ ou->ou_rpc_version++;
+ spin_unlock(&ou->ou_lock);
+
+ /* If one update request fails, let's fail all of the requests
+ * in the sending list, because the request in the sending
+ * list are dependent on either other, continue sending these
+ * request might cause llog or filesystem corruption */
+ if (rc < 0)
+ osp_invalidate_request(osp);
+
+ /* Balanced for thandle_get in osp_check_and_set_rpc_version */
+ osp_thandle_put(our->our_th);
}
thread->t_flags = SVC_STOPPED;
}
/**
- * Trigger the request for remote updates.
- *
- * Add the request to the sending list, and wake up osp update
- * sending thread.
- *
- * \param[in] env pointer to the thread context
- * \param[in] osp pointer to the OSP device
- * \param[in] oth pointer to the transaction handler
- *
- */
-static void osp_trans_trigger(const struct lu_env *env,
- struct osp_device *osp,
- struct osp_thandle *oth)
-{
-
- CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
- osp->opd_obd->obd_name, oth, oth->ot_version);
-
- LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
- osp_thandle_get(oth);
- LASSERT(oth->ot_our != NULL);
- spin_lock(&osp->opd_update->ou_lock);
- list_add_tail(&oth->ot_our->our_list,
- &osp->opd_update->ou_list);
- spin_unlock(&osp->opd_update->ou_lock);
-
- wake_up(&osp->opd_update->ou_waitq);
-}
-
-/**
* The OSP layer dt_device_operations::dt_trans_start() interface
* to start the transaction.
*
* to stop the transaction.
*
* If the transaction is a remote transaction, related remote
- * updates will be triggered here via osp_trans_trigger().
+ * updates will be triggered at the end of this function.
*
* For synchronous mode update or any failed update, the request
* will be destroyed explicitly when the osp_trans_stop().
GOTO(out, rc = -EIO);
}
- if (th->th_sync) {
- /* if th_sync is set, then it needs to be sent
- * right away. Note: even thought the RPC has been
- * sent, it still needs to be added to the sending
- * list (see osp_trans_trigger()), so ou_rpc_version
- * can be updated correctly. */
+ CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n",
+ osp->opd_obd->obd_name, oth, our->our_version);
+
+ LASSERT(our->our_req_ready == 0);
+ spin_lock(&our->our_list_lock);
+ if (likely(!list_empty(&our->our_list))) {
+ /* notify sending thread */
+ our->our_req_ready = 1;
+ wake_up(&osp->opd_update->ou_waitq);
+ spin_unlock(&our->our_list_lock);
+ } else if (th->th_result == 0) {
+ /* if the request does not needs to be serialized,
+ * read-only request etc, let's send it right away */
+ spin_unlock(&our->our_list_lock);
rc = osp_send_update_req(env, osp, our);
- our->our_req_sent = 1;
+ } else {
+ spin_unlock(&our->our_list_lock);
+ osp_trans_callback(env, oth, th->th_result);
}
-
- osp_trans_trigger(env, osp, oth);
out:
osp_thandle_put(oth);