#include <linux/lustre_ha.h>
#include <linux/lprocfs_status.h>
#include <linux/lustre_log.h>
+#include <linux/lustre_lite.h>
#include <linux/lustre_audit.h>
#include <linux/lustre_gs.h>
+
#include "osc_internal.h"
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_punch(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *md, obd_size start,
- obd_size end, struct obd_trans_info *oti)
+ obd_size end, struct obd_trans_info *oti,
+ struct lustre_capa *capa)
{
struct ptlrpc_request *request;
struct ost_body *body;
- int rc, size = sizeof(*body);
+ struct lustre_capa *req_capa;
+ int bufcnt = 0;
+ int rc, size[2] = { sizeof(*body), sizeof(*capa) };
ENTRY;
if (!oa) {
}
request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
- OST_PUNCH, 1, &size, NULL);
+ OST_PUNCH, capa ? 2 : 1, size, NULL);
if (!request)
RETURN(-ENOMEM);
- body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
+ body = lustre_msg_buf(request->rq_reqmsg, bufcnt++, sizeof (*body));
+
memcpy(&body->oa, oa, sizeof(*oa));
/* overload the size and blocks fields in the oa with start/end */
body->oa.o_blocks = end;
body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
- request->rq_replen = lustre_msg_size(1, &size);
+ if (capa) {
+ req_capa = lustre_msg_buf(request->rq_reqmsg, bufcnt++,
+ sizeof(*capa));
+ capa_dup2(req_capa, capa);
+ body->oa.o_valid |= OBD_MD_CAPA;
+ }
+
+ request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
if (rc)
struct ptlrpc_bulk_desc *desc;
struct client_obd *cli = &imp->imp_obd->u.cli;
struct ost_body *body;
+ struct lustre_id *raw_id = obdo_id(oa);
+ struct obd_capa *ocapa;
+ struct lustre_capa *capa;
struct obd_ioobj *ioobj;
struct niobuf_remote *niobuf;
int niocount;
- int size[3];
- int i;
+ int size[4];
+ int i, bufcnt = 0;
int requested_nob;
int opc;
+ int capa_op;
int rc;
opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
if (!can_merge_pages(&pga[i - 1], &pga[i]))
niocount++;
- size[0] = sizeof(*body);
- size[1] = sizeof(*ioobj);
- size[2] = niocount * sizeof(*niobuf);
+ capa_op = (opc == OST_WRITE) ? MAY_WRITE : MAY_READ;
+get_capa:
+ ocapa = capa_get(oa->o_fsuid, capa_op, raw_id->li_fid.lf_group,
+ raw_id->li_stc.u.e3s.l3s_ino, CLIENT_CAPA,
+ NULL, NULL, NULL);
+ if (!ocapa) {
+ if (opc == OST_READ && capa_op == MAY_READ) {
+ /* partial write might cause read, MAY_WRITE capability
+ * should be used here */
+ capa_op = MAY_WRITE;
+ goto get_capa;
+ }
+ }
+
+ size[bufcnt++] = sizeof(*body);
+ size[bufcnt++] = sizeof(*ioobj);
+ if (ocapa)
+ size[bufcnt++] = sizeof(*capa);
+ size[bufcnt++] = niocount * sizeof(*niobuf);
- req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, bufcnt, size, NULL);
if (req == NULL)
return (-ENOMEM);
GOTO(out, rc = -ENOMEM);
/* NB request now owns desc and will free it when it gets freed */
- body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
- ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
- niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
+ bufcnt = 0;
+ body = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*body));
+ ioobj = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*ioobj));
+ if (ocapa)
+ capa = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*capa));
+ niobuf = lustre_msg_buf(req->rq_reqmsg, bufcnt++,
+ niocount * sizeof(*niobuf));
memcpy(&body->oa, oa, sizeof(*oa));
obdo_to_ioobj(oa, ioobj);
ioobj->ioo_bufcnt = niocount;
+ if (ocapa) {
+ capa_dup(capa, ocapa);
+ body->oa.o_valid |= OBD_MD_CAPA;
+ capa_put(ocapa, CLIENT_CAPA);
+ }
+
LASSERT (page_count > 0);
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
}
LASSERT((void *)(niobuf - niocount) ==
- lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
+ lustre_msg_buf(req->rq_reqmsg, bufcnt - 1,
+ niocount * sizeof(*niobuf)));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* size[0] still sizeof (*body) */
spin_unlock(&oscc->oscc_lock);
RETURN(0);
}
+
if (keylen == strlen("unrecovery") &&
memcmp(key, "unrecovery", keylen) == 0) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
spin_unlock(&oscc->oscc_lock);
RETURN(0);
}
+
if (keylen == strlen("initial_recov") &&
memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
- struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+ struct obd_import *imp = class_exp2cliimp(exp);
if (vallen != sizeof(int))
RETURN(-EINVAL);
imp->imp_initial_recov = *(int *)val;
RETURN(rc);
}
- if (keylen == strlen("sec") &&
- memcmp(key, "sec", keylen) == 0) {
+ if (keylen == strlen("sec") && memcmp(key, "sec", keylen) == 0) {
struct client_obd *cli = &exp->exp_obd->u.cli;
cli->cl_sec_flavor = ptlrpcs_name2flavor(val);
RETURN(0);
}
+ if (keylen == 8 && memcmp(key, "capa_key", 8) == 0) {
+ struct ptlrpc_request *req;
+ char *bufs[2] = {key, val};
+ unsigned long irqflags;
+ int rc, size[2] = {keylen, vallen};
+
+ LASSERT(vallen == sizeof(struct lustre_capa_key));
+
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_SET_INFO, 2, size, bufs);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ spin_lock_irqsave (&req->rq_lock, irqflags);
+ req->rq_replay = 1;
+ spin_unlock_irqrestore (&req->rq_lock, irqflags);
+
+ req->rq_replen = lustre_msg_size(0, NULL);
+ rc = ptlrpc_queue_wait(req);
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+ }
+
if (keylen == strlen("setext") &&
memcmp(key, "setext", keylen) == 0) {
struct client_obd *cli = &exp->exp_obd->u.cli;
RETURN(rc);
}
+
struct obd_ops osc_obd_ops = {
.o_owner = THIS_MODULE,
.o_attach = osc_attach,