* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
struct obd_uuid server_uuid;
int rq_portal, rp_portal, connect_op;
char *name = obddev->obd_type->typ_name;
+ ldlm_ns_type_t ns_type = LDLM_NS_TYPE_UNKNOWN;
int rc;
ENTRY;
connect_op = OST_CONNECT;
cli->cl_sp_me = LUSTRE_SP_CLI;
cli->cl_sp_to = LUSTRE_SP_OST;
+ ns_type = LDLM_NS_TYPE_OSC;
+
} else if (!strcmp(name, LUSTRE_MDC_NAME)) {
rq_portal = MDS_REQUEST_PORTAL;
rp_portal = MDC_REPLY_PORTAL;
connect_op = MDS_CONNECT;
cli->cl_sp_me = LUSTRE_SP_CLI;
cli->cl_sp_to = LUSTRE_SP_MDT;
+ ns_type = LDLM_NS_TYPE_MDC;
+
} else if (!strcmp(name, LUSTRE_MGC_NAME)) {
rq_portal = MGS_REQUEST_PORTAL;
rp_portal = MGC_REPLY_PORTAL;
cli->cl_sp_me = LUSTRE_SP_MGC;
cli->cl_sp_to = LUSTRE_SP_MGS;
cli->cl_flvr_mgc.sf_rpc = SPTLRPC_FLVR_INVALID;
+ ns_type = LDLM_NS_TYPE_MGC;
+
} else {
CERROR("unknown client OBD type \"%s\", can't setup\n",
name);
obddev->obd_namespace = ldlm_namespace_new(obddev, obddev->obd_name,
LDLM_NAMESPACE_CLIENT,
- LDLM_NAMESPACE_GREEDY);
+ LDLM_NAMESPACE_GREEDY,
+ ns_type);
if (obddev->obd_namespace == NULL) {
CERROR("Unable to create client namespace - %s\n",
obddev->obd_name);
int rc = 0;
int mds_conn = 0;
struct obd_connect_data *data, *tmpdata;
+ int size, tmpsize;
lnet_nid_t *client_nid = NULL;
ENTRY;
conn = *tmp;
+ size = req_capsule_get_size(&req->rq_pill, &RMF_CONNECT_DATA,
+ RCL_CLIENT);
data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
if (!data)
GOTO(out, rc = -EPROTO);
export, (long)cfs_time_current_sec(),
export ? (long)export->exp_last_request_time : 0);
- /* Tell the client if we're in recovery. */
- if (target->obd_recovering) {
- lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
- /* If this is the first time a client connects,
- reset the recovery timer */
- if (rc == 0)
- target_start_and_reset_recovery_timer(target, req,
- !export);
- }
+ /* If this is the first time a client connects,
+ * reset the recovery timer */
+ if (rc == 0 && target->obd_recovering)
+ target_start_and_reset_recovery_timer(target, req, !export);
/* We want to handle EALREADY but *not* -EALREADY from
* target_handle_reconnect(), return reconnection state in a flag */
/* Return only the parts of obd_connect_data that we understand, so the
* client knows that we don't understand the rest. */
if (data) {
- tmpdata = req_capsule_server_get(&req->rq_pill,
- &RMF_CONNECT_DATA);
- //data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
- *tmpdata = *data;
+ tmpsize = req_capsule_get_size(&req->rq_pill, &RMF_CONNECT_DATA,
+ RCL_SERVER);
+ tmpdata = req_capsule_server_get(&req->rq_pill,
+ &RMF_CONNECT_DATA);
+ /* Don't use struct assignment here, because the client reply
+ * buffer may be smaller/larger than the local struct
+ * obd_connect_data. */
+ memcpy(tmpdata, data, min(tmpsize, size));
}
/* If all else goes well, this is our RPC return code. */
cfs_waitq_signal(&target->obd_next_transno_waitq);
}
cfs_spin_unlock(&target->obd_recovery_task_lock);
+
+ /* Tell the client we're in recovery, when client is involved in it. */
+ if (target->obd_recovering)
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
+
tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
conn = *tmp;
if (exp->exp_imp_reverse != NULL)
client_destroy_import(exp->exp_imp_reverse);
- LASSERT(cfs_atomic_read(&exp->exp_locks_count) == 0);
- LASSERT(cfs_atomic_read(&exp->exp_rpc_count) == 0);
- LASSERT(cfs_atomic_read(&exp->exp_cb_count) == 0);
- LASSERT(cfs_atomic_read(&exp->exp_replay_count) == 0);
+ LASSERT_ATOMIC_ZERO(&exp->exp_locks_count);
+ LASSERT_ATOMIC_ZERO(&exp->exp_rpc_count);
+ LASSERT_ATOMIC_ZERO(&exp->exp_cb_count);
+ LASSERT_ATOMIC_ZERO(&exp->exp_replay_count);
}
/*
class_export_rpc_get(req->rq_export);
LASSERT(cfs_list_empty(&req->rq_list));
CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+
/* increase refcount to keep request in queue */
- LASSERT(cfs_atomic_read(&req->rq_refcount));
cfs_atomic_inc(&req->rq_refcount);
/** let export know it has replays to be handled */
cfs_atomic_inc(&req->rq_export->exp_replay_count);
- /* release service thread while request is queued
- * we are moving the request from active processing
- * to waiting on the replay queue */
- ptlrpc_server_active_request_dec(req);
}
static void target_request_copy_put(struct ptlrpc_request *req)
{
LASSERT(cfs_list_empty(&req->rq_replay_list));
- LASSERT(cfs_atomic_read(&req->rq_export->exp_replay_count) > 0);
+ LASSERT_ATOMIC_POS(&req->rq_export->exp_replay_count);
+
cfs_atomic_dec(&req->rq_export->exp_replay_count);
class_export_rpc_put(req->rq_export);
+ /* Release the extra rq_refcount taken when the request was queued
+ * for replay; this may be the last reference and free the request. */
ptlrpc_server_drop_request(req);
}
cfs_init_completion(&trd->trd_finishing);
trd->trd_recovery_handler = handler;
- if (cfs_kernel_thread(target_recovery_thread, lut, 0) > 0) {
+ if (cfs_create_thread(target_recovery_thread, lut, 0) > 0) {
cfs_wait_for_completion(&trd->trd_starting);
LASSERT(obd->obd_recovering != 0);
} else
if (exp->exp_req_replay_needed) {
exp->exp_req_replay_needed = 0;
cfs_spin_unlock(&exp->exp_lock);
- LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
+
+ LASSERT_ATOMIC_POS(&obd->obd_req_replay_clients);
cfs_atomic_dec(&obd->obd_req_replay_clients);
} else {
cfs_spin_unlock(&exp->exp_lock);
if (exp->exp_lock_replay_needed) {
exp->exp_lock_replay_needed = 0;
cfs_spin_unlock(&exp->exp_lock);
- LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
+
+ LASSERT_ATOMIC_POS(&obd->obd_lock_replay_clients);
cfs_atomic_dec(&obd->obd_lock_replay_clients);
} else {
cfs_spin_unlock(&exp->exp_lock);
cfs_spin_unlock(&exp->exp_locks_list_guard);
}
#endif
+
+/* Timeout callback installed by target_bulk_io()'s l_wait_event() below;
+ * 'data' (the bulk descriptor) is unused.  NOTE(review): returning 1
+ * appears to let l_wait_event() complete with -ETIMEDOUT so the caller
+ * can extend the deadline or abort the bulk itself -- confirm against
+ * the l_wait_info/LWI_TIMEOUT_INTERVAL semantics. */
+static int target_bulk_timeout(void *data)
+{
+ ENTRY;
+ /* We don't fail the connection here, because having the export
+ * killed makes the (vital) call to commitrw very sad.
+ */
+ RETURN(1);
+}
+
+/* Log-friendly name for a bulk transfer's direction: "GET" for
+ * BULK_GET_SINK descriptors, "PUT" for anything else. */
+static inline char *bulk2type(struct ptlrpc_bulk_desc *desc)
+{
+ return desc->bd_type == BULK_GET_SINK ? "GET" : "PUT";
+}
+
+/**
+ * Drive a server-side bulk transfer to completion and map the outcome
+ * to an error code.
+ *
+ * \param exp  export of the client the bulk is exchanged with
+ * \param desc bulk descriptor to transfer; desc->bd_req must be set
+ * \param lwi  caller-supplied scratch l_wait_info, overwritten for each
+ *             internal wait
+ *
+ * \retval 0           transfer completed successfully
+ * \retval -ENOTCONN   client was evicted before or during the transfer
+ * \retval -ETIMEDOUT  deadline expired, client reconnected mid-transfer,
+ *                     or the transfer was truncated / hit a network error
+ */
+int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
+ struct l_wait_info *lwi)
+{
+ struct ptlrpc_request *req = desc->bd_req;
+ int rc = 0;
+ ENTRY;
+
+ /* Check if there is eviction in progress, and if so, wait for
+ * it to finish */
+ if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+ *lwi = LWI_INTR(NULL, NULL);
+ rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+ !cfs_atomic_read(&exp->exp_obd->
+ obd_evict_inprogress),
+ lwi);
+ }
+
+ /* Check if client was evicted or tried to reconnect already */
+ if (exp->exp_failed || exp->exp_abort_active_req) {
+ rc = -ENOTCONN;
+ } else {
+ /* For a PUT the server is the source, so wrap (e.g. encrypt/
+ * checksum) the outgoing pages before starting the transfer. */
+ if (desc->bd_type == BULK_PUT_SINK)
+ rc = sptlrpc_svc_wrap_bulk(req, desc);
+ if (rc == 0)
+ rc = ptlrpc_start_bulk_transfer(desc);
+ }
+
+ /* Fault-injection point: simulate a send failure by aborting the
+ * bulk that was just started. */
+ if (rc == 0 && OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+ ptlrpc_abort_bulk(desc);
+ } else if (rc == 0) {
+ time_t start = cfs_time_current_sec();
+ do {
+ long timeoutl = req->rq_deadline - cfs_time_current_sec();
+ cfs_duration_t timeout = timeoutl <= 0 ?
+ CFS_TICK : cfs_time_seconds(timeoutl);
+ *lwi = LWI_TIMEOUT_INTERVAL(timeout,
+ cfs_time_seconds(1),
+ target_bulk_timeout,
+ desc);
+ rc = l_wait_event(desc->bd_waitq,
+ !ptlrpc_server_bulk_active(desc) ||
+ exp->exp_failed ||
+ exp->exp_abort_active_req,
+ lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ /* Wait again if we changed deadline */
+ } while ((rc == -ETIMEDOUT) &&
+ (req->rq_deadline > cfs_time_current_sec()));
+
+ /* Classify how the wait ended; every failure path aborts the
+ * in-flight bulk before returning. */
+ if (rc == -ETIMEDOUT) {
+ DEBUG_REQ(D_ERROR, req,
+ "timeout on bulk %s after %ld%+lds",
+ bulk2type(desc),
+ req->rq_deadline - start,
+ cfs_time_current_sec() -
+ req->rq_deadline);
+ ptlrpc_abort_bulk(desc);
+ } else if (exp->exp_failed) {
+ DEBUG_REQ(D_ERROR, req, "Eviction on bulk %s",
+ bulk2type(desc));
+ rc = -ENOTCONN;
+ ptlrpc_abort_bulk(desc);
+ } else if (exp->exp_abort_active_req) {
+ DEBUG_REQ(D_ERROR, req, "Reconnect on bulk %s",
+ bulk2type(desc));
+ /* we don't reply anyway */
+ rc = -ETIMEDOUT;
+ ptlrpc_abort_bulk(desc);
+ } else if (!desc->bd_success ||
+ desc->bd_nob_transferred != desc->bd_nob) {
+ DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)",
+ desc->bd_success ?
+ "truncated" : "network error on",
+ bulk2type(desc),
+ desc->bd_nob_transferred,
+ desc->bd_nob);
+ /* XXX should this be a different errno? */
+ rc = -ETIMEDOUT;
+ } else if (desc->bd_type == BULK_GET_SINK) {
+ /* For a GET the server is the sink; unwrap/verify the
+ * pages we just received from the client. */
+ rc = sptlrpc_svc_unwrap_bulk(req, desc);
+ }
+ } else {
+ DEBUG_REQ(D_ERROR, req, "bulk %s failed: rc %d",
+ bulk2type(desc), rc);
+ }
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(target_bulk_io);