* threaded operation on the OST.
*/
-#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_FILTER
#include <linux/config.h>
#include <linux/module.h>
-#include <linux/pagemap.h> // XXX kill me soon
#include <linux/fs.h>
#include <linux/dcache.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_dlm.h>
-#include <linux/obd_filter.h>
#include <linux/init.h>
-#include <linux/random.h>
-#include <linux/lustre_fsfilt.h>
-#include <linux/lprocfs_status.h>
#include <linux/version.h>
+#include <linux/jbd.h>
+#include <linux/ext3_fs.h>
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-#include <linux/mount.h>
+# include <linux/mount.h>
+# include <linux/buffer_head.h>
+# include <linux/bio.h>
#endif
-enum {
- LPROC_FILTER_READ_BYTES = 0,
- LPROC_FILTER_WRITE_BYTES = 1,
- LPROC_FILTER_LAST,
-};
-
-#define S_SHIFT 12
-static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
- [0] NULL,
- [S_IFREG >> S_SHIFT] "R",
- [S_IFDIR >> S_SHIFT] "D",
- [S_IFCHR >> S_SHIFT] "C",
- [S_IFBLK >> S_SHIFT] "B",
- [S_IFIFO >> S_SHIFT] "F",
- [S_IFSOCK >> S_SHIFT] "S",
- [S_IFLNK >> S_SHIFT] "L"
-};
-
-static inline const char *obd_mode_to_type(int mode)
-{
- return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
-}
-
-static void filter_ffd_addref(void *ffdp)
-{
- struct filter_file_data *ffd = ffdp;
-
- atomic_inc(&ffd->ffd_refcount);
- CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
- atomic_read(&ffd->ffd_refcount));
-}
-
-static struct filter_file_data *filter_ffd_new(void)
-{
- struct filter_file_data *ffd;
-
- OBD_ALLOC(ffd, sizeof *ffd);
- if (ffd == NULL) {
- CERROR("out of memory\n");
- return NULL;
- }
-
- atomic_set(&ffd->ffd_refcount, 2);
-
- INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
- class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
+#include <linux/obd_class.h>
+#include <linux/obd_lov.h>
+#include <linux/obd_ost.h>
+#include <linux/lustre_dlm.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lprocfs_status.h>
+#include <linux/lustre_log.h>
+#include <linux/lustre_commit_confd.h>
+#include <libcfs/list.h>
- return ffd;
-}
+#include <linux/lustre_smfs.h>
+#include <linux/lustre_sec.h>
+#include "filter_internal.h"
-static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
-{
- struct filter_file_data *ffd = NULL;
- ENTRY;
- LASSERT(handle != NULL);
- ffd = class_handle2object(handle->cookie);
- if (ffd != NULL)
- LASSERT(ffd->ffd_file->private_data == ffd);
- RETURN(ffd);
-}
+/* Group 0 is no longer a legal group, to catch uninitialized IDs */
+#define FILTER_MIN_GROUPS 3
-static void filter_ffd_put(struct filter_file_data *ffd)
-{
- CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
- atomic_read(&ffd->ffd_refcount) - 1);
- LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
- atomic_read(&ffd->ffd_refcount) < 0x5a5a);
- if (atomic_dec_and_test(&ffd->ffd_refcount)) {
- LASSERT(list_empty(&ffd->ffd_handle.h_link));
- OBD_FREE(ffd, sizeof *ffd);
- }
-}
+static struct lvfs_callback_ops filter_lvfs_ops;
-static void filter_ffd_destroy(struct filter_file_data *ffd)
-{
- class_handle_unhash(&ffd->ffd_handle);
- filter_ffd_put(ffd);
-}
+static int filter_destroy(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *ea, struct obd_trans_info *);
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *,
+ int, struct obd_export *);
-static void filter_commit_cb(struct obd_device *obd, __u64 transno, int error)
+static void filter_commit_cb(struct obd_device *obd, __u64 transno,
+ void *cb_data, int error)
{
obd_transno_commit_cb(obd, transno, error);
}
+
/* Assumes caller has already pushed us into the kernel context. */
-int filter_finish_transno(struct obd_export *export, void *handle,
- struct obd_trans_info *oti, int rc)
+int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
+ int rc)
{
- __u64 last_rcvd;
- struct obd_device *obd = export->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- struct filter_export_data *fed = &export->exp_filter_data;
+ struct filter_obd *filter = &exp->exp_obd->u.filter;
+ struct filter_export_data *fed = &exp->exp_filter_data;
struct filter_client_data *fcd = fed->fed_fcd;
+ __u64 last_rcvd;
loff_t off;
- ssize_t written;
+ int err, log_pri = D_HA;
/* Propagate error code. */
if (rc)
RETURN(rc);
- if (!obd->obd_replayable)
+ if (!exp->exp_obd->obd_replayable || oti == NULL)
RETURN(rc);
/* we don't allocate new transnos for replayed requests */
- if (oti && oti->oti_transno == 0) {
+ if (oti->oti_transno == 0) {
spin_lock(&filter->fo_translock);
- last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd) + 1;
- filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
+ last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
+ filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
spin_unlock(&filter->fo_translock);
oti->oti_transno = last_rcvd;
- fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
- fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
-
- /* could get xid from oti, if it's ever needed */
- fcd->fcd_last_xid = 0;
-
- off = fed->fed_lr_off;
- fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
- written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd,
- sizeof(*fcd), &off);
- CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
- "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
- fed->fed_lr_idx, written);
-
- if (written == sizeof(*fcd))
- RETURN(0);
- CERROR("error writing to last_rcvd file: rc = %d\n",
- (int)written);
- if (written >= 0)
- RETURN(-EIO);
-
- RETURN(written);
- }
+ } else {
+ spin_lock(&filter->fo_translock);
+ last_rcvd = oti->oti_transno;
+ if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
+ filter->fo_fsd->fsd_last_transno =
+ cpu_to_le64(last_rcvd);
+ spin_unlock(&filter->fo_translock);
+ }
+
+ fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
- RETURN(0);
+ /* could get xid from oti, if it's ever needed */
+ fcd->fcd_last_xid = 0;
+
+ off = fed->fed_lr_off;
+
+ fsfilt_add_journal_cb(exp->exp_obd, filter->fo_sb, last_rcvd,
+ oti->oti_handle, filter_commit_cb, NULL);
+
+ err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
+ sizeof(*fcd), &off, 0);
+ if (err) {
+ log_pri = D_ERROR;
+ if (rc == 0)
+ rc = err;
+ }
+
+ CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
+ last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
+
+ RETURN(rc);
}
-static inline void f_dput(struct dentry *dentry)
+void f_dput(struct dentry *dentry)
{
/* Can't go inside filter_ddelete because it can block */
CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
dput(dentry);
}
-/* Not racy w.r.t. others, because we are the only user of this dentry */
-static void filter_drelease(struct dentry *dentry)
-{
- if (dentry->d_fsdata)
- OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
-}
-
-struct dentry_operations filter_dops = {
- .d_release = filter_drelease,
-};
-
-#define LAST_RCVD "last_rcvd"
-#define INIT_OBJID 2
-
-/* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
-#define FILTER_LR_MAX_CLIENTS (PAGE_SIZE * 8)
-#define FILTER_LR_MAX_CLIENT_WORDS (FILTER_LR_MAX_CLIENTS/sizeof(unsigned long))
-
/* Add client data to the FILTER. We use a bitmap to locate a free space
* in the last_rcvd file if cl_idx is -1 (i.e. a new client).
* Otherwise, we have just read the data from the last_rcvd file and
- * we know its offset.
- */
-int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
- struct filter_export_data *fed, int cl_idx)
+ * we know its offset. */
+static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
+ struct filter_export_data *fed, int cl_idx)
{
unsigned long *bitmap = filter->fo_last_rcvd_slots;
int new_client = (cl_idx == -1);
+ ENTRY;
LASSERT(bitmap != NULL);
/* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
- if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
+ if (!strcmp((char *)fed->fed_fcd->fcd_uuid, (char *)obd->obd_uuid.uuid))
RETURN(0);
/* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
repeat:
if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
- return -ENOMEM;
+ RETURN(-ENOMEM);
}
if (test_and_set_bit(cl_idx, bitmap)) {
CERROR("FILTER client %d: found bit is set in bitmap\n",
fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
if (new_client) {
- struct obd_run_ctxt saved;
+ struct lvfs_run_ctxt saved;
loff_t off = fed->fed_lr_off;
- ssize_t written;
+ int err;
void *handle;
CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- /* Transaction eeded to fix for bug 1403 */
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ /* Transaction needed to fix bug 1403 */
handle = fsfilt_start(obd,
filter->fo_rcvd_filp->f_dentry->d_inode,
- FSFILT_OP_SETATTR);
+ FSFILT_OP_SETATTR, NULL);
if (IS_ERR(handle)) {
- written = PTR_ERR(handle);
- CERROR("unable to start transaction: rc %d\n",
- (int)written);
+ err = PTR_ERR(handle);
+ CERROR("unable to start transaction: rc %d\n", err);
} else {
- written = lustre_fwrite(filter->fo_rcvd_filp,
- (char *)fed->fed_fcd,
- sizeof(*fed->fed_fcd), &off);
- fsfilt_commit(obd,
+ err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
+ fed->fed_fcd,
+ sizeof(*fed->fed_fcd),
+ &off, 1);
+ fsfilt_commit(obd, filter->fo_sb,
filter->fo_rcvd_filp->f_dentry->d_inode,
- handle, 0);
+ handle, 1);
}
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- if (written != sizeof(*fed->fed_fcd)) {
- if (written < 0)
- RETURN(written);
- RETURN(-EIO);
+ if (err) {
+ CERROR("error writing %s client idx %u: rc %d\n",
+ LAST_RCVD, fed->fed_lr_idx, err);
+ RETURN(err);
}
}
- return 0;
+ RETURN(0);
}
-int filter_client_free(struct obd_export *exp, int failover)
+static int filter_client_free(struct obd_export *exp, int flags)
{
struct filter_export_data *fed = &exp->exp_filter_data;
struct filter_obd *filter = &exp->exp_obd->u.filter;
+ struct obd_device *obd = exp->exp_obd;
struct filter_client_data zero_fcd;
- struct obd_run_ctxt saved;
- int written;
+ struct lvfs_run_ctxt saved;
+ int rc;
loff_t off;
ENTRY;
- if (!fed->fed_fcd)
+ if (fed->fed_fcd == NULL)
RETURN(0);
- if (failover != 0)
+ if (flags & OBD_OPT_FAILOVER)
GOTO(free, 0);
/* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
- if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
+ if (!strcmp((char *)fed->fed_fcd->fcd_uuid, (char *)obd->obd_uuid.uuid))
GOTO(free, 0);
LASSERT(filter->fo_last_rcvd_slots != NULL);
CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
- if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
+ /* Clear the bit _after_ zeroing out the client so we don't
+ race with filter_client_add and zero out new clients.*/
+ if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
CERROR("FILTER client %u: bit already clear in bitmap!!\n",
fed->fed_lr_idx);
LBUG();
}
memset(&zero_fcd, 0, sizeof zero_fcd);
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- written = lustre_fwrite(filter->fo_rcvd_filp, (const char *)&zero_fcd,
- sizeof(zero_fcd), &off);
-
- /* XXX: this write gets lost sometimes, unless this sync is here. */
- if (written > 0)
- file_fsync(filter->fo_rcvd_filp,
- filter->fo_rcvd_filp->f_dentry, 1);
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-
- if (written != sizeof(zero_fcd)) {
- CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
- fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
- LAST_RCVD, written);
- } else {
- CDEBUG(D_INFO,
- "zeroed disconnecting client %s at idx %u (%llu)\n",
- fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
+ sizeof(zero_fcd), &off, 1);
+ if (rc == 0)
+ /* update server's transno */
+ filter_update_server_data(obd, filter->fo_rcvd_filp,
+ filter->fo_fsd, 1);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ CDEBUG(rc == 0 ? D_INFO : D_ERROR,
+ "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
+ fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+ LAST_RCVD, rc);
+
+ if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
+ CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+ fed->fed_lr_idx);
+ LBUG();
}
free:
{
OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
filter->fo_fsd = NULL;
- OBD_FREE(filter->fo_last_rcvd_slots,
- FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+ OBD_FREE(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS/8);
filter->fo_last_rcvd_slots = NULL;
return 0;
}
-
/* assumes caller is already in kernel ctxt */
-static int filter_update_server_data(struct file *filp,
- struct filter_server_data *fsd)
+int filter_update_server_data(struct obd_device *obd, struct file *filp,
+ struct filter_server_data *fsd, int force_sync)
{
loff_t off = 0;
int rc;
+ ENTRY;
CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
- CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
- le64_to_cpu(fsd->fsd_last_objid));
CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
- le64_to_cpu(fsd->fsd_last_rcvd));
+ le64_to_cpu(fsd->fsd_last_transno));
CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
le64_to_cpu(fsd->fsd_mount_count));
- rc = lustre_fwrite(filp, (char *)fsd, sizeof(*fsd), &off);
- if (rc != sizeof(*fsd)) {
- CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n",
- rc);
- RETURN(-EIO);
+ rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
+ if (rc)
+ CERROR("error writing filter_server_data: rc = %d\n", rc);
+
+ RETURN(rc);
+}
+
+int filter_update_last_objid(struct obd_device *obd, obd_gr group,
+ int force_sync)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 tmp;
+ loff_t off = 0;
+ int rc;
+ ENTRY;
+
+ if (filter->fo_last_objid_files[group] == NULL) {
+ CERROR("Object group "LPU64" not fully setup; not updating "
+ "last_objid\n", group);
+ RETURN(0);
}
- RETURN(0);
+
+ CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
+ group, filter->fo_last_objids[group]);
+
+ tmp = cpu_to_le64(filter->fo_last_objids[group]);
+ rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
+ &tmp, sizeof(tmp), &off, force_sync);
+ if (rc)
+ CERROR("error writing group "LPU64" last objid: rc = %d\n",
+ group, rc);
+ RETURN(rc);
}
/* assumes caller has already in kernel ctxt */
-static int filter_init_server_data(struct obd_device *obd, struct file * filp,
- __u64 init_lastobjid)
+static int filter_init_server_data(struct obd_device *obd, struct file * filp)
{
struct filter_obd *filter = &obd->u.filter;
struct filter_server_data *fsd;
struct filter_client_data *fcd = NULL;
struct inode *inode = filp->f_dentry->d_inode;
unsigned long last_rcvd_size = inode->i_size;
- __u64 mount_count = 0;
+ __u64 mount_count;
int cl_idx;
loff_t off = 0;
int rc;
RETURN(-ENOMEM);
filter->fo_fsd = fsd;
- OBD_ALLOC(filter->fo_last_rcvd_slots,
- FILTER_LR_MAX_CLIENT_WORDS * sizeof(unsigned long));
+ OBD_ALLOC(filter->fo_last_rcvd_slots, FILTER_LR_MAX_CLIENTS/8);
if (filter->fo_last_rcvd_slots == NULL) {
OBD_FREE(fsd, sizeof(*fsd));
RETURN(-ENOMEM);
}
if (last_rcvd_size == 0) {
- CERROR("%s: initializing new last_rcvd\n", obd->obd_name);
+ CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
- fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
- fsd->fsd_last_rcvd = 0;
+ fsd->fsd_last_transno = 0;
mount_count = fsd->fsd_mount_count = 0;
fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
fsd->fsd_client_start = cpu_to_le32(FILTER_LR_CLIENT_START);
fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
} else {
- ssize_t retval = lustre_fread(filp, (char *)fsd, sizeof(*fsd),
- &off);
- if (retval != sizeof(*fsd)) {
- CDEBUG(D_INODE,"OBD filter: error reading %s\n",
- LAST_RCVD);
- GOTO(err_fsd, rc = -EIO);
+ rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
+ if (rc) {
+ CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
+ LAST_RCVD, rc);
+ GOTO(err_fsd, rc);
+ }
+ if (strcmp((char *)fsd->fsd_uuid, (char *)obd->obd_uuid.uuid)) {
+ CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
+ obd->obd_uuid.uuid, fsd->fsd_uuid);
+ GOTO(err_fsd, rc = -EINVAL);
}
mount_count = le64_to_cpu(fsd->fsd_mount_count);
filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
}
- if (fsd->fsd_feature_incompat) {
+ if (fsd->fsd_feature_incompat & ~cpu_to_le32(FILTER_INCOMPAT_SUPP)) {
CERROR("unsupported feature %x\n",
- le32_to_cpu(fsd->fsd_feature_incompat));
+ le32_to_cpu(fsd->fsd_feature_incompat) &
+ ~FILTER_INCOMPAT_SUPP);
GOTO(err_fsd, rc = -EINVAL);
}
- if (fsd->fsd_feature_rocompat) {
+ if (fsd->fsd_feature_rocompat & ~cpu_to_le32(FILTER_ROCOMPAT_SUPP)) {
CERROR("read-only feature %x\n",
- le32_to_cpu(fsd->fsd_feature_rocompat));
+ le32_to_cpu(fsd->fsd_feature_rocompat) &
+ ~FILTER_ROCOMPAT_SUPP);
/* Do something like remount filesystem read-only */
GOTO(err_fsd, rc = -EINVAL);
}
- CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
- obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
- obd->obd_name, le64_to_cpu(fsd->fsd_last_rcvd));
- CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
- obd->obd_name, mount_count);
+ obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
+ CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+ obd->obd_name, mount_count + 1);
CDEBUG(D_INODE, "%s: server data size: %u\n",
obd->obd_name, le32_to_cpu(fsd->fsd_server_size));
CDEBUG(D_INODE, "%s: per-client data start: %u\n",
obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
+ CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+ last_rcvd_size <= le32_to_cpu(fsd->fsd_client_start) ? 0 :
+ (last_rcvd_size - le32_to_cpu(fsd->fsd_client_start)) /
+ le16_to_cpu(fsd->fsd_client_size));
- /*
- * When we do a clean FILTER shutdown, we save the last_rcvd into
- * the header. If we find clients with higher last_rcvd values
- * then those clients may need recovery done.
- */
if (!obd->obd_replayable) {
- CERROR("%s: recovery support OFF\n", obd->obd_name);
+ CWARN("%s: recovery support OFF\n", obd->obd_name);
GOTO(out, rc = 0);
}
- for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+ for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
+ off < last_rcvd_size; cl_idx++) {
__u64 last_rcvd;
- int mount_age;
+ struct obd_export *exp;
+ struct filter_export_data *fed;
if (!fcd) {
OBD_ALLOC(fcd, sizeof(*fcd));
if (!fcd)
- GOTO(err_fsd, rc = -ENOMEM);
+ GOTO(err_client, rc = -ENOMEM);
}
- /* Don't assume off is incremented properly, in case
- * sizeof(fsd) isn't the same as fsd->fsd_client_size.
- */
+ /* Don't assume off is incremented properly by
+ * fsfilt_read_record(), in case sizeof(*fcd)
+ * isn't the same as fsd->fsd_client_size. */
off = le32_to_cpu(fsd->fsd_client_start) +
cl_idx * le16_to_cpu(fsd->fsd_client_size);
- rc = lustre_fread(filp, (char *)fcd, sizeof(*fcd), &off);
- if (rc != sizeof(*fcd)) {
- CERROR("error reading FILTER %s offset %d: rc = %d\n",
- LAST_RCVD, cl_idx, rc);
- if (rc > 0) /* XXX fatal error or just abort reading? */
- rc = -EIO;
- break;
+ rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
+ if (rc) {
+ CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
+ LAST_RCVD, cl_idx, off, rc);
+ break; /* read error shouldn't cause startup to fail */
}
if (fcd->fcd_uuid[0] == '\0') {
/* These exports are cleaned up by filter_disconnect(), so they
* need to be set up like real exports as filter_connect() does.
*/
- mount_age = mount_count - le64_to_cpu(fcd->fcd_mount_count);
- if (mount_age < FILTER_MOUNT_RECOV) {
- struct obd_export *exp = class_new_export(obd);
- struct filter_export_data *fed;
- CERROR("RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
- " srv lr: "LPU64" mnt: "LPU64" last mount: "
- LPU64"\n", fcd->fcd_uuid, cl_idx,
- last_rcvd, le64_to_cpu(fsd->fsd_last_rcvd),
- le64_to_cpu(fcd->fcd_mount_count), mount_count);
- if (exp == NULL) {
- /* XXX this rc is ignored */
- rc = -ENOMEM;
- break;
- }
- memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
- sizeof exp->exp_client_uuid.uuid);
- fed = &exp->exp_filter_data;
- fed->fed_fcd = fcd;
- filter_client_add(obd, filter, fed, cl_idx);
- /* create helper if export init gets more complex */
- INIT_LIST_HEAD(&fed->fed_open_head);
- spin_lock_init(&fed->fed_lock);
-
- fcd = NULL;
- obd->obd_recoverable_clients++;
- class_export_put(exp);
- } else {
- CDEBUG(D_INFO,
- "discarded client %d UUID '%s' count "LPU64"\n",
- cl_idx, fcd->fcd_uuid,
- le64_to_cpu(fcd->fcd_mount_count));
- }
+ exp = class_new_export(obd);
+ CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64" "
+ "srv lr: "LPU64" fcd_group %d \n", fcd->fcd_uuid, cl_idx,
+ last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
+ le32_to_cpu(fcd->fcd_group));
+ if (exp == NULL)
+ GOTO(err_client, rc = -ENOMEM);
+
+ memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
+ sizeof exp->exp_client_uuid.uuid);
+ fed = &exp->exp_filter_data;
+ fed->fed_fcd = fcd;
+ fed->fed_group = le32_to_cpu(fcd->fcd_group);
+ filter_client_add(obd, filter, fed, cl_idx);
+ /* create helper if export init gets more complex */
+ spin_lock_init(&fed->fed_lock);
+
+ fcd = NULL;
+ exp->exp_connected = 0;
+ exp->exp_req_replay_needed = 1;
+ exp->exp_lock_replay_needed = 1;
+ atomic_inc(&obd->obd_req_replay_clients);
+ atomic_inc(&obd->obd_lock_replay_clients);
+ obd->obd_recoverable_clients++;
+ obd->obd_max_recoverable_clients++;
+ class_export_put(exp);
CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
cl_idx, last_rcvd);
- if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_rcvd))
- filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
-
- obd->obd_last_committed =
- le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
- if (obd->obd_recoverable_clients) {
- CERROR("RECOVERY: %d recoverable clients, last_rcvd "
- LPU64"\n", obd->obd_recoverable_clients,
- le64_to_cpu(filter->fo_fsd->fsd_last_rcvd));
- obd->obd_next_recovery_transno =
- obd->obd_last_committed + 1;
- obd->obd_recovering = 1;
- }
+ if (last_rcvd > le64_to_cpu(fsd->fsd_last_transno))
+ fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
}
+ obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
+
+ if (obd->obd_recoverable_clients) {
+ CWARN("RECOVERY: service %s, %d recoverable clients, "
+ "last_transno "LPU64"\n", obd->obd_name,
+ obd->obd_recoverable_clients,
+ le64_to_cpu(fsd->fsd_last_transno));
+ obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+ target_start_recovery_thread(obd, ost_handle);
+ }
+
if (fcd)
OBD_FREE(fcd, sizeof(*fcd));
out:
- fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
-
- /* save it,so mount count and last_recvd is current */
- rc = filter_update_server_data(filp, filter->fo_fsd);
+ filter->fo_mount_count = mount_count + 1;
+ fsd->fsd_mount_count = cpu_to_le64(filter->fo_mount_count);
- RETURN(rc);
+ /* save it, so mount count and last_transno is current */
+ rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
+ if (rc)
+ GOTO(err_client, rc);
+ RETURN(0);
+err_client:
+ class_disconnect_exports(obd, 0);
err_fsd:
filter_free_server_data(filter);
RETURN(rc);
}
-/* setup the object store with correct subdirectories */
-static int filter_prep(struct obd_device *obd)
+static int filter_cleanup_groups(struct obd_device *obd)
{
- struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
- struct dentry *dentry, *O_dentry;
- struct file *file;
- struct inode *inode;
- int i;
- int rc = 0;
- int mode = 0;
-
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- dentry = simple_mkdir(current->fs->pwd, "O", 0700);
- CDEBUG(D_INODE, "got/created O: %p\n", dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("cannot open/create O: rc = %d\n", rc);
- GOTO(out, rc);
- }
- filter->fo_dentry_O = dentry;
-
- /*
- * Create directories and/or get dentries for each object type.
- * This saves us from having to do multiple lookups for each one.
- */
- O_dentry = filter->fo_dentry_O;
- for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
- char *name = obd_type_by_mode[mode];
+ struct dentry *dentry;
+ int i, k;
+ ENTRY;
- if (!name) {
- filter->fo_dentry_O_mode[mode] = NULL;
- continue;
+ for (i = 0; i < filter->fo_group_count; i++) {
+ if (filter->fo_subdirs != NULL) {
+ for (k = 0; k < filter->fo_subdir_count; k++) {
+ dentry = filter->fo_subdirs[i].dentry[k];
+ if (dentry == NULL)
+ continue;
+ f_dput(dentry);
+ filter->fo_subdirs[i].dentry[k] = NULL;
+ }
}
- dentry = simple_mkdir(O_dentry, name, 0700);
- CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("cannot create O/%s: rc = %d\n", name, rc);
- GOTO(err_O_mode, rc);
+ if (filter->fo_last_objid_files[i] != NULL) {
+ filp_close(filter->fo_last_objid_files[i], 0);
+ filter->fo_last_objid_files[i] = NULL;
+ }
+ if (filter->fo_groups[i] != NULL) {
+ dput(filter->fo_groups[i]);
+ filter->fo_groups[i] = NULL;
}
- filter->fo_dentry_O_mode[mode] = dentry;
}
+ if (filter->fo_subdirs != NULL)
+ OBD_FREE(filter->fo_subdirs,
+ filter->fo_group_count * sizeof(*filter->fo_subdirs));
+ if (filter->fo_groups != NULL)
+ OBD_FREE(filter->fo_groups,
+ filter->fo_group_count * sizeof(*filter->fo_groups));
+ if (filter->fo_last_objids != NULL)
+ OBD_FREE(filter->fo_last_objids,
+ filter->fo_group_count * sizeof(__u64));
+ if (filter->fo_last_objid_files != NULL)
+ OBD_FREE(filter->fo_last_objid_files,
+ filter->fo_group_count * sizeof(struct file *));
+ f_dput(filter->fo_dentry_O);
+ RETURN(0);
+}
- file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0700);
- if (!file || IS_ERR(file)) {
- rc = PTR_ERR(file);
- CERROR("OBD filter: cannot open/create %s: rc = %d\n",
- LAST_RCVD, rc);
- GOTO(err_O_mode, rc);
+static int filter_update_last_group(struct obd_device *obd, int group)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct file *filp = NULL;
+ int last_group = 0, rc;
+ loff_t off = 0;
+ ENTRY;
+
+ if (group <= filter->fo_committed_group)
+ RETURN(0);
+
+ filp = filp_open("LAST_GROUP", O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ rc = PTR_ERR(filp);
+ filp = NULL;
+ CERROR("cannot open LAST_GROUP: rc = %d\n", rc);
+ GOTO(cleanup, rc);
}
- if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
- CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
- file->f_dentry->d_inode->i_mode);
- GOTO(err_filp, rc = -ENOENT);
+ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+ if (rc) {
+ CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
+ GOTO(cleanup, rc);
}
+ LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+ CDEBUG(D_INODE, "%s: previous %d, new %d\n",
+ obd->obd_name, last_group, group);
- rc = fsfilt_journal_data(obd, file);
+ off = 0;
+ last_group = group;
+ /* must be sync: bXXXX */
+ rc = fsfilt_write_record(obd, filp, &last_group, sizeof(__u32), &off, 1);
if (rc) {
- CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
- GOTO(err_filp, rc);
+ CDEBUG(D_INODE, "error updating LAST_GROUP: rc %d\n", rc);
+ GOTO(cleanup, rc);
}
- /* steal operations */
- inode = file->f_dentry->d_inode;
- filter->fo_fop = file->f_op;
- filter->fo_iop = inode->i_op;
- filter->fo_aops = inode->i_mapping->a_ops;
- rc = filter_init_server_data(obd, file, INIT_OBJID);
+ filter->fo_committed_group = group;
+cleanup:
+ if (filp)
+ filp_close(filp, 0);
+ RETURN(rc);
+}
+
+static int filter_read_group_internal(struct obd_device *obd, int group,
+ int create)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 *new_objids = NULL;
+ struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs = NULL;
+ struct dentry **new_groups = NULL;
+ struct file **new_files = NULL;
+ struct dentry *dentry;
+ struct file *filp;
+ int old_count = filter->fo_group_count, rc, stage = 0, i;
+ char name[25];
+ __u64 last_objid;
+ loff_t off = 0;
+
+ snprintf(name, 24, "%d", group);
+ name[24] = '\0';
+
+ if (!create) {
+ dentry = ll_lookup_one_len(name, filter->fo_dentry_O,
+ strlen(name));
+ if (IS_ERR(dentry)) {
+ CERROR("Cannot lookup expected object group %d: %ld\n",
+ group, PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
+ }
+ } else {
+ dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1);
+ if (IS_ERR(dentry)) {
+ CERROR("cannot lookup/create O/%s: rc = %ld\n", name,
+ PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
+ }
+ }
+ stage = 1;
+
+ snprintf(name, 24, "O/%d/LAST_ID", group);
+ name[24] = '\0';
+ filp = filp_open(name, O_CREAT | O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp));
+ GOTO(cleanup, rc = PTR_ERR(filp));
+ }
+ stage = 2;
+
+ rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off);
if (rc) {
- CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
- GOTO(err_client, rc);
+ CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc);
+ GOTO(cleanup, rc);
}
- filter->fo_rcvd_filp = file;
if (filter->fo_subdir_count) {
- O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
- OBD_ALLOC(filter->fo_dentry_O_sub,
- filter->fo_subdir_count * sizeof(dentry));
- if (!filter->fo_dentry_O_sub)
- GOTO(err_client, rc = -ENOMEM);
+ OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
+ if (tmp_subdirs == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ stage = 3;
for (i = 0; i < filter->fo_subdir_count; i++) {
char dir[20];
snprintf(dir, sizeof(dir), "d%u", i);
- dentry = simple_mkdir(O_dentry, dir, 0700);
- CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
- GOTO(err_O_sub, rc);
+ tmp_subdirs->dentry[i] =
+ simple_mkdir(dentry, dir, 0700, 1);
+ if (IS_ERR(tmp_subdirs->dentry[i])) {
+ rc = PTR_ERR(tmp_subdirs->dentry[i]);
+ CERROR("can't lookup/create O/%d/%s: rc = %d\n",
+ group, dir, rc);
+ GOTO(cleanup, rc);
}
- filter->fo_dentry_O_sub[i] = dentry;
+ CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir,
+ tmp_subdirs->dentry[i]);
}
}
- rc = 0;
- out:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-
- return(rc);
-err_O_sub:
- while (i-- > 0) {
- struct dentry *dentry = filter->fo_dentry_O_sub[i];
- if (dentry) {
- f_dput(dentry);
- filter->fo_dentry_O_sub[i] = NULL;
+ /* 'group' is an index; we need an array of length 'group + 1' */
+ if (group + 1 > old_count) {
+ int len = group + 1;
+ OBD_ALLOC(new_objids, len * sizeof(*new_objids));
+ OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs));
+ OBD_ALLOC(new_groups, len * sizeof(*new_groups));
+ OBD_ALLOC(new_files, len * sizeof(*new_files));
+ stage = 4;
+ if (new_objids == NULL || new_subdirs == NULL ||
+ new_groups == NULL || new_files == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ memcpy(new_objids, filter->fo_last_objids,
+ old_count * sizeof(*new_objids));
+ memcpy(new_subdirs, filter->fo_subdirs,
+ old_count * sizeof(*new_subdirs));
+ memcpy(new_groups, filter->fo_groups,
+ old_count * sizeof(*new_groups));
+ memcpy(new_files, filter->fo_last_objid_files,
+ old_count * sizeof(*new_files));
+
+ if (old_count) {
+ OBD_FREE(filter->fo_last_objids,
+ old_count * sizeof(*new_objids));
+ OBD_FREE(filter->fo_subdirs,
+ old_count * sizeof(*new_subdirs));
+ OBD_FREE(filter->fo_groups,
+ old_count * sizeof(*new_groups));
+ OBD_FREE(filter->fo_last_objid_files,
+ old_count * sizeof(*new_files));
}
+ filter->fo_last_objids = new_objids;
+ filter->fo_subdirs = new_subdirs;
+ filter->fo_groups = new_groups;
+ filter->fo_last_objid_files = new_files;
+ filter->fo_group_count = len;
}
- OBD_FREE(filter->fo_dentry_O_sub,
- filter->fo_subdir_count * sizeof(dentry));
-err_client:
- class_disconnect_exports(obd, 0);
-err_filp:
- if (filp_close(file, 0))
- CERROR("can't close %s after error\n", LAST_RCVD);
- filter->fo_rcvd_filp = NULL;
-err_O_mode:
- while (mode-- > 0) {
- struct dentry *dentry = filter->fo_dentry_O_mode[mode];
- if (dentry) {
- f_dput(dentry);
- filter->fo_dentry_O_mode[mode] = NULL;
+
+ filter->fo_groups[group] = dentry;
+ filter->fo_last_objid_files[group] = filp;
+ if (filter->fo_subdir_count) {
+ filter->fo_subdirs[group] = *tmp_subdirs;
+ OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+ }
+
+ filter_update_last_group(obd, group);
+
+ if (filp->f_dentry->d_inode->i_size == 0) {
+ filter->fo_last_objids[group] = FILTER_INIT_OBJID;
+ RETURN(0);
+ }
+
+ filter->fo_last_objids[group] = le64_to_cpu(last_objid);
+ CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+ obd->obd_name, group, last_objid);
+ RETURN(0);
+ cleanup:
+ switch (stage) {
+ case 4:
+ if (new_objids != NULL)
+ OBD_FREE(new_objids, group * sizeof(*new_objids));
+ if (new_subdirs != NULL)
+ OBD_FREE(new_subdirs, group * sizeof(*new_subdirs));
+ if (new_groups != NULL)
+ OBD_FREE(new_groups, group * sizeof(*new_groups));
+ if (new_files != NULL)
+ OBD_FREE(new_files, group * sizeof(*new_files));
+ case 3:
+ if (filter->fo_subdir_count) {
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ if (tmp_subdirs->dentry[i] != NULL)
+ dput(tmp_subdirs->dentry[i]);
+ }
+ OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
}
+ case 2:
+ filp_close(filp, 0);
+ case 1:
+ dput(dentry);
}
- f_dput(filter->fo_dentry_O);
- filter->fo_dentry_O = NULL;
- goto out;
+ RETURN(rc);
}
-/* cleanup the filter: write last used object id to status file */
-static void filter_post(struct obd_device *obd)
+static int filter_read_groups(struct obd_device *obd, int last_group,
+ int create)
{
- struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
- long rc;
- int mode;
+ int old_count, group, rc = 0;
- /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
- * best to start a transaction with h_sync, because we removed this
- * from lastobjid */
+ down(&filter->fo_init_lock);
+ old_count = filter->fo_group_count;
+ for (group = old_count; group <= last_group; group++) {
+ if (group == 0)
+ continue; /* no group zero */
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
- if (rc)
- CERROR("OBD filter: error writing lastobjid: rc = %ld\n", rc);
+ rc = filter_read_group_internal(obd, group, create);
+ if (rc != 0)
+ break;
+ }
+ up(&filter->fo_init_lock);
+ return rc;
+}
+static int filter_prep_groups(struct obd_device *obd)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct dentry *dentry, *O_dentry;
+ int rc = 0, cleanup_phase = 0;
+ struct file *filp = NULL;
+ int last_group;
+ loff_t off = 0;
+ ENTRY;
- if (filter->fo_rcvd_filp) {
- rc = file_fsync(filter->fo_rcvd_filp,
- filter->fo_rcvd_filp->f_dentry, 1);
- filp_close(filter->fo_rcvd_filp, 0);
- filter->fo_rcvd_filp = NULL;
- if (rc)
- CERROR("last_rcvd file won't closed rc = %ld\n", rc);
+ O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
+ CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
+ if (IS_ERR(O_dentry)) {
+ rc = PTR_ERR(O_dentry);
+ CERROR("cannot open/create O: rc = %d\n", rc);
+ GOTO(cleanup, rc);
}
+ filter->fo_dentry_O = O_dentry;
+ cleanup_phase = 1; /* O_dentry */
- if (filter->fo_subdir_count) {
- int i;
- for (i = 0; i < filter->fo_subdir_count; i++) {
- struct dentry *dentry = filter->fo_dentry_O_sub[i];
- f_dput(dentry);
- filter->fo_dentry_O_sub[i] = NULL;
+ /* Lookup "R" to tell if we're on an old OST FS and need to convert
+ * from O/R/<dir>/<objid> to O/0/<dir>/<objid>. This can be removed
+ * some time post 1.0 when all old-style OSTs have converted along
+ * with the init_objid hack. */
+ dentry = ll_lookup_one_len("R", O_dentry, 1);
+ if (IS_ERR(dentry))
+ GOTO(cleanup, rc = PTR_ERR(dentry));
+ if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
+ struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
+ ENTRY;
+
+ CWARN("converting OST to new object layout\n");
+ if (IS_ERR(O0_dentry)) {
+ rc = PTR_ERR(O0_dentry);
+ CERROR("error looking up O/0: rc %d\n", rc);
+ GOTO(cleanup_R, rc);
}
- OBD_FREE(filter->fo_dentry_O_sub,
- filter->fo_subdir_count *
- sizeof(*filter->fo_dentry_O_sub));
- }
- for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
- struct dentry *dentry = filter->fo_dentry_O_mode[mode];
- if (dentry) {
- f_dput(dentry);
- filter->fo_dentry_O_mode[mode] = NULL;
+
+ if (O0_dentry->d_inode) {
+ CERROR("Both O/R and O/0 exist. Fix manually.\n");
+ GOTO(cleanup_O0, rc = -EEXIST);
}
- }
- f_dput(filter->fo_dentry_O);
- filter_free_server_data(filter);
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-}
+ down(&O_dentry->d_inode->i_sem);
+ rc = vfs_rename(O_dentry->d_inode, dentry,
+ O_dentry->d_inode, O0_dentry);
+ up(&O_dentry->d_inode->i_sem);
-static __u64 filter_next_id(struct filter_obd *filter)
-{
- obd_id id;
- LASSERT(filter->fo_fsd != NULL);
+ if (rc) {
+ CERROR("error renaming O/R to O/0: rc %d\n", rc);
+ GOTO(cleanup_O0, rc);
+ }
+ filter->fo_fsd->fsd_feature_incompat |=
+ cpu_to_le32(FILTER_INCOMPAT_GROUPS);
+ rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
+ filter->fo_fsd, 1);
+ GOTO(cleanup_O0, rc);
+
+ cleanup_O0:
+ f_dput(O0_dentry);
+ cleanup_R:
+ f_dput(dentry);
+ if (rc)
+ GOTO(cleanup, rc);
+ } else {
+ f_dput(dentry);
+ }
- spin_lock(&filter->fo_objidlock);
- id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
- filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
- spin_unlock(&filter->fo_objidlock);
+ cleanup_phase = 2; /* groups */
- return id;
-}
+ /* we have to initialize all groups before first connections from
+ * clients because they may send create/destroy for any group -bzzz */
+ filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp));
+ GOTO(cleanup, rc = PTR_ERR(filp));
+ }
+ cleanup_phase = 3; /* filp */
-/* direct cut-n-paste of mds_blocking_ast() */
-int filter_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
+ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+ if (rc) {
+ CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+ if (off == 0) {
+ last_group = FILTER_MIN_GROUPS;
+ } else {
+ LASSERT(last_group >= FILTER_MIN_GROUPS);
+ }
+
+ CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
+ FILTER_MIN_GROUPS, last_group);
+ filter->fo_committed_group = last_group;
+ rc = filter_read_groups(obd, last_group, 1);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ filp_close(filp, 0);
+ RETURN(0);
+
+ cleanup:
+ switch (cleanup_phase) {
+ case 3:
+ filp_close(filp, 0);
+ case 2:
+ filter_cleanup_groups(obd);
+ case 1:
+ f_dput(filter->fo_dentry_O);
+ filter->fo_dentry_O = NULL;
+ default:
+ break;
+ }
+ return rc;
+}
+
+/* setup the object store with correct subdirectories */
+static int filter_prep(struct obd_device *obd)
+{
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter = &obd->u.filter;
+ struct file *file;
+ struct inode *inode;
+ int rc = 0;
+ ENTRY;
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
+ if (!file || IS_ERR(file)) {
+ rc = PTR_ERR(file);
+ CERROR("OBD filter: cannot open/create %s: rc = %d\n",
+ LAST_RCVD, rc);
+ GOTO(out, rc);
+ }
+
+ if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
+ CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
+ file->f_dentry->d_inode->i_mode);
+ GOTO(err_filp, rc = -ENOENT);
+ }
+
+ /* steal operations */
+ inode = file->f_dentry->d_inode;
+ filter->fo_fop = file->f_op;
+ filter->fo_iop = inode->i_op;
+ filter->fo_aops = inode->i_mapping->a_ops;
+
+ rc = filter_init_server_data(obd, file);
+ if (rc) {
+ CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
+ GOTO(err_filp, rc);
+ }
+ filter->fo_rcvd_filp = file;
+
+ rc = filter_prep_groups(obd);
+ if (rc)
+ GOTO(err_server_data, rc);
+
+ out:
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ return(rc);
+
+ err_server_data:
+ //class_disconnect_exports(obd, 0);
+ filter_free_server_data(filter);
+ err_filp:
+ if (filp_close(file, 0))
+ CERROR("can't close %s after error\n", LAST_RCVD);
+ filter->fo_rcvd_filp = NULL;
+ goto out;
+}
+
+/* cleanup the filter: write last used object id to status file */
+static void filter_post(struct obd_device *obd)
+{
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter = &obd->u.filter;
+ int rc, i;
+
+ /* XXX: filter_update_lastobjid used to call fsync_dev. It might be
+ * best to start a transaction with h_sync, because we removed this
+ * from lastobjid */
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
+ filter->fo_fsd, 0);
+ if (rc)
+ CERROR("error writing server data: rc = %d\n", rc);
+
+ for (i = 1; i < filter->fo_group_count; i++) {
+ rc = filter_update_last_objid(obd, i,
+ (i == filter->fo_group_count - 1));
+ if (rc)
+ CERROR("error writing group %d lastobjid: rc = %d\n",
+ i, rc);
+ }
+
+ rc = filp_close(filter->fo_rcvd_filp, 0);
+ filter->fo_rcvd_filp = NULL;
+ if (rc)
+ CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
+
+ filter_cleanup_groups(obd);
+ filter_free_server_data(filter);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+}
+
+static void filter_set_last_id(struct filter_obd *filter, int group, obd_id id)
+{
+ LASSERT(filter->fo_fsd != NULL);
+ LASSERT(group > 0);
+ LASSERT(group < filter->fo_group_count);
+
+ spin_lock(&filter->fo_objidlock);
+ filter->fo_last_objids[group] = id;
+ spin_unlock(&filter->fo_objidlock);
+}
+
+__u64 filter_last_id(struct filter_obd *filter, int group)
+{
+ obd_id id;
+ LASSERT(filter->fo_fsd != NULL);
+ LASSERT(group > 0);
+ LASSERT(group < filter->fo_group_count);
+
+ spin_lock(&filter->fo_objidlock);
+ id = filter->fo_last_objids[group];
+ spin_unlock(&filter->fo_objidlock);
+
+ return id;
+}
+
+/* direct cut-n-paste of mds_blocking_ast() */
+static int filter_blocking_ast(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc,
+ void *data, int flag)
{
int do_ast;
ENTRY;
/* XXX layering violation! -phil */
l_lock(&lock->l_resource->lr_namespace->ns_lock);
/* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
- * such that mds_blocking_ast is called just before l_i_p takes the
+ * such that filter_blocking_ast is called just before l_i_p takes the
* ns_lock, then by the time we get the lock, we might not be the
* correct blocking function anymore. So check, and return early, if
* so. */
RETURN(0);
}
-static int filter_lock_dentry(struct obd_device *obd, struct dentry *de,
- ldlm_mode_t lock_mode,struct lustre_handle *lockh)
-{
- struct ldlm_res_id res_id = { .name = {0} };
- int flags = 0, rc;
- ENTRY;
+extern void *lock_dir(struct inode *dir, struct qstr *name);
+extern void unlock_dir(struct inode *dir, void *lock);
- res_id.name[0] = de->d_inode->i_ino;
- res_id.name[1] = de->d_inode->i_generation;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, ldlm_completion_ast,
- filter_blocking_ast, NULL, lockh);
+static void *filter_lock_dentry(struct obd_device *obd,
+ struct dentry *dparent,
+ obd_id id)
+{
+#ifdef S_PDIROPS
+ struct qstr qstr;
+ char name[32];
+ int len;
- RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
+ len = sprintf(name, LPU64, id);
+ qstr_assign(&qstr, (char *)name, len);
+ return lock_dir(dparent->d_inode, &qstr);
+#else
+ down(&dparent->d_inode->i_sem);
+#endif
+ return 0;
}
-static void filter_parent_unlock(struct dentry *dparent,
- struct lustre_handle *lockh,
- ldlm_mode_t lock_mode)
+/* We never dget the object parent, so DON'T dput it either */
+static void filter_parent_unlock(struct dentry *dparent, void *lock)
{
- ldlm_lock_decref(lockh, lock_mode);
+#ifdef S_PDIROPS
+ LASSERT(lock != NULL);
+ unlock_dir(dparent->d_inode, lock);
+#else
+ up(&dparent->d_inode->i_sem);
+#endif
}
/* We never dget the object parent, so DON'T dput it either */
-static inline struct dentry *filter_parent(struct obd_device *obd,
- obd_mode mode, obd_id objid)
+struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
{
struct filter_obd *filter = &obd->u.filter;
+ LASSERT(group < filter->fo_group_count);
+ LASSERT(group > 0);
- LASSERT(S_ISREG(mode)); /* only regular files for now */
- if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
- return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+ if (filter->fo_subdir_count == 0)
+ return filter->fo_groups[group];
- return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
+ return filter->fo_subdirs[group].dentry[objid & (filter->fo_subdir_count - 1)];
}
/* We never dget the object parent, so DON'T dput it either */
-static inline struct dentry *filter_parent_lock(struct obd_device *obd,
- obd_mode mode, obd_id objid,
- ldlm_mode_t lock_mode,
- struct lustre_handle *lockh)
+struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
+ obd_id objid, void **lock)
{
unsigned long now = jiffies;
- struct dentry *de = filter_parent(obd, mode, objid);
- int rc;
+ struct dentry *dparent = filter_parent(obd, group, objid);
- if (IS_ERR(de))
- return de;
+ if (IS_ERR(dparent))
+ return dparent;
+
+ LASSERT(dparent);
+ LASSERT(dparent->d_inode);
- rc = filter_lock_dentry(obd, de, lock_mode, lockh);
- if (time_after(jiffies, now + 15*HZ))
- CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
- return rc ? ERR_PTR(rc) : de;
+ *lock = filter_lock_dentry(obd, dparent, objid);
+ fsfilt_check_slow(now, obd_timeout, "parent lock");
+ return dparent;
}
/* How to get files, dentries, inodes from object id's.
* appropriately for this operation (normally a write lock). If
* dir_dentry is NULL, we do a read lock while we do the lookup to
* avoid races with create/destroy and such changing the directory
- * internal to the filesystem code.
- */
-static struct dentry *filter_fid2dentry(struct obd_device *obd,
- struct dentry *dir_dentry,
- obd_mode mode, obd_id id)
+ * internal to the filesystem code. */
+struct dentry *filter_id2dentry(struct obd_device *obd,
+ struct dentry *dir_dentry,
+ obd_gr group, obd_id id)
{
- struct super_block *sb = obd->u.filter.fo_sb;
- struct lustre_handle lockh;
struct dentry *dparent = dir_dentry;
struct dentry *dchild;
+ void *lock = NULL;
char name[32];
int len;
ENTRY;
- if (!sb || !sb->s_dev) {
- CERROR("device not initialized.\n");
- RETURN(ERR_PTR(-ENXIO));
- }
-
if (id == 0) {
CERROR("fatal: invalid object id 0\n");
- LBUG();
RETURN(ERR_PTR(-ESTALE));
}
len = sprintf(name, LPU64, id);
- if (!dir_dentry) {
- dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
- if (IS_ERR(dparent))
+ if (dir_dentry == NULL) {
+ dparent = filter_parent_lock(obd, group, id, &lock);
+ if (IS_ERR(dparent)) {
+ CERROR("%s: error getting object "LPU64":"LPU64
+ " parent: rc %ld\n", obd->obd_name,
+ id, group, PTR_ERR(dparent));
RETURN(dparent);
+ }
}
- CDEBUG(D_INODE, "looking up object O/%*s/%s\n",
+ CDEBUG(D_INODE, "looking up object O/%.*s/%s\n",
dparent->d_name.len, dparent->d_name.name, name);
- dchild = ll_lookup_one_len(name, dparent, len);
- if (!dir_dentry)
- filter_parent_unlock(dparent, &lockh, LCK_PR);
+ dchild = /*ll_*/lookup_one_len(name, dparent, len);
+ if (dir_dentry == NULL)
+ filter_parent_unlock(dparent, lock);
if (IS_ERR(dchild)) {
- CERROR("child lookup error %ld\n", PTR_ERR(dchild));
+ CERROR("%s: child lookup error %ld\n", obd->obd_name,
+ PTR_ERR(dchild));
RETURN(dchild);
}
+ if (dchild->d_inode != NULL && is_bad_inode(dchild->d_inode)) {
+ CERROR("%s: got bad inode "LPU64"\n", obd->obd_name, id);
+ f_dput(dchild);
+ RETURN(ERR_PTR(-ENOENT));
+ }
+
CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
name, dchild, atomic_read(&dchild->d_count));
RETURN(dchild);
}
-static struct file *filter_obj_open(struct obd_export *export,
- __u64 id, __u32 type,
- ldlm_mode_t parent_mode,
- struct lustre_handle *parent_lockh)
+static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
+ obd_id group)
{
- struct obd_device *obd = export->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- struct super_block *sb = filter->fo_sb;
- struct dentry *dchild = NULL, *dparent = NULL;
- struct filter_export_data *fed = &export->exp_filter_data;
- struct filter_dentry_data *fdd = NULL;
- struct filter_file_data *ffd = NULL;
- struct obd_run_ctxt saved;
- char name[24];
- struct file *file;
- int len, cleanup_phase = 0;
- ENTRY;
-
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
-
- if (!sb || !sb->s_dev) {
- CERROR("fatal: device not initialized.\n");
- GOTO(cleanup, file = ERR_PTR(-ENXIO));
- }
-
- if (!id) {
- CERROR("fatal: invalid obdo "LPU64"\n", id);
- GOTO(cleanup, file = ERR_PTR(-ESTALE));
- }
-
- if (!(type & S_IFMT)) {
- CERROR("OBD %s, object "LPU64" has bad type: %o\n",
- __FUNCTION__, id, type);
- GOTO(cleanup, file = ERR_PTR(-EINVAL));
- }
-
- ffd = filter_ffd_new();
- if (ffd == NULL) {
- CERROR("obdfilter: out of memory\n");
- GOTO(cleanup, file = ERR_PTR(-ENOMEM));
- }
-
- cleanup_phase = 1;
-
- /* We preallocate this to avoid blocking while holding fo_fddlock */
- OBD_ALLOC(fdd, sizeof *fdd);
- if (fdd == NULL) {
- CERROR("obdfilter: out of memory\n");
- GOTO(cleanup, file = ERR_PTR(-ENOMEM));
- }
-
- cleanup_phase = 2;
-
- dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
- if (IS_ERR(dparent))
- GOTO(cleanup, file = (void *)dparent);
-
- cleanup_phase = 3;
-
- len = snprintf(name, sizeof(name), LPU64, id);
- dchild = ll_lookup_one_len(name, dparent, len);
- if (IS_ERR(dchild))
- GOTO(cleanup, file = (void *)dchild);
-
- cleanup_phase = 4;
-
- if (dchild->d_inode == NULL) {
- CERROR("opening non-existent object %s - O_CREAT?\n", name);
- file = ERR_PTR(-ENOENT);
- GOTO(cleanup, file);
- }
-
- /* dentry_open does a dput(dchild) and mntput(mnt) on error */
- mntget(filter->fo_vfsmnt);
- file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
- if (IS_ERR(file)) {
- dchild = NULL; /* prevent a double dput in step 4 */
- CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
- GOTO(cleanup, file);
- }
-
- spin_lock(&filter->fo_fddlock);
- if (dchild->d_fsdata) {
- spin_unlock(&filter->fo_fddlock);
- OBD_FREE(fdd, sizeof *fdd);
- fdd = dchild->d_fsdata;
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
- /* should only happen during client recovery */
- if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
- CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
- atomic_inc(&fdd->fdd_open_count);
- } else {
- atomic_set(&fdd->fdd_open_count, 1);
- fdd->fdd_magic = FILTER_DENTRY_MAGIC;
- fdd->fdd_flags = 0;
- fdd->fdd_objid = id;
- /* If this is racy, then we can use {cmp}xchg and atomic_add */
- dchild->d_fsdata = fdd;
- spin_unlock(&filter->fo_fddlock);
- }
-
- ffd->ffd_file = file;
- LASSERT(file->private_data == NULL);
- file->private_data = ffd;
+ struct lustre_handle lockh;
+ int flags = LDLM_AST_DISCARD_DATA, rc;
+ struct ldlm_res_id res_id = { .name = { objid, 0, group, 0 } };
+ ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
- if (!dchild->d_op)
- dchild->d_op = &filter_dops;
- else
- LASSERT(dchild->d_op == &filter_dops);
+ ENTRY;
+ /* Tell the clients that the object is gone now and that they should
+ * throw away any cached pages. */
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+ LDLM_EXTENT, &policy, LCK_PW,
+ &flags, filter_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, &lockh);
- spin_lock(&fed->fed_lock);
- list_add(&ffd->ffd_export_list, &fed->fed_open_head);
- spin_unlock(&fed->fed_lock);
+ /* We only care about the side-effects, just drop the lock. */
+ if (rc == ELDLM_OK)
+ ldlm_lock_decref(&lockh, LCK_PW);
- CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
-cleanup:
- switch (cleanup_phase) {
- case 4:
- if (IS_ERR(file))
- f_dput(dchild);
- case 3:
- if (IS_ERR(file))
- filter_parent_unlock(dparent, parent_lockh,parent_mode);
- case 2:
- if (IS_ERR(file))
- OBD_FREE(fdd, sizeof *fdd);
- case 1:
- if (IS_ERR(file))
- filter_ffd_destroy(ffd);
- filter_ffd_put(ffd);
- case 0:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- }
- RETURN(file);
+ RETURN(rc);
}
/* Caller must hold LCK_PW on parent and push us into kernel context.
- * Caller is also required to ensure that dchild->d_inode exists.
- */
-static int filter_destroy_internal(struct obd_device *obd,
+ * Caller is also required to ensure that dchild->d_inode exists. */
+static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
struct dentry *dparent,
struct dentry *dchild)
{
ENTRY;
if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
- CERROR("destroying objid %*s nlink = %d, count = %d\n",
+ CERROR("destroying objid %.*s nlink = %lu, count = %d\n",
dchild->d_name.len, dchild->d_name.name,
- inode->i_nlink, atomic_read(&inode->i_count));
+ (unsigned long)inode->i_nlink,
+ atomic_read(&inode->i_count));
}
rc = vfs_unlink(dparent->d_inode, dchild);
if (rc)
- CERROR("error unlinking objid %*s: rc %d\n",
+ CERROR("error unlinking objid %.*s: rc %d\n",
dchild->d_name.len, dchild->d_name.name, rc);
RETURN(rc);
}
-/* If closing because we are failing this device, then
- don't do the unlink on close.
-*/
-static int filter_close_internal(struct obd_export *exp,
- struct filter_file_data *ffd,
- struct obd_trans_info *oti,
- int failover)
-{
- struct obd_device *obd = exp->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- struct file *filp = ffd->ffd_file;
- struct dentry *dchild = dget(filp->f_dentry);
- struct filter_dentry_data *fdd = dchild->d_fsdata;
- struct lustre_handle parent_lockh;
- int rc, rc2, cleanup_phase = 0;
- struct dentry *dparent = NULL;
- struct obd_run_ctxt saved;
+static int filter_intent_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp, void *req_cookie,
+ ldlm_mode_t mode, int flags, void *data)
+{
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ struct ptlrpc_request *req = req_cookie;
+ struct ldlm_lock *lock = *lockp, *l = NULL;
+ struct ldlm_resource *res = lock->l_resource;
+ ldlm_processing_policy policy;
+ struct ost_lvb *res_lvb, *reply_lvb;
+ struct ldlm_reply *rep;
+ struct list_head *tmp;
+ ldlm_error_t err;
+ int tmpflags = 0, rc, repsize[2] = {sizeof(*rep), sizeof(*reply_lvb)};
ENTRY;
- LASSERT(filp->private_data == ffd);
- LASSERT(fdd);
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
+ policy = ldlm_get_processing_policy(res);
+ LASSERT(policy != NULL);
+ LASSERT(req != NULL);
- rc = filp_close(filp, 0);
+ rc = lustre_pack_reply(req, 2, repsize, NULL);
+ if (rc)
+ RETURN(req->rq_status = rc);
- if (atomic_dec_and_test(&fdd->fdd_open_count) &&
- fdd->fdd_flags & FILTER_FLAG_DESTROY && !failover) {
- void *handle;
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
+ LASSERT(rep != NULL);
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- cleanup_phase = 1;
+ reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
+ LASSERT(reply_lvb != NULL);
- LASSERT(fdd->fdd_objid > 0);
- dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
- LCK_PW, &parent_lockh);
- if (IS_ERR(dparent))
- GOTO(cleanup, rc = PTR_ERR(dparent));
- cleanup_phase = 2;
+ //fixup_handle_for_resent_req(req, lock, &lockh);
- handle = fsfilt_start(obd, dparent->d_inode,
- FSFILT_OP_UNLINK);
- if (IS_ERR(handle))
- GOTO(cleanup, rc = PTR_ERR(handle));
+ /* If we grant any lock at all, it will be a whole-file read lock.
+ * Call the extent policy function to see if our request can be
+ * granted, or is blocked. */
+ lock->l_policy_data.l_extent.start = 0;
+ lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+ lock->l_req_mode = LCK_PR;
- /* XXX unlink from PENDING directory now too */
- rc2 = filter_destroy_internal(obd, dparent, dchild);
- if (rc2 && !rc)
- rc = rc2;
- rc = filter_finish_transno(exp, handle, oti, rc);
- rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (rc2) {
- CERROR("error on commit, err = %d\n", rc2);
- if (!rc)
- rc = rc2;
- }
+ l_lock(&res->lr_namespace->ns_lock);
+
+ res->lr_tmp = &rpc_list;
+ rc = policy(lock, &tmpflags, 0, &err);
+ res->lr_tmp = NULL;
+
+ /* FIXME: we should change the policy function slightly, to not make
+ * this list at all, since we just turn around and free it */
+ while (!list_empty(&rpc_list)) {
+ struct ldlm_ast_work *w =
+ list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
+ list_del(&w->w_list);
+ LDLM_LOCK_PUT(w->w_lock);
+ OBD_FREE(w, sizeof(*w));
}
-cleanup:
- switch(cleanup_phase) {
- case 2:
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
+ if (rc == LDLM_ITER_CONTINUE) {
+ /* The lock met with no resistance; we're finished. */
+ l_unlock(&res->lr_namespace->ns_lock);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
+
+ /* Do not grant any lock, but instead send GL callbacks. The extent
+ * policy nicely created a list of all PW locks for us. We will choose
+ * the highest of those which are larger than the size in the LVB, if
+ * any, and perform a glimpse callback. */
+ down(&res->lr_lvb_sem);
+ res_lvb = res->lr_lvb_data;
+ LASSERT(res_lvb != NULL);
+ *reply_lvb = *res_lvb;
+ up(&res->lr_lvb_sem);
+
+ list_for_each(tmp, &res->lr_granted) {
+ struct ldlm_lock *tmplock =
+ list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (tmplock->l_granted_mode == LCK_PR)
+ continue;
+
+ if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
+ continue;
+
+ if (l == NULL) {
+ l = LDLM_LOCK_GET(tmplock);
+ continue;
}
- case 1:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- case 0:
- f_dput(dchild);
- filter_ffd_destroy(ffd);
- break;
- default:
- CERROR("invalid cleanup_phase %d\n", cleanup_phase);
- LBUG();
+
+ if (l->l_policy_data.l_extent.start >
+ tmplock->l_policy_data.l_extent.start)
+ continue;
+
+ LDLM_LOCK_PUT(l);
+ l = LDLM_LOCK_GET(tmplock);
+ }
+ l_unlock(&res->lr_namespace->ns_lock);
+
+ /* There were no PW locks beyond the size in the LVB; finished. */
+ if (l == NULL)
+ RETURN(ELDLM_LOCK_ABORTED);
+
+ if (l->l_glimpse_ast == NULL) {
+ /* We are racing with unlink(); just return -ENOENT */
+ rep->lock_policy_res1 = -ENOENT;
+ goto out;
+ }
+
+ LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
+
+ rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+ if (rc != 0 && res->lr_namespace->ns_lvbo &&
+ res->lr_namespace->ns_lvbo->lvbo_update) {
+ res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
}
+ down(&res->lr_lvb_sem);
+ *reply_lvb = *res_lvb;
+ up(&res->lr_lvb_sem);
+out:
+ LDLM_LOCK_PUT(l);
+
+ RETURN(ELDLM_LOCK_ABORTED);
+}
+
+static int filter_post_fs_cleanup(struct obd_device *obd)
+{
+ int rc = 0;
+
+ rc = fsfilt_post_cleanup(obd);
+
RETURN(rc);
}
-/* obd methods */
-/* mount the file system (secretly) */
-static int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
- char *option)
+#if 0
+static int filter_group_set_fs_flags(struct obd_device *obd, int group)
{
- struct obd_ioctl_data* data = buf;
struct filter_obd *filter = &obd->u.filter;
+ int rc = 0, i = 0;
+ ENTRY;
+
+ /* zero group is not longer valid. */
+ if (group== 0)
+ RETURN(rc);
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ struct dentry *dentry;
+ dentry = (filter->fo_subdirs + group)->dentry[i];
+ rc = fsfilt_set_fs_flags(obd, dentry->d_inode,
+ SM_DO_REC | SM_DO_COW);
+ if (rc)
+ RETURN(rc);
+ }
+ RETURN(rc);
+}
+#endif
- struct vfsmount *mnt;
+static int filter_post_fs_setup(struct obd_device *obd)
+{
+ struct filter_obd *filter = &obd->u.filter;
int rc = 0;
+
+ rc = fsfilt_post_setup(obd, filter->fo_dentry_O);
+
+ return rc;
+}
+
+/* mount the file system (secretly) */
+int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
+ char *option)
+{
+ struct lustre_cfg *lcfg = buf;
+ struct filter_obd *filter = &obd->u.filter;
+ struct lvfs_obd_ctxt *lvfs_ctxt = NULL;
+ struct vfsmount *mnt;
+ char *str;
+ char ns_name[48];
+ int rc = 0, i;
ENTRY;
- if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
+ if ((LUSTRE_CFG_BUFLEN(lcfg, 1)) < 1 ||
+ (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1))
RETURN(-EINVAL);
- obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
+ obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
if (IS_ERR(obd->obd_fsops))
RETURN(PTR_ERR(obd->obd_fsops));
- mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, option);
- rc = PTR_ERR(mnt);
- if (IS_ERR(mnt))
+ rc = lvfs_mount_fs(lustre_cfg_string(lcfg, 1),
+ lustre_cfg_string(lcfg, 2),
+ option, MS_NOATIME | MS_NODIRATIME, &lvfs_ctxt);
+ if (rc) {
+ CERROR("lvfs_mount_fs failed: rc = %d\n", rc);
GOTO(err_ops, rc);
+ }
+ LASSERT(lvfs_ctxt);
+
+ mnt = lvfs_ctxt->loc_mnt;
+ filter->fo_lvfs_ctxt = lvfs_ctxt;
- if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
- if (*data->ioc_inlbuf3 == 'f') {
+ if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && lustre_cfg_buf(lcfg, 3)) {
+ str = lustre_cfg_string(lcfg, 3);
+ if (*str == 'f') {
obd->obd_replayable = 1;
obd_sync_filter = 1;
- CERROR("%s: configured for recovery and sync write\n",
- obd->obd_name);
+ CWARN("%s: recovery enabled\n", obd->obd_name);
} else {
- if (*data->ioc_inlbuf3 != 'n') {
+ if (*str != 'n') {
CERROR("unrecognised flag '%c'\n",
- *data->ioc_inlbuf3);
+ *str);
}
- }
- }
-
- if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
- if (*data->ioc_inlbuf4 == '/') {
- CERROR("filter namespace mount: %s\n",
- data->ioc_inlbuf4);
- filter->fo_nspath = strdup(data->ioc_inlbuf4);
- } else {
- CERROR("namespace mount must be absolute path: '%s'\n",
- data->ioc_inlbuf4);
+ // XXX Robert? Why do we get errors here
+ // GOTO(err_mntput, rc = -EINVAL);
}
}
filter->fo_fstype = mnt->mnt_sb->s_type->name;
CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
- OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
- filter->fo_ctxt.pwdmnt = mnt;
- filter->fo_ctxt.pwd = mnt->mnt_root;
- filter->fo_ctxt.fs = get_ds();
+ OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
+ obd->obd_lvfs_ctxt.pwdmnt = mnt;
+ obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
+ obd->obd_lvfs_ctxt.fs = get_ds();
+ obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
+
+ ll_clear_rdonly(ll_sbdev(filter->fo_sb));
+
+ rc = fsfilt_setup(obd, mnt->mnt_sb);
+ if (rc)
+ GOTO(err_mntput, rc);
+ sema_init(&filter->fo_init_lock, 1);
+ filter->fo_committed_group = 0;
rc = filter_prep(obd);
if (rc)
GOTO(err_mntput, rc);
+ filter->fo_destroys_in_progress = 0;
+ for (i = 0; i < 32; i++)
+ sema_init(&filter->fo_create_locks[i], 1);
+
spin_lock_init(&filter->fo_translock);
- spin_lock_init(&filter->fo_fddlock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
-
- obd->obd_namespace = ldlm_namespace_new("filter-tgt",
- LDLM_NAMESPACE_SERVER);
- if (!obd->obd_namespace)
+ sema_init(&filter->fo_alloc_lock, 1);
+ spin_lock_init(&filter->fo_r_pages.oh_lock);
+ spin_lock_init(&filter->fo_w_pages.oh_lock);
+ spin_lock_init(&filter->fo_r_discont_pages.oh_lock);
+ spin_lock_init(&filter->fo_w_discont_pages.oh_lock);
+ spin_lock_init(&filter->fo_r_discont_blocks.oh_lock);
+ spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
+ filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
+
+ INIT_LIST_HEAD(&filter->fo_llog_list);
+ spin_lock_init(&filter->fo_llog_list_lock);
+
+ sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
+ obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+
+ if (obd->obd_namespace == NULL)
GOTO(err_post, rc = -ENOMEM);
+ obd->obd_namespace->ns_lvbp = obd;
+ obd->obd_namespace->ns_lvbo = &filter_lvbo;
+ ldlm_register_intent(obd->obd_namespace, filter_intent_policy);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
- RETURN(0);
+ rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, CATLIST);
+ if (rc) {
+ CERROR("failed to setup llogging subsystems\n");
+ GOTO(err_post, rc);
+ }
+ RETURN(0);
err_post:
filter_post(obd);
err_mntput:
unlock_kernel();
- mntput(mnt);
+ lvfs_umount_fs(filter->fo_lvfs_ctxt);
filter->fo_sb = 0;
lock_kernel();
err_ops:
return rc;
}
-static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+static int filter_attach(struct obd_device *obd, obd_count len, void *data)
{
- struct obd_ioctl_data* data = buf;
- char *option = NULL;
+ struct lprocfs_static_vars lvars;
+ int rc;
+
+ lprocfs_init_vars(filter, &lvars);
+ rc = lprocfs_obd_attach(obd, lvars.obd_vars);
+ if (rc != 0)
+ return rc;
+
+ rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
+ if (rc != 0)
+ return rc;
+
+ /* Init obdfilter private stats here */
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
+ LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+ LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
- if (!strcmp(data->ioc_inlbuf2, "ext3"))
- option = "asyncdel";
+ return lproc_filter_attach_seqstat(obd);
+}
- return filter_common_setup(obd, len, buf, option);
+static int filter_detach(struct obd_device *dev)
+{
+ lprocfs_free_obd_stats(dev);
+ return lprocfs_obd_detach(dev);
}
-/* sanobd setup methods - use a specific mount option */
-static int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
+static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
{
- struct obd_ioctl_data* data = buf;
- char *option = NULL;
+ struct filter_obd *filter = &obd->u.filter;
+ struct lustre_cfg *lcfg = buf;
+ unsigned long page;
+ int rc;
+ ENTRY;
- if (!data->ioc_inlbuf2)
- RETURN(-EINVAL);
+ spin_lock_init(&filter->fo_denylist_lock);
+ INIT_LIST_HEAD(&filter->fo_denylist);
- /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
- if (!strcmp(data->ioc_inlbuf2, "extN"))
- option = "data=writeback";
- else if (!strcmp(data->ioc_inlbuf2, "ext3"))
- option = "data=writeback,asyncdel";
- else
- LBUG(); /* just a reminder */
+ /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
+ page = get_zeroed_page(GFP_KERNEL);
+ if (!page)
+ RETURN(-ENOMEM);
- return filter_common_setup(obd, len, buf, option);
+ memcpy((void *)page, lustre_cfg_buf(lcfg, 4),
+ LUSTRE_CFG_BUFLEN(lcfg, 4));
+
+ /* all mount options including errors=remount-ro and asyncdel are passed
+ * using 4th lcfg param. And it is good, finally we have got rid of
+ * hardcoded fs types in the code. */
+ rc = filter_common_setup(obd, len, buf, (void *)page);
+ free_page(page);
+
+ if (rc)
+ RETURN(rc);
+ rc = filter_post_fs_setup(obd);
+ RETURN(rc);
}
-static int filter_cleanup(struct obd_device *obd, int force, int failover)
+static int filter_cleanup(struct obd_device *obd, int flags)
{
- struct super_block *sb;
+ struct filter_obd *filter = &obd->u.filter;
+ ll_sbdev_type save_dev;
ENTRY;
- if (failover)
+ if (flags & OBD_OPT_FAILOVER)
CERROR("%s: shutting down for failover; client state will"
" be preserved.\n", obd->obd_name);
if (!list_empty(&obd->obd_exports)) {
CERROR("%s: still has clients!\n", obd->obd_name);
- class_disconnect_exports(obd, failover);
+ class_disconnect_exports(obd, flags);
if (!list_empty(&obd->obd_exports)) {
CERROR("still has exports after forced cleanup?\n");
RETURN(-EBUSY);
}
}
- ldlm_namespace_free(obd->obd_namespace);
+ target_cleanup_recovery(obd);
+
+ ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
- sb = obd->u.filter.fo_sb;
- if (!sb)
+ if (filter->fo_sb == NULL)
RETURN(0);
+ save_dev = ll_sbdev(filter->fo_sb);
+ filter_post_fs_cleanup(obd);
filter_post(obd);
- shrink_dcache_parent(sb->s_root);
- unlock_kernel();
+ shrink_dcache_parent(filter->fo_sb->s_root);
+ filter->fo_sb = 0;
- if (atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count) > 1){
- CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
- atomic_read(&obd->u.filter.fo_vfsmnt->mnt_count));
+ spin_lock(&filter->fo_denylist_lock);
+ while (!list_empty(&filter->fo_denylist)) {
+ deny_sec_t *p_deny_sec = list_entry(filter->fo_denylist.next,
+ deny_sec_t, list);
+ list_del(&p_deny_sec->list);
+ OBD_FREE(p_deny_sec, sizeof(*p_deny_sec));
}
+ spin_unlock(&filter->fo_denylist_lock);
- mntput(obd->u.filter.fo_vfsmnt);
- obd->u.filter.fo_sb = 0;
-/* destroy_buffers(obd->u.filter.fo_sb->s_dev);*/
-
+ unlock_kernel();
+ lvfs_umount_fs(filter->fo_lvfs_ctxt);
+ //destroy_buffers(filter->fo_sb->s_dev);
+ filter->fo_sb = NULL;
fsfilt_put_ops(obd->obd_fsops);
lock_kernel();
+ ll_clear_rdonly(save_dev);
+
RETURN(0);
}
-int filter_attach(struct obd_device *obd, obd_count len, void *data)
+static int filter_process_config(struct obd_device *obd, obd_count len, void *buf)
{
- struct lprocfs_static_vars lvars;
- int rc;
-
- lprocfs_init_vars(&lvars);
- rc = lprocfs_obd_attach(obd, lvars.obd_vars);
- if (rc != 0)
- return rc;
+ struct lustre_cfg *lcfg = buf;
+ struct filter_obd *filter = &obd->u.filter;
+ int rc = 0;
+ ENTRY;
- rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
- if (rc != 0)
- return rc;
+ switch(lcfg->lcfg_command) {
+ case LCFG_SET_SECURITY: {
+ if ((LUSTRE_CFG_BUFLEN(lcfg, 1) == 0) ||
+ (LUSTRE_CFG_BUFLEN(lcfg, 2) == 0))
+ GOTO(out, rc = -EINVAL);
+
+ if (!strcmp(lustre_cfg_string(lcfg, 1), "deny_sec")){
+ spin_lock(&filter->fo_denylist_lock);
+ rc = add_deny_security(lustre_cfg_string(lcfg, 2),
+ &filter->fo_denylist);
+ spin_unlock(&filter->fo_denylist_lock);
+ }else {
+ CERROR("Unrecognized key\n");
+ rc = -EINVAL;
+ }
+ break;
+ }
+ default: {
+ CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+ GOTO(out, rc = -EINVAL);
- /* Init obdfilter private stats here */
- lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
- LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
- lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
- LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
- return rc;
+ }
+ }
+out:
+ RETURN(rc);
}
-int filter_detach(struct obd_device *dev)
+static int filter_connect_post(struct obd_export *exp, unsigned initial,
+ unsigned long connect_flags)
{
- lprocfs_free_obd_stats(dev);
- return lprocfs_obd_detach(dev);
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_export_data *fed;
+ char str[PTL_NALFMT_SIZE];
+ struct obd_llogs *llog;
+ struct llog_ctxt *ctxt;
+ int rc = 0;
+ ENTRY;
+
+ fed = &exp->exp_filter_data;
+ if (fed->fed_group < FILTER_MIN_GROUPS)
+ RETURN(0);
+
+ /* initialize llogs for connections from MDS */
+ llog = filter_grab_llog_for_group(obd, fed->fed_group, exp);
+ LASSERT(llog != NULL);
+
+ ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
+ LASSERT(ctxt != NULL);
+
+ rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_id.nid, str);
+
+ CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n",
+ obd->obd_name, exp->exp_connection->c_peer.peer_id.nid,
+ str, fed->fed_group);
+
+ RETURN(rc);
}
/* nearly identical to mds_connect */
static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
- struct obd_uuid *cluuid)
+ struct obd_uuid *cluuid,
+ struct obd_connect_data *data,
+ unsigned long connect_flags)
{
struct obd_export *exp;
struct filter_export_data *fed;
- struct filter_client_data *fcd;
+ struct filter_client_data *fcd = NULL;
struct filter_obd *filter = &obd->u.filter;
- int rc;
-
+ struct lvfs_run_ctxt saved;
+ int rc, group;
ENTRY;
- if (!conn || !obd || !cluuid)
+ if (conn == NULL || obd == NULL || cluuid == NULL)
RETURN(-EINVAL);
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
exp = class_conn2export(conn);
- LASSERT(exp);
+ LASSERT(exp != NULL);
fed = &exp->exp_filter_data;
- class_export_put(exp);
- INIT_LIST_HEAD(&fed->fed_open_head);
spin_lock_init(&fed->fed_lock);
- if (!obd->obd_replayable)
- RETURN(0);
+ /* connection from MDS */
+ group = connect_flags;
+ if (obd->obd_replayable) {
+ OBD_ALLOC(fcd, sizeof(*fcd));
+ if (!fcd) {
+ CERROR("filter: out of memory for client data\n");
+ GOTO(cleanup, rc = -ENOMEM);
+ }
- OBD_ALLOC(fcd, sizeof(*fcd));
- if (!fcd) {
- CERROR("filter: out of memory for client data\n");
- GOTO(out_export, rc = -ENOMEM);
+ memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
+ fed->fed_fcd = fcd;
+ fed->fed_fcd->fcd_group = group;
+ rc = filter_client_add(obd, filter, fed, -1);
+ if (rc)
+ GOTO(cleanup, rc);
+ }
+ CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
+ obd->obd_name, exp->exp_handle.h_cookie, group);
+
+ if (group == 0)
+ GOTO(cleanup, rc);
+
+ if (fed->fed_group != 0 && fed->fed_group != group) {
+ char str[PTL_NALFMT_SIZE];
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_id.nid, str);
+ CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+ "earlier; now it's trying to use group %d! This could "
+ "be a bug in the MDS. Tell CFS.\n",
+ exp->exp_connection->c_peer.peer_id.nid, str,
+ fed->fed_group, group);
+ GOTO(cleanup, rc = -EPROTO);
+ }
+ fed->fed_group = group;
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ rc = filter_read_groups(obd, group, 1);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ if (rc != 0) {
+ CERROR("can't read group %u\n", group);
+ GOTO(cleanup, rc);
+ }
+#if 0
+ rc = filter_group_set_fs_flags(obd, group);
+ if (rc != 0) {
+ CERROR("can't set kml flags %u\n", group);
+ GOTO(cleanup, rc);
+ }
+#endif
+cleanup:
+ if (rc) {
+ if (fcd)
+ OBD_FREE(fcd, sizeof(*fcd));
+ class_disconnect(exp, 0);
+ } else {
+ class_export_put(exp);
}
+ return rc;
+}
+
+static int filter_precleanup(struct obd_device *obd, int flags)
+{
+ struct filter_group_llog *log;
+ struct filter_obd *filter;
+ int rc = 0;
+ ENTRY;
- memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
- fed->fed_fcd = fcd;
- fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
+ filter = &obd->u.filter;
- rc = filter_client_add(obd, filter, fed, -1);
- if (rc)
- GOTO(out_fcd, rc);
+ spin_lock(&filter->fo_llog_list_lock);
+ while (!list_empty(&filter->fo_llog_list)) {
+ log = list_entry(filter->fo_llog_list.next,
+ struct filter_group_llog, list);
+ list_del(&log->list);
+ spin_unlock(&filter->fo_llog_list_lock);
- RETURN(rc);
+ rc = obd_llog_finish(obd, log->llogs, 0);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem for %u\n",
+ log->group);
+ OBD_FREE(log->llogs, sizeof(*(log->llogs)));
+ OBD_FREE(log, sizeof(*log));
+ spin_lock(&filter->fo_llog_list_lock);
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
-out_fcd:
- OBD_FREE(fcd, sizeof(*fcd));
-out_export:
- class_disconnect(conn, 0);
+ rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem\n");
RETURN(rc);
}
-static void filter_destroy_export(struct obd_export *exp)
+/* Do extra sanity checks for grant accounting. We do this at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
+ * always get rid of it or turn it off when we know accounting is good. */
+static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
{
- struct filter_export_data *fed = &exp->exp_filter_data;
+ struct filter_export_data *fed;
+ struct obd_export *exp;
+ obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
+ obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
+ obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+ int level = D_CACHE;
- ENTRY;
- spin_lock(&fed->fed_lock);
- while (!list_empty(&fed->fed_open_head)) {
- struct filter_file_data *ffd;
+ if (list_empty(&obd->obd_exports))
+ return;
- ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
- ffd_export_list);
- list_del(&ffd->ffd_export_list);
- spin_unlock(&fed->fed_lock);
+ spin_lock(&obd->obd_osfs_lock);
+ spin_lock(&obd->obd_dev_lock);
+ list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+ fed = &exp->exp_filter_data;
+ if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
+ fed->fed_dirty < 0)
+ level = D_ERROR;
+ if (maxsize > 0) { /* we may not have done a statfs yet */
+ LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+ "cli %s/%p %ld+%ld > "LPU64"\n",
+ exp->exp_client_uuid.uuid, exp,
+ fed->fed_grant, fed->fed_pending, maxsize);
+ LASSERTF(fed->fed_dirty <= maxsize,
+ "cli %s/%p %ld > "LPU64"\n",
+ exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, maxsize);
+ }
+ CDEBUG(level, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+ tot_granted += fed->fed_grant + fed->fed_pending;
+ tot_pending += fed->fed_pending;
+ tot_dirty += fed->fed_dirty;
+ }
+ fo_tot_granted = obd->u.filter.fo_tot_granted;
+ fo_tot_pending = obd->u.filter.fo_tot_pending;
+ fo_tot_dirty = obd->u.filter.fo_tot_dirty;
+ spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_osfs_lock);
+
+ /* Do these assertions outside the spinlocks so we don't kill system */
+ if (tot_granted != fo_tot_granted)
+ CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
+ func, tot_granted, fo_tot_granted);
+ if (tot_pending != fo_tot_pending)
+ CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
+ func, tot_pending, fo_tot_pending);
+ if (tot_dirty != fo_tot_dirty)
+ CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
+ func, tot_dirty, fo_tot_dirty);
+ if (tot_pending > tot_granted)
+ CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
+ func, tot_pending, tot_granted);
+ if (tot_granted > maxsize)
+ CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
+ func, tot_granted, maxsize);
+ if (tot_dirty > maxsize)
+ CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
+ func, tot_dirty, maxsize);
+}
+
+/* Remove this client from the grant accounting totals. We also remove
+ * the export from the obd device under the osfs and dev locks to ensure
+ * that the filter_grant_sanity_check() calculations are always valid.
+ * The client should do something similar when it invalidates its import. */
+static void filter_grant_discard(struct obd_export *exp)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
+ struct filter_export_data *fed = &exp->exp_filter_data;
+ int level = D_CACHE;
+
+ spin_lock(&obd->obd_osfs_lock);
+ spin_lock(&exp->exp_obd->obd_dev_lock);
+ list_del_init(&exp->exp_obd_chain);
+ spin_unlock(&exp->exp_obd->obd_dev_lock);
+
+ if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0)
+ level = D_ERROR;
+ CDEBUG(level, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+ obd->obd_name, exp->exp_client_uuid.uuid, exp,
+ fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+
+ LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
+ "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
+ obd->obd_name, filter->fo_tot_granted,
+ exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+ filter->fo_tot_granted -= fed->fed_grant;
+ LASSERTF(filter->fo_tot_pending >= fed->fed_pending,
+ "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
+ obd->obd_name, filter->fo_tot_pending,
+ exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+ LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
+ "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
+ obd->obd_name, filter->fo_tot_dirty,
+ exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+ filter->fo_tot_dirty -= fed->fed_dirty;
+ fed->fed_dirty = 0;
+ fed->fed_grant = 0;
+
+ spin_unlock(&obd->obd_osfs_lock);
+}
+
+static int filter_destroy_export(struct obd_export *exp)
+{
+ ENTRY;
- CERROR("force close file %*s (hdl %p:"LPX64") on disconnect\n",
- ffd->ffd_file->f_dentry->d_name.len,
- ffd->ffd_file->f_dentry->d_name.name,
- ffd, ffd->ffd_handle.h_cookie);
+ if (exp->exp_filter_data.fed_pending)
+ CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
+ exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+ exp, exp->exp_filter_data.fed_pending);
- filter_close_internal(exp, ffd, NULL, exp->exp_failover);
- spin_lock(&fed->fed_lock);
- }
- spin_unlock(&fed->fed_lock);
+ target_destroy_export(exp);
if (exp->exp_obd->obd_replayable)
- filter_client_free(exp, exp->exp_failover);
- EXIT;
+ filter_client_free(exp, exp->exp_flags);
+
+ filter_grant_discard(exp);
+ if (!(exp->exp_flags & OBD_OPT_FORCE))
+ filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
+
+ RETURN(0);
}
-/* also incredibly similar to mds_disconnect */
-static int filter_disconnect(struct lustre_handle *conn, int failover)
+static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
{
- struct obd_export *exp = class_conn2export(conn);
- int rc;
- unsigned long flags;
+ struct filter_group_llog *fglog, *nlog;
+ struct filter_obd *filter;
+ int worked = 0, group;
+ struct llog_ctxt *ctxt;
ENTRY;
- LASSERT(exp);
- ldlm_cancel_locks_for_export(exp);
+ filter = &obd->u.filter;
+
+ /* we can't sync log holding spinlock. also, we do not want to get
+ * into livelock. so we do following: loop over MDS's exports in
+ * group order and skip already synced llogs -bzzz */
+ do {
+ /* look for group with min. number, but > worked */
+ fglog = NULL;
+ group = 1 << 30;
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each_entry(nlog, &filter->fo_llog_list, list) {
+
+ if (nlog->group <= worked) {
+ /* this group is already synced */
+ continue;
+ }
+
+ if (group < nlog->group) {
+ /* we have group with smaller number to sync */
+ continue;
+ }
- spin_lock_irqsave(&exp->exp_lock, flags);
- exp->exp_failover = failover;
- spin_unlock_irqrestore(&exp->exp_lock, flags);
+ /* store current minimal group */
+ fglog = nlog;
+ group = nlog->group;
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
- rc = class_disconnect(conn, failover);
+ if (fglog == NULL)
+ break;
- fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
- class_export_put(exp);
- /* XXX cleanup preallocated inodes */
- RETURN(rc);
+ worked = fglog->group;
+ if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
+ ctxt = llog_get_context(fglog->llogs,
+ LLOG_UNLINK_REPL_CTXT);
+ LASSERT(ctxt != NULL);
+ llog_sync(ctxt, fglog->exp);
+ }
+ } while (fglog != NULL);
}
-static void filter_from_inode(struct obdo *oa, struct inode *inode, int valid)
+/* also incredibly similar to mds_disconnect */
+static int filter_disconnect(struct obd_export *exp, unsigned long flags)
{
- int type = oa->o_mode & S_IFMT;
+ struct obd_device *obd = exp->exp_obd;
+ unsigned long irqflags;
+ int rc;
ENTRY;
- CDEBUG(D_INFO, "src inode %lu (%p), dst obdo "LPU64" valid 0x%08x\n",
- inode->i_ino, inode, oa->o_id, valid);
- /* Don't copy the inode number in place of the object ID */
- obdo_from_inode(oa, inode, valid);
- oa->o_mode &= ~S_IFMT;
- oa->o_mode |= type;
+ LASSERT(exp);
+ class_export_get(exp);
- if (S_ISCHR(inode->i_mode) || S_ISBLK(inode->i_mode)) {
- obd_rdev rdev = kdev_t_to_nr(inode->i_rdev);
- oa->o_rdev = rdev;
- oa->o_valid |= OBD_MD_FLRDEV;
- }
+ spin_lock_irqsave(&exp->exp_lock, irqflags);
+ exp->exp_flags = flags;
+ spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- EXIT;
+ if (!(flags & OBD_OPT_FORCE))
+ filter_grant_sanity_check(obd, __FUNCTION__);
+ filter_grant_discard(exp);
+
+ /* Disconnect early so that clients can't keep using export */
+ rc = class_disconnect(exp, flags);
+
+ ldlm_cancel_locks_for_export(exp);
+
+ fsfilt_sync(obd, obd->u.filter.fo_sb);
+
+ /* flush any remaining cancel messages out to the target */
+ filter_sync_llogs(obd, exp);
+ class_export_put(exp);
+ RETURN(rc);
}
-static struct dentry *__filter_oa2dentry(struct lustre_handle *conn,
- struct obdo *oa, char *what)
+struct dentry *__filter_oa2dentry(struct obd_device *obd,
+ struct obdo *oa, const char *what)
{
struct dentry *dchild = NULL;
+ obd_gr group = 0;
+ ENTRY;
- if (oa->o_valid & OBD_MD_FLHANDLE) {
- struct lustre_handle *ost_handle = obdo_handle(oa);
- struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
-
- if (ffd != NULL) {
- struct filter_dentry_data *fdd;
- dchild = dget(ffd->ffd_file->f_dentry);
- fdd = dchild->d_fsdata;
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
- filter_ffd_put(ffd);
-
- CDEBUG(D_INODE,
- "got child objid %*s: %p, count = %d\n",
- dchild->d_name.len, dchild->d_name.name,
- dchild, atomic_read(&dchild->d_count));
- }
- }
-
- if (!dchild) {
- struct obd_device *obd = class_conn2obd(conn);
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ group = oa->o_gr;
- if (!obd) {
- CERROR("invalid client cookie "LPX64"\n", conn->cookie);
- RETURN(ERR_PTR(-EINVAL));
- }
- dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
- }
+ dchild = filter_id2dentry(obd, NULL, group, oa->o_id);
if (IS_ERR(dchild)) {
- CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
+ CERROR("%s error looking up object: "LPU64"\n",
+ what, oa->o_id);
RETURN(dchild);
}
- if (!dchild->d_inode) {
- CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
+ if (dchild->d_inode == NULL) {
+ CDEBUG(D_INFO, "%s: %s on non-existent object: "
+ LPU64"\n", obd->obd_name, what, oa->o_id);
f_dput(dchild);
RETURN(ERR_PTR(-ENOENT));
}
- return dchild;
+ RETURN(dchild);
}
-#define filter_oa2dentry(conn, oa) __filter_oa2dentry(conn, oa, __FUNCTION__)
-
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
+static int filter_getattr(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *md)
{
struct dentry *dentry = NULL;
+ struct obd_device *obd;
int rc = 0;
ENTRY;
- dentry = filter_oa2dentry(conn, oa);
+ obd = class_exp2obd(exp);
+ if (obd == NULL) {
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ exp->exp_handle.h_cookie);
+ RETURN(-EINVAL);
+ }
+
+ dentry = filter_oa2dentry(obd, oa);
if (IS_ERR(dentry))
RETURN(PTR_ERR(dentry));
- filter_from_inode(oa, dentry->d_inode, oa->o_valid);
+ /* Limit the valid bits in the return data to what we actually use */
+ oa->o_valid = OBD_MD_FLID;
+ obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
f_dput(dentry);
RETURN(rc);
}
-/* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *md, struct obd_trans_info *oti)
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+ struct obdo *oa, struct obd_trans_info *oti)
{
- struct obd_run_ctxt saved;
- struct obd_export *export = class_conn2export(conn);
- struct obd_device *obd = class_conn2obd(conn);
- struct filter_obd *filter = &obd->u.filter;
- struct dentry *dentry;
+ struct filter_obd *filter;
struct iattr iattr;
- struct inode *inode;
- void * handle;
- int rc, rc2;
+ void *handle;
+ int rc, err;
ENTRY;
- dentry = filter_oa2dentry(conn, oa);
-
- if (IS_ERR(dentry))
- GOTO(out_exp, rc = PTR_ERR(dentry));
-
+ LASSERT(dentry != NULL);
+ LASSERT(!IS_ERR(dentry));
+ LASSERT(dentry->d_inode != NULL);
+
+ filter = &exp->exp_obd->u.filter;
iattr_from_obdo(&iattr, oa, oa->o_valid);
- iattr.ia_mode = (iattr.ia_mode & ~S_IFMT) | S_IFREG;
- inode = dentry->d_inode;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- lock_kernel();
if (iattr.ia_valid & ATTR_SIZE)
- down(&inode->i_sem);
-
- handle = fsfilt_start(obd, dentry->d_inode, FSFILT_OP_SETATTR);
+ down(&dentry->d_inode->i_sem);
+ handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+ FSFILT_OP_SETATTR, oti);
if (IS_ERR(handle))
GOTO(out_unlock, rc = PTR_ERR(handle));
- rc = fsfilt_setattr(obd, dentry, handle, &iattr, 1);
- rc = filter_finish_transno(export, handle, oti, rc);
- rc2 = fsfilt_commit(obd, dentry->d_inode, handle, 0);
- if (rc2) {
- CERROR("error on commit, err = %d\n", rc2);
+ /* XXX this could be a rwsem instead, if filter_preprw played along */
+ if (iattr.ia_valid & ATTR_ATTR_FLAG)
+ rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode,
+ NULL, EXT3_IOC_SETFLAGS,
+ (long)&iattr.ia_attr_flags);
+ else
+ rc = fsfilt_setattr(exp->exp_obd, dentry, handle,
+ &iattr, 1);
+
+ rc = filter_finish_transno(exp, oti, rc);
+
+ err = fsfilt_commit(exp->exp_obd, filter->fo_sb,
+ dentry->d_inode, handle,
+ exp->exp_sync);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
if (!rc)
- rc = rc2;
+ rc = err;
}
+ EXIT;
+out_unlock:
+ if (iattr.ia_valid & ATTR_SIZE)
+ up(&dentry->d_inode->i_sem);
+ return rc;
+}
+
+/* this is called from filter_truncate() until we have filter_punch() */
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *md, struct obd_trans_info *oti)
+{
+ struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+ struct ldlm_valblock_ops *ns_lvbo;
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter;
+ struct ldlm_resource *res;
+ struct dentry *dentry;
+ int rc;
+ ENTRY;
+
+ LASSERT(oti != NULL);
+
+ filter = &exp->exp_obd->u.filter;
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+ /* make sure that object is allocated. */
+ dentry = filter_crow_object(exp->exp_obd,
+ oa->o_gr, oa->o_id);
+ if (IS_ERR(dentry))
+ GOTO(out_pop, rc = PTR_ERR(dentry));
+
+ lock_kernel();
+
+ /* setting objects attributes (including owner/group) */
+ rc = filter_setattr_internal(exp, dentry, oa, oti);
+ if (rc)
+ GOTO(out_unlock, rc);
- if (iattr.ia_valid & ATTR_SIZE) {
- up(&inode->i_sem);
- oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME;
- obdo_from_inode(oa, inode, oa->o_valid);
+ res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
+ res_id, LDLM_EXTENT, 0);
+
+ if (res != NULL) {
+ ns_lvbo = res->lr_namespace->ns_lvbo;
+ if (ns_lvbo && ns_lvbo->lvbo_update)
+ rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
+ ldlm_resource_putref(res);
}
+
+ oa->o_valid = OBD_MD_FLID;
+ obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+ EXIT;
out_unlock:
unlock_kernel();
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-
f_dput(dentry);
- out_exp:
- class_export_put(export);
- RETURN(rc);
+out_pop:
+ pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ return rc;
}
-static int filter_open(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti,
- struct obd_client_handle *och)
+/* XXX identical to osc_unpackmd */
+static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
+ struct lov_mds_md *lmm, int lmm_bytes)
{
- struct obd_export *export = NULL;
- struct lustre_handle *handle;
- struct filter_file_data *ffd;
- struct file *filp;
- struct lustre_handle parent_lockh;
- int rc = 0;
+ int lsm_size;
ENTRY;
- export = class_conn2export(conn);
- if (!export) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
- conn->cookie);
- GOTO(out, rc = -EINVAL);
+ if (lmm != NULL) {
+ if (lmm_bytes < sizeof (*lmm)) {
+ CERROR("lov_mds_md too small: %d, need %d\n",
+ lmm_bytes, (int)sizeof(*lmm));
+ RETURN(-EINVAL);
+ }
+ /* XXX LOV_MAGIC etc check? */
+
+ if (lmm->lmm_object_id == cpu_to_le64(0)) {
+ CERROR("lov_mds_md: zero lmm_object_id\n");
+ RETURN(-EINVAL);
+ }
}
- filp = filter_obj_open(export, oa->o_id, oa->o_mode,
- LCK_PR, &parent_lockh);
- if (IS_ERR(filp))
- GOTO(out, rc = PTR_ERR(filp));
+ lsm_size = lov_stripe_md_size(1);
+ if (lsmp == NULL)
+ RETURN(lsm_size);
+
+ if (*lsmp != NULL && lmm == NULL) {
+ OBD_FREE(*lsmp, lsm_size);
+ *lsmp = NULL;
+ RETURN(0);
+ }
- filter_from_inode(oa, filp->f_dentry->d_inode, oa->o_valid);
+ if (*lsmp == NULL) {
+ OBD_ALLOC(*lsmp, lsm_size);
+ if (*lsmp == NULL)
+ RETURN(-ENOMEM);
- ffd = filp->private_data;
- handle = obdo_handle(oa);
- handle->cookie = ffd->ffd_handle.h_cookie;
- oa->o_valid |= OBD_MD_FLHANDLE;
+ loi_init((*lsmp)->lsm_oinfo);
+ }
-out:
- class_export_put(export);
- if (!rc) {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PR;
+ if (lmm != NULL) {
+ /* XXX zero *lsmp? */
+ (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
+ LASSERT((*lsmp)->lsm_object_id);
}
- RETURN(rc);
+
+ (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
+ RETURN(lsm_size);
}
-static int filter_close(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti)
+static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
+ unsigned long max_age)
{
- struct obd_export *exp = class_conn2export(conn);
- struct filter_file_data *ffd;
- struct filter_export_data *fed;
+ struct filter_obd *filter = &obd->u.filter;
+ int blockbits = filter->fo_sb->s_blocksize_bits;
int rc;
ENTRY;
- if (!exp) {
- CDEBUG(D_IOCTL, "invalid client cookie"LPX64"\n", conn->cookie);
- GOTO(out, rc = -EINVAL);
- }
+ /* at least try to account for cached pages. its still racey and
+ * might be under-reporting if clients haven't announced their
+ * caches with brw recently */
+ spin_lock(&obd->obd_osfs_lock);
+ rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
+ memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
+ spin_unlock(&obd->obd_osfs_lock);
- if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
- CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
- GOTO(out, rc = -EINVAL);
- }
+ CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
+ " pending "LPU64" free "LPU64" avail "LPU64"\n",
+ filter->fo_tot_dirty, filter->fo_tot_granted,
+ filter->fo_tot_pending,
+ osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
- ffd = filter_handle2ffd(obdo_handle(oa));
- if (ffd == NULL) {
- CERROR("bad handle ("LPX64") for close\n",
- obdo_handle(oa)->cookie);
- GOTO(out, rc = -ESTALE);
- }
+ filter_grant_sanity_check(obd, __FUNCTION__);
- fed = &exp->exp_filter_data;
- spin_lock(&fed->fed_lock);
- list_del(&ffd->ffd_export_list);
- spin_unlock(&fed->fed_lock);
+ osfs->os_bavail -= min(osfs->os_bavail,
+ (filter->fo_tot_dirty + filter->fo_tot_pending +
+ osfs->os_bsize -1) >> blockbits);
- rc = filter_close_internal(exp, ffd, oti, 0);
- filter_ffd_put(ffd);
- GOTO(out, rc);
- out:
- class_export_put(exp);
- return rc;
+ RETURN(rc);
}
-static int filter_create(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+ obd_gr group)
{
- struct obd_export *exp;
- struct obd_device *obd = class_conn2obd(conn);
- struct filter_obd *filter = &obd->u.filter;
- struct obd_run_ctxt saved;
- struct lustre_handle parent_lockh;
- struct dentry *dparent;
+ struct dentry *dparent = NULL;
struct dentry *dchild = NULL;
- struct iattr;
- void *handle;
- int err, rc, cleanup_phase;
+ struct filter_obd *filter;
+ struct obd_statfs *osfs;
+ int cleanup_phase = 0;
+ int err = 0, rc = 0;
+ void *handle = NULL;
+ void *lock = NULL;
ENTRY;
- if (!obd) {
- CERROR("invalid client cookie "LPX64"\n", conn->cookie);
- RETURN(-EINVAL);
+ filter = &obd->u.filter;
+
+ OBD_ALLOC(osfs, sizeof(*osfs));
+ if (osfs == NULL)
+ RETURN(-ENOMEM);
+ rc = filter_statfs(obd, osfs, jiffies - HZ);
+ if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+ CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+ osfs->os_bavail << filter->fo_sb->s_blocksize_bits);
+ rc = -ENOSPC;
}
+ OBD_FREE(osfs, sizeof(*osfs));
+ if (rc)
+ RETURN(rc);
- exp = class_conn2export(conn);
+ down(&filter->fo_create_locks[group]);
+
+ if (test_bit(group, &filter->fo_destroys_in_progress)) {
+ CWARN("%s: precreate aborted by destroy\n",
+ obd->obd_name);
+ GOTO(out, rc = -EALREADY);
+ }
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- retry:
- oa->o_id = filter_next_id(filter);
+ CDEBUG(D_INFO, "precreate objid "LPU64"\n", oa->o_id);
- cleanup_phase = 0;
- dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
- &parent_lockh);
+ dparent = filter_parent_lock(obd, group, oa->o_id, &lock);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
- dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
+ dchild = filter_id2dentry(obd, dparent, group, oa->o_id);
if (IS_ERR(dchild))
GOTO(cleanup, rc = PTR_ERR(dchild));
- if (dchild->d_inode) {
- /* This would only happen if lastobjid was bad on disk */
- CERROR("Serious error: objid %*s already exists; is this "
- "filesystem corrupt? I will try to work around it.\n",
- dchild->d_name.len, dchild->d_name.name);
- f_dput(dchild);
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- goto retry;
- }
-
cleanup_phase = 2;
- handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE);
+
+ if (dchild->d_inode != NULL)
+ GOTO(cleanup, rc = 0);
+
+ handle = fsfilt_start_log(obd, dparent->d_inode,
+ FSFILT_OP_CREATE, NULL, 1);
if (IS_ERR(handle))
GOTO(cleanup, rc = PTR_ERR(handle));
+ cleanup_phase = 3;
- rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
- if (rc)
+ rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+ if (rc) {
CERROR("create failed rc = %d\n", rc);
-
- rc = filter_finish_transno(exp, handle, oti, rc);
- err = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
- if (err) {
- CERROR("unable to write lastobjid but file created\n");
- if (!rc)
- rc = err;
- }
- err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (err) {
- CERROR("error on commit, err = %d\n", err);
- if (!rc)
- rc = err;
- }
-
- if (rc)
GOTO(cleanup, rc);
-
- /* Set flags for fields we have set in the inode struct */
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS |
- OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME;
- filter_from_inode(oa, dchild->d_inode, oa->o_valid);
-
- EXIT;
-cleanup:
- switch(cleanup_phase) {
- case 2:
- f_dput(dchild);
- case 1: /* locked parent dentry */
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
- }
- case 0:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- class_export_put(exp);
- break;
- default:
- CERROR("invalid cleanup_phase %d\n", cleanup_phase);
- LBUG();
}
- RETURN(rc);
-}
-
-static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti)
-{
- struct obd_export *exp;
- struct obd_device *obd = class_conn2obd(conn);
- struct filter_obd *filter = &obd->u.filter;
- struct dentry *dparent, *dchild = NULL;
- struct filter_dentry_data *fdd;
- struct obd_run_ctxt saved;
- void *handle = NULL;
- struct lustre_handle parent_lockh;
- int rc, rc2, cleanup_phase = 0;
- ENTRY;
-
- if (!obd) {
- CERROR("invalid client cookie "LPX64"\n", conn->cookie);
- RETURN(-EINVAL);
- }
-
- exp = class_conn2export(conn);
-
- CDEBUG(D_INODE, "destroying objid "LPU64"\n", oa->o_id);
+ fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
- LCK_PW, &parent_lockh);
- if (IS_ERR(dparent))
- GOTO(cleanup, rc = PTR_ERR(dparent));
- cleanup_phase = 1;
-
- dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
- if (IS_ERR(dchild))
- GOTO(cleanup, rc = -ENOENT);
- cleanup_phase = 2;
-
- if (!dchild->d_inode) {
- CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
- GOTO(cleanup, rc = -ENOENT);
- }
-
- handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK);
- if (IS_ERR(handle))
- GOTO(cleanup, rc = PTR_ERR(handle));
- cleanup_phase = 3;
+ if (oa->o_id > filter_last_id(filter, group)) {
+ /*
+ * saving last created object id, it will be needed in recovery
+ * for deleting orphanes.
+ */
+ filter_set_last_id(filter, group, oa->o_id);
- fdd = dchild->d_fsdata;
- if (fdd && atomic_read(&fdd->fdd_open_count)) {
- LASSERT(fdd->fdd_magic = FILTER_DENTRY_MAGIC);
- if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
- fdd->fdd_flags |= FILTER_FLAG_DESTROY;
- /* XXX put into PENDING directory in case of crash */
- CDEBUG(D_INODE,
- "defer destroy of %dx open objid "LPU64"\n",
- atomic_read(&fdd->fdd_open_count), oa->o_id);
- } else
- CDEBUG(D_INODE,
- "repeat destroy of %dx open objid "LPU64"\n",
- atomic_read(&fdd->fdd_open_count), oa->o_id);
- GOTO(cleanup, rc = 0);
+ rc = filter_update_last_objid(obd, group, 0);
+ if (rc) {
+ CERROR("unable to write lastobjid, but "
+ "orphans were deleted, err = %d\n",
+ rc);
+ rc = 0;
+ }
}
-
- rc = filter_destroy_internal(obd, dparent, dchild);
-
cleanup:
switch(cleanup_phase) {
case 3:
- rc = filter_finish_transno(exp, handle, oti, rc);
- rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (rc2) {
- CERROR("error on commit, err = %d\n", rc2);
+ err = fsfilt_commit(obd, filter->fo_sb,
+ dparent->d_inode, handle, 0);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
if (!rc)
- rc = rc2;
+ rc = err;
}
case 2:
f_dput(dchild);
case 1:
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
- }
+ filter_parent_unlock(dparent, lock);
case 0:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- class_export_put(exp);
break;
- default:
- CERROR("invalid cleanup_phase %d\n", cleanup_phase);
- LBUG();
}
+ if (rc)
+ GOTO(out, rc);
+
+out:
+ up(&filter->fo_create_locks[group]);
RETURN(rc);
}
-/* NB start and end are used for punch, but not truncate */
-static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *lsm,
- obd_off start, obd_off end,
- struct obd_trans_info *oti)
+struct dentry *filter_crow_object(struct obd_device *obd,
+ __u64 ogr, __u64 oid)
{
- int error;
+ struct dentry *dentry;
+ struct obdo *oa;
+ int rc = 0;
ENTRY;
- if (end != OBD_OBJECT_EOF)
- CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
- end);
-
- CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
- "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
- oa->o_size = start;
- error = filter_setattr(conn, oa, NULL, oti);
- RETURN(error);
-}
+ /* check if object is already allocated */
+ dentry = filter_id2dentry(obd, NULL, ogr, oid);
+ if (IS_ERR(dentry))
+ RETURN(dentry);
-static inline void lustre_put_page(struct page *page)
-{
- page_cache_release(page);
-}
+ if (dentry->d_inode)
+ RETURN(dentry);
-static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
-{
- struct address_space *mapping = inode->i_mapping;
- struct page *page;
- unsigned long index = lnb->offset >> PAGE_SHIFT;
- int rc;
-
- page = grab_cache_page(mapping, index); /* locked page */
- if (IS_ERR(page))
- return lnb->rc = PTR_ERR(page);
+ f_dput(dentry);
+
+ /* allocate object as it does not exist */
+ oa = obdo_alloc();
+ if (oa == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
- lnb->page = page;
+ oa->o_id = oid;
+ oa->o_gr = ogr;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
- if (inode->i_size < lnb->offset + lnb->len - 1)
- lnb->rc = inode->i_size - lnb->offset;
- else
- lnb->rc = lnb->len;
+ CDEBUG(D_INODE, "OSS object "LPU64"/"LPU64
+ " does not exists - allocate now\n",
+ oid, ogr);
- if (PageUptodate(page)) {
- unlock_page(page);
- return 0;
+ rc = filter_create_object(obd, oa, oa->o_gr);
+ if (rc) {
+ CERROR("cannot create OSS object "LPU64"/"LPU64
+ ", err = %d\n", oa->o_id, oa->o_gr, rc);
+ GOTO(out_free_oa, dentry = ERR_PTR(rc));
}
- rc = mapping->a_ops->readpage(NULL, page);
- if (rc < 0) {
- CERROR("page index %lu, rc = %d\n", index, rc);
- lnb->page = NULL;
- lustre_put_page(page);
- return lnb->rc = rc;
+ /* lookup for just created object and return it to caller */
+ dentry = filter_id2dentry(obd, NULL, ogr, oid);
+ if (IS_ERR(dentry))
+ GOTO(out_free_oa, dentry);
+
+ if (dentry->d_inode == NULL) {
+ f_dput(dentry);
+ dentry = ERR_PTR(-ENOENT);
+ CERROR("cannot find just created OSS object "
+ LPU64"/"LPU64" err = %d\n", oid,
+ ogr, (int)PTR_ERR(dentry));
+ GOTO(out_free_oa, dentry);
}
- return 0;
+ EXIT;
+out_free_oa:
+ obdo_free(oa);
+ return dentry;
}
-static int filter_finish_page_read(struct niobuf_local *lnb)
+static int
+filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
{
- if (lnb->page == NULL)
- return 0;
+ struct obd_device *obd = NULL;
+ struct filter_obd *filter;
+ struct obdo *doa = NULL;
+ int rc = 0, orphans;
+ __u64 last, id;
+ ENTRY;
+
+ LASSERT(oa);
+ LASSERT(oa->o_gr != 0);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
- if (PageUptodate(lnb->page))
- return 0;
+ obd = exp->exp_obd;
+ filter = &obd->u.filter;
- wait_on_page(lnb->page);
- if (!PageUptodate(lnb->page)) {
- CERROR("page index %lu/offset "LPX64" not uptodate\n",
- lnb->page->index, lnb->offset);
- GOTO(err_page, lnb->rc = -EIO);
- }
- if (PageError(lnb->page)) {
- CERROR("page index %lu/offset "LPX64" has error\n",
- lnb->page->index, lnb->offset);
- GOTO(err_page, lnb->rc = -EIO);
- }
+ last = filter_last_id(filter, oa->o_gr);
+ orphans = last - oa->o_id;
+
+ if (orphans <= 0)
+ RETURN(0);
+
+ doa = obdo_alloc();
+ if (doa == NULL)
+ RETURN(-ENOMEM);
- return 0;
+ doa->o_gr = oa->o_gr;
+ doa->o_mode = S_IFREG;
+ doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
-err_page:
- lustre_put_page(lnb->page);
- lnb->page = NULL;
- return lnb->rc;
-}
+ set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+ down(&filter->fo_create_locks[doa->o_gr]);
+ if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
+ CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
+ exp->exp_obd->obd_name, doa->o_gr);
+ up(&filter->fo_create_locks[doa->o_gr]);
+ GOTO(out_free_doa, 0);
+ }
-static struct page *lustre_get_page_write(struct inode *inode,
- unsigned long index)
-{
- struct address_space *mapping = inode->i_mapping;
- struct page *page;
- int rc;
+ CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "
+ LPU64"\n", exp->exp_obd->obd_name, doa->o_gr,
+ oa->o_id + 1, last);
+
+ for (id = oa->o_id + 1; id <= last; id++) {
+ doa->o_id = id;
+ filter_destroy(exp, doa, NULL, NULL);
+ }
- page = grab_cache_page(mapping, index); /* locked page */
+ CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
+ LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
- if (!IS_ERR(page)) {
- /* Note: Called with "O" and "PAGE_SIZE" this is essentially
- * a no-op for most filesystems, because we write the whole
- * page. For partial-page I/O this will read in the page.
- */
- rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
- if (rc) {
- CERROR("page index %lu, rc = %d\n", index, rc);
- if (rc != -ENOSPC)
- LBUG();
- GOTO(err_unlock, rc);
- }
- /* XXX not sure if we need this if we are overwriting page */
- if (PageError(page)) {
- CERROR("error on page index %lu, rc = %d\n", index, rc);
- LBUG();
- GOTO(err_unlock, rc = -EIO);
- }
- }
- return page;
+ /* return next free id to be used as a new start of sequence -bzzz */
+ oa->o_id = last + 1;
-err_unlock:
- unlock_page(page);
- lustre_put_page(page);
- return ERR_PTR(rc);
-}
+ filter_set_last_id(filter, oa->o_gr, oa->o_id);
+ clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+ up(&filter->fo_create_locks[oa->o_gr]);
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-int waitfor_one_page(struct page *page)
-{
- wait_on_page_locked(page);
- return 0;
+ EXIT;
+out_free_doa:
+ obdo_free(doa);
+ return rc;
}
-#endif
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data.
+/*
+ * by now this function is only needed as entry point for deleting orphanes on
+ * OSS as objects are created on first write attempt. --umka
*/
-static inline void inode_update_time(struct inode *inode, int ctime_too)
+static int
+filter_create(struct obd_export *exp, struct obdo *oa, void *acl,
+ int acl_size, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
{
- time_t now = CURRENT_TIME;
- if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
- return;
- inode->i_mtime = now;
- if (ctime_too)
- inode->i_ctime = now;
- mark_inode_dirty_sync(inode);
-}
-#endif
-
-static int lustre_commit_write(struct niobuf_local *lnb)
-{
- struct page *page = lnb->page;
- unsigned from = lnb->offset & ~PAGE_MASK;
- unsigned to = from + lnb->len;
- struct inode *inode = page->mapping->host;
- int err;
-
- LASSERT(to <= PAGE_SIZE);
- err = page->mapping->a_ops->commit_write(NULL, page, from, to);
- if (!err && IS_SYNC(inode))
- err = waitfor_one_page(page);
- //SetPageUptodate(page); // the client commit_write will do this
-
- SetPageReferenced(page);
- unlock_page(page);
- lustre_put_page(page);
- return err;
-}
+ struct filter_export_data *fed;
+ struct obd_device *obd = NULL;
+ int group = oa->o_gr, rc = 0;
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter;
+ char str[PTL_NALFMT_SIZE];
+ ENTRY;
-int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
- int *pglocked)
-{
- unsigned long index = lnb->offset >> PAGE_SHIFT;
- struct address_space *mapping = inode->i_mapping;
- struct page *page;
- int rc;
+ LASSERT(acl == NULL && acl_size == 0);
- //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
- if (*pglocked)
- page = grab_cache_page_nowait(mapping, index); /* locked page */
- else
- page = grab_cache_page(mapping, index); /* locked page */
+ if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_id.nid, str);
+ CERROR("!!! nid "LPX64"/%s sent invalid object group %d\n",
+ exp->exp_connection->c_peer.peer_id.nid, str, group);
+ RETURN(-EINVAL);
+ }
+ obd = exp->exp_obd;
+ fed = &exp->exp_filter_data;
+ filter = &obd->u.filter;
- /* This page is currently locked, so get a temporary page instead. */
- if (!page) {
- CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
- page = alloc_pages(GFP_KERNEL, 0); /* locked page */
- if (!page) {
- CERROR("no memory for a temp page\n");
- GOTO(err, rc = -ENOMEM);
- }
- page->index = index;
- lnb->page = page;
- lnb->flags |= N_LOCAL_TEMP_PAGE;
- } else if (!IS_ERR(page)) {
- (*pglocked)++;
-
- rc = mapping->a_ops->prepare_write(NULL, page,
- lnb->offset & ~PAGE_MASK,
- lnb->len);
- if (rc) {
- if (rc != -ENOSPC)
- CERROR("page index %lu, rc = %d\n", index, rc);
- GOTO(err_unlock, rc);
- }
- /* XXX not sure if we need this if we are overwriting page */
- if (PageError(page)) {
- CERROR("error on page index %lu, rc = %d\n", index, rc);
- LBUG();
- GOTO(err_unlock, rc = -EIO);
- }
- lnb->page = page;
+ if (fed->fed_group != group) {
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_id.nid, str);
+ CERROR("!!! this export (nid "LPX64"/%s) used object group %d "
+ "earlier; now it's trying to use group %d! This could "
+ "be a bug in the MDS. Tell CFS.\n",
+ exp->exp_connection->c_peer.peer_id.nid, str,
+ fed->fed_group, group);
+ RETURN(-ENOTUNIQ);
}
- return 0;
+ CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
+ group, oa->o_id);
-err_unlock:
- unlock_page(page);
- lustre_put_page(page);
-err:
- return lnb->rc = rc;
-}
+ obd = exp->exp_obd;
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-/*
- * We need to balance prepare_write() calls with commit_write() calls.
- * If the page has been prepared, but we have no data for it, we don't
- * want to overwrite valid data on disk, but we still need to zero out
- * data for space which was newly allocated. Like part of what happens
- * in __block_prepare_write() for newly allocated blocks.
- *
- * XXX currently __block_prepare_write() creates buffers for all the
- * pages, and the filesystems mark these buffers as BH_New if they
- * were newly allocated from disk. We use the BH_New flag similarly.
- */
-static int filter_commit_write(struct niobuf_local *lnb, int err)
-{
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- if (err) {
- unsigned block_start, block_end;
- struct buffer_head *bh, *head = lnb->page->buffers;
- unsigned blocksize = head->b_size;
-
- /* debugging: just seeing if this ever happens */
- CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
- "called for ino %lu:%lu on err %d\n",
- lnb->page->mapping->host->i_ino, lnb->page->index, err);
-
- /* Currently one buffer per page, but in the future... */
- for (bh = head, block_start = 0; bh != head || !block_start;
- block_start = block_end, bh = bh->b_this_page) {
- block_end = block_start + blocksize;
- if (buffer_new(bh)) {
- memset(kmap(lnb->page) + block_start, 0,
- blocksize);
- kunmap(lnb->page);
- }
+ LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags == OBD_FL_DELORPHAN));
+
+ rc = filter_clear_orphans(exp, oa);
+ if (rc) {
+ CERROR("cannot clear orphanes starting from "
+ LPU64", err = %d\n", oa->o_id, rc);
+ } else {
+ rc = filter_update_last_objid(obd, group, 0);
+ if (rc) {
+ CERROR("unable to write lastobjid, but "
+ "orphans were deleted, err = %d\n",
+ rc);
}
}
-#endif
- return lustre_commit_write(lnb);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ RETURN(0);
}
-static int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb,
- struct niobuf_local *res, void **desc_private,
- struct obd_trans_info *oti)
+static int filter_destroy(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
- struct obd_run_ctxt saved;
+ int rc, rc2, cleanup_phase = 0, have_prepared = 0;
+ struct dentry *dchild = NULL, *dparent = NULL;
+ struct llog_cookie *fcc = NULL;
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter;
struct obd_device *obd;
- struct obd_ioobj *o;
- struct niobuf_remote *rnb;
- struct niobuf_local *lnb;
- struct fsfilt_objinfo *fso;
- struct dentry *dentry;
- struct inode *inode;
- int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
- unsigned long now = jiffies;
+ void *handle = NULL;
+ void *lock = NULL;
+ struct iattr iattr;
ENTRY;
- memset(res, 0, niocount * sizeof(*res));
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
obd = exp->exp_obd;
- if (obd == NULL)
- RETURN(-EINVAL);
-
- // theoretically we support multi-obj BRW RPCs, but until then...
- LASSERT(objcount == 1);
-
- OBD_ALLOC(fso, objcount * sizeof(*fso));
- if (!fso)
- RETURN(-ENOMEM);
-
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ filter = &obd->u.filter;
- for (i = 0, o = obj; i < objcount; i++, o++) {
- struct filter_dentry_data *fdd;
-
- LASSERT(o->ioo_bufcnt);
-
- dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
-
- if (IS_ERR(dentry))
- GOTO(out_objinfo, rc = PTR_ERR(dentry));
-
- fso[i].fso_dentry = dentry;
- fso[i].fso_bufcnt = o->ioo_bufcnt;
-
- if (!dentry->d_inode) {
- CERROR("trying to BRW to non-existent file "LPU64"\n",
- o->ioo_id);
- f_dput(dentry);
- GOTO(out_objinfo, rc = -ENOENT);
- }
-
- /* If we ever start to support mutli-object BRW RPCs, we will
- * need to get locks on mulitple inodes (in order) or use the
- * DLM to do the locking for us (and use the same locking in
- * filter_setattr() for truncate). That isn't all, because
- * there still exists the possibility of a truncate starting
- * a new transaction while holding the ext3 rwsem = write
- * while some writes (which have started their transactions
- * here) blocking on the ext3 rwsem = read => lock inversion.
- *
- * The handling gets very ugly when dealing with locked pages.
- * It may be easier to just get rid of the locked page code
- * (which has problems of its own) and either discover we do
- * not need it anymore (i.e. it was a symptom of another bug)
- * or ensure we get the page locks in an appropriate order.
- */
- if (cmd & OBD_BRW_WRITE)
- down(&dentry->d_inode->i_sem);
- fdd = dentry->d_fsdata;
- if (!fdd || !atomic_read(&fdd->fdd_open_count))
- CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
- o->ioo_id);
- }
-
- if (time_after(jiffies, now + 15*HZ))
- CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
-
- if (cmd & OBD_BRW_WRITE) {
- *desc_private = fsfilt_brw_start(obd, objcount, fso,
- niocount, nb);
- if (IS_ERR(*desc_private)) {
- rc = PTR_ERR(*desc_private);
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "error starting transaction: rc = %d\n", rc);
- *desc_private = NULL;
- GOTO(out_objinfo, rc);
- }
- }
-
- for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
- dentry = fso[i].fso_dentry;
- inode = dentry->d_inode;
-
- for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
- if (j == 0)
- lnb->dentry = dentry;
- else
- lnb->dentry = dget(dentry);
-
- lnb->offset = rnb->offset;
- lnb->len = rnb->len;
- lnb->flags = rnb->flags;
- lnb->start = jiffies;
-
- if (cmd & OBD_BRW_WRITE) {
- rc = filter_get_page_write(inode,lnb,&pglocked);
- if (rc)
- up(&dentry->d_inode->i_sem);
- } else if (inode->i_size <= rnb->offset) {
- /* If there's no more data, abort early.
- * lnb->page == NULL and lnb->rc == 0, so it's
- * easy to detect later. */
- f_dput(dentry);
- lnb->dentry = NULL;
- break;
- } else {
- rc = filter_start_page_read(inode, lnb);
- }
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- if (rc) {
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "page err %u@"LPU64" %u/%u %p: rc %d\n",
- lnb->len, lnb->offset, j, o->ioo_bufcnt,
- dentry, rc);
- f_dput(dentry);
- GOTO(out_pages, rc);
- }
+ acquire_locks:
+ dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id, &lock);
+ if (IS_ERR(dparent))
+ GOTO(cleanup, rc = PTR_ERR(dparent));
+ cleanup_phase = 1;
- tot_bytes += lnb->len;
+ dchild = filter_id2dentry(obd, dparent, oa->o_gr, oa->o_id);
+ if (IS_ERR(dchild))
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ cleanup_phase = 2;
- if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
- /* Likewise with a partial read */
- break;
- }
- }
+ if (dchild->d_inode == NULL) {
+ CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
+ oa->o_id);
+ GOTO(cleanup, rc = -ENOENT);
}
- if (time_after(jiffies, now + 15*HZ))
- CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
-
- if (cmd & OBD_BRW_READ) {
- lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
- tot_bytes);
- while (lnb-- > res) {
- rc = filter_finish_page_read(lnb);
- if (rc) {
- CERROR("error page %u@"LPU64" %u %p: rc %d\n",
- lnb->len, lnb->offset, lnb - res,
- lnb->dentry, rc);
- f_dput(lnb->dentry);
- GOTO(out_pages, rc);
- }
- }
- } else
- lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
- tot_bytes);
-
- if (time_after(jiffies, now + 15*HZ))
- CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
-
- EXIT;
-out:
- OBD_FREE(fso, objcount * sizeof(*fso));
- current->journal_info = NULL;
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
- return rc;
+ if (!have_prepared) {
+ /* If we're really going to destroy the object, get ready
+ * by getting the clients to discard their cached data.
+ *
+ * We have to drop the parent lock, because
+ * filter_prepare_destroy will acquire a PW on the object, and
+ * we don't want to deadlock with an incoming write to the
+ * object, which has the extent PW and then wants to get the
+ * parent dentry to do the lookup.
+ *
+ * We dput the child because it's not worth the extra
+ * complication of condition the above code to skip it on the
+ * second time through. */
+ f_dput(dchild);
+ filter_parent_unlock(dparent, lock);
-out_pages:
- while (lnb-- > res) {
- if (cmd & OBD_BRW_WRITE) {
- filter_commit_write(lnb, rc);
- up(&lnb->dentry->d_inode->i_sem);
- } else {
- lustre_put_page(lnb->page);
- }
- f_dput(lnb->dentry);
+ filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
+ have_prepared = 1;
+ goto acquire_locks;
}
- if (cmd & OBD_BRW_WRITE) {
- filter_finish_transno(exp, *desc_private, oti, rc);
- fsfilt_commit(obd,
- filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
- *desc_private, 0);
- }
- goto out; /* dropped the dentry refs already (one per page) */
-out_objinfo:
- for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
- if (cmd & OBD_BRW_WRITE)
- up(&fso[i].fso_dentry->d_inode->i_sem);
- f_dput(fso[i].fso_dentry);
+ /* Our MDC connection is established by the MDS to us */
+ if (oa->o_valid & OBD_MD_FLCOOKIE) {
+ OBD_ALLOC(fcc, sizeof(*fcc));
+ if (fcc != NULL)
+ memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
}
- goto out;
-}
-static int filter_write_locked_page(struct niobuf_local *lnb)
-{
- struct page *lpage;
- void *lpage_addr;
- void *lnb_addr;
- int rc;
- ENTRY;
-
- lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
- if (IS_ERR(lpage)) {
- /* It is highly unlikely that we would ever get an error here.
- * The page we want to get was previously locked, so it had to
- * have already allocated the space, and we were just writing
- * over the same data, so there would be no hole in the file.
- *
- * XXX: possibility of a race with truncate could exist, need
- * to check that. There are no guarantees w.r.t.
- * write order even on a local filesystem, although the
- * normal response would be to return the number of bytes
- * successfully written and leave the rest to the app.
- */
- rc = PTR_ERR(lpage);
- CERROR("error getting locked page index %ld: rc = %d\n",
- lnb->page->index, rc);
- LBUG();
- lustre_commit_write(lnb);
- RETURN(rc);
+ /* we're gonna truncate it first in order to avoid possible
+ * deadlock:
+ * P1 P2
+ * open trasaction open transaction
+ * down(i_zombie) down(i_zombie)
+ * restart transaction
+ * (see BUG 4180) -bzzz
+ */
+ down(&dchild->d_inode->i_sem);
+ handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_SETATTR,NULL,1);
+ if (IS_ERR(handle)) {
+ up(&dchild->d_inode->i_sem);
+ GOTO(cleanup, rc = PTR_ERR(handle));
}
- /* 2 kmaps == vanishingly small deadlock opportunity */
- lpage_addr = kmap(lpage);
- lnb_addr = kmap(lnb->page);
-
- memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
+ iattr.ia_valid = ATTR_SIZE;
+ iattr.ia_size = 0;
+ rc = fsfilt_setattr(obd, dchild, handle, &iattr, 1);
- kunmap(lnb->page);
- kunmap(lpage);
-
- lustre_put_page(lnb->page);
-
- lnb->page = lpage;
- rc = lustre_commit_write(lnb);
+ rc2 = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode, handle, 0);
+ up(&dchild->d_inode->i_sem);
if (rc)
- CERROR("error committing locked page %ld: rc = %d\n",
- lnb->page->index, rc);
-
- RETURN(rc);
-}
-
-static int filter_syncfs(struct obd_export *exp)
-{
- struct obd_device *obd = exp->exp_obd;
- ENTRY;
-
- RETURN(fsfilt_sync(obd, obd->u.filter.fo_sb));
-}
-
-static int filter_commitrw(int cmd, struct obd_export *exp,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *res,
- void *desc_private, struct obd_trans_info *oti)
-{
- struct obd_run_ctxt saved;
- struct obd_ioobj *o;
- struct niobuf_local *lnb;
- struct obd_device *obd = exp->exp_obd;
- int found_locked = 0, rc = 0, i;
- unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
- ENTRY;
-
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
- LASSERT(!current->journal_info);
- current->journal_info = desc_private;
-
- for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
- int j;
-
- if (cmd & OBD_BRW_WRITE) {
- inode_update_time(lnb->dentry->d_inode, 1);
- up(&lnb->dentry->d_inode->i_sem);
- }
- for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
- if (lnb->page == NULL) {
- continue;
- }
-
- if (lnb->flags & N_LOCAL_TEMP_PAGE) {
- found_locked++;
- continue;
- }
+ GOTO(cleanup, rc);
+ if (rc2)
+ GOTO(cleanup, rc = rc2);
- if (time_after(jiffies, lnb->start + 15*HZ))
- CERROR("slow commitrw %lus\n",
- (jiffies - lnb->start) / HZ);
+ handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
- if (cmd & OBD_BRW_WRITE) {
- int err = filter_commit_write(lnb, 0);
+ cleanup_phase = 3;
- if (!rc)
- rc = err;
- } else {
- lustre_put_page(lnb->page);
- }
+ rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
- f_dput(lnb->dentry);
- if (time_after(jiffies, lnb->start + 15*HZ))
- CERROR("slow commit_write %lus\n",
- (jiffies - lnb->start) / HZ);
+cleanup:
+ switch(cleanup_phase) {
+ case 3:
+ if (fcc != NULL) {
+ fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
+ oti ? oti->oti_handle : handle,
+ filter_cancel_cookies_cb, fcc);
}
- }
-
- for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
- i++, o++) {
- int j;
- for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
- int err;
- if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
- continue;
-
- if (time_after(jiffies, lnb->start + 15*HZ))
- CERROR("slow commitrw locked %lus\n",
- (jiffies - lnb->start) / HZ);
-
- err = filter_write_locked_page(lnb);
+ rc = filter_finish_transno(exp, oti, rc);
+ rc2 = fsfilt_commit(obd, filter->fo_sb, dparent->d_inode,
+ handle, exp->exp_sync);
+ if (rc2) {
+ CERROR("error on commit, err = %d\n", rc2);
if (!rc)
- rc = err;
- f_dput(lnb->dentry);
- found_locked--;
-
- if (time_after(jiffies, lnb->start + 15*HZ))
- CERROR("slow commit_write locked %lus\n",
- (jiffies - lnb->start) / HZ);
+ rc = rc2;
}
+ case 2:
+ f_dput(dchild);
+ case 1:
+ filter_parent_unlock(dparent, lock);
+ case 0:
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ break;
+ default:
+ CERROR("invalid cleanup_phase %d\n", cleanup_phase);
+ LBUG();
}
- if (cmd & OBD_BRW_WRITE) {
- /* We just want any dentry for the commit, for now */
- struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
- int err;
-
- rc = filter_finish_transno(exp, desc_private, oti, rc);
- err = fsfilt_commit(obd, dparent->d_inode, desc_private,
- obd_sync_filter);
- if (err)
- rc = err;
- if (obd_sync_filter)
- LASSERT(oti->oti_transno <= obd->obd_last_committed);
-
- if (time_after(jiffies, now + 15*HZ))
- CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
- }
-
- LASSERT(!current->journal_info);
-
- pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
RETURN(rc);
}
-static int filter_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga, struct obd_trans_info *oti)
+/* NB start and end are used for punch, but not truncate */
+static int filter_truncate(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *lsm,
+ obd_off start, obd_off end,
+ struct obd_trans_info *oti)
{
- struct obd_export *export = class_conn2export(conn);
- struct obd_ioobj ioo;
- struct niobuf_local *lnb;
- struct niobuf_remote *rnb;
- obd_count i;
- void *desc_private;
- int ret = 0;
+ int error;
ENTRY;
- if (export == NULL)
- RETURN(-EINVAL);
+ if (end != OBD_OBJECT_EOF)
+ CERROR("PUNCH not supported, only truncate: end = "LPX64"\n",
+ end);
- OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
- OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
+ CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = "LPU64", "
+ "o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
+ oa->o_size = start;
+ error = filter_setattr(exp, oa, NULL, oti);
+ RETURN(error);
+}
- if (lnb == NULL || rnb == NULL)
- GOTO(out, ret = -ENOMEM);
+static int filter_sync(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *lsm, obd_off start, obd_off end)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lvfs_run_ctxt saved;
+ struct filter_obd *filter;
+ struct dentry *dentry;
+ int rc, rc2;
+ ENTRY;
- for (i = 0; i < oa_bufs; i++) {
- rnb[i].offset = pga[i].off;
- rnb[i].len = pga[i].count;
- }
+ filter = &obd->u.filter;
- ioo.ioo_id = lsm->lsm_object_id;
- ioo.ioo_gr = 0;
- ioo.ioo_type = S_IFREG;
- ioo.ioo_bufcnt = oa_bufs;
+ /* an objid of zero is taken to mean "sync whole filesystem" */
+ if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
+ rc = fsfilt_sync(obd, filter->fo_sb);
+ /* flush any remaining cancel messages out to the target */
+ filter_sync_llogs(obd, NULL);
+ RETURN(rc);
+ }
- ret = filter_preprw(cmd, export, NULL, 1, &ioo, oa_bufs, rnb, lnb,
- &desc_private, oti);
- if (ret != 0)
- GOTO(out, ret);
+ dentry = filter_oa2dentry(obd, oa);
+ if (IS_ERR(dentry))
+ RETURN(PTR_ERR(dentry));
- for (i = 0; i < oa_bufs; i++) {
- void *virt = kmap(pga[i].pg);
- obd_off off = pga[i].off & ~PAGE_MASK;
- void *addr = kmap(lnb[i].page);
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
- /* 2 kmaps == vanishingly small deadlock opportunity */
+ down(&dentry->d_inode->i_sem);
+ rc = filemap_fdatawrite(dentry->d_inode->i_mapping);
+ if (rc == 0) {
+ /* just any file to grab fsync method - "file" arg unused */
+ struct file *file = filter->fo_rcvd_filp;
- if (cmd & OBD_BRW_WRITE)
- memcpy(addr + off, virt + off, pga[i].count);
- else
- memcpy(virt + off, addr + off, pga[i].count);
+ if (file->f_op && file->f_op->fsync)
+ rc = file->f_op->fsync(NULL, dentry, 1);
- kunmap(addr);
- kunmap(virt);
+ rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
+ if (!rc)
+ rc = rc2;
}
+ up(&dentry->d_inode->i_sem);
- ret = filter_commitrw(cmd, export, 1, &ioo, oa_bufs, lnb, desc_private,
- oti);
+ oa->o_valid = OBD_MD_FLID;
+ obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
-out:
- if (lnb)
- OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
- if (rnb)
- OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
- class_export_put(export);
- RETURN(ret);
-}
-
-static int filter_san_preprw(int cmd, struct lustre_handle *conn,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb)
-{
- struct obd_device *obd;
- struct obd_ioobj *o = obj;
- struct niobuf_remote *rnb = nb;
- int rc = 0;
- int i;
- ENTRY;
-
- obd = class_conn2obd(conn);
- if (!obd) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
- conn->cookie);
- RETURN(-EINVAL);
- }
-
- for (i = 0; i < objcount; i++, o++) {
- struct dentry *dentry;
- struct inode *inode;
- int (*fs_bmap)(struct address_space *, long);
- int j;
-
- dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
- if (IS_ERR(dentry))
- GOTO(out, rc = PTR_ERR(dentry));
- inode = dentry->d_inode;
- if (!inode) {
- CERROR("trying to BRW to non-existent file "LPU64"\n",
- o->ioo_id);
- f_dput(dentry);
- GOTO(out, rc = -ENOENT);
- }
- fs_bmap = inode->i_mapping->a_ops->bmap;
-
- for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
- long block;
-
- block = rnb->offset >> inode->i_blkbits;
-
- if (cmd == OBD_BRW_READ) {
- block = fs_bmap(inode->i_mapping, block);
- } else {
- loff_t newsize = rnb->offset + rnb->len;
- /* fs_prep_san_write will also update inode
- * size for us:
- * (1) new alloced block
- * (2) existed block but size extented
- */
- /* FIXME We could call fs_prep_san_write()
- * only once for all the blocks allocation.
- * Now call it once for each block, for
- * simplicity. And if error happens, we
- * probably need to release previous alloced
- * block */
- rc = fs_prep_san_write(obd, inode, &block,
- 1, newsize);
- if (rc)
- break;
- }
+ pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
- rnb->offset = block;
- }
- f_dput(dentry);
- }
-out:
+ f_dput(dentry);
RETURN(rc);
}
-static int filter_statfs(struct obd_export *exp, struct obd_statfs *osfs)
-{
- struct obd_device *obd = exp->exp_obd;
- ENTRY;
-
- RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
-}
-
-static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
+static int filter_get_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 *vallen, void *val)
{
+ struct filter_export_data *fed = &exp->exp_filter_data;
struct obd_device *obd;
ENTRY;
- obd = class_conn2obd(conn);
- if (!obd) {
+ obd = class_exp2obd(exp);
+ if (obd == NULL) {
CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
- conn->cookie);
+ exp->exp_handle.h_cookie);
RETURN(-EINVAL);
}
RETURN(0);
}
+ if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
+ obd_id *last_id = val;
+ *last_id = filter_last_id(&obd->u.filter, fed->fed_group);
+ RETURN(0);
+ }
+ if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
+ /*Get log_context handle*/
+ unsigned long *llh_handle = val;
+ *vallen = sizeof(unsigned long);
+ *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
+ RETURN(0);
+ }
+ if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
+ /*Get log_context handle*/
+ unsigned long *sb = val;
+ *vallen = sizeof(unsigned long);
+ *sb = (unsigned long)obd->u.filter.fo_sb;
+ RETURN(0);
+ }
+
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
-int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
- struct lustre_handle *src_conn, struct obdo *src,
- obd_size count, obd_off offset, struct obd_trans_info *oti)
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+ struct obd_export *export)
{
- struct page *page;
- struct lov_stripe_md srcmd, dstmd;
- unsigned long index = 0;
- int err = 0;
-
- LBUG(); /* THIS CODE IS NOT CORRECT -phil */
-
- memset(&srcmd, 0, sizeof(srcmd));
- memset(&dstmd, 0, sizeof(dstmd));
- srcmd.lsm_object_id = src->o_id;
- dstmd.lsm_object_id = dst->o_id;
-
- ENTRY;
- CDEBUG(D_INFO, "src: ino "LPU64" blocks "LPU64", size "LPU64
- ", dst: ino "LPU64"\n",
- src->o_id, src->o_blocks, src->o_size, dst->o_id);
- page = alloc_page(GFP_USER);
- if (page == NULL)
- RETURN(-ENOMEM);
-
- wait_on_page(page);
+ struct filter_group_llog *fglog, *nlog;
+ char name[32] = "CATLIST";
+ struct filter_obd *filter;
+ struct llog_ctxt *ctxt;
+ struct list_head *cur;
+ int rc;
- /* XXX with brw vector I/O, we could batch up reads and writes here,
- * all we need to do is allocate multiple pages to handle the I/Os
- * and arrays to handle the request parameters.
- */
- while (index < ((src->o_size + PAGE_SIZE - 1) >> PAGE_SHIFT)) {
- struct brw_page pg;
+ filter = &obd->u.filter;
+
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each(cur, &filter->fo_llog_list) {
+ fglog = list_entry(cur, struct filter_group_llog, list);
+ if (fglog->group == group) {
+ if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
+ CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
+ obd->obd_name, group, fglog->exp, export);
+ spin_unlock(&filter->fo_llog_list_lock);
+ goto init;
+ }
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
- pg.pg = page;
- pg.count = PAGE_SIZE;
- pg.off = (page->index) << PAGE_SHIFT;
- pg.flag = 0;
+ if (export == NULL)
+ RETURN(NULL);
- page->index = index;
- err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &pg, NULL);
- if (err) {
- EXIT;
- break;
- }
+ OBD_ALLOC(fglog, sizeof(*fglog));
+ if (fglog == NULL)
+ RETURN(NULL);
+ fglog->group = group;
- pg.flag = OBD_BRW_CREATE;
- CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
+ OBD_ALLOC(fglog->llogs, sizeof(struct obd_llogs));
+ if (fglog->llogs == NULL) {
+ OBD_FREE(fglog, sizeof(*fglog));
+ RETURN(NULL);
+ }
- err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &pg, oti);
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each(cur, &filter->fo_llog_list) {
+ nlog = list_entry(cur, struct filter_group_llog, list);
+ LASSERT(nlog->group != group);
+ }
+ list_add(&fglog->list, &filter->fo_llog_list);
+ spin_unlock(&filter->fo_llog_list_lock);
- /* XXX should handle dst->o_size, dst->o_blocks here */
- if (err) {
- EXIT;
- break;
- }
+ rc = obd_llog_cat_initialize(obd, fglog->llogs, 1, name);
+ if (rc) {
+ OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
+ OBD_FREE(fglog, sizeof(*fglog));
+ RETURN(NULL);
+ }
- CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index);
+init:
+ if (export) {
+ fglog->exp = export;
+ ctxt = llog_get_context(fglog->llogs, LLOG_UNLINK_REPL_CTXT);
+ LASSERT(ctxt != NULL);
- index++;
+ llog_receptor_accept(ctxt, export->exp_imp_reverse);
}
- dst->o_size = src->o_size;
- dst->o_blocks = src->o_blocks;
- dst->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
- unlock_page(page);
- __free_page(page);
+ CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
+ obd->obd_name, fglog->llogs, group);
- RETURN(err);
+ RETURN(fglog->llogs);
}
-int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
- int len, void *karg, void *uarg)
+int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
+ int len, void *karg, void *uarg)
{
- struct obd_device *obd = class_conn2obd(conn);
+ struct obd_device *obd = exp->exp_obd;
+ struct obd_ioctl_data *data = karg;
+ int rc = 0;
switch (cmd) {
case OBD_IOC_ABORT_RECOVERY:
- CERROR("aborting recovery for device %s\n", obd->obd_name);
- target_abort_recovery(obd);
+ target_stop_recovery_thread(obd);
RETURN(0);
+ case OBD_IOC_SET_READONLY: {
+ void *handle;
+ struct super_block *sb = obd->u.filter.fo_sb;
+ struct inode *inode = sb->s_root->d_inode;
+ BDEVNAME_DECLARE_STORAGE(tmp);
+ CERROR("setting device %s read-only\n",
+ ll_bdevname(sb, tmp));
+
+ handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
+ LASSERT(handle);
+ (void)fsfilt_commit(obd, sb, inode, handle, 1);
+
+ ll_set_rdonly(ll_sbdev(obd->u.filter.fo_sb));
+ RETURN(0);
+ }
+
+ case OBD_IOC_CATLOGLIST: {
+ rc = llog_catalog_list(obd, 1, data);
+ RETURN(rc);
+ }
+ case OBD_IOC_LLOG_CANCEL:
+ case OBD_IOC_LLOG_REMOVE:
+ case OBD_IOC_LLOG_INFO:
+ case OBD_IOC_LLOG_PRINT: {
+ /* FIXME to be finished */
+ RETURN(-EOPNOTSUPP);
+/*
+ struct llog_ctxt *ctxt = NULL;
+
+ push_ctxt(&saved, &ctxt->loc_ctxt, NULL);
+ rc = llog_ioctl(ctxt, cmd, data);
+ pop_ctxt(&saved, &ctxt->loc_ctxt, NULL);
+
+ RETURN(rc);
+*/
+ }
+
+
default:
RETURN(-EINVAL);
}
RETURN(0);
}
+static struct llog_operations filter_unlink_repl_logops;
+static struct llog_operations filter_size_orig_logops = {
+ lop_setup: llog_obd_origin_setup,
+ lop_cleanup: llog_catalog_cleanup,
+ lop_add: llog_catalog_add,
+};
+
+static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *catid)
+{
+ struct llog_ctxt *ctxt;
+ int rc;
+ ENTRY;
+
+ filter_unlink_repl_logops = llog_client_ops;
+ filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
+ filter_unlink_repl_logops.lop_connect = llog_repl_connect;
+ filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
+
+ rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
+ &filter_unlink_repl_logops);
+ if (rc)
+ RETURN(rc);
+ /* FIXME - assign unlink_cb for filter's recovery */
+ ctxt = llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT);
+ ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
+
+ /* FIXME - count should be 1 to setup size log */
+ rc = obd_llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0,
+ &catid->lci_logid, &filter_size_orig_logops);
+ RETURN(rc);
+}
+
+static int filter_llog_finish(struct obd_device *obd,
+ struct obd_llogs *llogs, int count)
+{
+ int rc;
+ ENTRY;
+
+ rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT));
+ if (rc)
+ RETURN(rc);
+
+ rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT));
+ RETURN(rc);
+}
+
+static int filter_llog_connect(struct obd_export *exp,
+ struct llogd_conn_body *body)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct llog_ctxt *ctxt;
+ struct obd_llogs *llog;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
+ (unsigned) body->lgdc_logid.lgl_ogr,
+ (unsigned) body->lgdc_logid.lgl_oid,
+ (unsigned) body->lgdc_logid.lgl_ogen);
+ llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
+ LASSERT(llog != NULL);
+ ctxt = llog_get_context(llog, body->lgdc_ctxt_idx);
+ rc = llog_connect(ctxt, 1, &body->lgdc_logid,
+ &body->lgdc_gen, NULL);
+ if (rc != 0)
+ CERROR("failed to connect\n");
+
+ RETURN(rc);
+}
+
+static struct dentry *filter_lvfs_id2dentry(__u64 id, __u32 gen,
+ __u64 gr, void *data)
+{
+ return filter_id2dentry(data, NULL, gr, id);
+}
+
+static struct lvfs_callback_ops filter_lvfs_ops = {
+ l_id2dentry: filter_lvfs_id2dentry,
+};
static struct obd_ops filter_obd_ops = {
- o_owner: THIS_MODULE,
- o_attach: filter_attach,
- o_detach: filter_detach,
- o_get_info: filter_get_info,
- o_setup: filter_setup,
- o_cleanup: filter_cleanup,
- o_connect: filter_connect,
- o_disconnect: filter_disconnect,
- o_statfs: filter_statfs,
- o_syncfs: filter_syncfs,
- o_getattr: filter_getattr,
- o_create: filter_create,
- o_setattr: filter_setattr,
- o_destroy: filter_destroy,
- o_open: filter_open,
- o_close: filter_close,
- o_brw: filter_brw,
- o_punch: filter_truncate,
- o_preprw: filter_preprw,
- o_commitrw: filter_commitrw,
- o_destroy_export: filter_destroy_export,
- o_iocontrol: filter_iocontrol,
-#if 0
- o_san_preprw: filter_san_preprw,
- o_preallocate: filter_preallocate_inodes,
- o_migrate: filter_migrate,
- o_copy: filter_copy_data,
- o_iterate: filter_iterate
-#endif
+ .o_owner = THIS_MODULE,
+ .o_attach = filter_attach,
+ .o_detach = filter_detach,
+ .o_get_info = filter_get_info,
+ .o_setup = filter_setup,
+ .o_precleanup = filter_precleanup,
+ .o_cleanup = filter_cleanup,
+ .o_process_config = filter_process_config,
+ .o_connect = filter_connect,
+ .o_connect_post = filter_connect_post,
+ .o_disconnect = filter_disconnect,
+ .o_statfs = filter_statfs,
+ .o_getattr = filter_getattr,
+ .o_unpackmd = filter_unpackmd,
+ .o_create = filter_create,
+ .o_setattr = filter_setattr,
+ .o_destroy = filter_destroy,
+ .o_brw = filter_brw,
+ .o_punch = filter_truncate,
+ .o_sync = filter_sync,
+ .o_preprw = filter_preprw,
+ .o_commitrw = filter_commitrw,
+ .o_do_cow = filter_do_cow,
+ .o_write_extents = filter_write_extents,
+ .o_destroy_export = filter_destroy_export,
+ .o_llog_init = filter_llog_init,
+ .o_llog_finish = filter_llog_finish,
+ .o_llog_connect = filter_llog_connect,
+ .o_iocontrol = filter_iocontrol,
};
static struct obd_ops filter_sanobd_ops = {
- o_owner: THIS_MODULE,
- o_attach: filter_attach,
- o_detach: filter_detach,
- o_get_info: filter_get_info,
- o_setup: filter_san_setup,
- o_cleanup: filter_cleanup,
- o_connect: filter_connect,
- o_disconnect: filter_disconnect,
- o_statfs: filter_statfs,
- o_getattr: filter_getattr,
- o_create: filter_create,
- o_setattr: filter_setattr,
- o_destroy: filter_destroy,
- o_open: filter_open,
- o_close: filter_close,
- o_brw: filter_brw,
- o_punch: filter_truncate,
- o_preprw: filter_preprw,
- o_commitrw: filter_commitrw,
- o_san_preprw: filter_san_preprw,
- o_destroy_export: filter_destroy_export,
- o_iocontrol: filter_iocontrol,
-#if 0
- o_preallocate: filter_preallocate_inodes,
- o_migrate: filter_migrate,
- o_copy: filter_copy_data,
- o_iterate: filter_iterate
-#endif
+ .o_owner = THIS_MODULE,
+ .o_attach = filter_attach,
+ .o_detach = filter_detach,
+ .o_get_info = filter_get_info,
+ .o_setup = filter_san_setup,
+ .o_precleanup = filter_precleanup,
+ .o_cleanup = filter_cleanup,
+ .o_connect = filter_connect,
+ .o_connect_post = filter_connect_post,
+ .o_disconnect = filter_disconnect,
+ .o_statfs = filter_statfs,
+ .o_getattr = filter_getattr,
+ .o_unpackmd = filter_unpackmd,
+ .o_create = filter_create,
+ .o_setattr = filter_setattr,
+ .o_destroy = filter_destroy,
+ .o_brw = filter_brw,
+ .o_punch = filter_truncate,
+ .o_sync = filter_sync,
+ .o_preprw = filter_preprw,
+ .o_commitrw = filter_commitrw,
+ .o_do_cow = filter_do_cow,
+ .o_write_extents = filter_write_extents,
+ .o_san_preprw = filter_san_preprw,
+ .o_destroy_export = filter_destroy_export,
+ .o_llog_init = filter_llog_init,
+ .o_llog_finish = filter_llog_finish,
+ .o_llog_connect = filter_llog_connect,
+ .o_iocontrol = filter_iocontrol,
};
-
static int __init obdfilter_init(void)
{
struct lprocfs_static_vars lvars;
- int rc;
+ int size, rc;
+
+ printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
- printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
+ lprocfs_init_vars(filter, &lvars);
- lprocfs_init_vars(&lvars);
+ size = OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
+ sizeof(*obdfilter_created_scratchpad);
+
+ OBD_ALLOC(obdfilter_created_scratchpad, size);
+ if (obdfilter_created_scratchpad == NULL) {
+ CERROR ("Can't allocate scratchpad\n");
+ return -ENOMEM;
+ }
- rc = class_register_type(&filter_obd_ops, lvars.module_vars,
+ rc = class_register_type(&filter_obd_ops, NULL, lvars.module_vars,
OBD_FILTER_DEVICENAME);
- if (rc)
+ if (rc) {
+ OBD_FREE(obdfilter_created_scratchpad, size);
return rc;
+ }
- rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
+ rc = class_register_type(&filter_sanobd_ops, NULL, lvars.module_vars,
OBD_FILTER_SAN_DEVICENAME);
- if (rc)
+ if (rc) {
class_unregister_type(OBD_FILTER_DEVICENAME);
+ OBD_FREE(obdfilter_created_scratchpad, size);
+ }
return rc;
}
{
class_unregister_type(OBD_FILTER_SAN_DEVICENAME);
class_unregister_type(OBD_FILTER_DEVICENAME);
+ OBD_FREE(obdfilter_created_scratchpad,
+ OBDFILTER_CREATED_SCRATCHPAD_ENTRIES *
+ sizeof(*obdfilter_created_scratchpad));
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");