}
/**
+ * Tell whether a fid sequence number is usable.
+ * \param seq the sequence number to examine.
+ * \return 1 when \a seq is non-zero (sane); 0 when it is zero.
+ */
+static inline int fid_seq_is_sane(__u64 seq)
+{
+        return seq ? 1 : 0;
+}
+
+/**
+ * Tell whether a fid is usable.
+ *
+ * A fid is sane when it is non-NULL and either has a sane sequence, a
+ * non-zero object id and a zero version, or is accepted by fid_is_igif().
+ *
+ * \param fid the fid to examine; may be NULL.
+ * \return 1 if the fid is sane; otherwise 0.
+ */
+static inline int fid_is_sane(const struct lu_fid *fid)
+{
+        if (fid == NULL)
+                return 0;
+
+        /* regular fid: sane sequence, non-zero oid, zero version */
+        if (fid_seq_is_sane(fid_seq(fid)) && fid_oid(fid) != 0 &&
+            fid_ver(fid) == 0)
+                return 1;
+
+        /* otherwise only fids accepted by fid_is_igif() qualify */
+        return fid_is_igif(fid) ? 1 : 0;
+}
+/**
* Check if a fid is zero.
* \param fid the fid to be tested.
* \return true if the fid is zero; otherwise false.
* Check if two fids are equal or not.
* \param f0 the first fid
* \param f1 the second fid
- * \return true if the two fids are equal; otherwise false.
+ * \return true if the two fids are equal; otherwise false.
*/
static inline int lu_fid_eq(const struct lu_fid *f0,
const struct lu_fid *f1)
#define KEY_FIEMAP "fiemap"
/* XXX unused */
#define KEY_ASYNC "async"
+#define KEY_CAPA_KEY "capa_key"
struct obd_ops {
struct module *o_owner;
ldlm_lock2handle(lock, &body->lock_handle[0]);
size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
+ buffers = 3;
if (lock->l_lvb_len != 0) {
- buffers = 3;
size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
- }
+ } else
+ size[DLM_REPLY_REC_OFF] = sizeof (struct ost_lvb);
ptlrpc_req_set_repsize(req, buffers, size);
LDLM_DEBUG(lock, "replaying lock:");
ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
struct ll_fid *fid);
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi,
+ struct ll_fid *fid);
#endif
#endif
inode->i_ino = ll_fid_build_ino(sbi, &body->fid1);
- if (body->valid & OBD_MD_FLGENER)
- inode->i_generation = body->generation;
+ inode->i_generation = ll_fid_build_gen(sbi, &body->fid1);
+ *ll_inode_lu_fid(inode) = *((struct lu_fid*)&md->body->fid1);
if (body->valid & OBD_MD_FLATIME) {
if (body->atime > LTIME_S(inode->i_atime))
}
+/**
+ * Build the inode generation for \a fid.
+ * \param sbi superblock info (not used by this function).
+ * \param fid fid to derive the generation from; it is reinterpreted as
+ *            a struct lu_fid.
+ * \return lu_igif_gen() of the fid when fid_is_igif() accepts it;
+ *         otherwise 0.
+ */
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi, struct ll_fid *fid)
+{
+        struct lu_fid *lfid = (struct lu_fid *)fid;
+        __u32 gen;
+        ENTRY;
+
+        gen = fid_is_igif(lfid) ? lu_igif_gen(lfid) : 0;
+        RETURN(gen);
+}
+
/* called from iget5_locked->find_inode() under inode_lock spinlock */
static int fid_test_inode(struct inode *inode, void *opaque)
{
struct lustre_md *md = opaque;
+ /* fid carried in the body of the lustre_md passed as opaque */
+ struct lu_fid *fid = (struct lu_fid*)&md->body->fid1;
if (unlikely(!(md->body->valid & OBD_MD_FLID))) {
CERROR("MDS body missing FID\n");
return 0;
}
- return lu_fid_eq(ll_inode_lu_fid(inode),
- (struct lu_fid*)&md->body->fid1);
+ /* match on sequence and object id only; no other fid field is compared */
+ return fid_seq(ll_inode_lu_fid(inode)) == fid_seq(fid) &&
+ fid_oid(ll_inode_lu_fid(inode)) == fid_oid(fid);
}
static int fid_set_inode(struct inode *inode, void *opaque)
int old_len, new_size, old_size;
struct lustre_msg *old_msg = req->rq_reqmsg;
struct lustre_msg *new_msg;
+ int offset;
- old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
+ if (mdc_req_is_2_0_server(req))
+ offset = 4;
+ else
+ offset = 2;
+
+ old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
old_size = lustre_packed_msg_size(old_msg);
- lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
+ lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
body->eadatasize);
new_size = lustre_packed_msg_size(old_msg);
OBD_FREE(old_msg, old_size);
} else {
- lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
+ lustre_msg_set_buflen(old_msg,
+ DLM_INTENT_REC_OFF + offset, old_len);
body->valid &= ~OBD_MD_FLEASIZE;
body->eadatasize = 0;
}
size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
bufcount = 8;
repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
- repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
+ repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
repbufcount = 7;
}
rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
* large enough request buffer above we need to
* reallocate it here to hold the actual LOV EA. */
if (it->it_op & IT_OPEN) {
- int offset = DLM_INTENT_REC_OFF + 2;
+ int offset = DLM_INTENT_REC_OFF;
void *lmm;
+ if (mdc_req_is_2_0_server(req))
+ offset += 4;
+ else
+ offset += 2;
+
if (lustre_msg_buflen(req->rq_reqmsg, offset) <
body->eadatasize)
mdc_realloc_openmsg(req, body);
input, input_size, output_size, 0, request);
}
-/* This should be called with both the request and the reply still packed. */
-void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
- int repoff)
+/* For the fid-less server */
+static void mdc_store_inode_generation_18(struct ptlrpc_request *req,
+ int reqoff, int repoff)
{
struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff,
sizeof(*rec));
rec->cr_replayfid.generation, rec->cr_replayfid.id);
}
+/**
+ * For the fid-based (2.0) server: copy the fid, I/O epoch and handle
+ * cookie from the reply body into the create record of the request.
+ *
+ * Asserts that both buffers exist and LBUGs if the fid in the reply is
+ * not sane.
+ *
+ * \param req    request whose request and reply messages are accessed
+ * \param reqoff buffer offset of the mdt_rec_create in the request message
+ * \param repoff buffer offset of the mdt_body in the reply message
+ */
+static void mdc_store_inode_generation_20(struct ptlrpc_request *req,
+                                          int reqoff, int repoff)
+{
+        struct mdt_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff,
+                                                    sizeof(*rec));
+        struct mdt_body *body = lustre_msg_buf(req->rq_repmsg, repoff,
+                                               sizeof(*body));
+
+        LASSERT(rec != NULL);
+        LASSERT(body != NULL);
+
+        rec->cr_fid2 = body->fid1;
+        rec->cr_ioepoch = body->ioepoch;
+        rec->cr_old_handle.cookie = body->handle.cookie;
+
+        if (!fid_is_sane(&body->fid1)) {
+                /* adjacent literals are concatenated, so the first one
+                 * needs its trailing space ("with insane fid") */
+                DEBUG_REQ(D_ERROR, req, "saving replay request with "
+                          "insane fid");
+                LBUG();
+        }
+
+        /* NOTE(review): this prints the oid of cr_fid1 next to the seq of
+         * cr_fid2; confirm whether cr_fid2.f_oid was intended */
+        DEBUG_REQ(D_INODE, req, "storing generation %u for ino "LPU64,
+                  rec->cr_fid1.f_oid, rec->cr_fid2.f_seq);
+}
+
+/*
+ * Dispatch to the variant matching the server the request is for, as
+ * reported by mdc_req_is_2_0_server().
+ * This should be called with both the request and the reply still packed.
+ */
+void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
+                                int repoff)
+{
+        if (!mdc_req_is_2_0_server(req)) {
+                mdc_store_inode_generation_18(req, reqoff, repoff);
+                return;
+        }
+
+        mdc_store_inode_generation_20(req, reqoff, repoff);
+}
+
#ifdef CONFIG_FS_POSIX_ACL
static
int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req,
if (mod->mod_och != NULL)
mod->mod_och->och_mod = NULL;
- OBD_FREE(mod, sizeof(*mod));
+ OBD_FREE_PTR(mod);
req->rq_cb_data = NULL;
}
file_fh = &och->och_fh;
CDEBUG(D_RPCTRACE, "updating handle from "LPX64" to "LPX64"\n",
file_fh->cookie, body->handle.cookie);
- memcpy(&old, file_fh, sizeof(old));
- memcpy(file_fh, &body->handle, sizeof(*file_fh));
+ old = *file_fh;
+ *file_fh = body->handle;
}
close_req = mod->mod_close_req;
+
if (close_req != NULL) {
- struct mds_body *close_body;
LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
- close_body = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
- sizeof(*close_body));
- if (och != NULL)
- LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
- DEBUG_REQ(D_RPCTRACE, close_req, "updating close with new fh");
- memcpy(&close_body->handle, &body->handle,
- sizeof(close_body->handle));
+ if (mdc_req_is_2_0_server(close_req)) {
+ struct mdt_epoch *epoch = NULL;
+
+ epoch = lustre_msg_buf(close_req->rq_reqmsg,
+ REQ_REC_OFF, sizeof(*epoch));
+ LASSERT(epoch);
+ if (och != NULL)
+ LASSERT(!memcmp(&old, &epoch->handle,
+ sizeof(old)));
+ DEBUG_REQ(D_RPCTRACE, close_req,
+ "updating close with new fh");
+ epoch->handle = body->handle;
+ } else {
+ struct mds_body *close_body = NULL;
+
+ close_body = lustre_msg_buf(close_req->rq_reqmsg,
+ REQ_REC_OFF,
+ sizeof(*close_body));
+ if (och != NULL)
+ LASSERT(!memcmp(&old, &close_body->handle,
+ sizeof(old)));
+ DEBUG_REQ(D_RPCTRACE, close_req,
+ "updating close with new fh");
+ close_body->handle = body->handle;
+ }
}
EXIT;
}
-void mdc_set_open_replay_data(struct obd_client_handle *och,
- struct ptlrpc_request *open_req)
+/**
+ * Set up open-replay data for a request sent to a 2.0 server.
+ *
+ * Does nothing unless open_req->rq_replay is set. When \a och is given and
+ * the import is replayable, allocates a mdc_open_data and links it with
+ * \a och and \a open_req under rq_lock, installing mdc_commit_open as the
+ * commit callback; if that allocation fails the function returns without
+ * touching the request record. Otherwise it copies fid, ioepoch and handle
+ * cookie from the reply body into the request's create record and installs
+ * mdc_replay_open as the replay callback.
+ *
+ * \param och      client open handle; may be NULL
+ * \param open_req the open request, with its reply available
+ */
+static void mdc_set_open_replay_data_20(struct obd_client_handle *och,
+ struct ptlrpc_request *open_req)
+{
+ struct mdc_open_data *mod;
+ struct obd_import *imp = open_req->rq_import;
+ struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
+ DLM_INTENT_REC_OFF,
+ sizeof(*rec));
+ struct mdt_body *body = lustre_msg_buf(open_req->rq_repmsg,
+ DLM_REPLY_REC_OFF,
+ sizeof(*body));
+
+ /* If request is not eligible for replay, just bail out */
+ if (!open_req->rq_replay)
+ return;
+
+ /* rec is in the request (outgoing) message: always in my byte order */
+ LASSERT(rec != NULL);
+ LASSERT(lustre_rep_swabbed(open_req, DLM_REPLY_REC_OFF));
+ /* body is in the reply (incoming) message, asserted swabbed above */
+ LASSERT(body != NULL);
+
+ /* Only if the import is replayable, we set replay_open data */
+ if (och && imp->imp_replayable) {
+ OBD_ALLOC_PTR(mod);
+ if (mod == NULL) {
+ DEBUG_REQ(D_ERROR, open_req,
+ "can't allocate mdc_open_data");
+ return;
+ }
+
+ spin_lock(&open_req->rq_lock);
+ och->och_mod = mod;
+ mod->mod_och = och;
+ mod->mod_open_req = open_req;
+ open_req->rq_cb_data = mod;
+ open_req->rq_commit_cb = mdc_commit_open;
+ spin_unlock(&open_req->rq_lock);
+ }
+
+ /* copy the server-returned values into the saved request record */
+ rec->cr_fid2 = body->fid1;
+ rec->cr_ioepoch = body->ioepoch;
+ rec->cr_old_handle.cookie = body->handle.cookie;
+ open_req->rq_replay_cb = mdc_replay_open;
+ /* the fid is checked only after it has been stored; an insane fid is fatal */
+ if (!fid_is_sane(&body->fid1)) {
+ DEBUG_REQ(D_ERROR, open_req, "saving replay request with "
+ "insane fid");
+ LBUG();
+ }
+
+ DEBUG_REQ(D_RPCTRACE, open_req, "set up replay data");
+}
+
+static void mdc_set_open_replay_data_18(struct obd_client_handle *och,
+ struct ptlrpc_request *open_req)
{
struct mdc_open_data *mod;
struct mds_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
DEBUG_REQ(D_RPCTRACE, open_req, "set up replay data");
}
+/*
+ * Set up open-replay data, dispatching to the variant that matches the
+ * server the open request is for, as reported by mdc_req_is_2_0_server().
+ */
+void mdc_set_open_replay_data(struct obd_client_handle *och,
+                              struct ptlrpc_request *open_req)
+{
+        if (!mdc_req_is_2_0_server(open_req)) {
+                mdc_set_open_replay_data_18(och, open_req);
+                return;
+        }
+
+        mdc_set_open_replay_data_20(och, open_req);
+}
+
void mdc_clear_open_replay_data(struct obd_client_handle *och)
{
struct mdc_open_data *mod = och->och_mod;
RETURN(-EINVAL);
}
+ if (KEY_IS(KEY_CAPA_KEY)) {
+ RETURN(0);
+ }
+
if (!KEY_IS(KEY_MDS_CONN))
RETURN(-EINVAL);
RETURN(0);
}
+/**
+ * Fetch the transno of the first request on the import's replay list.
+ * Must be called under imp lock.
+ *
+ * \param imp     import whose imp_replay_list is examined
+ * \param transno set to the first request's rq_transno when the list is
+ *                not empty; left untouched otherwise
+ * \return 0 if the replay list is empty, 1 if \a transno was filled in.
+ *         A zero transno on the list is treated as fatal (LBUG).
+ */
+static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
+{
+        struct ptlrpc_request *first;
+
+        if (list_empty(&imp->imp_replay_list))
+                return 0;
+
+        first = list_entry(imp->imp_replay_list.next, struct ptlrpc_request,
+                           rq_replay_list);
+        *transno = first->rq_transno;
+        if (first->rq_transno == 0) {
+                DEBUG_REQ(D_ERROR, first, "zero transno in replay");
+                LBUG();
+        }
+
+        return 1;
+}
+
int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
{
struct obd_device *obd = imp->imp_obd;
+ int set_transno = 0;
int initial_connect = 0;
int rc;
__u64 committed_before_reconnect = 0;
else
committed_before_reconnect = imp->imp_peer_committed_transno;
+ set_transno = ptlrpc_first_transno(imp,
+ &imp->imp_connect_data.ocd_transno);
+
spin_unlock(&imp->imp_lock);
if (new_uuid) {
MSG_CONNECT_INITIAL);
}
+ if (set_transno)
+ lustre_msg_add_op_flags(request->rq_reqmsg,
+ MSG_CONNECT_TRANSNO);
+
DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
aa->pcaa_initial_connect ? "initial " : "re",
imp->imp_conn_cnt);
ptlrpc_req_set_repsize(req, 1, NULL);
req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
- lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
+ lustre_msg_add_flags(req->rq_reqmsg,
+ MSG_LOCK_REPLAY_DONE |
+ MSG_REQ_REPLAY_DONE |
+ MSG_LAST_REPLAY);
+
if (imp->imp_delayed_recovery)
lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
req->rq_timeout *= 3;
df -P $DIR || df -P $DIR || true # reconnect
wait_mds_recovery_done || error "MDS recovery not done"
+ # For interop with 2.0 only:
+ # FIXME just because recovery is done doesn't mean we've finished
+ # orphan cleanup. Fake it with a sleep for now...
+ sleep 10
+
AFTERUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
log "before $BEFOREUSED, after $AFTERUSED"
[ $AFTERUSED -gt $((BEFOREUSED + 20)) ] && \
wait_mds_recovery_done () {
local timeout=`do_facet mds lctl get_param -n timeout`
+ local mdtdevice=$(get_mds_mdt_device_proc_path)
#define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2)
# as we are in process of changing obd_timeout in different ways
# let's set MAX longer than that
local MAX=$(( timeout * 4 ))
local WAIT=0
while [ $WAIT -lt $MAX ]; do
- STATUS=`do_facet mds "lctl get_param -n mds.*-MDT*.recovery_status | grep status"`
+ STATUS=`do_facet mds "lctl get_param -n ${mdtdevice}.*-MDT*.recovery_status | grep status"`
echo $STATUS | grep COMPLETE && return 0
sleep 5
WAIT=$((WAIT + 5))
mds_evict_client() {
UUID=`lctl get_param -n mdc.${mds_svc}-mdc-*.uuid`
- do_facet mds "lctl set_param -n mds.${mds_svc}.evict_client $UUID"
+ # parameter prefix ("mds" or "mdt") is chosen by get_mds_mdt_device_proc_path
+ local mdtdevice=$(get_mds_mdt_device_proc_path)
+ do_facet mds "lctl set_param -n ${mdtdevice}.${mds_svc}.evict_client $UUID"
}
ost_evict_client() {
echo "${ost}-osc-MDT0000"
fi
}
+
+# Print the lctl parameter prefix for the MDS device: "mds" when the MDS
+# version has major <= 1 and minor <= 8, otherwise "mdt".
+get_mds_mdt_device_proc_path() {
+    local major=$(get_mds_version_major)
+    local minor=$(get_mds_version_minor)
+    local prefix="mdt"
+
+    if [ $major -le 1 -a $minor -le 8 ] ; then
+        prefix="mds"
+    fi
+    echo "$prefix"
+}
+
+