# include <liblustre.h>
#endif
-# include <linux/lustre_dlm.h>
-#include <linux/kp30.h>
+#include <linux/lustre_dlm.h>
+#include <libcfs/kp30.h>
#include <linux/lustre_net.h>
+#include <linux/lustre_sec.h>
#include <lustre/lustre_user.h>
#include <linux/obd_ost.h>
#include <linux/obd_lov.h>
struct osc_getattr_async_args *aa;
ENTRY;
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
LASSERT(!(oa->o_valid & OBD_MD_FLGROUP) || oa->o_gr > 0);
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_SETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out, rc);
-
- body = lustre_swab_repbuf(request, 0, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL)
- GOTO(out, rc = -EPROTO);
+ if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
+ ptlrpcd_add_req(request);
+ rc = 0;
+ } else {
+ rc = ptlrpc_queue_wait(request);
+ if (rc)
+ GOTO(out, rc);
- memcpy(oa, &body->oa, sizeof(*oa));
+ body = lustre_swab_repbuf(request, 0, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL)
+ GOTO(out, rc = -EPROTO);
+ memcpy(oa, &body->oa, sizeof(*oa));
+ }
EXIT;
out:
ptlrpc_req_finished(request);
int osc_real_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
+ struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
struct ptlrpc_request *request;
struct ost_body *body;
struct lov_stripe_md *lsm;
RETURN(rc);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_CREATE, 1, &size, NULL);
if (!request)
GOTO(out, rc = -ENOMEM);
GOTO (out_req, rc = -EPROTO);
}
+ if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) {
+ struct obd_import *imp = class_exp2cliimp(exp);
+ /* The MDS declares its last known object; the OSS responds
+ * with the next possible object id, which is installed as the
+ * creator's next id. -bzzz */
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_next_id = body->oa.o_id;
+ spin_unlock(&oscc->oscc_lock);
+ /* Log the id that was actually installed (taken from the
+ * reply), not the one sent in the request. */
+ CDEBUG(D_HA, "%s: set nextid "LPU64" after recovery\n",
+ imp->imp_target_uuid.uuid, body->oa.o_id);
+ }
memcpy(oa, &body->oa, sizeof(*oa));
/* This should really be sent by the OST */
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_PUNCH, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_PUNCH, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
}
static int osc_sync(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, obd_size start, obd_size end)
+ struct lov_stripe_md *md, obd_size start,
+ obd_size end)
{
struct ptlrpc_request *request;
struct ost_body *body;
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SYNC, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_SYNC, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_DESTROY, 1,
- &size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_DESTROY, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
memcpy(&body->oa, oa, sizeof(*oa));
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out, rc);
+ if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
+ ptlrpcd_add_req(request);
+ rc = 0;
+ } else {
+ rc = ptlrpc_queue_wait(request);
+
+ if (rc == -ENOENT)
+ rc = 0;
- body = lustre_swab_repbuf(request, 0, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR ("Can't unpack body\n");
- GOTO (out, rc = -EPROTO);
- }
+ if (rc) {
+ ptlrpc_req_finished(request);
+ RETURN(rc);
+ }
- memcpy(oa, &body->oa, sizeof(*oa));
+ body = lustre_swab_repbuf(request, 0, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack body\n");
+ ptlrpc_req_finished(request);
+ RETURN(-EPROTO);
+ }
- EXIT;
- out:
- ptlrpc_req_finished(request);
- return rc;
+ memcpy(oa, &body->oa, sizeof(*oa));
+ ptlrpc_req_finished(request);
+ }
+ RETURN(rc);
}
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
long writing_bytes)
{
- obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
+ obd_valid bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
LASSERT(!(oa->o_valid & bits));
LASSERT(cli->cl_avail_grant >= 0);
}
+/* Total number of BRW RPCs this client currently has in flight: the
+ * sum of the read (cl_r_in_flight) and write (cl_w_in_flight) counters.
+ * NOTE(review): the counters are modified under cl_loi_list_lock
+ * elsewhere in this patch - presumably callers should hold it as
+ * well; confirm. */
+static unsigned long rpcs_in_flight(struct client_obd *cli)
+{
+ return cli->cl_r_in_flight + cli->cl_w_in_flight;
+}
+
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
/* if still dirty cache but no grant wait for pending RPCs that
* may yet return us some grant before doing sync writes */
- if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
- CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n",
- cli->cl_brw_in_flight);
- return;
+ if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
+ CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
+ cli->cl_w_in_flight);
+ /* Stop waking waiters here: the writes still in flight may
+ * return grant, so do not push waiters on yet. */
+ return;
}
-
ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
list_del_init(&ocw->ocw_entry);
if (cli->cl_avail_grant < PAGE_SIZE) {
if (pga->count > nob_read) {
/* EOF inside this page */
- ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
+ ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
memset(ptr + nob_read, 0, pga->count - nob_read);
kunmap(pga->pg);
page_count--;
/* zero remaining pages */
while (page_count-- > 0) {
- ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
+ ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
memset(ptr, 0, pga->count);
kunmap(pga->pg);
pga++;
int requested_nob, int niocount,
obd_count page_count, struct brw_page *pga)
{
- int *remote_rcs, i;
+ int *remote_rcs, i;
/* return error if any niobuf was in error */
remote_rcs = lustre_swab_repbuf(request, 1,
}
if (lustre_msg_swabbed(request->rq_repmsg))
for (i = 0; i < niocount; i++)
- __swab32s(&remote_rcs[i]);
+ __swab32s((__u32 *)&remote_rcs[i]);
for (i = 0; i < niocount; i++) {
if (remote_rcs[i] < 0)
return 0;
}
- return (p1->off + p1->count == p2->off);
+ return (p1->disk_offset + p1->count == p2->disk_offset);
}
#if CHECKSUM_BULK
size[1] = sizeof(*ioobj);
size[2] = niocount * sizeof(*niobuf);
- req = ptlrpc_prep_req(imp, opc, 3, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL);
if (req == NULL)
return (-ENOMEM);
ioobj->ioo_bufcnt = niocount;
LASSERT (page_count > 0);
+
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
struct brw_page *pg = &pga[i];
struct brw_page *pg_prev = pg - 1;
LASSERT(pg->count > 0);
- LASSERT((pg->off & ~PAGE_MASK)+ pg->count <= PAGE_SIZE);
- LASSERTF(i == 0 || pg->off > pg_prev->off,
+ LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE,
+ "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg,
+ pg->page_offset, pg->count);
+ LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset,
"i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
" prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
i, page_count,
- pg->pg, pg->pg->private, pg->pg->index, pg->off,
+ pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset,
pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
- pg_prev->off);
+ pg_prev->disk_offset);
ptlrpc_prep_bulk_page(desc, pg->pg,
- pg->off & ~PAGE_MASK, pg->count);
+ pg->page_offset & ~PAGE_MASK, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
niobuf--;
niobuf->len += pg->count;
} else {
- niobuf->offset = pg->off;
+ niobuf->offset = pg->disk_offset;
niobuf->len = pg->count;
niobuf->flags = pg->flag;
}
if (server_cksum != cksum) {
CERROR("Bad checksum: server %x, client %x, server NID "
LPX64" (%s)\n", server_cksum, cksum,
- peer->peer_nid, str);
+ peer->peer_id.nid, str);
cksum_counter = 0;
oa->o_cksum = cksum;
} else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
- cksum_counter, peer->peer_nid, str, cksum);
+ cksum_counter, peer->peer_id.nid, str, cksum);
}
} else {
static int cksum_missed;
if ((cksum_missed & (-cksum_missed)) == cksum_missed)
CERROR("Request checksum %u from "LPX64", no reply\n",
cksum_missed,
- req->rq_import->imp_connection->c_peer.peer_nid);
+ req->rq_import->imp_connection->c_peer.peer_id.nid);
}
#endif
RETURN(0);
for (i = stride ; i < num ; i++) {
tmp = array[i];
j = i;
- while (j >= stride && array[j - stride].off >
- tmp.off) {
+ while (j >= stride && array[j - stride].disk_offset >
+ tmp.disk_offset) {
array[j] = array[j - stride];
j -= stride;
}
}
static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, obd_count page_count,
+ struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page *pga, struct obd_trans_info *oti)
{
ENTRY;
sort_brw_pages(pga, pages_per_brw);
pages_per_brw = check_elan_limit(pga, pages_per_brw);
- rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga);
+ rc = osc_brw_internal(cmd, exp, oa, lsm, pages_per_brw, pga);
if (rc != 0)
RETURN(rc);
}
static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, obd_count page_count,
+ struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page *pga, struct ptlrpc_request_set *set,
struct obd_trans_info *oti)
{
sort_brw_pages(pga, pages_per_brw);
pages_per_brw = check_elan_limit(pga, pages_per_brw);
- rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set);
+ rc = async_internal(cmd, exp, oa, lsm, pages_per_brw, pga, set);
if (rc != 0)
RETURN(rc);
struct osc_async_page *oap;
struct client_obd *cli;
struct list_head *pos, *n;
+ struct timeval now;
ENTRY;
-
+ do_gettimeofday(&now);
rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
aa->aa_nio_count, aa->aa_page_count,
aa->aa_pga, rc);
spin_lock(&cli->cl_loi_list_lock);
+ if (request->rq_reqmsg->opc == OST_WRITE)
+ lprocfs_stime_record(&cli->cl_write_stime, &now,
+ &request->rq_rpcd_start);
+ else
+ lprocfs_stime_record(&cli->cl_read_stime, &now,
+ &request->rq_rpcd_start);
+
+
+
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
* is called so we know whether to go to sync BRWs or wait for more
* RPCs to complete */
- cli->cl_brw_in_flight--;
+ if (request->rq_reqmsg->opc == OST_WRITE)
+ cli->cl_w_in_flight--;
+ else
+ cli->cl_r_in_flight--;
/* the caller may re-use the oap after the completion call so
* we need to clean it up a little */
osc_wake_cache_waiters(cli);
osc_check_rpcs(cli);
-
spin_unlock(&cli->cl_loi_list_lock);
obdo_free(aa->aa_oa);
ops = oap->oap_caller_ops;
caller_data = oap->oap_caller_data;
}
- pga[i].off = oap->oap_obj_off + oap->oap_page_off;
+ pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off;
+ pga[i].page_offset = pga[i].disk_offset;
pga[i].pg = oap->oap_page;
pga[i].count = oap->oap_count;
pga[i].flag = oap->oap_brw_flags;
#ifdef __KERNEL__
if (cmd == OBD_BRW_READ) {
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
- lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
+ lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
} else {
lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_write_rpc_hist,
- cli->cl_brw_in_flight);
+ cli->cl_w_in_flight);
}
#endif
spin_lock(&cli->cl_loi_list_lock);
- cli->cl_brw_in_flight++;
+ if (cmd == OBD_BRW_READ)
+ cli->cl_r_in_flight++;
+ else
+ cli->cl_w_in_flight++;
/* queued sync pages can be torn down while the pages
* were between the pending list and the rpc */
list_for_each(pos, &aa->aa_oaps) {
}
}
- CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %d in flight\n", request,
- page_count, aa, cli->cl_brw_in_flight);
+ CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %dr/%dw in flight\n",
+ request, page_count, aa, cli->cl_r_in_flight,
+ cli->cl_w_in_flight);
oap->oap_request = ptlrpc_request_addref(request);
request->rq_interpret_reply = brw_interpret_oap;
ENTRY;
while ((loi = osc_next_loi(cli)) != NULL) {
- LOI_DEBUG(loi, "%d in flight\n", cli->cl_brw_in_flight);
-
- if (cli->cl_brw_in_flight >= cli->cl_max_rpcs_in_flight)
+ LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
+
+ if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
break;
/* attempt some read/write balancing by alternating between
int rc;
ENTRY;
spin_lock(&cli->cl_loi_list_lock);
- rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0;
+ rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
spin_unlock(&cli->cl_loi_list_lock);
RETURN(rc);
};
{
struct osc_cache_waiter ocw;
struct l_wait_info lwi = { 0 };
+ struct timeval start, stop;
CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
/* Make sure that there are write rpcs in flight to wait for. This
* is a little silly as this object may not have any pending but
* other objects sure might. */
- if (cli->cl_brw_in_flight) {
+ if (cli->cl_w_in_flight) {
list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
init_waitqueue_head(&ocw.ocw_waitq);
ocw.ocw_oap = oap;
spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(0, "sleeping for cache space\n");
+ do_gettimeofday(&start);
l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
-
+ do_gettimeofday(&stop);
spin_lock(&cli->cl_loi_list_lock);
+ lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start);
if (!list_empty(&ocw.ocw_entry)) {
list_del(&ocw.ocw_entry);
RETURN(-EINTR);
RETURN(0);
}
-struct osc_async_page *oap_from_cookie(void *cookie)
-{
- struct osc_async_page *oap = cookie;
- if (oap->oap_magic != OAP_MAGIC)
- return ERR_PTR(-EINVAL);
- return oap;
-};
-
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
int cmd, obd_off off, int count,
- obd_flag brw_flags, enum async_flags async_flags)
+ obd_flags brw_flags, enum async_flags async_flags)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct osc_async_page *oap;
int rc;
ENTRY;
- oap = oap_from_cookie(cookie);
- if (IS_ERR(oap))
- RETURN(PTR_ERR(oap));
+ oap = OAP_FROM_COOKIE(cookie);
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
static int osc_set_async_flags(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
- obd_flag async_flags)
+ obd_flags async_flags)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct loi_oap_pages *lop;
int rc = 0;
ENTRY;
- oap = oap_from_cookie(cookie);
- if (IS_ERR(oap))
- RETURN(PTR_ERR(oap));
+ oap = OAP_FROM_COOKIE(cookie);
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
struct lov_oinfo *loi,
struct obd_io_group *oig, void *cookie,
int cmd, obd_off off, int count,
- obd_flag brw_flags,
- obd_flag async_flags)
+ obd_flags brw_flags,
+ obd_flags async_flags)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct osc_async_page *oap;
struct loi_oap_pages *lop;
ENTRY;
- oap = oap_from_cookie(cookie);
- if (IS_ERR(oap))
- RETURN(PTR_ERR(oap));
+ oap = OAP_FROM_COOKIE(cookie);
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
int rc = 0;
ENTRY;
- oap = oap_from_cookie(cookie);
- if (IS_ERR(oap))
- RETURN(PTR_ERR(oap));
+ oap = OAP_FROM_COOKIE(cookie);
if (loi == NULL)
loi = &lsm->lsm_oinfo[0];
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*nioptr);
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_READ, 3,
- size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_SAN_READ, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
LASSERT(PageLocked(pga[mapped].pg));
LASSERT(mapped == 0 ||
- pga[mapped].off > pga[mapped - 1].off);
+ pga[mapped].disk_offset > pga[mapped - 1].disk_offset);
- nioptr->offset = pga[mapped].off;
+ nioptr->offset = pga[mapped].disk_offset;
nioptr->len = pga[mapped].count;
nioptr->flags = pga[mapped].flag;
}
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*nioptr);
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_WRITE,
- 3, size, NULL);
+ request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_SAN_WRITE, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
LASSERT(PageLocked(pga[mapped].pg));
LASSERT(mapped == 0 ||
- pga[mapped].off > pga[mapped - 1].off);
+ pga[mapped].disk_offset > pga[mapped - 1].disk_offset);
- nioptr->offset = pga[mapped].off;
+ nioptr->offset = pga[mapped].disk_offset;
nioptr->len = pga[mapped].count;
nioptr->flags = pga[mapped].flag;
}
{
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
- LASSERT(lock != NULL);
+ if (lock == NULL) {
+ CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
+ return;
+ }
+
l_lock(&lock->l_resource->lr_namespace->ns_lock);
#ifdef __KERNEL__
if (lock->l_ast_data && lock->l_ast_data != data) {
struct inode *new_inode = data;
struct inode *old_inode = lock->l_ast_data;
+ if (!(old_inode->i_state & I_FREEING))
+ LDLM_ERROR(lock, "inconsistent l_ast_data found");
LASSERTF(old_inode->i_state & I_FREEING,
"Found existing inode %p/%lu/%u state %lu in lock: "
"setting data to %p/%lu/%u\n", old_inode,
struct obd_device *obd = exp->exp_obd;
struct ldlm_res_id res_id = { .name = {0} };
struct ost_lvb lvb;
+ struct ldlm_reply *rep;
+ struct ptlrpc_request *req = NULL;
int rc;
ENTRY;
rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
lockh);
if (rc == 1) {
+ if (ptlrpcs_check_cred(obd->u.cli.cl_import)) {
+ /* return immediately if no credential held */
+ ldlm_lock_decref(lockh, mode);
+ RETURN(-EACCES);
+ }
+
osc_set_data_with_check(lockh, data);
if (*flags & LDLM_FL_HAS_INTENT) {
/* I would like to be able to ASSERT here that rss <=
rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
policy, LCK_PW, lockh);
if (rc == 1) {
+ if (ptlrpcs_check_cred(obd->u.cli.cl_import)) {
+ /* return immediately if no credential held */
+ ldlm_lock_decref(lockh, LCK_PW);
+ RETURN(-EACCES);
+ }
+
/* FIXME: This is not incredibly elegant, but it might
* be more elegant than adding another parameter to
* lock_match. I want a second opinion. */
RETURN(ELDLM_OK);
}
}
+ if (mode == LCK_PW) {
+ /* Look for an already-granted PR lock and try to convert it
+ * to PW. This is attempted once only. */
+ rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
+ policy, LCK_PR, lockh);
+ if (rc == 1) {
+ rc = ldlm_cli_convert(lockh, mode, flags);
+ if (!rc) {
+ /* Update readers/writers accounting */
+ ldlm_lock_addref(lockh, LCK_PW);
+ ldlm_lock_decref(lockh, LCK_PR);
+ osc_set_data_with_check(lockh, data);
+ RETURN(ELDLM_OK);
+ }
+ /* If the conversion failed, we need to drop the refcount
+ * on the matched lock before we get a new one. */
+ /* XXX Wouldn't it save us some effort to cancel the PR
+ * lock here? We are going to take a PW lock anyway and
+ * it will invalidate the PR lock. */
+ ldlm_lock_decref(lockh, LCK_PR);
+ /* NOTE(review): this compares against positive EDEADLOCK -
+ * confirm that matches ldlm_cli_convert()'s convention. */
+ if (rc != EDEADLOCK) {
+ RETURN(rc);
+ }
+ }
+ }
no_match:
- rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type,
+ if (*flags & LDLM_FL_HAS_INTENT) {
+ int size[2] = {0, sizeof(struct ldlm_request)};
+
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, 2, size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ size[0] = sizeof(*rep);
+ size[1] = sizeof(lvb);
+ req->rq_replen = lustre_msg_size(2, size);
+ }
+ rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
&lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
+ if (req != NULL) {
+ if (rc == ELDLM_LOCK_ABORTED) {
+ /* swabbed by ldlm_cli_enqueue() */
+ LASSERT_REPSWABBED(req, 0);
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
+ LASSERT(rep != NULL);
+ if (rep->lock_policy_res1)
+ rc = rep->lock_policy_res1;
+ }
+ ptlrpc_req_finished(req);
+ }
if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n",
rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
policy, mode, lockh);
if (rc) {
- if (!(*flags & LDLM_FL_TEST_LOCK))
+ // if (!(*flags & LDLM_FL_TEST_LOCK))
osc_set_data_with_check(lockh, data);
RETURN(rc);
}
}
static int osc_cancel_unused(struct obd_export *exp,
- struct lov_stripe_md *lsm, int flags, void *opaque)
+ struct lov_stripe_md *lsm,
+ int flags, void *opaque)
{
struct obd_device *obd = class_exp2obd(exp);
struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
* during mount that would help a bit). Having relative timestamps
* is not so great if request processing is slow, while absolute
* timestamps are not ideal because they need time synchronization. */
- request = ptlrpc_prep_req(obd->u.cli.cl_import, OST_STATFS,0,NULL,NULL);
+ request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION,
+ OST_STATFS, 0, NULL, NULL);
if (!request)
RETURN(-ENOMEM);
err = ptlrpc_set_import_active(obd->u.cli.cl_import,
data->ioc_offset);
GOTO(out, err);
+ case IOC_OSC_CTL_RECOVERY:
+ err = ptlrpc_import_control_recovery(obd->u.cli.cl_import,
+ data->ioc_offset);
+ GOTO(out, err);
default:
CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm);
GOTO(out, err = -ENOTTY);
return err;
}
-static int osc_get_info(struct obd_export *exp, obd_count keylen,
+static int osc_get_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 *vallen, void *val)
{
ENTRY;
obd_id *reply;
char *bufs[1] = {key};
int rc;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1,
- &keylen, bufs);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+ OST_GET_INFO, 1, (int *)&keylen, bufs);
if (req == NULL)
RETURN(-ENOMEM);
- req->rq_replen = lustre_msg_size(1, vallen);
+ req->rq_replen = lustre_msg_size(1, (int *)vallen);
rc = ptlrpc_queue_wait(req);
if (rc)
GOTO(out, rc);
ptlrpc_req_finished(req);
RETURN(rc);
}
- RETURN(-EINVAL);
+ RETURN(-EPROTO);
}
static int osc_set_info(struct obd_export *exp, obd_count keylen,
void *key, obd_count vallen, void *val)
{
- struct ptlrpc_request *req;
struct obd_device *obd = exp->exp_obd;
struct obd_import *imp = class_exp2cliimp(exp);
struct llog_ctxt *ctxt;
- int rc, size[2] = {keylen, vallen};
- char *bufs[2] = {key, val};
+ int rc = 0;
ENTRY;
- if (keylen == strlen("next_id") &&
- memcmp(key, "next_id", strlen("next_id")) == 0) {
- if (vallen != sizeof(obd_id))
- RETURN(-EINVAL);
- obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
- CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
- exp->exp_obd->obd_name,
- obd->u.cli.cl_oscc.oscc_next_id);
-
- RETURN(0);
- }
-
- if (keylen == strlen("growth_count") &&
- memcmp(key, "growth_count", strlen("growth_count")) == 0) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- obd->u.cli.cl_oscc.oscc_max_grow_count = *((int*)val);
- RETURN(0);
- }
-
if (keylen == strlen("unlinked") &&
memcmp(key, "unlinked", keylen) == 0) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
RETURN(0);
}
- if (keylen < strlen("mds_conn") ||
- memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
+ if (keylen == strlen("async") &&
+ memcmp(key, "async", keylen) == 0) {
+ struct client_obd *cl = &obd->u.cli;
+ if (vallen != sizeof(int))
+ RETURN(-EINVAL);
+ cl->cl_async = *(int *)val;
+ CDEBUG(D_HA, "%s: set async = %d\n",
+ obd->obd_name, cl->cl_async);
+ RETURN(0);
+ }
+
+ if (keylen == strlen("sec") &&
+ memcmp(key, "sec", keylen) == 0) {
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+
+ if (vallen == strlen("null") &&
+ memcmp(val, "null", vallen) == 0) {
+ cli->cl_sec_flavor = PTLRPC_SEC_NULL;
+ cli->cl_sec_subflavor = 0;
+ RETURN(0);
+ }
+ if (vallen == strlen("krb5i") &&
+ memcmp(val, "krb5i", vallen) == 0) {
+ cli->cl_sec_flavor = PTLRPC_SEC_GSS;
+ cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5I;
+ RETURN(0);
+ }
+ if (vallen == strlen("krb5p") &&
+ memcmp(val, "krb5p", vallen) == 0) {
+ cli->cl_sec_flavor = PTLRPC_SEC_GSS;
+ cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5P;
+ RETURN(0);
+ }
+ /* val is length-delimited by vallen and need not be
+ * NUL-terminated, so bound the print explicitly. */
+ CERROR("unrecognized security type %.*s\n",
+ (int)vallen, (char *)val);
RETURN(-EINVAL);
+ }
- req = ptlrpc_prep_req(imp, OST_SET_INFO, 2, size, bufs);
- if (req == NULL)
- RETURN(-ENOMEM);
+ if (keylen == strlen("flush_cred") &&
+ memcmp(key, "flush_cred", keylen) == 0) {
+ struct client_obd *cli = &exp->exp_obd->u.cli;
- req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(req);
- ptlrpc_req_finished(req);
+ /* val is read as a uid_t below; reject any other size, as
+ * the "async" key does for its int value. */
+ if (vallen != sizeof(uid_t))
+ RETURN(-EINVAL);
+
+ if (cli->cl_import)
+ ptlrpcs_import_flush_creds(cli->cl_import,
+ *((uid_t *) val));
+ RETURN(0);
+ }
+
+ /* Prefix match on "mds_conn": compare only the literal's length so
+ * that a longer key can never make memcmp() read past the end of
+ * the string constant. */
+ if (keylen < strlen("mds_conn") ||
+ memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
+ RETURN(-EINVAL);
- ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
+ ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
+ LLOG_UNLINK_ORIG_CTXT);
if (ctxt) {
- rc = llog_initiator_connect(ctxt);
- if (rc)
- RETURN(rc);
+ /* rc is still 0 from its initialiser here, so connect
+ * unconditionally and report a failure instead of silently
+ * carrying it to the caller. */
+ rc = llog_initiator_connect(ctxt);
+ if (rc != 0)
+ CERROR("cannot establish the connect for "
+ "ctxt %p: %d\n", ctxt, rc);
}
imp->imp_server_timeout = 1;
};
static struct llog_operations osc_unlink_orig_logops;
+
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
struct obd_device *tgt, int count,
struct llog_catid *catid)
RETURN(rc);
}
-
+/* Connect this OSC by delegating to client_connect_import(). All five
+ * arguments, including the connect data and connect flags, are passed
+ * through unchanged and the callee's return code is returned as-is. */
static int osc_connect(struct lustre_handle *exph,
- struct obd_device *obd, struct obd_uuid *cluuid)
+ struct obd_device *obd, struct obd_uuid *cluuid,
+ struct obd_connect_data *data,
+ unsigned long connect_flags)
{
int rc;
-
- rc = client_connect_import(exph, obd, cluuid);
-
- return rc;
+ ENTRY;
+ rc = client_connect_import(exph, obd, cluuid, data, connect_flags);
+ RETURN(rc);
}
-static int osc_disconnect(struct obd_export *exp, int flags)
+/* Disconnect an export. Fetches the LLOG_SIZE_REPL_CTXT llog context,
+ * calls llog_sync() on it when cl_conn_count is 1, then returns the
+ * result of client_disconnect_export(). */
+static int osc_disconnect(struct obd_export *exp, unsigned long flags)
{
struct obd_device *obd = class_exp2obd(exp);
struct llog_ctxt *ctxt;
int rc;
+ ENTRY;
ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_REPL_CTXT);
+ /* NOTE(review): ctxt is not checked for NULL before llog_sync();
+ * confirm llog_sync() tolerates a missing context. */
if (obd->u.cli.cl_conn_count == 1)
llog_sync(ctxt, exp);
rc = client_disconnect_export(exp, flags);
- return rc;
+ RETURN(rc);
}
static int osc_import_event(struct obd_device *obd,
}
case IMP_EVENT_INACTIVE: {
if (obd->obd_observer)
- rc = obd_notify(obd->obd_observer, obd, 0);
+ rc = obd_notify(obd->obd_observer, obd, 0, 0);
break;
}
case IMP_EVENT_INVALIDATE: {
break;
}
case IMP_EVENT_ACTIVE: {
+ /* Only do this on the MDS OSC's */
+ if (imp->imp_server_timeout) {
+ struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
+ spin_unlock(&oscc->oscc_lock);
+ }
+
if (obd->obd_observer)
- rc = obd_notify(obd->obd_observer, obd, 1);
+ rc = obd_notify(obd->obd_observer, obd, 1, 0);
break;
}
default:
static int osc_cleanup(struct obd_device *obd, int flags)
{
+ struct osc_creator *oscc = &obd->u.cli.cl_oscc;
int rc;
rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
if (rc)
RETURN(rc);
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+ oscc->oscc_flags |= OSCC_FLAG_EXITING;
+ spin_unlock(&oscc->oscc_lock);
+
rc = client_obd_cleanup(obd, flags);
ptlrpcd_decref();
RETURN(rc);
.o_detach = osc_detach,
.o_setup = osc_setup,
.o_cleanup = osc_cleanup,
+ .o_add_conn = client_import_add_conn,
+ .o_del_conn = client_import_del_conn,
.o_connect = osc_connect,
.o_disconnect = osc_disconnect,
.o_statfs = osc_statfs,
.o_attach = osc_attach,
.o_detach = osc_detach,
.o_cleanup = client_obd_cleanup,
+ .o_add_conn = client_import_add_conn,
+ .o_del_conn = client_import_del_conn,
.o_connect = osc_connect,
.o_disconnect = client_disconnect_export,
.o_statfs = osc_statfs,