#endif
#include <linux/obd_class.h>
+#include <linux/obd_lov.h>
#include <linux/lustre_dlm.h>
#include <linux/lustre_fsfilt.h>
#include <linux/lprocfs_status.h>
#include "filter_internal.h"
-#define S_SHIFT 12
-static char *obd_type_by_mode[S_IFMT >> S_SHIFT] = {
- [0] NULL,
- [S_IFREG >> S_SHIFT] "R",
- [S_IFDIR >> S_SHIFT] "D",
- [S_IFCHR >> S_SHIFT] "C",
- [S_IFBLK >> S_SHIFT] "B",
- [S_IFIFO >> S_SHIFT] "F",
- [S_IFSOCK >> S_SHIFT] "S",
- [S_IFLNK >> S_SHIFT] "L"
-};
-
-static inline const char *obd_mode_to_type(int mode)
-{
- return obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT];
-}
-
-static void filter_ffd_addref(void *ffdp)
-{
- struct filter_file_data *ffd = ffdp;
-
- atomic_inc(&ffd->ffd_refcount);
- CDEBUG(D_INFO, "GETting ffd %p : new refcount %d\n", ffd,
- atomic_read(&ffd->ffd_refcount));
-}
-
-static struct filter_file_data *filter_ffd_new(void)
-{
- struct filter_file_data *ffd;
-
- OBD_ALLOC(ffd, sizeof *ffd);
- if (ffd == NULL) {
- CERROR("out of memory\n");
- return NULL;
- }
-
- atomic_set(&ffd->ffd_refcount, 2);
-
- INIT_LIST_HEAD(&ffd->ffd_handle.h_link);
- class_handle_hash(&ffd->ffd_handle, filter_ffd_addref);
-
- return ffd;
-}
+static struct lvfs_callback_ops filter_lvfs_ops;
-static struct filter_file_data *filter_handle2ffd(struct lustre_handle *handle)
-{
- struct filter_file_data *ffd = NULL;
- ENTRY;
- LASSERT(handle != NULL);
- ffd = class_handle2object(handle->cookie);
- if (ffd != NULL)
- LASSERT(ffd->ffd_file->private_data == ffd);
- RETURN(ffd);
-}
-
-static void filter_ffd_put(struct filter_file_data *ffd)
-{
- CDEBUG(D_INFO, "PUTting ffd %p : new refcount %d\n", ffd,
- atomic_read(&ffd->ffd_refcount) - 1);
- LASSERT(atomic_read(&ffd->ffd_refcount) > 0 &&
- atomic_read(&ffd->ffd_refcount) < 0x5a5a);
- if (atomic_dec_and_test(&ffd->ffd_refcount)) {
- LASSERT(list_empty(&ffd->ffd_handle.h_link));
- OBD_FREE(ffd, sizeof *ffd);
- }
-}
-
-static void filter_ffd_destroy(struct filter_file_data *ffd)
-{
- class_handle_unhash(&ffd->ffd_handle);
- filter_ffd_put(ffd);
-}
+static int filter_destroy(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *ea, struct obd_trans_info *);
static void filter_commit_cb(struct obd_device *obd, __u64 transno,
void *cb_data, int error)
obd_transno_commit_cb(obd, transno, error);
}
-static int filter_client_log_cancel(struct lustre_handle *conn,
- struct lov_stripe_md *lsm, int count,
- struct llog_cookie *cookies, int flags)
-{
- struct obd_device *obd = class_conn2obd(conn);
- struct llog_commit_data *llcd;
- struct filter_obd *filter = &obd->u.filter;
- int rc = 0;
- ENTRY;
-
- if (count == 0 || cookies == NULL) {
- down(&filter->fo_sem);
- if (filter->fo_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
- GOTO(out, rc);
-
- llcd = filter->fo_llcd;
- GOTO(send_now, rc);
- }
-
- down(&filter->fo_sem);
- llcd = filter->fo_llcd;
- if (llcd == NULL) {
- llcd = llcd_grab();
- if (llcd == NULL) {
- CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
- cookies->lgc_lgl.lgl_oid,
- cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
- GOTO(out, rc = -ENOMEM);
- }
- llcd->llcd_import = filter->fo_mdc_imp;
- filter->fo_llcd = llcd;
- }
-
- memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
- sizeof(*cookies));
- llcd->llcd_cookiebytes += sizeof(*cookies);
-
- GOTO(send_now, rc);
-send_now:
- if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
- flags & OBD_LLOG_FL_SENDNOW)) {
- filter->fo_llcd = NULL;
- llcd_send(llcd);
- }
-out:
- up(&filter->fo_sem);
-
- return rc;
-}
-
-/* When this (destroy) operation is committed, return the cancel cookie */
-static void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
- void *cb_data, int error)
-{
- filter_client_log_cancel(&obd->u.filter.fo_mdc_conn, NULL, 1,
- cb_data, OBD_LLOG_FL_SENDNOW);
- OBD_FREE(cb_data, sizeof(struct llog_cookie));
-}
/* Assumes caller has already pushed us into the kernel context. */
int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
struct filter_client_data *fcd = fed->fed_fcd;
__u64 last_rcvd;
loff_t off;
- ssize_t written;
+ int err, log_pri = D_HA;
/* Propagate error code. */
if (rc)
RETURN(rc);
- if (!exp->exp_obd->obd_replayable)
+ if (!exp->exp_obd->obd_replayable || oti == NULL)
RETURN(rc);
/* we don't allocate new transnos for replayed requests */
- if (oti != NULL && oti->oti_transno == 0) {
+ if (oti->oti_transno == 0) {
spin_lock(&filter->fo_translock);
last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_transno) + 1;
filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd);
spin_unlock(&filter->fo_translock);
oti->oti_transno = last_rcvd;
- fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
- fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
-
- /* could get xid from oti, if it's ever needed */
- fcd->fcd_last_xid = 0;
-
- off = fed->fed_lr_off;
- fsfilt_set_last_rcvd(exp->exp_obd, last_rcvd, oti->oti_handle,
- filter_commit_cb, NULL);
- written = fsfilt_write_record(exp->exp_obd,
- filter->fo_rcvd_filp, fcd,
- sizeof(*fcd), &off);
- CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
- "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid,
- fed->fed_lr_idx, written);
-
- if (written == sizeof(*fcd))
- RETURN(0);
- CERROR("error writing to %s: rc = %d\n", LAST_RCVD,
- (int)written);
- if (written >= 0)
- RETURN(-ENOSPC);
- RETURN(written);
+ } else {
+ spin_lock(&filter->fo_translock);
+ last_rcvd = oti->oti_transno;
+ if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
+ filter->fo_fsd->fsd_last_transno =
+ cpu_to_le64(last_rcvd);
+ spin_unlock(&filter->fo_translock);
}
+ fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
+ fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
- RETURN(0);
+ /* could get xid from oti, if it's ever needed */
+ fcd->fcd_last_xid = 0;
+
+ off = fed->fed_lr_off;
+ fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
+ filter_commit_cb, NULL);
+ err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
+ sizeof(*fcd), &off, 0);
+ if (err) {
+ log_pri = D_ERROR;
+ if (rc == 0)
+ rc = err;
+ }
+
+ CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
+ last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);
+
+ RETURN(rc);
}
void f_dput(struct dentry *dentry)
dput(dentry);
}
-/* Not racy w.r.t. others, because we are the only user of this dentry */
-static void filter_drelease(struct dentry *dentry)
-{
- if (dentry->d_fsdata)
- OBD_FREE(dentry->d_fsdata, sizeof(struct filter_dentry_data));
-}
-
-struct dentry_operations filter_dops = {
- d_release: filter_drelease,
-};
-
/* Add client data to the FILTER. We use a bitmap to locate a free space
* in the last_rcvd file if cl_idx is -1 (i.e. a new client).
* Otherwise, we have just read the data from the last_rcvd file and
LASSERT(bitmap != NULL);
/* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
- if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
+ if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
RETURN(0);
/* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
if (new_client) {
struct obd_run_ctxt saved;
loff_t off = fed->fed_lr_off;
- int written;
+ int err;
void *handle;
CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
/* Transaction needed to fix bug 1403 */
handle = fsfilt_start(obd,
filter->fo_rcvd_filp->f_dentry->d_inode,
FSFILT_OP_SETATTR, NULL);
if (IS_ERR(handle)) {
- written = PTR_ERR(handle);
- CERROR("unable to start transaction: rc %d\n",
- (int)written);
+ err = PTR_ERR(handle);
+ CERROR("unable to start transaction: rc %d\n", err);
} else {
- written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
- fed->fed_fcd,
- sizeof(*fed->fed_fcd),
- &off);
+ err = fsfilt_write_record(obd, filter->fo_rcvd_filp,
+ fed->fed_fcd,
+ sizeof(*fed->fed_fcd),
+ &off, 1);
fsfilt_commit(obd,
filter->fo_rcvd_filp->f_dentry->d_inode,
- handle, 0);
+ handle, 1);
}
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
- if (written != sizeof(*fed->fed_fcd)) {
+ if (err) {
CERROR("error writing %s client idx %u: rc %d\n",
- LAST_RCVD, fed->fed_lr_idx, written);
- if (written < 0)
- RETURN(written);
- RETURN(-ENOSPC);
+ LAST_RCVD, fed->fed_lr_idx, err);
+ RETURN(err);
}
}
RETURN(0);
struct obd_device *obd = exp->exp_obd;
struct filter_client_data zero_fcd;
struct obd_run_ctxt saved;
- int written;
+ int rc;
loff_t off;
ENTRY;
GOTO(free, 0);
/* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
- if (strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID") == 0)
+ if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
GOTO(free, 0);
LASSERT(filter->fo_last_rcvd_slots != NULL);
}
memset(&zero_fcd, 0, sizeof zero_fcd);
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- written = fsfilt_write_record(obd, filter->fo_rcvd_filp,
- &zero_fcd, sizeof(zero_fcd), &off);
-
- /* XXX: this write gets lost sometimes, unless this sync is here. */
- if (written > 0)
- file_fsync(filter->fo_rcvd_filp,
- filter->fo_rcvd_filp->f_dentry, 1);
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
-
- if (written != sizeof(zero_fcd)) {
- CERROR("error zeroing out client %s idx %u (%llu) in %s: %d\n",
- fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
- LAST_RCVD, written);
- } else {
- CDEBUG(D_INFO,
- "zeroed disconnecting client %s at idx %u (%llu)\n",
- fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
- }
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
+ rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
+ sizeof(zero_fcd), &off, 1);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+ CDEBUG(rc == 0 ? D_INFO : D_ERROR,
+ "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
+ fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+ LAST_RCVD, rc);
free:
OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
}
/* assumes caller is already in kernel ctxt */
-int filter_update_server_data(struct obd_device *obd,
- struct file *filp, struct filter_server_data *fsd)
+int filter_update_server_data(struct obd_device *obd, struct file *filp,
+ struct filter_server_data *fsd, int force_sync)
{
loff_t off = 0;
int rc;
ENTRY;
CDEBUG(D_INODE, "server uuid : %s\n", fsd->fsd_uuid);
- CDEBUG(D_INODE, "server last_objid: "LPU64"\n",
- le64_to_cpu(fsd->fsd_last_objid));
CDEBUG(D_INODE, "server last_rcvd : "LPU64"\n",
le64_to_cpu(fsd->fsd_last_transno));
CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
le64_to_cpu(fsd->fsd_mount_count));
- rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off);
- if (rc == sizeof(*fsd))
- RETURN(0);
+ rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off,force_sync);
+ if (rc)
+ CERROR("error writing filter_server_data: rc = %d\n", rc);
+
+ RETURN(rc);
+}
+
+int filter_update_last_objid(struct obd_device *obd, obd_gr group,
+ int force_sync)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 tmp;
+ loff_t off = 0;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
+ group, filter->fo_last_objids[group]);
- CDEBUG(D_INODE, "error writing filter_server_data: rc = %d\n", rc);
- if (rc >= 0)
- RETURN(-ENOSPC);
+ tmp = cpu_to_le64(filter->fo_last_objids[group]);
+ rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
+ &tmp, sizeof(tmp), &off, force_sync);
+ if (rc)
+ CERROR("error writing group "LPU64" last objid: rc = %d\n",
+ group, rc);
RETURN(rc);
}
/* assumes caller has already in kernel ctxt */
-static int filter_init_server_data(struct obd_device *obd, struct file * filp,
- __u64 init_lastobjid)
+static int filter_init_server_data(struct obd_device *obd, struct file * filp)
{
struct filter_obd *filter = &obd->u.filter;
struct filter_server_data *fsd;
struct filter_client_data *fcd = NULL;
struct inode *inode = filp->f_dentry->d_inode;
unsigned long last_rcvd_size = inode->i_size;
- __u64 mount_count = 0;
+ __u64 mount_count;
int cl_idx;
loff_t off = 0;
int rc;
CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
memcpy(fsd->fsd_uuid, obd->obd_uuid.uuid,sizeof(fsd->fsd_uuid));
- fsd->fsd_last_objid = cpu_to_le64(init_lastobjid);
fsd->fsd_last_transno = 0;
mount_count = fsd->fsd_mount_count = 0;
fsd->fsd_server_size = cpu_to_le32(FILTER_LR_SERVER_SIZE);
fsd->fsd_subdir_count = cpu_to_le16(FILTER_SUBDIR_COUNT);
filter->fo_subdir_count = FILTER_SUBDIR_COUNT;
} else {
- int retval = fsfilt_read_record(obd, filp, fsd,
- sizeof(*fsd), &off);
- if (retval != sizeof(*fsd)) {
+ rc = fsfilt_read_record(obd, filp, fsd, sizeof(*fsd), &off);
+ if (rc) {
CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
- LAST_RCVD, retval);
- GOTO(err_fsd, rc = -EIO);
+ LAST_RCVD, rc);
+ GOTO(err_fsd, rc);
+ }
+ if (strcmp(fsd->fsd_uuid, obd->obd_uuid.uuid) != 0) {
+ CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
+ obd->obd_uuid.uuid, fsd->fsd_uuid);
+ GOTO(err_fsd, rc = -EINVAL);
}
mount_count = le64_to_cpu(fsd->fsd_mount_count);
filter->fo_subdir_count = le16_to_cpu(fsd->fsd_subdir_count);
- fsd->fsd_last_objid =
- cpu_to_le64(le64_to_cpu(fsd->fsd_last_objid) +
- FILTER_SKIP_OBJID);
}
- if (fsd->fsd_feature_incompat) {
+ if (fsd->fsd_feature_incompat & ~le32_to_cpu(FILTER_INCOMPAT_SUPP)) {
CERROR("unsupported feature %x\n",
- le32_to_cpu(fsd->fsd_feature_incompat));
+ le32_to_cpu(fsd->fsd_feature_incompat) &
+ ~FILTER_INCOMPAT_SUPP);
GOTO(err_fsd, rc = -EINVAL);
}
- if (fsd->fsd_feature_rocompat) {
+ if (fsd->fsd_feature_rocompat & ~le32_to_cpu(FILTER_ROCOMPAT_SUPP)) {
CERROR("read-only feature %x\n",
- le32_to_cpu(fsd->fsd_feature_rocompat));
+ le32_to_cpu(fsd->fsd_feature_rocompat) &
+ ~FILTER_ROCOMPAT_SUPP);
/* Do something like remount filesystem read-only */
GOTO(err_fsd, rc = -EINVAL);
}
- CDEBUG(D_INODE, "%s: server last_objid: "LPU64"\n",
- obd->obd_name, le64_to_cpu(fsd->fsd_last_objid));
CDEBUG(D_INODE, "%s: server last_rcvd : "LPU64"\n",
obd->obd_name, le64_to_cpu(fsd->fsd_last_transno));
CDEBUG(D_INODE, "%s: server last_mount: "LPU64"\n",
obd->obd_name, le32_to_cpu(fsd->fsd_client_size));
CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
+ CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+ last_rcvd_size <= FILTER_LR_CLIENT_START ? 0 :
+ (last_rcvd_size-FILTER_LR_CLIENT_START) /FILTER_LR_CLIENT_SIZE);
if (!obd->obd_replayable) {
CWARN("%s: recovery support OFF\n", obd->obd_name);
GOTO(out, rc = 0);
}
- for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
+ for (cl_idx = 0, off = le32_to_cpu(fsd->fsd_client_start);
+ off < last_rcvd_size; cl_idx++) {
__u64 last_rcvd;
int mount_age;
if (!fcd) {
OBD_ALLOC(fcd, sizeof(*fcd));
if (!fcd)
- GOTO(err_fsd, rc = -ENOMEM);
+ GOTO(err_client, rc = -ENOMEM);
}
- /* Don't assume off is incremented properly, in case
- * sizeof(fsd) isn't the same as fsd->fsd_client_size.
- */
+ /* Don't assume off is incremented properly by
+ * fsfilt_read_record(), in case sizeof(*fcd)
+ * isn't the same as fsd->fsd_client_size. */
off = le32_to_cpu(fsd->fsd_client_start) +
cl_idx * le16_to_cpu(fsd->fsd_client_size);
rc = fsfilt_read_record(obd, filp, fcd, sizeof(*fcd), &off);
- if (rc != sizeof(*fcd)) {
- CERROR("error reading FILTER %s offset %d: rc = %d\n",
- LAST_RCVD, cl_idx, rc);
- if (rc > 0) /* XXX fatal error or just abort reading? */
- rc = -EIO;
- break;
+ if (rc) {
+ CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
+ LAST_RCVD, cl_idx, off, rc);
+ break; /* read error shouldn't cause startup to fail */
}
if (fcd->fcd_uuid[0] == '\0') {
LPU64"\n", fcd->fcd_uuid, cl_idx,
last_rcvd, le64_to_cpu(fsd->fsd_last_transno),
le64_to_cpu(fcd->fcd_mount_count), mount_count);
- if (exp == NULL) {
- /* XXX this rc is ignored */
- rc = -ENOMEM;
- break;
- }
+ if (exp == NULL)
+ GOTO(err_client, rc = -ENOMEM);
+
memcpy(&exp->exp_client_uuid.uuid, fcd->fcd_uuid,
sizeof exp->exp_client_uuid.uuid);
fed = &exp->exp_filter_data;
fed->fed_fcd = fcd;
filter_client_add(obd, filter, fed, cl_idx);
/* create helper if export init gets more complex */
- INIT_LIST_HEAD(&fed->fed_open_head);
spin_lock_init(&fed->fed_lock);
fcd = NULL;
obd->obd_recoverable_clients++;
class_export_put(exp);
} else {
- CDEBUG(D_INFO,
- "discarded client %d UUID '%s' count "LPU64"\n",
- cl_idx, fcd->fcd_uuid,
+ CDEBUG(D_INFO, "discarded client %d UUID '%s' count "
+ LPU64"\n", cl_idx, fcd->fcd_uuid,
le64_to_cpu(fcd->fcd_mount_count));
}
if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno))
filter->fo_fsd->fsd_last_transno=cpu_to_le64(last_rcvd);
- obd->obd_last_committed =
- le64_to_cpu(filter->fo_fsd->fsd_last_transno);
+ }
- if (obd->obd_recoverable_clients) {
- CERROR("RECOVERY: %d recoverable clients, last_rcvd "
- LPU64"\n", obd->obd_recoverable_clients,
- le64_to_cpu(filter->fo_fsd->fsd_last_transno));
- obd->obd_next_recovery_transno =
- obd->obd_last_committed + 1;
- obd->obd_recovering = 1;
- }
+ obd->obd_last_committed = le64_to_cpu(filter->fo_fsd->fsd_last_transno);
+ if (obd->obd_recoverable_clients) {
+ CERROR("RECOVERY: %d recoverable clients, last_rcvd "
+ LPU64"\n", obd->obd_recoverable_clients,
+ le64_to_cpu(filter->fo_fsd->fsd_last_transno));
+ obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+ obd->obd_recovering = 1;
}
if (fcd)
fsd->fsd_mount_count = cpu_to_le64(mount_count + 1);
/* save it, so mount count and last_transno is current */
- rc = filter_update_server_data(obd, filp, filter->fo_fsd);
+ rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
RETURN(rc);
+err_client:
+ class_disconnect_exports(obd, 0);
err_fsd:
filter_free_server_data(filter);
RETURN(rc);
}
-/* setup the object store with correct subdirectories */
-static int filter_prep(struct obd_device *obd)
+static int filter_cleanup_groups(struct obd_device *obd)
{
- struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
- struct dentry *dentry, *O_dentry;
- struct file *file;
- struct inode *inode;
int i;
- int rc = 0;
- int mode = 0;
+ ENTRY;
+
+ if (filter->fo_subdir_count) {
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ struct dentry *dentry = filter->fo_dentry_O_sub[i];
+ f_dput(dentry);
+ filter->fo_dentry_O_sub[i] = NULL;
+ }
+ OBD_FREE(filter->fo_dentry_O_sub,
+ filter->fo_subdir_count *
+ sizeof(*filter->fo_dentry_O_sub));
+ }
+ if (filter->fo_dentry_O_groups != NULL &&
+ filter->fo_last_objids != NULL &&
+ filter->fo_last_objid_files != NULL) {
+ for (i = 0; i < FILTER_GROUPS; i++) {
+ struct dentry *dentry = filter->fo_dentry_O_groups[i];
+ struct file *filp = filter->fo_last_objid_files[i];
+ if (dentry != NULL) {
+ f_dput(dentry);
+ filter->fo_dentry_O_groups[i] = NULL;
+ }
+ if (filp != NULL) {
+ filp_close(filp, 0);
+ filter->fo_last_objid_files[i] = NULL;
+ }
+ }
+ }
+ if (filter->fo_dentry_O_groups != NULL)
+ OBD_FREE(filter->fo_dentry_O_groups,
+ FILTER_GROUPS * sizeof(struct dentry *));
+ if (filter->fo_last_objids != NULL)
+ OBD_FREE(filter->fo_last_objids,
+ FILTER_GROUPS * sizeof(__u64));
+ if (filter->fo_last_objid_files != NULL)
+ OBD_FREE(filter->fo_last_objid_files,
+ FILTER_GROUPS * sizeof(struct file *));
+ RETURN(0);
+}
+
+/* FIXME: object groups */
+static int filter_prep_groups(struct obd_device *obd)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct dentry *dentry, *O_dentry;
+ struct file *filp;
+ int i, rc = 0, cleanup_phase = 0;
+ ENTRY;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- dentry = simple_mkdir(current->fs->pwd, "O", 0700);
- CDEBUG(D_INODE, "got/created O: %p\n", dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
+ O_dentry = simple_mkdir(current->fs->pwd, "O", 0700);
+ CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
+ if (IS_ERR(O_dentry)) {
+ rc = PTR_ERR(O_dentry);
CERROR("cannot open/create O: rc = %d\n", rc);
- GOTO(out, rc);
+ GOTO(cleanup, rc);
}
- filter->fo_dentry_O = dentry;
+ filter->fo_dentry_O = O_dentry;
+ cleanup_phase = 1; /* O_dentry */
- /*
- * Create directories and/or get dentries for each object type.
- * This saves us from having to do multiple lookups for each one.
- */
- O_dentry = filter->fo_dentry_O;
- for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
- char *name = obd_type_by_mode[mode];
+ /* Lookup "R" to tell if we're on an old OST FS and need to convert
+ * from O/R/<dir>/<objid> to O/0/<dir>/<objid>. This can be removed
+ * some time post 1.0 when all old-style OSTs have converted along
+ * with the init_objid hack. */
+ dentry = ll_lookup_one_len("R", O_dentry, 1);
+ if (IS_ERR(dentry))
+ GOTO(cleanup, rc = PTR_ERR(dentry));
+ if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
+ struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1);
+ ENTRY;
+
+ CWARN("converting OST to new object layout\n");
+ if (IS_ERR(O0_dentry)) {
+ rc = PTR_ERR(O0_dentry);
+ CERROR("error looking up O/0: rc %d\n", rc);
+ GOTO(cleanup_R, rc);
+ }
- if (!name) {
- filter->fo_dentry_O_mode[mode] = NULL;
- continue;
+ if (O0_dentry->d_inode) {
+ CERROR("Both O/R and O/0 exist. Fix manually.\n");
+ GOTO(cleanup_O0, rc = -EEXIST);
+ }
+
+ down(&O_dentry->d_inode->i_sem);
+ rc = vfs_rename(O_dentry->d_inode, dentry,
+ O_dentry->d_inode, O0_dentry);
+ up(&O_dentry->d_inode->i_sem);
+
+ if (rc) {
+ CERROR("error renaming O/R to O/0: rc %d\n", rc);
+ GOTO(cleanup_O0, rc);
}
+ filter->fo_fsd->fsd_feature_incompat |=
+ cpu_to_le32(FILTER_INCOMPAT_GROUPS);
+ rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
+ filter->fo_fsd, 1);
+ GOTO(cleanup_O0, rc);
+
+ cleanup_O0:
+ dput(O0_dentry);
+ cleanup_R:
+ dput(dentry);
+ if (rc)
+ GOTO(cleanup, rc);
+ } else {
+ dput(dentry);
+ }
+
+ OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
+ if (filter->fo_last_objids == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ cleanup_phase = 2; /* groups */
+
+ OBD_ALLOC(filter->fo_dentry_O_groups, FILTER_GROUPS * sizeof(dentry));
+ if (filter->fo_dentry_O_groups == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ OBD_ALLOC(filter->fo_last_objid_files, FILTER_GROUPS * sizeof(filp));
+ if (filter->fo_last_objid_files == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ for (i = 0; i < FILTER_GROUPS; i++) {
+ char name[25];
+ loff_t off = 0;
+
+ sprintf(name, "%d", i);
dentry = simple_mkdir(O_dentry, name, 0700);
CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
if (IS_ERR(dentry)) {
rc = PTR_ERR(dentry);
CERROR("cannot create O/%s: rc = %d\n", name, rc);
- GOTO(err_O_mode, rc);
+ GOTO(cleanup, rc);
+ }
+ filter->fo_dentry_O_groups[i] = dentry;
+
+ sprintf(name, "O/%d/LAST_ID", i);
+ filp = filp_open(name, O_CREAT | O_RDWR, 0700);
+ if (IS_ERR(dentry)) {
+ rc = PTR_ERR(dentry);
+ CERROR("cannot create %s: rc = %d\n", name, rc);
+ GOTO(cleanup, rc);
+ }
+ filter->fo_last_objid_files[i] = filp;
+
+ if (filp->f_dentry->d_inode->i_size == 0) {
+ if (i == 0 && filter->fo_fsd->fsd_unused != 0) {
+ /* OST conversion, remove sometime post 1.0 */
+ filter->fo_last_objids[i] =
+ le64_to_cpu(filter->fo_fsd->fsd_unused);
+ CWARN("saving old objid "LPU64" to LAST_ID\n",
+ filter->fo_last_objids[i]);
+ rc = filter_update_last_objid(obd, 0, 1);
+ if (rc)
+ GOTO(cleanup, rc);
+ } else {
+ filter->fo_last_objids[i] = FILTER_INIT_OBJID;
+ }
+ continue;
+ }
+
+ rc = fsfilt_read_record(obd, filp, &filter->fo_last_objids[i],
+ sizeof(__u64), &off);
+ if (rc) {
+ CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
+ name, rc);
+ GOTO(cleanup, rc);
+ }
+ filter->fo_last_objids[i] =
+ le64_to_cpu(filter->fo_last_objids[i]);
+ CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+ obd->obd_name, i, filter->fo_last_objids[i]);
+ }
+
+ if (filter->fo_subdir_count) {
+ O_dentry = filter->fo_dentry_O_groups[0];
+ OBD_ALLOC(filter->fo_dentry_O_sub,
+ filter->fo_subdir_count * sizeof(dentry));
+ if (filter->fo_dentry_O_sub == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ char dir[20];
+ snprintf(dir, sizeof(dir), "d%u", i);
+
+ dentry = simple_mkdir(O_dentry, dir, 0700);
+ CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
+ if (IS_ERR(dentry)) {
+ rc = PTR_ERR(dentry);
+ CERROR("can't create O/0/%s: rc = %d\n",dir,rc);
+ GOTO(cleanup, rc);
+ }
+ filter->fo_dentry_O_sub[i] = dentry;
}
- filter->fo_dentry_O_mode[mode] = dentry;
}
+ RETURN(0);
+
+ cleanup:
+ switch (cleanup_phase) {
+ case 2:
+ filter_cleanup_groups(obd);
+ case 1:
+ f_dput(filter->fo_dentry_O);
+ filter->fo_dentry_O = NULL;
+ default:
+ break;
+ }
+ return rc;
+}
+
+/* setup the object store with correct subdirectories */
+static int filter_prep(struct obd_device *obd)
+{
+ struct obd_run_ctxt saved;
+ struct filter_obd *filter = &obd->u.filter;
+ struct file *file;
+ struct inode *inode;
+ int rc = 0;
+ ENTRY;
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
if (!file || IS_ERR(file)) {
rc = PTR_ERR(file);
CERROR("OBD filter: cannot open/create %s: rc = %d\n",
LAST_RCVD, rc);
- GOTO(err_O_mode, rc);
+ GOTO(out, rc);
}
if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
GOTO(err_filp, rc = -ENOENT);
}
- rc = fsfilt_journal_data(obd, file);
- if (rc) {
- CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
- GOTO(err_filp, rc);
- }
/* steal operations */
inode = file->f_dentry->d_inode;
filter->fo_fop = file->f_op;
filter->fo_iop = inode->i_op;
filter->fo_aops = inode->i_mapping->a_ops;
-#ifdef I_SKIP_PDFLUSH
- /*
- * we need this to protect from deadlock
- * pdflush vs. lustre_fwrite()
- */
- inode->i_flags |= I_SKIP_PDFLUSH;
-#endif
- rc = filter_init_server_data(obd, file, FILTER_INIT_OBJID);
+ rc = filter_init_server_data(obd, file);
if (rc) {
CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
- GOTO(err_client, rc);
+ GOTO(err_filp, rc);
}
filter->fo_rcvd_filp = file;
- if (filter->fo_subdir_count) {
- O_dentry = filter->fo_dentry_O_mode[S_IFREG >> S_SHIFT];
- OBD_ALLOC(filter->fo_dentry_O_sub,
- filter->fo_subdir_count * sizeof(dentry));
- if (!filter->fo_dentry_O_sub)
- GOTO(err_client, rc = -ENOMEM);
-
- for (i = 0; i < filter->fo_subdir_count; i++) {
- char dir[20];
- snprintf(dir, sizeof(dir), "d%u", i);
+ rc = filter_prep_groups(obd);
+ if (rc)
+ GOTO(err_server_data, rc);
- dentry = simple_mkdir(O_dentry, dir, 0700);
- CDEBUG(D_INODE, "got/created O/R/%s: %p\n", dir,dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("can't create O/R/%s: rc = %d\n",dir,rc);
- GOTO(err_O_sub, rc);
- }
- filter->fo_dentry_O_sub[i] = dentry;
- }
- }
- rc = 0;
out:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
return(rc);
-err_O_sub:
- while (i-- > 0) {
- struct dentry *dentry = filter->fo_dentry_O_sub[i];
- if (dentry) {
- f_dput(dentry);
- filter->fo_dentry_O_sub[i] = NULL;
- }
- }
- OBD_FREE(filter->fo_dentry_O_sub,
- filter->fo_subdir_count * sizeof(dentry));
-err_client:
- class_disconnect_exports(obd, 0);
-err_filp:
+ err_server_data:
+ //class_disconnect_exports(obd, 0);
+ filter_free_server_data(filter);
+ err_filp:
if (filp_close(file, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
filter->fo_rcvd_filp = NULL;
-err_O_mode:
- while (mode-- > 0) {
- struct dentry *dentry = filter->fo_dentry_O_mode[mode];
- if (dentry) {
- f_dput(dentry);
- filter->fo_dentry_O_mode[mode] = NULL;
- }
- }
- f_dput(filter->fo_dentry_O);
- filter->fo_dentry_O = NULL;
goto out;
}
{
struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
- long rc;
- int mode;
+ int rc, i;
/* XXX: filter_update_lastobjid used to call fsync_dev. It might be
* best to start a transaction with h_sync, because we removed this
* from lastobjid */
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
- filter->fo_fsd);
+ filter->fo_fsd, 0);
if (rc)
- CERROR("error writing lastobjid: rc = %ld\n", rc);
+ CERROR("error writing server data: rc = %d\n", rc);
-
- if (filter->fo_rcvd_filp) {
- rc = file_fsync(filter->fo_rcvd_filp,
- filter->fo_rcvd_filp->f_dentry, 1);
- filp_close(filter->fo_rcvd_filp, 0);
- filter->fo_rcvd_filp = NULL;
+ for (i = 0; i < FILTER_GROUPS; i++) {
+ rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
if (rc)
- CERROR("error closing %s: rc = %ld\n", LAST_RCVD, rc);
+ CERROR("error writing group %d lastobjid: rc = %d\n",
+ i, rc);
}
- if (filter->fo_subdir_count) {
- int i;
- for (i = 0; i < filter->fo_subdir_count; i++) {
- struct dentry *dentry = filter->fo_dentry_O_sub[i];
- f_dput(dentry);
- filter->fo_dentry_O_sub[i] = NULL;
- }
- OBD_FREE(filter->fo_dentry_O_sub,
- filter->fo_subdir_count *
- sizeof(*filter->fo_dentry_O_sub));
- }
- for (mode = 0; mode < (S_IFMT >> S_SHIFT); mode++) {
- struct dentry *dentry = filter->fo_dentry_O_mode[mode];
- if (dentry) {
- f_dput(dentry);
- filter->fo_dentry_O_mode[mode] = NULL;
- }
- }
+ filp_close(filter->fo_rcvd_filp, 0);
+ filter->fo_rcvd_filp = NULL;
+ if (rc)
+ CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
+
+ filter_cleanup_groups(obd);
f_dput(filter->fo_dentry_O);
filter_free_server_data(filter);
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+}
+
+static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
+ obd_id id)
+{
+ obd_gr group = 0;
+ LASSERT(filter->fo_fsd != NULL);
+
+ if (oa != NULL) {
+ LASSERT(oa->o_gr <= FILTER_GROUPS);
+ group = oa->o_gr;
+ }
+
+ spin_lock(&filter->fo_objidlock);
+ filter->fo_last_objids[group] = id;
+ spin_unlock(&filter->fo_objidlock);
}
-__u64 filter_next_id(struct filter_obd *filter)
+__u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
{
obd_id id;
+ obd_gr group = 0;
LASSERT(filter->fo_fsd != NULL);
+ if (oa != NULL) {
+ LASSERT(oa->o_gr <= FILTER_GROUPS);
+ group = oa->o_gr;
+ }
+
+ /* FIXME: object groups */
spin_lock(&filter->fo_objidlock);
- id = le64_to_cpu(filter->fo_fsd->fsd_last_objid);
- filter->fo_fsd->fsd_last_objid = cpu_to_le64(id + 1);
+ id = filter->fo_last_objids[group];
spin_unlock(&filter->fo_objidlock);
return id;
/* XXX layering violation! -phil */
l_lock(&lock->l_resource->lr_namespace->ns_lock);
/* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
- * such that mds_blocking_ast is called just before l_i_p takes the
+ * such that filter_blocking_ast is called just before l_i_p takes the
* ns_lock, then by the time we get the lock, we might not be the
* correct blocking function anymore. So check, and return early, if
* so. */
&flags, ldlm_completion_ast,
filter_blocking_ast, NULL, lockh);
- RETURN(rc == ELDLM_OK ? 0 : -ENOLCK); /* XXX translate ldlm code */
+ RETURN(rc == ELDLM_OK ? 0 : -EIO); /* XXX translate ldlm code */
}
/* We never dget the object parent, so DON'T dput it either */
}
/* We never dget the object parent, so DON'T dput it either */
-struct dentry *filter_parent(struct obd_device *obd, obd_mode mode,
- obd_id objid)
+struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
{
struct filter_obd *filter = &obd->u.filter;
+ LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
- LASSERT(S_ISREG(mode)); /* only regular files for now */
- if (!S_ISREG(mode) || filter->fo_subdir_count == 0)
- return filter->fo_dentry_O_mode[(mode & S_IFMT) >> S_SHIFT];
+ if (group > 0 || filter->fo_subdir_count == 0)
+ return filter->fo_dentry_O_groups[group];
return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
}
/* We never dget the object parent, so DON'T dput it either */
-struct dentry *filter_parent_lock(struct obd_device *obd, obd_mode mode,
+struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
obd_id objid, ldlm_mode_t lock_mode,
struct lustre_handle *lockh)
{
unsigned long now = jiffies;
- struct dentry *de = filter_parent(obd, mode, objid);
+ struct dentry *de = filter_parent(obd, group, objid);
int rc;
if (IS_ERR(de))
* internal to the filesystem code. */
struct dentry *filter_fid2dentry(struct obd_device *obd,
struct dentry *dir_dentry,
- obd_mode mode, obd_id id)
+ obd_gr group, obd_id id)
{
struct lustre_handle lockh;
struct dentry *dparent = dir_dentry;
if (id == 0) {
CERROR("fatal: invalid object id 0\n");
- LBUG();
RETURN(ERR_PTR(-ESTALE));
}
len = sprintf(name, LPU64, id);
if (dir_dentry == NULL) {
- dparent = filter_parent_lock(obd, mode, id, LCK_PR, &lockh);
+ dparent = filter_parent_lock(obd, group, id, LCK_PR, &lockh);
if (IS_ERR(dparent))
RETURN(dparent);
}
RETURN(dchild);
}
-static struct file *filter_obj_open(struct obd_export *export,
- struct obd_trans_info *oti,
- __u64 id, __u32 type, int parent_mode,
- struct lustre_handle *parent_lockh)
+static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
{
- struct obd_device *obd = export->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- struct dentry *dchild = NULL, *dparent = NULL;
- struct filter_export_data *fed = &export->exp_filter_data;
- struct filter_dentry_data *fdd = NULL;
- struct filter_file_data *ffd = NULL;
- struct obd_run_ctxt saved;
- char name[24];
- struct file *file;
- int len, cleanup_phase = 0;
+ struct lustre_handle lockh;
+ int flags = LDLM_AST_DISCARD_DATA, rc;
+ struct ldlm_res_id res_id = { .name = { objid } };
+ struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
ENTRY;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
-
- if (!id) {
- CERROR("fatal: invalid obdo "LPU64"\n", id);
- GOTO(cleanup, file = ERR_PTR(-ESTALE));
- }
+ /* Tell the clients that the object is gone now and that they should
+ * throw away any cached pages. If we're the OST at stripe 0 in the
+ * file then this enqueue will communicate the DISCARD to all the
+ * clients. This assumes that we always destroy all the objects for
+ * a file at a time, as is currently the case. If we're not the
+ * OST at stripe 0 then we'll harmlessly get a very lonely lock in
+ * the local DLM and immediately drop it. */
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+ res_id, LDLM_EXTENT, &extent,
+ sizeof(extent), LCK_PW, &flags,
+ ldlm_completion_ast, filter_blocking_ast,
+ NULL, &lockh);
- if (!(type & S_IFMT)) {
- CERROR("OBD %s, object "LPU64" has bad type: %o\n",
- __FUNCTION__, id, type);
- GOTO(cleanup, file = ERR_PTR(-EINVAL));
- }
+ /* We only care about the side-effects, just drop the lock. */
+ if (rc == ELDLM_OK)
+ ldlm_lock_decref(&lockh, LCK_PW);
- ffd = filter_ffd_new();
- if (ffd == NULL) {
- CERROR("obdfilter: out of memory\n");
- GOTO(cleanup, file = ERR_PTR(-ENOMEM));
- }
-
- cleanup_phase = 1;
-
- /* We preallocate this to avoid blocking while holding fo_fddlock */
- OBD_ALLOC(fdd, sizeof *fdd);
- if (fdd == NULL) {
- CERROR("obdfilter: out of memory\n");
- GOTO(cleanup, file = ERR_PTR(-ENOMEM));
- }
-
- cleanup_phase = 2;
-
- dparent = filter_parent_lock(obd, type, id, parent_mode, parent_lockh);
- if (IS_ERR(dparent))
- GOTO(cleanup, file = (void *)dparent);
-
- cleanup_phase = 3;
-
- len = snprintf(name, sizeof(name), LPU64, id);
- dchild = ll_lookup_one_len(name, dparent, len);
- if (IS_ERR(dchild))
- GOTO(cleanup, file = (void *)dchild);
-
- cleanup_phase = 4;
-
- if (dchild->d_inode == NULL) {
- CERROR("opening non-existent object %s - O_CREAT?\n", name);
- /* dput(dchild); call filter_create_internal here */
- file = ERR_PTR(-ENOENT);
- GOTO(cleanup, file);
- }
-
- /* dentry_open does a dput(dchild) and mntput(mnt) on error */
- mntget(filter->fo_vfsmnt);
- file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
- if (IS_ERR(file)) {
- dchild = NULL; /* prevent a double dput in step 4 */
- CERROR("error opening %s: rc %ld\n", name, PTR_ERR(file));
- GOTO(cleanup, file);
- }
-
- spin_lock(&filter->fo_fddlock);
- if (dchild->d_fsdata) {
- spin_unlock(&filter->fo_fddlock);
- OBD_FREE(fdd, sizeof *fdd);
- fdd = dchild->d_fsdata;
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
- /* should only happen during client recovery */
- if (fdd->fdd_flags & FILTER_FLAG_DESTROY)
- CDEBUG(D_INODE,"opening destroyed object "LPU64"\n",id);
- atomic_inc(&fdd->fdd_open_count);
- } else {
- atomic_set(&fdd->fdd_open_count, 1);
- fdd->fdd_magic = FILTER_DENTRY_MAGIC;
- fdd->fdd_flags = 0;
- fdd->fdd_objid = id;
- /* If this is racy, then we can use {cmp}xchg and atomic_add */
- dchild->d_fsdata = fdd;
- spin_unlock(&filter->fo_fddlock);
- }
-
- ffd->ffd_file = file;
- LASSERT(file->private_data == NULL);
- file->private_data = ffd;
-
- if (!dchild->d_op)
- dchild->d_op = &filter_dops;
- else
- LASSERT(dchild->d_op == &filter_dops);
-
- spin_lock(&fed->fed_lock);
- list_add(&ffd->ffd_export_list, &fed->fed_open_head);
- spin_unlock(&fed->fed_lock);
-
- CDEBUG(D_INODE, "opened objid "LPU64": rc = %p\n", id, file);
-cleanup:
- switch (cleanup_phase) {
- case 4:
- if (IS_ERR(file))
- f_dput(dchild);
- case 3:
- if (IS_ERR(file))
- filter_parent_unlock(dparent, parent_lockh,parent_mode);
- case 2:
- if (IS_ERR(file))
- OBD_FREE(fdd, sizeof *fdd);
- case 1:
- if (IS_ERR(file))
- filter_ffd_destroy(ffd);
- filter_ffd_put(ffd);
- case 0:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- }
- RETURN(file);
+ RETURN(rc);
}
/* Caller must hold LCK_PW on parent and push us into kernel context.
ENTRY;
if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) {
- CERROR("destroying objid %*s nlink = %d, count = %d\n",
+ CERROR("destroying objid %*s nlink = %lu, count = %d\n",
dchild->d_name.len, dchild->d_name.name,
- inode->i_nlink, atomic_read(&inode->i_count));
- }
-
-
-#if 0
- /* Tell the clients that the object is gone now and that they should
- * throw away any cached pages. We don't need to wait until they're
- * done, so just decref the lock right away and let ldlm_completion_ast
- * clean up when it's all over. */
- ldlm_cli_enqueue(..., LCK_PW, AST_INTENT_DESTROY, &lockh);
- ldlm_lock_decref(&lockh, LCK_PW);
-#endif
-
- if (0) {
- struct lustre_handle lockh;
- int flags = 0, rc;
- struct ldlm_res_id res_id = { .name = { objid } };
-
- /* This part is a wee bit iffy: we really only want to bust the
- * locks on our stripe, so that we don't end up bouncing
- * [0->EOF] locks around on each of the OSTs as the rest of the
- * destroys get processed. Because we're only talking to
- * the local LDLM, though, we should only end up locking the
- * whole of our stripe. When bug 1425 (take all locks on OST
- * for stripe 0) is fixed, this code should be revisited. */
- struct ldlm_extent extent = { 0, OBD_OBJECT_EOF };
-
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id, LDLM_EXTENT, &extent,
- sizeof(extent), LCK_PW, &flags,
- ldlm_completion_ast, filter_blocking_ast,
- NULL, &lockh);
- /* We only care about the side-effects, just drop the lock. */
- ldlm_lock_decref(&lockh, LCK_PW);
+ (unsigned long)inode->i_nlink,
+ atomic_read(&inode->i_count));
}
rc = vfs_unlink(dparent->d_inode, dchild);
RETURN(rc);
}
-/* If closing because we are failing this device, then
- don't do the unlink on close.
-*/
-static int filter_close_internal(struct obd_export *exp,
- struct filter_file_data *ffd,
- struct obd_trans_info *oti, int flags)
-{
- struct obd_device *obd = exp->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- struct file *filp = ffd->ffd_file;
- struct dentry *dchild = dget(filp->f_dentry);
- struct filter_dentry_data *fdd = dchild->d_fsdata;
- struct lustre_handle parent_lockh;
- int rc, rc2, cleanup_phase = 0;
- struct dentry *dparent = NULL;
- struct obd_run_ctxt saved;
- int nested_trans = (current->journal_info != NULL);
- ENTRY;
-
- LASSERT(filp->private_data == ffd);
- LASSERT(fdd != NULL);
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
-
- rc = filp_close(filp, 0);
-
- if (atomic_dec_and_test(&fdd->fdd_open_count) &&
- (fdd->fdd_flags & FILTER_FLAG_DESTROY) &&
- !(flags & OBD_OPT_FAILOVER)) {
- void *handle;
-
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- cleanup_phase = 1;
-
- LASSERT(fdd->fdd_objid > 0);
- dparent = filter_parent_lock(obd, S_IFREG, fdd->fdd_objid,
- LCK_PW, &parent_lockh);
- if (IS_ERR(dparent))
- GOTO(cleanup, rc = PTR_ERR(dparent));
- cleanup_phase = 2;
-
- handle = fsfilt_start(obd, dparent->d_inode,
- FSFILT_OP_UNLINK_LOG, oti);
- if (IS_ERR(handle))
- GOTO(cleanup, rc = PTR_ERR(handle));
-
- if (oti != NULL) {
- if (oti->oti_handle == NULL)
- oti->oti_handle = handle;
- else
- LASSERT(oti->oti_handle == handle);
- }
-
-#ifdef ENABLE_ORPHANS
- /* Remove orphan unlink record from log */
- llog_cancel_records(filter->fo_catalog, 1, &fdd->fdd_cookie);
-#endif
- /* XXX unlink from PENDING directory now too */
- rc2 = filter_destroy_internal(obd, fdd->fdd_objid, dparent,
- dchild);
- if (rc2 && !rc)
- rc = rc2;
- rc = filter_finish_transno(exp, oti, rc);
- rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (rc2) {
- CERROR("error on commit, err = %d\n", rc2);
- if (!rc)
- rc = rc2;
- }
- if (nested_trans == 0) {
- LASSERT(current->journal_info == NULL);
- if (oti != NULL)
- oti->oti_handle = NULL;
- }
- }
-
-cleanup:
- switch(cleanup_phase) {
- case 2:
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
- }
- case 1:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- case 0:
- f_dput(dchild);
- filter_ffd_destroy(ffd);
- break;
- default:
- CERROR("invalid cleanup_phase %d\n", cleanup_phase);
- LBUG();
- }
-
- RETURN(rc);
-}
-
/* mount the file system (secretly) */
int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
char *option)
{
- struct obd_ioctl_data* data = buf;
+ struct lustre_cfg* lcfg = buf;
struct filter_obd *filter = &obd->u.filter;
struct vfsmount *mnt;
int rc = 0;
ENTRY;
- if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
+ dev_clear_rdonly(2);
+
+ if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
RETURN(-EINVAL);
- obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
+ obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
if (IS_ERR(obd->obd_fsops))
RETURN(PTR_ERR(obd->obd_fsops));
- mnt = do_kern_mount(data->ioc_inlbuf2, MS_NOATIME | MS_NODIRATIME,
- data->ioc_inlbuf1, option);
+ mnt = do_kern_mount(lcfg->lcfg_inlbuf2, MS_NOATIME | MS_NODIRATIME,
+ lcfg->lcfg_inlbuf1, option);
rc = PTR_ERR(mnt);
if (IS_ERR(mnt))
GOTO(err_ops, rc);
- if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
- if (*data->ioc_inlbuf3 == 'f') {
+ if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
+ if (*lcfg->lcfg_inlbuf3 == 'f') {
obd->obd_replayable = 1;
obd_sync_filter = 1;
- CERROR("%s: configured for recovery and sync write\n",
- obd->obd_name);
+ CERROR("%s: recovery enabled\n", obd->obd_name);
} else {
- if (*data->ioc_inlbuf3 != 'n') {
+ if (*lcfg->lcfg_inlbuf3 != 'n') {
CERROR("unrecognised flag '%c'\n",
- *data->ioc_inlbuf3);
+ *lcfg->lcfg_inlbuf3);
}
- }
- }
-
- if (data->ioc_inllen4 > 0 && data->ioc_inlbuf4) {
- if (*data->ioc_inlbuf4 == '/') {
- CERROR("filter namespace mount: %s\n",
- data->ioc_inlbuf4);
- filter->fo_nspath = strdup(data->ioc_inlbuf4);
- } else {
- CERROR("namespace mount must be absolute path: '%s'\n",
- data->ioc_inlbuf4);
+ // XXX Robert? Why do we get errors here
+ // GOTO(err_mntput, rc = -EINVAL);
}
}
filter->fo_fstype = mnt->mnt_sb->s_type->name;
CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
- OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
- filter->fo_ctxt.pwdmnt = mnt;
- filter->fo_ctxt.pwd = mnt->mnt_root;
- filter->fo_ctxt.fs = get_ds();
+ OBD_SET_CTXT_MAGIC(&obd->obd_ctxt);
+ obd->obd_ctxt.pwdmnt = mnt;
+ obd->obd_ctxt.pwd = mnt->mnt_root;
+ obd->obd_ctxt.fs = get_ds();
+ obd->obd_ctxt.cb_ops = filter_lvfs_ops;
rc = filter_prep(obd);
if (rc)
GOTO(err_mntput, rc);
spin_lock_init(&filter->fo_translock);
- spin_lock_init(&filter->fo_fddlock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
-
- ptlrpc_init_client(MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
- "filter_mdc", &filter->fo_mdc_client);
- sema_init(&filter->fo_sem, 1);
+ sema_init(&filter->fo_alloc_lock, 1);
obd->obd_namespace = ldlm_namespace_new("filter-tgt",
LDLM_NAMESPACE_SERVER);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
- /* Create a non-replaying connection for recovery logging, so that
- * we don't create a client entry for this local connection, and do
- * not log or assign transaction numbers for logging operations. */
-#ifdef ENABLE_ORPHANS
- filter->fo_catalog = filter_get_catalog(obd);
- if (IS_ERR(filter->fo_catalog))
- GOTO(err_post, rc = PTR_ERR(filter->fo_catalog));
-#endif
-
RETURN(0);
err_post:
static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
{
- struct obd_ioctl_data* data = buf;
+ struct lustre_cfg* lcfg = buf;
+ const char *str = NULL;
char *option = NULL;
+ int n = 0;
+ int rc;
+ if (!strcmp(lcfg->lcfg_inlbuf2, "ext3")) {
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* bug 1577: implement async-delete for 2.5 */
- if (!strcmp(data->ioc_inlbuf2, "ext3"))
- option = "asyncdel";
+ str = "errors=remount-ro,asyncdel";
+#else
+ str = "errors=remount-ro";
#endif
+ n = strlen(str) + 1;
+ OBD_ALLOC(option, n);
+ if (option == NULL)
+ RETURN(-ENOMEM);
+ strcpy(option, str);
+ }
- return filter_common_setup(obd, len, buf, option);
+ rc = filter_common_setup(obd, len, buf, option);
+ if (option)
+ OBD_FREE(option, n);
+ return rc;
+}
+
+static int filter_postsetup(struct obd_device *obd)
+{
+ int rc = 0;
+ ENTRY;
+
+ // XXX add a storage location for the logid for size changes
+#ifdef ENABLE_ORPHANS
+ rc = llog_cat_initialize(obd, 1);
+ if (rc)
+ CERROR("failed to setup llogging subsystems\n");
+#endif
+ RETURN(rc);
}
static int filter_cleanup(struct obd_device *obd, int flags)
}
}
-#ifdef ENABLE_ORPHANS
- filter_put_catalog(filter->fo_catalog);
-#endif
-
- ldlm_namespace_free(obd->obd_namespace);
+ ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
if (filter->fo_sb == NULL)
RETURN(0);
filter->fo_sb = 0;
if (atomic_read(&filter->fo_vfsmnt->mnt_count) > 1)
- CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
+ CERROR("%s: mount point %p busy, mnt_count: %d\n",
+ obd->obd_name, filter->fo_vfsmnt,
atomic_read(&filter->fo_vfsmnt->mnt_count));
unlock_kernel();
fsfilt_put_ops(obd->obd_fsops);
lock_kernel();
+ dev_clear_rdonly(2);
+
RETURN(0);
}
{
struct obd_export *exp;
struct filter_export_data *fed;
- struct filter_client_data *fcd;
+ struct filter_client_data *fcd = NULL;
struct filter_obd *filter = &obd->u.filter;
int rc;
ENTRY;
LASSERT(exp != NULL);
fed = &exp->exp_filter_data;
- class_export_put(exp);
- INIT_LIST_HEAD(&fed->fed_open_head);
spin_lock_init(&fed->fed_lock);
if (!obd->obd_replayable)
- RETURN(0);
+ GOTO(cleanup, rc = 0);
OBD_ALLOC(fcd, sizeof(*fcd));
if (!fcd) {
CERROR("filter: out of memory for client data\n");
- GOTO(out_export, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
}
memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
fcd->fcd_mount_count = cpu_to_le64(filter->fo_fsd->fsd_mount_count);
rc = filter_client_add(obd, filter, fed, -1);
- if (rc)
- GOTO(out_fcd, rc);
- RETURN(rc);
+cleanup:
+ if (rc) {
+ if (fcd)
+ OBD_FREE(fcd, sizeof(*fcd));
+ class_disconnect(exp, 0);
+ } else {
+ class_export_put(exp);
+ }
+ return rc;
+}
-out_fcd:
- OBD_FREE(fcd, sizeof(*fcd));
-out_export:
- class_disconnect(conn, 0);
+static int filter_precleanup(struct obd_device *obd, int flags)
+{
+ int rc = 0;
+ ENTRY;
+
+#ifdef ENABLE_ORPHANS
+ rc = obd_llog_finish(obd, 0);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem\n");
+#endif
RETURN(rc);
}
-static void filter_destroy_export(struct obd_export *exp)
+static int filter_destroy_export(struct obd_export *exp)
{
- struct filter_export_data *fed = &exp->exp_filter_data;
-
ENTRY;
- spin_lock(&fed->fed_lock);
- while (!list_empty(&fed->fed_open_head)) {
- struct filter_file_data *ffd;
- ffd = list_entry(fed->fed_open_head.next, typeof(*ffd),
- ffd_export_list);
- list_del(&ffd->ffd_export_list);
- spin_unlock(&fed->fed_lock);
-
- CDEBUG(D_INFO, "force close file %*s (hdl %p:"LPX64") on "
- "disconnect\n", ffd->ffd_file->f_dentry->d_name.len,
- ffd->ffd_file->f_dentry->d_name.name,
- ffd, ffd->ffd_handle.h_cookie);
-
- filter_close_internal(exp, ffd, NULL, exp->exp_flags);
- spin_lock(&fed->fed_lock);
- }
- spin_unlock(&fed->fed_lock);
+ target_destroy_export(exp);
if (exp->exp_obd->obd_replayable)
filter_client_free(exp, exp->exp_flags);
- EXIT;
+ RETURN(0);
}
/* also incredibly similar to mds_disconnect */
-static int filter_disconnect(struct lustre_handle *conn, int flags)
+static int filter_disconnect(struct obd_export *exp, int flags)
{
- struct obd_export *exp = class_conn2export(conn);
unsigned long irqflags;
+ struct llog_ctxt *ctxt;
int rc;
ENTRY;
exp->exp_flags = flags;
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- rc = class_disconnect(conn, flags);
-
fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
- class_export_put(exp);
/* XXX cleanup preallocated inodes */
+
+ /* flush any remaining cancel messages out to the target */
+ ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+ llog_sync(ctxt, exp);
+
+ rc = class_disconnect(exp, flags);
RETURN(rc);
}
struct obdo *oa, const char *what)
{
struct dentry *dchild = NULL;
+ obd_gr group = 0;
- if (oa->o_valid & OBD_MD_FLHANDLE) {
- struct lustre_handle *ost_handle = obdo_handle(oa);
- struct filter_file_data *ffd = filter_handle2ffd(ost_handle);
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ group = oa->o_gr;
- if (ffd != NULL) {
- struct filter_dentry_data *fdd;
- dchild = dget(ffd->ffd_file->f_dentry);
- fdd = dchild->d_fsdata;
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
- filter_ffd_put(ffd);
-
- CDEBUG(D_INODE,"%s got child objid %*s: %p, count %d\n",
- what, dchild->d_name.len, dchild->d_name.name,
- dchild, atomic_read(&dchild->d_count));
- }
- }
-
- if (!dchild)
- dchild = filter_fid2dentry(obd, NULL, oa->o_mode, oa->o_id);
+ dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
if (IS_ERR(dchild)) {
CERROR("%s error looking up object: "LPU64"\n", what, oa->o_id);
RETURN(dchild);
}
- if (!dchild->d_inode) {
+ if (dchild->d_inode == NULL) {
CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
f_dput(dchild);
RETURN(ERR_PTR(-ENOENT));
return dchild;
}
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
+static int filter_getattr(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *md)
{
struct dentry *dentry = NULL;
int rc = 0;
ENTRY;
- obd = class_conn2obd(conn);
+ obd = class_exp2obd(exp);
if (obd == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ exp->exp_handle.h_cookie);
RETURN(-EINVAL);
}
}
/* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
+static int filter_setattr(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *md, struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
- struct obd_export *exp;
struct filter_obd *filter;
struct dentry *dentry;
struct iattr iattr;
ENTRY;
LASSERT(oti != NULL);
- exp = class_conn2export(conn);
- if (!exp) {
- CERROR("invalid client cookie "LPX64"\n", conn->cookie);
- RETURN(-EINVAL);
- }
dentry = filter_oa2dentry(exp->exp_obd, oa);
if (IS_ERR(dentry))
- GOTO(out_exp, rc = PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
filter = &exp->exp_obd->u.filter;
iattr_from_obdo(&iattr, oa, oa->o_valid);
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
lock_kernel();
- /* XXX this could be a rwsem instead, if filter_preprw played along */
if (iattr.ia_valid & ATTR_SIZE)
down(&dentry->d_inode->i_sem);
-
handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
oti);
if (IS_ERR(handle))
GOTO(out_unlock, rc = PTR_ERR(handle));
- rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
+ /* XXX this could be a rwsem instead, if filter_preprw played along */
+ if (iattr.ia_valid & ATTR_ATTR_FLAG)
+ rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
+ EXT3_IOC_SETFLAGS,
+ (long)&iattr.ia_attr_flags);
+ else
+ rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
rc = filter_finish_transno(exp, oti, rc);
rc2 = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
if (rc2) {
rc = rc2;
}
- if (iattr.ia_valid & ATTR_SIZE)
- up(&dentry->d_inode->i_sem);
-
oa->o_valid = OBD_MD_FLID;
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
out_unlock:
+ if (iattr.ia_valid & ATTR_SIZE)
+ up(&dentry->d_inode->i_sem);
unlock_kernel();
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
f_dput(dentry);
- out_exp:
- class_export_put(exp);
RETURN(rc);
}
-static int filter_open(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti,
- struct obd_client_handle *och)
+/* XXX identical to osc_unpackmd */
+static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
+ struct lov_mds_md *lmm, int lmm_bytes)
{
- struct obd_export *exp;
- struct lustre_handle *handle;
- struct filter_file_data *ffd;
- struct file *filp;
- struct lustre_handle parent_lockh;
- int rc = 0;
+ int lsm_size;
ENTRY;
- exp = class_conn2export(conn);
- if (exp == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
- RETURN(-EINVAL);
+ if (lmm != NULL) {
+ if (lmm_bytes < sizeof (*lmm)) {
+ CERROR("lov_mds_md too small: %d, need %d\n",
+ lmm_bytes, (int)sizeof(*lmm));
+ RETURN(-EINVAL);
+ }
+ /* XXX LOV_MAGIC etc check? */
+
+ if (lmm->lmm_object_id == cpu_to_le64(0)) {
+ CERROR("lov_mds_md: zero lmm_object_id\n");
+ RETURN(-EINVAL);
+ }
}
- filp = filter_obj_open(exp, oti, oa->o_id, oa->o_mode,
- LCK_PR, &parent_lockh);
- if (IS_ERR(filp))
- GOTO(out, rc = PTR_ERR(filp));
+ lsm_size = lov_stripe_md_size(1);
+ if (lsmp == NULL)
+ RETURN(lsm_size);
- oa->o_valid = OBD_MD_FLID;
- obdo_from_inode(oa, filp->f_dentry->d_inode, FILTER_VALID_FLAGS);
+ if (*lsmp != NULL && lmm == NULL) {
+ OBD_FREE(*lsmp, lsm_size);
+ *lsmp = NULL;
+ RETURN(0);
+ }
- ffd = filp->private_data;
- handle = obdo_handle(oa);
- handle->cookie = ffd->ffd_handle.h_cookie;
- oa->o_valid |= OBD_MD_FLHANDLE;
+ if (*lsmp == NULL) {
+ OBD_ALLOC(*lsmp, lsm_size);
+ if (*lsmp == NULL)
+ RETURN(-ENOMEM);
-out:
- class_export_put(exp);
- if (!rc) {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PR;
+ loi_init((*lsmp)->lsm_oinfo);
}
- RETURN(rc);
+
+ if (lmm != NULL) {
+ /* XXX zero *lsmp? */
+ (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
+ LASSERT((*lsmp)->lsm_object_id);
+ }
+
+ (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
+ RETURN(lsm_size);
}
-static int filter_close(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti)
+static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
+ struct filter_obd *filter)
{
- struct obd_export *exp;
- struct filter_file_data *ffd;
- struct filter_export_data *fed;
- int rc;
+ struct obdo doa; /* XXX obdo on stack */
+ __u64 last, id;
ENTRY;
+ LASSERT(oa);
- exp = class_conn2export(conn);
- if (exp == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
- RETURN(-EINVAL);
+ memset(&doa, 0, sizeof(doa));
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ doa.o_gr = oa->o_gr;
+ else
+ doa.o_gr = 0;
+ doa.o_mode = S_IFREG;
+ last = filter_last_id(filter, &doa); /* FIXME: object groups */
+ CWARN("deleting orphan objects from "LPU64" to "LPU64"\n",
+ oa->o_id + 1, last);
+ for (id = oa->o_id + 1; id <= last; id++) {
+ doa.o_id = id;
+ filter_destroy(exp, &doa, NULL, NULL);
}
+ spin_lock(&filter->fo_objidlock);
+ filter->fo_last_objids[0] = oa->o_id; /* FIXME: object groups */
+ spin_unlock(&filter->fo_objidlock);
+ EXIT;
+}
- if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
- CERROR("no handle for close of objid "LPU64"\n", oa->o_id);
- GOTO(out, rc = -EINVAL);
- }
+/* returns a negative error or a nonnegative number of files to create */
+static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
+ int group)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
+ int diff, rc;
+ ENTRY;
- ffd = filter_handle2ffd(obdo_handle(oa));
- if (ffd == NULL) {
- CERROR("bad handle ("LPX64") for close\n",
- obdo_handle(oa)->cookie);
- GOTO(out, rc = -ESTALE);
- }
+ /* only precreate if group == 0 and o_id is specfied */
+ if (group != 0 || oa->o_id == 0)
+ RETURN(1);
- fed = &exp->exp_filter_data;
- spin_lock(&fed->fed_lock);
- list_del(&ffd->ffd_export_list);
- spin_unlock(&fed->fed_lock);
+ diff = oa->o_id - filter_last_id(filter, oa);
+ CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
+ filter_last_id(filter, oa), diff);
+ if (diff >= 0)
+ RETURN(diff);
- oa->o_valid = OBD_MD_FLID;
- obdo_from_inode(oa,ffd->ffd_file->f_dentry->d_inode,FILTER_VALID_FLAGS);
+ if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+ !(oa->o_flags & OBD_FL_DELORPHAN)) {
+ CERROR("filter asked to delete %d objects, but DELORPHAN flag "
+ "isn't set!\n", -diff);
+ RETURN(0);
+ }
- rc = filter_close_internal(exp, ffd, oti, 0);
- filter_ffd_put(ffd);
- GOTO(out, rc);
- out:
- class_export_put(exp);
- return rc;
+ /* delete orphan request */
+ filter_destroy_precreated(exp, oa, filter);
+ rc = filter_update_last_objid(obd, group, 0);
+ if (rc)
+ CERROR("unable to write lastobjid, but orphans were deleted\n");
+ RETURN(rc);
}
-static int filter_create(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+/* We rely on the fact that only one thread will be creating files in a given
+ * group at a time, which is why we don't need an atomic filter_get_new_id.
+ * Even if we had that atomic function, the following race would exist:
+ *
+ * thread 1: gets id x from filter_next_id
+ * thread 2: gets id (x + 1) from filter_next_id
+ * thread 2: creates object (x + 1)
+ * thread 1: tries to create object x, gets -ENOSPC
+ */
+static int filter_precreate(struct obd_device *obd, struct obdo *oa,
+ obd_gr group, int *num)
{
- struct obd_export *exp;
- struct obd_device *obd;
- struct filter_obd *filter;
- struct obd_run_ctxt saved;
struct lustre_handle parent_lockh;
- struct dentry *dparent;
- struct ll_fid mds_fid = { .id = 0 };
struct dentry *dchild = NULL;
+ struct filter_obd *filter;
+ struct dentry *dparent;
+ struct iattr attr;
+ int err = 0, rc = 0, i;
+ __u64 next_id;
void *handle;
- int err, rc, cleanup_phase;
ENTRY;
- exp = class_conn2export(conn);
- if (exp == NULL) {
- CDEBUG(D_IOCTL,"invalid client cookie "LPX64"\n", conn->cookie);
- RETURN(-EINVAL);
- }
-
- obd = exp->exp_obd;
filter = &obd->u.filter;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- retry:
- oa->o_id = filter_next_id(filter);
- cleanup_phase = 0;
- dparent = filter_parent_lock(obd, S_IFREG, oa->o_id, LCK_PW,
- &parent_lockh);
- if (IS_ERR(dparent))
- GOTO(cleanup, rc = PTR_ERR(dparent));
- cleanup_phase = 1;
+ for (i = 0; i < *num && err == 0; i++) {
+ next_id = filter_last_id(filter, NULL) + 1;
+ CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
- dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
- if (IS_ERR(dchild))
- GOTO(cleanup, rc = PTR_ERR(dchild));
- if (dchild->d_inode) {
- /* This would only happen if lastobjid was bad on disk */
- CERROR("Serious error: objid %*s already exists; is this "
- "filesystem corrupt? I will try to work around it.\n",
- dchild->d_name.len, dchild->d_name.name);
+ dparent = filter_parent_lock(obd, group, next_id, LCK_PW,
+ &parent_lockh);
+ if (IS_ERR(dparent)) {
+ rc = PTR_ERR(dparent);
+ break;
+ }
+
+ dchild = filter_fid2dentry(obd, dparent, group, next_id);
+ if (IS_ERR(dchild))
+ GOTO(cleanup_lock, rc = PTR_ERR(dchild));
+
+ if (dchild->d_inode != NULL) {
+ /* This would only happen if lastobjid was bad on disk*/
+ CERROR("Serious error: objid %*s already exists; is "
+ "this filesystem corrupt?\n",
+ dchild->d_name.len, dchild->d_name.name);
+ GOTO(cleanup_dchild, rc = -EEXIST);
+ }
+
+ handle = fsfilt_start(obd, dparent->d_inode,
+ FSFILT_OP_CREATE_LOG, NULL);
+ if (IS_ERR(handle))
+ GOTO(cleanup_dchild, rc = PTR_ERR(handle));
+
+ rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+ if (rc) {
+ CERROR("create failed rc = %d\n", rc);
+ GOTO(cleanup_commit, rc);
+ } else if (oa != NULL &&
+ (oa->o_valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME |
+ OBD_MD_FLSIZE))) {
+ iattr_from_obdo(&attr, oa, oa->o_valid);
+ rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
+ if (rc)
+ CERROR("create setattr failed rc = %d\n", rc);
+ }
+ filter_set_last_id(filter, NULL, next_id);
+ err = filter_update_last_objid(obd, group, 0);
+ if (err)
+ CERROR("unable to write lastobjid but file created\n");
+ cleanup_commit:
+ err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
+ if (!rc)
+ rc = err;
+ }
+ if (dchild->d_inode != NULL && oa != NULL)
+ obdo_from_inode(oa, dchild->d_inode,
+ FILTER_VALID_FLAGS);
+ cleanup_dchild:
f_dput(dchild);
+ cleanup_lock:
filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- goto retry;
+ if (rc)
+ break;
+ oa = NULL; /* oa applies for first iteration only */
}
+ *num = i;
- cleanup_phase = 2;
- handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE_LOG, oti);
- if (IS_ERR(handle))
- GOTO(cleanup, rc = PTR_ERR(handle));
+ CDEBUG(D_INFO, "filter_precreate() created %d objects\n", i);
+ RETURN(rc);
+}
- rc = vfs_create(dparent->d_inode, dchild, oa->o_mode);
- if (rc) {
- CERROR("create failed rc = %d\n", rc);
- } else if (oa->o_valid & (OBD_MD_FLCTIME|OBD_MD_FLMTIME|OBD_MD_FLSIZE)){
- struct iattr attr;
+static int filter_create(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+ struct obd_device *obd = NULL;
+ struct obd_run_ctxt saved;
+ struct lov_stripe_md *lsm = NULL;
+ obd_gr group = 0;
+ int rc = 0, diff;
+ ENTRY;
- iattr_from_obdo(&attr, oa, oa->o_valid);
- rc = fsfilt_setattr(obd, dchild, handle, &attr, 1);
- if (rc)
- CERROR("create setattr failed rc = %d\n", rc);
- }
- rc = filter_finish_transno(exp, oti, rc);
- err = filter_update_server_data(obd, filter->fo_rcvd_filp,
- filter->fo_fsd);
- if (err)
- CERROR("unable to write lastobjid but file created\n");
-
- /* Set flags for fields we have set in the inode struct */
- if (!rc && mds_fid.id && (oa->o_valid & OBD_MD_FLCOOKIE)) {
- err = filter_log_op_create(obd->u.filter.fo_catalog, &mds_fid,
- dchild->d_inode->i_ino,
- dchild->d_inode->i_generation,
- oti->oti_logcookies);
- if (err) {
- CERROR("error logging create record: rc %d\n", err);
- oa->o_valid = OBD_MD_FLID;
- } else {
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLCOOKIE;
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ group = oa->o_gr;
+
+ CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
+ group, oa->o_id);
+ if (ea != NULL) {
+ lsm = *ea;
+ if (lsm == NULL) {
+ rc = obd_alloc_memmd(exp, &lsm);
+ if (rc < 0)
+ RETURN(rc);
}
- } else
- oa->o_valid = OBD_MD_FLID;
-
- err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (err) {
- CERROR("error on commit, err = %d\n", err);
- if (!rc)
- rc = err;
}
- if (rc)
- GOTO(cleanup, rc);
+ obd = exp->exp_obd;
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
- /* Set flags for fields we have set in the inode struct */
- obdo_from_inode(oa, dchild->d_inode, FILTER_VALID_FLAGS);
+ diff = filter_should_precreate(exp, oa, group);
+ if (diff > 0) {
+ oa->o_id = filter_last_id(&obd->u.filter, oa);
+ rc = filter_precreate(obd, oa, group, &diff);
+ oa->o_id += diff;
+ oa->o_valid = OBD_MD_FLID;
+ }
- EXIT;
-cleanup:
- switch(cleanup_phase) {
- case 2:
- f_dput(dchild);
- case 1: /* locked parent dentry */
- if (rc || oti == NULL) {
- filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
- } else {
- memcpy(&oti->oti_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- oti->oti_ack_locks[0].mode = LCK_PW;
- }
- case 0:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- class_export_put(exp);
- break;
- default:
- CERROR("invalid cleanup_phase %d\n", cleanup_phase);
- LBUG();
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+ if (rc && ea != NULL && *ea != lsm) {
+ obd_free_memmd(exp, &lsm);
+ } else if (rc == 0 && ea != NULL) {
+ /* XXX LOV STACKING: the lsm that is passed to us from
+ * LOV does not have valid lsm_oinfo data structs, so
+ * don't go touching that. This needs to be fixed in a
+ * big way. */
+ lsm->lsm_object_id = oa->o_id;
+ *ea = lsm;
}
RETURN(rc);
}
-static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
+static int filter_destroy(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
- struct obd_export *exp;
struct obd_device *obd;
struct filter_obd *filter;
struct dentry *dchild = NULL, *dparent = NULL;
- struct filter_dentry_data *fdd;
struct obd_run_ctxt saved;
void *handle = NULL;
struct lustre_handle parent_lockh;
struct llog_cookie *fcc = NULL;
- int rc, rc2, cleanup_phase = 0;
+ int rc, rc2, cleanup_phase = 0, have_prepared = 0;
+ obd_gr group = 0;
ENTRY;
- exp = class_conn2export(conn);
- if (exp == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
- RETURN(-EINVAL);
- }
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ group = oa->o_gr;
obd = exp->exp_obd;
filter = &obd->u.filter;
- push_ctxt(&saved, &filter->fo_ctxt, NULL);
- dparent = filter_parent_lock(obd, oa->o_mode, oa->o_id,
- LCK_PW, &parent_lockh);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+ acquire_locks:
+ dparent = filter_parent_lock(obd, group, oa->o_id, LCK_PW,
+ &parent_lockh);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
- dchild = filter_fid2dentry(obd, dparent, S_IFREG, oa->o_id);
+ dchild = filter_fid2dentry(obd, dparent, group, oa->o_id);
if (IS_ERR(dchild))
GOTO(cleanup, rc = -ENOENT);
cleanup_phase = 2;
CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
GOTO(cleanup, rc = -ENOENT);
}
+
+ if (!have_prepared) {
+ /* If we're really going to destroy the object, get ready
+ * by getting the clients to discard their cached data.
+ *
+ * We have to drop the parent lock, because
+ * filter_prepare_destroy will acquire a PW on the object, and
+ * we don't want to deadlock with an incoming write to the
+ * object, which has the extent PW and then wants to get the
+ * parent dentry to do the lookup.
+ *
+ * We dput the child because it's not worth the extra
+ * complication of condition the above code to skip it on the
+ * second time through. */
+ f_dput(dchild);
+ filter_parent_unlock(dparent, &parent_lockh, LCK_PW);
+
+ filter_prepare_destroy(obd, oa->o_id);
+ have_prepared = 1;
+ goto acquire_locks;
+ }
+
handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK_LOG, oti);
if (IS_ERR(handle))
GOTO(cleanup, rc = PTR_ERR(handle));
cleanup_phase = 3;
- fdd = dchild->d_fsdata;
-
/* Our MDC connection is established by the MDS to us */
- if ((oa->o_valid & OBD_MD_FLCOOKIE) && filter->fo_mdc_imp != NULL) {
+ if (oa->o_valid & OBD_MD_FLCOOKIE) {
OBD_ALLOC(fcc, sizeof(*fcc));
if (fcc != NULL)
memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
}
- if (fdd != NULL && atomic_read(&fdd->fdd_open_count)) {
- LASSERT(fdd->fdd_magic == FILTER_DENTRY_MAGIC);
- if (!(fdd->fdd_flags & FILTER_FLAG_DESTROY)) {
- fdd->fdd_flags |= FILTER_FLAG_DESTROY;
-
-#ifdef ENABLE_ORPHANS
- filter_log_op_orphan(filter->fo_catalog, oa->o_id,
- oa->o_generation,&fdd->fdd_cookie);
-#endif
- CDEBUG(D_INODE,
- "defer destroy of %dx open objid "LPU64"\n",
- atomic_read(&fdd->fdd_open_count), oa->o_id);
- } else {
- CDEBUG(D_INODE,
- "repeat destroy of %dx open objid "LPU64"\n",
- atomic_read(&fdd->fdd_open_count), oa->o_id);
- }
- GOTO(cleanup, rc = 0);
- }
-
rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
cleanup:
switch(cleanup_phase) {
case 3:
if (fcc != NULL)
- fsfilt_set_last_rcvd(obd, 0, oti->oti_handle,
- filter_cancel_cookies_cb, fcc);
+ fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+ filter_cancel_cookies_cb, fcc);
rc = filter_finish_transno(exp, oti, rc);
rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
if (rc2) {
oti->oti_ack_locks[0].mode = LCK_PW;
}
case 0:
- pop_ctxt(&saved, &filter->fo_ctxt, NULL);
- class_export_put(exp);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
break;
default:
CERROR("invalid cleanup_phase %d\n", cleanup_phase);
}
/* NB start and end are used for punch, but not truncate */
-static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
+static int filter_truncate(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *lsm,
obd_off start, obd_off end,
struct obd_trans_info *oti)
CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = %x, "
"o_size = "LPD64"\n", oa->o_id, oa->o_valid, start);
oa->o_size = start;
- error = filter_setattr(conn, oa, NULL, oti);
+ error = filter_setattr(exp, oa, NULL, oti);
RETURN(error);
}
-static int filter_syncfs(struct obd_export *exp)
+static int filter_sync(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *lsm, obd_off start, obd_off end)
{
+ struct obd_run_ctxt saved;
+ struct filter_obd *filter;
+ struct dentry *dentry;
+ int rc, rc2;
ENTRY;
- RETURN(fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb));
+ filter = &exp->exp_obd->u.filter;
+
+ /* an objid of zero is taken to mean "sync whole filesystem" */
+ if (!oa || !oa->o_valid & OBD_MD_FLID) {
+ rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+ GOTO(out_exp, rc);
+ }
+
+ dentry = filter_oa2dentry(exp->exp_obd, oa);
+ if (IS_ERR(dentry))
+ GOTO(out_exp, rc = PTR_ERR(dentry));
+
+ push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+
+ down(&dentry->d_inode->i_sem);
+ rc = filemap_fdatasync(dentry->d_inode->i_mapping);
+ if (rc == 0) {
+ /* just any file to grab fsync method - "file" arg unused */
+ struct file *file = filter->fo_rcvd_filp;
+
+ if (file->f_op && file->f_op->fsync)
+ rc = file->f_op->fsync(NULL, dentry, 1);
+
+ rc2 = filemap_fdatawait(dentry->d_inode->i_mapping);
+ if (!rc)
+ rc = rc2;
+ }
+ up(&dentry->d_inode->i_sem);
+
+ oa->o_valid = OBD_MD_FLID;
+ obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+
+ pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+
+ f_dput(dentry);
+out_exp:
+ class_export_put(exp);
+ RETURN(rc);
}
static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
}
-static int filter_get_info(struct lustre_handle *conn, __u32 keylen,
+static int filter_get_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 *vallen, void *val)
{
struct obd_device *obd;
ENTRY;
- obd = class_conn2obd(conn);
+ obd = class_exp2obd(exp);
if (obd == NULL) {
CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
- conn->cookie);
+ exp->exp_handle.h_cookie);
RETURN(-EINVAL);
}
RETURN(0);
}
+ if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
+ obd_id *last_id = val;
+ /* FIXME: object groups */
+ *last_id = filter_last_id(&obd->u.filter, 0);
+ RETURN(0);
+ }
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
-static int filter_set_info(struct lustre_handle *conn, __u32 keylen,
+static int filter_set_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 vallen, void *val)
{
struct obd_device *obd;
- struct obd_export *exp;
- struct obd_import *imp;
+ struct lustre_handle conn;
+#ifdef ENABLE_ORPHANS
+ struct llog_ctxt *ctxt;
+#endif
+ int rc = 0;
ENTRY;
- obd = class_conn2obd(conn);
+ conn.cookie = exp->exp_handle.h_cookie;
+
+ obd = exp->exp_obd;
if (obd == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
- conn->cookie);
+ CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n",
+ exp, conn.cookie);
RETURN(-EINVAL);
}
memcmp(key, "mds_conn", keylen) != 0)
RETURN(-EINVAL);
- CERROR("Received MDS connection ("LPX64")\n", conn->cookie);
- memcpy(&obd->u.filter.fo_mdc_conn, conn, sizeof(*conn));
-
- imp = obd->u.filter.fo_mdc_imp = class_new_import();
-
- exp = class_conn2export(conn);
- imp->imp_connection = ptlrpc_connection_addref(exp->exp_connection);
- class_export_put(exp);
-
- imp->imp_client = &obd->u.filter.fo_mdc_client;
- imp->imp_remote_handle = *conn;
- imp->imp_obd = obd;
- imp->imp_dlm_fake = 1; /* XXX rename imp_dlm_fake to something else */
- imp->imp_level = LUSTRE_CONN_FULL;
- class_import_put(imp);
-
- RETURN(0);
+ CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
+ memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
+#ifdef ENABLE_ORPHANS
+ ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
+ rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+#endif
+ RETURN(rc);
}
-int filter_iocontrol(unsigned int cmd, struct lustre_handle *conn,
+int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
int len, void *karg, void *uarg)
{
- struct obd_device *obd = class_conn2obd(conn);
+ struct obd_device *obd = exp->exp_obd;
+ struct obd_ioctl_data *data = karg;
+ int rc = 0;
switch (cmd) {
case OBD_IOC_ABORT_RECOVERY:
target_abort_recovery(obd);
RETURN(0);
+ case OBD_IOC_SET_READONLY: {
+ void *handle;
+ struct super_block *sb = obd->u.filter.fo_sb;
+ struct inode *inode = sb->s_root->d_inode;
+ BDEVNAME_DECLARE_STORAGE(tmp);
+ CERROR("setting device %s read-only\n",
+ ll_bdevname(sb, tmp));
+
+ handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
+ LASSERT(handle);
+ (void)fsfilt_commit(obd, inode, handle, 1);
+
+ dev_set_rdonly(ll_sbdev(obd->u.filter.fo_sb), 2);
+ RETURN(0);
+ }
+
+ case OBD_IOC_CATLOGLIST: {
+ rc = llog_catlog_list(obd, 1, data);
+ RETURN(rc);
+ }
+
+ case OBD_IOC_LLOG_CANCEL:
+ case OBD_IOC_LLOG_REMOVE:
+ case OBD_IOC_LLOG_INFO:
+ case OBD_IOC_LLOG_PRINT: {
+ /* FIXME to be finished */
+ RETURN(-EOPNOTSUPP);
+/*
+ struct llog_ctxt *ctxt = NULL;
+
+ push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+ rc = llog_ioctl(ctxt, cmd, data);
+ pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+
+ RETURN(rc);
+*/
+ }
+
+
default:
RETURN(-EINVAL);
}
RETURN(0);
}
+static struct llog_operations filter_unlink_repl_logops;
+static struct llog_operations filter_size_orig_logops = {
+ lop_setup: llog_obd_origin_setup,
+ lop_cleanup: llog_obd_origin_cleanup,
+ lop_add: llog_obd_origin_add
+};
+
+static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
+ int count, struct llog_logid *logid)
+{
+ struct llog_ctxt *ctxt;
+ int rc;
+ ENTRY;
+
+ filter_unlink_repl_logops = llog_client_ops;
+ filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel;
+ filter_unlink_repl_logops.lop_connect = llog_repl_connect;
+ filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
+
+ rc = llog_setup(obd, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
+ &filter_unlink_repl_logops);
+ if (rc)
+ RETURN(rc);
+ /* FIXME - assign unlink_cb for filter's recovery */
+ ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
+ ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
+
+ rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+ &filter_size_orig_logops);
+ RETURN(rc);
+}
+
+static int filter_llog_finish(struct obd_device *obd, int count)
+{
+ int rc;
+ ENTRY;
+
+ rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT));
+ if (rc)
+ RETURN(rc);
+
+ rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_ORIG_CTXT));
+ RETURN(rc);
+}
+
+static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
+ void *data)
+{
+ return filter_fid2dentry(data, NULL, gr, id);
+}
+
+static struct lvfs_callback_ops filter_lvfs_ops = {
+ l_fid2dentry: filter_lvfs_fid2dentry,
+};
+
static struct obd_ops filter_obd_ops = {
o_owner: THIS_MODULE,
o_attach: filter_attach,
o_get_info: filter_get_info,
o_set_info: filter_set_info,
o_setup: filter_setup,
+ o_postsetup: filter_postsetup,
+ o_precleanup: filter_precleanup,
o_cleanup: filter_cleanup,
o_connect: filter_connect,
o_disconnect: filter_disconnect,
o_statfs: filter_statfs,
- o_syncfs: filter_syncfs,
o_getattr: filter_getattr,
+ o_unpackmd: filter_unpackmd,
o_create: filter_create,
o_setattr: filter_setattr,
o_destroy: filter_destroy,
- o_open: filter_open,
- o_close: filter_close,
o_brw: filter_brw,
o_punch: filter_truncate,
+ o_sync: filter_sync,
o_preprw: filter_preprw,
o_commitrw: filter_commitrw,
- o_log_cancel: filter_log_cancel,
o_destroy_export: filter_destroy_export,
+ o_llog_init: filter_llog_init,
+ o_llog_finish: filter_llog_finish,
o_iocontrol: filter_iocontrol,
};
o_get_info: filter_get_info,
o_set_info: filter_set_info,
o_setup: filter_san_setup,
+ o_precleanup: filter_precleanup,
o_cleanup: filter_cleanup,
o_connect: filter_connect,
o_disconnect: filter_disconnect,
o_statfs: filter_statfs,
o_getattr: filter_getattr,
+ o_unpackmd: filter_unpackmd,
o_create: filter_create,
o_setattr: filter_setattr,
o_destroy: filter_destroy,
- o_open: filter_open,
- o_close: filter_close,
o_brw: filter_brw,
o_punch: filter_truncate,
+ o_sync: filter_sync,
o_preprw: filter_preprw,
o_commitrw: filter_commitrw,
- o_log_cancel: filter_log_cancel,
o_san_preprw: filter_san_preprw,
o_destroy_export: filter_destroy_export,
+ o_llog_init: filter_llog_init,
+ o_llog_finish: filter_llog_finish,
o_iocontrol: filter_iocontrol,
};
struct lprocfs_static_vars lvars;
int rc;
- printk(KERN_INFO "Lustre Filtering OBD driver; info@clusterfs.com\n");
+ printk(KERN_INFO "Lustre: Filtering OBD driver; info@clusterfs.com\n");
lprocfs_init_vars(filter, &lvars);