#include <linux/lustre_mgmt.h>
#include <linux/lustre_dlm.h>
#include <linux/lustre_net.h>
+/* @priority: if non-zero, move the selected to the list head
+ * @nocreate: if non-zero, only search in existed connections
+ */
+static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid,
+ int priority, int nocreate)
+{
+ struct ptlrpc_connection *ptlrpc_conn;
+ struct obd_import_conn *imp_conn = NULL, *item;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(!(nocreate && !priority));
+
+ ptlrpc_conn = ptlrpc_uuid_to_connection(uuid);
+ if (!ptlrpc_conn) {
+ CERROR("can't find connection %s\n", uuid->uuid);
+ RETURN (-EINVAL);
+ }
+
+ if (!nocreate) {
+ OBD_ALLOC(imp_conn, sizeof(*imp_conn));
+ if (!imp_conn) {
+ CERROR("fail to alloc memory\n");
+ GOTO(out_put, rc = -ENOMEM);
+ }
+ }
+
+ spin_lock(&imp->imp_lock);
+ list_for_each_entry(item, &imp->imp_conn_list, oic_item) {
+ if (obd_uuid_equals(uuid, &item->oic_uuid)) {
+ if (priority) {
+ list_del(&item->oic_item);
+ list_add(&item->oic_item, &imp->imp_conn_list);
+ item->oic_last_attempt = 0;
+ }
+ CDEBUG(D_HA, "imp %p@%s: find existed conn %s%s\n",
+ imp, imp->imp_obd->obd_name, uuid->uuid,
+ (priority ? ", move to head." : ""));
+ spin_unlock(&imp->imp_lock);
+ GOTO(out_free, rc = 0);
+ }
+ }
+ /* not found */
+ if (!nocreate) {
+ imp_conn->oic_conn = ptlrpc_conn;
+ imp_conn->oic_uuid = *uuid;
+ imp_conn->oic_last_attempt = 0;
+ if (priority)
+ list_add(&imp_conn->oic_item, &imp->imp_conn_list);
+ else
+ list_add_tail(&imp_conn->oic_item, &imp->imp_conn_list);
+ CDEBUG(D_HA, "imp %p@%s: add connection %s at %s\n",
+ imp, imp->imp_obd->obd_name, uuid->uuid,
+ (priority ? "head" : "tail"));
+ } else
+ rc = -ENOENT;
+
+ spin_unlock(&imp->imp_lock);
+ RETURN(0);
+out_free:
+ if (imp_conn)
+ OBD_FREE(imp_conn, sizeof(*imp_conn));
+out_put:
+ ptlrpc_put_connection(ptlrpc_conn);
+ RETURN(rc);
+}
+
+int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid)
+{
+ return import_set_conn(imp, uuid, 1, 1);
+}
+
+int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
+ int priority)
+{
+ return import_set_conn(imp, uuid, priority, 0);
+}
+
+int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
+{
+ struct obd_import_conn *imp_conn;
+ struct obd_export *dlmexp;
+ int rc = -ENOENT;
+ ENTRY;
+
+ spin_lock(&imp->imp_lock);
+ if (list_empty(&imp->imp_conn_list)) {
+ LASSERT(!imp->imp_conn_current);
+ LASSERT(!imp->imp_connection);
+ GOTO(out, rc);
+ }
+
+ list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
+ if (!obd_uuid_equals(uuid, &imp_conn->oic_uuid))
+ continue;
+ LASSERT(imp_conn->oic_conn);
+
+ /* is current conn? */
+ if (imp_conn == imp->imp_conn_current) {
+ LASSERT(imp_conn->oic_conn == imp->imp_connection);
+
+ if (imp->imp_state != LUSTRE_IMP_CLOSED &&
+ imp->imp_state != LUSTRE_IMP_DISCON) {
+ CERROR("can't remove current connection\n");
+ GOTO(out, rc = -EBUSY);
+ }
+
+ ptlrpc_put_connection(imp->imp_connection);
+ imp->imp_connection = NULL;
+
+ dlmexp = class_conn2export(&imp->imp_dlm_handle);
+ if (dlmexp && dlmexp->exp_connection) {
+ LASSERT(dlmexp->exp_connection ==
+ imp_conn->oic_conn);
+ ptlrpc_put_connection(dlmexp->exp_connection);
+ dlmexp->exp_connection = NULL;
+ }
+ }
+
+ list_del(&imp_conn->oic_item);
+ ptlrpc_put_connection(imp_conn->oic_conn);
+ OBD_FREE(imp_conn, sizeof(*imp_conn));
+ CDEBUG(D_HA, "imp %p@%s: remove connection %s\n",
+ imp, imp->imp_obd->obd_name, uuid->uuid);
+ rc = 0;
+ break;
+ }
+out:
+ spin_unlock(&imp->imp_lock);
+ if (rc == -ENOENT)
+ CERROR("connection %s not found\n", uuid->uuid);
+ RETURN(rc);
+}
int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
{
- struct ptlrpc_connection *conn;
struct lustre_cfg* lcfg = buf;
struct client_obd *cli = &obddev->u.cli;
struct obd_import *imp;
cli->cl_dirty = 0;
cli->cl_avail_grant = 0;
+ /* FIXME: should limit this for the sum of all cl_dirty_max */
cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
+ if (cli->cl_dirty_max >> PAGE_SHIFT > num_physpages / 8)
+ cli->cl_dirty_max = num_physpages << (PAGE_SHIFT - 3);
INIT_LIST_HEAD(&cli->cl_cache_waiters);
INIT_LIST_HEAD(&cli->cl_loi_ready_list);
INIT_LIST_HEAD(&cli->cl_loi_write_list);
INIT_LIST_HEAD(&cli->cl_loi_read_list);
spin_lock_init(&cli->cl_loi_list_lock);
- cli->cl_brw_in_flight = 0;
+ cli->cl_r_in_flight = 0;
+ cli->cl_w_in_flight = 0;
spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_read_page_hist.oh_lock);
spin_lock_init(&cli->cl_write_page_hist.oh_lock);
- cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
- cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
+
+ if (num_physpages <= 32768) { /* <= 128 MB */
+ cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 3;
+ cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 3;
+ } else {
+ cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
+ cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
+ }
+
rc = ldlm_get_ref();
if (rc) {
GOTO(err, rc);
}
- conn = ptlrpc_uuid_to_connection(&server_uuid);
- if (conn == NULL)
- GOTO(err_ldlm, rc = -ENOENT);
-
ptlrpc_init_client(rq_portal, rp_portal, name,
&obddev->obd_ldlm_client);
imp = class_new_import();
- if (imp == NULL) {
- ptlrpc_put_connection(conn);
+ if (imp == NULL)
GOTO(err_ldlm, rc = -ENOENT);
- }
- imp->imp_connection = conn;
imp->imp_client = &obddev->obd_ldlm_client;
imp->imp_obd = obddev;
imp->imp_connect_op = connect_op;
lcfg->lcfg_inllen1);
class_import_put(imp);
+ rc = client_import_add_conn(imp, &server_uuid, 1);
+ if (rc) {
+ CERROR("can't add initial connection\n");
+ GOTO(err_import, rc);
+ }
+
cli->cl_import = imp;
cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
if (rc != 0)
GOTO(out_ldlm, rc);
- exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
imp->imp_connect_flags = connect_flags;
rc = ptlrpc_connect_import(imp, NULL);
if (rc != 0) {
LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
GOTO(out_ldlm, rc);
}
-
+ LASSERT(exp->exp_connection);
ptlrpc_pinger_add_import(imp);
EXIT;
RETURN(0);
}
+static char nidstr[PTL_NALFMT_SIZE];
int target_handle_connect(struct ptlrpc_request *req)
{
unsigned long connect_flags = 0, *cfp;
}
if (!target || target->obd_stopping || !target->obd_set_up) {
- CERROR("UUID '%s' is not available for connect\n", str);
-
+ CERROR("UUID '%s' is not available for connect from NID %s\n",
+ str, ptlrpc_peernid2str(&req->rq_peer, nidstr));
GOTO(out, rc = -ENODEV);
}
#endif
if (export == NULL) {
if (target->obd_recovering) {
- CERROR("denying connection for new client %s@%s: "
- "%d clients in recovery for %lds\n", cluuid.uuid,
+ CERROR("%s denying connection for new client %s@%s: "
+ "%d clients in recovery for %lds\n", target->obd_name,
+ cluuid.uuid,
ptlrpc_peernid2str(&req->rq_peer, peer_str),
target->obd_recoverable_clients,
(target->obd_recovery_timer.expires-jiffies)/HZ);
OBD_FREE(req, sizeof *req);
}
-static void abort_delayed_replies(struct obd_device *obd)
+
+
+static void target_release_saved_req(struct ptlrpc_request *req)
+{
+ class_export_put(req->rq_export);
+ OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
+ OBD_FREE(req, sizeof *req);
+}
+
+static void target_finish_recovery(struct obd_device *obd)
{
- struct ptlrpc_request *req;
struct list_head *tmp, *n;
+ int rc;
+
+ CWARN("%s: sending delayed replies to recovered clients\n",
+ obd->obd_name);
+
+ ldlm_reprocess_all_ns(obd->obd_namespace);
+
+ /* when recovery finished, cleanup orphans on mds and ost */
+ if (OBT(obd) && OBP(obd, postrecov)) {
+ rc = OBP(obd, postrecov)(obd);
+ if (rc >= 0)
+ CWARN("%s: all clients recovered, %d MDS "
+ "orphans deleted\n", obd->obd_name, rc);
+ else
+ CERROR("postrecov failed %d\n", rc);
+ }
+
+
list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+ struct ptlrpc_request *req;
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "aborted:");
- req->rq_status = -ENOTCONN;
- req->rq_type = PTL_RPC_MSG_ERR;
- ptlrpc_reply(req);
- class_export_put(req->rq_export);
list_del(&req->rq_list);
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE(req, sizeof *req);
+ DEBUG_REQ(D_ERROR, req, "delayed:");
+ ptlrpc_reply(req);
+ target_release_saved_req(req);
}
+ obd->obd_recovery_end = LTIME_S(CURRENT_TIME);
+ return;
}
static void abort_recovery_queue(struct obd_device *obd)
list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ list_del(&req->rq_list);
DEBUG_REQ(D_ERROR, req, "aborted:");
req->rq_status = -ENOTCONN;
req->rq_type = PTL_RPC_MSG_ERR;
DEBUG_REQ(D_ERROR, req,
"packing failed for abort-reply; skipping");
}
+ target_release_saved_req(req);
+ }
+}
+/* Called from a cleanup function if the device is being cleaned up
+ forcefully. The exports should all have been disconnected already,
+ the only thing left to do is
+ - clear the recovery flags
+ - cancel the timer
+ - free queued requests and replies, but don't send replies
+ Because the obd_stopping flag is set, no new requests should be received.
+
+*/
+void target_cleanup_recovery(struct obd_device *obd)
+{
+ struct list_head *tmp, *n;
+ struct ptlrpc_request *req;
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (!obd->obd_recovering) {
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ EXIT;
+ return;
+ }
+ obd->obd_recovering = obd->obd_abort_recovery = 0;
+ target_cancel_recovery_timer(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
list_del(&req->rq_list);
- class_export_put(req->rq_export);
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE(req, sizeof *req);
+ LASSERT (req->rq_reply_state);
+ lustre_free_reply_state(req->rq_reply_state);
+ target_release_saved_req(req);
}
+ list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ list_del(&req->rq_list);
+ LASSERT (req->rq_reply_state == 0);
+ target_release_saved_req(req);
+ }
}
static void target_abort_recovery(void *data)
{
struct obd_device *obd = data;
- int rc;
-
- CERROR("disconnecting clients and aborting recovery\n");
+
LASSERT(!obd->obd_recovering);
- class_disconnect_exports(obd, 0);
+ class_disconnect_stale_exports(obd, 0);
- /* when recovery was aborted, cleanup orphans on mds and ost */
- if (OBT(obd) && OBP(obd, postrecov)) {
- rc = OBP(obd, postrecov)(obd);
- if (rc >= 0)
- CWARN("Cleanup %d orphans after recovery was aborted\n",
- rc);
- else
- CERROR("postrecov failed %d\n", rc);
- }
+ CERROR("%s: recovery period over; disconnecting unfinished clients.\n",
+ obd->obd_name);
- abort_delayed_replies(obd);
abort_recovery_queue(obd);
+ target_finish_recovery(obd);
ptlrpc_run_recovery_over_upcall(obd);
}
struct obd_device *obd = (struct obd_device *)castmeharder;
CERROR("recovery timed out, aborting\n");
spin_lock_bh(&obd->obd_processing_task_lock);
- obd->obd_abort_recovery = 1;
+ if (obd->obd_recovering)
+ obd->obd_abort_recovery = 1;
+
wake_up(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
}
spin_unlock_bh(&obd->obd_processing_task_lock);
}
-static void target_finish_recovery(struct obd_device *obd)
-{
- struct list_head *tmp, *n;
- int rc2;
-
- ldlm_reprocess_all_ns(obd->obd_namespace);
-
- CWARN("%s: all clients recovered, calling postrecov\n",
- obd->obd_name);
- /* when recovery finished, cleanup orphans on mds and ost */
- if (OBT(obd) && OBP(obd, postrecov)) {
- rc2 = OBP(obd, postrecov)(obd);
- if (rc2 >= 0)
- CWARN("%s: all clients recovered, %d MDS "
- "orphans deleted\n", obd->obd_name, rc2);
- else
- CERROR("postrecov failed %d\n", rc2);
- }
-
- CWARN("%s: recovery over, sending delayed replies\n",
- obd->obd_name);
- list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
- struct ptlrpc_request *req;
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "delayed:");
- ptlrpc_reply(req);
- ptlrpc_free_clone(req);
- }
- ptlrpc_run_recovery_over_upcall(obd);
-}
-
static int check_for_next_transno(struct obd_device *obd)
{
struct ptlrpc_request *req = NULL;
return req;
}
-
+#ifdef __KERNEL__
static int target_recovery_thread(void *arg)
{
struct obd_device *obd = arg;
spin_unlock_bh(&obd->obd_processing_task_lock);
}
}
-
+#endif
int target_queue_recovery_request(struct ptlrpc_request *req,
struct obd_device *obd)
{
}
if (rc) {
- DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
- if (req->rq_reply_state == NULL) {
- rc = lustre_pack_reply (req, 0, NULL, NULL);
- if (rc != 0) {
- CERROR ("can't allocate reply\n");
- return (rc);
- }
- }
- req->rq_type = PTL_RPC_MSG_ERR;
+ req->rq_status = rc;
+ return (ptlrpc_error(req));
} else {
DEBUG_REQ(D_NET, req, "sending reply");
}