yesterday's merge brought in some nasty bugs.
#include <linux/lustre_log.h>
-struct llog_commit_data {
+struct llog_canceld_ctxt {
struct list_head llcd_list; /* free or pending struct list */
struct obd_import *llcd_import;
struct llog_commit_master *llcd_lcm;
int llcd_tries; /* number of tries to send */
+ struct llog_ctxt_gen llcd_gen;
int llcd_cookiebytes;
struct llog_cookie llcd_cookies[0];
};
int lcm_flags;
wait_queue_head_t lcm_waitq;
- struct list_head lcm_llcd_pending; /* llog_commit_data to send */
+ struct list_head lcm_llcd_pending; /* llog_canceld_ctxt to send */
struct list_head lcm_llcd_resend; /* try to resend this data */
- struct list_head lcm_llcd_free; /* free llog_commit_data */
+ struct list_head lcm_llcd_free; /* free llog_canceld_ctxt */
spinlock_t lcm_llcd_lock; /* protects llcd_free */
atomic_t lcm_llcd_numfree; /* items on llcd_free */
int lcm_llcd_minfree; /* min free on llcd_free */
/* ptlrpc/recov_thread.c */
int llog_start_commit_thread(void);
-struct llog_commit_data *llcd_grab(void);
-void llcd_send(struct llog_commit_data *llcd);
+struct llog_canceld_ctxt *llcd_grab(void);
+void llcd_send(struct llog_canceld_ctxt *llcd);
#endif /* _LUSTRE_COMMIT_CONFD_H */
#include <linux/obd.h>
#include <linux/lustre_idl.h>
+#define LOG_NAME_LIMIT(logname, name) \
+ snprintf(logname, sizeof(logname), "LOGS/%s", name)
+
struct plain_handle_data {
struct list_head phd_entry;
struct llog_handle *phd_cat_handle;
struct llog_handle {
struct semaphore lgh_lock;
struct llog_logid lgh_id; /* id of this log */
- struct obd_device *lgh_obd;
struct llog_log_hdr *lgh_hdr;
struct file *lgh_file;
int lgh_last_idx;
- struct llog_obd_ctxt *lgh_ctxt;
+ struct llog_ctxt *lgh_ctxt;
union {
struct plain_handle_data phd;
struct cat_handle_data chd;
/* llog_obd.c */
int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
int count, struct llog_logid *logid, struct llog_operations *op);
-int llog_cleanup(struct llog_obd_ctxt *);
-int llog_add(struct llog_obd_ctxt *ctxt,
+int llog_cleanup(struct llog_ctxt *);
+int llog_precleanup(struct llog_ctxt *);
+int llog_add(struct llog_ctxt *ctxt,
struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
-int llog_cancel(struct llog_obd_ctxt *, struct lov_stripe_md *lsm,
+int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
int llog_obd_origin_setup(struct obd_device *obd, int index,
struct obd_device *disk_obd, int count,
struct llog_logid *logid);
-int llog_obd_origin_cleanup(struct llog_obd_ctxt *ctxt);
-int llog_obd_origin_add(struct llog_obd_ctxt *ctxt,
+int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
+int llog_obd_origin_add(struct llog_ctxt *ctxt,
struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
int obd_llog_finish(struct obd_device *obd, int count);
/* llog_net.c */
-int llog_initiator_connect(struct llog_obd_ctxt *ctxt);
-int llog_receptor_accept(struct llog_obd_ctxt *ctxt, struct obd_import *imp);
-int llog_origin_handle_cancel(struct llog_obd_ctxt *ctxt,
- struct ptlrpc_request *req);
+int llog_initiator_connect(struct llog_ctxt *ctxt);
+int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
+int llog_origin_handle_cancel(struct ptlrpc_request *req);
+int llog_origin_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid, struct llog_ctxt_gen *gen);
+int llog_handle_connect(struct ptlrpc_request *req);
/* recov_thread.c */
-int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags);
+int llog_repl_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid, struct llog_ctxt_gen *gen);
struct llog_operations {
int (*lop_write_rec)(struct llog_handle *loghandle,
__u64 *offset,
void *buf,
int len);
- int (*lop_create)(struct llog_obd_ctxt *ctxt, struct llog_handle **,
+ int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
struct llog_logid *logid, char *name);
int (*lop_close)(struct llog_handle *handle);
int (*lop_read_header)(struct llog_handle *handle);
int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
struct obd_device *disk_obd, int count,
struct llog_logid *logid);
- int (*lop_cleanup)(struct llog_obd_ctxt *ctxt);
- int (*lop_add)(struct llog_obd_ctxt *ctxt, struct llog_rec_hdr *rec,
+ int (*lop_precleanup)(struct llog_ctxt *ctxt);
+ int (*lop_cleanup)(struct llog_ctxt *ctxt);
+ int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
- int (*lop_cancel)(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm,
+ int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
+ int (*lop_connect)(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid, struct llog_ctxt_gen *gen);
/* XXX add 2 more: commit callbacks and llog recovery functions */
};
extern struct llog_operations llog_lvfs_ops;
-/* MDS stored handles in OSC */
-#define LLOG_OBD_DEL_LOG_HANDLE 0
-
-/* OBDFILTER stored handles in OBDFILTER */
-#define LLOG_OBD_SZ_LOG_HANDLE 0
-#define LLOG_OBD_RD1_LOG_HANDLE 1
-struct llog_obd_ctxt {
+struct llog_ctxt {
int loc_idx; /* my index the obd array of ctxt's */
+ struct llog_ctxt_gen loc_gen;
struct obd_device *loc_obd; /* points back to the containing obd*/
struct obd_export *loc_exp;
struct obd_import *loc_imp; /* to use in RPC's: can be backward
pointing import */
struct llog_operations *loc_logops;
struct llog_handle *loc_handle;
- struct llog_commit_data *loc_llcd;
+ struct llog_canceld_ctxt *loc_llcd;
struct semaphore loc_sem; /* protects loc_llcd */
+ void *llog_proc_cb;
};
-#if 0
-int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle,
- void *buf, int count, struct llog_cookie *cookies,
- int flags);
+static inline void log_gen_init(struct llog_ctxt *ctxt)
+{
+ struct obd_device *obd = ctxt->loc_exp->exp_obd;
+ if (!strcmp(obd->obd_type->typ_name, "mds"))
+ ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count;
+ else if (!strstr(obd->obd_type->typ_name, "filter")) {
+ ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count;
+ }
+ else
+ ctxt->loc_gen.mnt_cnt = 0;
+}
+
+static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b)
+{
+ if (a.mnt_cnt < b.mnt_cnt)
+ return 1;
+ if (a.mnt_cnt > b.mnt_cnt)
+ return 0;
+ return(a.conn_cnt < b.conn_cnt ? 1 : 0);
+}
-int llog_originator_setup(struct obd_device *, int);
-int llog_originator_cleanup(struct obd_device *);
-int llog_originator_open(struct obd_device *originator,
- struct obd_device *disk_obd,
- int index, int named, int flags,
- struct obd_uuid *log_uuid);
-#endif
-static inline int llog_obd2ops(struct llog_obd_ctxt *ctxt,
+static inline int llog_obd2ops(struct llog_ctxt *ctxt,
struct llog_operations **lop)
{
if (ctxt == NULL)
remains : (((len + mask) & (~mask)) + remains);
}
+static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
+ int index)
+{
+ if (index < 0 || index >= LLOG_MAX_CTXTS)
+ return NULL;
+ else
+ return obd->obd_llog_ctxt[index];
+}
+
static inline int llog_write_rec(struct llog_handle *handle,
struct llog_rec_hdr *rec,
struct llog_cookie *logcookies,
RETURN(rc);
}
-static inline int llog_create(struct llog_obd_ctxt *ctxt,
+static inline int llog_create(struct llog_ctxt *ctxt,
struct llog_handle **res,
struct llog_logid *logid, char *name)
{
rc = lop->lop_create(ctxt, res, logid, name);
RETURN(rc);
}
+
+static inline int llog_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid,
+ struct llog_ctxt_gen *gen)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_obd2ops(ctxt, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_connect == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_connect(ctxt, count, logid, gen);
+ RETURN(rc);
+}
+
+static inline int cathandle_print_cb(struct llog_handle *llh,
+ struct llog_rec_hdr *rec, void *data)
+{
+ struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+
+ if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ CERROR("invalid record in catalog\n");
+ RETURN(-EINVAL);
+ }
+
+ CDEBUG(D_HA, "seeing record at index %d in log "LPX64"\n",
+ le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid);
+ RETURN(0);
+}
#endif
char * name = "mdc_dev";
class_uuid_t uuid;
struct obd_uuid mdc_uuid;
- struct llog_obd_ctxt *ctxt;
+ struct llog_ctxt *ctxt;
int rc = 0;
int err;
ENTRY;
exp = class_conn2export(&mdc_conn);
- ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
+ ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
rc = class_config_parse_llog(ctxt, profile, cfg);
if (rc) {
CERROR("class_config_parse_llog failed: rc = %d\n", rc);
list_for_each_safe(&import->import_cc_list, tmp, save) {
- struct llog_commit_data *cd;
+ struct llog_canceld_ctxt *cd;
if (atomic_read(import->import_cc_count) <=
lccd->llcconf_lowwater)
break;
- cd = list_entry(tmp, struct llog_commit_data *, llcconf_entry);
+ cd = list_entry(tmp, struct llog_canceld_ctxt *, llcconf_entry);
atomic_dec(&import->import_cc_count);
commit_confd_add_and_fire(cd);
}
__u32 io_epoch,
struct llog_cookie *logcookie,
struct inode *inode);
-int filter_get_catalog(struct obd_device *);
+//int filter_get_catalog(struct obd_device *);
void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
void *cb_data, int error);
-
+int filter_recov_log_unlink_cb(struct llog_handle *llh,
+ struct llog_rec_hdr *rec, void *data);
/* filter_san.c */
int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
void *cb_data, int error)
{
struct llog_cookie *cookie = cb_data;
- llog_obd_repl_cancel(obd->obd_llog_ctxt[LLOG_UNLINK_REPL_CTXT],
- NULL, 1, cookie, OBD_LLOG_FL_SENDNOW);
+ llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1),
+ NULL, 1, cookie, 0);
+ //NULL, 1, cookie, OBD_LLOG_FL_SENDNOW);
OBD_FREE(cb_data, sizeof(struct llog_cookie));
}
+
+/* Callback for processing the unlink log record received from MDS by
+ * llog_client_api.
+ */
+int filter_recov_log_unlink_cb(struct llog_handle *llh,
+ struct llog_rec_hdr *rec, void *data)
+{
+ struct llog_ctxt *ctxt = llh->lgh_ctxt;
+ struct obd_device *obd = ctxt->loc_obd;
+ struct obd_export *exp = obd->obd_self_export;
+ struct llog_cookie cookie;
+ struct llog_unlink_rec *lur;
+ struct obdo *oa;
+ struct obd_trans_info oti = { 0 };
+ obd_id oid;
+ int rc = 0;
+ ENTRY;
+
+ if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+ CERROR("log is not plain\n");
+ RETURN(-EINVAL);
+ }
+ if (rec->lrh_type != MDS_UNLINK_REC) {
+ CERROR("log record type error\n");
+ RETURN(-EINVAL);
+ }
+
+ cookie.lgc_lgl = llh->lgh_id;
+ cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
+ cookie.lgc_index = le32_to_cpu(rec->lrh_index);
+
+ lur = (struct llog_unlink_rec *)rec;
+
+ oa = obdo_alloc();
+ if (oa == NULL)
+ RETURN(-ENOMEM);
+ oa->o_valid |= OBD_MD_FLCOOKIE;
+ oa->o_id = lur->lur_oid;
+ oa->o_gr = lur->lur_ogen;
+ memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie));
+ oid = oa->o_id;
+
+ rc = obd_destroy(exp, oa, NULL, &oti);
+ obdo_free(oa);
+ if (rc == -ENOENT) {
+ CERROR("object already removed: send cookie\n");
+ llog_cancel(ctxt, NULL, 1, &cookie, 0);
+ RETURN(0);
+ }
+
+ if (rc == 0)
+ CERROR("object: "LPU64" in record destroyed successful\n", oid);
+
+ RETURN(rc);
+}
/* Allocate new commit structs in case we do not have enough */
static int llcd_alloc(void)
{
- struct llog_commit_data *llcd;
+ struct llog_canceld_ctxt *llcd;
+ int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
- OBD_ALLOC(llcd, PAGE_SIZE);
+ OBD_ALLOC(llcd, PAGE_SIZE + offset);
if (llcd == NULL)
return -ENOMEM;
}
/* Get a free cookie struct from the list */
-struct llog_commit_data *llcd_grab(void)
+struct llog_canceld_ctxt *llcd_grab(void)
{
- struct llog_commit_data *llcd;
+ struct llog_canceld_ctxt *llcd;
spin_lock(&lcm->lcm_llcd_lock);
if (list_empty(&lcm->lcm_llcd_free)) {
}
EXPORT_SYMBOL(llcd_grab);
-static void llcd_put(struct llog_commit_data *llcd)
+static void llcd_put(struct llog_canceld_ctxt *llcd)
{
+ int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
+
if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
- OBD_FREE(llcd, PAGE_SIZE);
+ OBD_FREE(llcd, PAGE_SIZE + offset);
} else {
spin_lock(&lcm->lcm_llcd_lock);
list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
}
/* Send some cookies to the appropriate target */
-void llcd_send(struct llog_commit_data *llcd)
+void llcd_send(struct llog_canceld_ctxt *llcd)
{
spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
* log record for the deletion. The commit callback calls this
* function
*/
-int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags)
{
- struct llog_commit_data *llcd;
+ struct llog_canceld_ctxt *llcd;
int rc = 0;
ENTRY;
GOTO(out, rc = -ENOMEM);
}
llcd->llcd_import = ctxt->loc_imp;
+ llcd->llcd_gen = ctxt->loc_gen;
ctxt->loc_llcd = llcd;
}
- memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
+ memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
sizeof(*cookies));
llcd->llcd_cookiebytes += sizeof(*cookies);
- GOTO(send_now, rc);
send_now:
if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
flags & OBD_LLOG_FL_SENDNOW)) {
+ CDEBUG(D_HA, "send llcd: %p\n", llcd);
ctxt->loc_llcd = NULL;
llcd_send(llcd);
}
{
struct llog_commit_master *lcm = arg;
struct llog_commit_daemon *lcd;
- struct llog_commit_data *llcd, *n;
+ struct llog_canceld_ctxt *llcd, *n;
unsigned long flags;
ENTRY;
/* We are the only one manipulating our local list - no lock */
list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
char *bufs[1] = {(char *)llcd->llcd_cookies};
+ struct obd_device *obd = import->imp_obd;
+ struct llog_ctxt *ctxt;
+
list_del(&llcd->llcd_list);
+ if (llcd->llcd_cookiebytes == 0) {
+ CDEBUG(D_HA, "just put empty llcd %p\n", llcd);
+ llcd_put(llcd);
+ continue;
+ }
+ /* check whether the cookies are new. if new then send, otherwise
+ * just put llcd */
+ ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1);
+ LASSERT(ctxt != NULL);
+ down(&ctxt->loc_sem);
+ if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) {
+ up(&ctxt->loc_sem);
+ CDEBUG(D_HA, "just put stale llcd %p\n", llcd);
+ llcd_put(llcd);
+ continue;
+ }
+ up(&ctxt->loc_sem);
request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
&llcd->llcd_cookiebytes,
continue;
}
+#if 0 /* FIXME just put llcd, not send it again */
spin_lock(&lcm->lcm_llcd_lock);
list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
if (++llcd->llcd_tries < 5) {
spin_unlock(&lcm->lcm_llcd_lock);
} else {
spin_unlock(&lcm->lcm_llcd_lock);
+#endif
CERROR("commit %p dropped %d cookies: rc %d\n",
llcd, (int)(llcd->llcd_cookiebytes /
sizeof(*llcd->llcd_cookies)),
rc);
llcd_put(llcd);
- }
+// }
break;
}
}
EXPORT_SYMBOL(llog_start_commit_thread);
+static struct llog_process_args {
+ struct semaphore llpa_sem;
+ struct llog_ctxt *llpa_ctxt;
+ void *llpa_cb;
+ void *llpa_arg;
+} llpa;
int llog_init_commit_master(void)
{
INIT_LIST_HEAD(&lcm->lcm_thread_busy);
atomic_set(&lcm->lcm_llcd_numfree, 0);
lcm->lcm_llcd_minfree = 0;
lcm->lcm_thread_max = 5;
+ /* FIXME initialize semaphore for llog_process_args */
+ sema_init(&llpa.llpa_sem, 1);
return 0;
}
return 0;
}
+
+static int log_process_thread(void *args)
+{
+ struct llog_process_args *data = args;
+ struct llog_ctxt *ctxt = data->llpa_ctxt;
+ void *cb = data->llpa_cb;
+ struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
+ struct llog_handle *llh = NULL;
+ unsigned long flags;
+ int rc;
+ ENTRY;
+
+ up(&data->llpa_sem);
+ lock_kernel();
+ ptlrpc_daemonize(); /* thread never needs to do IO */
+
+ SIGNAL_MASK_LOCK(current, flags);
+ sigfillset(¤t->blocked);
+ RECALC_SIGPENDING;
+ SIGNAL_MASK_UNLOCK(current, flags);
+ unlock_kernel();
+
+ rc = llog_create(ctxt, &llh, &logid, NULL);
+ if (rc) {
+ CERROR("llog_create failed %d\n", rc);
+ RETURN(rc);
+ }
+ rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
+ if (rc) {
+ CERROR("llog_init_handle failed %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ rc = llog_process(llh, cathandle_print_cb, NULL);
+ if (rc) {
+ CERROR("llog_process with cathandle_print_cb failed %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ if (cb) {
+ rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
+ if (rc)
+ CERROR("llog_cat_process failed %d\n", rc);
+ } else
+ CERROR("no cb func for recovery\n");
+
+ CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd);
+ llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
+out:
+ rc = llog_cat_put(llh);
+ if (rc)
+ CERROR("llog_cat_put failed %d\n", rc);
+
+ RETURN(rc);
+}
+static int llog_recovery_generic(struct llog_ctxt *ctxt,
+ void *handle,
+ void *arg)
+{
+ int rc;
+ ENTRY;
+
+ down(&llpa.llpa_sem);
+ llpa.llpa_ctxt = ctxt;
+ llpa.llpa_cb = handle;
+ llpa.llpa_arg = arg;
+
+ rc = kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
+ if (rc < 0)
+ CERROR("error starting log_process_thread: %d\n", rc);
+ else {
+ CDEBUG(D_HA, "log_process_thread: %d\n", rc);
+ rc = 0;
+ }
+
+ RETURN(rc);
+}
+int llog_repl_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid, struct llog_ctxt_gen *gen)
+{
+ struct llog_canceld_ctxt *llcd;
+ int rc;
+ ENTRY;
+
+ down(&ctxt->loc_sem);
+ ctxt->loc_gen = *gen;
+ llcd = ctxt->loc_llcd;
+ if (llcd) {
+ CDEBUG(D_HA, "put current llcd when new connection arrives\n");
+ llcd_put(llcd);
+ }
+ llcd = llcd_grab();
+ if (llcd == NULL) {
+ CERROR("couldn't get an llcd\n");
+ RETURN(-ENOMEM);
+ }
+ llcd->llcd_import = ctxt->loc_imp;
+ llcd->llcd_gen = ctxt->loc_gen;
+ ctxt->loc_llcd = llcd;
+ up(&ctxt->loc_sem);
+
+ rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
+ if (rc != 0)
+ CERROR("error recovery process: %d\n", rc);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_repl_connect);
+
#else /* !__KERNEL__ */
-int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags)
{