- correct the journal credits calculated for CANCEL_UNLINK_LOG (2931)
- don't close files for self_export to avoid uninitialized obd (2936)
- allow lustre to be mounted with the same name for node and mds (2939)
-
+ - clear page cache after eviction (2766)
+
2004-03-04 Cluster File Systems, Inc. <info@clusterfs.com>
* version 1.2.0
* bug fixes
#define LDLM_FL_LOCAL_ONLY 0x000400 /* see ldlm_cli_cancel_unused */
/* don't run the cancel callback under ldlm_cli_cancel_unused */
-#define LDLM_FL_NO_CALLBACK 0x000800
+#define LDLM_FL_FAILED 0x000800
#define LDLM_FL_HAS_INTENT 0x001000 /* lock request has intent */
#define LDLM_FL_CANCELING 0x002000 /* lock cancel has already been sent */
void ptlrpc_wake_delayed(struct obd_import *imp);
int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid);
int ptlrpc_set_import_active(struct obd_import *imp, int active);
-void ptlrpc_invalidate_import(struct obd_import *imp);
+void ptlrpc_deactivate_import(struct obd_import *imp);
+void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc);
void ptlrpc_fail_import(struct obd_import *imp, int generation);
void ptlrpc_fail_export(struct obd_export *exp);
enum obd_import_event {
IMP_EVENT_DISCON = 0x808001,
- IMP_EVENT_INVALIDATE = 0x808002,
- IMP_EVENT_ACTIVE = 0x808003,
+ IMP_EVENT_INACTIVE = 0x808002,
+ IMP_EVENT_INVALIDATE = 0x808003,
+ IMP_EVENT_ACTIVE = 0x808004,
};
struct obd_import {
struct obd_device *imp_obd;
wait_queue_head_t imp_recovery_waitq;
__u64 imp_last_replay_transno;
+ atomic_t imp_inflight;
atomic_t imp_replay_inflight;
enum lustre_imp_state imp_state;
int imp_generation;
struct ptlrpc_request_set *rq_set;
void *rq_interpret_reply; /* Async completion handler */
union ptlrpc_async_args rq_async_args; /* Async completion context */
+        void *rq_ptlrpcd_data;
};
int ptlrpc_pinger_del_import(struct obd_import *imp);
/* ptlrpc/ptlrpcd.c */
-void ptlrpcd_wake(void);
+void ptlrpcd_wake(struct ptlrpc_request *req);
void ptlrpcd_add_req(struct ptlrpc_request *req);
int ptlrpcd_addref(void);
void ptlrpcd_decref(void);
/* _cli_ is poorly named, it should be _ready_ */
struct list_head loi_cli_item;
struct list_head loi_write_item;
+ struct list_head loi_read_item;
int loi_kms_valid:1;
__u64 loi_kms; /* known minimum size */
INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
INIT_LIST_HEAD(&loi->loi_cli_item);
INIT_LIST_HEAD(&loi->loi_write_item);
+ INIT_LIST_HEAD(&loi->loi_read_item);
}
struct lov_stripe_md {
spinlock_t cl_loi_list_lock;
struct list_head cl_loi_ready_list;
struct list_head cl_loi_write_list;
+ struct list_head cl_loi_read_list;
int cl_brw_in_flight;
/* just a sum of the loi/lop pending numbers to be exported by /proc */
int cl_pending_w_pages;
#define OBD_FAIL_PTLRPC 0x500
#define OBD_FAIL_PTLRPC_ACK 0x501
#define OBD_FAIL_PTLRPC_RQBD 0x502
+#define OBD_FAIL_PTLRPC_BULK_GET_NET 0x503
+#define OBD_FAIL_PTLRPC_BULK_PUT_NET 0x504
#define OBD_FAIL_OBD_PING_NET 0x600
#define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601
INIT_LIST_HEAD(&cli->cl_cache_waiters);
INIT_LIST_HEAD(&cli->cl_loi_ready_list);
INIT_LIST_HEAD(&cli->cl_loi_write_list);
+ INIT_LIST_HEAD(&cli->cl_loi_read_list);
spin_lock_init(&cli->cl_loi_list_lock);
cli->cl_brw_in_flight = 0;
spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
/* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
if (obd->obd_no_recov)
- ptlrpc_invalidate_import(imp);
+ ptlrpc_invalidate_import(imp, 0);
else
rc = ptlrpc_disconnect_import(imp);
/* Go to sleep until the lock is granted or cancelled. */
rc = l_wait_event(lock->l_waitq,
((lock->l_req_mode == lock->l_granted_mode) ||
- (lock->l_flags & LDLM_FL_CANCEL)), &lwi);
+ (lock->l_flags & LDLM_FL_FAILED)), &lwi);
- if (lock->l_destroyed) {
+ if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
RETURN(-EIO);
}
w->w_lock = LDLM_LOCK_GET(lock);
- /* Prevent the cancel callback from being called by setting
- * LDLM_FL_CANCEL in the lock. Very sneaky. -p */
- if (flags & LDLM_FL_NO_CALLBACK)
- w->w_lock->l_flags |= LDLM_FL_CANCEL;
-
list_add(&w->w_list, &list);
}
l_unlock(&ns->ns_lock);
*
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server.
- * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
* If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
struct ldlm_res_id *res_id, int flags, void *opaque)
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
LDLM_LOCK_GET(lock);
- lock->l_flags |= LDLM_FL_CANCEL;
+ lock->l_flags |= LDLM_FL_FAILED;
lock->l_flags |= flags;
if (local_only && (lock->l_readers || lock->l_writers)) {
tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
/* check to see if another DLM lock covers this page */
- rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
+ ldlm_lock2handle(lock, &lockh);
+ rc2 = ldlm_lock_match(NULL,
LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
LDLM_FL_TEST_LOCK,
- &lock->l_resource->lr_name, LDLM_EXTENT,
- &tmpex, LCK_PR | LCK_PW, &lockh);
+ NULL, 0, &tmpex, 0, &lockh);
if (rc2 == 0 && page->mapping != NULL) {
// checking again to account for writeback's lock_page()
LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
{
struct ll_async_page *llap;
struct page *page;
+ ENTRY;
llap = llap_from_cookie(data);
if (IS_ERR(llap)) {
page = llap->llap_page;
LASSERT(PageLocked(page));
+ LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);
+
if (rc == 0) {
if (cmd == OBD_BRW_READ) {
if (!llap->llap_defer_uptodate)
} else {
llap->llap_write_queued = 0;
}
+ ClearPageError(page);
} else {
+ if (cmd == OBD_BRW_READ)
+ llap->llap_defer_uptodate = 0;
SetPageError(page);
}
- LL_CDEBUG_PAGE(D_PAGE, page, "io complete, unlocking\n");
unlock_page(page);
}
page_cache_release(page);
+ EXIT;
}
static int ll_writepage_24(struct page *page)
case IMP_EVENT_DISCON: {
break;
}
+ case IMP_EVENT_INACTIVE: {
+ if (obd->obd_observer)
+ rc = obd_notify(obd->obd_observer, obd, 0);
+ break;
+ }
case IMP_EVENT_INVALIDATE: {
struct ldlm_namespace *ns = obd->obd_namespace;
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
- if (obd->obd_observer)
- rc = obd_notify(obd->obd_observer, obd, 0);
break;
}
case IMP_EVENT_ACTIVE: {
down(&parent_inode->i_sem);
de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
if (de == NULL || de->d_inode == NULL) {
- CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
+ CERROR("destroying non-existent object "LPU64" %s\n",
+ oa->o_id, fidname);
GOTO(out_dput, rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT);
}
init_waitqueue_head(&imp->imp_recovery_waitq);
atomic_set(&imp->imp_refcount, 2);
+ atomic_set(&imp->imp_inflight, 0);
atomic_set(&imp->imp_replay_inflight, 0);
INIT_LIST_HEAD(&imp->imp_handle.h_link);
class_handle_hash(&imp->imp_handle, import_handle_addref);
/* ok, it's been put in an rpc. */
if (oap->oap_request != NULL) {
ptlrpc_mark_interrupted(oap->oap_request);
- ptlrpcd_wake();
+ ptlrpcd_wake(oap->oap_request);
GOTO(unlock, 0);
}
struct list_head *pos, *n;
ENTRY;
- CDEBUG(D_INODE, "request %p aa %p\n", request, aa);
rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
aa->aa_nio_count, aa->aa_page_count,
aa->aa_pga, rc);
+ CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
+
cli = aa->aa_cli;
/* in failout recovery we ignore writeback failure and want
* to just tell llite to unlock the page and continue */
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+ if (request->rq_reqmsg->opc == OST_WRITE &&
+ (cli->cl_import == NULL || cli->cl_import->imp_invalid)) {
+ CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n",
+ cli->cl_import,
+ cli->cl_import ? cli->cl_import->imp_invalid : -1);
rc = 0;
+ }
spin_lock(&cli->cl_loi_list_lock);
if (lop->lop_num_pending == 0)
RETURN(0);
+ /* if we have an invalid import we want to drain the queued pages
+ * by forcing them through rpcs that immediately fail and complete
+ * the pages. recovery relies on this to empty the queued pages
+ * before canceling the locks and evicting down the llite pages */
+ if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+ RETURN(1);
+
/* stream rpcs in queue order as long as there is an urgent page
* queued. this is our cheap solution for good batching in the case
* where writepage marks some random page in the middle of the file as
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
loi->loi_write_lop.lop_num_pending);
+
+ on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
+ loi->loi_read_lop.lop_num_pending);
}
#define LOI_DEBUG(LOI, STR, args...) \
!list_empty(&cli->cl_loi_write_list))
RETURN(list_entry(cli->cl_loi_write_list.next,
struct lov_oinfo, loi_write_item));
+
+ /* then return all queued objects when we have an invalid import
+ * so that they get flushed */
+ if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
+ if (!list_empty(&cli->cl_loi_write_list))
+ RETURN(list_entry(cli->cl_loi_write_list.next,
+ struct lov_oinfo, loi_write_item));
+ if (!list_empty(&cli->cl_loi_read_list))
+ RETURN(list_entry(cli->cl_loi_read_list.next,
+ struct lov_oinfo, loi_read_item));
+ }
RETURN(NULL);
}
list_del_init(&loi->loi_cli_item);
if (!list_empty(&loi->loi_write_item))
list_del_init(&loi->loi_write_item);
+ if (!list_empty(&loi->loi_read_item))
+ list_del_init(&loi->loi_read_item);
loi_list_maint(cli, loi);
}
break;
}
+ case IMP_EVENT_INACTIVE: {
+ if (obd->obd_observer)
+ rc = obd_notify(obd->obd_observer, obd, 0);
+ break;
+ }
case IMP_EVENT_INVALIDATE: {
struct ldlm_namespace *ns = obd->obd_namespace;
- /* this used to try and tear down queued pages, but it was
- * not correctly implemented. We'll have to do it again once
- * we call obd_invalidate_import() agian */
- /* XXX And we still need to do this */
-
- /* Reset grants, too */
+ /* Reset grants */
cli = &obd->u.cli;
spin_lock(&cli->cl_loi_list_lock);
cli->cl_avail_grant = 0;
cli->cl_lost_grant = 0;
+ /* all pages go to failing rpcs due to the invalid import */
+ osc_check_rpcs(cli);
spin_unlock(&cli->cl_loi_list_lock);
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
- if (obd->obd_observer)
- rc = obd_notify(obd->obd_observer, obd, 0);
break;
}
case IMP_EVENT_ACTIVE: {
list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
set->set_remaining++;
+ atomic_inc(&req->rq_import->imp_inflight);
}
/* lock so many callers can add things, the context that owns the set
DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
*status = -EIO;
}
+ /* allow CONNECT even if import is invalid */
+ else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
+ imp->imp_state == LUSTRE_IMP_CONNECTING) {
+ ;
+ }
/*
* If the import has been invalidated (such as by an OST failure), the
* request must fail with -EIO.
imp = req->rq_import;
spin_lock_irqsave(&imp->imp_lock, flags);
- if (imp->imp_invalid) {
- spin_unlock_irqrestore(&imp->imp_lock, flags);
- req->rq_status = -EIO;
- req->rq_phase = RQ_PHASE_INTERPRET;
- RETURN(-EIO);
- }
-
req->rq_import_generation = imp->imp_generation;
if (ptlrpc_import_delay_req(imp, req, &rc)) {
req->rq_reqmsg->opc);
set->set_remaining--;
+
+ atomic_dec(&imp->imp_inflight);
+ wake_up(&imp->imp_recovery_waitq);
}
/* If we hit an error, we want to recover promptly. */
LASSERT(req->rq_set == NULL);
LASSERT(!req->rq_receiving_reply);
+ atomic_inc(&imp->imp_inflight);
/* for distributed debugging */
req->rq_reqmsg->status = current->pid;
LASSERT(!req->rq_receiving_reply);
req->rq_phase = RQ_PHASE_INTERPRET;
+
+ atomic_dec(&imp->imp_inflight);
+ wake_up(&imp->imp_recovery_waitq);
RETURN(rc);
}
struct ptlrpc_connect_async_args {
__u64 pcaa_peer_committed;
int pcaa_initial_connect;
- int pcaa_was_invalid;
};
/* A CLOSED import should remain so. */
return rc;
}
-void ptlrpc_invalidate_import(struct obd_import *imp)
+/*
+ * This acts as a barrier; all existing requests are rejected, and
+ * no new requests will be accepted until the import is valid again.
+ */
+void ptlrpc_deactivate_import(struct obd_import *imp)
{
- struct obd_device *obd = imp->imp_obd;
unsigned long flags;
ENTRY;
spin_lock_irqsave(&imp->imp_lock, flags);
- /* This is a bit of a hack, but invalidating replayable
- * imports makes a temporary reconnect failure into a much more
- * ugly -- and hard to remedy -- situation. */
- if (!imp->imp_replayable) {
- CDEBUG(D_HA, "setting import %s INVALID\n",
- imp->imp_target_uuid.uuid);
- imp->imp_invalid = 1;
- }
+ CDEBUG(D_HA, "setting import %s INVALID\n",
+ imp->imp_target_uuid.uuid);
+ imp->imp_invalid = 1;
imp->imp_generation++;
spin_unlock_irqrestore(&imp->imp_lock, flags);
ptlrpc_abort_inflight(imp);
- obd_import_event(obd, imp, IMP_EVENT_INVALIDATE);
+ obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
}
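
The division of labor between the import events deserves a summary; the sketch below is reconstructed from the llite and osc handlers earlier in this patch, and is commentary rather than patch text:

/* Import event flow established by this patch (sketch):
 *
 *   ptlrpc_deactivate_import() -> IMP_EVENT_INACTIVE
 *       (observer notified that the import is down)
 *   ptlrpc_invalidate_import() -> drain imp_inflight, then
 *       IMP_EVENT_INVALIDATE (cancel local DLM locks; osc pushes
 *       queued pages through immediately-failing RPCs)
 *   ptlrpc_activate_import()   -> IMP_EVENT_ACTIVE
 */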
-void ptlrpc_validate_import(struct obd_import *imp)
+/*
+ * This function will invalidate the import, if necessary, then block
+ * for all the RPC completions, and finally notify the obd to
+ * invalidate its state (ie cancel locks, clear pending requests,
+ * etc).
+ *
+ * in_rpc: true if this is called while processing an rpc, like
+ * CONNECT. It will allow for one RPC to be inflight while
+ * waiting for requests to complete. Ugly, yes, but I don't see a
+ * cleaner way right now.
+ */
+void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
+{
+ struct l_wait_info lwi;
+ int inflight = 0;
+ int rc;
+
+ if (!imp->imp_invalid)
+ ptlrpc_deactivate_import(imp);
+
+ LASSERT(imp->imp_invalid);
+
+ if (in_rpc)
+ inflight = 1;
+ /* wait for all requests to error out and call completion
+ callbacks */
+ lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), NULL,
+ NULL, NULL);
+ rc = l_wait_event(imp->imp_recovery_waitq,
+ (atomic_read(&imp->imp_inflight) == inflight),
+ &lwi);
+
+ if (rc)
+ CERROR("%s: rc = %d waiting for callback (%d != %d)\n",
+                       imp->imp_target_uuid.uuid, rc,
+                       atomic_read(&imp->imp_inflight), inflight);
+
+ obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
+}
+
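Both call sites of the new in_rpc argument appear in this patch; condensed here to show the intent (a sketch, not an additional hunk):

/* Forced-shutdown and ioctl paths: none of our RPCs is in flight. */
if (obd->obd_no_recov)
        ptlrpc_invalidate_import(imp, 0);

/* Recovery state machine, running while the CONNECT reply is being
 * interpreted: that CONNECT still counts in imp_inflight, so the
 * drain must wait for the counter to reach 1, not 0. */
ptlrpc_invalidate_import(imp, 1);
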
+static void ptlrpc_activate_import(struct obd_import *imp)
{
struct obd_device *obd = imp->imp_obd;
unsigned long flags;
imp->imp_target_uuid.uuid,
imp->imp_connection->c_remote_uuid.uuid,
imp->imp_obd->obd_name);
- ptlrpc_invalidate_import(imp);
+ ptlrpc_deactivate_import(imp);
}
CDEBUG(D_HA, "%s: waking up pinger\n",
int initial_connect = 0;
int rc;
__u64 committed_before_reconnect = 0;
- int was_invalid = 0;
struct ptlrpc_request *request;
int size[] = {sizeof(imp->imp_target_uuid),
sizeof(obd->obd_uuid),
}
- if (imp->imp_invalid) {
- imp->imp_invalid = 0;
- was_invalid = 1;
- }
spin_unlock_irqrestore(&imp->imp_lock, flags);
aa->pcaa_peer_committed = committed_before_reconnect;
aa->pcaa_initial_connect = initial_connect;
- aa->pcaa_was_invalid = was_invalid;
if (aa->pcaa_initial_connect)
imp->imp_replayable = 1;
imp->imp_target_uuid.uuid,
imp->imp_connection->c_remote_uuid.uuid);
}
- IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
+
+ if (imp->imp_invalid)
+ IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
+ else
+ IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
}
- else if (MSG_CONNECT_RECOVERING & msg_flags) {
+ else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
LASSERT(imp->imp_replayable);
- imp->imp_state = LUSTRE_IMP_RECOVER;
imp->imp_remote_handle = request->rq_repmsg->handle;
IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
}
finish:
rc = ptlrpc_import_recovery_state_machine(imp);
if (rc != 0) {
- if (aa->pcaa_was_invalid)
- ptlrpc_invalidate_import(imp);
-
if (rc == -ENOTCONN) {
CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
"invalidating and reconnecting\n",
if (rc != 0) {
IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
- ptlrpc_invalidate_import(imp);
+ ptlrpc_deactivate_import(imp);
}
CDEBUG(D_ERROR, "recovery of %s on %s failed (%d)\n",
imp->imp_target_uuid.uuid,
RETURN(0);
}
-
int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
{
int rc = 0;
CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
imp->imp_target_uuid.uuid,
imp->imp_connection->c_remote_uuid.uuid);
- ptlrpc_invalidate_import(imp);
+
+ ptlrpc_invalidate_import(imp, 1);
+
IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
}
if (rc)
GOTO(out, rc);
IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
- ptlrpc_validate_import(imp);
+ ptlrpc_activate_import(imp);
}
if (imp->imp_state == LUSTRE_IMP_FULL) {
__u64 xid;
ENTRY;
+ if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET))
+ RETURN(0);
+
/* NB no locking required until desc is on the network */
LASSERT (!desc->bd_network_rw);
LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
ptl_md_t md;
ENTRY;
+ if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET))
+ RETURN(0);
+
/* NB no locking required until desc is on the network */
LASSERT (desc->bd_nob > 0);
LASSERT (!desc->bd_network_rw);
EXPORT_SYMBOL(ptlrpc_resend);
EXPORT_SYMBOL(ptlrpc_wake_delayed);
EXPORT_SYMBOL(ptlrpc_set_import_active);
+EXPORT_SYMBOL(ptlrpc_deactivate_import);
EXPORT_SYMBOL(ptlrpc_invalidate_import);
EXPORT_SYMBOL(ptlrpc_fail_import);
EXPORT_SYMBOL(ptlrpc_fail_export);
#include <linux/lprocfs_status.h>
#define LIOD_STOP 0
-static struct ptlrpcd_ctl {
+struct ptlrpcd_ctl {
unsigned long pc_flags;
spinlock_t pc_lock;
struct completion pc_starting;
struct list_head pc_req_list;
wait_queue_head_t pc_waitq;
struct ptlrpc_request_set *pc_set;
-} ptlrpcd_pc;
+#ifndef __KERNEL__
+ int pc_recurred;
+ void *pc_callback;
+#endif
+};
+
+static struct ptlrpcd_ctl ptlrpcd_pc;
+static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
static DECLARE_MUTEX(ptlrpcd_sem);
static int ptlrpcd_users = 0;
-void ptlrpcd_wake(void)
+void ptlrpcd_wake(struct ptlrpc_request *req)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+ struct ptlrpcd_ctl *pc = req->rq_ptlrpcd_data;
+
+ LASSERT(pc != NULL);
+
wake_up(&pc->pc_waitq);
}
void ptlrpcd_add_req(struct ptlrpc_request *req)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+ struct ptlrpcd_ctl *pc;
+
+ if (req->rq_send_state == LUSTRE_IMP_FULL)
+ pc = &ptlrpcd_pc;
+ else
+ pc = &ptlrpcd_recovery_pc;
ptlrpc_set_add_new_req(pc->pc_set, req);
- ptlrpcd_wake();
+ req->rq_ptlrpcd_data = pc;
+
+ ptlrpcd_wake(req);
}
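
Routing by rq_send_state keeps recovery RPCs (CONNECT, replay) off the daemon that completes ordinary async I/O, since those completions can themselves be blocked waiting on recovery. A hypothetical caller, to make the routing rule concrete (the function name is illustrative, not part of the patch):

/* Sketch: rq_send_state selects which ptlrpcd handles the request. */
static void example_queue_async(struct ptlrpc_request *req, int recovering)
{
        /* LUSTRE_IMP_FULL -> ptlrpcd_pc; any other state, e.g.
         * LUSTRE_IMP_CONNECTING, -> ptlrpcd_recovery_pc */
        req->rq_send_state = recovering ? LUSTRE_IMP_CONNECTING
                                        : LUSTRE_IMP_FULL;
        ptlrpcd_add_req(req);
}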
static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
return 0;
}
#else
-static int ptlrpcd_recurred = 0;
-static void *ptlrpcd_callback;
int ptlrpcd_check_async_rpcs(void *arg)
{
int rc = 0;
/* single threaded!! */
- ptlrpcd_recurred++;
+ pc->pc_recurred++;
- if (ptlrpcd_recurred == 1)
+ if (pc->pc_recurred == 1)
rc = ptlrpcd_check(pc);
- ptlrpcd_recurred--;
+ pc->pc_recurred--;
return rc;
}
#endif
-int ptlrpcd_addref(void)
+static int ptlrpcd_start(struct ptlrpcd_ctl *pc)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
int rc = 0;
- ENTRY;
-
- down(&ptlrpcd_sem);
- if (++ptlrpcd_users != 1)
- GOTO(out, rc);
memset(pc, 0, sizeof(*pc));
init_completion(&pc->pc_starting);
wait_for_completion(&pc->pc_starting);
#else
- ptlrpcd_callback =
+ pc->pc_callback =
liblustre_register_wait_callback(&ptlrpcd_check_async_rpcs, pc);
#endif
out:
+ RETURN(rc);
+}
+
+static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
+{
+ set_bit(LIOD_STOP, &pc->pc_flags);
+ wake_up(&pc->pc_waitq);
+#ifdef __KERNEL__
+ wait_for_completion(&pc->pc_finishing);
+#else
+ liblustre_deregister_wait_callback(pc->pc_callback);
+#endif
+ ptlrpc_set_destroy(pc->pc_set);
+}
+
+int ptlrpcd_addref(void)
+{
+ int rc = 0;
+ ENTRY;
+
+ down(&ptlrpcd_sem);
+ if (++ptlrpcd_users != 1)
+ GOTO(out, rc);
+
+ rc = ptlrpcd_start(&ptlrpcd_pc);
+ if (rc) {
+ --ptlrpcd_users;
+ GOTO(out, rc);
+ }
+
+ rc = ptlrpcd_start(&ptlrpcd_recovery_pc);
+ if (rc) {
+ ptlrpcd_stop(&ptlrpcd_pc);
+ --ptlrpcd_users;
+ GOTO(out, rc);
+ }
+out:
up(&ptlrpcd_sem);
RETURN(rc);
}
void ptlrpcd_decref(void)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
-
down(&ptlrpcd_sem);
if (--ptlrpcd_users == 0) {
- set_bit(LIOD_STOP, &pc->pc_flags);
- wake_up(&pc->pc_waitq);
-#ifdef __KERNEL__
- wait_for_completion(&pc->pc_finishing);
-#else
- liblustre_deregister_wait_callback(ptlrpcd_callback);
-#endif
- ptlrpc_set_destroy(pc->pc_set);
+ ptlrpcd_stop(&ptlrpcd_pc);
+ ptlrpcd_stop(&ptlrpcd_recovery_pc);
}
up(&ptlrpcd_sem);
}
imp->imp_target_uuid.uuid,
imp->imp_connection->c_remote_uuid.uuid);
- ptlrpc_set_import_discon(imp);
+ if (ptlrpc_set_import_discon(imp)) {
+ if (!imp->imp_replayable) {
+ CDEBUG(D_HA, "import %s@%s for %s not replayable, "
+ "auto-deactivating\n",
+ imp->imp_target_uuid.uuid,
+ imp->imp_connection->c_remote_uuid.uuid,
+ imp->imp_obd->obd_name);
+ ptlrpc_deactivate_import(imp);
+ }
+
+ rc = ptlrpc_connect_import(imp, NULL);
+ }
- rc = ptlrpc_connect_import(imp, NULL);
/* Wait for recovery to complete and resend. If evicted, then
this request will be errored out later.*/
* This should only be called by the ioctl interface, currently
* with the lctl deactivate and activate commands.
*/
-int ptlrpc_set_import_active(struct obd_import *imp, int active)
+int ptlrpc_set_import_active(struct obd_import *imp, int active)
{
struct obd_device *obd = imp->imp_obd;
- unsigned long flags;
int rc = 0;
LASSERT(obd);
/* When deactivating, mark import invalid, and abort in-flight
* requests. */
if (!active) {
- ptlrpc_invalidate_import(imp);
+ ptlrpc_invalidate_import(imp, 0);
}
/* When activating, mark import valid, and attempt recovery */
if (active) {
CDEBUG(D_HA, "setting import %s VALID\n",
imp->imp_target_uuid.uuid);
- spin_lock_irqsave(&imp->imp_lock, flags);
- imp->imp_invalid = 0;
- spin_unlock_irqrestore(&imp->imp_lock, flags);
-
rc = ptlrpc_recover_import(imp, NULL);
- if (rc) {
- spin_lock_irqsave(&imp->imp_lock, flags);
- imp->imp_invalid = 1;
- spin_unlock_irqrestore(&imp->imp_lock, flags);
- }
}
RETURN(rc);
set -e
+# 17 = bug 2732
+ALWAYS_EXCEPT="17"
+
+
LUSTRE=${LUSTRE:-`dirname $0`/..}
UPCALL=${UPCALL:-$PWD/recovery-small-upcall.sh}
. $LUSTRE/tests/test-framework.sh
exit $?
fi
+if [ "$ONLY" == "cleanup" ]; then
+ sysctl -w portals.debug=0 || true
+ cleanup
+ exit
+fi
+
REFORMAT=--reformat $SETUP
unset REFORMAT
}
run_test 15 "failed open (-ENOMEM)"
+test_16() {
+# OBD_FAIL_PTLRPC_BULK_PUT_NET | OBD_FAIL_ONCE
+ do_facet client cp /etc/termcap $MOUNT
+ sync
+
+ sysctl -w lustre.fail_loc=0x80000504
+ cancel_lru_locks OSC
+    # will get evicted here
+ do_facet client "diff /etc/termcap $MOUNT/termcap" && return 1
+ sysctl -w lustre.fail_loc=0
+ do_facet client "diff /etc/termcap $MOUNT/termcap" || return 2
+
+}
+run_test 16 "timeout bulk put, evict client (2732)"
+
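For reference, the fail_loc values in tests 16-18 combine the fail sites defined earlier in this patch with a "fire once" modifier bit; the decomposition below is a sketch, with the OBD_FAIL_ONCE value assumed from the test comments rather than shown in this patch:

/* Sketch: how the sysctl values decompose (OBD_FAIL_ONCE assumed). */
#define OBD_FAIL_ONCE                0x80000000  /* assumed modifier */
#define OBD_FAIL_PTLRPC_BULK_GET_NET 0x503       /* from this patch */
#define OBD_FAIL_PTLRPC_BULK_PUT_NET 0x504       /* from this patch */
/* test_16:      0x80000000 | 0x504 == 0x80000504 (drop one bulk PUT)
 * tests 17, 18: 0x80000000 | 0x503 == 0x80000503 (drop one bulk GET) */
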
+test_17() {
+# OBD_FAIL_PTLRPC_BULK_GET_NET | OBD_FAIL_ONCE
+    # will get evicted here
+ sysctl -w lustre.fail_loc=0x80000503
+ do_facet client cp /etc/termcap $MOUNT && return 1
+
+ do_facet client "diff /etc/termcap $MOUNT/termcap" && return 1
+ sysctl -w lustre.fail_loc=0
+ do_facet client "diff /etc/termcap $MOUNT/termcap" || return 2
+
+}
+run_test 17 "timeout bulk get, evict client (2732)"
+
+test_18() {
+# OBD_FAIL_PTLRPC_BULK_GET_NET|OBD_FAIL_ONCE
+ do_facet client mkdir -p $MOUNT/$tdir
+ f=$MOUNT/$tdir/$tfile
+ f2=$MOUNT/$tdir/${tfile}-2
+
+ cancel_lru_locks OSC
+ for a in /proc/fs/lustre/llite/*/dump_page_cache; do
+ if [ `wc -l $a | awk '{print $1}'` -gt 1 ]; then
+ echo there is still data in page cache $a ?
+ cat $a;
+ return 1;
+ fi
+ done
+
+    # shouldn't have to set a stripe count of 1
+ lfs setstripe $f $((128 * 1024)) 0 1
+ lfs setstripe $f2 $((128 * 1024)) 0 1
+
+ do_facet client cp /etc/termcap $f
+ sync
+ # just use this write to trigger the client's eviction from the ost
+ sysctl -w lustre.fail_loc=0x80000503
+ do_facet client dd if=/dev/zero of=$f2 bs=4k count=1
+ sync
+ sysctl -w lustre.fail_loc=0
+ # allow recovery to complete
+ sleep 10
+ # my understanding is that there should be nothing in the page
+ # cache after the client reconnects?
+ for a in /proc/fs/lustre/llite/*/dump_page_cache; do
+ if [ `wc -l $a | awk '{print $1}'` -gt 1 ]; then
+ echo there is still data in page cache $a ?
+ cat $a;
+ return 1;
+ fi
+ done
+}
+run_test 18 "eviction and reconnect clears page cache (2766)"
+
$CLEANUP
run_test 5 "Fail OST during iozone"
kbytesfree() {
- cat /proc/fs/lustre/osc/OSC_*MNT*/kbytesfree | awk '{total+=$1} END {print total}'
+ awk '{total+=$1} END {print total}' /proc/fs/lustre/osc/OSC_*MNT*/kbytesfree
}
test_6() {
f=$DIR/$tfile
+ sync
before=`kbytesfree`
dd if=/dev/urandom bs=1024 count=5120 of=$f
#define OBD_FAIL_MDS_REINT_NET_REP 0x119
# test MDS recovery after ost failure
test_42() {
+ blocks=`df $MOUNT | tail -1 | awk '{ print $1 }'`
createmany -o $DIR/$tfile-%d 800
replay_barrier ost
unlinkmany $DIR/$tfile-%d 0 400
facet_failover ost
- # osc is evicted after
- df $MOUNT && return 1
- df $MOUNT || return 2
+ # osc is evicted, fs is smaller
+ blocks_after=`df $MOUNT | tail -1 | awk '{ print $1 }'`
+ [ $blocks_after -lt $blocks ] || return 1
echo wait for MDS to timeout and recover
sleep $((TIMEOUT * 2))
unlinkmany $DIR/$tfile-%d 400 400
wait_for_host() {
HOST=$1
check_network $HOST 900
- while ! do_node $HOST "$CHECKSTAT -t dir $LUSTRE"; do sleep 5; done
+ while ! do_node $HOST "ls -d $LUSTRE" > /dev/null; do sleep 5; done
}