4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lustre/lustre_user.h>
39 #include <lprocfs_status.h>
40 #include <lustre_debug.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <lustre_ha.h>
44 #include <uapi/linux/lustre_ioctl.h>
45 #include <lustre_net.h>
46 #include <lustre_obdo.h>
47 #include <uapi/linux/lustre_param.h>
49 #include <obd_cksum.h>
50 #include <obd_class.h>
52 #include "osc_cl_internal.h"
53 #include "osc_internal.h"
/* Shared pre-allocated request pool for OSC write RPCs; the counters
 * track pool usage/limits. NOTE(review): consumers are elsewhere in this
 * file (BRW write request allocation). */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
63 struct osc_brw_async_args {
69 struct brw_page **aa_ppga;
70 struct client_obd *aa_cli;
71 struct list_head aa_oaps;
72 struct list_head aa_exts;
75 #define osc_grant_args osc_brw_async_args
77 struct osc_setattr_args {
79 obd_enqueue_update_f sa_upcall;
83 struct osc_fsync_args {
84 struct osc_object *fa_obj;
86 obd_enqueue_update_f fa_upcall;
90 struct osc_ladvise_args {
92 obd_enqueue_update_f la_upcall;
96 struct osc_enqueue_args {
97 struct obd_export *oa_exp;
98 enum ldlm_type oa_type;
99 enum ldlm_mode oa_mode;
101 osc_enqueue_upcall_f oa_upcall;
103 struct ost_lvb *oa_lvb;
104 struct lustre_handle oa_lockh;
105 unsigned int oa_agl:1;
108 static void osc_release_ppga(struct brw_page **ppga, size_t count);
109 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
112 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
114 struct ost_body *body;
116 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
119 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
122 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
125 struct ptlrpc_request *req;
126 struct ost_body *body;
130 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
134 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
136 ptlrpc_request_free(req);
140 osc_pack_req_body(req, oa);
142 ptlrpc_request_set_replen(req);
144 rc = ptlrpc_queue_wait(req);
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 GOTO(out, rc = -EPROTO);
152 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
153 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
155 oa->o_blksize = cli_brw_size(exp->exp_obd);
156 oa->o_valid |= OBD_MD_FLBLKSZ;
160 ptlrpc_req_finished(req);
165 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
168 struct ptlrpc_request *req;
169 struct ost_body *body;
173 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
179 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
181 ptlrpc_request_free(req);
185 osc_pack_req_body(req, oa);
187 ptlrpc_request_set_replen(req);
189 rc = ptlrpc_queue_wait(req);
193 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
195 GOTO(out, rc = -EPROTO);
197 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
201 ptlrpc_req_finished(req);
206 static int osc_setattr_interpret(const struct lu_env *env,
207 struct ptlrpc_request *req,
208 struct osc_setattr_args *sa, int rc)
210 struct ost_body *body;
216 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
218 GOTO(out, rc = -EPROTO);
220 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
223 rc = sa->sa_upcall(sa->sa_cookie, rc);
227 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
228 obd_enqueue_update_f upcall, void *cookie,
229 struct ptlrpc_request_set *rqset)
231 struct ptlrpc_request *req;
232 struct osc_setattr_args *sa;
237 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
241 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
243 ptlrpc_request_free(req);
247 osc_pack_req_body(req, oa);
249 ptlrpc_request_set_replen(req);
251 /* do mds to ost setattr asynchronously */
253 /* Do not wait for response. */
254 ptlrpcd_add_req(req);
256 req->rq_interpret_reply =
257 (ptlrpc_interpterer_t)osc_setattr_interpret;
259 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
260 sa = ptlrpc_req_async_args(req);
262 sa->sa_upcall = upcall;
263 sa->sa_cookie = cookie;
265 if (rqset == PTLRPCD_SET)
266 ptlrpcd_add_req(req);
268 ptlrpc_set_add_req(rqset, req);
274 static int osc_ladvise_interpret(const struct lu_env *env,
275 struct ptlrpc_request *req,
278 struct osc_ladvise_args *la = arg;
279 struct ost_body *body;
285 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
287 GOTO(out, rc = -EPROTO);
289 *la->la_oa = body->oa;
291 rc = la->la_upcall(la->la_cookie, rc);
296 * If rqset is NULL, do not wait for response. Upcall and cookie could also
297 * be NULL in this case
299 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
300 struct ladvise_hdr *ladvise_hdr,
301 obd_enqueue_update_f upcall, void *cookie,
302 struct ptlrpc_request_set *rqset)
304 struct ptlrpc_request *req;
305 struct ost_body *body;
306 struct osc_ladvise_args *la;
308 struct lu_ladvise *req_ladvise;
309 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
310 int num_advise = ladvise_hdr->lah_count;
311 struct ladvise_hdr *req_ladvise_hdr;
314 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
318 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
319 num_advise * sizeof(*ladvise));
320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
322 ptlrpc_request_free(req);
325 req->rq_request_portal = OST_IO_PORTAL;
326 ptlrpc_at_set_req_timeout(req);
328 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
330 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
333 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
334 &RMF_OST_LADVISE_HDR);
335 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
337 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
338 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
339 ptlrpc_request_set_replen(req);
342 /* Do not wait for response. */
343 ptlrpcd_add_req(req);
347 req->rq_interpret_reply = osc_ladvise_interpret;
348 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
349 la = ptlrpc_req_async_args(req);
351 la->la_upcall = upcall;
352 la->la_cookie = cookie;
354 if (rqset == PTLRPCD_SET)
355 ptlrpcd_add_req(req);
357 ptlrpc_set_add_req(rqset, req);
362 static int osc_create(const struct lu_env *env, struct obd_export *exp,
365 struct ptlrpc_request *req;
366 struct ost_body *body;
371 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
372 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
376 GOTO(out, rc = -ENOMEM);
378 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
380 ptlrpc_request_free(req);
384 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
387 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
389 ptlrpc_request_set_replen(req);
391 rc = ptlrpc_queue_wait(req);
395 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
397 GOTO(out_req, rc = -EPROTO);
399 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
400 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
402 oa->o_blksize = cli_brw_size(exp->exp_obd);
403 oa->o_valid |= OBD_MD_FLBLKSZ;
405 CDEBUG(D_HA, "transno: %lld\n",
406 lustre_msg_get_transno(req->rq_repmsg));
408 ptlrpc_req_finished(req);
413 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
414 obd_enqueue_update_f upcall, void *cookie,
415 struct ptlrpc_request_set *rqset)
417 struct ptlrpc_request *req;
418 struct osc_setattr_args *sa;
419 struct ost_body *body;
423 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
427 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
429 ptlrpc_request_free(req);
432 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
433 ptlrpc_at_set_req_timeout(req);
435 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
437 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
439 ptlrpc_request_set_replen(req);
441 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
442 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
443 sa = ptlrpc_req_async_args(req);
445 sa->sa_upcall = upcall;
446 sa->sa_cookie = cookie;
447 if (rqset == PTLRPCD_SET)
448 ptlrpcd_add_req(req);
450 ptlrpc_set_add_req(rqset, req);
455 static int osc_sync_interpret(const struct lu_env *env,
456 struct ptlrpc_request *req,
459 struct osc_fsync_args *fa = arg;
460 struct ost_body *body;
461 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
462 unsigned long valid = 0;
463 struct cl_object *obj;
469 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
471 CERROR("can't unpack ost_body\n");
472 GOTO(out, rc = -EPROTO);
475 *fa->fa_oa = body->oa;
476 obj = osc2cl(fa->fa_obj);
478 /* Update osc object's blocks attribute */
479 cl_object_attr_lock(obj);
480 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
481 attr->cat_blocks = body->oa.o_blocks;
486 cl_object_attr_update(env, obj, attr, valid);
487 cl_object_attr_unlock(obj);
490 rc = fa->fa_upcall(fa->fa_cookie, rc);
494 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
495 obd_enqueue_update_f upcall, void *cookie,
496 struct ptlrpc_request_set *rqset)
498 struct obd_export *exp = osc_export(obj);
499 struct ptlrpc_request *req;
500 struct ost_body *body;
501 struct osc_fsync_args *fa;
505 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
509 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
511 ptlrpc_request_free(req);
515 /* overload the size and blocks fields in the oa with start/end */
516 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
518 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
520 ptlrpc_request_set_replen(req);
521 req->rq_interpret_reply = osc_sync_interpret;
523 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
524 fa = ptlrpc_req_async_args(req);
527 fa->fa_upcall = upcall;
528 fa->fa_cookie = cookie;
530 if (rqset == PTLRPCD_SET)
531 ptlrpcd_add_req(req);
533 ptlrpc_set_add_req(rqset, req);
538 /* Find and cancel locally locks matched by @mode in the resource found by
539 * @objid. Found locks are added into @cancel list. Returns the amount of
540 * locks added to @cancels list. */
541 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
542 struct list_head *cancels,
543 enum ldlm_mode mode, __u64 lock_flags)
545 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
546 struct ldlm_res_id res_id;
547 struct ldlm_resource *res;
551 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
552 * export) but disabled through procfs (flag in NS).
554 * This distinguishes from a case when ELC is not supported originally,
555 * when we still want to cancel locks in advance and just cancel them
556 * locally, without sending any RPC. */
557 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
560 ostid_build_res_name(&oa->o_oi, &res_id);
561 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
565 LDLM_RESOURCE_ADDREF(res);
566 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
567 lock_flags, 0, NULL);
568 LDLM_RESOURCE_DELREF(res);
569 ldlm_resource_putref(res);
573 static int osc_destroy_interpret(const struct lu_env *env,
574 struct ptlrpc_request *req, void *data,
577 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
579 atomic_dec(&cli->cl_destroy_in_flight);
580 wake_up(&cli->cl_destroy_waitq);
584 static int osc_can_send_destroy(struct client_obd *cli)
586 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
587 cli->cl_max_rpcs_in_flight) {
588 /* The destroy request can be sent */
591 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
592 cli->cl_max_rpcs_in_flight) {
594 * The counter has been modified between the two atomic
597 wake_up(&cli->cl_destroy_waitq);
602 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
605 struct client_obd *cli = &exp->exp_obd->u.cli;
606 struct ptlrpc_request *req;
607 struct ost_body *body;
608 struct list_head cancels = LIST_HEAD_INIT(cancels);
613 CDEBUG(D_INFO, "oa NULL\n");
617 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
618 LDLM_FL_DISCARD_DATA);
620 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
622 ldlm_lock_list_put(&cancels, l_bl_ast, count);
626 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
629 ptlrpc_request_free(req);
633 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
634 ptlrpc_at_set_req_timeout(req);
636 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
638 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
640 ptlrpc_request_set_replen(req);
642 req->rq_interpret_reply = osc_destroy_interpret;
643 if (!osc_can_send_destroy(cli)) {
644 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
647 * Wait until the number of on-going destroy RPCs drops
648 * under max_rpc_in_flight
650 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
651 osc_can_send_destroy(cli), &lwi);
653 ptlrpc_req_finished(req);
658 /* Do not wait for response */
659 ptlrpcd_add_req(req);
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
666 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
668 LASSERT(!(oa->o_valid & bits));
671 spin_lock(&cli->cl_loi_list_lock);
672 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673 oa->o_dirty = cli->cl_dirty_grant;
675 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
676 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677 cli->cl_dirty_max_pages)) {
678 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679 cli->cl_dirty_pages, cli->cl_dirty_transit,
680 cli->cl_dirty_max_pages);
682 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683 atomic_long_read(&obd_dirty_transit_pages) >
684 (long)(obd_max_dirty_pages + 1))) {
685 /* The atomic_read() allowing the atomic_inc() are
686 * not covered by a lock thus they may safely race and trip
687 * this CERROR() unless we add in a small fudge factor (+1). */
688 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689 cli_name(cli), atomic_long_read(&obd_dirty_pages),
690 atomic_long_read(&obd_dirty_transit_pages),
691 obd_max_dirty_pages);
693 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
695 CERROR("dirty %lu - dirty_max %lu too big???\n",
696 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
699 unsigned long nrpages;
701 nrpages = cli->cl_max_pages_per_rpc;
702 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704 oa->o_undirty = nrpages << PAGE_SHIFT;
705 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
709 /* take extent tax into account when asking for more
711 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
712 cli->cl_max_extent_pages;
713 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
716 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717 oa->o_dropped = cli->cl_lost_grant;
718 cli->cl_lost_grant = 0;
719 spin_unlock(&cli->cl_loi_list_lock);
720 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
721 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
724 void osc_update_next_shrink(struct client_obd *cli)
726 cli->cl_next_shrink_grant =
727 cfs_time_shift(cli->cl_grant_shrink_interval);
728 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729 cli->cl_next_shrink_grant);
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
734 spin_lock(&cli->cl_loi_list_lock);
735 cli->cl_avail_grant += grant;
736 spin_unlock(&cli->cl_loi_list_lock);
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
741 if (body->oa.o_valid & OBD_MD_FLGRANT) {
742 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
743 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration; the implementation appears later in this file. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key,
			      u32 vallen, void *val,
			      struct ptlrpc_request_set *set);
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753 struct ptlrpc_request *req,
756 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758 struct ost_body *body;
761 __osc_update_grant(cli, oa->o_grant);
765 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
767 osc_update_grant(cli, body);
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
775 spin_lock(&cli->cl_loi_list_lock);
776 oa->o_grant = cli->cl_avail_grant / 4;
777 cli->cl_avail_grant -= oa->o_grant;
778 spin_unlock(&cli->cl_loi_list_lock);
779 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780 oa->o_valid |= OBD_MD_FLFLAGS;
783 oa->o_flags |= OBD_FL_SHRINK_GRANT;
784 osc_update_next_shrink(cli);
787 /* Shrink the current grant, either from some large amount to enough for a
788 * full set of in-flight RPCs, or if we have already shrunk to that limit
789 * then to enough for a single RPC. This avoids keeping more grant than
790 * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
793 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
796 spin_lock(&cli->cl_loi_list_lock);
797 if (cli->cl_avail_grant <= target_bytes)
798 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
799 spin_unlock(&cli->cl_loi_list_lock);
801 return osc_shrink_grant_to_target(cli, target_bytes);
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
807 struct ost_body *body;
810 spin_lock(&cli->cl_loi_list_lock);
811 /* Don't shrink if we are already above or below the desired limit
812 * We don't want to shrink below a single RPC, as that will negatively
813 * impact block allocation and long-term performance. */
814 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
815 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
817 if (target_bytes >= cli->cl_avail_grant) {
818 spin_unlock(&cli->cl_loi_list_lock);
821 spin_unlock(&cli->cl_loi_list_lock);
827 osc_announce_cached(cli, &body->oa, 0);
829 spin_lock(&cli->cl_loi_list_lock);
830 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831 cli->cl_avail_grant = target_bytes;
832 spin_unlock(&cli->cl_loi_list_lock);
833 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834 body->oa.o_valid |= OBD_MD_FLFLAGS;
835 body->oa.o_flags = 0;
837 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838 osc_update_next_shrink(cli);
840 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842 sizeof(*body), body, NULL);
844 __osc_update_grant(cli, body->oa.o_grant);
849 static int osc_should_shrink_grant(struct client_obd *client)
851 cfs_time_t time = cfs_time_current();
852 cfs_time_t next_shrink = client->cl_next_shrink_grant;
854 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855 OBD_CONNECT_GRANT_SHRINK) == 0)
858 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859 /* Get the current RPC size directly, instead of going via:
860 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861 * Keep comment here so that it can be found by searching. */
862 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
864 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865 client->cl_avail_grant > brw_size)
868 osc_update_next_shrink(client);
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
875 struct client_obd *client;
877 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878 if (osc_should_shrink_grant(client))
879 osc_shrink_grant(client);
884 static int osc_add_shrink_grant(struct client_obd *client)
888 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
890 osc_grant_shrink_grant_cb, NULL,
891 &client->cl_grant_shrink_list);
893 CERROR("add grant client %s error %d\n", cli_name(client), rc);
896 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897 osc_update_next_shrink(client);
901 static int osc_del_shrink_grant(struct client_obd *client)
903 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
910 * ocd_grant is the total grant amount we're expect to hold: if we've
911 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
915 * race is tolerable here: if we're evicted, but imp_state already
916 * left EVICTED state, then cl_dirty_pages must be 0 already.
918 spin_lock(&cli->cl_loi_list_lock);
919 cli->cl_avail_grant = ocd->ocd_grant;
920 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921 cli->cl_avail_grant -= cli->cl_reserved_grant;
922 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923 cli->cl_avail_grant -= cli->cl_dirty_grant;
925 cli->cl_avail_grant -=
926 cli->cl_dirty_pages << PAGE_SHIFT;
929 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
933 /* overhead for each extent insertion */
934 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
935 /* determine the appropriate chunk size used by osc_extent. */
936 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
937 ocd->ocd_grant_blkbits);
938 /* max_pages_per_rpc must be chunk aligned */
939 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
940 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
941 ~chunk_mask) & chunk_mask;
942 /* determine maximum extent size, in #pages */
943 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
944 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
945 if (cli->cl_max_extent_pages == 0)
946 cli->cl_max_extent_pages = 1;
948 cli->cl_grant_extent_tax = 0;
949 cli->cl_chunkbits = PAGE_SHIFT;
950 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
952 spin_unlock(&cli->cl_loi_list_lock);
954 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
955 "chunk bits: %d cl_max_extent_pages: %d\n",
957 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
958 cli->cl_max_extent_pages);
960 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
961 list_empty(&cli->cl_grant_shrink_list))
962 osc_add_shrink_grant(cli);
965 /* We assume that the reason this OSC got a short read is because it read
966 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967 * via the LOV, and it _knows_ it's reading inside the file, it's just that
968 * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, size_t page_count,
970 struct brw_page **pga)
975 /* skip bytes read OK */
976 while (nob_read > 0) {
977 LASSERT (page_count > 0);
979 if (pga[i]->count > nob_read) {
980 /* EOF inside this page */
981 ptr = kmap(pga[i]->pg) +
982 (pga[i]->off & ~PAGE_MASK);
983 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
990 nob_read -= pga[i]->count;
995 /* zero remaining pages */
996 while (page_count-- > 0) {
997 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
998 memset(ptr, 0, pga[i]->count);
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005 int requested_nob, int niocount,
1006 size_t page_count, struct brw_page **pga)
1011 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012 sizeof(*remote_rcs) *
1014 if (remote_rcs == NULL) {
1015 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1019 /* return error if any niobuf was in error */
1020 for (i = 0; i < niocount; i++) {
1021 if ((int)remote_rcs[i] < 0)
1022 return(remote_rcs[i]);
1024 if (remote_rcs[i] != 0) {
1025 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026 i, remote_rcs[i], req);
1031 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033 req->rq_bulk->bd_nob_transferred, requested_nob);
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1042 if (p1->flag != p2->flag) {
1043 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1045 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1047 /* warn if we try to combine flags that we don't know to be
1048 * safe to combine */
1049 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051 "report this at https://jira.hpdd.intel.com/\n",
1052 p1->flag, p2->flag);
1057 return (p1->off + p1->count == p2->off);
1060 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1061 struct brw_page **pga, int opc,
1062 cksum_type_t cksum_type)
1066 struct cfs_crypto_hash_desc *hdesc;
1067 unsigned int bufsize;
1068 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1070 LASSERT(pg_count > 0);
1072 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1073 if (IS_ERR(hdesc)) {
1074 CERROR("Unable to initialize checksum hash %s\n",
1075 cfs_crypto_hash_name(cfs_alg));
1076 return PTR_ERR(hdesc);
1079 while (nob > 0 && pg_count > 0) {
1080 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1082 /* corrupt the data before we compute the checksum, to
1083 * simulate an OST->client data error */
1084 if (i == 0 && opc == OST_READ &&
1085 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1086 unsigned char *ptr = kmap(pga[i]->pg);
1087 int off = pga[i]->off & ~PAGE_MASK;
1089 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1092 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1093 pga[i]->off & ~PAGE_MASK,
1095 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1096 (int)(pga[i]->off & ~PAGE_MASK));
1098 nob -= pga[i]->count;
1103 bufsize = sizeof(cksum);
1104 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1106 /* For sending we only compute the wrong checksum instead
1107 * of corrupting the data so it is still correct on a redo */
1108 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Prepare an OST_READ/OST_WRITE BRW RPC covering @page_count pages in @pga;
 * on success *reqp holds the prepared (unsent) request.
 * NOTE(review): this excerpt is missing interior lines (return-type line,
 * error checks, braces, some call arguments); comments below annotate only
 * the code that is visible — do not treat this text as the full function.
 */
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, int resend)
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	/* fault-injection points for exercising BRW failure paths */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		/* writes allocate from the shared request pool, reads don't */
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						&RQF_OST_BRW_WRITE);
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);

	/* one remote niobuf per contiguous run of mergeable pages */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
		ptlrpc_request_free(req);
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
			PTLRPC_BULK_PUT_SINK) |
			PTLRPC_BULK_BUF_KIOV,
		&ptlrpc_bulk_kiov_pin_ops);
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request. The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so the
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	LASSERT(page_count > 0);

	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		/* merge into the previous niobuf or start a new one */
		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf->rnb_len += pg->count;
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len = pg->count;
			niobuf->rnb_flags = pg->flag;

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
	/* Client cksum has been already copied to wire obdo in previous
	 * lustre_set_wire_obdo(), and in the case a bulk-read is being
	 * resent due to cksum error, this will allow Server to
	 * check+dump pages on its side */
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	INIT_LIST_HEAD(&aa->aa_oaps);

	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
	       req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
	       niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);

	ptlrpc_req_finished(req);
/* Scratch path buffer used by dump_all_bulk_pages() when writing pages that
 * failed a bulk checksum to a debug file.
 * NOTE(review): file-scope and non-static, so two concurrent checksum
 * failures could race on this buffer; the O_EXCL open below makes the dump
 * best-effort anyway — confirm this is acceptable. */
1322 char dbgcksum_file_name[PATH_MAX];
/* Dump the raw data of every page of a bulk transfer that failed its
 * checksum into a file under the libcfs debug path, so the corruption can
 * be inspected offline.  The file name encodes FID, byte range and both
 * checksums; O_EXCL ensures only the first failure for a given range is
 * kept (resends/retries of the same range find the file already exists).
 * Best-effort: all errors are logged and otherwise ignored. */
1324 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1325 struct brw_page **pga, __u32 server_cksum,
/* Build the dump file name: "<debug_path>-checksum_dump-osc-<FID>:[start-end]-<ccksum>-<scksum>".
 * Parent FID fields are only valid when OBD_MD_FLFID is set; fall back to 0. */
1334 /* will only keep dump of pages on first error for the same range in
1335 * file/fid, not during the resends/retries. */
1336 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1337 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1338 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1339 libcfs_debug_file_path_arr :
1340 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1341 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1342 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1343 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1345 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1346 client_cksum, server_cksum);
/* O_EXCL: fail (harmlessly) if a dump for this exact range already exists. */
1347 filp = filp_open(dbgcksum_file_name,
1348 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
/* "already exists" is the expected quiet case; any other open error is
 * reported loudly since it means we lost the diagnostic data. */
1352 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1353 "checksum error: rc = %d\n", dbgcksum_file_name,
1356 CERROR("%s: can't open to dump pages with checksum "
1357 "error: rc = %d\n", dbgcksum_file_name, rc);
/* Write each page's payload; kmap() is needed because bulk pages may be
 * highmem.  Partial writes are retried by the (elided) inner loop —
 * NOTE(review): loop body partially elided here, verify in full source. */
1363 for (i = 0; i < page_count; i++) {
1364 len = pga[i]->count;
1365 buf = kmap(pga[i]->pg);
1367 rc = vfs_write(filp, (__force const char __user *)buf,
1370 CERROR("%s: wanted to write %u but got %d "
1371 "error\n", dbgcksum_file_name, len, rc);
1376 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1377 dbgcksum_file_name, rc);
/* Force the dump to stable storage before closing so it survives a crash
 * that may follow the corruption being investigated. */
1383 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1385 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1386 filp_close(filp, NULL);
/* Verify the server's reported checksum for a bulk WRITE against the
 * checksum the client computed when the request was built.  On mismatch,
 * recompute the checksum over the (still pinned) pages to classify where
 * the corruption happened, optionally dump the pages, and log a console
 * error.  Return value (elided here) tells the caller whether to resend —
 * NOTE(review): tail of function not visible, confirm return semantics. */
1391 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1392 __u32 client_cksum, __u32 server_cksum,
1393 struct osc_brw_async_args *aa)
1397 cksum_type_t cksum_type;
/* Fast path: checksums agree, nothing to do. */
1399 if (server_cksum == client_cksum) {
1400 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Optionally persist the offending pages for offline analysis. */
1404 if (aa->aa_cli->cl_checksum_dump)
1405 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1406 server_cksum, client_cksum);
/* Recompute with the checksum type the *server* used (from the reply
 * obdo), so we can distinguish a type mismatch from real corruption. */
1408 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1410 new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1411 aa->aa_ppga, OST_WRITE, cksum_type);
/* Classify the failure: wrong algorithm, client-side change after
 * checksumming (mmap race), corruption in transit, or both. */
1413 if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1414 msg = "the server did not use the checksum type specified in "
1415 "the original request - likely a protocol problem";
1416 else if (new_cksum == server_cksum)
1417 msg = "changed on the client after we checksummed it - "
1418 "likely false positive due to mmap IO (bug 11742)";
1419 else if (new_cksum == client_cksum)
1420 msg = "changed in transit before arrival at OST";
1422 msg = "changed in transit AND doesn't match the original - "
1423 "likely false positive due to mmap IO (bug 11742)";
1425 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1426 DFID " object "DOSTID" extent [%llu-%llu], original "
1427 "client csum %x (type %x), server csum %x (type %x),"
1428 " client csum now %x\n",
1429 aa->aa_cli->cl_import->imp_obd->obd_name,
1430 msg, libcfs_nid2str(peer->nid),
1431 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1432 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1433 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1434 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1435 aa->aa_ppga[aa->aa_page_count - 1]->off +
1436 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1437 client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1438 server_cksum, cksum_type, new_cksum);
/* Post-process a completed BRW RPC: unpack the reply body, update quota
 * flags and grant, verify bulk checksums (writes via check_write_checksum,
 * reads by recomputing locally), handle short reads, and finally merge the
 * reply obdo back into the request's cached obdo.
 *
 * \param req  the completed BRW request (read or write)
 * \param rc   bytes transferred on entry (>= 0), or a negative errno
 * \retval     0 or negative errno; -EAGAIN requests a bulk resend */
1442 /* Note rc enters this function as number of bytes transferred */
1443 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1445 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1446 const struct lnet_process_id *peer =
1447 &req->rq_import->imp_connection->c_peer;
1448 struct client_obd *cli = aa->aa_cli;
1449 struct ost_body *body;
1450 u32 client_cksum = 0;
/* -EDQUOT still carries a reply body we need for quota state below. */
1453 if (rc < 0 && rc != -EDQUOT) {
1454 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1458 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1459 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1461 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1465 /* set/clear over quota flag for a uid/gid/projid */
1466 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1467 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1468 unsigned qid[LL_MAXQUOTAS] = {
1469 body->oa.o_uid, body->oa.o_gid,
1470 body->oa.o_projid };
1471 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1472 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1473 body->oa.o_valid, body->oa.o_flags);
1474 osc_quota_setdq(cli, qid, body->oa.o_valid,
/* Refresh the client's grant accounting from the server's reply. */
1478 osc_update_grant(cli, body);
/* Remember the checksum we sent, before the reply obdo overwrites it. */
1483 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1484 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* --- WRITE path: rc must be 0 here (bytes are accounted per-niobuf). */
1486 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1488 CERROR("Unexpected +ve rc %d\n", rc);
1491 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1493 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1496 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1497 check_write_checksum(&body->oa, peer, client_cksum,
1498 body->oa.o_cksum, aa))
/* Collect the per-niobuf return codes from the reply. */
1501 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1502 aa->aa_page_count, aa->aa_ppga);
1506 /* The rest of this function executes only for OST_READs */
1508 /* if unwrap_bulk failed, return -EAGAIN to retry */
1509 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1511 GOTO(out, rc = -EAGAIN);
/* Sanity: server can never return more than we asked for, and the byte
 * count must match what the bulk layer says it moved. */
1513 if (rc > aa->aa_requested_nob) {
1514 CERROR("Unexpected rc %d (%d requested)\n", rc,
1515 aa->aa_requested_nob);
1519 if (rc != req->rq_bulk->bd_nob_transferred) {
1520 CERROR ("Unexpected rc %d (%d transferred)\n",
1521 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail pages beyond EOF. */
1525 if (rc < aa->aa_requested_nob)
1526 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Server sent a read checksum: recompute locally and compare. */
1528 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1529 static int cksum_counter;
1530 u32 server_cksum = body->oa.o_cksum;
1533 cksum_type_t cksum_type;
1535 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1536 body->oa.o_flags : 0);
1537 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1538 aa->aa_ppga, OST_READ,
/* If the bulk came through an LNet router, name it in the error —
 * the router is a possible corruption point. */
1541 if (peer->nid != req->rq_bulk->bd_sender) {
1543 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1546 if (server_cksum != client_cksum) {
1547 struct ost_body *clbody;
1548 u32 page_count = aa->aa_page_count;
1550 clbody = req_capsule_client_get(&req->rq_pill,
1552 if (cli->cl_checksum_dump)
1553 dump_all_bulk_pages(&clbody->oa, page_count,
1554 aa->aa_ppga, server_cksum,
1557 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1558 "%s%s%s inode "DFID" object "DOSTID
1559 " extent [%llu-%llu], client %x, "
1560 "server %x, cksum_type %x\n",
1561 req->rq_import->imp_obd->obd_name,
1562 libcfs_nid2str(peer->nid),
1564 clbody->oa.o_valid & OBD_MD_FLFID ?
1565 clbody->oa.o_parent_seq : 0ULL,
1566 clbody->oa.o_valid & OBD_MD_FLFID ?
1567 clbody->oa.o_parent_oid : 0,
1568 clbody->oa.o_valid & OBD_MD_FLFID ?
1569 clbody->oa.o_parent_ver : 0,
1570 POSTID(&body->oa.o_oi),
1571 aa->aa_ppga[0]->off,
1572 aa->aa_ppga[page_count-1]->off +
1573 aa->aa_ppga[page_count-1]->count - 1,
1574 client_cksum, server_cksum,
1577 aa->aa_oa->o_cksum = client_cksum;
1581 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one: log it, but
 * rate-limited via the power-of-two counter trick below. */
1584 } else if (unlikely(client_cksum)) {
1585 static int cksum_missed;
1588 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1589 CERROR("Checksum %u requested from %s but not sent\n",
1590 cksum_missed, libcfs_nid2str(peer->nid));
/* Merge the server's reply obdo back into our cached copy. */
1596 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1597 aa->aa_oa, &body->oa);
/* Rebuild and resubmit a BRW RPC after a recoverable error (e.g.
 * -EINPROGRESS).  A brand-new request is prepared from the old request's
 * async args; the pga/oaps/exts lists and callbacks are transferred to the
 * new request, which then replaces the old one as the oaps' oap_request.
 *
 * \retval 0 on success (new request queued via ptlrpcd), negative errno
 *         if the new request could not be prepared or was interrupted. */
1602 static int osc_brw_redo_request(struct ptlrpc_request *request,
1603 struct osc_brw_async_args *aa, int rc)
1605 struct ptlrpc_request *new_req;
1606 struct osc_brw_async_args *new_aa;
1607 struct osc_async_page *oap;
/* -EINPROGRESS is an expected, quiet retry; anything else is an error. */
1610 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1611 "redo for recoverable error %d", rc);
1613 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1614 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1615 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1616 aa->aa_ppga, &new_req, 1);
/* Abort the redo if any page's owner was interrupted meanwhile. */
1620 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1621 if (oap->oap_request != NULL) {
1622 LASSERTF(request == oap->oap_request,
1623 "request %p != oap_request %p\n",
1624 request, oap->oap_request);
1625 if (oap->oap_interrupted) {
1626 ptlrpc_req_finished(new_req);
1631 /* New request takes over pga and oaps from old request.
1632 * Note that copying a list_head doesn't work, need to move it... */
1634 new_req->rq_interpret_reply = request->rq_interpret_reply;
1635 new_req->rq_async_args = request->rq_async_args;
1636 new_req->rq_commit_cb = request->rq_commit_cb;
1637 /* cap resend delay to the current request timeout, this is similar to
1638 * what ptlrpc does (see after_reply()) */
1639 if (aa->aa_resends > new_req->rq_timeout)
1640 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1642 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
/* Pin the import generation so eviction detection keeps working. */
1643 new_req->rq_generation_set = 1;
1644 new_req->rq_import_generation = request->rq_import_generation;
1646 new_aa = ptlrpc_req_async_args(new_req);
/* list_splice_init moves the lists; a plain struct copy of list_heads
 * (done by the rq_async_args assignment above) would leave dangling
 * pointers, hence the re-init + splice. */
1648 INIT_LIST_HEAD(&new_aa->aa_oaps);
1649 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1650 INIT_LIST_HEAD(&new_aa->aa_exts);
1651 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1652 new_aa->aa_resends = aa->aa_resends;
/* Repoint every page at the new request (dropping the old ref). */
1654 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1655 if (oap->oap_request) {
1656 ptlrpc_req_finished(oap->oap_request);
1657 oap->oap_request = ptlrpc_request_addref(new_req);
1661 /* XXX: This code will run into problem if we're going to support
1662 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1663 * and wait for all of them to be finished. We should inherit request
1664 * set from old request. */
1665 ptlrpcd_add_req(new_req);
1667 DEBUG_REQ(D_INFO, new_req, "new request");
1672 * ugh, we want disk allocation on the target to happen in offset order. we'll
1673 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1674 * fine for our small page arrays and doesn't require allocation. its an
1675 * insertion sort that swaps elements that are strides apart, shrinking the
1676 * stride down until its '1' and the array is sorted.
/* Sort @array (of @num brw_page pointers) ascending by ->off using
 * Shellsort with the 3h+1 (Knuth/Sedgewick) gap sequence.  In-place,
 * allocation-free; O(n^1.5)-ish, fine for per-RPC page counts. */
1678 static void sort_brw_pages(struct brw_page **array, int num)
1681 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num, then (in the
 * elided outer do-loop) shrink it back toward 1. */
1685 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort pass for the current stride. */
1690 for (i = stride ; i < num ; i++) {
1693 while (j >= stride && array[j - stride]->off > tmp->off) {
1694 array[j] = array[j - stride];
1699 } while (stride > 1);
/* Free a brw_page pointer array of @count entries previously allocated by
 * osc_build_rpc().  Frees only the array itself, not the pages it points
 * to (those belong to the osc_async_pages). */
1702 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1704 LASSERT(ppga != NULL);
1705 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Reply interpreter for BRW RPCs (set as rq_interpret_reply by
 * osc_build_rpc).  Finishes the request, resends on recoverable errors,
 * then on the final completion updates the cl_object attributes (size,
 * kms, times, blocks) from the reply obdo, finishes all extents, releases
 * the page array, and decrements the in-flight RPC counters. */
1708 static int brw_interpret(const struct lu_env *env,
1709 struct ptlrpc_request *req, void *data, int rc)
1711 struct osc_brw_async_args *aa = data;
1712 struct osc_extent *ext;
1713 struct osc_extent *tmp;
1714 struct client_obd *cli = aa->aa_cli;
1717 rc = osc_brw_fini_request(req, rc);
1718 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1719 /* When server return -EINPROGRESS, client should always retry
1720 * regardless of the number of times the bulk was resent already. */
1721 if (osc_recoverable_error(rc)) {
/* Cross-eviction: import generation changed, do not redo here —
 * recovery will replay instead. */
1722 if (req->rq_import_generation !=
1723 req->rq_import->imp_generation) {
1724 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1725 ""DOSTID", rc = %d.\n",
1726 req->rq_import->imp_obd->obd_name,
1727 POSTID(&aa->aa_oa->o_oi), rc);
1728 } else if (rc == -EINPROGRESS ||
1729 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1730 rc = osc_brw_redo_request(req, aa, rc);
1732 CERROR("%s: too many resent retries for object: "
1733 "%llu:%llu, rc = %d.\n",
1734 req->rq_import->imp_obd->obd_name,
1735 POSTID(&aa->aa_oa->o_oi), rc);
1740 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Final completion: fold the reply obdo's attributes into the
 * cl_object under the attr lock. */
1745 struct obdo *oa = aa->aa_oa;
1746 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1747 unsigned long valid = 0;
1748 struct cl_object *obj;
1749 struct osc_async_page *last;
1751 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1752 obj = osc2cl(last->oap_obj);
1754 cl_object_attr_lock(obj);
1755 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1756 attr->cat_blocks = oa->o_blocks;
1757 valid |= CAT_BLOCKS;
1759 if (oa->o_valid & OBD_MD_FLMTIME) {
1760 attr->cat_mtime = oa->o_mtime;
1763 if (oa->o_valid & OBD_MD_FLATIME) {
1764 attr->cat_atime = oa->o_atime;
1767 if (oa->o_valid & OBD_MD_FLCTIME) {
1768 attr->cat_ctime = oa->o_ctime;
/* A write that ran past the known size (quota/direct IO) extends
 * the cached size and, for lock-protected writes, the KMS. */
1772 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1773 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1774 loff_t last_off = last->oap_count + last->oap_obj_off +
1777 /* Change file size if this is an out of quota or
1778 * direct IO write and it extends the file size */
1779 if (loi->loi_lvb.lvb_size < last_off) {
1780 attr->cat_size = last_off;
1783 /* Extend KMS if it's not a lockless write */
1784 if (loi->loi_kms < last_off &&
1785 oap2osc_page(last)->ops_srvlock == 0) {
1786 attr->cat_kms = last_off;
1792 cl_object_attr_update(env, obj, attr, valid);
1793 cl_object_attr_unlock(obj);
1795 OBDO_FREE(aa->aa_oa);
/* Successful writes hold pages "unstable" until the server commits
 * the transaction (see brw_commit). */
1797 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1798 osc_inc_unstable_pages(req);
1800 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1801 list_del_init(&ext->oe_link);
1802 osc_extent_finish(env, ext, 1, rc);
1804 LASSERT(list_empty(&aa->aa_exts));
1805 LASSERT(list_empty(&aa->aa_oaps));
1807 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1808 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1810 spin_lock(&cli->cl_loi_list_lock);
1811 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1812 * is called so we know whether to go to sync BRWs or wait for more
1813 * RPCs to complete */
1814 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1815 cli->cl_w_in_flight--;
1817 cli->cl_r_in_flight--;
1818 osc_wake_cache_waiters(cli);
1819 spin_unlock(&cli->cl_loi_list_lock);
/* A freed RPC slot may allow more queued IO to go out. */
1821 osc_io_unplug(env, cli, NULL);
/* rq_commit_cb for BRW write RPCs: called when the server transaction
 * containing this write commits to disk.  Clears rq_unstable and releases
 * the unstable-page accounting; rq_lock serializes against the racing
 * osc_inc_unstable_pages() path noted below. */
1825 static void brw_commit(struct ptlrpc_request *req)
1827 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1828 * this called via the rq_commit_cb, I need to ensure
1829 * osc_dec_unstable_pages is still called. Otherwise unstable
1830 * pages may be leaked. */
1831 spin_lock(&req->rq_lock)
1832 if (likely(req->rq_unstable)) {
1833 req->rq_unstable = 0;
/* Drop the lock before the (potentially heavier) accounting call. */
1834 spin_unlock(&req->rq_lock);
1836 osc_dec_unstable_pages(req);
/* Lost the race with inc: just record the commit; the inc side will
 * observe rq_committed and do the decrement itself. */
1838 req->rq_committed = 1;
1839 spin_unlock(&req->rq_lock);
1844 * Build an RPC by the list of extent @ext_list. The caller must ensure
1845 * that the total pages in this list are NOT over max pages per RPC.
1846 * Extents in the list must be in OES_RPC state.
/* Collect the pages of all extents into a pga, sort them by offset,
 * prepare the BRW request, account it in the read/write in-flight stats,
 * and hand it to ptlrpcd.  On failure every extent is finished with an
 * error so no page is left stranded.
 * \retval 0 on success, negative errno on failure. */
1848 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1849 struct list_head *ext_list, int cmd)
1851 struct ptlrpc_request *req = NULL;
1852 struct osc_extent *ext;
1853 struct brw_page **pga = NULL;
1854 struct osc_brw_async_args *aa = NULL;
1855 struct obdo *oa = NULL;
1856 struct osc_async_page *oap;
1857 struct osc_object *obj = NULL;
1858 struct cl_req_attr *crattr = NULL;
1859 loff_t starting_offset = OBD_OBJECT_EOF;
1860 loff_t ending_offset = 0;
1864 bool soft_sync = false;
1865 bool interrupted = false;
1869 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1870 struct ost_body *body;
1872 LASSERT(!list_empty(ext_list));
1874 /* add pages into rpc_list to build BRW rpc */
/* First pass: total up pages/grant and note memalloc pressure. */
1875 list_for_each_entry(ext, ext_list, oe_link) {
1876 LASSERT(ext->oe_state == OES_RPC);
1877 mem_tight |= ext->oe_memalloc;
1878 grant += ext->oe_grants;
1879 page_count += ext->oe_nr_pages;
1884 soft_sync = osc_over_unstable_soft_limit(cli);
/* Mark this task as under memory pressure if any extent was, so
 * allocations below can dip into reserves (restored at out:). */
1886 mpflag = cfs_memory_pressure_get_and_set();
1888 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1890 GOTO(out, rc = -ENOMEM);
1894 GOTO(out, rc = -ENOMEM);
/* Second pass: fill pga[], move oaps onto rpc_list, and track the
 * overall [starting_offset, ending_offset) byte range. */
1897 list_for_each_entry(ext, ext_list, oe_link) {
1898 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1900 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1902 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1903 pga[i] = &oap->oap_brw_page;
1904 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1907 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1908 if (starting_offset == OBD_OBJECT_EOF ||
1909 starting_offset > oap->oap_obj_off)
1910 starting_offset = oap->oap_obj_off;
1912 LASSERT(oap->oap_page_off == 0);
1913 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1914 ending_offset = oap->oap_obj_off +
1917 LASSERT(oap->oap_page_off + oap->oap_count ==
1919 if (oap->oap_interrupted)
1924 /* first page in the list */
1925 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Fill the obdo from cl-layer attributes before building the request. */
1927 crattr = &osc_env_info(env)->oti_req_attr;
1928 memset(crattr, 0, sizeof(*crattr));
1929 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1930 crattr->cra_flags = ~0ULL;
1931 crattr->cra_page = oap2cl_page(oap);
1932 crattr->cra_oa = oa;
1933 cl_req_attr_set(env, osc2cl(obj), crattr);
1935 if (cmd == OBD_BRW_WRITE)
1936 oa->o_grant_used = grant;
/* Offset order helps the OST allocate disk blocks sequentially. */
1938 sort_brw_pages(pga, page_count);
1939 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1941 CERROR("prep_req failed: %d\n", rc);
1945 req->rq_commit_cb = brw_commit;
1946 req->rq_interpret_reply = brw_interpret;
1947 req->rq_memalloc = mem_tight != 0;
1948 oap->oap_request = ptlrpc_request_addref(req);
1949 if (interrupted && !req->rq_intr)
1950 ptlrpc_mark_interrupted(req);
1952 /* Need to update the timestamps after the request is built in case
1953 * we race with setattr (locally or in queue at OST). If OST gets
1954 * later setattr before earlier BRW (as determined by the request xid),
1955 * the OST will not use BRW timestamps. Sadly, there is no obvious
1956 * way to do this in a single call. bug 10150 */
1957 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1958 crattr->cra_oa = &body->oa;
1959 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1960 cl_req_attr_set(env, osc2cl(obj), crattr);
1961 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1963 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
/* Hand the page/extent lists over to the request's async args; from
 * here brw_interpret() owns their cleanup. */
1964 aa = ptlrpc_req_async_args(req);
1965 INIT_LIST_HEAD(&aa->aa_oaps);
1966 list_splice_init(&rpc_list, &aa->aa_oaps);
1967 INIT_LIST_HEAD(&aa->aa_exts);
1968 list_splice_init(ext_list, &aa->aa_exts);
1970 spin_lock(&cli->cl_loi_list_lock);
1971 starting_offset >>= PAGE_SHIFT;
/* Account the RPC and feed the lprocfs histograms under the loi lock. */
1972 if (cmd == OBD_BRW_READ) {
1973 cli->cl_r_in_flight++;
1974 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1975 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1976 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1977 starting_offset + 1);
1979 cli->cl_w_in_flight++;
1980 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1981 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1982 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1983 starting_offset + 1);
1985 spin_unlock(&cli->cl_loi_list_lock);
1987 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1988 page_count, aa, cli->cl_r_in_flight,
1989 cli->cl_w_in_flight);
1990 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1992 ptlrpcd_add_req(req);
/* Error path: undo allocations and fail every extent so their pages
 * are completed with an error rather than leaked. */
1998 cfs_memory_pressure_restore(mpflag);
2001 LASSERT(req == NULL);
2006 OBD_FREE(pga, sizeof(*pga) * page_count);
2007 /* this should happen rarely and is pretty bad, it makes the
2008 * pending list not follow the dirty order */
2009 while (!list_empty(ext_list)) {
2010 ext = list_entry(ext_list->next, struct osc_extent,
2012 list_del_init(&ext->oe_link);
2013 osc_extent_finish(env, ext, 0, rc);
/* Attach @data (the osc object) to a DLM lock's l_ast_data if it is unset,
 * under the lock's resource lock.  Returns (per the elided tail) whether
 * l_ast_data now equals @data — i.e. whether this lock may be used for
 * that object.  NOTE(review): return statements elided; confirm. */
2019 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2023 LASSERT(lock != NULL);
2025 lock_res_and_lock(lock);
2027 if (lock->l_ast_data == NULL)
2028 lock->l_ast_data = data;
/* Succeeds both when we just set it and when it was already ours. */
2029 if (lock->l_ast_data == data)
2032 unlock_res_and_lock(lock);
/* Complete an OSC lock enqueue: translate an intent-aborted reply into its
 * embedded status, set LVB_READY where appropriate, invoke the caller's
 * upcall with the final errcode, and drop the enqueue reference.
 *
 * \param agl  non-zero for speculative (AGL) enqueues — referenced by the
 *             signature; its use here is in elided lines. */
2037 static int osc_enqueue_fini(struct ptlrpc_request *req,
2038 osc_enqueue_upcall_f upcall, void *cookie,
2039 struct lustre_handle *lockh, enum ldlm_mode mode,
2040 __u64 *flags, int agl, int errcode)
2042 bool intent = *flags & LDLM_FL_HAS_INTENT;
2046 /* The request was created before ldlm_cli_enqueue call. */
2047 if (intent && errcode == ELDLM_LOCK_ABORTED) {
/* Intent was handled without granting a lock: the real status is
 * carried in the DLM reply's lock_policy_res1. */
2048 struct ldlm_reply *rep;
2050 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2051 LASSERT(rep != NULL);
2053 rep->lock_policy_res1 =
2054 ptlrpc_status_ntoh(rep->lock_policy_res1);
2055 if (rep->lock_policy_res1)
2056 errcode = rep->lock_policy_res1;
2058 *flags |= LDLM_FL_LVB_READY;
2059 } else if (errcode == ELDLM_OK) {
2060 *flags |= LDLM_FL_LVB_READY;
2063 /* Call the update callback. */
2064 rc = (*upcall)(cookie, lockh, errcode);
2066 /* release the reference taken in ldlm_cli_enqueue() */
2067 if (errcode == ELDLM_LOCK_MATCHED)
2069 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2070 ldlm_lock_decref(lockh, mode);
/* Reply interpreter for asynchronous lock enqueues: finishes the LDLM
 * enqueue, runs the OSC-level completion (upcall), and balances the lock
 * references taken to keep blocking ASTs ordered after the upcall. */
2075 static int osc_enqueue_interpret(const struct lu_env *env,
2076 struct ptlrpc_request *req,
2077 struct osc_enqueue_args *aa, int rc)
2079 struct ldlm_lock *lock;
2080 struct lustre_handle *lockh = &aa->oa_lockh;
2081 enum ldlm_mode mode = aa->oa_mode;
2082 struct ost_lvb *lvb = aa->oa_lvb;
2083 __u32 lvb_len = sizeof(*lvb);
2088 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2090 lock = ldlm_handle2lock(lockh);
2091 LASSERTF(lock != NULL,
2092 "lockh %#llx, req %p, aa %p - client evicted?\n",
2093 lockh->cookie, req, aa);
2095 /* Take an additional reference so that a blocking AST that
2096 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2097 * to arrive after an upcall has been executed by
2098 * osc_enqueue_fini(). */
2099 ldlm_lock_addref(lockh, mode);
2101 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2102 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2104 /* Let CP AST to grant the lock first. */
2105 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL (speculative) enqueues carry no LVB/flags pointers of their own;
 * substitute a local flags word for the fini calls below. */
2108 LASSERT(aa->oa_lvb == NULL);
2109 LASSERT(aa->oa_flags == NULL);
2110 aa->oa_flags = &flags;
2113 /* Complete obtaining the lock procedure. */
2114 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2115 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2117 /* Complete osc stuff. */
2118 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2119 aa->oa_flags, aa->oa_agl, rc);
2121 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the ordering reference taken above, and the handle2lock ref. */
2123 ldlm_lock_decref(lockh, mode);
2124 LDLM_LOCK_PUT(lock);
/* Sentinel "request set" value: callers pass PTLRPCD_SET to mean "queue
 * via the ptlrpcd daemon" rather than a real set (compared by pointer in
 * osc_enqueue_base(); never dereferenced). */
2128 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2130 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2131 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2132 * other synchronous requests, however keeping some locks and trying to obtain
2133 * others may take a considerable amount of time in a case of ost failure; and
2134 * when other sync requests do not get released lock from a client, the client
2135 * is evicted from the cluster -- such scenarious make the life difficult, so
2136 * release locks just after they are obtained. */
/* Enqueue an extent lock: first try to match an existing granted lock
 * (reusing it via the upcall with ELDLM_LOCK_MATCHED), otherwise send an
 * LDLM_ENQUEUE — asynchronously through @rqset/ptlrpcd when @async, else
 * synchronously.  @agl marks a speculative (ahead-of-need) enqueue whose
 * result is not awaited.  @kms_valid gates the match fast path. */
2137 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2138 __u64 *flags, union ldlm_policy_data *policy,
2139 struct ost_lvb *lvb, int kms_valid,
2140 osc_enqueue_upcall_f upcall, void *cookie,
2141 struct ldlm_enqueue_info *einfo,
2142 struct ptlrpc_request_set *rqset, int async, int agl)
2144 struct obd_device *obd = exp->exp_obd;
2145 struct lustre_handle lockh = { 0 };
2146 struct ptlrpc_request *req = NULL;
2147 int intent = *flags & LDLM_FL_HAS_INTENT;
2148 __u64 match_flags = *flags;
2149 enum ldlm_mode mode;
2153 /* Filesystem lock extents are extended to page boundaries so that
2154 * dealing with the page cache is a little smoother. */
2155 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2156 policy->l_extent.end |= ~PAGE_MASK;
2159 * kms is not valid when either object is completely fresh (so that no
2160 * locks are cached), or object was evicted. In the latter case cached
2161 * lock cannot be used, because it would prime inode state with
2162 * potentially stale LVB.
2167 /* Next, search for already existing extent locks that will cover us */
2168 /* If we're trying to read, we also search for an existing PW lock. The
2169 * VFS and page cache already protect us locally, so lots of readers/
2170 * writers can share a single PW lock.
2172 * There are problems with conversion deadlocks, so instead of
2173 * converting a read lock to a write lock, we'll just enqueue a new
2176 * At some point we should cancel the read lock instead of making them
2177 * send us a blocking callback, but there are problems with canceling
2178 * locks out from other users right now, too. */
2179 mode = einfo->ei_mode;
2180 if (einfo->ei_mode == LCK_PR)
2183 match_flags |= LDLM_FL_LVB_READY;
2185 match_flags |= LDLM_FL_BLOCK_GRANTED;
2186 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2187 einfo->ei_type, policy, mode, &lockh, 0);
2189 struct ldlm_lock *matched;
2191 if (*flags & LDLM_FL_TEST_LOCK)
2194 matched = ldlm_handle2lock(&lockh);
2196 /* AGL enqueues DLM locks speculatively. Therefore if
2197 * it already exists a DLM lock, it wll just inform the
2198 * caller to cancel the AGL process for this stripe. */
2199 ldlm_lock_decref(&lockh, mode);
2200 LDLM_LOCK_PUT(matched);
2202 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2203 *flags |= LDLM_FL_LVB_READY;
2205 /* We already have a lock, and it's referenced. */
2206 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2208 ldlm_lock_decref(&lockh, mode);
2209 LDLM_LOCK_PUT(matched);
/* Matched lock belongs to another object: fall through and
 * enqueue a fresh one. */
2212 ldlm_lock_decref(&lockh, mode);
2213 LDLM_LOCK_PUT(matched);
2218 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No usable match: build the enqueue request with an LVB buffer in
 * the reply. */
2222 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2223 &RQF_LDLM_ENQUEUE_LVB);
2227 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2229 ptlrpc_request_free(req);
2233 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2235 ptlrpc_request_set_replen(req);
2238 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2239 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2241 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2242 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async: stash completion state in rq_async_args and queue the RPC;
 * osc_enqueue_interpret() finishes the job. */
2245 struct osc_enqueue_args *aa;
2246 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2247 aa = ptlrpc_req_async_args(req);
2249 aa->oa_mode = einfo->ei_mode;
2250 aa->oa_type = einfo->ei_type;
2251 lustre_handle_copy(&aa->oa_lockh, &lockh);
2252 aa->oa_upcall = upcall;
2253 aa->oa_cookie = cookie;
2256 aa->oa_flags = flags;
2259 /* AGL is essentially to enqueue an DLM lock
2260 * in advance, so we don't care about the
2261 * result of AGL enqueue. */
2263 aa->oa_flags = NULL;
2266 req->rq_interpret_reply =
2267 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2268 if (rqset == PTLRPCD_SET)
2269 ptlrpcd_add_req(req);
2271 ptlrpc_set_add_req(rqset, req);
2272 } else if (intent) {
2273 ptlrpc_req_finished(req);
/* Sync path: the enqueue already completed; run the fini inline. */
2278 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2281 ptlrpc_req_finished(req);
/* Look up an already-granted extent lock covering @policy's range.  The
 * range is widened to page boundaries first.  On a hit, the lock's
 * ast_data is bound to @data via osc_set_lock_data(); if that fails the
 * reference is dropped (match treated as a miss).
 * \retval  granted mode on success, 0 on no match (per ldlm_lock_match). */
2286 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2287 enum ldlm_type type, union ldlm_policy_data *policy,
2288 enum ldlm_mode mode, __u64 *flags, void *data,
2289 struct lustre_handle *lockh, int unref)
2291 struct obd_device *obd = exp->exp_obd;
2292 __u64 lflags = *flags;
/* Fault-injection point for testing the no-match path. */
2296 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2299 /* Filesystem lock extents are extended to page boundaries so that
2300 * dealing with the page cache is a little smoother */
2301 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2302 policy->l_extent.end |= ~PAGE_MASK;
2304 /* Next, search for already existing extent locks that will cover us */
2305 /* If we're trying to read, we also search for an existing PW lock. The
2306 * VFS and page cache already protect us locally, so lots of readers/
2307 * writers can share a single PW lock. */
2311 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2312 res_id, type, policy, rc, lockh, unref);
2313 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2317 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2319 LASSERT(lock != NULL);
/* Lock already bound to a different object: release and miss. */
2320 if (!osc_set_lock_data(lock, data)) {
2321 ldlm_lock_decref(lockh, rc);
2324 LDLM_LOCK_PUT(lock);
/* Reply interpreter for async OST_STATFS: copy the server's obd_statfs
 * into the caller's buffer and invoke the obd_info completion callback.
 * Special-cases: request never sent (higher-layer abort) and transient
 * connection errors under OBD_STATFS_NODELAY. */
2329 static int osc_statfs_interpret(const struct lu_env *env,
2330 struct ptlrpc_request *req,
2331 struct osc_async_args *aa, int rc)
2333 struct obd_statfs *msfs;
2337 /* The request has in fact never been sent
2338 * due to issues at a higher level (LOV).
2339 * Exit immediately since the caller is
2340 * aware of the problem and takes care
2341 * of the clean up */
/* NODELAY statfs must not block on a dead connection; pass the error
 * straight to the callback (handled in elided lines). */
2344 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2345 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2351 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2353 GOTO(out, rc = -EPROTO);
/* Struct copy of the unpacked statfs into the caller's buffer. */
2356 *aa->aa_oi->oi_osfs = *msfs;
2358 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an OST_STATFS RPC asynchronously: build the request, wire up
 * osc_statfs_interpret() as the reply handler, and add it to @rqset.
 * @max_age is currently only documented (see comment) — the server is
 * always asked; NODELAY callers get a no-resend/no-delay request. */
2362 static int osc_statfs_async(struct obd_export *exp,
2363 struct obd_info *oinfo, __u64 max_age,
2364 struct ptlrpc_request_set *rqset)
2366 struct obd_device *obd = class_exp2obd(exp);
2367 struct ptlrpc_request *req;
2368 struct osc_async_args *aa;
2372 /* We could possibly pass max_age in the request (as an absolute
2373 * timestamp or a "seconds.usec ago") so the target can avoid doing
2374 * extra calls into the filesystem if that isn't necessary (e.g.
2375 * during mount that would help a bit). Having relative timestamps
2376 * is not so great if request processing is slow, while absolute
2377 * timestamps are not ideal because they need time synchronization. */
2378 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2384 ptlrpc_request_free(req);
2387 ptlrpc_request_set_replen(req);
/* statfs is served from the create portal on the OST. */
2388 req->rq_request_portal = OST_CREATE_PORTAL;
2389 ptlrpc_at_set_req_timeout(req);
2391 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2392 /* procfs requests not want stat in wait for avoid deadlock */
2393 req->rq_no_resend = 1;
2394 req->rq_no_delay = 1;
2397 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2398 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2399 aa = ptlrpc_req_async_args(req);
2402 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: send the RPC, wait for the reply, and copy
 * the returned obd_statfs into @osfs. Takes a reference on the import
 * under cl_sem so a concurrent disconnect cannot free it under us. */
2406 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2407 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2409 struct obd_device *obd = class_exp2obd(exp);
2410 struct obd_statfs *msfs;
2411 struct ptlrpc_request *req;
2412 struct obd_import *imp = NULL;
2416 /* The request may also come from lprocfs, so serialize against
2417 * client_disconnect_export() via cl_sem (see bug 15684). */
2418 down_read(&obd->u.cli.cl_sem);
2419 if (obd->u.cli.cl_import)
2420 imp = class_import_get(obd->u.cli.cl_import);
2421 up_read(&obd->u.cli.cl_sem);
2425 /* We could possibly pass max_age in the request (as an absolute
2426 * timestamp or a "seconds.usec ago") so the target can avoid doing
2427 * extra calls into the filesystem if that isn't necessary (e.g.
2428 * during mount that would help a bit). Having relative timestamps
2429 * is not so great if request processing is slow, while absolute
2430 * timestamps are not ideal because they need time synchronization. */
2431 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The request holds its own import ref now; drop ours. */
2433 class_import_put(imp);
2438 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2440 ptlrpc_request_free(req);
2443 ptlrpc_request_set_replen(req);
2444 req->rq_request_portal = OST_CREATE_PORTAL;
2445 ptlrpc_at_set_req_timeout(req);
2447 if (flags & OBD_STATFS_NODELAY) {
2448 /* procfs statfs requests must not block waiting, to avoid deadlock */
2449 req->rq_no_resend = 1;
2450 req->rq_no_delay = 1;
2453 rc = ptlrpc_queue_wait(req);
2457 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2459 GOTO(out, rc = -EPROTO);
2466 ptlrpc_req_finished(req);
/* ioctl dispatcher for the OSC device. Pins the module for the duration
 * of the call, then routes the command to the matching ptlrpc helper.
 * Unrecognised commands return -ENOTTY. */
2470 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2471 void *karg, void __user *uarg)
2473 struct obd_device *obd = exp->exp_obd;
2474 struct obd_ioctl_data *data = karg;
/* Hold a module reference so the OSC module cannot be unloaded while
 * an ioctl is in flight. */
2478 if (!try_module_get(THIS_MODULE)) {
2479 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2480 module_name(THIS_MODULE));
2484 case OBD_IOC_CLIENT_RECOVER:
2485 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2486 data->ioc_inlbuf1, 0);
2490 case IOC_OSC_SET_ACTIVE:
2491 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2494 case OBD_IOC_PING_TARGET:
2495 err = ptlrpc_obd_ping(obd);
2498 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2499 cmd, current_comm());
2500 GOTO(out, err = -ENOTTY);
2503 module_put(THIS_MODULE);
/* Handle set_info requests for this OSC. Well-known keys (checksum,
 * sptlrpc config, ctx flush, cache set, LRU shrink) are handled locally;
 * anything else is forwarded to the OST as an OST_SET_INFO RPC (or an
 * OST_SET_GRANT_INFO RPC for KEY_GRANT_SHRINK). */
2507 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2508 u32 keylen, void *key,
2509 u32 vallen, void *val,
2510 struct ptlrpc_request_set *set)
2512 struct ptlrpc_request *req;
2513 struct obd_device *obd = exp->exp_obd;
2514 struct obd_import *imp = class_exp2cliimp(exp);
2519 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Toggle client-side checksumming; expects an int-sized value. */
2521 if (KEY_IS(KEY_CHECKSUM)) {
2522 if (vallen != sizeof(int))
2524 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2528 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2529 sptlrpc_conf_client_adapt(obd);
2533 if (KEY_IS(KEY_FLUSH_CTX)) {
2534 sptlrpc_import_flush_my_ctx(imp);
/* Attach this OSC to a shared client cache (set once per device) and
 * link it onto the cache's LRU list of OSCs. */
2538 if (KEY_IS(KEY_CACHE_SET)) {
2539 struct client_obd *cli = &obd->u.cli;
2541 LASSERT(cli->cl_cache == NULL); /* only once */
2542 cli->cl_cache = (struct cl_client_cache *)val;
2543 cl_cache_incref(cli->cl_cache);
2544 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2546 /* add this osc into entity list */
2547 LASSERT(list_empty(&cli->cl_lru_osc));
2548 spin_lock(&cli->cl_cache->ccc_lru_lock);
2549 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2550 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Shrink at most half of this OSC's LRU pages, capped by the target. */
2555 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2556 struct client_obd *cli = &obd->u.cli;
2557 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2558 long target = *(long *)val;
2560 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* Only GRANT_SHRINK may be sent without a caller-provided request set. */
2565 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2568 /* We pass all other commands directly to OST. Since nobody calls osc
2569 methods directly and everybody is supposed to go through LOV, we
2570 assume lov checked invalid values for us.
2571 The only recognised values so far are evict_by_nid and mds_conn.
2572 Even if something bad goes through, we'd get a -EINVAL from OST
2575 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2576 &RQF_OST_SET_GRANT_INFO :
2581 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2582 RCL_CLIENT, keylen);
2583 if (!KEY_IS(KEY_GRANT_SHRINK))
2584 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2585 RCL_CLIENT, vallen);
2586 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2588 ptlrpc_request_free(req);
/* Copy key/value into the request buffers. */
2592 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2593 memcpy(tmp, key, keylen);
2594 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2597 memcpy(tmp, val, vallen);
/* Grant-shrink replies carry grant state back; stash async args and
 * process the reply in osc_shrink_grant_interpret(). */
2599 if (KEY_IS(KEY_GRANT_SHRINK)) {
2600 struct osc_grant_args *aa;
2603 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2604 aa = ptlrpc_req_async_args(req);
2607 ptlrpc_req_finished(req);
2610 *oa = ((struct ost_body *)val)->oa;
2612 req->rq_interpret_reply = osc_shrink_grant_interpret;
2615 ptlrpc_request_set_replen(req);
2616 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2617 LASSERT(set != NULL);
2618 ptlrpc_set_add_req(set, req);
2619 ptlrpc_check_set(NULL, set);
2621 ptlrpcd_add_req(req);
/* Reconnect hook: recompute the grant amount advertised to the server
 * in ocd_grant from locally tracked available/reserved/dirty grant, and
 * reset cl_lost_grant, all under cl_loi_list_lock. */
2627 static int osc_reconnect(const struct lu_env *env,
2628 struct obd_export *exp, struct obd_device *obd,
2629 struct obd_uuid *cluuid,
2630 struct obd_connect_data *data,
2633 struct client_obd *cli = &obd->u.cli;
2635 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2639 spin_lock(&cli->cl_loi_list_lock);
2640 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2641 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2642 grant += cli->cl_dirty_grant;
2644 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* Never ask for zero grant: fall back to two full BRW RPCs' worth. */
2645 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2646 lost_grant = cli->cl_lost_grant;
2647 cli->cl_lost_grant = 0;
2648 spin_unlock(&cli->cl_loi_list_lock);
2650 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2651 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2652 data->ocd_version, data->ocd_grant, lost_grant);
/* Disconnect hook: tear down the export first, then — only once the
 * import is gone — remove this client from the grant-shrink list, for
 * the ordering reason spelled out in the comment below (bug 18662). */
2658 static int osc_disconnect(struct obd_export *exp)
2660 struct obd_device *obd = class_exp2obd(exp);
2663 rc = client_disconnect_export(exp);
2665 * Initially we put del_shrink_grant before disconnect_export, but it
2666 * causes the following problem if setup (connect) and cleanup
2667 * (disconnect) are tangled together.
2668 * connect p1 disconnect p2
2669 * ptlrpc_connect_import
2670 * ............... class_manual_cleanup
2673 * ptlrpc_connect_interrupt
2675 * add this client to shrink list
2677 * Bang! pinger trigger the shrink.
2678 * So the osc should be disconnected from the shrink list, after we
2679 * are sure the import has been destroyed. BUG18662
2681 if (obd->u.cli.cl_import == NULL)
2682 osc_del_shrink_grant(&obd->u.cli);
/* cfs_hash iterator callback: for one LDLM resource, find the first
 * granted lock carrying an osc_object in l_ast_data, clear the CLEANED
 * flag on granted locks so a later namespace cleanup can cancel them,
 * then invalidate the osc object and drop the reference taken here. */
2686 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2687 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2689 struct lu_env *env = arg;
2690 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2691 struct ldlm_lock *lock;
2692 struct osc_object *osc = NULL;
2696 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2697 if (lock->l_ast_data != NULL && osc == NULL) {
2698 osc = lock->l_ast_data;
/* Hold the object so it stays alive after the resource lock walk. */
2699 cl_object_get(osc2cl(osc));
2702 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2703 * by the 2nd round of ldlm_namespace_clean() call in
2704 * osc_import_event(). */
2705 ldlm_clear_cleaned(lock);
2710 osc_object_invalidate(env, osc);
2711 cl_object_put(env, osc2cl(osc));
/* Import state-change handler. Reacts to connection events:
 * - DISCON: zero out grant accounting;
 * - INVALIDATE: clean the LDLM namespace, flush queued I/O, and
 *   invalidate every cached osc object (two cleanup passes, see
 *   osc_ldlm_resource_invalidate());
 * - OCD: re-initialize grant and request portal from the negotiated
 *   connect data;
 * - remaining events are forwarded to the obd observer. */
2717 static int osc_import_event(struct obd_device *obd,
2718 struct obd_import *imp,
2719 enum obd_import_event event)
2721 struct client_obd *cli;
2725 LASSERT(imp->imp_obd == obd);
2728 case IMP_EVENT_DISCON: {
2730 spin_lock(&cli->cl_loi_list_lock);
2731 cli->cl_avail_grant = 0;
2732 cli->cl_lost_grant = 0;
2733 spin_unlock(&cli->cl_loi_list_lock);
2736 case IMP_EVENT_INACTIVE: {
2737 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2740 case IMP_EVENT_INVALIDATE: {
2741 struct ldlm_namespace *ns = obd->obd_namespace;
/* First pass: cancel unused local locks. */
2745 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2747 env = cl_env_get(&refcheck);
2749 osc_io_unplug(env, &obd->u.cli, NULL);
2751 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2752 osc_ldlm_resource_invalidate,
2754 cl_env_put(env, &refcheck);
/* Second pass: catch locks whose CLEANED flag was cleared above. */
2756 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2761 case IMP_EVENT_ACTIVE: {
2762 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2765 case IMP_EVENT_OCD: {
2766 struct obd_connect_data *ocd = &imp->imp_connect_data;
2768 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2769 osc_init_grant(&obd->u.cli, ocd);
2772 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2773 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2775 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2778 case IMP_EVENT_DEACTIVATE: {
2779 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2782 case IMP_EVENT_ACTIVATE: {
2783 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2787 CERROR("Unknown import event %d\n", event);
2794 * Determine whether the lock can be canceled before replaying the lock
2795 * during recovery, see bug16774 for detailed information.
2797 * \retval zero the lock can't be canceled
2798 * \retval other ok to cancel
2800 static int osc_cancel_weight(struct ldlm_lock *lock)
2803 * Cancel all unused and granted extent lock.
/* A lock is cancellable when it is an extent lock, fully granted
 * (granted mode equals requested mode), and its weigh AST reports it
 * is unused (weight 0). */
2805 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2806 lock->l_granted_mode == lock->l_req_mode &&
2807 osc_ldlm_weigh_ast(lock) == 0)
/* ptlrpcd work callback: flush any pending cached writes for @data
 * (a struct client_obd) by unplugging its I/O queue. */
2813 static int brw_queue_work(const struct lu_env *env, void *data)
2815 struct client_obd *cli = data;
2817 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2819 osc_io_unplug(env, cli, NULL);
/* Device setup for an OSC obd: initializes the client obd, allocates
 * the writeback and LRU ptlrpcd work items, sets up quota, registers
 * procfs entries, tops up the shared request pool, registers the
 * cancel-weight callback, and links this client onto the global
 * osc_shrink_list. Error paths unwind the work items and the client
 * obd setup. */
2823 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2825 struct client_obd *cli = &obd->u.cli;
2826 struct obd_type *type;
2834 rc = ptlrpcd_addref();
2838 rc = client_obd_setup(obd, lcfg);
2840 GOTO(out_ptlrpcd, rc);
/* Work item that flushes cached writes from a ptlrpcd thread. */
2842 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2843 if (IS_ERR(handler))
2844 GOTO(out_client_setup, rc = PTR_ERR(handler));
2845 cli->cl_writeback_work = handler;
/* Work item that shrinks the client page LRU from a ptlrpcd thread. */
2847 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2848 if (IS_ERR(handler))
2849 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2850 cli->cl_lru_work = handler;
2852 rc = osc_quota_setup(obd);
2854 GOTO(out_ptlrpcd_work, rc);
2856 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2858 #ifdef CONFIG_PROC_FS
2859 obd->obd_vars = lprocfs_osc_obd_vars;
2861 /* If this is true then both client (osc) and server (osp) are on the
2862 * same node. The osp layer if loaded first will register the osc proc
2863 * directory. In that case this obd_device will be attached its proc
2864 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2865 type = class_search_type(LUSTRE_OSP_NAME);
2866 if (type && type->typ_procsym) {
2867 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2869 obd->obd_vars, obd);
2870 if (IS_ERR(obd->obd_proc_entry)) {
2871 rc = PTR_ERR(obd->obd_proc_entry);
2872 CERROR("error %d setting up lprocfs for %s\n", rc,
2874 obd->obd_proc_entry = NULL;
2877 rc = lprocfs_obd_setup(obd);
2880 /* If the basic OSC proc tree construction succeeded then
2881 * lets do the rest. */
2883 lproc_osc_attach_seqstat(obd);
2884 sptlrpc_lprocfs_cliobd_attach(obd);
2885 ptlrpc_lprocfs_register_obd(obd);
2889 * We try to control the total number of requests with a upper limit
2890 * osc_reqpool_maxreqcount. There might be some race which will cause
2891 * over-limit allocation, but it is fine.
2893 req_count = atomic_read(&osc_pool_req_count);
2894 if (req_count < osc_reqpool_maxreqcount) {
2895 adding = cli->cl_max_rpcs_in_flight + 2;
2896 if (req_count + adding > osc_reqpool_maxreqcount)
2897 adding = osc_reqpool_maxreqcount - req_count;
2899 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2900 atomic_add(added, &osc_pool_req_count);
2903 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* Let the LDLM decide lock cancellation via our weight callback. */
2904 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2906 spin_lock(&osc_shrink_lock);
2907 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2908 spin_unlock(&osc_shrink_lock);
/* Error unwind: destroy whichever work items were created, then tear
 * down the client obd. */
2913 if (cli->cl_writeback_work != NULL) {
2914 ptlrpcd_destroy_work(cli->cl_writeback_work);
2915 cli->cl_writeback_work = NULL;
2917 if (cli->cl_lru_work != NULL) {
2918 ptlrpcd_destroy_work(cli->cl_lru_work);
2919 cli->cl_lru_work = NULL;
2922 client_obd_cleanup(obd);
/* Pre-cleanup: wait for zombie exports, destroy the writeback and LRU
 * work items, then tear down the import and procfs registration. */
2928 static int osc_precleanup(struct obd_device *obd)
2930 struct client_obd *cli = &obd->u.cli;
2934 * for echo client, export may be on zombie list, wait for
2935 * zombie thread to cull it, because cli.cl_import will be
2936 * cleared in client_disconnect_export():
2937 * class_export_destroy() -> obd_cleanup() ->
2938 * echo_device_free() -> echo_client_cleanup() ->
2939 * obd_disconnect() -> osc_disconnect() ->
2940 * client_disconnect_export()
2942 obd_zombie_barrier();
2943 if (cli->cl_writeback_work) {
2944 ptlrpcd_destroy_work(cli->cl_writeback_work);
2945 cli->cl_writeback_work = NULL;
2948 if (cli->cl_lru_work) {
2949 ptlrpcd_destroy_work(cli->cl_lru_work);
2950 cli->cl_lru_work = NULL;
2953 obd_cleanup_client_import(obd);
2954 ptlrpc_lprocfs_unregister_obd(obd);
2955 lprocfs_obd_cleanup(obd);
/* Final cleanup: unlink from the global shrink list, detach from the
 * shared client cache (dropping our reference), release the quota
 * cache, and finish with the generic client obd cleanup. */
2959 int osc_cleanup(struct obd_device *obd)
2961 struct client_obd *cli = &obd->u.cli;
2966 spin_lock(&osc_shrink_lock);
2967 list_del(&cli->cl_shrink_list);
2968 spin_unlock(&osc_shrink_lock);
/* lru cleanup: unhook this OSC from the shared cache's LRU list. */
2971 if (cli->cl_cache != NULL) {
2972 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2973 spin_lock(&cli->cl_cache->ccc_lru_lock);
2974 list_del_init(&cli->cl_lru_osc);
2975 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2976 cli->cl_lru_left = NULL;
2977 cl_cache_decref(cli->cl_cache);
2978 cli->cl_cache = NULL;
2981 /* free memory of osc quota cache */
2982 osc_quota_cleanup(obd);
2984 rc = client_obd_cleanup(obd);
/* Apply a config record to this OSC's proc parameters.
 * class_process_proc_param() returns > 0 for "handled"; normalize that
 * to 0 so callers only see 0 or a negative errno. */
2990 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2992 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2993 return rc > 0 ? 0: rc;
/* obd_ops wrapper: forward config buffer to osc_process_config_base(). */
2996 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2998 return osc_process_config_base(obd, buf);
/* obd method table for the OSC device type: lifecycle (setup/cleanup),
 * connection management, statfs, object operations, ioctl, set_info,
 * import events, config processing, and quota control. */
3001 static struct obd_ops osc_obd_ops = {
3002 .o_owner = THIS_MODULE,
3003 .o_setup = osc_setup,
3004 .o_precleanup = osc_precleanup,
3005 .o_cleanup = osc_cleanup,
3006 .o_add_conn = client_import_add_conn,
3007 .o_del_conn = client_import_del_conn,
3008 .o_connect = client_connect_import,
3009 .o_reconnect = osc_reconnect,
3010 .o_disconnect = osc_disconnect,
3011 .o_statfs = osc_statfs,
3012 .o_statfs_async = osc_statfs_async,
3013 .o_create = osc_create,
3014 .o_destroy = osc_destroy,
3015 .o_getattr = osc_getattr,
3016 .o_setattr = osc_setattr,
3017 .o_iocontrol = osc_iocontrol,
3018 .o_set_info_async = osc_set_info_async,
3019 .o_import_event = osc_import_event,
3020 .o_process_config = osc_process_config,
3021 .o_quotactl = osc_quotactl,
/* Memory-shrinker handle plus the global list of OSC clients eligible
 * for cache shrinking, protected by osc_shrink_lock. */
3024 static struct shrinker *osc_cache_shrinker;
3025 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3026 DEFINE_SPINLOCK(osc_shrink_lock);
3028 #ifndef HAVE_SHRINKER_COUNT
/* Compatibility shim for kernels whose shrinker API uses a single
 * ->shrink() callback instead of separate count/scan methods: run the
 * scan with the requested parameters, then report the remaining count. */
3029 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3031 struct shrink_control scv = {
3032 .nr_to_scan = shrink_param(sc, nr_to_scan),
3033 .gfp_mask = shrink_param(sc, gfp_mask)
3035 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3036 struct shrinker *shrinker = NULL;
3039 (void)osc_cache_shrink_scan(shrinker, &scv);
3041 return osc_cache_shrink_count(shrinker, &scv);
/* Module init: set up lu_kmem caches, register the OSC obd type
 * (skipping proc registration when OSP already owns the directory),
 * install the cache shrinker, and size/create the shared request pool
 * from osc_reqpool_mem_max. Error paths unwind type registration and
 * the caches. */
3045 static int __init osc_init(void)
3047 bool enable_proc = true;
3048 struct obd_type *type;
3049 unsigned int reqpool_size;
3050 unsigned int reqsize;
3052 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3053 osc_cache_shrink_count, osc_cache_shrink_scan);
3056 /* print an address of _any_ initialized kernel symbol from this
3057 * module, to allow debugging with gdb that doesn't support data
3058 * symbols from modules.*/
3059 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3061 rc = lu_kmem_init(osc_caches);
/* When OSP is loaded it owns the shared proc directory, so do not
 * register proc entries a second time for the OSC type. */
3065 type = class_search_type(LUSTRE_OSP_NAME);
3066 if (type != NULL && type->typ_procsym != NULL)
3067 enable_proc = false;
3069 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3070 LUSTRE_OSC_NAME, &osc_device_type);
3074 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3076 /* This is obviously too much memory, only prevent overflow here */
3077 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3078 GOTO(out_type, rc = -EINVAL);
/* Convert the MB limit into bytes, then round the request size up to
 * the next power of two >= OST_IO_MAXREQSIZE to compute the maximum
 * number of pooled requests. */
3080 reqpool_size = osc_reqpool_mem_max << 20;
3083 while (reqsize < OST_IO_MAXREQSIZE)
3084 reqsize = reqsize << 1;
3087 * We don't enlarge the request count in OSC pool according to
3088 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3089 * tried after normal allocation failed. So a small OSC pool won't
3090 * cause much performance degression in most of cases.
3092 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3094 atomic_set(&osc_pool_req_count, 0);
3095 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3096 ptlrpc_add_rqs_to_pool);
3098 if (osc_rq_pool != NULL)
3102 class_unregister_type(LUSTRE_OSC_NAME);
3104 lu_kmem_fini(osc_caches);
/* Module exit: undo osc_init() in reverse order — shrinker, obd type,
 * lu_kmem caches, and finally the shared request pool. */
3109 static void __exit osc_exit(void)
3111 remove_shrinker(osc_cache_shrinker);
3112 class_unregister_type(LUSTRE_OSC_NAME);
3113 lu_kmem_fini(osc_caches);
3114 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
3117 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3118 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3119 MODULE_VERSION(LUSTRE_VERSION_STRING);
3120 MODULE_LICENSE("GPL");
3122 module_init(osc_init);
3123 module_exit(osc_exit);