4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
/* Per-request context for bulk read/write (BRW) RPCs, stashed in
 * ptlrpc_request::rq_async_args and recovered in brw_interpret().
 * NOTE(review): several members are elided from this view of the file. */
55 struct osc_brw_async_args {
61 struct brw_page **aa_ppga;
62 struct client_obd *aa_cli;
63 struct list_head aa_oaps;
64 struct list_head aa_exts;
65 struct cl_req *aa_clerq;
/* Grant-shrink RPCs reuse the same async-args layout (see
 * osc_shrink_grant_interpret(), which casts to struct osc_grant_args). */
68 #define osc_grant_args osc_brw_async_args
/* Async-args for OST_SETATTR / OST_PUNCH; sa_upcall(sa_cookie, rc) is
 * invoked from osc_setattr_interpret() when the reply arrives.
 * NOTE(review): sa_cookie/sa_oa members are elided from this view. */
70 struct osc_setattr_args {
72 obd_enqueue_update_f sa_upcall;
/* Async-args for OST_SYNC; fa_upcall(fa_cookie, rc) is invoked from
 * osc_sync_interpret().  NOTE(review): fa_oa/fa_cookie members are
 * elided from this view. */
76 struct osc_fsync_args {
78 obd_enqueue_update_f fa_upcall;
/* Async-args for DLM lock enqueue RPCs: export, completion upcall, LVB
 * buffer, lock handle, and an AGL (asynchronous glimpse lock) flag bit.
 * NOTE(review): additional members are elided from this view. */
82 struct osc_enqueue_args {
83 struct obd_export *oa_exp;
87 osc_enqueue_upcall_f oa_upcall;
89 struct ost_lvb *oa_lvb;
90 struct lustre_handle oa_lockh;
91 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in this file. */
94 static void osc_release_ppga(struct brw_page **ppga, size_t count);
95 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Copy @oa into @req's OST request body, converting to on-wire obdo
 * format according to the import's negotiated connect data. */
98 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
100 struct ost_body *body;
102 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
105 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: send @oa's id to the OST, wait for the reply,
 * and copy the returned attributes back into @oa.  Also fills in the
 * client's preferred BRW size as the blocksize attribute.
 * Returns 0 or negative errno; -EPROTO if the reply body is missing.
 * NOTE(review): error-branch lines (alloc-failure return, rc checks)
 * are elided from this view. */
108 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
111 struct ptlrpc_request *req;
112 struct ost_body *body;
116 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
120 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: request was never sent, free it directly */
122 ptlrpc_request_free(req);
126 osc_pack_req_body(req, oa);
128 ptlrpc_request_set_replen(req);
130 rc = ptlrpc_queue_wait(req);
134 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
136 GOTO(out, rc = -EPROTO);
138 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
139 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* report the client BRW size as blocksize for optimal I/O sizing */
141 oa->o_blksize = cli_brw_size(exp->exp_obd);
142 oa->o_valid |= OBD_MD_FLBLKSZ;
146 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push @oa's attributes to the OST and copy the
 * server's view of the object back into @oa on success.
 * The object must carry a valid group (OBD_MD_FLGROUP).
 * NOTE(review): error-branch lines are elided from this view. */
151 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
154 struct ptlrpc_request *req;
155 struct ost_body *body;
159 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
161 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
165 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* pack failed: request never sent, free it directly */
167 ptlrpc_request_free(req);
171 osc_pack_req_body(req, oa);
173 ptlrpc_request_set_replen(req);
175 rc = ptlrpc_queue_wait(req);
179 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
181 GOTO(out, rc = -EPROTO);
183 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
187 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa, then hand the final rc to the caller's upcall
 * (sa->sa_upcall(sa->sa_cookie, rc)). */
192 static int osc_setattr_interpret(const struct lu_env *env,
193 struct ptlrpc_request *req,
194 struct osc_setattr_args *sa, int rc)
196 struct ost_body *body;
202 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
204 GOTO(out, rc = -EPROTO);
206 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* always notify the caller, even on error paths above */
209 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa, arranges for
 * osc_setattr_interpret() to run @upcall(@cookie, rc) on completion, and
 * dispatches the request either via ptlrpcd (rqset == PTLRPCD_SET) or by
 * adding it to the caller-supplied @rqset.
 * NOTE(review): a conditional branch around lines 237-240 is elided in
 * this view — the fire-and-forget ptlrpcd_add_req() at line 240 is
 * presumably taken only when no interpretation is wanted; verify against
 * the full source. */
213 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
214 obd_enqueue_update_f upcall, void *cookie,
215 struct ptlrpc_request_set *rqset)
217 struct ptlrpc_request *req;
218 struct osc_setattr_args *sa;
223 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
227 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
229 ptlrpc_request_free(req);
233 osc_pack_req_body(req, oa);
235 ptlrpc_request_set_replen(req);
237 /* do mds to ost setattr asynchronously */
239 /* Do not wait for response. */
240 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
242 req->rq_interpret_reply =
243 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async-args must fit in the request's embedded scratch space */
245 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
246 sa = ptlrpc_req_async_args(req);
248 sa->sa_upcall = upcall;
249 sa->sa_cookie = cookie;
251 if (rqset == PTLRPCD_SET)
252 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
254 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-client objects (the
 * LASSERT on fid_seq_is_echo enforces this); regular object creation
 * goes through the MDS.  On success the server-assigned attributes are
 * copied back into @oa and the client BRW size is reported as blocksize.
 * NOTE(review): error-branch lines are elided from this view. */
260 static int osc_create(const struct lu_env *env, struct obd_export *exp,
263 struct ptlrpc_request *req;
264 struct ost_body *body;
269 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
270 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
272 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
274 GOTO(out, rc = -ENOMEM);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
278 ptlrpc_request_free(req);
282 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
285 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
287 ptlrpc_request_set_replen(req);
289 rc = ptlrpc_queue_wait(req);
293 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
295 GOTO(out_req, rc = -EPROTO);
297 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
298 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
300 oa->o_blksize = cli_brw_size(exp->exp_obd);
301 oa->o_valid |= OBD_MD_FLBLKSZ;
303 CDEBUG(D_HA, "transno: "LPD64"\n",
304 lustre_msg_get_transno(req->rq_repmsg));
306 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch a byte range of an OST
 * object).  Targets the I/O portal (bug 7198: avoid starving punch
 * behind bulk I/O on the default portal).  Completion is routed through
 * osc_setattr_interpret(), which invokes @upcall(@cookie, rc).
 * NOTE(review): error-branch lines are elided from this view. */
311 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
312 obd_enqueue_update_f upcall, void *cookie,
313 struct ptlrpc_request_set *rqset)
315 struct ptlrpc_request *req;
316 struct osc_setattr_args *sa;
317 struct ost_body *body;
321 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
327 ptlrpc_request_free(req);
330 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
331 ptlrpc_at_set_req_timeout(req);
333 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
335 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
337 ptlrpc_request_set_replen(req);
/* punch reuses the setattr reply interpreter and async-args layout */
339 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
340 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
341 sa = ptlrpc_req_async_args(req);
343 sa->sa_upcall = upcall;
344 sa->sa_cookie = cookie;
345 if (rqset == PTLRPCD_SET)
346 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
348 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_SYNC: copy the reply obdo to the caller's
 * buffer and invoke fa_upcall(fa_cookie, rc). */
353 static int osc_sync_interpret(const struct lu_env *env,
354 struct ptlrpc_request *req,
357 struct osc_fsync_args *fa = arg;
358 struct ost_body *body;
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366 CERROR ("can't unpack ost_body\n");
367 GOTO(out, rc = -EPROTO);
/* plain struct copy: reply obdo back to the caller-owned obdo */
370 *fa->fa_oa = body->oa;
372 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC: flush an extent of an OST object to stable
 * storage.  The sync range travels in the oa size/blocks fields (see the
 * in-body comment).  Completion runs osc_sync_interpret() -> @upcall.
 * NOTE(review): error-branch lines are elided from this view. */
376 int osc_sync_base(struct obd_export *exp, struct obdo *oa,
377 obd_enqueue_update_f upcall, void *cookie,
378 struct ptlrpc_request_set *rqset)
380 struct ptlrpc_request *req;
381 struct ost_body *body;
382 struct osc_fsync_args *fa;
386 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
390 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
392 ptlrpc_request_free(req);
396 /* overload the size and blocks fields in the oa with start/end */
397 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
399 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
401 ptlrpc_request_set_replen(req);
402 req->rq_interpret_reply = osc_sync_interpret;
404 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
405 fa = ptlrpc_req_async_args(req);
407 fa->fa_upcall = upcall;
408 fa->fa_cookie = cookie;
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
418 /* Find and cancel locally locks matched by @mode in the resource found by
419 * @objid. Found locks are added into @cancel list. Returns the amount of
420 * locks added to @cancels list. */
421 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
422 struct list_head *cancels,
423 ldlm_mode_t mode, __u64 lock_flags)
425 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
426 struct ldlm_res_id res_id;
427 struct ldlm_resource *res;
431 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
432 * export) but disabled through procfs (flag in NS).
434 * This distinguishes from a case when ELC is not supported originally,
435 * when we still want to cancel locks in advance and just cancel them
436 * locally, without sending any RPC. */
437 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* map the object id to its DLM resource name */
440 ostid_build_res_name(&oa->o_oi, &res_id);
441 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the local cancel scan */
445 LDLM_RESOURCE_ADDREF(res);
446 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
447 lock_flags, 0, NULL);
448 LDLM_RESOURCE_DELREF(res);
449 ldlm_resource_putref(res);
/* OST_DESTROY reply interpreter: release the in-flight-destroy slot and
 * wake anyone throttled in osc_destroy() waiting for a free slot. */
453 static int osc_destroy_interpret(const struct lu_env *env,
454 struct ptlrpc_request *req, void *data,
457 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
459 atomic_dec(&cli->cl_destroy_in_flight);
460 wake_up(&cli->cl_destroy_waitq);
/* Try to claim an in-flight-destroy slot (bounded by
 * cl_max_rpcs_in_flight).  Optimistically increments the counter; on
 * failure it decrements again and, if another thread raced in between,
 * wakes the destroy waitqueue so no waiter is stranded.
 * NOTE(review): the return statements are elided from this view;
 * presumably returns nonzero when a slot was claimed. */
464 static int osc_can_send_destroy(struct client_obd *cli)
466 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
467 cli->cl_max_rpcs_in_flight) {
468 /* The destroy request can be sent */
471 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
472 cli->cl_max_rpcs_in_flight) {
474 * The counter has been modified between the two atomic
477 wake_up(&cli->cl_destroy_waitq);
/* Destroy an OST object.  First cancels matching PW locks locally
 * (early lock cancellation, LDLM_FL_DISCARD_DATA drops dirty pages),
 * piggybacks those cancels on the OST_DESTROY request via
 * ldlm_prep_elc_req(), throttles so at most cl_max_rpcs_in_flight
 * destroys are outstanding, then fires the RPC without waiting for the
 * reply (osc_destroy_interpret releases the slot).
 * NOTE(review): error-branch lines are elided from this view. */
482 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
485 struct client_obd *cli = &exp->exp_obd->u.cli;
486 struct ptlrpc_request *req;
487 struct ost_body *body;
488 struct list_head cancels = LIST_HEAD_INIT(cancels);
493 CDEBUG(D_INFO, "oa NULL\n");
497 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
498 LDLM_FL_DISCARD_DATA);
500 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* alloc failed: drop the lock refs gathered for ELC */
502 ldlm_lock_list_put(&cancels, l_bl_ast, count);
506 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
509 ptlrpc_request_free(req);
513 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
514 ptlrpc_at_set_req_timeout(req);
516 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
518 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
520 ptlrpc_request_set_replen(req);
522 req->rq_interpret_reply = osc_destroy_interpret;
523 if (!osc_can_send_destroy(cli)) {
524 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
527 * Wait until the number of on-going destroy RPCs drops
528 * under max_rpc_in_flight
530 l_wait_event_exclusive(cli->cl_destroy_waitq,
531 osc_can_send_destroy(cli), &lwi);
534 /* Do not wait for response */
535 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Fill @oa's grant-accounting fields (o_dirty, o_undirty, o_grant,
 * o_dropped) under cl_loi_list_lock so each RPC reports the client's
 * cache state to the OST.  The CERROR branches flag accounting that has
 * drifted out of its sane range (per-client and system-wide dirty
 * limits).  NOTE(review): several else/assignment lines in the branch
 * bodies are elided from this view. */
539 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
542 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have set the fields we are about to fill */
544 LASSERT(!(oa->o_valid & bits));
547 spin_lock(&cli->cl_loi_list_lock);
548 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
549 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
550 cli->cl_dirty_max_pages)) {
551 CERROR("dirty %lu - %lu > dirty_max %lu\n",
552 cli->cl_dirty_pages, cli->cl_dirty_transit,
553 cli->cl_dirty_max_pages);
555 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
556 atomic_long_read(&obd_dirty_transit_pages) >
557 (obd_max_dirty_pages + 1))) {
558 /* The atomic_read() allowing the atomic_inc() are
559 * not covered by a lock thus they may safely race and trip
560 * this CERROR() unless we add in a small fudge factor (+1). */
561 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
562 cli->cl_import->imp_obd->obd_name,
563 atomic_long_read(&obd_dirty_pages),
564 atomic_long_read(&obd_dirty_transit_pages),
565 obd_max_dirty_pages);
567 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
569 CERROR("dirty %lu - dirty_max %lu too big???\n",
570 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
573 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
575 (cli->cl_max_rpcs_in_flight + 1);
576 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
/* report grant we hold and grant lost (e.g. to eviction) since last RPC */
579 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
580 oa->o_dropped = cli->cl_lost_grant;
581 cli->cl_lost_grant = 0;
582 spin_unlock(&cli->cl_loi_list_lock);
583 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
584 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check cl_grant_shrink_interval seconds
 * from now. */
588 void osc_update_next_shrink(struct client_obd *cli)
590 cli->cl_next_shrink_grant =
591 cfs_time_shift(cli->cl_grant_shrink_interval);
592 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
593 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the
 * cl_loi_list_lock that protects all grant accounting. */
596 static void __osc_update_grant(struct client_obd *cli, u64 grant)
598 spin_lock(&cli->cl_loi_list_lock);
599 cli->cl_avail_grant += grant;
600 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
603 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
605 if (body->oa.o_valid & OBD_MD_FLGRANT) {
606 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
607 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: defined later in this file, needed by
 * osc_shrink_grant_to_target() below. */
611 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
612 u32 keylen, void *key,
613 u32 vallen, void *val,
614 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink set_info RPC.  On failure the
 * grant tentatively given back (oa->o_grant) is restored locally; on
 * success any grant in the reply body is absorbed.
 * NOTE(review): the rc check between lines 622 and 625 is elided. */
616 static int osc_shrink_grant_interpret(const struct lu_env *env,
617 struct ptlrpc_request *req,
620 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
621 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
622 struct ost_body *body;
/* RPC failed: take back the grant we tried to return to the server */
625 __osc_update_grant(cli, oa->o_grant);
629 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
631 osc_update_grant(cli, body);
/* Piggyback a grant return on an outgoing BRW: give back a quarter of
 * the available grant via oa->o_grant and flag it with
 * OBD_FL_SHRINK_GRANT so the server treats it as a return, not a
 * report.  Resets the shrink timer. */
637 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
639 spin_lock(&cli->cl_loi_list_lock);
640 oa->o_grant = cli->cl_avail_grant / 4;
641 cli->cl_avail_grant -= oa->o_grant;
642 spin_unlock(&cli->cl_loi_list_lock);
643 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
644 oa->o_valid |= OBD_MD_FLFLAGS;
647 oa->o_flags |= OBD_FL_SHRINK_GRANT;
648 osc_update_next_shrink(cli);
651 /* Shrink the current grant, either from some large amount to enough for a
652 * full set of in-flight RPCs, or if we have already shrunk to that limit
653 * then to enough for a single RPC. This avoids keeping more grant than
654 * needed, and avoids shrinking the grant piecemeal. */
655 static int osc_shrink_grant(struct client_obd *cli)
/* first target: enough grant for a full pipeline of max-size RPCs */
657 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
658 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
660 spin_lock(&cli->cl_loi_list_lock);
/* already at/below pipeline target: fall back to a single-RPC target */
661 if (cli->cl_avail_grant <= target_bytes)
662 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
663 spin_unlock(&cli->cl_loi_list_lock);
665 return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant above @target_bytes to the server with a dedicated
 * KEY_GRANT_SHRINK set_info RPC.  The target is clamped to at least one
 * RPC's worth, and nothing is sent if we already hold no more than the
 * target.  If the set_info dispatch fails, the grant is restored
 * locally.  NOTE(review): the body allocation and rc checks between
 * lines 685 and 691 are elided from this view. */
668 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
671 struct ost_body *body;
674 spin_lock(&cli->cl_loi_list_lock);
675 /* Don't shrink if we are already above or below the desired limit
676 * We don't want to shrink below a single RPC, as that will negatively
677 * impact block allocation and long-term performance. */
678 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
679 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
681 if (target_bytes >= cli->cl_avail_grant) {
682 spin_unlock(&cli->cl_loi_list_lock);
685 spin_unlock(&cli->cl_loi_list_lock);
691 osc_announce_cached(cli, &body->oa, 0);
693 spin_lock(&cli->cl_loi_list_lock);
/* give back everything above the target */
694 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
695 cli->cl_avail_grant = target_bytes;
696 spin_unlock(&cli->cl_loi_list_lock);
697 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
698 body->oa.o_valid |= OBD_MD_FLFLAGS;
699 body->oa.o_flags = 0;
701 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
702 osc_update_next_shrink(cli);
704 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
705 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
706 sizeof(*body), body, NULL);
/* dispatch failed: reclaim the grant we tried to give back */
708 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: the server
 * must support OBD_CONNECT_GRANT_SHRINK, the shrink deadline must be
 * (nearly) due, the import must be FULL, and we must hold more grant
 * than one RPC needs.  NOTE(review): the return statements are elided
 * from this view. */
713 static int osc_should_shrink_grant(struct client_obd *client)
715 cfs_time_t time = cfs_time_current();
716 cfs_time_t next_shrink = client->cl_next_shrink_grant;
718 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
719 OBD_CONNECT_GRANT_SHRINK) == 0)
/* fire slightly early (5 ticks) so we don't miss the window */
722 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
723 /* Get the current RPC size directly, instead of going via:
724 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
725 * Keep comment here so that it can be found by searching. */
726 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
728 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
729 client->cl_avail_grant > brw_size)
/* not worth shrinking now; re-arm the timer */
732 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client registered on this
 * timeout item and shrink grant where due. */
737 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
739 struct client_obd *client;
741 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
742 if (osc_should_shrink_grant(client))
743 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout machinery
 * and arm its first shrink deadline.  Logs and returns the error if
 * registration fails. */
748 static int osc_add_shrink_grant(struct client_obd *client)
752 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
754 osc_grant_shrink_grant_cb, NULL,
755 &client->cl_grant_shrink_list);
757 CERROR("add grant client %s error %d\n",
758 client->cl_import->imp_obd->obd_name, rc);
761 CDEBUG(D_CACHE, "add grant client %s \n",
762 client->cl_import->imp_obd->obd_name);
763 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink machinery. */
767 static int osc_del_shrink_grant(struct client_obd *client)
769 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect reply: set
 * cl_avail_grant from ocd_grant (accounting for pages already dirty
 * unless we were just evicted), pick the extent chunk size, and enroll
 * in grant shrinking if the server supports it. */
773 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
776 * ocd_grant is the total grant amount we're expect to hold: if we've
777 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
778 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
781 * race is tolerable here: if we're evicted, but imp_state already
782 * left EVICTED state, then cl_dirty_pages must be 0 already.
784 spin_lock(&cli->cl_loi_list_lock);
785 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
786 cli->cl_avail_grant = ocd->ocd_grant;
788 cli->cl_avail_grant = ocd->ocd_grant -
789 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)
791 if (cli->cl_avail_grant < 0) {
792 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
793 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
794 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
795 /* workaround for servers which do not have the patch from
797 cli->cl_avail_grant = ocd->ocd_grant;
800 /* determine the appropriate chunk size used by osc_extent. */
801 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
802 spin_unlock(&cli->cl_loi_list_lock);
804 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
805 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
806 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
808 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
809 list_empty(&cli->cl_grant_shrink_list))
810 osc_add_shrink_grant(cli);
813 /* We assume that the reason this OSC got a short read is because it read
814 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
815 * via the LOV, and it _knows_ it's reading inside the file, it's just that
816 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of the page array beyond @nob_read bytes actually
 * returned by the server.  NOTE(review): the kunmap() and index-advance
 * lines inside both loops are elided from this view. */
817 static void handle_short_read(int nob_read, size_t page_count,
818 struct brw_page **pga)
823 /* skip bytes read OK */
824 while (nob_read > 0) {
825 LASSERT (page_count > 0);
827 if (pga[i]->count > nob_read) {
828 /* EOF inside this page */
829 ptr = kmap(pga[i]->pg) +
830 (pga[i]->off & ~PAGE_MASK);
831 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
838 nob_read -= pga[i]->count;
843 /* zero remaining pages */
844 while (page_count-- > 0) {
845 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
846 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf RC vector in a BRW_WRITE reply: the vector
 * must be present and correctly sized, every entry must be exactly 0
 * (negative = server error, other nonzero = protocol violation), and the
 * bulk byte count must match what we requested.
 * NOTE(review): some return statements are elided from this view. */
852 static int check_write_rcs(struct ptlrpc_request *req,
853 int requested_nob, int niocount,
854 size_t page_count, struct brw_page **pga)
859 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
860 sizeof(*remote_rcs) *
862 if (remote_rcs == NULL) {
863 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
867 /* return error if any niobuf was in error */
868 for (i = 0; i < niocount; i++) {
869 if ((int)remote_rcs[i] < 0)
870 return(remote_rcs[i]);
872 if (remote_rcs[i] != 0) {
873 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
874 i, remote_rcs[i], req);
879 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
880 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
881 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one remote niobuf iff they are byte-adjacent
 * and their flags only differ in bits known to be mergeable (grant,
 * cache, sync, quota hints).  Unknown differing flag bits trigger a
 * warning and prevent merging. */
888 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
890 if (p1->flag != p2->flag) {
891 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
892 OBD_BRW_SYNC | OBD_BRW_ASYNC |
893 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
895 /* warn if we try to combine flags that we don't know to be
897 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
898 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
899 "report this at https://jira.hpdd.intel.com/\n",
/* adjacent iff p1 ends exactly where p2 begins */
905 return (p1->off + p1->count == p2->off);
/* Compute the bulk-I/O checksum over the first @nob bytes of @pga using
 * the algorithm selected by @cksum_type.  Contains two fault-injection
 * hooks: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts page data before
 * checksumming a read, and OBD_FAIL_OSC_CHECKSUM_SEND falsifies only
 * the computed checksum on a write (so the data stays valid for resend).
 * NOTE(review): the kunmap(), loop-index advance, and final return are
 * elided from this view. */
908 static u32 osc_checksum_bulk(int nob, size_t pg_count,
909 struct brw_page **pga, int opc,
910 cksum_type_t cksum_type)
914 struct cfs_crypto_hash_desc *hdesc;
915 unsigned int bufsize;
917 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
919 LASSERT(pg_count > 0);
921 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
923 CERROR("Unable to initialize checksum hash %s\n",
924 cfs_crypto_hash_name(cfs_alg));
925 return PTR_ERR(hdesc);
928 while (nob > 0 && pg_count > 0) {
929 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
931 /* corrupt the data before we compute the checksum, to
932 * simulate an OST->client data error */
933 if (i == 0 && opc == OST_READ &&
934 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
935 unsigned char *ptr = kmap(pga[i]->pg);
936 int off = pga[i]->off & ~PAGE_MASK;
938 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
941 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
942 pga[i]->off & ~PAGE_MASK,
944 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
945 (int)(pga[i]->off & ~PAGE_MASK));
947 nob -= pga[i]->count;
952 bufsize = sizeof(cksum);
953 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
955 /* For sending we only compute the wrong checksum instead
956 * of corrupting the data so it is still correct on a redo */
957 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete OST_READ/OST_WRITE bulk RPC for @page_count pages:
 * allocate the request (writes draw from the import's emergency pool so
 * cached dirty data can always be flushed), coalesce adjacent pages
 * into remote niobufs, set up the bulk descriptor, announce cached/grant
 * state, optionally checksum the payload, and return the ready request
 * in *reqp without sending it.  @resend marks a retransmission
 * (OBD_FL_RECOV_RESEND).
 * NOTE(review): many error-branch and loop-bookkeeping lines are elided
 * from this view of the file. */
964 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
965 u32 page_count, struct brw_page **pga,
966 struct ptlrpc_request **reqp, int resend)
968 struct ptlrpc_request *req;
969 struct ptlrpc_bulk_desc *desc;
970 struct ost_body *body;
971 struct obd_ioobj *ioobj;
972 struct niobuf_remote *niobuf;
973 int niocount, i, requested_nob, opc, rc;
974 struct osc_brw_async_args *aa;
975 struct req_capsule *pill;
976 struct brw_page *pg_prev;
/* fault-injection points for BRW preparation */
979 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
980 RETURN(-ENOMEM); /* Recoverable */
981 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
982 RETURN(-EINVAL); /* Fatal */
984 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes use the pre-allocated pool so flushing never needs memory */
986 req = ptlrpc_request_alloc_pool(cli->cl_import,
987 cli->cl_import->imp_rq_pool,
991 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count how many distinct niobufs the (mergeable) pages collapse into */
996 for (niocount = i = 1; i < page_count; i++) {
997 if (!can_merge_pages(pga[i - 1], pga[i]))
1001 pill = &req->rq_pill;
1002 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1004 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1005 niocount * sizeof(*niobuf));
1007 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1009 ptlrpc_request_free(req);
1012 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1013 ptlrpc_at_set_req_timeout(req);
1014 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1016 req->rq_no_retry_einprogress = 1;
1018 desc = ptlrpc_prep_bulk_imp(req, page_count,
1019 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1020 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1021 PTLRPC_BULK_PUT_SINK) |
1022 PTLRPC_BULK_BUF_KIOV,
1024 &ptlrpc_bulk_kiov_pin_ops);
1027 GOTO(out, rc = -ENOMEM);
1028 /* NB request now owns desc and will free it when it gets freed */
1030 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1031 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1032 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1033 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1035 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1037 obdo_to_ioobj(oa, ioobj);
1038 ioobj->ioo_bufcnt = niocount;
1039 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1040 * that might be send for this request. The actual number is decided
1041 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1042 * "max - 1" for old client compatibility sending "0", and also so the
1043 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1044 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1045 LASSERT(page_count > 0);
1047 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1048 struct brw_page *pg = pga[i];
1049 int poff = pg->off & ~PAGE_MASK;
1051 LASSERT(pg->count > 0);
1052 /* make sure there is no gap in the middle of page array */
1053 LASSERTF(page_count == 1 ||
1054 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1055 ergo(i > 0 && i < page_count - 1,
1056 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1057 ergo(i == page_count - 1, poff == 0)),
1058 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1059 i, page_count, pg, pg->off, pg->count);
/* pages must arrive strictly sorted by file offset */
1060 LASSERTF(i == 0 || pg->off > pg_prev->off,
1061 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1062 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1064 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1065 pg_prev->pg, page_private(pg_prev->pg),
1066 pg_prev->pg->index, pg_prev->off);
1067 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1068 (pg->flag & OBD_BRW_SRVLOCK));
1070 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1071 requested_nob += pg->count;
1073 if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* extend the previous niobuf instead of starting a new one */
1075 niobuf->rnb_len += pg->count;
1077 niobuf->rnb_offset = pg->off;
1078 niobuf->rnb_len = pg->count;
1079 niobuf->rnb_flags = pg->flag;
/* sanity: we must have filled exactly niocount niobufs */
1084 LASSERTF((void *)(niobuf - niocount) ==
1085 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1086 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1087 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1089 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1091 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1092 body->oa.o_valid |= OBD_MD_FLFLAGS;
1093 body->oa.o_flags = 0;
1095 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1098 if (osc_should_shrink_grant(cli))
1099 osc_shrink_grant_local(cli, &body->oa);
1101 /* size[REQ_REC_OFF] still sizeof (*body) */
1102 if (opc == OST_WRITE) {
1103 if (cli->cl_checksum &&
1104 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1105 /* store cl_cksum_type in a local variable since
1106 * it can be changed via lprocfs */
1107 cksum_type_t cksum_type = cli->cl_cksum_type;
1109 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1110 oa->o_flags &= OBD_FL_LOCAL_MASK;
1111 body->oa.o_flags = 0;
1113 body->oa.o_flags |= cksum_type_pack(cksum_type);
1114 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1115 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1119 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1121 /* save this in 'oa', too, for later checking */
1122 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1123 oa->o_flags |= cksum_type_pack(cksum_type);
1125 /* clear out the checksum flag, in case this is a
1126 * resend but cl_checksum is no longer set. b=11238 */
1127 oa->o_valid &= ~OBD_MD_FLCKSUM;
1129 oa->o_cksum = body->oa.o_cksum;
1130 /* 1 RC per niobuf */
1131 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1132 sizeof(__u32) * niocount);
1134 if (cli->cl_checksum &&
1135 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1136 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1137 body->oa.o_flags = 0;
1138 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1139 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1142 ptlrpc_request_set_replen(req);
1144 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1145 aa = ptlrpc_req_async_args(req);
1147 aa->aa_requested_nob = requested_nob;
1148 aa->aa_nio_count = niocount;
1149 aa->aa_page_count = page_count;
1153 INIT_LIST_HEAD(&aa->aa_oaps);
1156 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1157 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1158 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1159 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: release the request (and the bulk desc it owns) */
1163 ptlrpc_req_finished(req);
/* Diagnose a write-checksum mismatch between client and server.
 * Recomputes the checksum over the still-pinned pages with the server's
 * checksum type to distinguish: wrong algorithm on the server, data
 * changed on the client after checksumming (typical of concurrent mmap
 * writes), corruption in transit, or none of the above.  Logs a console
 * error with the object/extent and all three checksums.
 * NOTE(review): return statements are elided from this view. */
1167 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1168 __u32 client_cksum, __u32 server_cksum, int nob,
1169 size_t page_count, struct brw_page **pga,
1170 cksum_type_t client_cksum_type)
1174 cksum_type_t cksum_type;
1176 if (server_cksum == client_cksum) {
1177 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the type the server actually used */
1181 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1183 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1186 if (cksum_type != client_cksum_type)
1187 msg = "the server did not use the checksum type specified in "
1188 "the original request - likely a protocol problem";
1189 else if (new_cksum == server_cksum)
1190 msg = "changed on the client after we checksummed it - "
1191 "likely false positive due to mmap IO (bug 11742)";
1192 else if (new_cksum == client_cksum)
1193 msg = "changed in transit before arrival at OST";
1195 msg = "changed in transit AND doesn't match the original - "
1196 "likely false positive due to mmap IO (bug 11742)";
1198 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1199 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1200 msg, libcfs_nid2str(peer->nid),
1201 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1202 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1203 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1204 POSTID(&oa->o_oi), pga[0]->off,
1205 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1206 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1207 "client csum now %x\n", client_cksum, client_cksum_type,
1208 server_cksum, cksum_type, new_cksum);
1212 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish an OST bulk read/write request on the client side.
 *
 * Unpacks the reply body, updates per-uid/gid quota state and the grant
 * accounting, then verifies the transfer: for OST_WRITE it re-checksums
 * the pages and compares against the server's checksum; for OST_READ it
 * validates the transferred byte count, handles short reads, and checks
 * the server-supplied checksum if one was sent.
 *
 * \param req  the completed BRW ptlrpc request
 * \param rc   on entry: number of bytes transferred (or negative errno)
 * \return 0 or negative errno; -EAGAIN requests a resend (e.g. when
 *         bulk unwrap fails or a read checksum mismatch is detected)
 */
1213 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1215 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1216 const lnet_process_id_t *peer =
1217 &req->rq_import->imp_connection->c_peer;
1218 struct client_obd *cli = aa->aa_cli;
1219 struct ost_body *body;
1220 u32 client_cksum = 0;
 /* -EDQUOT is not fatal here: the reply still carries quota flags that
  * must be processed below. */
1223 if (rc < 0 && rc != -EDQUOT) {
1224 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1228 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1229 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1231 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1235 /* set/clear over quota flag for a uid/gid */
1236 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1237 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1238 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1240 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1241 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1243 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
 /* Refresh the client's grant from the server's reply. */
1246 osc_update_grant(cli, body);
 /* Remember the checksum we sent so a write mismatch can be diagnosed
  * against it later. */
1251 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1252 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1254 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
 /* A positive rc (bytes) is never expected for a write reply here. */
1256 CERROR("Unexpected +ve rc %d\n", rc);
1259 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1261 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
 /* Compare our saved write checksum with the server's. */
1264 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1265 check_write_checksum(&body->oa, peer, client_cksum,
1266 body->oa.o_cksum, aa->aa_requested_nob,
1267 aa->aa_page_count, aa->aa_ppga,
1268 cksum_type_unpack(aa->aa_oa->o_flags)))
1271 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1272 aa->aa_page_count, aa->aa_ppga);
1276 /* The rest of this function executes only for OST_READs */
1278 /* if unwrap_bulk failed, return -EAGAIN to retry */
1279 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1281 GOTO(out, rc = -EAGAIN);
 /* Sanity-check the read size against what we asked for and what the
  * bulk layer says was actually transferred. */
1283 if (rc > aa->aa_requested_nob) {
1284 CERROR("Unexpected rc %d (%d requested)\n", rc,
1285 aa->aa_requested_nob);
1289 if (rc != req->rq_bulk->bd_nob_transferred) {
1290 CERROR ("Unexpected rc %d (%d transferred)\n",
1291 rc, req->rq_bulk->bd_nob_transferred);
 /* Short read: zero-fill the tail pages beyond the returned bytes. */
1295 if (rc < aa->aa_requested_nob)
1296 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1298 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1299 static int cksum_counter;
1300 u32 server_cksum = body->oa.o_cksum;
1303 cksum_type_t cksum_type;
1305 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1306 body->oa.o_flags : 0);
1307 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1308 aa->aa_ppga, OST_READ,
 /* The data may have come through an LNet router; report it so a
  * corrupting hop can be identified. */
1311 if (peer->nid != req->rq_bulk->bd_sender) {
1313 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1316 if (server_cksum != client_cksum) {
1317 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1318 "%s%s%s inode "DFID" object "DOSTID
1319 " extent ["LPU64"-"LPU64"]\n",
1320 req->rq_import->imp_obd->obd_name,
1321 libcfs_nid2str(peer->nid),
1323 body->oa.o_valid & OBD_MD_FLFID ?
1324 body->oa.o_parent_seq : (__u64)0,
1325 body->oa.o_valid & OBD_MD_FLFID ?
1326 body->oa.o_parent_oid : 0,
1327 body->oa.o_valid & OBD_MD_FLFID ?
1328 body->oa.o_parent_ver : 0,
1329 POSTID(&body->oa.o_oi),
1330 aa->aa_ppga[0]->off,
1331 aa->aa_ppga[aa->aa_page_count-1]->off +
1332 aa->aa_ppga[aa->aa_page_count-1]->count -
1334 CERROR("client %x, server %x, cksum_type %x\n",
1335 client_cksum, server_cksum, cksum_type);
1337 aa->aa_oa->o_cksum = client_cksum;
1341 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
 /* Server did not send a checksum although we asked for one; warn
  * with exponential backoff (power-of-two occurrences). */
1344 } else if (unlikely(client_cksum)) {
1345 static int cksum_missed;
1348 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1349 CERROR("Checksum %u requested from %s but not sent\n",
1350 cksum_missed, libcfs_nid2str(peer->nid));
 /* Copy attributes the server returned back into our obdo. */
1356 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1357 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request after a recoverable error.
 *
 * A brand-new request is prepared from the async args of the failed one;
 * the page array (pga), oaps and extent lists are transferred to the new
 * request, and each oap's request reference is switched over.  The new
 * request is then handed to ptlrpcd.
 *
 * \param request  the failed BRW request being redone
 * \param aa       its osc_brw_async_args (pages, oaps, extents, counters)
 * \param rc       the error that triggered the redo (-EINPROGRESS is
 *                 logged at a lower level since it is expected)
 * \return 0 on success or negative errno from request preparation
 */
1362 static int osc_brw_redo_request(struct ptlrpc_request *request,
1363 struct osc_brw_async_args *aa, int rc)
1365 struct ptlrpc_request *new_req;
1366 struct osc_brw_async_args *new_aa;
1367 struct osc_async_page *oap;
1370 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1371 "redo for recoverable error %d", rc);
1373 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1374 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1375 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1376 aa->aa_ppga, &new_req, 1);
 /* If any page's I/O was interrupted, abandon the redo: drop the new
  * request instead of resending. */
1380 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1381 if (oap->oap_request != NULL) {
1382 LASSERTF(request == oap->oap_request,
1383 "request %p != oap_request %p\n",
1384 request, oap->oap_request);
1385 if (oap->oap_interrupted) {
1386 ptlrpc_req_finished(new_req);
1391 /* New request takes over pga and oaps from old request.
1392 * Note that copying a list_head doesn't work, need to move it... */
1394 new_req->rq_interpret_reply = request->rq_interpret_reply;
1395 new_req->rq_async_args = request->rq_async_args;
1396 new_req->rq_commit_cb = request->rq_commit_cb;
1397 /* cap resend delay to the current request timeout, this is similar to
1398 * what ptlrpc does (see after_reply()) */
1399 if (aa->aa_resends > new_req->rq_timeout)
1400 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1402 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1403 new_req->rq_generation_set = 1;
1404 new_req->rq_import_generation = request->rq_import_generation;
1406 new_aa = ptlrpc_req_async_args(new_req);
 /* Move (not copy) the oap and extent lists onto the new async args. */
1408 INIT_LIST_HEAD(&new_aa->aa_oaps);
1409 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1410 INIT_LIST_HEAD(&new_aa->aa_exts);
1411 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1412 new_aa->aa_resends = aa->aa_resends;
 /* Repoint every oap's request reference from the old request to the
  * new one. */
1414 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1415 if (oap->oap_request) {
1416 ptlrpc_req_finished(oap->oap_request);
1417 oap->oap_request = ptlrpc_request_addref(new_req);
1421 /* XXX: This code will run into problem if we're going to support
1422 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1423 * and wait for all of them to be finished. We should inherit request
1424 * set from old request. */
1425 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1427 DEBUG_REQ(D_INFO, new_req, "new request");
1432 * ugh, we want disk allocation on the target to happen in offset order. we'll
1433 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1434 * fine for our small page arrays and doesn't require allocation. its an
1435 * insertion sort that swaps elements that are strides apart, shrinking the
1436 * stride down until its '1' and the array is sorted.
/*
 * In-place shellsort of a brw_page pointer array by page offset (->off),
 * ascending.  No allocation; suitable for the small arrays used per RPC.
 *
 * \param array  array of brw_page pointers to sort
 * \param num    number of elements in \a array
 */
1438 static void sort_brw_pages(struct brw_page **array, int num)
1441 struct brw_page *tmp;
 /* Build the Knuth gap sequence (1, 4, 13, 40, ...) up to num. */
1445 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
 /* Gapped insertion sort for the current stride. */
1450 for (i = stride ; i < num ; i++) {
1453 while (j >= stride && array[j - stride]->off > tmp->off) {
1454 array[j] = array[j - stride];
1459 } while (stride > 1);
/*
 * Free a brw_page pointer array of \a count entries previously allocated
 * with OBD_ALLOC (see osc_build_rpc).
 */
1462 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1464 LASSERT(ppga != NULL);
1465 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs (set as rq_interpret_reply in
 * osc_build_rpc).
 *
 * Finishes the request via osc_brw_fini_request(), resends it on
 * recoverable errors (always for -EINPROGRESS, otherwise bounded by
 * client_should_resend()), propagates returned size/time attributes to
 * the cl_object, finishes all extents carried by the request, updates
 * the in-flight RPC counters and wakes cache waiters.
 *
 * \param env   lu environment
 * \param req   the completed BRW request
 * \param data  osc_brw_async_args for this request
 * \param rc    completion status from the RPC layer
 * \return 0 or negative errno
 */
1468 static int brw_interpret(const struct lu_env *env,
1469 struct ptlrpc_request *req, void *data, int rc)
1471 struct osc_brw_async_args *aa = data;
1472 struct osc_extent *ext;
1473 struct osc_extent *tmp;
1474 struct client_obd *cli = aa->aa_cli;
1477 rc = osc_brw_fini_request(req, rc);
1478 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1479 /* When server return -EINPROGRESS, client should always retry
1480 * regardless of the number of times the bulk was resent already. */
1481 if (osc_recoverable_error(rc)) {
 /* Don't resend across an eviction: the import generation changed. */
1482 if (req->rq_import_generation !=
1483 req->rq_import->imp_generation) {
1484 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1485 ""DOSTID", rc = %d.\n",
1486 req->rq_import->imp_obd->obd_name,
1487 POSTID(&aa->aa_oa->o_oi), rc);
1488 } else if (rc == -EINPROGRESS ||
1489 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1490 rc = osc_brw_redo_request(req, aa, rc);
1492 CERROR("%s: too many resent retries for object: "
1493 ""LPU64":"LPU64", rc = %d.\n",
1494 req->rq_import->imp_obd->obd_name,
1495 POSTID(&aa->aa_oa->o_oi), rc);
1500 else if (rc == -EAGAIN || rc == -EINPROGRESS)
 /* Success path: fold the attributes returned in the obdo back into
  * the cl_object's cached attributes. */
1505 struct obdo *oa = aa->aa_oa;
1506 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1507 unsigned long valid = 0;
1508 struct cl_object *obj;
1509 struct osc_async_page *last;
1511 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1512 obj = osc2cl(last->oap_obj);
1514 cl_object_attr_lock(obj);
1515 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1516 attr->cat_blocks = oa->o_blocks;
1517 valid |= CAT_BLOCKS;
1519 if (oa->o_valid & OBD_MD_FLMTIME) {
1520 attr->cat_mtime = oa->o_mtime;
1523 if (oa->o_valid & OBD_MD_FLATIME) {
1524 attr->cat_atime = oa->o_atime;
1527 if (oa->o_valid & OBD_MD_FLCTIME) {
1528 attr->cat_ctime = oa->o_ctime;
1532 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1533 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1534 loff_t last_off = last->oap_count + last->oap_obj_off +
1537 /* Change file size if this is an out of quota or
1538 * direct IO write and it extends the file size */
1539 if (loi->loi_lvb.lvb_size < last_off) {
1540 attr->cat_size = last_off;
1543 /* Extend KMS if it's not a lockless write */
1544 if (loi->loi_kms < last_off &&
1545 oap2osc_page(last)->ops_srvlock == 0) {
1546 attr->cat_kms = last_off;
1552 cl_object_attr_update(env, obj, attr, valid);
1553 cl_object_attr_unlock(obj);
1555 OBDO_FREE(aa->aa_oa);
 /* Track successfully-written pages as unstable until the server
  * commits them (paired with brw_commit / osc_dec_unstable_pages). */
1557 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1558 osc_inc_unstable_pages(req);
 /* Complete every extent carried by this RPC. */
1560 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1561 list_del_init(&ext->oe_link);
1562 osc_extent_finish(env, ext, 1, rc);
1564 LASSERT(list_empty(&aa->aa_exts));
1565 LASSERT(list_empty(&aa->aa_oaps));
1567 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1568 req->rq_bulk->bd_nob_transferred);
1569 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1570 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1572 spin_lock(&cli->cl_loi_list_lock);
1573 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1574 * is called so we know whether to go to sync BRWs or wait for more
1575 * RPCs to complete */
1576 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1577 cli->cl_w_in_flight--;
1579 cli->cl_r_in_flight--;
1580 osc_wake_cache_waiters(cli);
1581 spin_unlock(&cli->cl_loi_list_lock);
 /* Kick the I/O engine in case more work became eligible. */
1583 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * Commit callback (rq_commit_cb) for BRW write requests: runs when the
 * server reports the transaction committed.  Clears rq_unstable and
 * releases the pages from unstable accounting; otherwise just marks the
 * request committed.  The rq_lock serializes against osc_extent_finish's
 * osc_inc_unstable_pages path.
 */
1587 static void brw_commit(struct ptlrpc_request *req)
1589 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1590 * this called via the rq_commit_cb, I need to ensure
1591 * osc_dec_unstable_pages is still called. Otherwise unstable
1592 * pages may be leaked. */
1593 spin_lock(&req->rq_lock)
1594 if (likely(req->rq_unstable)) {
1595 req->rq_unstable = 0;
1596 spin_unlock(&req->rq_lock);
1598 osc_dec_unstable_pages(req);
1600 req->rq_committed = 1;
1601 spin_unlock(&req->rq_lock);
1606 * Build an RPC by the list of extent @ext_list. The caller must ensure
1607 * that the total pages in this list are NOT over max pages per RPC.
1608 * Extents in the list must be in OES_RPC state.
/*
 * Assemble a single BRW RPC from the extents in \a ext_list and hand it
 * to ptlrpcd.
 *
 * Collects all oaps from the extents into rpc_list, allocates the page
 * array / obdo / cl_req, sorts pages by offset, prepares the request,
 * wires up the brw_commit/brw_interpret callbacks and the async args,
 * updates in-flight counters and lprocfs histograms, then queues the
 * request.  On failure all extents are finished with an error.
 *
 * \param env       lu environment
 * \param cli       client obd this RPC belongs to
 * \param ext_list  extents (OES_RPC state) to send; emptied on success
 * \param cmd       OBD_BRW_WRITE or OBD_BRW_READ (plus flags)
 * \param pol       ptlrpcd scheduling policy
 * \return 0 on success, negative errno on failure
 */
1610 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1611 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1613 struct ptlrpc_request *req = NULL;
1614 struct osc_extent *ext;
1615 struct brw_page **pga = NULL;
1616 struct osc_brw_async_args *aa = NULL;
1617 struct obdo *oa = NULL;
1618 struct osc_async_page *oap;
1619 struct osc_async_page *tmp;
1620 struct cl_req *clerq = NULL;
1621 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1623 struct cl_req_attr *crattr = NULL;
1624 loff_t starting_offset = OBD_OBJECT_EOF;
1625 loff_t ending_offset = 0;
1629 bool soft_sync = false;
1632 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1633 struct ost_body *body;
1635 LASSERT(!list_empty(ext_list));
1637 /* add pages into rpc_list to build BRW rpc */
1638 list_for_each_entry(ext, ext_list, oe_link) {
1639 LASSERT(ext->oe_state == OES_RPC);
1640 mem_tight |= ext->oe_memalloc;
1641 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1643 list_add_tail(&oap->oap_rpc_item, &rpc_list);
 /* Track the byte range covered by this RPC for stats below. */
1644 if (starting_offset == OBD_OBJECT_EOF ||
1645 starting_offset > oap->oap_obj_off)
1646 starting_offset = oap->oap_obj_off;
1648 LASSERT(oap->oap_page_off == 0);
1649 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1650 ending_offset = oap->oap_obj_off +
1653 LASSERT(oap->oap_page_off + oap->oap_count ==
1658 soft_sync = osc_over_unstable_soft_limit(cli);
 /* Under memory pressure, mark ourselves so allocations may dip into
  * reserves; restored at the end of the function. */
1660 mpflag = cfs_memory_pressure_get_and_set();
1662 OBD_ALLOC(crattr, sizeof(*crattr));
1664 GOTO(out, rc = -ENOMEM);
1666 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1668 GOTO(out, rc = -ENOMEM);
1672 GOTO(out, rc = -ENOMEM);
 /* Fill the page array and attach each page to the cl_req.  The first
  * page allocates the (single-object) cl_req itself. */
1675 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1676 struct cl_page *page = oap2cl_page(oap);
1677 if (clerq == NULL) {
1678 clerq = cl_req_alloc(env, page, crt,
1679 1 /* only 1-object rpcs for now */);
1681 GOTO(out, rc = PTR_ERR(clerq));
1684 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1686 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1687 pga[i] = &oap->oap_brw_page;
1688 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1689 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1690 pga[i]->pg, page_index(oap->oap_page), oap,
1693 cl_req_page_add(env, clerq, page);
1696 /* always get the data for the obdo for the rpc */
1697 LASSERT(clerq != NULL);
1698 crattr->cra_oa = oa;
1699 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1701 rc = cl_req_prep(env, clerq);
1703 CERROR("cl_req_prep failed: %d\n", rc);
 /* Pages must be offset-ordered so the OST allocates sequentially. */
1707 sort_brw_pages(pga, page_count);
1708 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1710 CERROR("prep_req failed: %d\n", rc);
1714 req->rq_commit_cb = brw_commit;
1715 req->rq_interpret_reply = brw_interpret;
1718 req->rq_memalloc = 1;
1720 /* Need to update the timestamps after the request is built in case
1721 * we race with setattr (locally or in queue at OST). If OST gets
1722 * later setattr before earlier BRW (as determined by the request xid),
1723 * the OST will not use BRW timestamps. Sadly, there is no obvious
1724 * way to do this in a single call. bug 10150 */
1725 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1726 crattr->cra_oa = &body->oa;
1727 cl_req_attr_set(env, clerq, crattr,
1728 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1730 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
 /* Hand ownership of the oap and extent lists to the request's async
  * args; list heads must be moved, not copied. */
1732 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1733 aa = ptlrpc_req_async_args(req);
1734 INIT_LIST_HEAD(&aa->aa_oaps);
1735 list_splice_init(&rpc_list, &aa->aa_oaps);
1736 INIT_LIST_HEAD(&aa->aa_exts);
1737 list_splice_init(ext_list, &aa->aa_exts);
1738 aa->aa_clerq = clerq;
1740 /* queued sync pages can be torn down while the pages
1741 * were between the pending list and the rpc */
1743 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1744 /* only one oap gets a request reference */
1747 if (oap->oap_interrupted && !req->rq_intr) {
1748 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1750 ptlrpc_mark_interrupted(req);
1754 tmp->oap_request = ptlrpc_request_addref(req);
 /* Bump in-flight counters and record RPC-size/offset histograms. */
1756 spin_lock(&cli->cl_loi_list_lock);
1757 starting_offset >>= PAGE_CACHE_SHIFT;
1758 if (cmd == OBD_BRW_READ) {
1759 cli->cl_r_in_flight++;
1760 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1761 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1762 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1763 starting_offset + 1);
1765 cli->cl_w_in_flight++;
1766 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1767 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1768 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1769 starting_offset + 1);
1771 spin_unlock(&cli->cl_loi_list_lock);
1773 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1774 page_count, aa, cli->cl_r_in_flight,
1775 cli->cl_w_in_flight);
1777 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1778 * see which CPU/NUMA node the majority of pages were allocated
1779 * on, and try to assign the async RPC to the CPU core
1780 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1782 * But on the other hand, we expect that multiple ptlrpcd
1783 * threads and the initial write sponsor can run in parallel,
1784 * especially when data checksum is enabled, which is CPU-bound
1785 * operation and single ptlrpcd thread cannot process in time.
1786 * So more ptlrpcd threads sharing BRW load
1787 * (with PDL_POLICY_ROUND) seems better.
1789 ptlrpcd_add_req(req, pol, -1);
 /* Error/cleanup path: free partial allocations and fail all extents. */
1795 cfs_memory_pressure_restore(mpflag);
1798 OBD_FREE(crattr, sizeof(*crattr));
1801 LASSERT(req == NULL);
1806 OBD_FREE(pga, sizeof(*pga) * page_count);
1807 /* this should happen rarely and is pretty bad, it makes the
1808 * pending list not follow the dirty order */
1809 while (!list_empty(ext_list)) {
1810 ext = list_entry(ext_list->next, struct osc_extent,
1812 list_del_init(&ext->oe_link);
1813 osc_extent_finish(env, ext, 0, rc);
1815 if (clerq && !IS_ERR(clerq))
1816 cl_req_completion(env, clerq, rc);
/*
 * Attach \a einfo->ei_cbdata to a DLM lock, verifying that the lock's
 * callbacks and resource type match what the enqueue info expects.
 * Sets l_ast_data only if it is currently unset; succeeds when the lock
 * ends up carrying exactly this data pointer.
 *
 * \return non-zero if the lock now holds einfo->ei_cbdata (presumably;
 *         the success/failure return lines are outside the visible code)
 */
1821 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1822 struct ldlm_enqueue_info *einfo)
1824 void *data = einfo->ei_cbdata;
1827 LASSERT(lock != NULL);
1828 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1829 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1830 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1831 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1833 lock_res_and_lock(lock);
 /* Only claim the lock's ast_data if nobody else has. */
1835 if (lock->l_ast_data == NULL)
1836 lock->l_ast_data = data;
1837 if (lock->l_ast_data == data)
1840 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolves
 * \a lockh to a lock, attaches the enqueue cbdata, and drops the lock
 * reference.  A NULL lock here means the handle is stale - most likely
 * the client was evicted.
 */
1845 static int osc_set_data_with_check(struct lustre_handle *lockh,
1846 struct ldlm_enqueue_info *einfo)
1848 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1852 set = osc_set_lock_data_with_check(lock, einfo);
1853 LDLM_LOCK_PUT(lock);
1855 CERROR("lockh %p, data %p - client evicted?\n",
1856 lockh, einfo->ei_cbdata);
/*
 * Common completion for an OSC lock enqueue (sync or async path).
 *
 * For intent enqueues that were aborted by the server, extracts the real
 * status from the DLM reply.  Marks the LVB ready on success, invokes
 * the caller's upcall with the final error code, and releases the lock
 * reference taken by ldlm_cli_enqueue() when appropriate.
 *
 * \param upcall   caller completion callback
 * \param cookie   opaque argument for \a upcall
 * \param lockh    handle of the (possibly) granted lock
 * \param mode     lock mode used for the decref
 * \param flags    in/out enqueue flags (LDLM_FL_LVB_READY may be set)
 * \param agl      non-zero for speculative (AGL) enqueues
 * \param errcode  enqueue result (ELDLM_* or errno)
 */
1860 static int osc_enqueue_fini(struct ptlrpc_request *req,
1861 osc_enqueue_upcall_f upcall, void *cookie,
1862 struct lustre_handle *lockh, ldlm_mode_t mode,
1863 __u64 *flags, int agl, int errcode)
1865 bool intent = *flags & LDLM_FL_HAS_INTENT;
1869 /* The request was created before ldlm_cli_enqueue call. */
1870 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1871 struct ldlm_reply *rep;
1873 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1874 LASSERT(rep != NULL);
 /* The intent result travels in lock_policy_res1 and is in wire
  * (network) byte order for statuses. */
1876 rep->lock_policy_res1 =
1877 ptlrpc_status_ntoh(rep->lock_policy_res1);
1878 if (rep->lock_policy_res1)
1879 errcode = rep->lock_policy_res1;
1881 *flags |= LDLM_FL_LVB_READY;
1882 } else if (errcode == ELDLM_OK) {
1883 *flags |= LDLM_FL_LVB_READY;
1886 /* Call the update callback. */
1887 rc = (*upcall)(cookie, lockh, errcode);
1889 /* release the reference taken in ldlm_cli_enqueue() */
1890 if (errcode == ELDLM_LOCK_MATCHED)
1892 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1893 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for asynchronous lock enqueues.
 *
 * Completes the DLM side via ldlm_cli_enqueue_fini() and then the OSC
 * side via osc_enqueue_fini().  An extra lock reference is held across
 * the upcall so a blocking AST posted for a failed lock cannot race
 * ahead of the upcall.
 *
 * \param aa  osc_enqueue_args saved at enqueue time
 * \param rc  RPC-level completion status
 */
1898 static int osc_enqueue_interpret(const struct lu_env *env,
1899 struct ptlrpc_request *req,
1900 struct osc_enqueue_args *aa, int rc)
1902 struct ldlm_lock *lock;
1903 struct lustre_handle *lockh = &aa->oa_lockh;
1904 ldlm_mode_t mode = aa->oa_mode;
1905 struct ost_lvb *lvb = aa->oa_lvb;
1906 __u32 lvb_len = sizeof(*lvb);
1911 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1913 lock = ldlm_handle2lock(lockh);
1914 LASSERTF(lock != NULL,
1915 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1916 lockh->cookie, req, aa);
1918 /* Take an additional reference so that a blocking AST that
1919 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1920 * to arrive after an upcall has been executed by
1921 * osc_enqueue_fini(). */
1922 ldlm_lock_addref(lockh, mode);
1924 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1925 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1927 /* Let CP AST to grant the lock first. */
1928 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 /* AGL enqueues saved no lvb/flags pointers; supply locals here. */
1931 LASSERT(aa->oa_lvb == NULL);
1932 LASSERT(aa->oa_flags == NULL);
1933 aa->oa_flags = &flags;
1936 /* Complete obtaining the lock procedure. */
1937 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1938 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1940 /* Complete osc stuff. */
1941 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1942 aa->oa_flags, aa->oa_agl, rc);
1944 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 /* Drop the extra reference taken above. */
1946 ldlm_lock_decref(lockh, mode);
1947 LDLM_LOCK_PUT(lock);
/* Sentinel value: callers pass PTLRPCD_SET as the request set to have the
 * enqueue handled by ptlrpcd instead of a caller-owned set.  It is only
 * compared against, never dereferenced (see osc_enqueue_base). */
1951 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1953 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1954 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1955 * other synchronous requests, however keeping some locks and trying to obtain
1956 * others may take a considerable amount of time in a case of ost failure; and
1957 * when other sync requests do not get released lock from a client, the client
1958 * is evicted from the cluster -- such scenarious make the life difficult, so
1959 * release locks just after they are obtained. */
/*
 * Enqueue an extent DLM lock on an OST object.
 *
 * First tries to match an existing cached lock (a PW lock can satisfy a
 * PR request); on a match the upcall is invoked with ELDLM_LOCK_MATCHED.
 * Otherwise builds an LDLM_ENQUEUE request (with LVB buffer) and either
 * sends it asynchronously - completion via osc_enqueue_interpret() - or
 * waits and finishes through osc_enqueue_fini().
 *
 * \param flags      in/out LDLM flags (LDLM_FL_HAS_INTENT, _TEST_LOCK...)
 * \param kms_valid  zero when cached locks cannot be trusted (fresh or
 *                   evicted object) so matching is skipped
 * \param upcall     completion callback, \a cookie its argument
 * \param rqset      request set for async mode; PTLRPCD_SET for ptlrpcd
 * \param agl        non-zero for speculative (AGL) enqueue
 */
1960 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1961 __u64 *flags, ldlm_policy_data_t *policy,
1962 struct ost_lvb *lvb, int kms_valid,
1963 osc_enqueue_upcall_f upcall, void *cookie,
1964 struct ldlm_enqueue_info *einfo,
1965 struct ptlrpc_request_set *rqset, int async, int agl)
1967 struct obd_device *obd = exp->exp_obd;
1968 struct lustre_handle lockh = { 0 };
1969 struct ptlrpc_request *req = NULL;
1970 int intent = *flags & LDLM_FL_HAS_INTENT;
1971 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1976 /* Filesystem lock extents are extended to page boundaries so that
1977 * dealing with the page cache is a little smoother. */
1978 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1979 policy->l_extent.end |= ~PAGE_MASK;
1982 * kms is not valid when either object is completely fresh (so that no
1983 * locks are cached), or object was evicted. In the latter case cached
1984 * lock cannot be used, because it would prime inode state with
1985 * potentially stale LVB.
1990 /* Next, search for already existing extent locks that will cover us */
1991 /* If we're trying to read, we also search for an existing PW lock. The
1992 * VFS and page cache already protect us locally, so lots of readers/
1993 * writers can share a single PW lock.
1995 * There are problems with conversion deadlocks, so instead of
1996 * converting a read lock to a write lock, we'll just enqueue a new
1999 * At some point we should cancel the read lock instead of making them
2000 * send us a blocking callback, but there are problems with canceling
2001 * locks out from other users right now, too. */
2002 mode = einfo->ei_mode;
2003 if (einfo->ei_mode == LCK_PR)
2005 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2006 einfo->ei_type, policy, mode, &lockh, 0);
2008 struct ldlm_lock *matched;
2010 if (*flags & LDLM_FL_TEST_LOCK)
2013 matched = ldlm_handle2lock(&lockh);
2015 /* AGL enqueues DLM locks speculatively. Therefore if
2016 * it already exists a DLM lock, it wll just inform the
2017 * caller to cancel the AGL process for this stripe. */
2018 ldlm_lock_decref(&lockh, mode);
2019 LDLM_LOCK_PUT(matched);
2021 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2022 *flags |= LDLM_FL_LVB_READY;
2024 /* We already have a lock, and it's referenced. */
2025 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2027 ldlm_lock_decref(&lockh, mode);
2028 LDLM_LOCK_PUT(matched);
2031 ldlm_lock_decref(&lockh, mode);
2032 LDLM_LOCK_PUT(matched);
2037 if (*flags & LDLM_FL_TEST_LOCK)
 /* No cached lock matched: build a real enqueue request. */
2041 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2042 &RQF_LDLM_ENQUEUE_LVB);
2046 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2048 ptlrpc_request_free(req);
2052 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2054 ptlrpc_request_set_replen(req);
2057 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2058 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2060 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2061 sizeof(*lvb), LVB_T_OST, &lockh, async);
 /* Async path: stash everything osc_enqueue_interpret() will need. */
2064 struct osc_enqueue_args *aa;
2065 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2066 aa = ptlrpc_req_async_args(req);
2068 aa->oa_mode = einfo->ei_mode;
2069 aa->oa_type = einfo->ei_type;
2070 lustre_handle_copy(&aa->oa_lockh, &lockh);
2071 aa->oa_upcall = upcall;
2072 aa->oa_cookie = cookie;
2075 aa->oa_flags = flags;
2078 /* AGL is essentially to enqueue an DLM lock
2079 * in advance, so we don't care about the
2080 * result of AGL enqueue. */
2082 aa->oa_flags = NULL;
2085 req->rq_interpret_reply =
2086 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2087 if (rqset == PTLRPCD_SET)
2088 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2090 ptlrpc_set_add_req(rqset, req);
2091 } else if (intent) {
2092 ptlrpc_req_finished(req);
 /* Sync path: complete here. */
2097 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2100 ptlrpc_req_finished(req);
/*
 * Look for a cached DLM lock covering the given extent (no enqueue).
 *
 * Extents are rounded out to page boundaries before matching; a PW lock
 * may satisfy a PR request, in which case the reference is converted
 * (addref PR, decref PW) unless LDLM_FL_TEST_LOCK was passed.
 *
 * \param data   cbdata to attach to the matched lock (verified via
 *               osc_set_data_with_check())
 * \param unref  passed through to ldlm_lock_match()
 * \return matched lock mode (presumably; tail of function not visible)
 */
2105 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2106 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2107 __u64 *flags, void *data, struct lustre_handle *lockh,
2110 struct obd_device *obd = exp->exp_obd;
2111 __u64 lflags = *flags;
 /* Fault-injection point for testing lock-match failure paths. */
2115 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2118 /* Filesystem lock extents are extended to page boundaries so that
2119 * dealing with the page cache is a little smoother */
2120 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2121 policy->l_extent.end |= ~PAGE_MASK;
2123 /* Next, search for already existing extent locks that will cover us */
2124 /* If we're trying to read, we also search for an existing PW lock. The
2125 * VFS and page cache already protect us locally, so lots of readers/
2126 * writers can share a single PW lock. */
2130 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2131 res_id, type, policy, rc, lockh, unref);
 /* Attach caller data; drop the reference if that fails. */
2134 if (!osc_set_data_with_check(lockh, data)) {
2135 if (!(lflags & LDLM_FL_TEST_LOCK))
2136 ldlm_lock_decref(lockh, rc);
 /* Matched a PW lock for a PR request: swap the reference mode. */
2140 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2141 ldlm_lock_addref(lockh, LCK_PR);
2142 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Reply-interpret callback for async OST statfs requests
 * (set by osc_statfs_async()).  Copies the server's obd_statfs into the
 * caller's obd_info buffer and invokes the oi_cb_up callback.  For
 * NODELAY requests, -ENOTCONN/-EAGAIN are still surfaced to the upcall.
 */
2149 static int osc_statfs_interpret(const struct lu_env *env,
2150 struct ptlrpc_request *req,
2151 struct osc_async_args *aa, int rc)
2153 struct obd_statfs *msfs;
2157 /* The request has in fact never been sent
2158 * due to issues at a higher level (LOV).
2159 * Exit immediately since the caller is
2160 * aware of the problem and takes care
2161 * of the clean up */
2164 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2165 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2171 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2173 GOTO(out, rc = -EPROTO);
 /* Struct copy of the server's statfs into the caller's buffer. */
2176 *aa->aa_oi->oi_osfs = *msfs;
2178 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS request; the result is delivered to
 * \a oinfo via osc_statfs_interpret().
 *
 * \param max_age  freshness hint; currently not sent on the wire (see
 *                 the comment below)
 * \param rqset    request set the statfs request is added to
 */
2182 static int osc_statfs_async(struct obd_export *exp,
2183 struct obd_info *oinfo, __u64 max_age,
2184 struct ptlrpc_request_set *rqset)
2186 struct obd_device *obd = class_exp2obd(exp);
2187 struct ptlrpc_request *req;
2188 struct osc_async_args *aa;
2192 /* We could possibly pass max_age in the request (as an absolute
2193 * timestamp or a "seconds.usec ago") so the target can avoid doing
2194 * extra calls into the filesystem if that isn't necessary (e.g.
2195 * during mount that would help a bit). Having relative timestamps
2196 * is not so great if request processing is slow, while absolute
2197 * timestamps are not ideal because they need time synchronization. */
2198 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2202 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2204 ptlrpc_request_free(req);
2207 ptlrpc_request_set_replen(req);
2208 req->rq_request_portal = OST_CREATE_PORTAL;
2209 ptlrpc_at_set_req_timeout(req);
2211 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2212 /* procfs requests not want stat in wait for avoid deadlock */
2213 req->rq_no_resend = 1;
2214 req->rq_no_delay = 1;
2217 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2218 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2219 aa = ptlrpc_req_async_args(req);
2222 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: sends the request with ptlrpc_queue_wait()
 * and copies the reply into \a osfs.
 *
 * The import pointer is sampled under cl_sem (with its own reference)
 * because this can also be called from lprocfs and must not race with
 * client_disconnect_export (bug 15684).
 */
2226 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2227 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2229 struct obd_device *obd = class_exp2obd(exp);
2230 struct obd_statfs *msfs;
2231 struct ptlrpc_request *req;
2232 struct obd_import *imp = NULL;
2236 /*Since the request might also come from lprocfs, so we need
2237 *sync this with client_disconnect_export Bug15684*/
2238 down_read(&obd->u.cli.cl_sem);
2239 if (obd->u.cli.cl_import)
2240 imp = class_import_get(obd->u.cli.cl_import);
2241 up_read(&obd->u.cli.cl_sem);
2245 /* We could possibly pass max_age in the request (as an absolute
2246 * timestamp or a "seconds.usec ago") so the target can avoid doing
2247 * extra calls into the filesystem if that isn't necessary (e.g.
2248 * during mount that would help a bit). Having relative timestamps
2249 * is not so great if request processing is slow, while absolute
2250 * timestamps are not ideal because they need time synchronization. */
2251 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
 /* The import reference is only needed across request allocation. */
2253 class_import_put(imp);
2258 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2260 ptlrpc_request_free(req);
2263 ptlrpc_request_set_replen(req);
2264 req->rq_request_portal = OST_CREATE_PORTAL;
2265 ptlrpc_at_set_req_timeout(req);
2267 if (flags & OBD_STATFS_NODELAY) {
2268 /* procfs requests not want stat in wait for avoid deadlock */
2269 req->rq_no_resend = 1;
2270 req->rq_no_delay = 1;
2273 rc = ptlrpc_queue_wait(req);
2277 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2279 GOTO(out, rc = -EPROTO);
2286 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call
 * and routes the supported commands (client recovery, import
 * activation, target ping) to ptlrpc; anything else is -ENOTTY.
 */
2290 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2291 void *karg, void *uarg)
2293 struct obd_device *obd = exp->exp_obd;
2294 struct obd_ioctl_data *data = karg;
 /* Prevent module unload while an ioctl is in flight. */
2298 if (!try_module_get(THIS_MODULE)) {
2299 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2300 module_name(THIS_MODULE));
2304 case OBD_IOC_CLIENT_RECOVER:
2305 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2306 data->ioc_inlbuf1, 0);
2310 case IOC_OSC_SET_ACTIVE:
2311 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2314 case OBD_IOC_PING_TARGET:
2315 err = ptlrpc_obd_ping(obd);
2318 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2319 cmd, current_comm());
2320 GOTO(out, err = -ENOTTY);
2323 module_put(THIS_MODULE);
/*
 * Set a key/value on this OSC, either locally (checksum toggle, sptlrpc
 * config, flush context, LRU cache setup/shrink) or by forwarding an
 * OST_SET_INFO request to the server.  KEY_GRANT_SHRINK requests go via
 * ptlrpcd with an interpret callback; everything else is added to the
 * caller's request set.
 *
 * \param key/keylen, val/vallen  the key and value to set
 * \param set  request set for server-bound keys (required except for
 *             KEY_GRANT_SHRINK)
 */
2327 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2328 u32 keylen, void *key,
2329 u32 vallen, void *val,
2330 struct ptlrpc_request_set *set)
2332 struct ptlrpc_request *req;
2333 struct obd_device *obd = exp->exp_obd;
2334 struct obd_import *imp = class_exp2cliimp(exp);
2339 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
 /* Locally-handled keys first; these do not generate an RPC. */
2341 if (KEY_IS(KEY_CHECKSUM)) {
2342 if (vallen != sizeof(int))
2344 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2348 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2349 sptlrpc_conf_client_adapt(obd);
2353 if (KEY_IS(KEY_FLUSH_CTX)) {
2354 sptlrpc_import_flush_my_ctx(imp);
2358 if (KEY_IS(KEY_CACHE_SET)) {
2359 struct client_obd *cli = &obd->u.cli;
 /* Bind this OSC to the shared client page cache (once only) and
  * register it on the cache's LRU list. */
2361 LASSERT(cli->cl_cache == NULL); /* only once */
2362 cli->cl_cache = (struct cl_client_cache *)val;
2363 cl_cache_incref(cli->cl_cache);
2364 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2366 /* add this osc into entity list */
2367 LASSERT(list_empty(&cli->cl_lru_osc));
2368 spin_lock(&cli->cl_cache->ccc_lru_lock);
2369 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2370 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2375 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2376 struct client_obd *cli = &obd->u.cli;
 /* Shrink at most half of this OSC's LRU pages, capped by target. */
2377 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2378 long target = *(long *)val;
2380 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2385 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2388 /* We pass all other commands directly to OST. Since nobody calls osc
2389 methods directly and everybody is supposed to go through LOV, we
2390 assume lov checked invalid values for us.
2391 The only recognised values so far are evict_by_nid and mds_conn.
2392 Even if something bad goes through, we'd get a -EINVAL from OST
2395 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2396 &RQF_OST_SET_GRANT_INFO :
2401 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2402 RCL_CLIENT, keylen);
2403 if (!KEY_IS(KEY_GRANT_SHRINK))
2404 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2405 RCL_CLIENT, vallen);
2406 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2408 ptlrpc_request_free(req);
 /* Copy key and value into the request buffers. */
2412 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2413 memcpy(tmp, key, keylen);
2414 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2417 memcpy(tmp, val, vallen);
2419 if (KEY_IS(KEY_GRANT_SHRINK)) {
2420 struct osc_grant_args *aa;
 /* Grant shrink carries an obdo and completes via
  * osc_shrink_grant_interpret on a ptlrpcd thread. */
2423 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2424 aa = ptlrpc_req_async_args(req);
2427 ptlrpc_req_finished(req);
2430 *oa = ((struct ost_body *)val)->oa;
2432 req->rq_interpret_reply = osc_shrink_grant_interpret;
2435 ptlrpc_request_set_replen(req);
2436 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2437 LASSERT(set != NULL);
2438 ptlrpc_set_add_req(set, req);
2439 ptlrpc_check_set(NULL, set);
2441 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/**
 * Re-establish this client's state when its export reconnects.
 *
 * If the server supports grants (OBD_CONNECT_GRANT), report back the
 * grant this client believes it holds: available grant plus grant
 * consumed by dirty pages, or twice the BRW size when that sum is zero.
 * The lost-grant counter is consumed (reset to 0) under
 * cl_loi_list_lock and only logged.
 */
static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		spin_lock(&cli->cl_loi_list_lock);
		/* ?: — fall back to 2 * brw size if we hold no grant at all */
		data->ocd_grant = (cli->cl_avail_grant +
				   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
				  2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
/**
 * Disconnect the OSC export from the OST.
 *
 * Shrink-list removal deliberately happens only after the generic
 * disconnect, and only once cl_import is gone — see the race
 * description below (BUG18662).
 */
static int osc_disconnect(struct obd_export *exp)
	struct obd_device *obd = class_exp2obd(exp);

	rc = client_disconnect_export(exp);
	/*
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *      connect p1                 disconnect p2
	 *   ptlrpc_connect_import
	 *     ...............             class_manual_cleanup
	 *   ptlrpc_connect_interrupt
	 *     add this client to shrink list
	 *                                 Bang! pinger trigger the shrink.
	 * So the osc should be disconnected from the shrink list, after we
	 * are sure the import has been destroyed. BUG18662
	 */
	if (obd->u.cli.cl_import == NULL)
		osc_del_shrink_grant(&obd->u.cli);
/**
 * React to state transitions of this OSC's import.
 *
 * Most events are simply relayed to the observer via
 * obd_notify_observer(); DISCON drops cached grant state, INVALIDATE
 * fails outstanding pages and purges the local lock namespace, and OCD
 * (re)initializes grant accounting and the request portal from the
 * negotiated connect data.
 */
static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
	struct client_obd *cli;

	LASSERT(imp->imp_obd == obd);

	case IMP_EVENT_DISCON: {
		/* grant is renegotiated on reconnect; forget what we held */
		spin_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;

		env = cl_env_get(&refcheck);
		/* all pages go to failing rpcs due to the invalid import */
		osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

		/* drop local locks only; server state is already gone */
		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
		cl_env_put(env, &refcheck);
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		CERROR("Unknown import event %d\n", event);
2576 * Determine whether the lock can be canceled before replaying the lock
2577 * during recovery, see bug16774 for detailed information.
2579 * \retval zero the lock can't be canceled
2580 * \retval other ok to cancel
2582 static int osc_cancel_weight(struct ldlm_lock *lock)
2585 * Cancel all unused and granted extent lock.
2587 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2588 lock->l_granted_mode == lock->l_req_mode &&
2589 osc_ldlm_weigh_ast(lock) == 0)
2595 static int brw_queue_work(const struct lu_env *env, void *data)
2597 struct client_obd *cli = data;
2599 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2601 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/**
 * Set up an OSC obd_device from a configuration record.
 *
 * Takes a ptlrpcd reference, runs generic client setup, allocates the
 * writeback and LRU ptlrpcd work items, initializes quota state and the
 * grant-shrink interval, registers lprocfs entries (sharing the OSP
 * proc directory when both layers live on one node), pre-allocates a
 * small request pool and registers the lock-cancel weigh callback.
 * Error paths unwind the work items and generic client state.
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
	struct client_obd *cli = &obd->u.cli;
	struct obd_type *type;

	rc = ptlrpcd_addref();

	rc = client_obd_setup(obd, lcfg);
		GOTO(out_ptlrpcd, rc);

	/* background writeback work item (see brw_queue_work) */
	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_client_setup, rc = PTR_ERR(handler));
	cli->cl_writeback_work = handler;

	/* work item trimming the client LRU page cache */
	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
	cli->cl_lru_work = handler;

	rc = osc_quota_setup(obd);
		GOTO(out_ptlrpcd_work, rc);

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
	obd->obd_vars = lprocfs_osc_obd_vars;
	/* If this is true then both client (osc) and server (osp) are on the
	 * same node. The osp layer if loaded first will register the osc proc
	 * directory. In that case this obd_device will be attached its proc
	 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
	type = class_search_type(LUSTRE_OSP_NAME);
	if (type && type->typ_procsym) {
		obd->obd_proc_entry = lprocfs_register(obd->obd_name,
						       obd->obd_vars, obd);
		if (IS_ERR(obd->obd_proc_entry)) {
			rc = PTR_ERR(obd->obd_proc_entry);
			CERROR("error %d setting up lprocfs for %s\n", rc,
			obd->obd_proc_entry = NULL;
		rc = lprocfs_obd_setup(obd);

	/* If the basic OSC proc tree construction succeeded then
	 * lets do the rest. */
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);

	/* We need to allocate a few requests more, because
	 * brw_interpret tries to create new requests before freeing
	 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
	 * reserved, but I'm afraid that might be too much wasted RAM
	 * in fact, so 2 is just my guess and still should work. */
	cli->cl_import->imp_rq_pool =
		ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
				    ptlrpc_add_rqs_to_pool);

	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	/* allow early cancel of granted, unused extent locks (bug16774) */
	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

	/* error unwind: destroy any work items we managed to allocate */
	if (cli->cl_writeback_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	if (cli->cl_lru_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;

	client_obd_cleanup(obd);
/**
 * Staged pre-cleanup of an OSC device, called before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops the pinger from
 * touching it; OBD_CLEANUP_EXPORTS tears down the ptlrpcd work items,
 * the client import and the lprocfs entries.
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
	case OBD_CLEANUP_EARLY: {
		struct obd_import *imp;
		imp = obd->u.cli.cl_import;
		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
		ptlrpc_deactivate_import(imp);
		spin_lock(&imp->imp_lock);
		/* keep the pinger away from a dying import */
		imp->imp_pingable = 0;
		spin_unlock(&imp->imp_lock);
	case OBD_CLEANUP_EXPORTS: {
		struct client_obd *cli = &obd->u.cli;
		/*
		 * for echo client, export may be on zombie list, wait for
		 * zombie thread to cull it, because cli.cl_import will be
		 * cleared in client_disconnect_export():
		 *   class_export_destroy() -> obd_cleanup() ->
		 *   echo_device_free() -> echo_client_cleanup() ->
		 *   obd_disconnect() -> osc_disconnect() ->
		 *   client_disconnect_export()
		 */
		obd_zombie_barrier();
		if (cli->cl_writeback_work) {
			ptlrpcd_destroy_work(cli->cl_writeback_work);
			cli->cl_writeback_work = NULL;
		if (cli->cl_lru_work) {
			ptlrpcd_destroy_work(cli->cl_lru_work);
			cli->cl_lru_work = NULL;
		obd_cleanup_client_import(obd);
		ptlrpc_lprocfs_unregister_obd(obd);
		lprocfs_obd_cleanup(obd);
/**
 * Final teardown of an OSC device: detach it from the shared client
 * cache, release quota state and run generic client cleanup.
 */
int osc_cleanup(struct obd_device *obd)
	struct client_obd *cli = &obd->u.cli;

	/* unhook this OSC from the shared cl_client_cache LRU */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		cl_cache_decref(cli->cl_cache);
		cli->cl_cache = NULL;

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);
2770 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2772 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2773 return rc > 0 ? 0: rc;
2776 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2778 return osc_process_config_base(obd, buf);
/*
 * OSC method table exported to the OBD layer.  Generic client_* helpers
 * are used for connection management; everything else is OSC-specific.
 */
static struct obd_ops osc_obd_ops = {
	.o_owner = THIS_MODULE,
	.o_setup = osc_setup,
	.o_precleanup = osc_precleanup,
	.o_cleanup = osc_cleanup,
	.o_add_conn = client_import_add_conn,
	.o_del_conn = client_import_del_conn,
	.o_connect = client_connect_import,
	.o_reconnect = osc_reconnect,
	.o_disconnect = osc_disconnect,
	.o_statfs = osc_statfs,
	.o_statfs_async = osc_statfs_async,
	.o_create = osc_create,
	.o_destroy = osc_destroy,
	.o_getattr = osc_getattr,
	.o_setattr = osc_setattr,
	.o_iocontrol = osc_iocontrol,
	.o_set_info_async = osc_set_info_async,
	.o_import_event = osc_import_event,
	.o_process_config = osc_process_config,
	.o_quotactl = osc_quotactl,
/**
 * Module init: set up the OSC slab caches and register the obd type.
 * Proc registration is skipped when the OSP module already owns the
 * shared proc directory (typ_procsym set).
 */
static int __init osc_init(void)
	bool enable_proc = true;
	struct obd_type *type;

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);

	/* osp (server-side peer of osc) may already own the proc dir */
	type = class_search_type(LUSTRE_OSP_NAME);
	if (type != NULL && type->typ_procsym != NULL)
		enable_proc = false;

	rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
		/* registration failed: undo the slab-cache setup */
		lu_kmem_fini(osc_caches);
2834 static void /*__exit*/ osc_exit(void)
2836 class_unregister_type(LUSTRE_OSC_NAME);
2837 lu_kmem_fini(osc_caches);
/* Standard kernel module metadata and entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);