#include <linux/obd.h>
#include <linux/lustre_idl.h>
-struct obd_trans_info;
-struct obd_device;
-struct lov_stripe_md;
-
struct plain_handle_data {
struct list_head phd_entry;
struct llog_handle *phd_cat_handle;
struct llog_log_hdr *lgh_hdr;
struct file *lgh_file;
int lgh_last_idx;
+ struct llog_obd_ctxt *lgh_ctxt;
union {
struct plain_handle_data phd;
struct cat_handle_data chd;
/* llog.c - general API */
typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
-int llog_init_handle(struct llog_handle *handle, int flags, struct obd_uuid *uuid);
+int llog_init_handle(struct llog_handle *handle, int flags,
+ struct obd_uuid *uuid);
int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data);
extern struct llog_handle *llog_alloc_handle(void);
extern void llog_free_handle(struct llog_handle *handle);
extern int llog_close(struct llog_handle *cathandle);
+extern int llog_cancel_rec(struct llog_handle *loghandle, int index);
/* llog_cat.c - catalog api */
struct llog_process_data {
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
/* llog_obd.c */
-int obd_llog_setup(struct obd_device *obd, struct obd_device *disk_obd,
- int index, int count, struct llog_logid *logid);
-int obd_llog_cleanup(struct obd_device *obd);
-int obd_llog_origin_add(struct obd_export *exp,
- int index,
+int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
+ int count, struct llog_logid *logid, struct llog_operations *op);
+int llog_cleanup(struct llog_obd_ctxt *);
+int llog_add(struct llog_obd_ctxt *ctxt,
struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
-int obd_llog_repl_cancel(struct obd_device *, struct lov_stripe_md *lsm,
- int count, struct llog_cookie *cookies, int flags);
-
-int llog_obd_setup(struct obd_device *obd, struct obd_device *disk_obd,
- int index, int count, struct llog_logid *logid);
-int llog_obd_cleanup(struct obd_device *obd);
-int llog_obd_origin_add(struct obd_export *exp,
- int index,
+int llog_cancel(struct llog_obd_ctxt *, struct lov_stripe_md *lsm,
+ int count, struct llog_cookie *cookies, int flags);
+
+int llog_obd_origin_setup(struct obd_device *obd, int index,
+ struct obd_device *disk_obd, int count,
+ struct llog_logid *logid);
+int llog_obd_origin_cleanup(struct llog_obd_ctxt *ctxt);
+int llog_obd_origin_add(struct llog_obd_ctxt *ctxt,
struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
-int llog_initialize(struct obd_device *obd);
-int llog_disconnect(struct obd_device *obd);
+
int llog_cat_initialize(struct obd_device *obd, int count);
+int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int count, struct llog_logid *logid);
+int obd_llog_finish(struct obd_device *obd, int count);
/* llog_net.c */
-int llog_initiator_connect(struct obd_device *obd);
-int llog_receptor_accept(struct obd_device *obd, struct obd_import *imp);
-int llog_origin_handle_cancel(struct obd_device *obd, struct ptlrpc_request *req);
+int llog_initiator_connect(struct llog_obd_ctxt *ctxt);
+int llog_receptor_accept(struct llog_obd_ctxt *ctxt, struct obd_import *imp);
+int llog_origin_handle_cancel(struct llog_obd_ctxt *ctxt,
+ struct ptlrpc_request *req);
/* recov_thread.c */
-int llog_obd_repl_cancel(struct obd_device *obd,
+int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags);
__u64 *offset,
void *buf,
int len);
- int (*lop_create)(struct obd_device *obd, struct llog_handle **,
+ int (*lop_create)(struct llog_obd_ctxt *ctxt, struct llog_handle **,
struct llog_logid *logid, char *name);
int (*lop_close)(struct llog_handle *handle);
int (*lop_read_header)(struct llog_handle *handle);
- /* for devices in stacks, using other obd's for log storage */
- int (*lop_origin_setup)(struct obd_device *, int);
- int (*lop_origin_cleanup)(struct obd_device *);
- int (*lop_origin_open)(struct obd_device *originator,
- struct obd_device *disk_obd,
- int index, int named, int flags,
- struct obd_uuid *log_uuid);
+
+ int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
+ struct obd_device *disk_obd, int count,
+ struct llog_logid *logid);
+ int (*lop_cleanup)(struct llog_obd_ctxt *ctxt);
+ int (*lop_add)(struct llog_obd_ctxt *ctxt, struct llog_rec_hdr *rec,
+ struct lov_stripe_md *lsm,
+ struct llog_cookie *logcookies, int numcookies);
+ int (*lop_cancel)(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm,
+ int count, struct llog_cookie *cookies, int flags);
+ /* XXX add 2 more: commit callbacks and llog recovery functions */
};
extern struct llog_operations llog_lvfs_ops;
-static inline int llog_obd2ops(struct obd_device *obd,
+/* MDS stored handles in OSC */
+#define LLOG_OBD_DEL_LOG_HANDLE 0
+
+/* OBDFILTER stored handles in OBDFILTER */
+#define LLOG_OBD_SZ_LOG_HANDLE 0
+#define LLOG_OBD_RD1_LOG_HANDLE 1
+
+struct llog_obd_ctxt {
+ int loc_idx; /* my index the obd array of ctxt's */
+ struct obd_device *loc_obd; /* points back to the containing obd*/
+ struct obd_export *loc_exp;
+ struct obd_import *loc_imp; /* to use in RPC's: can be backward
+ pointing import */
+ struct llog_operations *loc_logops;
+ struct llog_handle *loc_handle;
+ struct llog_commit_data *loc_llcd;
+ struct semaphore loc_sem; /* protects loc_llcd */
+};
+
+#if 0
+int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle,
+ void *buf, int count, struct llog_cookie *cookies,
+ int flags);
+
+
+int llog_originator_setup(struct obd_device *, int);
+int llog_originator_cleanup(struct obd_device *);
+int llog_originator_open(struct obd_device *originator,
+ struct obd_device *disk_obd,
+ int index, int named, int flags,
+ struct obd_uuid *log_uuid);
+#endif
+
+static inline int llog_obd2ops(struct llog_obd_ctxt *ctxt,
struct llog_operations **lop)
{
- struct obd_export *exp;
-
- if (obd == NULL)
- return -ENOTCONN;
- exp = obd->obd_log_exp;
- if (exp == NULL)
- return -ENOTCONN;
- if (exp->exp_obd == NULL)
+ if (ctxt == NULL)
return -ENOTCONN;
- *lop = exp->exp_obd->obd_logops;
+ *lop = ctxt->loc_logops;
if (*lop == NULL)
return -EOPNOTSUPP;
return 0;
{
if (loghandle == NULL)
return -EINVAL;
- return llog_obd2ops(loghandle->lgh_obd, lop);
+ return llog_obd2ops(loghandle->lgh_ctxt, lop);
}
+static inline int llog_data_len(int len)
+{
+ int mask = LLOG_MIN_REC_SIZE - 1;
+ int remains = LLOG_MIN_REC_SIZE - sizeof(struct llog_rec_hdr) -
+ sizeof(struct llog_rec_tail);
+
+ return (len <= remains) ?
+ remains : (((len + mask) & (~mask)) + remains);
+}
static inline int llog_write_rec(struct llog_handle *handle,
struct llog_rec_hdr *rec,
RETURN(rc);
}
-static inline int llog_create(struct obd_device *obd, struct llog_handle **res,
+static inline int llog_create(struct llog_obd_ctxt *ctxt,
+ struct llog_handle **res,
struct llog_logid *logid, char *name)
{
struct llog_operations *lop;
int rc;
ENTRY;
- rc = llog_obd2ops(obd, &lop);
+ rc = llog_obd2ops(ctxt, &lop);
if (rc)
RETURN(rc);
if (lop->lop_create == NULL)
RETURN(-EOPNOTSUPP);
- rc = lop->lop_create(obd, res, logid, name);
+ rc = lop->lop_create(ctxt, res, logid, name);
RETURN(rc);
}
-
-
-/* llog obd interfaces */
-#define LLOG_OBD_MAX_HANDLES 3
-
-/* MDS stored handles in OSC */
-#define LLOG_OBD_DEL_LOG_HANDLE 0
-
-/* OBDFILTER stored handles in OBDFILTER */
-#define LLOG_OBD_SZ_LOG_HANDLE 0
-#define LLOG_OBD_RD1_LOG_HANDLE 1
-
-struct llog_obd_ctxt {
- struct obd_device *loc_obd;
- struct llog_handle *loc_handles[LLOG_OBD_MAX_HANDLES];
- struct llog_commit_data *loc_llcd;
- struct semaphore loc_sem; /* protects loc_llcd */
- struct obd_import *loc_imp;
-};
-
-void llog_obd_cleanup_ctxt(struct obd_device *obd);
-int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle,
- void *buf, int count, struct llog_cookie *cookies, int flags);
-
-
-int llog_originator_setup(struct obd_device *, int);
-int llog_originator_cleanup(struct obd_device *);
-int llog_originator_open(struct obd_device *originator,
- struct obd_device *disk_obd,
- int index, int named, int flags,
- struct obd_uuid *log_uuid);
-
-
-
#endif
}
void ll_options(char *options, char **ost, char **mdc, char **profile,
- char **mds_uuid, int *flags)
+ char **mds_uuid, char **mds_peer, int *flags)
{
char *this_char;
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
continue;
if (!*profile && (*profile = ll_read_opt("profile", this_char)))
continue;
- if (!*mds_uuid && (*mds_uuid = ll_read_opt("mds_uuid", this_char)))
+ if (!*mds_uuid && (*mds_uuid = ll_read_opt("mds_uuid",
+ this_char)))
+ continue;
+ if (!*mds_peer && (*mds_peer = ll_read_opt("mds_peer",
+ this_char)))
continue;
if (!(*flags & LL_SBI_NOLCK) &&
((*flags) = (*flags) |
lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
}
-int ll_process_log(char *mds, char *config, struct config_llog_instance *cfg)
+int ll_process_log(char *mds, char *peer, char *config,
+ struct config_llog_instance *cfg)
{
struct lustre_cfg lcfg;
struct obd_device *obd;
struct obd_export *exp;
char * name = "mdc_dev";
struct obd_uuid uuid = { "MDC_mount_UUID" };
+ struct llog_obd_ctxt *ctxt;
int rc = 0;
int err;
ENTRY;
LCFG_INIT(lcfg, LCFG_SETUP, name);
lcfg.lcfg_inlbuf1 = mds;
lcfg.lcfg_inllen1 = strlen(lcfg.lcfg_inlbuf1) + 1;
- lcfg.lcfg_inlbuf2 = "NET_mds_facet_tcp_UUID";
+ lcfg.lcfg_inlbuf2 = peer;
lcfg.lcfg_inllen2 = strlen(lcfg.lcfg_inlbuf2) + 1;
err = class_process_config(&lcfg);
if (err < 0)
- GOTO(out, err);
+ GOTO(out_detach, err);
obd = class_name2obd(name);
if (obd == NULL)
- GOTO(out, err = -EINVAL);
+ GOTO(out_cleanup, err = -EINVAL);
err = obd_connect(&mdc_conn, obd, &uuid);
if (err) {
CERROR("cannot connect to %s: rc = %d\n", mds, err);
- GOTO(out, err);
+ GOTO(out_cleanup, err);
}
exp = class_conn2export(&mdc_conn);
- rc = class_config_parse_llog(exp, config, cfg);
+ ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
+ rc = class_config_parse_llog(ctxt, config, cfg);
if (rc) {
- CERROR("mdc_llog_process failed: rc = %d\n", err);
+ CERROR("class_config_parse_llog failed: rc = %d\n", rc);
}
err = obd_disconnect(exp, 0);
+out_cleanup:
LCFG_INIT(lcfg, LCFG_CLEANUP, name);
err = class_process_config(&lcfg);
if (err < 0)
GOTO(out, err);
+out_detach:
+
LCFG_INIT(lcfg, LCFG_DETACH, name);
err = class_process_config(&lcfg);
if (err < 0)
char *osc = NULL;
char *mdc = NULL;
char *mds_uuid = NULL;
+ char *mds_peer = NULL;
char *profile = NULL;
int err;
struct ll_fid rootfid;
class_uuid_unparse(uuid, &sbi->ll_sb_uuid);
sbi->ll_flags |= LL_SBI_READAHEAD;
- ll_options(data, &osc, &mdc, &profile, &mds_uuid, &sbi->ll_flags);
+ ll_options(data, &osc, &mdc, &profile, &mds_uuid, &mds_peer,
+ &sbi->ll_flags);
if (profile) {
struct lustre_profile *lprof;
CERROR("no mds_uuid\n");
GOTO(out_free, err = -EINVAL);
}
+
+ if (!mds_peer) {
+ CERROR("no mds_peer\n");
+ GOTO(out_free, err = -EINVAL);
+ }
/* save these so we can cleanup later */
obd_str2uuid(&sbi->ll_mds_uuid, mds_uuid);
+ obd_str2uuid(&sbi->ll_mds_peer_uuid, mds_peer);
len = strlen(profile) + 1;
OBD_ALLOC(sbi->ll_profile, len);
cfg.cfg_instance = sbi->ll_instance;
cfg.cfg_uuid = sbi->ll_sb_uuid;
- err = ll_process_log(mds_uuid, profile, &cfg);
+ err = ll_process_log(mds_uuid, mds_peer, profile, &cfg);
if (err < 0) {
CERROR("Unable to process log: %s\n", profile);
CERROR("could not register mount in /proc/lustre");
}
+ mdc_init_ea_size(obd, osc);
+
err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid);
if (err) {
CERROR("cannot connect to %s: rc = %d\n", mdc, err);
OBD_FREE(profile, strlen(profile) + 1);
if (mds_uuid)
OBD_FREE(mds_uuid, strlen(mds_uuid) + 1);
+ if (mds_peer)
+ OBD_FREE(mds_peer, strlen(mds_peer) + 1);
RETURN(err);
out_free:
if (sbi->ll_profile != NULL) {
- int len = sizeof(sbi->ll_profile) + sizeof("-clean") + 1;
+ int len = strlen(sbi->ll_profile) + sizeof("-clean") + 1;
int err;
if (sbi->ll_instance != NULL) {
OBD_ALLOC(cln_prof, len);
sprintf(cln_prof, "%s-clean", sbi->ll_profile);
- err = ll_process_log(sbi->ll_mds_uuid.uuid, cln_prof,
- &cfg);
+ err = ll_process_log(sbi->ll_mds_uuid.uuid,
+ sbi->ll_mds_peer_uuid.uuid,
+ cln_prof, &cfg);
if (err < 0)
CERROR("Unable to process log: %s\n", cln_prof);
OBD_FREE(cln_prof, len);
if (sbi->ll_profile != NULL) {
char * cln_prof;
- int len = sizeof(sbi->ll_profile) + sizeof("-clean") + 1;
+ int len = strlen(sbi->ll_profile) + sizeof("-clean") + 1;
int err;
struct config_llog_instance cfg;
OBD_ALLOC(cln_prof, len);
sprintf(cln_prof, "%s-clean", sbi->ll_profile);
- err = ll_process_log(sbi->ll_mds_uuid.uuid, cln_prof, &cfg);
+ err = ll_process_log(sbi->ll_mds_uuid.uuid,
+ sbi->ll_mds_peer_uuid.uuid,
+ cln_prof, &cfg);
if (err < 0)
CERROR("Unable to process log: %s\n", cln_prof);
rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
&extent, &lockh, ast_flags);
down(&inode->i_sem);
- if (rc != ELDLM_OK) {
- if (rc > 0)
- RETURN(-ENOLCK);
+ if (rc != ELDLM_OK)
RETURN(rc);
- }
rc = vmtruncate(inode, attr->ia_size);
if (rc == 0)