1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 atomic_t osc_resend_time;
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
93 LASSERT(lsm->lsm_object_id);
94 LASSERT(lsm->lsm_object_gr);
95 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104 struct lov_mds_md *lmm, int lmm_bytes)
110 if (lmm_bytes < sizeof (*lmm)) {
111 CERROR("lov_mds_md too small: %d, need %d\n",
112 lmm_bytes, (int)sizeof(*lmm));
115 /* XXX LOV_MAGIC etc check? */
117 if (lmm->lmm_object_id == 0) {
118 CERROR("lov_mds_md: zero lmm_object_id\n");
123 lsm_size = lov_stripe_md_size(1);
127 if (*lsmp != NULL && lmm == NULL) {
128 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
129 OBD_FREE(*lsmp, lsm_size);
135 OBD_ALLOC(*lsmp, lsm_size);
138 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
139 if ((*lsmp)->lsm_oinfo[0] == NULL) {
140 OBD_FREE(*lsmp, lsm_size);
143 loi_init((*lsmp)->lsm_oinfo[0]);
147 /* XXX zero *lsmp? */
148 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
150 LASSERT((*lsmp)->lsm_object_id);
151 LASSERT((*lsmp)->lsm_object_gr);
154 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
160 struct ost_body *body, void *capa)
162 struct obd_capa *oc = (struct obd_capa *)capa;
163 struct lustre_capa *c;
168 c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
171 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172 DEBUG_CAPA(D_SEC, c, "pack");
175 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
176 struct obd_info *oinfo)
178 struct ost_body *body;
180 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
181 body->oa = *oinfo->oi_oa;
182 osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
185 static int osc_getattr_interpret(struct ptlrpc_request *req,
186 struct osc_async_args *aa, int rc)
188 struct ost_body *body;
194 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
195 lustre_swab_ost_body);
197 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
200 /* This should really be sent by the OST */
201 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
202 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
204 CERROR("can't unpack ost_body\n");
206 aa->aa_oi->oi_oa->o_valid = 0;
209 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
213 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
214 struct ptlrpc_request_set *set)
216 struct ptlrpc_request *req;
217 struct ost_body *body;
218 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219 struct osc_async_args *aa;
222 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
223 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
224 OST_GETATTR, 3, size, NULL);
228 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
230 ptlrpc_req_set_repsize(req, 2, size);
231 req->rq_interpret_reply = osc_getattr_interpret;
233 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
234 aa = (struct osc_async_args *)&req->rq_async_args;
237 ptlrpc_set_add_req(set, req);
241 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
243 struct ptlrpc_request *req;
244 struct ost_body *body;
245 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
248 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
249 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
250 OST_GETATTR, 3, size, NULL);
254 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
256 ptlrpc_req_set_repsize(req, 2, size);
258 rc = ptlrpc_queue_wait(req);
260 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
264 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
265 lustre_swab_ost_body);
267 CERROR ("can't unpack ost_body\n");
268 GOTO (out, rc = -EPROTO);
271 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
272 *oinfo->oi_oa = body->oa;
274 /* This should really be sent by the OST */
275 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
276 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
280 ptlrpc_req_finished(req);
284 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
285 struct obd_trans_info *oti)
287 struct ptlrpc_request *req;
288 struct ost_body *body;
289 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
292 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
293 oinfo->oi_oa->o_gr > 0);
294 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
295 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
296 OST_SETATTR, 3, size, NULL);
300 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
302 ptlrpc_req_set_repsize(req, 2, size);
304 rc = ptlrpc_queue_wait(req);
308 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
309 lustre_swab_ost_body);
311 GOTO(out, rc = -EPROTO);
313 *oinfo->oi_oa = body->oa;
317 ptlrpc_req_finished(req);
321 static int osc_setattr_interpret(struct ptlrpc_request *req,
322 struct osc_async_args *aa, int rc)
324 struct ost_body *body;
330 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
331 lustre_swab_ost_body);
333 CERROR("can't unpack ost_body\n");
334 GOTO(out, rc = -EPROTO);
337 *aa->aa_oi->oi_oa = body->oa;
339 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
343 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
344 struct obd_trans_info *oti,
345 struct ptlrpc_request_set *rqset)
347 struct ptlrpc_request *req;
348 int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
349 struct osc_async_args *aa;
352 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
353 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
354 OST_SETATTR, 3, size, NULL);
358 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
359 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
361 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
364 ptlrpc_req_set_repsize(req, 2, size);
365 /* do mds to ost setattr asynchronously */
367 /* Do not wait for response. */
368 ptlrpcd_add_req(req);
370 req->rq_interpret_reply = osc_setattr_interpret;
372 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
373 aa = (struct osc_async_args *)&req->rq_async_args;
376 ptlrpc_set_add_req(rqset, req);
382 int osc_real_create(struct obd_export *exp, struct obdo *oa,
383 struct lov_stripe_md **ea, struct obd_trans_info *oti)
385 struct ptlrpc_request *req;
386 struct ost_body *body;
387 struct lov_stripe_md *lsm;
388 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
396 rc = obd_alloc_memmd(exp, &lsm);
401 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
402 OST_CREATE, 2, size, NULL);
404 GOTO(out, rc = -ENOMEM);
406 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
409 ptlrpc_req_set_repsize(req, 2, size);
410 if (oa->o_valid & OBD_MD_FLINLINE) {
411 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
412 oa->o_flags == OBD_FL_DELORPHAN);
414 "delorphan from OST integration");
415 /* Don't resend the delorphan req */
416 req->rq_no_resend = req->rq_no_delay = 1;
419 rc = ptlrpc_queue_wait(req);
423 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
424 lustre_swab_ost_body);
426 CERROR ("can't unpack ost_body\n");
427 GOTO (out_req, rc = -EPROTO);
432 /* This should really be sent by the OST */
433 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
434 oa->o_valid |= OBD_MD_FLBLKSZ;
436 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
437 * have valid lsm_oinfo data structs, so don't go touching that.
438 * This needs to be fixed in a big way.
440 lsm->lsm_object_id = oa->o_id;
441 lsm->lsm_object_gr = oa->o_gr;
445 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
447 if (oa->o_valid & OBD_MD_FLCOOKIE) {
448 if (!oti->oti_logcookies)
449 oti_alloc_cookies(oti, 1);
450 *oti->oti_logcookies = *obdo_logcookie(oa);
454 CDEBUG(D_HA, "transno: "LPD64"\n",
455 lustre_msg_get_transno(req->rq_repmsg));
458 ptlrpc_req_finished(req);
461 obd_free_memmd(exp, &lsm);
465 static int osc_punch_interpret(struct ptlrpc_request *req,
466 struct osc_async_args *aa, int rc)
468 struct ost_body *body;
474 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
475 lustre_swab_ost_body);
477 CERROR ("can't unpack ost_body\n");
478 GOTO(out, rc = -EPROTO);
481 *aa->aa_oi->oi_oa = body->oa;
483 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
487 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
488 struct obd_trans_info *oti,
489 struct ptlrpc_request_set *rqset)
491 struct ptlrpc_request *req;
492 struct osc_async_args *aa;
493 struct ost_body *body;
494 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
502 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
503 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
504 OST_PUNCH, 3, size, NULL);
508 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
510 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
511 /* overload the size and blocks fields in the oa with start/end */
512 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
513 body->oa.o_size = oinfo->oi_policy.l_extent.start;
514 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
515 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
517 ptlrpc_req_set_repsize(req, 2, size);
519 req->rq_interpret_reply = osc_punch_interpret;
520 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
521 aa = (struct osc_async_args *)&req->rq_async_args;
523 ptlrpc_set_add_req(rqset, req);
528 static int osc_sync(struct obd_export *exp, struct obdo *oa,
529 struct lov_stripe_md *md, obd_size start, obd_size end,
532 struct ptlrpc_request *req;
533 struct ost_body *body;
534 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
542 size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
544 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
545 OST_SYNC, 3, size, NULL);
549 /* overload the size and blocks fields in the oa with start/end */
550 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
552 body->oa.o_size = start;
553 body->oa.o_blocks = end;
554 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
556 osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
558 ptlrpc_req_set_repsize(req, 2, size);
560 rc = ptlrpc_queue_wait(req);
564 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
565 lustre_swab_ost_body);
567 CERROR ("can't unpack ost_body\n");
568 GOTO (out, rc = -EPROTO);
575 ptlrpc_req_finished(req);
579 /* Locally find and cancel the locks matched by @mode in the resource found
580 * by @objid. Found locks are added to the @cancels list. Returns the number
581 * of locks added to @cancels. */
582 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
583 struct list_head *cancels, ldlm_mode_t mode,
586 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
587 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
588 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
595 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
596 lock_flags, 0, NULL);
597 ldlm_resource_putref(res);
601 /* Destroy requests can always be async on the client, and we don't even really
602 * care about the return code since the client cannot do anything at all about
604 * When the MDS is unlinking a filename, it saves the file objects into a
605 * recovery llog, and these object records are cancelled when the OST reports
606 * they were destroyed and sync'd to disk (i.e. transaction committed).
607 * If the client dies, or the OST is down when the object should be destroyed,
608 * the records are not cancelled, and when the OST reconnects to the MDS next,
609 * it will retrieve the llog unlink logs and then send the log cancellation
610 * cookies to the MDS after committing destroy transactions. */
611 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
612 struct lov_stripe_md *ea, struct obd_trans_info *oti,
613 struct obd_export *md_export)
615 CFS_LIST_HEAD(cancels);
616 struct ptlrpc_request *req;
617 struct ost_body *body;
618 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
619 int count, bufcount = 2;
627 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
628 LDLM_FL_DISCARD_DATA);
629 if (exp_connect_cancelset(exp) && count) {
631 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
633 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
634 OST_DESTROY, bufcount, size, NULL);
635 if (exp_connect_cancelset(exp) && req)
636 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
638 ldlm_lock_list_put(&cancels, l_bl_ast, count);
643 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
645 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
646 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
647 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
648 sizeof(*oti->oti_logcookies));
651 ptlrpc_req_set_repsize(req, 2, size);
653 ptlrpcd_add_req(req);
657 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
660 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
662 LASSERT(!(oa->o_valid & bits));
665 client_obd_list_lock(&cli->cl_loi_list_lock);
666 oa->o_dirty = cli->cl_dirty;
667 if (cli->cl_dirty > cli->cl_dirty_max) {
668 CERROR("dirty %lu > dirty_max %lu\n",
669 cli->cl_dirty, cli->cl_dirty_max);
671 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
672 CERROR("dirty %d > system dirty_max %d\n",
673 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
675 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
676 CERROR("dirty %lu - dirty_max %lu too big???\n",
677 cli->cl_dirty, cli->cl_dirty_max);
680 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
681 (cli->cl_max_rpcs_in_flight + 1);
682 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
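/* Illustrative, assuming 4KB pages: with 256 pages per RPC and 8 RPCs in
 * flight, max_in_flight is 1MB * (8 + 1) = 9MB, so o_undirty reports at
 * least 9MB of potential dirty data (more if cl_dirty_max is larger). */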
684 oa->o_grant = cli->cl_avail_grant;
685 oa->o_dropped = cli->cl_lost_grant;
686 cli->cl_lost_grant = 0;
687 client_obd_list_unlock(&cli->cl_loi_list_lock);
688 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
689 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
692 /* caller must hold loi_list_lock */
693 static void osc_consume_write_grant(struct client_obd *cli,
694 struct brw_page *pga)
696 atomic_inc(&obd_dirty_pages);
697 cli->cl_dirty += CFS_PAGE_SIZE;
698 cli->cl_avail_grant -= CFS_PAGE_SIZE;
699 pga->flag |= OBD_BRW_FROM_GRANT;
700 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
701 CFS_PAGE_SIZE, pga, pga->pg);
702 LASSERT(cli->cl_avail_grant >= 0);
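/* Each page entering the dirty cache charges one page of grant and bumps
 * both cl_dirty and the global obd_dirty_pages; the dirty accounting is
 * undone in osc_release_write_grant() once the brw completes or the page
 * is dropped before being sent. */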
705 /* the companion to osc_consume_write_grant, called when a brw has completed.
706 * must be called with the loi lock held. */
707 static void osc_release_write_grant(struct client_obd *cli,
708 struct brw_page *pga, int sent)
710 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
713 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
718 pga->flag &= ~OBD_BRW_FROM_GRANT;
719 atomic_dec(&obd_dirty_pages);
720 cli->cl_dirty -= CFS_PAGE_SIZE;
722 cli->cl_lost_grant += CFS_PAGE_SIZE;
723 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
724 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
725 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
726 /* For short writes we shouldn't count parts of pages that
727 * span a whole block on the OST side, or our accounting goes
728 * wrong. Should match the code in filter_grant_check. */
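/* Worked example, assuming 4KB pages and a 1KB OST blocksize: a 100-byte
 * write at page offset 50 rounds up to a single 1KB block, so the other
 * 3KB of this page's grant is recorded as lost. */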
729 int offset = pga->off & ~CFS_PAGE_MASK;
730 int count = pga->count + (offset & (blocksize - 1));
731 int end = (offset + pga->count) & (blocksize - 1);
733 count += blocksize - end;
735 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
736 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
737 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
738 cli->cl_avail_grant, cli->cl_dirty);
744 static unsigned long rpcs_in_flight(struct client_obd *cli)
746 return cli->cl_r_in_flight + cli->cl_w_in_flight;
749 /* caller must hold loi_list_lock */
750 void osc_wake_cache_waiters(struct client_obd *cli)
752 struct list_head *l, *tmp;
753 struct osc_cache_waiter *ocw;
756 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
757 /* if we can't dirty more, we must wait until some is written */
758 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
759 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
760 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
761 "osc max %ld, sys max %d\n", cli->cl_dirty,
762 cli->cl_dirty_max, obd_max_dirty_pages);
766 /* if still dirty cache but no grant wait for pending RPCs that
767 * may yet return us some grant before doing sync writes */
768 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
769 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
770 cli->cl_w_in_flight);
774 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
775 list_del_init(&ocw->ocw_entry);
776 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
777 /* no more RPCs in flight to return grant, do sync IO */
778 ocw->ocw_rc = -EDQUOT;
779 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
781 osc_consume_write_grant(cli,
782 &ocw->ocw_oap->oap_brw_page);
785 cfs_waitq_signal(&ocw->ocw_waitq);
791 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
793 client_obd_list_lock(&cli->cl_loi_list_lock);
794 cli->cl_avail_grant = ocd->ocd_grant;
795 client_obd_list_unlock(&cli->cl_loi_list_lock);
797 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
798 cli->cl_avail_grant, cli->cl_lost_grant);
799 LASSERT(cli->cl_avail_grant >= 0);
802 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
804 client_obd_list_lock(&cli->cl_loi_list_lock);
805 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
806 cli->cl_avail_grant += body->oa.o_grant;
807 /* waiters are woken in brw_interpret_oap */
808 client_obd_list_unlock(&cli->cl_loi_list_lock);
811 /* We assume that the reason this OSC got a short read is that it read
812 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
813 * via the LOV, and it _knows_ it's reading inside the file, it's just that
814 * this stripe never got written at or beyond this stripe offset yet. */
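/* Example, assuming full 4KB pages: a 10-page read that returns 25000 bytes
 * leaves the first six pages intact, zero-fills the tail of the seventh page
 * past byte 424, and zeroes the remaining pages entirely. */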
815 static void handle_short_read(int nob_read, obd_count page_count,
816 struct brw_page **pga)
821 /* skip bytes read OK */
822 while (nob_read > 0) {
823 LASSERT (page_count > 0);
825 if (pga[i]->count > nob_read) {
826 /* EOF inside this page */
827 ptr = cfs_kmap(pga[i]->pg) +
828 (pga[i]->off & ~CFS_PAGE_MASK);
829 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
830 cfs_kunmap(pga[i]->pg);
836 nob_read -= pga[i]->count;
841 /* zero remaining pages */
842 while (page_count-- > 0) {
843 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
844 memset(ptr, 0, pga[i]->count);
845 cfs_kunmap(pga[i]->pg);
850 static int check_write_rcs(struct ptlrpc_request *req,
851 int requested_nob, int niocount,
852 obd_count page_count, struct brw_page **pga)
856 /* return error if any niobuf was in error */
857 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
858 sizeof(*remote_rcs) * niocount, NULL);
859 if (remote_rcs == NULL) {
860 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
863 if (lustre_msg_swabbed(req->rq_repmsg))
864 for (i = 0; i < niocount; i++)
865 __swab32s(&remote_rcs[i]);
867 for (i = 0; i < niocount; i++) {
868 if (remote_rcs[i] < 0)
869 return(remote_rcs[i]);
871 if (remote_rcs[i] != 0) {
872 CERROR("rc[%d] invalid (%d) req %p\n",
873 i, remote_rcs[i], req);
878 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
879 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
880 requested_nob, req->rq_bulk->bd_nob_transferred);
887 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
889 if (p1->flag != p2->flag) {
890 unsigned mask = ~OBD_BRW_FROM_GRANT;
892 /* warn if we try to combine flags that we don't know to be
894 if ((p1->flag & mask) != (p2->flag & mask))
895 CERROR("is it ok to have flags 0x%x and 0x%x in the "
896 "same brw?\n", p1->flag, p2->flag);
900 return (p1->off + p1->count == p2->off);
903 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
904 struct brw_page **pga)
909 LASSERT (pg_count > 0);
910 while (nob > 0 && pg_count > 0) {
911 char *ptr = cfs_kmap(pga[i]->pg);
912 int off = pga[i]->off & ~CFS_PAGE_MASK;
913 int count = pga[i]->count > nob ? nob : pga[i]->count;
915 /* corrupt the data before we compute the checksum, to
916 * simulate an OST->client data error */
918 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
919 memcpy(ptr + off, "bad1", min(4, nob));
920 cksum = crc32_le(cksum, ptr + off, count);
921 cfs_kunmap(pga[i]->pg);
922 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
925 nob -= pga[i]->count;
929 /* For sending we only compute the wrong checksum instead
930 * of corrupting the data so it is still correct on a redo */
931 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
937 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
938 struct lov_stripe_md *lsm, obd_count page_count,
939 struct brw_page **pga,
940 struct ptlrpc_request **reqp,
941 struct obd_capa *ocapa)
943 struct ptlrpc_request *req;
944 struct ptlrpc_bulk_desc *desc;
945 struct ost_body *body;
946 struct obd_ioobj *ioobj;
947 struct niobuf_remote *niobuf;
948 int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
949 int niocount, i, requested_nob, opc, rc;
950 struct ptlrpc_request_pool *pool;
951 struct lustre_capa *capa;
952 struct osc_brw_async_args *aa;
955 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
956 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
958 if ((cmd & OBD_BRW_WRITE) != 0) {
960 pool = cli->cl_import->imp_rq_pool;
966 for (niocount = i = 1; i < page_count; i++) {
967 if (!can_merge_pages(pga[i - 1], pga[i]))
971 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
972 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
974 size[REQ_REC_OFF + 3] = sizeof(*capa);
976 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
977 size, NULL, pool, NULL);
981 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
983 if (opc == OST_WRITE)
984 desc = ptlrpc_prep_bulk_imp (req, page_count,
985 BULK_GET_SOURCE, OST_BULK_PORTAL);
987 desc = ptlrpc_prep_bulk_imp (req, page_count,
988 BULK_PUT_SINK, OST_BULK_PORTAL);
990 GOTO(out, rc = -ENOMEM);
991 /* NB request now owns desc and will free it when it gets freed */
993 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
994 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
995 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
996 niocount * sizeof(*niobuf));
1000 obdo_to_ioobj(oa, ioobj);
1001 ioobj->ioo_bufcnt = niocount;
1003 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1005 capa_cpy(capa, ocapa);
1006 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1009 LASSERT (page_count > 0);
1010 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1011 struct brw_page *pg = pga[i];
1012 struct brw_page *pg_prev = pga[i - 1];
1014 LASSERT(pg->count > 0);
1015 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1016 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1017 pg->off, pg->count);
1019 LASSERTF(i == 0 || pg->off > pg_prev->off,
1020 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1021 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1023 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1024 pg_prev->pg, page_private(pg_prev->pg),
1025 pg_prev->pg->index, pg_prev->off);
1027 LASSERTF(i == 0 || pg->off > pg_prev->off,
1028 "i %d p_c %u\n", i, page_count);
1030 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1031 (pg->flag & OBD_BRW_SRVLOCK));
1033 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1035 requested_nob += pg->count;
1037 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1039 niobuf->len += pg->count;
1041 niobuf->offset = pg->off;
1042 niobuf->len = pg->count;
1043 niobuf->flags = pg->flag;
1047 LASSERT((void *)(niobuf - niocount) ==
1048 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1049 niocount * sizeof(*niobuf)));
1050 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1052 /* size[REQ_REC_OFF] still sizeof (*body) */
1053 if (opc == OST_WRITE) {
1054 if (unlikely(cli->cl_checksum)) {
1055 body->oa.o_valid |= OBD_MD_FLCKSUM;
1056 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1058 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1060 /* save this in 'oa', too, for later checking */
1061 oa->o_valid |= OBD_MD_FLCKSUM;
1063 /* clear out the checksum flag, in case this is a
1064 * resend but cl_checksum is no longer set. b=11238 */
1065 oa->o_valid &= ~OBD_MD_FLCKSUM;
1067 oa->o_cksum = body->oa.o_cksum;
1068 /* 1 RC per niobuf */
1069 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1070 ptlrpc_req_set_repsize(req, 3, size);
1072 if (unlikely(cli->cl_checksum))
1073 body->oa.o_valid |= OBD_MD_FLCKSUM;
1074 /* 1 RC for the whole I/O */
1075 ptlrpc_req_set_repsize(req, 2, size);
1078 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1079 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1081 aa->aa_requested_nob = requested_nob;
1082 aa->aa_nio_count = niocount;
1083 aa->aa_page_count = page_count;
1087 INIT_LIST_HEAD(&aa->aa_oaps);
1093 ptlrpc_req_finished (req);
1097 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1098 __u32 client_cksum, __u32 server_cksum,
1099 int nob, obd_count page_count,
1100 struct brw_page **pga)
1105 if (server_cksum == client_cksum) {
1106 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1110 new_cksum = osc_checksum_bulk(nob, page_count, pga);
1112 if (new_cksum == server_cksum)
1113 msg = "changed on the client after we checksummed it - "
1114 "likely false positive due to mmap IO (bug 11742)";
1115 else if (new_cksum == client_cksum)
1116 msg = "changed in transit before arrival at OST";
1118 msg = "changed in transit AND doesn't match the original - "
1119 "likely false positive due to mmap IO (bug 11742)";
1121 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1122 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1123 "["LPU64"-"LPU64"]\n",
1124 msg, libcfs_nid2str(peer->nid),
1125 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1126 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1129 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1131 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1132 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1133 client_cksum, server_cksum, new_cksum);
1137 /* Note rc enters this function as number of bytes transferred */
1138 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1140 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1141 const lnet_process_id_t *peer =
1142 &req->rq_import->imp_connection->c_peer;
1143 struct client_obd *cli = aa->aa_cli;
1144 struct ost_body *body;
1145 __u32 client_cksum = 0;
1148 if (rc < 0 && rc != -EDQUOT)
1151 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1152 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1153 lustre_swab_ost_body);
1155 CERROR ("Can't unpack body\n");
1159 /* set/clear over quota flag for a uid/gid */
1160 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1161 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1162 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1163 body->oa.o_gid, body->oa.o_valid,
1169 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1170 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1172 osc_update_grant(cli, body);
1174 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1176 CERROR ("Unexpected +ve rc %d\n", rc);
1179 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1181 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1183 check_write_checksum(&body->oa, peer, client_cksum,
1185 aa->aa_requested_nob,
1190 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1193 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1194 aa->aa_page_count, aa->aa_ppga);
1198 /* The rest of this function executes only for OST_READs */
1199 if (rc > aa->aa_requested_nob) {
1200 CERROR("Unexpected rc %d (%d requested)\n", rc,
1201 aa->aa_requested_nob);
1205 if (rc != req->rq_bulk->bd_nob_transferred) {
1206 CERROR ("Unexpected rc %d (%d transferred)\n",
1207 rc, req->rq_bulk->bd_nob_transferred);
1211 if (rc < aa->aa_requested_nob)
1212 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1214 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1216 GOTO(out, rc = -EAGAIN);
1218 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1219 static int cksum_counter;
1220 __u32 server_cksum = body->oa.o_cksum;
1224 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1227 if (peer->nid == req->rq_bulk->bd_sender) {
1231 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1234 if (server_cksum == ~0 && rc > 0) {
1235 CERROR("Protocol error: server %s set the 'checksum' "
1236 "bit, but didn't send a checksum. Not fatal, "
1237 "but please tell CFS.\n",
1238 libcfs_nid2str(peer->nid));
1239 } else if (server_cksum != client_cksum) {
1240 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1241 "%s%s%s inum "LPU64"/"LPU64" object "
1242 LPU64"/"LPU64" extent "
1243 "["LPU64"-"LPU64"]\n",
1244 req->rq_import->imp_obd->obd_name,
1245 libcfs_nid2str(peer->nid),
1247 body->oa.o_valid & OBD_MD_FLFID ?
1248 body->oa.o_fid : (__u64)0,
1249 body->oa.o_valid & OBD_MD_FLFID ?
1250 body->oa.o_generation :(__u64)0,
1252 body->oa.o_valid & OBD_MD_FLGROUP ?
1253 body->oa.o_gr : (__u64)0,
1254 aa->aa_ppga[0]->off,
1255 aa->aa_ppga[aa->aa_page_count-1]->off +
1256 aa->aa_ppga[aa->aa_page_count-1]->count -
1258 CERROR("client %x, server %x\n",
1259 client_cksum, server_cksum);
1261 aa->aa_oa->o_cksum = client_cksum;
1265 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1268 } else if (unlikely(client_cksum)) {
1269 static int cksum_missed;
1272 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1273 CERROR("Checksum %u requested from %s but not sent\n",
1274 cksum_missed, libcfs_nid2str(peer->nid));
1280 *aa->aa_oa = body->oa;
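/* The obdo from the reply (size, blocks, timestamps, grant) replaces the one
 * the caller handed us, so completion callbacks see post-I/O attributes. */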
1285 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1286 struct lov_stripe_md *lsm,
1287 obd_count page_count, struct brw_page **pga,
1288 struct obd_capa *ocapa)
1290 struct ptlrpc_request *req;
1294 struct l_wait_info lwi;
1298 cfs_waitq_init(&waitq);
1301 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1302 page_count, pga, &req, ocapa);
1306 rc = ptlrpc_queue_wait(req);
1308 if (rc == -ETIMEDOUT && req->rq_resend) {
1309 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1310 ptlrpc_req_finished(req);
1314 rc = osc_brw_fini_request(req, rc);
1316 ptlrpc_req_finished(req);
1317 if (osc_recoverable_error(rc)) {
1319 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1320 CERROR("too many resend retries, returning error\n");
1324 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1325 l_wait_event(waitq, 0, &lwi);
1333 int osc_brw_redo_request(struct ptlrpc_request *request,
1334 struct osc_brw_async_args *aa)
1336 struct ptlrpc_request *new_req;
1337 struct ptlrpc_request_set *set = request->rq_set;
1338 struct osc_brw_async_args *new_aa;
1339 struct osc_async_page *oap;
1343 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1344 CERROR("too many resend retries, returning error\n");
1348 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1350 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1351 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1352 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1355 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1356 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1357 aa->aa_cli, aa->aa_oa,
1358 NULL /* lsm unused by osc currently */,
1359 aa->aa_page_count, aa->aa_ppga,
1360 &new_req, NULL /* ocapa */);
1364 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1366 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1367 if (oap->oap_request != NULL) {
1368 LASSERTF(request == oap->oap_request,
1369 "request %p != oap_request %p\n",
1370 request, oap->oap_request);
1371 if (oap->oap_interrupted) {
1372 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1373 ptlrpc_req_finished(new_req);
1378 /* New request takes over pga and oaps from old request.
1379 * Note that copying a list_head doesn't work, need to move it... */
1381 new_req->rq_interpret_reply = request->rq_interpret_reply;
1382 new_req->rq_async_args = request->rq_async_args;
1383 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
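/* Pushing rq_sent into the future delays the resend by aa_resends seconds,
 * giving a simple linear backoff between successive retries. */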
1385 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1387 INIT_LIST_HEAD(&new_aa->aa_oaps);
1388 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1389 INIT_LIST_HEAD(&aa->aa_oaps);
1391 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1392 if (oap->oap_request) {
1393 ptlrpc_req_finished(oap->oap_request);
1394 oap->oap_request = ptlrpc_request_addref(new_req);
1397 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1399 DEBUG_REQ(D_INFO, new_req, "new request");
1401 ptlrpc_set_add_req(set, new_req);
1406 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1408 struct osc_brw_async_args *aa = data;
1413 rc = osc_brw_fini_request(req, rc);
1414 if (osc_recoverable_error(rc)) {
1415 rc = osc_brw_redo_request(req, aa);
1419 if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1420 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1422 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1423 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1424 aa->aa_cli->cl_w_in_flight--;
1426 aa->aa_cli->cl_r_in_flight--;
1427 for (i = 0; i < aa->aa_page_count; i++)
1428 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1429 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1431 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1436 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1437 struct lov_stripe_md *lsm, obd_count page_count,
1438 struct brw_page **pga, struct ptlrpc_request_set *set,
1439 struct obd_capa *ocapa)
1441 struct ptlrpc_request *req;
1442 struct client_obd *cli = &exp->exp_obd->u.cli;
1444 struct osc_brw_async_args *aa;
1447 /* Consume write credits even if doing a sync write -
1448 * otherwise we may run out of space on OST due to grant. */
1449 if (cmd == OBD_BRW_WRITE) {
1450 spin_lock(&cli->cl_loi_list_lock);
1451 for (i = 0; i < page_count; i++) {
1452 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1453 osc_consume_write_grant(cli, pga[i]);
1455 spin_unlock(&cli->cl_loi_list_lock);
1458 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1461 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1462 if (cmd == OBD_BRW_READ) {
1463 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1464 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1465 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1467 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1468 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1469 cli->cl_w_in_flight);
1470 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1474 req->rq_interpret_reply = brw_interpret;
1475 ptlrpc_set_add_req(set, req);
1476 client_obd_list_lock(&cli->cl_loi_list_lock);
1477 if (cmd == OBD_BRW_READ)
1478 cli->cl_r_in_flight++;
1480 cli->cl_w_in_flight++;
1481 client_obd_list_unlock(&cli->cl_loi_list_lock);
1482 } else if (cmd == OBD_BRW_WRITE) {
1483 client_obd_list_lock(&cli->cl_loi_list_lock);
1484 for (i = 0; i < page_count; i++)
1485 osc_release_write_grant(cli, pga[i], 0);
1486 client_obd_list_unlock(&cli->cl_loi_list_lock);
1492 * ugh, we want disk allocation on the target to happen in offset order. we'll
1493 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1494 * fine for our small page arrays and doesn't require allocation. it's an
1495 * insertion sort that swaps elements that are strides apart, shrinking the
1496 * stride down until it's '1' and the array is sorted.
1498 static void sort_brw_pages(struct brw_page **array, int num)
1501 struct brw_page *tmp;
1505 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1510 for (i = stride ; i < num ; i++) {
1513 while (j >= stride && array[j - stride]->off > tmp->off) {
1514 array[j] = array[j - stride];
1519 } while (stride > 1);
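/* The stride sequence above is Knuth's 3h+1 series (1, 4, 13, 40, ...): the
 * largest stride below num is used first, then each pass shrinks it until a
 * final stride-1 pass leaves the array sorted by offset. */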
1522 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1528 LASSERT (pages > 0);
1529 offset = pg[i]->off & ~CFS_PAGE_MASK;
1533 if (pages == 0) /* that's all */
1536 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1537 return count; /* doesn't end on page boundary */
1540 offset = pg[i]->off & ~CFS_PAGE_MASK;
1541 if (offset != 0) /* doesn't start on page boundary */
1548 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1550 struct brw_page **ppga;
1553 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1557 for (i = 0; i < count; i++)
1562 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1564 LASSERT(ppga != NULL);
1565 OBD_FREE(ppga, sizeof(*ppga) * count);
1568 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1569 obd_count page_count, struct brw_page *pga,
1570 struct obd_trans_info *oti)
1572 struct obdo *saved_oa = NULL;
1573 struct brw_page **ppga, **orig;
1574 struct obd_import *imp = class_exp2cliimp(exp);
1575 struct client_obd *cli = &imp->imp_obd->u.cli;
1576 int rc, page_count_orig;
1579 if (cmd & OBD_BRW_CHECK) {
1580 /* The caller just wants to know if there's a chance that this
1581 * I/O can succeed */
1583 if (imp == NULL || imp->imp_invalid)
1588 /* test_brw with a failed create can trip this, maybe others. */
1589 LASSERT(cli->cl_max_pages_per_rpc);
1593 orig = ppga = osc_build_ppga(pga, page_count);
1596 page_count_orig = page_count;
1598 sort_brw_pages(ppga, page_count);
1599 while (page_count) {
1600 obd_count pages_per_brw;
1602 if (page_count > cli->cl_max_pages_per_rpc)
1603 pages_per_brw = cli->cl_max_pages_per_rpc;
1605 pages_per_brw = page_count;
1607 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1609 if (saved_oa != NULL) {
1610 /* restore previously saved oa */
1611 *oinfo->oi_oa = *saved_oa;
1612 } else if (page_count > pages_per_brw) {
1613 /* save a copy of oa (brw will clobber it) */
1614 OBDO_ALLOC(saved_oa);
1615 if (saved_oa == NULL)
1616 GOTO(out, rc = -ENOMEM);
1617 *saved_oa = *oinfo->oi_oa;
1620 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1621 pages_per_brw, ppga, oinfo->oi_capa);
1626 page_count -= pages_per_brw;
1627 ppga += pages_per_brw;
1631 osc_release_ppga(orig, page_count_orig);
1633 if (saved_oa != NULL)
1634 OBDO_FREE(saved_oa);
1639 static int osc_brw_async(int cmd, struct obd_export *exp,
1640 struct obd_info *oinfo, obd_count page_count,
1641 struct brw_page *pga, struct obd_trans_info *oti,
1642 struct ptlrpc_request_set *set)
1644 struct brw_page **ppga, **orig;
1645 struct client_obd *cli = &exp->exp_obd->u.cli;
1646 int page_count_orig;
1650 if (cmd & OBD_BRW_CHECK) {
1651 struct obd_import *imp = class_exp2cliimp(exp);
1652 /* The caller just wants to know if there's a chance that this
1653 * I/O can succeed */
1655 if (imp == NULL || imp->imp_invalid)
1660 orig = ppga = osc_build_ppga(pga, page_count);
1663 page_count_orig = page_count;
1665 sort_brw_pages(ppga, page_count);
1666 while (page_count) {
1667 struct brw_page **copy;
1668 obd_count pages_per_brw;
1670 pages_per_brw = min_t(obd_count, page_count,
1671 cli->cl_max_pages_per_rpc);
1673 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1675 /* use ppga only if single RPC is going to fly */
1676 if (pages_per_brw != page_count_orig || ppga != orig) {
1677 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1679 GOTO(out, rc = -ENOMEM);
1680 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1684 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1685 pages_per_brw, copy, set, oinfo->oi_capa);
1689 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1693 /* we passed it to async_internal() which is
1694 * now responsible for releasing memory */
1698 page_count -= pages_per_brw;
1699 ppga += pages_per_brw;
1703 osc_release_ppga(orig, page_count_orig);
1707 static void osc_check_rpcs(struct client_obd *cli);
1709 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1710 * the dirty accounting. Writeback completes or truncate happens before
1711 * writing starts. Must be called with the loi lock held. */
1712 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1715 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1719 /* This maintains the lists of pending pages to read/write for a given object
1720 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1721 * to quickly find objects that are ready to send an RPC. */
1722 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1728 if (lop->lop_num_pending == 0)
1731 /* if we have an invalid import we want to drain the queued pages
1732 * by forcing them through rpcs that immediately fail and complete
1733 * the pages. recovery relies on this to empty the queued pages
1734 * before canceling the locks and evicting down the llite pages */
1735 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1738 /* stream rpcs in queue order as long as there is an urgent page
1739 * queued. this is our cheap solution for good batching in the case
1740 * where writepage marks some random page in the middle of the file
1741 * as urgent because of, say, memory pressure */
1742 if (!list_empty(&lop->lop_urgent)) {
1743 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1746 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1747 optimal = cli->cl_max_pages_per_rpc;
1748 if (cmd & OBD_BRW_WRITE) {
1749 /* trigger a write rpc stream as long as there are dirtiers
1750 * waiting for space. as they're waiting, they're not going to
1751 * create more pages to coalesce with what's waiting.. */
1752 if (!list_empty(&cli->cl_cache_waiters)) {
1753 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1756 /* +16 to avoid triggering rpcs that would want to include pages
1757 * that are being queued but which can't be made ready until
1758 * the queuer finishes with the page. this is a wart for
1759 * llite::commit_write() */
1762 if (lop->lop_num_pending >= optimal)
1768 static void on_list(struct list_head *item, struct list_head *list,
1771 if (list_empty(item) && should_be_on)
1772 list_add_tail(item, list);
1773 else if (!list_empty(item) && !should_be_on)
1774 list_del_init(item);
1777 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1778 * can find pages to build into rpcs quickly */
1779 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1781 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1782 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1783 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1785 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1786 loi->loi_write_lop.lop_num_pending);
1788 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1789 loi->loi_read_lop.lop_num_pending);
1792 static void lop_update_pending(struct client_obd *cli,
1793 struct loi_oap_pages *lop, int cmd, int delta)
1795 lop->lop_num_pending += delta;
1796 if (cmd & OBD_BRW_WRITE)
1797 cli->cl_pending_w_pages += delta;
1799 cli->cl_pending_r_pages += delta;
1802 /* this is called when a sync waiter receives an interruption. Its job is to
1803 * get the caller woken as soon as possible. If its page hasn't been put in an
1804 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1805 * desiring interruption which will forcefully complete the rpc once the rpc
1807 static void osc_occ_interrupted(struct oig_callback_context *occ)
1809 struct osc_async_page *oap;
1810 struct loi_oap_pages *lop;
1811 struct lov_oinfo *loi;
1814 /* XXX member_of() */
1815 oap = list_entry(occ, struct osc_async_page, oap_occ);
1817 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1819 oap->oap_interrupted = 1;
1821 /* ok, it's been put in an rpc. only one oap gets a request reference */
1822 if (oap->oap_request != NULL) {
1823 ptlrpc_mark_interrupted(oap->oap_request);
1824 ptlrpcd_wake(oap->oap_request);
1828 /* we don't get interruption callbacks until osc_trigger_group_io()
1829 * has been called and put the sync oaps in the pending/urgent lists.*/
1830 if (!list_empty(&oap->oap_pending_item)) {
1831 list_del_init(&oap->oap_pending_item);
1832 list_del_init(&oap->oap_urgent_item);
1835 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1836 &loi->loi_write_lop : &loi->loi_read_lop;
1837 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1838 loi_list_maint(oap->oap_cli, oap->oap_loi);
1840 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1841 oap->oap_oig = NULL;
1845 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1848 /* this is trying to propagate async writeback errors back up to the
1849 * application. As an async write fails we record the error code for later if
1850 * the app does an fsync. As long as errors persist we force future rpcs to be
1851 * sync so that the app can get a sync error and break the cycle of queueing
1852 * pages for which writeback will fail. */
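/* A failed write leaves ar_force_sync set and records the next xid; only when
 * an RPC at or past that xid completes cleanly is force_sync cleared, so
 * writes stay forced synchronous until everything issued before the failure
 * has drained. */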
1853 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1860 ar->ar_force_sync = 1;
1861 ar->ar_min_xid = ptlrpc_sample_next_xid();
1866 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1867 ar->ar_force_sync = 0;
1870 static void osc_oap_to_pending(struct osc_async_page *oap)
1872 struct loi_oap_pages *lop;
1874 if (oap->oap_cmd & OBD_BRW_WRITE)
1875 lop = &oap->oap_loi->loi_write_lop;
1877 lop = &oap->oap_loi->loi_read_lop;
1879 if (oap->oap_async_flags & ASYNC_URGENT)
1880 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1881 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1882 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1885 /* this must be called holding the loi list lock to give coverage to exit_cache,
1886 * async_flag maintenance, and oap_request */
1887 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1888 struct osc_async_page *oap, int sent, int rc)
1893 if (oap->oap_request != NULL) {
1894 xid = ptlrpc_req_xid(oap->oap_request);
1895 ptlrpc_req_finished(oap->oap_request);
1896 oap->oap_request = NULL;
1899 oap->oap_async_flags = 0;
1900 oap->oap_interrupted = 0;
1902 if (oap->oap_cmd & OBD_BRW_WRITE) {
1903 osc_process_ar(&cli->cl_ar, xid, rc);
1904 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1907 if (rc == 0 && oa != NULL) {
1908 if (oa->o_valid & OBD_MD_FLBLOCKS)
1909 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1910 if (oa->o_valid & OBD_MD_FLMTIME)
1911 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1912 if (oa->o_valid & OBD_MD_FLATIME)
1913 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1914 if (oa->o_valid & OBD_MD_FLCTIME)
1915 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1919 osc_exit_cache(cli, oap, sent);
1920 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1921 oap->oap_oig = NULL;
1926 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1927 oap->oap_cmd, oa, rc);
1929 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1930 * I/O on the page could start, but OSC calls it under lock
1931 * and thus we can add oap back to pending safely */
1933 /* upper layer wants to leave the page on pending queue */
1934 osc_oap_to_pending(oap);
1936 osc_exit_cache(cli, oap, sent);
1940 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1942 struct osc_async_page *oap, *tmp;
1943 struct osc_brw_async_args *aa = data;
1944 struct client_obd *cli;
1947 rc = osc_brw_fini_request(req, rc);
1948 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1949 if (osc_recoverable_error(rc)) {
1950 rc = osc_brw_redo_request(req, aa);
1957 client_obd_list_lock(&cli->cl_loi_list_lock);
1959 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1960 * is called so we know whether to go to sync BRWs or wait for more
1961 * RPCs to complete */
1962 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1963 cli->cl_w_in_flight--;
1965 cli->cl_r_in_flight--;
1967 /* the caller may re-use the oap after the completion call so
1968 * we need to clean it up a little */
1969 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1970 list_del_init(&oap->oap_rpc_item);
1971 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1974 osc_wake_cache_waiters(cli);
1975 osc_check_rpcs(cli);
1977 client_obd_list_unlock(&cli->cl_loi_list_lock);
1979 OBDO_FREE(aa->aa_oa);
1981 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1985 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1986 struct list_head *rpc_list,
1987 int page_count, int cmd)
1989 struct ptlrpc_request *req;
1990 struct brw_page **pga = NULL;
1991 struct osc_brw_async_args *aa;
1992 struct obdo *oa = NULL;
1993 struct obd_async_page_ops *ops = NULL;
1994 void *caller_data = NULL;
1995 struct obd_capa *ocapa;
1996 struct osc_async_page *oap;
2000 LASSERT(!list_empty(rpc_list));
2002 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2004 RETURN(ERR_PTR(-ENOMEM));
2008 GOTO(out, req = ERR_PTR(-ENOMEM));
2011 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2013 ops = oap->oap_caller_ops;
2014 caller_data = oap->oap_caller_data;
2016 pga[i] = &oap->oap_brw_page;
2017 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2018 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2019 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2023 /* always get the data for the obdo for the rpc */
2024 LASSERT(ops != NULL);
2025 ops->ap_fill_obdo(caller_data, cmd, oa);
2026 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2028 sort_brw_pages(pga, page_count);
2029 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2033 CERROR("prep_req failed: %d\n", rc);
2034 GOTO(out, req = ERR_PTR(rc));
2037 /* Need to update the timestamps after the request is built in case
2038 * we race with setattr (locally or in queue at OST). If OST gets
2039 * later setattr before earlier BRW (as determined by the request xid),
2040 * the OST will not use BRW timestamps. Sadly, there is no obvious
2041 * way to do this in a single call. bug 10150 */
2042 ops->ap_update_obdo(caller_data, cmd, oa,
2043 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2045 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2046 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2047 INIT_LIST_HEAD(&aa->aa_oaps);
2048 list_splice(rpc_list, &aa->aa_oaps);
2049 INIT_LIST_HEAD(rpc_list);
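/* Ownership of the oaps moves to the request here: they were spliced onto
 * aa->aa_oaps and the caller's rpc_list is reinitialised empty, so cleanup
 * happens through the request's async args from now on. */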
2056 OBD_FREE(pga, sizeof(*pga) * page_count);
2061 /* the loi lock is held across this function but it's allowed to release
2062 * and reacquire it during its work */
2063 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2064 int cmd, struct loi_oap_pages *lop)
2066 struct ptlrpc_request *req;
2067 obd_count page_count = 0;
2068 struct osc_async_page *oap = NULL, *tmp;
2069 struct osc_brw_async_args *aa;
2070 struct obd_async_page_ops *ops;
2071 CFS_LIST_HEAD(rpc_list);
2072 unsigned int ending_offset;
2073 unsigned starting_offset = 0;
2076 /* first we find the pages we're allowed to work with */
2077 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2079 ops = oap->oap_caller_ops;
2081 LASSERT(oap->oap_magic == OAP_MAGIC);
2083 /* in llite being 'ready' equates to the page being locked
2084 * until completion unlocks it. commit_write submits a page
2085 * as not ready because its unlock will happen unconditionally
2086 * as the call returns. if we race with commit_write giving
2087 * us that page we don't want to create a hole in the page
2088 * stream, so we stop and leave the rpc to be fired by
2089 * another dirtier or kupdated interval (the not ready page
2090 * will still be on the dirty list). we could call in
2091 * at the end of ll_file_write to process the queue again. */
2092 if (!(oap->oap_async_flags & ASYNC_READY)) {
2093 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2095 CDEBUG(D_INODE, "oap %p page %p returned %d "
2096 "instead of ready\n", oap,
2100 /* llite is telling us that the page is still
2101 * in commit_write and that we should try
2102 * and put it in an rpc again later. we
2103 * break out of the loop so we don't create
2104 * a hole in the sequence of pages in the rpc
2109 /* the io isn't needed.. tell the checks
2110 * below to complete the rpc with EINTR */
2111 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2112 oap->oap_count = -EINTR;
2115 oap->oap_async_flags |= ASYNC_READY;
2118 LASSERTF(0, "oap %p page %p returned %d "
2119 "from make_ready\n", oap,
2127 * Page submitted for IO has to be locked. Either by
2128 * ->ap_make_ready() or by higher layers.
2130 * XXX nikita: this assertion should be adjusted when lustre
2131 * starts using PG_writeback for pages being written out.
2133 #if defined(__KERNEL__) && defined(__LINUX__)
2134 LASSERT(PageLocked(oap->oap_page));
2136 /* If there is a gap at the start of this page, it can't merge
2137 * with any previous page, so we'll hand the network a
2138 * "fragmented" page array that it can't transfer in 1 RDMA */
2139 if (page_count != 0 && oap->oap_page_off != 0)
2142 /* take the page out of our book-keeping */
2143 list_del_init(&oap->oap_pending_item);
2144 lop_update_pending(cli, lop, cmd, -1);
2145 list_del_init(&oap->oap_urgent_item);
2147 if (page_count == 0)
2148 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2149 (PTLRPC_MAX_BRW_SIZE - 1);
2151 /* ask the caller for the size of the io as the rpc leaves. */
2152 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2154 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2155 if (oap->oap_count <= 0) {
2156 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2158 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2162 /* now put the page back in our accounting */
2163 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2164 if (++page_count >= cli->cl_max_pages_per_rpc)
2167 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2168 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2169 * have the same alignment as the initial writes that allocated
2170 * extents on the server. */
2171 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2172 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
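                /* ending_offset is the page's end offset modulo the maximum BRW
                 * size: e.g. with a 1MB PTLRPC_MAX_BRW_SIZE, a page ending at
                 * object offset 0x340000 yields 0x40000, while one ending exactly
                 * at 0x300000 yields 0, so the RPC is closed on a full-RPC
                 * boundary. */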
2173 if (ending_offset == 0)
2176 /* If there is a gap at the end of this page, it can't merge
2177 * with any subsequent pages, so we'll hand the network a
2178 * "fragmented" page array that it can't transfer in 1 RDMA */
2179 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2183 osc_wake_cache_waiters(cli);
2185 if (page_count == 0)
2188 loi_list_maint(cli, loi);
2190 client_obd_list_unlock(&cli->cl_loi_list_lock);
2192 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2194         /* this should happen rarely and is pretty bad; it makes the
2195          * pending list not follow the dirty order */
2196 client_obd_list_lock(&cli->cl_loi_list_lock);
2197 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2198 list_del_init(&oap->oap_rpc_item);
2200 /* queued sync pages can be torn down while the pages
2201 * were between the pending list and the rpc */
2202 if (oap->oap_interrupted) {
2203 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2204 osc_ap_completion(cli, NULL, oap, 0,
2208 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2210 loi_list_maint(cli, loi);
2211 RETURN(PTR_ERR(req));
2214 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2216 if (cmd == OBD_BRW_READ) {
2217 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2218 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2219 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2220 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2221 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2223 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2224 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2225 cli->cl_w_in_flight);
2226 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2227 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2228 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2231 client_obd_list_lock(&cli->cl_loi_list_lock);
2233 if (cmd == OBD_BRW_READ)
2234 cli->cl_r_in_flight++;
2236 cli->cl_w_in_flight++;
2238 /* queued sync pages can be torn down while the pages
2239 * were between the pending list and the rpc */
2241 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2242 /* only one oap gets a request reference */
2245 if (oap->oap_interrupted && !req->rq_intr) {
2246 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2248 ptlrpc_mark_interrupted(req);
2252 tmp->oap_request = ptlrpc_request_addref(req);
2254 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2255 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2257 req->rq_interpret_reply = brw_interpret_oap;
2258 ptlrpcd_add_req(req);
2262 #define LOI_DEBUG(LOI, STR, args...) \
2263 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2264 !list_empty(&(LOI)->loi_cli_item), \
2265 (LOI)->loi_write_lop.lop_num_pending, \
2266 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2267 (LOI)->loi_read_lop.lop_num_pending, \
2268 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2271 /* This is called by osc_check_rpcs() to find which objects have pages that
2272 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2273 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2276 /* first return all objects which we already know to have
2277 * pages ready to be stuffed into rpcs */
2278 if (!list_empty(&cli->cl_loi_ready_list))
2279 RETURN(list_entry(cli->cl_loi_ready_list.next,
2280 struct lov_oinfo, loi_cli_item));
2282 /* then if we have cache waiters, return all objects with queued
2283 * writes. This is especially important when many small files
2284 * have filled up the cache and not been fired into rpcs because
2285          * they don't pass the nr_pending/object threshold */
2286 if (!list_empty(&cli->cl_cache_waiters) &&
2287 !list_empty(&cli->cl_loi_write_list))
2288 RETURN(list_entry(cli->cl_loi_write_list.next,
2289 struct lov_oinfo, loi_write_item));
2291 /* then return all queued objects when we have an invalid import
2292 * so that they get flushed */
2293 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2294 if (!list_empty(&cli->cl_loi_write_list))
2295 RETURN(list_entry(cli->cl_loi_write_list.next,
2296 struct lov_oinfo, loi_write_item));
2297 if (!list_empty(&cli->cl_loi_read_list))
2298 RETURN(list_entry(cli->cl_loi_read_list.next,
2299 struct lov_oinfo, loi_read_item));
2304 /* called with the loi list lock held */
2305 static void osc_check_rpcs(struct client_obd *cli)
2307 struct lov_oinfo *loi;
2308 int rc = 0, race_counter = 0;
2311 while ((loi = osc_next_loi(cli)) != NULL) {
2312 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2314 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2317 /* attempt some read/write balancing by alternating between
2318 * reads and writes in an object. The makes_rpc checks here
2319 * would be redundant if we were getting read/write work items
2320 * instead of objects. we don't want send_oap_rpc to drain a
2321 * partial read pending queue when we're given this object to
2322 * do io on writes while there are cache waiters */
2323 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2324 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2325 &loi->loi_write_lop);
2333 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2334 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2335 &loi->loi_read_lop);
2344                 /* attempt some inter-object balancing by issuing rpcs
2345 * for each object in turn */
2346 if (!list_empty(&loi->loi_cli_item))
2347 list_del_init(&loi->loi_cli_item);
2348 if (!list_empty(&loi->loi_write_item))
2349 list_del_init(&loi->loi_write_item);
2350 if (!list_empty(&loi->loi_read_item))
2351 list_del_init(&loi->loi_read_item);
2353 loi_list_maint(cli, loi);
2355 /* send_oap_rpc fails with 0 when make_ready tells it to
2356 * back off. llite's make_ready does this when it tries
2357 * to lock a page queued for write that is already locked.
2358 * we want to try sending rpcs from many objects, but we
2359 * don't want to spin failing with 0. */
2360 if (race_counter == 10)
2366 /* we're trying to queue a page in the osc so we're subject to the
2367 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2368 * If the osc's queued pages are already at that limit, then we want to sleep
2369 * until there is space in the osc's queue for us. We also may be waiting for
2370 * write credits from the OST if there are RPCs in flight that may return some
2371 * before we fall back to sync writes.
2373  * We need this to know whether our allocation was granted in the presence of signals */
2374 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2378 client_obd_list_lock(&cli->cl_loi_list_lock);
2379 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2380 client_obd_list_unlock(&cli->cl_loi_list_lock);
2384 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2385 * grant or cache space. */
2386 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2387 struct osc_async_page *oap)
2389 struct osc_cache_waiter ocw;
2390 struct l_wait_info lwi = { 0 };
2394 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2395 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2396 cli->cl_dirty_max, obd_max_dirty_pages,
2397 cli->cl_lost_grant, cli->cl_avail_grant);
2399 /* force the caller to try sync io. this can jump the list
2400 * of queued writes and create a discontiguous rpc stream */
2401 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2402 loi->loi_ar.ar_force_sync)
2405 /* Hopefully normal case - cache space and write credits available */
2406 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2407 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2408 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2409 /* account for ourselves */
2410 osc_consume_write_grant(cli, &oap->oap_brw_page);
2414 /* Make sure that there are write rpcs in flight to wait for. This
2415 * is a little silly as this object may not have any pending but
2416 * other objects sure might. */
2417 if (cli->cl_w_in_flight) {
2418 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2419 cfs_waitq_init(&ocw.ocw_waitq);
2423 loi_list_maint(cli, loi);
2424 osc_check_rpcs(cli);
2425 client_obd_list_unlock(&cli->cl_loi_list_lock);
2427 CDEBUG(D_CACHE, "sleeping for cache space\n");
2428 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2430 client_obd_list_lock(&cli->cl_loi_list_lock);
2431 if (!list_empty(&ocw.ocw_entry)) {
2432 list_del(&ocw.ocw_entry);
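/* Initialise the per-page state (struct osc_async_page) that the OSC keeps for
 * every page handed to it for async IO: record the caller's ops and cookie, the
 * page and its object offset, and the list heads used for pending/urgent/rpc
 * bookkeeping.  The early size_round() return reports how much space the caller
 * must reserve for this state. */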
2441 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2442 struct lov_oinfo *loi, cfs_page_t *page,
2443 obd_off offset, struct obd_async_page_ops *ops,
2444 void *data, void **res)
2446 struct osc_async_page *oap;
2450 return size_round(sizeof(*oap));
2453 oap->oap_magic = OAP_MAGIC;
2454 oap->oap_cli = &exp->exp_obd->u.cli;
2457 oap->oap_caller_ops = ops;
2458 oap->oap_caller_data = data;
2460 oap->oap_page = page;
2461 oap->oap_obj_off = offset;
2463 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2464 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2465 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2467 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2469 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
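/* Convert the opaque page cookie handed back by osc_prep_async_page() into its
 * osc_async_page, validating the magic first. */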
2473 struct osc_async_page *oap_from_cookie(void *cookie)
2475 struct osc_async_page *oap = cookie;
2476 if (oap->oap_magic != OAP_MAGIC)
2477 return ERR_PTR(-EINVAL);
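/* Queue a prepared async page for read or write.  Writes must first pass the
 * quota check and reserve cache space and grant in osc_enter_cache(); the page
 * is then moved onto the pending lists and osc_check_rpcs() is kicked to see
 * whether an RPC can be built. */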
2481 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2482 struct lov_oinfo *loi, void *cookie,
2483 int cmd, obd_off off, int count,
2484 obd_flag brw_flags, enum async_flags async_flags)
2486 struct client_obd *cli = &exp->exp_obd->u.cli;
2487 struct osc_async_page *oap;
2491 oap = oap_from_cookie(cookie);
2493 RETURN(PTR_ERR(oap));
2495 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2498 if (!list_empty(&oap->oap_pending_item) ||
2499 !list_empty(&oap->oap_urgent_item) ||
2500 !list_empty(&oap->oap_rpc_item))
2503 /* check if the file's owner/group is over quota */
2504 #ifdef HAVE_QUOTA_SUPPORT
2505 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2506 struct obd_async_page_ops *ops;
2513 ops = oap->oap_caller_ops;
2514 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2515 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2526 loi = lsm->lsm_oinfo[0];
2528 client_obd_list_lock(&cli->cl_loi_list_lock);
2531 oap->oap_page_off = off;
2532 oap->oap_count = count;
2533 oap->oap_brw_flags = brw_flags;
2534 oap->oap_async_flags = async_flags;
2536 if (cmd & OBD_BRW_WRITE) {
2537 rc = osc_enter_cache(cli, loi, oap);
2539 client_obd_list_unlock(&cli->cl_loi_list_lock);
2544 osc_oap_to_pending(oap);
2545 loi_list_maint(cli, loi);
2547 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2550 osc_check_rpcs(cli);
2551 client_obd_list_unlock(&cli->cl_loi_list_lock);
2556 /* aka (~was & now & flag), but this is more clear :) */
2557 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
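/* e.g. SETTING(oap->oap_async_flags, async_flags, ASYNC_READY) is true only
 * when ASYNC_READY is clear in the old flags but set in the new ones, i.e.
 * this call is the one turning the flag on. */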
2559 static int osc_set_async_flags(struct obd_export *exp,
2560 struct lov_stripe_md *lsm,
2561 struct lov_oinfo *loi, void *cookie,
2562 obd_flag async_flags)
2564 struct client_obd *cli = &exp->exp_obd->u.cli;
2565 struct loi_oap_pages *lop;
2566 struct osc_async_page *oap;
2570 oap = oap_from_cookie(cookie);
2572 RETURN(PTR_ERR(oap));
2575 * bug 7311: OST-side locking is only supported for liblustre for now
2576          * (and liblustre never calls obd_set_async_flags(), I hope.); a generic
2577          * implementation would have to handle the case where an OST-locked page
2578          * is picked up by, e.g., ->writepage().
2580 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2581 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2584 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2588 loi = lsm->lsm_oinfo[0];
2590 if (oap->oap_cmd & OBD_BRW_WRITE) {
2591 lop = &loi->loi_write_lop;
2593 lop = &loi->loi_read_lop;
2596 client_obd_list_lock(&cli->cl_loi_list_lock);
2598 if (list_empty(&oap->oap_pending_item))
2599 GOTO(out, rc = -EINVAL);
2601 if ((oap->oap_async_flags & async_flags) == async_flags)
2604 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2605 oap->oap_async_flags |= ASYNC_READY;
2607 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2608 if (list_empty(&oap->oap_rpc_item)) {
2609 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2610 loi_list_maint(cli, loi);
2614 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2615 oap->oap_async_flags);
2617 osc_check_rpcs(cli);
2618 client_obd_list_unlock(&cli->cl_loi_list_lock);
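/* Queue a prepared async page on an obd_io_group rather than on the normal
 * pending lists; group pages stay on lop_pending_group until
 * osc_trigger_group_io() releases them. */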
2622 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2623 struct lov_oinfo *loi,
2624 struct obd_io_group *oig, void *cookie,
2625 int cmd, obd_off off, int count,
2627 obd_flag async_flags)
2629 struct client_obd *cli = &exp->exp_obd->u.cli;
2630 struct osc_async_page *oap;
2631 struct loi_oap_pages *lop;
2635 oap = oap_from_cookie(cookie);
2637 RETURN(PTR_ERR(oap));
2639 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2642 if (!list_empty(&oap->oap_pending_item) ||
2643 !list_empty(&oap->oap_urgent_item) ||
2644 !list_empty(&oap->oap_rpc_item))
2648 loi = lsm->lsm_oinfo[0];
2650 client_obd_list_lock(&cli->cl_loi_list_lock);
2653 oap->oap_page_off = off;
2654 oap->oap_count = count;
2655 oap->oap_brw_flags = brw_flags;
2656 oap->oap_async_flags = async_flags;
2658 if (cmd & OBD_BRW_WRITE)
2659 lop = &loi->loi_write_lop;
2661 lop = &loi->loi_read_lop;
2663 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2664 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2666 rc = oig_add_one(oig, &oap->oap_occ);
2669 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2670 oap, oap->oap_page, rc);
2672 client_obd_list_unlock(&cli->cl_loi_list_lock);
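/* Move every page parked on the group pending list onto the ordinary pending
 * list so that osc_check_rpcs() can build RPCs from them. */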
2677 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2678 struct loi_oap_pages *lop, int cmd)
2680 struct list_head *pos, *tmp;
2681 struct osc_async_page *oap;
2683 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2684 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2685 list_del(&oap->oap_pending_item);
2686 osc_oap_to_pending(oap);
2688 loi_list_maint(cli, loi);
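/* Release a group's queued pages (both reads and writes) into the pending
 * lists and try to send RPCs for them immediately. */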
2691 static int osc_trigger_group_io(struct obd_export *exp,
2692 struct lov_stripe_md *lsm,
2693 struct lov_oinfo *loi,
2694 struct obd_io_group *oig)
2696 struct client_obd *cli = &exp->exp_obd->u.cli;
2700 loi = lsm->lsm_oinfo[0];
2702 client_obd_list_lock(&cli->cl_loi_list_lock);
2704 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2705 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2707 osc_check_rpcs(cli);
2708 client_obd_list_unlock(&cli->cl_loi_list_lock);
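/* Undo osc_queue_async_io()/osc_queue_group_io() for a page that will not be
 * sent after all: give back its cache reservation and remove it from the
 * urgent and pending lists.  Fails with -EBUSY if the page is already part of
 * an RPC in flight. */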
2713 static int osc_teardown_async_page(struct obd_export *exp,
2714 struct lov_stripe_md *lsm,
2715 struct lov_oinfo *loi, void *cookie)
2717 struct client_obd *cli = &exp->exp_obd->u.cli;
2718 struct loi_oap_pages *lop;
2719 struct osc_async_page *oap;
2723 oap = oap_from_cookie(cookie);
2725 RETURN(PTR_ERR(oap));
2728 loi = lsm->lsm_oinfo[0];
2730 if (oap->oap_cmd & OBD_BRW_WRITE) {
2731 lop = &loi->loi_write_lop;
2733 lop = &loi->loi_read_lop;
2736 client_obd_list_lock(&cli->cl_loi_list_lock);
2738 if (!list_empty(&oap->oap_rpc_item))
2739 GOTO(out, rc = -EBUSY);
2741 osc_exit_cache(cli, oap, 0);
2742 osc_wake_cache_waiters(cli);
2744 if (!list_empty(&oap->oap_urgent_item)) {
2745 list_del_init(&oap->oap_urgent_item);
2746 oap->oap_async_flags &= ~ASYNC_URGENT;
2748 if (!list_empty(&oap->oap_pending_item)) {
2749 list_del_init(&oap->oap_pending_item);
2750 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2752 loi_list_maint(cli, loi);
2754 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2756 client_obd_list_unlock(&cli->cl_loi_list_lock);
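/* Attach the caller's data (the inode, in the llite case) to a matched DLM
 * lock, complaining if the lock already carries different l_ast_data that does
 * not belong to an inode being freed. */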
2760 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2763 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2766 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2769 lock_res_and_lock(lock);
2772 /* Liang XXX: Darwin and Winnt checking should be added */
2773 if (lock->l_ast_data && lock->l_ast_data != data) {
2774 struct inode *new_inode = data;
2775 struct inode *old_inode = lock->l_ast_data;
2776 if (!(old_inode->i_state & I_FREEING))
2777 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2778 LASSERTF(old_inode->i_state & I_FREEING,
2779 "Found existing inode %p/%lu/%u state %lu in lock: "
2780 "setting data to %p/%lu/%u\n", old_inode,
2781 old_inode->i_ino, old_inode->i_generation,
2783 new_inode, new_inode->i_ino, new_inode->i_generation);
2787 lock->l_ast_data = data;
2788 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2789 unlock_res_and_lock(lock);
2790 LDLM_LOCK_PUT(lock);
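/* Apply 'replace' to every lock on this object's resource, so the caller can
 * update or clear the callback data attached to those locks. */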
2793 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2794 ldlm_iterator_t replace, void *data)
2796 struct ldlm_res_id res_id = { .name = {0} };
2797 struct obd_device *obd = class_exp2obd(exp);
2799 res_id.name[0] = lsm->lsm_object_id;
2800 res_id.name[2] = lsm->lsm_object_gr;
2802 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
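/* Common enqueue completion: for intent enqueues that were aborted, pick up
 * the server's status from the lock reply, log the returned size/blocks/mtime,
 * and invoke the caller's update callback. */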
2806 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2812 /* The request was created before ldlm_cli_enqueue call. */
2813 if (rc == ELDLM_LOCK_ABORTED) {
2814 struct ldlm_reply *rep;
2816 /* swabbed by ldlm_cli_enqueue() */
2817 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2818 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2820 LASSERT(rep != NULL);
2821 if (rep->lock_policy_res1)
2822 rc = rep->lock_policy_res1;
2826 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2827 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2828 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2829 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2830 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2833 /* Call the update callback. */
2834 rc = oinfo->oi_cb_up(oinfo, rc);
2838 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2839 struct osc_enqueue_args *aa, int rc)
2841 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2842 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2843 struct ldlm_lock *lock;
2845 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2847 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2849 /* Complete obtaining the lock procedure. */
2850 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2852 &aa->oa_oi->oi_flags,
2853 &lsm->lsm_oinfo[0]->loi_lvb,
2854 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2855 lustre_swab_ost_lvb,
2856 aa->oa_oi->oi_lockh, rc);
2858 /* Complete osc stuff. */
2859 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2861 /* Release the lock for async request. */
2862 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2863 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2865 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2866 aa->oa_oi->oi_lockh, req, aa);
2867 LDLM_LOCK_PUT(lock);
2871 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
2872  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2873  * other synchronous requests, but keeping some locks while trying to obtain
2874  * others may take a considerable amount of time in the case of OST failure; and
2875  * when other sync requests do not get the released lock from a client, the client
2876  * is excluded from the cluster -- such scenarios make life difficult, so we
2877  * release locks just after they are obtained. */
2878 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2879 struct ldlm_enqueue_info *einfo,
2880 struct ptlrpc_request_set *rqset)
2882 struct ldlm_res_id res_id = { .name = {0} };
2883 struct obd_device *obd = exp->exp_obd;
2884 struct ldlm_reply *rep;
2885 struct ptlrpc_request *req = NULL;
2886 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2890 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2891 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2893 /* Filesystem lock extents are extended to page boundaries so that
2894 * dealing with the page cache is a little smoother. */
2895 oinfo->oi_policy.l_extent.start -=
2896 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2897 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2899 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2902 /* Next, search for already existing extent locks that will cover us */
2903 rc = ldlm_lock_match(obd->obd_namespace,
2904 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2905 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2908 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2911 /* I would like to be able to ASSERT here that rss <=
2912 * kms, but I can't, for reasons which are explained in
2916 /* We already have a lock, and it's referenced */
2917 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2919 /* For async requests, decref the lock. */
2921 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2926 /* If we're trying to read, we also search for an existing PW lock. The
2927 * VFS and page cache already protect us locally, so lots of readers/
2928 * writers can share a single PW lock.
2930 * There are problems with conversion deadlocks, so instead of
2931 * converting a read lock to a write lock, we'll just enqueue a new
2934 * At some point we should cancel the read lock instead of making them
2935 * send us a blocking callback, but there are problems with canceling
2936 * locks out from other users right now, too. */
2938 if (einfo->ei_mode == LCK_PR) {
2939 rc = ldlm_lock_match(obd->obd_namespace,
2940 oinfo->oi_flags | LDLM_FL_LVB_READY,
2941 &res_id, einfo->ei_type, &oinfo->oi_policy,
2942 LCK_PW, oinfo->oi_lockh);
2944 /* FIXME: This is not incredibly elegant, but it might
2945 * be more elegant than adding another parameter to
2946 * lock_match. I want a second opinion. */
2947 /* addref the lock only if not async requests. */
2949 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2950 osc_set_data_with_check(oinfo->oi_lockh,
2953 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2954 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2962 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2963 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2964 [DLM_LOCKREQ_OFF + 1] = 0 };
2966 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2970 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2971 size[DLM_REPLY_REC_OFF] =
2972 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2973 ptlrpc_req_set_repsize(req, 3, size);
2976 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2977 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2979 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2980 &oinfo->oi_policy, &oinfo->oi_flags,
2981 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2982 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2983 lustre_swab_ost_lvb, oinfo->oi_lockh,
2987 struct osc_enqueue_args *aa;
2988 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2989 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2994 req->rq_interpret_reply = osc_enqueue_interpret;
2995 ptlrpc_set_add_req(rqset, req);
2996 } else if (intent) {
2997 ptlrpc_req_finished(req);
3002 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3004 ptlrpc_req_finished(req);
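/* Match an already-granted lock covering the extent without sending an enqueue
 * RPC; as in osc_enqueue(), extents are extended to page boundaries and a PW
 * lock may satisfy a PR request. */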
3009 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3010 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3011 int *flags, void *data, struct lustre_handle *lockh)
3013 struct ldlm_res_id res_id = { .name = {0} };
3014 struct obd_device *obd = exp->exp_obd;
3016 int lflags = *flags;
3019 res_id.name[0] = lsm->lsm_object_id;
3020 res_id.name[2] = lsm->lsm_object_gr;
3022 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3024 /* Filesystem lock extents are extended to page boundaries so that
3025 * dealing with the page cache is a little smoother */
3026 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3027 policy->l_extent.end |= ~CFS_PAGE_MASK;
3029 /* Next, search for already existing extent locks that will cover us */
3030 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3031 &res_id, type, policy, mode, lockh);
3033 //if (!(*flags & LDLM_FL_TEST_LOCK))
3034 osc_set_data_with_check(lockh, data, lflags);
3037 /* If we're trying to read, we also search for an existing PW lock. The
3038 * VFS and page cache already protect us locally, so lots of readers/
3039 * writers can share a single PW lock. */
3040 if (mode == LCK_PR) {
3041 rc = ldlm_lock_match(obd->obd_namespace,
3042 lflags | LDLM_FL_LVB_READY, &res_id,
3043 type, policy, LCK_PW, lockh);
3044 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3045 /* FIXME: This is not incredibly elegant, but it might
3046 * be more elegant than adding another parameter to
3047 * lock_match. I want a second opinion. */
3048 osc_set_data_with_check(lockh, data, lflags);
3049 ldlm_lock_addref(lockh, LCK_PR);
3050 ldlm_lock_decref(lockh, LCK_PW);
3056 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3057 __u32 mode, struct lustre_handle *lockh)
3061 if (unlikely(mode == LCK_GROUP))
3062 ldlm_lock_decref_and_cancel(lockh, mode);
3064 ldlm_lock_decref(lockh, mode);
3069 static int osc_cancel_unused(struct obd_export *exp,
3070 struct lov_stripe_md *lsm, int flags,
3073 struct obd_device *obd = class_exp2obd(exp);
3074 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3077 res_id.name[0] = lsm->lsm_object_id;
3078 res_id.name[2] = lsm->lsm_object_gr;
3082 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3085 static int osc_join_lru(struct obd_export *exp,
3086 struct lov_stripe_md *lsm, int join)
3088 struct obd_device *obd = class_exp2obd(exp);
3089 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3092 res_id.name[0] = lsm->lsm_object_id;
3093 res_id.name[2] = lsm->lsm_object_gr;
3097 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
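/* Completion handler for asynchronous statfs: unpack the obd_statfs reply,
 * copy it into the caller's buffer and run the update callback. */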
3100 static int osc_statfs_interpret(struct ptlrpc_request *req,
3101 struct osc_async_args *aa, int rc)
3103 struct obd_statfs *msfs;
3109 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3110 lustre_swab_obd_statfs);
3112 CERROR("Can't unpack obd_statfs\n");
3113 GOTO(out, rc = -EPROTO);
3116 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3118 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3122 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3123 __u64 max_age, struct ptlrpc_request_set *rqset)
3125 struct ptlrpc_request *req;
3126 struct osc_async_args *aa;
3127 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3130 /* We could possibly pass max_age in the request (as an absolute
3131 * timestamp or a "seconds.usec ago") so the target can avoid doing
3132 * extra calls into the filesystem if that isn't necessary (e.g.
3133 * during mount that would help a bit). Having relative timestamps
3134 * is not so great if request processing is slow, while absolute
3135 * timestamps are not ideal because they need time synchronization. */
3136 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3137 OST_STATFS, 1, NULL, NULL);
3141 ptlrpc_req_set_repsize(req, 2, size);
3142 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3144 req->rq_interpret_reply = osc_statfs_interpret;
3145 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3146 aa = (struct osc_async_args *)&req->rq_async_args;
3149 ptlrpc_set_add_req(rqset, req);
3153 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3156 struct obd_statfs *msfs;
3157 struct ptlrpc_request *req;
3158 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3161 /* We could possibly pass max_age in the request (as an absolute
3162 * timestamp or a "seconds.usec ago") so the target can avoid doing
3163 * extra calls into the filesystem if that isn't necessary (e.g.
3164 * during mount that would help a bit). Having relative timestamps
3165 * is not so great if request processing is slow, while absolute
3166 * timestamps are not ideal because they need time synchronization. */
3167 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3168 OST_STATFS, 1, NULL, NULL);
3172 ptlrpc_req_set_repsize(req, 2, size);
3173 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3175 rc = ptlrpc_queue_wait(req);
3179 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3180 lustre_swab_obd_statfs);
3182 CERROR("Can't unpack obd_statfs\n");
3183 GOTO(out, rc = -EPROTO);
3186 memcpy(osfs, msfs, sizeof(*osfs));
3190 ptlrpc_req_finished(req);
3194 /* Retrieve object striping information.
3196  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3197  * the maximum number of OST indices which will fit in the user buffer.
3198 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3200 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3202 struct lov_user_md lum, *lumk;
3203 int rc = 0, lum_size;
3209 if (copy_from_user(&lum, lump, sizeof(lum)))
3212 if (lum.lmm_magic != LOV_USER_MAGIC)
3215 if (lum.lmm_stripe_count > 0) {
3216 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3217 OBD_ALLOC(lumk, lum_size);
3221 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3222 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3224 lum_size = sizeof(lum);
3228 lumk->lmm_object_id = lsm->lsm_object_id;
3229 lumk->lmm_object_gr = lsm->lsm_object_gr;
3230 lumk->lmm_stripe_count = 1;
3232 if (copy_to_user(lump, lumk, lum_size))
3236 OBD_FREE(lumk, lum_size);
3242 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3243 void *karg, void *uarg)
3245 struct obd_device *obd = exp->exp_obd;
3246 struct obd_ioctl_data *data = karg;
3250 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3253 if (!try_module_get(THIS_MODULE)) {
3254 CERROR("Can't get module. Is it alive?");
3259 case OBD_IOC_LOV_GET_CONFIG: {
3261 struct lov_desc *desc;
3262 struct obd_uuid uuid;
3266 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3267 GOTO(out, err = -EINVAL);
3269 data = (struct obd_ioctl_data *)buf;
3271 if (sizeof(*desc) > data->ioc_inllen1) {
3272 obd_ioctl_freedata(buf, len);
3273 GOTO(out, err = -EINVAL);
3276 if (data->ioc_inllen2 < sizeof(uuid)) {
3277 obd_ioctl_freedata(buf, len);
3278 GOTO(out, err = -EINVAL);
3281 desc = (struct lov_desc *)data->ioc_inlbuf1;
3282 desc->ld_tgt_count = 1;
3283 desc->ld_active_tgt_count = 1;
3284 desc->ld_default_stripe_count = 1;
3285 desc->ld_default_stripe_size = 0;
3286 desc->ld_default_stripe_offset = 0;
3287 desc->ld_pattern = 0;
3288 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3290 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3292 err = copy_to_user((void *)uarg, buf, len);
3295 obd_ioctl_freedata(buf, len);
3298 case LL_IOC_LOV_SETSTRIPE:
3299 err = obd_alloc_memmd(exp, karg);
3303 case LL_IOC_LOV_GETSTRIPE:
3304 err = osc_getstripe(karg, uarg);
3306 case OBD_IOC_CLIENT_RECOVER:
3307 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3312 case IOC_OSC_SET_ACTIVE:
3313 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3316 case OBD_IOC_POLL_QUOTACHECK:
3317 err = lquota_poll_check(quota_interface, exp,
3318 (struct if_quotacheck *)karg);
3321 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3322 cmd, cfs_curproc_comm());
3323 GOTO(out, err = -ENOTTY);
3326 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3329 module_put(THIS_MODULE);
3334 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3335 void *key, __u32 *vallen, void *val)
3338 if (!vallen || !val)
3341 if (keylen > strlen("lock_to_stripe") &&
3342 strcmp(key, "lock_to_stripe") == 0) {
3343 __u32 *stripe = val;
3344 *vallen = sizeof(*stripe);
3347 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3348 struct ptlrpc_request *req;
3350 char *bufs[2] = { NULL, key };
3351 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3353 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3354 OST_GET_INFO, 2, size, bufs);
3358 size[REPLY_REC_OFF] = *vallen;
3359 ptlrpc_req_set_repsize(req, 2, size);
3360 rc = ptlrpc_queue_wait(req);
3364 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3365 lustre_swab_ost_last_id);
3366 if (reply == NULL) {
3367 CERROR("Can't unpack OST last ID\n");
3368 GOTO(out, rc = -EPROTO);
3370 *((obd_id *)val) = *reply;
3372 ptlrpc_req_finished(req);
3378 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3381 struct llog_ctxt *ctxt;
3382 struct obd_import *imp = req->rq_import;
3388 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3391 rc = llog_initiator_connect(ctxt);
3393 CERROR("cannot establish connection for "
3394 "ctxt %p: %d\n", ctxt, rc);
3397 spin_lock(&imp->imp_lock);
3398 imp->imp_server_timeout = 1;
3399 imp->imp_pingable = 1;
3400 spin_unlock(&imp->imp_lock);
3401 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3406 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3407 void *key, obd_count vallen, void *val,
3408 struct ptlrpc_request_set *set)
3410 struct ptlrpc_request *req;
3411 struct obd_device *obd = exp->exp_obd;
3412 struct obd_import *imp = class_exp2cliimp(exp);
3413 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3414 char *bufs[3] = { NULL, key, val };
3417 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3419 if (KEY_IS(KEY_NEXT_ID)) {
3420 if (vallen != sizeof(obd_id))
3422 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3423 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3424 exp->exp_obd->obd_name,
3425 obd->u.cli.cl_oscc.oscc_next_id);
3430 if (KEY_IS("unlinked")) {
3431 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3432 spin_lock(&oscc->oscc_lock);
3433 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3434 spin_unlock(&oscc->oscc_lock);
3438 if (KEY_IS(KEY_INIT_RECOV)) {
3439 if (vallen != sizeof(int))
3441 spin_lock(&imp->imp_lock);
3442 imp->imp_initial_recov = *(int *)val;
3443 spin_unlock(&imp->imp_lock);
3444 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3445 exp->exp_obd->obd_name,
3446 imp->imp_initial_recov);
3450 if (KEY_IS("checksum")) {
3451 if (vallen != sizeof(int))
3453 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3457 if (KEY_IS(KEY_FLUSH_CTX)) {
3458 sptlrpc_import_flush_my_ctx(imp);
3465 /* We pass all other commands directly to OST. Since nobody calls osc
3466 methods directly and everybody is supposed to go through LOV, we
3467 assume lov checked invalid values for us.
3468 The only recognised values so far are evict_by_nid and mds_conn.
3469 Even if something bad goes through, we'd get a -EINVAL from OST
3472 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3477 if (KEY_IS("mds_conn")) {
3478 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3480 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3481 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3482 LASSERT(oscc->oscc_oa.o_gr > 0);
3483 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3486 ptlrpc_req_set_repsize(req, 1, NULL);
3487 ptlrpc_set_add_req(set, req);
3488 ptlrpc_check_set(set);
3494 static struct llog_operations osc_size_repl_logops = {
3495 lop_cancel: llog_obd_repl_cancel
3498 static struct llog_operations osc_mds_ost_orig_logops;
3499 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3500 struct obd_device *tgt, int count,
3501 struct llog_catid *catid, struct obd_uuid *uuid)
3506 spin_lock(&obd->obd_dev_lock);
3507 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3508 osc_mds_ost_orig_logops = llog_lvfs_ops;
3509 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3510 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3511 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3512 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3514 spin_unlock(&obd->obd_dev_lock);
3516 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3517 &catid->lci_logid, &osc_mds_ost_orig_logops);
3519 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3523 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3524 &osc_size_repl_logops);
3526 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3529 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3530 obd->obd_name, tgt->obd_name, count, catid, rc);
3531 CERROR("logid "LPX64":0x%x\n",
3532 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3537 static int osc_llog_finish(struct obd_device *obd, int count)
3539 struct llog_ctxt *ctxt;
3540 int rc = 0, rc2 = 0;
3543 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3545 rc = llog_cleanup(ctxt);
3547 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3549 rc2 = llog_cleanup(ctxt);
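/* On reconnect, ask the server for the grant we believe we still hold, or for
 * enough grant to cover two full-sized RPCs if we hold none, and report any
 * grant lost while the connection was down. */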
3556 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3557 struct obd_uuid *cluuid,
3558 struct obd_connect_data *data)
3560 struct client_obd *cli = &obd->u.cli;
3562 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3565 client_obd_list_lock(&cli->cl_loi_list_lock);
3566 data->ocd_grant = cli->cl_avail_grant ?:
3567 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3568 lost_grant = cli->cl_lost_grant;
3569 cli->cl_lost_grant = 0;
3570 client_obd_list_unlock(&cli->cl_loi_list_lock);
3572 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3573 "cl_lost_grant: %ld\n", data->ocd_grant,
3574 cli->cl_avail_grant, lost_grant);
3575 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3576 " ocd_grant: %d\n", data->ocd_connect_flags,
3577 data->ocd_version, data->ocd_grant);
3583 static int osc_disconnect(struct obd_export *exp)
3585 struct obd_device *obd = class_exp2obd(exp);
3586 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3589 if (obd->u.cli.cl_conn_count == 1)
3590 /* flush any remaining cancel messages out to the target */
3591 llog_sync(ctxt, exp);
3593 rc = client_disconnect_export(exp);
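/* React to import state changes: mark the object creator as recovering and
 * drop cached grant on disconnect, fail queued pages and clean the lock
 * namespace on invalidation, and pick up connect data (grant, request portal)
 * when the connection is (re)established. */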
3597 static int osc_import_event(struct obd_device *obd,
3598 struct obd_import *imp,
3599 enum obd_import_event event)
3601 struct client_obd *cli;
3605 LASSERT(imp->imp_obd == obd);
3608 case IMP_EVENT_DISCON: {
3609         /* Only do this on the MDS OSCs */
3610 if (imp->imp_server_timeout) {
3611 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3613 spin_lock(&oscc->oscc_lock);
3614 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3615 spin_unlock(&oscc->oscc_lock);
3618 client_obd_list_lock(&cli->cl_loi_list_lock);
3619 cli->cl_avail_grant = 0;
3620 cli->cl_lost_grant = 0;
3621 client_obd_list_unlock(&cli->cl_loi_list_lock);
3624 case IMP_EVENT_INACTIVE: {
3625 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3628 case IMP_EVENT_INVALIDATE: {
3629 struct ldlm_namespace *ns = obd->obd_namespace;
3633 client_obd_list_lock(&cli->cl_loi_list_lock);
3634 /* all pages go to failing rpcs due to the invalid import */
3635 osc_check_rpcs(cli);
3636 client_obd_list_unlock(&cli->cl_loi_list_lock);
3638 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3642 case IMP_EVENT_ACTIVE: {
3643         /* Only do this on the MDS OSCs */
3644 if (imp->imp_server_timeout) {
3645 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3647 spin_lock(&oscc->oscc_lock);
3648 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3649 spin_unlock(&oscc->oscc_lock);
3651 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3654 case IMP_EVENT_OCD: {
3655 struct obd_connect_data *ocd = &imp->imp_connect_data;
3657 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3658 osc_init_grant(&obd->u.cli, ocd);
3661 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3662 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3664 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3668 CERROR("Unknown import event %d\n", event);
3674 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3680 rc = ptlrpcd_addref();
3684 rc = client_obd_setup(obd, lcfg);
3688 struct lprocfs_static_vars lvars;
3689 struct client_obd *cli = &obd->u.cli;
3691 lprocfs_init_vars(osc, &lvars);
3692 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3693 lproc_osc_attach_seqstat(obd);
3694 ptlrpc_lprocfs_register_obd(obd);
3698 /* We need to allocate a few requests more, because
3699 brw_interpret_oap tries to create new requests before freeing
3700 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3701            reserved, but I'm afraid that might be too much wasted RAM
3702            in fact, so 2 is just my guess and should still work. */
3703 cli->cl_import->imp_rq_pool =
3704 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3706 ptlrpc_add_rqs_to_pool);
3712 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3718 case OBD_CLEANUP_EARLY: {
3719 struct obd_import *imp;
3720 imp = obd->u.cli.cl_import;
3721 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3722 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3723 ptlrpc_deactivate_import(imp);
3724 spin_lock(&imp->imp_lock);
3725 imp->imp_pingable = 0;
3726 spin_unlock(&imp->imp_lock);
3729 case OBD_CLEANUP_EXPORTS: {
3730 /* If we set up but never connected, the
3731 client import will not have been cleaned. */
3732 if (obd->u.cli.cl_import) {
3733 struct obd_import *imp;
3734 imp = obd->u.cli.cl_import;
3735 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3737 ptlrpc_invalidate_import(imp);
3738 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3739 class_destroy_import(imp);
3740 obd->u.cli.cl_import = NULL;
3744 case OBD_CLEANUP_SELF_EXP:
3745 rc = obd_llog_finish(obd, 0);
3747 CERROR("failed to cleanup llogging subsystems\n");
3749 case OBD_CLEANUP_OBD:
3755 int osc_cleanup(struct obd_device *obd)
3757 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3761 ptlrpc_lprocfs_unregister_obd(obd);
3762 lprocfs_obd_cleanup(obd);
3764 spin_lock(&oscc->oscc_lock);
3765 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3766 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3767 spin_unlock(&oscc->oscc_lock);
3769 /* free memory of osc quota cache */
3770 lquota_cleanup(quota_interface, obd);
3772 rc = client_obd_cleanup(obd);
3778 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3780 struct lustre_cfg *lcfg = buf;
3781 struct lprocfs_static_vars lvars;
3784 lprocfs_init_vars(osc, &lvars);
3786 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3790 struct obd_ops osc_obd_ops = {
3791 .o_owner = THIS_MODULE,
3792 .o_setup = osc_setup,
3793 .o_precleanup = osc_precleanup,
3794 .o_cleanup = osc_cleanup,
3795 .o_add_conn = client_import_add_conn,
3796 .o_del_conn = client_import_del_conn,
3797 .o_connect = client_connect_import,
3798 .o_reconnect = osc_reconnect,
3799 .o_disconnect = osc_disconnect,
3800 .o_statfs = osc_statfs,
3801 .o_statfs_async = osc_statfs_async,
3802 .o_packmd = osc_packmd,
3803 .o_unpackmd = osc_unpackmd,
3804 .o_precreate = osc_precreate,
3805 .o_create = osc_create,
3806 .o_destroy = osc_destroy,
3807 .o_getattr = osc_getattr,
3808 .o_getattr_async = osc_getattr_async,
3809 .o_setattr = osc_setattr,
3810 .o_setattr_async = osc_setattr_async,
3812 .o_brw_async = osc_brw_async,
3813 .o_prep_async_page = osc_prep_async_page,
3814 .o_queue_async_io = osc_queue_async_io,
3815 .o_set_async_flags = osc_set_async_flags,
3816 .o_queue_group_io = osc_queue_group_io,
3817 .o_trigger_group_io = osc_trigger_group_io,
3818 .o_teardown_async_page = osc_teardown_async_page,
3819 .o_punch = osc_punch,
3821 .o_enqueue = osc_enqueue,
3822 .o_match = osc_match,
3823 .o_change_cbdata = osc_change_cbdata,
3824 .o_cancel = osc_cancel,
3825 .o_cancel_unused = osc_cancel_unused,
3826 .o_join_lru = osc_join_lru,
3827 .o_iocontrol = osc_iocontrol,
3828 .o_get_info = osc_get_info,
3829 .o_set_info_async = osc_set_info_async,
3830 .o_import_event = osc_import_event,
3831 .o_llog_init = osc_llog_init,
3832 .o_llog_finish = osc_llog_finish,
3833 .o_process_config = osc_process_config,
3835 int __init osc_init(void)
3837 struct lprocfs_static_vars lvars;
3841 lprocfs_init_vars(osc, &lvars);
3843 request_module("lquota");
3844 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3845 lquota_init(quota_interface);
3846 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3848 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3849 LUSTRE_OSC_NAME, NULL);
3851 if (quota_interface)
3852 PORTAL_SYMBOL_PUT(osc_quota_interface);
3860 static void /*__exit*/ osc_exit(void)
3862 lquota_exit(quota_interface);
3863 if (quota_interface)
3864 PORTAL_SYMBOL_PUT(osc_quota_interface);
3866 class_unregister_type(LUSTRE_OSC_NAME);
3869 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3870 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3871 MODULE_LICENSE("GPL");
3873 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);