4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
/* Per-RPC context for asynchronous bulk (BRW) requests, stashed in
 * ptlrpc_request::rq_async_args and consumed by brw_interpret().
 * NOTE(review): this excerpt elides several fields of the struct
 * (e.g. the obdo pointer aa_oa used below) and its closing brace. */
55 struct osc_brw_async_args {
61 struct brw_page **aa_ppga;
62 struct client_obd *aa_cli;
63 struct list_head aa_oaps;
64 struct list_head aa_exts;
65 struct obd_capa *aa_ocapa;
66 struct cl_req *aa_clerq;
/* Grant-shrink RPCs reuse the BRW async-args layout (see
 * osc_shrink_grant_interpret(), which casts to osc_grant_args). */
69 #define osc_grant_args osc_brw_async_args
/* Completion context for asynchronous setattr/punch RPCs; the upcall is
 * invoked from osc_setattr_interpret().
 * NOTE(review): excerpt — sa_oa/sa_cookie fields and the closing brace
 * are elided from this view (both are referenced by the interpreters). */
71 struct osc_setattr_args {
73 obd_enqueue_update_f sa_upcall;
/* Completion context for OST_SYNC RPCs, consumed by osc_sync_interpret().
 * NOTE(review): excerpt — fa_cookie and the closing brace are elided. */
77 struct osc_fsync_args {
78 struct obd_info *fa_oi;
79 obd_enqueue_update_f fa_upcall;
/* Context carried through an asynchronous DLM lock enqueue.
 * NOTE(review): excerpt — several fields (mode, flags, cookie, ...) and
 * the closing brace are elided from this view. */
83 struct osc_enqueue_args {
84 struct obd_export *oa_exp;
88 osc_enqueue_upcall_f oa_upcall;
90 struct ost_lvb *oa_lvb;
91 struct lustre_handle oa_lockh;
/* single-bit flag: this enqueue is an AGL (asynchronous glimpse lock) */
92 unsigned int oa_agl:1;
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Copy capability @capa into the request capsule's RMF_CAPA1 field and
 * flag its presence in the OST body (OBD_MD_FLOSSCAPA).
 * NOTE(review): excerpt — the NULL-capa early return and the actual
 * capa copy into @c are elided from this view. */
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100 struct ost_body *body, void *capa)
102 struct obd_capa *oc = (struct obd_capa *)capa;
103 struct lustre_capa *c;
108 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
111 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the client-side OST body of @req from @oinfo: convert the in-core
 * obdo to wire format and pack the capability, if any. */
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
117 struct ost_body *body;
119 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* NOTE(review): the source-obdo argument line is elided in this view */
122 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
124 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field of the request capsule before packing: zero
 * when no capability is supplied, otherwise the capsule default size.
 * NOTE(review): excerpt — the oc parameter and the non-NULL branch are
 * elided from this view. */
127 void osc_set_capa_size(struct ptlrpc_request *req,
128 const struct req_msg_field *field,
132 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
134 /* it is already calculated as sizeof struct obd_capa */
/* Interpret callback for an async OST_GETATTR reply: unpack the returned
 * obdo into aa->aa_oi->oi_oa, patch in a client-side block size, and run
 * the caller's oi_cb_up() upcall with the final rc.
 * NOTE(review): excerpt — the rc checks and branch structure around the
 * unpack-failure path (lines 141-162 of the original) are elided. */
138 int osc_getattr_interpret(const struct lu_env *env,
139 struct ptlrpc_request *req,
140 struct osc_async_args *aa, int rc)
142 struct ost_body *body;
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152 aa->aa_oi->oi_oa, &body->oa);
154 /* This should really be sent by the OST */
155 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* failure path: body could not be unpacked; invalidate the obdo */
158 CDEBUG(D_INFO, "can't unpack ost_body\n");
160 aa->aa_oi->oi_oa->o_valid = 0;
163 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Synchronous getattr: allocate and pack an OST_GETATTR request, queue it
 * and wait, then unpack the returned obdo into oinfo->oi_oa.
 * NOTE(review): excerpt — ENOMEM checks, rc checks after pack/queue, and
 * the out:/RETURN epilogue are elided from this view. */
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168 struct obd_info *oinfo)
170 struct ptlrpc_request *req;
171 struct ost_body *body;
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* size the capa field before packing the request buffers */
179 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
182 ptlrpc_request_free(req);
186 osc_pack_req_body(req, oinfo);
188 ptlrpc_request_set_replen(req);
190 rc = ptlrpc_queue_wait(req);
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
/* blksize is not sent by the OST; derive it from the import's BRW size */
202 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
207 ptlrpc_req_finished(req);
/* Synchronous setattr: pack an OST_SETATTR request from @oinfo, wait for
 * the reply, and copy the server's obdo back into oinfo->oi_oa.
 * NOTE(review): excerpt — ENOMEM/rc checks and the out:/RETURN epilogue
 * are elided from this view. */
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212 struct obd_info *oinfo, struct obd_trans_info *oti)
214 struct ptlrpc_request *req;
215 struct ost_body *body;
/* the group must always be set when talking to the OST */
219 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
221 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
225 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
228 ptlrpc_request_free(req);
232 osc_pack_req_body(req, oinfo);
234 ptlrpc_request_set_replen(req);
236 rc = ptlrpc_queue_wait(req);
240 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
242 GOTO(out, rc = -EPROTO);
244 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
249 ptlrpc_req_finished(req);
/* Interpret callback shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa and invoke the caller's sa_upcall with rc.
 * NOTE(review): excerpt — the rc guard before unpacking and the out:
 * label/RETURN are elided from this view. */
253 static int osc_setattr_interpret(const struct lu_env *env,
254 struct ptlrpc_request *req,
255 struct osc_setattr_args *sa, int rc)
257 struct ost_body *body;
263 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
265 GOTO(out, rc = -EPROTO);
267 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
270 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous setattr (MDS->OST path): pack an OST_SETATTR request and
 * either fire-and-forget it via ptlrpcd (no @rqset), hand it to the
 * ptlrpcd set (@rqset == PTLRPCD_SET), or add it to the caller's set.
 * @upcall/@cookie are delivered from osc_setattr_interpret() on reply.
 * NOTE(review): excerpt — the if/else branch structure around the two
 * ptlrpcd_add_req() calls is elided; they are alternative paths, not
 * duplicates. */
274 int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
275 obd_enqueue_update_f upcall, void *cookie,
276 struct ptlrpc_request_set *rqset)
278 struct ptlrpc_request *req;
279 struct osc_setattr_args *sa;
283 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
287 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
288 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
290 ptlrpc_request_free(req);
294 osc_pack_req_body(req, oinfo);
296 ptlrpc_request_set_replen(req);
298 /* do mds to ost setattr asynchronously */
300 /* Do not wait for response. */
301 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
303 req->rq_interpret_reply =
304 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async-args area must be big enough to hold our context */
306 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
307 sa = ptlrpc_req_async_args(req);
308 sa->sa_oa = oinfo->oi_oa;
309 sa->sa_upcall = upcall;
310 sa->sa_cookie = cookie;
312 if (rqset == PTLRPCD_SET)
313 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
315 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (echo-client path only, per the fid_seq
 * assertion): pack OST_CREATE, wait, and copy the resulting obdo back.
 * Also propagates the llog cookie into @oti when the server set one.
 * NOTE(review): excerpt — rc/NULL checks, the out_req:/out: labels and
 * RETURN are elided from this view. */
321 static int osc_create(const struct lu_env *env, struct obd_export *exp,
322 struct obdo *oa, struct obd_trans_info *oti)
324 struct ptlrpc_request *req;
325 struct ost_body *body;
330 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
/* only echo objects are created through this path */
331 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
335 GOTO(out, rc = -ENOMEM);
337 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
339 ptlrpc_request_free(req);
343 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
346 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
348 ptlrpc_request_set_replen(req);
350 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
351 oa->o_flags == OBD_FL_DELORPHAN) {
353 "delorphan from OST integration");
354 /* Don't resend the delorphan req */
355 req->rq_no_resend = req->rq_no_delay = 1;
358 rc = ptlrpc_queue_wait(req);
362 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364 GOTO(out_req, rc = -EPROTO);
366 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
367 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is not sent by the OST; fill it in client-side */
369 oa->o_blksize = cli_brw_size(exp->exp_obd);
370 oa->o_valid |= OBD_MD_FLBLKSZ;
/* hand the unlink llog cookie back to the caller via oti */
373 if (oa->o_valid & OBD_MD_FLCOOKIE) {
374 if (oti->oti_logcookies == NULL)
375 oti->oti_logcookies = &oti->oti_onecookie;
377 *oti->oti_logcookies = oa->o_lcookie;
381 CDEBUG(D_HA, "transno: "LPD64"\n",
382 lustre_msg_get_transno(req->rq_repmsg));
384 ptlrpc_req_finished(req);
/* Asynchronous truncate/punch: pack an OST_PUNCH request (start/end are
 * carried in the obdo), register osc_setattr_interpret() for the reply,
 * and dispatch via ptlrpcd or the caller's request set.
 * NOTE(review): excerpt — ENOMEM/rc checks, the else branch before
 * ptlrpc_set_add_req(), and RETURN are elided from this view. */
389 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
390 obd_enqueue_update_f upcall, void *cookie,
391 struct ptlrpc_request_set *rqset)
393 struct ptlrpc_request *req;
394 struct osc_setattr_args *sa;
395 struct ost_body *body;
399 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
403 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
404 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
406 ptlrpc_request_free(req);
409 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
410 ptlrpc_at_set_req_timeout(req);
412 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
414 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
416 osc_pack_capa(req, body, oinfo->oi_capa);
418 ptlrpc_request_set_replen(req);
/* reuse the setattr interpreter: both unpack an obdo and run an upcall */
420 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
421 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
422 sa = ptlrpc_req_async_args(req);
423 sa->sa_oa = oinfo->oi_oa;
424 sa->sa_upcall = upcall;
425 sa->sa_cookie = cookie;
426 if (rqset == PTLRPCD_SET)
427 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
429 ptlrpc_set_add_req(rqset, req);
/* Interpret callback for OST_SYNC replies: copy the returned obdo into
 * the caller's obd_info and run the fa_upcall.
 * NOTE(review): excerpt — the rc guard and out:/RETURN are elided. */
434 static int osc_sync_interpret(const struct lu_env *env,
435 struct ptlrpc_request *req,
438 struct osc_fsync_args *fa = arg;
439 struct ost_body *body;
445 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447 CERROR ("can't unpack ost_body\n");
448 GOTO(out, rc = -EPROTO);
/* struct copy of the server's obdo back to the caller */
451 *fa->fa_oi->oi_oa = body->oa;
453 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC: the byte range to sync is smuggled through the
 * obdo's size/blocks fields; osc_sync_interpret() finishes the request.
 * NOTE(review): excerpt — ENOMEM/rc checks, the else branch before
 * ptlrpc_set_add_req(), and RETURN are elided from this view. */
457 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
458 obd_enqueue_update_f upcall, void *cookie,
459 struct ptlrpc_request_set *rqset)
461 struct ptlrpc_request *req;
462 struct ost_body *body;
463 struct osc_fsync_args *fa;
467 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
471 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
472 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
474 ptlrpc_request_free(req);
478 /* overload the size and blocks fields in the oa with start/end */
479 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
481 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
483 osc_pack_capa(req, body, oinfo->oi_capa);
485 ptlrpc_request_set_replen(req);
486 req->rq_interpret_reply = osc_sync_interpret;
488 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
489 fa = ptlrpc_req_async_args(req);
491 fa->fa_upcall = upcall;
492 fa->fa_cookie = cookie;
494 if (rqset == PTLRPCD_SET)
495 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
497 ptlrpc_set_add_req(rqset, req);
502 /* Find and cancel locally locks matched by @mode in the resource found by
503  * @objid. Found locks are added into @cancel list. Returns the amount of
504  * locks added to @cancels list. */
505 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
506 struct list_head *cancels,
507 ldlm_mode_t mode, __u64 lock_flags)
509 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
510 struct ldlm_res_id res_id;
511 struct ldlm_resource *res;
515 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
516  * export) but disabled through procfs (flag in NS).
518  * This distinguishes from a case when ELC is not supported originally,
519  * when we still want to cancel locks in advance and just cancel them
520  * locally, without sending any RPC. */
521 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* build the DLM resource name from the object id and look it up */
524 ostid_build_res_name(&oa->o_oi, &res_id);
525 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the local cancel scan */
529 LDLM_RESOURCE_ADDREF(res);
530 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
531 lock_flags, 0, NULL);
532 LDLM_RESOURCE_DELREF(res);
533 ldlm_resource_putref(res);
/* NOTE(review): excerpt — the RETURN(count) and early-return bodies are
 * elided from this view. */
/* Reply handler for OST_DESTROY: release our slot in the in-flight
 * destroy throttle and wake anyone waiting in osc_destroy(). */
537 static int osc_destroy_interpret(const struct lu_env *env,
538 struct ptlrpc_request *req, void *data,
541 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
543 atomic_dec(&cli->cl_destroy_in_flight);
544 wake_up(&cli->cl_destroy_waitq);
/* Throttle destroy RPCs to cl_max_rpcs_in_flight: optimistically claim a
 * slot; on overshoot, release it and (if another slot freed meanwhile)
 * re-wake the destroy waitqueue to avoid a lost wakeup.
 * NOTE(review): excerpt — the RETURN(1)/RETURN(0) results of each branch
 * are elided from this view. */
548 static int osc_can_send_destroy(struct client_obd *cli)
550 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
551 cli->cl_max_rpcs_in_flight) {
552 /* The destroy request can be sent */
555 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
556 cli->cl_max_rpcs_in_flight) {
558  * The counter has been modified between the two atomic
561 wake_up(&cli->cl_destroy_waitq);
566 /* Destroy requests can be async always on the client, and we don't even really
567  * care about the return code since the client cannot do anything at all about
569  * When the MDS is unlinking a filename, it saves the file objects into a
570  * recovery llog, and these object records are cancelled when the OST reports
571  * they were destroyed and sync'd to disk (i.e. transaction committed).
572  * If the client dies, or the OST is down when the object should be destroyed,
573  * the records are not cancelled, and when the OST reconnects to the MDS next,
574  * it will retrieve the llog unlink logs and then sends the log cancellation
575  * cookies to the MDS after committing destroy transactions. */
576 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
577 struct obdo *oa, struct obd_trans_info *oti)
579 struct client_obd *cli = &exp->exp_obd->u.cli;
580 struct ptlrpc_request *req;
581 struct ost_body *body;
582 struct list_head cancels = LIST_HEAD_INIT(cancels);
587 CDEBUG(D_INFO, "oa NULL\n");
/* ELC: cancel conflicting PW locks locally and piggyback the cancels */
591 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
592 LDLM_FL_DISCARD_DATA);
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: drop the locally collected cancel list */
596 ldlm_lock_list_put(&cancels, l_bl_ast, count);
600 osc_set_capa_size(req, &RMF_CAPA1, NULL);
601 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
604 ptlrpc_request_free(req);
608 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
609 ptlrpc_at_set_req_timeout(req);
611 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
612 oa->o_lcookie = *oti->oti_logcookies;
613 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
617 ptlrpc_request_set_replen(req);
619 /* If osc_destory is for destroying the unlink orphan,
620  * sent from MDT to OST, which should not be blocked here,
621  * because the process might be triggered by ptlrpcd, and
622  * it is not good to block ptlrpcd thread (b=16006)*/
623 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
624 req->rq_interpret_reply = osc_destroy_interpret;
625 if (!osc_can_send_destroy(cli)) {
626 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
630  * Wait until the number of on-going destroy RPCs drops
631  * under max_rpc_in_flight
633 l_wait_event_exclusive(cli->cl_destroy_waitq,
634 osc_can_send_destroy(cli), &lwi);
638 /* Do not wait for response */
639 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd state, under cl_loi_list_lock,
 * so every BRW RPC reports the client's cache state to the server.
 * The CERROR branches are sanity checks on the dirty-page counters.
 * NOTE(review): excerpt — the o_undirty assignments in the error
 * branches and parts of the undirty computation are elided. */
643 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
646 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have pre-set the fields we are about to fill */
648 LASSERT(!(oa->o_valid & bits));
651 spin_lock(&cli->cl_loi_list_lock);
652 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
653 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
654 cli->cl_dirty_max_pages)) {
655 CERROR("dirty %lu - %lu > dirty_max %lu\n",
656 cli->cl_dirty_pages, cli->cl_dirty_transit,
657 cli->cl_dirty_max_pages);
659 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
660 atomic_long_read(&obd_dirty_transit_pages) >
661 (obd_max_dirty_pages + 1))) {
662 /* The atomic_read() allowing the atomic_inc() are
663  * not covered by a lock thus they may safely race and trip
664  * this CERROR() unless we add in a small fudge factor (+1). */
665 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
666 cli->cl_import->imp_obd->obd_name,
667 atomic_long_read(&obd_dirty_pages),
668 atomic_long_read(&obd_dirty_transit_pages),
669 obd_max_dirty_pages);
671 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
673 CERROR("dirty %lu - dirty_max %lu too big???\n",
674 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* normal path: undirty is bounded by what our RPC pipeline can carry */
677 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
679 (cli->cl_max_rpcs_in_flight + 1);
680 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
683 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
684 oa->o_dropped = cli->cl_lost_grant;
685 cli->cl_lost_grant = 0;
686 spin_unlock(&cli->cl_loi_list_lock);
687 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
688 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt cl_grant_shrink_interval
 * seconds from now. */
692 void osc_update_next_shrink(struct client_obd *cli)
694 cli->cl_next_shrink_grant =
695 cfs_time_shift(cli->cl_grant_shrink_interval);
696 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
697 cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * loi list lock. */
700 static void __osc_update_grant(struct client_obd *cli, u64 grant)
702 spin_lock(&cli->cl_loi_list_lock);
703 cli->cl_avail_grant += grant;
704 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
707 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
709 if (body->oa.o_valid & OBD_MD_FLGRANT) {
710 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
711 __osc_update_grant(cli, body->oa.o_grant);
715 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
716 u32 keylen, void *key,
717 u32 vallen, void *val,
718 struct ptlrpc_request_set *set);
/* Reply handler for a grant-shrink set_info RPC: on failure give the
 * tentatively shrunk grant back; on success absorb whatever grant the
 * server's reply body carries.
 * NOTE(review): excerpt — the rc check selecting between the two paths
 * and the epilogue (freeing oa, RETURN) are elided from this view. */
720 static int osc_shrink_grant_interpret(const struct lu_env *env,
721 struct ptlrpc_request *req,
724 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
725 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
726 struct ost_body *body;
/* failure path: restore the grant we speculatively gave up */
729 __osc_update_grant(cli, oa->o_grant);
733 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
735 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggybacking it on an
 * outgoing BRW: move it from cl_avail_grant into oa->o_grant and mark
 * the obdo with OBD_FL_SHRINK_GRANT. */
741 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
743 spin_lock(&cli->cl_loi_list_lock);
744 oa->o_grant = cli->cl_avail_grant / 4;
745 cli->cl_avail_grant -= oa->o_grant;
746 spin_unlock(&cli->cl_loi_list_lock);
747 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
748 oa->o_valid |= OBD_MD_FLFLAGS;
751 oa->o_flags |= OBD_FL_SHRINK_GRANT;
752 osc_update_next_shrink(cli);
755 /* Shrink the current grant, either from some large amount to enough for a
756  * full set of in-flight RPCs, or if we have already shrunk to that limit
757  * then to enough for a single RPC. This avoids keeping more grant than
758  * needed, and avoids shrinking the grant piecemeal. */
759 static int osc_shrink_grant(struct client_obd *cli)
/* default target: enough grant for a full pipeline of max-size RPCs */
761 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
762 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
764 spin_lock(&cli->cl_loi_list_lock);
/* already at or below the pipeline target: drop to a single-RPC target */
765 if (cli->cl_avail_grant <= target_bytes)
766 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
767 spin_unlock(&cli->cl_loi_list_lock);
769 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink available grant down to @target_bytes by sending the surplus to
 * the server via a KEY_GRANT_SHRINK set_info RPC.  The grant is deducted
 * locally first; osc_shrink_grant_interpret() restores it if the RPC
 * fails.
 * NOTE(review): excerpt — the body allocation (OBD_ALLOC), its NULL
 * check, the early RETURN when no shrink is needed, and the free/RETURN
 * epilogue are elided from this view. */
772 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
775 struct ost_body *body;
778 spin_lock(&cli->cl_loi_list_lock);
779 /* Don't shrink if we are already above or below the desired limit
780  * We don't want to shrink below a single RPC, as that will negatively
781  * impact block allocation and long-term performance. */
782 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
783 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
785 if (target_bytes >= cli->cl_avail_grant) {
786 spin_unlock(&cli->cl_loi_list_lock);
789 spin_unlock(&cli->cl_loi_list_lock);
795 osc_announce_cached(cli, &body->oa, 0);
797 spin_lock(&cli->cl_loi_list_lock);
/* the surplus goes on the wire; locally we keep only the target */
798 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
799 cli->cl_avail_grant = target_bytes;
800 spin_unlock(&cli->cl_loi_list_lock);
801 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
802 body->oa.o_valid |= OBD_MD_FLFLAGS;
803 body->oa.o_flags = 0;
805 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
806 osc_update_next_shrink(cli);
808 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
809 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
810 sizeof(*body), body, NULL);
/* RPC submission failed: give the surplus back locally */
812 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: the server
 * must support OBD_CONNECT_GRANT_SHRINK, the shrink deadline must be
 * (nearly) due, the import FULL, and the available grant larger than one
 * RPC's worth.
 * NOTE(review): excerpt — the RETURN(0)/RETURN(1) results of the
 * branches are elided from this view. */
817 static int osc_should_shrink_grant(struct client_obd *client)
819 cfs_time_t time = cfs_time_current();
820 cfs_time_t next_shrink = client->cl_next_shrink_grant;
822 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
823 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow firing slightly (5 ticks) before the nominal deadline */
826 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
827 /* Get the current RPC size directly, instead of going via:
828  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
829  * Keep comment here so that it can be found by searching. */
830 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
832 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
833 client->cl_avail_grant > brw_size)
836 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on this timeout item's
 * list and shrink grant where osc_should_shrink_grant() says so. */
841 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
843 struct client_obd *client;
845 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
846 if (osc_should_shrink_grant(client))
847 osc_shrink_grant(client);
/* Register this client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb() runs every cl_grant_shrink_interval
 * seconds, and arm the first deadline.
 * NOTE(review): excerpt — the rc check and RETURN statements are elided
 * from this view. */
852 static int osc_add_shrink_grant(struct client_obd *client)
856 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
858 osc_grant_shrink_grant_cb, NULL,
859 &client->cl_grant_shrink_list);
861 CERROR("add grant client %s error %d\n",
862 client->cl_import->imp_obd->obd_name, rc);
865 CDEBUG(D_CACHE, "add grant client %s \n",
866 client->cl_import->imp_obd->obd_name);
867 osc_update_next_shrink(client);
/* Unregister this client from the grant-shrink timeout list.
 * NOTE(review): excerpt — the timeout-event argument line is elided. */
871 static int osc_del_shrink_grant(struct client_obd *client)
873 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect data at (re)connect:
 * ocd_grant is the total we may hold, from which outstanding dirty pages
 * are deducted unless we were evicted.  Also derives the extent chunk
 * size from the server block size and registers for periodic grant
 * shrinking when the server supports it. */
877 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
880  * ocd_grant is the total grant amount we're expect to hold: if we've
881  * been evicted, it's the new avail_grant amount, cl_dirty_pages will
882  * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
885  * race is tolerable here: if we're evicted, but imp_state already
886  * left EVICTED state, then cl_dirty_pages must be 0 already.
888 spin_lock(&cli->cl_loi_list_lock);
889 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
890 cli->cl_avail_grant = ocd->ocd_grant;
892 cli->cl_avail_grant = ocd->ocd_grant -
893 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
895 if (cli->cl_avail_grant < 0) {
896 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
897 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
898 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
899 /* workaround for servers which do not have the patch from
901 cli->cl_avail_grant = ocd->ocd_grant;
904 /* determine the appropriate chunk size used by osc_extent. */
905 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
906 spin_unlock(&cli->cl_loi_list_lock);
908 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
909 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
910 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
912 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
913 list_empty(&cli->cl_grant_shrink_list))
914 osc_add_shrink_grant(cli);
917 /* We assume that the reason this OSC got a short read is because it read
918  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
919  * via the LOV, and it _knows_ it's reading inside the file, it's just that
920  * this stripe never got written at or beyond this stripe offset yet. */
921 static void handle_short_read(int nob_read, size_t page_count,
922 struct brw_page **pga)
927 /* skip bytes read OK */
928 while (nob_read > 0) {
929 LASSERT (page_count > 0);
931 if (pga[i]->count > nob_read) {
932 /* EOF inside this page */
933 ptr = kmap(pga[i]->pg) +
934 (pga[i]->off & ~CFS_PAGE_MASK);
/* zero the unread tail of this partially-read page */
935 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
/* NOTE(review): excerpt — kunmap(), nob_read = 0 and the page_count/i
 * bookkeeping lines are elided from this view. */
942 nob_read -= pga[i]->count;
947 /* zero remaining pages */
948 while (page_count-- > 0) {
949 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
950 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return-code vector of a BRW_WRITE reply:
 * propagate the first negative rc, reject non-zero "successes", and
 * cross-check that the bulk actually transferred requested_nob bytes.
 * NOTE(review): excerpt — the -EPROTO returns in the error branches and
 * the final return are elided from this view. */
956 static int check_write_rcs(struct ptlrpc_request *req,
957 int requested_nob, int niocount,
958 size_t page_count, struct brw_page **pga)
963 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
964 sizeof(*remote_rcs) *
966 if (remote_rcs == NULL) {
967 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
971 /* return error if any niobuf was in error */
972 for (i = 0; i < niocount; i++) {
973 if ((int)remote_rcs[i] < 0)
974 return(remote_rcs[i]);
976 if (remote_rcs[i] != 0) {
977 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
978 i, remote_rcs[i], req);
983 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
984 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
985 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf when they are file-contiguous
 * (p1 ends exactly where p2 starts).  Differences only in the known
 * harmless flags (grant/cache/sync/quota bits) are tolerated; unknown
 * flag mismatches are warned about.
 * NOTE(review): excerpt — the "return 0" for a real flag mismatch and
 * closing braces are elided from this view. */
992 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
994 if (p1->flag != p2->flag) {
995 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
996 OBD_BRW_SYNC | OBD_BRW_ASYNC |
997 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
999 /* warn if we try to combine flags that we don't know to be
1000  * safe to combine */
1001 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1002 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1003 "report this at https://jira.hpdd.intel.com/\n",
1004 p1->flag, p2->flag);
1009 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of the page array
 * using the libcfs crypto hash selected by @cksum_type.  Contains two
 * fault-injection hooks: corrupting received READ data and returning a
 * deliberately wrong checksum on WRITE sends.
 * NOTE(review): excerpt — the kunmap() after the corruption hook, the i++
 * advance, the cksum mangling under OBD_FAIL_OSC_CHECKSUM_SEND, and the
 * final return of cksum are elided from this view. */
1012 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1013 struct brw_page **pga, int opc,
1014 cksum_type_t cksum_type)
1018 struct cfs_crypto_hash_desc *hdesc;
1019 unsigned int bufsize;
1021 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1023 LASSERT(pg_count > 0);
1025 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1026 if (IS_ERR(hdesc)) {
1027 CERROR("Unable to initialize checksum hash %s\n",
1028 cfs_crypto_hash_name(cfs_alg));
1029 return PTR_ERR(hdesc);
1032 while (nob > 0 && pg_count > 0) {
1033 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1035 /* corrupt the data before we compute the checksum, to
1036  * simulate an OST->client data error */
1037 if (i == 0 && opc == OST_READ &&
1038 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1039 unsigned char *ptr = kmap(pga[i]->pg);
1040 int off = pga[i]->off & ~CFS_PAGE_MASK;
1042 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1045 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1046 pga[i]->off & ~CFS_PAGE_MASK,
1048 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1049 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1051 nob -= pga[i]->count;
1056 bufsize = sizeof(cksum);
1057 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1059 /* For sending we only compute the wrong checksum instead
1060  * of corrupting the data so it is still correct on a redo */
1061 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete bulk read/write RPC: allocate the request (from the
 * pre-allocated pool for writes), size and pack the capsule, set up the
 * bulk descriptor and per-page niobufs (merging contiguous pages),
 * announce cached/dirty state, compute checksums when enabled, and stash
 * the async-args context.  On success *reqp owns the request.
 * NOTE(review): excerpt — many lines are elided throughout (ENOMEM
 * checks, the OST_READ/opc assignments, pg_prev updates, aa_oa/aa_pga
 * assignments, the final *reqp store and out: label). */
1067 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1068 struct lov_stripe_md *lsm, u32 page_count,
1069 struct brw_page **pga,
1070 struct ptlrpc_request **reqp,
1071 struct obd_capa *ocapa, int reserve,
1074 struct ptlrpc_request *req;
1075 struct ptlrpc_bulk_desc *desc;
1076 struct ost_body *body;
1077 struct obd_ioobj *ioobj;
1078 struct niobuf_remote *niobuf;
1079 int niocount, i, requested_nob, opc, rc;
1080 struct osc_brw_async_args *aa;
1081 struct req_capsule *pill;
1082 struct brw_page *pg_prev;
1085 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1086 RETURN(-ENOMEM); /* Recoverable */
1087 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1088 RETURN(-EINVAL); /* Fatal */
1090 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the import's request pool to guarantee forward
 * progress under memory pressure */
1092 req = ptlrpc_request_alloc_pool(cli->cl_import,
1093 cli->cl_import->imp_rq_pool,
1094 &RQF_OST_BRW_WRITE);
1097 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count niobufs: adjacent mergeable pages share one niobuf */
1102 for (niocount = i = 1; i < page_count; i++) {
1103 if (!can_merge_pages(pga[i - 1], pga[i]))
1107 pill = &req->rq_pill;
1108 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1110 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1111 niocount * sizeof(*niobuf));
1112 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1114 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1116 ptlrpc_request_free(req);
1119 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1120 ptlrpc_at_set_req_timeout(req);
1121 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1123 req->rq_no_retry_einprogress = 1;
1125 desc = ptlrpc_prep_bulk_imp(req, page_count,
1126 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1127 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1131 GOTO(out, rc = -ENOMEM);
1132 /* NB request now owns desc and will free it when it gets freed */
1134 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1135 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1136 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1137 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1139 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1141 obdo_to_ioobj(oa, ioobj);
1142 ioobj->ioo_bufcnt = niocount;
1143 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1144  * that might be send for this request. The actual number is decided
1145  * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1146  * "max - 1" for old client compatibility sending "0", and also so the
1147  * the actual maximum is a power-of-two number, not one less. LU-1431 */
1148 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1149 osc_pack_capa(req, body, ocapa);
1150 LASSERT(page_count > 0);
1152 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1153 struct brw_page *pg = pga[i];
1154 int poff = pg->off & ~CFS_PAGE_MASK;
1156 LASSERT(pg->count > 0);
1157 /* make sure there is no gap in the middle of page array */
1158 LASSERTF(page_count == 1 ||
1159 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1160 ergo(i > 0 && i < page_count - 1,
1161 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1162 ergo(i == page_count - 1, poff == 0)),
1163 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1164 i, page_count, pg, pg->off, pg->count);
1165 LASSERTF(i == 0 || pg->off > pg_prev->off,
1166 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1167 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1169 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1170 pg_prev->pg, page_private(pg_prev->pg),
1171 pg_prev->pg->index, pg_prev->off);
/* server-lock flag must be uniform across the whole RPC */
1172 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1173 (pg->flag & OBD_BRW_SRVLOCK));
1175 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1176 requested_nob += pg->count;
1178 if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* extend the previous niobuf instead of starting a new one */
1180 niobuf->rnb_len += pg->count;
1182 niobuf->rnb_offset = pg->off;
1183 niobuf->rnb_len = pg->count;
1184 niobuf->rnb_flags = pg->flag;
/* sanity: we must have filled exactly niocount niobufs */
1189 LASSERTF((void *)(niobuf - niocount) ==
1190 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1191 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1192 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1194 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1196 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1197 body->oa.o_valid |= OBD_MD_FLFLAGS;
1198 body->oa.o_flags = 0;
1200 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1203 if (osc_should_shrink_grant(cli))
1204 osc_shrink_grant_local(cli, &body->oa);
1206 /* size[REQ_REC_OFF] still sizeof (*body) */
1207 if (opc == OST_WRITE) {
1208 if (cli->cl_checksum &&
1209 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1210 /* store cl_cksum_type in a local variable since
1211  * it can be changed via lprocfs */
1212 cksum_type_t cksum_type = cli->cl_cksum_type;
1214 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1215 oa->o_flags &= OBD_FL_LOCAL_MASK;
1216 body->oa.o_flags = 0;
1218 body->oa.o_flags |= cksum_type_pack(cksum_type);
1219 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1220 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1224 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1226 /* save this in 'oa', too, for later checking */
1227 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1228 oa->o_flags |= cksum_type_pack(cksum_type);
1230 /* clear out the checksum flag, in case this is a
1231  * resend but cl_checksum is no longer set. b=11238 */
1232 oa->o_valid &= ~OBD_MD_FLCKSUM;
1234 oa->o_cksum = body->oa.o_cksum;
1235 /* 1 RC per niobuf */
1236 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1237 sizeof(__u32) * niocount);
/* READ path: ask the server to checksum the bulk it sends back */
1239 if (cli->cl_checksum &&
1240 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1241 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1242 body->oa.o_flags = 0;
1243 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1244 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1247 ptlrpc_request_set_replen(req);
1249 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1250 aa = ptlrpc_req_async_args(req);
1252 aa->aa_requested_nob = requested_nob;
1253 aa->aa_nio_count = niocount;
1254 aa->aa_page_count = page_count;
1258 INIT_LIST_HEAD(&aa->aa_oaps);
1259 if (ocapa && reserve)
1260 aa->aa_ocapa = capa_get(ocapa);
1263 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1264 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1265 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1266 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1270 ptlrpc_req_finished(req);
/*
 * Diagnose a write-checksum mismatch reported by the OST.
 *
 * Called on an OST_WRITE reply when the checksum the server computed
 * differs from the one the client sent.  Recomputes the bulk checksum
 * locally over the same pages and compares it against both values to
 * classify where the data changed, then logs a console error.
 *
 * NOTE(review): several physical lines are elided in this view
 * (including the final return); return semantics not fully visible here.
 */
1274 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1275 __u32 client_cksum, __u32 server_cksum, int nob,
1276 size_t page_count, struct brw_page **pga,
1277 cksum_type_t client_cksum_type)
1281 cksum_type_t cksum_type;
/* Fast path: server confirmed the checksum we originally sent. */
1283 if (server_cksum == client_cksum) {
1284 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the checksum type the server actually replied with
 * (taken from oa->o_flags when OBD_MD_FLFLAGS is valid). */
1288 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1290 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch: wrong type (protocol issue), page changed on
 * the client after checksumming (mmap race, bug 11742), corrupted in
 * transit, or inconsistent with everything. */
1293 if (cksum_type != client_cksum_type)
1294 msg = "the server did not use the checksum type specified in "
1295 "the original request - likely a protocol problem";
1296 else if (new_cksum == server_cksum)
1297 msg = "changed on the client after we checksummed it - "
1298 "likely false positive due to mmap IO (bug 11742)";
1299 else if (new_cksum == client_cksum)
1300 msg = "changed in transit before arrival at OST";
1302 msg = "changed in transit AND doesn't match the original - "
1303 "likely false positive due to mmap IO (bug 11742)";
/* Console-level error with FID (when valid) and the byte extent that
 * was being written, followed by the raw checksum values. */
1305 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1306 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1307 msg, libcfs_nid2str(peer->nid),
1308 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1309 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1310 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1311 POSTID(&oa->o_oi), pga[0]->off,
1312 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1313 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1314 "client csum now %x\n", client_cksum, client_cksum_type,
1315 server_cksum, cksum_type, new_cksum);
1319 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish processing a completed BRW (bulk read/write) request.
 *
 * Unpacks the reply body, updates quota and grant state, verifies bulk
 * data (sptlrpc unwrap + checksum) for both the write and read paths,
 * and finally copies the wire obdo back into the caller's obdo.
 *
 * Returns bytes transferred (reads) or 0/negative errno; -EAGAIN is
 * used to request a resend after checksum/unwrap failures.
 *
 * NOTE(review): many physical lines (braces, GOTOs, returns) are elided
 * in this view; comments below describe only the visible statements.
 */
1320 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1322 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1323 const lnet_process_id_t *peer =
1324 &req->rq_import->imp_connection->c_peer;
1325 struct client_obd *cli = aa->aa_cli;
1326 struct ost_body *body;
1327 u32 client_cksum = 0;
/* -EDQUOT replies still carry a body we need to process (quota flags). */
1330 if (rc < 0 && rc != -EDQUOT) {
1331 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1335 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1336 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1338 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1342 /* set/clear over quota flag for a uid/gid */
1343 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1344 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1345 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1347 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1348 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1350 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1353 osc_update_grant(cli, body);
/* Remember the checksum we sent with the request, for write verify. */
1358 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1359 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1361 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* Writes must not return a positive byte count. */
1363 CERROR("Unexpected +ve rc %d\n", rc);
1366 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1368 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Compare server's write checksum against ours; mismatch is fatal
 * for this attempt (see check_write_checksum()). */
1371 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1372 check_write_checksum(&body->oa, peer, client_cksum,
1373 body->oa.o_cksum, aa->aa_requested_nob,
1374 aa->aa_page_count, aa->aa_ppga,
1375 cksum_type_unpack(aa->aa_oa->o_flags)))
1378 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1379 aa->aa_page_count, aa->aa_ppga);
1383 /* The rest of this function executes only for OST_READs */
1385 /* if unwrap_bulk failed, return -EAGAIN to retry */
1386 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1388 GOTO(out, rc = -EAGAIN);
/* Sanity-check the reply byte count against what we asked for and
 * what the bulk layer says actually moved. */
1390 if (rc > aa->aa_requested_nob) {
1391 CERROR("Unexpected rc %d (%d requested)\n", rc,
1392 aa->aa_requested_nob);
1396 if (rc != req->rq_bulk->bd_nob_transferred) {
1397 CERROR ("Unexpected rc %d (%d transferred)\n",
1398 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail pages beyond the returned bytes. */
1402 if (rc < aa->aa_requested_nob)
1403 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Verify the read checksum if the server provided one. */
1405 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1406 static int cksum_counter;
1407 u32 server_cksum = body->oa.o_cksum;
1410 cksum_type_t cksum_type;
1412 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1413 body->oa.o_flags : 0);
1414 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1415 aa->aa_ppga, OST_READ,
/* Note whether the bulk came through an LNET router, for the log. */
1418 if (peer->nid != req->rq_bulk->bd_sender) {
1420 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1423 if (server_cksum != client_cksum) {
1424 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1425 "%s%s%s inode "DFID" object "DOSTID
1426 " extent ["LPU64"-"LPU64"]\n",
1427 req->rq_import->imp_obd->obd_name,
1428 libcfs_nid2str(peer->nid),
1430 body->oa.o_valid & OBD_MD_FLFID ?
1431 body->oa.o_parent_seq : (__u64)0,
1432 body->oa.o_valid & OBD_MD_FLFID ?
1433 body->oa.o_parent_oid : 0,
1434 body->oa.o_valid & OBD_MD_FLFID ?
1435 body->oa.o_parent_ver : 0,
1436 POSTID(&body->oa.o_oi),
1437 aa->aa_ppga[0]->off,
1438 aa->aa_ppga[aa->aa_page_count-1]->off +
1439 aa->aa_ppga[aa->aa_page_count-1]->count -
1441 CERROR("client %x, server %x, cksum_type %x\n",
1442 client_cksum, server_cksum, cksum_type);
1444 aa->aa_oa->o_cksum = client_cksum;
1448 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not send one; log
 * with exponential backoff (power-of-two counter check). */
1451 } else if (unlikely(client_cksum)) {
1452 static int cksum_missed;
1455 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1456 CERROR("Checksum %u requested from %s but not sent\n",
1457 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the reply's attributes back into the caller's obdo. */
1463 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1464 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request after a recoverable error
 * (e.g. -EINPROGRESS or a bulk/checksum failure).
 *
 * A brand-new ptlrpc request is prepared from the async args of the
 * failed one; the page array (pga), oap and extent lists, and the capa
 * reference are transferred to the new request, which is then handed to
 * ptlrpcd.  The old request's interpret/commit callbacks are reused.
 *
 * NOTE(review): some lines are elided in this view (error paths,
 * aa_resends increment); only visible statements are annotated.
 */
1469 static int osc_brw_redo_request(struct ptlrpc_request *request,
1470 struct osc_brw_async_args *aa, int rc)
1472 struct ptlrpc_request *new_req;
1473 struct osc_brw_async_args *new_aa;
1474 struct osc_async_page *oap;
/* -EINPROGRESS resends are routine; anything else is worth D_ERROR. */
1477 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1478 "redo for recoverable error %d", rc);
1480 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1481 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1482 aa->aa_cli, aa->aa_oa,
1483 NULL /* lsm unused by osc currently */,
1484 aa->aa_page_count, aa->aa_ppga,
1485 &new_req, aa->aa_ocapa, 0, 1);
/* If any page was interrupted, abandon the resend entirely. */
1489 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1490 if (oap->oap_request != NULL) {
1491 LASSERTF(request == oap->oap_request,
1492 "request %p != oap_request %p\n",
1493 request, oap->oap_request);
1494 if (oap->oap_interrupted) {
1495 ptlrpc_req_finished(new_req);
1500 /* New request takes over pga and oaps from old request.
1501 * Note that copying a list_head doesn't work, need to move it... */
1503 new_req->rq_interpret_reply = request->rq_interpret_reply;
1504 new_req->rq_async_args = request->rq_async_args;
1505 new_req->rq_commit_cb = request->rq_commit_cb;
1506 /* cap resend delay to the current request timeout, this is similar to
1507 * what ptlrpc does (see after_reply()) */
1508 if (aa->aa_resends > new_req->rq_timeout)
1509 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1511 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
/* Pin the import generation so eviction is detected on completion. */
1512 new_req->rq_generation_set = 1;
1513 new_req->rq_import_generation = request->rq_import_generation;
1515 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1517 INIT_LIST_HEAD(&new_aa->aa_oaps);
1518 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1519 INIT_LIST_HEAD(&new_aa->aa_exts);
1520 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1521 new_aa->aa_resends = aa->aa_resends;
/* Re-point each oap's request reference at the new request. */
1523 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1524 if (oap->oap_request) {
1525 ptlrpc_req_finished(oap->oap_request);
1526 oap->oap_request = ptlrpc_request_addref(new_req);
/* Transfer capa ownership; old aa must not put it again. */
1530 new_aa->aa_ocapa = aa->aa_ocapa;
1531 aa->aa_ocapa = NULL;
1533 /* XXX: This code will run into problem if we're going to support
1534 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1535 * and wait for all of them to be finished. We should inherit request
1536 * set from old request. */
1537 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1539 DEBUG_REQ(D_INFO, new_req, "new request");
1544 * ugh, we want disk allocation on the target to happen in offset order. we'll
1545 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1546 * fine for our small page arrays and doesn't require allocation. its an
1547 * insertion sort that swaps elements that are strides apart, shrinking the
1548 * stride down until its '1' and the array is sorted.
/* Sort @array of @num brw_page pointers by ascending ->off, in place.
 * Shellsort with the 3h+1 (Knuth) gap sequence; no allocation, stable
 * enough for the small arrays used per RPC. */
1550 static void sort_brw_pages(struct brw_page **array, int num)
1553 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1557 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort at the current stride. */
1562 for (i = stride ; i < num ; i++) {
1565 while (j >= stride && array[j - stride]->off > tmp->off) {
1566 array[j] = array[j - stride];
1571 } while (stride > 1);
/* Free the brw_page pointer array allocated for a BRW RPC.
 * Frees only the array of @count pointers, not the pages themselves. */
1574 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1576 LASSERT(ppga != NULL);
1577 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs (set as rq_interpret_reply).
 *
 * Finishes the request via osc_brw_fini_request(), resends on
 * recoverable errors, propagates attributes (size/blocks/times/KMS)
 * from the reply into the cl_object, finishes all extents carried by
 * the request, updates in-flight RPC accounting and re-plugs the IO
 * queue.
 *
 * NOTE(review): several lines are elided in this view (RETURNs, some
 * closing braces, 'valid |=' lines for the time attributes); comments
 * cover only visible statements.
 */
1580 static int brw_interpret(const struct lu_env *env,
1581 struct ptlrpc_request *req, void *data, int rc)
1583 struct osc_brw_async_args *aa = data;
1584 struct osc_extent *ext;
1585 struct osc_extent *tmp;
1586 struct client_obd *cli = aa->aa_cli;
1589 rc = osc_brw_fini_request(req, rc);
1590 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1591 /* When server return -EINPROGRESS, client should always retry
1592 * regardless of the number of times the bulk was resent already. */
1593 if (osc_recoverable_error(rc)) {
/* Import generation changed => we were evicted; don't resend. */
1594 if (req->rq_import_generation !=
1595 req->rq_import->imp_generation) {
1596 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1597 ""DOSTID", rc = %d.\n",
1598 req->rq_import->imp_obd->obd_name,
1599 POSTID(&aa->aa_oa->o_oi), rc);
1600 } else if (rc == -EINPROGRESS ||
1601 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1602 rc = osc_brw_redo_request(req, aa, rc);
1604 CERROR("%s: too many resent retries for object: "
1605 ""LPU64":"LPU64", rc = %d.\n",
1606 req->rq_import->imp_obd->obd_name,
1607 POSTID(&aa->aa_oa->o_oi), rc);
1612 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Done with the capa reference taken at prep time. */
1617 capa_put(aa->aa_ocapa);
1618 aa->aa_ocapa = NULL;
/* Propagate reply attributes into the cl_object under attr lock. */
1622 struct obdo *oa = aa->aa_oa;
1623 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1624 unsigned long valid = 0;
1625 struct cl_object *obj;
1626 struct osc_async_page *last;
/* Use the last page to find the object and the IO's end offset. */
1628 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1629 obj = osc2cl(last->oap_obj);
1631 cl_object_attr_lock(obj);
1632 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1633 attr->cat_blocks = oa->o_blocks;
1634 valid |= CAT_BLOCKS;
1636 if (oa->o_valid & OBD_MD_FLMTIME) {
1637 attr->cat_mtime = oa->o_mtime;
1640 if (oa->o_valid & OBD_MD_FLATIME) {
1641 attr->cat_atime = oa->o_atime;
1644 if (oa->o_valid & OBD_MD_FLCTIME) {
1645 attr->cat_ctime = oa->o_ctime;
1649 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1650 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1651 loff_t last_off = last->oap_count + last->oap_obj_off +
1654 /* Change file size if this is an out of quota or
1655 * direct IO write and it extends the file size */
1656 if (loi->loi_lvb.lvb_size < last_off) {
1657 attr->cat_size = last_off;
1660 /* Extend KMS if it's not a lockless write */
1661 if (loi->loi_kms < last_off &&
1662 oap2osc_page(last)->ops_srvlock == 0) {
1663 attr->cat_kms = last_off;
1669 cl_object_attr_update(env, obj, attr, valid);
1670 cl_object_attr_unlock(obj);
1672 OBDO_FREE(aa->aa_oa);
/* Successful writes leave pages "unstable" until commit (brw_commit). */
1674 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1675 osc_inc_unstable_pages(req);
/* Finish every extent carried by this RPC; this also releases oaps. */
1677 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1678 list_del_init(&ext->oe_link);
1679 osc_extent_finish(env, ext, 1, rc);
1681 LASSERT(list_empty(&aa->aa_exts));
1682 LASSERT(list_empty(&aa->aa_oaps));
1684 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1685 req->rq_bulk->bd_nob_transferred);
1686 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1687 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1689 spin_lock(&cli->cl_loi_list_lock);
1690 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1691 * is called so we know whether to go to sync BRWs or wait for more
1692 * RPCs to complete */
1693 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1694 cli->cl_w_in_flight--;
1696 cli->cl_r_in_flight--;
1697 osc_wake_cache_waiters(cli);
1698 spin_unlock(&cli->cl_loi_list_lock);
/* Kick the IO scheduler: a slot just freed up. */
1700 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/* Transaction-commit callback for BRW write requests (rq_commit_cb).
 * Marks the request committed and, if the interpret path already
 * flagged its pages unstable, decrements the unstable-page count.
 * The rq_lock + rq_unstable handshake resolves the race with
 * osc_inc_unstable_pages() so the count is decremented exactly once. */
1704 static void brw_commit(struct ptlrpc_request *req)
1706 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1707 * this called via the rq_commit_cb, I need to ensure
1708 * osc_dec_unstable_pages is still called. Otherwise unstable
1709 * pages may be leaked. */
1710 spin_lock(&req->rq_lock);
1711 if (likely(req->rq_unstable)) {
/* Interpret side already accounted the pages: undo it now. */
1712 req->rq_unstable = 0;
1713 spin_unlock(&req->rq_lock);
1715 osc_dec_unstable_pages(req);
/* Otherwise just record commit; the other side will see it. */
1717 req->rq_committed = 1;
1718 spin_unlock(&req->rq_lock);
1723 * Build an RPC by the list of extent @ext_list. The caller must ensure
1724 * that the total pages in this list are NOT over max pages per RPC.
1725 * Extents in the list must be in OES_RPC state.
/* Collects all pages of the extents into a sorted brw_page array,
 * allocates the obdo and cl_req, prepares the ptlrpc request, wires up
 * interpret/commit callbacks and async args, updates in-flight stats
 * and hands the request to ptlrpcd.  On failure the extents are
 * finished with an error so pages are not leaked.
 *
 * NOTE(review): a number of lines (declarations of i/page_count/rc/
 * mem_tight/mpflag, NULL checks after allocations, RETURNs, some
 * closing braces) are elided in this view. */
1727 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1728 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1730 struct ptlrpc_request *req = NULL;
1731 struct osc_extent *ext;
1732 struct brw_page **pga = NULL;
1733 struct osc_brw_async_args *aa = NULL;
1734 struct obdo *oa = NULL;
1735 struct osc_async_page *oap;
1736 struct osc_async_page *tmp;
1737 struct cl_req *clerq = NULL;
1738 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1740 struct cl_req_attr *crattr = NULL;
1741 loff_t starting_offset = OBD_OBJECT_EOF;
1742 loff_t ending_offset = 0;
1746 bool soft_sync = false;
1749 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1750 struct ost_body *body;
1752 LASSERT(!list_empty(ext_list));
1754 /* add pages into rpc_list to build BRW rpc */
1755 list_for_each_entry(ext, ext_list, oe_link) {
1756 LASSERT(ext->oe_state == OES_RPC);
1757 mem_tight |= ext->oe_memalloc;
1758 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1760 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* Track the overall [start, end) byte range of this RPC. */
1761 if (starting_offset == OBD_OBJECT_EOF ||
1762 starting_offset > oap->oap_obj_off)
1763 starting_offset = oap->oap_obj_off;
1765 LASSERT(oap->oap_page_off == 0);
1766 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1767 ending_offset = oap->oap_obj_off +
1770 LASSERT(oap->oap_page_off + oap->oap_count ==
1775 soft_sync = osc_over_unstable_soft_limit(cli);
/* Under memory pressure, allocate with the memalloc flag set. */
1777 mpflag = cfs_memory_pressure_get_and_set();
1779 OBD_ALLOC(crattr, sizeof(*crattr));
1781 GOTO(out, rc = -ENOMEM);
1783 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1785 GOTO(out, rc = -ENOMEM);
1789 GOTO(out, rc = -ENOMEM);
/* Build the pga array and attach each page to the cl_req. */
1792 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1793 struct cl_page *page = oap2cl_page(oap);
1794 if (clerq == NULL) {
1795 clerq = cl_req_alloc(env, page, crt,
1796 1 /* only 1-object rpcs for now */);
1798 GOTO(out, rc = PTR_ERR(clerq));
1801 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1803 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1804 pga[i] = &oap->oap_brw_page;
1805 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1806 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1807 pga[i]->pg, page_index(oap->oap_page), oap,
1810 cl_req_page_add(env, clerq, page);
1813 /* always get the data for the obdo for the rpc */
1814 LASSERT(clerq != NULL);
1815 crattr->cra_oa = oa;
1816 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1818 rc = cl_req_prep(env, clerq);
1820 CERROR("cl_req_prep failed: %d\n", rc);
/* Target allocates in offset order: sort before building niobufs. */
1824 sort_brw_pages(pga, page_count);
1825 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1826 pga, &req, crattr->cra_capa, 1, 0);
1828 CERROR("prep_req failed: %d\n", rc);
1832 req->rq_commit_cb = brw_commit;
1833 req->rq_interpret_reply = brw_interpret;
1836 req->rq_memalloc = 1;
1838 /* Need to update the timestamps after the request is built in case
1839 * we race with setattr (locally or in queue at OST). If OST gets
1840 * later setattr before earlier BRW (as determined by the request xid),
1841 * the OST will not use BRW timestamps. Sadly, there is no obvious
1842 * way to do this in a single call. bug 10150 */
1843 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1844 crattr->cra_oa = &body->oa;
1845 cl_req_attr_set(env, clerq, crattr,
1846 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1848 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Move oap and extent lists into the request's async args; the RPC
 * now owns them (see brw_interpret/osc_brw_redo_request). */
1850 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1851 aa = ptlrpc_req_async_args(req);
1852 INIT_LIST_HEAD(&aa->aa_oaps);
1853 list_splice_init(&rpc_list, &aa->aa_oaps);
1854 INIT_LIST_HEAD(&aa->aa_exts);
1855 list_splice_init(ext_list, &aa->aa_exts);
1856 aa->aa_clerq = clerq;
1858 /* queued sync pages can be torn down while the pages
1859 * were between the pending list and the rpc */
1861 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1862 /* only one oap gets a request reference */
1865 if (oap->oap_interrupted && !req->rq_intr) {
1866 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1868 ptlrpc_mark_interrupted(req);
1872 tmp->oap_request = ptlrpc_request_addref(req);
/* Account the RPC in the read/write in-flight stats. */
1874 spin_lock(&cli->cl_loi_list_lock);
1875 starting_offset >>= PAGE_CACHE_SHIFT;
1876 if (cmd == OBD_BRW_READ) {
1877 cli->cl_r_in_flight++;
1878 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1879 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1880 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1881 starting_offset + 1);
1883 cli->cl_w_in_flight++;
1884 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1885 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1886 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1887 starting_offset + 1);
1889 spin_unlock(&cli->cl_loi_list_lock);
1891 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1892 page_count, aa, cli->cl_r_in_flight,
1893 cli->cl_w_in_flight);
1895 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1896 * see which CPU/NUMA node the majority of pages were allocated
1897 * on, and try to assign the async RPC to the CPU core
1898 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1900 * But on the other hand, we expect that multiple ptlrpcd
1901 * threads and the initial write sponsor can run in parallel,
1902 * especially when data checksum is enabled, which is CPU-bound
1903 * operation and single ptlrpcd thread cannot process in time.
1904 * So more ptlrpcd threads sharing BRW load
1905 * (with PDL_POLICY_ROUND) seems better.
1907 ptlrpcd_add_req(req, pol, -1);
/* Error/cleanup path: free whatever was allocated before failing. */
1913 cfs_memory_pressure_restore(mpflag);
1915 if (crattr != NULL) {
1916 capa_put(crattr->cra_capa);
1917 OBD_FREE(crattr, sizeof(*crattr));
1921 LASSERT(req == NULL);
1926 OBD_FREE(pga, sizeof(*pga) * page_count);
1927 /* this should happen rarely and is pretty bad, it makes the
1928 * pending list not follow the dirty order */
1929 while (!list_empty(ext_list)) {
1930 ext = list_entry(ext_list->next, struct osc_extent,
1932 list_del_init(&ext->oe_link);
1933 osc_extent_finish(env, ext, 0, rc);
1935 if (clerq && !IS_ERR(clerq))
1936 cl_req_completion(env, clerq, rc);
/* Attach @einfo->ei_cbdata to @lock's l_ast_data if it is unset, after
 * asserting the lock's callbacks/type match what we would enqueue.
 * Under the lock's resource lock, sets l_ast_data only when currently
 * NULL; the data "took" iff l_ast_data equals our data afterwards.
 * NOTE(review): the return value set on success and the final return
 * are on lines elided from this view. */
1941 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1942 struct ldlm_enqueue_info *einfo)
1944 void *data = einfo->ei_cbdata;
1947 LASSERT(lock != NULL);
1948 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1949 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1950 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1951 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1953 lock_res_and_lock(lock);
/* Only claim the slot if nobody owns it yet. */
1955 if (lock->l_ast_data == NULL)
1956 lock->l_ast_data = data;
1957 if (lock->l_ast_data == data)
1960 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(): resolve
 * @lockh to a lock, try to attach the enqueue cbdata, and drop the
 * reference.  A NULL lock from the handle means the client was likely
 * evicted (logged via CERROR on the path below). */
1965 static int osc_set_data_with_check(struct lustre_handle *lockh,
1966 struct ldlm_enqueue_info *einfo)
1968 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1972 set = osc_set_lock_data_with_check(lock, einfo);
1973 LDLM_LOCK_PUT(lock);
1975 CERROR("lockh %p, data %p - client evicted?\n",
1976 lockh, einfo->ei_cbdata);
/*
 * Common completion for an OSC lock enqueue (sync or async path).
 *
 * For intent enqueues that were aborted by the server, extracts the
 * real status from the DLM reply's lock_policy_res1.  Invokes the
 * caller's @upcall with the final @errcode, then releases the lock
 * reference taken by ldlm_cli_enqueue() when appropriate.
 */
1980 static int osc_enqueue_fini(struct ptlrpc_request *req,
1981 osc_enqueue_upcall_f upcall, void *cookie,
1982 struct lustre_handle *lockh, ldlm_mode_t mode,
1983 __u64 *flags, int agl, int errcode)
1985 bool intent = *flags & LDLM_FL_HAS_INTENT;
1989 /* The request was created before ldlm_cli_enqueue call. */
1990 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1991 struct ldlm_reply *rep;
1993 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1994 LASSERT(rep != NULL);
/* Convert the server's network-order status to host errno. */
1996 rep->lock_policy_res1 =
1997 ptlrpc_status_ntoh(rep->lock_policy_res1);
1998 if (rep->lock_policy_res1)
1999 errcode = rep->lock_policy_res1;
2001 *flags |= LDLM_FL_LVB_READY;
2002 } else if (errcode == ELDLM_OK) {
2003 *flags |= LDLM_FL_LVB_READY;
2006 /* Call the update callback. */
2007 rc = (*upcall)(cookie, lockh, errcode);
2009 /* release the reference taken in ldlm_cli_enqueue() */
2010 if (errcode == ELDLM_LOCK_MATCHED)
2012 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2013 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for asynchronous lock enqueues.
 *
 * Completes the DLM-level enqueue via ldlm_cli_enqueue_fini() and then
 * the OSC-level bookkeeping via osc_enqueue_fini().  An extra lock
 * reference is taken across the upcall so a blocking AST posted for a
 * failed lock cannot run before the upcall has finished.
 *
 * NOTE(review): some lines are elided here (the AGL 'flags' local, an
 * 'if (aa->oa_agl)' guard, the final return).
 */
2018 static int osc_enqueue_interpret(const struct lu_env *env,
2019 struct ptlrpc_request *req,
2020 struct osc_enqueue_args *aa, int rc)
2022 struct ldlm_lock *lock;
2023 struct lustre_handle *lockh = &aa->oa_lockh;
2024 ldlm_mode_t mode = aa->oa_mode;
2025 struct ost_lvb *lvb = aa->oa_lvb;
2026 __u32 lvb_len = sizeof(*lvb);
2031 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2033 lock = ldlm_handle2lock(lockh);
2034 LASSERTF(lock != NULL,
2035 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2036 lockh->cookie, req, aa);
2038 /* Take an additional reference so that a blocking AST that
2039 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2040 * to arrive after an upcall has been executed by
2041 * osc_enqueue_fini(). */
2042 ldlm_lock_addref(lockh, mode);
2044 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2045 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2047 /* Let CP AST to grant the lock first. */
2048 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL enqueues pass no lvb/flags; substitute locals for the fini call. */
2051 LASSERT(aa->oa_lvb == NULL);
2052 LASSERT(aa->oa_flags == NULL);
2053 aa->oa_flags = &flags;
2056 /* Complete obtaining the lock procedure. */
2057 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2058 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2060 /* Complete osc stuff. */
2061 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2062 aa->oa_flags, aa->oa_agl, rc);
2064 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above, and the handle2lock ref. */
2066 ldlm_lock_decref(lockh, mode);
2067 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to request
 * that the RPC be handled by ptlrpcd instead of a real set (compared
 * by address in osc_enqueue_base(); never dereferenced). */
2071 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2073 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2074 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2075 * other synchronous requests, however keeping some locks and trying to obtain
2076 * others may take a considerable amount of time in a case of ost failure; and
2077 * when other sync requests do not get released lock from a client, the client
2078 * is evicted from the cluster -- such scenarious make the life difficult, so
2079 * release locks just after they are obtained. */
/* Enqueue (or match) an extent lock on @res_id.
 * First tries ldlm_lock_match() against cached locks; on a match the
 * upcall is invoked with ELDLM_LOCK_MATCHED (or the AGL request is
 * cancelled).  Otherwise builds an LDLM_ENQUEUE request and either
 * queues it asynchronously (rqset / PTLRPCD_SET) with
 * osc_enqueue_interpret() as the interpreter, or completes it
 * synchronously through osc_enqueue_fini().
 * NOTE(review): several lines are elided in this view (RETURNs, the
 * 'no_match:' style label, some else branches). */
2080 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2081 __u64 *flags, ldlm_policy_data_t *policy,
2082 struct ost_lvb *lvb, int kms_valid,
2083 osc_enqueue_upcall_f upcall, void *cookie,
2084 struct ldlm_enqueue_info *einfo,
2085 struct ptlrpc_request_set *rqset, int async, int agl)
2087 struct obd_device *obd = exp->exp_obd;
2088 struct lustre_handle lockh = { 0 };
2089 struct ptlrpc_request *req = NULL;
2090 int intent = *flags & LDLM_FL_HAS_INTENT;
/* AGL locks may match before the LVB is ready; normal ones may not. */
2091 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2096 /* Filesystem lock extents are extended to page boundaries so that
2097 * dealing with the page cache is a little smoother. */
2098 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2099 policy->l_extent.end |= ~CFS_PAGE_MASK;
2102 * kms is not valid when either object is completely fresh (so that no
2103 * locks are cached), or object was evicted. In the latter case cached
2104 * lock cannot be used, because it would prime inode state with
2105 * potentially stale LVB.
2110 /* Next, search for already existing extent locks that will cover us */
2111 /* If we're trying to read, we also search for an existing PW lock. The
2112 * VFS and page cache already protect us locally, so lots of readers/
2113 * writers can share a single PW lock.
2115 * There are problems with conversion deadlocks, so instead of
2116 * converting a read lock to a write lock, we'll just enqueue a new
2119 * At some point we should cancel the read lock instead of making them
2120 * send us a blocking callback, but there are problems with canceling
2121 * locks out from other users right now, too. */
2122 mode = einfo->ei_mode;
2123 if (einfo->ei_mode == LCK_PR)
2125 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2126 einfo->ei_type, policy, mode, &lockh, 0);
2128 struct ldlm_lock *matched;
2130 if (*flags & LDLM_FL_TEST_LOCK)
2133 matched = ldlm_handle2lock(&lockh);
2135 /* AGL enqueues DLM locks speculatively. Therefore if
2136 * it already exists a DLM lock, it wll just inform the
2137 * caller to cancel the AGL process for this stripe. */
2138 ldlm_lock_decref(&lockh, mode);
2139 LDLM_LOCK_PUT(matched);
2141 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2142 *flags |= LDLM_FL_LVB_READY;
2144 /* We already have a lock, and it's referenced. */
2145 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2147 ldlm_lock_decref(&lockh, mode);
2148 LDLM_LOCK_PUT(matched);
2151 ldlm_lock_decref(&lockh, mode);
2152 LDLM_LOCK_PUT(matched);
2157 if (*flags & LDLM_FL_TEST_LOCK)
/* No cached lock: build an enqueue request with room for the LVB. */
2161 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2162 &RQF_LDLM_ENQUEUE_LVB);
2166 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2168 ptlrpc_request_free(req);
2172 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2174 ptlrpc_request_set_replen(req);
2177 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2178 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2180 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2181 sizeof(*lvb), LVB_T_OST, &lockh, async);
2184 struct osc_enqueue_args *aa;
2185 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2186 aa = ptlrpc_req_async_args(req);
2188 aa->oa_mode = einfo->ei_mode;
2189 aa->oa_type = einfo->ei_type;
2190 lustre_handle_copy(&aa->oa_lockh, &lockh);
2191 aa->oa_upcall = upcall;
2192 aa->oa_cookie = cookie;
2195 aa->oa_flags = flags;
2198 /* AGL is essentially to enqueue an DLM lock
2199 * in advance, so we don't care about the
2200 * result of AGL enqueue. */
2202 aa->oa_flags = NULL;
2205 req->rq_interpret_reply =
2206 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2207 if (rqset == PTLRPCD_SET)
2208 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2210 ptlrpc_set_add_req(rqset, req);
2211 } else if (intent) {
2212 ptlrpc_req_finished(req);
/* Synchronous path: complete the enqueue in-line. */
2217 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2220 ptlrpc_req_finished(req);
/* Match an existing extent lock on @res_id against cached DLM locks.
 * Extends the extent to page boundaries, searches (reads may also be
 * satisfied by a PW lock), attaches @data to the matched lock, and on
 * success holds a reference unless LDLM_FL_TEST_LOCK was given.  When
 * a PW lock satisfied a PR request, the reference mode is downgraded
 * to the requested one.
 * NOTE(review): the visible code passes 'rc' as the mode set to
 * ldlm_lock_match(); its initialization is on elided lines. */
2225 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2226 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2227 __u64 *flags, void *data, struct lustre_handle *lockh,
2230 struct obd_device *obd = exp->exp_obd;
2231 __u64 lflags = *flags;
2235 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2238 /* Filesystem lock extents are extended to page boundaries so that
2239 * dealing with the page cache is a little smoother */
2240 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2241 policy->l_extent.end |= ~CFS_PAGE_MASK;
2243 /* Next, search for already existing extent locks that will cover us */
2244 /* If we're trying to read, we also search for an existing PW lock. The
2245 * VFS and page cache already protect us locally, so lots of readers/
2246 * writers can share a single PW lock. */
2250 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2251 res_id, type, policy, rc, lockh, unref);
/* Matched, but someone else owns the lock's ast_data: back off. */
2254 if (!osc_set_data_with_check(lockh, data)) {
2255 if (!(lflags & LDLM_FL_TEST_LOCK))
2256 ldlm_lock_decref(lockh, rc);
/* PW lock satisfied a PR request: swap the reference to PR. */
2260 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2261 ldlm_lock_addref(lockh, LCK_PR);
2262 ldlm_lock_decref(lockh, LCK_PW);
/* Release a lock reference; group locks are also cancelled immediately
 * since they are never kept cached on the client. */
2269 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2273 if (unlikely(mode == LCK_GROUP))
2274 ldlm_lock_decref_and_cancel(lockh, mode);
2276 ldlm_lock_decref(lockh, mode);
/* Reply-interpret callback for asynchronous OST_STATFS requests.
 * Unpacks the obd_statfs from the reply into the caller's obd_info and
 * invokes its oi_cb_up callback.  -ENOTCONN/-EAGAIN are tolerated for
 * OBD_STATFS_NODELAY requests (no-resend procfs path). */
2281 static int osc_statfs_interpret(const struct lu_env *env,
2282 struct ptlrpc_request *req,
2283 struct osc_async_args *aa, int rc)
2285 struct obd_statfs *msfs;
2289 /* The request has in fact never been sent
2290 * due to issues at a higher level (LOV).
2291 * Exit immediately since the caller is
2292 * aware of the problem and takes care
2293 * of the clean up */
2296 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2297 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2303 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2305 GOTO(out, rc = -EPROTO);
/* Struct copy of the reply stats into the caller's buffer. */
2308 *aa->aa_oi->oi_osfs = *msfs;
2310 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an asynchronous OST_STATFS request on @rqset.
 * The reply is delivered through osc_statfs_interpret(), which fills
 * oinfo->oi_osfs and calls oinfo->oi_cb_up.  @max_age is currently not
 * sent on the wire (see comment below).
 * NOTE(review): the aa->aa_oi assignment sits on a line elided from
 * this view, between lines 2351 and 2354. */
2314 static int osc_statfs_async(struct obd_export *exp,
2315 struct obd_info *oinfo, __u64 max_age,
2316 struct ptlrpc_request_set *rqset)
2318 struct obd_device *obd = class_exp2obd(exp);
2319 struct ptlrpc_request *req;
2320 struct osc_async_args *aa;
2324 /* We could possibly pass max_age in the request (as an absolute
2325 * timestamp or a "seconds.usec ago") so the target can avoid doing
2326 * extra calls into the filesystem if that isn't necessary (e.g.
2327 * during mount that would help a bit). Having relative timestamps
2328 * is not so great if request processing is slow, while absolute
2329 * timestamps are not ideal because they need time synchronization. */
2330 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2334 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2336 ptlrpc_request_free(req);
2339 ptlrpc_request_set_replen(req);
2340 req->rq_request_portal = OST_CREATE_PORTAL;
2341 ptlrpc_at_set_req_timeout(req);
2343 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2344 /* procfs requests not want stat in wait for avoid deadlock */
2345 req->rq_no_resend = 1;
2346 req->rq_no_delay = 1;
2349 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2350 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2351 aa = ptlrpc_req_async_args(req);
2354 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the RPC, wait for the reply, and copy
 * the server's obd_statfs into @osfs.
 *
 * NOTE(review): elided view — error checks, the msfs->*osfs copy and
 * the RETURN/out label between the visible lines are not shown here.
 */
2358 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2359                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2361 struct obd_device *obd = class_exp2obd(exp);
2362 struct obd_statfs *msfs;
2363 struct ptlrpc_request *req;
2364 struct obd_import *imp = NULL;
2368 /* Since the request may also come from lprocfs, synchronize with
2369  * client_disconnect_export() — see bug 15684. */
     /* Take a private reference on the import under cl_sem so a racing
      * disconnect cannot free it out from under us. */
2370 down_read(&obd->u.cli.cl_sem);
2371 if (obd->u.cli.cl_import)
2372 imp = class_import_get(obd->u.cli.cl_import);
2373 up_read(&obd->u.cli.cl_sem);
2377 /* We could possibly pass max_age in the request (as an absolute
2378  * timestamp or a "seconds.usec ago") so the target can avoid doing
2379  * extra calls into the filesystem if that isn't necessary (e.g.
2380  * during mount that would help a bit). Having relative timestamps
2381  * is not so great if request processing is slow, while absolute
2382  * timestamps are not ideal because they need time synchronization. */
2383 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
     /* The request holds its own import reference now; drop ours. */
2385 class_import_put(imp);
2390 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2392 ptlrpc_request_free(req);
2395 ptlrpc_request_set_replen(req);
2396 req->rq_request_portal = OST_CREATE_PORTAL;
2397 ptlrpc_at_set_req_timeout(req);
2399 if (flags & OBD_STATFS_NODELAY) {
     /* procfs requests must not wait for recovery, to avoid deadlock */
2401 req->rq_no_resend = 1;
2402 req->rq_no_delay = 1;
2405 rc = ptlrpc_queue_wait(req);
2409 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2411 GOTO(out, rc = -EPROTO);
2418 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.
 *
 * Pins this module for the duration of the call, then dispatches on
 * @cmd: import recovery, activation toggling, quota-check polling and
 * target ping.  Unknown commands return -ENOTTY.
 *
 * NOTE(review): elided view — the `switch (cmd)` statement, per-case
 * GOTO/break lines and the final RETURN are not shown here.
 */
2422 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2423                          void *karg, void *uarg)
2425 struct obd_device *obd = exp->exp_obd;
2426 struct obd_ioctl_data *data = karg;
     /* Hold a module reference so we cannot be unloaded mid-ioctl. */
2430 if (!try_module_get(THIS_MODULE)) {
2431 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2432        module_name(THIS_MODULE));
2436 case OBD_IOC_CLIENT_RECOVER:
2437 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2438                             data->ioc_inlbuf1, 0);
2442 case IOC_OSC_SET_ACTIVE:
2443 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2446 case OBD_IOC_POLL_QUOTACHECK:
2447 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2449 case OBD_IOC_PING_TARGET:
2450 err = ptlrpc_obd_ping(obd);
2453 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2454        cmd, current_comm());
2455 GOTO(out, err = -ENOTTY);
     /* Balance the try_module_get() above on all exit paths. */
2458 module_put(THIS_MODULE);
/*
 * Handle set_info requests, either locally or by forwarding them to
 * the OST as an OST_SET_INFO RPC.
 *
 * Locally handled keys: KEY_CHECKSUM (toggle client checksums),
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET (attach the shared
 * client LRU cache, once per OSC) and KEY_CACHE_LRU_SHRINK.  All other
 * keys are packed into an RPC; KEY_GRANT_SHRINK requests get a special
 * request format and are driven by ptlrpcd instead of @set.
 *
 * NOTE(review): elided view — RETURN statements, several error checks
 * and some closing braces between the visible lines are not shown.
 */
2462 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2463                               u32 keylen, void *key,
2464                               u32 vallen, void *val,
2465                               struct ptlrpc_request_set *set)
2467 struct ptlrpc_request *req;
2468 struct obd_device *obd = exp->exp_obd;
2469 struct obd_import *imp = class_exp2cliimp(exp);
2474 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2476 if (KEY_IS(KEY_CHECKSUM)) {
2477 if (vallen != sizeof(int))
     /* Normalize any non-zero value to 1. */
2479 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2483 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2484 sptlrpc_conf_client_adapt(obd);
2488 if (KEY_IS(KEY_FLUSH_CTX)) {
2489 sptlrpc_import_flush_my_ctx(imp);
2493 if (KEY_IS(KEY_CACHE_SET)) {
2494 struct client_obd *cli = &obd->u.cli;
     /* The shared page cache may only be attached once per OSC. */
2496 LASSERT(cli->cl_cache == NULL); /* only once */
2497 cli->cl_cache = (struct cl_client_cache *)val;
2498 cl_cache_incref(cli->cl_cache);
2499 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2501 /* add this osc into entity list */
2502 LASSERT(list_empty(&cli->cl_lru_osc));
2503 spin_lock(&cli->cl_cache->ccc_lru_lock);
2504 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2505 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2510 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2511 struct client_obd *cli = &obd->u.cli;
     /* Shrink at most half of our LRU pages, capped by the target. */
2512 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2513 long target = *(long *)val;
2515 nr = osc_lru_shrink(env, cli, min(nr, target), true);
     /* All remaining keys need a request set — except grant shrink,
      * which is sent via ptlrpcd below. */
2520 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2523 /* We pass all other commands directly to OST. Since nobody calls osc
2524    methods directly and everybody is supposed to go through LOV, we
2525    assume lov checked invalid values for us.
2526    The only recognised values so far are evict_by_nid and mds_conn.
2527    Even if something bad goes through, we'd get a -EINVAL from OST
2530 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2531                            &RQF_OST_SET_GRANT_INFO :
2536 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2537                      RCL_CLIENT, keylen);
     /* Grant-shrink requests carry an ost_body instead of a value buffer. */
2538 if (!KEY_IS(KEY_GRANT_SHRINK))
2539 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2540                      RCL_CLIENT, vallen);
2541 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2543 ptlrpc_request_free(req);
2547 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2548 memcpy(tmp, key, keylen);
2549 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2552 memcpy(tmp, val, vallen);
2554 if (KEY_IS(KEY_GRANT_SHRINK)) {
2555 struct osc_grant_args *aa;
2558 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2559 aa = ptlrpc_req_async_args(req);
2562 ptlrpc_req_finished(req);
2565 *oa = ((struct ost_body *)val)->oa;
2567 req->rq_interpret_reply = osc_shrink_grant_interpret;
2570 ptlrpc_request_set_replen(req);
2571 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2572 LASSERT(set != NULL);
2573 ptlrpc_set_add_req(set, req);
2574 ptlrpc_check_set(NULL, set);
     /* Grant shrink: let ptlrpcd drive the request to completion. */
2576 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Reconnect callback: renegotiate the grant with the OST.
 *
 * Requests a grant equal to our current available grant plus dirty
 * pages (falling back to 2x the BRW size when that is zero), and
 * consumes the accumulated lost grant under cl_loi_list_lock.
 *
 * NOTE(review): elided view — the lost_grant declaration and the final
 * RETURN are not shown here.
 */
2581 static int osc_reconnect(const struct lu_env *env,
2582                          struct obd_export *exp, struct obd_device *obd,
2583                          struct obd_uuid *cluuid,
2584                          struct obd_connect_data *data,
2587 struct client_obd *cli = &obd->u.cli;
2589 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2592 spin_lock(&cli->cl_loi_list_lock);
     /* `?:` keeps a non-zero request: ask for what we hold, else 2x BRW. */
2593 data->ocd_grant = (cli->cl_avail_grant +
2594                    (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2595                   2 * cli_brw_size(obd);
2596 lost_grant = cli->cl_lost_grant;
2597 cli->cl_lost_grant = 0;
2598 spin_unlock(&cli->cl_loi_list_lock);
2600 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2601        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2602        data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect this OSC from the OST.
 *
 * The grant-shrink list removal deliberately happens AFTER
 * client_disconnect_export() — see the race description below.
 */
2608 static int osc_disconnect(struct obd_export *exp)
2610 struct obd_device *obd = class_exp2obd(exp);
2613 rc = client_disconnect_export(exp);
2615 /*
2616  * Initially we put del_shrink_grant before disconnect_export, but it
2617  * causes the following problem if setup (connect) and cleanup
2618  * (disconnect) are tangled together.
2619  *      connect p1                     disconnect p2
2620  * ptlrpc_connect_import
2621  * ...............               class_manual_cleanup
2623  * ptlrpc_connect_interrupt
2625  * add this client to shrink list
2627  * Bang! pinger trigger the shrink.
2628  * So the osc should be disconnected from the shrink list, after we
2629  * are sure the import has been destroyed. BUG18662
2630  */
     /* cl_import == NULL means the import is fully destroyed and it is
      * now safe to leave the grant-shrink list. */
2631 if (obd->u.cli.cl_import == NULL)
2632 osc_del_shrink_grant(&obd->u.cli);
/*
 * React to import state transitions.
 *
 * DISCON zeroes the grant accounting; INVALIDATE fails all cached
 * pages and cleans the local lock namespace; OCD re-initializes the
 * grant and request portal from the negotiated connect data.  Most
 * events are also forwarded to the observer (typically LOV).
 *
 * NOTE(review): elided view — per-case `break`s, closing braces and
 * the final RETURN are not shown here.
 */
2636 static int osc_import_event(struct obd_device *obd,
2637                             struct obd_import *imp,
2638                             enum obd_import_event event)
2640 struct client_obd *cli;
2644 LASSERT(imp->imp_obd == obd);
2647 case IMP_EVENT_DISCON: {
     /* Connection lost: all outstanding grant is void. */
2649 spin_lock(&cli->cl_loi_list_lock);
2650 cli->cl_avail_grant = 0;
2651 cli->cl_lost_grant = 0;
2652 spin_unlock(&cli->cl_loi_list_lock);
2655 case IMP_EVENT_INACTIVE: {
2656 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2659 case IMP_EVENT_INVALIDATE: {
2660 struct ldlm_namespace *ns = obd->obd_namespace;
2664 env = cl_env_get(&refcheck);
2668 /* all pages go to failing rpcs due to the invalid
                       import */
2670 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
     /* Drop every cached lock locally; the server already forgot them. */
2672 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2673 cl_env_put(env, &refcheck);
2678 case IMP_EVENT_ACTIVE: {
2679 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2682 case IMP_EVENT_OCD: {
2683 struct obd_connect_data *ocd = &imp->imp_connect_data;
2685 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2686 osc_init_grant(&obd->u.cli, ocd);
2689 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2690 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2692 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2695 case IMP_EVENT_DEACTIVATE: {
2696 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2699 case IMP_EVENT_ACTIVATE: {
2700 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2704 CERROR("Unknown import event %d\n", event);
/*
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
2717 static int osc_cancel_weight(struct ldlm_lock *lock)
2720 /*
 * Cancel all unused and granted extent lock.
 */
     /* A lock is cancelable iff it is an extent lock, fully granted,
      * and carries zero weight (no pages pinned under it). */
2722 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2723     lock->l_granted_mode == lock->l_req_mode &&
2724     osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback: flush queued writeback for this client by
 * unplugging its I/O queues.  Registered via ptlrpcd_alloc_work() in
 * osc_setup() as cli->cl_writeback_work.
 */
2730 static int brw_queue_work(const struct lu_env *env, void *data)
2732 struct client_obd *cli = data;
2734 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2736 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * OBD setup method for the OSC: initialize the client obd, allocate
 * the writeback and LRU ptlrpcd work items, set up quota, procfs, a
 * reserve request pool, and register the cancel-weight callback.
 *
 * Error handling unwinds in reverse order via the out_* labels.
 *
 * NOTE(review): elided view — rc checks after ptlrpcd_addref()/
 * client_obd_setup(), some braces and the error labels themselves are
 * not shown between the visible lines.
 */
2740 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2742 struct client_obd *cli = &obd->u.cli;
2743 struct obd_type *type;
2748 rc = ptlrpcd_addref();
2752 rc = client_obd_setup(obd, lcfg);
2754 GOTO(out_ptlrpcd, rc);
     /* Asynchronous writeback flushing, executed in ptlrpcd context. */
2756 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2757 if (IS_ERR(handler))
2758 GOTO(out_client_setup, rc = PTR_ERR(handler));
2759 cli->cl_writeback_work = handler;
     /* Asynchronous LRU page reclaim, also in ptlrpcd context. */
2761 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2762 if (IS_ERR(handler))
2763 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2764 cli->cl_lru_work = handler;
2766 rc = osc_quota_setup(obd);
2768 GOTO(out_ptlrpcd_work, rc);
2770 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2772 #ifdef CONFIG_PROC_FS
2773 obd->obd_vars = lprocfs_osc_obd_vars;
2775 /* If this is true then both client (osc) and server (osp) are on the
2776  * same node. The osp layer if loaded first will register the osc proc
2777  * directory. In that case this obd_device will be attached its proc
2778  * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2779 type = class_search_type(LUSTRE_OSP_NAME);
2780 if (type && type->typ_procsym) {
2781 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2783                                        obd->obd_vars, obd);
2784 if (IS_ERR(obd->obd_proc_entry)) {
2785 rc = PTR_ERR(obd->obd_proc_entry);
2786 CERROR("error %d setting up lprocfs for %s\n", rc,
     /* procfs failure is non-fatal: clear the entry and carry on. */
2788 obd->obd_proc_entry = NULL;
2791 rc = lprocfs_obd_setup(obd);
2794 /* If the basic OSC proc tree construction succeeded then
2795  * lets do the rest. */
2797 lproc_osc_attach_seqstat(obd);
2798 sptlrpc_lprocfs_cliobd_attach(obd);
2799 ptlrpc_lprocfs_register_obd(obd);
2802 /* We need to allocate a few requests more, because
2803  * brw_interpret tries to create new requests before freeing
2804  * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2805  * reserved, but I'm afraid that might be too much wasted RAM
2806  * in fact, so 2 is just my guess and still should work. */
2807 cli->cl_import->imp_rq_pool =
2808 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2810                     ptlrpc_add_rqs_to_pool);
2812 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
     /* Let the LDLM ask us which locks are cheap to cancel. */
2813 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
     /* --- error unwind path --- */
2817 if (cli->cl_writeback_work != NULL) {
2818 ptlrpcd_destroy_work(cli->cl_writeback_work);
2819 cli->cl_writeback_work = NULL;
2821 if (cli->cl_lru_work != NULL) {
2822 ptlrpcd_destroy_work(cli->cl_lru_work);
2823 cli->cl_lru_work = NULL;
2826 client_obd_cleanup(obd);
/*
 * Staged pre-cleanup.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS tears down the ptlrpcd work items, the client
 * import and the procfs tree.
 *
 * NOTE(review): elided view — the `switch (stage)` statement, `break`s
 * and the final RETURN are not shown here.
 */
2832 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2838 case OBD_CLEANUP_EARLY: {
2839 struct obd_import *imp;
2840 imp = obd->u.cli.cl_import;
2841 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2842 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2843 ptlrpc_deactivate_import(imp);
2844 spin_lock(&imp->imp_lock);
2845 imp->imp_pingable = 0;
2846 spin_unlock(&imp->imp_lock);
2849 case OBD_CLEANUP_EXPORTS: {
2850 struct client_obd *cli = &obd->u.cli;
2852 /*
 * for echo client, export may be on zombie list, wait for
2853  * zombie thread to cull it, because cli.cl_import will be
2854  * cleared in client_disconnect_export():
2855  * class_export_destroy() -> obd_cleanup() ->
2856  * echo_device_free() -> echo_client_cleanup() ->
2857  * obd_disconnect() -> osc_disconnect() ->
2858  * client_disconnect_export()
 */
2860 obd_zombie_barrier();
2861 if (cli->cl_writeback_work) {
2862 ptlrpcd_destroy_work(cli->cl_writeback_work);
2863 cli->cl_writeback_work = NULL;
2865 if (cli->cl_lru_work) {
2866 ptlrpcd_destroy_work(cli->cl_lru_work);
2867 cli->cl_lru_work = NULL;
2869 obd_cleanup_client_import(obd);
2870 ptlrpc_lprocfs_unregister_obd(obd);
2871 lprocfs_obd_cleanup(obd);
/*
 * Final OBD cleanup: detach from the shared client page cache, free
 * the quota cache, and tear down the client obd.
 *
 * NOTE(review): elided view — lru_shrink/budget release lines between
 * the visible lines (and the final RETURN) are not shown here.
 */
2878 int osc_cleanup(struct obd_device *obd)
2880 struct client_obd *cli = &obd->u.cli;
2886 if (cli->cl_cache != NULL) {
     /* We still hold a reference taken in KEY_CACHE_SET handling. */
2887 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2888 spin_lock(&cli->cl_cache->ccc_lru_lock);
2889 list_del_init(&cli->cl_lru_osc);
2890 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2891 cli->cl_lru_left = NULL;
2892 cl_cache_decref(cli->cl_cache);
2893 cli->cl_cache = NULL;
2896 /* free memory of osc quota cache */
2897 osc_quota_cleanup(obd);
2899 rc = client_obd_cleanup(obd);
2905 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2907 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2908 return rc > 0 ? 0: rc;
2911 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2913 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type, registered with
 * class_register_type() in osc_init().  Connection management is
 * delegated to the generic client_* helpers; everything else is
 * implemented in this file (or elsewhere in the osc module).
 */
2916 static struct obd_ops osc_obd_ops = {
2917         .o_owner                = THIS_MODULE,
2918         .o_setup                = osc_setup,
2919         .o_precleanup           = osc_precleanup,
2920         .o_cleanup              = osc_cleanup,
2921         .o_add_conn             = client_import_add_conn,
2922         .o_del_conn             = client_import_del_conn,
2923         .o_connect              = client_connect_import,
2924         .o_reconnect            = osc_reconnect,
2925         .o_disconnect           = osc_disconnect,
2926         .o_statfs               = osc_statfs,
2927         .o_statfs_async         = osc_statfs_async,
2928         .o_create               = osc_create,
2929         .o_destroy              = osc_destroy,
2930         .o_getattr              = osc_getattr,
2931         .o_setattr              = osc_setattr,
2932         .o_iocontrol            = osc_iocontrol,
2933         .o_set_info_async       = osc_set_info_async,
2934         .o_import_event         = osc_import_event,
2935         .o_process_config       = osc_process_config,
2936         .o_quotactl             = osc_quotactl,
2937         .o_quotacheck           = osc_quotacheck,
/*
 * Module init: set up the OSC slab caches and register the OSC obd
 * type.  Proc registration is skipped when the OSP module already
 * registered the shared "osc" proc directory (client and server on the
 * same node).
 *
 * NOTE(review): elided view — rc checks and RETURN statements between
 * the visible lines are not shown here.
 */
2940 static int __init osc_init(void)
2942 bool enable_proc = true;
2943 struct obd_type *type;
2947 /* print an address of _any_ initialized kernel symbol from this
2948  * module, to allow debugging with gdb that doesn't support data
2949  * symbols from modules.*/
2950 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2952 rc = lu_kmem_init(osc_caches);
2956 type = class_search_type(LUSTRE_OSP_NAME);
2957 if (type != NULL && type->typ_procsym != NULL)
2958 enable_proc = false;
2960 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2961                          LUSTRE_OSC_NAME, &osc_device_type);
     /* Registration failed: undo the slab cache setup. */
2963 lu_kmem_fini(osc_caches);
/*
 * Module exit: unregister the OSC obd type and release the slab
 * caches, in the reverse order of osc_init().
 */
2970 static void /*__exit*/ osc_exit(void)
2972 class_unregister_type(LUSTRE_OSC_NAME);
2973 lu_kmem_fini(osc_caches);
/* Kernel module metadata and entry/exit registration. */
2976 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2977 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2978 MODULE_LICENSE("GPL");
2980 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);