#endif
#include <linux/lustre_log.h>
-#include <portals/list.h>
+#include <libcfs/list.h>
/* Create a new log handle and add it to the open list.
* This log handle will be closed when all of the records in it are removed.
*
* Assumes caller has already pushed us into the kernel context and is locking.
*/
-static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
+static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
+ struct llog_cookie *logcookie)
{
struct llog_handle *loghandle;
struct llog_log_hdr *llh;
ENTRY;
llh = cathandle->lgh_hdr;
- bitmap_size = sizeof(llh->llh_bitmap) * 8;
+ bitmap_size = LLOG_BITMAP_SIZE(llh);
index = (cathandle->lgh_last_idx + 1) % bitmap_size;
llh->llh_tail.lrt_index = cpu_to_le32(index);
}
- rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
- if (rc)
+ if (logcookie && llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
+ rc = llog_open(cathandle->lgh_ctxt, &loghandle,
+ &logcookie->lgc_lgl, NULL, OBD_LLOG_FL_CREATE);
+ else
+ rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
+ OBD_LLOG_FL_CREATE);
+ if (rc) {
+ CERROR("cannot create new log, error = %d\n", rc);
RETURN(ERR_PTR(rc));
+ }
rc = llog_init_handle(loghandle,
LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
/* update the catalog: header and record */
rc = llog_write_rec(cathandle, &rec.lid_hdr,
&loghandle->u.phd.phd_cookie, 1, NULL, index);
- if (rc < 0) {
+ if (rc < 0)
GOTO(out_destroy, rc);
- }
loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
cathandle->u.chd.chd_current_log = loghandle;
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
out_destroy:
- if (rc < 0)
+ if (rc < 0)
llog_destroy(loghandle);
+ else if (logcookie) {
+ if (llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
+ LASSERT(EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl));
+ else
+ llog_cookie_set_flags(logcookie, LLOG_COOKIE_REPLAY_NEW);
+ }
RETURN(loghandle);
}
}
}
- rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
+ rc = llog_open(cathandle->lgh_ctxt, &loghandle, logid, NULL, 0);
if (rc) {
CERROR("error opening log id "LPX64":%x: rc %d\n",
logid->lgl_oid, logid->lgl_ogen, rc);
* NOTE: loghandle is write-locked upon successful return
*/
static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
- int create)
+ int create,
+ struct llog_cookie *logcookie,
+ struct rw_semaphore **lock)
{
struct llog_handle *loghandle = NULL;
ENTRY;
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
- down_write(&loghandle->lgh_lock);
+ down_write(&loghandle->lgh_lock);
+ if (loghandle->lgh_last_idx < (LLOG_BITMAP_SIZE(llh) - 1) &&
+ (!logcookie ||
+ !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+ EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
up_read(&cathandle->lgh_lock);
RETURN(loghandle);
+ } else {
+ up_write(&loghandle->lgh_lock);
}
}
+
+ LASSERT(!logcookie ||
+ !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+ llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW);
+
if (!create) {
if (loghandle)
down_write(&loghandle->lgh_lock);
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
- down_write(&loghandle->lgh_lock);
+ down_write(&loghandle->lgh_lock);
+ if (loghandle->lgh_last_idx < (LLOG_BITMAP_SIZE(llh) - 1) &&
+ (!logcookie ||
+ !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+ EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
up_write(&cathandle->lgh_lock);
RETURN(loghandle);
+ } else {
+ up_write(&loghandle->lgh_lock);
}
}
CDEBUG(D_INODE, "creating new log\n");
- loghandle = llog_cat_new_log(cathandle);
- if (!IS_ERR(loghandle))
+ loghandle = llog_cat_new_log(cathandle, logcookie);
+ if (!IS_ERR(loghandle)) {
down_write(&loghandle->lgh_lock);
+ if (lock != NULL)
+ *lock = &loghandle->lgh_lock;
+ }
+
up_write(&cathandle->lgh_lock);
RETURN(loghandle);
}
* Assumes caller has already pushed us into the kernel context.
*/
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
- struct llog_cookie *reccookie, void *buf)
+ struct llog_cookie *reccookie, void *buf,
+ struct rw_semaphore **lock, int *lock_count)
{
struct llog_handle *loghandle;
int rc;
ENTRY;
LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
- loghandle = llog_cat_current_log(cathandle, 1);
+ loghandle = llog_cat_current_log(cathandle, 1, reccookie, lock);
if (IS_ERR(loghandle))
RETURN(PTR_ERR(loghandle));
/* loghandle is already locked by llog_cat_current_log() for us */
rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
- up_write(&loghandle->lgh_lock);
+ if (!lock || *lock == NULL) {
+ up_write(&loghandle->lgh_lock);
+ } else {
+ LASSERT(lock_count != NULL);
+ *lock_count += 1;
+ }
RETURN(rc);
}
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
- CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ CDEBUG(D_INFO, "processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
int i, bitmap_size, idx;
ENTRY;
- bitmap_size = sizeof(llh->llh_bitmap) * 8;
+ bitmap_size = LLOG_BITMAP_SIZE(llh);
if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
idx = le32_to_cpu(llh->llh_cat_idx) + 1;
llh->llh_cat_idx = cpu_to_le32(idx);
int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data)
+ int numcookies, void *data,
+ struct rw_semaphore **lock, int *lock_count)
{
struct llog_handle *cathandle;
int rc;
cathandle = ctxt->loc_handle;
LASSERT(cathandle != NULL);
- rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
+ rc = llog_cat_add_rec(cathandle, rec, logcookies, buf, lock, lock_count);
if (rc != 1)
CERROR("write one catalog record failed: %d\n", rc);
RETURN(rc);
EXPORT_SYMBOL(llog_catalog_cancel);
int llog_catalog_setup(struct llog_ctxt **res, char *name,
+ struct obd_export *exp,
struct lvfs_run_ctxt *lvfs_ctxt,
struct fsfilt_operations *fsops,
struct dentry *logs_de,
*res = ctxt;
+ /* marking this ctxt alone. */
+ ctxt->loc_alone = 1;
ctxt->loc_fsops = fsops;
ctxt->loc_lvfs_ctxt = lvfs_ctxt;
+ ctxt->loc_exp = exp;
ctxt->loc_logs_dir = logs_de;
ctxt->loc_objects_dir = objects_de;
ctxt->loc_logops = &llog_lvfs_ops;
RETURN(rc);
}
if (catid.lci_logid.lgl_oid)
- rc = llog_create(ctxt, &handle, &catid.lci_logid, 0);
+ rc = llog_open(ctxt, &handle, &catid.lci_logid, NULL,
+ OBD_LLOG_FL_CREATE);
else {
- rc = llog_create(ctxt, &handle, NULL, NULL);
+ rc = llog_open(ctxt, &handle, NULL, NULL, OBD_LLOG_FL_CREATE);
if (!rc)
catid.lci_logid = handle->lgh_id;
}
int llog_catalog_cleanup(struct llog_ctxt *ctxt)
{
- struct llog_handle *cathandle, *n, *loghandle;
- struct llog_log_hdr *llh;
- int rc, index;
+ struct llog_handle *cathandle;
ENTRY;
-
+
if (!ctxt)
return 0;
cathandle = ctxt->loc_handle;
- if (cathandle) {
- list_for_each_entry_safe(loghandle, n,
- &cathandle->u.chd.chd_head,
- u.phd.phd_entry) {
- llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) &
- LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1)) {
- rc = llog_destroy(loghandle);
- if (rc)
- CERROR("failure destroying log during "
- "cleanup: %d\n", rc);
- LASSERT(rc == 0);
- index = loghandle->u.phd.phd_cookie.lgc_index;
- llog_free_handle(loghandle);
-
- LASSERT(index);
- llog_cat_set_first_idx(cathandle, index);
- rc = llog_cancel_rec(cathandle, index);
- if (rc == 0)
- CDEBUG(D_HA, "cancel plain log at index"
- " %u of catalog "LPX64"\n",
- index,cathandle->lgh_id.lgl_oid);
- }
- }
+ if (cathandle)
llog_cat_put(ctxt->loc_handle);
- }
- OBD_FREE(ctxt, sizeof(*ctxt));
+
return 0;
}
EXPORT_SYMBOL(llog_catalog_cleanup);
struct llog_handle *loghandle;
struct llog_logid *lgl = &cookie->lgc_lgl;
int rc;
-
+
down_read(&handle->lgh_lock);
rc = llog_cat_id2handle(handle, &loghandle, lgl);
if (rc)