4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lustre/lustre_user.h>
39 #include <lprocfs_status.h>
40 #include <lustre_debug.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <lustre_ha.h>
44 #include <lustre_ioctl.h>
45 #include <lustre_net.h>
46 #include <lustre_obdo.h>
47 #include <lustre_param.h>
49 #include <obd_cksum.h>
50 #include <obd_class.h>
52 #include "osc_cl_internal.h"
53 #include "osc_internal.h"
/* Global OSC request pool state: a shared ptlrpc request pool used for BRW
 * writes, its current request count, and the configured maximum.
 * NOTE(review): the listing below elides lines (gaps in the embedded
 * numbering), so some struct fields and declarations are not visible here. */
55 atomic_t osc_pool_req_count;
56 unsigned int osc_reqpool_maxreqcount;
57 struct ptlrpc_request_pool *osc_rq_pool;
59 /* max memory used for request pool, unit is MB */
60 static unsigned int osc_reqpool_mem_max = 5;
/* Read-only module parameter (0444): pool memory cap is set at load time. */
61 module_param(osc_reqpool_mem_max, uint, 0444);
/* Async-argument blocks stored in req->rq_async_args for the various RPC
 * interpret callbacks.  Each must fit in rq_async_args (see the CLASSERTs
 * at the usage sites). */
63 struct osc_brw_async_args {
69 struct brw_page **aa_ppga;
70 struct client_obd *aa_cli;
71 struct list_head aa_oaps;
72 struct list_head aa_exts;
/* Grant-shrink RPCs reuse the BRW async-args layout. */
75 #define osc_grant_args osc_brw_async_args
77 struct osc_setattr_args {
79 obd_enqueue_update_f sa_upcall;
83 struct osc_fsync_args {
84 struct osc_object *fa_obj;
86 obd_enqueue_update_f fa_upcall;
90 struct osc_ladvise_args {
92 obd_enqueue_update_f la_upcall;
96 struct osc_enqueue_args {
97 struct obd_export *oa_exp;
98 enum ldlm_type oa_type;
99 enum ldlm_mode oa_mode;
101 osc_enqueue_upcall_f oa_upcall;
103 struct ost_lvb *oa_lvb;
104 struct lustre_handle oa_lockh;
105 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in this file.
 * NOTE(review): brw_interpret's parameter list continues on an elided line. */
108 static void osc_release_ppga(struct brw_page **ppga, size_t count);
109 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the OST_BODY field of a prepared request, converting to the
 * on-wire obdo format negotiated in the import's connect data.
 * NOTE(review): lines are elided here (e.g. the LASSERT/braces between the
 * visible statements are not shown in this listing). */
112 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
114 struct ost_body *body;
116 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
119 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: allocate and pack the request, send it with
 * ptlrpc_queue_wait(), and copy the returned attributes back into the
 * caller's obdo.  Also fills in o_blksize from the client BRW size.
 * NOTE(review): error-handling branches (NULL checks, rc tests, GOTO labels)
 * fall on elided lines and are not visible in this listing. */
122 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
125 struct ptlrpc_request *req;
126 struct ost_body *body;
130 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
134 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
136 ptlrpc_request_free(req);
140 osc_pack_req_body(req, oa);
142 ptlrpc_request_set_replen(req);
144 rc = ptlrpc_queue_wait(req);
/* Reply unpacking: -EPROTO if the server reply lacks an ost_body. */
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 GOTO(out, rc = -EPROTO);
152 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
153 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Report the preferred client I/O size as the object block size. */
155 oa->o_blksize = cli_brw_size(exp->exp_obd);
156 oa->o_valid |= OBD_MD_FLBLKSZ;
160 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: mirror of osc_getattr() but sends the caller's
 * attributes and refreshes @oa from the reply.  Requires o_valid to carry
 * OBD_MD_FLGROUP so the target object group is identified.
 * NOTE(review): NULL/rc checks and GOTO targets lie on elided lines. */
165 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
168 struct ptlrpc_request *req;
169 struct ost_body *body;
173 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
179 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
181 ptlrpc_request_free(req);
185 osc_pack_req_body(req, oa);
187 ptlrpc_request_set_replen(req);
189 rc = ptlrpc_queue_wait(req);
193 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
195 GOTO(out, rc = -EPROTO);
197 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
201 ptlrpc_req_finished(req);
/* Reply-interpret callback for async setattr/punch RPCs: unpack the reply
 * body into sa->sa_oa, then invoke the caller-supplied upcall with the final
 * rc.  @sa points into req->rq_async_args (see osc_setattr_async). */
206 static int osc_setattr_interpret(const struct lu_env *env,
207 struct ptlrpc_request *req,
208 struct osc_setattr_args *sa, int rc)
210 struct ost_body *body;
216 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
218 GOTO(out, rc = -EPROTO);
220 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* Hand the result to the upper layer regardless of success/failure. */
223 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs the request and either fires it without
 * waiting (ptlrpcd_add_req on the "no rqset" path), or installs
 * osc_setattr_interpret plus the upcall/cookie and queues it via ptlrpcd or
 * the caller's request set.
 * NOTE(review): the branch structure (if/else around the two submission
 * paths) sits on elided lines; do not assume both ptlrpcd_add_req() calls
 * execute on one path. */
227 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
228 obd_enqueue_update_f upcall, void *cookie,
229 struct ptlrpc_request_set *rqset)
231 struct ptlrpc_request *req;
232 struct osc_setattr_args *sa;
237 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
241 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
243 ptlrpc_request_free(req);
247 osc_pack_req_body(req, oa);
249 ptlrpc_request_set_replen(req);
251 /* do mds to ost setattr asynchronously */
253 /* Do not wait for response. */
254 ptlrpcd_add_req(req);
256 req->rq_interpret_reply =
257 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* sa must fit in the request's embedded async-args storage. */
259 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
260 sa = ptlrpc_req_async_args(req);
262 sa->sa_upcall = upcall;
263 sa->sa_cookie = cookie;
265 if (rqset == PTLRPCD_SET)
266 ptlrpcd_add_req(req);
268 ptlrpc_set_add_req(rqset, req);
/* Reply-interpret callback for OST_LADVISE: copy the reply obdo back to the
 * caller's oa and invoke the upcall with the final rc. */
274 static int osc_ladvise_interpret(const struct lu_env *env,
275 struct ptlrpc_request *req,
278 struct osc_ladvise_args *la = arg;
279 struct ost_body *body;
285 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
287 GOTO(out, rc = -EPROTO);
289 *la->la_oa = body->oa;
291 rc = la->la_upcall(la->la_cookie, rc);
/* Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries from
 * @ladvise_hdr.  Variable-size RMF_OST_LADVISE field is sized before pack.
 * Goes to the OST I/O portal with adaptive timeouts.
 * NOTE(review): the if/else structure around the three submission paths
 * (fire-and-forget, PTLRPCD_SET, caller rqset) is on elided lines. */
296 * If rqset is NULL, do not wait for response. Upcall and cookie could also
297 * be NULL in this case
299 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
300 struct ladvise_hdr *ladvise_hdr,
301 obd_enqueue_update_f upcall, void *cookie,
302 struct ptlrpc_request_set *rqset)
304 struct ptlrpc_request *req;
305 struct ost_body *body;
306 struct osc_ladvise_args *la;
308 struct lu_ladvise *req_ladvise;
309 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
310 int num_advise = ladvise_hdr->lah_count;
311 struct ladvise_hdr *req_ladvise_hdr;
314 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* Size the client-side ladvise buffer for num_advise entries. */
318 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
319 num_advise * sizeof(*ladvise));
320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
322 ptlrpc_request_free(req);
325 req->rq_request_portal = OST_IO_PORTAL;
326 ptlrpc_at_set_req_timeout(req);
328 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
330 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
/* Copy the advice header and the advice array into the request. */
333 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
334 &RMF_OST_LADVISE_HDR);
335 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
337 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
338 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
339 ptlrpc_request_set_replen(req);
342 /* Do not wait for response. */
343 ptlrpcd_add_req(req);
347 req->rq_interpret_reply = osc_ladvise_interpret;
348 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
349 la = ptlrpc_req_async_args(req);
351 la->la_upcall = upcall;
352 la->la_cookie = cookie;
354 if (rqset == PTLRPCD_SET)
355 ptlrpcd_add_req(req);
357 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo objects here (see the
 * fid_seq_is_echo assertion); regular objects are precreated elsewhere.
 * Packs @oa, waits for the reply, and copies the created object's
 * attributes back, filling in the client BRW block size.
 * NOTE(review): NULL/rc checks and the out/out_req labels are elided. */
362 static int osc_create(const struct lu_env *env, struct obd_export *exp,
365 struct ptlrpc_request *req;
366 struct ost_body *body;
371 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
372 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
376 GOTO(out, rc = -ENOMEM);
378 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
380 ptlrpc_request_free(req);
384 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
387 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
389 ptlrpc_request_set_replen(req);
391 rc = ptlrpc_queue_wait(req);
395 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
397 GOTO(out_req, rc = -EPROTO);
399 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
400 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
402 oa->o_blksize = cli_brw_size(exp->exp_obd);
403 oa->o_valid |= OBD_MD_FLBLKSZ;
405 CDEBUG(D_HA, "transno: %lld\n",
406 lustre_msg_get_transno(req->rq_repmsg));
408 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch).  Packs @oa (o_size/o_blocks
 * carry the punch range per protocol), installs osc_setattr_interpret with
 * the caller's upcall/cookie, and queues via ptlrpcd or the given rqset.
 * Uses the OST I/O portal (bug 7198) with adaptive timeouts. */
413 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
414 obd_enqueue_update_f upcall, void *cookie,
415 struct ptlrpc_request_set *rqset)
417 struct ptlrpc_request *req;
418 struct osc_setattr_args *sa;
419 struct ost_body *body;
423 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
427 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
429 ptlrpc_request_free(req);
432 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
433 ptlrpc_at_set_req_timeout(req);
435 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
437 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
439 ptlrpc_request_set_replen(req);
/* Punch replies are handled by the shared setattr interpreter. */
441 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
442 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
443 sa = ptlrpc_req_async_args(req);
445 sa->sa_upcall = upcall;
446 sa->sa_cookie = cookie;
447 if (rqset == PTLRPCD_SET)
448 ptlrpcd_add_req(req);
450 ptlrpc_set_add_req(rqset, req);
/* Reply-interpret callback for OST_SYNC: copy the reply obdo to the caller,
 * push the server-reported blocks count into the osc object's cl attributes
 * under the attr lock, then invoke the upcall. */
455 static int osc_sync_interpret(const struct lu_env *env,
456 struct ptlrpc_request *req,
459 struct osc_fsync_args *fa = arg;
460 struct ost_body *body;
461 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
462 unsigned long valid = 0;
463 struct cl_object *obj;
469 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
471 CERROR("can't unpack ost_body\n");
472 GOTO(out, rc = -EPROTO);
475 *fa->fa_oa = body->oa;
476 obj = osc2cl(fa->fa_obj);
478 /* Update osc object's blocks attribute */
479 cl_object_attr_lock(obj);
480 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
481 attr->cat_blocks = body->oa.o_blocks;
/* NOTE(review): the line setting `valid` (e.g. CAT_BLOCKS) is elided;
 * cl_object_attr_update() is a no-op when valid stays 0. */
486 cl_object_attr_update(env, obj, attr, valid);
487 cl_object_attr_unlock(obj);
490 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  The oa size/blocks fields are overloaded
 * with the sync start/end range (see inline comment).  Installs
 * osc_sync_interpret and queues via ptlrpcd or the caller's rqset. */
494 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
495 obd_enqueue_update_f upcall, void *cookie,
496 struct ptlrpc_request_set *rqset)
498 struct obd_export *exp = osc_export(obj);
499 struct ptlrpc_request *req;
500 struct ost_body *body;
501 struct osc_fsync_args *fa;
505 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
509 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
511 ptlrpc_request_free(req);
515 /* overload the size and blocks fields in the oa with start/end */
516 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
518 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
520 ptlrpc_request_set_replen(req);
521 req->rq_interpret_reply = osc_sync_interpret;
523 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
524 fa = ptlrpc_req_async_args(req);
527 fa->fa_upcall = upcall;
528 fa->fa_cookie = cookie;
530 if (rqset == PTLRPCD_SET)
531 ptlrpcd_add_req(req);
533 ptlrpc_set_add_req(rqset, req);
538 /* Find and cancel locally locks matched by @mode in the resource found by
539 * @objid. Found locks are added into @cancel list. Returns the amount of
540 * locks added to @cancels list. */
541 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
542 struct list_head *cancels,
543 enum ldlm_mode mode, __u64 lock_flags)
545 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
546 struct ldlm_res_id res_id;
547 struct ldlm_resource *res;
551 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
552 * export) but disabled through procfs (flag in NS).
554 * This distinguishes from a case when ELC is not supported originally,
555 * when we still want to cancel locks in advance and just cancel them
556 * locally, without sending any RPC. */
557 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the LDLM resource name from the object id and collect matching
 * local locks onto @cancels under a resource reference. */
560 ostid_build_res_name(&oa->o_oi, &res_id);
561 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
565 LDLM_RESOURCE_ADDREF(res);
566 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
567 lock_flags, 0, NULL);
568 LDLM_RESOURCE_DELREF(res);
569 ldlm_resource_putref(res);
/* Destroy-reply callback: release one in-flight destroy slot and wake any
 * waiter blocked in osc_destroy(). */
573 static int osc_destroy_interpret(const struct lu_env *env,
574 struct ptlrpc_request *req, void *data,
577 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
579 atomic_dec(&cli->cl_destroy_in_flight);
580 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight destroy slot; on failure undo the increment
 * and re-wake waiters to close the inc/dec race window (see comment). */
584 static int osc_can_send_destroy(struct client_obd *cli)
586 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
587 cli->cl_max_rpcs_in_flight) {
588 /* The destroy request can be sent */
591 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
592 cli->cl_max_rpcs_in_flight) {
594 * The counter has been modified between the two atomic
597 wake_up(&cli->cl_destroy_waitq);
/* OST_DESTROY with early lock cancellation: cancel matching PW locks
 * locally (discarding dirty data), fold the cancels into the request via
 * ldlm_prep_elc_req, throttle against cl_max_rpcs_in_flight, then hand the
 * request to ptlrpcd without waiting for the reply. */
602 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
605 struct client_obd *cli = &exp->exp_obd->u.cli;
606 struct ptlrpc_request *req;
607 struct ost_body *body;
608 struct list_head cancels = LIST_HEAD_INIT(cancels);
613 CDEBUG(D_INFO, "oa NULL\n");
617 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
618 LDLM_FL_DISCARD_DATA);
620 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: drop the collected cancel list before returning. */
622 ldlm_lock_list_put(&cancels, l_bl_ast, count);
626 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
629 ptlrpc_request_free(req);
633 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
634 ptlrpc_at_set_req_timeout(req);
636 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
638 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
640 ptlrpc_request_set_replen(req);
642 req->rq_interpret_reply = osc_destroy_interpret;
643 if (!osc_can_send_destroy(cli)) {
644 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
647 * Wait until the number of on-going destroy RPCs drops
648 * under max_rpc_in_flight
650 l_wait_event_exclusive(cli->cl_destroy_waitq,
651 osc_can_send_destroy(cli), &lwi);
654 /* Do not wait for response */
655 ptlrpcd_add_req(req);
/* Fill the dirty/undirty/grant accounting fields of @oa that are piggy-
 * backed on BRW RPCs so the server can track client cache usage.  Performs
 * several sanity CERRORs on the dirty-page counters, computes how much more
 * dirty data we could use (o_undirty, including per-extent grant tax when
 * GRANT_PARAM is negotiated), and reports/resets lost grant.  All under
 * cl_loi_list_lock. */
659 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
662 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
664 LASSERT(!(oa->o_valid & bits));
667 spin_lock(&cli->cl_loi_list_lock);
668 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
669 oa->o_dirty = cli->cl_dirty_grant;
671 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
/* Consistency checks: dirty accounting should never exceed the limits. */
672 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
673 cli->cl_dirty_max_pages)) {
674 CERROR("dirty %lu - %lu > dirty_max %lu\n",
675 cli->cl_dirty_pages, cli->cl_dirty_transit,
676 cli->cl_dirty_max_pages);
678 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
679 atomic_long_read(&obd_dirty_transit_pages) >
680 (long)(obd_max_dirty_pages + 1))) {
681 /* The atomic_read() allowing the atomic_inc() are
682 * not covered by a lock thus they may safely race and trip
683 * this CERROR() unless we add in a small fudge factor (+1). */
684 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
685 cli_name(cli), atomic_long_read(&obd_dirty_pages),
686 atomic_long_read(&obd_dirty_transit_pages),
687 obd_max_dirty_pages);
689 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
691 CERROR("dirty %lu - dirty_max %lu too big???\n",
692 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* Normal path: ask for enough grant to keep max_rpcs_in_flight+1 RPCs
 * of max size busy (but at least dirty_max). */
695 unsigned long nrpages;
697 nrpages = cli->cl_max_pages_per_rpc;
698 nrpages *= cli->cl_max_rpcs_in_flight + 1;
699 nrpages = max(nrpages, cli->cl_dirty_max_pages);
700 oa->o_undirty = nrpages << PAGE_SHIFT;
701 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
705 /* take extent tax into account when asking for more
707 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
708 cli->cl_max_extent_pages;
709 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
712 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* Report and clear grant lost e.g. across eviction/reconnect. */
713 oa->o_dropped = cli->cl_lost_grant;
714 cli->cl_lost_grant = 0;
715 spin_unlock(&cli->cl_loi_list_lock);
716 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
717 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check one shrink interval from now. */
720 void osc_update_next_shrink(struct client_obd *cli)
722 cli->cl_next_shrink_grant =
723 cfs_time_shift(cli->cl_grant_shrink_interval);
724 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
725 cli->cl_next_shrink_grant);
/* Add @grant to the available-grant counter under cl_loi_list_lock. */
728 static void __osc_update_grant(struct client_obd *cli, u64 grant)
730 spin_lock(&cli->cl_loi_list_lock);
731 cli->cl_avail_grant += grant;
732 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
735 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
737 if (body->oa.o_valid & OBD_MD_FLGRANT) {
738 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
739 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: used by osc_shrink_grant_to_target() below. */
743 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
744 u32 keylen, void *key,
745 u32 vallen, void *val,
746 struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink set_info RPC: on failure give the
 * shrunk grant back locally; on success absorb whatever grant the server
 * returned in the reply body.
 * NOTE(review): the rc test separating the two paths is on elided lines. */
748 static int osc_shrink_grant_interpret(const struct lu_env *env,
749 struct ptlrpc_request *req,
752 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
753 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
754 struct ost_body *body;
757 __osc_update_grant(cli, oa->o_grant);
761 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
763 osc_update_grant(cli, body);
/* Donate a quarter of the available grant back to the server via the
 * outgoing oa, marking it with OBD_FL_SHRINK_GRANT. */
769 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
771 spin_lock(&cli->cl_loi_list_lock);
772 oa->o_grant = cli->cl_avail_grant / 4;
773 cli->cl_avail_grant -= oa->o_grant;
774 spin_unlock(&cli->cl_loi_list_lock);
775 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
776 oa->o_valid |= OBD_MD_FLFLAGS;
779 oa->o_flags |= OBD_FL_SHRINK_GRANT;
780 osc_update_next_shrink(cli);
783 /* Shrink the current grant, either from some large amount to enough for a
784 * full set of in-flight RPCs, or if we have already shrunk to that limit
785 * then to enough for a single RPC. This avoids keeping more grant than
786 * needed, and avoids shrinking the grant piecemeal. */
787 static int osc_shrink_grant(struct client_obd *cli)
789 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
790 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
792 spin_lock(&cli->cl_loi_list_lock);
/* Already at/below the full-pipeline target: fall back to one RPC. */
793 if (cli->cl_avail_grant <= target_bytes)
794 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
795 spin_unlock(&cli->cl_loi_list_lock);
797 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink cl_avail_grant down to @target_bytes (clamped to at least one full
 * RPC) and notify the server with a KEY_GRANT_SHRINK set_info RPC carrying
 * the returned amount in body->oa.o_grant.  On send failure the grant is
 * restored locally.
 * NOTE(review): allocation of @body and rc checks lie on elided lines. */
800 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
803 struct ost_body *body;
806 spin_lock(&cli->cl_loi_list_lock);
807 /* Don't shrink if we are already above or below the desired limit
808 * We don't want to shrink below a single RPC, as that will negatively
809 * impact block allocation and long-term performance. */
810 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
811 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
813 if (target_bytes >= cli->cl_avail_grant) {
814 spin_unlock(&cli->cl_loi_list_lock);
817 spin_unlock(&cli->cl_loi_list_lock);
823 osc_announce_cached(cli, &body->oa, 0);
825 spin_lock(&cli->cl_loi_list_lock);
/* The difference is what we give back to the server. */
826 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
827 cli->cl_avail_grant = target_bytes;
828 spin_unlock(&cli->cl_loi_list_lock);
829 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
830 body->oa.o_valid |= OBD_MD_FLFLAGS;
831 body->oa.o_flags = 0;
833 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
834 osc_update_next_shrink(cli);
836 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
837 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
838 sizeof(*body), body, NULL);
/* On failure, return the shrunk amount to cl_avail_grant. */
840 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline to have
 * (nearly) passed, a FULL import, and more available grant than one RPC. */
845 static int osc_should_shrink_grant(struct client_obd *client)
847 cfs_time_t time = cfs_time_current();
848 cfs_time_t next_shrink = client->cl_next_shrink_grant;
850 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
851 OBD_CONNECT_GRANT_SHRINK) == 0)
854 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
855 /* Get the current RPC size directly, instead of going via:
856 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
857 * Keep comment here so that it can be found by searching. */
858 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
860 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
861 client->cl_avail_grant > brw_size)
864 osc_update_next_shrink(client);
/* Periodic timeout callback: walk the registered clients and shrink any
 * whose grant is due. */
869 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
871 struct client_obd *client;
873 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
874 if (osc_should_shrink_grant(client))
875 osc_shrink_grant(client);
/* Register this client with the shared grant-shrink timeout list so
 * osc_grant_shrink_grant_cb() is invoked periodically for it. */
880 static int osc_add_shrink_grant(struct client_obd *client)
884 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
886 osc_grant_shrink_grant_cb, NULL,
887 &client->cl_grant_shrink_list);
889 CERROR("add grant client %s error %d\n", cli_name(client), rc);
892 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
893 osc_update_next_shrink(client);
/* Unregister from the grant-shrink timeout list. */
897 static int osc_del_shrink_grant(struct client_obd *client)
899 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize the client's grant state from the server's connect data:
 * seed cl_avail_grant from ocd_grant (adjusting for reserved/dirty unless
 * we were just evicted), derive chunk size, per-extent grant tax and the
 * maximum extent size when GRANT_PARAM is negotiated, and enroll in
 * periodic grant shrinking if the server supports it. */
903 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
906 * ocd_grant is the total grant amount we're expect to hold: if we've
907 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
908 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
911 * race is tolerable here: if we're evicted, but imp_state already
912 * left EVICTED state, then cl_dirty_pages must be 0 already.
914 spin_lock(&cli->cl_loi_list_lock);
915 cli->cl_avail_grant = ocd->ocd_grant;
916 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
917 cli->cl_avail_grant -= cli->cl_reserved_grant;
918 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
919 cli->cl_avail_grant -= cli->cl_dirty_grant;
921 cli->cl_avail_grant -=
922 cli->cl_dirty_pages << PAGE_SHIFT;
925 if (cli->cl_avail_grant < 0) {
926 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
927 cli_name(cli), cli->cl_avail_grant,
928 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT);
929 /* workaround for servers which do not have the patch from
931 cli->cl_avail_grant = ocd->ocd_grant;
934 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
938 /* overhead for each extent insertion */
939 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
940 /* determine the appropriate chunk size used by osc_extent. */
941 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
942 ocd->ocd_grant_blkbits);
943 /* max_pages_per_rpc must be chunk aligned */
944 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
945 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
946 ~chunk_mask) & chunk_mask;
947 /* determine maximum extent size, in #pages */
948 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
949 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
950 if (cli->cl_max_extent_pages == 0)
951 cli->cl_max_extent_pages = 1;
/* No GRANT_PARAM: use page-sized chunks and the legacy extent limit. */
953 cli->cl_grant_extent_tax = 0;
954 cli->cl_chunkbits = PAGE_SHIFT;
955 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
957 spin_unlock(&cli->cl_loi_list_lock);
959 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
960 "chunk bits: %d cl_max_extent_pages: %d\n",
962 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
963 cli->cl_max_extent_pages);
965 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
966 list_empty(&cli->cl_grant_shrink_list))
967 osc_add_shrink_grant(cli);
970 /* We assume that the reason this OSC got a short read is because it read
971 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
972 * via the LOV, and it _knows_ it's reading inside the file, it's just that
973 * this stripe never got written at or beyond this stripe offset yet. */
974 static void handle_short_read(int nob_read, size_t page_count,
975 struct brw_page **pga)
980 /* skip bytes read OK */
981 while (nob_read > 0) {
982 LASSERT (page_count > 0);
984 if (pga[i]->count > nob_read) {
985 /* EOF inside this page */
986 ptr = kmap(pga[i]->pg) +
987 (pga[i]->off & ~PAGE_MASK);
988 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
/* NOTE(review): the matching kunmap()/index advance lines are elided. */
995 nob_read -= pga[i]->count;
1000 /* zero remaining pages */
1001 while (page_count-- > 0) {
1002 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1003 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf RC vector of a BRW_WRITE reply: fail on a
 * missing/short vector, return the first negative per-niobuf rc, reject
 * unexpected nonzero values, and verify the bulk transferred exactly the
 * number of bytes requested. */
1009 static int check_write_rcs(struct ptlrpc_request *req,
1010 int requested_nob, int niocount,
1011 size_t page_count, struct brw_page **pga)
1016 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1017 sizeof(*remote_rcs) *
1019 if (remote_rcs == NULL) {
1020 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1024 /* return error if any niobuf was in error */
1025 for (i = 0; i < niocount; i++) {
1026 if ((int)remote_rcs[i] < 0)
1027 return(remote_rcs[i]);
1029 if (remote_rcs[i] != 0) {
1030 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1031 i, remote_rcs[i], req);
1036 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1037 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1038 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can be merged into one niobuf only when they are byte-
 * adjacent; flag differences outside the known-safe set are warned about.
 * NOTE(review): the `return 0` for unmergeable flags is on an elided line. */
1045 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1047 if (p1->flag != p2->flag) {
1048 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1049 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1050 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1052 /* warn if we try to combine flags that we don't know to be
1053 * safe to combine */
1054 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1055 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1056 "report this at https://jira.hpdd.intel.com/\n",
1057 p1->flag, p2->flag);
/* Adjacency test: p1 must end exactly where p2 begins. */
1062 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * cfs crypto hash selected by @cksum_type.  Contains two fault-injection
 * hooks: corrupt received data (OST_READ) before hashing, or return a
 * deliberately wrong checksum on send (OST_WRITE). */
1065 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1066 struct brw_page **pga, int opc,
1067 cksum_type_t cksum_type)
1071 struct cfs_crypto_hash_desc *hdesc;
1072 unsigned int bufsize;
1074 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1076 LASSERT(pg_count > 0);
1078 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1079 if (IS_ERR(hdesc)) {
1080 CERROR("Unable to initialize checksum hash %s\n",
1081 cfs_crypto_hash_name(cfs_alg));
1082 return PTR_ERR(hdesc);
1085 while (nob > 0 && pg_count > 0) {
1086 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1088 /* corrupt the data before we compute the checksum, to
1089 * simulate an OST->client data error */
1090 if (i == 0 && opc == OST_READ &&
1091 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1092 unsigned char *ptr = kmap(pga[i]->pg);
1093 int off = pga[i]->off & ~PAGE_MASK;
1095 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
/* Feed this page (at its intra-page offset) into the hash. */
1098 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1099 pga[i]->off & ~PAGE_MASK,
1101 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1102 (int)(pga[i]->off & ~PAGE_MASK));
1104 nob -= pga[i]->count;
1109 bufsize = sizeof(cksum);
1110 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1112 /* For sending we only compute the wrong checksum instead
1113 * of corrupting the data so it is still correct on a redo */
1114 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete OST_READ/OST_WRITE BRW request for @page_count pages:
 * allocate the request (from the shared pool for writes), size and pack
 * the ioobj/niobuf fields, prepare the bulk descriptor, merge adjacent
 * pages into niobufs, piggy-back grant/dirty accounting, and (for writes
 * with checksums enabled) compute and store the bulk checksum.  On success
 * *reqp owns the request; async args are stashed in rq_async_args.
 * NOTE(review): many rc/NULL checks, the `opc = ...` assignments, and some
 * closing braces fall on elided lines in this listing. */
1121 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1122 u32 page_count, struct brw_page **pga,
1123 struct ptlrpc_request **reqp, int resend)
1125 struct ptlrpc_request *req;
1126 struct ptlrpc_bulk_desc *desc;
1127 struct ost_body *body;
1128 struct obd_ioobj *ioobj;
1129 struct niobuf_remote *niobuf;
1130 int niocount, i, requested_nob, opc, rc;
1131 struct osc_brw_async_args *aa;
1132 struct req_capsule *pill;
1133 struct brw_page *pg_prev;
/* Fault-injection hooks for BRW request preparation. */
1136 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1137 RETURN(-ENOMEM); /* Recoverable */
1138 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1139 RETURN(-EINVAL); /* Fatal */
1141 if ((cmd & OBD_BRW_WRITE) != 0) {
/* Writes draw from the shared request pool to bound memory use. */
1143 req = ptlrpc_request_alloc_pool(cli->cl_import,
1145 &RQF_OST_BRW_WRITE);
1148 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count niobufs: one per run of mergeable (adjacent, same-flag) pages. */
1153 for (niocount = i = 1; i < page_count; i++) {
1154 if (!can_merge_pages(pga[i - 1], pga[i]))
1158 pill = &req->rq_pill;
1159 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1161 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1162 niocount * sizeof(*niobuf));
1164 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1166 ptlrpc_request_free(req);
1169 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1170 ptlrpc_at_set_req_timeout(req);
1171 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1173 req->rq_no_retry_einprogress = 1;
1175 desc = ptlrpc_prep_bulk_imp(req, page_count,
1176 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1177 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1178 PTLRPC_BULK_PUT_SINK) |
1179 PTLRPC_BULK_BUF_KIOV,
1181 &ptlrpc_bulk_kiov_pin_ops);
1184 GOTO(out, rc = -ENOMEM);
1185 /* NB request now owns desc and will free it when it gets freed */
1187 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1188 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1189 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1190 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1192 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1194 obdo_to_ioobj(oa, ioobj);
1195 ioobj->ioo_bufcnt = niocount;
1196 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1197 * that might be send for this request. The actual number is decided
1198 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1199 * "max - 1" for old client compatibility sending "0", and also so the
1200 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1201 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1202 LASSERT(page_count > 0);
/* Walk the page array: validate ordering/contiguity, register each page
 * as a bulk fragment, and coalesce adjacent pages into niobufs. */
1204 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1205 struct brw_page *pg = pga[i];
1206 int poff = pg->off & ~PAGE_MASK;
1208 LASSERT(pg->count > 0);
1209 /* make sure there is no gap in the middle of page array */
1210 LASSERTF(page_count == 1 ||
1211 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1212 ergo(i > 0 && i < page_count - 1,
1213 poff == 0 && pg->count == PAGE_SIZE) &&
1214 ergo(i == page_count - 1, poff == 0)),
1215 "i: %d/%d pg: %p off: %llu, count: %u\n",
1216 i, page_count, pg, pg->off, pg->count);
1217 LASSERTF(i == 0 || pg->off > pg_prev->off,
1218 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1219 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1221 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1222 pg_prev->pg, page_private(pg_prev->pg),
1223 pg_prev->pg->index, pg_prev->off);
1224 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1225 (pg->flag & OBD_BRW_SRVLOCK));
1227 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1228 requested_nob += pg->count;
1230 if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* Extend the current niobuf instead of starting a new one. */
1232 niobuf->rnb_len += pg->count;
1234 niobuf->rnb_offset = pg->off;
1235 niobuf->rnb_len = pg->count;
1236 niobuf->rnb_flags = pg->flag;
/* Sanity: we must have filled exactly `niocount` niobufs. */
1241 LASSERTF((void *)(niobuf - niocount) ==
1242 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1243 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1244 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1246 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1248 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1249 body->oa.o_valid |= OBD_MD_FLFLAGS;
1250 body->oa.o_flags = 0;
/* Mark resends so the server can recognize replayed writes. */
1252 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1255 if (osc_should_shrink_grant(cli))
1256 osc_shrink_grant_local(cli, &body->oa);
1258 /* size[REQ_REC_OFF] still sizeof (*body) */
1259 if (opc == OST_WRITE) {
1260 if (cli->cl_checksum &&
1261 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1262 /* store cl_cksum_type in a local variable since
1263 * it can be changed via lprocfs */
1264 cksum_type_t cksum_type = cli->cl_cksum_type;
1266 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1267 oa->o_flags &= OBD_FL_LOCAL_MASK;
1268 body->oa.o_flags = 0;
1270 body->oa.o_flags |= cksum_type_pack(cksum_type);
1271 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1272 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1276 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1278 /* save this in 'oa', too, for later checking */
1279 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1280 oa->o_flags |= cksum_type_pack(cksum_type);
1282 /* clear out the checksum flag, in case this is a
1283 * resend but cl_checksum is no longer set. b=11238 */
1284 oa->o_valid &= ~OBD_MD_FLCKSUM;
1286 oa->o_cksum = body->oa.o_cksum;
1287 /* 1 RC per niobuf */
1288 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1289 sizeof(__u32) * niocount);
/* Read path: request a server-side checksum in the reply. */
1291 if (cli->cl_checksum &&
1292 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1293 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1294 body->oa.o_flags = 0;
1295 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1296 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1299 ptlrpc_request_set_replen(req);
1301 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1302 aa = ptlrpc_req_async_args(req);
1304 aa->aa_requested_nob = requested_nob;
1305 aa->aa_nio_count = niocount;
1306 aa->aa_page_count = page_count;
1310 INIT_LIST_HEAD(&aa->aa_oaps);
1313 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1314 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1315 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1316 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* Error path: release the request (and its bulk desc) on failure. */
1320 ptlrpc_req_finished(req);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Diagnose a write-checksum mismatch reported by the server:
 * recompute the bulk checksum over @pga and log whether the data most
 * likely changed on the client, in transit, or both.
 */
1324 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1325 __u32 client_cksum, __u32 server_cksum, int nob,
1326 size_t page_count, struct brw_page **pga,
1327 cksum_type_t client_cksum_type)
1331 cksum_type_t cksum_type;
/* Fast path: both sides agree, nothing to diagnose. */
1333 if (server_cksum == client_cksum) {
1334 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the checksum type the server actually used (taken
 * from oa->o_flags when OBD_MD_FLFLAGS is valid). */
1338 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1340 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch by comparing the freshly computed checksum
 * against both the server's and the client's original values. */
1343 if (cksum_type != client_cksum_type)
1344 msg = "the server did not use the checksum type specified in "
1345 "the original request - likely a protocol problem";
1346 else if (new_cksum == server_cksum)
1347 msg = "changed on the client after we checksummed it - "
1348 "likely false positive due to mmap IO (bug 11742)";
1349 else if (new_cksum == client_cksum)
1350 msg = "changed in transit before arrival at OST";
1352 msg = "changed in transit AND doesn't match the original - "
1353 "likely false positive due to mmap IO (bug 11742)";
/* Console-level error so the administrator sees possible corruption;
 * parent FID fields are only meaningful when OBD_MD_FLFID is set. */
1355 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1356 " object "DOSTID" extent [%llu-%llu]\n",
1357 msg, libcfs_nid2str(peer->nid),
1358 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1359 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1360 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1361 POSTID(&oa->o_oi), pga[0]->off,
1362 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1363 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1364 "client csum now %x\n", client_cksum, client_cksum_type,
1365 server_cksum, cksum_type, new_cksum);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Finish a bulk BRW request: unpack the reply body, update quota and
 * grant state, verify checksums (writes via check_write_checksum(),
 * reads by recomputing locally), and handle short reads.
 */
1369 /* Note rc enters this function as number of bytes transferred */
1370 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1372 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1373 const lnet_process_id_t *peer =
1374 &req->rq_import->imp_connection->c_peer;
1375 struct client_obd *cli = aa->aa_cli;
1376 struct ost_body *body;
1377 u32 client_cksum = 0;
/* -EDQUOT is handled below so quota flags still get updated. */
1380 if (rc < 0 && rc != -EDQUOT) {
1381 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1385 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1386 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1388 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1392 /* set/clear over quota flag for a uid/gid */
1393 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1394 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1395 unsigned int qid[LL_MAXQUOTAS] =
1396 {body->oa.o_uid, body->oa.o_gid};
1398 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1399 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1401 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1404 osc_update_grant(cli, body);
/* Remember the checksum we sent so a server mismatch can be checked. */
1409 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1410 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1412 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1414 CERROR("Unexpected +ve rc %d\n", rc);
1417 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1419 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Server echoed a checksum: cross-check against what we computed
 * before sending (may trigger a resend on mismatch). */
1422 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1423 check_write_checksum(&body->oa, peer, client_cksum,
1424 body->oa.o_cksum, aa->aa_requested_nob,
1425 aa->aa_page_count, aa->aa_ppga,
1426 cksum_type_unpack(aa->aa_oa->o_flags)))
1429 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1430 aa->aa_page_count, aa->aa_ppga);
1434 /* The rest of this function executes only for OST_READs */
1436 /* if unwrap_bulk failed, return -EAGAIN to retry */
1437 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1439 GOTO(out, rc = -EAGAIN);
/* Sanity: server cannot legitimately return more than we asked for,
 * and its reply must agree with the bulk byte count we saw. */
1441 if (rc > aa->aa_requested_nob) {
1442 CERROR("Unexpected rc %d (%d requested)\n", rc,
1443 aa->aa_requested_nob);
1447 if (rc != req->rq_bulk->bd_nob_transferred) {
1448 CERROR ("Unexpected rc %d (%d transferred)\n",
1449 rc, req->rq_bulk->bd_nob_transferred);
1453 if (rc < aa->aa_requested_nob)
1454 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read checksum verification: recompute over the received pages and
 * compare with the server-provided value. */
1456 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1457 static int cksum_counter;
1458 u32 server_cksum = body->oa.o_cksum;
1461 cksum_type_t cksum_type;
1463 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1464 body->oa.o_flags : 0);
1465 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1466 aa->aa_ppga, OST_READ,
/* Note when the bulk arrived via a router rather than directly. */
1469 if (peer->nid != req->rq_bulk->bd_sender) {
1471 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1474 if (server_cksum != client_cksum) {
1475 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1476 "%s%s%s inode "DFID" object "DOSTID
1477 " extent [%llu-%llu]\n",
1478 req->rq_import->imp_obd->obd_name,
1479 libcfs_nid2str(peer->nid),
1481 body->oa.o_valid & OBD_MD_FLFID ?
1482 body->oa.o_parent_seq : (__u64)0,
1483 body->oa.o_valid & OBD_MD_FLFID ?
1484 body->oa.o_parent_oid : 0,
1485 body->oa.o_valid & OBD_MD_FLFID ?
1486 body->oa.o_parent_ver : 0,
1487 POSTID(&body->oa.o_oi),
1488 aa->aa_ppga[0]->off,
1489 aa->aa_ppga[aa->aa_page_count-1]->off +
1490 aa->aa_ppga[aa->aa_page_count-1]->count -
1492 CERROR("client %x, server %x, cksum_type %x\n",
1493 client_cksum, server_cksum, cksum_type);
1495 aa->aa_oa->o_cksum = client_cksum;
1499 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but none came back; rate-limited error
 * (fires when cksum_missed is a power of two). */
1502 } else if (unlikely(client_cksum)) {
1503 static int cksum_missed;
1506 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1507 CERROR("Checksum %u requested from %s but not sent\n",
1508 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy reply attributes back into the caller's obdo. */
1514 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1515 aa->aa_oa, &body->oa);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Rebuild and resend a BRW request after a recoverable error.  A new
 * request is prepared from the old one's async args; the pga and oap
 * lists are moved (not copied) onto the new request.
 */
1520 static int osc_brw_redo_request(struct ptlrpc_request *request,
1521 struct osc_brw_async_args *aa, int rc)
1523 struct ptlrpc_request *new_req;
1524 struct osc_brw_async_args *new_aa;
1525 struct osc_async_page *oap;
/* -EINPROGRESS resends are routine, so log them less loudly. */
1528 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1529 "redo for recoverable error %d", rc);
1531 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1532 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1533 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1534 aa->aa_ppga, &new_req, 1);
/* Abort the redo if any page's original request was interrupted. */
1538 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1539 if (oap->oap_request != NULL) {
1540 LASSERTF(request == oap->oap_request,
1541 "request %p != oap_request %p\n",
1542 request, oap->oap_request);
1543 if (oap->oap_interrupted) {
1544 ptlrpc_req_finished(new_req);
1549 /* New request takes over pga and oaps from old request.
1550 * Note that copying a list_head doesn't work, need to move it... */
1552 new_req->rq_interpret_reply = request->rq_interpret_reply;
1553 new_req->rq_async_args = request->rq_async_args;
1554 new_req->rq_commit_cb = request->rq_commit_cb;
1555 /* cap resend delay to the current request timeout, this is similar to
1556 * what ptlrpc does (see after_reply()) */
1557 if (aa->aa_resends > new_req->rq_timeout)
1558 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1560 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1561 new_req->rq_generation_set = 1;
1562 new_req->rq_import_generation = request->rq_import_generation;
1564 new_aa = ptlrpc_req_async_args(new_req);
/* Splice (not copy) the oap and extent lists onto the new args. */
1566 INIT_LIST_HEAD(&new_aa->aa_oaps);
1567 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1568 INIT_LIST_HEAD(&new_aa->aa_exts);
1569 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1570 new_aa->aa_resends = aa->aa_resends;
/* Re-point each page's request reference at the new request. */
1572 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1573 if (oap->oap_request) {
1574 ptlrpc_req_finished(oap->oap_request);
1575 oap->oap_request = ptlrpc_request_addref(new_req);
1579 /* XXX: This code will run into problem if we're going to support
1580 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1581 * and wait for all of them to be finished. We should inherit request
1582 * set from old request. */
1583 ptlrpcd_add_req(new_req);
1585 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 */
1590 * ugh, we want disk allocation on the target to happen in offset order. we'll
1591 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1592 * fine for our small page arrays and doesn't require allocation. its an
1593 * insertion sort that swaps elements that are strides apart, shrinking the
1594 * stride down until its '1' and the array is sorted.
1596 static void sort_brw_pages(struct brw_page **array, int num)
1599 struct brw_page *tmp;
/* Build the Knuth stride sequence 1, 4, 13, 40, ... up to num. */
1603 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort, keyed on page offset (->off), for each stride. */
1608 for (i = stride ; i < num ; i++) {
1611 while (j >= stride && array[j - stride]->off > tmp->off) {
1612 array[j] = array[j - stride];
1617 } while (stride > 1);
/*
 * NOTE(review): elided line-numbered listing.
 * Free the brw_page pointer array allocated by osc_build_rpc()
 * (the array only; the pages themselves are owned elsewhere).
 */
1620 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1622 LASSERT(ppga != NULL);
1623 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Reply interpreter for BRW RPCs: finishes the request, retries
 * recoverable errors, propagates returned attributes (size/KMS/times)
 * to the cl_object, completes the extents, and updates in-flight RPC
 * accounting before re-plugging the IO queue.
 */
1626 static int brw_interpret(const struct lu_env *env,
1627 struct ptlrpc_request *req, void *data, int rc)
1629 struct osc_brw_async_args *aa = data;
1630 struct osc_extent *ext;
1631 struct osc_extent *tmp;
1632 struct client_obd *cli = aa->aa_cli;
1635 rc = osc_brw_fini_request(req, rc);
1636 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1637 /* When server return -EINPROGRESS, client should always retry
1638 * regardless of the number of times the bulk was resent already. */
1639 if (osc_recoverable_error(rc)) {
/* Import generation changed => we were evicted; don't resend. */
1640 if (req->rq_import_generation !=
1641 req->rq_import->imp_generation) {
1642 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1643 ""DOSTID", rc = %d.\n",
1644 req->rq_import->imp_obd->obd_name,
1645 POSTID(&aa->aa_oa->o_oi), rc);
1646 } else if (rc == -EINPROGRESS ||
1647 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1648 rc = osc_brw_redo_request(req, aa, rc);
1650 CERROR("%s: too many resent retries for object: "
1651 "%llu:%llu, rc = %d.\n",
1652 req->rq_import->imp_obd->obd_name,
1653 POSTID(&aa->aa_oa->o_oi), rc);
1658 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: push attributes the OST returned into the cl_object
 * attribute cache, under the attr lock. */
1663 struct obdo *oa = aa->aa_oa;
1664 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1665 unsigned long valid = 0;
1666 struct cl_object *obj;
1667 struct osc_async_page *last;
1669 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1670 obj = osc2cl(last->oap_obj);
1672 cl_object_attr_lock(obj);
1673 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1674 attr->cat_blocks = oa->o_blocks;
1675 valid |= CAT_BLOCKS;
1677 if (oa->o_valid & OBD_MD_FLMTIME) {
1678 attr->cat_mtime = oa->o_mtime;
1681 if (oa->o_valid & OBD_MD_FLATIME) {
1682 attr->cat_atime = oa->o_atime;
1685 if (oa->o_valid & OBD_MD_FLCTIME) {
1686 attr->cat_ctime = oa->o_ctime;
1690 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1691 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1692 loff_t last_off = last->oap_count + last->oap_obj_off +
1695 /* Change file size if this is an out of quota or
1696 * direct IO write and it extends the file size */
1697 if (loi->loi_lvb.lvb_size < last_off) {
1698 attr->cat_size = last_off;
1701 /* Extend KMS if it's not a lockless write */
1702 if (loi->loi_kms < last_off &&
1703 oap2osc_page(last)->ops_srvlock == 0) {
1704 attr->cat_kms = last_off;
1710 cl_object_attr_update(env, obj, attr, valid);
1711 cl_object_attr_unlock(obj);
1713 OBDO_FREE(aa->aa_oa);
/* Successful writes contribute unstable (not-yet-committed) pages. */
1715 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1716 osc_inc_unstable_pages(req);
/* Complete every extent this RPC covered, emptying aa_exts/aa_oaps. */
1718 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1719 list_del_init(&ext->oe_link);
1720 osc_extent_finish(env, ext, 1, rc);
1722 LASSERT(list_empty(&aa->aa_exts));
1723 LASSERT(list_empty(&aa->aa_oaps));
1725 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1726 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1728 spin_lock(&cli->cl_loi_list_lock);
1729 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1730 * is called so we know whether to go to sync BRWs or wait for more
1731 * RPCs to complete */
1732 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1733 cli->cl_w_in_flight--;
1735 cli->cl_r_in_flight--;
1736 osc_wake_cache_waiters(cli);
1737 spin_unlock(&cli->cl_loi_list_lock);
/* Kick the IO engine: freed slots may allow more RPCs to be built. */
1739 osc_io_unplug(env, cli, NULL);
/*
 * NOTE(review): elided line-numbered listing.
 * Transaction-commit callback for a BRW request: decrement the
 * unstable-page count exactly once, racing safely (under rq_lock)
 * with osc_inc_unstable_pages() from osc_extent_finish().
 */
1743 static void brw_commit(struct ptlrpc_request *req)
1745 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1746 * this called via the rq_commit_cb, I need to ensure
1747 * osc_dec_unstable_pages is still called. Otherwise unstable
1748 * pages may be leaked. */
1749 spin_lock(&req->rq_lock);
1750 if (likely(req->rq_unstable)) {
1751 req->rq_unstable = 0;
1752 spin_unlock(&req->rq_lock);
/* Drop the lock before the (potentially heavier) decrement. */
1754 osc_dec_unstable_pages(req);
/* Racing side hasn't set rq_unstable yet: mark committed so it
 * performs the decrement itself. */
1756 req->rq_committed = 1;
1757 spin_unlock(&req->rq_lock);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 */
1762 * Build an RPC by the list of extent @ext_list. The caller must ensure
1763 * that the total pages in this list are NOT over max pages per RPC.
1764 * Extents in the list must be in OES_RPC state.
1766 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1767 struct list_head *ext_list, int cmd)
1769 struct ptlrpc_request *req = NULL;
1770 struct osc_extent *ext;
1771 struct brw_page **pga = NULL;
1772 struct osc_brw_async_args *aa = NULL;
1773 struct obdo *oa = NULL;
1774 struct osc_async_page *oap;
1775 struct osc_object *obj = NULL;
1776 struct cl_req_attr *crattr = NULL;
1777 loff_t starting_offset = OBD_OBJECT_EOF;
1778 loff_t ending_offset = 0;
1782 bool soft_sync = false;
1783 bool interrupted = false;
1787 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1788 struct ost_body *body;
1790 LASSERT(!list_empty(ext_list));
1792 /* add pages into rpc_list to build BRW rpc */
/* First pass: accumulate memalloc state, grant, and total page count. */
1793 list_for_each_entry(ext, ext_list, oe_link) {
1794 LASSERT(ext->oe_state == OES_RPC);
1795 mem_tight |= ext->oe_memalloc;
1796 grant += ext->oe_grants;
1797 page_count += ext->oe_nr_pages;
1802 soft_sync = osc_over_unstable_soft_limit(cli);
1804 mpflag = cfs_memory_pressure_get_and_set();
1806 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1808 GOTO(out, rc = -ENOMEM);
1812 GOTO(out, rc = -ENOMEM);
/* Second pass: fill pga[], queue each page on rpc_list and track the
 * [starting_offset, ending_offset) range covered by this RPC. */
1815 list_for_each_entry(ext, ext_list, oe_link) {
1816 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1818 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1820 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1821 pga[i] = &oap->oap_brw_page;
1822 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1825 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1826 if (starting_offset == OBD_OBJECT_EOF ||
1827 starting_offset > oap->oap_obj_off)
1828 starting_offset = oap->oap_obj_off;
1830 LASSERT(oap->oap_page_off == 0);
1831 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1832 ending_offset = oap->oap_obj_off +
1835 LASSERT(oap->oap_page_off + oap->oap_count ==
1837 if (oap->oap_interrupted)
1842 /* first page in the list */
1843 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Gather request attributes (jobid, oa fields) from the cl layer. */
1845 crattr = &osc_env_info(env)->oti_req_attr;
1846 memset(crattr, 0, sizeof(*crattr));
1847 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1848 crattr->cra_flags = ~0ULL;
1849 crattr->cra_page = oap2cl_page(oap);
1850 crattr->cra_oa = oa;
1851 cl_req_attr_set(env, osc2cl(obj), crattr);
1853 if (cmd == OBD_BRW_WRITE)
1854 oa->o_grant_used = grant;
/* Sort by offset so the OST allocates disk blocks in order. */
1856 sort_brw_pages(pga, page_count);
1857 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1859 CERROR("prep_req failed: %d\n", rc);
1863 req->rq_commit_cb = brw_commit;
1864 req->rq_interpret_reply = brw_interpret;
1865 req->rq_memalloc = mem_tight != 0;
1866 oap->oap_request = ptlrpc_request_addref(req);
1867 if (interrupted && !req->rq_intr)
1868 ptlrpc_mark_interrupted(req);
1870 /* Need to update the timestamps after the request is built in case
1871 * we race with setattr (locally or in queue at OST). If OST gets
1872 * later setattr before earlier BRW (as determined by the request xid),
1873 * the OST will not use BRW timestamps. Sadly, there is no obvious
1874 * way to do this in a single call. bug 10150 */
1875 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1876 crattr->cra_oa = &body->oa;
1877 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1878 cl_req_attr_set(env, osc2cl(obj), crattr);
1879 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand ownership of the oap and extent lists to the async args. */
1881 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1882 aa = ptlrpc_req_async_args(req);
1883 INIT_LIST_HEAD(&aa->aa_oaps);
1884 list_splice_init(&rpc_list, &aa->aa_oaps);
1885 INIT_LIST_HEAD(&aa->aa_exts);
1886 list_splice_init(ext_list, &aa->aa_exts);
/* In-flight accounting and lprocfs histograms, under the LOI lock. */
1888 spin_lock(&cli->cl_loi_list_lock);
1889 starting_offset >>= PAGE_SHIFT;
1890 if (cmd == OBD_BRW_READ) {
1891 cli->cl_r_in_flight++;
1892 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1893 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1894 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1895 starting_offset + 1);
1897 cli->cl_w_in_flight++;
1898 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1899 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1900 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1901 starting_offset + 1);
1903 spin_unlock(&cli->cl_loi_list_lock);
1905 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1906 page_count, aa, cli->cl_r_in_flight,
1907 cli->cl_w_in_flight);
1908 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1910 ptlrpcd_add_req(req);
1916 cfs_memory_pressure_restore(mpflag);
/* Error path: free what we allocated and fail all queued extents. */
1919 LASSERT(req == NULL);
1924 OBD_FREE(pga, sizeof(*pga) * page_count);
1925 /* this should happen rarely and is pretty bad, it makes the
1926 * pending list not follow the dirty order */
1927 while (!list_empty(ext_list)) {
1928 ext = list_entry(ext_list->next, struct osc_extent,
1930 list_del_init(&ext->oe_link);
1931 osc_extent_finish(env, ext, 0, rc);
/*
 * NOTE(review): elided line-numbered listing.
 * Attach @data to a DLM lock's l_ast_data if it is unset (or already
 * equal to @data), under the lock's resource lock.  Return value
 * presumably indicates whether the data now matches — the success/
 * failure lines are elided here; confirm against the full source.
 */
1937 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1941 LASSERT(lock != NULL);
1943 lock_res_and_lock(lock);
1945 if (lock->l_ast_data == NULL)
1946 lock->l_ast_data = data;
1947 if (lock->l_ast_data == data)
1950 unlock_res_and_lock(lock);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Finalize an OSC lock enqueue: decode the intent reply (if any),
 * invoke the caller's upcall with the final error code, and drop the
 * lock reference taken by ldlm_cli_enqueue().
 */
1955 static int osc_enqueue_fini(struct ptlrpc_request *req,
1956 osc_enqueue_upcall_f upcall, void *cookie,
1957 struct lustre_handle *lockh, enum ldlm_mode mode,
1958 __u64 *flags, int agl, int errcode)
1960 bool intent = *flags & LDLM_FL_HAS_INTENT;
1964 /* The request was created before ldlm_cli_enqueue call. */
1965 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1966 struct ldlm_reply *rep;
1968 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1969 LASSERT(rep != NULL);
/* lock_policy_res1 carries the server's real status for an
 * intent-aborted enqueue; convert from wire byte order. */
1971 rep->lock_policy_res1 =
1972 ptlrpc_status_ntoh(rep->lock_policy_res1);
1973 if (rep->lock_policy_res1)
1974 errcode = rep->lock_policy_res1;
1976 *flags |= LDLM_FL_LVB_READY;
1977 } else if (errcode == ELDLM_OK) {
1978 *flags |= LDLM_FL_LVB_READY;
1981 /* Call the update callback. */
1982 rc = (*upcall)(cookie, lockh, errcode);
1984 /* release the reference taken in ldlm_cli_enqueue() */
1985 if (errcode == ELDLM_LOCK_MATCHED)
1987 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1988 ldlm_lock_decref(lockh, mode);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Async reply interpreter for osc_enqueue_base(): completes the ldlm
 * enqueue, runs the OSC-level completion via osc_enqueue_fini(), and
 * balances the extra lock reference taken to order blocking ASTs
 * after the upcall.
 */
1993 static int osc_enqueue_interpret(const struct lu_env *env,
1994 struct ptlrpc_request *req,
1995 struct osc_enqueue_args *aa, int rc)
1997 struct ldlm_lock *lock;
1998 struct lustre_handle *lockh = &aa->oa_lockh;
1999 enum ldlm_mode mode = aa->oa_mode;
2000 struct ost_lvb *lvb = aa->oa_lvb;
2001 __u32 lvb_len = sizeof(*lvb);
2006 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2008 lock = ldlm_handle2lock(lockh);
2009 LASSERTF(lock != NULL,
2010 "lockh %#llx, req %p, aa %p - client evicted?\n",
2011 lockh->cookie, req, aa);
2013 /* Take an additional reference so that a blocking AST that
2014 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2015 * to arrive after an upcall has been executed by
2016 * osc_enqueue_fini(). */
2017 ldlm_lock_addref(lockh, mode);
2019 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2020 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2022 /* Let CP AST to grant the lock first. */
2023 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL path (context elided): flags are supplied locally since the
 * caller passed none — TODO confirm against the full source. */
2026 LASSERT(aa->oa_lvb == NULL);
2027 LASSERT(aa->oa_flags == NULL);
2028 aa->oa_flags = &flags;
2031 /* Complete obtaining the lock procedure. */
2032 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2033 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2035 /* Complete osc stuff. */
2036 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2037 aa->oa_flags, aa->oa_agl, rc);
2039 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above, then the enqueue's own ref. */
2041 ldlm_lock_decref(lockh, mode);
2042 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to mean
 * "queue via ptlrpcd" instead of a real ptlrpc_request_set. */
2046 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 */
2048 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2049 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2050 * other synchronous requests, however keeping some locks and trying to obtain
2051 * others may take a considerable amount of time in a case of ost failure; and
2052 * when other sync requests do not get released lock from a client, the client
2053 * is evicted from the cluster -- such scenarious make the life difficult, so
2054 * release locks just after they are obtained. */
2055 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2056 __u64 *flags, union ldlm_policy_data *policy,
2057 struct ost_lvb *lvb, int kms_valid,
2058 osc_enqueue_upcall_f upcall, void *cookie,
2059 struct ldlm_enqueue_info *einfo,
2060 struct ptlrpc_request_set *rqset, int async, int agl)
2062 struct obd_device *obd = exp->exp_obd;
2063 struct lustre_handle lockh = { 0 };
2064 struct ptlrpc_request *req = NULL;
2065 int intent = *flags & LDLM_FL_HAS_INTENT;
2066 __u64 match_flags = *flags;
2067 enum ldlm_mode mode;
2071 /* Filesystem lock extents are extended to page boundaries so that
2072 * dealing with the page cache is a little smoother. */
2073 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2074 policy->l_extent.end |= ~PAGE_MASK;
2077 * kms is not valid when either object is completely fresh (so that no
2078 * locks are cached), or object was evicted. In the latter case cached
2079 * lock cannot be used, because it would prime inode state with
2080 * potentially stale LVB.
2085 /* Next, search for already existing extent locks that will cover us */
2086 /* If we're trying to read, we also search for an existing PW lock. The
2087 * VFS and page cache already protect us locally, so lots of readers/
2088 * writers can share a single PW lock.
2090 * There are problems with conversion deadlocks, so instead of
2091 * converting a read lock to a write lock, we'll just enqueue a new
2094 * At some point we should cancel the read lock instead of making them
2095 * send us a blocking callback, but there are problems with canceling
2096 * locks out from other users right now, too. */
2097 mode = einfo->ei_mode;
2098 if (einfo->ei_mode == LCK_PR)
2101 match_flags |= LDLM_FL_LVB_READY;
2103 match_flags |= LDLM_FL_BLOCK_GRANTED;
2104 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2105 einfo->ei_type, policy, mode, &lockh, 0);
/* A matching cached lock was found: reuse it instead of enqueuing. */
2107 struct ldlm_lock *matched;
2109 if (*flags & LDLM_FL_TEST_LOCK)
2112 matched = ldlm_handle2lock(&lockh);
2114 /* AGL enqueues DLM locks speculatively. Therefore if
2115 * it already exists a DLM lock, it wll just inform the
2116 * caller to cancel the AGL process for this stripe. */
2117 ldlm_lock_decref(&lockh, mode);
2118 LDLM_LOCK_PUT(matched);
2120 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2121 *flags |= LDLM_FL_LVB_READY;
2123 /* We already have a lock, and it's referenced. */
2124 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2126 ldlm_lock_decref(&lockh, mode);
2127 LDLM_LOCK_PUT(matched);
2130 ldlm_lock_decref(&lockh, mode);
2131 LDLM_LOCK_PUT(matched);
2136 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* Intent enqueue: allocate the LVB-carrying request ourselves. */
2140 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2141 &RQF_LDLM_ENQUEUE_LVB);
2145 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2147 ptlrpc_request_free(req);
2151 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2153 ptlrpc_request_set_replen(req);
2156 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2157 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2159 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2160 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state and let the interpreter finish. */
2163 struct osc_enqueue_args *aa;
2164 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2165 aa = ptlrpc_req_async_args(req);
2167 aa->oa_mode = einfo->ei_mode;
2168 aa->oa_type = einfo->ei_type;
2169 lustre_handle_copy(&aa->oa_lockh, &lockh);
2170 aa->oa_upcall = upcall;
2171 aa->oa_cookie = cookie;
2174 aa->oa_flags = flags;
2177 /* AGL is essentially to enqueue an DLM lock
2178 * in advance, so we don't care about the
2179 * result of AGL enqueue. */
2181 aa->oa_flags = NULL;
2184 req->rq_interpret_reply =
2185 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2186 if (rqset == PTLRPCD_SET)
2187 ptlrpcd_add_req(req);
2189 ptlrpc_set_add_req(rqset, req);
2190 } else if (intent) {
2191 ptlrpc_req_finished(req);
/* Sync path: complete inline. */
2196 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2199 ptlrpc_req_finished(req);
/*
 * NOTE(review): elided line-numbered listing; gaps in the embedded
 * numbering mark source lines missing from this view.
 *
 * Match an existing cached DLM extent lock for [start, end] after
 * rounding the extent to page boundaries; on a match, attach @data
 * to the lock via osc_set_lock_data().
 */
2204 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2205 enum ldlm_type type, union ldlm_policy_data *policy,
2206 enum ldlm_mode mode, __u64 *flags, void *data,
2207 struct lustre_handle *lockh, int unref)
2209 struct obd_device *obd = exp->exp_obd;
2210 __u64 lflags = *flags;
/* Fault-injection hook: pretend no lock matches. */
2214 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2217 /* Filesystem lock extents are extended to page boundaries so that
2218 * dealing with the page cache is a little smoother */
2219 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2220 policy->l_extent.end |= ~PAGE_MASK;
2222 /* Next, search for already existing extent locks that will cover us */
2223 /* If we're trying to read, we also search for an existing PW lock. The
2224 * VFS and page cache already protect us locally, so lots of readers/
2225 * writers can share a single PW lock. */
/* rc here holds the mode(s) to match — setup lines elided. */
2229 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2230 res_id, type, policy, rc, lockh, unref);
2231 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2235 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2237 LASSERT(lock != NULL);
/* Couldn't claim the lock's ast_data slot: drop our reference. */
2238 if (!osc_set_lock_data(lock, data)) {
2239 ldlm_lock_decref(lockh, rc);
2242 LDLM_LOCK_PUT(lock);
/*
 * NOTE(review): elided line-numbered listing.
 * Async reply interpreter for osc_statfs_async(): unpack the
 * obd_statfs reply, copy it to the caller's buffer and invoke the
 * caller's completion callback (oi_cb_up).
 */
2247 static int osc_statfs_interpret(const struct lu_env *env,
2248 struct ptlrpc_request *req,
2249 struct osc_async_args *aa, int rc)
2251 struct obd_statfs *msfs;
2255 /* The request has in fact never been sent
2256 * due to issues at a higher level (LOV).
2257 * Exit immediately since the caller is
2258 * aware of the problem and takes care
2259 * of the clean up */
/* NODELAY statfs tolerates a disconnected/busy target. */
2262 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2263 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2269 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2271 GOTO(out, rc = -EPROTO);
2274 *aa->aa_oi->oi_osfs = *msfs;
2276 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * NOTE(review): elided line-numbered listing.
 * Send an asynchronous OST_STATFS request; the reply is handled by
 * osc_statfs_interpret() and delivered through oinfo's callback.
 */
2280 static int osc_statfs_async(struct obd_export *exp,
2281 struct obd_info *oinfo, __u64 max_age,
2282 struct ptlrpc_request_set *rqset)
2284 struct obd_device *obd = class_exp2obd(exp);
2285 struct ptlrpc_request *req;
2286 struct osc_async_args *aa;
2290 /* We could possibly pass max_age in the request (as an absolute
2291 * timestamp or a "seconds.usec ago") so the target can avoid doing
2292 * extra calls into the filesystem if that isn't necessary (e.g.
2293 * during mount that would help a bit). Having relative timestamps
2294 * is not so great if request processing is slow, while absolute
2295 * timestamps are not ideal because they need time synchronization. */
2296 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2300 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2302 ptlrpc_request_free(req);
2305 ptlrpc_request_set_replen(req);
/* statfs is served from the create portal on the OST. */
2306 req->rq_request_portal = OST_CREATE_PORTAL;
2307 ptlrpc_at_set_req_timeout(req);
2309 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2310 /* procfs requests not want stat in wait for avoid deadlock */
2311 req->rq_no_resend = 1;
2312 req->rq_no_delay = 1;
2315 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2316 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2317 aa = ptlrpc_req_async_args(req);
2320 ptlrpc_set_add_req(rqset, req);
/*
 * NOTE(review): elided line-numbered listing.
 * Synchronous OST_STATFS: take a reference on the import under
 * cl_sem (racing against disconnect, see bug 15684), send the
 * request with ptlrpc_queue_wait() and copy the reply into @osfs.
 */
2324 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2325 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2327 struct obd_device *obd = class_exp2obd(exp);
2328 struct obd_statfs *msfs;
2329 struct ptlrpc_request *req;
2330 struct obd_import *imp = NULL;
2334 /*Since the request might also come from lprocfs, so we need
2335 *sync this with client_disconnect_export Bug15684*/
2336 down_read(&obd->u.cli.cl_sem);
2337 if (obd->u.cli.cl_import)
2338 imp = class_import_get(obd->u.cli.cl_import);
2339 up_read(&obd->u.cli.cl_sem);
2343 /* We could possibly pass max_age in the request (as an absolute
2344 * timestamp or a "seconds.usec ago") so the target can avoid doing
2345 * extra calls into the filesystem if that isn't necessary (e.g.
2346 * during mount that would help a bit). Having relative timestamps
2347 * is not so great if request processing is slow, while absolute
2348 * timestamps are not ideal because they need time synchronization. */
2349 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Import reference is only needed for the allocation above. */
2351 class_import_put(imp);
2356 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2358 ptlrpc_request_free(req);
2361 ptlrpc_request_set_replen(req);
2362 req->rq_request_portal = OST_CREATE_PORTAL;
2363 ptlrpc_at_set_req_timeout(req);
2365 if (flags & OBD_STATFS_NODELAY) {
2366 /* procfs requests not want stat in wait for avoid deadlock */
2367 req->rq_no_resend = 1;
2368 req->rq_no_delay = 1;
2371 rc = ptlrpc_queue_wait(req);
2375 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2377 GOTO(out, rc = -EPROTO);
2384 ptlrpc_req_finished(req);
/*
 * NOTE(review): elided line-numbered listing.
 * OSC ioctl dispatcher: pins the module for the duration of the call
 * and routes the recognized commands to their ptlrpc helpers.
 */
2388 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2389 void *karg, void __user *uarg)
2391 struct obd_device *obd = exp->exp_obd;
2392 struct obd_ioctl_data *data = karg;
/* Prevent module unload while an ioctl is in flight. */
2396 if (!try_module_get(THIS_MODULE)) {
2397 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2398 module_name(THIS_MODULE));
2402 case OBD_IOC_CLIENT_RECOVER:
2403 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2404 data->ioc_inlbuf1, 0);
2408 case IOC_OSC_SET_ACTIVE:
2409 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2412 case OBD_IOC_PING_TARGET:
2413 err = ptlrpc_obd_ping(obd);
/* Unknown command: report caller and fail with -ENOTTY. */
2416 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2417 cmd, current_comm());
2418 GOTO(out, err = -ENOTTY);
2421 module_put(THIS_MODULE);
/*
 * osc_set_info_async() - set a key/value pair on this OSC, possibly
 * forwarding it to the OST as an OST_SET_INFO (or OST_SET_GRANT_INFO) RPC.
 *
 * Keys handled locally (no RPC): KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK.  All other keys are
 * packed into a request; KEY_GRANT_SHRINK requests are sent via ptlrpcd,
 * everything else is added to the caller's request \a set.
 *
 * NOTE(review): extraction dropped lines (returns, braces, some RMF
 * arguments); the visible code is incomplete.
 */
2425 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2426 u32 keylen, void *key,
2427 u32 vallen, void *val,
2428 struct ptlrpc_request_set *set)
2430 struct ptlrpc_request *req;
2431 struct obd_device *obd = exp->exp_obd;
2432 struct obd_import *imp = class_exp2cliimp(exp);
2437 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Toggle client-side checksumming; value is an int used as a boolean. */
2439 if (KEY_IS(KEY_CHECKSUM)) {
2440 if (vallen != sizeof(int))
2442 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* Security policy changed: let the sptlrpc layer adapt. */
2446 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2447 sptlrpc_conf_client_adapt(obd);
/* Flush this client's security contexts on the import. */
2451 if (KEY_IS(KEY_FLUSH_CTX)) {
2452 sptlrpc_import_flush_my_ctx(imp);
/* Attach the shared client page cache to this OSC (done once at setup)
 * and link the OSC onto the cache's LRU list. */
2456 if (KEY_IS(KEY_CACHE_SET)) {
2457 struct client_obd *cli = &obd->u.cli;
2459 LASSERT(cli->cl_cache == NULL); /* only once */
2460 cli->cl_cache = (struct cl_client_cache *)val;
2461 cl_cache_incref(cli->cl_cache);
2462 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2464 /* add this osc into entity list */
2465 LASSERT(list_empty(&cli->cl_lru_osc));
2466 spin_lock(&cli->cl_cache->ccc_lru_lock);
2467 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2468 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Shrink this OSC's LRU: at most half of what is currently cached,
 * capped by the caller-requested target. */
2473 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2474 struct client_obd *cli = &obd->u.cli;
2475 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2476 long target = *(long *)val;
2478 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* Everything below builds an RPC; a request set is mandatory except for
 * grant shrink, which is dispatched via ptlrpcd. */
2483 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2486 /* We pass all other commands directly to OST. Since nobody calls osc
2487 methods directly and everybody is supposed to go through LOV, we
2488 assume lov checked invalid values for us.
2489 The only recognised values so far are evict_by_nid and mds_conn.
2490 Even if something bad goes through, we'd get a -EINVAL from OST
2493 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2494 &RQF_OST_SET_GRANT_INFO :
2499 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2500 RCL_CLIENT, keylen);
2501 if (!KEY_IS(KEY_GRANT_SHRINK))
2502 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2503 RCL_CLIENT, vallen);
2504 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2506 ptlrpc_request_free(req);
/* Copy key and value into the request buffers. */
2510 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2511 memcpy(tmp, key, keylen);
2512 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2515 memcpy(tmp, val, vallen);
/* Grant shrink: stash async args and set the interpret callback that
 * processes the server's grant reply. */
2517 if (KEY_IS(KEY_GRANT_SHRINK)) {
2518 struct osc_grant_args *aa;
2521 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2522 aa = ptlrpc_req_async_args(req);
2525 ptlrpc_req_finished(req);
2528 *oa = ((struct ost_body *)val)->oa;
2530 req->rq_interpret_reply = osc_shrink_grant_interpret;
2533 ptlrpc_request_set_replen(req);
2534 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2535 LASSERT(set != NULL);
2536 ptlrpc_set_add_req(set, req);
2537 ptlrpc_check_set(NULL, set);
/* Grant shrink goes through the ptlrpcd daemon instead of the set. */
2539 ptlrpcd_add_req(req);
/*
 * osc_reconnect() - recompute the grant to request from the server when the
 * import reconnects.
 *
 * Under cl_loi_list_lock, sums the available and reserved grant (plus dirty
 * grant or dirty pages depending on OBD_CONNECT_GRANT_PARAM) into
 * ocd_grant, and consumes cl_lost_grant.  A zero sum falls back to twice
 * the client bulk RPC size.
 *
 * NOTE(review): extraction dropped lines (braces, return); the visible code
 * is incomplete.
 */
2545 static int osc_reconnect(const struct lu_env *env,
2546 struct obd_export *exp, struct obd_device *obd,
2547 struct obd_uuid *cluuid,
2548 struct obd_connect_data *data,
2551 struct client_obd *cli = &obd->u.cli;
2553 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2557 spin_lock(&cli->cl_loi_list_lock);
2558 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2559 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2560 grant += cli->cl_dirty_grant;
2562 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* "?:": if the computed grant is 0, ask for two full bulk RPCs worth. */
2563 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2564 lost_grant = cli->cl_lost_grant;
2565 cli->cl_lost_grant = 0;
2566 spin_unlock(&cli->cl_loi_list_lock);
2568 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2569 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2570 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - disconnect this OSC's export and, once the import is
 * gone, remove the client from the grant-shrink list.
 *
 * Ordering matters: see the bug 18662 discussion preserved below.
 *
 * NOTE(review): extraction dropped lines (braces, return); the visible code
 * is incomplete.
 */
2576 static int osc_disconnect(struct obd_export *exp)
2578 struct obd_device *obd = class_exp2obd(exp);
2581 rc = client_disconnect_export(exp);
2583 * Initially we put del_shrink_grant before disconnect_export, but it
2584 * causes the following problem if setup (connect) and cleanup
2585 * (disconnect) are tangled together.
2586 * connect p1 disconnect p2
2587 * ptlrpc_connect_import
2588 * ............... class_manual_cleanup
2591 * ptlrpc_connect_interrupt
2593 * add this client to shrink list
2595 * Bang! pinger trigger the shrink.
2596 * So the osc should be disconnected from the shrink list, after we
2597 * are sure the import has been destroyed. BUG18662
/* cl_import == NULL proves client_disconnect_export() destroyed the
 * import, so it is now safe to leave the shrink list. */
2599 if (obd->u.cli.cl_import == NULL)
2600 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_ldlm_resource_invalidate() - cfs_hash iterator callback run over the
 * namespace's resource hash during import invalidation.
 *
 * For each resource: find the osc_object attached (via l_ast_data) to any
 * granted lock, clear LDLM_FL_CLEANED on every granted lock so a second
 * ldlm_namespace_cleanup() pass can cancel it, then invalidate and release
 * the osc_object.
 *
 * NOTE(review): extraction dropped lines (locking of the resource, braces,
 * return); the visible code is incomplete.
 */
2604 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2605 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2607 struct lu_env *env = arg;
2608 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2609 struct ldlm_lock *lock;
2610 struct osc_object *osc = NULL;
2614 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* Take the first non-NULL ast_data as the object for this resource,
 * with a reference to keep it alive past the loop. */
2615 if (lock->l_ast_data != NULL && osc == NULL) {
2616 osc = lock->l_ast_data;
2617 cl_object_get(osc2cl(osc));
2620 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2621 * by the 2nd round of ldlm_namespace_clean() call in
2622 * osc_import_event(). */
2623 ldlm_clear_cleaned(lock);
2628 osc_object_invalidate(env, osc);
2629 cl_object_put(env, osc2cl(osc));
/*
 * osc_import_event() - react to import state-machine events (disconnect,
 * invalidate, activate, ...) by updating grant state, flushing cached
 * pages/locks, and notifying the observer (typically LOV).
 *
 * NOTE(review): extraction dropped lines (switch/braces/breaks, error
 * handling around cl_env_get); the visible code is incomplete.
 */
2635 static int osc_import_event(struct obd_device *obd,
2636 struct obd_import *imp,
2637 enum obd_import_event event)
2639 struct client_obd *cli;
2643 LASSERT(imp->imp_obd == obd);
/* Connection lost: all outstanding grant is void. */
2646 case IMP_EVENT_DISCON: {
2648 spin_lock(&cli->cl_loi_list_lock);
2649 cli->cl_avail_grant = 0;
2650 cli->cl_lost_grant = 0;
2651 spin_unlock(&cli->cl_loi_list_lock);
2654 case IMP_EVENT_INACTIVE: {
2655 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* Import invalidated: drop local locks, flush dirty pages, invalidate
 * every cached object, then clean the namespace a second time (see
 * osc_ldlm_resource_invalidate()). */
2658 case IMP_EVENT_INVALIDATE: {
2659 struct ldlm_namespace *ns = obd->obd_namespace;
2663 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2665 env = cl_env_get(&refcheck);
2667 osc_io_unplug(env, &obd->u.cli, NULL);
2669 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2670 osc_ldlm_resource_invalidate,
2672 cl_env_put(env, &refcheck);
2674 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2679 case IMP_EVENT_ACTIVE: {
2680 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data received: (re)initialize grant accounting and pick the
 * I/O request portal if the server supports it. */
2683 case IMP_EVENT_OCD: {
2684 struct obd_connect_data *ocd = &imp->imp_connect_data;
2686 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2687 osc_init_grant(&obd->u.cli, ocd);
2690 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2691 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2693 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2696 case IMP_EVENT_DEACTIVATE: {
2697 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2700 case IMP_EVENT_ACTIVATE: {
2701 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2705 CERROR("Unknown import event %d\n", event);
2712 * Determine whether the lock can be canceled before replaying the lock
2713 * during recovery, see bug16774 for detailed information.
2715 * \retval zero the lock can't be canceled
2716 * \retval other ok to cancel
2718 static int osc_cancel_weight(struct ldlm_lock *lock)
2721 * Cancel all unused and granted extent lock.
/* Cancelable iff: extent lock, fully granted (granted mode == requested
 * mode), and the weigh AST reports it unused (weight 0). */
2723 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2724 lock->l_granted_mode == lock->l_req_mode &&
2725 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback that kicks writeback for a
 * client_obd (\a data) by unplugging queued I/O.  Registered via
 * ptlrpcd_alloc_work() in osc_setup().
 */
2731 static int brw_queue_work(const struct lu_env *env, void *data)
2733 struct client_obd *cli = data;
2735 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2737 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup() - OBD setup method for the OSC device.
 *
 * Initializes the client obd, allocates the writeback and LRU ptlrpcd work
 * items, sets up quota, registers procfs entries, tops up the shared
 * request pool, registers the cancel-weight callback, and joins the global
 * shrink list.  Errors unwind in reverse order (see the tail of this
 * function).
 *
 * NOTE(review): extraction dropped lines (error labels, braces, returns);
 * the visible code is incomplete.
 */
2741 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2743 struct client_obd *cli = &obd->u.cli;
2744 struct obd_type *type;
2752 rc = ptlrpcd_addref();
2756 rc = client_obd_setup(obd, lcfg);
2758 GOTO(out_ptlrpcd, rc);
/* Dedicated ptlrpcd work item for flushing dirty pages. */
2760 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2761 if (IS_ERR(handler))
2762 GOTO(out_client_setup, rc = PTR_ERR(handler));
2763 cli->cl_writeback_work = handler;
/* Second work item for LRU page reclaim. */
2765 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2766 if (IS_ERR(handler))
2767 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2768 cli->cl_lru_work = handler;
2770 rc = osc_quota_setup(obd);
2772 GOTO(out_ptlrpcd_work, rc);
2774 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2776 #ifdef CONFIG_PROC_FS
2777 obd->obd_vars = lprocfs_osc_obd_vars;
2779 /* If this is true then both client (osc) and server (osp) are on the
2780 * same node. The osp layer if loaded first will register the osc proc
2781 * directory. In that case this obd_device will be attached its proc
2782 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2783 type = class_search_type(LUSTRE_OSP_NAME);
2784 if (type && type->typ_procsym) {
2785 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2787 obd->obd_vars, obd);
2788 if (IS_ERR(obd->obd_proc_entry)) {
2789 rc = PTR_ERR(obd->obd_proc_entry);
2790 CERROR("error %d setting up lprocfs for %s\n", rc,
2792 obd->obd_proc_entry = NULL;
2795 rc = lprocfs_obd_setup(obd);
2798 /* If the basic OSC proc tree construction succeeded then
2799 * lets do the rest. */
2801 lproc_osc_attach_seqstat(obd);
2802 sptlrpc_lprocfs_cliobd_attach(obd);
2803 ptlrpc_lprocfs_register_obd(obd);
2807 * We try to control the total number of requests with a upper limit
2808 * osc_reqpool_maxreqcount. There might be some race which will cause
2809 * over-limit allocation, but it is fine.
/* Grow the shared request pool for this client's RPCs-in-flight (+2),
 * capped at osc_reqpool_maxreqcount; races may slightly overshoot. */
2811 req_count = atomic_read(&osc_pool_req_count);
2812 if (req_count < osc_reqpool_maxreqcount) {
2813 adding = cli->cl_max_rpcs_in_flight + 2;
2814 if (req_count + adding > osc_reqpool_maxreqcount)
2815 adding = osc_reqpool_maxreqcount - req_count;
2817 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2818 atomic_add(added, &osc_pool_req_count);
2821 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2822 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Make this client visible to the global cache shrinker. */
2824 spin_lock(&osc_shrink_lock);
2825 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2826 spin_unlock(&osc_shrink_lock);
/* Error unwind: destroy the work items and tear down the client obd. */
2831 if (cli->cl_writeback_work != NULL) {
2832 ptlrpcd_destroy_work(cli->cl_writeback_work);
2833 cli->cl_writeback_work = NULL;
2835 if (cli->cl_lru_work != NULL) {
2836 ptlrpcd_destroy_work(cli->cl_lru_work);
2837 cli->cl_lru_work = NULL;
2840 client_obd_cleanup(obd);
/*
 * osc_precleanup() - first phase of OSC teardown: wait for zombie exports,
 * destroy ptlrpcd work items, tear down the import and procfs entries.
 *
 * NOTE(review): extraction dropped lines (braces, return); the visible code
 * is incomplete.
 */
2846 static int osc_precleanup(struct obd_device *obd)
2848 struct client_obd *cli = &obd->u.cli;
2852 * for echo client, export may be on zombie list, wait for
2853 * zombie thread to cull it, because cli.cl_import will be
2854 * cleared in client_disconnect_export():
2855 * class_export_destroy() -> obd_cleanup() ->
2856 * echo_device_free() -> echo_client_cleanup() ->
2857 * obd_disconnect() -> osc_disconnect() ->
2858 * client_disconnect_export()
2860 obd_zombie_barrier();
2861 if (cli->cl_writeback_work) {
2862 ptlrpcd_destroy_work(cli->cl_writeback_work);
2863 cli->cl_writeback_work = NULL;
2866 if (cli->cl_lru_work) {
2867 ptlrpcd_destroy_work(cli->cl_lru_work);
2868 cli->cl_lru_work = NULL;
2871 obd_cleanup_client_import(obd);
2872 ptlrpc_lprocfs_unregister_obd(obd);
2873 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup() - final OSC teardown: leave the global shrink list, detach
 * from the shared client cache, free the quota cache, and clean up the
 * client obd.
 *
 * NOTE(review): extraction dropped lines (braces, return); the visible code
 * is incomplete.
 */
2877 int osc_cleanup(struct obd_device *obd)
2879 struct client_obd *cli = &obd->u.cli;
2884 spin_lock(&osc_shrink_lock);
2885 list_del(&cli->cl_shrink_list);
2886 spin_unlock(&osc_shrink_lock);
/* Undo the KEY_CACHE_SET attachment made in osc_set_info_async(). */
2889 if (cli->cl_cache != NULL) {
2890 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2891 spin_lock(&cli->cl_cache->ccc_lru_lock);
2892 list_del_init(&cli->cl_lru_osc);
2893 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2894 cli->cl_lru_left = NULL;
2895 cl_cache_decref(cli->cl_cache);
2896 cli->cl_cache = NULL;
2899 /* free memory of osc quota cache */
2900 osc_quota_cleanup(obd);
2902 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a configuration log record by mapping
 * it onto the OSC's proc parameters.  Positive return values from
 * class_process_proc_param() are normalized to 0 (success).
 */
2908 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2910 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2911 return rc > 0 ? 0: rc;
/*
 * osc_process_config() - obd_ops wrapper; \a buf is a struct lustre_cfg.
 * The \a len argument is unused here.
 */
2914 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2916 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type; registered with
 * class_register_type() in osc_init().  Connection management is delegated
 * to the generic client_* helpers.
 */
2919 static struct obd_ops osc_obd_ops = {
2920 .o_owner = THIS_MODULE,
2921 .o_setup = osc_setup,
2922 .o_precleanup = osc_precleanup,
2923 .o_cleanup = osc_cleanup,
2924 .o_add_conn = client_import_add_conn,
2925 .o_del_conn = client_import_del_conn,
2926 .o_connect = client_connect_import,
2927 .o_reconnect = osc_reconnect,
2928 .o_disconnect = osc_disconnect,
2929 .o_statfs = osc_statfs,
2930 .o_statfs_async = osc_statfs_async,
2931 .o_create = osc_create,
2932 .o_destroy = osc_destroy,
2933 .o_getattr = osc_getattr,
2934 .o_setattr = osc_setattr,
2935 .o_iocontrol = osc_iocontrol,
2936 .o_set_info_async = osc_set_info_async,
2937 .o_import_event = osc_import_event,
2938 .o_process_config = osc_process_config,
2939 .o_quotactl = osc_quotactl,
/* Memory-shrinker state: the registered kernel shrinker, plus the global
 * list of client_obd's it walks, protected by osc_shrink_lock. */
2942 static struct shrinker *osc_cache_shrinker;
2943 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2944 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Compatibility wrapper for kernels without the split count/scan shrinker
 * API: repack the legacy arguments into a shrink_control and forward to
 * osc_cache_shrink_scan()/osc_cache_shrink_count().
 */
2946 #ifndef HAVE_SHRINKER_COUNT
2947 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2949 struct shrink_control scv = {
2950 .nr_to_scan = shrink_param(sc, nr_to_scan),
2951 .gfp_mask = shrink_param(sc, gfp_mask)
2953 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2954 struct shrinker *shrinker = NULL;
/* Scan result is intentionally ignored; legacy API returns the count. */
2957 (void)osc_cache_shrink_scan(shrinker, &scv);
2959 return osc_cache_shrink_count(shrinker, &scv);
/*
 * osc_init() - module entry point: initialize lu caches, register the OSC
 * obd type (procfs disabled when OSP owns the proc directory), install the
 * cache shrinker, and size/allocate the shared request pool from
 * osc_reqpool_mem_max.
 *
 * NOTE(review): extraction dropped lines (error labels, braces, returns);
 * the visible code is incomplete.
 */
2963 static int __init osc_init(void)
2965 bool enable_proc = true;
2966 struct obd_type *type;
2967 unsigned int reqpool_size;
2968 unsigned int reqsize;
2970 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2971 osc_cache_shrink_count, osc_cache_shrink_scan);
2974 /* print an address of _any_ initialized kernel symbol from this
2975 * module, to allow debugging with gdb that doesn't support data
2976 * symbols from modules.*/
2977 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2979 rc = lu_kmem_init(osc_caches);
/* When OSP is loaded it owns the shared proc directory; skip procfs. */
2983 type = class_search_type(LUSTRE_OSP_NAME);
2984 if (type != NULL && type->typ_procsym != NULL)
2985 enable_proc = false;
2987 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2988 LUSTRE_OSC_NAME, &osc_device_type);
2992 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2994 /* This is obviously too much memory, only prevent overflow here */
/* Reject 0 and >= 4096 MB so the << 20 below cannot overflow. */
2995 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2996 GOTO(out_type, rc = -EINVAL);
2998 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two. */
3001 while (reqsize < OST_IO_MAXREQSIZE)
3002 reqsize = reqsize << 1;
3005 * We don't enlarge the request count in OSC pool according to
3006 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3007 * tried after normal allocation failed. So a small OSC pool won't
3008 * cause much performance degression in most of cases.
3010 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3012 atomic_set(&osc_pool_req_count, 0);
3013 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3014 ptlrpc_add_rqs_to_pool);
3016 if (osc_rq_pool != NULL)
/* Error unwind: unregister the type and release lu caches. */
3020 class_unregister_type(LUSTRE_OSC_NAME);
3022 lu_kmem_fini(osc_caches);
/*
 * osc_exit() - module exit: undo osc_init() in reverse order (shrinker,
 * obd type, lu caches, request pool).
 */
3027 static void __exit osc_exit(void)
3029 remove_shrinker(osc_cache_shrinker);
3030 class_unregister_type(LUSTRE_OSC_NAME);
3031 lu_kmem_fini(osc_caches);
3032 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
3035 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3036 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3037 MODULE_VERSION(LUSTRE_VERSION_STRING);
3038 MODULE_LICENSE("GPL");
3040 module_init(osc_init);
3041 module_exit(osc_exit);