Most of the changes are from the landing of b_orphan onto HEAD.
struct llog_canceld_ctxt {
struct list_head llcd_list; /* free or pending struct list */
- struct obd_import *llcd_import;
+ struct llog_ctxt *llcd_ctxt;
struct llog_commit_master *llcd_lcm;
int llcd_tries; /* number of tries to send */
- struct llog_ctxt_gen llcd_gen;
int llcd_cookiebytes;
struct llog_cookie llcd_cookies[0];
};
snprintf(logname, sizeof(logname), "LOGS/%s", name)
struct plain_handle_data {
- struct list_head phd_entry;
- struct llog_handle *phd_cat_handle;
- struct llog_cookie phd_cookie; /* cookie of this log in its cat */
- int phd_last_idx;
+ struct list_head phd_entry;
+ struct llog_handle *phd_cat_handle;
+ struct llog_cookie phd_cookie; /* cookie of this log in its cat */
+ int phd_last_idx;
};
struct cat_handle_data {
/* llog.c - general API */
typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
-int llog_init_handle(struct llog_handle *handle, int flags,
+int llog_init_handle(struct llog_handle *handle, int flags,
struct obd_uuid *uuid);
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data);
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata);
extern struct llog_handle *llog_alloc_handle(void);
extern void llog_free_handle(struct llog_handle *handle);
extern int llog_close(struct llog_handle *cathandle);
void *lpd_data;
llog_cb_t lpd_cb;
};
+
+struct llog_process_cat_data {
+ int first_idx;
+ int last_idx;
+ /* to process catlog across zero record */
+};
+
int llog_cat_put(struct llog_handle *cathandle);
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
struct llog_cookie *reccookie, void *buf);
int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
struct llog_cookie *cookies);
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
/* llog_obd.c */
int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
- int count, struct llog_logid *logid, struct llog_operations *op);
+ int count, struct llog_logid *logid,struct llog_operations *op);
int llog_cleanup(struct llog_ctxt *);
int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_add(struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies);
+int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+ struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
+ int numcookies);
int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
- int count, struct llog_cookie *cookies, int flags);
+ int count, struct llog_cookie *cookies, int flags);
-int llog_obd_origin_setup(struct obd_device *obd, int index,
- struct obd_device *disk_obd, int count,
+int llog_obd_origin_setup(struct obd_device *obd, int index,
+ struct obd_device *disk_obd, int count,
struct llog_logid *logid);
int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
int llog_obd_origin_add(struct llog_ctxt *ctxt,
/* llog_ioctl.c */
int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data);
-int llog_catlog_list(struct obd_device *obd, int count,
+int llog_catlog_list(struct obd_device *obd, int count,
struct obd_ioctl_data *data);
/* llog_net.c */
int llog_initiator_connect(struct llog_ctxt *ctxt);
int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen);
int llog_handle_connect(struct ptlrpc_request *req);
/* recov_thread.c */
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags);
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen);
+int llog_repl_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid, struct llog_gen *gen);
struct llog_operations {
int (*lop_write_rec)(struct llog_handle *loghandle,
- struct llog_rec_hdr *rec,
- struct llog_cookie *logcookies,
- int numcookies,
- void *,
- int idx);
+ struct llog_rec_hdr *rec,
+ struct llog_cookie *logcookies, int numcookies,
+ void *, int idx);
int (*lop_destroy)(struct llog_handle *handle);
- int (*lop_next_block)(struct llog_handle *h,
- int *curr_idx,
- int next_idx,
- __u64 *offset,
- void *buf,
- int len);
+ int (*lop_next_block)(struct llog_handle *h, int *curr_idx,
+ int next_idx, __u64 *offset, void *buf, int len);
int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
struct llog_logid *logid, char *name);
int (*lop_close)(struct llog_handle *handle);
int (*lop_read_header)(struct llog_handle *handle);
- int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
- struct obd_device *disk_obd, int count,
+ int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
+ struct obd_device *disk_obd, int count,
struct llog_logid *logid);
int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp);
int (*lop_cleanup)(struct llog_ctxt *ctxt);
- int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
- struct lov_stripe_md *lsm,
+ int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+ struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
int (*lop_connect)(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen);
+ struct llog_logid *logid, struct llog_gen *gen);
/* XXX add 2 more: commit callbacks and llog recovery functions */
};
struct llog_ctxt {
int loc_idx; /* my index the obd array of ctxt's */
- struct llog_ctxt_gen loc_gen;
+ struct llog_gen loc_gen;
struct obd_device *loc_obd; /* points back to the containing obd*/
struct obd_export *loc_exp;
- struct obd_import *loc_imp; /* to use in RPC's: can be backward
+ struct obd_import *loc_imp; /* to use in RPC's: can be backward
pointing import */
struct llog_operations *loc_logops;
struct llog_handle *loc_handle;
void *llog_proc_cb;
};
-static inline void log_gen_init(struct llog_ctxt *ctxt)
+static inline void llog_gen_init(struct llog_ctxt *ctxt)
{
struct obd_device *obd = ctxt->loc_exp->exp_obd;
if (!strcmp(obd->obd_type->typ_name, "mds"))
ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count;
- else if (!strstr(obd->obd_type->typ_name, "filter")) {
- ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count;
- }
+ else if (!strstr(obd->obd_type->typ_name, "filter"))
+ ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count;
else
- ctxt->loc_gen.mnt_cnt = 0;
+ ctxt->loc_gen.mnt_cnt = 0;
}
-static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b)
+static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b)
{
if (a.mnt_cnt < b.mnt_cnt)
return 1;
return(a.conn_cnt < b.conn_cnt ? 1 : 0);
}
+#define LLOG_GEN_INC(gen) ((gen).conn_cnt) ++
+#define LLOG_PROC_BREAK 0x0001
static inline int llog_obd2ops(struct llog_ctxt *ctxt,
struct llog_operations **lop)
{
if (ctxt == NULL)
return -ENOTCONN;
+
*lop = ctxt->loc_logops;
if (*lop == NULL)
return -EOPNOTSUPP;
+
return 0;
}
{
if (loghandle == NULL)
return -EINVAL;
+
return llog_obd2ops(loghandle->lgh_ctxt, lop);
}
{
if (index < 0 || index >= LLOG_MAX_CTXTS)
return NULL;
- else
- return obd->obd_llog_ctxt[index];
+
+ return obd->obd_llog_ctxt[index];
}
static inline int llog_write_rec(struct llog_handle *handle,
struct llog_operations *lop;
int rc, buflen;
ENTRY;
-
+
rc = llog_handle2ops(handle, &lop);
if (rc)
RETURN(rc);
struct llog_operations *lop;
int rc;
ENTRY;
-
+
rc = llog_handle2ops(handle, &lop);
if (rc)
RETURN(rc);
struct llog_operations *lop;
int rc;
ENTRY;
-
+
rc = llog_handle2ops(handle, &lop);
if (rc)
RETURN(rc);
rc = lop->lop_cancel(exp, lsm, count, cookies, flags);
RETURN(rc);
}
-#endif
+#endif
static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx,
int next_idx, __u64 *cur_offset, void *buf,
RETURN(rc);
}
-static inline int llog_create(struct llog_ctxt *ctxt,
- struct llog_handle **res,
+static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
struct llog_logid *logid, char *name)
{
struct llog_operations *lop;
}
static inline int llog_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid,
- struct llog_ctxt_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen)
{
struct llog_operations *lop;
int rc;
pcfg.pcfg_nid = lmd->lmd_server_nid;
pcfg.pcfg_id = lmd->lmd_server_ipaddr;
pcfg.pcfg_misc = lmd->lmd_port;
- pcfg.pcfg_size = 0;
+ pcfg.pcfg_size = 8388608;
pcfg.pcfg_flags = 0x4; /*share*/
err = kportal_nal_cmd(&pcfg);
if (err <0)
LPROC_FILTER_LAST,
};
+#define FILTER_MAX_CACHE_SIZE OBD_OBJECT_EOF
+
/* filter.c */
struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid);
struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id,
{
struct obd_ioobj *o;
struct niobuf_local *lnb;
- int i, j;
+ int i, j, drop = 0;
ENTRY;
+ if (res->dentry != NULL)
+ drop = (res->dentry->d_inode->i_size >
+ exp->exp_obd->u.filter.fo_readcache_max_filesize);
+
for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
- if (lnb->page != NULL)
- page_cache_release(lnb->page);
+ if (lnb->page == NULL)
+ continue;
+ /* drop from cache like truncate_list_pages() */
+ if (drop && !TryLockPage(lnb->page)) {
+ if (lnb->page->mapping)
+ truncate_complete_page(lnb->page);
+ unlock_page(lnb->page);
+ }
+ page_cache_release(lnb->page);
}
}
if (res->dentry != NULL)
struct obd_device *obd = ctxt->loc_obd;
struct obd_export *exp = obd->obd_self_export;
struct llog_cookie cookie;
+ struct llog_gen_rec *lgr;
struct llog_unlink_rec *lur;
struct obdo *oa;
- struct obd_trans_info oti = { 0 };
obd_id oid;
int rc = 0;
ENTRY;
- if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+ if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
- if (rec->lrh_type != MDS_UNLINK_REC) {
+ if (rec->lrh_type != MDS_UNLINK_REC &&
+ rec->lrh_type != LLOG_GEN_REC) {
CERROR("log record type error\n");
RETURN(-EINVAL);
}
cookie.lgc_lgl = llh->lgh_id;
cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
cookie.lgc_index = le32_to_cpu(rec->lrh_index);
-
+
+ if (rec->lrh_type == LLOG_GEN_REC) {
+ lgr = (struct llog_gen_rec *)rec;
+ if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen))
+ rc = 0;
+ else
+ rc = LLOG_PROC_BREAK;
+ CWARN("fetch generation log, send cookie\n");
+ llog_cancel(ctxt, NULL, 1, &cookie, 0);
+ RETURN(rc);
+ }
+
lur = (struct llog_unlink_rec *)rec;
-
oa = obdo_alloc();
if (oa == NULL)
RETURN(-ENOMEM);
memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie));
oid = oa->o_id;
- rc = obd_destroy(exp, oa, NULL, &oti);
+ rc = obd_destroy(exp, oa, NULL, NULL);
obdo_free(oa);
if (rc == -ENOENT) {
- CWARN("object already removed: send cookie\n");
+ CWARN("object already removed, send cookie\n");
llog_cancel(ctxt, NULL, 1, &cookie, 0);
RETURN(0);
}
include $(src)/../portals/Kernelenv
obj-y += ptlrpc.o
+
ptlrpc-objs := recover.o connection.o ptlrpc_module.o events.o service.o \
client.o niobuf.o pack_generic.o lproc_ptlrpc.o pinger.o \
recov_thread.o import.o llog_net.o llog_client.o \
- llog_server.o ptlrpcd.o
+ llog_server.o ptlrpcd.o ../ldlm/l_lock.o ../ldlm/ldlm_lock.o \
+ ../ldlm/ldlm_resource.o ../ldlm/ldlm_extent.o \
+ ../ldlm/ldlm_request.o ../ldlm/ldlm_lockd.o \
+ ../ldlm/ldlm_lib.o ../ldlm/ldlm_flock.o ../ldlm/ldlm_plain.o
+
#include <linux/lustre_log.h>
#include "ptlrpc_internal.h"
+#define LLCD_SIZE 4096
+
#ifdef __KERNEL__
static struct llog_commit_master lustre_lcm;
struct llog_canceld_ctxt *llcd;
int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
- OBD_ALLOC(llcd, PAGE_SIZE + offset);
+ OBD_ALLOC(llcd, LLCD_SIZE + offset);
if (llcd == NULL)
return -ENOMEM;
int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
- OBD_FREE(llcd, PAGE_SIZE + offset);
+ OBD_FREE(llcd, LLCD_SIZE + offset);
} else {
spin_lock(&lcm->lcm_llcd_lock);
list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
EXPORT_SYMBOL(llcd_send);
/* deleted objects have a commit callback that cancels the MDS
- * log record for the deletion. The commit callback calls this
- * function
+ * log record for the deletion. The commit callback calls this
+ * function
*/
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
GOTO(out, rc = -ENOMEM);
}
- llcd->llcd_import = ctxt->loc_imp;
- llcd->llcd_gen = ctxt->loc_gen;
+ llcd->llcd_ctxt = ctxt;
ctxt->loc_llcd = llcd;
}
llcd->llcd_cookiebytes += sizeof(*cookies);
send_now:
- if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
+ if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
flags & OBD_LLOG_FL_SENDNOW)) {
- CDEBUG(D_HA, "send llcd: %p\n", llcd);
+ CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
ctxt->loc_llcd = NULL;
llcd_send(llcd);
}
int rc = 0;
ENTRY;
- LASSERT(ctxt->loc_llcd);
-
if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
- CWARN("import will be destroyed, put llcd %p\n",
- ctxt->loc_llcd);
- llcd_put(ctxt->loc_llcd);
- ctxt->loc_llcd = NULL;
+ down(&ctxt->loc_sem);
+ if (ctxt->loc_llcd != NULL) {
+ CWARN("import will be destroyed, put "
+ "llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+ llcd_put(ctxt->loc_llcd);
+ ctxt->loc_llcd = NULL;
+ ctxt->loc_imp = NULL;
+ }
up(&ctxt->loc_sem);
} else {
- up(&ctxt->loc_sem);
rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
}
llcd = list_entry(lcd->lcd_llcd_list.next,
typeof(*llcd), llcd_list);
LASSERT(llcd->llcd_lcm == lcm);
- import = llcd->llcd_import;
+ import = llcd->llcd_ctxt->loc_imp;
}
list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
LASSERT(llcd->llcd_lcm == lcm);
- if (import == llcd->llcd_import)
+ if (import == llcd->llcd_ctxt->loc_imp)
list_move_tail(&llcd->llcd_list,
&lcd->lcd_llcd_list);
}
list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
llcd_list) {
LASSERT(llcd->llcd_lcm == lcm);
- if (import == llcd->llcd_import)
+ if (import == llcd->llcd_ctxt->loc_imp)
list_move_tail(&llcd->llcd_list,
&lcd->lcd_llcd_list);
}
/* We are the only one manipulating our local list - no lock */
list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
char *bufs[1] = {(char *)llcd->llcd_cookies};
- struct obd_device *obd = import->imp_obd;
- struct llog_ctxt *ctxt;
list_del(&llcd->llcd_list);
if (llcd->llcd_cookiebytes == 0) {
- CDEBUG(D_HA, "just put empty llcd %p\n", llcd);
+ CDEBUG(D_HA, "put empty llcd %p:%p\n",
+ llcd, llcd->llcd_ctxt);
llcd_put(llcd);
continue;
}
- /* check whether the cookies are new. if new then send, otherwise
- * just put llcd */
- ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1);
- LASSERT(ctxt != NULL);
- down(&ctxt->loc_sem);
- if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) {
- up(&ctxt->loc_sem);
- CDEBUG(D_HA, "just put stale llcd %p\n", llcd);
+
+ down(&llcd->llcd_ctxt->loc_sem);
+ if (llcd->llcd_ctxt->loc_imp == NULL) {
+ up(&llcd->llcd_ctxt->loc_sem);
+ CWARN("import will be destroyed, put "
+ "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
llcd_put(llcd);
continue;
}
- up(&ctxt->loc_sem);
request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
&llcd->llcd_cookiebytes,
bufs);
+ up(&llcd->llcd_ctxt->loc_sem);
+
if (request == NULL) {
rc = -ENOMEM;
CERROR("error preparing commit: rc %d\n", rc);
}
request->rq_replen = lustre_msg_size(0, NULL);
+ down(&llcd->llcd_ctxt->loc_sem);
+ if (llcd->llcd_ctxt->loc_imp == NULL) {
+ up(&llcd->llcd_ctxt->loc_sem);
+ CWARN("import will be destroyed, put "
+ "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
+ llcd_put(llcd);
+ ptlrpc_req_finished(request);
+ continue;
+ }
rc = ptlrpc_queue_wait(request);
ptlrpc_req_finished(request);
+ up(&llcd->llcd_ctxt->loc_sem);
/* If the RPC failed, we put this and the remaining
* messages onto the resend list for another time. */
continue;
}
-#if 0 /* FIXME just put llcd, not send it again */
+#if 0 /* FIXME just put llcd, not put it on resend list */
spin_lock(&lcm->lcm_llcd_lock);
list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
if (++llcd->llcd_tries < 5) {
} else {
spin_unlock(&lcm->lcm_llcd_lock);
#endif
- CERROR("commit %p dropped %d cookies: rc %d\n",
- llcd, (int)(llcd->llcd_cookiebytes /
- sizeof(*llcd->llcd_cookies)),
- rc);
+ CERROR("commit %p:%p drop %d cookies: rc %d\n",
+ llcd, llcd->llcd_ctxt,
+ (int)(llcd->llcd_cookiebytes /
+ sizeof(*llcd->llcd_cookies)), rc);
llcd_put(llcd);
-// }
+#if 0
+ }
break;
+#endif
}
if (rc == 0) {
EXPORT_SYMBOL(llog_start_commit_thread);
static struct llog_process_args {
- struct semaphore llpa_sem;
+ struct semaphore llpa_sem;
struct llog_ctxt *llpa_ctxt;
void *llpa_cb;
void *llpa_arg;
} llpa;
+
int llog_init_commit_master(void)
{
INIT_LIST_HEAD(&lcm->lcm_thread_busy);
return 0;
}
-
static int log_process_thread(void *args)
{
struct llog_process_args *data = args;
unsigned long flags;
int rc;
ENTRY;
-
+
up(&data->llpa_sem);
lock_kernel();
ptlrpc_daemonize(); /* thread never needs to do IO */
-
+
SIGNAL_MASK_LOCK(current, flags);
sigfillset(¤t->blocked);
RECALC_SIGPENDING;
SIGNAL_MASK_UNLOCK(current, flags);
unlock_kernel();
-
+
rc = llog_create(ctxt, &llh, &logid, NULL);
if (rc) {
CERROR("llog_create failed %d\n", rc);
CERROR("llog_init_handle failed %d\n", rc);
GOTO(out, rc);
}
-
+
if (cb) {
rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
- if (rc)
+ if (rc != LLOG_PROC_BREAK)
CERROR("llog_cat_process failed %d\n", rc);
- } else
+ } else {
CWARN("no callback function for recovery\n");
+ }
- CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd);
+ CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
+ ctxt->loc_llcd, ctxt);
llog_sync(ctxt, NULL);
out:
rc = llog_cat_put(llh);
if (rc)
CERROR("llog_cat_put failed %d\n", rc);
-
+
RETURN(rc);
}
-static int llog_recovery_generic(struct llog_ctxt *ctxt,
- void *handle,
- void *arg)
+
+static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
{
int rc;
ENTRY;
RETURN(rc);
}
+
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
- struct llog_logid *logid, struct llog_ctxt_gen *gen)
+ struct llog_logid *logid, struct llog_gen *gen)
{
struct llog_canceld_ctxt *llcd;
int rc;
ENTRY;
-
+
+ /* send back llcd before recovery from llog */
+ if (ctxt->loc_llcd != NULL) {
+ CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
+ llog_sync(ctxt, NULL);
+ }
+
down(&ctxt->loc_sem);
ctxt->loc_gen = *gen;
- llcd = ctxt->loc_llcd;
- if (llcd) {
- CDEBUG(D_HA, "put current llcd when new connection arrives\n");
- llcd_put(llcd);
- }
llcd = llcd_grab();
if (llcd == NULL) {
CERROR("couldn't get an llcd\n");
+ up(&ctxt->loc_sem);
RETURN(-ENOMEM);
}
- llcd->llcd_import = ctxt->loc_imp;
- llcd->llcd_gen = ctxt->loc_gen;
+ llcd->llcd_ctxt = ctxt;
ctxt->loc_llcd = llcd;
up(&ctxt->loc_sem);
- rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
+ rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
if (rc != 0)
CERROR("error recovery process: %d\n", rc);