/* Source: fs/lustre-release.git, lustre/osc/osc_request.c
 * (Whamcloud gitweb snapshot c708dfee4e158ea9c25d8ac4bb7fa4c233a33d79) */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state for an asynchronous bulk read/write; stored in the
 * request's rq_async_args area and consumed by brw_interpret() when the
 * reply arrives.  Do not reorder fields: the layout is shared (aliased)
 * by grant-shrink RPCs via the osc_grant_args define below. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* attributes sent/returned */
        int                       aa_requested_nob; /* bytes requested */
        int                       aa_nio_count; /* number of niobufs */
        obd_count                 aa_page_count; /* pages in aa_ppga */
        int                       aa_resends;   /* resend attempts so far */
        struct brw_page **aa_ppga;              /* page array for the bulk */
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;      /* osc_async_pages in flight */
        struct list_head          aa_exts;      /* extents covered by the RPC */
        struct obd_capa  *aa_ocapa;             /* capability, if any */
        struct cl_req            *aa_clerq;     /* cl_req this RPC belongs to */
};

/* Grant-shrink RPCs reuse the BRW async-args layout (aa_oa in particular). */
#define osc_grant_args osc_brw_async_args
70
/* Async-args cookie for getattr-style RPCs: carries the caller's obd_info
 * so the interpret callback can fill oi_oa and invoke oi_cb_up. */
struct osc_async_args {
        struct obd_info *aa_oi;
};
74
/* Async-args for setattr/punch RPCs: reply attributes are unpacked into
 * sa_oa, then sa_upcall(sa_cookie, rc) reports completion. */
struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};
80
/* Async-args for OST_SYNC RPCs: reply attributes land in fa_oi->oi_oa,
 * then fa_upcall(fa_cookie, rc) reports completion. */
struct osc_fsync_args {
        struct obd_info *fa_oi;
        obd_enqueue_update_f     fa_upcall;
        void                    *fa_cookie;
};
86
/* Async-args for DLM lock enqueue: everything the enqueue-completion
 * handler needs to finish the lock and call back the upper layer. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;        /* export the lock was taken on */
        ldlm_type_t             oa_type;        /* requested lock type */
        ldlm_mode_t             oa_mode;        /* requested lock mode */
        __u64                   *oa_flags;      /* in/out LDLM flags */
        osc_enqueue_upcall_f    oa_upcall;      /* completion callback */
        void                    *oa_cookie;     /* opaque arg for oa_upcall */
        struct ost_lvb          *oa_lvb;        /* lock value block to fill */
        struct lustre_handle    oa_lockh;       /* handle of the granted lock */
        unsigned int            oa_agl:1;       /* asynchronous glimpse lock
                                                 * request — TODO confirm */
};
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105                         struct lov_mds_md *lmm, int lmm_bytes)
106 {
107         int lsm_size;
108         struct obd_import *imp = class_exp2cliimp(exp);
109         ENTRY;
110
111         if (lmm != NULL) {
112                 if (lmm_bytes < sizeof(*lmm)) {
113                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
114                                exp->exp_obd->obd_name, lmm_bytes,
115                                (int)sizeof(*lmm));
116                         RETURN(-EINVAL);
117                 }
118                 /* XXX LOV_MAGIC etc check? */
119
120                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
121                         CERROR("%s: zero lmm_object_id: rc = %d\n",
122                                exp->exp_obd->obd_name, -EINVAL);
123                         RETURN(-EINVAL);
124                 }
125         }
126
127         lsm_size = lov_stripe_md_size(1);
128         if (lsmp == NULL)
129                 RETURN(lsm_size);
130
131         if (*lsmp != NULL && lmm == NULL) {
132                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
133                 OBD_FREE(*lsmp, lsm_size);
134                 *lsmp = NULL;
135                 RETURN(0);
136         }
137
138         if (*lsmp == NULL) {
139                 OBD_ALLOC(*lsmp, lsm_size);
140                 if (unlikely(*lsmp == NULL))
141                         RETURN(-ENOMEM);
142                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
143                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
144                         OBD_FREE(*lsmp, lsm_size);
145                         RETURN(-ENOMEM);
146                 }
147                 loi_init((*lsmp)->lsm_oinfo[0]);
148         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
149                 RETURN(-EBADF);
150         }
151
152         if (lmm != NULL)
153                 /* XXX zero *lsmp? */
154                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
155
156         if (imp != NULL &&
157             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
158                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
159         else
160                 (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
161
162         RETURN(lsm_size);
163 }
164
/* Copy the capability @capa (if any) into the request capsule's RMF_CAPA1
 * field and flag its presence in the ost_body; no-op when @capa is NULL. */
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}
180
/* Fill the request's ost_body from @oinfo: pack oi_oa in wire format
 * (honouring the import's connect data) and append the capability. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
193
194 static inline void osc_set_capa_size(struct ptlrpc_request *req,
195                                      const struct req_msg_field *field,
196                                      struct obd_capa *oc)
197 {
198         if (oc == NULL)
199                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200         else
201                 /* it is already calculated as sizeof struct obd_capa */
202                 ;
203 }
204
/* Reply handler for async OST_GETATTR: unpack the returned attributes into
 * the caller's obdo and forward the final status to oi_cb_up. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* Invalidate any stale attributes on protocol error. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* Always report completion to the caller, even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
233
/* Queue an asynchronous OST_GETATTR RPC on @set; the reply is processed by
 * osc_getattr_interpret(), which invokes oinfo->oi_cb_up.
 * Returns 0 when the request was queued, negative errno otherwise. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Stash the caller's obd_info in the request's async-args area. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
265
/* Synchronous OST_GETATTR: fetch the object's attributes into
 * oinfo->oi_oa.  Returns 0 on success, negative errno on failure. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* The block size is supplied client-side, not by the OST. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
309
/* Synchronous OST_SETATTR: push oinfo->oi_oa to the OST and read the
 * server's view of the attributes back into the same obdo.
 * Returns 0 on success, negative errno on failure. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* The object group must be valid to address the object. */
        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
351
/* Reply handler shared by setattr and punch RPCs: copy the server's view
 * of the attributes back into sa_oa, then run the caller's upcall with
 * the final status. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall always runs, success or failure. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
372
/* Send an OST_SETATTR RPC without blocking the caller.
 *
 * If @rqset is NULL the request is handed straight to ptlrpcd and the
 * reply is ignored.  Otherwise osc_setattr_interpret() runs
 * @upcall(@cookie, rc) on completion, and the request goes either to
 * ptlrpcd (rqset == PTLRPCD_SET) or onto the caller's set.
 * Returns 0 when the request was queued, negative errno otherwise. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Carry the llog cookie along so the OST can cancel the record. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
423
424 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
425                              struct obd_trans_info *oti,
426                              struct ptlrpc_request_set *rqset)
427 {
428         return osc_setattr_async_base(exp, oinfo, oti,
429                                       oinfo->oi_cb_up, oinfo, rqset);
430 }
431
/* Synchronously create an object on the OST described by @oa.
 *
 * If the caller did not supply a striping descriptor (*ea == NULL) one is
 * allocated here and released again on failure.  On success the new
 * object id is copied into the lsm, and any llog cookie returned by the
 * server is stored via @oti.  Returns 0 or negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* The block size is supplied client-side, not by the OST. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                /* Hand the llog cancellation cookie back to the caller. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if it was allocated here (caller's *ea is
         * still NULL in that case). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
515
/* Queue an OST_PUNCH (truncate / hole-punch) RPC; the punch start/end are
 * carried in the obdo packed from @oinfo.  The reply runs through
 * osc_setattr_interpret(), which invokes @upcall(@cookie, rc).
 * Returns 0 when the request was queued, negative errno otherwise. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
560
/* Reply handler for OST_SYNC: copy the returned attributes into the
 * caller's obdo and invoke the completion upcall with the final status. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        /* The upcall always runs, success or failure. */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
583
/* Queue an OST_SYNC RPC asking the server to flush the byte range encoded
 * in oinfo->oi_oa (start/end overloaded into size/blocks).  The reply is
 * processed by osc_sync_interpret(), which runs @upcall(@cookie, rc).
 * Returns 0 when the request was queued, negative errno otherwise. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
628
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        /* No resource means no locks to cancel; report zero, not an error. */
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
663
664 static int osc_destroy_interpret(const struct lu_env *env,
665                                  struct ptlrpc_request *req, void *data,
666                                  int rc)
667 {
668         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
669
670         atomic_dec(&cli->cl_destroy_in_flight);
671         wake_up(&cli->cl_destroy_waitq);
672         return 0;
673 }
674
/* Try to reserve a destroy-RPC slot; returns 1 if the RPC may be sent now,
 * 0 if the caller must wait on cl_destroy_waitq.
 *
 * On failure the speculative increment is rolled back; if the counter is
 * seen below the limit after the rollback, another slot was freed between
 * the two atomic operations, so wake a waiter to avoid a lost wakeup. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
692
/* Create an OST object for @oa.
 *
 * Only objects outside the MDT FID sequence range are still created
 * through this path (via osc_real_create()); reaching this function with
 * an MDT sequence is a bug and triggers LBUG(). */
int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                RETURN(osc_real_create(exp, oa, ea, oti));

        /* we should not get here anymore */
        LBUG();

        RETURN(rc);
}
712
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel (locally and via ELC) any locks on the object first; the
         * data is being destroyed, so discard it rather than flush it. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* Carry the llog cookie so the OST can cancel the unlink record. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
792
/* Fill the grant/dirty accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client cache state, under
 * cl_loi_list_lock, so the server learns the client's view of its cache.
 * @writing_bytes is currently unused here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* The caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* Guard against an implausible (corrupt?) headroom value. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                /* Advertise enough headroom to keep the RPC pipe full. */
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* The dropped grant has been reported; start counting afresh. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
841
842 void osc_update_next_shrink(struct client_obd *cli)
843 {
844         cli->cl_next_shrink_grant =
845                 cfs_time_shift(cli->cl_grant_shrink_interval);
846         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
847                cli->cl_next_shrink_grant);
848 }
849
850 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
851 {
852         client_obd_list_lock(&cli->cl_loi_list_lock);
853         cli->cl_avail_grant += grant;
854         client_obd_list_unlock(&cli->cl_loi_list_lock);
855 }
856
857 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
858 {
859         if (body->oa.o_valid & OBD_MD_FLGRANT) {
860                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
861                 __osc_update_grant(cli, body->oa.o_grant);
862         }
863 }
864
865 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
866                               obd_count keylen, void *key, obd_count vallen,
867                               void *val, struct ptlrpc_request_set *set);
868
869 static int osc_shrink_grant_interpret(const struct lu_env *env,
870                                       struct ptlrpc_request *req,
871                                       void *aa, int rc)
872 {
873         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
874         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
875         struct ost_body *body;
876
877         if (rc != 0) {
878                 __osc_update_grant(cli, oa->o_grant);
879                 GOTO(out, rc);
880         }
881
882         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
883         LASSERT(body);
884         osc_update_grant(cli, body);
885 out:
886         OBDO_FREE(oa);
887         return rc;
888 }
889
890 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
891 {
892         client_obd_list_lock(&cli->cl_loi_list_lock);
893         oa->o_grant = cli->cl_avail_grant / 4;
894         cli->cl_avail_grant -= oa->o_grant;
895         client_obd_list_unlock(&cli->cl_loi_list_lock);
896         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
897                 oa->o_valid |= OBD_MD_FLFLAGS;
898                 oa->o_flags = 0;
899         }
900         oa->o_flags |= OBD_FL_SHRINK_GRANT;
901         osc_update_next_shrink(cli);
902 }
903
904 /* Shrink the current grant, either from some large amount to enough for a
905  * full set of in-flight RPCs, or if we have already shrunk to that limit
906  * then to enough for a single RPC.  This avoids keeping more grant than
907  * needed, and avoids shrinking the grant piecemeal. */
908 static int osc_shrink_grant(struct client_obd *cli)
909 {
910         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
911                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
912
913         client_obd_list_lock(&cli->cl_loi_list_lock);
914         if (cli->cl_avail_grant <= target_bytes)
915                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
916         client_obd_list_unlock(&cli->cl_loi_list_lock);
917
918         return osc_shrink_grant_to_target(cli, target_bytes);
919 }
920
/* Shrink this client's grant down to @target_bytes, returning the excess
 * to the server via an OST set_info "grant_shrink" request.
 *
 * \param cli           client obd whose grant is to be shrunk
 * \param target_bytes  desired grant to keep, raised to at least one full
 *                      RPC worth of pages
 *
 * \retval 0        nothing to do, or the shrink request was sent
 * \retval -ENOMEM  request body allocation failed
 * \retval negative error from osc_set_info_async(); the grant is then
 *                  credited back locally */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): the lock is dropped across the allocation above, so
         * cl_avail_grant may differ from the value tested earlier when it is
         * re-read here — confirm this race is benign. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        /* make sure o_flags holds a defined value before OR-ing into it */
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        /* send failed: take the grant back into the local pool */
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
965
966 static int osc_should_shrink_grant(struct client_obd *client)
967 {
968         cfs_time_t time = cfs_time_current();
969         cfs_time_t next_shrink = client->cl_next_shrink_grant;
970
971         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
972              OBD_CONNECT_GRANT_SHRINK) == 0)
973                 return 0;
974
975         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
976                 /* Get the current RPC size directly, instead of going via:
977                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
978                  * Keep comment here so that it can be found by searching. */
979                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
980
981                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
982                     client->cl_avail_grant > brw_size)
983                         return 1;
984                 else
985                         osc_update_next_shrink(client);
986         }
987         return 0;
988 }
989
990 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
991 {
992         struct client_obd *client;
993
994         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
995                 if (osc_should_shrink_grant(client))
996                         osc_shrink_grant(client);
997         }
998         return 0;
999 }
1000
1001 static int osc_add_shrink_grant(struct client_obd *client)
1002 {
1003         int rc;
1004
1005         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1006                                        TIMEOUT_GRANT,
1007                                        osc_grant_shrink_grant_cb, NULL,
1008                                        &client->cl_grant_shrink_list);
1009         if (rc) {
1010                 CERROR("add grant client %s error %d\n",
1011                         client->cl_import->imp_obd->obd_name, rc);
1012                 return rc;
1013         }
1014         CDEBUG(D_CACHE, "add grant client %s \n",
1015                client->cl_import->imp_obd->obd_name);
1016         osc_update_next_shrink(client);
1017         return 0;
1018 }
1019
/* Unregister this client from the periodic grant-shrink timeout list. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1025
/* Initialize cl_avail_grant from the amount the server granted at
 * connect/reconnect time (ocd_grant), pick the extent chunk size, and
 * register for periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* a buggy server can leave us with less grant than we are dirty */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1065
1066 /* We assume that the reason this OSC got a short read is because it read
1067  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1068  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1069  * this stripe never got written at or beyond this stripe offset yet. */
1070 static void handle_short_read(int nob_read, obd_count page_count,
1071                               struct brw_page **pga)
1072 {
1073         char *ptr;
1074         int i = 0;
1075
1076         /* skip bytes read OK */
1077         while (nob_read > 0) {
1078                 LASSERT (page_count > 0);
1079
1080                 if (pga[i]->count > nob_read) {
1081                         /* EOF inside this page */
1082                         ptr = kmap(pga[i]->pg) +
1083                                 (pga[i]->off & ~CFS_PAGE_MASK);
1084                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1085                         kunmap(pga[i]->pg);
1086                         page_count--;
1087                         i++;
1088                         break;
1089                 }
1090
1091                 nob_read -= pga[i]->count;
1092                 page_count--;
1093                 i++;
1094         }
1095
1096         /* zero remaining pages */
1097         while (page_count-- > 0) {
1098                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1099                 memset(ptr, 0, pga[i]->count);
1100                 kunmap(pga[i]->pg);
1101                 i++;
1102         }
1103 }
1104
1105 static int check_write_rcs(struct ptlrpc_request *req,
1106                            int requested_nob, int niocount,
1107                            obd_count page_count, struct brw_page **pga)
1108 {
1109         int     i;
1110         __u32   *remote_rcs;
1111
1112         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1113                                                   sizeof(*remote_rcs) *
1114                                                   niocount);
1115         if (remote_rcs == NULL) {
1116                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1117                 return(-EPROTO);
1118         }
1119
1120         /* return error if any niobuf was in error */
1121         for (i = 0; i < niocount; i++) {
1122                 if ((int)remote_rcs[i] < 0)
1123                         return(remote_rcs[i]);
1124
1125                 if (remote_rcs[i] != 0) {
1126                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1127                                 i, remote_rcs[i], req);
1128                         return(-EPROTO);
1129                 }
1130         }
1131
1132         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1133                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1134                        req->rq_bulk->bd_nob_transferred, requested_nob);
1135                 return(-EPROTO);
1136         }
1137
1138         return (0);
1139 }
1140
1141 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1142 {
1143         if (p1->flag != p2->flag) {
1144                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1145                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1146                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1147
1148                 /* warn if we try to combine flags that we don't know to be
1149                  * safe to combine */
1150                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1151                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1152                               "report this at https://jira.hpdd.intel.com/\n",
1153                               p1->flag, p2->flag);
1154                 }
1155                 return 0;
1156         }
1157
1158         return (p1->off + p1->count == p2->off);
1159 }
1160
/* Compute a bulk I/O checksum over @nob bytes spanning the leading pages
 * of @pga, using the algorithm selected by @cksum_type.
 *
 * NOTE(review): on hash-init failure this returns PTR_ERR() (a negative
 * errno) through the unsigned obd_count return type — confirm callers can
 * cope with a "checksum" that actually encodes an error.
 * NOTE(review): the return code of cfs_crypto_hash_final() is stored in
 * 'err' but never checked — verify a final-digest failure cannot occur. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* bytes of this page that belong to the transfer */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1215
/* Build (but do not send) a bulk read/write RPC covering @page_count pages.
 *
 * \param cmd        OBD_BRW_WRITE for a write, otherwise a read
 * \param cli        client obd issuing the I/O
 * \param oa         obdo describing the object; stored in the request's
 *                   async args for the reply handler
 * \param lsm        stripe metadata (not referenced in this function)
 * \param page_count number of entries in @pga; must be > 0, sorted by
 *                   offset, with gaps allowed only at the two ends
 * \param pga        page descriptors for the transfer
 * \param reqp       on success, set to the prepared (unsent) request
 * \param ocapa      capability; referenced into async args if @reserve
 * \param reserve    take a reference on @ocapa for the reply handler
 * \param resend     mark the RPC with OBD_FL_RECOV_RESEND
 *
 * \retval 0 on success with *reqp set, negative errno on failure. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the pre-allocated pool so they can always make
         * progress under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: contiguous same-flag pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* pin each page into the bulk descriptor and build the niobuf
         * array, merging contiguous pages into a single niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically piggy-back a grant shrink on this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1422
1423 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424                                 __u32 client_cksum, __u32 server_cksum, int nob,
1425                                 obd_count page_count, struct brw_page **pga,
1426                                 cksum_type_t client_cksum_type)
1427 {
1428         __u32 new_cksum;
1429         char *msg;
1430         cksum_type_t cksum_type;
1431
1432         if (server_cksum == client_cksum) {
1433                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1434                 return 0;
1435         }
1436
1437         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1438                                        oa->o_flags : 0);
1439         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440                                       cksum_type);
1441
1442         if (cksum_type != client_cksum_type)
1443                 msg = "the server did not use the checksum type specified in "
1444                       "the original request - likely a protocol problem";
1445         else if (new_cksum == server_cksum)
1446                 msg = "changed on the client after we checksummed it - "
1447                       "likely false positive due to mmap IO (bug 11742)";
1448         else if (new_cksum == client_cksum)
1449                 msg = "changed in transit before arrival at OST";
1450         else
1451                 msg = "changed in transit AND doesn't match the original - "
1452                       "likely false positive due to mmap IO (bug 11742)";
1453
1454         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1456                            msg, libcfs_nid2str(peer->nid),
1457                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460                            POSTID(&oa->o_oi), pga[0]->off,
1461                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463                "client csum now %x\n", client_cksum, client_cksum_type,
1464                server_cksum, cksum_type, new_cksum);
1465         return 1;
1466 }
1467
1468 /* Note rc enters this function as number of bytes transferred */
1469 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1470 {
1471         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472         const lnet_process_id_t *peer =
1473                         &req->rq_import->imp_connection->c_peer;
1474         struct client_obd *cli = aa->aa_cli;
1475         struct ost_body *body;
1476         __u32 client_cksum = 0;
1477         ENTRY;
1478
1479         if (rc < 0 && rc != -EDQUOT) {
1480                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1481                 RETURN(rc);
1482         }
1483
1484         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1485         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1486         if (body == NULL) {
1487                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1488                 RETURN(-EPROTO);
1489         }
1490
1491         /* set/clear over quota flag for a uid/gid */
1492         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1493             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1494                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1495
1496                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1497                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1498                        body->oa.o_flags);
1499                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1500         }
1501
1502         osc_update_grant(cli, body);
1503
1504         if (rc < 0)
1505                 RETURN(rc);
1506
1507         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1508                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1509
1510         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1511                 if (rc > 0) {
1512                         CERROR("Unexpected +ve rc %d\n", rc);
1513                         RETURN(-EPROTO);
1514                 }
1515                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1516
1517                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1518                         RETURN(-EAGAIN);
1519
1520                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1521                     check_write_checksum(&body->oa, peer, client_cksum,
1522                                          body->oa.o_cksum, aa->aa_requested_nob,
1523                                          aa->aa_page_count, aa->aa_ppga,
1524                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1525                         RETURN(-EAGAIN);
1526
1527                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1528                                      aa->aa_page_count, aa->aa_ppga);
1529                 GOTO(out, rc);
1530         }
1531
1532         /* The rest of this function executes only for OST_READs */
1533
1534         /* if unwrap_bulk failed, return -EAGAIN to retry */
1535         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1536         if (rc < 0)
1537                 GOTO(out, rc = -EAGAIN);
1538
1539         if (rc > aa->aa_requested_nob) {
1540                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1541                        aa->aa_requested_nob);
1542                 RETURN(-EPROTO);
1543         }
1544
1545         if (rc != req->rq_bulk->bd_nob_transferred) {
1546                 CERROR ("Unexpected rc %d (%d transferred)\n",
1547                         rc, req->rq_bulk->bd_nob_transferred);
1548                 return (-EPROTO);
1549         }
1550
1551         if (rc < aa->aa_requested_nob)
1552                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1553
1554         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1555                 static int cksum_counter;
1556                 __u32      server_cksum = body->oa.o_cksum;
1557                 char      *via;
1558                 char      *router;
1559                 cksum_type_t cksum_type;
1560
1561                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1562                                                body->oa.o_flags : 0);
1563                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1564                                                  aa->aa_ppga, OST_READ,
1565                                                  cksum_type);
1566
1567                 if (peer->nid == req->rq_bulk->bd_sender) {
1568                         via = router = "";
1569                 } else {
1570                         via = " via ";
1571                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1572                 }
1573
1574                 if (server_cksum != client_cksum) {
1575                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1576                                            "%s%s%s inode "DFID" object "DOSTID
1577                                            " extent ["LPU64"-"LPU64"]\n",
1578                                            req->rq_import->imp_obd->obd_name,
1579                                            libcfs_nid2str(peer->nid),
1580                                            via, router,
1581                                            body->oa.o_valid & OBD_MD_FLFID ?
1582                                                 body->oa.o_parent_seq : (__u64)0,
1583                                            body->oa.o_valid & OBD_MD_FLFID ?
1584                                                 body->oa.o_parent_oid : 0,
1585                                            body->oa.o_valid & OBD_MD_FLFID ?
1586                                                 body->oa.o_parent_ver : 0,
1587                                            POSTID(&body->oa.o_oi),
1588                                            aa->aa_ppga[0]->off,
1589                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1590                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1591                                                                         1);
1592                         CERROR("client %x, server %x, cksum_type %x\n",
1593                                client_cksum, server_cksum, cksum_type);
1594                         cksum_counter = 0;
1595                         aa->aa_oa->o_cksum = client_cksum;
1596                         rc = -EAGAIN;
1597                 } else {
1598                         cksum_counter++;
1599                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1600                         rc = 0;
1601                 }
1602         } else if (unlikely(client_cksum)) {
1603                 static int cksum_missed;
1604
1605                 cksum_missed++;
1606                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1607                         CERROR("Checksum %u requested from %s but not sent\n",
1608                                cksum_missed, libcfs_nid2str(peer->nid));
1609         } else {
1610                 rc = 0;
1611         }
1612 out:
1613         if (rc >= 0)
1614                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1615                                      aa->aa_oa, &body->oa);
1616
1617         RETURN(rc);
1618 }
1619
/* Rebuild and resubmit a bulk read/write RPC after a recoverable error
 * (e.g. -EINPROGRESS from the server or a checksum retry).
 *
 * A brand-new ptlrpc request is prepared from the async args of the old
 * one; the page array, extent list and oap list are transferred to the
 * new request, which is then handed to a ptlrpcd thread.
 *
 * \param request  the failed BRW request being replaced
 * \param aa       async args of \a request (pages, extents, obdo, ...)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0       on success (new request queued)
 * \retval -EINTR  if any page in the request was interrupted
 * \retval <0      if the replacement request could not be prepared
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected, so log them quietly. */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* If any of the oaps was interrupted, abort the resend rather
         * than reissuing I/O the caller no longer waits for. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* Splice oaps and extents over; rq_async_args was copied by value
         * above, so the embedded list heads must be re-initialized here. */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Each oap holding a reference on the old request swaps it for a
         * reference on the new one. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* Capability ownership moves to the new request's args. */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1693
1694 /*
1695  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1696  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1697  * fine for our small page arrays and doesn't require allocation.  its an
1698  * insertion sort that swaps elements that are strides apart, shrinking the
1699  * stride down until its '1' and the array is sorted.
1700  */
1701 static void sort_brw_pages(struct brw_page **array, int num)
1702 {
1703         int stride, i, j;
1704         struct brw_page *tmp;
1705
1706         if (num == 1)
1707                 return;
1708         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1709                 ;
1710
1711         do {
1712                 stride /= 3;
1713                 for (i = stride ; i < num ; i++) {
1714                         tmp = array[i];
1715                         j = i;
1716                         while (j >= stride && array[j - stride]->off > tmp->off) {
1717                                 array[j] = array[j - stride];
1718                                 j -= stride;
1719                         }
1720                         array[j] = tmp;
1721                 }
1722         } while (stride > 1);
1723 }
1724
/* Free a brw_page pointer array of \a count entries (allocated in
 * osc_build_rpc()).  Only the array itself is freed; the pages it points
 * to are owned elsewhere. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1730
/* Interpret callback for a BRW RPC: finish the bulk transfer, retry
 * recoverable errors, propagate returned attributes (size/KMS/times) to
 * the cl_object, and release all per-RPC resources (extents, pages,
 * obdo, capability, in-flight counters).
 *
 * \param env   execution environment
 * \param req   the completed BRW request
 * \param data  the osc_brw_async_args attached to \a req
 * \param rc    RPC completion status
 *
 * \retval 0 or a negative errno propagated to the extent completion.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* Import was evicted/reconnected since this request
                         * was sent; do not resend across the eviction. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the redo request was queued; this callback
                 * will run again for the new request. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* Pages were sorted by offset, so the last page determines
                 * the end of the I/O. */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Fold the attributes returned in the reply obdo into the
                 * client-side cl_object attributes, under the attr lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successfully written pages stay "unstable" until the server
         * commits them; see brw_commit() for the matching decrement. */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1854
1855 static void brw_commit(struct ptlrpc_request *req)
1856 {
1857         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1858          * this called via the rq_commit_cb, I need to ensure
1859          * osc_dec_unstable_pages is still called. Otherwise unstable
1860          * pages may be leaked. */
1861         spin_lock(&req->rq_lock);
1862         if (likely(req->rq_unstable)) {
1863                 req->rq_unstable = 0;
1864                 spin_unlock(&req->rq_lock);
1865
1866                 osc_dec_unstable_pages(req);
1867         } else {
1868                 req->rq_committed = 1;
1869                 spin_unlock(&req->rq_lock);
1870         }
1871 }
1872
1873 /**
1874  * Build an RPC by the list of extent @ext_list. The caller must ensure
1875  * that the total pages in this list are NOT over max pages per RPC.
1876  * Extents in the list must be in OES_RPC state.
1877  */
1878 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1879                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1880 {
1881         struct ptlrpc_request           *req = NULL;
1882         struct osc_extent               *ext;
1883         struct brw_page                 **pga = NULL;
1884         struct osc_brw_async_args       *aa = NULL;
1885         struct obdo                     *oa = NULL;
1886         struct osc_async_page           *oap;
1887         struct osc_async_page           *tmp;
1888         struct cl_req                   *clerq = NULL;
1889         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1890                                                                       CRT_READ;
1891         struct cl_req_attr              *crattr = NULL;
1892         obd_off                         starting_offset = OBD_OBJECT_EOF;
1893         obd_off                         ending_offset = 0;
1894         int                             mpflag = 0;
1895         int                             mem_tight = 0;
1896         int                             page_count = 0;
1897         bool                            soft_sync = false;
1898         int                             i;
1899         int                             rc;
1900         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1901
1902         ENTRY;
1903         LASSERT(!list_empty(ext_list));
1904
1905         /* add pages into rpc_list to build BRW rpc */
1906         list_for_each_entry(ext, ext_list, oe_link) {
1907                 LASSERT(ext->oe_state == OES_RPC);
1908                 mem_tight |= ext->oe_memalloc;
1909                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1910                         ++page_count;
1911                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1912                         if (starting_offset > oap->oap_obj_off)
1913                                 starting_offset = oap->oap_obj_off;
1914                         else
1915                                 LASSERT(oap->oap_page_off == 0);
1916                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1917                                 ending_offset = oap->oap_obj_off +
1918                                                 oap->oap_count;
1919                         else
1920                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1921                                         PAGE_CACHE_SIZE);
1922                 }
1923         }
1924
1925         soft_sync = osc_over_unstable_soft_limit(cli);
1926         if (mem_tight)
1927                 mpflag = cfs_memory_pressure_get_and_set();
1928
1929         OBD_ALLOC(crattr, sizeof(*crattr));
1930         if (crattr == NULL)
1931                 GOTO(out, rc = -ENOMEM);
1932
1933         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1934         if (pga == NULL)
1935                 GOTO(out, rc = -ENOMEM);
1936
1937         OBDO_ALLOC(oa);
1938         if (oa == NULL)
1939                 GOTO(out, rc = -ENOMEM);
1940
1941         i = 0;
1942         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1943                 struct cl_page *page = oap2cl_page(oap);
1944                 if (clerq == NULL) {
1945                         clerq = cl_req_alloc(env, page, crt,
1946                                              1 /* only 1-object rpcs for now */);
1947                         if (IS_ERR(clerq))
1948                                 GOTO(out, rc = PTR_ERR(clerq));
1949                 }
1950                 if (mem_tight)
1951                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1952                 if (soft_sync)
1953                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1954                 pga[i] = &oap->oap_brw_page;
1955                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1956                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1957                        pga[i]->pg, page_index(oap->oap_page), oap,
1958                        pga[i]->flag);
1959                 i++;
1960                 cl_req_page_add(env, clerq, page);
1961         }
1962
1963         /* always get the data for the obdo for the rpc */
1964         LASSERT(clerq != NULL);
1965         crattr->cra_oa = oa;
1966         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1967
1968         rc = cl_req_prep(env, clerq);
1969         if (rc != 0) {
1970                 CERROR("cl_req_prep failed: %d\n", rc);
1971                 GOTO(out, rc);
1972         }
1973
1974         sort_brw_pages(pga, page_count);
1975         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1976                         pga, &req, crattr->cra_capa, 1, 0);
1977         if (rc != 0) {
1978                 CERROR("prep_req failed: %d\n", rc);
1979                 GOTO(out, rc);
1980         }
1981
1982         req->rq_commit_cb = brw_commit;
1983         req->rq_interpret_reply = brw_interpret;
1984
1985         if (mem_tight != 0)
1986                 req->rq_memalloc = 1;
1987
1988         /* Need to update the timestamps after the request is built in case
1989          * we race with setattr (locally or in queue at OST).  If OST gets
1990          * later setattr before earlier BRW (as determined by the request xid),
1991          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1992          * way to do this in a single call.  bug 10150 */
1993         cl_req_attr_set(env, clerq, crattr,
1994                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1995
1996         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1997
1998         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1999         aa = ptlrpc_req_async_args(req);
2000         INIT_LIST_HEAD(&aa->aa_oaps);
2001         list_splice_init(&rpc_list, &aa->aa_oaps);
2002         INIT_LIST_HEAD(&aa->aa_exts);
2003         list_splice_init(ext_list, &aa->aa_exts);
2004         aa->aa_clerq = clerq;
2005
2006         /* queued sync pages can be torn down while the pages
2007          * were between the pending list and the rpc */
2008         tmp = NULL;
2009         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2010                 /* only one oap gets a request reference */
2011                 if (tmp == NULL)
2012                         tmp = oap;
2013                 if (oap->oap_interrupted && !req->rq_intr) {
2014                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2015                                         oap, req);
2016                         ptlrpc_mark_interrupted(req);
2017                 }
2018         }
2019         if (tmp != NULL)
2020                 tmp->oap_request = ptlrpc_request_addref(req);
2021
2022         client_obd_list_lock(&cli->cl_loi_list_lock);
2023         starting_offset >>= PAGE_CACHE_SHIFT;
2024         if (cmd == OBD_BRW_READ) {
2025                 cli->cl_r_in_flight++;
2026                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2027                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2028                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2029                                       starting_offset + 1);
2030         } else {
2031                 cli->cl_w_in_flight++;
2032                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2033                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2034                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2035                                       starting_offset + 1);
2036         }
2037         client_obd_list_unlock(&cli->cl_loi_list_lock);
2038
2039         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2040                   page_count, aa, cli->cl_r_in_flight,
2041                   cli->cl_w_in_flight);
2042
2043         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2044          * see which CPU/NUMA node the majority of pages were allocated
2045          * on, and try to assign the async RPC to the CPU core
2046          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2047          *
2048          * But on the other hand, we expect that multiple ptlrpcd
2049          * threads and the initial write sponsor can run in parallel,
2050          * especially when data checksum is enabled, which is CPU-bound
2051          * operation and single ptlrpcd thread cannot process in time.
2052          * So more ptlrpcd threads sharing BRW load
2053          * (with PDL_POLICY_ROUND) seems better.
2054          */
2055         ptlrpcd_add_req(req, pol, -1);
2056         rc = 0;
2057         EXIT;
2058
2059 out:
2060         if (mem_tight != 0)
2061                 cfs_memory_pressure_restore(mpflag);
2062
2063         if (crattr != NULL) {
2064                 capa_put(crattr->cra_capa);
2065                 OBD_FREE(crattr, sizeof(*crattr));
2066         }
2067
2068         if (rc != 0) {
2069                 LASSERT(req == NULL);
2070
2071                 if (oa)
2072                         OBDO_FREE(oa);
2073                 if (pga)
2074                         OBD_FREE(pga, sizeof(*pga) * page_count);
2075                 /* this should happen rarely and is pretty bad, it makes the
2076                  * pending list not follow the dirty order */
2077                 while (!list_empty(ext_list)) {
2078                         ext = list_entry(ext_list->next, struct osc_extent,
2079                                          oe_link);
2080                         list_del_init(&ext->oe_link);
2081                         osc_extent_finish(env, ext, 0, rc);
2082                 }
2083                 if (clerq && !IS_ERR(clerq))
2084                         cl_req_completion(env, clerq, rc);
2085         }
2086         RETURN(rc);
2087 }
2088
2089 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2090                                         struct ldlm_enqueue_info *einfo)
2091 {
2092         void *data = einfo->ei_cbdata;
2093         int set = 0;
2094
2095         LASSERT(lock != NULL);
2096         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2097         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2098         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2099         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2100
2101         lock_res_and_lock(lock);
2102
2103         if (lock->l_ast_data == NULL)
2104                 lock->l_ast_data = data;
2105         if (lock->l_ast_data == data)
2106                 set = 1;
2107
2108         unlock_res_and_lock(lock);
2109
2110         return set;
2111 }
2112
2113 static int osc_set_data_with_check(struct lustre_handle *lockh,
2114                                    struct ldlm_enqueue_info *einfo)
2115 {
2116         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2117         int set = 0;
2118
2119         if (lock != NULL) {
2120                 set = osc_set_lock_data_with_check(lock, einfo);
2121                 LDLM_LOCK_PUT(lock);
2122         } else
2123                 CERROR("lockh %p, data %p - client evicted?\n",
2124                        lockh, einfo->ei_cbdata);
2125         return set;
2126 }
2127
2128 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2129                              ldlm_iterator_t replace, void *data)
2130 {
2131         struct ldlm_res_id res_id;
2132         struct obd_device *obd = class_exp2obd(exp);
2133
2134         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2135         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2136         return 0;
2137 }
2138
2139 /* find any ldlm lock of the inode in osc
2140  * return 0    not find
2141  *        1    find one
2142  *      < 0    error */
2143 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2144                            ldlm_iterator_t replace, void *data)
2145 {
2146         struct ldlm_res_id res_id;
2147         struct obd_device *obd = class_exp2obd(exp);
2148         int rc = 0;
2149
2150         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2151         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2152         if (rc == LDLM_ITER_STOP)
2153                 return(1);
2154         if (rc == LDLM_ITER_CONTINUE)
2155                 return(0);
2156         return(rc);
2157 }
2158
/* Finish an OSC lock enqueue: translate an intent-aborted reply into its
 * real status, invoke the caller's upcall with the final error code, and
 * release the enqueue reference on the lock.
 *
 * \param req      the enqueue request (holds the intent reply, if any)
 * \param upcall   completion callback supplied by the enqueue caller
 * \param cookie   opaque argument for \a upcall
 * \param lockh    handle of the lock obtained (may be unused on error)
 * \param mode     lock mode the reference was taken in
 * \param flags    in/out enqueue flags; LDLM_FL_LVB_READY is set here
 * \param agl      non-zero for async glimpse locks (no LVB_READY on abort)
 * \param errcode  enqueue result (ELDLM_* or negative errno)
 *
 * \retval the upcall's return value.
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* The server's real verdict travels in lock_policy_res1
                 * when the intent path aborted the lock. */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
2196
/* Interpret callback for an asynchronous lock enqueue: complete the ldlm
 * side of the enqueue, then run the osc upcall via osc_enqueue_fini().
 * An extra lock reference is held across the upcall so a blocking AST for
 * a failed lock cannot be processed before the upcall has run.
 *
 * \param env  execution environment
 * \param req  the completed enqueue request
 * \param aa   enqueue arguments stashed in the request
 * \param rc   RPC completion status
 *
 * \retval result of osc_enqueue_fini() (the upcall's return value).
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* AGL enqueues carry no LVB/flags pointers; borrow a local flags
         * word so the fini paths below have something to update. */
        if (aa->oa_agl) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Drop the extra reference taken above and the ldlm_handle2lock
         * reference. */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2249
2250 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2251
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests; however, holding some locks while trying to
 * obtain others may take a considerable amount of time if an OST fails, and
 * when other sync requests cannot get a lock released by this client, the
 * client is evicted from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
/**
 * Obtain an extent DLM lock for the given resource, either by matching an
 * already-cached lock or by enqueuing a new one (synchronously or via
 * \a rqset when \a async).  The caller's \a upcall is always invoked with
 * the outcome, either directly here or from osc_enqueue_interpret().
 *
 * \param flags     in/out enqueue flags; LDLM_FL_TEST_LOCK means "probe only"
 * \param kms_valid non-zero when cached locks may be trusted for LVB data
 * \param agl       non-zero for a speculative async-glimpse-lock enqueue
 *
 * \retval ELDLM_OK on success/match, -ECANCELED when an AGL enqueue finds an
 *         existing lock, -ENOLCK for a failed probe, or a negative errno.
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL must not require a ready LVB, since it runs ahead of I/O. */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * a DLM lock already exists, it will just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Matched lock belongs to another OSC object;
                         * fall through and enqueue a fresh one. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                /* Intent enqueues need a reply buffer for the server LVB;
                 * the request is allocated here and freed by this function
                 * (or by the async error path below). */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* Stash everything osc_enqueue_interpret() needs in
                         * the request's async-args area. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue a DLM lock
                                 * in advance, so we don't care about the
                                 * result of the AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        /* Only the intent path owns the request here; the
                         * non-intent request was managed by ldlm_cli_enqueue. */
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2403
/**
 * Try to match an already-granted extent lock covering \a policy on
 * \a res_id, without enqueuing a new one.
 *
 * \param data   if non-NULL, opaque data that must be attachable to the
 *               matched lock (checked via osc_set_data_with_check())
 * \param unref  passed through to ldlm_lock_match()
 *
 * \retval the matched lock mode (non-zero, with a reference held on
 *         \a lockh unless LDLM_FL_TEST_LOCK was set), or 0 on no match.
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   __u64 *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        if (!osc_set_data_with_check(lockh, data)) {
                                /* Lock can't carry our data: release the
                                 * match reference and report no match. */
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        /* We asked for PR but matched a PW lock: move our
                         * reference to PR mode so the caller's later decref
                         * of PR balances correctly. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2447
2448 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2449 {
2450         ENTRY;
2451
2452         if (unlikely(mode == LCK_GROUP))
2453                 ldlm_lock_decref_and_cancel(lockh, mode);
2454         else
2455                 ldlm_lock_decref(lockh, mode);
2456
2457         RETURN(0);
2458 }
2459
/**
 * Interpret callback for an asynchronous OST_STATFS request: copy the
 * server's obd_statfs reply into the caller's buffer and invoke the
 * oi_cb_up completion callback with the final status.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* A "no delay" statfs treats connection-level failures as a clean
         * (empty) completion rather than an error: force rc to 0 and skip
         * straight to the completion callback. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* Always report completion (success or failure) upward. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2492
/**
 * Send an OST_STATFS request asynchronously via \a rqset; the reply is
 * delivered to the caller through osc_statfs_interpret() / oi_cb_up.
 *
 * \param max_age currently unused on the wire (see the comment below)
 *
 * \retval 0 on successful submission, negative errno otherwise
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for the target
                 * to become available, to avoid deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2536
/**
 * Synchronous OST_STATFS: query the target's filesystem statistics and
 * copy them into \a osfs.
 *
 * \param max_age currently unused on the wire (see the comment below)
 * \param flags   OBD_STATFS_* flags; NODELAY requests never block waiting
 *                for an unavailable target
 *
 * \retval 0 on success, negative errno otherwise
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we need to
         * synchronize against client_disconnect_export() tearing down the
         * import: take our own reference under cl_sem.  Bug15684 */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds what it needs from the import now. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for the target
                 * to become available, to avoid deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2600
/**
 * OBD ioctl entry point for the OSC device.
 *
 * Pins this module for the duration of the call so it cannot be unloaded
 * mid-ioctl, then dispatches on \a cmd.  \a len and \a uarg are unused by
 * the commands handled here.
 *
 * \retval 0 on success, -ENOTTY for unknown commands, negative errno
 *         otherwise
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive return values are informational, not errors */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2640
/**
 * Handle obd_get_info() queries for the OSC.
 *
 * Supported keys:
 * - KEY_LOCK_TO_STRIPE: always returns stripe 0 (a single OSC object
 *   covers exactly one stripe).
 * - KEY_LAST_ID: fetch the last allocated object id from the OST.
 * - KEY_FIEMAP: forward a fiemap request to the OST, optionally taking a
 *   PR extent lock first when FIEMAP_FLAG_SYNC is set.
 *
 * \retval 0 on success, -EINVAL for unknown keys, negative errno otherwise
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                /* mode == 0 doubles as "no lock held" for drop_lock below */
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Build a page-aligned extent for the requested range,
                 * clamping to OBD_OBJECT_EOF on overflow. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* convert our PW match reference to PR so
                                 * drop_lock can decref PR unconditionally */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2776
/**
 * Handle obd_set_info_async() for the OSC.
 *
 * Several keys are handled locally (checksum toggle, sptlrpc config/ctx
 * flush, client cache attach, LRU shrink); all other keys are packed into
 * an OST_SET_INFO (or OST_SET_GRANT_INFO for KEY_GRANT_SHRINK) request and
 * sent to the OST via the caller's \a set, or via ptlrpcd for grant shrink.
 *
 * \retval 0 on success or successful submission, negative errno otherwise
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* Attach this OSC to the shared client page cache and its
                 * LRU list; done once at setup time. */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of this OSC's LRU pages per call */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report the remaining shortfall back to the caller */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* grant-shrink requests carry an ost_body instead of a value blob */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* Keep a private copy of the obdo for the interpret
                 * callback; freed by osc_shrink_grant_interpret(). */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2894
2895 static int osc_reconnect(const struct lu_env *env,
2896                          struct obd_export *exp, struct obd_device *obd,
2897                          struct obd_uuid *cluuid,
2898                          struct obd_connect_data *data,
2899                          void *localdata)
2900 {
2901         struct client_obd *cli = &obd->u.cli;
2902
2903         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2904                 long lost_grant;
2905
2906                 client_obd_list_lock(&cli->cl_loi_list_lock);
2907                 data->ocd_grant = (cli->cl_avail_grant +
2908                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2909                                   2 * cli_brw_size(obd);
2910                 lost_grant = cli->cl_lost_grant;
2911                 cli->cl_lost_grant = 0;
2912                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2913
2914                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2915                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2916                        data->ocd_version, data->ocd_grant, lost_grant);
2917         }
2918
2919         RETURN(0);
2920 }
2921
2922 static int osc_disconnect(struct obd_export *exp)
2923 {
2924         struct obd_device *obd = class_exp2obd(exp);
2925         int rc;
2926
2927         rc = client_disconnect_export(exp);
2928         /**
2929          * Initially we put del_shrink_grant before disconnect_export, but it
2930          * causes the following problem if setup (connect) and cleanup
2931          * (disconnect) are tangled together.
2932          *      connect p1                     disconnect p2
2933          *   ptlrpc_connect_import
2934          *     ...............               class_manual_cleanup
2935          *                                     osc_disconnect
2936          *                                     del_shrink_grant
2937          *   ptlrpc_connect_interrupt
2938          *     init_grant_shrink
2939          *   add this client to shrink list
2940          *                                      cleanup_osc
2941          * Bang! pinger trigger the shrink.
2942          * So the osc should be disconnected from the shrink list, after we
2943          * are sure the import has been destroyed. BUG18662
2944          */
2945         if (obd->u.cli.cl_import == NULL)
2946                 osc_del_shrink_grant(&obd->u.cli);
2947         return rc;
2948 }
2949
2950 static int osc_import_event(struct obd_device *obd,
2951                             struct obd_import *imp,
2952                             enum obd_import_event event)
2953 {
2954         struct client_obd *cli;
2955         int rc = 0;
2956
2957         ENTRY;
2958         LASSERT(imp->imp_obd == obd);
2959
2960         switch (event) {
2961         case IMP_EVENT_DISCON: {
2962                 cli = &obd->u.cli;
2963                 client_obd_list_lock(&cli->cl_loi_list_lock);
2964                 cli->cl_avail_grant = 0;
2965                 cli->cl_lost_grant = 0;
2966                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2967                 break;
2968         }
2969         case IMP_EVENT_INACTIVE: {
2970                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2971                 break;
2972         }
2973         case IMP_EVENT_INVALIDATE: {
2974                 struct ldlm_namespace *ns = obd->obd_namespace;
2975                 struct lu_env         *env;
2976                 int                    refcheck;
2977
2978                 env = cl_env_get(&refcheck);
2979                 if (!IS_ERR(env)) {
2980                         /* Reset grants */
2981                         cli = &obd->u.cli;
2982                         /* all pages go to failing rpcs due to the invalid
2983                          * import */
2984                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2985
2986                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2987                         cl_env_put(env, &refcheck);
2988                 } else
2989                         rc = PTR_ERR(env);
2990                 break;
2991         }
2992         case IMP_EVENT_ACTIVE: {
2993                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2994                 break;
2995         }
2996         case IMP_EVENT_OCD: {
2997                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2998
2999                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3000                         osc_init_grant(&obd->u.cli, ocd);
3001
3002                 /* See bug 7198 */
3003                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3004                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3005
3006                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3007                 break;
3008         }
3009         case IMP_EVENT_DEACTIVATE: {
3010                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3011                 break;
3012         }
3013         case IMP_EVENT_ACTIVATE: {
3014                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3015                 break;
3016         }
3017         default:
3018                 CERROR("Unknown import event %d\n", event);
3019                 LBUG();
3020         }
3021         RETURN(rc);
3022 }
3023
3024 /**
3025  * Determine whether the lock can be canceled before replaying the lock
3026  * during recovery, see bug16774 for detailed information.
3027  *
3028  * \retval zero the lock can't be canceled
3029  * \retval other ok to cancel
3030  */
3031 static int osc_cancel_weight(struct ldlm_lock *lock)
3032 {
3033         /*
3034          * Cancel all unused and granted extent lock.
3035          */
3036         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3037             lock->l_granted_mode == lock->l_req_mode &&
3038             osc_ldlm_weigh_ast(lock) == 0)
3039                 RETURN(1);
3040
3041         RETURN(0);
3042 }
3043
3044 static int brw_queue_work(const struct lu_env *env, void *data)
3045 {
3046         struct client_obd *cli = data;
3047
3048         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3049
3050         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3051         RETURN(0);
3052 }
3053
/**
 * Set up an OSC obd device from a configuration record.
 *
 * Initializes the generic client state, allocates the ptlrpcd work items
 * used for asynchronous writeback and LRU shrinking, sets up quota and
 * procfs, and pre-allocates a small request pool.  On any fatal error the
 * goto chain below unwinds exactly what was set up so far.
 *
 * \param[in] obd   obd device being configured
 * \param[in] lcfg  configuration record
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Work item that flushes dirty pages from a ptlrpcd thread. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* Work item that shrinks the client LRU page cache. */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }
        /* NOTE: a procfs setup failure is deliberately non-fatal; rc is
         * not propagated past this point and the function returns 0. */

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* Let LDLM use our weight callback when picking replay candidates. */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3145
/**
 * Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS tears down the ptlrpcd work items, the import,
 * and the procfs entries.  Other stages are intentionally ignored.
 *
 * \param[in] obd    device being cleaned up
 * \param[in] stage  which cleanup phase we are in
 *
 * \retval 0 (always; rc is kept for interface symmetry)
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                /* Work items must go before the import they reference. */
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
3191
/**
 * Final cleanup of an OSC device.
 *
 * Detaches the device from the shared LRU cache, releases quota state,
 * tears down the generic client state, and drops the ptlrpcd reference
 * taken in osc_setup().
 *
 * \param[in] obd  device being torn down
 *
 * \retval return code from client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup: unlink this OSC from the shared cl_cache and drop
         * our user reference on it. */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        /* Matches the ptlrpcd_addref() in osc_setup(). */
        ptlrpcd_decref();
        RETURN(rc);
}
3218
3219 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3220 {
3221         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3222         return rc > 0 ? 0: rc;
3223 }
3224
3225 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3226 {
3227         return osc_process_config_base(obd, buf);
3228 }
3229
/* Method table wiring the OSC into the generic obd framework.  Connection
 * management is mostly delegated to the generic client_* helpers; object
 * and attribute operations use the osc_* implementations in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3259
3260 extern struct lu_kmem_descr osc_caches[];
3261 extern struct lock_class_key osc_ast_guard_class;
3262
3263 int __init osc_init(void)
3264 {
3265         bool enable_proc = true;
3266         struct obd_type *type;
3267         int rc;
3268         ENTRY;
3269
3270         /* print an address of _any_ initialized kernel symbol from this
3271          * module, to allow debugging with gdb that doesn't support data
3272          * symbols from modules.*/
3273         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3274
3275         rc = lu_kmem_init(osc_caches);
3276         if (rc)
3277                 RETURN(rc);
3278
3279         type = class_search_type(LUSTRE_OSP_NAME);
3280         if (type != NULL && type->typ_procsym != NULL)
3281                 enable_proc = false;
3282
3283         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3284                                  LUSTRE_OSC_NAME, &osc_device_type);
3285         if (rc) {
3286                 lu_kmem_fini(osc_caches);
3287                 RETURN(rc);
3288         }
3289
3290         RETURN(rc);
3291 }
3292
/* Module exit: undo osc_init() in reverse order — unregister the obd
 * type first, then release the slab caches. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3298
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register the module with init/exit hooks via the libcfs wrapper. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);