/* lustre/osc/osc_request.c @ 84d412043aed352c38578f128f3f55b24276a6ae (fs/lustre-release.git) */
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_ioctl.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include <obd_class.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page         **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
        struct obd_capa          *aa_ocapa;
        struct cl_req            *aa_clerq;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct obd_info         *fa_oi;
        obd_enqueue_update_f     fa_upcall;
        void                    *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        ldlm_type_t              oa_type;
        ldlm_mode_t              oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f     oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle     oa_lockh;
        unsigned int             oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

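/*
 * Pack the caller's obdo into the request body in wire format and attach
 * the capability, if any.
 */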
void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

void osc_set_capa_size(struct ptlrpc_request *req,
                       const struct req_msg_field *field,
                       struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

int osc_getattr_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req,
                          struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
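
/*
 * A minimal usage sketch for osc_setattr_async() (illustrative only; the
 * upcall and cookie below are hypothetical, not taken from this file):
 *
 *      static int my_setattr_upcall(void *cookie, int rc)
 *      {
 *              // inspect rc and release whatever state 'cookie' tracks
 *              return rc;
 *      }
 *
 *      rc = osc_setattr_async(exp, oinfo, my_setattr_upcall, my_cookie,
 *                             PTLRPCD_SET);
 *
 * With rqset == NULL the request is fired and forgotten (no upcall is set);
 * PTLRPCD_SET hands it to a ptlrpcd thread; any other set defers sending
 * until the caller processes that set.
 */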

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matched by @mode in the resource named
 * after @oa. Found locks are added to the @cancels list. Returns the number
 * of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported at
         * all, where we still want to cancel locks in advance, just cancel
         * them locally without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

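/*
 * Throttle destroy RPCs to at most cl_max_rpcs_in_flight.  The atomic
 * increment and decrement below can race with other senders; if our
 * decrement observes the counter dropping back under the limit, another
 * waiter may now be eligible to send, so the waitqueue is woken.
 */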
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct obd_trans_info *oti)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, NULL);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* If osc_destroy() is destroying an unlink orphan (sent from MDT to
         * OST), it must not block here, because the call may be triggered
         * from ptlrpcd, and it is not good to block a ptlrpcd thread
         * (b=16006) */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

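/*
 * Fill the dirty and grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) under cl_loi_list_lock, so the server receives a
 * consistent snapshot of this client's cached state with each request.
 */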
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                               PAGE_CACHE_SHIFT) *
                                              (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

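/*
 * Worked example of the shrink target computed above (numbers illustrative,
 * not defaults asserted by this file): with cl_max_rpcs_in_flight = 8 and
 * cl_max_pages_per_rpc = 256 on 4KB pages (1MB RPCs), the first-stage
 * target is (8 + 1) * 1MB = 9MB; once avail_grant is already at or below
 * that, the target drops to a single RPC, i.e. 1MB.
 */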
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int              rc = 0;
        struct ost_body *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

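/*
 * Decide whether it is time to shrink this client's grant: the server must
 * support OBD_CONNECT_GRANT_SHRINK, the shrink interval must have (almost)
 * elapsed, the import must be fully connected, and the client must hold
 * more grant than a single full-sized RPC needs.
 */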
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
               "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it is reading inside the file, it is just
 * that this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

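/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply: any negative
 * rc is propagated to the caller, any other non-zero rc is a protocol
 * error, and the number of bytes actually moved by bulk must match what
 * was requested.
 */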
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int    i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

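/*
 * Two brw_pages may share one remote niobuf only when they carry identical
 * flags and are byte-contiguous; the mask below only decides whether a
 * difference in flags deserves a warning.
 */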
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return p1->off + p1->count == p2->off;
}

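/*
 * Checksum up to @nob bytes of bulk data, page by page, with the hash
 * algorithm selected by @cksum_type.  Fault-injection hooks deliberately
 * corrupt incoming read data (OBD_FAIL_OSC_CHECKSUM_RECEIVE) or the
 * computed write checksum (OBD_FAIL_OSC_CHECKSUM_SEND) for testing.
 */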
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                          cksum;
        int                          i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int                 bufsize;
        int                          err;
        unsigned char                cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sends, we only compute a wrong checksum instead of corrupting
         * the data, so that the data is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

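/*
 * Build a BRW read or write request: count the remote niobufs needed
 * (merging byte-contiguous pages), pack the obdo/ioobj/niobuf triple,
 * attach the bulk descriptor, optionally checksum the pages, and stash
 * the I/O state in osc_brw_async_args for the interpret callback.
 */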
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
         * sends "max - 1" for compatibility with old clients (which send "0"),
         * and also so that the actual maximum is a power-of-two number, not
         * one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
               req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
               niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

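/*
 * A write checksum mismatch is diagnosed by recomputing the checksum over
 * the pages we still hold: if the recomputed value matches what the server
 * saw, the data changed on the client after it was first checksummed (e.g.
 * mmap IO); if it matches the original client value, the data changed in
 * transit.
 */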
1274 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1275                                 __u32 client_cksum, __u32 server_cksum, int nob,
1276                                 size_t page_count, struct brw_page **pga,
1277                                 cksum_type_t client_cksum_type)
1278 {
1279         __u32 new_cksum;
1280         char *msg;
1281         cksum_type_t cksum_type;
1282
1283         if (server_cksum == client_cksum) {
1284                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1285                 return 0;
1286         }
1287
1288         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1289                                        oa->o_flags : 0);
1290         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1291                                       cksum_type);
1292
1293         if (cksum_type != client_cksum_type)
1294                 msg = "the server did not use the checksum type specified in "
1295                       "the original request - likely a protocol problem";
1296         else if (new_cksum == server_cksum)
1297                 msg = "changed on the client after we checksummed it - "
1298                       "likely false positive due to mmap IO (bug 11742)";
1299         else if (new_cksum == client_cksum)
1300                 msg = "changed in transit before arrival at OST";
1301         else
1302                 msg = "changed in transit AND doesn't match the original - "
1303                       "likely false positive due to mmap IO (bug 11742)";
1304
1305         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1306                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1307                            msg, libcfs_nid2str(peer->nid),
1308                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1309                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1310                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1311                            POSTID(&oa->o_oi), pga[0]->off,
1312                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1313         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1314                "client csum now %x\n", client_cksum, client_cksum_type,
1315                server_cksum, cksum_type, new_cksum);
1316         return 1;
1317 }
1318
1319 /* Note rc enters this function as number of bytes transferred */
1320 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1321 {
1322         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1323         const lnet_process_id_t *peer =
1324                         &req->rq_import->imp_connection->c_peer;
1325         struct client_obd *cli = aa->aa_cli;
1326         struct ost_body *body;
1327         u32 client_cksum = 0;
1328         ENTRY;
1329
1330         if (rc < 0 && rc != -EDQUOT) {
1331                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1332                 RETURN(rc);
1333         }
1334
1335         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1337         if (body == NULL) {
1338                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1339                 RETURN(-EPROTO);
1340         }
1341
1342         /* set/clear over quota flag for a uid/gid */
1343         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1344             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1345                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1346
1347                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1348                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1349                        body->oa.o_flags);
1350                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1351         }
1352
1353         osc_update_grant(cli, body);
1354
1355         if (rc < 0)
1356                 RETURN(rc);
1357
1358         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1359                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1360
1361         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1362                 if (rc > 0) {
1363                         CERROR("Unexpected positive rc %d\n", rc);
1364                         RETURN(-EPROTO);
1365                 }
1366                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1367
1368                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1369                         RETURN(-EAGAIN);
1370
1371                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1372                     check_write_checksum(&body->oa, peer, client_cksum,
1373                                          body->oa.o_cksum, aa->aa_requested_nob,
1374                                          aa->aa_page_count, aa->aa_ppga,
1375                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1376                         RETURN(-EAGAIN);
1377
1378                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1379                                      aa->aa_page_count, aa->aa_ppga);
1380                 GOTO(out, rc);
1381         }
1382
1383         /* The rest of this function executes only for OST_READs */
1384
1385         /* if unwrap_bulk failed, return -EAGAIN to retry */
1386         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1387         if (rc < 0)
1388                 GOTO(out, rc = -EAGAIN);
1389
1390         if (rc > aa->aa_requested_nob) {
1391                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1392                        aa->aa_requested_nob);
1393                 RETURN(-EPROTO);
1394         }
1395
1396         if (rc != req->rq_bulk->bd_nob_transferred) {
1397                 CERROR("Unexpected rc %d (%d transferred)\n",
1398                        rc, req->rq_bulk->bd_nob_transferred);
1399                 RETURN(-EPROTO);
1400         }
1401
1402         if (rc < aa->aa_requested_nob)
1403                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1404
1405         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1406                 static int cksum_counter;
1407                 u32        server_cksum = body->oa.o_cksum;
1408                 char      *via = "";
1409                 char      *router = "";
1410                 cksum_type_t cksum_type;
1411
1412                 cksum_type = cksum_type_unpack(body->oa.o_valid &
1413                                 OBD_MD_FLFLAGS ? body->oa.o_flags : 0);
1414                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1415                                                  aa->aa_ppga, OST_READ,
1416                                                  cksum_type);
1417
1418                 if (peer->nid != req->rq_bulk->bd_sender) {
1419                         via = " via ";
1420                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1421                 }
1422
1423                 if (server_cksum != client_cksum) {
1424                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1425                                            "%s%s%s inode "DFID" object "DOSTID
1426                                            " extent ["LPU64"-"LPU64"]\n",
1427                                            req->rq_import->imp_obd->obd_name,
1428                                            libcfs_nid2str(peer->nid),
1429                                            via, router,
1430                                            body->oa.o_valid & OBD_MD_FLFID ?
1431                                                 body->oa.o_parent_seq : (__u64)0,
1432                                            body->oa.o_valid & OBD_MD_FLFID ?
1433                                                 body->oa.o_parent_oid : 0,
1434                                            body->oa.o_valid & OBD_MD_FLFID ?
1435                                                 body->oa.o_parent_ver : 0,
1436                                            POSTID(&body->oa.o_oi),
1437                                            aa->aa_ppga[0]->off,
1438                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1439                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1440                                                                         1);
1441                         CERROR("client %x, server %x, cksum_type %x\n",
1442                                client_cksum, server_cksum, cksum_type);
1443                         cksum_counter = 0;
1444                         aa->aa_oa->o_cksum = client_cksum;
1445                         rc = -EAGAIN;
1446                 } else {
1447                         cksum_counter++;
1448                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1449                         rc = 0;
1450                 }
1451         } else if (unlikely(client_cksum)) {
1452                 static int cksum_missed;
1453
1454                 cksum_missed++;
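                /* (x & -x) == x holds only when x is a power of two, so
                 * this error is logged on the 1st, 2nd, 4th, 8th, ... miss,
                 * rate-limiting the console output */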
1455                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1456                         CERROR("Checksum %u requested from %s but not sent\n",
1457                                cksum_missed, libcfs_nid2str(peer->nid));
1458         } else {
1459                 rc = 0;
1460         }
1461 out:
1462         if (rc >= 0)
1463                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1464                                      aa->aa_oa, &body->oa);
1465
1466         RETURN(rc);
1467 }
1468
1469 static int osc_brw_redo_request(struct ptlrpc_request *request,
1470                                 struct osc_brw_async_args *aa, int rc)
1471 {
1472         struct ptlrpc_request *new_req;
1473         struct osc_brw_async_args *new_aa;
1474         struct osc_async_page *oap;
1475         ENTRY;
1476
1477         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1478                   "redo for recoverable error %d", rc);
1479
1480         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1481                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1482                                   aa->aa_cli, aa->aa_oa,
1483                                   NULL /* lsm unused by osc currently */,
1484                                   aa->aa_page_count, aa->aa_ppga,
1485                                   &new_req, aa->aa_ocapa, 0, 1);
1486         if (rc)
1487                 RETURN(rc);
1488
1489         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1490                 if (oap->oap_request != NULL) {
1491                         LASSERTF(request == oap->oap_request,
1492                                  "request %p != oap_request %p\n",
1493                                  request, oap->oap_request);
1494                         if (oap->oap_interrupted) {
1495                                 ptlrpc_req_finished(new_req);
1496                                 RETURN(-EINTR);
1497                         }
1498                 }
1499         }
1500         /* The new request takes over pga and oaps from the old request.
1501          * Note that copying a list_head doesn't work; it must be spliced. */
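        /* e.g. after a plain struct copy "new = old", old's neighbours'
         * prev/next pointers would still reference &old; the
         * list_splice_init() calls below relink them to the new head */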
1502         aa->aa_resends++;
1503         new_req->rq_interpret_reply = request->rq_interpret_reply;
1504         new_req->rq_async_args = request->rq_async_args;
1505         new_req->rq_commit_cb = request->rq_commit_cb;
1506         /* cap the resend delay to the current request timeout; this is
1507          * similar to what ptlrpc does (see after_reply()) */
1508         if (aa->aa_resends > new_req->rq_timeout)
1509                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1510         else
1511                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
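        /* i.e. resend N is delayed by min(N, rq_timeout) seconds: a linear
         * backoff capped at the request timeout */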
1512         new_req->rq_generation_set = 1;
1513         new_req->rq_import_generation = request->rq_import_generation;
1514
1515         new_aa = ptlrpc_req_async_args(new_req);
1516
1517         INIT_LIST_HEAD(&new_aa->aa_oaps);
1518         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1519         INIT_LIST_HEAD(&new_aa->aa_exts);
1520         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1521         new_aa->aa_resends = aa->aa_resends;
1522
1523         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1524                 if (oap->oap_request) {
1525                         ptlrpc_req_finished(oap->oap_request);
1526                         oap->oap_request = ptlrpc_request_addref(new_req);
1527                 }
1528         }
1529
1530         new_aa->aa_ocapa = aa->aa_ocapa;
1531         aa->aa_ocapa = NULL;
1532
1533         /* XXX: This code will run into problems if we ever support
1534          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1535          * and waiting for all of them to finish. We should inherit the
1536          * request set from the old request. */
1537         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1538
1539         DEBUG_REQ(D_INFO, new_req, "new request");
1540         RETURN(0);
1541 }
1542
1543 /*
1544  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1545  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1546  * fine for our small page arrays and doesn't require allocation.  it's an
1547  * insertion sort that swaps elements that are strides apart, shrinking the
1548  * stride down until it's 1 and the array is sorted.
1549  */
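/*
 * The stride sequence below is the classic h = 3*h + 1 series: 1, 4, 13,
 * 40, 121, ...  The first loop grows the stride until it reaches or passes
 * 'num'; each pass of the do-while then divides it by three, ending with a
 * plain insertion sort at stride 1.
 */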
1550 static void sort_brw_pages(struct brw_page **array, int num)
1551 {
1552         int stride, i, j;
1553         struct brw_page *tmp;
1554
1555         if (num == 1)
1556                 return;
1557         for (stride = 1; stride < num; stride = (stride * 3) + 1)
1558                 ;
1559
1560         do {
1561                 stride /= 3;
1562                 for (i = stride; i < num; i++) {
1563                         tmp = array[i];
1564                         j = i;
1565                         while (j >= stride && array[j - stride]->off > tmp->off) {
1566                                 array[j] = array[j - stride];
1567                                 j -= stride;
1568                         }
1569                         array[j] = tmp;
1570                 }
1571         } while (stride > 1);
1572 }
1573
1574 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1575 {
1576         LASSERT(ppga != NULL);
1577         OBD_FREE(ppga, sizeof(*ppga) * count);
1578 }
1579
1580 static int brw_interpret(const struct lu_env *env,
1581                          struct ptlrpc_request *req, void *data, int rc)
1582 {
1583         struct osc_brw_async_args *aa = data;
1584         struct osc_extent *ext;
1585         struct osc_extent *tmp;
1586         struct client_obd *cli = aa->aa_cli;
1587         ENTRY;
1588
1589         rc = osc_brw_fini_request(req, rc);
1590         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1591         /* When the server returns -EINPROGRESS, the client should always
1592          * retry regardless of how many times the bulk was already resent. */
1593         if (osc_recoverable_error(rc)) {
1594                 if (req->rq_import_generation !=
1595                     req->rq_import->imp_generation) {
1596                         CDEBUG(D_HA, "%s: resend crosses eviction for object: "
1597                                ""DOSTID", rc = %d.\n",
1598                                req->rq_import->imp_obd->obd_name,
1599                                POSTID(&aa->aa_oa->o_oi), rc);
1600                 } else if (rc == -EINPROGRESS ||
1601                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1602                         rc = osc_brw_redo_request(req, aa, rc);
1603                 } else {
1604                         CERROR("%s: too many resend retries for object: "
1605                                ""DOSTID", rc = %d.\n",
1606                                req->rq_import->imp_obd->obd_name,
1607                                POSTID(&aa->aa_oa->o_oi), rc);
1608                 }
1609
1610                 if (rc == 0)
1611                         RETURN(0);
1612                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1613                         rc = -EIO;
1614         }
1615
1616         if (aa->aa_ocapa) {
1617                 capa_put(aa->aa_ocapa);
1618                 aa->aa_ocapa = NULL;
1619         }
1620
1621         if (rc == 0) {
1622                 struct obdo *oa = aa->aa_oa;
1623                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1624                 unsigned long valid = 0;
1625                 struct cl_object *obj;
1626                 struct osc_async_page *last;
1627
1628                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1629                 obj = osc2cl(last->oap_obj);
1630
1631                 cl_object_attr_lock(obj);
1632                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1633                         attr->cat_blocks = oa->o_blocks;
1634                         valid |= CAT_BLOCKS;
1635                 }
1636                 if (oa->o_valid & OBD_MD_FLMTIME) {
1637                         attr->cat_mtime = oa->o_mtime;
1638                         valid |= CAT_MTIME;
1639                 }
1640                 if (oa->o_valid & OBD_MD_FLATIME) {
1641                         attr->cat_atime = oa->o_atime;
1642                         valid |= CAT_ATIME;
1643                 }
1644                 if (oa->o_valid & OBD_MD_FLCTIME) {
1645                         attr->cat_ctime = oa->o_ctime;
1646                         valid |= CAT_CTIME;
1647                 }
1648
1649                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1650                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1651                         loff_t last_off = last->oap_count + last->oap_obj_off +
1652                                 last->oap_page_off;
1653
1654                         /* Change file size if this is an out-of-quota or
1655                          * direct I/O write and it extends the file size */
1656                         if (loi->loi_lvb.lvb_size < last_off) {
1657                                 attr->cat_size = last_off;
1658                                 valid |= CAT_SIZE;
1659                         }
1660                         /* Extend KMS if it's not a lockless write */
1661                         if (loi->loi_kms < last_off &&
1662                             oap2osc_page(last)->ops_srvlock == 0) {
1663                                 attr->cat_kms = last_off;
1664                                 valid |= CAT_KMS;
1665                         }
1666                 }
1667
1668                 if (valid != 0)
1669                         cl_object_attr_update(env, obj, attr, valid);
1670                 cl_object_attr_unlock(obj);
1671         }
1672         OBDO_FREE(aa->aa_oa);
1673
1674         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1675                 osc_inc_unstable_pages(req);
1676
1677         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1678                 list_del_init(&ext->oe_link);
1679                 osc_extent_finish(env, ext, 1, rc);
1680         }
1681         LASSERT(list_empty(&aa->aa_exts));
1682         LASSERT(list_empty(&aa->aa_oaps));
1683
1684         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1685                           req->rq_bulk->bd_nob_transferred);
1686         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1687         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1688
1689         spin_lock(&cli->cl_loi_list_lock);
1690         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1691          * is called so we know whether to go to sync BRWs or wait for more
1692          * RPCs to complete */
1693         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1694                 cli->cl_w_in_flight--;
1695         else
1696                 cli->cl_r_in_flight--;
1697         osc_wake_cache_waiters(cli);
1698         spin_unlock(&cli->cl_loi_list_lock);
1699
1700         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1701         RETURN(rc);
1702 }
1703
1704 static void brw_commit(struct ptlrpc_request *req)
1705 {
1706         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1707          * this callback, invoked via rq_commit_cb, we need to ensure that
1708          * osc_dec_unstable_pages is still called. Otherwise unstable
1709          * pages may be leaked. */
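        /* Two interleavings are possible under rq_lock (a sketch of the
         * assumed protocol): if the inc side already ran, rq_unstable is
         * set, so clear it and dec here; otherwise set rq_committed,
         * presumably so the inc side can observe it and dec on its own. */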
1710         spin_lock(&req->rq_lock);
1711         if (likely(req->rq_unstable)) {
1712                 req->rq_unstable = 0;
1713                 spin_unlock(&req->rq_lock);
1714
1715                 osc_dec_unstable_pages(req);
1716         } else {
1717                 req->rq_committed = 1;
1718                 spin_unlock(&req->rq_lock);
1719         }
1720 }
1721
1722 /**
1723  * Build an RPC from the list of extents @ext_list. The caller must ensure
1724  * that the total number of pages in this list does not exceed the maximum
1725  * number of pages per RPC. Extents in the list must be in OES_RPC state.
1726  */
1727 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1728                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1729 {
1730         struct ptlrpc_request           *req = NULL;
1731         struct osc_extent               *ext;
1732         struct brw_page                 **pga = NULL;
1733         struct osc_brw_async_args       *aa = NULL;
1734         struct obdo                     *oa = NULL;
1735         struct osc_async_page           *oap;
1736         struct osc_async_page           *tmp;
1737         struct cl_req                   *clerq = NULL;
1738         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1739                                                                       CRT_READ;
1740         struct cl_req_attr              *crattr = NULL;
1741         loff_t                          starting_offset = OBD_OBJECT_EOF;
1742         loff_t                          ending_offset = 0;
1743         int                             mpflag = 0;
1744         int                             mem_tight = 0;
1745         int                             page_count = 0;
1746         bool                            soft_sync = false;
1747         int                             i;
1748         int                             rc;
1749         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1750         struct ost_body                 *body;
1751         ENTRY;
1752         LASSERT(!list_empty(ext_list));
1753
1754         /* add pages into rpc_list to build BRW rpc */
1755         list_for_each_entry(ext, ext_list, oe_link) {
1756                 LASSERT(ext->oe_state == OES_RPC);
1757                 mem_tight |= ext->oe_memalloc;
1758                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1759                         ++page_count;
1760                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1761                         if (starting_offset == OBD_OBJECT_EOF ||
1762                             starting_offset > oap->oap_obj_off)
1763                                 starting_offset = oap->oap_obj_off;
1764                         else
1765                                 LASSERT(oap->oap_page_off == 0);
1766                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1767                                 ending_offset = oap->oap_obj_off +
1768                                                 oap->oap_count;
1769                         else
1770                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1771                                         PAGE_CACHE_SIZE);
1772                 }
1773         }
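        /* the LASSERTs above enforce contiguity within the RPC: only the
         * page at the lowest offset may start at a non-zero in-page offset,
         * and only the page at the highest offset may end short of
         * PAGE_CACHE_SIZE; interior pages must be full */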
1774
1775         soft_sync = osc_over_unstable_soft_limit(cli);
1776         if (mem_tight)
1777                 mpflag = cfs_memory_pressure_get_and_set();
1778
1779         OBD_ALLOC(crattr, sizeof(*crattr));
1780         if (crattr == NULL)
1781                 GOTO(out, rc = -ENOMEM);
1782
1783         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1784         if (pga == NULL)
1785                 GOTO(out, rc = -ENOMEM);
1786
1787         OBDO_ALLOC(oa);
1788         if (oa == NULL)
1789                 GOTO(out, rc = -ENOMEM);
1790
1791         i = 0;
1792         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1793                 struct cl_page *page = oap2cl_page(oap);
1794                 if (clerq == NULL) {
1795                         clerq = cl_req_alloc(env, page, crt,
1796                                              1 /* only 1-object rpcs for now */);
1797                         if (IS_ERR(clerq))
1798                                 GOTO(out, rc = PTR_ERR(clerq));
1799                 }
1800                 if (mem_tight)
1801                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1802                 if (soft_sync)
1803                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1804                 pga[i] = &oap->oap_brw_page;
1805                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1806                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1807                        pga[i]->pg, page_index(oap->oap_page), oap,
1808                        pga[i]->flag);
1809                 i++;
1810                 cl_req_page_add(env, clerq, page);
1811         }
1812
1813         /* always get the data for the obdo for the rpc */
1814         LASSERT(clerq != NULL);
1815         crattr->cra_oa = oa;
1816         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1817
1818         rc = cl_req_prep(env, clerq);
1819         if (rc != 0) {
1820                 CERROR("cl_req_prep failed: %d\n", rc);
1821                 GOTO(out, rc);
1822         }
1823
1824         sort_brw_pages(pga, page_count);
1825         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1826                         pga, &req, crattr->cra_capa, 1, 0);
1827         if (rc != 0) {
1828                 CERROR("prep_req failed: %d\n", rc);
1829                 GOTO(out, rc);
1830         }
1831
1832         req->rq_commit_cb = brw_commit;
1833         req->rq_interpret_reply = brw_interpret;
1834
1835         if (mem_tight != 0)
1836                 req->rq_memalloc = 1;
1837
1838         /* Need to update the timestamps after the request is built in case
1839          * we race with setattr (locally or in queue at OST).  If OST gets
1840          * later setattr before earlier BRW (as determined by the request xid),
1841          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1842          * way to do this in a single call.  bug 10150 */
1843         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1844         crattr->cra_oa = &body->oa;
1845         cl_req_attr_set(env, clerq, crattr,
1846                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1847
1848         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1849
1850         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1851         aa = ptlrpc_req_async_args(req);
1852         INIT_LIST_HEAD(&aa->aa_oaps);
1853         list_splice_init(&rpc_list, &aa->aa_oaps);
1854         INIT_LIST_HEAD(&aa->aa_exts);
1855         list_splice_init(ext_list, &aa->aa_exts);
1856         aa->aa_clerq = clerq;
1857
1858         /* queued sync pages can be torn down while the pages
1859          * are between the pending list and the RPC */
1860         tmp = NULL;
1861         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1862                 /* only one oap gets a request reference */
1863                 if (tmp == NULL)
1864                         tmp = oap;
1865                 if (oap->oap_interrupted && !req->rq_intr) {
1866                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1867                                         oap, req);
1868                         ptlrpc_mark_interrupted(req);
1869                 }
1870         }
1871         if (tmp != NULL)
1872                 tmp->oap_request = ptlrpc_request_addref(req);
1873
1874         spin_lock(&cli->cl_loi_list_lock);
1875         starting_offset >>= PAGE_CACHE_SHIFT;
1876         if (cmd == OBD_BRW_READ) {
1877                 cli->cl_r_in_flight++;
1878                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1879                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1880                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1881                                       starting_offset + 1);
1882         } else {
1883                 cli->cl_w_in_flight++;
1884                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1885                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1886                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1887                                       starting_offset + 1);
1888         }
1889         spin_unlock(&cli->cl_loi_list_lock);
1890
1891         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1892                   page_count, aa, cli->cl_r_in_flight,
1893                   cli->cl_w_in_flight);
1894
1895         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1896          * see which CPU/NUMA node the majority of pages were allocated
1897          * on, and try to assign the async RPC to the CPU core
1898          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1899          *
1900          * But on the other hand, we expect that multiple ptlrpcd
1901          * threads and the initial write sponsor can run in parallel,
1902          * especially when data checksumming is enabled: a CPU-bound
1903          * operation that a single ptlrpcd thread cannot process in time.
1904          * So more ptlrpcd threads sharing BRW load
1905          * (with PDL_POLICY_ROUND) seems better.
1906          */
1907         ptlrpcd_add_req(req, pol, -1);
1908         rc = 0;
1909         EXIT;
1910
1911 out:
1912         if (mem_tight != 0)
1913                 cfs_memory_pressure_restore(mpflag);
1914
1915         if (crattr != NULL) {
1916                 capa_put(crattr->cra_capa);
1917                 OBD_FREE(crattr, sizeof(*crattr));
1918         }
1919
1920         if (rc != 0) {
1921                 LASSERT(req == NULL);
1922
1923                 if (oa)
1924                         OBDO_FREE(oa);
1925                 if (pga)
1926                         OBD_FREE(pga, sizeof(*pga) * page_count);
1927                 /* this should happen rarely and is pretty bad; it makes the
1928                  * pending list not follow the dirty order */
1929                 while (!list_empty(ext_list)) {
1930                         ext = list_entry(ext_list->next, struct osc_extent,
1931                                          oe_link);
1932                         list_del_init(&ext->oe_link);
1933                         osc_extent_finish(env, ext, 0, rc);
1934                 }
1935                 if (clerq && !IS_ERR(clerq))
1936                         cl_req_completion(env, clerq, rc);
1937         }
1938         RETURN(rc);
1939 }
1940
1941 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1942                                         struct ldlm_enqueue_info *einfo)
1943 {
1944         void *data = einfo->ei_cbdata;
1945         int set = 0;
1946
1947         LASSERT(lock != NULL);
1948         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1949         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1950         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1951         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1952
1953         lock_res_and_lock(lock);
1954
1955         if (lock->l_ast_data == NULL)
1956                 lock->l_ast_data = data;
1957         if (lock->l_ast_data == data)
1958                 set = 1;
1959
1960         unlock_res_and_lock(lock);
1961
1962         return set;
1963 }
1964
1965 static int osc_set_data_with_check(struct lustre_handle *lockh,
1966                                    struct ldlm_enqueue_info *einfo)
1967 {
1968         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1969         int set = 0;
1970
1971         if (lock != NULL) {
1972                 set = osc_set_lock_data_with_check(lock, einfo);
1973                 LDLM_LOCK_PUT(lock);
1974         } else
1975                 CERROR("lockh %p, data %p - client evicted?\n",
1976                        lockh, einfo->ei_cbdata);
1977         return set;
1978 }
1979
1980 static int osc_enqueue_fini(struct ptlrpc_request *req,
1981                             osc_enqueue_upcall_f upcall, void *cookie,
1982                             struct lustre_handle *lockh, ldlm_mode_t mode,
1983                             __u64 *flags, int agl, int errcode)
1984 {
1985         bool intent = *flags & LDLM_FL_HAS_INTENT;
1986         int rc;
1987         ENTRY;
1988
1989         /* The request was created before ldlm_cli_enqueue call. */
1990         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1991                 struct ldlm_reply *rep;
1992
1993                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1994                 LASSERT(rep != NULL);
1995
1996                 rep->lock_policy_res1 =
1997                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1998                 if (rep->lock_policy_res1)
1999                         errcode = rep->lock_policy_res1;
2000                 if (!agl)
2001                         *flags |= LDLM_FL_LVB_READY;
2002         } else if (errcode == ELDLM_OK) {
2003                 *flags |= LDLM_FL_LVB_READY;
2004         }
2005
2006         /* Call the update callback. */
2007         rc = (*upcall)(cookie, lockh, errcode);
2008
2009         /* release the reference taken in ldlm_cli_enqueue() */
2010         if (errcode == ELDLM_LOCK_MATCHED)
2011                 errcode = ELDLM_OK;
2012         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2013                 ldlm_lock_decref(lockh, mode);
2014
2015         RETURN(rc);
2016 }
2017
2018 static int osc_enqueue_interpret(const struct lu_env *env,
2019                                  struct ptlrpc_request *req,
2020                                  struct osc_enqueue_args *aa, int rc)
2021 {
2022         struct ldlm_lock *lock;
2023         struct lustre_handle *lockh = &aa->oa_lockh;
2024         ldlm_mode_t mode = aa->oa_mode;
2025         struct ost_lvb *lvb = aa->oa_lvb;
2026         __u32 lvb_len = sizeof(*lvb);
2027         __u64 flags = 0;
2028
2029         ENTRY;
2030
2031         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2032          * be valid. */
2033         lock = ldlm_handle2lock(lockh);
2034         LASSERTF(lock != NULL,
2035                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2036                  lockh->cookie, req, aa);
2037
2038         /* Take an additional reference so that a blocking AST that
2039          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2040          * to arrive after an upcall has been executed by
2041          * osc_enqueue_fini(). */
2042         ldlm_lock_addref(lockh, mode);
2043
2044         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2045         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2046
2047         /* Let CP AST to grant the lock first. */
2048         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2049
2050         if (aa->oa_agl) {
2051                 LASSERT(aa->oa_lvb == NULL);
2052                 LASSERT(aa->oa_flags == NULL);
2053                 aa->oa_flags = &flags;
2054         }
2055
2056         /* Complete obtaining the lock procedure. */
2057         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2058                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2059                                    lockh, rc);
2060         /* Complete osc stuff. */
2061         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2062                               aa->oa_flags, aa->oa_agl, rc);
2063
2064         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2065
2066         ldlm_lock_decref(lockh, mode);
2067         LDLM_LOCK_PUT(lock);
2068         RETURN(rc);
2069 }
2070
2071 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
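/* Sentinel value: callers pass PTLRPCD_SET instead of a real request set to
 * ask that the request be handled by the ptlrpcd daemons; see the
 * rqset == PTLRPCD_SET check in osc_enqueue_base() below. */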
2072
2073 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2074  * lock from the 2nd OSC before a lock from the 1st one. This does not
2075  * deadlock with other synchronous requests; however, keeping some locks and
2076  * trying to obtain others may take a considerable amount of time in the case
2077  * of OST failure, and when other sync requests cannot get a lock released by
2078  * a client, that client is evicted from the cluster -- such scenarios make
2079  * life difficult, so release locks just after they are obtained. */
2080 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2081                      __u64 *flags, ldlm_policy_data_t *policy,
2082                      struct ost_lvb *lvb, int kms_valid,
2083                      osc_enqueue_upcall_f upcall, void *cookie,
2084                      struct ldlm_enqueue_info *einfo,
2085                      struct ptlrpc_request_set *rqset, int async, int agl)
2086 {
2087         struct obd_device *obd = exp->exp_obd;
2088         struct lustre_handle lockh = { 0 };
2089         struct ptlrpc_request *req = NULL;
2090         int intent = *flags & LDLM_FL_HAS_INTENT;
2091         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2092         ldlm_mode_t mode;
2093         int rc;
2094         ENTRY;
2095
2096         /* Filesystem lock extents are extended to page boundaries so that
2097          * dealing with the page cache is a little smoother.  */
2098         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2099         policy->l_extent.end |= ~CFS_PAGE_MASK;
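        /* e.g. with 4 KiB pages, a byte range [5000, 6000] is widened to
         * the page-aligned range [4096, 8191] */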
2100
2101         /*
2102          * kms is not valid when either object is completely fresh (so that no
2103          * locks are cached), or object was evicted. In the latter case cached
2104          * lock cannot be used, because it would prime inode state with
2105          * potentially stale LVB.
2106          */
2107         if (!kms_valid)
2108                 goto no_match;
2109
2110         /* Next, search for already existing extent locks that will cover us */
2111         /* If we're trying to read, we also search for an existing PW lock.  The
2112          * VFS and page cache already protect us locally, so lots of readers/
2113          * writers can share a single PW lock.
2114          *
2115          * There are problems with conversion deadlocks, so instead of
2116          * converting a read lock to a write lock, we'll just enqueue a new
2117          * one.
2118          *
2119          * At some point we should cancel the read lock instead of making them
2120          * send us a blocking callback, but there are problems with canceling
2121          * locks out from other users right now, too. */
2122         mode = einfo->ei_mode;
2123         if (einfo->ei_mode == LCK_PR)
2124                 mode |= LCK_PW;
2125         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2126                                einfo->ei_type, policy, mode, &lockh, 0);
2127         if (mode) {
2128                 struct ldlm_lock *matched;
2129
2130                 if (*flags & LDLM_FL_TEST_LOCK)
2131                         RETURN(ELDLM_OK);
2132
2133                 matched = ldlm_handle2lock(&lockh);
2134                 if (agl) {
2135                         /* AGL enqueues DLM locks speculatively. Therefore,
2136                          * if a DLM lock already exists, just inform the
2137                          * caller to cancel the AGL process for this stripe. */
2138                         ldlm_lock_decref(&lockh, mode);
2139                         LDLM_LOCK_PUT(matched);
2140                         RETURN(-ECANCELED);
2141                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2142                         *flags |= LDLM_FL_LVB_READY;
2143
2144                         /* We already have a lock, and it's referenced. */
2145                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2146
2147                         ldlm_lock_decref(&lockh, mode);
2148                         LDLM_LOCK_PUT(matched);
2149                         RETURN(ELDLM_OK);
2150                 } else {
2151                         ldlm_lock_decref(&lockh, mode);
2152                         LDLM_LOCK_PUT(matched);
2153                 }
2154         }
2155
2156 no_match:
2157         if (*flags & LDLM_FL_TEST_LOCK)
2158                 RETURN(-ENOLCK);
2159
2160         if (intent) {
2161                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2162                                            &RQF_LDLM_ENQUEUE_LVB);
2163                 if (req == NULL)
2164                         RETURN(-ENOMEM);
2165
2166                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2167                 if (rc < 0) {
2168                         ptlrpc_request_free(req);
2169                         RETURN(rc);
2170                 }
2171
2172                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2173                                      sizeof(*lvb));
2174                 ptlrpc_request_set_replen(req);
2175         }
2176
2177         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2178         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2179
2180         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2181                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2182         if (async) {
2183                 if (!rc) {
2184                         struct osc_enqueue_args *aa;
2185                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2186                         aa = ptlrpc_req_async_args(req);
2187                         aa->oa_exp    = exp;
2188                         aa->oa_mode   = einfo->ei_mode;
2189                         aa->oa_type   = einfo->ei_type;
2190                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2191                         aa->oa_upcall = upcall;
2192                         aa->oa_cookie = cookie;
2193                         aa->oa_agl    = !!agl;
2194                         if (!agl) {
2195                                 aa->oa_flags  = flags;
2196                                 aa->oa_lvb    = lvb;
2197                         } else {
2198                                 /* AGL essentially enqueues a DLM lock
2199                                  * in advance, so we don't care about
2200                                  * the result of the AGL enqueue. */
2201                                 aa->oa_lvb    = NULL;
2202                                 aa->oa_flags  = NULL;
2203                         }
2204
2205                         req->rq_interpret_reply =
2206                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2207                         if (rqset == PTLRPCD_SET)
2208                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2209                         else
2210                                 ptlrpc_set_add_req(rqset, req);
2211                 } else if (intent) {
2212                         ptlrpc_req_finished(req);
2213                 }
2214                 RETURN(rc);
2215         }
2216
2217         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2218                               flags, agl, rc);
2219         if (intent)
2220                 ptlrpc_req_finished(req);
2221
2222         RETURN(rc);
2223 }
2224
2225 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2226                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2227                    __u64 *flags, void *data, struct lustre_handle *lockh,
2228                    int unref)
2229 {
2230         struct obd_device *obd = exp->exp_obd;
2231         __u64 lflags = *flags;
2232         ldlm_mode_t rc;
2233         ENTRY;
2234
2235         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2236                 RETURN(-EIO);
2237
2238         /* Filesystem lock extents are extended to page boundaries so that
2239          * dealing with the page cache is a little smoother */
2240         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2241         policy->l_extent.end |= ~CFS_PAGE_MASK;
2242
2243         /* Next, search for already existing extent locks that will cover us */
2244         /* If we're trying to read, we also search for an existing PW lock.  The
2245          * VFS and page cache already protect us locally, so lots of readers/
2246          * writers can share a single PW lock. */
2247         rc = mode;
2248         if (mode == LCK_PR)
2249                 rc |= LCK_PW;
2250         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2251                              res_id, type, policy, rc, lockh, unref);
2252         if (rc) {
2253                 if (data != NULL) {
2254                         if (!osc_set_data_with_check(lockh, data)) {
2255                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2256                                         ldlm_lock_decref(lockh, rc);
2257                                 RETURN(0);
2258                         }
2259                 }
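                /* we asked for LCK_PR but matched the lock in LCK_PW mode:
                 * swap the reference over to PR, presumably because the
                 * caller will eventually decref with the mode it requested */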
2260                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2261                         ldlm_lock_addref(lockh, LCK_PR);
2262                         ldlm_lock_decref(lockh, LCK_PW);
2263                 }
2264                 RETURN(rc);
2265         }
2266         RETURN(rc);
2267 }
2268
2269 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2270 {
2271         ENTRY;
2272
2273         if (unlikely(mode == LCK_GROUP))
2274                 ldlm_lock_decref_and_cancel(lockh, mode);
2275         else
2276                 ldlm_lock_decref(lockh, mode);
2277
2278         RETURN(0);
2279 }
2280
2281 static int osc_statfs_interpret(const struct lu_env *env,
2282                                 struct ptlrpc_request *req,
2283                                 struct osc_async_args *aa, int rc)
2284 {
2285         struct obd_statfs *msfs;
2286         ENTRY;
2287
2288         if (rc == -EBADR)
2289                 /* The request has in fact never been sent
2290                  * due to issues at a higher level (LOV).
2291                  * Exit immediately since the caller is
2292                  * aware of the problem and takes care
2293                  * of the cleanup */
2294                 RETURN(rc);
2295
2296         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2297             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2298                 GOTO(out, rc = 0);
2299
2300         if (rc != 0)
2301                 GOTO(out, rc);
2302
2303         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2304         if (msfs == NULL) {
2305                 GOTO(out, rc = -EPROTO);
2306         }
2307
2308         *aa->aa_oi->oi_osfs = *msfs;
2309 out:
2310         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2311         RETURN(rc);
2312 }
2313
2314 static int osc_statfs_async(struct obd_export *exp,
2315                             struct obd_info *oinfo, __u64 max_age,
2316                             struct ptlrpc_request_set *rqset)
2317 {
2318         struct obd_device     *obd = class_exp2obd(exp);
2319         struct ptlrpc_request *req;
2320         struct osc_async_args *aa;
2321         int                    rc;
2322         ENTRY;
2323
2324         /* We could possibly pass max_age in the request (as an absolute
2325          * timestamp or a "seconds.usec ago") so the target can avoid doing
2326          * extra calls into the filesystem if that isn't necessary (e.g.
2327          * during mount that would help a bit).  Having relative timestamps
2328          * is not so great if request processing is slow, while absolute
2329          * timestamps are not ideal because they need time synchronization. */
2330         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2331         if (req == NULL)
2332                 RETURN(-ENOMEM);
2333
2334         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2335         if (rc) {
2336                 ptlrpc_request_free(req);
2337                 RETURN(rc);
2338         }
2339         ptlrpc_request_set_replen(req);
2340         req->rq_request_portal = OST_CREATE_PORTAL;
2341         ptlrpc_at_set_req_timeout(req);
2342
2343         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2344                 /* procfs requests must not wait on statfs, to avoid deadlock */
2345                 req->rq_no_resend = 1;
2346                 req->rq_no_delay = 1;
2347         }
2348
2349         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2350         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2351         aa = ptlrpc_req_async_args(req);
2352         aa->aa_oi = oinfo;
2353
2354         ptlrpc_set_add_req(rqset, req);
2355         RETURN(0);
2356 }
2357
2358 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2359                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2360 {
2361         struct obd_device     *obd = class_exp2obd(exp);
2362         struct obd_statfs     *msfs;
2363         struct ptlrpc_request *req;
2364         struct obd_import     *imp = NULL;
2365         int rc;
2366         ENTRY;
2367
2368         /* Since the request might also come from lprocfs, we need to
2369          * sync this with client_disconnect_export (bug 15684) */
2370         down_read(&obd->u.cli.cl_sem);
2371         if (obd->u.cli.cl_import)
2372                 imp = class_import_get(obd->u.cli.cl_import);
2373         up_read(&obd->u.cli.cl_sem);
2374         if (!imp)
2375                 RETURN(-ENODEV);
2376
2377         /* We could possibly pass max_age in the request (as an absolute
2378          * timestamp or a "seconds.usec ago") so the target can avoid doing
2379          * extra calls into the filesystem if that isn't necessary (e.g.
2380          * during mount that would help a bit).  Having relative timestamps
2381          * is not so great if request processing is slow, while absolute
2382          * timestamps are not ideal because they need time synchronization. */
2383         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2384
2385         class_import_put(imp);
2386
2387         if (req == NULL)
2388                 RETURN(-ENOMEM);
2389
2390         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2391         if (rc) {
2392                 ptlrpc_request_free(req);
2393                 RETURN(rc);
2394         }
2395         ptlrpc_request_set_replen(req);
2396         req->rq_request_portal = OST_CREATE_PORTAL;
2397         ptlrpc_at_set_req_timeout(req);
2398
2399         if (flags & OBD_STATFS_NODELAY) {
2400                 /* procfs requests must not wait on statfs, to avoid deadlock */
2401                 req->rq_no_resend = 1;
2402                 req->rq_no_delay = 1;
2403         }
2404
2405         rc = ptlrpc_queue_wait(req);
2406         if (rc)
2407                 GOTO(out, rc);
2408
2409         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2410         if (msfs == NULL) {
2411                 GOTO(out, rc = -EPROTO);
2412         }
2413
2414         *osfs = *msfs;
2415
2416         EXIT;
2417  out:
2418         ptlrpc_req_finished(req);
2419         return rc;
2420 }
2421
2422 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2423                          void *karg, void *uarg)
2424 {
2425         struct obd_device *obd = exp->exp_obd;
2426         struct obd_ioctl_data *data = karg;
2427         int err = 0;
2428         ENTRY;
2429
2430         if (!try_module_get(THIS_MODULE)) {
2431                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2432                        module_name(THIS_MODULE));
2433                 return -EINVAL;
2434         }
2435         switch (cmd) {
2436         case OBD_IOC_CLIENT_RECOVER:
2437                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2438                                             data->ioc_inlbuf1, 0);
2439                 if (err > 0)
2440                         err = 0;
2441                 GOTO(out, err);
2442         case IOC_OSC_SET_ACTIVE:
2443                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2444                                                data->ioc_offset);
2445                 GOTO(out, err);
2446         case OBD_IOC_POLL_QUOTACHECK:
2447                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2448                 GOTO(out, err);
2449         case OBD_IOC_PING_TARGET:
2450                 err = ptlrpc_obd_ping(obd);
2451                 GOTO(out, err);
2452         default:
2453                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2454                        cmd, current_comm());
2455                 GOTO(out, err = -ENOTTY);
2456         }
2457 out:
2458         module_put(THIS_MODULE);
2459         return err;
2460 }
2461
2462 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2463                               u32 keylen, void *key,
2464                               u32 vallen, void *val,
2465                               struct ptlrpc_request_set *set)
2466 {
2467         struct ptlrpc_request *req;
2468         struct obd_device     *obd = exp->exp_obd;
2469         struct obd_import     *imp = class_exp2cliimp(exp);
2470         char                  *tmp;
2471         int                    rc;
2472         ENTRY;
2473
2474         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2475
2476         if (KEY_IS(KEY_CHECKSUM)) {
2477                 if (vallen != sizeof(int))
2478                         RETURN(-EINVAL);
2479                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2480                 RETURN(0);
2481         }
2482
2483         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2484                 sptlrpc_conf_client_adapt(obd);
2485                 RETURN(0);
2486         }
2487
2488         if (KEY_IS(KEY_FLUSH_CTX)) {
2489                 sptlrpc_import_flush_my_ctx(imp);
2490                 RETURN(0);
2491         }
2492
2493         if (KEY_IS(KEY_CACHE_SET)) {
2494                 struct client_obd *cli = &obd->u.cli;
2495
2496                 LASSERT(cli->cl_cache == NULL); /* only once */
2497                 cli->cl_cache = (struct cl_client_cache *)val;
2498                 cl_cache_incref(cli->cl_cache);
2499                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2500
2501                 /* add this osc into entity list */
2502                 LASSERT(list_empty(&cli->cl_lru_osc));
2503                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2504                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2505                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2506
2507                 RETURN(0);
2508         }
2509
2510         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2511                 struct client_obd *cli = &obd->u.cli;
2512                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2513                 long target = *(long *)val;
2514
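                /* shrink at most half of this OSC's LRU pages in one call
                 * and decrement the caller's target by the number actually
                 * freed, presumably so the remainder can be spread across
                 * other OSCs */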
2515                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2516                 *(long *)val -= nr;
2517                 RETURN(0);
2518         }
2519
2520         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2521                 RETURN(-EINVAL);
2522
2523         /* We pass all other commands directly to OST. Since nobody calls osc
2524            methods directly and everybody is supposed to go through LOV, we
2525            assume lov checked invalid values for us.
2526            The only recognised values so far are evict_by_nid and mds_conn.
2527            Even if something bad goes through, we'd get a -EINVAL from OST
2528            anyway. */
2529
2530         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2531                                                 &RQF_OST_SET_GRANT_INFO :
2532                                                 &RQF_OBD_SET_INFO);
2533         if (req == NULL)
2534                 RETURN(-ENOMEM);
2535
2536         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2537                              RCL_CLIENT, keylen);
2538         if (!KEY_IS(KEY_GRANT_SHRINK))
2539                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2540                                      RCL_CLIENT, vallen);
2541         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2542         if (rc) {
2543                 ptlrpc_request_free(req);
2544                 RETURN(rc);
2545         }
2546
2547         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2548         memcpy(tmp, key, keylen);
2549         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2550                                                         &RMF_OST_BODY :
2551                                                         &RMF_SETINFO_VAL);
2552         memcpy(tmp, val, vallen);
2553
2554         if (KEY_IS(KEY_GRANT_SHRINK)) {
2555                 struct osc_grant_args *aa;
2556                 struct obdo *oa;
2557
2558                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2559                 aa = ptlrpc_req_async_args(req);
2560                 OBDO_ALLOC(oa);
2561                 if (!oa) {
2562                         ptlrpc_req_finished(req);
2563                         RETURN(-ENOMEM);
2564                 }
2565                 *oa = ((struct ost_body *)val)->oa;
2566                 aa->aa_oa = oa;
2567                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2568         }
2569
2570         ptlrpc_request_set_replen(req);
2571         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2572                 LASSERT(set != NULL);
2573                 ptlrpc_set_add_req(set, req);
2574                 ptlrpc_check_set(NULL, set);
2575         } else
2576                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2577
2578         RETURN(0);
2579 }
2580
2581 static int osc_reconnect(const struct lu_env *env,
2582                          struct obd_export *exp, struct obd_device *obd,
2583                          struct obd_uuid *cluuid,
2584                          struct obd_connect_data *data,
2585                          void *localdata)
2586 {
2587         struct client_obd *cli = &obd->u.cli;
2588
2589         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2590                 long lost_grant;
2591
2592                 spin_lock(&cli->cl_loi_list_lock);
2593                 data->ocd_grant = (cli->cl_avail_grant +
2594                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2595                                   2 * cli_brw_size(obd);
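                /* "a ?: b" above is the GCC extension (a if non-zero, else
                 * b): re-request the grant we believe we still hold (avail
                 * plus dirty pages), falling back to two full BRW sizes
                 * when that sum is zero, e.g. on a fresh connection */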
2596                 lost_grant = cli->cl_lost_grant;
2597                 cli->cl_lost_grant = 0;
2598                 spin_unlock(&cli->cl_loi_list_lock);
2599
2600                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2601                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2602                        data->ocd_version, data->ocd_grant, lost_grant);
2603         }
2604
2605         RETURN(0);
2606 }
2607
2608 static int osc_disconnect(struct obd_export *exp)
2609 {
2610         struct obd_device *obd = class_exp2obd(exp);
2611         int rc;
2612
2613         rc = client_disconnect_export(exp);
2614         /**
2615          * Initially we put del_shrink_grant before disconnect_export, but
2616          * that caused the following problem when setup (connect) and cleanup
2617          * (disconnect) were tangled together.
2618          *      connect p1                     disconnect p2
2619          *   ptlrpc_connect_import
2620          *     ...............               class_manual_cleanup
2621          *                                     osc_disconnect
2622          *                                     del_shrink_grant
2623          *   ptlrpc_connect_interrupt
2624          *     init_grant_shrink
2625          *   add this client to shrink list
2626          *                                      cleanup_osc
2627          * Bang! pinger trigger the shrink.
2628          * So the osc should be disconnected from the shrink list, after we
2629          * are sure the import has been destroyed. BUG18662
2630          */
2631         if (obd->u.cli.cl_import == NULL)
2632                 osc_del_shrink_grant(&obd->u.cli);
2633         return rc;
2634 }
2635
2636 static int osc_import_event(struct obd_device *obd,
2637                             struct obd_import *imp,
2638                             enum obd_import_event event)
2639 {
2640         struct client_obd *cli;
2641         int rc = 0;
2642
2643         ENTRY;
2644         LASSERT(imp->imp_obd == obd);
2645
2646         switch (event) {
2647         case IMP_EVENT_DISCON: {
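                /* The grant is only meaningful against the server instance
                 * we were connected to; forget it on disconnect. */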
2648                 cli = &obd->u.cli;
2649                 spin_lock(&cli->cl_loi_list_lock);
2650                 cli->cl_avail_grant = 0;
2651                 cli->cl_lost_grant = 0;
2652                 spin_unlock(&cli->cl_loi_list_lock);
2653                 break;
2654         }
2655         case IMP_EVENT_INACTIVE: {
2656                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2657                 break;
2658         }
2659         case IMP_EVENT_INVALIDATE: {
2660                 struct ldlm_namespace *ns = obd->obd_namespace;
2661                 struct lu_env         *env;
2662                 int                    refcheck;
2663
2664                 env = cl_env_get(&refcheck);
2665                 if (!IS_ERR(env)) {
2666                         /* Reset grants */
2667                         cli = &obd->u.cli;
2668                         /* All cached pages are flushed out now; their
2669                          * RPCs will fail against the invalid import. */
2670                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2671
2672                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2673                         cl_env_put(env, &refcheck);
2674                 } else
2675                         rc = PTR_ERR(env);
2676                 break;
2677         }
2678         case IMP_EVENT_ACTIVE: {
2679                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2680                 break;
2681         }
2682         case IMP_EVENT_OCD: {
2683                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2684
2685                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2686                         osc_init_grant(&obd->u.cli, ocd);
2687
2688                 /* See bug 7198 */
2689                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2690                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2691
2692                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2693                 break;
2694         }
2695         case IMP_EVENT_DEACTIVATE: {
2696                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2697                 break;
2698         }
2699         case IMP_EVENT_ACTIVATE: {
2700                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2701                 break;
2702         }
2703         default:
2704                 CERROR("Unknown import event %d\n", event);
2705                 LBUG();
2706         }
2707         RETURN(rc);
2708 }
2709
2710 /**
2711  * Determine whether a lock can be canceled rather than replayed
2712  * during recovery; see bug16774 for details.
2713  *
2714  * \retval 0 the lock cannot be canceled
2715  * \retval 1 the lock may safely be canceled
2716  */
2717 static int osc_cancel_weight(struct ldlm_lock *lock)
2718 {
2719         /*
2720          * Cancel only unused, granted extent locks.
2721          */
2722         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2723             lock->l_granted_mode == lock->l_req_mode &&
2724             osc_ldlm_weigh_ast(lock) == 0)
2725                 RETURN(1);
2726
2727         RETURN(0);
2728 }
2729
2730 static int brw_queue_work(const struct lu_env *env, void *data)
2731 {
2732         struct client_obd *cli = data;
2733
2734         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2735
2736         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2737         RETURN(0);
2738 }
2739
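/*
 * Set up an OSC device: take a ptlrpcd reference, do the generic client
 * setup, allocate the writeback and LRU work items, set up quota and
 * lprocfs, pre-allocate the request pool, and register the grant shrink
 * list and the lock cancel-weight callback.
 */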
2740 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2741 {
2742         struct client_obd *cli = &obd->u.cli;
2743         struct obd_type   *type;
2744         void              *handler;
2745         int                rc;
2746         ENTRY;
2747
2748         rc = ptlrpcd_addref();
2749         if (rc)
2750                 RETURN(rc);
2751
2752         rc = client_obd_setup(obd, lcfg);
2753         if (rc)
2754                 GOTO(out_ptlrpcd, rc);
2755
2756         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2757         if (IS_ERR(handler))
2758                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2759         cli->cl_writeback_work = handler;
2760
2761         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2762         if (IS_ERR(handler))
2763                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2764         cli->cl_lru_work = handler;
2765
2766         rc = osc_quota_setup(obd);
2767         if (rc)
2768                 GOTO(out_ptlrpcd_work, rc);
2769
2770         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2771
2772 #ifdef CONFIG_PROC_FS
2773         obd->obd_vars = lprocfs_osc_obd_vars;
2774 #endif
2775         /* If true, the client (osc) and server (osp) are on the same node.
2776          * If the osp layer was loaded first, it registered the osc proc
2777          * directory; this obd_device then attaches its proc tree to
2778          * type->typ_procsym instead of obd->obd_type->typ_procroot. */
2779         type = class_search_type(LUSTRE_OSP_NAME);
2780         if (type && type->typ_procsym) {
2781                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2782                                                        type->typ_procsym,
2783                                                        obd->obd_vars, obd);
2784                 if (IS_ERR(obd->obd_proc_entry)) {
2785                         rc = PTR_ERR(obd->obd_proc_entry);
2786                         CERROR("error %d setting up lprocfs for %s\n", rc,
2787                                obd->obd_name);
2788                         obd->obd_proc_entry = NULL;
2789                 }
2790         } else {
2791                 rc = lprocfs_obd_setup(obd);
2792         }
2793
2794         /* If the basic OSC proc tree was constructed successfully,
2795          * set up the remaining stats and sptlrpc entries. */
2796         if (rc == 0) {
2797                 lproc_osc_attach_seqstat(obd);
2798                 sptlrpc_lprocfs_cliobd_attach(obd);
2799                 ptlrpc_lprocfs_register_obd(obd);
2800         }
2801
2802         /* We need to allocate a few more requests because brw_interpret
2803          * tries to create new requests before freeing previous ones.
2804          * Ideally we would reserve 2x max_rpcs_in_flight, but that could
2805          * waste too much RAM, so max_rpcs_in_flight + 2 is a guess that
2806          * should still work. */
2807         cli->cl_import->imp_rq_pool =
2808                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2809                                     OST_MAXREQSIZE,
2810                                     ptlrpc_add_rqs_to_pool);
2811
2812         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
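        /* Register osc_cancel_weight() so the DLM can ask, before replaying
         * a lock during recovery, whether it can simply be canceled. */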
2813         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2814         RETURN(0);
2815
2816 out_ptlrpcd_work:
2817         if (cli->cl_writeback_work != NULL) {
2818                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2819                 cli->cl_writeback_work = NULL;
2820         }
2821         if (cli->cl_lru_work != NULL) {
2822                 ptlrpcd_destroy_work(cli->cl_lru_work);
2823                 cli->cl_lru_work = NULL;
2824         }
2825 out_client_setup:
2826         client_obd_cleanup(obd);
2827 out_ptlrpcd:
2828         ptlrpcd_decref();
2829         RETURN(rc);
2830 }
2831
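/*
 * Pre-cleanup runs in stages: OBD_CLEANUP_EARLY deactivates the import
 * and stops pinging it; OBD_CLEANUP_EXPORTS waits for zombie exports,
 * destroys the writeback/LRU work items, and tears down the import and
 * lprocfs state.
 */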
2832 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2833 {
2834         int rc = 0;
2835         ENTRY;
2836
2837         switch (stage) {
2838         case OBD_CLEANUP_EARLY: {
2839                 struct obd_import *imp;
2840                 imp = obd->u.cli.cl_import;
2841                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2842                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2843                 ptlrpc_deactivate_import(imp);
2844                 spin_lock(&imp->imp_lock);
2845                 imp->imp_pingable = 0;
2846                 spin_unlock(&imp->imp_lock);
2847                 break;
2848         }
2849         case OBD_CLEANUP_EXPORTS: {
2850                 struct client_obd *cli = &obd->u.cli;
2851                 /* LU-464
2852                  * For the echo client the export may be on the zombie list;
2853                  * wait for the zombie thread to cull it, because cli.cl_import
2854                  * will be cleared in client_disconnect_export():
2855                  *   class_export_destroy() -> obd_cleanup() ->
2856                  *   echo_device_free() -> echo_client_cleanup() ->
2857                  *   obd_disconnect() -> osc_disconnect() ->
2858                  *   client_disconnect_export()
2859                  */
2860                 obd_zombie_barrier();
2861                 if (cli->cl_writeback_work) {
2862                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2863                         cli->cl_writeback_work = NULL;
2864                 }
2865                 if (cli->cl_lru_work) {
2866                         ptlrpcd_destroy_work(cli->cl_lru_work);
2867                         cli->cl_lru_work = NULL;
2868                 }
2869                 obd_cleanup_client_import(obd);
2870                 ptlrpc_lprocfs_unregister_obd(obd);
2871                 lprocfs_obd_cleanup(obd);
2872                 break;
2873         }
2874         }
2875         RETURN(rc);
2876 }
2877
2878 int osc_cleanup(struct obd_device *obd)
2879 {
2880         struct client_obd *cli = &obd->u.cli;
2881         int rc;
2882
2883         ENTRY;
2884
2885         /* LRU cleanup: detach from the shared client cache */
2886         if (cli->cl_cache != NULL) {
2887                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2888                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2889                 list_del_init(&cli->cl_lru_osc);
2890                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2891                 cli->cl_lru_left = NULL;
2892                 cl_cache_decref(cli->cl_cache);
2893                 cli->cl_cache = NULL;
2894         }
2895
2896         /* free memory of osc quota cache */
2897         osc_quota_cleanup(obd);
2898
2899         rc = client_obd_cleanup(obd);
2900
2901         ptlrpcd_decref();
2902         RETURN(rc);
2903 }
2904
2905 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2906 {
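        /* class_process_proc_param() may return a positive count on
         * success; treat any positive return as success (0) and pass
         * negative errors through. */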
2907         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2908         return rc > 0 ? 0 : rc;
2909 }
2910
2911 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2912 {
2913         return osc_process_config_base(obd, buf);
2914 }
2915
2916 static struct obd_ops osc_obd_ops = {
2917         .o_owner                = THIS_MODULE,
2918         .o_setup                = osc_setup,
2919         .o_precleanup           = osc_precleanup,
2920         .o_cleanup              = osc_cleanup,
2921         .o_add_conn             = client_import_add_conn,
2922         .o_del_conn             = client_import_del_conn,
2923         .o_connect              = client_connect_import,
2924         .o_reconnect            = osc_reconnect,
2925         .o_disconnect           = osc_disconnect,
2926         .o_statfs               = osc_statfs,
2927         .o_statfs_async         = osc_statfs_async,
2928         .o_create               = osc_create,
2929         .o_destroy              = osc_destroy,
2930         .o_getattr              = osc_getattr,
2931         .o_setattr              = osc_setattr,
2932         .o_iocontrol            = osc_iocontrol,
2933         .o_set_info_async       = osc_set_info_async,
2934         .o_import_event         = osc_import_event,
2935         .o_process_config       = osc_process_config,
2936         .o_quotactl             = osc_quotactl,
2937         .o_quotacheck           = osc_quotacheck,
2938 };
2939
2940 static int __init osc_init(void)
2941 {
2942         bool enable_proc = true;
2943         struct obd_type *type;
2944         int rc;
2945         ENTRY;
2946
2947         /* Print the address of _any_ initialized kernel symbol from this
2948          * module, to allow debugging with a gdb that doesn't support data
2949          * symbols from modules. */
2950         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2951
2952         rc = lu_kmem_init(osc_caches);
2953         if (rc)
2954                 RETURN(rc);
2955
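        /* If the OSP module is already loaded and has registered the shared
         * OSC proc directory (typ_procsym), do not create our own proc
         * entries at type registration time. */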
2956         type = class_search_type(LUSTRE_OSP_NAME);
2957         if (type != NULL && type->typ_procsym != NULL)
2958                 enable_proc = false;
2959
2960         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2961                                  LUSTRE_OSC_NAME, &osc_device_type);
2962         if (rc) {
2963                 lu_kmem_fini(osc_caches);
2964                 RETURN(rc);
2965         }
2966
2967         RETURN(rc);
2968 }
2969
2970 static void /*__exit*/ osc_exit(void)
2971 {
2972         class_unregister_type(LUSTRE_OSC_NAME);
2973         lu_kmem_fini(osc_caches);
2974 }
2975
2976 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2977 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2978 MODULE_LICENSE("GPL");
2979
2980 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);