+ * Set version for the transaction
+ *
+ * Set the version for the transaction and add the request to
+ * the sending list, then after transaction stop, the request
+ * will be sent in the order of version by the sending thread.
+ *
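+ * \param [in] oth osp thandle whose update request gets the version.
+ * \param [in] obj osp object; the call fails if it is marked stale.
+ *
+ * \retval 0 if the version is set, or was already set
+ * \retval -EIO if the OSP has no update structure, -ESTALE if
+ * \a obj is stale.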
+ */
+int osp_check_and_set_rpc_version(struct osp_thandle *oth,
+ struct osp_object *obj)
+{
+ struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
+ struct osp_updates *ou = osp->opd_update;
+
+ /* Without an update structure there is no sending list to join. */
+ if (ou == NULL)
+ return -EIO;
+
+ /* A non-zero version is treated as "already versioned": succeed
+ * without touching the list. This check is made before any lock
+ * is taken. */
+ if (oth->ot_our->our_version != 0)
+ return 0;
+
+ /* Lock order: ou_lock first, then the request's our_list_lock. */
+ spin_lock(&ou->ou_lock);
+ spin_lock(&oth->ot_our->our_list_lock);
+ /* A stale object is refused; both locks are dropped in reverse
+ * order and no version is consumed. */
+ if (obj->opo_stale) {
+ spin_unlock(&oth->ot_our->our_list_lock);
+ spin_unlock(&ou->ou_lock);
+ return -ESTALE;
+ }
+
+ /* Assign the version and add it to the sending list */
+ /* The reference taken here is dropped by osp_send_update_thread()
+ * once it has handled the request. */
+ osp_thandle_get(oth);
+ /* Post-increment: this request gets the current ou_version and the
+ * counter moves on to the next one. */
+ oth->ot_our->our_version = ou->ou_version++;
+ oth->ot_our->our_generation = ou->ou_generation;
+ list_add_tail(&oth->ot_our->our_list,
+ &osp->opd_update->ou_list);
+ /* Queued but not yet ready: osp_get_next_request() only dequeues
+ * requests whose our_req_ready is set. */
+ oth->ot_our->our_req_ready = 0;
+ spin_unlock(&oth->ot_our->our_list_lock);
+ spin_unlock(&ou->ou_lock);
+
+ LASSERT(oth->ot_super.th_wait_submit == 1);
+ /* ou->ou_version and ou->ou_generation are read here after ou_lock
+ * was released; ou_version is the already-incremented counter, the
+ * last value printed is the version given to this request. */
+ CDEBUG(D_INFO, "%s: version %llu gen %llu oth:version %p:%llu\n",
+ osp->opd_obd->obd_name, ou->ou_version, ou->ou_generation, oth,
+ oth->ot_our->our_version);
+
+ return 0;
+}
+
+/**
+ * Get next OSP update request in the sending list
+ *
+ * Walk the sending list under ou_lock and dequeue the first request that
+ * is marked ready (our_req_ready set) and whose version is equal to
+ * ou_rpc_version.
+ *
+ * \param [in] ou	osp update structure.
+ * \param [out] ourp	receives the dequeued update request; it is left
+ *			untouched when no request is returned.
+ *
+ * \retval		true if a request was dequeued into \a ourp.
+ * \retval		false if no request in the list qualifies.
+ */
+static bool
+osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
+{
+	struct osp_update_request *req;
+	struct osp_update_request *next;
+	bool found = false;
+
+	spin_lock(&ou->ou_lock);
+	list_for_each_entry_safe(req, next, &ou->ou_list, our_list) {
+		LASSERT(req->our_th != NULL);
+		CDEBUG(D_HA, "ou %p version %llu rpc_version %llu\n",
+		       ou, req->our_version, ou->ou_rpc_version);
+
+		spin_lock(&req->our_list_lock);
+		if (req->our_version != ou->ou_rpc_version ||
+		    !req->our_req_ready) {
+			/* Not the request to send next, keep looking */
+			spin_unlock(&req->our_list_lock);
+			continue;
+		}
+
+		/* This is the one: unlink it and hand it to the caller */
+		list_del_init(&req->our_list);
+		spin_unlock(&req->our_list_lock);
+		*ourp = req;
+		found = true;
+		break;
+	}
+	spin_unlock(&ou->ou_lock);
+
+	return found;
+}
+
+/**
+ * Invalidate update request
+ *
+ * Invalidate the update requests in the OSP sending list, so that all of
+ * the requests in the sending list return an error. This happens when one
+ * update request (with writing llog) fails or the OSP is evicted by the
+ * remote target, see osp_send_update_thread().
+ *
+ * Every request is unlinked from the sending list, gets th_result set to
+ * -EIO unless it already carries an error, and ou_rpc_version is moved
+ * past its version. Requests that were marked ready additionally get
+ * their transaction callback run and their thandle reference dropped.
+ *
+ * \param[in] osp OSP device whose update requests will be
+ * invalidated.
+ */
+void osp_invalidate_request(struct osp_device *osp)
+{
+ struct lu_env env;
+ struct osp_updates *ou = osp->opd_update;
+ struct osp_update_request *our;
+ struct osp_update_request *tmp;
+ /* private list collecting the ready requests to complete later */
+ LIST_HEAD(list);
+ int rc;
+ /* NOTE(review): no EXIT/RETURN is paired with this ENTRY on any
+ * return path of this function - confirm whether that is intended. */
+ ENTRY;
+
+ if (ou == NULL)
+ return;
+
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc < 0) {
+ CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+ rc);
+
+ /* Without an env the callbacks below cannot be run, so only
+ * bump the generation and leave the requests on the list;
+ * osp_send_update_thread() fails requests whose generation
+ * no longer matches with -EIO. */
+ spin_lock(&ou->ou_lock);
+ ou->ou_generation++;
+ spin_unlock(&ou->ou_lock);
+
+ return;
+ }
+
+ spin_lock(&ou->ou_lock);
+ /* invalidate all of request in the sending list */
+ list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
+ spin_lock(&our->our_list_lock);
+ /* Ready requests are moved to the private list so they can be
+ * completed after ou_lock is released; requests that are not
+ * ready are only unlinked here. */
+ if (our->our_req_ready)
+ list_move(&our->our_list, &list);
+ else
+ list_del_init(&our->our_list);
+
+ /* Keep an error that was already recorded, otherwise -EIO. */
+ if (our->our_th->ot_super.th_result == 0)
+ our->our_th->ot_super.th_result = -EIO;
+
+ /* Move the rpc version past every invalidated request. */
+ if (our->our_version >= ou->ou_rpc_version)
+ ou->ou_rpc_version = our->our_version + 1;
+ spin_unlock(&our->our_list_lock);
+
+ CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
+ our);
+ }
+
+ /* Increase the generation, then the update request with old generation
+ * will fail with -EIO. */
+ ou->ou_generation++;
+ spin_unlock(&ou->ou_lock);
+
+ /* Complete the ready requests collected above, now that ou_lock is
+ * no longer held: run the transaction callback with the recorded
+ * result and drop the thandle reference. */
+ list_for_each_entry_safe(our, tmp, &list, our_list) {
+ spin_lock(&our->our_list_lock);
+ list_del_init(&our->our_list);
+ spin_unlock(&our->our_list_lock);
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ osp_thandle_put(&env, our->our_th);
+ }
+ lu_env_fini(&env);
+}
+
+/**
+ * Sending update thread
+ *
+ * Main function of the thread that sends update requests to other MDTs.
+ * The thread pulls update requests out of the list in OSP by version
+ * number, i.e. it makes sure the update request with the lower version
+ * number is sent first. It loops until
+ * osp_send_update_thread_running() reports false.
+ *
+ * \param[in] arg hold the OSP device.
+ *
+ * \retval 0 after the thread has been stopped.
+ * \retval negative error if the environment cannot be
+ * initialized.
+ */
+int osp_send_update_thread(void *arg)
+{
+ struct lu_env env;
+ struct osp_device *osp = arg;
+ struct osp_updates *ou = osp->opd_update;
+ struct ptlrpc_thread *thread = &osp->opd_update_thread;
+ struct osp_update_request *our = NULL;
+ int rc;
+ ENTRY;
+
+ LASSERT(ou != NULL);
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc < 0) {
+ CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+ rc);
+ /* NOTE(review): this path returns without setting t_flags or
+ * waking t_ctl_waitq - confirm that nobody waits on it for
+ * the thread to start. */
+ RETURN(rc);
+ }
+
+ /* Announce that the thread is running to waiters on t_ctl_waitq. */
+ thread->t_flags = SVC_RUNNING;
+ wake_up(&thread->t_ctl_waitq);
+ while (1) {
+ our = NULL;
+ /* Sleep until the thread is asked to stop or
+ * osp_get_next_request() has dequeued a request into our. */
+ wait_event_idle(ou->ou_waitq,
+ !osp_send_update_thread_running(osp) ||
+ osp_get_next_request(ou, &our));
+
+ if (!osp_send_update_thread_running(osp)) {
+ /* A request may have been dequeued before the stop was
+ * noticed: complete it with -EINTR and drop the
+ * reference taken when it was queued. */
+ if (our != NULL) {
+ osp_trans_callback(&env, our->our_th, -EINTR);
+ osp_thandle_put(&env, our->our_th);
+ }
+ break;
+ }
+
+ LASSERT(our->our_th != NULL);
+ if (our->our_th->ot_super.th_result != 0) {
+ /* The transaction already failed: report that result
+ * instead of sending. */
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ rc = our->our_th->ot_super.th_result;
+ } else if (ou->ou_generation != our->our_generation ||
+ OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
+ /* The request was queued under an older generation (or
+ * the fail point is set): fail it with -EIO. */
+ rc = -EIO;
+ osp_trans_callback(&env, our->our_th, rc);
+ } else {
+ rc = osp_send_update_req(&env, osp, our);
+ }
+
+ /* Update the rpc version */
+ /* Only advance when this request is still the current one;
+ * osp_invalidate_request() may have moved ou_rpc_version
+ * already. */
+ spin_lock(&ou->ou_lock);
+ if (our->our_version == ou->ou_rpc_version)
+ ou->ou_rpc_version++;
+ spin_unlock(&ou->ou_lock);
+
+ /* If one update request fails, let's fail all of the requests
+ * in the sending list, because the requests in the sending
+ * list are dependent on each other, continuing to send these
+ * requests might cause llog or filesystem corruption */
+ if (rc < 0)
+ osp_invalidate_request(osp);
+
+ /* Balanced for thandle_get in osp_check_and_set_rpc_version */
+ osp_thandle_put(&env, our->our_th);
+ }
+
+ /* Mark the thread stopped and wake waiters on t_ctl_waitq. */
+ thread->t_flags = SVC_STOPPED;
+ lu_env_fini(&env);
+ wake_up(&thread->t_ctl_waitq);
+
+ RETURN(0);
+}
+
+/**