struct ldlm_valblock_ops {
int (*lvbo_init)(struct ldlm_resource *res);
- int (*lvbo_update)(struct ldlm_resource *res, struct lustre_msg *m,
+
+ int (*lvbo_update)(struct ldlm_resource *res,
+ struct lustre_msg *m,
int buf_idx, int increase);
};
struct osc_creator {
spinlock_t oscc_lock;
- struct list_head oscc_list;
- struct obd_device *oscc_obd;
- obd_id oscc_last_id;//last available pre-created object
- obd_id oscc_next_id;// what object id to give out next
- obd_id oscc_gr;
- int oscc_grow_count;
- int oscc_max_grow_count;
- int oscc_kick_barrier;
- struct osc_created *oscc_osccd;
- struct obdo oscc_oa;
+ struct obd_device *oscc_obd;
int oscc_flags;
- wait_queue_head_t oscc_waitq; /* creating procs wait on this */
};
struct ldlm_export_data {
return obd->obd_fsops->fs_clear_fs_flags(inode, flags);
return 0;
}
+
static inline int
fsfilt_precreate_rec(struct obd_device *obd, struct dentry *dentry,
int *num, struct obdo *oa)
#define OBD_BRW_FROM_GRANT 0x20 /* the osc manages this under llite */
#define OBD_BRW_GRANTED 0x40 /* the ost manages this */
-#define OBD_OBJECT_EOF 0xffffffffffffffffULL
-
-#define OST_MIN_PRECREATE 32
-#define OST_MAX_PRECREATE 20000
+#define OBD_OBJECT_EOF 0xffffffffffffffffULL
struct obd_ioobj {
obd_id ioo_id;
/* SMFS external flags and methods */
#define SM_ALL_PLG 0x80L
-#define SM_PRECREATE 0x100L
#define SM_DO_REC 0x1
#define SM_INIT_REC 0x2
struct obd_export *mds_dt_exp;
int mds_has_dt_desc;
struct lov_desc mds_dt_desc;
+
+ spinlock_t mds_dt_lock;
obd_id *mds_dt_objids;
- int mds_dt_objids_valid;
- int mds_dt_nextid_set;
struct file *mds_dt_objid_filp;
- spinlock_t mds_dt_lock;
+ int mds_dt_objids_valid;
+
unsigned long *mds_client_bitmap;
struct semaphore mds_orphan_recovery_sem;
/* which secure flavor from remote to this mds is denied */
spinlock_t mds_denylist_lock;
struct list_head mds_denylist;
+ struct semaphore mds_create_sem;
};
struct echo_obd {
int rc;
};
+#define OBD_MODE_ASYNC (1 << 0)
+#define OBD_MODE_CROW (1 << 1)
+
/* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */
#define N_LOCAL_TEMP_PAGE 0x10000000
+typedef int (*obd_obj_alloc_func_t)(obd_id *objid);
+
struct obd_trans_info {
__u64 oti_transno;
__u64 *oti_objid;
- /* Only used on the server side for tracking acks. */
+
+ /* only used on the server side for tracking acks. */
struct oti_req_ack_lock {
struct lustre_handle lock;
__u32 mode;
struct llog_cookie oti_onecookie;
struct llog_cookie *oti_logcookies;
int oti_numcookies;
- int oti_async;
+ int oti_flags;
+ obd_obj_alloc_func_t oti_obj_alloc;
};
static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
LASSERT(list_empty(&lock->l_res_link));
lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
- lock->l_resource->lr_type, 1);
+ lock->l_resource->lr_type,
+ 1);
if (lock->l_resource == NULL) {
LBUG();
RETURN(-ENOMEM);
parent_res = parent_lock->l_resource;
}
- res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
+ res = ldlm_resource_get(ns, parent_res, res_id,
+ type, 1);
if (res == NULL)
RETURN(NULL);
}
/* non-blocking function to manipulate a lock whose cb_data is being put away.*/
-void ldlm_change_cbdata(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
- ldlm_iterator_t iter, void *data)
+void ldlm_change_cbdata(struct ldlm_namespace *ns,
+ struct ldlm_res_id *res_id,
+ ldlm_iterator_t iter,
+ void *data)
{
struct ldlm_resource *res;
ENTRY;
ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
struct ldlm_res_id name, __u32 type, int create)
{
- struct list_head *bucket, *tmp;
struct ldlm_resource *res = NULL;
+ struct list_head *bucket, *tmp;
ENTRY;
LASSERT(ns != NULL);
rc = ns->ns_lvbo->lvbo_init(res);
up(&res->lr_lvb_sem);
if (rc)
- CERROR("lvbo_init failed for resource "LPU64": rc %d\n",
- name.name[0], rc);
+ CERROR("lvbo_init failed for resource "
+ LPU64": rc %d\n", name.name[0], rc);
} else {
out:
l_unlock(&ns->ns_lock);
return retval;
}
-static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
- unsigned long arg)
-{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct obd_export *exp = ll_i2dtexp(inode);
- struct ll_recreate_obj ucreatp;
- struct obd_trans_info oti = { 0 };
- struct obdo *oa = NULL;
- int lsm_size;
- int rc = 0;
- struct lov_stripe_md *lsm, *lsm2;
- ENTRY;
-
- if (!capable (CAP_SYS_ADMIN))
- RETURN(-EPERM);
-
- rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
- sizeof(struct ll_recreate_obj));
- if (rc) {
- RETURN(-EFAULT);
- }
- oa = obdo_alloc();
- if (oa == NULL)
- RETURN(-ENOMEM);
-
- down(&lli->lli_open_sem);
- lsm = lli->lli_smd;
- if (lsm == NULL)
- GOTO(out, rc = -ENOENT);
- lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
- (lsm->lsm_stripe_count));
-
- OBD_ALLOC(lsm2, lsm_size);
- if (lsm2 == NULL)
- GOTO(out, rc = -ENOMEM);
-
- oa->o_id = ucreatp.lrc_id;
- oa->o_nlink = ucreatp.lrc_ost_idx;
- oa->o_gr = ucreatp.lrc_group;
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
- oa->o_flags |= OBD_FL_RECREATE_OBJS;
- obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
- OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-
- oti.oti_objid = NULL;
- memcpy(lsm2, lsm, lsm_size);
- rc = obd_create(exp, oa, NULL, 0, &lsm2, &oti);
-
- OBD_FREE(lsm2, lsm_size);
- GOTO(out, rc);
-out:
- up(&lli->lli_open_sem);
- obdo_free(oa);
- return rc;
-}
-
static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
int flags, struct lov_user_md *lum,
int lum_size)
}
static int ll_lov_setea(struct inode *inode, struct file *file,
- unsigned long arg)
+ unsigned long arg)
{
int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
struct lov_user_md *lump;
int lum_size = sizeof(struct lov_user_md) +
- sizeof(struct lov_user_ost_data);
+ sizeof(struct lov_user_ost_data);
int rc;
ENTRY;
}
case LL_IOC_LOV_GETSTRIPE:
RETURN(ll_lov_getstripe(inode, arg));
- case LL_IOC_RECREATE_OBJ:
- RETURN(ll_lov_recreate_obj(inode, file, arg));
case EXT3_IOC_GETFLAGS:
case EXT3_IOC_SETFLAGS:
RETURN( ll_iocontrol(inode, file, cmd, arg) );
oa->o_id = lsm->lsm_object_id;
oa->o_gr = lsm->lsm_object_gr;
- oa->o_valid = OBD_MD_FLID;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
OBD_MD_FLMTIME | OBD_MD_FLCTIME |
OBD_MD_FLGROUP));
data->mod_time = LTIME_S(CURRENT_TIME);
}
-#if 0
-/*
- * this was needed for catching correct calling place of ll_intent_alloc() with
- * missed ll_intent_free() causing memory leak. --umka
- */
-#define ll_intent_alloc(it) \
- ({ \
- int err; \
- OBD_SLAB_ALLOC((it)->d.fs_data, ll_intent_slab, SLAB_KERNEL, \
- sizeof(struct lustre_intent_data)); \
- if (!(it)->d.fs_data) { \
- err = -ENOMEM; \
- } else { \
- err = 0; \
- } \
- (it)->it_op_release = ll_intent_release; \
- err; \
- })
-
-#define ll_intent_free(it) \
- do { \
- if ((it)->d.fs_data) { \
- OBD_SLAB_FREE((it)->d.fs_data, ll_intent_slab, \
- sizeof(struct lustre_intent_data)); \
- (it)->d.fs_data = NULL; \
- } \
- } while (0)
-#endif
-
#endif /* LLITE_INTERNAL_H */
struct obd_device *obd;
struct obd_statfs osfs;
struct lustre_md md;
- kdev_t devno;
- int err;
__u32 valsize;
+ int err;
ENTRY;
obd = class_name2obd(lmv);
sb->s_blocksize = osfs.os_bsize;
sb->s_blocksize_bits = log2(osfs.os_bsize);
sb->s_maxbytes = PAGE_CACHE_MAXBYTES;
-
- devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid,
- strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid));
- sb->s_dev = devno;
+ /* in 2.6.x FS is not allowed to form s_dev */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ {
+ kdev_t devno;
+
+ devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid,
+ strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid));
+
+ sb->s_dev = devno;
+ }
+#endif
/* after statfs, we are supposed to have connected to MDSs,
* so it's ok to check remote flag returned.
/* from sys_utime() */
if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) {
if (current->fsuid != inode->i_uid &&
- (rc=ll_permission(inode,MAY_WRITE,NULL))!=0)
+ (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0)
RETURN(rc);
} else {
/* from inode_change_ok() */
if (!rc)
rc = err;
}
- } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
+ } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET | ATTR_UID | ATTR_GID)) {
struct obdo *oa = NULL;
CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n",
oa->o_id = lsm->lsm_object_id;
oa->o_gr = lsm->lsm_object_gr;
oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+ if (ia_valid & ATTR_UID) {
+ oa->o_uid = inode->i_uid;
+ oa->o_valid |= OBD_MD_FLUID;
+ }
+
+ if (ia_valid & ATTR_GID) {
+ oa->o_gid = inode->i_gid;
+ oa->o_valid |= OBD_MD_FLGID;
+ }
+
obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
OBD_MD_FLMTIME | OBD_MD_FLCTIME);
rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL);
if (rc)
CERROR("obd_setattr fails: rc = %d\n", rc);
}
+
RETURN(rc);
}
up(&lli->lli_size_sem);
} /* ll_truncate */
-int ll_prepare_write(struct file *file, struct page *page, unsigned from,
- unsigned to)
+int ll_prepare_write(struct file *file, struct page *page,
+ unsigned from, unsigned to)
{
struct inode *inode = page->mapping->host;
struct ll_inode_info *lli = ll_i2info(inode);
oa->o_id = lsm->lsm_object_id;
oa->o_gr = lsm->lsm_object_gr;
oa->o_mode = inode->i_mode;
+
oa->o_valid = OBD_MD_FLID | OBD_MD_FLMODE |
OBD_MD_FLTYPE | OBD_MD_FLGROUP;
- rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode), oa, lsm,
- 1, &pga, NULL);
+ /*
+ * needed for quota to create OSS object on write with correct
+ * owner/group.
+ */
+ oa->o_uid = inode->i_uid;
+ oa->o_valid |= OBD_MD_FLUID;
+
+ oa->o_gid = inode->i_gid;
+ oa->o_valid |= OBD_MD_FLGID;
+
+ rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode),
+ oa, lsm, 1, &pga, NULL);
if (rc)
GOTO(out_free_oa, rc);
lli = ll_i2info(page->mapping->host);
lsm = lli->lli_smd;
- down(&lli->lli_size_sem);
+ /*
+ * this callback is called with the client lock taken, so it must not
+ * sleep, otherwise a deadlock is possible. --umka
+ */
+// down(&lli->lli_size_sem);
kms = lov_merge_size(lsm, 1);
- up(&lli->lli_size_sem);
+// up(&lli->lli_size_sem);
/* catch race with truncate */
if (((__u64)page->index << PAGE_SHIFT) >= kms)
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_obj *obj;
- int rc;
+ int rc, mds;
ENTRY;
rc = lmv_check_connect(obd);
if (data->namelen != 0) {
/* usual link request */
- obj = lmv_grab_obj(obd, &data->id1);
+ obj = lmv_grab_obj(obd, &data->id2);
if (obj) {
rc = raw_name2idx(obj->hashtype, obj->objcount,
data->name, data->namelen);
- data->id1 = obj->objs[rc].id;
+ data->id2 = obj->objs[rc].id;
lmv_put_obj(obj);
}
+
+ mds = id_group(&data->id2);
CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
OLID4(&data->id2), data->namelen, data->name,
OLID4(&data->id1));
} else {
+ mds = id_group(&data->id1);
+
/* request from MDS to acquire i_links for inode by id1 */
CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
OLID4(&data->id1));
}
-
- rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp,
- data, request);
+
+ CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n",
+ mds, OLID4(&data->id1));
+ rc = md_link(lmv->tgts[mds].ltd_exp, data, request);
+
RETURN(rc);
}
#define log2(n) ffz(~(n))
#endif
-static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
+static int lov_clear_orphans(struct obd_export *export,
+ struct obdo *src_oa,
struct lov_stripe_md **ea,
struct obd_trans_info *oti)
{
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ int err;
struct lov_stripe_md obj_md;
struct lov_stripe_md *obj_mdp = &obj_md;
- int err;
- /* if called for a specific target, we don't
- care if it is not active. */
+ /*
+ * if called for a specific target, we don't care if it is not
+ * active.
+ */
if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
CDEBUG(D_HA, "lov idx %d inactive\n", i);
continue;
if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
continue;
+ /*
+ * set the objid starting from which OSS objects should be
+ * destroyed.
+ */
memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+ tmp_oa->o_valid |= OBD_MD_FLID;
+ tmp_oa->o_id = oti->oti_objid[i];
/* XXX: LOV STACKING: use real "obj_mdp" sub-data */
err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
&obj_mdp, oti);
- if (err)
- /* This export will be disabled until it is recovered,
- and then orphan recovery will be completed. */
+ if (err) {
+ /*
+ * this export will be disabled until it is recovered,
+ * and then orphan recovery will be completed.
+ */
CERROR("error in orphan recovery on OST idx %d/%d: "
"rc = %d\n", i, lov->desc.ld_tgt_count, err);
+ }
if (ost_uuid)
break;
RETURN(rc);
}
-static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
- void *acl, int acl_size,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
- struct lov_stripe_md *obj_mdp, *lsm;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- unsigned ost_idx;
- int rc, i;
- ENTRY;
-
- LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
- src_oa->o_flags & OBD_FL_RECREATE_OBJS);
-
- OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
- if (obj_mdp == NULL)
- RETURN(-ENOMEM);
-
- ost_idx = src_oa->o_nlink;
- lsm = *ea;
- if (lsm == NULL)
- GOTO(out, rc = -EINVAL);
- if (ost_idx >= lov->desc.ld_tgt_count)
- GOTO(out, rc = -EINVAL);
-
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
- if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
- GOTO(out, rc = -EINVAL);
- break;
- }
- }
- if (i == lsm->lsm_stripe_count)
- GOTO(out, rc = -EINVAL);
-
- rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size,
- &obj_mdp, oti);
-out:
- OBD_FREE(obj_mdp, sizeof(*obj_mdp));
- RETURN(rc);
-}
-
/* the LOV expects oa->o_id to be set to the LOV object id */
-static int lov_create(struct obd_export *exp, struct obdo *src_oa,
- void *acl, int acl_size,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+static int
+lov_create(struct obd_export *exp, struct obdo *src_oa,
+ void *acl, int acl_size, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
{
struct lov_request_set *set = NULL;
struct list_head *pos;
if (!lov->desc.ld_active_tgt_count)
RETURN(-EIO);
- /* Recreate a specific object id at the given OST index */
- if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
- (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- rc = lov_recreate(exp, src_oa, acl, acl_size, ea, oti);
- RETURN(rc);
- }
-
+ LASSERT(oti->oti_flags & OBD_MODE_CROW);
+
+ /* main creation loop */
rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
if (rc)
RETURN(rc);
struct lov_request *req =
list_entry(pos, struct lov_request, rq_link);
- /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
- rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
- acl, acl_size, &req->rq_md, oti);
+ obd_id *objids = oti->oti_objid;
+
+ if (oti->oti_obj_alloc) {
+ __u64 next_id;
+
+ /*
+ * allocate a new objid. This is delegated to the caller,
+ * which is the MDS in the CROW case.
+ */
+ next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
+ req->rq_oa->o_id = next_id;
+ } else {
+ /* and here is default "allocator" */
+ req->rq_oa->o_id = ++objids[req->rq_idx];
+ }
lov_update_create_set(set, req, rc);
}
rc = lov_fini_create_set(set, ea);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
- /* for now, we only expect time updates here */
- LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
- OBD_MD_FLATIME | OBD_MD_FLMTIME |
- OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
- OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
-
LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
lov = &exp->exp_obd->u.lov;
#define KEY_IS(str) \
(keylen == strlen(str) && memcmp(key, str, keylen) == 0)
- if (KEY_IS("next_id")) {
- if (vallen != lov->desc.ld_tgt_count)
- RETURN(-EINVAL);
- for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- /* initialize all OSCs, even inactive ones */
- if (obd_uuid_empty(&lov->tgts[i].uuid))
- continue;
- err = obd_set_info(lov->tgts[i].ltd_exp,
- keylen, key, sizeof(obd_id),
- ((obd_id*)val) + i);
- if (!rc)
- rc = err;
- }
- RETURN(rc);
- }
if (KEY_IS("async")) {
struct lov_desc *desc = &lov->desc;
struct lov_tgt_desc *tgts = lov->tgts;
RETURN(rc);
}
- if (KEY_IS("growth_count")) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- } else if (KEY_IS("mds_conn")) {
+ if (KEY_IS("mds_conn")) {
if (vallen != sizeof(__u32))
RETURN(-EINVAL);
} else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
int lov_update_create_set(struct lov_request_set *set,
struct lov_request *req, int rc)
{
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
struct obd_trans_info *oti = set->set_oti;
struct lov_stripe_md *lsm = set->set_md;
struct lov_oinfo *loi;
- struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
ENTRY;
req->rq_stripe = set->set_success;
/* If the MDS file was truncated up to some size, stripe over
* enough OSTs to allow the file to be created at that size. */
if (src_oa->o_valid & OBD_MD_FLSIZE) {
- stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
+ stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
if (stripes > lov->desc.ld_active_tgt_count)
if (inode->i_ino != id_ino(&mds->mds_rootid) && generation &&
inode->i_generation != generation) {
/* we didn't find the right inode.. */
- CERROR("bad inode %lu, link: %lu, ct: %d, generation %u/%u\n",
- inode->i_ino, (unsigned long)inode->i_nlink,
- atomic_read(&inode->i_count), inode->i_generation,
- generation);
+ if (id_group(id) != mds->mds_num) {
+ CERROR("bad inode %lu found, link: %lu, ct: %d, generation "
+ "%u != %u, mds %u != %u, request to wrong MDS?\n",
+ inode->i_ino, (unsigned long)inode->i_nlink,
+ atomic_read(&inode->i_count), inode->i_generation,
+ generation, mds->mds_num, (unsigned)id_group(id));
+ } else {
+ CERROR("bad inode %lu found, link: %lu, ct: %d, generation "
+ "%u != %u, inode is recreated while request handled?\n",
+ inode->i_ino, (unsigned long)inode->i_nlink,
+ atomic_read(&inode->i_count), inode->i_generation,
+ generation);
+ }
dput(result);
RETURN(ERR_PTR(-ENOENT));
}
RETURN(rc);
}
-
/* Call with lock=1 if you want mds_pack_md to take the i_sem.
* Call with lock=0 if the caller has already taken the i_sem. */
int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
struct mds_body *body, struct inode *inode, int lock, int mea)
{
struct mds_obd *mds = &obd->u.mds;
+ int rc, lmm_size;
void *lmm;
- int lmm_size;
- int rc;
ENTRY;
lmm = lustre_msg_buf(msg, offset, 0);
rc = mds_get_md(obd, inode, lmm, &lmm_size, lock, mea);
if (rc > 0) {
- if (S_ISDIR(inode->i_mode))
- body->valid |= OBD_MD_FLDIREA;
- else
- body->valid |= OBD_MD_FLEASIZE;
-
+ body->valid |= S_ISDIR(inode->i_mode) ?
+ OBD_MD_FLDIREA : OBD_MD_FLEASIZE;
+
if (mea)
body->valid |= OBD_MD_MEA;
RETURN(rc);
}
+
int mds_pack_link(struct dentry *dentry, struct ptlrpc_request *req,
struct mds_body *repbody, int reply_off)
{
/* we have to know mdsnum before touching underlying fs -bzzz */
atomic_set(&mds->mds_open_count, 0);
sema_init(&mds->mds_md_sem, 1);
+ sema_init(&mds->mds_create_sem, 1);
mds->mds_md_connected = 0;
mds->mds_md_name = NULL;
ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
LASSERT(ctxt != NULL);
- /* set nextid first, so we are sure it happens */
- rc = mds_dt_set_nextid(obd);
- if (rc) {
- CERROR("%s: mds_dt_set_nextid() failed\n", obd->obd_name);
- GOTO(out, rc);
- }
-
/* clean PENDING dir */
rc = mds_cleanup_orphans(obd);
if (rc < 0)
group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
valsize = sizeof(group);
- rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"), "mds_conn",
- valsize, &group);
+ rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"),
+ "mds_conn", valsize, &group);
if (rc)
GOTO(out, rc);
}
/* remove the orphaned precreated objects */
- rc = mds_dt_clearorphans(mds, NULL /* all OSTs */);
+ rc = mds_dt_clear_orphans(mds, NULL /* all OSTs */);
if (rc)
GOTO(err_llog, rc);
void *key, obd_count vallen, void *val);
int mds_get_lovtgts(struct obd_device *, int tgt_count, struct obd_uuid *);
int mds_dt_write_objids(struct obd_device *obd);
-void mds_dt_update_objids(struct obd_device *obd, obd_id *ids);
int mds_dt_set_growth(struct mds_obd *mds, int count);
-int mds_dt_set_nextid(struct obd_device *obd);
-int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
+int mds_dt_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
int mds_post_mds_lovconf(struct obd_device *obd);
int mds_notify(struct obd_device *obd, struct obd_device *watched,
int active, void *data);
struct lov_mds_md *lmm, int lmm_size);
int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode,
struct lustre_msg *msg, int offset);
+void mds_dt_update_objids(struct obd_device *obd, obd_id *ids);
+void mds_dt_save_objids(struct obd_device *obd, obd_id *ids);
/* mds/mds_open.c */
-int mds_destroy_objects(struct obd_device *obd,
- struct inode *inode, int async);
+int
+mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
+ int offset, struct mds_update_record *rec,
+ struct dentry *dchild, void **handle,
+ obd_id *ids);
+int mds_destroy_object(struct obd_device *obd,
+ struct inode *inode, int async);
int mds_query_write_access(struct inode *inode);
int mds_open(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req, struct lustre_handle *);
return 0;
}
-struct dentry *filter_id2dentry(struct obd_device *obd,
- struct dentry *dir_dentry,
- obd_gr group, obd_id id);
-
int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_remote *nb,
ld->ld_pattern = cpu_to_le32 (ld->ld_pattern);
}
+void mds_dt_save_objids(struct obd_device *obd, obd_id *ids)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int i;
+ ENTRY;
+
+ spin_lock(&mds->mds_dt_lock);
+ for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
+ ids[i] = mds->mds_dt_objids[i];
+ spin_unlock(&mds->mds_dt_lock);
+ EXIT;
+}
+
void mds_dt_update_objids(struct obd_device *obd, obd_id *ids)
{
struct mds_obd *mds = &obd->u.mds;
spin_lock(&mds->mds_dt_lock);
for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
- if (ids[i] > (mds->mds_dt_objids)[i])
- (mds->mds_dt_objids)[i] = ids[i];
+ if (ids[i] > mds->mds_dt_objids[i])
+ mds->mds_dt_objids[i] = ids[i];
spin_unlock(&mds->mds_dt_lock);
EXIT;
}
static int mds_dt_read_objids(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- obd_id *ids;
+ int i, rc, size;
loff_t off = 0;
- int i, rc, size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
+ obd_id *ids;
ENTRY;
if (mds->mds_dt_objids != NULL)
RETURN(0);
+ size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
OBD_ALLOC(ids, size);
if (ids == NULL)
RETURN(-ENOMEM);
if (mds->mds_dt_objid_filp->f_dentry->d_inode->i_size == 0)
RETURN(0);
+
rc = fsfilt_read_record(obd, mds->mds_dt_objid_filp, ids, size, &off);
if (rc < 0) {
- CERROR("Error reading objids %d\n", rc);
+ CERROR("error reading objids %d\n", rc);
} else {
mds->mds_dt_objids_valid = 1;
rc = 0;
}
- for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
- CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
- mds->mds_dt_objids[i], i);
+ for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) {
+ CDEBUG(D_INFO, "read last object "LPU64
+ " for idx %d\n", mds->mds_dt_objids[i], i);
+ }
RETURN(rc);
}
int mds_dt_write_objids(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
+ int i, rc, size;
loff_t off = 0;
- int i, rc, size = mds->mds_dt_desc.ld_tgt_count * sizeof(obd_id);
ENTRY;
for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
mds->mds_dt_objids[i], i);
+ size = mds->mds_dt_desc.ld_tgt_count * sizeof(obd_id);
rc = fsfilt_write_record(obd, mds->mds_dt_objid_filp,
mds->mds_dt_objids, size, &off, 0);
RETURN(rc);
}
-int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
+int mds_dt_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
{
- int rc;
+ struct lov_stripe_md *empty_ea = NULL;
+ struct obd_trans_info oti = { 0 };
struct obdo *oa = NULL;
- struct obd_trans_info oti = {0};
- struct lov_stripe_md *empty_ea = NULL;
+ int rc;
ENTRY;
LASSERT(mds->mds_dt_objids != NULL);
RETURN(-ENOMEM);
memset(oa, 0, sizeof(*oa));
+
oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
oa->o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
oa->o_flags = OBD_FL_DELORPHAN;
if (ost_uuid != NULL) {
- memcpy(&oa->o_inline, ost_uuid, sizeof(*ost_uuid));
+ memcpy(&oa->o_inline, ost_uuid,
+ sizeof(*ost_uuid));
oa->o_valid |= OBD_MD_FLINLINE;
}
- rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &empty_ea, &oti);
- obdo_free(oa);
- RETURN(rc);
-}
-/* update the LOV-OSC knowledge of the last used object id's */
-int mds_dt_set_nextid(struct obd_device *obd)
-{
- struct mds_obd *mds = &obd->u.mds;
- int rc;
- ENTRY;
-
- LASSERT(!obd->obd_recovering);
-
- LASSERT(mds->mds_dt_objids != NULL);
-
- rc = obd_set_info(mds->mds_dt_exp, strlen("next_id"), "next_id",
- mds->mds_dt_desc.ld_tgt_count, mds->mds_dt_objids);
+ /*
+ * pass the current objids so that the data layer knows the last objids
+ * the MDS is aware of and can act appropriately. --umka
+ */
+ oti.oti_objid = mds->mds_dt_objids;
+
+ rc = obd_create(mds->mds_dt_exp, oa,
+ NULL, 0, &empty_ea, &oti);
+
+ obdo_free(oa);
RETURN(rc);
}
int mds_dt_connect(struct obd_device *obd, char *lov_name)
{
struct mds_obd *mds = &obd->u.mds;
- struct lustre_handle conn = {0,};
- int rc, i;
+ struct lustre_handle conn = { 0 };
+ int i, rc = 0;
ENTRY;
if (IS_ERR(mds->mds_dt_obd))
if (mds->mds_ost_sec) {
rc = obd_set_info(mds->mds_dt_obd->obd_self_export,
strlen("sec"), "sec",
- strlen(mds->mds_ost_sec), mds->mds_ost_sec);
+ strlen(mds->mds_ost_sec),
+ mds->mds_ost_sec);
if (rc) {
mds->mds_dt_obd = ERR_PTR(rc);
RETURN(rc);
GOTO(err_reg, rc);
}
- /* If we're mounting this code for the first time on an existing FS,
- * we need to populate the objids array from the real OST values */
+ /*
+ * If we're mounting this code for the first time on an existing FS, we
+ * need to populate the objids array from the real OST values.
+ */
if (!mds->mds_dt_objids_valid) {
__u32 size = sizeof(obd_id) * mds->mds_dt_desc.ld_tgt_count;
+
rc = obd_get_info(mds->mds_dt_exp, strlen("last_id"),
"last_id", &size, mds->mds_dt_objids);
if (!rc) {
"writing objids file: %d\n", rc);
}
}
-
- /* I want to see a callback happen when the OBD moves to a
- * "For General Use" state, and that's when we'll call
- * set_nextid(). The class driver can help us here, because
- * it can use the obd_recovering flag to determine when the
- * the OBD is full available. */
+ /*
+ * I want to see a callback happen when the OBD moves to a "For General
+ * Use" state, and that's when we'll call set_nextid(). The class driver
+ * can help us here, because it can use the obd_recovering flag to
+ * determine when the OBD is fully available.
+ *
+ * NOTE(review): mds_dt_set_nextid() is removed by this change, so the
+ * set_nextid() reference above looks stale -- confirm.
+ */
if (!obd->obd_recovering) {
CDEBUG(D_OTHER, "call mds_postrecov_common()\n");
rc = mds_postrecov_common(obd);
obd_register_observer(mds->mds_dt_obd, NULL);
err_discon:
obd_disconnect(mds->mds_dt_exp, 0);
- mds->mds_dt_exp = NULL;
mds->mds_dt_obd = ERR_PTR(rc);
+ mds->mds_dt_exp = NULL;
return rc;
}
CWARN("MDS %s: %s now active, resetting orphans\n",
obd->obd_name, uuid->uuid);
- rc = mds_dt_clearorphans(&obd->u.mds, uuid);
+ rc = mds_dt_clear_orphans(&obd->u.mds, uuid);
if (rc != 0) {
- CERROR("%s: failed at mds_dt_clearorphans(): %d\n",
+ CERROR("%s: failed at mds_dt_clear_orphans(): %d\n",
obd->obd_name, rc);
GOTO(cleanup, rc);
}
struct mds_obd *mds = mds_req2mds(req);
struct mds_file_data *mfd;
struct mds_body *body;
- int error;
+ int rc = 0;
ENTRY;
mfd = mds_mfd_new();
if (mfd == NULL) {
CERROR("mds: out of memory\n");
- GOTO(cleanup_dentry, error = -ENOMEM);
+ GOTO(cleanup_dentry, rc = -ENOMEM);
}
body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
if (flags & FMODE_WRITE) {
/* FIXME: in recovery, need to pass old epoch here */
- error = mds_get_write_access(mds, dentry->d_inode, 0);
- if (error)
- GOTO(cleanup_mfd, error);
+ rc = mds_get_write_access(mds, dentry->d_inode, 0);
+ if (rc)
+ GOTO(cleanup_mfd, rc);
#ifdef IFILTERDATA_ACTUALLY_USED
body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
-#endif /*IFILTERDATA_ACTUALLY_USED*/
+#endif
} else if (flags & FMODE_EXEC) {
- error = mds_deny_write_access(mds, dentry->d_inode);
- if (error)
- GOTO(cleanup_mfd, error);
+ rc = mds_deny_write_access(mds, dentry->d_inode);
+ if (rc)
+ GOTO(cleanup_mfd, rc);
}
dget(dentry);
- /* Mark the file as open to handle open-unlink. */
-
+ /* mark the file as open to handle open-unlink. */
DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode);
mds_orphan_open_inc(dentry->d_inode);
UP_WRITE_I_ALLOC_SEM(dentry->d_inode);
mds_mfd_put(mfd);
body->handle.cookie = mfd->mfd_handle.h_cookie;
-
RETURN(mfd);
-
cleanup_mfd:
mds_mfd_put(mfd);
mds_mfd_destroy(mfd);
cleanup_dentry:
- return ERR_PTR(error);
+ return ERR_PTR(rc);
}
-static void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
- struct lov_desc *desc)
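+/*
+ * object id allocation callback, installed as oti_obj_alloc by
+ * mds_create_object(): pre-increments *objid and returns the new value.
+ *
+ * NOTE(review): the return type is int while *objid is an obd_id and the
+ * call site in lov_create() stores the result in a __u64, so it looks like
+ * large ids would be truncated here -- confirm, and consider returning
+ * obd_id (this also requires changing obd_obj_alloc_func_t).
+ */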
+static int mds_obj_alloc(obd_id *objid)
+{
+ ENTRY;
+ LASSERT(objid != NULL);
+ RETURN(++(*objid));
+}
+
+static inline void
+mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
+ struct lov_desc *desc)
{
int i;
+
for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
le64_to_cpu(lmm->lmm_objects[i].l_object_id);
}
}
-/* Must be called with i_sem held */
-static int mds_create_objects(struct ptlrpc_request *req, int offset,
- struct mds_update_record *rec,
- struct mds_obd *mds, struct obd_device *obd,
- struct dentry *dchild, void **handle,
- obd_id **ids)
+/* must be called with i_sem held */
+int
+mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
+ int offset, struct mds_update_record *rec,
+ struct dentry *dchild, void **handle,
+ obd_id *ids)
{
- struct obdo *oa = NULL;
+ struct inode *inode = dchild->d_inode;
+ struct mds_obd *mds = &obd->u.mds;
struct obd_trans_info oti = { 0 };
- struct mds_body *body;
struct lov_stripe_md *lsm = NULL;
struct lov_mds_md *lmm = NULL;
- struct inode *inode = dchild->d_inode;
- void *lmm_buf;
int rc, lmm_bufsize, lmm_size;
+ struct obdo *oa = NULL;
+ struct mds_body *body;
+ void *lmm_buf;
ENTRY;
if (rec->ur_flags & MDS_OPEN_DELAY_CREATE ||
if (body->valid & OBD_MD_FLEASIZE)
RETURN(0);
- OBD_ALLOC(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids));
- if (*ids == NULL)
- RETURN(-ENOMEM);
- oti.oti_objid = *ids;
-
+ oti.oti_objid = ids;
+
/* replay case */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
LASSERT(id_ino(rec->ur_id2));
LASSERT(lmm);
if (*handle == NULL)
- *handle = fsfilt_start(obd,inode,FSFILT_OP_CREATE,NULL);
+ *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL);
if (IS_ERR(*handle)) {
rc = PTR_ERR(*handle);
*handle = NULL;
- GOTO(out_ids, rc);
+ RETURN(rc);
}
- mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc);
+ /*
+ * FIXME: this is an evil layering violation, all things related
+ * to striping should be done by LOV. --umka.
+ */
+ mds_objids_from_lmm(ids, lmm, &mds->mds_dt_desc);
lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
lmm_bufsize = req->rq_repmsg->buflens[offset];
- LASSERT(lmm_buf);
+
+ LASSERT(lmm_buf != NULL);
LASSERT(lmm_bufsize >= lmm_size);
+
memcpy(lmm_buf, lmm, lmm_size);
rc = fsfilt_set_md(obd, inode, *handle, lmm,
lmm_size, EA_LOV);
}
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO))
- GOTO(out_ids, rc = -ENOMEM);
+ RETURN(-ENOMEM);
oa = obdo_alloc();
if (oa == NULL)
- GOTO(out_ids, rc = -ENOMEM);
+ RETURN(-ENOMEM);
oa->o_mode = S_IFREG | 0600;
oa->o_id = inode->i_ino;
oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
oa->o_size = 0;
- obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME|
- OBD_MD_FLCTIME);
+ obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME);
if (!(rec->ur_flags & MDS_OPEN_HAS_OBJS)) {
/* check if things like lfs setstripe are sending us the ea */
OBD_FREE(lmm, mds->mds_max_mdsize);
if (rc)
GOTO(out_oa, rc);
- }
+ }
+
+ /*
+ * create with the CROW flag set and pass the base ids from which
+ * new ids are allocated.
+ */
+ oti.oti_flags |= OBD_MODE_CROW;
+ oti.oti_obj_alloc = mds_obj_alloc;
+
LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti);
+
if (rc) {
int level = D_ERROR;
if (rc == -ENOSPC)
level = D_INODE;
- CDEBUG(level, "error creating objects for "
- "inode %lu: rc = %d\n",
+ CDEBUG((rc == -ENOSPC ? D_INODE : D_ERROR),
+ "error creating objects for "
+ "inode %lu: rc = %d\n",
inode->i_ino, rc);
if (rc > 0) {
CERROR("obd_create returned invalid "
} else {
rc = obd_iocontrol(OBD_IOC_LOV_SETEA, mds->mds_dt_exp,
0, &lsm, rec->ur_eadata);
- if (rc) {
+ if (rc)
GOTO(out_oa, rc);
- }
+
lsm->lsm_object_id = oa->o_id;
lsm->lsm_object_gr = oa->o_gr;
}
if (inode->i_size) {
oa->o_size = inode->i_size;
- obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|
- OBD_MD_FLMTIME| OBD_MD_FLCTIME| OBD_MD_FLSIZE);
+ obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
+
rc = obd_setattr(mds->mds_dt_exp, oa, lsm, &oti);
if (rc) {
CERROR("error setting attrs for inode %lu: rc %d\n",
rc = obd_packmd(mds->mds_dt_exp, &lmm, lsm);
if (!id_ino(rec->ur_id2))
obd_free_memmd(mds->mds_dt_exp, &lsm);
- LASSERT(rc >= 0);
+ if (rc < 0) {
+ CERROR("cannot pack lsm, err = %d\n", rc);
+ GOTO(out_oa, rc);
+ }
+
lmm_size = rc;
body->eadatasize = rc;
memcpy(lmm_buf, lmm, lmm_size);
obd_free_diskmd(mds->mds_dt_exp, &lmm);
- out_oa:
+out_oa:
oti_free_cookies(&oti);
obdo_free(oa);
- out_ids:
- if (rc) {
- OBD_FREE(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids));
- *ids = NULL;
- }
- if(lsm)
+
+ if (lsm)
obd_free_memmd(mds->mds_dt_exp, &lsm);
RETURN(rc);
}
int
-mds_destroy_objects(struct obd_device *obd,
- struct inode *inode, int async)
+mds_destroy_object(struct obd_device *obd,
+ struct inode *inode, int async)
{
struct mds_obd *mds = &obd->u.mds;
struct lov_mds_md *lmm = NULL;
LASSERT(inode != NULL);
if (inode->i_nlink != 0) {
- CWARN("attempt to destroy OSS object when "
- "i_nlink == %d\n", (int)inode->i_nlink);
+ CDEBUG(D_INODE, "attempt to destroy OSS object when "
+ "i_nlink == %d\n", (int)inode->i_nlink);
RETURN(0);
}
struct mds_body *body, int flags, void **handle,
struct mds_update_record *rec, struct ldlm_reply *rep)
{
- struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
+ struct mds_obd *mds = mds_req2mds(req);
struct mds_file_data *mfd = NULL;
- obd_id *ids = NULL; /* object IDs created */
+ obd_id *ids = NULL;
unsigned mode;
int rc = 0;
ENTRY;
/* atomically create objects if necessary */
down(&dchild->d_inode->i_sem);
mode = dchild->d_inode->i_mode;
+
if ((S_ISREG(mode) && !(body->valid & OBD_MD_FLEASIZE)) ||
(S_ISDIR(mode) && !(body->valid & OBD_MD_FLDIREA))) {
rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
RETURN(rc);
}
}
+
if (rec != NULL) {
/* no EA: create objects */
if ((body->valid & OBD_MD_FLEASIZE) &&
up(&dchild->d_inode->i_sem);
RETURN(-EEXIST);
}
+
if (!(body->valid & OBD_MD_FLEASIZE)) {
- /* no EA: create objects */
- rc = mds_create_objects(req, 2, rec, mds, obd,
- dchild, handle, &ids);
+ int ids_size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
+
+ OBD_ALLOC(ids, ids_size);
+ if (ids == NULL) {
+ up(&dchild->d_inode->i_sem);
+ RETURN(-ENOMEM);
+ }
+
+ /*
+ * serialize object creation to prevent other threads from
+ * taking the same base objid values.
+ */
+ down(&mds->mds_create_sem);
+
+ /* preparing base ids */
+ mds_dt_save_objids(obd, ids);
+
+ /*
+ * create objects; @ids will contain the newly allocated
+ * object ids.
+ */
+ rc = mds_create_object(obd, req, 2, rec,
+ dchild, handle, ids);
if (rc) {
- CERROR("mds_create_objects: rc = %d\n", rc);
+ CERROR("mds_create_object: rc = %d\n", rc);
+ up(&mds->mds_create_sem);
up(&dchild->d_inode->i_sem);
+ OBD_FREE(ids, ids_size);
RETURN(rc);
}
+
+ /*
+ * update the MDS objids with the new ones allocated in
+ * mds_create_object().
+ */
+ mds_dt_update_objids(obd, ids);
+ OBD_FREE(ids, ids_size);
+
+ up(&mds->mds_create_sem);
}
+
if (S_ISREG(dchild->d_inode->i_mode) &&
(body->valid & OBD_MD_FLEASIZE)) {
rc = mds_revalidate_lov_ea(obd, dchild->d_inode,
}
}
}
+
rc = mds_pack_acl(obd, req->rq_repmsg, 3, body, dchild->d_inode);
if (rc < 0) {
CERROR("mds_pack_acl: rc = %d\n", rc);
body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
OBD_MD_FLATIME | OBD_MD_FLMTIME);
}
+
up(&dchild->d_inode->i_sem);
intent_set_disposition(rep, DISP_OPEN_OPEN);
CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
mfd->mfd_handle.h_cookie);
- if (ids != NULL) {
- mds_dt_update_objids(obd, ids);
- OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count);
- }
- //if (rc)
- // mds_mfd_destroy(mfd);
RETURN(rc);
}
LASSERT(id_ino(rec->ur_id2));
- rc = mds_open_by_id(req, rec->ur_id2, body, rec->ur_flags,
- rec, rep);
+ rc = mds_open_by_id(req, rec->ur_id2, body,
+ rec->ur_flags, rec, rep);
if (rc != -ENOENT) {
mds_body_do_reverse_map(med, body);
RETURN(rc);
reply_body->valid |= OBD_MD_FLCOOKIE;
}
- rc = mds_destroy_objects(obd, inode, 1);
+ rc = mds_destroy_object(obd, inode, 1);
if (rc) {
CERROR("cannot destroy OSS object on close, err %d\n",
rc);
} else if (rec->ur_iattr.ia_valid & ATTR_EA_RM) {
rc = -EOPNOTSUPP;
if (inode->i_op && inode->i_op->removexattr)
- rc = inode->i_op->removexattr(de,
- rec->ur_eadata);
- } else if ((S_ISREG(inode->i_mode) ||
- S_ISDIR(inode->i_mode)) && rec->ur_eadata != NULL) {
+ rc = inode->i_op->removexattr(de, rec->ur_eadata);
+ } else if (S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) {
struct lov_stripe_md *lsm = NULL;
struct lov_user_md *lum = NULL;
-
- rc = ll_permission(inode, MAY_WRITE, NULL);
- if (rc < 0)
- GOTO(cleanup, rc);
- lum = rec->ur_eadata;
- /* if lmm_stripe_size is -1 delete default stripe from dir */
- if (S_ISDIR(inode->i_mode) &&
- lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){
- rc = fsfilt_set_md(obd, inode, handle, NULL, 0, EA_LOV);
- if (rc)
- GOTO(cleanup, rc);
- } else {
- rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, mds->mds_dt_exp,
- 0, &lsm, rec->ur_eadata);
- if (rc)
+ if (rec->ur_eadata != NULL) {
+ rc = ll_permission(inode, MAY_WRITE, NULL);
+ if (rc < 0)
GOTO(cleanup, rc);
+
+ lum = rec->ur_eadata;
+
+ /* if lmm_stripe_size is -1 delete default
+ * stripe from dir */
+ if (S_ISDIR(inode->i_mode) &&
+ lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){
+ rc = fsfilt_set_md(obd, inode, handle, NULL, 0, EA_LOV);
+ if (rc)
+ GOTO(cleanup, rc);
+ } else {
+ rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
+ mds->mds_dt_exp, 0,
+ &lsm, rec->ur_eadata);
+ if (rc)
+ GOTO(cleanup, rc);
- obd_free_memmd(mds->mds_dt_exp, &lsm);
- rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
- rec->ur_eadatalen, EA_LOV);
- if (rc)
- GOTO(cleanup, rc);
+ obd_free_memmd(mds->mds_dt_exp, &lsm);
+ rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
+ rec->ur_eadatalen, EA_LOV);
+ if (rc)
+ GOTO(cleanup, rc);
+ }
}
}
}
body->valid |= OBD_MD_FLCOOKIE;
}
- rc = mds_destroy_objects(obd, child_inode, 1);
+ rc = mds_destroy_object(obd, child_inode, 1);
if (rc) {
CERROR("can't remove OST object, err %d\n",
rc);
body->valid |= OBD_MD_FLCOOKIE;
}
- rc = mds_destroy_objects(obd, old_inode, 1);
+ rc = mds_destroy_object(obd, old_inode, 1);
if (rc) {
CERROR("can't remove OST object, err %d\n",
rc);
CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
(int)oa->o_id, (int)oa->o_gr);
+
+ if (async)
+ oti.oti_flags |= OBD_MODE_ASYNC;
- oti.oti_async = async;
rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
obdo_free(oa);
out_free_memmd:
RETURN(rc);
}
+
#if 0
static int filter_group_set_fs_flags(struct obd_device *obd, int group)
{
RETURN(rc);
}
#endif
+
static int filter_post_fs_setup(struct obd_device *obd)
{
struct filter_obd *filter = &obd->u.filter;
char str[PTL_NALFMT_SIZE];
struct obd_llogs *llog;
struct llog_ctxt *ctxt;
- int rc;
+ int rc = 0;
ENTRY;
fed = &exp->exp_filter_data;
LASSERT(ctxt != NULL);
rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+
portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
exp->exp_connection->c_peer.peer_id.nid, str);
+
CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n",
obd->obd_name, exp->exp_connection->c_peer.peer_id.nid,
str, fed->fed_group);
{
struct dentry *dchild = NULL;
obd_gr group = 0;
+ ENTRY;
if (oa->o_valid & OBD_MD_FLGROUP)
group = oa->o_gr;
}
if (dchild->d_inode == NULL) {
- CERROR("%s: %s on non-existent object: "LPU64"\n",
- obd->obd_name, what, oa->o_id);
+ CDEBUG(D_INFO, "%s: %s on non-existent object: "
+ LPU64"\n", obd->obd_name, what, oa->o_id);
f_dput(dchild);
RETURN(ERR_PTR(-ENOENT));
}
- return dchild;
+ RETURN(dchild);
}
static int filter_getattr(struct obd_export *exp, struct obdo *oa,
RETURN(rc);
}
-/* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, struct obd_trans_info *oti)
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+ struct obdo *oa, struct obd_trans_info *oti)
{
- struct lvfs_run_ctxt saved;
struct filter_obd *filter;
- struct dentry *dentry;
struct iattr iattr;
- struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
- struct ldlm_resource *res;
void *handle;
- int rc, rc2;
+ int rc, err;
ENTRY;
- LASSERT(oti != NULL);
-
- dentry = filter_oa2dentry(exp->exp_obd, oa);
- if (IS_ERR(dentry))
- RETURN(PTR_ERR(dentry));
-
+ LASSERT(dentry != NULL);
+ LASSERT(!IS_ERR(dentry));
+ LASSERT(dentry->d_inode != NULL);
+
filter = &exp->exp_obd->u.filter;
-
iattr_from_obdo(&iattr, oa, oa->o_valid);
- push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
- lock_kernel();
-
if (iattr.ia_valid & ATTR_SIZE)
down(&dentry->d_inode->i_sem);
- handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
- oti);
+ handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+ FSFILT_OP_SETATTR, oti);
if (IS_ERR(handle))
GOTO(out_unlock, rc = PTR_ERR(handle));
/* XXX this could be a rwsem instead, if filter_preprw played along */
if (iattr.ia_valid & ATTR_ATTR_FLAG)
- rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
- EXT3_IOC_SETFLAGS,
+ rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode,
+ NULL, EXT3_IOC_SETFLAGS,
(long)&iattr.ia_attr_flags);
else
- rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
+ rc = fsfilt_setattr(exp->exp_obd, dentry, handle,
+ &iattr, 1);
+
rc = filter_finish_transno(exp, oti, rc);
- rc2 = fsfilt_commit(exp->exp_obd, filter->fo_sb, dentry->d_inode,
- handle, exp->exp_sync);
- if (rc2) {
- CERROR("error on commit, err = %d\n", rc2);
+
+ err = fsfilt_commit(exp->exp_obd, filter->fo_sb,
+ dentry->d_inode, handle,
+ exp->exp_sync);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
if (!rc)
- rc = rc2;
+ rc = err;
}
+ EXIT;
+out_unlock:
+ if (iattr.ia_valid & ATTR_SIZE)
+ up(&dentry->d_inode->i_sem);
+ return rc;
+}
+
+/* this is called from filter_truncate() until we have filter_punch() */
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *md, struct obd_trans_info *oti)
+{
+ struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+ struct ldlm_valblock_ops *ns_lvbo;
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter;
+ struct ldlm_resource *res;
+ struct dentry *dentry;
+ int rc;
+ ENTRY;
+
+ LASSERT(oti != NULL);
+
+ filter = &exp->exp_obd->u.filter;
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+ /* make sure that object is allocated. */
+ dentry = filter_crow_object(exp->exp_obd,
+ oa->o_gr, oa->o_id);
+ if (IS_ERR(dentry))
+ GOTO(out_pop, rc = PTR_ERR(dentry));
+
+ lock_kernel();
+
+ /* setting objects attributes (including owner/group) */
+ rc = filter_setattr_internal(exp, dentry, oa, oti);
+ if (rc)
+ GOTO(out_unlock, rc);
res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
res_id, LDLM_EXTENT, 0);
+
if (res != NULL) {
- if (res->lr_namespace->ns_lvbo &&
- res->lr_namespace->ns_lvbo->lvbo_update)
- rc = res->lr_namespace->ns_lvbo->lvbo_update(res, NULL,
- 0, 0);
+ ns_lvbo = res->lr_namespace->ns_lvbo;
+ if (ns_lvbo && ns_lvbo->lvbo_update)
+ rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
ldlm_resource_putref(res);
- } else if (iattr.ia_valid & ATTR_SIZE) {
- /* called from MDS. */
}
-
+
oa->o_valid = OBD_MD_FLID;
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+ EXIT;
out_unlock:
- if (iattr.ia_valid & ATTR_SIZE)
- up(&dentry->d_inode->i_sem);
unlock_kernel();
- pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
f_dput(dentry);
- RETURN(rc);
+out_pop:
+ pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ return rc;
}
/* XXX identical to osc_unpackmd */
RETURN(lsm_size);
}
-static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
- struct filter_obd *filter)
-{
- struct obdo *doa = NULL;
- __u64 last, id;
- ENTRY;
-
- LASSERT(oa);
- LASSERT(oa->o_gr != 0);
- LASSERT(oa->o_valid & OBD_MD_FLGROUP);
-
- doa = obdo_alloc();
- if (doa == NULL) {
- CERROR("cannot allocate doa, error %d\n",
- -ENOMEM);
- EXIT;
- return;
- }
-
- doa->o_mode = S_IFREG;
- doa->o_gr = oa->o_gr;
- doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
-
- set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
- down(&filter->fo_create_locks[doa->o_gr]);
- if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
- CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
- exp->exp_obd->obd_name, doa->o_gr);
- up(&filter->fo_create_locks[doa->o_gr]);
- GOTO(out_free_doa, 0);
- }
-
- last = filter_last_id(filter, doa->o_gr);
- CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "LPU64"\n",
- exp->exp_obd->obd_name, doa->o_gr, oa->o_id + 1, last);
- for (id = oa->o_id + 1; id <= last; id++) {
- doa->o_id = id;
- filter_destroy(exp, doa, NULL, NULL);
- }
-
- CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "LPU64"\n",
- exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
-
- filter_set_last_id(filter, doa->o_gr, oa->o_id);
-
- clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
- up(&filter->fo_create_locks[doa->o_gr]);
-
- EXIT;
-out_free_doa:
- obdo_free(doa);
-}
-
-/* returns a negative error or a nonnegative number of files to create */
-static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
- obd_gr group)
-{
- struct obd_device *obd = exp->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- int diff, rc;
- ENTRY;
-
- diff = oa->o_id - filter_last_id(filter, oa->o_gr);
- CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
- filter_last_id(filter, oa->o_gr), diff);
-
- /* delete orphans request */
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- (oa->o_flags & OBD_FL_DELORPHAN)) {
- if (diff >= 0)
- RETURN(diff);
- if (-diff > OST_MAX_PRECREATE) {
- CERROR("ignoring bogus orphan destroy request: obdid "
- LPU64" last_id "LPU64"\n",
- oa->o_id, filter_last_id(filter, oa->o_gr));
- RETURN(-EINVAL);
- }
- filter_destroy_precreated(exp, oa, filter);
- rc = filter_update_last_objid(obd, group, 0);
- if (rc)
- CERROR("unable to write lastobjid, but orphans"
- "were deleted\n");
- RETURN(0);
- } else {
- /* only precreate if group == 0 and o_id is specfied */
- if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
- (/*group != 0 ||*/ oa->o_id == 0))
- RETURN(1);
-
- LASSERTF(diff >= 0, LPU64" - "LPU64" = %d\n", oa->o_id,
- filter_last_id(filter, oa->o_gr), diff);
- RETURN(diff);
- }
-}
-static int filter_precreate_rec(struct obd_device *obd, struct dentry *dentry,
- int *number, struct obdo *oa)
-{
- int rc;
- ENTRY;
-
- rc = fsfilt_precreate_rec(obd, dentry, number, oa);
-
- RETURN(rc);
-}
-
static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
RETURN(rc);
}
-/* We rely on the fact that only one thread will be creating files in a given
- * group at a time, which is why we don't need an atomic filter_get_new_id.
- * Even if we had that atomic function, the following race would exist:
- *
- * thread 1: gets id x from filter_next_id
- * thread 2: gets id (x + 1) from filter_next_id
- * thread 2: creates object (x + 1)
- * thread 1: tries to create object x, gets -ENOSPC
- */
-static int filter_precreate(struct obd_device *obd, struct obdo *oa,
- obd_gr group, int *num)
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+ obd_gr group)
{
- struct dentry *dchild = NULL, *dparent = NULL;
- int err = 0, rc = 0, recreate_obj = 0, i;
+ struct dentry *dparent = NULL;
+ struct dentry *dchild = NULL;
struct filter_obd *filter;
+ struct obd_statfs *osfs;
+ int cleanup_phase = 0;
+ int err = 0, rc = 0;
void *handle = NULL;
void *lock = NULL;
- struct obd_statfs *osfs;
- unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
- __u64 next_id;
ENTRY;
filter = &obd->u.filter;
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- recreate_obj = 1;
- } else {
- OBD_ALLOC(osfs, sizeof(*osfs));
- if (osfs == NULL)
- RETURN(-ENOMEM);
- rc = filter_statfs(obd, osfs, jiffies-HZ);
- if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
- CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
- osfs->os_bavail<<filter->fo_sb->s_blocksize_bits);
- *num = 0;
- rc = -ENOSPC;
- }
- OBD_FREE(osfs, sizeof(*osfs));
- if (rc) {
- RETURN(rc);
- }
+ OBD_ALLOC(osfs, sizeof(*osfs));
+ if (osfs == NULL)
+ RETURN(-ENOMEM);
+ rc = filter_statfs(obd, osfs, jiffies - HZ);
+ if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+ CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+ osfs->os_bavail << filter->fo_sb->s_blocksize_bits);
+ rc = -ENOSPC;
}
-
- CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
+ OBD_FREE(osfs, sizeof(*osfs));
+ if (rc)
+ RETURN(rc);
down(&filter->fo_create_locks[group]);
- for (i = 0; i < *num && err == 0; i++) {
- int cleanup_phase = 0;
+ if (test_bit(group, &filter->fo_destroys_in_progress)) {
+ CWARN("%s: precreate aborted by destroy\n",
+ obd->obd_name);
+ GOTO(out, rc = -EALREADY);
+ }
- if (test_bit(group, &filter->fo_destroys_in_progress)) {
- CWARN("%s: precreate aborted by destroy\n",
- obd->obd_name);
- break;
- }
+ CDEBUG(D_INFO, "precreate objid "LPU64"\n", oa->o_id);
- if (recreate_obj) {
- __u64 last_id;
- next_id = oa->o_id;
- last_id = filter_last_id(filter, group);
- if (next_id > last_id) {
- CERROR("Error: Trying to recreate obj greater"
- "than last id "LPD64" > "LPD64"\n",
- next_id, last_id);
- GOTO(cleanup, rc = -EINVAL);
- }
- } else {
- next_id = filter_last_id(filter, group) + 1;
- }
+ dparent = filter_parent_lock(obd, group, oa->o_id, &lock);
+ if (IS_ERR(dparent))
+ GOTO(cleanup, rc = PTR_ERR(dparent));
+ cleanup_phase = 1;
- CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
-
- dparent = filter_parent_lock(obd, group, next_id, &lock);
- if (IS_ERR(dparent))
- GOTO(cleanup, rc = PTR_ERR(dparent));
- cleanup_phase = 1;
-
- /* precreate objects are not logged */
- fsfilt_set_fs_flags(obd, dparent->d_inode, SM_PRECREATE);
-
- dchild = filter_id2dentry(obd, dparent, group, next_id);
- if (IS_ERR(dchild))
- GOTO(cleanup, rc = PTR_ERR(dchild));
- cleanup_phase = 2;
-
- if (dchild->d_inode != NULL) {
- /* This would only happen if lastobjid was bad on disk*/
- /* Could also happen if recreating missing obj but
- * already exists
- */
- if (recreate_obj) {
- CERROR("%s: recreating existing object %.*s?\n",
- obd->obd_name, dchild->d_name.len,
- dchild->d_name.name);
- } else {
- CERROR("%s: Serious error: objid %.*s already "
- "exists; is this filesystem corrupt?\n",
- obd->obd_name, dchild->d_name.len,
- dchild->d_name.name);
- LBUG();
- }
- GOTO(cleanup, rc = -EEXIST);
- }
+ dchild = filter_id2dentry(obd, dparent, group, oa->o_id);
+ if (IS_ERR(dchild))
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ cleanup_phase = 2;
+
+ if (dchild->d_inode != NULL) {
+ CERROR("%s: serious error: objid %.*s already "
+ "exists; is this filesystem corrupted?\n",
+ obd->obd_name, dchild->d_name.len,
+ dchild->d_name.name);
+ GOTO(cleanup, rc = -EEXIST);
+ }
+
+ handle = fsfilt_start_log(obd, dparent->d_inode,
+ FSFILT_OP_CREATE, NULL, 1);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+ cleanup_phase = 3;
- handle = fsfilt_start_log(obd, dparent->d_inode,
- FSFILT_OP_CREATE, NULL, 1);
- if (IS_ERR(handle))
- GOTO(cleanup, rc = PTR_ERR(handle));
- cleanup_phase = 3;
+ rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+ if (rc) {
+ CERROR("create failed rc = %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+
+ fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
+
+ if (oa->o_id > filter_last_id(filter, group)) {
+ /*
+ * save the last created object id; it will be needed during
+ * recovery for deleting orphans.
+ */
+ filter_set_last_id(filter, group, oa->o_id);
- rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+ rc = filter_update_last_objid(obd, group, 0);
if (rc) {
- CERROR("create failed rc = %d\n", rc);
- GOTO(cleanup, rc);
+ CERROR("unable to write lastobjid, but "
+ "orphans were deleted, err = %d\n",
+ rc);
+ rc = 0;
}
-
- if (!recreate_obj) {
- filter_set_last_id(filter, group, next_id);
- err = filter_update_last_objid(obd, group, 0);
- if (err)
- CERROR("unable to write lastobjid "
- "but file created\n");
+ }
+cleanup:
+ switch(cleanup_phase) {
+ case 3:
+ err = fsfilt_commit(obd, filter->fo_sb,
+ dparent->d_inode, handle, 0);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
+ if (!rc)
+ rc = err;
}
- fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
+ case 2:
+ f_dput(dchild);
+ case 1:
+ filter_parent_unlock(dparent, lock);
+ case 0:
+ break;
+ }
+
+ if (rc)
+ GOTO(out, rc);
+
+out:
+ up(&filter->fo_create_locks[group]);
+ RETURN(rc);
+}
+
+struct dentry *filter_crow_object(struct obd_device *obd,
+ __u64 ogr, __u64 oid)
+{
+ struct dentry *dentry;
+ struct obdo *oa;
+ int rc = 0;
+ ENTRY;
+
+ /* check if object is already allocated */
+ dentry = filter_id2dentry(obd, NULL, ogr, oid);
+ if (IS_ERR(dentry))
+ RETURN(dentry);
+
+ if (dentry->d_inode)
+ RETURN(dentry);
+
+ f_dput(dentry);
- cleanup:
- switch(cleanup_phase) {
- case 3:
- err = fsfilt_commit(obd, filter->fo_sb,
- dparent->d_inode, handle, 0);
- if (err) {
- CERROR("error on commit, err = %d\n", err);
- if (!rc)
- rc = err;
- }
- case 2:
- f_dput(dchild);
- case 1:
- filter_parent_unlock(dparent, lock);
- case 0:
- break;
- }
+ /* allocate object as it does not exist */
+ oa = obdo_alloc();
+ if (oa == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
- if (rc)
- break;
- if (time_after(jiffies, enough_time)) {
- CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
- obd->obd_name, *num, i);
- break;
- }
+ oa->o_id = oid;
+ oa->o_gr = ogr;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+ CDEBUG(D_INODE, "OSS object "LPU64"/"LPU64
+ " does not exists - allocate now\n",
+ oid, ogr);
+
+ rc = filter_create_object(obd, oa, oa->o_gr);
+ if (rc) {
+ CERROR("cannot create OSS object "LPU64"/"LPU64
+ ", err = %d\n", oa->o_id, oa->o_gr, rc);
+ GOTO(out_free_oa, dentry = ERR_PTR(rc));
}
- *num = i;
+ /* look up the just-created object and return it to the caller */
+ dentry = filter_id2dentry(obd, NULL, ogr, oid);
+ if (IS_ERR(dentry))
+ GOTO(out_free_oa, dentry);
+
+ if (dentry->d_inode == NULL) {
+ f_dput(dentry);
+ dentry = ERR_PTR(-ENOENT);
+ CERROR("cannot find just created OSS object "
+ LPU64"/"LPU64" err = %d\n", oid,
+ ogr, (int)PTR_ERR(dentry));
+ GOTO(out_free_oa, dentry);
+ }
- /* check if we have an error after ll_vfs_create(). It is possible that
- * there will be say -ENOSPC and we will leak it. */
- if (rc == 0)
- rc = filter_precreate_rec(obd, dparent, num, oa);
+ EXIT;
+out_free_oa:
+ obdo_free(oa);
+ return dentry;
+}
- up(&filter->fo_create_locks[group]);
+static int
+filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
+{
+ struct obd_device *obd = NULL;
+ struct filter_obd *filter;
+ struct obdo *doa = NULL;
+ int rc = 0, orphans;
+ __u64 last, id;
+ ENTRY;
+
+ LASSERT(oa);
+ LASSERT(oa->o_gr != 0);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+ obd = exp->exp_obd;
+ filter = &obd->u.filter;
- CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
- obd->obd_name, group, filter->fo_last_objids[group]);
+ last = filter_last_id(filter, oa->o_gr);
+ orphans = last - oa->o_id;
+
+ if (orphans <= 0)
+ RETURN(0);
+
+ doa = obdo_alloc();
+ if (doa == NULL)
+ RETURN(-ENOMEM);
- CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
- obd->obd_name, i);
+ doa->o_gr = oa->o_gr;
+ doa->o_mode = S_IFREG;
+ doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
- RETURN(rc);
+ set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+ down(&filter->fo_create_locks[doa->o_gr]);
+ if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
+ CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
+ exp->exp_obd->obd_name, doa->o_gr);
+ up(&filter->fo_create_locks[doa->o_gr]);
+ GOTO(out_free_doa, 0);
+ }
+
+ CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "
+ LPU64"\n", exp->exp_obd->obd_name, doa->o_gr,
+ oa->o_id + 1, last);
+
+ for (id = oa->o_id + 1; id <= last; id++) {
+ doa->o_id = id;
+ filter_destroy(exp, doa, NULL, NULL);
+ }
+
+ CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
+ LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
+
+ filter_set_last_id(filter, oa->o_gr, oa->o_id);
+ clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+ up(&filter->fo_create_locks[oa->o_gr]);
+
+ EXIT;
+out_free_doa:
+ obdo_free(doa);
+ return rc;
}
-static int filter_create(struct obd_export *exp, struct obdo *oa,
- void *acl, int acl_size,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+/*
+ * for now this function is only needed as the entry point for deleting
+ * orphans on the OSS, as objects are created on first write attempt. --umka
+ */
+static int
+filter_create(struct obd_export *exp, struct obdo *oa, void *acl,
+ int acl_size, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
{
+ struct filter_export_data *fed;
struct obd_device *obd = NULL;
- struct filter_obd *filter;
+ int group = oa->o_gr, rc = 0;
struct lvfs_run_ctxt saved;
- struct lov_stripe_md *lsm = NULL;
- struct filter_export_data *fed;
+ struct filter_obd *filter;
char str[PTL_NALFMT_SIZE];
- int group = oa->o_gr, rc = 0, diff, recreate_objs = 0;
ENTRY;
LASSERT(acl == NULL && acl_size == 0);
RETURN(-EINVAL);
}
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- (oa->o_flags & OBD_FL_RECREATE_OBJS))
- recreate_objs = 1;
-
obd = exp->exp_obd;
fed = &exp->exp_filter_data;
filter = &obd->u.filter;
- if (fed->fed_group != group && !recreate_objs &&
- !(oa->o_valid & OBD_MD_REINT)) {
+ if (fed->fed_group != group) {
portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
exp->exp_connection->c_peer.peer_id.nid, str);
- CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+ CERROR("!!! this export (nid "LPX64"/%s) used object group %d "
"earlier; now it's trying to use group %d! This could "
"be a bug in the MDS. Tell CFS.\n",
exp->exp_connection->c_peer.peer_id.nid, str,
CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
group, oa->o_id);
- if (ea != NULL) {
- lsm = *ea;
- if (lsm == NULL) {
- rc = obd_alloc_memmd(exp, &lsm);
- if (rc < 0)
- RETURN(rc);
- }
- }
obd = exp->exp_obd;
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- if (oa->o_valid & OBD_MD_REINT) {
- int num = *((int*)oa->o_inline);
- rc = filter_precreate(obd, oa, oa->o_gr, &num);
- } else if (recreate_objs) {
- if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
- CERROR("recreate objid "LPU64" > last id "LPU64"\n",
- oa->o_id, filter_last_id(&obd->u.filter, group));
- rc = -EINVAL;
- } else {
- diff = 1;
- rc = filter_precreate(obd, oa, group, &diff);
- }
+ LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags == OBD_FL_DELORPHAN));
+
+ rc = filter_clear_orphans(exp, oa);
+ if (rc) {
+ CERROR("cannot clear orphanes starting from "
+ LPU64", err = %d\n", oa->o_id, rc);
} else {
- diff = filter_should_precreate(exp, oa, group);
- if (diff > 0) {
- oa->o_id = filter_last_id(&obd->u.filter, group);
- rc = filter_precreate(obd, oa, group, &diff);
- oa->o_id = filter_last_id(&obd->u.filter, oa->o_gr);
- oa->o_valid = OBD_MD_FLID;
+ rc = filter_update_last_objid(obd, group, 0);
+ if (rc) {
+ CERROR("unable to write lastobjid, but "
+ "orphans were deleted, err = %d\n",
+ rc);
}
}
-
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- if (rc && ea != NULL && *ea != lsm) {
- obd_free_memmd(exp, &lsm);
- } else if (rc == 0 && ea != NULL) {
- /* XXX LOV STACKING: the lsm that is passed to us from
- * LOV does not have valid lsm_oinfo data structs, so
- * don't go touching that. This needs to be fixed in a
- * big way. */
- lsm->lsm_object_id = oa->o_id;
- lsm->lsm_object_gr = oa->o_gr;
- *ea = lsm;
- }
-
- RETURN(rc);
+
+ RETURN(0);
}
static int filter_destroy(struct obd_export *exp, struct obdo *oa,
int filter_common_setup(struct obd_device *, obd_count len, void *buf,
char *option);
+struct dentry *filter_crow_object(struct obd_device *obd, __u64 ogr,
+ __u64 oid);
+
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+ struct obdo *oa, struct obd_trans_info *oti);
+
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *md, struct obd_trans_info *oti);
+
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+ obd_gr group);
+
/* filter_lvb.c */
extern struct ldlm_valblock_ops filter_lvbo;
if (rc)
GOTO(cleanup, rc);
- dentry = filter_oa2dentry(obd, oa);
+ dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id);
if (IS_ERR(dentry))
GOTO(cleanup, rc = PTR_ERR(dentry));
- if (dentry->d_inode == NULL) {
- CERROR("trying to BRW to non-existent file "LPU64"\n",
- obj->ioo_id);
- GOTO(cleanup, rc = -ENOENT);
- }
-
inode = dentry->d_inode;
fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
lnb->len = rnb->len;
lnb->flags = rnb->flags;
- if (inode->i_size <= rnb->offset)
- /* If there's no more data, abort early.
- * lnb->page == NULL and lnb->rc == 0, so it's
- * easy to detect later. */
+ if ((inode && inode->i_size <= rnb->offset) || inode == NULL)
+ /*
+ * if there's no more data, abort early. lnb->page ==
+ * NULL and lnb->rc == 0, so it's easy to detect later.
+ */
break;
- else
- rc = filter_alloc_dio_page(obd, inode, lnb);
+
+ rc = filter_alloc_dio_page(obd, inode, lnb);
if (rc) {
CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
"page err %u@"LPU64" %u/%u %p: rc %d\n",
fsfilt_check_slow(now, obd_timeout, "start_page_read");
- rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
- NULL, NULL, NULL);
- if (rc)
- GOTO(cleanup, rc);
-
- lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
+ if (inode != NULL) {
+ rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+ exp, NULL, NULL, NULL);
+ if (rc)
+ GOTO(cleanup, rc);
+ }
+ lprocfs_counter_add(obd->obd_stats,
+ LPROC_FILTER_READ_BYTES, tot_bytes);
filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
EXIT;
-
cleanup:
- if (rc != 0) {
- filter_free_dio_pages(objcount, obj, niocount, res);
-
- if (dentry != NULL)
- f_dput(dentry);
- else
- CERROR("NULL dentry in cleanup -- tell CFS\n");
+ if (rc) {
+ filter_free_dio_pages(objcount, obj,
+ niocount, res);
+ /*
+ * in other cases (no errors) dentry is released in
+ * filter_commitrw_read().
+ */
+ f_dput(dentry);
}
if (iobuf != NULL)
filter_free_iobuf(iobuf);
pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
if (rc)
CERROR("io error %d\n", rc);
+
return rc;
}
struct niobuf_local *res,
struct obd_trans_info *oti)
{
+ int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+ struct niobuf_local *lnb = res;
+ struct dentry *dentry = NULL;
+ unsigned long now = jiffies;
struct lvfs_run_ctxt saved;
struct niobuf_remote *rnb;
- struct niobuf_local *lnb = res;
struct fsfilt_objinfo fso;
- struct dentry *dentry = NULL;
- void *iobuf;
+ struct obd_device *obd;
obd_size left;
- unsigned long now = jiffies;
- int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+ void *iobuf;
+
ENTRY;
LASSERT(objcount == 1);
LASSERT(obj->ioo_bufcnt > 0);
GOTO(cleanup, rc);
cleanup_phase = 1;
- push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
- dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
- obj->ioo_id);
+ obd = exp->exp_obd;
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ /* make sure that object is already allocated */
+ dentry = filter_crow_object(obd, obj->ioo_gr,
+ obj->ioo_id);
+
if (IS_ERR(dentry))
GOTO(cleanup, rc = PTR_ERR(dentry));
-
+
cleanup_phase = 2;
-
- if (dentry->d_inode == NULL) {
- CERROR("trying to BRW to non-existent file "LPU64"\n",
- obj->ioo_id);
- GOTO(cleanup, rc = -ENOENT);
- }
+ /*
+ * setting attrs passed along with write requests (owner/group). We
+ * are doing it here as the object should not exist with the wrong
+ * owner/group, since that may break quotas. --umka
+ */
+ rc = filter_setattr_internal(exp, dentry, oa, NULL);
+ if (rc) {
+ CERROR("cannot set attrs on write, err %d\n",
+ rc);
+ GOTO(cleanup, rc);
+ }
+
fso.fso_dentry = dentry;
fso.fso_bufcnt = obj->ioo_bufcnt;
fsfilt_check_slow(now, obd_timeout, "preprw_write setup");
- spin_lock(&exp->exp_obd->obd_osfs_lock);
+ spin_lock(&obd->obd_osfs_lock);
if (oa)
filter_grant_incoming(exp, oa);
* o_valid here. */
oa->o_valid = 0;
- spin_unlock(&exp->exp_obd->obd_osfs_lock);
+ spin_unlock(&obd->obd_osfs_lock);
if (rc)
GOTO(cleanup, rc);
lnb->len = rnb->len;
lnb->flags = rnb->flags;
- rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
+ rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb);
if (rc) {
CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
lnb->len, lnb->offset,
* asked to read unmapped blocks -- brw_kiovec() does this. */
if (lnb->len != PAGE_SIZE) {
if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
- filter_iobuf_add_page(exp->exp_obd, iobuf,
- dentry->d_inode,
+ filter_iobuf_add_page(obd, iobuf, dentry->d_inode,
lnb->page);
} else {
memset(kmap(lnb->page) + lnb->len, 0,
fsfilt_check_slow(now, obd_timeout, "start_page_write");
- lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
tot_bytes);
EXIT;
cleanup:
if (rc)
filter_free_dio_pages(objcount, obj, niocount, res);
case 3:
- pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
filter_free_iobuf(iobuf);
case 2:
- if (rc)
+ if (rc && dentry && !IS_ERR(dentry))
f_dput(dentry);
break;
case 1:
- spin_lock(&exp->exp_obd->obd_osfs_lock);
+ spin_lock(&obd->obd_osfs_lock);
if (oa)
filter_grant_incoming(exp, oa);
- spin_unlock(&exp->exp_obd->obd_osfs_lock);
- pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ spin_unlock(&obd->obd_osfs_lock);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
filter_free_iobuf(iobuf);
break;
default:;
/* Called with res->lr_lvb_sem held */
static int filter_lvbo_init(struct ldlm_resource *res)
{
- int rc = 0;
- struct obdo *oa = NULL;
struct ost_lvb *lvb = NULL;
+ struct filter_obd *filter;
struct obd_device *obd;
struct dentry *dentry;
+ __u64 ogr, oid;
+ int rc = 0;
ENTRY;
LASSERT(res);
res->lr_lvb_len = sizeof(*lvb);
obd = res->lr_namespace->ns_lvbp;
+ filter = &obd->u.filter;
LASSERT(obd != NULL);
- oa = obdo_alloc();
- if (oa == NULL)
- GOTO(out, rc = -ENOMEM);
-
- oa->o_id = res->lr_name.name[0];
- oa->o_gr = res->lr_name.name[2];
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+ oid = res->lr_name.name[0];
+ ogr = res->lr_name.name[2];
- dentry = filter_oa2dentry(obd, oa);
+ dentry = filter_id2dentry(obd, NULL, ogr, oid);
if (IS_ERR(dentry))
GOTO(out, rc = PTR_ERR(dentry));
- /* Limit the valid bits in the return data to what we actually use */
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
- obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
- f_dput(dentry);
-
- lvb->lvb_size = dentry->d_inode->i_size;
- lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
- lvb->lvb_blocks = dentry->d_inode->i_blocks;
+ if (dentry->d_inode == NULL) {
+ lvb->lvb_size = 0;
+ lvb->lvb_blocks = 0;
+ lvb->lvb_mtime = LTIME_S(CURRENT_TIME);
+ } else {
+ lvb->lvb_size = dentry->d_inode->i_size;
+ lvb->lvb_blocks = dentry->d_inode->i_blocks;
+ lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+ }
CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", "
- "mtime: "LPU64", blocks: "LPU64"\n",
- res->lr_name.name[0], lvb->lvb_size,
- lvb->lvb_mtime, lvb->lvb_blocks);
+ "mtime: "LPU64", blocks: "LPU64"\n", res->lr_name.name[0],
+ lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_blocks);
- out:
- if (oa)
- obdo_free(oa);
- /* Don't free lvb data on lookup error */
+ f_dput(dentry);
+ EXIT;
+out:
+ /* don't free lvb data on lookup error */
return rc;
}
static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
int buf_idx, int increase)
{
- int rc = 0;
- struct obdo *oa = NULL;
struct ost_lvb *lvb = res->lr_lvb_data;
struct obd_device *obd;
+ struct obdo *oa = NULL;
struct dentry *dentry;
+ int rc = 0;
ENTRY;
LASSERT(res);
return rc;
}
-int osc_rd_create_count(char *page, char **start, off_t off, int count,
- int *eof, void *data)
-{
- struct obd_device *obd = data;
-
- if (obd == NULL)
- return 0;
-
- return snprintf(page, count, "%d\n",
- obd->u.cli.cl_oscc.oscc_grow_count);
-}
-
-int osc_wr_create_count(struct file *file, const char *buffer,
- unsigned long count, void *data)
-{
- struct obd_device *obd = data;
- int val, rc;
-
- if (obd == NULL)
- return 0;
-
- rc = lprocfs_write_helper(buffer, count, &val);
- if (rc)
- return rc;
-
- if (val < 0)
- return -ERANGE;
-
- obd->u.cli.cl_oscc.oscc_grow_count = val;
-
- return count;
-}
-
-int osc_rd_prealloc_next_id(char *page, char **start, off_t off, int count,
- int *eof, void *data)
-{
- struct obd_device *obd = data;
-
- if (obd == NULL)
- return 0;
-
- return snprintf(page, count, LPU64"\n",
- obd->u.cli.cl_oscc.oscc_next_id);
-}
-
-int osc_rd_prealloc_last_id(char *page, char **start, off_t off, int count,
- int *eof, void *data)
-{
- struct obd_device *obd = data;
-
- if (obd == NULL)
- return 0;
-
- return snprintf(page, count, LPU64"\n",
- obd->u.cli.cl_oscc.oscc_last_id);
-}
-
static struct lprocfs_vars lprocfs_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "blocksize", lprocfs_rd_blksize, 0, 0 },
{ "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 },
{ "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 },
{ "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 },
- { "create_count", osc_rd_create_count, osc_wr_create_count, 0 },
- { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 },
- { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 },
{ 0 }
};
#include <linux/obd_class.h>
#include "osc_internal.h"
-static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
-{
- struct osc_creator *oscc;
- struct ost_body *body = NULL;
- ENTRY;
-
- if (req->rq_repmsg) {
- body = lustre_swab_repbuf(req, 0, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL && rc == 0)
- rc = -EPROTO;
- }
-
- oscc = req->rq_async_args.pointer_arg[0];
- spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
- if (rc == -ENOSPC || rc == -EROFS) {
- oscc->oscc_flags |= OSCC_FLAG_NOSPC;
- if (body && rc == -ENOSPC) {
- oscc->oscc_grow_count = OST_MIN_PRECREATE;
- oscc->oscc_last_id = body->oa.o_id;
- }
- spin_unlock(&oscc->oscc_lock);
- DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
- } else if (rc != 0 && rc != -EIO) {
- oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
- oscc->oscc_grow_count = OST_MIN_PRECREATE;
- spin_unlock(&oscc->oscc_lock);
- DEBUG_REQ(D_ERROR, req,
- "unknown rc %d from async create: failing oscc", rc);
- ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
- } else {
- if (rc == 0) {
- oscc->oscc_flags &= ~OSCC_FLAG_LOW;
- if (body) {
- int diff = body->oa.o_id - oscc->oscc_last_id;
- if (diff != oscc->oscc_grow_count)
- oscc->oscc_grow_count =
- max(diff/3, OST_MIN_PRECREATE);
- oscc->oscc_last_id = body->oa.o_id;
- }
- }
- spin_unlock(&oscc->oscc_lock);
- }
-
- CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n",
- oscc->oscc_last_id, oscc->oscc_next_id);
-
- wake_up(&oscc->oscc_waitq);
- RETURN(rc);
-}
-
-static int oscc_internal_create(struct osc_creator *oscc)
-{
- struct ptlrpc_request *request;
- struct ost_body *body;
- int size = sizeof(*body);
- ENTRY;
-
- spin_lock(&oscc->oscc_lock);
- if (oscc->oscc_grow_count < OST_MAX_PRECREATE &&
- !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) &&
- (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
- (oscc->oscc_grow_count / 4 + 1)) {
- oscc->oscc_flags |= OSCC_FLAG_LOW;
- oscc->oscc_grow_count *= 2;
- }
-
- if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2)
- oscc->oscc_grow_count = OST_MAX_PRECREATE / 2;
-
- if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
- oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
- spin_unlock(&oscc->oscc_lock);
- RETURN(0);
- }
- oscc->oscc_flags |= OSCC_FLAG_CREATING;
- spin_unlock(&oscc->oscc_lock);
-
- request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
- LUSTRE_OBD_VERSION, OST_CREATE,
- 1, &size, NULL);
- if (request == NULL) {
- spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
- spin_unlock(&oscc->oscc_lock);
- RETURN(-ENOMEM);
- }
-
- request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
- body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
-
- spin_lock(&oscc->oscc_lock);
- body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
- /* probably we should take frequence of request into account? -bzzz */
- if (oscc->oscc_grow_count < oscc->oscc_max_grow_count) {
- oscc->oscc_grow_count *= 2;
- if (oscc->oscc_grow_count > oscc->oscc_max_grow_count)
- oscc->oscc_grow_count = oscc->oscc_max_grow_count;
- }
- body->oa.o_gr = oscc->oscc_gr;
- LASSERT(body->oa.o_gr > 0);
- body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
- spin_unlock(&oscc->oscc_lock);
- CDEBUG(D_INFO, "preallocating through id "LPU64" (last used "LPU64")\n",
- body->oa.o_id, oscc->oscc_next_id);
-
- request->rq_replen = lustre_msg_size(1, &size);
-
- request->rq_async_args.pointer_arg[0] = oscc;
- request->rq_interpret_reply = osc_interpret_create;
- ptlrpcd_add_req(request);
-
- RETURN(0);
-}
-
-static int oscc_has_objects(struct osc_creator *oscc, int count)
-{
- int have_objs;
- spin_lock(&oscc->oscc_lock);
- have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
- spin_unlock(&oscc->oscc_lock);
-
- if (!have_objs)
- oscc_internal_create(oscc);
-
- return have_objs;
-}
-
-static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
-{
- int have_objs;
- int ost_full;
- int osc_invalid;
-
- have_objs = oscc_has_objects(oscc, count);
-
- spin_lock(&oscc->oscc_lock);
- ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
- spin_unlock(&oscc->oscc_lock);
-
- osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
-
- return have_objs || ost_full || osc_invalid;
-}
-
-static int oscc_precreate(struct osc_creator *oscc, int wait)
-{
- struct l_wait_info lwi = { 0 };
- int rc = 0;
- ENTRY;
-
- if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
- RETURN(0);
-
- if (!wait)
- RETURN(0);
-
- /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
- l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
-
- if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
- rc = -ENOSPC;
-
- if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
- rc = -EIO;
-
- RETURN(rc);
-}
-
-int oscc_recovering(struct osc_creator *oscc)
-{
- int recov = 0;
-
- spin_lock(&oscc->oscc_lock);
- recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
- spin_unlock(&oscc->oscc_lock);
-
- return recov;
-}
-
+/* this is now used only for deleting orphans */
int osc_create(struct obd_export *exp, struct obdo *oa,
- void *acl, int acl_size,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+ void *acl, int acl_size, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
{
- struct lov_stripe_md *lsm;
struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
- int try_again = 1, rc = 0;
+ int rc = 0;
ENTRY;
+
LASSERT(oa);
LASSERT(ea);
- LASSERT(oa->o_valid & OBD_MD_FLGROUP);
LASSERT(oa->o_gr > 0);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
LASSERT(acl == NULL && acl_size == 0);
- if ((oa->o_valid & OBD_MD_FLFLAGS) &&
- oa->o_flags == OBD_FL_RECREATE_OBJS) {
- /* Exceptional case where we are trying to repair missing
- * objects for various groups. We have already validated that
- * this is a valid group for the file. Don't set oscc->oscc_gr.
- */
- RETURN(osc_real_create(exp, oa, ea, oti));
- }
-
- LASSERT(oscc->oscc_gr == 0 || oscc->oscc_gr == oa->o_gr);
- oscc->oscc_gr = oa->o_gr;
-
if (oa->o_gr == FILTER_GROUP_LLOG || oa->o_gr == FILTER_GROUP_ECHO)
RETURN(osc_real_create(exp, oa, ea, oti));
exp->exp_obd->obd_name);
LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING);
- /* delete from next_id on up */
- oa->o_valid |= OBD_MD_FLID;
- oa->o_id = oscc->oscc_next_id - 1;
-
CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n",
oscc->oscc_obd->obd_name, oa->o_id);
if (rc == -ENOSPC)
oscc->oscc_flags |= OSCC_FLAG_NOSPC;
oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
- oscc->oscc_last_id = oa->o_id;
-
CDEBUG(D_HA, "%s: oscc recovery finished: %d\n",
oscc->oscc_obd->obd_name, rc);
- wake_up(&oscc->oscc_waitq);
-
} else {
CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
oscc->oscc_obd->obd_name, rc);
}
spin_unlock(&oscc->oscc_lock);
-
RETURN(rc);
}
- lsm = *ea;
- if (lsm == NULL) {
- rc = obd_alloc_memmd(exp, &lsm);
- if (rc < 0)
- RETURN(rc);
- }
-
- while (try_again) {
- /* If orphans are being recovered, then we must wait until
- it is finished before we can continue with create. */
- if (oscc_recovering(oscc)) {
- struct l_wait_info lwi;
-
- CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n",
- oscc);
- lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL);
- rc = l_wait_event(oscc->oscc_waitq,
- !oscc_recovering(oscc), &lwi);
-
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == -ETIMEDOUT) {
- CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
- oscc);
- RETURN(rc);
- }
- CDEBUG(D_HA, "%s: oscc recovery over, waking up\n",
- exp->exp_obd->obd_name);
- }
-
- spin_lock(&oscc->oscc_lock);
- if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
- spin_unlock(&oscc->oscc_lock);
- break;
- }
-
- if (oscc->oscc_last_id >= oscc->oscc_next_id) {
- memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
- oa->o_id = oscc->oscc_next_id;
- oa->o_gr = oscc->oscc_gr;
- lsm->lsm_object_id = oscc->oscc_next_id;
- lsm->lsm_object_gr = oscc->oscc_gr;
- *ea = lsm;
- oscc->oscc_next_id++;
- try_again = 0;
- } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
- rc = -ENOSPC;
- spin_unlock(&oscc->oscc_lock);
- break;
- }
- spin_unlock(&oscc->oscc_lock);
- rc = oscc_precreate(oscc, try_again);
- if (rc)
- break;
- }
-
- if (rc == 0)
- CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
- oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
- lsm->lsm_object_id);
- else if (*ea == NULL)
- obd_free_memmd(exp, &lsm);
- RETURN(rc);
+ LBUG();
+ RETURN(0);
}
void oscc_init(struct obd_device *obd)
return;
oscc = &obd->u.cli.cl_oscc;
-
memset(oscc, 0, sizeof(*oscc));
- INIT_LIST_HEAD(&oscc->oscc_list);
- init_waitqueue_head(&oscc->oscc_waitq);
- spin_lock_init(&oscc->oscc_lock);
- oscc->oscc_obd = obd;
- oscc->oscc_kick_barrier = 100;
- oscc->oscc_max_grow_count = 2000;
- oscc->oscc_grow_count = OST_MIN_PRECREATE;
- oscc->oscc_next_id = 2;
- oscc->oscc_last_id = 1;
+ oscc->oscc_obd = obd;
+ spin_lock_init(&oscc->oscc_lock);
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
- /* XXX the export handle should give the oscc the last object */
- /* oed->oed_oscc.oscc_last_id = exph->....; */
}
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out, rc);
-
- body = lustre_swab_repbuf(request, 0, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL)
- GOTO(out, rc = -EPROTO);
+ if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
+ ptlrpcd_add_req(request);
+ rc = 0;
+ } else {
+ rc = ptlrpc_queue_wait(request);
+ if (rc)
+ GOTO(out, rc);
- memcpy(oa, &body->oa, sizeof(*oa));
+ body = lustre_swab_repbuf(request, 0, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL)
+ GOTO(out, rc = -EPROTO);
+ memcpy(oa, &body->oa, sizeof(*oa));
+ }
EXIT;
out:
ptlrpc_req_finished(request);
}
static int osc_sync(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, obd_size start, obd_size end)
+ struct lov_stripe_md *md, obd_size start,
+ obd_size end)
{
struct ptlrpc_request *request;
struct ost_body *body;
memcpy(&body->oa, oa, sizeof(*oa));
request->rq_replen = lustre_msg_size(1, &size);
- if (oti != NULL && oti->oti_async) {
- /* asynchrounous destroy */
+ if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
ptlrpcd_add_req(request);
rc = 0;
} else {
osc_wake_cache_waiters(cli);
osc_check_rpcs(cli);
-
spin_unlock(&cli->cl_loi_list_lock);
obdo_free(aa->aa_oa);
int rc = 0;
ENTRY;
- if (keylen == strlen("next_id") &&
- memcmp(key, "next_id", strlen("next_id")) == 0) {
- if (vallen != sizeof(obd_id))
- RETURN(-EINVAL);
- obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
- CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
- exp->exp_obd->obd_name,
- obd->u.cli.cl_oscc.oscc_next_id);
-
- RETURN(0);
- }
-
- if (keylen == strlen("growth_count") &&
- memcmp(key, "growth_count", strlen("growth_count")) == 0) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- obd->u.cli.cl_oscc.oscc_max_grow_count = *((int*)val);
- RETURN(0);
- }
-
if (keylen == strlen("unlinked") &&
memcmp(key, "unlinked", keylen) == 0) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
RETURN(0);
}
- if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
+ if (keylen == strlen("async") &&
+ memcmp(key, "async", keylen) == 0) {
struct client_obd *cl = &obd->u.cli;
if (vallen != sizeof(int))
RETURN(-EINVAL);
RETURN(-EINVAL);
}
- if (keylen < strlen("mds_conn") || memcmp(key, "mds_conn", keylen) != 0)
+ if (keylen < strlen("mds_conn") ||
+ memcmp(key, "mds_conn", keylen) != 0)
RETURN(-EINVAL);
- ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
+ ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
+ LLOG_UNLINK_ORIG_CTXT);
if (ctxt) {
if (rc == 0)
rc = llog_initiator_connect(ctxt);
else
- CERROR("cannot establish the connect for ctxt %p: %d\n",
- ctxt, rc);
+ CERROR("cannot establish the connect for "
+ "ctxt %p: %d\n", ctxt, rc);
}
imp->imp_server_timeout = 1;
};
static struct llog_operations osc_unlink_orig_logops;
+
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
struct obd_device *tgt, int count,
struct llog_catid *catid)
RETURN(rc);
}
-
static int osc_connect(struct lustre_handle *exph,
struct obd_device *obd, struct obd_uuid *cluuid,
struct obd_connect_data *data,
if (flags & SM_ALL_PLG) /* enable all plugins */
SMFS_SET(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
- if (flags & SM_PRECREATE) /* disable logs for precreated objs */
- SMFS_CLEAR(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
-
-
#if 0
if (SMFS_DO_COW(S2SMI(inode->i_sb)) && (flags & SM_DO_COW))
SMFS_SET_INODE_COW(inode);
*/
if(flags & SM_ALL_PLG) /* disable all plugins */
SMFS_CLEAR(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
- if (flags & SM_PRECREATE) /* enable log again */
- SMFS_SET(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
-
RETURN(rc);
}
mkdir -p $DIR/$tdir-2
multiop $DIR/$tdir-1/f O_c &
pid=$!
+
# give multiop a chance to open
- sleep 1
+ # 1 second does not seem to be enough; we have already hit such cases
+ # --umka
+ sleep 5
do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000107"
touch $DIR/${tdir}-2/f &
$SOCKETCLIENT $DIR/socket || error
$MUNLINK $DIR/socket
}
-run_test 54a "unix damain socket test =========================="
+run_test 54a "unix domain socket test =========================="
test_54b() {
f="$DIR/f54b"