struct out_update_header *ouh;
struct out_update_buffer *oub;
__u32 buf_count = 0;
+ int page_count = 0;
int repsize = 0;
struct object_update_reply *reply;
int rc, i;
list_for_each_entry(ours, &our->our_req_list, ours_list) {
oub->oub_size = ours->ours_req_size;
oub++;
+ /* First *and* last might be partial pages, hence +1 */
+ page_count += DIV_ROUND_UP(ours->ours_req_size, PAGE_SIZE) + 1;
}
req->rq_bulk_write = 1;
- desc = ptlrpc_prep_bulk_imp(req, buf_count,
+ desc = ptlrpc_prep_bulk_imp(req, page_count,
MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
- PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
- MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+ PTLRPC_BULK_GET_SOURCE,
+ MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_nopin_ops);
if (desc == NULL)
GOTO(out_req, rc = -ENOMEM);
}
if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
- wake_up_all(oaua->oaua_waitq);
+ wake_up(oaua->oaua_waitq);
if (oth != NULL) {
/* oth and osp_update_requests will be destoryed in
}
osp_update_request_destroy(env, our);
} else {
- args = ptlrpc_req_async_args(req);
+ args = ptlrpc_req_async_args(args, req);
args->oaua_update = our;
args->oaua_count = NULL;
args->oaua_waitq = NULL;
struct osp_update_args *args;
struct ptlrpc_request *req;
struct osp_thandle *oth = our->our_th;
+ struct osp_updates *ou = osp->opd_update;
int rc = 0;
ENTRY;
LASSERT(oth != NULL);
+
+ if (ou && ou->ou_generation != our->our_generation) {
+ const struct lnet_process_id *peer =
+ &osp->opd_obd->u.cli.cl_import->imp_connection->c_peer;
+ rc = -ESTALE;
+ osp_trans_callback(env, oth, rc);
+ CDEBUG(D_HA, "%s: stale tx to %s: gen %llu != %llu: rc = %d\n",
+ osp->opd_obd->obd_name, libcfs_nid2str(peer->nid),
+ osp->opd_update->ou_generation, our->our_generation, rc);
+ RETURN(rc);
+ }
+
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
our, &req);
if (rc != 0) {
RETURN(rc);
}
- args = ptlrpc_req_async_args(req);
+ args = ptlrpc_req_async_args(args, req);
args->oaua_update = our;
/* set env to NULL, in case the interrupt cb and current function
* are in different thread */
return;
}
- INIT_LIST_HEAD(&list);
-
spin_lock(&ou->ou_lock);
/* invalidate all of request in the sending list */
list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
spin_lock(&our->our_list_lock);
- if (our->our_req_ready)
+ if (our->our_req_ready) {
list_move(&our->our_list, &list);
- else
+ } else {
+ /* this thandle won't be forwarded to
+ * the dedicated thread, so drop the
+ * reference here */
+ osp_thandle_put(&env, our->our_th);
list_del_init(&our->our_list);
+ }
if (our->our_th->ot_super.th_result == 0)
our->our_th->ot_super.th_result = -EIO;
*/
int osp_send_update_thread(void *arg)
{
- struct lu_env env;
+ struct lu_env *env;
struct osp_device *osp = arg;
- struct l_wait_info lwi = { 0 };
struct osp_updates *ou = osp->opd_update;
- struct ptlrpc_thread *thread = &osp->opd_update_thread;
struct osp_update_request *our = NULL;
int rc;
ENTRY;
LASSERT(ou != NULL);
- rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
- if (rc < 0) {
- CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
- rc);
- RETURN(rc);
- }
+ env = &ou->ou_env;
- thread->t_flags = SVC_RUNNING;
- wake_up(&thread->t_ctl_waitq);
while (1) {
our = NULL;
- l_wait_event(ou->ou_waitq,
- !osp_send_update_thread_running(osp) ||
- osp_get_next_request(ou, &our), &lwi);
+ wait_event_idle(ou->ou_waitq,
+ kthread_should_stop() ||
+ osp_get_next_request(ou, &our));
- if (!osp_send_update_thread_running(osp)) {
+ if (kthread_should_stop()) {
if (our != NULL) {
- osp_trans_callback(&env, our->our_th, -EINTR);
- osp_thandle_put(&env, our->our_th);
+ osp_trans_callback(env, our->our_th, -EINTR);
+ osp_thandle_put(env, our->our_th);
}
break;
}
LASSERT(our->our_th != NULL);
if (our->our_th->ot_super.th_result != 0) {
- osp_trans_callback(&env, our->our_th,
+ osp_trans_callback(env, our->our_th,
our->our_th->ot_super.th_result);
rc = our->our_th->ot_super.th_result;
- } else if (ou->ou_generation != our->our_generation ||
- OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
+ } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
rc = -EIO;
- osp_trans_callback(&env, our->our_th, rc);
+ osp_trans_callback(env, our->our_th, rc);
} else {
- rc = osp_send_update_req(&env, osp, our);
+ rc = osp_send_update_req(env, osp, our);
}
/* Update the rpc version */
osp_invalidate_request(osp);
/* Balanced for thandle_get in osp_check_and_set_rpc_version */
- osp_thandle_put(&env, our->our_th);
+ osp_thandle_put(env, our->our_th);
}
- thread->t_flags = SVC_STOPPED;
- lu_env_fini(&env);
- wake_up(&thread->t_ctl_waitq);
-
RETURN(0);
}
struct thandle *th)
{
struct osp_thandle *oth = thandle_to_osp_thandle(th);
+ struct osp_device *osp = dt2osp_dev(dt);
+ struct osp_updates *ou = osp->opd_update;
+ if (ou) {
+ LASSERT(oth->ot_our);
+ oth->ot_our->our_generation = ou->ou_generation;
+ }
if (oth->ot_super.th_sync)
oth->ot_our->our_flags |= UPDATE_FL_SYNC;
/* For remote thandle, if there are local thandle, start it here*/
GOTO(out, rc);
}
- if (osp->opd_update == NULL ||
- !osp_send_update_thread_running(osp)) {
+ if (osp->opd_update == NULL) {
osp_trans_callback(env, oth, -EIO);
GOTO(out, rc = -EIO);
}