Whamcloud - gitweb
d0fa025d6a23c1ca9caca7a63ac2582b99c3ae0d
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         u32                       aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct obd_capa  *aa_ocapa;
66         struct cl_req            *aa_clerq;
67 };
68
69 #define osc_grant_args osc_brw_async_args
70
71 struct osc_setattr_args {
72         struct obdo             *sa_oa;
73         obd_enqueue_update_f     sa_upcall;
74         void                    *sa_cookie;
75 };
76
77 struct osc_fsync_args {
78         struct obd_info *fa_oi;
79         obd_enqueue_update_f     fa_upcall;
80         void                    *fa_cookie;
81 };
82
83 struct osc_enqueue_args {
84         struct obd_export       *oa_exp;
85         ldlm_type_t             oa_type;
86         ldlm_mode_t             oa_mode;
87         __u64                   *oa_flags;
88         osc_enqueue_upcall_f    oa_upcall;
89         void                    *oa_cookie;
90         struct ost_lvb          *oa_lvb;
91         struct lustre_handle    oa_lockh;
92         unsigned int            oa_agl:1;
93 };
94
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
97                          void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
116 {
117         struct ost_body *body;
118
119         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
120         LASSERT(body);
121
122         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
123                              oinfo->oi_oa);
124         osc_pack_capa(req, body, oinfo->oi_capa);
125 }
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
138 int osc_getattr_interpret(const struct lu_env *env,
139                           struct ptlrpc_request *req,
140                           struct osc_async_args *aa, int rc)
141 {
142         struct ost_body *body;
143         ENTRY;
144
145         if (rc != 0)
146                 GOTO(out, rc);
147
148         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
149         if (body) {
150                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152                                      aa->aa_oi->oi_oa, &body->oa);
153
154                 /* This should really be sent by the OST */
155                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
157         } else {
158                 CDEBUG(D_INFO, "can't unpack ost_body\n");
159                 rc = -EPROTO;
160                 aa->aa_oi->oi_oa->o_valid = 0;
161         }
162 out:
163         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
164         RETURN(rc);
165 }
166
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168                        struct obd_info *oinfo)
169 {
170         struct ptlrpc_request *req;
171         struct ost_body       *body;
172         int                    rc;
173         ENTRY;
174
175         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
176         if (req == NULL)
177                 RETURN(-ENOMEM);
178
179         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
181         if (rc) {
182                 ptlrpc_request_free(req);
183                 RETURN(rc);
184         }
185
186         osc_pack_req_body(req, oinfo);
187
188         ptlrpc_request_set_replen(req);
189
190         rc = ptlrpc_queue_wait(req);
191         if (rc)
192                 GOTO(out, rc);
193
194         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
195         if (body == NULL)
196                 GOTO(out, rc = -EPROTO);
197
198         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
200                              &body->oa);
201
202         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
204
205         EXIT;
206  out:
207         ptlrpc_req_finished(req);
208         return rc;
209 }
210
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212                        struct obd_info *oinfo, struct obd_trans_info *oti)
213 {
214         struct ptlrpc_request *req;
215         struct ost_body       *body;
216         int                    rc;
217         ENTRY;
218
219         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
220
221         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
222         if (req == NULL)
223                 RETURN(-ENOMEM);
224
225         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
227         if (rc) {
228                 ptlrpc_request_free(req);
229                 RETURN(rc);
230         }
231
232         osc_pack_req_body(req, oinfo);
233
234         ptlrpc_request_set_replen(req);
235
236         rc = ptlrpc_queue_wait(req);
237         if (rc)
238                 GOTO(out, rc);
239
240         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
241         if (body == NULL)
242                 GOTO(out, rc = -EPROTO);
243
244         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
245                              &body->oa);
246
247         EXIT;
248 out:
249         ptlrpc_req_finished(req);
250         RETURN(rc);
251 }
252
253 static int osc_setattr_interpret(const struct lu_env *env,
254                                  struct ptlrpc_request *req,
255                                  struct osc_setattr_args *sa, int rc)
256 {
257         struct ost_body *body;
258         ENTRY;
259
260         if (rc != 0)
261                 GOTO(out, rc);
262
263         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
264         if (body == NULL)
265                 GOTO(out, rc = -EPROTO);
266
267         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
268                              &body->oa);
269 out:
270         rc = sa->sa_upcall(sa->sa_cookie, rc);
271         RETURN(rc);
272 }
273
274 int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
275                       obd_enqueue_update_f upcall, void *cookie,
276                       struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct osc_setattr_args *sa;
280         int                      rc;
281         ENTRY;
282
283         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
284         if (req == NULL)
285                 RETURN(-ENOMEM);
286
287         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
288         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
289         if (rc) {
290                 ptlrpc_request_free(req);
291                 RETURN(rc);
292         }
293
294         osc_pack_req_body(req, oinfo);
295
296         ptlrpc_request_set_replen(req);
297
298         /* do mds to ost setattr asynchronously */
299         if (!rqset) {
300                 /* Do not wait for response. */
301                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
302         } else {
303                 req->rq_interpret_reply =
304                         (ptlrpc_interpterer_t)osc_setattr_interpret;
305
306                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
307                 sa = ptlrpc_req_async_args(req);
308                 sa->sa_oa = oinfo->oi_oa;
309                 sa->sa_upcall = upcall;
310                 sa->sa_cookie = cookie;
311
312                 if (rqset == PTLRPCD_SET)
313                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
314                 else
315                         ptlrpc_set_add_req(rqset, req);
316         }
317
318         RETURN(0);
319 }
320
321 static int osc_create(const struct lu_env *env, struct obd_export *exp,
322                       struct obdo *oa, struct obd_trans_info *oti)
323 {
324         struct ptlrpc_request *req;
325         struct ost_body       *body;
326         int                    rc;
327         ENTRY;
328
329         LASSERT(oa != NULL);
330         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
331         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
332
333         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
334         if (req == NULL)
335                 GOTO(out, rc = -ENOMEM);
336
337         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
338         if (rc) {
339                 ptlrpc_request_free(req);
340                 GOTO(out, rc);
341         }
342
343         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
344         LASSERT(body);
345
346         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
347
348         ptlrpc_request_set_replen(req);
349
350         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
351             oa->o_flags == OBD_FL_DELORPHAN) {
352                 DEBUG_REQ(D_HA, req,
353                           "delorphan from OST integration");
354                 /* Don't resend the delorphan req */
355                 req->rq_no_resend = req->rq_no_delay = 1;
356         }
357
358         rc = ptlrpc_queue_wait(req);
359         if (rc)
360                 GOTO(out_req, rc);
361
362         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363         if (body == NULL)
364                 GOTO(out_req, rc = -EPROTO);
365
366         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
367         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
368
369         oa->o_blksize = cli_brw_size(exp->exp_obd);
370         oa->o_valid |= OBD_MD_FLBLKSZ;
371
372         if (oti != NULL) {
373                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
374                         if (oti->oti_logcookies == NULL)
375                                 oti->oti_logcookies = &oti->oti_onecookie;
376
377                         *oti->oti_logcookies = oa->o_lcookie;
378                 }
379         }
380
381         CDEBUG(D_HA, "transno: "LPD64"\n",
382                lustre_msg_get_transno(req->rq_repmsg));
383 out_req:
384         ptlrpc_req_finished(req);
385 out:
386         RETURN(rc);
387 }
388
389 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
390                    obd_enqueue_update_f upcall, void *cookie,
391                    struct ptlrpc_request_set *rqset)
392 {
393         struct ptlrpc_request   *req;
394         struct osc_setattr_args *sa;
395         struct ost_body         *body;
396         int                      rc;
397         ENTRY;
398
399         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
400         if (req == NULL)
401                 RETURN(-ENOMEM);
402
403         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
404         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405         if (rc) {
406                 ptlrpc_request_free(req);
407                 RETURN(rc);
408         }
409         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
410         ptlrpc_at_set_req_timeout(req);
411
412         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
413         LASSERT(body);
414         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
415                              oinfo->oi_oa);
416         osc_pack_capa(req, body, oinfo->oi_capa);
417
418         ptlrpc_request_set_replen(req);
419
420         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
421         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
422         sa = ptlrpc_req_async_args(req);
423         sa->sa_oa     = oinfo->oi_oa;
424         sa->sa_upcall = upcall;
425         sa->sa_cookie = cookie;
426         if (rqset == PTLRPCD_SET)
427                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
428         else
429                 ptlrpc_set_add_req(rqset, req);
430
431         RETURN(0);
432 }
433
434 static int osc_sync_interpret(const struct lu_env *env,
435                               struct ptlrpc_request *req,
436                               void *arg, int rc)
437 {
438         struct osc_fsync_args *fa = arg;
439         struct ost_body *body;
440         ENTRY;
441
442         if (rc)
443                 GOTO(out, rc);
444
445         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
446         if (body == NULL) {
447                 CERROR ("can't unpack ost_body\n");
448                 GOTO(out, rc = -EPROTO);
449         }
450
451         *fa->fa_oi->oi_oa = body->oa;
452 out:
453         rc = fa->fa_upcall(fa->fa_cookie, rc);
454         RETURN(rc);
455 }
456
457 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
458                   obd_enqueue_update_f upcall, void *cookie,
459                   struct ptlrpc_request_set *rqset)
460 {
461         struct ptlrpc_request *req;
462         struct ost_body       *body;
463         struct osc_fsync_args *fa;
464         int                    rc;
465         ENTRY;
466
467         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
468         if (req == NULL)
469                 RETURN(-ENOMEM);
470
471         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
472         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
473         if (rc) {
474                 ptlrpc_request_free(req);
475                 RETURN(rc);
476         }
477
478         /* overload the size and blocks fields in the oa with start/end */
479         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
480         LASSERT(body);
481         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
482                              oinfo->oi_oa);
483         osc_pack_capa(req, body, oinfo->oi_capa);
484
485         ptlrpc_request_set_replen(req);
486         req->rq_interpret_reply = osc_sync_interpret;
487
488         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
489         fa = ptlrpc_req_async_args(req);
490         fa->fa_oi = oinfo;
491         fa->fa_upcall = upcall;
492         fa->fa_cookie = cookie;
493
494         if (rqset == PTLRPCD_SET)
495                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
496         else
497                 ptlrpc_set_add_req(rqset, req);
498
499         RETURN (0);
500 }
501
502 /* Find and cancel locally locks matched by @mode in the resource found by
503  * @objid. Found locks are added into @cancel list. Returns the amount of
504  * locks added to @cancels list. */
505 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
506                                    struct list_head *cancels,
507                                    ldlm_mode_t mode, __u64 lock_flags)
508 {
509         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
510         struct ldlm_res_id res_id;
511         struct ldlm_resource *res;
512         int count;
513         ENTRY;
514
515         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
516          * export) but disabled through procfs (flag in NS).
517          *
518          * This distinguishes from a case when ELC is not supported originally,
519          * when we still want to cancel locks in advance and just cancel them
520          * locally, without sending any RPC. */
521         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
522                 RETURN(0);
523
524         ostid_build_res_name(&oa->o_oi, &res_id);
525         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
526         if (IS_ERR(res))
527                 RETURN(0);
528
529         LDLM_RESOURCE_ADDREF(res);
530         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
531                                            lock_flags, 0, NULL);
532         LDLM_RESOURCE_DELREF(res);
533         ldlm_resource_putref(res);
534         RETURN(count);
535 }
536
537 static int osc_destroy_interpret(const struct lu_env *env,
538                                  struct ptlrpc_request *req, void *data,
539                                  int rc)
540 {
541         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
542
543         atomic_dec(&cli->cl_destroy_in_flight);
544         wake_up(&cli->cl_destroy_waitq);
545         return 0;
546 }
547
548 static int osc_can_send_destroy(struct client_obd *cli)
549 {
550         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
551             cli->cl_max_rpcs_in_flight) {
552                 /* The destroy request can be sent */
553                 return 1;
554         }
555         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
556             cli->cl_max_rpcs_in_flight) {
557                 /*
558                  * The counter has been modified between the two atomic
559                  * operations.
560                  */
561                 wake_up(&cli->cl_destroy_waitq);
562         }
563         return 0;
564 }
565
566 /* Destroy requests can be async always on the client, and we don't even really
567  * care about the return code since the client cannot do anything at all about
568  * a destroy failure.
569  * When the MDS is unlinking a filename, it saves the file objects into a
570  * recovery llog, and these object records are cancelled when the OST reports
571  * they were destroyed and sync'd to disk (i.e. transaction committed).
572  * If the client dies, or the OST is down when the object should be destroyed,
573  * the records are not cancelled, and when the OST reconnects to the MDS next,
574  * it will retrieve the llog unlink logs and then sends the log cancellation
575  * cookies to the MDS after committing destroy transactions. */
576 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
577                        struct obdo *oa, struct obd_trans_info *oti)
578 {
579         struct client_obd     *cli = &exp->exp_obd->u.cli;
580         struct ptlrpc_request *req;
581         struct ost_body       *body;
582         struct list_head       cancels = LIST_HEAD_INIT(cancels);
583         int rc, count;
584         ENTRY;
585
586         if (!oa) {
587                 CDEBUG(D_INFO, "oa NULL\n");
588                 RETURN(-EINVAL);
589         }
590
591         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
592                                         LDLM_FL_DISCARD_DATA);
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
595         if (req == NULL) {
596                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
597                 RETURN(-ENOMEM);
598         }
599
600         osc_set_capa_size(req, &RMF_CAPA1, NULL);
601         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
602                                0, &cancels, count);
603         if (rc) {
604                 ptlrpc_request_free(req);
605                 RETURN(rc);
606         }
607
608         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
609         ptlrpc_at_set_req_timeout(req);
610
611         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
612                 oa->o_lcookie = *oti->oti_logcookies;
613         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614         LASSERT(body);
615         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
616
617         ptlrpc_request_set_replen(req);
618
619         /* If osc_destory is for destroying the unlink orphan,
620          * sent from MDT to OST, which should not be blocked here,
621          * because the process might be triggered by ptlrpcd, and
622          * it is not good to block ptlrpcd thread (b=16006)*/
623         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
624                 req->rq_interpret_reply = osc_destroy_interpret;
625                 if (!osc_can_send_destroy(cli)) {
626                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
627                                                           NULL);
628
629                         /*
630                          * Wait until the number of on-going destroy RPCs drops
631                          * under max_rpc_in_flight
632                          */
633                         l_wait_event_exclusive(cli->cl_destroy_waitq,
634                                                osc_can_send_destroy(cli), &lwi);
635                 }
636         }
637
638         /* Do not wait for response */
639         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
640         RETURN(0);
641 }
642
643 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
644                                 long writing_bytes)
645 {
646         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
647
648         LASSERT(!(oa->o_valid & bits));
649
650         oa->o_valid |= bits;
651         spin_lock(&cli->cl_loi_list_lock);
652         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
653         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
654                      cli->cl_dirty_max_pages)) {
655                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
656                        cli->cl_dirty_pages, cli->cl_dirty_transit,
657                        cli->cl_dirty_max_pages);
658                 oa->o_undirty = 0;
659         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
660                             atomic_long_read(&obd_dirty_transit_pages) >
661                             (obd_max_dirty_pages + 1))) {
662                 /* The atomic_read() allowing the atomic_inc() are
663                  * not covered by a lock thus they may safely race and trip
664                  * this CERROR() unless we add in a small fudge factor (+1). */
665                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
666                        cli->cl_import->imp_obd->obd_name,
667                        atomic_long_read(&obd_dirty_pages),
668                        atomic_long_read(&obd_dirty_transit_pages),
669                        obd_max_dirty_pages);
670                 oa->o_undirty = 0;
671         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
672                             0x7fffffff)) {
673                 CERROR("dirty %lu - dirty_max %lu too big???\n",
674                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
675                 oa->o_undirty = 0;
676         } else {
677                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
678                                       PAGE_CACHE_SHIFT) *
679                                      (cli->cl_max_rpcs_in_flight + 1);
680                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
681                                     max_in_flight);
682         }
683         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
684         oa->o_dropped = cli->cl_lost_grant;
685         cli->cl_lost_grant = 0;
686         spin_unlock(&cli->cl_loi_list_lock);
687         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
688                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
689
690 }
691
692 void osc_update_next_shrink(struct client_obd *cli)
693 {
694         cli->cl_next_shrink_grant =
695                 cfs_time_shift(cli->cl_grant_shrink_interval);
696         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
697                cli->cl_next_shrink_grant);
698 }
699
700 static void __osc_update_grant(struct client_obd *cli, u64 grant)
701 {
702         spin_lock(&cli->cl_loi_list_lock);
703         cli->cl_avail_grant += grant;
704         spin_unlock(&cli->cl_loi_list_lock);
705 }
706
707 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
708 {
709         if (body->oa.o_valid & OBD_MD_FLGRANT) {
710                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
711                 __osc_update_grant(cli, body->oa.o_grant);
712         }
713 }
714
715 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
716                               u32 keylen, void *key,
717                               u32 vallen, void *val,
718                               struct ptlrpc_request_set *set);
719
720 static int osc_shrink_grant_interpret(const struct lu_env *env,
721                                       struct ptlrpc_request *req,
722                                       void *aa, int rc)
723 {
724         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
725         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
726         struct ost_body *body;
727
728         if (rc != 0) {
729                 __osc_update_grant(cli, oa->o_grant);
730                 GOTO(out, rc);
731         }
732
733         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
734         LASSERT(body);
735         osc_update_grant(cli, body);
736 out:
737         OBDO_FREE(oa);
738         return rc;
739 }
740
741 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
742 {
743         spin_lock(&cli->cl_loi_list_lock);
744         oa->o_grant = cli->cl_avail_grant / 4;
745         cli->cl_avail_grant -= oa->o_grant;
746         spin_unlock(&cli->cl_loi_list_lock);
747         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
748                 oa->o_valid |= OBD_MD_FLFLAGS;
749                 oa->o_flags = 0;
750         }
751         oa->o_flags |= OBD_FL_SHRINK_GRANT;
752         osc_update_next_shrink(cli);
753 }
754
755 /* Shrink the current grant, either from some large amount to enough for a
756  * full set of in-flight RPCs, or if we have already shrunk to that limit
757  * then to enough for a single RPC.  This avoids keeping more grant than
758  * needed, and avoids shrinking the grant piecemeal. */
759 static int osc_shrink_grant(struct client_obd *cli)
760 {
761         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
762                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
763
764         spin_lock(&cli->cl_loi_list_lock);
765         if (cli->cl_avail_grant <= target_bytes)
766                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
767         spin_unlock(&cli->cl_loi_list_lock);
768
769         return osc_shrink_grant_to_target(cli, target_bytes);
770 }
771
772 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
773 {
774         int                     rc = 0;
775         struct ost_body        *body;
776         ENTRY;
777
778         spin_lock(&cli->cl_loi_list_lock);
779         /* Don't shrink if we are already above or below the desired limit
780          * We don't want to shrink below a single RPC, as that will negatively
781          * impact block allocation and long-term performance. */
782         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
783                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
784
785         if (target_bytes >= cli->cl_avail_grant) {
786                 spin_unlock(&cli->cl_loi_list_lock);
787                 RETURN(0);
788         }
789         spin_unlock(&cli->cl_loi_list_lock);
790
791         OBD_ALLOC_PTR(body);
792         if (!body)
793                 RETURN(-ENOMEM);
794
795         osc_announce_cached(cli, &body->oa, 0);
796
797         spin_lock(&cli->cl_loi_list_lock);
798         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
799         cli->cl_avail_grant = target_bytes;
800         spin_unlock(&cli->cl_loi_list_lock);
801         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
802                 body->oa.o_valid |= OBD_MD_FLFLAGS;
803                 body->oa.o_flags = 0;
804         }
805         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
806         osc_update_next_shrink(cli);
807
808         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
809                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
810                                 sizeof(*body), body, NULL);
811         if (rc != 0)
812                 __osc_update_grant(cli, body->oa.o_grant);
813         OBD_FREE_PTR(body);
814         RETURN(rc);
815 }
816
817 static int osc_should_shrink_grant(struct client_obd *client)
818 {
819         cfs_time_t time = cfs_time_current();
820         cfs_time_t next_shrink = client->cl_next_shrink_grant;
821
822         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
823              OBD_CONNECT_GRANT_SHRINK) == 0)
824                 return 0;
825
826         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
827                 /* Get the current RPC size directly, instead of going via:
828                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
829                  * Keep comment here so that it can be found by searching. */
830                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
831
832                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
833                     client->cl_avail_grant > brw_size)
834                         return 1;
835                 else
836                         osc_update_next_shrink(client);
837         }
838         return 0;
839 }
840
841 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
842 {
843         struct client_obd *client;
844
845         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
846                 if (osc_should_shrink_grant(client))
847                         osc_shrink_grant(client);
848         }
849         return 0;
850 }
851
852 static int osc_add_shrink_grant(struct client_obd *client)
853 {
854         int rc;
855
856         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
857                                        TIMEOUT_GRANT,
858                                        osc_grant_shrink_grant_cb, NULL,
859                                        &client->cl_grant_shrink_list);
860         if (rc) {
861                 CERROR("add grant client %s error %d\n",
862                         client->cl_import->imp_obd->obd_name, rc);
863                 return rc;
864         }
865         CDEBUG(D_CACHE, "add grant client %s \n",
866                client->cl_import->imp_obd->obd_name);
867         osc_update_next_shrink(client);
868         return 0;
869 }
870
871 static int osc_del_shrink_grant(struct client_obd *client)
872 {
873         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
874                                          TIMEOUT_GRANT);
875 }
876
877 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
878 {
879         /*
880          * ocd_grant is the total grant amount we're expect to hold: if we've
881          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
882          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
883          * dirty.
884          *
885          * race is tolerable here: if we're evicted, but imp_state already
886          * left EVICTED state, then cl_dirty_pages must be 0 already.
887          */
888         spin_lock(&cli->cl_loi_list_lock);
889         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
890                 cli->cl_avail_grant = ocd->ocd_grant;
891         else
892                 cli->cl_avail_grant = ocd->ocd_grant -
893                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
894
895         if (cli->cl_avail_grant < 0) {
896                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
897                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
898                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
899                 /* workaround for servers which do not have the patch from
900                  * LU-2679 */
901                 cli->cl_avail_grant = ocd->ocd_grant;
902         }
903
904         /* determine the appropriate chunk size used by osc_extent. */
905         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
906         spin_unlock(&cli->cl_loi_list_lock);
907
908         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
909                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
910                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
911
912         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
913             list_empty(&cli->cl_grant_shrink_list))
914                 osc_add_shrink_grant(cli);
915 }
916
917 /* We assume that the reason this OSC got a short read is because it read
918  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
919  * via the LOV, and it _knows_ it's reading inside the file, it's just that
920  * this stripe never got written at or beyond this stripe offset yet. */
921 static void handle_short_read(int nob_read, size_t page_count,
922                               struct brw_page **pga)
923 {
924         char *ptr;
925         int i = 0;
926
927         /* skip bytes read OK */
928         while (nob_read > 0) {
929                 LASSERT (page_count > 0);
930
931                 if (pga[i]->count > nob_read) {
932                         /* EOF inside this page */
933                         ptr = kmap(pga[i]->pg) +
934                                 (pga[i]->off & ~CFS_PAGE_MASK);
935                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
936                         kunmap(pga[i]->pg);
937                         page_count--;
938                         i++;
939                         break;
940                 }
941
942                 nob_read -= pga[i]->count;
943                 page_count--;
944                 i++;
945         }
946
947         /* zero remaining pages */
948         while (page_count-- > 0) {
949                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
950                 memset(ptr, 0, pga[i]->count);
951                 kunmap(pga[i]->pg);
952                 i++;
953         }
954 }
955
956 static int check_write_rcs(struct ptlrpc_request *req,
957                            int requested_nob, int niocount,
958                            size_t page_count, struct brw_page **pga)
959 {
960         int     i;
961         __u32   *remote_rcs;
962
963         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
964                                                   sizeof(*remote_rcs) *
965                                                   niocount);
966         if (remote_rcs == NULL) {
967                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
968                 return(-EPROTO);
969         }
970
971         /* return error if any niobuf was in error */
972         for (i = 0; i < niocount; i++) {
973                 if ((int)remote_rcs[i] < 0)
974                         return(remote_rcs[i]);
975
976                 if (remote_rcs[i] != 0) {
977                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
978                                 i, remote_rcs[i], req);
979                         return(-EPROTO);
980                 }
981         }
982
983         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
984                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
985                        req->rq_bulk->bd_nob_transferred, requested_nob);
986                 return(-EPROTO);
987         }
988
989         return (0);
990 }
991
992 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
993 {
994         if (p1->flag != p2->flag) {
995                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
996                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
997                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
998
999                 /* warn if we try to combine flags that we don't know to be
1000                  * safe to combine */
1001                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1002                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1003                               "report this at https://jira.hpdd.intel.com/\n",
1004                               p1->flag, p2->flag);
1005                 }
1006                 return 0;
1007         }
1008
1009         return (p1->off + p1->count == p2->off);
1010 }
1011
1012 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1013                              struct brw_page **pga, int opc,
1014                              cksum_type_t cksum_type)
1015 {
1016         u32                             cksum;
1017         int                             i = 0;
1018         struct cfs_crypto_hash_desc     *hdesc;
1019         unsigned int                    bufsize;
1020         int                             err;
1021         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1022
1023         LASSERT(pg_count > 0);
1024
1025         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1026         if (IS_ERR(hdesc)) {
1027                 CERROR("Unable to initialize checksum hash %s\n",
1028                        cfs_crypto_hash_name(cfs_alg));
1029                 return PTR_ERR(hdesc);
1030         }
1031
1032         while (nob > 0 && pg_count > 0) {
1033                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1034
1035                 /* corrupt the data before we compute the checksum, to
1036                  * simulate an OST->client data error */
1037                 if (i == 0 && opc == OST_READ &&
1038                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1039                         unsigned char *ptr = kmap(pga[i]->pg);
1040                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1041
1042                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1043                         kunmap(pga[i]->pg);
1044                 }
1045                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1046                                             pga[i]->off & ~CFS_PAGE_MASK,
1047                                             count);
1048                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1049                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1050
1051                 nob -= pga[i]->count;
1052                 pg_count--;
1053                 i++;
1054         }
1055
1056         bufsize = sizeof(cksum);
1057         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1058
1059         /* For sending we only compute the wrong checksum instead
1060          * of corrupting the data so it is still correct on a redo */
1061         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1062                 cksum++;
1063
1064         return cksum;
1065 }
1066
/*
 * Allocate and fill in an OST_WRITE or OST_READ bulk request for the pages
 * in pga[], without sending it.
 *
 * cmd:        OST_WRITE is built when OBD_BRW_WRITE is set, OST_READ otherwise
 * cli:        supplies the import, the grant state and the checksum settings
 * oa:         copied into the request body and remembered as aa_oa; for
 *             writes its o_valid/o_flags/o_cksum are updated here as well
 * lsm:        not referenced in this function
 * page_count: number of entries in pga[], asserted to be > 0
 * pga:        pages to transfer; asserted to be in strictly increasing offset
 *             order with no gap in the middle of the array
 * reqp:       receives the new request, on success only
 * ocapa:      capability passed to osc_set_capa_size() and osc_pack_capa()
 * reserve:    when non-zero and ocapa is set, aa_ocapa gets capa_get(ocapa)
 * resend:     when non-zero, OBD_FL_RECOV_RESEND is set in the body flags
 *
 * Returns 0 on success.  Failures return -ENOMEM or -EINVAL from the two
 * fault-injection points, -ENOMEM when the request or the bulk descriptor
 * cannot be allocated, or the error from ptlrpc_request_pack().
 */
1067 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1068                                 struct lov_stripe_md *lsm, u32 page_count,
1069                                 struct brw_page **pga,
1070                                 struct ptlrpc_request **reqp,
1071                                 struct obd_capa *ocapa, int reserve,
1072                                 int resend)
1073 {
1074         struct ptlrpc_request   *req;
1075         struct ptlrpc_bulk_desc *desc;
1076         struct ost_body         *body;
1077         struct obd_ioobj        *ioobj;
1078         struct niobuf_remote    *niobuf;
1079         int niocount, i, requested_nob, opc, rc;
1080         struct osc_brw_async_args *aa;
1081         struct req_capsule      *pill;
1082         struct brw_page *pg_prev;
1083
1084         ENTRY;
1085         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1086                 RETURN(-ENOMEM); /* Recoverable */
1087         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1088                 RETURN(-EINVAL); /* Fatal */
1089
             /* writes are allocated from the import's request pool
              * (imp_rq_pool), reads use a plain allocation */
1090         if ((cmd & OBD_BRW_WRITE) != 0) {
1091                 opc = OST_WRITE;
1092                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1093                                                 cli->cl_import->imp_rq_pool,
1094                                                 &RQF_OST_BRW_WRITE);
1095         } else {
1096                 opc = OST_READ;
1097                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1098         }
1099         if (req == NULL)
1100                 RETURN(-ENOMEM);
1101
             /* count the remote niobufs needed: one for the first page plus
              * one for every page that cannot be merged with its predecessor */
1102         for (niocount = i = 1; i < page_count; i++) {
1103                 if (!can_merge_pages(pga[i - 1], pga[i]))
1104                         niocount++;
1105         }
1106
             /* the variable-size request fields must be sized before packing */
1107         pill = &req->rq_pill;
1108         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1109                              sizeof(*ioobj));
1110         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1111                              niocount * sizeof(*niobuf));
1112         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1113
1114         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1115         if (rc) {
1116                 ptlrpc_request_free(req);
1117                 RETURN(rc);
1118         }
1119         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1120         ptlrpc_at_set_req_timeout(req);
1121         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1122          * retry logic */
1123         req->rq_no_retry_einprogress = 1;
1124
             /* bulk descriptor with room for page_count pages; the client is
              * the GET source for writes and the PUT sink for reads */
1125         desc = ptlrpc_prep_bulk_imp(req, page_count,
1126                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1127                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1128                 OST_BULK_PORTAL);
1129
1130         if (desc == NULL)
1131                 GOTO(out, rc = -ENOMEM);
1132         /* NB request now owns desc and will free it when it gets freed */
1133
1134         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1135         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1136         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1137         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1138
1139         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1140
1141         obdo_to_ioobj(oa, ioobj);
1142         ioobj->ioo_bufcnt = niocount;
1143         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1144          * that might be send for this request.  The actual number is decided
1145          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1146          * "max - 1" for old client compatibility sending "0", and also so the
1147          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1148         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1149         osc_pack_capa(req, body, ocapa);
1150         LASSERT(page_count > 0);
1151         pg_prev = pga[0];
             /* walk the pages: add each one to the bulk descriptor and either
              * fold it into the current remote niobuf or start a new one */
1152         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1153                 struct brw_page *pg = pga[i];
1154                 int poff = pg->off & ~CFS_PAGE_MASK;
1155
1156                 LASSERT(pg->count > 0);
1157                 /* make sure there is no gap in the middle of page array */
1158                 LASSERTF(page_count == 1 ||
1159                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1160                           ergo(i > 0 && i < page_count - 1,
1161                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1162                           ergo(i == page_count - 1, poff == 0)),
1163                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1164                          i, page_count, pg, pg->off, pg->count);
                     /* offsets must be strictly increasing */
1165                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1166                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1167                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1168                          i, page_count,
1169                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1170                          pg_prev->pg, page_private(pg_prev->pg),
1171                          pg_prev->pg->index, pg_prev->off);
                     /* OBD_BRW_SRVLOCK must be the same on every page */
1172                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1173                         (pg->flag & OBD_BRW_SRVLOCK));
1174
1175                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1176                 requested_nob += pg->count;
1177
1178                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                             /* step back onto the previous niobuf and grow
                              * it; the loop's niobuf++ moves forward again */
1179                         niobuf--;
1180                         niobuf->rnb_len += pg->count;
1181                 } else {
1182                         niobuf->rnb_offset = pg->off;
1183                         niobuf->rnb_len    = pg->count;
1184                         niobuf->rnb_flags  = pg->flag;
1185                 }
1186                 pg_prev = pg;
1187         }
1188
             /* niobuf must now sit exactly niocount entries past the start
              * of the array, i.e. the count computed up front was right */
1189         LASSERTF((void *)(niobuf - niocount) ==
1190                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1191                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1192                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1193
             /* the last argument is the byte count being written, 0 on reads */
1194         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1195         if (resend) {
                     /* o_flags only carries meaning once OBD_MD_FLFLAGS is
                      * set, so start from 0 when it was not valid before */
1196                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1197                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1198                         body->oa.o_flags = 0;
1199                 }
1200                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1201         }
1202
1203         if (osc_should_shrink_grant(cli))
1204                 osc_shrink_grant_local(cli, &body->oa);
1205
1206         /* size[REQ_REC_OFF] still sizeof (*body) */
1207         if (opc == OST_WRITE) {
                     /* checksum here only when enabled on the client and the
                      * request's security flavor has no bulk protection */
1208                 if (cli->cl_checksum &&
1209                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1210                         /* store cl_cksum_type in a local variable since
1211                          * it can be changed via lprocfs */
1212                         cksum_type_t cksum_type = cli->cl_cksum_type;
1213
1214                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1215                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1216                                 body->oa.o_flags = 0;
1217                         }
1218                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1219                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1220                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1221                                                              page_count, pga,
1222                                                              OST_WRITE,
1223                                                              cksum_type);
1224                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1225                                body->oa.o_cksum);
1226                         /* save this in 'oa', too, for later checking */
1227                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1228                         oa->o_flags |= cksum_type_pack(cksum_type);
1229                 } else {
1230                         /* clear out the checksum flag, in case this is a
1231                          * resend but cl_checksum is no longer set. b=11238 */
1232                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1233                 }
1234                 oa->o_cksum = body->oa.o_cksum;
1235                 /* 1 RC per niobuf */
1236                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1237                                      sizeof(__u32) * niocount);
1238         } else {
                     /* reads only set the checksum type and valid bits; no
                      * checksum is computed in this function for them */
1239                 if (cli->cl_checksum &&
1240                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1241                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1242                                 body->oa.o_flags = 0;
1243                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1244                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1245                 }
1246         }
1247         ptlrpc_request_set_replen(req);
1248
             /* record the transfer parameters in the request's async args;
              * the compile-time check ensures they fit */
1249         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1250         aa = ptlrpc_req_async_args(req);
1251         aa->aa_oa = oa;
1252         aa->aa_requested_nob = requested_nob;
1253         aa->aa_nio_count = niocount;
1254         aa->aa_page_count = page_count;
1255         aa->aa_resends = 0;
1256         aa->aa_ppga = pga;
1257         aa->aa_cli = cli;
1258         INIT_LIST_HEAD(&aa->aa_oaps);
1259         if (ocapa && reserve)
1260                 aa->aa_ocapa = capa_get(ocapa);
1261
1262         *reqp = req;
             /* niobuf was advanced by the loop above; fetch the array start
              * again for the trace message */
1263         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1264         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1265                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1266                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1267         RETURN(0);
1268
             /* only reached when the bulk descriptor could not be allocated */
1269  out:
1270         ptlrpc_req_finished(req);
1271         RETURN(rc);
1272 }
1273
1274 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1275                                 __u32 client_cksum, __u32 server_cksum, int nob,
1276                                 size_t page_count, struct brw_page **pga,
1277                                 cksum_type_t client_cksum_type)
1278 {
1279         __u32 new_cksum;
1280         char *msg;
1281         cksum_type_t cksum_type;
1282
1283         if (server_cksum == client_cksum) {
1284                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1285                 return 0;
1286         }
1287
1288         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1289                                        oa->o_flags : 0);
1290         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1291                                       cksum_type);
1292
1293         if (cksum_type != client_cksum_type)
1294                 msg = "the server did not use the checksum type specified in "
1295                       "the original request - likely a protocol problem";
1296         else if (new_cksum == server_cksum)
1297                 msg = "changed on the client after we checksummed it - "
1298                       "likely false positive due to mmap IO (bug 11742)";
1299         else if (new_cksum == client_cksum)
1300                 msg = "changed in transit before arrival at OST";
1301         else
1302                 msg = "changed in transit AND doesn't match the original - "
1303                       "likely false positive due to mmap IO (bug 11742)";
1304
1305         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1306                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1307                            msg, libcfs_nid2str(peer->nid),
1308                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1309                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1310                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1311                            POSTID(&oa->o_oi), pga[0]->off,
1312                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1313         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1314                "client csum now %x\n", client_cksum, client_cksum_type,
1315                server_cksum, cksum_type, new_cksum);
1316         return 1;
1317 }
1318
1319 /* Note rc enters this function as number of bytes transferred */
1320 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1321 {
1322         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1323         const lnet_process_id_t *peer =
1324                         &req->rq_import->imp_connection->c_peer;
1325         struct client_obd *cli = aa->aa_cli;
1326         struct ost_body *body;
1327         u32 client_cksum = 0;
1328         ENTRY;
1329
1330         if (rc < 0 && rc != -EDQUOT) {
1331                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1332                 RETURN(rc);
1333         }
1334
1335         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1337         if (body == NULL) {
1338                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1339                 RETURN(-EPROTO);
1340         }
1341
1342         /* set/clear over quota flag for a uid/gid */
1343         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1344             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1345                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1346
1347                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1348                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1349                        body->oa.o_flags);
1350                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1351         }
1352
1353         osc_update_grant(cli, body);
1354
1355         if (rc < 0)
1356                 RETURN(rc);
1357
1358         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1359                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1360
1361         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1362                 if (rc > 0) {
1363                         CERROR("Unexpected +ve rc %d\n", rc);
1364                         RETURN(-EPROTO);
1365                 }
1366                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1367
1368                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1369                         RETURN(-EAGAIN);
1370
1371                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1372                     check_write_checksum(&body->oa, peer, client_cksum,
1373                                          body->oa.o_cksum, aa->aa_requested_nob,
1374                                          aa->aa_page_count, aa->aa_ppga,
1375                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1376                         RETURN(-EAGAIN);
1377
1378                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1379                                      aa->aa_page_count, aa->aa_ppga);
1380                 GOTO(out, rc);
1381         }
1382
1383         /* The rest of this function executes only for OST_READs */
1384
1385         /* if unwrap_bulk failed, return -EAGAIN to retry */
1386         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1387         if (rc < 0)
1388                 GOTO(out, rc = -EAGAIN);
1389
1390         if (rc > aa->aa_requested_nob) {
1391                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1392                        aa->aa_requested_nob);
1393                 RETURN(-EPROTO);
1394         }
1395
1396         if (rc != req->rq_bulk->bd_nob_transferred) {
1397                 CERROR ("Unexpected rc %d (%d transferred)\n",
1398                         rc, req->rq_bulk->bd_nob_transferred);
1399                 return (-EPROTO);
1400         }
1401
1402         if (rc < aa->aa_requested_nob)
1403                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1404
1405         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1406                 static int cksum_counter;
1407                 u32        server_cksum = body->oa.o_cksum;
1408                 char      *via = "";
1409                 char      *router = "";
1410                 cksum_type_t cksum_type;
1411
1412                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1413                                                body->oa.o_flags : 0);
1414                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1415                                                  aa->aa_ppga, OST_READ,
1416                                                  cksum_type);
1417
1418                 if (peer->nid != req->rq_bulk->bd_sender) {
1419                         via = " via ";
1420                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1421                 }
1422
1423                 if (server_cksum != client_cksum) {
1424                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1425                                            "%s%s%s inode "DFID" object "DOSTID
1426                                            " extent ["LPU64"-"LPU64"]\n",
1427                                            req->rq_import->imp_obd->obd_name,
1428                                            libcfs_nid2str(peer->nid),
1429                                            via, router,
1430                                            body->oa.o_valid & OBD_MD_FLFID ?
1431                                                 body->oa.o_parent_seq : (__u64)0,
1432                                            body->oa.o_valid & OBD_MD_FLFID ?
1433                                                 body->oa.o_parent_oid : 0,
1434                                            body->oa.o_valid & OBD_MD_FLFID ?
1435                                                 body->oa.o_parent_ver : 0,
1436                                            POSTID(&body->oa.o_oi),
1437                                            aa->aa_ppga[0]->off,
1438                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1439                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1440                                                                         1);
1441                         CERROR("client %x, server %x, cksum_type %x\n",
1442                                client_cksum, server_cksum, cksum_type);
1443                         cksum_counter = 0;
1444                         aa->aa_oa->o_cksum = client_cksum;
1445                         rc = -EAGAIN;
1446                 } else {
1447                         cksum_counter++;
1448                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1449                         rc = 0;
1450                 }
1451         } else if (unlikely(client_cksum)) {
1452                 static int cksum_missed;
1453
1454                 cksum_missed++;
1455                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1456                         CERROR("Checksum %u requested from %s but not sent\n",
1457                                cksum_missed, libcfs_nid2str(peer->nid));
1458         } else {
1459                 rc = 0;
1460         }
1461 out:
1462         if (rc >= 0)
1463                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1464                                      aa->aa_oa, &body->oa);
1465
1466         RETURN(rc);
1467 }
1468
1469 static int osc_brw_redo_request(struct ptlrpc_request *request,
1470                                 struct osc_brw_async_args *aa, int rc)
1471 {
1472         struct ptlrpc_request *new_req;
1473         struct osc_brw_async_args *new_aa;
1474         struct osc_async_page *oap;
1475         ENTRY;
1476
1477         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1478                   "redo for recoverable error %d", rc);
1479
1480         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1481                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1482                                   aa->aa_cli, aa->aa_oa,
1483                                   NULL /* lsm unused by osc currently */,
1484                                   aa->aa_page_count, aa->aa_ppga,
1485                                   &new_req, aa->aa_ocapa, 0, 1);
1486         if (rc)
1487                 RETURN(rc);
1488
1489         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1490                 if (oap->oap_request != NULL) {
1491                         LASSERTF(request == oap->oap_request,
1492                                  "request %p != oap_request %p\n",
1493                                  request, oap->oap_request);
1494                         if (oap->oap_interrupted) {
1495                                 ptlrpc_req_finished(new_req);
1496                                 RETURN(-EINTR);
1497                         }
1498                 }
1499         }
1500         /* New request takes over pga and oaps from old request.
1501          * Note that copying a list_head doesn't work, need to move it... */
1502         aa->aa_resends++;
1503         new_req->rq_interpret_reply = request->rq_interpret_reply;
1504         new_req->rq_async_args = request->rq_async_args;
1505         new_req->rq_commit_cb = request->rq_commit_cb;
1506         /* cap resend delay to the current request timeout, this is similar to
1507          * what ptlrpc does (see after_reply()) */
1508         if (aa->aa_resends > new_req->rq_timeout)
1509                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1510         else
1511                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1512         new_req->rq_generation_set = 1;
1513         new_req->rq_import_generation = request->rq_import_generation;
1514
1515         new_aa = ptlrpc_req_async_args(new_req);
1516
1517         INIT_LIST_HEAD(&new_aa->aa_oaps);
1518         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1519         INIT_LIST_HEAD(&new_aa->aa_exts);
1520         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1521         new_aa->aa_resends = aa->aa_resends;
1522
1523         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1524                 if (oap->oap_request) {
1525                         ptlrpc_req_finished(oap->oap_request);
1526                         oap->oap_request = ptlrpc_request_addref(new_req);
1527                 }
1528         }
1529
1530         new_aa->aa_ocapa = aa->aa_ocapa;
1531         aa->aa_ocapa = NULL;
1532
1533         /* XXX: This code will run into problem if we're going to support
1534          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1535          * and wait for all of them to be finished. We should inherit request
1536          * set from old request. */
1537         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1538
1539         DEBUG_REQ(D_INFO, new_req, "new request");
1540         RETURN(0);
1541 }
1542
1543 /*
1544  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1545  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1546  * fine for our small page arrays and doesn't require allocation.  its an
1547  * insertion sort that swaps elements that are strides apart, shrinking the
1548  * stride down until its '1' and the array is sorted.
1549  */
1550 static void sort_brw_pages(struct brw_page **array, int num)
1551 {
1552         int stride, i, j;
1553         struct brw_page *tmp;
1554
1555         if (num == 1)
1556                 return;
1557         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1558                 ;
1559
1560         do {
1561                 stride /= 3;
1562                 for (i = stride ; i < num ; i++) {
1563                         tmp = array[i];
1564                         j = i;
1565                         while (j >= stride && array[j - stride]->off > tmp->off) {
1566                                 array[j] = array[j - stride];
1567                                 j -= stride;
1568                         }
1569                         array[j] = tmp;
1570                 }
1571         } while (stride > 1);
1572 }
1573
1574 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1575 {
1576         LASSERT(ppga != NULL);
1577         OBD_FREE(ppga, sizeof(*ppga) * count);
1578 }
1579
1580 static int brw_interpret(const struct lu_env *env,
1581                          struct ptlrpc_request *req, void *data, int rc)
1582 {
1583         struct osc_brw_async_args *aa = data;
1584         struct osc_extent *ext;
1585         struct osc_extent *tmp;
1586         struct client_obd *cli = aa->aa_cli;
1587         ENTRY;
1588
1589         rc = osc_brw_fini_request(req, rc);
1590         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1591         /* When server return -EINPROGRESS, client should always retry
1592          * regardless of the number of times the bulk was resent already. */
1593         if (osc_recoverable_error(rc)) {
1594                 if (req->rq_import_generation !=
1595                     req->rq_import->imp_generation) {
1596                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1597                                ""DOSTID", rc = %d.\n",
1598                                req->rq_import->imp_obd->obd_name,
1599                                POSTID(&aa->aa_oa->o_oi), rc);
1600                 } else if (rc == -EINPROGRESS ||
1601                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1602                         rc = osc_brw_redo_request(req, aa, rc);
1603                 } else {
1604                         CERROR("%s: too many resent retries for object: "
1605                                ""LPU64":"LPU64", rc = %d.\n",
1606                                req->rq_import->imp_obd->obd_name,
1607                                POSTID(&aa->aa_oa->o_oi), rc);
1608                 }
1609
1610                 if (rc == 0)
1611                         RETURN(0);
1612                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1613                         rc = -EIO;
1614         }
1615
1616         if (aa->aa_ocapa) {
1617                 capa_put(aa->aa_ocapa);
1618                 aa->aa_ocapa = NULL;
1619         }
1620
1621         if (rc == 0) {
1622                 struct obdo *oa = aa->aa_oa;
1623                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1624                 unsigned long valid = 0;
1625                 struct cl_object *obj;
1626                 struct osc_async_page *last;
1627
1628                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1629                 obj = osc2cl(last->oap_obj);
1630
1631                 cl_object_attr_lock(obj);
1632                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1633                         attr->cat_blocks = oa->o_blocks;
1634                         valid |= CAT_BLOCKS;
1635                 }
1636                 if (oa->o_valid & OBD_MD_FLMTIME) {
1637                         attr->cat_mtime = oa->o_mtime;
1638                         valid |= CAT_MTIME;
1639                 }
1640                 if (oa->o_valid & OBD_MD_FLATIME) {
1641                         attr->cat_atime = oa->o_atime;
1642                         valid |= CAT_ATIME;
1643                 }
1644                 if (oa->o_valid & OBD_MD_FLCTIME) {
1645                         attr->cat_ctime = oa->o_ctime;
1646                         valid |= CAT_CTIME;
1647                 }
1648
1649                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1650                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1651                         loff_t last_off = last->oap_count + last->oap_obj_off +
1652                                 last->oap_page_off;
1653
1654                         /* Change file size if this is an out of quota or
1655                          * direct IO write and it extends the file size */
1656                         if (loi->loi_lvb.lvb_size < last_off) {
1657                                 attr->cat_size = last_off;
1658                                 valid |= CAT_SIZE;
1659                         }
1660                         /* Extend KMS if it's not a lockless write */
1661                         if (loi->loi_kms < last_off &&
1662                             oap2osc_page(last)->ops_srvlock == 0) {
1663                                 attr->cat_kms = last_off;
1664                                 valid |= CAT_KMS;
1665                         }
1666                 }
1667
1668                 if (valid != 0)
1669                         cl_object_attr_update(env, obj, attr, valid);
1670                 cl_object_attr_unlock(obj);
1671         }
1672         OBDO_FREE(aa->aa_oa);
1673
1674         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1675                 osc_inc_unstable_pages(req);
1676
1677         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1678                 list_del_init(&ext->oe_link);
1679                 osc_extent_finish(env, ext, 1, rc);
1680         }
1681         LASSERT(list_empty(&aa->aa_exts));
1682         LASSERT(list_empty(&aa->aa_oaps));
1683
1684         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1685                           req->rq_bulk->bd_nob_transferred);
1686         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1687         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1688
1689         spin_lock(&cli->cl_loi_list_lock);
1690         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1691          * is called so we know whether to go to sync BRWs or wait for more
1692          * RPCs to complete */
1693         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1694                 cli->cl_w_in_flight--;
1695         else
1696                 cli->cl_r_in_flight--;
1697         osc_wake_cache_waiters(cli);
1698         spin_unlock(&cli->cl_loi_list_lock);
1699
1700         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1701         RETURN(rc);
1702 }
1703
1704 static void brw_commit(struct ptlrpc_request *req)
1705 {
1706         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1707          * this called via the rq_commit_cb, I need to ensure
1708          * osc_dec_unstable_pages is still called. Otherwise unstable
1709          * pages may be leaked. */
1710         spin_lock(&req->rq_lock);
1711         if (likely(req->rq_unstable)) {
1712                 req->rq_unstable = 0;
1713                 spin_unlock(&req->rq_lock);
1714
1715                 osc_dec_unstable_pages(req);
1716         } else {
1717                 req->rq_committed = 1;
1718                 spin_unlock(&req->rq_lock);
1719         }
1720 }
1721
1722 /**
1723  * Build an RPC by the list of extent @ext_list. The caller must ensure
1724  * that the total pages in this list are NOT over max pages per RPC.
1725  * Extents in the list must be in OES_RPC state.
1726  */
1727 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1728                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1729 {
1730         struct ptlrpc_request           *req = NULL;
1731         struct osc_extent               *ext;
1732         struct brw_page                 **pga = NULL;
1733         struct osc_brw_async_args       *aa = NULL;
1734         struct obdo                     *oa = NULL;
1735         struct osc_async_page           *oap;
1736         struct osc_async_page           *tmp;
1737         struct cl_req                   *clerq = NULL;
1738         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1739                                                                       CRT_READ;
1740         struct cl_req_attr              *crattr = NULL;
1741         loff_t                          starting_offset = OBD_OBJECT_EOF;
1742         loff_t                          ending_offset = 0;
1743         int                             mpflag = 0;
1744         int                             mem_tight = 0;
1745         int                             page_count = 0;
1746         bool                            soft_sync = false;
1747         int                             i;
1748         int                             rc;
1749         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1750         struct ost_body                 *body;
1751         ENTRY;
1752         LASSERT(!list_empty(ext_list));
1753
1754         /* add pages into rpc_list to build BRW rpc */
1755         list_for_each_entry(ext, ext_list, oe_link) {
1756                 LASSERT(ext->oe_state == OES_RPC);
1757                 mem_tight |= ext->oe_memalloc;
1758                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1759                         ++page_count;
1760                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1761                         if (starting_offset == OBD_OBJECT_EOF ||
1762                             starting_offset > oap->oap_obj_off)
1763                                 starting_offset = oap->oap_obj_off;
1764                         else
1765                                 LASSERT(oap->oap_page_off == 0);
1766                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1767                                 ending_offset = oap->oap_obj_off +
1768                                                 oap->oap_count;
1769                         else
1770                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1771                                         PAGE_CACHE_SIZE);
1772                 }
1773         }
1774
1775         soft_sync = osc_over_unstable_soft_limit(cli);
1776         if (mem_tight)
1777                 mpflag = cfs_memory_pressure_get_and_set();
1778
1779         OBD_ALLOC(crattr, sizeof(*crattr));
1780         if (crattr == NULL)
1781                 GOTO(out, rc = -ENOMEM);
1782
1783         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1784         if (pga == NULL)
1785                 GOTO(out, rc = -ENOMEM);
1786
1787         OBDO_ALLOC(oa);
1788         if (oa == NULL)
1789                 GOTO(out, rc = -ENOMEM);
1790
1791         i = 0;
1792         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1793                 struct cl_page *page = oap2cl_page(oap);
1794                 if (clerq == NULL) {
1795                         clerq = cl_req_alloc(env, page, crt,
1796                                              1 /* only 1-object rpcs for now */);
1797                         if (IS_ERR(clerq))
1798                                 GOTO(out, rc = PTR_ERR(clerq));
1799                 }
1800                 if (mem_tight)
1801                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1802                 if (soft_sync)
1803                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1804                 pga[i] = &oap->oap_brw_page;
1805                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1806                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1807                        pga[i]->pg, page_index(oap->oap_page), oap,
1808                        pga[i]->flag);
1809                 i++;
1810                 cl_req_page_add(env, clerq, page);
1811         }
1812
1813         /* always get the data for the obdo for the rpc */
1814         LASSERT(clerq != NULL);
1815         crattr->cra_oa = oa;
1816         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1817
1818         rc = cl_req_prep(env, clerq);
1819         if (rc != 0) {
1820                 CERROR("cl_req_prep failed: %d\n", rc);
1821                 GOTO(out, rc);
1822         }
1823
1824         sort_brw_pages(pga, page_count);
1825         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1826                         pga, &req, crattr->cra_capa, 1, 0);
1827         if (rc != 0) {
1828                 CERROR("prep_req failed: %d\n", rc);
1829                 GOTO(out, rc);
1830         }
1831
1832         req->rq_commit_cb = brw_commit;
1833         req->rq_interpret_reply = brw_interpret;
1834
1835         if (mem_tight != 0)
1836                 req->rq_memalloc = 1;
1837
1838         /* Need to update the timestamps after the request is built in case
1839          * we race with setattr (locally or in queue at OST).  If OST gets
1840          * later setattr before earlier BRW (as determined by the request xid),
1841          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1842          * way to do this in a single call.  bug 10150 */
1843         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1844         crattr->cra_oa = &body->oa;
1845         cl_req_attr_set(env, clerq, crattr,
1846                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1847
1848         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1849
1850         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1851         aa = ptlrpc_req_async_args(req);
1852         INIT_LIST_HEAD(&aa->aa_oaps);
1853         list_splice_init(&rpc_list, &aa->aa_oaps);
1854         INIT_LIST_HEAD(&aa->aa_exts);
1855         list_splice_init(ext_list, &aa->aa_exts);
1856         aa->aa_clerq = clerq;
1857
1858         /* queued sync pages can be torn down while the pages
1859          * were between the pending list and the rpc */
1860         tmp = NULL;
1861         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1862                 /* only one oap gets a request reference */
1863                 if (tmp == NULL)
1864                         tmp = oap;
1865                 if (oap->oap_interrupted && !req->rq_intr) {
1866                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1867                                         oap, req);
1868                         ptlrpc_mark_interrupted(req);
1869                 }
1870         }
1871         if (tmp != NULL)
1872                 tmp->oap_request = ptlrpc_request_addref(req);
1873
1874         spin_lock(&cli->cl_loi_list_lock);
1875         starting_offset >>= PAGE_CACHE_SHIFT;
1876         if (cmd == OBD_BRW_READ) {
1877                 cli->cl_r_in_flight++;
1878                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1879                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1880                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1881                                       starting_offset + 1);
1882         } else {
1883                 cli->cl_w_in_flight++;
1884                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1885                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1886                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1887                                       starting_offset + 1);
1888         }
1889         spin_unlock(&cli->cl_loi_list_lock);
1890
1891         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1892                   page_count, aa, cli->cl_r_in_flight,
1893                   cli->cl_w_in_flight);
1894
1895         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1896          * see which CPU/NUMA node the majority of pages were allocated
1897          * on, and try to assign the async RPC to the CPU core
1898          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1899          *
1900          * But on the other hand, we expect that multiple ptlrpcd
1901          * threads and the initial write sponsor can run in parallel,
1902          * especially when data checksum is enabled, which is CPU-bound
1903          * operation and single ptlrpcd thread cannot process in time.
1904          * So more ptlrpcd threads sharing BRW load
1905          * (with PDL_POLICY_ROUND) seems better.
1906          */
1907         ptlrpcd_add_req(req, pol, -1);
1908         rc = 0;
1909         EXIT;
1910
1911 out:
1912         if (mem_tight != 0)
1913                 cfs_memory_pressure_restore(mpflag);
1914
1915         if (crattr != NULL) {
1916                 capa_put(crattr->cra_capa);
1917                 OBD_FREE(crattr, sizeof(*crattr));
1918         }
1919
1920         if (rc != 0) {
1921                 LASSERT(req == NULL);
1922
1923                 if (oa)
1924                         OBDO_FREE(oa);
1925                 if (pga)
1926                         OBD_FREE(pga, sizeof(*pga) * page_count);
1927                 /* this should happen rarely and is pretty bad, it makes the
1928                  * pending list not follow the dirty order */
1929                 while (!list_empty(ext_list)) {
1930                         ext = list_entry(ext_list->next, struct osc_extent,
1931                                          oe_link);
1932                         list_del_init(&ext->oe_link);
1933                         osc_extent_finish(env, ext, 0, rc);
1934                 }
1935                 if (clerq && !IS_ERR(clerq))
1936                         cl_req_completion(env, clerq, rc);
1937         }
1938         RETURN(rc);
1939 }
1940
1941 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1942                                         struct ldlm_enqueue_info *einfo)
1943 {
1944         void *data = einfo->ei_cbdata;
1945         int set = 0;
1946
1947         LASSERT(lock != NULL);
1948         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1949         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1950         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1951         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1952
1953         lock_res_and_lock(lock);
1954
1955         if (lock->l_ast_data == NULL)
1956                 lock->l_ast_data = data;
1957         if (lock->l_ast_data == data)
1958                 set = 1;
1959
1960         unlock_res_and_lock(lock);
1961
1962         return set;
1963 }
1964
1965 static int osc_set_data_with_check(struct lustre_handle *lockh,
1966                                    struct ldlm_enqueue_info *einfo)
1967 {
1968         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1969         int set = 0;
1970
1971         if (lock != NULL) {
1972                 set = osc_set_lock_data_with_check(lock, einfo);
1973                 LDLM_LOCK_PUT(lock);
1974         } else
1975                 CERROR("lockh %p, data %p - client evicted?\n",
1976                        lockh, einfo->ei_cbdata);
1977         return set;
1978 }
1979
1980 static int osc_enqueue_fini(struct ptlrpc_request *req,
1981                             osc_enqueue_upcall_f upcall, void *cookie,
1982                             struct lustre_handle *lockh, ldlm_mode_t mode,
1983                             __u64 *flags, int agl, int errcode)
1984 {
1985         bool intent = *flags & LDLM_FL_HAS_INTENT;
1986         int rc;
1987         ENTRY;
1988
1989         /* The request was created before ldlm_cli_enqueue call. */
1990         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1991                 struct ldlm_reply *rep;
1992
1993                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1994                 LASSERT(rep != NULL);
1995
1996                 rep->lock_policy_res1 =
1997                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1998                 if (rep->lock_policy_res1)
1999                         errcode = rep->lock_policy_res1;
2000                 if (!agl)
2001                         *flags |= LDLM_FL_LVB_READY;
2002         } else if (errcode == ELDLM_OK) {
2003                 *flags |= LDLM_FL_LVB_READY;
2004         }
2005
2006         /* Call the update callback. */
2007         rc = (*upcall)(cookie, lockh, errcode);
2008
2009         /* release the reference taken in ldlm_cli_enqueue() */
2010         if (errcode == ELDLM_LOCK_MATCHED)
2011                 errcode = ELDLM_OK;
2012         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2013                 ldlm_lock_decref(lockh, mode);
2014
2015         RETURN(rc);
2016 }
2017
2018 static int osc_enqueue_interpret(const struct lu_env *env,
2019                                  struct ptlrpc_request *req,
2020                                  struct osc_enqueue_args *aa, int rc)
2021 {
2022         struct ldlm_lock *lock;
2023         struct lustre_handle *lockh = &aa->oa_lockh;
2024         ldlm_mode_t mode = aa->oa_mode;
2025         struct ost_lvb *lvb = aa->oa_lvb;
2026         __u32 lvb_len = sizeof(*lvb);
2027         __u64 flags = 0;
2028
2029         ENTRY;
2030
2031         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2032          * be valid. */
2033         lock = ldlm_handle2lock(lockh);
2034         LASSERTF(lock != NULL,
2035                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2036                  lockh->cookie, req, aa);
2037
2038         /* Take an additional reference so that a blocking AST that
2039          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2040          * to arrive after an upcall has been executed by
2041          * osc_enqueue_fini(). */
2042         ldlm_lock_addref(lockh, mode);
2043
2044         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2045         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2046
2047         /* Let CP AST to grant the lock first. */
2048         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2049
2050         if (aa->oa_agl) {
2051                 LASSERT(aa->oa_lvb == NULL);
2052                 LASSERT(aa->oa_flags == NULL);
2053                 aa->oa_flags = &flags;
2054         }
2055
2056         /* Complete obtaining the lock procedure. */
2057         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2058                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2059                                    lockh, rc);
2060         /* Complete osc stuff. */
2061         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2062                               aa->oa_flags, aa->oa_agl, rc);
2063
2064         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2065
2066         ldlm_lock_decref(lockh, mode);
2067         LDLM_LOCK_PUT(lock);
2068         RETURN(rc);
2069 }
2070
2071 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2072
2073 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2074  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2075  * other synchronous requests, however keeping some locks and trying to obtain
2076  * others may take a considerable amount of time in a case of ost failure; and
2077  * when other sync requests do not get released lock from a client, the client
2078  * is evicted from the cluster -- such scenarious make the life difficult, so
2079  * release locks just after they are obtained. */
2080 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2081                      __u64 *flags, ldlm_policy_data_t *policy,
2082                      struct ost_lvb *lvb, int kms_valid,
2083                      osc_enqueue_upcall_f upcall, void *cookie,
2084                      struct ldlm_enqueue_info *einfo,
2085                      struct ptlrpc_request_set *rqset, int async, int agl)
2086 {
2087         struct obd_device *obd = exp->exp_obd;
2088         struct lustre_handle lockh = { 0 };
2089         struct ptlrpc_request *req = NULL;
2090         int intent = *flags & LDLM_FL_HAS_INTENT;
2091         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2092         ldlm_mode_t mode;
2093         int rc;
2094         ENTRY;
2095
2096         /* Filesystem lock extents are extended to page boundaries so that
2097          * dealing with the page cache is a little smoother.  */
2098         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2099         policy->l_extent.end |= ~CFS_PAGE_MASK;
2100
2101         /*
2102          * kms is not valid when either object is completely fresh (so that no
2103          * locks are cached), or object was evicted. In the latter case cached
2104          * lock cannot be used, because it would prime inode state with
2105          * potentially stale LVB.
2106          */
2107         if (!kms_valid)
2108                 goto no_match;
2109
2110         /* Next, search for already existing extent locks that will cover us */
2111         /* If we're trying to read, we also search for an existing PW lock.  The
2112          * VFS and page cache already protect us locally, so lots of readers/
2113          * writers can share a single PW lock.
2114          *
2115          * There are problems with conversion deadlocks, so instead of
2116          * converting a read lock to a write lock, we'll just enqueue a new
2117          * one.
2118          *
2119          * At some point we should cancel the read lock instead of making them
2120          * send us a blocking callback, but there are problems with canceling
2121          * locks out from other users right now, too. */
2122         mode = einfo->ei_mode;
2123         if (einfo->ei_mode == LCK_PR)
2124                 mode |= LCK_PW;
2125         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2126                                einfo->ei_type, policy, mode, &lockh, 0);
2127         if (mode) {
2128                 struct ldlm_lock *matched;
2129
2130                 if (*flags & LDLM_FL_TEST_LOCK)
2131                         RETURN(ELDLM_OK);
2132
2133                 matched = ldlm_handle2lock(&lockh);
2134                 if (agl) {
2135                         /* AGL enqueues DLM locks speculatively. Therefore if
2136                          * it already exists a DLM lock, it wll just inform the
2137                          * caller to cancel the AGL process for this stripe. */
2138                         ldlm_lock_decref(&lockh, mode);
2139                         LDLM_LOCK_PUT(matched);
2140                         RETURN(-ECANCELED);
2141                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2142                         *flags |= LDLM_FL_LVB_READY;
2143
2144                         /* We already have a lock, and it's referenced. */
2145                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2146
2147                         ldlm_lock_decref(&lockh, mode);
2148                         LDLM_LOCK_PUT(matched);
2149                         RETURN(ELDLM_OK);
2150                 } else {
2151                         ldlm_lock_decref(&lockh, mode);
2152                         LDLM_LOCK_PUT(matched);
2153                 }
2154         }
2155
2156 no_match:
2157         if (*flags & LDLM_FL_TEST_LOCK)
2158                 RETURN(-ENOLCK);
2159
2160         if (intent) {
2161                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2162                                            &RQF_LDLM_ENQUEUE_LVB);
2163                 if (req == NULL)
2164                         RETURN(-ENOMEM);
2165
2166                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2167                 if (rc < 0) {
2168                         ptlrpc_request_free(req);
2169                         RETURN(rc);
2170                 }
2171
2172                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2173                                      sizeof *lvb);
2174                 ptlrpc_request_set_replen(req);
2175         }
2176
2177         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2178         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2179
2180         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2181                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2182         if (async) {
2183                 if (!rc) {
2184                         struct osc_enqueue_args *aa;
2185                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2186                         aa = ptlrpc_req_async_args(req);
2187                         aa->oa_exp    = exp;
2188                         aa->oa_mode   = einfo->ei_mode;
2189                         aa->oa_type   = einfo->ei_type;
2190                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2191                         aa->oa_upcall = upcall;
2192                         aa->oa_cookie = cookie;
2193                         aa->oa_agl    = !!agl;
2194                         if (!agl) {
2195                                 aa->oa_flags  = flags;
2196                                 aa->oa_lvb    = lvb;
2197                         } else {
2198                                 /* AGL is essentially to enqueue an DLM lock
2199                                  * in advance, so we don't care about the
2200                                  * result of AGL enqueue. */
2201                                 aa->oa_lvb    = NULL;
2202                                 aa->oa_flags  = NULL;
2203                         }
2204
2205                         req->rq_interpret_reply =
2206                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2207                         if (rqset == PTLRPCD_SET)
2208                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2209                         else
2210                                 ptlrpc_set_add_req(rqset, req);
2211                 } else if (intent) {
2212                         ptlrpc_req_finished(req);
2213                 }
2214                 RETURN(rc);
2215         }
2216
2217         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2218                               flags, agl, rc);
2219         if (intent)
2220                 ptlrpc_req_finished(req);
2221
2222         RETURN(rc);
2223 }
2224
2225 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2226                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2227                    __u64 *flags, void *data, struct lustre_handle *lockh,
2228                    int unref)
2229 {
2230         struct obd_device *obd = exp->exp_obd;
2231         __u64 lflags = *flags;
2232         ldlm_mode_t rc;
2233         ENTRY;
2234
2235         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2236                 RETURN(-EIO);
2237
2238         /* Filesystem lock extents are extended to page boundaries so that
2239          * dealing with the page cache is a little smoother */
2240         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2241         policy->l_extent.end |= ~CFS_PAGE_MASK;
2242
2243         /* Next, search for already existing extent locks that will cover us */
2244         /* If we're trying to read, we also search for an existing PW lock.  The
2245          * VFS and page cache already protect us locally, so lots of readers/
2246          * writers can share a single PW lock. */
2247         rc = mode;
2248         if (mode == LCK_PR)
2249                 rc |= LCK_PW;
2250         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2251                              res_id, type, policy, rc, lockh, unref);
2252         if (rc) {
2253                 if (data != NULL) {
2254                         if (!osc_set_data_with_check(lockh, data)) {
2255                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2256                                         ldlm_lock_decref(lockh, rc);
2257                                 RETURN(0);
2258                         }
2259                 }
2260                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2261                         ldlm_lock_addref(lockh, LCK_PR);
2262                         ldlm_lock_decref(lockh, LCK_PW);
2263                 }
2264                 RETURN(rc);
2265         }
2266         RETURN(rc);
2267 }
2268
2269 static int osc_statfs_interpret(const struct lu_env *env,
2270                                 struct ptlrpc_request *req,
2271                                 struct osc_async_args *aa, int rc)
2272 {
2273         struct obd_statfs *msfs;
2274         ENTRY;
2275
2276         if (rc == -EBADR)
2277                 /* The request has in fact never been sent
2278                  * due to issues at a higher level (LOV).
2279                  * Exit immediately since the caller is
2280                  * aware of the problem and takes care
2281                  * of the clean up */
2282                  RETURN(rc);
2283
2284         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2285             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2286                 GOTO(out, rc = 0);
2287
2288         if (rc != 0)
2289                 GOTO(out, rc);
2290
2291         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2292         if (msfs == NULL) {
2293                 GOTO(out, rc = -EPROTO);
2294         }
2295
2296         *aa->aa_oi->oi_osfs = *msfs;
2297 out:
2298         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2299         RETURN(rc);
2300 }
2301
2302 static int osc_statfs_async(struct obd_export *exp,
2303                             struct obd_info *oinfo, __u64 max_age,
2304                             struct ptlrpc_request_set *rqset)
2305 {
2306         struct obd_device     *obd = class_exp2obd(exp);
2307         struct ptlrpc_request *req;
2308         struct osc_async_args *aa;
2309         int                    rc;
2310         ENTRY;
2311
2312         /* We could possibly pass max_age in the request (as an absolute
2313          * timestamp or a "seconds.usec ago") so the target can avoid doing
2314          * extra calls into the filesystem if that isn't necessary (e.g.
2315          * during mount that would help a bit).  Having relative timestamps
2316          * is not so great if request processing is slow, while absolute
2317          * timestamps are not ideal because they need time synchronization. */
2318         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2319         if (req == NULL)
2320                 RETURN(-ENOMEM);
2321
2322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2323         if (rc) {
2324                 ptlrpc_request_free(req);
2325                 RETURN(rc);
2326         }
2327         ptlrpc_request_set_replen(req);
2328         req->rq_request_portal = OST_CREATE_PORTAL;
2329         ptlrpc_at_set_req_timeout(req);
2330
2331         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2332                 /* procfs requests not want stat in wait for avoid deadlock */
2333                 req->rq_no_resend = 1;
2334                 req->rq_no_delay = 1;
2335         }
2336
2337         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2338         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2339         aa = ptlrpc_req_async_args(req);
2340         aa->aa_oi = oinfo;
2341
2342         ptlrpc_set_add_req(rqset, req);
2343         RETURN(0);
2344 }
2345
2346 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2347                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2348 {
2349         struct obd_device     *obd = class_exp2obd(exp);
2350         struct obd_statfs     *msfs;
2351         struct ptlrpc_request *req;
2352         struct obd_import     *imp = NULL;
2353         int rc;
2354         ENTRY;
2355
2356         /*Since the request might also come from lprocfs, so we need
2357          *sync this with client_disconnect_export Bug15684*/
2358         down_read(&obd->u.cli.cl_sem);
2359         if (obd->u.cli.cl_import)
2360                 imp = class_import_get(obd->u.cli.cl_import);
2361         up_read(&obd->u.cli.cl_sem);
2362         if (!imp)
2363                 RETURN(-ENODEV);
2364
2365         /* We could possibly pass max_age in the request (as an absolute
2366          * timestamp or a "seconds.usec ago") so the target can avoid doing
2367          * extra calls into the filesystem if that isn't necessary (e.g.
2368          * during mount that would help a bit).  Having relative timestamps
2369          * is not so great if request processing is slow, while absolute
2370          * timestamps are not ideal because they need time synchronization. */
2371         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2372
2373         class_import_put(imp);
2374
2375         if (req == NULL)
2376                 RETURN(-ENOMEM);
2377
2378         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2379         if (rc) {
2380                 ptlrpc_request_free(req);
2381                 RETURN(rc);
2382         }
2383         ptlrpc_request_set_replen(req);
2384         req->rq_request_portal = OST_CREATE_PORTAL;
2385         ptlrpc_at_set_req_timeout(req);
2386
2387         if (flags & OBD_STATFS_NODELAY) {
2388                 /* procfs requests not want stat in wait for avoid deadlock */
2389                 req->rq_no_resend = 1;
2390                 req->rq_no_delay = 1;
2391         }
2392
2393         rc = ptlrpc_queue_wait(req);
2394         if (rc)
2395                 GOTO(out, rc);
2396
2397         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2398         if (msfs == NULL) {
2399                 GOTO(out, rc = -EPROTO);
2400         }
2401
2402         *osfs = *msfs;
2403
2404         EXIT;
2405  out:
2406         ptlrpc_req_finished(req);
2407         return rc;
2408 }
2409
2410 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2411                          void *karg, void *uarg)
2412 {
2413         struct obd_device *obd = exp->exp_obd;
2414         struct obd_ioctl_data *data = karg;
2415         int err = 0;
2416         ENTRY;
2417
2418         if (!try_module_get(THIS_MODULE)) {
2419                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2420                        module_name(THIS_MODULE));
2421                 return -EINVAL;
2422         }
2423         switch (cmd) {
2424         case OBD_IOC_CLIENT_RECOVER:
2425                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2426                                             data->ioc_inlbuf1, 0);
2427                 if (err > 0)
2428                         err = 0;
2429                 GOTO(out, err);
2430         case IOC_OSC_SET_ACTIVE:
2431                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2432                                                data->ioc_offset);
2433                 GOTO(out, err);
2434         case OBD_IOC_POLL_QUOTACHECK:
2435                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2436                 GOTO(out, err);
2437         case OBD_IOC_PING_TARGET:
2438                 err = ptlrpc_obd_ping(obd);
2439                 GOTO(out, err);
2440         default:
2441                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2442                        cmd, current_comm());
2443                 GOTO(out, err = -ENOTTY);
2444         }
2445 out:
2446         module_put(THIS_MODULE);
2447         return err;
2448 }
2449
2450 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2451                               u32 keylen, void *key,
2452                               u32 vallen, void *val,
2453                               struct ptlrpc_request_set *set)
2454 {
2455         struct ptlrpc_request *req;
2456         struct obd_device     *obd = exp->exp_obd;
2457         struct obd_import     *imp = class_exp2cliimp(exp);
2458         char                  *tmp;
2459         int                    rc;
2460         ENTRY;
2461
2462         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2463
2464         if (KEY_IS(KEY_CHECKSUM)) {
2465                 if (vallen != sizeof(int))
2466                         RETURN(-EINVAL);
2467                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2468                 RETURN(0);
2469         }
2470
2471         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2472                 sptlrpc_conf_client_adapt(obd);
2473                 RETURN(0);
2474         }
2475
2476         if (KEY_IS(KEY_FLUSH_CTX)) {
2477                 sptlrpc_import_flush_my_ctx(imp);
2478                 RETURN(0);
2479         }
2480
2481         if (KEY_IS(KEY_CACHE_SET)) {
2482                 struct client_obd *cli = &obd->u.cli;
2483
2484                 LASSERT(cli->cl_cache == NULL); /* only once */
2485                 cli->cl_cache = (struct cl_client_cache *)val;
2486                 cl_cache_incref(cli->cl_cache);
2487                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2488
2489                 /* add this osc into entity list */
2490                 LASSERT(list_empty(&cli->cl_lru_osc));
2491                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2492                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2493                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2494
2495                 RETURN(0);
2496         }
2497
2498         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2499                 struct client_obd *cli = &obd->u.cli;
2500                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2501                 long target = *(long *)val;
2502
2503                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2504                 *(long *)val -= nr;
2505                 RETURN(0);
2506         }
2507
2508         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2509                 RETURN(-EINVAL);
2510
2511         /* We pass all other commands directly to OST. Since nobody calls osc
2512            methods directly and everybody is supposed to go through LOV, we
2513            assume lov checked invalid values for us.
2514            The only recognised values so far are evict_by_nid and mds_conn.
2515            Even if something bad goes through, we'd get a -EINVAL from OST
2516            anyway. */
2517
2518         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2519                                                 &RQF_OST_SET_GRANT_INFO :
2520                                                 &RQF_OBD_SET_INFO);
2521         if (req == NULL)
2522                 RETURN(-ENOMEM);
2523
2524         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2525                              RCL_CLIENT, keylen);
2526         if (!KEY_IS(KEY_GRANT_SHRINK))
2527                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2528                                      RCL_CLIENT, vallen);
2529         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2530         if (rc) {
2531                 ptlrpc_request_free(req);
2532                 RETURN(rc);
2533         }
2534
2535         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2536         memcpy(tmp, key, keylen);
2537         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2538                                                         &RMF_OST_BODY :
2539                                                         &RMF_SETINFO_VAL);
2540         memcpy(tmp, val, vallen);
2541
2542         if (KEY_IS(KEY_GRANT_SHRINK)) {
2543                 struct osc_grant_args *aa;
2544                 struct obdo *oa;
2545
2546                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2547                 aa = ptlrpc_req_async_args(req);
2548                 OBDO_ALLOC(oa);
2549                 if (!oa) {
2550                         ptlrpc_req_finished(req);
2551                         RETURN(-ENOMEM);
2552                 }
2553                 *oa = ((struct ost_body *)val)->oa;
2554                 aa->aa_oa = oa;
2555                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2556         }
2557
2558         ptlrpc_request_set_replen(req);
2559         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2560                 LASSERT(set != NULL);
2561                 ptlrpc_set_add_req(set, req);
2562                 ptlrpc_check_set(NULL, set);
2563         } else
2564                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2565
2566         RETURN(0);
2567 }
2568
2569 static int osc_reconnect(const struct lu_env *env,
2570                          struct obd_export *exp, struct obd_device *obd,
2571                          struct obd_uuid *cluuid,
2572                          struct obd_connect_data *data,
2573                          void *localdata)
2574 {
2575         struct client_obd *cli = &obd->u.cli;
2576
2577         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2578                 long lost_grant;
2579
2580                 spin_lock(&cli->cl_loi_list_lock);
2581                 data->ocd_grant = (cli->cl_avail_grant +
2582                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2583                                   2 * cli_brw_size(obd);
2584                 lost_grant = cli->cl_lost_grant;
2585                 cli->cl_lost_grant = 0;
2586                 spin_unlock(&cli->cl_loi_list_lock);
2587
2588                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2589                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2590                        data->ocd_version, data->ocd_grant, lost_grant);
2591         }
2592
2593         RETURN(0);
2594 }
2595
2596 static int osc_disconnect(struct obd_export *exp)
2597 {
2598         struct obd_device *obd = class_exp2obd(exp);
2599         int rc;
2600
2601         rc = client_disconnect_export(exp);
2602         /**
2603          * Initially we put del_shrink_grant before disconnect_export, but it
2604          * causes the following problem if setup (connect) and cleanup
2605          * (disconnect) are tangled together.
2606          *      connect p1                     disconnect p2
2607          *   ptlrpc_connect_import
2608          *     ...............               class_manual_cleanup
2609          *                                     osc_disconnect
2610          *                                     del_shrink_grant
2611          *   ptlrpc_connect_interrupt
2612          *     init_grant_shrink
2613          *   add this client to shrink list
2614          *                                      cleanup_osc
2615          * Bang! pinger trigger the shrink.
2616          * So the osc should be disconnected from the shrink list, after we
2617          * are sure the import has been destroyed. BUG18662
2618          */
2619         if (obd->u.cli.cl_import == NULL)
2620                 osc_del_shrink_grant(&obd->u.cli);
2621         return rc;
2622 }
2623
2624 static int osc_import_event(struct obd_device *obd,
2625                             struct obd_import *imp,
2626                             enum obd_import_event event)
2627 {
2628         struct client_obd *cli;
2629         int rc = 0;
2630
2631         ENTRY;
2632         LASSERT(imp->imp_obd == obd);
2633
2634         switch (event) {
2635         case IMP_EVENT_DISCON: {
2636                 cli = &obd->u.cli;
2637                 spin_lock(&cli->cl_loi_list_lock);
2638                 cli->cl_avail_grant = 0;
2639                 cli->cl_lost_grant = 0;
2640                 spin_unlock(&cli->cl_loi_list_lock);
2641                 break;
2642         }
2643         case IMP_EVENT_INACTIVE: {
2644                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2645                 break;
2646         }
2647         case IMP_EVENT_INVALIDATE: {
2648                 struct ldlm_namespace *ns = obd->obd_namespace;
2649                 struct lu_env         *env;
2650                 int                    refcheck;
2651
2652                 env = cl_env_get(&refcheck);
2653                 if (!IS_ERR(env)) {
2654                         /* Reset grants */
2655                         cli = &obd->u.cli;
2656                         /* all pages go to failing rpcs due to the invalid
2657                          * import */
2658                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2659
2660                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2661                         cl_env_put(env, &refcheck);
2662                 } else
2663                         rc = PTR_ERR(env);
2664                 break;
2665         }
2666         case IMP_EVENT_ACTIVE: {
2667                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2668                 break;
2669         }
2670         case IMP_EVENT_OCD: {
2671                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2672
2673                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2674                         osc_init_grant(&obd->u.cli, ocd);
2675
2676                 /* See bug 7198 */
2677                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2678                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2679
2680                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2681                 break;
2682         }
2683         case IMP_EVENT_DEACTIVATE: {
2684                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2685                 break;
2686         }
2687         case IMP_EVENT_ACTIVATE: {
2688                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2689                 break;
2690         }
2691         default:
2692                 CERROR("Unknown import event %d\n", event);
2693                 LBUG();
2694         }
2695         RETURN(rc);
2696 }
2697
2698 /**
2699  * Determine whether the lock can be canceled before replaying the lock
2700  * during recovery, see bug16774 for detailed information.
2701  *
2702  * \retval zero the lock can't be canceled
2703  * \retval other ok to cancel
2704  */
2705 static int osc_cancel_weight(struct ldlm_lock *lock)
2706 {
2707         /*
2708          * Cancel all unused and granted extent lock.
2709          */
2710         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2711             lock->l_granted_mode == lock->l_req_mode &&
2712             osc_ldlm_weigh_ast(lock) == 0)
2713                 RETURN(1);
2714
2715         RETURN(0);
2716 }
2717
2718 static int brw_queue_work(const struct lu_env *env, void *data)
2719 {
2720         struct client_obd *cli = data;
2721
2722         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2723
2724         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2725         RETURN(0);
2726 }
2727
2728 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2729 {
2730         struct client_obd *cli = &obd->u.cli;
2731         struct obd_type   *type;
2732         void              *handler;
2733         int                rc;
2734         ENTRY;
2735
2736         rc = ptlrpcd_addref();
2737         if (rc)
2738                 RETURN(rc);
2739
2740         rc = client_obd_setup(obd, lcfg);
2741         if (rc)
2742                 GOTO(out_ptlrpcd, rc);
2743
2744         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2745         if (IS_ERR(handler))
2746                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2747         cli->cl_writeback_work = handler;
2748
2749         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2750         if (IS_ERR(handler))
2751                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2752         cli->cl_lru_work = handler;
2753
2754         rc = osc_quota_setup(obd);
2755         if (rc)
2756                 GOTO(out_ptlrpcd_work, rc);
2757
2758         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2759
2760 #ifdef CONFIG_PROC_FS
2761         obd->obd_vars = lprocfs_osc_obd_vars;
2762 #endif
2763         /* If this is true then both client (osc) and server (osp) are on the
2764          * same node. The osp layer if loaded first will register the osc proc
2765          * directory. In that case this obd_device will be attached its proc
2766          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2767         type = class_search_type(LUSTRE_OSP_NAME);
2768         if (type && type->typ_procsym) {
2769                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2770                                                        type->typ_procsym,
2771                                                        obd->obd_vars, obd);
2772                 if (IS_ERR(obd->obd_proc_entry)) {
2773                         rc = PTR_ERR(obd->obd_proc_entry);
2774                         CERROR("error %d setting up lprocfs for %s\n", rc,
2775                                obd->obd_name);
2776                         obd->obd_proc_entry = NULL;
2777                 }
2778         } else {
2779                 rc = lprocfs_obd_setup(obd);
2780         }
2781
2782         /* If the basic OSC proc tree construction succeeded then
2783          * lets do the rest. */
2784         if (rc == 0) {
2785                 lproc_osc_attach_seqstat(obd);
2786                 sptlrpc_lprocfs_cliobd_attach(obd);
2787                 ptlrpc_lprocfs_register_obd(obd);
2788         }
2789
2790         /* We need to allocate a few requests more, because
2791          * brw_interpret tries to create new requests before freeing
2792          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2793          * reserved, but I'm afraid that might be too much wasted RAM
2794          * in fact, so 2 is just my guess and still should work. */
2795         cli->cl_import->imp_rq_pool =
2796                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2797                                     OST_MAXREQSIZE,
2798                                     ptlrpc_add_rqs_to_pool);
2799
2800         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2801         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2802         RETURN(0);
2803
2804 out_ptlrpcd_work:
2805         if (cli->cl_writeback_work != NULL) {
2806                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2807                 cli->cl_writeback_work = NULL;
2808         }
2809         if (cli->cl_lru_work != NULL) {
2810                 ptlrpcd_destroy_work(cli->cl_lru_work);
2811                 cli->cl_lru_work = NULL;
2812         }
2813 out_client_setup:
2814         client_obd_cleanup(obd);
2815 out_ptlrpcd:
2816         ptlrpcd_decref();
2817         RETURN(rc);
2818 }
2819
2820 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2821 {
2822         int rc = 0;
2823         ENTRY;
2824
2825         switch (stage) {
2826         case OBD_CLEANUP_EARLY: {
2827                 struct obd_import *imp;
2828                 imp = obd->u.cli.cl_import;
2829                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2830                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2831                 ptlrpc_deactivate_import(imp);
2832                 spin_lock(&imp->imp_lock);
2833                 imp->imp_pingable = 0;
2834                 spin_unlock(&imp->imp_lock);
2835                 break;
2836         }
2837         case OBD_CLEANUP_EXPORTS: {
2838                 struct client_obd *cli = &obd->u.cli;
2839                 /* LU-464
2840                  * for echo client, export may be on zombie list, wait for
2841                  * zombie thread to cull it, because cli.cl_import will be
2842                  * cleared in client_disconnect_export():
2843                  *   class_export_destroy() -> obd_cleanup() ->
2844                  *   echo_device_free() -> echo_client_cleanup() ->
2845                  *   obd_disconnect() -> osc_disconnect() ->
2846                  *   client_disconnect_export()
2847                  */
2848                 obd_zombie_barrier();
2849                 if (cli->cl_writeback_work) {
2850                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2851                         cli->cl_writeback_work = NULL;
2852                 }
2853                 if (cli->cl_lru_work) {
2854                         ptlrpcd_destroy_work(cli->cl_lru_work);
2855                         cli->cl_lru_work = NULL;
2856                 }
2857                 obd_cleanup_client_import(obd);
2858                 ptlrpc_lprocfs_unregister_obd(obd);
2859                 lprocfs_obd_cleanup(obd);
2860                 break;
2861                 }
2862         }
2863         RETURN(rc);
2864 }
2865
2866 int osc_cleanup(struct obd_device *obd)
2867 {
2868         struct client_obd *cli = &obd->u.cli;
2869         int rc;
2870
2871         ENTRY;
2872
2873         /* lru cleanup */
2874         if (cli->cl_cache != NULL) {
2875                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2876                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2877                 list_del_init(&cli->cl_lru_osc);
2878                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2879                 cli->cl_lru_left = NULL;
2880                 cl_cache_decref(cli->cl_cache);
2881                 cli->cl_cache = NULL;
2882         }
2883
2884         /* free memory of osc quota cache */
2885         osc_quota_cleanup(obd);
2886
2887         rc = client_obd_cleanup(obd);
2888
2889         ptlrpcd_decref();
2890         RETURN(rc);
2891 }
2892
2893 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2894 {
2895         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2896         return rc > 0 ? 0: rc;
2897 }
2898
2899 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2900 {
2901         return osc_process_config_base(obd, buf);
2902 }
2903
2904 static struct obd_ops osc_obd_ops = {
2905         .o_owner                = THIS_MODULE,
2906         .o_setup                = osc_setup,
2907         .o_precleanup           = osc_precleanup,
2908         .o_cleanup              = osc_cleanup,
2909         .o_add_conn             = client_import_add_conn,
2910         .o_del_conn             = client_import_del_conn,
2911         .o_connect              = client_connect_import,
2912         .o_reconnect            = osc_reconnect,
2913         .o_disconnect           = osc_disconnect,
2914         .o_statfs               = osc_statfs,
2915         .o_statfs_async         = osc_statfs_async,
2916         .o_create               = osc_create,
2917         .o_destroy              = osc_destroy,
2918         .o_getattr              = osc_getattr,
2919         .o_setattr              = osc_setattr,
2920         .o_iocontrol            = osc_iocontrol,
2921         .o_set_info_async       = osc_set_info_async,
2922         .o_import_event         = osc_import_event,
2923         .o_process_config       = osc_process_config,
2924         .o_quotactl             = osc_quotactl,
2925         .o_quotacheck           = osc_quotacheck,
2926 };
2927
2928 static int __init osc_init(void)
2929 {
2930         bool enable_proc = true;
2931         struct obd_type *type;
2932         int rc;
2933         ENTRY;
2934
2935         /* print an address of _any_ initialized kernel symbol from this
2936          * module, to allow debugging with gdb that doesn't support data
2937          * symbols from modules.*/
2938         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2939
2940         rc = lu_kmem_init(osc_caches);
2941         if (rc)
2942                 RETURN(rc);
2943
2944         type = class_search_type(LUSTRE_OSP_NAME);
2945         if (type != NULL && type->typ_procsym != NULL)
2946                 enable_proc = false;
2947
2948         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2949                                  LUSTRE_OSC_NAME, &osc_device_type);
2950         if (rc) {
2951                 lu_kmem_fini(osc_caches);
2952                 RETURN(rc);
2953         }
2954
2955         RETURN(rc);
2956 }
2957
2958 static void /*__exit*/ osc_exit(void)
2959 {
2960         class_unregister_type(LUSTRE_OSC_NAME);
2961         lu_kmem_fini(osc_caches);
2962 }
2963
2964 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2965 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2966 MODULE_LICENSE("GPL");
2967
2968 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);