unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_QSWNAL | S_SOCKNAL |
S_GMNAL | S_IBNAL);
unsigned int portal_debug = (D_WARNING | D_DLMTRACE | D_ERROR | D_EMERG | D_HA |
- D_RPCTRACE | D_VFSTRACE);
+ D_RPCTRACE | D_VFSTRACE | D_MALLOC);
unsigned int portal_cerror = 1;
unsigned int portal_printk;
unsigned int portal_stack;
- don't write pages to disk if there was an error (1450)
- don't ping imports that have recovery disabled (2676)
- take buffered bytes into account when balancing socknal conn (2817)
+ - hold a DLM lock over readdir always, use truncate_inode_pages (2706)
+ - reconnect unlink llog connection after MDS reconnects to OST (2816)
+ - remove little-endian swabbing of llog records (1987)
+ - set/limit i_blksize to LL_MAX_BLKSIZE on client (2884)
+ - retry reposting request buffers if they fail (1191)
+ - grow extent at grant time to avoid granting a revoked lock (2809)
+ - lock revoke doesn't evict page if covered by a second lock (2765)
+ - disable VM readahead to avoid reading outside lock extents (2805)
* miscellania
- return LL_SUPER_MAGIC from statfs for the filesystem type (1972)
#ifdef __KERNEL__
# include <linux/proc_fs.h>
-#endif
+#endif
#include <linux/lustre_lib.h>
#include <linux/lustre_net.h>
#define LDLM_CB_BLOCKING 1
#define LDLM_CB_CANCELING 2
-#define L2B(c) (1 << c)
-
/* compatibility matrix */
-#define LCK_COMPAT_EX L2B(LCK_NL)
-#define LCK_COMPAT_PW (LCK_COMPAT_EX | L2B(LCK_CR))
-#define LCK_COMPAT_PR (LCK_COMPAT_PW | L2B(LCK_PR))
-#define LCK_COMPAT_CW (LCK_COMPAT_PW | L2B(LCK_CW))
-#define LCK_COMPAT_CR (LCK_COMPAT_CW | L2B(LCK_PR) | L2B(LCK_PW))
-#define LCK_COMPAT_NL (LCK_COMPAT_CR | L2B(LCK_EX))
+#define LCK_COMPAT_EX LCK_NL
+#define LCK_COMPAT_PW (LCK_COMPAT_EX | LCK_CR)
+#define LCK_COMPAT_PR (LCK_COMPAT_PW | LCK_PR)
+#define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW)
+#define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW)
+#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX)
static ldlm_mode_t lck_compat_array[] = {
[LCK_EX] LCK_COMPAT_EX,
[LCK_NL] LCK_COMPAT_NL
};
-static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
+static inline void lockmode_verify(ldlm_mode_t mode)
{
- LASSERT(exist >= LCK_EX && exist <= LCK_NL);
- LASSERT(new >= LCK_EX && new <= LCK_NL);
+ LASSERT(mode >= LCK_EX && mode <= LCK_NL);
+}
- return (lck_compat_array[exist] & L2B(new));
+static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
+{
+ return (lck_compat_array[exist] & new);
}
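
With the lock modes redefined as distinct bits (see the ldlm_mode_t change below), each LCK_COMPAT_* mask is simply the OR of the compatible modes, and the compatibility test collapses to a single AND. A minimal standalone sketch of the idea, using the new bit values from this patch:

    /* Sketch only -- mirrors the new bitmask compatibility test. */
    enum { EX = 1, PW = 2, PR = 4, CW = 8, CR = 16, NL = 32 };
    static const int compat[] = {
            [PW] = NL | CR,         /* PW coexists with NL, CR     */
            [PR] = NL | CR | PR,    /* PR coexists with NL, CR, PR */
    };
    /* compat[PR] & PR is nonzero (compatible);
     * compat[PR] & PW is zero    (conflict).  */
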
/*
-
*/
-struct ldlm_lock;
-struct ldlm_resource;
+struct ldlm_lock;
+struct ldlm_resource;
struct ldlm_namespace;
typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
struct list_head ns_root_list; /* all root resources in ns */
struct lustre_lock ns_lock; /* protects hash, refcount, list */
struct list_head ns_list_chain; /* position in global NS list */
- /*
+ /*
struct proc_dir_entry *ns_proc_dir;
*/
struct list_head l_lru;
struct list_head l_res_link; // position in one of three res lists
struct list_head l_export_chain; // per-export chain of locks
- struct list_head l_pending_chain; // locks with callbacks pending
- unsigned long l_callback_timeout;
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
ldlm_completion_callback l_completion_ast;
ldlm_blocking_callback l_blocking_ast;
ldlm_glimpse_callback l_glimpse_ast;
- void *l_ast_data;
struct obd_export *l_export;
- /* XXX phil can fix this, I'm sure */
struct obd_export *l_conn_export;
-// struct lustre_handle *l_connh;
__u32 l_flags;
struct lustre_handle l_remote_handle;
ldlm_policy_data_t l_policy_data;
- /* This LVB is used only on the client side, as temporary storage for
- * a lock value block received during an enqueue */
- __u32 l_lvb_len;
- void *l_lvb_data;
- void *l_lvb_swabber;
-
__u32 l_readers;
__u32 l_writers;
__u8 l_destroyed;
* on this waitq to learn when it becomes granted. */
wait_queue_head_t l_waitq;
struct timeval l_enqueued_time;
- unsigned long l_last_used; /* jiffies */
-};
+ unsigned long l_last_used; /* jiffies */
+ struct ldlm_extent l_req_extent;
+
+ /* Client-side-only members */
+ __u32 l_lvb_len; /* temporary storage for */
+ void *l_lvb_data; /* an LVB received during */
+ void *l_lvb_swabber; /* an enqueue */
+ void *l_ast_data;
+
+ /* Server-side-only members */
+ struct list_head l_pending_chain; /* callbacks pending */
+ unsigned long l_callback_timeout;
+};
#define LDLM_PLAIN 10
#define LDLM_EXTENT 11
CDEBUG(level, "### " format \
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \
"res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\
- "] flags: %x remote: "LPX64" expref: %d\n" , ## a, \
+ "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64 \
+ " expref: %d\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, lock, \
lock->l_handle.h_cookie, atomic_read(&lock->l_refc), \
lock->l_readers, lock->l_writers, \
ldlm_typename[lock->l_resource->lr_type], \
lock->l_policy_data.l_extent.start, \
lock->l_policy_data.l_extent.end, \
+ lock->l_req_extent.start, lock->l_req_extent.end, \
lock->l_flags, lock->l_remote_handle.cookie, \
lock->l_export ? \
atomic_read(&lock->l_export->exp_refcount) : -99); \
/* lock types */
typedef enum {
LCK_EX = 1,
- LCK_PW,
- LCK_PR,
- LCK_CW,
- LCK_CR,
- LCK_NL
+ LCK_PW = 2,
+ LCK_PR = 4,
+ LCK_CW = 8,
+ LCK_CR = 16,
+ LCK_NL = 32
} ldlm_mode_t;
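
One bit per mode also lets callers name several acceptable modes in one mask, which later hunks rely on: ldlm_lock_addref classifies readers with "mode & (LCK_NL | LCK_CR | LCK_PR)", and the llite page-coverage checks ask for LCK_PR | LCK_PW in a single match call. A hypothetical pair of helper masks (names not in this patch) makes the intent explicit:

    /* Hypothetical names, for illustration only. */
    #define LDLM_READER_MODES (LCK_NL | LCK_CR | LCK_PR)
    #define LDLM_WRITER_MODES (LCK_EX | LCK_CW | LCK_PW)
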
struct ldlm_extent {
int llog_initiator_connect(struct llog_ctxt *ctxt);
int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid);
int llog_handle_connect(struct ptlrpc_request *req);
/* recov_thread.c */
struct llog_cookie *cookies, int flags);
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid);
struct llog_operations {
int (*lop_write_rec)(struct llog_handle *loghandle,
int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
int (*lop_connect)(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid);
/* XXX add 2 more: commit callbacks and llog recovery functions */
};
RETURN(-EOPNOTSUPP);
if (buf)
- buflen = le32_to_cpu(rec->lrh_len) + sizeof(struct llog_rec_hdr)
+ buflen = rec->lrh_len + sizeof(struct llog_rec_hdr)
+ sizeof(struct llog_rec_tail);
else
- buflen = le32_to_cpu(rec->lrh_len);
+ buflen = rec->lrh_len;
LASSERT(size_round(buflen) == buflen);
rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx);
}
static inline int llog_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct llog_operations *lop;
int rc;
if (lop->lop_connect == NULL)
RETURN(-EOPNOTSUPP);
- rc = lop->lop_connect(ctxt, count, logid, gen);
+ rc = lop->lop_connect(ctxt, count, logid, gen, uuid);
RETURN(rc);
}
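
The new uuid argument lets a caller reconnect the llog for a single target; passing NULL preserves the old connect-every-target behaviour. The MDS hunks below use it this way when one OST reconnects (bug 2816), roughly:

    /* As used later in this patch: reconnect the unlink llog for the
     * one OST identified by 'uuid' after it reconnects to the MDS. */
    rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
                      obd->u.mds.mds_lov_desc.ld_tgt_count,
                      NULL, NULL, uuid);
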
struct ptlrpc_srv_ni *rqbd_srv_ni;
ptl_handle_md_t rqbd_md_h;
int rqbd_refcount;
- int rqbd_eventcount;
char *rqbd_buffer;
struct ptlrpc_cb_id rqbd_cbid;
struct ptlrpc_request rqbd_req;
/* Interface-specific service state */
struct ptlrpc_service *sni_service; /* owning service */
struct ptlrpc_ni *sni_ni; /* network interface */
- struct list_head sni_rqbds; /* all the request buffers */
+ struct list_head sni_active_rqbds; /* req buffers receiving */
struct list_head sni_active_replies; /* all the active replies */
int sni_nrqbd_receiving; /* # posted request buffers */
};
int srv_nthreads; /* # running threads */
int srv_n_difficult_replies; /* # 'difficult' replies */
int srv_n_active_reqs; /* # reqs being served */
+ int srv_rqbd_timeout; /* timeout before re-posting reqs */
__u32 srv_req_portal;
__u32 srv_rep_portal;
int srv_n_queued_reqs; /* # reqs waiting to be served */
struct list_head srv_request_queue; /* reqs waiting for service */
+ struct list_head srv_idle_rqbds; /* request buffers to be reposted */
+
atomic_t srv_outstanding_replies;
struct list_head srv_reply_queue; /* replies waiting for service */
int ptlrpc_error(struct ptlrpc_request *req);
void ptlrpc_resend_req(struct ptlrpc_request *request);
int ptl_send_rpc(struct ptlrpc_request *request);
-void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
+int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
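
ptlrpc_register_rqbd now returns the posting status so that a request buffer which fails to post can be parked and retried (bug 1191) rather than lost. A sketch of the retry shape, with the list linkage field name assumed for illustration:

    /* Sketch only; the real retry lives in the service code, and
     * 'rqbd_list' is an assumed field name. */
    if (ptlrpc_register_rqbd(rqbd) != 0)
            /* park on srv_idle_rqbds; a service thread reposts it
             * after srv_rqbd_timeout */
            list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
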
/* ptlrpc/client.c */
void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
/* Public members. */
__u64 lsm_object_id; /* lov object id */
__u64 lsm_object_gr; /* lov object id */
- __u64 lsm_maxbytes;
+ __u64 lsm_maxbytes; /* maximum possible file size */
+ unsigned long lsm_xfersize; /* optimal transfer size */
/* LOV-private members start here -- only for use in lov/. */
__u32 lsm_magic;
#define OBD_FAIL_PTLRPC 0x500
#define OBD_FAIL_PTLRPC_ACK 0x501
+#define OBD_FAIL_PTLRPC_RQBD 0x502
#define OBD_FAIL_OBD_PING_NET 0x600
#define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601
* - the maximum extent
* - containing the requested extent
* - and not overlapping existing conflicting extents outside the requested one
- *
- * An alternative policy is to not shrink the new extent when conflicts exist */
+ */
static void
ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
struct ldlm_extent *new_ex)
{
struct list_head *tmp;
ldlm_mode_t req_mode = req->l_req_mode;
- __u64 req_start = req->l_policy_data.l_extent.start;
- __u64 req_end = req->l_policy_data.l_extent.end;
+ __u64 req_start = req->l_req_extent.start;
+ __u64 req_end = req->l_req_extent.end;
ENTRY;
- if (new_ex->start == req_start && new_ex->end == req_end) {
- EXIT;
- return;
- }
+ lockmode_verify(req_mode);
list_for_each(tmp, queue) {
struct ldlm_lock *lock;
+ struct ldlm_extent *l_extent;
+
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ l_extent = &lock->l_policy_data.l_extent;
- if (req == lock) {
+ if (new_ex->start == req_start && new_ex->end == req_end) {
EXIT;
return;
}
- /* if lock doesn't overlap new_ex, skip it. */
- if (lock->l_policy_data.l_extent.end < new_ex->start ||
- lock->l_policy_data.l_extent.start > new_ex->end)
+ /* Don't conflict with ourselves */
+ if (req == lock)
+ continue;
+
+ /* If lock doesn't overlap new_ex, skip it. */
+ if (l_extent->end < new_ex->start ||
+ l_extent->start > new_ex->end)
continue;
/* Locks are compatible, overlap doesn't matter */
if (lockmode_compat(lock->l_req_mode, req_mode))
continue;
- if (lock->l_policy_data.l_extent.start < req_start) {
- if (lock->l_policy_data.l_extent.end == ~0) {
+ /* The locks' requested extents conflict and we can't satisfy
+ * both, so ignore this lock. Either we will ping-pong this
+ * extent (we would regardless of what extent we granted) or
+ * the lock is unused and shouldn't limit our extent growth. */
+ if (lock->l_req_extent.end >= req_start &&
+ lock->l_req_extent.start <= req_end)
+ continue;
+
+ /* We grow extents downwards only as far as they don't overlap
+ * with already-granted locks, on the assumption that clients
+ * will be writing beyond the initial requested end and would
+ * then need to enqueue a new lock beyond the previous request.
+ * We don't grow downwards if there are lots of lockers. */
+ if (l_extent->start < req_start) {
+ if (atomic_read(&req->l_resource->lr_refcount) > 20)
new_ex->start = req_start;
- new_ex->end = req_end;
- EXIT;
- return;
- }
- new_ex->start = min(lock->l_policy_data.l_extent.end+1,
- req_start);
+ else
+ new_ex->start = min(l_extent->end+1, req_start);
}
- if (lock->l_policy_data.l_extent.end > req_end) {
- if (lock->l_policy_data.l_extent.start == 0) {
- new_ex->start = req_start;
- new_ex->end = req_end;
- EXIT;
- return;
- }
- new_ex->end = MAX(lock->l_policy_data.l_extent.start-1,
- req_end);
+ /* If we need to cancel this lock anyway because our request
+ * overlaps the granted lock, we grow up to its requested
+ * extent start instead of limiting this extent, assuming that
+ * clients are writing forwards and the lock had overgrown
+ * its extent downwards before we enqueued our request. */
+ if (l_extent->end > req_end) {
+ if (l_extent->start <= req_end)
+ new_ex->end = max(lock->l_req_extent.start - 1,
+ req_end);
+ else
+ new_ex->end = max(l_extent->start - 1, req_end);
}
}
EXIT;
}
-/* Determine if the lock is compatible with all locks on the queue. */
+/* In order to determine the largest possible extent we can grant, we need
+ * to scan all of the queues. */
+static void ldlm_extent_policy(struct ldlm_resource *res,
+ struct ldlm_lock *lock, int *flags)
+{
+ struct ldlm_extent new_ex = { .start = 0, .end = ~0};
+
+ ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
+ ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
+
+ if (new_ex.start != lock->l_policy_data.l_extent.start ||
+ new_ex.end != lock->l_policy_data.l_extent.end) {
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ lock->l_policy_data.l_extent.start = new_ex.start;
+ lock->l_policy_data.l_extent.end = new_ex.end;
+ }
+}
+
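A worked example of the new policy: suppose a PW request has l_req_extent [100, 199] and the granted queue holds one conflicting PW lock whose requested extent was [0, 49] but whose granted extent has grown to [0, 99]. The requested extents do not overlap, so the lock is not skipped; since its granted extent starts below 100, new_ex.start becomes min(99 + 1, 100) = 100, and nothing caps new_ex.end, so the new lock is granted on [100, ~0] -- grown as far as possible without touching the existing grant.
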
+/* Determine if the lock is compatible with all locks on the queue.
+ * We stop walking the queue if we hit ourselves so we don't take
+ * conflicting locks enqueued after us into account, or we'd wait forever. */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
int send_cbs)
struct list_head *tmp;
struct ldlm_lock *lock;
ldlm_mode_t req_mode = req->l_req_mode;
- __u64 req_start = req->l_policy_data.l_extent.start;
- __u64 req_end = req->l_policy_data.l_extent.end;
+ __u64 req_start = req->l_req_extent.start;
+ __u64 req_end = req->l_req_extent.end;
int compat = 1;
ENTRY;
+ lockmode_verify(req_mode);
+
list_for_each(tmp, queue) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
ldlm_error_t *err)
{
struct ldlm_resource *res = lock->l_resource;
- struct ldlm_extent new_ex = {0, ~0};
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
int rc;
ENTRY;
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
+
+ ldlm_extent_policy(res, lock, flags);
ldlm_grant_lock(lock, NULL, 0, 1);
RETURN(LDLM_ITER_CONTINUE);
}
- /* In order to determine the largest possible extent we can
- * grant, we need to scan all of the queues. */
- ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
- ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
-
- if (new_ex.start != lock->l_policy_data.l_extent.start ||
- new_ex.end != lock->l_policy_data.l_extent.end) {
- *flags |= LDLM_FL_LOCK_CHANGED;
- lock->l_policy_data.l_extent.start = new_ex.start;
- lock->l_policy_data.l_extent.end = new_ex.end;
- }
-
restart:
LASSERT(res->lr_tmp == NULL);
res->lr_tmp = &rpc_list;
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
} else {
+ ldlm_extent_policy(res, lock, flags);
ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock, NULL, 0, 0);
}
}
}
} else {
+ lockmode_verify(mode);
+
/* This loop determines if there are existing locks
* that conflict with the new lock request. */
list_for_each(tmp, &res->lr_granted) {
/* locks are compatible, overlap doesn't matter */
if (lockmode_compat(lock->l_granted_mode, mode))
continue;
-
+
if (!ldlm_flocks_overlap(lock, req))
continue;
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
+/* ldlm_lockd.c */
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock);
+
/* ldlm_plain.c */
int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
ldlm_error_t *err);
GOTO(out_ldlm, rc);
}
+ ptlrpc_pinger_add_import(imp);
EXIT;
if (rc) {
{
l_lock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock_remove_from_lru(lock);
- if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+ if (mode & (LCK_NL | LCK_CR | LCK_PR))
lock->l_readers++;
- else
+ if (mode & (LCK_EX | LCK_CW | LCK_PW))
lock->l_writers++;
lock->l_last_used = jiffies;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
ns = lock->l_resource->lr_namespace;
l_lock(&ns->ns_lock);
- if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
+ if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
LASSERT(lock->l_readers > 0);
lock->l_readers--;
- } else {
+ }
+ if (mode & (LCK_EX | LCK_CW | LCK_PW)) {
LASSERT(lock->l_writers > 0);
lock->l_writers--;
}
"warning\n");
LDLM_DEBUG(lock, "final decref done on cbpending lock");
- l_unlock(&ns->ns_lock);
- l_check_no_ns_lock(ns);
- /* FIXME: need a real 'desc' here */
- if (lock->l_blocking_ast != NULL)
- lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
- LDLM_CB_BLOCKING);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
+ ldlm_lock_remove_from_lru(lock);
+ ldlm_bl_to_thread(ns, NULL, lock);
+ l_unlock(&ns->ns_lock);
} else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
!lock->l_readers && !lock->l_writers) {
/* If this is a client-side namespace and this was the last
lock->l_readers == 0 && lock->l_writers == 0)
continue;
- if (lock->l_req_mode != mode)
+ if (!(lock->l_req_mode & mode))
continue;
if (lock->l_resource->lr_type == LDLM_EXTENT &&
!(lock->l_flags & LDLM_FL_LOCAL))
continue;
- ldlm_lock_addref_internal(lock, mode);
+ if (flags & LDLM_FL_TEST_LOCK)
+ LDLM_LOCK_GET(lock);
+ else
+ ldlm_lock_addref_internal(lock, mode);
return lock;
}
* If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
* to be canceled can still be matched as long as they still have reader
* or writer references
+ * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
+ * just tell us if we would have matched.
*
* Returns 1 if it finds an already-existing lock that is compatible; in this
* case, lockh is filled in with a addref()ed lock
if (old_lock)
LDLM_LOCK_PUT(old_lock);
+ if (flags & LDLM_FL_TEST_LOCK && rc)
+ LDLM_LOCK_PUT(lock);
return rc;
}
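
LDLM_FL_TEST_LOCK turns ldlm_lock_match into a pure probe: the match is reported but no reference is left behind, so there is nothing to cancel afterwards. The llite hunks below use it to ask whether any PR or PW lock covers a page, roughly:

    /* As in the llite page checks below: probe coverage without
     * holding a reference ('ns', 'res_id', 'page_extent' and 'lockh'
     * are assumed to be declared by the caller). */
    flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
    matches = ldlm_lock_match(ns, flags, &res_id, LDLM_EXTENT,
                              &page_extent, LCK_PR | LCK_PW, &lockh);
    /* matches > 0 means covered; no decref is needed. */
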
struct ldlm_resource *res;
l_lock(&ns->ns_lock);
- while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
+ while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
struct ldlm_lock, l_export_chain);
res = ldlm_resource_getref(lock->l_resource);
/* LDLM state */
-static struct ldlm_state *ldlm ;
+static struct ldlm_state *ldlm_state;
inline unsigned long round_timeout(unsigned long timeout)
{
if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
sizeof(ldlm_policy_data_t));
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
+ sizeof(lock->l_req_extent));
err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
if (err)
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock)
{
- struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
struct ldlm_bl_work_item *blwi;
ENTRY;
RETURN(-ENOMEM);
blwi->blwi_ns = ns;
- blwi->blwi_ld = *ld;
+ if (ld != NULL)
+ blwi->blwi_ld = *ld;
blwi->blwi_lock = lock;
spin_lock(&blp->blp_lock);
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
#ifdef __KERNEL__
- rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock);
+ rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
ldlm_callback_reply(req, rc);
#else
rc = 0;
#endif
ENTRY;
- if (ldlm != NULL)
+ if (ldlm_state != NULL)
RETURN(-EALREADY);
- OBD_ALLOC(ldlm, sizeof(*ldlm));
- if (ldlm == NULL)
+ OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
+ if (ldlm_state == NULL)
RETURN(-ENOMEM);
#ifdef __KERNEL__
GOTO(out_free, rc);
#endif
- ldlm->ldlm_cb_service =
+ ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir);
- if (!ldlm->ldlm_cb_service) {
+ if (!ldlm_state->ldlm_cb_service) {
CERROR("failed to start service\n");
GOTO(out_proc, rc = -ENOMEM);
}
- ldlm->ldlm_cancel_service =
- ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+ ldlm_state->ldlm_cancel_service =
+ ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_CANCEL_REQUEST_PORTAL,
LDLM_CANCEL_REPLY_PORTAL,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir);
- if (!ldlm->ldlm_cancel_service) {
+ if (!ldlm_state->ldlm_cancel_service) {
CERROR("failed to start service\n");
GOTO(out_proc, rc = -ENOMEM);
}
OBD_ALLOC(blp, sizeof(*blp));
if (blp == NULL)
GOTO(out_proc, rc = -ENOMEM);
- ldlm->ldlm_bl_pool = blp;
+ ldlm_state->ldlm_bl_pool = blp;
atomic_set(&blp->blp_num_threads, 0);
init_waitqueue_head(&blp->blp_waitq);
wait_for_completion(&blp->blp_comp);
}
- rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service,
+ rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
LDLM_NUM_THREADS, "ldlm_cn");
if (rc) {
LBUG();
GOTO(out_thread, rc);
}
- rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service,
+ rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
LDLM_NUM_THREADS, "ldlm_cb");
if (rc) {
LBUG();
#ifdef __KERNEL__
out_thread:
- ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
- ptlrpc_unregister_service(ldlm->ldlm_cb_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
#endif
out_proc:
ldlm_proc_cleanup();
out_free:
#endif
- OBD_FREE(ldlm, sizeof(*ldlm));
- ldlm = NULL;
+ OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+ ldlm_state = NULL;
return rc;
}
static int ldlm_cleanup(int force)
{
#ifdef __KERNEL__
- struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
#endif
ENTRY;
}
OBD_FREE(blp, sizeof(*blp));
- ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
- ptlrpc_unregister_service(ldlm->ldlm_cb_service);
- ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
- ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
+ ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+ ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
ldlm_proc_cleanup();
expired_lock_thread.elt_state = ELT_TERMINATE;
#endif
- OBD_FREE(ldlm, sizeof(*ldlm));
- ldlm = NULL;
+ OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+ ldlm_state = NULL;
RETURN(0);
}
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_namespace_dump);
+EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);
int compat = 1;
ENTRY;
+ lockmode_verify(req_mode);
+
list_for_each(tmp, queue) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL)
memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &policy->l_extent,
+ sizeof(policy->l_extent));
err = ldlm_lock_enqueue(ns, &lock, policy, flags);
if (err != ELDLM_OK)
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL)
memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &policy->l_extent,
+ sizeof(policy->l_extent));
LDLM_DEBUG(lock, "client-side enqueue START");
}
int ldlm_cancel_lru(struct ldlm_namespace *ns)
{
- struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+ struct list_head *tmp, *next;
int count, rc = 0;
- struct ldlm_ast_work *w;
ENTRY;
l_lock(&ns->ns_lock);
* won't see this flag and call l_blocking_ast */
lock->l_flags |= LDLM_FL_CBPENDING;
- OBD_ALLOC(w, sizeof(*w));
- LASSERT(w);
-
- w->w_lock = LDLM_LOCK_GET(lock);
- list_add(&w->w_list, &list);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
ldlm_lock_remove_from_lru(lock);
+ ldlm_bl_to_thread(ns, NULL, lock);
if (--count == 0)
break;
}
l_unlock(&ns->ns_lock);
-
- list_for_each_safe(tmp, next, &list) {
- struct lustre_handle lockh;
- int rc;
- w = list_entry(tmp, struct ldlm_ast_work, w_list);
-
- ldlm_lock2handle(w->w_lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc != ELDLM_OK)
- CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc);
-
- list_del(&w->w_list);
- LDLM_LOCK_PUT(w->w_lock);
- OBD_FREE(w, sizeof(*w));
- }
-
RETURN(rc);
}
static int ll_dir_readpage(struct file *file, struct page *page)
{
struct inode *inode = page->mapping->host;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_fid mdc_fid;
__u64 offset;
- int rc = 0;
struct ptlrpc_request *request;
- struct lustre_handle lockh;
struct mds_body *body;
- struct lookup_intent it = { .it_op = IT_READDIR };
- struct mdc_op_data data;
- struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp);
- struct ldlm_res_id res_id =
- { .name = {inode->i_ino, (__u64)inode->i_generation} };
+ int rc = 0;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
- if ((inode->i_size + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT <= page->index){
- /* XXX why do we need this exactly, and why do we think that
- * an all-zero directory page is useful?
- */
- CERROR("memsetting dir page %lu to zero (size %lld)\n",
- page->index, inode->i_size);
- memset(kmap(page), 0, PAGE_CACHE_SIZE);
- kunmap(page);
- GOTO(readpage_out, rc);
- }
-
- rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
- &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
- if (!rc) {
- ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
-
- rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
- &data, &lockh, NULL, 0,
- ldlm_completion_ast, ll_mdc_blocking_ast,
- inode);
- request = (struct ptlrpc_request *)it.d.lustre.it_data;
- if (request)
- ptlrpc_req_finished(request);
- if (rc < 0) {
- CERROR("lock enqueue: err: %d\n", rc);
- unlock_page(page);
- RETURN(rc);
- }
- }
- ldlm_lock_dump_handle(D_OTHER, &lockh);
-
- if (PageUptodate(page)) {
- CERROR("Explain this please?\n");
- GOTO(readpage_out, rc);
- }
mdc_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
offset = page->index << PAGE_SHIFT;
- rc = mdc_readpage(sbi->ll_mdc_exp, &mdc_fid,
+ rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &mdc_fid,
offset, page, &request);
if (!rc) {
body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body));
LASSERT_REPSWABBED (request, 0); /* swabbed by mdc_readpage() */
inode->i_size = body->size;
+ SetPageUptodate(page);
}
ptlrpc_req_finished(request);
- EXIT;
-
- readpage_out:
- if (!rc)
- SetPageUptodate(page);
unlock_page(page);
- ldlm_lock_decref(&lockh, LCK_PR);
+ EXIT;
return rc;
}
static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
{
+ struct ldlm_res_id res_id =
+ { .name = { dir->i_ino, (__u64)dir->i_generation} };
+ struct lustre_handle lockh;
+ struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
struct address_space *mapping = dir->i_mapping;
- struct page *page = read_cache_page(mapping, n,
- (filler_t*)mapping->a_ops->readpage, NULL);
+ struct page *page;
+ int rc;
+
+ rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+ &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+ if (!rc) {
+ struct lookup_intent it = { .it_op = IT_READDIR };
+ struct ptlrpc_request *request;
+ struct mdc_op_data data;
+
+ ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0);
+
+ rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
+ LCK_PR, &data, &lockh, NULL, 0,
+ ldlm_completion_ast, ll_mdc_blocking_ast, dir);
+
+ request = (struct ptlrpc_request *)it.d.lustre.it_data;
+ if (request)
+ ptlrpc_req_finished(request);
+ if (rc < 0) {
+ CERROR("lock enqueue: rc: %d\n", rc);
+ return ERR_PTR(rc);
+ }
+ }
+ ldlm_lock_dump_handle(D_OTHER, &lockh);
+
+ page = read_cache_page(mapping, n,
+ (filler_t*)mapping->a_ops->readpage, NULL);
if (!IS_ERR(page)) {
wait_on_page(page);
(void)kmap(page);
if (PageError(page))
goto fail;
}
+
+out_unlock:
+ ldlm_lock_decref(&lockh, LCK_PR);
return page;
fail:
ext2_put_page(page);
- return ERR_PTR(-EIO);
+ page = ERR_PTR(-EIO);
+ goto out_unlock;
}
-
/*
* p is at least 6 bytes before the end of page
*/
int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
{
- loff_t pos = filp->f_pos;
struct inode *inode = filp->f_dentry->d_inode;
+ loff_t pos = filp->f_pos;
// XXX struct super_block *sb = inode->i_sb;
unsigned offset = pos & ~PAGE_CACHE_MASK;
unsigned long n = pos >> PAGE_CACHE_SHIFT;
unsigned chunk_mask = ~(ext2_chunk_size(inode)-1);
unsigned char *types = NULL;
int need_revalidate = (filp->f_version != inode->i_version);
+ int rc = 0;
ENTRY;
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
- inode->i_generation, inode);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n",
+ inode->i_ino, inode->i_generation, inode, pos, inode->i_size);
+
if (pos > inode->i_size - EXT2_DIR_REC_LEN(1))
- GOTO(done, 0);
+ RETURN(0);
types = ext2_filetype_table;
ext2_dirent *de;
struct page *page;
- CDEBUG(D_EXT2, "reading %lu of dir %lu page %lu, size %llu\n",
- PAGE_CACHE_SIZE, inode->i_ino, n, inode->i_size);
+ CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu/%lu size %llu\n",
+ PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation,
+ n, npages, inode->i_size);
page = ll_get_dir_page(inode, n);
/* size might have been updated by mdc_readpage */
npages = dir_pages(inode);
- if (IS_ERR(page))
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
+ CERROR("error reading dir %lu/%u page %lu: rc %d\n",
+ inode->i_ino, inode->i_generation, n, rc);
continue;
+ }
+
kaddr = page_address(page);
if (need_revalidate) {
offset = ext2_validate_entry(kaddr, offset, chunk_mask);
int over;
unsigned char d_type = DT_UNKNOWN;
+ rc = 0; /* no error if we return something */
if (types && de->file_type < EXT2_FT_MAX)
d_type = types[de->file_type];
le32_to_cpu(de->inode), d_type);
if (over) {
ext2_put_page(page);
- GOTO(done,0);
+ GOTO(done, rc);
}
}
}
filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset;
filp->f_version = inode->i_version;
update_atime(inode);
- RETURN(0);
+ RETURN(rc);
}
static int ll_dir_ioctl(struct inode *inode, struct file *file,
*
* No one can dirty the extent until we've finished our work and they can
* enqueue another lock. The DLM protects us from ll_file_read/write here,
- * but other kernel actors could have pages locked. */
+ * but other kernel actors could have pages locked.
+ *
+ * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
struct ldlm_lock *lock, __u32 stripe)
{
struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
+ ldlm_policy_data_t tmpex;
unsigned long start, end, count, skip, i, j;
struct page *page;
- int rc, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
+ int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
+ struct lustre_handle lockh;
ENTRY;
CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n",
start += (start/count * skip) + (stripe * count);
if (end != ~0)
end += (end/count * skip) + (stripe * count);
- }
+ }
i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT;
if (i < end)
lock_page(page);
}
- /* checking again to account for writeback's lock_page() */
- if (page->mapping != NULL) {
+ tmpex.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+ tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
+ /* check to see if another DLM lock covers this page */
+ rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
+ LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
+ LDLM_FL_TEST_LOCK,
+ &lock->l_resource->lr_name, LDLM_EXTENT,
+ &tmpex, LCK_PR | LCK_PW, &lockh);
+ if (rc2 == 0 && page->mapping != NULL) {
+ // checking again to account for writeback's lock_page()
LL_CDEBUG_PAGE(page, "truncating\n");
ll_truncate_complete_page(page);
}
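
The arithmetic for tmpex: with 4096-byte pages (PAGE_CACHE_SHIFT of 12, the common case), page index 5 maps to the byte extent [20480, 24575], so the match above asks whether some other granted PR or PW lock covers exactly that page; only if none does is the page truncated (bug 2765).
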
CERROR("ldlm_cli_cancel failed: %d\n", rc);
break;
case LDLM_CB_CANCELING: {
- struct inode *inode = ll_inode_from_lock(lock);
+ struct inode *inode;
struct ll_inode_info *lli;
struct lov_stripe_md *lsm;
__u32 stripe;
__u64 kms;
+ /* This lock wasn't granted, don't try to evict pages */
+ if (lock->l_req_mode != lock->l_granted_mode)
+ RETURN(0);
+
+ inode = ll_inode_from_lock(lock);
if (inode == NULL)
RETURN(0);
lli = ll_i2info(inode);
struct page *llap_page;
struct list_head llap_pending_write;
/* only trust these if the page lock is providing exclusion */
- int llap_write_queued:1,
+ int llap_write_queued:1,
llap_defer_uptodate:1;
struct list_head llap_proc_item;
};
#define LL_SBI_NOLCK 0x1
#define LL_SBI_READAHEAD 0x2
+#define LL_MAX_BLKSIZE (4UL * 1024 * 1024)
+
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
#define ll_s2sbi(sb) ((struct ll_sb_info *)((sb)->s_fs_info))
void __d_rehash(struct dentry * entry, int lock);
GOTO(out_root, err);
}
+ /* bug 2805 - set VM readahead to zero */
+ vm_max_readahead = vm_min_readahead = 0;
sb->s_root = d_alloc_root(root);
RETURN(err);
LBUG();
}
}
+ /* bug 2884 - limit i_blksize for broken user-space apps */
+ LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize);
+ inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE);
if (lli->lli_smd != lsm)
obd_free_memmd(ll_i2obdexp(inode), &lsm);
}
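
For example (values assumed, not from this patch): a file striped over two OSTs with a 64KB stripe size gets lsm_xfersize = 128KB, so i_blksize becomes min(128KB, 4MB) = 128KB; a 1MB stripe size over eight stripes would compute 8MB and be clamped to LL_MAX_BLKSIZE = 4MB.
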
#include <linux/lustre_lite.h>
#include <linux/lprocfs_status.h>
#include <linux/seq_file.h>
+#include <linux/obd_support.h>
#include "llite_internal.h"
struct ll_sb_info *sbi = dp->data;
int rc;
- llap = kmalloc(sizeof(*llap), GFP_KERNEL);
+ OBD_ALLOC_GFP(llap, sizeof(*llap), GFP_KERNEL);
if (llap == NULL)
return -ENOMEM;
llap->llap_page = NULL;
llap->llap_cookie = sbi;
llap->llap_magic = 0;
-
+
rc = seq_open(file, &llite_dump_pgcache_seq_sops);
if (rc) {
- kfree(llap);
+ OBD_FREE(llap, sizeof(*llap));
return rc;
}
seq = file->private_data;
return 0;
}
-static int llite_dump_pgcache_seq_release(struct inode *inode,
+static int llite_dump_pgcache_seq_release(struct inode *inode,
struct file *file)
{
struct seq_file *seq = file->private_data;
if (!list_empty(&llap->llap_proc_item))
list_del_init(&llap->llap_proc_item);
spin_unlock(&sbi->ll_pglist_lock);
- kfree(llap);
+ OBD_FREE(llap, sizeof(*llap));
return seq_release(inode, file);
}
struct file_operations llite_dump_pgcache_fops = {
.open = llite_dump_pgcache_seq_open,
.read = seq_read,
- .release = llite_dump_pgcache_seq_release,
+ .release = llite_dump_pgcache_seq_release,
};
#endif /* LPROCFS */
page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
page_extent.l_extent.end =
page_extent.l_extent.start + PAGE_CACHE_SIZE - 1;
- flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+ flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
ll_i2info(inode)->lli_smd, LDLM_EXTENT,
- &page_extent, LCK_PR, &flags, inode, &match_lockh);
- if (matches < 0) {
+ &page_extent, LCK_PR | LCK_PW, &flags, inode,
+ &match_lockh);
+ if (matches < 0)
LL_CDEBUG_PAGE(page, "lock match failed\n");
- RETURN(matches);
- }
- if (matches) {
- obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
- }
RETURN(matches);
}
}
#define LL_RA_MIN(inode) ((unsigned long)PTL_MD_MAX_PAGES / 2)
-#define LL_RA_MAX(inode) (inode->i_blksize * 3)
+#define LL_RA_MAX(inode) ((ll_i2info(inode)->lli_smd->lsm_xfersize * 3) >> \
+ PAGE_CACHE_SHIFT)
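
Numerically (assuming 4KB pages): a one-stripe file with a 1MB stripe size has lsm_xfersize = 1MB, so LL_RA_MAX = (1MB * 3) >> 12 = 768 pages, i.e. readahead is capped at three transfer-sized chunks; the shift keeps the cap in page units, consistent with LL_RA_MIN.
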
static void ll_readahead(struct ll_readahead_state *ras,
struct obd_export *exp, struct address_space *mapping,
if (page == NULL)
break;
+ /* Don't try to readahead beyond the end of the lock extent */
+ if (ll_page_matches(page) <= 0)
+ break;
+
/* the book-keeping above promises that we've tried
* all the indices from start to end, so we don't
* stop if anyone returns an error. This may not be good. */
- if (Page_Uptodate(page) || ll_page_matches(page) <= 0)
+ if (Page_Uptodate(page))
goto next_page;
llap = llap_from_page(page);
if (rc == 0) {
static unsigned long next_print;
- CDEBUG(D_INODE, "didn't match a lock\n");
+ CDEBUG(D_INODE, "ino %lu page %lu (%llu) didn't match a lock\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
if (time_after(jiffies, next_print)) {
+ CERROR("ino %lu page %lu (%llu) not covered by "
+ "a lock (mmap?). check debug logs.\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
+ ldlm_dump_all_namespaces();
+ if (next_print == 0) {
+ CERROR("%s\n", portals_debug_dumpstack());
+ portals_debug_dumplog();
+ }
next_print = jiffies + 30 * HZ;
- CERROR("not covered by a lock (mmap?). check debug "
- "logs.\n");
}
}
static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
- struct llog_gen *gen)
+ struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct obd_device *obd = ctxt->loc_obd;
struct lov_obd *lov = &obd->u.lov;
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
- rc = llog_connect(cctxt, 1, logid, gen);
+
+ if (uuid && !obd_uuid_equals(uuid, &lov->tgts[i].uuid))
+ continue;
+
+ rc = llog_connect(cctxt, 1, logid, gen, uuid);
if (rc) {
CERROR("error osc_llog_connect %d\n", i);
break;
break;
}
if (rc == 1) {
- if (lsm->lsm_stripe_count > 1)
+ if (lsm->lsm_stripe_count > 1) {
+ if (*flags & LDLM_FL_TEST_LOCK)
+ lov_llh_destroy(lov_lockh);
lov_llh_put(lov_lockh);
+ }
RETURN(1);
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
int er;
- if (!lov->tgts[i].active)
+ if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
+ continue;
+
+ if (!val && !lov->tgts[i].active)
continue;
er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen,
(*lsmp)->lsm_magic = LOV_MAGIC;
(*lsmp)->lsm_stripe_count = stripe_count;
(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES * stripe_count;
+ (*lsmp)->lsm_xfersize = PTL_MTU * stripe_count;
(*lsmp)->lsm_pattern = pattern;
(*lsmp)->lsm_oinfo[0].loi_ost_idx = ~0;
lsm->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
/* lsm->lsm_object_gr = 0; implicit */
lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
+ lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count;
lsm->lsm_pattern = LOV_PATTERN_RAID0;
ost_offset = le32_to_cpu(lmm->lmm_stripe_offset);
ost_count = le16_to_cpu(lmm->lmm_ost_count);
lsm->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern);
+ lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count;
for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++) {
/* XXX LOV STACKING call down to osc_unpackmd() */
(*lsmp)->lsm_oinfo[0].loi_ost_idx = lum.lmm_stripe_offset;
(*lsmp)->lsm_stripe_size = lum.lmm_stripe_size;
+ (*lsmp)->lsm_xfersize = lum.lmm_stripe_size * stripe_count;
RETURN(0);
}
{
union ptlrpc_async_args *aa = data;
struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
-
- mdc_put_rpc_lock(rpc_lock, NULL);
+ struct obd_device *obd = aa->pointer_arg[1];
+
+ if (rpc_lock == NULL) {
+ CERROR("called with NULL rpc_lock\n");
+ } else {
+ mdc_put_rpc_lock(rpc_lock, NULL);
+ LASSERTF(req->rq_async_args.pointer_arg[0] ==
+ obd->u.cli.cl_rpc_lock, "%p != %p\n",
+ req->rq_async_args.pointer_arg[0],
+ obd->u.cli.cl_rpc_lock);
+ aa->pointer_arg[0] = NULL;
+ }
wake_up(&req->rq_reply_waitq);
RETURN(rc);
}
-/* We can't use ptlrpc_check_reply, because we don't want to wake up for
+/* We can't use ptlrpc_check_reply, because we don't want to wake up for
* anything but a reply or an error. */
static int mdc_close_check_reply(struct ptlrpc_request *req)
{
struct ptlrpc_request *req;
struct mdc_open_data *mod;
struct l_wait_info lwi;
- struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock;
ENTRY;
req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
/* We hand a ref to the rpcd here, so we need another one of our own. */
ptlrpc_request_addref(req);
- mdc_get_rpc_lock(rpc_lock, NULL);
+ mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
req->rq_interpret_reply = mdc_close_interpret;
- req->rq_async_args.pointer_arg[0] = rpc_lock;
+ req->rq_async_args.pointer_arg[0] = obd->u.cli.cl_rpc_lock;
+ req->rq_async_args.pointer_arg[1] = obd;
ptlrpcd_add_req(req);
lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep,
NULL, NULL);
"close succeeded. Please tell CFS.\n");
}
}
+ if (req->rq_async_args.pointer_arg[0] != NULL) {
+ CERROR("returned without dropping rpc_lock: rc %d\n", rc);
+ mdc_close_interpret(req, &req->rq_async_args, rc);
+ portals_debug_dumplog();
+ }
EXIT;
out:
LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
- obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
+ obd->u.mds.mds_lov_desc.ld_tgt_count,
+ NULL, NULL, NULL);
if (rc != 0) {
CERROR("faild at llog_origin_connect: %d\n", rc);
}
static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
- struct llog_gen *gen)
+ struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct obd_device *obd = ctxt->loc_obd;
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
ENTRY;
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
- rc = llog_connect(lctxt, count, logid, gen);
+ rc = llog_connect(lctxt, count, logid, gen, uuid);
RETURN(rc);
}
if (!obd->obd_recovering) {
rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
- NULL);
+ NULL, NULL);
if (rc != 0)
CERROR("faild at llog_origin_connect: %d\n", rc);
{
struct obd_uuid *uuid;
int rc = 0;
+ ENTRY;
if (!active)
RETURN(0);
CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
obd->obd_name, uuid->uuid);
} else {
+ LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
+
+ rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), "mds_conn",
+ 0, uuid);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
+ obd->u.mds.mds_lov_desc.ld_tgt_count,
+ NULL, NULL, uuid);
+ if (rc != 0) {
+ CERROR("faild at llog_origin_connect: %d\n", rc);
+ RETURN(rc);
+ }
+
CWARN("MDS %s: %s now active, resetting orphans\n",
obd->obd_name, uuid->uuid);
rc = mds_lov_clearorphans(&obd->u.mds, uuid);
if (!loghandle->lgh_hdr)
goto out;
- if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
list_del_init(&loghandle->u.phd.phd_entry);
- if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_CAT)
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
LASSERT(list_empty(&loghandle->u.chd.chd_head));
OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
RETURN(-EINVAL);
}
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) - 1);
+ llh->llh_count--;
- if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1) &&
+ if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+ (llh->llh_count == 1) &&
(loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
rc = llog_destroy(loghandle);
if (rc)
RETURN(-ENOMEM);
handle->lgh_hdr = llh;
/* first assign flags to use llog_client_ops */
- llh->llh_flags = cpu_to_le32(flags);
+ llh->llh_flags = flags;
rc = llog_read_header(handle);
if (rc == 0) {
- flags = le32_to_cpu(llh->llh_flags);
+ flags = llh->llh_flags;
if (uuid)
LASSERT(obd_uuid_equals(uuid, &llh->llh_tgtuuid));
GOTO(out, rc);
rc = 0;
handle->lgh_last_idx = 0; /* header is record with index 0 */
- llh->llh_count = cpu_to_le32(1); /* for the header record */
- llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC);
- llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len =
- cpu_to_le32(LLOG_CHUNK_SIZE);
+ llh->llh_count = 1; /* for the header record */
+ llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
+ llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
- llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME));
+ llh->llh_timestamp = LTIME_S(CURRENT_TIME);
if (uuid)
memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
- llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap));
+ llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
ext2_set_bit(0, llh->llh_bitmap);
out:
if (flags & LLOG_F_IS_CAT) {
INIT_LIST_HEAD(&handle->u.chd.chd_head);
- llh->llh_size = cpu_to_le32(sizeof(struct llog_logid_rec));
+ llh->llh_size = sizeof(struct llog_logid_rec);
}
else if (flags & LLOG_F_IS_PLAIN)
INIT_LIST_HEAD(&handle->u.phd.phd_entry);
GOTO(out, rc);
rec = buf;
- idx = le32_to_cpu(rec->lrh_index);
+ idx = rec->lrh_index;
if (idx < index)
CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
while (idx < index) {
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (struct llog_rec_hdr *)
+ ((char *)rec + rec->lrh_len);
idx ++;
}
++index;
if (index > last_index)
GOTO(out, rc = 0);
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (struct llog_rec_hdr *)
+ ((char *)rec + rec->lrh_len);
}
}
index = (cathandle->lgh_last_idx + 1) % bitmap_size;
/* maximum number of available slots in catlog is bitmap_size - 2 */
- if (llh->llh_cat_idx == cpu_to_le32(index)) {
+ if (llh->llh_cat_idx == index) {
CERROR("no free catalog slots for log...\n");
RETURN(ERR_PTR(-ENOSPC));
} else {
LBUG(); /* should never happen */
}
cathandle->lgh_last_idx = index;
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
- llh->llh_tail.lrt_index = cpu_to_le32(index);
+ llh->llh_count++;
+ llh->llh_tail.lrt_index = index;
}
rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
index, cathandle->lgh_id.lgl_oid);
/* build the record for this log in the catalog */
- rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
- rec.lid_hdr.lrh_index = cpu_to_le32(index);
- rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
+ rec.lid_hdr.lrh_len = sizeof(rec);
+ rec.lid_hdr.lrh_index = index;
+ rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
rec.lid_id = loghandle->lgh_id;
- rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
- rec.lid_tail.lrt_index = cpu_to_le32(index);
+ rec.lid_tail.lrt_len = sizeof(rec);
+ rec.lid_tail.lrt_index = index;
/* update the catalog: header and record */
rc = llog_write_rec(cathandle, &rec.lid_hdr,
GOTO(out_destroy, rc);
}
- loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
+ loghandle->lgh_hdr->llh_cat_idx = index;
cathandle->u.chd.chd_current_log = loghandle;
LASSERT(list_empty(&loghandle->u.phd.phd_entry));
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
if (!rc) {
loghandle->u.phd.phd_cat_handle = cathandle;
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
- loghandle->u.phd.phd_cookie.lgc_index =
- le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
+ loghandle->u.phd.phd_cookie.lgc_index =
+ loghandle->lgh_hdr->llh_cat_idx;
}
out:
int rc;
ENTRY;
- LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
+ LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
loghandle = llog_cat_current_log(cathandle, 1);
if (IS_ERR(loghandle))
RETURN(PTR_ERR(loghandle));
struct llog_handle *llh;
int rc;
- if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
- le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
+ rec->lrh_index, cat_llh->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
if (rc) {
int rc;
ENTRY;
- LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
+ LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
d.lpd_data = data;
d.lpd_cb = cb;
CWARN("catlog "LPX64" crosses index zero\n",
cat_llh->lgh_id.lgl_oid);
- cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+ cd.first_idx = llh->llh_cat_idx;
cd.last_idx = 0;
rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
if (rc != 0)
ENTRY;
bitmap_size = sizeof(llh->llh_bitmap) * 8;
- if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
- idx = le32_to_cpu(llh->llh_cat_idx) + 1;
- llh->llh_cat_idx = cpu_to_le32(idx);
+ if (llh->llh_cat_idx == (index - 1)) {
+ idx = llh->llh_cat_idx + 1;
+ llh->llh_cat_idx = idx;
if (idx == cathandle->lgh_last_idx)
goto out;
for (i = (index + 1) % bitmap_size;
i != cathandle->lgh_last_idx;
i = (i + 1) % bitmap_size) {
if (!ext2_test_bit(i, llh->llh_bitmap)) {
- idx = le32_to_cpu(llh->llh_cat_idx) + 1;
- llh->llh_cat_idx = cpu_to_le32(idx);
+ idx = llh->llh_cat_idx + 1;
+ llh->llh_cat_idx = idx;
} else if (i == 0) {
llh->llh_cat_idx = 0;
} else {
}
out:
CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
- cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
+ cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
}
RETURN(0);
char *endp;
int cur_index, rc = 0;
- cur_index = le32_to_cpu(rec->lrh_index);
+ cur_index = rec->lrh_index;
if (ioc_data && (ioc_data->ioc_inllen1)) {
l = 0;
if (to > 0 && cur_index > to)
RETURN(-LLOG_EEMPTY);
}
- if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
+ if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
struct llog_handle *log_handle;
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
l = snprintf(out, remains, "[index]: %05d [type]: "
"%02x [len]: %04d failed\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
}
if (handle->lgh_ctxt == NULL)
RETURN(-EOPNOTSUPP);
rc = llog_process(log_handle, llog_check_cb, NULL, NULL);
llog_close(log_handle);
} else {
- switch (le32_to_cpu(rec->lrh_type)) {
+ switch (rec->lrh_type) {
case OST_SZ_REC:
case OST_RAID1_REC:
case MDS_UNLINK_REC:
case LLOG_HDR_MAGIC: {
l = snprintf(out, remains, "[index]: %05d [type]: "
"%02x [len]: %04d ok\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
out += l;
remains -= l;
if (remains <= 0) {
default: {
l = snprintf(out, remains, "[index]: %05d [type]: "
"%02x [len]: %04d failed\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
out += l;
remains -= l;
if (remains <= 0) {
ioc_data->ioc_inllen1 = 0;
}
- cur_index = le32_to_cpu(rec->lrh_index);
+ cur_index = rec->lrh_index;
if (cur_index < from)
RETURN(0);
if (to > 0 && cur_index > to)
RETURN(-LLOG_EEMPTY);
- if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
+ if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
} else {
l = snprintf(out, remains,
"[index]: %05d [type]: %02x [len]: %04d\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
}
out += l;
remains -= l;
struct llog_logid_rec *lir = (struct llog_logid_rec*)rec;
int rc;
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC))
+ if (rec->lrh_type != LLOG_LOGID_MAGIC)
return (-EINVAL);
rc = llog_remove_log(handle, &lir->lid_id);
"last index: %d\n",
handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr,
handle->lgh_id.lgl_ogen,
- le32_to_cpu(handle->lgh_hdr->llh_flags),
- le32_to_cpu(handle->lgh_hdr->llh_flags) &
+ handle->lgh_hdr->llh_flags,
+ handle->lgh_hdr->llh_flags &
LLOG_F_IS_CAT ? "cat" : "plain",
- le32_to_cpu(handle->lgh_hdr->llh_count),
+ handle->lgh_hdr->llh_count,
handle->lgh_last_idx);
out += l;
remains -= l;
struct llog_logid plain;
char *endp;
- if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+ if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
GOTO(out_close, err = -EINVAL);
err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
case OBD_IOC_LLOG_REMOVE: {
struct llog_logid plain;
- if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+ if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
GOTO(out_close, err = -EINVAL);
if (data->ioc_inlbuf2) {
out_close:
if (handle->lgh_hdr &&
- handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))
+ handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
llog_cat_put(handle);
else
llog_close(handle);
LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
- tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
- tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
+ tail.lrt_len = rec.lrh_len = len;
+ tail.lrt_index = rec.lrh_index = index;
rec.lrh_type = 0;
rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
int rc;
struct llog_rec_tail end;
loff_t saved_off = file->f_pos;
- int buflen = le32_to_cpu(rec->lrh_len);
+ int buflen = rec->lrh_len;
ENTRY;
file->f_pos = off;
}
/* the buf case */
- rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
+ rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
if (rc) {
CERROR("error writing log hdr: rc %d\n", rc);
if (rc)
CERROR("error reading log header\n");
- handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
+ handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
RETURN(rc);
void *buf, int idx)
{
struct llog_log_hdr *llh;
- int reclen = le32_to_cpu(rec->lrh_len), index, rc;
+ int reclen = rec->lrh_len, index, rc;
struct llog_rec_tail *lrt;
struct obd_device *obd;
struct file *file;
- loff_t offset;
size_t left;
ENTRY;
if (rc || idx == 0)
RETURN(rc);
- saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
+ saved_offset = sizeof(*llh) + (idx-1)*rec->lrh_len;
rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
*/
left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
if (buf)
- reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
+ reclen = sizeof(*rec) + rec->lrh_len +
sizeof(struct llog_rec_tail);
/* NOTE: padding is a record, but no bit is set */
loghandle->lgh_last_idx++;
index = loghandle->lgh_last_idx;
- rec->lrh_index = cpu_to_le32(index);
+ rec->lrh_index = index;
if (buf == NULL) {
- lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
+ lrt = (struct llog_rec_tail *)
+ ((char *)rec + rec->lrh_len - sizeof(*lrt));
lrt->lrt_len = rec->lrh_len;
lrt->lrt_index = rec->lrh_index;
}
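
The layout both paths assume: every llog record is a llog_rec_hdr, then the payload, then a llog_rec_tail, with lrh_len counting the whole record. When buf is NULL the caller's rec already contains the tail, so the code above only aims lrt at its final bytes:

    /*  +--------------+-----------+----------------+
     *  | llog_rec_hdr | payload   | llog_rec_tail  |
     *  | lrh_len, idx |           | lrt_len, idx   |
     *  +--------------+-----------+----------------+
     *  <-------------- lrh_len -------------------->
     */
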
CERROR("argh, index %u already set in log bitmap?\n", index);
LBUG(); /* should never happen */
}
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
- llh->llh_tail.lrt_index = cpu_to_le32(index);
+ llh->llh_count++;
+ llh->llh_tail.lrt_index = index;
- offset = 0;
rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
if (rc)
RETURN(rc);
RETURN(rc);
CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
- loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
+ loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
reccookie->lgc_index = index;
- if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
+ if (rec->lrh_type == MDS_UNLINK_REC)
reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
- else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
+ else if (rec->lrh_type == OST_SZ_REC)
reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
- else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
+ else if (rec->lrh_type == OST_RAID1_REC)
reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
else
reccookie->lgc_subsys = -1;
rc = 1;
}
- if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC)
+ if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
rc = 1;
RETURN(rc);
}
tail = buf + rc - sizeof(struct llog_rec_tail);
- *cur_idx = le32_to_cpu(tail->lrt_index);
+ *cur_idx = tail->lrt_index;
/* this shouldn't happen */
if (tail->lrt_index == 0) {
loghandle->lgh_id.lgl_ogen, *cur_offset);
RETURN(-EINVAL);
}
- if (le32_to_cpu(tail->lrt_index) < next_idx)
+ if (tail->lrt_index < next_idx)
continue;
/* sanity check that the start of the new buffer is no farther
* than the record that we wanted. This shouldn't happen. */
rec = buf;
- if (le32_to_cpu(rec->lrh_index) > next_idx) {
+ if (rec->lrh_index > next_idx) {
CERROR("missed desired record? %u > %u\n",
- le32_to_cpu(rec->lrh_index), next_idx);
+ rec->lrh_index, next_idx);
RETURN(-ENOENT);
}
RETURN(0);
int rc, index;
ENTRY;
- if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
- le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+ rec->lrh_index, cathandle->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
if (rc) {
}
llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1)) {
+ if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+ (llh->llh_count == 1)) {
rc = llog_destroy(loghandle);
if (rc)
CERROR("failure destroying log in postsetup: %d\n", rc);
if (rc == 0)
CWARN("cancel log "LPX64":%x at index %u of catalog "
LPX64"\n", lir->lid_id.lgl_oid,
- lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index),
+ lir->lid_id.lgl_ogen, rec->lrh_index,
cathandle->lgh_id.lgl_oid);
}
&cathandle->u.chd.chd_head,
u.phd.phd_entry) {
llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) &
+ if ((llh->llh_flags &
LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1)) {
+ (llh->llh_count == 1)) {
rc = llog_destroy(loghandle);
if (rc)
CERROR("failure destroying log during "
RETURN(-ERANGE);
}
- if (le32_to_cpu(llh->lgh_hdr->llh_count) != num_recs) {
+ if (llh->lgh_hdr->llh_count != num_recs) {
CERROR("%s: handle->count is %d, expected %d after write\n",
- test, le32_to_cpu(llh->lgh_hdr->llh_count), num_recs);
+ test, llh->lgh_hdr->llh_count, num_recs);
RETURN(-ERANGE);
}
int num_recs = 1; /* 1 for the header */
ENTRY;
- lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr));
- lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC);
+ lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = sizeof(lcr);
+ lcr.lcr_hdr.lrh_type = OST_SZ_REC;
CWARN("3a: write one create_rec\n");
rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
for (i = 0; i < 10; i++) {
struct llog_rec_hdr hdr;
char buf[8];
- hdr.lrh_len = cpu_to_le32(8);
- hdr.lrh_type = cpu_to_le32(OBD_CFG_REC);
+ hdr.lrh_len = 8;
+ hdr.lrh_type = OBD_CFG_REC;
memset(buf, 0, sizeof buf);
rc = llog_write_rec(llh, &hdr, NULL, 0, buf, -1);
if (rc) {
ENTRY;
- lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len =
- cpu_to_le32(LLOG_MIN_REC_SIZE);
- lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
+ lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+ lmr.lmr_hdr.lrh_type = 0xf00f00;
sprintf(name, "%x", llog_test_rand+1);
CWARN("4a: create a catalog log with name: %s\n", name);
if (buf == NULL)
GOTO(out, rc = -ENOMEM);
for (i = 0; i < 5; i++) {
- rec.lrh_len = cpu_to_le32(buflen);
- rec.lrh_type = cpu_to_le32(OBD_CFG_REC);
+ rec.lrh_len = buflen;
+ rec.lrh_type = OBD_CFG_REC;
rc = llog_cat_add_rec(cath, &rec, NULL, buf);
if (rc) {
CERROR("4e: write 5 records failed at #%d: %d\n",
{
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
- if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
- le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid,
+ rec->lrh_index, lir->lid_id.lgl_oid,
lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid);
RETURN(0);
}
static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
void *data)
{
- if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
+ if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
CWARN("seeing record at index %d in log "LPX64"\n",
- le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid);
+ rec->lrh_index, llh->lgh_id.lgl_oid);
RETURN(0);
}
struct llog_cookie cookie;
static int i = 0;
- if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
+ if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
cookie.lgc_lgl = llh->lgh_id;
- cookie.lgc_index = le32_to_cpu(rec->lrh_index);
+ cookie.lgc_index = rec->lrh_index;
llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie);
i++;
ENTRY;
- lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len =
- cpu_to_le32(LLOG_MIN_REC_SIZE);
- lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
+ lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+ lmr.lmr_hdr.lrh_type = 0xf00f00;
CWARN("5a: re-open catalog by id\n");
rc = llog_create(ctxt, &llh, &cat_logid, NULL);
struct portals_handle *h;
h = list_entry(tmp, struct portals_handle, h_link);
- CERROR("forcing cleanup for handle "LPX64"\n",
- h->h_cookie);
+ CERROR("force clean handle "LPX64" addr %p addref %p\n",
+ h->h_cookie, h, h->h_addref);
class_handle_unhash_nolock(h);
}
exp->exp_flags = flags;
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- if (!(flags & OBD_OPT_FORCE))
- filter_grant_sanity_check(obd, __FUNCTION__);
filter_grant_discard(exp);
/* Disconnect early so that clients can't keep using export */
/* Do this twice in case a BRW arrived between the first call and
* the class_export_unlink() call (bug 2663) */
- if (!(flags & OBD_OPT_FORCE))
- filter_grant_sanity_check(obd, __FUNCTION__);
filter_grant_discard(exp);
ldlm_cancel_locks_for_export(exp);
spin_unlock(&obd->obd_osfs_lock);
CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
- "pending "LPU64" free "LPU64" avail "LPU64"\n",
- filter->fo_tot_dirty >> blockbits,
- filter->fo_tot_granted >> blockbits,
- filter->fo_tot_pending >> blockbits,
- osfs->os_bfree, osfs->os_bavail);
+ " pending "LPU64" free "LPU64" avail "LPU64"\n",
+ filter->fo_tot_dirty, filter->fo_tot_granted,
+ filter->fo_tot_pending,
+ osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
filter_grant_sanity_check(obd, __FUNCTION__);
rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
&lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == -ETIMEDOUT)
+ if (rc == -ETIMEDOUT) {
+ CDEBUG(D_HA, "%p: timed out waiting for recovery\n", oscc);
RETURN(rc);
+ }
CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
}
LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
oa->o_flags == OBD_FL_DELORPHAN);
DEBUG_REQ(D_HA, request,
- "delorphan from OST integration; level == RECOVER");
- request->rq_send_state = LUSTRE_IMP_RECOVER;
+ "delorphan from OST integration");
}
rc = ptlrpc_queue_wait(request);
if (mode == LCK_PR) {
rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
policy, LCK_PW, lockh);
- if (rc == 1) {
+ if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
/* FIXME: This is not incredibly elegant, but it might
* be more elegant than adding another parameter to
* lock_match. I want a second opinion. */
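
The body following the FIXME is elided in this fragment; the inferred intent, for the record: a request for LCK_PR may be satisfied by a cached LCK_PW lock, but a caller passing LDLM_FL_TEST_LOCK is only probing for a match, so the added check keeps the test-only path from taking a reference or adjusting the matched mode. (This reading follows from the fragment above, not from the elided code.)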
unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_QSWNAL | S_SOCKNAL |
S_GMNAL | S_IBNAL);
unsigned int portal_debug = (D_WARNING | D_DLMTRACE | D_ERROR | D_EMERG | D_HA |
- D_RPCTRACE | D_VFSTRACE);
+ D_RPCTRACE | D_VFSTRACE | D_MALLOC);
unsigned int portal_cerror = 1;
unsigned int portal_printk;
unsigned int portal_stack;
list_add_tail(&req->rq_list, &service->srv_request_queue);
service->srv_n_queued_reqs++;
- rqbd->rqbd_eventcount++;
/* NB everything can disappear under us once the request
* has been queued and we unlock, so do the wake now... */
}
imp->imp_remote_handle = request->rq_repmsg->handle;
IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
- ptlrpc_pinger_add_import(imp);
GOTO(finish, rc = 0);
}
imp->imp_target_uuid.uuid,
imp->imp_connection->c_remote_uuid.uuid);
- ptlrpc_validate_import(imp);
rc = ptlrpc_resend(imp);
if (rc)
GOTO(out, rc);
IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
+ ptlrpc_validate_import(imp);
}
if (imp->imp_state == LUSTRE_IMP_FULL) {
GOTO(out, rc = -EFAULT);
}
memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
- handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
+ handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
out:
if (req)
#ifdef __KERNEL__
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct llog_gen_rec *lgr;
struct obd_import *imp;
ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
- &req_body->lgdc_gen);
+ &req_body->lgdc_gen, NULL);
if (rc != 0)
CERROR("failed at llog_relp_connect\n");
}
ctxt = cbd->ctxt;
- if (!(cat->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+ if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
RETURN(-EINVAL);
lir = (struct llog_logid_rec *)rec;
return rc;
}
-void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
+int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
{
struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
struct ptlrpc_service *service = srv_ni->sni_service;
int rc;
ptl_md_t md;
ptl_handle_me_t me_h;
- unsigned long flags;
CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n",
service->srv_req_portal, srv_ni->sni_ni->pni_name,
srv_ni->sni_ni->pni_ni_h.nal_idx,
srv_ni->sni_ni->pni_ni_h.cookie);
+ if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_RQBD))
+ return (-ENOMEM);
+
rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
match_id, 0, ~0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
- GOTO (failed, NULL);
+ return (-ENOMEM);
}
LASSERT(rqbd->rqbd_refcount == 0);
md.user_ptr = &rqbd->rqbd_cbid;
md.eventq = srv_ni->sni_ni->pni_eq_h;
- spin_lock_irqsave (&service->srv_lock, flags);
- srv_ni->sni_nrqbd_receiving++;
- spin_unlock_irqrestore (&service->srv_lock, flags);
-
rc = PtlMDAttach(me_h, md, PTL_UNLINK, &rqbd->rqbd_md_h);
if (rc == PTL_OK)
- return;
-
- CERROR("PtlMDAttach failed: %d\n", rc);
+ return (0);
+
+ CERROR("PtlMDAttach failed: %d; \n", rc);
LASSERT (rc == PTL_NOSPACE);
rc = PtlMEUnlink (me_h);
LASSERT (rc == PTL_OK);
-
- spin_lock_irqsave (&service->srv_lock, flags);
- srv_ni->sni_nrqbd_receiving--;
- if (srv_ni->sni_nrqbd_receiving == 0) {
- /* This service is off-air on this interface because all
- * its request buffers are busy. Portals will have started
- * dropping incoming requests until more buffers get
- * posted */
- CERROR("All %s %s request buffers busy\n",
- service->srv_name, srv_ni->sni_ni->pni_name);
- }
- spin_unlock_irqrestore (&service->srv_lock, flags);
-
- failed:
- LBUG(); /* BUG 1191 */
- /* put req on a retry list? */
+ rqbd->rqbd_refcount = 0;
+
+ return (-ENOMEM);
}
(long long)LCK_EX);
LASSERTF(LCK_PW == 2, " found %lld\n",
(long long)LCK_PW);
- LASSERTF(LCK_PR == 3, " found %lld\n",
+ LASSERTF(LCK_PR == 4, " found %lld\n",
(long long)LCK_PR);
- LASSERTF(LCK_CW == 4, " found %lld\n",
+ LASSERTF(LCK_CW == 8, " found %lld\n",
(long long)LCK_CW);
- LASSERTF(LCK_CR == 5, " found %lld\n",
+ LASSERTF(LCK_CR == 16, " found %lld\n",
(long long)LCK_CR);
- LASSERTF(LCK_NL == 6, " found %lld\n",
+ LASSERTF(LCK_NL == 32, " found %lld\n",
(long long)LCK_NL);
LASSERTF(PTLBD_QUERY == 200, " found %lld\n",
(long long)PTLBD_QUERY);
LASSERT(ctxt);
+ if (ctxt->loc_imp == NULL) {
+ CWARN("no import for ctxt %p\n", ctxt);
+ RETURN(0);
+ }
+
if (count == 0 || cookies == NULL) {
down(&ctxt->loc_sem);
if (ctxt->loc_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
}
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct llog_canceld_ctxt *llcd;
int rc;
}
spin_lock_irqsave (&svc->srv_lock, flags);
- list_add(&rqbd->rqbd_list, &srv_ni->sni_rqbds);
+ list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
svc->srv_nbufs++;
spin_unlock_irqrestore (&svc->srv_lock, flags);
(large->tv_usec - small->tv_usec);
}
+static int
+ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
+{
+ struct ptlrpc_srv_ni *srv_ni;
+ struct ptlrpc_request_buffer_desc *rqbd;
+ unsigned long flags;
+ int rc;
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+ if (list_empty (&svc->srv_idle_rqbds)) {
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+ return (0);
+ }
+
+ rqbd = list_entry(svc->srv_idle_rqbds.next,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
+ list_del (&rqbd->rqbd_list);
+
+ /* assume we will post successfully */
+ srv_ni = rqbd->rqbd_srv_ni;
+ srv_ni->sni_nrqbd_receiving++;
+ list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
+
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+ rc = ptlrpc_register_rqbd(rqbd);
+ if (rc == 0)
+ return (1);
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+
+ srv_ni->sni_nrqbd_receiving--;
+ list_del(&rqbd->rqbd_list);
+ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
+
+ if (srv_ni->sni_nrqbd_receiving == 0) {
+ /* This service is off-air on this interface because all
+ * its request buffers are busy. Portals will have started
+ * dropping incoming requests until more buffers get
+ * posted */
+ CERROR("All %s %s request buffers busy\n",
+ svc->srv_name, srv_ni->sni_ni->pni_name);
+ }
+
+ spin_unlock_irqrestore (&svc->srv_lock, flags);
+
+ return (-1);
+}
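
For reference, the contract established above: the function returns +1 when a buffer was posted, 0 when the idle list is empty, and -1 when the post failed (the rqbd goes back on the idle list). A minimal, hypothetical caller draining the idle list:

static void ptlrpc_post_all_idle_rqbds(struct ptlrpc_service *svc)
{
        /* stop on 0 (nothing left to post) or -1 (post failed; the
         * caller should back off, as ptlrpc_main does below) */
        while (ptlrpc_server_post_idle_rqbds(svc) > 0)
                ;
}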
+
struct ptlrpc_service *
ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
int req_portal, int rep_portal,
service->srv_handler = handler;
INIT_LIST_HEAD(&service->srv_request_queue);
+ INIT_LIST_HEAD(&service->srv_idle_rqbds);
INIT_LIST_HEAD(&service->srv_reply_queue);
/* First initialise enough for early teardown */
srv_ni->sni_service = service;
srv_ni->sni_ni = &ptlrpc_interfaces[i];
- INIT_LIST_HEAD(&srv_ni->sni_rqbds);
+ INIT_LIST_HEAD(&srv_ni->sni_active_rqbds);
INIT_LIST_HEAD(&srv_ni->sni_active_replies);
}
srv_ni->sni_ni->pni_name);
GOTO(failed, NULL);
}
- ptlrpc_register_rqbd (rqbd);
+
+ /* We shouldn't be under memory pressure at
+ * startup, so fail if we can't post all our
+ * buffers at this time. */
+ if (ptlrpc_server_post_idle_rqbds(service) <= 0)
+ GOTO(failed, NULL);
}
}
return NULL;
}
+static void
+ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ int refcount;
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+ svc->srv_n_active_reqs--;
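+ /* rqbd_refcount counts requests still outstanding against this
+ * buffer; the final drop, made under srv_lock, moves the buffer
+ * back to the idle list atomically w.r.t. other service threads */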
+ refcount = --(req->rq_rqbd->rqbd_refcount);
+ if (refcount == 0) {
+ /* request buffer is now idle */
+ list_del(&req->rq_rqbd->rqbd_list);
+ list_add_tail(&req->rq_rqbd->rqbd_list,
+ &svc->srv_idle_rqbds);
+ }
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+ ptlrpc_free_server_req(req);
+}
+
static int
ptlrpc_server_handle_request (struct ptlrpc_service *svc)
{
struct timeval work_start;
struct timeval work_end;
long timediff;
- int refcount;
int rc;
ENTRY;
}
}
- spin_lock_irqsave(&svc->srv_lock, flags);
- svc->srv_n_active_reqs--;
- refcount = --(request->rq_rqbd->rqbd_refcount);
- spin_unlock_irqrestore(&svc->srv_lock, flags);
-
- if (refcount == 0) {
- /* rqbd now idle: repost */
- ptlrpc_register_rqbd(request->rq_rqbd);
- }
-
- ptlrpc_free_server_req(request);
-
+ ptlrpc_server_free_request(svc, request);
+
RETURN(1);
}
liblustre_check_services (void *arg)
{
int did_something = 0;
+ int rc;
struct list_head *tmp, *nxt;
ENTRY;
svc->srv_nthreads++;
- while (ptlrpc_server_handle_reply (svc))
- did_something++;
-
- while (ptlrpc_server_handle_request (svc))
- did_something++;
-
+ do {
+ rc = ptlrpc_server_handle_reply(svc);
+ rc |= ptlrpc_server_handle_request(svc);
+ rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
+ did_something |= rc;
+ } while (rc);
+
svc->srv_nthreads--;
}
reparent_to_init();
}
+static int
+ptlrpc_retry_rqbds(void *arg)
+{
+ struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;
+
+ svc->srv_rqbd_timeout = 0;
+ return (-ETIMEDOUT);
+}
+
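+/* Retry flow: when a repost fails, ptlrpc_main (below) sets
+ * svc->srv_rqbd_timeout to HZ/10 and waits under LWI_TIMEOUT with
+ * ptlrpc_retry_rqbds as the timeout callback; on expiry the callback
+ * zeroes srv_rqbd_timeout, satisfying the "idle rqbds pending" clause
+ * of the wake-up condition, and the thread tries to post again. */
+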
static int ptlrpc_main(void *arg)
{
struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
while ((thread->t_flags & SVC_STOPPING) == 0 ||
svc->srv_n_difficult_replies != 0) {
/* Don't exit while there are replies to be handled */
- struct l_wait_info lwi = { 0 };
-
+ struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+ ptlrpc_retry_rqbds, svc);
+
l_wait_event_exclusive (svc->srv_waitq,
- (thread->t_flags & SVC_STOPPING) != 0 ||
+ ((thread->t_flags & SVC_STOPPING) != 0 &&
+ svc->srv_n_difficult_replies == 0) ||
+ (!list_empty(&svc->srv_idle_rqbds) &&
+ svc->srv_rqbd_timeout == 0) ||
!list_empty (&svc->srv_reply_queue) ||
(!list_empty (&svc->srv_request_queue) &&
(svc->srv_n_difficult_replies == 0 ||
(svc->srv_n_difficult_replies == 0 ||
svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
ptlrpc_server_handle_request (svc);
+
+ if (!list_empty(&svc->srv_idle_rqbds) &&
+ ptlrpc_server_post_idle_rqbds(svc) < 0) {
+ /* I just failed to repost request buffers. Wait
+ * for a timeout (unless something else happens)
+ * before I try again */
+ svc->srv_rqbd_timeout = HZ/10;
+ }
}
spin_lock_irqsave(&svc->srv_lock, flags);
service->srv_name, srv_ni->sni_ni->pni_name);
/* Unlink all the request buffers. This forces a 'final'
- * event with its 'unlink' flag set for each rqbd */
- list_for_each(tmp, &srv_ni->sni_rqbds) {
+ * event with its 'unlink' flag set for each posted rqbd */
+ list_for_each(tmp, &srv_ni->sni_active_rqbds) {
struct ptlrpc_request_buffer_desc *rqbd =
list_entry(tmp, struct ptlrpc_request_buffer_desc,
rqbd_list);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
- req->rq_rqbd->rqbd_refcount--;
-
- ptlrpc_free_server_req(req);
+ service->srv_n_active_reqs++;
+
+ ptlrpc_server_free_request(service, req);
}
LASSERT(service->srv_n_queued_reqs == 0);
+ LASSERT(service->srv_n_active_reqs == 0);
- /* Now free all the request buffers since nothing references them
- * any more... */
for (i = 0; i < ptlrpc_ninterfaces; i++) {
srv_ni = &service->srv_interfaces[i];
+ LASSERT(list_empty(&srv_ni->sni_active_rqbds));
+ }
- while (!list_empty(&srv_ni->sni_rqbds)) {
- struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(srv_ni->sni_rqbds.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ /* Now free all the request buffers since nothing references them
+ * any more... */
+ while (!list_empty(&service->srv_idle_rqbds)) {
+ struct ptlrpc_request_buffer_desc *rqbd =
+ list_entry(service->srv_idle_rqbds.next,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
- ptlrpc_free_rqbd(rqbd);
- }
+ ptlrpc_free_rqbd(rqbd);
}
/* wait for all outstanding replies to complete (they were
# lustre.spec
-%define version v1_2_0pre5
+%define version b_smallfix8
%define kversion @LINUXRELEASE@
%define linuxdir @LINUX@
%define enable_doc @ENABLE_DOC@
}
run_test 6 "open1, open2, unlink |X| close1 [fail mds] close2"
-
if [ "$ONLY" != "setup" ]; then
equals_msg test complete, cleaning up
cleanup
build_test_filter
-rm -f ostactive
+SETUP=${SETUP:-"setup"}
+CLEANUP=${CLEANUP:-"cleanup"}
-gen_config
+setup() {
+ gen_config
-start ost --reformat $OSTLCONFARGS
-
-[ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
-start mds --reformat $MDSLCONFARGS
-zconf_mount `hostname` $MOUNT
+ start ost --reformat $OSTLCONFARGS
+ [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
+ start mds --reformat $MDSLCONFARGS
+ zconf_mount `hostname` $MOUNT
+}
mkdir -p $DIR
+$SETUP
+
test_0() {
fail ost
cp /etc/profile $DIR/$tfile
sync
diff /etc/profile $DIR/$tfile
+ rm -f $DIR/$tfile
}
run_test 0 "empty replay"
date > $DIR/$tfile
fail ost
$CHECKSTAT -t file $DIR/$tfile || return 1
+ rm -f $DIR/$tfile
}
run_test 1 "touch"
for i in `seq 10`; do
grep -q "tag-$i" $DIR/$tfile-$i || error "f2-$i"
done
+ rm -f $DIR/$tfile-*
}
run_test 2 "|x| 10 open(O_CREAT)s"
sleep 10
fail ost
wait $PID || return 1
+ rm -f $DIR/$tfile
}
run_test 5 "Fail OST during iozone"
+kbytesfree() {
+ cat /proc/fs/lustre/osc/OSC_*MNT*/kbytesfree | awk '{total+=$1} END {print total}'
+}
+
+test_6() {
+ f=$DIR/$tfile
+ before=`kbytesfree`
+ dd if=/dev/urandom bs=1024 count=5120 of=$f
+#define OBD_FAIL_MDS_REINT_NET_REP 0x119
+ do_facet mds "sysctl -w lustre.fail_loc=0x80000119"
+ sync
+ after_dd=`kbytesfree`
+ echo "before: $before after_dd: $after_dd"
+ (( before > after_dd )) || return 1
+ rm -f $f
+ fail ost
+ $CHECKSTAT -t file $f && return 2 || true
+ sync
+ # let the delete happen
+ sleep 2
+ after=`kbytesfree`
+ echo "before: $before after: $after"
+ (( before == after )) || return 3
+}
+run_test 6 "Fail OST before obd_destroy"
+
+test_7() {
+ f=$DIR/$tfile
+ before=`kbytesfree`
+ dd if=/dev/urandom bs=1024 count=5120 of=$f
+ sync
+ after_dd=`kbytesfree`
+ echo "before: $before after_dd: $after_dd"
+ (( before > after_dd )) || return 1
+ replay_barrier ost
+ rm -f $f
+ fail ost
+ $CHECKSTAT -t file $f && return 2 || true
+ sync
+ # let the delete happen
+ sleep 2
+ after=`kbytesfree`
+ echo "before: $before after: $after"
+ (( before == after )) || return 3
+}
+run_test 7 "Fail OST before obd_destroy"
+
equals_msg test complete, cleaning up
-cleanup
+$CLEANUP
}
run_test 40 "cause recovery in ptlrpc, ensure IO continues"
+
#b=2814
# make sure that a read to one osc doesn't try to double-unlock its page just
# because another osc is invalid. trigger_group_io used to mistakenly return
# early on valid oscs. This was fatal if the caller was ll_readpage, which
# unlocked the page itself, guaranteeing that the unlock from the RPC
# completion would assert on trying to unlock the already-unlocked page.
-test_41(){
- local f=$MOUNT/t42
+test_41() {
+ local f=$MOUNT/$tfile
# make sure the start of the file is ost1
lfs setstripe $f $((128 * 1024)) 0 0
do_facet client dd if=/dev/zero of=$f bs=4k count=1 || return 3
cancel_lru_locks OSC
# fail ost2 and read from ost1
- local osc2_dev=`../utils/lctl device_list | \
+ local osc2_dev=`$LCTL device_list | \
awk '(/ost2.*client_facet/){print $4}' `
- lctl --device "\$"$osc2_dev deactivate
+ $LCTL --device %$osc2_dev deactivate
do_facet client dd if=$f of=/dev/null bs=4k count=1 || return 3
- lctl --device "\$"$osc2_dev activate
+ $LCTL --device %$osc2_dev activate
return 0
}
run_test 41 "read from a valid osc while other oscs are invalid"
+# test MDS recovery after ost failure
+test_42() {
+ createmany -o $DIR/$tfile-%d 800
+ replay_barrier ost
+ unlinkmany $DIR/$tfile-%d 0 400
+ facet_failover ost
+
+ # the osc was evicted: the first df must fail, the second must succeed
+ df $MOUNT && return 1
+ df $MOUNT || return 2
+ echo wait for MDS to time out and recover
+ sleep $((TIMEOUT * 2))
+ unlinkmany $DIR/$tfile-%d 400 400
+ $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
+}
+run_test 42 "recoery after ost failure"
+
equals_msg test complete, cleaning up
$CLEANUP
while sleep 1 ; do
echo '-----------------------'
egrep "ll_|ldlm|filp|dentry|inode|portals|size-[0-9]* " /proc/slabinfo
+ cat /proc/meminfo
done
}
run_test 27j "lstripe with bad stripe offset (should return error)"
+test_27k() { # bug 2844
+ FILE=$DIR/d27/f27k
+ LL_MAX_BLKSIZE=$((4 * 1024 * 1024))
+ [ ! -d $DIR/d27 ] && mkdir -p $DIR/d27
+ $LSTRIPE $FILE 67108864 -1 0 || error "lstripe failed"
+ BLKSIZE=`stat $FILE | awk '/IO Block:/ { print $7 }'`
+ [ $BLKSIZE -le $LL_MAX_BLKSIZE ] || error "$BLKSIZE > $LL_MAX_BLKSIZE"
+ dd if=/dev/zero of=$FILE bs=4k count=1
+ BLKSIZE=`stat $FILE | awk '/IO Block:/ { print $7 }'`
+ [ $BLKSIZE -le $LL_MAX_BLKSIZE ] || error "$BLKSIZE > $LL_MAX_BLKSIZE"
+}
+run_test 27k "limit i_blksize for broken user apps ============="
+
test_28() {
mkdir $DIR/d28
$CREATETEST $DIR/d28/ct || error
set -e
ONLY=${ONLY:-"$*"}
-# bug number for skipped test: 1557
-ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"8"}
+# bug number for skipped test: 1768 1557
+ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"4 8 14b"}
# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
[ "$ALWAYS_EXCEPT$EXCEPT" ] && echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT"
# Test interface
error() {
echo "${TESTSUITE}: **** FAIL:" $@
+ log "FAIL: $@"
exit 1
}
printf '===== %s %.*s\n' "$msg" $suffixlen $EQUALS
}
+log() {
+ echo "$*"
+ lctl mark "$*" 2> /dev/null || true
+}
+
run_one() {
testnum=$1
message=$2
# Pretty tests run faster.
equals_msg $testnum: $message
+ log "== test $1: $2"
test_${testnum} || error "test_$testnum failed with $?"
}
return rc;
}
} else if (lmd->lmd_nal == QSWNAL) {
-#if MULTIRAIL_EKC
char *pfiles[] = {"/proc/qsnet/elan3/device0/position",
"/proc/qsnet/elan4/device0/position",
+ "/proc/elan/device0/position",
NULL};
-#else
- char *pfiles[] = {"/proc/elan/device0/position",
- NULL};
-#endif
int i = 0;
do {
rc = get_local_elan_id(pfiles[i], buf);
- } while (rc != 0 &&
- pfiles[++i] != NULL);
+ } while (rc != 0 && pfiles[++i] != NULL);
if (rc != 0) {
- fprintf(stderr, "mount: can't read elan ID"
- " from /proc\n");
+ fprintf(stderr,
+ "mount: can't read Elan ID from /proc\n");
return -1;
}
}
if (ptl_parse_nid (&nid, buf) != 0) {
- fprintf (stderr, "mount: can't parse NID %s\n",
- buf);
+ fprintf (stderr, "mount: can't parse NID %s\n", buf);
return (-1);
}
(long long)LCK_EX);
LASSERTF(LCK_PW == 2, " found %lld\n",
(long long)LCK_PW);
- LASSERTF(LCK_PR == 3, " found %lld\n",
+ LASSERTF(LCK_PR == 4, " found %lld\n",
(long long)LCK_PR);
- LASSERTF(LCK_CW == 4, " found %lld\n",
+ LASSERTF(LCK_CW == 8, " found %lld\n",
(long long)LCK_CW);
- LASSERTF(LCK_CR == 5, " found %lld\n",
+ LASSERTF(LCK_CR == 16, " found %lld\n",
(long long)LCK_CR);
- LASSERTF(LCK_NL == 6, " found %lld\n",
+ LASSERTF(LCK_NL == 32, " found %lld\n",
(long long)LCK_NL);
LASSERTF(PTLBD_QUERY == 200, " found %lld\n",
(long long)PTLBD_QUERY);