* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
#else
# include <liblustre.h>
#endif
-#include <linux/obd_ost.h>
+#include <linux/obd.h>
+#include <linux/obd_ost.h> /* for LUSTRE_OSC_NAME */
+#include <linux/lustre_mds.h> /* for LUSTRE_MDC_NAME */
+#include <linux/lustre_mgmt.h>
#include <linux/lustre_dlm.h>
-#include <linux/lustre_mds.h>
#include <linux/lustre_net.h>
-int client_import_connect(struct lustre_handle *dlm_handle,
+int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+ struct ptlrpc_connection *conn;
+ struct lustre_cfg* lcfg = buf;
+ struct client_obd *cli = &obddev->u.cli;
+ struct obd_import *imp;
+ struct obd_uuid server_uuid;
+ int rq_portal, rp_portal, connect_op;
+ char *name = obddev->obd_type->typ_name;
+ char *mgmt_name = NULL;
+ int rc = 0;
+ struct obd_device *mgmt_obd;
+ mgmtcli_register_for_events_t register_f;
+ ENTRY;
+
+ /* In a more perfect world, we would hang a ptlrpc_client off of
+ * obd_type and just use the values from there. */
+ if (!strcmp(name, LUSTRE_OSC_NAME)) {
+ rq_portal = OST_REQUEST_PORTAL;
+ rp_portal = OSC_REPLY_PORTAL;
+ connect_op = OST_CONNECT;
+ } else if (!strcmp(name, LUSTRE_MDC_NAME)) {
+ rq_portal = MDS_REQUEST_PORTAL;
+ rp_portal = MDC_REPLY_PORTAL;
+ connect_op = MDS_CONNECT;
+ } else if (!strcmp(name, LUSTRE_MGMTCLI_NAME)) {
+ rq_portal = MGMT_REQUEST_PORTAL;
+ rp_portal = MGMT_REPLY_PORTAL;
+ connect_op = MGMT_CONNECT;
+ } else {
+ CERROR("unknown client OBD type \"%s\", can't setup\n",
+ name);
+ RETURN(-EINVAL);
+ }
+
+ if (lcfg->lcfg_inllen1 < 1) {
+ CERROR("requires a TARGET UUID\n");
+ RETURN(-EINVAL);
+ }
+
+ if (lcfg->lcfg_inllen1 > 37) {
+ CERROR("client UUID must be less than 38 characters\n");
+ RETURN(-EINVAL);
+ }
+
+ if (lcfg->lcfg_inllen2 < 1) {
+ CERROR("setup requires a SERVER UUID\n");
+ RETURN(-EINVAL);
+ }
+
+ if (lcfg->lcfg_inllen2 > 37) {
+ CERROR("target UUID must be less than 38 characters\n");
+ RETURN(-EINVAL);
+ }
+
+ sema_init(&cli->cl_sem, 1);
+ cli->cl_conn_count = 0;
+ memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2,
+ sizeof(server_uuid)));
+
+ init_MUTEX(&cli->cl_dirty_sem);
+ cli->cl_dirty = 0;
+ cli->cl_dirty_granted = 0;
+ cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
+ cli->cl_ost_can_grant = 1;
+ INIT_LIST_HEAD(&cli->cl_cache_waiters);
+ INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+ spin_lock_init(&cli->cl_loi_list_lock);
+ cli->cl_brw_in_flight = 0;
+ spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
+ spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
+ spin_lock_init(&cli->cl_read_page_hist.oh_lock);
+ spin_lock_init(&cli->cl_write_page_hist.oh_lock);
+ cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES;
+ cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
+
+ ldlm_get_ref();
+ if (rc) {
+ CERROR("ldlm_get_ref failed: %d\n", rc);
+ GOTO(err, rc);
+ }
+
+ conn = ptlrpc_uuid_to_connection(&server_uuid);
+ if (conn == NULL)
+ GOTO(err_ldlm, rc = -ENOENT);
+
+ ptlrpc_init_client(rq_portal, rp_portal, name,
+ &obddev->obd_ldlm_client);
+
+ imp = class_new_import();
+ if (imp == NULL) {
+ ptlrpc_put_connection(conn);
+ GOTO(err_ldlm, rc = -ENOENT);
+ }
+ imp->imp_connection = conn;
+ imp->imp_client = &obddev->obd_ldlm_client;
+ imp->imp_obd = obddev;
+ imp->imp_connect_op = connect_op;
+ imp->imp_generation = 0;
+ INIT_LIST_HEAD(&imp->imp_pinger_chain);
+ memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1,
+ lcfg->lcfg_inllen1);
+ class_import_put(imp);
+
+ cli->cl_import = imp;
+ cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
+ cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
+ cli->cl_sandev = to_kdev_t(0);
+
+ if (lcfg->lcfg_inllen3 != 0) {
+ if (!strcmp(lcfg->lcfg_inlbuf3, "inactive")) {
+ CDEBUG(D_HA, "marking %s %s->%s as inactive\n",
+ name, obddev->obd_name,
+ imp->imp_target_uuid.uuid);
+ imp->imp_invalid = 1;
+
+ if (lcfg->lcfg_inllen4 != 0)
+ mgmt_name = lcfg->lcfg_inlbuf4;
+ } else {
+ mgmt_name = lcfg->lcfg_inlbuf3;
+ }
+ }
+
+ if (mgmt_name != NULL) {
+ /* Register with management client if we need to. */
+ CDEBUG(D_HA, "%s registering with %s for events about %s\n",
+ obddev->obd_name, mgmt_name, server_uuid.uuid);
+
+ mgmt_obd = class_name2obd(mgmt_name);
+ if (!mgmt_obd) {
+ CERROR("can't find mgmtcli %s to register\n",
+ mgmt_name);
+ GOTO(err_import, rc = -ENOSYS);
+ }
+
+ register_f = inter_module_get("mgmtcli_register_for_events");
+ if (!register_f) {
+ CERROR("can't i_m_g mgmtcli_register_for_events\n");
+ GOTO(err_import, rc = -ENOSYS);
+ }
+
+ rc = register_f(mgmt_obd, obddev, &imp->imp_target_uuid);
+ inter_module_put("mgmtcli_register_for_events");
+
+ if (!rc)
+ cli->cl_mgmtcli_obd = mgmt_obd;
+ }
+
+ RETURN(rc);
+
+err_import:
+ class_destroy_import(imp);
+err_ldlm:
+ ldlm_put_ref(0);
+err:
+ RETURN(rc);
+
+}
+
+int client_obd_cleanup(struct obd_device *obddev, int flags)
+{
+ struct client_obd *cli = &obddev->u.cli;
+
+ if (!cli->cl_import)
+ RETURN(-EINVAL);
+ if (cli->cl_mgmtcli_obd) {
+ mgmtcli_deregister_for_events_t dereg_f;
+
+ dereg_f = inter_module_get("mgmtcli_deregister_for_events");
+ dereg_f(cli->cl_mgmtcli_obd, obddev);
+ inter_module_put("mgmtcli_deregister_for_events");
+ }
+ class_destroy_import(cli->cl_import);
+ cli->cl_import = NULL;
+
+ ldlm_put_ref(flags & OBD_OPT_FORCE);
+
+ RETURN(0);
+}
+
+int client_connect_import(struct lustre_handle *dlm_handle,
struct obd_device *obd,
struct obd_uuid *cluuid)
{
struct client_obd *cli = &obd->u.cli;
struct obd_import *imp = cli->cl_import;
struct obd_export *exp;
- struct ptlrpc_request *request;
- /* XXX maybe this is a good time to create a connect struct? */
- int rc, size[] = {sizeof(imp->imp_target_uuid),
- sizeof(obd->obd_uuid),
- sizeof(*dlm_handle)};
- char *tmp[] = {imp->imp_target_uuid.uuid,
- obd->obd_uuid.uuid,
- (char *)dlm_handle};
- int msg_flags;
-
+ int rc;
ENTRY;
+
down(&cli->cl_sem);
rc = class_connect(dlm_handle, obd, cluuid);
if (rc)
cli->cl_conn_count++;
if (cli->cl_conn_count > 1)
GOTO(out_sem, rc);
+ exp = class_conn2export(dlm_handle);
if (obd->obd_namespace != NULL)
CERROR("already have namespace!\n");
if (obd->obd_namespace == NULL)
GOTO(out_disco, rc = -ENOMEM);
- request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
- if (!request)
- GOTO(out_ldlm, rc = -ENOMEM);
-
- request->rq_level = LUSTRE_CONN_NEW;
- request->rq_replen = lustre_msg_size(0, NULL);
-
- lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_PEER);
-
imp->imp_dlm_handle = *dlm_handle;
+ imp->imp_state = LUSTRE_IMP_DISCON;
- imp->imp_level = LUSTRE_CONN_CON;
- rc = ptlrpc_queue_wait(request);
- if (rc) {
- class_disconnect(dlm_handle, 0);
- GOTO(out_req, rc);
+ rc = ptlrpc_connect_import(imp);
+ if (rc != 0) {
+ LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
+ GOTO(out_ldlm, rc);
}
- exp = class_conn2export(dlm_handle);
- exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
- class_export_put(exp);
+ LASSERT (imp->imp_state == LUSTRE_IMP_FULL);
+
+ exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
- msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
- if (msg_flags & MSG_CONNECT_REPLAYABLE) {
- imp->imp_replayable = 1;
+ if (imp->imp_replayable) {
CDEBUG(D_HA, "connected to replayable target: %s\n",
imp->imp_target_uuid.uuid);
ptlrpc_pinger_add_import(imp);
}
- imp->imp_level = LUSTRE_CONN_FULL;
- imp->imp_remote_handle = request->rq_repmsg->handle;
+
CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
imp->imp_remote_handle.cookie);
EXIT;
-out_req:
- ptlrpc_req_finished(request);
+
if (rc) {
out_ldlm:
- ldlm_namespace_free(obd->obd_namespace);
+ ldlm_namespace_free(obd->obd_namespace, 0);
obd->obd_namespace = NULL;
out_disco:
cli->cl_conn_count--;
- class_disconnect(dlm_handle, 0);
+ class_disconnect(exp, 0);
+ } else {
+ class_export_put(exp);
}
out_sem:
up(&cli->cl_sem);
return rc;
}
-int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
+int client_disconnect_export(struct obd_export *exp, int failover)
{
- struct obd_device *obd = class_conn2obd(dlm_handle);
+ struct obd_device *obd = class_exp2obd(exp);
struct client_obd *cli = &obd->u.cli;
struct obd_import *imp = cli->cl_import;
- struct ptlrpc_request *request = NULL;
- int rc = 0, err, rq_opc;
+ int rc = 0, err;
ENTRY;
if (!obd) {
- CERROR("invalid connection for disconnect: cookie "LPX64"\n",
- dlm_handle ? dlm_handle->cookie : -1UL);
- RETURN(-EINVAL);
- }
-
- switch (imp->imp_connect_op) {
- case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
- case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
- case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
- default:
- CERROR("don't know how to disconnect from %s (connect_op %d)\n",
- imp->imp_target_uuid.uuid, imp->imp_connect_op);
+ CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
+ exp, exp ? exp->exp_handle.h_cookie : -1);
RETURN(-EINVAL);
}
if (cli->cl_conn_count)
GOTO(out_no_disconnect, rc = 0);
+ /* Some non-replayable imports (MDS's OSCs) are pinged, so just
+ * delete it regardless. (It's safe to delete an import that was
+ * never added.) */
+ (void)ptlrpc_pinger_del_import(imp);
+
if (obd->obd_namespace != NULL) {
/* obd_no_recov == local only */
ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
obd->obd_no_recov, NULL);
- ldlm_namespace_free(obd->obd_namespace);
+ ldlm_namespace_free(obd->obd_namespace, obd->obd_no_recov);
obd->obd_namespace = NULL;
}
/* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
- if (obd->obd_no_recov) {
+ if (obd->obd_no_recov)
ptlrpc_set_import_active(imp, 0);
- } else {
- request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
- if (!request)
- GOTO(out_req, rc = -ENOMEM);
+ else
+ rc = ptlrpc_disconnect_import(imp);
- request->rq_replen = lustre_msg_size(0, NULL);
-
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out_req, rc);
- }
- if (imp->imp_replayable)
- ptlrpc_pinger_del_import(imp);
+ imp->imp_state = LUSTRE_IMP_NEW;
EXIT;
- out_req:
- if (request)
- ptlrpc_req_finished(request);
out_no_disconnect:
- err = class_disconnect(dlm_handle, 0);
+ err = class_disconnect(exp, 0);
if (!rc && err)
rc = err;
out_sem:
{
if (exp->exp_connection) {
struct lustre_handle *hdl;
- hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
+ hdl = &exp->exp_imp_reverse->imp_remote_handle;
/* Might be a re-connect after a partition. */
if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
CERROR("%s reconnecting\n", cluuid->uuid);
LPX64")\n", cluuid->uuid,
exp->exp_connection->c_remote_uuid.uuid,
hdl->cookie, conn->cookie);
- /* XXX disconnect them here? */
memset(conn, 0, sizeof *conn);
- /* This is a little scary, but right now we build this
- * file separately into each server module, so I won't
- * go _immediately_ to hell.
- */
RETURN(-EALREADY);
}
}
{
struct obd_device *target;
struct obd_export *export = NULL;
- struct obd_import *dlmimp;
+ struct obd_import *revimp;
struct lustre_handle conn;
struct obd_uuid tgtuuid;
struct obd_uuid cluuid;
obd_str2uuid (&tgtuuid, str);
target = class_uuid2obd(&tgtuuid);
+ if (!target) {
+ target = class_name2obd(str);
+ }
+
if (!target || target->obd_stopping || !target->obd_set_up) {
CERROR("UUID '%s' is not available for connect\n", str);
GOTO(out, rc = -ENODEV);
memcpy(&conn, tmp, sizeof conn);
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_reply(req, 0, NULL, NULL);
if (rc)
GOTO(out, rc);
/* lctl gets a backstage, all-access pass. */
- if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
+ if (obd_uuid_equals(&cluuid, &target->obd_uuid))
goto dont_check_exports;
spin_lock(&target->obd_dev_lock);
export = NULL;
}
/* If we found an export, we already unlocked. */
- if (!export)
+ if (!export) {
spin_unlock(&target->obd_dev_lock);
+ } else if (req->rq_reqmsg->conn_cnt == 1) {
+ CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
+ cluuid.uuid);
+ GOTO(out, rc = -EALREADY);
+ }
/* Tell the client if we're in recovery. */
/* If this is the first client, start the recovery timer */
if (export == NULL) {
if (target->obd_recovering) {
CERROR("denying connection for new client %s: "
- "in recovery\n", cluuid.uuid);
+ "%d clients in recovery for %lds\n", cluuid.uuid,
+ target->obd_recoverable_clients,
+ (target->obd_recovery_timer.expires-jiffies)/HZ);
rc = -EBUSY;
} else {
dont_check_exports:
if (rc && rc != EALREADY)
GOTO(out, rc);
+ /* XXX track this all the time? */
+ if (target->obd_recovering) {
+ target->obd_connected_clients++;
+ }
+
req->rq_repmsg->handle = conn;
/* If the client and the server are the same node, we will already
&remote_uuid);
req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
+ LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt);
+ export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
+
if (rc == EALREADY) {
/* We indicate the reconnection in a flag, not an error code. */
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
sizeof conn);
- if (export->exp_ldlm_data.led_import != NULL)
- class_destroy_import(export->exp_ldlm_data.led_import);
- dlmimp = export->exp_ldlm_data.led_import = class_new_import();
- dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
- dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
- dlmimp->imp_remote_handle = conn;
- dlmimp->imp_obd = target;
- dlmimp->imp_dlm_fake = 1;
- dlmimp->imp_level = LUSTRE_CONN_FULL;
- class_import_put(dlmimp);
+ if (export->exp_imp_reverse != NULL)
+ class_destroy_import(export->exp_imp_reverse);
+ revimp = export->exp_imp_reverse = class_new_import();
+ revimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
+ revimp->imp_client = &export->exp_obd->obd_ldlm_client;
+ revimp->imp_remote_handle = conn;
+ revimp->imp_obd = target;
+ revimp->imp_dlm_fake = 1;
+ revimp->imp_state = LUSTRE_IMP_FULL;
+ class_import_put(revimp);
out:
if (rc)
req->rq_status = rc;
int target_handle_disconnect(struct ptlrpc_request *req)
{
- struct lustre_handle *conn = &req->rq_reqmsg->handle;
- struct obd_import *dlmimp;
int rc;
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_reply(req, 0, NULL, NULL);
if (rc)
RETURN(rc);
- req->rq_status = obd_disconnect(conn, 0);
-
- dlmimp = req->rq_export->exp_ldlm_data.led_import;
- class_destroy_import(dlmimp);
-
- class_export_put(req->rq_export);
+ req->rq_status = obd_disconnect(req->rq_export, 0);
req->rq_export = NULL;
RETURN(0);
}
+void target_destroy_export(struct obd_export *exp)
+{
+ /* exports created from last_rcvd data, and "fake"
+ exports created by lctl don't have an import */
+ if (exp->exp_imp_reverse != NULL)
+ class_destroy_import(exp->exp_imp_reverse);
+
+ /* We cancel locks at disconnect time, but this will catch any locks
+ * granted in a race with recovery-induced disconnect. */
+ ldlm_cancel_locks_for_export(exp);
+}
+
/*
* Recovery functions
*/
DEBUG_REQ(D_ERROR, req, "aborted:");
req->rq_status = -ENOTCONN;
req->rq_type = PTL_RPC_MSG_ERR;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
- &req->rq_repmsg);
+ rc = lustre_pack_reply(req, 0, NULL, NULL);
if (rc == 0) {
ptlrpc_reply(req);
} else {
void target_abort_recovery(void *data)
{
struct obd_device *obd = data;
+ int rc;
CERROR("disconnecting clients and aborting recovery\n");
spin_lock_bh(&obd->obd_processing_task_lock);
}
obd->obd_recovering = obd->obd_abort_recovery = 0;
- obd->obd_recoverable_clients = 0;
+
wake_up(&obd->obd_next_transno_waitq);
target_cancel_recovery_timer(obd);
spin_unlock_bh(&obd->obd_processing_task_lock);
+
class_disconnect_exports(obd, 0);
+
+ /* when recovery was abort, cleanup orphans for mds */
+ if (OBT(obd) && OBP(obd, postrecov)) {
+ rc = OBP(obd, postrecov)(obd);
+ if (rc >= 0)
+ CWARN("Cleanup %d orphans after recovery was aborted\n", rc);
+ else
+ CERROR("postrecov failed %d\n", rc);
+ }
+
abort_delayed_replies(obd);
abort_recovery_queue(obd);
ptlrpc_run_recovery_over_upcall(obd);
if (!recovering)
return;
- CERROR("timer will expire in %d seconds\n", OBD_RECOVERY_TIMEOUT / HZ);
+ CDEBUG(D_HA, "timer will expire in %u seconds\n",
+ OBD_RECOVERY_TIMEOUT / HZ);
mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
}
spin_unlock_bh(&obd->obd_processing_task_lock);
return;
}
- CERROR("%s: starting recovery timer\n", obd->obd_name);
+ CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
+ OBD_RECOVERY_TIMEOUT / HZ);
obd->obd_recovery_handler = handler;
obd->obd_recovery_timer.function = target_recovery_expired;
obd->obd_recovery_timer.data = (unsigned long)obd;
static int check_for_next_transno(struct obd_device *obd)
{
struct ptlrpc_request *req;
- int wake_up;
+ int wake_up = 0, connected, completed, queue_len, max;
+ __u64 next_transno, req_transno;
+ spin_lock_bh(&obd->obd_processing_task_lock);
req = list_entry(obd->obd_recovery_queue.next,
struct ptlrpc_request, rq_list);
- LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
-
- wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
- (obd->obd_recovering) == 0;
- CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
- req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
- obd->obd_recovering, wake_up);
+ max = obd->obd_max_recoverable_clients;
+ req_transno = req->rq_reqmsg->transno;
+ connected = obd->obd_connected_clients;
+ completed = max - obd->obd_recoverable_clients;
+ queue_len = obd->obd_requests_queued_for_recovery;
+ next_transno = obd->obd_next_recovery_transno;
+
+ if (obd->obd_abort_recovery) {
+ CDEBUG(D_HA, "waking for aborted recovery\n");
+ wake_up = 1;
+ } else if (!obd->obd_recovering) {
+ CDEBUG(D_HA, "waking for completed recovery (?)\n");
+ wake_up = 1;
+ } else if (req_transno == next_transno) {
+ CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
+ wake_up = 1;
+ } else if (queue_len + completed == max) {
+ CDEBUG(D_ERROR,
+ "waking for skipped transno (skip: "LPD64
+ ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
+ next_transno, queue_len, completed, max, req_transno);
+ obd->obd_next_recovery_transno = req_transno;
+ wake_up = 1;
+ }
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ LASSERT(req->rq_reqmsg->transno >= next_transno);
return wake_up;
}
continue;
}
list_del_init(&req->rq_list);
+ obd->obd_requests_queued_for_recovery--;
spin_unlock_bh(&obd->obd_processing_task_lock);
- DEBUG_REQ(D_ERROR, req, "processing: ");
+ DEBUG_REQ(D_HA, req, "processing: ");
(void)obd->obd_recovery_handler(req);
+ obd->obd_replayed_requests++;
reset_recovery_timer(obd);
/* bug 1580: decide how to properly sync() in recovery */
//mds_fsync_super(mds->mds_sb);
list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
}
+ obd->obd_requests_queued_for_recovery++;
+
if (obd->obd_processing_task != 0) {
/* Someone else is processing this queue, we'll leave it to
* them.
*/
- if (transno == obd->obd_next_recovery_transno)
- wake_up(&obd->obd_next_transno_waitq);
+ wake_up(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
return 0;
}
struct ptlrpc_request *saved_req;
struct lustre_msg *reqmsg;
int recovery_done = 0;
+ int rc2;
if (rc) {
/* Just like ptlrpc_error, but without the sending. */
- lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
- &req->rq_repmsg);
+ rc = lustre_pack_reply(req, 0, NULL, NULL);
+ LASSERT(rc == 0); /* XXX handle this */
req->rq_type = PTL_RPC_MSG_ERR;
}
LASSERT(list_empty(&req->rq_list));
- /* XXX just like the request-dup code in queue_recovery_request */
+ /* XXX a bit like the request-dup code in queue_recovery_request */
OBD_ALLOC(saved_req, sizeof *saved_req);
if (!saved_req)
LBUG();
if (recovery_done) {
struct list_head *tmp, *n;
ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
- CERROR("%s: all clients recovered, sending delayed replies\n",
+ CWARN("%s: all clients recovered, sending delayed replies\n",
obd->obd_name);
obd->obd_recovering = 0;
+
+ /* when recovering finished, cleanup orphans for mds */
+ if (OBT(obd) && OBP(obd, postrecov)) {
+ rc2 = OBP(obd, postrecov)(obd);
+ if (rc2 >= 0)
+ CWARN("%s: all clients recovered, %d MDS orphans "
+ "deleted\n", obd->obd_name, rc2);
+ else
+ CERROR("postrecov failed %d\n", rc2);
+ }
+
list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
DEBUG_REQ(D_ERROR, req, "delayed:");
}
target_cancel_recovery_timer(obd);
} else {
- CERROR("%s: %d recoverable clients remain\n",
+ CWARN("%s: %d recoverable clients remain\n",
obd->obd_name, obd->obd_recoverable_clients);
+ wake_up(&obd->obd_next_transno_waitq);
}
return 1;
* has finished. Note that if the ACK does arrive, its
* callback wakes us in short order. --eeb */
lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
- rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
+ rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack,
&lwi);
CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
/* NB go back and test rq_want_ack with locking, to ensure
wait_queue_t commit_wait;
struct obd_device *obd =
req->rq_export ? req->rq_export->exp_obd : NULL;
- struct obd_export *exp =
- (req->rq_export && req->rq_ack_locks[0].mode) ?
- req->rq_export : NULL;
+ struct obd_export *exp = NULL;
+
+ if (req->rq_export) {
+ for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
+ if (req->rq_ack_locks[i].mode) {
+ exp = req->rq_export;
+ break;
+ }
+ }
+ }
if (exp) {
exp->exp_outstanding_reply = req;
} else {
obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
DEBUG_REQ(D_ERROR, req, "dropping reply");
- if (!exp && req->rq_repmsg) {
+ if (req->rq_repmsg) {
OBD_FREE(req->rq_repmsg, req->rq_replen);
req->rq_repmsg = NULL;
}
- init_waitqueue_head(&req->rq_wait_for_rep);
+ init_waitqueue_head(&req->rq_reply_waitq);
netrc = 0;
}
init_waitqueue_entry(&commit_wait, current);
add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
- rc = l_wait_event(req->rq_wait_for_rep,
+ rc = l_wait_event(req->rq_reply_waitq,
!req->rq_want_ack || req->rq_resent ||
req->rq_transno <= obd->obd_last_committed, &lwi);
remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
exp->exp_outstanding_reply = NULL;
- for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
+ for (ack_lock = req->rq_ack_locks, i = 0;
+ i < REQ_MAX_ACK_LOCKS; i++, ack_lock++) {
if (!ack_lock->mode)
- break;
+ continue;
ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
}
}
int target_handle_ping(struct ptlrpc_request *req)
{
- return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ return lustre_pack_reply(req, 0, NULL, NULL);
+}
+
+void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
+ struct lustre_handle *lock, int mode)
+{
+ int i;
+
+ for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
+ if (req->rq_ack_locks[i].mode)
+ continue;
+ memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
+ req->rq_ack_locks[i].mode = mode;
+ return &req->rq_ack_locks[i];
+ }
+ CERROR("no space for lock in struct ptlrpc_request\n");
+ LBUG();
+ return NULL;
}
+