/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_ioctl.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include <obd_class.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"
struct osc_brw_async_args {
	struct obdo		*aa_oa;
	int			 aa_requested_nob;
	int			 aa_nio_count;
	u32			 aa_page_count;
	int			 aa_resends;
	struct brw_page		**aa_ppga;
	struct client_obd	*aa_cli;
	struct list_head	 aa_oaps;
	struct list_head	 aa_exts;
	struct obd_capa		*aa_ocapa;
	struct cl_req		*aa_clerq;
};
#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct obd_info		*fa_oi;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_enqueue_args {
	struct obd_export	*oa_exp;
	ldlm_type_t		 oa_type;
	ldlm_mode_t		 oa_mode;
	__u64			*oa_flags;
	osc_enqueue_upcall_f	 oa_upcall;
	void			*oa_cookie;
	struct ost_lvb		*oa_lvb;
	struct lustre_handle	 oa_lockh;
	unsigned int		 oa_agl:1;
};
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100 struct ost_body *body, void *capa)
102 struct obd_capa *oc = (struct obd_capa *)capa;
103 struct lustre_capa *c;
108 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
111 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112 DEBUG_CAPA(D_SEC, c, "pack");
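/* Pack the common OST request body: copy the caller's obdo attributes into
 * the wire format negotiated on this import and attach the capability, if
 * one was supplied. */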
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
117 struct ost_body *body;
119 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
122 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
124 osc_pack_capa(req, body, oinfo->oi_capa);
127 void osc_set_capa_size(struct ptlrpc_request *req,
128 const struct req_msg_field *field,
132 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
134 /* it is already calculated as sizeof struct obd_capa */
138 int osc_getattr_interpret(const struct lu_env *env,
139 struct ptlrpc_request *req,
140 struct osc_async_args *aa, int rc)
142 struct ost_body *body;
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152 aa->aa_oi->oi_oa, &body->oa);
154 /* This should really be sent by the OST */
155 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
158 CDEBUG(D_INFO, "can't unpack ost_body\n");
160 aa->aa_oi->oi_oa->o_valid = 0;
163 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168 struct obd_info *oinfo)
170 struct ptlrpc_request *req;
171 struct ost_body *body;
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
179 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
182 ptlrpc_request_free(req);
186 osc_pack_req_body(req, oinfo);
188 ptlrpc_request_set_replen(req);
190 rc = ptlrpc_queue_wait(req);
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
202 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
207 ptlrpc_req_finished(req);
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212 struct obd_info *oinfo, struct obd_trans_info *oti)
214 struct ptlrpc_request *req;
215 struct ost_body *body;
219 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
221 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
225 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
228 ptlrpc_request_free(req);
232 osc_pack_req_body(req, oinfo);
234 ptlrpc_request_set_replen(req);
236 rc = ptlrpc_queue_wait(req);
240 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
242 GOTO(out, rc = -EPROTO);
244 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
249 ptlrpc_req_finished(req);
253 static int osc_setattr_interpret(const struct lu_env *env,
254 struct ptlrpc_request *req,
255 struct osc_setattr_args *sa, int rc)
257 struct ost_body *body;
263 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
265 GOTO(out, rc = -EPROTO);
267 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
270 rc = sa->sa_upcall(sa->sa_cookie, rc);
274 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
275 struct obd_trans_info *oti,
276 obd_enqueue_update_f upcall, void *cookie,
277 struct ptlrpc_request_set *rqset)
279 struct ptlrpc_request *req;
280 struct osc_setattr_args *sa;
284 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
288 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
289 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
291 ptlrpc_request_free(req);
295 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
296 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
298 osc_pack_req_body(req, oinfo);
300 ptlrpc_request_set_replen(req);
302 /* do mds to ost setattr asynchronously */
304 /* Do not wait for response. */
305 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
307 req->rq_interpret_reply =
308 (ptlrpc_interpterer_t)osc_setattr_interpret;
310 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
311 sa = ptlrpc_req_async_args(req);
312 sa->sa_oa = oinfo->oi_oa;
313 sa->sa_upcall = upcall;
314 sa->sa_cookie = cookie;
316 if (rqset == PTLRPCD_SET)
317 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
319 ptlrpc_set_add_req(rqset, req);
325 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
326 struct obd_trans_info *oti,
327 struct ptlrpc_request_set *rqset)
329 return osc_setattr_async_base(exp, oinfo, oti,
330 oinfo->oi_cb_up, oinfo, rqset);
333 static int osc_create(const struct lu_env *env, struct obd_export *exp,
334 struct obdo *oa, struct obd_trans_info *oti)
336 struct ptlrpc_request *req;
337 struct ost_body *body;
342 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
343 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
345 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
347 GOTO(out, rc = -ENOMEM);
349 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
351 ptlrpc_request_free(req);
355 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
358 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
360 ptlrpc_request_set_replen(req);
362 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
363 oa->o_flags == OBD_FL_DELORPHAN) {
365 "delorphan from OST integration");
366 /* Don't resend the delorphan req */
367 req->rq_no_resend = req->rq_no_delay = 1;
370 rc = ptlrpc_queue_wait(req);
374 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
376 GOTO(out_req, rc = -EPROTO);
378 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
379 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
381 oa->o_blksize = cli_brw_size(exp->exp_obd);
382 oa->o_valid |= OBD_MD_FLBLKSZ;
385 if (oa->o_valid & OBD_MD_FLCOOKIE) {
386 if (oti->oti_logcookies == NULL)
387 oti->oti_logcookies = &oti->oti_onecookie;
389 *oti->oti_logcookies = oa->o_lcookie;
393 CDEBUG(D_HA, "transno: "LPD64"\n",
394 lustre_msg_get_transno(req->rq_repmsg));
396 ptlrpc_req_finished(req);
401 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
402 obd_enqueue_update_f upcall, void *cookie,
403 struct ptlrpc_request_set *rqset)
405 struct ptlrpc_request *req;
406 struct osc_setattr_args *sa;
407 struct ost_body *body;
411 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
415 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
416 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
418 ptlrpc_request_free(req);
421 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
422 ptlrpc_at_set_req_timeout(req);
424 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
426 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
428 osc_pack_capa(req, body, oinfo->oi_capa);
430 ptlrpc_request_set_replen(req);
432 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
433 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
434 sa = ptlrpc_req_async_args(req);
435 sa->sa_oa = oinfo->oi_oa;
436 sa->sa_upcall = upcall;
437 sa->sa_cookie = cookie;
438 if (rqset == PTLRPCD_SET)
439 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
441 ptlrpc_set_add_req(rqset, req);
446 static int osc_sync_interpret(const struct lu_env *env,
447 struct ptlrpc_request *req,
450 struct osc_fsync_args *fa = arg;
451 struct ost_body *body;
457 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
459 CERROR ("can't unpack ost_body\n");
460 GOTO(out, rc = -EPROTO);
463 *fa->fa_oi->oi_oa = body->oa;
465 rc = fa->fa_upcall(fa->fa_cookie, rc);
469 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
470 obd_enqueue_update_f upcall, void *cookie,
471 struct ptlrpc_request_set *rqset)
473 struct ptlrpc_request *req;
474 struct ost_body *body;
475 struct osc_fsync_args *fa;
479 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
483 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
484 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
486 ptlrpc_request_free(req);
490 /* overload the size and blocks fields in the oa with start/end */
491 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
493 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
495 osc_pack_capa(req, body, oinfo->oi_capa);
497 ptlrpc_request_set_replen(req);
498 req->rq_interpret_reply = osc_sync_interpret;
500 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
501 fa = ptlrpc_req_async_args(req);
503 fa->fa_upcall = upcall;
504 fa->fa_cookie = cookie;
506 if (rqset == PTLRPCD_SET)
507 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
509 ptlrpc_set_add_req(rqset, req);
/* Find and cancel locally the locks matched by @mode in the resource found by
 * @objid. Found locks are added to the @cancels list. Returns the number of
 * locks added to the @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518 struct list_head *cancels,
519 ldlm_mode_t mode, __u64 lock_flags)
521 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522 struct ldlm_res_id res_id;
523 struct ldlm_resource *res;
/* Return, i.e. cancel nothing, only if ELC is supported (flag in
 * export) but disabled through procfs (flag in NS).
 *
 * This is distinct from the case when ELC is not supported at all,
 * where we still want to cancel locks in advance and just cancel them
 * locally, without sending any RPC. */
533 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
536 ostid_build_res_name(&oa->o_oi, &res_id);
537 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
541 LDLM_RESOURCE_ADDREF(res);
542 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543 lock_flags, 0, NULL);
544 LDLM_RESOURCE_DELREF(res);
545 ldlm_resource_putref(res);
549 static int osc_destroy_interpret(const struct lu_env *env,
550 struct ptlrpc_request *req, void *data,
553 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
555 atomic_dec(&cli->cl_destroy_in_flight);
556 wake_up(&cli->cl_destroy_waitq);
560 static int osc_can_send_destroy(struct client_obd *cli)
562 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563 cli->cl_max_rpcs_in_flight) {
564 /* The destroy request can be sent */
567 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568 cli->cl_max_rpcs_in_flight) {
570 * The counter has been modified between the two atomic
573 wake_up(&cli->cl_destroy_waitq);
/* Destroy requests can always be async on the client, and we don't even really
 * care about the return code, since the client cannot do anything at all about
 * a failure anyway.
 *
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
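/* The osc_destroy() path below also cancels, via ELC, any locally granted
 * PW locks on the object before sending OST_DESTROY, passing
 * LDLM_FL_DISCARD_DATA so cached pages under those locks are dropped rather
 * than written back to an object that is about to be destroyed. */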
588 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
589 struct obdo *oa, struct obd_trans_info *oti)
591 struct client_obd *cli = &exp->exp_obd->u.cli;
592 struct ptlrpc_request *req;
593 struct ost_body *body;
594 struct list_head cancels = LIST_HEAD_INIT(cancels);
599 CDEBUG(D_INFO, "oa NULL\n");
603 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
604 LDLM_FL_DISCARD_DATA);
606 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
608 ldlm_lock_list_put(&cancels, l_bl_ast, count);
612 osc_set_capa_size(req, &RMF_CAPA1, NULL);
613 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
616 ptlrpc_request_free(req);
620 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
621 ptlrpc_at_set_req_timeout(req);
623 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
624 oa->o_lcookie = *oti->oti_logcookies;
625 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
627 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
629 ptlrpc_request_set_replen(req);
/* If osc_destroy is for destroying the unlink orphan,
 * sent from MDT to OST, it should not be blocked here,
 * because the process might be triggered by ptlrpcd, and
 * it is not good to block the ptlrpcd thread (b=16006) */
635 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
636 req->rq_interpret_reply = osc_destroy_interpret;
637 if (!osc_can_send_destroy(cli)) {
638 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
642 * Wait until the number of on-going destroy RPCs drops
643 * under max_rpc_in_flight
645 l_wait_event_exclusive(cli->cl_destroy_waitq,
646 osc_can_send_destroy(cli), &lwi);
650 /* Do not wait for response */
651 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
655 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
658 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
660 LASSERT(!(oa->o_valid & bits));
663 spin_lock(&cli->cl_loi_list_lock);
664 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
665 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
666 cli->cl_dirty_max_pages)) {
667 CERROR("dirty %lu - %lu > dirty_max %lu\n",
668 cli->cl_dirty_pages, cli->cl_dirty_transit,
669 cli->cl_dirty_max_pages);
671 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
672 atomic_long_read(&obd_dirty_transit_pages) >
673 (obd_max_dirty_pages + 1))) {
/* The atomic_read() that allows the atomic_inc() is not
 * covered by a lock, thus the two may safely race and trip
 * this CERROR() unless we add in a small fudge factor (+1). */
677 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
678 cli->cl_import->imp_obd->obd_name,
679 atomic_long_read(&obd_dirty_pages),
680 atomic_long_read(&obd_dirty_transit_pages),
681 obd_max_dirty_pages);
683 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
685 CERROR("dirty %lu - dirty_max %lu too big???\n",
686 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
689 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
691 (cli->cl_max_rpcs_in_flight + 1);
692 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
695 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
696 oa->o_dropped = cli->cl_lost_grant;
697 cli->cl_lost_grant = 0;
698 spin_unlock(&cli->cl_loi_list_lock);
699 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
700 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
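/* A worked example for the o_undirty sizing above (values illustrative,
 * assuming 4KB pages): with cl_max_pages_per_rpc = 256 (1MB RPCs) and
 * cl_max_rpcs_in_flight = 8, max_in_flight = 9MB, so the client announces
 * at least 9MB of undirty headroom even if cl_dirty_max_pages is smaller. */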
704 void osc_update_next_shrink(struct client_obd *cli)
706 cli->cl_next_shrink_grant =
707 cfs_time_shift(cli->cl_grant_shrink_interval);
708 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
709 cli->cl_next_shrink_grant);
712 static void __osc_update_grant(struct client_obd *cli, u64 grant)
714 spin_lock(&cli->cl_loi_list_lock);
715 cli->cl_avail_grant += grant;
716 spin_unlock(&cli->cl_loi_list_lock);
719 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
721 if (body->oa.o_valid & OBD_MD_FLGRANT) {
722 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
723 __osc_update_grant(cli, body->oa.o_grant);
727 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
728 u32 keylen, void *key,
729 u32 vallen, void *val,
730 struct ptlrpc_request_set *set);
732 static int osc_shrink_grant_interpret(const struct lu_env *env,
733 struct ptlrpc_request *req,
736 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
737 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
738 struct ost_body *body;
741 __osc_update_grant(cli, oa->o_grant);
745 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
747 osc_update_grant(cli, body);
753 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
755 spin_lock(&cli->cl_loi_list_lock);
756 oa->o_grant = cli->cl_avail_grant / 4;
757 cli->cl_avail_grant -= oa->o_grant;
758 spin_unlock(&cli->cl_loi_list_lock);
759 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
760 oa->o_valid |= OBD_MD_FLFLAGS;
763 oa->o_flags |= OBD_FL_SHRINK_GRANT;
764 osc_update_next_shrink(cli);
767 /* Shrink the current grant, either from some large amount to enough for a
768 * full set of in-flight RPCs, or if we have already shrunk to that limit
769 * then to enough for a single RPC. This avoids keeping more grant than
770 * needed, and avoids shrinking the grant piecemeal. */
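/* For example (sizes illustrative only): with 1MB RPCs and
 * cl_max_rpcs_in_flight = 8, the first shrink targets (8 + 1) * 1MB = 9MB of
 * grant; once avail_grant is already at or below that, the next shrink
 * targets a single RPC's worth (1MB). */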
771 static int osc_shrink_grant(struct client_obd *cli)
773 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
774 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
776 spin_lock(&cli->cl_loi_list_lock);
777 if (cli->cl_avail_grant <= target_bytes)
778 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
779 spin_unlock(&cli->cl_loi_list_lock);
781 return osc_shrink_grant_to_target(cli, target_bytes);
784 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
787 struct ost_body *body;
790 spin_lock(&cli->cl_loi_list_lock);
/* Don't shrink if we are already above or below the desired limit.
 * We don't want to shrink below a single RPC, as that will negatively
 * impact block allocation and long-term performance. */
794 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
795 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
797 if (target_bytes >= cli->cl_avail_grant) {
798 spin_unlock(&cli->cl_loi_list_lock);
801 spin_unlock(&cli->cl_loi_list_lock);
807 osc_announce_cached(cli, &body->oa, 0);
809 spin_lock(&cli->cl_loi_list_lock);
810 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
811 cli->cl_avail_grant = target_bytes;
812 spin_unlock(&cli->cl_loi_list_lock);
813 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
814 body->oa.o_valid |= OBD_MD_FLFLAGS;
815 body->oa.o_flags = 0;
817 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
818 osc_update_next_shrink(cli);
820 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
821 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
822 sizeof(*body), body, NULL);
824 __osc_update_grant(cli, body->oa.o_grant);
829 static int osc_should_shrink_grant(struct client_obd *client)
831 cfs_time_t time = cfs_time_current();
832 cfs_time_t next_shrink = client->cl_next_shrink_grant;
834 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
835 OBD_CONNECT_GRANT_SHRINK) == 0)
838 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
839 /* Get the current RPC size directly, instead of going via:
840 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
841 * Keep comment here so that it can be found by searching. */
842 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
844 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
845 client->cl_avail_grant > brw_size)
848 osc_update_next_shrink(client);
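/* Timeout callback for the grant shrink timer: walk every client on the
 * timeout item's obd list and shrink the grant of each one that currently
 * qualifies. */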
853 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
855 struct client_obd *client;
857 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
858 if (osc_should_shrink_grant(client))
859 osc_shrink_grant(client);
864 static int osc_add_shrink_grant(struct client_obd *client)
868 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
870 osc_grant_shrink_grant_cb, NULL,
871 &client->cl_grant_shrink_list);
873 CERROR("add grant client %s error %d\n",
874 client->cl_import->imp_obd->obd_name, rc);
877 CDEBUG(D_CACHE, "add grant client %s \n",
878 client->cl_import->imp_obd->obd_name);
879 osc_update_next_shrink(client);
883 static int osc_del_shrink_grant(struct client_obd *client)
885 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
889 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
/*
 * ocd_grant is the total grant amount we expect to hold: if we have
 * been evicted, it is the new avail_grant amount, and cl_dirty_pages
 * will drop to 0 as in-flight RPCs fail out; otherwise, it is
 * avail_grant + dirty.
 *
 * The race is tolerable here: if we are evicted, but imp_state has
 * already left the EVICTED state, then cl_dirty_pages must be 0 already.
 */
900 spin_lock(&cli->cl_loi_list_lock);
901 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
902 cli->cl_avail_grant = ocd->ocd_grant;
904 cli->cl_avail_grant = ocd->ocd_grant -
905 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
907 if (cli->cl_avail_grant < 0) {
908 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
909 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
910 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
911 /* workaround for servers which do not have the patch from
913 cli->cl_avail_grant = ocd->ocd_grant;
916 /* determine the appropriate chunk size used by osc_extent. */
917 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
918 spin_unlock(&cli->cl_loi_list_lock);
920 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
921 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
922 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
924 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
925 list_empty(&cli->cl_grant_shrink_list))
926 osc_add_shrink_grant(cli);
/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it is reading inside the file; it is just that
 * this stripe never got written at or beyond this stripe offset yet. */
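/* For example, assuming 4KB pages: if three pages (12288 bytes) were
 * requested but the OST returned only 5000 bytes, the first page is left
 * intact, the second page is zeroed from offset 904 to its end, and the
 * third page is zeroed entirely. */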
933 static void handle_short_read(int nob_read, size_t page_count,
934 struct brw_page **pga)
939 /* skip bytes read OK */
940 while (nob_read > 0) {
941 LASSERT (page_count > 0);
943 if (pga[i]->count > nob_read) {
944 /* EOF inside this page */
945 ptr = kmap(pga[i]->pg) +
946 (pga[i]->off & ~CFS_PAGE_MASK);
947 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
954 nob_read -= pga[i]->count;
959 /* zero remaining pages */
960 while (page_count-- > 0) {
961 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
962 memset(ptr, 0, pga[i]->count);
968 static int check_write_rcs(struct ptlrpc_request *req,
969 int requested_nob, int niocount,
970 size_t page_count, struct brw_page **pga)
975 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
976 sizeof(*remote_rcs) *
978 if (remote_rcs == NULL) {
979 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
983 /* return error if any niobuf was in error */
984 for (i = 0; i < niocount; i++) {
985 if ((int)remote_rcs[i] < 0)
986 return(remote_rcs[i]);
988 if (remote_rcs[i] != 0) {
989 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
990 i, remote_rcs[i], req);
995 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
996 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
997 req->rq_bulk->bd_nob_transferred, requested_nob);
1004 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1006 if (p1->flag != p2->flag) {
1007 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1008 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1009 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1011 /* warn if we try to combine flags that we don't know to be
1012 * safe to combine */
1013 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1014 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1015 "report this at https://jira.hpdd.intel.com/\n",
1016 p1->flag, p2->flag);
1021 return (p1->off + p1->count == p2->off);
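/* Compute the bulk checksum over the first @nob bytes of the page array,
 * feeding each page fragment into the selected hash algorithm. The
 * fault-injection hooks below deliberately corrupt read data or the write
 * checksum so the checksum-error paths can be exercised in testing. */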
1024 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1025 struct brw_page **pga, int opc,
1026 cksum_type_t cksum_type)
1030 struct cfs_crypto_hash_desc *hdesc;
1031 unsigned int bufsize;
1033 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1035 LASSERT(pg_count > 0);
1037 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1038 if (IS_ERR(hdesc)) {
1039 CERROR("Unable to initialize checksum hash %s\n",
1040 cfs_crypto_hash_name(cfs_alg));
1041 return PTR_ERR(hdesc);
1044 while (nob > 0 && pg_count > 0) {
1045 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1047 /* corrupt the data before we compute the checksum, to
1048 * simulate an OST->client data error */
1049 if (i == 0 && opc == OST_READ &&
1050 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1051 unsigned char *ptr = kmap(pga[i]->pg);
1052 int off = pga[i]->off & ~CFS_PAGE_MASK;
1054 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1057 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1058 pga[i]->off & ~CFS_PAGE_MASK,
1060 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1061 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1063 nob -= pga[i]->count;
1068 bufsize = sizeof(cksum);
1069 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1071 /* For sending we only compute the wrong checksum instead
1072 * of corrupting the data so it is still correct on a redo */
1073 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1079 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1080 struct lov_stripe_md *lsm, u32 page_count,
1081 struct brw_page **pga,
1082 struct ptlrpc_request **reqp,
1083 struct obd_capa *ocapa, int reserve,
1086 struct ptlrpc_request *req;
1087 struct ptlrpc_bulk_desc *desc;
1088 struct ost_body *body;
1089 struct obd_ioobj *ioobj;
1090 struct niobuf_remote *niobuf;
1091 int niocount, i, requested_nob, opc, rc;
1092 struct osc_brw_async_args *aa;
1093 struct req_capsule *pill;
1094 struct brw_page *pg_prev;
1097 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1098 RETURN(-ENOMEM); /* Recoverable */
1099 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1100 RETURN(-EINVAL); /* Fatal */
1102 if ((cmd & OBD_BRW_WRITE) != 0) {
1104 req = ptlrpc_request_alloc_pool(cli->cl_import,
1105 cli->cl_import->imp_rq_pool,
1106 &RQF_OST_BRW_WRITE);
1109 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1114 for (niocount = i = 1; i < page_count; i++) {
1115 if (!can_merge_pages(pga[i - 1], pga[i]))
1119 pill = &req->rq_pill;
1120 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1122 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1123 niocount * sizeof(*niobuf));
1124 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1126 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1128 ptlrpc_request_free(req);
1131 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1132 ptlrpc_at_set_req_timeout(req);
1133 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1135 req->rq_no_retry_einprogress = 1;
1137 desc = ptlrpc_prep_bulk_imp(req, page_count,
1138 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1139 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1143 GOTO(out, rc = -ENOMEM);
1144 /* NB request now owns desc and will free it when it gets freed */
1146 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1147 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1148 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1149 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1151 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1153 obdo_to_ioobj(oa, ioobj);
1154 ioobj->ioo_bufcnt = niocount;
/* The high bits of ioo_max_brw tell the server the _maximum_ number of
 * bulks that might be sent for this request. The actual number is decided
 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
 * "max - 1" for compatibility with old clients that send "0", and also so
 * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1160 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1161 osc_pack_capa(req, body, ocapa);
1162 LASSERT(page_count > 0);
1164 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1165 struct brw_page *pg = pga[i];
1166 int poff = pg->off & ~CFS_PAGE_MASK;
1168 LASSERT(pg->count > 0);
1169 /* make sure there is no gap in the middle of page array */
1170 LASSERTF(page_count == 1 ||
1171 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1172 ergo(i > 0 && i < page_count - 1,
1173 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1174 ergo(i == page_count - 1, poff == 0)),
1175 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1176 i, page_count, pg, pg->off, pg->count);
1177 LASSERTF(i == 0 || pg->off > pg_prev->off,
1178 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1179 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1181 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1182 pg_prev->pg, page_private(pg_prev->pg),
1183 pg_prev->pg->index, pg_prev->off);
1184 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1185 (pg->flag & OBD_BRW_SRVLOCK));
1187 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1188 requested_nob += pg->count;
1190 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1192 niobuf->rnb_len += pg->count;
1194 niobuf->rnb_offset = pg->off;
1195 niobuf->rnb_len = pg->count;
1196 niobuf->rnb_flags = pg->flag;
1201 LASSERTF((void *)(niobuf - niocount) ==
1202 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1203 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1204 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1206 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1208 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1209 body->oa.o_valid |= OBD_MD_FLFLAGS;
1210 body->oa.o_flags = 0;
1212 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1215 if (osc_should_shrink_grant(cli))
1216 osc_shrink_grant_local(cli, &body->oa);
1218 /* size[REQ_REC_OFF] still sizeof (*body) */
1219 if (opc == OST_WRITE) {
1220 if (cli->cl_checksum &&
1221 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1222 /* store cl_cksum_type in a local variable since
1223 * it can be changed via lprocfs */
1224 cksum_type_t cksum_type = cli->cl_cksum_type;
1226 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1227 oa->o_flags &= OBD_FL_LOCAL_MASK;
1228 body->oa.o_flags = 0;
1230 body->oa.o_flags |= cksum_type_pack(cksum_type);
1231 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1232 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1236 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1238 /* save this in 'oa', too, for later checking */
1239 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1240 oa->o_flags |= cksum_type_pack(cksum_type);
1242 /* clear out the checksum flag, in case this is a
1243 * resend but cl_checksum is no longer set. b=11238 */
1244 oa->o_valid &= ~OBD_MD_FLCKSUM;
1246 oa->o_cksum = body->oa.o_cksum;
1247 /* 1 RC per niobuf */
1248 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1249 sizeof(__u32) * niocount);
1251 if (cli->cl_checksum &&
1252 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1253 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1254 body->oa.o_flags = 0;
1255 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1256 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1259 ptlrpc_request_set_replen(req);
1261 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1262 aa = ptlrpc_req_async_args(req);
1264 aa->aa_requested_nob = requested_nob;
1265 aa->aa_nio_count = niocount;
1266 aa->aa_page_count = page_count;
1270 INIT_LIST_HEAD(&aa->aa_oaps);
1271 if (ocapa && reserve)
1272 aa->aa_ocapa = capa_get(ocapa);
1275 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1276 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1277 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1278 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1282 ptlrpc_req_finished(req);
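/* Called when the server reports a write checksum that does not match what
 * the client sent: recompute the checksum over the same pages with the
 * checksum type the server actually used, to classify whether the pages
 * changed on the client (e.g. mmap IO), changed in transit, or the server
 * simply used a different checksum type. */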
1286 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1287 __u32 client_cksum, __u32 server_cksum, int nob,
1288 size_t page_count, struct brw_page **pga,
1289 cksum_type_t client_cksum_type)
1293 cksum_type_t cksum_type;
1295 if (server_cksum == client_cksum) {
1296 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1300 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1302 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1305 if (cksum_type != client_cksum_type)
1306 msg = "the server did not use the checksum type specified in "
1307 "the original request - likely a protocol problem";
1308 else if (new_cksum == server_cksum)
1309 msg = "changed on the client after we checksummed it - "
1310 "likely false positive due to mmap IO (bug 11742)";
1311 else if (new_cksum == client_cksum)
1312 msg = "changed in transit before arrival at OST";
1314 msg = "changed in transit AND doesn't match the original - "
1315 "likely false positive due to mmap IO (bug 11742)";
1317 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1318 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1319 msg, libcfs_nid2str(peer->nid),
1320 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1321 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1322 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1323 POSTID(&oa->o_oi), pga[0]->off,
1324 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1325 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1326 "client csum now %x\n", client_cksum, client_cksum_type,
1327 server_cksum, cksum_type, new_cksum);
1331 /* Note rc enters this function as number of bytes transferred */
1332 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1334 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1335 const lnet_process_id_t *peer =
1336 &req->rq_import->imp_connection->c_peer;
1337 struct client_obd *cli = aa->aa_cli;
1338 struct ost_body *body;
1339 u32 client_cksum = 0;
1342 if (rc < 0 && rc != -EDQUOT) {
1343 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1347 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1348 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1350 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1354 /* set/clear over quota flag for a uid/gid */
1355 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1356 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1357 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1359 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1360 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1362 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1365 osc_update_grant(cli, body);
1370 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1371 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1373 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1375 CERROR("Unexpected +ve rc %d\n", rc);
1378 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1380 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1383 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1384 check_write_checksum(&body->oa, peer, client_cksum,
1385 body->oa.o_cksum, aa->aa_requested_nob,
1386 aa->aa_page_count, aa->aa_ppga,
1387 cksum_type_unpack(aa->aa_oa->o_flags)))
1390 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1391 aa->aa_page_count, aa->aa_ppga);
1395 /* The rest of this function executes only for OST_READs */
1397 /* if unwrap_bulk failed, return -EAGAIN to retry */
1398 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1400 GOTO(out, rc = -EAGAIN);
1402 if (rc > aa->aa_requested_nob) {
1403 CERROR("Unexpected rc %d (%d requested)\n", rc,
1404 aa->aa_requested_nob);
1408 if (rc != req->rq_bulk->bd_nob_transferred) {
1409 CERROR ("Unexpected rc %d (%d transferred)\n",
1410 rc, req->rq_bulk->bd_nob_transferred);
1414 if (rc < aa->aa_requested_nob)
1415 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1417 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1418 static int cksum_counter;
1419 u32 server_cksum = body->oa.o_cksum;
1422 cksum_type_t cksum_type;
1424 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1425 body->oa.o_flags : 0);
1426 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1427 aa->aa_ppga, OST_READ,
1430 if (peer->nid != req->rq_bulk->bd_sender) {
1432 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1435 if (server_cksum != client_cksum) {
1436 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1437 "%s%s%s inode "DFID" object "DOSTID
1438 " extent ["LPU64"-"LPU64"]\n",
1439 req->rq_import->imp_obd->obd_name,
1440 libcfs_nid2str(peer->nid),
1442 body->oa.o_valid & OBD_MD_FLFID ?
1443 body->oa.o_parent_seq : (__u64)0,
1444 body->oa.o_valid & OBD_MD_FLFID ?
1445 body->oa.o_parent_oid : 0,
1446 body->oa.o_valid & OBD_MD_FLFID ?
1447 body->oa.o_parent_ver : 0,
1448 POSTID(&body->oa.o_oi),
1449 aa->aa_ppga[0]->off,
1450 aa->aa_ppga[aa->aa_page_count-1]->off +
1451 aa->aa_ppga[aa->aa_page_count-1]->count -
1453 CERROR("client %x, server %x, cksum_type %x\n",
1454 client_cksum, server_cksum, cksum_type);
1456 aa->aa_oa->o_cksum = client_cksum;
1460 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1463 } else if (unlikely(client_cksum)) {
1464 static int cksum_missed;
1467 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1468 CERROR("Checksum %u requested from %s but not sent\n",
1469 cksum_missed, libcfs_nid2str(peer->nid));
1475 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1476 aa->aa_oa, &body->oa);
1481 static int osc_brw_redo_request(struct ptlrpc_request *request,
1482 struct osc_brw_async_args *aa, int rc)
1484 struct ptlrpc_request *new_req;
1485 struct osc_brw_async_args *new_aa;
1486 struct osc_async_page *oap;
1489 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1490 "redo for recoverable error %d", rc);
1492 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1493 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1494 aa->aa_cli, aa->aa_oa,
1495 NULL /* lsm unused by osc currently */,
1496 aa->aa_page_count, aa->aa_ppga,
1497 &new_req, aa->aa_ocapa, 0, 1);
1501 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1502 if (oap->oap_request != NULL) {
1503 LASSERTF(request == oap->oap_request,
1504 "request %p != oap_request %p\n",
1505 request, oap->oap_request);
1506 if (oap->oap_interrupted) {
1507 ptlrpc_req_finished(new_req);
1512 /* New request takes over pga and oaps from old request.
1513 * Note that copying a list_head doesn't work, need to move it... */
1515 new_req->rq_interpret_reply = request->rq_interpret_reply;
1516 new_req->rq_async_args = request->rq_async_args;
1517 new_req->rq_commit_cb = request->rq_commit_cb;
1518 /* cap resend delay to the current request timeout, this is similar to
1519 * what ptlrpc does (see after_reply()) */
1520 if (aa->aa_resends > new_req->rq_timeout)
1521 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1523 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1524 new_req->rq_generation_set = 1;
1525 new_req->rq_import_generation = request->rq_import_generation;
1527 new_aa = ptlrpc_req_async_args(new_req);
1529 INIT_LIST_HEAD(&new_aa->aa_oaps);
1530 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1531 INIT_LIST_HEAD(&new_aa->aa_exts);
1532 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1533 new_aa->aa_resends = aa->aa_resends;
1535 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1536 if (oap->oap_request) {
1537 ptlrpc_req_finished(oap->oap_request);
1538 oap->oap_request = ptlrpc_request_addref(new_req);
1542 new_aa->aa_ocapa = aa->aa_ocapa;
1543 aa->aa_ocapa = NULL;
/* XXX: This code will run into problems if we ever support adding a
 * series of BRW RPCs into a self-defined ptlrpc_request_set and waiting
 * for all of them to finish. We should inherit the request set from the
 * old request. */
1549 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1551 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
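/* The stride sequence used below is 1, 4, 13, 40, 121, ...
 * (stride = stride * 3 + 1), grown until it reaches the array length and
 * then shrunk back down to 1. */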
1562 static void sort_brw_pages(struct brw_page **array, int num)
1565 struct brw_page *tmp;
1569 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1574 for (i = stride ; i < num ; i++) {
1577 while (j >= stride && array[j - stride]->off > tmp->off) {
1578 array[j] = array[j - stride];
1583 } while (stride > 1);
1586 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1588 LASSERT(ppga != NULL);
1589 OBD_FREE(ppga, sizeof(*ppga) * count);
1592 static int brw_interpret(const struct lu_env *env,
1593 struct ptlrpc_request *req, void *data, int rc)
1595 struct osc_brw_async_args *aa = data;
1596 struct osc_extent *ext;
1597 struct osc_extent *tmp;
1598 struct client_obd *cli = aa->aa_cli;
1601 rc = osc_brw_fini_request(req, rc);
1602 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
/* When the server returns -EINPROGRESS, the client should always retry
 * regardless of the number of times the bulk was resent already. */
1605 if (osc_recoverable_error(rc)) {
1606 if (req->rq_import_generation !=
1607 req->rq_import->imp_generation) {
1608 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1609 ""DOSTID", rc = %d.\n",
1610 req->rq_import->imp_obd->obd_name,
1611 POSTID(&aa->aa_oa->o_oi), rc);
1612 } else if (rc == -EINPROGRESS ||
1613 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1614 rc = osc_brw_redo_request(req, aa, rc);
1616 CERROR("%s: too many resent retries for object: "
1617 ""LPU64":"LPU64", rc = %d.\n",
1618 req->rq_import->imp_obd->obd_name,
1619 POSTID(&aa->aa_oa->o_oi), rc);
1624 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1629 capa_put(aa->aa_ocapa);
1630 aa->aa_ocapa = NULL;
1634 struct obdo *oa = aa->aa_oa;
1635 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1636 unsigned long valid = 0;
1637 struct cl_object *obj;
1638 struct osc_async_page *last;
1640 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1641 obj = osc2cl(last->oap_obj);
1643 cl_object_attr_lock(obj);
1644 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1645 attr->cat_blocks = oa->o_blocks;
1646 valid |= CAT_BLOCKS;
1648 if (oa->o_valid & OBD_MD_FLMTIME) {
1649 attr->cat_mtime = oa->o_mtime;
1652 if (oa->o_valid & OBD_MD_FLATIME) {
1653 attr->cat_atime = oa->o_atime;
1656 if (oa->o_valid & OBD_MD_FLCTIME) {
1657 attr->cat_ctime = oa->o_ctime;
1661 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1662 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1663 loff_t last_off = last->oap_count + last->oap_obj_off +
1666 /* Change file size if this is an out of quota or
1667 * direct IO write and it extends the file size */
1668 if (loi->loi_lvb.lvb_size < last_off) {
1669 attr->cat_size = last_off;
1672 /* Extend KMS if it's not a lockless write */
1673 if (loi->loi_kms < last_off &&
1674 oap2osc_page(last)->ops_srvlock == 0) {
1675 attr->cat_kms = last_off;
1681 cl_object_attr_update(env, obj, attr, valid);
1682 cl_object_attr_unlock(obj);
1684 OBDO_FREE(aa->aa_oa);
1686 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1687 osc_inc_unstable_pages(req);
1689 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1690 list_del_init(&ext->oe_link);
1691 osc_extent_finish(env, ext, 1, rc);
1693 LASSERT(list_empty(&aa->aa_exts));
1694 LASSERT(list_empty(&aa->aa_oaps));
1696 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1697 req->rq_bulk->bd_nob_transferred);
1698 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1699 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1701 spin_lock(&cli->cl_loi_list_lock);
1702 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1703 * is called so we know whether to go to sync BRWs or wait for more
1704 * RPCs to complete */
1705 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1706 cli->cl_w_in_flight--;
1708 cli->cl_r_in_flight--;
1709 osc_wake_cache_waiters(cli);
1710 spin_unlock(&cli->cl_loi_list_lock);
1712 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1716 static void brw_commit(struct ptlrpc_request *req)
/* If osc_inc_unstable_pages() (via osc_extent_finish()) races with
 * this callback (invoked via rq_commit_cb), we need to ensure
 * osc_dec_unstable_pages() is still called. Otherwise unstable
 * pages may be leaked. */
1722 spin_lock(&req->rq_lock);
1723 if (likely(req->rq_unstable)) {
1724 req->rq_unstable = 0;
1725 spin_unlock(&req->rq_lock);
1727 osc_dec_unstable_pages(req);
1729 req->rq_committed = 1;
1730 spin_unlock(&req->rq_lock);
1735 * Build an RPC by the list of extent @ext_list. The caller must ensure
1736 * that the total pages in this list are NOT over max pages per RPC.
1737 * Extents in the list must be in OES_RPC state.
1739 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1740 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1742 struct ptlrpc_request *req = NULL;
1743 struct osc_extent *ext;
1744 struct brw_page **pga = NULL;
1745 struct osc_brw_async_args *aa = NULL;
1746 struct obdo *oa = NULL;
1747 struct osc_async_page *oap;
1748 struct osc_async_page *tmp;
1749 struct cl_req *clerq = NULL;
1750 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1752 struct cl_req_attr *crattr = NULL;
1753 loff_t starting_offset = OBD_OBJECT_EOF;
1754 loff_t ending_offset = 0;
1758 bool soft_sync = false;
1761 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1762 struct ost_body *body;
1764 LASSERT(!list_empty(ext_list));
1766 /* add pages into rpc_list to build BRW rpc */
1767 list_for_each_entry(ext, ext_list, oe_link) {
1768 LASSERT(ext->oe_state == OES_RPC);
1769 mem_tight |= ext->oe_memalloc;
1770 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1772 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1773 if (starting_offset == OBD_OBJECT_EOF ||
1774 starting_offset > oap->oap_obj_off)
1775 starting_offset = oap->oap_obj_off;
1777 LASSERT(oap->oap_page_off == 0);
1778 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1779 ending_offset = oap->oap_obj_off +
1782 LASSERT(oap->oap_page_off + oap->oap_count ==
1787 soft_sync = osc_over_unstable_soft_limit(cli);
1789 mpflag = cfs_memory_pressure_get_and_set();
1791 OBD_ALLOC(crattr, sizeof(*crattr));
1793 GOTO(out, rc = -ENOMEM);
1795 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1797 GOTO(out, rc = -ENOMEM);
1801 GOTO(out, rc = -ENOMEM);
1804 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1805 struct cl_page *page = oap2cl_page(oap);
1806 if (clerq == NULL) {
1807 clerq = cl_req_alloc(env, page, crt,
1808 1 /* only 1-object rpcs for now */);
1810 GOTO(out, rc = PTR_ERR(clerq));
1813 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1815 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1816 pga[i] = &oap->oap_brw_page;
1817 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1818 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1819 pga[i]->pg, page_index(oap->oap_page), oap,
1822 cl_req_page_add(env, clerq, page);
1825 /* always get the data for the obdo for the rpc */
1826 LASSERT(clerq != NULL);
1827 crattr->cra_oa = oa;
1828 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1830 rc = cl_req_prep(env, clerq);
1832 CERROR("cl_req_prep failed: %d\n", rc);
1836 sort_brw_pages(pga, page_count);
1837 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1838 pga, &req, crattr->cra_capa, 1, 0);
1840 CERROR("prep_req failed: %d\n", rc);
1844 req->rq_commit_cb = brw_commit;
1845 req->rq_interpret_reply = brw_interpret;
1848 req->rq_memalloc = 1;
/* Need to update the timestamps after the request is built in case
 * we race with setattr (locally or in queue at the OST). If the OST gets
 * the later setattr before the earlier BRW (as determined by the request
 * xid), the OST will not use the BRW timestamps. Sadly, there is no obvious
 * way to do this in a single call. bug 10150 */
1855 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1856 crattr->cra_oa = &body->oa;
1857 cl_req_attr_set(env, clerq, crattr,
1858 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1860 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1862 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1863 aa = ptlrpc_req_async_args(req);
1864 INIT_LIST_HEAD(&aa->aa_oaps);
1865 list_splice_init(&rpc_list, &aa->aa_oaps);
1866 INIT_LIST_HEAD(&aa->aa_exts);
1867 list_splice_init(ext_list, &aa->aa_exts);
1868 aa->aa_clerq = clerq;
1870 /* queued sync pages can be torn down while the pages
1871 * were between the pending list and the rpc */
1873 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1874 /* only one oap gets a request reference */
1877 if (oap->oap_interrupted && !req->rq_intr) {
1878 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1880 ptlrpc_mark_interrupted(req);
1884 tmp->oap_request = ptlrpc_request_addref(req);
1886 spin_lock(&cli->cl_loi_list_lock);
1887 starting_offset >>= PAGE_CACHE_SHIFT;
1888 if (cmd == OBD_BRW_READ) {
1889 cli->cl_r_in_flight++;
1890 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1891 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1892 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1893 starting_offset + 1);
1895 cli->cl_w_in_flight++;
1896 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1897 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1898 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1899 starting_offset + 1);
1901 spin_unlock(&cli->cl_loi_list_lock);
1903 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1904 page_count, aa, cli->cl_r_in_flight,
1905 cli->cl_w_in_flight);
1907 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1908 * see which CPU/NUMA node the majority of pages were allocated
1909 * on, and try to assign the async RPC to the CPU core
1910 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1912 * But on the other hand, we expect that multiple ptlrpcd
1913 * threads and the initial write sponsor can run in parallel,
1914 * especially when data checksum is enabled, which is CPU-bound
1915 * operation and single ptlrpcd thread cannot process in time.
1916 * So more ptlrpcd threads sharing BRW load
1917 * (with PDL_POLICY_ROUND) seems better.
1919 ptlrpcd_add_req(req, pol, -1);
1925 cfs_memory_pressure_restore(mpflag);
1927 if (crattr != NULL) {
1928 capa_put(crattr->cra_capa);
1929 OBD_FREE(crattr, sizeof(*crattr));
1933 LASSERT(req == NULL);
1938 OBD_FREE(pga, sizeof(*pga) * page_count);
1939 /* this should happen rarely and is pretty bad, it makes the
1940 * pending list not follow the dirty order */
1941 while (!list_empty(ext_list)) {
1942 ext = list_entry(ext_list->next, struct osc_extent,
1944 list_del_init(&ext->oe_link);
1945 osc_extent_finish(env, ext, 0, rc);
1947 if (clerq && !IS_ERR(clerq))
1948 cl_req_completion(env, clerq, rc);
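/* Attach cl_object data to a granted DLM lock, but only if the lock either
 * has no l_ast_data yet or already points at the same object; the LASSERTs
 * check that the lock was set up with the callbacks from this enqueue info. */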
1953 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1954 struct ldlm_enqueue_info *einfo)
1956 void *data = einfo->ei_cbdata;
1959 LASSERT(lock != NULL);
1960 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1961 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1962 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1963 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1965 lock_res_and_lock(lock);
1967 if (lock->l_ast_data == NULL)
1968 lock->l_ast_data = data;
1969 if (lock->l_ast_data == data)
1972 unlock_res_and_lock(lock);
1977 static int osc_set_data_with_check(struct lustre_handle *lockh,
1978 struct ldlm_enqueue_info *einfo)
1980 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1984 set = osc_set_lock_data_with_check(lock, einfo);
1985 LDLM_LOCK_PUT(lock);
1987 CERROR("lockh %p, data %p - client evicted?\n",
1988 lockh, einfo->ei_cbdata);
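/* Common tail of an OSC lock enqueue: for an aborted intent, pick up the
 * status the server stored in the lock policy result; on success mark the
 * LVB ready; then run the caller's upcall and drop the reference taken by
 * ldlm_cli_enqueue(). */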
1992 static int osc_enqueue_fini(struct ptlrpc_request *req,
1993 osc_enqueue_upcall_f upcall, void *cookie,
1994 struct lustre_handle *lockh, ldlm_mode_t mode,
1995 __u64 *flags, int agl, int errcode)
1997 bool intent = *flags & LDLM_FL_HAS_INTENT;
2001 /* The request was created before ldlm_cli_enqueue call. */
2002 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2003 struct ldlm_reply *rep;
2005 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2006 LASSERT(rep != NULL);
2008 rep->lock_policy_res1 =
2009 ptlrpc_status_ntoh(rep->lock_policy_res1);
2010 if (rep->lock_policy_res1)
2011 errcode = rep->lock_policy_res1;
2013 *flags |= LDLM_FL_LVB_READY;
2014 } else if (errcode == ELDLM_OK) {
2015 *flags |= LDLM_FL_LVB_READY;
2018 /* Call the update callback. */
2019 rc = (*upcall)(cookie, lockh, errcode);
2021 /* release the reference taken in ldlm_cli_enqueue() */
2022 if (errcode == ELDLM_LOCK_MATCHED)
2024 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2025 ldlm_lock_decref(lockh, mode);
2030 static int osc_enqueue_interpret(const struct lu_env *env,
2031 struct ptlrpc_request *req,
2032 struct osc_enqueue_args *aa, int rc)
2034 struct ldlm_lock *lock;
2035 struct lustre_handle *lockh = &aa->oa_lockh;
2036 ldlm_mode_t mode = aa->oa_mode;
2037 struct ost_lvb *lvb = aa->oa_lvb;
2038 __u32 lvb_len = sizeof(*lvb);
/* ldlm_cli_enqueue() is holding a reference on the lock, so it must
 * be valid. */
2045 lock = ldlm_handle2lock(lockh);
2046 LASSERTF(lock != NULL,
2047 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2048 lockh->cookie, req, aa);
2050 /* Take an additional reference so that a blocking AST that
2051 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2052 * to arrive after an upcall has been executed by
2053 * osc_enqueue_fini(). */
2054 ldlm_lock_addref(lockh, mode);
2056 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2057 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2059 /* Let CP AST to grant the lock first. */
2060 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2063 LASSERT(aa->oa_lvb == NULL);
2064 LASSERT(aa->oa_flags == NULL);
2065 aa->oa_flags = &flags;
2068 /* Complete obtaining the lock procedure. */
2069 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2070 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2072 /* Complete osc stuff. */
2073 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2074 aa->oa_flags, aa->oa_agl, rc);
2076 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2078 ldlm_lock_decref(lockh, mode);
2079 LDLM_LOCK_PUT(lock);
2083 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests; however, keeping some locks and trying to obtain
 * others may take a considerable amount of time in the case of an OST failure,
 * and when other sync requests do not get the released lock from a client, the
 * client is evicted from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
2092 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2093 __u64 *flags, ldlm_policy_data_t *policy,
2094 struct ost_lvb *lvb, int kms_valid,
2095 osc_enqueue_upcall_f upcall, void *cookie,
2096 struct ldlm_enqueue_info *einfo,
2097 struct ptlrpc_request_set *rqset, int async, int agl)
2099 struct obd_device *obd = exp->exp_obd;
2100 struct lustre_handle lockh = { 0 };
2101 struct ptlrpc_request *req = NULL;
2102 int intent = *flags & LDLM_FL_HAS_INTENT;
2103 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2108 /* Filesystem lock extents are extended to page boundaries so that
2109 * dealing with the page cache is a little smoother. */
2110 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2111 policy->l_extent.end |= ~CFS_PAGE_MASK;
2114 * kms is not valid when either the object is completely fresh (so that no
2115 * locks are cached), or the object was evicted. In the latter case a cached
2116 * lock cannot be used, because it would prime the inode state with a
2117 * potentially stale LVB.
2122 /* Next, search for already existing extent locks that will cover us */
2123 /* If we're trying to read, we also search for an existing PW lock. The
2124 * VFS and page cache already protect us locally, so lots of readers/
2125 * writers can share a single PW lock.
2127 * There are problems with conversion deadlocks, so instead of
2128 * converting a read lock to a write lock, we'll just enqueue a new one.
2131 * At some point we should cancel the read lock instead of making them
2132 * send us a blocking callback, but there are problems with canceling
2133 * locks out from other users right now, too. */
2134 mode = einfo->ei_mode;
2135 if (einfo->ei_mode == LCK_PR)
2137 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2138 einfo->ei_type, policy, mode, &lockh, 0);
2140 struct ldlm_lock *matched;
2142 if (*flags & LDLM_FL_TEST_LOCK)
2145 matched = ldlm_handle2lock(&lockh);
2147 /* AGL enqueues DLM locks speculatively. Therefore if
2148 * a DLM lock already exists, it will just inform the
2149 * caller to cancel the AGL process for this stripe. */
2150 ldlm_lock_decref(&lockh, mode);
2151 LDLM_LOCK_PUT(matched);
2153 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2154 *flags |= LDLM_FL_LVB_READY;
2156 /* We already have a lock, and it's referenced. */
2157 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2159 ldlm_lock_decref(&lockh, mode);
2160 LDLM_LOCK_PUT(matched);
2163 ldlm_lock_decref(&lockh, mode);
2164 LDLM_LOCK_PUT(matched);
2169 if (*flags & LDLM_FL_TEST_LOCK)
2173 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2174 &RQF_LDLM_ENQUEUE_LVB);
2178 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2180 ptlrpc_request_free(req);
2184 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2186 ptlrpc_request_set_replen(req);
2189 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2190 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2192 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2193 sizeof(*lvb), LVB_T_OST, &lockh, async);
2196 struct osc_enqueue_args *aa;
2197 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2198 aa = ptlrpc_req_async_args(req);
2200 aa->oa_mode = einfo->ei_mode;
2201 aa->oa_type = einfo->ei_type;
2202 lustre_handle_copy(&aa->oa_lockh, &lockh);
2203 aa->oa_upcall = upcall;
2204 aa->oa_cookie = cookie;
2207 aa->oa_flags = flags;
2210 /* AGL essentially enqueues a DLM lock
2211 * in advance, so we don't care about the
2212 * result of the AGL enqueue. */
2214 aa->oa_flags = NULL;
2217 req->rq_interpret_reply =
2218 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2219 if (rqset == PTLRPCD_SET)
2220 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2222 ptlrpc_set_add_req(rqset, req);
2223 } else if (intent) {
2224 ptlrpc_req_finished(req);
2229 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2232 ptlrpc_req_finished(req);
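/* Look for an already granted DLM extent lock that covers the given extent.
 * On a match the lock's l_ast_data is checked and set via
 * osc_set_data_with_check(); if that fails, the reference is dropped and no
 * match is returned. */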
2237 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2238 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2239 __u64 *flags, void *data, struct lustre_handle *lockh,
2242 struct obd_device *obd = exp->exp_obd;
2243 __u64 lflags = *flags;
2247 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2250 /* Filesystem lock extents are extended to page boundaries so that
2251 * dealing with the page cache is a little smoother */
2252 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2253 policy->l_extent.end |= ~CFS_PAGE_MASK;
2255 /* Next, search for already existing extent locks that will cover us */
2256 /* If we're trying to read, we also search for an existing PW lock. The
2257 * VFS and page cache already protect us locally, so lots of readers/
2258 * writers can share a single PW lock. */
2262 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2263 res_id, type, policy, rc, lockh, unref);
2266 if (!osc_set_data_with_check(lockh, data)) {
2267 if (!(lflags & LDLM_FL_TEST_LOCK))
2268 ldlm_lock_decref(lockh, rc);
2272 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2273 ldlm_lock_addref(lockh, LCK_PR);
2274 ldlm_lock_decref(lockh, LCK_PW);
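/* Release one lock reference: group locks are cancelled on release, all other
 * modes are simply decref'd. */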
2281 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2285 if (unlikely(mode == LCK_GROUP))
2286 ldlm_lock_decref_and_cancel(lockh, mode);
2288 ldlm_lock_decref(lockh, mode);
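/* Reply handler for an asynchronous OST_STATFS request: copy the returned
 * obd_statfs into the caller's buffer and invoke the oi_cb_up() callback. */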
2293 static int osc_statfs_interpret(const struct lu_env *env,
2294 struct ptlrpc_request *req,
2295 struct osc_async_args *aa, int rc)
2297 struct obd_statfs *msfs;
2301 /* The request has in fact never been sent
2302 * due to issues at a higher level (LOV).
2303 * Exit immediately since the caller is
2304 * aware of the problem and takes care
2305 * of the cleanup. */
2308 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2309 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2315 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2317 GOTO(out, rc = -EPROTO);
2320 *aa->aa_oi->oi_osfs = *msfs;
2322 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
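/* Send an OST_STATFS request asynchronously; the reply is consumed by
 * osc_statfs_interpret() above, which copies the statfs data and calls the
 * caller's oi_cb_up() callback. */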
2326 static int osc_statfs_async(struct obd_export *exp,
2327 struct obd_info *oinfo, __u64 max_age,
2328 struct ptlrpc_request_set *rqset)
2330 struct obd_device *obd = class_exp2obd(exp);
2331 struct ptlrpc_request *req;
2332 struct osc_async_args *aa;
2336 /* We could possibly pass max_age in the request (as an absolute
2337 * timestamp or a "seconds.usec ago") so the target can avoid doing
2338 * extra calls into the filesystem if that isn't necessary (e.g.
2339 * during mount, that would help a bit). Having relative timestamps
2340 * is not so great if request processing is slow, while absolute
2341 * timestamps are not ideal because they need time synchronization. */
2342 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2346 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2348 ptlrpc_request_free(req);
2351 ptlrpc_request_set_replen(req);
2352 req->rq_request_portal = OST_CREATE_PORTAL;
2353 ptlrpc_at_set_req_timeout(req);
2355 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2356 /* procfs requests should not wait on statfs, to avoid a deadlock */
2357 req->rq_no_resend = 1;
2358 req->rq_no_delay = 1;
2361 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2362 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2363 aa = ptlrpc_req_async_args(req);
2366 ptlrpc_set_add_req(rqset, req);
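/* Synchronous statfs: grab a reference on the import under cl_sem, send an
 * OST_STATFS request and wait for the reply before returning the server's
 * obd_statfs to the caller. */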
2370 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2371 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2373 struct obd_device *obd = class_exp2obd(exp);
2374 struct obd_statfs *msfs;
2375 struct ptlrpc_request *req;
2376 struct obd_import *imp = NULL;
2380 /* Since the request might also come from lprocfs, we need to
2381 * sync this with client_disconnect_export() (Bug 15684). */
2382 down_read(&obd->u.cli.cl_sem);
2383 if (obd->u.cli.cl_import)
2384 imp = class_import_get(obd->u.cli.cl_import);
2385 up_read(&obd->u.cli.cl_sem);
2389 /* We could possibly pass max_age in the request (as an absolute
2390 * timestamp or a "seconds.usec ago") so the target can avoid doing
2391 * extra calls into the filesystem if that isn't necessary (e.g.
2392 * during mount, that would help a bit). Having relative timestamps
2393 * is not so great if request processing is slow, while absolute
2394 * timestamps are not ideal because they need time synchronization. */
2395 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2397 class_import_put(imp);
2402 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2404 ptlrpc_request_free(req);
2407 ptlrpc_request_set_replen(req);
2408 req->rq_request_portal = OST_CREATE_PORTAL;
2409 ptlrpc_at_set_req_timeout(req);
2411 if (flags & OBD_STATFS_NODELAY) {
2412 /* procfs requests should not wait on statfs, to avoid a deadlock */
2413 req->rq_no_resend = 1;
2414 req->rq_no_delay = 1;
2417 rc = ptlrpc_queue_wait(req);
2421 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2423 GOTO(out, rc = -EPROTO);
2430 ptlrpc_req_finished(req);
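/* ioctl dispatcher for the OSC device: handles client recovery, import
 * activation, quota check polling and target ping; any other command
 * returns -ENOTTY. */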
2434 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2435 void *karg, void *uarg)
2437 struct obd_device *obd = exp->exp_obd;
2438 struct obd_ioctl_data *data = karg;
2442 if (!try_module_get(THIS_MODULE)) {
2443 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2444 module_name(THIS_MODULE));
2448 case OBD_IOC_CLIENT_RECOVER:
2449 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2450 data->ioc_inlbuf1, 0);
2454 case IOC_OSC_SET_ACTIVE:
2455 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2458 case OBD_IOC_POLL_QUOTACHECK:
2459 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2461 case OBD_IOC_PING_TARGET:
2462 err = ptlrpc_obd_ping(obd);
2465 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2466 cmd, current_comm());
2467 GOTO(out, err = -ENOTTY);
2470 module_put(THIS_MODULE);
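/* Handle obd_set_info_async() keys. Checksum, sptlrpc and cache keys are
 * handled locally; any other key (notably grant shrink) is packed into an
 * OST_SET_INFO request and sent to the OST. */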
2474 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2475 u32 keylen, void *key,
2476 u32 vallen, void *val,
2477 struct ptlrpc_request_set *set)
2479 struct ptlrpc_request *req;
2480 struct obd_device *obd = exp->exp_obd;
2481 struct obd_import *imp = class_exp2cliimp(exp);
2486 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2488 if (KEY_IS(KEY_CHECKSUM)) {
2489 if (vallen != sizeof(int))
2491 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2495 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2496 sptlrpc_conf_client_adapt(obd);
2500 if (KEY_IS(KEY_FLUSH_CTX)) {
2501 sptlrpc_import_flush_my_ctx(imp);
2505 if (KEY_IS(KEY_CACHE_SET)) {
2506 struct client_obd *cli = &obd->u.cli;
2508 LASSERT(cli->cl_cache == NULL); /* only once */
2509 cli->cl_cache = (struct cl_client_cache *)val;
2510 cl_cache_incref(cli->cl_cache);
2511 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2513 /* add this osc into entity list */
2514 LASSERT(list_empty(&cli->cl_lru_osc));
2515 spin_lock(&cli->cl_cache->ccc_lru_lock);
2516 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2517 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2522 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2523 struct client_obd *cli = &obd->u.cli;
2524 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2525 long target = *(long *)val;
2527 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2532 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2535 /* We pass all other commands directly to OST. Since nobody calls osc
2536 methods directly and everybody is supposed to go through LOV, we
2537 assume lov checked invalid values for us.
2538 The only recognised values so far are evict_by_nid and mds_conn.
2539 Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
2542 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2543 &RQF_OST_SET_GRANT_INFO :
2548 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2549 RCL_CLIENT, keylen);
2550 if (!KEY_IS(KEY_GRANT_SHRINK))
2551 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2552 RCL_CLIENT, vallen);
2553 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2555 ptlrpc_request_free(req);
2559 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2560 memcpy(tmp, key, keylen);
2561 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2564 memcpy(tmp, val, vallen);
2566 if (KEY_IS(KEY_GRANT_SHRINK)) {
2567 struct osc_grant_args *aa;
2570 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2571 aa = ptlrpc_req_async_args(req);
2574 ptlrpc_req_finished(req);
2577 *oa = ((struct ost_body *)val)->oa;
2579 req->rq_interpret_reply = osc_shrink_grant_interpret;
2582 ptlrpc_request_set_replen(req);
2583 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2584 LASSERT(set != NULL);
2585 ptlrpc_set_add_req(set, req);
2586 ptlrpc_check_set(NULL, set);
2588 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
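/* On reconnect, ask the server for enough grant to cover the currently
 * available grant plus dirty pages, and report any grant lost while
 * disconnected. */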
2593 static int osc_reconnect(const struct lu_env *env,
2594 struct obd_export *exp, struct obd_device *obd,
2595 struct obd_uuid *cluuid,
2596 struct obd_connect_data *data,
2599 struct client_obd *cli = &obd->u.cli;
2601 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2604 spin_lock(&cli->cl_loi_list_lock);
2605 data->ocd_grant = (cli->cl_avail_grant +
2606 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2607 2 * cli_brw_size(obd);
2608 lost_grant = cli->cl_lost_grant;
2609 cli->cl_lost_grant = 0;
2610 spin_unlock(&cli->cl_loi_list_lock);
2612 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2613 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2614 data->ocd_version, data->ocd_grant, lost_grant);
2620 static int osc_disconnect(struct obd_export *exp)
2622 struct obd_device *obd = class_exp2obd(exp);
2625 rc = client_disconnect_export(exp);
2627 * Initially we put del_shrink_grant before disconnect_export, but it
2628 * causes the following problem if setup (connect) and cleanup
2629 * (disconnect) are tangled together.
2630 * connect p1 disconnect p2
2631 * ptlrpc_connect_import
2632 * ............... class_manual_cleanup
2635 * ptlrpc_connect_interrupt
2637 * add this client to shrink list
2639 Bang! The pinger triggers the shrink.
2640 So the osc should be disconnected from the shrink list only after we
2641 are sure the import has been destroyed (BUG 18662).
2643 if (obd->u.cli.cl_import == NULL)
2644 osc_del_shrink_grant(&obd->u.cli);
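/* React to import state changes: drop grant on disconnect, flush cached I/O
 * and clean up the namespace on invalidation, re-initialize grant and the
 * request portal from the connect data, and forward notifications to the
 * observer. */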
2648 static int osc_import_event(struct obd_device *obd,
2649 struct obd_import *imp,
2650 enum obd_import_event event)
2652 struct client_obd *cli;
2656 LASSERT(imp->imp_obd == obd);
2659 case IMP_EVENT_DISCON: {
2661 spin_lock(&cli->cl_loi_list_lock);
2662 cli->cl_avail_grant = 0;
2663 cli->cl_lost_grant = 0;
2664 spin_unlock(&cli->cl_loi_list_lock);
2667 case IMP_EVENT_INACTIVE: {
2668 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2671 case IMP_EVENT_INVALIDATE: {
2672 struct ldlm_namespace *ns = obd->obd_namespace;
2676 env = cl_env_get(&refcheck);
2680 /* all pages go to failing RPCs due to the invalid import */
2682 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2684 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2685 cl_env_put(env, &refcheck);
2690 case IMP_EVENT_ACTIVE: {
2691 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2694 case IMP_EVENT_OCD: {
2695 struct obd_connect_data *ocd = &imp->imp_connect_data;
2697 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2698 osc_init_grant(&obd->u.cli, ocd);
2701 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2702 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2704 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2707 case IMP_EVENT_DEACTIVATE: {
2708 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2711 case IMP_EVENT_ACTIVATE: {
2712 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2716 CERROR("Unknown import event %d\n", event);
2723 * Determine whether the lock can be canceled before replaying the lock
2724 * during recovery; see bug 16774 for detailed information.
2726 * \retval zero the lock can't be canceled
2727 * \retval other ok to cancel
2729 static int osc_cancel_weight(struct ldlm_lock *lock)
2732 * Cancel all unused, granted extent locks.
2734 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2735 lock->l_granted_mode == lock->l_req_mode &&
2736 osc_ldlm_weigh_ast(lock) == 0)
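/* ptlrpcd work callback: flush any pending cached I/O for this client. */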
2742 static int brw_queue_work(const struct lu_env *env, void *data)
2744 struct client_obd *cli = data;
2746 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2748 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
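/* Set up the OSC device: generic client setup, writeback and LRU ptlrpcd work
 * items, quota setup, procfs registration, a request pool for BRW RPCs, and
 * registration of the lock cancel weight callback. */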
2752 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2754 struct client_obd *cli = &obd->u.cli;
2755 struct obd_type *type;
2760 rc = ptlrpcd_addref();
2764 rc = client_obd_setup(obd, lcfg);
2766 GOTO(out_ptlrpcd, rc);
2768 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2769 if (IS_ERR(handler))
2770 GOTO(out_client_setup, rc = PTR_ERR(handler));
2771 cli->cl_writeback_work = handler;
2773 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2774 if (IS_ERR(handler))
2775 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2776 cli->cl_lru_work = handler;
2778 rc = osc_quota_setup(obd);
2780 GOTO(out_ptlrpcd_work, rc);
2782 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2784 #ifdef CONFIG_PROC_FS
2785 obd->obd_vars = lprocfs_osc_obd_vars;
2787 /* If this is true then both client (osc) and server (osp) are on the
2788 * same node. The osp layer, if loaded first, will register the osc proc
2789 * directory. In that case this obd_device will attach its proc
2790 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2791 type = class_search_type(LUSTRE_OSP_NAME);
2792 if (type && type->typ_procsym) {
2793 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2795 obd->obd_vars, obd);
2796 if (IS_ERR(obd->obd_proc_entry)) {
2797 rc = PTR_ERR(obd->obd_proc_entry);
2798 CERROR("error %d setting up lprocfs for %s\n", rc,
2800 obd->obd_proc_entry = NULL;
2803 rc = lprocfs_obd_setup(obd);
2806 /* If the basic OSC proc tree construction succeeded then
2807 * let's do the rest. */
2809 lproc_osc_attach_seqstat(obd);
2810 sptlrpc_lprocfs_cliobd_attach(obd);
2811 ptlrpc_lprocfs_register_obd(obd);
2814 /* We need to allocate a few more requests, because
2815 * brw_interpret tries to create new requests before freeing
2816 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
2817 * reserved, but that might waste too much RAM,
2818 * so 2 extra is just a guess that should still work. */
2819 cli->cl_import->imp_rq_pool =
2820 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2822 ptlrpc_add_rqs_to_pool);
2824 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2825 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2829 if (cli->cl_writeback_work != NULL) {
2830 ptlrpcd_destroy_work(cli->cl_writeback_work);
2831 cli->cl_writeback_work = NULL;
2833 if (cli->cl_lru_work != NULL) {
2834 ptlrpcd_destroy_work(cli->cl_lru_work);
2835 cli->cl_lru_work = NULL;
2838 client_obd_cleanup(obd);
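/* Pre-cleanup: at the early stage deactivate the import and stop pinging it;
 * at the export stage destroy the ptlrpcd work items and tear down the import
 * and procfs entries. */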
2844 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2850 case OBD_CLEANUP_EARLY: {
2851 struct obd_import *imp;
2852 imp = obd->u.cli.cl_import;
2853 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2854 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2855 ptlrpc_deactivate_import(imp);
2856 spin_lock(&imp->imp_lock);
2857 imp->imp_pingable = 0;
2858 spin_unlock(&imp->imp_lock);
2861 case OBD_CLEANUP_EXPORTS: {
2862 struct client_obd *cli = &obd->u.cli;
2864 * for echo client, export may be on zombie list, wait for
2865 * zombie thread to cull it, because cli.cl_import will be
2866 * cleared in client_disconnect_export():
2867 * class_export_destroy() -> obd_cleanup() ->
2868 * echo_device_free() -> echo_client_cleanup() ->
2869 * obd_disconnect() -> osc_disconnect() ->
2870 * client_disconnect_export()
2872 obd_zombie_barrier();
2873 if (cli->cl_writeback_work) {
2874 ptlrpcd_destroy_work(cli->cl_writeback_work);
2875 cli->cl_writeback_work = NULL;
2877 if (cli->cl_lru_work) {
2878 ptlrpcd_destroy_work(cli->cl_lru_work);
2879 cli->cl_lru_work = NULL;
2881 obd_cleanup_client_import(obd);
2882 ptlrpc_lprocfs_unregister_obd(obd);
2883 lprocfs_obd_cleanup(obd);
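/* Final cleanup: detach from the shared client cache, release the quota
 * cache and finish generic client teardown. */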
2890 int osc_cleanup(struct obd_device *obd)
2892 struct client_obd *cli = &obd->u.cli;
2898 if (cli->cl_cache != NULL) {
2899 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2900 spin_lock(&cli->cl_cache->ccc_lru_lock);
2901 list_del_init(&cli->cl_lru_osc);
2902 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2903 cli->cl_lru_left = NULL;
2904 cl_cache_decref(cli->cl_cache);
2905 cli->cl_cache = NULL;
2908 /* free memory of osc quota cache */
2909 osc_quota_cleanup(obd);
2911 rc = client_obd_cleanup(obd);
2917 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2919 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2920 return rc > 0 ? 0 : rc;
2923 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2925 return osc_process_config_base(obd, buf);
2928 static struct obd_ops osc_obd_ops = {
2929 .o_owner = THIS_MODULE,
2930 .o_setup = osc_setup,
2931 .o_precleanup = osc_precleanup,
2932 .o_cleanup = osc_cleanup,
2933 .o_add_conn = client_import_add_conn,
2934 .o_del_conn = client_import_del_conn,
2935 .o_connect = client_connect_import,
2936 .o_reconnect = osc_reconnect,
2937 .o_disconnect = osc_disconnect,
2938 .o_statfs = osc_statfs,
2939 .o_statfs_async = osc_statfs_async,
2940 .o_create = osc_create,
2941 .o_destroy = osc_destroy,
2942 .o_getattr = osc_getattr,
2943 .o_setattr = osc_setattr,
2944 .o_setattr_async = osc_setattr_async,
2945 .o_iocontrol = osc_iocontrol,
2946 .o_set_info_async = osc_set_info_async,
2947 .o_import_event = osc_import_event,
2948 .o_process_config = osc_process_config,
2949 .o_quotactl = osc_quotactl,
2950 .o_quotacheck = osc_quotacheck,
2953 static int __init osc_init(void)
2955 bool enable_proc = true;
2956 struct obd_type *type;
2960 /* print an address of _any_ initialized kernel symbol from this
2961 * module, to allow debugging with gdb that doesn't support data
2962 * symbols from modules. */
2963 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2965 rc = lu_kmem_init(osc_caches);
2969 type = class_search_type(LUSTRE_OSP_NAME);
2970 if (type != NULL && type->typ_procsym != NULL)
2971 enable_proc = false;
2973 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2974 LUSTRE_OSC_NAME, &osc_device_type);
2976 lu_kmem_fini(osc_caches);
2983 static void /*__exit*/ osc_exit(void)
2985 class_unregister_type(LUSTRE_OSC_NAME);
2986 lu_kmem_fini(osc_caches);
2989 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2990 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2991 MODULE_LICENSE("GPL");
2993 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);