int llog_backup(const struct lu_env *env, struct obd_device *obd,
struct llog_ctxt *ctxt, struct llog_ctxt *bak_ctxt,
char *name, char *backup);
+int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
+ const struct obd_uuid *uuid);
/* llog_process flags */
#define LLOG_FLAG_NODEAMON 0x0001
int lgh_last_idx;
int lgh_cur_idx; /* used during llog_process */
__u64 lgh_cur_offset; /* used during llog_process */
+ /* used during llog_osd_write_rec */
+ __u64 lgh_write_offset;
struct llog_ctxt *lgh_ctxt;
union {
struct plain_handle_data phd;
void *private_data;
struct llog_operations *lgh_logops;
atomic_t lgh_refcount;
+
+ __u32 lgh_stale:1;
};
/* llog_osd.c */
#define OBD_FAIL_SPLIT_UPDATE_REC 0x1702
#define OBD_FAIL_LARGE_STRIPE 0x1703
#define OBD_FAIL_OUT_ENOSPC 0x1704
+#define OBD_FAIL_INVALIDATE_UPDATE 0x1705
/* MIGRATE */
#define OBD_FAIL_MIGRATE_NET_REP 0x1800
RETURN(rc);
}
-static int llog_read_header(const struct lu_env *env,
- struct llog_handle *handle,
- struct obd_uuid *uuid)
+int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
+ const struct obd_uuid *uuid)
{
struct llog_operations *lop;
int rc;
+ ENTRY;
rc = llog_handle2ops(handle, &lop);
if (rc)
/* lrh_len should be initialized in llog_init_handle */
handle->lgh_last_idx = 0; /* header is record with index 0 */
+ handle->lgh_write_offset = 0;
llh->llh_count = 1; /* for the header record */
llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
LASSERT(handle->lgh_ctxt->loc_chunk_size >=
memcpy(&llh->llh_tgtuuid, uuid,
sizeof(llh->llh_tgtuuid));
llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+ /* Since update llog header might also call this function,
+ * let's reset the bitmap to 0 here */
+ memset(LLOG_HDR_BITMAP(llh), 0, llh->llh_hdr.lrh_len -
+ llh->llh_bitmap_offset -
+ sizeof(llh->llh_tail));
ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
rc = 0;
}
- return rc;
+ RETURN(rc);
}
+EXPORT_SYMBOL(llog_read_header);
int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
int flags, struct obd_uuid *uuid)
cd->lpcd_last_idx = last_called_index;
if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
- /* something bad happened to the processing of a local
- * llog file, probably I/O error or the log got corrupted..
- * to be able to finally release the log we discard any
- * remaining bits in the header */
- CERROR("Local llog found corrupted\n");
- while (index <= last_index) {
- if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
- llog_cancel_rec(lpi->lpi_env, loghandle, index);
- index++;
+ if (dt_object_remote(loghandle->lgh_obj)) {
+ /* If it is remote object, then -EIO might means
+ * disconnection or eviction, let's return -EAGAIN,
+ * so for update recovery log processing, it will
+ * retry until the umount or abort recovery, see
+ * lod_sub_recovery_thread() */
+ CERROR("%s retry remote llog process\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name);
+ rc = -EAGAIN;
+ } else {
+ /* something bad happened to the processing of a local
+ * llog file, probably I/O error or the log got
+ * corrupted to be able to finally release the log we
+ * discard any remaining bits in the header */
+ CERROR("Local llog found corrupted\n");
+ while (index <= last_index) {
+ if (ext2_test_bit(index,
+ LLOG_HDR_BITMAP(llh)) != 0)
+ llog_cancel_rec(lpi->lpi_env, loghandle,
+ index);
+ index++;
+ }
+ rc = 0;
}
- rc = 0;
}
OBD_FREE_LARGE(buf, chunk_size);
loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
out:
- if (handle != NULL)
+ if (handle != NULL) {
+ handle->th_result = rc >= 0 ? 0 : rc;
dt_trans_stop(env, dt, handle);
-
- RETURN(0);
+ }
+ RETURN(rc);
out_destroy:
/* to signal llog_cat_close() it shouldn't try to destroy the llog,
RETURN(loghandle);
}
+static int llog_cat_update_header(const struct lu_env *env,
+ struct llog_handle *cathandle)
+{
+ struct llog_handle *loghandle;
+ int rc;
+ ENTRY;
+
+ /* refresh llog */
+ down_write(&cathandle->lgh_lock);
+ if (!cathandle->lgh_stale) {
+ up_write(&cathandle->lgh_lock);
+ RETURN(0);
+ }
+ list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
+ u.phd.phd_entry) {
+ if (!llog_exist(loghandle))
+ continue;
+
+ rc = llog_read_header(env, loghandle, NULL);
+ if (rc != 0) {
+ up_write(&cathandle->lgh_lock);
+ GOTO(out, rc);
+ }
+ }
+ rc = llog_read_header(env, cathandle, NULL);
+ if (rc == 0)
+ cathandle->lgh_stale = 0;
+ up_write(&cathandle->lgh_lock);
+ if (rc != 0)
+ GOTO(out, rc);
+out:
+ RETURN(rc);
+}
+
/* Add a single record to the recovery log(s) using a catalog
* Returns as llog_write_record
*
* on the success of this transaction. So let's
* create the llog object synchronously here to
* remove the dependency. */
+create_again:
down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
loghandle = cathandle->u.chd.chd_current_log;
down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
- if (!llog_exist(loghandle))
+ if (cathandle->lgh_stale) {
+ up_write(&loghandle->lgh_lock);
+ up_read(&cathandle->lgh_lock);
+ GOTO(out, rc = -EIO);
+ }
+ if (!llog_exist(loghandle)) {
rc = llog_cat_new_log(env, cathandle, loghandle,
NULL);
+ if (rc == -ESTALE)
+ cathandle->lgh_stale = 1;
+ }
up_write(&loghandle->lgh_lock);
up_read(&cathandle->lgh_lock);
- if (rc < 0)
+ if (rc == -ESTALE) {
+ rc = llog_cat_update_header(env, cathandle);
+ if (rc != 0)
+ GOTO(out, rc);
+ goto create_again;
+ } else if (rc < 0) {
GOTO(out, rc);
-
+ }
} else {
rc = llog_declare_create(env,
cathandle->u.chd.chd_current_log, th);
&lirec->lid_hdr, -1, th);
}
}
+
+write_again:
/* declare records in the llogs */
rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
rec, -1, th);
- if (rc)
+ if (rc == -ESTALE) {
+ down_write(&cathandle->lgh_lock);
+ if (cathandle->lgh_stale) {
+ up_write(&cathandle->lgh_lock);
+ GOTO(out, rc = -EIO);
+ }
+
+ cathandle->lgh_stale = 1;
+ up_write(&cathandle->lgh_lock);
+ rc = llog_cat_update_header(env, cathandle);
+ if (rc != 0)
+ GOTO(out, rc);
+ goto write_again;
+ } else if (rc < 0) {
GOTO(out, rc);
+ }
next = cathandle->u.chd.chd_next_log;
if (next) {
handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+ handle->lgh_write_offset = lgi->lgi_attr.la_size;
RETURN(0);
}
struct dt_object *o;
__u32 chunk_size;
size_t left;
-
+ __u32 orig_last_idx;
+ __u64 orig_write_offset;
ENTRY;
LASSERT(env);
RETURN(-ENOSPC);
LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+ orig_last_idx = loghandle->lgh_last_idx;
+ orig_write_offset = loghandle->lgh_write_offset;
lgi->lgi_off = lgi->lgi_attr.la_size;
left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
/* NOTE: padding is a record, but no bit is set */
rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
if (rc)
RETURN(rc);
+
+ if (dt_object_remote(o))
+ loghandle->lgh_write_offset = lgi->lgi_off;
+
loghandle->lgh_last_idx++; /* for pad rec */
}
/* if it's the last idx in log file, then return -ENOSPC
* records. This also allows to handle Catalog wrap around case */
if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+ } else if (dt_object_remote(o)) {
+ lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
+ lgi->lgi_off);
} else {
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)
if (rc < 0)
GOTO(out, rc);
- CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
- POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+ if (dt_object_remote(o))
+ loghandle->lgh_write_offset = lgi->lgi_off;
+
+ CDEBUG(D_HA, "added record "DFID": idx: %u, %u off"LPU64"\n",
+ PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
lgi->lgi_off);
if (reccookie != NULL) {
reccookie->lgc_lgl = loghandle->lgh_id;
mutex_unlock(&loghandle->lgh_hdr_mutex);
/* restore llog last_idx */
- if (--loghandle->lgh_last_idx == 0 &&
+ if (dt_object_remote(o)) {
+ loghandle->lgh_last_idx = orig_last_idx;
+ loghandle->lgh_write_offset = orig_write_offset;
+ } else if (--loghandle->lgh_last_idx == 0 &&
(llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
/* catalog had just wrap-around case */
loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
}
+
LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
RETURN(rc);
if (len == 0 || len & (chunk_size - 1))
RETURN(-EINVAL);
- CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
- next_idx, *cur_idx, *cur_offset);
-
LASSERT(loghandle);
LASSERT(loghandle->lgh_ctxt);
if (rc)
GOTO(out, rc);
+ CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
+ LPU64"), size %llu\n", next_idx, *cur_idx,
+ *cur_offset, lgi->lgi_attr.la_size);
+
while (*cur_offset < lgi->lgi_attr.la_size) {
struct llog_rec_hdr *rec, *last_rec;
struct llog_rec_tail *tail;
if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
lustre_swab_llog_rec(last_rec);
- LASSERT(last_rec->lrh_index == tail->lrt_index);
+
+ if (last_rec->lrh_index != tail->lrt_index) {
+ CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+ "offset "LPU64" last_rec idx %u tail idx %u\n",
+ o->do_lu.lo_dev->ld_obd->obd_name,
+ POSTID(&loghandle->lgh_id.lgl_oi),
+ loghandle->lgh_id.lgl_ogen, *cur_offset,
+ last_rec->lrh_index, tail->lrt_index);
+ GOTO(out, rc = -EINVAL);
+ }
*cur_idx = tail->lrt_index;
CDEBUG(D_HA, "got connected\n");
break;
case IMP_EVENT_INVALIDATE:
+ if (d->opd_connect_mdt)
+ osp_invalidate_request(d);
+
if (obd->obd_namespace == NULL)
break;
ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
struct list_head ours_list;
};
-/**
- * Tracking the updates being executed on this dt_device.
- */
struct osp_update_request {
int our_flags;
/* update request result */
/* points to thandle if this update request belongs to one */
struct osp_thandle *our_th;
+
+ __u64 our_version;
+ /* protect our_list and flag */
+ spinlock_t our_list_lock;
/* linked to the list(ou_list) in osp_updates */
struct list_head our_list;
__u32 our_batchid;
- __u32 our_req_sent:1;
+ __u32 our_req_ready:1;
+
};
struct osp_updates {
struct lu_object_header opo_header;
struct dt_object opo_obj;
unsigned int opo_reserved:1,
- opo_non_exist:1;
+ opo_non_exist:1,
+ opo_stale:1;
/* read/write lock for md osp object */
struct rw_semaphore opo_sem;
struct list_head ot_stop_dcb_list;
struct osp_update_request *ot_our;
atomic_t ot_refcount;
- __u64 ot_version;
};
static inline struct osp_thandle *
void osp_update_request_destroy(struct osp_update_request *update);
int osp_send_update_thread(void *arg);
-int osp_check_and_set_rpc_version(struct osp_thandle *oth);
+int osp_check_and_set_rpc_version(struct osp_thandle *oth,
+ struct osp_object *obj);
void osp_thandle_destroy(struct osp_thandle *oth);
static inline void osp_thandle_get(struct osp_thandle *oth)
struct osp_device *osp);
void osp_trans_callback(const struct lu_env *env,
struct osp_thandle *oth, int rc);
+void osp_invalidate_request(struct osp_device *osp);
/* osp_object.c */
int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr);
const struct lu_buf *buf,
loff_t pos, struct thandle *th)
{
- return osp_trans_update_request_create(th);
+ struct osp_device *osp = dt2osp_dev(th->th_dev);
+ int rc;
+
+ rc = osp_trans_update_request_create(th);
+ if (rc != 0)
+ return rc;
+
+ if (osp->opd_update == NULL)
+ return 0;
+
+ if (dt2osp_obj(dt)->opo_stale)
+ return -ESTALE;
+
+ return 0;
}
/**
update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
+ CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
+ PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
+
rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
lu_object_fid(&dt->do_lu), buf, *pos);
if (rc < 0)
RETURN(rc);
- CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
- PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
+ rc = osp_check_and_set_rpc_version(oth, obj);
+ if (rc < 0)
+ RETURN(rc);
/* XXX: how about the write error happened later? */
*pos += buf->lb_len;
obj->opo_ooa->ooa_attr.la_size < *pos)
obj->opo_ooa->ooa_attr.la_size = *pos;
- rc = osp_check_and_set_rpc_version(oth);
- if (rc < 0)
- RETURN(rc);
-
RETURN(buf->lb_len);
}
GOTO(out_update, rc);
}
+ CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n",
+ dt_dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len);
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update,
&req);
if (rc != 0)
if (obj->opo_ooa != NULL) {
spin_lock(&obj->opo_lock);
- if (obj->opo_ooa->ooa_attr.la_valid != 0) {
+ if (obj->opo_ooa->ooa_attr.la_valid != 0 && !obj->opo_stale) {
*attr = obj->opo_ooa->ooa_attr;
spin_unlock(&obj->opo_lock);
if (rc != 0)
GOTO(out, rc);
- GOTO(out, rc = 0);
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_stale)
+ obj->opo_stale = 0;
+ spin_unlock(&obj->opo_lock);
+
+ GOTO(out, rc);
out:
if (req != NULL)
INIT_LIST_HEAD(&our->our_req_list);
INIT_LIST_HEAD(&our->our_cb_items);
INIT_LIST_HEAD(&our->our_list);
+ spin_lock_init(&our->our_list_lock);
osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
return our;
RETURN(rc);
}
-static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
+/**
+ * Invalidate all objects in the osp thandle
+ *
+ * invalidate all of objects in the update request, which will be called
+ * when the transaction is aborted.
+ *
+ * \param[in] oth osp thandle.
+ */
+static void osp_thandle_invalidate_object(const struct lu_env *env,
+ struct osp_thandle *oth)
+{
+ struct osp_update_request *our = oth->ot_our;
+ struct osp_update_request_sub *ours;
+
+ if (our == NULL)
+ return;
+
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ struct object_update_request *our_req = ours->ours_req;
+ unsigned int i;
+ struct lu_object *obj;
+ struct osp_object *osp_obj;
+
+ for (i = 0; i < our_req->ourq_count; i++) {
+ struct object_update *update;
+
+ update = object_update_request_get(our_req, i, NULL);
+ if (update == NULL)
+ break;
+
+ if (update->ou_type != OUT_WRITE)
+ continue;
+
+ if (!fid_is_sane(&update->ou_fid))
+ continue;
+
+ obj = lu_object_find_slice(env,
+ &oth->ot_super.th_dev->dd_lu_dev,
+ &update->ou_fid, NULL);
+ if (IS_ERR(obj))
+ break;
+
+ osp_obj = lu2osp_obj(obj);
+ if (osp_obj->opo_ooa != NULL) {
+ spin_lock(&osp_obj->opo_lock);
+ osp_obj->opo_ooa->ooa_attr.la_valid = 0;
+ osp_obj->opo_stale = 1;
+ spin_unlock(&osp_obj->opo_lock);
+ }
+ lu_object_put(env, obj);
+ }
+ }
+}
+
+static void osp_trans_stop_cb(const struct lu_env *env,
+ struct osp_thandle *oth, int result)
{
struct dt_txn_commit_cb *dcb;
struct dt_txn_commit_cb *tmp;
list_del_init(&dcb->dcb_linkage);
dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
}
+
+ if (result < 0)
+ osp_thandle_invalidate_object(env, oth);
}
/**
reply = req_capsule_server_sized_get(&req->rq_pill,
&RMF_OUT_UPDATE_REPLY,
OUT_UPDATE_REPLY_SIZE);
- if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
- rc1 = -EPROTO;
- else
+ if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+ if (rc == 0)
+ rc = -EPROTO;
+ } else {
count = reply->ourp_count;
- } else {
- rc1 = rc;
+ }
}
list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
/* The peer may only have handled some requests (indicated
* by the 'count') in the packaged OUT RPC, we can only get
* results for the handled part. */
- if (index < count && reply->ourp_lens[index] > 0) {
+ if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) {
struct object_update_result *result;
result = object_update_result_get(reply, index, NULL);
if (result == NULL)
- rc1 = -EPROTO;
+ rc1 = rc = -EPROTO;
else
- rc1 = result->our_rc;
- } else {
- rc1 = rc;
- if (unlikely(rc1 == 0))
+ rc1 = rc = result->our_rc;
+ } else if (rc1 >= 0) {
+ /* The peer did not handle these request, let's return
+ * -EINVAL to update interpret for now */
+ if (rc >= 0)
rc1 = -EINVAL;
+ else
+ rc1 = rc;
}
if (ouc->ouc_interpreter != NULL)
if (oth != NULL) {
/* oth and osp_update_requests will be destoryed in
* osp_thandle_put */
- osp_trans_stop_cb(oth, rc);
+ osp_trans_stop_cb(env, oth, rc);
osp_thandle_put(oth);
} else {
osp_update_request_destroy(our);
}
- RETURN(0);
+ RETURN(rc);
}
/**
osp_update_callback_fini(env, ouc);
}
}
- osp_trans_stop_cb(oth, rc);
+ osp_trans_stop_cb(env, oth, rc);
osp_trans_commit_cb(oth, rc);
}
ENTRY;
LASSERT(oth != NULL);
- LASSERT(our->our_req_sent == 0);
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
our, &req);
if (rc != 0) {
/**
* Set version for the transaction
*
- * Set the version for the transaction, then the osp RPC will be
- * sent in the order of version, i.e. the transaction with lower
- * version will be sent first.
+ * Set the version for the transaction and add the request to
+ * the sending list, then after transaction stop, the request
+ * will be picked in the order of version, by sending thread.
*
* \param [in] oth osp thandle to be set version.
*
* \retval 0 if set version succeeds
* negative errno if set version fails.
*/
-int osp_check_and_set_rpc_version(struct osp_thandle *oth)
+int osp_check_and_set_rpc_version(struct osp_thandle *oth,
+ struct osp_object *obj)
{
struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
struct osp_updates *ou = osp->opd_update;
if (ou == NULL)
return -EIO;
- if (oth->ot_version != 0)
+ if (oth->ot_our->our_version != 0)
return 0;
spin_lock(&ou->ou_lock);
- oth->ot_version = ou->ou_version++;
+ spin_lock(&oth->ot_our->our_list_lock);
+ if (obj->opo_stale) {
+ spin_unlock(&oth->ot_our->our_list_lock);
+ spin_unlock(&ou->ou_lock);
+ return -ESTALE;
+ }
+
+ /* Assign the version and add it to the sending list */
+ osp_thandle_get(oth);
+ oth->ot_our->our_version = ou->ou_version++;
+ list_add_tail(&oth->ot_our->our_list,
+ &osp->opd_update->ou_list);
+ oth->ot_our->our_req_ready = 0;
+ spin_unlock(&oth->ot_our->our_list_lock);
spin_unlock(&ou->ou_lock);
+ LASSERT(oth->ot_super.th_wait_submit == 1);
CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
- osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version);
+ osp->opd_obd->obd_name, ou->ou_version, oth,
+ oth->ot_our->our_version);
return 0;
}
spin_lock(&ou->ou_lock);
list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
LASSERT(our->our_th != NULL);
- CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
- our, our->our_th->ot_version, ou->ou_rpc_version);
- if (our->our_th->ot_version == 0) {
- list_del_init(&our->our_list);
- *ourp = our;
- got_req = true;
- break;
- }
-
+ CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n",
+ ou, our->our_version, ou->ou_rpc_version);
+ spin_lock(&our->our_list_lock);
/* Find next osp_update_request in the list */
- if (our->our_th->ot_version == ou->ou_rpc_version) {
+ if (our->our_version == ou->ou_rpc_version &&
+ our->our_req_ready) {
list_del_init(&our->our_list);
+ spin_unlock(&our->our_list_lock);
*ourp = our;
got_req = true;
break;
}
+ spin_unlock(&our->our_list_lock);
}
spin_unlock(&ou->ou_lock);
return got_req;
}
-static void osp_update_rpc_version(struct osp_updates *ou,
- struct osp_thandle *oth)
+/**
+ * Invalidate update request
+ *
+ * Invalidate update request in the OSP sending list, so all of
+ * requests in the sending list will return error, which happens
+ * when it finds one update (with writing llog) requests fails or
+ * the OSP is evicted by remote target. see osp_send_update_thread().
+ *
+ * \param[in] osp OSP device whose update requests will be
+ * invalidated.
+ **/
+void osp_invalidate_request(struct osp_device *osp)
{
- if (oth->ot_version == 0)
+ struct lu_env env;
+ struct osp_updates *ou = osp->opd_update;
+ struct osp_update_request *our;
+ struct osp_update_request *tmp;
+ LIST_HEAD(list);
+ int rc;
+ ENTRY;
+
+ if (ou == NULL)
+ return;
+
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ if (rc < 0) {
+ CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+ rc);
return;
+ }
+
+ INIT_LIST_HEAD(&list);
- LASSERT(oth->ot_version == ou->ou_rpc_version);
spin_lock(&ou->ou_lock);
- ou->ou_rpc_version++;
+ /* invalidate all of request in the sending list */
+ list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
+ spin_lock(&our->our_list_lock);
+ if (our->our_req_ready)
+ list_move(&our->our_list, &list);
+ else
+ list_del_init(&our->our_list);
+
+ if (our->our_th->ot_super.th_result == 0)
+ our->our_th->ot_super.th_result = -EIO;
+
+ if (our->our_version >= ou->ou_rpc_version)
+ ou->ou_rpc_version = our->our_version + 1;
+ spin_unlock(&our->our_list_lock);
+
+ CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
+ our);
+ }
+
spin_unlock(&ou->ou_lock);
+
+ /* invalidate all of request in the sending list */
+ list_for_each_entry_safe(our, tmp, &list, our_list) {
+ spin_lock(&our->our_list_lock);
+ list_del_init(&our->our_list);
+ spin_unlock(&our->our_list_lock);
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ osp_thandle_put(our->our_th);
+ }
+ lu_env_fini(&env);
}
/**
our = NULL;
l_wait_event(ou->ou_waitq,
!osp_send_update_thread_running(osp) ||
- osp_get_next_request(ou, &our),
- &lwi);
+ osp_get_next_request(ou, &our), &lwi);
if (!osp_send_update_thread_running(osp)) {
- if (our != NULL && our->our_th != NULL) {
+ if (our != NULL) {
osp_trans_callback(&env, our->our_th, -EINTR);
osp_thandle_put(our->our_th);
}
break;
}
- if (our->our_req_sent == 0) {
- if (our->our_th != NULL &&
- our->our_th->ot_super.th_result != 0)
- osp_trans_callback(&env, our->our_th,
- our->our_th->ot_super.th_result);
- else
- rc = osp_send_update_req(&env, osp, our);
+ LASSERT(our->our_th != NULL);
+ if (our->our_th->ot_super.th_result != 0) {
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ rc = our->our_th->ot_super.th_result;
+ } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
+ rc = -EIO;
+ osp_trans_callback(&env, our->our_th, rc);
+ } else {
+ rc = osp_send_update_req(&env, osp, our);
}
- if (our->our_th != NULL) {
- /* Update the rpc version */
- osp_update_rpc_version(ou, our->our_th);
- /* Balanced for thandle_get in osp_trans_trigger() */
- osp_thandle_put(our->our_th);
- }
+ /* Update the rpc version */
+ spin_lock(&ou->ou_lock);
+ if (our->our_version == ou->ou_rpc_version)
+ ou->ou_rpc_version++;
+ spin_unlock(&ou->ou_lock);
+
+ /* If one update request fails, let's fail all of the requests
+ * in the sending list, because the request in the sending
+ * list are dependent on either other, continue sending these
+ * request might cause llog or filesystem corruption */
+ if (rc < 0)
+ osp_invalidate_request(osp);
+
+ /* Balanced for thandle_get in osp_check_and_set_rpc_version */
+ osp_thandle_put(our->our_th);
}
thread->t_flags = SVC_STOPPED;
}
/**
- * Trigger the request for remote updates.
- *
- * Add the request to the sending list, and wake up osp update
- * sending thread.
- *
- * \param[in] env pointer to the thread context
- * \param[in] osp pointer to the OSP device
- * \param[in] oth pointer to the transaction handler
- *
- */
-static void osp_trans_trigger(const struct lu_env *env,
- struct osp_device *osp,
- struct osp_thandle *oth)
-{
-
- CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
- osp->opd_obd->obd_name, oth, oth->ot_version);
-
- LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
- osp_thandle_get(oth);
- LASSERT(oth->ot_our != NULL);
- spin_lock(&osp->opd_update->ou_lock);
- list_add_tail(&oth->ot_our->our_list,
- &osp->opd_update->ou_list);
- spin_unlock(&osp->opd_update->ou_lock);
-
- wake_up(&osp->opd_update->ou_waitq);
-}
-
-/**
* The OSP layer dt_device_operations::dt_trans_start() interface
* to start the transaction.
*
* to stop the transaction.
*
* If the transaction is a remote transaction, related remote
- * updates will be triggered here via osp_trans_trigger().
+ * updates will be triggered at the end of this function.
*
* For synchronous mode update or any failed update, the request
* will be destroyed explicitly when the osp_trans_stop().
GOTO(out, rc = -EIO);
}
- if (th->th_sync) {
- /* if th_sync is set, then it needs to be sent
- * right away. Note: even thought the RPC has been
- * sent, it still needs to be added to the sending
- * list (see osp_trans_trigger()), so ou_rpc_version
- * can be updated correctly. */
+ CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n",
+ osp->opd_obd->obd_name, oth, our->our_version);
+
+ LASSERT(our->our_req_ready == 0);
+ spin_lock(&our->our_list_lock);
+ if (likely(!list_empty(&our->our_list))) {
+ /* notify sending thread */
+ our->our_req_ready = 1;
+ wake_up(&osp->opd_update->ou_waitq);
+ spin_unlock(&our->our_list_lock);
+ } else if (th->th_result == 0) {
+ /* if the request does not needs to be serialized,
+ * read-only request etc, let's send it right away */
+ spin_unlock(&our->our_list_lock);
rc = osp_send_update_req(env, osp, our);
- our->our_req_sent = 1;
+ } else {
+ spin_unlock(&our->our_list_lock);
+ osp_trans_callback(env, oth, th->th_result);
}
-
- osp_trans_trigger(env, osp, oth);
out:
osp_thandle_put(oth);
oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
if (oub == NULL)
- GOTO(out_free, rc = -EPROTO);
+ GOTO(out_free, rc = err_serious(-EPROTO));
desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
PTLRPC_BULK_OPS_COUNT,
tmp = oub;
for (i = 0; i < update_buf_count; i++, tmp++) {
if (tmp->oub_size >= OUT_MAXREQSIZE)
- GOTO(out_free, rc = -EPROTO);
+ GOTO(out_free, rc = err_serious(-EPROTO));
OBD_ALLOC(update_bufs[i], tmp->oub_size);
if (update_bufs[i] == NULL)
pill->rc_req->rq_bulk_write = 1;
rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
if (rc != 0)
- GOTO(out_free, rc);
+ GOTO(out_free, err_serious(rc));
rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
if (rc < 0)
- GOTO(out_free, rc);
+ GOTO(out_free, err_serious(rc));
}
/* validate the request and calculate the total update count and
* set it to reply */
list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
struct sub_thandle_cookie *stc;
- CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n",
+ CDEBUG(mask, "st %p obd %s committed %d stopped %d sub_th %p\n",
st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
- st->st_committed, st->st_sub_th);
+ st->st_committed, st->st_stopped, st->st_sub_th);
list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
CDEBUG(mask, " cookie "DOSTID": %u\n",
}
run_test 117 "DNE: cross MDT unlink, fail MDT1 and MDT2"
+test_118() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.64) ] &&
+ skip "Do not support large update log before 2.7.64" &&
+ return 0
+
+ mkdir -p $DIR/$tdir
+
+ $LFS setdirstripe -c2 $DIR/$tdir/striped_dir ||
+ error "setdirstripe fails"
+ $LFS setdirstripe -c2 $DIR/$tdir/striped_dir1 ||
+ error "setdirstripe fails 1"
+ rm -rf $DIR/$tdir/striped_dir* || error "rmdir fails"
+
+ # OBD_FAIL_INVALIDATE_UPDATE 0x1705
+ do_facet mds1 "lctl set_param fail_loc=0x1705"
+ $LFS setdirstripe -c2 $DIR/$tdir/striped_dir
+ $LFS setdirstripe -c2 $DIR/$tdir/striped_dir1
+ do_facet mds1 "lctl set_param fail_loc=0x0"
+
+ replay_barrier mds1
+ $LFS setdirstripe -c2 $DIR/$tdir/striped_dir
+ $LFS setdirstripe -c2 $DIR/$tdir/striped_dir1
+ fail mds1
+
+ true
+}
+run_test 118 "invalidate osp update will not cause update log corruption"
+
complete $SECONDS
check_and_cleanup_lustre
exit_status
error "create striped directory should fail"
[ -e $DIR/$tdir/bad_striped_dir ] && error "striped dir exists"
+
+ $LFS setdirstripe -c2 $DIR/$tdir/bad_striped_dir
true
}
run_test 300p "create striped directory without space"