+/* Export: server-side state kept per connected client; a lustre_handle's
+ * addr field points at one of these (see target_handle_connect). */
struct obd_export {
__u64 exp_cookie;
- struct lustre_handle exp_impconnh;
struct list_head exp_obd_chain;
struct list_head exp_conn_chain;
struct obd_device *exp_obd;
struct ptlrpc_connection *exp_connection;
- struct ldlm_export_data exp_ldlm_data; /* can this go inside u? */
+ struct ldlm_export_data exp_ldlm_data;
+ /* Per-target-type export data (mds/filter/lov) -- presumably selected
+ * by exp_obd's type; confirm. */
union {
struct mds_export_data eu_mds_data;
struct filter_export_data eu_filter_data;
struct lov_export_data eu_lov_data;
} u;
- void *exp_data; /* device specific data */
- int exp_desclen;
- char *exp_desc;
- obd_uuid_t exp_uuid;
};
#define exp_mds_data u.eu_mds_data
int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc);
-int ptlrpc_replay(struct ptlrpc_connection *conn);
+int ptlrpc_replay(struct obd_import *imp, int unreplied_only);
#endif
/* Flags that apply to all requests are in the bottom 16 bits */
#define MSG_GEN_FLAG_MASK 0x0000ffff
+#define MSG_LAST_REPLAY 1
static inline int lustre_msg_get_flags(struct lustre_msg *msg)
{
struct ptlrpc_client *imp_client;
struct lustre_handle imp_handle;
struct list_head imp_chain;
+ struct list_head imp_request_list;
struct obd_device *imp_obd;
int imp_flags;
- /* XXX need a UUID here, I think, unless we just use the OBD's UUID */
+ int imp_level;
+ __u64 imp_last_xid;
+ __u64 imp_max_transno;
+ __u64 imp_peer_last_xid;
+ __u64 imp_peer_committed_transno;
+
+ /* Protects flags, level, *_xid, request_list */
+ spinlock_t imp_lock;
};
extern struct obd_import *class_conn2cliimp(struct lustre_handle *);
if (condition) \
break; \
if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) {\
- CERROR("lwe: interrupt\n"); \
if (info->lwi_on_signal) \
info->lwi_on_signal(info->lwi_cb_data); \
ret = -EINTR; \
} \
if (info->lwi_timeout && !__timed_out) { \
if (schedule_timeout(info->lwi_timeout) == 0) { \
- CERROR("lwe: timeout\n"); \
__timed_out = 1; \
if (!info->lwi_on_timeout || \
info->lwi_on_timeout(info->lwi_cb_data)) { \
__state = TASK_INTERRUPTIBLE; \
/* Check for a pending interrupt. */ \
if (info->lwi_signals && l_killable_pending(current)) {\
- CERROR("lwe: pending interrupt\n"); \
if (info->lwi_on_signal) \
info->lwi_on_signal(info->lwi_cb_data); \
ret = -EINTR; \
void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
int repoff);
-extern int mds_client_add(struct mds_export_data *med, int cl_off);
-extern int mds_client_free(struct obd_export *exp);
+int mds_client_add(struct mds_obd *mds, struct mds_export_data *med,
+ int cl_off);
+int mds_client_free(struct obd_export *exp);
/* mds/mds_fs.c */
struct mds_fs_operations {
__u32 c_bootcount; /* peer's boot count */
spinlock_t c_lock; /* also protects req->rq_list */
- __u32 c_xid_in;
- __u32 c_xid_out;
atomic_t c_refcount;
__u64 c_token;
__u64 c_remote_conn;
__u64 c_remote_token;
- __u64 c_last_xid; /* protected by c_lock */
- __u64 c_last_committed;/* protected by c_lock */
- struct list_head c_delayed_head;/* delayed until post-recovery */
- struct list_head c_sending_head;/* protected by c_lock */
- struct list_head c_dying_head; /* protected by c_lock */
+ struct list_head c_delayed_head;/* delayed until post-recovery XXX imp? */
struct recovd_data c_recovd_data;
struct list_head c_imports;
#define PTL_RPC_FL_ERR (1 << 5)
#define PTL_RPC_FL_TIMEOUT (1 << 6)
#define PTL_RPC_FL_RESEND (1 << 7)
-#define PTL_RPC_FL_RECOVERY (1 << 8) /* retransmission for recovery */
+#define PTL_RPC_FL_RESTART (1 << 8) /* operation must be restarted */
#define PTL_RPC_FL_FINISHED (1 << 9)
#define PTL_RPC_FL_RETAIN (1 << 10) /* retain for replay after reply */
#define PTL_RPC_FL_REPLAY (1 << 11) /* replay upon recovery */
#define DEBUG_REQ(level, req, fmt, args...) \
do { \
CDEBUG(level, \
- "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d fl " \
+ "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d ref %d fl " \
"%x\n" , ## args, req->rq_xid, req->rq_transno, \
req->rq_reqmsg ? req->rq_reqmsg->opc : -1, \
req->rq_connection ? (char *)req->rq_connection->c_remote_uuid : "<?>", \
(req->rq_import && req->rq_import->imp_client) ? \
req->rq_import->imp_client->cli_request_portal : -1, \
- req->rq_reqlen, req->rq_replen, req->rq_flags); \
+ req->rq_reqlen, req->rq_replen, req->rq_refcount, req->rq_flags); \
} while (0)
struct ptlrpc_bulk_page {
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc);
void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page);
-int ptlrpc_check_status(struct ptlrpc_request *req, int err);
/* rpc/service.c */
struct ptlrpc_service *
#define IOC_OSC_MAX_NR 50
struct mds_obd {
- struct ptlrpc_service *mds_service;
-
- char *mds_fstype;
- struct super_block *mds_sb;
- struct super_operations *mds_sop;
- struct vfsmount *mds_vfsmnt;
- struct obd_run_ctxt mds_ctxt;
- struct file_operations *mds_fop;
- struct inode_operations *mds_iop;
+ struct ptlrpc_service *mds_service;
+
+ char *mds_fstype;
+ struct super_block *mds_sb;
+ struct super_operations *mds_sop;
+ struct vfsmount *mds_vfsmnt;
+ struct obd_run_ctxt mds_ctxt;
+ struct file_operations *mds_fop;
+ struct inode_operations *mds_iop;
struct address_space_operations *mds_aops;
- struct mds_fs_operations *mds_fsops;
- int mds_max_mdsize;
- struct file *mds_rcvd_filp;
- spinlock_t mds_last_lock;
- __u64 mds_last_committed;
- __u64 mds_last_rcvd;
- __u64 mds_mount_count;
- struct ll_fid mds_rootfid;
- struct mds_server_data *mds_server_data;
+ struct mds_fs_operations *mds_fsops;
+
+ int mds_max_mdsize;
+ struct file *mds_rcvd_filp;
+ struct semaphore mds_transno_sem; /* serializes transno assignment (mds_start/finish_transno) -- confirm */
+ __u64 mds_last_committed;
+ __u64 mds_last_rcvd;
+ __u64 mds_mount_count;
+ struct ll_fid mds_rootfid;
+ struct mds_server_data *mds_server_data;
+
+ /* Recovery state: queued client requests are re-executed in strict
+ * transno order by process_recovery_queue(). */
+ wait_queue_head_t mds_next_transno_waitq; /* woken when the awaited transno is queued */
+ __u64 mds_next_recovery_transno; /* transno recovery must process next */
+ int mds_recoverable_clients; /* clients left to replay; 0 => recovery done */
+ struct list_head mds_recovery_queue; /* replay requests, sorted by transno */
+ struct list_head mds_delayed_reply_queue; /* final replies held until all clients recover */
+ spinlock_t mds_processing_task_lock; /* protects mds_processing_task and the recovery queue */
+ pid_t mds_processing_task; /* pid of the task draining the queue, 0 if none */
};
struct ldlm_obd {
LASSERT(list_empty(&lock->l_pending_chain));
spin_lock_bh(&waiting_locks_spinlock);
- lock->l_callback_timeout = jiffies + (obd_timeout * HZ);
+ lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
timeout_rounded = round_timeout(lock->l_callback_timeout);
LDLM_DEBUG(lock, "sending request");
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (rc != ELDLM_OK) {
LASSERT(!is_replay);
req->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (rc != ELDLM_OK)
GOTO(out, rc);
req->rq_replen = lustre_msg_size(0, NULL);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
ptlrpc_req_finished(req);
if (rc != ELDLM_OK)
GOTO(out, rc);
int rq_portal, rp_portal;
char *name;
struct client_obd *cli = &obddev->u.cli;
+ struct obd_import *imp = &cli->cl_import;
obd_uuid_t server_uuid;
ENTRY;
memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
sizeof(server_uuid)));
- cli->cl_import.imp_connection = ptlrpc_uuid_to_connection(server_uuid);
- if (!cli->cl_import.imp_connection)
+ imp->imp_connection = ptlrpc_uuid_to_connection(server_uuid);
+ if (!imp->imp_connection)
RETURN(-ENOENT);
+
+ INIT_LIST_HEAD(&imp->imp_request_list);
+ spin_lock_init(&imp->imp_lock);
ptlrpc_init_client(rq_portal, rp_portal, name,
&obddev->obd_ldlm_client);
- cli->cl_import.imp_client = &obddev->obd_ldlm_client;
- cli->cl_import.imp_obd = obddev;
+ imp->imp_client = &obddev->obd_ldlm_client;
+ imp->imp_obd = obddev;
cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
char *tmp[] = {cli->cl_target_uuid, obd->obd_uuid};
int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT;
struct ptlrpc_connection *c;
+ struct obd_import *imp = &cli->cl_import;
ENTRY;
down(&cli->cl_sem);
if (obd->obd_namespace == NULL)
GOTO(out_disco, rc = -ENOMEM);
+ INIT_LIST_HEAD(&imp->imp_chain);
+ imp->imp_last_xid = 0;
+ imp->imp_max_transno = 0;
+ imp->imp_peer_last_xid = 0;
+ imp->imp_peer_committed_transno = 0;
+
request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 2, size, tmp);
if (!request)
GOTO(out_ldlm, rc = -ENOMEM);
recovd_conn_manage(c, recovd, recover);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out_req, rc);
if (rq_opc == MDS_CONNECT)
- cli->cl_import.imp_flags |= IMP_REPLAYABLE;
- list_add(&cli->cl_import.imp_chain, &c->c_imports);
+ imp->imp_flags |= IMP_REPLAYABLE;
+ list_add(&imp->imp_chain, &c->c_imports);
c->c_level = LUSTRE_CONN_FULL;
- cli->cl_import.imp_handle.addr = request->rq_repmsg->addr;
- cli->cl_import.imp_handle.cookie = request->rq_repmsg->cookie;
+ imp->imp_handle.addr = request->rq_repmsg->addr;
+ imp->imp_handle.cookie = request->rq_repmsg->cookie;
EXIT;
out_req:
out_ldlm:
ldlm_namespace_free(obd->obd_namespace);
obd->obd_namespace = NULL;
+ if (rq_opc == MDS_CONNECT) {
+ /* Don't class_disconnect OSCs, because the LOV
+ * cares about them even if they can't connect to the
+ * OST.
+ *
+ * This is leak-bait, but without either a way to
+ * operate on the osc without an export or separate
+ * methods for connect-to-osc and connect-osc-to-ost
+ * it's not clear what else to do.
+ */
out_disco:
- class_disconnect(conn);
- MOD_DEC_USE_COUNT;
+ cli->cl_conn_count--;
+ class_disconnect(conn);
+ MOD_DEC_USE_COUNT;
+ }
}
out_sem:
up(&cli->cl_sem);
ldlm_namespace_free(obd->obd_namespace);
obd->obd_namespace = NULL;
- request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL, NULL);
+ request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL,
+ NULL);
if (!request)
GOTO(out_disco, rc = -ENOMEM);
-
+
request->rq_replen = lustre_msg_size(0, NULL);
+ /* Process disconnects even if we're waiting for recovery. */
+ request->rq_level = LUSTRE_CONN_RECOVD;
+
rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out_req, rc);
rc = obd_connect(&conn, target, cluuid, ptlrpc_recovd,
target_revoke_connection);
- if (rc)
+ /* EALREADY indicates a reconnection, send the reply normally. */
+ if (rc && rc != EALREADY)
GOTO(out, rc);
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
dlmimp->imp_obd = /* LDLM! */ NULL;
+ spin_lock_init(&dlmimp->imp_lock);
req->rq_connection->c_level = LUSTRE_CONN_FULL;
out:
mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
S_IFREG, &fd->fd_mdshandle, &req);
out_req:
- ptlrpc_free_req(req);
+ ptlrpc_req_finished(req); /* once for reply */
+ ptlrpc_req_finished(req); /* once for an early "commit" */
//out_fd:
fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
kmem_cache_free(ll_file_data_slab, fd);
rc = -abs(rc2);
GOTO(out_fd, rc);
}
- CDEBUG(D_HA, "matched req %p xid "LPD64" transno "LPD64" op "
- "%d->%s:%d\n", fd->fd_req, fd->fd_req->rq_xid,
- fd->fd_req->rq_repmsg->transno, fd->fd_req->rq_reqmsg->opc,
- fd->fd_req->rq_import->imp_connection->c_remote_uuid,
- fd->fd_req->rq_import->imp_client->cli_request_portal);
+ DEBUG_REQ(D_HA, fd->fd_req, "matched open for this close: ");
ptlrpc_req_finished(fd->fd_req);
if (atomic_dec_and_test(&lli->lli_open_count)) {
imp->imp_flags |= IMP_INVALID;
spin_unlock(&imp->imp_connection->c_lock);
- list_for_each_safe(tmp, n, &imp->imp_connection->c_sending_head) {
+ list_for_each_safe(tmp, n, &imp->imp_request_list) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_list);
- if (req->rq_import != imp)
- continue;
-
if (req->rq_flags & PTL_RPC_FL_REPLIED) {
/* no need to replay, just discard */
- CERROR("uncommitted req xid "LPD64" op %d to OST %s\n",
- (unsigned long long)req->rq_xid,
- req->rq_reqmsg->opc,
- imp->imp_obd->u.cli.cl_target_uuid);
+ DEBUG_REQ(D_ERROR, req, "uncommitted");
ptlrpc_req_finished(req);
} else {
- CERROR("inflight req xid "LPD64" op %d to OST %s\n",
- (unsigned long long)req->rq_xid,
- req->rq_reqmsg->opc,
- imp->imp_obd->u.cli.cl_target_uuid);
-
+ DEBUG_REQ(D_ERROR, req, "inflight");
req->rq_flags |= PTL_RPC_FL_ERR;
wake_up(&req->rq_wait_for_rep);
}
list_for_each_safe(tmp, n, &imp->imp_connection->c_delayed_head) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_list);
- CERROR("aborting waiting req xid "LPD64" op %d to OST %s\n",
- (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
- imp->imp_obd->u.cli.cl_target_uuid);
+
+ if (req->rq_import != imp)
+ continue;
+
+ DEBUG_REQ(D_ERROR, req, "aborting waiting req");
req->rq_flags |= PTL_RPC_FL_ERR;
wake_up(&req->rq_wait_for_rep);
}
imp->imp_obd->obd_uuid);
}
-static int reconnect_mdc(struct obd_import *imp)
+/* Reconnect an MDC import and kick off request replay.
+ * NOTE: rc == EALREADY is POSITIVE here (see the reconnect path in
+ * target_handle_connect): the server still holds our export, so only
+ * requests it never replied to need replay.  Any other nonzero rc skips
+ * replay entirely -- TODO confirm that is the intended error handling. */
+static void reconnect_mdc(struct obd_import *imp)
{
- return ptlrpc_reconnect_import(imp, MDS_CONNECT);
+ int rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
+ if (!rc)
+ ptlrpc_replay(imp, 0 /* all reqs */);
+ else if (rc == EALREADY)
+ ptlrpc_replay(imp, 1 /* only unreplied reqs */);
}
static int ll_reconnect(struct ptlrpc_connection *conn)
{
struct list_head *tmp;
- int need_replay = 0;
ENTRY;
-
- /* XXX c_lock semantics! */
- conn->c_level = LUSTRE_CONN_CON;
-
- /* XXX this code MUST be shared with class_obd_connect! */
+ /* Reconnect every import on this connection.  Replay is now driven
+ * per-import: reconnect_mdc() calls ptlrpc_replay() itself, so this
+ * function no longer needs a connection-wide replay pass. */
list_for_each(tmp, &conn->c_imports) {
struct obd_import *imp = list_entry(tmp, struct obd_import,
imp_chain);
+ /* o_brw set => OSC/OST-type import, otherwise MDC (same test
+ * used when choosing OST_CONNECT vs MDS_CONNECT). */
if (imp->imp_obd->obd_type->typ_ops->o_brw) {
- /* XXX what to do if we fail? */
reconnect_osc(imp);
} else {
- int rc = reconnect_mdc(imp);
- if (!rc) {
- need_replay = 1;
- }
- /* make sure we don't try to replay for dead imps?
- *
- * else imp->imp_connection = NULL;
- *
- */
-
+ reconnect_mdc(imp);
}
}
- if (!need_replay) {
- /* all done! */
- conn->c_level = LUSTRE_CONN_FULL;
- RETURN(0);
- }
-
- conn->c_level = LUSTRE_CONN_RECOVD;
- /* this will replay, up the c_level, recovd_conn_fixed and continue
- * reqs. also, makes a mean cup of coffee.
- */
- RETURN(ptlrpc_replay(conn));
+ conn->c_level = LUSTRE_CONN_FULL;
+ RETURN(0);
}
int ll_recover(struct recovd_data *rd, int phase)
/* XXX should just be dealing with imports, probably through
* XXX iocontrol, need next-gen recovery! */
conn->c_flags |= CONN_INVALID;
- invalidate_request_list(&conn->c_sending_head);
+ /* invalidate_request_list(&conn->c_sending_head); */
invalidate_request_list(&conn->c_delayed_head);
spin_unlock(&conn->c_lock);
}
for (i = 0; i < desc->ld_tgt_count; i++) {
struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
+ int rc2;
if (!tgt) {
CERROR("Target %s not attached\n", uuidarray[i]);
rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
recover);
- if (rc) {
- CERROR("Target %s connect error %d\n",
- uuidarray[i], rc);
- GOTO(out_disc, rc);
+
+ /* Register even if connect failed, so that we get reactivation
+ * notices.
+ */
+ rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
+ sizeof(struct obd_device *), obd, NULL);
+ if (rc2) {
+ CERROR("Target %s REGISTER_LOV error %d\n",
+ uuidarray[i], rc2);
+ GOTO(out_disc, rc2);
}
- rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
- sizeof(struct obd_device *), obd, NULL);
+
+ /* But mark failed-connect OSCs as inactive! */
if (rc) {
- CERROR("Target %s REGISTER_LOV error %d\n",
+ CDEBUG(D_INFO, "Target %s connect error %d\n",
uuidarray[i], rc);
- GOTO(out_disc, rc);
+ LASSERT(lov->tgts[i].active == 0);
+ rc = 0;
+ continue;
}
+
desc->ld_active_tgt_count++;
lov->tgts[i].active = 1;
}
goto out_local;
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- if (!lov->tgts[i].active) {
- CERROR("Skipping disconnect for inactive OSC %s\n",
- lov->tgts[i].uuid);
- continue;
- }
-
- lov->desc.ld_active_tgt_count--;
- lov->tgts[i].active = 0;
rc = obd_disconnect(&lov->tgts[i].conn);
if (rc) {
- CERROR("Target %s disconnect error %d\n",
- lov->tgts[i].uuid, rc);
- RETURN(rc);
+ if (lov->tgts[i].active) {
+ CERROR("Target %s disconnect error %d\n",
+ lov->tgts[i].uuid, rc);
+ }
+ rc = 0;
+ }
+ if (lov->tgts[i].active) {
+ lov->desc.ld_active_tgt_count--;
+ lov->tgts[i].active = 0;
}
}
OBD_FREE(lov->tgts, lov->bufsize);
CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
lov->tgts[i].active = activate;
- if (activate)
+ if (activate) {
+ /*
+ * foreach(export)
+ * foreach(open_file)
+ * if (file_handle uses this_osc)
+ * if (has_no_filehandle)
+ * open(file_handle, this_osc);
+ */
+ /* XXX reconnect? */
lov->desc.ld_active_tgt_count++;
- else
+ } else {
+ /*
+ * Should I invalidate filehandles that refer to this OSC, so
+ * that I reopen them during reactivation?
+ */
+ /* XXX disconnect from OSC? */
lov->desc.ld_active_tgt_count--;
+ }
EXIT;
out:
ENTRY;
if (data->ioc_inllen1 < 1) {
- CERROR("osc setup requires an MDC UUID\n");
+ CERROR("LOV setup requires an MDC UUID\n");
RETURN(-EINVAL);
}
lsm = *ea;
+ /* Can't create more stripes than we have targets (incl inactive). */
+ if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count)
+ GOTO(out_tmp, rc = -EINVAL);
+
/* Free the user lsm if it needs to be changed, to avoid memory leaks */
if (!lsm || (lsm &&
lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) {
out_tmp:
obdo_free(tmp);
- return rc;
+ RETURN(rc);
out_cleanup:
while (i-- > 0) {
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+ int err;
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ /* Orphan clean up will (someday) fix this up. */
+ continue;
+ }
+
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
sizeof(lfh->lfh_handles[i]));
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
- rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
- if (rc)
- CERROR("Error destroying objid "LPX64" subobj "LPX64
- " on OST idx %d\n: rc = %d",
- oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
+ err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
+ NULL);
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("Error destroying objid "LPX64" subobj "
+ LPX64" on OST idx %d\n: rc = %d",
+ oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+ if (!rc)
+ rc = err;
+ }
}
RETURN(rc);
}
struct lov_obd *lov;
struct lov_oinfo *loi;
struct lov_file_handles *lfh = NULL;
- int rc = 0, i;
+ int i;
int new = 1;
ENTRY;
if (loi->loi_id == 0)
continue;
+ if (lov->tgts[loi->loi_ost_idx].active == 0)
+ continue;
+
CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
"%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
/* create data objects with "parent" OA */
tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
- if (err) {
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
CERROR("Error getattr objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n",
oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
- if (!rc)
- rc = err;
- continue; /* XXX or break? */
+ RETURN(err);
}
lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
}
- RETURN(rc);
+
+ RETURN(0);
}
static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
static int lov_open(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm)
{
- struct obdo *tmp;
+ struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
struct lov_file_handles *lfh = NULL;
+ struct lustre_handle *handle;
int new = 1;
int rc = 0, i;
ENTRY;
oa->o_size = 0;
oa->o_blocks = 0;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- int err;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ continue;
+ }
/* create data objects with "parent" OA */
memcpy(tmp, oa, sizeof(*tmp));
tmp->o_id = loi->loi_id;
- err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
- if (err) {
+ rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+ if (rc && lov->tgts[loi->loi_ost_idx].active) {
CERROR("Error open objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n",
oa->o_id, lsm->lsm_oinfo[i].loi_id,
loi->loi_ost_idx, rc);
- if (!rc)
- rc = err;
+ goto out_handles;
}
lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
sizeof(lfh->lfh_handles[i]));
}
- if (tmp->o_valid & OBD_MD_FLHANDLE) {
- struct lustre_handle *handle = obdo_handle(oa);
+ handle = obdo_handle(oa);
+
+ lfh->lfh_count = lsm->lsm_stripe_count;
+ get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
+
+ handle->addr = (__u64)(unsigned long)lfh;
+ handle->cookie = lfh->lfh_cookie;
+ oa->o_valid |= OBD_MD_FLHANDLE;
+ list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
- lfh->lfh_count = lsm->lsm_stripe_count;
- get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
-
- handle->addr = (__u64)(unsigned long)lfh;
- handle->cookie = lfh->lfh_cookie;
- oa->o_valid |= OBD_MD_FLHANDLE;
- list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
- } else
- goto out_handles;
-
- /* FIXME: returning an error, but having opened some objects is a bad
- * idea, since they will likely never be closed. We either
- * need to not return an error if _some_ objects could be
- * opened, and leave it to read/write to return -EIO (with
- * hopefully partial error status) or close all opened objects
- * and return an error. I think the former is preferred.
- */
out_tmp:
obdo_free(tmp);
RETURN(rc);
out_handles:
+ for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
+ int err;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0)
+ continue;
+
+ memcpy(tmp, oa, sizeof(*tmp));
+ tmp->o_id = loi->loi_id;
+ memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+
+ err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+ if (err) {
+ CERROR("Error closing objid "LPX64" subobj "LPX64
+ " on OST idx %d after open error: rc = %d\n",
+ oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+ }
+ }
+
OBD_FREE(lfh->lfh_handles,
lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
out_lfh:
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
int err;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0)
+ continue;
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
RETURN(-EINVAL);
}
+ /* XXX assert that we're not in recovery */
+
if (!export || !export->exp_obd)
RETURN(-ENODEV);
struct ldlm_extent sub_ext;
struct lov_stripe_md submd;
+ *flags = 0;
sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
if (sub_ext.start == sub_ext.end)
request->rq_level = level;
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc) {
CERROR("error in handling %d\n", rc);
mds_pack_req_body(req);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (!rc) {
body = lustre_msg_buf(req->rq_repmsg, 0);
req->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
out:
RETURN(rc);
mds_pack_req_body(req);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (!rc) {
body = lustre_msg_buf(req->rq_repmsg, 0);
struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
+ DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
+ body->fid1.generation, body->fid1.id);
memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
}
req->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (!rc) {
body = lustre_msg_buf(req->rq_repmsg, 0);
mds_unpack_body(body);
req->rq_replen = lustre_msg_size(0, NULL);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
EXIT;
out:
req->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (rc) {
ptlrpc_abort_bulk(desc);
GOTO(out2, rc);
req->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
if (rc)
GOTO(out, rc);
* Author: Peter Braam <braam@clusterfs.com>
* Author: Andreas Dilger <adilger@clusterfs.com>
* Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Mike Shaver <shaver@clusterfs.com>
*
* This file is part of Lustre, http://www.lustre.org.
*
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
obd_uuid_t *uuidarray);
extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc);
-extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
- struct ptlrpc_request *req);
+extern void mds_start_transno(struct mds_obd *mds);
+extern int mds_finish_transno(struct mds_obd *mds, void *handle,
+ struct ptlrpc_request *req, int rc);
static int mds_cleanup(struct obd_device * obddev);
extern struct lprocfs_vars status_var_nm_1[];
struct ptlrpc_bulk_desc *desc = data;
ENTRY;
- CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+ recovd_conn_fail(desc->bd_connection);
RETURN(1);
}
}
lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi);
+ rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
+ &lwi);
if (rc) {
if (rc != -ETIMEDOUT)
LBUG();
CERROR("FYI: NULL mcd - simultaneous connects\n");
continue;
}
- if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) {
+ if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) {
+ /* XXX make handle-found-export a subroutine */
LASSERT(exp->exp_obd == obd);
- if (!list_empty(&exp->exp_conn_chain)) {
- CERROR("existing uuid/export, list not empty!\n");
- spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_dev_lock);
+ if (exp->exp_connection) {
+ struct lustre_handle *hdl;
+ hdl = &exp->exp_ldlm_data.led_import.imp_handle;
+ /* Might be a re-connect after a partition. */
+ if (!memcmp(conn, hdl, sizeof *conn)) {
+ CERROR("%s reconnecting\n", cluuid);
+ conn->addr = (__u64) (unsigned long)exp;
+ conn->cookie = exp->exp_cookie;
+ rc = EALREADY;
+ } else {
+ CERROR("%s reconnecting from %s, "
+ "handle mismatch (ours %Lx/%Lx, "
+ "theirs %Lx/%Lx)\n", cluuid,
+ exp->exp_connection->
+ c_remote_uuid, hdl->addr,
+ hdl->cookie, conn->addr,
+ conn->cookie);
+ /* XXX disconnect them here? */
+ memset(conn, 0, sizeof *conn);
+ rc = -EALREADY;
+ }
MOD_DEC_USE_COUNT;
- RETURN(-EALREADY);
+ RETURN(rc);
}
conn->addr = (__u64) (unsigned long)exp;
conn->cookie = exp->exp_cookie;
- spin_unlock(&obd->obd_dev_lock);
CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
cluuid, exp);
CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
(long long)conn->addr, (long long)conn->cookie);
- MOD_DEC_USE_COUNT;
RETURN(0);
}
}
spin_unlock(&obd->obd_dev_lock);
+
+ if (obd->u.mds.mds_recoverable_clients != 0) {
+ CERROR("denying connection for new client %s: in recovery\n",
+ cluuid);
+ MOD_DEC_USE_COUNT;
+ RETURN(-EBUSY);
+ }
+
/* XXX There is a small race between checking the list and adding a
* new connection for the same UUID, but the real threat (list
* corruption when multiple different clients connect) is solved.
INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
- rc = mds_client_add(med, -1);
+ rc = mds_client_add(&obd->u.mds, med, -1);
if (rc)
GOTO(out_mcd, rc);
uc.ouc_fsgid = body->fsgid;
uc.ouc_cap = body->capability;
push_ctxt(&saved, &mds->mds_ctxt, &uc);
+ mds_start_transno(mds);
handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
- if (!handle)
- GOTO(out_ea, rc = -ENOMEM);
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ mds_finish_transno(mds, handle, req, rc);
+ GOTO(out_ea, rc);
+ }
rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size);
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
+ rc = mds_finish_transno(mds, handle, req, rc);
rc2 = mds_fs_commit(mds, inode, handle);
if (rc2 && !rc)
return rc;
}
+/* forward declaration */
+int mds_handle(struct ptlrpc_request *req);
+
+/* wait_event() condition for process_recovery_queue(): true when the request
+ * at the head of the recovery queue carries the transno we must process next.
+ * NOTE(review): reads the queue head without mds_processing_task_lock and
+ * assumes the queue is non-empty -- confirm both are safe for all callers. */
+static int check_for_next_transno(struct mds_obd *mds)
+{
+ struct ptlrpc_request *req;
+ req = list_entry(mds->mds_recovery_queue.next,
+ struct ptlrpc_request, rq_list);
+ return req->rq_reqmsg->transno == mds->mds_next_recovery_transno;
+}
+
+/* Drain the recovery queue in strictly increasing transno order, sleeping
+ * until the next-needed transno arrives.  Caller must have claimed
+ * mds_processing_task (see queue_recovery_request). */
+static void process_recovery_queue(struct mds_obd *mds)
+{
+ struct ptlrpc_request *req;
+
+ for (;;) {
+ spin_lock(&mds->mds_processing_task_lock);
+ req = list_entry(mds->mds_recovery_queue.next,
+ struct ptlrpc_request, rq_list);
+
+ if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
+ spin_unlock(&mds->mds_processing_task_lock);
+ /* Sleep until queue_recovery_request() queues the
+ * transno we need and wakes us. */
+ wait_event(mds->mds_next_transno_waitq,
+ check_for_next_transno(mds));
+ continue;
+ }
+ list_del(&req->rq_list);
+ spin_unlock(&mds->mds_processing_task_lock);
+
+ DEBUG_REQ(D_HA, req, "");
+ mds_handle(req);
+
+ /* NOTE(review): this list_empty check runs without the lock,
+ * and mds_processing_task is never reset to 0 on exit --
+ * confirm both are intentional. */
+ if (list_empty(&mds->mds_recovery_queue))
+ break;
+ }
+}
+
+/* Insert @req into the recovery queue in transno order.
+ * Returns 1 if the caller should process the request itself (it has no
+ * transno, or we are already the queue-processing task); returns 0 if the
+ * request was queued -- in which case we may also become the processor and
+ * drain the queue before returning. */
+static int queue_recovery_request(struct ptlrpc_request *req,
+ struct mds_obd *mds)
+{
+ struct list_head *tmp;
+ int inserted = 0, transno = req->rq_reqmsg->transno;
+ /* NOTE(review): transno is held in an int here but is __u64 elsewhere
+ * (mds_next_recovery_transno) -- possible truncation; confirm. */
+
+ if (!transno) {
+ DEBUG_REQ(D_HA, req, "not queueing");
+ return 1;
+ }
+
+ spin_lock(&mds->mds_processing_task_lock);
+
+ if (mds->mds_processing_task == current->pid) {
+ /* Processing the queue right now, don't re-add. */
+ spin_unlock(&mds->mds_processing_task_lock);
+ return 1;
+ }
+
+ /* XXX O(n^2) */
+ /* Keep the queue sorted: insert before the first entry with a
+ * larger transno. */
+ list_for_each(tmp, &mds->mds_recovery_queue) {
+ struct ptlrpc_request *reqiter =
+ list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (reqiter->rq_reqmsg->transno > transno) {
+ list_add_tail(&req->rq_list, &reqiter->rq_list);
+ inserted = 1;
+ break;
+ }
+ }
+
+ if (!inserted)
+ list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
+
+ if (mds->mds_processing_task != 0) {
+ /* Someone else is processing this queue, we'll leave it to
+ * them.
+ */
+ spin_unlock(&mds->mds_processing_task_lock);
+ if (transno == mds->mds_next_recovery_transno)
+ wake_up(&mds->mds_next_transno_waitq);
+ return 0;
+ }
+
+ /* Nobody is processing, and we know there's (at least) one to process
+ * now, so we'll do the honours.
+ */
+ mds->mds_processing_task = current->pid;
+ spin_unlock(&mds->mds_processing_task_lock);
+
+ process_recovery_queue(mds);
+ return 0;
+}
+
+/* Decide what to do with @req while recovery is in progress: let it through
+ * immediately (*process = 1), queue it for transno-ordered replay
+ * (*process set by queue_recovery_request), or reject it with an error
+ * reply (*process = 0).  Returns 0 on the first two paths, the
+ * ptlrpc_error() result on rejection. */
+static int filter_recovery_request(struct ptlrpc_request *req,
+ struct mds_obd *mds, int *process)
+{
+ switch (req->rq_reqmsg->opc) {
+ case MDS_CONNECT:
+ case MDS_DISCONNECT:
+ case MDS_OPEN:
+ *process = 1;
+ RETURN(0);
+
+ case MDS_GETSTATUS: /* used in unmounting */
+ case MDS_REINT:
+ case LDLM_ENQUEUE:
+ *process = queue_recovery_request(req, mds);
+ RETURN(0);
+
+ default:
+ DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+ *process = 0;
+ RETURN(ptlrpc_error(req->rq_svc, req));
+ }
+}
+
+/* Hold a client's final recovery reply on mds_delayed_reply_queue; when the
+ * last recoverable client finishes, send all the delayed replies at once.
+ * A nonzero @rc converts the reply into a PTL_RPC_MSG_ERR before queueing.
+ * Always returns 1.
+ * NOTE(review): mds_recoverable_clients is decremented without holding any
+ * lock, and the queued requests remain on the list after ptlrpc_reply() --
+ * confirm both are safe/intended. */
+static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
+{
+ struct mds_obd *mds = mds_req2mds(req);
+
+ if (rc) {
+ /* Just like ptlrpc_error, but without the sending. */
+ lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ req->rq_type = PTL_RPC_MSG_ERR;
+ }
+
+ list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
+ if (--mds->mds_recoverable_clients == 0) {
+ struct list_head *tmp, *n;
+
+ CDEBUG(D_HA,
+ "all clients recovered, sending delayed replies\n");
+ list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ DEBUG_REQ(D_HA, req, "delayed:");
+ ptlrpc_reply(req->rq_svc, req);
+ }
+ } else {
+ CDEBUG(D_HA, "%d recoverable clients remain\n",
+ mds->mds_recoverable_clients);
+ }
+
+ return 1;
+}
+
+/* Human-readable names for the mds_reint opcodes, indexed by opcode.
+ * Uses the old GCC "[index] value" designated-initializer extension
+ * (pre-C99 spelling of "[index] = value"). */
+static char *reint_names[] = {
+ [REINT_SETATTR] "setattr",
+ [REINT_CREATE] "create",
+ [REINT_LINK] "link",
+ [REINT_UNLINK] "unlink",
+ [REINT_RENAME] "rename"
+};
+
int mds_handle(struct ptlrpc_request *req)
{
int rc;
+ int should_process;
+ struct mds_obd *mds = NULL; /* quell gcc overwarning */
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
GOTO(out, rc);
}
- if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL)
- GOTO(out, rc = -ENOTCONN);
-
LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
+ if (req->rq_reqmsg->opc != MDS_CONNECT) {
+ if (req->rq_export == NULL)
+ GOTO(out, rc = -ENOTCONN);
+
+ mds = mds_req2mds(req);
+ if (mds->mds_recoverable_clients != 0) {
+ rc = filter_recovery_request(req, mds, &should_process);
+ if (rc || !should_process)
+ RETURN(rc);
+ }
+ }
+
switch (req->rq_reqmsg->opc) {
case MDS_CONNECT:
- CDEBUG(D_INODE, "connect\n");
+ DEBUG_REQ(D_INODE, req, "connect");
OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
rc = target_handle_connect(req);
+ /* Make sure that last_rcvd is correct. */
+ if (!rc) {
+ /* Now that we have an export, set mds. */
+ mds = mds_req2mds(req);
+ mds_fsync_super(mds->mds_sb);
+ }
break;
case MDS_DISCONNECT:
- CDEBUG(D_INODE, "disconnect\n");
+ DEBUG_REQ(D_INODE, req, "disconnect");
OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
rc = target_handle_disconnect(req);
+ /* Make sure that last_rcvd is correct. */
+ if (!rc)
+ mds_fsync_super(mds->mds_sb);
goto out;
case MDS_GETSTATUS:
- CDEBUG(D_INODE, "getstatus\n");
+ DEBUG_REQ(D_INODE, req, "getstatus");
OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
rc = mds_getstatus(req);
break;
case MDS_GETLOVINFO:
- CDEBUG(D_INODE, "getlovinfo\n");
+ DEBUG_REQ(D_INODE, req, "getlovinfo");
rc = mds_getlovinfo(req);
break;
case MDS_GETATTR:
- CDEBUG(D_INODE, "getattr\n");
+ DEBUG_REQ(D_INODE, req, "getattr");
OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
rc = mds_getattr(0, req);
break;
case MDS_STATFS:
- CDEBUG(D_INODE, "statfs\n");
+ DEBUG_REQ(D_INODE, req, "statfs");
OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
rc = mds_statfs(req);
break;
case MDS_READPAGE:
- CDEBUG(D_INODE, "readpage\n");
+ DEBUG_REQ(D_INODE, req, "readpage");
OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
rc = mds_readpage(req);
case MDS_REINT: {
int size = sizeof(struct mds_body);
- CDEBUG(D_INODE, "reint\n");
+ int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0),
+ realopc = opc & REINT_OPCODE_MASK;
+
+ DEBUG_REQ(D_INODE, req, "reint (%s%s)",
+ reint_names[realopc],
+ opc & REINT_REPLAYING ? "|REPLAYING" : "");
+
OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
}
case MDS_OPEN:
- CDEBUG(D_INODE, "open\n");
+ DEBUG_REQ(D_INODE, req, "open");
OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
rc = mds_open(req);
break;
case MDS_CLOSE:
- CDEBUG(D_INODE, "close\n");
+ DEBUG_REQ(D_INODE, req, "close");
OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
rc = mds_close(req);
break;
case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
+ DEBUG_REQ(D_INODE, req, "enqueue");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
rc = ldlm_handle_enqueue(req);
break;
case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
+ DEBUG_REQ(D_INODE, req, "convert");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
rc = ldlm_handle_convert(req);
break;
case LDLM_BL_CALLBACK:
case LDLM_CP_CALLBACK:
- CDEBUG(D_INODE, "callback\n");
+ DEBUG_REQ(D_INODE, req, "callback");
CERROR("callbacks should not happen on MDS\n");
LBUG();
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
if (!rc) {
struct mds_export_data *med = &req->rq_export->exp_mds_data;
- struct mds_obd *mds = mds_req2mds(req);
req->rq_repmsg->last_xid =
HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
cpu_to_le32(req->rq_xid));
}
out:
- if (rc) {
+
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+ struct mds_obd *mds = mds_req2mds(req);
+ LASSERT(mds->mds_recoverable_clients);
+ DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+ return mds_queue_final_reply(req, rc);
+ }
+
+ /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */
+ if (rc && (req->rq_reqmsg->opc != MDS_CONNECT ||
+ rc != EALREADY)) {
CERROR("mds: processing error (opcode %d): %d\n",
req->rq_reqmsg->opc, rc);
ptlrpc_error(req->rq_svc, req);
*
* Also assumes for mds_last_rcvd that we are not modifying it (no locking).
*/
-static
int mds_update_server_data(struct mds_obd *mds)
{
struct mds_server_data *msd = mds->mds_server_data;
}
/* Do recovery actions for the MDS */
-static int mds_recover(struct obd_device *obddev)
+static int mds_recovery_complete(struct obd_device *obddev)
{
struct mds_obd *mds = &obddev->u.mds;
struct obd_run_ctxt saved;
int rc;
+ LASSERT(mds->mds_recoverable_clients == 0);
+
/* This happens at the end when recovery is complete */
++mds->mds_mount_count;
push_ctxt(&saved, &mds->mds_ctxt, NULL);
if (!mds->mds_sb)
GOTO(err_put, rc = -ENODEV);
- spin_lock_init(&mds->mds_last_lock);
+ init_MUTEX(&mds->mds_transno_sem);
mds->mds_max_mdsize = sizeof(struct lov_mds_md);
rc = mds_fs_setup(obddev, mnt);
if (rc) {
GOTO(err_fs, rc = -ENOMEM);
}
-
- rc = mds_recover(obddev);
- if (rc)
- GOTO(err_fs, rc);
-
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"mds_ldlm_client", &obddev->obd_ldlm_client);
+ spin_lock_init(&mds->mds_processing_task_lock);
+ mds->mds_processing_task = 0;
+ INIT_LIST_HEAD(&mds->mds_recovery_queue);
+ INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
+
RETURN(0);
err_fs:
static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
+#define LAST_RCVD "last_rcvd"
+
/* Add client data to the MDS. We use a bitmap to locate a free space
* in the last_rcvd file if cl_off is -1 (i.e. a new client).
* Otherwise, we have just read the data from the last_rcvd file and
* we know its offset.
*/
-int mds_client_add(struct mds_export_data *med, int cl_off)
+int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
{
+ int new_client = (cl_off == -1);
+
/* the bitmap operations can handle cl_off > sizeof(long) * 8, so
* there's no need for extra complication here
*/
- if (cl_off == -1) {
+ if (new_client) {
cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
repeat:
if (cl_off >= MDS_MAX_CLIENTS) {
cl_off, med->med_mcd->mcd_uuid);
med->med_off = cl_off;
+
+ if (new_client) {
+ struct obd_run_ctxt saved;
+ loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
+ ssize_t written;
+
+ push_ctxt(&saved, &mds->mds_ctxt, NULL);
+ written = lustre_fwrite(mds->mds_rcvd_filp,
+ (char *)med->med_mcd,
+ sizeof(*med->med_mcd), &off);
+ pop_ctxt(&saved);
+
+ if (written != sizeof(*med->med_mcd)) {
+ if (written < 0)
+ RETURN(written);
+ RETURN(-EIO);
+ }
+ }
return 0;
}
int mds_client_free(struct obd_export *exp)
{
struct mds_export_data *med = &exp->exp_mds_data;
+ struct mds_obd *mds = &exp->exp_obd->u.mds;
+ struct mds_client_data zero_mcd;
+ struct obd_run_ctxt saved;
+ int written;
+ loff_t off;
if (!med->med_mcd)
RETURN(0);
LBUG();
}
+ off = med->med_off;
+
+ memset(&zero_mcd, 0, sizeof zero_mcd);
+ push_ctxt(&saved, &mds->mds_ctxt, NULL);
+ written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd,
+ sizeof zero_mcd, &off);
+ pop_ctxt(&saved);
+
+ if (written != sizeof zero_mcd) {
+ CERROR("error zeroing out client %s off %d in %s: %d\n",
+ med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD,
+ written);
+ LBUG();
+ } else {
+ CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
+ med->med_mcd->mcd_uuid, med->med_off);
+ }
+
OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
return 0;
return 0;
}
-#define LAST_RCVD "last_rcvd"
-
static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
{
struct mds_obd *mds = &obddev->u.mds;
struct mds_server_data *msd;
struct mds_client_data *mcd = NULL;
- loff_t fsize = f->f_dentry->d_inode->i_size;
loff_t off = 0;
int cl_off;
+ loff_t max_off = f->f_dentry->d_inode->i_size - sizeof(*mcd);
 __u64 last_rcvd = 0;
 __u64 last_mount;
- int clients = 0;
 int rc = 0;
 OBD_ALLOC(msd, sizeof(*msd));
 CDEBUG(D_INODE, "got %Lu for server last_mount value\n",
 (unsigned long long)last_mount);
- for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
- off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
- off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
+ for (off = MDS_LR_CLIENT, cl_off = 0;
+ off <= max_off;
+ off += MDS_LR_SIZE, cl_off++) {
+ int mount_age;
+
if (!mcd) {
OBD_ALLOC(mcd, sizeof(*mcd));
if (!mcd)
break;
}
+ if (mcd->mcd_uuid[0] == '\0') {
+ CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+ cl_off);
+ continue;
+ }
+
last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
/* The exports are cleaned up by mds_disconnect, so they
* need to be set up like real exports also.
*/
- if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count)
- < MDS_MOUNT_RECOV)) {
+ mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count);
+ if (last_rcvd && mount_age < MDS_MOUNT_RECOV) {
struct obd_export *exp = class_new_export(obddev);
struct mds_export_data *med;
med = &exp->exp_mds_data;
med->med_mcd = mcd;
- mds_client_add(med, cl_off);
+ mds_client_add(mds, med, cl_off);
/* XXX put this in a helper if it gets more complex */
INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
mcd = NULL;
- clients++;
+ mds->mds_recoverable_clients++;
MOD_INC_USE_COUNT;
} else {
CDEBUG(D_INFO,
- "ignored client %d, UUID '%s', last_mount %Ld\n",
+ "discarded client %d, UUID '%s', count %Ld\n",
cl_off, mcd->mcd_uuid,
(long long)le64_to_cpu(mcd->mcd_mount_count));
}
mds->mds_last_rcvd = last_rcvd;
}
}
- CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d/%d clients\n",
- (unsigned long long)mds->mds_last_rcvd, clients, cl_off);
+
+ mds->mds_last_committed = mds->mds_last_rcvd;
+ if (mds->mds_recoverable_clients) {
+ CERROR("need recovery: %d recoverable clients, last_rcvd %Lu\n",
+ mds->mds_recoverable_clients,
+ (unsigned long long)mds->mds_last_rcvd);
+ }
if (mcd)
OBD_FREE(mcd, sizeof(*mcd));
- /* After recovery, there can be no local uncommitted transactions */
- mds->mds_last_committed = mds->mds_last_rcvd;
-
return 0;
err_msd:
extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
+void mds_start_transno(struct mds_obd *mds)
+{
+ ENTRY;
+ down(&mds->mds_transno_sem);
+}
+
/* Assumes caller has already pushed us into the kernel context. */
-int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
- struct ptlrpc_request *req)
+int mds_finish_transno(struct mds_obd *mds, void *handle,
+ struct ptlrpc_request *req, int rc)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct mds_client_data *mcd = med->med_mcd;
__u64 last_rcvd;
loff_t off;
- int rc;
+ ssize_t written;
+
+ /* Propagate error code. */
+ if (rc)
+ goto out;
/* we don't allocate new transnos for replayed requests */
- if (req->rq_level == LUSTRE_CONN_RECOVD)
- RETURN(0);
+ if (req->rq_level == LUSTRE_CONN_RECOVD) {
+ rc = 0;
+ goto out;
+ }
off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
- spin_lock(&mds->mds_last_lock);
last_rcvd = ++mds->mds_last_rcvd;
- spin_unlock(&mds->mds_last_lock);
req->rq_repmsg->transno = HTON__u64(last_rcvd);
mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
mds_fs_set_last_rcvd(mds, handle);
- rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
- CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = "
- "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, rc);
+ written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd),
+ &off);
+ CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
+ "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, written);
- if (rc == sizeof(*mcd))
- rc = 0;
- else {
- CERROR("error writing to last_rcvd file: rc = %d\n", rc);
- if (rc >= 0)
- rc = -EIO;
- }
+ if (written == sizeof(*mcd))
+ GOTO(out, rc = 0);
+ CERROR("error writing to last_rcvd file: written = %d\n",
+ (int)written);
+ if (written >= 0)
+ GOTO(out, rc = -EIO);
+
+ rc = written;
+ out:
+ EXIT;
+ up(&mds->mds_transno_sem);
return rc;
}
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
to_kdev_t(inode->i_sb->s_dev));
+ mds_start_transno(mds);
handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
- if (!handle)
- GOTO(out_setattr_de, rc = PTR_ERR(handle));
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ (void)mds_finish_transno(mds, handle, req, rc);
+ GOTO(out_setattr_de, rc);
+ }
rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
mds_pack_inode2body(body, inode);
}
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
+ rc = mds_finish_transno(mds, handle, req, rc);
err = mds_fs_commit(mds, de->d_inode, handle);
if (err) {
rec->ur_mode |= S_ISGID;
}
+ /* From here on, we must exit via a path that calls mds_finish_transno,
+ * so that we release the mds_transno_sem (and, in the case of success,
+ * update the transno correctly). out_create_commit and
+ * out_transno_dchild are good candidates.
+ */
+ mds_start_transno(mds);
+
switch (type) {
case S_IFREG:{
handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
- if (!handle)
- GOTO(out_create_dchild, PTR_ERR(handle));
+ if (IS_ERR(handle))
+ GOTO(out_transno_dchild, rc = PTR_ERR(handle));
rc = vfs_create(dir, dchild, rec->ur_mode);
EXIT;
break;
}
case S_IFDIR:{
handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
- if (!handle)
- GOTO(out_create_dchild, PTR_ERR(handle));
+ if (IS_ERR(handle))
+ GOTO(out_transno_dchild, rc = PTR_ERR(handle));
rc = vfs_mkdir(dir, dchild, rec->ur_mode);
EXIT;
break;
}
case S_IFLNK:{
handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
- if (!handle)
- GOTO(out_create_dchild, PTR_ERR(handle));
+ if (IS_ERR(handle))
+ GOTO(out_transno_dchild, rc = PTR_ERR(handle));
rc = vfs_symlink(dir, dchild, rec->ur_name);
EXIT;
break;
case S_IFSOCK:{
int rdev = rec->ur_rdev;
handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
- if (!handle)
- GOTO(out_create_dchild, PTR_ERR(handle));
+ if (IS_ERR(handle))
+ GOTO(out_transno_dchild, rc = PTR_ERR(handle));
rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
EXIT;
break;
}
default:
CERROR("bad file type %o creating %s\n", type, rec->ur_name);
- GOTO(out_create_dchild, rc = -EINVAL);
+ handle = NULL; /* quell uninitialized warning */
+ GOTO(out_transno_dchild, rc = -EINVAL);
}
if (rc) {
if (rec->ur_fid2->id) {
LASSERT(rec->ur_opcode & REINT_REPLAYING);
inode->i_generation = rec->ur_fid2->generation;
- /* Dirtied and committed by this setattr: */
+ /* Dirtied and committed by the upcoming setattr. */
CDEBUG(D_INODE, "recreated ino %ld with gen %ld\n",
inode->i_ino, inode->i_generation);
} else {
/* XXX should we abort here in case of error? */
}
- rc = mds_update_last_rcvd(mds, handle, req);
- if (rc) {
- CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
- GOTO(out_create_unlink, rc);
- }
-
body = lustre_msg_buf(req->rq_repmsg, offset);
mds_pack_inode2fid(&body->fid1, inode);
mds_pack_inode2body(body, inode);
}
EXIT;
out_create_commit:
+ if (rc) {
+ rc = mds_finish_transno(mds, handle, req, rc);
+ } else {
+ rc = mds_finish_transno(mds, handle, req, rc);
+ if (rc)
+ GOTO(out_create_unlink, rc);
+ }
err = mds_fs_commit(mds, dir, handle);
if (err) {
CERROR("error on commit: err = %d\n", err);
req->rq_status = rc;
return 0;
+out_transno_dchild:
+ /* Need to release the transno lock, and then put the dchild. */
+ LASSERT(rc);
+ mds_finish_transno(mds, handle, req, rc);
+ goto out_create_dchild;
+
out_create_unlink:
/* Destroy the file we just created. This should not need extra
* journal credits, as we have already modified all of the blocks
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
to_kdev_t(dir->i_sb->s_dev));
+ mds_start_transno(mds);
switch (rec->ur_mode /* & S_IFMT ? */) {
case S_IFDIR:
handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
- if (!handle)
- GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
+ if (IS_ERR(handle))
+ GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle));
rc = vfs_rmdir(dir, dchild);
break;
case S_IFREG:
case S_IFIFO:
case S_IFSOCK:
handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
- if (!handle)
- GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
+ if (IS_ERR(handle))
+ GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle));
rc = vfs_unlink(dir, dchild);
break;
default:
CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
handle = NULL;
LBUG();
- GOTO(out_unlink_cancel, rc = -EINVAL);
+ GOTO(out_unlink_cancel_transno, rc = -EINVAL);
}
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
+ rc = mds_finish_transno(mds, handle, req, rc);
err = mds_fs_commit(mds, dir, handle);
if (err) {
CERROR("error on commit: err = %d\n", err);
l_dput(de);
req->rq_status = rc;
return 0;
+
+out_unlink_cancel_transno:
+ rc = mds_finish_transno(mds, handle, req, rc);
+ goto out_unlink_cancel;
}
static int mds_reint_link(struct mds_update_record *rec, int offset,
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
to_kdev_t(de_src->d_inode->i_sb->s_dev));
+ mds_start_transno(mds);
handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
- if (!handle)
- GOTO(out_link_dchild, rc = PTR_ERR(handle));
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ mds_finish_transno(mds, handle, req, rc);
+ GOTO(out_link_dchild, rc);
+ }
rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
if (rc)
CERROR("link error %d\n", rc);
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
+ rc = mds_finish_transno(mds, handle, req, rc);
err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
if (err) {
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
+ mds_start_transno(mds);
handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
- if (!handle)
- GOTO(out_rename_denew, rc = PTR_ERR(handle));
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ mds_finish_transno(mds, handle, req, rc);
+ GOTO(out_rename_denew, rc);
+ }
+
lock_kernel();
rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
NULL);
unlock_kernel();
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
+ rc = mds_finish_transno(mds, handle, req, rc);
err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
if (err) {
INIT_LIST_HEAD(&obd->obd_imports);
spin_lock_init(&obd->obd_dev_lock);
- if (data->ioc_inlbuf2) {
- int len = strlen(data->ioc_inlbuf2) + 1;
- OBD_ALLOC(obd->obd_name, len);
- if (!obd->obd_name) {
- CERROR("no memory\n");
- LBUG();
- }
- memcpy(obd->obd_name, data->ioc_inlbuf2, len);
- } else {
- CERROR("WARNING: unnamed obd device\n");
+ if (data->ioc_inlbuf2) {
+ int len = strlen(data->ioc_inlbuf2) + 1;
+ OBD_ALLOC(obd->obd_name, len);
+ if (!obd->obd_name) {
+ CERROR("no memory\n");
+ LBUG();
}
- if (data->ioc_inlbuf3) {
- int len = strlen(data->ioc_inlbuf3);
- if (len >= sizeof(obd->obd_uuid)) {
- CERROR("uuid must be < %d bytes long\n",
- sizeof(obd->obd_uuid));
- if (obd->obd_name)
- OBD_FREE(obd->obd_name,
- strlen(obd->obd_name) + 1);
- GOTO(out, err=-EINVAL);
- }
- memcpy(obd->obd_uuid, data->ioc_inlbuf3, len);
+ memcpy(obd->obd_name, data->ioc_inlbuf2, len);
+ } else {
+ CERROR("WARNING: unnamed obd device\n");
+ }
+ if (data->ioc_inlbuf3) {
+ int len = strlen(data->ioc_inlbuf3);
+ if (len >= sizeof(obd->obd_uuid)) {
+ CERROR("uuid must be < %d bytes long\n",
+ sizeof(obd->obd_uuid));
+ if (obd->obd_name)
+ OBD_FREE(obd->obd_name,
+ strlen(obd->obd_name) + 1);
+ GOTO(out, err=-EINVAL);
}
+ memcpy(obd->obd_uuid, data->ioc_inlbuf3, len);
+ }
/* do the attach */
if (OBP(obd, attach))
err = OBP(obd,attach)(obd, sizeof(*data), data);
spin_unlock(&obddev->obd_dev_lock);
CERROR("force disconnecting %s:%s export %p\n",
export->exp_obd->obd_type->typ_name,
- export->exp_uuid, export);
+ export->exp_connection->c_remote_uuid, export);
rc = obd_disconnect(&conn);
if (rc < 0) {
/* AED: not so sure about this... We can't
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc) {
CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
GOTO(out, rc);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out, rc);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out, rc);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
ptlrpc_req_finished(request);
return rc;
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out_req, rc);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out, rc);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out, rc);
obd_count page_count, struct brw_page *pga,
struct obd_brw_set *set)
{
- struct ptlrpc_connection *connection =
- client_conn2cli(conn)->cl_import.imp_connection;
+ struct obd_import *imp = class_conn2cliimp(conn);
+ struct ptlrpc_connection *connection = imp->imp_connection;
struct ptlrpc_request *request = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(struct niobuf_remote);
- request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
- NULL);
+ request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
ost_pack_ioo(&iooptr, lsm, page_count);
/* end almost identical to brw_write case */
- spin_lock(&connection->c_lock);
- xid = ++connection->c_xid_out; /* single xid for all pages */
- spin_unlock(&connection->c_lock);
+ spin_lock(&imp->imp_lock);
+ xid = ++imp->imp_last_xid; /* single xid for all pages */
+ spin_unlock(&imp->imp_lock);
obd_kmap_get(page_count, 0);
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
/*
* XXX: If there is an error during the processing of the callback,
size[1] = page_count * sizeof(*remote);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc)
GOTO(out_unmap, rc);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
if (rc) {
CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
GOTO(out, rc);
struct ptlrpc_bulk_desc *desc = data;
ENTRY;
- CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+ recovd_conn_fail(desc->bd_connection);
RETURN(1);
}
*/
atomic_set(&request->rq_refcount, 2);
- spin_lock(&conn->c_lock);
- request->rq_xid = HTON__u32(++conn->c_xid_out);
- spin_unlock(&conn->c_lock);
+ spin_lock(&imp->imp_lock);
+ request->rq_xid = HTON__u32(++imp->imp_last_xid);
+ spin_unlock(&imp->imp_lock);
request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
RETURN(request);
}
-void ptlrpc_req_finished(struct ptlrpc_request *request)
-{
- if (request == NULL)
- return;
-
- if (atomic_dec_and_test(&request->rq_refcount))
- ptlrpc_free_req(request);
- else
- DEBUG_REQ(D_INFO, request, "refcount now %u",
- atomic_read(&request->rq_refcount));
-}
-
-void ptlrpc_free_req(struct ptlrpc_request *request)
+static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
ENTRY;
if (request == NULL) {
request->rq_reqmsg = NULL;
}
- if (request->rq_connection) {
- spin_lock(&request->rq_connection->c_lock);
+ if (request->rq_import) {
+ if (!locked)
+ spin_lock(&request->rq_import->imp_lock);
list_del_init(&request->rq_list);
- spin_unlock(&request->rq_connection->c_lock);
+ if (!locked)
+ spin_unlock(&request->rq_import->imp_lock);
}
ptlrpc_put_connection(request->rq_connection);
EXIT;
}
+void ptlrpc_free_req(struct ptlrpc_request *request)
+{
+ __ptlrpc_free_req(request, 0);
+}
+
+static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
+{
+ ENTRY;
+ if (request == NULL)
+ RETURN(1);
+
+ if (atomic_dec_and_test(&request->rq_refcount)) {
+ __ptlrpc_free_req(request, locked);
+ RETURN(1);
+ }
+
+ DEBUG_REQ(D_INFO, request, "refcount now %u",
+ atomic_read(&request->rq_refcount));
+ RETURN(0);
+}
+
+void ptlrpc_req_finished(struct ptlrpc_request *request)
+{
+ __ptlrpc_req_finished(request, 0);
+}
+
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
int rc = 0;
if (req->rq_repmsg != NULL) {
- struct ptlrpc_connection *conn = req->rq_import->imp_connection;
+ struct obd_import *imp = req->rq_import;
+ struct ptlrpc_connection *conn = imp->imp_connection;
+ ENTRY;
if (req->rq_level > conn->c_level) {
- CDEBUG(D_HA,
- "rep to xid "LPD64" op %d to %s:%d: "
- "recovery started, ignoring (%d > %d)\n",
- (unsigned long long)req->rq_xid,
- req->rq_reqmsg->opc, conn->c_remote_uuid,
- req->rq_import->imp_client->cli_request_portal,
+ DEBUG_REQ(D_HA, req,
+ "recovery started, ignoring (%d > %d)",
req->rq_level, conn->c_level);
req->rq_repmsg = NULL;
GOTO(out, rc = 0);
}
req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
+ spin_lock(&imp->imp_lock);
+ if (req->rq_transno > imp->imp_max_transno) {
+ imp->imp_max_transno = req->rq_transno;
+ } else if (req->rq_transno != 0) {
+ if (conn->c_level == LUSTRE_CONN_FULL) {
+ CERROR("got transno "LPD64" after "
+ LPD64": recovery may not work\n",
+ req->rq_transno, imp->imp_max_transno);
+ }
+ }
+ spin_unlock(&imp->imp_lock);
req->rq_flags |= PTL_RPC_FL_REPLIED;
GOTO(out, rc = 1);
}
if (req->rq_flags & PTL_RPC_FL_RESEND) {
- CERROR("-- RESTART --\n");
+ DEBUG_REQ(D_ERROR, req, "RESEND:");
GOTO(out, rc = 1);
}
if (req->rq_flags & PTL_RPC_FL_ERR) {
- CERROR("-- ABORTED --\n");
+ DEBUG_REQ(D_ERROR, req, "ABORTED:");
GOTO(out, rc = 1);
}
+ if (req->rq_flags & PTL_RPC_FL_RESTART) {
+ DEBUG_REQ(D_ERROR, req, "RESTART:");
+ GOTO(out, rc = 1);
+ }
out:
- CDEBUG(D_NET, "req = %p, rc = %d\n", req, rc);
+ DEBUG_REQ(D_NET, req, "rc = %d for", rc);
return rc;
}
-int ptlrpc_check_status(struct ptlrpc_request *req, int err)
+static int ptlrpc_check_status(struct ptlrpc_request *req)
{
+ int err;
ENTRY;
- if (err != 0) {
- CERROR("err is %d\n", err);
- RETURN(err);
- }
-
- if (req == NULL) {
- CERROR("req == NULL\n");
- RETURN(-ENOMEM);
- }
-
- if (req->rq_repmsg == NULL) {
- CERROR("req->rq_repmsg == NULL\n");
- RETURN(-ENOMEM);
- }
-
err = req->rq_repmsg->status;
if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
if (err != 0) {
if (err < 0)
- CERROR("req->rq_repmsg->status is %d\n", err);
+ CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", err);
else
CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", err);
- /* XXX: translate this error from net to host */
- RETURN(err);
}
- RETURN(0);
+ RETURN(err);
}
static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
return 0;
}
-/* caller must hold conn->c_lock */
-void ptlrpc_free_committed(struct ptlrpc_connection *conn)
+/* caller must hold imp->imp_lock */
+void ptlrpc_free_committed(struct obd_import *imp)
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
-restart:
- list_for_each_safe(tmp, saved, &conn->c_sending_head) {
+ list_for_each_safe(tmp, saved, &imp->imp_request_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
if (req->rq_flags & PTL_RPC_FL_REPLAY) {
continue;
}
- if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
+ /* If neither replied-to nor restarted, keep it. */
+ if (!(req->rq_flags &
+ (PTL_RPC_FL_REPLIED | PTL_RPC_FL_RESTART))) {
DEBUG_REQ(D_HA, req, "keeping (in-flight)");
continue;
}
+ /* This needs to match the commit test in ptlrpc_queue_wait() */
+ if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
+ req->rq_transno == 0) {
+ DEBUG_REQ(D_HA, req, "keeping (queue_wait will free)");
+ continue;
+ }
+
/* not yet committed */
- if (req->rq_transno > conn->c_last_committed)
+ if (req->rq_transno > imp->imp_peer_committed_transno)
break;
DEBUG_REQ(D_HA, req, "committing (last_committed %Lu)",
- (long long)conn->c_last_committed);
- if (atomic_dec_and_test(&req->rq_refcount)) {
- /* We do this to prevent free_req deadlock. Restarting
- * after each removal is not so bad, as we are almost
- * always deleting the first item in the list.
- *
- * If we use a recursive lock here, we can skip the
- * unlock/lock/restart sequence.
- */
- spin_unlock(&conn->c_lock);
- ptlrpc_free_req(req);
- spin_lock(&conn->c_lock);
- goto restart;
- } else {
- list_del(&req->rq_list);
- list_add(&req->rq_list, &conn->c_dying_head);
- }
+ imp->imp_peer_committed_transno);
+ __ptlrpc_req_finished(req, 1);
}
EXIT;
LASSERT(conn);
-restart1:
- spin_lock(&conn->c_lock);
- list_for_each_safe(tmp, saved, &conn->c_sending_head) {
+ spin_lock(&imp->imp_lock);
+ list_for_each_safe(tmp, saved, &imp->imp_request_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (req->rq_import != imp)
- continue;
+
/* XXX we should make sure that nobody's sleeping on these! */
DEBUG_REQ(D_HA, req, "cleaning up from sending list");
list_del_init(&req->rq_list);
req->rq_import = NULL;
- spin_unlock(&conn->c_lock);
- ptlrpc_req_finished(req);
- goto restart1;
+ __ptlrpc_req_finished(req, 0);
}
-restart2:
- list_for_each_safe(tmp, saved, &conn->c_dying_head) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (req->rq_import != imp)
- continue;
- DEBUG_REQ(D_ERROR, req, "on dying list at cleanup");
- list_del_init(&req->rq_list);
- req->rq_import = NULL;
- spin_unlock(&conn->c_lock);
- ptlrpc_req_finished(req);
- spin_lock(&conn->c_lock);
- goto restart2;
- }
- spin_unlock(&conn->c_lock);
-
+ spin_unlock(&imp->imp_lock);
+
EXIT;
return;
}
void ptlrpc_continue_req(struct ptlrpc_request *req)
{
ENTRY;
- CDEBUG(D_HA, "continue delayed request "LPD64" opc %d\n",
- req->rq_xid, req->rq_reqmsg->opc);
+ DEBUG_REQ(D_HA, req, "continuing delayed request");
req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
wake_up(&req->rq_wait_for_rep);
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
ENTRY;
- CDEBUG(D_HA, "resend request "LPD64", opc %d\n",
- req->rq_xid, req->rq_reqmsg->opc);
+ DEBUG_REQ(D_HA, req, "resending");
req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
req->rq_status = -EAGAIN;
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
ENTRY;
- CDEBUG(D_HA, "restart completed request "LPD64", opc %d\n",
- req->rq_xid, req->rq_reqmsg->opc);
+ DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
req->rq_status = -ERESTARTSYS;
- req->rq_flags |= PTL_RPC_FL_RECOVERY;
+ req->rq_flags |= PTL_RPC_FL_RESTART;
req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
wake_up(&req->rq_wait_for_rep);
EXIT;
{
int rc = 0;
struct l_wait_info lwi;
- //struct ptlrpc_client *cli = req->rq_import->imp_client;
- struct ptlrpc_connection *conn = req->rq_import->imp_connection;
+ struct obd_import *imp = req->rq_import;
+ struct ptlrpc_connection *conn = imp->imp_connection;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */
- CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"
- LPX64":%x:%d\n",
- NTOH__u32(req->rq_reqmsg->status),
- req->rq_xid,
- conn->c_peer.peer_nid,
- NTOH__u32(req->rq_reqmsg->opc)
- );
+ CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
+ NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
+ conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));
- //DEBUG_REQ(D_HA, req, "subsys: %s:", cli->cli_name);
/* XXX probably both an import and connection level are needed */
if (req->rq_level > conn->c_level) {
EIO_IF_INVALID(conn, req);
list_del(&req->rq_list);
- list_add_tail(&req->rq_list, &conn->c_sending_head);
+ list_add_tail(&req->rq_list, &imp->imp_request_list);
spin_unlock(&conn->c_lock);
rc = ptl_send_rpc(req);
if (rc) {
CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
req->rq_reqmsg->opc);
- /* the sleep below will time out, triggering recovery */
+ /* sleep for a jiffy, then trigger recovery */
+ lwi = LWI_TIMEOUT_INTR(1, expired_request,
+ interrupted_request, req);
+ } else {
+ DEBUG_REQ(D_NET, req, "-- sleeping");
+ lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
+ interrupted_request, req);
}
-
- DEBUG_REQ(D_NET, req, "-- sleeping");
- lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
- interrupted_request, req);
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
DEBUG_REQ(D_NET, req, "-- done sleeping");
GOTO(out, rc = -EINVAL);
}
#endif
- CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
+ CDEBUG(D_NET, "got rep "LPU64"\n", req->rq_xid);
if (req->rq_repmsg->status == 0)
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
req->rq_replen, req->rq_repmsg->status);
*
* But don't commit anything that's kept indefinitely for replay (has
* the PTL_RPC_FL_REPLAY flag set), such as open requests.
+ *
+ * This needs to match the commit test in ptlrpc_free_committed().
*/
- if ((req->rq_import->imp_flags & IMP_REPLAYABLE) == 0 ||
+ if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
(req->rq_repmsg->transno == 0 &&
(req->rq_flags & PTL_RPC_FL_REPLAY) == 0)) {
/* This import doesn't support replay, so we can just "commit"
*/
DEBUG_REQ(D_HA, req, "not replayable, committing:");
list_del_init(&req->rq_list);
- spin_unlock(&conn->c_lock);
- ptlrpc_req_finished(req); /* Must be called unlocked. */
- spin_lock(&conn->c_lock);
- } else /* if (req->rq_import->imp_flags & IMP_REPLAYABLE) */ {
+ __ptlrpc_req_finished(req, 1);
+ }
+ if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
/* Replay-enabled imports return commit-status information. */
- /* XXX this needs to be per-import, or multiple MDS services on
- * XXX the same system are going to interfere messily with each
- * XXX others' transno spaces.
- */
- conn->c_last_xid = req->rq_repmsg->last_xid;
- conn->c_last_committed = req->rq_repmsg->last_committed;
- ptlrpc_free_committed(conn);
+ imp->imp_peer_last_xid = req->rq_repmsg->last_xid;
+ imp->imp_peer_committed_transno =
+ req->rq_repmsg->last_committed;
+ ptlrpc_free_committed(imp);
}
+ rc = ptlrpc_check_status(req);
spin_unlock(&conn->c_lock);
EXIT;
GOTO(out, c);
c->c_level = LUSTRE_CONN_NEW;
- c->c_xid_in = 1;
- c->c_xid_out = 1;
c->c_generation = 1;
c->c_epoch = 1;
c->c_bootcount = 0;
c->c_flags = 0;
if (uuid)
strcpy(c->c_remote_uuid, uuid);
- INIT_LIST_HEAD(&c->c_delayed_head);
- INIT_LIST_HEAD(&c->c_sending_head);
- INIT_LIST_HEAD(&c->c_dying_head);
INIT_LIST_HEAD(&c->c_imports);
INIT_LIST_HEAD(&c->c_exports);
INIT_LIST_HEAD(&c->c_sb_chain);
INIT_LIST_HEAD(&c->c_recovd_data.rd_managed_chain);
+ INIT_LIST_HEAD(&c->c_delayed_head);
atomic_set(&c->c_refcount, 0);
ptlrpc_connection_addref(c);
spin_lock_init(&c->c_lock);
}
list_for_each_safe(tmp, pos, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- CERROR("Connection %p has refcount %d at cleanup (nid=%lu)!\n",
- c, atomic_read(&c->c_refcount),
+ CERROR("Connection %p/%s has refcount %d (nid=%lu)\n",
+ c, c->c_remote_uuid, atomic_read(&c->c_refcount),
(unsigned long)c->c_peer.peer_nid);
list_del(&c->c_link);
OBD_FREE(c, sizeof(*c));
return;
}
- CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
- CERROR("peer is %08x %08lx %08lx\n", conn->c_peer.peer_nid,
+ CERROR("connection %p to %s (%08x %08lx %08lx) failed\n", conn,
+ conn->c_remote_uuid, conn->c_peer.peer_nid,
conn->c_peer.peer_ni.nal_idx, conn->c_peer.peer_ni.handle_idx);
list_del(&rd->rd_managed_chain);
list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
*
- * Copryright (C) 1996 Peter J. Braam <braam@stelias.com>
- * Copryright (C) 1999 Stelias Computing Inc. <braam@stelias.com>
- * Copryright (C) 1999 Seagate Technology Inc.
- * Copryright (C) 2001 Mountain View Data, Inc.
- * Copryright (C) 2002 Cluster File Systems, Inc.
+ * Copyright (C) 1996 Peter J. Braam <braam@stelias.com>
+ * Copyright (C) 1999 Stelias Computing Inc. <braam@stelias.com>
+ * Copyright (C) 1999 Seagate Technology Inc.
+ * Copyright (C) 2001 Mountain View Data, Inc.
+ * Copyright (C) 2002 Cluster File Systems, Inc.
*
*/
request->rq_level = LUSTRE_CONN_NEW;
request->rq_replen = lustre_msg_size(0, NULL);
/*
-
* This address is the export that represents our client-side LDLM
* service (for ASTs). We should only have one on this list, so we
* just grab the first one.
request->rq_reqmsg->addr = (__u64)(unsigned long)ldlmexp;
request->rq_reqmsg->cookie = ldlmexp->exp_cookie;
rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
- if (rc) {
+ switch (rc) {
+ case EALREADY:
+ case -EALREADY:
+ /* already connected! */
+ memset(&old_hdl, 0, sizeof(old_hdl));
+ if (!memcmp(&old_hdl.addr, &request->rq_repmsg->addr,
+ sizeof (old_hdl.addr)) &&
+ !memcmp(&old_hdl.cookie, &request->rq_repmsg->cookie,
+ sizeof (old_hdl.cookie))) {
+ CERROR("%s@%s didn't like our handle %Lx/%Lx, failed\n",
+ cli->cl_target_uuid, conn->c_remote_uuid,
+ (__u64)(unsigned long)ldlmexp,
+ ldlmexp->exp_cookie);
+ GOTO(out_disc, rc = -ENOTCONN);
+ }
+
+ old_hdl.addr = request->rq_repmsg->addr;
+ old_hdl.cookie = request->rq_repmsg->cookie;
+ if (memcmp(&imp->imp_handle, &old_hdl, sizeof(old_hdl))) {
+ CERROR("%s@%s changed handle from %Lx/%Lx to %Lx/%Lx; "
+ "copying, but this may foreshadow disaster\n",
+ cli->cl_target_uuid, conn->c_remote_uuid,
+ old_hdl.addr, old_hdl.cookie,
+ imp->imp_handle.addr, imp->imp_handle.cookie);
+ imp->imp_handle.addr = request->rq_repmsg->addr;
+ imp->imp_handle.cookie = request->rq_repmsg->cookie;
+ GOTO(out_disc, rc = EALREADY);
+ }
+
+ CERROR("reconnected to %s@%s after partition\n",
+ cli->cl_target_uuid, conn->c_remote_uuid);
+ GOTO(out_disc, rc = EALREADY);
+ case 0:
+ old_hdl = imp->imp_handle;
+ imp->imp_handle.addr = request->rq_repmsg->addr;
+ imp->imp_handle.cookie = request->rq_repmsg->cookie;
+ CERROR("now connected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n",
+ cli->cl_target_uuid, conn->c_remote_uuid,
+ imp->imp_handle.addr, imp->imp_handle.cookie,
+ old_hdl.addr, old_hdl.cookie);
+ GOTO(out_disc, rc = 0);
+ default:
CERROR("cannot connect to %s@%s: rc = %d\n",
cli->cl_target_uuid, conn->c_remote_uuid, rc);
- ptlrpc_free_req(request);
- GOTO(out_disc, rc = -ENOTCONN);
+ GOTO(out_disc, rc = -ENOTCONN); /* XXX preserve rc? */
}
-
- old_hdl = imp->imp_handle;
- imp->imp_handle.addr = request->rq_repmsg->addr;
- imp->imp_handle.cookie = request->rq_repmsg->cookie;
- CERROR("reconnected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n",
- cli->cl_target_uuid, conn->c_remote_uuid,
- imp->imp_handle.addr, imp->imp_handle.cookie,
- old_hdl.addr, old_hdl.cookie);
- ptlrpc_req_finished(request);
out_disc:
+ ptlrpc_req_finished(request);
return rc;
}
#define REPLAY_RESEND 2 /* Resend required. */
#define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it). */
#define REPLAY_RESTART 4 /* Have to restart the call, sorry! */
-#define REPLAY_NO_STATE 5 /* Request doesn't change MDS state: skip. */
-static int replay_state(struct ptlrpc_request *req, __u64 last_xid)
+static int replay_state(struct ptlrpc_request *req, __u64 committed)
{
/* This request must always be replayed. */
if (req->rq_flags & PTL_RPC_FL_REPLAY)
return REPLAY_REPLAY;
/* Uncommitted request */
- if (req->rq_xid > last_xid) {
+ if (req->rq_transno > committed) {
if (req->rq_flags & PTL_RPC_FL_REPLIED) {
- if (req->rq_transno == 0) {
- /* If no transno was returned, no state was
- altered on the MDS. */
- return REPLAY_NO_STATE;
- }
-
/* Saw reply, so resend and ignore new reply. */
return REPLAY_RESEND_IGNORE;
}
static char *replay_state2str(int state) {
static char *state_strings[] = {
"COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART",
- "NO_STATE"
};
static char *unknown_state = "UNKNOWN";
return state_strings[state];
}
-int ptlrpc_replay(struct ptlrpc_connection *conn)
+int ptlrpc_replay(struct obd_import *imp, int unreplied_only)
{
- int rc = 0;
+ int rc = 0, state;
struct list_head *tmp, *pos;
struct ptlrpc_request *req;
+ struct ptlrpc_connection *conn = imp->imp_connection;
+ __u64 committed = imp->imp_peer_committed_transno;
ENTRY;
- spin_lock(&conn->c_lock);
+ spin_lock(&imp->imp_lock);
- CDEBUG(D_HA, "connection %p to %s has last_xid "LPD64"\n",
- conn, conn->c_remote_uuid, conn->c_last_xid);
+        CDEBUG(D_HA, "import %p from %s has committed "LPU64"\n",
+ imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
- list_for_each(tmp, &conn->c_sending_head) {
- int state;
+ list_for_each(tmp, &imp->imp_request_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- state = replay_state(req, conn->c_last_xid);
+ state = replay_state(req, committed);
DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state));
}
list_for_each(tmp, &conn->c_delayed_head) {
- int state;
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- state = replay_state(req, conn->c_last_xid);
- DEBUG_REQ(D_HA, req, "DELAYED: ");
+ state = replay_state(req, committed);
+ DEBUG_REQ(D_HA, req, "DELAYED: %s: ", replay_state2str(state));
}
- list_for_each_safe(tmp, pos, &conn->c_sending_head) {
+ list_for_each_safe(tmp, pos, &imp->imp_request_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
-
- switch (replay_state(req, conn->c_last_xid)) {
+
+ if (unreplied_only) {
+ if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
+ DEBUG_REQ(D_HA, req, "UNREPLIED:");
+ ptlrpc_restart_req(req);
+ }
+ continue;
+ }
+
+ state = replay_state(req, committed);
+
+ if (req->rq_transno == imp->imp_max_transno) {
+ req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+ DEBUG_REQ(D_HA, req, "last for replay");
+ LASSERT(state != REPLAY_COMMITTED);
+ }
+
+ switch (state) {
case REPLAY_REPLAY:
DEBUG_REQ(D_HA, req, "REPLAY:");
rc = ptlrpc_replay_req(req);
}
break;
-
case REPLAY_COMMITTED:
- DEBUG_REQ(D_HA, req, "COMMITTED:");
- /* XXX commit now? */
- break;
-
- case REPLAY_NO_STATE:
- DEBUG_REQ(D_HA, req, "NO_STATE:");
+ DEBUG_REQ(D_ERROR, req, "COMMITTED:");
/* XXX commit now? */
break;
EXPORT_SYMBOL(ptlrpc_free_bulk);
EXPORT_SYMBOL(ptlrpc_prep_bulk_page);
EXPORT_SYMBOL(ptlrpc_free_bulk_page);
-EXPORT_SYMBOL(ptlrpc_check_status);
EXPORT_SYMBOL(ll_brw_sync_wait);
/* service.c */
osc = lookup(self.dom_node.parentNode, osc_uuid)
if osc:
n = OSC(osc)
- n.prepare()
+ try:
+ # Ignore connection failures, because the LOV will DTRT with
+ # an unconnected OSC.
+ n.prepare(ignore_connect_failure=1)
+ except CommandError:
+                print "Error preparing OSC %s (inactive)" % osc_uuid
else:
panic('osc not found:', osc_uuid)
mdc_uuid = prepare_mdc(self.dom_node.parentNode, self.mds_uuid)
self.lookup_server(self.ost_uuid)
self.add_module('lustre/osc', 'osc')
- def prepare(self):
+ def prepare(self, ignore_connect_failure = 0):
if is_prepared(self.uuid):
return
self.info(self.obd_uuid, self.ost_uuid)
srv = self.get_server()
- if local_net(srv):
- lctl.connect(srv.net_type, srv.nid, srv.port, srv.uuid, srv.send_mem, srv.recv_mem)
- else:
- r = find_route(srv)
- if r:
- lctl.add_route_host(r[0], srv.uuid, r[1], r[2])
+ try:
+ if local_net(srv):
+ lctl.connect(srv.net_type, srv.nid, srv.port, srv.uuid, srv.send_mem, srv.recv_mem)
else:
- panic ("no route to", srv.nid)
+ r = find_route(srv)
+ if r:
+ lctl.add_route_host(r[0], srv.uuid, r[1], r[2])
+ else:
+ panic ("no route to", srv.nid)
+        except CommandError:
+            if (ignore_connect_failure == 0):
+                raise
lctl.newdev(attach="osc %s %s" % (self.name, self.uuid),
setup ="%s %s" %(self.obd_uuid, srv.uuid))