RETURN(-ENOTCONN);
id = d->opd_syn_last_used_id;
+ down_write(&d->opd_async_updates_rwsem);
+
+ CDEBUG(D_OTHER, "%s: async updates %d\n", d->opd_obd->obd_name,
+ atomic_read(&d->opd_async_updates_count));
+
+ /* make sure the connection is fine */
+ expire = cfs_time_shift(obd_timeout);
+ lwi = LWI_TIMEOUT(expire - cfs_time_current(), osp_sync_timeout, d);
+ rc = l_wait_event(d->opd_syn_barrier_waitq,
+ atomic_read(&d->opd_async_updates_count) == 0,
+ &lwi);
+ up_write(&d->opd_async_updates_rwsem);
+ if (rc != 0)
+ GOTO(out, rc);
CDEBUG(D_OTHER, "%s: id: used %lu, processed %lu\n",
d->opd_obd->obd_name, id, d->opd_syn_last_processed_id);
ENTRY;
mutex_init(&osp->opd_async_requests_mutex);
+ INIT_LIST_HEAD(&osp->opd_async_updates);
+ init_rwsem(&osp->opd_async_updates_rwsem);
+ atomic_set(&osp->opd_async_updates_count, 0);
obd = class_name2obd(lustre_cfg_string(cfg, 0));
if (obd == NULL) {
struct dt_update_request *opd_async_requests;
/* Protect current operations on opd_async_requests. */
struct mutex opd_async_requests_mutex;
+ struct list_head opd_async_updates;
+ struct rw_semaphore opd_async_updates_rwsem;
+ atomic_t opd_async_updates_count;
};
#define opd_pre_lock opd_pre->osp_pre_lock
struct osp_async_update_args {
struct dt_update_request *oaua_update;
+ atomic_t *oaua_count;
+ wait_queue_head_t *oaua_waitq;
bool oaua_flow_control;
};
index++;
}
+ if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
+ wake_up_all(oaua->oaua_waitq);
+
dt_update_request_destroy(dt_update);
return 0;
args = ptlrpc_req_async_args(req);
args->oaua_update = update;
+ args->oaua_count = NULL;
+ args->oaua_waitq = NULL;
+ args->oaua_flow_control = false;
req->rq_interpret_reply = osp_async_update_interpret;
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
}
rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
dt_update->dur_buf.ub_req, &req);
if (rc == 0) {
+ down_read(&osp->opd_async_updates_rwsem);
+
args = ptlrpc_req_async_args(req);
args->oaua_update = dt_update;
+ args->oaua_count = &osp->opd_async_updates_count;
+ args->oaua_waitq = &osp->opd_syn_barrier_waitq;
args->oaua_flow_control = flow_control;
req->rq_interpret_reply =
osp_async_update_interpret;
+
+ atomic_inc(args->oaua_count);
+ up_read(&osp->opd_async_updates_rwsem);
+
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
} else {
dt_update_request_destroy(dt_update);