Whamcloud - gitweb
732487c169dd8dd3f0f5e2a205f90497753c6914
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <lustre_ha.h>
47 #include <lprocfs_status.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_log.h>
50 #include <lustre_debug.h>
51 #include <lustre_param.h>
52 #include <lustre_fid.h>
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
/* Per-RPC state for an asynchronous bulk read/write (BRW) request, kept
 * in req->rq_async_args for the interpret callback (presumably
 * brw_interpret(), forward-declared below — confirm). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;           /* attributes sent / returned */
        int                       aa_requested_nob; /* bytes requested */
        int                       aa_nio_count;    /* niobuf count */
        obd_count                 aa_page_count;   /* entries in aa_ppga */
        int                       aa_resends;      /* resend attempts so far */
        struct brw_page **aa_ppga;                 /* pages of the transfer */
        struct client_obd        *aa_cli;          /* owning client obd */
        struct list_head          aa_oaps;         /* async pages in this RPC */
        struct list_head          aa_exts;         /* extents in this RPC */
        struct obd_capa  *aa_ocapa;                /* capability, if any */
        struct cl_req            *aa_clerq;        /* associated cl_req */
};

/* Grant RPCs reuse the BRW async-args layout. */
#define osc_grant_args osc_brw_async_args
71
/* Async-args cookie for simple RPCs that only need the obd_info back
 * in the interpret callback (e.g. osc_getattr_interpret()). */
struct osc_async_args {
        struct obd_info *aa_oi;  /* caller's obd_info; oi_cb_up is invoked */
};
75
/* Async-args cookie for setattr/punch RPCs; consumed by
 * osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* obdo updated from the reply */
        obd_enqueue_update_f     sa_upcall;  /* completion upcall */
        void                    *sa_cookie;  /* opaque cookie for sa_upcall */
};
81
/* Async-args cookie for OST_SYNC RPCs; consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;              /* oi_oa receives the reply obdo */
        obd_enqueue_update_f     fa_upcall;  /* completion upcall */
        void                    *fa_cookie;  /* opaque cookie for fa_upcall */
};
87
/* Async-args cookie for DLM lock enqueue RPCs issued by the OSC. */
struct osc_enqueue_args {
        struct obd_export               *oa_exp;    /* export the lock was taken on */
        __u64                           *oa_flags;  /* in/out LDLM flags */
        obd_enqueue_update_f             oa_upcall; /* completion upcall */
        void                            *oa_cookie; /* opaque cookie for oa_upcall */
        struct ost_lvb                  *oa_lvb;    /* lock value block buffer */
        struct lustre_handle            *oa_lockh;  /* returned lock handle */
        struct ldlm_enqueue_info        *oa_ei;     /* enqueue parameters */
        unsigned int                     oa_agl:1;  /* asynchronous glimpse lock */
};
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * \param exp        export; its import's connect data supplies ocd_maxbytes
 * \param lsmp       in/out stripe md pointer; NULL means "just return size".
 *                   With *lsmp set and @lmm NULL, *lsmp is freed instead.
 * \param lmm        wire/disk metadata to unpack, or NULL
 * \param lmm_bytes  size of @lmm in bytes
 *
 * \retval the in-memory lsm size on success, 0 on free, -ve errno on error
 */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        /* Validate the incoming buffer before touching it. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                /* Object id 0 is never a valid object. */
                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* An OSC stripe md always describes exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Existing lsm and no wire data: the caller is freeing it. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                /* Allocate a fresh lsm plus its single oinfo slot; undo the
                 * first allocation if the second one fails. */
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Use the server-advertised max object size when the server
         * supports OBD_CONNECT_MAXBYTES, else the static default. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
164
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166                                  struct ost_body *body, void *capa)
167 {
168         struct obd_capa *oc = (struct obd_capa *)capa;
169         struct lustre_capa *c;
170
171         if (!capa)
172                 return;
173
174         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
175         LASSERT(c);
176         capa_cpy(c, oc);
177         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178         DEBUG_CAPA(D_SEC, c, "pack");
179 }
180
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182                                      struct obd_info *oinfo)
183 {
184         struct ost_body *body;
185
186         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
187         LASSERT(body);
188
189         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
190                              oinfo->oi_oa);
191         osc_pack_capa(req, body, oinfo->oi_capa);
192 }
193
194 static inline void osc_set_capa_size(struct ptlrpc_request *req,
195                                      const struct req_msg_field *field,
196                                      struct obd_capa *oc)
197 {
198         if (oc == NULL)
199                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200         else
201                 /* it is already calculated as sizeof struct obd_capa */
202                 ;
203 }
204
205 static int osc_getattr_interpret(const struct lu_env *env,
206                                  struct ptlrpc_request *req,
207                                  struct osc_async_args *aa, int rc)
208 {
209         struct ost_body *body;
210         ENTRY;
211
212         if (rc != 0)
213                 GOTO(out, rc);
214
215         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216         if (body) {
217                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
218                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
219                                      aa->aa_oi->oi_oa, &body->oa);
220
221                 /* This should really be sent by the OST */
222                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
223                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
224         } else {
225                 CDEBUG(D_INFO, "can't unpack ost_body\n");
226                 rc = -EPROTO;
227                 aa->aa_oi->oi_oa->o_valid = 0;
228         }
229 out:
230         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231         RETURN(rc);
232 }
233
/* Issue an OST_GETATTR asynchronously through @set; the reply is handled
 * by osc_getattr_interpret(), which calls oinfo->oi_cb_up().
 *
 * \retval 0 on successful submission, negative errno on setup failure
 */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Stash the obd_info in the request's built-in async-args area;
         * the CLASSERT proves it fits. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
265
/* Synchronous OST_GETATTR: send the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa.
 *
 * \retval 0 on success, negative errno on error
 */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* This should really come from the OST; fill in the negotiated
         * BRW size as the block size. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
309
310 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
311                        struct obd_info *oinfo, struct obd_trans_info *oti)
312 {
313         struct ptlrpc_request *req;
314         struct ost_body       *body;
315         int                    rc;
316         ENTRY;
317
318         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
344                              &body->oa);
345
346         EXIT;
347 out:
348         ptlrpc_req_finished(req);
349         RETURN(rc);
350 }
351
352 static int osc_setattr_interpret(const struct lu_env *env,
353                                  struct ptlrpc_request *req,
354                                  struct osc_setattr_args *sa, int rc)
355 {
356         struct ost_body *body;
357         ENTRY;
358
359         if (rc != 0)
360                 GOTO(out, rc);
361
362         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363         if (body == NULL)
364                 GOTO(out, rc = -EPROTO);
365
366         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
367                              &body->oa);
368 out:
369         rc = sa->sa_upcall(sa->sa_cookie, rc);
370         RETURN(rc);
371 }
372
/* Issue an OST_SETATTR asynchronously.
 *
 * \param exp     export to send the request on
 * \param oinfo   attributes to set (oi_oa) and optional capability
 * \param oti     transaction info; supplies the llog cookie when
 *                OBD_MD_FLCOOKIE is set in the obdo
 * \param upcall  completion callback (ignored when @rqset is NULL)
 * \param cookie  opaque argument passed to @upcall
 * \param rqset   NULL for fire-and-forget via ptlrpcd, PTLRPCD_SET to
 *                run on ptlrpcd with the interpret callback, or a
 *                caller-owned set
 *
 * \retval 0 on successful submission, negative errno on setup failure
 */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Carry the llog cookie along so the OST can cancel the unlink
         * record once the setattr commits. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
423
424 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
425                              struct obd_trans_info *oti,
426                              struct ptlrpc_request_set *rqset)
427 {
428         return osc_setattr_async_base(exp, oinfo, oti,
429                                       oinfo->oi_cb_up, oinfo, rqset);
430 }
431
/* Synchronously create an object on the OST.
 *
 * \param exp  export to create the object through
 * \param oa   attributes for the new object; updated from the reply
 * \param ea   in/out stripe md; allocated here when *ea is NULL and
 *             returned to the caller on success
 * \param oti  transaction info; receives the llog cookie when the reply
 *             carries OBD_MD_FLCOOKIE
 *
 * \retval 0 on success, negative errno on error
 */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* Allocate a stripe md here if the caller didn't supply one;
         * it is freed again on the error path below. */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* The block size should really come from the OST. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        /* Save the unlink llog cookie for the caller, if one came back. */
        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the locally-allocated lsm on failure; if *ea was set on
         * entry it still belongs to the caller. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
515
/* Issue an asynchronous OST_PUNCH (truncate/hole-punch) request.
 * Completion is delivered through osc_setattr_interpret() and @upcall.
 *
 * \param exp     export to send the request on
 * \param oinfo   oi_oa carries the punch range; oi_capa the capability
 * \param upcall  completion callback
 * \param cookie  opaque argument passed to @upcall
 * \param rqset   PTLRPCD_SET to run on ptlrpcd, else a caller-owned set
 *
 * \retval 0 on successful submission, negative errno on setup failure
 */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* Punch replies are handled like setattr replies. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
560
561 static int osc_sync_interpret(const struct lu_env *env,
562                               struct ptlrpc_request *req,
563                               void *arg, int rc)
564 {
565         struct osc_fsync_args *fa = arg;
566         struct ost_body *body;
567         ENTRY;
568
569         if (rc)
570                 GOTO(out, rc);
571
572         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
573         if (body == NULL) {
574                 CERROR ("can't unpack ost_body\n");
575                 GOTO(out, rc = -EPROTO);
576         }
577
578         *fa->fa_oi->oi_oa = body->oa;
579 out:
580         rc = fa->fa_upcall(fa->fa_cookie, rc);
581         RETURN(rc);
582 }
583
/* Issue an asynchronous OST_SYNC request; the sync range travels in the
 * obdo's size/blocks fields.  Completion is delivered through
 * osc_sync_interpret() and @upcall.
 *
 * \retval 0 on successful submission, negative errno on setup failure
 */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
628
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list.
 *
 * \param exp         export whose namespace is searched
 * \param oa          obdo identifying the object (o_oi)
 * \param cancels     list the matched locks are appended to
 * \param mode        lock mode to match (e.g. LCK_PW)
 * \param lock_flags  extra LDLM flags for the local cancel
 *
 * \retval number of locks added to @cancels (0 when ELC is disabled or
 *         the resource does not exist)
 */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
663
664 static int osc_destroy_interpret(const struct lu_env *env,
665                                  struct ptlrpc_request *req, void *data,
666                                  int rc)
667 {
668         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
669
670         atomic_dec(&cli->cl_destroy_in_flight);
671         wake_up(&cli->cl_destroy_waitq);
672         return 0;
673 }
674
/* Try to reserve an in-flight slot for a destroy RPC.
 *
 * \retval 1 when the slot was taken and the request may be sent
 * \retval 0 when the in-flight limit is reached (the inc is rolled back)
 */
static int osc_can_send_destroy(struct client_obd *cli)
{
        /* Optimistically claim a slot; keep it if we stayed within the
         * RPC in-flight limit. */
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Roll the claim back.  If the count dropped below the limit, a
         * concurrent decrement raced with us between the two atomics, so
         * wake a waiter that may now proceed. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
692
693 int osc_create(const struct lu_env *env, struct obd_export *exp,
694                struct obdo *oa, struct lov_stripe_md **ea,
695                struct obd_trans_info *oti)
696 {
697         int rc = 0;
698         ENTRY;
699
700         LASSERT(oa);
701         LASSERT(ea);
702         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
703
704         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
705             oa->o_flags == OBD_FL_RECREATE_OBJS) {
706                 RETURN(osc_real_create(exp, oa, ea, oti));
707         }
708
709         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
710                 RETURN(osc_real_create(exp, oa, ea, oti));
711
712         /* we should not get here anymore */
713         LBUG();
714
715         RETURN(rc);
716 }
717
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel covering PW locks locally first; data is being thrown
         * away, so the pages they cover can be discarded. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        /* Piggy-back the collected lock cancels onto the destroy RPC. */
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destory is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        /* NOTE(review): o_flags is read without first checking
         * OBD_MD_FLFLAGS in o_valid, unlike osc_create(); confirm that
         * all callers initialize o_flags before relying on this test. */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
797
/* Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client state, under cl_loi_list_lock,
 * so the server can see how much cache the client is using. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        /* Sanity-check the per-client dirty accounting before using it. */
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %d - %d > system dirty_max %d\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Announce headroom: at least the dirty cache limit, or
                 * enough to keep max_rpcs_in_flight+1 full-size RPCs busy. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
846
847 void osc_update_next_shrink(struct client_obd *cli)
848 {
849         cli->cl_next_shrink_grant =
850                 cfs_time_shift(cli->cl_grant_shrink_interval);
851         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
852                cli->cl_next_shrink_grant);
853 }
854
/* Return @grant bytes to the client's available-grant pool.
 * Caller must NOT hold cl_loi_list_lock; it is taken here. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
861
862 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
863 {
864         if (body->oa.o_valid & OBD_MD_FLGRANT) {
865                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
866                 __osc_update_grant(cli, body->oa.o_grant);
867         }
868 }
869
870 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
871                               obd_count keylen, void *key, obd_count vallen,
872                               void *val, struct ptlrpc_request_set *set);
873
874 static int osc_shrink_grant_interpret(const struct lu_env *env,
875                                       struct ptlrpc_request *req,
876                                       void *aa, int rc)
877 {
878         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
879         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
880         struct ost_body *body;
881
882         if (rc != 0) {
883                 __osc_update_grant(cli, oa->o_grant);
884                 GOTO(out, rc);
885         }
886
887         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
888         LASSERT(body);
889         osc_update_grant(cli, body);
890 out:
891         OBDO_FREE(oa);
892         return rc;
893 }
894
895 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
896 {
897         client_obd_list_lock(&cli->cl_loi_list_lock);
898         oa->o_grant = cli->cl_avail_grant / 4;
899         cli->cl_avail_grant -= oa->o_grant;
900         client_obd_list_unlock(&cli->cl_loi_list_lock);
901         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
902                 oa->o_valid |= OBD_MD_FLFLAGS;
903                 oa->o_flags = 0;
904         }
905         oa->o_flags |= OBD_FL_SHRINK_GRANT;
906         osc_update_next_shrink(cli);
907 }
908
909 /* Shrink the current grant, either from some large amount to enough for a
910  * full set of in-flight RPCs, or if we have already shrunk to that limit
911  * then to enough for a single RPC.  This avoids keeping more grant than
912  * needed, and avoids shrinking the grant piecemeal. */
913 static int osc_shrink_grant(struct client_obd *cli)
914 {
915         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
916                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
917
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         if (cli->cl_avail_grant <= target_bytes)
920                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
921         client_obd_list_unlock(&cli->cl_loi_list_lock);
922
923         return osc_shrink_grant_to_target(cli, target_bytes);
924 }
925
926 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
927 {
928         int                     rc = 0;
929         struct ost_body        *body;
930         ENTRY;
931
932         client_obd_list_lock(&cli->cl_loi_list_lock);
933         /* Don't shrink if we are already above or below the desired limit
934          * We don't want to shrink below a single RPC, as that will negatively
935          * impact block allocation and long-term performance. */
936         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
937                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
938
939         if (target_bytes >= cli->cl_avail_grant) {
940                 client_obd_list_unlock(&cli->cl_loi_list_lock);
941                 RETURN(0);
942         }
943         client_obd_list_unlock(&cli->cl_loi_list_lock);
944
945         OBD_ALLOC_PTR(body);
946         if (!body)
947                 RETURN(-ENOMEM);
948
949         osc_announce_cached(cli, &body->oa, 0);
950
951         client_obd_list_lock(&cli->cl_loi_list_lock);
952         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
953         cli->cl_avail_grant = target_bytes;
954         client_obd_list_unlock(&cli->cl_loi_list_lock);
955         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
956                 body->oa.o_valid |= OBD_MD_FLFLAGS;
957                 body->oa.o_flags = 0;
958         }
959         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
960         osc_update_next_shrink(cli);
961
962         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
963                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
964                                 sizeof(*body), body, NULL);
965         if (rc != 0)
966                 __osc_update_grant(cli, body->oa.o_grant);
967         OBD_FREE_PTR(body);
968         RETURN(rc);
969 }
970
971 static int osc_should_shrink_grant(struct client_obd *client)
972 {
973         cfs_time_t time = cfs_time_current();
974         cfs_time_t next_shrink = client->cl_next_shrink_grant;
975
976         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
977              OBD_CONNECT_GRANT_SHRINK) == 0)
978                 return 0;
979
980         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
981                 /* Get the current RPC size directly, instead of going via:
982                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
983                  * Keep comment here so that it can be found by searching. */
984                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
985
986                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
987                     client->cl_avail_grant > brw_size)
988                         return 1;
989                 else
990                         osc_update_next_shrink(client);
991         }
992         return 0;
993 }
994
995 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
996 {
997         struct client_obd *client;
998
999         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1000                 if (osc_should_shrink_grant(client))
1001                         osc_shrink_grant(client);
1002         }
1003         return 0;
1004 }
1005
1006 static int osc_add_shrink_grant(struct client_obd *client)
1007 {
1008         int rc;
1009
1010         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1011                                        TIMEOUT_GRANT,
1012                                        osc_grant_shrink_grant_cb, NULL,
1013                                        &client->cl_grant_shrink_list);
1014         if (rc) {
1015                 CERROR("add grant client %s error %d\n",
1016                         client->cl_import->imp_obd->obd_name, rc);
1017                 return rc;
1018         }
1019         CDEBUG(D_CACHE, "add grant client %s \n",
1020                client->cl_import->imp_obd->obd_name);
1021         osc_update_next_shrink(client);
1022         return 0;
1023 }
1024
/* Unregister this client from the periodic grant-shrink timeout list. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1030
/* Initialize this client's grant accounting from the server's connect
 * reply, and set the osc_extent chunk size.  Registers the client for
 * periodic grant shrinking when the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* A buggy server may grant less than our outstanding dirty pages;
         * warn and fall back to trusting ocd_grant outright. */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* Only register once: cl_grant_shrink_list is non-empty after the
         * first osc_add_shrink_grant(). */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1070
1071 /* We assume that the reason this OSC got a short read is because it read
1072  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1073  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1074  * this stripe never got written at or beyond this stripe offset yet. */
1075 static void handle_short_read(int nob_read, obd_count page_count,
1076                               struct brw_page **pga)
1077 {
1078         char *ptr;
1079         int i = 0;
1080
1081         /* skip bytes read OK */
1082         while (nob_read > 0) {
1083                 LASSERT (page_count > 0);
1084
1085                 if (pga[i]->count > nob_read) {
1086                         /* EOF inside this page */
1087                         ptr = kmap(pga[i]->pg) +
1088                                 (pga[i]->off & ~CFS_PAGE_MASK);
1089                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1090                         kunmap(pga[i]->pg);
1091                         page_count--;
1092                         i++;
1093                         break;
1094                 }
1095
1096                 nob_read -= pga[i]->count;
1097                 page_count--;
1098                 i++;
1099         }
1100
1101         /* zero remaining pages */
1102         while (page_count-- > 0) {
1103                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1104                 memset(ptr, 0, pga[i]->count);
1105                 kunmap(pga[i]->pg);
1106                 i++;
1107         }
1108 }
1109
1110 static int check_write_rcs(struct ptlrpc_request *req,
1111                            int requested_nob, int niocount,
1112                            obd_count page_count, struct brw_page **pga)
1113 {
1114         int     i;
1115         __u32   *remote_rcs;
1116
1117         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1118                                                   sizeof(*remote_rcs) *
1119                                                   niocount);
1120         if (remote_rcs == NULL) {
1121                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1122                 return(-EPROTO);
1123         }
1124
1125         /* return error if any niobuf was in error */
1126         for (i = 0; i < niocount; i++) {
1127                 if ((int)remote_rcs[i] < 0)
1128                         return(remote_rcs[i]);
1129
1130                 if (remote_rcs[i] != 0) {
1131                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1132                                 i, remote_rcs[i], req);
1133                         return(-EPROTO);
1134                 }
1135         }
1136
1137         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1138                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1139                        req->rq_bulk->bd_nob_transferred, requested_nob);
1140                 return(-EPROTO);
1141         }
1142
1143         return (0);
1144 }
1145
1146 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1147 {
1148         if (p1->flag != p2->flag) {
1149                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1150                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1151                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1152
1153                 /* warn if we try to combine flags that we don't know to be
1154                  * safe to combine */
1155                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1156                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1157                               "report this at https://jira.hpdd.intel.com/\n",
1158                               p1->flag, p2->flag);
1159                 }
1160                 return 0;
1161         }
1162
1163         return (p1->off + p1->count == p2->off);
1164 }
1165
/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * algorithm selected by @cksum_type.  Two OBD_FAIL hooks deliberately
 * corrupt the result for checksum-failure testing.
 *
 * NOTE(review): on hash-init failure PTR_ERR() is returned through the
 * unsigned obd_count return type, so callers cannot distinguish it from a
 * (very unlikely) matching checksum value — confirm callers tolerate this. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* Only hash up to @nob bytes of the final (partial) page. */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                /* Subtract the full page count: nob going negative simply
                 * terminates the loop after a partial final page. */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1220
/* Build a BRW (bulk read/write) request for @page_count pages described by
 * @pga: allocate the request (writes come from the import's emergency pool),
 * prepare the bulk descriptor, merge contiguous same-flag pages into niobufs,
 * announce cached/dirty state, optionally shrink grant, and compute a bulk
 * checksum when enabled.  On success *reqp owns the request; @oa is stashed
 * in the async args and freed by the reply interpreter. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* Writes must be able to make progress under memory
                 * pressure, so they draw from the preallocated pool. */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Count niobufs: adjacent mergeable pages share one niobuf. */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                /* SRVLOCK must be uniform across the whole request. */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* Extend the previous niobuf instead of starting a
                         * new one. */
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1423
/* Diagnose a write-checksum mismatch: recompute the checksum over the local
 * pages and compare against both the original client checksum and the
 * server's, to classify where the corruption happened.
 *
 * Returns 0 when the checksums actually match, 1 (caller retries) after
 * logging the mismatch diagnosis. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* Recompute with the algorithm the server reported it used. */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        /* Parent FID fields are only valid when OBD_MD_FLFID is set. */
        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1468
1469 /* Note rc enters this function as number of bytes transferred */
1470 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1471 {
1472         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473         const lnet_process_id_t *peer =
1474                         &req->rq_import->imp_connection->c_peer;
1475         struct client_obd *cli = aa->aa_cli;
1476         struct ost_body *body;
1477         __u32 client_cksum = 0;
1478         ENTRY;
1479
1480         if (rc < 0 && rc != -EDQUOT) {
1481                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1482                 RETURN(rc);
1483         }
1484
1485         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1486         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1487         if (body == NULL) {
1488                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1489                 RETURN(-EPROTO);
1490         }
1491
1492         /* set/clear over quota flag for a uid/gid */
1493         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1494             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1495                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1496
1497                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1498                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1499                        body->oa.o_flags);
1500                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1501         }
1502
1503         osc_update_grant(cli, body);
1504
1505         if (rc < 0)
1506                 RETURN(rc);
1507
1508         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1509                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1510
1511         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1512                 if (rc > 0) {
1513                         CERROR("Unexpected +ve rc %d\n", rc);
1514                         RETURN(-EPROTO);
1515                 }
1516                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1517
1518                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1519                         RETURN(-EAGAIN);
1520
1521                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1522                     check_write_checksum(&body->oa, peer, client_cksum,
1523                                          body->oa.o_cksum, aa->aa_requested_nob,
1524                                          aa->aa_page_count, aa->aa_ppga,
1525                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1526                         RETURN(-EAGAIN);
1527
1528                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1529                                      aa->aa_page_count, aa->aa_ppga);
1530                 GOTO(out, rc);
1531         }
1532
1533         /* The rest of this function executes only for OST_READs */
1534
1535         /* if unwrap_bulk failed, return -EAGAIN to retry */
1536         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1537         if (rc < 0)
1538                 GOTO(out, rc = -EAGAIN);
1539
1540         if (rc > aa->aa_requested_nob) {
1541                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1542                        aa->aa_requested_nob);
1543                 RETURN(-EPROTO);
1544         }
1545
1546         if (rc != req->rq_bulk->bd_nob_transferred) {
1547                 CERROR ("Unexpected rc %d (%d transferred)\n",
1548                         rc, req->rq_bulk->bd_nob_transferred);
1549                 return (-EPROTO);
1550         }
1551
1552         if (rc < aa->aa_requested_nob)
1553                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1554
1555         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1556                 static int cksum_counter;
1557                 __u32      server_cksum = body->oa.o_cksum;
1558                 char      *via;
1559                 char      *router;
1560                 cksum_type_t cksum_type;
1561
1562                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1563                                                body->oa.o_flags : 0);
1564                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1565                                                  aa->aa_ppga, OST_READ,
1566                                                  cksum_type);
1567
1568                 if (peer->nid == req->rq_bulk->bd_sender) {
1569                         via = router = "";
1570                 } else {
1571                         via = " via ";
1572                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1573                 }
1574
1575                 if (server_cksum != client_cksum) {
1576                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1577                                            "%s%s%s inode "DFID" object "DOSTID
1578                                            " extent ["LPU64"-"LPU64"]\n",
1579                                            req->rq_import->imp_obd->obd_name,
1580                                            libcfs_nid2str(peer->nid),
1581                                            via, router,
1582                                            body->oa.o_valid & OBD_MD_FLFID ?
1583                                                 body->oa.o_parent_seq : (__u64)0,
1584                                            body->oa.o_valid & OBD_MD_FLFID ?
1585                                                 body->oa.o_parent_oid : 0,
1586                                            body->oa.o_valid & OBD_MD_FLFID ?
1587                                                 body->oa.o_parent_ver : 0,
1588                                            POSTID(&body->oa.o_oi),
1589                                            aa->aa_ppga[0]->off,
1590                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1591                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1592                                                                         1);
1593                         CERROR("client %x, server %x, cksum_type %x\n",
1594                                client_cksum, server_cksum, cksum_type);
1595                         cksum_counter = 0;
1596                         aa->aa_oa->o_cksum = client_cksum;
1597                         rc = -EAGAIN;
1598                 } else {
1599                         cksum_counter++;
1600                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1601                         rc = 0;
1602                 }
1603         } else if (unlikely(client_cksum)) {
1604                 static int cksum_missed;
1605
1606                 cksum_missed++;
1607                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1608                         CERROR("Checksum %u requested from %s but not sent\n",
1609                                cksum_missed, libcfs_nid2str(peer->nid));
1610         } else {
1611                 rc = 0;
1612         }
1613 out:
1614         if (rc >= 0)
1615                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1616                                      aa->aa_oa, &body->oa);
1617
1618         RETURN(rc);
1619 }
1620
/* Rebuild and resend a BRW RPC after a recoverable failure (e.g. server
 * returned -EINPROGRESS).  A brand-new request is prepared from the async
 * args of the failed one; the page array, async pages and extent lists are
 * moved (not copied) onto the new request, which is then queued to ptlrpcd.
 *
 * \param request  the failed BRW request being redone
 * \param aa       async args of @request (pages, extents, resend count)
 * \param rc       the error that triggered the redo (for logging level)
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  an async page was interrupted; the new request is dropped
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected, so log them quietly. */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* If any page of the old request was interrupted, abandon the
         * resend and free the freshly built request. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Re-point the per-oap request reference at the new request. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* Transfer ownership of the capability to the new async args. */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1694
/*
 * We want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
1702 static void sort_brw_pages(struct brw_page **array, int num)
1703 {
1704         int stride, i, j;
1705         struct brw_page *tmp;
1706
1707         if (num == 1)
1708                 return;
1709         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1710                 ;
1711
1712         do {
1713                 stride /= 3;
1714                 for (i = stride ; i < num ; i++) {
1715                         tmp = array[i];
1716                         j = i;
1717                         while (j >= stride && array[j - stride]->off > tmp->off) {
1718                                 array[j] = array[j - stride];
1719                                 j -= stride;
1720                         }
1721                         array[j] = tmp;
1722                 }
1723         } while (stride > 1);
1724 }
1725
1726 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1727 {
1728         LASSERT(ppga != NULL);
1729         OBD_FREE(ppga, sizeof(*ppga) * count);
1730 }
1731
/* rq_interpret_reply callback for BRW RPCs: finish the transfer, resend on
 * recoverable errors, propagate server-returned attributes (size, times,
 * blocks, KMS) into the cl_object on success, then release all per-RPC
 * resources (obdo, extents, page array) and in-flight accounting.
 *
 * \param env   execution environment
 * \param req   the completed BRW request
 * \param data  struct osc_brw_async_args attached to @req
 * \param rc    RPC status
 *
 * \retval 0 on success, negative errno otherwise
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* Import was evicted/reconnected meanwhile; the
                         * resend would cross an eviction, so don't redo. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the redo was queued; everything else is
                 * turned into a hard error for the upper layers. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        /* Drop the capability reference held for this RPC, if any. */
        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                /* On success, reflect the attributes the server returned in
                 * the obdo back into the cl_object's cached attributes. */
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        /* Finish every extent covered by this RPC (success or failure). */
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* A slot just freed up; try to kick off more queued IO. */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1855
1856 static void brw_commit(struct ptlrpc_request *req)
1857 {
1858         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1859          * this called via the rq_commit_cb, I need to ensure
1860          * osc_dec_unstable_pages is still called. Otherwise unstable
1861          * pages may be leaked. */
1862         spin_lock(&req->rq_lock);
1863         if (likely(req->rq_unstable)) {
1864                 req->rq_unstable = 0;
1865                 spin_unlock(&req->rq_lock);
1866
1867                 osc_dec_unstable_pages(req);
1868         } else {
1869                 req->rq_committed = 1;
1870                 spin_unlock(&req->rq_lock);
1871         }
1872 }
1873
1874 /**
1875  * Build an RPC by the list of extent @ext_list. The caller must ensure
1876  * that the total pages in this list are NOT over max pages per RPC.
1877  * Extents in the list must be in OES_RPC state.
1878  */
/* See the comment block above: build one BRW RPC from the extents on
 * @ext_list (all in OES_RPC state, total pages within the per-RPC limit),
 * attach the async args, update in-flight accounting and hand the request
 * to ptlrpcd.  On failure all partially acquired resources are unwound via
 * the out: label and every extent is finished with the error.
 *
 * \param env       execution environment
 * \param cli       client_obd issuing the RPC
 * \param ext_list  extents to send; emptied (spliced into async args or
 *                  finished with the error) before return
 * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE (plus flags)
 * \param pol       ptlrpcd thread-selection policy
 *
 * \retval 0 on success, negative errno otherwise
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);

        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* Track the RPC's extent; interior pages must be
                         * full pages (asserted below). */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* Fill the brw_page array and attach each page to the cl_req. */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* Sort pages by offset so the target can allocate in order. */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* Hand the page and extent lists over to the request's async args;
         * from here on brw_interpret() owns them. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* In-flight accounting and /proc histograms, under the list lock. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        /* Error path: free what was allocated and fail every extent. */
        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2089
2090 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2091                                         struct ldlm_enqueue_info *einfo)
2092 {
2093         void *data = einfo->ei_cbdata;
2094         int set = 0;
2095
2096         LASSERT(lock != NULL);
2097         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2098         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2099         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2100         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2101
2102         lock_res_and_lock(lock);
2103         spin_lock(&osc_ast_guard);
2104
2105         if (lock->l_ast_data == NULL)
2106                 lock->l_ast_data = data;
2107         if (lock->l_ast_data == data)
2108                 set = 1;
2109
2110         spin_unlock(&osc_ast_guard);
2111         unlock_res_and_lock(lock);
2112
2113         return set;
2114 }
2115
2116 static int osc_set_data_with_check(struct lustre_handle *lockh,
2117                                    struct ldlm_enqueue_info *einfo)
2118 {
2119         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2120         int set = 0;
2121
2122         if (lock != NULL) {
2123                 set = osc_set_lock_data_with_check(lock, einfo);
2124                 LDLM_LOCK_PUT(lock);
2125         } else
2126                 CERROR("lockh %p, data %p - client evicted?\n",
2127                        lockh, einfo->ei_cbdata);
2128         return set;
2129 }
2130
2131 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2132                              ldlm_iterator_t replace, void *data)
2133 {
2134         struct ldlm_res_id res_id;
2135         struct obd_device *obd = class_exp2obd(exp);
2136
2137         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2138         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2139         return 0;
2140 }
2141
/* Find any LDLM lock of the inode in osc.
 * Returns: 0    none found
 *          1    found one
 *        < 0    error */
2146 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2147                            ldlm_iterator_t replace, void *data)
2148 {
2149         struct ldlm_res_id res_id;
2150         struct obd_device *obd = class_exp2obd(exp);
2151         int rc = 0;
2152
2153         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2154         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2155         if (rc == LDLM_ITER_STOP)
2156                 return(1);
2157         if (rc == LDLM_ITER_CONTINUE)
2158                 return(0);
2159         return(rc);
2160 }
2161
2162 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2163                             obd_enqueue_update_f upcall, void *cookie,
2164                             __u64 *flags, int agl, int rc)
2165 {
2166         int intent = *flags & LDLM_FL_HAS_INTENT;
2167         ENTRY;
2168
2169         if (intent) {
2170                 /* The request was created before ldlm_cli_enqueue call. */
2171                 if (rc == ELDLM_LOCK_ABORTED) {
2172                         struct ldlm_reply *rep;
2173                         rep = req_capsule_server_get(&req->rq_pill,
2174                                                      &RMF_DLM_REP);
2175
2176                         LASSERT(rep != NULL);
2177                         rep->lock_policy_res1 =
2178                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2179                         if (rep->lock_policy_res1)
2180                                 rc = rep->lock_policy_res1;
2181                 }
2182         }
2183
2184         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2185             (rc == 0)) {
2186                 *flags |= LDLM_FL_LVB_READY;
2187                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2188                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2189         }
2190
2191         /* Call the update callback. */
2192         rc = (*upcall)(cookie, rc);
2193         RETURN(rc);
2194 }
2195
2196 static int osc_enqueue_interpret(const struct lu_env *env,
2197                                  struct ptlrpc_request *req,
2198                                  struct osc_enqueue_args *aa, int rc)
2199 {
2200         struct ldlm_lock *lock;
2201         struct lustre_handle handle;
2202         __u32 mode;
2203         struct ost_lvb *lvb;
2204         __u32 lvb_len;
2205         __u64 *flags = aa->oa_flags;
2206
2207         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2208          * might be freed anytime after lock upcall has been called. */
2209         lustre_handle_copy(&handle, aa->oa_lockh);
2210         mode = aa->oa_ei->ei_mode;
2211
2212         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2213          * be valid. */
2214         lock = ldlm_handle2lock(&handle);
2215
2216         /* Take an additional reference so that a blocking AST that
2217          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2218          * to arrive after an upcall has been executed by
2219          * osc_enqueue_fini(). */
2220         ldlm_lock_addref(&handle, mode);
2221
2222         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2223         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2224
2225         /* Let CP AST to grant the lock first. */
2226         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2227
2228         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2229                 lvb = NULL;
2230                 lvb_len = 0;
2231         } else {
2232                 lvb = aa->oa_lvb;
2233                 lvb_len = sizeof(*aa->oa_lvb);
2234         }
2235
2236         /* Complete obtaining the lock procedure. */
2237         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2238                                    mode, flags, lvb, lvb_len, &handle, rc);
2239         /* Complete osc stuff. */
2240         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2241                               flags, aa->oa_agl, rc);
2242
2243         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2244
2245         /* Release the lock for async request. */
2246         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2247                 /*
2248                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2249                  * not already released by
2250                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2251                  */
2252                 ldlm_lock_decref(&handle, mode);
2253
2254         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2255                  aa->oa_lockh, req, aa);
2256         ldlm_lock_decref(&handle, mode);
2257         LDLM_LOCK_PUT(lock);
2258         return rc;
2259 }
2260
/* Sentinel request-set pointer: callers pass PTLRPCD_SET as the rqset
 * argument to request that the RPC be handed to the ptlrpcd daemons
 * (see osc_enqueue_base()) rather than added to a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2262
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/* Enqueue an extent lock on an OST object, first trying to match an already
 * cached lock.
 *
 * \param flags     in/out LDLM flags; LDLM_FL_HAS_INTENT selects the intent
 *                  (LVB-carrying) request format
 * \param kms_valid when zero, skip local lock matching entirely (fresh or
 *                  evicted object: a cached lock would prime stale LVB state)
 * \param upcall    completion callback invoked with \a cookie once the lock
 *                  outcome is known (immediately on a local match)
 * \param rqset     NULL for a synchronous enqueue; PTLRPCD_SET or a real set
 *                  for an asynchronous one
 * \param agl       non-zero for async glimpse locking: matching then does not
 *                  require LDLM_FL_LVB_READY, and an unready match returns
 *                  -ECANCELED
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL matching tolerates a not-yet-granted lock (no LVB_READY). */
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this strpe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        /* Drop the PW reference when PR was requested (the PR
                         * reference added above, if any, is kept for sync
                         * callers); async callers keep no reference at all. */
                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Matched lock belongs to another object: release it
                         * and fall through to a fresh enqueue. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                /* Reserve room in the reply for the server-filled LVB. */
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        /* Completion is handled in osc_enqueue_interpret(). */
                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: run the upcall here. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2413
2414 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2415                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2416                    __u64 *flags, void *data, struct lustre_handle *lockh,
2417                    int unref)
2418 {
2419         struct obd_device *obd = exp->exp_obd;
2420         __u64 lflags = *flags;
2421         ldlm_mode_t rc;
2422         ENTRY;
2423
2424         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2425                 RETURN(-EIO);
2426
2427         /* Filesystem lock extents are extended to page boundaries so that
2428          * dealing with the page cache is a little smoother */
2429         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2430         policy->l_extent.end |= ~CFS_PAGE_MASK;
2431
2432         /* Next, search for already existing extent locks that will cover us */
2433         /* If we're trying to read, we also search for an existing PW lock.  The
2434          * VFS and page cache already protect us locally, so lots of readers/
2435          * writers can share a single PW lock. */
2436         rc = mode;
2437         if (mode == LCK_PR)
2438                 rc |= LCK_PW;
2439         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2440                              res_id, type, policy, rc, lockh, unref);
2441         if (rc) {
2442                 if (data != NULL) {
2443                         if (!osc_set_data_with_check(lockh, data)) {
2444                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2445                                         ldlm_lock_decref(lockh, rc);
2446                                 RETURN(0);
2447                         }
2448                 }
2449                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2450                         ldlm_lock_addref(lockh, LCK_PR);
2451                         ldlm_lock_decref(lockh, LCK_PW);
2452                 }
2453                 RETURN(rc);
2454         }
2455         RETURN(rc);
2456 }
2457
2458 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2459 {
2460         ENTRY;
2461
2462         if (unlikely(mode == LCK_GROUP))
2463                 ldlm_lock_decref_and_cancel(lockh, mode);
2464         else
2465                 ldlm_lock_decref(lockh, mode);
2466
2467         RETURN(0);
2468 }
2469
/* Interpret callback for the asynchronous OST_STATFS RPC issued by
 * osc_statfs_async(): unpacks the reply into the caller's obd_statfs buffer
 * and always reports completion through the oi_cb_up() callback.
 *
 * \retval the value returned by oi_cb_up()
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* A NODELAY statfs is best-effort: a disconnected or busy import is
         * treated as success, leaving the caller's buffer untouched. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                /* Reply did not contain the expected statfs body. */
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* Always notify the caller, even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2502
/* Issue an asynchronous OST_STATFS request on \a rqset; the reply is
 * processed by osc_statfs_interpret(), which delivers the result through
 * oinfo->oi_cb_up().
 *
 * \param max_age currently unused here; see the comment below
 * \retval 0 on successful submission, negative errno on setup failure
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* A statfs issued from procfs must not wait for a stuck
                 * target: fail fast instead of resending, to avoid a
                 * deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2546
/* Synchronous statfs of the OST: sends OST_STATFS and waits for the reply,
 * copying the result into \a osfs.
 *
 * \param max_age currently unused here; see the comment below
 * \param flags   OBD_STATFS_NODELAY makes the RPC fail fast instead of
 *                waiting for a stuck target
 * \retval 0 on success, negative errno on failure
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The extra import reference was only needed across the allocation. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* A statfs issued from procfs must not wait for a stuck
                 * target: fail fast instead of resending, to avoid a
                 * deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                /* Reply did not contain the expected statfs body. */
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2610
2611 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2612                          void *karg, void *uarg)
2613 {
2614         struct obd_device *obd = exp->exp_obd;
2615         struct obd_ioctl_data *data = karg;
2616         int err = 0;
2617         ENTRY;
2618
2619         if (!try_module_get(THIS_MODULE)) {
2620                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2621                        module_name(THIS_MODULE));
2622                 return -EINVAL;
2623         }
2624         switch (cmd) {
2625         case OBD_IOC_CLIENT_RECOVER:
2626                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2627                                             data->ioc_inlbuf1, 0);
2628                 if (err > 0)
2629                         err = 0;
2630                 GOTO(out, err);
2631         case IOC_OSC_SET_ACTIVE:
2632                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2633                                                data->ioc_offset);
2634                 GOTO(out, err);
2635         case OBD_IOC_POLL_QUOTACHECK:
2636                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2637                 GOTO(out, err);
2638         case OBD_IOC_PING_TARGET:
2639                 err = ptlrpc_obd_ping(obd);
2640                 GOTO(out, err);
2641         default:
2642                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2643                        cmd, current_comm());
2644                 GOTO(out, err = -ENOTTY);
2645         }
2646 out:
2647         module_put(THIS_MODULE);
2648         return err;
2649 }
2650
/* obd_get_info() handler for the OSC.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - trivially reports stripe 0;
 *   KEY_LAST_ID        - synchronous OST_GET_INFO RPC returning the last
 *                        allocated object id into *val;
 *   KEY_FIEMAP         - fiemap query; when FIEMAP_FLAG_SYNC is set, either
 *                        reuses a cached extent lock (converted to PR) or
 *                        asks the server to take the lock (OBD_FL_SRVLOCK).
 * Any other key fails with -EINVAL.
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* Fail fast rather than block on an unresponsive target. */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Round the requested range out to page boundaries. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                /* Guard against overflow of fm_start + fm_length. */
                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* Convert the matched PW reference to PR. */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                /* Release the PR reference taken (or converted) above. */
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2786
/* obd_set_info_async() handler for the OSC.
 *
 * Locally-handled keys (no RPC): KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET (registers this OSC with the shared client
 * page cache; allowed only once), KEY_CACHE_LRU_SHRINK (shrinks the LRU and
 * reports the count back through *val).
 *
 * Everything else is forwarded to the OST as an OST_SET_INFO RPC;
 * KEY_GRANT_SHRINK uses a dedicated request format and is dispatched via
 * ptlrpcd with osc_shrink_grant_interpret() as the completion handler.
 *
 * \retval 0 on success (or successful submission), negative errno otherwise
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* Shrink at most half of this OSC's LRU pages per call. */
                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* Report the remaining target back to the caller. */
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* KEY_GRANT_SHRINK packs its value as an ost_body field instead of
         * the generic setinfo value buffer. */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* Copy the obdo out of the caller's buffer; ownership of the
                 * allocation passes to the interpret callback via aa. */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2904
2905 static int osc_reconnect(const struct lu_env *env,
2906                          struct obd_export *exp, struct obd_device *obd,
2907                          struct obd_uuid *cluuid,
2908                          struct obd_connect_data *data,
2909                          void *localdata)
2910 {
2911         struct client_obd *cli = &obd->u.cli;
2912
2913         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2914                 long lost_grant;
2915
2916                 client_obd_list_lock(&cli->cl_loi_list_lock);
2917                 data->ocd_grant = (cli->cl_avail_grant +
2918                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2919                                   2 * cli_brw_size(obd);
2920                 lost_grant = cli->cl_lost_grant;
2921                 cli->cl_lost_grant = 0;
2922                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2923
2924                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2925                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2926                        data->ocd_version, data->ocd_grant, lost_grant);
2927         }
2928
2929         RETURN(0);
2930 }
2931
/* Disconnect from the OST: flush any remaining size-change llog cancel
 * records, disconnect the export, and finally remove this client from the
 * grant-shrink list (the ordering rationale is documented below).
 *
 * \retval result of client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2973
/**
 * Handle import state-change events delivered by ptlrpc.
 *
 * Resets grant accounting on disconnect, flushes cached pages and clears
 * local DLM locks on invalidation, re-initializes grants and the request
 * portal when connect data (OCD) arrives, and forwards (de)activation
 * events to the observer.
 *
 * \param[in] obd   client obd device owning the import
 * \param[in] imp   the import the event applies to
 * \param[in] event which state change occurred
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Connection lost: any outstanding grant is unusable until
                 * the server re-grants it on reconnect. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        /* Drop locks locally; the server-side state is gone. */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data negotiated: pick up grant and portal setup. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3047
3048 /**
3049  * Determine whether the lock can be canceled before replaying the lock
3050  * during recovery, see bug16774 for detailed information.
3051  *
3052  * \retval zero the lock can't be canceled
3053  * \retval other ok to cancel
3054  */
3055 static int osc_cancel_weight(struct ldlm_lock *lock)
3056 {
3057         /*
3058          * Cancel all unused and granted extent lock.
3059          */
3060         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3061             lock->l_granted_mode == lock->l_req_mode &&
3062             osc_ldlm_weigh_ast(lock) == 0)
3063                 RETURN(1);
3064
3065         RETURN(0);
3066 }
3067
/* ptlrpcd work callback: flush queued bulk I/O for this client.
 * Registered via ptlrpcd_alloc_work() in osc_setup(). */
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(0);
}
3077
/**
 * Set up an OSC obd device.
 *
 * Takes a ptlrpcd reference, performs generic client setup, allocates the
 * writeback and LRU ptlrpcd work items, initializes quota and grant-shrink
 * state, registers procfs entries (under the OSP's proc directory when the
 * client and server share a node), pre-allocates a request pool sized from
 * max_rpcs_in_flight, and installs the lock-cancel weight callback.
 *
 * \param[in] obd  the obd device being set up
 * \param[in] lcfg configuration record from the MGS/llog
 *
 * \retval 0 on success, negative errno on failure (all partially-acquired
 *         resources are released via the out_* labels)
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        /* proc failure is non-fatal; continue setup below */
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3169
/**
 * Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the ptlrpcd work items, tears down the
 * import, and unregisters procfs/llog state.
 *
 * \param[in] obd   the obd device being cleaned up
 * \param[in] stage which cleanup phase is running
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3218
/**
 * Final cleanup of an OSC device.
 *
 * Detaches this client from the shared LRU cache (if attached), frees the
 * quota cache, runs generic client teardown, and drops the ptlrpcd
 * reference taken in osc_setup().
 *
 * \param[in] obd the obd device being destroyed
 *
 * \retval 0 on success, negative errno from client_obd_cleanup() otherwise
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3245
3246 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3247 {
3248         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3249         return rc > 0 ? 0: rc;
3250 }
3251
/* obd_ops wrapper: forward config records to osc_process_config_base().
 * The void *buf argument is the struct lustre_cfg record; len is unused. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3256
/* Method table connecting the generic OBD layer to the OSC implementation.
 * Connection management is delegated to the generic client_* helpers;
 * everything else is OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3286
3287 extern struct lu_kmem_descr osc_caches[];
3288 extern spinlock_t osc_ast_guard;
3289 extern struct lock_class_key osc_ast_guard_class;
3290
3291 int __init osc_init(void)
3292 {
3293         bool enable_proc = true;
3294         struct obd_type *type;
3295         int rc;
3296         ENTRY;
3297
3298         /* print an address of _any_ initialized kernel symbol from this
3299          * module, to allow debugging with gdb that doesn't support data
3300          * symbols from modules.*/
3301         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3302
3303         rc = lu_kmem_init(osc_caches);
3304         if (rc)
3305                 RETURN(rc);
3306
3307         type = class_search_type(LUSTRE_OSP_NAME);
3308         if (type != NULL && type->typ_procsym != NULL)
3309                 enable_proc = false;
3310
3311         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3312 #ifndef HAVE_ONLY_PROCFS_SEQ
3313                                  NULL,
3314 #endif
3315                                  LUSTRE_OSC_NAME, &osc_device_type);
3316         if (rc) {
3317                 lu_kmem_fini(osc_caches);
3318                 RETURN(rc);
3319         }
3320
3321         spin_lock_init(&osc_ast_guard);
3322         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3323
3324         RETURN(rc);
3325 }
3326
/* Module exit: undo osc_init() — unregister the type, then release the
 * kmem caches. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3332
3333 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3334 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3335 MODULE_LICENSE("GPL");
3336
3337 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);