upcoming locking work.
Highlights:
- lock replay infrastructure (needs much more testing, but didn't regress
anything outside recovery)
- b=421: ldlm iterators (usage sketch below)
- b=348: imports now have service levels, replacing connections' c_level
- replace c_delayed_head with imp_delayed_list
- split imp_request_list into imp_replay_list for retained requests and
imp_sending_list for inflight reqs
- as a side-effect, materially clean up the rq_refcount story
- client-side recovery is now dispatched via a per-import handler function,
for better layering and modularity (handler skeleton below)
- wire imports up to recovery before attempting mounts, for better handling of
mount-time failures
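
A minimal sketch of how the new iterator API is meant to be used (the
counting callback and count_granted_locks() are made up for illustration;
ldlm_replay_locks() uses the same pattern to chain every lock onto a local
list via l_pending_chain):

    static int count_granted(struct ldlm_lock *lock, void *closure)
    {
            int *count = closure;

            /* count a lock once its granted mode has caught up with the
             * requested mode */
            if (lock->l_granted_mode == lock->l_req_mode)
                    (*count)++;
            return LDLM_ITER_CONTINUE;      /* keep walking */
    }

    static int count_granted_locks(struct ldlm_namespace *ns)
    {
            int granted = 0;

            ldlm_namespace_foreach(ns, count_granted, &granted);
            return granted;
    }

Returning LDLM_ITER_STOP from the callback ends the walk early; both
ldlm_resource_foreach() and ldlm_namespace_foreach() take the namespace
lock around the walk.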
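The per-import recovery dispatch works like this: recovd drives ll_recover(),
which bumps every import on the connection to LUSTRE_CONN_RECOVD during the
PREPARE phase and then calls imp->imp_recover(imp, phase). A skeleton handler,
following the shape of mdc_recover/osc_recover below (the "foo" names are
hypothetical):

    static int foo_recover(struct obd_import *imp, int phase)
    {
            switch (phase) {
            case PTLRPC_RECOVD_PHASE_PREPARE:
                    /* invalidate cached state, fence off new requests */
                    return 0;
            case PTLRPC_RECOVD_PHASE_RECOVER:
                    /* reconnect, replay retained requests and locks, then
                     * resend whatever was still in flight */
                    return 0;
            default:
                    return -EINVAL;
            }
    }

    static int foo_connect(struct lustre_handle *conn, struct obd_device *obd,
                           obd_uuid_t cluuid, struct recovd_obd *recovd,
                           ptlrpc_recovery_cb_t recover)
    {
            /* register the handler before the first connect RPC goes out */
            obd->u.cli.cl_import.imp_recover = foo_recover;
            return client_obd_connect(conn, obd, cluuid, recovd, recover);
    }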
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
- ldlm_completion_callback l_completion_ast;
- ldlm_blocking_callback l_blocking_ast;
+ ldlm_completion_callback l_completion_ast;
+ ldlm_blocking_callback l_blocking_ast;
struct obd_export *l_export;
struct lustre_handle *l_connh;
__u32 l_flags;
- struct lustre_handle l_remote_handle;
+ struct lustre_handle l_remote_handle;
void *l_data;
__u32 l_data_len;
struct ldlm_extent l_extent;
#define LDLM_DEBUG_NOLOCK(format, a...) \
CDEBUG(D_DLMTRACE, "### " format "\n" , ## a)
+/*
+ * Iterators.
+ */
+
+#define LDLM_ITER_CONTINUE 0 /* keep iterating */
+#define LDLM_ITER_STOP 1 /* stop iterating */
+
+typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *);
+
+int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
+ void *closure);
+int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
+ void *closure);
+
+int ldlm_replay_locks(struct obd_import *imp);
+
/* ldlm_extent.c */
int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, int flags,
int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc);
-int ptlrpc_replay(struct obd_import *imp, int unreplied_only);
-
+int ptlrpc_replay(struct obd_import *imp, int send_last_flag);
+int ptlrpc_resend(struct obd_import *imp);
+void ptlrpc_free_committed(struct obd_import *imp);
+void ptlrpc_wake_delayed(struct obd_import *imp);
#endif
#define IMP_INVALID 1
#define IMP_REPLAYABLE 2
+typedef int (*import_recover_t)(struct obd_import *imp, int phase);
+
#include <linux/lustre_idl.h>
struct obd_import {
+ import_recover_t imp_recover;
struct ptlrpc_connection *imp_connection;
struct ptlrpc_client *imp_client;
struct lustre_handle imp_handle;
struct list_head imp_chain;
- struct list_head imp_request_list;
+
+ /* Lists of requests that are retained for replay, waiting for a reply,
+ * or waiting for recovery to complete, respectively.
+ */
+ struct list_head imp_replay_list;
+ struct list_head imp_sending_list;
+ struct list_head imp_delayed_list;
+
struct obd_device *imp_obd;
int imp_flags;
int imp_level;
__u64 imp_peer_last_xid;
__u64 imp_peer_committed_transno;
- /* Protects flags, level, *_xid, request_list */
+ /* Protects flags, level, *_xid, *_list */
spinlock_t imp_lock;
};
__u8 c_local_uuid[37]; /* XXX do we need this? */
__u8 c_remote_uuid[37];
- int c_level;
__u32 c_generation; /* changes upon new connection */
__u32 c_epoch; /* changes when peer changes */
__u32 c_bootcount; /* peer's boot count */
ldlm_add_waiting_lock(lock);
(void)ptl_send_rpc(req);
- /* no commit, and no waiting for reply, so 2x decref now */
- ptlrpc_req_finished(req);
+ /* not waiting for reply */
ptlrpc_req_finished(req);
RETURN(rc);
req->rq_replen = 0; /* no reply needed */
(void)ptl_send_rpc(req);
- /* no commit, and no waiting for reply, so 2x decref now */
- ptlrpc_req_finished(req);
+
+ /* not waiting for reply */
ptlrpc_req_finished(req);
RETURN(rc);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
+EXPORT_SYMBOL(ldlm_replay_locks);
+EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);
return ELDLM_OK;
}
+
+/* Lock iterators. */
+
+int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
+ void *closure)
+{
+ struct list_head *tmp, *next;
+ struct ldlm_lock *lock;
+ int rc = LDLM_ITER_CONTINUE;
+        struct ldlm_namespace *ns;
+
+        ENTRY;
+
+        if (!res)
+                RETURN(LDLM_ITER_CONTINUE);
+
+        /* only dereference res once we know it's non-NULL */
+        ns = res->lr_namespace;
+        l_lock(&ns->ns_lock);
+ list_for_each_safe(tmp, next, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+
+ list_for_each_safe(tmp, next, &res->lr_converting) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+
+ list_for_each_safe(tmp, next, &res->lr_waiting) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+ out:
+ l_unlock(&ns->ns_lock);
+ RETURN(rc);
+}
+
+struct iter_helper_data {
+ ldlm_iterator_t iter;
+ void *closure;
+};
+
+static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
+{
+ struct iter_helper_data *helper = closure;
+ return helper->iter(lock, helper->closure);
+}
+
+int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
+ void *closure)
+{
+ int i, rc = LDLM_ITER_CONTINUE;
+ struct iter_helper_data helper = { iter: iter, closure: closure };
+
+ l_lock(&ns->ns_lock);
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ struct list_head *tmp, *next;
+ list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+ struct ldlm_resource *res =
+ list_entry(tmp, struct ldlm_resource, lr_hash);
+
+ ldlm_resource_getref(res);
+ rc = ldlm_resource_foreach(res, ldlm_iter_helper,
+ &helper);
+ ldlm_resource_put(res);
+ if (rc == LDLM_ITER_STOP)
+ GOTO(out, rc);
+ }
+ }
+ out:
+ l_unlock(&ns->ns_lock);
+ RETURN(rc);
+}
+
+/* Lock replay */
+
+static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
+{
+ struct list_head *list = closure;
+
+ /* we use l_pending_chain here, because it's unused on clients. */
+ list_add(&lock->l_pending_chain, list);
+ return LDLM_ITER_CONTINUE;
+}
+
+static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock,
+ int last)
+{
+ struct ptlrpc_request *req;
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ int rc, size;
+ int flags = LDLM_FL_REPLAY;
+
+ flags |= lock->l_flags &
+ (LDLM_FL_BLOCK_GRANTED|LDLM_FL_BLOCK_CONV|LDLM_FL_BLOCK_WAIT);
+
+ size = sizeof(*body);
+ req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ ldlm_lock2desc(lock, &body->lock_desc);
+ body->lock_flags = flags;
+
+ ldlm_lock2handle(lock, &body->lock_handle1);
+ size = sizeof(*reply);
+ req->rq_replen = lustre_msg_size(1, &size);
+
+ if (last)
+ req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+
+ LDLM_DEBUG(lock, "replaying lock:");
+ rc = ptlrpc_queue_wait(req);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&lock->l_remote_handle, &reply->lock_handle,
+ sizeof(lock->l_remote_handle));
+ LDLM_DEBUG(lock, "replayed lock:");
+ out:
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+ struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
+ struct list_head list, *pos, *next;
+ struct ldlm_lock *lock;
+ int rc = 0;
+
+ ENTRY;
+ INIT_LIST_HEAD(&list);
+
+ l_lock(&ns->ns_lock);
+ (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
+
+ list_for_each_safe(pos, next, &list) {
+ lock = list_entry(pos, struct ldlm_lock, l_pending_chain);
+ rc = replay_one_lock(imp, lock, (next == &list));
+ if (rc)
+ break; /* or try to do the rest? */
+ }
+ l_unlock(&ns->ns_lock);
+ RETURN(rc);
+}
if (!imp->imp_connection)
RETURN(-ENOENT);
- INIT_LIST_HEAD(&imp->imp_request_list);
+ INIT_LIST_HEAD(&imp->imp_replay_list);
+ INIT_LIST_HEAD(&imp->imp_sending_list);
+ INIT_LIST_HEAD(&imp->imp_delayed_list);
spin_lock_init(&imp->imp_lock);
ptlrpc_init_client(rq_portal, rp_portal, name,
request->rq_reqmsg->cookie = conn->cookie;
c = class_conn2export(conn)->exp_connection =
ptlrpc_connection_addref(request->rq_connection);
+ list_add(&imp->imp_chain, &c->c_imports);
recovd_conn_manage(c, recovd, recover);
+ imp->imp_level = LUSTRE_CONN_CON;
rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out_req, rc);
if (rq_opc == MDS_CONNECT)
imp->imp_flags |= IMP_REPLAYABLE;
- list_add(&imp->imp_chain, &c->c_imports);
- c->c_level = LUSTRE_CONN_FULL;
+ imp->imp_level = LUSTRE_CONN_FULL;
imp->imp_handle.addr = request->rq_repmsg->addr;
imp->imp_handle.cookie = request->rq_repmsg->cookie;
dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
dlmimp->imp_obd = /* LDLM! */ NULL;
spin_lock_init(&dlmimp->imp_lock);
-
- req->rq_connection->c_level = LUSTRE_CONN_FULL;
+ dlmimp->imp_level = LUSTRE_CONN_FULL;
out:
req->rq_status = rc;
RETURN(rc);
{
ENTRY;
- conn->c_level = LUSTRE_CONN_RECOVD;
conn->c_recovd_data.rd_phase = RD_PREPARED;
RETURN(0);
#include <linux/lustre_ha.h>
#include <linux/lustre_dlm.h>
#include <linux/lustre_idl.h>
-#include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
static int ll_retry_recovery(struct ptlrpc_connection *conn)
{
RETURN(0);
}
-/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
-static void abort_inflight_for_import(struct obd_import *imp)
-{
- struct list_head *tmp, *n;
-
- /* Make sure that no new requests get processed for this import.
- * ptlrpc_queue_wait must (and does) hold c_lock while testing this
- * flags and then putting requests on sending_head or delayed_head.
- */
- spin_lock(&imp->imp_connection->c_lock);
- imp->imp_flags |= IMP_INVALID;
- spin_unlock(&imp->imp_connection->c_lock);
-
- list_for_each_safe(tmp, n, &imp->imp_request_list) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_list);
-
- if (req->rq_flags & PTL_RPC_FL_REPLIED) {
- /* no need to replay, just discard */
- DEBUG_REQ(D_ERROR, req, "uncommitted");
- ptlrpc_req_finished(req);
- } else {
- DEBUG_REQ(D_ERROR, req, "inflight");
- req->rq_flags |= PTL_RPC_FL_ERR;
- wake_up(&req->rq_wait_for_rep);
- }
- }
-
- list_for_each_safe(tmp, n, &imp->imp_connection->c_delayed_head) {
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_list);
-
- if (req->rq_import != imp)
- continue;
-
- DEBUG_REQ(D_ERROR, req, "aborting waiting req");
- req->rq_flags |= PTL_RPC_FL_ERR;
- wake_up(&req->rq_wait_for_rep);
- }
-}
-
-static void set_osc_active(struct obd_import *imp, int active)
-{
- struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
-
- if (notify_obd == NULL)
- return;
-
- /* How gross is _this_? */
- if (!list_empty(¬ify_obd->obd_exports)) {
- int rc;
- struct lustre_handle fakeconn;
- struct obd_ioctl_data ioc_data;
- struct obd_export *exp =
- list_entry(notify_obd->obd_exports.next,
- struct obd_export, exp_obd_chain);
-
- fakeconn.addr = (__u64)(unsigned long)exp;
- fakeconn.cookie = exp->exp_cookie;
- ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
- ioc_data.ioc_offset = active;
- rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
- sizeof ioc_data, &ioc_data, NULL);
- if (rc)
- CERROR("disabling %s on LOV %p/%s: %d\n",
- imp->imp_obd->obd_uuid, notify_obd,
- notify_obd->obd_uuid, rc);
- } else {
- CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
- "%p\n", notify_obd, notify_obd->obd_uuid,
- imp->imp_obd->obd_uuid);
- }
-}
-
-static void prepare_osc(struct obd_import *imp)
-{
- struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-
- CDEBUG(D_HA, "invalidating all locks for OST imp %p (to %s):\n",
- imp, imp->imp_connection->c_remote_uuid);
- ldlm_namespace_dump(ns);
- ldlm_namespace_cleanup(ns, 1 /* no network ops */);
-
- abort_inflight_for_import(imp);
-
- set_osc_active(imp, 0 /* inactive */);
-}
-
-static void prepare_mdc(struct obd_import *imp)
-{
- struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
- ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
-}
-
-static int ll_prepare_recovery(struct ptlrpc_connection *conn)
-{
- struct list_head *tmp;
-
- list_for_each(tmp, &conn->c_imports) {
- struct obd_import *imp = list_entry(tmp, struct obd_import,
- imp_chain);
-
- if (imp->imp_obd->obd_type->typ_ops->o_brw)
- prepare_osc(imp);
- else
- prepare_mdc(imp);
- }
-
- return ptlrpc_run_recovery_upcall(conn);
-}
-
-static void reconnect_osc(struct obd_import *imp)
-{
- int rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
- if (rc == 0)
- set_osc_active(imp, 1 /* active */);
- else
- CDEBUG(D_HA, "reconnection failed, not reactivating OSC %s\n",
- imp->imp_obd->obd_uuid);
-}
-
-static void reconnect_mdc(struct obd_import *imp)
-{
- int rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
- if (!rc)
- ptlrpc_replay(imp, 0 /* all reqs */);
- else if (rc == EALREADY)
- ptlrpc_replay(imp, 1 /* only unreplied reqs */);
-}
-
-static int ll_reconnect(struct ptlrpc_connection *conn)
-{
- struct list_head *tmp;
-
- ENTRY;
- list_for_each(tmp, &conn->c_imports) {
- struct obd_import *imp = list_entry(tmp, struct obd_import,
- imp_chain);
- if (imp->imp_obd->obd_type->typ_ops->o_brw) {
- reconnect_osc(imp);
- } else {
- reconnect_mdc(imp);
- }
- }
-
- conn->c_level = LUSTRE_CONN_FULL;
- RETURN(0);
-}
-
int ll_recover(struct recovd_data *rd, int phase)
{
struct ptlrpc_connection *conn = class_rd2conn(rd);
+ struct list_head *tmp;
LASSERT(conn);
ENTRY;
switch (phase) {
case PTLRPC_RECOVD_PHASE_PREPARE:
- RETURN(ll_prepare_recovery(conn));
case PTLRPC_RECOVD_PHASE_RECOVER:
- RETURN(ll_reconnect(conn));
+ list_for_each(tmp, &conn->c_imports) {
+ struct obd_import *imp =
+ list_entry(tmp, struct obd_import, imp_chain);
+
+ if (phase == PTLRPC_RECOVD_PHASE_PREPARE) {
+ spin_lock(&imp->imp_lock);
+ imp->imp_level = LUSTRE_CONN_RECOVD;
+ spin_unlock(&imp->imp_lock);
+ }
+ imp->imp_recover(imp, phase);
+ }
+
+ if (phase == PTLRPC_RECOVD_PHASE_PREPARE)
+ RETURN(ptlrpc_run_recovery_upcall(conn));
+ RETURN(0);
+
case PTLRPC_RECOVD_PHASE_FAILURE:
RETURN(ll_retry_recovery(conn));
}
GOTO(out_free, sb = NULL);
}
-#warning Mike: is this the right place to raise the connection level?
mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection;
- mdc_conn->c_level = LUSTRE_CONN_FULL;
list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain);
obd = class_uuid2obd(osc);
GOTO(out_free, sb = NULL);
}
-#warning Peter: is this the right place to raise the connection level?
mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection;
- mdc_conn->c_level = LUSTRE_CONN_FULL;
list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain);
obd = class_uuid2obd(osc);
return rc;
}
-int mdc_attach(struct obd_device *dev, obd_count len, void *data)
+
+static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
{
return lprocfs_reg_obd(dev, status_var_nm_1, dev);
}
-int mdc_detach(struct obd_device *dev)
+static int mdc_detach(struct obd_device *dev)
{
return lprocfs_dereg_obd(dev);
}
+
+static int mdc_recover(struct obd_import *imp, int phase)
+{
+ int rc;
+ ENTRY;
+
+ switch(phase) {
+ case PTLRPC_RECOVD_PHASE_PREPARE:
+ ldlm_cli_cancel_unused(imp->imp_obd->obd_namespace,
+ NULL, LDLM_FL_LOCAL_ONLY);
+ RETURN(0);
+ case PTLRPC_RECOVD_PHASE_RECOVER:
+ rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
+ if (rc == EALREADY)
+ RETURN(ptlrpc_replay(imp, 0));
+ if (rc)
+ RETURN(rc);
+
+                rc = ptlrpc_replay(imp, 0 /* no last flag */);
+ if (rc)
+ RETURN(rc);
+
+ rc = ldlm_replay_locks(imp);
+ if (rc)
+ RETURN(rc);
+
+ spin_lock(&imp->imp_lock);
+ imp->imp_level = LUSTRE_CONN_FULL;
+ spin_unlock(&imp->imp_lock);
+
+ ptlrpc_wake_delayed(imp);
+
+ rc = ptlrpc_resend(imp);
+ if (rc)
+ RETURN(rc);
+
+ RETURN(0);
+ default:
+ RETURN(-EINVAL);
+ }
+}
+
+static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd,
+ obd_uuid_t cluuid, struct recovd_obd *recovd,
+ ptlrpc_recovery_cb_t recover)
+{
+ struct obd_import *imp = &obd->u.cli.cl_import;
+ imp->imp_recover = mdc_recover;
+ return client_obd_connect(conn, obd, cluuid, recovd, recover);
+}
+
struct obd_ops mdc_obd_ops = {
o_attach: mdc_attach,
o_detach: mdc_detach,
o_setup: client_obd_setup,
o_cleanup: client_obd_cleanup,
- o_connect: client_obd_connect,
+ o_connect: mdc_connect,
o_disconnect: client_obd_disconnect,
o_statfs: mdc_statfs,
};
#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
+#include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
#include <linux/ctype.h>
#include <linux/init.h>
#include <linux/lustre_ha.h>
extern struct lprocfs_vars status_var_nm_1[];
extern struct lprocfs_vars status_class_var[];
-int osc_attach(struct obd_device *dev, obd_count len, void *data)
+static int osc_attach(struct obd_device *dev, obd_count len, void *data)
{
return lprocfs_reg_obd(dev, status_var_nm_1, dev);
}
-int osc_detach(struct obd_device *dev)
+static int osc_detach(struct obd_device *dev)
{
return lprocfs_dereg_obd(dev);
}
return err;
}
+static void set_osc_active(struct obd_import *imp, int active)
+{
+ struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
+
+ if (notify_obd == NULL)
+ return;
+
+ /* How gross is _this_? */
+ if (!list_empty(¬ify_obd->obd_exports)) {
+ int rc;
+ struct lustre_handle fakeconn;
+ struct obd_ioctl_data ioc_data;
+ struct obd_export *exp =
+ list_entry(notify_obd->obd_exports.next,
+ struct obd_export, exp_obd_chain);
+
+ fakeconn.addr = (__u64)(unsigned long)exp;
+ fakeconn.cookie = exp->exp_cookie;
+ ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
+ ioc_data.ioc_offset = active;
+ rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
+ sizeof ioc_data, &ioc_data, NULL);
+ if (rc)
+ CERROR("disabling %s on LOV %p/%s: %d\n",
+ imp->imp_obd->obd_uuid, notify_obd,
+ notify_obd->obd_uuid, rc);
+ } else {
+ CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
+ "%p\n", notify_obd, notify_obd->obd_uuid,
+ imp->imp_obd->obd_uuid);
+ }
+}
+
+
+/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
+static void abort_inflight_for_import(struct obd_import *imp)
+{
+ struct list_head *tmp, *n;
+
+ /* Make sure that no new requests get processed for this import.
+ * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
+ * flag and then putting requests on sending_list or delayed_list.
+ */
+ spin_lock(&imp->imp_lock);
+ imp->imp_flags |= IMP_INVALID;
+ spin_unlock(&imp->imp_lock);
+
+ list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_list);
+
+ DEBUG_REQ(D_HA, req, "inflight");
+ req->rq_flags |= PTL_RPC_FL_ERR;
+ wake_up(&req->rq_wait_for_rep);
+ }
+
+ list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_list);
+
+ DEBUG_REQ(D_HA, req, "aborting waiting req");
+ req->rq_flags |= PTL_RPC_FL_ERR;
+ wake_up(&req->rq_wait_for_rep);
+ }
+}
+
+static int osc_recover(struct obd_import *imp, int phase)
+{
+ int rc;
+ ENTRY;
+
+ switch(phase) {
+ case PTLRPC_RECOVD_PHASE_PREPARE: {
+ struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
+ ldlm_namespace_cleanup(ns, 1 /* no network ops */);
+ abort_inflight_for_import(imp);
+ set_osc_active(imp, 0 /* inactive */);
+ RETURN(0);
+ }
+ case PTLRPC_RECOVD_PHASE_RECOVER:
+ rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
+ if (rc)
+ RETURN(rc);
+ set_osc_active(imp, 1 /* active */);
+ RETURN(0);
+ default:
+ RETURN(-EINVAL);
+ }
+}
+
+static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
+ obd_uuid_t cluuid, struct recovd_obd *recovd,
+ ptlrpc_recovery_cb_t recover)
+{
+ struct obd_import *imp = &obd->u.cli.cl_import;
+ imp->imp_recover = osc_recover;
+ return client_obd_connect(conn, obd, cluuid, recovd, recover);
+}
+
struct obd_ops osc_obd_ops = {
o_attach: osc_attach,
o_detach: osc_detach,
o_setup: client_obd_setup,
o_cleanup: client_obd_cleanup,
- o_connect: client_obd_connect,
+ o_connect: osc_connect,
o_disconnect: client_obd_disconnect,
o_statfs: osc_statfs,
o_packmd: osc_packmd,
CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n",
desc->bd_page_count, desc->bd_connection->c_remote_uuid,
desc->bd_portal, desc->bd_connection);
- desc->bd_connection->c_level = LUSTRE_CONN_RECOVD;
/* This one will "never" arrive, don't wait for it. */
if (atomic_dec_and_test(&set->brw_refcount))
request->rq_connection = ptlrpc_connection_addref(conn);
INIT_LIST_HEAD(&request->rq_list);
- /*
- * This will be reduced once when the sender is finished (waiting for
- * reply, f.e.), and once when the request has been committed and is
- * removed from the to-be-committed list.
- *
- * Also, the refcount will be increased in ptl_send_rpc immediately
- * before we hand it off to portals, and there will be a corresponding
- * decrease in request_out_cb (which is called to indicate that portals
- * is finished with the request, and it can be safely freed).
- *
- * (Except in the DLM server case, where it will be dropped twice
- * by the sender, and then the last time by request_out_callback.)
- */
- atomic_set(&request->rq_refcount, 2);
+ atomic_set(&request->rq_refcount, 1);
spin_lock(&imp->imp_lock);
request->rq_xid = HTON__u32(++imp->imp_last_xid);
{
int rc = 0;
+ ENTRY;
if (req->rq_repmsg != NULL) {
- struct obd_import *imp = req->rq_import;
- struct ptlrpc_connection *conn = imp->imp_connection;
- ENTRY;
- if (req->rq_level > conn->c_level) {
- DEBUG_REQ(D_HA, req,
- "recovery started, ignoring (%d > %d)",
- req->rq_level, conn->c_level);
- req->rq_repmsg = NULL;
- GOTO(out, rc = 0);
- }
req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
- spin_lock(&imp->imp_lock);
- if (req->rq_transno > imp->imp_max_transno) {
- imp->imp_max_transno = req->rq_transno;
- } else if (req->rq_transno != 0) {
- if (conn->c_level == LUSTRE_CONN_FULL) {
- CERROR("got transno "LPD64" after "
- LPD64": recovery may not work\n",
- req->rq_transno, imp->imp_max_transno);
- }
- }
- spin_unlock(&imp->imp_lock);
req->rq_flags |= PTL_RPC_FL_REPLIED;
GOTO(out, rc = 1);
}
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
- list_for_each_safe(tmp, saved, &imp->imp_request_list) {
+ list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
continue;
}
- /* If neither replied-to nor restarted, keep it. */
- if (!(req->rq_flags &
- (PTL_RPC_FL_REPLIED | PTL_RPC_FL_RESTART))) {
- DEBUG_REQ(D_HA, req, "keeping (in-flight)");
- continue;
- }
-
- /* This needs to match the commit test in ptlrpc_queue_wait() */
- if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
- req->rq_transno == 0) {
- DEBUG_REQ(D_HA, req, "keeping (queue_wait will free)");
- continue;
- }
-
/* not yet committed */
if (req->rq_transno > imp->imp_peer_committed_transno)
break;
LASSERT(conn);
spin_lock(&imp->imp_lock);
- list_for_each_safe(tmp, saved, &imp->imp_request_list) {
+ list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
/* XXX we should make sure that nobody's sleeping on these! */
RETURN(1);
req->rq_timeout = 0;
- req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
recovd_conn_fail(req->rq_import->imp_connection);
+#if 0
/* If this request is for recovery or other primordial tasks,
* don't go back to sleep.
*/
if (req->rq_level < LUSTRE_CONN_FULL)
RETURN(1);
+#endif
RETURN(0);
}
RETURN(1); /* ignored, as of this writing */
}
-/* If we're being torn down by umount -f, or the import has been
- * invalidated (such as by an OST failure), the request must fail with
- * -EIO.
+/* If the import has been invalidated (such as by an OST failure), the
+ * request must fail with -EIO.
*
- * Must be called with conn->c_lock held, will drop it if it returns -EIO.
- *
- * XXX this should just be testing the import, and umount_begin shouldn't touch
- * XXX the connection.
+ * Must be called with imp_lock held, will drop it if it returns -EIO.
*/
-#define EIO_IF_INVALID(conn, req) \
-if ((conn->c_flags & CONN_INVALID) || \
- (req->rq_import->imp_flags & IMP_INVALID)) { \
- DEBUG_REQ(D_ERROR, req, "%s_INVALID:", \
- (conn->c_flags & CONN_INVALID) ? "CONN" : "IMP"); \
- spin_unlock(&conn->c_lock); \
+#define EIO_IF_INVALID(req) \
+if (req->rq_import->imp_flags & IMP_INVALID) { \
+ DEBUG_REQ(D_ERROR, req, "IMP_INVALID:"); \
+ spin_unlock(&imp->imp_lock); \
RETURN(-EIO); \
}
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
- req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */
+
+ /* for distributed debugging */
+ req->rq_reqmsg->status = HTON__u32(current->pid);
CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));
-
- /* XXX probably both an import and connection level are needed */
- if (req->rq_level > conn->c_level) {
- spin_lock(&conn->c_lock);
- EIO_IF_INVALID(conn, req);
+ if (req->rq_level > imp->imp_level) {
+ spin_lock(&imp->imp_lock);
+ EIO_IF_INVALID(req);
list_del(&req->rq_list);
- list_add_tail(&req->rq_list, &conn->c_delayed_head);
- spin_unlock(&conn->c_lock);
+ list_add_tail(&req->rq_list, &imp->imp_delayed_list);
+ spin_unlock(&imp->imp_lock);
- DEBUG_REQ(D_HA, req, "waiting for recovery: (%d < %d)",
- req->rq_level, conn->c_level);
+                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
+                          current->comm, req->rq_level, imp->imp_level);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(req->rq_wait_for_rep,
- (req->rq_level <= conn->c_level) ||
+ (req->rq_level <= imp->imp_level) ||
(req->rq_flags & PTL_RPC_FL_ERR), &lwi);
- spin_lock(&conn->c_lock);
+ spin_lock(&imp->imp_lock);
list_del_init(&req->rq_list);
- spin_unlock(&conn->c_lock);
+ spin_unlock(&imp->imp_lock);
if (req->rq_flags & PTL_RPC_FL_ERR)
RETURN(-EIO);
}
resend:
req->rq_timeout = obd_timeout;
- spin_lock(&conn->c_lock);
- EIO_IF_INVALID(conn, req);
+ spin_lock(&imp->imp_lock);
+ EIO_IF_INVALID(req);
- list_del(&req->rq_list);
- list_add_tail(&req->rq_list, &imp->imp_request_list);
- spin_unlock(&conn->c_lock);
+ LASSERT(list_empty(&req->rq_list));
+ list_add_tail(&req->rq_list, &imp->imp_sending_list);
+ spin_unlock(&imp->imp_lock);
rc = ptl_send_rpc(req);
if (rc) {
CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
DEBUG_REQ(D_NET, req, "-- done sleeping");
+ spin_lock(&imp->imp_lock);
+ list_del_init(&req->rq_list);
+ spin_unlock(&imp->imp_lock);
+
if (req->rq_flags & PTL_RPC_FL_ERR) {
ptlrpc_abort(req);
GOTO(out, rc = -EIO);
goto resend;
}
- // up(&cli->cli_rpc_sem);
if (req->rq_flags & PTL_RPC_FL_INTR) {
if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
LBUG(); /* should only be interrupted if we timed out */
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
req->rq_replen, req->rq_repmsg->status);
- spin_lock(&conn->c_lock);
- /* Requests that aren't from replayable imports, or which don't have
- * transno information, can be "committed" early.
- *
- * But don't commit anything that's kept indefinitely for replay (has
- * the PTL_RPC_FL_REPLAY flag set), such as open requests.
- *
- * This needs to match the commit test in ptlrpc_free_committed().
- */
- if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
- (req->rq_repmsg->transno == 0 &&
- (req->rq_flags & PTL_RPC_FL_REPLAY) == 0)) {
- /* This import doesn't support replay, so we can just "commit"
- * this request now.
- */
- DEBUG_REQ(D_HA, req, "not replayable, committing:");
- list_del_init(&req->rq_list);
- __ptlrpc_req_finished(req, 1);
- }
if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
+ spin_lock(&imp->imp_lock);
+ if (req->rq_flags & PTL_RPC_FL_REPLAY || req->rq_transno != 0) {
+ /* Balanced in ptlrpc_free_committed, usually. */
+ atomic_inc(&req->rq_refcount);
+ list_add_tail(&req->rq_list, &imp->imp_replay_list);
+ }
+
+ if (req->rq_transno > imp->imp_max_transno) {
+ imp->imp_max_transno = req->rq_transno;
+ } else if (req->rq_transno != 0 &&
+ imp->imp_level == LUSTRE_CONN_FULL) {
+ CERROR("got transno "LPD64" after "LPD64": recovery "
+ "may not work\n", req->rq_transno,
+ imp->imp_max_transno);
+ }
+
/* Replay-enabled imports return commit-status information. */
imp->imp_peer_last_xid = req->rq_repmsg->last_xid;
- imp->imp_peer_committed_transno =
+ imp->imp_peer_committed_transno =
req->rq_repmsg->last_committed;
+ spin_unlock(&imp->imp_lock);
ptlrpc_free_committed(imp);
}
rc = ptlrpc_check_status(req);
- spin_unlock(&conn->c_lock);
EXIT;
out:
if (c == NULL)
GOTO(out, c);
- c->c_level = LUSTRE_CONN_NEW;
c->c_generation = 1;
c->c_epoch = 1;
c->c_bootcount = 0;
int rc;
ENTRY;
- conn->c_level = LUSTRE_CONN_RECOVD;
-
argv[0] = obd_recovery_upcall;
argv[1] = conn->c_remote_uuid;
argv[2] = NULL;
RETURN(0);
}
-#define REPLAY_COMMITTED 0 /* Fully processed (commit + reply). */
-#define REPLAY_REPLAY 1 /* Forced-replay (e.g. open). */
-#define REPLAY_RESEND 2 /* Resend required. */
-#define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it). */
-#define REPLAY_RESTART 4 /* Have to restart the call, sorry! */
-
-static int replay_state(struct ptlrpc_request *req, __u64 committed)
+int ptlrpc_replay(struct obd_import *imp, int send_last_flag)
{
- /* This request must always be replayed. */
- if (req->rq_flags & PTL_RPC_FL_REPLAY)
- return REPLAY_REPLAY;
+ int rc = 0;
+ struct list_head *tmp, *pos;
+ struct ptlrpc_request *req;
+ __u64 committed = imp->imp_peer_committed_transno;
+ ENTRY;
- /* Uncommitted request */
- if (req->rq_transno > committed) {
- if (req->rq_flags & PTL_RPC_FL_REPLIED) {
- /* Saw reply, so resend and ignore new reply. */
- return REPLAY_RESEND_IGNORE;
- }
+ /* It might have committed some after we last spoke, so make sure we
+ * get rid of them now.
+ */
+ ptlrpc_free_committed(imp);
- /* Didn't see reply either, so resend. */
- return REPLAY_RESEND;
+ spin_lock(&imp->imp_lock);
+
+ CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
+ imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
+
+ list_for_each(tmp, &imp->imp_replay_list) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ DEBUG_REQ(D_HA, req, "RETAINED: ");
}
- /* This request has been committed and we saw the reply. Goodbye! */
- if (req->rq_flags & PTL_RPC_FL_REPLIED)
- return REPLAY_COMMITTED;
+ list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+
+ if (req->rq_transno == imp->imp_max_transno &&
+ send_last_flag) {
+ req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+ DEBUG_REQ(D_HA, req, "LAST_REPLAY:");
+ } else {
+ DEBUG_REQ(D_HA, req, "REPLAY:");
+ }
+
+ rc = ptlrpc_replay_req(req);
+ req->rq_reqmsg->flags &= ~MSG_LAST_REPLAY;
- /* Request committed, but we didn't see the reply: have to restart. */
- return REPLAY_RESTART;
+ if (rc) {
+ CERROR("recovery replay error %d for req %Ld\n",
+ rc, req->rq_xid);
+ GOTO(out, rc);
+ }
+ }
+
+ out:
+ spin_unlock(&imp->imp_lock);
+        RETURN(rc);
}
-static char *replay_state2str(int state) {
- static char *state_strings[] = {
- "COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART",
- };
- static char *unknown_state = "UNKNOWN";
+#define NO_RESEND 0 /* No action required. */
+#define RESEND 1 /* Resend required. */
+#define RESEND_IGNORE 2 /* Resend, ignore the reply (already saw it). */
+#define RESTART 3 /* Have to restart the call, sorry! */
+
+static int resend_type(struct ptlrpc_request *req, __u64 committed)
+{
+ if (req->rq_transno < committed) {
+ if (req->rq_flags & PTL_RPC_FL_REPLIED) {
+ /* Saw the reply and it was committed, no biggie. */
+ DEBUG_REQ(D_HA, req, "NO_RESEND");
+ return NO_RESEND;
+ }
+ /* Request committed, but no reply: have to restart. */
+ return RESTART;
+ }
- if (state < 0 ||
- state > (sizeof(state_strings) / sizeof(state_strings[0]))) {
- return unknown_state;
+ if (req->rq_flags & PTL_RPC_FL_REPLIED) {
+ /* Saw reply, so resend and ignore new reply. */
+ return RESEND_IGNORE;
}
- return state_strings[state];
+ /* Didn't see reply either, so resend. */
+ return RESEND;
+
}
-int ptlrpc_replay(struct obd_import *imp, int unreplied_only)
+int ptlrpc_resend(struct obd_import *imp)
{
- int rc = 0, state;
+        int rc = 0;
struct list_head *tmp, *pos;
struct ptlrpc_request *req;
- struct ptlrpc_connection *conn = imp->imp_connection;
__u64 committed = imp->imp_peer_committed_transno;
+
ENTRY;
spin_lock(&imp->imp_lock);
-
- CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
- imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
-
- list_for_each(tmp, &imp->imp_request_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- state = replay_state(req, committed);
- DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state));
- }
-
- list_for_each(tmp, &conn->c_delayed_head) {
+ list_for_each(tmp, &imp->imp_sending_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- state = replay_state(req, committed);
- DEBUG_REQ(D_HA, req, "DELAYED: %s: ", replay_state2str(state));
+ DEBUG_REQ(D_HA, req, "SENDING: ");
}
- list_for_each_safe(tmp, pos, &imp->imp_request_list) {
+ list_for_each_safe(tmp, pos, &imp->imp_sending_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
-
- if (unreplied_only) {
- if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
- DEBUG_REQ(D_HA, req, "UNREPLIED:");
- ptlrpc_restart_req(req);
- }
- continue;
- }
-
- state = replay_state(req, committed);
-
- if (req->rq_transno == imp->imp_max_transno) {
- req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
- DEBUG_REQ(D_HA, req, "last for replay");
- LASSERT(state != REPLAY_COMMITTED);
- }
-
- switch (state) {
- case REPLAY_REPLAY:
- DEBUG_REQ(D_HA, req, "REPLAY:");
- rc = ptlrpc_replay_req(req);
-#if 0
-#error We should not hold a spinlock over such a lengthy operation.
-#error If necessary, drop spinlock, do operation, re-get spinlock, restart loop.
-#error If we need to avoid re-processint items, then delete them from the list
-#error as they are replayed and re-add at the tail of this list, so the next
-#error item to process will always be at the head of the list.
-#endif
- if (rc) {
- CERROR("recovery replay error %d for req %Ld\n",
- rc, req->rq_xid);
- GOTO(out, rc);
- }
+
+ switch(resend_type(req, committed)) {
+ case NO_RESEND:
break;
- case REPLAY_COMMITTED:
- DEBUG_REQ(D_ERROR, req, "COMMITTED:");
- /* XXX commit now? */
+ case RESTART:
+ DEBUG_REQ(D_HA, req, "RESTART:");
+ ptlrpc_restart_req(req);
break;
- case REPLAY_RESEND_IGNORE:
+ case RESEND_IGNORE:
DEBUG_REQ(D_HA, req, "RESEND_IGNORE:");
- rc = ptlrpc_replay_req(req);
+ rc = ptlrpc_replay_req(req);
if (rc) {
- CERROR("request resend error %d for req %Ld\n",
- rc, req->rq_xid);
- GOTO(out, rc);
+ DEBUG_REQ(D_ERROR, req, "error %d resending:",
+ rc);
+ ptlrpc_restart_req(req); /* might as well */
}
break;
- case REPLAY_RESTART:
- DEBUG_REQ(D_HA, req, "RESTART:");
- ptlrpc_restart_req(req);
- break;
-
- case REPLAY_RESEND:
+ case RESEND:
DEBUG_REQ(D_HA, req, "RESEND:");
ptlrpc_resend_req(req);
break;
-
+
default:
LBUG();
}
-
}
+
+        spin_unlock(&imp->imp_lock);
+        RETURN(rc);
+}
- conn->c_level = LUSTRE_CONN_FULL;
- recovd_conn_fixed(conn);
+void ptlrpc_wake_delayed(struct obd_import *imp)
+{
+ struct list_head *tmp, *pos;
+ struct ptlrpc_request *req;
- CERROR("recovery complete on conn %p(%s), waking delayed reqs\n",
- conn, conn->c_remote_uuid);
- /* Finally, continue processing requests that blocked for recovery. */
- list_for_each_safe(tmp, pos, &conn->c_delayed_head) {
+ spin_lock(&imp->imp_lock);
+ list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_HA, req, "WAKING: ");
- ptlrpc_continue_req(req);
+ DEBUG_REQ(D_HA, req, "waking:");
+ wake_up(&req->rq_wait_for_rep);
}
-
- EXIT;
- out:
- spin_unlock(&conn->c_lock);
- return rc;
+ spin_unlock(&imp->imp_lock);
}
EXPORT_SYMBOL(ptlrpc_run_recovery_upcall);
EXPORT_SYMBOL(ptlrpc_reconnect_import);
EXPORT_SYMBOL(ptlrpc_replay);
+EXPORT_SYMBOL(ptlrpc_resend);
+EXPORT_SYMBOL(ptlrpc_wake_delayed);
MODULE_AUTHOR("Cluster File Systems, Inc <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Request Processor v1.0");