#define FSFILT_OP_SETATTR 8
#define FSFILT_OP_LINK 9
#define FSFILT_OP_CANCEL_UNLINK 10
+#define FSFILT_OP_NOOP 15
#define fsfilt_check_slow(start, timeout, msg) \
do { \
* MDS REQ RECORDS
*/
+/* FIXME: this is different from HEAD, adjust it
+ * while merge GSS */
+#define MDS_REQ_REC_OFF 0
+
+#define MDS_REQ_INTENT_LOCKREQ_OFF 0
+#define MDS_REQ_INTENT_IT_OFF 1
+#define MDS_REQ_INTENT_REC_OFF 2
+
/* opcodes */
typedef enum {
MDS_GETATTR = 33,
LLOG_ORIGIN_HANDLE_CLOSE = 505,
LLOG_ORIGIN_CONNECT = 506,
LLOG_CATINFO = 507, /* for lfs catinfo */
+ LLOG_ORIGIN_HANDLE_PREV_BLOCK = 508,
};
struct llogd_body {
extern void llog_free_handle(struct llog_handle *handle);
int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
void *data, void *catdata);
+int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata);
extern int llog_cancel_rec(struct llog_handle *loghandle, int index);
extern int llog_close(struct llog_handle *cathandle);
int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
struct llog_cookie *cookies);
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
/* llog_obd.c */
int (*lop_destroy)(struct llog_handle *handle);
int (*lop_next_block)(struct llog_handle *h, int *curr_idx,
int next_idx, __u64 *offset, void *buf, int len);
+ int (*lop_prev_block)(struct llog_handle *h,
+ int prev_idx, void *buf, int len);
int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
struct llog_logid *logid, char *name);
int (*lop_close)(struct llog_handle *handle);
RETURN(rc);
}
+static inline int llog_prev_block(struct llog_handle *loghandle,
+ int prev_idx, void *buf, int len)
+{
+ struct llog_operations *lop;
+ int rc;
+ ENTRY;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (rc)
+ RETURN(rc);
+ if (lop->lop_prev_block == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ rc = lop->lop_prev_block(loghandle, prev_idx, buf, len);
+ RETURN(rc);
+}
+
static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
struct llog_logid *logid, char *name)
{
obd_valid valid, unsigned int ea_size,
struct ptlrpc_request **request);
int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
- char *filename, int namelen, unsigned long valid,
+ const char *filename, int namelen, unsigned long valid,
unsigned int ea_size, struct ptlrpc_request **request);
int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data,
struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len,
void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
void (*populate_pool)(struct ptlrpc_request_pool *, int));
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode,
int count, int *lengths, char **bufs);
-struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
- int count, int *lengths,
- char **bufs,
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
+ int count, int *lengths, char **bufs,
struct ptlrpc_request_pool *pool);
void ptlrpc_free_req(struct ptlrpc_request *request);
void ptlrpc_req_finished(struct ptlrpc_request *request);
/* ptlrpc/pack_generic.c */
int lustre_msg_swabbed(struct lustre_msg *msg);
+int lustre_msg_check_version(struct lustre_msg *msg, __u32 version);
int lustre_pack_request(struct ptlrpc_request *, int count, int *lens,
char **bufs);
int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
/* ptlrpc/llog_server.c */
int llog_origin_handle_create(struct ptlrpc_request *req);
+int llog_origin_handle_prev_block(struct ptlrpc_request *req);
int llog_origin_handle_next_block(struct ptlrpc_request *req);
int llog_origin_handle_read_header(struct ptlrpc_request *req);
int llog_origin_handle_close(struct ptlrpc_request *req);
instant_cancel = 1;
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
- LDLM_BL_CALLBACK, 1, &size, NULL);
+ LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
+ 1, &size, NULL);
if (req == NULL) {
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
RETURN(-ENOMEM);
up(&lock->l_resource->lr_lvb_sem);
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
- LDLM_CP_CALLBACK, buffers, size, NULL);
+ LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
+ buffers, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
LASSERT(lock != NULL);
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
- LDLM_GL_CALLBACK, 1, &size, NULL);
+ LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK,
+ 1, &size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
- dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+ dlm_req = lustre_swab_reqbuf (req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ sizeof (*dlm_req),
lustre_swab_ldlm_request);
if (dlm_req == NULL) {
CERROR ("Can't unpack dlm_req\n");
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
+ int rc, size[] = {sizeof(*body), lvb_len}, req_passed_in = 1;
int is_replay = *flags & LDLM_FL_REPLAY;
int cleanup_phase = 0;
ENTRY;
cleanup_phase = 2;
if (req == NULL) {
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, 1, size, NULL);
if (req == NULL)
GOTO(cleanup, rc = -ENOMEM);
req_passed_in = 0;
- } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
- LBUG();
+ } else {
+ LASSERTF(req->rq_reqmsg->buflens[MDS_REQ_INTENT_LOCKREQ_OFF] ==
+ sizeof(*body), "buflen[%d] = %d, not %d\n",
+ MDS_REQ_INTENT_LOCKREQ_OFF,
+ req->rq_reqmsg->buflens[MDS_REQ_INTENT_LOCKREQ_OFF],
+ sizeof(*body));
+ }
/* Dump lock data into the request buffer */
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+ body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_LOCKREQ_OFF,
+ sizeof(*body));
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = *flags;
/* Continue as normal. */
if (!req_passed_in) {
- int buffers = 1;
- if (lvb_len > 0)
- buffers = 2;
size[0] = sizeof(*reply);
- req->rq_replen = lustre_msg_size(buffers, size);
+ req->rq_replen = lustre_msg_size(1 + (lvb_len > 0), size);
}
lock->l_conn_export = exp;
lock->l_export = NULL;
LDLM_DEBUG(lock, "client-side convert");
req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
- LDLM_CONVERT, 1, &size, NULL);
+ LUSTRE_DLM_VERSION, LDLM_CONVERT, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
goto local_cancel;
}
- req = ptlrpc_prep_req(imp, LDLM_CANCEL, 1, &size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL,
+ 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_no_resend = 1;
flags = LDLM_FL_REPLAY;
size[0] = sizeof(*body);
- req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+ 1, size, NULL);
if (!req)
RETURN(-ENOMEM);
bufs[1] = NULL;
}
size = data->ioc_plen1;
- req = ptlrpc_prep_req(sbi2mdc(sbi)->cl_import, LLOG_CATINFO,
+ req = ptlrpc_prep_req(sbi2mdc(sbi)->cl_import,
+ LUSTRE_LOG_VERSION, LLOG_CATINFO,
2, lens, bufs);
if (!req)
GOTO(out_catinfo, rc = -ENOMEM);
if (rc)
RETURN(rc);
- mdc_store_inode_generation(request, 2, 1);
+ mdc_store_inode_generation(request, MDS_REQ_INTENT_REC_OFF, 1);
inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
NULL, 0, mode, 0, it);
if (IS_ERR(inode)) {
#include <linux/lustre_mds.h>
-void mdc_pack_req_body(struct ptlrpc_request *);
+void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
+ __u64 valid, struct ll_fid *fid, int ea_size);
void mdc_pack_rep_body(struct ptlrpc_request *);
-void mdc_readdir_pack(struct ptlrpc_request *req, __u64 offset, __u32 size,
- struct ll_fid *mdc_fid);
+void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
+ __u32 size, struct ll_fid *mdc_fid);
void mdc_getattr_pack(struct ptlrpc_request *req, int valid, int offset,
int flags, struct mdc_op_data *data);
-void mdc_setattr_pack(struct ptlrpc_request *req,
+void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
struct mdc_op_data *data,
struct iattr *iattr, void *ea, int ealen,
void *ea2, int ea2len);
#endif
#endif
-void mdc_readdir_pack(struct ptlrpc_request *req, __u64 offset, __u32 size,
- struct ll_fid *mdc_fid)
+void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
+ __u32 size, struct ll_fid *mdc_fid)
{
struct mds_body *b;
- b = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*b));
+ b = lustre_msg_buf(req->rq_reqmsg, pos, sizeof (*b));
b->fsuid = current->fsuid;
b->fsgid = current->fsgid;
b->capability = current->cap_effective;
b->capability = current->cap_effective;
}
-void mdc_pack_req_body(struct ptlrpc_request *req)
+void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
+ __u64 valid, struct ll_fid *fid, int ea_size)
{
- struct mds_body *b = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*b));
+ struct mds_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
+
+ if (fid)
+ b->fid1 = *fid;
+ b->valid = valid;
+ b->eadatasize = ea_size;
mdc_pack_body(b);
}
}
}
-void mdc_setattr_pack(struct ptlrpc_request *req, struct mdc_op_data *data,
- struct iattr *iattr, void *ea, int ealen,
- void *ea2, int ea2len)
+void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
+ struct mdc_op_data *data, struct iattr *iattr,
+ void *ea, int ealen, void *ea2, int ea2len)
{
- struct mds_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, 0,
+ struct mds_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset,
sizeof (*rec));
rec->sa_opcode = REINT_SETATTR;
rec->sa_fsuid = current->fsuid;
if (ealen == 0)
return;
- memcpy(lustre_msg_buf(req->rq_reqmsg, 1, ealen), ea, ealen);
+ memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 1, ealen), ea, ealen);
if (ea2len == 0)
return;
- memcpy(lustre_msg_buf(req->rq_reqmsg, 2, ea2len), ea2, ea2len);
+ memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 2, ea2len), ea2, ea2len);
}
void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
}
}
-void mdc_getattr_pack(struct ptlrpc_request *req, int valid, int offset,
+void mdc_getattr_pack(struct ptlrpc_request *req, int offset, int valid,
int flags, struct mdc_op_data *data)
{
struct mds_body *b;
struct ldlm_res_id res_id =
{ .name = {data->fid1.id, data->fid1.generation} };
ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
- int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
- int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
- int repbufcnt = 3, repsize[4] = {sizeof(struct ldlm_reply),
- sizeof(struct mds_body),
- obddev->u.cli.cl_max_mds_easize};
- struct ldlm_reply *dlm_rep;
- struct ldlm_intent *lit;
struct ldlm_request *lockreq;
+ struct ldlm_intent *lit;
+ int size[5] = {[MDS_REQ_INTENT_LOCKREQ_OFF] = sizeof(*lockreq),
+ [MDS_REQ_INTENT_IT_OFF] = sizeof(*lit) };
+ struct ldlm_reply *dlm_rep;
+ int repsize[4] = {sizeof(*dlm_rep),
+ sizeof(struct mds_body),
+ obddev->u.cli.cl_max_mds_easize};
void *eadata;
unsigned long irqflags;
+ int repbufcnt = 3, req_buffers = 2;
+ int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
ENTRY;
// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
if (it->it_op & IT_OPEN) {
it->it_create_mode |= S_IFREG;
- size[2] = sizeof(struct mds_rec_create);
- size[3] = data->namelen + 1;
+ size[req_buffers++] = sizeof(struct mds_rec_create);
+ size[req_buffers++] = data->namelen + 1;
/* As an optimization, we allocate an RPC request buffer for
* at least a default-sized LOV EA even if we aren't sending
* one. We grow the whole request to the next power-of-two
* size since we get that much from a slab allocation anyways.
* This avoids an allocation below in the common case where
* we need to save a default-sized LOV EA for open replay. */
- size[4] = max(lmmsize, obddev->u.cli.cl_default_mds_easize);
- rc = lustre_msg_size(5, size);
+ size[req_buffers++] = max(lmmsize,
+ obddev->u.cli.cl_default_mds_easize);
+ rc = lustre_msg_size(req_buffers, size);
if (rc & (rc - 1))
- size[4] = min(size[4] + round_up(rc) - rc,
- obddev->u.cli.cl_max_mds_easize);
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE,
- 5, size, NULL);
+ size[req_buffers - 1] = min(size[req_buffers - 1] +
+ round_up(rc) - rc,
+ obddev->u.cli.cl_max_mds_easize);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, req_buffers, size, NULL);
if (!req)
RETURN(-ENOMEM);
spin_unlock_irqrestore (&req->rq_lock, irqflags);
/* pack the intent */
- lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+ lit = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_IT_OFF,
+ sizeof (*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_open_pack(req, 2, data, it->it_create_mode, 0,
+ mdc_open_pack(req, MDS_REQ_INTENT_REC_OFF, data,
+ it->it_create_mode, 0,
it->it_flags, lmm, lmmsize);
repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
} else if (it->it_op & IT_UNLINK) {
- size[2] = sizeof(struct mds_rec_unlink);
- size[3] = data->namelen + 1;
+ size[req_buffers++] = sizeof(struct mds_rec_unlink);
+ size[req_buffers++] = data->namelen + 1;
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, req_buffers, size, NULL);
if (!req)
RETURN(-ENOMEM);
/* pack the intent */
- lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+ lit = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_IT_OFF,
+ sizeof (*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_unlink_pack(req, 2, data);
-
+ mdc_unlink_pack(req, MDS_REQ_INTENT_REC_OFF, data);
+ /* get ready for the reply */
repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
OBD_MD_FLACL;
- size[2] = sizeof(struct mds_body);
- size[3] = data->namelen + 1;
+ size[req_buffers++] = sizeof(struct mds_body);
+ size[req_buffers++] = data->namelen + 1;
if (it->it_op & IT_GETATTR)
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, req_buffers, size, NULL);
if (!req)
RETURN(-ENOMEM);
/* pack the intent */
- lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+ lit = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_IT_OFF,
+ sizeof (*lit));
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_getattr_pack(req, valid, 2, it->it_flags, data);
-
+ mdc_getattr_pack(req, MDS_REQ_INTENT_REC_OFF, valid,
+ it->it_flags, data);
+ /* get ready for the reply */
repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
} else if (it->it_op == IT_READDIR) {
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, 1, size, NULL);
if (!req)
RETURN(-ENOMEM);
+ /* get ready for the reply */
repbufcnt = 1;
} else {
LBUG();
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id,
- lock_type,&policy,lock_mode, &flags,cb_blocking,
+ lock_type, &policy,lock_mode, &flags, cb_blocking,
cb_completion, NULL, cb_data, NULL, 0, NULL,
lockh);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Similarly, if we're going to replay this request, we don't want to
* actually get a lock, just perform the intent. */
if (req->rq_transno || req->rq_replay) {
- lockreq = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*lockreq));
+ lockreq = lustre_msg_buf(req->rq_reqmsg,
+ MDS_REQ_INTENT_LOCKREQ_OFF,
+ sizeof(*lockreq));
lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
}
* large enough request buffer above we need to
* reallocate it here to hold the actual LOV EA. */
if (it->it_op & IT_OPEN) {
- if (req->rq_reqmsg->buflens[4] <
+ int pos = MDS_REQ_INTENT_REC_OFF + 2;
+
+ if (req->rq_reqmsg->buflens[pos] <
body->eadatasize)
mdc_realloc_openmsg(req, body, size);
- lmm = lustre_msg_buf(req->rq_reqmsg, 4,
+ lmm = lustre_msg_buf(req->rq_reqmsg, pos,
body->eadatasize);
if (lmm)
memcpy(lmm, eadata, body->eadatasize);
struct mds_rec_setattr *rec;
struct mdc_rpc_lock *rpc_lock;
struct obd_device *obd = exp->exp_obd;
- int rc, bufcount = 1, size[3] = {sizeof(*rec), ealen, ea2len};
+ int size[] = { sizeof(*rec), ealen, ea2len};
+ int rc, bufcount = 1;
ENTRY;
LASSERT(iattr != NULL);
if (ealen > 0) {
- bufcount = 2;
+ bufcount++;
if (ea2len > 0)
- bufcount = 3;
+ bufcount++;
}
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, bufcount,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_REINT, bufcount, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
if (iattr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
LTIME_S(iattr->ia_mtime), LTIME_S(iattr->ia_ctime));
- mdc_setattr_pack(req, data, iattr, ea, ealen, ea2, ea2len);
+ mdc_setattr_pack(req, MDS_REQ_REC_OFF, data, iattr, ea, ealen, ea2, ea2len);
size[0] = sizeof(struct mds_body);
req->rq_replen = lustre_msg_size(1, size);
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int rc, size[3] = {sizeof(struct mds_rec_create), op_data->namelen + 1};
- int level, bufcount = 2;
+ int size[] = { sizeof(struct mds_rec_create), op_data->namelen + 1, 0};
+ int rc, level, bufcount = 2;
ENTRY;
if (data && datalen) {
bufcount++;
}
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, bufcount,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_REINT, bufcount, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
/* mdc_create_pack fills msg->bufs[1] with name
* and msg->bufs[2] with tgt, for symlinks or lov MD data */
- mdc_create_pack(req, 0, op_data, data, datalen, mode,
+ mdc_create_pack(req, MDS_REQ_REC_OFF, op_data, data, datalen, mode,
uid, gid, cap_effective, rdev);
size[0] = sizeof(struct mds_body);
{
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req = *request;
- int rc, size[2] = {sizeof(struct mds_rec_unlink), data->namelen + 1};
+ int rc, size[] = { sizeof(struct mds_rec_unlink), data->namelen + 1};
ENTRY;
LASSERT(req == NULL);
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 2, size,
- NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_REINT, 2, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
*request = req;
size[2] = obd->u.cli.cl_max_mds_cookiesize;
req->rq_replen = lustre_msg_size(3, size);
- mdc_unlink_pack(req, 0, data);
+ mdc_unlink_pack(req, MDS_REQ_REC_OFF, data);
rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
if (rc == -ERESTARTSYS)
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int rc, size[2] = {sizeof(struct mds_rec_link), data->namelen + 1};
+ int rc, size[] = { sizeof(struct mds_rec_link), data->namelen + 1};
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 2, size,
- NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_REINT, 2, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
- mdc_link_pack(req, 0, data);
+ mdc_link_pack(req, MDS_REQ_REC_OFF, data);
size[0] = sizeof(struct mds_body);
req->rq_replen = lustre_msg_size(1, size);
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1,
- newlen + 1};
+ int rc, size[] = { sizeof(struct mds_rec_rename), oldlen +1, newlen +1};
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 3, size,
- NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_REINT, 3, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
- mdc_rename_pack(req, 0, data, old, oldlen, new, newlen);
+ mdc_rename_pack(req, MDS_REQ_REC_OFF, data, old, oldlen, new, newlen);
size[0] = sizeof(struct mds_body);
size[1] = obd->u.cli.cl_max_mds_easize;
int level, int msg_flags)
{
struct ptlrpc_request *req;
- struct mds_body *body;
- int rc, size = sizeof(*body);
+ int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
ENTRY;
- req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS,
+ 1, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
req->rq_send_state = level;
- req->rq_replen = lustre_msg_size(1, &size);
+ req->rq_replen = lustre_msg_size(1, size);
- mdc_pack_req_body(req);
+ mdc_pack_req_body(req, MDS_REQ_REC_OFF, 0, NULL, 0);
req->rq_reqmsg->flags |= msg_flags;
rc = ptlrpc_queue_wait(req);
if (!rc) {
- body = lustre_swab_repbuf (req, 0, sizeof (*body),
- lustre_swab_mds_body);
+ struct mds_body *body;
+
+ body = lustre_swab_repbuf(req, 0, sizeof(*body),
+ lustre_swab_mds_body);
if (body == NULL) {
CERROR ("Can't extract mds_body\n");
GOTO (out, rc = -EPROTO);
/* XXX do we need to make another request here? We just did a getattr
* to do the lookup in the first place.
*/
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR, 1, &size,
- NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_GETATTR, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
- memcpy(&body->fid1, fid, sizeof(*fid));
- body->valid = valid;
- body->eadatasize = ea_size;
- mdc_pack_req_body(req);
+ mdc_pack_req_body(req, MDS_REQ_REC_OFF, valid, fid, ea_size);
/* currently only root inode will call us with FLACL */
if (valid & OBD_MD_FLACL)
}
int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
- char *filename, int namelen, unsigned long valid,
- unsigned int ea_size, struct ptlrpc_request **request)
+ const char *filename, int namelen, unsigned long valid,
+ unsigned int ea_len, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
- struct mds_body *body;
- int rc, size[2] = {sizeof(*body), namelen};
+ int rc, size[] = { sizeof(struct mds_body), namelen };
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR_NAME, 2,
- size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_GETATTR_NAME, 2, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
- memcpy(&body->fid1, fid, sizeof(*fid));
- body->valid = valid;
- body->eadatasize = ea_size;
- mdc_pack_req_body(req);
-
+ mdc_pack_req_body(req, MDS_REQ_REC_OFF, valid, fid, ea_len);
+
LASSERT (strnlen (filename, namelen) == namelen - 1);
memcpy(lustre_msg_buf(req->rq_reqmsg, 1, namelen), filename, namelen);
- rc = mdc_getattr_common(exp, ea_size, 0, req);
+ rc = mdc_getattr_common(exp, ea_len, 0, req);
if (rc != 0) {
ptlrpc_req_finished (req);
req = NULL;
size[bufcnt++] = input_size;
}
- req = ptlrpc_prep_req(class_exp2cliimp(exp), opcode,
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, opcode,
bufcnt, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
/* request data */
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
- memcpy(&body->fid1, fid, sizeof(*fid));
- body->valid = valid;
- body->eadatasize = output_size;
+ body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
+ mdc_pack_req_body(req, MDS_REQ_REC_OFF, valid, fid, output_size);
body->flags = flags;
- mdc_pack_req_body(req);
if (xattr_name) {
tmp = lustre_msg_buf(req->rq_reqmsg, 1, xattr_namelen);
void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
int repoff)
{
- struct mds_rec_create *rec =
- lustre_msg_buf(req->rq_reqmsg, reqoff, sizeof(*rec));
- struct mds_body *body =
- lustre_msg_buf(req->rq_repmsg, repoff, sizeof(*body));
+ struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff,
+ sizeof(*rec));
+ struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff,
+ sizeof(*body));
LASSERT (rec != NULL);
LASSERT (body != NULL);
if (close_req != NULL) {
struct mds_body *close_body;
LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE);
- close_body = lustre_msg_buf(close_req->rq_reqmsg, 0,
+ close_body = lustre_msg_buf(close_req->rq_reqmsg,
+ MDS_REQ_REC_OFF,
sizeof(*close_body));
if (och != NULL)
LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
struct ptlrpc_request *open_req)
{
struct mdc_open_data *mod;
- struct mds_rec_create *rec =
- lustre_msg_buf(open_req->rq_reqmsg, 2, sizeof(*rec));
- struct mds_body *body =
- lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
+ struct mds_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
+ MDS_REQ_INTENT_REC_OFF,
+ sizeof(*rec));
+ struct mds_body *body = lustre_msg_buf(open_req->rq_repmsg, 1,
+ sizeof(*body));
- LASSERT(rec != NULL);
- /* outgoing messages always in my byte order */
LASSERT(body != NULL);
/* incoming message in my byte order (it's been swabbed) */
+ LASSERT(rec != NULL);
+ /* outgoing messages always in my byte order */
LASSERT_REPSWABBED(open_req, 1);
OBD_ALLOC(mod, sizeof(*mod));
struct obd_client_handle *och, struct ptlrpc_request **request)
{
struct obd_device *obd = class_exp2obd(exp);
- int reqsize = sizeof(struct mds_body);
- int rc, repsize[3] = {sizeof(struct mds_body),
+ int size[] = { sizeof(struct mds_body) };
+ int rc, repsize[] = { sizeof(struct mds_body),
obd->u.cli.cl_max_mds_easize,
obd->u.cli.cl_max_mds_cookiesize};
struct ptlrpc_request *req;
struct l_wait_info lwi;
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
- NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_CLOSE, 1, size, NULL);
if (req == NULL)
GOTO(out, rc = -ENOMEM);
CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
}
- mdc_close_pack(req, 0, oa, oa->o_valid, och);
+ mdc_close_pack(req, MDS_REQ_REC_OFF, oa, oa->o_valid, och);
req->rq_replen = lustre_msg_size(3, repsize);
req->rq_commit_cb = mdc_commit_close;
{
struct ptlrpc_request *req;
struct mds_body *body;
- int rc, size = sizeof(*body);
+ int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(*body) };
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_DONE_WRITING, 1,
- &size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_DONE_WRITING, 1, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+ body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
body->size = obdo->o_size;
body->blocks = obdo->o_blocks;
body->valid = obdo->o_valid;
// memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
- req->rq_replen = lustre_msg_size(1, &size);
+ req->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(req);
ptlrpc_req_finished(req);
RETURN(rc);
}
-int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset,
+int mdc_readpage(struct obd_export *exp, struct ll_fid *fid, __u64 offset,
struct page *page, struct ptlrpc_request **request)
{
struct obd_import *imp = class_exp2cliimp(exp);
struct ptlrpc_request *req = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct mds_body *body;
- int rc, size = sizeof(*body);
+ int rc, size[] = { sizeof(*body) };
ENTRY;
- CDEBUG(D_INODE, "inode: %ld\n", (long)mdc_fid->id);
+ CDEBUG(D_INODE, "inode: "LPU64"\n", fid->id);
- req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE,
+ 1, size, NULL);
if (req == NULL)
GOTO(out, rc = -ENOMEM);
+
/* XXX FIXME bug 249 */
req->rq_request_portal = MDS_READPAGE_PORTAL;
ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
- mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE, mdc_fid);
+ mdc_readdir_pack(req, MDS_REQ_REC_OFF, offset, PAGE_CACHE_SIZE, fid);
- req->rq_replen = lustre_msg_size(1, &size);
+ req->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(req);
if (rc == 0) {
~OBD_CONNECT_RDONLY;
}
- req = ptlrpc_prep_req(imp, MDS_SET_INFO, 2, size, bufs);
+ req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION,
+ MDS_SET_INFO, 2, size, bufs);
if (req == NULL)
RETURN(-ENOMEM);
* during mount that would help a bit). Having relative timestamps
* is not so great if request processing is slow, while absolute
* timestamps are not ideal because they need time synchronization. */
- req = ptlrpc_prep_req(obd->u.cli.cl_import, MDS_STATFS, 0, NULL, NULL);
+ req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION,
+ MDS_STATFS, 0, NULL, NULL);
if (!req)
RETURN(-ENOMEM);
{
struct ptlrpc_request *req;
struct mds_body *body;
- int rc, size = sizeof(*body);
+ int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_PIN, 1, &size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_PIN, 1, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+ body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
mdc_pack_fid(&body->fid1, ino, gen, type);
body->flags = flag;
- req->rq_replen = lustre_msg_size(1, &size);
+ req->rq_replen = lustre_msg_size(1, size);
mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
rc = ptlrpc_queue_wait(req);
{
struct ptlrpc_request *req;
struct mds_body *body;
- int rc, size = sizeof(*body);
+ int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
ENTRY;
if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
RETURN(0);
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_CLOSE, 1, size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+ body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
body->flags = flag;
struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
- struct mds_body *body;
- int size = sizeof(*body);
- int rc;
+ int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_SYNC, 1,&size,NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_SYNC, 1, size, NULL);
if (!req)
RETURN(rc = -ENOMEM);
- if (fid) {
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
- memcpy(&body->fid1, fid, sizeof(*fid));
- mdc_pack_req_body(req);
- }
+ mdc_pack_req_body(req, MDS_REQ_REC_OFF, 0, fid, 0);
- req->rq_replen = lustre_msg_size(1, &size);
+ req->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(req);
if (rc || request == NULL)
/* child orphan sem protects orphan_dec_test and
* is_orphan race, mds_mfd_close drops it */
MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
- rc = mds_mfd_close(NULL, obd, mfd,
+ rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
!(export->exp_flags & OBD_OPT_FAILOVER));
if (rc)
if (rc)
GOTO(cleanup, rc);
- LASSERT (offset == 0 || offset == 2);
+ LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
/* if requests were at offset 2, the getattr reply goes back at 1 */
- if (offset) {
+ if (offset == MDS_REQ_INTENT_REC_OFF) {
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
offset = 1;
}
return rc;
}
-static int mds_getattr(int offset, struct ptlrpc_request *req)
+static int mds_getattr(struct ptlrpc_request *req, int offset)
{
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
return 0;
}
-static int mds_sync(struct ptlrpc_request *req)
+static int mds_sync(struct ptlrpc_request *req, int offset)
{
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_obd *mds = &obd->u.mds;
*
* If we were to take another one here, a deadlock will result, if another
* thread is already waiting for a PW lock. */
-static int mds_readpage(struct ptlrpc_request *req)
+static int mds_readpage(struct ptlrpc_request *req, int offset)
{
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_obd *mds = &obd->u.mds;
GOTO(out, rc);
}
- body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+ body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+ lustre_swab_mds_body);
if (body == NULL)
GOTO (out, rc = -EFAULT);
RETURN(0);
}
+static int mds_msg_check_version(struct lustre_msg *msg)
+{
+ int rc;
+
+ /* TODO: enable the below check while really introducing msg version.
+ * it's disabled because it will break compatibility with b1_4.
+ */
+ return (0);
+
+ switch (msg->opc) {
+ case MDS_CONNECT:
+ case MDS_DISCONNECT:
+ case OBD_PING:
+ rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_OBD_VERSION);
+ break;
+ case MDS_GETSTATUS:
+ case MDS_GETATTR:
+ case MDS_GETATTR_NAME:
+ case MDS_STATFS:
+ case MDS_READPAGE:
+ case MDS_REINT:
+ case MDS_CLOSE:
+ case MDS_DONE_WRITING:
+ case MDS_PIN:
+ case MDS_SYNC:
+ case MDS_GETXATTR:
+ case MDS_SETXATTR:
+ case MDS_SET_INFO:
+ case MDS_QUOTACHECK:
+ case MDS_QUOTACTL:
+ case QUOTA_DQACQ:
+ case QUOTA_DQREL:
+ rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_MDS_VERSION);
+ break;
+ case LDLM_ENQUEUE:
+ case LDLM_CONVERT:
+ case LDLM_BL_CALLBACK:
+ case LDLM_CP_CALLBACK:
+ rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_DLM_VERSION);
+ break;
+ case OBD_LOG_CANCEL:
+ case LLOG_ORIGIN_HANDLE_CREATE:
+ case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+ case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
+ case LLOG_ORIGIN_HANDLE_READ_HEADER:
+ case LLOG_ORIGIN_HANDLE_CLOSE:
+ case LLOG_CATINFO:
+ rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_LOG_VERSION);
+ break;
+ default:
+ CERROR("MDS unknown opcode %d\n", msg->opc);
+ rc = -ENOTSUPP;
+ }
+ return rc;
+}
+
int mds_handle(struct ptlrpc_request *req)
{
int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
LASSERT(current->journal_info == NULL);
+
+ rc = mds_msg_check_version(req->rq_reqmsg);
+ if (rc) {
+ CERROR("MDS drop mal-formed request\n");
+ RETURN(rc);
+ }
+
/* XXX identical to OST */
if (req->rq_reqmsg->opc != MDS_CONNECT) {
struct mds_export_data *med;
case MDS_GETATTR:
DEBUG_REQ(D_INODE, req, "getattr");
OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
- rc = mds_getattr(0, req);
+ rc = mds_getattr(req, MDS_REQ_REC_OFF);
break;
case MDS_SETXATTR:
* want to cancel.
*/
lockh.cookie = 0;
- rc = mds_getattr_name(0, req, MDS_INODELOCK_UPDATE, &lockh);
+ rc = mds_getattr_name(MDS_REQ_REC_OFF, req,
+ MDS_INODELOCK_UPDATE, &lockh);
/* this non-intent call (from an ioctl) is special */
req->rq_status = rc;
if (rc == 0 && lockh.cookie)
case MDS_READPAGE:
DEBUG_REQ(D_INODE, req, "readpage");
OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
- rc = mds_readpage(req);
+ rc = mds_readpage(req, MDS_REQ_REC_OFF);
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
RETURN(0);
break;
case MDS_REINT: {
- __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
+ __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
+ sizeof (*opcp));
__u32 opc;
- int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
+ int size[] = { sizeof(struct mds_body), mds->mds_max_mdsize,
mds->mds_max_cookiesize};
int bufcount;
if (rc)
break;
- rc = mds_reint(req, 0, NULL);
+ rc = mds_reint(req, MDS_REQ_REC_OFF, NULL);
fail = OBD_FAIL_MDS_REINT_NET_REP;
break;
}
case MDS_CLOSE:
DEBUG_REQ(D_INODE, req, "close");
OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
- rc = mds_close(req);
+ rc = mds_close(req, MDS_REQ_REC_OFF);
break;
case MDS_DONE_WRITING:
DEBUG_REQ(D_INODE, req, "done_writing");
OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
- rc = mds_done_writing(req);
+ rc = mds_done_writing(req, MDS_REQ_REC_OFF);
break;
case MDS_PIN:
DEBUG_REQ(D_INODE, req, "pin");
OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
- rc = mds_pin(req);
+ rc = mds_pin(req, MDS_REQ_REC_OFF);
break;
case MDS_SYNC:
DEBUG_REQ(D_INODE, req, "sync");
OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
- rc = mds_sync(req);
+ rc = mds_sync(req, MDS_REQ_REC_OFF);
break;
case MDS_SET_INFO:
OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
rc = llog_origin_handle_next_block(req);
break;
+ case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
+ DEBUG_REQ(D_INODE, req, "llog prev block");
+ OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+ rc = llog_origin_handle_prev_block(req);
+ break;
case LLOG_ORIGIN_HANDLE_READ_HEADER:
DEBUG_REQ(D_INODE, req, "llog read header");
OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
RETURN(0);
}
-static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
+static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
struct ldlm_lock *new_lock,
struct ldlm_lock **old_lock,
struct lustre_handle *lockh)
struct obd_export *exp = req->rq_export;
struct obd_device *obd = exp->exp_obd;
struct ldlm_request *dlmreq =
- lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
+ lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
struct lustre_handle remote_hdl = dlmreq->lock_handle1;
struct list_head *iter;
struct lustre_handle lockh = { 0 };
struct ldlm_lock *new_lock = NULL;
int getattr_part = MDS_INODELOCK_UPDATE;
- int rc, offset = 2;
- int repbufcnt = 3, repsize[4] = {sizeof(struct ldlm_reply),
- sizeof(struct mds_body),
- mds->mds_max_mdsize};
+ int repsize[4] = {sizeof(*rep),
+ sizeof(struct mds_body),
+ mds->mds_max_mdsize};
+ int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF;
+ int rc;
ENTRY;
LASSERT(req != NULL);
- if (req->rq_reqmsg->bufcount <= 1) {
+ if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
/* No intent was provided */
int size = sizeof(struct ldlm_reply);
rc = lustre_pack_reply(req, 1, &size, NULL);
RETURN(0);
}
- it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent);
+ it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
+ lustre_swab_ldlm_intent);
if (it == NULL) {
CERROR("Intent missing\n");
RETURN(req->rq_status = -EFAULT);
switch ((long)it->opc) {
case IT_OPEN:
case IT_CREAT|IT_OPEN:
- fixup_handle_for_resent_req(req, lock, NULL, &lockh);
+ fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ lock, NULL, &lockh);
/* XXX swab here to assert that an mds_open reint
* packet is following */
rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
case IT_GETATTR:
getattr_part |= MDS_INODELOCK_LOOKUP;
case IT_READDIR:
- fixup_handle_for_resent_req(req, lock, &new_lock, &lockh);
+ fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ lock, &new_lock, &lockh);
rep->lock_policy_res2 = mds_getattr_name(offset, req,
getattr_part, &lockh);
int mds_query_write_access(struct inode *inode);
int mds_open(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req, struct lustre_handle *);
-int mds_pin(struct ptlrpc_request *req);
+int mds_pin(struct ptlrpc_request *req, int offset);
void mds_mfd_unlink(struct mds_file_data *mfd, int decref);
-int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
+int mds_mfd_close(struct ptlrpc_request *req, int offset, struct obd_device *obd,
struct mds_file_data *mfd, int unlink_orphan);
-int mds_close(struct ptlrpc_request *req);
-int mds_done_writing(struct ptlrpc_request *req);
+int mds_close(struct ptlrpc_request *req, int offset);
+int mds_done_writing(struct ptlrpc_request *req, int offset);
/* mds/mds_fs.c */
RETURN(rc);
}
-int mds_pin(struct ptlrpc_request *req)
+int mds_pin(struct ptlrpc_request *req, int offset)
{
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_body *request_body, *reply_body;
int rc, size = sizeof(*reply_body);
ENTRY;
- request_body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*request_body));
+ request_body = lustre_msg_buf(req->rq_reqmsg, offset,
+ sizeof(*request_body));
rc = lustre_pack_reply(req, 1, &size, NULL);
if (rc)
if (offset == 2) { /* intent */
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
- } else if (offset == 0) { /* non-intent reint */
+ } else if (offset == MDS_REQ_REC_OFF) { /* non-intent reint */
body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
} else {
body = NULL;
* (it will not even _have_ an entry in last_rcvd anymore).
*
* Returns EAGAIN if the client needs to get more data and re-close. */
-int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
+int mds_mfd_close(struct ptlrpc_request *req, int offset,struct obd_device *obd,
struct mds_file_data *mfd, int unlink_orphan)
{
struct inode *inode = mfd->mfd_dentry->d_inode;
ENTRY;
if (req && req->rq_reqmsg != NULL)
- request_body = lustre_msg_buf(req->rq_reqmsg, 0,
+ request_body = lustre_msg_buf(req->rq_reqmsg, offset,
sizeof(*request_body));
if (req && req->rq_repmsg != NULL)
reply_body = lustre_msg_buf(req->rq_repmsg, 0,
RETURN(rc);
}
-int mds_close(struct ptlrpc_request *req)
+int mds_close(struct ptlrpc_request *req, int offset)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct obd_device *obd = req->rq_export->exp_obd;
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
}
- body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+ body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+ lustre_swab_mds_body);
if (body == NULL) {
CERROR("Can't unpack body\n");
req->rq_status = -EFAULT;
}
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- req->rq_status = mds_mfd_close(req, obd, mfd, 1);
+ req->rq_status = mds_mfd_close(req, offset, obd, mfd, 1);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
RETURN(rc);
}
-int mds_done_writing(struct ptlrpc_request *req)
+int mds_done_writing(struct ptlrpc_request *req, int offset)
{
struct mds_body *body;
int rc, size = sizeof(struct mds_body);
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
- body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+ body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+ lustre_swab_mds_body);
if (body == NULL) {
CERROR("Can't unpack body\n");
req->rq_status = -EFAULT;
rec->ur_iattr.ia_gid};
ENTRY;
- LASSERT(offset == 0);
+ LASSERT(offset == MDS_REQ_REC_OFF);
DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x", rec->ur_fid1->id,
rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
struct dentry_params dp;
ENTRY;
- LASSERT(offset == 0);
+ LASSERT(offset == MDS_REQ_REC_OFF);
LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o",
unsigned int qpids [MAXQUOTAS] = {0, 0};
ENTRY;
- LASSERT(offset == 0 || offset == 2);
+ LASSERT(offset == MDS_REQ_REC_OFF || offset == 2);
DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
int rc = 0, cleanup_phase = 0;
ENTRY;
- LASSERT(offset == 0);
+ LASSERT(offset == MDS_REQ_REC_OFF);
DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
rec->ur_fid1->id, rec->ur_fid1->generation,
unsigned int qpids[4] = {0, 0, 0, 0};
ENTRY;
- LASSERT(offset == 0);
+ LASSERT(offset == MDS_REQ_REC_OFF);
DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
RETURN(rc);
}
EXPORT_SYMBOL(llog_process);
+
+int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
+ void *data, void *catdata)
+{
+ struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct llog_process_cat_data *cd = catdata;
+ void *buf;
+ int rc = 0, first_index = 1, index, idx;
+ struct llog_rec_tail *tail;
+ ENTRY;
+
+ OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
+ if (!buf)
+ RETURN(-ENOMEM);
+
+ if (cd != NULL)
+ first_index = cd->first_idx + 1;
+ if (cd != NULL && cd->last_idx)
+ index = cd->last_idx;
+ else
+ index = LLOG_BITMAP_BYTES * 8 - 1;
+
+ while (rc == 0) {
+ struct llog_rec_hdr *rec;
+
+ /* skip records not set in bitmap */
+ while (index >= first_index &&
+ !ext2_test_bit(index, llh->llh_bitmap))
+ --index;
+
+ LASSERT(index >= first_index - 1);
+ if (index == first_index - 1)
+ break;
+
+ /* get the buf with our target record; avoid old garbage */
+ memset(buf, 0, LLOG_CHUNK_SIZE);
+ rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
+ if (rc)
+ GOTO(out, rc);
+
+ rec = buf;
+ idx = le32_to_cpu(rec->lrh_index);
+ if (idx < index)
+ CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
+ while (idx < index) {
+ rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+ idx ++;
+ }
+
+ /* process records in buffer, starting where we found one */
+ while ((void *)rec >= buf) {
+ if (rec->lrh_index == 0)
+ GOTO(out, 0); /* no more records */
+
+ /* if set, process the callback on this record */
+ if (ext2_test_bit(index, llh->llh_bitmap)) {
+ rc = cb(loghandle, rec, data);
+ if (rc == LLOG_PROC_BREAK) {
+ CWARN("recovery from log: "LPX64":%x"
+ " stopped\n",
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_ogen);
+ GOTO(out, rc);
+ }
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ /* previous record, still in buffer? */
+ --index;
+ if (index < first_index)
+ GOTO(out, rc = 0);
+ tail = (void *)rec - sizeof(struct llog_rec_tail);
+ rec = ((void *)rec - le32_to_cpu(tail->lrt_len));
+ }
+ }
+
+out:
+ if (buf)
+ OBD_FREE(buf, LLOG_CHUNK_SIZE);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_reverse_process);
}
EXPORT_SYMBOL(llog_cat_process);
+static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh,
+ struct llog_rec_hdr *rec, void *data)
+{
+ struct llog_process_data *d = data;
+ struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+ struct llog_handle *llh;
+ int rc;
+
+ if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ CERROR("invalid record in catalog\n");
+ RETURN(-EINVAL);
+ }
+ CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
+ le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
+
+ rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
+ if (rc) {
+ CERROR("Cannot find handle for log "LPX64"\n",
+ lir->lid_id.lgl_oid);
+ RETURN(rc);
+ }
+
+ rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
+ RETURN(rc);
+}
+
+int llog_cat_reverse_process(struct llog_handle *cat_llh,
+ llog_cb_t cb, void *data)
+{
+ struct llog_process_data d;
+ struct llog_process_cat_data cd;
+ struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+ int rc;
+ ENTRY;
+
+ LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
+ d.lpd_data = data;
+ d.lpd_cb = cb;
+
+ if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+ CWARN("catalog "LPX64" crosses index zero\n",
+ cat_llh->lgh_id.lgl_oid);
+
+ cd.first_idx = 0;
+ cd.last_idx = cat_llh->lgh_last_idx;
+ rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+ &d, &cd);
+ if (rc != 0)
+ RETURN(rc);
+
+ cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+ cd.last_idx = 0;
+ rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+ &d, &cd);
+ } else {
+ rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+ &d, NULL);
+ }
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_reverse_process);
+
int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
{
struct llog_log_hdr *llh = cathandle->lgh_hdr;
rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
loghandle->lgh_file, buf, len,
&ppos);
-
if (rc) {
CERROR("Cant read llog block at log id "LPU64
"/%u offset "LPU64"\n",
RETURN(-EIO);
}
+static int llog_lvfs_prev_block(struct llog_handle *loghandle,
+ int prev_idx, void *buf, int len)
+{
+ __u64 cur_offset;
+ int rc;
+ ENTRY;
+
+ if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+ RETURN(-EINVAL);
+
+ CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
+
+ cur_offset = LLOG_CHUNK_SIZE;
+ llog_skip_over(&cur_offset, 0, prev_idx);
+
+ while (cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
+ struct llog_rec_hdr *rec;
+ struct llog_rec_tail *tail;
+ loff_t ppos;
+
+ ppos = cur_offset;
+
+ rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
+ loghandle->lgh_file, buf, len,
+ &ppos);
+ if (rc) {
+ CERROR("Cant read llog block at log id "LPU64
+ "/%u offset "LPU64"\n",
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_ogen,
+ cur_offset);
+ RETURN(rc);
+ }
+
+ /* put number of bytes read into rc to make code simpler */
+ rc = ppos - cur_offset;
+ cur_offset = ppos;
+
+ if (rc == 0) /* end of file, nothing to do */
+ RETURN(0);
+
+ if (rc < sizeof(*tail)) {
+ CERROR("Invalid llog block at log id "LPU64"/%u offset "
+ LPU64"\n", loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_ogen, cur_offset);
+ RETURN(-EINVAL);
+ }
+
+ tail = buf + rc - sizeof(struct llog_rec_tail);
+
+ /* this shouldn't happen */
+ if (tail->lrt_index == 0) {
+ CERROR("Invalid llog tail at log id "LPU64"/%u offset "
+ LPU64"\n", loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_ogen, cur_offset);
+ RETURN(-EINVAL);
+ }
+ if (le32_to_cpu(tail->lrt_index) < prev_idx)
+ continue;
+
+ /* sanity check that the start of the new buffer is no farther
+ * than the record that we wanted. This shouldn't happen. */
+ rec = buf;
+ if (le32_to_cpu(rec->lrh_index) > prev_idx) {
+ CERROR("missed desired record? %u > %u\n",
+ le32_to_cpu(rec->lrh_index), prev_idx);
+ RETURN(-ENOENT);
+ }
+ RETURN(0);
+ }
+ RETURN(-EIO);
+}
+
static struct file *llog_filp_open(char *name, int flags, int mode)
{
char *logname;
struct llog_operations llog_lvfs_ops = {
lop_write_rec: llog_lvfs_write_rec,
lop_next_block: llog_lvfs_next_block,
+ lop_prev_block: llog_lvfs_prev_block,
lop_read_header: llog_lvfs_read_header,
lop_create: llog_lvfs_create,
lop_destroy: llog_lvfs_destroy,
return 0;
}
+static int llog_lvfs_prev_block(struct llog_handle *loghandle,
+ int prev_idx, void *buf, int len)
+{
+ LBUG();
+ return 0;
+}
+
static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
struct llog_logid *logid, char *name)
{
struct llog_operations llog_lvfs_ops = {
lop_write_rec: llog_lvfs_write_rec,
lop_next_block: llog_lvfs_next_block,
+ lop_prev_block: llog_lvfs_prev_block,
lop_read_header: llog_lvfs_read_header,
lop_create: llog_lvfs_create,
lop_destroy: llog_lvfs_destroy,
GOTO(out, rc);
}
+ CWARN("5f: print plain log entries reversely.. expect 6\n");
+ rc = llog_cat_reverse_process(llh, plain_print_cb, "foobar");
+ if (rc) {
+ CERROR("5f: reversely process with plain_print_cb failed: %d\n", rc);
+ GOTO(out, rc);
+ }
+
out:
CWARN("5: close re-opened catalog\n");
if (llh)
if (rc)
CERROR("6: llog_process failed %d\n", rc);
+ rc = llog_reverse_process(llh, (llog_cb_t)plain_print_cb, NULL, NULL);
+ if (rc)
+ CERROR("6: llog_reverse_process failed %d\n", rc);
+
parse_out:
rc = llog_close(llh);
if (rc) {
struct osc_getattr_async_args *aa;
ENTRY;
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_SETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
LASSERT(oti);
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_SETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
RETURN(rc);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE,
- 1, &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_CREATE, 1, &size, NULL);
if (!request)
GOTO(out, rc = -ENOMEM);
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_PUNCH, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_PUNCH, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SYNC, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_SYNC, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_DESTROY, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_DESTROY, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
size[2] = niocount * sizeof(*niobuf);
OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
- req = ptlrpc_prep_req_pool(imp, opc, 3, size, NULL, pool);
+ req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 3,
+ size, NULL, pool);
if (req == NULL)
return (-ENOMEM);
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*nioptr);
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_READ, 3,
- size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_SAN_READ, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*nioptr);
- request = ptlrpc_prep_req_pool(class_exp2cliimp(exp), OST_SAN_WRITE,
+ request = ptlrpc_prep_req_pool(class_exp2cliimp(exp),
+ LUSTRE_OST_VERSION, OST_SAN_WRITE,
3, size, NULL, cli->cl_rq_pool);
if (!request)
RETURN(-ENOMEM);
if (*flags & LDLM_FL_HAS_INTENT) {
int size[2] = {sizeof(struct ldlm_request), sizeof(lvb)};
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
+ req = ptlrpc_prep_req(class_exp2cliimp(exp),
+ LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 1,
size, NULL);
if (req == NULL)
RETURN(-ENOMEM);
* during mount that would help a bit). Having relative timestamps
* is not so great if request processing is slow, while absolute
* timestamps are not ideal because they need time synchronization. */
- request = ptlrpc_prep_req(obd->u.cli.cl_import, OST_STATFS, 0,
- NULL, NULL);
+ request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
+ OST_STATFS,0,NULL,NULL);
if (!request)
RETURN(-ENOMEM);
obd_id *reply;
char *bufs[1] = {key};
int rc;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1,
- &keylen, bufs);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_GET_INFO, 1, &keylen, bufs);
if (req == NULL)
RETURN(-ENOMEM);
RETURN(-EINVAL);
- req = ptlrpc_prep_req(imp, OST_SET_INFO, 2, size, bufs);
+ req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO,
+ 2, size, bufs);
if (req == NULL)
RETURN(-ENOMEM);
}
}
+int ost_msg_check_version(struct lustre_msg *msg)
+{
+ int rc;
+
+ /* TODO: enable the below check while really introducing msg version.
+ * it's disabled because it will break compatibility with b1_4.
+ */
+ return (0);
+ switch(msg->opc) {
+ case OST_CONNECT:
+ case OST_DISCONNECT:
+ case OBD_PING:
+ rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_OBD_VERSION);
+ break;
+ case OST_CREATE:
+ case OST_DESTROY:
+ case OST_GETATTR:
+ case OST_SETATTR:
+ case OST_WRITE:
+ case OST_READ:
+ case OST_SAN_READ:
+ case OST_SAN_WRITE:
+ case OST_PUNCH:
+ case OST_STATFS:
+ case OST_SYNC:
+ case OST_SET_INFO:
+ case OST_GET_INFO:
+ case OST_QUOTACHECK:
+ case OST_QUOTACTL:
+ rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_OST_VERSION);
+ break;
+ case LDLM_ENQUEUE:
+ case LDLM_CONVERT:
+ case LDLM_CANCEL:
+ case LDLM_BL_CALLBACK:
+ case LDLM_CP_CALLBACK:
+ rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_DLM_VERSION);
+ break;
+ case LLOG_ORIGIN_CONNECT:
+ case OBD_LOG_CANCEL:
+ rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ msg->opc, msg->version, LUSTRE_LOG_VERSION);
+ default:
+ CERROR("Unexpected opcode %d\n", msg->opc);
+ rc = -ENOTSUPP;
+ }
+ return rc;
+}
+
static int ost_handle(struct ptlrpc_request *req)
{
struct obd_trans_info trans_info = { 0, };
}
oti_init(oti, req);
+ rc = ost_msg_check_version(req->rq_reqmsg);
+ if (rc)
+ RETURN(rc);
switch (req->rq_reqmsg->opc) {
case OST_CONNECT: {
return request;
}
-struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
- int count, int *lengths,
- char **bufs,
- struct ptlrpc_request_pool *pool)
+struct ptlrpc_request *
+ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
+ int count, int *lengths, char **bufs,
+ struct ptlrpc_request_pool *pool)
{
struct ptlrpc_request *request = NULL;
int rc;
RETURN(NULL);
}
+#if 0 /* TODO: enable this while really introducing msg version.
+ * it's disabled because it will break compatibility with b1_4.
+ */
+ request->rq_reqmsg->version |= version;
+#endif
if (imp->imp_server_timeout)
request->rq_timeout = obd_timeout / 2;
else
RETURN(request);
}
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
- int count, int *lengths, char **bufs)
+struct ptlrpc_request *
+ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode,
+ int count, int *lengths, char **bufs)
{
- return ptlrpc_prep_req_pool(imp, opcode, count, lengths, bufs, NULL);
+ return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths,
+ bufs, NULL);
}
-
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
struct ptlrpc_request_set *set;
if (rc)
GOTO(out, rc);
- request = ptlrpc_prep_req(imp, imp->imp_connect_op, 4, size, tmp);
+ request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
+ 4, size, tmp);
if (!request)
GOTO(out, rc = -ENOMEM);
LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
atomic_inc(&imp->imp_replay_inflight);
- req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING,
+ 0, NULL, NULL);
if (!req) {
atomic_dec(&imp->imp_replay_inflight);
RETURN(-ENOMEM);
spin_unlock_irqrestore(&imp->imp_lock, flags);
- request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
+ request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc,
+ 0, NULL, NULL);
if (request) {
/* We are disconnecting, do not retry a failed DISCONNECT rpc if
* it fails. We can get through the above with a down server
bufcount++;
}
- req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_CREATE,
- bufcount, size, tmp);
+ req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+ LLOG_ORIGIN_HANDLE_CREATE, bufcount, size, tmp);
if (!req)
GOTO(err_free, rc = -ENOMEM);
int rc;
ENTRY;
- req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, 1,&size,NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+ LLOG_ORIGIN_HANDLE_NEXT_BLOCK, 1,&size,NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
RETURN(rc);
}
+static int llog_client_prev_block(struct llog_handle *loghandle,
+ int prev_idx, void *buf, int len)
+{
+ struct obd_import *imp = loghandle->lgh_ctxt->loc_imp;
+ struct ptlrpc_request *req = NULL;
+ struct llogd_body *body;
+ void * ptr;
+ int size = sizeof(*body);
+ int repsize[2] = {sizeof (*body)};
+ int rc;
+ ENTRY;
+
+ req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+ LLOG_ORIGIN_HANDLE_PREV_BLOCK, 1,&size,NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+ body->lgd_logid = loghandle->lgh_id;
+ body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
+ body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
+ body->lgd_index = prev_idx;
+ body->lgd_len = len;
+ repsize[1] = len;
+
+ req->rq_replen = lustre_msg_size(2, repsize);
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out, rc);
+
+ body = lustre_swab_repbuf(req, 0, sizeof(*body),
+ lustre_swab_llogd_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack llogd_body\n");
+ GOTO(out, rc =-EFAULT);
+ }
+
+ ptr = lustre_msg_buf(req->rq_repmsg, 1, len);
+ if (ptr == NULL) {
+ CERROR ("Can't unpack bitmap\n");
+ GOTO(out, rc =-EFAULT);
+ }
+
+ memcpy(buf, ptr, len);
+
+out:
+ if (req)
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
static int llog_client_read_header(struct llog_handle *handle)
{
int rc;
ENTRY;
- req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_READ_HEADER,
- 1, &size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+ LLOG_ORIGIN_HANDLE_READ_HEADER, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
struct llog_operations llog_client_ops = {
lop_next_block: llog_client_next_block,
+ lop_prev_block: llog_client_prev_block,
lop_read_header: llog_client_read_header,
lop_create: llog_client_create,
lop_close: llog_client_close,
LASSERT(ctxt->loc_imp);
imp = ctxt->loc_imp;
- request = ptlrpc_prep_req(imp, LLOG_ORIGIN_CONNECT, 1, &size, NULL);
+ request = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+ LLOG_ORIGIN_CONNECT, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
RETURN(rc);
}
+int llog_origin_handle_prev_block(struct ptlrpc_request *req)
+{
+ struct obd_export *exp = req->rq_export;
+ struct obd_device *obd = exp->exp_obd;
+ struct llog_handle *loghandle;
+ struct llogd_body *body;
+ struct obd_device *disk_obd;
+ struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
+ __u32 flags;
+ __u8 *buf;
+ void * ptr;
+ int size[] = {sizeof (*body),
+ LLOG_CHUNK_SIZE};
+ int rc, rc2;
+ ENTRY;
+
+ body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+ lustre_swab_llogd_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack llogd_body\n");
+ GOTO(out, rc =-EFAULT);
+ }
+
+ OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
+ if (!buf)
+ GOTO(out, rc = -ENOMEM);
+
+ ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
+ LASSERT(ctxt != NULL);
+ disk_obd = ctxt->loc_exp->exp_obd;
+ push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+
+ rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
+ if (rc)
+ GOTO(out_pop, rc);
+
+ flags = body->lgd_llh_flags;
+ rc = llog_init_handle(loghandle, flags, NULL);
+ if (rc)
+ GOTO(out_close, rc);
+
+ memset(buf, 0, LLOG_CHUNK_SIZE);
+ rc = llog_prev_block(loghandle, body->lgd_index,
+ buf, LLOG_CHUNK_SIZE);
+ if (rc)
+ GOTO(out_close, rc);
+
+
+ rc = lustre_pack_reply(req, 2, size, NULL);
+ if (rc)
+ GOTO(out_close, rc = -ENOMEM);
+
+ ptr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (body));
+ memcpy(ptr, body, sizeof(*body));
+
+ ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_CHUNK_SIZE);
+ memcpy(ptr, buf, LLOG_CHUNK_SIZE);
+
+out_close:
+ rc2 = llog_close(loghandle);
+ if (!rc)
+ rc = rc2;
+
+out_pop:
+ pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+ OBD_FREE(buf, LLOG_CHUNK_SIZE);
+out:
+ RETURN(rc);
+}
+
int llog_origin_handle_read_header(struct ptlrpc_request *req)
{
struct obd_export *exp = req->rq_export;
LBUG();
return 0;
}
+int llog_origin_handle_prev_block(struct ptlrpc_request *req)
+{
+ LBUG();
+ return 0;
+}
int llog_origin_handle_read_header(struct ptlrpc_request *req)
{
LBUG();
int rc;
ENTRY;
- req = ptlrpc_prep_req(obd->u.cli.cl_import, OBD_PING, 0, NULL, NULL);
+ req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION,
+ OBD_PING, 0, NULL, NULL);
if (req == NULL)
RETURN(-ENOMEM);
return (msg->magic == __swab32(PTLRPC_MSG_MAGIC));
}
+int lustre_msg_check_version(struct lustre_msg *msg, __u32 version)
+{
+ if (lustre_msg_swabbed(msg))
+ return (__swab32(msg->version) & LUSTRE_VERSION_MASK) != version;
+
+ return (msg->version & LUSTRE_VERSION_MASK) != version;
+}
+
static void
lustre_init_msg (struct lustre_msg *msg, int count, int *lens, char **bufs)
{
RETURN (-EINVAL);
}
- if (m->version != PTLRPC_MSG_VERSION) {
+ if ((m->version & ~LUSTRE_VERSION_MASK) != PTLRPC_MSG_VERSION) {
CERROR("wrong lustre_msg version %#08x\n", m->version);
RETURN (-EINVAL);
}
int rc = 0;
ENTRY;
- req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
- NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 0, NULL, NULL);
if (req) {
DEBUG_REQ(D_INFO, req, "pinging %s->%s",
imp->imp_obd->obd_uuid.uuid,
struct ptlrpc_request *req;
ENTRY;
- req = ptlrpc_prep_req(imp, OST_STATFS, 0,
+ req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_STATFS, 0,
NULL, NULL);
if (!req) {
CERROR("OOM trying to ping %s->%s\n",
continue;
}
- req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
- NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING,
+ 0, NULL, NULL);
if (!req) {
CERROR("out of memory\n");
break;
/* pack_generic.c */
EXPORT_SYMBOL(lustre_msg_swabbed);
+EXPORT_SYMBOL(lustre_msg_check_version);
EXPORT_SYMBOL(lustre_pack_request);
EXPORT_SYMBOL(lustre_pack_reply);
EXPORT_SYMBOL(lustre_shrink_reply);
/* llogd.c */
EXPORT_SYMBOL(llog_origin_handle_create);
EXPORT_SYMBOL(llog_origin_handle_next_block);
+EXPORT_SYMBOL(llog_origin_handle_prev_block);
EXPORT_SYMBOL(llog_origin_handle_read_header);
EXPORT_SYMBOL(llog_origin_handle_close);
EXPORT_SYMBOL(llog_client_ops);
/* single threaded!! */
pc->pc_recurred++;
- if (pc->pc_recurred == 1)
+ if (pc->pc_recurred == 1) {
rc = ptlrpcd_check(pc);
+ if (!rc)
+ ptlrpc_expired_set(pc->pc_set);
+ }
pc->pc_recurred--;
return rc;
continue;
}
- request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
+ request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
+ OBD_LOG_CANCEL, 1,
&llcd->llcd_cookiebytes,
bufs);
if (request == NULL) {
imp->imp_obd->obd_name);
ptlrpc_deactivate_import(imp);
}
- ptlrpc_connect_import(imp, NULL);
+ /* to control recovery via lctl {disable|enable}_recovery */
+ if (imp->imp_deactive == 0)
+ ptlrpc_connect_import(imp, NULL);
}
/* Wait for recovery to complete and resend. If evicted, then
int rc, size = sizeof(*oqctl);
ENTRY;
- req = ptlrpc_prep_req(exp->exp_imp_reverse, OBD_QC_CALLBACK,
- 1, &size, NULL);
+ req = ptlrpc_prep_req(exp->exp_imp_reverse, LUSTRE_OBD_VERSION,
+ OBD_QC_CALLBACK, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
struct client_obd *cli = &exp->exp_obd->u.cli;
struct ptlrpc_request *req;
struct obd_quotactl *body;
- int size = sizeof(*body), opc;
+ int size = sizeof(*body), opc, version;
int rc;
ENTRY;
- if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME))
+ if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) {
+ version = LUSTRE_MDS_VERSION;
opc = MDS_QUOTACHECK;
- else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME))
+ } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
+ version = LUSTRE_OST_VERSION;
opc = OST_QUOTACHECK;
- else
+ } else {
RETURN(-EINVAL);
+ }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), opc, 1, &size,
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), version, opc, 1, &size,
NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
/* build dqacq/dqrel request */
LASSERT(qctxt->lqc_import);
- req = ptlrpc_prep_req(qctxt->lqc_import, opc, 1, &size, NULL);
+ req = ptlrpc_prep_req(qctxt->lqc_import, LUSTRE_MDS_VERSION, opc, 1,
+ &size, NULL);
if (!req) {
dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
RETURN(-ENOMEM);
{
struct ptlrpc_request *req;
struct obd_quotactl *oqc;
- int size = sizeof(*oqctl), opc;
+ int size = sizeof(*oqctl), opc, version;
int rc;
ENTRY;
- if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME))
+ if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) {
opc = MDS_QUOTACTL;
- else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME))
+ version = LUSTRE_MDS_VERSION;
+ } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
opc = OST_QUOTACTL;
- else
+ version = LUSTRE_OST_VERSION;
+ } else {
RETURN(-EINVAL);
+ }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), opc, 1, &size, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), version, opc, 1, &size,
+ NULL);
if (!req)
GOTO(out, rc = -ENOMEM);