/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table).
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif /* __KERNEL__ */

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);

atomic_t osc_resend_time;

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)

        lmm_size = sizeof(**lmmp);

        OBD_FREE(*lmmp, lmm_size);

        OBD_ALLOC(*lmmp, lmm_size);

        LASSERT(lsm->lsm_object_id);
        LASSERT(lsm->lsm_object_gr);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
        (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)

        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));

        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");

        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);

        OBD_ALLOC(*lsmp, lsm_size);

        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                OBD_FREE(*lsmp, lsm_size);

        loi_init((*lsmp)->lsm_oinfo[0]);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
        LASSERT((*lsmp)->lsm_object_id);
        LASSERT((*lsmp)->lsm_object_gr);

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
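
/*
 * Illustrative sketch (not part of the original source): the packmd/
 * unpackmd pair must round-trip object identifiers through little-endian
 * wire/disk order regardless of host endianness, i.e. for any id:
 *
 *      __u64 id = 0x123456789abcdef0ULL;
 *      __u64 le = cpu_to_le64(id);     // as stored by osc_packmd()
 *      LASSERT(le64_to_cpu(le) == id); // as read back by osc_unpackmd()
 */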

static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)

        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));

        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");

static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)

        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);

static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)

        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        CERROR("can't unpack ost_body\n");
        aa->aa_oi->oi_oa->o_valid = 0;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)

        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(set, req);
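
/*
 * Usage sketch (illustrative, assuming the stock ptlrpc request-set
 * helpers): callers batch getattrs by adding them to a set and waiting on
 * the set as a whole; oinfo->oi_cb_up() then runs from the interpret
 * callback above as each reply arrives:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      if (set != NULL) {
 *              rc = osc_getattr_async(exp, oinfo, set);
 *              if (rc == 0)
 *                      rc = ptlrpc_set_wait(set);
 *              ptlrpc_set_destroy(set);
 *      }
 */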

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)

        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        ptlrpc_req_finished(req);

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)

        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        ptlrpc_req_finished(req);

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)

        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)

        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;

        ptlrpc_req_set_repsize(req, 2, size);

        /* do MDS-to-OST setattr asynchronously */

        /* Do not wait for response. */
        ptlrpcd_add_req(req);

        req->rq_interpret_reply = osc_setattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(rqset, req);

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)

        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        rc = obd_alloc_memmd(exp, &lsm);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);

        GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                       "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CERROR("can't unpack ost_body\n");
        GOTO(out_req, rc = -EPROTO);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;

        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                *oti->oti_logcookies = *obdo_logcookie(oa);

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));

        ptlrpc_req_finished(req);

        obd_free_memmd(exp, &lsm);

static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)

        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)

        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(rqset, req);

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,

        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        ptlrpc_req_finished(req);

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,

        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything
 * at all about a failed destroy.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)

        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count, OST_DESTROY);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);

        ldlm_lock_list_put(&cancels, l_bl_ast, count);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));

        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,

        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);

                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);

        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)

        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
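
/*
 * Illustrative pairing (not part of the original source): every page that
 * consumes grant must eventually release it under the same lock, or the
 * cl_avail_grant/cl_dirty/obd_dirty_pages accounting drifts:
 *
 *      client_obd_list_lock(&cli->cl_loi_list_lock);
 *      osc_consume_write_grant(cli, &oap->oap_brw_page);
 *      client_obd_list_unlock(&cli->cl_loi_list_lock);
 *      ...later, when the brw completes or is truncated away...
 *      client_obd_list_lock(&cli->cl_loi_list_lock);
 *      osc_release_write_grant(cli, &oap->oap_brw_page, sent);
 *      client_obd_list_unlock(&cli->cl_loi_list_lock);
 */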

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)

        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;

        cli->cl_lost_grant += CFS_PAGE_SIZE;
        CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
               cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);

                count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
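
/*
 * Worked example (illustrative): with CFS_PAGE_SIZE = 4096 and an OST
 * block size of 1024, a short write of count = 100 starting 512 bytes
 * into the page gives count = 100 + (512 & 1023) = 612 and end =
 * (512 + 100) & 1023 = 612, so count is rounded up by 1024 - 612 to one
 * full 1024-byte block, and 4096 - 1024 = 3072 bytes of grant are
 * recorded as lost for this page.
 */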

static unsigned long rpcs_in_flight(struct client_obd *cli)

        return cli->cl_r_in_flight + cli->cl_w_in_flight;

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)

        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);

                /* if there is still dirty cache but no grant, wait for pending
                 * RPCs that may yet return us some grant before doing sync
                 * writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);

                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);

                cfs_waitq_signal(&ocw->ocw_waitq);

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)

        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)

        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);

                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
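
/*
 * Worked example (illustrative): a short read of nob_read = 5000 into two
 * 4096-byte pages leaves the first page fully valid (nob_read drops to
 * 904), hits EOF inside the second page so its tail is zeroed with
 * memset(ptr + 904, 0, 4096 - 904), and any further pages would be zeroed
 * whole by the second loop above.
 */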

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");

        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)

        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);

        return (p1->off + p1->count == p2->off);
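
/*
 * Example (illustrative): two 4096-byte pages at file offsets 0 and 4096
 * with identical flags merge into one contiguous 8192-byte niobuf; any
 * hole between them or any flag mismatch prevents the merge (the CERROR
 * above only fires when bits other than OBD_BRW_FROM_GRANT differ).
 */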

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc)

        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",

                nob -= pga[i]->count;

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)

        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa *capa;
        struct osc_brw_async_args *aa;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                pool = cli->cl_import->imp_rq_pool;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        GOTO(out, rc = -ENOMEM);

        /* NB request now owns desc and will free it when it gets freed */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,

        capa_cpy(capa, ocapa);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);

                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);

                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);

                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,

                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,

                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;

        INIT_LIST_HEAD(&aa->aa_oaps);

        ptlrpc_req_finished(req);

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                int nob, obd_count page_count,
                                struct brw_page **pga)

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)

        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CERROR("Can't unpack body\n");

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                CERROR("Unexpected +ve rc %d\n", rc);

                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         aa->aa_requested_nob,

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ);

                if (peer->nid == req->rq_bulk->bd_sender) {

                        router = libcfs_nid2str(req->rq_bulk->bd_sender);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count
                                           - 1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);

                        aa->aa_oa->o_cksum = client_cksum;

                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        *aa->aa_oa = body->oa;

static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)

        struct ptlrpc_request *req;
        struct l_wait_info lwi;

        cfs_waitq_init(&waitq);

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");

                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)

        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                          OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
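        /*
         * Illustrative note (not part of the original source): the struct
         * assignment below copies rq_async_args by value, so the embedded
         * aa_oaps list_head would still point into the *old* request's
         * storage:
         *
         *      new_req->rq_async_args = request->rq_async_args;
         *
         * hence the INIT_LIST_HEAD()/list_splice() sequence that follows,
         * which re-homes the oap list onto the new request and leaves the
         * old list validly empty.
         */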
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

        ptlrpc_set_add_req(set, new_req);

static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)

        struct osc_brw_async_args *aa = data;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);

        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          struct obd_capa *ocapa)

        struct ptlrpc_request *req;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_brw_async_args *aa;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);

                spin_unlock(&cli->cl_loi_list_lock);

        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,

        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        req->rq_interpret_reply = brw_interpret;
        ptlrpc_set_add_req(set, req);
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        } else if (cmd == OBD_BRW_WRITE) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

/*
 * ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}
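
/*
 * Illustrative trace (not part of the original source): for num = 6 the
 * stride sequence grows 1, 4, 13; the do-while then divides back down and
 * makes passes at stride 4 and stride 1, so offsets {5,2,4,1,3,0} are
 * first roughly ordered by the stride-4 exchanges and then finished off
 * by a plain insertion sort, yielding {0,1,2,3,4,5}.
 */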

static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)

        LASSERT(pages > 0);
        offset = pg[i]->off & ~CFS_PAGE_MASK;

        if (pages == 0) /* that's all */

        if (offset + pg[i]->count < CFS_PAGE_SIZE)
                return count; /* doesn't end on page boundary */

        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (offset != 0) /* doesn't start on page boundary */

static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)

        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);

        for (i = 0; i < count; i++)

static void osc_release_ppga(struct brw_page **ppga, obd_count count)

        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);

static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)

        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp == NULL || imp->imp_invalid)

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);
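
/*
 * Worked example (illustrative): with cl_max_pages_per_rpc = 256, a
 * 300-page request is sorted by offset and issued as one 256-page BRW
 * followed by a 44-page BRW; max_unfragmented_pages() may shorten a chunk
 * further so that the bulk layer is never handed a page array with an
 * interior gap it cannot transfer in a single RDMA.
 */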

static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)

        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp == NULL || imp->imp_invalid)

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);

                        GOTO(out, rc = -ENOMEM);

                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);

                /* we passed it to async_internal() which is
                 * now responsible for releasing memory */

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

static void osc_check_rpcs(struct client_obd *cli);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,

        osc_release_write_grant(cli, &oap->oap_brw_page, sent);

/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,

        if (lop->lop_num_pending == 0)

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */

        if (lop->lop_num_pending >= optimal)

static void on_list(struct list_head *item, struct list_head *list,

        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);

/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)

        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);

static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)

        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;

/* this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption, which will forcefully complete the rpc once the rpc
 * completes. */
static void osc_occ_interrupted(struct oig_callback_context *occ)

        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;

        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);

/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,

        ar->ar_force_sync = 1;
        ar->ar_min_xid = ptlrpc_sample_next_xid();

        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
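
/*
 * Lifecycle sketch (illustrative): if the async write carrying xid 100
 * fails, osc_process_ar() records the rc, sets ar_force_sync, and samples
 * ar_min_xid (say 101); writeback then goes out synchronously until some
 * request with xid >= 101 completes without error, at which point
 * ar_force_sync is cleared and queued async writeback resumes.
 */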

static void osc_oap_to_pending(struct osc_async_page *oap)

        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);

/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)

        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;

        osc_exit_cache(cli, oap, sent);
        oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
        oap->oap_oig = NULL;

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */

        /* upper layer wants to leave the page on pending queue */
        osc_oap_to_pending(oap);

        osc_exit_cache(cli, oap, sent);

static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)

        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)

        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);

        RETURN(ERR_PTR(-ENOMEM));

        GOTO(out, req = ERR_PTR(-ENOMEM));

        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                ops = oap->oap_caller_ops;
                caller_data = oap->oap_caller_data;

                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,

        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

        OBD_FREE(pga, sizeof(*pga) * page_count);

/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)

        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned starting_offset = 0;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,

                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);

                        CDEBUG(D_INODE, "oap %p page %p returned %d "
                               "instead of ready\n", oap,

                        /* llite is telling us that the page is still
                         * in commit_write and that we should try
                         * and put it in an rpc again later. we
                         * break out of the loop so we don't create
                         * a hole in the sequence of pages in the rpc
                         * stream */

                        /* the io isn't needed.. tell the checks
                         * below to complete the rpc with EINTR */
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                        oap->oap_count = -EINTR;

                        oap->oap_async_flags |= ASYNC_READY;

                        LASSERTF(0, "oap %p page %p returned %d "
                                 "from make_ready\n", oap,
2129 * Page submitted for IO has to be locked. Either by
2130 * ->ap_make_ready() or by higher layers.
2132 * XXX nikita: this assertion should be adjusted when lustre
2133 * starts using PG_writeback for pages being written out.
2135 #if defined(__KERNEL__) && defined(__LINUX__)
2136 LASSERT(PageLocked(oap->oap_page));
2138 /* If there is a gap at the start of this page, it can't merge
2139 * with any previous page, so we'll hand the network a
2140 * "fragmented" page array that it can't transfer in 1 RDMA */
2141 if (page_count != 0 && oap->oap_page_off != 0)
2144 /* take the page out of our book-keeping */
2145 list_del_init(&oap->oap_pending_item);
2146 lop_update_pending(cli, lop, cmd, -1);
2147 list_del_init(&oap->oap_urgent_item);
2149 if (page_count == 0)
2150 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2151 (PTLRPC_MAX_BRW_SIZE - 1);
2153 /* ask the caller for the size of the io as the rpc leaves. */
2154 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2156 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2157 if (oap->oap_count <= 0) {
2158 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2160 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2164 /* now put the page back in our accounting */
2165 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2166 if (++page_count >= cli->cl_max_pages_per_rpc)
2169 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2170 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2171 * have the same alignment as the initial writes that allocated
2172 * extents on the server. */
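/* For illustration (the real constant is build-dependent): if
 * PTLRPC_MAX_BRW_SIZE were 1MB, an io ending exactly on a 1MB object
 * offset would compute ending_offset == 0 below and close the rpc at
 * that boundary. */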
2173 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2174 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2175 if (ending_offset == 0)
2178 /* If there is a gap at the end of this page, it can't merge
2179 * with any subsequent pages, so we'll hand the network a
2180 * "fragmented" page array that it can't transfer in 1 RDMA */
2181 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2185 osc_wake_cache_waiters(cli);
2187 if (page_count == 0)
2190 loi_list_maint(cli, loi);
2192 client_obd_list_unlock(&cli->cl_loi_list_lock);
2194 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2196 /* this should happen rarely and is pretty bad; it makes the
2197 * pending list not follow the dirty order */
2198 client_obd_list_lock(&cli->cl_loi_list_lock);
2199 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2200 list_del_init(&oap->oap_rpc_item);
2202 /* queued sync pages can be torn down while the pages
2203 * were between the pending list and the rpc */
2204 if (oap->oap_interrupted) {
2205 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2206 osc_ap_completion(cli, NULL, oap, 0,
2210 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2212 loi_list_maint(cli, loi);
2213 RETURN(PTR_ERR(req));
2216 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2218 if (cmd == OBD_BRW_READ) {
2219 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2220 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2221 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2222 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2223 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2225 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2226 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2227 cli->cl_w_in_flight);
2228 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2229 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2230 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2233 client_obd_list_lock(&cli->cl_loi_list_lock);
2235 if (cmd == OBD_BRW_READ)
2236 cli->cl_r_in_flight++;
2238 cli->cl_w_in_flight++;
2240 /* queued sync pages can be torn down while the pages
2241 * were between the pending list and the rpc */
2243 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2244 /* only one oap gets a request reference */
2247 if (oap->oap_interrupted && !req->rq_intr) {
2248 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2250 ptlrpc_mark_interrupted(req);
2254 tmp->oap_request = ptlrpc_request_addref(req);
2256 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2257 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2259 req->rq_interpret_reply = brw_interpret_oap;
2260 ptlrpcd_add_req(req);
2264 #define LOI_DEBUG(LOI, STR, args...) \
2265 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2266 !list_empty(&(LOI)->loi_cli_item), \
2267 (LOI)->loi_write_lop.lop_num_pending, \
2268 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2269 (LOI)->loi_read_lop.lop_num_pending, \
2270 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2273 /* This is called by osc_check_rpcs() to find which objects have pages that
2274 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2275 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2278 /* first return all objects which we already know to have
2279 * pages ready to be stuffed into rpcs */
2280 if (!list_empty(&cli->cl_loi_ready_list))
2281 RETURN(list_entry(cli->cl_loi_ready_list.next,
2282 struct lov_oinfo, loi_cli_item));
2284 /* then if we have cache waiters, return all objects with queued
2285 * writes. This is especially important when many small files
2286 * have filled up the cache and not been fired into rpcs because
2287 * they don't pass the nr_pending/object threshold */
2288 if (!list_empty(&cli->cl_cache_waiters) &&
2289 !list_empty(&cli->cl_loi_write_list))
2290 RETURN(list_entry(cli->cl_loi_write_list.next,
2291 struct lov_oinfo, loi_write_item));
2293 /* then return all queued objects when we have an invalid import
2294 * so that they get flushed */
2295 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2296 if (!list_empty(&cli->cl_loi_write_list))
2297 RETURN(list_entry(cli->cl_loi_write_list.next,
2298 struct lov_oinfo, loi_write_item));
2299 if (!list_empty(&cli->cl_loi_read_list))
2300 RETURN(list_entry(cli->cl_loi_read_list.next,
2301 struct lov_oinfo, loi_read_item));
2306 /* called with the loi list lock held */
2307 static void osc_check_rpcs(struct client_obd *cli)
2309 struct lov_oinfo *loi;
2310 int rc = 0, race_counter = 0;
2313 while ((loi = osc_next_loi(cli)) != NULL) {
2314 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2316 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2319 /* attempt some read/write balancing by alternating between
2320 * reads and writes in an object. The makes_rpc checks here
2321 * would be redundant if we were getting read/write work items
2322 * instead of objects. we don't want send_oap_rpc to drain a
2323 * partial read pending queue when we're given this object to
2324 * do write io while there are cache waiters */
2325 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2326 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2327 &loi->loi_write_lop);
2335 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2336 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2337 &loi->loi_read_lop);
2346 /* attempt some inter-object balancing by issuing rpcs
2347 * for each object in turn */
2348 if (!list_empty(&loi->loi_cli_item))
2349 list_del_init(&loi->loi_cli_item);
2350 if (!list_empty(&loi->loi_write_item))
2351 list_del_init(&loi->loi_write_item);
2352 if (!list_empty(&loi->loi_read_item))
2353 list_del_init(&loi->loi_read_item);
2355 loi_list_maint(cli, loi);
2357 /* send_oap_rpc fails with 0 when make_ready tells it to
2358 * back off. llite's make_ready does this when it tries
2359 * to lock a page queued for write that is already locked.
2360 * we want to try sending rpcs from many objects, but we
2361 * don't want to spin failing with 0. */
2362 if (race_counter == 10)
2368 /* we're trying to queue a page in the osc so we're subject to the
2369 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2370 * If the osc's queued pages are already at that limit, then we want to sleep
2371 * until there is space in the osc's queue for us. We also may be waiting for
2372 * write credits from the OST if there are RPCs in flight that may return some
2373 * before we fall back to sync writes.
2375 * We need this to know whether our allocation was granted in the presence of signals */
2376 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2380 client_obd_list_lock(&cli->cl_loi_list_lock);
2381 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2382 client_obd_list_unlock(&cli->cl_loi_list_lock);
2386 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2387 * grant or cache space. */
2388 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2389 struct osc_async_page *oap)
2391 struct osc_cache_waiter ocw;
2392 struct l_wait_info lwi = { 0 };
2396 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2397 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2398 cli->cl_dirty_max, obd_max_dirty_pages,
2399 cli->cl_lost_grant, cli->cl_avail_grant);
2401 /* force the caller to try sync io. this can jump the list
2402 * of queued writes and create a discontiguous rpc stream */
2403 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2404 loi->loi_ar.ar_force_sync)
2407 /* Hopefully normal case - cache space and write credits available */
2408 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2409 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2410 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2411 /* account for ourselves */
2412 osc_consume_write_grant(cli, &oap->oap_brw_page);
2416 /* Make sure that there are write rpcs in flight to wait for. This
2417 * is a little silly as this object may not have any pending but
2418 * other objects sure might. */
2419 if (cli->cl_w_in_flight) {
2420 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2421 cfs_waitq_init(&ocw.ocw_waitq);
2425 loi_list_maint(cli, loi);
2426 osc_check_rpcs(cli);
2427 client_obd_list_unlock(&cli->cl_loi_list_lock);
2429 CDEBUG(D_CACHE, "sleeping for cache space\n");
2430 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2432 client_obd_list_lock(&cli->cl_loi_list_lock);
2433 if (!list_empty(&ocw.ocw_entry)) {
2434 list_del(&ocw.ocw_entry);
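/* Initialize the per-page async io state (oap) that lives in the
 * caller's cookie; the bare size_round() return below lets callers ask
 * how much room to reserve for it. */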
2443 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2444 struct lov_oinfo *loi, cfs_page_t *page,
2445 obd_off offset, struct obd_async_page_ops *ops,
2446 void *data, void **res)
2448 struct osc_async_page *oap;
2452 return size_round(sizeof(*oap));
2455 oap->oap_magic = OAP_MAGIC;
2456 oap->oap_cli = &exp->exp_obd->u.cli;
2459 oap->oap_caller_ops = ops;
2460 oap->oap_caller_data = data;
2462 oap->oap_page = page;
2463 oap->oap_obj_off = offset;
2465 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2466 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2467 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2469 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2471 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
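/* Turn an opaque io cookie back into its oap, using the magic to catch
 * stale or corrupt cookies. */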
2475 struct osc_async_page *oap_from_cookie(void *cookie)
2477 struct osc_async_page *oap = cookie;
2478 if (oap->oap_magic != OAP_MAGIC)
2479 return ERR_PTR(-EINVAL);
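/* Queue a prepared page for async io. Writes may block in
 * osc_enter_cache() waiting for grant or cache space; a page that is
 * already on a pending, urgent or rpc list is refused. */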
2483 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2484 struct lov_oinfo *loi, void *cookie,
2485 int cmd, obd_off off, int count,
2486 obd_flag brw_flags, enum async_flags async_flags)
2488 struct client_obd *cli = &exp->exp_obd->u.cli;
2489 struct osc_async_page *oap;
2493 oap = oap_from_cookie(cookie);
2495 RETURN(PTR_ERR(oap));
2497 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2500 if (!list_empty(&oap->oap_pending_item) ||
2501 !list_empty(&oap->oap_urgent_item) ||
2502 !list_empty(&oap->oap_rpc_item))
2505 /* check if the file's owner/group is over quota */
2506 #ifdef HAVE_QUOTA_SUPPORT
2507 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2508 struct obd_async_page_ops *ops;
2515 ops = oap->oap_caller_ops;
2516 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2517 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2528 loi = lsm->lsm_oinfo[0];
2530 client_obd_list_lock(&cli->cl_loi_list_lock);
2533 oap->oap_page_off = off;
2534 oap->oap_count = count;
2535 oap->oap_brw_flags = brw_flags;
2536 oap->oap_async_flags = async_flags;
2538 if (cmd & OBD_BRW_WRITE) {
2539 rc = osc_enter_cache(cli, loi, oap);
2541 client_obd_list_unlock(&cli->cl_loi_list_lock);
2546 osc_oap_to_pending(oap);
2547 loi_list_maint(cli, loi);
2549 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2552 osc_check_rpcs(cli);
2553 client_obd_list_unlock(&cli->cl_loi_list_lock);
2558 /* aka (~was & now & flag), but this is more clear :) */
2559 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
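/* e.g. SETTING(0, ASYNC_READY, ASYNC_READY) is true (the flag is being
 * set now), while SETTING(ASYNC_READY, ASYNC_READY|ASYNC_URGENT,
 * ASYNC_READY) is false (it was already set). */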
2561 static int osc_set_async_flags(struct obd_export *exp,
2562 struct lov_stripe_md *lsm,
2563 struct lov_oinfo *loi, void *cookie,
2564 obd_flag async_flags)
2566 struct client_obd *cli = &exp->exp_obd->u.cli;
2567 struct loi_oap_pages *lop;
2568 struct osc_async_page *oap;
2572 oap = oap_from_cookie(cookie);
2574 RETURN(PTR_ERR(oap));
2577 * bug 7311: OST-side locking is only supported for liblustre for now
2578 * (and liblustre never calls obd_set_async_flags(). I hope.); a generic
2579 * implementation has to handle the case where an OST-locked page was
2580 * picked up by, e.g., ->writepage().
2582 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2583 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
* tread here */
2586 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2590 loi = lsm->lsm_oinfo[0];
2592 if (oap->oap_cmd & OBD_BRW_WRITE) {
2593 lop = &loi->loi_write_lop;
2595 lop = &loi->loi_read_lop;
2598 client_obd_list_lock(&cli->cl_loi_list_lock);
2600 if (list_empty(&oap->oap_pending_item))
2601 GOTO(out, rc = -EINVAL);
2603 if ((oap->oap_async_flags & async_flags) == async_flags)
2606 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2607 oap->oap_async_flags |= ASYNC_READY;
2609 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2610 if (list_empty(&oap->oap_rpc_item)) {
2611 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2612 loi_list_maint(cli, loi);
2616 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2617 oap->oap_async_flags);
2619 osc_check_rpcs(cli);
2620 client_obd_list_unlock(&cli->cl_loi_list_lock);
2624 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2625 struct lov_oinfo *loi,
2626 struct obd_io_group *oig, void *cookie,
2627 int cmd, obd_off off, int count,
2629 obd_flag async_flags)
2631 struct client_obd *cli = &exp->exp_obd->u.cli;
2632 struct osc_async_page *oap;
2633 struct loi_oap_pages *lop;
2637 oap = oap_from_cookie(cookie);
2639 RETURN(PTR_ERR(oap));
2641 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2644 if (!list_empty(&oap->oap_pending_item) ||
2645 !list_empty(&oap->oap_urgent_item) ||
2646 !list_empty(&oap->oap_rpc_item))
2650 loi = lsm->lsm_oinfo[0];
2652 client_obd_list_lock(&cli->cl_loi_list_lock);
2655 oap->oap_page_off = off;
2656 oap->oap_count = count;
2657 oap->oap_brw_flags = brw_flags;
2658 oap->oap_async_flags = async_flags;
2660 if (cmd & OBD_BRW_WRITE)
2661 lop = &loi->loi_write_lop;
2663 lop = &loi->loi_read_lop;
2665 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2666 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2668 rc = oig_add_one(oig, &oap->oap_occ);
2671 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2672 oap, oap->oap_page, rc);
2674 client_obd_list_unlock(&cli->cl_loi_list_lock);
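/* Move a group's pages from the group-pending list onto the regular
 * pending lists so osc_check_rpcs() can stuff them into rpcs. */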
2679 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2680 struct loi_oap_pages *lop, int cmd)
2682 struct list_head *pos, *tmp;
2683 struct osc_async_page *oap;
2685 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2686 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2687 list_del(&oap->oap_pending_item);
2688 osc_oap_to_pending(oap);
2690 loi_list_maint(cli, loi);
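/* Fire the io queued earlier with osc_queue_group_io() by moving both
 * the write and read groups to pending and kicking osc_check_rpcs(). */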
2693 static int osc_trigger_group_io(struct obd_export *exp,
2694 struct lov_stripe_md *lsm,
2695 struct lov_oinfo *loi,
2696 struct obd_io_group *oig)
2698 struct client_obd *cli = &exp->exp_obd->u.cli;
2702 loi = lsm->lsm_oinfo[0];
2704 client_obd_list_lock(&cli->cl_loi_list_lock);
2706 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2707 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2709 osc_check_rpcs(cli);
2710 client_obd_list_unlock(&cli->cl_loi_list_lock);
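/* Undo all osc bookkeeping for a page the caller is dropping. A page
 * already attached to an rpc stays where it is and the teardown fails
 * with -EBUSY. */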
2715 static int osc_teardown_async_page(struct obd_export *exp,
2716 struct lov_stripe_md *lsm,
2717 struct lov_oinfo *loi, void *cookie)
2719 struct client_obd *cli = &exp->exp_obd->u.cli;
2720 struct loi_oap_pages *lop;
2721 struct osc_async_page *oap;
2725 oap = oap_from_cookie(cookie);
2727 RETURN(PTR_ERR(oap));
2730 loi = lsm->lsm_oinfo[0];
2732 if (oap->oap_cmd & OBD_BRW_WRITE) {
2733 lop = &loi->loi_write_lop;
2735 lop = &loi->loi_read_lop;
2738 client_obd_list_lock(&cli->cl_loi_list_lock);
2740 if (!list_empty(&oap->oap_rpc_item))
2741 GOTO(out, rc = -EBUSY);
2743 osc_exit_cache(cli, oap, 0);
2744 osc_wake_cache_waiters(cli);
2746 if (!list_empty(&oap->oap_urgent_item)) {
2747 list_del_init(&oap->oap_urgent_item);
2748 oap->oap_async_flags &= ~ASYNC_URGENT;
2750 if (!list_empty(&oap->oap_pending_item)) {
2751 list_del_init(&oap->oap_pending_item);
2752 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2754 loi_list_maint(cli, loi);
2756 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2758 client_obd_list_unlock(&cli->cl_loi_list_lock);
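/* Attach caller data (an inode) to a matched lock, complaining if the
 * lock already carries ast data for a different inode that isn't being
 * freed. */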
2762 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2765 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2768 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2771 lock_res_and_lock(lock);
2774 /* Liang XXX: Darwin and Winnt checking should be added */
2775 if (lock->l_ast_data && lock->l_ast_data != data) {
2776 struct inode *new_inode = data;
2777 struct inode *old_inode = lock->l_ast_data;
2778 if (!(old_inode->i_state & I_FREEING))
2779 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2780 LASSERTF(old_inode->i_state & I_FREEING,
2781 "Found existing inode %p/%lu/%u state %lu in lock: "
2782 "setting data to %p/%lu/%u\n", old_inode,
2783 old_inode->i_ino, old_inode->i_generation,
2785 new_inode, new_inode->i_ino, new_inode->i_generation);
2789 lock->l_ast_data = data;
2790 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2791 unlock_res_and_lock(lock);
2792 LDLM_LOCK_PUT(lock);
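/* Iterate over all locks on this object's resource and let @replace
 * update their callback data. */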
2795 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2796 ldlm_iterator_t replace, void *data)
2798 struct ldlm_res_id res_id = { .name = {0} };
2799 struct obd_device *obd = class_exp2obd(exp);
2801 res_id.name[0] = lsm->lsm_object_id;
2802 res_id.name[2] = lsm->lsm_object_gr;
2804 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
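/* Common tail of an enqueue: for an aborted intent, dig the real
 * result out of the lock reply, then run the caller's update
 * callback. */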
2808 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2814 /* The request was created before the ldlm_cli_enqueue call. */
2815 if (rc == ELDLM_LOCK_ABORTED) {
2816 struct ldlm_reply *rep;
2818 /* swabbed by ldlm_cli_enqueue() */
2819 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2820 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2822 LASSERT(rep != NULL);
2823 if (rep->lock_policy_res1)
2824 rc = rep->lock_policy_res1;
2828 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2829 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2830 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2831 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2832 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2835 /* Call the update callback. */
2836 rc = oinfo->oi_cb_up(oinfo, rc);
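/* Completion handler for async enqueues: finish the ldlm side, run
 * osc_enqueue_fini(), then drop the reference the async request held
 * on the lock. */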
2840 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2841 struct osc_enqueue_args *aa, int rc)
2843 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2844 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2845 struct ldlm_lock *lock;
2847 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
* be valid. */
2849 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2851 /* Complete obtaining the lock procedure. */
2852 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2854 &aa->oa_oi->oi_flags,
2855 &lsm->lsm_oinfo[0]->loi_lvb,
2856 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2857 lustre_swab_ost_lvb,
2858 aa->oa_oi->oi_lockh, rc);
2860 /* Complete osc stuff. */
2861 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2863 /* Release the lock for async request. */
2864 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2865 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2867 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2868 aa->oa_oi->oi_lockh, req, aa);
2869 LDLM_LOCK_PUT(lock);
2873 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2874 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2875 * other synchronous requests; however, keeping some locks and trying to obtain
2876 * others may take a considerable amount of time in case of OST failure, and
2877 * when other sync requests do not get a lock released by a client, that
2878 * client is excluded from the cluster -- such scenarios make life difficult, so
2879 * release locks just after they are obtained. */
2880 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2881 struct ldlm_enqueue_info *einfo,
2882 struct ptlrpc_request_set *rqset)
2884 struct ldlm_res_id res_id = { .name = {0} };
2885 struct obd_device *obd = exp->exp_obd;
2886 struct ldlm_reply *rep;
2887 struct ptlrpc_request *req = NULL;
2888 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2892 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2893 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2895 /* Filesystem lock extents are extended to page boundaries so that
2896 * dealing with the page cache is a little smoother. */
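/* e.g. with 4K pages this widens an extent of [5000, 12000] to the
 * page-aligned [4096, 12287]. */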
2897 oinfo->oi_policy.l_extent.start -=
2898 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2899 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2901 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2904 /* Next, search for already existing extent locks that will cover us */
2905 rc = ldlm_lock_match(obd->obd_namespace,
2906 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2907 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2910 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2913 /* I would like to be able to ASSERT here that rss <=
2914 * kms, but I can't, for reasons which are explained in
2918 /* We already have a lock, and it's referenced */
2919 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2921 /* For async requests, decref the lock. */
2923 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2928 /* If we're trying to read, we also search for an existing PW lock. The
2929 * VFS and page cache already protect us locally, so lots of readers/
2930 * writers can share a single PW lock.
2932 * There are problems with conversion deadlocks, so instead of
2933 * converting a read lock to a write lock, we'll just enqueue a new
2936 * At some point we should cancel the read lock instead of making them
2937 * send us a blocking callback, but there are problems with canceling
2938 * locks out from other users right now, too. */
2940 if (einfo->ei_mode == LCK_PR) {
2941 rc = ldlm_lock_match(obd->obd_namespace,
2942 oinfo->oi_flags | LDLM_FL_LVB_READY,
2943 &res_id, einfo->ei_type, &oinfo->oi_policy,
2944 LCK_PW, oinfo->oi_lockh);
2946 /* FIXME: This is not incredibly elegant, but it might
2947 * be more elegant than adding another parameter to
2948 * lock_match. I want a second opinion. */
2949 /* addref the lock only if not async requests. */
2951 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2952 osc_set_data_with_check(oinfo->oi_lockh,
2955 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2956 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2964 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2965 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2966 [DLM_LOCKREQ_OFF + 1] = 0 };
2968 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2972 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2973 size[DLM_REPLY_REC_OFF] =
2974 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2975 ptlrpc_req_set_repsize(req, 3, size);
2978 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2979 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2981 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2982 &oinfo->oi_policy, &oinfo->oi_flags,
2983 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2984 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2985 lustre_swab_ost_lvb, oinfo->oi_lockh,
2989 struct osc_enqueue_args *aa;
2990 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2991 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2996 req->rq_interpret_reply = osc_enqueue_interpret;
2997 ptlrpc_set_add_req(rqset, req);
2998 } else if (intent) {
2999 ptlrpc_req_finished(req);
3004 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3006 ptlrpc_req_finished(req);
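/* Like the match step of osc_enqueue(), but only finds already-granted
 * locks; it never sends an enqueue rpc. */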
3011 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3012 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3013 int *flags, void *data, struct lustre_handle *lockh)
3015 struct ldlm_res_id res_id = { .name = {0} };
3016 struct obd_device *obd = exp->exp_obd;
3018 int lflags = *flags;
3021 res_id.name[0] = lsm->lsm_object_id;
3022 res_id.name[2] = lsm->lsm_object_gr;
3024 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3026 /* Filesystem lock extents are extended to page boundaries so that
3027 * dealing with the page cache is a little smoother */
3028 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3029 policy->l_extent.end |= ~CFS_PAGE_MASK;
3031 /* Next, search for already existing extent locks that will cover us */
3032 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3033 &res_id, type, policy, mode, lockh);
3035 //if (!(*flags & LDLM_FL_TEST_LOCK))
3036 osc_set_data_with_check(lockh, data, lflags);
3039 /* If we're trying to read, we also search for an existing PW lock. The
3040 * VFS and page cache already protect us locally, so lots of readers/
3041 * writers can share a single PW lock. */
3042 if (mode == LCK_PR) {
3043 rc = ldlm_lock_match(obd->obd_namespace,
3044 lflags | LDLM_FL_LVB_READY, &res_id,
3045 type, policy, LCK_PW, lockh);
3046 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3047 /* FIXME: This is not incredibly elegant, but it might
3048 * be more elegant than adding another parameter to
3049 * lock_match. I want a second opinion. */
3050 osc_set_data_with_check(lockh, data, lflags);
3051 ldlm_lock_addref(lockh, LCK_PR);
3052 ldlm_lock_decref(lockh, LCK_PW);
3058 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3059 __u32 mode, struct lustre_handle *lockh)
3063 if (unlikely(mode == LCK_GROUP))
3064 ldlm_lock_decref_and_cancel(lockh, mode);
3066 ldlm_lock_decref(lockh, mode);
3071 static int osc_cancel_unused(struct obd_export *exp,
3072 struct lov_stripe_md *lsm, int flags,
3075 struct obd_device *obd = class_exp2obd(exp);
3076 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3079 res_id.name[0] = lsm->lsm_object_id;
3080 res_id.name[2] = lsm->lsm_object_gr;
3084 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3087 static int osc_join_lru(struct obd_export *exp,
3088 struct lov_stripe_md *lsm, int join)
3090 struct obd_device *obd = class_exp2obd(exp);
3091 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3094 res_id.name[0] = lsm->lsm_object_id;
3095 res_id.name[2] = lsm->lsm_object_gr;
3099 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
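/* Unpack an OST_STATFS reply and hand the statistics to the waiting
 * obd_info callback. */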
3102 static int osc_statfs_interpret(struct ptlrpc_request *req,
3103 struct osc_async_args *aa, int rc)
3105 struct obd_statfs *msfs;
3111 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3112 lustre_swab_obd_statfs);
3114 CERROR("Can't unpack obd_statfs\n");
3115 GOTO(out, rc = -EPROTO);
3118 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3120 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3124 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3125 __u64 max_age, struct ptlrpc_request_set *rqset)
3127 struct ptlrpc_request *req;
3128 struct osc_async_args *aa;
3129 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3132 /* We could possibly pass max_age in the request (as an absolute
3133 * timestamp or a "seconds.usec ago") so the target can avoid doing
3134 * extra calls into the filesystem if that isn't necessary (e.g.
3135 * during mount that would help a bit). Having relative timestamps
3136 * is not so great if request processing is slow, while absolute
3137 * timestamps are not ideal because they need time synchronization. */
3138 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3139 OST_STATFS, 1, NULL, NULL);
3143 ptlrpc_req_set_repsize(req, 2, size);
3144 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3146 req->rq_interpret_reply = osc_statfs_interpret;
3147 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3148 aa = (struct osc_async_args *)&req->rq_async_args;
3151 ptlrpc_set_add_req(rqset, req);
3155 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3158 struct obd_statfs *msfs;
3159 struct ptlrpc_request *req;
3160 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3163 /* We could possibly pass max_age in the request (as an absolute
3164 * timestamp or a "seconds.usec ago") so the target can avoid doing
3165 * extra calls into the filesystem if that isn't necessary (e.g.
3166 * during mount that would help a bit). Having relative timestamps
3167 * is not so great if request processing is slow, while absolute
3168 * timestamps are not ideal because they need time synchronization. */
3169 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3170 OST_STATFS, 1, NULL, NULL);
3174 ptlrpc_req_set_repsize(req, 2, size);
3175 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3177 rc = ptlrpc_queue_wait(req);
3181 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3182 lustre_swab_obd_statfs);
3184 CERROR("Can't unpack obd_statfs\n");
3185 GOTO(out, rc = -EPROTO);
3188 memcpy(osfs, msfs, sizeof(*osfs));
3192 ptlrpc_req_finished(req);
3196 /* Retrieve object striping information.
3198 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3199 * the maximum number of OST indices which will fit in the user buffer.
3200 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3202 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3204 struct lov_user_md lum, *lumk;
3205 int rc = 0, lum_size;
3211 if (copy_from_user(&lum, lump, sizeof(lum)))
3214 if (lum.lmm_magic != LOV_USER_MAGIC)
3217 if (lum.lmm_stripe_count > 0) {
3218 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3219 OBD_ALLOC(lumk, lum_size);
3223 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3224 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3226 lum_size = sizeof(lum);
3230 lumk->lmm_object_id = lsm->lsm_object_id;
3231 lumk->lmm_object_gr = lsm->lsm_object_gr;
3232 lumk->lmm_stripe_count = 1;
3234 if (copy_to_user(lump, lumk, lum_size))
3238 OBD_FREE(lumk, lum_size);
3244 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3245 void *karg, void *uarg)
3247 struct obd_device *obd = exp->exp_obd;
3248 struct obd_ioctl_data *data = karg;
3252 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3255 if (!try_module_get(THIS_MODULE)) {
3256 CERROR("Can't get module. Is it alive?");
3261 case OBD_IOC_LOV_GET_CONFIG: {
3263 struct lov_desc *desc;
3264 struct obd_uuid uuid;
3268 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3269 GOTO(out, err = -EINVAL);
3271 data = (struct obd_ioctl_data *)buf;
3273 if (sizeof(*desc) > data->ioc_inllen1) {
3274 obd_ioctl_freedata(buf, len);
3275 GOTO(out, err = -EINVAL);
3278 if (data->ioc_inllen2 < sizeof(uuid)) {
3279 obd_ioctl_freedata(buf, len);
3280 GOTO(out, err = -EINVAL);
3283 desc = (struct lov_desc *)data->ioc_inlbuf1;
3284 desc->ld_tgt_count = 1;
3285 desc->ld_active_tgt_count = 1;
3286 desc->ld_default_stripe_count = 1;
3287 desc->ld_default_stripe_size = 0;
3288 desc->ld_default_stripe_offset = 0;
3289 desc->ld_pattern = 0;
3290 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3292 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3294 err = copy_to_user((void *)uarg, buf, len);
3297 obd_ioctl_freedata(buf, len);
3300 case LL_IOC_LOV_SETSTRIPE:
3301 err = obd_alloc_memmd(exp, karg);
3305 case LL_IOC_LOV_GETSTRIPE:
3306 err = osc_getstripe(karg, uarg);
3308 case OBD_IOC_CLIENT_RECOVER:
3309 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3314 case IOC_OSC_SET_ACTIVE:
3315 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3318 case OBD_IOC_POLL_QUOTACHECK:
3319 err = lquota_poll_check(quota_interface, exp,
3320 (struct if_quotacheck *)karg);
3323 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3324 cmd, cfs_curproc_comm());
3325 GOTO(out, err = -ENOTTY);
3328 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3331 module_put(THIS_MODULE);
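/* Answer obd_get_info queries: "lock_to_stripe" is answered locally,
 * while "last_id" is fetched from the OST with an OST_GET_INFO rpc. */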
3336 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3337 void *key, __u32 *vallen, void *val)
3340 if (!vallen || !val)
3343 if (keylen > strlen("lock_to_stripe") &&
3344 strcmp(key, "lock_to_stripe") == 0) {
3345 __u32 *stripe = val;
3346 *vallen = sizeof(*stripe);
3349 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3350 struct ptlrpc_request *req;
3352 char *bufs[2] = { NULL, key };
3353 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3355 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3356 OST_GET_INFO, 2, size, bufs);
3360 size[REPLY_REC_OFF] = *vallen;
3361 ptlrpc_req_set_repsize(req, 2, size);
3362 rc = ptlrpc_queue_wait(req);
3366 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3367 lustre_swab_ost_last_id);
3368 if (reply == NULL) {
3369 CERROR("Can't unpack OST last ID\n");
3370 GOTO(out, rc = -EPROTO);
3372 *((obd_id *)val) = *reply;
3374 ptlrpc_req_finished(req);
3380 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3383 struct llog_ctxt *ctxt;
3384 struct obd_import *imp = req->rq_import;
3390 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3393 rc = llog_initiator_connect(ctxt);
3395 CERROR("cannot establish connection for "
3396 "ctxt %p: %d\n", ctxt, rc);
3399 spin_lock(&imp->imp_lock);
3400 imp->imp_server_timeout = 1;
3401 imp->imp_pingable = 1;
3402 spin_unlock(&imp->imp_lock);
3403 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
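/* Apply obd_set_info calls: a handful of keys (next object id, recovery
 * and checksum tunables, context flush) take effect locally; anything
 * else is forwarded to the OST in an OST_SET_INFO rpc. */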
3408 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3409 void *key, obd_count vallen, void *val,
3410 struct ptlrpc_request_set *set)
3412 struct ptlrpc_request *req;
3413 struct obd_device *obd = exp->exp_obd;
3414 struct obd_import *imp = class_exp2cliimp(exp);
3415 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3416 char *bufs[3] = { NULL, key, val };
3419 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3421 if (KEY_IS(KEY_NEXT_ID)) {
3422 if (vallen != sizeof(obd_id))
3424 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3425 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3426 exp->exp_obd->obd_name,
3427 obd->u.cli.cl_oscc.oscc_next_id);
3432 if (KEY_IS("unlinked")) {
3433 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3434 spin_lock(&oscc->oscc_lock);
3435 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3436 spin_unlock(&oscc->oscc_lock);
3440 if (KEY_IS(KEY_INIT_RECOV)) {
3441 if (vallen != sizeof(int))
3443 spin_lock(&imp->imp_lock);
3444 imp->imp_initial_recov = *(int *)val;
3445 spin_unlock(&imp->imp_lock);
3446 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3447 exp->exp_obd->obd_name,
3448 imp->imp_initial_recov);
3452 if (KEY_IS("checksum")) {
3453 if (vallen != sizeof(int))
3455 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3459 if (KEY_IS(KEY_FLUSH_CTX)) {
3460 sptlrpc_import_flush_my_ctx(imp);
3467 /* We pass all other commands directly to OST. Since nobody calls osc
3468 methods directly and everybody is supposed to go through LOV, we
3469 assume lov checked invalid values for us.
3470 The only recognised values so far are evict_by_nid and mds_conn.
3471 Even if something bad goes through, we'd get a -EINVAL from OST
3474 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3479 if (KEY_IS("mds_conn")) {
3480 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3482 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3483 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3484 LASSERT(oscc->oscc_oa.o_gr > 0);
3485 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3488 ptlrpc_req_set_repsize(req, 1, NULL);
3489 ptlrpc_set_add_req(set, req);
3490 ptlrpc_check_set(set);
3496 static struct llog_operations osc_size_repl_logops = {
3497 lop_cancel: llog_obd_repl_cancel
3500 static struct llog_operations osc_mds_ost_orig_logops;
3501 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3502 struct obd_device *tgt, int count,
3503 struct llog_catid *catid, struct obd_uuid *uuid)
3508 spin_lock(&obd->obd_dev_lock);
3509 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3510 osc_mds_ost_orig_logops = llog_lvfs_ops;
3511 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3512 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3513 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3514 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3516 spin_unlock(&obd->obd_dev_lock);
3518 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3519 &catid->lci_logid, &osc_mds_ost_orig_logops);
3521 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3525 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3526 &osc_size_repl_logops);
3528 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3531 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3532 obd->obd_name, tgt->obd_name, count, catid, rc);
3533 CERROR("logid "LPX64":0x%x\n",
3534 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3539 static int osc_llog_finish(struct obd_device *obd, int count)
3541 struct llog_ctxt *ctxt;
3542 int rc = 0, rc2 = 0;
3545 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3547 rc = llog_cleanup(ctxt);
3549 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3551 rc2 = llog_cleanup(ctxt);
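/* On reconnect, re-request our remaining grant from the OST (or a
 * two-rpc minimum if none is left) so cached writes stay covered. */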
3558 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3559 struct obd_uuid *cluuid,
3560 struct obd_connect_data *data)
3562 struct client_obd *cli = &obd->u.cli;
3564 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3567 client_obd_list_lock(&cli->cl_loi_list_lock);
3568 data->ocd_grant = cli->cl_avail_grant ?:
3569 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3570 lost_grant = cli->cl_lost_grant;
3571 cli->cl_lost_grant = 0;
3572 client_obd_list_unlock(&cli->cl_loi_list_lock);
3574 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3575 "cl_lost_grant: %ld\n", data->ocd_grant,
3576 cli->cl_avail_grant, lost_grant);
3577 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3578 " ocd_grant: %d\n", data->ocd_connect_flags,
3579 data->ocd_version, data->ocd_grant);
3585 static int osc_disconnect(struct obd_export *exp)
3587 struct obd_device *obd = class_exp2obd(exp);
3588 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3591 if (obd->u.cli.cl_conn_count == 1)
3592 /* flush any remaining cancel messages out to the target */
3593 llog_sync(ctxt, exp);
3595 rc = client_disconnect_export(exp);
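/* React to import state changes: forget grant on disconnect, fail
 * queued pages and flush locks when the import is invalidated, and
 * re-initialize grant and portals from the connect data when it comes
 * back. */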
3599 static int osc_import_event(struct obd_device *obd,
3600 struct obd_import *imp,
3601 enum obd_import_event event)
3603 struct client_obd *cli;
3607 LASSERT(imp->imp_obd == obd);
3610 case IMP_EVENT_DISCON: {
3611 /* Only do this on the MDS OSCs */
3612 if (imp->imp_server_timeout) {
3613 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3615 spin_lock(&oscc->oscc_lock);
3616 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3617 spin_unlock(&oscc->oscc_lock);
3620 client_obd_list_lock(&cli->cl_loi_list_lock);
3621 cli->cl_avail_grant = 0;
3622 cli->cl_lost_grant = 0;
3623 client_obd_list_unlock(&cli->cl_loi_list_lock);
3626 case IMP_EVENT_INACTIVE: {
3627 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3630 case IMP_EVENT_INVALIDATE: {
3631 struct ldlm_namespace *ns = obd->obd_namespace;
3635 client_obd_list_lock(&cli->cl_loi_list_lock);
3636 /* all pages go to failing rpcs due to the invalid import */
3637 osc_check_rpcs(cli);
3638 client_obd_list_unlock(&cli->cl_loi_list_lock);
3640 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3644 case IMP_EVENT_ACTIVE: {
3645 /* Only do this on the MDS OSCs */
3646 if (imp->imp_server_timeout) {
3647 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3649 spin_lock(&oscc->oscc_lock);
3650 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3651 spin_unlock(&oscc->oscc_lock);
3653 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3656 case IMP_EVENT_OCD: {
3657 struct obd_connect_data *ocd = &imp->imp_connect_data;
3659 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3660 osc_init_grant(&obd->u.cli, ocd);
3663 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3664 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3666 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3670 CERROR("Unknown import event %d\n", event);
3676 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3682 rc = ptlrpcd_addref();
3686 rc = client_obd_setup(obd, lcfg);
3690 struct lprocfs_static_vars lvars;
3691 struct client_obd *cli = &obd->u.cli;
3693 lprocfs_init_vars(osc, &lvars);
3694 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3695 lproc_osc_attach_seqstat(obd);
3696 ptlrpc_lprocfs_register_obd(obd);
3700 /* We need to allocate a few more requests, because
3701 brw_interpret_oap tries to create new requests before freeing
3702 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3703 reserved, but I'm afraid that might be too much wasted RAM
3704 in fact, so 2 is just my guess and should still work. */
3705 cli->cl_import->imp_rq_pool =
3706 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3708 ptlrpc_add_rqs_to_pool);
3714 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3720 case OBD_CLEANUP_EARLY: {
3721 struct obd_import *imp;
3722 imp = obd->u.cli.cl_import;
3723 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3724 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3725 ptlrpc_deactivate_import(imp);
3726 spin_lock(&imp->imp_lock);
3727 imp->imp_pingable = 0;
3728 spin_unlock(&imp->imp_lock);
3731 case OBD_CLEANUP_EXPORTS: {
3732 /* If we set up but never connected, the
3733 client import will not have been cleaned. */
3734 if (obd->u.cli.cl_import) {
3735 struct obd_import *imp;
3736 imp = obd->u.cli.cl_import;
3737 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3739 ptlrpc_invalidate_import(imp);
3740 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3741 class_destroy_import(imp);
3742 obd->u.cli.cl_import = NULL;
3746 case OBD_CLEANUP_SELF_EXP:
3747 rc = obd_llog_finish(obd, 0);
3749 CERROR("failed to cleanup llogging subsystems\n");
3751 case OBD_CLEANUP_OBD:
3757 int osc_cleanup(struct obd_device *obd)
3759 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3763 ptlrpc_lprocfs_unregister_obd(obd);
3764 lprocfs_obd_cleanup(obd);
3766 spin_lock(&oscc->oscc_lock);
3767 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3768 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3769 spin_unlock(&oscc->oscc_lock);
3771 /* free memory of osc quota cache */
3772 lquota_cleanup(quota_interface, obd);
3774 rc = client_obd_cleanup(obd);
3780 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3782 struct lustre_cfg *lcfg = buf;
3783 struct lprocfs_static_vars lvars;
3786 lprocfs_init_vars(osc, &lvars);
3788 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3792 struct obd_ops osc_obd_ops = {
3793 .o_owner = THIS_MODULE,
3794 .o_setup = osc_setup,
3795 .o_precleanup = osc_precleanup,
3796 .o_cleanup = osc_cleanup,
3797 .o_add_conn = client_import_add_conn,
3798 .o_del_conn = client_import_del_conn,
3799 .o_connect = client_connect_import,
3800 .o_reconnect = osc_reconnect,
3801 .o_disconnect = osc_disconnect,
3802 .o_statfs = osc_statfs,
3803 .o_statfs_async = osc_statfs_async,
3804 .o_packmd = osc_packmd,
3805 .o_unpackmd = osc_unpackmd,
3806 .o_precreate = osc_precreate,
3807 .o_create = osc_create,
3808 .o_destroy = osc_destroy,
3809 .o_getattr = osc_getattr,
3810 .o_getattr_async = osc_getattr_async,
3811 .o_setattr = osc_setattr,
3812 .o_setattr_async = osc_setattr_async,
3814 .o_brw_async = osc_brw_async,
3815 .o_prep_async_page = osc_prep_async_page,
3816 .o_queue_async_io = osc_queue_async_io,
3817 .o_set_async_flags = osc_set_async_flags,
3818 .o_queue_group_io = osc_queue_group_io,
3819 .o_trigger_group_io = osc_trigger_group_io,
3820 .o_teardown_async_page = osc_teardown_async_page,
3821 .o_punch = osc_punch,
3823 .o_enqueue = osc_enqueue,
3824 .o_match = osc_match,
3825 .o_change_cbdata = osc_change_cbdata,
3826 .o_cancel = osc_cancel,
3827 .o_cancel_unused = osc_cancel_unused,
3828 .o_join_lru = osc_join_lru,
3829 .o_iocontrol = osc_iocontrol,
3830 .o_get_info = osc_get_info,
3831 .o_set_info_async = osc_set_info_async,
3832 .o_import_event = osc_import_event,
3833 .o_llog_init = osc_llog_init,
3834 .o_llog_finish = osc_llog_finish,
3835 .o_process_config = osc_process_config,
3837 int __init osc_init(void)
3839 struct lprocfs_static_vars lvars;
3843 lprocfs_init_vars(osc, &lvars);
3845 request_module("lquota");
3846 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3847 lquota_init(quota_interface);
3848 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3850 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3851 LUSTRE_OSC_NAME, NULL);
3853 if (quota_interface)
3854 PORTAL_SYMBOL_PUT(osc_quota_interface);
3862 static void /*__exit*/ osc_exit(void)
3864 lquota_exit(quota_interface);
3865 if (quota_interface)
3866 PORTAL_SYMBOL_PUT(osc_quota_interface);
3868 class_unregister_type(LUSTRE_OSC_NAME);
3871 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3872 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3873 MODULE_LICENSE("GPL");
3875 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);