extern unsigned int portal_printk;
extern unsigned int portal_cerror;
/* Debugging subsystems (32 bits, non-overlapping) */
-#define S_UNDEFINED (1 << 0)
-#define S_MDC (1 << 1)
-#define S_MDS (1 << 2)
-#define S_OSC (1 << 3)
-#define S_OST (1 << 4)
-#define S_CLASS (1 << 5)
-#define S_LOG (1 << 6)
-#define S_LLITE (1 << 7)
-#define S_RPC (1 << 8)
-#define S_MGMT (1 << 9)
-#define S_PORTALS (1 << 10)
-#define S_SOCKNAL (1 << 11)
-#define S_QSWNAL (1 << 12)
-#define S_PINGER (1 << 13)
-#define S_FILTER (1 << 14)
-#define S_PTLBD (1 << 15)
-#define S_ECHO (1 << 16)
-#define S_LDLM (1 << 17)
-#define S_LOV (1 << 18)
-#define S_GMNAL (1 << 19)
-#define S_PTLROUTER (1 << 20)
-#define S_COBD (1 << 21)
-#define S_IBNAL (1 << 22)
+#define S_UNDEFINED 0x00000001
+#define S_MDC 0x00000002
+#define S_MDS 0x00000004
+#define S_OSC 0x00000008
+#define S_OST 0x00000010
+#define S_CLASS 0x00000020
+#define S_LOG 0x00000040
+#define S_LLITE 0x00000080
+#define S_RPC 0x00000100
+#define S_MGMT 0x00000200
+#define S_PORTALS 0x00000400
+#define S_SOCKNAL 0x00000800
+#define S_QSWNAL 0x00001000
+#define S_PINGER 0x00002000
+#define S_FILTER 0x00004000
+#define S_PTLBD 0x00008000
+#define S_ECHO 0x00010000
+#define S_LDLM 0x00020000
+#define S_LOV 0x00040000
+#define S_GMNAL 0x00080000
+#define S_PTLROUTER 0x00100000
+#define S_COBD 0x00200000
+#define S_IBNAL 0x00400000
/* If you change these values, please keep portals/utils/debug.c
* up to date! */
/* Debugging masks (32 bits, non-overlapping) */
-#define D_TRACE (1 << 0) /* ENTRY/EXIT markers */
-#define D_INODE (1 << 1)
-#define D_SUPER (1 << 2)
-#define D_EXT2 (1 << 3) /* anything from ext2_debug */
-#define D_MALLOC (1 << 4) /* print malloc, free information */
-#define D_CACHE (1 << 5) /* cache-related items */
-#define D_INFO (1 << 6) /* general information */
-#define D_IOCTL (1 << 7) /* ioctl related information */
-#define D_BLOCKS (1 << 8) /* ext2 block allocation */
-#define D_NET (1 << 9) /* network communications */
-#define D_WARNING (1 << 10) /* CWARN(...) == CDEBUG (D_WARNING, ...) */
-#define D_BUFFS (1 << 11)
-#define D_OTHER (1 << 12)
-#define D_DENTRY (1 << 13)
-#define D_PORTALS (1 << 14) /* ENTRY/EXIT markers */
-#define D_PAGE (1 << 15) /* bulk page handling */
-#define D_DLMTRACE (1 << 16)
-#define D_ERROR (1 << 17) /* CERROR(...) == CDEBUG (D_ERROR, ...) */
-#define D_EMERG (1 << 18) /* CEMERG(...) == CDEBUG (D_EMERG, ...) */
-#define D_HA (1 << 19) /* recovery and failover */
-#define D_RPCTRACE (1 << 20) /* for distributed debugging */
-#define D_VFSTRACE (1 << 21)
-#define D_READA (1 << 22) /* read-ahead */
+#define D_TRACE 0x00000001 /* ENTRY/EXIT markers */
+#define D_INODE 0x00000002
+#define D_SUPER 0x00000004
+#define D_EXT2 0x00000008 /* anything from ext2_debug */
+#define D_MALLOC 0x00000010 /* print malloc, free information */
+#define D_CACHE 0x00000020 /* cache-related items */
+#define D_INFO 0x00000040 /* general information */
+#define D_IOCTL 0x00000080 /* ioctl related information */
+#define D_BLOCKS 0x00000100 /* ext2 block allocation */
+#define D_NET 0x00000200 /* network communications */
+#define D_WARNING 0x00000400 /* CWARN(...) == CDEBUG (D_WARNING, ...) */
+#define D_BUFFS 0x00000800
+#define D_OTHER 0x00001000
+#define D_DENTRY 0x00002000
+#define D_PORTALS 0x00004000 /* ENTRY/EXIT markers */
+#define D_PAGE 0x00008000 /* bulk page handling */
+#define D_DLMTRACE 0x00010000
+#define D_ERROR 0x00020000 /* CERROR(...) == CDEBUG (D_ERROR, ...) */
+#define D_EMERG 0x00040000 /* CEMERG(...) == CDEBUG (D_EMERG, ...) */
+#define D_HA 0x00080000 /* recovery and failover */
+#define D_RPCTRACE 0x00100000 /* for distributed debugging */
+#define D_VFSTRACE 0x00200000
+#define D_READA 0x00400000 /* read-ahead */
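/* Editorial illustration (not part of the original header): the subsystem
 * and debug masks are plain bit flags, so a filter test in the spirit of
 * what CDEBUG does reduces to two ANDs.  A hypothetical helper:
 *
 *   static inline int portal_debug_enabled(unsigned int subsys,
 *                                          unsigned int mask)
 *   {
 *       return (portal_subsystem_debug & subsys) &&
 *              (portal_debug & mask);
 *   }
 *
 * With the defaults set elsewhere in this patch,
 * portal_debug_enabled(S_LDLM, D_DLMTRACE) is nonzero, while
 * portal_debug_enabled(S_SOCKNAL, D_NET) is 0 because S_SOCKNAL is
 * masked out of portal_subsystem_debug. */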
#ifdef __KERNEL__
# include <linux/sched.h> /* THREAD_SIZE */
#define LASSERTF(cond, fmt...) do { } while (0)
#endif
+#ifdef CONFIG_SMP
+#define LASSERT_SPIN_LOCKED(lock) LASSERT(spin_is_locked(lock))
+#else
+#define LASSERT_SPIN_LOCKED(lock) do {} while(0)
+#endif
+
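/* Usage sketch (editorial; the function below is hypothetical): callers
 * that require a spinlock held can assert the invariant, and the check
 * compiles away on non-SMP kernels where spin_is_locked() is meaningless:
 *
 *   static void hypothetical_enqueue(struct ptlrpc_request *req)
 *   {
 *       LASSERT_SPIN_LOCKED(&req->rq_lock);
 *       ...
 *   }
 */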
#ifdef __arch_um__
#define LBUG_WITH_LOC(file, func, line) \
do { \
}
ksock_conn_t *
-ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
+ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
{
struct list_head *tmp;
ksock_conn_t *typed = NULL;
int tnob = 0;
ksock_conn_t *fallback = NULL;
int fnob = 0;
-
+
/* Find the conn with the shortest tx queue */
list_for_each (tmp, &peer->ksnp_conns) {
ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
int nob = atomic_read(&c->ksnc_tx_nob) +
- c->ksnc_sock->sk->sk_wmem_queued;
+ c->ksnc_sock->sk->sk_wmem_queued;
LASSERT (!c->ksnc_closing);
unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_QSWNAL | S_SOCKNAL |
S_GMNAL | S_IBNAL);
unsigned int portal_debug = (D_WARNING | D_DLMTRACE | D_ERROR | D_EMERG | D_HA |
- D_RPCTRACE | D_VFSTRACE);
+ D_RPCTRACE | D_VFSTRACE | D_MALLOC);
unsigned int portal_cerror = 1;
unsigned int portal_printk;
unsigned int portal_stack;
* side, and collects the result
*/
static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
- void *ret, ptl_size_t ret_len)
+ void *ret, size_t ret_len)
{
bridge b = (bridge) n->nal_data;
* side, and collects the result
*/
static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
- void *ret, ptl_size_t ret_len)
+ void *ret, size_t ret_len)
{
bridge b = (bridge) n->nal_data;
static const char *portal_debug_subsystems[] =
{"undefined", "mdc", "mds", "osc", "ost", "class", "log", "llite",
"rpc", "mgmt", "portals", "socknal", "qswnal", "pinger", "filter",
- "ptlbd", "echo", "ldlm", "lov", "gmnal", "router", "cobd", NULL};
+ "ptlbd", "echo", "ldlm", "lov", "gmnal", "router", "cobd", "ibnal",
+ NULL};
static const char *portal_debug_masks[] =
{"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
"blocks", "net", "warning", "buffs", "other", "dentry", "portals",
-tbd Cluster File Systems, Inc. <info@clusterfs.com>
+2004-03-04 Cluster File Systems, Inc. <info@clusterfs.com>
* version 1.2.0
* bug fixes
- account for cache space usage on clients to avoid data loss (974)
- ptlrpc cleanup bug (2710)
- mds timeout on local locks (2588)
- namespace lock held during RPCs (2431)
+ - handle interrupted sync write properly (2503)
- don't try to handle a message that hasn't been replied to (2699)
- client assert failure during cleanup after abort recovery (2701)
- leak mdc device after failed mount (2712)
- don't delete objects on OST if given a bogus objid from MDS (2751)
- handle large client PAGE_SIZE readdir on small PAGE_SIZE MDS (2777)
- if rq_no_resend, then timeout request after recovery (2432)
+ - fix MDS llog_logid record size, 64-bit array alignment (2733)
+ - don't call usermode_helper from ptlrpcd, DEFAULT upcall (2773)
+ - put magic in mount.lustre data, check for bad/NULL mount data (2529)
+ - MDS recovery shouldn't delete objects that it has given out (2730)
+ - if enqueue arrives after completion, don't clobber LVB (2819)
+ - don't unlock pages twice when trigger_group_io returns error (2814)
+ - don't deref NULL rq_repmsg if ldlm_handle_enqueue failed (2822)
+ - don't write pages to disk if there was an error (1450)
+ - don't ping imports that have recovery disabled (2676)
+ - take buffered bytes into account when balancing socknal conn (2817)
+ - hold a DLM lock over readdir always, use truncate_inode_pages (2706)
+ - reconnect unlink llog connection after MDS reconnects to OST (2816)
+ - remove little-endian swabbing of llog records (1987)
+ - set/limit i_blksize to LL_MAX_BLKSIZE on client (2884)
+ - retry reposting request buffers if they fail (1191)
+ - grow extent at grant time to avoid granting a revoked lock (2809)
+ - lock revoke doesn't evict page if covered by a second lock (2765)
+ - disable VM readahead to avoid reading outside lock extents (2805)
* miscellania
- return LL_SUPER_MAGIC from statfs for the filesystem type (1972)
+ - updated kernel patches for hp-2.4.20 kernel (2681)
2004-02-07 Cluster File Systems, Inc. <info@clusterfs.com>
* version 1.0.4
static int cobd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *local,
- struct obd_trans_info *oti)
+ struct obd_trans_info *oti, int rc)
{
struct obd_export *cobd_exp;
- int rc;
if (exp->exp_obd == NULL)
return -EINVAL;
return -EOPNOTSUPP;
cobd_exp = exp->exp_obd->u.cobd.cobd_target_exp;
- rc = obd_commitrw(cmd, cobd_exp, oa, objcount, obj,niocount,local,oti);
+ rc = obd_commitrw(cmd, cobd_exp, oa, objcount, obj, niocount, local,
+ oti, rc);
return rc;
}
struct cache_obd *cobd;
if (obd == NULL) {
- CERROR("invalid client cookie "LPX64"\n",
+ CERROR("invalid client cookie "LPX64"\n",
exp->exp_handle.h_cookie);
return -EINVAL;
}
/* Passed by mount */
struct lustre_mount_data {
+ uint32_t lmd_magic;
uint32_t lmd_version;
uint64_t lmd_local_nid;
uint64_t lmd_server_nid;
#ifdef __KERNEL__
# include <linux/proc_fs.h>
-#endif
+#endif
#include <linux/lustre_lib.h>
#include <linux/lustre_net.h>
#define LDLM_CB_BLOCKING 1
#define LDLM_CB_CANCELING 2
-#define L2B(c) (1 << c)
-
/* compatibility matrix */
-#define LCK_COMPAT_EX L2B(LCK_NL)
-#define LCK_COMPAT_PW (LCK_COMPAT_EX | L2B(LCK_CR))
-#define LCK_COMPAT_PR (LCK_COMPAT_PW | L2B(LCK_PR))
-#define LCK_COMPAT_CW (LCK_COMPAT_PW | L2B(LCK_CW))
-#define LCK_COMPAT_CR (LCK_COMPAT_CW | L2B(LCK_PR) | L2B(LCK_PW))
-#define LCK_COMPAT_NL (LCK_COMPAT_CR | L2B(LCK_EX))
+#define LCK_COMPAT_EX LCK_NL
+#define LCK_COMPAT_PW (LCK_COMPAT_EX | LCK_CR)
+#define LCK_COMPAT_PR (LCK_COMPAT_PW | LCK_PR)
+#define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW)
+#define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW)
+#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX)
static ldlm_mode_t lck_compat_array[] = {
[LCK_EX] LCK_COMPAT_EX,
[LCK_NL] LCK_COMPAT_NL
};
-static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
+static inline void lockmode_verify(ldlm_mode_t mode)
{
- LASSERT(exist >= LCK_EX && exist <= LCK_NL);
- LASSERT(new >= LCK_EX && new <= LCK_NL);
+ LASSERT(mode >= LCK_EX && mode <= LCK_NL);
+}
- return (lck_compat_array[exist] & L2B(new));
+static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
+{
+ return (lck_compat_array[exist] & new);
}
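/* Worked example (editorial, not in the source): with the bitmask lock
 * modes, a compatibility test is one table lookup and one AND.  Since
 * LCK_COMPAT_PR expands to (LCK_NL | LCK_CR | LCK_PR):
 *
 *   lockmode_compat(LCK_PR, LCK_PR)   -> nonzero: PR admits other readers
 *   lockmode_compat(LCK_PR, LCK_PW)   -> 0:       PR excludes writers
 */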
/*
-
*/
-struct ldlm_lock;
-struct ldlm_resource;
+struct ldlm_lock;
+struct ldlm_resource;
struct ldlm_namespace;
typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
struct list_head ns_root_list; /* all root resources in ns */
struct lustre_lock ns_lock; /* protects hash, refcount, list */
struct list_head ns_list_chain; /* position in global NS list */
- /*
+ /*
struct proc_dir_entry *ns_proc_dir;
*/
struct list_head l_lru;
struct list_head l_res_link; // position in one of three res lists
struct list_head l_export_chain; // per-export chain of locks
- struct list_head l_pending_chain; // locks with callbacks pending
- unsigned long l_callback_timeout;
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
ldlm_completion_callback l_completion_ast;
ldlm_blocking_callback l_blocking_ast;
ldlm_glimpse_callback l_glimpse_ast;
- void *l_ast_data;
struct obd_export *l_export;
- /* XXX phil can fix this, I'm sure */
struct obd_export *l_conn_export;
-// struct lustre_handle *l_connh;
__u32 l_flags;
struct lustre_handle l_remote_handle;
ldlm_policy_data_t l_policy_data;
- /* This LVB is used only on the client side, as temporary storage for
- * a lock value block received during an enqueue */
- __u32 l_lvb_len;
- void *l_lvb_data;
- void *l_lvb_swabber;
-
__u32 l_readers;
__u32 l_writers;
__u8 l_destroyed;
* on this waitq to learn when it becomes granted. */
wait_queue_head_t l_waitq;
struct timeval l_enqueued_time;
- unsigned long l_last_used; /* jiffies */
-};
+ unsigned long l_last_used; /* jiffies */
+ struct ldlm_extent l_req_extent;
+
+ /* Client-side-only members */
+ __u32 l_lvb_len; /* temporary storage for */
+ void *l_lvb_data; /* an LVB received during */
+ void *l_lvb_swabber; /* an enqueue */
+ void *l_ast_data;
+
+ /* Server-side-only members */
+ struct list_head l_pending_chain; /* callbacks pending */
+ unsigned long l_callback_timeout;
+};
#define LDLM_PLAIN 10
#define LDLM_EXTENT 11
CDEBUG(level, "### " format \
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \
"res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\
- "] flags: %x remote: "LPX64" expref: %d\n" , ## a, \
+ "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64 \
+ " expref: %d\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, lock, \
lock->l_handle.h_cookie, atomic_read(&lock->l_refc), \
lock->l_readers, lock->l_writers, \
ldlm_typename[lock->l_resource->lr_type], \
lock->l_policy_data.l_extent.start, \
lock->l_policy_data.l_extent.end, \
+ lock->l_req_extent.start, lock->l_req_extent.end, \
lock->l_flags, lock->l_remote_handle.cookie, \
lock->l_export ? \
atomic_read(&lock->l_export->exp_refcount) : -99); \
struct osc_creator {
spinlock_t oscc_lock;
struct list_head oscc_list;
- struct obd_export *oscc_exp;
+ struct obd_device *oscc_obd;
obd_id oscc_last_id;//last available pre-created object
obd_id oscc_next_id;// what object id to give out next
int oscc_initial_create_count;
wait_queue_head_t oscc_waitq; /* creating procs wait on this */
};
-struct osc_export_data {
- struct osc_creator oed_oscc;
-};
-
struct ldlm_export_data {
struct list_head led_held_locks; /* protected by namespace lock */
};
struct mds_export_data eu_mds_data;
struct filter_export_data eu_filter_data;
struct ec_export_data eu_ec_data;
- struct osc_export_data eu_osc_data;
} u;
};
#define exp_mds_data u.eu_mds_data
#define exp_lov_data u.eu_lov_data
#define exp_filter_data u.eu_filter_data
-#define exp_osc_data u.eu_osc_data
#define exp_ec_data u.eu_ec_data
extern struct obd_export *class_conn2export(struct lustre_handle *conn);
void ptlrpc_wake_delayed(struct obd_import *imp);
int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid);
int ptlrpc_set_import_active(struct obd_import *imp, int active);
+void ptlrpc_invalidate_import(struct obd_import *imp);
void ptlrpc_fail_import(struct obd_import *imp, int generation);
void ptlrpc_fail_export(struct obd_export *exp);
struct ost_lvb {
__u64 lvb_size;
- __u64 lvb_time;
+ __u64 lvb_mtime;
+ __u64 lvb_atime;
+ __u64 lvb_ctime;
+ __u64 lvb_blocks;
};
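/* Editorial sketch (the field use is assumed, not shown in this patch):
 * with the expanded LVB a client can refresh cached attributes directly
 * from a granted lock's value block:
 *
 *   struct ost_lvb *lvb = lock->l_lvb_data;
 *   if (lvb != NULL && lock->l_lvb_len >= sizeof(*lvb))
 *       inode->i_size = lvb->lvb_size;
 */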
extern void lustre_swab_ost_lvb(struct ost_lvb *);
/* lock types */
typedef enum {
LCK_EX = 1,
- LCK_PW,
- LCK_PR,
- LCK_CW,
- LCK_CR,
- LCK_NL
+ LCK_PW = 2,
+ LCK_PR = 4,
+ LCK_CW = 8,
+ LCK_CR = 16,
+ LCK_NL = 32
} ldlm_mode_t;
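/* Editorial note: making each mode a distinct bit lets callers classify
 * modes with one mask test instead of chained comparisons, e.g.
 *
 *   if (mode & (LCK_NL | LCK_CR | LCK_PR))   (any of the reader modes)
 *       lock->l_readers++;
 *
 * which is the form the addref/decref paths in this change now use. */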
struct ldlm_extent {
__u32 lr_type;
__u32 lr_padding;
struct ldlm_res_id lr_name;
-} __attribute__((packed));
+};
extern void lustre_swab_ldlm_resource_desc (struct ldlm_resource_desc *r);
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
ldlm_policy_data_t l_policy_data;
-} __attribute__((packed));
+};
extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l);
struct ldlm_lock_desc lock_desc;
struct lustre_handle lock_handle1;
struct lustre_handle lock_handle2;
-} __attribute__((packed));
+};
extern void lustre_swab_ldlm_request (struct ldlm_request *rq);
struct ldlm_reply {
__u32 lock_flags;
+ __u32 lock_padding;
struct ldlm_lock_desc lock_desc;
struct lustre_handle lock_handle;
__u64 lock_policy_res1;
__u32 lgl_ogen;
} __attribute__((packed));
+/* Records written to the CATALOGS list */
+#define CATLIST "CATALOGS"
+struct llog_catid {
+ struct llog_logid lci_logid;
+ __u32 lci_padding[3];
+} __attribute__((packed));
+
/* Log data record types - there is no specific reason that these need to
* be related to the RPC opcodes, but no reason not to (may be handy later?)
*/
PTL_CFG_REC = 0x10630000,
LLOG_GEN_REC = 0x10640000,
LLOG_HDR_MAGIC = 0x10645539,
- LLOG_LOGID_MAGIC = 0x1064553a,
+ LLOG_LOGID_MAGIC = 0x1064553b,
} llog_op_type;
/* Log record header - stored in little endian order.
struct llog_logid_rec {
struct llog_rec_hdr lid_hdr;
struct llog_logid lid_id;
- __u32 padding;
+ __u32 padding[5];
struct llog_rec_tail lid_tail;
} __attribute__((packed));
return import_state_names[state];
}
+enum obd_import_event {
+ IMP_EVENT_DISCON = 0x808001,
+ IMP_EVENT_INVALIDATE = 0x808002,
+ IMP_EVENT_ACTIVE = 0x808003,
+};
struct obd_import {
struct portals_handle imp_handle;
/* flags */
int imp_invalid:1, imp_replayable:1,
imp_dlm_fake:1, imp_server_timeout:1,
- imp_initial_recov:1;
+ imp_initial_recov:1, imp_force_verify:1,
+ imp_pingable:1;
__u32 imp_connect_op;
};
})
#endif /* _LUSTRE_LIB_H */
+
+#define LMD_MAGIC 0xbdacbdac
+
+#define lmd_bad_magic(LMDP) \
+({ \
+ struct lustre_mount_data *_lmd__ = (LMDP); \
+ int _ret__ = 0; \
+ if (!_lmd__) { \
+ CERROR("Missing mount data: " \
+ "check that /sbin/mount.lustre is installed.\n");\
+ _ret__ = 1; \
+ } else if (_lmd__->lmd_magic != LMD_MAGIC) { \
+ CERROR("Invalid mount data (%#x != %#x): " \
+ "check that /sbin/mount.lustre is installed\n", \
+ _lmd__->lmd_magic, LMD_MAGIC); \
+ _ret__ = 1; \
+ } \
+ _ret__; \
+})
+
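/* Usage sketch (editorial; the function name is hypothetical): mount-time
 * validation of the data passed in from /sbin/mount.lustre:
 *
 *   static int hypothetical_fill_super(struct super_block *sb, void *data)
 *   {
 *       struct lustre_mount_data *lmd = data;
 *
 *       if (lmd_bad_magic(lmd))
 *           return -EINVAL;
 *       ...
 *   }
 */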
int llog_cat_initialize(struct obd_device *obd, int count);
int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_logid *logid);
+ int count, struct llog_catid *logid);
int obd_llog_finish(struct obd_device *obd, int count);
int llog_initiator_connect(struct llog_ctxt *ctxt);
int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid);
int llog_handle_connect(struct ptlrpc_request *req);
/* recov_thread.c */
struct llog_cookie *cookies, int flags);
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid);
struct llog_operations {
int (*lop_write_rec)(struct llog_handle *loghandle,
int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
int (*lop_connect)(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid);
/* XXX add 2 more: commit callbacks and llog recovery functions */
};
/* llog_lvfs.c */
-int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
- char *name, int count, struct llog_logid *idarray);
extern struct llog_operations llog_lvfs_ops;
-
+int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+ char *name, int count, struct llog_catid *idarray);
struct llog_ctxt {
int loc_idx; /* my index the obd array of ctxt's */
{
if (ctxt == NULL)
return -ENOTCONN;
-
+
*lop = ctxt->loc_logops;
if (*lop == NULL)
return -EOPNOTSUPP;
RETURN(-EOPNOTSUPP);
if (buf)
- buflen = le32_to_cpu(rec->lrh_len) + sizeof(struct llog_rec_hdr)
+ buflen = rec->lrh_len + sizeof(struct llog_rec_hdr)
+ sizeof(struct llog_rec_tail);
else
- buflen = le32_to_cpu(rec->lrh_len);
+ buflen = rec->lrh_len;
LASSERT(size_round(buflen) == buflen);
rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx);
}
static inline int llog_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct llog_operations *lop;
int rc;
if (lop->lop_connect == NULL)
RETURN(-EOPNOTSUPP);
- rc = lop->lop_connect(ctxt, count, logid, gen);
+ rc = lop->lop_connect(ctxt, count, logid, gen, uuid);
RETURN(rc);
}
spinlock_t rq_lock;
/* client-side flags */
unsigned int rq_intr:1, rq_replied:1, rq_err:1,
- rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
- rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1;
+ rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
+ rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
+ rq_no_delay:1;
int rq_phase;
/* client-side refcount for SENT race */
atomic_t rq_refcount;
struct ptlrpc_srv_ni *rqbd_srv_ni;
ptl_handle_md_t rqbd_md_h;
int rqbd_refcount;
- int rqbd_eventcount;
char *rqbd_buffer;
struct ptlrpc_cb_id rqbd_cbid;
struct ptlrpc_request rqbd_req;
/* Interface-specific service state */
struct ptlrpc_service *sni_service; /* owning service */
struct ptlrpc_ni *sni_ni; /* network interface */
- struct list_head sni_rqbds; /* all the request buffers */
+ struct list_head sni_active_rqbds; /* req buffers receiving */
struct list_head sni_active_replies; /* all the active replies */
int sni_nrqbd_receiving; /* # posted request buffers */
};
int srv_nthreads; /* # running threads */
int srv_n_difficult_replies; /* # 'difficult' replies */
int srv_n_active_reqs; /* # reqs being served */
+ int srv_rqbd_timeout; /* timeout before re-posting reqs */
__u32 srv_req_portal;
__u32 srv_rep_portal;
int srv_n_queued_reqs; /* # reqs waiting to be served */
struct list_head srv_request_queue; /* reqs waiting for service */
+ struct list_head srv_idle_rqbds; /* request buffers to be reposted */
+
atomic_t srv_outstanding_replies;
struct list_head srv_reply_queue; /* replies waiting for service */
int ptlrpc_error(struct ptlrpc_request *req);
void ptlrpc_resend_req(struct ptlrpc_request *request);
int ptl_send_rpc(struct ptlrpc_request *request);
-void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
+int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
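/* Editorial sketch: with ptlrpc_register_rqbd() now returning a status, a
 * service can park buffers that failed to post and retry them later (the
 * rqbd list-link field name here is assumed for illustration):
 *
 *   if (ptlrpc_register_rqbd(rqbd) != 0)
 *       list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
 */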
/* ptlrpc/client.c */
void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
#ifdef __KERNEL__
-struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode);
-struct dentry *simple_mknod(struct dentry *dir, char *name, int mode);
+struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode, int fix);
+struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix);
int lustre_fread(struct file *file, void *buf, int len, loff_t *off);
int lustre_fwrite(struct file *file, const void *buf, int len, loff_t *off);
int lustre_fsync(struct file *file);
/* Public members. */
__u64 lsm_object_id; /* lov object id */
__u64 lsm_object_gr; /* lov object id */
- __u64 lsm_maxbytes;
+ __u64 lsm_maxbytes; /* maximum possible file size */
+ unsigned long lsm_xfersize; /* optimal transfer size */
/* LOV-private members start here -- only for use in lov/. */
__u32 lsm_magic;
struct list_head fo_export_list;
int fo_subdir_count;
+
obd_size fo_tot_dirty; /* protected by obd_osfs_lock */
obd_size fo_tot_granted; /* all values in bytes */
obd_size fo_tot_pending;
struct mdc_rpc_lock *cl_rpc_lock;
struct mdc_rpc_lock *cl_setattr_lock;
+ struct osc_creator cl_oscc;
};
/* Like a client, with some hangers-on. Keep mc_client_obd first so that we
int (*o_commitrw)(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *local,
- struct obd_trans_info *oti);
+ struct obd_trans_info *oti, int rc);
int (*o_enqueue)(struct obd_export *, struct lov_stripe_md *,
__u32 type, ldlm_policy_data_t *, __u32 mode,
int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
/* llog related obd_methods */
int (*o_llog_init)(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_logid *logid);
+ int count, struct llog_catid *logid);
int (*o_llog_finish)(struct obd_device *obd, int count);
/* metadata-only methods */
struct obd_client_handle *, int flag);
int (*o_unpin)(struct obd_export *, struct obd_client_handle *, int);
- int (*o_invalidate_import)(struct obd_device *, struct obd_import *);
+ int (*o_import_event)(struct obd_device *, struct obd_import *,
+ enum obd_import_event);
int (*o_notify)(struct obd_device *obd, struct obd_device *watched,
int active);
static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *local,
- struct obd_trans_info *oti)
+ struct obd_trans_info *oti, int rc)
{
- int rc;
ENTRY;
OBD_CHECK_OP(exp->exp_obd, commitrw, -EOPNOTSUPP);
OBD_COUNTER_INCREMENT(exp->exp_obd, commitrw);
rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj, niocount,
- local, oti);
+ local, oti, rc);
RETURN(rc);
}
return(rc);
}
-static inline void obd_invalidate_import(struct obd_device *obd,
- struct obd_import *imp)
+
+static inline void obd_import_event(struct obd_device *obd,
+ struct obd_import *imp,
+ enum obd_import_event event)
{
- if (obd->obd_set_up && OBP(obd, invalidate_import)) {
- OBD_COUNTER_INCREMENT(obd, invalidate_import);
- OBP(obd, invalidate_import)(obd, imp);
+ if (obd->obd_set_up && OBP(obd, import_event)) {
+ OBD_COUNTER_INCREMENT(obd, import_event);
+ OBP(obd, import_event)(obd, imp, event);
}
}
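/* A minimal sketch (editorial, not from the source) of an o_import_event
 * handler dispatching on the events defined earlier in this change:
 *
 *   static int hypothetical_import_event(struct obd_device *obd,
 *                                        struct obd_import *imp,
 *                                        enum obd_import_event event)
 *   {
 *       switch (event) {
 *       case IMP_EVENT_DISCON:     ...; break;
 *       case IMP_EVENT_INVALIDATE: ...; break;
 *       case IMP_EVENT_ACTIVE:     ...; break;
 *       }
 *       return 0;
 *   }
 */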
#define OBD_FAIL_PTLRPC 0x500
#define OBD_FAIL_PTLRPC_ACK 0x501
+#define OBD_FAIL_PTLRPC_RQBD 0x502
#define OBD_FAIL_OBD_PING_NET 0x600
#define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601
include/linux/ext3_fs_sb.h | 10 +
5 files changed, 365 insertions(+)
-Index: linux-2.4.20-hp4-pnnl13/fs/ext3/super.c
+Index: linux/fs/ext3/super.c
===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/ext3/super.c 2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/fs/ext3/super.c 2004-01-13 17:04:38.000000000 +0300
+--- linux.orig/fs/ext3/super.c Mon Feb 2 20:57:35 2004
++++ linux/fs/ext3/super.c Mon Feb 2 20:58:05 2004
@@ -400,6 +400,221 @@
}
}
if (sbi->s_mount_opt & EXT3_MOUNT_ABORT)
ext3_abort(sb, __FUNCTION__, "Abort forced by user");
-Index: linux-2.4.20-hp4-pnnl13/fs/ext3/inode.c
+Index: linux/fs/ext3/inode.c
===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/ext3/inode.c 2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/fs/ext3/inode.c 2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/fs/ext3/inode.c Mon Feb 2 20:57:35 2004
++++ linux/fs/ext3/inode.c Mon Feb 2 20:58:05 2004
@@ -2500,6 +2500,118 @@
return err;
}
/*
* On success, We end up with an outstanding reference count against
* iloc->bh. This _must_ be cleaned up later.
-Index: linux-2.4.20-hp4-pnnl13/fs/ext3/file.c
+Index: linux/fs/ext3/file.c
===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/ext3/file.c 2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/fs/ext3/file.c 2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/fs/ext3/file.c Mon Feb 2 20:57:34 2004
++++ linux/fs/ext3/file.c Mon Feb 2 20:58:05 2004
@@ -125,7 +125,11 @@
};
setattr: ext3_setattr, /* BKL held */
setxattr: ext3_setxattr, /* BKL held */
getxattr: ext3_getxattr, /* BKL held */
-Index: linux-2.4.20-hp4-pnnl13/fs/buffer.c
+Index: linux/include/linux/ext3_fs.h
===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/buffer.c 2003-09-13 15:19:26.000000000 +0400
-+++ linux-2.4.20-hp4-pnnl13/fs/buffer.c 2004-01-13 17:01:40.000000000 +0300
-@@ -376,6 +376,8 @@
- if (sb->s_op && sb->s_op->sync_fs)
- sb->s_op->sync_fs(sb);
- unlock_super(sb);
-+ if (sb->s_op && sb->s_op->sync_fs)
-+ sb->s_op->sync_fs(sb);
- unlock_kernel();
-
- return sync_buffers(dev, 1);
-Index: linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs.h
-===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/include/linux/ext3_fs.h 2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs.h 2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/include/linux/ext3_fs.h Mon Feb 2 20:57:35 2004
++++ linux/include/linux/ext3_fs.h Mon Feb 2 20:58:05 2004
@@ -193,6 +193,7 @@
*/
#define EXT3_STATE_JDATA 0x00000001 /* journaled data exists */
/* ioctl.c */
extern int ext3_ioctl (struct inode *, struct file *, unsigned int,
-Index: linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs_sb.h
+Index: linux/include/linux/ext3_fs_sb.h
===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/include/linux/ext3_fs_sb.h 2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs_sb.h 2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/include/linux/ext3_fs_sb.h Mon Feb 2 20:57:35 2004
++++ linux/include/linux/ext3_fs_sb.h Mon Feb 2 20:58:05 2004
@@ -29,6 +29,8 @@
#define EXT3_MAX_GROUP_LOADED 8
--- /dev/null
+
+
+
+ fs/inode.c | 21 ++++++++++++++-------
+ fs/smbfs/inode.c | 2 +-
+ fs/super.c | 4 ++--
+ include/linux/fs.h | 2 +-
+ 4 files changed, 18 insertions(+), 11 deletions(-)
+
+Index: linux/fs/inode.c
+===================================================================
+--- linux.orig/fs/inode.c Mon Feb 2 21:24:21 2004
++++ linux/fs/inode.c Mon Feb 2 21:27:53 2004
+@@ -632,7 +632,8 @@
+ /*
+ * Invalidate all inodes for a device.
+ */
+-static int invalidate_list(struct list_head *head, struct super_block * sb, struct list_head * dispose)
++static int invalidate_list(struct list_head *head, struct super_block * sb,
++ struct list_head * dispose, int show)
+ {
+ struct list_head *next;
+ int busy = 0, count = 0;
+@@ -657,6 +658,11 @@
+ count++;
+ continue;
+ }
++ if (show)
++ printk(KERN_ERR
++ "inode busy: dev %s:%lu (%p) mode %o count %u\n",
++ kdevname(sb->s_dev), inode->i_ino, inode,
++ inode->i_mode, atomic_read(&inode->i_count));
+ busy = 1;
+ }
+ /* only unused inodes may be cached with i_count zero */
+@@ -675,23 +681,24 @@
+ /**
+ * invalidate_inodes - discard the inodes on a device
+ * @sb: superblock
++ * @show: whether we should display any busy inodes found
+ *
+ * Discard all of the inodes for a given superblock. If the discard
+ * fails because there are busy inodes then a non zero value is returned.
+ * If the discard is successful all the inodes have been discarded.
+ */
+
+-int invalidate_inodes(struct super_block * sb)
++int invalidate_inodes(struct super_block * sb, int show)
+ {
+ int busy;
+ LIST_HEAD(throw_away);
+
+ spin_lock(&inode_lock);
+- busy = invalidate_list(&inode_in_use, sb, &throw_away);
+- busy |= invalidate_list(&inode_unused, sb, &throw_away);
+- busy |= invalidate_list(&inode_unused_pagecache, sb, &throw_away);
+- busy |= invalidate_list(&sb->s_dirty, sb, &throw_away);
+- busy |= invalidate_list(&sb->s_locked_inodes, sb, &throw_away);
++ busy = invalidate_list(&inode_in_use, sb, &throw_away, show);
++ busy |= invalidate_list(&inode_unused, sb, &throw_away, show);
++ busy |= invalidate_list(&inode_unused_pagecache, sb, &throw_away, show);
++ busy |= invalidate_list(&sb->s_dirty, sb, &throw_away, show);
++ busy |= invalidate_list(&sb->s_locked_inodes, sb, &throw_away, show);
+ spin_unlock(&inode_lock);
+
+ dispose_list(&throw_away);
+@@ -717,7 +724,7 @@
+ * hold).
+ */
+ shrink_dcache_sb(sb);
+- res = invalidate_inodes(sb);
++ res = invalidate_inodes(sb, 0);
+ drop_super(sb);
+ }
+ invalidate_buffers(dev);
+Index: linux/fs/super.c
+===================================================================
+--- linux.orig/fs/super.c Mon Feb 2 21:24:21 2004
++++ linux/fs/super.c Mon Feb 2 21:26:08 2004
+@@ -844,7 +844,7 @@
+ lock_super(sb);
+ lock_kernel();
+ sb->s_flags &= ~MS_ACTIVE;
+- invalidate_inodes(sb); /* bad name - it should be evict_inodes() */
++ invalidate_inodes(sb, 0); /* bad name - it should be evict_inodes() */
+ if (sop) {
+ if (sop->write_super && sb->s_dirt)
+ sop->write_super(sb);
+@@ -853,7 +853,7 @@
+ }
+
+ /* Forget any remaining inodes */
+- if (invalidate_inodes(sb)) {
++ if (invalidate_inodes(sb, 1)) {
+ printk(KERN_ERR "VFS: Busy inodes after unmount. "
+ "Self-destruct in 5 seconds. Have a nice day...\n");
+ }
+Index: linux/include/linux/fs.h
+===================================================================
+--- linux.orig/include/linux/fs.h Mon Feb 2 21:24:23 2004
++++ linux/include/linux/fs.h Mon Feb 2 21:26:08 2004
+@@ -1257,7 +1257,7 @@
+ extern void set_buffer_flushtime(struct buffer_head *);
+ extern void balance_dirty(void);
+ extern int check_disk_change(kdev_t);
+-extern int invalidate_inodes(struct super_block *);
++extern int invalidate_inodes(struct super_block *, int);
+ extern int invalidate_device(kdev_t, int);
+ extern void invalidate_inode_pages(struct inode *);
+ extern void invalidate_inode_pages2(struct address_space *);
+Index: linux/fs/smbfs/inode.c
+===================================================================
+--- linux.orig/fs/smbfs/inode.c Thu Nov 28 18:53:15 2002
++++ linux/fs/smbfs/inode.c Mon Feb 2 21:26:08 2004
+@@ -167,7 +167,7 @@
+ {
+ VERBOSE("\n");
+ shrink_dcache_sb(SB_of(server));
+- invalidate_inodes(SB_of(server));
++ invalidate_inodes(SB_of(server), 0);
+ }
+
+ /*
exports_2.4.20-rh-hp.patch
lustre_version.patch
vfs_intent-2.4.20-hp.patch
-invalidate_show.patch
+invalidate_show-2.4.20-hp.patch
export-truncate.patch
iod-stock-24-exports_hp.patch
ext-2.4-patch-1.patch
ext3-error-export.patch
iopen-2.4.20.patch
tcp-zero-copy.patch
+jbd-dont-account-blocks-twice.patch
+jbd-commit-tricks.patch
add_page_private.patch
socket-exports-vanilla.patch
removepage-2.4.20.patch
jbd-flushtime.patch
jbd-get_write_access.patch
nfs_export_kernel-2.4.20-hp.patch
+ext3-raw-lookup.patch
ext3-ea-in-inode-2.4.20.patch
listman-2.4.20.patch
ext3-trusted_ea-2.4.20.patch
* - the maximum extent
* - containing the requested extent
* - and not overlapping existing conflicting extents outside the requested one
- *
- * An alternative policy is to not shrink the new extent when conflicts exist */
+ */
static void
ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
struct ldlm_extent *new_ex)
{
struct list_head *tmp;
ldlm_mode_t req_mode = req->l_req_mode;
- __u64 req_start = req->l_policy_data.l_extent.start;
- __u64 req_end = req->l_policy_data.l_extent.end;
+ __u64 req_start = req->l_req_extent.start;
+ __u64 req_end = req->l_req_extent.end;
ENTRY;
- if (new_ex->start == req_start && new_ex->end == req_end) {
- EXIT;
- return;
- }
+ lockmode_verify(req_mode);
list_for_each(tmp, queue) {
struct ldlm_lock *lock;
+ struct ldlm_extent *l_extent;
+
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ l_extent = &lock->l_policy_data.l_extent;
- if (req == lock) {
+ if (new_ex->start == req_start && new_ex->end == req_end) {
EXIT;
return;
}
- /* if lock doesn't overlap new_ex, skip it. */
- if (lock->l_policy_data.l_extent.end < new_ex->start ||
- lock->l_policy_data.l_extent.start > new_ex->end)
+ /* Don't conflict with ourselves */
+ if (req == lock)
+ continue;
+
+ /* If lock doesn't overlap new_ex, skip it. */
+ if (l_extent->end < new_ex->start ||
+ l_extent->start > new_ex->end)
continue;
/* Locks are compatible, overlap doesn't matter */
if (lockmode_compat(lock->l_req_mode, req_mode))
continue;
- if (lock->l_policy_data.l_extent.start < req_start) {
- if (lock->l_policy_data.l_extent.end == ~0) {
+ /* The locks conflict in their requested extents and we can't
+ * satisfy both, so ignore this one. Either we will ping-pong this
+ * extent (we would regardless of what extent we granted) or the
+ * lock is unused and it shouldn't limit our extent growth. */
+ if (lock->l_req_extent.end >= req_start &&
+ lock->l_req_extent.start <= req_end)
+ continue;
+
+ /* We grow extents downwards only as far as they don't overlap
+ * with already-granted locks, on the assumption that clients
+ * will be writing beyond the initial requested end and would
+ * then need to enqueue a new lock beyond the previous request.
+ * We don't grow downwards if there are lots of lockers. */
+ if (l_extent->start < req_start) {
+ if (atomic_read(&req->l_resource->lr_refcount) > 20)
new_ex->start = req_start;
- new_ex->end = req_end;
- EXIT;
- return;
- }
- new_ex->start = min(lock->l_policy_data.l_extent.end+1,
- req_start);
+ else
+ new_ex->start = min(l_extent->end+1, req_start);
}
- if (lock->l_policy_data.l_extent.end > req_end) {
- if (lock->l_policy_data.l_extent.start == 0) {
- new_ex->start = req_start;
- new_ex->end = req_end;
- EXIT;
- return;
- }
- new_ex->end = MAX(lock->l_policy_data.l_extent.start-1,
- req_end);
+ /* If we need to cancel this lock anyways because our request
+ * overlaps the granted lock, we grow up to its requested
+ * extent start instead of limiting this extent, assuming that
+ * clients are writing forwards and the lock had overgrown
+ * its extent downwards before we enqueued our request. */
+ if (l_extent->end > req_end) {
+ if (l_extent->start <= req_end)
+ new_ex->end = max(lock->l_req_extent.start - 1,
+ req_end);
+ else
+ new_ex->end = max(l_extent->start - 1, req_end);
}
}
EXIT;
}
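/* Worked example (editorial, not in the source): suppose a PW request for
 * [100, 199] scans a granted queue holding one PW lock whose granted and
 * requested extents are both [0, 49].  The modes conflict but the
 * requested extents do not, so the policy above grows the new extent
 * downwards only to min(49 + 1, 100) = 50 and upwards to ~0, and the
 * request ends up with extent [50, ~0], not overlapping the older lock. */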
-/* Determine if the lock is compatible with all locks on the queue. */
+/* In order to determine the largest possible extent we can grant, we need
+ * to scan all of the queues. */
+static void ldlm_extent_policy(struct ldlm_resource *res,
+ struct ldlm_lock *lock, int *flags)
+{
+ struct ldlm_extent new_ex = { .start = 0, .end = ~0};
+
+ ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
+ ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
+
+ if (new_ex.start != lock->l_policy_data.l_extent.start ||
+ new_ex.end != lock->l_policy_data.l_extent.end) {
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ lock->l_policy_data.l_extent.start = new_ex.start;
+ lock->l_policy_data.l_extent.end = new_ex.end;
+ }
+}
+
+/* Determine if the lock is compatible with all locks on the queue.
+ * We stop walking the queue if we hit ourselves so we don't take
+ * conflicting locks enqueued after us into account, or we'd wait forever. */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
int send_cbs)
struct list_head *tmp;
struct ldlm_lock *lock;
ldlm_mode_t req_mode = req->l_req_mode;
- __u64 req_start = req->l_policy_data.l_extent.start;
- __u64 req_end = req->l_policy_data.l_extent.end;
+ __u64 req_start = req->l_req_extent.start;
+ __u64 req_end = req->l_req_extent.end;
int compat = 1;
ENTRY;
+ lockmode_verify(req_mode);
+
list_for_each(tmp, queue) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
ldlm_error_t *err)
{
struct ldlm_resource *res = lock->l_resource;
- struct ldlm_extent new_ex = {0, ~0};
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
int rc;
ENTRY;
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
+
+ ldlm_extent_policy(res, lock, flags);
ldlm_grant_lock(lock, NULL, 0, 1);
RETURN(LDLM_ITER_CONTINUE);
}
- /* In order to determine the largest possible extent we can
- * grant, we need to scan all of the queues. */
- ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
- ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
-
- if (new_ex.start != lock->l_policy_data.l_extent.start ||
- new_ex.end != lock->l_policy_data.l_extent.end) {
- *flags |= LDLM_FL_LOCK_CHANGED;
- lock->l_policy_data.l_extent.start = new_ex.start;
- lock->l_policy_data.l_extent.end = new_ex.end;
- }
-
restart:
LASSERT(res->lr_tmp == NULL);
res->lr_tmp = &rpc_list;
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
} else {
+ ldlm_extent_policy(res, lock, flags);
ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock, NULL, 0, 0);
}
}
}
} else {
+ lockmode_verify(mode);
+
/* This loop determines if there are existing locks
* that conflict with the new lock request. */
list_for_each(tmp, &res->lr_granted) {
/* locks are compatible, overlap doesn't matter */
if (lockmode_compat(lock->l_granted_mode, mode))
continue;
-
+
if (!ldlm_flocks_overlap(lock, req))
continue;
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
+/* ldlm_lockd.c */
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock);
+
/* ldlm_plain.c */
int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
ldlm_error_t *err);
GOTO(out_ldlm, rc);
}
+ ptlrpc_pinger_add_import(imp);
EXIT;
if (rc) {
/* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
if (obd->obd_no_recov)
- ptlrpc_set_import_active(imp, 0);
+ ptlrpc_invalidate_import(imp);
else
rc = ptlrpc_disconnect_import(imp);
{
l_lock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock_remove_from_lru(lock);
- if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+ if (mode & (LCK_NL | LCK_CR | LCK_PR))
lock->l_readers++;
- else
+ if (mode & (LCK_EX | LCK_CW | LCK_PW))
lock->l_writers++;
lock->l_last_used = jiffies;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
ns = lock->l_resource->lr_namespace;
l_lock(&ns->ns_lock);
- if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
+ if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
LASSERT(lock->l_readers > 0);
lock->l_readers--;
- } else {
+ }
+ if (mode & (LCK_EX | LCK_CW | LCK_PW)) {
LASSERT(lock->l_writers > 0);
lock->l_writers--;
}
"warning\n");
LDLM_DEBUG(lock, "final decref done on cbpending lock");
- l_unlock(&ns->ns_lock);
- l_check_no_ns_lock(ns);
- /* FIXME: need a real 'desc' here */
- if (lock->l_blocking_ast != NULL)
- lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
- LDLM_CB_BLOCKING);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
+ ldlm_lock_remove_from_lru(lock);
+ ldlm_bl_to_thread(ns, NULL, lock);
+ l_unlock(&ns->ns_lock);
} else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
!lock->l_readers && !lock->l_writers) {
/* If this is a client-side namespace and this was the last
lock->l_readers == 0 && lock->l_writers == 0)
continue;
- if (lock->l_req_mode != mode)
+ if (!(lock->l_req_mode & mode))
continue;
if (lock->l_resource->lr_type == LDLM_EXTENT &&
!(lock->l_flags & LDLM_FL_LOCAL))
continue;
- ldlm_lock_addref_internal(lock, mode);
+ if (flags & LDLM_FL_TEST_LOCK)
+ LDLM_LOCK_GET(lock);
+ else
+ ldlm_lock_addref_internal(lock, mode);
return lock;
}
* If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
* to be canceled can still be matched as long as they still have reader
* or writer references
+ * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
+ * just tell us if we would have matched.
*
* Returns 1 if it finds an already-existing lock that is compatible; in this
* case, lockh is filled in with an addref()ed lock
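/* Hypothetical caller (editorial): probing for a cached lock without
 * taking a reference, in the style the FIXMEs elsewhere in this change
 * suggest:
 *
 *   flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
 *   if (ldlm_lock_match(ns, flags, &res_id, LDLM_EXTENT, &policy,
 *                       LCK_PR, &lockh))
 *       ;   (a compatible lock exists, but no reference was added)
 */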
l_unlock(&ns->ns_lock);
if (lock) {
- struct l_wait_info lwi;
ldlm_lock2handle(lock, lockh);
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
- NULL);
+ if (!(lock->l_flags & LDLM_FL_CAN_MATCH)) {
+ struct l_wait_info lwi;
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock,
+ LDLM_FL_WAIT_NOREPROC,
+ NULL);
- lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
+ lwi = LWI_TIMEOUT_INTR(obd_timeout*HZ, NULL,NULL,NULL);
- /* XXX FIXME see comment about CAN_MATCH in lustre_dlm.h */
- l_wait_event(lock->l_waitq,
- (lock->l_flags & LDLM_FL_CAN_MATCH), &lwi);
+ /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
+ l_wait_event(lock->l_waitq,
+ (lock->l_flags & LDLM_FL_CAN_MATCH), &lwi);
+ }
}
if (rc)
- LDLM_DEBUG(lock, "matched");
- else
- LDLM_DEBUG_NOLOCK("not matched");
+ LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
+ type == LDLM_PLAIN ? res_id->name[2] :
+ policy->l_extent.start,
+ type == LDLM_PLAIN ? res_id->name[3] :
+ policy->l_extent.end);
+ else if (!(flags & LDLM_FL_TEST_LOCK)) /* less verbose for test-only */
+ LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
+ LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
+ type, mode, res_id->name[0], res_id->name[1],
+ type == LDLM_PLAIN ? res_id->name[2] :
+ policy->l_extent.start,
+ type == LDLM_PLAIN ? res_id->name[3] :
+ policy->l_extent.end);
if (old_lock)
LDLM_LOCK_PUT(old_lock);
+ if (flags & LDLM_FL_TEST_LOCK && rc)
+ LDLM_LOCK_PUT(lock);
return rc;
}
struct ldlm_resource *res;
l_lock(&ns->ns_lock);
- while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
+ while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
struct ldlm_lock, l_export_chain);
res = ldlm_resource_getref(lock->l_resource);
"write: %d\n", (int)lock->l_req_mode, (int)lock->l_granted_mode,
atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers);
if (lock->l_resource->lr_type == LDLM_EXTENT)
- CDEBUG(level, " Extent: "LPU64" -> "LPU64"\n",
+ CDEBUG(level, " Extent: "LPU64" -> "LPU64
+ " (req "LPU64"-"LPU64")\n",
lock->l_policy_data.l_extent.start,
- lock->l_policy_data.l_extent.end);
+ lock->l_policy_data.l_extent.end,
+ lock->l_req_extent.start, lock->l_req_extent.end);
else if (lock->l_resource->lr_type == LDLM_FLOCK)
CDEBUG(level, " Pid: %d Extent: "LPU64" -> "LPU64"\n",
lock->l_policy_data.l_flock.pid,
/* LDLM state */
-static struct ldlm_state *ldlm ;
+static struct ldlm_state *ldlm_state;
inline unsigned long round_timeout(unsigned long timeout)
{
if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
ldlm_del_waiting_lock(lock);
ldlm_failed_ast(lock, rc, "completion");
+ } else if (rc == -EINVAL) {
+ LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+ "lock");
} else if (rc) {
LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
- "completion AST\n", rc, req->rq_status);
+ "completion AST", rc, req->rq_status);
ldlm_lock_cancel(lock);
/* Server-side AST functions are called from ldlm_reprocess_all,
* which needs to be told to please restart its reprocessing. */
if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
ldlm_del_waiting_lock(lock);
ldlm_failed_ast(lock, rc, "glimpse");
+ } else if (rc == -EINVAL) {
+ LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+ "lock");
} else if (rc) {
LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
- "completion AST\n", rc, req->rq_status);
- ldlm_lock_cancel(lock);
+ "glimpse AST", rc, req->rq_status);
} else {
rc = res->lr_namespace->ns_lvbo->lvbo_update(res,
req->rq_repmsg, 0);
struct obd_device *obddev = req->rq_export->exp_obd;
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
- int rc, size[2] = {sizeof(*dlm_rep)};
+ int rc = 0, size[2] = {sizeof(*dlm_rep)};
__u32 flags;
- ldlm_error_t err;
+ ldlm_error_t err = ELDLM_OK;
struct ldlm_lock *lock = NULL;
void *cookie = NULL;
ENTRY;
lustre_swab_ldlm_request);
if (dlm_req == NULL) {
CERROR ("Can't unpack dlm_req\n");
- RETURN (-EFAULT);
+ GOTO(out, rc = -EFAULT);
}
flags = dlm_req->lock_flags;
blocking_callback, completion_callback,
glimpse_callback, NULL, 0);
if (!lock)
- GOTO(out, err = -ENOMEM);
+ GOTO(out, rc = -ENOMEM);
do_gettimeofday(&lock->l_enqueued_time);
memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
rc = lustre_pack_reply(req, buffers, size, NULL);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
}
if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
sizeof(ldlm_policy_data_t));
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
+ sizeof(lock->l_req_extent));
err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
if (err)
EXIT;
out:
- if (lock != NULL && lock->l_resource->lr_lvb_len > 0) {
- void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
- lock->l_resource->lr_lvb_len);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- }
req->rq_status = err;
+ if (req->rq_reply_state == NULL) {
+ err = lustre_pack_reply(req, 0, NULL, NULL);
+ if (rc == 0)
+ rc = err;
+ }
/* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
* ldlm_reprocess_all. If this moves, revisit that code. -phil */
if (lock) {
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
- "(err=%d)", err);
+ "(err=%d, rc=%d)", err, rc);
+
+ if (lock->l_resource->lr_lvb_len > 0) {
+ void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+ lock->l_resource->lr_lvb_len);
+ memcpy(lvb, lock->l_resource->lr_lvb_data,
+ lock->l_resource->lr_lvb_len);
+ }
+
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
LDLM_LOCK_PUT(lock);
}
- LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
+ lock, rc);
- return 0;
+ return rc;
}
int ldlm_handle_convert(struct ptlrpc_request *req)
(res, NULL, 0);
//(res, req->rq_reqmsg, 1);
}
-
+
ldlm_lock_cancel(lock);
if (ldlm_del_waiting_lock(lock))
CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
req->rq_status = rc;
- rc = lustre_pack_reply(req, 0, NULL, NULL);
- if (rc)
- return rc;
+ if (req->rq_reply_state == NULL) {
+ rc = lustre_pack_reply(req, 0, NULL, NULL);
+ if (rc)
+ return rc;
+ }
return ptlrpc_reply(req);
}
#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock)
{
- struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
struct ldlm_bl_work_item *blwi;
ENTRY;
RETURN(-ENOMEM);
blwi->blwi_ns = ns;
- blwi->blwi_ld = *ld;
+ if (ld != NULL)
+ blwi->blwi_ld = *ld;
blwi->blwi_lock = lock;
spin_lock(&blp->blp_lock);
RETURN(0);
}
- if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
+ LASSERT(req->rq_export != NULL);
+ LASSERT(req->rq_export->exp_obd != NULL);
+
+ switch(req->rq_reqmsg->opc) {
+ case LDLM_BL_CALLBACK:
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
- } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
+ break;
+ case LDLM_CP_CALLBACK:
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
- } else if (req->rq_reqmsg->opc == LDLM_GL_CALLBACK) {
+ break;
+ case LDLM_GL_CALLBACK:
OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
- } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
+ break;
+ case OBD_LOG_CANCEL:
OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
- } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
- } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
- } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
- } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
- OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
- } else {
- ldlm_callback_reply(req, -EPROTO);
- RETURN(0);
- }
-
- LASSERT(req->rq_export != NULL);
- LASSERT(req->rq_export->exp_obd != NULL);
-
- /* FIXME - how to send reply */
- if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
- int rc = llog_origin_handle_cancel(req);
+ rc = llog_origin_handle_cancel(req);
ldlm_callback_reply(req, rc);
RETURN(0);
- }
- if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
- int rc = llog_origin_handle_create(req);
- req->rq_status = rc;
- ptlrpc_reply(req);
+ case LLOG_ORIGIN_HANDLE_CREATE:
+ OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ rc = llog_origin_handle_create(req);
+ ldlm_callback_reply(req, rc);
RETURN(0);
- }
- if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
- int rc = llog_origin_handle_next_block(req);
- req->rq_status = rc;
- ptlrpc_reply(req);
+ case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+ OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ rc = llog_origin_handle_next_block(req);
+ ldlm_callback_reply(req, rc);
RETURN(0);
- }
- if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
- int rc = llog_origin_handle_read_header(req);
- req->rq_status = rc;
- ptlrpc_reply(req);
+ case LLOG_ORIGIN_HANDLE_READ_HEADER:
+ OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ rc = llog_origin_handle_read_header(req);
+ ldlm_callback_reply(req, rc);
RETURN(0);
- }
- if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
- int rc = llog_origin_handle_close(req);
+ case LLOG_ORIGIN_HANDLE_CLOSE:
+ OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ rc = llog_origin_handle_close(req);
ldlm_callback_reply(req, rc);
RETURN(0);
+ default:
+ CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
+ ldlm_callback_reply(req, -EPROTO);
+ RETURN(0);
}
ns = req->rq_export->exp_obd->obd_namespace;
* cancelling right now, because it's unused, or have an intent result
* in the reply, so we might have to push the responsibility for sending
* the reply down into the AST handlers, alas. */
- if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK)
- ldlm_callback_reply(req, 0);
switch (req->rq_reqmsg->opc) {
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
#ifdef __KERNEL__
- rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock);
+ rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
ldlm_callback_reply(req, rc);
#else
rc = 0;
break;
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
+ ldlm_callback_reply(req, 0);
ldlm_handle_cp_callback(req, ns, dlm_req, lock);
break;
case LDLM_GL_CALLBACK:
#endif
ENTRY;
- if (ldlm != NULL)
+ if (ldlm_state != NULL)
RETURN(-EALREADY);
- OBD_ALLOC(ldlm, sizeof(*ldlm));
- if (ldlm == NULL)
+ OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
+ if (ldlm_state == NULL)
RETURN(-ENOMEM);
#ifdef __KERNEL__
GOTO(out_free, rc);
#endif
- ldlm->ldlm_cb_service =
+ ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir);
- if (!ldlm->ldlm_cb_service) {
+ if (!ldlm_state->ldlm_cb_service) {
CERROR("failed to start service\n");
GOTO(out_proc, rc = -ENOMEM);
}
- ldlm->ldlm_cancel_service =
- ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+ ldlm_state->ldlm_cancel_service =
+ ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_CANCEL_REQUEST_PORTAL,
LDLM_CANCEL_REPLY_PORTAL,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir);
- if (!ldlm->ldlm_cancel_service) {
+ if (!ldlm_state->ldlm_cancel_service) {
CERROR("failed to start service\n");
GOTO(out_proc, rc = -ENOMEM);
}
OBD_ALLOC(blp, sizeof(*blp));
if (blp == NULL)
GOTO(out_proc, rc = -ENOMEM);
- ldlm->ldlm_bl_pool = blp;
+ ldlm_state->ldlm_bl_pool = blp;
atomic_set(&blp->blp_num_threads, 0);
init_waitqueue_head(&blp->blp_waitq);
wait_for_completion(&blp->blp_comp);
}
- rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service,
+ rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
LDLM_NUM_THREADS, "ldlm_cn");
if (rc) {
LBUG();
GOTO(out_thread, rc);
}
- rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service,
+ rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
LDLM_NUM_THREADS, "ldlm_cb");
if (rc) {
LBUG();
#ifdef __KERNEL__
out_thread:
- ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
- ptlrpc_unregister_service(ldlm->ldlm_cb_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
#endif
out_proc:
ldlm_proc_cleanup();
out_free:
#endif
- OBD_FREE(ldlm, sizeof(*ldlm));
- ldlm = NULL;
+ OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+ ldlm_state = NULL;
return rc;
}
static int ldlm_cleanup(int force)
{
#ifdef __KERNEL__
- struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
#endif
ENTRY;
}
OBD_FREE(blp, sizeof(*blp));
- ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
- ptlrpc_unregister_service(ldlm->ldlm_cb_service);
- ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
- ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
+ ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+ ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
+ ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
ldlm_proc_cleanup();
expired_lock_thread.elt_state = ELT_TERMINATE;
#endif
- OBD_FREE(ldlm, sizeof(*ldlm));
- ldlm = NULL;
+ OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+ ldlm_state = NULL;
RETURN(0);
}
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_namespace_dump);
+EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);
int compat = 1;
ENTRY;
+ lockmode_verify(req_mode);
+
list_for_each(tmp, queue) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL)
memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &policy->l_extent,
+ sizeof(policy->l_extent));
err = ldlm_lock_enqueue(ns, &lock, policy, flags);
if (err != ELDLM_OK)
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL)
memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &policy->l_extent,
+ sizeof(policy->l_extent));
LDLM_DEBUG(lock, "client-side enqueue START");
}
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
- if (lvb_len) {
+ /* If the lock has already been granted by a completion AST, don't
+ * clobber the LVB with an older one. */
+ if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
void *tmplvb;
tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
if (tmplvb == NULL)
int ldlm_cancel_lru(struct ldlm_namespace *ns)
{
- struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+ struct list_head *tmp, *next;
int count, rc = 0;
- struct ldlm_ast_work *w;
ENTRY;
l_lock(&ns->ns_lock);
* won't see this flag and call l_blocking_ast */
lock->l_flags |= LDLM_FL_CBPENDING;
- OBD_ALLOC(w, sizeof(*w));
- LASSERT(w);
-
- w->w_lock = LDLM_LOCK_GET(lock);
- list_add(&w->w_list, &list);
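+ /* queue the cancel to a blocking thread instead of issuing the
+ * cancel RPC synchronously here */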
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
ldlm_lock_remove_from_lru(lock);
+ ldlm_bl_to_thread(ns, NULL, lock);
if (--count == 0)
break;
}
l_unlock(&ns->ns_lock);
-
- list_for_each_safe(tmp, next, &list) {
- struct lustre_handle lockh;
- int rc;
- w = list_entry(tmp, struct ldlm_ast_work, w_list);
-
- ldlm_lock2handle(w->w_lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc != ELDLM_OK)
- CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc);
-
- list_del(&w->w_list);
- LDLM_LOCK_PUT(w->w_lock);
- OBD_FREE(w, sizeof(*w));
- }
-
RETURN(rc);
}
struct ptlrpc_request *req;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int size;
+ int buffers = 1;
+ int size[2];
int flags;
/*
else
flags = LDLM_FL_REPLAY;
- size = sizeof(*body);
- req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
+ size[0] = sizeof(*body);
+ req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, size, NULL);
if (!req)
RETURN(-ENOMEM);
body->lock_flags = flags;
ldlm_lock2handle(lock, &body->lock_handle1);
- size = sizeof(*reply);
- req->rq_replen = lustre_msg_size(1, &size);
+ size[0] = sizeof(*reply);
+ if (lock->l_lvb_len != 0) {
+ buffers = 2;
+ size[1] = lock->l_lvb_len;
+ }
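+ /* reserve a second reply buffer so the server can return the lock
+ * value block along with the replayed lock */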
+ req->rq_replen = lustre_msg_size(buffers, size);
LDLM_DEBUG(lock, "replaying lock:");
{
struct list_head *bucket, *tmp;
struct ldlm_resource *res = NULL;
- int rc;
ENTRY;
LASSERT(ns != NULL);
l_unlock(&ns->ns_lock);
if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
- rc = ns->ns_lvbo->lvbo_init(res);
- if (rc) {
- CERROR("lvbo_init failure %d\n", rc);
- LASSERT(ldlm_resource_putref(res) == 1);
- res = NULL;
- }
+ int rc = ns->ns_lvbo->lvbo_init(res);
+ if (rc)
+ CERROR("lvbo_init failed for resource "LPU64": rc %d\n",
+ name.name[0], rc);
}
RETURN(res);
size_lock.end = OBD_OBJECT_EOF;
/* XXX I bet we should be checking the lock ignore flags.. */
+ /* FIXME use LDLM_FL_TEST_LOCK instead */
flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
sizeof(size_lock), LCK_PR, &flags, inode,
&match_lockh);
- /* hey, alright, we hold a size lock that covers the size we
+ /* hey, alright, we hold a size lock that covers the size we
* just found, it's not going to change for a while. */
if (matched == 1) {
set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
obd_cancel(exp, lsm, LCK_PR, &match_lockh);
- }
+ }
RETURN(0);
}
CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
+ /* FIXME use LDLM_FL_TEST_LOCK instead */
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
NULL, LCK_PR, &lockh)) {
static int ll_dir_readpage(struct file *file, struct page *page)
{
struct inode *inode = page->mapping->host;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_fid mdc_fid;
__u64 offset;
- int rc = 0;
struct ptlrpc_request *request;
- struct lustre_handle lockh;
struct mds_body *body;
- struct lookup_intent it = { .it_op = IT_READDIR };
- struct mdc_op_data data;
- struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp);
- struct ldlm_res_id res_id =
- { .name = {inode->i_ino, (__u64)inode->i_generation} };
+ int rc = 0;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
- if ((inode->i_size + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT <= page->index){
- /* XXX why do we need this exactly, and why do we think that
- * an all-zero directory page is useful?
- */
- CERROR("memsetting dir page %lu to zero (size %lld)\n",
- page->index, inode->i_size);
- memset(kmap(page), 0, PAGE_CACHE_SIZE);
- kunmap(page);
- GOTO(readpage_out, rc);
- }
-
- rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
- &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
- if (!rc) {
- ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
-
- rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
- &data, &lockh, NULL, 0,
- ldlm_completion_ast, ll_mdc_blocking_ast,
- inode);
- request = (struct ptlrpc_request *)it.d.lustre.it_data;
- if (request)
- ptlrpc_req_finished(request);
- if (rc < 0) {
- CERROR("lock enqueue: err: %d\n", rc);
- unlock_page(page);
- RETURN(rc);
- }
- }
- ldlm_lock_dump_handle(D_OTHER, &lockh);
-
- if (PageUptodate(page)) {
- CERROR("Explain this please?\n");
- GOTO(readpage_out, rc);
- }
mdc_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
offset = page->index << PAGE_SHIFT;
- rc = mdc_readpage(sbi->ll_mdc_exp, &mdc_fid,
+ rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &mdc_fid,
offset, page, &request);
if (!rc) {
body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body));
LASSERT_REPSWABBED (request, 0); /* swabbed by mdc_readpage() */
inode->i_size = body->size;
+ SetPageUptodate(page);
}
ptlrpc_req_finished(request);
- EXIT;
-
- readpage_out:
- if (!rc)
- SetPageUptodate(page);
unlock_page(page);
- ldlm_lock_decref(&lockh, LCK_PR);
+ EXIT;
return rc;
}
static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
{
+ struct ldlm_res_id res_id =
+ { .name = { dir->i_ino, (__u64)dir->i_generation} };
+ struct lustre_handle lockh;
+ struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
struct address_space *mapping = dir->i_mapping;
- struct page *page = read_cache_page(mapping, n,
- (filler_t*)mapping->a_ops->readpage, NULL);
+ struct page *page;
+ int rc;
+
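+ /* make sure we hold a PR lock on the directory before reading pages
+ * into its cache; enqueue one if nothing compatible is granted */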
+ rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+ &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+ if (!rc) {
+ struct lookup_intent it = { .it_op = IT_READDIR };
+ struct ptlrpc_request *request;
+ struct mdc_op_data data;
+
+ ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0);
+
+ rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
+ LCK_PR, &data, &lockh, NULL, 0,
+ ldlm_completion_ast, ll_mdc_blocking_ast, dir);
+
+ request = (struct ptlrpc_request *)it.d.lustre.it_data;
+ if (request)
+ ptlrpc_req_finished(request);
+ if (rc < 0) {
+ CERROR("lock enqueue: rc: %d\n", rc);
+ return ERR_PTR(rc);
+ }
+ }
+ ldlm_lock_dump_handle(D_OTHER, &lockh);
+
+ page = read_cache_page(mapping, n,
+ (filler_t*)mapping->a_ops->readpage, NULL);
if (!IS_ERR(page)) {
wait_on_page(page);
(void)kmap(page);
if (PageError(page))
goto fail;
}
+
+out_unlock:
+ ldlm_lock_decref(&lockh, LCK_PR);
return page;
fail:
ext2_put_page(page);
- return ERR_PTR(-EIO);
+ page = ERR_PTR(-EIO);
+ goto out_unlock;
}
-
/*
* p is at least 6 bytes before the end of page
*/
int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
{
- loff_t pos = filp->f_pos;
struct inode *inode = filp->f_dentry->d_inode;
+ loff_t pos = filp->f_pos;
// XXX struct super_block *sb = inode->i_sb;
unsigned offset = pos & ~PAGE_CACHE_MASK;
unsigned long n = pos >> PAGE_CACHE_SHIFT;
unsigned chunk_mask = ~(ext2_chunk_size(inode)-1);
unsigned char *types = NULL;
int need_revalidate = (filp->f_version != inode->i_version);
+ int rc = 0;
ENTRY;
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
- inode->i_generation, inode);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n",
+ inode->i_ino, inode->i_generation, inode, pos, inode->i_size);
+
if (pos > inode->i_size - EXT2_DIR_REC_LEN(1))
- GOTO(done, 0);
+ RETURN(0);
types = ext2_filetype_table;
ext2_dirent *de;
struct page *page;
- CDEBUG(D_EXT2, "reading %lu of dir %lu page %lu, size %llu\n",
- PAGE_CACHE_SIZE, inode->i_ino, n, inode->i_size);
+ CDEBUG(D_EXT2, "read %lu of dir %lu/%u page %lu/%lu size %llu\n",
+ PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation,
+ n, npages, inode->i_size);
page = ll_get_dir_page(inode, n);
/* size might have been updated by mdc_readpage */
npages = dir_pages(inode);
- if (IS_ERR(page))
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
+ CERROR("error reading dir %lu/%u page %lu: rc %d\n",
+ inode->i_ino, inode->i_generation, n, rc);
continue;
+ }
+
kaddr = page_address(page);
if (need_revalidate) {
offset = ext2_validate_entry(kaddr, offset, chunk_mask);
int over;
unsigned char d_type = DT_UNKNOWN;
+ rc = 0; /* no error if we return something */
if (types && de->file_type < EXT2_FT_MAX)
d_type = types[de->file_type];
le32_to_cpu(de->inode), d_type);
if (over) {
ext2_put_page(page);
- GOTO(done,0);
+ GOTO(done, rc);
}
}
}
filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset;
filp->f_version = inode->i_version;
update_atime(inode);
- RETURN(0);
+ RETURN(rc);
}
static int ll_dir_ioctl(struct inode *inode, struct file *file,
*
* No one can dirty the extent until we've finished our work and they can
* enqueue another lock. The DLM protects us from ll_file_read/write here,
- * but other kernel actors could have pages locked. */
+ * but other kernel actors could have pages locked.
+ *
+ * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
struct ldlm_lock *lock, __u32 stripe)
{
- struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
+ ldlm_policy_data_t tmpex;
unsigned long start, end, count, skip, i, j;
struct page *page;
- int rc, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
+ int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
+ struct lustre_handle lockh;
ENTRY;
- CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n",
- inode->i_ino, inode, extent->start, extent->end, inode->i_size);
+ memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
+ CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
+ inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
+ inode->i_size);
/* our locks are page granular thanks to osc_enqueue, we invalidate the
* whole page. */
- LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0);
- LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0);
+ LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
+ LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
- start = extent->start >> PAGE_CACHE_SHIFT;
count = ~0;
skip = 0;
- end = (extent->end >> PAGE_CACHE_SHIFT) + 1;
- if ((end << PAGE_CACHE_SHIFT) < extent->end)
- end = ~0;
+ start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
+ end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
if (lsm->lsm_stripe_count > 1) {
count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
skip = (lsm->lsm_stripe_count - 1) * count;
- start += (start/count * skip) + (stripe * count);
+ start += start/count * skip + stripe * count;
if (end != ~0)
- end += (end/count * skip) + (stripe * count);
- }
+ end += end/count * skip + stripe * count;
+ }
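+ /* the stripe arithmetic above can overflow; if it wrapped, just
+ * walk to the end */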
+ if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
+ end = ~0;
i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT;
if (i < end)
end = i;
- CDEBUG(D_INODE, "walking page indices start: %lu j: %lu count: %lu "
- "skip: %lu end: %lu%s\n", start, start % count, count, skip, end,
- discard ? " (DISCARDING)" : "");
+ CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
+ "count: %lu skip: %lu end: %lu%s\n", start, start % count,
+ count, skip, end, discard ? " (DISCARDING)" : "");
/* this is the simplistic implementation of page eviction at
* cancelation. It is careful to get races with other page
* lockers handled correctly. fixes from bug 20 will make it
* more efficient by associating locks with pages and with
* batching writeback under the lock explicitly. */
- for (i = start, j = start % count ; ; j++, i++) {
- if (j == count) {
- i += skip;
- j = 0;
- }
- if (i >= end)
- break;
+ for (i = start, j = start % count ; i <= end;
+ j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
+ LASSERTF(tmpex.l_extent.start < lock->l_policy_data.l_extent.end,
+ LPU64" >= "LPU64" start %lu i %lu end %lu\n",
+ tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
+ start, i, end);
ll_pgcache_lock(inode->i_mapping);
if (list_empty(&inode->i_mapping->dirty_pages) &&
list_empty(&inode->i_mapping->clean_pages) &&
list_empty(&inode->i_mapping->locked_pages)) {
- CDEBUG(D_INODE, "nothing left\n");
+ CDEBUG(D_INODE|D_PAGE, "nothing left\n");
ll_pgcache_unlock(inode->i_mapping);
break;
}
page = find_get_page(inode->i_mapping, i);
if (page == NULL)
- continue;
- LL_CDEBUG_PAGE(page, "locking page\n");
+ goto next_index;
+ LL_CDEBUG_PAGE(D_PAGE, page, "locking page\n");
lock_page(page);
/* page->mapping to check with racing against teardown */
if (page->mapping && PageDirty(page) && !discard) {
ClearPageDirty(page);
- LL_CDEBUG_PAGE(page, "found dirty\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "found dirty\n");
ll_pgcache_lock(inode->i_mapping);
list_del(&page->list);
list_add(&page->list, &inode->i_mapping->locked_pages);
lock_page(page);
}
- /* checking again to account for writeback's lock_page() */
- if (page->mapping != NULL) {
- LL_CDEBUG_PAGE(page, "truncating\n");
+ tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
+ /* check to see if another DLM lock covers this page */
+ rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
+ LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
+ LDLM_FL_TEST_LOCK,
+ &lock->l_resource->lr_name, LDLM_EXTENT,
+ &tmpex, LCK_PR | LCK_PW, &lockh);
+ if (rc2 == 0 && page->mapping != NULL) {
+ /* checking again to account for writeback's lock_page() */
+ LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
ll_truncate_complete_page(page);
}
unlock_page(page);
page_cache_release(page);
+
+ next_index:
+ if (j == count) {
+ i += skip;
+ j = 0;
+ }
}
EXIT;
}
CERROR("ldlm_cli_cancel failed: %d\n", rc);
break;
case LDLM_CB_CANCELING: {
- struct inode *inode = ll_inode_from_lock(lock);
+ struct inode *inode;
struct ll_inode_info *lli;
struct lov_stripe_md *lsm;
__u32 stripe;
__u64 kms;
+ /* This lock wasn't granted, don't try to evict pages */
+ if (lock->l_req_mode != lock->l_granted_mode)
+ RETURN(0);
+
+ inode = ll_inode_from_lock(lock);
if (inode == NULL)
RETURN(0);
lli = ll_i2info(inode);
LBUG();
}
- LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
+ LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> size "LPU64,
inode->i_size, data.stripe_number, data.size);
rc = lustre_pack_reply(req, 1, &size, NULL);
LCK_PR, &flags, ll_extent_lock_callback,
ldlm_completion_ast, ll_glimpse_callback, inode,
sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
- if (rc > 0)
+ if (rc > 0) {
+ CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
RETURN(-EIO);
+ }
lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
//inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
inode->i_size = kms;
}
- CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, i_size "
- LPU64"\n", inode->i_ino, count, *ppos, inode->i_size);
+ CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
+ inode->i_ino, count, *ppos, inode->i_size);
/* turn off the kernel's read-ahead */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
+ /* FIXME use LDLM_FL_TEST_LOCK instead */
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
NULL, LCK_PR, &lockh)) {
struct page *llap_page;
struct list_head llap_pending_write;
/* only trust these if the page lock is providing exclusion */
- int llap_write_queued:1,
+ int llap_write_queued:1,
llap_defer_uptodate:1;
struct list_head llap_proc_item;
};
-#define LL_CDEBUG_PAGE(page, STR) \
- CDEBUG(D_PAGE, "page %p map %p ind %lu priv %0lx: " STR, \
- page, page->mapping, page->index, page->private)
+#define LL_CDEBUG_PAGE(mask, page, fmt, arg...) \
+ CDEBUG(mask, "page %p map %p ind %lu priv %0lx: " fmt, \
+ page, page->mapping, page->index, page->private, ## arg)
/* llite/lproc_llite.c */
int lprocfs_register_mountpoint(struct proc_dir_entry *parent,
#define LL_SBI_NOLCK 0x1
#define LL_SBI_READAHEAD 0x2
+#define LL_MAX_BLKSIZE (4UL * 1024 * 1024) /* cap i_blksize at 4MB */
+
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
#define ll_s2sbi(sb) ((struct ll_sb_info *)((sb)->s_fs_info))
void __d_rehash(struct dentry * entry, int lock);
GOTO(out_root, err);
}
+ /* bug 2805 - set VM readahead to zero */
+ vm_max_readahead = vm_min_readahead = 0;
sb->s_root = d_alloc_root(root);
RETURN(err);
int err;
ENTRY;
+ if (lmd_bad_magic(lmd))
+ RETURN(-EINVAL);
+
generate_random_uuid(uuid);
class_uuid_unparse(uuid, &mdc_uuid);
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
- if (lmd == NULL) {
- CERROR("lustre_mount_data is NULL: check that /sbin/mount.lustre exists?\n");
+ if (lmd_bad_magic(lmd))
RETURN(-EINVAL);
- }
+
sbi = lustre_init_sbi(sb);
if (!sbi)
RETURN(-ENOMEM);
LBUG();
}
}
+ /* bug 2844 - limit i_blksize for broken user-space apps */
+ LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize);
+ inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE);
if (lli->lli_smd != lsm)
obd_free_memmd(ll_i2obdexp(inode), &lsm);
}
#include <linux/lustre_lite.h>
#include <linux/lprocfs_status.h>
#include <linux/seq_file.h>
+#include <linux/obd_support.h>
#include "llite_internal.h"
struct ll_sb_info *sbi = dp->data;
int rc;
- llap = kmalloc(sizeof(*llap), GFP_KERNEL);
+ OBD_ALLOC_GFP(llap, sizeof(*llap), GFP_KERNEL);
if (llap == NULL)
return -ENOMEM;
llap->llap_page = NULL;
llap->llap_cookie = sbi;
llap->llap_magic = 0;
-
+
rc = seq_open(file, &llite_dump_pgcache_seq_sops);
if (rc) {
- kfree(llap);
+ OBD_FREE(llap, sizeof(*llap));
return rc;
}
seq = file->private_data;
return 0;
}
-static int llite_dump_pgcache_seq_release(struct inode *inode,
+static int llite_dump_pgcache_seq_release(struct inode *inode,
struct file *file)
{
struct seq_file *seq = file->private_data;
if (!list_empty(&llap->llap_proc_item))
list_del_init(&llap->llap_proc_item);
spin_unlock(&sbi->ll_pglist_lock);
- kfree(llap);
+ OBD_FREE(llap, sizeof(*llap));
return seq_release(inode, file);
}
struct file_operations llite_dump_pgcache_fops = {
.open = llite_dump_pgcache_seq_open,
.read = seq_read,
- .release = llite_dump_pgcache_seq_release,
+ .release = llite_dump_pgcache_seq_release,
};
#endif /* LPROCFS */
if (TryLockPage(page))
RETURN(-EAGAIN);
- LL_CDEBUG_PAGE(page, "made ready\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "made ready\n");
page_cache_get(page);
/* if we left PageDirty we might get another writepage call
oig_release(oig);
GOTO(out, rc);
}
- LL_CDEBUG_PAGE(page, "write queued\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "write queued\n");
//llap_write_pending(inode, llap);
} else {
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
return;
}
- LL_CDEBUG_PAGE(page, "being evicted\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
exp = ll_i2obdexp(inode);
if (exp == NULL) {
page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
page_extent.l_extent.end =
page_extent.l_extent.start + PAGE_CACHE_SIZE - 1;
- flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
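+ /* TEST_LOCK only checks for a granted lock and takes no reference,
+ * so the obd_cancel that used to balance obd_match is not needed */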
+ flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
ll_i2info(inode)->lli_smd, LDLM_EXTENT,
- &page_extent, LCK_PR, &flags, inode, &match_lockh);
- if (matches < 0) {
- LL_CDEBUG_PAGE(page, "lock match failed\n");
- RETURN(matches);
- }
- if (matches) {
- obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
- }
+ &page_extent, LCK_PR | LCK_PW, &flags, inode,
+ &match_lockh);
+ if (matches < 0)
+ LL_CDEBUG_PAGE(D_ERROR, page, "lock match failed: rc %d\n",
+ matches);
RETURN(matches);
}
NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
if (rc) {
- LL_CDEBUG_PAGE(page, "read queueing failed\n");
+ LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
page_cache_release(page);
}
RETURN(rc);
}
#define LL_RA_MIN(inode) ((unsigned long)PTL_MD_MAX_PAGES / 2)
-#define LL_RA_MAX(inode) (inode->i_blksize * 3)
+#define LL_RA_MAX(inode) ((ll_i2info(inode)->lli_smd->lsm_xfersize * 3) >> \
+ PAGE_CACHE_SHIFT)
static void ll_readahead(struct ll_readahead_state *ras,
struct obd_export *exp, struct address_space *mapping,
rc = ll_issue_page_read(exp, llap, oig, 1);
if (rc == 0)
- LL_CDEBUG_PAGE(page, "started read-ahead\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "started read-ahead\n");
if (rc) {
next_page:
- LL_CDEBUG_PAGE(page, "skipping read-ahead\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "skipping read-ahead\n");
unlock_page(page);
}
ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL,
oig);
- LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
SetPageUptodate(page);
unlock_page(page);
GOTO(out_oig, rc = 0);
if (rc == 0) {
static unsigned long next_print;
- CDEBUG(D_INODE, "didn't match a lock\n");
+ CDEBUG(D_INODE, "ino %lu page %lu (%llu) didn't match a lock\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
if (time_after(jiffies, next_print)) {
+ CERROR("ino %lu page %lu (%llu) not covered by "
+ "a lock (mmap?). check debug logs.\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
+ ldlm_dump_all_namespaces();
+ if (next_print == 0) {
+ CERROR("%s\n", portals_debug_dumpstack());
+ portals_debug_dumplog();
+ }
next_print = jiffies + 30 * HZ;
- CERROR("not covered by a lock (mmap?). check debug "
- "logs.\n");
}
}
if (rc)
GOTO(out, rc);
- LL_CDEBUG_PAGE(page, "queued readpage\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
if (IS_ERR(llap))
RETURN(PTR_ERR(llap));
- LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "setting ready|urgent\n");
rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
NULL, llap->llap_cookie,
} else {
llap->llap_write_queued = 0;
}
- } else {
+ } else {
SetPageError(page);
}
- LL_CDEBUG_PAGE(page, "io complete, unlocking\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "io complete, unlocking\n");
unlock_page(page);
page_cache_get(page);
if (llap->llap_write_queued) {
- LL_CDEBUG_PAGE(page, "marking urgent\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
llap->llap_cookie,
ASYNC_READY | ASYNC_URGENT);
llap->llap_cookie, OBD_BRW_WRITE, 0, 0,
0, ASYNC_READY | ASYNC_URGENT);
if (rc == 0)
- LL_CDEBUG_PAGE(page, "mmap write queued\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "mmap write queued\n");
else
llap->llap_write_queued = 0;
}
} else {
llap->llap_write_queued = 0;
}
- } else {
+ } else {
SetPageError(page);
}
- LL_CDEBUG_PAGE(page, "io complete, unlocking\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "io complete, unlocking\n");
unlock_page(page);
page_cache_get(page);
if (llap->llap_write_queued) {
- LL_CDEBUG_PAGE(page, "marking urgent\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
llap->llap_cookie,
ASYNC_READY | ASYNC_URGENT);
llap->llap_cookie, OBD_BRW_WRITE, 0, 0,
0, ASYNC_READY | ASYNC_URGENT);
if (rc == 0)
- LL_CDEBUG_PAGE(page, "mmap write queued\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "mmap write queued\n");
else
llap->llap_write_queued = 0;
}
/* lov_log.c */
int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid);
+ int count, struct llog_catid *logid);
int lov_llog_finish(struct obd_device *obd, int count);
/* lov_pack.c */
struct lov_mds_md *lmm, int lmm_bytes);
int lov_setstripe(struct obd_export *exp,
struct lov_stripe_md **lsmp, struct lov_user_md *lump);
-int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp,
+int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp,
struct lov_user_md *lump);
int lov_getstripe(struct obd_export *exp,
struct lov_stripe_md *lsm, struct lov_user_md *lump);
static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
- struct llog_gen *gen)
+ struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct obd_device *obd = ctxt->loc_obd;
struct lov_obd *lov = &obd->u.lov;
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
- rc = llog_connect(cctxt, 1, logid, gen);
+
+ if (uuid && !obd_uuid_equals(uuid, &lov->tgts[i].uuid))
+ continue;
+
+ rc = llog_connect(cctxt, 1, logid, gen, uuid);
if (rc) {
CERROR("error osc_llog_connect %d\n", i);
break;
int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
struct lov_obd *lov = &obd->u.lov;
int i, rc = 0;
ENTRY;
-
+
rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, 0, NULL,
&lov_unlink_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
&lov_size_repl_logops);
if (rc)
RETURN(rc);
break;
}
if (rc == 1) {
- if (lsm->lsm_stripe_count > 1)
+ if (lsm->lsm_stripe_count > 1) {
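+ /* a TEST_LOCK match takes no lock references, so destroy the
+ * striped lock handle instead of keeping it for a later cancel */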
+ if (*flags & LDLM_FL_TEST_LOCK)
+ lov_llh_destroy(lov_lockh);
lov_llh_put(lov_lockh);
+ }
RETURN(1);
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
int er;
- if (!lov->tgts[i].active)
+ if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
+ continue;
+
+ if (!val && !lov->tgts[i].active)
continue;
er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen,
(*lsmp)->lsm_magic = LOV_MAGIC;
(*lsmp)->lsm_stripe_count = stripe_count;
(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES * stripe_count;
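+ /* default preferred I/O size: PTL_MTU per stripe; llite later caps
+ * this at LL_MAX_BLKSIZE when filling in i_blksize */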
+ (*lsmp)->lsm_xfersize = PTL_MTU * stripe_count;
(*lsmp)->lsm_pattern = pattern;
(*lsmp)->lsm_oinfo[0].loi_ost_idx = ~0;
lsm->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
/* lsm->lsm_object_gr = 0; implicit */
lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
+ lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count;
lsm->lsm_pattern = LOV_PATTERN_RAID0;
ost_offset = le32_to_cpu(lmm->lmm_stripe_offset);
ost_count = le16_to_cpu(lmm->lmm_ost_count);
lsm->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern);
+ lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count;
for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++) {
/* XXX LOV STACKING call down to osc_unpackmd() */
(*lsmp)->lsm_oinfo[0].loi_ost_idx = lum.lmm_stripe_offset;
(*lsmp)->lsm_stripe_size = lum.lmm_stripe_size;
+ (*lsmp)->lsm_xfersize = lum.lmm_stripe_size * stripe_count;
RETURN(0);
}
#include <linux/lvfs.h>
-struct dentry *lvfs_fid2dentry(struct obd_run_ctxt *ctxt, __u64 id, __u32 gen, __u64 gr,
- void *data)
+struct dentry *lvfs_fid2dentry(struct obd_run_ctxt *ctxt, __u64 id,
+ __u32 gen, __u64 gr, void *data)
{
return ctxt->cb_ops.l_fid2dentry(id, gen, gr, data);
}
EXPORT_SYMBOL(pop_ctxt);
/* utility to make a file */
-struct dentry *simple_mknod(struct dentry *dir, char *name, int mode)
+struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix)
{
struct dentry *dchild;
int err = 0;
GOTO(out_err, err = -EEXIST);
/* Fixup file permissions if necessary */
- if ((old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
+ if (fix && (old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
CWARN("fixing permissions on %s from %o to %o\n",
name, old_mode, mode);
dchild->d_inode->i_mode = (mode & S_IALLUGO) |
EXPORT_SYMBOL(simple_mknod);
/* utility to make a directory */
-struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode)
+struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode, int fix)
{
struct dentry *dchild;
int err = 0;
GOTO(out_err, err = -ENOTDIR);
/* Fixup directory permissions if necessary */
- if ((old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
+ if (fix && (old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
CWARN("fixing permissions on %s from %o to %o\n",
name, old_mode, mode);
dchild->d_inode->i_mode = (mode & S_IALLUGO) |
{
union ptlrpc_async_args *aa = data;
struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
-
- mdc_put_rpc_lock(rpc_lock, NULL);
+ struct obd_device *obd = aa->pointer_arg[1];
+
+ if (rpc_lock == NULL) {
+ CERROR("called with NULL rpc_lock\n");
+ } else {
+ mdc_put_rpc_lock(rpc_lock, NULL);
+ LASSERTF(req->rq_async_args.pointer_arg[0] ==
+ obd->u.cli.cl_rpc_lock, "%p != %p\n",
+ req->rq_async_args.pointer_arg[0],
+ obd->u.cli.cl_rpc_lock);
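+ /* clear the cookie so mdc_close() can tell the rpc_lock was
+ * dropped */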
+ aa->pointer_arg[0] = NULL;
+ }
wake_up(&req->rq_reply_waitq);
RETURN(rc);
}
-/* We can't use ptlrpc_check_reply, because we don't want to wake up for
+/* We can't use ptlrpc_check_reply, because we don't want to wake up for
* anything but a reply or an error. */
static int mdc_close_check_reply(struct ptlrpc_request *req)
{
struct ptlrpc_request *req;
struct mdc_open_data *mod;
struct l_wait_info lwi;
- struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock;
ENTRY;
req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
/* We hand a ref to the rpcd here, so we need another one of our own. */
ptlrpc_request_addref(req);
- mdc_get_rpc_lock(rpc_lock, NULL);
+ mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
req->rq_interpret_reply = mdc_close_interpret;
- req->rq_async_args.pointer_arg[0] = rpc_lock;
+ req->rq_async_args.pointer_arg[0] = obd->u.cli.cl_rpc_lock;
+ req->rq_async_args.pointer_arg[1] = obd;
ptlrpcd_add_req(req);
lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep,
NULL, NULL);
"close succeeded. Please tell CFS.\n");
}
}
+ if (req->rq_async_args.pointer_arg[0] != NULL) {
+ CERROR("returned without dropping rpc_lock: rc %d\n", rc);
+ mdc_close_interpret(req, &req->rq_async_args, rc);
+ portals_debug_dumplog();
+ }
EXIT;
out:
return lprocfs_obd_detach(dev);
}
+static int mdc_import_event(struct obd_device *obd,
+ struct obd_import *imp,
+ enum obd_import_event event)
+{
+ int rc = 0;
+
+ LASSERT(imp->imp_obd == obd);
+
+ switch (event) {
+ case IMP_EVENT_DISCON: {
+ break;
+ }
+ case IMP_EVENT_INVALIDATE: {
+ struct ldlm_namespace *ns = obd->obd_namespace;
+
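+ /* the import is being invalidated: drop this client's locks
+ * locally instead of sending cancels to an unreachable server */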
+ ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
+
+ if (obd->obd_observer)
+ rc = obd_notify(obd->obd_observer, obd, 0);
+ break;
+ }
+ case IMP_EVENT_ACTIVE: {
+ if (obd->obd_observer)
+ rc = obd_notify(obd->obd_observer, obd, 1);
+ break;
+ }
+ default:
+ CERROR("Unknown import event %d\n", event);
+ LBUG();
+ }
+ RETURN(rc);
+}
+
static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
{
struct client_obd *cli = &obd->u.cli;
static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
struct llog_ctxt *ctxt;
int rc;
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
- ctxt->loc_imp = obd->u.cli.cl_import;
+ ctxt->loc_imp = obd->u.cli.cl_import;
}
RETURN(rc);
o_statfs: mdc_statfs,
o_pin: mdc_pin,
o_unpin: mdc_unpin,
+ o_import_event: mdc_import_event,
o_llog_init: mdc_llog_init,
o_llog_finish: mdc_llog_finish,
};
RETURN(rc);
}
-static int mds_disconnect(struct obd_export *export, int flags)
+static int mds_disconnect(struct obd_export *exp, int flags)
{
unsigned long irqflags;
int rc;
ENTRY;
- ldlm_cancel_locks_for_export(export);
+ LASSERT(exp);
+ class_export_get(exp);
+
+ spin_lock_irqsave(&exp->exp_lock, irqflags);
+ exp->exp_flags = flags;
+ spin_unlock_irqrestore(&exp->exp_lock, irqflags);
+
+ /* Disconnect early so that clients can't keep using export */
+ rc = class_disconnect(exp, flags);
+ ldlm_cancel_locks_for_export(exp);
/* complete all outstanding replies */
- spin_lock_irqsave (&export->exp_lock, irqflags);
- while (!list_empty (&export->exp_outstanding_replies)) {
+ spin_lock_irqsave(&exp->exp_lock, irqflags);
+ while (!list_empty(&exp->exp_outstanding_replies)) {
struct ptlrpc_reply_state *rs =
- list_entry (export->exp_outstanding_replies.next,
- struct ptlrpc_reply_state, rs_exp_list);
+ list_entry(exp->exp_outstanding_replies.next,
+ struct ptlrpc_reply_state, rs_exp_list);
struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
- spin_lock (&svc->srv_lock);
- list_del_init (&rs->rs_exp_list);
- ptlrpc_schedule_difficult_reply (rs);
- spin_unlock (&svc->srv_lock);
+ spin_lock(&svc->srv_lock);
+ list_del_init(&rs->rs_exp_list);
+ ptlrpc_schedule_difficult_reply(rs);
+ spin_unlock(&svc->srv_lock);
}
- spin_unlock_irqrestore (&export->exp_lock, irqflags);
+ spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- spin_lock_irqsave(&export->exp_lock, irqflags);
- export->exp_flags = flags;
- spin_unlock_irqrestore(&export->exp_lock, irqflags);
-
- rc = class_disconnect(export, flags);
+ class_export_put(exp);
RETURN(rc);
}
LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
- obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
+ obd->u.mds.mds_lov_desc.ld_tgt_count,
+ NULL, NULL, NULL);
if (rc != 0) {
CERROR("faild at llog_origin_connect: %d\n", rc);
}
/* setup the directory tree */
push_ctxt(&saved, &obd->obd_ctxt, NULL);
- dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
+ dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
CERROR("cannot create ROOT directory: rc = %d\n", rc);
}
mds->mds_fid_de = dentry;
- dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
+ dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777, 1);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
CERROR("cannot create PENDING directory: rc = %d\n", rc);
}
mds->mds_pending_dir = dentry;
- dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777);
+ dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
CERROR("cannot create LOGS directory: rc = %d\n", rc);
}
mds->mds_logs_dir = dentry;
- dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777);
+ dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
/* mds/mds_log.c */
-int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
+int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
struct lov_mds_md *lmm, int lmm_size,
struct llog_cookie *logcookies, int cookies_size);
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count,
- struct llog_logid *logid);
+int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count,
+ struct llog_catid *logid);
int mds_llog_finish(struct obd_device *obd, int count);
/* mds/mds_lov.c */
static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
- struct llog_gen *gen)
+ struct llog_gen *gen,
+ struct obd_uuid *uuid)
{
struct obd_device *obd = ctxt->loc_obd;
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
ENTRY;
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
- rc = llog_connect(lctxt, count, logid, gen);
+ rc = llog_connect(lctxt, count, logid, gen, uuid);
RETURN(rc);
}
};
int mds_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
int rc;
if (!obd->obd_recovering) {
rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
- NULL);
+ NULL, NULL);
if (rc != 0)
CERROR("faild at llog_origin_connect: %d\n", rc);
{
struct obd_uuid *uuid;
int rc = 0;
+ ENTRY;
if (!active)
RETURN(0);
CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
obd->obd_name, uuid->uuid);
} else {
+ LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
+
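+ /* mark the connection to this OST as an MDS connection, then
+ * re-establish the unlink llog connection for just this target
+ * before clearing its orphans */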
+ rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), "mds_conn",
+ 0, uuid);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
+ obd->u.mds.mds_lov_desc.ld_tgt_count,
+ NULL, NULL, uuid);
+ if (rc != 0) {
+ CERROR("faild at llog_origin_connect: %d\n", rc);
+ RETURN(rc);
+ }
+
CWARN("MDS %s: %s now active, resetting orphans\n",
obd->obd_name, uuid->uuid);
rc = mds_lov_clearorphans(&obd->u.mds, uuid);
/* The following are visible and mutable through /proc/sys/lustre/. */
unsigned int obd_fail_loc;
unsigned int obd_timeout = 100;
-char obd_lustre_upcall[128] = "/usr/lib/lustre/lustre_upcall";
+char obd_lustre_upcall[128] = "DEFAULT"; /* or NONE or /full/path/to/upcall */
unsigned int obd_sync_filter; /* = 0, don't sync by default */
#ifdef __KERNEL__
return obd->u.cli.cl_import;
}
-
/* Export management functions */
static void export_handle_addref(void *export)
{
if (export == NULL) {
fixme();
- CDEBUG(D_IOCTL, "disconnect: attempting to free "
- "null export %p\n", export);
+ CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
RETURN(-EINVAL);
}
if (!loghandle->lgh_hdr)
goto out;
- if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
list_del_init(&loghandle->u.phd.phd_entry);
- if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_CAT)
+ if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
LASSERT(list_empty(&loghandle->u.chd.chd_head));
OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
RETURN(-EINVAL);
}
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) - 1);
+ llh->llh_count--;
- if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1) &&
+ if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+ (llh->llh_count == 1) &&
(loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
rc = llog_destroy(loghandle);
if (rc)
RETURN(-ENOMEM);
handle->lgh_hdr = llh;
/* first assign flags to use llog_client_ops */
- llh->llh_flags = cpu_to_le32(flags);
+ llh->llh_flags = flags;
rc = llog_read_header(handle);
if (rc == 0) {
- flags = le32_to_cpu(llh->llh_flags);
+ flags = llh->llh_flags;
if (uuid)
LASSERT(obd_uuid_equals(uuid, &llh->llh_tgtuuid));
GOTO(out, rc);
rc = 0;
handle->lgh_last_idx = 0; /* header is record with index 0 */
- llh->llh_count = cpu_to_le32(1); /* for the header record */
- llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC);
- llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len =
- cpu_to_le32(LLOG_CHUNK_SIZE);
+ llh->llh_count = 1; /* for the header record */
+ llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
+ llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
- llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME));
+ llh->llh_timestamp = LTIME_S(CURRENT_TIME);
if (uuid)
memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
- llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap));
+ llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
ext2_set_bit(0, llh->llh_bitmap);
out:
if (flags & LLOG_F_IS_CAT) {
INIT_LIST_HEAD(&handle->u.chd.chd_head);
- llh->llh_size = cpu_to_le32(sizeof(struct llog_logid_rec));
+ llh->llh_size = sizeof(struct llog_logid_rec);
}
else if (flags & LLOG_F_IS_PLAIN)
INIT_LIST_HEAD(&handle->u.phd.phd_entry);
GOTO(out, rc);
rec = buf;
- idx = le32_to_cpu(rec->lrh_index);
+ idx = rec->lrh_index;
if (idx < index)
CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
while (idx < index) {
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (struct llog_rec_hdr *)
+ ((char *)rec + rec->lrh_len);
idx ++;
}
++index;
if (index > last_index)
GOTO(out, rc = 0);
- rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ rec = (struct llog_rec_hdr *)
+ ((char *)rec + rec->lrh_len);
}
}
{
struct llog_handle *loghandle;
struct llog_log_hdr *llh;
- struct llog_logid_rec rec;
+ struct llog_logid_rec rec = { { 0 }, };
int rc, index, bitmap_size;
ENTRY;
index = (cathandle->lgh_last_idx + 1) % bitmap_size;
/* maximum number of available slots in catlog is bitmap_size - 2 */
- if (llh->llh_cat_idx == cpu_to_le32(index)) {
+ if (llh->llh_cat_idx == index) {
CERROR("no free catalog slots for log...\n");
RETURN(ERR_PTR(-ENOSPC));
} else {
LBUG(); /* should never happen */
}
cathandle->lgh_last_idx = index;
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
- llh->llh_tail.lrt_index = cpu_to_le32(index);
+ llh->llh_count++;
+ llh->llh_tail.lrt_index = index;
}
rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
index, cathandle->lgh_id.lgl_oid);
/* build the record for this log in the catalog */
- rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
- rec.lid_hdr.lrh_index = cpu_to_le32(index);
- rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
+ rec.lid_hdr.lrh_len = sizeof(rec);
+ rec.lid_hdr.lrh_index = index;
+ rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
rec.lid_id = loghandle->lgh_id;
- rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
- rec.lid_tail.lrt_index = cpu_to_le32(index);
+ rec.lid_tail.lrt_len = sizeof(rec);
+ rec.lid_tail.lrt_index = index;
/* update the catalog: header and record */
rc = llog_write_rec(cathandle, &rec.lid_hdr,
GOTO(out_destroy, rc);
}
- loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
+ loghandle->lgh_hdr->llh_cat_idx = index;
cathandle->u.chd.chd_current_log = loghandle;
LASSERT(list_empty(&loghandle->u.phd.phd_entry));
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
if (!rc) {
loghandle->u.phd.phd_cat_handle = cathandle;
loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
- loghandle->u.phd.phd_cookie.lgc_index =
- le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
+ loghandle->u.phd.phd_cookie.lgc_index =
+ loghandle->lgh_hdr->llh_cat_idx;
}
out:
int rc;
ENTRY;
- LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
+ LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
loghandle = llog_cat_current_log(cathandle, 1);
if (IS_ERR(loghandle))
RETURN(PTR_ERR(loghandle));
/* loghandle is already locked by llog_cat_current_log() for us */
rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
up_write(&loghandle->lgh_lock);
+ if (rc == -ENOSPC) {
+ /* the current plain log filled up; switch to a new one and retry */
+ loghandle = llog_cat_current_log(cathandle, 1);
+ if (IS_ERR(loghandle))
+ RETURN(PTR_ERR(loghandle));
+ rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
+ up_write(&loghandle->lgh_lock);
+ }
RETURN(rc);
}
struct llog_handle *llh;
int rc;
- if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
- le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
+ rec->lrh_index, cat_llh->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
if (rc) {
int rc;
ENTRY;
- LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
+ LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
d.lpd_data = data;
d.lpd_cb = cb;
CWARN("catlog "LPX64" crosses index zero\n",
cat_llh->lgh_id.lgl_oid);
- cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+ cd.first_idx = llh->llh_cat_idx;
cd.last_idx = 0;
rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
if (rc != 0)
ENTRY;
bitmap_size = sizeof(llh->llh_bitmap) * 8;
- if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
- idx = le32_to_cpu(llh->llh_cat_idx) + 1;
- llh->llh_cat_idx = cpu_to_le32(idx);
+ if (llh->llh_cat_idx == (index - 1)) {
+ idx = llh->llh_cat_idx + 1;
+ llh->llh_cat_idx = idx;
if (idx == cathandle->lgh_last_idx)
goto out;
for (i = (index + 1) % bitmap_size;
i != cathandle->lgh_last_idx;
i = (i + 1) % bitmap_size) {
if (!ext2_test_bit(i, llh->llh_bitmap)) {
- idx = le32_to_cpu(llh->llh_cat_idx) + 1;
- llh->llh_cat_idx = cpu_to_le32(idx);
+ idx = llh->llh_cat_idx + 1;
+ llh->llh_cat_idx = idx;
} else if (i == 0) {
llh->llh_cat_idx = 0;
} else {
}
out:
CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
- cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
+ cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
}
RETURN(0);
#ifndef __LLOG_INTERNAL_H__
#define __LLOG_INTERNAL_H__
-int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
- char *name, int count, struct llog_logid *idarray);
-int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
- char *name, int count, struct llog_logid *);
+int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+ char *name, int count, struct llog_catid *idarray);
int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
struct llog_logid *logid);
#endif
char *endp;
int cur_index, rc = 0;
- cur_index = le32_to_cpu(rec->lrh_index);
+ cur_index = rec->lrh_index;
if (ioc_data && (ioc_data->ioc_inllen1)) {
l = 0;
if (to > 0 && cur_index > to)
RETURN(-LLOG_EEMPTY);
}
- if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
+ if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
struct llog_handle *log_handle;
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
l = snprintf(out, remains, "[index]: %05d [type]: "
"%02x [len]: %04d failed\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
}
if (handle->lgh_ctxt == NULL)
RETURN(-EOPNOTSUPP);
rc = llog_process(log_handle, llog_check_cb, NULL, NULL);
llog_close(log_handle);
} else {
- switch (le32_to_cpu(rec->lrh_type)) {
+ switch (rec->lrh_type) {
case OST_SZ_REC:
case OST_RAID1_REC:
case MDS_UNLINK_REC:
case LLOG_HDR_MAGIC: {
l = snprintf(out, remains, "[index]: %05d [type]: "
"%02x [len]: %04d ok\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
out += l;
remains -= l;
if (remains <= 0) {
default: {
l = snprintf(out, remains, "[index]: %05d [type]: "
"%02x [len]: %04d failed\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
out += l;
remains -= l;
if (remains <= 0) {
ioc_data->ioc_inllen1 = 0;
}
- cur_index = le32_to_cpu(rec->lrh_index);
+ cur_index = rec->lrh_index;
if (cur_index < from)
RETURN(0);
if (to > 0 && cur_index > to)
RETURN(-LLOG_EEMPTY);
- if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
+ if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
} else {
l = snprintf(out, remains,
"[index]: %05d [type]: %02x [len]: %04d\n",
- cur_index, le32_to_cpu(rec->lrh_type),
- le32_to_cpu(rec->lrh_len));
+ cur_index, rec->lrh_type,
+ rec->lrh_len);
}
out += l;
remains -= l;
struct llog_logid_rec *lir = (struct llog_logid_rec*)rec;
int rc;
- if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC))
+ if (rec->lrh_type != LLOG_LOGID_MAGIC)
return (-EINVAL);
rc = llog_remove_log(handle, &lir->lid_id);
"last index: %d\n",
handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr,
handle->lgh_id.lgl_ogen,
- le32_to_cpu(handle->lgh_hdr->llh_flags),
- le32_to_cpu(handle->lgh_hdr->llh_flags) &
+ handle->lgh_hdr->llh_flags,
+ handle->lgh_hdr->llh_flags &
LLOG_F_IS_CAT ? "cat" : "plain",
- le32_to_cpu(handle->lgh_hdr->llh_count),
+ handle->lgh_hdr->llh_count,
handle->lgh_last_idx);
out += l;
remains -= l;
struct llog_logid plain;
char *endp;
- if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+ if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
GOTO(out_close, err = -EINVAL);
err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
case OBD_IOC_LLOG_REMOVE: {
struct llog_logid plain;
- if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+ if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
GOTO(out_close, err = -EINVAL);
if (data->ioc_inlbuf2) {
out_close:
if (handle->lgh_hdr &&
- handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))
+ handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
llog_cat_put(handle);
else
llog_close(handle);
struct obd_ioctl_data *data)
{
int size, i;
- struct llog_logid *idarray, *id;
- char name[32] = "CATLIST";
+ struct llog_catid *idarray;
+ struct llog_logid *id;
+ char name[32] = CATLIST;
char *out;
int l, remains, rc = 0;
out = data->ioc_bulk;
remains = data->ioc_inllen1;
- id = idarray;
for (i = 0; i < count; i++) {
+ id = &idarray[i].lci_logid;
l = snprintf(out, remains,
"catalog log: #"LPX64"#"LPX64"#%08x\n",
id->lgl_oid, id->lgl_ogr, id->lgl_ogen);
- id++;
out += l;
remains -= l;
if (remains <= 0) {
static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
int len, int index)
{
- struct llog_rec_hdr rec;
+ struct llog_rec_hdr rec = { 0 };
struct llog_rec_tail tail;
int rc;
ENTRY;
LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
- tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
- tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
+ tail.lrt_len = rec.lrh_len = len;
+ tail.lrt_index = rec.lrh_index = index;
rec.lrh_type = 0;
rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
int rc;
struct llog_rec_tail end;
loff_t saved_off = file->f_pos;
- int buflen = le32_to_cpu(rec->lrh_len);
+ int buflen = rec->lrh_len;
ENTRY;
file->f_pos = off;
}
/* the buf case */
- rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
+ rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
if (rc) {
CERROR("error writing log hdr: rc %d\n", rc);
if (rc)
CERROR("error reading log header\n");
- handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
+ handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
RETURN(rc);
void *buf, int idx)
{
struct llog_log_hdr *llh;
- int reclen = le32_to_cpu(rec->lrh_len), index, rc;
+ int reclen = rec->lrh_len, index, rc;
struct llog_rec_tail *lrt;
struct obd_device *obd;
struct file *file;
- loff_t offset;
size_t left;
ENTRY;
/* record length should not be bigger than LLOG_CHUNK_SIZE */
if (buf)
- rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
- - sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
+ rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
+ sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
else
rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
if (rc)
if (rc || idx == 0)
RETURN(rc);
- saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
+ saved_offset = sizeof(*llh) + (idx-1)*rec->lrh_len;
rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
*/
left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
if (buf)
- reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
+ reclen = sizeof(*rec) + rec->lrh_len +
sizeof(struct llog_rec_tail);
/* NOTE: padding is a record, but no bit is set */
if (left != 0 && left != reclen &&
left < (reclen + LLOG_MIN_REC_SIZE)) {
+ int bitmap_size = sizeof(llh->llh_bitmap) * 8;
loghandle->lgh_last_idx++;
rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
if (rc)
RETURN(rc);
+ /* if it's the last idx in log file, then return -ENOSPC */
+ if (loghandle->lgh_last_idx == bitmap_size - 1)
+ RETURN(-ENOSPC);
}
loghandle->lgh_last_idx++;
index = loghandle->lgh_last_idx;
- rec->lrh_index = cpu_to_le32(index);
+ rec->lrh_index = index;
if (buf == NULL) {
- lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
+ lrt = (struct llog_rec_tail *)
+ ((char *)rec + rec->lrh_len - sizeof(*lrt));
lrt->lrt_len = rec->lrh_len;
lrt->lrt_index = rec->lrh_index;
}
CERROR("argh, index %u already set in log bitmap?\n", index);
LBUG(); /* should never happen */
}
- llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
- llh->llh_tail.lrt_index = cpu_to_le32(index);
+ llh->llh_count++;
+ llh->llh_tail.lrt_index = index;
- offset = 0;
rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
if (rc)
RETURN(rc);
RETURN(rc);
CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
- loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
+ loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
reccookie->lgc_index = index;
- if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
+ if (rec->lrh_type == MDS_UNLINK_REC)
reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
- else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
+ else if (rec->lrh_type == OST_SZ_REC)
reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
- else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
+ else if (rec->lrh_type == OST_RAID1_REC)
reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
else
reccookie->lgc_subsys = -1;
rc = 1;
}
- if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC)
+ if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
rc = 1;
RETURN(rc);
}
tail = buf + rc - sizeof(struct llog_rec_tail);
- *cur_idx = le32_to_cpu(tail->lrt_index);
+ *cur_idx = tail->lrt_index;
/* this shouldn't happen */
if (tail->lrt_index == 0) {
loghandle->lgh_id.lgl_ogen, *cur_offset);
RETURN(-EINVAL);
}
- if (le32_to_cpu(tail->lrt_index) < next_idx)
+ if (tail->lrt_index < next_idx)
continue;
/* sanity check that the start of the new buffer is no farther
* than the record that we wanted. This shouldn't happen. */
rec = buf;
- if (le32_to_cpu(rec->lrh_index) > next_idx) {
+ if (rec->lrh_index > next_idx) {
CERROR("missed desired record? %u > %u\n",
- le32_to_cpu(rec->lrh_index), next_idx);
+ rec->lrh_index, next_idx);
RETURN(-ENOENT);
}
RETURN(0);
/* reads the catalog list */
int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
- char *name, int count, struct llog_logid *idarray)
+ char *name, int count, struct llog_catid *idarray)
{
struct obd_run_ctxt saved;
struct l_file *file;
/* writes the cat list */
int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
- char *name, int count, struct llog_logid *idarray)
+ char *name, int count, struct llog_catid *idarray)
{
struct obd_run_ctxt saved;
struct l_file *file;
int rc, index;
ENTRY;
- if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
- le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+ rec->lrh_index, cathandle->lgh_id.lgl_oid);
rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
if (rc) {
}
llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1)) {
+ if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+ (llh->llh_count == 1)) {
rc = llog_destroy(loghandle);
if (rc)
CERROR("failure destroying log in postsetup: %d\n", rc);
if (rc == 0)
CWARN("cancel log "LPX64":%x at index %u of catalog "
LPX64"\n", lir->lid_id.lgl_oid,
- lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index),
+ lir->lid_id.lgl_ogen, rec->lrh_index,
cathandle->lgh_id.lgl_oid);
}
&cathandle->u.chd.chd_head,
u.phd.phd_entry) {
llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) &
+ if ((llh->llh_flags &
LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1)) {
+ (llh->llh_count == 1)) {
rc = llog_destroy(loghandle);
if (rc)
CERROR("failure destroying log during "
int llog_cat_initialize(struct obd_device *obd, int count)
{
- struct llog_logid *idarray;
+ struct llog_catid *idarray;
int size = sizeof(*idarray) * count;
- char name[32] = "CATLIST";
+ char name[32] = CATLIST;
int rc;
ENTRY;
if (!idarray)
RETURN(-ENOMEM);
- memset(idarray, 0, size);
-
rc = llog_get_cat_list(obd, obd, name, count, idarray);
if (rc) {
CERROR("rc: %d\n", rc);
EXPORT_SYMBOL(llog_cat_initialize);
int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
int rc;
ENTRY;
RETURN(-ERANGE);
}
- if (le32_to_cpu(llh->lgh_hdr->llh_count) != num_recs) {
+ if (llh->lgh_hdr->llh_count != num_recs) {
CERROR("%s: handle->count is %d, expected %d after write\n",
- test, le32_to_cpu(llh->lgh_hdr->llh_count), num_recs);
+ test, llh->lgh_hdr->llh_count, num_recs);
RETURN(-ERANGE);
}
int num_recs = 1; /* 1 for the header */
ENTRY;
- lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr));
- lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC);
+ lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = sizeof(lcr);
+ lcr.lcr_hdr.lrh_type = OST_SZ_REC;
CWARN("3a: write one create_rec\n");
rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
for (i = 0; i < 10; i++) {
struct llog_rec_hdr hdr;
char buf[8];
- hdr.lrh_len = cpu_to_le32(8);
- hdr.lrh_type = cpu_to_le32(OBD_CFG_REC);
+ hdr.lrh_len = 8;
+ hdr.lrh_type = OBD_CFG_REC;
memset(buf, 0, sizeof buf);
rc = llog_write_rec(llh, &hdr, NULL, 0, buf, -1);
if (rc) {
ENTRY;
- lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len =
- cpu_to_le32(LLOG_MIN_REC_SIZE);
- lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
+ lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+ lmr.lmr_hdr.lrh_type = 0xf00f00;
sprintf(name, "%x", llog_test_rand+1);
CWARN("4a: create a catalog log with name: %s\n", name);
if (buf == NULL)
GOTO(out, rc = -ENOMEM);
for (i = 0; i < 5; i++) {
- rec.lrh_len = cpu_to_le32(buflen);
- rec.lrh_type = cpu_to_le32(OBD_CFG_REC);
+ rec.lrh_len = buflen;
+ rec.lrh_type = OBD_CFG_REC;
rc = llog_cat_add_rec(cath, &rec, NULL, buf);
if (rc) {
CERROR("4e: write 5 records failed at #%d: %d\n",
{
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
- if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
- le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid,
+ rec->lrh_index, lir->lid_id.lgl_oid,
lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid);
RETURN(0);
}
static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
void *data)
{
- if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
+ if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
CWARN("seeing record at index %d in log "LPX64"\n",
- le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid);
+ rec->lrh_index, llh->lgh_id.lgl_oid);
RETURN(0);
}
struct llog_cookie cookie;
static int i = 0;
- if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
+ if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
cookie.lgc_lgl = llh->lgh_id;
- cookie.lgc_index = le32_to_cpu(rec->lrh_index);
+ cookie.lgc_index = rec->lrh_index;
llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie);
i++;
ENTRY;
- lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len =
- cpu_to_le32(LLOG_MIN_REC_SIZE);
- lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
+ lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+ lmr.lmr_hdr.lrh_type = 0xf00f00;
CWARN("5a: re-open catalog by id\n");
rc = llog_create(ctxt, &llh, &cat_logid, NULL);
static int llog_test_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
int rc;
ENTRY;
LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, unpin);
- LPROCFS_OBD_OP_INIT(num_private_stats, stats, invalidate_import);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, import_event);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, notify);
for (i = num_private_stats; i < num_stats; i++) {
struct portals_handle *h;
h = list_entry(tmp, struct portals_handle, h_link);
- CERROR("forcing cleanup for handle "LPX64"\n",
- h->h_cookie);
+ CERROR("force clean handle "LPX64" addr %p addref %p\n",
+ h->h_cookie, h, h->h_addref);
class_handle_unhash_nolock(h);
}
int echo_commitrw(int cmd, struct obd_export *export, struct obdo *oa,
int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti)
+ struct niobuf_local *res, struct obd_trans_info *oti, int rc)
{
struct obd_device *obd;
struct niobuf_local *r = res;
- int i, vrc = 0, rc = 0;
+ int i, vrc = 0;
ENTRY;
obd = export->exp_obd;
if (obd == NULL)
RETURN(-EINVAL);
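+ /* rc carries the result of the preceding bulk transfer; on error,
+ * skip the commit and just clean up. */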
+ if (rc)
+ GOTO(commitrw_cleanup, rc);
+
if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
objcount, niocount);
struct niobuf_remote *rnb;
obd_off off;
obd_size npages, tot_pages;
- int i, ret = 0, err = 0;
+ int i, ret = 0;
ENTRY;
if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
struct page *page = lnb[i].page;
/* read past eof? */
- if (page == NULL && lnb[i].rc == 0)
+ if (page == NULL && lnb[i].rc == 0)
continue;
if (oa->o_id == ECHO_PERSISTENT_OBJID)
continue;
- if (rw == OBD_BRW_WRITE)
- echo_client_page_debug_setup(lsm, page, rw,
- oa->o_id,
- rnb[i].offset,
+ if (rw == OBD_BRW_WRITE)
+ echo_client_page_debug_setup(lsm, page, rw,
+ oa->o_id,
+ rnb[i].offset,
rnb[i].len);
else
- echo_client_page_debug_check(lsm, page,
- oa->o_id,
- rnb[i].offset,
+ echo_client_page_debug_check(lsm, page,
+ oa->o_id,
+ rnb[i].offset,
rnb[i].len);
}
- ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti);
+ ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
if (ret != 0)
GOTO(out, ret);
- if (err)
- GOTO(out, ret = err);
}
out:
RETURN(ret);
}
-int echo_client_brw_ioctl(int rw, struct obd_export *exp,
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
struct obd_ioctl_data *data)
{
struct obd_device *obd = class_exp2obd(exp);
int i, rc = 0, cleanup_phase = 0;
ENTRY;
- O_dentry = simple_mkdir(current->fs->pwd, "O", 0700);
+ O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
if (IS_ERR(O_dentry)) {
rc = PTR_ERR(O_dentry);
loff_t off = 0;
sprintf(name, "%d", i);
- dentry = simple_mkdir(O_dentry, name, 0700);
+ dentry = simple_mkdir(O_dentry, name, 0700, 1);
CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
char dir[20];
snprintf(dir, sizeof(dir), "d%u", i);
- dentry = simple_mkdir(O_dentry, dir, 0700);
+ dentry = simple_mkdir(O_dentry, dir, 0700, 1);
CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
RETURN(rc);
}
+/* Do extra sanity checks for grant accounting. We do this at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
+ * always get rid of it or turn it off when we know accounting is good. */
+static void filter_grant_sanity_check(struct obd_device *obd, char *func)
+{
+ struct filter_export_data *fed;
+ struct obd_export *exp;
+ obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
+ obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
+ obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+
+ if (list_empty(&obd->obd_exports))
+ return;
+
+ spin_lock(&obd->obd_osfs_lock);
+ spin_lock(&obd->obd_dev_lock);
+ list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+ fed = &exp->exp_filter_data;
+ LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+ "cli %s/%p %lu+%lu > "LPU64"\n",
+ exp->exp_client_uuid.uuid, exp,
+ fed->fed_grant, fed->fed_pending, maxsize);
+ LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
+ exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
+ CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+ tot_granted += fed->fed_grant + fed->fed_pending;
+ tot_pending += fed->fed_pending;
+ tot_dirty += fed->fed_dirty;
+ }
+ fo_tot_granted = obd->u.filter.fo_tot_granted;
+ fo_tot_pending = obd->u.filter.fo_tot_pending;
+ fo_tot_dirty = obd->u.filter.fo_tot_dirty;
+ spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_osfs_lock);
+
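+ /* Invariants: the per-export sums must match the filter-wide
+ * totals, pending can never exceed granted, and neither granted
+ * nor dirty can exceed the filesystem size. */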
+ /* Do these assertions outside the spinlocks so we don't kill the system */
+ LASSERTF(tot_granted == fo_tot_granted, "%s "LPU64" != "LPU64"\n",
+ func, tot_granted, fo_tot_granted);
+ LASSERTF(tot_pending == fo_tot_pending, "%s "LPU64" != "LPU64"\n",
+ func, tot_pending, fo_tot_pending);
+ LASSERTF(tot_dirty == fo_tot_dirty, "%s "LPU64" != "LPU64"\n",
+ func, tot_dirty, fo_tot_dirty);
+ LASSERTF(tot_pending <= tot_granted, "%s "LPU64" > "LPU64"\n",
+ func, tot_pending, tot_granted);
+ LASSERTF(tot_granted <= maxsize, "%s "LPU64" > "LPU64"\n",
+ func, tot_granted, maxsize);
+ LASSERTF(tot_dirty <= maxsize, "%s "LPU64" > "LPU64"\n",
+ func, tot_dirty, maxsize);
+}
+
+/* Remove this client from the grant accounting totals. This is done at
+ * disconnect time and also at export destroy time in case there was a race
+ * between removing the export and an incoming BRW updating the client grant.
+ * The client should do something similar when it invalidates its import. */
+static void filter_grant_discard(struct obd_export *exp)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
+ struct filter_export_data *fed = &exp->exp_filter_data;
+
+ spin_lock(&obd->obd_osfs_lock);
+ CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+
+ LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
+ "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
+ obd->obd_name, filter->fo_tot_granted,
+ exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+ filter->fo_tot_granted -= fed->fed_grant;
+ LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
+ "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
+ obd->obd_name, filter->fo_tot_pending,
+ exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+ LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
+ "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
+ obd->obd_name, filter->fo_tot_dirty,
+ exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+ filter->fo_tot_dirty -= fed->fed_dirty;
+ fed->fed_dirty = 0;
+ fed->fed_grant = 0;
+
+ spin_unlock(&obd->obd_osfs_lock);
+}
+
static int filter_destroy_export(struct obd_export *exp)
{
ENTRY;
if (exp->exp_obd->obd_replayable)
filter_client_free(exp, exp->exp_flags);
+
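+ /* Grants for this export were discarded at disconnect time, so
+ * the remaining exports must still balance against the totals. */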
+ filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
+
RETURN(0);
}
/* also incredibly similar to mds_disconnect */
static int filter_disconnect(struct obd_export *exp, int flags)
{
- struct filter_obd *filter = &exp->exp_obd->u.filter;
- struct filter_export_data *fed = &exp->exp_filter_data;
+ struct obd_device *obd = exp->exp_obd;
unsigned long irqflags;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
LASSERT(exp);
-
- /* This would imply RPCs still in flight or preprw/commitrw imbalance */
- if (fed->fed_pending)
- CWARN("%s: cli %s has %lu pending at disconnect time\n",
- exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
- fed->fed_pending);
-
- /* Forget what this client had cached. This is also done on the
- * client when it invalidates its import. Do this before unlinking
- * from the export list so filter_grant_sanity_check totals are OK. */
- spin_lock(&exp->exp_obd->obd_osfs_lock);
- LASSERTF(exp->exp_obd->u.filter.fo_tot_dirty >= fed->fed_dirty,
- "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
- exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_dirty,
- exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
- exp->exp_obd->u.filter.fo_tot_dirty -= fed->fed_dirty;
- LASSERTF(exp->exp_obd->u.filter.fo_tot_granted >= fed->fed_grant,
- "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
- exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_granted,
- exp->exp_client_uuid.uuid, exp, fed->fed_grant);
- exp->exp_obd->u.filter.fo_tot_granted -= fed->fed_grant;
- LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
- "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
- exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_pending,
- exp->exp_client_uuid.uuid, exp, fed->fed_pending);
- fed->fed_dirty = 0;
- fed->fed_grant = 0;
- spin_unlock(&exp->exp_obd->obd_osfs_lock);
-
- ldlm_cancel_locks_for_export(exp);
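+ /* Pin the export so it cannot be freed out from under us while we
+ * flush its state below; the reference is dropped before returning. */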
+ class_export_get(exp);
spin_lock_irqsave(&exp->exp_lock, irqflags);
exp->exp_flags = flags;
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- fsfilt_sync(exp->exp_obd, filter->fo_sb);
+ filter_grant_discard(exp);
+
+ /* Disconnect early so that clients can't keep using the export */
+ rc = class_disconnect(exp, flags);
+
+ /* Do this twice in case a BRW arrived between the first call and
+ * the class_export_unlink() call (bug 2663) */
+ filter_grant_discard(exp);
+
+ ldlm_cancel_locks_for_export(exp);
+
+ fsfilt_sync(obd, obd->u.filter.fo_sb);
/* flush any remaining cancel messages out to the target */
- ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+ ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
llog_sync(ctxt, exp);
- rc = class_disconnect(exp, flags);
+ class_export_put(exp);
RETURN(rc);
}
RETURN(rc);
}
-/* debugging to make sure that nothing bad happens, can be turned off soon.
- * caller must hold osfs lock */
-static void filter_grant_total_exports(struct obd_device *obd,
- obd_size *tot_dirty,
- obd_size *tot_pending,
- obd_size *tot_granted,
- obd_size maxsize)
-{
- struct filter_export_data *fed;
- struct obd_export *exp_pos;
-
- spin_lock(&obd->obd_dev_lock);
- list_for_each_entry(exp_pos, &obd->obd_exports, exp_obd_chain) {
- fed = &exp_pos->exp_filter_data;
- LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
- exp_pos->exp_client_uuid.uuid, exp_pos,
- fed->fed_dirty, maxsize);
- LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
- "cli %s/%p %lu+%lu > "LPU64"\n",
- exp_pos->exp_client_uuid.uuid, exp_pos,
- fed->fed_grant, fed->fed_pending, maxsize);
- *tot_dirty += fed->fed_dirty;
- *tot_pending += fed->fed_pending;
- *tot_granted += fed->fed_grant + fed->fed_pending;
- }
- spin_unlock(&obd->obd_dev_lock);
-}
-
-static void filter_grant_sanity_check(obd_size tot_dirty, obd_size tot_pending,
- obd_size tot_granted,
- obd_size fo_tot_dirty,
- obd_size fo_tot_pending,
- obd_size fo_tot_granted, obd_size maxsize)
-{
- LASSERTF(tot_dirty == fo_tot_dirty, LPU64" != "LPU64"\n",
- tot_dirty, fo_tot_dirty);
- LASSERTF(tot_pending == fo_tot_pending, LPU64" != "LPU64"\n",
- tot_pending, fo_tot_pending);
- LASSERTF(tot_granted == fo_tot_granted, LPU64" != "LPU64"\n",
- tot_granted, fo_tot_granted);
- LASSERTF(tot_dirty <= maxsize, LPU64" > "LPU64"\n", tot_dirty, maxsize);
- LASSERTF(tot_pending <= tot_granted, LPU64" > "LPU64"\n", tot_pending,
- tot_granted);
- LASSERTF(tot_granted <= maxsize, LPU64" > "LPU64"\n",
- tot_granted, maxsize);
-}
-
static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
struct filter_obd *filter = &obd->u.filter;
- obd_size tot_cached = 0, tot_pending = 0, tot_granted = 0;
- obd_size fo_tot_cached, fo_tot_pending, fo_tot_granted;
int blockbits = filter->fo_sb->s_blocksize_bits;
int rc;
ENTRY;
spin_lock(&obd->obd_osfs_lock);
rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
- filter_grant_total_exports(obd, &tot_cached, &tot_pending, &tot_granted,
- osfs->os_blocks << blockbits);
- fo_tot_cached = filter->fo_tot_dirty;
- fo_tot_pending = filter->fo_tot_pending;
- fo_tot_granted = filter->fo_tot_granted;
spin_unlock(&obd->obd_osfs_lock);
- /* Do check outside spinlock, to avoid wedging system on failure */
- filter_grant_sanity_check(tot_cached, tot_pending, tot_granted,
- fo_tot_cached, fo_tot_pending,
- fo_tot_granted, osfs->os_blocks << blockbits);
-
CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
- "pending "LPU64" free "LPU64" avail "LPU64"\n",
- tot_cached >> blockbits, tot_granted >> blockbits,
- tot_pending >> blockbits, osfs->os_bfree, osfs->os_bavail);
+ " pending "LPU64" free "LPU64" avail "LPU64"\n",
+ filter->fo_tot_dirty, filter->fo_tot_granted,
+ filter->fo_tot_pending,
+ osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
+
+ filter_grant_sanity_check(obd, __FUNCTION__);
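+
+ /* Hide space already consumed by client grants (dirty and pending
+ * writes) from the available count we report. */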
osfs->os_bavail -= min(osfs->os_bavail,
- (tot_cached +tot_pending +osfs->os_bsize -1) >>
- blockbits);
+ (filter->fo_tot_dirty + filter->fo_tot_pending +
+ osfs->os_bsize - 1) >> blockbits);
RETURN(rc);
}
};
static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
struct llog_ctxt *ctxt;
int rc;
struct niobuf_local *, struct obd_trans_info *);
int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
struct obd_ioobj *, int niocount, struct niobuf_local *,
- struct obd_trans_info *);
+ struct obd_trans_info *, int rc);
int filter_brw(int cmd, struct obd_export *, struct obdo *,
struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
struct obd_trans_info *);
/* filter_io_*.c */
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti);
+ struct niobuf_local *res, struct obd_trans_info *oti,
+ int rc);
obd_size filter_grant_space_left(struct obd_export *exp);
long filter_grant(struct obd_export *exp, obd_size current_grant,
obd_size want, obd_size fs_space_left);
struct obd_device *obd = exp->exp_obd;
ENTRY;
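+ /* Grant accounting is protected by obd_osfs_lock; assert that
+ * the caller already holds it. */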
+ LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
(OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
oa->o_valid &= ~OBD_MD_FLGRANT;
* leave this here in case there is a large error in accounting. */
CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ?
D_ERROR : D_CACHE,
- "%s: cli %s reports granted: "LPU64" dropped: %u, local: %lu\n",
- obd->obd_name, exp->exp_client_uuid.uuid, oa->o_grant,
+ "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
oa->o_dropped, fed->fed_grant);
/* Update our accounting now so that statfs takes it into account.
* on fed_dirty however. */
obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
if (fed->fed_grant < oa->o_dropped) {
- CERROR("%s: cli %s reports %u dropped > fed_grant %lu\n",
- obd->obd_name, exp->exp_client_uuid.uuid,
+ CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
oa->o_dropped, fed->fed_grant);
oa->o_dropped = 0;
}
if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
- CERROR("%s: cli %s reports %u dropped > tot_granted "LPU64"\n",
- obd->obd_name, exp->exp_client_uuid.uuid,
+ CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
oa->o_dropped, obd->u.filter.fo_tot_granted);
oa->o_dropped = 0;
}
obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
int rc, statfs_done = 0;
+ LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
restat:
rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
if (left < tot_granted - obd->u.filter.fo_tot_pending &&
time_after(jiffies, next)) {
spin_unlock(&obd->obd_osfs_lock);
- CERROR("%s: cli %s granted "LPU64" more than available "
+ CERROR("%s: cli %s/%p grant "LPU64" > available "
LPU64" and pending "LPU64"\n", obd->obd_name,
- exp->exp_client_uuid.uuid, tot_granted, left,
- obd->u.filter.fo_tot_pending);
+ exp->exp_client_uuid.uuid, exp, tot_granted,
+ left, obd->u.filter.fo_tot_pending);
if (next == 0)
portals_debug_dumplog();
next = jiffies + 20 * HZ;
left = 0;
}
- CDEBUG(D_CACHE, "%s: cli %s free: "LPU64" avail: "LPU64" grant "LPU64
+ CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
" left: "LPU64" pending: "LPU64"\n", obd->obd_name,
- exp->exp_client_uuid.uuid, obd->obd_osfs.os_bfree << blockbits,
- avail << blockbits, tot_granted, left,
- obd->u.filter.fo_tot_pending);
+ exp->exp_client_uuid.uuid, exp,
+ obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
+ tot_granted, left, obd->u.filter.fo_tot_pending);
return left;
}
int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
__u64 grant = 0;
+ LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
/* Grant some fraction of the client's requested grant space so that
* they are not always waiting for write credits (not all of it to
* avoid overgranting in face of multiple RPCs in flight). This
}
}
- CDEBUG(D_CACHE,"%s: cli %s wants: "LPU64" granting: "LPU64"\n",
- obd->obd_name, exp->exp_client_uuid.uuid, want, grant);
+ CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
CDEBUG(D_CACHE,
- "%s: cli %s tot cached:"LPU64" granted:"LPU64
+ "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
" num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
- obd->u.filter.fo_tot_dirty,
+ exp, obd->u.filter.fo_tot_dirty,
obd->u.filter.fo_tot_granted, obd->obd_num_exports);
return grant;
unsigned long used = 0, ungranted = 0, using;
int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
+ LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
+
for (obj = 0; obj < objcount; obj++) {
for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
int tmp, bytes;
if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
if (fed->fed_grant < used + bytes) {
CDEBUG(D_CACHE,
- "%s: cli %s claims %ld+%d GRANT,"
- " no such grant %lu, idx %d\n",
+ "%s: cli %s/%p claims %ld+%d "
+ "GRANT, real grant %lu idx %d\n",
exp->exp_obd->obd_name,
- exp->exp_client_uuid.uuid,
+ exp->exp_client_uuid.uuid, exp,
used, bytes, fed->fed_grant, n);
mask = D_ERROR;
} else {
* ignore this error. */
lnb[n].rc = -ENOSPC;
rnb[n].flags &= OBD_BRW_GRANTED;
- CDEBUG(D_CACHE, "%s: cli %s idx %d no space for %d\n",
+ CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
exp->exp_obd->obd_name,
- exp->exp_client_uuid.uuid, n, bytes);
+ exp->exp_client_uuid.uuid, exp, n, bytes);
}
}
exp->exp_obd->u.filter.fo_tot_pending += used;
CDEBUG(mask,
- "%s: cli %s used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
- exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, used,
+ "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
+ exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
ungranted, fed->fed_grant, fed->fed_dirty);
/* Rough calc in case we don't refresh cached statfs data */
exp->exp_obd->obd_osfs.os_bavail = 0;
if (fed->fed_dirty < used) {
- CERROR("%s: cli %s claims used %lu > fed_dirty %lu\n",
- exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+ CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
+ exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
used, fed->fed_dirty);
used = fed->fed_dirty;
}
static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *res,
- struct obd_trans_info *oti)
+ struct obd_trans_info *oti, int rc)
{
struct obd_ioobj *o;
struct niobuf_local *lnb;
page_cache_release(lnb->page);
}
}
+
if (res->dentry != NULL)
f_dput(res->dentry);
- RETURN(0);
+ RETURN(rc);
}
void flip_into_page_cache(struct inode *inode, struct page *new_page)
int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti)
+ struct niobuf_local *res, struct obd_trans_info *oti,int rc)
{
if (cmd == OBD_BRW_WRITE)
return filter_commitrw_write(exp, oa, objcount, obj, niocount,
- res, oti);
+ res, oti, rc);
if (cmd == OBD_BRW_READ)
return filter_commitrw_read(exp, oa, objcount, obj, niocount,
- res, oti);
+ res, oti, rc);
LBUG();
return -EPROTO;
}
kunmap(pga[i].pg);
}
- ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
+ ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
out:
if (lnb)
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti)
+ struct niobuf_local *res, struct obd_trans_info *oti,
+ int rc)
{
struct obd_device *obd = exp->exp_obd;
struct obd_run_ctxt saved;
struct iattr iattr = { 0 };
struct kiobuf *iobuf;
struct inode *inode = NULL;
- int rc = 0, i, n, cleanup_phase = 0, err;
+ int i, n, cleanup_phase = 0, err;
unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
void *wait_handle;
ENTRY;
LASSERT(objcount == 1);
LASSERT(current->journal_info == NULL);
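+ /* rc carries the result of the client bulk transfer; on error,
+ * skip the write entirely instead of committing bad data. */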
+ if (rc != 0)
+ GOTO(cleanup, rc);
+
rc = alloc_kiovec(1, &iobuf);
if (rc)
GOTO(cleanup, rc);
f_dput(dentry);
lvb->lvb_size = dentry->d_inode->i_size;
- lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
+ lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+ CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", mtime: "
+ LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_mtime);
out:
if (oa)
lvb->lvb_size, new->lvb_size);
lvb->lvb_size = new->lvb_size;
}
- if (new->lvb_time > lvb->lvb_time) {
- CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb time: "
+ if (new->lvb_mtime > lvb->lvb_mtime) {
+ CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: "
LPU64" -> "LPU64"\n", res->lr_name.name[0],
- lvb->lvb_time, new->lvb_time);
- lvb->lvb_time = new->lvb_time;
+ lvb->lvb_mtime, new->lvb_mtime);
+ lvb->lvb_mtime = new->lvb_mtime;
}
GOTO(out, rc = 0);
}
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
lvb->lvb_size = dentry->d_inode->i_size;
- lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
- CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", time: "
- LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_time);
+ lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+ CDEBUG(D_DLMTRACE, "res: "LPU64" disk lvb size: "LPU64", mtime: "
+ LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_mtime);
f_dput(dentry);
out:
int *eof, void *data)
{
struct obd_device *obd = data;
- struct obd_export *exp;
- if (obd == NULL || list_empty(&obd->obd_exports))
+ if (obd == NULL)
return 0;
- spin_lock(&obd->obd_dev_lock);
- exp = list_entry(obd->obd_exports.next, struct obd_export,
- exp_obd_chain);
- spin_unlock(&obd->obd_dev_lock);
-
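+ /* The object creator now lives in the client obd (cl_oscc), so
+ * no export lookup or export-list locking is needed to read it. */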
return snprintf(page, count, "%d\n",
- exp->exp_osc_data.oed_oscc.oscc_kick_barrier);
+ obd->u.cli.cl_oscc.oscc_kick_barrier);
}
int osc_wr_create_low_wm(struct file *file, const char *buffer,
unsigned long count, void *data)
{
struct obd_device *obd = data;
- struct obd_export *exp;
int val, rc;
- if (obd == NULL || list_empty(&obd->obd_exports))
+ if (obd == NULL)
return 0;
rc = lprocfs_write_helper(buffer, count, &val);
return -ERANGE;
spin_lock(&obd->obd_dev_lock);
- exp = list_entry(obd->obd_exports.next, struct obd_export,
- exp_obd_chain);
- exp->exp_osc_data.oed_oscc.oscc_kick_barrier = val;
+ obd->u.cli.cl_oscc.oscc_kick_barrier = val;
spin_unlock(&obd->obd_dev_lock);
return count;
int *eof, void *data)
{
struct obd_device *obd = data;
- struct obd_export *exp;
- if (obd == NULL || list_empty(&obd->obd_exports))
+ if (obd == NULL)
return 0;
- spin_lock(&obd->obd_dev_lock);
- exp = list_entry(obd->obd_exports.next, struct obd_export,
- exp_obd_chain);
- spin_unlock(&obd->obd_dev_lock);
-
return snprintf(page, count, "%d\n",
- exp->exp_osc_data.oed_oscc.oscc_grow_count);
+ obd->u.cli.cl_oscc.oscc_grow_count);
}
int osc_wr_create_count(struct file *file, const char *buffer,
unsigned long count, void *data)
{
struct obd_device *obd = data;
- struct obd_export *exp;
int val, rc;
- if (obd == NULL || list_empty(&obd->obd_exports))
+ if (obd == NULL)
return 0;
rc = lprocfs_write_helper(buffer, count, &val);
if (val < 0)
return -ERANGE;
- spin_lock(&obd->obd_dev_lock);
- exp = list_entry(obd->obd_exports.next, struct obd_export,
- exp_obd_chain);
- exp->exp_osc_data.oed_oscc.oscc_grow_count = val;
- spin_unlock(&obd->obd_dev_lock);
+ obd->u.cli.cl_oscc.oscc_grow_count = val;
return count;
}
int *eof, void *data)
{
struct obd_device *obd = data;
- struct obd_export *exp;
- if (obd == NULL || list_empty(&obd->obd_exports))
+ if (obd == NULL)
return 0;
- spin_lock(&obd->obd_dev_lock);
- exp = list_entry(obd->obd_exports.next, struct obd_export,
- exp_obd_chain);
- spin_unlock(&obd->obd_dev_lock);
-
return snprintf(page, count, LPU64"\n",
- exp->exp_osc_data.oed_oscc.oscc_next_id);
+ obd->u.cli.cl_oscc.oscc_next_id);
}
int osc_rd_prealloc_last_id(char *page, char **start, off_t off, int count,
int *eof, void *data)
{
struct obd_device *obd = data;
- struct obd_export *exp;
- if (obd == NULL || list_empty(&obd->obd_exports))
+ if (obd == NULL)
return 0;
- spin_lock(&obd->obd_dev_lock);
- exp = list_entry(obd->obd_exports.next, struct obd_export,
- exp_obd_chain);
- spin_unlock(&obd->obd_dev_lock);
-
return snprintf(page, count, LPU64"\n",
- exp->exp_osc_data.oed_oscc.oscc_last_id);
+ obd->u.cli.cl_oscc.oscc_last_id);
}
static struct lprocfs_vars lprocfs_obd_vars[] = {
oscc->oscc_flags |= OSCC_FLAG_CREATING;
spin_unlock(&oscc->oscc_lock);
- request = ptlrpc_prep_req(class_exp2cliimp(oscc->oscc_exp), OST_CREATE,
+ request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import, OST_CREATE,
1, &size, NULL);
if (request == NULL) {
spin_lock(&oscc->oscc_lock);
ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
spin_unlock(&oscc->oscc_lock);
- osc_invalid = class_exp2cliimp(oscc->oscc_exp)->imp_invalid;
-
+ osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
+
return have_objs || ost_full || osc_invalid;
}
if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
rc = -ENOSPC;
- if (class_exp2cliimp(oscc->oscc_exp)->imp_invalid)
+ if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
rc = -EIO;
RETURN(rc);
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
struct lov_stripe_md *lsm;
- struct osc_creator *oscc = &exp->u.eu_osc_data.oed_oscc;
+ struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
int try_again = 1, rc = 0;
ENTRY;
LASSERT(oa);
oa->o_valid |= OBD_MD_FLID;
oa->o_id = oscc->oscc_next_id - 1;
- rc = osc_real_create(oscc->oscc_exp, oa, ea, NULL);
+ rc = osc_real_create(exp, oa, ea, NULL);
spin_lock(&oscc->oscc_lock);
if (rc == -ENOSPC)
rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
&lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == -ETIMEDOUT)
+ if (rc == -ETIMEDOUT) {
+ CDEBUG(D_HA, "%p: timed out waiting for recovery\n", oscc);
RETURN(rc);
+ }
CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
}
RETURN(rc);
}
-void oscc_init(struct obd_export *exp)
+void oscc_init(struct obd_device *obd)
{
- struct osc_export_data *oed;
+ struct osc_creator *oscc;
- if (exp == NULL)
+ if (obd == NULL)
return;
- oed = &exp->exp_osc_data;
- memset(oed, 0, sizeof(*oed));
- INIT_LIST_HEAD(&oed->oed_oscc.oscc_list);
- init_waitqueue_head(&oed->oed_oscc.oscc_waitq);
- spin_lock_init(&oed->oed_oscc.oscc_lock);
- oed->oed_oscc.oscc_exp = exp;
- oed->oed_oscc.oscc_kick_barrier = 100;
- oed->oed_oscc.oscc_grow_count = 2000;
- oed->oed_oscc.oscc_initial_create_count = 2000;
-
- oed->oed_oscc.oscc_next_id = 2;
- oed->oed_oscc.oscc_last_id = 1;
- oed->oed_oscc.oscc_flags |= OSCC_FLAG_RECOVERING;
+ oscc = &obd->u.cli.cl_oscc;
+
+ memset(oscc, 0, sizeof(*oscc));
+ INIT_LIST_HEAD(&oscc->oscc_list);
+ init_waitqueue_head(&oscc->oscc_waitq);
+ spin_lock_init(&oscc->oscc_lock);
+ oscc->oscc_obd = obd;
+ oscc->oscc_kick_barrier = 100;
+ oscc->oscc_grow_count = 2000;
+ oscc->oscc_initial_create_count = 2000;
+
+ oscc->oscc_next_id = 2;
+ oscc->oscc_last_id = 1;
+ oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
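+ /* Start out in recovery: creates stall until the last allocated
+ * object id is known. */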
/* XXX the export handle should give the oscc the last object */
/* oed->oed_oscc.oscc_last_id = exph->....; */
}