in work with lov objdid file.
b=12702
i=umka
i=nathan
EXTRA_DIST := curproc.h kp30.h libcfs.h list.h lltrace.h \
portals_utils.h types.h user-lock.h user-prim.h user-time.h \
- user-tcpip.h
+ user-tcpip.h bitmap.h
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2007 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#ifndef _LIBCFS_BITMAP_H_
+#define _LIBCFS_BITMAP_H_
+
+
+typedef struct {
+ int size;
+ unsigned long data[0];
+} bitmap_t;
+
+#define BITMAP_SIZE(nbits) (((nbits/BITS_PER_LONG)+1)*sizeof(long)+sizeof(bitmap_t))
+
+static inline
+bitmap_t *ALLOCATE_BITMAP(int size)
+{
+ bitmap_t *ptr;
+
+ OBD_ALLOC(ptr, BITMAP_SIZE(size));
+ if (ptr == NULL)
+ RETURN(ptr);
+
+ ptr->size = size;
+ memset(ptr->data, 0, BITMAP_SIZE(size));
+
+ RETURN (ptr);
+}
+
+#define FREE_BITMAP(ptr) OBD_FREE(ptr, BITMAP_SIZE(ptr->size))
+
+static inline
+void bitmap_set(bitmap_t *bitmap, int nbit)
+{
+ set_bit(nbit, bitmap->data);
+}
+
+static inline
+void bitmap_clear(bitmap_t *bitmap, int nbit)
+{
+ clear_bit(nbit, bitmap->data);
+}
+
+static inline
+int bitmap_check(bitmap_t *bitmap, int nbit)
+{
+ int pos = nbit % BITS_PER_LONG;
+ return test_bit(pos, bitmap->data+(nbit/BITS_PER_LONG));
+}
+
+/* return 0 is bitmap has none set bits */
+static inline
+int bitmap_check_empty(bitmap_t *bitmap)
+{
+ return find_first_bit(bitmap->data, bitmap->size) == bitmap->size;
+}
+
+
+#define foreach_bit(bitmap, pos) \
+ for((pos)=find_first_bit((bitmap)->data, bitmap->size); \
+ (pos) < (bitmap)->size; \
+ (pos) = find_next_bit((bitmap)->data, (bitmap)->size, (pos)))
+
+#endif
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ * Author: Nikita Danilov <nikita@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or modify it under the
+ * terms of version 2 of the GNU General Public License as published by the
+ * Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Lustre; if not, write to the Free Software Foundation, Inc., 675 Mass
+ * Ave, Cambridge, MA 02139, USA.
+ *
+ * Implementation of portable time API for user-level.
+ *
+ */
+
+#ifndef __LIBCFS_USER_BITOPS_H__
+#define __LIBCFS_USER_BITOPS_H__
+
+unsigned long find_next_bit(const unsigned int *addr,
+ unsigned long size, unsigned long offset);
+
+unsigned long find_next_zero_bit(const unsigned int *addr,
+ unsigned long size, unsigned long offset);
+
+#define find_first_bit(addr,size) (find_next_bit((addr),(size),0))
+#define find_first_zero_bit(addr,size) (find_next_zero_bit((addr),(size),0))
+
+#endif
if LIBLUSTRE
noinst_LIBRARIES= libcfs.a
-libcfs_a_SOURCES= debug.c user-prim.c user-lock.c user-tcpip.c
+libcfs_a_SOURCES= debug.c user-prim.c user-lock.c user-tcpip.c user-bitops.c
libcfs_a_CPPFLAGS = $(LLCPPFLAGS)
libcfs_a_CFLAGS = $(LLCFLAGS)
endif
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2007 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or modify it under the
+ * terms of version 2 of the GNU General Public License as published by the
+ * Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Lustre; if not, write to the Free Software Foundation, Inc., 675 Mass
+ * Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#ifndef __KERNEL__
+
+#include <libcfs/libcfs.h>
+#include <libcfs/kp30.h>
+
+#include <string.h> /* for ffs - confirm POSIX */
+
+#define BITS_PER_WORD 32
+#define OFF_BY_START(start) ((start)/BITS_PER_WORD)
+
+unsigned long find_next_bit(const unsigned int *addr,
+ unsigned long size, unsigned long offset)
+{
+ uint32_t *word, *last;
+ unsigned int first_bit, bit, base;
+
+ word = addr + OFF_BY_START(offset);
+ last = addr + OFF_BY_START(size-1);
+ first_bit = offset % BITS_PER_WORD;
+ base = offset - first_bit;
+
+ if (offset >= size)
+ return size;
+ if (first_bit != 0) {
+ int tmp = (*word++) & (~0UL << first_bit);
+ bit = ffs(tmp);
+ if (bit < BITS_PER_WORD)
+ goto found;
+ word++;
+ base += BITS_PER_WORD;
+ }
+ while (word <= last) {
+ if (*word != 0ul) {
+ bit = ffs(*word);
+ goto found;
+ }
+ word++;
+ base += BITS_PER_WORD;
+ }
+ return size;
+found:
+ return base + bit;
+}
+
+unsigned long find_next_zero_bit(const unsigned int *addr,
+ unsigned long size, unsigned long offset)
+{
+ uint32_t *word, *last;
+ unsigned int first_bit, bit, base;
+
+ word = addr + OFF_BY_START(offset);
+ last = addr + OFF_BY_START(size-1);
+ first_bit = offset % BITS_PER_WORD;
+ base = offset - first_bit;
+
+ if (offset >= size)
+ return size;
+ if (first_bit != 0) {
+ int tmp = (*word++) & (~0UL << first_bit);
+ bit = ffs(~tmp);
+ if (bit < BITS_PER_WORD)
+ goto found;
+ word++;
+ base += BITS_PER_WORD;
+ }
+ while (word <= last) {
+ if (*word != ~0ul) {
+ bit = ffs(*word);
+ goto found;
+ }
+ word++;
+ base += BITS_PER_WORD;
+ }
+ return size;
+found:
+ return base + bit;
+}
+
+#endif
* Recommended e2fsprogs version: 1.40.2-cfs4
* Note that reiserfs quotas are disabled on SLES 10 in this kernel.
+
+Severity : enhancement
+Bugzilla : 12702
+Description: lost problems with lov objid file
+Details : Fixes some scability and access to not inited memory problems
+ in work with lov objdid file.
+
Severity : major
Frequency : always
Bugzilla : 14270
#include <assert.h>
#include <libcfs/list.h>
+#include <libcfs/user-bitops.h>
#include <lnet/lnet.h>
#include <libcfs/kp30.h>
struct llog_cookie *logcookies, int cookies_size);
int mds_lov_write_objids(struct obd_device *obd);
-void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
-void mds_objids_from_lmm(obd_id *, struct lov_mds_md *, struct lov_desc *);
+void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm);
/* ioctls for trying requests */
#define IOC_REQUEST_TYPE 'f'
#include <lustre_capa.h>
#include <class_hash.h>
+#include <libcfs/bitmap.h>
+
+
#define MAX_OBD_DEVICES 8192
/* this is really local to the OSC */
struct obd_export *mds_osc_exp; /* XXX lov_exp */
struct lov_desc mds_lov_desc;
__u32 mds_id;
- obd_id *mds_lov_objids;
- int mds_lov_objids_size;
- __u32 mds_lov_objids_in_file;
- int mds_lov_nextid_set;
+
+ /* mark pages dirty for write. */
+ bitmap_t *mds_lov_page_dirty;
+ /* array for store pages with obd_id */
+ void **mds_lov_page_array;
+ /* file for store objid */
struct file *mds_lov_objid_filp;
+ __u32 mds_lov_objid_count;
+ __u32 mds_lov_objid_lastpage;
+ __u32 mds_lov_objid_lastidx;
+
struct file *mds_health_check_filp;
unsigned long *mds_client_bitmap;
// struct upcall_cache *mds_group_hash;
struct lustre_quota_info mds_quota_info;
struct semaphore mds_qonoff_sem;
struct semaphore mds_health_sem;
- unsigned long mds_lov_objids_valid:1,
- mds_lov_objids_dirty:1,
- mds_fl_user_xattr:1,
+ unsigned long mds_fl_user_xattr:1,
mds_fl_acl:1,
mds_evict_ost_nids:1,
mds_fl_cfglog:1,
struct lustre_capa_key *mds_capa_keys;
};
+/* lov objid */
+extern __u32 mds_max_ost_index;
+
+#define MDS_LOV_ALLOC_SIZE (CFS_PAGE_SIZE)
+
+#define OBJID_PER_PAGE() (MDS_LOV_ALLOC_SIZE / sizeof(obd_id))
+
+#define MDS_LOV_OBJID_PAGES_COUNT (mds_max_ost_index/OBJID_PER_PAGE())
+
+extern int mds_lov_init_objids(struct obd_device *obd);
+extern void mds_lov_destroy_objids(struct obd_device *obd);
+
+struct obd_id_info {
+ __u32 idx;
+ obd_id *data;
+};
+
+/* */
+
struct echo_obd {
struct obdo eo_oa;
spinlock_t eo_lock;
struct obd_trans_info {
__u64 oti_transno;
__u64 oti_xid;
- __u64 *oti_objid;
/* Only used on the server side for tracking acks. */
struct oti_req_ack_lock {
struct lustre_handle lock;
obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
OBD_MD_FLMTIME | OBD_MD_FLCTIME);
- oti.oti_objid = NULL;
memcpy(lsm2, lsm, lsm_size);
rc = obd_create(exp, oa, &lsm2, &oti);
dump_lsm(D_ERROR, data->lsm);
GOTO(out, rc = -ENXIO);
} else if (KEY_IS("last_id")) {
- obd_id *ids = val;
- unsigned int size = sizeof(obd_id);
- for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
- continue;
- rc = obd_get_info(lov->lov_tgts[i]->ltd_exp,
- keylen, key, &size, ids + i);
- if (rc != 0)
- GOTO(out, rc);
- }
+ struct obd_id_info *info = val;
+ int size = sizeof(obd_id);
+ struct lov_tgt_desc *tgt;
+
+ LASSERT(*vallen == sizeof(struct obd_id_info));
+ tgt = lov->lov_tgts[info->idx];
+
+ if (!tgt || !tgt->ltd_active)
+ GOTO(out, rc = -ESRCH);
+
+ rc = obd_get_info(tgt->ltd_exp, keylen, key, &size, info->data);
GOTO(out, rc = 0);
} else if (KEY_IS(KEY_LOVDESC)) {
struct lov_desc *desc_ret = val;
struct lov_obd *lov = &obddev->u.lov;
obd_count count;
int i, rc = 0, err;
- int no_set = !set;
- int incr = 0, check_uuid = 0, do_inactive = 0;
+ struct lov_tgt_desc *tgt;
+ unsigned incr, check_uuid,
+ do_inactive, no_set;
+ unsigned next_id = 0, mds_con = 0;
ENTRY;
- if (no_set) {
+ incr = check_uuid = do_inactive = no_set = 0;
+ if (set == NULL) {
+ no_set = 1;
set = ptlrpc_prep_set();
if (!set)
RETURN(-ENOMEM);
count = lov->desc.ld_tgt_count;
if (KEY_IS(KEY_NEXT_ID)) {
- /* We must use mds's idea of # osts for indexing into
- mds->mds_lov_objids */
- count = vallen / sizeof(obd_id);
- LASSERT(count <= lov->desc.ld_tgt_count);
- vallen = sizeof(obd_id);
- incr = sizeof(obd_id);
+ count = vallen / sizeof(struct obd_id_info);
+ vallen = sizeof(struct obd_id_info);
+ incr = sizeof(struct obd_id_info);
do_inactive = 1;
+ next_id = 1;
} else if (KEY_IS("checksum")) {
do_inactive = 1;
} else if (KEY_IS("unlinked")) {
check_uuid = val ? 1 : 0;
} else if (KEY_IS("evict_by_nid")) {
/* use defaults: do_inactive = incr = 0; */
+ } else if (KEY_IS(KEY_MDS_CONN)) {
+ mds_con = 1;
}
for (i = 0; i < count; i++, val = (char *)val + incr) {
+ if (next_id) {
+ tgt = lov->lov_tgts[((struct obd_id_info*)val)->idx];
+ } else {
+ tgt = lov->lov_tgts[i];
+ }
/* OST was disconnected */
- if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+ if (!tgt || !tgt->ltd_exp)
continue;
/* OST is inactive and we don't want inactive OSCs */
- if (!lov->lov_tgts[i]->ltd_active && !do_inactive)
+ if (!tgt->ltd_active && !do_inactive)
continue;
- if (KEY_IS(KEY_MDS_CONN)) {
+ if (mds_con) {
struct mds_group_info *mgi;
LASSERT(vallen == sizeof(*mgi));
mgi = (struct mds_group_info *)val;
-
+
/* Only want a specific OSC */
if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
- &lov->lov_tgts[i]->ltd_uuid))
+ &tgt->ltd_uuid))
continue;
- err = obd_set_info_async(lov->lov_tgts[i]->ltd_exp,
- keylen, key, sizeof(int),
+ err = obd_set_info_async(tgt->ltd_exp,
+ keylen, key, sizeof(int),
&mgi->group, set);
- } else {
+ } else if (next_id) {
+ err = obd_set_info_async(tgt->ltd_exp,
+ keylen, key, vallen,
+ ((struct obd_id_info*)val)->data, set);
+ } else {
/* Only want a specific OSC */
if (check_uuid &&
- !obd_uuid_equals(val, &lov->lov_tgts[i]->ltd_uuid))
+ !obd_uuid_equals(val, &tgt->ltd_uuid))
continue;
-
- err = obd_set_info_async(lov->lov_tgts[i]->ltd_exp,
+
+ err = obd_set_info_async(tgt->ltd_exp,
keylen, key, vallen, val, set);
}
-
+
if (!rc)
rc = err;
}
if (rc)
RETURN(rc);
- if (oti && oti->oti_objid)
- oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
-
loi->loi_id = req->rq_oi.oi_oa->o_id;
loi->loi_gr = req->rq_oi.oi_oa->o_gr;
loi->loi_ost_idx = req->rq_idx;
/* update lov_objid data, must be before transaction stop! */
if (rc == 0)
- mdd_lov_objid_update(env, mdd);
+ mdd_lov_objid_update(mdd, lmm);
mdd_trans_stop(env, mdd, rc, handle);
out_free:
/* update lov_objid data, must be before transaction stop! */
if (rc == 0)
- mdd_lov_objid_update(env, mdd);
+ mdd_lov_objid_update(mdd, lmm);
mdd_pdo_write_unlock(env, mdd_pobj, dlh);
out_trans:
struct mdd_object *parent, struct mdd_object *child,
struct lov_mds_md **lmm, int *lmm_size,
const struct md_op_spec *spec, struct lu_attr *la);
-void mdd_lov_objid_update(const struct lu_env *env, struct mdd_device *mdd);
+void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm);
void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
struct lov_mds_md *lmm, int lmm_size,
const struct md_op_spec *spec);
return fid_flatten(fid);
}
-static int mdd_lov_objid_alloc(const struct lu_env *env,
- struct mdd_device *mdd)
+void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
-
- OBD_ALLOC(info->mti_oti.oti_objid,
- mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
- return (info->mti_oti.oti_objid == NULL ? -ENOMEM : 0);
-}
-
-void mdd_lov_objid_update(const struct lu_env *env, struct mdd_device *mdd)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- if (info->mti_oti.oti_objid != NULL)
- mds_lov_update_objids(mdd->mdd_obd_dev,
- info->mti_oti.oti_objid);
-}
-
-static void mdd_lov_objid_from_lmm(const struct lu_env *env,
- struct mdd_device *mdd,
- struct lov_mds_md *lmm)
-{
- struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
- struct mdd_thread_info *info = mdd_env_info(env);
- mds_objids_from_lmm(info->mti_oti.oti_objid, lmm, &mds->mds_lov_desc);
-}
-
-static void mdd_lov_objid_free(const struct lu_env *env,
- struct mdd_device *mdd)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
-
- OBD_FREE(info->mti_oti.oti_objid,
- mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
- info->mti_oti.oti_objid = NULL;
+ mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
}
void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
struct lov_mds_md *lmm, int lmm_size,
const struct md_op_spec *spec)
{
- struct mdd_thread_info *info = mdd_env_info(env);
-
if (lmm && !spec->u.sp_ea.no_lov_create)
OBD_FREE(lmm, lmm_size);
-
- if (info->mti_oti.oti_objid != NULL)
- mdd_lov_objid_free(env, mdd);
}
int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
RETURN(0);
oti_init(oti, NULL);
- rc = mdd_lov_objid_alloc(env, mdd);
- if (rc != 0)
- RETURN(rc);
/* replay case, has objects already, only get lov from eadata */
if (spec->u.sp_ea.no_lov_create != 0) {
*lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
*lmm_size = spec->u.sp_ea.eadatalen;
- mdd_lov_objid_from_lmm(env, mdd, *lmm);
RETURN(0);
}
out_ids:
if (lsm)
obd_free_memmd(lov_exp, &lsm);
- if (rc != 0)
- mdd_lov_objid_free(env, mdd);
return rc;
}
-
/*
* used when destroying orphans and from mds_reint_unlink() when MDS wants to
* destroy objects on OSS.
CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
"number of MDS service threads to start");
+__u32 mds_max_ost_index=0xFFFF;
+CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
+ "maximal OST index");
+
static int mds_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
ldlm_mode_t mode, int flags, void *data);
lquota_cleanup(mds_quota_interface_ref, obd);
mds_update_server_data(obd, 1);
- if (mds->mds_lov_objids != NULL)
- OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+ /* XXX
+ mds_lov_destroy_objids(obd);
+ */
mds_fs_cleanup(obd);
#if 0
struct lustre_sb_info *lsi;
struct lustre_mount_info *lmi;
struct dentry *dentry;
- struct file *file;
int rc = 0;
ENTRY;
CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
GOTO(err_fid, rc);
}
-
- /* open and test the lov objd file */
- file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
- if (IS_ERR(file)) {
- rc = PTR_ERR(file);
- CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
- GOTO(err_fid, rc = PTR_ERR(file));
- }
- mds->mds_lov_objid_filp = file;
- if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
- CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
- file->f_dentry->d_inode->i_mode);
- GOTO(err_lov_objid, rc = -ENOENT);
+ rc = mds_lov_init_objids(obd);
+ if (rc != 0) {
+ CERROR("cannot init lov objid rc = %d\n", rc);
+ GOTO(err_fid, rc );
}
rc = mds_lov_presetup(mds, lcfg);
err_pop:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
RETURN(rc);
-err_lov_objid:
- if (mds->mds_lov_objid_filp &&
- filp_close((struct file *)mds->mds_lov_objid_filp, 0))
- CERROR("can't close %s after error\n", LOV_OBJID);
err_fid:
dput(mds->mds_fid_de);
err_objects:
"will be preserved.\n", obd->obd_name);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- if (mds->mds_lov_objid_filp) {
- rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
- mds->mds_lov_objid_filp = NULL;
- if (rc)
- CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
- }
+
+ mds_lov_destroy_objids(obd);
+
if (mds->mds_objects_dir != NULL) {
l_dput(mds->mds_objects_dir);
mds->mds_objects_dir = NULL;
}
- if (mds->mds_lov_objids != NULL)
- OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
-
shrink_dcache_parent(mds->mds_fid_de);
dput(mds->mds_fid_de);
LL_DQUOT_OFF(obd->u.obt.obt_sb);
int mds_lov_connect(struct obd_device *obd, char * lov_name);
int mds_lov_disconnect(struct obd_device *obd);
int mds_lov_write_objids(struct obd_device *obd);
+
int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
-void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
+void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm);
int mds_lov_set_nextid(struct obd_device *obd);
+
int mds_lov_start_synchronize(struct obd_device *obd,
struct obd_device *watched,
void *data, int nonblock);
enum obd_notify_event ev, void *data);
int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
struct lov_mds_md *lmm, int lmm_size);
-void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
- struct lov_desc *desc);
int mds_init_lov_desc(struct obd_device *obd, struct obd_export *osc_exp);
/* mds/mds_open.c */
#include "mds_internal.h"
-void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
+static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- int i;
+ unsigned int i=0, j;
+
+ CDEBUG(D_INFO, "dump from %s\n", label);
+ if (mds->mds_lov_page_dirty == NULL) {
+ CERROR("NULL bitmap!\n");
+ GOTO(skip_bitmap, i);
+ }
+
+ for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
+ CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
+skip_bitmap:
+ if (mds->mds_lov_page_array == NULL) {
+ CERROR("not init page array!\n");
+ GOTO(skip_array, i);
+
+ }
+ for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
+ obd_id *data = mds->mds_lov_page_array[i];
+
+ if (data == NULL)
+ continue;
+
+ for(j=0; j < OBJID_PER_PAGE(); j++) {
+ if (data[j] == 0)
+ continue;
+ CDEBUG(D_INFO,"objid page %u idx %u - %llu ",i,j,data[j]);
+ }
+ }
+skip_array:
+ EXIT;
+}
+
+int mds_lov_init_objids(struct obd_device *obd)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
+ struct file *file;
+ int rc;
+ ENTRY;
+
+ CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
+
+ mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
+ if (mds->mds_lov_page_dirty == NULL)
+ RETURN(-ENOMEM);
+
+
+ OBD_ALLOC(mds->mds_lov_page_array, size);
+ if (mds->mds_lov_page_array == NULL)
+ GOTO(err_free_bitmap, rc = -ENOMEM);
+
+ /* open and test the lov objd file */
+ file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
+ if (IS_ERR(file)) {
+ rc = PTR_ERR(file);
+ CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
+ GOTO(err_free, rc = PTR_ERR(file));
+ }
+ if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
+ CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
+ file->f_dentry->d_inode->i_mode);
+ GOTO(err_open, rc = -ENOENT);
+ }
+ mds->mds_lov_objid_filp = file;
+
+ RETURN (0);
+err_open:
+ if (filp_close((struct file *)file, 0))
+ CERROR("can't close %s after error\n", LOV_OBJID);
+err_free:
+ OBD_FREE(mds->mds_lov_page_array, size);
+err_free_bitmap:
+ FREE_BITMAP(mds->mds_lov_page_dirty);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(mds_lov_init_objids);
+
+void mds_lov_destroy_objids(struct obd_device *obd)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int i, rc;
+ ENTRY;
+
+ if (mds->mds_lov_page_array != NULL) {
+ for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
+ obd_id *data = mds->mds_lov_page_array[i];
+ if (data != NULL)
+ OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
+ }
+ OBD_FREE(mds->mds_lov_page_array,
+ MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
+ }
+
+ if (mds->mds_lov_objid_filp) {
+ rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
+ mds->mds_lov_objid_filp = NULL;
+ if (rc)
+ CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
+ }
+
+ FREE_BITMAP(mds->mds_lov_page_dirty);
+ EXIT;
+}
+EXPORT_SYMBOL(mds_lov_destroy_objids);
+
+void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int j;
ENTRY;
- lock_kernel();
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
- if (ids[i] > (mds->mds_lov_objids)[i]) {
- (mds->mds_lov_objids)[i] = ids[i];
- mds->mds_lov_objids_dirty = 1;
+ /* if we create file without objects - lmm is NULL */
+ if (lmm == NULL)
+ return;
+
+ for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
+ int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
+ obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
+ int page = i / OBJID_PER_PAGE();
+ int idx = i % OBJID_PER_PAGE();
+ obd_id *data = mds->mds_lov_page_array[page];
+
+ CDEBUG(D_INODE,"update last object for ost %d - new %llu"
+ " old %llu\n", i, id, data[idx]);
+ if (id > data[idx]) {
+ data[idx] = id;
+ bitmap_set(mds->mds_lov_page_dirty, page);
}
- unlock_kernel();
+ }
EXIT;
}
EXPORT_SYMBOL(mds_lov_update_objids);
static int mds_lov_read_objids(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- obd_id *ids;
loff_t off = 0;
- int i, rc, size;
+ int i, rc, count = 0, page = 0;
+ size_t size;
ENTRY;
- LASSERT(!mds->mds_lov_objids_size);
- LASSERT(!mds->mds_lov_objids_dirty);
-
/* Read everything in the file, even if our current lov desc
has fewer targets. Old targets not in the lov descriptor
during mds setup may still have valid objids. */
if (size == 0)
RETURN(0);
- OBD_ALLOC(ids, size);
- if (ids == NULL)
- RETURN(-ENOMEM);
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_size = size;
+ page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
+ CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
+ for(i=0; i < page; i++) {
+ obd_id *data = mds->mds_lov_page_array[i];
+ loff_t off_old = off;
- rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
- if (rc < 0) {
- CERROR("Error reading objids %d\n", rc);
- RETURN(rc);
- }
+ LASSERT(data == NULL);
+ OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
+ if (data == NULL)
+ GOTO(out, rc = -ENOMEM);
- mds->mds_lov_objids_in_file = size / sizeof(*ids);
+ mds->mds_lov_page_array[i] = data;
- for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
- CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
- mds->mds_lov_objids[i], i);
+ rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
+ OBJID_PER_PAGE()*sizeof(obd_id), &off);
+ if (rc < 0) {
+ CERROR("Error reading objids %d\n", rc);
+ GOTO(out, rc);
+ }
+ if (off == off_old)
+ break; // eof
+
+ count += (off-off_old+sizeof(obd_id)-1)/sizeof(obd_id);
}
+ mds->mds_lov_objid_count = count;
+ mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
+ mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
+ CDEBUG(D_INFO, "Read %u objid\n", count);
+out:
+ mds_lov_dump_objids("read",obd);
+
RETURN(0);
}
int mds_lov_write_objids(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- loff_t off = 0;
- int i, rc, tgts;
+ int i, rc = 0;
ENTRY;
- if (!mds->mds_lov_objids_dirty)
+ if (bitmap_check_empty(mds->mds_lov_page_dirty))
RETURN(0);
- tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
+ mds_lov_dump_objids("write", obd);
- if (!tgts)
- RETURN(0);
+ foreach_bit(mds->mds_lov_page_dirty, i) {
+ obd_id *data = mds->mds_lov_page_array[i];
+ unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
+ loff_t off = i * size;
- for (i = 0; i < tgts; i++)
- CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
- mds->mds_lov_objids[i], i);
+ LASSERT(data != NULL);
- rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
- mds->mds_lov_objids, tgts * sizeof(obd_id),
- &off, 0);
- if (rc >= 0) {
- mds->mds_lov_objids_dirty = 0;
- rc = 0;
+ /* check for particaly filled last page */
+ if (i == mds->mds_lov_objid_lastpage) {
+ size = mds->mds_lov_objid_lastidx * sizeof(obd_id);
+ }
+
+ rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
+ size, &off, 0);
+ if (rc < 0)
+ break;
+ bitmap_clear(mds->mds_lov_page_dirty, i);
}
+ if (rc >= 0)
+ rc = 0;
RETURN(rc);
}
EXPORT_SYMBOL(mds_lov_write_objids);
+static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
+ __u32 idx)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int page;
+ unsigned int off;
+ obd_id *data;
+ int rc = 0;
+ ENTRY;
+
+ page = idx / OBJID_PER_PAGE();
+ off = idx % OBJID_PER_PAGE();
+ data = mds->mds_lov_page_array[page];
+ if (data == NULL) {
+ OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
+ if (data == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ mds->mds_lov_page_array[page] = data;
+ }
+
+ printk("get %d - %p - %d/%d\n", idx, data, page, off);
+ if (data[off] == 0) {
+ /* We never read this lastid; ask the osc */
+ struct obd_id_info lastid;
+ __u32 size = sizeof(lastid);
+
+ lastid.idx = idx;
+ lastid.data = &data[off];
+ rc = obd_get_info(export, sizeof("last_id"),
+ "last_id", &size, &lastid);
+ if (rc)
+ GOTO(out, rc);
+
+ if (idx > mds->mds_lov_objid_count) {
+ mds->mds_lov_objid_count = idx;
+ mds->mds_lov_objid_lastpage = page;
+ mds->mds_lov_objid_lastidx = off;
+ }
+ bitmap_set(mds->mds_lov_page_dirty, page);
+ }
+out:
+ RETURN(rc);
+}
+
int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
{
int rc;
struct lov_stripe_md *empty_ea = NULL;
ENTRY;
- LASSERT(mds->mds_lov_objids != NULL);
+ LASSERT(mds->mds_lov_page_array != NULL);
/* This create will in fact either create or destroy: If the OST is
* missing objects below this ID, they will be created. If it finds
RETURN(rc);
}
-
/* update the LOV-OSC knowledge of the last used object id's */
+/* for all targets */
+/* is we realy need this ? all osc's should be pass via __mds_lov_synchronize
+ * and call */
+#define MDS_LOV_SETID_COUNT (CFS_PAGE_SIZE / sizeof(struct obd_id_info))
+
int mds_lov_set_nextid(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
+ int i = 0, j, rc = 0;
+ struct obd_id_info *info;
+ ENTRY;
+
+ LASSERT(!obd->obd_recovering);
+
+ /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
+ LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
+
+ OBD_ALLOC(info, CFS_PAGE_SIZE);
+ if (info == NULL)
+ RETURN(-ENOMEM);
+
+ while(i < mds->mds_lov_desc.ld_tgt_count) {
+ for(j=0; j < MDS_LOV_SETID_COUNT; i++, j++) {
+ int page = i / OBJID_PER_PAGE();
+ int idx = i % OBJID_PER_PAGE();
+ obd_id *data = mds->mds_lov_page_array[page];
+
+ if (i == mds->mds_lov_desc.ld_tgt_count)
+ break;
+
+ info[j].idx = i;
+ info[j].data = &data[idx];
+ }
+
+ rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
+ KEY_NEXT_ID, sizeof(info), &info, NULL);
+ if (rc) {
+ CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
+ obd->obd_name, rc);
+ break;
+ }
+ }
+ OBD_FREE(info, CFS_PAGE_SIZE);
+
+ RETURN(rc);
+
+}
+
+/* for one target */
+static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
+{
+ struct mds_obd *mds = &obd->u.mds;
int rc;
+ struct obd_id_info info;
ENTRY;
LASSERT(!obd->obd_recovering);
- LASSERT(mds->mds_lov_objids != NULL);
-
+
/* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
- rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
- KEY_NEXT_ID,
- mds->mds_lov_desc.ld_tgt_count *
- sizeof(*mds->mds_lov_objids),
- mds->mds_lov_objids, NULL);
+ info.idx = idx;
+ info.data = id;
+ rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
+ KEY_NEXT_ID, sizeof(info), &info, NULL);
if (rc)
CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
obd->obd_name, rc);
{
struct mds_obd *mds = &obd->u.mds;
struct lov_desc *ld;
- __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
- int rc = 0;
+ __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
+ int page, rc = 0;
ENTRY;
OBD_ALLOC(ld, sizeof(*ld));
if (!ld)
RETURN(-ENOMEM);
- rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
+ rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
&valsize, ld);
if (rc)
GOTO(out, rc);
/* The size of the LOV target table may have increased. */
- size = ld->ld_tgt_count * sizeof(obd_id);
- if ((mds->mds_lov_objids_size == 0) ||
- (size > mds->mds_lov_objids_size)) {
+ page = ld->ld_tgt_count / OBJID_PER_PAGE();
+ if (mds->mds_lov_page_array[page] == NULL) {
obd_id *ids;
- /* add room by powers of 2 */
- size = 1;
- while (size < ld->ld_tgt_count)
- size = size << 1;
- size = size * sizeof(obd_id);
-
- OBD_ALLOC(ids, size);
+ OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
if (ids == NULL)
GOTO(out, rc = -ENOMEM);
- memset(ids, 0, size);
- if (mds->mds_lov_objids_size) {
- obd_id *old_ids = mds->mds_lov_objids;
- memcpy(ids, mds->mds_lov_objids,
- mds->mds_lov_objids_size);
- mds->mds_lov_objids = ids;
- OBD_FREE(old_ids, mds->mds_lov_objids_size);
- }
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_size = size;
+
+ mds->mds_lov_page_array[page] = ids;
}
/* Don't change the mds_lov_desc until the objids size matches the
mds->mds_lov_desc.ld_tgt_count);
stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
- max(mds->mds_lov_desc.ld_tgt_count,
- mds->mds_lov_objids_in_file));
+ mds->mds_lov_desc.ld_tgt_count);
+
mds->mds_max_mdsize = lov_mds_md_size(stripes);
mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
/* If we added a target we have to reconnect the llogs */
/* We only _need_ to do this at first add (idx), or the first time
after recovery. However, it should now be safe to call anytime. */
- llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+ rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
/*XXX this notifies the MDD until lov handling use old mds code */
if (obd->obd_upcall.onu_owner) {
__u32 idx, struct obd_uuid *uuid)
{
struct mds_obd *mds = &obd->u.mds;
- int old_count;
+ __u32 old_count;
int rc = 0;
+ int page;
+ int off;
+ obd_id *data;
+
ENTRY;
/* Don't let anyone else mess with mds_lov_objids now */
old_count = mds->mds_lov_desc.ld_tgt_count;
rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
- if (rc)
+ if (rc)
GOTO(out, rc);
CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
mds->mds_lov_desc.ld_tgt_count);
/* idx is set as data from lov_notify. */
- if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
+ if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
GOTO(out, rc);
-
+
if (idx >= mds->mds_lov_desc.ld_tgt_count) {
- CERROR("index %d > count %d!\n", idx,
+ CERROR("index %d > count %d!\n", idx,
mds->mds_lov_desc.ld_tgt_count);
GOTO(out, rc = -EINVAL);
}
- if (idx >= mds->mds_lov_objids_in_file) {
- /* We never read this lastid; ask the osc */
- obd_id lastid;
- __u32 size = sizeof(lastid);
- rc = obd_get_info(watched->obd_self_export, strlen("last_id"),
- "last_id", &size, &lastid);
- if (rc)
- GOTO(out, rc);
- mds->mds_lov_objids[idx] = lastid;
- mds->mds_lov_objids_dirty = 1;
- mds_lov_write_objids(obd);
+ page = idx / OBJID_PER_PAGE();
+ off = idx % OBJID_PER_PAGE();
+ data = mds->mds_lov_page_array[page];
+ CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
+ if (data[off] == 0) {
+ rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
} else {
/* We have read this lastid from disk; tell the osc.
Don't call this during recovery. */
- rc = mds_lov_set_nextid(obd);
+ rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
if (rc) {
CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
/* Don't abort the rest of the sync */
}
CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
- mds->mds_lov_objids[idx], idx, rc);
+ data[off], idx, rc);
out:
mutex_up(&obd->obd_dev_sem);
RETURN(rc);
struct mds_obd *mds = &obd->u.mds;
struct lustre_handle conn = {0,};
struct obd_connect_data *data;
- int rc, i;
+ int rc;
ENTRY;
if (IS_ERR(mds->mds_osc_obd))
if (rc)
GOTO(err_reg, rc);
- /* tgt_count may be 0! */
- rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
- if (rc) {
- CERROR("failed to initialize catalog %d\n", rc);
- GOTO(err_reg, rc);
- }
-
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
- if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
- int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
- rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
- "last_id", &size, mds->mds_lov_objids);
- if (!rc) {
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
- CWARN("got last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[i], i);
- mds->mds_lov_objids_dirty = 1;
- rc = mds_lov_write_objids(obd);
- if (rc)
- CERROR("got last objids from OSTs, but error "
- "writing objids file: %d\n", rc);
+ if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
+ __u32 i = mds->mds_lov_objid_count;
+ for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
+ rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
+ if (rc != 0)
+ break;
}
+ if (rc == 0)
+ rc = mds_lov_write_objids(obd);
+ if (rc)
+ CERROR("got last objids from OSTs, but error "
+ "in update objids file: %d\n", rc);
}
mutex_up(&obd->obd_dev_sem);
mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
mgi.uuid = uuid;
- rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
+ rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
if (rc != 0)
GOTO(out, rc);
obd_uuid2str(uuid), rc);
GOTO(out, rc);
}
-
+
if (obd->obd_upcall.onu_owner) {
/*
* This is a hack for mds_notify->mdd_notify. When the mds obd
conv_end:
return rc;
}
-
-void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
- struct lov_desc *desc)
-{
- int i;
- for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
- ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
- le64_to_cpu(lmm->lmm_objects[i].l_object_id);
- }
-}
-EXPORT_SYMBOL(mds_objids_from_lmm);
struct mds_update_record *rec,
struct mds_obd *mds, struct obd_device *obd,
struct dentry *dchild, void **handle,
- obd_id **ids)
+ struct lov_mds_md **objid)
{
struct inode *inode = dchild->d_inode;
struct obd_trans_info oti = { 0 };
if (body->valid & OBD_MD_FLEASIZE)
RETURN(0);
- OBD_ALLOC(*ids, mds->mds_lov_desc.ld_tgt_count * sizeof(**ids));
- if (*ids == NULL)
- RETURN(-ENOMEM);
oti_init(&oti, req);
- oti.oti_objid = *ids;
/* replay case */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
GOTO(out_ids, rc);
}
- mds_objids_from_lmm(*ids, lmm, &mds->mds_lov_desc);
-
rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size, "lov");
if (rc)
CERROR("open replay failed to set md:%d\n", rc);
LASSERT(lmm_buf);
memcpy(lmm_buf, lmm, lmm_size);
+ *objid = lmm_buf;
RETURN(rc);
}
lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, lmm_size);
LASSERT(lmm_buf);
memcpy(lmm_buf, lmm, lmm_size);
+
+ *objid = lmm_buf; // save for mds_lov_update_objid
+
free_diskmd:
obd_free_diskmd(mds->mds_osc_exp, &lmm);
out_oa:
oti_free_cookies(&oti);
OBDO_FREE(oinfo.oi_oa);
out_ids:
- if (rc) {
- OBD_FREE(*ids, mds->mds_lov_desc.ld_tgt_count * sizeof(**ids));
- *ids = NULL;
- }
if (oinfo.oi_md)
obd_free_memmd(mds->mds_osc_exp, &oinfo.oi_md);
RETURN(rc);
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_file_data *mfd = NULL;
- obd_id *ids = NULL; /* object IDs created */
+ struct lov_mds_md *lmm; /* object IDs created */
int rc = 0;
ENTRY;
!(body->valid & OBD_MD_FLMODEASIZE)) {
/* no EA: create objects */
rc = mds_create_objects(req, DLM_REPLY_REC_OFF + 1, rec,
- mds, obd, dchild, handle, &ids);
+ mds, obd, dchild, handle, &lmm);
if (rc) {
CERROR("mds_create_objects: rc = %d\n", rc);
UNLOCK_INODE_MUTEX(dchild->d_inode);
CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
mfd->mfd_handle.h_cookie);
- if (ids != NULL) {
- mds_lov_update_objids(obd, ids);
- OBD_FREE(ids, sizeof(*ids) * mds->mds_lov_desc.ld_tgt_count);
- }
+ if (lmm != NULL)
+ mds_lov_update_objids(obd, lmm);
+
if (rc) /* coverity[deadcode] */
mds_mfd_unlink(mfd, 1);