}
#ifdef __KERNEL__
-#include <linux/lustre_version.h>
-#if (LUSTRE_KERNEL_VERSION >= 30)
-#warning "FIXME: remove workaround when l30 is widely used"
char stack_backtrace[LUSTRE_TRACE_SIZE];
spinlock_t stack_backtrace_lock = SPIN_LOCK_UNLOCKED;
/* Force a breakpoint trap so an attached debugger can capture the stack;
 * returns a constant marker string for the caller to print.
 * NOTE(review): "int $3" is x86-specific -- presumably the surrounding
 * #ifdef/#elif arch conditionals guard this path; confirm. */
char *portals_debug_dumpstack(void)
{
asm("int $3");
- return "dump stack";
+ return "dump stack\n";
}
#elif defined(__i386__)
#endif /* __arch_um__ */
EXPORT_SYMBOL(stack_backtrace_lock);
EXPORT_SYMBOL(portals_debug_dumpstack);
-#endif /* LUSTRE_KERNEL_VERSION < 30 */
#endif /* __KERNEL__ */
EXPORT_SYMBOL(portals_debug_dumplog);
- reduce journal credits needed for BRW writes (2370)
- orphan handling to avoid losing space on client/server crashes
- ptlrpcd can be blocked, stopping ALL progress (2477)
+ - use lock value blocks to assist in proper KMS, faster stat (1021)
+ - takes i_sem instead of DLM locks internally on obdfilter (2720)
- recovery for initial connections (2355)
- fixes for mds_cleanup_orphans (1934)
- abort_recovery crashes MDS in b_eq (mds_unlink_orphan) (2584)
#define ll_pgcache_lock(mapping) spin_lock(&mapping->page_lock)
#define ll_pgcache_unlock(mapping) spin_unlock(&mapping->page_lock)
+#define ll_call_writepage(inode, page) \
+ (inode)->i_mapping->a_ops->writepage(page, NULL)
+#define ll_truncate_complete_page(page) \
+ truncate_complete_page(page->mapping, page)
#define ll_vfs_create(a,b,c,d) vfs_create(a,b,c,d)
current->tty = NULL;
}
-#define rb_node_s rb_node
-#define rb_root_s rb_root
-typedef struct rb_root_s rb_root_t;
-typedef struct rb_node_s rb_node_t;
-
#define smp_num_cpus NR_CPUS
#ifndef conditional_schedule
#define ll_pgcache_lock(mapping) spin_lock(&pagecache_lock)
#define ll_pgcache_unlock(mapping) spin_unlock(&pagecache_lock)
+#define ll_call_writepage(inode, page) \
+ (inode)->i_mapping->a_ops->writepage(page)
+#define ll_truncate_complete_page(page) truncate_complete_page(page)
static inline void __d_drop(struct dentry *dentry)
{
/* These are flags that are mapped into the flags and ASTs of blocking locks */
#define LDLM_AST_DISCARD_DATA 0x80000000 /* Add FL_DISCARD to blocking ASTs */
-
/* Flags sent in AST lock_flags to be mapped into the receiving lock. */
#define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA)
+/* XXX FIXME: This is being added to b_size as a low-risk fix to the fact that
+ * the LVB filling happens _after_ the lock has been granted, so another thread
+ * can match before the LVB has been updated. As a dirty hack, we set
+ * LDLM_FL_CAN_MATCH only after we have finished filling in the LVB.
+ *
+ * The proper fix is to do the granting inside of the completion AST, which can
+ * be replaced with a LVB-aware wrapping function for OSC locks. That change is
+ * pretty high-risk, though, and would need a lot more testing. */
+#define LDLM_FL_CAN_MATCH 0x100000
+
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
#define LDLM_CB_BLOCKING 1
-
*/
+struct ldlm_lock;
+struct ldlm_resource;
+struct ldlm_namespace;
+
+typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
+ void *req_cookie, ldlm_mode_t mode, int flags,
+ void *data);
+
+struct ldlm_valblock_ops {
+ int (*lvbo_init)(struct ldlm_resource *res);
+ int (*lvbo_update)(struct ldlm_resource *res, struct lustre_msg *m,
+ int buf_idx);
+};
+
struct ldlm_namespace {
char *ns_name;
__u32 ns_client; /* is this a client-side lock tree? */
spinlock_t ns_counter_lock;
__u64 ns_locks;
__u64 ns_resources;
+ ldlm_res_policy ns_policy;
+ struct ldlm_valblock_ops *ns_lvbo;
+ void *ns_lvbp;
};
/*
int flag);
typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags,
void *data);
+typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data);
struct ldlm_lock {
struct portals_handle l_handle; // must be first in the structure
ldlm_completion_callback l_completion_ast;
ldlm_blocking_callback l_blocking_ast;
+ ldlm_glimpse_callback l_glimpse_ast;
void *l_ast_data;
struct obd_export *l_export;
__u32 l_flags;
struct lustre_handle l_remote_handle;
ldlm_policy_data_t l_policy_data;
- __u32 l_version[RES_VERSION_SIZE];
+
+ /* This LVB is used only on the client side, as temporary storage for
+ * a lock value block received during an enqueue */
+ __u32 l_lvb_len;
+ void *l_lvb_data;
+ void *l_lvb_swabber;
__u32 l_readers;
__u32 l_writers;
* on this waitq to learn when it becomes granted. */
wait_queue_head_t l_waitq;
struct timeval l_enqueued_time;
+ unsigned long l_last_used; /* jiffies */
};
-typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
- void *req_cookie, ldlm_mode_t mode, int flags,
- void *data);
#define LDLM_PLAIN 10
#define LDLM_EXTENT 11
__u32 lr_type; /* LDLM_PLAIN or LDLM_EXTENT */
struct ldlm_resource *lr_root;
struct ldlm_res_id lr_name;
- __u32 lr_version[RES_VERSION_SIZE];
atomic_t lr_refcount;
+ /* Server-side-only lock value block elements */
+ struct semaphore lr_lvb_sem;
+ __u32 lr_lvb_len;
+ void *lr_lvb_data;
+
/* lr_tmp holds a list head temporarily, during the building of a work
* queue. see ldlm_add_ast_work_item and ldlm_run_ast_work */
void *lr_tmp;
#define LDLM_DEBUG_NOLOCK(format, a...) \
CDEBUG(D_DLMTRACE, "### " format "\n" , ## a)
+typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags,
+ int first_enq, ldlm_error_t *err);
+
/*
* Iterators.
*/
/* ldlm_flock.c */
int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
+/* ldlm_extent.c */
+__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
+
+
/* ldlm_lockd.c */
int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
void *data, int flag);
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data);
+int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data);
int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback,
- ldlm_blocking_callback);
+ ldlm_blocking_callback, ldlm_glimpse_callback);
int ldlm_handle_convert(struct ptlrpc_request *req);
int ldlm_handle_cancel(struct ptlrpc_request *req);
int ldlm_del_waiting_lock(struct ldlm_lock *lock);
void ldlm_put_ref(int force);
/* ldlm_lock.c */
-void ldlm_register_intent(ldlm_res_policy arg);
-void ldlm_unregister_intent(void);
+ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
+void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int flags);
void ldlm_cancel_callback(struct ldlm_lock *);
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
+void ldlm_lock_allow_match(struct ldlm_lock *lock);
int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *,
- __u32 type, void *cookie, int cookielen, ldlm_mode_t mode,
+ __u32 type, ldlm_policy_data_t *, ldlm_mode_t mode,
struct lustre_handle *);
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags);
int ldlm_cli_enqueue(struct obd_export *exp,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
- struct lustre_handle *parent_lock_handle,
struct ldlm_res_id,
__u32 type,
- void *cookie, int cookielen,
+ ldlm_policy_data_t *,
ldlm_mode_t mode,
int *flags,
+ ldlm_blocking_callback blocking,
ldlm_completion_callback completion,
- ldlm_blocking_callback callback,
+ ldlm_glimpse_callback glimpse,
void *data,
+ void *lvb,
+ __u32 lvb_len,
+ void *lvb_swabber,
struct lustre_handle *lockh);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
void *data, __u32 data_len);
int flags, void *opaque);
/* mds/handler.c */
-/* This has to be here because recurisve inclusion sucks. */
+/* This has to be here because recursive inclusion sucks. */
int intent_disposition(struct ldlm_reply *rep, int flag);
void intent_set_disposition(struct ldlm_reply *rep, int flag);
int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
extern void lustre_swab_ost_body (struct ost_body *b);
extern void lustre_swab_ost_last_id(obd_id *id);
+/* lock value block communicated between the filter and llite */
+
+struct ost_lvb {
+ __u64 lvb_size;
+ __u64 lvb_time;
+};
+
+extern void lustre_swab_ost_lvb(struct ost_lvb *);
+
/*
* MDS REQ RECORDS
*/
LDLM_CANCEL = 103,
LDLM_BL_CALLBACK = 104,
LDLM_CP_CALLBACK = 105,
+ LDLM_GL_CALLBACK = 106,
LDLM_LAST_OPC
} ldlm_cmd_t;
#define LDLM_FIRST_OPC LDLM_ENQUEUE
#define RES_NAME_SIZE 4
-#define RES_VERSION_SIZE 4
-
struct ldlm_res_id {
__u64 name[RES_NAME_SIZE];
};
extern void lustre_swab_ldlm_intent (struct ldlm_intent *i);
-/* Note this unaligned structure; as long as it's only used in ldlm_request
- * below, we're probably fine. */
struct ldlm_resource_desc {
__u32 lr_type;
+ __u32 lr_padding;
struct ldlm_res_id lr_name;
- __u32 lr_version[RES_VERSION_SIZE];
} __attribute__((packed));
extern void lustre_swab_ldlm_resource_desc (struct ldlm_resource_desc *r);
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
ldlm_policy_data_t l_policy_data;
- __u32 l_version[RES_VERSION_SIZE];
} __attribute__((packed));
extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l);
struct ldlm_request {
__u32 lock_flags;
+ __u32 lock_padding;
struct ldlm_lock_desc lock_desc;
struct lustre_handle lock_handle1;
struct lustre_handle lock_handle2;
struct ldlm_reply {
__u32 lock_flags;
- __u32 lock_mode;
- struct ldlm_res_id lock_resource_name;
+ struct ldlm_lock_desc lock_desc;
struct lustre_handle lock_handle;
- ldlm_policy_data_t lock_policy_data;
__u64 lock_policy_res1;
__u64 lock_policy_res2;
};
int lustre_unpack_msg(struct lustre_msg *m, int len);
void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen);
char *lustre_msg_string (struct lustre_msg *m, int n, int max_len);
+void *lustre_swab_buf(struct lustre_msg *, int n, int minlen, void *swabber);
void *lustre_swab_reqbuf (struct ptlrpc_request *req, int n, int minlen,
void *swabber);
void *lustre_swab_repbuf (struct ptlrpc_request *req, int n, int minlen,
struct loi_oap_pages loi_write_lop;
/* _cli_ is poorly named, it should be _ready_ */
struct list_head loi_cli_item;
- struct list_head loi_write_item;
+ struct list_head loi_write_item;
+
+ __u64 loi_kms; /* known minimum size */
+ __u64 loi_rss; /* recently seen size */
+ __u64 loi_mtime; /* recently seen mtime */
};
static inline void loi_init(struct lov_oinfo *loi)
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *local,
struct obd_trans_info *oti);
- int (*o_enqueue)(struct obd_export *exp, struct lov_stripe_md *md,
- struct lustre_handle *parent_lock,
- __u32 type, void *cookie, int cookielen, __u32 mode,
- int *flags, void *cb, void *data,
+ int (*o_enqueue)(struct obd_export *, struct lov_stripe_md *,
+ __u32 type, ldlm_policy_data_t *, __u32 mode,
+ int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
+ void *data, __u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh);
- int (*o_match)(struct obd_export *exp, struct lov_stripe_md *md,
- __u32 type, void *cookie, int cookielen, __u32 mode,
- int *flags, void *data, struct lustre_handle *lockh);
- int (*o_change_cbdata)(struct obd_export *exp,
- struct lov_stripe_md *lsm, ldlm_iterator_t it,
- void *data);
+ int (*o_match)(struct obd_export *, struct lov_stripe_md *, __u32 type,
+ ldlm_policy_data_t *, __u32 mode, int *flags, void *data,
+ struct lustre_handle *lockh);
+ int (*o_change_cbdata)(struct obd_export *, struct lov_stripe_md *,
+ ldlm_iterator_t it, void *data);
int (*o_cancel)(struct obd_export *, struct lov_stripe_md *md,
__u32 mode, struct lustre_handle *);
int (*o_cancel_unused)(struct obd_export *, struct lov_stripe_md *,
int count, struct llog_logid *logid);
int (*o_llog_finish)(struct obd_device *obd, int count);
- /* only until proper file size mechanics arrive */
- int (*o_lock_contains)(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct ldlm_lock *lock, obd_off offset);
-
/* metadata-only methods */
int (*o_pin)(struct obd_export *, obd_id ino, __u32 gen, int type,
struct obd_client_handle *, int flag);
* to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c.
* Also, add a wrapper function in include/linux/obd_class.h.
*/
-
};
RETURN(rc);
}
-static inline int obd_enqueue(struct obd_export *exp,
- struct lov_stripe_md *ea,
- struct lustre_handle *parent_lock,
- __u32 type, void *cookie, int cookielen,
- __u32 mode, int *flags, void *cb, void *data,
- struct lustre_handle *lockh)
+static inline int obd_enqueue(struct obd_export *exp, struct lov_stripe_md *ea,
+ __u32 type, ldlm_policy_data_t *policy,
+ __u32 mode, int *flags, void *bl_cb, void *cp_cb,
+ void *gl_cb, void *data, __u32 lvb_len,
+ void *lvb_swabber, struct lustre_handle *lockh)
{
int rc;
ENTRY;
EXP_CHECK_OP(exp, enqueue);
OBD_COUNTER_INCREMENT(exp->exp_obd, enqueue);
- rc = OBP(exp->exp_obd, enqueue)(exp, ea, parent_lock, type,
- cookie, cookielen, mode, flags, cb,
- data, lockh);
+ rc = OBP(exp->exp_obd, enqueue)(exp, ea, type, policy, mode, flags,
+ bl_cb, cp_cb, gl_cb, data, lvb_len,
+ lvb_swabber, lockh);
RETURN(rc);
}
-static inline int obd_match(struct obd_export *exp,
- struct lov_stripe_md *ea, __u32 type, void *cookie,
- int cookielen, __u32 mode, int *flags, void *data,
- struct lustre_handle *lockh)
+static inline int obd_match(struct obd_export *exp, struct lov_stripe_md *ea,
+ __u32 type, ldlm_policy_data_t *policy, __u32 mode,
+ int *flags, void *data, struct lustre_handle *lockh)
{
int rc;
ENTRY;
EXP_CHECK_OP(exp, match);
OBD_COUNTER_INCREMENT(exp->exp_obd, match);
- rc = OBP(exp->exp_obd, match)(exp, ea, type, cookie, cookielen, mode,
- flags, data, lockh);
+ rc = OBP(exp->exp_obd, match)(exp, ea, type, policy, mode, flags, data,
+ lockh);
RETURN(rc);
}
-
static inline int obd_change_cbdata(struct obd_export *exp,
struct lov_stripe_md *lsm,
ldlm_iterator_t it, void *data)
return(rc);
}
-static inline int obd_lock_contains(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct ldlm_lock *lock, obd_off offset)
-{
- int rc;
- ENTRY;
-
- EXP_CHECK_OP(exp, lock_contains);
- OBD_COUNTER_INCREMENT(exp->exp_obd, lock_contains);
-
- rc = OBP(exp->exp_obd, lock_contains)(exp, lsm, lock, offset);
- RETURN(rc);
-}
-
static inline void obd_invalidate_import(struct obd_device *obd,
struct obd_import *imp)
{
struct ec_object *ecl_object;
__u64 ecl_cookie;
struct lustre_handle ecl_lock_handle;
- struct ldlm_extent ecl_extent;
+ ldlm_policy_data_t ecl_policy;
__u32 ecl_mode;
};
#define OBD_FAIL_LDLM_CANCEL 0x304
#define OBD_FAIL_LDLM_BL_CALLBACK 0x305
#define OBD_FAIL_LDLM_CP_CALLBACK 0x306
+#define OBD_FAIL_LDLM_GL_CALLBACK 0x307
#define OBD_FAIL_OSC 0x400
#define OBD_FAIL_OSC_BRW_READ_BULK 0x401
+++ /dev/null
-/*
- Red Black Trees
- (C) 1999 Andrea Arcangeli <andrea@suse.de>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
- linux/include/linux/rbtree.h
-
- To use rbtrees you'll have to implement your own insert and search cores.
- This will avoid us to use callbacks and to drop drammatically performances.
- I know it's not the cleaner way, but in C (not in C++) to get
- performances and genericity...
-
- Some example of insert and search follows here. The search is a plain
- normal search over an ordered tree. The insert instead must be implemented
- int two steps: as first thing the code must insert the element in
- order as a red leaf in the tree, then the support library function
- rb_insert_color() must be called. Such function will do the
- not trivial work to rebalance the rbtree if necessary.
-
------------------------------------------------------------------------
-static inline struct page * rb_search_page_cache(struct inode * inode,
- unsigned long offset)
-{
- rb_node_t * n = inode->i_rb_page_cache.rb_node;
- struct page * page;
-
- while (n)
- {
- page = rb_entry(n, struct page, rb_page_cache);
-
- if (offset < page->offset)
- n = n->rb_left;
- else if (offset > page->offset)
- n = n->rb_right;
- else
- return page;
- }
- return NULL;
-}
-
-static inline struct page * __rb_insert_page_cache(struct inode * inode,
- unsigned long offset,
- rb_node_t * node)
-{
- rb_node_t ** p = &inode->i_rb_page_cache.rb_node;
- rb_node_t * parent = NULL;
- struct page * page;
-
- while (*p)
- {
- parent = *p;
- page = rb_entry(parent, struct page, rb_page_cache);
-
- if (offset < page->offset)
- p = &(*p)->rb_left;
- else if (offset > page->offset)
- p = &(*p)->rb_right;
- else
- return page;
- }
-
- rb_link_node(node, parent, p);
-
- return NULL;
-}
-
-static inline struct page * rb_insert_page_cache(struct inode * inode,
- unsigned long offset,
- rb_node_t * node)
-{
- struct page * ret;
- if ((ret = __rb_insert_page_cache(inode, offset, node)))
- goto out;
- rb_insert_color(node, &inode->i_rb_page_cache);
- out:
- return ret;
-}
------------------------------------------------------------------------
-*/
-
-#ifndef _LINUX_RBTREE_H
-#define _LINUX_RBTREE_H
-
-typedef struct rb_node_s
-{
- struct rb_node_s * rb_parent;
- int rb_color;
-#define RB_RED 0
-#define RB_BLACK 1
- struct rb_node_s * rb_right;
- struct rb_node_s * rb_left;
-}
-rb_node_t;
-
-typedef struct rb_root_s
-{
- struct rb_node_s * rb_node;
-}
-rb_root_t;
-
-#define RB_ROOT (rb_root_t) { NULL, }
-#define rb_entry(ptr, type, member) \
- ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
-
-extern void rb_insert_color(rb_node_t *, rb_root_t *);
-extern void rb_erase(rb_node_t *, rb_root_t *);
-extern rb_node_t *rb_get_first(rb_root_t *root);
-extern rb_node_t *rb_get_next(rb_node_t *n);
-
-static inline void rb_link_node(rb_node_t * node, rb_node_t * parent, rb_node_t ** rb_link)
-{
- node->rb_parent = parent;
- node->rb_color = RB_RED;
- node->rb_left = node->rb_right = NULL;
-
- *rb_link = node;
-}
-
-#endif /* _LINUX_RBTREE_H */
+++ 25/arch/parisc/lib/checksum.c 2003-10-05 00:33:23.000000000 -0700
@@ -16,8 +16,10 @@
*
- * $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+ * $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
*/
-#include <net/checksum.h>
+#include <linux/module.h>
--- linux-2.6.0-test6/drivers/char/ftape/compressor/zftape-compress.c 2003-06-14 12:18:32.000000000 -0700
+++ 25/drivers/char/ftape/compressor/zftape-compress.c 2003-10-05 00:33:24.000000000 -0700
@@ -31,6 +31,7 @@
- char zftc_rev[] = "$Revision: 1.4 $";
- char zftc_dat[] = "$Date: 2004/02/14 03:14:33 $";
+ char zftc_rev[] = "$Revision: 1.5 $";
+ char zftc_dat[] = "$Date: 2004/02/23 23:37:02 $";
+#include <linux/version.h>
#include <linux/errno.h>
--- linux-2.6.0-test6/drivers/isdn/hardware/eicon/divamnt.c 2003-09-27 18:57:44.000000000 -0700
+++ 25/drivers/isdn/hardware/eicon/divamnt.c 2003-10-05 00:33:24.000000000 -0700
@@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $
*
* Driver for Eicon DIVA Server ISDN cards.
* Maint module
-#include "di_defs.h"
#include "debug_if.h"
--static char *main_revision = "$Revision: 1.4 $";
-+static char *main_revision = "$Revision: 1.4 $";
+-static char *main_revision = "$Revision: 1.5 $";
++static char *main_revision = "$Revision: 1.5 $";
static int major;
--- linux-2.6.0-test6/drivers/isdn/hardware/eicon/divasmain.c 2003-09-27 18:57:44.000000000 -0700
+++ 25/drivers/isdn/hardware/eicon/divasmain.c 2003-10-05 00:33:24.000000000 -0700
@@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $
*
* Low level driver for Eicon DIVA Server ISDN cards.
*
#include "diva_dma.h"
#include "diva_pci.h"
--static char *main_revision = "$Revision: 1.4 $";
-+static char *main_revision = "$Revision: 1.4 $";
+-static char *main_revision = "$Revision: 1.5 $";
++static char *main_revision = "$Revision: 1.5 $";
static int major;
--- linux-2.6.0-test6/drivers/isdn/hardware/eicon/dqueue.c 2003-06-14 12:18:22.000000000 -0700
+++ 25/drivers/isdn/hardware/eicon/dqueue.c 2003-10-05 00:33:24.000000000 -0700
@@ -1,10 +1,10 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $
*
* Driver for Eicon DIVA Server ISDN cards.
* User Mode IDI Interface
--- linux-2.6.0-test6/drivers/isdn/hardware/eicon/mntfunc.c 2003-09-27 18:57:44.000000000 -0700
+++ 25/drivers/isdn/hardware/eicon/mntfunc.c 2003-10-05 00:33:24.000000000 -0700
@@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
++/* Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
*
* Driver for Eicon DIVA Server ISDN cards.
* Maint module
--- linux-2.6.0-test6/drivers/isdn/hardware/eicon/os_capi.h 2003-06-14 12:18:25.000000000 -0700
+++ 25/drivers/isdn/hardware/eicon/os_capi.h 2003-10-05 00:33:24.000000000 -0700
@@ -1,10 +1,10 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $
*
* ISDN interface module for Eicon active cards DIVA.
* CAPI Interface OS include files
--- linux-2.6.0-test6/drivers/isdn/hardware/eicon/platform.h 2003-09-27 18:57:44.000000000 -0700
+++ 25/drivers/isdn/hardware/eicon/platform.h 2003-10-05 00:33:24.000000000 -0700
@@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
++/* Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $
*
* platform.h
*
+++ 25/drivers/media/video/planb.c 2003-10-05 00:33:24.000000000 -0700
@@ -27,7 +27,6 @@
- /* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ */
+ /* $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ */
-#include <linux/version.h>
#include <linux/init.h>
--- linux-2.6.0-test6/drivers/mtd/chips/map_rom.c 2003-06-14 12:18:24.000000000 -0700
+++ 25/drivers/mtd/chips/map_rom.c 2003-10-05 00:33:24.000000000 -0700
@@ -4,7 +4,6 @@
- * $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
+ * $Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $
*/
-#include <linux/version.h>
#include <linux/hdlc.h>
/* Version */
--static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ for Linux\n";
-+static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ for Linux\n";
+-static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.5 2004/02/23 23:37:02 phil Exp $ for Linux\n";
++static const char version[] = "Id: 2.6.0-test6-mm4.patch,v 1.3.2.1 2004/02/14 07:21:32 nic Exp $ for Linux\n";
static int debug;
static int quartz;
-$Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $
-
Index: linux/fs/exec.c
===================================================================
--- linux.orig/fs/exec.c 2003-09-03 17:52:00.000000000 -0400
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
-+ * $Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $
++ * Id: bproc-patch-2.4.20,v 1.3.2.1 2004/02/14 07:21:44 nic Exp $
+ *-----------------------------------------------------------------------*/
+#include <linux/kernel.h>
+#include <linux/sched.h>
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
-+ * $Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $
++ * Id: bproc-patch-2.4.20,v 1.3.2.1 2004/02/14 07:21:44 nic Exp $
+ *-----------------------------------------------------------------------*/
+#ifndef _LINUX_BPROC_H
+#define _LINUX_BPROC_H
static unsigned long next_msg;
if (l_has_lock(&ns->ns_lock) && time_after(jiffies, next_msg)) {
- CERROR("namespace %s lock held during RPCs; tell phil\n",
+ CERROR("namespace %s lock held illegally; tell phil\n",
ns->ns_name);
#if (LUSTRE_KERNEL_VERSION >= 30)
CERROR(portals_debug_dumpstack());
ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
if (new_ex.start != lock->l_policy_data.l_extent.start ||
- new_ex.end != lock->l_policy_data.l_extent.end) {
+ new_ex.end != lock->l_policy_data.l_extent.end) {
*flags |= LDLM_FL_LOCK_CHANGED;
lock->l_policy_data.l_extent.start = new_ex.start;
lock->l_policy_data.l_extent.end = new_ex.end;
}
RETURN(0);
}
+
+/* When a lock is cancelled by a client, the KMS may undergo change if this
+ * is the "highest lock". This function returns the new KMS value.
+ *
+ * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
+__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ struct list_head *tmp;
+ struct ldlm_lock *lck;
+ __u64 kms = 0;
+ ENTRY;
+
+ /* Walk the granted queue under the namespace lock. */
+ l_lock(&res->lr_namespace->ns_lock);
+ list_for_each(tmp, &res->lr_granted) {
+ lck = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ /* Skip the lock that is being cancelled. */
+ if (lock == lck)
+ continue;
+ /* Another granted lock still reaches old_kms: KMS is unchanged. */
+ if (lck->l_policy_data.l_extent.end >= old_kms)
+ GOTO(out, kms = old_kms);
+ /* A lock on [start,end] protects bytes up to end, so the KMS
+ * can be at most end + 1. */
+ kms = lck->l_policy_data.l_extent.end + 1;
+ }
+
+ GOTO(out, kms);
+ out:
+ l_unlock(&res->lr_namespace->ns_lock);
+ return kms;
+}
* release the ns_lock, allocate the new lock,
* and restart processing this lock. */
new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
- lock->l_granted_mode, NULL, NULL, NULL);
+ lock->l_granted_mode, NULL, NULL, NULL,
+ NULL, 0);
if (!new2) {
ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
*err = -ENOLCK;
ldlm_lock_create(struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
__u32 type, ldlm_mode_t, ldlm_blocking_callback,
- ldlm_completion_callback, void *data);
+ ldlm_completion_callback, ldlm_glimpse_callback, void *data,
+ __u32 lvb_len);
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
- void *cookie, int cookie_len, int *flags);
+ void *cookie, int *flags);
void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
-typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags,
- int first_enq, ldlm_error_t *err);
-
/* ldlm_plain.c */
int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
ldlm_error_t *err);
#endif
};
-static ldlm_res_policy ldlm_intent_policy_func;
-
-void ldlm_register_intent(ldlm_res_policy arg)
+/* Return the lock-processing policy for this resource's type
+ * (indexed by lr_type into ldlm_processing_policy_table). */
+ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
- ldlm_intent_policy_func = arg;
+ return ldlm_processing_policy_table[res->lr_type];
}
-void ldlm_unregister_intent(void)
+/* Register the intent policy on the namespace itself, replacing the
+ * old single file-scope ldlm_intent_policy_func global. */
+void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
- ldlm_intent_policy_func = NULL;
+ ns->ns_policy = arg;
}
/*
if (lock->l_parent)
LDLM_LOCK_PUT(lock->l_parent);
+ if (lock->l_lvb_data != NULL)
+ OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
+
OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
l_unlock(&ns->ns_lock);
}
desc->l_granted_mode = lock->l_granted_mode;
memcpy(&desc->l_policy_data, &lock->l_policy_data,
sizeof(desc->l_policy_data));
- memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
}
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
lock->l_readers++;
else
lock->l_writers++;
+ lock->l_last_used = jiffies;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_LOCK_GET(lock);
LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
(lock->l_flags & LDLM_FL_CBPENDING)) {
/* If we received a blocked AST and this was the last reference,
* run the callback. */
- if (!ns->ns_client && lock->l_export)
+ if (ns->ns_client == LDLM_NAMESPACE_SERVER && lock->l_export)
CERROR("FL_CBPENDING set on non-local lock--just a "
"warning\n");
if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
LDLM_CB_BLOCKING);
- } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
+ } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
+ !lock->l_readers && !lock->l_writers) {
/* If this is a client-side namespace and this was the last
* reference, put it on the LRU. */
LASSERT(list_empty(&lock->l_lru));
/* returns a referenced lock or NULL. See the flag descriptions below, in the
* comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
- struct ldlm_extent *extent,
+ ldlm_policy_data_t *policy,
struct ldlm_lock *old_lock, int flags)
{
struct ldlm_lock *lock;
continue;
if (lock->l_resource->lr_type == LDLM_EXTENT &&
- (lock->l_policy_data.l_extent.start > extent->start ||
- lock->l_policy_data.l_extent.end < extent->end))
+ (lock->l_policy_data.l_extent.start >
+ policy->l_extent.start ||
+ lock->l_policy_data.l_extent.end < policy->l_extent.end))
continue;
if (lock->l_destroyed)
return NULL;
}
+/* Set LDLM_FL_CAN_MATCH under the namespace lock and wake any threads
+ * sleeping on l_waitq in ldlm_lock_match(); called once the lock's LVB
+ * has been filled in -- see the CAN_MATCH comment in lustre_dlm.h. */
+void ldlm_lock_allow_match(struct ldlm_lock *lock)
+{
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock->l_flags |= LDLM_FL_CAN_MATCH;
+ wake_up(&lock->l_waitq);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+}
+
/* Can be called in two ways:
*
* If 'ns' is NULL, then lockh describes an existing lock that we want to look
* case, lockh is filled in with a addref()ed lock
*/
int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
- struct ldlm_res_id *res_id, __u32 type, void *cookie,
- int cookielen, ldlm_mode_t mode,
+ struct ldlm_res_id *res_id, __u32 type,
+ ldlm_policy_data_t *policy, ldlm_mode_t mode,
struct lustre_handle *lockh)
{
struct ldlm_resource *res;
l_lock(&ns->ns_lock);
- lock = search_queue(&res->lr_granted, mode, cookie, old_lock, flags);
+ lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
if (lock != NULL)
GOTO(out, rc = 1);
if (flags & LDLM_FL_BLOCK_GRANTED)
GOTO(out, rc = 0);
- lock = search_queue(&res->lr_converting, mode, cookie, old_lock, flags);
+ lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
if (lock != NULL)
GOTO(out, rc = 1);
- lock = search_queue(&res->lr_waiting, mode, cookie, old_lock, flags);
+ lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
if (lock != NULL)
GOTO(out, rc = 1);
l_unlock(&ns->ns_lock);
if (lock) {
+ struct l_wait_info lwi;
ldlm_lock2handle(lock, lockh);
if (lock->l_completion_ast)
lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
NULL);
+
+ lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
+
+ /* XXX FIXME see comment about CAN_MATCH in lustre_dlm.h */
+ l_wait_event(lock->l_waitq,
+ (lock->l_flags & LDLM_FL_CAN_MATCH), &lwi);
}
if (rc)
LDLM_DEBUG(lock, "matched");
ldlm_mode_t mode,
ldlm_blocking_callback blocking,
ldlm_completion_callback completion,
- void *data)
+ ldlm_glimpse_callback glimpse,
+ void *data, __u32 lvb_len)
{
struct ldlm_resource *res, *parent_res = NULL;
struct ldlm_lock *lock, *parent_lock = NULL;
lock->l_ast_data = data;
lock->l_blocking_ast = blocking;
lock->l_completion_ast = completion;
+ lock->l_glimpse_ast = glimpse;
+
+ lock->l_lvb_len = lvb_len;
+ OBD_ALLOC(lock->l_lvb_data, lvb_len);
+ if (lock->l_lvb_data == NULL) {
+ OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
+ RETURN(NULL);
+ }
RETURN(lock);
}
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
struct ldlm_lock **lockp,
- void *cookie, int cookie_len, int *flags)
+ void *cookie, int *flags)
{
struct ldlm_lock *lock = *lockp;
struct ldlm_resource *res = lock->l_resource;
ldlm_error_t rc = ELDLM_OK;
ENTRY;
- if (res->lr_type != LDLM_PLAIN)
- memcpy(&lock->l_policy_data, cookie, cookie_len);
-
/* policies are not executed on the client or during replay */
if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
- && !local && ldlm_intent_policy_func) {
- rc = ldlm_intent_policy_func(ns, lockp, cookie,
- lock->l_req_mode, *flags, NULL);
+ && !local && ns->ns_policy) {
+ rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
+ NULL);
if (rc == ELDLM_LOCK_REPLACED) {
/* The lock that was returned has already been granted,
- * and placed into lockp. Destroy the old one and our
+ * and placed into lockp. If it's not the same as the
+ * one we passed in, then destroy the old one and our
* work here is done. */
- ldlm_lock_destroy(lock);
- LDLM_LOCK_PUT(lock);
+ if (lock != *lockp) {
+ ldlm_lock_destroy(lock);
+ LDLM_LOCK_PUT(lock);
+ }
*flags |= LDLM_FL_LOCK_CHANGED;
RETURN(0);
} else if (rc == ELDLM_LOCK_ABORTED ||
if (!((portal_debug | D_ERROR) & level))
return;
- if (RES_VERSION_SIZE != 4)
- LBUG();
-
if (!lock) {
CDEBUG(level, " NULL LDLM lock\n");
return;
}
- CDEBUG(level,
- " -- Lock dump: %p/"LPX64" (%x %x %x %x) (rc: %d) (pos: %d)\n",
- lock, lock->l_handle.h_cookie, lock->l_version[0],
- lock->l_version[1], lock->l_version[2], lock->l_version[3],
- atomic_read(&lock->l_refc), pos);
+ CDEBUG(level, " -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d)\n",
+ lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
+ pos);
if (lock->l_conn_export != NULL)
obd = lock->l_conn_export->exp_obd;
if (lock->l_export && lock->l_export->exp_connection) {
req->rq_send_state = LUSTRE_IMP_FULL;
req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
rc = ptlrpc_queue_wait(req);
- if (rc == -ETIMEDOUT || rc == -EINTR) {
+ if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
LASSERT(lock->l_export);
if (lock->l_export->exp_libclient) {
CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid "
struct ptlrpc_request *req;
struct timeval granted_time;
long total_enqueue_wait;
- int rc = 0, size = sizeof(*body);
+ int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
ENTRY;
- if (lock == NULL) {
- LBUG();
- RETURN(-EINVAL);
- }
+ LASSERT(lock != NULL);
do_gettimeofday(&granted_time);
total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
if (total_enqueue_wait / 1000000 > obd_timeout)
LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
+ if (lock->l_resource->lr_lvb_len) {
+ buffers = 2;
+ size[1] = lock->l_resource->lr_lvb_len;
+ }
+
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
- LDLM_CP_CALLBACK, 1, &size, NULL);
- if (!req)
+ LDLM_CP_CALLBACK, buffers, size, NULL);
+ if (req == NULL)
RETURN(-ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
body->lock_flags = flags;
ldlm_lock2desc(lock, &body->lock_desc);
+ if (buffers == 2) {
+ void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
+ lock->l_resource->lr_lvb_len);
+ memcpy(lvb, lock->l_resource->lr_lvb_data,
+ lock->l_resource->lr_lvb_len);
+ }
+
LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
total_enqueue_wait);
req->rq_replen = lustre_msg_size(0, NULL);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
rc = ptlrpc_queue_wait(req);
- if (rc == -ETIMEDOUT || rc == -EINTR) {
+ if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
ldlm_del_waiting_lock(lock);
ldlm_failed_ast(lock, rc, "completion");
} else if (rc) {
RETURN(rc);
}
+/* Send a glimpse AST (LDLM_GL_CALLBACK) to the client holding this lock
+ * and refresh the resource's LVB from the client's reply.
+ *
+ * On timeout/interrupt/disconnect the AST is failed via ldlm_failed_ast();
+ * on any other client error the lock is cancelled; on success the
+ * namespace's lvbo_update() consumes the reply buffer.
+ * Returns 0 on success or a negative errno / RPC rc. */
+int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+ struct ldlm_resource *res;
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ int rc = 0, size = sizeof(*body);
+ ENTRY;
+
+ /* assert before the first dereference of 'lock' */
+ LASSERT(lock != NULL);
+ res = lock->l_resource;
+
+ req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
+ LDLM_GL_CALLBACK, 1, &size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+ ldlm_lock2desc(lock, &body->lock_desc);
+
+ /* reply carries the client's LVB for this resource */
+ size = lock->l_resource->lr_lvb_len;
+ req->rq_replen = lustre_msg_size(1, &size);
+
+ req->rq_send_state = LUSTRE_IMP_FULL;
+ req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+
+ rc = ptlrpc_queue_wait(req);
+ if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+ ldlm_del_waiting_lock(lock);
+ ldlm_failed_ast(lock, rc, "glimpse");
+ } else if (rc) {
+ /* was "completion AST" -- copy-paste from the completion
+ * sender; this is the glimpse AST path */
+ LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+ "glimpse AST\n", rc, req->rq_status);
+ ldlm_lock_cancel(lock);
+ } else {
+ /* NOTE(review): assumes server namespaces always register
+ * an lvbo with lvbo_update (no NULL check here, unlike
+ * ldlm_handle_cancel) -- confirm */
+ rc = res->lr_namespace->ns_lvbo->lvbo_update(res,
+ req->rq_repmsg, 0);
+ }
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
+
int ldlm_handle_enqueue(struct ptlrpc_request *req,
ldlm_completion_callback completion_callback,
- ldlm_blocking_callback blocking_callback)
+ ldlm_blocking_callback blocking_callback,
+ ldlm_glimpse_callback glimpse_callback)
{
struct obd_device *obddev = req->rq_export->exp_obd;
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
- int rc, size = sizeof(*dlm_rep), cookielen = 0;
+ int rc, size[2] = {sizeof(*dlm_rep)};
__u32 flags;
ldlm_error_t err;
struct ldlm_lock *lock = NULL;
}
flags = dlm_req->lock_flags;
- if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
- (flags & LDLM_FL_HAS_INTENT)) {
- /* In this case, the reply buffer is allocated deep in
- * local_lock_enqueue by the policy function. */
- cookie = req;
- cookielen = sizeof(*req);
- } else {
- rc = lustre_pack_reply(req, 1, &size, NULL);
- if (rc) {
- CERROR("out of memory\n");
- RETURN(-ENOMEM);
- }
- if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
- cookie = &dlm_req->lock_desc.l_policy_data;
- cookielen = sizeof(ldlm_policy_data_t);
- }
- }
/* The lock's callback data might be set in the policy function */
- lock = ldlm_lock_create(obddev->obd_namespace,
- &dlm_req->lock_handle2,
+ lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
dlm_req->lock_desc.l_req_mode,
- blocking_callback, completion_callback, NULL);
+ blocking_callback, completion_callback,
+ glimpse_callback, NULL, 0);
if (!lock)
GOTO(out, err = -ENOMEM);
&lock->l_export->exp_ldlm_data.led_held_locks);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
- &flags);
+ if (flags & LDLM_FL_HAS_INTENT) {
+ /* In this case, the reply buffer is allocated deep in
+ * local_lock_enqueue by the policy function. */
+ cookie = req;
+ } else {
+ int buffers = 1;
+ if (lock->l_resource->lr_lvb_len) {
+ size[1] = lock->l_resource->lr_lvb_len;
+ buffers = 2;
+ }
+
+ rc = lustre_pack_reply(req, buffers, size, NULL);
+ if (rc)
+ RETURN(rc);
+ }
+
+ if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
+ memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
+ sizeof(ldlm_policy_data_t));
+
+ err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
if (err)
GOTO(out, err);
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
dlm_rep->lock_flags = flags;
+ ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
- if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
- memcpy(&dlm_rep->lock_policy_data, &lock->l_policy_data,
- cookielen);
- }
- if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
- memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
- sizeof(dlm_rep->lock_resource_name));
- dlm_rep->lock_mode = lock->l_req_mode;
- }
/* We never send a blocking AST until the lock is granted, but
* we can tell it right now */
EXIT;
out:
+ if (lock->l_resource->lr_lvb_len > 0) {
+ void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+ lock->l_resource->lr_lvb_len);
+ memcpy(lvb, lock->l_resource->lr_lvb_data,
+ lock->l_resource->lr_lvb_len);
+ }
req->rq_status = err;
/* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
{
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
+ struct ldlm_resource *res;
char str[PTL_NALFMT_SIZE];
int rc;
ENTRY;
- dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+ dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
lustre_swab_ldlm_request);
if (dlm_req == NULL) {
CERROR("bad request buffer for cancel\n");
req->rq_status = ESTALE;
} else {
LDLM_DEBUG(lock, "server-side cancel handler START");
+ res = lock->l_resource;
+ if (res && res->lr_namespace->ns_lvbo &&
+ res->lr_namespace->ns_lvbo->lvbo_update) {
+ (void)res->lr_namespace->ns_lvbo->lvbo_update
+ (res, NULL, 0);
+ //(res, req->rq_reqmsg, 1);
+ }
+
ldlm_lock_cancel(lock);
if (ldlm_del_waiting_lock(lock))
CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
- req->rq_status = 0;
+ req->rq_status = rc;
}
if (ptlrpc_reply(req) != 0)
LDLM_DEBUG(lock, "completion AST includes blocking AST");
}
+ if (lock->l_lvb_len) {
+ void *lvb;
+ lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
+ lock->l_lvb_swabber);
+ if (lvb == NULL) {
+ LDLM_ERROR(lock, "completion AST did not contain "
+ "expected LVB!");
+ } else {
+ memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
+ }
+ }
+
lock->l_resource->lr_tmp = &ast_list;
ldlm_grant_lock(lock, req, sizeof(*req), 1);
lock->l_resource->lr_tmp = NULL;
EXIT;
}
+/* Client-side handler for a server glimpse AST (LDLM_GL_CALLBACK).
+ *
+ * Runs the lock's l_glimpse_ast with the namespace lock dropped (it must
+ * not be held across the callback -- see l_check_no_ns_lock).
+ * NOTE(review): the dispatcher only auto-replies for CP callbacks, so the
+ * glimpse AST itself presumably sends the reply via 'req' -- confirm.
+ *
+ * As an optimisation, an unused PW lock that has been idle for more than
+ * 10s is handed to ldlm_handle_bl_callback() for early cancellation
+ * (which appears to consume the caller's lock reference -- hence no
+ * LDLM_LOCK_PUT on that path); otherwise the reference is dropped here. */
+static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct ldlm_request *dlm_req,
+ struct ldlm_lock *lock)
+{
+ ENTRY;
+
+ l_lock(&ns->ns_lock);
+ LDLM_DEBUG(lock, "client glimpse AST callback handler");
+
+ if (lock->l_glimpse_ast != NULL) {
+ /* drop ns_lock across the callback; it may block */
+ l_unlock(&ns->ns_lock);
+ l_check_no_ns_lock(ns);
+ lock->l_glimpse_ast(lock, req);
+ l_lock(&ns->ns_lock);
+ }
+
+ /* early-cancel an unused PW lock idle for > 10s */
+ if (lock->l_granted_mode == LCK_PW &&
+ !lock->l_readers && !lock->l_writers &&
+ time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+ l_unlock(&ns->ns_lock);
+ ldlm_handle_bl_callback(ns, NULL, lock);
+ EXIT;
+ return;
+ }
+
+ l_unlock(&ns->ns_lock);
+ LDLM_LOCK_PUT(lock);
+ EXIT;
+}
+
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
req->rq_status = rc;
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
} else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+ } else if (req->rq_reqmsg->opc == LDLM_GL_CALLBACK) {
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
} else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
} else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
* cancelling right now, because it's unused, or have an intent result
* in the reply, so we might have to push the responsibility for sending
* the reply down into the AST handlers, alas. */
- if (req->rq_reqmsg->opc != LDLM_BL_CALLBACK)
+ if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK)
ldlm_callback_reply(req, 0);
switch (req->rq_reqmsg->opc) {
CDEBUG(D_INODE, "completion ast\n");
ldlm_handle_cp_callback(req, ns, dlm_req, lock);
break;
+ case LDLM_GL_CALLBACK:
+ CDEBUG(D_INODE, "glimpse ast\n");
+ ldlm_handle_gl_callback(req, ns, dlm_req, lock);
+ break;
default:
LBUG(); /* checked above */
}
/* ldlm_flock.c */
EXPORT_SYMBOL(ldlm_flock_completion_ast);
+/* ldlm_extent.c */
+EXPORT_SYMBOL(ldlm_extent_shift_kms);
+
/* ldlm_lock.c */
+EXPORT_SYMBOL(ldlm_get_processing_policy);
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
-EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match);
/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
+EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_resource_get);
+EXPORT_SYMBOL(ldlm_resource_putref);
/* l_lock.c */
EXPORT_SYMBOL(l_lock);
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
- /* XXX ALLOCATE - 160 mytes */
+ /* XXX ALLOCATE - 160 bytes */
struct lock_wait_data lwd;
unsigned long irqflags;
struct obd_device *obd;
struct obd_import *imp = NULL;
- int rc = 0;
struct l_wait_info lwi;
+ int rc = 0;
ENTRY;
if (flags == LDLM_FL_WAIT_NOREPROC)
goto noreproc;
- if (flags == 0) {
+ if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_BLOCK_CONV))) {
wake_up(&lock->l_waitq);
RETURN(0);
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV)))
- RETURN(0);
-
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
"sleeping");
ldlm_lock_dump(D_OTHER, lock, 0);
}
static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
- struct lustre_handle *parent_lockh,
struct ldlm_res_id res_id,
__u32 type,
- void *cookie, int cookielen,
+ ldlm_policy_data_t *policy,
ldlm_mode_t mode,
int *flags,
- ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
- void *data,
+ ldlm_completion_callback completion,
+ ldlm_glimpse_callback glimpse,
+ void *data, __u32 lvb_len,
+ void *lvb_swabber,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
LBUG();
}
- lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
- blocking, completion, data);
+ lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
+ completion, glimpse, data, lvb_len);
if (!lock)
GOTO(out_nolock, err = -ENOMEM);
LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
lock->l_flags |= LDLM_FL_LOCAL;
+ lock->l_lvb_swabber = lvb_swabber;
+ if (policy != NULL)
+ memcpy(&lock->l_policy_data, policy, sizeof(*policy));
- err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
+ err = ldlm_lock_enqueue(ns, &lock, policy, flags);
if (err != ELDLM_OK)
GOTO(out, err);
- if (type != LDLM_PLAIN)
- memcpy(cookie, &lock->l_policy_data, cookielen);
+ if (policy != NULL)
+ memcpy(policy, &lock->l_policy_data, sizeof(*policy));
if ((*flags) & LDLM_FL_LOCK_CHANGED)
memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id));
int ldlm_cli_enqueue(struct obd_export *exp,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
- struct lustre_handle *parent_lock_handle,
struct ldlm_res_id res_id,
__u32 type,
- void *cookie, int cookielen,
+ ldlm_policy_data_t *policy,
ldlm_mode_t mode,
int *flags,
- ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
+ ldlm_completion_callback completion,
+ ldlm_glimpse_callback glimpse,
void *data,
+ void *lvb,
+ __u32 lvb_len,
+ void *lvb_swabber,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int rc, size = sizeof(*body), req_passed_in = 1, is_replay;
+ int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
+ int is_replay = *flags & LDLM_FL_REPLAY;
ENTRY;
- is_replay = *flags & LDLM_FL_REPLAY;
- LASSERT(exp != NULL || !is_replay);
-
if (exp == NULL) {
- rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
- type, cookie, cookielen, mode,
- flags, completion, blocking, data,
+ LASSERT(!is_replay);
+ rc = ldlm_cli_enqueue_local(ns, res_id, type, policy, mode,
+ flags, blocking, completion,
+ glimpse, data, lvb_len, lvb_swabber,
lockh);
RETURN(rc);
}
LDLM_DEBUG(lock, "client-side enqueue START");
LASSERT(exp == lock->l_conn_export);
} else {
- lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
- mode, blocking, completion, data);
+ lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
+ completion, glimpse, data, lvb_len);
if (lock == NULL)
GOTO(out_nolock, rc = -ENOMEM);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
- if (type != LDLM_PLAIN)
- memcpy(&lock->l_policy_data, cookie, cookielen);
+ lock->l_lvb_swabber = lvb_swabber;
+ if (policy != NULL)
+ memcpy(&lock->l_policy_data, policy, sizeof(*policy));
LDLM_DEBUG(lock, "client-side enqueue START");
}
if (req == NULL) {
req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
- &size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
+ size, NULL);
+ if (req == NULL)
+ GOTO(out_lock, rc = -ENOMEM);
req_passed_in = 0;
} else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
LBUG();
body->lock_flags = *flags;
memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
- if (parent_lock_handle)
- memcpy(&body->lock_handle2, parent_lock_handle,
- sizeof(body->lock_handle2));
/* Continue as normal. */
if (!req_passed_in) {
- size = sizeof(*reply);
- req->rq_replen = lustre_msg_size(1, &size);
+ int buffers = 1;
+ if (lvb_len > 0)
+ buffers = 2;
+ size[0] = sizeof(*reply);
+ req->rq_replen = lustre_msg_size(buffers, size);
}
lock->l_conn_export = exp;
lock->l_export = NULL;
LASSERT(!is_replay);
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
- failed_lock_cleanup(ns, lock, lockh, mode);
if (rc == ELDLM_LOCK_ABORTED) {
/* Before we return, swab the reply */
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
CERROR("Can't unpack ldlm_reply\n");
- GOTO(out_req, rc = -EPROTO);
+ rc = -EPROTO;
+ }
+ if (lvb_len) {
+ void *tmplvb;
+ tmplvb = lustre_swab_repbuf(req, 1, lvb_len,
+ lvb_swabber);
+ if (tmplvb == NULL)
+ GOTO(out_lock, rc = -EPROTO);
+ if (lvb != NULL)
+ memcpy(lvb, tmplvb, lvb_len);
}
}
- GOTO(out_req, rc);
+ GOTO(out_lock, rc);
}
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
CERROR("Can't unpack ldlm_reply\n");
- GOTO(out_req, rc = -EPROTO);
+ GOTO(out_lock, rc = -EPROTO);
}
memcpy(&lock->l_remote_handle, &reply->lock_handle,
"extent "LPU64" -> "LPU64"\n",
body->lock_desc.l_policy_data.l_extent.start,
body->lock_desc.l_policy_data.l_extent.end,
- reply->lock_policy_data.l_extent.start,
- reply->lock_policy_data.l_extent.end);
-
- cookie = &reply->lock_policy_data; /* FIXME bug 267 */
- cookielen = sizeof(struct ldlm_extent);
- } else if (type == LDLM_FLOCK) {
- cookie = &reply->lock_policy_data;
- cookielen = sizeof(struct ldlm_flock);
+ reply->lock_desc.l_policy_data.l_extent.start,
+ reply->lock_desc.l_policy_data.l_extent.end);
}
+ if (policy != NULL)
+ memcpy(&lock->l_policy_data, &reply->lock_desc.l_policy_data,
+ sizeof(reply->lock_desc.l_policy_data));
/* If enqueue returned a blocked lock but the completion handler has
* already run, then it fixed up the resource and we don't need to do it
* again. */
if ((*flags) & LDLM_FL_LOCK_CHANGED) {
- int newmode = reply->lock_mode;
+ int newmode = reply->lock_desc.l_req_mode;
LASSERT(!is_replay);
if (newmode && newmode != lock->l_req_mode) {
LDLM_DEBUG(lock, "server returned different mode %s",
lock->l_req_mode = newmode;
}
- if (reply->lock_resource_name.name[0] !=
+ if (reply->lock_desc.l_resource.lr_name.name[0] !=
lock->l_resource->lr_name.name[0]) {
CDEBUG(D_INFO, "remote intent success, locking %ld "
"instead of %ld\n",
- (long)reply->lock_resource_name.name[0],
+ (long)reply->lock_desc.l_resource.lr_name.name[0],
(long)lock->l_resource->lr_name.name[0]);
ldlm_lock_change_resource(ns, lock,
- reply->lock_resource_name);
+ reply->lock_desc.l_resource.lr_name);
if (lock->l_resource == NULL) {
LBUG();
- GOTO(out_req, rc = -ENOMEM);
+ GOTO(out_lock, rc = -ENOMEM);
}
LDLM_DEBUG(lock, "client-side enqueue, new resource");
}
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
+ if (lvb_len) {
+ void *tmplvb;
+ tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
+ if (tmplvb == NULL)
+ GOTO(out_lock, rc = -EPROTO);
+ memcpy(lock->l_lvb_data, tmplvb, lvb_len);
+ }
+
if (!is_replay) {
- rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
+ rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
if (lock->l_completion_ast != NULL) {
int err = lock->l_completion_ast(lock, *flags, NULL);
- if (err)
- failed_lock_cleanup(ns, lock, lockh, mode);
if (!rc)
rc = err;
}
}
+ if (lvb_len && lvb != NULL) {
+ /* Copy the LVB here, and not earlier, because the completion
+ * AST (if any) can override what we got in the reply */
+ memcpy(lvb, lock->l_lvb_data, lvb_len);
+ }
+
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
- out_req:
- if (!req_passed_in)
+ out_lock:
+ if (rc)
+ failed_lock_cleanup(ns, lock, lockh, mode);
+ if (!req_passed_in && req != NULL)
ptlrpc_req_finished(req);
- out:
LDLM_LOCK_PUT(lock);
out_nolock:
return rc;
}
-int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
-{
- struct lustre_handle lockh;
- struct ldlm_res_id junk;
- int flags = LDLM_FL_REPLAY;
- ldlm_lock2handle(lock, &lockh);
- return ldlm_cli_enqueue(lock->l_conn_export, NULL, NULL, NULL, junk,
- lock->l_resource->lr_type, NULL, 0, -1, &flags,
- NULL, NULL, NULL, &lockh);
-}
-
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
int *flags)
{
INIT_LIST_HEAD(&res->lr_granted);
INIT_LIST_HEAD(&res->lr_converting);
INIT_LIST_HEAD(&res->lr_waiting);
-
+ sema_init(&res->lr_lvb_sem, 1);
atomic_set(&res->lr_refcount, 1);
return res;
}
l_unlock(&ns->ns_lock);
+
RETURN(res);
}
{
struct list_head *bucket, *tmp;
struct ldlm_resource *res = NULL;
+ int rc;
ENTRY;
LASSERT(ns != NULL);
l_unlock(&ns->ns_lock);
+ if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+ rc = ns->ns_lvbo->lvbo_init(res);
+ if (rc) {
+ CERROR("lvbo_init failure %d\n", rc);
+ LASSERT(ldlm_resource_putref(res) == 1);
+ res = NULL;
+ }
+ }
+
RETURN(res);
}
ns->ns_refcount--;
list_del_init(&res->lr_hash);
list_del_init(&res->lr_childof);
+ if (res->lr_lvb_data)
+ OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
l_unlock(&ns->ns_lock);
OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
{
desc->lr_type = res->lr_type;
memcpy(&desc->lr_name, &res->lr_name, sizeof(desc->lr_name));
- memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
}
void ldlm_dump_all_namespaces(void)
#include <linux/lustre_lite.h>
#include <linux/lustre_idl.h>
#include <linux/lustre_dlm.h>
+#include <linux/lustre_version.h>
#include "llite_internal.h"
it->it_op_release = ll_intent_release;
}
+#if (LUSTRE_KERNEL_VERSION < 33)
int ll_revalidate_it(struct dentry *de, int flags, struct lookup_intent *it)
+#else
+int ll_revalidate_it(struct dentry *de, int flags, struct nameidata *nd,
+ struct lookup_intent *it)
+#endif
{
int rc;
struct ll_fid pfid, cfid;
}
rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
- &res_id, LDLM_PLAIN, NULL, 0, LCK_PR, &lockh);
+ &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
if (!rc) {
ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
struct ll_file_data *fd = file->private_data;
struct ptlrpc_request *req = NULL;
struct obd_client_handle *och = &fd->fd_mds_och;
- struct ll_inode_info *lli = ll_i2info(inode);
struct obdo obdo;
int rc, valid;
ENTRY;
valid = OBD_MD_FLID;
- if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
- valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
memset(&obdo, 0, sizeof(obdo));
obdo.o_id = inode->i_ino;
}
}
-/* Flush the page cache for an extent as its canceled. No one can dirty the
- * extent until we've finished our work and they can enqueue another lock.
- * The DLM protects us from ll_file_read/write here, but other kernel actors
- * could have pages locked */
+/* Map an extent lock back to the index of the stripe it covers within
+ * @inode's LOV stripe metadata.
+ *
+ * Single-striped files trivially return stripe 0.  Otherwise the obd
+ * layer is queried via obd_get_info() with the "lock_to_stripe" key;
+ * a lookup failure is treated as fatal (LBUG).
+ * Returns the stripe index (< lsm_stripe_count). */
+static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct lov_stripe_md *lsm = lli->lli_smd;
+ struct obd_export *exp = ll_i2obdexp(inode);
+ /* ad-hoc key struct understood by the lov obd's get_info handler */
+ struct {
+ char name[16];
+ struct ldlm_lock *lock;
+ struct lov_stripe_md *lsm;
+ } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
+ __u32 stripe, vallen = sizeof(stripe);
+ int rc;
+ ENTRY;
+
+ if (lsm->lsm_stripe_count == 1)
+ RETURN(0);
+
+ /* get our offset in the lov */
+ rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
+ if (rc != 0) {
+ CERROR("obd_get_info: rc = %d\n", rc);
+ LBUG();
+ }
+ LASSERT(stripe < lsm->lsm_stripe_count);
+ RETURN(stripe);
+}
+
+/* Flush the page cache for an extent as its canceled. When we're on an LOV,
+ * we get a lock cancellation for each stripe, so we have to map the obd's
+ * region back onto the stripes in the file that it held.
+ *
+ * No one can dirty the extent until we've finished our work and they can
+ * enqueue another lock. The DLM protects us from ll_file_read/write here,
+ * but other kernel actors could have pages locked. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
- struct ldlm_lock *lock)
+ struct ldlm_lock *lock, __u32 stripe)
{
struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
- struct obd_export *exp = ll_i2obdexp(inode);
- struct ll_inode_info *lli = ll_i2info(inode);
- unsigned long start, end, i;
+ unsigned long start, end, count, skip, i, j;
struct page *page;
int rc, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
ENTRY;
CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n",
inode->i_ino, inode, extent->start, extent->end, inode->i_size);
+ /* our locks are page granular thanks to osc_enqueue, we invalidate the
+ * whole page. */
+ LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0);
+ LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0);
+
start = extent->start >> PAGE_CACHE_SHIFT;
+ count = ~0;
+ skip = 0;
end = (extent->end >> PAGE_CACHE_SHIFT) + 1;
if ((end << PAGE_CACHE_SHIFT) < extent->end)
end = ~0;
+ if (lsm->lsm_stripe_count > 1) {
+ count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+ skip = (lsm->lsm_stripe_count - 1) * count;
+ start += (start/count * skip) + (stripe * count);
+ if (end != ~0)
+ end += (end/count * skip) + (stripe * count);
+ }
i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT;
- if (end >= i)
- clear_bit(LLI_F_HAVE_OST_SIZE_LOCK,
- &(ll_i2info(inode)->lli_flags));
if (i < end)
end = i;
- CDEBUG(D_INODE, "walking page indices start: %lu end: %lu\n", start,
- end);
+ CDEBUG(D_INODE, "walking page indices start: %lu j: %lu count: %lu "
+ "skip: %lu end: %lu%s\n", start, start % count, count, skip, end,
+ discard ? " (DISCARDING)" : "");
+
+ /* this is the simplistic implementation of page eviction at
+ * cancelation. It is careful to get races with other page
+ * lockers handled correctly. fixes from bug 20 will make it
+ * more efficient by associating locks with pages and with
+ * batching writeback under the lock explicitly. */
+ for (i = start, j = start % count ; ; j++, i++) {
+ if (j == count) {
+ i += skip;
+ j = 0;
+ }
+ if (i >= end)
+ break;
- for (i = start; i < end; i++) {
ll_pgcache_lock(inode->i_mapping);
if (list_empty(&inode->i_mapping->dirty_pages) &&
- list_empty(&inode->i_mapping->clean_pages) &&
- list_empty(&inode->i_mapping->locked_pages)) {
+ list_empty(&inode->i_mapping->clean_pages) &&
+ list_empty(&inode->i_mapping->locked_pages)) {
CDEBUG(D_INODE, "nothing left\n");
ll_pgcache_unlock(inode->i_mapping);
break;
page = find_get_page(inode->i_mapping, i);
if (page == NULL)
continue;
-
- LL_CDEBUG_PAGE(page, "locking\n");
+ LL_CDEBUG_PAGE(page, "locking page\n");
lock_page(page);
/* page->mapping to check with racing against teardown */
list_add(&page->list, &inode->i_mapping->locked_pages);
ll_pgcache_unlock(inode->i_mapping);
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- rc = inode->i_mapping->a_ops->writepage(page);
-#else
- rc = inode->i_mapping->a_ops->writepage(page, NULL);
-#endif
+ rc = ll_call_writepage(inode, page);
if (rc != 0)
CERROR("writepage of page %p failed: %d\n",
page, rc);
unlock_page(page);
page_cache_release(page);
}
-
- if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) {
- rc = obd_lock_contains(exp, lsm, lock, inode->i_size - 1);
- if (rc != 0) {
- if (rc < 0)
- CERROR("obd_lock_contains: rc = %d\n", rc);
- clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags);
- }
- }
-
EXIT;
}
int rc;
ENTRY;
-
if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
LBUG();
case LDLM_CB_CANCELING: {
struct inode *inode = ll_inode_from_lock(lock);
struct ll_inode_info *lli;
+ struct lov_stripe_md *lsm;
+ __u32 stripe;
+ __u64 kms;
- if (!inode)
- RETURN(0);
- lli= ll_i2info(inode);
- if (!lli)
+ if (inode == NULL)
RETURN(0);
- if (!lli->lli_smd)
- RETURN(0);
-
- ll_pgcache_remove_extent(inode, lli->lli_smd, lock);
+ lli = ll_i2info(inode);
+ if (lli == NULL)
+ goto iput;
+ if (lli->lli_smd == NULL)
+ goto iput;
+ lsm = lli->lli_smd;
+
+ stripe = ll_lock_to_stripe_offset(inode, lock);
+ ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+
+ down(&inode->i_sem);
+ kms = ldlm_extent_shift_kms(lock,
+ lsm->lsm_oinfo[stripe].loi_kms);
+ if (lsm->lsm_oinfo[stripe].loi_kms != kms)
+ LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+ lsm->lsm_oinfo[stripe].loi_kms, kms);
+ lsm->lsm_oinfo[stripe].loi_kms = kms;
+ up(&inode->i_sem);
//ll_try_done_writing(inode);
+ iput:
iput(inode);
break;
}
RETURN(0);
}
-/*
- * some callers, notably truncate, really don't want i_size set based
- * on the the size returned by the getattr, or lock acquisition in
- * the future.
- */
-int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
- struct lov_stripe_md *lsm,
- int mode, struct ldlm_extent *extent,
- struct lustre_handle *lockh, int ast_flags)
+#if 0
+int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- int rc;
+ /* XXX ALLOCATE - 160 bytes */
+ struct inode *inode = ll_inode_from_lock(lock);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct lustre_handle lockh = { 0 };
+ struct ost_lvb *lvb;
+ __u32 stripe;
ENTRY;
- LASSERT(lockh->cookie == 0);
+ if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_BLOCK_CONV)) {
+ LBUG(); /* not expecting any blocked async locks yet */
+ LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
+ "lock, returning");
+ ldlm_lock_dump(D_OTHER, lock, 0);
+ ldlm_reprocess_all(lock->l_resource);
+ RETURN(0);
+ }
- /* XXX phil: can we do this? won't it screw the file size up? */
- if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
- (sbi->ll_flags & LL_SBI_NOLCK))
+ LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
+
+ stripe = ll_lock_to_stripe_offset(inode, lock);
+
+ if (lock->l_lvb_len) {
+ struct lov_stripe_md *lsm = lli->lli_smd;
+ __u64 kms;
+ lvb = lock->l_lvb_data;
+ lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
+
+ down(&inode->i_sem);
+ kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
+ kms = ldlm_extent_shift_kms(NULL, kms);
+ if (lsm->lsm_oinfo[stripe].loi_kms != kms)
+ LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+ lsm->lsm_oinfo[stripe].loi_kms, kms);
+ lsm->lsm_oinfo[stripe].loi_kms = kms;
+ up(&inode->i_sem);
+ }
+
+ iput(inode);
+ wake_up(&lock->l_waitq);
+
+ ldlm_lock2handle(lock, &lockh);
+ ldlm_lock_decref(&lockh, LCK_PR);
+ RETURN(0);
+}
+#endif
+
+/* This function is a disaster. I hate the LOV. */
+static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
+{
+ struct ptlrpc_request *req = reqp;
+ struct inode *inode = ll_inode_from_lock(lock);
+ struct obd_export *exp;
+ struct ll_inode_info *lli;
+ struct ost_lvb *lvb;
+ struct {
+ int stripe_number;
+ __u64 size;
+ struct lov_stripe_md *lsm;
+ } data;
+ __u32 vallen = sizeof(data);
+ int rc, size = sizeof(*lvb);
+ ENTRY;
+
+ if (inode == NULL)
RETURN(0);
+ lli = ll_i2info(inode);
+ if (lli == NULL)
+ goto iput;
+ if (lli->lli_smd == NULL)
+ goto iput;
+ exp = ll_i2obdexp(inode);
+
+ /* First, find out which stripe index this lock corresponds to. */
+ if (lli->lli_smd->lsm_stripe_count > 1)
+ data.stripe_number = ll_lock_to_stripe_offset(inode, lock);
+ else
+ data.stripe_number = 0;
- CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
- inode->i_ino, extent->start, extent->end);
+ data.size = inode->i_size;
+ data.lsm = lli->lli_smd;
- rc = obd_enqueue(sbi->ll_osc_exp, lsm, NULL, LDLM_EXTENT, extent,
- sizeof(extent), mode, &ast_flags,
- ll_extent_lock_callback, inode, lockh);
+ rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe",
+ &vallen, &data);
+ if (rc != 0) {
+ CERROR("obd_get_info: rc = %d\n", rc);
+ LBUG();
+ }
+
+ LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
+ inode->i_size, data.stripe_number, data.size);
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc) {
+ CERROR("lustre_pack_reply: %d\n", rc);
+ goto iput;
+ }
+
+ lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
+ lvb->lvb_size = data.size;
+ ptlrpc_reply(req);
+
+ iput:
+ iput(inode);
+ RETURN(0);
+}
+
+__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
+__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
+
+/* NB: lov_merge_size will prefer locally cached writes if they extend the
+ * file (because it prefers KMS over RSS when larger) */
+int ll_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
+ struct lustre_handle lockh;
+ int rc, flags = LDLM_FL_HAS_INTENT;
+ ENTRY;
+
+ CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
+
+ rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
+ LCK_PR, &flags, ll_extent_lock_callback,
+ ldlm_completion_ast, ll_glimpse_callback, inode,
+ sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
if (rc > 0)
- rc = -EIO;
+ RETURN(-EIO);
+
+ lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
+ //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
+
+ CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
+
+ obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
+
RETURN(rc);
}
-/*
- * this grabs a lock and manually implements behaviour that makes it look like
- * the OST is returning the file size with each lock acquisition.
- */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
struct lov_stripe_md *lsm, int mode,
- struct ldlm_extent *extent, struct lustre_handle *lockh)
+ ldlm_policy_data_t *policy, struct lustre_handle *lockh,
+ int ast_flags)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct obd_export *exp = ll_i2obdexp(inode);
- struct ldlm_extent size_lock;
- struct lustre_handle match_lockh = {0};
- struct obdo oa;
- obd_flag refresh_valid;
- int flags, rc, matched;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ int rc;
ENTRY;
- rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
- if (rc != ELDLM_OK)
- RETURN(rc);
+ LASSERT(lockh->cookie == 0);
- if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
+ /* XXX phil: can we do this? won't it screw the file size up? */
+ if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+ (sbi->ll_flags & LL_SBI_NOLCK))
RETURN(0);
- rc = ll_lsm_getattr(exp, lsm, &oa);
- if (rc)
- GOTO(out, rc);
+ CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
+ inode->i_ino, policy->l_extent.start, policy->l_extent.end);
- /* We set this flag in commit write as we extend the file size. When
- * the bit is set and the lock is canceled that covers the file size,
- * we clear the bit. This is enough to protect the window where our
- * local size extension is needed for writeback. However, it relies on
- * behaviour that won't be true in the near future. This assumes that
- * all getattr callers get extent locks, which they currnetly do. It
- * also assumes that we only send discarding asts for {0,eof} truncates
- * as is currently the case. This will have to be replaced by the
- * proper eoc communication between clients and the ost, which is on
- * its way. */
- refresh_valid = (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
- OBD_MD_FLCTIME | OBD_MD_FLSIZE);
- if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) {
- if (oa.o_size < inode->i_size)
- refresh_valid &= ~OBD_MD_FLSIZE;
- else
- clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags);
- }
- obdo_refresh_inode(inode, &oa, refresh_valid);
-
- CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
- lsm->lsm_object_id, inode->i_size, inode->i_blocks,
- inode->i_blksize);
-
- size_lock.start = inode->i_size;
- size_lock.end = OBD_OBJECT_EOF;
-
- /* XXX I bet we should be checking the lock ignore flags.. */
- flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
- matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
- sizeof(size_lock), LCK_PR, &flags, inode,
- &match_lockh);
- if (matched < 0)
- GOTO(out, rc = matched);
-
- /* hey, alright, we hold a size lock that covers the size we
- * just found, its not going to change for a while.. */
- if (matched == 1) {
- set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
- obd_cancel(exp, lsm, LCK_PR, &match_lockh);
- }
+ rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
+ &ast_flags, ll_extent_lock_callback,
+ ldlm_completion_ast, ll_glimpse_callback, inode,
+ sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
+ if (rc > 0)
+ rc = -EIO;
+
+ if (policy->l_extent.start == 0 &&
+ policy->l_extent.end == OBD_OBJECT_EOF)
+ inode->i_size = lov_merge_size(lsm, 1);
+
+ //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
- rc = 0;
-out:
- if (rc)
- ll_extent_unlock(fd, inode, lsm, mode, lockh);
RETURN(rc);
}
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
struct lustre_handle lockh = { 0 };
- struct ldlm_extent extent;
+ ldlm_policy_data_t policy;
ldlm_error_t err;
ssize_t retval;
+ __u64 kms;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
inode->i_ino, inode->i_generation, inode, count, *ppos);
if (!lsm)
RETURN(0);
- /* grab a -> eof extent to push extending writes out of node's caches
- * so we can see them at the getattr after lock acquisition. this will
- * turn into a seperate [*ppos + count, EOF] 'size intent' lock attempt
- * in the future. */
- extent.start = *ppos;
- extent.end = OBD_OBJECT_EOF;
+ policy.l_extent.start = *ppos;
+ policy.l_extent.end = *ppos + count - 1;
- err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+ err = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
if (err != ELDLM_OK)
RETURN(err);
+ kms = lov_merge_size(lsm, 1);
+ if (policy.l_extent.end > kms) {
+ /* A glimpse is necessary to determine whether we return a short
+ * read or some zeroes at the end of the buffer */
+ struct ost_lvb lvb;
+ retval = ll_glimpse_size(inode, &lvb);
+ if (retval)
+ goto out;
+ inode->i_size = lvb.lvb_size;
+ } else {
+ inode->i_size = kms;
+ }
- CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
- inode->i_ino, count, *ppos);
+ CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, i_size "
+ LPU64"\n", inode->i_ino, count, *ppos, inode->i_size);
/* turn off the kernel's read-ahead */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
#endif
retval = generic_file_read(filp, buf, count, ppos);
- /* XXX errors? */
+ out:
ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
RETURN(retval);
}
struct inode *inode = file->f_dentry->d_inode;
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
struct lustre_handle lockh = { 0 };
- struct ldlm_extent extent;
+ ldlm_policy_data_t policy;
loff_t maxbytes = ll_file_maxbytes(inode);
ldlm_error_t err;
ssize_t retval;
- char should_validate = 1;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
inode->i_ino, inode->i_generation, inode, count, *ppos);
LASSERT(lsm);
if (file->f_flags & O_APPEND) {
- extent.start = 0;
- extent.end = OBD_OBJECT_EOF;
+ policy.l_extent.start = 0;
+ policy.l_extent.end = OBD_OBJECT_EOF;
} else {
- extent.start = *ppos;
- extent.end = *ppos + count - 1;
- /* we really don't care what i_size is if we're doing
- * fully page aligned writes */
- if ((*ppos & ~PAGE_CACHE_MASK) == 0 &&
- (count & ~PAGE_CACHE_MASK) == 0)
- should_validate = 0;
+ policy.l_extent.start = *ppos;
+ policy.l_extent.end = *ppos + count - 1;
}
- if (should_validate)
- err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
- else
- err = ll_extent_lock_no_validate(fd, inode, lsm, LCK_PW,
- &extent, &lockh, 0);
+ err = ll_extent_lock(fd, inode, lsm, LCK_PW, &policy, &lockh, 0);
if (err != ELDLM_OK)
RETURN(err);
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
if (origin == 2) { /* SEEK_END */
ldlm_error_t err;
- struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
- err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+ ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
+ err = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0);
if (err != ELDLM_OK)
RETURN(err);
struct ldlm_res_id res_id =
{ .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
struct lustre_handle lockh = {0};
- struct ldlm_flock flock;
+ ldlm_policy_data_t flock;
ldlm_mode_t mode = 0;
int flags = 0;
int rc;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
inode->i_ino, file_lock);
- flock.pid = file_lock->fl_pid;
- flock.start = file_lock->fl_start;
- flock.end = file_lock->fl_end;
+ flock.l_flock.pid = file_lock->fl_pid;
+ flock.l_flock.start = file_lock->fl_start;
+ flock.l_flock.end = file_lock->fl_end;
switch (file_lock->fl_type) {
case F_RDLCK:
}
CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
- "start="LPU64", end="LPU64"\n", inode->i_ino, flock.pid,
- flags, mode, flock.start, flock.end);
+ "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
+ flags, mode, flock.l_flock.start, flock.l_flock.end);
obddev = sbi->ll_mdc_exp->exp_obd;
rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, obddev->obd_namespace,
- NULL, res_id, LDLM_FLOCK, &flock, sizeof(flock),
- mode, &flags, ldlm_flock_completion_ast, NULL,
- file_lock, &lockh);
+ res_id, LDLM_FLOCK, &flock, mode, &flags,
+ NULL, ldlm_flock_completion_ast, NULL, file_lock,
+ NULL, 0, NULL, &lockh);
RETURN(rc);
}
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
- NULL, 0, LCK_PR, &lockh)) {
+ NULL, LCK_PR, &lockh)) {
ldlm_lock_decref(&lockh, LCK_PR);
RETURN(1);
}
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
- NULL, 0, LCK_PW, &lockh)) {
+ NULL, LCK_PW, &lockh)) {
ldlm_lock_decref(&lockh, LCK_PW);
RETURN(1);
}
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
{
struct inode *inode = dentry->d_inode;
+ struct ll_inode_info *lli;
struct lov_stripe_md *lsm;
ENTRY;
CERROR("REPORT THIS LINE TO PETER\n");
RETURN(0);
}
+ lli = ll_i2info(inode);
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
ptlrpc_req_finished(req);
}
-#if 0
- if (ll_have_md_lock(dentry) &&
- test_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &ll_i2info(inode)->lli_flags))
- RETURN(0);
-#endif
-
- lsm = ll_i2info(inode)->lli_smd;
- if (!lsm) /* object not yet allocated, don't validate size */
+ lsm = lli->lli_smd;
+ if (lsm == NULL) /* object not yet allocated, don't validate size */
RETURN(0);
- /* unfortunately stat comes in through revalidate and we don't
- * differentiate this use from initial instantiation. we're
- * also being wildly conservative and flushing write caches
- * so that stat really returns the proper size. */
+ /* ll_glimpse_size will prefer locally cached writes if they extend
+ * the file */
{
- struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
- struct lustre_handle lockh = {0};
+ struct ost_lvb lvb;
ldlm_error_t err;
- err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
- if (err != ELDLM_OK)
- RETURN(err);
-
- ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
+ err = ll_glimpse_size(inode, &lvb);
+ inode->i_size = lvb.lvb_size;
}
RETURN(0);
}
EXIT;
}
+#if 0
/* If we know the file size and have the cookies:
* - send a DONE_WRITING rpc
*
static void ll_close_done_writing(struct inode *inode)
{
struct ll_inode_info *lli = ll_i2info(inode);
- struct ldlm_extent extent = { .start = 0, .end = OBD_OBJECT_EOF };
+ ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF } };
struct lustre_handle lockh = { 0 };
struct obdo obdo;
obd_flag valid;
if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
goto rpc;
- rc = ll_extent_lock_no_validate(NULL, inode, lli->lli_smd, LCK_PW,
- &extent, &lockh, ast_flags);
+ rc = ll_extent_lock(NULL, inode, lli->lli_smd, LCK_PW, &policy, &lockh,
+ ast_flags);
if (rc != ELDLM_OK) {
CERROR("lock acquisition failed (%d): unable to send "
"DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino,
rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &obdo);
out:
- iput(inode);
}
+#endif
static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
{
while (1) {
struct l_wait_info lwi = { 0 };
struct ll_inode_info *lli;
- struct inode *inode;
+ //struct inode *inode;
l_wait_event_exclusive(lcq->lcq_waitq,
(lli = ll_close_next_lli(lcq)) != NULL,
if (IS_ERR(lli))
break;
- inode = ll_info2i(lli);
- ll_close_done_writing(inode);
+ //inode = ll_info2i(lli);
+ //ll_close_done_writing(inode);
+ //iput(inode);
}
complete(&lcq->lcq_comp);
extern struct inode_operations ll_special_inode_operations;
extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *);
int ll_extent_lock(struct ll_file_data *, struct inode *,
- struct lov_stripe_md *, int mode, struct ldlm_extent *,
- struct lustre_handle *);
+ struct lov_stripe_md *, int mode, ldlm_policy_data_t *,
+ struct lustre_handle *, int ast_flags);
int ll_extent_unlock(struct ll_file_data *, struct inode *,
struct lov_stripe_md *, int mode, struct lustre_handle *);
int ll_file_open(struct inode *inode, struct file *file);
int ll_file_release(struct inode *inode, struct file *file);
int ll_lsm_getattr(struct obd_export *, struct lov_stripe_md *, struct obdo *);
-int ll_extent_lock_no_validate(struct ll_file_data *, struct inode *,
- struct lov_stripe_md *, int mode,
- struct ldlm_extent *, struct lustre_handle *,
- int ast_flags);
+int ll_glimpse_size(struct inode *inode, struct ost_lvb *lvb);
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
int ll_getattr(struct vfsmount *mnt, struct dentry *de,
struct lookup_intent *it, struct kstat *stat);
* last one is especially bad for racing o_append users on other
* nodes. */
if (ia_valid & ATTR_SIZE) {
- struct ldlm_extent extent = { .start = attr->ia_size,
- .end = OBD_OBJECT_EOF };
+ ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
+ OBD_OBJECT_EOF } };
struct lustre_handle lockh = { 0 };
int err, ast_flags = 0;
/* XXX when we fix the AST intents to pass the discard-range
/* bug 1639: avoid write/truncate i_sem/DLM deadlock */
LASSERT(atomic_read(&inode->i_sem.count) <= 0);
up(&inode->i_sem);
- rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
- &extent, &lockh, ast_flags);
+ rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
+ ast_flags);
down(&inode->i_sem);
if (rc != ELDLM_OK)
RETURN(rc);
rc = vmtruncate(inode, attr->ia_size);
- if (rc == 0)
- set_bit(LLI_F_HAVE_OST_SIZE_LOCK,
- &ll_i2info(inode)->lli_flags);
-
- //ll_try_done_writing(inode);
+ /* We need to drop the semaphore here, because this unlock may
+ * result in a cancellation, which will need the i_sem */
+ up(&inode->i_sem);
/* unlock now as we don't mind others file lockers racing with
* the mds updates below? */
err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
+ down(&inode->i_sem);
if (err) {
CERROR("ll_extent_unlock failed: %d\n", err);
if (!rc)
#include <linux/obd_support.h>
#include <linux/lustre_lite.h>
#include <linux/lustre_dlm.h>
+#include <linux/lustre_version.h>
#include "llite_internal.h"
/* methods */
/* If this is a stat, get the authoritative file size */
if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) &&
ll_i2info(inode)->lli_smd != NULL) {
- struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
- struct lustre_handle lockh = {0};
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+ struct ost_lvb lvb;
ldlm_error_t rc;
LASSERT(lsm->lsm_object_id != 0);
/* bug 2334: drop MDS lock before acquiring OST lock */
ll_intent_drop_lock(it);
- rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent,
- &lockh);
- if (rc != ELDLM_OK) {
+ rc = ll_glimpse_size(inode, &lvb);
+ if (rc) {
iput(inode);
RETURN(-EIO);
}
- ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
+ inode->i_size = lvb.lvb_size;
}
dentry = *de = ll_find_alias(inode, dentry);
}
+#if (LUSTRE_KERNEL_VERSION < 33)
static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
struct lookup_intent *it, int flags)
+#else
+static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
+ struct nameidata *nd,
+ struct lookup_intent *it, int flags)
+#endif
{
struct dentry *save = dentry, *retval;
struct ll_fid pfid;
oa.o_id, inode->i_size);
/* truncate == punch from new size to absolute end of file */
+ /* NB: obd_punch must be called with i_sem held! It updates the kms! */
rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size,
OBD_OBJECT_EOF, NULL);
if (rc)
return;
} /* ll_truncate */
+__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
int ll_prepare_write(struct file *file, struct page *page, unsigned from,
unsigned to)
{
obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
struct brw_page pga;
struct obdo oa;
+ __u64 kms;
int rc = 0;
ENTRY;
RETURN(0);
}
- /* If are writing to a new page, no need to read old data.
- * the extent locking and getattr procedures in ll_file_write have
- * guaranteed that i_size is stable enough for our zeroing needs */
- if (inode->i_size <= offset) {
+ /* If are writing to a new page, no need to read old data. The extent
+ * locking will have updated the KMS, and for our purposes here we can
+ * treat it like i_size. */
+ kms = lov_merge_size(lsm, 1);
+ if (kms <= offset) {
memset(kmap(page), 0, PAGE_SIZE);
kunmap(page);
GOTO(prepare_done, rc = 0);
RETURN(llap);
}
+void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
+ obd_off size);
/* update our write count to account for i_size increases that may have
* happened since we've queued the page for io. */
out:
if (rc == 0) {
- /* XXX needs to be pushed down to the OSC as EOC */
size = (((obd_off)page->index) << PAGE_SHIFT) + to;
- if (size > inode->i_size) {
+ lov_increase_kms(exp, lsm, size);
+ if (size > inode->i_size)
inode->i_size = size;
- /* see commentary in file.c:ll_inode_getattr() */
- set_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags);
- }
SetPageUptodate(page);
}
RETURN(rc);
{
struct lustre_handle match_lockh = {0};
struct inode *inode = page->mapping->host;
- struct ldlm_extent page_extent;
+ ldlm_policy_data_t page_extent;
int flags, matches;
ENTRY;
- page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
- page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
+ page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+ page_extent.l_extent.end =
+ page_extent.l_extent.start + PAGE_CACHE_SIZE - 1;
flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
ll_i2info(inode)->lli_smd, LDLM_EXTENT,
- &page_extent, sizeof(page_extent),
- LCK_PR, &flags, inode, &match_lockh);
+ &page_extent, LCK_PR, &flags, inode, &match_lockh);
if (matches < 0) {
LL_CDEBUG_PAGE(page, "lock match failed\n");
RETURN(matches);
static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
int stripeno, obd_off *obd_off);
+struct lov_lock_handles {
+ struct portals_handle llh_handle;
+ atomic_t llh_refcount;
+ int llh_stripe_count;
+ struct lustre_handle llh_handles[0];
+};
+
+static void lov_llh_addref(void *llhp)
+{
+ struct lov_lock_handles *llh = llhp;
+
+ atomic_inc(&llh->llh_refcount);
+ CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
+ atomic_read(&llh->llh_refcount));
+}
+
+static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
+{
+ struct lov_lock_handles *llh;
+
+ OBD_ALLOC(llh, sizeof *llh +
+ sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
+ if (llh == NULL) {
+ CERROR("out of memory\n");
+ return NULL;
+ }
+ atomic_set(&llh->llh_refcount, 2);
+ llh->llh_stripe_count = lsm->lsm_stripe_count;
+ INIT_LIST_HEAD(&llh->llh_handle.h_link);
+ class_handle_hash(&llh->llh_handle, lov_llh_addref);
+ return llh;
+}
+
+static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
+{
+ ENTRY;
+ LASSERT(handle != NULL);
+ RETURN(class_handle2object(handle->cookie));
+}
+
+static void lov_llh_put(struct lov_lock_handles *llh)
+{
+ CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
+ atomic_read(&llh->llh_refcount) - 1);
+ LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
+ atomic_read(&llh->llh_refcount) < 0x5a5a);
+ if (atomic_dec_and_test(&llh->llh_refcount)) {
+ LASSERT(list_empty(&llh->llh_handle.h_link));
+ OBD_FREE(llh, sizeof *llh +
+ sizeof(*llh->llh_handles) * llh->llh_stripe_count);
+ }
+}
+
+static void lov_llh_destroy(struct lov_lock_handles *llh)
+{
+ class_handle_unhash(&llh->llh_handle);
+ lov_llh_put(llh);
+}
+
/* obd methods */
int lov_attach(struct obd_device *dev, obd_count len, void *data)
{
} else {
--ost_start_count;
ost_start_idx += lsm->lsm_stripe_count;
+ if (lsm->lsm_stripe_count == ost_count)
+ ++ost_start_idx;
}
ost_idx = ost_start_idx % ost_count;
} else {
return ret;
}
+/* Given a whole-file size and a stripe number, give the file size which
+ * corresponds to the individual object of that stripe.
+ *
 + * This behaves basically in the same way as lov_stripe_offset, except that
+ * file sizes falling before the beginning of a stripe are clamped to the end
+ * of the previous stripe, not the beginning of the next:
+ *
+ * S
+ * ---------------------------------------------------------------------
+ * | 0 | 1 | 2 | 0 | 1 | 2 |
+ * ---------------------------------------------------------------------
+ *
+ * if clamped to stripe 2 becomes:
+ *
+ * S
+ * ---------------------------------------------------------------------
+ * | 0 | 1 | 2 | 0 | 1 | 2 |
+ * ---------------------------------------------------------------------
+ */
+static obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size,
+ int stripeno)
+{
+ unsigned long ssize = lsm->lsm_stripe_size;
+ unsigned long swidth = ssize * lsm->lsm_stripe_count;
+ unsigned long stripe_off, this_stripe;
+
+ if (file_size == OBD_OBJECT_EOF)
+ return OBD_OBJECT_EOF;
+
+ /* do_div(a, b) returns a % b, and a = a / b */
+ stripe_off = do_div(file_size, swidth);
+
+ this_stripe = stripeno * ssize;
+ if (stripe_off < this_stripe) {
+ /* Move to end of previous stripe, or zero */
+ if (file_size > 0) {
+ file_size--;
+ stripe_off = ssize;
+ } else {
+ stripe_off = 0;
+ }
+ } else {
+ stripe_off -= this_stripe;
+
+ if (stripe_off >= ssize) {
+ /* Clamp to end of this stripe */
+ stripe_off = ssize;
+ }
+ }
+
+ return (file_size * ssize + stripe_off);
+}
+
/* given an extent in an lov and a stripe, calculate the extent of the stripe
* that is contained within the lov extent. this returns true if the given
* stripe does intersect with the lov extent. */
}
if (!rc)
rc = err;
+ } else {
+ loi->loi_kms = loi->loi_rss = starti;
}
}
RETURN(rc);
}
static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lustre_handle *parent_lock,
- __u32 type, void *cookie, int cookielen, __u32 mode,
- int *flags, void *cb, void *data,
+ __u32 type, ldlm_policy_data_t *policy, __u32 mode,
+ int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
+ void *data,__u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh)
{
+ struct lov_lock_handles *lov_lockh = NULL;
+ struct lustre_handle *lov_lockhp;
struct lov_obd *lov;
struct lov_oinfo *loi;
- struct lov_stripe_md submd;
- struct ldlm_extent *extent = cookie;
- int rc;
+ char submd_buf[sizeof(struct lov_stripe_md) + sizeof(struct lov_oinfo)];
+ struct lov_stripe_md *submd = (void *)submd_buf;
+ ldlm_error_t rc;
+ int i, save_flags = *flags;
ENTRY;
if (lsm_bad_magic(lsm))
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
+ if (lsm->lsm_stripe_count > 1) {
+ lov_lockh = lov_llh_new(lsm);
+ if (lov_lockh == NULL)
+ RETURN(-ENOMEM);
+
+ lockh->cookie = lov_lockh->llh_handle.h_cookie;
+ lov_lockhp = lov_lockh->llh_handles;
+ } else {
+ lov_lockhp = lockh;
+ }
+
lov = &exp->exp_obd->u.lov;
- loi = lsm->lsm_oinfo;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- RETURN(-EIO);
- }
-
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- /* XXX submd is not fully initialized here */
- *flags = 0;
- rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- parent_lock, type, extent, sizeof(*extent),
- mode, flags, cb, data, lockh);
-
- if (rc != ELDLM_OK) {
- memset(lockh, 0, sizeof(*lockh));
- if (lov->tgts[loi->loi_ost_idx].active)
- CERROR("error: enqueue objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- lsm->lsm_object_id, loi->loi_id,
- loi->loi_ost_idx, rc);
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++, lov_lockhp++) {
+ ldlm_policy_data_t sub_ext;
+
+ if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
+ policy->l_extent.end,
+ &sub_ext.l_extent.start,
+ &sub_ext.l_extent.end))
+ continue;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ submd->lsm_object_id = loi->loi_id;
+ submd->lsm_stripe_count = 0;
+ submd->lsm_oinfo->loi_rss = loi->loi_rss;
+ submd->lsm_oinfo->loi_kms = loi->loi_kms;
+ loi->loi_mtime = submd->lsm_oinfo->loi_mtime;
+ /* XXX submd is not fully initialized here */
+ *flags = save_flags;
+ rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, submd,
+ type, &sub_ext, mode, flags, bl_cb, cp_cb,
+ gl_cb, data, lvb_len, lvb_swabber, lov_lockhp);
+
+ /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
+ * It belongs in the OSC, except that the OSC doesn't have
+ * access to the real LOI -- it gets a copy, that we created
+ * above, and that copy can be arbitrarily out of date.
+ *
+ * The LOV API is due for a serious rewriting anyways, and this
+ * can be addressed then. */
+ if (rc == ELDLM_OK) {
+ struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+ __u64 tmp = submd->lsm_oinfo->loi_rss;
+
+ LASSERT(lock != NULL);
+ loi->loi_rss = tmp;
+ // Extend KMS up to the end of this lock, and no further
+ if (tmp > lock->l_policy_data.l_extent.end)
+ tmp = lock->l_policy_data.l_extent.end;
+ if (tmp > loi->loi_kms) {
+ CDEBUG(D_INODE, "lock acquired, setting rss="
+ LPU64", kms="LPU64"\n", loi->loi_rss,
+ tmp);
+ loi->loi_kms = tmp;
+ } else {
+ CDEBUG(D_INODE, "lock acquired, setting rss="
+ LPU64"; leaving kms="LPU64", end="LPU64
+ "\n", loi->loi_rss, loi->loi_kms,
+ lock->l_policy_data.l_extent.end);
+ }
+ ldlm_lock_allow_match(lock);
+ LDLM_LOCK_PUT(lock);
+ } else if (rc == ELDLM_LOCK_ABORTED &&
+ save_flags & LDLM_FL_HAS_INTENT) {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ loi->loi_rss = submd->lsm_oinfo->loi_rss;
+ CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
+ " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
+ } else {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ if (lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("error: enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ lsm->lsm_object_id, loi->loi_id,
+ loi->loi_ost_idx, rc);
+ GOTO(out_locks, rc);
+ }
+ }
}
- RETURN(rc);
+ if (lsm->lsm_stripe_count > 1)
+ lov_llh_put(lov_lockh);
+ RETURN(ELDLM_OK);
+
+ out_locks:
+ while (loi--, lov_lockhp--, i-- > 0) {
+ struct lov_stripe_md submd;
+ int err;
+
+ if (lov_lockhp->cookie == 0)
+ continue;
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ submd.lsm_object_id = loi->loi_id;
+ submd.lsm_stripe_count = 0;
+ err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
+ mode, lov_lockhp);
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("error: cancelling objid "LPX64" on OST "
+ "idx %d after enqueue error: rc = %d\n",
+ loi->loi_id, loi->loi_ost_idx, err);
+ }
+ }
+
+ if (lsm->lsm_stripe_count > 1) {
+ lov_llh_destroy(lov_lockh);
+ lov_llh_put(lov_lockh);
+ }
+ return rc;
}
static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
- __u32 type, void *cookie, int cookielen, __u32 mode,
+ __u32 type, ldlm_policy_data_t *policy, __u32 mode,
int *flags, void *data, struct lustre_handle *lockh)
{
+ struct lov_lock_handles *lov_lockh = NULL;
+ struct lustre_handle *lov_lockhp;
struct lov_obd *lov;
struct lov_oinfo *loi;
struct lov_stripe_md submd;
- struct ldlm_extent *extent = cookie;
- int rc = 0;
+ ldlm_error_t rc = 0;
+ int i;
ENTRY;
if (lsm_bad_magic(lsm))
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
+ if (lsm->lsm_stripe_count > 1) {
+ lov_lockh = lov_llh_new(lsm);
+ if (lov_lockh == NULL)
+ RETURN(-ENOMEM);
+
+ lockh->cookie = lov_lockh->llh_handle.h_cookie;
+ lov_lockhp = lov_lockh->llh_handles;
+ } else {
+ lov_lockhp = lockh;
+ }
+
lov = &exp->exp_obd->u.lov;
- loi = lsm->lsm_oinfo;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- RETURN(-EIO);
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++, lov_lockhp++) {
+ ldlm_policy_data_t sub_ext;
+ int lov_flags;
+
+ if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
+ policy->l_extent.end,
+ &sub_ext.l_extent.start,
+ &sub_ext.l_extent.end))
+ continue;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ rc = -EIO;
+ break;
+ }
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ submd.lsm_object_id = loi->loi_id;
+ submd.lsm_stripe_count = 0;
+ lov_flags = *flags;
+ /* XXX submd is not fully initialized here */
+ rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
+ type, &sub_ext, mode, &lov_flags, data,
+ lov_lockhp);
+ if (rc != 1)
+ break;
+ }
+ if (rc == 1) {
+ if (lsm->lsm_stripe_count > 1)
+ lov_llh_put(lov_lockh);
+ RETURN(1);
+ }
+
+ while (loi--, lov_lockhp--, i-- > 0) {
+ struct lov_stripe_md submd;
+ int err;
+
+ if (lov_lockhp->cookie == 0)
+ continue;
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ submd.lsm_object_id = loi->loi_id;
+ submd.lsm_stripe_count = 0;
+ err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
+ mode, lov_lockhp);
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("error: cancelling objid "LPX64" on OST "
+ "idx %d after match failure: rc = %d\n",
+ loi->loi_id, loi->loi_ost_idx, err);
+ }
}
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- /* XXX submd is not fully initialized here */
- rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, type,
- extent, sizeof(*extent), mode, flags, data, lockh);
+ if (lsm->lsm_stripe_count > 1) {
+ lov_llh_destroy(lov_lockh);
+ lov_llh_put(lov_lockh);
+ }
RETURN(rc);
}
static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
__u32 mode, struct lustre_handle *lockh)
{
+ struct lov_lock_handles *lov_lockh = NULL;
+ struct lustre_handle *lov_lockhp;
struct lov_obd *lov;
struct lov_oinfo *loi;
- struct lov_stripe_md submd;
- int rc = 0;
+ int rc = 0, i;
ENTRY;
if (lsm_bad_magic(lsm))
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
+ LASSERT(lockh);
+ if (lsm->lsm_stripe_count > 1) {
+ lov_lockh = lov_handle2llh(lockh);
+ if (!lov_lockh) {
+ CERROR("LOV: invalid lov lock handle %p\n", lockh);
+ RETURN(-EINVAL);
+ }
+
+ lov_lockhp = lov_lockh->llh_handles;
+ } else {
+ lov_lockhp = lockh;
+ }
+
lov = &exp->exp_obd->u.lov;
- loi = lsm->lsm_oinfo;
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++, lov_lockhp++) {
+ struct lov_stripe_md submd;
+ int err;
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- rc = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- mode, lockh);
- if (rc && lov->tgts[loi->loi_ost_idx].active)
- CERROR("error: cancel objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- lsm->lsm_object_id, loi->loi_id, loi->loi_ost_idx, rc);
- GOTO(out, rc);
-out:
+ if (lov_lockhp->cookie == 0) {
+ CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
+ loi->loi_ost_idx, loi->loi_id);
+ continue;
+ }
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ submd.lsm_object_id = loi->loi_id;
+ submd.lsm_stripe_count = 0;
+ err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
+ mode, lov_lockhp);
+ if (err) {
+ if (lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("error: cancel objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ lsm->lsm_object_id,
+ loi->loi_id, loi->loi_ost_idx, err);
+ if (!rc)
+ rc = err;
+ }
+ }
+ }
+
+ if (lsm->lsm_stripe_count > 1)
+ lov_llh_destroy(lov_lockh);
+ if (lov_lockh != NULL)
+ lov_llh_put(lov_lockh);
RETURN(rc);
}
static int lov_get_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 *vallen, void *val)
-{
+{
struct obd_device *obddev = class_exp2obd(exp);
struct lov_obd *lov = &obddev->u.lov;
int i;
for (i = 0, loi = data->lsm->lsm_oinfo;
i < data->lsm->lsm_stripe_count;
i++, loi++) {
- if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
+ if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
data->lock->l_conn_export) {
*stripe = i;
RETURN(0);
}
}
RETURN(-ENXIO);
+ } else if (keylen >= strlen("size_to_stripe") &&
+ strcmp(key, "size_to_stripe") == 0) {
+ struct {
+ int stripe_number;
+ __u64 size;
+ struct lov_stripe_md *lsm;
+ } *data = val;
+
+ if (*vallen < sizeof(*data))
+ RETURN(-EFAULT);
+
+ data->size = lov_size_to_stripe(data->lsm, data->size,
+ data->stripe_number);
+ RETURN(0);
} else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
obd_id *ids = val;
int rc, size = sizeof(obd_id);
}
+/* Compute the aggregate file size by merging the per-stripe sizes.
+ *
+ * If @kms is non-zero only the known minimum size (KMS) of each stripe
+ * is used; otherwise the RSS is merged in as well.  Even when merging
+ * RSS, we still take the KMS value if it's larger.  This prevents
+ * getattr from stomping on dirty cached pages which extend the file
+ * size. */
+__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms)
+{
+ struct lov_oinfo *loi;
+ __u64 size = 0;
+ int i;
+
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++) {
+ obd_size lov_size, tmpsize;
+
+ tmpsize = loi->loi_kms;
+ if (kms == 0 && loi->loi_rss > tmpsize)
+ tmpsize = loi->loi_rss;
+
+ lov_size = lov_stripe_size(lsm, tmpsize, i);
+ if (lov_size > size)
+ size = lov_size;
+ }
+ return size;
+}
+EXPORT_SYMBOL(lov_merge_size);
+
+__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time)
+{
+ struct lov_oinfo *loi;
+ int i;
+
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++) {
+ if (loi->loi_mtime > current_time)
+ current_time = loi->loi_mtime;
+ }
+ return current_time;
+}
+EXPORT_SYMBOL(lov_merge_mtime);
+
+#if 0
+struct lov_multi_wait {
+ struct ldlm_lock *lock;
+ wait_queue_t wait;
+ int completed;
+ int generation;
+};
-static int lov_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct ldlm_lock *lock, obd_off offset)
+int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
+ struct lustre_handle *lockh)
{
+ struct lov_lock_handles *lov_lockh = NULL;
+ struct lustre_handle *lov_lockhp;
struct lov_obd *lov;
struct lov_oinfo *loi;
- struct lov_stripe_md submd;
- int rc;
+ struct lov_multi_wait *queues;
+ int rc = 0, i;
ENTRY;
- LASSERT(lsm != NULL);
- if (exp == NULL)
+ if (lsm_bad_magic(lsm))
+ RETURN(-EINVAL);
+
+ if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
+ LASSERT(lockh != NULL);
+ if (lsm->lsm_stripe_count > 1) {
+ lov_lockh = lov_handle2llh(lockh);
+ if (lov_lockh == NULL) {
+ CERROR("LOV: invalid lov lock handle %p\n", lockh);
+ RETURN(-EINVAL);
+ }
+
+ lov_lockhp = lov_lockh->llh_handles;
+ } else {
+ lov_lockhp = lockh;
+ }
+
+ OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
+ if (queues == NULL)
+ GOTO(out, rc = -ENOMEM);
+
lov = &exp->exp_obd->u.lov;
- loi = lsm->lsm_oinfo;
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++, lov_lockhp++) {
+ struct ldlm_lock *lock;
+ struct obd_device *obd;
+ unsigned long irqflags;
+
+ lock = ldlm_handle2lock(lov_lockhp);
+ if (lock == NULL) {
+ CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
+ loi->loi_ost_idx, loi->loi_id);
+ queues[i].completed = 1;
+ continue;
+ }
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- RETURN(-EIO);
+ queues[i].lock = lock;
+ init_waitqueue_entry(&(queues[i].wait), current);
+ add_wait_queue(lock->l_waitq, &(queues[i].wait));
+
+ obd = class_exp2obd(lock->l_conn_export);
+ if (obd != NULL)
+ imp = obd->u.cli.cl_import;
+ if (imp != NULL) {
+ spin_lock_irqsave(&imp->imp_lock, irqflags);
+ queues[i].generation = imp->imp_generation;
+ spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+ }
}
-
- /* XXX submd is not fully initialized here */
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- rc = obd_lock_contains(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- lock, offset);
+ lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
+ interrupted_completion_wait, &lwd);
+ rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
+
+ for (i = 0; i < lsm->lsm_stripe_count; i++)
+ remove_wait_queue(lock->l_waitq, &(queues[i].wait));
+
+ if (rc == -EINTR || rc == -ETIMEDOUT) {
+
+
+ }
+
+ out:
+ if (lov_lockh != NULL)
+ lov_llh_put(lov_lockh);
RETURN(rc);
}
+#endif
+
+void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
+ obd_off size)
+{
+ struct lov_oinfo *loi;
+ int stripe = 0;
+ __u64 kms;
+ ENTRY;
+
+ if (size > 0)
+ stripe = lov_stripe_number(lsm, size - 1);
+ kms = lov_size_to_stripe(lsm, size, stripe);
+ loi = &(lsm->lsm_oinfo[stripe]);
+
+ CDEBUG(D_INODE, "stripe %d KMS %sincreasing "LPU64"->"LPU64"\n",
+ stripe, kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms);
+ if (kms > loi->loi_kms)
+ loi->loi_kms = kms;
+ EXIT;
+}
+EXPORT_SYMBOL(lov_increase_kms);
struct obd_ops lov_obd_ops = {
o_owner: THIS_MODULE,
o_set_info: lov_set_info,
o_llog_init: lov_llog_init,
o_llog_finish: lov_llog_finish,
- o_lock_contains:lov_lock_contains,
o_notify: lov_notify,
};
}
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
- rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, NULL, res_id,
- lock_type, NULL, 0, lock_mode, &flags,
- cb_completion, cb_blocking, cb_data, lockh);
+ rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id,
+ lock_type, NULL, lock_mode, &flags, cb_blocking,
+ cb_completion, NULL, cb_data, NULL, 0, NULL,
+ lockh);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Similarly, if we're going to replay this request, we don't want to
lock_mode = lock->l_req_mode;
}
+ ldlm_lock_allow_match(lock);
LDLM_LOCK_PUT(lock);
}
mode = LCK_PR;
rc = ldlm_lock_match(exp->exp_obd->obd_namespace, flags,
- &res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
- &lockh);
+ &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
if (!rc) {
mode = LCK_PW;
rc = ldlm_lock_match(exp->exp_obd->obd_namespace, flags,
- &res_id, LDLM_PLAIN, NULL, 0,
- LCK_PW, &lockh);
+ &res_id, LDLM_PLAIN, NULL, LCK_PW,
+ &lockh);
}
if (rc) {
memcpy(&it->d.lustre.it_lock_handle, &lockh,
LDLM_LOCK_PUT(lock);
memcpy(&old_lock, &lockh, sizeof(lockh));
if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
- LDLM_PLAIN, NULL, 0, LCK_NL, &old_lock)) {
+ LDLM_PLAIN, NULL, LCK_NL, &old_lock)) {
ldlm_lock_decref_and_cancel(&lockh,
it->d.lustre.it_lock_mode);
memcpy(&lockh, &old_lock, sizeof(old_lock));
NULL, NULL);
rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req),
&lwi);
-
- if (rc == 0) {
+ if (rc == 0) {
rc = req->rq_repmsg->status;
if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
- DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err = %d", rc);
+ DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
+ "= %d", rc);
if (rc > 0)
rc = -rc;
} else if (mod == NULL) {
- CERROR("Unexpected: can't find mdc_open_data, but the close "
- "succeeded. Please tell CFS.\n");
+ CERROR("Unexpected: can't find mdc_open_data, but the "
+ "close succeeded. Please tell CFS.\n");
}
}
#include "mds_internal.h"
+static int mds_intent_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ ldlm_mode_t mode, int flags, void *data);
static int mds_postsetup(struct obd_device *obd);
static int mds_cleanup(struct obd_device *obd, int flags);
res_id.name[0] = de->d_inode->i_ino;
res_id.name[1] = de->d_inode->i_generation;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, ldlm_completion_ast,
- mds_blocking_ast, NULL, lockh);
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+ LDLM_PLAIN, NULL, lock_mode, &flags,
+ mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, lockh);
if (rc != ELDLM_OK) {
l_dput(de);
retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
/* XXX layering violation! -phil */
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- /* Get this: if mds_blocking_ast is racing with ldlm_intent_policy,
+ /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
* such that mds_blocking_ast is called just before l_i_p takes the
* ns_lock, then by the time we get the lock, we might not be the
* correct blocking function anymore. So check, and return early, if
DEBUG_REQ(D_INODE, req, "enqueue");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
- ldlm_server_blocking_ast);
+ ldlm_server_blocking_ast, NULL);
break;
case LDLM_CONVERT:
DEBUG_REQ(D_INODE, req, "convert");
mds_cleanup(obd, 0);
GOTO(err_put, rc = -ENOMEM);
}
+ ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
rc = mds_fs_setup(obd, mnt);
if (rc) {
rep->lock_policy_res1 |= flag;
}
-static int ldlm_intent_policy(struct ldlm_namespace *ns,
- struct ldlm_lock **lockp, void *req_cookie,
- ldlm_mode_t mode, int flags, void *data)
+static int mds_intent_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ ldlm_mode_t mode, int flags, void *data)
{
struct ptlrpc_request *req = req_cookie;
struct ldlm_lock *lock = *lockp;
- int rc;
+ struct ldlm_intent *it;
+ struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
+ struct ldlm_reply *rep;
+ struct lustre_handle lockh = { 0 };
+ struct ldlm_lock *new_lock;
+ int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
+ sizeof(struct mds_body),
+ mds->mds_max_mdsize,
+ mds->mds_max_cookiesize};
ENTRY;
- if (!req_cookie)
+ LASSERT(req != NULL);
+
+ if (req->rq_reqmsg->bufcount <= 1) {
+ /* No intent was provided */
+ int size = sizeof(struct ldlm_reply);
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ LASSERT(rc == 0);
RETURN(0);
+ }
- if (req->rq_reqmsg->bufcount > 1) {
- /* an intent needs to be considered */
- struct ldlm_intent *it;
- struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
- struct ldlm_reply *rep;
- struct lustre_handle lockh = { 0 };
- struct ldlm_lock *new_lock;
- int offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
- sizeof(struct mds_body),
- mds->mds_max_mdsize,
- mds->mds_max_cookiesize};
-
- it = lustre_swab_reqbuf(req, 1, sizeof (*it),
- lustre_swab_ldlm_intent);
- if (it == NULL) {
- CERROR ("Intent missing\n");
- req->rq_status = -EFAULT;
- RETURN(req->rq_status);
- }
+ it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent);
+ if (it == NULL) {
+ CERROR("Intent missing\n");
+ RETURN(req->rq_status = -EFAULT);
+ }
- LDLM_DEBUG(lock, "intent policy, opc: %s",
- ldlm_it2str(it->opc));
+ LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
- rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3,
- repsize, NULL);
- if (rc)
- RETURN(req->rq_status = rc);
+ rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize,
+ NULL);
+ if (rc)
+ RETURN(req->rq_status = rc);
- rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
- intent_set_disposition(rep, DISP_IT_EXECD);
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+ intent_set_disposition(rep, DISP_IT_EXECD);
- fixup_handle_for_resent_req(req, lock, &lockh);
+ fixup_handle_for_resent_req(req, lock, &lockh);
- /* execute policy */
- switch ((long)it->opc) {
- case IT_OPEN:
- case IT_CREAT|IT_OPEN:
- /* XXX swab here to assert that an mds_open reint
- * packet is following */
- rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
+ /* execute policy */
+ switch ((long)it->opc) {
+ case IT_OPEN:
+ case IT_CREAT|IT_OPEN:
+ /* XXX swab here to assert that an mds_open reint
+ * packet is following */
+ rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
#if 0
- /* We abort the lock if the lookup was negative and
- * we did not make it to the OPEN portion */
- if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
- RETURN(ELDLM_LOCK_ABORTED);
- if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
- !intent_disposition(rep, DISP_OPEN_OPEN))
+ /* We abort the lock if the lookup was negative and
+ * we did not make it to the OPEN portion */
+ if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
+ RETURN(ELDLM_LOCK_ABORTED);
+ if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
+ !intent_disposition(rep, DISP_OPEN_OPEN))
#endif
- RETURN(ELDLM_LOCK_ABORTED);
- break;
- case IT_GETATTR:
- case IT_LOOKUP:
- case IT_READDIR:
- rep->lock_policy_res2 = mds_getattr_name(offset, req,
- &lockh);
- /* FIXME: LDLM can set req->rq_status. MDS sets
- policy_res{1,2} with disposition and status.
- - replay: returns 0 & req->status is old status
- - otherwise: returns req->status */
- if (intent_disposition(rep, DISP_LOOKUP_NEG))
- rep->lock_policy_res2 = 0;
- if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
- rep->lock_policy_res2)
- RETURN(ELDLM_LOCK_ABORTED);
- if (req->rq_status != 0) {
- LBUG();
- rep->lock_policy_res2 = req->rq_status;
- RETURN(ELDLM_LOCK_ABORTED);
- }
- break;
- default:
- CERROR("Unhandled intent "LPD64"\n", it->opc);
+ RETURN(ELDLM_LOCK_ABORTED);
+ break;
+ case IT_GETATTR:
+ case IT_LOOKUP:
+ case IT_READDIR:
+ rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh);
+ /* FIXME: LDLM can set req->rq_status. MDS sets
+ policy_res{1,2} with disposition and status.
+ - replay: returns 0 & req->status is old status
+ - otherwise: returns req->status */
+ if (intent_disposition(rep, DISP_LOOKUP_NEG))
+ rep->lock_policy_res2 = 0;
+ if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
+ rep->lock_policy_res2)
+ RETURN(ELDLM_LOCK_ABORTED);
+ if (req->rq_status != 0) {
LBUG();
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
}
+ break;
+ default:
+ CERROR("Unhandled intent "LPD64"\n", it->opc);
+ LBUG();
+ }
- /* By this point, whatever function we called above must have
- * either filled in 'lockh', been an intent replay, or returned
- * an error. We want to allow replayed RPCs to not get a lock,
- * since we would just drop it below anyways because lock replay
- * is done separately by the client afterwards. For regular
- * RPCs we want to give the new lock to the client instead of
- * whatever lock it was about to get.
- */
- new_lock = ldlm_handle2lock(&lockh);
- if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
- RETURN(0);
-
- LASSERT(new_lock != NULL);
+ /* By this point, whatever function we called above must have either
+ * filled in 'lockh', been an intent replay, or returned an error. We
+ * want to allow replayed RPCs to not get a lock, since we would just
+ * drop it below anyways because lock replay is done separately by the
+ * client afterwards. For regular RPCs we want to give the new lock to
+ * the client instead of whatever lock it was about to get. */
+ new_lock = ldlm_handle2lock(&lockh);
+ if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
+ RETURN(0);
- /* If we've already given this lock to a client once, then we
- * should have no readers or writers. Otherwise, we should
- * have one reader _or_ writer ref (which will be zeroed below)
- * before returning the lock to a client.
- */
- if (new_lock->l_export == req->rq_export) {
- LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
- } else {
- LASSERT(new_lock->l_export == NULL);
- LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
- }
+ LASSERT(new_lock != NULL);
- *lockp = new_lock;
+ /* If we've already given this lock to a client once, then we should
+ * have no readers or writers. Otherwise, we should have one reader
+ * _or_ writer ref (which will be zeroed below) before returning the
+ * lock to a client. */
+ if (new_lock->l_export == req->rq_export) {
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+ } else {
+ LASSERT(new_lock->l_export == NULL);
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+ }
- if (new_lock->l_export == req->rq_export) {
- /* Already gave this to the client, which means that we
- * reconstructed a reply. */
- LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
- MSG_RESENT);
- RETURN(ELDLM_LOCK_REPLACED);
- }
+ *lockp = new_lock;
- /* Fixup the lock to be given to the client */
- l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
- new_lock->l_readers = 0;
- new_lock->l_writers = 0;
+ if (new_lock->l_export == req->rq_export) {
+ /* Already gave this to the client, which means that we
+ * reconstructed a reply. */
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+ MSG_RESENT);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
- new_lock->l_export = class_export_get(req->rq_export);
- list_add(&new_lock->l_export_chain,
- &new_lock->l_export->exp_ldlm_data.led_held_locks);
+ /* Fixup the lock to be given to the client */
+ l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+ new_lock->l_readers = 0;
+ new_lock->l_writers = 0;
- new_lock->l_blocking_ast = lock->l_blocking_ast;
- new_lock->l_completion_ast = lock->l_completion_ast;
+ new_lock->l_export = class_export_get(req->rq_export);
+ list_add(&new_lock->l_export_chain,
+ &new_lock->l_export->exp_ldlm_data.led_held_locks);
- memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
- sizeof(lock->l_remote_handle));
+ new_lock->l_blocking_ast = lock->l_blocking_ast;
+ new_lock->l_completion_ast = lock->l_completion_ast;
- new_lock->l_flags &= ~LDLM_FL_LOCAL;
+ memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
+ sizeof(lock->l_remote_handle));
- LDLM_LOCK_PUT(new_lock);
- l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+ new_lock->l_flags &= ~LDLM_FL_LOCAL;
- RETURN(ELDLM_LOCK_REPLACED);
- } else {
- int size = sizeof(struct ldlm_reply);
- rc = lustre_pack_reply(req, 1, &size, NULL);
- if (rc) {
- LBUG();
- RETURN(-ENOMEM);
- }
- }
- RETURN(0);
+ LDLM_LOCK_PUT(new_lock);
+ l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+
+ RETURN(ELDLM_LOCK_REPLACED);
}
int mds_attach(struct obd_device *dev, obd_count len, void *data)
class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
lprocfs_init_multi_vars(1, &lvars);
class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
- ldlm_register_intent(ldlm_intent_policy);
return 0;
}
static void /*__exit*/ mds_exit(void)
{
- ldlm_unregister_intent();
class_unregister_type(LUSTRE_MDS_NAME);
class_unregister_type(LUSTRE_MDT_NAME);
}
if (child_lockh == NULL)
child_lockh = &lockh;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- child_res_id, LDLM_PLAIN, NULL, 0,
- LCK_EX, &lock_flags, ldlm_completion_ast,
- mds_blocking_ast, NULL, child_lockh);
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, child_res_id,
+ LDLM_PLAIN, NULL, LCK_EX, &lock_flags,
+ mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, child_lockh);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_enqueue: %d\n", rc);
else if (child_lockh == &lockh)
* makes it out to the client but the unlink's does not.
* See bug 2029 for more detail.*/
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- NULL, child_res_id, LDLM_PLAIN,
- NULL, 0, LCK_EX, &lock_flags,
- ldlm_completion_ast,
- mds_blocking_ast, NULL,
- &child_ino_lockh);
+ child_res_id, LDLM_PLAIN, NULL,
+ LCK_EX, &lock_flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, &child_ino_lockh);
if (rc != ELDLM_OK) {
CERROR("error locking for unlink/create sync: "
"%d\n", rc);
res_id[0]->name[0], res_id[1]->name[0]);
flags = LDLM_FL_LOCAL_ONLY;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, *res_id[0],
- LDLM_PLAIN, NULL, 0, lock_modes[0], &flags,
- ldlm_completion_ast, mds_blocking_ast, NULL,
- handles[0]);
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
+ LDLM_PLAIN, NULL, lock_modes[0], &flags,
+ mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, handles[0]);
if (rc != ELDLM_OK)
RETURN(-EIO);
ldlm_lock_dump_handle(D_OTHER, handles[0]);
ldlm_lock_addref(handles[1], lock_modes[1]);
} else if (res_id[1]->name[0] != 0) {
flags = LDLM_FL_LOCAL_ONLY;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- *res_id[1], LDLM_PLAIN, NULL, 0,
- lock_modes[1], &flags,ldlm_completion_ast,
- mds_blocking_ast, NULL, handles[1]);
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ *res_id[1], LDLM_PLAIN, NULL,
+ lock_modes[1], &flags, mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL, NULL, 0,
+ NULL, handles[1]);
if (rc != ELDLM_OK) {
ldlm_lock_decref(handles[0], lock_modes[0]);
RETURN(-EIO);
ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
} else {
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- NULL, *res_id[i], LDLM_PLAIN,
- NULL, 0, lock_modes[i], &flags,
- ldlm_completion_ast,
- mds_blocking_ast, NULL,
- dlm_handles[i]);
+ *res_id[i], LDLM_PLAIN, NULL,
+ lock_modes[i], &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, dlm_handles[i]);
if (rc != ELDLM_OK)
GOTO(out_err, rc = -EIO);
ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
}
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- NULL, *child_res_id, LDLM_PLAIN,
- NULL, 0, child_mode, &flags,
- ldlm_completion_ast, mds_blocking_ast,
+ *child_res_id, LDLM_PLAIN, NULL,
+ child_mode, &flags, mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL, NULL, 0,
NULL, child_lockh);
if (rc != ELDLM_OK)
GOTO(cleanup, rc = -EIO);
#ifdef __KERNEL__
#include <linux/lustre_version.h>
#define LUSTRE_MIN_VERSION 28
-#define LUSTRE_MAX_VERSION 32
+#define LUSTRE_MAX_VERSION 33
#if (LUSTRE_KERNEL_VERSION < LUSTRE_MIN_VERSION)
# error Cannot continue: Your Lustre kernel patch is older than the sources
#elif (LUSTRE_KERNEL_VERSION > LUSTRE_MAX_VERSION)
LPROCFS_OBD_OP_INIT(num_private_stats, stats, san_preprw);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, init_export);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy_export);
- LPROCFS_OBD_OP_INIT(num_private_stats, stats, lock_contains);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_init);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin);
+++ /dev/null
-/*
- Red Black Trees
- (C) 1999 Andrea Arcangeli <andrea@suse.de>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
- linux/lib/rbtree.c
-
- rb_get_first and rb_get_next written by Theodore Ts'o, 9/8/2002
-*/
-
-#include <liblustre.h>
-#include <linux/rbtree.h>
-
-static void __rb_rotate_left(rb_node_t * node, rb_root_t * root)
-{
- rb_node_t * right = node->rb_right;
-
- if ((node->rb_right = right->rb_left))
- right->rb_left->rb_parent = node;
- right->rb_left = node;
-
- if ((right->rb_parent = node->rb_parent))
- {
- if (node == node->rb_parent->rb_left)
- node->rb_parent->rb_left = right;
- else
- node->rb_parent->rb_right = right;
- }
- else
- root->rb_node = right;
- node->rb_parent = right;
-}
-
-static void __rb_rotate_right(rb_node_t * node, rb_root_t * root)
-{
- rb_node_t * left = node->rb_left;
-
- if ((node->rb_left = left->rb_right))
- left->rb_right->rb_parent = node;
- left->rb_right = node;
-
- if ((left->rb_parent = node->rb_parent))
- {
- if (node == node->rb_parent->rb_right)
- node->rb_parent->rb_right = left;
- else
- node->rb_parent->rb_left = left;
- }
- else
- root->rb_node = left;
- node->rb_parent = left;
-}
-
-void rb_insert_color(rb_node_t * node, rb_root_t * root)
-{
- rb_node_t * parent, * gparent;
-
- while ((parent = node->rb_parent) && parent->rb_color == RB_RED)
- {
- gparent = parent->rb_parent;
-
- if (parent == gparent->rb_left)
- {
- {
- register rb_node_t * uncle = gparent->rb_right;
- if (uncle && uncle->rb_color == RB_RED)
- {
- uncle->rb_color = RB_BLACK;
- parent->rb_color = RB_BLACK;
- gparent->rb_color = RB_RED;
- node = gparent;
- continue;
- }
- }
-
- if (parent->rb_right == node)
- {
- register rb_node_t * tmp;
- __rb_rotate_left(parent, root);
- tmp = parent;
- parent = node;
- node = tmp;
- }
-
- parent->rb_color = RB_BLACK;
- gparent->rb_color = RB_RED;
- __rb_rotate_right(gparent, root);
- } else {
- {
- register rb_node_t * uncle = gparent->rb_left;
- if (uncle && uncle->rb_color == RB_RED)
- {
- uncle->rb_color = RB_BLACK;
- parent->rb_color = RB_BLACK;
- gparent->rb_color = RB_RED;
- node = gparent;
- continue;
- }
- }
-
- if (parent->rb_left == node)
- {
- register rb_node_t * tmp;
- __rb_rotate_right(parent, root);
- tmp = parent;
- parent = node;
- node = tmp;
- }
-
- parent->rb_color = RB_BLACK;
- gparent->rb_color = RB_RED;
- __rb_rotate_left(gparent, root);
- }
- }
-
- root->rb_node->rb_color = RB_BLACK;
-}
-EXPORT_SYMBOL(rb_insert_color);
-
-static void __rb_erase_color(rb_node_t * node, rb_node_t * parent,
- rb_root_t * root)
-{
- rb_node_t * other;
-
- while ((!node || node->rb_color == RB_BLACK) && node != root->rb_node)
- {
- if (parent->rb_left == node)
- {
- other = parent->rb_right;
- if (other->rb_color == RB_RED)
- {
- other->rb_color = RB_BLACK;
- parent->rb_color = RB_RED;
- __rb_rotate_left(parent, root);
- other = parent->rb_right;
- }
- if ((!other->rb_left ||
- other->rb_left->rb_color == RB_BLACK)
- && (!other->rb_right ||
- other->rb_right->rb_color == RB_BLACK))
- {
- other->rb_color = RB_RED;
- node = parent;
- parent = node->rb_parent;
- }
- else
- {
- if (!other->rb_right ||
- other->rb_right->rb_color == RB_BLACK)
- {
- register rb_node_t * o_left;
- if ((o_left = other->rb_left))
- o_left->rb_color = RB_BLACK;
- other->rb_color = RB_RED;
- __rb_rotate_right(other, root);
- other = parent->rb_right;
- }
- other->rb_color = parent->rb_color;
- parent->rb_color = RB_BLACK;
- if (other->rb_right)
- other->rb_right->rb_color = RB_BLACK;
- __rb_rotate_left(parent, root);
- node = root->rb_node;
- break;
- }
- }
- else
- {
- other = parent->rb_left;
- if (other->rb_color == RB_RED)
- {
- other->rb_color = RB_BLACK;
- parent->rb_color = RB_RED;
- __rb_rotate_right(parent, root);
- other = parent->rb_left;
- }
- if ((!other->rb_left ||
- other->rb_left->rb_color == RB_BLACK)
- && (!other->rb_right ||
- other->rb_right->rb_color == RB_BLACK))
- {
- other->rb_color = RB_RED;
- node = parent;
- parent = node->rb_parent;
- }
- else
- {
- if (!other->rb_left ||
- other->rb_left->rb_color == RB_BLACK)
- {
- register rb_node_t * o_right;
- if ((o_right = other->rb_right))
- o_right->rb_color = RB_BLACK;
- other->rb_color = RB_RED;
- __rb_rotate_left(other, root);
- other = parent->rb_left;
- }
- other->rb_color = parent->rb_color;
- parent->rb_color = RB_BLACK;
- if (other->rb_left)
- other->rb_left->rb_color = RB_BLACK;
- __rb_rotate_right(parent, root);
- node = root->rb_node;
- break;
- }
- }
- }
- if (node)
- node->rb_color = RB_BLACK;
-}
-
-void rb_erase(rb_node_t * node, rb_root_t * root)
-{
- rb_node_t * child, * parent;
- int color;
-
- if (!node->rb_left)
- child = node->rb_right;
- else if (!node->rb_right)
- child = node->rb_left;
- else
- {
- rb_node_t * old = node, * left;
-
- node = node->rb_right;
- while ((left = node->rb_left))
- node = left;
- child = node->rb_right;
- parent = node->rb_parent;
- color = node->rb_color;
-
- if (child)
- child->rb_parent = parent;
- if (parent)
- {
- if (parent->rb_left == node)
- parent->rb_left = child;
- else
- parent->rb_right = child;
- }
- else
- root->rb_node = child;
-
- if (node->rb_parent == old)
- parent = node;
- node->rb_parent = old->rb_parent;
- node->rb_color = old->rb_color;
- node->rb_right = old->rb_right;
- node->rb_left = old->rb_left;
-
- if (old->rb_parent)
- {
- if (old->rb_parent->rb_left == old)
- old->rb_parent->rb_left = node;
- else
- old->rb_parent->rb_right = node;
- } else
- root->rb_node = node;
-
- old->rb_left->rb_parent = node;
- if (old->rb_right)
- old->rb_right->rb_parent = node;
- goto color;
- }
-
- parent = node->rb_parent;
- color = node->rb_color;
-
- if (child)
- child->rb_parent = parent;
- if (parent)
- {
- if (parent->rb_left == node)
- parent->rb_left = child;
- else
- parent->rb_right = child;
- }
- else
- root->rb_node = child;
-
- color:
- if (color == RB_BLACK)
- __rb_erase_color(child, parent, root);
-}
-EXPORT_SYMBOL(rb_erase);
-
-/*
- * This function returns the first node (in sort order) of the tree.
- */
-rb_node_t *rb_get_first(rb_root_t *root)
-{
- rb_node_t *n;
-
- n = root->rb_node;
- if (!n)
- return 0;
- while (n->rb_left)
- n = n->rb_left;
- return n;
-}
-EXPORT_SYMBOL(rb_get_first);
-
-/*
- * Given a node, this function will return the next node in the tree.
- */
-rb_node_t *rb_get_next(rb_node_t *n)
-{
- rb_node_t *parent;
-
- if (n->rb_right) {
- n = n->rb_right;
- while (n->rb_left)
- n = n->rb_left;
- return n;
- } else {
- while ((parent = n->rb_parent)) {
- if (n == parent->rb_left)
- return parent;
- n = parent;
- }
- return 0;
- }
-}
-EXPORT_SYMBOL(rb_get_next);
-
ecl->ecl_mode = mode;
ecl->ecl_object = eco;
- ecl->ecl_extent.start = offset;
- ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
+ ecl->ecl_policy.l_extent.start = offset;
+ ecl->ecl_policy.l_extent.end =
+ (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
flags = 0;
- rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, NULL, LDLM_EXTENT,
- &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
- &flags, echo_ldlm_callback, eco,
- &ecl->ecl_lock_handle);
+ rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
+ &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
+ ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
+ lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
if (rc != 0)
goto failed_1;
modulefs_DATA = obdfilter.o
EXTRA_PROGRAMS = obdfilter
obdfilter_SOURCES = filter.c filter_io.c filter_log.c filter_san.c \
- filter_io_24.c lproc_obdfilter.c filter_internal.h
+ filter_io_24.c lproc_obdfilter.c filter_internal.h filter_lvb.c
include $(top_srcdir)/Rules
RETURN(0);
}
-static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
- ldlm_mode_t lock_mode,struct lustre_handle *lockh)
+static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
{
- struct ldlm_res_id res_id = { .name = {0} };
- int flags = 0, rc;
- ENTRY;
-
- res_id.name[0] = de->d_inode->i_ino;
- res_id.name[1] = de->d_inode->i_generation;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, ldlm_completion_ast,
- filter_blocking_ast, NULL, lockh);
-
- RETURN(rc == ELDLM_OK ? 0 : -EIO); /* XXX translate ldlm code */
+ down(&dparent->d_inode->i_sem);
+ return 0;
}
/* We never dget the object parent, so DON'T dput it either */
-static void filter_parent_unlock(struct dentry *dparent,
- struct lustre_handle *lockh,
- ldlm_mode_t lock_mode)
+static void filter_parent_unlock(struct dentry *dparent)
{
- ldlm_lock_decref(lockh, lock_mode);
+ up(&dparent->d_inode->i_sem);
}
/* We never dget the object parent, so DON'T dput it either */
/* We never dget the object parent, so DON'T dput it either */
struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
- obd_id objid, ldlm_mode_t lock_mode,
- struct lustre_handle *lockh)
+ obd_id objid)
{
unsigned long now = jiffies;
- struct dentry *de = filter_parent(obd, group, objid);
+ struct dentry *dparent = filter_parent(obd, group, objid);
int rc;
- if (IS_ERR(de))
- return de;
+ if (IS_ERR(dparent))
+ return dparent;
- rc = filter_lock_dentry(obd, de, lock_mode, lockh);
+ rc = filter_lock_dentry(obd, dparent);
if (time_after(jiffies, now + 15 * HZ))
CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
- return rc ? ERR_PTR(rc) : de;
+ return rc ? ERR_PTR(rc) : dparent;
}
/* How to get files, dentries, inodes from object id's.
struct dentry *dir_dentry,
obd_gr group, obd_id id)
{
- struct lustre_handle lockh;
struct dentry *dparent = dir_dentry;
struct dentry *dchild;
char name[32];
len = sprintf(name, LPU64, id);
if (dir_dentry == NULL) {
- dparent = filter_parent_lock(obd, group, id, LCK_PR, &lockh);
+ dparent = filter_parent_lock(obd, group, id);
if (IS_ERR(dparent))
RETURN(dparent);
}
CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
dparent->d_name.len, dparent->d_name.name, name);
- dchild = ll_lookup_one_len(name, dparent, len);
+ dchild = /*ll_*/lookup_one_len(name, dparent, len);
if (dir_dentry == NULL)
- filter_parent_unlock(dparent, &lockh, LCK_PR);
+ filter_parent_unlock(dparent);
if (IS_ERR(dchild)) {
CERROR("child lookup error %ld\n", PTR_ERR(dchild));
RETURN(dchild);
struct lustre_handle lockh;
int flags = LDLM_AST_DISCARD_DATA, rc;
struct ldlm_res_id res_id = { .name = { objid } };
- struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
- ENTRY;
+ ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
+ ENTRY;
/* Tell the clients that the object is gone now and that they should
- * throw away any cached pages. If we're the OST at stripe 0 in the
- * file then this enqueue will communicate the DISCARD to all the
- * clients. This assumes that we always destroy all the objects for
- * a file at a time, as is currently the case. If we're not the
- * OST at stripe 0 then we'll harmlessly get a very lonely lock in
- * the local DLM and immediately drop it. */
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_EXTENT, &extent,
- sizeof(extent), LCK_PW, &flags,
- ldlm_completion_ast, filter_blocking_ast,
- NULL, &lockh);
+ * throw away any cached pages. */
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+ LDLM_EXTENT, &policy, LCK_PW,
+ &flags, filter_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, &lockh);
/* We only care about the side-effects, just drop the lock. */
if (rc == ELDLM_OK)
RETURN(rc);
}
+/* Intent policy for glimpse-style enqueues on the OST.
+ *
+ * The request is converted into a whole-file PR lock and run through the
+ * extent policy.  If it is granted with no resistance we return
+ * ELDLM_LOCK_REPLACED.  Otherwise we grant nothing: we copy the current
+ * LVB size into the reply, pick the conflicting granted PW lock with the
+ * highest extent start among those extending past that size (if any),
+ * send it a glimpse callback to refresh the LVB, and return the possibly
+ * updated size with ELDLM_LOCK_ABORTED. */
+static int filter_intent_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ ldlm_mode_t mode, int flags, void *data)
+{
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ struct ptlrpc_request *req = req_cookie;
+ struct ldlm_lock *lock = *lockp, *l = NULL;
+ struct ldlm_resource *res = lock->l_resource;
+ ldlm_processing_policy policy;
+ struct ost_lvb *res_lvb, *reply_lvb;
+ struct list_head *tmp;
+ ldlm_error_t err;
+ int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
+ sizeof(struct ost_lvb) };
+ ENTRY;
+
+ policy = ldlm_get_processing_policy(res);
+ LASSERT(policy != NULL);
+ LASSERT(req != NULL);
+
+ rc = lustre_pack_reply(req, 2, repsize, NULL);
+ if (rc)
+ RETURN(req->rq_status = rc);
+
+ reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
+ LASSERT(reply_lvb != NULL);
+
+ //fixup_handle_for_resent_req(req, lock, &lockh);
+
+ /* If we grant any lock at all, it will be a whole-file read lock.
+ * Call the extent policy function to see if our request can be
+ * granted, or is blocked. */
+ lock->l_policy_data.l_extent.start = 0;
+ lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+ lock->l_req_mode = LCK_PR;
+
+ l_lock(&res->lr_namespace->ns_lock);
+
+ res->lr_tmp = &rpc_list;
+ rc = policy(lock, &tmpflags, 0, &err);
+ res->lr_tmp = NULL;
+
+ /* FIXME: we should change the policy function slightly, to not make
+ * this list at all, since we just turn around and free it */
+ while (!list_empty(&rpc_list)) {
+ struct ldlm_ast_work *w =
+ list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
+ list_del(&w->w_list);
+ LDLM_LOCK_PUT(w->w_lock);
+ OBD_FREE(w, sizeof(*w));
+ }
+
+ if (rc == LDLM_ITER_CONTINUE) {
+ /* The lock met with no resistance; we're finished. */
+ l_unlock(&res->lr_namespace->ns_lock);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
+
+ /* Do not grant any lock, but instead send GL callbacks. The extent
+ * policy nicely created a list of all PW locks for us. We will choose
+ * the highest of those which are larger than the size in the LVB, if
+ * any, and perform a glimpse callback. */
+ down(&res->lr_lvb_sem);
+ res_lvb = res->lr_lvb_data;
+ LASSERT(res_lvb != NULL);
+ reply_lvb->lvb_size = res_lvb->lvb_size;
+ up(&res->lr_lvb_sem);
+
+ /* Scan granted locks (still under ns_lock) for a PW lock whose extent
+ * reaches beyond the size currently recorded in the LVB. */
+ list_for_each(tmp, &res->lr_granted) {
+ struct ldlm_lock *tmplock =
+ list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (tmplock->l_granted_mode == LCK_PR)
+ continue;
+
+ if (tmplock->l_policy_data.l_extent.end <=
+ reply_lvb->lvb_size)
+ continue;
+
+ if (l == NULL) {
+ l = LDLM_LOCK_GET(tmplock);
+ continue;
+ }
+
+ /* keep the candidate with the greater extent start */
+ if (l->l_policy_data.l_extent.start >
+ tmplock->l_policy_data.l_extent.start)
+ continue;
+
+ LDLM_LOCK_PUT(l);
+ l = LDLM_LOCK_GET(tmplock);
+ }
+ l_unlock(&res->lr_namespace->ns_lock);
+
+ /* There were no PW locks beyond the size in the LVB; finished. */
+ if (l == NULL)
+ RETURN(ELDLM_LOCK_ABORTED);
+
+ LASSERT(l->l_glimpse_ast != NULL);
+ rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+
+ /* Re-read the LVB under the semaphore; the glimpse reply (if any)
+ * has been folded in by lvbo_update by now. */
+ down(&res->lr_lvb_sem);
+#if 0
+ if (res_lvb->lvb_size == reply_lvb->lvb_size)
+ LDLM_ERROR(l, "we lost the glimpse race!");
+#endif
+ reply_lvb->lvb_size = res_lvb->lvb_size;
+ up(&res->lr_lvb_sem);
+
+ LDLM_LOCK_PUT(l);
+
+ RETURN(ELDLM_LOCK_ABORTED);
+}
+
/* mount the file system (secretly) */
int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
char *option)
LDLM_NAMESPACE_SERVER);
if (obd->obd_namespace == NULL)
GOTO(err_post, rc = -ENOMEM);
+ obd->obd_namespace->ns_lvbp = obd;
+ obd->obd_namespace->ns_lvbo = &filter_lvbo;
+ ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
struct filter_obd *filter;
struct dentry *dentry;
struct iattr iattr;
+ struct ldlm_res_id res_id = { .name = { oa->o_id } };
+ struct ldlm_resource *res;
void *handle;
int rc, rc2;
ENTRY;
rc = rc2;
}
+ if (iattr.ia_valid & ATTR_SIZE) {
+ res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
+ res_id, LDLM_EXTENT, 0);
+ if (res == NULL) {
+ CERROR("!!! resource_get failed for object "LPU64" -- "
+ "filter_setattr with no lock?\n", oa->o_id);
+ } else {
+ if (res->lr_namespace->ns_lvbo &&
+ res->lr_namespace->ns_lvbo->lvbo_update) {
+ rc = res->lr_namespace->ns_lvbo->lvbo_update
+ (res, NULL, 0);
+ }
+ ldlm_resource_putref(res);
+ }
+ }
+
oa->o_valid = OBD_MD_FLID;
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
static int filter_precreate(struct obd_device *obd, struct obdo *oa,
obd_gr group, int *num)
{
- struct lustre_handle parent_lockh;
struct dentry *dchild = NULL;
struct filter_obd *filter;
struct dentry *dparent;
CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
- dparent = filter_parent_lock(obd, group, next_id, LCK_PW,
- &parent_lockh);
+ dparent = filter_parent_lock(obd, group, next_id);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
case 2:
f_dput(dchild);
case 1:
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+ filter_parent_unlock(dparent);
case 0:
break;
}
struct dentry *dchild = NULL, *dparent = NULL;
struct obd_run_ctxt saved;
void *handle = NULL;
- struct lustre_handle parent_lockh;
struct llog_cookie *fcc = NULL;
int rc, rc2, cleanup_phase = 0, have_prepared = 0;
obd_gr group = 0;
push_ctxt(&saved, &obd->obd_ctxt, NULL);
acquire_locks:
- dparent = filter_parent_lock(obd, group, oa->o_id, LCK_PW,
- &parent_lockh);
+ dparent = filter_parent_lock(obd, group, oa->o_id);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
* complication of condition the above code to skip it on the
* second time through. */
f_dput(dchild);
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+ filter_parent_unlock(dparent);
filter_prepare_destroy(obd, oa->o_id);
have_prepared = 1;
switch(cleanup_phase) {
case 3:
if (fcc != NULL) {
- if (oti != NULL) {
+ if (oti != NULL)
fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
- filter_cancel_cookies_cb, fcc);
- } else {
+ filter_cancel_cookies_cb,
+ fcc);
+ else
fsfilt_add_journal_cb(obd, 0, handle,
- filter_cancel_cookies_cb, fcc);
- }
+ filter_cancel_cookies_cb,
+ fcc);
}
rc = filter_finish_transno(exp, oti, rc);
rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
case 2:
f_dput(dchild);
case 1:
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
- }
+ filter_parent_unlock(dparent);
case 0:
pop_ctxt(&saved, &obd->obd_ctxt, NULL);
break;
#define FILTER_MAX_CACHE_SIZE OBD_OBJECT_EOF
/* filter.c */
-struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid);
-struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id,
- ldlm_mode_t, struct lustre_handle *);
void f_dput(struct dentry *);
struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
obd_gr group, obd_id id);
int filter_common_setup(struct obd_device *, obd_count len, void *buf,
char *option);
+/* filter_lvb.c */
+extern struct ldlm_valblock_ops filter_lvbo;
+
+
/* filter_io.c */
int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
struct obd_ioobj *, int niocount, struct niobuf_remote *,
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/fs/obdfilter/filter_log.c
+ *
+ * Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_dlm.h>
+
+#include "filter_internal.h"
+
+/* Allocate and initialize the lock value block (size/mtime) for an object
+ * resource from its on-disk inode.  No-op for non-object (internal)
+ * resources, which have name[1] != 0, and for resources that already have
+ * an LVB.  Returns 0 on success or a negative errno; on error the
+ * half-initialized LVB is torn down again. */
+static int filter_lvbo_init(struct ldlm_resource *res)
+{
+ int rc = 0;
+ struct ost_lvb *lvb = NULL;
+ struct obd_device *obd;
+ struct obdo *oa = NULL;
+ struct dentry *dentry;
+ ENTRY;
+
+ LASSERT(res);
+
+ /* we only want lvb's for object resources */
+ /* check for internal locks: these have name[1] != 0 */
+ if (res->lr_name.name[1])
+ RETURN(0);
+
+ down(&res->lr_lvb_sem);
+ if (res->lr_lvb_data)
+ GOTO(out, rc = 0);
+
+ OBD_ALLOC(lvb, sizeof(*lvb));
+ if (lvb == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ res->lr_lvb_data = lvb;
+ res->lr_lvb_len = sizeof(*lvb);
+
+ obd = res->lr_namespace->ns_lvbp;
+ LASSERT(obd != NULL);
+
+ oa = obdo_alloc();
+ if (oa == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ oa->o_id = res->lr_name.name[0];
+ oa->o_gr = 0;
+ dentry = filter_oa2dentry(obd, oa);
+ if (IS_ERR(dentry))
+ GOTO(out, rc = PTR_ERR(dentry));
+
+ /* Limit the valid bits in the return data to what we actually use */
+ oa->o_valid = OBD_MD_FLID;
+ obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+
+ lvb->lvb_size = dentry->d_inode->i_size;
+ lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
+
+ /* only drop the dentry ref after we are done reading the inode;
+ * doing the f_dput() first (as before) was a use-after-put */
+ f_dput(dentry);
+
+ out:
+ if (oa)
+ obdo_free(oa);
+ if (rc && lvb != NULL) {
+ OBD_FREE(lvb, sizeof(*lvb));
+ res->lr_lvb_data = NULL;
+ res->lr_lvb_len = 0;
+ }
+ up(&res->lr_lvb_sem);
+ return rc;
+}
+
+/* This will be called in two ways:
+ *
+ * m != NULL : called by the DLM itself after a glimpse callback
+ * m == NULL : called by the filter after a disk write
+ */
+static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
+ int buf_idx)
+{
+ int rc = 0;
+ struct ost_lvb *lvb = res->lr_lvb_data;
+ struct obd_device *obd;
+ struct obdo *oa = NULL;
+ struct dentry *dentry;
+ ENTRY;
+
+ LASSERT(res);
+
+ /* we only want lvb's for object resources */
+ /* check for internal locks: these have name[1] != 0 */
+ if (res->lr_name.name[1])
+ RETURN(0);
+
+ down(&res->lr_lvb_sem);
+ if (!res->lr_lvb_data) {
+ CERROR("No lvb when running lvbo_update!\n");
+ GOTO(out, rc = 0);
+ }
+
+ /* Update the LVB from the network message */
+ if (m != NULL) {
+ struct ost_lvb *new;
+
+ new = lustre_swab_buf(m, buf_idx, sizeof(*new),
+ lustre_swab_ost_lvb);
+ if (new == NULL) {
+ /* a malformed/missing LVB buffer is logged but
+ * tolerated, so a bad client can't wedge the lock */
+ CERROR("lustre_swab_buf failed\n");
+ //GOTO(out, rc = -EPROTO);
+ GOTO(out, rc = 0);
+ }
+ /* size and time are merged monotonically: only ever take
+ * the larger value, never shrink the LVB */
+ if (new->lvb_size > lvb->lvb_size) {
+ CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
+ LPU64" -> "LPU64"\n", res->lr_name.name[0],
+ lvb->lvb_size, new->lvb_size);
+ lvb->lvb_size = new->lvb_size;
+ }
+ if (new->lvb_time > lvb->lvb_time) {
+ CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb time: "
+ LPU64" -> "LPU64"\n", res->lr_name.name[0],
+ lvb->lvb_time, new->lvb_time);
+ lvb->lvb_time = new->lvb_time;
+ }
+ GOTO(out, rc = 0);
+ }
+
+ /* Update the LVB from the disk inode */
+ obd = res->lr_namespace->ns_lvbp;
+ LASSERT(obd);
+
+ oa = obdo_alloc();
+ if (oa == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ oa->o_id = res->lr_name.name[0];
+ oa->o_gr = 0;
+ dentry = filter_oa2dentry(obd, oa);
+ if (IS_ERR(dentry))
+ GOTO(out, rc = PTR_ERR(dentry));
+
+ /* Limit the valid bits in the return data to what we actually use */
+ oa->o_valid = OBD_MD_FLID;
+ obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+
+ lvb->lvb_size = dentry->d_inode->i_size;
+ lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
+ CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", time: "
+ LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_time);
+ f_dput(dentry);
+
+ out:
+ if (oa != NULL)
+ obdo_free(oa);
+ up(&res->lr_lvb_sem);
+ return rc;
+}
+
+
+
+struct ldlm_valblock_ops filter_lvbo = {
+ lvbo_init: filter_lvbo_init,
+ lvbo_update: filter_lvbo_update
+};
} else {
osc_consume_write_grant(cli, ocw->ocw_oap);
}
+
wake_up(&ocw->ocw_waitq);
}
if (cli->cl_dirty_max < PAGE_SIZE)
return(-EDQUOT);
-
/* Hopefully normal case - cache space and write credits available */
if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
cli->cl_avail_grant >= PAGE_SIZE) {
}
static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lustre_handle *parent_lock,
- __u32 type, void *extentp, int extent_len, __u32 mode,
- int *flags, void *callback, void *data,
+ __u32 type, ldlm_policy_data_t *policy, __u32 mode,
+ int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
+ void *data, __u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh)
{
struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
struct obd_device *obd = exp->exp_obd;
- struct ldlm_extent *extent = extentp;
+ struct ost_lvb lvb;
int rc;
ENTRY;
/* Filesystem lock extents are extended to page boundaries so that
* dealing with the page cache is a little smoother. */
- extent->start -= extent->start & ~PAGE_MASK;
- extent->end |= ~PAGE_MASK;
+ policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
+ policy->l_extent.end |= ~PAGE_MASK;
/* Next, search for already existing extent locks that will cover us */
- rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id,
- type, extent, sizeof(*extent), mode, lockh);
+ rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
+ lockh);
if (rc == 1) {
osc_set_data_with_check(lockh, data);
+ if (*flags & LDLM_FL_HAS_INTENT) {
+ /* I would like to be able to ASSERT here that rss <=
+ * kms, but I can't, for reasons which are explained in
+ * lov_enqueue() */
+ }
/* We already have a lock, and it's referenced */
RETURN(ELDLM_OK);
}
if (mode == LCK_PR) {
rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
- extent, sizeof(*extent), LCK_PW, lockh);
+ policy, LCK_PW, lockh);
if (rc == 1) {
/* FIXME: This is not incredibly elegant, but it might
* be more elegant than adding another parameter to
}
}
- rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, parent_lock,
- res_id, type, extent, sizeof(*extent), mode,
- flags,ldlm_completion_ast, callback, data, lockh);
+ rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type,
+ policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
+ &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
+
+ if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc)
+ lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
+
RETURN(rc);
}
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
- __u32 type, void *extentp, int extent_len, __u32 mode,
+ __u32 type, ldlm_policy_data_t *policy, __u32 mode,
int *flags, void *data, struct lustre_handle *lockh)
{
struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
struct obd_device *obd = exp->exp_obd;
- struct ldlm_extent *extent = extentp;
int rc;
ENTRY;
/* Filesystem lock extents are extended to page boundaries so that
* dealing with the page cache is a little smoother */
- extent->start -= extent->start & ~PAGE_MASK;
- extent->end |= ~PAGE_MASK;
+ policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
+ policy->l_extent.end |= ~PAGE_MASK;
/* Next, search for already existing extent locks that will cover us */
rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
- extent, sizeof(*extent), mode, lockh);
+ policy, mode, lockh);
if (rc) {
osc_set_data_with_check(lockh, data);
RETURN(rc);
* writers can share a single PW lock. */
if (mode == LCK_PR) {
rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
- extent, sizeof(*extent), LCK_PW, lockh);
+ policy, LCK_PW, lockh);
if (rc == 1) {
/* FIXME: This is not incredibly elegant, but it might
* be more elegant than adding another parameter to
return rc;
}
-static int osc_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct ldlm_lock *lock, obd_off offset)
-{
- ENTRY;
- if (exp == NULL)
- RETURN(-ENODEV);
-
- if (lock->l_policy_data.l_extent.start <= offset &&
- lock->l_policy_data.l_extent.end >= offset)
- RETURN(1);
- RETURN(0);
-}
-
static int osc_invalidate_import(struct obd_device *obd,
struct obd_import *imp)
{
o_iocontrol: osc_iocontrol,
o_get_info: osc_get_info,
o_set_info: osc_set_info,
- o_lock_contains:osc_lock_contains,
o_invalidate_import: osc_invalidate_import,
o_llog_init: osc_llog_init,
o_llog_finish: osc_llog_finish,
o_cancel: osc_cancel,
o_cancel_unused:osc_cancel_unused,
o_iocontrol: osc_iocontrol,
- o_lock_contains:osc_lock_contains,
o_invalidate_import: osc_invalidate_import,
o_llog_init: osc_llog_init,
o_llog_finish: osc_llog_finish,
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
- ldlm_server_blocking_ast);
+ ldlm_server_blocking_ast,
+ ldlm_server_glimpse_ast);
fail = OBD_FAIL_OST_LDLM_REPLY_NET;
break;
case LDLM_CONVERT:
}
#ifdef __KERNEL__
-#include <linux/lustre_version.h>
-#if (LUSTRE_KERNEL_VERSION >= 30)
-#warning "FIXME: remove workaround when l30 is widely used"
char stack_backtrace[LUSTRE_TRACE_SIZE];
spinlock_t stack_backtrace_lock = SPIN_LOCK_UNLOCKED;
char *portals_debug_dumpstack(void)
{
asm("int $3");
- return "dump stack";
+ return "dump stack\n";
}
#elif defined(__i386__)
#endif /* __arch_um__ */
EXPORT_SYMBOL(stack_backtrace_lock);
EXPORT_SYMBOL(portals_debug_dumpstack);
-#endif /* LUSTRE_KERNEL_VERSION < 30 */
#endif /* __KERNEL__ */
EXPORT_SYMBOL(portals_debug_dumplog);
{ LDLM_CANCEL, "ldlm_cancel" },
{ LDLM_BL_CALLBACK, "ldlm_bl_callback" },
{ LDLM_CP_CALLBACK, "ldlm_cp_callback" },
+ { LDLM_GL_CALLBACK, "ldlm_gl_callback" },
{ PTLBD_QUERY, "ptlbd_query" },
{ PTLBD_READ, "ptlbd_read" },
{ PTLBD_WRITE, "ptlbd_write" },
return (str);
}
-/* Wrap up the normal fixed length case */
-void *lustre_swab_reqbuf (struct ptlrpc_request *req, int index, int min_size,
- void *swabber)
+/* Wrap up the normal fixed length cases */
+void *lustre_swab_buf(struct lustre_msg *msg, int index, int min_size,
+ void *swabber)
{
void *ptr;
- LASSERT_REQSWAB(req, index);
-
- ptr = lustre_msg_buf(req->rq_reqmsg, index, min_size);
+ ptr = lustre_msg_buf(msg, index, min_size);
if (ptr == NULL)
return NULL;
- if (swabber != NULL && lustre_msg_swabbed(req->rq_reqmsg))
+ if (swabber != NULL && lustre_msg_swabbed(msg))
((void (*)(void *))swabber)(ptr);
return ptr;
}
-/* Wrap up the normal fixed length case */
-void *lustre_swab_repbuf (struct ptlrpc_request *req, int index, int min_size,
- void *swabber)
+void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size,
+ void *swabber)
{
- void *ptr;
+ LASSERT_REQSWAB(req, index);
+ return lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber);
+}
+void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
+ void *swabber)
+{
LASSERT_REPSWAB(req, index);
-
- ptr = lustre_msg_buf(req->rq_repmsg, index, min_size);
- if (ptr == NULL)
- return NULL;
-
- if (swabber != NULL && lustre_msg_swabbed(req->rq_repmsg))
- ((void (*)(void *))swabber)(ptr);
-
- return ptr;
+ return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
}
/* byte flipping routines for all wire types declared in
__swab64s(id);
}
+void lustre_swab_ost_lvb(struct ost_lvb *lvb)
+{
+ __swab64s(&lvb->lvb_size);
+ __swab64s(&lvb->lvb_time);
+}
+
void lustre_swab_ll_fid (struct ll_fid *fid)
{
__swab64s (&fid->id);
void lustre_swab_ldlm_resource_desc (struct ldlm_resource_desc *r)
{
- int i;
-
__swab32s (&r->lr_type);
lustre_swab_ldlm_res_id (&r->lr_name);
- for (i = 0; i < RES_VERSION_SIZE; i++)
- __swab32s (&r->lr_version[i]);
}
void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l)
{
- int i;
-
lustre_swab_ldlm_resource_desc (&l->l_resource);
__swab32s (&l->l_req_mode);
__swab32s (&l->l_granted_mode);
lustre_swab_ldlm_policy_data (&l->l_policy_data);
- for (i = 0; i < RES_VERSION_SIZE; i++)
- __swab32s (&l->l_version[i]);
}
void lustre_swab_ldlm_request (struct ldlm_request *rq)
void lustre_swab_ldlm_reply (struct ldlm_reply *r)
{
__swab32s (&r->lock_flags);
- __swab32s (&r->lock_mode);
- lustre_swab_ldlm_res_id (&r->lock_resource_name);
+ lustre_swab_ldlm_lock_desc (&r->lock_desc);
/* lock_handle opaque */
- lustre_swab_ldlm_policy_data (&r->lock_policy_data);
__swab64s (&r->lock_policy_res1);
__swab64s (&r->lock_policy_res2);
}
LASSERT((int)sizeof(((struct ldlm_intent *)0)->opc) == 8);
/* Checks for struct ldlm_resource_desc */
- LASSERT((int)sizeof(struct ldlm_resource_desc) == 52);
+ LASSERT((int)sizeof(struct ldlm_resource_desc) == 40);
LASSERT(offsetof(struct ldlm_resource_desc, lr_type) == 0);
LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_type) == 4);
- LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 4);
+ LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 8);
LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_name) == 32);
- LASSERT(offsetof(struct ldlm_resource_desc, lr_version[4]) == 52);
- LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_version[4]) == 4);
/* Checks for struct ldlm_lock_desc */
LASSERT((int)sizeof(struct ldlm_lock_desc) == 108);
EXPORT_SYMBOL(lustre_unpack_msg);
EXPORT_SYMBOL(lustre_msg_buf);
EXPORT_SYMBOL(lustre_msg_string);
+EXPORT_SYMBOL(lustre_swab_buf);
EXPORT_SYMBOL(lustre_swab_reqbuf);
EXPORT_SYMBOL(lustre_swab_repbuf);
EXPORT_SYMBOL(lustre_swab_obdo);
EXPORT_SYMBOL(lustre_swab_niobuf_remote);
EXPORT_SYMBOL(lustre_swab_ost_body);
EXPORT_SYMBOL(lustre_swab_ost_last_id);
+EXPORT_SYMBOL(lustre_swab_ost_lvb);
EXPORT_SYMBOL(lustre_swab_ll_fid);
EXPORT_SYMBOL(lustre_swab_mds_status_req);
EXPORT_SYMBOL(lustre_swab_mds_body);
# lustre.spec
-%define version HEAD
+%define version b_size
%define kversion @LINUXRELEASE@
%define linuxdir @LINUX@
%define enable_doc @ENABLE_DOC@
# any number of fixes (don't get {0,EOF} on open, match
# composite locks, do smarter file size management) fix
# this, but for now we want these tests to verify that
-# the cancelation with truncate intent works, so we
+# the cancellation with truncate intent works, so we
# start the file with a full-file pw lock to match against
# until the truncate.
trunc_test() {
test_43() {
mkdir $DIR/d43
cp -p /bin/ls $DIR/d43/f
- exec 100>> $DIR/d43/f
+ exec 100>> $DIR/d43/f
$DIR/d43/f && error || true
exec 100<&-
}
test_43a() {
mkdir -p $DIR/d43
cp -p `which multiop` $DIR/d43/multiop
- touch $DIR/d43/g
- $DIR/d43/multiop $DIR/d43/g o_c &
+ $DIR/d43/multiop $TMP/test43.junk O_c &
MULTIPID=$!
sleep 1
multiop $DIR/d43/multiop Oc && error "expected error, got success"
test_43b() {
mkdir -p $DIR/d43
cp -p `which multiop` $DIR/d43/multiop
- touch $DIR/d43/g
- $DIR/d43/multiop $DIR/d43/g o_c &
+ $DIR/d43/multiop $TMP/test43.junk O_c &
MULTIPID=$!
sleep 1
truncate $DIR/d43/multiop 0 && error "expected error, got success"
do_dirty_record "echo blah > $f"
[ $before -eq $after ] && error "write wasn't cached"
do_dirty_record "cancel_lru_locks OSC"
- [ $before -gt $after ] || error "lock cancelation didn't lower dirty count"
+ [ $before -gt $after ] || error "lock cancellation didn't lower dirty count"
start_kupdated
}
run_test 45 "osc io page accounting ============================"
cancel_lru_locks OSC
echo 0x405 > /proc/sys/lustre/fail_loc
cat $f && error "cat succeeded, expect -EIO"
- multiop $f Owc && error "multiop succeeded, expect -EIO"
echo 0 > /proc/sys/lustre/fail_loc
}
run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
}
run_test 13 "test directory page revocation ===================="
-test_14() { # bug 974 - ENOSPC
+test_14() {
+ mkdir $DIR1/d14
+ cp -p /bin/ls $DIR1/d14/ls
+ exec 100>> $DIR1/d14/ls
+ $DIR2/d14/ls && error || true
+ exec 100<&-
+}
+run_test 14 "execution of file opened for write should return -ETXTBSY=="
+
+test_14a() {
+ mkdir -p $DIR1/d14
+ cp -p `which multiop` $DIR1/d14/multiop
+ $DIR1/d14/multiop $TMP/test14.junk O_c &
+ MULTIPID=$!
+ sleep 1
+ multiop $DIR2/d14/multiop Oc && error "expected error, got success"
+ kill -USR1 $MULTIPID || return 2
+ wait $MULTIPID || return 3
+}
+run_test 14a "open(RDWR) of file being executed should return -ETXTBSY"
+
+test_14b() {
+ mkdir -p $DIR1/d14
+ cp -p `which multiop` $DIR1/d14/multiop
+ $DIR1/d14/multiop $TMP/test14.junk O_c &
+ MULTIPID=$!
+ sleep 1
+ truncate $DIR2/d14/multiop 0 && error "expected error, got success"
+ kill -USR1 $MULTIPID || return 2
+ wait $MULTIPID || return 3
+}
+run_test 14b "truncate of file being executed should return -ETXTBSY"
+
+test_15() { # bug 974 - ENOSPC
env
sh oos2.sh $MOUNT1 $MOUNT2
}
-run_test 14 "test out-of-space with multiple writers ==========="
+run_test 15 "test out-of-space with multiple writers ==========="
log "cleanup: ======================================================"
rm -rf $DIR1/[df][0-9]* $DIR1/lnk || true
CHECK_STRUCT(ldlm_resource_desc);
CHECK_MEMBER(ldlm_resource_desc, lr_type);
CHECK_MEMBER(ldlm_resource_desc, lr_name);
- CHECK_MEMBER(ldlm_resource_desc, lr_version[RES_VERSION_SIZE]);
}
void
CHECK_MEMBER(ldlm_lock_desc, l_req_mode);
CHECK_MEMBER(ldlm_lock_desc, l_granted_mode);
CHECK_MEMBER(ldlm_lock_desc, l_policy_data);
- CHECK_MEMBER(ldlm_lock_desc, l_version[RES_VERSION_SIZE]);
}
void
BLANK_LINE();
CHECK_STRUCT(ldlm_reply);
CHECK_MEMBER(ldlm_reply, lock_flags);
- CHECK_MEMBER(ldlm_reply, lock_mode);
- CHECK_MEMBER(ldlm_reply, lock_resource_name);
+ /* was CHECK_MEMBER(ldlm_request, ...): wrong struct for this block */
+ CHECK_MEMBER(ldlm_reply, lock_desc);
CHECK_MEMBER(ldlm_reply, lock_handle);
- CHECK_MEMBER(ldlm_reply, lock_policy_data);
CHECK_MEMBER(ldlm_reply, lock_policy_res1);
CHECK_MEMBER(ldlm_reply, lock_policy_res2);
}
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
- * running on Linux schnapps.adilger.int 2.4.22-l32 #4 Thu Jan 8 14:32:57 MST 2004 i686 i686
- * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
+ * running on Linux adevi 2.4.18-p4smp-10pre1 #1 SMP Thu Feb 5 14:52:15 PST 2004 i686 unknown
+ * with gcc version 2.96 20000731 (Red Hat Linux 7.3 2.96-113) */
/* Constants... */
LASSERT(LDLM_CANCEL == 103);
LASSERT(LDLM_BL_CALLBACK == 104);
LASSERT(LDLM_CP_CALLBACK == 105);
- LASSERT(LDLM_LAST_OPC == 106);
+ LASSERT(LDLM_LAST_OPC == 107);
LASSERT(LCK_EX == 1);
LASSERT(LCK_PW == 2);
LASSERT(LCK_PR == 3);
LASSERT((int)sizeof(((struct ldlm_intent *)0)->opc) == 8);
/* Checks for struct ldlm_resource_desc */
- LASSERT((int)sizeof(struct ldlm_resource_desc) == 52);
+ LASSERT((int)sizeof(struct ldlm_resource_desc) == 40);
LASSERT(offsetof(struct ldlm_resource_desc, lr_type) == 0);
LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_type) == 4);
- LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 4);
+ LASSERT(offsetof(struct ldlm_resource_desc, lr_name) == 8);
LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_name) == 32);
- LASSERT(offsetof(struct ldlm_resource_desc, lr_version[4]) == 52);
- LASSERT((int)sizeof(((struct ldlm_resource_desc *)0)->lr_version[4]) == 4);
/* Checks for struct ldlm_lock_desc */
- LASSERT((int)sizeof(struct ldlm_lock_desc) == 108);
+ LASSERT((int)sizeof(struct ldlm_lock_desc) == 80);
LASSERT(offsetof(struct ldlm_lock_desc, l_resource) == 0);
- LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_resource) == 52);
- LASSERT(offsetof(struct ldlm_lock_desc, l_req_mode) == 52);
+ LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_resource) == 40);
+ LASSERT(offsetof(struct ldlm_lock_desc, l_req_mode) == 40);
LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_req_mode) == 4);
- LASSERT(offsetof(struct ldlm_lock_desc, l_granted_mode) == 56);
+ LASSERT(offsetof(struct ldlm_lock_desc, l_granted_mode) == 44);
LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_granted_mode) == 4);
- LASSERT(offsetof(struct ldlm_lock_desc, l_policy_data) == 60);
+ LASSERT(offsetof(struct ldlm_lock_desc, l_policy_data) == 48);
LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_policy_data) == 32);
- LASSERT(offsetof(struct ldlm_lock_desc, l_version[4]) == 108);
- LASSERT((int)sizeof(((struct ldlm_lock_desc *)0)->l_version[4]) == 4);
/* Checks for struct ldlm_request */
- LASSERT((int)sizeof(struct ldlm_request) == 128);
+ LASSERT((int)sizeof(struct ldlm_request) == 104);
LASSERT(offsetof(struct ldlm_request, lock_flags) == 0);
LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4);
- LASSERT(offsetof(struct ldlm_request, lock_desc) == 4);
- LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 108);
- LASSERT(offsetof(struct ldlm_request, lock_handle1) == 112);
+ LASSERT(offsetof(struct ldlm_request, lock_desc) == 8);
+ LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80);
+ LASSERT(offsetof(struct ldlm_request, lock_handle1) == 88);
LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8);
- LASSERT(offsetof(struct ldlm_request, lock_handle2) == 120);
+ LASSERT(offsetof(struct ldlm_request, lock_handle2) == 96);
LASSERT((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8);
/* Checks for struct ldlm_reply */
- LASSERT((int)sizeof(struct ldlm_reply) == 96);
+ LASSERT((int)sizeof(struct ldlm_reply) == 108);
LASSERT(offsetof(struct ldlm_reply, lock_flags) == 0);
LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4);
- LASSERT(offsetof(struct ldlm_reply, lock_mode) == 4);
- LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_mode) == 4);
- LASSERT(offsetof(struct ldlm_reply, lock_resource_name) == 8);
- LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_resource_name) == 32);
- LASSERT(offsetof(struct ldlm_reply, lock_handle) == 40);
+ LASSERT(offsetof(struct ldlm_reply, lock_desc) == 4);
+ LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80);
+ LASSERT(offsetof(struct ldlm_reply, lock_handle) == 84);
LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8);
- LASSERT(offsetof(struct ldlm_reply, lock_policy_data) == 48);
- LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_policy_data) == 32);
- LASSERT(offsetof(struct ldlm_reply, lock_policy_res1) == 80);
+ LASSERT(offsetof(struct ldlm_reply, lock_policy_res1) == 92);
LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_policy_res1) == 8);
- LASSERT(offsetof(struct ldlm_reply, lock_policy_res2) == 88);
+ LASSERT(offsetof(struct ldlm_reply, lock_policy_res2) == 100);
LASSERT((int)sizeof(((struct ldlm_reply *)0)->lock_policy_res2) == 8);
/* Checks for struct ptlbd_op */