#include <linux/lustre_mgmt.h>
#include <linux/lustre_dlm.h>
#include <linux/lustre_net.h>
+#include <linux/lustre_sec.h>
+
/* @priority: if non-zero, move the selected to the list head
* @nocreate: if non-zero, only search in existed connections
*/
spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_read_page_hist.oh_lock);
spin_lock_init(&cli->cl_write_page_hist.oh_lock);
-
- if (num_physpages <= 32768) { /* <= 128 MB */
- cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 3;
- cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 3;
+
+ if (num_physpages >> (20 - PAGE_SHIFT) <= 128) { /* <= 128 MB */
+ cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 4;
+ cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 4;
+ } else if (num_physpages >> (20 - PAGE_SHIFT) <= 512) { /* <= 512 MB */
+ cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 2;
+ cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 2;
} else {
cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
}
-
rc = ldlm_get_ref();
if (rc) {
CERROR("ldlm_get_ref failed: %d\n", rc);
int client_obd_cleanup(struct obd_device *obddev, int flags)
{
struct client_obd *cli = &obddev->u.cli;
+ ENTRY;
if (!cli->cl_import)
RETURN(-EINVAL);
dereg_f(cli->cl_mgmtcli_obd, obddev);
inter_module_put("mgmtcli_deregister_for_events");
}
+
+ /* Here we try to drop the security structure after destroy import,
+ * to avoid issue of "sleep in spinlock".
+ */
+ class_import_get(cli->cl_import);
class_destroy_import(cli->cl_import);
+ ptlrpcs_import_drop_sec(cli->cl_import);
+ class_import_put(cli->cl_import);
cli->cl_import = NULL;
ldlm_put_ref(flags & OBD_OPT_FORCE);
int client_connect_import(struct lustre_handle *dlm_handle,
struct obd_device *obd,
struct obd_uuid *cluuid,
+ struct obd_connect_data *conn_data,
unsigned long connect_flags)
{
struct client_obd *cli = &obd->u.cli;
if (obd->obd_namespace == NULL)
GOTO(out_disco, rc = -ENOMEM);
+ rc = ptlrpcs_import_get_sec(imp);
+ if (rc != 0)
+ GOTO(out_ldlm, rc);
+
imp->imp_dlm_handle = *dlm_handle;
rc = ptlrpc_init_import(imp);
if (rc != 0)
GOTO(out_ldlm, rc);
imp->imp_connect_flags = connect_flags;
+ if (conn_data)
+ memcpy(&imp->imp_connect_data, conn_data, sizeof(*conn_data));
+
rc = ptlrpc_connect_import(imp, NULL);
if (rc != 0) {
LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
return rc;
}
-int client_disconnect_export(struct obd_export *exp, int failover)
+int client_disconnect_export(struct obd_export *exp, unsigned long flags)
{
struct obd_device *obd = class_exp2obd(exp);
struct client_obd *cli = &obd->u.cli;
obd->obd_namespace = NULL;
}
- /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
+ /*
+ * Yeah, obd_no_recov also (mainly) means "forced shutdown".
+ */
if (obd->obd_no_recov)
ptlrpc_invalidate_import(imp, 0);
else
RETURN(0);
}
-static char nidstr[PTL_NALFMT_SIZE];
+static inline int ptlrpc_peer_is_local(struct ptlrpc_peer *peer)
+{
+ ptl_process_id_t myid;
+
+ PtlGetId(peer->peer_ni->pni_ni_h, &myid);
+ return (memcmp(&peer->peer_id, &myid, sizeof(myid)) == 0);
+}
+
int target_handle_connect(struct ptlrpc_request *req)
{
unsigned long connect_flags = 0, *cfp;
struct obd_uuid cluuid;
struct obd_uuid remote_uuid;
struct list_head *p;
+ struct obd_connect_data *conn_data;
+ int conn_data_size = sizeof(*conn_data);
char *str, *tmp;
int rc = 0;
unsigned long flags;
int initial_conn = 0;
char peer_str[PTL_NALFMT_SIZE];
+ const int offset = 1;
ENTRY;
OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
- LASSERT_REQSWAB (req, 0);
- str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
+ LASSERT_REQSWAB (req, offset + 0);
+ str = lustre_msg_string(req->rq_reqmsg, offset + 0,
+ sizeof(tgtuuid) - 1);
if (str == NULL) {
CERROR("bad target UUID for connect\n");
GOTO(out, rc = -EINVAL);
obd_str2uuid (&tgtuuid, str);
target = class_uuid2obd(&tgtuuid);
- if (!target) {
+ if (!target)
target = class_name2obd(str);
- }
if (!target || target->obd_stopping || !target->obd_set_up) {
- CERROR("UUID '%s' is not available for connect from NID %s\n",
- str, ptlrpc_peernid2str(&req->rq_peer, nidstr));
+ CERROR("UUID '%s' is not available for connect from %s\n",
+ str, req->rq_peerstr);
GOTO(out, rc = -ENODEV);
}
- LASSERT_REQSWAB (req, 1);
- str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1);
+ LASSERT_REQSWAB (req, offset + 1);
+ str = lustre_msg_string(req->rq_reqmsg, offset + 1, sizeof(cluuid) - 1);
if (str == NULL) {
CERROR("bad client UUID for connect\n");
GOTO(out, rc = -EINVAL);
LBUG();
}
- tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
+ tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof conn);
if (tmp == NULL)
GOTO(out, rc = -EPROTO);
memcpy(&conn, tmp, sizeof conn);
- cfp = lustre_msg_buf(req->rq_reqmsg, 3, sizeof(unsigned long));
+ cfp = lustre_msg_buf(req->rq_reqmsg, offset + 3, sizeof(unsigned long));
LASSERT(cfp != NULL);
connect_flags = *cfp;
- rc = lustre_pack_reply(req, 0, NULL, NULL);
+ conn_data = lustre_swab_reqbuf(req, offset + 4, sizeof(*conn_data),
+ lustre_swab_connect);
+ if (!conn_data)
+ GOTO(out, rc = -EPROTO);
+
+ rc = lustre_pack_reply(req, 1, &conn_data_size, NULL);
if (rc)
GOTO(out, rc);
ptlrpc_peernid2str(&req->rq_peer, peer_str),
export, atomic_read(&export->exp_rpc_count));
GOTO(out, rc = -EBUSY);
- }
- else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
+ } else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
cluuid.uuid);
GOTO(out, rc = -EALREADY);
CWARN("%s: connection from %s@%s/%lu %s\n", target->obd_name, cluuid.uuid,
ptlrpc_peernid2str(&req->rq_peer, peer_str), *cfp,
target->obd_recovering ? "(recovering)" : "");
+
if (target->obd_recovering) {
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
target_start_recovery_timer(target);
}
+
#if 0
/* Tell the client if we support replayable requests */
if (target->obd_replayable)
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
#endif
+
if (export == NULL) {
if (target->obd_recovering) {
CERROR("%s denying connection for new client %s@%s: "
rc = -EBUSY;
} else {
dont_check_exports:
- rc = obd_connect(&conn, target, &cluuid, connect_flags);
+ rc = obd_connect(&conn, target, &cluuid, conn_data,
+ connect_flags);
}
}
+
+ /* Return only the parts of obd_connect_data that we understand, so the
+ * client knows that we don't understand the rest. */
+ conn_data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
+ memcpy(lustre_msg_buf(req->rq_repmsg, 0, sizeof(*conn_data)), conn_data,
+ sizeof(*conn_data));
+
/* Tell the client if we support replayable requests */
if (target->obd_replayable)
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
export->exp_libclient = 1;
+ if (!(lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_ASYNC) &&
+ ptlrpc_peer_is_local(&req->rq_peer)) {
+ CWARN("%s: exp %p set sync\n", target->obd_name, export);
+ export->exp_sync = 1;
+ } else {
+ CDEBUG(D_HA, "%s: exp %p set async\n",target->obd_name,export);
+ export->exp_sync = 0;
+ }
+
if (export->exp_connection != NULL)
ptlrpc_put_connection(export->exp_connection);
export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
GOTO(out, rc = 0);
}
- if (target->obd_recovering) {
+ if (target->obd_recovering)
target->obd_connected_clients++;
- }
- memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
- sizeof conn);
+ memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof(conn)),
+ sizeof(conn));
- if (export->exp_imp_reverse != NULL)
+ if (export->exp_imp_reverse != NULL) {
+ /* same logic as client_obd_cleanup */
+ class_import_get(export->exp_imp_reverse);
class_destroy_import(export->exp_imp_reverse);
+ ptlrpcs_import_drop_sec(export->exp_imp_reverse);
+ class_import_put(export->exp_imp_reverse);
+ }
+
+ /* for the rest part, we return -ENOTCONN in case of errors
+ * in order to let client initialize connection again.
+ */
revimp = export->exp_imp_reverse = class_new_import();
+ if (!revimp) {
+ CERROR("fail to alloc new reverse import.\n");
+ GOTO(out, rc = -ENOTCONN);
+ }
+
revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
revimp->imp_client = &export->exp_obd->obd_ldlm_client;
revimp->imp_remote_handle = conn;
revimp->imp_obd = target;
revimp->imp_dlm_fake = 1;
revimp->imp_state = LUSTRE_IMP_FULL;
+
+ rc = ptlrpcs_import_get_sec(revimp);
+ if (rc) {
+ CERROR("reverse import can not get sec: %d\n", rc);
+ class_destroy_import(revimp);
+ export->exp_imp_reverse = NULL;
+ GOTO(out, rc = -ENOTCONN);
+ }
+
class_import_put(revimp);
- rc = obd_connect_post(export);
+ rc = obd_connect_post(export, connect_flags);
out:
if (rc)
req->rq_status = rc;
{
/* exports created from last_rcvd data, and "fake"
exports created by lctl don't have an import */
- if (exp->exp_imp_reverse != NULL)
+ if (exp->exp_imp_reverse != NULL) {
+ ptlrpcs_import_drop_sec(exp->exp_imp_reverse);
class_destroy_import(exp->exp_imp_reverse);
+ }
/* We cancel locks at disconnect time, but this will catch any locks
* granted in a race with recovery-induced disconnect. */
memcpy(copy_req, orig_req, sizeof *copy_req);
memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen);
- /* the copied req takes over the reply state */
+ /* the copied req takes over the reply state and security data */
orig_req->rq_reply_state = NULL;
+ orig_req->rq_sec_svcdata = NULL;
copy_req->rq_reqmsg = copy_reqmsg;
class_export_get(copy_req->rq_export);
return copy_req;
}
+
void ptlrpc_free_clone( struct ptlrpc_request *req)
{
+ if (req->rq_svcsec)
+ svcsec_cleanup_req(req);
+
class_export_put(req->rq_export);
list_del(&req->rq_list);
OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
OBD_FREE(req, sizeof *req);
}
-
-
static void target_release_saved_req(struct ptlrpc_request *req)
{
+ if (req->rq_svcsec)
+ svcsec_cleanup_req(req);
+
class_export_put(req->rq_export);
OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
OBD_FREE(req, sizeof *req);
target_release_saved_req(req);
}
}
+
/* Called from a cleanup function if the device is being cleaned up
forcefully. The exports should all have been disconnected already,
the only thing left to do is
list_del(&req->rq_list);
LASSERT (req->rq_reply_state == 0);
target_release_saved_req(req);
- }
+ }
}
static void target_abort_recovery(void *data)
{
struct obd_device *obd = data;
-
+
LASSERT(!obd->obd_recovering);
class_disconnect_stale_exports(obd, 0);
del_timer(&obd->obd_recovery_timer);
}
+#ifdef __KERNEL__
static void reset_recovery_timer(struct obd_device *obd)
{
spin_lock_bh(&obd->obd_processing_task_lock);
mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
spin_unlock_bh(&obd->obd_processing_task_lock);
}
-
+#endif
/* Only start it the first time called */
void target_start_recovery_timer(struct obd_device *obd)
spin_unlock_bh(&obd->obd_processing_task_lock);
}
+#ifdef __KERNEL__
static int check_for_next_transno(struct obd_device *obd)
{
struct ptlrpc_request *req = NULL;
return req;
}
-#ifdef __KERNEL__
static int target_recovery_thread(void *arg)
{
struct obd_device *obd = arg;
}
}
#endif
+
int target_queue_recovery_request(struct ptlrpc_request *req,
struct obd_device *obd)
{
if (obd->obd_recovery_data.trd_processing_task == current->pid ||
transno < obd->obd_next_recovery_transno) {
/* Processing the queue right now, don't re-add. */
- lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
LASSERT(list_empty(&req->rq_list));
spin_unlock_bh(&obd->obd_processing_task_lock);
return 1;
}
}
- if (!inserted) {
+ if (!inserted)
list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
- }
obd->obd_requests_queued_for_recovery++;
wake_up(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
-
return 0;
}