int lpcd_last_idx;
};
-struct llog_process_cat_args {
- /**
- * Llog context used in recovery thread on OST (recov_thread.c)
- */
- struct llog_ctxt *lpca_ctxt;
- /**
- * Llog callback used in recovery thread on OST (recov_thread.c)
- */
- void *lpca_cb;
- /**
- * Data pointer for llog callback.
- */
- void *lpca_arg;
-};
-
int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle);
int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
void *data, int startcat, int startidx, bool fork);
int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
llog_cb_t cb, void *data, int startcat, int startidx);
-int llog_cat_process_thread(void *data);
int llog_cat_reverse_process(const struct lu_env *env,
struct llog_handle *cat_llh, llog_cb_t cb,
void *data);
int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags);
-int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies);
int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
struct obd_device *disk_obd, int *idx);
/* llog_ioctl.c */
int llog_ioctl(const struct lu_env *env, struct llog_ctxt *ctxt, int cmd,
struct obd_ioctl_data *data);
-int llog_catalog_list(struct obd_device *obd, int count,
- struct obd_ioctl_data *data);
/* llog_net.c */
int llog_initiator_connect(struct llog_ctxt *ctxt);
-int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
-int llog_origin_connect(struct llog_ctxt *ctxt,
- struct llog_logid *logid, struct llog_gen *gen,
- struct obd_uuid *uuid);
-int llog_handle_connect(struct ptlrpc_request *req);
-
-/* recov_thread.c */
-int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
- struct lov_stripe_md *lsm, int count,
- struct llog_cookie *cookies, int flags);
-int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp,
- int flags);
-int llog_obd_repl_connect(struct llog_ctxt *ctxt,
- struct llog_logid *logid, struct llog_gen *gen,
- struct obd_uuid *uuid);
struct llog_operations {
int (*lop_destroy)(const struct lu_env *env,
/* llog_lvfs.c */
extern struct llog_operations llog_lvfs_ops;
-int llog_get_cat_list(struct obd_device *disk_obd,
- char *name, int idx, int count,
- struct llog_catid *idarray);
-
-int llog_put_cat_list(struct obd_device *disk_obd,
- char *name, int idx, int count, struct llog_catid *idarray);
/* llog_osd.c */
extern struct llog_operations llog_osd_ops;
struct llog_ctxt {
int loc_idx; /* my index the obd array of ctxt's */
- struct llog_gen loc_gen;
struct obd_device *loc_obd; /* points back to the containing obd*/
struct obd_llog_group *loc_olg; /* group containing that ctxt */
struct obd_export *loc_exp; /* parent "disk" export (e.g. MDS) */
pointing import */
struct llog_operations *loc_logops;
struct llog_handle *loc_handle;
- struct llog_commit_master *loc_lcm;
- struct llog_canceld_ctxt *loc_llcd;
- struct mutex loc_mutex; /* protect loc_llcd and loc_imp */
+ struct mutex loc_mutex; /* protect loc_imp */
cfs_atomic_t loc_refcount;
- void *llog_proc_cb;
long loc_flags; /* flags, see above defines */
struct dt_object *loc_dir;
};
-#define LCM_NAME_SIZE 64
-
-struct llog_commit_master {
- /**
- * Thread control flags (start, stop, etc.)
- */
- long lcm_flags;
- /**
- * Number of llcds onthis lcm.
- */
- cfs_atomic_t lcm_count;
- /**
- * The refcount for lcm
- */
- cfs_atomic_t lcm_refcount;
- /**
- * Thread control structure. Used for control commit thread.
- */
- struct ptlrpcd_ctl lcm_pc;
- /**
- * Lock protecting list of llcds.
- */
- spinlock_t lcm_lock;
- /**
- * Llcds in flight for debugging purposes.
- */
- cfs_list_t lcm_llcds;
- /**
- * Commit thread name buffer. Only used for thread start.
- */
- char lcm_name[LCM_NAME_SIZE];
-};
-
-static inline struct llog_commit_master
-*lcm_get(struct llog_commit_master *lcm)
-{
- cfs_atomic_inc(&lcm->lcm_refcount);
- return lcm;
-}
-
-static inline void
-lcm_put(struct llog_commit_master *lcm)
-{
- LASSERT_ATOMIC_POS(&lcm->lcm_refcount);
- if (cfs_atomic_dec_and_test(&lcm->lcm_refcount))
- OBD_FREE_PTR(lcm);
-}
-
-struct llog_canceld_ctxt {
- /**
- * Llog context this llcd is attached to. Used for accessing
- * ->loc_import and others in process of canceling cookies
- * gathered in this llcd.
- */
- struct llog_ctxt *llcd_ctxt;
- /**
- * Cancel thread control stucture pointer. Used for accessing
- * it to see if should stop processing and other needs.
- */
- struct llog_commit_master *llcd_lcm;
- /**
- * Maximal llcd size. Used in calculations on how much of room
- * left in llcd to cookie comming cookies.
- */
- int llcd_size;
- /**
- * Link to lcm llcds list.
- */
- cfs_list_t llcd_list;
- /**
- * Current llcd size while gathering cookies. This should not be
- * more than ->llcd_size. Used for determining if we need to
- * send this llcd (if full) and allocate new one. This is also
- * used for copying new cookie at the end of buffer.
- */
- int llcd_cookiebytes;
- /**
- * Pointer to the start of cookies buffer.
- */
- struct llog_cookie llcd_cookies[0];
-};
-
-/* ptlrpc/recov_thread.c */
-extern struct llog_commit_master *llog_recov_thread_init(char *name);
-extern void llog_recov_thread_fini(struct llog_commit_master *lcm,
- int force);
-extern int llog_recov_thread_start(struct llog_commit_master *lcm);
-extern void llog_recov_thread_stop(struct llog_commit_master *lcm,
- int force);
-
-static inline void llog_gen_init(struct llog_ctxt *ctxt)
-{
- struct obd_device *obd = ctxt->loc_exp->exp_obd;
-
- LASSERTF(obd->u.obt.obt_magic == OBT_MAGIC,
- "%s: wrong obt magic %#x\n",
- obd->obd_name, obd->u.obt.obt_magic);
- ctxt->loc_gen.mnt_cnt = obd->u.obt.obt_mount_count;
- ctxt->loc_gen.conn_cnt++;
-}
-
-static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b)
-{
- if (a.mnt_cnt < b.mnt_cnt)
- return 1;
- if (a.mnt_cnt > b.mnt_cnt)
- return 0;
- return(a.conn_cnt < b.conn_cnt ? 1 : 0);
-}
-
#define LLOG_PROC_BREAK 0x0001
#define LLOG_DEL_RECORD 0x0002
olg->olg_seq = group;
}
-static inline void llog_group_set_export(struct obd_llog_group *olg,
- struct obd_export *exp)
-{
- LASSERT(exp != NULL);
-
- spin_lock(&olg->olg_lock);
- if (olg->olg_exp != NULL && olg->olg_exp != exp)
- CWARN("%s: export for group %d is changed: 0x%p -> 0x%p\n",
- exp->exp_obd->obd_name, olg->olg_seq,
- olg->olg_exp, exp);
- olg->olg_exp = exp;
- spin_unlock(&olg->olg_lock);
-}
-
static inline int llog_group_set_ctxt(struct obd_llog_group *olg,
struct llog_ctxt *ctxt, int index)
{
*/
struct cfs_hash *fo_iobuf_hash;
- cfs_list_t fo_llog_list;
- spinlock_t fo_llog_list_lock;
-
struct brw_stats fo_filter_stats;
int fo_fmd_max_num; /* per exp filter_mod_data */
unsigned int fo_fl_oss_capa;
cfs_list_t fo_capa_keys;
cfs_hlist_head_t *fo_capa_hash;
- struct llog_commit_master *fo_lcm;
int fo_sec_level;
};
enum lustre_sec_part cl_sp_to;
struct sptlrpc_flavor cl_flvr_mgc; /* fixed flavor of mgc->mgs */
- //struct llog_canceld_ctxt *cl_llcd; /* it's included by obd_llog_ctxt */
- void *cl_llcd_offset;
-
/* the grant values are protected by loi_list_lock below */
long cl_dirty; /* all _dirty_ in bytes */
long cl_dirty_max; /* allowed w/o rpc */
};
struct obd_llog_group {
- cfs_list_t olg_list;
int olg_seq;
struct llog_ctxt *olg_ctxts[LLOG_MAX_CTXTS];
cfs_waitq_t olg_waitq;
spinlock_t olg_lock;
- struct obd_export *olg_exp;
- int olg_initializing;
struct mutex olg_cat_processing;
};
return ctxt->cb_ops.l_fid2dentry(id_ino, gen, gr, exp->exp_obd);
}
-static inline int
-obd_lvfs_open_llog(struct obd_export *exp, __u64 id_ino, struct dentry *dentry)
-{
- LASSERT(exp->exp_obd);
- CERROR("FIXME what's the story here? This needs to be an obd fn?\n");
-#if 0
- return lvfs_open_llog(&exp->exp_obd->obd_lvfs_ctxt, id_ino,
- dentry, exp->exp_obd);
-#endif
- return 0;
-}
-
/* @max_age is the oldest time in jiffies that we accept using a cached data.
* If the cache is older than @max_age we will get a new value from the
* target. Use a value of "cfs_time_current() + HZ" to guarantee freshness. */
}
EXPORT_SYMBOL(llog_cat_process);
-#ifdef __KERNEL__
-int llog_cat_process_thread(void *data)
-{
- struct llog_process_cat_args *args = data;
- struct llog_ctxt *ctxt = args->lpca_ctxt;
- struct llog_handle *llh = NULL;
- llog_cb_t cb = args->lpca_cb;
- struct llog_thread_info *lgi;
- struct lu_env env;
- int rc;
- ENTRY;
-
- cfs_daemonize_ctxt("ll_log_process");
-
- rc = lu_env_init(&env, LCT_LOCAL);
- if (rc)
- GOTO(out, rc);
- lgi = llog_info(&env);
- LASSERT(lgi);
-
- lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg);
- rc = llog_open(&env, ctxt, &llh, &lgi->lgi_logid, NULL,
- LLOG_OPEN_EXISTS);
- if (rc) {
- CERROR("%s: cannot open llog "LPX64":%x: rc = %d\n",
- ctxt->loc_obd->obd_name, lgi->lgi_logid.lgl_oid,
- lgi->lgi_logid.lgl_ogen, rc);
- GOTO(out_env, rc);
- }
- rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL);
- if (rc) {
- CERROR("%s: llog_init_handle failed: rc = %d\n",
- llh->lgh_ctxt->loc_obd->obd_name, rc);
- GOTO(release_llh, rc);
- }
-
- if (cb) {
- rc = llog_cat_process(&env, llh, cb, NULL, 0, 0);
- if (rc != LLOG_PROC_BREAK && rc != 0)
- CERROR("%s: llog_cat_process() failed: rc = %d\n",
- llh->lgh_ctxt->loc_obd->obd_name, rc);
- cb(&env, llh, NULL, NULL);
- } else {
- CWARN("No callback function for recovery\n");
- }
-
- /*
- * Make sure that all cached data is sent.
- */
- llog_sync(ctxt, NULL, 0);
- GOTO(release_llh, rc);
-release_llh:
- rc = llog_cat_close(&env, llh);
- if (rc)
- CERROR("%s: llog_cat_close() failed: rc = %d\n",
- llh->lgh_ctxt->loc_obd->obd_name, rc);
-out_env:
- lu_env_fini(&env);
-out:
- llog_ctxt_put(ctxt);
- OBD_FREE_PTR(args);
- return rc;
-}
-EXPORT_SYMBOL(llog_cat_process_thread);
-#endif
-
static int llog_cat_reverse_process_cb(const struct lu_env *env,
struct llog_handle *cat_llh,
struct llog_rec_hdr *rec, void *data)
void *lpi_cbdata;
void *lpi_catdata;
int lpi_rc;
- int lpi_flags;
struct completion lpi_completion;
const struct lu_env *lpi_env;
struct llog_thread_info {
struct lu_attr lgi_attr;
struct lu_fid lgi_fid;
- struct llog_logid lgi_logid;
struct dt_object_format lgi_dof;
- struct llog_process_data lgi_lpd;
- struct lustre_mdt_attrs lgi_lma_attr;
-
struct lu_buf lgi_buf;
loff_t lgi_off;
-
struct llog_rec_hdr lgi_lrh;
struct llog_rec_tail lgi_tail;
- struct llog_logid_rec lgi_lid;
};
extern struct lu_context_key llog_thread_key;
RETURN(rc);
}
EXPORT_SYMBOL(llog_ioctl);
-
-#ifdef HAVE_LDISKFS_OSD
-int llog_catalog_list(struct obd_device *obd, int count,
- struct obd_ioctl_data *data)
-{
- int size, i;
- struct llog_catid *idarray;
- struct llog_logid *id;
- char name[32] = CATLIST;
- char *out;
- int l, remains, rc = 0;
-
- ENTRY;
- size = sizeof(*idarray) * count;
-
- OBD_ALLOC_LARGE(idarray, size);
- if (!idarray)
- RETURN(-ENOMEM);
-
- mutex_lock(&obd->obd_olg.olg_cat_processing);
- rc = llog_get_cat_list(obd, name, 0, count, idarray);
- if (rc)
- GOTO(out, rc);
-
- out = data->ioc_bulk;
- remains = data->ioc_inllen1;
- for (i = 0; i < count; i++) {
- id = &idarray[i].lci_logid;
- l = snprintf(out, remains,
- "catalog log: #"LPX64"#"LPX64"#%08x\n",
- id->lgl_oid, id->lgl_oseq, id->lgl_ogen);
- out += l;
- remains -= l;
- if (remains <= 0) {
- CWARN("not enough memory for catlog list\n");
- break;
- }
- }
-out:
- /* release semaphore */
- mutex_unlock(&obd->obd_olg.olg_cat_processing);
-
- OBD_FREE_LARGE(idarray, size);
- RETURN(rc);
-
-}
-EXPORT_SYMBOL(llog_catalog_list);
-#endif
RETURN(rc);
}
-/* reads the catalog list */
-int llog_get_cat_list(struct obd_device *disk_obd,
- char *name, int idx, int count, struct llog_catid *idarray)
-{
- struct lvfs_run_ctxt saved;
- struct l_file *file;
- int rc, rc1 = 0;
- int size = sizeof(*idarray) * count;
- loff_t off = idx * sizeof(*idarray);
- ENTRY;
-
- if (!count)
- RETURN(0);
-
- push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
- file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
- if (!file || IS_ERR(file)) {
- rc = PTR_ERR(file);
- CERROR("OBD filter: cannot open/create %s: rc = %d\n",
- name, rc);
- GOTO(out, rc);
- }
-
- if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
- CERROR("%s is not a regular file!: mode = %o\n", name,
- file->f_dentry->d_inode->i_mode);
- GOTO(out, rc = -ENOENT);
- }
-
- CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
- (int)i_size_read(file->f_dentry->d_inode), size);
-
- /* read for new ost index or for empty file */
- memset(idarray, 0, size);
- if (i_size_read(file->f_dentry->d_inode) < off)
- GOTO(out, rc = 0);
-
- rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
- if (rc) {
- CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
- GOTO(out, rc);
- }
-
- EXIT;
- out:
- pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
- if (file && !IS_ERR(file))
- rc1 = filp_close(file, 0);
- if (rc == 0)
- rc = rc1;
- return rc;
-}
-EXPORT_SYMBOL(llog_get_cat_list);
-
-/* writes the cat list */
-int llog_put_cat_list(struct obd_device *disk_obd,
- char *name, int idx, int count, struct llog_catid *idarray)
-{
- struct lvfs_run_ctxt saved;
- struct l_file *file;
- int rc, rc1 = 0;
- int size = sizeof(*idarray) * count;
- loff_t off = idx * sizeof(*idarray);
-
- if (!count)
- GOTO(out1, rc = 0);
-
- push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
- file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
- if (!file || IS_ERR(file)) {
- rc = PTR_ERR(file);
- CERROR("OBD filter: cannot open/create %s: rc = %d\n",
- name, rc);
- GOTO(out, rc);
- }
-
- if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
- CERROR("%s is not a regular file!: mode = %o\n", name,
- file->f_dentry->d_inode->i_mode);
- GOTO(out, rc = -ENOENT);
- }
-
- rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
- if (rc) {
- CDEBUG(D_INODE,"OBD filter: error writeing %s: rc %d\n",
- name, rc);
- GOTO(out, rc);
- }
-
-out:
- pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
- if (file && !IS_ERR(file))
- rc1 = filp_close(file, 0);
-
- if (rc == 0)
- rc = rc1;
-out1:
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_put_cat_list);
-
static int llog_lvfs_declare_create(const struct lu_env *env,
struct llog_handle *res,
struct thandle *th)
};
EXPORT_SYMBOL(llog_lvfs_ops);
#else /* !__KERNEL__ */
-int llog_get_cat_list(struct obd_device *disk_obd,
- char *name, int idx, int count,
- struct llog_catid *idarray)
-{
- LBUG();
- return 0;
-}
-
-int llog_put_cat_list(struct obd_device *disk_obd,
- char *name, int idx, int count,
- struct llog_catid *idarray)
-{
- LBUG();
- return 0;
-}
-
struct llog_operations llog_lvfs_ops = {};
#endif
class_import_put(ctxt->loc_imp);
ctxt->loc_imp = NULL;
}
- LASSERT(ctxt->loc_llcd == NULL);
OBD_FREE_PTR(ctxt);
}
olg->olg_ctxts[ctxt->loc_idx] = NULL;
spin_unlock(&olg->olg_lock);
- if (ctxt->loc_lcm)
- lcm_put(ctxt->loc_lcm);
-
obd = ctxt->loc_obd;
spin_lock(&obd->obd_dev_lock);
/* sync with llog ctxt user thread */
}
EXPORT_SYMBOL(llog_cancel);
-/* add for obdfilter/sz and mds/unlink */
-int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies)
-{
- struct llog_handle *cathandle;
- int rc;
- ENTRY;
-
- cathandle = ctxt->loc_handle;
- LASSERT(cathandle != NULL);
- rc = llog_cat_add(env, cathandle, rec, logcookies, NULL);
- if (rc != 0 && rc != 1)
- CERROR("write one catalog record failed: %d\n", rc);
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_origin_add);
-
int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
struct obd_device *disk_obd, int *index)
{
ldlm_objs += $(LDLM)ldlm_pool.o
ldlm_objs += $(LDLM)interval_tree.o
ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
-ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o
+ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o
ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
$(top_srcdir)/lustre/ldlm/ldlm_pool.c
COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \
- events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \
+ events.c ptlrpc_module.c service.c pinger.c llog_net.c \
llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c \
ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c \
sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c nrs.c nrs_fifo.c \
pinger.c \
ptlrpcd.c \
recover.c \
- recov_thread.c \
service.c \
nrs.c \
nrs_fifo.c \
#include <lvfs.h>
#include <lustre_fsfilt.h>
-#ifdef __KERNEL__
-int llog_origin_connect(struct llog_ctxt *ctxt,
- struct llog_logid *logid, struct llog_gen *gen,
- struct obd_uuid *uuid)
-{
- struct llog_gen_rec *lgr;
- struct ptlrpc_request *req;
- struct llogd_conn_body *req_body;
- struct inode* inode = ctxt->loc_handle->lgh_file->f_dentry->d_inode;
- void *handle;
- int rc, rc1;
-
- ENTRY;
-
- if (cfs_list_empty(&ctxt->loc_handle->u.chd.chd_head)) {
- CDEBUG(D_HA, "there is no record related to ctxt %p\n", ctxt);
- RETURN(0);
- }
-
- /* FIXME what value for gen->conn_cnt */
- llog_gen_init(ctxt);
-
- /* first add llog_gen_rec */
- OBD_ALLOC_PTR(lgr);
- if (!lgr)
- RETURN(-ENOMEM);
- lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr);
- lgr->lgr_hdr.lrh_type = LLOG_GEN_REC;
-
- handle = fsfilt_start_log(ctxt->loc_exp->exp_obd, inode,
- FSFILT_OP_CANCEL_UNLINK, NULL, 1);
- if (IS_ERR(handle)) {
- CERROR("fsfilt_start failed: %ld\n", PTR_ERR(handle));
- OBD_FREE(lgr, sizeof(*lgr));
- rc = PTR_ERR(handle);
- RETURN(rc);
- }
-
- lgr->lgr_gen = ctxt->loc_gen;
- rc = llog_obd_add(NULL, ctxt, &lgr->lgr_hdr, NULL, NULL, 1);
- OBD_FREE_PTR(lgr);
- rc1 = fsfilt_commit(ctxt->loc_exp->exp_obd, inode, handle, 0);
- if (rc != 1 || rc1 != 0) {
- rc = (rc != 1) ? rc : rc1;
- RETURN(rc);
- }
-
- LASSERT(ctxt->loc_imp);
- req = ptlrpc_request_alloc_pack(ctxt->loc_imp, &RQF_LLOG_ORIGIN_CONNECT,
- LUSTRE_LOG_VERSION,
- LLOG_ORIGIN_CONNECT);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- CDEBUG(D_OTHER, "%s mount_count "LPU64", connection count "LPU64"\n",
- ctxt->loc_exp->exp_obd->obd_type->typ_name,
- ctxt->loc_gen.mnt_cnt, ctxt->loc_gen.conn_cnt);
-
- req_body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
- req_body->lgdc_gen = ctxt->loc_gen;
- req_body->lgdc_logid = ctxt->loc_handle->lgh_id;
- req_body->lgdc_ctxt_idx = ctxt->loc_idx + 1;
- ptlrpc_request_set_replen(req);
-
- req->rq_no_resend = req->rq_no_delay = 1;
- rc = ptlrpc_queue_wait(req);
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_origin_connect);
-
-int llog_handle_connect(struct ptlrpc_request *req)
-{
- struct obd_device *obd = req->rq_export->exp_obd;
- struct llogd_conn_body *req_body;
- struct llog_ctxt *ctxt;
- int rc;
- ENTRY;
-
- req_body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
-
- ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
- rc = llog_connect(ctxt, &req_body->lgdc_logid,
- &req_body->lgdc_gen, NULL);
-
- llog_ctxt_put(ctxt);
- if (rc != 0)
- CERROR("failed at llog_relp_connect\n");
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_handle_connect);
-
-int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp)
-{
- ENTRY;
-
- LASSERT(ctxt);
- mutex_lock(&ctxt->loc_mutex);
- if (ctxt->loc_imp != imp) {
- if (ctxt->loc_imp) {
- CWARN("changing the import %p - %p\n",
- ctxt->loc_imp, imp);
- class_import_put(ctxt->loc_imp);
- }
- ctxt->loc_imp = class_import_get(imp);
- }
- mutex_unlock(&ctxt->loc_mutex);
- RETURN(0);
-}
-EXPORT_SYMBOL(llog_receptor_accept);
-
-#else /* !__KERNEL__ */
-
-int llog_origin_connect(struct llog_ctxt *ctxt,
- struct llog_logid *logid, struct llog_gen *gen,
- struct obd_uuid *uuid)
-{
- return 0;
-}
-#endif
-
int llog_initiator_connect(struct llog_ctxt *ctxt)
{
struct obd_import *new_imp;
}
EXPORT_SYMBOL(lustre_swab_ldlm_reply);
-/* no one calls this */
-int llog_log_swabbed(struct llog_log_hdr *hdr)
-{
- if (hdr->llh_hdr.lrh_type == __swab32(LLOG_HDR_MAGIC))
- return 1;
- if (hdr->llh_hdr.lrh_type == LLOG_HDR_MAGIC)
- return 0;
- return -1;
-}
-
void lustre_swab_quota_body(struct quota_body *b)
{
lustre_swab_lu_fid(&b->qb_fid);
int sptlrpc_init(void);
void sptlrpc_fini(void);
-/* recov_thread.c */
-int llog_recov_init(void);
-void llog_recov_fini(void);
-
static inline int ll_rpc_recoverable_error(int rc)
{
return (rc == -ENOTCONN || rc == -ENODEV);
if (rc)
GOTO(cleanup, rc);
- cleanup_phase = 6;
- rc = llog_recov_init();
- if (rc)
- GOTO(cleanup, rc);
-
cleanup_phase = 7;
rc = ptlrpc_nrs_init();
if (rc)
ptlrpc_nrs_fini();
#endif
case 7:
- llog_recov_fini();
- case 6:
sptlrpc_fini();
case 5:
ldlm_exit();
{
tgt_mod_exit();
ptlrpc_nrs_fini();
- llog_recov_fini();
sptlrpc_fini();
ldlm_exit();
ptlrpc_stop_pinger();
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2012, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/ptlrpc/recov_thread.c
- *
- * OST<->MDS recovery logging thread.
- * Invariants in implementation:
- * - we do not share logs among different OST<->MDS connections, so that
- * if an OST or MDS fails it need only look at log(s) relevant to itself
- *
- * Author: Andreas Dilger <adilger@clusterfs.com>
- * Yury Umanets <yury.umanets@sun.com>
- * Alexey Lyashkov <alexey.lyashkov@sun.com>
- */
-
-#define DEBUG_SUBSYSTEM S_LOG
-
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else
-# include <libcfs/list.h>
-# include <liblustre.h>
-#endif
-
-#include <obd_class.h>
-#include <obd_support.h>
-#include <obd_class.h>
-#include <lustre_net.h>
-#include <lnet/types.h>
-#include <libcfs/list.h>
-#include <lustre_log.h>
-#include "ptlrpc_internal.h"
-
-static cfs_atomic_t llcd_count = CFS_ATOMIC_INIT(0);
-static cfs_mem_cache_t *llcd_cache = NULL;
-
-#ifdef __KERNEL__
-enum {
- LLOG_LCM_FL_START = 1 << 0,
- LLOG_LCM_FL_EXIT = 1 << 1
-};
-
-struct llcd_async_args {
- struct llog_canceld_ctxt *la_ctxt;
-};
-
-static void llcd_print(struct llog_canceld_ctxt *llcd,
- const char *func, int line)
-{
- CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
- CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size);
- CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt);
- CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm);
- CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes);
-}
-
-/**
- * Allocate new llcd from cache, init it and return to caller.
- * Bumps number of objects allocated.
- */
-static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
-{
- struct llog_canceld_ctxt *llcd;
- int size, overhead;
-
- LASSERT(lcm != NULL);
-
- /*
- * We want to send one page of cookies with rpc header. This buffer
- * will be assigned later to the rpc, this is why we preserve the
- * space for rpc header.
- */
- size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
- overhead = offsetof(struct llog_canceld_ctxt, llcd_cookies);
- OBD_SLAB_ALLOC_GFP(llcd, llcd_cache, size + overhead, CFS_ALLOC_STD);
- if (!llcd)
- return NULL;
-
- CFS_INIT_LIST_HEAD(&llcd->llcd_list);
- llcd->llcd_cookiebytes = 0;
- llcd->llcd_size = size;
-
- spin_lock(&lcm->lcm_lock);
- llcd->llcd_lcm = lcm;
- cfs_atomic_inc(&lcm->lcm_count);
- cfs_list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
- spin_unlock(&lcm->lcm_lock);
- cfs_atomic_inc(&llcd_count);
-
- CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n",
- llcd, lcm, cfs_atomic_read(&lcm->lcm_count));
-
- return llcd;
-}
-
-/**
- * Returns passed llcd to cache.
- */
-static void llcd_free(struct llog_canceld_ctxt *llcd)
-{
- struct llog_commit_master *lcm = llcd->llcd_lcm;
- int size;
-
- if (lcm) {
- if (cfs_atomic_read(&lcm->lcm_count) == 0) {
- CERROR("Invalid llcd free %p\n", llcd);
- llcd_print(llcd, __FUNCTION__, __LINE__);
- LBUG();
- }
- spin_lock(&lcm->lcm_lock);
- LASSERT(!cfs_list_empty(&llcd->llcd_list));
- cfs_list_del_init(&llcd->llcd_list);
- cfs_atomic_dec(&lcm->lcm_count);
- spin_unlock(&lcm->lcm_lock);
-
- CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n",
- llcd, lcm, cfs_atomic_read(&lcm->lcm_count));
- }
-
- LASSERT(cfs_atomic_read(&llcd_count) > 0);
- cfs_atomic_dec(&llcd_count);
-
- size = offsetof(struct llog_canceld_ctxt, llcd_cookies) +
- llcd->llcd_size;
- OBD_SLAB_FREE(llcd, llcd_cache, size);
-}
-
-/**
- * Checks if passed cookie fits into llcd free space buffer. Returns
- * 1 if yes and 0 otherwise.
- */
-static inline int
-llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
-{
- return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies));
-}
-
-/**
- * Copy passed @cookies to @llcd.
- */
-static inline void
-llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
-{
- LASSERT(llcd_fit(llcd, cookies));
- memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
- cookies, sizeof(*cookies));
- llcd->llcd_cookiebytes += sizeof(*cookies);
-}
-
-/**
- * Llcd completion function. Called uppon llcd send finish regardless
- * sending result. Error is passed in @rc. Note, that this will be called
- * in cleanup time when all inflight rpcs aborted.
- */
-static int
-llcd_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *args, int rc)
-{
- struct llcd_async_args *la = args;
- struct llog_canceld_ctxt *llcd = la->la_ctxt;
-
- CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc);
- llcd_free(llcd);
- return 0;
-}
-
-/**
- * Send @llcd to remote node. Free llcd uppon completion or error. Sending
- * is performed in async style so this function will return asap without
- * blocking.
- */
-static int llcd_send(struct llog_canceld_ctxt *llcd)
-{
- char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
- struct obd_import *import = NULL;
- struct llog_commit_master *lcm;
- struct llcd_async_args *la;
- struct ptlrpc_request *req;
- struct llog_ctxt *ctxt;
- int rc;
- ENTRY;
-
- ctxt = llcd->llcd_ctxt;
- if (!ctxt) {
- CERROR("Invalid llcd with NULL ctxt found (%p)\n",
- llcd);
- llcd_print(llcd, __FUNCTION__, __LINE__);
- LBUG();
- }
- LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex);
-
- if (llcd->llcd_cookiebytes == 0)
- GOTO(exit, rc = 0);
-
- lcm = llcd->llcd_lcm;
-
- /*
- * Check if we're in exit stage. Do not send llcd in
- * this case.
- */
- if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
- GOTO(exit, rc = -ENODEV);
-
- CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
-
- import = llcd->llcd_ctxt->loc_imp;
- if (!import || (import == LP_POISON) ||
- (import->imp_client == LP_POISON)) {
- CERROR("Invalid import %p for llcd %p\n",
- import, llcd);
- GOTO(exit, rc = -ENODEV);
- }
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
-
- /*
- * No need to get import here as it is already done in
- * llog_receptor_accept().
- */
- req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
- if (req == NULL) {
- CERROR("Can't allocate request for sending llcd %p\n",
- llcd);
- GOTO(exit, rc = -ENOMEM);
- }
- req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES,
- RCL_CLIENT, llcd->llcd_cookiebytes);
-
- rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION,
- OBD_LOG_CANCEL, bufs, NULL);
- if (rc) {
- ptlrpc_request_free(req);
- GOTO(exit, rc);
- }
-
- ptlrpc_at_set_req_timeout(req);
- ptlrpc_request_set_replen(req);
-
- /* bug 5515 */
- req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
- req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
-
- req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret;
-
- CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
- la = ptlrpc_req_async_args(req);
- la->la_ctxt = llcd;
-
- /* llog cancels will be replayed after reconnect so this will do twice
- * first from replay llog, second for resended rpc */
- req->rq_no_delay = req->rq_no_resend = 1;
-
- ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
- RETURN(0);
-exit:
- CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
- llcd_free(llcd);
- return rc;
-}
-
-/**
- * Attach @llcd to @ctxt. Establish llcd vs. ctxt reserve connection
- * so hat they can refer each other.
- */
-static int
-llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
-{
- LASSERT(ctxt != NULL && llcd != NULL);
- LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex);
- LASSERT(ctxt->loc_llcd == NULL);
- llcd->llcd_ctxt = llog_ctxt_get(ctxt);
- ctxt->loc_llcd = llcd;
-
- CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n",
- llcd, ctxt);
-
- return 0;
-}
-
-/**
- * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes
- * sure that this llcd will not be found another time we try to cancel.
- */
-static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
-{
- struct llog_canceld_ctxt *llcd;
-
- LASSERT(ctxt != NULL);
- LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex);
-
- llcd = ctxt->loc_llcd;
- if (!llcd)
- return NULL;
-
- CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n",
- llcd, ctxt);
-
- ctxt->loc_llcd = NULL;
- llog_ctxt_put(ctxt);
- return llcd;
-}
-
-/**
- * Return @llcd cached in @ctxt. Allocate new one if required. Attach it
- * to ctxt so that it may be used for gathering cookies and sending.
- */
-static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
-{
- struct llog_canceld_ctxt *llcd;
- LASSERT(ctxt);
- llcd = llcd_alloc(ctxt->loc_lcm);
- if (!llcd) {
- CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
- return NULL;
- }
- llcd_attach(ctxt, llcd);
- return llcd;
-}
-
-/**
- * Deatch llcd from its @ctxt. Free llcd.
- */
-static void llcd_put(struct llog_ctxt *ctxt)
-{
- struct llog_canceld_ctxt *llcd;
-
- llcd = llcd_detach(ctxt);
- if (llcd)
- llcd_free(llcd);
-}
-
-/**
- * Detach llcd from its @ctxt so that nobody will find it with try to
- * re-use. Send llcd to remote node.
- */
-static int llcd_push(struct llog_ctxt *ctxt)
-{
- struct llog_canceld_ctxt *llcd;
- int rc;
-
- /*
- * Make sure that this llcd will not be sent again as we detach
- * it from ctxt.
- */
- llcd = llcd_detach(ctxt);
- if (!llcd) {
- CERROR("Invalid detached llcd found %p\n", llcd);
- llcd_print(llcd, __FUNCTION__, __LINE__);
- LBUG();
- }
-
- rc = llcd_send(llcd);
- if (rc)
- CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
- return rc;
-}
-
-/**
- * Start recovery thread which actually deals llcd sending. This
- * is all ptlrpc standard thread based so there is not much of work
- * to do.
- */
-int llog_recov_thread_start(struct llog_commit_master *lcm)
-{
- int rc;
- ENTRY;
-
- rc = ptlrpcd_start(-1, 1, lcm->lcm_name, &lcm->lcm_pc);
- if (rc) {
- CERROR("Error %d while starting recovery thread %s\n",
- rc, lcm->lcm_name);
- RETURN(rc);
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_recov_thread_start);
-
-/**
- * Stop recovery thread. Complement to llog_recov_thread_start().
- */
-void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
-{
- ENTRY;
-
- /*
- * Let all know that we're stopping. This will also make
- * llcd_send() refuse any new llcds.
- */
- set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
-
- /*
- * Stop processing thread. No new rpcs will be accepted for
- * for processing now.
- */
- ptlrpcd_stop(&lcm->lcm_pc, force);
- ptlrpcd_free(&lcm->lcm_pc);
-
- /*
- * By this point no alive inflight llcds should be left. Only
- * those forgotten in sync may still be attached to ctxt. Let's
- * print them.
- */
- if (cfs_atomic_read(&lcm->lcm_count) != 0) {
- struct llog_canceld_ctxt *llcd;
- cfs_list_t *tmp;
-
- CERROR("Busy llcds found (%d) on lcm %p\n",
- cfs_atomic_read(&lcm->lcm_count), lcm);
-
- spin_lock(&lcm->lcm_lock);
- cfs_list_for_each(tmp, &lcm->lcm_llcds) {
- llcd = cfs_list_entry(tmp, struct llog_canceld_ctxt,
- llcd_list);
- llcd_print(llcd, __func__, __LINE__);
- }
- spin_unlock(&lcm->lcm_lock);
-
- /*
- * No point to go further with busy llcds at this point
- * as this is clear bug. It might mean we got hanging
- * rpc which holds import ref and this means we will not
- * be able to cleanup anyways.
- *
- * Or we just missed to kill them when they were not
- * attached to ctxt. In this case our slab will remind
- * us about this a bit later.
- */
- LBUG();
- }
- EXIT;
-}
-EXPORT_SYMBOL(llog_recov_thread_stop);
-
-/**
- * Initialize commit master structure and start recovery thread on it.
- */
-struct llog_commit_master *llog_recov_thread_init(char *name)
-{
- struct llog_commit_master *lcm;
- int rc;
- ENTRY;
-
- OBD_ALLOC_PTR(lcm);
- if (!lcm)
- RETURN(NULL);
-
- /*
- * Try to create threads with unique names.
- */
- snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
- "lcm_%s", name);
-
- cfs_atomic_set(&lcm->lcm_count, 0);
- cfs_atomic_set(&lcm->lcm_refcount, 1);
- spin_lock_init(&lcm->lcm_lock);
- CFS_INIT_LIST_HEAD(&lcm->lcm_llcds);
- rc = llog_recov_thread_start(lcm);
- if (rc) {
- CERROR("Can't start commit thread, rc %d\n", rc);
- GOTO(out, rc);
- }
- RETURN(lcm);
-out:
- OBD_FREE_PTR(lcm);
- return NULL;
-}
-EXPORT_SYMBOL(llog_recov_thread_init);
-
-/**
- * Finalize commit master and its recovery thread.
- */
-void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
-{
- ENTRY;
- llog_recov_thread_stop(lcm, force);
- lcm_put(lcm);
- EXIT;
-}
-EXPORT_SYMBOL(llog_recov_thread_fini);
-
-static int llog_recov_thread_replay(struct llog_ctxt *ctxt,
- void *cb, void *arg)
-{
- struct obd_device *obd = ctxt->loc_obd;
- struct llog_process_cat_args *lpca;
- int rc;
- ENTRY;
-
- if (obd->obd_stopping)
- RETURN(-ENODEV);
-
- /*
- * This will be balanced in llog_cat_process_thread()
- */
- OBD_ALLOC_PTR(lpca);
- if (!lpca)
- RETURN(-ENOMEM);
-
- lpca->lpca_cb = cb;
- lpca->lpca_arg = arg;
-
- /*
- * This will be balanced in llog_cat_process_thread()
- */
- lpca->lpca_ctxt = llog_ctxt_get(ctxt);
- if (!lpca->lpca_ctxt) {
- OBD_FREE_PTR(lpca);
- RETURN(-ENODEV);
- }
- rc = cfs_create_thread(llog_cat_process_thread, lpca, CFS_DAEMON_FLAGS);
- if (rc < 0) {
- CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
- OBD_FREE_PTR(lpca);
- llog_ctxt_put(ctxt);
- } else {
- CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc);
- rc = 0;
- }
-
- RETURN(rc);
-}
-
-int llog_obd_repl_connect(struct llog_ctxt *ctxt,
- struct llog_logid *logid, struct llog_gen *gen,
- struct obd_uuid *uuid)
-{
- int rc;
- ENTRY;
-
- /*
- * Send back cached llcd from llog before recovery if we have any.
- * This is void is nothing cached is found there.
- */
- llog_sync(ctxt, NULL, 0);
-
- /*
- * Start recovery in separate thread.
- */
- mutex_lock(&ctxt->loc_mutex);
- ctxt->loc_gen = *gen;
- rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid);
- mutex_unlock(&ctxt->loc_mutex);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_repl_connect);
-
-/**
- * Deleted objects have a commit callback that cancels the MDS
- * log record for the deletion. The commit callback calls this
- * function.
- */
-int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
- struct lov_stripe_md *lsm, int count,
- struct llog_cookie *cookies, int flags)
-{
- struct llog_commit_master *lcm;
- struct llog_canceld_ctxt *llcd;
- int rc = 0;
- ENTRY;
-
- LASSERT(ctxt != NULL);
-
- mutex_lock(&ctxt->loc_mutex);
- if (!ctxt->loc_lcm) {
- CDEBUG(D_RPCTRACE, "No lcm for ctxt %p\n", ctxt);
- GOTO(out, rc = -ENODEV);
- }
- lcm = ctxt->loc_lcm;
- CDEBUG(D_INFO, "cancel on lsm %p\n", lcm);
-
- /*
- * Let's check if we have all structures alive. We also check for
- * possible shutdown. Do nothing if we're stopping.
- */
- if (ctxt->loc_flags & LLOG_CTXT_FLAG_STOP) {
- CDEBUG(D_RPCTRACE, "Last sync was done for ctxt %p\n", ctxt);
- GOTO(out, rc = -ENODEV);
- }
-
- if (ctxt->loc_imp == NULL) {
- CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt);
- GOTO(out, rc = -ENODEV);
- }
-
- if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
- CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
- ctxt);
- GOTO(out, rc = -ENODEV);
- }
-
- llcd = ctxt->loc_llcd;
-
- if (count > 0 && cookies != NULL) {
- /*
- * Get new llcd from ctxt if required.
- */
- if (!llcd) {
- llcd = llcd_get(ctxt);
- if (!llcd)
- GOTO(out, rc = -ENOMEM);
- /*
- * Allocation is successful, let's check for stop
- * flag again to fall back as soon as possible.
- */
- if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
- GOTO(out, rc = -ENODEV);
- }
-
- /*
- * Llcd does not have enough room for @cookies. Let's push
- * it out and allocate new one.
- */
- if (!llcd_fit(llcd, cookies)) {
- rc = llcd_push(ctxt);
- if (rc)
- GOTO(out, rc);
- llcd = llcd_get(ctxt);
- if (!llcd)
- GOTO(out, rc = -ENOMEM);
- /*
- * Allocation is successful, let's check for stop
- * flag again to fall back as soon as possible.
- */
- if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
- GOTO(out, rc = -ENODEV);
- }
-
- /*
- * Copy cookies to @llcd, no matter old or new allocated
- * one.
- */
- llcd_copy(llcd, cookies);
- }
-
- /*
- * Let's check if we need to send copied @cookies asap. If yes
- * then do it.
- */
- if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
- CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd);
- rc = llcd_push(ctxt);
- if (rc)
- GOTO(out, rc);
- }
- EXIT;
-out:
- if (rc)
- llcd_put(ctxt);
-
- if (flags & OBD_LLOG_FL_EXIT)
- ctxt->loc_flags = LLOG_CTXT_FLAG_STOP;
-
- mutex_unlock(&ctxt->loc_mutex);
- return rc;
-}
-EXPORT_SYMBOL(llog_obd_repl_cancel);
-
-int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp,
- int flags)
-{
- int rc = 0;
- ENTRY;
-
- /*
- * Flush any remaining llcd.
- */
- mutex_lock(&ctxt->loc_mutex);
- if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
- /*
- * This is ost->mds connection, we can't be sure that mds
- * can still receive cookies, let's killed the cached llcd.
- */
- CDEBUG(D_RPCTRACE, "Kill cached llcd\n");
- llcd_put(ctxt);
-
- if (flags & OBD_LLOG_FL_EXIT)
- ctxt->loc_flags = LLOG_CTXT_FLAG_STOP;
-
- mutex_unlock(&ctxt->loc_mutex);
- } else {
- /*
- * This is either llog_sync() from generic llog code or sync
- * on client disconnect. In either way let's do it and send
- * llcds to the target with waiting for completion.
- */
- CDEBUG(D_RPCTRACE, "Sync cached llcd\n");
- mutex_unlock(&ctxt->loc_mutex);
- rc = llog_cancel(NULL, ctxt, NULL, 0, NULL,
- OBD_LLOG_FL_SENDNOW | flags);
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_repl_sync);
-
-#else /* !__KERNEL__ */
-
-int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
- struct lov_stripe_md *lsm, int count,
- struct llog_cookie *cookies, int flags)
-{
- return 0;
-}
-#endif
-
-/**
- * Module init time fucntion. Initializes slab for llcd objects.
- */
-int llog_recov_init(void)
-{
- int llcd_size;
-
- llcd_size = CFS_PAGE_SIZE -
- lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
- llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
- llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
- if (!llcd_cache) {
- CERROR("Error allocating llcd cache\n");
- return -ENOMEM;
- }
- return 0;
-}
-
-/**
- * Module fini time fucntion. Releases slab for llcd objects.
- */
-void llog_recov_fini(void)
-{
- /*
- * Kill llcd cache when thread is stopped and we're sure no
- * llcd in use left.
- */
- if (llcd_cache) {
- /*
- * In 2.6.22 cfs_mem_cache_destroy() will not return error
- * for busy resources. Let's check it another way.
- */
- LASSERTF(cfs_atomic_read(&llcd_count) == 0,
- "Can't destroy llcd cache! Number of "
- "busy llcds: %d\n", cfs_atomic_read(&llcd_count));
- cfs_mem_cache_destroy(llcd_cache);
- llcd_cache = NULL;
- }
-}