Whamcloud - gitweb
LU-4841 osc: revise unstable pages accounting
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_ioctl.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_fid.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 struct osc_brw_async_args {
65         struct obdo              *aa_oa;
66         int                       aa_requested_nob;
67         int                       aa_nio_count;
68         obd_count                 aa_page_count;
69         int                       aa_resends;
70         struct brw_page **aa_ppga;
71         struct client_obd        *aa_cli;
72         struct list_head          aa_oaps;
73         struct list_head          aa_exts;
74         struct obd_capa  *aa_ocapa;
75         struct cl_req            *aa_clerq;
76 };
77
78 #define osc_grant_args osc_brw_async_args
79
/*
 * Async arguments of a getattr request queued by osc_getattr_async().
 * osc_getattr_interpret() unpacks the reply into aa_oi->oi_oa and then
 * reports the result through aa_oi->oi_cb_up.
 */
struct osc_async_args {
        struct obd_info         *aa_oi;
};
83
84 struct osc_setattr_args {
85         struct obdo             *sa_oa;
86         obd_enqueue_update_f     sa_upcall;
87         void                    *sa_cookie;
88 };
89
90 struct osc_fsync_args {
91         struct obd_info *fa_oi;
92         obd_enqueue_update_f     fa_upcall;
93         void                    *fa_cookie;
94 };
95
96 struct osc_enqueue_args {
97         struct obd_export               *oa_exp;
98         __u64                           *oa_flags;
99         obd_enqueue_update_f             oa_upcall;
100         void                            *oa_cookie;
101         struct ost_lvb                  *oa_lvb;
102         struct lustre_handle            *oa_lockh;
103         struct ldlm_enqueue_info        *oa_ei;
104         unsigned int                     oa_agl:1;
105 };
106
107 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
108 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
109                          void *data, int rc);
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         struct obd_import *imp = class_exp2cliimp(exp);
117         ENTRY;
118
119         if (lmm != NULL) {
120                 if (lmm_bytes < sizeof(*lmm)) {
121                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
122                                exp->exp_obd->obd_name, lmm_bytes,
123                                (int)sizeof(*lmm));
124                         RETURN(-EINVAL);
125                 }
126                 /* XXX LOV_MAGIC etc check? */
127
128                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
129                         CERROR("%s: zero lmm_object_id: rc = %d\n",
130                                exp->exp_obd->obd_name, -EINVAL);
131                         RETURN(-EINVAL);
132                 }
133         }
134
135         lsm_size = lov_stripe_md_size(1);
136         if (lsmp == NULL)
137                 RETURN(lsm_size);
138
139         if (*lsmp != NULL && lmm == NULL) {
140                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 OBD_FREE(*lsmp, lsm_size);
142                 *lsmp = NULL;
143                 RETURN(0);
144         }
145
146         if (*lsmp == NULL) {
147                 OBD_ALLOC(*lsmp, lsm_size);
148                 if (unlikely(*lsmp == NULL))
149                         RETURN(-ENOMEM);
150                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
151                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
152                         OBD_FREE(*lsmp, lsm_size);
153                         RETURN(-ENOMEM);
154                 }
155                 loi_init((*lsmp)->lsm_oinfo[0]);
156         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
157                 RETURN(-EBADF);
158         }
159
160         if (lmm != NULL)
161                 /* XXX zero *lsmp? */
162                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
163
164         if (imp != NULL &&
165             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
166                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
167         else
168                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
169
170         RETURN(lsm_size);
171 }
172
173 static inline void osc_pack_capa(struct ptlrpc_request *req,
174                                  struct ost_body *body, void *capa)
175 {
176         struct obd_capa *oc = (struct obd_capa *)capa;
177         struct lustre_capa *c;
178
179         if (!capa)
180                 return;
181
182         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
183         LASSERT(c);
184         capa_cpy(c, oc);
185         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
186         DEBUG_CAPA(D_SEC, c, "pack");
187 }
188
189 static inline void osc_pack_req_body(struct ptlrpc_request *req,
190                                      struct obd_info *oinfo)
191 {
192         struct ost_body *body;
193
194         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
195         LASSERT(body);
196
197         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
198                              oinfo->oi_oa);
199         osc_pack_capa(req, body, oinfo->oi_capa);
200 }
201
202 static inline void osc_set_capa_size(struct ptlrpc_request *req,
203                                      const struct req_msg_field *field,
204                                      struct obd_capa *oc)
205 {
206         if (oc == NULL)
207                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
208         else
209                 /* it is already calculated as sizeof struct obd_capa */
210                 ;
211 }
212
213 static int osc_getattr_interpret(const struct lu_env *env,
214                                  struct ptlrpc_request *req,
215                                  struct osc_async_args *aa, int rc)
216 {
217         struct ost_body *body;
218         ENTRY;
219
220         if (rc != 0)
221                 GOTO(out, rc);
222
223         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
224         if (body) {
225                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
226                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
227                                      aa->aa_oi->oi_oa, &body->oa);
228
229                 /* This should really be sent by the OST */
230                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
231                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
232         } else {
233                 CDEBUG(D_INFO, "can't unpack ost_body\n");
234                 rc = -EPROTO;
235                 aa->aa_oi->oi_oa->o_valid = 0;
236         }
237 out:
238         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
239         RETURN(rc);
240 }
241
242 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
243                              struct ptlrpc_request_set *set)
244 {
245         struct ptlrpc_request *req;
246         struct osc_async_args *aa;
247         int                    rc;
248         ENTRY;
249
250         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
251         if (req == NULL)
252                 RETURN(-ENOMEM);
253
254         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
255         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
256         if (rc) {
257                 ptlrpc_request_free(req);
258                 RETURN(rc);
259         }
260
261         osc_pack_req_body(req, oinfo);
262
263         ptlrpc_request_set_replen(req);
264         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
265
266         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
267         aa = ptlrpc_req_async_args(req);
268         aa->aa_oi = oinfo;
269
270         ptlrpc_set_add_req(set, req);
271         RETURN(0);
272 }
273
274 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
275                        struct obd_info *oinfo)
276 {
277         struct ptlrpc_request *req;
278         struct ost_body       *body;
279         int                    rc;
280         ENTRY;
281
282         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
283         if (req == NULL)
284                 RETURN(-ENOMEM);
285
286         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
287         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
288         if (rc) {
289                 ptlrpc_request_free(req);
290                 RETURN(rc);
291         }
292
293         osc_pack_req_body(req, oinfo);
294
295         ptlrpc_request_set_replen(req);
296
297         rc = ptlrpc_queue_wait(req);
298         if (rc)
299                 GOTO(out, rc);
300
301         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
302         if (body == NULL)
303                 GOTO(out, rc = -EPROTO);
304
305         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
306         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
307                              &body->oa);
308
309         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
310         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
311
312         EXIT;
313  out:
314         ptlrpc_req_finished(req);
315         return rc;
316 }
317
318 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
319                        struct obd_info *oinfo, struct obd_trans_info *oti)
320 {
321         struct ptlrpc_request *req;
322         struct ost_body       *body;
323         int                    rc;
324         ENTRY;
325
326         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
327
328         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
329         if (req == NULL)
330                 RETURN(-ENOMEM);
331
332         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
333         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
334         if (rc) {
335                 ptlrpc_request_free(req);
336                 RETURN(rc);
337         }
338
339         osc_pack_req_body(req, oinfo);
340
341         ptlrpc_request_set_replen(req);
342
343         rc = ptlrpc_queue_wait(req);
344         if (rc)
345                 GOTO(out, rc);
346
347         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
348         if (body == NULL)
349                 GOTO(out, rc = -EPROTO);
350
351         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
352                              &body->oa);
353
354         EXIT;
355 out:
356         ptlrpc_req_finished(req);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_interpret(const struct lu_env *env,
361                                  struct ptlrpc_request *req,
362                                  struct osc_setattr_args *sa, int rc)
363 {
364         struct ost_body *body;
365         ENTRY;
366
367         if (rc != 0)
368                 GOTO(out, rc);
369
370         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
371         if (body == NULL)
372                 GOTO(out, rc = -EPROTO);
373
374         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
375                              &body->oa);
376 out:
377         rc = sa->sa_upcall(sa->sa_cookie, rc);
378         RETURN(rc);
379 }
380
381 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
382                            struct obd_trans_info *oti,
383                            obd_enqueue_update_f upcall, void *cookie,
384                            struct ptlrpc_request_set *rqset)
385 {
386         struct ptlrpc_request   *req;
387         struct osc_setattr_args *sa;
388         int                      rc;
389         ENTRY;
390
391         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
392         if (req == NULL)
393                 RETURN(-ENOMEM);
394
395         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
396         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
397         if (rc) {
398                 ptlrpc_request_free(req);
399                 RETURN(rc);
400         }
401
402         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
403                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
404
405         osc_pack_req_body(req, oinfo);
406
407         ptlrpc_request_set_replen(req);
408
409         /* do mds to ost setattr asynchronously */
410         if (!rqset) {
411                 /* Do not wait for response. */
412                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413         } else {
414                 req->rq_interpret_reply =
415                         (ptlrpc_interpterer_t)osc_setattr_interpret;
416
417                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
418                 sa = ptlrpc_req_async_args(req);
419                 sa->sa_oa = oinfo->oi_oa;
420                 sa->sa_upcall = upcall;
421                 sa->sa_cookie = cookie;
422
423                 if (rqset == PTLRPCD_SET)
424                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
425                 else
426                         ptlrpc_set_add_req(rqset, req);
427         }
428
429         RETURN(0);
430 }
431
432 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
433                              struct obd_trans_info *oti,
434                              struct ptlrpc_request_set *rqset)
435 {
436         return osc_setattr_async_base(exp, oinfo, oti,
437                                       oinfo->oi_cb_up, oinfo, rqset);
438 }
439
440 int osc_real_create(struct obd_export *exp, struct obdo *oa,
441                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
442 {
443         struct ptlrpc_request *req;
444         struct ost_body       *body;
445         struct lov_stripe_md  *lsm;
446         int                    rc;
447         ENTRY;
448
449         LASSERT(oa);
450         LASSERT(ea);
451
452         lsm = *ea;
453         if (!lsm) {
454                 rc = obd_alloc_memmd(exp, &lsm);
455                 if (rc < 0)
456                         RETURN(rc);
457         }
458
459         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
460         if (req == NULL)
461                 GOTO(out, rc = -ENOMEM);
462
463         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
464         if (rc) {
465                 ptlrpc_request_free(req);
466                 GOTO(out, rc);
467         }
468
469         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
470         LASSERT(body);
471
472         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
473
474         ptlrpc_request_set_replen(req);
475
476         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
477             oa->o_flags == OBD_FL_DELORPHAN) {
478                 DEBUG_REQ(D_HA, req,
479                           "delorphan from OST integration");
480                 /* Don't resend the delorphan req */
481                 req->rq_no_resend = req->rq_no_delay = 1;
482         }
483
484         rc = ptlrpc_queue_wait(req);
485         if (rc)
486                 GOTO(out_req, rc);
487
488         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
489         if (body == NULL)
490                 GOTO(out_req, rc = -EPROTO);
491
492         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
493         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
494
495         oa->o_blksize = cli_brw_size(exp->exp_obd);
496         oa->o_valid |= OBD_MD_FLBLKSZ;
497
498         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
499          * have valid lsm_oinfo data structs, so don't go touching that.
500          * This needs to be fixed in a big way.
501          */
502         lsm->lsm_oi = oa->o_oi;
503         *ea = lsm;
504
505         if (oti != NULL) {
506                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
507                         if (oti->oti_logcookies == NULL)
508                                 oti->oti_logcookies = &oti->oti_onecookie;
509
510                         *oti->oti_logcookies = oa->o_lcookie;
511                 }
512         }
513
514         CDEBUG(D_HA, "transno: "LPD64"\n",
515                lustre_msg_get_transno(req->rq_repmsg));
516 out_req:
517         ptlrpc_req_finished(req);
518 out:
519         if (rc && !*ea)
520                 obd_free_memmd(exp, &lsm);
521         RETURN(rc);
522 }
523
524 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request   *req;
529         struct osc_setattr_args *sa;
530         struct ost_body         *body;
531         int                      rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
550                              oinfo->oi_oa);
551         osc_pack_capa(req, body, oinfo->oi_capa);
552
553         ptlrpc_request_set_replen(req);
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
556         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
557         sa = ptlrpc_req_async_args(req);
558         sa->sa_oa     = oinfo->oi_oa;
559         sa->sa_upcall = upcall;
560         sa->sa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_sync_interpret(const struct lu_env *env,
570                               struct ptlrpc_request *req,
571                               void *arg, int rc)
572 {
573         struct osc_fsync_args *fa = arg;
574         struct ost_body *body;
575         ENTRY;
576
577         if (rc)
578                 GOTO(out, rc);
579
580         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
581         if (body == NULL) {
582                 CERROR ("can't unpack ost_body\n");
583                 GOTO(out, rc = -EPROTO);
584         }
585
586         *fa->fa_oi->oi_oa = body->oa;
587 out:
588         rc = fa->fa_upcall(fa->fa_cookie, rc);
589         RETURN(rc);
590 }
591
592 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
593                   obd_enqueue_update_f upcall, void *cookie,
594                   struct ptlrpc_request_set *rqset)
595 {
596         struct ptlrpc_request *req;
597         struct ost_body       *body;
598         struct osc_fsync_args *fa;
599         int                    rc;
600         ENTRY;
601
602         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
603         if (req == NULL)
604                 RETURN(-ENOMEM);
605
606         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
607         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
608         if (rc) {
609                 ptlrpc_request_free(req);
610                 RETURN(rc);
611         }
612
613         /* overload the size and blocks fields in the oa with start/end */
614         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615         LASSERT(body);
616         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
617                              oinfo->oi_oa);
618         osc_pack_capa(req, body, oinfo->oi_capa);
619
620         ptlrpc_request_set_replen(req);
621         req->rq_interpret_reply = osc_sync_interpret;
622
623         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
624         fa = ptlrpc_req_async_args(req);
625         fa->fa_oi = oinfo;
626         fa->fa_upcall = upcall;
627         fa->fa_cookie = cookie;
628
629         if (rqset == PTLRPCD_SET)
630                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
631         else
632                 ptlrpc_set_add_req(rqset, req);
633
634         RETURN (0);
635 }
636
637 /* Find and cancel locally locks matched by @mode in the resource found by
638  * @objid. Found locks are added into @cancel list. Returns the amount of
639  * locks added to @cancels list. */
640 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
641                                    struct list_head *cancels,
642                                    ldlm_mode_t mode, __u64 lock_flags)
643 {
644         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
645         struct ldlm_res_id res_id;
646         struct ldlm_resource *res;
647         int count;
648         ENTRY;
649
650         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
651          * export) but disabled through procfs (flag in NS).
652          *
653          * This distinguishes from a case when ELC is not supported originally,
654          * when we still want to cancel locks in advance and just cancel them
655          * locally, without sending any RPC. */
656         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
657                 RETURN(0);
658
659         ostid_build_res_name(&oa->o_oi, &res_id);
660         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
661         if (res == NULL)
662                 RETURN(0);
663
664         LDLM_RESOURCE_ADDREF(res);
665         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
666                                            lock_flags, 0, NULL);
667         LDLM_RESOURCE_DELREF(res);
668         ldlm_resource_putref(res);
669         RETURN(count);
670 }
671
672 static int osc_destroy_interpret(const struct lu_env *env,
673                                  struct ptlrpc_request *req, void *data,
674                                  int rc)
675 {
676         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
677
678         atomic_dec(&cli->cl_destroy_in_flight);
679         wake_up(&cli->cl_destroy_waitq);
680         return 0;
681 }
682
683 static int osc_can_send_destroy(struct client_obd *cli)
684 {
685         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
686             cli->cl_max_rpcs_in_flight) {
687                 /* The destroy request can be sent */
688                 return 1;
689         }
690         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
691             cli->cl_max_rpcs_in_flight) {
692                 /*
693                  * The counter has been modified between the two atomic
694                  * operations.
695                  */
696                 wake_up(&cli->cl_destroy_waitq);
697         }
698         return 0;
699 }
700
701 int osc_create(const struct lu_env *env, struct obd_export *exp,
702                struct obdo *oa, struct lov_stripe_md **ea,
703                struct obd_trans_info *oti)
704 {
705         int rc = 0;
706         ENTRY;
707
708         LASSERT(oa);
709         LASSERT(ea);
710         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
711
712         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
713             oa->o_flags == OBD_FL_RECREATE_OBJS) {
714                 RETURN(osc_real_create(exp, oa, ea, oti));
715         }
716
717         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
718                 RETURN(osc_real_create(exp, oa, ea, oti));
719
720         /* we should not get here anymore */
721         LBUG();
722
723         RETURN(rc);
724 }
725
726 /* Destroy requests can be async always on the client, and we don't even really
727  * care about the return code since the client cannot do anything at all about
728  * a destroy failure.
729  * When the MDS is unlinking a filename, it saves the file objects into a
730  * recovery llog, and these object records are cancelled when the OST reports
731  * they were destroyed and sync'd to disk (i.e. transaction committed).
732  * If the client dies, or the OST is down when the object should be destroyed,
733  * the records are not cancelled, and when the OST reconnects to the MDS next,
734  * it will retrieve the llog unlink logs and then sends the log cancellation
735  * cookies to the MDS after committing destroy transactions. */
736 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
737                        struct obdo *oa, struct lov_stripe_md *ea,
738                        struct obd_trans_info *oti, struct obd_export *md_export,
739                        void *capa)
740 {
741         struct client_obd     *cli = &exp->exp_obd->u.cli;
742         struct ptlrpc_request *req;
743         struct ost_body       *body;
744         struct list_head       cancels = LIST_HEAD_INIT(cancels);
745         int rc, count;
746         ENTRY;
747
748         if (!oa) {
749                 CDEBUG(D_INFO, "oa NULL\n");
750                 RETURN(-EINVAL);
751         }
752
753         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
754                                         LDLM_FL_DISCARD_DATA);
755
756         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
757         if (req == NULL) {
758                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
759                 RETURN(-ENOMEM);
760         }
761
762         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
763         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
764                                0, &cancels, count);
765         if (rc) {
766                 ptlrpc_request_free(req);
767                 RETURN(rc);
768         }
769
770         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
771         ptlrpc_at_set_req_timeout(req);
772
773         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
774                 oa->o_lcookie = *oti->oti_logcookies;
775         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
776         LASSERT(body);
777         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
778
779         osc_pack_capa(req, body, (struct obd_capa *)capa);
780         ptlrpc_request_set_replen(req);
781
782         /* If osc_destory is for destroying the unlink orphan,
783          * sent from MDT to OST, which should not be blocked here,
784          * because the process might be triggered by ptlrpcd, and
785          * it is not good to block ptlrpcd thread (b=16006)*/
786         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
787                 req->rq_interpret_reply = osc_destroy_interpret;
788                 if (!osc_can_send_destroy(cli)) {
789                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
790                                                           NULL);
791
792                         /*
793                          * Wait until the number of on-going destroy RPCs drops
794                          * under max_rpc_in_flight
795                          */
796                         l_wait_event_exclusive(cli->cl_destroy_waitq,
797                                                osc_can_send_destroy(cli), &lwi);
798                 }
799         }
800
801         /* Do not wait for response */
802         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
803         RETURN(0);
804 }
805
806 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
807                                 long writing_bytes)
808 {
809         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
810
811         LASSERT(!(oa->o_valid & bits));
812
813         oa->o_valid |= bits;
814         client_obd_list_lock(&cli->cl_loi_list_lock);
815         oa->o_dirty = cli->cl_dirty;
816         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
817                      cli->cl_dirty_max)) {
818                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
819                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
820                 oa->o_undirty = 0;
821         } else if (unlikely(atomic_read(&obd_dirty_pages) -
822                             atomic_read(&obd_dirty_transit_pages) >
823                             (long)(obd_max_dirty_pages + 1))) {
824                 /* The atomic_read() allowing the atomic_inc() are
825                  * not covered by a lock thus they may safely race and trip
826                  * this CERROR() unless we add in a small fudge factor (+1). */
827                 CERROR("%s: dirty %d - %d > system dirty_max %d\n",
828                        cli->cl_import->imp_obd->obd_name,
829                        atomic_read(&obd_dirty_pages),
830                        atomic_read(&obd_dirty_transit_pages),
831                        obd_max_dirty_pages);
832                 oa->o_undirty = 0;
833         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
834                 CERROR("dirty %lu - dirty_max %lu too big???\n",
835                        cli->cl_dirty, cli->cl_dirty_max);
836                 oa->o_undirty = 0;
837         } else {
838                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
839                                       PAGE_CACHE_SHIFT) *
840                                      (cli->cl_max_rpcs_in_flight + 1);
841                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
842         }
843         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
844         oa->o_dropped = cli->cl_lost_grant;
845         cli->cl_lost_grant = 0;
846         client_obd_list_unlock(&cli->cl_loi_list_lock);
847         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
848                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
849
850 }
851
852 void osc_update_next_shrink(struct client_obd *cli)
853 {
854         cli->cl_next_shrink_grant =
855                 cfs_time_shift(cli->cl_grant_shrink_interval);
856         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
857                cli->cl_next_shrink_grant);
858 }
859
860 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
861 {
862         client_obd_list_lock(&cli->cl_loi_list_lock);
863         cli->cl_avail_grant += grant;
864         client_obd_list_unlock(&cli->cl_loi_list_lock);
865 }
866
867 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
868 {
869         if (body->oa.o_valid & OBD_MD_FLGRANT) {
870                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
871                 __osc_update_grant(cli, body->oa.o_grant);
872         }
873 }
874
875 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
876                               obd_count keylen, void *key, obd_count vallen,
877                               void *val, struct ptlrpc_request_set *set);
878
879 static int osc_shrink_grant_interpret(const struct lu_env *env,
880                                       struct ptlrpc_request *req,
881                                       void *aa, int rc)
882 {
883         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
884         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
885         struct ost_body *body;
886
887         if (rc != 0) {
888                 __osc_update_grant(cli, oa->o_grant);
889                 GOTO(out, rc);
890         }
891
892         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
893         LASSERT(body);
894         osc_update_grant(cli, body);
895 out:
896         OBDO_FREE(oa);
897         return rc;
898 }
899
900 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
901 {
902         client_obd_list_lock(&cli->cl_loi_list_lock);
903         oa->o_grant = cli->cl_avail_grant / 4;
904         cli->cl_avail_grant -= oa->o_grant;
905         client_obd_list_unlock(&cli->cl_loi_list_lock);
906         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
907                 oa->o_valid |= OBD_MD_FLFLAGS;
908                 oa->o_flags = 0;
909         }
910         oa->o_flags |= OBD_FL_SHRINK_GRANT;
911         osc_update_next_shrink(cli);
912 }
913
914 /* Shrink the current grant, either from some large amount to enough for a
915  * full set of in-flight RPCs, or if we have already shrunk to that limit
916  * then to enough for a single RPC.  This avoids keeping more grant than
917  * needed, and avoids shrinking the grant piecemeal. */
918 static int osc_shrink_grant(struct client_obd *cli)
919 {
920         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
921                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
922
923         client_obd_list_lock(&cli->cl_loi_list_lock);
924         if (cli->cl_avail_grant <= target_bytes)
925                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
926         client_obd_list_unlock(&cli->cl_loi_list_lock);
927
928         return osc_shrink_grant_to_target(cli, target_bytes);
929 }
930
931 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
932 {
933         int                     rc = 0;
934         struct ost_body        *body;
935         ENTRY;
936
937         client_obd_list_lock(&cli->cl_loi_list_lock);
938         /* Don't shrink if we are already above or below the desired limit
939          * We don't want to shrink below a single RPC, as that will negatively
940          * impact block allocation and long-term performance. */
941         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
942                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
943
944         if (target_bytes >= cli->cl_avail_grant) {
945                 client_obd_list_unlock(&cli->cl_loi_list_lock);
946                 RETURN(0);
947         }
948         client_obd_list_unlock(&cli->cl_loi_list_lock);
949
950         OBD_ALLOC_PTR(body);
951         if (!body)
952                 RETURN(-ENOMEM);
953
954         osc_announce_cached(cli, &body->oa, 0);
955
956         client_obd_list_lock(&cli->cl_loi_list_lock);
957         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
958         cli->cl_avail_grant = target_bytes;
959         client_obd_list_unlock(&cli->cl_loi_list_lock);
960         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
961                 body->oa.o_valid |= OBD_MD_FLFLAGS;
962                 body->oa.o_flags = 0;
963         }
964         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
965         osc_update_next_shrink(cli);
966
967         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
968                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
969                                 sizeof(*body), body, NULL);
970         if (rc != 0)
971                 __osc_update_grant(cli, body->oa.o_grant);
972         OBD_FREE_PTR(body);
973         RETURN(rc);
974 }
975
976 static int osc_should_shrink_grant(struct client_obd *client)
977 {
978         cfs_time_t time = cfs_time_current();
979         cfs_time_t next_shrink = client->cl_next_shrink_grant;
980
981         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
982              OBD_CONNECT_GRANT_SHRINK) == 0)
983                 return 0;
984
985         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
986                 /* Get the current RPC size directly, instead of going via:
987                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
988                  * Keep comment here so that it can be found by searching. */
989                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
990
991                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
992                     client->cl_avail_grant > brw_size)
993                         return 1;
994                 else
995                         osc_update_next_shrink(client);
996         }
997         return 0;
998 }
999
1000 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1001 {
1002         struct client_obd *client;
1003
1004         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1005                 if (osc_should_shrink_grant(client))
1006                         osc_shrink_grant(client);
1007         }
1008         return 0;
1009 }
1010
1011 static int osc_add_shrink_grant(struct client_obd *client)
1012 {
1013         int rc;
1014
1015         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1016                                        TIMEOUT_GRANT,
1017                                        osc_grant_shrink_grant_cb, NULL,
1018                                        &client->cl_grant_shrink_list);
1019         if (rc) {
1020                 CERROR("add grant client %s error %d\n",
1021                         client->cl_import->imp_obd->obd_name, rc);
1022                 return rc;
1023         }
1024         CDEBUG(D_CACHE, "add grant client %s \n",
1025                client->cl_import->imp_obd->obd_name);
1026         osc_update_next_shrink(client);
1027         return 0;
1028 }
1029
1030 static int osc_del_shrink_grant(struct client_obd *client)
1031 {
1032         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1033                                          TIMEOUT_GRANT);
1034 }
1035
1036 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1037 {
1038         /*
1039          * ocd_grant is the total grant amount we're expect to hold: if we've
1040          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1041          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1042          *
1043          * race is tolerable here: if we're evicted, but imp_state already
1044          * left EVICTED state, then cl_dirty must be 0 already.
1045          */
1046         client_obd_list_lock(&cli->cl_loi_list_lock);
1047         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1048                 cli->cl_avail_grant = ocd->ocd_grant;
1049         else
1050                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1051
1052         if (cli->cl_avail_grant < 0) {
1053                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1054                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1055                       ocd->ocd_grant, cli->cl_dirty);
1056                 /* workaround for servers which do not have the patch from
1057                  * LU-2679 */
1058                 cli->cl_avail_grant = ocd->ocd_grant;
1059         }
1060
1061         /* determine the appropriate chunk size used by osc_extent. */
1062         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1063         client_obd_list_unlock(&cli->cl_loi_list_lock);
1064
1065         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1066                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1067                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1068
1069         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1070             list_empty(&cli->cl_grant_shrink_list))
1071                 osc_add_shrink_grant(cli);
1072 }
1073
1074 /* We assume that the reason this OSC got a short read is because it read
1075  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1076  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1077  * this stripe never got written at or beyond this stripe offset yet. */
1078 static void handle_short_read(int nob_read, obd_count page_count,
1079                               struct brw_page **pga)
1080 {
1081         char *ptr;
1082         int i = 0;
1083
1084         /* skip bytes read OK */
1085         while (nob_read > 0) {
1086                 LASSERT (page_count > 0);
1087
1088                 if (pga[i]->count > nob_read) {
1089                         /* EOF inside this page */
1090                         ptr = kmap(pga[i]->pg) +
1091                                 (pga[i]->off & ~CFS_PAGE_MASK);
1092                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1093                         kunmap(pga[i]->pg);
1094                         page_count--;
1095                         i++;
1096                         break;
1097                 }
1098
1099                 nob_read -= pga[i]->count;
1100                 page_count--;
1101                 i++;
1102         }
1103
1104         /* zero remaining pages */
1105         while (page_count-- > 0) {
1106                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1107                 memset(ptr, 0, pga[i]->count);
1108                 kunmap(pga[i]->pg);
1109                 i++;
1110         }
1111 }
1112
1113 static int check_write_rcs(struct ptlrpc_request *req,
1114                            int requested_nob, int niocount,
1115                            obd_count page_count, struct brw_page **pga)
1116 {
1117         int     i;
1118         __u32   *remote_rcs;
1119
1120         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1121                                                   sizeof(*remote_rcs) *
1122                                                   niocount);
1123         if (remote_rcs == NULL) {
1124                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1125                 return(-EPROTO);
1126         }
1127
1128         /* return error if any niobuf was in error */
1129         for (i = 0; i < niocount; i++) {
1130                 if ((int)remote_rcs[i] < 0)
1131                         return(remote_rcs[i]);
1132
1133                 if (remote_rcs[i] != 0) {
1134                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1135                                 i, remote_rcs[i], req);
1136                         return(-EPROTO);
1137                 }
1138         }
1139
1140         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1141                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1142                        req->rq_bulk->bd_nob_transferred, requested_nob);
1143                 return(-EPROTO);
1144         }
1145
1146         return (0);
1147 }
1148
1149 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1150 {
1151         if (p1->flag != p2->flag) {
1152                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1153                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1154                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1155
1156                 /* warn if we try to combine flags that we don't know to be
1157                  * safe to combine */
1158                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1159                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1160                               "report this at http://bugs.whamcloud.com/\n",
1161                               p1->flag, p2->flag);
1162                 }
1163                 return 0;
1164         }
1165
1166         return (p1->off + p1->count == p2->off);
1167 }
1168
1169 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1170                                    struct brw_page **pga, int opc,
1171                                    cksum_type_t cksum_type)
1172 {
1173         __u32                           cksum;
1174         int                             i = 0;
1175         struct cfs_crypto_hash_desc     *hdesc;
1176         unsigned int                    bufsize;
1177         int                             err;
1178         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1179
1180         LASSERT(pg_count > 0);
1181
1182         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1183         if (IS_ERR(hdesc)) {
1184                 CERROR("Unable to initialize checksum hash %s\n",
1185                        cfs_crypto_hash_name(cfs_alg));
1186                 return PTR_ERR(hdesc);
1187         }
1188
1189         while (nob > 0 && pg_count > 0) {
1190                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1191
1192                 /* corrupt the data before we compute the checksum, to
1193                  * simulate an OST->client data error */
1194                 if (i == 0 && opc == OST_READ &&
1195                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1196                         unsigned char *ptr = kmap(pga[i]->pg);
1197                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1198                         memcpy(ptr + off, "bad1", min(4, nob));
1199                         kunmap(pga[i]->pg);
1200                 }
1201                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1202                                   pga[i]->off & ~CFS_PAGE_MASK,
1203                                   count);
1204                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1205                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1206
1207                 nob -= pga[i]->count;
1208                 pg_count--;
1209                 i++;
1210         }
1211
1212         bufsize = 4;
1213         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1214
1215         if (err)
1216                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1217
1218         /* For sending we only compute the wrong checksum instead
1219          * of corrupting the data so it is still correct on a redo */
1220         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1221                 cksum++;
1222
1223         return cksum;
1224 }
1225
/*
 * Build (but do not send) an OST_WRITE or OST_READ bulk request for the
 * page_count pages in pga[].
 *
 * cmd:        OST_WRITE is built when OBD_BRW_WRITE is set (request taken
 *             from the import's request pool), OST_READ otherwise.
 * cli:        client the request is built for; supplies the import, the
 *             checksum settings and the grant/dirty accounting.
 * oa:         obdo copied into the wire body; for writes its o_valid,
 *             o_flags and o_cksum are also updated with checksum info.
 *             It is remembered in the async args (aa_oa).
 * lsm:        not referenced in this function.
 * page_count: number of entries in pga[], must be > 0.
 * pga:        pages sorted by strictly increasing offset with no gaps
 *             inside the array (both are asserted below).
 * reqp:       receives the prepared request on success.
 * ocapa:      capability to pack, may be NULL.
 * reserve:    when set together with ocapa, a capa reference is stored in
 *             the async args.
 * resend:     non-zero marks the body with OBD_FL_RECOV_RESEND.
 *
 * Returns 0 on success; -ENOMEM or -EINVAL from the fault-injection points,
 * -ENOMEM if the request or bulk descriptor cannot be allocated, or the
 * error returned by ptlrpc_request_pack().
 */
1226 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1227                                 struct lov_stripe_md *lsm, obd_count page_count,
1228                                 struct brw_page **pga,
1229                                 struct ptlrpc_request **reqp,
1230                                 struct obd_capa *ocapa, int reserve,
1231                                 int resend)
1232 {
1233         struct ptlrpc_request   *req;
1234         struct ptlrpc_bulk_desc *desc;
1235         struct ost_body         *body;
1236         struct obd_ioobj        *ioobj;
1237         struct niobuf_remote    *niobuf;
1238         int niocount, i, requested_nob, opc, rc;
1239         struct osc_brw_async_args *aa;
1240         struct req_capsule      *pill;
1241         struct brw_page *pg_prev;
1242
1243         ENTRY;
1244         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1245                 RETURN(-ENOMEM); /* Recoverable */
1246         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1247                 RETURN(-EINVAL); /* Fatal */
1248
1249         if ((cmd & OBD_BRW_WRITE) != 0) {
1250                 opc = OST_WRITE;
1251                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1252                                                 cli->cl_import->imp_rq_pool,
1253                                                 &RQF_OST_BRW_WRITE);
1254         } else {
1255                 opc = OST_READ;
1256                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1257         }
1258         if (req == NULL)
1259                 RETURN(-ENOMEM);
1260
        /* count the remote niobufs: a run of mergeable pages needs one */
1261         for (niocount = i = 1; i < page_count; i++) {
1262                 if (!can_merge_pages(pga[i - 1], pga[i]))
1263                         niocount++;
1264         }
1265
        /* the variable-sized request fields must be sized before packing */
1266         pill = &req->rq_pill;
1267         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1268                              sizeof(*ioobj));
1269         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1270                              niocount * sizeof(*niobuf));
1271         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1272
1273         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1274         if (rc) {
1275                 ptlrpc_request_free(req);
1276                 RETURN(rc);
1277         }
1278         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1279         ptlrpc_at_set_req_timeout(req);
1280         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1281          * retry logic */
1282         req->rq_no_retry_einprogress = 1;
1283
1284         desc = ptlrpc_prep_bulk_imp(req, page_count,
1285                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1286                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1287                 OST_BULK_PORTAL);
1288
1289         if (desc == NULL)
1290                 GOTO(out, rc = -ENOMEM);
1291         /* NB request now owns desc and will free it when it gets freed */
1292
1293         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1294         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1295         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1296         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1297
1298         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1299
1300         obdo_to_ioobj(oa, ioobj);
1301         ioobj->ioo_bufcnt = niocount;
1302         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1303          * that might be send for this request.  The actual number is decided
1304          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1305          * "max - 1" for old client compatibility sending "0", and also so the
1306          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1307         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1308         osc_pack_capa(req, body, ocapa);
1309         LASSERT(page_count > 0);
1310         pg_prev = pga[0];
        /* add every page to the bulk descriptor and fill the niobufs;
         * niobuf advances once per page and is stepped back when a page
         * is merged into the previous niobuf */
1311         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1312                 struct brw_page *pg = pga[i];
1313                 int poff = pg->off & ~CFS_PAGE_MASK;
1314
1315                 LASSERT(pg->count > 0);
1316                 /* make sure there is no gap in the middle of page array */
1317                 LASSERTF(page_count == 1 ||
1318                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1319                           ergo(i > 0 && i < page_count - 1,
1320                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1321                           ergo(i == page_count - 1, poff == 0)),
1322                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1323                          i, page_count, pg, pg->off, pg->count);
1324 #ifdef __linux__
1325                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1326                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1327                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1328                          i, page_count,
1329                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1330                          pg_prev->pg, page_private(pg_prev->pg),
1331                          pg_prev->pg->index, pg_prev->off);
1332 #else
1333                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1334                          "i %d p_c %u\n", i, page_count);
1335 #endif
1336                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1337                         (pg->flag & OBD_BRW_SRVLOCK));
1338
1339                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1340                 requested_nob += pg->count;
1341
1342                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1343                         niobuf--;
1344                         niobuf->len += pg->count;
1345                 } else {
1346                         niobuf->offset = pg->off;
1347                         niobuf->len    = pg->count;
1348                         niobuf->flags  = pg->flag;
1349                 }
1350                 pg_prev = pg;
1351         }
1352
        /* exactly niocount niobufs must have been filled in */
1353         LASSERTF((void *)(niobuf - niocount) ==
1354                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1355                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1356                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1357
1358         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1359         if (resend) {
1360                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1361                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1362                         body->oa.o_flags = 0;
1363                 }
1364                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1365         }
1366
        /* piggy-back a grant shrink on this RPC when one is due */
1367         if (osc_should_shrink_grant(cli))
1368                 osc_shrink_grant_local(cli, &body->oa);
1369
1370         /* size[REQ_REC_OFF] still sizeof (*body) */
1371         if (opc == OST_WRITE) {
1372                 if (cli->cl_checksum &&
1373                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1374                         /* store cl_cksum_type in a local variable since
1375                          * it can be changed via lprocfs */
1376                         cksum_type_t cksum_type = cli->cl_cksum_type;
1377
1378                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1379                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1380                                 body->oa.o_flags = 0;
1381                         }
1382                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1383                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1384                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1385                                                              page_count, pga,
1386                                                              OST_WRITE,
1387                                                              cksum_type);
1388                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1389                                body->oa.o_cksum);
1390                         /* save this in 'oa', too, for later checking */
1391                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1392                         oa->o_flags |= cksum_type_pack(cksum_type);
1393                 } else {
1394                         /* clear out the checksum flag, in case this is a
1395                          * resend but cl_checksum is no longer set. b=11238 */
1396                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1397                 }
1398                 oa->o_cksum = body->oa.o_cksum;
1399                 /* 1 RC per niobuf */
1400                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1401                                      sizeof(__u32) * niocount);
1402         } else {
                /* reads only announce the checksum type here; no checksum
                 * is computed in this function for OST_READ */
1403                 if (cli->cl_checksum &&
1404                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1405                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1406                                 body->oa.o_flags = 0;
1407                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1408                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1409                 }
1410         }
1411         ptlrpc_request_set_replen(req);
1412
        /* stash what the reply handling needs in the request's async args */
1413         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1414         aa = ptlrpc_req_async_args(req);
1415         aa->aa_oa = oa;
1416         aa->aa_requested_nob = requested_nob;
1417         aa->aa_nio_count = niocount;
1418         aa->aa_page_count = page_count;
1419         aa->aa_resends = 0;
1420         aa->aa_ppga = pga;
1421         aa->aa_cli = cli;
1422         INIT_LIST_HEAD(&aa->aa_oaps);
1423         if (ocapa && reserve)
1424                 aa->aa_ocapa = capa_get(ocapa);
1425
1426         *reqp = req;
1427         RETURN(0);
1428
        /* error exit once the request has been packed */
1429  out:
1430         ptlrpc_req_finished(req);
1431         RETURN(rc);
1432 }
1433
1434 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1435                                 __u32 client_cksum, __u32 server_cksum, int nob,
1436                                 obd_count page_count, struct brw_page **pga,
1437                                 cksum_type_t client_cksum_type)
1438 {
1439         __u32 new_cksum;
1440         char *msg;
1441         cksum_type_t cksum_type;
1442
1443         if (server_cksum == client_cksum) {
1444                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1445                 return 0;
1446         }
1447
1448         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1449                                        oa->o_flags : 0);
1450         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1451                                       cksum_type);
1452
1453         if (cksum_type != client_cksum_type)
1454                 msg = "the server did not use the checksum type specified in "
1455                       "the original request - likely a protocol problem";
1456         else if (new_cksum == server_cksum)
1457                 msg = "changed on the client after we checksummed it - "
1458                       "likely false positive due to mmap IO (bug 11742)";
1459         else if (new_cksum == client_cksum)
1460                 msg = "changed in transit before arrival at OST";
1461         else
1462                 msg = "changed in transit AND doesn't match the original - "
1463                       "likely false positive due to mmap IO (bug 11742)";
1464
1465         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1466                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1467                            msg, libcfs_nid2str(peer->nid),
1468                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1469                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1470                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1471                            POSTID(&oa->o_oi), pga[0]->off,
1472                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1473         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1474                "client csum now %x\n", client_cksum, client_cksum_type,
1475                server_cksum, cksum_type, new_cksum);
1476         return 1;
1477 }
1478
1479 /* Note rc enters this function as number of bytes transferred */
1480 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1481 {
1482         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1483         const lnet_process_id_t *peer =
1484                         &req->rq_import->imp_connection->c_peer;
1485         struct client_obd *cli = aa->aa_cli;
1486         struct ost_body *body;
1487         __u32 client_cksum = 0;
1488         ENTRY;
1489
1490         if (rc < 0 && rc != -EDQUOT) {
1491                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1492                 RETURN(rc);
1493         }
1494
1495         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1496         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1497         if (body == NULL) {
1498                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1499                 RETURN(-EPROTO);
1500         }
1501
1502         /* set/clear over quota flag for a uid/gid */
1503         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1504             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1505                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1506
1507                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1508                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1509                        body->oa.o_flags);
1510                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1511         }
1512
1513         osc_update_grant(cli, body);
1514
1515         if (rc < 0)
1516                 RETURN(rc);
1517
1518         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1519                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1520
1521         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1522                 if (rc > 0) {
1523                         CERROR("Unexpected +ve rc %d\n", rc);
1524                         RETURN(-EPROTO);
1525                 }
1526                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1527
1528                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1529                         RETURN(-EAGAIN);
1530
1531                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1532                     check_write_checksum(&body->oa, peer, client_cksum,
1533                                          body->oa.o_cksum, aa->aa_requested_nob,
1534                                          aa->aa_page_count, aa->aa_ppga,
1535                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1536                         RETURN(-EAGAIN);
1537
1538                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1539                                      aa->aa_page_count, aa->aa_ppga);
1540                 GOTO(out, rc);
1541         }
1542
1543         /* The rest of this function executes only for OST_READs */
1544
1545         /* if unwrap_bulk failed, return -EAGAIN to retry */
1546         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1547         if (rc < 0)
1548                 GOTO(out, rc = -EAGAIN);
1549
1550         if (rc > aa->aa_requested_nob) {
1551                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1552                        aa->aa_requested_nob);
1553                 RETURN(-EPROTO);
1554         }
1555
1556         if (rc != req->rq_bulk->bd_nob_transferred) {
1557                 CERROR ("Unexpected rc %d (%d transferred)\n",
1558                         rc, req->rq_bulk->bd_nob_transferred);
1559                 return (-EPROTO);
1560         }
1561
1562         if (rc < aa->aa_requested_nob)
1563                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1564
1565         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1566                 static int cksum_counter;
1567                 __u32      server_cksum = body->oa.o_cksum;
1568                 char      *via;
1569                 char      *router;
1570                 cksum_type_t cksum_type;
1571
1572                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1573                                                body->oa.o_flags : 0);
1574                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1575                                                  aa->aa_ppga, OST_READ,
1576                                                  cksum_type);
1577
1578                 if (peer->nid == req->rq_bulk->bd_sender) {
1579                         via = router = "";
1580                 } else {
1581                         via = " via ";
1582                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1583                 }
1584
1585                 if (server_cksum != client_cksum) {
1586                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1587                                            "%s%s%s inode "DFID" object "DOSTID
1588                                            " extent ["LPU64"-"LPU64"]\n",
1589                                            req->rq_import->imp_obd->obd_name,
1590                                            libcfs_nid2str(peer->nid),
1591                                            via, router,
1592                                            body->oa.o_valid & OBD_MD_FLFID ?
1593                                                 body->oa.o_parent_seq : (__u64)0,
1594                                            body->oa.o_valid & OBD_MD_FLFID ?
1595                                                 body->oa.o_parent_oid : 0,
1596                                            body->oa.o_valid & OBD_MD_FLFID ?
1597                                                 body->oa.o_parent_ver : 0,
1598                                            POSTID(&body->oa.o_oi),
1599                                            aa->aa_ppga[0]->off,
1600                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1601                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1602                                                                         1);
1603                         CERROR("client %x, server %x, cksum_type %x\n",
1604                                client_cksum, server_cksum, cksum_type);
1605                         cksum_counter = 0;
1606                         aa->aa_oa->o_cksum = client_cksum;
1607                         rc = -EAGAIN;
1608                 } else {
1609                         cksum_counter++;
1610                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1611                         rc = 0;
1612                 }
1613         } else if (unlikely(client_cksum)) {
1614                 static int cksum_missed;
1615
1616                 cksum_missed++;
1617                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1618                         CERROR("Checksum %u requested from %s but not sent\n",
1619                                cksum_missed, libcfs_nid2str(peer->nid));
1620         } else {
1621                 rc = 0;
1622         }
1623 out:
1624         if (rc >= 0)
1625                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1626                                      aa->aa_oa, &body->oa);
1627
1628         RETURN(rc);
1629 }
1630
1631 static int osc_brw_redo_request(struct ptlrpc_request *request,
1632                                 struct osc_brw_async_args *aa, int rc)
1633 {
1634         struct ptlrpc_request *new_req;
1635         struct osc_brw_async_args *new_aa;
1636         struct osc_async_page *oap;
1637         ENTRY;
1638
1639         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1640                   "redo for recoverable error %d", rc);
1641
1642         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1643                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1644                                   aa->aa_cli, aa->aa_oa,
1645                                   NULL /* lsm unused by osc currently */,
1646                                   aa->aa_page_count, aa->aa_ppga,
1647                                   &new_req, aa->aa_ocapa, 0, 1);
1648         if (rc)
1649                 RETURN(rc);
1650
1651         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1652                 if (oap->oap_request != NULL) {
1653                         LASSERTF(request == oap->oap_request,
1654                                  "request %p != oap_request %p\n",
1655                                  request, oap->oap_request);
1656                         if (oap->oap_interrupted) {
1657                                 ptlrpc_req_finished(new_req);
1658                                 RETURN(-EINTR);
1659                         }
1660                 }
1661         }
1662         /* New request takes over pga and oaps from old request.
1663          * Note that copying a list_head doesn't work, need to move it... */
1664         aa->aa_resends++;
1665         new_req->rq_interpret_reply = request->rq_interpret_reply;
1666         new_req->rq_async_args = request->rq_async_args;
1667         new_req->rq_commit_cb = request->rq_commit_cb;
1668         /* cap resend delay to the current request timeout, this is similar to
1669          * what ptlrpc does (see after_reply()) */
1670         if (aa->aa_resends > new_req->rq_timeout)
1671                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1672         else
1673                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1674         new_req->rq_generation_set = 1;
1675         new_req->rq_import_generation = request->rq_import_generation;
1676
1677         new_aa = ptlrpc_req_async_args(new_req);
1678
1679         INIT_LIST_HEAD(&new_aa->aa_oaps);
1680         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1681         INIT_LIST_HEAD(&new_aa->aa_exts);
1682         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1683         new_aa->aa_resends = aa->aa_resends;
1684
1685         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1686                 if (oap->oap_request) {
1687                         ptlrpc_req_finished(oap->oap_request);
1688                         oap->oap_request = ptlrpc_request_addref(new_req);
1689                 }
1690         }
1691
1692         new_aa->aa_ocapa = aa->aa_ocapa;
1693         aa->aa_ocapa = NULL;
1694
1695         /* XXX: This code will run into problem if we're going to support
1696          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1697          * and wait for all of them to be finished. We should inherit request
1698          * set from old request. */
1699         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1700
1701         DEBUG_REQ(D_INFO, new_req, "new request");
1702         RETURN(0);
1703 }
1704
1705 /*
1706  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1707  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1708  * fine for our small page arrays and doesn't require allocation.  its an
1709  * insertion sort that swaps elements that are strides apart, shrinking the
1710  * stride down until its '1' and the array is sorted.
1711  */
1712 static void sort_brw_pages(struct brw_page **array, int num)
1713 {
1714         int stride, i, j;
1715         struct brw_page *tmp;
1716
1717         if (num == 1)
1718                 return;
1719         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1720                 ;
1721
1722         do {
1723                 stride /= 3;
1724                 for (i = stride ; i < num ; i++) {
1725                         tmp = array[i];
1726                         j = i;
1727                         while (j >= stride && array[j - stride]->off > tmp->off) {
1728                                 array[j] = array[j - stride];
1729                                 j -= stride;
1730                         }
1731                         array[j] = tmp;
1732                 }
1733         } while (stride > 1);
1734 }
1735
1736 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1737 {
1738         LASSERT(ppga != NULL);
1739         OBD_FREE(ppga, sizeof(*ppga) * count);
1740 }
1741
/*
 * Reply interpreter for bulk read/write RPCs; osc_build_rpc() installs it
 * as rq_interpret_reply.
 *
 * The reply is first processed by osc_brw_fini_request().  If the result is
 * a recoverable error the request is resent through osc_brw_redo_request(),
 * unless the import generation changed or the resend limit was reached.
 * Otherwise the RPC is completed: object attributes are updated on success,
 * every extent attached to the request is finished, the obdo and the page
 * array are freed, the in-flight counter is dropped and more I/O is
 * unplugged.
 *
 * \param data  async args of \a req (struct osc_brw_async_args)
 * \param rc    status handed on to osc_brw_fini_request()
 *
 * Returns 0 when the request completed successfully or was requeued, and a
 * negative errno otherwise.  -EAGAIN and -EINPROGRESS that remain after the
 * recoverable-error handling are reported as -EIO.
 */
1742 static int brw_interpret(const struct lu_env *env,
1743                          struct ptlrpc_request *req, void *data, int rc)
1744 {
1745         struct osc_brw_async_args *aa = data;
1746         struct osc_extent *ext;
1747         struct osc_extent *tmp;
1748         struct client_obd *cli = aa->aa_cli;
1749         ENTRY;
1750
1751         rc = osc_brw_fini_request(req, rc);
1752         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1753         /* When server return -EINPROGRESS, client should always retry
1754          * regardless of the number of times the bulk was resent already. */
1755         if (osc_recoverable_error(rc)) {
1756                 if (req->rq_import_generation !=
1757                     req->rq_import->imp_generation) {
1758                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1759                                ""DOSTID", rc = %d.\n",
1760                                req->rq_import->imp_obd->obd_name,
1761                                POSTID(&aa->aa_oa->o_oi), rc);
1762                 } else if (rc == -EINPROGRESS ||
1763                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1764                         rc = osc_brw_redo_request(req, aa, rc);
1765                 } else {
1766                         CERROR("%s: too many resent retries for object: "
1767                                ""LPU64":"LPU64", rc = %d.\n",
1768                                req->rq_import->imp_obd->obd_name,
1769                                POSTID(&aa->aa_oa->o_oi), rc);
1770                 }
1771
                /* rc can only be 0 here if osc_brw_redo_request() queued a
                 * replacement request; that request has taken over the page
                 * and extent lists, so there is nothing to complete. */
1772                 if (rc == 0)
1773                         RETURN(0);
1774                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1775                         rc = -EIO;
1776         }
1777
1778         if (aa->aa_ocapa) {
1779                 capa_put(aa->aa_ocapa);
1780                 aa->aa_ocapa = NULL;
1781         }
1782
1783         if (rc == 0) {
1784                 struct obdo *oa = aa->aa_oa;
1785                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1786                 unsigned long valid = 0;
1787                 struct cl_object *obj;
1788                 struct osc_async_page *last;
1789
                /* The object is looked up through the last page of the RPC;
                 * that page also provides the end offset used for the
                 * size/KMS update below. */
1790                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1791                 obj = osc2cl(last->oap_obj);
1792
                /* Copy only the attributes the obdo marks as valid. */
1793                 cl_object_attr_lock(obj);
1794                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1795                         attr->cat_blocks = oa->o_blocks;
1796                         valid |= CAT_BLOCKS;
1797                 }
1798                 if (oa->o_valid & OBD_MD_FLMTIME) {
1799                         attr->cat_mtime = oa->o_mtime;
1800                         valid |= CAT_MTIME;
1801                 }
1802                 if (oa->o_valid & OBD_MD_FLATIME) {
1803                         attr->cat_atime = oa->o_atime;
1804                         valid |= CAT_ATIME;
1805                 }
1806                 if (oa->o_valid & OBD_MD_FLCTIME) {
1807                         attr->cat_ctime = oa->o_ctime;
1808                         valid |= CAT_CTIME;
1809                 }
1810
1811                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1812                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1813                         loff_t last_off = last->oap_count + last->oap_obj_off;
1814
1815                         /* Change file size if this is an out of quota or
1816                          * direct IO write and it extends the file size */
1817                         if (loi->loi_lvb.lvb_size < last_off) {
1818                                 attr->cat_size = last_off;
1819                                 valid |= CAT_SIZE;
1820                         }
1821                         /* Extend KMS if it's not a lockless write */
1822                         if (loi->loi_kms < last_off &&
1823                             oap2osc_page(last)->ops_srvlock == 0) {
1824                                 attr->cat_kms = last_off;
1825                                 valid |= CAT_KMS;
1826                         }
1827                 }
1828
1829                 if (valid != 0)
1830                         cl_object_attr_set(env, obj, attr, valid);
1831                 cl_object_attr_unlock(obj);
1832         }
1833         OBDO_FREE(aa->aa_oa);
1834
        /* Successful writes are accounted as unstable pages.
         * NOTE(review): presumably brw_commit() undoes this once the
         * request is committed -- confirm in osc_inc_unstable_pages(). */
1835         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1836                 osc_inc_unstable_pages(req);
1837
        /* Finish every extent carried by this request with the final rc. */
1838         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1839                 list_del_init(&ext->oe_link);
1840                 osc_extent_finish(env, ext, 1, rc);
1841         }
1842         LASSERT(list_empty(&aa->aa_exts));
        /* NOTE(review): nothing in this function unlinks the oaps, so this
         * assertion relies on osc_extent_finish() having done it -- confirm. */
1843         LASSERT(list_empty(&aa->aa_oaps));
1844
        /* Report the error, or the number of bytes transferred on success. */
1845         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1846                           req->rq_bulk->bd_nob_transferred);
1847         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1848         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1849
1850         client_obd_list_lock(&cli->cl_loi_list_lock);
1851         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1852          * is called so we know whether to go to sync BRWs or wait for more
1853          * RPCs to complete */
1854         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1855                 cli->cl_w_in_flight--;
1856         else
1857                 cli->cl_r_in_flight--;
1858         osc_wake_cache_waiters(cli);
1859         client_obd_list_unlock(&cli->cl_loi_list_lock);
1860
1861         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1862         RETURN(rc);
1863 }
1864
1865 static void brw_commit(struct ptlrpc_request *req)
1866 {
1867         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1868          * this called via the rq_commit_cb, I need to ensure
1869          * osc_dec_unstable_pages is still called. Otherwise unstable
1870          * pages may be leaked. */
1871         spin_lock(&req->rq_lock);
1872         if (likely(req->rq_unstable)) {
1873                 req->rq_unstable = 0;
1874                 spin_unlock(&req->rq_lock);
1875
1876                 osc_dec_unstable_pages(req);
1877         } else {
1878                 req->rq_committed = 1;
1879                 spin_unlock(&req->rq_lock);
1880         }
1881 }
1882
1883 /**
1884  * Build an RPC by the list of extent @ext_list. The caller must ensure
1885  * that the total pages in this list are NOT over max pages per RPC.
1886  * Extents in the list must be in OES_RPC state.
 *
 * @cmd selects the direction: it is tested with "& OBD_BRW_WRITE" to pick
 * the cl_req type and compared with OBD_BRW_READ to pick the in-flight
 * counter and histograms.  @pol is passed on to ptlrpcd_add_req().
 *
 * On success the extents are moved from @ext_list onto the request's async
 * args, the request is queued on ptlrpcd and 0 is returned.  On failure a
 * negative errno is returned after every extent still on @ext_list has been
 * finished with that error and the obdo, page array and cl_req allocated
 * here have been released.
1887  */
1888 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1889                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1890 {
1891         struct ptlrpc_request           *req = NULL;
1892         struct osc_extent               *ext;
1893         struct brw_page                 **pga = NULL;
1894         struct osc_brw_async_args       *aa = NULL;
1895         struct obdo                     *oa = NULL;
1896         struct osc_async_page           *oap;
1897         struct osc_async_page           *tmp;
1898         struct cl_req                   *clerq = NULL;
1899         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1900                                                                       CRT_READ;
1901         struct cl_req_attr              *crattr = NULL;
1902         obd_off                         starting_offset = OBD_OBJECT_EOF;
1903         obd_off                         ending_offset = 0;
1904         int                             mpflag = 0;
1905         int                             mem_tight = 0;
1906         int                             page_count = 0;
1907         bool                            soft_sync = false;
1908         int                             i;
1909         int                             rc;
1910         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1911
1912         ENTRY;
1913         LASSERT(!list_empty(ext_list));
1914
1915         /* add pages into rpc_list to build BRW rpc */
        /* While collecting, track the lowest start and highest end offset.
         * A page that does not lower the running start must begin at offset
         * 0 within its page, and a page that does not raise the running end
         * must reach PAGE_CACHE_SIZE; both are asserted below. */
1916         list_for_each_entry(ext, ext_list, oe_link) {
1917                 LASSERT(ext->oe_state == OES_RPC);
1918                 mem_tight |= ext->oe_memalloc;
1919                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1920                         ++page_count;
1921                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1922                         if (starting_offset > oap->oap_obj_off)
1923                                 starting_offset = oap->oap_obj_off;
1924                         else
1925                                 LASSERT(oap->oap_page_off == 0);
1926                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1927                                 ending_offset = oap->oap_obj_off +
1928                                                 oap->oap_count;
1929                         else
1930                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1931                                         PAGE_CACHE_SIZE);
1932                 }
1933         }
1934
1935         soft_sync = osc_over_unstable_soft_limit(cli);
        /* If any extent was flagged oe_memalloc, switch on the memory
         * pressure state for the build; it is restored at "out". */
1936         if (mem_tight)
1937                 mpflag = cfs_memory_pressure_get_and_set();
1938
1939         OBD_ALLOC(crattr, sizeof(*crattr));
1940         if (crattr == NULL)
1941                 GOTO(out, rc = -ENOMEM);
1942
1943         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1944         if (pga == NULL)
1945                 GOTO(out, rc = -ENOMEM);
1946
1947         OBDO_ALLOC(oa);
1948         if (oa == NULL)
1949                 GOTO(out, rc = -ENOMEM);
1950
        /* Fill the page array, allocating the cl_req lazily from the first
         * page, and tag each page with the MEMALLOC/SOFT_SYNC flags. */
1951         i = 0;
1952         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1953                 struct cl_page *page = oap2cl_page(oap);
1954                 if (clerq == NULL) {
1955                         clerq = cl_req_alloc(env, page, crt,
1956                                              1 /* only 1-object rpcs for now */);
1957                         if (IS_ERR(clerq))
1958                                 GOTO(out, rc = PTR_ERR(clerq));
1959                 }
1960                 if (mem_tight)
1961                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1962                 if (soft_sync)
1963                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1964                 pga[i] = &oap->oap_brw_page;
1965                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1966                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1967                        pga[i]->pg, page_index(oap->oap_page), oap,
1968                        pga[i]->flag);
1969                 i++;
1970                 cl_req_page_add(env, clerq, page);
1971         }
1972
1973         /* always get the data for the obdo for the rpc */
1974         LASSERT(clerq != NULL);
1975         crattr->cra_oa = oa;
1976         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1977
1978         rc = cl_req_prep(env, clerq);
1979         if (rc != 0) {
1980                 CERROR("cl_req_prep failed: %d\n", rc);
1981                 GOTO(out, rc);
1982         }
1983
        /* Pages go on the wire in ascending offset order. */
1984         sort_brw_pages(pga, page_count);
1985         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1986                         pga, &req, crattr->cra_capa, 1, 0);
1987         if (rc != 0) {
1988                 CERROR("prep_req failed: %d\n", rc);
1989                 GOTO(out, rc);
1990         }
1991
1992         req->rq_commit_cb = brw_commit;
1993         req->rq_interpret_reply = brw_interpret;
1994
1995         if (mem_tight != 0)
1996                 req->rq_memalloc = 1;
1997
1998         /* Need to update the timestamps after the request is built in case
1999          * we race with setattr (locally or in queue at OST).  If OST gets
2000          * later setattr before earlier BRW (as determined by the request xid),
2001          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2002          * way to do this in a single call.  bug 10150 */
2003         cl_req_attr_set(env, clerq, crattr,
2004                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2005
2006         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2007
        /* Hand the page list, the extent list and the cl_req over to the
         * request's async args; rpc_list and ext_list are left empty. */
2008         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2009         aa = ptlrpc_req_async_args(req);
2010         INIT_LIST_HEAD(&aa->aa_oaps);
2011         list_splice_init(&rpc_list, &aa->aa_oaps);
2012         INIT_LIST_HEAD(&aa->aa_exts);
2013         list_splice_init(ext_list, &aa->aa_exts);
2014         aa->aa_clerq = clerq;
2015
2016         /* queued sync pages can be torn down while the pages
2017          * were between the pending list and the rpc */
2018         tmp = NULL;
2019         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2020                 /* only one oap gets a request reference */
2021                 if (tmp == NULL)
2022                         tmp = oap;
2023                 if (oap->oap_interrupted && !req->rq_intr) {
2024                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2025                                         oap, req);
2026                         ptlrpc_mark_interrupted(req);
2027                 }
2028         }
2029         if (tmp != NULL)
2030                 tmp->oap_request = ptlrpc_request_addref(req);
2031
        /* Account the RPC as in flight and update the per-client page,
         * rpc and offset histograms under cl_loi_list_lock. */
2032         client_obd_list_lock(&cli->cl_loi_list_lock);
2033         starting_offset >>= PAGE_CACHE_SHIFT;
2034         if (cmd == OBD_BRW_READ) {
2035                 cli->cl_r_in_flight++;
2036                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2037                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2038                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2039                                       starting_offset + 1);
2040         } else {
2041                 cli->cl_w_in_flight++;
2042                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2043                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2044                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2045                                       starting_offset + 1);
2046         }
2047         client_obd_list_unlock(&cli->cl_loi_list_lock);
2048
2049         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2050                   page_count, aa, cli->cl_r_in_flight,
2051                   cli->cl_w_in_flight);
2052
2053         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2054          * see which CPU/NUMA node the majority of pages were allocated
2055          * on, and try to assign the async RPC to the CPU core
2056          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2057          *
2058          * But on the other hand, we expect that multiple ptlrpcd
2059          * threads and the initial write sponsor can run in parallel,
2060          * especially when data checksum is enabled, which is CPU-bound
2061          * operation and single ptlrpcd thread cannot process in time.
2062          * So more ptlrpcd threads sharing BRW load
2063          * (with PDL_POLICY_ROUND) seems better.
2064          */
2065         ptlrpcd_add_req(req, pol, -1);
2066         rc = 0;
2067         EXIT;
2068
        /* The success path falls through to here with rc == 0: only the
         * memory pressure state and crattr are released in that case.
         * NOTE(review): oa and pga are presumably kept by the request's
         * async args (brw_interpret() frees aa_oa and aa_ppga) -- confirm
         * in osc_brw_prep_request(). */
2069 out:
2070         if (mem_tight != 0)
2071                 cfs_memory_pressure_restore(mpflag);
2072
2073         if (crattr != NULL) {
2074                 capa_put(crattr->cra_capa);
2075                 OBD_FREE(crattr, sizeof(*crattr));
2076         }
2077
2078         if (rc != 0) {
                /* every failing exit above is taken before a request has
                 * been created */
2079                 LASSERT(req == NULL);
2080
2081                 if (oa)
2082                         OBDO_FREE(oa);
2083                 if (pga)
2084                         OBD_FREE(pga, sizeof(*pga) * page_count);
2085                 /* this should happen rarely and is pretty bad, it makes the
2086                  * pending list not follow the dirty order */
2087                 while (!list_empty(ext_list)) {
2088                         ext = list_entry(ext_list->next, struct osc_extent,
2089                                          oe_link);
2090                         list_del_init(&ext->oe_link);
2091                         osc_extent_finish(env, ext, 0, rc);
2092                 }
                /* clerq may be NULL or an ERR_PTR if cl_req_alloc() was
                 * never reached or failed */
2093                 if (clerq && !IS_ERR(clerq))
2094                         cl_req_completion(env, clerq, rc);
2095         }
2096         RETURN(rc);
2097 }
2098
2099 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2100                                         struct ldlm_enqueue_info *einfo)
2101 {
2102         void *data = einfo->ei_cbdata;
2103         int set = 0;
2104
2105         LASSERT(lock != NULL);
2106         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2107         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2108         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2109         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2110
2111         lock_res_and_lock(lock);
2112         spin_lock(&osc_ast_guard);
2113
2114         if (lock->l_ast_data == NULL)
2115                 lock->l_ast_data = data;
2116         if (lock->l_ast_data == data)
2117                 set = 1;
2118
2119         spin_unlock(&osc_ast_guard);
2120         unlock_res_and_lock(lock);
2121
2122         return set;
2123 }
2124
2125 static int osc_set_data_with_check(struct lustre_handle *lockh,
2126                                    struct ldlm_enqueue_info *einfo)
2127 {
2128         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2129         int set = 0;
2130
2131         if (lock != NULL) {
2132                 set = osc_set_lock_data_with_check(lock, einfo);
2133                 LDLM_LOCK_PUT(lock);
2134         } else
2135                 CERROR("lockh %p, data %p - client evicted?\n",
2136                        lockh, einfo->ei_cbdata);
2137         return set;
2138 }
2139
2140 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2141                              ldlm_iterator_t replace, void *data)
2142 {
2143         struct ldlm_res_id res_id;
2144         struct obd_device *obd = class_exp2obd(exp);
2145
2146         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2147         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2148         return 0;
2149 }
2150
2151 /* find any ldlm lock of the inode in osc
2152  * return 0    not find
2153  *        1    find one
2154  *      < 0    error */
2155 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2156                            ldlm_iterator_t replace, void *data)
2157 {
2158         struct ldlm_res_id res_id;
2159         struct obd_device *obd = class_exp2obd(exp);
2160         int rc = 0;
2161
2162         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2163         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2164         if (rc == LDLM_ITER_STOP)
2165                 return(1);
2166         if (rc == LDLM_ITER_CONTINUE)
2167                 return(0);
2168         return(rc);
2169 }
2170
2171 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2172                             obd_enqueue_update_f upcall, void *cookie,
2173                             __u64 *flags, int agl, int rc)
2174 {
2175         int intent = *flags & LDLM_FL_HAS_INTENT;
2176         ENTRY;
2177
2178         if (intent) {
2179                 /* The request was created before ldlm_cli_enqueue call. */
2180                 if (rc == ELDLM_LOCK_ABORTED) {
2181                         struct ldlm_reply *rep;
2182                         rep = req_capsule_server_get(&req->rq_pill,
2183                                                      &RMF_DLM_REP);
2184
2185                         LASSERT(rep != NULL);
2186                         rep->lock_policy_res1 =
2187                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2188                         if (rep->lock_policy_res1)
2189                                 rc = rep->lock_policy_res1;
2190                 }
2191         }
2192
2193         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2194             (rc == 0)) {
2195                 *flags |= LDLM_FL_LVB_READY;
2196                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2197                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2198         }
2199
2200         /* Call the update callback. */
2201         rc = (*upcall)(cookie, rc);
2202         RETURN(rc);
2203 }
2204
2205 static int osc_enqueue_interpret(const struct lu_env *env,
2206                                  struct ptlrpc_request *req,
2207                                  struct osc_enqueue_args *aa, int rc)
2208 {
2209         struct ldlm_lock *lock;
2210         struct lustre_handle handle;
2211         __u32 mode;
2212         struct ost_lvb *lvb;
2213         __u32 lvb_len;
2214         __u64 *flags = aa->oa_flags;
2215
2216         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2217          * might be freed anytime after lock upcall has been called. */
2218         lustre_handle_copy(&handle, aa->oa_lockh);
2219         mode = aa->oa_ei->ei_mode;
2220
2221         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2222          * be valid. */
2223         lock = ldlm_handle2lock(&handle);
2224
2225         /* Take an additional reference so that a blocking AST that
2226          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2227          * to arrive after an upcall has been executed by
2228          * osc_enqueue_fini(). */
2229         ldlm_lock_addref(&handle, mode);
2230
2231         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2232         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2233
2234         /* Let CP AST to grant the lock first. */
2235         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2236
2237         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2238                 lvb = NULL;
2239                 lvb_len = 0;
2240         } else {
2241                 lvb = aa->oa_lvb;
2242                 lvb_len = sizeof(*aa->oa_lvb);
2243         }
2244
2245         /* Complete obtaining the lock procedure. */
2246         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2247                                    mode, flags, lvb, lvb_len, &handle, rc);
2248         /* Complete osc stuff. */
2249         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2250                               flags, aa->oa_agl, rc);
2251
2252         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2253
2254         /* Release the lock for async request. */
2255         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2256                 /*
2257                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2258                  * not already released by
2259                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2260                  */
2261                 ldlm_lock_decref(&handle, mode);
2262
2263         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2264                  aa->oa_lockh, req, aa);
2265         ldlm_lock_decref(&handle, mode);
2266         LDLM_LOCK_PUT(lock);
2267         return rc;
2268 }
2269
/* Sentinel request-set pointer (never dereferenced as a real set here):
 * osc_enqueue_base() compares its rqset argument against this value and, on a
 * match, hands the request to ptlrpcd_add_req() instead of adding it to the
 * set with ptlrpc_set_add_req(). */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2271
2272 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2273  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2274  * other synchronous requests, however keeping some locks and trying to obtain
2275  * others may take a considerable amount of time in a case of ost failure; and
2276  * when other sync requests do not get released lock from a client, the client
2277  * is excluded from the cluster -- such scenarious make the life difficult, so
2278  * release locks just after they are obtained. */
2279 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2280                      __u64 *flags, ldlm_policy_data_t *policy,
2281                      struct ost_lvb *lvb, int kms_valid,
2282                      obd_enqueue_update_f upcall, void *cookie,
2283                      struct ldlm_enqueue_info *einfo,
2284                      struct lustre_handle *lockh,
2285                      struct ptlrpc_request_set *rqset, int async, int agl)
2286 {
2287         struct obd_device *obd = exp->exp_obd;
2288         struct ptlrpc_request *req = NULL;
2289         int intent = *flags & LDLM_FL_HAS_INTENT;
2290         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2291         ldlm_mode_t mode;
2292         int rc;
2293         ENTRY;
2294
2295         /* Filesystem lock extents are extended to page boundaries so that
2296          * dealing with the page cache is a little smoother.  */
2297         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2298         policy->l_extent.end |= ~CFS_PAGE_MASK;
2299
2300         /*
2301          * kms is not valid when either object is completely fresh (so that no
2302          * locks are cached), or object was evicted. In the latter case cached
2303          * lock cannot be used, because it would prime inode state with
2304          * potentially stale LVB.
2305          */
2306         if (!kms_valid)
2307                 goto no_match;
2308
2309         /* Next, search for already existing extent locks that will cover us */
2310         /* If we're trying to read, we also search for an existing PW lock.  The
2311          * VFS and page cache already protect us locally, so lots of readers/
2312          * writers can share a single PW lock.
2313          *
2314          * There are problems with conversion deadlocks, so instead of
2315          * converting a read lock to a write lock, we'll just enqueue a new
2316          * one.
2317          *
2318          * At some point we should cancel the read lock instead of making them
2319          * send us a blocking callback, but there are problems with canceling
2320          * locks out from other users right now, too. */
2321         mode = einfo->ei_mode;
2322         if (einfo->ei_mode == LCK_PR)
2323                 mode |= LCK_PW;
2324         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2325                                einfo->ei_type, policy, mode, lockh, 0);
2326         if (mode) {
2327                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2328
2329                 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2330                         /* For AGL, if enqueue RPC is sent but the lock is not
2331                          * granted, then skip to process this strpe.
2332                          * Return -ECANCELED to tell the caller. */
2333                         ldlm_lock_decref(lockh, mode);
2334                         LDLM_LOCK_PUT(matched);
2335                         RETURN(-ECANCELED);
2336                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2337                         *flags |= LDLM_FL_LVB_READY;
2338                         /* addref the lock only if not async requests and PW
2339                          * lock is matched whereas we asked for PR. */
2340                         if (!rqset && einfo->ei_mode != mode)
2341                                 ldlm_lock_addref(lockh, LCK_PR);
2342                         if (intent) {
2343                                 /* I would like to be able to ASSERT here that
2344                                  * rss <= kms, but I can't, for reasons which
2345                                  * are explained in lov_enqueue() */
2346                         }
2347
2348                         /* We already have a lock, and it's referenced.
2349                          *
2350                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2351                          * AGL upcall may change it to CLS_HELD directly. */
2352                         (*upcall)(cookie, ELDLM_OK);
2353
2354                         if (einfo->ei_mode != mode)
2355                                 ldlm_lock_decref(lockh, LCK_PW);
2356                         else if (rqset)
2357                                 /* For async requests, decref the lock. */
2358                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2359                         LDLM_LOCK_PUT(matched);
2360                         RETURN(ELDLM_OK);
2361                 } else {
2362                         ldlm_lock_decref(lockh, mode);
2363                         LDLM_LOCK_PUT(matched);
2364                 }
2365         }
2366
2367  no_match:
2368         if (intent) {
2369                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2370                                            &RQF_LDLM_ENQUEUE_LVB);
2371                 if (req == NULL)
2372                         RETURN(-ENOMEM);
2373
2374                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2375                 if (rc < 0) {
2376                         ptlrpc_request_free(req);
2377                         RETURN(rc);
2378                 }
2379
2380                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2381                                      sizeof *lvb);
2382                 ptlrpc_request_set_replen(req);
2383         }
2384
2385         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2386         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2387
2388         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2389                               sizeof(*lvb), LVB_T_OST, lockh, async);
2390         if (rqset) {
2391                 if (!rc) {
2392                         struct osc_enqueue_args *aa;
2393                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2394                         aa = ptlrpc_req_async_args(req);
2395                         aa->oa_ei = einfo;
2396                         aa->oa_exp = exp;
2397                         aa->oa_flags  = flags;
2398                         aa->oa_upcall = upcall;
2399                         aa->oa_cookie = cookie;
2400                         aa->oa_lvb    = lvb;
2401                         aa->oa_lockh  = lockh;
2402                         aa->oa_agl    = !!agl;
2403
2404                         req->rq_interpret_reply =
2405                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2406                         if (rqset == PTLRPCD_SET)
2407                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2408                         else
2409                                 ptlrpc_set_add_req(rqset, req);
2410                 } else if (intent) {
2411                         ptlrpc_req_finished(req);
2412                 }
2413                 RETURN(rc);
2414         }
2415
2416         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2417         if (intent)
2418                 ptlrpc_req_finished(req);
2419
2420         RETURN(rc);
2421 }
2422
2423 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2424                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2425                    __u64 *flags, void *data, struct lustre_handle *lockh,
2426                    int unref)
2427 {
2428         struct obd_device *obd = exp->exp_obd;
2429         __u64 lflags = *flags;
2430         ldlm_mode_t rc;
2431         ENTRY;
2432
2433         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2434                 RETURN(-EIO);
2435
2436         /* Filesystem lock extents are extended to page boundaries so that
2437          * dealing with the page cache is a little smoother */
2438         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2439         policy->l_extent.end |= ~CFS_PAGE_MASK;
2440
2441         /* Next, search for already existing extent locks that will cover us */
2442         /* If we're trying to read, we also search for an existing PW lock.  The
2443          * VFS and page cache already protect us locally, so lots of readers/
2444          * writers can share a single PW lock. */
2445         rc = mode;
2446         if (mode == LCK_PR)
2447                 rc |= LCK_PW;
2448         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2449                              res_id, type, policy, rc, lockh, unref);
2450         if (rc) {
2451                 if (data != NULL) {
2452                         if (!osc_set_data_with_check(lockh, data)) {
2453                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2454                                         ldlm_lock_decref(lockh, rc);
2455                                 RETURN(0);
2456                         }
2457                 }
2458                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2459                         ldlm_lock_addref(lockh, LCK_PR);
2460                         ldlm_lock_decref(lockh, LCK_PW);
2461                 }
2462                 RETURN(rc);
2463         }
2464         RETURN(rc);
2465 }
2466
2467 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2468 {
2469         ENTRY;
2470
2471         if (unlikely(mode == LCK_GROUP))
2472                 ldlm_lock_decref_and_cancel(lockh, mode);
2473         else
2474                 ldlm_lock_decref(lockh, mode);
2475
2476         RETURN(0);
2477 }
2478
2479 static int osc_statfs_interpret(const struct lu_env *env,
2480                                 struct ptlrpc_request *req,
2481                                 struct osc_async_args *aa, int rc)
2482 {
2483         struct obd_statfs *msfs;
2484         ENTRY;
2485
2486         if (rc == -EBADR)
2487                 /* The request has in fact never been sent
2488                  * due to issues at a higher level (LOV).
2489                  * Exit immediately since the caller is
2490                  * aware of the problem and takes care
2491                  * of the clean up */
2492                  RETURN(rc);
2493
2494         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2495             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2496                 GOTO(out, rc = 0);
2497
2498         if (rc != 0)
2499                 GOTO(out, rc);
2500
2501         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2502         if (msfs == NULL) {
2503                 GOTO(out, rc = -EPROTO);
2504         }
2505
2506         *aa->aa_oi->oi_osfs = *msfs;
2507 out:
2508         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2509         RETURN(rc);
2510 }
2511
2512 static int osc_statfs_async(struct obd_export *exp,
2513                             struct obd_info *oinfo, __u64 max_age,
2514                             struct ptlrpc_request_set *rqset)
2515 {
2516         struct obd_device     *obd = class_exp2obd(exp);
2517         struct ptlrpc_request *req;
2518         struct osc_async_args *aa;
2519         int                    rc;
2520         ENTRY;
2521
2522         /* We could possibly pass max_age in the request (as an absolute
2523          * timestamp or a "seconds.usec ago") so the target can avoid doing
2524          * extra calls into the filesystem if that isn't necessary (e.g.
2525          * during mount that would help a bit).  Having relative timestamps
2526          * is not so great if request processing is slow, while absolute
2527          * timestamps are not ideal because they need time synchronization. */
2528         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2529         if (req == NULL)
2530                 RETURN(-ENOMEM);
2531
2532         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2533         if (rc) {
2534                 ptlrpc_request_free(req);
2535                 RETURN(rc);
2536         }
2537         ptlrpc_request_set_replen(req);
2538         req->rq_request_portal = OST_CREATE_PORTAL;
2539         ptlrpc_at_set_req_timeout(req);
2540
2541         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2542                 /* procfs requests not want stat in wait for avoid deadlock */
2543                 req->rq_no_resend = 1;
2544                 req->rq_no_delay = 1;
2545         }
2546
2547         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2548         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2549         aa = ptlrpc_req_async_args(req);
2550         aa->aa_oi = oinfo;
2551
2552         ptlrpc_set_add_req(rqset, req);
2553         RETURN(0);
2554 }
2555
2556 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2557                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2558 {
2559         struct obd_device     *obd = class_exp2obd(exp);
2560         struct obd_statfs     *msfs;
2561         struct ptlrpc_request *req;
2562         struct obd_import     *imp = NULL;
2563         int rc;
2564         ENTRY;
2565
2566         /*Since the request might also come from lprocfs, so we need
2567          *sync this with client_disconnect_export Bug15684*/
2568         down_read(&obd->u.cli.cl_sem);
2569         if (obd->u.cli.cl_import)
2570                 imp = class_import_get(obd->u.cli.cl_import);
2571         up_read(&obd->u.cli.cl_sem);
2572         if (!imp)
2573                 RETURN(-ENODEV);
2574
2575         /* We could possibly pass max_age in the request (as an absolute
2576          * timestamp or a "seconds.usec ago") so the target can avoid doing
2577          * extra calls into the filesystem if that isn't necessary (e.g.
2578          * during mount that would help a bit).  Having relative timestamps
2579          * is not so great if request processing is slow, while absolute
2580          * timestamps are not ideal because they need time synchronization. */
2581         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2582
2583         class_import_put(imp);
2584
2585         if (req == NULL)
2586                 RETURN(-ENOMEM);
2587
2588         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2589         if (rc) {
2590                 ptlrpc_request_free(req);
2591                 RETURN(rc);
2592         }
2593         ptlrpc_request_set_replen(req);
2594         req->rq_request_portal = OST_CREATE_PORTAL;
2595         ptlrpc_at_set_req_timeout(req);
2596
2597         if (flags & OBD_STATFS_NODELAY) {
2598                 /* procfs requests not want stat in wait for avoid deadlock */
2599                 req->rq_no_resend = 1;
2600                 req->rq_no_delay = 1;
2601         }
2602
2603         rc = ptlrpc_queue_wait(req);
2604         if (rc)
2605                 GOTO(out, rc);
2606
2607         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2608         if (msfs == NULL) {
2609                 GOTO(out, rc = -EPROTO);
2610         }
2611
2612         *osfs = *msfs;
2613
2614         EXIT;
2615  out:
2616         ptlrpc_req_finished(req);
2617         return rc;
2618 }
2619
2620 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2621                          void *karg, void *uarg)
2622 {
2623         struct obd_device *obd = exp->exp_obd;
2624         struct obd_ioctl_data *data = karg;
2625         int err = 0;
2626         ENTRY;
2627
2628         if (!try_module_get(THIS_MODULE)) {
2629                 CERROR("Can't get module. Is it alive?");
2630                 return -EINVAL;
2631         }
2632         switch (cmd) {
2633         case OBD_IOC_CLIENT_RECOVER:
2634                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2635                                             data->ioc_inlbuf1, 0);
2636                 if (err > 0)
2637                         err = 0;
2638                 GOTO(out, err);
2639         case IOC_OSC_SET_ACTIVE:
2640                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2641                                                data->ioc_offset);
2642                 GOTO(out, err);
2643         case OBD_IOC_POLL_QUOTACHECK:
2644                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2645                 GOTO(out, err);
2646         case OBD_IOC_PING_TARGET:
2647                 err = ptlrpc_obd_ping(obd);
2648                 GOTO(out, err);
2649         default:
2650                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2651                        cmd, current_comm());
2652                 GOTO(out, err = -ENOTTY);
2653         }
2654 out:
2655         module_put(THIS_MODULE);
2656         return err;
2657 }
2658
2659 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2660                         obd_count keylen, void *key, __u32 *vallen, void *val,
2661                         struct lov_stripe_md *lsm)
2662 {
2663         ENTRY;
2664         if (!vallen || !val)
2665                 RETURN(-EFAULT);
2666
2667         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2668                 __u32 *stripe = val;
2669                 *vallen = sizeof(*stripe);
2670                 *stripe = 0;
2671                 RETURN(0);
2672         } else if (KEY_IS(KEY_LAST_ID)) {
2673                 struct ptlrpc_request *req;
2674                 obd_id                *reply;
2675                 char                  *tmp;
2676                 int                    rc;
2677
2678                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2679                                            &RQF_OST_GET_INFO_LAST_ID);
2680                 if (req == NULL)
2681                         RETURN(-ENOMEM);
2682
2683                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2684                                      RCL_CLIENT, keylen);
2685                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2686                 if (rc) {
2687                         ptlrpc_request_free(req);
2688                         RETURN(rc);
2689                 }
2690
2691                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2692                 memcpy(tmp, key, keylen);
2693
2694                 req->rq_no_delay = req->rq_no_resend = 1;
2695                 ptlrpc_request_set_replen(req);
2696                 rc = ptlrpc_queue_wait(req);
2697                 if (rc)
2698                         GOTO(out, rc);
2699
2700                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2701                 if (reply == NULL)
2702                         GOTO(out, rc = -EPROTO);
2703
2704                 *((obd_id *)val) = *reply;
2705         out:
2706                 ptlrpc_req_finished(req);
2707                 RETURN(rc);
2708         } else if (KEY_IS(KEY_FIEMAP)) {
2709                 struct ll_fiemap_info_key *fm_key =
2710                                 (struct ll_fiemap_info_key *)key;
2711                 struct ldlm_res_id       res_id;
2712                 ldlm_policy_data_t       policy;
2713                 struct lustre_handle     lockh;
2714                 ldlm_mode_t              mode = 0;
2715                 struct ptlrpc_request   *req;
2716                 struct ll_user_fiemap   *reply;
2717                 char                    *tmp;
2718                 int                      rc;
2719
2720                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2721                         goto skip_locking;
2722
2723                 policy.l_extent.start = fm_key->fiemap.fm_start &
2724                                                 CFS_PAGE_MASK;
2725
2726                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2727                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2728                         policy.l_extent.end = OBD_OBJECT_EOF;
2729                 else
2730                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2731                                 fm_key->fiemap.fm_length +
2732                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2733
2734                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2735                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2736                                        LDLM_FL_BLOCK_GRANTED |
2737                                        LDLM_FL_LVB_READY,
2738                                        &res_id, LDLM_EXTENT, &policy,
2739                                        LCK_PR | LCK_PW, &lockh, 0);
2740                 if (mode) { /* lock is cached on client */
2741                         if (mode != LCK_PR) {
2742                                 ldlm_lock_addref(&lockh, LCK_PR);
2743                                 ldlm_lock_decref(&lockh, LCK_PW);
2744                         }
2745                 } else { /* no cached lock, needs acquire lock on server side */
2746                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2747                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2748                 }
2749
2750 skip_locking:
2751                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2752                                            &RQF_OST_GET_INFO_FIEMAP);
2753                 if (req == NULL)
2754                         GOTO(drop_lock, rc = -ENOMEM);
2755
2756                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2757                                      RCL_CLIENT, keylen);
2758                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2759                                      RCL_CLIENT, *vallen);
2760                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2761                                      RCL_SERVER, *vallen);
2762
2763                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2764                 if (rc) {
2765                         ptlrpc_request_free(req);
2766                         GOTO(drop_lock, rc);
2767                 }
2768
2769                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2770                 memcpy(tmp, key, keylen);
2771                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2772                 memcpy(tmp, val, *vallen);
2773
2774                 ptlrpc_request_set_replen(req);
2775                 rc = ptlrpc_queue_wait(req);
2776                 if (rc)
2777                         GOTO(fini_req, rc);
2778
2779                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2780                 if (reply == NULL)
2781                         GOTO(fini_req, rc = -EPROTO);
2782
2783                 memcpy(val, reply, *vallen);
2784 fini_req:
2785                 ptlrpc_req_finished(req);
2786 drop_lock:
2787                 if (mode)
2788                         ldlm_lock_decref(&lockh, LCK_PR);
2789                 RETURN(rc);
2790         }
2791
2792         RETURN(-EINVAL);
2793 }
2794
2795 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2796                               obd_count keylen, void *key, obd_count vallen,
2797                               void *val, struct ptlrpc_request_set *set)
2798 {
2799         struct ptlrpc_request *req;
2800         struct obd_device     *obd = exp->exp_obd;
2801         struct obd_import     *imp = class_exp2cliimp(exp);
2802         char                  *tmp;
2803         int                    rc;
2804         ENTRY;
2805
2806         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2807
2808         if (KEY_IS(KEY_CHECKSUM)) {
2809                 if (vallen != sizeof(int))
2810                         RETURN(-EINVAL);
2811                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2812                 RETURN(0);
2813         }
2814
2815         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2816                 sptlrpc_conf_client_adapt(obd);
2817                 RETURN(0);
2818         }
2819
2820         if (KEY_IS(KEY_FLUSH_CTX)) {
2821                 sptlrpc_import_flush_my_ctx(imp);
2822                 RETURN(0);
2823         }
2824
2825         if (KEY_IS(KEY_CACHE_SET)) {
2826                 struct client_obd *cli = &obd->u.cli;
2827
2828                 LASSERT(cli->cl_cache == NULL); /* only once */
2829                 cli->cl_cache = (struct cl_client_cache *)val;
2830                 atomic_inc(&cli->cl_cache->ccc_users);
2831                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2832
2833                 /* add this osc into entity list */
2834                 LASSERT(list_empty(&cli->cl_lru_osc));
2835                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2836                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2837                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2838
2839                 RETURN(0);
2840         }
2841
2842         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2843                 struct client_obd *cli = &obd->u.cli;
2844                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2845                 int target = *(int *)val;
2846
2847                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2848                 *(int *)val -= nr;
2849                 RETURN(0);
2850         }
2851
2852         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2853                 RETURN(-EINVAL);
2854
2855         /* We pass all other commands directly to OST. Since nobody calls osc
2856            methods directly and everybody is supposed to go through LOV, we
2857            assume lov checked invalid values for us.
2858            The only recognised values so far are evict_by_nid and mds_conn.
2859            Even if something bad goes through, we'd get a -EINVAL from OST
2860            anyway. */
2861
2862         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2863                                                 &RQF_OST_SET_GRANT_INFO :
2864                                                 &RQF_OBD_SET_INFO);
2865         if (req == NULL)
2866                 RETURN(-ENOMEM);
2867
2868         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2869                              RCL_CLIENT, keylen);
2870         if (!KEY_IS(KEY_GRANT_SHRINK))
2871                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2872                                      RCL_CLIENT, vallen);
2873         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2874         if (rc) {
2875                 ptlrpc_request_free(req);
2876                 RETURN(rc);
2877         }
2878
2879         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2880         memcpy(tmp, key, keylen);
2881         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2882                                                         &RMF_OST_BODY :
2883                                                         &RMF_SETINFO_VAL);
2884         memcpy(tmp, val, vallen);
2885
2886         if (KEY_IS(KEY_GRANT_SHRINK)) {
2887                 struct osc_grant_args *aa;
2888                 struct obdo *oa;
2889
2890                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2891                 aa = ptlrpc_req_async_args(req);
2892                 OBDO_ALLOC(oa);
2893                 if (!oa) {
2894                         ptlrpc_req_finished(req);
2895                         RETURN(-ENOMEM);
2896                 }
2897                 *oa = ((struct ost_body *)val)->oa;
2898                 aa->aa_oa = oa;
2899                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2900         }
2901
2902         ptlrpc_request_set_replen(req);
2903         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2904                 LASSERT(set != NULL);
2905                 ptlrpc_set_add_req(set, req);
2906                 ptlrpc_check_set(NULL, set);
2907         } else
2908                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2909
2910         RETURN(0);
2911 }
2912
2913 static int osc_reconnect(const struct lu_env *env,
2914                          struct obd_export *exp, struct obd_device *obd,
2915                          struct obd_uuid *cluuid,
2916                          struct obd_connect_data *data,
2917                          void *localdata)
2918 {
2919         struct client_obd *cli = &obd->u.cli;
2920
2921         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2922                 long lost_grant;
2923
2924                 client_obd_list_lock(&cli->cl_loi_list_lock);
2925                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2926                                 2 * cli_brw_size(obd);
2927                 lost_grant = cli->cl_lost_grant;
2928                 cli->cl_lost_grant = 0;
2929                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2930
2931                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2932                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2933                        data->ocd_version, data->ocd_grant, lost_grant);
2934         }
2935
2936         RETURN(0);
2937 }
2938
2939 static int osc_disconnect(struct obd_export *exp)
2940 {
2941         struct obd_device *obd = class_exp2obd(exp);
2942         struct llog_ctxt  *ctxt;
2943         int rc;
2944
2945         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
2946         if (ctxt) {
2947                 if (obd->u.cli.cl_conn_count == 1) {
2948                         /* Flush any remaining cancel messages out to the
2949                          * target */
2950                         llog_sync(ctxt, exp, 0);
2951                 }
2952                 llog_ctxt_put(ctxt);
2953         } else {
2954                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
2955                        obd);
2956         }
2957
2958         rc = client_disconnect_export(exp);
2959         /**
2960          * Initially we put del_shrink_grant before disconnect_export, but it
2961          * causes the following problem if setup (connect) and cleanup
2962          * (disconnect) are tangled together.
2963          *      connect p1                     disconnect p2
2964          *   ptlrpc_connect_import
2965          *     ...............               class_manual_cleanup
2966          *                                     osc_disconnect
2967          *                                     del_shrink_grant
2968          *   ptlrpc_connect_interrupt
2969          *     init_grant_shrink
2970          *   add this client to shrink list
2971          *                                      cleanup_osc
2972          * Bang! pinger trigger the shrink.
2973          * So the osc should be disconnected from the shrink list, after we
2974          * are sure the import has been destroyed. BUG18662
2975          */
2976         if (obd->u.cli.cl_import == NULL)
2977                 osc_del_shrink_grant(&obd->u.cli);
2978         return rc;
2979 }
2980
2981 static int osc_import_event(struct obd_device *obd,
2982                             struct obd_import *imp,
2983                             enum obd_import_event event)
2984 {
2985         struct client_obd *cli;
2986         int rc = 0;
2987
2988         ENTRY;
2989         LASSERT(imp->imp_obd == obd);
2990
2991         switch (event) {
2992         case IMP_EVENT_DISCON: {
2993                 cli = &obd->u.cli;
2994                 client_obd_list_lock(&cli->cl_loi_list_lock);
2995                 cli->cl_avail_grant = 0;
2996                 cli->cl_lost_grant = 0;
2997                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2998                 break;
2999         }
3000         case IMP_EVENT_INACTIVE: {
3001                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3002                 break;
3003         }
3004         case IMP_EVENT_INVALIDATE: {
3005                 struct ldlm_namespace *ns = obd->obd_namespace;
3006                 struct lu_env         *env;
3007                 int                    refcheck;
3008
3009                 env = cl_env_get(&refcheck);
3010                 if (!IS_ERR(env)) {
3011                         /* Reset grants */
3012                         cli = &obd->u.cli;
3013                         /* all pages go to failing rpcs due to the invalid
3014                          * import */
3015                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3016
3017                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3018                         cl_env_put(env, &refcheck);
3019                 } else
3020                         rc = PTR_ERR(env);
3021                 break;
3022         }
3023         case IMP_EVENT_ACTIVE: {
3024                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3025                 break;
3026         }
3027         case IMP_EVENT_OCD: {
3028                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3029
3030                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3031                         osc_init_grant(&obd->u.cli, ocd);
3032
3033                 /* See bug 7198 */
3034                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3035                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3036
3037                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3038                 break;
3039         }
3040         case IMP_EVENT_DEACTIVATE: {
3041                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3042                 break;
3043         }
3044         case IMP_EVENT_ACTIVATE: {
3045                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3046                 break;
3047         }
3048         default:
3049                 CERROR("Unknown import event %d\n", event);
3050                 LBUG();
3051         }
3052         RETURN(rc);
3053 }
3054
3055 /**
3056  * Determine whether the lock can be canceled before replaying the lock
3057  * during recovery, see bug16774 for detailed information.
3058  *
3059  * \retval zero the lock can't be canceled
3060  * \retval other ok to cancel
3061  */
3062 static int osc_cancel_weight(struct ldlm_lock *lock)
3063 {
3064         /*
3065          * Cancel all unused and granted extent lock.
3066          */
3067         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3068             lock->l_granted_mode == lock->l_req_mode &&
3069             osc_ldlm_weigh_ast(lock) == 0)
3070                 RETURN(1);
3071
3072         RETURN(0);
3073 }
3074
3075 static int brw_queue_work(const struct lu_env *env, void *data)
3076 {
3077         struct client_obd *cli = data;
3078
3079         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3080
3081         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3082         RETURN(0);
3083 }
3084
3085 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3086 {
3087         struct client_obd *cli = &obd->u.cli;
3088         struct obd_type   *type;
3089         void              *handler;
3090         int                rc;
3091         ENTRY;
3092
3093         rc = ptlrpcd_addref();
3094         if (rc)
3095                 RETURN(rc);
3096
3097         rc = client_obd_setup(obd, lcfg);
3098         if (rc)
3099                 GOTO(out_ptlrpcd, rc);
3100
3101         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3102         if (IS_ERR(handler))
3103                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3104         cli->cl_writeback_work = handler;
3105
3106         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3107         if (IS_ERR(handler))
3108                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3109         cli->cl_lru_work = handler;
3110
3111         rc = osc_quota_setup(obd);
3112         if (rc)
3113                 GOTO(out_ptlrpcd_work, rc);
3114
3115         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3116
3117 #ifdef LPROCFS
3118         obd->obd_vars = lprocfs_osc_obd_vars;
3119 #endif
3120         /* If this is true then both client (osc) and server (osp) are on the
3121          * same node. The osp layer if loaded first will register the osc proc
3122          * directory. In that case this obd_device will be attached its proc
3123          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
3124         type = class_search_type(LUSTRE_OSP_NAME);
3125         if (type && type->typ_procsym) {
3126                 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
3127                                                            type->typ_procsym,
3128                                                            obd->obd_vars, obd);
3129                 if (IS_ERR(obd->obd_proc_entry)) {
3130                         rc = PTR_ERR(obd->obd_proc_entry);
3131                         CERROR("error %d setting up lprocfs for %s\n", rc,
3132                                obd->obd_name);
3133                         obd->obd_proc_entry = NULL;
3134                 }
3135         } else {
3136                 rc = lprocfs_seq_obd_setup(obd);
3137         }
3138
3139         /* If the basic OSC proc tree construction succeeded then
3140          * lets do the rest. */
3141         if (rc == 0) {
3142                 lproc_osc_attach_seqstat(obd);
3143                 sptlrpc_lprocfs_cliobd_attach(obd);
3144                 ptlrpc_lprocfs_register_obd(obd);
3145         }
3146
3147         /* We need to allocate a few requests more, because
3148          * brw_interpret tries to create new requests before freeing
3149          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3150          * reserved, but I'm afraid that might be too much wasted RAM
3151          * in fact, so 2 is just my guess and still should work. */
3152         cli->cl_import->imp_rq_pool =
3153                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3154                                     OST_MAXREQSIZE,
3155                                     ptlrpc_add_rqs_to_pool);
3156
3157         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3158         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3159         RETURN(0);
3160
3161 out_ptlrpcd_work:
3162         if (cli->cl_writeback_work != NULL) {
3163                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3164                 cli->cl_writeback_work = NULL;
3165         }
3166         if (cli->cl_lru_work != NULL) {
3167                 ptlrpcd_destroy_work(cli->cl_lru_work);
3168                 cli->cl_lru_work = NULL;
3169         }
3170 out_client_setup:
3171         client_obd_cleanup(obd);
3172 out_ptlrpcd:
3173         ptlrpcd_decref();
3174         RETURN(rc);
3175 }
3176
3177 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3178 {
3179         int rc = 0;
3180         ENTRY;
3181
3182         switch (stage) {
3183         case OBD_CLEANUP_EARLY: {
3184                 struct obd_import *imp;
3185                 imp = obd->u.cli.cl_import;
3186                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3187                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3188                 ptlrpc_deactivate_import(imp);
3189                 spin_lock(&imp->imp_lock);
3190                 imp->imp_pingable = 0;
3191                 spin_unlock(&imp->imp_lock);
3192                 break;
3193         }
3194         case OBD_CLEANUP_EXPORTS: {
3195                 struct client_obd *cli = &obd->u.cli;
3196                 /* LU-464
3197                  * for echo client, export may be on zombie list, wait for
3198                  * zombie thread to cull it, because cli.cl_import will be
3199                  * cleared in client_disconnect_export():
3200                  *   class_export_destroy() -> obd_cleanup() ->
3201                  *   echo_device_free() -> echo_client_cleanup() ->
3202                  *   obd_disconnect() -> osc_disconnect() ->
3203                  *   client_disconnect_export()
3204                  */
3205                 obd_zombie_barrier();
3206                 if (cli->cl_writeback_work) {
3207                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3208                         cli->cl_writeback_work = NULL;
3209                 }
3210                 if (cli->cl_lru_work) {
3211                         ptlrpcd_destroy_work(cli->cl_lru_work);
3212                         cli->cl_lru_work = NULL;
3213                 }
3214                 obd_cleanup_client_import(obd);
3215                 ptlrpc_lprocfs_unregister_obd(obd);
3216                 lprocfs_obd_cleanup(obd);
3217                 rc = obd_llog_finish(obd, 0);
3218                 if (rc != 0)
3219                         CERROR("failed to cleanup llogging subsystems\n");
3220                 break;
3221                 }
3222         }
3223         RETURN(rc);
3224 }
3225
3226 int osc_cleanup(struct obd_device *obd)
3227 {
3228         struct client_obd *cli = &obd->u.cli;
3229         int rc;
3230
3231         ENTRY;
3232
3233         /* lru cleanup */
3234         if (cli->cl_cache != NULL) {
3235                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3236                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3237                 list_del_init(&cli->cl_lru_osc);
3238                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3239                 cli->cl_lru_left = NULL;
3240                 atomic_dec(&cli->cl_cache->ccc_users);
3241                 cli->cl_cache = NULL;
3242         }
3243
3244         /* free memory of osc quota cache */
3245         osc_quota_cleanup(obd);
3246
3247         rc = client_obd_cleanup(obd);
3248
3249         ptlrpcd_decref();
3250         RETURN(rc);
3251 }
3252
3253 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3254 {
3255         int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3256                                               lcfg, obd);
3257         return rc > 0 ? 0: rc;
3258 }
3259
3260 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3261 {
3262         return osc_process_config_base(obd, buf);
3263 }
3264
3265 struct obd_ops osc_obd_ops = {
3266         .o_owner                = THIS_MODULE,
3267         .o_setup                = osc_setup,
3268         .o_precleanup           = osc_precleanup,
3269         .o_cleanup              = osc_cleanup,
3270         .o_add_conn             = client_import_add_conn,
3271         .o_del_conn             = client_import_del_conn,
3272         .o_connect              = client_connect_import,
3273         .o_reconnect            = osc_reconnect,
3274         .o_disconnect           = osc_disconnect,
3275         .o_statfs               = osc_statfs,
3276         .o_statfs_async         = osc_statfs_async,
3277         .o_unpackmd             = osc_unpackmd,
3278         .o_create               = osc_create,
3279         .o_destroy              = osc_destroy,
3280         .o_getattr              = osc_getattr,
3281         .o_getattr_async        = osc_getattr_async,
3282         .o_setattr              = osc_setattr,
3283         .o_setattr_async        = osc_setattr_async,
3284         .o_change_cbdata        = osc_change_cbdata,
3285         .o_find_cbdata          = osc_find_cbdata,
3286         .o_iocontrol            = osc_iocontrol,
3287         .o_get_info             = osc_get_info,
3288         .o_set_info_async       = osc_set_info_async,
3289         .o_import_event         = osc_import_event,
3290         .o_process_config       = osc_process_config,
3291         .o_quotactl             = osc_quotactl,
3292         .o_quotacheck           = osc_quotacheck,
3293 };
3294
3295 extern struct lu_kmem_descr osc_caches[];
3296 extern spinlock_t osc_ast_guard;
3297 extern struct lock_class_key osc_ast_guard_class;
3298
3299 int __init osc_init(void)
3300 {
3301         bool enable_proc = true;
3302         struct obd_type *type;
3303         int rc;
3304         ENTRY;
3305
3306         /* print an address of _any_ initialized kernel symbol from this
3307          * module, to allow debugging with gdb that doesn't support data
3308          * symbols from modules.*/
3309         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3310
3311         rc = lu_kmem_init(osc_caches);
3312         if (rc)
3313                 RETURN(rc);
3314
3315         type = class_search_type(LUSTRE_OSP_NAME);
3316         if (type != NULL && type->typ_procsym != NULL)
3317                 enable_proc = false;
3318
3319         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3320 #ifndef HAVE_ONLY_PROCFS_SEQ
3321                                  NULL,
3322 #endif
3323                                  LUSTRE_OSC_NAME, &osc_device_type);
3324         if (rc) {
3325                 lu_kmem_fini(osc_caches);
3326                 RETURN(rc);
3327         }
3328
3329         spin_lock_init(&osc_ast_guard);
3330         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3331
3332         RETURN(rc);
3333 }
3334
#ifdef __KERNEL__
/**
 * Module unload: undo osc_init() in reverse order by unregistering the OSC
 * obd type and then finalizing osc_caches.
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

/* module metadata */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* hook osc_init()/osc_exit() up as the module entry and exit points */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */