#include <linux/lprocfs_status.h>
#include <linux/lustre_log.h>
#include <linux/lustre_commit_confd.h>
+#include <portals/list.h>
#include "filter_internal.h"
filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
spin_unlock(&filter->fo_translock);
oti->oti_transno = last_rcvd;
- } else {
+ } else {
spin_lock(&filter->fo_translock);
last_rcvd = oti->oti_transno;
if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
spin_unlock(&filter->fo_translock);
}
fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
- fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
/* could get xid from oti, if it's ever needed */
fcd->fcd_last_xid = 0;
CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
- if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
+ /* Clear the bit _after_ zeroing out the client so we don't
+ race with filter_client_add and zero out new clients.*/
+ if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
CERROR("FILTER client %u: bit already clear in bitmap!!\n",
fed->fed_lr_idx);
LBUG();
fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
LAST_RCVD, rc);
+ if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
+ CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+ fed->fed_lr_idx);
+ LBUG();
+ }
+
free:
OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
}
- if (fsd->fsd_feature_incompat & ~le32_to_cpu(FILTER_INCOMPAT_SUPP)) {
+ if (fsd->fsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) {
CERROR("unsupported feature %x\n",
le32_to_cpu(fsd->fsd_feature_incompat) &
~FILTER_INCOMPAT_SUPP);
GOTO(err_fsd, rc = -EINVAL);
}
- if (fsd->fsd_feature_rocompat & ~le32_to_cpu(FILTER_ROCOMPAT_SUPP)) {
+ if (fsd->fsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) {
CERROR("read-only feature %x\n",
le32_to_cpu(fsd->fsd_feature_rocompat) &
~FILTER_ROCOMPAT_SUPP);
CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
- CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
- obd->obd_name, mount_count);
+ CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+ obd->obd_name, mount_count + 1);
CDEBUG(D_INODE, "%s: server data size: %u\n",
obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
CDEBUG(D_INODE, "%s: per-client data start: %u\n",
for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
off < last_rcvd_size; cl_idx++) {
__u64 last_rcvd;
- int mount_age;
+ struct obd_export *exp;
+ struct filter_export_data *fed;
if (!fcd) {
OBD_ALLOC(fcd, sizeof(*fcd));
/* These exports are cleaned up by filter_disconnect(), so they
* need to be set up like real exports as filter_connect() does.
*/
- mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
- if (mount_age < FILTER_MOUNT_RECOV) {
- struct obd_export *exp = class_new_export(obd);
- struct filter_export_data *fed;
- CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
- " srv lr: "LPU64" mnt: "LPU64" last mount: "
- LPU64"\n", fcd->fcd_uuid, cl_idx,
- last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
- le64_to_cpu(fcd->fcd_mount_count), mount_count);
- if (exp == NULL)
- GOTO(err_client, rc = -ENOMEM);
-
- memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
- sizeof exp->exp_client_uuid.uuid);
- fed = &exp->exp_filter_data;
- fed->fed_fcd = fcd;
- filter_client_add(obd, filter, fed, cl_idx);
- /* create helper if export init gets more complex */
- spin_lock_init(&fed->fed_lock);
-
- fcd = NULL;
- obd->obd_recoverable_clients++;
- class_export_put(exp);
- } else {
- CDEBUG(D_INFO, "discarded client %d UUID '%s' count "
- LPU64"\n", cl_idx, fcd->fcd_uuid,
- le64_to_cpu(fcd->fcd_mount_count));
- }
+ exp = class_new_export(obd);
+ CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+ " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
+ last_rcvd, le64_to_cpu(fsd->fsd_last_transno));
+ if (exp == NULL)
+ GOTO(err_client, rc = -ENOMEM);
+
+ memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+ sizeof exp->exp_client_uuid.uuid);
+ fed = &exp->exp_filter_data;
+ fed->fed_fcd = fcd;
+ filter_client_add(obd, filter, fed, cl_idx);
+ /* create helper if export init gets more complex */
+ spin_lock_init(&fed->fed_lock);
+
+ fcd = NULL;
+ obd->obd_recoverable_clients++;
+ class_export_put(exp);
CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
cl_idx, last_rcvd);
- if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
- filter->fo_fsd->fsd_last_transno=cpu_to_le64(last_rcvd);
+ if (last_rcvd > le64_to_cpu(fsd->fsd_last_transno))
+ fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
}
- obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_transno);
+ obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
if (obd->obd_recoverable_clients) {
- CERROR("RECOVERY: %d recoverable clients, last_rcvd "
+ CWARN("RECOVERY: %d recoverable clients, last_rcvd "
LPU64"\n", obd->obd_recoverable_clients,
- le64_to_cpu(filter->fo_fsd->fsd_last_transno));
+ le64_to_cpu(fsd->fsd_last_transno));
obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
obd->obd_recovering = 1;
}
OBD_FREE(fcd, sizeof(*fcd));
out:
- fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
+ filter->fo_mount_count = mount_count + 1;
+ fsd->fsd_mount_count = cpu_to_le64(filter->fo_mount_count);
/* save it, so mount count and last_transno is current */
rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
int i, rc = 0, cleanup_phase = 0;
ENTRY;
- O_dentry = simple_mkdir(current->fs->pwd, "O", 0700);
+ O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
if (IS_ERR(O_dentry)) {
rc = PTR_ERR(O_dentry);
loff_t off = 0;
sprintf(name, "%d", i);
- dentry = simple_mkdir(O_dentry, name, 0700);
+ dentry = simple_mkdir(O_dentry, name, 0700, 1);
CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
char dir[20];
snprintf(dir, sizeof(dir), "d%u", i);
- dentry = simple_mkdir(O_dentry, dir, 0700);
+ dentry = simple_mkdir(O_dentry, dir, 0700, 1);
CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
RETURN(0);
}
-static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
- ldlm_mode_t lock_mode,struct lustre_handle *lockh)
+static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
{
- struct ldlm_res_id res_id = { .name = {0} };
- int flags = 0, rc;
- ENTRY;
-
- res_id.name[0] = de->d_inode->i_ino;
- res_id.name[1] = de->d_inode->i_generation;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, ldlm_completion_ast,
- filter_blocking_ast, NULL, lockh);
-
- RETURN(rc == ELDLM_OK ? 0 : -EIO); /* XXX translate ldlm code */
+ down(&dparent->d_inode->i_sem);
+ return 0;
}
/* We never dget the object parent, so DON'T dput it either */
-static void filter_parent_unlock(struct dentry *dparent,
- struct lustre_handle *lockh,
- ldlm_mode_t lock_mode)
+static void filter_parent_unlock(struct dentry *dparent)
{
- ldlm_lock_decref(lockh, lock_mode);
+ up(&dparent->d_inode->i_sem);
}
/* We never dget the object parent, so DON'T dput it either */
/* We never dget the object parent, so DON'T dput it either */
struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
- obd_id objid, ldlm_mode_t lock_mode,
- struct lustre_handle *lockh)
+ obd_id objid)
{
unsigned long now = jiffies;
- struct dentry *de = filter_parent(obd, group, objid);
+ struct dentry *dparent = filter_parent(obd, group, objid);
int rc;
- if (IS_ERR(de))
- return de;
+ if (IS_ERR(dparent))
+ return dparent;
- rc = filter_lock_dentry(obd, de, lock_mode, lockh);
+ rc = filter_lock_dentry(obd, dparent);
if (time_after(jiffies, now + 15 * HZ))
CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
- return rc ? ERR_PTR(rc) : de;
+ return rc ? ERR_PTR(rc) : dparent;
}
/* How to get files, dentries, inodes from object id's.
struct dentry *dir_dentry,
obd_gr group, obd_id id)
{
- struct lustre_handle lockh;
struct dentry *dparent = dir_dentry;
struct dentry *dchild;
char name[32];
len = sprintf(name, LPU64, id);
if (dir_dentry == NULL) {
- dparent = filter_parent_lock(obd, group, id, LCK_PR, &lockh);
+ dparent = filter_parent_lock(obd, group, id);
if (IS_ERR(dparent))
RETURN(dparent);
}
CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
dparent->d_name.len, dparent->d_name.name, name);
- dchild = ll_lookup_one_len(name, dparent, len);
+ dchild = /*ll_*/lookup_one_len(name, dparent, len);
if (dir_dentry == NULL)
- filter_parent_unlock(dparent, &lockh, LCK_PR);
+ filter_parent_unlock(dparent);
if (IS_ERR(dchild)) {
CERROR("child lookup error %ld\n", PTR_ERR(dchild));
RETURN(dchild);
struct lustre_handle lockh;
int flags = LDLM_AST_DISCARD_DATA, rc;
struct ldlm_res_id res_id = { .name = { objid } };
- struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
- ENTRY;
+ ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
+ ENTRY;
/* Tell the clients that the object is gone now and that they should
- * throw away any cached pages. If we're the OST at stripe 0 in the
- * file then this enqueue will communicate the DISCARD to all the
- * clients. This assumes that we always destroy all the objects for
- * a file at a time, as is currently the case. If we're not the
- * OST at stripe 0 then we'll harmlessly get a very lonely lock in
- * the local DLM and immediately drop it. */
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_EXTENT, &extent,
- sizeof(extent), LCK_PW, &flags,
- ldlm_completion_ast, filter_blocking_ast,
- NULL, &lockh);
+ * throw away any cached pages. */
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+ LDLM_EXTENT, &policy, LCK_PW,
+ &flags, filter_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, &lockh);
/* We only care about the side-effects, just drop the lock. */
if (rc == ELDLM_OK)
if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
CERROR("destroying objid %*s nlink = %lu, count = %d\n",
dchild->d_name.len, dchild->d_name.name,
- (unsigned long)inode->i_nlink,
+ (unsigned long)inode->i_nlink,
atomic_read(&inode->i_count));
}
RETURN(rc);
}
+static int filter_intent_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ ldlm_mode_t mode, int flags, void *data)
+{
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ struct ptlrpc_request *req = req_cookie;
+ struct ldlm_lock *lock = *lockp, *l = NULL;
+ struct ldlm_resource *res = lock->l_resource;
+ ldlm_processing_policy policy;
+ struct ost_lvb *res_lvb, *reply_lvb;
+ struct list_head *tmp;
+ ldlm_error_t err;
+ int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
+ sizeof(struct ost_lvb) };
+ ENTRY;
+
+ policy = ldlm_get_processing_policy(res);
+ LASSERT(policy != NULL);
+ LASSERT(req != NULL);
+
+ rc = lustre_pack_reply(req, 2, repsize, NULL);
+ if (rc)
+ RETURN(req->rq_status = rc);
+
+ reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
+ LASSERT(reply_lvb != NULL);
+
+ //fixup_handle_for_resent_req(req, lock, &lockh);
+
+ /* If we grant any lock at all, it will be a whole-file read lock.
+ * Call the extent policy function to see if our request can be
+ * granted, or is blocked. */
+ lock->l_policy_data.l_extent.start = 0;
+ lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+ lock->l_req_mode = LCK_PR;
+
+ l_lock(&res->lr_namespace->ns_lock);
+
+ res->lr_tmp = &rpc_list;
+ rc = policy(lock, &tmpflags, 0, &err);
+ res->lr_tmp = NULL;
+
+ /* FIXME: we should change the policy function slightly, to not make
+ * this list at all, since we just turn around and free it */
+ while (!list_empty(&rpc_list)) {
+ struct ldlm_ast_work *w =
+ list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
+ list_del(&w->w_list);
+ LDLM_LOCK_PUT(w->w_lock);
+ OBD_FREE(w, sizeof(*w));
+ }
+
+ if (rc == LDLM_ITER_CONTINUE) {
+ /* The lock met with no resistance; we're finished. */
+ l_unlock(&res->lr_namespace->ns_lock);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
+
+ /* Do not grant any lock, but instead send GL callbacks. The extent
+ * policy nicely created a list of all PW locks for us. We will choose
+ * the highest of those which are larger than the size in the LVB, if
+ * any, and perform a glimpse callback. */
+ down(&res->lr_lvb_sem);
+ res_lvb = res->lr_lvb_data;
+ LASSERT(res_lvb != NULL);
+ reply_lvb->lvb_size = res_lvb->lvb_size;
+ up(&res->lr_lvb_sem);
+
+ list_for_each(tmp, &res->lr_granted) {
+ struct ldlm_lock *tmplock =
+ list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (tmplock->l_granted_mode == LCK_PR)
+ continue;
+
+ if (tmplock->l_policy_data.l_extent.end <=
+ reply_lvb->lvb_size)
+ continue;
+
+ if (l == NULL) {
+ l = LDLM_LOCK_GET(tmplock);
+ continue;
+ }
+
+ if (l->l_policy_data.l_extent.start >
+ tmplock->l_policy_data.l_extent.start)
+ continue;
+
+ LDLM_LOCK_PUT(l);
+ l = LDLM_LOCK_GET(tmplock);
+ }
+ l_unlock(&res->lr_namespace->ns_lock);
+
+ /* There were no PW locks beyond the size in the LVB; finished. */
+ if (l == NULL)
+ RETURN(ELDLM_LOCK_ABORTED);
+
+ LASSERT(l->l_glimpse_ast != NULL);
+ rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+ if (rc != 0 && res->lr_namespace->ns_lvbo &&
+ res->lr_namespace->ns_lvbo->lvbo_update) {
+ res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+ }
+
+ down(&res->lr_lvb_sem);
+ reply_lvb->lvb_size = res_lvb->lvb_size;
+ up(&res->lr_lvb_sem);
+
+ LDLM_LOCK_PUT(l);
+
+ RETURN(ELDLM_LOCK_ABORTED);
+}
+
/* mount the file system (secretly) */
int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
char *option)
if (*lcfg->lcfg_inlbuf3 == 'f') {
obd->obd_replayable = 1;
obd_sync_filter = 1;
- CERROR("%s: recovery enabled\n", obd->obd_name);
+ CWARN("%s: recovery enabled\n", obd->obd_name);
} else {
if (*lcfg->lcfg_inlbuf3 != 'n') {
CERROR("unrecognised flag '%c'\n",
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
sema_init(&filter->fo_alloc_lock, 1);
+ spin_lock_init(&filter->fo_r_pages.oh_lock);
+ spin_lock_init(&filter->fo_w_pages.oh_lock);
+ spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
+ spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
+ spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
+ spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
+ filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
obd->obd_namespace = ldlm_namespace_new("filter-tgt",
LDLM_NAMESPACE_SERVER);
if (obd->obd_namespace == NULL)
GOTO(err_post, rc = -ENOMEM);
+ obd->obd_namespace->ns_lvbp = obd;
+ obd->obd_namespace->ns_lvbo = &filter_lvbo;
+ ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
+ rc = llog_cat_initialize(obd, 1);
+ if (rc) {
+ CERROR("failed to setup llogging subsystems\n");
+ GOTO(err_post, rc);
+ }
+
RETURN(0);
err_post:
return rc;
}
-static int filter_postsetup(struct obd_device *obd)
-{
- int rc = 0;
- ENTRY;
-
- // XXX add a storage location for the logid for size changes
-#ifdef ENABLE_ORPHANS
- rc = llog_cat_initialize(obd, 1);
- if (rc)
- CERROR("failed to setup llogging subsystems\n");
-#endif
- RETURN(rc);
-}
-
static int filter_cleanup(struct obd_device *obd, int flags)
{
struct filter_obd *filter = &obd->u.filter;
LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
- return rc;
+
+ return lproc_filter_attach_seqstat(obd);
}
static int filter_detach(struct obd_device *dev)
memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
fed->fed_fcd = fcd;
- fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
rc = filter_client_add(obd, filter, fed, -1);
int rc = 0;
ENTRY;
-#ifdef ENABLE_ORPHANS
rc = obd_llog_finish(obd, 0);
if (rc)
CERROR("failed to cleanup llogging subsystem\n");
-#endif
RETURN(rc);
}
+/* Do extra sanity checks for grant accounting. We do this at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
+ * always get rid of it or turn it off when we know accounting is good. */
+static void filter_grant_sanity_check(struct obd_device *obd, char *func)
+{
+ struct filter_export_data *fed;
+ struct obd_export *exp;
+ obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
+ obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
+ obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+
+ if (list_empty(&obd->obd_exports))
+ return;
+
+ spin_lock(&obd->obd_osfs_lock);
+ spin_lock(&obd->obd_dev_lock);
+ list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+ fed = &exp->exp_filter_data;
+ LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+ "cli %s/%p %lu+%lu > "LPU64"\n",
+ exp->exp_client_uuid.uuid, exp,
+ fed->fed_grant, fed->fed_pending, maxsize);
+ LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
+ exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
+ CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+ tot_granted += fed->fed_grant + fed->fed_pending;
+ tot_pending += fed->fed_pending;
+ tot_dirty += fed->fed_dirty;
+ }
+ fo_tot_granted = obd->u.filter.fo_tot_granted;
+ fo_tot_pending = obd->u.filter.fo_tot_pending;
+ fo_tot_dirty = obd->u.filter.fo_tot_dirty;
+ spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_osfs_lock);
+
+ /* Do these assertions outside the spinlocks so we don't kill system */
+ if (tot_granted != fo_tot_granted)
+ CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
+ func, tot_granted, fo_tot_granted);
+ if (tot_pending != fo_tot_pending)
+ CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
+ func, tot_pending, fo_tot_pending);
+ if (tot_dirty != fo_tot_dirty)
+ CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
+ func, tot_dirty, fo_tot_dirty);
+ if (tot_pending > tot_granted)
+ CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
+ func, tot_pending, tot_granted);
+ if (tot_granted > maxsize)
+ CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
+ func, tot_granted, maxsize);
+ if (tot_dirty > maxsize)
+ CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
+ func, tot_dirty, maxsize);
+}
+
+/* Remove this client from the grant accounting totals. We also remove
+ * the export from the obd device under the osfs and dev locks to ensure
+ * that the filter_grant_sanity_check() calculations are always valid.
+ * The client should do something similar when it invalidates its import. */
+static void filter_grant_discard(struct obd_export *exp)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
+ struct filter_export_data *fed = &exp->exp_filter_data;
+
+ spin_lock(&obd->obd_osfs_lock);
+ spin_lock(&exp->exp_obd->obd_dev_lock);
+ list_del_init(&exp->exp_obd_chain);
+ spin_unlock(&exp->exp_obd->obd_dev_lock);
+
+ CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+
+ LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
+ "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
+ obd->obd_name, filter->fo_tot_granted,
+ exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+ filter->fo_tot_granted -= fed->fed_grant;
+ LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
+ "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
+ obd->obd_name, filter->fo_tot_pending,
+ exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+ LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
+ "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
+ obd->obd_name, filter->fo_tot_dirty,
+ exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+ filter->fo_tot_dirty -= fed->fed_dirty;
+ fed->fed_dirty = 0;
+ fed->fed_grant = 0;
+
+ spin_unlock(&obd->obd_osfs_lock);
+}
+
static int filter_destroy_export(struct obd_export *exp)
{
ENTRY;
+ if (exp->exp_filter_data.fed_pending)
+ CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
+ exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+ exp, exp->exp_filter_data.fed_pending);
+
target_destroy_export(exp);
if (exp->exp_obd->obd_replayable)
filter_client_free(exp, exp->exp_flags);
+
+ filter_grant_discard(exp);
+ if (!(exp->exp_flags & OBD_OPT_FORCE))
+ filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
+
RETURN(0);
}
/* also incredibly similar to mds_disconnect */
static int filter_disconnect(struct obd_export *exp, int flags)
{
+ struct obd_device *obd = exp->exp_obd;
unsigned long irqflags;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
LASSERT(exp);
- ldlm_cancel_locks_for_export(exp);
+ class_export_get(exp);
spin_lock_irqsave(&exp->exp_lock, irqflags);
exp->exp_flags = flags;
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
- /* XXX cleanup preallocated inodes */
+ if (!(flags & OBD_OPT_FORCE))
+ filter_grant_sanity_check(obd, __FUNCTION__);
+ filter_grant_discard(exp);
+
+ /* Disconnect early so that clients can't keep using export */
+ rc = class_disconnect(exp, flags);
+
+ ldlm_cancel_locks_for_export(exp);
+
+ fsfilt_sync(obd, obd->u.filter.fo_sb);
/* flush any remaining cancel messages out to the target */
- ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+ ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
llog_sync(ctxt, exp);
- rc = class_disconnect(exp, flags);
+ class_export_put(exp);
RETURN(rc);
}
struct filter_obd *filter;
struct dentry *dentry;
struct iattr iattr;
+ struct ldlm_res_id res_id = { .name = { oa->o_id } };
+ struct ldlm_resource *res;
void *handle;
int rc, rc2;
ENTRY;
rc = rc2;
}
+ if (iattr.ia_valid & ATTR_SIZE) {
+ res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
+ res_id, LDLM_EXTENT, 0);
+ if (res == NULL) {
+ CERROR("!!! resource_get failed for object "LPU64" -- "
+ "filter_setattr with no lock?\n", oa->o_id);
+ } else {
+ if (res->lr_namespace->ns_lvbo &&
+ res->lr_namespace->ns_lvbo->lvbo_update) {
+ rc = res->lr_namespace->ns_lvbo->lvbo_update
+ (res, NULL, 0, 0);
+ }
+ ldlm_resource_putref(res);
+ }
+ }
+
oa->o_valid = OBD_MD_FLID;
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
diff = oa->o_id - filter_last_id(filter, oa);
CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
filter_last_id(filter, oa), diff);
-
+
/* delete orphans request */
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
(oa->o_flags & OBD_FL_DELORPHAN)) {
- LASSERT(diff <= 0);
- if (diff == 0)
- RETURN(0);
+ if (diff >= 0)
+ RETURN(diff);
+ if (-diff > 10000) { /* XXX make this smarter */
+ CERROR("ignoring bogus orphan destroy request: obdid "
+ LPU64" last_id "LPU64"\n",
+ oa->o_id, filter_last_id(filter, oa));
+ RETURN(-EINVAL);
+ }
filter_destroy_precreated(exp, oa, filter);
rc = filter_update_last_objid(obd, group, 0);
if (rc)
- CERROR("unable to write lastobjid, but orphans"
+ CERROR("unable to write lastobjid, but orphans"
"were deleted\n");
RETURN(0);
} else {
/* only precreate if group == 0 and o_id is specfied */
- if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
+ if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
(group != 0 || oa->o_id == 0))
RETURN(1);
LASSERT(diff >= 0);
RETURN(diff);
}
-
}
/* We rely on the fact that only one thread will be creating files in a given
static int filter_precreate(struct obd_device *obd, struct obdo *oa,
obd_gr group, int *num)
{
- struct lustre_handle parent_lockh;
struct dentry *dchild = NULL;
struct filter_obd *filter;
struct dentry *dparent;
- struct iattr attr;
int err = 0, rc = 0, i;
__u64 next_id;
- void *handle;
+ int recreate_obj = 0;
+ void *handle = NULL;
ENTRY;
filter = &obd->u.filter;
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ recreate_obj = 1;
+ }
+
for (i = 0; i < *num && err == 0; i++) {
- next_id = filter_last_id(filter, NULL) + 1;
+ int cleanup_phase = 0;
+
+ if (recreate_obj) {
+ __u64 last_id;
+ next_id = oa->o_id;
+ last_id = filter_last_id(filter, NULL);
+ if (next_id > last_id) {
+ CERROR("Error: Trying to recreate obj greater"
+ "than last id "LPD64" > "LPD64"\n",
+ next_id, last_id);
+ RETURN(-EINVAL);
+ }
+ } else
+ next_id = filter_last_id(filter, NULL) + 1;
+
CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
- dparent = filter_parent_lock(obd, group, next_id, LCK_PW,
- &parent_lockh);
- if (IS_ERR(dparent)) {
- rc = PTR_ERR(dparent);
- break;
- }
+ dparent = filter_parent_lock(obd, group, next_id);
+ if (IS_ERR(dparent))
+ GOTO(cleanup, rc = PTR_ERR(dparent));
+ cleanup_phase = 1;
dchild = filter_fid2dentry(obd, dparent, group, next_id);
if (IS_ERR(dchild))
- GOTO(cleanup_lock, rc = PTR_ERR(dchild));
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ cleanup_phase = 2;
if (dchild->d_inode != NULL) {
/* This would only happen if lastobjid was bad on disk*/
- CERROR("Serious error: objid %*s already exists; is "
- "this filesystem corrupt?\n",
- dchild->d_name.len, dchild->d_name.name);
- GOTO(cleanup_dchild, rc = -EEXIST);
+ /* Could also happen if recreating missing obj but
+ * already exists
+ */
+ if (recreate_obj) {
+ CERROR("Serious error: recreating obj %*s but "
+ "obj already exists \n",
+ dchild->d_name.len, dchild->d_name.name);
+ } else {
+ CERROR("Serious error: objid %*s already "
+ "exists; is this filesystem corrupt?\n",
+ dchild->d_name.len, dchild->d_name.name);
+ }
+ GOTO(cleanup, rc = -EEXIST);
}
handle = fsfilt_start(obd, dparent->d_inode,
FSFILT_OP_CREATE_LOG, NULL);
if (IS_ERR(handle))
- GOTO(cleanup_dchild, rc = PTR_ERR(handle));
+ GOTO(cleanup, rc = PTR_ERR(handle));
+ cleanup_phase = 3;
rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
if (rc) {
CERROR("create failed rc = %d\n", rc);
- GOTO(cleanup_commit, rc);
- } else if (oa != NULL &&
- (oa->o_valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME |
- OBD_MD_FLSIZE))) {
- iattr_from_obdo(&attr, oa, oa->o_valid);
- rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
- if (rc)
- CERROR("create setattr failed rc = %d\n", rc);
+ GOTO(cleanup, rc);
}
- filter_set_last_id(filter, NULL, next_id);
- err = filter_update_last_objid(obd, group, 0);
- if (err)
- CERROR("unable to write lastobjid but file created\n");
- cleanup_commit:
- err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (err) {
- CERROR("error on commit, err = %d\n", err);
- if (!rc)
- rc = err;
+
+ if (!recreate_obj) {
+ filter_set_last_id(filter, NULL, next_id);
+ err = filter_update_last_objid(obd, group, 0);
+ if (err)
+ CERROR("unable to write lastobjid "
+ "but file created\n");
}
- if (dchild->d_inode != NULL && oa != NULL)
- obdo_from_inode(oa, dchild->d_inode,
- FILTER_VALID_FLAGS);
- cleanup_dchild:
- f_dput(dchild);
- cleanup_lock:
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+
+ cleanup:
+ switch(cleanup_phase) {
+ case 3:
+ err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
+ if (!rc)
+ rc = err;
+ }
+ case 2:
+ f_dput(dchild);
+ case 1:
+ filter_parent_unlock(dparent);
+ case 0:
+ break;
+ }
+
if (rc)
break;
- oa = NULL; /* oa applies for first iteration only */
}
*num = i;
obd = exp->exp_obd;
push_ctxt(&saved, &obd->obd_ctxt, NULL);
- diff = filter_should_precreate(exp, oa, group);
- if (diff > 0) {
- oa->o_id = filter_last_id(&obd->u.filter, oa);
- rc = filter_precreate(obd, oa, group, &diff);
- oa->o_id += diff;
- oa->o_valid = OBD_MD_FLID;
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
+ CERROR("recreate objid "LPU64" > last id "LPU64"\n",
+ oa->o_id, filter_last_id(&obd->u.filter, oa));
+ rc = -EINVAL;
+ } else {
+ diff = 1;
+ rc = filter_precreate(obd, oa, group, &diff);
+ }
+ } else {
+ diff = filter_should_precreate(exp, oa, group);
+ if (diff > 0) {
+ oa->o_id = filter_last_id(&obd->u.filter, oa);
+ rc = filter_precreate(obd, oa, group, &diff);
+ oa->o_id += diff;
+ oa->o_valid = OBD_MD_FLID;
+ }
}
pop_ctxt(&saved, &obd->obd_ctxt, NULL);
struct dentry *dchild = NULL, *dparent = NULL;
struct obd_run_ctxt saved;
void *handle = NULL;
- struct lustre_handle parent_lockh;
struct llog_cookie *fcc = NULL;
int rc, rc2, cleanup_phase = 0, have_prepared = 0;
obd_gr group = 0;
push_ctxt(&saved, &obd->obd_ctxt, NULL);
acquire_locks:
- dparent = filter_parent_lock(obd, group, oa->o_id, LCK_PW,
- &parent_lockh);
+ dparent = filter_parent_lock(obd, group, oa->o_id);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
cleanup_phase = 2;
if (dchild->d_inode == NULL) {
- CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
+ CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
+ oa->o_id);
GOTO(cleanup, rc = -ENOENT);
}
* complication of condition the above code to skip it on the
* second time through. */
f_dput(dchild);
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+ filter_parent_unlock(dparent);
filter_prepare_destroy(obd, oa->o_id);
have_prepared = 1;
cleanup:
switch(cleanup_phase) {
case 3:
- if (fcc != NULL)
- fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
- filter_cancel_cookies_cb, fcc);
+ if (fcc != NULL) {
+ if (oti != NULL)
+ fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+ filter_cancel_cookies_cb,
+ fcc);
+ else
+ fsfilt_add_journal_cb(obd, 0, handle,
+ filter_cancel_cookies_cb,
+ fcc);
+ }
rc = filter_finish_transno(exp, oti, rc);
rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
if (rc2) {
case 2:
f_dput(dchild);
case 1:
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
- }
+ filter_parent_unlock(dparent);
case 0:
pop_ctxt(&saved, &obd->obd_ctxt, NULL);
break;
struct obd_run_ctxt saved;
struct filter_obd *filter;
struct dentry *dentry;
+ struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
/* an objid of zero is taken to mean "sync whole filesystem" */
if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+ /* flush any remaining cancel messages out to the target */
+ ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+ llog_sync(ctxt, exp);
RETURN(rc);
}
static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
+ struct filter_obd *filter = &obd->u.filter;
+ int blockbits = filter->fo_sb->s_blocksize_bits;
+ int rc;
ENTRY;
- RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
+
+ /* at least try to account for cached pages. its still racey and
+ * might be under-reporting if clients haven't announced their
+ * caches with brw recently */
+ spin_lock(&obd->obd_osfs_lock);
+ rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
+ memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
+ spin_unlock(&obd->obd_osfs_lock);
+
+ CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
+ " pending "LPU64" free "LPU64" avail "LPU64"\n",
+ filter->fo_tot_dirty, filter->fo_tot_granted,
+ filter->fo_tot_pending,
+ osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
+
+ filter_grant_sanity_check(obd, __FUNCTION__);
+
+ osfs->os_bavail -= min(osfs->os_bavail,
+ (filter->fo_tot_dirty + filter->fo_tot_pending +
+ osfs->os_bsize -1) >> blockbits);
+
+ RETURN(rc);
}
static int filter_get_info(struct obd_export *exp, __u32 keylen,
{
struct obd_device *obd;
struct lustre_handle conn;
-#ifdef ENABLE_ORPHANS
struct llog_ctxt *ctxt;
-#endif
int rc = 0;
ENTRY;
CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
-#ifdef ENABLE_ORPHANS
ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-#endif
RETURN(rc);
}
BDEVNAME_DECLARE_STORAGE(tmp);
CERROR("setting device %s read-only\n",
ll_bdevname(sb, tmp));
-
+
handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
LASSERT(handle);
(void)fsfilt_commit(obd, inode, handle, 1);
}
case OBD_IOC_LLOG_CANCEL:
- case OBD_IOC_LLOG_REMOVE:
+ case OBD_IOC_LLOG_REMOVE:
case OBD_IOC_LLOG_INFO:
case OBD_IOC_LLOG_PRINT: {
/* FIXME to be finished */
RETURN(-EOPNOTSUPP);
/*
struct llog_ctxt *ctxt = NULL;
-
+
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-
+
RETURN(rc);
*/
}
};
static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_logid *logid)
+ int count, struct llog_catid *logid)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
-
+
filter_unlink_repl_logops = llog_client_ops;
filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
filter_unlink_repl_logops.lop_connect = llog_repl_connect;
{
int rc;
ENTRY;
-
+
rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT));
if (rc)
RETURN(rc);
o_get_info: filter_get_info,
o_set_info: filter_set_info,
o_setup: filter_setup,
- o_postsetup: filter_postsetup,
o_precleanup: filter_precleanup,
o_cleanup: filter_cleanup,
o_connect: filter_connect,