4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lustre/lustre_user.h>
39 #include <lprocfs_status.h>
40 #include <lustre_debug.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <lustre_ha.h>
44 #include <lustre_ioctl.h>
45 #include <lustre_net.h>
46 #include <lustre_obdo.h>
47 #include <lustre_param.h>
49 #include <obd_cksum.h>
50 #include <obd_class.h>
52 #include "osc_cl_internal.h"
53 #include "osc_internal.h"
/* Request-pool globals. They are not static, so they are visible outside
 * this file. NOTE(review): exact roles are inferred from the names only. */
55 atomic_t osc_pool_req_count;
56 unsigned int osc_reqpool_maxreqcount;
57 struct ptlrpc_request_pool *osc_rq_pool;
59 /* max memory used for request pool, unit is MB */
60 static unsigned int osc_reqpool_mem_max = 5;
/* Exposed as a uint module parameter with mode 0444; the default is 5. */
61 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request async arguments for bulk read/write (BRW) RPCs.
 * NOTE(review): only some members are visible in this listing; aa_oa,
 * aa_requested_nob, aa_nio_count and aa_page_count are referenced later
 * in the file. */
63 struct osc_brw_async_args {
69 struct brw_page **aa_ppga;
70 struct client_obd *aa_cli;
71 struct list_head aa_oaps;
72 struct list_head aa_exts;
/* osc_shrink_grant_interpret() accesses its argument through this alias. */
75 #define osc_grant_args osc_brw_async_args
/* Async arguments stored in setattr and punch requests.
 * osc_setattr_interpret() reads sa_oa, sa_upcall and sa_cookie from it. */
77 struct osc_setattr_args {
79 obd_enqueue_update_f sa_upcall;
/* Async arguments stored in OST_SYNC requests.
 * osc_sync_interpret() reads fa_obj, fa_oa, fa_upcall and fa_cookie. */
83 struct osc_fsync_args {
84 struct osc_object *fa_obj;
86 obd_enqueue_update_f fa_upcall;
/* Async arguments stored in OST_LADVISE requests.
 * osc_ladvise_interpret() reads la_oa, la_upcall and la_cookie. */
90 struct osc_ladvise_args {
92 obd_enqueue_update_f la_upcall;
/* Arguments describing a lock enqueue: the export, lock type and mode,
 * the completion upcall, an LVB buffer, the lock handle and a one-bit
 * oa_agl flag. NOTE(review): some members are not visible in this listing. */
96 struct osc_enqueue_args {
97 struct obd_export *oa_exp;
98 enum ldlm_type oa_type;
99 enum ldlm_mode oa_mode;
101 osc_enqueue_upcall_f oa_upcall;
103 struct ost_lvb *oa_lvb;
104 struct lustre_handle oa_lockh;
105 unsigned int oa_agl:1;
/* Forward declarations of static helpers.
 * NOTE(review): the brw_interpret() prototype is cut short in this listing. */
108 static void osc_release_ppga(struct brw_page **ppga, size_t count);
109 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Fill the client-side OST body of @req from @oa, converting the obdo to
 * wire format with the connect data of the request import. */
112 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
114 struct ost_body *body;
116 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
119 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR.
 *
 * Allocates and packs the request on the import of @exp, sends it with
 * ptlrpc_queue_wait(), then copies the reply obdo back into the caller
 * obdo and sets o_blksize from cli_brw_size(), flagging OBD_MD_FLBLKSZ.
 * The -EPROTO exit follows the reply body lookup. */
122 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
125 struct ptlrpc_request *req;
126 struct ost_body *body;
130 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
134 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Error path after packing: the request is freed without being sent. */
136 ptlrpc_request_free(req);
140 osc_pack_req_body(req, oa);
142 ptlrpc_request_set_replen(req);
144 rc = ptlrpc_queue_wait(req);
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 GOTO(out, rc = -EPROTO);
152 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
153 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Report the client BRW size as the block size of the object. */
155 oa->o_blksize = cli_brw_size(exp->exp_obd);
156 oa->o_valid |= OBD_MD_FLBLKSZ;
160 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR.
 *
 * Asserts that the obdo carries OBD_MD_FLGROUP, sends the request with
 * ptlrpc_queue_wait() and copies the reply obdo back into the caller obdo.
 * The -EPROTO exit follows the reply body lookup. */
165 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
168 struct ptlrpc_request *req;
169 struct ost_body *body;
173 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
179 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Error path after packing: the request is freed without being sent. */
181 ptlrpc_request_free(req);
185 osc_pack_req_body(req, oa);
187 ptlrpc_request_set_replen(req);
189 rc = ptlrpc_queue_wait(req);
193 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
195 GOTO(out, rc = -EPROTO);
197 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
201 ptlrpc_req_finished(req);
/* Reply interpreter installed by osc_setattr_async() and osc_punch_base().
 *
 * Unpacks the reply obdo into sa->sa_oa, then hands @rc to sa->sa_upcall
 * together with sa->sa_cookie; the upcall result replaces @rc.
 * The -EPROTO exit follows the reply body lookup. */
206 static int osc_setattr_interpret(const struct lu_env *env,
207 struct ptlrpc_request *req,
208 struct osc_setattr_args *sa, int rc)
210 struct ost_body *body;
216 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
218 GOTO(out, rc = -EPROTO);
220 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
223 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Build an OST_SETATTR request for @oa and send it asynchronously.
 *
 * Two send paths are visible: one hands the request straight to ptlrpcd
 * without waiting for a response; the other installs
 * osc_setattr_interpret() with @upcall and @cookie as async arguments and
 * queues the request on ptlrpcd when @rqset is PTLRPCD_SET, or on @rqset.
 * NOTE(review): the conditions selecting between these paths are not
 * visible in this listing. */
227 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
228 obd_enqueue_update_f upcall, void *cookie,
229 struct ptlrpc_request_set *rqset)
231 struct ptlrpc_request *req;
232 struct osc_setattr_args *sa;
237 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
241 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
243 ptlrpc_request_free(req);
247 osc_pack_req_body(req, oa);
249 ptlrpc_request_set_replen(req);
251 /* do mds to ost setattr asynchronously */
253 /* Do not wait for response. */
254 ptlrpcd_add_req(req);
256 req->rq_interpret_reply =
257 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the argument struct fits in rq_async_args. */
259 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
260 sa = ptlrpc_req_async_args(req);
262 sa->sa_upcall = upcall;
263 sa->sa_cookie = cookie;
265 if (rqset == PTLRPCD_SET)
266 ptlrpcd_add_req(req);
268 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter installed by osc_ladvise_base().
 *
 * Copies the reply obdo into *la->la_oa and passes @rc to la->la_upcall
 * with la->la_cookie; the upcall result replaces @rc.
 * The -EPROTO exit follows the reply body lookup. */
274 static int osc_ladvise_interpret(const struct lu_env *env,
275 struct ptlrpc_request *req,
278 struct osc_ladvise_args *la = arg;
279 struct ost_body *body;
285 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
287 GOTO(out, rc = -EPROTO);
289 *la->la_oa = body->oa;
291 rc = la->la_upcall(la->la_cookie, rc);
/* Send an OST_LADVISE request for @oa carrying @ladvise_hdr and its
 * lah_count advice entries.
 *
 * The advice buffer in the request is sized as lah_count entries, the
 * request is directed to OST_IO_PORTAL, and the header and the advice
 * array are copied into the request with memcpy(). One visible path queues
 * the request on ptlrpcd without waiting; the other installs
 * osc_ladvise_interpret() with @upcall and @cookie and queues the request
 * on ptlrpcd when @rqset is PTLRPCD_SET, or on @rqset. */
296 * If rqset is NULL, do not wait for response. Upcall and cookie could also
297 * be NULL in this case
299 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
300 struct ladvise_hdr *ladvise_hdr,
301 obd_enqueue_update_f upcall, void *cookie,
302 struct ptlrpc_request_set *rqset)
304 struct ptlrpc_request *req;
305 struct ost_body *body;
306 struct osc_ladvise_args *la;
308 struct lu_ladvise *req_ladvise;
309 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
310 int num_advise = ladvise_hdr->lah_count;
311 struct ladvise_hdr *req_ladvise_hdr;
314 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* The advice buffer must be sized before the request is packed. */
318 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
319 num_advise * sizeof(*ladvise));
320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
322 ptlrpc_request_free(req);
325 req->rq_request_portal = OST_IO_PORTAL;
326 ptlrpc_at_set_req_timeout(req);
328 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
330 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
333 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
334 &RMF_OST_LADVISE_HDR);
335 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
337 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
338 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
339 ptlrpc_request_set_replen(req);
342 /* Do not wait for response. */
343 ptlrpcd_add_req(req);
347 req->rq_interpret_reply = osc_ladvise_interpret;
348 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
349 la = ptlrpc_req_async_args(req);
351 la->la_upcall = upcall;
352 la->la_cookie = cookie;
354 if (rqset == PTLRPCD_SET)
355 ptlrpcd_add_req(req);
357 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.
 *
 * Asserts that the obdo carries OBD_MD_FLGROUP and that its object
 * sequence satisfies fid_seq_is_echo(). After the reply arrives the obdo
 * is refreshed from the reply body, o_blksize is set from cli_brw_size()
 * with OBD_MD_FLBLKSZ flagged, and the reply transno is logged. */
362 static int osc_create(const struct lu_env *env, struct obd_export *exp,
365 struct ptlrpc_request *req;
366 struct ost_body *body;
371 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
372 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
376 GOTO(out, rc = -ENOMEM);
378 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
380 ptlrpc_request_free(req);
384 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
387 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
389 ptlrpc_request_set_replen(req);
391 rc = ptlrpc_queue_wait(req);
395 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
397 GOTO(out_req, rc = -EPROTO);
/* This logs the flags of the caller obdo before the reply overwrites it. */
399 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
400 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
402 oa->o_blksize = cli_brw_size(exp->exp_obd);
403 oa->o_valid |= OBD_MD_FLBLKSZ;
405 CDEBUG(D_HA, "transno: %lld\n",
406 lustre_msg_get_transno(req->rq_repmsg));
408 ptlrpc_req_finished(req);
/* Send an asynchronous OST_PUNCH request for @oa.
 *
 * The request is directed to OST_IO_PORTAL and reuses
 * osc_setattr_interpret() as its reply interpreter, with @upcall and
 * @cookie stored as async arguments. It is queued on ptlrpcd when @rqset
 * is PTLRPCD_SET, or on @rqset. */
413 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
414 obd_enqueue_update_f upcall, void *cookie,
415 struct ptlrpc_request_set *rqset)
417 struct ptlrpc_request *req;
418 struct osc_setattr_args *sa;
419 struct ost_body *body;
423 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
427 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
429 ptlrpc_request_free(req);
432 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
433 ptlrpc_at_set_req_timeout(req);
435 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
437 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
439 ptlrpc_request_set_replen(req);
441 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the argument struct fits in rq_async_args. */
442 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
443 sa = ptlrpc_req_async_args(req);
445 sa->sa_upcall = upcall;
446 sa->sa_cookie = cookie;
447 if (rqset == PTLRPCD_SET)
448 ptlrpcd_add_req(req);
450 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter installed by osc_sync_base().
 *
 * Copies the reply obdo into *fa->fa_oa. With the object attribute lock
 * held, a reply carrying OBD_MD_FLBLOCKS has its o_blocks stored in
 * attr->cat_blocks, and cl_object_attr_update() is called with the
 * accumulated valid mask. Finally @rc is passed to fa->fa_upcall with
 * fa->fa_cookie. A reply without a body is logged and yields -EPROTO. */
455 static int osc_sync_interpret(const struct lu_env *env,
456 struct ptlrpc_request *req,
459 struct osc_fsync_args *fa = arg;
460 struct ost_body *body;
461 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
462 unsigned long valid = 0;
463 struct cl_object *obj;
469 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
471 CERROR("can't unpack ost_body\n");
472 GOTO(out, rc = -EPROTO);
475 *fa->fa_oa = body->oa;
476 obj = osc2cl(fa->fa_obj);
478 /* Update osc object's blocks attribute */
479 cl_object_attr_lock(obj);
480 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
481 attr->cat_blocks = body->oa.o_blocks;
486 cl_object_attr_update(env, obj, attr, valid);
487 cl_object_attr_unlock(obj);
490 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Send an asynchronous OST_SYNC request for @obj.
 *
 * The obdo @oa is packed into the request body; as the existing comment
 * notes, its size and blocks fields are overloaded with the start and end
 * of the range. osc_sync_interpret() is installed as reply interpreter
 * with @upcall and @cookie as async arguments, and the request is queued
 * on ptlrpcd when @rqset is PTLRPCD_SET, or on @rqset. */
494 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
495 obd_enqueue_update_f upcall, void *cookie,
496 struct ptlrpc_request_set *rqset)
498 struct obd_export *exp = osc_export(obj);
499 struct ptlrpc_request *req;
500 struct ost_body *body;
501 struct osc_fsync_args *fa;
505 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
509 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
511 ptlrpc_request_free(req);
515 /* overload the size and blocks fields in the oa with start/end */
516 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
518 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
520 ptlrpc_request_set_replen(req);
521 req->rq_interpret_reply = osc_sync_interpret;
/* Compile-time check that the argument struct fits in rq_async_args. */
523 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
524 fa = ptlrpc_req_async_args(req);
527 fa->fa_upcall = upcall;
528 fa->fa_cookie = cookie;
530 if (rqset == PTLRPCD_SET)
531 ptlrpcd_add_req(req);
533 ptlrpc_set_add_req(rqset, req);
538 /* Find and locally cancel locks matched by @mode in the resource named by
539 * the object id of @oa. Found locks are added to the @cancels list. Returns
540 * the number of locks added to the @cancels list. */
541 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
542 struct list_head *cancels,
543 enum ldlm_mode mode, __u64 lock_flags)
545 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
546 struct ldlm_res_id res_id;
547 struct ldlm_resource *res;
551 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
552 * export) but disabled through procfs (flag in NS).
554 * This distinguishes from a case when ELC is not supported originally,
555 * when we still want to cancel locks in advance and just cancel them
556 * locally, without sending any RPC. */
557 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the resource name from the object id and look the resource up. */
560 ostid_build_res_name(&oa->o_oi, &res_id);
561 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
565 LDLM_RESOURCE_ADDREF(res);
566 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
567 lock_flags, 0, NULL);
568 LDLM_RESOURCE_DELREF(res);
569 ldlm_resource_putref(res);
/* Reply interpreter for destroy requests: decrements the in-flight destroy
 * counter of the client and wakes waiters on cl_destroy_waitq. */
573 static int osc_destroy_interpret(const struct lu_env *env,
574 struct ptlrpc_request *req, void *data,
577 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
579 atomic_dec(&cli->cl_destroy_in_flight);
580 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve a slot for a destroy RPC.
 *
 * Increments cl_destroy_in_flight; the request can be sent while the new
 * value does not exceed cl_max_rpcs_in_flight. Otherwise the increment is
 * undone, and if the counter has meanwhile dropped below the limit the
 * waiters on cl_destroy_waitq are woken so one of them can retry.
 * NOTE(review): the return statements are not visible in this listing. */
584 static int osc_can_send_destroy(struct client_obd *cli)
586 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
587 cli->cl_max_rpcs_in_flight) {
588 /* The destroy request can be sent */
591 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
592 cli->cl_max_rpcs_in_flight) {
594 * The counter has been modified between the two atomic
597 wake_up(&cli->cl_destroy_waitq);
/* Send an OST_DESTROY request for @oa without waiting for the reply.
 *
 * Unused LCK_PW locks on the object are first cancelled locally with
 * LDLM_FL_DISCARD_DATA and collected in a list that is passed on when the
 * request is prepared through ldlm_prep_elc_req(). If the request cannot
 * be allocated the collected locks are released. Before queueing on
 * ptlrpcd the function waits, if needed, until osc_can_send_destroy()
 * grants a slot. osc_destroy_interpret() releases the slot on reply. */
602 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
605 struct client_obd *cli = &exp->exp_obd->u.cli;
606 struct ptlrpc_request *req;
607 struct ost_body *body;
608 struct list_head cancels = LIST_HEAD_INIT(cancels);
613 CDEBUG(D_INFO, "oa NULL\n");
617 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
618 LDLM_FL_DISCARD_DATA);
620 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
622 ldlm_lock_list_put(&cancels, l_bl_ast, count);
626 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
629 ptlrpc_request_free(req);
633 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
634 ptlrpc_at_set_req_timeout(req);
636 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
638 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
640 ptlrpc_request_set_replen(req);
642 req->rq_interpret_reply = osc_destroy_interpret;
643 if (!osc_can_send_destroy(cli)) {
644 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
647 * Wait until the number of on-going destroy RPCs drops
648 * under max_rpc_in_flight
650 l_wait_event_exclusive(cli->cl_destroy_waitq,
651 osc_can_send_destroy(cli), &lwi);
654 /* Do not wait for response */
655 ptlrpcd_add_req(req);
/* Fill @oa with the cache and grant state of this client.
 *
 * With cl_loi_list_lock held it sets o_dirty (cl_dirty_grant when the
 * import has GRANT_PARAM, otherwise the dirty page count in bytes),
 * o_undirty, o_grant (available plus reserved grant) and o_dropped (lost
 * grant, which is then reset to 0). Inconsistent dirty accounting is
 * reported with CERROR. In the normal case o_undirty is the larger of
 * cl_dirty_max_pages and max_pages_per_rpc * (max_rpcs_in_flight + 1),
 * in bytes, plus an extent tax per extent when GRANT_PARAM is set.
 * The function asserts that OBD_MD_FLBLOCKS and OBD_MD_FLGRANT are not
 * already set in o_valid. */
659 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
662 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
664 LASSERT(!(oa->o_valid & bits));
667 spin_lock(&cli->cl_loi_list_lock);
668 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
669 oa->o_dirty = cli->cl_dirty_grant;
671 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
672 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
673 cli->cl_dirty_max_pages)) {
674 CERROR("dirty %lu - %lu > dirty_max %lu\n",
675 cli->cl_dirty_pages, cli->cl_dirty_transit,
676 cli->cl_dirty_max_pages);
678 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
679 atomic_long_read(&obd_dirty_transit_pages) >
680 (long)(obd_max_dirty_pages + 1))) {
681 /* The atomic_read() and the atomic_inc() are
682 * not covered by a lock thus they may safely race and trip
683 * this CERROR() unless we add in a small fudge factor (+1). */
684 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
685 cli_name(cli), atomic_long_read(&obd_dirty_pages),
686 atomic_long_read(&obd_dirty_transit_pages),
687 obd_max_dirty_pages);
689 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
691 CERROR("dirty %lu - dirty_max %lu too big???\n",
692 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
695 unsigned long nrpages;
697 nrpages = cli->cl_max_pages_per_rpc;
698 nrpages *= cli->cl_max_rpcs_in_flight + 1;
699 nrpages = max(nrpages, cli->cl_dirty_max_pages);
700 oa->o_undirty = nrpages << PAGE_SHIFT;
701 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
705 /* take extent tax into account when asking for more
707 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
708 cli->cl_max_extent_pages;
709 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
712 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
713 oa->o_dropped = cli->cl_lost_grant;
714 cli->cl_lost_grant = 0;
715 spin_unlock(&cli->cl_loi_list_lock);
716 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
717 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant shrink: cl_next_shrink_grant is set to
 * cfs_time_shift(cl_grant_shrink_interval) and the new value is logged. */
720 void osc_update_next_shrink(struct client_obd *cli)
722 cli->cl_next_shrink_grant =
723 cfs_time_shift(cli->cl_grant_shrink_interval);
724 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
725 cli->cl_next_shrink_grant);
/* Add @grant bytes to cl_avail_grant with cl_loi_list_lock held. */
728 static void __osc_update_grant(struct client_obd *cli, u64 grant)
730 spin_lock(&cli->cl_loi_list_lock);
731 cli->cl_avail_grant += grant;
732 spin_unlock(&cli->cl_loi_list_lock);
/* Credit the grant carried in a reply body: when the reply obdo has
 * OBD_MD_FLGRANT set, its o_grant is logged and added to the available
 * grant of the client. Replies without that flag are ignored. */
735 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
737 if (body->oa.o_valid & OBD_MD_FLGRANT) {
738 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
739 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() calls this function
 * before any definition visible in this part of the file. */
743 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
744 u32 keylen, void *key,
745 u32 vallen, void *val,
746 struct ptlrpc_request_set *set);
/* Reply interpreter for grant shrink requests.
 *
 * One path adds oa->o_grant back to the available grant of the client;
 * NOTE(review): presumably this is the failure path, the guarding
 * condition is not visible in this listing. The other path credits any
 * grant carried in the reply body through osc_update_grant(). */
748 static int osc_shrink_grant_interpret(const struct lu_env *env,
749 struct ptlrpc_request *req,
752 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
753 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
754 struct ost_body *body;
757 __osc_update_grant(cli, oa->o_grant);
761 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
763 osc_update_grant(cli, body);
/* Give back a quarter of the available grant through @oa.
 *
 * With cl_loi_list_lock held, a quarter of cl_avail_grant is moved into
 * oa->o_grant. OBD_MD_FLFLAGS is set in o_valid if it was not already,
 * OBD_FL_SHRINK_GRANT is set in o_flags, and the next shrink time is
 * rescheduled. */
769 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
771 spin_lock(&cli->cl_loi_list_lock);
772 oa->o_grant = cli->cl_avail_grant / 4;
773 cli->cl_avail_grant -= oa->o_grant;
774 spin_unlock(&cli->cl_loi_list_lock);
775 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
776 oa->o_valid |= OBD_MD_FLFLAGS;
779 oa->o_flags |= OBD_FL_SHRINK_GRANT;
780 osc_update_next_shrink(cli);
783 /* Shrink the current grant, either from some large amount to enough for a
784 * full set of in-flight RPCs, or if we have already shrunk to that limit
785 * then to enough for a single RPC. This avoids keeping more grant than
786 * needed, and avoids shrinking the grant piecemeal. */
/* The target starts as (cl_max_rpcs_in_flight + 1) RPCs worth of bytes.
 * If the available grant is already at or below that value, the target
 * becomes one RPC worth of bytes. The result of
 * osc_shrink_grant_to_target() is returned. */
787 static int osc_shrink_grant(struct client_obd *cli)
789 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
790 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
792 spin_lock(&cli->cl_loi_list_lock);
793 if (cli->cl_avail_grant <= target_bytes)
794 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
795 spin_unlock(&cli->cl_loi_list_lock);
797 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink the available grant of @cli down to @target_bytes.
 *
 * The target is raised to at least one RPC worth of bytes, and nothing is
 * done when the target is not below cl_avail_grant. Otherwise the cache
 * state is reported through osc_announce_cached(), the excess grant
 * (cl_avail_grant - target_bytes) is moved into body->oa.o_grant,
 * OBD_FL_SHRINK_GRANT is set and the body is sent with
 * osc_set_info_async() under KEY_GRANT_SHRINK.
 * NOTE(review): the final __osc_update_grant() call presumably restores
 * the grant when sending fails; its condition is not visible here. */
800 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
803 struct ost_body *body;
806 spin_lock(&cli->cl_loi_list_lock);
807 /* Don't shrink if we are already above or below the desired limit
808 * We don't want to shrink below a single RPC, as that will negatively
809 * impact block allocation and long-term performance. */
810 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
811 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
813 if (target_bytes >= cli->cl_avail_grant) {
814 spin_unlock(&cli->cl_loi_list_lock);
817 spin_unlock(&cli->cl_loi_list_lock);
823 osc_announce_cached(cli, &body->oa, 0);
825 spin_lock(&cli->cl_loi_list_lock);
826 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
827 cli->cl_avail_grant = target_bytes;
828 spin_unlock(&cli->cl_loi_list_lock);
829 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
830 body->oa.o_valid |= OBD_MD_FLFLAGS;
831 body->oa.o_flags = 0;
833 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
834 osc_update_next_shrink(cli);
836 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
837 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
838 sizeof(*body), body, NULL);
840 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether @client should shrink its grant now.
 *
 * Shrinking is only considered when the import connect flags include
 * OBD_CONNECT_GRANT_SHRINK and the current time is within 5 ticks of, or
 * past, cl_next_shrink_grant. At that point the import must be in
 * LUSTRE_IMP_FULL state and hold more grant than one RPC worth of bytes;
 * the other visible branch only reschedules the next shrink.
 * NOTE(review): the return statements are not visible in this listing. */
845 static int osc_should_shrink_grant(struct client_obd *client)
847 cfs_time_t time = cfs_time_current();
848 cfs_time_t next_shrink = client->cl_next_shrink_grant;
850 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
851 OBD_CONNECT_GRANT_SHRINK) == 0)
854 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
855 /* Get the current RPC size directly, instead of going via:
856 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
857 * Keep comment here so that it can be found by searching. */
858 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
860 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
861 client->cl_avail_grant > brw_size)
864 osc_update_next_shrink(client);
/* Timeout callback: walks every client linked on item->ti_obd_list through
 * cl_grant_shrink_list and shrinks the grant of each client for which
 * osc_should_shrink_grant() is true. */
869 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
871 struct client_obd *client;
873 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
874 if (osc_should_shrink_grant(client))
875 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking.
 *
 * Adds the client to the ptlrpc timeout list with
 * osc_grant_shrink_grant_cb() as callback and cl_grant_shrink_interval as
 * period. An error is logged with CERROR; on the other visible path the
 * registration is logged and the next shrink time is scheduled. */
880 static int osc_add_shrink_grant(struct client_obd *client)
884 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
886 osc_grant_shrink_grant_cb, NULL,
887 &client->cl_grant_shrink_list);
889 CERROR("add grant client %s error %d\n", cli_name(client), rc);
892 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
893 osc_update_next_shrink(client);
/* Unregister @client from periodic grant shrinking; returns the result of
 * ptlrpc_del_timeout_client().
 * NOTE(review): the remaining call arguments are cut off in this listing. */
897 static int osc_del_shrink_grant(struct client_obd *client)
899 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the grant accounting of @cli from the connect data @ocd.
 *
 * With cl_loi_list_lock held, cl_avail_grant is set to ocd_grant; unless
 * the import is in LUSTRE_IMP_EVICTED state the reserved grant and the
 * dirty amount (cl_dirty_grant with GRANT_PARAM, otherwise dirty pages in
 * bytes) are subtracted. With GRANT_PARAM the extent tax, chunk bits and
 * maximum extent size are taken from @ocd and cl_max_pages_per_rpc is
 * rounded up to a chunk multiple; without it fixed defaults are used.
 * The resulting values are logged, and the client is registered for grant
 * shrinking when OBD_CONNECT_GRANT_SHRINK is set and it is not yet on the
 * shrink list. */
903 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
906 * ocd_grant is the total grant amount we're expect to hold: if we've
907 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
908 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
911 * race is tolerable here: if we're evicted, but imp_state already
912 * left EVICTED state, then cl_dirty_pages must be 0 already.
914 spin_lock(&cli->cl_loi_list_lock);
915 cli->cl_avail_grant = ocd->ocd_grant;
916 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
917 cli->cl_avail_grant -= cli->cl_reserved_grant;
918 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
919 cli->cl_avail_grant -= cli->cl_dirty_grant;
921 cli->cl_avail_grant -=
922 cli->cl_dirty_pages << PAGE_SHIFT;
925 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
929 /* overhead for each extent insertion */
930 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
931 /* determine the appropriate chunk size used by osc_extent. */
932 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
933 ocd->ocd_grant_blkbits);
934 /* max_pages_per_rpc must be chunk aligned */
935 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
936 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
937 ~chunk_mask) & chunk_mask;
938 /* determine maximum extent size, in #pages */
939 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
940 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
941 if (cli->cl_max_extent_pages == 0)
942 cli->cl_max_extent_pages = 1;
944 cli->cl_grant_extent_tax = 0;
945 cli->cl_chunkbits = PAGE_SHIFT;
946 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
948 spin_unlock(&cli->cl_loi_list_lock);
950 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
951 "chunk bits: %d cl_max_extent_pages: %d\n",
953 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
954 cli->cl_max_extent_pages);
956 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
957 list_empty(&cli->cl_grant_shrink_list))
958 osc_add_shrink_grant(cli);
/* Zero-fill the part of a page array that a short read did not cover.
 *
 * @nob_read is the number of bytes actually read, @page_count the number
 * of entries in @pga. Pages fully covered by @nob_read are skipped; the
 * page in which the data ends has its tail cleared, and every remaining
 * page is cleared over its whole count. */
961 /* We assume that the reason this OSC got a short read is because it read
962 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
963 * via the LOV, and it _knows_ it's reading inside the file, it's just that
964 * this stripe never got written at or beyond this stripe offset yet. */
965 static void handle_short_read(int nob_read, size_t page_count,
966 struct brw_page **pga)
971 /* skip bytes read OK */
972 while (nob_read > 0) {
973 LASSERT (page_count > 0);
975 if (pga[i]->count > nob_read) {
976 /* EOF inside this page */
977 ptr = kmap(pga[i]->pg) +
978 (pga[i]->off & ~PAGE_MASK);
979 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
986 nob_read -= pga[i]->count;
991 /* zero remaining pages */
992 while (page_count-- > 0) {
993 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
994 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes of a BRW write reply.
 *
 * Fetches the RC vector from the reply; a missing or short vector is
 * logged. A negative entry is returned to the caller as the error, and a
 * non-zero positive entry is logged as invalid. Finally the number of
 * bytes transferred by the bulk is compared with @requested_nob and a
 * mismatch is logged with CERROR.
 * NOTE(review): the return values of the logged-only paths are not
 * visible in this listing. */
1000 static int check_write_rcs(struct ptlrpc_request *req,
1001 int requested_nob, int niocount,
1002 size_t page_count, struct brw_page **pga)
1007 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1008 sizeof(*remote_rcs) *
1010 if (remote_rcs == NULL) {
1011 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1015 /* return error if any niobuf was in error */
1016 for (i = 0; i < niocount; i++) {
1017 if ((int)remote_rcs[i] < 0)
1018 return(remote_rcs[i]);
1020 if (remote_rcs[i] != 0) {
1021 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1022 i, remote_rcs[i], req);
1027 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1028 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1029 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether @p2 can extend the remote niobuf that covers @p1.
 *
 * When the two flag words differ, a warning is printed if they differ in
 * any bit outside the set of flags known to be safe to combine. The final
 * visible test requires @p2 to start exactly where @p1 ends.
 * NOTE(review): the return for the differing-flags case is not visible in
 * this listing. */
1036 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1038 if (p1->flag != p2->flag) {
1039 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1040 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1041 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1043 /* warn if we try to combine flags that we don't know to be
1044 * safe to combine */
1045 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1046 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1047 "report this at https://jira.hpdd.intel.com/\n",
1048 p1->flag, p2->flag);
1053 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of the page array.
 *
 * @pg_count is the number of entries in @pga, @opc the RPC opcode
 * (OST_READ or OST_WRITE) and @cksum_type selects the hash algorithm via
 * cksum_obd2cfs(). Pages are fed to the hash in order, the last one
 * truncated to the remaining byte count. Two fault-injection hooks exist:
 * on read the first page is overwritten with a marker before hashing, on
 * write only the checksum result is affected, as the comments explain.
 * If the hash cannot be initialised the error is logged and
 * PTR_ERR(hdesc) is returned. */
1056 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1057 struct brw_page **pga, int opc,
1058 cksum_type_t cksum_type)
1062 struct cfs_crypto_hash_desc *hdesc;
1063 unsigned int bufsize;
1064 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1066 LASSERT(pg_count > 0);
1068 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1069 if (IS_ERR(hdesc)) {
1070 CERROR("Unable to initialize checksum hash %s\n",
1071 cfs_crypto_hash_name(cfs_alg));
1072 return PTR_ERR(hdesc);
1075 while (nob > 0 && pg_count > 0) {
1076 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1078 /* corrupt the data before we compute the checksum, to
1079 * simulate an OST->client data error */
1080 if (i == 0 && opc == OST_READ &&
1081 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1082 unsigned char *ptr = kmap(pga[i]->pg);
1083 int off = pga[i]->off & ~PAGE_MASK;
1085 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1088 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1089 pga[i]->off & ~PAGE_MASK,
1091 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1092 (int)(pga[i]->off & ~PAGE_MASK));
1094 nob -= pga[i]->count;
1099 bufsize = sizeof(cksum);
1100 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1102 /* For sending we only compute the wrong checksum instead
1103 * of corrupting the data so it is still correct on a redo */
1104 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read or write (BRW) request for @page_count pages.
 *
 * @cmd selects write (OBD_BRW_WRITE set) or read, @oa describes the
 * object, @pga is the page array and the new request is handed back
 * through @reqp. The function sizes the ioobj and niobuf buffers, packs
 * the request, creates the bulk descriptor, adds every page to it and
 * builds the remote niobufs, merging adjacent pages where
 * can_merge_pages() allows. It then announces cache and grant state,
 * optionally shrinks grant, sets up checksums when enabled and no bulk
 * security flavor is in use, and records the request parameters in the
 * async arguments. Two fault-injection checks at the top can force
 * -ENOMEM or -EINVAL. */
1111 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1112 u32 page_count, struct brw_page **pga,
1113 struct ptlrpc_request **reqp, int resend)
1115 struct ptlrpc_request *req;
1116 struct ptlrpc_bulk_desc *desc;
1117 struct ost_body *body;
1118 struct obd_ioobj *ioobj;
1119 struct niobuf_remote *niobuf;
1120 int niocount, i, requested_nob, opc, rc;
1121 struct osc_brw_async_args *aa;
1122 struct req_capsule *pill;
1123 struct brw_page *pg_prev;
1126 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1127 RETURN(-ENOMEM); /* Recoverable */
1128 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1129 RETURN(-EINVAL); /* Fatal */
/* Write requests come from a request pool, reads are allocated normally. */
1131 if ((cmd & OBD_BRW_WRITE) != 0) {
1133 req = ptlrpc_request_alloc_pool(cli->cl_import,
1135 &RQF_OST_BRW_WRITE);
1138 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* niocount starts at 1; each adjacent page pair is checked for merging. */
1143 for (niocount = i = 1; i < page_count; i++) {
1144 if (!can_merge_pages(pga[i - 1], pga[i]))
1148 pill = &req->rq_pill;
1149 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1151 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1152 niocount * sizeof(*niobuf));
1154 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1156 ptlrpc_request_free(req);
1159 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1160 ptlrpc_at_set_req_timeout(req);
1161 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1163 req->rq_no_retry_einprogress = 1;
1165 desc = ptlrpc_prep_bulk_imp(req, page_count,
1166 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1167 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1168 PTLRPC_BULK_PUT_SINK) |
1169 PTLRPC_BULK_BUF_KIOV,
1171 &ptlrpc_bulk_kiov_pin_ops);
1174 GOTO(out, rc = -ENOMEM);
1175 /* NB request now owns desc and will free it when it gets freed */
1177 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1178 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1179 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1180 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1182 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1184 obdo_to_ioobj(oa, ioobj);
1185 ioobj->ioo_bufcnt = niocount;
1186 /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1187 * bulks that might be sent for this request. The actual number is decided
1188 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1189 * "max - 1" for compatibility with old clients sending "0", and also so
1190 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1191 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1192 LASSERT(page_count > 0);
/* Add every page to the bulk descriptor and fill in the remote niobufs;
 * a page that merges with its predecessor extends the current niobuf. */
1194 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1195 struct brw_page *pg = pga[i];
1196 int poff = pg->off & ~PAGE_MASK;
1198 LASSERT(pg->count > 0);
1199 /* make sure there is no gap in the middle of page array */
1200 LASSERTF(page_count == 1 ||
1201 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1202 ergo(i > 0 && i < page_count - 1,
1203 poff == 0 && pg->count == PAGE_SIZE) &&
1204 ergo(i == page_count - 1, poff == 0)),
1205 "i: %d/%d pg: %p off: %llu, count: %u\n",
1206 i, page_count, pg, pg->off, pg->count);
/* Pages must be sorted by strictly increasing file offset. */
1207 LASSERTF(i == 0 || pg->off > pg_prev->off,
1208 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1209 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1211 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1212 pg_prev->pg, page_private(pg_prev->pg),
1213 pg_prev->pg->index, pg_prev->off);
/* All pages must agree with the first page on OBD_BRW_SRVLOCK. */
1214 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1215 (pg->flag & OBD_BRW_SRVLOCK));
1217 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1218 requested_nob += pg->count;
1220 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1222 niobuf->rnb_len += pg->count;
1224 niobuf->rnb_offset = pg->off;
1225 niobuf->rnb_len = pg->count;
1226 niobuf->rnb_flags = pg->flag;
/* Sanity check: the cursor must have advanced by exactly niocount. */
1231 LASSERTF((void *)(niobuf - niocount) ==
1232 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1233 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1234 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1236 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1238 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1239 body->oa.o_valid |= OBD_MD_FLFLAGS;
1240 body->oa.o_flags = 0;
1242 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1245 if (osc_should_shrink_grant(cli))
1246 osc_shrink_grant_local(cli, &body->oa);
1248 /* size[REQ_REC_OFF] still sizeof (*body) */
1249 if (opc == OST_WRITE) {
1250 if (cli->cl_checksum &&
1251 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1252 /* store cl_cksum_type in a local variable since
1253 * it can be changed via lprocfs */
1254 cksum_type_t cksum_type = cli->cl_cksum_type;
1256 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1257 oa->o_flags &= OBD_FL_LOCAL_MASK;
1258 body->oa.o_flags = 0;
1260 body->oa.o_flags |= cksum_type_pack(cksum_type);
1261 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1262 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1266 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1268 /* save this in 'oa', too, for later checking */
1269 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1270 oa->o_flags |= cksum_type_pack(cksum_type);
1272 /* clear out the checksum flag, in case this is a
1273 * resend but cl_checksum is no longer set. b=11238 */
1274 oa->o_valid &= ~OBD_MD_FLCKSUM;
1276 oa->o_cksum = body->oa.o_cksum;
1277 /* 1 RC per niobuf */
1278 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1279 sizeof(__u32) * niocount);
1281 if (cli->cl_checksum &&
1282 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1283 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1284 body->oa.o_flags = 0;
1285 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1286 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1289 ptlrpc_request_set_replen(req);
/* Compile-time check that the argument struct fits in rq_async_args. */
1291 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1292 aa = ptlrpc_req_async_args(req);
1294 aa->aa_requested_nob = requested_nob;
1295 aa->aa_nio_count = niocount;
1296 aa->aa_page_count = page_count;
1300 INIT_LIST_HEAD(&aa->aa_oaps);
/* Re-fetch the niobuf array to log the byte range covered by the RPC. */
1303 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1304 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1305 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1306 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1310 ptlrpc_req_finished(req);
/*
 * Compare the checksum the client computed for a bulk write with the one
 * returned by the server.
 *
 * On a match only a D_PAGE debug message is logged.  On a mismatch the
 * checksum type is unpacked with cksum_type_unpack(), the pages in \a pga
 * are checksummed again with osc_checksum_bulk(), and the three values
 * (original client, server, recomputed client) select a diagnostic message
 * that is printed together with the peer NID, FID, object id and extent.
 *
 * NOTE(review): the return statements are not visible in this excerpt; the
 * caller in osc_brw_fini_request() uses the result as a truth value, so
 * presumably 0 means "match" and non-zero means "mismatch" -- confirm.
 */
1314 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1315 __u32 client_cksum, __u32 server_cksum, int nob,
1316 size_t page_count, struct brw_page **pga,
1317 cksum_type_t client_cksum_type)
1321 cksum_type_t cksum_type;
1323 if (server_cksum == client_cksum) {
1324 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1328 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1330 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Pick the diagnostic: checksum type first, then the recomputed checksum
 * against the server value and against the original client value. */
1333 if (cksum_type != client_cksum_type)
1334 msg = "the server did not use the checksum type specified in "
1335 "the original request - likely a protocol problem";
1336 else if (new_cksum == server_cksum)
1337 msg = "changed on the client after we checksummed it - "
1338 "likely false positive due to mmap IO (bug 11742)";
1339 else if (new_cksum == client_cksum)
1340 msg = "changed in transit before arrival at OST";
1342 msg = "changed in transit AND doesn't match the original - "
1343 "likely false positive due to mmap IO (bug 11742)";
1345 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1346 " object "DOSTID" extent [%llu-%llu]\n",
1347 msg, libcfs_nid2str(peer->nid),
1348 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1349 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1350 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1351 POSTID(&oa->o_oi), pga[0]->off,
1352 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1353 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1354 "client csum now %x\n", client_cksum, client_cksum_type,
1355 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read/write (BRW) request.
 *
 * Visible steps: errors other than -EDQUOT are logged up front; the OST
 * body is unpacked from the reply; for writes carrying user/group quota
 * bits osc_quota_setdq() is called; osc_update_grant() is called; then
 *  - OST_WRITE: the bulk is unwrapped, the write checksum is verified with
 *    check_write_checksum() and the per-niobuf return codes are checked
 *    with check_write_rcs();
 *  - OST_READ: the bulk is unwrapped, the byte count in \a rc is checked
 *    against the requested and transferred sizes, short reads are handled
 *    and, when the reply carries OBD_MD_FLCKSUM, the data is checksummed
 *    and compared with the server value.
 * Near the end the reply obdo and aa->aa_oa are passed to
 * lustre_get_wire_obdo().
 *
 * NOTE(review): several return/goto lines are not visible in this excerpt,
 * so the exact error codes returned on each failure are not documented
 * here.
 */
1359 /* Note rc enters this function as number of bytes transferred */
1360 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1362 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1363 const lnet_process_id_t *peer =
1364 &req->rq_import->imp_connection->c_peer;
1365 struct client_obd *cli = aa->aa_cli;
1366 struct ost_body *body;
1367 u32 client_cksum = 0;
1370 if (rc < 0 && rc != -EDQUOT) {
1371 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1375 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1376 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1378 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1382 /* set/clear over quota flag for a uid/gid */
1383 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1384 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1385 unsigned int qid[LL_MAXQUOTAS] =
1386 {body->oa.o_uid, body->oa.o_gid};
1388 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1389 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1391 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1394 osc_update_grant(cli, body);
1399 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1400 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1402 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1404 CERROR("Unexpected +ve rc %d\n", rc);
1407 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1409 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1412 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1413 check_write_checksum(&body->oa, peer, client_cksum,
1414 body->oa.o_cksum, aa->aa_requested_nob,
1415 aa->aa_page_count, aa->aa_ppga,
1416 cksum_type_unpack(aa->aa_oa->o_flags)))
1419 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1420 aa->aa_page_count, aa->aa_ppga);
1424 /* The rest of this function executes only for OST_READs */
1426 /* if unwrap_bulk failed, return -EAGAIN to retry */
1427 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1429 GOTO(out, rc = -EAGAIN);
1431 if (rc > aa->aa_requested_nob) {
1432 CERROR("Unexpected rc %d (%d requested)\n", rc,
1433 aa->aa_requested_nob);
1437 if (rc != req->rq_bulk->bd_nob_transferred) {
1438 CERROR ("Unexpected rc %d (%d transferred)\n",
1439 rc, req->rq_bulk->bd_nob_transferred);
1443 if (rc < aa->aa_requested_nob)
1444 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1446 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1447 static int cksum_counter;
1448 u32 server_cksum = body->oa.o_cksum;
1451 cksum_type_t cksum_type;
1453 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1454 body->oa.o_flags : 0);
1455 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1456 aa->aa_ppga, OST_READ,
1459 if (peer->nid != req->rq_bulk->bd_sender) {
1461 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1464 if (server_cksum != client_cksum) {
1465 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1466 "%s%s%s inode "DFID" object "DOSTID
1467 " extent [%llu-%llu]\n",
1468 req->rq_import->imp_obd->obd_name,
1469 libcfs_nid2str(peer->nid),
1471 body->oa.o_valid & OBD_MD_FLFID ?
1472 body->oa.o_parent_seq : (__u64)0,
1473 body->oa.o_valid & OBD_MD_FLFID ?
1474 body->oa.o_parent_oid : 0,
1475 body->oa.o_valid & OBD_MD_FLFID ?
1476 body->oa.o_parent_ver : 0,
1477 POSTID(&body->oa.o_oi),
1478 aa->aa_ppga[0]->off,
1479 aa->aa_ppga[aa->aa_page_count-1]->off +
1480 aa->aa_ppga[aa->aa_page_count-1]->count -
1482 CERROR("client %x, server %x, cksum_type %x\n",
1483 client_cksum, server_cksum, cksum_type);
1485 aa->aa_oa->o_cksum = client_cksum;
1489 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1492 } else if (unlikely(client_cksum)) {
1493 static int cksum_missed;
/* (x & -x) == x holds only when x is zero or a power of two, so the
 * message below is printed only for those values of the counter. */
1496 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1497 CERROR("Checksum %u requested from %s but not sent\n",
1498 cksum_missed, libcfs_nid2str(peer->nid));
1504 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1505 aa->aa_oa, &body->oa);
/*
 * Build and queue a replacement for a BRW request that failed with a
 * recoverable error \a rc.
 *
 * A new request is prepared with osc_brw_prep_request() from the same
 * client, obdo and page array as the old one (last argument 1).  Each page
 * that references a request must reference \a request; if such a page is
 * marked interrupted the new request is dropped again.  Otherwise the new
 * request inherits the interpret/commit callbacks, the async args and the
 * import generation, gets its send time delayed by aa_resends seconds
 * (capped at its own timeout), takes over the page and extent lists, and
 * every page reference to the old request is switched to the new one
 * before it is handed to ptlrpcd_add_req().
 *
 * NOTE(review): return statements are not visible in this excerpt.
 */
1510 static int osc_brw_redo_request(struct ptlrpc_request *request,
1511 struct osc_brw_async_args *aa, int rc)
1513 struct ptlrpc_request *new_req;
1514 struct osc_brw_async_args *new_aa;
1515 struct osc_async_page *oap;
1518 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1519 "redo for recoverable error %d", rc);
1521 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1522 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1523 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1524 aa->aa_ppga, &new_req, 1);
1528 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1529 if (oap->oap_request != NULL) {
1530 LASSERTF(request == oap->oap_request,
1531 "request %p != oap_request %p\n",
1532 request, oap->oap_request);
1533 if (oap->oap_interrupted) {
1534 ptlrpc_req_finished(new_req);
1539 /* New request takes over pga and oaps from old request.
1540 * Note that copying a list_head doesn't work, need to move it... */
1542 new_req->rq_interpret_reply = request->rq_interpret_reply;
1543 new_req->rq_async_args = request->rq_async_args;
1544 new_req->rq_commit_cb = request->rq_commit_cb;
1545 /* cap resend delay to the current request timeout, this is similar to
1546 * what ptlrpc does (see after_reply()) */
1547 if (aa->aa_resends > new_req->rq_timeout)
1548 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1550 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1551 new_req->rq_generation_set = 1;
1552 new_req->rq_import_generation = request->rq_import_generation;
1554 new_aa = ptlrpc_req_async_args(new_req);
/* The struct copy above also copied the list heads; re-initialise them
 * and move the entries over properly. */
1556 INIT_LIST_HEAD(&new_aa->aa_oaps);
1557 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1558 INIT_LIST_HEAD(&new_aa->aa_exts);
1559 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1560 new_aa->aa_resends = aa->aa_resends;
/* Swap each page reference from the old request to the new one. */
1562 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1563 if (oap->oap_request) {
1564 ptlrpc_req_finished(oap->oap_request);
1565 oap->oap_request = ptlrpc_request_addref(new_req);
1569 /* XXX: This code will run into problem if we're going to support
1570 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1571 * and wait for all of them to be finished. We should inherit request
1572 * set from old request. */
1573 ptlrpcd_add_req(new_req);
1575 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Sort the \a num brw_page pointers in \a array in place.  Elements are
 * ordered by their ->off field: the inner loop shifts entries with a larger
 * ->off towards the end, giving ascending offset order.  The first loop
 * grows the stride through the sequence 1, 4, 13, ... until it reaches
 * \a num.
 */
1580 * ugh, we want disk allocation on the target to happen in offset order. we'll
1581 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1582 * fine for our small page arrays and doesn't require allocation. its an
1583 * insertion sort that swaps elements that are strides apart, shrinking the
1584 * stride down until its '1' and the array is sorted.
1586 static void sort_brw_pages(struct brw_page **array, int num)
1589 struct brw_page *tmp;
1593 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1598 for (i = stride ; i < num ; i++) {
1601 while (j >= stride && array[j - stride]->off > tmp->off) {
1602 array[j] = array[j - stride];
1607 } while (stride > 1);
/*
 * Free the array of \a count brw_page pointers at \a ppga.  Only the
 * pointer array itself (sizeof(*ppga) * count bytes) is released here;
 * \a ppga must not be NULL.
 */
1610 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1612 LASSERT(ppga != NULL);
1613 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply interpreter for BRW requests (installed as rq_interpret_reply by
 * osc_build_rpc()).
 *
 * Runs osc_brw_fini_request() on the reply.  For recoverable errors the
 * request is either not resent because the import generation changed,
 * resent through osc_brw_redo_request(), or reported as having too many
 * retries.  The visible completion code then updates the cl_object
 * attributes from aa->aa_oa (blocks, m/a/ctime and, for writes, size and
 * KMS), frees the obdo, finishes every extent on aa_exts with the final rc,
 * releases the page pointer array, decrements the read or write in-flight
 * counter under cl_loi_list_lock, wakes cache waiters and calls
 * osc_io_unplug().
 *
 * NOTE(review): the conditions guarding the attribute update and the
 * return statements are not visible in this excerpt.
 */
1616 static int brw_interpret(const struct lu_env *env,
1617 struct ptlrpc_request *req, void *data, int rc)
1619 struct osc_brw_async_args *aa = data;
1620 struct osc_extent *ext;
1621 struct osc_extent *tmp;
1622 struct client_obd *cli = aa->aa_cli;
1625 rc = osc_brw_fini_request(req, rc);
1626 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1627 /* When server return -EINPROGRESS, client should always retry
1628 * regardless of the number of times the bulk was resent already. */
1629 if (osc_recoverable_error(rc)) {
1630 if (req->rq_import_generation !=
1631 req->rq_import->imp_generation) {
1632 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1633 ""DOSTID", rc = %d.\n",
1634 req->rq_import->imp_obd->obd_name,
1635 POSTID(&aa->aa_oa->o_oi), rc);
1636 } else if (rc == -EINPROGRESS ||
1637 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1638 rc = osc_brw_redo_request(req, aa, rc);
1640 CERROR("%s: too many resent retries for object: "
1641 "%llu:%llu, rc = %d.\n",
1642 req->rq_import->imp_obd->obd_name,
1643 POSTID(&aa->aa_oa->o_oi), rc);
1648 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1653 struct obdo *oa = aa->aa_oa;
1654 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1655 unsigned long valid = 0;
1656 struct cl_object *obj;
1657 struct osc_async_page *last;
/* The object is looked up through the last page of the request. */
1659 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1660 obj = osc2cl(last->oap_obj);
1662 cl_object_attr_lock(obj);
1663 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1664 attr->cat_blocks = oa->o_blocks;
1665 valid |= CAT_BLOCKS;
1667 if (oa->o_valid & OBD_MD_FLMTIME) {
1668 attr->cat_mtime = oa->o_mtime;
1671 if (oa->o_valid & OBD_MD_FLATIME) {
1672 attr->cat_atime = oa->o_atime;
1675 if (oa->o_valid & OBD_MD_FLCTIME) {
1676 attr->cat_ctime = oa->o_ctime;
1680 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1681 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1682 loff_t last_off = last->oap_count + last->oap_obj_off +
1685 /* Change file size if this is an out of quota or
1686 * direct IO write and it extends the file size */
1687 if (loi->loi_lvb.lvb_size < last_off) {
1688 attr->cat_size = last_off;
1691 /* Extend KMS if it's not a lockless write */
1692 if (loi->loi_kms < last_off &&
1693 oap2osc_page(last)->ops_srvlock == 0) {
1694 attr->cat_kms = last_off;
1700 cl_object_attr_update(env, obj, attr, valid);
1701 cl_object_attr_unlock(obj);
1703 OBDO_FREE(aa->aa_oa);
1705 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1706 osc_inc_unstable_pages(req);
1708 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1709 list_del_init(&ext->oe_link);
1710 osc_extent_finish(env, ext, 1, rc);
1712 LASSERT(list_empty(&aa->aa_exts));
1713 LASSERT(list_empty(&aa->aa_oaps));
1715 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1716 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1718 spin_lock(&cli->cl_loi_list_lock);
1719 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1720 * is called so we know whether to go to sync BRWs or wait for more
1721 * RPCs to complete */
1722 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1723 cli->cl_w_in_flight--;
1725 cli->cl_r_in_flight--;
1726 osc_wake_cache_waiters(cli);
1727 spin_unlock(&cli->cl_loi_list_lock);
1729 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for BRW requests (installed as rq_commit_cb by
 * osc_build_rpc()).
 *
 * Under req->rq_lock: if rq_unstable is set it is cleared, the lock is
 * dropped and osc_dec_unstable_pages() is called.  The remaining visible
 * statements set rq_committed to 1 and drop the lock.
 *
 * NOTE(review): the line separating the two paths is not visible in this
 * excerpt; presumably rq_committed is set in the else branch -- confirm.
 */
1733 static void brw_commit(struct ptlrpc_request *req)
1735 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1736 * this called via the rq_commit_cb, I need to ensure
1737 * osc_dec_unstable_pages is still called. Otherwise unstable
1738 * pages may be leaked. */
1739 spin_lock(&req->rq_lock);
1740 if (likely(req->rq_unstable)) {
1741 req->rq_unstable = 0;
1742 spin_unlock(&req->rq_lock);
1744 osc_dec_unstable_pages(req);
1746 req->rq_committed = 1;
1747 spin_unlock(&req->rq_lock);
/*
 * Visible flow of osc_build_rpc():
 *  - accumulate the memalloc flag, grants and page count over all extents
 *    in \a ext_list (each must be in OES_RPC state);
 *  - allocate the brw_page pointer array and fill it from every page of
 *    every extent, collecting the pages on a private rpc_list and tracking
 *    the lowest starting and highest ending object offset;
 *  - fill the request attributes via cl_req_attr_set(), sort the pages by
 *    offset and build the request with osc_brw_prep_request();
 *  - install brw_commit()/brw_interpret(), refresh the timestamps in the
 *    request body, and move the page and extent lists into the async args;
 *  - account the RPC in the in-flight counters and histograms under
 *    cl_loi_list_lock and queue it with ptlrpcd_add_req().
 * On the failure path the page array is freed and every extent still on
 * \a ext_list is finished with the error code.
 *
 * NOTE(review): several declarations, conditions and the return statement
 * are not visible in this excerpt.
 */
1752 * Build an RPC by the list of extent @ext_list. The caller must ensure
1753 * that the total pages in this list are NOT over max pages per RPC.
1754 * Extents in the list must be in OES_RPC state.
1756 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1757 struct list_head *ext_list, int cmd)
1759 struct ptlrpc_request *req = NULL;
1760 struct osc_extent *ext;
1761 struct brw_page **pga = NULL;
1762 struct osc_brw_async_args *aa = NULL;
1763 struct obdo *oa = NULL;
1764 struct osc_async_page *oap;
1765 struct osc_object *obj = NULL;
1766 struct cl_req_attr *crattr = NULL;
1767 loff_t starting_offset = OBD_OBJECT_EOF;
1768 loff_t ending_offset = 0;
1772 bool soft_sync = false;
1773 bool interrupted = false;
1777 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1778 struct ost_body *body;
1780 LASSERT(!list_empty(ext_list));
1782 /* add pages into rpc_list to build BRW rpc */
1783 list_for_each_entry(ext, ext_list, oe_link) {
1784 LASSERT(ext->oe_state == OES_RPC);
1785 mem_tight |= ext->oe_memalloc;
1786 grant += ext->oe_grants;
1787 page_count += ext->oe_nr_pages;
1792 soft_sync = osc_over_unstable_soft_limit(cli);
1794 mpflag = cfs_memory_pressure_get_and_set();
1796 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1798 GOTO(out, rc = -ENOMEM);
1802 GOTO(out, rc = -ENOMEM);
1805 list_for_each_entry(ext, ext_list, oe_link) {
1806 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1808 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1810 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1811 pga[i] = &oap->oap_brw_page;
1812 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1815 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1816 if (starting_offset == OBD_OBJECT_EOF ||
1817 starting_offset > oap->oap_obj_off)
1818 starting_offset = oap->oap_obj_off;
1820 LASSERT(oap->oap_page_off == 0);
1821 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1822 ending_offset = oap->oap_obj_off +
1825 LASSERT(oap->oap_page_off + oap->oap_count ==
1827 if (oap->oap_interrupted)
1832 /* first page in the list */
1833 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1835 crattr = &osc_env_info(env)->oti_req_attr;
1836 memset(crattr, 0, sizeof(*crattr));
1837 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1838 crattr->cra_flags = ~0ULL;
1839 crattr->cra_page = oap2cl_page(oap);
1840 crattr->cra_oa = oa;
1841 cl_req_attr_set(env, osc2cl(obj), crattr);
1843 if (cmd == OBD_BRW_WRITE)
1844 oa->o_grant_used = grant;
1846 sort_brw_pages(pga, page_count);
1847 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1849 CERROR("prep_req failed: %d\n", rc);
1853 req->rq_commit_cb = brw_commit;
1854 req->rq_interpret_reply = brw_interpret;
1855 req->rq_memalloc = mem_tight != 0;
1856 oap->oap_request = ptlrpc_request_addref(req);
1857 if (interrupted && !req->rq_intr)
1858 ptlrpc_mark_interrupted(req);
1860 /* Need to update the timestamps after the request is built in case
1861 * we race with setattr (locally or in queue at OST). If OST gets
1862 * later setattr before earlier BRW (as determined by the request xid),
1863 * the OST will not use BRW timestamps. Sadly, there is no obvious
1864 * way to do this in a single call. bug 10150 */
1865 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1866 crattr->cra_oa = &body->oa;
1867 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1868 cl_req_attr_set(env, osc2cl(obj), crattr);
1869 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1871 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1872 aa = ptlrpc_req_async_args(req);
/* Hand the collected pages and the caller's extents over to the request. */
1873 INIT_LIST_HEAD(&aa->aa_oaps);
1874 list_splice_init(&rpc_list, &aa->aa_oaps);
1875 INIT_LIST_HEAD(&aa->aa_exts);
1876 list_splice_init(ext_list, &aa->aa_exts);
1878 spin_lock(&cli->cl_loi_list_lock);
/* From here on starting_offset is a page index rather than a byte offset. */
1879 starting_offset >>= PAGE_SHIFT;
1880 if (cmd == OBD_BRW_READ) {
1881 cli->cl_r_in_flight++;
1882 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1883 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1884 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1885 starting_offset + 1);
1887 cli->cl_w_in_flight++;
1888 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1889 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1890 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1891 starting_offset + 1);
1893 spin_unlock(&cli->cl_loi_list_lock);
1895 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1896 page_count, aa, cli->cl_r_in_flight,
1897 cli->cl_w_in_flight);
1898 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1900 ptlrpcd_add_req(req);
1906 cfs_memory_pressure_restore(mpflag);
1909 LASSERT(req == NULL);
1914 OBD_FREE(pga, sizeof(*pga) * page_count);
1915 /* this should happen rarely and is pretty bad, it makes the
1916 * pending list not follow the dirty order */
1917 while (!list_empty(ext_list)) {
1918 ext = list_entry(ext_list->next, struct osc_extent,
1920 list_del_init(&ext->oe_link);
1921 osc_extent_finish(env, ext, 0, rc);
/*
 * Associate \a data with \a lock as its l_ast_data.
 *
 * Under lock_res_and_lock(): if the lock has no l_ast_data yet, \a data is
 * stored; a second check then tests whether the lock now carries exactly
 * \a data.
 *
 * NOTE(review): the statement executed when that check holds and the
 * return statement are not visible in this excerpt.  Callers use the result
 * as a truth value, so presumably non-zero means the lock is associated
 * with \a data -- confirm.
 */
1927 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1931 LASSERT(lock != NULL);
1933 lock_res_and_lock(lock);
1935 if (lock->l_ast_data == NULL)
1936 lock->l_ast_data = data;
1937 if (lock->l_ast_data == data)
1940 unlock_res_and_lock(lock);
/*
 * OSC-level completion of a lock enqueue.
 *
 * For an intent enqueue (LDLM_FL_HAS_INTENT in *flags) that ended with
 * ELDLM_LOCK_ABORTED, the DLM reply is fetched, its lock_policy_res1 is
 * converted with ptlrpc_status_ntoh() and, when non-zero, replaces
 * \a errcode.  LDLM_FL_LVB_READY is set in *flags in the visible
 * intent-abort code and for ELDLM_OK.  The caller's \a upcall is then
 * invoked with (cookie, lockh, errcode), and when errcode is ELDLM_OK and
 * \a lockh is in use one reference on the lock is dropped with
 * ldlm_lock_decref().
 *
 * NOTE(review): the statement under the ELDLM_LOCK_MATCHED check and the
 * return statement are not visible in this excerpt.
 */
1945 static int osc_enqueue_fini(struct ptlrpc_request *req,
1946 osc_enqueue_upcall_f upcall, void *cookie,
1947 struct lustre_handle *lockh, enum ldlm_mode mode,
1948 __u64 *flags, int agl, int errcode)
1950 bool intent = *flags & LDLM_FL_HAS_INTENT;
1954 /* The request was created before ldlm_cli_enqueue call. */
1955 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1956 struct ldlm_reply *rep;
1958 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1959 LASSERT(rep != NULL);
1961 rep->lock_policy_res1 =
1962 ptlrpc_status_ntoh(rep->lock_policy_res1);
1963 if (rep->lock_policy_res1)
1964 errcode = rep->lock_policy_res1;
1966 *flags |= LDLM_FL_LVB_READY;
1967 } else if (errcode == ELDLM_OK) {
1968 *flags |= LDLM_FL_LVB_READY;
1971 /* Call the update callback. */
1972 rc = (*upcall)(cookie, lockh, errcode);
1974 /* release the reference taken in ldlm_cli_enqueue() */
1975 if (errcode == ELDLM_LOCK_MATCHED)
1977 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1978 ldlm_lock_decref(lockh, mode);
/*
 * Reply interpreter for asynchronous lock enqueues (installed as
 * rq_interpret_reply by osc_enqueue_base()).
 *
 * The lock is resolved from the handle saved in \a aa (asserted to exist)
 * and an extra reference is taken for the duration of the callback.  The
 * DLM side is completed with ldlm_cli_enqueue_fini(), then the OSC side
 * with osc_enqueue_fini(), after which the extra reference is dropped and
 * the lock obtained from ldlm_handle2lock() is put.
 *
 * NOTE(review): the condition selecting the code that points aa->oa_flags
 * at a local variable, and the return statement, are not visible in this
 * excerpt.
 */
1983 static int osc_enqueue_interpret(const struct lu_env *env,
1984 struct ptlrpc_request *req,
1985 struct osc_enqueue_args *aa, int rc)
1987 struct ldlm_lock *lock;
1988 struct lustre_handle *lockh = &aa->oa_lockh;
1989 enum ldlm_mode mode = aa->oa_mode;
1990 struct ost_lvb *lvb = aa->oa_lvb;
1991 __u32 lvb_len = sizeof(*lvb);
1996 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1998 lock = ldlm_handle2lock(lockh);
1999 LASSERTF(lock != NULL,
2000 "lockh %#llx, req %p, aa %p - client evicted?\n",
2001 lockh->cookie, req, aa);
2003 /* Take an additional reference so that a blocking AST that
2004 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2005 * to arrive after an upcall has been executed by
2006 * osc_enqueue_fini(). */
2007 ldlm_lock_addref(lockh, mode);
2009 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2010 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2012 /* Let CP AST to grant the lock first. */
2013 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2016 LASSERT(aa->oa_lvb == NULL);
2017 LASSERT(aa->oa_flags == NULL);
2018 aa->oa_flags = &flags;
2021 /* Complete obtaining the lock procedure. */
2022 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2023 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2025 /* Complete osc stuff. */
2026 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2027 aa->oa_flags, aa->oa_agl, rc);
2029 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2031 ldlm_lock_decref(lockh, mode);
2032 LDLM_LOCK_PUT(lock);
/*
 * Sentinel request-set pointer (value 1, never dereferenced in the visible
 * code).  osc_enqueue_base() compares its rqset argument against it and, on
 * a match, hands the request to ptlrpcd_add_req(); the other visible call
 * is ptlrpc_set_add_req(rqset, req).
 */
2036 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * Enqueue, or match an already-granted, DLM lock for \a res_id through
 * export \a exp.
 *
 * Visible flow: the requested extent is rounded out to page boundaries; an
 * existing lock is searched for with ldlm_lock_match(); if one is found and
 * osc_set_lock_data() accepts it, LDLM_FL_LVB_READY is set and \a upcall is
 * invoked with ELDLM_LOCK_MATCHED.  Otherwise ldlm_cli_enqueue() is called
 * (a request may first be allocated from RQF_LDLM_ENQUEUE_LVB).  In the
 * asynchronous code the arguments are saved in the request async args,
 * osc_enqueue_interpret() is installed as reply interpreter and the request
 * is given to ptlrpcd (rqset == PTLRPCD_SET) or added to \a rqset.  The
 * remaining visible code calls osc_enqueue_fini() directly and finishes
 * the request.
 *
 * NOTE(review): many conditions, labels and return statements are not
 * visible in this excerpt, so the exact guards on each step (for example
 * \a kms_valid, \a agl and the intent flag) are not documented here.
 */
2038 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2039 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2040 * other synchronous requests, however keeping some locks and trying to obtain
2041 * others may take a considerable amount of time in a case of ost failure; and
2042 * when other sync requests do not get released lock from a client, the client
2043 * is evicted from the cluster -- such scenarios make life difficult, so
2044 * release locks just after they are obtained. */
2045 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2046 __u64 *flags, union ldlm_policy_data *policy,
2047 struct ost_lvb *lvb, int kms_valid,
2048 osc_enqueue_upcall_f upcall, void *cookie,
2049 struct ldlm_enqueue_info *einfo,
2050 struct ptlrpc_request_set *rqset, int async, int agl)
2052 struct obd_device *obd = exp->exp_obd;
2053 struct lustre_handle lockh = { 0 };
2054 struct ptlrpc_request *req = NULL;
2055 int intent = *flags & LDLM_FL_HAS_INTENT;
2056 __u64 match_flags = *flags;
2057 enum ldlm_mode mode;
2061 /* Filesystem lock extents are extended to page boundaries so that
2062 * dealing with the page cache is a little smoother. */
2063 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2064 policy->l_extent.end |= ~PAGE_MASK;
2067 * kms is not valid when either object is completely fresh (so that no
2068 * locks are cached), or object was evicted. In the latter case cached
2069 * lock cannot be used, because it would prime inode state with
2070 * potentially stale LVB.
2075 /* Next, search for already existing extent locks that will cover us */
2076 /* If we're trying to read, we also search for an existing PW lock. The
2077 * VFS and page cache already protect us locally, so lots of readers/
2078 * writers can share a single PW lock.
2080 * There are problems with conversion deadlocks, so instead of
2081 * converting a read lock to a write lock, we'll just enqueue a new
2084 * At some point we should cancel the read lock instead of making them
2085 * send us a blocking callback, but there are problems with canceling
2086 * locks out from other users right now, too. */
2087 mode = einfo->ei_mode;
2088 if (einfo->ei_mode == LCK_PR)
2091 match_flags |= LDLM_FL_LVB_READY;
2093 match_flags |= LDLM_FL_BLOCK_GRANTED;
2094 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2095 einfo->ei_type, policy, mode, &lockh, 0);
2097 struct ldlm_lock *matched;
2099 if (*flags & LDLM_FL_TEST_LOCK)
2102 matched = ldlm_handle2lock(&lockh);
2104 /* AGL enqueues DLM locks speculatively. Therefore if
2105 * a DLM lock already exists, it will just inform the
2106 * caller to cancel the AGL process for this stripe. */
2107 ldlm_lock_decref(&lockh, mode);
2108 LDLM_LOCK_PUT(matched);
2110 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2111 *flags |= LDLM_FL_LVB_READY;
2113 /* We already have a lock, and it's referenced. */
2114 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2116 ldlm_lock_decref(&lockh, mode);
2117 LDLM_LOCK_PUT(matched);
2120 ldlm_lock_decref(&lockh, mode);
2121 LDLM_LOCK_PUT(matched);
2126 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2130 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2131 &RQF_LDLM_ENQUEUE_LVB);
2135 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2137 ptlrpc_request_free(req);
2141 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2143 ptlrpc_request_set_replen(req);
2146 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2147 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2149 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2150 sizeof(*lvb), LVB_T_OST, &lockh, async);
2153 struct osc_enqueue_args *aa;
2154 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2155 aa = ptlrpc_req_async_args(req);
2157 aa->oa_mode = einfo->ei_mode;
2158 aa->oa_type = einfo->ei_type;
2159 lustre_handle_copy(&aa->oa_lockh, &lockh);
2160 aa->oa_upcall = upcall;
2161 aa->oa_cookie = cookie;
2164 aa->oa_flags = flags;
2167 /* AGL is essentially to enqueue a DLM lock
2168 * in advance, so we don't care about the
2169 * result of AGL enqueue. */
2171 aa->oa_flags = NULL;
2174 req->rq_interpret_reply =
2175 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2176 if (rqset == PTLRPCD_SET)
2177 ptlrpcd_add_req(req);
2179 ptlrpc_set_add_req(rqset, req);
2180 } else if (intent) {
2181 ptlrpc_req_finished(req);
2186 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2189 ptlrpc_req_finished(req);
/*
 * Look for an existing lock on \a res_id that covers \a policy.
 *
 * The extent is first widened to page boundaries, then ldlm_lock_match() is
 * called with a copy of *flags.  Nothing further is done when no lock is
 * matched or when LDLM_FL_TEST_LOCK is set.  In the visible code that
 * follows, the matched lock is resolved from \a lockh and
 * osc_set_lock_data() is asked to associate \a data with it; if that
 * fails, the reference obtained by the match is dropped with
 * ldlm_lock_decref().
 *
 * NOTE(review): the initial value passed as the match mode, the guard
 * around the osc_set_lock_data() code and the return statements are not
 * visible in this excerpt.
 */
2194 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2195 enum ldlm_type type, union ldlm_policy_data *policy,
2196 enum ldlm_mode mode, __u64 *flags, void *data,
2197 struct lustre_handle *lockh, int unref)
2199 struct obd_device *obd = exp->exp_obd;
2200 __u64 lflags = *flags;
2204 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2207 /* Filesystem lock extents are extended to page boundaries so that
2208 * dealing with the page cache is a little smoother */
2209 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2210 policy->l_extent.end |= ~PAGE_MASK;
2212 /* Next, search for already existing extent locks that will cover us */
2213 /* If we're trying to read, we also search for an existing PW lock. The
2214 * VFS and page cache already protect us locally, so lots of readers/
2215 * writers can share a single PW lock. */
2219 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2220 res_id, type, policy, rc, lockh, unref);
2221 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2225 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2227 LASSERT(lock != NULL);
2228 if (!osc_set_lock_data(lock, data)) {
2229 ldlm_lock_decref(lockh, rc);
2232 LDLM_LOCK_PUT(lock);
/*
 * Reply interpreter for the asynchronous statfs request issued by
 * osc_statfs_async().
 *
 * -ENOTCONN and -EAGAIN are special-cased when the caller set
 * OBD_STATFS_NODELAY in aa->aa_oi->oi_flags.  Otherwise the obd_statfs
 * reply buffer is fetched from the request capsule, copied into
 * aa->aa_oi->oi_osfs, and the caller's oi_cb_up() callback is invoked with
 * the resulting rc.  The branch right after the buffer lookup bails out
 * with -EPROTO (presumably when the buffer is missing).
 *
 * NOTE(review): several conditions and the return statements are not
 * visible in this excerpt.
 */
2237 static int osc_statfs_interpret(const struct lu_env *env,
2238 struct ptlrpc_request *req,
2239 struct osc_async_args *aa, int rc)
2241 struct obd_statfs *msfs;
2245 /* The request has in fact never been sent
2246 * due to issues at a higher level (LOV).
2247 * Exit immediately since the caller is
2248 * aware of the problem and takes care
2249 * of the clean up */
2252 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2253 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2259 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2261 GOTO(out, rc = -EPROTO);
2264 *aa->aa_oi->oi_osfs = *msfs;
2266 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an OST_STATFS request without waiting for the reply.
 *
 * The request is allocated on the client import of the device behind
 * \a exp, packed, sent to OST_CREATE_PORTAL and added to \a rqset;
 * osc_statfs_interpret() is installed to handle the reply.  With
 * OBD_STATFS_NODELAY set in oinfo->oi_flags the request is marked
 * no-resend and no-delay.  \a max_age is not referenced in the visible
 * code.
 *
 * NOTE(review): error checks after the allocation/pack calls, the async
 * args initialisation and the return statements are not visible in this
 * excerpt.
 */
2270 static int osc_statfs_async(struct obd_export *exp,
2271 struct obd_info *oinfo, __u64 max_age,
2272 struct ptlrpc_request_set *rqset)
2274 struct obd_device *obd = class_exp2obd(exp);
2275 struct ptlrpc_request *req;
2276 struct osc_async_args *aa;
2280 /* We could possibly pass max_age in the request (as an absolute
2281 * timestamp or a "seconds.usec ago") so the target can avoid doing
2282 * extra calls into the filesystem if that isn't necessary (e.g.
2283 * during mount that would help a bit). Having relative timestamps
2284 * is not so great if request processing is slow, while absolute
2285 * timestamps are not ideal because they need time synchronization. */
2286 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2290 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2292 ptlrpc_request_free(req);
2295 ptlrpc_request_set_replen(req);
2296 req->rq_request_portal = OST_CREATE_PORTAL;
2297 ptlrpc_at_set_req_timeout(req);
2299 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2300 /* procfs requests must not wait on statfs, to avoid deadlock */
2301 req->rq_no_resend = 1;
2302 req->rq_no_delay = 1;
2305 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2306 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2307 aa = ptlrpc_req_async_args(req);
2310 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous statfs: send OST_STATFS and wait for the reply with
 * ptlrpc_queue_wait().
 *
 * A reference on the client import is taken while holding cl_sem for
 * reading and is dropped with class_import_put() right after the request
 * has been allocated on it.  The request goes to OST_CREATE_PORTAL and,
 * when OBD_STATFS_NODELAY is set in \a flags, is marked no-resend and
 * no-delay.  The obd_statfs reply buffer is then fetched from the capsule
 * and the request is finished.
 *
 * NOTE(review): the copy of the reply into \a osfs, the error checks and
 * the return statements are not visible in this excerpt; \a max_age is not
 * referenced in the visible code.
 */
2314 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2315 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2317 struct obd_device *obd = class_exp2obd(exp);
2318 struct obd_statfs *msfs;
2319 struct ptlrpc_request *req;
2320 struct obd_import *imp = NULL;
2324 /* Since the request might also come from lprocfs, we need to
2325 * sync this with client_disconnect_export (Bug 15684) */
2326 down_read(&obd->u.cli.cl_sem);
2327 if (obd->u.cli.cl_import)
2328 imp = class_import_get(obd->u.cli.cl_import);
2329 up_read(&obd->u.cli.cl_sem);
2333 /* We could possibly pass max_age in the request (as an absolute
2334 * timestamp or a "seconds.usec ago") so the target can avoid doing
2335 * extra calls into the filesystem if that isn't necessary (e.g.
2336 * during mount that would help a bit). Having relative timestamps
2337 * is not so great if request processing is slow, while absolute
2338 * timestamps are not ideal because they need time synchronization. */
2339 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2341 class_import_put(imp);
2346 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2348 ptlrpc_request_free(req);
2351 ptlrpc_request_set_replen(req);
2352 req->rq_request_portal = OST_CREATE_PORTAL;
2353 ptlrpc_at_set_req_timeout(req);
2355 if (flags & OBD_STATFS_NODELAY) {
2356 /* procfs requests must not wait on statfs, to avoid deadlock */
2357 req->rq_no_resend = 1;
2358 req->rq_no_delay = 1;
2361 rc = ptlrpc_queue_wait(req);
2365 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2367 GOTO(out, rc = -EPROTO);
2374 ptlrpc_req_finished(req);
/*
 * ioctl dispatcher for the OSC device behind \a exp.
 *
 * A module reference is held (try_module_get()/module_put()) around the
 * dispatch.  Visible commands:
 *   OBD_IOC_CLIENT_RECOVER -> ptlrpc_recover_import() with ioc_inlbuf1
 *   IOC_OSC_SET_ACTIVE     -> ptlrpc_set_import_active()
 *   OBD_IOC_PING_TARGET    -> ptlrpc_obd_ping()
 * The remaining visible branch logs "unrecognised ioctl" and yields
 * -ENOTTY.
 *
 * NOTE(review): the switch statement, its default label and the return
 * statements are not visible in this excerpt; \a len and \a uarg are not
 * referenced in the visible code.
 */
2378 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2379 void *karg, void __user *uarg)
2381 struct obd_device *obd = exp->exp_obd;
2382 struct obd_ioctl_data *data = karg;
2386 if (!try_module_get(THIS_MODULE)) {
2387 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2388 module_name(THIS_MODULE));
2392 case OBD_IOC_CLIENT_RECOVER:
2393 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2394 data->ioc_inlbuf1, 0);
2398 case IOC_OSC_SET_ACTIVE:
2399 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2402 case OBD_IOC_PING_TARGET:
2403 err = ptlrpc_obd_ping(obd);
2406 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2407 cmd, current_comm());
2408 GOTO(out, err = -ENOTTY);
2411 module_put(THIS_MODULE);
/**
 * Set a key/value pair on the OSC, locally or by sending it to the OST.
 *
 * Keys handled locally by the code shown here:
 * - KEY_CHECKSUM: \a vallen is checked against sizeof(int); cl_checksum is
 *   set to 1 when the int at \a val is non-zero, 0 otherwise.
 * - KEY_SPTLRPC_CONF: sptlrpc_conf_client_adapt() on the device.
 * - KEY_FLUSH_CTX: sptlrpc_import_flush_my_ctx() on the client import.
 * - KEY_CACHE_SET: \a val is stored as the client's cl_client_cache (asserted
 *   to happen only once), a cache reference is taken and the client is added
 *   to the cache's ccc_lru list under ccc_lru_lock.
 * - KEY_CACHE_LRU_SHRINK: osc_lru_shrink() is asked to free at most the
 *   smaller of half of cl_lru_in_list and the long stored at \a val.
 *
 * Every other key is packed into an OST_SET_INFO request: the key is copied
 * into RMF_SETINFO_KEY and \a val (vallen bytes) into the value field. A
 * KEY_GRANT_SHRINK request is allocated with RQF_OST_SET_GRANT_INFO and gets
 * osc_shrink_grant_interpret as its reply interpreter. Requests are queued
 * either on \a set (asserted non-NULL, followed by ptlrpc_check_set()) or
 * through ptlrpcd_add_req().
 *
 * \param[in] env	execution environment, passed to osc_lru_shrink()
 * \param[in] exp	export whose device and client import are used
 * \param[in] keylen	length of \a key in bytes
 * \param[in] key	key buffer
 * \param[in] vallen	length of \a val in bytes
 * \param[in,out] val	value buffer; interpretation depends on the key
 * \param[in] set	request set for keys other than KEY_GRANT_SHRINK
 */
2415 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2416 u32 keylen, void *key,
2417 u32 vallen, void *val,
2418 struct ptlrpc_request_set *set)
2420 struct ptlrpc_request *req;
2421 struct obd_device *obd = exp->exp_obd;
2422 struct obd_import *imp = class_exp2cliimp(exp);
2427 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2429 if (KEY_IS(KEY_CHECKSUM)) {
2430 if (vallen != sizeof(int))
/* normalise any non-zero value to 1 */
2432 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2436 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2437 sptlrpc_conf_client_adapt(obd);
2441 if (KEY_IS(KEY_FLUSH_CTX)) {
2442 sptlrpc_import_flush_my_ctx(imp);
2446 if (KEY_IS(KEY_CACHE_SET)) {
2447 struct client_obd *cli = &obd->u.cli;
2449 LASSERT(cli->cl_cache == NULL); /* only once */
2450 cli->cl_cache = (struct cl_client_cache *)val;
2451 cl_cache_incref(cli->cl_cache);
2452 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2454 /* add this osc into entity list */
2455 LASSERT(list_empty(&cli->cl_lru_osc));
2456 spin_lock(&cli->cl_cache->ccc_lru_lock);
2457 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2458 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2463 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2464 struct client_obd *cli = &obd->u.cli;
/* start from half of the pages currently on this client's LRU list */
2465 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2466 long target = *(long *)val;
2468 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* only KEY_GRANT_SHRINK may be issued without a request set */
2473 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2476 /* We pass all other commands directly to OST. Since nobody calls osc
2477 methods directly and everybody is supposed to go through LOV, we
2478 assume lov checked invalid values for us.
2479 The only recognised values so far are evict_by_nid and mds_conn.
2480 Even if something bad goes through, we'd get a -EINVAL from OST
2483 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2484 &RQF_OST_SET_GRANT_INFO :
2489 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2490 RCL_CLIENT, keylen);
2491 if (!KEY_IS(KEY_GRANT_SHRINK))
2492 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2493 RCL_CLIENT, vallen);
2494 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2496 ptlrpc_request_free(req);
2500 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2501 memcpy(tmp, key, keylen);
2502 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2505 memcpy(tmp, val, vallen);
2507 if (KEY_IS(KEY_GRANT_SHRINK)) {
2508 struct osc_grant_args *aa;
2511 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2512 aa = ptlrpc_req_async_args(req);
2515 ptlrpc_req_finished(req);
2518 *oa = ((struct ost_body *)val)->oa;
2520 req->rq_interpret_reply = osc_shrink_grant_interpret;
2523 ptlrpc_request_set_replen(req);
2524 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2525 LASSERT(set != NULL);
2526 ptlrpc_set_add_req(set, req);
2527 ptlrpc_check_set(NULL, set);
2529 ptlrpcd_add_req(req);
/**
 * Reconnect hook: fill in the grant this client asks the server for.
 *
 * Only acts when \a data is non-NULL and OBD_CONNECT_GRANT is set in its
 * connect flags. Under cl_loi_list_lock the requested grant is built from
 * cl_avail_grant + cl_reserved_grant plus the dirty amount (cl_dirty_grant
 * and/or cl_dirty_pages << PAGE_SHIFT, selected by OBD_CONNECT_GRANT_PARAM);
 * a zero total is replaced by 2 * cli_brw_size(obd). cl_lost_grant is
 * sampled for the debug message and then reset to 0.
 *
 * \param[in] env	not used by the code shown here
 * \param[in] exp	not used by the code shown here
 * \param[in] obd	OSC device whose client state is read
 * \param[in] cluuid	not used by the code shown here
 * \param[in,out] data	connect data; ocd_grant is written
 */
2535 static int osc_reconnect(const struct lu_env *env,
2536 struct obd_export *exp, struct obd_device *obd,
2537 struct obd_uuid *cluuid,
2538 struct obd_connect_data *data,
2541 struct client_obd *cli = &obd->u.cli;
2543 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* grant accounting fields are read and reset under the same lock */
2547 spin_lock(&cli->cl_loi_list_lock);
2548 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2549 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2550 grant += cli->cl_dirty_grant;
2552 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* GNU "?:" shorthand: keep grant unless it is 0 */
2553 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2554 lost_grant = cli->cl_lost_grant;
2555 cli->cl_lost_grant = 0;
2556 spin_unlock(&cli->cl_loi_list_lock);
2558 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2559 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2560 data->ocd_version, data->ocd_grant, lost_grant);
/**
 * Disconnect the export, then drop the client from grant shrinking.
 *
 * client_disconnect_export() runs first; osc_del_shrink_grant() is called
 * afterwards, and only when obd->u.cli.cl_import is NULL. The ordering is
 * deliberate, as explained in the body of the function.
 *
 * \param[in] exp	export to disconnect
 */
2566 static int osc_disconnect(struct obd_export *exp)
2568 struct obd_device *obd = class_exp2obd(exp);
2571 rc = client_disconnect_export(exp);
2573 * Initially we put del_shrink_grant before disconnect_export, but it
2574 * causes the following problem if setup (connect) and cleanup
2575 * (disconnect) are tangled together.
2576 * connect p1 disconnect p2
2577 * ptlrpc_connect_import
2578 * ............... class_manual_cleanup
2581 * ptlrpc_connect_interrupt
2583 * add this client to shrink list
2585 * Bang! pinger trigger the shrink.
2586 * So the osc should be disconnected from the shrink list, after we
2587 * are sure the import has been destroyed. BUG18662
2589 if (obd->u.cli.cl_import == NULL)
2590 osc_del_shrink_grant(&obd->u.cli);
/**
 * cfs_hash iteration callback applied to one LDLM resource.
 *
 * \a arg carries the lu_env. While walking the resource's granted list, the
 * first non-NULL l_ast_data found is taken as the osc_object and a cl_object
 * reference is held on it; ldlm_clear_cleaned() is called from the same
 * loop. The object is then handed to osc_object_invalidate() and the
 * reference is dropped with cl_object_put().
 * NOTE(review): whether osc can still be NULL when osc_object_invalidate()
 * is reached is not visible here -- confirm the call is guarded.
 *
 * \param[in] hs	hash table being iterated
 * \param[in] bd	bucket descriptor; not used by the code shown here
 * \param[in] hnode	hash node of the resource
 * \param[in] arg	struct lu_env pointer
 */
2594 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2595 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2597 struct lu_env *env = arg;
2598 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2599 struct ldlm_lock *lock;
2600 struct osc_object *osc = NULL;
2604 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* remember only the first object found and pin it */
2605 if (lock->l_ast_data != NULL && osc == NULL) {
2606 osc = lock->l_ast_data;
2607 cl_object_get(osc2cl(osc));
2610 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2611 * by the 2nd round of ldlm_namespace_clean() call in
2612 * osc_import_event(). */
2613 ldlm_clear_cleaned(lock);
2618 osc_object_invalidate(env, osc);
2619 cl_object_put(env, osc2cl(osc));
/**
 * React to a state change of the client import.
 *
 * Per event, the code shown here does the following:
 * - IMP_EVENT_DISCON: zero cl_avail_grant and cl_lost_grant under
 *   cl_loi_list_lock.
 * - IMP_EVENT_INACTIVE / ACTIVE / DEACTIVATE / ACTIVATE: forward the matching
 *   OBD_NOTIFY_* to the observer via obd_notify_observer().
 * - IMP_EVENT_INVALIDATE: clean the namespace with LDLM_FL_LOCAL_ONLY, unplug
 *   pending I/O, run osc_ldlm_resource_invalidate() over every resource in
 *   ns_rs_hash, then clean the namespace a second time.
 * - IMP_EVENT_OCD: initialise grant when OBD_CONNECT_GRANT was negotiated,
 *   switch the request portal to OST_REQUEST_PORTAL when
 *   OBD_CONNECT_REQPORTAL was negotiated, then notify the observer.
 * - any other value: log "Unknown import event".
 *
 * \param[in] obd	OSC device; asserted to be imp->imp_obd
 * \param[in] imp	import the event refers to
 * \param[in] event	event being reported
 */
2625 static int osc_import_event(struct obd_device *obd,
2626 struct obd_import *imp,
2627 enum obd_import_event event)
2629 struct client_obd *cli;
2633 LASSERT(imp->imp_obd == obd);
2636 case IMP_EVENT_DISCON: {
2638 spin_lock(&cli->cl_loi_list_lock);
2639 cli->cl_avail_grant = 0;
2640 cli->cl_lost_grant = 0;
2641 spin_unlock(&cli->cl_loi_list_lock);
2644 case IMP_EVENT_INACTIVE: {
2645 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2648 case IMP_EVENT_INVALIDATE: {
2649 struct ldlm_namespace *ns = obd->obd_namespace;
/* first cleanup round */
2653 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2655 env = cl_env_get(&refcheck);
2657 osc_io_unplug(env, &obd->u.cli, NULL);
2659 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2660 osc_ldlm_resource_invalidate,
2662 cl_env_put(env, &refcheck);
/* second round; osc_ldlm_resource_invalidate() re-armed the locks for it */
2664 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2669 case IMP_EVENT_ACTIVE: {
2670 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2673 case IMP_EVENT_OCD: {
2674 struct obd_connect_data *ocd = &imp->imp_connect_data;
2676 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2677 osc_init_grant(&obd->u.cli, ocd);
2680 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2681 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2683 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2686 case IMP_EVENT_DEACTIVATE: {
2687 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2690 case IMP_EVENT_ACTIVATE: {
2691 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2695 CERROR("Unknown import event %d\n", event);
/*
 * osc_cancel_weight() is registered with the namespace through
 * ns_register_cancel() in osc_setup(). The test it applies to a lock has
 * three parts: the resource type is LDLM_EXTENT, the granted mode equals the
 * requested mode, and osc_ldlm_weigh_ast() returns 0 for the lock.
 */
2702 * Determine whether the lock can be canceled before replaying the lock
2703 * during recovery, see bug16774 for detailed information.
2705 * \retval zero the lock can't be canceled
2706 * \retval other ok to cancel
2708 static int osc_cancel_weight(struct ldlm_lock *lock)
2711 * Cancel all unused and granted extent lock.
2713 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2714 lock->l_granted_mode == lock->l_req_mode &&
2715 osc_ldlm_weigh_ast(lock) == 0)
/**
 * ptlrpcd work callback for writeback: unplug pending I/O of a client.
 *
 * Installed by osc_setup() as the handler of cl_writeback_work.
 *
 * \param[in] env	execution environment passed on to osc_io_unplug()
 * \param[in] data	the struct client_obd the work item was created for
 */
2721 static int brw_queue_work(const struct lu_env *env, void *data)
2723 struct client_obd *cli = data;
2725 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2727 osc_io_unplug(env, cli, NULL);
/**
 * Set up an OSC device.
 *
 * Sequence shown here: take a ptlrpcd reference, run client_obd_setup(),
 * allocate the writeback (brw_queue_work) and LRU (lru_queue_work) ptlrpcd
 * work items, set up quota, register the procfs entries, top up the shared
 * request pool, register osc_cancel_weight with the namespace and link the
 * client onto osc_shrink_list. The tail of the function is the error unwind:
 * both work items are destroyed if set and client_obd_cleanup() is called.
 *
 * \param[in] obd	device being set up
 * \param[in] lcfg	configuration record passed to client_obd_setup()
 */
2731 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2733 struct client_obd *cli = &obd->u.cli;
2734 struct obd_type *type;
2742 rc = ptlrpcd_addref();
2746 rc = client_obd_setup(obd, lcfg);
2748 GOTO(out_ptlrpcd, rc);
/* work item that flushes dirty pages, see brw_queue_work() */
2750 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2751 if (IS_ERR(handler))
2752 GOTO(out_client_setup, rc = PTR_ERR(handler));
2753 cli->cl_writeback_work = handler;
/* second work item, driven by lru_queue_work */
2755 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2756 if (IS_ERR(handler))
2757 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2758 cli->cl_lru_work = handler;
2760 rc = osc_quota_setup(obd);
2762 GOTO(out_ptlrpcd_work, rc);
2764 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2766 #ifdef CONFIG_PROC_FS
2767 obd->obd_vars = lprocfs_osc_obd_vars;
2769 /* If this is true then both client (osc) and server (osp) are on the
2770 * same node. The osp layer if loaded first will register the osc proc
2771 * directory. In that case this obd_device will be attached its proc
2772 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2773 type = class_search_type(LUSTRE_OSP_NAME);
2774 if (type && type->typ_procsym) {
2775 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2777 obd->obd_vars, obd);
2778 if (IS_ERR(obd->obd_proc_entry)) {
2779 rc = PTR_ERR(obd->obd_proc_entry);
2780 CERROR("error %d setting up lprocfs for %s\n", rc,
2782 obd->obd_proc_entry = NULL;
2785 rc = lprocfs_obd_setup(obd);
2788 /* If the basic OSC proc tree construction succeeded then
2789 * lets do the rest. */
2791 lproc_osc_attach_seqstat(obd);
2792 sptlrpc_lprocfs_cliobd_attach(obd);
2793 ptlrpc_lprocfs_register_obd(obd);
2797 * We try to control the total number of requests with a upper limit
2798 * osc_reqpool_maxreqcount. There might be some race which will cause
2799 * over-limit allocation, but it is fine.
2801 req_count = atomic_read(&osc_pool_req_count);
2802 if (req_count < osc_reqpool_maxreqcount) {
/* ask for max_rpcs_in_flight + 2 requests, clamped to the room left */
2803 adding = cli->cl_max_rpcs_in_flight + 2;
2804 if (req_count + adding > osc_reqpool_maxreqcount)
2805 adding = osc_reqpool_maxreqcount - req_count;
/* account for what was actually added, which may differ from the request */
2807 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2808 atomic_add(added, &osc_pool_req_count);
2811 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2812 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* make this client visible on the global osc_shrink_list */
2814 spin_lock(&osc_shrink_lock);
2815 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2816 spin_unlock(&osc_shrink_lock);
/* error unwind: release whatever work items were already allocated */
2821 if (cli->cl_writeback_work != NULL) {
2822 ptlrpcd_destroy_work(cli->cl_writeback_work);
2823 cli->cl_writeback_work = NULL;
2825 if (cli->cl_lru_work != NULL) {
2826 ptlrpcd_destroy_work(cli->cl_lru_work);
2827 cli->cl_lru_work = NULL;
2830 client_obd_cleanup(obd);
/**
 * First-stage cleanup of an OSC device.
 *
 * After obd_zombie_barrier(), the writeback and LRU work items are destroyed
 * (if set) and their pointers cleared, the client import is cleaned up with
 * obd_cleanup_client_import(), and the procfs entries are removed.
 *
 * \param[in] obd	device being cleaned up
 */
2836 static int osc_precleanup(struct obd_device *obd)
2838 struct client_obd *cli = &obd->u.cli;
2842 * for echo client, export may be on zombie list, wait for
2843 * zombie thread to cull it, because cli.cl_import will be
2844 * cleared in client_disconnect_export():
2845 * class_export_destroy() -> obd_cleanup() ->
2846 * echo_device_free() -> echo_client_cleanup() ->
2847 * obd_disconnect() -> osc_disconnect() ->
2848 * client_disconnect_export()
2850 obd_zombie_barrier();
2851 if (cli->cl_writeback_work) {
2852 ptlrpcd_destroy_work(cli->cl_writeback_work);
2853 cli->cl_writeback_work = NULL;
2856 if (cli->cl_lru_work) {
2857 ptlrpcd_destroy_work(cli->cl_lru_work);
2858 cli->cl_lru_work = NULL;
2861 obd_cleanup_client_import(obd);
2862 ptlrpc_lprocfs_unregister_obd(obd);
2863 lprocfs_obd_cleanup(obd);
/**
 * Final cleanup of an OSC device.
 *
 * Unlinks the client from osc_shrink_list, detaches it from the shared client
 * cache when one is attached, frees the quota cache and finishes with
 * client_obd_cleanup().
 *
 * \param[in] obd	device being cleaned up
 */
2867 int osc_cleanup(struct obd_device *obd)
2869 struct client_obd *cli = &obd->u.cli;
/* undo the list_add_tail() done in osc_setup() */
2874 spin_lock(&osc_shrink_lock);
2875 list_del(&cli->cl_shrink_list);
2876 spin_unlock(&osc_shrink_lock);
/* a cache is attached only after a KEY_CACHE_SET in osc_set_info_async() */
2879 if (cli->cl_cache != NULL) {
2880 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2881 spin_lock(&cli->cl_cache->ccc_lru_lock);
2882 list_del_init(&cli->cl_lru_osc);
2883 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2884 cli->cl_lru_left = NULL;
/* drop the reference taken with cl_cache_incref() */
2885 cl_cache_decref(cli->cl_cache);
2886 cli->cl_cache = NULL;
2889 /* free memory of osc quota cache */
2890 osc_quota_cleanup(obd);
2892 rc = client_obd_cleanup(obd);
/**
 * Apply a configuration record to the OSC's proc parameters.
 *
 * Hands \a lcfg to class_process_proc_param() with the PARAM_OSC prefix and
 * the device's obd_vars table.
 *
 * \param[in] obd	device the parameters belong to
 * \param[in] lcfg	configuration record to process
 *
 * \retval 0		class_process_proc_param() returned zero or a
 *			positive value
 * \retval negative	value returned by class_process_proc_param()
 */
2898 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2900 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
/* positive results are folded into 0 */
2901 return rc > 0 ? 0: rc;
/**
 * obd_ops adapter around osc_process_config_base().
 *
 * \param[in] obd	device the configuration applies to
 * \param[in] len	not used
 * \param[in] buf	passed to osc_process_config_base() as its lustre_cfg
 */
2904 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2906 return osc_process_config_base(obd, buf);
/*
 * Method table of the OSC obd type; osc_init() passes it to
 * class_register_type(). Connection add/delete and connect use the generic
 * client_* helpers, the remaining entries point at OSC-specific functions.
 */
2909 static struct obd_ops osc_obd_ops = {
2910 .o_owner = THIS_MODULE,
2911 .o_setup = osc_setup,
2912 .o_precleanup = osc_precleanup,
2913 .o_cleanup = osc_cleanup,
2914 .o_add_conn = client_import_add_conn,
2915 .o_del_conn = client_import_del_conn,
2916 .o_connect = client_connect_import,
2917 .o_reconnect = osc_reconnect,
2918 .o_disconnect = osc_disconnect,
2919 .o_statfs = osc_statfs,
2920 .o_statfs_async = osc_statfs_async,
2921 .o_create = osc_create,
2922 .o_destroy = osc_destroy,
2923 .o_getattr = osc_getattr,
2924 .o_setattr = osc_setattr,
2925 .o_iocontrol = osc_iocontrol,
2926 .o_set_info_async = osc_set_info_async,
2927 .o_import_event = osc_import_event,
2928 .o_process_config = osc_process_config,
2929 .o_quotactl = osc_quotactl,
/* handle returned by set_shrinker() in osc_init(); released in osc_exit() */
2932 static struct shrinker *osc_cache_shrinker;
/* client_obd entries linked through cl_shrink_list (added in osc_setup()) */
2933 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
/* taken around every osc_shrink_list insertion and removal in this file */
2934 DEFINE_SPINLOCK(osc_shrink_lock);
2936 #ifndef HAVE_SHRINKER_COUNT
/*
 * Combined shrinker callback, built only when HAVE_SHRINKER_COUNT is not
 * defined. It fills a shrink_control from the shrinker arguments, runs
 * osc_cache_shrink_scan() (result ignored) and returns
 * osc_cache_shrink_count().
 * NOTE(review): a local NULL "shrinker" is declared only when neither
 * HAVE_SHRINKER_WANT_SHRINK_PTR nor HAVE_SHRINK_CONTROL is defined;
 * presumably SHRINKER_ARGS supplies it otherwise -- confirm.
 */
2937 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2939 struct shrink_control scv = {
2940 .nr_to_scan = shrink_param(sc, nr_to_scan),
2941 .gfp_mask = shrink_param(sc, gfp_mask)
2943 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2944 struct shrinker *shrinker = NULL;
2947 (void)osc_cache_shrink_scan(shrinker, &scv);
2949 return osc_cache_shrink_count(shrinker, &scv);
/**
 * Module initialisation.
 *
 * Sequence shown here: initialise the osc_caches slab caches, register the
 * OSC obd type (procfs registration is disabled when an OSP type with a
 * typ_procsym already exists), install the cache shrinker, validate
 * osc_reqpool_mem_max, derive osc_reqpool_maxreqcount and create the shared
 * request pool. The unwind at the end unregisters the type and frees the
 * slab caches.
 *
 * osc_reqpool_mem_max must be in the range 1..4095, otherwise -EINVAL is
 * used as the result.
 *
 * NOTE(review): the shrinker is installed before the osc_reqpool_mem_max
 * check, but the unwind shown here never calls remove_shrinker() (only
 * osc_exit() does). A failure after set_shrinker() looks like it leaves the
 * shrinker registered -- confirm and add the missing teardown.
 */
2953 static int __init osc_init(void)
2955 bool enable_proc = true;
2956 struct obd_type *type;
2957 unsigned int reqpool_size;
2958 unsigned int reqsize;
2960 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2961 osc_cache_shrink_count, osc_cache_shrink_scan);
2964 /* print an address of _any_ initialized kernel symbol from this
2965 * module, to allow debugging with gdb that doesn't support data
2966 * symbols from modules.*/
2967 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2969 rc = lu_kmem_init(osc_caches);
/* mirror of the check in osc_setup(): OSP may own the proc directory */
2973 type = class_search_type(LUSTRE_OSP_NAME);
2974 if (type != NULL && type->typ_procsym != NULL)
2975 enable_proc = false;
2977 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2978 LUSTRE_OSC_NAME, &osc_device_type);
2982 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2984 /* This is obviously too much memory, only prevent overflow here */
2985 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2986 GOTO(out_type, rc = -EINVAL);
/* osc_reqpool_mem_max is in MB; convert to bytes */
2988 reqpool_size = osc_reqpool_mem_max << 20;
/* double reqsize until it reaches OST_IO_MAXREQSIZE */
2991 while (reqsize < OST_IO_MAXREQSIZE)
2992 reqsize = reqsize << 1;
2995 * We don't enlarge the request count in OSC pool according to
2996 * cl_max_rpcs_in_flight. The allocation from the pool will only be
2997 * tried after normal allocation failed. So a small OSC pool won't
2998 * cause much performance degression in most of cases.
3000 osc_reqpool_maxreqcount = reqpool_size / reqsize;
/* the pool starts empty; osc_setup() adds requests per device */
3002 atomic_set(&osc_pool_req_count, 0);
3003 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3004 ptlrpc_add_rqs_to_pool);
3006 if (osc_rq_pool != NULL)
3010 class_unregister_type(LUSTRE_OSC_NAME);
3012 lu_kmem_fini(osc_caches);
/**
 * Module exit: remove the cache shrinker, unregister the OSC obd type, free
 * the osc_caches slab caches and release the shared request pool.
 */
3017 static void __exit osc_exit(void)
3019 remove_shrinker(osc_cache_shrinker);
3020 class_unregister_type(LUSTRE_OSC_NAME);
3021 lu_kmem_fini(osc_caches);
3022 ptlrpc_free_rq_pool(osc_rq_pool);
/* Module metadata, followed by the load/unload entry points. */
3025 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3026 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3027 MODULE_VERSION(LUSTRE_VERSION_STRING);
3028 MODULE_LICENSE("GPL");
3030 module_init(osc_init);
3031 module_exit(osc_exit);