X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter.c;h=4e634e47d9352b4f11aff696e75c6471250cde07;hp=18a247bf90b3dd5576135250acf1eb3c7ae6fbba;hb=2d617260aaa5f778ab8dcb006e2a827f4b8f8567;hpb=d0912bb3a0bf5a14002fb96047c3ea4ce1bfbc0e diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 18a247b..4e634e4 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -28,6 +26,8 @@ /* * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -51,9 +51,6 @@ #define DEBUG_SUBSYSTEM S_FILTER -#ifndef AUTOCONF_INCLUDED -#include -#endif #include #include #include @@ -135,12 +132,12 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, if (!exp->exp_obd->obd_replayable || oti == NULL) RETURN(rc); - cfs_mutex_down(&ted->ted_lcd_lock); + cfs_mutex_lock(&ted->ted_lcd_lock); lcd = ted->ted_lcd; /* if the export has already been disconnected, we have no last_rcvd slot, * update server data with latest transno then */ if (lcd == NULL) { - cfs_mutex_up(&ted->ted_lcd_lock); + cfs_mutex_unlock(&ted->ted_lcd_lock); CWARN("commit transaction for disconnected client %s: rc %d\n", exp->exp_client_uuid.uuid, rc); err = filter_update_server_data(exp->exp_obd); @@ -152,14 +149,25 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, if (oti->oti_transno == 0) { last_rcvd = le64_to_cpu(lsd->lsd_last_transno) + 1; lsd->lsd_last_transno = cpu_to_le64(last_rcvd); + LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno)); } else { last_rcvd = oti->oti_transno; if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno)) lsd->lsd_last_transno = cpu_to_le64(last_rcvd); + if (unlikely(last_rcvd < le64_to_cpu(lcd->lcd_last_transno))) { + CERROR("Trying to overwrite bigger transno, on-disk: " + LPU64", new: "LPU64"\n", + le64_to_cpu(lcd->lcd_last_transno), last_rcvd); + cfs_spin_lock(&exp->exp_lock); + exp->exp_vbr_failed = 1; + cfs_spin_unlock(&exp->exp_lock); + cfs_spin_unlock(&obt->obt_lut->lut_translock); + cfs_mutex_unlock(&ted->ted_lcd_lock); + RETURN(-EOVERFLOW); + } } oti->oti_transno = last_rcvd; - LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno)); lcd->lcd_last_transno = cpu_to_le64(last_rcvd); lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version); lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid); @@ -196,7 +204,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n", last_rcvd, lcd->lcd_uuid, ted->ted_lr_idx, err); - cfs_mutex_up(&ted->ted_lcd_lock); + cfs_mutex_unlock(&ted->ted_lcd_lock); RETURN(rc); } @@ -241,7 +249,6 @@ static int lprocfs_init_rw_stats(struct obd_device *obd, plus the procfs overhead :( */ static int filter_export_stats_init(struct obd_device *obd, struct obd_export *exp, - int reconnect, void *client_nid) { int rc, newnid = 0; @@ -251,7 +258,7 @@ static int filter_export_stats_init(struct obd_device *obd, /* Self-export gets no proc entry */ RETURN(0); - rc = lprocfs_exp_setup(exp, client_nid, reconnect, &newnid); + rc = lprocfs_exp_setup(exp, client_nid, &newnid); if (rc) { /* Mask error for already created * /proc entries */ @@ -342,7 +349,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, ted->ted_lr_idx = cl_idx; ted->ted_lr_off = le32_to_cpu(lsd->lsd_client_start) + cl_idx * le16_to_cpu(lsd->lsd_client_size); - cfs_init_mutex(&ted->ted_lcd_lock); + cfs_mutex_init(&ted->ted_lcd_lock); LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off); CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n", @@ -443,12 +450,12 @@ static int filter_client_del(struct obd_export *exp) * be in server data or in client data in case of failure */ filter_update_server_data(exp->exp_obd); - cfs_mutex_down(&ted->ted_lcd_lock); + cfs_mutex_lock(&ted->ted_lcd_lock); memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid); rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp, ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0); - cfs_mutex_up(&ted->ted_lcd_lock); + cfs_mutex_unlock(&ted->ted_lcd_lock); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); CDEBUG(rc == 0 ? D_INFO : D_ERROR, @@ -635,17 +642,28 @@ static void filter_fmd_cleanup(struct obd_export *exp) static int filter_init_export(struct obd_export *exp) { int rc; + ENTRY; + cfs_spin_lock_init(&exp->exp_filter_data.fed_lock); CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); cfs_spin_lock(&exp->exp_lock); exp->exp_connecting = 1; cfs_spin_unlock(&exp->exp_lock); + + /* self-export doesn't need client data and ldlm initialization */ + if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, + &exp->exp_client_uuid))) + RETURN(0); + rc = lut_client_alloc(exp); if (rc == 0) rc = ldlm_init_export(exp); + if (rc) + CERROR("%s: Can't initialize export: rc %d\n", + exp->exp_obd->obd_name, rc); - return rc; + RETURN(rc); } static int filter_free_server_data(struct obd_device_target *obt) @@ -869,7 +887,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) fed = &exp->exp_filter_data; *fed->fed_ted.ted_lcd = *lcd; fed->fed_group = 0; /* will be assigned at connect */ - filter_export_stats_init(obd, exp, 0, NULL); + filter_export_stats_init(obd, exp, NULL); rc = filter_client_add(obd, exp, cl_idx); /* can't fail for existing client */ LASSERTF(rc == 0, "rc = %d\n", rc); @@ -891,6 +909,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) obd->obd_last_committed = le64_to_cpu(lsd->lsd_last_transno); out: obd->u.obt.obt_mount_count = mount_count + 1; + obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count; lsd->lsd_mount_count = cpu_to_le64(obd->u.obt.obt_mount_count); /* save it, so mount count and last_transno is current */ @@ -1174,14 +1193,14 @@ static int filter_read_groups(struct obd_device *obd, int last_group, struct filter_obd *filter = &obd->u.filter; int old_count, group, rc = 0; - cfs_down(&filter->fo_init_lock); + cfs_mutex_lock(&filter->fo_init_lock); old_count = filter->fo_group_count; for (group = old_count; group <= last_group; group++) { rc = filter_read_group_internal(obd, group, create); if (rc != 0) break; } - cfs_up(&filter->fo_init_lock); + cfs_mutex_unlock(&filter->fo_init_lock); return rc; } @@ -1197,10 +1216,10 @@ static int filter_prep_groups(struct obd_device *obd) O_dentry = simple_mkdir(cfs_fs_pwd(current->fs), obd->u.obt.obt_vfsmnt, "O", 0700, 1); - CDEBUG(D_INODE, "got/created O: %p\n", O_dentry); + CDEBUG(D_INODE, "%s: got/created O: %p\n", obd->obd_name, O_dentry); if (IS_ERR(O_dentry)) { rc = PTR_ERR(O_dentry); - CERROR("cannot open/create O: rc = %d\n", rc); + CERROR("%s: cannot open/create O: rc = %d\n", obd->obd_name,rc); GOTO(cleanup, rc); } filter->fo_dentry_O = O_dentry; @@ -1210,22 +1229,24 @@ static int filter_prep_groups(struct obd_device *obd) * clients because they may send create/destroy for any group -bzzz */ filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700); if (IS_ERR(filp)) { - CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp)); + CERROR("%s: cannot create LAST_GROUP: rc = %ld\n", + obd->obd_name, PTR_ERR(filp)); GOTO(cleanup, rc = PTR_ERR(filp)); } cleanup_phase = 2; /* filp */ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off); if (rc) { - CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc); + CERROR("%s: error reading LAST_GROUP: rc %d\n", + obd->obd_name, rc); GOTO(cleanup, rc); } if (off == 0) last_group = FID_SEQ_OST_MDT0; - CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name, - FID_SEQ_OST_MDT0, last_group); + CDEBUG(D_INODE, "%s: initialize group %u (max %u)\n", obd->obd_name, + FID_SEQ_OST_MDT0, last_group); filter->fo_committed_group = last_group; rc = filter_read_groups(obd, last_group, 1); if (rc) @@ -1397,8 +1418,8 @@ obd_id filter_last_id(struct filter_obd *filter, obd_seq group) static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent) { - LOCK_INODE_MUTEX_PARENT(dparent->d_inode); - return 0; + mutex_lock_nested(&dparent->d_inode->i_mutex, I_MUTEX_PARENT); + return 0; } /* We never dget the object parent, so DON'T dput it either */ @@ -1408,7 +1429,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_seq group, obd_id objid struct filter_subdirs *subdirs; if (group >= filter->fo_group_count) /* FIXME: object groups */ - return ERR_PTR(-EBADF); + return ERR_PTR(-EBADF); if (!fid_seq_is_mdt(group) || filter->fo_subdir_count == 0) return filter->fo_dentry_O_groups[group]; @@ -1438,7 +1459,7 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_seq group, /* We never dget the object parent, so DON'T dput it either */ static void filter_parent_unlock(struct dentry *dparent) { - UNLOCK_INODE_MUTEX(dparent->d_inode); + mutex_unlock(&dparent->d_inode->i_mutex); } /* How to get files, dentries, inodes from object id's. @@ -1462,7 +1483,7 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, obd->u.filter.fo_destroys_in_progress == 0) { /* don't fail lookups for orphan recovery, it causes * later LBUGs when objects still exist during precreate */ - CDEBUG(D_INFO, "*** obd_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT); + CDEBUG(D_INFO, "*** cfs_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT); RETURN(ERR_PTR(-ENOENT)); } if (id == 0) { @@ -1547,9 +1568,9 @@ int filter_vfs_unlink(struct inode *dir, struct dentry *dentry, /* don't need dir->i_zombie for 2.4, it is for rename/unlink of dir * itself we already hold dir->i_mutex for child create/unlink ops */ - LASSERT(dentry->d_inode != NULL); - LASSERT(TRYLOCK_INODE_MUTEX(dir) == 0); - LASSERT(TRYLOCK_INODE_MUTEX(dentry->d_inode) == 0); + LASSERT(dentry->d_inode != NULL); + LASSERT(mutex_trylock(&dir->i_mutex) == 0); + LASSERT(mutex_trylock(&dentry->d_inode->i_mutex) == 0); /* may_delete() */ @@ -1569,9 +1590,7 @@ int filter_vfs_unlink(struct inode *dir, struct dentry *dentry, IS_IMMUTABLE(dentry->d_inode)) GOTO(out, rc = -EPERM); - /* NOTE: This might need to go outside i_mutex, though it isn't clear if - * that was done because of journal_start (which is already done - * here) or some other ordering issue. */ + /* Locking order: i_mutex -> journal_lock -> dqptr_sem. LU-952 */ ll_vfs_dq_init(dir); rc = ll_security_inode_unlink(dir, dentry, mnt); @@ -1580,12 +1599,12 @@ int filter_vfs_unlink(struct inode *dir, struct dentry *dentry, rc = dir->i_op->unlink(dir, dentry); out: - /* need to drop i_mutex before we lose inode reference */ - UNLOCK_INODE_MUTEX(dentry->d_inode); - if (rc == 0) - d_delete(dentry); + /* need to drop i_mutex before we lose inode reference */ + mutex_unlock(&dentry->d_inode->i_mutex); + if (rc == 0) + d_delete(dentry); - RETURN(rc); + RETURN(rc); } /* Caller must hold LCK_PW on parent and push us into kernel context. @@ -1666,7 +1685,6 @@ static int filter_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data) { - CFS_LIST_HEAD(rpc_list); struct ptlrpc_request *req = req_cookie; struct ldlm_lock *lock = *lockp, *l = NULL; struct ldlm_resource *res = lock->l_resource; @@ -1705,24 +1723,18 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * lock, and should not be granted if the lock will be blocked. */ + if (flags & LDLM_FL_BLOCK_NOWAIT) { + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5); + + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK)) + RETURN(ELDLM_LOCK_ABORTED); + } + LASSERT(ns == ldlm_res_to_ns(res)); lock_res(res); - rc = policy(lock, &tmpflags, 0, &err, &rpc_list); + rc = policy(lock, &tmpflags, 0, &err, NULL); check_res_locked(res); - /* FIXME: we should change the policy function slightly, to not make - * this list at all, since we just turn around and free it */ - while (!cfs_list_empty(&rpc_list)) { - struct ldlm_lock *wlock = - cfs_list_entry(rpc_list.next, struct ldlm_lock, - l_cp_ast); - LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0); - LASSERT(lock->l_flags & LDLM_FL_CP_REQD); - lock->l_flags &= ~LDLM_FL_CP_REQD; - cfs_list_del_init(&wlock->l_cp_ast); - LDLM_LOCK_RELEASE(wlock); - } - /* The lock met with no resistance; we're finished. */ if (rc == LDLM_ITER_CONTINUE) { /* do not grant locks to the liblustre clients: they cannot @@ -1739,6 +1751,12 @@ static int filter_intent_policy(struct ldlm_namespace *ns, } unlock_res(res); RETURN(err); + } else if (flags & LDLM_FL_BLOCK_NOWAIT) { + /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse + * callback for glimpse size. The real size user will trigger + * the glimpse callback when necessary. */ + unlock_res(res); + RETURN(ELDLM_LOCK_ABORTED); } /* Do not grant any lock, but instead send GL callbacks. The extent @@ -1754,7 +1772,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * therefore, that res->lr_lvb_data cannot increase beyond the * end of already granted lock. As a result, it is safe to * check against "stale" reply_lvb->lvb_size value without - * res->lr_lvb_sem. + * res->lr_lvb_mutex. */ arg.size = reply_lvb->lvb_size; arg.victim = &l; @@ -1827,15 +1845,16 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * at the OST layer there are only (potentially) multiple obd_device of type * unknown at the time of OST thread creation. * - * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool - * field). This array has size OST_MAX_THREADS, so that each OST thread uses - * it's very own iobuf. + * We create a cfs_hash for struct filter_obd (->fo_iobuf_hash field) on + * initializing, each OST thread will create it's own iobuf on the first + * access and insert it into ->fo_iobuf_hash with thread ID as key, + * so the iobuf can be found again by thread ID. * * Functions below * - * filter_kiobuf_pool_init() + * filter_iobuf_pool_init() * - * filter_kiobuf_pool_done() + * filter_iobuf_pool_done() * * filter_iobuf_get() * @@ -1848,21 +1867,13 @@ static int filter_intent_policy(struct ldlm_namespace *ns, */ static void filter_iobuf_pool_done(struct filter_obd *filter) { - struct filter_iobuf **pool; - int i; + ENTRY; - ENTRY; - - pool = filter->fo_iobuf_pool; - if (pool != NULL) { - for (i = 0; i < filter->fo_iobuf_count; ++ i) { - if (pool[i] != NULL) - filter_free_iobuf(pool[i]); - } - OBD_FREE(pool, filter->fo_iobuf_count * sizeof pool[0]); - filter->fo_iobuf_pool = NULL; - } - EXIT; + if (filter->fo_iobuf_hash != NULL) { + cfs_hash_putref(filter->fo_iobuf_hash); + filter->fo_iobuf_hash = NULL; + } + EXIT; } static int filter_adapt_sptlrpc_conf(struct obd_device *obd, int initial) @@ -1889,50 +1900,126 @@ static int filter_adapt_sptlrpc_conf(struct obd_device *obd, int initial) return 0; } -/* - * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write(). - */ -static int filter_iobuf_pool_init(struct filter_obd *filter) +static unsigned +filter_iobuf_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask) { - void **pool; + __u64 val = *((__u64 *)key); - ENTRY; + return cfs_hash_long(val, hs->hs_cur_bits); +} +static void * +filter_iobuf_hop_key(cfs_hlist_node_t *hnode) +{ + struct filter_iobuf *pool; - OBD_ALLOC_GFP(filter->fo_iobuf_pool, OSS_THREADS_MAX * sizeof(*pool), - GFP_KERNEL); - if (filter->fo_iobuf_pool == NULL) - RETURN(-ENOMEM); + pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); + return &pool->dr_hkey; +} - filter->fo_iobuf_count = OSS_THREADS_MAX; +static int +filter_iobuf_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct filter_iobuf *pool; - RETURN(0); + pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); + return pool->dr_hkey == *((__u64 *)key); } -/* Return iobuf allocated for @thread_id. We don't know in advance how - * many threads there will be so we allocate a large empty array and only - * fill in those slots that are actually in use. - * If we haven't allocated a pool entry for this thread before, do so now. */ -void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti) +static void * +filter_iobuf_hop_object(cfs_hlist_node_t *hnode) { - int thread_id = (oti && oti->oti_thread) ? - oti->oti_thread->t_id : -1; - struct filter_iobuf *pool = NULL; - struct filter_iobuf **pool_place = NULL; + return cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); +} - if (thread_id >= 0) { - LASSERT(thread_id < filter->fo_iobuf_count); - pool = *(pool_place = &filter->fo_iobuf_pool[thread_id]); - } +static void +filter_iobuf_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + /* dummy, required by cfs_hash */ +} - if (unlikely(pool == NULL)) { - pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE, - PTLRPC_MAX_BRW_PAGES); - if (pool_place != NULL) - *pool_place = pool; - } +static void +filter_iobuf_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + /* dummy, required by cfs_hash */ +} - return pool; +static void +filter_iobuf_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct filter_iobuf *pool; + + pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); + filter_free_iobuf(pool); +} + +static struct cfs_hash_ops filter_iobuf_hops = { + .hs_hash = filter_iobuf_hop_hash, + .hs_key = filter_iobuf_hop_key, + .hs_keycmp = filter_iobuf_hop_keycmp, + .hs_object = filter_iobuf_hop_object, + .hs_get = filter_iobuf_hop_get, + .hs_put_locked = filter_iobuf_hop_put_locked, + .hs_exit = filter_iobuf_hop_exit +}; + +#define FILTER_IOBUF_HASH_BITS 9 +#define FILTER_IOBUF_HBKT_BITS 4 + +/* + * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write(). + */ +static int filter_iobuf_pool_init(struct filter_obd *filter) +{ + filter->fo_iobuf_hash = cfs_hash_create("filter_iobuf", + FILTER_IOBUF_HASH_BITS, + FILTER_IOBUF_HASH_BITS, + FILTER_IOBUF_HBKT_BITS, 0, + CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &filter_iobuf_hops, + CFS_HASH_RW_BKTLOCK | + CFS_HASH_NO_ITEMREF); + + return filter->fo_iobuf_hash != NULL ? 0 : -ENOMEM; +} + +/* Return iobuf allocated for @thread_id. + * If we haven't allocated a pool entry for this thread before, do so now and + * insert it into fo_iobuf_hash, otherwise we can find it from fo_iobuf_hash */ +void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti) +{ + struct filter_iobuf *pool = NULL; + __u64 key = 0; + int thread_id; + int rc; + + thread_id = (oti && oti->oti_thread) ? oti->oti_thread->t_id : -1; + if (thread_id >= 0) { + struct ptlrpc_service_part *svcpt; + + svcpt = oti->oti_thread->t_svcpt; + LASSERT(svcpt != NULL); + + key = (__u64)(svcpt->scp_cpt) << 32 | thread_id; + pool = cfs_hash_lookup(filter->fo_iobuf_hash, &key); + if (pool != NULL) + return pool; + } + + pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE, PTLRPC_MAX_BRW_PAGES); + if (pool == NULL) + return NULL; + + if (thread_id >= 0) { + pool->dr_hkey = key; + rc = cfs_hash_add_unique(filter->fo_iobuf_hash, + &key, &pool->dr_hlist); + /* ptlrpc service thould guarantee thread ID is unique */ + LASSERT(rc != -EALREADY); + } + + return pool; } /* mount the file system (secretly). lustre_cfg parameters are: @@ -1946,6 +2033,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, { struct filter_obd *filter = &obd->u.filter; struct vfsmount *mnt; + struct file_system_type *type; struct lustre_mount_info *lmi; struct obd_uuid uuid; __u8 *uuid_ptr; @@ -1967,20 +2055,17 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb); mnt = lmi->lmi_mnt; obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); - - /* gets recovery timeouts from mount data */ - if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft) - obd->obd_recovery_timeout = - lsi->lsi_lmd->lmd_recovery_time_soft; - if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard) - obd->obd_recovery_time_hard = - lsi->lsi_lmd->lmd_recovery_time_hard; } else { /* old path - used by lctl */ CERROR("Using old MDS mount method\n"); - mnt = ll_kern_mount(lustre_cfg_string(lcfg, 2), - MS_NOATIME|MS_NODIRATIME, - lustre_cfg_string(lcfg, 1), option); + type = get_fs_type(lustre_cfg_string(lcfg, 2)); + if (!type) { + CERROR("get_fs_type failed\n"); + RETURN(-ENODEV); + } + mnt = vfs_kern_mount(type, MS_NOATIME|MS_NODIRATIME, + lustre_cfg_string(lcfg, 1), option); + cfs_module_put(type->owner); if (IS_ERR(mnt)) { rc = PTR_ERR(mnt); LCONSOLE_ERROR_MSG(0x135, "Can't mount disk %s (%d)\n", @@ -2006,6 +2091,9 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, /* failover is the default */ obd->obd_replayable = 1; + /* disable connection until configuration finishes */ + obd->obd_no_conn = 1; + if (lcfg->lcfg_bufcount > 3 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) { str = lustre_cfg_string(lcfg, 3); if (strchr(str, 'n')) { @@ -2014,13 +2102,15 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, } } + obd->u.obt.obt_magic = OBT_MAGIC; obd->u.obt.obt_vfsmnt = mnt; obd->u.obt.obt_sb = mnt->mnt_sb; - obd->u.obt.obt_magic = OBT_MAGIC; filter->fo_fstype = mnt->mnt_sb->s_type->name; CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt); - fsfilt_setup(obd, obd->u.obt.obt_sb); + rc = fsfilt_setup(obd, obd->u.obt.obt_sb); + if (rc) + GOTO(err_ops, rc); OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); obd->obd_lvfs_ctxt.pwdmnt = mnt; @@ -2028,15 +2118,15 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, obd->obd_lvfs_ctxt.fs = get_ds(); obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops; - cfs_init_mutex(&filter->fo_init_lock); + cfs_mutex_init(&filter->fo_init_lock); filter->fo_committed_group = 0; filter->fo_destroys_in_progress = 0; for (i = 0; i < 32; i++) - cfs_sema_init(&filter->fo_create_locks[i], 1); + cfs_mutex_init(&filter->fo_create_locks[i]); cfs_spin_lock_init(&filter->fo_objidlock); CFS_INIT_LIST_HEAD(&filter->fo_export_list); - cfs_sema_init(&filter->fo_alloc_lock, 1); + cfs_mutex_init(&filter->fo_alloc_lock); init_brw_stats(&filter->fo_filter_stats); cfs_spin_lock_init(&filter->fo_flags_lock); filter->fo_read_cache = 1; /* enable read-only cache by default */ @@ -2113,17 +2203,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, lmi ? s2lsi(lmi->lmi_sb)->lsi_lmd->lmd_dev : "", obd->obd_replayable ? "enabled" : "disabled"); - if (obd->obd_recovering) - LCONSOLE_WARN("%s: Will be in recovery for at least %d:%.02d, " - "or until %d client%s reconnect%s\n", - obd->obd_name, - obd->obd_recovery_timeout / 60, - obd->obd_recovery_timeout % 60, - obd->obd_max_recoverable_clients, - (obd->obd_max_recoverable_clients == 1) ? "" : "s", - (obd->obd_max_recoverable_clients == 1) ? "s": ""); - - RETURN(0); err_post: @@ -2140,9 +2219,11 @@ err_mntput: static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) { struct lprocfs_static_vars lvars; + cfs_proc_dir_entry_t *entry; unsigned long addr; struct page *page; int rc; + ENTRY; CLASSERT(offsetof(struct obd_device, u.obt) == offsetof(struct obd_device, u.filter.fo_obt)); @@ -2150,69 +2231,96 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) if (!LUSTRE_CFG_BUFLEN(lcfg, 1) || !LUSTRE_CFG_BUFLEN(lcfg, 2)) RETURN(-EINVAL); - /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */ - OBD_PAGE_ALLOC(page, CFS_ALLOC_STD); - if (!page) - RETURN(-ENOMEM); - addr = (unsigned long)cfs_page_address(page); - clear_page((void *)addr); - /* lprocfs must be setup before the filter so state can be safely added * to /proc incrementally as the filter is setup */ lprocfs_filter_init_vars(&lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && - lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) { - /* Init obdfilter private stats here */ - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES, - LPROCFS_CNTR_AVGMINMAX, - "read_bytes", "bytes"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, - LPROCFS_CNTR_AVGMINMAX, - "write_bytes", "bytes"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE, - LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV, - "get_page", "usec"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE, - LPROCFS_CNTR_AVGMINMAX, - "get_page_failures", "num"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, - LPROCFS_CNTR_AVGMINMAX, - "cache_access", "pages"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT, - LPROCFS_CNTR_AVGMINMAX, - "cache_hit", "pages"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS, - LPROCFS_CNTR_AVGMINMAX, - "cache_miss", "pages"); - - lproc_filter_attach_seqstat(obd); - obd->obd_proc_exports_entry = lprocfs_register("exports", - obd->obd_proc_entry, - NULL, NULL); - if (IS_ERR(obd->obd_proc_exports_entry)) { - rc = PTR_ERR(obd->obd_proc_exports_entry); - CERROR("error %d setting up lprocfs for %s\n", - rc, "exports"); - obd->obd_proc_exports_entry = NULL; - } + rc = lprocfs_obd_setup(obd, lvars.obd_vars); + if (rc) { + CERROR("%s: lprocfs_obd_setup failed: %d.\n", + obd->obd_name, rc); + RETURN(rc); } - if (obd->obd_proc_exports_entry) - lprocfs_add_simple(obd->obd_proc_exports_entry, "clear", + + rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST); + if (rc) { + CERROR("%s: lprocfs_alloc_obd_stats failed: %d.\n", + obd->obd_name, rc); + GOTO(obd_cleanup, rc); + } + + /* Init obdfilter private stats here */ + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES, + LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, + LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE, + LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV, + "get_page", "usec"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE, + LPROCFS_CNTR_AVGMINMAX, "get_page_failures", "num"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, + LPROCFS_CNTR_AVGMINMAX, "cache_access", "pages"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT, + LPROCFS_CNTR_AVGMINMAX, "cache_hit", "pages"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS, + LPROCFS_CNTR_AVGMINMAX, "cache_miss", "pages"); + + rc = lproc_filter_attach_seqstat(obd); + if (rc) { + CERROR("%s: create seqstat failed: %d.\n", obd->obd_name, rc); + GOTO(free_obd_stats, rc); + } + + entry = lprocfs_register("exports", obd->obd_proc_entry, NULL, NULL); + if (IS_ERR(entry)) { + rc = PTR_ERR(entry); + CERROR("%s: error %d setting up lprocfs for %s\n", + obd->obd_name, rc, "exports"); + GOTO(free_obd_stats, rc); + } + obd->obd_proc_exports_entry = entry; + + entry = lprocfs_add_simple(obd->obd_proc_exports_entry, "clear", lprocfs_nid_stats_clear_read, lprocfs_nid_stats_clear_write, obd, NULL); + if (IS_ERR(entry)) { + rc = PTR_ERR(entry); + CERROR("%s: add proc entry 'clear' failed: %d.\n", + obd->obd_name, rc); + GOTO(free_obd_stats, rc); + } + + rc = lprocfs_job_stats_init(obd, LPROC_FILTER_STATS_LAST, + filter_stats_counter_init); + if (rc) + GOTO(remove_entry_clear, rc); + /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */ + OBD_PAGE_ALLOC(page, CFS_ALLOC_STD); + if (!page) + GOTO(job_stats_fini, rc = -ENOMEM); + addr = (unsigned long)cfs_page_address(page); + clear_page((void *)addr); memcpy((void *)addr, lustre_cfg_buf(lcfg, 4), LUSTRE_CFG_BUFLEN(lcfg, 4)); rc = filter_common_setup(obd, lcfg, (void *)addr); OBD_PAGE_FREE(page); - if (rc) { - lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); - lprocfs_free_per_client_stats(obd); - lprocfs_free_obd_stats(obd); - lprocfs_obd_cleanup(obd); + CERROR("%s: filter_common_setup failed: %d.\n", + obd->obd_name, rc); + GOTO(job_stats_fini, rc); } + RETURN(0); + +job_stats_fini: + lprocfs_job_stats_fini(obd); +remove_entry_clear: + lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); +free_obd_stats: + lprocfs_free_obd_stats(obd); +obd_cleanup: + lprocfs_obd_cleanup(obd); return rc; } @@ -2366,28 +2474,28 @@ static int filter_llog_finish(struct obd_device *obd, int count) * We actually do sync in disconnect time, but disconnect * may not come being marked rq_no_resend = 1. */ - llog_sync(ctxt, NULL); + llog_sync(ctxt, NULL, OBD_LLOG_FL_EXIT); /* * Balance class_import_get() in llog_receptor_accept(). * This is safe to do, as llog is already synchronized * and its import may go. */ - cfs_mutex_down(&ctxt->loc_sem); + cfs_mutex_lock(&ctxt->loc_mutex); if (ctxt->loc_imp) { class_import_put(ctxt->loc_imp); ctxt->loc_imp = NULL; } - cfs_mutex_up(&ctxt->loc_sem); + + if (filter->fo_lcm) { + llog_recov_thread_fini(filter->fo_lcm, obd->obd_force); + filter->fo_lcm = NULL; + } + + cfs_mutex_unlock(&ctxt->loc_mutex); llog_ctxt_put(ctxt); } - if (filter->fo_lcm) { - cfs_mutex_down(&ctxt->loc_sem); - llog_recov_thread_fini(filter->fo_lcm, obd->obd_force); - filter->fo_lcm = NULL; - cfs_mutex_up(&ctxt->loc_sem); - } RETURN(filter_olg_fini(&obd->obd_olg)); } @@ -2433,7 +2541,7 @@ struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group) */ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) { - struct obd_llog_group *olg = NULL; + struct obd_llog_group *olg = NULL, *olg_new = NULL; struct filter_obd *filter; int rc; @@ -2442,6 +2550,10 @@ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) if (group == FID_SEQ_LLOG) RETURN(&obd->obd_olg); + OBD_ALLOC_PTR(olg_new); + if (olg_new == NULL) + RETURN(ERR_PTR(-ENOMEM)); + cfs_spin_lock(&filter->fo_llog_list_lock); olg = filter_find_olg_internal(filter, group); if (olg) { @@ -2450,10 +2562,11 @@ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) } else { GOTO(out_unlock, olg); } + } else { + /* set as the newly allocated one */ + olg = olg_new; + olg_new = NULL; } - OBD_ALLOC_PTR(olg); - if (olg == NULL) - GOTO(out_unlock, olg = ERR_PTR(-ENOMEM)); llog_group_init(olg, group); cfs_list_add(&olg->olg_list, &filter->fo_llog_list); @@ -2478,7 +2591,9 @@ out: out_unlock: cfs_spin_unlock(&filter->fo_llog_list_lock); - GOTO(out, olg); + if (olg_new) + OBD_FREE_PTR(olg_new); + goto out; } static int filter_llog_connect(struct obd_export *exp, @@ -2506,9 +2621,9 @@ static int filter_llog_connect(struct obd_export *exp, LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n", body->lgdc_ctxt_idx); - CWARN("%s: Recovery from log "LPX64"/"LPX64":%x\n", - obd->obd_name, body->lgdc_logid.lgl_oid, - body->lgdc_logid.lgl_oseq, body->lgdc_logid.lgl_ogen); + CDEBUG(D_HA, "%s: Recovery from log "LPX64"/"LPX64":%x\n", + obd->obd_name, body->lgdc_logid.lgl_oid, + body->lgdc_logid.lgl_oseq, body->lgdc_logid.lgl_ogen); cfs_spin_lock(&obd->u.filter.fo_flags_lock); obd->u.filter.fo_mds_ost_sync = 1; @@ -2571,7 +2686,17 @@ static int filter_precleanup(struct obd_device *obd, case OBD_CLEANUP_EXPORTS: /* Stop recovery before namespace cleanup. */ target_recovery_fini(obd); + + obd_exports_barrier(obd); + obd_zombie_barrier(); + rc = filter_llog_preclean(obd); + lprocfs_job_stats_fini(obd); + lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); + lprocfs_free_per_client_stats(obd); + lprocfs_obd_cleanup(obd); + lprocfs_free_obd_stats(obd); + lquota_cleanup(filter_quota_interface_ref, obd); break; } RETURN(rc); @@ -2586,15 +2711,6 @@ static int filter_cleanup(struct obd_device *obd) LCONSOLE_WARN("%s: shutting down for failover; client state " "will be preserved.\n", obd->obd_name); - obd_exports_barrier(obd); - obd_zombie_barrier(); - - lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); - lprocfs_free_per_client_stats(obd); - lprocfs_free_obd_stats(obd); - lprocfs_obd_cleanup(obd); - lquota_cleanup(filter_quota_interface_ref, obd); - ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); obd->obd_namespace = NULL; @@ -2639,7 +2755,7 @@ static int filter_connect_internal(struct obd_export *exp, CWARN("!!! This export (nid %s) used object group %d " "earlier; now it's trying to use group %d! This could " "be a bug in the MDS. Please report to " - "http://bugzilla.lustre.org/\n", + "http://bugs.whamcloud.com/\n", obd_export_nid2str(exp), fed->fed_group,data->ocd_group); RETURN(-EPROTO); } @@ -2650,8 +2766,10 @@ static int filter_connect_internal(struct obd_export *exp, data->ocd_version = LUSTRE_VERSION_CODE; /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */ - if (!ergo(data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN, - data->ocd_connect_flags & OBD_CONNECT_MDS)) + if (data->ocd_connect_flags & OBD_CONNECT_MDS) + CDEBUG(D_HA, "%s: Received MDS connection for group %u\n", + exp->exp_obd->obd_name, data->ocd_group); + else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) RETURN(-EPROTO); if (exp->exp_connect_flags & OBD_CONNECT_GRANT) { @@ -2705,7 +2823,18 @@ static int filter_connect_internal(struct obd_export *exp, } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { data->ocd_brw_size = min(data->ocd_brw_size, (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); - LASSERT(data->ocd_brw_size); + if (data->ocd_brw_size == 0) { + CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 + " ocd_version: %x ocd_grant: %d ocd_index: %u " + "ocd_brw_size is unexpectedly zero, " + "network data corruption?" + "Refusing connection of this client\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, + exp, data->ocd_connect_flags, data->ocd_version, + data->ocd_grant, data->ocd_index); + RETURN(-EPROTO); + } } if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) { @@ -2714,10 +2843,9 @@ static int filter_connect_internal(struct obd_export *exp, /* The client set in ocd_cksum_types the checksum types it * supports. We have to mask off the algorithms that we don't * support */ - if (cksum_types & OBD_CKSUM_ALL) - data->ocd_cksum_types &= OBD_CKSUM_ALL; - else - data->ocd_cksum_types = OBD_CKSUM_CRC32; + data->ocd_cksum_types &= cksum_types_supported_server(); + + /* 1.6.4 clients are not supported any more */ CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " "%x\n", exp->exp_obd->obd_name, @@ -2732,6 +2860,9 @@ static int filter_connect_internal(struct obd_export *exp, obd_export_nid2str(exp)); } + if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES) + data->ocd_maxbytes = exp->exp_obd->u.obt.obt_sb->s_maxbytes; + RETURN(0); } @@ -2749,12 +2880,11 @@ static int filter_reconnect(const struct lu_env *env, rc = filter_connect_internal(exp, data, 1); if (rc == 0) - filter_export_stats_init(obd, exp, 1, localdata); + filter_export_stats_init(obd, exp, localdata); RETURN(rc); } -/* nearly identical to mds_connect */ static int filter_connect(const struct lu_env *env, struct obd_export **exp, struct obd_device *obd, struct obd_uuid *cluuid, @@ -2763,7 +2893,6 @@ static int filter_connect(const struct lu_env *env, struct lvfs_run_ctxt saved; struct lustre_handle conn = { 0 }; struct obd_export *lexp; - __u32 group; int rc; ENTRY; @@ -2780,7 +2909,7 @@ static int filter_connect(const struct lu_env *env, if (rc) GOTO(cleanup, rc); - filter_export_stats_init(obd, lexp, 0, localdata); + filter_export_stats_init(obd, lexp, localdata); if (obd->obd_replayable) { struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; LASSERT(lcd); @@ -2790,16 +2919,11 @@ static int filter_connect(const struct lu_env *env, GOTO(cleanup, rc); } - group = data->ocd_group; - - CWARN("%s: Received MDS connection ("LPX64"); group %d\n", - obd->obd_name, lexp->exp_handle.h_cookie, group); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = filter_read_groups(obd, group, 1); + rc = filter_read_groups(obd, data->ocd_group, 1); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (rc != 0) { - CERROR("can't read group %u\n", group); + CERROR("can't read group %u\n", data->ocd_group); GOTO(cleanup, rc); } @@ -2937,12 +3061,14 @@ static int filter_destroy_export(struct obd_export *exp) lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); target_destroy_export(exp); + + if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, + &exp->exp_client_uuid))) + RETURN(0); + ldlm_destroy_export(exp); lut_client_free(exp); - if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) - RETURN(0); - if (!exp->exp_obd->obd_replayable) fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb); @@ -3004,7 +3130,7 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp) ctxt = llog_group_get_ctxt(olg_min, LLOG_MDS_OST_REPL_CTXT); if (ctxt) { - err = llog_sync(ctxt, olg_min->olg_exp); + err = llog_sync(ctxt, olg_min->olg_exp, 0); llog_ctxt_put(ctxt); if (err) { CERROR("error flushing logs to MDS: " @@ -3060,7 +3186,7 @@ static void filter_revimp_update(struct obd_export *exp) EXIT; } -static int filter_ping(struct obd_export *exp) +static int filter_ping(const struct lu_env *env, struct obd_export *exp) { filter_fmd_expire(exp); return 0; @@ -3087,13 +3213,31 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct ost_id *ostid, RETURN(ERR_PTR(-ENOENT)); } +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) + /* Try to correct for a bug in 2.1.0 (LU-221) that caused negative + * timestamps to appear to be in the far future, due old timestamp + * being stored on disk as an unsigned value. This fixes up any + * bad values stored on disk before returning them to the client, + * and ensures any timestamp updates are correct. LU-1042 */ + if (unlikely(LTIME_S(dchild->d_inode->i_atime) == LU221_BAD_TIME)) + LTIME_S(dchild->d_inode->i_atime) = 0; + if (unlikely(LTIME_S(dchild->d_inode->i_mtime) == LU221_BAD_TIME)) + LTIME_S(dchild->d_inode->i_mtime) = 0; + if (unlikely(LTIME_S(dchild->d_inode->i_ctime) == LU221_BAD_TIME)) + LTIME_S(dchild->d_inode->i_ctime) = 0; +#else +#warning "remove old LU-221/LU-1042 workaround code" +#endif + return dchild; } -static int filter_getattr(struct obd_export *exp, struct obd_info *oinfo) +static int filter_getattr(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo) { struct dentry *dentry = NULL; struct obd_device *obd; + __u64 curr_version; int rc = 0; ENTRY; @@ -3114,7 +3258,14 @@ static int filter_getattr(struct obd_export *exp, struct obd_info *oinfo) /* Limit the valid bits in the return data to what we actually use */ oinfo->oi_oa->o_valid = OBD_MD_FLID; - obdo_from_inode(oinfo->oi_oa, dentry->d_inode, NULL, FILTER_VALID_FLAGS); + obdo_from_inode(oinfo->oi_oa, dentry->d_inode, FILTER_VALID_FLAGS); + + /* Store inode version in reply */ + curr_version = fsfilt_get_version(exp->exp_obd, dentry->d_inode); + if ((__s64)curr_version != -EOPNOTSUPP) { + oinfo->oi_oa->o_valid |= OBD_MD_FLDATAVERSION; + oinfo->oi_oa->o_data_version = curr_version; + } f_dput(dentry); RETURN(rc); @@ -3190,18 +3341,17 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, if (fcc != NULL) *fcc = oa->o_lcookie; } - if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) { - unsigned long now = jiffies; - ll_vfs_dq_init(inode); - /* Filter truncates and writes are serialized by - * i_alloc_sem, see the comment in - * filter_preprw_write.*/ - if (ia_valid & ATTR_SIZE) - down_write(&inode->i_alloc_sem); - LOCK_INODE_MUTEX(inode); - fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem and i_mutex"); - old_size = i_size_read(inode); - } + if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) { + unsigned long now = jiffies; + /* Filter truncates and writes are serialized by + * i_alloc_sem, see the comment in + * filter_preprw_write.*/ + if (ia_valid & ATTR_SIZE) + down_write(&inode->i_alloc_sem); + mutex_lock(&inode->i_mutex); + fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem and i_mutex"); + old_size = i_size_read(inode); + } /* VBR: version recovery check */ rc = filter_version_get_check(exp, oti, inode); @@ -3262,6 +3412,11 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, if (IS_ERR(handle)) GOTO(out_unlock, rc = PTR_ERR(handle)); } + + /* Locking order: i_mutex -> journal_lock -> dqptr_sem. LU-952 */ + if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) + ll_vfs_dq_init(inode); + if (oa->o_valid & OBD_MD_FLFLAGS) { rc = fsfilt_iocontrol(exp->exp_obd, dentry, FSFILT_IOC_SETFLAGS, (long)&oa->o_flags); @@ -3315,12 +3470,12 @@ out_unlock: if (page) page_cache_release(page); - if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) - UNLOCK_INODE_MUTEX(inode); - if (ia_valid & ATTR_SIZE) - up_write(&inode->i_alloc_sem); - if (fcc) - OBD_FREE(fcc, sizeof(*fcc)); + if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) + mutex_unlock(&inode->i_mutex); + if (ia_valid & ATTR_SIZE) + up_write(&inode->i_alloc_sem); + if (fcc) + OBD_FREE(fcc, sizeof(*fcc)); /* trigger quota release */ if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) { @@ -3335,8 +3490,8 @@ out_unlock: } /* this is called from filter_truncate() until we have filter_punch() */ -int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti) +int filter_setattr(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti) { struct obdo *oa = oinfo->oi_oa; struct lustre_capa *capa = oinfo_capa(oinfo); @@ -3350,7 +3505,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, int rc; ENTRY; - if (oa->o_valid & OBD_FL_TRUNC) + if (oinfo->oi_flags & OBD_FL_PUNCH) opc |= CAPA_OPC_OSS_TRUNC; rc = filter_auth_capa(exp, NULL, oa->o_seq, capa, opc); @@ -3427,9 +3582,11 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, oa->o_valid = OBD_MD_FLID; /* Quota release need uid/gid info */ - obdo_from_inode(oa, dentry->d_inode, NULL, + obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID); + filter_counter_incr(exp, LPROC_FILTER_STATS_SETATTR, + oti ? oti->oti_jobid : NULL, 1); EXIT; out_unlock: f_dput(dentry); @@ -3488,7 +3645,7 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, LASSERT((*lsmp)->lsm_object_id); } - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + (*lsmp)->lsm_maxbytes = exp->exp_obd->u.obt.obt_sb->s_maxbytes; RETURN(lsm_size); } @@ -3503,7 +3660,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, int skip_orphan; ENTRY; - LASSERT(down_trylock(&filter->fo_create_locks[oa->o_seq]) != 0); + LASSERT_MUTEX_LOCKED(&filter->fo_create_locks[oa->o_seq]); memset(&doa, 0, sizeof(doa)); @@ -3527,7 +3684,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, for (id = last; id > oa->o_id; id--) { doa.o_id = id; - rc = filter_destroy(exp, &doa, NULL, NULL, NULL, NULL); + rc = filter_destroy(NULL, exp, &doa, NULL, NULL, NULL, NULL); if (rc && rc != -ENOENT) /* this is pretty fatal... */ CEMERG("error destroying precreate objid "LPU64": %d\n", id, rc); @@ -3585,18 +3742,18 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, } /* This causes inflight precreates to abort and drop lock */ cfs_set_bit(group, &filter->fo_destroys_in_progress); - cfs_down(&filter->fo_create_locks[group]); + cfs_mutex_lock(&filter->fo_create_locks[group]); if (!cfs_test_bit(group, &filter->fo_destroys_in_progress)) { CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", exp->exp_obd->obd_name, group); - cfs_up(&filter->fo_create_locks[group]); + cfs_mutex_unlock(&filter->fo_create_locks[group]); RETURN(0); } diff = oa->o_id - last; CDEBUG(D_HA, "filter_last_id() = "LPU64" -> diff = %d\n", last, diff); - if (-diff > OST_MAX_PRECREATE) { + if (-diff > (OST_MAX_PRECREATE * 3) / 2) { CERROR("%s: ignoring bogus orphan destroy request: " "obdid "LPU64" last_id "LPU64"\n", obd->obd_name, oa->o_id, last); @@ -3614,7 +3771,7 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, cfs_clear_bit(group, &filter->fo_destroys_in_progress); } } else { - cfs_down(&filter->fo_create_locks[group]); + cfs_mutex_lock(&filter->fo_create_locks[group]); if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old precreate request\n", obd->obd_name); @@ -3628,8 +3785,15 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, CDEBUG(D_RPCTRACE, "filter_last_id() = "LPU64" -> diff = %d\n", filter_last_id(filter, group), diff); - LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name, - oa->o_id, filter_last_id(filter, group), diff); + /* + * Check obd->obd_recovering to handle the race condition + * while recreating missing precreated objects through + * filter_preprw_write() and mds_lov_clear_orphans() + * at the same time. + */ + LASSERTF(ergo(!obd->obd_recovering, diff >= 0), + "%s: "LPU64" - "LPU64" = %d\n", obd->obd_name, + oa->o_id, filter_last_id(filter, group), diff); } if (diff > 0) { @@ -3637,21 +3801,23 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, rc = filter_precreate(obd, oa, group, &diff); oa->o_id = filter_last_id(&obd->u.filter, group); oa->o_seq = group; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + oa->o_valid |= (OBD_MD_FLID | OBD_MD_FLGROUP); GOTO(out, rc); } /* else diff == 0 */ GOTO(out, rc = 0); out: - cfs_up(&filter->fo_create_locks[group]); + cfs_mutex_unlock(&filter->fo_create_locks[group]); return rc; } -static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age, __u32 flags) +static int filter_statfs(const struct lu_env *env, struct obd_export *exp, + struct obd_statfs *osfs, __u64 max_age, __u32 flags) { + struct obd_device *obd = class_exp2obd(exp); struct filter_obd *filter = &obd->u.filter; int blockbits = obd->u.obt.obt_sb->s_blocksize_bits; + struct lr_server_data *lsd = class_server_data(obd); int rc; ENTRY; @@ -3675,16 +3841,13 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, ((filter->fo_tot_dirty + filter->fo_tot_pending + osfs->os_bsize - 1) >> blockbits)); - if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) { - struct lr_server_data *lsd = class_server_data(obd); - int index = le32_to_cpu(lsd->lsd_ost_index); + if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC, + le32_to_cpu(lsd->lsd_ost_index))) + osfs->os_bfree = osfs->os_bavail = 2; - if (obd_fail_val == -1 || - index == obd_fail_val) - osfs->os_bfree = osfs->os_bavail = 2; - else if (obd_fail_loc & OBD_FAIL_ONCE) - obd_fail_loc &= ~OBD_FAILED; /* reset flag */ - } + if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOINO, + le32_to_cpu(lsd->lsd_ost_index))) + osfs->os_ffree = 0; /* set EROFS to state field if FS is mounted as RDONLY. The goal is to * stop creating files on MDS if OST is not good shape to create @@ -3754,6 +3917,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, struct dentry *dchild = NULL, *dparent = NULL; struct filter_obd *filter; struct obd_statfs *osfs; + struct iattr iattr; int err = 0, rc = 0, recreate_obj = 0, i; cfs_time_t enough_time = cfs_time_shift(DISK_TIMEOUT/2); __u64 os_ffree; @@ -3763,7 +3927,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, filter = &obd->u.filter; - LASSERT(down_trylock(&filter->fo_create_locks[group]) != 0); + LASSERT_MUTEX_LOCKED(&filter->fo_create_locks[group]); OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_PRECREATE, obd_timeout / 2); @@ -3774,7 +3938,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, OBD_ALLOC(osfs, sizeof(*osfs)); if (osfs == NULL) RETURN(-ENOMEM); - rc = filter_statfs(obd, osfs, + rc = filter_statfs(NULL, obd->obd_self_export, osfs, cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), 0); if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { @@ -3782,6 +3946,13 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, LPU64"\n", obd->obd_name, osfs->os_bavail << obd->u.obt.obt_vfsmnt->mnt_sb->s_blocksize_bits); *num = 0; + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NOSPC_BLK; + else { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags = OBD_FL_NOSPC_BLK; + } + rc = -ENOSPC; } OBD_FREE(osfs, sizeof(*osfs)); @@ -3849,7 +4020,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, } else { /* Use these existing objects if they are * zero length. */ - if (dchild->d_inode->i_size == 0) { + if (i_size_read(dchild->d_inode) == 0) { rc = filter_use_existing_obj(obd,dchild, &handle, &cleanup_phase); if (rc == 0) @@ -3883,12 +4054,24 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG | S_ISUID | S_ISGID | 0666, NULL); if (rc) { - CERROR("create failed rc = %d\n", rc); + CWARN("%s: create failed: rc = %d\n", obd->obd_name,rc); if (rc == -ENOSPC) { os_ffree = filter_calc_free_inodes(obd); - if (os_ffree != -1) - CERROR("%s: free inode "LPU64"\n", - obd->obd_name, os_ffree); + if (os_ffree == -1) + GOTO(cleanup, rc); + + if (obd->obd_osfs.os_bavail < + (obd->obd_osfs.os_blocks >> 10)) { + if (oa->o_valid & OBD_MD_FLFLAGS) { + oa->o_flags |= OBD_FL_NOSPC_BLK; + } else { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags = OBD_FL_NOSPC_BLK; + } + + CWARN("%s: free inode "LPU64"\n", + obd->obd_name, os_ffree); + } } GOTO(cleanup, rc); } @@ -3898,12 +4081,27 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, dchild->d_inode->i_ino); set_last_id: + /* Initialize a/c/m time so any client timestamp will always + * be newer and update the inode. ctime = 0 is also handled + * specially in fsfilt_ext3_setattr(). See LU-221, LU-1042 */ + iattr.ia_valid = ATTR_ATIME | ATTR_MTIME | ATTR_CTIME; + LTIME_S(iattr.ia_atime) = 0; + LTIME_S(iattr.ia_mtime) = 0; + LTIME_S(iattr.ia_ctime) = 0; + err = fsfilt_setattr(obd, dchild, handle, &iattr, 0); + if (err) + CWARN("%s: unable to initialize a/c/m time of newly " + "created object %.*s: rc = %d\n", + obd->obd_name, dchild->d_name.len, + dchild->d_name.name, err); + if (!recreate_obj) { filter_set_last_id(filter, next_id, group); err = filter_update_last_objid(obd, group, 0); if (err) - CERROR("unable to write lastobjid " - "but file created\n"); + CERROR("%s: unable to write lastobjid " + "but file created: rc = %d\n", + obd->obd_name, err); } cleanup: @@ -3942,8 +4140,9 @@ set_last_id: RETURN(rc); } -int filter_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +int filter_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md **ea, + struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; struct filter_export_data *fed; @@ -3959,7 +4158,17 @@ int filter_create(struct obd_export *exp, struct obdo *oa, fed = &exp->exp_filter_data; filter = &obd->u.filter; - if (fed->fed_group != oa->o_seq) { + /* 1.8 client doesn't carry the ocd_group with connect request, + * so the fed_group will always be zero for 1.8 client. */ + if (!(exp->exp_connect_flags & OBD_CONNECT_FULL20)) { + if (oa->o_seq != FID_SEQ_OST_MDT0 && + oa->o_seq != FID_SEQ_LLOG && + oa->o_seq != FID_SEQ_ECHO) { + CERROR("The request from older client has invalid" + " group "LPU64"!\n", oa->o_seq); + RETURN(-EINVAL); + } + } else if (fed->fed_group != oa->o_seq) { CERROR("%s: this export (nid %s) used object group %d " "earlier; now it's trying to use group "LPU64"!" " This could be a bug in the MDS. Please report to " @@ -3989,9 +4198,9 @@ int filter_create(struct obd_export *exp, struct obdo *oa, rc = -EINVAL; } else { diff = 1; - cfs_down(&filter->fo_create_locks[oa->o_seq]); + cfs_mutex_lock(&filter->fo_create_locks[oa->o_seq]); rc = filter_precreate(obd, oa, oa->o_seq, &diff); - cfs_up(&filter->fo_create_locks[oa->o_seq]); + cfs_mutex_unlock(&filter->fo_create_locks[oa->o_seq]); } } else { rc = filter_handle_precreate(exp, oa, oa->o_seq, oti); @@ -4012,9 +4221,10 @@ int filter_create(struct obd_export *exp, struct obdo *oa, RETURN(rc); } -int filter_destroy(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, struct obd_trans_info *oti, - struct obd_export *md_exp, void *capa) +int filter_destroy(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md *md, + struct obd_trans_info *oti, struct obd_export *md_exp, + void *capa) { unsigned int qcids[MAXQUOTAS] = {0, 0}; struct obd_device *obd; @@ -4064,7 +4274,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, } fcc = &oa->o_lcookie; ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1); - llog_cancel(ctxt, NULL, 1, fcc, 0); + llog_cancel(NULL, ctxt, NULL, 1, fcc, 0); llog_ctxt_put(ctxt); fcc = NULL; /* we didn't allocate fcc, don't free it */ } @@ -4081,7 +4291,6 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, if (fcc != NULL) *fcc = oa->o_lcookie; } - ll_vfs_dq_init(dchild->d_inode); /* we're gonna truncate it first in order to avoid possible deadlock: * P1 P2 @@ -4095,37 +4304,40 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, * between page lock, i_mutex & starting new journal handle. * (see bug 20321) -johann */ - now = jiffies; - down_write(&dchild->d_inode->i_alloc_sem); - LOCK_INODE_MUTEX(dchild->d_inode); - fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem and i_mutex"); - - /* VBR: version recovery check */ - rc = filter_version_get_check(exp, oti, dchild->d_inode); - if (rc) { - UNLOCK_INODE_MUTEX(dchild->d_inode); - up_write(&dchild->d_inode->i_alloc_sem); - GOTO(cleanup, rc); - } - - handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR, - NULL, 1); - if (IS_ERR(handle)) { - UNLOCK_INODE_MUTEX(dchild->d_inode); - up_write(&dchild->d_inode->i_alloc_sem); - GOTO(cleanup, rc = PTR_ERR(handle)); - } - - iattr.ia_valid = ATTR_SIZE; - iattr.ia_size = 0; - rc = fsfilt_setattr(obd, dchild, handle, &iattr, 1); - rc2 = fsfilt_commit(obd, dchild->d_inode, handle, 0); - UNLOCK_INODE_MUTEX(dchild->d_inode); - up_write(&dchild->d_inode->i_alloc_sem); - if (rc) - GOTO(cleanup, rc); - if (rc2) - GOTO(cleanup, rc = rc2); + now = jiffies; + down_write(&dchild->d_inode->i_alloc_sem); + mutex_lock(&dchild->d_inode->i_mutex); + fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem and i_mutex"); + + /* VBR: version recovery check */ + rc = filter_version_get_check(exp, oti, dchild->d_inode); + if (rc) { + mutex_unlock(&dchild->d_inode->i_mutex); + up_write(&dchild->d_inode->i_alloc_sem); + GOTO(cleanup, rc); + } + + handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR, + NULL, 1); + if (IS_ERR(handle)) { + mutex_unlock(&dchild->d_inode->i_mutex); + up_write(&dchild->d_inode->i_alloc_sem); + GOTO(cleanup, rc = PTR_ERR(handle)); + } + + /* Locking order: i_mutex -> journal_lock -> dqptr_sem. LU-952 */ + ll_vfs_dq_init(dchild->d_inode); + + iattr.ia_valid = ATTR_SIZE; + iattr.ia_size = 0; + rc = fsfilt_setattr(obd, dchild, handle, &iattr, 1); + rc2 = fsfilt_commit(obd, dchild->d_inode, handle, 0); + mutex_unlock(&dchild->d_inode->i_mutex); + up_write(&dchild->d_inode->i_alloc_sem); + if (rc) + GOTO(cleanup, rc); + if (rc2) + GOTO(cleanup, rc = rc2); /* We don't actually need to lock the parent until we are unlinking * here, and not while truncating above. That avoids holding the @@ -4136,16 +4348,17 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, GOTO(cleanup, rc = PTR_ERR(dparent)); cleanup_phase = 3; /* filter_parent_unlock */ - LOCK_INODE_MUTEX(dchild->d_inode); - handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1); - if (IS_ERR(handle)) { - UNLOCK_INODE_MUTEX(dchild->d_inode); - GOTO(cleanup, rc = PTR_ERR(handle)); - } - cleanup_phase = 4; /* fsfilt_commit */ + mutex_lock(&dchild->d_inode->i_mutex); + handle = fsfilt_start_log(obd, dparent->d_inode, + FSFILT_OP_UNLINK, oti, 1); + if (IS_ERR(handle)) { + mutex_unlock(&dchild->d_inode->i_mutex); + GOTO(cleanup, rc = PTR_ERR(handle)); + } + cleanup_phase = 4; /* fsfilt_commit */ /* Quota release need uid/gid of inode */ - obdo_from_inode(oa, dchild->d_inode, NULL, OBD_MD_FLUID|OBD_MD_FLGID); + obdo_from_inode(oa, dchild->d_inode, OBD_MD_FLUID | OBD_MD_FLGID); filter_fmd_drop(exp, oa->o_id, oa->o_seq); @@ -4206,8 +4419,8 @@ cleanup: } /* NB start and end are used for punch, but not truncate */ -static int filter_truncate(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, +static int filter_truncate(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti, struct ptlrpc_request_set *rqset) { int rc; @@ -4224,15 +4437,13 @@ static int filter_truncate(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_policy.l_extent.start); oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start; - oinfo->oi_oa->o_valid |= OBD_FL_TRUNC; - rc = filter_setattr(exp, oinfo, oti); - oinfo->oi_oa->o_valid &= ~OBD_FL_TRUNC; + rc = filter_setattr(env, exp, oinfo, oti); RETURN(rc); } -static int filter_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_off start, obd_off end, - void *capa) +static int filter_sync(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, obd_off start, obd_off end, + struct ptlrpc_request_set *set) { struct lvfs_run_ctxt saved; struct obd_device_target *obt; @@ -4240,54 +4451,56 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, int rc, rc2; ENTRY; - rc = filter_auth_capa(exp, NULL, oa->o_seq, - (struct lustre_capa *)capa, CAPA_OPC_OSS_WRITE); + rc = filter_auth_capa(exp, NULL, oinfo->oi_oa->o_seq, + (struct lustre_capa *)oinfo->oi_capa, + CAPA_OPC_OSS_WRITE); if (rc) RETURN(rc); obt = &exp->exp_obd->u.obt; /* An objid of zero is taken to mean "sync whole filesystem" */ - if (!oa || !(oa->o_valid & OBD_MD_FLID)) { + if (!oinfo->oi_oa || !(oinfo->oi_oa->o_valid & OBD_MD_FLID)) { rc = fsfilt_sync(exp->exp_obd, obt->obt_sb); /* Flush any remaining cancel messages out to the target */ filter_sync_llogs(exp->exp_obd, exp); RETURN(rc); } - dentry = filter_oa2dentry(exp->exp_obd, &oa->o_oi); + dentry = filter_oa2dentry(exp->exp_obd, &oinfo->oi_oa->o_oi); if (IS_ERR(dentry)) RETURN(PTR_ERR(dentry)); push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - LOCK_INODE_MUTEX(dentry->d_inode); + mutex_lock(&dentry->d_inode->i_mutex); - rc = filemap_fdatawrite(dentry->d_inode->i_mapping); - if (rc == 0) { - /* just any file to grab fsync method - "file" arg unused */ - struct file *file = obt->obt_rcvd_filp; + rc = filemap_fdatawrite(dentry->d_inode->i_mapping); + if (rc == 0) { + /* just any file to grab fsync method - "file" arg unused */ + struct file *file = obt->obt_rcvd_filp; - if (file->f_op && file->f_op->fsync) - rc = file->f_op->fsync(NULL, dentry, 1); + if (file->f_op && file->f_op->fsync) + rc = file->f_op->fsync(NULL, dentry, 1); - rc2 = filemap_fdatawait(dentry->d_inode->i_mapping); - if (!rc) - rc = rc2; - } - UNLOCK_INODE_MUTEX(dentry->d_inode); + rc2 = filemap_fdatawait(dentry->d_inode->i_mapping); + if (!rc) + rc = rc2; + } + mutex_unlock(&dentry->d_inode->i_mutex); - oa->o_valid = OBD_MD_FLID; - obdo_from_inode(oa, dentry->d_inode, NULL, FILTER_VALID_FLAGS); + oinfo->oi_oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oinfo->oi_oa, dentry->d_inode, FILTER_VALID_FLAGS); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + filter_counter_incr(exp, LPROC_FILTER_STATS_SYNC, oinfo->oi_jobid, 1); f_dput(dentry); RETURN(rc); } -static int filter_get_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 *vallen, void *val, +static int filter_get_info(const struct lu_env *env, struct obd_export *exp, + __u32 keylen, void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) { struct obd_device *obd; @@ -4443,7 +4656,8 @@ out: RETURN(rc); } -static int filter_set_info_async(struct obd_export *exp, __u32 keylen, +static int filter_set_info_async(const struct lu_env *env, + struct obd_export *exp, __u32 keylen, void *key, __u32 vallen, void *val, struct ptlrpc_request_set *set) { @@ -4519,8 +4733,8 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp, CDEBUG(D_HA, "syncing ost %s\n", obd->obd_name); rc = fsfilt_sync(obd, obd->u.obt.obt_sb); - lvfs_set_rdonly(obd, obd->u.obt.obt_sb); - RETURN(0); + rc = lvfs_set_rdonly(obd, obd->u.obt.obt_sb); + RETURN(rc); } case OBD_IOC_CATLOGLIST: { @@ -4552,7 +4766,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(0); } -static int filter_health_check(struct obd_device *obd) +static int filter_health_check(const struct lu_env *env, struct obd_device *obd) { #ifdef USE_HEALTH_CHECK_WRITE struct filter_obd *filter = &obd->u.filter; @@ -4600,6 +4814,24 @@ static int filter_process_config(struct obd_device *obd, obd_count len, return rc; } +static int filter_notify(struct obd_device *obd, + struct obd_device *unused, + enum obd_notify_event ev, void *data) +{ + switch (ev) { + case OBD_NOTIFY_CONFIG: + LASSERT(obd->obd_no_conn); + cfs_spin_lock(&obd->obd_dev_lock); + obd->obd_no_conn = 0; + cfs_spin_unlock(&obd->obd_dev_lock); + break; + default: + CDEBUG(D_INFO, "%s: Unhandled notification %#x\n", + obd->obd_name, ev); + } + return 0; +} + static struct lvfs_callback_ops filter_lvfs_ops = { l_fid2dentry: filter_lvfs_fid2dentry, }; @@ -4623,7 +4855,6 @@ static struct obd_ops filter_obd_ops = { .o_create = filter_create, .o_setattr = filter_setattr, .o_destroy = filter_destroy, - .o_brw = filter_brw, .o_punch = filter_truncate, .o_sync = filter_sync, .o_preprw = filter_preprw, @@ -4634,6 +4865,7 @@ static struct obd_ops filter_obd_ops = { .o_iocontrol = filter_iocontrol, .o_health_check = filter_health_check, .o_process_config = filter_process_config, + .o_notify = filter_notify, }; quota_interface_t *filter_quota_interface_ref;