4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
51 #include "osc_internal.h"
/* Shared state for the OSC BRW request pool: the live request count,
 * the computed cap on pooled requests, and the pool object itself.
 * These are file-wide globals consumed elsewhere in the OSC layer. */
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
/* read-only module parameter (0444): tunable only at load time */
59 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-RPC context for an asynchronous bulk read/write (BRW) request,
 * stashed in the request's rq_async_args and consumed by brw_interpret().
 * NOTE(review): several members are elided in this capture (line-number
 * gaps); the visible ones are the page array, owning client, and the
 * oap/extent lists being serviced by the RPC. */
61 struct osc_brw_async_args {
67 	struct brw_page **aa_ppga;
68 	struct client_obd *aa_cli;
69 	struct list_head aa_oaps;
70 	struct list_head aa_exts;
/* grant-shrink RPCs reuse the same async-args layout */
73 #define osc_grant_args osc_brw_async_args
/* Async-args for setattr/punch RPCs: carries the completion upcall
 * (plus, per the elided members, the obdo and upcall cookie used by
 * osc_setattr_interpret()). */
75 struct osc_setattr_args {
77 	obd_enqueue_update_f sa_upcall;
/* Async-args for OST_SYNC RPCs: target object and completion upcall
 * (cookie/obdo members elided in this capture). */
81 struct osc_fsync_args {
82 	struct osc_object *fa_obj;
84 	obd_enqueue_update_f fa_upcall;
/* Async-args for OST_LADVISE RPCs: completion upcall (obdo/cookie
 * members elided in this capture). */
88 struct osc_ladvise_args {
90 	obd_enqueue_update_f la_upcall;
/* Forward declarations for helpers defined later in the file. */
94 static void osc_release_ppga(struct brw_page **ppga, size_t count);
95 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into @req's OST_BODY request buffer, translating the obdo
 * to wire format according to the import's negotiated connect data. */
98 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
100 	struct ost_body *body;
102 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
105 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: pack @oa, queue-wait the RPC, and copy the
 * server's attributes back into @oa. On success also fills in the
 * client-side preferred BRW size as the block size.
 * NOTE(review): error-check lines (alloc failure, rc tests) are elided
 * in this capture — the numbering gaps mark where they were. */
108 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
111 	struct ptlrpc_request *req;
112 	struct ost_body *body;
116 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
120 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
122 		ptlrpc_request_free(req);
126 	osc_pack_req_body(req, oa);
128 	ptlrpc_request_set_replen(req);
/* blocks until the reply arrives */
130 	rc = ptlrpc_queue_wait(req);
134 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
136 		GOTO(out, rc = -EPROTO);
138 	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
139 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* advertise the client's preferred I/O size as the blksize */
141 	oa->o_blksize = cli_brw_size(exp->exp_obd);
142 	oa->o_valid |= OBD_MD_FLBLKSZ;
146 	ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send @oa's attributes to the OST and copy
 * the (possibly updated) attributes back into @oa on success.
 * NOTE(review): alloc/rc error-check lines are elided in this capture. */
151 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
154 	struct ptlrpc_request *req;
155 	struct ost_body *body;
/* group must always be set for OST object identification */
159 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
161 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
165 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
167 		ptlrpc_request_free(req);
171 	osc_pack_req_body(req, oa);
173 	ptlrpc_request_set_replen(req);
175 	rc = ptlrpc_queue_wait(req);
179 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
181 		GOTO(out, rc = -EPROTO);
183 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
187 	ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch RPCs: unpack the
 * server obdo into sa->sa_oa, then invoke the caller's upcall with the
 * final rc. Runs in ptlrpcd context. */
192 static int osc_setattr_interpret(const struct lu_env *env,
193 				 struct ptlrpc_request *req,
194 				 struct osc_setattr_args *sa, int rc)
196 	struct ost_body *body;
202 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
204 		GOTO(out, rc = -EPROTO);
206 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* always notify the caller, even on error paths */
209 	rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR. @upcall(@cookie, rc) is invoked from the
 * interpreter when the reply arrives. If @rqset is NULL the request is
 * fire-and-forget via ptlrpcd; PTLRPCD_SET also routes through ptlrpcd,
 * otherwise the request is added to the caller's set.
 * NOTE(review): the rqset==NULL branch structure is elided in this
 * capture — confirm against the full source. */
213 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
214 		      obd_enqueue_update_f upcall, void *cookie,
215 		      struct ptlrpc_request_set *rqset)
217 	struct ptlrpc_request *req;
218 	struct osc_setattr_args *sa;
223 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
227 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
229 		ptlrpc_request_free(req);
233 	osc_pack_req_body(req, oa);
235 	ptlrpc_request_set_replen(req);
237 	/* do mds to ost setattr asynchronously */
239 		/* Do not wait for response. */
240 		ptlrpcd_add_req(req);
242 		req->rq_interpret_reply =
243 			(ptlrpc_interpterer_t)osc_setattr_interpret;
/* async-args must fit in the request's embedded storage */
245 		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
246 		sa = ptlrpc_req_async_args(req);
248 		sa->sa_upcall = upcall;
249 		sa->sa_cookie = cookie;
251 		if (rqset == PTLRPCD_SET)
252 			ptlrpcd_add_req(req);
254 			ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_LADVISE: copy the server obdo back to the
 * caller's obdo and invoke the upcall with the final rc. */
260 static int osc_ladvise_interpret(const struct lu_env *env,
261 				 struct ptlrpc_request *req,
264 	struct osc_ladvise_args *la = arg;
265 	struct ost_body *body;
271 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
273 		GOTO(out, rc = -EPROTO);
275 	*la->la_oa = body->oa;
277 	rc = la->la_upcall(la->la_cookie, rc);
282  * If rqset is NULL, do not wait for response. Upcall and cookie could also
283  * be NULL in this case
/* Build and dispatch an OST_LADVISE RPC carrying @num_advise lu_ladvise
 * entries from @ladvise_hdr. Sent on the I/O portal with an I/O timeout.
 * Dispatch policy mirrors osc_setattr_async(): NULL rqset means
 * fire-and-forget, PTLRPCD_SET means ptlrpcd, otherwise the caller's set. */
285 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
286 		     struct ladvise_hdr *ladvise_hdr,
287 		     obd_enqueue_update_f upcall, void *cookie,
288 		     struct ptlrpc_request_set *rqset)
290 	struct ptlrpc_request *req;
291 	struct ost_body *body;
292 	struct osc_ladvise_args *la;
294 	struct lu_ladvise *req_ladvise;
295 	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
296 	int num_advise = ladvise_hdr->lah_count;
297 	struct ladvise_hdr *req_ladvise_hdr;
300 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* variable-length advice array: size the buffer before packing */
304 	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
305 			     num_advise * sizeof(*ladvise));
306 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
308 		ptlrpc_request_free(req);
311 	req->rq_request_portal = OST_IO_PORTAL;
312 	ptlrpc_at_set_req_timeout(req);
314 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
316 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
319 	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
320 						 &RMF_OST_LADVISE_HDR);
321 	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
323 	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
324 	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
325 	ptlrpc_request_set_replen(req);
328 		/* Do not wait for response. */
329 		ptlrpcd_add_req(req);
333 	req->rq_interpret_reply = osc_ladvise_interpret;
334 	CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
335 	la = ptlrpc_req_async_args(req);
337 	la->la_upcall = upcall;
338 	la->la_cookie = cookie;
340 	if (rqset == PTLRPCD_SET)
341 		ptlrpcd_add_req(req);
343 		ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE. Only reachable for echo-client objects (the
 * LASSERT pins the FID sequence); regular object creation goes through
 * the OSP on the MDS. On success @oa is refreshed from the reply and
 * the preferred BRW size is reported as the block size. */
348 static int osc_create(const struct lu_env *env, struct obd_export *exp,
351 	struct ptlrpc_request *req;
352 	struct ost_body *body;
357 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
/* only the echo client creates objects directly on the OST */
358 	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
360 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
362 		GOTO(out, rc = -ENOMEM);
364 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
366 		ptlrpc_request_free(req);
370 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
373 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
375 	ptlrpc_request_set_replen(req);
377 	rc = ptlrpc_queue_wait(req);
381 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
383 		GOTO(out_req, rc = -EPROTO);
385 	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
386 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
388 	oa->o_blksize = cli_brw_size(exp->exp_obd);
389 	oa->o_valid |= OBD_MD_FLBLKSZ;
391 	CDEBUG(D_HA, "transno: %lld\n",
392 	       lustre_msg_get_transno(req->rq_repmsg));
394 	ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch). The start/end range is
 * presumably carried in @oa's size/blocks fields — TODO confirm against
 * the full source. Completion is delivered via osc_setattr_interpret(),
 * which invokes @upcall(@cookie, rc). Always dispatched via ptlrpcd. */
399 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
400 		   obd_enqueue_update_f upcall, void *cookie)
402 	struct ptlrpc_request *req;
403 	struct osc_setattr_args *sa;
404 	struct obd_import *imp = class_exp2cliimp(exp);
405 	struct ost_body *body;
410 	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
414 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
416 		ptlrpc_request_free(req);
/* punch is an I/O operation: use the I/O portal and timeout */
420 	osc_set_io_portal(req);
422 	ptlrpc_at_set_req_timeout(req);
424 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
426 	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
428 	ptlrpc_request_set_replen(req);
430 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
431 	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
432 	sa = ptlrpc_req_async_args(req);
434 	sa->sa_upcall = upcall;
435 	sa->sa_cookie = cookie;
437 	ptlrpcd_add_req(req);
441 EXPORT_SYMBOL(osc_punch_send);
/* Reply interpreter for OST_SYNC: copy the server obdo back, refresh
 * the osc object's cached blocks attribute under the attr lock, then
 * invoke the caller's upcall with the final rc. */
443 static int osc_sync_interpret(const struct lu_env *env,
444 			      struct ptlrpc_request *req,
447 	struct osc_fsync_args *fa = arg;
448 	struct ost_body *body;
449 	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
450 	unsigned long valid = 0;
451 	struct cl_object *obj;
457 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
459 		CERROR("can't unpack ost_body\n");
460 		GOTO(out, rc = -EPROTO);
463 	*fa->fa_oa = body->oa;
464 	obj = osc2cl(fa->fa_obj);
466 	/* Update osc object's blocks attribute */
467 	cl_object_attr_lock(obj);
468 	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
469 		attr->cat_blocks = body->oa.o_blocks;
474 	cl_object_attr_update(env, obj, attr, valid);
475 	cl_object_attr_unlock(obj);
478 	rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC over a byte range of @obj; the range rides in
 * @oa's size/blocks fields (see the overload comment below). Dispatch
 * policy matches osc_setattr_async(): PTLRPCD_SET routes via ptlrpcd,
 * otherwise the request joins the caller's @rqset. */
482 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
483 		  obd_enqueue_update_f upcall, void *cookie,
484 		  struct ptlrpc_request_set *rqset)
486 	struct obd_export *exp = osc_export(obj);
487 	struct ptlrpc_request *req;
488 	struct ost_body *body;
489 	struct osc_fsync_args *fa;
493 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
497 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
499 		ptlrpc_request_free(req);
503 	/* overload the size and blocks fields in the oa with start/end */
504 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
506 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
508 	ptlrpc_request_set_replen(req);
509 	req->rq_interpret_reply = osc_sync_interpret;
511 	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
512 	fa = ptlrpc_req_async_args(req);
515 	fa->fa_upcall = upcall;
516 	fa->fa_cookie = cookie;
518 	if (rqset == PTLRPCD_SET)
519 		ptlrpcd_add_req(req);
521 		ptlrpc_set_add_req(rqset, req);
526 /* Find and cancel locally locks matched by @mode in the resource found by
527  * @objid. Found locks are added into @cancel list. Returns the amount of
528  * locks added to @cancels list. */
529 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
530 				   struct list_head *cancels,
531 				   enum ldlm_mode mode, __u64 lock_flags)
533 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
534 	struct ldlm_res_id res_id;
535 	struct ldlm_resource *res;
539 	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
540 	 * export) but disabled through procfs (flag in NS).
542 	 * This distinguishes from a case when ELC is not supported originally,
543 	 * when we still want to cancel locks in advance and just cancel them
544 	 * locally, without sending any RPC. */
545 	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
548 	ostid_build_res_name(&oa->o_oi, &res_id);
549 	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the local cancel scan */
553 	LDLM_RESOURCE_ADDREF(res);
554 	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
555 					   lock_flags, 0, NULL);
556 	LDLM_RESOURCE_DELREF(res);
557 	ldlm_resource_putref(res);
/* Reply interpreter for OST_DESTROY: release this RPC's slot in the
 * destroy-in-flight throttle and wake any waiter in osc_destroy(). */
561 static int osc_destroy_interpret(const struct lu_env *env,
562 				 struct ptlrpc_request *req, void *data,
565 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
567 	atomic_dec(&cli->cl_destroy_in_flight);
568 	wake_up(&cli->cl_destroy_waitq);
/* Try to claim a destroy-RPC slot, bounded by cl_max_rpcs_in_flight.
 * Increments the in-flight counter optimistically; if over the limit,
 * backs the increment out and (when the counter moved concurrently)
 * re-wakes the destroy waitqueue to avoid a lost wakeup. Returns
 * nonzero when the destroy may be sent — per the elided RETURNs. */
572 static int osc_can_send_destroy(struct client_obd *cli)
574 	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
575 	    cli->cl_max_rpcs_in_flight) {
576 		/* The destroy request can be sent */
579 	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
580 	    cli->cl_max_rpcs_in_flight) {
582 		 * The counter has been modified between the two atomic
585 		wake_up(&cli->cl_destroy_waitq);
/* Asynchronous OST_DESTROY with early lock cancellation: PW locks on
 * the object are cancelled locally (DISCARD_DATA) and piggybacked on
 * the destroy RPC. Sends are throttled below cl_max_rpcs_in_flight via
 * osc_can_send_destroy(); the wait is interruptible (LWI_INTR). */
590 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
593 	struct client_obd *cli = &exp->exp_obd->u.cli;
594 	struct ptlrpc_request *req;
595 	struct ost_body *body;
596 	struct list_head cancels = LIST_HEAD_INIT(cancels);
601 		CDEBUG(D_INFO, "oa NULL\n");
/* cancel local PW locks; their data is being destroyed anyway */
605 	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
606 					LDLM_FL_DISCARD_DATA);
608 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
610 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
614 	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
617 		ptlrpc_request_free(req);
621 	req->rq_request_portal = OST_IO_PORTAL;	/* bug 7198 */
622 	ptlrpc_at_set_req_timeout(req);
624 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
626 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
628 	ptlrpc_request_set_replen(req);
630 	req->rq_interpret_reply = osc_destroy_interpret;
631 	if (!osc_can_send_destroy(cli)) {
632 		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
635 		 * Wait until the number of on-going destroy RPCs drops
636 		 * under max_rpc_in_flight
638 		rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
639 					    osc_can_send_destroy(cli), &lwi);
641 			ptlrpc_req_finished(req);
646 	/* Do not wait for response */
647 	ptlrpcd_add_req(req);
/* Fill @oa's grant-related fields (o_dirty, o_undirty, o_grant,
 * o_dropped) under cl_loi_list_lock so the server can see the client's
 * cache/grant state on every BRW. Performs several sanity CERRORs on
 * dirty-page accounting. NOTE(review): the branch structure between the
 * sanity checks and the o_undirty computation is partially elided in
 * this capture. */
651 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
654 	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
656 	LASSERT(!(oa->o_valid & bits));
659 	spin_lock(&cli->cl_loi_list_lock);
660 	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
661 		oa->o_dirty = cli->cl_dirty_grant;
663 		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
664 	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
665 		     cli->cl_dirty_max_pages)) {
666 		CERROR("dirty %lu - %lu > dirty_max %lu\n",
667 		       cli->cl_dirty_pages, cli->cl_dirty_transit,
668 		       cli->cl_dirty_max_pages);
670 	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
671 			    atomic_long_read(&obd_dirty_transit_pages) >
672 			    (long)(obd_max_dirty_pages + 1))) {
673 		/* The atomic_read() allowing the atomic_inc() are
674 		 * not covered by a lock thus they may safely race and trip
675 		 * this CERROR() unless we add in a small fudge factor (+1). */
676 		CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
677 		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
678 		       atomic_long_read(&obd_dirty_transit_pages),
679 		       obd_max_dirty_pages);
681 	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
683 		CERROR("dirty %lu - dirty_max %lu too big???\n",
684 		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
687 		unsigned long nrpages;
689 		nrpages = cli->cl_max_pages_per_rpc;
690 		nrpages *= cli->cl_max_rpcs_in_flight + 1;
691 		nrpages = max(nrpages, cli->cl_dirty_max_pages);
692 		oa->o_undirty = nrpages << PAGE_SHIFT;
693 		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
697 			/* take extent tax into account when asking for more
699 			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
700 				     cli->cl_max_extent_pages;
701 			oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
704 	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
705 	oa->o_dropped = cli->cl_lost_grant;
/* lost grant is reported once, then reset */
706 	cli->cl_lost_grant = 0;
707 	spin_unlock(&cli->cl_loi_list_lock);
708 	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
709 	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt one interval from now. */
712 void osc_update_next_shrink(struct client_obd *cli)
714 	cli->cl_next_shrink_grant =
715 		cfs_time_shift(cli->cl_grant_shrink_interval);
716 	CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
717 	       cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the
 * cl_loi_list_lock that protects all grant accounting. */
720 static void __osc_update_grant(struct client_obd *cli, u64 grant)
722 	spin_lock(&cli->cl_loi_list_lock);
723 	cli->cl_avail_grant += grant;
724 	spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server piggybacked on an RPC reply. */
727 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
729 	if (body->oa.o_valid & OBD_MD_FLGRANT) {
730 		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
731 		__osc_update_grant(cli, body->oa.o_grant);
/* Reply interpreter for a grant-shrink set_info RPC. On failure the
 * grant we tried to return is restored locally (the elided rc check);
 * on success any server-granted amount from the reply is absorbed. */
735 static int osc_shrink_grant_interpret(const struct lu_env *env,
736 				      struct ptlrpc_request *req,
739 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
740 	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
741 	struct ost_body *body;
744 		__osc_update_grant(cli, oa->o_grant);
748 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
750 	osc_update_grant(cli, body);
/* Piggyback a small grant return on an outgoing BRW: give back a
 * quarter of the available grant in @oa and flag it with
 * OBD_FL_SHRINK_GRANT, then reschedule the next shrink. */
756 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
758 	spin_lock(&cli->cl_loi_list_lock);
759 	oa->o_grant = cli->cl_avail_grant / 4;
760 	cli->cl_avail_grant -= oa->o_grant;
761 	spin_unlock(&cli->cl_loi_list_lock);
762 	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
763 		oa->o_valid |= OBD_MD_FLFLAGS;
766 	oa->o_flags |= OBD_FL_SHRINK_GRANT;
767 	osc_update_next_shrink(cli);
770 /* Shrink the current grant, either from some large amount to enough for a
771  * full set of in-flight RPCs, or if we have already shrunk to that limit
772  * then to enough for a single RPC. This avoids keeping more grant than
773  * needed, and avoids shrinking the grant piecemeal. */
774 static int osc_shrink_grant(struct client_obd *cli)
776 	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
777 			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
779 	spin_lock(&cli->cl_loi_list_lock);
/* already at the in-flight target: drop all the way to one RPC's worth */
780 	if (cli->cl_avail_grant <= target_bytes)
781 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
782 	spin_unlock(&cli->cl_loi_list_lock);
784 	return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC. Clamps the target to at least one RPC's worth, bails if
 * nothing would be shrunk, and restores the grant locally if the RPC
 * cannot be sent (the elided rc path). NOTE(review): the body allocation
 * between lines 804 and 810 is elided in this capture. */
787 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
790 	struct ost_body *body;
793 	spin_lock(&cli->cl_loi_list_lock);
794 	/* Don't shrink if we are already above or below the desired limit
795 	 * We don't want to shrink below a single RPC, as that will negatively
796 	 * impact block allocation and long-term performance. */
797 	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
798 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
800 	if (target_bytes >= cli->cl_avail_grant) {
801 		spin_unlock(&cli->cl_loi_list_lock);
804 	spin_unlock(&cli->cl_loi_list_lock);
810 	osc_announce_cached(cli, &body->oa, 0);
812 	spin_lock(&cli->cl_loi_list_lock);
813 	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
814 	cli->cl_avail_grant = target_bytes;
815 	spin_unlock(&cli->cl_loi_list_lock);
816 	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
817 		body->oa.o_valid |= OBD_MD_FLFLAGS;
818 		body->oa.o_flags = 0;
820 	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
821 	osc_update_next_shrink(cli);
823 	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
824 				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
825 				sizeof(*body), body, NULL);
/* on send failure, take the grant back locally */
827 		__osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support GRANT_SHRINK, the shrink deadline to be (nearly)
 * reached, a FULL import, and more available grant than one RPC needs.
 * Return values are elided in this capture — presumably boolean. */
832 static int osc_should_shrink_grant(struct client_obd *client)
834 	cfs_time_t time = cfs_time_current();
835 	cfs_time_t next_shrink = client->cl_next_shrink_grant;
837 	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
838 	     OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow firing slightly early (5 ticks) to avoid just missing the slot */
841 	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
842 		/* Get the current RPC size directly, instead of going via:
843 		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
844 		 * Keep comment here so that it can be found by searching. */
845 		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
847 		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
848 		    client->cl_avail_grant > brw_size)
851 			osc_update_next_shrink(client);
/* Periodic timeout callback: walk all clients registered for grant
 * shrinking and shrink those whose deadline has arrived. */
856 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
858 	struct client_obd *client;
860 	list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
861 		if (osc_should_shrink_grant(client))
862 			osc_shrink_grant(client);
/* Register @client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb() fires every cl_grant_shrink_interval. */
867 static int osc_add_shrink_grant(struct client_obd *client)
871 	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
873 				       osc_grant_shrink_grant_cb, NULL,
874 				       &client->cl_grant_shrink_list);
876 		CERROR("add grant client %s error %d\n", cli_name(client), rc);
879 	CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
880 	osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink timeout list. */
884 static int osc_del_shrink_grant(struct client_obd *client)
886 	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize per-target grant state from the server's connect reply:
 * seed cl_avail_grant from ocd_grant (adjusting for dirty/reserved
 * amounts unless we were just evicted), derive chunk size, extent tax
 * and max extent size when GRANT_PARAM is negotiated, and hook up
 * periodic grant shrinking when the server supports it. */
890 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
893 	 * ocd_grant is the total grant amount we're expect to hold: if we've
894 	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
895 	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
898 	 * race is tolerable here: if we're evicted, but imp_state already
899 	 * left EVICTED state, then cl_dirty_pages must be 0 already.
901 	spin_lock(&cli->cl_loi_list_lock);
902 	cli->cl_avail_grant = ocd->ocd_grant;
903 	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
904 		cli->cl_avail_grant -= cli->cl_reserved_grant;
905 		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
906 			cli->cl_avail_grant -= cli->cl_dirty_grant;
908 			cli->cl_avail_grant -=
909 					cli->cl_dirty_pages << PAGE_SHIFT;
912 	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
916 		/* overhead for each extent insertion */
917 		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
918 		/* determine the appropriate chunk size used by osc_extent. */
919 		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
920 					  ocd->ocd_grant_blkbits);
921 		/* max_pages_per_rpc must be chunk aligned */
922 		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
923 		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
924 					     ~chunk_mask) & chunk_mask;
925 		/* determine maximum extent size, in #pages */
926 		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
927 		cli->cl_max_extent_pages = size >> PAGE_SHIFT;
928 		if (cli->cl_max_extent_pages == 0)
929 			cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-sized chunks and defaults */
931 		cli->cl_grant_extent_tax = 0;
932 		cli->cl_chunkbits = PAGE_SHIFT;
933 		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
935 	spin_unlock(&cli->cl_loi_list_lock);
937 	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
938 		"chunk bits: %d cl_max_extent_pages: %d\n",
940 		cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
941 		cli->cl_max_extent_pages);
943 	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
944 	    list_empty(&cli->cl_grant_shrink_list))
945 		osc_add_shrink_grant(cli);
949 /* We assume that the reason this OSC got a short read is because it read
950  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
951  * via the LOV, and it _knows_ it's reading inside the file, it's just that
952  * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of a short bulk read: skip the bytes that arrived,
 * zero the remainder of the partially-filled page, then zero every page
 * after it. Page kmap/kunmap pairing is partially elided in this capture. */
953 static void handle_short_read(int nob_read, size_t page_count,
954 			      struct brw_page **pga)
959 	/* skip bytes read OK */
960 	while (nob_read > 0) {
961 		LASSERT (page_count > 0);
963 		if (pga[i]->count > nob_read) {
964 			/* EOF inside this page */
965 			ptr = kmap(pga[i]->pg) +
966 				(pga[i]->off & ~PAGE_MASK);
967 			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
974 		nob_read -= pga[i]->count;
979 	/* zero remaining pages */
980 	while (page_count-- > 0) {
981 		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
982 		memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * the vector must be present and correctly sized, every rc must be 0
 * (negative rcs are propagated, other nonzero values are protocol
 * errors), and the bulk must have transferred exactly the bytes we
 * requested. Elided lines carry the -EPROTO returns. */
988 static int check_write_rcs(struct ptlrpc_request *req,
989 			   int requested_nob, int niocount,
990 			   size_t page_count, struct brw_page **pga)
995 	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
996 						  sizeof(*remote_rcs) *
998 	if (remote_rcs == NULL) {
999 		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1003 	/* return error if any niobuf was in error */
1004 	for (i = 0; i < niocount; i++) {
1005 		if ((int)remote_rcs[i] < 0)
1006 			return(remote_rcs[i]);
1008 		if (remote_rcs[i] != 0) {
1009 			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1010 				i, remote_rcs[i], req);
1015 	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1016 		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1017 		       req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one remote niobuf iff they are byte-adjacent
 * and their flags differ only in bits known to be safe to combine
 * (grant/cache/sync hints); unknown differing bits get a CWARN. */
1024 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1026 	if (p1->flag != p2->flag) {
1027 		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1028 				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
1029 				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1031 		/* warn if we try to combine flags that we don't know to be
1032 		 * safe to combine */
1033 		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1034 			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1035 			      "report this at https://jira.hpdd.intel.com/\n",
1036 			      p1->flag, p2->flag);
1041 	return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * algorithm selected by @cksum_type. Contains two fault-injection hooks:
 * CHECKSUM_RECEIVE corrupts read data before hashing, CHECKSUM_SEND
 * returns a deliberately wrong checksum for writes (data stays intact
 * so a resend is still correct). */
1044 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1045 			     struct brw_page **pga, int opc,
1046 			     enum cksum_types cksum_type)
1050 	struct cfs_crypto_hash_desc	*hdesc;
1051 	unsigned int bufsize;
1052 	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1054 	LASSERT(pg_count > 0);
1056 	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1057 	if (IS_ERR(hdesc)) {
1058 		CERROR("Unable to initialize checksum hash %s\n",
1059 		       cfs_crypto_hash_name(cfs_alg));
1060 		return PTR_ERR(hdesc);
1063 	while (nob > 0 && pg_count > 0) {
/* only hash the bytes this page actually contributes */
1064 		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1066 		/* corrupt the data before we compute the checksum, to
1067 		 * simulate an OST->client data error */
1068 		if (i == 0 && opc == OST_READ &&
1069 		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1070 			unsigned char *ptr = kmap(pga[i]->pg);
1071 			int off = pga[i]->off & ~PAGE_MASK;
1073 			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1076 		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1077 					    pga[i]->off & ~PAGE_MASK,
1079 		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1080 			       (int)(pga[i]->off & ~PAGE_MASK));
1082 		nob -= pga[i]->count;
1087 	bufsize = sizeof(cksum);
1088 	cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1090 	/* For sending we only compute the wrong checksum instead
1091 	 * of corrupting the data so it is still correct on a redo */
1092 	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete BRW (bulk read/write) RPC for @page_count pages:
 * allocate the request (writes come from the shared osc_rq_pool),
 * size and pack the ioobj/niobuf buffers (adjacent mergeable pages
 * share one remote niobuf), register the bulk descriptor, announce
 * cached/dirty/grant state, optionally compute a bulk checksum, and
 * stash the transfer bookkeeping in the request's async args for
 * brw_interpret(). @resend marks the RPC with OBD_FL_RECOV_RESEND.
 * Returns 0 with *reqp set, or a negative errno (elided paths).
 * NOTE(review): many error-check and assignment lines are elided in
 * this capture; treat the flow as indicative, not complete. */
1099 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1100 		     u32 page_count, struct brw_page **pga,
1101 		     struct ptlrpc_request **reqp, int resend)
1103 	struct ptlrpc_request *req;
1104 	struct ptlrpc_bulk_desc *desc;
1105 	struct ost_body *body;
1106 	struct obd_ioobj *ioobj;
1107 	struct niobuf_remote *niobuf;
1108 	int niocount, i, requested_nob, opc, rc;
1109 	struct osc_brw_async_args *aa;
1110 	struct req_capsule *pill;
1111 	struct brw_page *pg_prev;
1114 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1115 		RETURN(-ENOMEM); /* Recoverable */
1116 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1117 		RETURN(-EINVAL); /* Fatal */
1119 	if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the preallocated pool to survive memory pressure */
1121 		req = ptlrpc_request_alloc_pool(cli->cl_import,
1123 						&RQF_OST_BRW_WRITE);
1126 		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count distinct niobufs: adjacent mergeable pages share one */
1131 	for (niocount = i = 1; i < page_count; i++) {
1132 		if (!can_merge_pages(pga[i - 1], pga[i]))
1136 	pill = &req->rq_pill;
1137 	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1139 	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1140 			     niocount * sizeof(*niobuf));
1142 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1144 		ptlrpc_request_free(req);
1147 	osc_set_io_portal(req);
1149 	ptlrpc_at_set_req_timeout(req);
1150 	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1152 	req->rq_no_retry_einprogress = 1;
1154 	desc = ptlrpc_prep_bulk_imp(req, page_count,
1155 		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1156 		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1157 			PTLRPC_BULK_PUT_SINK) |
1158 			PTLRPC_BULK_BUF_KIOV,
1160 		&ptlrpc_bulk_kiov_pin_ops);
1163 		GOTO(out, rc = -ENOMEM);
1164 	/* NB request now owns desc and will free it when it gets freed */
1166 	body = req_capsule_client_get(pill, &RMF_OST_BODY);
1167 	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1168 	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1169 	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1171 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1173 	obdo_to_ioobj(oa, ioobj);
1174 	ioobj->ioo_bufcnt = niocount;
1175 	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1176 	 * that might be send for this request.  The actual number is decided
1177 	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1178 	 * "max - 1" for old client compatibility sending "0", and also so the
1179 	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1180 	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1181 	LASSERT(page_count > 0);
1183 	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1184 		struct brw_page *pg = pga[i];
1185 		int poff = pg->off & ~PAGE_MASK;
1187 		LASSERT(pg->count > 0);
1188 		/* make sure there is no gap in the middle of page array */
1189 		LASSERTF(page_count == 1 ||
1190 			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1191 			  ergo(i > 0 && i < page_count - 1,
1192 			       poff == 0 && pg->count == PAGE_SIZE)   &&
1193 			  ergo(i == page_count - 1, poff == 0)),
1194 			 "i: %d/%d pg: %p off: %llu, count: %u\n",
1195 			 i, page_count, pg, pg->off, pg->count);
1196 		LASSERTF(i == 0 || pg->off > pg_prev->off,
1197 			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1198 			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1200 			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1201 			 pg_prev->pg, page_private(pg_prev->pg),
1202 			 pg_prev->pg->index, pg_prev->off);
1203 		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1204 			(pg->flag & OBD_BRW_SRVLOCK));
1206 		desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1207 		requested_nob += pg->count;
1209 		if (i > 0 && can_merge_pages(pg_prev, pg)) {
1211 			niobuf->rnb_len += pg->count;
1213 			niobuf->rnb_offset = pg->off;
1214 			niobuf->rnb_len    = pg->count;
1215 			niobuf->rnb_flags  = pg->flag;
1220 	LASSERTF((void *)(niobuf - niocount) ==
1221 		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1222 		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1223 		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1225 	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1227 		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1228 			body->oa.o_valid |= OBD_MD_FLFLAGS;
1229 			body->oa.o_flags = 0;
1231 		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1234 	if (osc_should_shrink_grant(cli))
1235 		osc_shrink_grant_local(cli, &body->oa);
1237 	/* size[REQ_REC_OFF] still sizeof (*body) */
1238 	if (opc == OST_WRITE) {
1239 		if (cli->cl_checksum &&
1240 		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1241 			/* store cl_cksum_type in a local variable since
1242 			 * it can be changed via lprocfs */
1243 			enum cksum_types cksum_type = cli->cl_cksum_type;
1245 			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1246 				body->oa.o_flags = 0;
1248 			body->oa.o_flags |= cksum_type_pack(cksum_type);
1249 			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1250 			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1254 			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1256 			/* save this in 'oa', too, for later checking */
1257 			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1258 			oa->o_flags |= cksum_type_pack(cksum_type);
1260 			/* clear out the checksum flag, in case this is a
1261 			 * resend but cl_checksum is no longer set. b=11238 */
1262 			oa->o_valid &= ~OBD_MD_FLCKSUM;
1264 		oa->o_cksum = body->oa.o_cksum;
1265 		/* 1 RC per niobuf */
1266 		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1267 				     sizeof(__u32) * niocount);
1269 		if (cli->cl_checksum &&
1270 		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1271 			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1272 				body->oa.o_flags = 0;
1273 			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1274 			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1277 		/* Client cksum has been already copied to wire obdo in previous
1278 		 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1279 		 * resent due to cksum error, this will allow Server to
1280 		 * check+dump pages on its side */
1282 	ptlrpc_request_set_replen(req);
1284 	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1285 	aa = ptlrpc_req_async_args(req);
1287 	aa->aa_requested_nob = requested_nob;
1288 	aa->aa_nio_count = niocount;
1289 	aa->aa_page_count = page_count;
1293 	INIT_LIST_HEAD(&aa->aa_oaps);
1296 	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1297 	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1298 		req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1299 		niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1303 	ptlrpc_req_finished(req);
/* Scratch buffer holding the path of the page-dump file built by
 * dump_all_bulk_pages().  NOTE(review): file scope and not obviously
 * serialized — concurrent checksum errors could race on it; confirm
 * callers are serialized before relying on the contents. */
1307 char dbgcksum_file_name[PATH_MAX];
/*
 * Dump the raw data pages of a bulk I/O that failed its checksum to a
 * file, so the corrupted bytes can be inspected offline.  The file name
 * encodes the debug path, FID, byte extent and both checksums.  Opening
 * with O_EXCL means only the first error for a given range produces a
 * dump (resends/retries hit -EEXIST and are skipped quietly).
 */
1309 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1310 struct brw_page **pga, __u32 server_cksum,
1319 /* will only keep dump of pages on first error for the same range in
1320 * file/fid, not during the resends/retries. */
1321 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1322 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1323 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1324 libcfs_debug_file_path_arr :
1325 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1326 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1327 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1328 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1330 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1331 client_cksum, server_cksum);
/* O_EXCL: silently skip (D_INFO only) when a dump already exists for
 * this range; any other open failure is a real error (CERROR). */
1332 filp = filp_open(dbgcksum_file_name,
1333 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1337 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1338 "checksum error: rc = %d\n", dbgcksum_file_name,
1341 CERROR("%s: can't open to dump pages with checksum "
1342 "error: rc = %d\n", dbgcksum_file_name, rc);
/* Write each bulk page in order; kmap is needed since pages may be
 * highmem.  Short writes are reported but (from the visible code) do
 * not abort the loop — best-effort dump only. */
1348 for (i = 0; i < page_count; i++) {
1349 len = pga[i]->count;
1350 buf = kmap(pga[i]->pg);
1352 rc = vfs_write(filp, (__force const char __user *)buf,
1355 CERROR("%s: wanted to write %u but got %d "
1356 "error\n", dbgcksum_file_name, len, rc);
1361 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1362 dbgcksum_file_name, rc);
/* Force the dump to stable storage before closing — the node may be
 * about to be evicted or crash while debugging corruption. */
1368 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1370 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1371 filp_close(filp, NULL);
/*
 * Validate the server-computed checksum of a completed bulk WRITE
 * against the checksum the client sent.  On mismatch, recompute the
 * checksum over the (still-mapped) pages to classify where the
 * corruption happened, optionally dump the pages, and log a console
 * error.  Return value line is elided in this excerpt — presumably
 * nonzero when the data really changed in transit; confirm upstream.
 */
1376 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1377 __u32 client_cksum, __u32 server_cksum,
1378 struct osc_brw_async_args *aa)
1382 enum cksum_types cksum_type;
/* Fast path: server agrees with what we sent. */
1384 if (server_cksum == client_cksum) {
1385 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Mismatch: optionally persist the pages for offline analysis. */
1389 if (aa->aa_cli->cl_checksum_dump)
1390 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1391 server_cksum, client_cksum);
/* Recompute over the same pages with the type the server replied
 * with, to distinguish client-side change from on-the-wire change. */
1393 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1395 new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1396 aa->aa_ppga, OST_WRITE, cksum_type);
/* Classify the failure for the console message below. */
1398 if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1399 msg = "the server did not use the checksum type specified in "
1400 "the original request - likely a protocol problem";
1401 else if (new_cksum == server_cksum)
1402 msg = "changed on the client after we checksummed it - "
1403 "likely false positive due to mmap IO (bug 11742)";
1404 else if (new_cksum == client_cksum)
1405 msg = "changed in transit before arrival at OST";
1407 msg = "changed in transit AND doesn't match the original - "
1408 "likely false positive due to mmap IO (bug 11742)";
1410 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1411 DFID " object "DOSTID" extent [%llu-%llu], original "
1412 "client csum %x (type %x), server csum %x (type %x),"
1413 " client csum now %x\n",
1414 aa->aa_cli->cl_import->imp_obd->obd_name,
1415 msg, libcfs_nid2str(peer->nid),
1416 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1417 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1418 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1419 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1420 aa->aa_ppga[aa->aa_page_count - 1]->off +
1421 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1422 client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1423 server_cksum, cksum_type, new_cksum);
1427 /* Note rc enters this function as number of bytes transferred */
/*
 * Common completion processing for a finished BRW RPC (read or write):
 * unpack the reply body, update quota/grant state, verify checksums,
 * and copy the wire obdo back into the in-memory one.  Returns a
 * negative errno on failure (e.g. -EPROTO/-EAGAIN paths visible below).
 */
1428 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1430 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1431 const struct lnet_process_id *peer =
1432 &req->rq_import->imp_connection->c_peer;
1433 struct client_obd *cli = aa->aa_cli;
1434 struct ost_body *body;
1435 u32 client_cksum = 0;
/* -EDQUOT replies still carry a valid body (quota flags below). */
1438 if (rc < 0 && rc != -EDQUOT) {
1439 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1443 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1444 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1446 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1450 /* set/clear over quota flag for a uid/gid/projid */
1451 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1452 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1453 unsigned qid[LL_MAXQUOTAS] = {
1454 body->oa.o_uid, body->oa.o_gid,
1455 body->oa.o_projid };
1456 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1457 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1458 body->oa.o_valid, body->oa.o_flags);
1459 osc_quota_setdq(cli, qid, body->oa.o_valid,
1463 osc_update_grant(cli, body);
/* Remember the checksum we sent with the request, if any. */
1468 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1469 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* === WRITE completion: verify per-niobuf RCs and write checksum. === */
1471 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1473 CERROR("Unexpected +ve rc %d\n", rc);
1476 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1478 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1481 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1482 check_write_checksum(&body->oa, peer, client_cksum,
1483 body->oa.o_cksum, aa))
1486 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1487 aa->aa_page_count, aa->aa_ppga);
1491 /* The rest of this function executes only for OST_READs */
1493 /* if unwrap_bulk failed, return -EAGAIN to retry */
1494 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1496 GOTO(out, rc = -EAGAIN);
/* Sanity-check the byte count: never more than requested, and it must
 * agree with what the bulk layer says actually moved. */
1498 if (rc > aa->aa_requested_nob) {
1499 CERROR("Unexpected rc %d (%d requested)\n", rc,
1500 aa->aa_requested_nob);
1504 if (rc != req->rq_bulk->bd_nob_transferred) {
1505 CERROR ("Unexpected rc %d (%d transferred)\n",
1506 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail pages past EOF. */
1510 if (rc < aa->aa_requested_nob)
1511 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* === READ checksum verification against the server's value. === */
1513 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1514 static int cksum_counter;
1515 u32 server_cksum = body->oa.o_cksum;
1518 enum cksum_types cksum_type;
1520 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1521 body->oa.o_flags : 0);
1522 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1523 aa->aa_ppga, OST_READ,
/* If the bulk came through an LNet router, name it in the report —
 * the router is a possible corruption point. */
1526 if (peer->nid != req->rq_bulk->bd_sender) {
1528 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1531 if (server_cksum != client_cksum) {
1532 struct ost_body *clbody;
1533 u32 page_count = aa->aa_page_count;
1535 clbody = req_capsule_client_get(&req->rq_pill,
1537 if (cli->cl_checksum_dump)
1538 dump_all_bulk_pages(&clbody->oa, page_count,
1539 aa->aa_ppga, server_cksum,
1542 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1543 "%s%s%s inode "DFID" object "DOSTID
1544 " extent [%llu-%llu], client %x, "
1545 "server %x, cksum_type %x\n",
1546 req->rq_import->imp_obd->obd_name,
1547 libcfs_nid2str(peer->nid),
1549 clbody->oa.o_valid & OBD_MD_FLFID ?
1550 clbody->oa.o_parent_seq : 0ULL,
1551 clbody->oa.o_valid & OBD_MD_FLFID ?
1552 clbody->oa.o_parent_oid : 0,
1553 clbody->oa.o_valid & OBD_MD_FLFID ?
1554 clbody->oa.o_parent_ver : 0,
1555 POSTID(&body->oa.o_oi),
1556 aa->aa_ppga[0]->off,
1557 aa->aa_ppga[page_count-1]->off +
1558 aa->aa_ppga[page_count-1]->count - 1,
1559 client_cksum, server_cksum,
1562 aa->aa_oa->o_cksum = client_cksum;
1566 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one; rate-limit
 * the complaint to powers of two via the (cksum_missed & -cksum_missed)
 * trick. */
1569 } else if (unlikely(client_cksum)) {
1570 static int cksum_missed;
1573 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1574 CERROR("Checksum %u requested from %s but not sent\n",
1575 cksum_missed, libcfs_nid2str(peer->nid));
/* Propagate the reply obdo back into the caller's copy. */
1581 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1582 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable error (e.g. the
 * server returned -EINPROGRESS).  A brand-new request is prepared from
 * the same async args; the oap/extent lists and async state are MOVED
 * from the old request to the new one, then the new request is handed
 * to ptlrpcd.
 */
1587 static int osc_brw_redo_request(struct ptlrpc_request *request,
1588 struct osc_brw_async_args *aa, int rc)
1590 struct ptlrpc_request *new_req;
1591 struct osc_brw_async_args *new_aa;
1592 struct osc_async_page *oap;
/* -EINPROGRESS resends are routine; anything else is worth D_ERROR. */
1595 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1596 "redo for recoverable error %d", rc);
1598 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1599 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1600 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1601 aa->aa_ppga, &new_req, 1);
/* If any page's owner was interrupted, abandon the resend. */
1605 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1606 if (oap->oap_request != NULL) {
1607 LASSERTF(request == oap->oap_request,
1608 "request %p != oap_request %p\n",
1609 request, oap->oap_request);
1610 if (oap->oap_interrupted) {
1611 ptlrpc_req_finished(new_req);
1616 /* New request takes over pga and oaps from old request.
1617 * Note that copying a list_head doesn't work, need to move it... */
1619 new_req->rq_interpret_reply = request->rq_interpret_reply;
1620 new_req->rq_async_args = request->rq_async_args;
1621 new_req->rq_commit_cb = request->rq_commit_cb;
1622 /* cap resend delay to the current request timeout, this is similar to
1623 * what ptlrpc does (see after_reply()) */
1624 if (aa->aa_resends > new_req->rq_timeout)
1625 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1627 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1628 new_req->rq_generation_set = 1;
1629 new_req->rq_import_generation = request->rq_import_generation;
1631 new_aa = ptlrpc_req_async_args(new_req);
/* Splice (not copy) the page/extent lists into the new async args. */
1633 INIT_LIST_HEAD(&new_aa->aa_oaps);
1634 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1635 INIT_LIST_HEAD(&new_aa->aa_exts);
1636 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1637 new_aa->aa_resends = aa->aa_resends;
/* Repoint each oap at the new request (drop old ref, take new). */
1639 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1640 if (oap->oap_request) {
1641 ptlrpc_req_finished(oap->oap_request);
1642 oap->oap_request = ptlrpc_request_addref(new_req);
1646 /* XXX: This code will run into problem if we're going to support
1647 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1648 * and wait for all of them to be finished. We should inherit request
1649 * set from old request. */
1650 ptlrpcd_add_req(new_req);
1652 DEBUG_REQ(D_INFO, new_req, "new request");
1657 * ugh, we want disk allocation on the target to happen in offset order. we'll
1658 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1659 * fine for our small page arrays and doesn't require allocation. its an
1660 * insertion sort that swaps elements that are strides apart, shrinking the
1661 * stride down until its '1' and the array is sorted.
1663 static void sort_brw_pages(struct brw_page **array, int num)
1666 struct brw_page *tmp;
1670 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1675 for (i = stride ; i < num ; i++) {
1678 while (j >= stride && array[j - stride]->off > tmp->off) {
1679 array[j] = array[j - stride];
1684 } while (stride > 1);
/* Free a brw_page pointer array of @count entries.  Frees only the
 * array itself, not the pages or oaps the entries point at. */
1687 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1689 LASSERT(ppga != NULL);
1690 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * rq_interpret_reply callback for BRW RPCs.  Finalizes the reply
 * (checksums/grant/quota), retries recoverable errors, then publishes
 * reply attributes (blocks/times/size/kms) into the cl_object, finishes
 * all extents, releases resources, and updates in-flight accounting.
 */
1693 static int brw_interpret(const struct lu_env *env,
1694 struct ptlrpc_request *req, void *data, int rc)
1696 struct osc_brw_async_args *aa = data;
1697 struct osc_extent *ext;
1698 struct osc_extent *tmp;
1699 struct client_obd *cli = aa->aa_cli;
1702 rc = osc_brw_fini_request(req, rc);
1703 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1704 /* When server return -EINPROGRESS, client should always retry
1705 * regardless of the number of times the bulk was resent already. */
1706 if (osc_recoverable_error(rc)) {
/* Import generation changed: the resend crossed an eviction; just log
 * (recovery machinery handles the replay). */
1707 if (req->rq_import_generation !=
1708 req->rq_import->imp_generation) {
1709 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1710 ""DOSTID", rc = %d.\n",
1711 req->rq_import->imp_obd->obd_name,
1712 POSTID(&aa->aa_oa->o_oi), rc);
1713 } else if (rc == -EINPROGRESS ||
1714 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1715 rc = osc_brw_redo_request(req, aa, rc);
1717 CERROR("%s: too many resent retries for object: "
1718 "%llu:%llu, rc = %d.\n",
1719 req->rq_import->imp_obd->obd_name,
1720 POSTID(&aa->aa_oa->o_oi), rc);
1725 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: fold the reply's attributes into the cl_object under
 * the attr lock. */
1730 struct obdo *oa = aa->aa_oa;
1731 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1732 unsigned long valid = 0;
1733 struct cl_object *obj;
1734 struct osc_async_page *last;
1736 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1737 obj = osc2cl(last->oap_obj);
1739 cl_object_attr_lock(obj);
1740 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1741 attr->cat_blocks = oa->o_blocks;
1742 valid |= CAT_BLOCKS;
1744 if (oa->o_valid & OBD_MD_FLMTIME) {
1745 attr->cat_mtime = oa->o_mtime;
1748 if (oa->o_valid & OBD_MD_FLATIME) {
1749 attr->cat_atime = oa->o_atime;
1752 if (oa->o_valid & OBD_MD_FLCTIME) {
1753 attr->cat_ctime = oa->o_ctime;
1757 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1758 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1759 loff_t last_off = last->oap_count + last->oap_obj_off +
1762 /* Change file size if this is an out of quota or
1763 * direct IO write and it extends the file size */
1764 if (loi->loi_lvb.lvb_size < last_off) {
1765 attr->cat_size = last_off;
1768 /* Extend KMS if it's not a lockless write */
1769 if (loi->loi_kms < last_off &&
1770 oap2osc_page(last)->ops_srvlock == 0) {
1771 attr->cat_kms = last_off;
1777 cl_object_attr_update(env, obj, attr, valid);
1778 cl_object_attr_unlock(obj);
1780 OBDO_FREE(aa->aa_oa);
/* Successful writes pin pages as "unstable" until the server commits
 * the transaction (see brw_commit()). */
1782 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1783 osc_inc_unstable_pages(req);
1785 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1786 list_del_init(&ext->oe_link);
1787 osc_extent_finish(env, ext, 1, rc);
1789 LASSERT(list_empty(&aa->aa_exts));
1790 LASSERT(list_empty(&aa->aa_oaps));
1792 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1793 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1795 spin_lock(&cli->cl_loi_list_lock);
1796 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1797 * is called so we know whether to go to sync BRWs or wait for more
1798 * RPCs to complete */
1799 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1800 cli->cl_w_in_flight--;
1802 cli->cl_r_in_flight--;
1803 osc_wake_cache_waiters(cli);
1804 spin_unlock(&cli->cl_loi_list_lock);
/* Freed an RPC slot — kick the IO engine to build the next RPC. */
1806 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW requests: called when the server transaction
 * containing this write has committed to disk.  Under rq_lock either
 * consume the rq_unstable flag and release the unstable-page count, or
 * (when racing with osc_extent_finish / osc_inc_unstable_pages) mark
 * the request committed so the other side performs the decrement.
 */
1810 static void brw_commit(struct ptlrpc_request *req)
1812 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1813 * this called via the rq_commit_cb, I need to ensure
1814 * osc_dec_unstable_pages is still called. Otherwise unstable
1815 * pages may be leaked. */
1816 spin_lock(&req->rq_lock);
1817 if (likely(req->rq_unstable)) {
1818 req->rq_unstable = 0;
1819 spin_unlock(&req->rq_lock);
/* Dropped the lock first: osc_dec_unstable_pages may sleep/notify. */
1821 osc_dec_unstable_pages(req);
1823 req->rq_committed = 1;
1824 spin_unlock(&req->rq_lock);
/*
1829 * Build an RPC by the list of extent @ext_list. The caller must ensure
1830 * that the total pages in this list are NOT over max pages per RPC.
1831 * Extents in the list must be in OES_RPC state.
 *
 * On success the request is handed to ptlrpcd and ownership of the
 * extents/pages moves to the request's async args (completed by
 * brw_interpret()).  On failure every extent is finished with the
 * error and allocated resources are released.
 */
1833 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1834 struct list_head *ext_list, int cmd)
1836 struct ptlrpc_request *req = NULL;
1837 struct osc_extent *ext;
1838 struct brw_page **pga = NULL;
1839 struct osc_brw_async_args *aa = NULL;
1840 struct obdo *oa = NULL;
1841 struct osc_async_page *oap;
1842 struct osc_object *obj = NULL;
1843 struct cl_req_attr *crattr = NULL;
1844 loff_t starting_offset = OBD_OBJECT_EOF;
1845 loff_t ending_offset = 0;
1849 bool soft_sync = false;
1850 bool interrupted = false;
1854 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1855 struct ost_body *body;
1857 LASSERT(!list_empty(ext_list));
1859 /* add pages into rpc_list to build BRW rpc */
/* First pass: totals (memalloc state, grant, page count). */
1860 list_for_each_entry(ext, ext_list, oe_link) {
1861 LASSERT(ext->oe_state == OES_RPC);
1862 mem_tight |= ext->oe_memalloc;
1863 grant += ext->oe_grants;
1864 page_count += ext->oe_nr_pages;
1869 soft_sync = osc_over_unstable_soft_limit(cli);
1871 mpflag = cfs_memory_pressure_get_and_set();
1873 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1875 GOTO(out, rc = -ENOMEM);
1879 GOTO(out, rc = -ENOMEM);
/* Second pass: flatten every oap into the pga array, tag per-page
 * flags, and track the overall [starting_offset, ending_offset) span. */
1882 list_for_each_entry(ext, ext_list, oe_link) {
1883 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1885 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1887 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1888 pga[i] = &oap->oap_brw_page;
1889 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1892 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1893 if (starting_offset == OBD_OBJECT_EOF ||
1894 starting_offset > oap->oap_obj_off)
1895 starting_offset = oap->oap_obj_off;
1897 LASSERT(oap->oap_page_off == 0);
1898 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1899 ending_offset = oap->oap_obj_off +
1902 LASSERT(oap->oap_page_off + oap->oap_count ==
1904 if (oap->oap_interrupted)
1909 /* first page in the list */
1910 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Fill the obdo from cl-layer request attributes. */
1912 crattr = &osc_env_info(env)->oti_req_attr;
1913 memset(crattr, 0, sizeof(*crattr));
1914 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1915 crattr->cra_flags = ~0ULL;
1916 crattr->cra_page = oap2cl_page(oap);
1917 crattr->cra_oa = oa;
1918 cl_req_attr_set(env, osc2cl(obj), crattr);
1920 if (cmd == OBD_BRW_WRITE)
1921 oa->o_grant_used = grant;
/* Offset-sort the pages, then build the actual ptlrpc request. */
1923 sort_brw_pages(pga, page_count);
1924 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1926 CERROR("prep_req failed: %d\n", rc);
1930 req->rq_commit_cb = brw_commit;
1931 req->rq_interpret_reply = brw_interpret;
1932 req->rq_memalloc = mem_tight != 0;
1933 oap->oap_request = ptlrpc_request_addref(req);
1934 if (interrupted && !req->rq_intr)
1935 ptlrpc_mark_interrupted(req);
1937 /* Need to update the timestamps after the request is built in case
1938 * we race with setattr (locally or in queue at OST). If OST gets
1939 * later setattr before earlier BRW (as determined by the request xid),
1940 * the OST will not use BRW timestamps. Sadly, there is no obvious
1941 * way to do this in a single call. bug 10150 */
1942 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1943 crattr->cra_oa = &body->oa;
1944 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1945 cl_req_attr_set(env, osc2cl(obj), crattr);
1946 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand the page/extent lists over to the request's async args. */
1948 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1949 aa = ptlrpc_req_async_args(req);
1950 INIT_LIST_HEAD(&aa->aa_oaps);
1951 list_splice_init(&rpc_list, &aa->aa_oaps);
1952 INIT_LIST_HEAD(&aa->aa_exts);
1953 list_splice_init(ext_list, &aa->aa_exts);
/* Accounting/statistics under the LOI list lock. */
1955 spin_lock(&cli->cl_loi_list_lock);
1956 starting_offset >>= PAGE_SHIFT;
1957 if (cmd == OBD_BRW_READ) {
1958 cli->cl_r_in_flight++;
1959 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1960 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1961 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1962 starting_offset + 1);
1964 cli->cl_w_in_flight++;
1965 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1966 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1967 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1968 starting_offset + 1);
1970 spin_unlock(&cli->cl_loi_list_lock);
1972 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1973 page_count, aa, cli->cl_r_in_flight,
1974 cli->cl_w_in_flight);
1975 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1977 ptlrpcd_add_req(req);
1983 cfs_memory_pressure_restore(mpflag);
/* Error path: nothing was queued; fail every extent with rc. */
1986 LASSERT(req == NULL);
1991 OBD_FREE(pga, sizeof(*pga) * page_count);
1992 /* this should happen rarely and is pretty bad, it makes the
1993 * pending list not follow the dirty order */
1994 while (!list_empty(ext_list)) {
1995 ext = list_entry(ext_list->next, struct osc_extent,
1997 list_del_init(&ext->oe_link);
1998 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data as the DLM lock's AST data if none is set yet, under the
 * resource lock.  Elided return line — presumably reports whether
 * l_ast_data ended up equal to @data (set by us or already ours);
 * callers (osc_enqueue_base/osc_match_base) treat nonzero as success.
 */
2004 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2008 LASSERT(lock != NULL);
2010 lock_res_and_lock(lock);
2012 if (lock->l_ast_data == NULL)
2013 lock->l_ast_data = data;
2014 if (lock->l_ast_data == data)
2017 unlock_res_and_lock(lock);
/*
 * Post-enqueue completion: translate an intent-aborted reply into its
 * real status, mark the LVB ready on success, invoke the caller's
 * upcall, and drop the enqueue reference on the lock handle.
 */
2022 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2023 void *cookie, struct lustre_handle *lockh,
2024 enum ldlm_mode mode, __u64 *flags, bool speculative,
2027 bool intent = *flags & LDLM_FL_HAS_INTENT;
2031 /* The request was created before ldlm_cli_enqueue call. */
2032 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2033 struct ldlm_reply *rep;
2035 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2036 LASSERT(rep != NULL);
/* lock_policy_res1 carries the intent's real status over the wire. */
2038 rep->lock_policy_res1 =
2039 ptlrpc_status_ntoh(rep->lock_policy_res1);
2040 if (rep->lock_policy_res1)
2041 errcode = rep->lock_policy_res1;
2043 *flags |= LDLM_FL_LVB_READY;
2044 } else if (errcode == ELDLM_OK) {
2045 *flags |= LDLM_FL_LVB_READY;
2048 /* Call the update callback. */
2049 rc = (*upcall)(cookie, lockh, errcode);
2051 /* release the reference taken in ldlm_cli_enqueue() */
2052 if (errcode == ELDLM_LOCK_MATCHED)
2054 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2055 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an asynchronous lock enqueue: finish the ldlm
 * enqueue, run the OSC upcall via osc_enqueue_fini(), and drop the
 * temporary lock references taken here and by ldlm_cli_enqueue().
 */
2060 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2061 struct osc_enqueue_args *aa, int rc)
2063 struct ldlm_lock *lock;
2064 struct lustre_handle *lockh = &aa->oa_lockh;
2065 enum ldlm_mode mode = aa->oa_mode;
2066 struct ost_lvb *lvb = aa->oa_lvb;
2067 __u32 lvb_len = sizeof(*lvb);
2072 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2074 lock = ldlm_handle2lock(lockh);
2075 LASSERTF(lock != NULL,
2076 "lockh %#llx, req %p, aa %p - client evicted?\n",
2077 lockh->cookie, req, aa);
2079 /* Take an additional reference so that a blocking AST that
2080 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2081 * to arrive after an upcall has been executed by
2082 * osc_enqueue_fini(). */
2083 ldlm_lock_addref(lockh, mode);
2085 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2086 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2088 /* Let CP AST to grant the lock first. */
2089 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* Speculative enqueues carry no LVB/flags; provide a local flags word
 * so the fini paths below have something to update. */
2091 if (aa->oa_speculative) {
2092 LASSERT(aa->oa_lvb == NULL);
2093 LASSERT(aa->oa_flags == NULL);
2094 aa->oa_flags = &flags;
2097 /* Complete obtaining the lock procedure. */
2098 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2099 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2101 /* Complete osc stuff. */
2102 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2103 aa->oa_flags, aa->oa_speculative, rc);
2105 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above and the handle2lock ref. */
2107 ldlm_lock_decref(lockh, mode);
2108 LDLM_LOCK_PUT(lock);
/* Sentinel "request set" pointer: callers of osc_enqueue_base() pass
 * PTLRPCD_SET to mean "queue directly on the ptlrpcd daemon via
 * ptlrpcd_add_req()" instead of adding to a real ptlrpc_request_set
 * (see the rqset == PTLRPCD_SET branch below).  Never dereferenced. */
2112 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2114 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2115 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2116 * other synchronous requests, however keeping some locks and trying to obtain
2117 * others may take a considerable amount of time in a case of ost failure; and
2118 * when other sync requests do not get released lock from a client, the client
2119 * is evicted from the cluster -- such scenarious make the life difficult, so
2120 * release locks just after they are obtained. */
/*
 * Obtain an extent DLM lock: first try to match an existing cached
 * lock (reusing PW for PR readers), otherwise enqueue a new one —
 * asynchronously via @rqset/ptlrpcd with osc_enqueue_interpret() as the
 * completion, or synchronously followed by osc_enqueue_fini().
 */
2121 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2122 __u64 *flags, union ldlm_policy_data *policy,
2123 struct ost_lvb *lvb, int kms_valid,
2124 osc_enqueue_upcall_f upcall, void *cookie,
2125 struct ldlm_enqueue_info *einfo,
2126 struct ptlrpc_request_set *rqset, int async,
2129 struct obd_device *obd = exp->exp_obd;
2130 struct lustre_handle lockh = { 0 };
2131 struct ptlrpc_request *req = NULL;
2132 int intent = *flags & LDLM_FL_HAS_INTENT;
2133 __u64 match_flags = *flags;
2134 enum ldlm_mode mode;
2138 /* Filesystem lock extents are extended to page boundaries so that
2139 * dealing with the page cache is a little smoother. */
2140 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2141 policy->l_extent.end |= ~PAGE_MASK;
2144 * kms is not valid when either object is completely fresh (so that no
2145 * locks are cached), or object was evicted. In the latter case cached
2146 * lock cannot be used, because it would prime inode state with
2147 * potentially stale LVB.
2152 /* Next, search for already existing extent locks that will cover us */
2153 /* If we're trying to read, we also search for an existing PW lock. The
2154 * VFS and page cache already protect us locally, so lots of readers/
2155 * writers can share a single PW lock.
2157 * There are problems with conversion deadlocks, so instead of
2158 * converting a read lock to a write lock, we'll just enqueue a new
2161 * At some point we should cancel the read lock instead of making them
2162 * send us a blocking callback, but there are problems with canceling
2163 * locks out from other users right now, too. */
2164 mode = einfo->ei_mode;
2165 if (einfo->ei_mode == LCK_PR)
2167 /* Normal lock requests must wait for the LVB to be ready before
2168 * matching a lock; speculative lock requests do not need to,
2169 * because they will not actually use the lock. */
2171 match_flags |= LDLM_FL_LVB_READY;
2173 match_flags |= LDLM_FL_BLOCK_GRANTED;
2174 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2175 einfo->ei_type, policy, mode, &lockh, 0);
2177 struct ldlm_lock *matched;
2179 if (*flags & LDLM_FL_TEST_LOCK)
2182 matched = ldlm_handle2lock(&lockh);
2184 /* This DLM lock request is speculative, and does not
2185 * have an associated IO request. Therefore if there
2186 * is already a DLM lock, it wll just inform the
2187 * caller to cancel the request for this stripe.*/
2188 lock_res_and_lock(matched);
2189 if (ldlm_extent_equal(&policy->l_extent,
2190 &matched->l_policy_data.l_extent))
2194 unlock_res_and_lock(matched);
2196 ldlm_lock_decref(&lockh, mode);
2197 LDLM_LOCK_PUT(matched);
/* Non-speculative match: attach our data and report the existing lock
 * to the caller via the upcall. */
2199 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2200 *flags |= LDLM_FL_LVB_READY;
2202 /* We already have a lock, and it's referenced. */
2203 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2205 ldlm_lock_decref(&lockh, mode);
2206 LDLM_LOCK_PUT(matched);
2209 ldlm_lock_decref(&lockh, mode);
2210 LDLM_LOCK_PUT(matched);
2215 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No usable cached lock: build an enqueue request with an LVB reply
 * buffer. */
2219 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2220 &RQF_LDLM_ENQUEUE_LVB);
2224 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2226 ptlrpc_request_free(req);
2230 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2232 ptlrpc_request_set_replen(req);
2235 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2236 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2238 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2239 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state and queue the RPC. */
2242 struct osc_enqueue_args *aa;
2243 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2244 aa = ptlrpc_req_async_args(req);
2246 aa->oa_mode = einfo->ei_mode;
2247 aa->oa_type = einfo->ei_type;
2248 lustre_handle_copy(&aa->oa_lockh, &lockh);
2249 aa->oa_upcall = upcall;
2250 aa->oa_cookie = cookie;
2251 aa->oa_speculative = speculative;
2253 aa->oa_flags = flags;
2256 /* speculative locks are essentially to enqueue
2257 * a DLM lock in advance, so we don't care
2258 * about the result of the enqueue. */
2260 aa->oa_flags = NULL;
2263 req->rq_interpret_reply =
2264 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2265 if (rqset == PTLRPCD_SET)
2266 ptlrpcd_add_req(req);
2268 ptlrpc_set_add_req(rqset, req);
2269 } else if (intent) {
2270 ptlrpc_req_finished(req);
/* Sync path: finish inline. */
2275 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2276 flags, speculative, rc);
2278 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock (no enqueue).  Extents are
 * page-aligned first; PR requests may also be satisfied by a PW lock.
 * On a match, the lock's AST data is set to @data; if that fails the
 * reference is dropped (mismatch with another user's data).
 */
2283 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2284 enum ldlm_type type, union ldlm_policy_data *policy,
2285 enum ldlm_mode mode, __u64 *flags, void *data,
2286 struct lustre_handle *lockh, int unref)
2288 struct obd_device *obd = exp->exp_obd;
2289 __u64 lflags = *flags;
2293 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2296 /* Filesystem lock extents are extended to page boundaries so that
2297 * dealing with the page cache is a little smoother */
2298 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2299 policy->l_extent.end |= ~PAGE_MASK;
2301 /* Next, search for already existing extent locks that will cover us */
2302 /* If we're trying to read, we also search for an existing PW lock. The
2303 * VFS and page cache already protect us locally, so lots of readers/
2304 * writers can share a single PW lock. */
/* NOTE(review): 'rc' here holds the mode set to search (elided lines
 * above); ldlm_lock_match() returns the matched mode or 0. */
2308 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2309 res_id, type, policy, rc, lockh, unref);
2310 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2314 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2316 LASSERT(lock != NULL);
2317 if (!osc_set_lock_data(lock, data)) {
2318 ldlm_lock_decref(lockh, rc);
2321 LDLM_LOCK_PUT(lock);
/*
 * Interpret callback for an async OST_STATFS RPC: copy the reply's
 * obd_statfs into the caller's obd_info and invoke its oi_cb_up().
 * -ENOTCONN/-EAGAIN with OBD_STATFS_NODELAY are passed through without
 * treating the import state as fatal.
 */
2326 static int osc_statfs_interpret(const struct lu_env *env,
2327 struct ptlrpc_request *req,
2328 struct osc_async_args *aa, int rc)
2330 struct obd_statfs *msfs;
2334 /* The request has in fact never been sent
2335 * due to issues at a higher level (LOV).
2336 * Exit immediately since the caller is
2337 * aware of the problem and takes care
2338 * of the clean up */
2341 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2342 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2348 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2350 GOTO(out, rc = -EPROTO);
2353 *aa->aa_oi->oi_osfs = *msfs;
2355 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_STATFS request on @rqset; the reply is
 * delivered to @oinfo via osc_statfs_interpret().  @max_age is
 * currently unused on the wire (see the comment below).
 */
2359 static int osc_statfs_async(struct obd_export *exp,
2360 struct obd_info *oinfo, __u64 max_age,
2361 struct ptlrpc_request_set *rqset)
2363 struct obd_device *obd = class_exp2obd(exp);
2364 struct ptlrpc_request *req;
2365 struct osc_async_args *aa;
2369 /* We could possibly pass max_age in the request (as an absolute
2370 * timestamp or a "seconds.usec ago") so the target can avoid doing
2371 * extra calls into the filesystem if that isn't necessary (e.g.
2372 * during mount that would help a bit). Having relative timestamps
2373 * is not so great if request processing is slow, while absolute
2374 * timestamps are not ideal because they need time synchronization. */
2375 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2379 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2381 ptlrpc_request_free(req);
2384 ptlrpc_request_set_replen(req);
/* statfs goes to the create portal (lightweight, not bulk I/O). */
2385 req->rq_request_portal = OST_CREATE_PORTAL;
2386 ptlrpc_at_set_req_timeout(req);
2388 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2389 /* procfs requests not want stat in wait for avoid deadlock */
2390 req->rq_no_resend = 1;
2391 req->rq_no_delay = 1;
2394 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2395 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2396 aa = ptlrpc_req_async_args(req);
2399 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the RPC with ptlrpc_queue_wait() and
 * copy the reply into @osfs.  The import is pinned under cl_sem to
 * avoid racing with client_disconnect_export (bug 15684).
 */
2403 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2404 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2406 struct obd_device *obd = class_exp2obd(exp);
2407 struct obd_statfs *msfs;
2408 struct ptlrpc_request *req;
2409 struct obd_import *imp = NULL;
2413 /*Since the request might also come from lprocfs, so we need
2414 *sync this with client_disconnect_export Bug15684*/
2415 down_read(&obd->u.cli.cl_sem);
2416 if (obd->u.cli.cl_import)
2417 imp = class_import_get(obd->u.cli.cl_import);
2418 up_read(&obd->u.cli.cl_sem);
2422 /* We could possibly pass max_age in the request (as an absolute
2423 * timestamp or a "seconds.usec ago") so the target can avoid doing
2424 * extra calls into the filesystem if that isn't necessary (e.g.
2425 * during mount that would help a bit). Having relative timestamps
2426 * is not so great if request processing is slow, while absolute
2427 * timestamps are not ideal because they need time synchronization. */
2428 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference is only needed for request allocation. */
2430 class_import_put(imp);
2435 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2437 ptlrpc_request_free(req);
2440 ptlrpc_request_set_replen(req);
2441 req->rq_request_portal = OST_CREATE_PORTAL;
2442 ptlrpc_at_set_req_timeout(req);
2444 if (flags & OBD_STATFS_NODELAY) {
2445 /* procfs requests not want stat in wait for avoid deadlock */
2446 req->rq_no_resend = 1;
2447 req->rq_no_delay = 1;
2450 rc = ptlrpc_queue_wait(req);
2454 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2456 GOTO(out, rc = -EPROTO);
2463 ptlrpc_req_finished(req);
/* ioctl dispatcher for the OSC device.
 *
 * Takes a reference on this module for the duration of the call so it
 * cannot be unloaded mid-ioctl; unknown commands return -ENOTTY.
 * @karg is the kernel-side copy of the ioctl data, @uarg the original
 * userspace pointer (unused by the commands visible here). */
2467 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2468 void *karg, void __user *uarg)
2470 struct obd_device *obd = exp->exp_obd;
2471 struct obd_ioctl_data *data = karg;
2475 if (!try_module_get(THIS_MODULE)) {
2476 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2477 module_name(THIS_MODULE));
2481 case OBD_IOC_CLIENT_RECOVER:
/* force recovery of the import, optionally to the NID in inlbuf1 */
2482 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2483 data->ioc_inlbuf1, 0);
2487 case IOC_OSC_SET_ACTIVE:
2488 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2491 case OBD_IOC_PING_TARGET:
2492 err = ptlrpc_obd_ping(obd);
2495 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2496 cmd, current_comm());
2497 GOTO(out, err = -ENOTTY);
/* drop the reference taken by try_module_get() above */
2500 module_put(THIS_MODULE);
/* Handle obd_set_info_async() keys for the OSC layer.
 *
 * Locally-handled keys: KEY_CHECKSUM (toggle wire checksums),
 * KEY_SPTLRPC_CONF (re-read security config), KEY_FLUSH_CTX (flush
 * security contexts), KEY_CACHE_SET (attach the shared client page
 * cache, exactly once), KEY_CACHE_LRU_SHRINK (shrink the LRU).
 * Any other key is forwarded to the OST as an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK requests are sent via ptlrpcd with a dedicated
 * interpret callback; all other forwarded keys require @set. */
2504 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2505 u32 keylen, void *key, u32 vallen, void *val,
2506 struct ptlrpc_request_set *set)
2508 struct ptlrpc_request *req;
2509 struct obd_device *obd = exp->exp_obd;
2510 struct obd_import *imp = class_exp2cliimp(exp);
/* fault-injection point for shutdown race testing */
2515 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2517 if (KEY_IS(KEY_CHECKSUM)) {
2518 if (vallen != sizeof(int))
2520 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2524 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2525 sptlrpc_conf_client_adapt(obd);
2529 if (KEY_IS(KEY_FLUSH_CTX)) {
2530 sptlrpc_import_flush_my_ctx(imp);
2534 if (KEY_IS(KEY_CACHE_SET)) {
2535 struct client_obd *cli = &obd->u.cli;
2537 LASSERT(cli->cl_cache == NULL); /* only once */
2538 cli->cl_cache = (struct cl_client_cache *)val;
2539 cl_cache_incref(cli->cl_cache);
2540 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2542 /* add this osc into entity list */
2543 LASSERT(list_empty(&cli->cl_lru_osc));
2544 spin_lock(&cli->cl_cache->ccc_lru_lock);
2545 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2546 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2551 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2552 struct client_obd *cli = &obd->u.cli;
/* shrink at most half of what is currently on the LRU */
2553 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2554 long target = *(long *)val;
2556 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* forwarded keys (except grant shrink) need a request set to ride on */
2561 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2564 /* We pass all other commands directly to OST. Since nobody calls osc
2565 methods directly and everybody is supposed to go through LOV, we
2566 assume lov checked invalid values for us.
2567 The only recognised values so far are evict_by_nid and mds_conn.
2568 Even if something bad goes through, we'd get a -EINVAL from OST
2571 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2572 &RQF_OST_SET_GRANT_INFO :
2577 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2578 RCL_CLIENT, keylen);
2579 if (!KEY_IS(KEY_GRANT_SHRINK))
2580 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2581 RCL_CLIENT, vallen);
2582 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2584 ptlrpc_request_free(req);
2588 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2589 memcpy(tmp, key, keylen);
/* grant-shrink uses a different value field in the capsule */
2590 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2593 memcpy(tmp, val, vallen);
2595 if (KEY_IS(KEY_GRANT_SHRINK)) {
2596 struct osc_grant_args *aa;
2599 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2600 aa = ptlrpc_req_async_args(req);
2603 ptlrpc_req_finished(req);
/* NOTE(review): @val is treated as an ost_body here — callers of
 * KEY_GRANT_SHRINK must pass one; confirm at the call sites. */
2606 *oa = ((struct ost_body *)val)->oa;
2608 req->rq_interpret_reply = osc_shrink_grant_interpret;
2611 ptlrpc_request_set_replen(req);
2612 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2613 LASSERT(set != NULL);
2614 ptlrpc_set_add_req(set, req);
2615 ptlrpc_check_set(NULL, set);
2617 ptlrpcd_add_req(req);
2622 EXPORT_SYMBOL(osc_set_info_async);
/* Reconnect callback: recompute the grant to request from the OST.
 *
 * On reconnect the client reports in @data->ocd_grant how much grant it
 * believes it holds (available + reserved, plus dirty pages/bytes), so
 * server and client grant accounting stay consistent across the
 * reconnect.  cl_lost_grant is consumed (reset to zero) here. */
2624 static int osc_reconnect(const struct lu_env *env,
2625 struct obd_export *exp, struct obd_device *obd,
2626 struct obd_uuid *cluuid,
2627 struct obd_connect_data *data,
2630 struct client_obd *cli = &obd->u.cli;
2632 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* snapshot grant state atomically w.r.t. the I/O path */
2636 spin_lock(&cli->cl_loi_list_lock);
2637 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* GRANT_PARAM servers account dirty data in grant bytes,
 * older servers in whole pages */
2638 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2639 grant += cli->cl_dirty_grant;
2641 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* never ask for zero grant; fall back to two RPC-sized chunks */
2642 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2643 lost_grant = cli->cl_lost_grant;
2644 cli->cl_lost_grant = 0;
2645 spin_unlock(&cli->cl_loi_list_lock);
2647 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2648 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2649 data->ocd_version, data->ocd_grant, lost_grant);
/* Disconnect the export and, once the import is gone, remove this
 * client from the grant-shrink list (ordering explained below). */
2655 static int osc_disconnect(struct obd_export *exp)
2657 struct obd_device *obd = class_exp2obd(exp);
2660 rc = client_disconnect_export(exp);
2662 * Initially we put del_shrink_grant before disconnect_export, but it
2663 * causes the following problem if setup (connect) and cleanup
2664 * (disconnect) are tangled together.
2665 * connect p1 disconnect p2
2666 * ptlrpc_connect_import
2667 * ............... class_manual_cleanup
2670 * ptlrpc_connect_interrupt
2672 * add this client to shrink list
2674 * Bang! pinger trigger the shrink.
2675 * So the osc should be disconnected from the shrink list, after we
2676 * are sure the import has been destroyed. BUG18662
2678 if (obd->u.cli.cl_import == NULL)
2679 osc_del_shrink_grant(&obd->u.cli);
/* cfs_hash iterator callback used during import invalidation.
 *
 * For each LDLM resource: find the osc_object attached to any granted
 * lock (all granted locks on a resource share the same object, so one
 * reference suffices), clear LDLM_FL_CLEANED on every granted lock so
 * the second ldlm_namespace_cleanup() pass in osc_import_event() will
 * cancel them, then invalidate the object.  @arg is the lu_env. */
2683 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2684 struct hlist_node *hnode, void *arg)
2686 struct lu_env *env = arg;
2687 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2688 struct ldlm_lock *lock;
2689 struct osc_object *osc = NULL;
2693 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2694 if (lock->l_ast_data != NULL && osc == NULL) {
2695 osc = lock->l_ast_data;
/* hold the object across the invalidation below */
2696 cl_object_get(osc2cl(osc));
2699 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2700 * by the 2nd round of ldlm_namespace_clean() call in
2701 * osc_import_event(). */
2702 ldlm_clear_cleaned(lock);
2707 osc_object_invalidate(env, osc);
2708 cl_object_put(env, osc2cl(osc));
2713 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/* React to import state-machine events for this OSC.
 *
 * DISCON zeroes the grant (it will be renegotiated on reconnect);
 * INVALIDATE cancels local locks, flushes pending I/O and invalidates
 * cached objects; OCD re-initializes grant and the request portal from
 * the negotiated connect data.  Most events also notify the observer
 * (typically the LOV layer above). */
2715 static int osc_import_event(struct obd_device *obd,
2716 struct obd_import *imp,
2717 enum obd_import_event event)
2719 struct client_obd *cli;
2723 LASSERT(imp->imp_obd == obd);
2726 case IMP_EVENT_DISCON: {
/* grant is meaningless while disconnected; reconnect renegotiates */
2728 spin_lock(&cli->cl_loi_list_lock);
2729 cli->cl_avail_grant = 0;
2730 cli->cl_lost_grant = 0;
2731 spin_unlock(&cli->cl_loi_list_lock);
2734 case IMP_EVENT_INACTIVE: {
2735 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2738 case IMP_EVENT_INVALIDATE: {
2739 struct ldlm_namespace *ns = obd->obd_namespace;
/* first pass: cancel unused local locks */
2743 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2745 env = cl_env_get(&refcheck);
/* flush queued I/O and invalidate objects still pinned by locks */
2747 osc_io_unplug(env, &obd->u.cli, NULL);
2749 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2750 osc_ldlm_resource_invalidate,
2752 cl_env_put(env, &refcheck);
/* second pass: catch locks un-CLEANED by the iterator above */
2754 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2759 case IMP_EVENT_ACTIVE: {
2760 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2763 case IMP_EVENT_OCD: {
2764 struct obd_connect_data *ocd = &imp->imp_connect_data;
2766 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2767 osc_init_grant(&obd->u.cli, ocd);
2770 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2771 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2773 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2776 case IMP_EVENT_DEACTIVATE: {
2777 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2780 case IMP_EVENT_ACTIVATE: {
2781 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2785 CERROR("Unknown import event %d\n", event);
2792 * Determine whether the lock can be canceled before replaying the lock
2793 * during recovery, see bug16774 for detailed information.
2795 * \retval zero the lock can't be canceled
2796 * \retval other ok to cancel
2798 static int osc_cancel_weight(struct ldlm_lock *lock)
2801 * Cancel all unused and granted extent lock.
2803 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2804 lock->l_granted_mode == lock->l_req_mode &&
/* weight 0 means no cached pages depend on this lock */
2805 osc_ldlm_weigh_ast(lock) == 0)
/* ptlrpcd work callback: flush pending writeback for one client_obd.
 * @data is the client_obd registered in osc_setup_common(). */
2811 static int brw_queue_work(const struct lu_env *env, void *data)
2813 struct client_obd *cli = data;
2815 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2817 osc_io_unplug(env, cli, NULL);
/* Common setup shared by OSC and OSC-like devices.
 *
 * Takes a ptlrpcd reference, performs generic client setup, allocates
 * the writeback and LRU ptlrpcd work items, and initializes quota and
 * grant-shrink state.  On failure, unwinds everything acquired so far
 * (goto-cleanup tail at the bottom). */
2821 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2823 struct client_obd *cli = &obd->u.cli;
2829 rc = ptlrpcd_addref();
2833 rc = client_obd_setup(obd, lcfg);
2835 GOTO(out_ptlrpcd, rc);
2838 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2839 if (IS_ERR(handler))
2840 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2841 cli->cl_writeback_work = handler;
2843 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2844 if (IS_ERR(handler))
2845 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2846 cli->cl_lru_work = handler;
2848 rc = osc_quota_setup(obd);
2850 GOTO(out_ptlrpcd_work, rc);
2852 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2854 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* error unwinding: destroy whichever work items were allocated */
2858 if (cli->cl_writeback_work != NULL) {
2859 ptlrpcd_destroy_work(cli->cl_writeback_work);
2860 cli->cl_writeback_work = NULL;
2862 if (cli->cl_lru_work != NULL) {
2863 ptlrpcd_destroy_work(cli->cl_lru_work);
2864 cli->cl_lru_work = NULL;
2866 client_obd_cleanup(obd);
2871 EXPORT_SYMBOL(osc_setup_common);
/* Full OSC device setup.
 *
 * Runs osc_setup_common(), registers the procfs tree (attaching under
 * the OSP's proc symlink when client and server are on the same node),
 * tops up the shared request pool, registers the lock cancel-weight
 * callback, and adds this client to the global cache-shrink list. */
2873 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2875 struct client_obd *cli = &obd->u.cli;
2876 struct obd_type *type;
2884 rc = osc_setup_common(obd, lcfg);
2888 #ifdef CONFIG_PROC_FS
2889 obd->obd_vars = lprocfs_osc_obd_vars;
2891 /* If this is true then both client (osc) and server (osp) are on the
2892 * same node. The osp layer if loaded first will register the osc proc
2893 * directory. In that case this obd_device will be attached its proc
2894 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2896 type = class_search_type(LUSTRE_OSP_NAME);
2897 if (type && type->typ_procsym) {
2898 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2900 obd->obd_vars, obd);
2901 if (IS_ERR(obd->obd_proc_entry)) {
/* procfs failure is non-fatal: log and continue without it */
2902 rc = PTR_ERR(obd->obd_proc_entry);
2903 CERROR("error %d setting up lprocfs for %s\n", rc,
2905 obd->obd_proc_entry = NULL;
2909 rc = lprocfs_obd_setup(obd, false);
2911 /* If the basic OSC proc tree construction succeeded then
2914 lproc_osc_attach_seqstat(obd);
2915 sptlrpc_lprocfs_cliobd_attach(obd);
2916 ptlrpc_lprocfs_register_obd(obd);
2920 * We try to control the total number of requests with a upper limit
2921 * osc_reqpool_maxreqcount. There might be some race which will cause
2922 * over-limit allocation, but it is fine.
2924 req_count = atomic_read(&osc_pool_req_count);
2925 if (req_count < osc_reqpool_maxreqcount) {
/* +2 extra requests beyond max RPCs in flight, capped at the pool max */
2926 adding = cli->cl_max_rpcs_in_flight + 2;
2927 if (req_count + adding > osc_reqpool_maxreqcount)
2928 adding = osc_reqpool_maxreqcount - req_count;
2930 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2931 atomic_add(added, &osc_pool_req_count);
2934 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2935 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2937 spin_lock(&osc_shrink_lock);
2938 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2939 spin_unlock(&osc_shrink_lock);
/* Common pre-cleanup: stop ptlrpcd work items and drop the import.
 * Must run before the import goes away so no work callback touches a
 * half-torn-down client_obd. */
2944 int osc_precleanup_common(struct obd_device *obd)
2946 struct client_obd *cli = &obd->u.cli;
2950 * for echo client, export may be on zombie list, wait for
2951 * zombie thread to cull it, because cli.cl_import will be
2952 * cleared in client_disconnect_export():
2953 * class_export_destroy() -> obd_cleanup() ->
2954 * echo_device_free() -> echo_client_cleanup() ->
2955 * obd_disconnect() -> osc_disconnect() ->
2956 * client_disconnect_export()
2958 obd_zombie_barrier();
2959 if (cli->cl_writeback_work) {
2960 ptlrpcd_destroy_work(cli->cl_writeback_work);
2961 cli->cl_writeback_work = NULL;
2964 if (cli->cl_lru_work) {
2965 ptlrpcd_destroy_work(cli->cl_lru_work);
2966 cli->cl_lru_work = NULL;
2969 obd_cleanup_client_import(obd);
2972 EXPORT_SYMBOL(osc_precleanup_common);
/* OSC pre-cleanup: common teardown, then unregister the procfs tree
 * (mirrors the registration done in osc_setup()). */
2974 static int osc_precleanup(struct obd_device *obd)
2978 osc_precleanup_common(obd);
2980 ptlrpc_lprocfs_unregister_obd(obd);
2981 lprocfs_obd_cleanup(obd);
/* Final cleanup: detach from the global shrink list and the shared
 * client cache (reverse of KEY_CACHE_SET handling in
 * osc_set_info_async()), free the quota cache, then run generic
 * client teardown. */
2985 int osc_cleanup_common(struct obd_device *obd)
2987 struct client_obd *cli = &obd->u.cli;
2992 spin_lock(&osc_shrink_lock);
2993 list_del(&cli->cl_shrink_list);
2994 spin_unlock(&osc_shrink_lock);
2997 if (cli->cl_cache != NULL) {
2998 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2999 spin_lock(&cli->cl_cache->ccc_lru_lock);
3000 list_del_init(&cli->cl_lru_osc);
3001 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3002 cli->cl_lru_left = NULL;
3003 cl_cache_decref(cli->cl_cache);
3004 cli->cl_cache = NULL;
3007 /* free memory of osc quota cache */
3008 osc_quota_cleanup(obd);
3010 rc = client_obd_cleanup(obd);
3015 EXPORT_SYMBOL(osc_cleanup_common);
/* Apply a PARAM_OSC tunable from a config log via the proc handlers.
 * class_process_proc_param() returns >0 on success (bytes consumed),
 * which is normalized to 0 for the caller. */
3017 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3019 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3020 return rc > 0 ? 0: rc;
/* obd_ops adapter: @buf is a struct lustre_cfg; @len is unused here. */
3023 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3025 return osc_process_config_base(obd, buf);
/* Method table exported to obdclass; generic client_* helpers handle
 * connection management, OSC-specific handlers everything else. */
3028 static struct obd_ops osc_obd_ops = {
3029 .o_owner = THIS_MODULE,
3030 .o_setup = osc_setup,
3031 .o_precleanup = osc_precleanup,
3032 .o_cleanup = osc_cleanup_common,
3033 .o_add_conn = client_import_add_conn,
3034 .o_del_conn = client_import_del_conn,
3035 .o_connect = client_connect_import,
3036 .o_reconnect = osc_reconnect,
3037 .o_disconnect = osc_disconnect,
3038 .o_statfs = osc_statfs,
3039 .o_statfs_async = osc_statfs_async,
3040 .o_create = osc_create,
3041 .o_destroy = osc_destroy,
3042 .o_getattr = osc_getattr,
3043 .o_setattr = osc_setattr,
3044 .o_iocontrol = osc_iocontrol,
3045 .o_set_info_async = osc_set_info_async,
3046 .o_import_event = osc_import_event,
3047 .o_process_config = osc_process_config,
3048 .o_quotactl = osc_quotactl,
/* Memory-shrinker registration handle, and the global list of all OSC
 * clients it walks (protected by osc_shrink_lock; entries linked via
 * client_obd.cl_shrink_list in osc_setup()/osc_cleanup_common()). */
3051 static struct shrinker *osc_cache_shrinker;
3052 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3053 DEFINE_SPINLOCK(osc_shrink_lock);
3055 #ifndef HAVE_SHRINKER_COUNT
/* Compatibility wrapper for kernels whose shrinker API has a single
 * shrink callback instead of separate count/scan operations: perform
 * the scan, then report the remaining count. */
3056 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3058 struct shrink_control scv = {
3059 .nr_to_scan = shrink_param(sc, nr_to_scan),
3060 .gfp_mask = shrink_param(sc, gfp_mask)
3062 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3063 struct shrinker *shrinker = NULL;
3066 (void)osc_cache_shrink_scan(shrinker, &scv);
3068 return osc_cache_shrink_count(shrinker, &scv);
/* Module init: set up lu caches, register the OSC type (procfs-less if
 * OSP already claimed the proc name), register the cache shrinker, and
 * size + create the shared request pool from osc_reqpool_mem_max. */
3072 static int __init osc_init(void)
3074 bool enable_proc = true;
3075 struct obd_type *type;
3076 unsigned int reqpool_size;
3077 unsigned int reqsize;
3079 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3080 osc_cache_shrink_count, osc_cache_shrink_scan);
3083 /* print an address of _any_ initialized kernel symbol from this
3084 * module, to allow debugging with gdb that doesn't support data
3085 * symbols from modules.*/
3086 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3088 rc = lu_kmem_init(osc_caches);
/* if OSP is loaded and owns the proc directory, skip our own */
3092 type = class_search_type(LUSTRE_OSP_NAME);
3093 if (type != NULL && type->typ_procsym != NULL)
3094 enable_proc = false;
3096 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3097 LUSTRE_OSC_NAME, &osc_device_type);
3101 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3103 /* This is obviously too much memory, only prevent overflow here */
3104 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3105 GOTO(out_type, rc = -EINVAL);
/* convert MB budget to bytes, then to a power-of-two request size */
3107 reqpool_size = osc_reqpool_mem_max << 20;
3110 while (reqsize < OST_IO_MAXREQSIZE)
3111 reqsize = reqsize << 1;
3114 * We don't enlarge the request count in OSC pool according to
3115 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3116 * tried after normal allocation failed. So a small OSC pool won't
3117 * cause much performance degression in most of cases.
3119 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3121 atomic_set(&osc_pool_req_count, 0);
3122 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3123 ptlrpc_add_rqs_to_pool);
3125 if (osc_rq_pool != NULL)
/* error unwinding: undo type registration and caches */
3129 class_unregister_type(LUSTRE_OSC_NAME);
3131 lu_kmem_fini(osc_caches);
/* Module exit: exact reverse of osc_init() — shrinker, type, caches,
 * then the request pool. */
3136 static void __exit osc_exit(void)
3138 remove_shrinker(osc_cache_shrinker);
3139 class_unregister_type(LUSTRE_OSC_NAME);
3140 lu_kmem_fini(osc_caches);
3141 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry points. */
3144 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3145 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3146 MODULE_VERSION(LUSTRE_VERSION_STRING);
3147 MODULE_LICENSE("GPL");
3149 module_init(osc_init);
3150 module_exit(osc_exit);